You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2013/06/18 18:16:05 UTC
[4/4] git commit: Include a timestamp with all read commands to
determine column expiration
Include a timestamp with all read commands to determine column expiration
patch by Aleksey Yeschenko; reviewed by Sylvain Lebresne for
CASSANDRA-5149
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/1f7628ce
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/1f7628ce
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/1f7628ce
Branch: refs/heads/trunk
Commit: 1f7628ce7c1b3f820717eaa44df9b182158eb49e
Parents: 62295f6
Author: Aleksey Yeschenko <al...@apache.org>
Authored: Tue Jun 18 19:15:02 2013 +0300
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Tue Jun 18 19:15:02 2013 +0300
----------------------------------------------------------------------
CHANGES.txt | 2 +
.../org/apache/cassandra/config/CFMetaData.java | 2 +-
.../cassandra/config/ColumnDefinition.java | 3 +-
.../apache/cassandra/cql/QueryProcessor.java | 17 +-
.../cql3/statements/ColumnGroupMap.java | 6 +-
.../cql3/statements/ModificationStatement.java | 4 +-
.../cql3/statements/SelectStatement.java | 49 ++--
.../cassandra/cql3/statements/Selection.java | 12 +-
.../cassandra/db/CollationController.java | 6 +-
src/java/org/apache/cassandra/db/Column.java | 25 +-
.../org/apache/cassandra/db/ColumnFamily.java | 9 +-
.../cassandra/db/ColumnFamilySerializer.java | 22 +-
.../apache/cassandra/db/ColumnFamilyStore.java | 107 ++++----
.../apache/cassandra/db/ColumnSerializer.java | 2 +-
.../org/apache/cassandra/db/CounterColumn.java | 8 +-
.../apache/cassandra/db/CounterMutation.java | 7 +-
.../cassandra/db/CounterUpdateColumn.java | 2 +-
src/java/org/apache/cassandra/db/DefsTable.java | 4 +-
.../org/apache/cassandra/db/DeletedColumn.java | 4 +-
.../org/apache/cassandra/db/DeletionTime.java | 4 +-
.../org/apache/cassandra/db/ExpiringColumn.java | 15 +-
.../cassandra/db/HintedHandOffManager.java | 23 +-
.../org/apache/cassandra/db/OnDiskAtom.java | 2 +-
.../apache/cassandra/db/RangeSliceCommand.java | 83 +++---
.../org/apache/cassandra/db/RangeTombstone.java | 4 +-
.../org/apache/cassandra/db/ReadCommand.java | 18 +-
.../db/RetriedSliceFromReadCommand.java | 9 +-
src/java/org/apache/cassandra/db/Row.java | 4 +-
.../apache/cassandra/db/RowIteratorFactory.java | 16 +-
.../cassandra/db/SliceByNamesReadCommand.java | 28 +-
.../cassandra/db/SliceFromReadCommand.java | 30 ++-
.../apache/cassandra/db/SliceQueryPager.java | 7 +-
.../org/apache/cassandra/db/SuperColumns.java | 4 +-
.../org/apache/cassandra/db/SystemTable.java | 20 +-
.../db/compaction/CompactionManager.java | 6 +-
.../db/compaction/LazilyCompactedRow.java | 4 +-
.../db/compaction/PrecompactedRow.java | 4 +-
.../cassandra/db/filter/ColumnCounter.java | 17 +-
.../cassandra/db/filter/ExtendedFilter.java | 49 +++-
.../cassandra/db/filter/IDiskAtomFilter.java | 4 +-
.../cassandra/db/filter/NamesQueryFilter.java | 8 +-
.../apache/cassandra/db/filter/QueryFilter.java | 31 ++-
.../cassandra/db/filter/SliceQueryFilter.java | 20 +-
.../AbstractSimplePerColumnSecondaryIndex.java | 2 +-
.../db/index/SecondaryIndexManager.java | 20 +-
.../db/index/SecondaryIndexSearcher.java | 17 +-
.../db/index/composites/CompositesIndex.java | 2 +-
.../CompositesIndexOnClusteringKey.java | 4 +-
.../CompositesIndexOnPartitionKey.java | 4 +-
.../composites/CompositesIndexOnRegular.java | 4 +-
.../db/index/composites/CompositesSearcher.java | 31 ++-
.../cassandra/db/index/keys/KeysIndex.java | 4 +-
.../cassandra/db/index/keys/KeysSearcher.java | 27 +-
.../io/sstable/SSTableIdentityIterator.java | 5 +-
.../cassandra/io/sstable/SSTableReader.java | 10 +-
.../cassandra/service/ActiveRepairService.java | 2 +-
.../apache/cassandra/service/CacheService.java | 4 +-
.../service/RangeSliceResponseResolver.java | 6 +-
.../service/RangeSliceVerbHandler.java | 15 +-
.../apache/cassandra/service/ReadCallback.java | 2 +-
.../cassandra/service/RowDataResolver.java | 12 +-
.../apache/cassandra/service/StorageProxy.java | 35 ++-
.../cassandra/thrift/CassandraServer.java | 211 ++++++++-------
.../serialization/2.0/db.RangeSliceCommand.bin | Bin 753 -> 801 bytes
.../2.0/db.SliceByNamesReadCommand.bin | Bin 437 -> 485 bytes
.../2.0/db.SliceFromReadCommand.bin | Bin 437 -> 485 bytes
.../org/apache/cassandra/db/LongTableTest.java | 5 +-
.../unit/org/apache/cassandra/SchemaLoader.java | 2 +-
test/unit/org/apache/cassandra/Util.java | 9 +-
.../org/apache/cassandra/config/DefsTest.java | 6 +-
.../org/apache/cassandra/db/CleanupTest.java | 4 +-
.../cassandra/db/CollationControllerTest.java | 4 +-
.../cassandra/db/ColumnFamilyStoreTest.java | 259 +++++++++++++------
.../apache/cassandra/db/ColumnFamilyTest.java | 4 +-
.../org/apache/cassandra/db/KeyCacheTest.java | 12 +-
.../apache/cassandra/db/KeyCollisionTest.java | 2 +-
.../apache/cassandra/db/RangeTombstoneTest.java | 10 +-
.../apache/cassandra/db/ReadMessageTest.java | 13 +-
.../db/RecoveryManagerTruncateTest.java | 6 +-
.../cassandra/db/RemoveColumnFamilyTest.java | 2 +-
.../db/RemoveColumnFamilyWithFlush1Test.java | 2 +-
.../db/RemoveColumnFamilyWithFlush2Test.java | 2 +-
.../apache/cassandra/db/RemoveColumnTest.java | 18 +-
.../cassandra/db/RemoveSubColumnTest.java | 10 +-
.../org/apache/cassandra/db/RowCacheTest.java | 28 +-
test/unit/org/apache/cassandra/db/RowTest.java | 4 +-
.../apache/cassandra/db/SerializationsTest.java | 24 +-
.../unit/org/apache/cassandra/db/TableTest.java | 94 ++++---
.../org/apache/cassandra/db/TimeSortTest.java | 13 +-
.../db/compaction/CompactionsPurgeTest.java | 22 +-
.../db/compaction/CompactionsTest.java | 4 +-
.../LeveledCompactionStrategyTest.java | 2 +-
.../cassandra/db/compaction/TTLExpiryTest.java | 2 +-
.../db/index/PerRowSecondaryIndexTest.java | 7 +-
.../cassandra/db/marshal/CompositeTypeTest.java | 2 +-
.../db/marshal/DynamicCompositeTypeTest.java | 2 +-
.../cassandra/io/sstable/SSTableReaderTest.java | 2 +-
.../service/AntiEntropyServiceTestAbstract.java | 2 +-
.../cassandra/service/RowResolverTest.java | 14 +-
.../streaming/StreamingTransferTest.java | 12 +-
.../cassandra/tools/SSTableExportTest.java | 4 +-
.../cassandra/tools/SSTableImportTest.java | 12 +-
102 files changed, 1008 insertions(+), 730 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index a620d31..46b67e3 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -58,6 +58,8 @@
* Use SASL authentication in binary protocol v2 (CASSANDRA-5545)
* Replace Thrift HsHa with LMAX Disruptor based implementation (CASSANDRA-5582)
* cqlsh: Add row count to SELECT output (CASSANDRA-5636)
+ * Include a timestamp with all read commands to determine column expiration
+ (CASSANDRA-5149)
1.2.6
* Reduce SSTableLoader memory usage (CASSANDRA-5555)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/config/CFMetaData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/CFMetaData.java b/src/java/org/apache/cassandra/config/CFMetaData.java
index b5ece0c..15713bb 100644
--- a/src/java/org/apache/cassandra/config/CFMetaData.java
+++ b/src/java/org/apache/cassandra/config/CFMetaData.java
@@ -1251,7 +1251,7 @@ public final class CFMetaData
public Iterator<OnDiskAtom> getOnDiskIterator(DataInput in, int count, Descriptor.Version version)
{
- return getOnDiskIterator(in, count, ColumnSerializer.Flag.LOCAL, (int) (System.currentTimeMillis() / 1000), version);
+ return getOnDiskIterator(in, count, ColumnSerializer.Flag.LOCAL, Integer.MIN_VALUE, version);
}
public Iterator<OnDiskAtom> getOnDiskIterator(DataInput in, int count, ColumnSerializer.Flag flag, int expireBefore, Descriptor.Version version)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/config/ColumnDefinition.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/ColumnDefinition.java b/src/java/org/apache/cassandra/config/ColumnDefinition.java
index 9d21435..470e7d7 100644
--- a/src/java/org/apache/cassandra/config/ColumnDefinition.java
+++ b/src/java/org/apache/cassandra/config/ColumnDefinition.java
@@ -306,7 +306,8 @@ public class ColumnDefinition
DefsTable.searchComposite(cfName, true),
DefsTable.searchComposite(cfName, false),
false,
- Integer.MAX_VALUE);
+ Integer.MAX_VALUE,
+ System.currentTimeMillis());
return new Row(key, cf);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/cql/QueryProcessor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql/QueryProcessor.java b/src/java/org/apache/cassandra/cql/QueryProcessor.java
index 57fec4a..a40bfea 100644
--- a/src/java/org/apache/cassandra/cql/QueryProcessor.java
+++ b/src/java/org/apache/cassandra/cql/QueryProcessor.java
@@ -71,7 +71,7 @@ public class QueryProcessor
public static final String DEFAULT_KEY_NAME = bufferToString(CFMetaData.DEFAULT_KEY_NAME);
- private static List<org.apache.cassandra.db.Row> getSlice(CFMetaData metadata, SelectStatement select, List<ByteBuffer> variables)
+ private static List<org.apache.cassandra.db.Row> getSlice(CFMetaData metadata, SelectStatement select, List<ByteBuffer> variables, long now)
throws InvalidRequestException, ReadTimeoutException, UnavailableException, IsBootstrappingException, WriteTimeoutException
{
List<ReadCommand> commands = new ArrayList<ReadCommand>();
@@ -87,7 +87,7 @@ public class QueryProcessor
ByteBuffer key = rawKey.getByteBuffer(metadata.getKeyValidator(),variables);
validateKey(key);
- commands.add(new SliceByNamesReadCommand(metadata.ksName, key, select.getColumnFamily(), new NamesQueryFilter(columnNames)));
+ commands.add(new SliceByNamesReadCommand(metadata.ksName, key, select.getColumnFamily(), now, new NamesQueryFilter(columnNames)));
}
}
// ...a range (slice) of column names
@@ -106,6 +106,7 @@ public class QueryProcessor
commands.add(new SliceFromReadCommand(metadata.ksName,
key,
select.getColumnFamily(),
+ now,
new SliceQueryFilter(start, finish, select.isColumnsReversed(), select.getColumnsLimit())));
}
}
@@ -128,7 +129,7 @@ public class QueryProcessor
return columnNames;
}
- private static List<org.apache.cassandra.db.Row> multiRangeSlice(CFMetaData metadata, SelectStatement select, List<ByteBuffer> variables)
+ private static List<org.apache.cassandra.db.Row> multiRangeSlice(CFMetaData metadata, SelectStatement select, List<ByteBuffer> variables, long now)
throws ReadTimeoutException, UnavailableException, InvalidRequestException
{
IPartitioner<?> p = StorageService.getPartitioner();
@@ -175,6 +176,7 @@ public class QueryProcessor
List<org.apache.cassandra.db.Row> rows = StorageProxy.getRangeSlice(new RangeSliceCommand(metadata.ksName,
select.getColumnFamily(),
+ now,
columnFilter,
bounds,
expressions,
@@ -379,14 +381,15 @@ public class QueryProcessor
List<org.apache.cassandra.db.Row> rows;
+ long now = System.currentTimeMillis();
// By-key
if (!select.isKeyRange() && (select.getKeys().size() > 0))
{
- rows = getSlice(metadata, select, variables);
+ rows = getSlice(metadata, select, variables, now);
}
else
{
- rows = multiRangeSlice(metadata, select, variables);
+ rows = multiRangeSlice(metadata, select, variables, now);
}
// count resultset is a single column named "count"
@@ -429,7 +432,7 @@ public class QueryProcessor
{
for (org.apache.cassandra.db.Column c : row.cf.getSortedColumns())
{
- if (c.isMarkedForDelete())
+ if (c.isMarkedForDelete(now))
continue;
ColumnDefinition cd = metadata.getColumnDefinitionFromColumnName(c.name());
@@ -474,7 +477,7 @@ public class QueryProcessor
if (cd != null)
result.schema.value_types.put(name, TypeParser.getShortName(cd.getValidator()));
org.apache.cassandra.db.Column c = row.cf.getColumn(name);
- if (c == null || c.isMarkedForDelete())
+ if (c == null || c.isMarkedForDelete(now))
thriftColumns.add(new Column().setName(name));
else
thriftColumns.add(thriftify(c));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/cql3/statements/ColumnGroupMap.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/ColumnGroupMap.java b/src/java/org/apache/cassandra/cql3/statements/ColumnGroupMap.java
index 20fa3bd..8974523 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ColumnGroupMap.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ColumnGroupMap.java
@@ -104,20 +104,22 @@ public class ColumnGroupMap
{
private final CompositeType composite;
private final int idx;
+ private final long now;
private ByteBuffer[] previous;
private final List<ColumnGroupMap> groups = new ArrayList<ColumnGroupMap>();
private ColumnGroupMap currentGroup;
- public Builder(CompositeType composite, boolean hasCollections)
+ public Builder(CompositeType composite, boolean hasCollections, long now)
{
this.composite = composite;
this.idx = composite.types.size() - (hasCollections ? 2 : 1);
+ this.now = now;
}
public void add(Column c)
{
- if (c.isMarkedForDelete())
+ if (c.isMarkedForDelete(now))
return;
ByteBuffer[] current = composite.split(c.name());
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
index f6b7140..62f7fbd 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
@@ -297,10 +297,12 @@ public abstract class ModificationStatement implements CQLStatement
}
List<ReadCommand> commands = new ArrayList<ReadCommand>(partitionKeys.size());
+ long now = System.currentTimeMillis();
for (ByteBuffer key : partitionKeys)
commands.add(new SliceFromReadCommand(keyspace(),
key,
columnFamily(),
+ now,
new SliceQueryFilter(slices, false, Integer.MAX_VALUE)));
List<Row> rows = local
@@ -313,7 +315,7 @@ public abstract class ModificationStatement implements CQLStatement
if (row.cf == null || row.cf.getColumnCount() == 0)
continue;
- ColumnGroupMap.Builder groupBuilder = new ColumnGroupMap.Builder(composite, true);
+ ColumnGroupMap.Builder groupBuilder = new ColumnGroupMap.Builder(composite, true, now);
for (Column column : row.cf)
groupBuilder.add(column);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
index a96eb1d..fd47ba8 100644
--- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
@@ -127,17 +127,18 @@ public class SelectStatement implements CQLStatement
cl.validateForRead(keyspace());
int limit = getLimit(variables);
+ long now = System.currentTimeMillis();
List<Row> rows = isKeyRange || usesSecondaryIndexing
- ? StorageProxy.getRangeSlice(getRangeCommand(variables, limit), cl)
- : StorageProxy.read(getSliceCommands(variables, limit), cl);
+ ? StorageProxy.getRangeSlice(getRangeCommand(variables, limit, now), cl)
+ : StorageProxy.read(getSliceCommands(variables, limit, now), cl);
- return processResults(rows, variables, limit);
+ return processResults(rows, variables, limit, now);
}
- private ResultMessage.Rows processResults(List<Row> rows, List<ByteBuffer> variables, int limit) throws RequestValidationException
+ private ResultMessage.Rows processResults(List<Row> rows, List<ByteBuffer> variables, int limit, long now) throws RequestValidationException
{
// Even for count, we need to process the result as it'll group some column together in sparse column families
- ResultSet rset = process(rows, variables, limit);
+ ResultSet rset = process(rows, variables, limit, now);
rset = parameters.isCount ? rset.makeCountResult(parameters.countAlias) : rset;
return new ResultMessage.Rows(rset);
}
@@ -153,19 +154,20 @@ public class SelectStatement implements CQLStatement
public ResultMessage.Rows executeInternal(QueryState state) throws RequestExecutionException, RequestValidationException
{
- List<ByteBuffer> variables = Collections.<ByteBuffer>emptyList();
+ List<ByteBuffer> variables = Collections.emptyList();
int limit = getLimit(variables);
+ long now = System.currentTimeMillis();
List<Row> rows = isKeyRange || usesSecondaryIndexing
- ? RangeSliceVerbHandler.executeLocally(getRangeCommand(variables, limit))
- : readLocally(keyspace(), getSliceCommands(variables, limit));
+ ? RangeSliceVerbHandler.executeLocally(getRangeCommand(variables, limit, now))
+ : readLocally(keyspace(), getSliceCommands(variables, limit, now));
- return processResults(rows, variables, limit);
+ return processResults(rows, variables, limit, now);
}
public ResultSet process(List<Row> rows) throws InvalidRequestException
{
assert !parameters.isCount; // not yet needed
- return process(rows, Collections.<ByteBuffer>emptyList(), getLimit(Collections.<ByteBuffer>emptyList()));
+ return process(rows, Collections.<ByteBuffer>emptyList(), getLimit(Collections.<ByteBuffer>emptyList()), System.currentTimeMillis());
}
public String keyspace()
@@ -178,7 +180,7 @@ public class SelectStatement implements CQLStatement
return cfDef.cfm.cfName;
}
- private List<ReadCommand> getSliceCommands(List<ByteBuffer> variables, int limit) throws RequestValidationException
+ private List<ReadCommand> getSliceCommands(List<ByteBuffer> variables, int limit, long now) throws RequestValidationException
{
Collection<ByteBuffer> keys = getKeys(variables);
List<ReadCommand> commands = new ArrayList<ReadCommand>(keys.size());
@@ -192,25 +194,18 @@ public class SelectStatement implements CQLStatement
// We should not share the slice filter amongst the commands (hence the cloneShallow), due to
// SliceQueryFilter not being immutable due to its columnCounter used by the lastCounted() method
// (this is fairly ugly and we should change that but that's probably not a tiny refactor to do that cleanly)
- commands.add(ReadCommand.create(keyspace(), key, columnFamily(), filter.cloneShallow()));
+ commands.add(ReadCommand.create(keyspace(), key, columnFamily(), now, filter.cloneShallow()));
}
return commands;
}
- private RangeSliceCommand getRangeCommand(List<ByteBuffer> variables, int limit) throws RequestValidationException
+ private RangeSliceCommand getRangeCommand(List<ByteBuffer> variables, int limit, long now) throws RequestValidationException
{
IDiskAtomFilter filter = makeFilter(variables, limit);
List<IndexExpression> expressions = getIndexExpressions(variables);
// The LIMIT provided by the user is the number of CQL row he wants returned.
// We want to have getRangeSlice to count the number of columns, not the number of keys.
- return new RangeSliceCommand(keyspace(),
- columnFamily(),
- filter,
- getKeyBounds(variables),
- expressions,
- limit,
- true,
- false);
+ return new RangeSliceCommand(keyspace(), columnFamily(), now, filter, getKeyBounds(variables), expressions, limit, true, false);
}
private AbstractBounds<RowPosition> getKeyBounds(List<ByteBuffer> variables) throws InvalidRequestException
@@ -661,9 +656,9 @@ public class SelectStatement implements CQLStatement
};
}
- private ResultSet process(List<Row> rows, List<ByteBuffer> variables, int limit) throws InvalidRequestException
+ private ResultSet process(List<Row> rows, List<ByteBuffer> variables, int limit, long now) throws InvalidRequestException
{
- Selection.ResultSetBuilder result = selection.resultSetBuilder();
+ Selection.ResultSetBuilder result = selection.resultSetBuilder(now);
for (org.apache.cassandra.db.Row row : rows)
{
// Not columns match the query, skip
@@ -679,7 +674,7 @@ public class SelectStatement implements CQLStatement
// One cqlRow per column
for (Column c : columnsInOrder(row.cf, variables))
{
- if (c.isMarkedForDelete())
+ if (c.isMarkedForDelete(now))
continue;
ByteBuffer[] components = null;
@@ -728,11 +723,11 @@ public class SelectStatement implements CQLStatement
// Sparse case: group column in cqlRow when composite prefix is equal
CompositeType composite = (CompositeType)cfDef.cfm.comparator;
- ColumnGroupMap.Builder builder = new ColumnGroupMap.Builder(composite, cfDef.hasCollections);
+ ColumnGroupMap.Builder builder = new ColumnGroupMap.Builder(composite, cfDef.hasCollections, now);
for (Column c : row.cf)
{
- if (c.isMarkedForDelete())
+ if (c.isMarkedForDelete(now))
continue;
builder.add(c);
@@ -743,7 +738,7 @@ public class SelectStatement implements CQLStatement
}
else
{
- if (row.cf.hasOnlyTombstones())
+ if (row.cf.hasOnlyTombstones(now))
continue;
// Static case: One cqlRow for all columns
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/cql3/statements/Selection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/Selection.java b/src/java/org/apache/cassandra/cql3/statements/Selection.java
index d3018e5..cf2b62e 100644
--- a/src/java/org/apache/cassandra/cql3/statements/Selection.java
+++ b/src/java/org/apache/cassandra/cql3/statements/Selection.java
@@ -213,9 +213,9 @@ public abstract class Selection
return columnsList;
}
- public ResultSetBuilder resultSetBuilder()
+ public ResultSetBuilder resultSetBuilder(long now)
{
- return new ResultSetBuilder();
+ return new ResultSetBuilder(now);
}
private static ByteBuffer value(Column c)
@@ -240,12 +240,14 @@ public abstract class Selection
List<ByteBuffer> current;
final long[] timestamps;
final int[] ttls;
+ final long now;
- private ResultSetBuilder()
+ private ResultSetBuilder(long now)
{
this.resultSet = new ResultSet(metadata);
this.timestamps = collectTimestamps ? new long[columnsList.size()] : null;
this.ttls = collectTTLs ? new int[columnsList.size()] : null;
+ this.now = now;
}
public void add(ByteBuffer v)
@@ -264,14 +266,14 @@ public abstract class Selection
{
int ttl = -1;
if (!isDead(c) && c instanceof ExpiringColumn)
- ttl = c.getLocalDeletionTime() - (int) (System.currentTimeMillis() / 1000);
+ ttl = c.getLocalDeletionTime() - (int) (now / 1000);
ttls[current.size() - 1] = ttl;
}
}
private boolean isDead(Column c)
{
- return c == null || c.isMarkedForDelete();
+ return c == null || c.isMarkedForDelete(now);
}
public void newRow() throws InvalidRequestException
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/CollationController.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/CollationController.java b/src/java/org/apache/cassandra/db/CollationController.java
index f1fccef..d0d22c5 100644
--- a/src/java/org/apache/cassandra/db/CollationController.java
+++ b/src/java/org/apache/cassandra/db/CollationController.java
@@ -21,8 +21,6 @@ import java.nio.ByteBuffer;
import java.util.*;
import com.google.common.collect.Iterables;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
import org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy;
@@ -37,8 +35,6 @@ import org.apache.cassandra.utils.HeapAllocator;
public class CollationController
{
- private static final Logger logger = LoggerFactory.getLogger(CollationController.class);
-
private final ColumnFamilyStore cfs;
private final QueryFilter filter;
private final int gcBefore;
@@ -104,7 +100,7 @@ public class CollationController
// (reduceNameFilter removes columns that are known to be irrelevant)
NamesQueryFilter namesFilter = (NamesQueryFilter) filter.filter;
TreeSet<ByteBuffer> filterColumns = new TreeSet<ByteBuffer>(namesFilter.columns);
- QueryFilter reducedFilter = new QueryFilter(filter.key, filter.cfName, namesFilter.withUpdatedColumns(filterColumns));
+ QueryFilter reducedFilter = new QueryFilter(filter.key, filter.cfName, namesFilter.withUpdatedColumns(filterColumns), filter.timestamp);
/* add the SSTables on disk */
Collections.sort(view.sstables, SSTable.maxTimestampComparator);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/Column.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Column.java b/src/java/org/apache/cassandra/db/Column.java
index b42097c..b210d22 100644
--- a/src/java/org/apache/cassandra/db/Column.java
+++ b/src/java/org/apache/cassandra/db/Column.java
@@ -143,14 +143,20 @@ public class Column implements OnDiskAtom
return timestamp;
}
- public boolean isMarkedForDelete()
+ public boolean isMarkedForDelete(long now)
{
- return (int) (System.currentTimeMillis() / 1000) >= getLocalDeletionTime();
+ return false;
}
+ public boolean isLive(long now)
+ {
+ return !isMarkedForDelete(now);
+ }
+
+ // Don't call unless the column is actually marked for delete.
public long getMarkedForDeleteAt()
{
- throw new IllegalStateException("column is not marked for delete");
+ return Long.MAX_VALUE;
}
public int dataSize()
@@ -186,9 +192,7 @@ public class Column implements OnDiskAtom
public Column diff(Column column)
{
if (timestamp() < column.timestamp())
- {
return column;
- }
return null;
}
@@ -223,9 +227,9 @@ public class Column implements OnDiskAtom
public Column reconcile(Column column, Allocator allocator)
{
// tombstones take precedence. (if both are tombstones, then it doesn't matter which one we use.)
- if (isMarkedForDelete())
+ if (isMarkedForDelete(System.currentTimeMillis()))
return timestamp() < column.timestamp() ? column : this;
- if (column.isMarkedForDelete())
+ if (column.isMarkedForDelete(System.currentTimeMillis()))
return timestamp() > column.timestamp() ? this : column;
// break ties by comparing values.
if (timestamp() == column.timestamp())
@@ -276,7 +280,7 @@ public class Column implements OnDiskAtom
StringBuilder sb = new StringBuilder();
sb.append(comparator.getString(name));
sb.append(":");
- sb.append(isMarkedForDelete());
+ sb.append(isMarkedForDelete(System.currentTimeMillis()));
sb.append(":");
sb.append(value.remaining());
sb.append("@");
@@ -284,11 +288,6 @@ public class Column implements OnDiskAtom
return sb.toString();
}
- public boolean isLive()
- {
- return !isMarkedForDelete();
- }
-
protected void validateName(CFMetaData metadata) throws MarshalException
{
metadata.comparator.validate(name());
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/ColumnFamily.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamily.java b/src/java/org/apache/cassandra/db/ColumnFamily.java
index cda2d9b..36396f8 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamily.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamily.java
@@ -31,7 +31,6 @@ import java.util.UUID;
import com.google.common.base.Function;
import com.google.common.base.Functions;
import com.google.common.collect.ImmutableMap;
-
import org.apache.commons.lang.builder.HashCodeBuilder;
import org.apache.cassandra.cache.IRowCacheEntry;
@@ -194,7 +193,7 @@ public abstract class ColumnFamily implements Iterable<Column>, IRowCacheEntry
* </code>
* but is potentially faster.
*/
- public abstract void addAll(ColumnFamily cm, Allocator allocator, Function<Column, Column> transformation);
+ public abstract void addAll(ColumnFamily cm, Allocator allocator, Function<Column, Column> transformation);
/**
* Replace oldColumn if present by newColumn.
@@ -426,13 +425,11 @@ public abstract class ColumnFamily implements Iterable<Column>, IRowCacheEntry
return metadata.comparator;
}
- public boolean hasOnlyTombstones()
+ public boolean hasOnlyTombstones(long now)
{
for (Column column : this)
- {
- if (column.isLive())
+ if (column.isLive(now))
return false;
- }
return true;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java b/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java
index f5cf3d4..411b040 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java
@@ -102,11 +102,10 @@ public class ColumnFamilySerializer implements IVersionedSerializer<ColumnFamily
return null;
ColumnFamily cf = factory.create(Schema.instance.getCFMetaData(deserializeCfId(in, version)));
- int expireBefore = (int) (System.currentTimeMillis() / 1000);
if (cf.metadata().isSuper() && version < MessagingService.VERSION_20)
{
- SuperColumns.deserializerSuperColumnFamily(in, cf, flag, expireBefore, version);
+ SuperColumns.deserializerSuperColumnFamily(in, cf, flag, version);
}
else
{
@@ -115,9 +114,7 @@ public class ColumnFamilySerializer implements IVersionedSerializer<ColumnFamily
ColumnSerializer columnSerializer = Column.serializer;
int size = in.readInt();
for (int i = 0; i < size; ++i)
- {
- cf.addColumn(columnSerializer.deserialize(in, flag, expireBefore));
- }
+ cf.addColumn(columnSerializer.deserialize(in, flag));
}
return cf;
}
@@ -170,21 +167,6 @@ public class ColumnFamilySerializer implements IVersionedSerializer<ColumnFamily
throw new UnsupportedOperationException();
}
- public void deserializeColumnsFromSSTable(DataInput in, ColumnFamily cf, int size, ColumnSerializer.Flag flag, int expireBefore, Descriptor.Version version)
- {
- Iterator<OnDiskAtom> iter = cf.metadata().getOnDiskIterator(in, size, flag, expireBefore, version);
- while (iter.hasNext())
- cf.addAtom(iter.next());
- }
-
- public void deserializeFromSSTable(DataInput in, ColumnFamily cf, ColumnSerializer.Flag flag, Descriptor.Version version) throws IOException
- {
- cf.delete(DeletionInfo.serializer().deserializeFromSSTable(in, version));
- int size = in.readInt();
- int expireBefore = (int) (System.currentTimeMillis() / 1000);
- deserializeColumnsFromSSTable(in, cf, size, flag, expireBefore, version);
- }
-
public void serializeCfId(UUID cfId, DataOutput out, int version) throws IOException
{
UUIDSerializer.serializer.serialize(cfId, out, version);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 4287df6..648c25a 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -33,10 +33,6 @@ import com.google.common.base.Function;
import com.google.common.collect.*;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.Uninterruptibles;
-
-import org.apache.cassandra.db.compaction.*;
-
-import org.cliffc.high_scale_lib.NonBlockingHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -52,6 +48,7 @@ import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
import org.apache.cassandra.db.commitlog.CommitLog;
import org.apache.cassandra.db.commitlog.ReplayPosition;
+import org.apache.cassandra.db.compaction.*;
import org.apache.cassandra.db.filter.ExtendedFilter;
import org.apache.cassandra.db.filter.IDiskAtomFilter;
import org.apache.cassandra.db.filter.QueryFilter;
@@ -73,6 +70,7 @@ import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.thrift.IndexExpression;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.utils.*;
+import org.cliffc.high_scale_lib.NonBlockingHashMap;
import static org.apache.cassandra.config.CFMetaData.Caching;
@@ -1180,24 +1178,14 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
return metric.writeLatency.recentLatencyHistogram.getBuckets(true);
}
- public ColumnFamily getColumnFamily(DecoratedKey key, ByteBuffer start, ByteBuffer finish, boolean reversed, int limit)
+ public ColumnFamily getColumnFamily(DecoratedKey key,
+ ByteBuffer start,
+ ByteBuffer finish,
+ boolean reversed,
+ int limit,
+ long timestamp)
{
- return getColumnFamily(QueryFilter.getSliceFilter(key, name, start, finish, reversed, limit));
- }
-
- /**
- * get a list of columns starting from a given column, in a specified order.
- * only the latest version of a column is returned.
- * @return null if there is no data and no tombstones; otherwise a ColumnFamily
- */
- public ColumnFamily getColumnFamily(QueryFilter filter)
- {
- return getColumnFamily(filter, gcBefore());
- }
-
- public int gcBefore()
- {
- return (int) (System.currentTimeMillis() / 1000) - metadata.getGcGraceSeconds();
+ return getColumnFamily(QueryFilter.getSliceFilter(key, name, start, finish, reversed, limit, timestamp));
}
/**
@@ -1236,7 +1224,8 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
try
{
- ColumnFamily data = getTopLevelColumns(QueryFilter.getIdentityFilter(filter.key, name), Integer.MIN_VALUE);
+ ColumnFamily data = getTopLevelColumns(QueryFilter.getIdentityFilter(filter.key, name, filter.timestamp),
+ Integer.MIN_VALUE);
if (sentinelSuccess && data != null)
CacheService.instance.rowCache.replace(key, sentinel, data);
@@ -1249,7 +1238,17 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
}
}
- ColumnFamily getColumnFamily(QueryFilter filter, int gcBefore)
+ public int gcBefore(long now)
+ {
+ return (int) (now / 1000) - metadata.getGcGraceSeconds();
+ }
+
+ /**
+ * get a list of columns starting from a given column, in a specified order.
+ * only the latest version of a column is returned.
+ * @return null if there is no data and no tombstones; otherwise a ColumnFamily
+ */
+ public ColumnFamily getColumnFamily(QueryFilter filter)
{
assert name.equals(filter.getColumnFamilyName()) : filter.getColumnFamilyName();
@@ -1258,6 +1257,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
long start = System.nanoTime();
try
{
+ int gcBefore = gcBefore(filter.timestamp);
if (isRowCacheEnabled())
{
UUID cfId = Schema.instance.getId(table.getName(), name);
@@ -1274,7 +1274,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
return null;
}
- result = filterColumnFamily(cached, filter, gcBefore);
+ result = filterColumnFamily(cached, filter);
}
else
{
@@ -1301,12 +1301,12 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
* tombstones that are no longer relevant.
* The returned column family won't be thread safe.
*/
- ColumnFamily filterColumnFamily(ColumnFamily cached, QueryFilter filter, int gcBefore)
+ ColumnFamily filterColumnFamily(ColumnFamily cached, QueryFilter filter)
{
ColumnFamily cf = cached.cloneMeShallow(ArrayBackedSortedColumns.factory, filter.filter.isReversed());
OnDiskAtomIterator ci = filter.getMemtableColumnIterator(cached, null);
- filter.collateOnDiskAtom(cf, ci, gcBefore);
- return removeDeletedCF(cf, gcBefore);
+ filter.collateOnDiskAtom(cf, ci, gcBefore(filter.timestamp));
+ return removeDeletedCF(cf, gcBefore(filter.timestamp));
}
/**
@@ -1440,7 +1440,8 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
}
return files;
}
- finally {
+ finally
+ {
SSTableReader.releaseReferences(view.sstables);
}
}
@@ -1468,14 +1469,16 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
* @param range Either a Bounds, which includes start key, or a Range, which does not.
* @param columnFilter description of the columns we're interested in for each row
*/
- public AbstractScanIterator getSequentialIterator(final AbstractBounds<RowPosition> range, IDiskAtomFilter columnFilter)
+ private AbstractScanIterator getSequentialIterator(final AbstractBounds<RowPosition> range,
+ IDiskAtomFilter columnFilter,
+ long timestamp)
{
assert !(range instanceof Range) || !((Range)range).isWrapAround() || range.right.isMinimum() : range;
final RowPosition startWith = range.left;
final RowPosition stopAt = range.right;
- QueryFilter filter = new QueryFilter(null, name, columnFilter);
+ QueryFilter filter = new QueryFilter(null, name, columnFilter, timestamp);
final ViewFragment view = markReferenced(range);
Tracing.trace("Executing seq scan across {} sstables for {}", view.sstables.size(), range.getString(metadata.getKeyValidator()));
@@ -1524,25 +1527,43 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
}
}
- public List<Row> getRangeSlice(final AbstractBounds<RowPosition> range, int maxResults, IDiskAtomFilter columnFilter, List<IndexExpression> rowFilter)
+ public List<Row> getRangeSlice(final AbstractBounds<RowPosition> range,
+ List<IndexExpression> rowFilter,
+ IDiskAtomFilter columnFilter,
+ int maxResults)
{
- return getRangeSlice(range, maxResults, columnFilter, rowFilter, false, false);
+ return getRangeSlice(range, rowFilter, columnFilter, maxResults, System.currentTimeMillis(), false, false);
}
- public List<Row> getRangeSlice(final AbstractBounds<RowPosition> range, int maxResults, IDiskAtomFilter columnFilter, List<IndexExpression> rowFilter, boolean countCQL3Rows, boolean isPaging)
+ public List<Row> getRangeSlice(final AbstractBounds<RowPosition> range,
+ List<IndexExpression> rowFilter,
+ IDiskAtomFilter columnFilter,
+ int maxResults,
+ long now,
+ boolean countCQL3Rows,
+ boolean isPaging)
{
- return filter(getSequentialIterator(range, columnFilter), ExtendedFilter.create(this, columnFilter, rowFilter, maxResults, countCQL3Rows, isPaging));
+ return filter(getSequentialIterator(range, columnFilter, now),
+ ExtendedFilter.create(this, rowFilter, columnFilter, maxResults, now, countCQL3Rows, isPaging));
}
- public List<Row> search(List<IndexExpression> clause, AbstractBounds<RowPosition> range, int maxResults, IDiskAtomFilter dataFilter)
+ public List<Row> search(AbstractBounds<RowPosition> range,
+ List<IndexExpression> clause,
+ IDiskAtomFilter columnFilter,
+ int maxResults)
{
- return search(clause, range, maxResults, dataFilter, false);
+ return search(range, clause, columnFilter, maxResults, System.currentTimeMillis(), false);
}
- public List<Row> search(List<IndexExpression> clause, AbstractBounds<RowPosition> range, int maxResults, IDiskAtomFilter dataFilter, boolean countCQL3Rows)
+ public List<Row> search(AbstractBounds<RowPosition> range,
+ List<IndexExpression> clause,
+ IDiskAtomFilter columnFilter,
+ int maxResults,
+ long now,
+ boolean countCQL3Rows)
{
Tracing.trace("Executing indexed scan for {}", range.getString(metadata.getKeyValidator()));
- return indexManager.search(clause, range, maxResults, dataFilter, countCQL3Rows);
+ return indexManager.search(range, clause, columnFilter, maxResults, now, countCQL3Rows);
}
public List<Row> filter(AbstractScanIterator rowIterator, ExtendedFilter filter)
@@ -1566,7 +1587,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
IDiskAtomFilter extraFilter = filter.getExtraFilter(data);
if (extraFilter != null)
{
- ColumnFamily cf = filter.cfs.getColumnFamily(new QueryFilter(rawRow.key, name, extraFilter));
+ ColumnFamily cf = filter.cfs.getColumnFamily(new QueryFilter(rawRow.key, name, extraFilter, filter.timestamp));
if (cf != null)
data.addAll(cf, HeapAllocator.instance);
}
@@ -1751,14 +1772,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
return Iterables.concat(stores);
}
- public static List<ColumnFamilyStore> allUserDefined()
- {
- List<ColumnFamilyStore> cfses = new ArrayList<ColumnFamilyStore>();
- for (Table table : Sets.difference(ImmutableSet.copyOf(Table.all()), Schema.systemKeyspaceNames))
- cfses.addAll(table.getColumnFamilyStores());
- return cfses;
- }
-
public Iterable<DecoratedKey> keySamples(Range<Token> range)
{
Collection<SSTableReader> sstables = getSSTables();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/ColumnSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnSerializer.java b/src/java/org/apache/cassandra/db/ColumnSerializer.java
index 353dda9..fb38b5f 100644
--- a/src/java/org/apache/cassandra/db/ColumnSerializer.java
+++ b/src/java/org/apache/cassandra/db/ColumnSerializer.java
@@ -88,7 +88,7 @@ public class ColumnSerializer implements ISerializer<Column>
*/
public Column deserialize(DataInput in, ColumnSerializer.Flag flag) throws IOException
{
- return deserialize(in, flag, (int) (System.currentTimeMillis() / 1000));
+ return deserialize(in, flag, Integer.MIN_VALUE);
}
public Column deserialize(DataInput in, ColumnSerializer.Flag flag, int expireBefore) throws IOException
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/CounterColumn.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/CounterColumn.java b/src/java/org/apache/cassandra/db/CounterColumn.java
index 15da1df..207ded6 100644
--- a/src/java/org/apache/cassandra/db/CounterColumn.java
+++ b/src/java/org/apache/cassandra/db/CounterColumn.java
@@ -169,9 +169,11 @@ public class CounterColumn extends Column
{
assert (column instanceof CounterColumn) || (column instanceof DeletedColumn) : "Wrong class type: " + column.getClass();
- if (column.isMarkedForDelete()) // live + tombstone: track last tombstone
+ // live + tombstone: track last tombstone
+ if (column.isMarkedForDelete(Long.MIN_VALUE)) // cannot be an expired column, so the current time is irrelevant
{
- if (timestamp() < column.timestamp()) // live < tombstone
+ // live < tombstone
+ if (timestamp() < column.timestamp())
{
return column;
}
@@ -230,7 +232,7 @@ public class CounterColumn extends Column
StringBuilder sb = new StringBuilder();
sb.append(comparator.getString(name));
sb.append(":");
- sb.append(isMarkedForDelete());
+ sb.append(false);
sb.append(":");
sb.append(contextManager.toString(value));
sb.append("@");
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/CounterMutation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/CounterMutation.java b/src/java/org/apache/cassandra/db/CounterMutation.java
index 6f60a26..9ace314 100644
--- a/src/java/org/apache/cassandra/db/CounterMutation.java
+++ b/src/java/org/apache/cassandra/db/CounterMutation.java
@@ -84,11 +84,12 @@ public class CounterMutation implements IMutation
public RowMutation makeReplicationMutation()
{
List<ReadCommand> readCommands = new LinkedList<ReadCommand>();
+ long timestamp = System.currentTimeMillis();
for (ColumnFamily columnFamily : rowMutation.getColumnFamilies())
{
if (!columnFamily.metadata().getReplicateOnWrite())
continue;
- addReadCommandFromColumnFamily(rowMutation.getTable(), rowMutation.key(), columnFamily, readCommands);
+ addReadCommandFromColumnFamily(rowMutation.getTable(), rowMutation.key(), columnFamily, timestamp, readCommands);
}
// create a replication RowMutation
@@ -106,11 +107,11 @@ public class CounterMutation implements IMutation
return replicationMutation;
}
- private void addReadCommandFromColumnFamily(String table, ByteBuffer key, ColumnFamily columnFamily, List<ReadCommand> commands)
+ private void addReadCommandFromColumnFamily(String table, ByteBuffer key, ColumnFamily columnFamily, long timestamp, List<ReadCommand> commands)
{
SortedSet<ByteBuffer> s = new TreeSet<ByteBuffer>(columnFamily.metadata().comparator);
Iterables.addAll(s, columnFamily.getColumnNames());
- commands.add(new SliceByNamesReadCommand(table, key, columnFamily.metadata().cfName, new NamesQueryFilter(s)));
+ commands.add(new SliceByNamesReadCommand(table, key, columnFamily.metadata().cfName, timestamp, new NamesQueryFilter(s)));
}
public MessageOut<CounterMutation> makeMutationMessage()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/CounterUpdateColumn.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/CounterUpdateColumn.java b/src/java/org/apache/cassandra/db/CounterUpdateColumn.java
index 9d9530e..1ae7dd7 100644
--- a/src/java/org/apache/cassandra/db/CounterUpdateColumn.java
+++ b/src/java/org/apache/cassandra/db/CounterUpdateColumn.java
@@ -64,7 +64,7 @@ public class CounterUpdateColumn extends Column
assert (column instanceof CounterUpdateColumn) || (column instanceof DeletedColumn) : "Wrong class type.";
// tombstones take precedence
- if (column.isMarkedForDelete())
+ if (column.isMarkedForDelete(Long.MIN_VALUE)) // can't be an expired column, so the current time is irrelevant
return timestamp() > column.timestamp() ? this : column;
// neither is tombstoned
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/DefsTable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DefsTable.java b/src/java/org/apache/cassandra/db/DefsTable.java
index df551e0..4b6c1c2 100644
--- a/src/java/org/apache/cassandra/db/DefsTable.java
+++ b/src/java/org/apache/cassandra/db/DefsTable.java
@@ -152,7 +152,9 @@ public class DefsTable
private static Row serializedColumnFamilies(DecoratedKey ksNameKey)
{
ColumnFamilyStore cfsStore = SystemTable.schemaCFS(SystemTable.SCHEMA_COLUMNFAMILIES_CF);
- return new Row(ksNameKey, cfsStore.getColumnFamily(QueryFilter.getIdentityFilter(ksNameKey, SystemTable.SCHEMA_COLUMNFAMILIES_CF)));
+ return new Row(ksNameKey, cfsStore.getColumnFamily(QueryFilter.getIdentityFilter(ksNameKey,
+ SystemTable.SCHEMA_COLUMNFAMILIES_CF,
+ System.currentTimeMillis())));
}
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/DeletedColumn.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DeletedColumn.java b/src/java/org/apache/cassandra/db/DeletedColumn.java
index f9bda78..57c9bf9 100644
--- a/src/java/org/apache/cassandra/db/DeletedColumn.java
+++ b/src/java/org/apache/cassandra/db/DeletedColumn.java
@@ -53,10 +53,8 @@ public class DeletedColumn extends Column
}
@Override
- public boolean isMarkedForDelete()
+ public boolean isMarkedForDelete(long now)
{
- // We don't rely on the column implementation because it could mistakenly return false if
- // some node are not exactly synchronized, which is problematic (see #4307)
return true;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/DeletionTime.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DeletionTime.java b/src/java/org/apache/cassandra/db/DeletionTime.java
index d1ce0eb..5296529 100644
--- a/src/java/org/apache/cassandra/db/DeletionTime.java
+++ b/src/java/org/apache/cassandra/db/DeletionTime.java
@@ -83,9 +83,9 @@ public class DeletionTime implements Comparable<DeletionTime>
return localDeletionTime < gcBefore;
}
- public boolean isDeleted(Column column)
+ public boolean isDeleted(Column column, long now)
{
- return column.isMarkedForDelete() && column.getMarkedForDeleteAt() <= markedForDeleteAt;
+ return column.isMarkedForDelete(now) && column.getMarkedForDeleteAt() <= markedForDeleteAt;
}
public long memorySize()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/ExpiringColumn.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ExpiringColumn.java b/src/java/org/apache/cassandra/db/ExpiringColumn.java
index 7660bad..f342310 100644
--- a/src/java/org/apache/cassandra/db/ExpiringColumn.java
+++ b/src/java/org/apache/cassandra/db/ExpiringColumn.java
@@ -158,16 +158,15 @@ public class ExpiringColumn extends Column
}
@Override
+ public boolean isMarkedForDelete(long now)
+ {
+ return (int) (now / 1000) >= getLocalDeletionTime();
+ }
+
+ @Override
public long getMarkedForDeleteAt()
{
- if (isMarkedForDelete())
- {
- return timestamp;
- }
- else
- {
- throw new IllegalStateException("column is not marked for delete");
- }
+ return timestamp;
}
@Override
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/HintedHandOffManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/HintedHandOffManager.java b/src/java/org/apache/cassandra/db/HintedHandOffManager.java
index c10bed6..e89c769 100644
--- a/src/java/org/apache/cassandra/db/HintedHandOffManager.java
+++ b/src/java/org/apache/cassandra/db/HintedHandOffManager.java
@@ -326,15 +326,16 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
delivery:
while (true)
{
+ long now = System.currentTimeMillis();
QueryFilter filter = QueryFilter.getSliceFilter(epkey,
SystemTable.HINTS_CF,
startColumn,
ByteBufferUtil.EMPTY_BYTE_BUFFER,
false,
- pageSize);
+ pageSize,
+ now);
- ColumnFamily hintsPage = ColumnFamilyStore.removeDeleted(hintStore.getColumnFamily(filter),
- (int) (System.currentTimeMillis() / 1000));
+ ColumnFamily hintsPage = ColumnFamilyStore.removeDeleted(hintStore.getColumnFamily(filter), (int) (now / 1000));
if (pagingFinished(hintsPage, startColumn))
break;
@@ -362,7 +363,7 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
// in which the local deletion timestamp was generated on the last column in the old page, in which
// case the hint will have no columns (since it's deleted) but will still be included in the resultset
// since (even with gcgs=0) it's still a "relevant" tombstone.
- if (!hint.isLive())
+ if (!hint.isLive(System.currentTimeMillis()))
continue;
startColumn = hint.name();
@@ -479,7 +480,7 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
RowPosition minPos = p.getMinimumToken().minKeyBound();
Range<RowPosition> range = new Range<RowPosition>(minPos, minPos, p);
IDiskAtomFilter filter = new NamesQueryFilter(ImmutableSortedSet.<ByteBuffer>of());
- List<Row> rows = hintStore.getRangeSlice(range, Integer.MAX_VALUE, filter, null);
+ List<Row> rows = hintStore.getRangeSlice(range, null, filter, Integer.MAX_VALUE);
for (Row row : rows)
{
UUID hostId = UUIDGen.getUUID(row.key.key);
@@ -576,18 +577,22 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
RowPosition minPos = partitioner.getMinimumToken().minKeyBound();
Range<RowPosition> range = new Range<RowPosition>(minPos, minPos);
- // Get a bunch of rows!
- List<Row> rows;
try
{
- rows = StorageProxy.getRangeSlice(new RangeSliceCommand(Table.SYSTEM_KS, SystemTable.HINTS_CF, predicate, range, null, LARGE_NUMBER), ConsistencyLevel.ONE);
+ RangeSliceCommand cmd = new RangeSliceCommand(Table.SYSTEM_KS,
+ SystemTable.HINTS_CF,
+ System.currentTimeMillis(),
+ predicate,
+ range,
+ null,
+ LARGE_NUMBER);
+ return StorageProxy.getRangeSlice(cmd, ConsistencyLevel.ONE);
}
catch (Exception e)
{
logger.info("HintsCF getEPPendingHints timed out.");
throw new RuntimeException(e);
}
- return rows;
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/OnDiskAtom.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/OnDiskAtom.java b/src/java/org/apache/cassandra/db/OnDiskAtom.java
index 2fdb7ad..14a21c8 100644
--- a/src/java/org/apache/cassandra/db/OnDiskAtom.java
+++ b/src/java/org/apache/cassandra/db/OnDiskAtom.java
@@ -66,7 +66,7 @@ public interface OnDiskAtom
public OnDiskAtom deserializeFromSSTable(DataInput in, Descriptor.Version version) throws IOException
{
- return deserializeFromSSTable(in, ColumnSerializer.Flag.LOCAL, (int)(System.currentTimeMillis() / 1000), version);
+ return deserializeFromSSTable(in, ColumnSerializer.Flag.LOCAL, Integer.MIN_VALUE, version);
}
public OnDiskAtom deserializeFromSSTable(DataInput in, ColumnSerializer.Flag flag, int expireBefore, Descriptor.Version version) throws IOException
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/RangeSliceCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RangeSliceCommand.java b/src/java/org/apache/cassandra/db/RangeSliceCommand.java
index 4020a15..c7e71f3 100644
--- a/src/java/org/apache/cassandra/db/RangeSliceCommand.java
+++ b/src/java/org/apache/cassandra/db/RangeSliceCommand.java
@@ -15,24 +15,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
package org.apache.cassandra.db;
import java.io.DataInput;
@@ -46,8 +28,6 @@ import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.filter.IDiskAtomFilter;
-import org.apache.cassandra.db.filter.NamesQueryFilter;
-import org.apache.cassandra.db.filter.SliceQueryFilter;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.marshal.CompositeType;
import org.apache.cassandra.dht.AbstractBounds;
@@ -57,8 +37,6 @@ import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.IReadCommand;
import org.apache.cassandra.thrift.IndexExpression;
import org.apache.cassandra.thrift.IndexOperator;
-import org.apache.cassandra.thrift.SlicePredicate;
-import org.apache.cassandra.thrift.SliceRange;
import org.apache.cassandra.utils.ByteBufferUtil;
public class RangeSliceCommand implements IReadCommand
@@ -69,6 +47,8 @@ public class RangeSliceCommand implements IReadCommand
public final String column_family;
+ public final long timestamp;
+
public final IDiskAtomFilter predicate;
public final List<IndexExpression> row_filter;
@@ -77,20 +57,40 @@ public class RangeSliceCommand implements IReadCommand
public final boolean countCQL3Rows;
public final boolean isPaging;
- public RangeSliceCommand(String keyspace, String column_family, IDiskAtomFilter predicate, AbstractBounds<RowPosition> range, int maxResults)
+ public RangeSliceCommand(String keyspace,
+ String column_family,
+ long timestamp,
+ IDiskAtomFilter predicate,
+ AbstractBounds<RowPosition> range,
+ int maxResults)
{
- this(keyspace, column_family, predicate, range, null, maxResults, false, false);
+ this(keyspace, column_family, timestamp, predicate, range, null, maxResults, false, false);
}
- public RangeSliceCommand(String keyspace, String column_family, IDiskAtomFilter predicate, AbstractBounds<RowPosition> range, List<IndexExpression> row_filter, int maxResults)
+ public RangeSliceCommand(String keyspace,
+ String column_family,
+ long timestamp,
+ IDiskAtomFilter predicate,
+ AbstractBounds<RowPosition> range,
+ List<IndexExpression> row_filter,
+ int maxResults)
{
- this(keyspace, column_family, predicate, range, row_filter, maxResults, false, false);
+ this(keyspace, column_family, timestamp, predicate, range, row_filter, maxResults, false, false);
}
- public RangeSliceCommand(String keyspace, String column_family, IDiskAtomFilter predicate, AbstractBounds<RowPosition> range, List<IndexExpression> row_filter, int maxResults, boolean countCQL3Rows, boolean isPaging)
+ public RangeSliceCommand(String keyspace,
+ String column_family,
+ long timestamp,
+ IDiskAtomFilter predicate,
+ AbstractBounds<RowPosition> range,
+ List<IndexExpression> row_filter,
+ int maxResults,
+ boolean countCQL3Rows,
+ boolean isPaging)
{
this.keyspace = keyspace;
this.column_family = column_family;
+ this.timestamp = timestamp;
this.predicate = predicate;
this.range = range;
this.row_filter = row_filter;
@@ -110,6 +110,7 @@ public class RangeSliceCommand implements IReadCommand
return "RangeSliceCommand{" +
"keyspace='" + keyspace + '\'' +
", column_family='" + column_family + '\'' +
+ ", timestamp='" + timestamp + '\'' +
", predicate=" + predicate +
", range=" + range +
", row_filter =" + row_filter +
@@ -131,27 +132,14 @@ public class RangeSliceCommand implements IReadCommand
class RangeSliceCommandSerializer implements IVersionedSerializer<RangeSliceCommand>
{
- // For compatibility with pre-1.2 sake. We should remove at some point.
- public static SlicePredicate asSlicePredicate(IDiskAtomFilter predicate)
- {
- SlicePredicate sp = new SlicePredicate();
- if (predicate instanceof NamesQueryFilter)
- {
- sp.setColumn_names(new ArrayList<ByteBuffer>(((NamesQueryFilter)predicate).columns));
- }
- else
- {
- SliceQueryFilter sqf = (SliceQueryFilter)predicate;
- sp.setSlice_range(new SliceRange(sqf.start(), sqf.finish(), sqf.reversed, sqf.count));
- }
- return sp;
- }
-
public void serialize(RangeSliceCommand sliceCommand, DataOutput out, int version) throws IOException
{
out.writeUTF(sliceCommand.keyspace);
out.writeUTF(sliceCommand.column_family);
+ if (version >= MessagingService.VERSION_20)
+ out.writeLong(sliceCommand.timestamp);
+
IDiskAtomFilter filter = sliceCommand.predicate;
if (version < MessagingService.VERSION_20)
{
@@ -199,6 +187,8 @@ class RangeSliceCommandSerializer implements IVersionedSerializer<RangeSliceComm
String keyspace = in.readUTF();
String columnFamily = in.readUTF();
+ long timestamp = version < MessagingService.VERSION_20 ? System.currentTimeMillis() : in.readLong();
+
CFMetaData metadata = Schema.instance.getCFMetaData(keyspace, columnFamily);
IDiskAtomFilter predicate;
@@ -234,7 +224,7 @@ class RangeSliceCommandSerializer implements IVersionedSerializer<RangeSliceComm
predicate = IDiskAtomFilter.Serializer.instance.deserialize(in, version, metadata.comparator);
}
- List<IndexExpression> rowFilter = null;
+ List<IndexExpression> rowFilter;
int filterCount = in.readInt();
rowFilter = new ArrayList<IndexExpression>(filterCount);
for (int i = 0; i < filterCount; i++)
@@ -250,7 +240,7 @@ class RangeSliceCommandSerializer implements IVersionedSerializer<RangeSliceComm
int maxResults = in.readInt();
boolean countCQL3Rows = in.readBoolean();
boolean isPaging = in.readBoolean();
- return new RangeSliceCommand(keyspace, columnFamily, predicate, range, rowFilter, maxResults, countCQL3Rows, isPaging);
+ return new RangeSliceCommand(keyspace, columnFamily, timestamp, predicate, range, rowFilter, maxResults, countCQL3Rows, isPaging);
}
public long serializedSize(RangeSliceCommand rsc, int version)
@@ -258,6 +248,9 @@ class RangeSliceCommandSerializer implements IVersionedSerializer<RangeSliceComm
long size = TypeSizes.NATIVE.sizeof(rsc.keyspace);
size += TypeSizes.NATIVE.sizeof(rsc.column_family);
+ if (version >= MessagingService.VERSION_20)
+ size += TypeSizes.NATIVE.sizeof(rsc.timestamp);
+
IDiskAtomFilter filter = rsc.predicate;
if (version < MessagingService.VERSION_20)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/RangeTombstone.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RangeTombstone.java b/src/java/org/apache/cassandra/db/RangeTombstone.java
index 9342570..e30cd5b 100644
--- a/src/java/org/apache/cassandra/db/RangeTombstone.java
+++ b/src/java/org/apache/cassandra/db/RangeTombstone.java
@@ -237,11 +237,11 @@ public class RangeTombstone extends Interval<ByteBuffer, DeletionTime> implement
}
}
- public boolean isDeleted(Column column)
+ public boolean isDeleted(Column column, long now)
{
for (RangeTombstone tombstone : ranges)
{
- if (comparator.compare(column.name(), tombstone.max) <= 0 && tombstone.data.isDeleted(column))
+ if (comparator.compare(column.name(), tombstone.max) <= 0 && tombstone.data.isDeleted(column, now))
return true;
}
return false;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/ReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadCommand.java b/src/java/org/apache/cassandra/db/ReadCommand.java
index 3cff8b6..61a2478 100644
--- a/src/java/org/apache/cassandra/db/ReadCommand.java
+++ b/src/java/org/apache/cassandra/db/ReadCommand.java
@@ -35,10 +35,10 @@ import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.IReadCommand;
import org.apache.cassandra.service.RowDataResolver;
-
public abstract class ReadCommand implements IReadCommand
{
- public enum Type {
+ public enum Type
+ {
GET_BY_NAMES((byte)1),
GET_SLICES((byte)2);
@@ -65,23 +65,25 @@ public abstract class ReadCommand implements IReadCommand
public final String table;
public final String cfName;
public final ByteBuffer key;
+ public final long timestamp;
private boolean isDigestQuery = false;
protected final Type commandType;
- protected ReadCommand(String table, ByteBuffer key, String cfName, Type cmdType)
+ protected ReadCommand(String table, ByteBuffer key, String cfName, long timestamp, Type cmdType)
{
this.table = table;
this.key = key;
this.cfName = cfName;
+ this.timestamp = timestamp;
this.commandType = cmdType;
}
- public static ReadCommand create(String table, ByteBuffer key, String cfName, IDiskAtomFilter filter)
+ public static ReadCommand create(String table, ByteBuffer key, String cfName, long timestamp, IDiskAtomFilter filter)
{
if (filter instanceof SliceQueryFilter)
- return new SliceFromReadCommand(table, key, cfName, (SliceQueryFilter)filter);
+ return new SliceFromReadCommand(table, key, cfName, timestamp, (SliceQueryFilter)filter);
else
- return new SliceByNamesReadCommand(table, key, cfName, (NamesQueryFilter)filter);
+ return new SliceByNamesReadCommand(table, key, cfName, timestamp, (NamesQueryFilter)filter);
}
public boolean isDigestQuery()
@@ -143,7 +145,7 @@ class ReadCommandSerializer implements IVersionedSerializer<ReadCommand>
if (metadata.cfType == ColumnFamilyType.Super)
{
SuperColumns.SCFilter scFilter = SuperColumns.filterToSC((CompositeType)metadata.comparator, command.filter());
- newCommand = ReadCommand.create(command.table, command.key, command.cfName, scFilter.updatedFilter);
+ newCommand = ReadCommand.create(command.table, command.key, command.cfName, command.timestamp, scFilter.updatedFilter);
newCommand.setDigestQuery(command.isDigestQuery());
superColumn = scFilter.scName;
}
@@ -187,7 +189,7 @@ class ReadCommandSerializer implements IVersionedSerializer<ReadCommand>
if (metadata.cfType == ColumnFamilyType.Super)
{
SuperColumns.SCFilter scFilter = SuperColumns.filterToSC((CompositeType)metadata.comparator, command.filter());
- newCommand = ReadCommand.create(command.table, command.key, command.cfName, scFilter.updatedFilter);
+ newCommand = ReadCommand.create(command.table, command.key, command.cfName, command.timestamp, scFilter.updatedFilter);
newCommand.setDigestQuery(command.isDigestQuery());
superColumn = scFilter.scName;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/RetriedSliceFromReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RetriedSliceFromReadCommand.java b/src/java/org/apache/cassandra/db/RetriedSliceFromReadCommand.java
index 52bad89..7ca57a8 100644
--- a/src/java/org/apache/cassandra/db/RetriedSliceFromReadCommand.java
+++ b/src/java/org/apache/cassandra/db/RetriedSliceFromReadCommand.java
@@ -19,25 +19,26 @@ package org.apache.cassandra.db;
import java.nio.ByteBuffer;
-import org.apache.cassandra.db.filter.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.cassandra.db.filter.SliceQueryFilter;
+
public class RetriedSliceFromReadCommand extends SliceFromReadCommand
{
static final Logger logger = LoggerFactory.getLogger(RetriedSliceFromReadCommand.class);
public final int originalCount;
- public RetriedSliceFromReadCommand(String table, ByteBuffer key, String cfName, SliceQueryFilter filter, int originalCount)
+ public RetriedSliceFromReadCommand(String table, ByteBuffer key, String cfName, long timestamp, SliceQueryFilter filter, int originalCount)
{
- super(table, key, cfName, filter);
+ super(table, key, cfName, timestamp, filter);
this.originalCount = originalCount;
}
@Override
public ReadCommand copy()
{
- ReadCommand readCommand = new RetriedSliceFromReadCommand(table, key, cfName, filter, originalCount);
+ ReadCommand readCommand = new RetriedSliceFromReadCommand(table, key, cfName, timestamp, filter, originalCount);
readCommand.setDigestQuery(isDigestQuery());
return readCommand;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/Row.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Row.java b/src/java/org/apache/cassandra/db/Row.java
index 49aa426..13e6f67 100644
--- a/src/java/org/apache/cassandra/db/Row.java
+++ b/src/java/org/apache/cassandra/db/Row.java
@@ -54,9 +54,9 @@ public class Row
')';
}
- public int getLiveCount(IDiskAtomFilter filter)
+ public int getLiveCount(IDiskAtomFilter filter, long now)
{
- return cf == null ? 0 : filter.getLiveCount(cf);
+ return cf == null ? 0 : filter.getLiveCount(cf, now);
}
public static class RowSerializer implements IVersionedSerializer<Row>
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/RowIteratorFactory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RowIteratorFactory.java b/src/java/org/apache/cassandra/db/RowIteratorFactory.java
index 4588146..c9f715c 100644
--- a/src/java/org/apache/cassandra/db/RowIteratorFactory.java
+++ b/src/java/org/apache/cassandra/db/RowIteratorFactory.java
@@ -52,11 +52,11 @@ public class RowIteratorFactory
* @return A row iterator following all the given restrictions
*/
public static CloseableIterator<Row> getIterator(final Iterable<Memtable> memtables,
- final Collection<SSTableReader> sstables,
- final RowPosition startWith,
- final RowPosition stopAt,
- final QueryFilter filter,
- final ColumnFamilyStore cfs)
+ final Collection<SSTableReader> sstables,
+ final RowPosition startWith,
+ final RowPosition stopAt,
+ final QueryFilter filter,
+ final ColumnFamilyStore cfs)
{
// fetch data from current memtable, historical memtables, and SSTables in the correct order.
final List<CloseableIterator<OnDiskAtomIterator>> iterators = new ArrayList<CloseableIterator<OnDiskAtomIterator>>();
@@ -76,7 +76,7 @@ public class RowIteratorFactory
// reduce rows from all sources into a single row
return MergeIterator.get(iterators, COMPARE_BY_KEY, new MergeIterator.Reducer<OnDiskAtomIterator, Row>()
{
- private final int gcBefore = (int) (System.currentTimeMillis() / 1000) - cfs.metadata.getGcGraceSeconds();
+ private final int gcBefore = cfs.gcBefore(filter.timestamp);
private final List<OnDiskAtomIterator> colIters = new ArrayList<OnDiskAtomIterator>();
private DecoratedKey key;
private ColumnFamily returnCF;
@@ -106,8 +106,8 @@ public class RowIteratorFactory
}
else
{
- QueryFilter keyFilter = new QueryFilter(key, filter.cfName, filter.filter);
- returnCF = cfs.filterColumnFamily(cached, keyFilter, gcBefore);
+ QueryFilter keyFilter = new QueryFilter(key, filter.cfName, filter.filter, filter.timestamp);
+ returnCF = cfs.filterColumnFamily(cached, keyFilter);
}
Row rv = new Row(key, returnCF);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/SliceByNamesReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SliceByNamesReadCommand.java b/src/java/org/apache/cassandra/db/SliceByNamesReadCommand.java
index 909ba76..2942249 100644
--- a/src/java/org/apache/cassandra/db/SliceByNamesReadCommand.java
+++ b/src/java/org/apache/cassandra/db/SliceByNamesReadCommand.java
@@ -36,15 +36,15 @@ public class SliceByNamesReadCommand extends ReadCommand
public final NamesQueryFilter filter;
- public SliceByNamesReadCommand(String table, ByteBuffer key, String cfName, NamesQueryFilter filter)
+ public SliceByNamesReadCommand(String table, ByteBuffer key, String cfName, long timestamp, NamesQueryFilter filter)
{
- super(table, key, cfName, Type.GET_BY_NAMES);
+ super(table, key, cfName, timestamp, Type.GET_BY_NAMES);
this.filter = filter;
}
public ReadCommand copy()
{
- ReadCommand readCommand= new SliceByNamesReadCommand(table, key, cfName, filter);
+ ReadCommand readCommand= new SliceByNamesReadCommand(table, key, cfName, timestamp, filter);
readCommand.setDigestQuery(isDigestQuery());
return readCommand;
}
@@ -52,7 +52,7 @@ public class SliceByNamesReadCommand extends ReadCommand
public Row getRow(Table table)
{
DecoratedKey dk = StorageService.getPartitioner().decorateKey(key);
- return table.getRow(new QueryFilter(dk, cfName, filter));
+ return table.getRow(new QueryFilter(dk, cfName, filter, timestamp));
}
@Override
@@ -62,6 +62,7 @@ public class SliceByNamesReadCommand extends ReadCommand
"table='" + table + '\'' +
", key=" + ByteBufferUtil.bytesToHex(key) +
", cfName='" + cfName + '\'' +
+ ", timestamp='" + timestamp + '\'' +
", filter=" + filter +
')';
}
@@ -91,6 +92,9 @@ class SliceByNamesReadCommandSerializer implements IVersionedSerializer<ReadComm
else
out.writeUTF(command.cfName);
+ if (version >= MessagingService.VERSION_20)
+ out.writeLong(cmd.timestamp);
+
NamesQueryFilter.serializer.serialize(command.filter, out, version);
}
@@ -113,6 +117,8 @@ class SliceByNamesReadCommandSerializer implements IVersionedSerializer<ReadComm
cfName = in.readUTF();
}
+ long timestamp = version < MessagingService.VERSION_20 ? System.currentTimeMillis() : in.readLong();
+
CFMetaData metadata = Schema.instance.getCFMetaData(table, cfName);
ReadCommand command;
if (version < MessagingService.VERSION_20)
@@ -135,14 +141,14 @@ class SliceByNamesReadCommandSerializer implements IVersionedSerializer<ReadComm
// Due to SC compat, it's possible we get back a slice filter at this point
if (filter instanceof NamesQueryFilter)
- command = new SliceByNamesReadCommand(table, key, cfName, (NamesQueryFilter)filter);
+ command = new SliceByNamesReadCommand(table, key, cfName, timestamp, (NamesQueryFilter)filter);
else
- command = new SliceFromReadCommand(table, key, cfName, (SliceQueryFilter)filter);
+ command = new SliceFromReadCommand(table, key, cfName, timestamp, (SliceQueryFilter)filter);
}
else
{
NamesQueryFilter filter = NamesQueryFilter.serializer.deserialize(in, version, metadata.comparator);
- command = new SliceByNamesReadCommand(table, key, cfName, filter);
+ command = new SliceByNamesReadCommand(table, key, cfName, timestamp, filter);
}
command.setDigestQuery(isDigest);
@@ -165,15 +171,15 @@ class SliceByNamesReadCommandSerializer implements IVersionedSerializer<ReadComm
size += sizes.sizeof((short)keySize) + keySize;
if (version < MessagingService.VERSION_20)
- {
size += new QueryPath(command.cfName, superColumn).serializedSize(sizes);
- }
else
- {
size += sizes.sizeof(command.cfName);
- }
+
+ if (version >= MessagingService.VERSION_20)
+ size += sizes.sizeof(cmd.timestamp);
size += NamesQueryFilter.serializer.serializedSize(command.filter, version);
+
return size;
}
}