You are viewing a plain-text version of this content. The canonical link to it was a hyperlink that did not survive the plain-text conversion.
Posted to commits@cassandra.apache.org by al...@apache.org on 2013/06/18 18:16:04 UTC
[3/4] Include a timestamp with all read commands to determine column expiration
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/SliceFromReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SliceFromReadCommand.java b/src/java/org/apache/cassandra/db/SliceFromReadCommand.java
index be64ae1..5c42de5 100644
--- a/src/java/org/apache/cassandra/db/SliceFromReadCommand.java
+++ b/src/java/org/apache/cassandra/db/SliceFromReadCommand.java
@@ -46,15 +46,15 @@ public class SliceFromReadCommand extends ReadCommand
public final SliceQueryFilter filter;
- public SliceFromReadCommand(String table, ByteBuffer key, String cfName, SliceQueryFilter filter)
+ public SliceFromReadCommand(String table, ByteBuffer key, String cfName, long timestamp, SliceQueryFilter filter)
{
- super(table, key, cfName, Type.GET_SLICES);
+ super(table, key, cfName, timestamp, Type.GET_SLICES);
this.filter = filter;
}
public ReadCommand copy()
{
- ReadCommand readCommand = new SliceFromReadCommand(table, key, cfName, filter);
+ ReadCommand readCommand = new SliceFromReadCommand(table, key, cfName, timestamp, filter);
readCommand.setDigestQuery(isDigestQuery());
return readCommand;
}
@@ -62,7 +62,7 @@ public class SliceFromReadCommand extends ReadCommand
public Row getRow(Table table)
{
DecoratedKey dk = StorageService.getPartitioner().decorateKey(key);
- return table.getRow(new QueryFilter(dk, cfName, filter));
+ return table.getRow(new QueryFilter(dk, cfName, filter, timestamp));
}
@Override
@@ -77,7 +77,7 @@ public class SliceFromReadCommand extends ReadCommand
if (maxLiveColumns < count)
return null;
- int liveCountInRow = row == null || row.cf == null ? 0 : filter.getLiveCount(row.cf);
+ int liveCountInRow = row == null || row.cf == null ? 0 : filter.getLiveCount(row.cf, timestamp);
if (liveCountInRow < getOriginalRequestedCount())
{
// We asked t (= count) live columns and got l (=liveCountInRow) ones.
@@ -86,7 +86,7 @@ public class SliceFromReadCommand extends ReadCommand
// round we want to ask x column so that x * (l/t) == t, i.e. x = t^2/l.
int retryCount = liveCountInRow == 0 ? count + 1 : ((count * count) / liveCountInRow) + 1;
SliceQueryFilter newFilter = filter.withUpdatedCount(retryCount);
- return new RetriedSliceFromReadCommand(table, key, cfName, newFilter, getOriginalRequestedCount());
+ return new RetriedSliceFromReadCommand(table, key, cfName, timestamp, newFilter, getOriginalRequestedCount());
}
return null;
@@ -98,7 +98,7 @@ public class SliceFromReadCommand extends ReadCommand
if ((row == null) || (row.cf == null))
return;
- filter.trim(row.cf, getOriginalRequestedCount());
+ filter.trim(row.cf, getOriginalRequestedCount(), timestamp);
}
public IDiskAtomFilter filter()
@@ -123,6 +123,7 @@ public class SliceFromReadCommand extends ReadCommand
"table='" + table + '\'' +
", key='" + ByteBufferUtil.bytesToHex(key) + '\'' +
", cfName='" + cfName + '\'' +
+ ", timestamp='" + timestamp + '\'' +
", filter='" + filter + '\'' +
')';
}
@@ -147,6 +148,9 @@ class SliceFromReadCommandSerializer implements IVersionedSerializer<ReadCommand
else
out.writeUTF(realRM.cfName);
+ if (version >= MessagingService.VERSION_20)
+ out.writeLong(realRM.timestamp);
+
SliceQueryFilter.serializer.serialize(realRM.filter, out, version);
}
@@ -169,6 +173,8 @@ class SliceFromReadCommandSerializer implements IVersionedSerializer<ReadCommand
cfName = in.readUTF();
}
+ long timestamp = version < MessagingService.VERSION_20 ? System.currentTimeMillis() : in.readLong();
+
CFMetaData metadata = Schema.instance.getCFMetaData(table, cfName);
SliceQueryFilter filter;
if (version < MessagingService.VERSION_20)
@@ -183,7 +189,7 @@ class SliceFromReadCommandSerializer implements IVersionedSerializer<ReadCommand
filter = SliceQueryFilter.serializer.deserialize(in, version);
}
- ReadCommand command = new SliceFromReadCommand(table, key, cfName, filter);
+ ReadCommand command = new SliceFromReadCommand(table, key, cfName, timestamp, filter);
command.setDigestQuery(isDigest);
return command;
}
@@ -204,15 +210,15 @@ class SliceFromReadCommandSerializer implements IVersionedSerializer<ReadCommand
size += sizes.sizeof((short) keySize) + keySize;
if (version < MessagingService.VERSION_20)
- {
size += new QueryPath(command.cfName, superColumn).serializedSize(sizes);
- }
else
- {
size += sizes.sizeof(command.cfName);
- }
+
+ if (version >= MessagingService.VERSION_20)
+ size += sizes.sizeof(cmd.timestamp);
size += SliceQueryFilter.serializer.serializedSize(command.filter, version);
+
return size;
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/SliceQueryPager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SliceQueryPager.java b/src/java/org/apache/cassandra/db/SliceQueryPager.java
index 7b8e4d4..c1933ad 100644
--- a/src/java/org/apache/cassandra/db/SliceQueryPager.java
+++ b/src/java/org/apache/cassandra/db/SliceQueryPager.java
@@ -51,10 +51,11 @@ public class SliceQueryPager implements Iterator<ColumnFamily>
if (exhausted)
return null;
+ long now = System.currentTimeMillis();
SliceQueryFilter sliceFilter = new SliceQueryFilter(slices, false, DEFAULT_PAGE_SIZE);
- QueryFilter filter = new QueryFilter(key, cfs.name, sliceFilter);
+ QueryFilter filter = new QueryFilter(key, cfs.name, sliceFilter, now);
ColumnFamily cf = cfs.getColumnFamily(filter);
- if (cf == null || sliceFilter.getLiveCount(cf) < DEFAULT_PAGE_SIZE)
+ if (cf == null || sliceFilter.getLiveCount(cf, now) < DEFAULT_PAGE_SIZE)
{
exhausted = true;
}
@@ -62,7 +63,7 @@ public class SliceQueryPager implements Iterator<ColumnFamily>
{
Iterator<Column> iter = cf.getReverseSortedColumns().iterator();
Column lastColumn = iter.next();
- while (lastColumn.isMarkedForDelete())
+ while (lastColumn.isMarkedForDelete(now))
lastColumn = iter.next();
int i = 0;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/SuperColumns.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SuperColumns.java b/src/java/org/apache/cassandra/db/SuperColumns.java
index 753cd91..54f59d6 100644
--- a/src/java/org/apache/cassandra/db/SuperColumns.java
+++ b/src/java/org/apache/cassandra/db/SuperColumns.java
@@ -102,13 +102,13 @@ public class SuperColumns
return scMap;
}
- public static void deserializerSuperColumnFamily(DataInput in, ColumnFamily cf, ColumnSerializer.Flag flag, int expireBefore, int version) throws IOException
+ public static void deserializerSuperColumnFamily(DataInput in, ColumnFamily cf, ColumnSerializer.Flag flag, int version) throws IOException
{
// Note that there was no way to insert a range tombstone in a SCF in 1.2
cf.delete(DeletionInfo.serializer().deserialize(in, version, cf.getComparator()));
assert !cf.deletionInfo().rangeIterator().hasNext();
- Iterator<OnDiskAtom> iter = onDiskIterator(in, in.readInt(), flag, expireBefore);
+ Iterator<OnDiskAtom> iter = onDiskIterator(in, in.readInt(), flag, Integer.MIN_VALUE);
while (iter.hasNext())
cf.addAtom(iter.next());
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/SystemTable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SystemTable.java b/src/java/org/apache/cassandra/db/SystemTable.java
index e62b258..90bfd8d 100644
--- a/src/java/org/apache/cassandra/db/SystemTable.java
+++ b/src/java/org/apache/cassandra/db/SystemTable.java
@@ -535,7 +535,8 @@ public class SystemTable
ColumnFamilyStore cfs = Table.open(Table.SYSTEM_KS).getColumnFamilyStore(INDEX_CF);
QueryFilter filter = QueryFilter.getNamesFilter(decorate(ByteBufferUtil.bytes(table)),
INDEX_CF,
- ByteBufferUtil.bytes(indexName));
+ ByteBufferUtil.bytes(indexName),
+ System.currentTimeMillis());
return ColumnFamilyStore.removeDeleted(cfs.getColumnFamily(filter), Integer.MAX_VALUE) != null;
}
@@ -596,7 +597,8 @@ public class SystemTable
ByteBufferUtil.EMPTY_BYTE_BUFFER,
ByteBufferUtil.EMPTY_BYTE_BUFFER,
true,
- 1);
+ 1,
+ System.currentTimeMillis());
ColumnFamily cf = table.getColumnFamilyStore(COUNTER_ID_CF).getColumnFamily(filter);
if (cf != null && cf.getColumnCount() != 0)
return CounterId.wrap(cf.iterator().next().name());
@@ -629,7 +631,7 @@ public class SystemTable
List<CounterId.CounterIdRecord> l = new ArrayList<CounterId.CounterIdRecord>();
Table table = Table.open(Table.SYSTEM_KS);
- QueryFilter filter = QueryFilter.getIdentityFilter(decorate(ALL_LOCAL_NODE_ID_KEY), COUNTER_ID_CF);
+ QueryFilter filter = QueryFilter.getIdentityFilter(decorate(ALL_LOCAL_NODE_ID_KEY), COUNTER_ID_CF, System.currentTimeMillis());
ColumnFamily cf = table.getColumnFamilyStore(COUNTER_ID_CF).getColumnFamily(filter);
CounterId previous = null;
@@ -673,11 +675,10 @@ public class SystemTable
{
Token minToken = StorageService.getPartitioner().getMinimumToken();
- return schemaCFS(schemaCfName).getRangeSlice(new Range<RowPosition>(minToken.minKeyBound(),
- minToken.maxKeyBound()),
- Integer.MAX_VALUE,
+ return schemaCFS(schemaCfName).getRangeSlice(new Range<RowPosition>(minToken.minKeyBound(), minToken.maxKeyBound()),
+ null,
new IdentityQueryFilter(),
- null);
+ Integer.MAX_VALUE);
}
public static Collection<RowMutation> serializeSchema()
@@ -729,7 +730,7 @@ public class SystemTable
DecoratedKey key = StorageService.getPartitioner().decorateKey(getSchemaKSKey(ksName));
ColumnFamilyStore schemaCFS = SystemTable.schemaCFS(SCHEMA_KEYSPACES_CF);
- ColumnFamily result = schemaCFS.getColumnFamily(QueryFilter.getIdentityFilter(key, SCHEMA_KEYSPACES_CF));
+ ColumnFamily result = schemaCFS.getColumnFamily(QueryFilter.getIdentityFilter(key, SCHEMA_KEYSPACES_CF, System.currentTimeMillis()));
return new Row(key, result);
}
@@ -743,7 +744,8 @@ public class SystemTable
DefsTable.searchComposite(cfName, true),
DefsTable.searchComposite(cfName, false),
false,
- Integer.MAX_VALUE);
+ Integer.MAX_VALUE,
+ System.currentTimeMillis());
return new Row(key, result);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 90f6dfa..6c9f50d 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -628,7 +628,7 @@ public class CompactionManager implements CompactionManagerMBean
// this at a different time (that's the whole purpose of repair with snaphsot). So instead we take the creation
// time of the snapshot, which should give us roughtly the same time on each replica (roughtly being in that case
// 'as good as in the non-snapshot' case)
- gcBefore = (int) (cfs.getSnapshotCreationTime(validator.request.sessionid) / 1000) - cfs.metadata.getGcGraceSeconds();
+ gcBefore = cfs.gcBefore(cfs.getSnapshotCreationTime(validator.request.sessionid));
}
else
{
@@ -731,9 +731,7 @@ public class CompactionManager implements CompactionManagerMBean
{
// 2ndary indexes have ExpiringColumns too, so we need to purge tombstones deleted before now. We do not need to
// add any GcGrace however since 2ndary indexes are local to a node.
- return cfs.isIndex()
- ? (int) (System.currentTimeMillis() / 1000)
- : (int) (System.currentTimeMillis() / 1000) - cfs.metadata.getGcGraceSeconds();
+ return cfs.isIndex() ? (int) (System.currentTimeMillis() / 1000) : cfs.gcBefore(System.currentTimeMillis());
}
private static class ValidationCompactionIterable extends CompactionIterable
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
index e9cd2f1..32361b2 100644
--- a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
+++ b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
@@ -209,7 +209,7 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable
Column column = (Column) current;
container.addColumn(column);
if (indexer != SecondaryIndexManager.nullUpdater
- && !column.isMarkedForDelete()
+ && !column.isMarkedForDelete(System.currentTimeMillis())
&& !container.getColumn(column.name()).equals(column))
{
indexer.remove(column);
@@ -247,7 +247,7 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable
// PrecompactedRow.removeDeletedAndOldShards have only checked the top-level CF deletion times,
// not the range tombstone. For that we use the columnIndexer tombstone tracker.
// Note that this doesn't work for super columns.
- if (indexBuilder.tombstoneTracker().isDeleted(reduced))
+ if (indexBuilder.tombstoneTracker().isDeleted(reduced, System.currentTimeMillis()))
return null;
columns++;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java b/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java
index f69a955..85f2a23 100644
--- a/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java
+++ b/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java
@@ -133,7 +133,7 @@ public class PrecompactedRow extends AbstractCompactedRow
{
container.addColumn(column);
if (indexer != SecondaryIndexManager.nullUpdater
- && !column.isMarkedForDelete()
+ && !column.isMarkedForDelete(System.currentTimeMillis())
&& !container.getColumn(column.name()).equals(column))
{
indexer.remove(column);
@@ -149,7 +149,7 @@ public class PrecompactedRow extends AbstractCompactedRow
};
Iterator<Column> reduced = MergeIterator.get(data, fcomp, reducer);
- filter.collectReducedColumns(returnCF, reduced, CompactionManager.NO_GC);
+ filter.collectReducedColumns(returnCF, reduced, CompactionManager.NO_GC, System.currentTimeMillis());
}
public RowIndexEntry write(long currentPosition, DataOutput out) throws IOException
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/filter/ColumnCounter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/ColumnCounter.java b/src/java/org/apache/cassandra/db/filter/ColumnCounter.java
index 697ad02..a5d54ce 100644
--- a/src/java/org/apache/cassandra/db/filter/ColumnCounter.java
+++ b/src/java/org/apache/cassandra/db/filter/ColumnCounter.java
@@ -31,18 +31,24 @@ public class ColumnCounter
{
protected int live;
protected int ignored;
+ protected final long timestamp;
+
+ public ColumnCounter(long timestamp)
+ {
+ this.timestamp = timestamp;
+ }
public void count(Column column, ColumnFamily container)
{
- if (!isLive(column, container))
+ if (!isLive(column, container, timestamp))
ignored++;
else
live++;
}
- protected static boolean isLive(Column column, ColumnFamily container)
+ protected static boolean isLive(Column column, ColumnFamily container, long timestamp)
{
- return column.isLive() && (!container.deletionInfo().isDeleted(column));
+ return column.isLive(timestamp) && (!container.deletionInfo().isDeleted(column));
}
public int live()
@@ -71,8 +77,9 @@ public class ColumnCounter
* column. If 0, all columns are grouped, otherwise we group
* those for which the {@code toGroup} first component are equals.
*/
- public GroupByPrefix(CompositeType type, int toGroup)
+ public GroupByPrefix(long timestamp, CompositeType type, int toGroup)
{
+ super(timestamp);
this.type = type;
this.toGroup = toGroup;
@@ -81,7 +88,7 @@ public class ColumnCounter
public void count(Column column, ColumnFamily container)
{
- if (!isLive(column, container))
+ if (!isLive(column, container, timestamp))
{
ignored++;
return;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java b/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java
index 064cc6e..d061ac2 100644
--- a/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java
@@ -44,34 +44,42 @@ public abstract class ExtendedFilter
private static final Logger logger = LoggerFactory.getLogger(ExtendedFilter.class);
public final ColumnFamilyStore cfs;
+ public final long timestamp;
protected final IDiskAtomFilter originalFilter;
private final int maxResults;
private final boolean countCQL3Rows;
private final boolean isPaging;
- public static ExtendedFilter create(ColumnFamilyStore cfs, IDiskAtomFilter filter, List<IndexExpression> clause, int maxResults, boolean countCQL3Rows, boolean isPaging)
+ public static ExtendedFilter create(ColumnFamilyStore cfs,
+ List<IndexExpression> clause,
+ IDiskAtomFilter filter,
+ int maxResults,
+ long timestamp,
+ boolean countCQL3Rows,
+ boolean isPaging)
{
if (clause == null || clause.isEmpty())
{
- return new EmptyClauseFilter(cfs, filter, maxResults, countCQL3Rows, isPaging);
+ return new EmptyClauseFilter(cfs, filter, maxResults, timestamp, countCQL3Rows, isPaging);
}
else
{
if (isPaging)
throw new IllegalArgumentException("Cross-row paging is not supported along with index clauses");
return cfs.getComparator() instanceof CompositeType
- ? new FilterWithCompositeClauses(cfs, filter, clause, maxResults, countCQL3Rows)
- : new FilterWithClauses(cfs, filter, clause, maxResults, countCQL3Rows);
+ ? new FilterWithCompositeClauses(cfs, clause, filter, maxResults, timestamp, countCQL3Rows)
+ : new FilterWithClauses(cfs, clause, filter, maxResults, timestamp, countCQL3Rows);
}
}
- protected ExtendedFilter(ColumnFamilyStore cfs, IDiskAtomFilter filter, int maxResults, boolean countCQL3Rows, boolean isPaging)
+ protected ExtendedFilter(ColumnFamilyStore cfs, IDiskAtomFilter filter, int maxResults, long timestamp, boolean countCQL3Rows, boolean isPaging)
{
assert cfs != null;
assert filter != null;
this.cfs = cfs;
this.originalFilter = filter;
this.maxResults = maxResults;
+ this.timestamp = timestamp;
this.countCQL3Rows = countCQL3Rows;
this.isPaging = isPaging;
if (countCQL3Rows)
@@ -112,7 +120,7 @@ public abstract class ExtendedFilter
if (initialFilter() instanceof SliceQueryFilter)
return ((SliceQueryFilter)initialFilter()).lastCounted();
else
- return initialFilter().getLiveCount(data);
+ return initialFilter().getLiveCount(data, timestamp);
}
/** The initial filter we'll do our first slice with (either the original or a superset of it) */
@@ -167,9 +175,14 @@ public abstract class ExtendedFilter
protected final List<IndexExpression> clause;
protected final IDiskAtomFilter initialFilter;
- public FilterWithClauses(ColumnFamilyStore cfs, IDiskAtomFilter filter, List<IndexExpression> clause, int maxResults, boolean countCQL3Rows)
+ public FilterWithClauses(ColumnFamilyStore cfs,
+ List<IndexExpression> clause,
+ IDiskAtomFilter filter,
+ int maxResults,
+ long timestamp,
+ boolean countCQL3Rows)
{
- super(cfs, filter, maxResults, countCQL3Rows, false);
+ super(cfs, filter, maxResults, timestamp, countCQL3Rows, false);
assert clause != null;
this.clause = clause;
this.initialFilter = computeInitialFilter();
@@ -271,7 +284,7 @@ public abstract class ExtendedFilter
return data;
ColumnFamily pruned = data.cloneMeShallow();
OnDiskAtomIterator iter = originalFilter.getMemtableColumnIterator(data, null);
- originalFilter.collectReducedColumns(pruned, QueryFilter.gatherTombstones(pruned, iter), cfs.gcBefore());
+ originalFilter.collectReducedColumns(pruned, QueryFilter.gatherTombstones(pruned, iter), cfs.gcBefore(timestamp), timestamp);
return pruned;
}
@@ -335,9 +348,14 @@ public abstract class ExtendedFilter
private static class FilterWithCompositeClauses extends FilterWithClauses
{
- public FilterWithCompositeClauses(ColumnFamilyStore cfs, IDiskAtomFilter filter, List<IndexExpression> clause, int maxResults, boolean countCQL3Rows)
+ public FilterWithCompositeClauses(ColumnFamilyStore cfs,
+ List<IndexExpression> clause,
+ IDiskAtomFilter filter,
+ int maxResults,
+ long timestamp,
+ boolean countCQL3Rows)
{
- super(cfs, filter, clause, maxResults, countCQL3Rows);
+ super(cfs, clause, filter, maxResults, timestamp, countCQL3Rows);
}
/*
@@ -359,9 +377,14 @@ public abstract class ExtendedFilter
private static class EmptyClauseFilter extends ExtendedFilter
{
- public EmptyClauseFilter(ColumnFamilyStore cfs, IDiskAtomFilter filter, int maxResults, boolean countCQL3Rows, boolean isPaging)
+ public EmptyClauseFilter(ColumnFamilyStore cfs,
+ IDiskAtomFilter filter,
+ int maxResults,
+ long timestamp,
+ boolean countCQL3Rows,
+ boolean isPaging)
{
- super(cfs, filter, maxResults, countCQL3Rows, isPaging);
+ super(cfs, filter, maxResults, timestamp, countCQL3Rows, isPaging);
}
public IDiskAtomFilter initialFilter()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/filter/IDiskAtomFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/IDiskAtomFilter.java b/src/java/org/apache/cassandra/db/filter/IDiskAtomFilter.java
index e032f1d..35f71e5 100644
--- a/src/java/org/apache/cassandra/db/filter/IDiskAtomFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/IDiskAtomFilter.java
@@ -66,14 +66,14 @@ public interface IDiskAtomFilter
* by the filter code, which should have some limit on the number of columns
* to avoid running out of memory on large rows.
*/
- public void collectReducedColumns(ColumnFamily container, Iterator<Column> reducedColumns, int gcBefore);
+ public void collectReducedColumns(ColumnFamily container, Iterator<Column> reducedColumns, int gcBefore, long now);
public Comparator<Column> getColumnComparator(AbstractType<?> comparator);
public boolean isReversed();
public void updateColumnsLimit(int newLimit);
- public int getLiveCount(ColumnFamily cf);
+ public int getLiveCount(ColumnFamily cf, long now);
public IDiskAtomFilter cloneShallow();
public boolean maySelectPrefix(Comparator<ByteBuffer> cmp, ByteBuffer prefix);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java b/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java
index 570eb29..297f227 100644
--- a/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java
@@ -90,7 +90,7 @@ public class NamesQueryFilter implements IDiskAtomFilter
return new SSTableNamesIterator(sstable, file, key, columns, indexEntry);
}
- public void collectReducedColumns(ColumnFamily container, Iterator<Column> reducedColumns, int gcBefore)
+ public void collectReducedColumns(ColumnFamily container, Iterator<Column> reducedColumns, int gcBefore, long now)
{
while (reducedColumns.hasNext())
container.addIfRelevant(reducedColumns.next(), gcBefore);
@@ -118,15 +118,15 @@ public class NamesQueryFilter implements IDiskAtomFilter
{
}
- public int getLiveCount(ColumnFamily cf)
+ public int getLiveCount(ColumnFamily cf, long now)
{
if (countCQL3Rows)
- return cf.hasOnlyTombstones() ? 0 : 1;
+ return cf.hasOnlyTombstones(now) ? 0 : 1;
int count = 0;
for (Column column : cf)
{
- if (column.isLive())
+ if (column.isLive(now))
count++;
}
return count;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/filter/QueryFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/QueryFilter.java b/src/java/org/apache/cassandra/db/filter/QueryFilter.java
index 126b240..ac0c632 100644
--- a/src/java/org/apache/cassandra/db/filter/QueryFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/QueryFilter.java
@@ -33,12 +33,14 @@ public class QueryFilter
public final DecoratedKey key;
public final String cfName;
public final IDiskAtomFilter filter;
+ public final long timestamp;
- public QueryFilter(DecoratedKey key, String cfName, IDiskAtomFilter filter)
+ public QueryFilter(DecoratedKey key, String cfName, IDiskAtomFilter filter, long timestamp)
{
this.key = key;
this.cfName = cfName;
this.filter = filter;
+ this.timestamp = timestamp;
}
public OnDiskAtomIterator getMemtableColumnIterator(Memtable memtable)
@@ -79,7 +81,7 @@ public class QueryFilter
public void collateOnDiskAtom(ColumnFamily returnCF, Iterator<? extends OnDiskAtom> toCollate, int gcBefore)
{
Iterator<Column> columns = gatherTombstones(returnCF, toCollate);
- filter.collectReducedColumns(returnCF, columns, gcBefore);
+ filter.collectReducedColumns(returnCF, columns, gcBefore, timestamp);
}
public void collateColumns(final ColumnFamily returnCF, List<? extends Iterator<Column>> toCollate, final int gcBefore)
@@ -107,7 +109,7 @@ public class QueryFilter
};
Iterator<Column> reduced = MergeIterator.get(toCollate, fcomp, reducer);
- filter.collectReducedColumns(returnCF, reduced, gcBefore);
+ filter.collectReducedColumns(returnCF, reduced, gcBefore, timestamp);
}
/**
@@ -178,19 +180,26 @@ public class QueryFilter
* @param finish column to stop slice at, inclusive; empty for "the last column"
* @param reversed true to start with the largest column (as determined by configured sort order) instead of smallest
* @param limit maximum number of non-deleted columns to return
+ * @param timestamp time to use for determining expiring columns' state
*/
- public static QueryFilter getSliceFilter(DecoratedKey key, String cfName, ByteBuffer start, ByteBuffer finish, boolean reversed, int limit)
+ public static QueryFilter getSliceFilter(DecoratedKey key,
+ String cfName,
+ ByteBuffer start,
+ ByteBuffer finish,
+ boolean reversed,
+ int limit,
+ long timestamp)
{
- return new QueryFilter(key, cfName, new SliceQueryFilter(start, finish, reversed, limit));
+ return new QueryFilter(key, cfName, new SliceQueryFilter(start, finish, reversed, limit), timestamp);
}
/**
* return a QueryFilter object that includes every column in the row.
* This is dangerous on large rows; avoid except for test code.
*/
- public static QueryFilter getIdentityFilter(DecoratedKey key, String cfName)
+ public static QueryFilter getIdentityFilter(DecoratedKey key, String cfName, long timestamp)
{
- return new QueryFilter(key, cfName, new IdentityQueryFilter());
+ return new QueryFilter(key, cfName, new IdentityQueryFilter(), timestamp);
}
/**
@@ -199,17 +208,17 @@ public class QueryFilter
* @param cfName column family to query
* @param columns the column names to restrict the results to, sorted in comparator order
*/
- public static QueryFilter getNamesFilter(DecoratedKey key, String cfName, SortedSet<ByteBuffer> columns)
+ public static QueryFilter getNamesFilter(DecoratedKey key, String cfName, SortedSet<ByteBuffer> columns, long timestamp)
{
- return new QueryFilter(key, cfName, new NamesQueryFilter(columns));
+ return new QueryFilter(key, cfName, new NamesQueryFilter(columns), timestamp);
}
/**
* convenience method for creating a name filter matching a single column
*/
- public static QueryFilter getNamesFilter(DecoratedKey key, String cfName, ByteBuffer column)
+ public static QueryFilter getNamesFilter(DecoratedKey key, String cfName, ByteBuffer column, long timestamp)
{
- return new QueryFilter(key, cfName, new NamesQueryFilter(column));
+ return new QueryFilter(key, cfName, new NamesQueryFilter(column), timestamp);
}
@Override
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java b/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
index fda7ac5..b76ce04 100644
--- a/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
@@ -120,9 +120,9 @@ public class SliceQueryFilter implements IDiskAtomFilter
return reversed ? comparator.columnReverseComparator : comparator.columnComparator;
}
- public void collectReducedColumns(ColumnFamily container, Iterator<Column> reducedColumns, int gcBefore)
+ public void collectReducedColumns(ColumnFamily container, Iterator<Column> reducedColumns, int gcBefore, long now)
{
- columnCounter = getColumnCounter(container);
+ columnCounter = getColumnCounter(container, now);
while (reducedColumns.hasNext())
{
@@ -142,28 +142,28 @@ public class SliceQueryFilter implements IDiskAtomFilter
Tracing.trace("Read {} live and {} tombstoned cells", columnCounter.live(), columnCounter.ignored());
}
- public int getLiveCount(ColumnFamily cf)
+ public int getLiveCount(ColumnFamily cf, long now)
{
- ColumnCounter counter = getColumnCounter(cf);
+ ColumnCounter counter = getColumnCounter(cf, now);
for (Column column : cf)
counter.count(column, cf);
return counter.live();
}
- private ColumnCounter getColumnCounter(ColumnFamily container)
+ private ColumnCounter getColumnCounter(ColumnFamily container, long now)
{
AbstractType<?> comparator = container.getComparator();
if (compositesToGroup < 0)
- return new ColumnCounter();
+ return new ColumnCounter(now);
else if (compositesToGroup == 0)
- return new ColumnCounter.GroupByPrefix(null, 0);
+ return new ColumnCounter.GroupByPrefix(now, null, 0);
else
- return new ColumnCounter.GroupByPrefix((CompositeType)comparator, compositesToGroup);
+ return new ColumnCounter.GroupByPrefix(now, (CompositeType)comparator, compositesToGroup);
}
- public void trim(ColumnFamily cf, int trimTo)
+ public void trim(ColumnFamily cf, int trimTo, long now)
{
- ColumnCounter counter = getColumnCounter(cf);
+ ColumnCounter counter = getColumnCounter(cf, now);
Collection<Column> columns = reversed
? cf.getReverseSortedColumns()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java b/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java
index f12acdc..4d0914a 100644
--- a/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java
+++ b/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java
@@ -91,7 +91,7 @@ public abstract class AbstractSimplePerColumnSecondaryIndex extends PerColumnSec
public void delete(ByteBuffer rowKey, Column column)
{
- if (column.isMarkedForDelete())
+ if (column.isMarkedForDelete(System.currentTimeMillis()))
return;
DecoratedKey valueKey = getIndexKeyFor(getIndexedValue(rowKey, column));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java b/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
index af370d5..17ac81f 100644
--- a/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
+++ b/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
@@ -509,12 +509,17 @@ public class SecondaryIndexManager
* Performs a search across a number of column indexes
* TODO: add support for querying across index types
*
- * @param clause the index query clause
* @param range the row range to restrict to
- * @param dataFilter the column range to restrict to
+ * @param clause the index query clause
+ * @param columnFilter the column range to restrict to
* @return found indexed rows
*/
- public List<Row> search(List<IndexExpression> clause, AbstractBounds<RowPosition> range, int maxResults, IDiskAtomFilter dataFilter, boolean countCQL3Rows)
+ public List<Row> search(AbstractBounds<RowPosition> range,
+ List<IndexExpression> clause,
+ IDiskAtomFilter columnFilter,
+ int maxResults,
+ long now,
+ boolean countCQL3Rows)
{
List<SecondaryIndexSearcher> indexSearchers = getIndexSearchersForQuery(clause);
@@ -525,8 +530,7 @@ public class SecondaryIndexManager
if (indexSearchers.size() > 1)
throw new RuntimeException("Unable to search across multiple secondary index types");
-
- return indexSearchers.get(0).search(clause, range, maxResults, dataFilter, countCQL3Rows);
+ return indexSearchers.get(0).search(range, clause, columnFilter, maxResults, now, countCQL3Rows);
}
public Collection<SecondaryIndex> getIndexesByNames(Set<String> idxNames)
@@ -584,7 +588,7 @@ public class SecondaryIndexManager
public void insert(Column column)
{
- if (column.isMarkedForDelete())
+ if (column.isMarkedForDelete(System.currentTimeMillis()))
return;
for (SecondaryIndex index : indexFor(column.name()))
@@ -605,7 +609,7 @@ public class SecondaryIndexManager
{
// insert the new value before removing the old one, so we never have a period
// where the row is invisible to both queries (the opposite seems preferable); see CASSANDRA-5540
- if (!column.isMarkedForDelete())
+ if (!column.isMarkedForDelete(System.currentTimeMillis()))
((PerColumnSecondaryIndex) index).insert(key.key, column);
((PerColumnSecondaryIndex) index).delete(key.key, oldColumn);
}
@@ -614,7 +618,7 @@ public class SecondaryIndexManager
public void remove(Column column)
{
- if (column.isMarkedForDelete())
+ if (column.isMarkedForDelete(System.currentTimeMillis()))
return;
for (SecondaryIndex index : indexFor(column.name()))
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java b/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java
index 16ac091..ddd79dd 100644
--- a/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java
+++ b/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java
@@ -39,7 +39,12 @@ public abstract class SecondaryIndexSearcher
this.baseCfs = indexManager.baseCfs;
}
- public abstract List<Row> search(List<IndexExpression> clause, AbstractBounds<RowPosition> range, int maxResults, IDiskAtomFilter dataFilter, boolean countCQL3Rows);
+ public abstract List<Row> search(AbstractBounds<RowPosition> range,
+ List<IndexExpression> clause,
+ IDiskAtomFilter dataFilter,
+ int maxResults,
+ long now,
+ boolean countCQL3Rows);
/**
* @return true this index is able to handle given clauses.
@@ -49,16 +54,6 @@ public abstract class SecondaryIndexSearcher
return highestSelectivityPredicate(clause) != null;
}
- protected boolean isIndexValueStale(ColumnFamily liveData, ByteBuffer indexedColumnName, ByteBuffer indexedValue)
- {
- Column liveColumn = liveData.getColumn(indexedColumnName);
- if (liveColumn == null || liveColumn.isMarkedForDelete())
- return true;
-
- ByteBuffer liveValue = liveColumn.value();
- return 0 != liveData.metadata().getValueValidator(indexedColumnName).compare(indexedValue, liveValue);
- }
-
protected IndexExpression highestSelectivityPredicate(List<IndexExpression> clause)
{
IndexExpression best = null;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java b/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java
index 131b8c6..0720e83 100644
--- a/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java
+++ b/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java
@@ -93,7 +93,7 @@ public abstract class CompositesIndex extends AbstractSimplePerColumnSecondaryIn
public abstract IndexedEntry decodeEntry(DecoratedKey indexedValue, Column indexEntry);
- public abstract boolean isStale(IndexedEntry entry, ColumnFamily data);
+ public abstract boolean isStale(IndexedEntry entry, ColumnFamily data, long now);
public void delete(IndexedEntry entry)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnClusteringKey.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnClusteringKey.java b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnClusteringKey.java
index 8ad990e..b767d6c 100644
--- a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnClusteringKey.java
+++ b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnClusteringKey.java
@@ -107,9 +107,9 @@ public class CompositesIndexOnClusteringKey extends CompositesIndex
return true;
}
- public boolean isStale(IndexedEntry entry, ColumnFamily data)
+ public boolean isStale(IndexedEntry entry, ColumnFamily data, long now)
{
- return data == null || data.hasOnlyTombstones();
+ return data == null || data.hasOnlyTombstones(now);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnPartitionKey.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnPartitionKey.java b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnPartitionKey.java
index 2034c71..7a00de8 100644
--- a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnPartitionKey.java
+++ b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnPartitionKey.java
@@ -95,9 +95,9 @@ public class CompositesIndexOnPartitionKey extends CompositesIndex
return true;
}
- public boolean isStale(IndexedEntry entry, ColumnFamily data)
+ public boolean isStale(IndexedEntry entry, ColumnFamily data, long now)
{
- return data == null || data.hasOnlyTombstones();
+ return data == null || data.hasOnlyTombstones(now);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnRegular.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnRegular.java b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnRegular.java
index 40a2ee1..7159c23 100644
--- a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnRegular.java
+++ b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnRegular.java
@@ -91,11 +91,11 @@ public class CompositesIndexOnRegular extends CompositesIndex
&& comp.compare(components[columnDef.componentIndex], columnDef.name) == 0;
}
- public boolean isStale(IndexedEntry entry, ColumnFamily data)
+ public boolean isStale(IndexedEntry entry, ColumnFamily data, long now)
{
ByteBuffer bb = entry.indexedEntryNameBuilder.copy().add(columnDef.name).build();
Column liveColumn = data.getColumn(bb);
- if (liveColumn == null || liveColumn.isMarkedForDelete())
+ if (liveColumn == null || liveColumn.isMarkedForDelete(now))
return true;
ByteBuffer liveValue = liveColumn.value();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java b/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java
index e16d94d..e8c0a09 100644
--- a/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java
+++ b/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java
@@ -21,18 +21,18 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.cassandra.cql3.ColumnNameBuilder;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.filter.*;
-import org.apache.cassandra.db.index.AbstractSimplePerColumnSecondaryIndex;
import org.apache.cassandra.db.index.SecondaryIndexManager;
import org.apache.cassandra.db.index.SecondaryIndexSearcher;
import org.apache.cassandra.db.marshal.CompositeType;
import org.apache.cassandra.dht.AbstractBounds;
import org.apache.cassandra.thrift.IndexExpression;
import org.apache.cassandra.utils.ByteBufferUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
public class CompositesSearcher extends SecondaryIndexSearcher
{
@@ -44,10 +44,15 @@ public class CompositesSearcher extends SecondaryIndexSearcher
}
@Override
- public List<Row> search(List<IndexExpression> clause, AbstractBounds<RowPosition> range, int maxResults, IDiskAtomFilter dataFilter, boolean countCQL3Rows)
+ public List<Row> search(AbstractBounds<RowPosition> range,
+ List<IndexExpression> clause,
+ IDiskAtomFilter dataFilter,
+ int maxResults,
+ long now,
+ boolean countCQL3Rows)
{
assert clause != null && !clause.isEmpty();
- ExtendedFilter filter = ExtendedFilter.create(baseCfs, dataFilter, clause, maxResults, countCQL3Rows, false);
+ ExtendedFilter filter = ExtendedFilter.create(baseCfs, clause, dataFilter, maxResults, now, countCQL3Rows, false);
return baseCfs.filter(getIndexedIterator(range, filter), filter);
}
@@ -69,7 +74,7 @@ public class CompositesSearcher extends SecondaryIndexSearcher
return isStart ? builder.build() : builder.buildAsEndOfRange();
}
- public ColumnFamilyStore.AbstractScanIterator getIndexedIterator(final AbstractBounds<RowPosition> range, final ExtendedFilter filter)
+ private ColumnFamilyStore.AbstractScanIterator getIndexedIterator(final AbstractBounds<RowPosition> range, final ExtendedFilter filter)
{
// Start with the most-restrictive indexed clause, then apply remaining clauses
// to each row matching that clause.
@@ -80,8 +85,7 @@ public class CompositesSearcher extends SecondaryIndexSearcher
final DecoratedKey indexKey = index.getIndexKeyFor(primary.value);
if (logger.isDebugEnabled())
- logger.debug("Most-selective indexed predicate is {}",
- ((AbstractSimplePerColumnSecondaryIndex) index).expressionString(primary));
+ logger.debug("Most-selective indexed predicate is {}", index.expressionString(primary));
/*
* XXX: If the range requested is a token range, we'll have to start at the beginning (and stop at the end) of
@@ -155,14 +159,15 @@ public class CompositesSearcher extends SecondaryIndexSearcher
if (logger.isTraceEnabled())
logger.trace("Scanning index {} starting with {}",
- ((AbstractSimplePerColumnSecondaryIndex)index).expressionString(primary), indexComparator.getString(startPrefix));
+ index.expressionString(primary), indexComparator.getString(startPrefix));
QueryFilter indexFilter = QueryFilter.getSliceFilter(indexKey,
index.getIndexCfs().name,
lastSeenPrefix,
endPrefix,
false,
- rowsPerQuery);
+ rowsPerQuery,
+ filter.timestamp);
ColumnFamily indexRow = index.getIndexCfs().getColumnFamily(indexFilter);
if (indexRow == null || indexRow.getColumnCount() == 0)
return makeReturn(currentKey, data);
@@ -185,7 +190,7 @@ public class CompositesSearcher extends SecondaryIndexSearcher
{
Column column = indexColumns.poll();
lastSeenPrefix = column.name();
- if (column.isMarkedForDelete())
+ if (column.isMarkedForDelete(filter.timestamp))
{
logger.trace("skipping {}", column.name());
continue;
@@ -242,8 +247,8 @@ public class CompositesSearcher extends SecondaryIndexSearcher
false,
Integer.MAX_VALUE,
baseCfs.metadata.clusteringKeyColumns().size());
- ColumnFamily newData = baseCfs.getColumnFamily(new QueryFilter(dk, baseCfs.name, dataFilter));
- if (index.isStale(entry, newData))
+ ColumnFamily newData = baseCfs.getColumnFamily(new QueryFilter(dk, baseCfs.name, dataFilter, filter.timestamp));
+ if (index.isStale(entry, newData, filter.timestamp))
{
index.delete(entry);
continue;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/index/keys/KeysIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/keys/KeysIndex.java b/src/java/org/apache/cassandra/db/index/keys/KeysIndex.java
index cd89e92..f47b1e1 100644
--- a/src/java/org/apache/cassandra/db/index/keys/KeysIndex.java
+++ b/src/java/org/apache/cassandra/db/index/keys/KeysIndex.java
@@ -48,10 +48,10 @@ public class KeysIndex extends AbstractSimplePerColumnSecondaryIndex
return new KeysSearcher(baseCfs.indexManager, columns);
}
- public boolean isIndexEntryStale(ByteBuffer indexedValue, ColumnFamily data)
+ public boolean isIndexEntryStale(ByteBuffer indexedValue, ColumnFamily data, long now)
{
Column liveColumn = data.getColumn(columnDef.name);
- if (liveColumn == null || liveColumn.isMarkedForDelete())
+ if (liveColumn == null || liveColumn.isMarkedForDelete(now))
return true;
ByteBuffer liveValue = liveColumn.value();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java b/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java
index 8901bf5..e919d8a 100644
--- a/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java
+++ b/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java
@@ -21,6 +21,9 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.filter.*;
import org.apache.cassandra.db.index.AbstractSimplePerColumnSecondaryIndex;
@@ -33,8 +36,6 @@ import org.apache.cassandra.dht.Range;
import org.apache.cassandra.thrift.IndexExpression;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.HeapAllocator;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
public class KeysSearcher extends SecondaryIndexSearcher
{
@@ -46,14 +47,19 @@ public class KeysSearcher extends SecondaryIndexSearcher
}
@Override
- public List<Row> search(List<IndexExpression> clause, AbstractBounds<RowPosition> range, int maxResults, IDiskAtomFilter dataFilter, boolean countCQL3Rows)
+ public List<Row> search(AbstractBounds<RowPosition> range,
+ List<IndexExpression> clause,
+ IDiskAtomFilter dataFilter,
+ int maxResults,
+ long now,
+ boolean countCQL3Rows)
{
assert clause != null && !clause.isEmpty();
- ExtendedFilter filter = ExtendedFilter.create(baseCfs, dataFilter, clause, maxResults, countCQL3Rows, false);
+ ExtendedFilter filter = ExtendedFilter.create(baseCfs, clause, dataFilter, maxResults, now, countCQL3Rows, false);
return baseCfs.filter(getIndexedIterator(range, filter), filter);
}
- public ColumnFamilyStore.AbstractScanIterator getIndexedIterator(final AbstractBounds<RowPosition> range, final ExtendedFilter filter)
+ private ColumnFamilyStore.AbstractScanIterator getIndexedIterator(final AbstractBounds<RowPosition> range, final ExtendedFilter filter)
{
// Start with the most-restrictive indexed clause, then apply remaining clauses
// to each row matching that clause.
@@ -106,7 +112,8 @@ public class KeysSearcher extends SecondaryIndexSearcher
lastSeenKey,
endKey,
false,
- rowsPerQuery);
+ rowsPerQuery,
+ filter.timestamp);
ColumnFamily indexRow = index.getIndexCfs().getColumnFamily(indexFilter);
logger.trace("fetched {}", indexRow);
if (indexRow == null)
@@ -139,7 +146,7 @@ public class KeysSearcher extends SecondaryIndexSearcher
{
Column column = indexColumns.next();
lastSeenKey = column.name();
- if (column.isMarkedForDelete())
+ if (column.isMarkedForDelete(filter.timestamp))
{
logger.trace("skipping {}", column.name());
continue;
@@ -158,7 +165,7 @@ public class KeysSearcher extends SecondaryIndexSearcher
}
logger.trace("Returning index hit for {}", dk);
- ColumnFamily data = baseCfs.getColumnFamily(new QueryFilter(dk, baseCfs.name, filter.initialFilter()));
+ ColumnFamily data = baseCfs.getColumnFamily(new QueryFilter(dk, baseCfs.name, filter.initialFilter(), filter.timestamp));
// While the column family we'll get in the end should contains the primary clause column, the initialFilter may not have found it and can thus be null
if (data == null)
data = TreeMapBackedSortedColumns.factory.create(baseCfs.metadata);
@@ -168,12 +175,12 @@ public class KeysSearcher extends SecondaryIndexSearcher
IDiskAtomFilter extraFilter = filter.getExtraFilter(data);
if (extraFilter != null)
{
- ColumnFamily cf = baseCfs.getColumnFamily(new QueryFilter(dk, baseCfs.name, extraFilter));
+ ColumnFamily cf = baseCfs.getColumnFamily(new QueryFilter(dk, baseCfs.name, extraFilter, filter.timestamp));
if (cf != null)
data.addAll(cf, HeapAllocator.instance);
}
- if (((KeysIndex)index).isIndexEntryStale(indexKey.key, data))
+ if (((KeysIndex)index).isIndexEntryStale(indexKey.key, data, filter.timestamp))
{
// delete the index entry w/ its own timestamp
Column dummyColumn = new Column(primary.column_name, indexKey.key, column.timestamp());
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java b/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
index d35e14d..65164ef 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
@@ -181,7 +181,10 @@ public class SSTableIdentityIterator implements Comparable<SSTableIdentityIterat
{
ColumnFamily cf = columnFamily.cloneMeShallow(containerFactory, false);
// since we already read column count, just pass that value and continue deserialization
- columnFamily.serializer.deserializeColumnsFromSSTable(in, cf, columnCount, flag, expireBefore, dataVersion);
+ Iterator<OnDiskAtom> iter = cf.metadata().getOnDiskIterator(in, columnCount, flag, expireBefore, dataVersion);
+ while (iter.hasNext())
+ cf.addAtom(iter.next());
+
if (validateColumns)
{
try
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
index 8119388..f63ea6a 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
@@ -1076,12 +1076,12 @@ public class SSTableReader extends SSTable
return getScanner((RateLimiter) null);
}
- public SSTableScanner getScanner(RateLimiter limiter)
- {
- return new SSTableScanner(this, null, limiter);
- }
+ public SSTableScanner getScanner(RateLimiter limiter)
+ {
+ return new SSTableScanner(this, null, limiter);
+ }
- /**
+ /**
* Direct I/O SSTableScanner over a defined range of tokens.
*
* @param range the range of keys to cover
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/service/ActiveRepairService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java
index b692ab0..82ecbe9 100644
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@ -836,7 +836,7 @@ public class ActiveRepairService
if (isSequential)
makeSnapshots(endpoints);
- int gcBefore = (int)(System.currentTimeMillis()/1000) - Table.open(tablename).getColumnFamilyStore(cfname).metadata.getGcGraceSeconds();
+ int gcBefore = Table.open(tablename).getColumnFamilyStore(cfname).gcBefore(System.currentTimeMillis());
for (InetAddress endpoint : allEndpoints)
treeRequests.add(new TreeRequest(getName(), endpoint, range, gcBefore, new CFPair(tablename, cfname)));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/service/CacheService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/CacheService.java b/src/java/org/apache/cassandra/service/CacheService.java
index d301507..b3cef49 100644
--- a/src/java/org/apache/cassandra/service/CacheService.java
+++ b/src/java/org/apache/cassandra/service/CacheService.java
@@ -300,7 +300,7 @@ public class CacheService implements CacheServiceMBean
public Pair<RowCacheKey, IRowCacheEntry> call() throws Exception
{
DecoratedKey key = cfs.partitioner.decorateKey(buffer);
- ColumnFamily data = cfs.getTopLevelColumns(QueryFilter.getIdentityFilter(key, cfs.name), Integer.MIN_VALUE);
+ ColumnFamily data = cfs.getTopLevelColumns(QueryFilter.getIdentityFilter(key, cfs.name, Long.MIN_VALUE), Integer.MIN_VALUE);
return Pair.create(new RowCacheKey(cfs.metadata.cfId, key), (IRowCacheEntry) data);
}
});
@@ -311,7 +311,7 @@ public class CacheService implements CacheServiceMBean
for (ByteBuffer key : buffers)
{
DecoratedKey dk = cfs.partitioner.decorateKey(key);
- ColumnFamily data = cfs.getTopLevelColumns(QueryFilter.getIdentityFilter(dk, cfs.name), Integer.MIN_VALUE);
+ ColumnFamily data = cfs.getTopLevelColumns(QueryFilter.getIdentityFilter(dk, cfs.name, Long.MIN_VALUE), Integer.MIN_VALUE);
if (data != null)
rowCache.put(new RowCacheKey(cfs.metadata.cfId, dk), data);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java b/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
index 5ffcdbe..4aab87d 100644
--- a/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
+++ b/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
@@ -48,13 +48,15 @@ public class RangeSliceResponseResolver implements IResponseResolver<RangeSliceR
};
private final String table;
+ private final long timestamp;
private List<InetAddress> sources;
protected final Collection<MessageIn<RangeSliceReply>> responses = new ConcurrentLinkedQueue<MessageIn<RangeSliceReply>>();
public final List<AsyncOneResponse> repairResults = new ArrayList<AsyncOneResponse>();
- public RangeSliceResponseResolver(String table)
+ public RangeSliceResponseResolver(String table, long timestamp)
{
this.table = table;
+ this.timestamp = timestamp;
}
public void setSources(List<InetAddress> endpoints)
@@ -142,7 +144,7 @@ public class RangeSliceResponseResolver implements IResponseResolver<RangeSliceR
protected Row getReduced()
{
ColumnFamily resolved = versions.size() > 1
- ? RowDataResolver.resolveSuperset(versions)
+ ? RowDataResolver.resolveSuperset(versions, timestamp)
: versions.get(0);
if (versions.size() < sources.size())
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java b/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java
index b6f257e..f63fcb1 100644
--- a/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java
+++ b/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java
@@ -35,9 +35,20 @@ public class RangeSliceVerbHandler implements IVerbHandler<RangeSliceCommand>
{
ColumnFamilyStore cfs = Table.open(command.keyspace).getColumnFamilyStore(command.column_family);
if (cfs.indexManager.hasIndexFor(command.row_filter))
- return cfs.search(command.row_filter, command.range, command.maxResults, command.predicate, command.countCQL3Rows);
+ return cfs.search(command.range,
+ command.row_filter,
+ command.predicate,
+ command.maxResults,
+ command.timestamp,
+ command.countCQL3Rows);
else
- return cfs.getRangeSlice(command.range, command.maxResults, command.predicate, command.row_filter, command.countCQL3Rows, command.isPaging);
+ return cfs.getRangeSlice(command.range,
+ command.row_filter,
+ command.predicate,
+ command.maxResults,
+ command.timestamp,
+ command.countCQL3Rows,
+ command.isPaging);
}
public void doVerb(MessageIn<RangeSliceCommand> message, int id)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/service/ReadCallback.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ReadCallback.java b/src/java/org/apache/cassandra/service/ReadCallback.java
index fe7f4d7..7cb5a23 100644
--- a/src/java/org/apache/cassandra/service/ReadCallback.java
+++ b/src/java/org/apache/cassandra/service/ReadCallback.java
@@ -186,7 +186,7 @@ public class ReadCallback<TMessage, TResolved> implements IAsyncCallback<TMessag
ReadRepairMetrics.repairedBackground.mark();
ReadCommand readCommand = (ReadCommand) command;
- final RowDataResolver repairResolver = new RowDataResolver(readCommand.table, readCommand.key, readCommand.filter());
+ final RowDataResolver repairResolver = new RowDataResolver(readCommand.table, readCommand.key, readCommand.filter(), readCommand.timestamp);
AsyncRepairCallback repairHandler = new AsyncRepairCallback(repairResolver, endpoints.size());
MessageOut<ReadCommand> message = ((ReadCommand) command).createMessage();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/service/RowDataResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/RowDataResolver.java b/src/java/org/apache/cassandra/service/RowDataResolver.java
index 8533f4f..69cd381 100644
--- a/src/java/org/apache/cassandra/service/RowDataResolver.java
+++ b/src/java/org/apache/cassandra/service/RowDataResolver.java
@@ -39,11 +39,13 @@ public class RowDataResolver extends AbstractRowResolver
private int maxLiveCount = 0;
public List<AsyncOneResponse> repairResults = Collections.emptyList();
private final IDiskAtomFilter filter;
+ private final long timestamp;
- public RowDataResolver(String table, ByteBuffer key, IDiskAtomFilter qFilter)
+ public RowDataResolver(String table, ByteBuffer key, IDiskAtomFilter qFilter, long timestamp)
{
super(key, table);
this.filter = qFilter;
+ this.timestamp = timestamp;
}
/*
@@ -74,12 +76,12 @@ public class RowDataResolver extends AbstractRowResolver
endpoints.add(message.from);
// compute maxLiveCount to prevent short reads -- see https://issues.apache.org/jira/browse/CASSANDRA-2643
- int liveCount = cf == null ? 0 : filter.getLiveCount(cf);
+ int liveCount = cf == null ? 0 : filter.getLiveCount(cf, timestamp);
if (liveCount > maxLiveCount)
maxLiveCount = liveCount;
}
- resolved = resolveSuperset(versions);
+ resolved = resolveSuperset(versions, timestamp);
if (logger.isDebugEnabled())
logger.debug("versions merged");
@@ -125,7 +127,7 @@ public class RowDataResolver extends AbstractRowResolver
return results;
}
- static ColumnFamily resolveSuperset(Iterable<ColumnFamily> versions)
+ static ColumnFamily resolveSuperset(Iterable<ColumnFamily> versions, long now)
{
assert Iterables.size(versions) > 0;
@@ -146,7 +148,7 @@ public class RowDataResolver extends AbstractRowResolver
// mimic the collectCollatedColumn + removeDeleted path that getColumnFamily takes.
// this will handle removing columns and subcolumns that are supressed by a row or
// supercolumn tombstone.
- QueryFilter filter = new QueryFilter(null, resolved.metadata().cfName, new IdentityQueryFilter());
+ QueryFilter filter = new QueryFilter(null, resolved.metadata().cfName, new IdentityQueryFilter(), now);
List<CloseableIterator<Column>> iters = new ArrayList<CloseableIterator<Column>>();
for (ColumnFamily version : versions)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 1383be7..9d2f6c1 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -212,9 +212,13 @@ public class StorageProxy implements StorageProxyMBean
// read the current value and compare with expected
Tracing.trace("Reading existing values for CAS precondition");
- ReadCommand readCommand = expected == null
- ? new SliceFromReadCommand(table, key, cfName, new SliceQueryFilter(ByteBufferUtil.EMPTY_BYTE_BUFFER, ByteBufferUtil.EMPTY_BYTE_BUFFER, false, 1))
- : new SliceByNamesReadCommand(table, key, cfName, new NamesQueryFilter(ImmutableSortedSet.copyOf(expected.getColumnNames())));
+ long timestamp = System.currentTimeMillis();
+ IDiskAtomFilter filter = expected == null
+ ? new SliceQueryFilter(ByteBufferUtil.EMPTY_BYTE_BUFFER, ByteBufferUtil.EMPTY_BYTE_BUFFER, false, 1)
+ : new NamesQueryFilter(ImmutableSortedSet.copyOf(expected.getColumnNames()));
+ ReadCommand readCommand = filter instanceof SliceQueryFilter
+ ? new SliceFromReadCommand(table, key, cfName, timestamp, (SliceQueryFilter) filter)
+ : new SliceByNamesReadCommand(table, key, cfName, timestamp, (NamesQueryFilter) filter);
List<Row> rows = read(Arrays.asList(readCommand), ConsistencyLevel.QUORUM);
ColumnFamily current = rows.get(0).cf;
if (!casApplies(expected, current))
@@ -245,16 +249,18 @@ public class StorageProxy implements StorageProxyMBean
throw new WriteTimeoutException(WriteType.CAS, ConsistencyLevel.SERIAL, -1, -1);
}
- private static boolean hasLiveColumns(ColumnFamily cf)
+ private static boolean hasLiveColumns(ColumnFamily cf, long now)
{
- return cf != null && !cf.hasOnlyTombstones();
+ return cf != null && !cf.hasOnlyTombstones(now);
}
private static boolean casApplies(ColumnFamily expected, ColumnFamily current)
{
- if (!hasLiveColumns(expected))
- return !hasLiveColumns(current);
- else if (!hasLiveColumns(current))
+ long now = System.currentTimeMillis();
+
+ if (!hasLiveColumns(expected, now))
+ return !hasLiveColumns(current, now);
+ else if (!hasLiveColumns(current, now))
return false;
// current has been built from expected, so we know that it can't have columns
@@ -264,14 +270,14 @@ public class StorageProxy implements StorageProxyMBean
for (Column e : expected)
{
Column c = current.getColumn(e.name());
- if (e.isLive())
+ if (e.isLive(now))
{
- if (!(c != null && c.isLive() && c.value().equals(e.value())))
+ if (!(c != null && c.isLive(now) && c.value().equals(e.value())))
return false;
}
else
{
- if (c != null && c.isLive())
+ if (c != null && c.isLive(now))
return false;
}
}
@@ -1179,7 +1185,7 @@ public class StorageProxy implements StorageProxyMBean
ReadRepairMetrics.repairedBlocking.mark();
// Do a full data read to resolve the correct response (and repair node that need be)
- RowDataResolver resolver = new RowDataResolver(exec.command.table, exec.command.key, exec.command.filter());
+ RowDataResolver resolver = new RowDataResolver(exec.command.table, exec.command.key, exec.command.filter(), exec.command.timestamp);
ReadCallback<ReadResponse, Row> repairHandler = exec.handler.withNewResolver(resolver);
if (repairCommands == null)
@@ -1397,6 +1403,7 @@ public class StorageProxy implements StorageProxyMBean
RangeSliceCommand nodeCmd = new RangeSliceCommand(command.keyspace,
command.column_family,
+ command.timestamp,
commandPredicate,
range,
command.row_filter,
@@ -1405,7 +1412,7 @@ public class StorageProxy implements StorageProxyMBean
command.isPaging);
// collect replies and resolve according to consistency level
- RangeSliceResponseResolver resolver = new RangeSliceResponseResolver(nodeCmd.keyspace);
+ RangeSliceResponseResolver resolver = new RangeSliceResponseResolver(nodeCmd.keyspace, command.timestamp);
ReadCallback<RangeSliceReply, Iterable<Row>> handler = new ReadCallback(resolver, consistency_level, nodeCmd, filteredEndpoints);
handler.assureSufficientLiveNodes();
resolver.setSources(filteredEndpoints);
@@ -1431,7 +1438,7 @@ public class StorageProxy implements StorageProxyMBean
{
rows.add(row);
if (nodeCmd.countCQL3Rows)
- cql3RowCount += row.getLiveCount(commandPredicate);
+ cql3RowCount += row.getLiveCount(commandPredicate, command.timestamp);
}
FBUtilities.waitOnFutures(resolver.repairResults, DatabaseDescriptor.getWriteRpcTimeout());
}