You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by xe...@apache.org on 2011/11/11 17:55:34 UTC
svn commit: r1200948 - in /cassandra/trunk: ./ bin/ contrib/
interface/thrift/gen-java/org/apache/cassandra/thrift/
src/java/org/apache/cassandra/cql/ src/java/org/apache/cassandra/db/
src/java/org/apache/cassandra/db/commitlog/ src/java/org/apache/cas...
Author: xedin
Date: Fri Nov 11 16:55:33 2011
New Revision: 1200948
URL: http://svn.apache.org/viewvc?rev=1200948&view=rev
Log:
merge from 1.0
Modified:
cassandra/trunk/ (props changed)
cassandra/trunk/CHANGES.txt
cassandra/trunk/bin/cqlsh (props changed)
cassandra/trunk/contrib/ (props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java (props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java (props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java (props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java (props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java (props changed)
cassandra/trunk/src/java/org/apache/cassandra/cql/Cql.g
cassandra/trunk/src/java/org/apache/cassandra/cql/UpdateStatement.java
cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java
cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java
cassandra/trunk/src/java/org/apache/cassandra/db/ColumnSerializer.java
cassandra/trunk/src/java/org/apache/cassandra/db/CounterColumn.java
cassandra/trunk/src/java/org/apache/cassandra/db/DataTracker.java
cassandra/trunk/src/java/org/apache/cassandra/db/ExpiringColumn.java
cassandra/trunk/src/java/org/apache/cassandra/db/ReadResponse.java
cassandra/trunk/src/java/org/apache/cassandra/db/Row.java
cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java
cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java
cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
cassandra/trunk/src/java/org/apache/cassandra/io/IColumnSerializer.java
cassandra/trunk/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/Descriptor.java
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableMetadata.java
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
cassandra/trunk/src/java/org/apache/cassandra/io/util/ColumnSortedMap.java
cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
cassandra/trunk/test/unit/org/apache/cassandra/db/CounterColumnTest.java
cassandra/trunk/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java
cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableMetadataSerializerTest.java
cassandra/trunk/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
Propchange: cassandra/trunk/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Nov 11 16:55:33 2011
@@ -1,10 +1,10 @@
/cassandra/branches/cassandra-0.6:922689-1052356,1052358-1053452,1053454,1053456-1131291
/cassandra/branches/cassandra-0.7:1026516-1183000
/cassandra/branches/cassandra-0.7.0:1053690-1055654
-/cassandra/branches/cassandra-0.8:1090934-1125013,1125019-1198724,1198726-1199259,1199284,1200226
+/cassandra/branches/cassandra-0.8:1090934-1125013,1125019-1198724,1198726-1199259,1199284,1200226,1200471
/cassandra/branches/cassandra-0.8.0:1125021-1130369
/cassandra/branches/cassandra-0.8.1:1101014-1125018
-/cassandra/branches/cassandra-1.0:1167085-1200153,1200227
+/cassandra/branches/cassandra-1.0:1167085-1200945
/cassandra/branches/cassandra-1.0.0:1167104-1167229,1167232-1181093,1181741,1181816,1181820,1182951,1183243
/cassandra/tags/cassandra-0.7.0-rc3:1051699-1053689
/cassandra/tags/cassandra-0.8.0-rc1:1102511-1125020
Modified: cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1200948&r1=1200947&r2=1200948&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Fri Nov 11 16:55:33 2011
@@ -16,6 +16,9 @@
* automatically compute sha1 sum for uncompressed data files (CASSANDRA-3456)
* fix reading metadata/statistics component for version < h (CASSANDRA-3474)
* add sstable forward-compatibility (CASSANDRA-3478)
+ * report compression ratio in CFSMBean (CASSANDRA-3393)
+ * fix incorrect size exception during streaming of counters (CASSANDRA-3481)
+ * (CQL) fix for counter decrement syntax (CASSANDRA-3418)
Merged from 0.8:
* Make counter shard merging thread safe (CASSANDRA-3178)
* fix updating CF row_cache_provider (CASSANDRA-3414)
@@ -26,7 +29,7 @@ Merged from 0.8:
* Revert CASSANDRA-2855
* Fix bug preventing the use of efficient cross-DC writes (CASSANDRA-3472)
* `describe ring` command for CLI (CASSANDRA-3220)
-
+ * (Hadoop) skip empty rows when entire row is requested, redux (CASSANDRA-2855)
1.0.2
* "defragment" rows for name-based queries under STCS (CASSANDRA-2503)
@@ -46,6 +49,7 @@ Merged from 0.8:
* acquire compactionlock during truncate (CASSANDRA-3399)
* fix displaying cfdef entries for super columnfamilies (CASSANDRA-3415)
+
1.0.1
* acquire references during index build to prevent delete problems
on Windows (CASSANDRA-3314)
@@ -124,6 +128,7 @@ Merged from 0.8:
* remove incorrect optimization from slice read path (CASSANDRA-3390)
* Fix race in AntiEntropyService (CASSANDRA-3400)
+
1.0.0-final
* close scrubbed sstable fd before deleting it (CASSANDRA-3318)
* fix bug preventing obsolete commitlog segments from being removed
@@ -143,6 +148,7 @@ Merged from 0.8:
* Fix hsha thrift server (CASSANDRA-3346)
* Make sure repair only stream needed sstables (CASSANDRA-3345)
+
1.0.0-rc2
* Log a meaningful warning when a node receives a message for a repair session
that doesn't exist anymore (CASSANDRA-3256)
@@ -165,6 +171,7 @@ Merged from 0.8:
* Evict gossip state immediately when a token is taken over by a new IP
(CASSANDRA-3259)
+
1.0.0-rc1
* Update CQL to generate microsecond timestamps by default (CASSANDRA-3227)
* Fix counting CFMetadata towards Memtable liveRatio (CASSANDRA-3023)
@@ -187,6 +194,7 @@ Merged from 0.8:
* File descriptor limit increased in packaging (CASSANDRA-3206)
* Fix deadlock in commit log during flush (CASSANDRA-3253)
+
1.0.0-beta1
* removed binarymemtable (CASSANDRA-2692)
* add commitlog_total_space_in_mb to prevent fragmented logs (CASSANDRA-2427)
@@ -273,6 +281,7 @@ Merged from 0.8:
* Pluggable compaction strategy (CASSANDRA-1610)
* Add new broadcast_address config option (CASSANDRA-2491)
+
0.8.7
* Kill server on wrapped OOME such as from FileChannel.map (CASSANDRA-3201)
* Allow using quotes in "USE <keyspace>;" CLI command (CASSANDRA-3208)
@@ -308,6 +317,7 @@ Merged from 0.8:
* Allow using number as DC name when creating keyspace in CQL (CASSANDRA-3239)
* Force flush of system table after updating/removing a token (CASSANDRA-3243)
+
0.8.6
* revert CASSANDRA-2388
* change TokenRange.endpoints back to listen/broadcast address to match
Propchange: cassandra/trunk/bin/cqlsh
------------------------------------------------------------------------------
svn:executable = *
Propchange: cassandra/trunk/contrib/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Nov 11 16:55:33 2011
@@ -1,10 +1,10 @@
/cassandra/branches/cassandra-0.6/contrib:922689-1052356,1052358-1053452,1053454,1053456-1068009
/cassandra/branches/cassandra-0.7/contrib:1026516-1183000
/cassandra/branches/cassandra-0.7.0/contrib:1053690-1055654
-/cassandra/branches/cassandra-0.8/contrib:1090934-1125013,1125019-1198724,1198726-1199259,1199284,1200226
+/cassandra/branches/cassandra-0.8/contrib:1090934-1125013,1125019-1198724,1198726-1199259,1199284,1200226,1200471
/cassandra/branches/cassandra-0.8.0/contrib:1125021-1130369
/cassandra/branches/cassandra-0.8.1/contrib:1101014-1125018
-/cassandra/branches/cassandra-1.0/contrib:1167085-1200153,1200227
+/cassandra/branches/cassandra-1.0/contrib:1167085-1200945
/cassandra/branches/cassandra-1.0.0/contrib:1167104-1167229,1167232-1181093,1181741,1181816,1181820,1182951,1183243
/cassandra/tags/cassandra-0.7.0-rc3/contrib:1051699-1053689
/cassandra/tags/cassandra-0.8.0-rc1/contrib:1102511-1125020
Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Nov 11 16:55:33 2011
@@ -1,10 +1,10 @@
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:922689-1052356,1052358-1053452,1053454,1053456-1131291
/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1026516-1183000
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1053690-1055654
-/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1090934-1125013,1125019-1198724,1198726-1199259,1199284,1200226
+/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1090934-1125013,1125019-1198724,1198726-1199259,1199284,1200226,1200471
/cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1125021-1130369
/cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1101014-1125018
-/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1167085-1200153,1200227
+/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1167085-1200945
/cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1167104-1167229,1167232-1181093,1181741,1181816,1181820,1182951,1183243
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1051699-1053689
/cassandra/tags/cassandra-0.8.0-rc1/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1102511-1125020
Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Nov 11 16:55:33 2011
@@ -1,10 +1,10 @@
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:922689-1052356,1052358-1053452,1053454,1053456-1131291
/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1026516-1183000
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1053690-1055654
-/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1090934-1125013,1125019-1198724,1198726-1199259,1199284,1200226
+/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1090934-1125013,1125019-1198724,1198726-1199259,1199284,1200226,1200471
/cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1125021-1130369
/cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1101014-1125018
-/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1167085-1200153,1200227
+/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1167085-1200945
/cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1167104-1167229,1167232-1181093,1181741,1181816,1181820,1182951,1183243
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1051699-1053689
/cassandra/tags/cassandra-0.8.0-rc1/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1102511-1125020
Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Nov 11 16:55:33 2011
@@ -1,10 +1,10 @@
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:922689-1052356,1052358-1053452,1053454,1053456-1131291
/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1026516-1183000
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1053690-1055654
-/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1090934-1125013,1125019-1198724,1198726-1199259,1199284,1200226
+/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1090934-1125013,1125019-1198724,1198726-1199259,1199284,1200226,1200471
/cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1125021-1130369
/cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1101014-1125018
-/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1167085-1200153,1200227
+/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1167085-1200945
/cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1167104-1167229,1167232-1181093,1181741,1181816,1181820,1182951,1183243
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1051699-1053689
/cassandra/tags/cassandra-0.8.0-rc1/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1102511-1125020
Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Nov 11 16:55:33 2011
@@ -1,10 +1,10 @@
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:922689-1052356,1052358-1053452,1053454,1053456-1131291
/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1026516-1183000
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1053690-1055654
-/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1090934-1125013,1125019-1198724,1198726-1199259,1199284,1200226
+/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1090934-1125013,1125019-1198724,1198726-1199259,1199284,1200226,1200471
/cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1125021-1130369
/cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1101014-1125018
-/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1167085-1200153,1200227
+/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1167085-1200945
/cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1167104-1167229,1167232-1181093,1181741,1181816,1181820,1182951,1183243
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1051699-1053689
/cassandra/tags/cassandra-0.8.0-rc1/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1102511-1125020
Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Nov 11 16:55:33 2011
@@ -1,10 +1,10 @@
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:922689-1052356,1052358-1053452,1053454,1053456-1131291
/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1026516-1183000
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1053690-1055654
-/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1090934-1125013,1125019-1198724,1198726-1199259,1199284,1200226
+/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1090934-1125013,1125019-1198724,1198726-1199259,1199284,1200226,1200471
/cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1125021-1130369
/cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1101014-1125018
-/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1167085-1200153,1200227
+/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1167085-1200945
/cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1167104-1167229,1167232-1181093,1181741,1181816,1181820,1182951,1183243
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1051699-1053689
/cassandra/tags/cassandra-0.8.0-rc1/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1102511-1125020
Modified: cassandra/trunk/src/java/org/apache/cassandra/cql/Cql.g
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/cql/Cql.g?rev=1200948&r1=1200947&r2=1200948&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/cql/Cql.g (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/cql/Cql.g Fri Nov 11 16:55:33 2011
@@ -59,6 +59,13 @@ options {
if (recognitionErrors.size() > 0)
throw new InvalidRequestException(recognitionErrors.get((recognitionErrors.size()-1)));
}
+
+ // used by UPDATE of the counter columns to validate if '-' was supplied by user
+ public void validateMinusSupplied(Object op, final Term value, IntStream stream) throws MissingTokenException
+ {
+ if (op == null && Long.parseLong(value.getText()) > 0)
+ throw new MissingTokenException(102, stream, value);
+ }
}
@lexer::header {
@@ -455,10 +462,16 @@ termPair[Map<Term, Term> columns]
: key=term '=' value=term { columns.put(key, value); }
;
+intTerm returns [Term integer]
+ : t=INTEGER { $integer = new Term($t.text, $t.type); }
+ ;
+
termPairWithOperation[Map<Term, Operation> columns]
: key=term '=' (value=term { columns.put(key, new Operation(value)); }
- | c=term ( '+' v=term { columns.put(key, new Operation(c, org.apache.cassandra.cql.Operation.OperationType.PLUS, v)); }
- | '-' v=term { columns.put(key, new Operation(c, org.apache.cassandra.cql.Operation.OperationType.MINUS, v)); } ))
+ | c=term ( '+' v=term { columns.put(key, new Operation(c, org.apache.cassandra.cql.Operation.OperationType.PLUS, v)); }
+ | op='-'? v=intTerm
+ { validateMinusSupplied(op, v, input);
+ columns.put(key, new Operation(c, org.apache.cassandra.cql.Operation.OperationType.MINUS, v)); } ))
;
// Note: ranges are inclusive so >= and >, and < and <= all have the same semantics.
Modified: cassandra/trunk/src/java/org/apache/cassandra/cql/UpdateStatement.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/cql/UpdateStatement.java?rev=1200948&r1=1200947&r2=1200948&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/cql/UpdateStatement.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/cql/UpdateStatement.java Fri Nov 11 16:55:33 2011
@@ -220,7 +220,7 @@ public class UpdateStatement extends Abs
if (op.type == OperationType.MINUS)
{
- value *= -1;
+ if (value > 0) value *= -1;
}
}
catch (NumberFormatException e)
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java?rev=1200948&r1=1200947&r2=1200948&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java Fri Nov 11 16:55:33 2011
@@ -32,6 +32,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.io.IColumnSerializer;
import org.apache.cassandra.io.ISerializer;
import org.apache.cassandra.io.sstable.SSTableMetadata;
@@ -114,10 +115,10 @@ public class ColumnFamilySerializer impl
public ColumnFamily deserialize(DataInput dis) throws IOException
{
- return deserialize(dis, false, ThreadSafeSortedColumns.factory());
+ return deserialize(dis, IColumnSerializer.Flag.LOCAL, ThreadSafeSortedColumns.factory());
}
- public ColumnFamily deserialize(DataInput dis, boolean fromRemote, ISortedColumns.Factory factory) throws IOException
+ public ColumnFamily deserialize(DataInput dis, IColumnSerializer.Flag flag, ISortedColumns.Factory factory) throws IOException
{
if (!dis.readBoolean())
return null;
@@ -128,22 +129,22 @@ public class ColumnFamilySerializer impl
throw new UnserializableColumnFamilyException("Couldn't find cfId=" + cfId, cfId);
ColumnFamily cf = ColumnFamily.create(cfId, factory);
deserializeFromSSTableNoColumns(cf, dis);
- deserializeColumns(dis, cf, fromRemote);
+ deserializeColumns(dis, cf, flag);
return cf;
}
- public void deserializeColumns(DataInput dis, ColumnFamily cf, boolean fromRemote) throws IOException
+ public void deserializeColumns(DataInput dis, ColumnFamily cf, IColumnSerializer.Flag flag) throws IOException
{
int size = dis.readInt();
- deserializeColumns(dis, cf, size, fromRemote);
+ deserializeColumns(dis, cf, size, flag);
}
/* column count is already read from DataInput */
- public void deserializeColumns(DataInput dis, ColumnFamily cf, int size, boolean fromRemote) throws IOException
+ public void deserializeColumns(DataInput dis, ColumnFamily cf, int size, IColumnSerializer.Flag flag) throws IOException
{
for (int i = 0; i < size; ++i)
{
- IColumn column = cf.getColumnSerializer().deserialize(dis, fromRemote, (int) (System.currentTimeMillis() / 1000));
+ IColumn column = cf.getColumnSerializer().deserialize(dis, flag, (int) (System.currentTimeMillis() / 1000));
cf.addColumn(column);
}
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=1200948&r1=1200947&r2=1200948&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Fri Nov 11 16:55:33 2011
@@ -1772,6 +1772,12 @@ public class ColumnFamilyStore implement
return data.getEstimatedColumnCountHistogram();
}
+ @Override
+ public double getCompressionRatio()
+ {
+ return data.getCompressionRatio();
+ }
+
/** true if this CFS contains secondary index data */
public boolean isIndex()
{
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java?rev=1200948&r1=1200947&r2=1200948&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java Fri Nov 11 16:55:33 2011
@@ -213,6 +213,7 @@ public interface ColumnFamilyStoreMBean
public long[] getEstimatedRowSizeHistogram();
public long[] getEstimatedColumnCountHistogram();
+ public double getCompressionRatio();
/**
* Returns a list of the names of the built column indexes for current store
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnSerializer.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnSerializer.java?rev=1200948&r1=1200947&r2=1200948&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnSerializer.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnSerializer.java Fri Nov 11 16:55:33 2011
@@ -70,7 +70,7 @@ public class ColumnSerializer implements
public Column deserialize(DataInput dis) throws IOException
{
- return deserialize(dis, false);
+ return deserialize(dis, Flag.LOCAL);
}
/*
@@ -78,12 +78,12 @@ public class ColumnSerializer implements
* deserialize comes from a remote host. If it does, then we must clear
* the delta.
*/
- public Column deserialize(DataInput dis, boolean fromRemote) throws IOException
+ public Column deserialize(DataInput dis, IColumnSerializer.Flag flag) throws IOException
{
- return deserialize(dis, fromRemote, (int) (System.currentTimeMillis() / 1000));
+ return deserialize(dis, flag, (int) (System.currentTimeMillis() / 1000));
}
- public Column deserialize(DataInput dis, boolean fromRemote, int expireBefore) throws IOException
+ public Column deserialize(DataInput dis, IColumnSerializer.Flag flag, int expireBefore) throws IOException
{
ByteBuffer name = ByteBufferUtil.readWithShortLength(dis);
if (name.remaining() <= 0)
@@ -104,7 +104,7 @@ public class ColumnSerializer implements
long timestampOfLastDelete = dis.readLong();
long ts = dis.readLong();
ByteBuffer value = ByteBufferUtil.readWithLength(dis);
- return CounterColumn.create(name, value, ts, timestampOfLastDelete, fromRemote);
+ return CounterColumn.create(name, value, ts, timestampOfLastDelete, flag);
}
else if ((b & EXPIRATION_MASK) != 0)
{
@@ -112,7 +112,7 @@ public class ColumnSerializer implements
int expiration = dis.readInt();
long ts = dis.readLong();
ByteBuffer value = ByteBufferUtil.readWithLength(dis);
- return ExpiringColumn.create(name, value, ts, ttl, expiration, expireBefore);
+ return ExpiringColumn.create(name, value, ts, ttl, expiration, expireBefore, flag);
}
else
{
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/CounterColumn.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/CounterColumn.java?rev=1200948&r1=1200947&r2=1200948&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/CounterColumn.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/CounterColumn.java Fri Nov 11 16:55:33 2011
@@ -36,6 +36,7 @@ import org.apache.cassandra.db.context.C
import org.apache.cassandra.db.context.IContext.ContextRelationship;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.marshal.MarshalException;
+import org.apache.cassandra.io.IColumnSerializer;
import org.apache.cassandra.io.util.DataOutputBuffer;
import org.apache.cassandra.utils.Allocator;
import org.apache.cassandra.service.IWriteResponseHandler;
@@ -76,11 +77,11 @@ public class CounterColumn extends Colum
this.timestampOfLastDelete = timestampOfLastDelete;
}
- public static CounterColumn create(ByteBuffer name, ByteBuffer value, long timestamp, long timestampOfLastDelete, boolean fromRemote)
+ public static CounterColumn create(ByteBuffer name, ByteBuffer value, long timestamp, long timestampOfLastDelete, IColumnSerializer.Flag flag)
{
// #elt being negative means we have to clean delta
short count = value.getShort(value.position());
- if (fromRemote || count < 0)
+ if (flag == IColumnSerializer.Flag.FROM_REMOTE || (flag == IColumnSerializer.Flag.LOCAL && count < 0))
value = CounterContext.instance().clearAllDelta(value);
return new CounterColumn(name, value, timestamp, timestampOfLastDelete);
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/DataTracker.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/DataTracker.java?rev=1200948&r1=1200947&r2=1200948&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/DataTracker.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/DataTracker.java Fri Nov 11 16:55:33 2011
@@ -412,6 +412,21 @@ public class DataTracker
return histogram;
}
+ public double getCompressionRatio()
+ {
+ double sum = 0;
+ int total = 0;
+ for (SSTableReader sstable : getSSTables())
+ {
+ if (sstable.getCompressionRatio() != Double.MIN_VALUE)
+ {
+ sum += sstable.getCompressionRatio();
+ total++;
+ }
+ }
+ return total != 0 ? (double)sum/total: 0;
+ }
+
public long getMinRowSize()
{
long min = 0;
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ExpiringColumn.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ExpiringColumn.java?rev=1200948&r1=1200947&r2=1200948&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ExpiringColumn.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ExpiringColumn.java Fri Nov 11 16:55:33 2011
@@ -25,6 +25,7 @@ import java.security.MessageDigest;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.marshal.MarshalException;
+import org.apache.cassandra.io.IColumnSerializer;
import org.apache.cassandra.io.util.DataOutputBuffer;
import org.apache.cassandra.utils.Allocator;
import org.apache.cassandra.utils.ByteBufferUtil;
@@ -60,9 +61,9 @@ public class ExpiringColumn extends Colu
}
/** @return Either a DeletedColumn, or an ExpiringColumn. */
- public static Column create(ByteBuffer name, ByteBuffer value, long timestamp, int timeToLive, int localExpirationTime, int expireBefore)
+ public static Column create(ByteBuffer name, ByteBuffer value, long timestamp, int timeToLive, int localExpirationTime, int expireBefore, IColumnSerializer.Flag flag)
{
- if (localExpirationTime >= expireBefore)
+ if (localExpirationTime >= expireBefore || flag == IColumnSerializer.Flag.PRESERVE_SIZE)
return new ExpiringColumn(name, value, timestamp, timeToLive, localExpirationTime);
// the column is now expired, we can safely return a simple tombstone
return new DeletedColumn(name, localExpirationTime, timestamp);
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ReadResponse.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ReadResponse.java?rev=1200948&r1=1200947&r2=1200948&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ReadResponse.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ReadResponse.java Fri Nov 11 16:55:33 2011
@@ -21,6 +21,7 @@ package org.apache.cassandra.db;
import java.io.*;
import java.nio.ByteBuffer;
+import org.apache.cassandra.io.IColumnSerializer;
import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.utils.ByteBufferUtil;
@@ -105,7 +106,7 @@ class ReadResponseSerializer implements
if (!isDigest)
{
// This is coming from a remote host
- row = Row.serializer().deserialize(dis, version, true, ArrayBackedSortedColumns.factory());
+ row = Row.serializer().deserialize(dis, version, IColumnSerializer.Flag.FROM_REMOTE, ArrayBackedSortedColumns.factory());
}
return isDigest ? new ReadResponse(ByteBuffer.wrap(digest)) : new ReadResponse(row);
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/Row.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/Row.java?rev=1200948&r1=1200947&r2=1200948&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/Row.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/Row.java Fri Nov 11 16:55:33 2011
@@ -20,6 +20,7 @@ package org.apache.cassandra.db;
import java.io.*;
+import org.apache.cassandra.io.IColumnSerializer;
import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.ByteBufferUtil;
@@ -61,15 +62,15 @@ public class Row
ColumnFamily.serializer().serialize(row.cf, dos);
}
- public Row deserialize(DataInput dis, int version, boolean fromRemote, ISortedColumns.Factory factory) throws IOException
+ public Row deserialize(DataInput dis, int version, IColumnSerializer.Flag flag, ISortedColumns.Factory factory) throws IOException
{
return new Row(StorageService.getPartitioner().decorateKey(ByteBufferUtil.readWithShortLength(dis)),
- ColumnFamily.serializer().deserialize(dis, fromRemote, factory));
+ ColumnFamily.serializer().deserialize(dis, flag, factory));
}
public Row deserialize(DataInput dis, int version) throws IOException
{
- return deserialize(dis, version, false, ThreadSafeSortedColumns.factory());
+ return deserialize(dis, version, IColumnSerializer.Flag.LOCAL, ThreadSafeSortedColumns.factory());
}
public long serializedSize(Row row, int version)
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java?rev=1200948&r1=1200947&r2=1200948&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java Fri Nov 11 16:55:33 2011
@@ -31,6 +31,7 @@ import org.apache.cassandra.config.CFMet
import org.apache.cassandra.config.KSMetaData;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.filter.QueryPath;
+import org.apache.cassandra.io.IColumnSerializer;
import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.io.util.FastByteArrayInputStream;
import org.apache.cassandra.net.Message;
@@ -388,7 +389,7 @@ public class RowMutation implements IMut
}
}
- public RowMutation deserialize(DataInput dis, int version, boolean fromRemote) throws IOException
+ public RowMutation deserialize(DataInput dis, int version, IColumnSerializer.Flag flag) throws IOException
{
String table = dis.readUTF();
ByteBuffer key = ByteBufferUtil.readWithShortLength(dis);
@@ -397,7 +398,7 @@ public class RowMutation implements IMut
for (int i = 0; i < size; ++i)
{
Integer cfid = Integer.valueOf(dis.readInt());
- ColumnFamily cf = ColumnFamily.serializer().deserialize(dis, fromRemote, ThreadSafeSortedColumns.factory());
+ ColumnFamily cf = ColumnFamily.serializer().deserialize(dis, flag, ThreadSafeSortedColumns.factory());
modifications.put(cfid, cf);
}
return new RowMutation(table, key, modifications);
@@ -405,7 +406,7 @@ public class RowMutation implements IMut
public RowMutation deserialize(DataInput dis, int version) throws IOException
{
- return deserialize(dis, version, true);
+ return deserialize(dis, version, IColumnSerializer.Flag.FROM_REMOTE);
}
public long serializedSize(RowMutation rm, int version)
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java?rev=1200948&r1=1200947&r2=1200948&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java Fri Nov 11 16:55:33 2011
@@ -336,15 +336,15 @@ class SuperColumnSerializer implements I
public IColumn deserialize(DataInput dis) throws IOException
{
- return deserialize(dis, false);
+ return deserialize(dis, IColumnSerializer.Flag.LOCAL);
}
- public IColumn deserialize(DataInput dis, boolean fromRemote) throws IOException
+ public IColumn deserialize(DataInput dis, IColumnSerializer.Flag flag) throws IOException
{
- return deserialize(dis, fromRemote, (int)(System.currentTimeMillis() / 1000));
+ return deserialize(dis, flag, (int)(System.currentTimeMillis() / 1000));
}
- public IColumn deserialize(DataInput dis, boolean fromRemote, int expireBefore) throws IOException
+ public IColumn deserialize(DataInput dis, IColumnSerializer.Flag flag, int expireBefore) throws IOException
{
ByteBuffer name = ByteBufferUtil.readWithShortLength(dis);
int localDeleteTime = dis.readInt();
@@ -357,7 +357,7 @@ class SuperColumnSerializer implements I
/* read the number of columns */
int size = dis.readInt();
ColumnSerializer serializer = Column.serializer();
- ColumnSortedMap preSortedMap = new ColumnSortedMap(comparator, serializer, dis, size, fromRemote, expireBefore);
+ ColumnSortedMap preSortedMap = new ColumnSortedMap(comparator, serializer, dis, size, flag, expireBefore);
SuperColumn superColumn = new SuperColumn(name, ThreadSafeSortedColumns.factory().fromSorted(preSortedMap, false));
superColumn.delete(localDeleteTime, markedForDeleteAt);
return superColumn;
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java?rev=1200948&r1=1200947&r2=1200948&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java Fri Nov 11 16:55:33 2011
@@ -42,6 +42,7 @@ import org.apache.cassandra.concurrent.S
import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.config.Config;
import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.io.IColumnSerializer;
import org.apache.cassandra.io.util.FastByteArrayInputStream;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.ByteBufferUtil;
@@ -281,7 +282,7 @@ public class CommitLog implements Commit
{
// assuming version here. We've gone to lengths to make sure what gets written to the CL is in
// the current version. so do make sure the CL is drained prior to upgrading a node.
- rm = RowMutation.serializer().deserialize(new DataInputStream(bufIn), MessagingService.version_, false);
+ rm = RowMutation.serializer().deserialize(new DataInputStream(bufIn), MessagingService.version_, IColumnSerializer.Flag.LOCAL);
}
catch (UnserializableColumnFamilyException ex)
{
Modified: cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java?rev=1200948&r1=1200947&r2=1200948&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java Fri Nov 11 16:55:33 2011
@@ -41,6 +41,7 @@ import org.apache.cassandra.thrift.*;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.Pair;
+import org.apache.commons.lang.ArrayUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
@@ -55,6 +56,7 @@ public class ColumnFamilyRecordReader ex
private RowIterator iter;
private Pair<ByteBuffer, SortedMap<ByteBuffer, IColumn>> currentRow;
private SlicePredicate predicate;
+ private boolean isEmptyPredicate;
private int totalRowCount; // total number of rows to fetch
private int batchRowCount; // fetch this many per batch
private String cfName;
@@ -89,11 +91,33 @@ public class ColumnFamilyRecordReader ex
return ((float)iter.rowsRead()) / totalRowCount;
}
+ static boolean isEmptyPredicate(SlicePredicate predicate)
+ {
+ if (predicate == null)
+ return true;
+
+ if (predicate.isSetColumn_names() && predicate.getSlice_range() == null)
+ return false;
+
+ if (predicate.getSlice_range() == null)
+ return true;
+
+ byte[] start = predicate.getSlice_range().getStart();
+ byte[] finish = predicate.getSlice_range().getFinish();
+ if ( (start == null || start == ArrayUtils.EMPTY_BYTE_ARRAY) &&
+ (finish == null || finish == ArrayUtils.EMPTY_BYTE_ARRAY) )
+ return true;
+
+
+ return false;
+ }
+
public void initialize(InputSplit split, TaskAttemptContext context) throws IOException
{
this.split = (ColumnFamilySplit) split;
Configuration conf = context.getConfiguration();
predicate = ConfigHelper.getInputSlicePredicate(conf);
+ isEmptyPredicate = isEmptyPredicate(predicate);
totalRowCount = ConfigHelper.getInputSplitSize(conf);
batchRowCount = ConfigHelper.getRangeBatchSize(conf);
cfName = ConfigHelper.getInputColumnFamily(conf);
@@ -237,6 +261,7 @@ public class ColumnFamilyRecordReader ex
}
else if (startToken.equals(split.getEndToken()))
{
+ // reached end of the split
rows = null;
return;
}
@@ -257,14 +282,37 @@ public class ColumnFamilyRecordReader ex
rows = null;
return;
}
-
- // reset to iterate through this new batch
- i = 0;
// prepare for the next slice to be read
KeySlice lastRow = rows.get(rows.size() - 1);
ByteBuffer rowkey = lastRow.key;
startToken = partitioner.getTokenFactory().toString(partitioner.getToken(rowkey));
+
+ // remove ghosts when fetching all columns
+ if (isEmptyPredicate)
+ {
+ Iterator<KeySlice> it = rows.iterator();
+
+ while(it.hasNext())
+ {
+ KeySlice ks = it.next();
+
+ if (ks.getColumnsSize() == 0)
+ {
+ it.remove();
+ }
+ }
+
+ // all ghosts, spooky
+ if (rows.isEmpty())
+ {
+ maybeInit();
+ return;
+ }
+ }
+
+ // reset to iterate through this new batch
+ i = 0;
}
catch (Exception e)
{
Modified: cassandra/trunk/src/java/org/apache/cassandra/io/IColumnSerializer.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/IColumnSerializer.java?rev=1200948&r1=1200947&r2=1200948&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/IColumnSerializer.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/IColumnSerializer.java Fri Nov 11 16:55:33 2011
@@ -27,5 +27,21 @@ import org.apache.cassandra.db.IColumn;
public interface IColumnSerializer extends ISerializer<IColumn>
{
- public IColumn deserialize(DataInput in, boolean fromRemote, int expireBefore) throws IOException;
+ /**
+ * Flag affecting deserialization behavior.
+ * - LOCAL: for deserialization of local data (Expired columns are
+ * converted to tombstones (to gain disk space)).
+ * - FROM_REMOTE: for deserialization of data received from remote hosts
+ * (Expired columns are converted to tombstone and counters have
+ * their delta cleared)
+ * - PRESERVE_SIZE: used when no transformation must be performed, i.e,
+ * when we must ensure that deserializing and reserializing the
+ * result yield the exact same bytes. Streaming uses this.
+ */
+ public static enum Flag
+ {
+ LOCAL, FROM_REMOTE, PRESERVE_SIZE;
+ }
+
+ public IColumn deserialize(DataInput in, Flag flag, int expireBefore) throws IOException;
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java?rev=1200948&r1=1200947&r2=1200948&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java Fri Nov 11 16:55:33 2011
@@ -23,14 +23,15 @@ import java.io.IOException;
import java.util.zip.CRC32;
import java.util.zip.Checksum;
+import org.apache.cassandra.io.sstable.SSTableMetadata.Collector;
import org.apache.cassandra.io.util.FileMark;
import org.apache.cassandra.io.util.SequentialWriter;
public class CompressedSequentialWriter extends SequentialWriter
{
- public static SequentialWriter open(String dataFilePath, String indexFilePath, boolean skipIOCache, CompressionParameters parameters) throws IOException
+ public static SequentialWriter open(String dataFilePath, String indexFilePath, boolean skipIOCache, CompressionParameters parameters, Collector sstableMetadataCollector) throws IOException
{
- return new CompressedSequentialWriter(new File(dataFilePath), indexFilePath, skipIOCache, parameters);
+ return new CompressedSequentialWriter(new File(dataFilePath), indexFilePath, skipIOCache, parameters, sstableMetadataCollector);
}
// holds offset in the file where current chunk should be written
@@ -49,7 +50,11 @@ public class CompressedSequentialWriter
private final Checksum checksum = new CRC32();
- public CompressedSequentialWriter(File file, String indexFilePath, boolean skipIOCache, CompressionParameters parameters) throws IOException
+ private long originalSize = 0, compressedSize = 0;
+
+ private Collector sstableMetadataCollector;
+
+ public CompressedSequentialWriter(File file, String indexFilePath, boolean skipIOCache, CompressionParameters parameters, Collector sstableMetadataCollector) throws IOException
{
super(file, parameters.chunkLength(), skipIOCache);
this.compressor = parameters.sstableCompressor;
@@ -60,6 +65,7 @@ public class CompressedSequentialWriter
/* Index File (-CompressionInfo.db component) and it's header */
metadataWriter = new CompressionMetadata.Writer(indexFilePath);
metadataWriter.writeHeader(parameters);
+ this.sstableMetadataCollector = sstableMetadataCollector;
}
@Override
@@ -82,6 +88,9 @@ public class CompressedSequentialWriter
// compressing data with buffer re-use
int compressedLength = compressor.compress(buffer, 0, validBufferBytes, compressed, 0);
+ originalSize += validBufferBytes;
+ compressedSize += compressedLength;
+
// update checksum
checksum.update(buffer, 0, validBufferBytes);
@@ -179,7 +188,7 @@ public class CompressedSequentialWriter
return; // already closed
super.close();
-
+ sstableMetadataCollector.addCompressionRatio(compressedSize, originalSize);
metadataWriter.finalizeHeader(current, chunkCount);
metadataWriter.close();
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/Descriptor.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/Descriptor.java?rev=1200948&r1=1200947&r2=1200948&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/Descriptor.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/Descriptor.java Fri Nov 11 16:55:33 2011
@@ -56,7 +56,7 @@ public class Descriptor
// f (0.7.0): switched bloom filter implementations in data component
// g (0.8): tracks flushed-at context in metadata component
// h (1.0): tracks max client timestamp in metadata component
- public static final String CURRENT_VERSION = "h";
+ public static final String CURRENT_VERSION = "hb";
public final File directory;
/** version has the following format: <code>[a-z]+</code> */
@@ -74,6 +74,7 @@ public class Descriptor
public final boolean usesOldBloomFilter;
public final boolean metadataIncludesReplayPosition;
public final boolean tracksMaxTimestamp;
+ public final boolean hasCompressionRatio;
public enum TempState
{
@@ -115,6 +116,7 @@ public class Descriptor
usesOldBloomFilter = version.compareTo("f") < 0;
metadataIncludesReplayPosition = version.compareTo("g") >= 0;
tracksMaxTimestamp = version.compareTo("h") >= 0;
+ hasCompressionRatio = version.compareTo("hb") >= 0;
isLatestVersion = version.compareTo(CURRENT_VERSION) == 0;
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java?rev=1200948&r1=1200947&r2=1200948&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java Fri Nov 11 16:55:33 2011
@@ -30,6 +30,7 @@ import org.apache.cassandra.config.CFMet
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.columniterator.ICountableColumnIterator;
import org.apache.cassandra.db.marshal.MarshalException;
+import org.apache.cassandra.io.IColumnSerializer;
import org.apache.cassandra.io.util.RandomAccessReader;
import org.apache.cassandra.utils.BytesReadTracker;
@@ -41,7 +42,7 @@ public class SSTableIdentityIterator imp
private final DataInput input;
private final long dataStart;
public final long dataSize;
- public final boolean fromRemote;
+ public final IColumnSerializer.Flag flag;
private final ColumnFamily columnFamily;
private final int columnCount;
@@ -82,17 +83,17 @@ public class SSTableIdentityIterator imp
public SSTableIdentityIterator(SSTableReader sstable, RandomAccessReader file, DecoratedKey<?> key, long dataStart, long dataSize, boolean checkData)
throws IOException
{
- this(sstable.metadata, file, key, dataStart, dataSize, checkData, sstable, false);
+ this(sstable.metadata, file, key, dataStart, dataSize, checkData, sstable, IColumnSerializer.Flag.LOCAL);
}
- public SSTableIdentityIterator(CFMetaData metadata, DataInput file, DecoratedKey<?> key, long dataStart, long dataSize, boolean fromRemote)
+ public SSTableIdentityIterator(CFMetaData metadata, DataInput file, DecoratedKey<?> key, long dataStart, long dataSize, IColumnSerializer.Flag flag)
throws IOException
{
- this(metadata, file, key, dataStart, dataSize, false, null, fromRemote);
+ this(metadata, file, key, dataStart, dataSize, false, null, flag);
}
// sstable may be null *if* deserializeRowHeader is false
- private SSTableIdentityIterator(CFMetaData metadata, DataInput input, DecoratedKey<?> key, long dataStart, long dataSize, boolean checkData, SSTableReader sstable, boolean fromRemote)
+ private SSTableIdentityIterator(CFMetaData metadata, DataInput input, DecoratedKey<?> key, long dataStart, long dataSize, boolean checkData, SSTableReader sstable, IColumnSerializer.Flag flag)
throws IOException
{
this.input = input;
@@ -101,7 +102,7 @@ public class SSTableIdentityIterator imp
this.dataStart = dataStart;
this.dataSize = dataSize;
this.expireBefore = (int)(System.currentTimeMillis() / 1000);
- this.fromRemote = fromRemote;
+ this.flag = flag;
this.validateColumns = checkData;
try
@@ -173,7 +174,7 @@ public class SSTableIdentityIterator imp
{
try
{
- IColumn column = columnFamily.getColumnSerializer().deserialize(inputWithTracker, fromRemote, expireBefore);
+ IColumn column = columnFamily.getColumnSerializer().deserialize(inputWithTracker, flag, expireBefore);
if (validateColumns)
column.validateFields(columnFamily.metadata());
return column;
@@ -228,7 +229,7 @@ public class SSTableIdentityIterator imp
assert inputWithTracker.getBytesRead() == headerSize();
ColumnFamily cf = columnFamily.cloneMeShallow(ArrayBackedSortedColumns.factory(), false);
// since we already read column count, just pass that value and continue deserialization
- ColumnFamily.serializer().deserializeColumns(inputWithTracker, cf, columnCount, fromRemote);
+ ColumnFamily.serializer().deserializeColumns(inputWithTracker, cf, columnCount, flag);
if (validateColumns)
{
try
Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableMetadata.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableMetadata.java?rev=1200948&r1=1200947&r2=1200948&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableMetadata.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableMetadata.java Fri Nov 11 16:55:33 2011
@@ -51,19 +51,21 @@ public class SSTableMetadata
protected final EstimatedHistogram estimatedColumnCount;
protected final ReplayPosition replayPosition;
protected final long maxTimestamp;
+ protected final double compressionRatio;
public static final SSTableMetadataSerializer serializer = new SSTableMetadataSerializer();
private SSTableMetadata()
{
- this(defaultRowSizeHistogram(), defaultColumnCountHistogram(), ReplayPosition.NONE, Long.MIN_VALUE);
+ this(defaultRowSizeHistogram(), defaultColumnCountHistogram(), ReplayPosition.NONE, Long.MIN_VALUE, Double.MIN_VALUE);
}
- private SSTableMetadata(EstimatedHistogram rowSizes, EstimatedHistogram columnCounts, ReplayPosition replayPosition, long maxTimestamp)
+ private SSTableMetadata(EstimatedHistogram rowSizes, EstimatedHistogram columnCounts, ReplayPosition replayPosition, long maxTimestamp, double cr)
{
this.estimatedRowSize = rowSizes;
this.estimatedColumnCount = columnCounts;
this.replayPosition = replayPosition;
this.maxTimestamp = maxTimestamp;
+ this.compressionRatio = cr;
}
public static SSTableMetadata createDefaultInstance()
@@ -96,6 +98,11 @@ public class SSTableMetadata
return maxTimestamp;
}
+ public double getCompressionRatio()
+ {
+ return compressionRatio;
+ }
+
static EstimatedHistogram defaultColumnCountHistogram()
{
// EH of 114 can track a max value of 2395318855, i.e., > 2B columns
@@ -114,6 +121,7 @@ public class SSTableMetadata
protected EstimatedHistogram estimatedColumnCount;
protected ReplayPosition replayPosition;
protected long maxTimestamp;
+ protected double compressionRatio;
private Collector()
{
@@ -121,6 +129,7 @@ public class SSTableMetadata
this.estimatedColumnCount = defaultColumnCountHistogram();
this.replayPosition = ReplayPosition.NONE;
this.maxTimestamp = Long.MIN_VALUE;
+ this.compressionRatio = Double.MIN_VALUE;
}
public void addRowSize(long rowSize)
@@ -133,6 +142,15 @@ public class SSTableMetadata
estimatedColumnCount.add(columnCount);
}
+ /**
+ * Ratio is compressed/uncompressed and it is
+ * if you have 1.x then compression isn't helping
+ */
+ public void addCompressionRatio(long compressed, long uncompressed)
+ {
+ compressionRatio = (double) compressed/uncompressed;
+ }
+
public void updateMaxTimestamp(long potentialMax)
{
maxTimestamp = Math.max(maxTimestamp, potentialMax);
@@ -140,7 +158,7 @@ public class SSTableMetadata
public SSTableMetadata finalizeMetadata()
{
- return new SSTableMetadata(estimatedRowSize, estimatedColumnCount, replayPosition, maxTimestamp);
+ return new SSTableMetadata(estimatedRowSize, estimatedColumnCount, replayPosition, maxTimestamp, compressionRatio);
}
public Collector estimatedRowSize(EstimatedHistogram estimatedRowSize)
@@ -172,6 +190,7 @@ public class SSTableMetadata
EstimatedHistogram.serializer.serialize(sstableStats.getEstimatedColumnCount(), dos);
ReplayPosition.serializer.serialize(sstableStats.getReplayPosition(), dos);
dos.writeLong(sstableStats.getMaxTimestamp());
+ dos.writeDouble(sstableStats.getCompressionRatio());
}
public SSTableMetadata deserialize(Descriptor descriptor) throws IOException
@@ -187,7 +206,7 @@ public class SSTableMetadata
DataInputStream dis = new DataInputStream(new BufferedInputStream(new FileInputStream(statsFile)));
try
{
- return deserialize(dis, descriptor.metadataIncludesReplayPosition, descriptor.tracksMaxTimestamp);
+ return deserialize(dis, descriptor);
}
finally
{
@@ -195,16 +214,18 @@ public class SSTableMetadata
}
}
- public SSTableMetadata deserialize(DataInputStream dis, boolean includesReplayPosition, boolean tracksMaxTimestamp) throws IOException
+ public SSTableMetadata deserialize(DataInputStream dis, Descriptor desc) throws IOException
{
EstimatedHistogram rowSizes = EstimatedHistogram.serializer.deserialize(dis);
EstimatedHistogram columnCounts = EstimatedHistogram.serializer.deserialize(dis);
- ReplayPosition replayPosition = includesReplayPosition
+ ReplayPosition replayPosition = desc.metadataIncludesReplayPosition
? ReplayPosition.serializer.deserialize(dis)
: ReplayPosition.NONE;
- long maxTimestamp = tracksMaxTimestamp ? dis.readLong() : Long.MIN_VALUE;
-
- return new SSTableMetadata(rowSizes, columnCounts, replayPosition, maxTimestamp);
+ long maxTimestamp = desc.tracksMaxTimestamp ? dis.readLong() : Long.MIN_VALUE;
+ double compressionRatio = desc.hasCompressionRatio
+ ? dis.readDouble()
+ : Double.MIN_VALUE;
+ return new SSTableMetadata(rowSizes, columnCounts, replayPosition, maxTimestamp, compressionRatio);
}
}
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java?rev=1200948&r1=1200947&r2=1200948&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java Fri Nov 11 16:55:33 2011
@@ -899,6 +899,11 @@ public class SSTableReader extends SSTab
return sstableMetadata.getEstimatedColumnCount();
}
+ public double getCompressionRatio()
+ {
+ return sstableMetadata.getCompressionRatio();
+ }
+
public ReplayPosition getReplayPosition()
{
return sstableMetadata.getReplayPosition();
Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java?rev=1200948&r1=1200947&r2=1200948&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java Fri Nov 11 16:55:33 2011
@@ -34,6 +34,7 @@ import org.apache.cassandra.config.Datab
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.compaction.*;
import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.io.IColumnSerializer;
import org.apache.cassandra.io.compress.CompressedSequentialWriter;
import org.apache.cassandra.io.util.*;
import org.apache.cassandra.service.StorageService;
@@ -89,7 +90,8 @@ public class SSTableWriter extends SSTab
dataFile = CompressedSequentialWriter.open(getFilename(),
descriptor.filenameFor(Component.COMPRESSION_INFO),
true,
- metadata.compressionParameters());
+ metadata.compressionParameters(),
+ sstableMetadataCollector);
}
else
{
@@ -230,8 +232,9 @@ public class SSTableWriter extends SSTab
ColumnFamily cf = ColumnFamily.create(metadata, ArrayBackedSortedColumns.factory());
for (int i = 0; i < columnCount; i++)
{
- // deserialize column with fromRemote false, in order to keep size of streamed column
- IColumn column = cf.getColumnSerializer().deserialize(in, false, Integer.MIN_VALUE);
+ // deserialize column with PRESERVE_SIZE because we've written the dataSize based on the
+ // data size received, so we must reserialize the exact same data
+ IColumn column = cf.getColumnSerializer().deserialize(in, IColumnSerializer.Flag.PRESERVE_SIZE, Integer.MIN_VALUE);
if (column instanceof CounterColumn)
{
column = ((CounterColumn) column).markDeltaToBeCleared();
Modified: cassandra/trunk/src/java/org/apache/cassandra/io/util/ColumnSortedMap.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/util/ColumnSortedMap.java?rev=1200948&r1=1200947&r2=1200948&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/util/ColumnSortedMap.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/util/ColumnSortedMap.java Fri Nov 11 16:55:33 2011
@@ -31,6 +31,7 @@ import java.util.Map.Entry;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.ColumnSerializer;
import org.apache.cassandra.db.IColumn;
+import org.apache.cassandra.io.IColumnSerializer;
/**
* Facade over a DataInput that contains IColumns in sorted order.
@@ -43,16 +44,16 @@ public class ColumnSortedMap implements
private final DataInput dis;
private final Comparator<ByteBuffer> comparator;
private final int length;
- private final boolean fromRemote;
+ private final IColumnSerializer.Flag flag;
private final int expireBefore;
- public ColumnSortedMap(Comparator<ByteBuffer> comparator, ColumnSerializer serializer, DataInput dis, int length, boolean fromRemote, int expireBefore)
+ public ColumnSortedMap(Comparator<ByteBuffer> comparator, ColumnSerializer serializer, DataInput dis, int length, IColumnSerializer.Flag flag, int expireBefore)
{
this.comparator = comparator;
this.serializer = serializer;
this.dis = dis;
this.length = length;
- this.fromRemote = fromRemote;
+ this.flag = flag;
this.expireBefore = expireBefore;
}
@@ -143,7 +144,7 @@ public class ColumnSortedMap implements
public Set<Map.Entry<ByteBuffer, IColumn>> entrySet()
{
- return new ColumnSet(serializer, dis, length, fromRemote, expireBefore);
+ return new ColumnSet(serializer, dis, length, flag, expireBefore);
}
}
@@ -152,15 +153,15 @@ class ColumnSet implements Set<Map.Entry
private final ColumnSerializer serializer;
private final DataInput dis;
private final int length;
- private boolean fromRemote;
+ private IColumnSerializer.Flag flag;
private final int expireBefore;
- public ColumnSet(ColumnSerializer serializer, DataInput dis, int length, boolean fromRemote, int expireBefore)
+ public ColumnSet(ColumnSerializer serializer, DataInput dis, int length, IColumnSerializer.Flag flag, int expireBefore)
{
this.serializer = serializer;
this.dis = dis;
this.length = length;
- this.fromRemote = fromRemote;
+ this.flag = flag;
this.expireBefore = expireBefore;
}
@@ -181,7 +182,7 @@ class ColumnSet implements Set<Map.Entry
public Iterator<Entry<ByteBuffer, IColumn>> iterator()
{
- return new ColumnIterator(serializer, dis, length, fromRemote, expireBefore);
+ return new ColumnIterator(serializer, dis, length, flag, expireBefore);
}
public Object[] toArray()
@@ -234,16 +235,16 @@ class ColumnIterator implements Iterator
private final ColumnSerializer serializer;
private final DataInput dis;
private final int length;
- private final boolean fromRemote;
+ private final IColumnSerializer.Flag flag;
private int count = 0;
private final int expireBefore;
- public ColumnIterator(ColumnSerializer serializer, DataInput dis, int length, boolean fromRemote, int expireBefore)
+ public ColumnIterator(ColumnSerializer serializer, DataInput dis, int length, IColumnSerializer.Flag flag, int expireBefore)
{
this.dis = dis;
this.serializer = serializer;
this.length = length;
- this.fromRemote = fromRemote;
+ this.flag = flag;
this.expireBefore = expireBefore;
}
@@ -252,7 +253,7 @@ class ColumnIterator implements Iterator
try
{
count++;
- return serializer.deserialize(dis, fromRemote, expireBefore);
+ return serializer.deserialize(dis, flag, expireBefore);
}
catch (IOException e)
{
Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java?rev=1200948&r1=1200947&r2=1200948&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java Fri Nov 11 16:55:33 2011
@@ -31,6 +31,7 @@ import org.apache.cassandra.db.Table;
import org.apache.cassandra.db.compaction.AbstractCompactedRow;
import org.apache.cassandra.db.compaction.CompactionController;
import org.apache.cassandra.db.compaction.PrecompactedRow;
+import org.apache.cassandra.io.IColumnSerializer;
import org.apache.cassandra.io.sstable.*;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.service.StorageService;
@@ -129,7 +130,8 @@ public class IncomingStreamReader
// need to update row cache
if (controller == null)
controller = new CompactionController(cfs, Collections.<SSTableReader>emptyList(), Integer.MIN_VALUE, true);
- SSTableIdentityIterator iter = new SSTableIdentityIterator(cfs.metadata, in, key, 0, dataSize, true);
+ // Note: Because we won't just echo the columns, there is no need to use the PRESERVE_SIZE flag, contrarily to what appendFromStream does below
+ SSTableIdentityIterator iter = new SSTableIdentityIterator(cfs.metadata, in, key, 0, dataSize, IColumnSerializer.Flag.FROM_REMOTE);
PrecompactedRow row = new PrecompactedRow(controller, Collections.singletonList(iter));
// We don't expire anything so the row shouldn't be empty
assert !row.isEmpty();
Modified: cassandra/trunk/test/unit/org/apache/cassandra/db/CounterColumnTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/db/CounterColumnTest.java?rev=1200948&r1=1200947&r2=1200948&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/db/CounterColumnTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/db/CounterColumnTest.java Fri Nov 11 16:55:33 2011
@@ -39,6 +39,7 @@ import org.apache.cassandra.Util;
import org.apache.cassandra.db.context.CounterContext;
import static org.apache.cassandra.db.context.CounterContext.ContextState;
import org.apache.cassandra.db.marshal.*;
+import org.apache.cassandra.io.IColumnSerializer;
import org.apache.cassandra.io.util.DataOutputBuffer;
import org.apache.cassandra.utils.Allocator;
import org.apache.cassandra.utils.HeapAllocator;
@@ -295,7 +296,7 @@ public class CounterColumnTest extends S
assert original.equals(deserialized);
bufIn = new ByteArrayInputStream(serialized, 0, serialized.length);
- CounterColumn deserializedOnRemote = (CounterColumn)Column.serializer().deserialize(new DataInputStream(bufIn), true);
+ CounterColumn deserializedOnRemote = (CounterColumn)Column.serializer().deserialize(new DataInputStream(bufIn), IColumnSerializer.Flag.FROM_REMOTE);
assert deserializedOnRemote.name().equals(original.name());
assert deserializedOnRemote.total() == original.total();
assert deserializedOnRemote.value().equals(cc.clearAllDelta(original.value()));
Modified: cassandra/trunk/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java?rev=1200948&r1=1200947&r2=1200948&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java Fri Nov 11 16:55:33 2011
@@ -24,6 +24,7 @@ import java.util.concurrent.Callable;
import org.junit.Test;
+import org.apache.cassandra.io.sstable.SSTableMetadata;
import org.apache.cassandra.io.util.*;
import static org.junit.Assert.assertEquals;
@@ -54,8 +55,9 @@ public class CompressedRandomAccessReade
try
{
+ SSTableMetadata.Collector sstableMetadataCollector = SSTableMetadata.createCollector().replayPosition(null);
SequentialWriter writer = compressed
- ? new CompressedSequentialWriter(f, filename + ".metadata", false, new CompressionParameters(SnappyCompressor.instance))
+ ? new CompressedSequentialWriter(f, filename + ".metadata", false, new CompressionParameters(SnappyCompressor.instance), sstableMetadataCollector)
: new SequentialWriter(f, CompressionParameters.DEFAULT_CHUNK_LENGTH, false);
writer.write("The quick ".getBytes());
@@ -104,7 +106,8 @@ public class CompressedRandomAccessReade
File metadata = new File(file.getPath() + ".meta");
metadata.deleteOnExit();
- SequentialWriter writer = new CompressedSequentialWriter(file, metadata.getPath(), false, new CompressionParameters(SnappyCompressor.instance));
+ SSTableMetadata.Collector sstableMetadataCollector = SSTableMetadata.createCollector().replayPosition(null);
+ SequentialWriter writer = new CompressedSequentialWriter(file, metadata.getPath(), false, new CompressionParameters(SnappyCompressor.instance), sstableMetadataCollector);
writer.write(CONTENT.getBytes());
writer.close();
Modified: cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableMetadataSerializerTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableMetadataSerializerTest.java?rev=1200948&r1=1200947&r2=1200948&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableMetadataSerializerTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableMetadataSerializerTest.java Fri Nov 11 16:55:33 2011
@@ -22,6 +22,7 @@ import java.io.ByteArrayOutputStream;
import java.io.ByteArrayInputStream;
import java.io.DataOutputStream;
import java.io.DataInputStream;
+import java.io.File;
import java.io.IOException;
import org.junit.Test;
@@ -58,7 +59,8 @@ public class SSTableMetadataSerializerTe
ByteArrayInputStream byteInput = new ByteArrayInputStream(byteOutput.toByteArray());
DataInputStream dis = new DataInputStream(byteInput);
- SSTableMetadata stats = SSTableMetadata.serializer.deserialize(dis, true, true);
+ Descriptor desc = new Descriptor(Descriptor.CURRENT_VERSION, new File("."), "", "", 0, false);
+ SSTableMetadata stats = SSTableMetadata.serializer.deserialize(dis, desc);
assert stats.getEstimatedRowSize().equals(originalMetadata.getEstimatedRowSize());
assert stats.getEstimatedRowSize().equals(rowSizes);
Modified: cassandra/trunk/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java?rev=1200948&r1=1200947&r2=1200948&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java Fri Nov 11 16:55:33 2011
@@ -83,19 +83,13 @@ public class StreamingTransferTest exten
// transfer the first and last key
logger.debug("Transferring " + cfs.columnFamily);
- int[] offs = new int[]{1, 3};
- IPartitioner p = StorageService.getPartitioner();
- List<Range> ranges = new ArrayList<Range>();
- ranges.add(new Range(p.getMinimumToken(), p.getToken(ByteBufferUtil.bytes("key1"))));
- ranges.add(new Range(p.getToken(ByteBufferUtil.bytes("key2")), p.getMinimumToken()));
- StreamOutSession session = StreamOutSession.create(table.name, LOCAL, null);
- StreamOut.transferSSTables(session, Arrays.asList(sstable), ranges, OperationType.BOOTSTRAP);
- session.await();
+ transfer(table, sstable);
// confirm that a single SSTable was transferred and registered
assertEquals(1, cfs.getSSTables().size());
// and that the index and filter were properly recovered
+ int[] offs = new int[]{1, 3};
List<Row> rows = Util.getRangeSlice(cfs);
assertEquals(offs.length, rows.size());
for (int i = 0; i < offs.length; i++)
@@ -119,6 +113,17 @@ public class StreamingTransferTest exten
return keys;
}
+ private void transfer(Table table, SSTableReader sstable) throws Exception
+ {
+ IPartitioner p = StorageService.getPartitioner();
+ List<Range> ranges = new ArrayList<Range>();
+ ranges.add(new Range(p.getMinimumToken(), p.getToken(ByteBufferUtil.bytes("key1"))));
+ ranges.add(new Range(p.getToken(ByteBufferUtil.bytes("key2")), p.getMinimumToken()));
+ StreamOutSession session = StreamOutSession.create(table.name, LOCAL, null);
+ StreamOut.transferSSTables(session, Arrays.asList(sstable), ranges, OperationType.BOOTSTRAP);
+ session.await();
+ }
+
@Test
public void testTransferTable() throws Exception
{
@@ -222,6 +227,12 @@ public class StreamingTransferTest exten
.write(cleanedEntries);
SSTableReader streamed = cfs.getSSTables().iterator().next();
SSTableUtils.assertContentEquals(cleaned, streamed);
+
+ // Retransfer the file, making sure it is now idempotent (see CASSANDRA-3481)
+ cfs.clearUnsafe();
+ transfer(table, streamed);
+ SSTableReader restreamed = cfs.getSSTables().iterator().next();
+ SSTableUtils.assertContentEquals(streamed, restreamed);
}
@Test
@@ -320,7 +331,7 @@ public class StreamingTransferTest exten
assertEquals(entry.getKey(), rows.get(0).key);
}
}
-
+
public interface Mutator
{
public void mutate(String key, String col, long timestamp) throws Exception;