You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2013/06/18 18:16:02 UTC
[1/4] Include a timestamp with all read commands to determine column
expiration
Updated Branches:
refs/heads/trunk 62295f68c -> 1f7628ce7
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/test/unit/org/apache/cassandra/db/ReadMessageTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/ReadMessageTest.java b/test/unit/org/apache/cassandra/db/ReadMessageTest.java
index 87700fe..cc15fe7 100644
--- a/test/unit/org/apache/cassandra/db/ReadMessageTest.java
+++ b/test/unit/org/apache/cassandra/db/ReadMessageTest.java
@@ -48,24 +48,25 @@ public class ReadMessageTest extends SchemaLoader
ReadCommand rm, rm2;
DecoratedKey dk = Util.dk("row1");
+ long ts = System.currentTimeMillis();
- rm = new SliceByNamesReadCommand("Keyspace1", dk.key, "Standard1", new NamesQueryFilter(colList));
+ rm = new SliceByNamesReadCommand("Keyspace1", dk.key, "Standard1", ts, new NamesQueryFilter(colList));
rm2 = serializeAndDeserializeReadMessage(rm);
assert rm2.toString().equals(rm.toString());
- rm = new SliceFromReadCommand("Keyspace1", dk.key, "Standard1", new SliceQueryFilter(ByteBufferUtil.EMPTY_BYTE_BUFFER, ByteBufferUtil.EMPTY_BYTE_BUFFER, true, 2));
+ rm = new SliceFromReadCommand("Keyspace1", dk.key, "Standard1", ts, new SliceQueryFilter(ByteBufferUtil.EMPTY_BYTE_BUFFER, ByteBufferUtil.EMPTY_BYTE_BUFFER, true, 2));
rm2 = serializeAndDeserializeReadMessage(rm);
assert rm2.toString().equals(rm.toString());
- rm = new SliceFromReadCommand("Keyspace1", dk.key, "Standard1", new SliceQueryFilter(ByteBufferUtil.bytes("a"), ByteBufferUtil.bytes("z"), true, 5));
+ rm = new SliceFromReadCommand("Keyspace1", dk.key, "Standard1", ts, new SliceQueryFilter(ByteBufferUtil.bytes("a"), ByteBufferUtil.bytes("z"), true, 5));
rm2 = serializeAndDeserializeReadMessage(rm);
assertEquals(rm2.toString(), rm.toString());
- rm = new SliceFromReadCommand("Keyspace1", dk.key, "Standard1", new SliceQueryFilter(ByteBufferUtil.EMPTY_BYTE_BUFFER, ByteBufferUtil.EMPTY_BYTE_BUFFER, true, 2));
+ rm = new SliceFromReadCommand("Keyspace1", dk.key, "Standard1", ts, new SliceQueryFilter(ByteBufferUtil.EMPTY_BYTE_BUFFER, ByteBufferUtil.EMPTY_BYTE_BUFFER, true, 2));
rm2 = serializeAndDeserializeReadMessage(rm);
assert rm2.toString().equals(rm.toString());
- rm = new SliceFromReadCommand("Keyspace1", dk.key, "Standard1", new SliceQueryFilter(ByteBufferUtil.bytes("a"), ByteBufferUtil.bytes("z"), true, 5));
+ rm = new SliceFromReadCommand("Keyspace1", dk.key, "Standard1", ts, new SliceQueryFilter(ByteBufferUtil.bytes("a"), ByteBufferUtil.bytes("z"), true, 5));
rm2 = serializeAndDeserializeReadMessage(rm);
assertEquals(rm2.toString(), rm.toString());
}
@@ -93,7 +94,7 @@ public class ReadMessageTest extends SchemaLoader
rm.add("Standard1", ByteBufferUtil.bytes("Column1"), ByteBufferUtil.bytes("abcd"), 0);
rm.apply();
- ReadCommand command = new SliceByNamesReadCommand("Keyspace1", dk.key, "Standard1", new NamesQueryFilter(ByteBufferUtil.bytes("Column1")));
+ ReadCommand command = new SliceByNamesReadCommand("Keyspace1", dk.key, "Standard1", System.currentTimeMillis(), new NamesQueryFilter(ByteBufferUtil.bytes("Column1")));
Row row = command.getRow(table);
Column col = row.cf.getColumn(ByteBufferUtil.bytes("Column1"));
assertEquals(col.value(), ByteBuffer.wrap("abcd".getBytes()));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/test/unit/org/apache/cassandra/db/RecoveryManagerTruncateTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/RecoveryManagerTruncateTest.java b/test/unit/org/apache/cassandra/db/RecoveryManagerTruncateTest.java
index 38b3385..5e34e91 100644
--- a/test/unit/org/apache/cassandra/db/RecoveryManagerTruncateTest.java
+++ b/test/unit/org/apache/cassandra/db/RecoveryManagerTruncateTest.java
@@ -72,8 +72,10 @@ public class RecoveryManagerTruncateTest extends SchemaLoader
{
return null;
}
- cf = cfStore.getColumnFamily(QueryFilter.getNamesFilter(
- Util.dk(keyName), cfName, ByteBufferUtil.bytes(columnName)));
+ cf = cfStore.getColumnFamily(QueryFilter.getNamesFilter(Util.dk(keyName),
+ cfName,
+ ByteBufferUtil.bytes(columnName),
+ System.currentTimeMillis()));
if (cf == null)
{
return null;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/test/unit/org/apache/cassandra/db/RemoveColumnFamilyTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/RemoveColumnFamilyTest.java b/test/unit/org/apache/cassandra/db/RemoveColumnFamilyTest.java
index fb42361..345f967 100644
--- a/test/unit/org/apache/cassandra/db/RemoveColumnFamilyTest.java
+++ b/test/unit/org/apache/cassandra/db/RemoveColumnFamilyTest.java
@@ -51,7 +51,7 @@ public class RemoveColumnFamilyTest extends SchemaLoader
rm.delete("Standard1", 1);
rm.apply();
- ColumnFamily retrieved = store.getColumnFamily(QueryFilter.getIdentityFilter(dk, "Standard1"));
+ ColumnFamily retrieved = store.getColumnFamily(QueryFilter.getIdentityFilter(dk, "Standard1", System.currentTimeMillis()));
assert retrieved.isMarkedForDelete();
assertNull(retrieved.getColumn(ByteBufferUtil.bytes("Column1")));
assertNull(Util.cloneAndRemoveDeleted(retrieved, Integer.MAX_VALUE));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/test/unit/org/apache/cassandra/db/RemoveColumnFamilyWithFlush1Test.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/RemoveColumnFamilyWithFlush1Test.java b/test/unit/org/apache/cassandra/db/RemoveColumnFamilyWithFlush1Test.java
index b32b3a9..bd97e00 100644
--- a/test/unit/org/apache/cassandra/db/RemoveColumnFamilyWithFlush1Test.java
+++ b/test/unit/org/apache/cassandra/db/RemoveColumnFamilyWithFlush1Test.java
@@ -53,7 +53,7 @@ public class RemoveColumnFamilyWithFlush1Test extends SchemaLoader
rm.delete("Standard1", 1);
rm.apply();
- ColumnFamily retrieved = store.getColumnFamily(QueryFilter.getIdentityFilter(dk, "Standard1"));
+ ColumnFamily retrieved = store.getColumnFamily(QueryFilter.getIdentityFilter(dk, "Standard1", System.currentTimeMillis()));
assert retrieved.isMarkedForDelete();
assertNull(retrieved.getColumn(ByteBufferUtil.bytes("Column1")));
assertNull(Util.cloneAndRemoveDeleted(retrieved, Integer.MAX_VALUE));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/test/unit/org/apache/cassandra/db/RemoveColumnFamilyWithFlush2Test.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/RemoveColumnFamilyWithFlush2Test.java b/test/unit/org/apache/cassandra/db/RemoveColumnFamilyWithFlush2Test.java
index d59d09b..037c5dd 100644
--- a/test/unit/org/apache/cassandra/db/RemoveColumnFamilyWithFlush2Test.java
+++ b/test/unit/org/apache/cassandra/db/RemoveColumnFamilyWithFlush2Test.java
@@ -51,7 +51,7 @@ public class RemoveColumnFamilyWithFlush2Test extends SchemaLoader
rm.apply();
store.forceBlockingFlush();
- ColumnFamily retrieved = store.getColumnFamily(QueryFilter.getIdentityFilter(dk, "Standard1"));
+ ColumnFamily retrieved = store.getColumnFamily(QueryFilter.getIdentityFilter(dk, "Standard1", System.currentTimeMillis()));
assert retrieved.isMarkedForDelete();
assertNull(retrieved.getColumn(ByteBufferUtil.bytes("Column1")));
assertNull(Util.cloneAndRemoveDeleted(retrieved, Integer.MAX_VALUE));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/test/unit/org/apache/cassandra/db/RemoveColumnTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/RemoveColumnTest.java b/test/unit/org/apache/cassandra/db/RemoveColumnTest.java
index 4c3f3aa..eccbfbc 100644
--- a/test/unit/org/apache/cassandra/db/RemoveColumnTest.java
+++ b/test/unit/org/apache/cassandra/db/RemoveColumnTest.java
@@ -54,10 +54,16 @@ public class RemoveColumnTest extends SchemaLoader
rm.delete("Standard1", ByteBufferUtil.bytes("Column1"), 1);
rm.apply();
- ColumnFamily retrieved = store.getColumnFamily(QueryFilter.getNamesFilter(dk, "Standard1", ByteBufferUtil.bytes("Column1")));
- assert retrieved.getColumn(ByteBufferUtil.bytes("Column1")).isMarkedForDelete();
+ ColumnFamily retrieved = store.getColumnFamily(QueryFilter.getNamesFilter(dk,
+ "Standard1",
+ ByteBufferUtil.bytes("Column1"),
+ System.currentTimeMillis()));
+ assert retrieved.getColumn(ByteBufferUtil.bytes("Column1")).isMarkedForDelete(System.currentTimeMillis());
assertNull(Util.cloneAndRemoveDeleted(retrieved, Integer.MAX_VALUE));
- assertNull(Util.cloneAndRemoveDeleted(store.getColumnFamily(QueryFilter.getIdentityFilter(dk, "Standard1")), Integer.MAX_VALUE));
+ assertNull(Util.cloneAndRemoveDeleted(store.getColumnFamily(QueryFilter.getIdentityFilter(dk,
+ "Standard1",
+ System.currentTimeMillis())),
+ Integer.MAX_VALUE));
}
@Test
@@ -67,15 +73,15 @@ public class RemoveColumnTest extends SchemaLoader
long timestamp = System.currentTimeMillis();
int localDeletionTime = (int) (timestamp / 1000);
Column c = DeletedColumn.create(localDeletionTime, timestamp, "dc1");
- assertTrue("DeletedColumn was not marked for delete", c.isMarkedForDelete());
+ assertTrue("DeletedColumn was not marked for delete", c.isMarkedForDelete(timestamp));
// Simulate a node that is 30 seconds behind
c = DeletedColumn.create(localDeletionTime + 30, timestamp + 30000, "dc2");
- assertTrue("DeletedColumn was not marked for delete", c.isMarkedForDelete());
+ assertTrue("DeletedColumn was not marked for delete", c.isMarkedForDelete(timestamp));
// Simulate a node that is 30 seconds ahead
c = DeletedColumn.create(localDeletionTime - 30, timestamp - 30000, "dc3");
- assertTrue("DeletedColumn was not marked for delete", c.isMarkedForDelete());
+ assertTrue("DeletedColumn was not marked for delete", c.isMarkedForDelete(timestamp));
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/test/unit/org/apache/cassandra/db/RemoveSubColumnTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/RemoveSubColumnTest.java b/test/unit/org/apache/cassandra/db/RemoveSubColumnTest.java
index 08971a7..d88ee60 100644
--- a/test/unit/org/apache/cassandra/db/RemoveSubColumnTest.java
+++ b/test/unit/org/apache/cassandra/db/RemoveSubColumnTest.java
@@ -58,8 +58,8 @@ public class RemoveSubColumnTest extends SchemaLoader
rm.delete("Super1", cname, 1);
rm.apply();
- ColumnFamily retrieved = store.getColumnFamily(QueryFilter.getIdentityFilter(dk, "Super1"));
- assert retrieved.getColumn(cname).isMarkedForDelete();
+ ColumnFamily retrieved = store.getColumnFamily(QueryFilter.getIdentityFilter(dk, "Super1", System.currentTimeMillis()));
+ assert retrieved.getColumn(cname).isMarkedForDelete(System.currentTimeMillis());
assertNull(Util.cloneAndRemoveDeleted(retrieved, Integer.MAX_VALUE));
}
@@ -86,7 +86,7 @@ public class RemoveSubColumnTest extends SchemaLoader
// Mark current time and make sure the next insert happens at least
// one second after the previous one (since gc resolution is the second)
- int gcbefore = (int)(System.currentTimeMillis() / 1000);
+ QueryFilter filter = QueryFilter.getIdentityFilter(dk, "Super1", System.currentTimeMillis());
Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
// remove the column itself
@@ -94,8 +94,8 @@ public class RemoveSubColumnTest extends SchemaLoader
rm.delete("Super1", cname, 2);
rm.apply();
- ColumnFamily retrieved = store.getColumnFamily(QueryFilter.getIdentityFilter(dk, "Super1"), gcbefore);
- assert retrieved.getColumn(cname).isMarkedForDelete();
+ ColumnFamily retrieved = store.getColumnFamily(filter);
+ assert retrieved.getColumn(cname).isMarkedForDelete(System.currentTimeMillis());
assertNull(Util.cloneAndRemoveDeleted(retrieved, Integer.MAX_VALUE));
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/test/unit/org/apache/cassandra/db/RowCacheTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/RowCacheTest.java b/test/unit/org/apache/cassandra/db/RowCacheTest.java
index 3dfef4d..11fe86c 100644
--- a/test/unit/org/apache/cassandra/db/RowCacheTest.java
+++ b/test/unit/org/apache/cassandra/db/RowCacheTest.java
@@ -62,12 +62,22 @@ public class RowCacheTest extends SchemaLoader
{
DecoratedKey key = Util.dk("key" + i);
- cachedStore.getColumnFamily(key, ByteBufferUtil.EMPTY_BYTE_BUFFER, ByteBufferUtil.EMPTY_BYTE_BUFFER, false, 1);
+ cachedStore.getColumnFamily(key,
+ ByteBufferUtil.EMPTY_BYTE_BUFFER,
+ ByteBufferUtil.EMPTY_BYTE_BUFFER,
+ false,
+ 1,
+ System.currentTimeMillis());
assert CacheService.instance.rowCache.size() == i + 1;
assert cachedStore.containsCachedRow(key); // current key should be stored in the cache
// checking if column is read correctly after cache
- ColumnFamily cf = cachedStore.getColumnFamily(key, ByteBufferUtil.EMPTY_BYTE_BUFFER, ByteBufferUtil.EMPTY_BYTE_BUFFER, false, 1);
+ ColumnFamily cf = cachedStore.getColumnFamily(key,
+ ByteBufferUtil.EMPTY_BYTE_BUFFER,
+ ByteBufferUtil.EMPTY_BYTE_BUFFER,
+ false,
+ 1,
+ System.currentTimeMillis());
Collection<Column> columns = cf.getSortedColumns();
Column column = columns.iterator().next();
@@ -84,11 +94,21 @@ public class RowCacheTest extends SchemaLoader
{
DecoratedKey key = Util.dk("key" + i);
- cachedStore.getColumnFamily(key, ByteBufferUtil.EMPTY_BYTE_BUFFER, ByteBufferUtil.EMPTY_BYTE_BUFFER, false, 1);
+ cachedStore.getColumnFamily(key,
+ ByteBufferUtil.EMPTY_BYTE_BUFFER,
+ ByteBufferUtil.EMPTY_BYTE_BUFFER,
+ false,
+ 1,
+ System.currentTimeMillis());
assert cachedStore.containsCachedRow(key); // cache should be populated with the latest rows read (old ones should be popped)
// checking if column is read correctly after cache
- ColumnFamily cf = cachedStore.getColumnFamily(key, ByteBufferUtil.EMPTY_BYTE_BUFFER, ByteBufferUtil.EMPTY_BYTE_BUFFER, false, 1);
+ ColumnFamily cf = cachedStore.getColumnFamily(key,
+ ByteBufferUtil.EMPTY_BYTE_BUFFER,
+ ByteBufferUtil.EMPTY_BYTE_BUFFER,
+ false,
+ 1,
+ System.currentTimeMillis());
Collection<Column> columns = cf.getSortedColumns();
Column column = columns.iterator().next();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/test/unit/org/apache/cassandra/db/RowTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/RowTest.java b/test/unit/org/apache/cassandra/db/RowTest.java
index 5babea5..e45977a 100644
--- a/test/unit/org/apache/cassandra/db/RowTest.java
+++ b/test/unit/org/apache/cassandra/db/RowTest.java
@@ -67,12 +67,12 @@ public class RowTest extends SchemaLoader
public void testExpiringColumnExpiration()
{
Column c = new ExpiringColumn(ByteBufferUtil.bytes("one"), ByteBufferUtil.bytes("A"), 0, 1);
- assert !c.isMarkedForDelete();
+ assert !c.isMarkedForDelete(System.currentTimeMillis());
// Because we keep the local deletion time with a precision of a
// second, we could have to wait 2 seconds in worst case scenario.
Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS);
- assert c.isMarkedForDelete() && c.getMarkedForDeleteAt() == 0;
+ assert c.isMarkedForDelete(System.currentTimeMillis()) && c.getMarkedForDeleteAt() == 0;
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/test/unit/org/apache/cassandra/db/SerializationsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/SerializationsTest.java b/test/unit/org/apache/cassandra/db/SerializationsTest.java
index 267353f..728a03c 100644
--- a/test/unit/org/apache/cassandra/db/SerializationsTest.java
+++ b/test/unit/org/apache/cassandra/db/SerializationsTest.java
@@ -68,17 +68,17 @@ public class SerializationsTest extends AbstractSerializationsTester
IPartitioner part = StorageService.getPartitioner();
AbstractBounds<RowPosition> bounds = new Range<Token>(part.getRandomToken(), part.getRandomToken()).toRowBounds();
- RangeSliceCommand namesCmd = new RangeSliceCommand(statics.KS, "Standard1", namesPred, bounds, 100);
+ RangeSliceCommand namesCmd = new RangeSliceCommand(statics.KS, "Standard1", statics.readTs, namesPred, bounds, 100);
MessageOut<RangeSliceCommand> namesCmdMsg = namesCmd.createMessage();
- RangeSliceCommand emptyRangeCmd = new RangeSliceCommand(statics.KS, "Standard1", emptyRangePred, bounds, 100);
+ RangeSliceCommand emptyRangeCmd = new RangeSliceCommand(statics.KS, "Standard1", statics.readTs, emptyRangePred, bounds, 100);
MessageOut<RangeSliceCommand> emptyRangeCmdMsg = emptyRangeCmd.createMessage();
- RangeSliceCommand regRangeCmd = new RangeSliceCommand(statics.KS, "Standard1", nonEmptyRangePred, bounds, 100);
+ RangeSliceCommand regRangeCmd = new RangeSliceCommand(statics.KS, "Standard1", statics.readTs, nonEmptyRangePred, bounds, 100);
MessageOut<RangeSliceCommand> regRangeCmdMsg = regRangeCmd.createMessage();
- RangeSliceCommand namesCmdSup = new RangeSliceCommand(statics.KS, "Super1", namesSCPred, bounds, 100);
+ RangeSliceCommand namesCmdSup = new RangeSliceCommand(statics.KS, "Super1", statics.readTs, namesSCPred, bounds, 100);
MessageOut<RangeSliceCommand> namesCmdSupMsg = namesCmdSup.createMessage();
- RangeSliceCommand emptyRangeCmdSup = new RangeSliceCommand(statics.KS, "Super1", emptyRangePred, bounds, 100);
+ RangeSliceCommand emptyRangeCmdSup = new RangeSliceCommand(statics.KS, "Super1", statics.readTs, emptyRangePred, bounds, 100);
MessageOut<RangeSliceCommand> emptyRangeCmdSupMsg = emptyRangeCmdSup.createMessage();
- RangeSliceCommand regRangeCmdSup = new RangeSliceCommand(statics.KS, "Super1", nonEmptyRangeSCPred, bounds, 100);
+ RangeSliceCommand regRangeCmdSup = new RangeSliceCommand(statics.KS, "Super1", statics.readTs, nonEmptyRangeSCPred, bounds, 100);
MessageOut<RangeSliceCommand> regRangeCmdSupMsg = regRangeCmdSup.createMessage();
DataOutputStream out = getOutput("db.RangeSliceCommand.bin");
@@ -113,8 +113,8 @@ public class SerializationsTest extends AbstractSerializationsTester
private void testSliceByNamesReadCommandWrite() throws IOException
{
- SliceByNamesReadCommand standardCmd = new SliceByNamesReadCommand(statics.KS, statics.Key, statics.StandardCF, namesPred);
- SliceByNamesReadCommand superCmd = new SliceByNamesReadCommand(statics.KS, statics.Key, statics.SuperCF, namesSCPred);
+ SliceByNamesReadCommand standardCmd = new SliceByNamesReadCommand(statics.KS, statics.Key, statics.StandardCF, statics.readTs, namesPred);
+ SliceByNamesReadCommand superCmd = new SliceByNamesReadCommand(statics.KS, statics.Key, statics.SuperCF, statics.readTs, namesSCPred);
DataOutputStream out = getOutput("db.SliceByNamesReadCommand.bin");
SliceByNamesReadCommand.serializer.serialize(standardCmd, out, getVersion());
@@ -148,8 +148,8 @@ public class SerializationsTest extends AbstractSerializationsTester
private void testSliceFromReadCommandWrite() throws IOException
{
- SliceFromReadCommand standardCmd = new SliceFromReadCommand(statics.KS, statics.Key, statics.StandardCF, nonEmptyRangePred);
- SliceFromReadCommand superCmd = new SliceFromReadCommand(statics.KS, statics.Key, statics.SuperCF, nonEmptyRangeSCPred);
+ SliceFromReadCommand standardCmd = new SliceFromReadCommand(statics.KS, statics.Key, statics.StandardCF, statics.readTs, nonEmptyRangePred);
+ SliceFromReadCommand superCmd = new SliceFromReadCommand(statics.KS, statics.Key, statics.SuperCF, statics.readTs, nonEmptyRangeSCPred);
DataOutputStream out = getOutput("db.SliceFromReadCommand.bin");
SliceFromReadCommand.serializer.serialize(standardCmd, out, getVersion());
@@ -357,8 +357,8 @@ public class SerializationsTest extends AbstractSerializationsTester
}};
private final String StandardCF = "Standard1";
private final String SuperCF = "Super1";
- private final ByteBuffer Start = ByteBufferUtil.bytes("Start");
- private final ByteBuffer Stop = ByteBufferUtil.bytes("Stop");
+
+ private final long readTs = 1369935512292L;
private final ColumnFamily StandardCf = TreeMapBackedSortedColumns.factory.create(KS, StandardCF);
private final ColumnFamily SuperCf = TreeMapBackedSortedColumns.factory.create(KS, SuperCF);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/test/unit/org/apache/cassandra/db/TableTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/TableTest.java b/test/unit/org/apache/cassandra/db/TableTest.java
index 8db1769..0c3b44d 100644
--- a/test/unit/org/apache/cassandra/db/TableTest.java
+++ b/test/unit/org/apache/cassandra/db/TableTest.java
@@ -81,13 +81,25 @@ public class TableTest extends SchemaLoader
{
ColumnFamily cf;
- cf = cfStore.getColumnFamily(QueryFilter.getNamesFilter(TEST_KEY, "Standard3", new TreeSet<ByteBuffer>()));
+ cf = cfStore.getColumnFamily(QueryFilter.getNamesFilter(TEST_KEY,
+ "Standard3",
+ new TreeSet<ByteBuffer>(),
+ System.currentTimeMillis()));
assertColumns(cf);
- cf = cfStore.getColumnFamily(QueryFilter.getSliceFilter(TEST_KEY, "Standard3", ByteBufferUtil.EMPTY_BYTE_BUFFER, ByteBufferUtil.EMPTY_BYTE_BUFFER, false, 0));
+ cf = cfStore.getColumnFamily(QueryFilter.getSliceFilter(TEST_KEY,
+ "Standard3",
+ ByteBufferUtil.EMPTY_BYTE_BUFFER,
+ ByteBufferUtil.EMPTY_BYTE_BUFFER,
+ false,
+ 0,
+ System.currentTimeMillis()));
assertColumns(cf);
- cf = cfStore.getColumnFamily(QueryFilter.getNamesFilter(TEST_KEY, "Standard3", ByteBufferUtil.bytes("col99")));
+ cf = cfStore.getColumnFamily(QueryFilter.getNamesFilter(TEST_KEY,
+ "Standard3",
+ ByteBufferUtil.bytes("col99"),
+ System.currentTimeMillis()));
assertColumns(cf);
}
};
@@ -113,10 +125,16 @@ public class TableTest extends SchemaLoader
{
ColumnFamily cf;
- cf = cfStore.getColumnFamily(QueryFilter.getNamesFilter(TEST_KEY, "Standard1", ByteBufferUtil.bytes("col1")));
+ cf = cfStore.getColumnFamily(QueryFilter.getNamesFilter(TEST_KEY,
+ "Standard1",
+ ByteBufferUtil.bytes("col1"),
+ System.currentTimeMillis()));
assertColumns(cf, "col1");
- cf = cfStore.getColumnFamily(QueryFilter.getNamesFilter(TEST_KEY, "Standard1", ByteBufferUtil.bytes("col3")));
+ cf = cfStore.getColumnFamily(QueryFilter.getNamesFilter(TEST_KEY,
+ "Standard1",
+ ByteBufferUtil.bytes("col3"),
+ System.currentTimeMillis()));
assertColumns(cf, "col3");
}
};
@@ -137,13 +155,13 @@ public class TableTest extends SchemaLoader
RowMutation rm = new RowMutation("Keyspace1", key.key, cf);
rm.apply();
- cf = cfStore.getColumnFamily(key, ByteBufferUtil.bytes("b"), ByteBufferUtil.bytes("c"), false, 100);
+ cf = cfStore.getColumnFamily(key, ByteBufferUtil.bytes("b"), ByteBufferUtil.bytes("c"), false, 100, System.currentTimeMillis());
assertEquals(2, cf.getColumnCount());
- cf = cfStore.getColumnFamily(key, ByteBufferUtil.bytes("b"), ByteBufferUtil.bytes("b"), false, 100);
+ cf = cfStore.getColumnFamily(key, ByteBufferUtil.bytes("b"), ByteBufferUtil.bytes("b"), false, 100, System.currentTimeMillis());
assertEquals(1, cf.getColumnCount());
- cf = cfStore.getColumnFamily(key, ByteBufferUtil.bytes("b"), ByteBufferUtil.bytes("c"), false, 1);
+ cf = cfStore.getColumnFamily(key, ByteBufferUtil.bytes("b"), ByteBufferUtil.bytes("c"), false, 1, System.currentTimeMillis());
assertEquals(1, cf.getColumnCount());
}
@@ -193,30 +211,30 @@ public class TableTest extends SchemaLoader
assert DatabaseDescriptor.getColumnIndexSize() == 4096 : "Unexpected column index size, block boundaries won't be where tests expect them.";
// test forward, spanning a segment.
- cf = cfStore.getColumnFamily(ROW, ByteBufferUtil.bytes("col096"), ByteBufferUtil.bytes("col099"), false, 4);
+ cf = cfStore.getColumnFamily(ROW, ByteBufferUtil.bytes("col096"), ByteBufferUtil.bytes("col099"), false, 4, System.currentTimeMillis());
assertColumns(cf, "col096", "col097", "col098", "col099");
// test reversed, spanning a segment.
- cf = cfStore.getColumnFamily(ROW, ByteBufferUtil.bytes("col099"), ByteBufferUtil.bytes("col096"), true, 4);
+ cf = cfStore.getColumnFamily(ROW, ByteBufferUtil.bytes("col099"), ByteBufferUtil.bytes("col096"), true, 4, System.currentTimeMillis());
assertColumns(cf, "col096", "col097", "col098", "col099");
// test forward, within a segment.
- cf = cfStore.getColumnFamily(ROW, ByteBufferUtil.bytes("col100"), ByteBufferUtil.bytes("col103"), false, 4);
+ cf = cfStore.getColumnFamily(ROW, ByteBufferUtil.bytes("col100"), ByteBufferUtil.bytes("col103"), false, 4, System.currentTimeMillis());
assertColumns(cf, "col100", "col101", "col102", "col103");
// test reversed, within a segment.
- cf = cfStore.getColumnFamily(ROW, ByteBufferUtil.bytes("col103"), ByteBufferUtil.bytes("col100"), true, 4);
+ cf = cfStore.getColumnFamily(ROW, ByteBufferUtil.bytes("col103"), ByteBufferUtil.bytes("col100"), true, 4, System.currentTimeMillis());
assertColumns(cf, "col100", "col101", "col102", "col103");
// test forward from beginning, spanning a segment.
String[] strCols = new String[100]; // col000-col099
for (int i = 0; i < 100; i++)
strCols[i] = "col" + fmt.format(i);
- cf = cfStore.getColumnFamily(ROW, ByteBufferUtil.EMPTY_BYTE_BUFFER, ByteBufferUtil.bytes("col099"), false, 100);
+ cf = cfStore.getColumnFamily(ROW, ByteBufferUtil.EMPTY_BYTE_BUFFER, ByteBufferUtil.bytes("col099"), false, 100, System.currentTimeMillis());
assertColumns(cf, strCols);
// test reversed, from end, spanning a segment.
- cf = cfStore.getColumnFamily(ROW, ByteBufferUtil.EMPTY_BYTE_BUFFER, ByteBufferUtil.bytes("col288"), true, 12);
+ cf = cfStore.getColumnFamily(ROW, ByteBufferUtil.EMPTY_BYTE_BUFFER, ByteBufferUtil.bytes("col288"), true, 12, System.currentTimeMillis());
assertColumns(cf, "col288", "col289", "col290", "col291", "col292", "col293", "col294", "col295", "col296", "col297", "col298", "col299");
}
};
@@ -248,7 +266,7 @@ public class TableTest extends SchemaLoader
RowMutation rm = new RowMutation("Keyspace1", ROW.key, cf);
rm.apply();
- cf = cfs.getColumnFamily(ROW, ByteBufferUtil.EMPTY_BYTE_BUFFER, ByteBufferUtil.EMPTY_BYTE_BUFFER, true, 1);
+ cf = cfs.getColumnFamily(ROW, ByteBufferUtil.EMPTY_BYTE_BUFFER, ByteBufferUtil.EMPTY_BYTE_BUFFER, true, 1, System.currentTimeMillis());
assertEquals(1, Iterables.size(cf.getColumnNames()));
assertEquals(i, cf.getColumnNames().iterator().next().getLong());
}
@@ -260,11 +278,11 @@ public class TableTest extends SchemaLoader
ColumnFamily cf;
// key before the rows that exist
- cf = cfStore.getColumnFamily(Util.dk("a"), ByteBufferUtil.EMPTY_BYTE_BUFFER, ByteBufferUtil.EMPTY_BYTE_BUFFER, false, 1);
+ cf = cfStore.getColumnFamily(Util.dk("a"), ByteBufferUtil.EMPTY_BYTE_BUFFER, ByteBufferUtil.EMPTY_BYTE_BUFFER, false, 1, System.currentTimeMillis());
assertColumns(cf);
// key after the rows that exist
- cf = cfStore.getColumnFamily(Util.dk("z"), ByteBufferUtil.EMPTY_BYTE_BUFFER, ByteBufferUtil.EMPTY_BYTE_BUFFER, false, 1);
+ cf = cfStore.getColumnFamily(Util.dk("z"), ByteBufferUtil.EMPTY_BYTE_BUFFER, ByteBufferUtil.EMPTY_BYTE_BUFFER, false, 1, System.currentTimeMillis());
assertColumns(cf);
}
@@ -296,26 +314,26 @@ public class TableTest extends SchemaLoader
{
ColumnFamily cf;
- cf = cfStore.getColumnFamily(ROW, ByteBufferUtil.bytes("col5"), ByteBufferUtil.EMPTY_BYTE_BUFFER, false, 2);
+ cf = cfStore.getColumnFamily(ROW, ByteBufferUtil.bytes("col5"), ByteBufferUtil.EMPTY_BYTE_BUFFER, false, 2, System.currentTimeMillis());
assertColumns(cf, "col5", "col7");
- cf = cfStore.getColumnFamily(ROW, ByteBufferUtil.bytes("col4"), ByteBufferUtil.EMPTY_BYTE_BUFFER, false, 2);
+ cf = cfStore.getColumnFamily(ROW, ByteBufferUtil.bytes("col4"), ByteBufferUtil.EMPTY_BYTE_BUFFER, false, 2, System.currentTimeMillis());
assertColumns(cf, "col4", "col5", "col7");
assertColumns(ColumnFamilyStore.removeDeleted(cf, Integer.MAX_VALUE), "col5", "col7");
- cf = cfStore.getColumnFamily(ROW, ByteBufferUtil.bytes("col5"), ByteBufferUtil.EMPTY_BYTE_BUFFER, true, 2);
+ cf = cfStore.getColumnFamily(ROW, ByteBufferUtil.bytes("col5"), ByteBufferUtil.EMPTY_BYTE_BUFFER, true, 2, System.currentTimeMillis());
assertColumns(cf, "col3", "col4", "col5");
- cf = cfStore.getColumnFamily(ROW, ByteBufferUtil.bytes("col6"), ByteBufferUtil.EMPTY_BYTE_BUFFER, true, 2);
+ cf = cfStore.getColumnFamily(ROW, ByteBufferUtil.bytes("col6"), ByteBufferUtil.EMPTY_BYTE_BUFFER, true, 2, System.currentTimeMillis());
assertColumns(cf, "col3", "col4", "col5");
- cf = cfStore.getColumnFamily(ROW, ByteBufferUtil.EMPTY_BYTE_BUFFER, ByteBufferUtil.EMPTY_BYTE_BUFFER, true, 2);
+ cf = cfStore.getColumnFamily(ROW, ByteBufferUtil.EMPTY_BYTE_BUFFER, ByteBufferUtil.EMPTY_BYTE_BUFFER, true, 2, System.currentTimeMillis());
assertColumns(cf, "col7", "col9");
- cf = cfStore.getColumnFamily(ROW, ByteBufferUtil.bytes("col95"), ByteBufferUtil.EMPTY_BYTE_BUFFER, false, 2);
+ cf = cfStore.getColumnFamily(ROW, ByteBufferUtil.bytes("col95"), ByteBufferUtil.EMPTY_BYTE_BUFFER, false, 2, System.currentTimeMillis());
assertColumns(cf);
- cf = cfStore.getColumnFamily(ROW, ByteBufferUtil.bytes("col0"), ByteBufferUtil.EMPTY_BYTE_BUFFER, true, 2);
+ cf = cfStore.getColumnFamily(ROW, ByteBufferUtil.bytes("col0"), ByteBufferUtil.EMPTY_BYTE_BUFFER, true, 2, System.currentTimeMillis());
assertColumns(cf);
}
};
@@ -344,11 +362,11 @@ public class TableTest extends SchemaLoader
{
ColumnFamily cf;
- cf = cfStore.getColumnFamily(ROW, ByteBufferUtil.EMPTY_BYTE_BUFFER, ByteBufferUtil.EMPTY_BYTE_BUFFER, false, 2);
+ cf = cfStore.getColumnFamily(ROW, ByteBufferUtil.EMPTY_BYTE_BUFFER, ByteBufferUtil.EMPTY_BYTE_BUFFER, false, 2, System.currentTimeMillis());
assertColumns(cf, "col1", "col2");
assertColumns(ColumnFamilyStore.removeDeleted(cf, Integer.MAX_VALUE), "col1");
- cf = cfStore.getColumnFamily(ROW, ByteBufferUtil.bytes("col2"), ByteBufferUtil.EMPTY_BYTE_BUFFER, false, 1);
+ cf = cfStore.getColumnFamily(ROW, ByteBufferUtil.bytes("col2"), ByteBufferUtil.EMPTY_BYTE_BUFFER, false, 1, System.currentTimeMillis());
assertColumns(cf, "col2");
assertColumns(ColumnFamilyStore.removeDeleted(cf, Integer.MAX_VALUE));
}
@@ -389,7 +407,7 @@ public class TableTest extends SchemaLoader
{
ColumnFamily cf;
- cf = cfStore.getColumnFamily(ROW, ByteBufferUtil.bytes("col2"), ByteBufferUtil.EMPTY_BYTE_BUFFER, false, 3);
+ cf = cfStore.getColumnFamily(ROW, ByteBufferUtil.bytes("col2"), ByteBufferUtil.EMPTY_BYTE_BUFFER, false, 3, System.currentTimeMillis());
assertColumns(cf, "col2", "col3", "col4");
ByteBuffer col = cf.getColumn(ByteBufferUtil.bytes("col2")).value();
@@ -454,7 +472,7 @@ public class TableTest extends SchemaLoader
cfStore.forceBlockingFlush();
}
cfStore.metric.sstablesPerReadHistogram.clear();
- ColumnFamily cf = cfStore.getColumnFamily(key, ByteBufferUtil.bytes(""), ByteBufferUtil.bytes("col1499"), false, 1000);
+ ColumnFamily cf = cfStore.getColumnFamily(key, ByteBufferUtil.bytes(""), ByteBufferUtil.bytes("col1499"), false, 1000, System.currentTimeMillis());
assertEquals(cfStore.metric.sstablesPerReadHistogram.max(), 5, 0.1);
int i = 0;
for (Column c : cf.getSortedColumns())
@@ -463,7 +481,7 @@ public class TableTest extends SchemaLoader
}
assertEquals(i, 500);
cfStore.metric.sstablesPerReadHistogram.clear();
- cf = cfStore.getColumnFamily(key, ByteBufferUtil.bytes("col1500"), ByteBufferUtil.bytes("col2000"), false, 1000);
+ cf = cfStore.getColumnFamily(key, ByteBufferUtil.bytes("col1500"), ByteBufferUtil.bytes("col2000"), false, 1000, System.currentTimeMillis());
assertEquals(cfStore.metric.sstablesPerReadHistogram.max(), 5, 0.1);
for (Column c : cf.getSortedColumns())
@@ -474,7 +492,7 @@ public class TableTest extends SchemaLoader
// reverse
cfStore.metric.sstablesPerReadHistogram.clear();
- cf = cfStore.getColumnFamily(key, ByteBufferUtil.bytes("col2000"), ByteBufferUtil.bytes("col1500"), true, 1000);
+ cf = cfStore.getColumnFamily(key, ByteBufferUtil.bytes("col2000"), ByteBufferUtil.bytes("col1500"), true, 1000, System.currentTimeMillis());
assertEquals(cfStore.metric.sstablesPerReadHistogram.max(), 5, 0.1);
i = 500;
for (Column c : cf.getSortedColumns())
@@ -523,7 +541,7 @@ public class TableTest extends SchemaLoader
ByteBuffer start = ct.builder().add(ByteBufferUtil.bytes("a5")).add(ByteBufferUtil.bytes(85)).build();
ByteBuffer finish = ct.builder().add(ByteBufferUtil.bytes("a5")).buildAsEndOfRange();
cfs.metric.sstablesPerReadHistogram.clear();
- ColumnFamily cf = cfs.getColumnFamily(key, start, finish, false, 1000);
+ ColumnFamily cf = cfs.getColumnFamily(key, start, finish, false, 1000, System.currentTimeMillis());
int colCount = 0;
for (Column c : cf)
colCount++;
@@ -535,7 +553,7 @@ public class TableTest extends SchemaLoader
{
DecoratedKey key = Util.dk("row3");
ColumnFamily cf;
- cf = cfStore.getColumnFamily(key, ByteBufferUtil.bytes("col1000"), ByteBufferUtil.EMPTY_BYTE_BUFFER, false, 3);
+ cf = cfStore.getColumnFamily(key, ByteBufferUtil.bytes("col1000"), ByteBufferUtil.EMPTY_BYTE_BUFFER, false, 3, System.currentTimeMillis());
assertColumns(cf, "col1000", "col1001", "col1002");
ByteBuffer col;
@@ -546,7 +564,7 @@ public class TableTest extends SchemaLoader
col = cf.getColumn(ByteBufferUtil.bytes("col1002")).value();
assertEquals(ByteBufferUtil.string(col), "v1002");
- cf = cfStore.getColumnFamily(key, ByteBufferUtil.bytes("col1195"), ByteBufferUtil.EMPTY_BYTE_BUFFER, false, 3);
+ cf = cfStore.getColumnFamily(key, ByteBufferUtil.bytes("col1195"), ByteBufferUtil.EMPTY_BYTE_BUFFER, false, 3, System.currentTimeMillis());
assertColumns(cf, "col1195", "col1196", "col1197");
col = cf.getColumn(ByteBufferUtil.bytes("col1195")).value();
@@ -557,7 +575,7 @@ public class TableTest extends SchemaLoader
assertEquals(ByteBufferUtil.string(col), "v1197");
- cf = cfStore.getColumnFamily(key, ByteBufferUtil.bytes("col1996"), ByteBufferUtil.EMPTY_BYTE_BUFFER, true, 1000);
+ cf = cfStore.getColumnFamily(key, ByteBufferUtil.bytes("col1996"), ByteBufferUtil.EMPTY_BYTE_BUFFER, true, 1000, System.currentTimeMillis());
Column[] columns = cf.getSortedColumns().toArray(new Column[0]);
for (int i = 1000; i < 1996; i++)
{
@@ -567,7 +585,7 @@ public class TableTest extends SchemaLoader
assertEquals(ByteBufferUtil.string(column.value()), ("v" + i));
}
- cf = cfStore.getColumnFamily(key, ByteBufferUtil.bytes("col1990"), ByteBufferUtil.EMPTY_BYTE_BUFFER, false, 3);
+ cf = cfStore.getColumnFamily(key, ByteBufferUtil.bytes("col1990"), ByteBufferUtil.EMPTY_BYTE_BUFFER, false, 3, System.currentTimeMillis());
assertColumns(cf, "col1990", "col1991", "col1992");
col = cf.getColumn(ByteBufferUtil.bytes("col1990")).value();
assertEquals(ByteBufferUtil.string(col), "v1990");
@@ -576,7 +594,7 @@ public class TableTest extends SchemaLoader
col = cf.getColumn(ByteBufferUtil.bytes("col1992")).value();
assertEquals(ByteBufferUtil.string(col), "v1992");
- cf = cfStore.getColumnFamily(key, ByteBufferUtil.EMPTY_BYTE_BUFFER, ByteBufferUtil.EMPTY_BYTE_BUFFER, true, 3);
+ cf = cfStore.getColumnFamily(key, ByteBufferUtil.EMPTY_BYTE_BUFFER, ByteBufferUtil.EMPTY_BYTE_BUFFER, true, 3, System.currentTimeMillis());
assertColumns(cf, "col1997", "col1998", "col1999");
col = cf.getColumn(ByteBufferUtil.bytes("col1997")).value();
assertEquals(ByteBufferUtil.string(col), "v1997");
@@ -585,10 +603,10 @@ public class TableTest extends SchemaLoader
col = cf.getColumn(ByteBufferUtil.bytes("col1999")).value();
assertEquals(ByteBufferUtil.string(col), "v1999");
- cf = cfStore.getColumnFamily(key, ByteBufferUtil.bytes("col9000"), ByteBufferUtil.EMPTY_BYTE_BUFFER, true, 3);
+ cf = cfStore.getColumnFamily(key, ByteBufferUtil.bytes("col9000"), ByteBufferUtil.EMPTY_BYTE_BUFFER, true, 3, System.currentTimeMillis());
assertColumns(cf, "col1997", "col1998", "col1999");
- cf = cfStore.getColumnFamily(key, ByteBufferUtil.bytes("col9000"), ByteBufferUtil.EMPTY_BYTE_BUFFER, false, 3);
+ cf = cfStore.getColumnFamily(key, ByteBufferUtil.bytes("col9000"), ByteBufferUtil.EMPTY_BYTE_BUFFER, false, 3, System.currentTimeMillis());
assertColumns(cf);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/test/unit/org/apache/cassandra/db/TimeSortTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/TimeSortTest.java b/test/unit/org/apache/cassandra/db/TimeSortTest.java
index 87777ca..3ca996b 100644
--- a/test/unit/org/apache/cassandra/db/TimeSortTest.java
+++ b/test/unit/org/apache/cassandra/db/TimeSortTest.java
@@ -54,7 +54,7 @@ public class TimeSortTest extends SchemaLoader
rm.add("StandardLong1", getBytes(0), ByteBufferUtil.bytes("b"), 0);
rm.apply();
- ColumnFamily cf = cfStore.getColumnFamily(key, getBytes(10), ByteBufferUtil.EMPTY_BYTE_BUFFER, false, 1000);
+ ColumnFamily cf = cfStore.getColumnFamily(key, getBytes(10), ByteBufferUtil.EMPTY_BYTE_BUFFER, false, 1000, System.currentTimeMillis());
Collection<Column> columns = cf.getSortedColumns();
assert columns.size() == 1;
}
@@ -95,7 +95,7 @@ public class TimeSortTest extends SchemaLoader
rm.apply();
// verify
- ColumnFamily cf = cfStore.getColumnFamily(key, getBytes(0), ByteBufferUtil.EMPTY_BYTE_BUFFER, false, 1000);
+ ColumnFamily cf = cfStore.getColumnFamily(key, getBytes(0), ByteBufferUtil.EMPTY_BYTE_BUFFER, false, 1000, System.currentTimeMillis());
Collection<Column> columns = cf.getSortedColumns();
assertEquals(12, columns.size());
Iterator<Column> iter = columns.iterator();
@@ -108,7 +108,7 @@ public class TimeSortTest extends SchemaLoader
TreeSet<ByteBuffer> columnNames = new TreeSet<ByteBuffer>(LongType.instance);
columnNames.add(getBytes(10));
columnNames.add(getBytes(0));
- cf = cfStore.getColumnFamily(QueryFilter.getNamesFilter(Util.dk("900"), "StandardLong1", columnNames));
+ cf = cfStore.getColumnFamily(QueryFilter.getNamesFilter(Util.dk("900"), "StandardLong1", columnNames, System.currentTimeMillis()));
assert "c".equals(ByteBufferUtil.string(cf.getColumn(getBytes(0)).value()));
assert "c".equals(ByteBufferUtil.string(cf.getColumn(getBytes(10)).value()));
}
@@ -120,7 +120,12 @@ public class TimeSortTest extends SchemaLoader
DecoratedKey key = Util.dk(Integer.toString(i));
for (int j = 0; j < 8; j += 3)
{
- ColumnFamily cf = table.getColumnFamilyStore("StandardLong1").getColumnFamily(key, getBytes(j * 2), ByteBufferUtil.EMPTY_BYTE_BUFFER, false, 1000);
+ ColumnFamily cf = table.getColumnFamilyStore("StandardLong1").getColumnFamily(key,
+ getBytes(j * 2),
+ ByteBufferUtil.EMPTY_BYTE_BUFFER,
+ false,
+ 1000,
+ System.currentTimeMillis());
Collection<Column> columns = cf.getSortedColumns();
assert columns.size() == 8 - j;
int k = j;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java
index a351fd2..3ac6418 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java
@@ -86,7 +86,7 @@ public class CompactionsPurgeTest extends SchemaLoader
// major compact and test that all columns but the resurrected one is completely gone
CompactionManager.instance.submitMaximal(cfs, Integer.MAX_VALUE).get();
cfs.invalidateCachedRow(key);
- ColumnFamily cf = cfs.getColumnFamily(QueryFilter.getIdentityFilter(key, cfName));
+ ColumnFamily cf = cfs.getColumnFamily(QueryFilter.getIdentityFilter(key, cfName, System.currentTimeMillis()));
assertColumns(cf, "5");
assert cf.getColumn(ByteBufferUtil.bytes(String.valueOf(5))) != null;
}
@@ -138,12 +138,12 @@ public class CompactionsPurgeTest extends SchemaLoader
// verify that minor compaction does GC when key is provably not
// present in a non-compacted sstable
- ColumnFamily cf = cfs.getColumnFamily(QueryFilter.getIdentityFilter(key2, cfName));
+ ColumnFamily cf = cfs.getColumnFamily(QueryFilter.getIdentityFilter(key2, cfName, System.currentTimeMillis()));
assert cf == null;
// verify that minor compaction still GC when key is present
// in a non-compacted sstable but the timestamp ensures we won't miss anything
- cf = cfs.getColumnFamily(QueryFilter.getIdentityFilter(key1, cfName));
+ cf = cfs.getColumnFamily(QueryFilter.getIdentityFilter(key1, cfName, System.currentTimeMillis()));
Assert.assertEquals(1, cf.getColumnCount());
}
@@ -180,8 +180,8 @@ public class CompactionsPurgeTest extends SchemaLoader
// we should have both the c1 and c2 tombstones still, since the c2 timestamp is older than the c1 tombstone
// so it would be invalid to assume we can throw out the c1 entry.
- ColumnFamily cf = cfs.getColumnFamily(QueryFilter.getIdentityFilter(key3, cfName));
- Assert.assertFalse(cf.getColumn(ByteBufferUtil.bytes("c2")).isLive());
+ ColumnFamily cf = cfs.getColumnFamily(QueryFilter.getIdentityFilter(key3, cfName, System.currentTimeMillis()));
+ Assert.assertFalse(cf.getColumn(ByteBufferUtil.bytes("c2")).isLive(System.currentTimeMillis()));
Assert.assertEquals(2, cf.getColumnCount());
}
@@ -218,7 +218,7 @@ public class CompactionsPurgeTest extends SchemaLoader
// compact and test that the row is completely gone
Util.compactAll(cfs).get();
assert cfs.getSSTables().isEmpty();
- ColumnFamily cf = table.getColumnFamilyStore(cfName).getColumnFamily(QueryFilter.getIdentityFilter(key, cfName));
+ ColumnFamily cf = table.getColumnFamilyStore(cfName).getColumnFamily(QueryFilter.getIdentityFilter(key, cfName, System.currentTimeMillis()));
assert cf == null : cf;
}
@@ -244,7 +244,7 @@ public class CompactionsPurgeTest extends SchemaLoader
rm.apply();
// move the key up in row cache
- cfs.getColumnFamily(QueryFilter.getIdentityFilter(key, cfName));
+ cfs.getColumnFamily(QueryFilter.getIdentityFilter(key, cfName, System.currentTimeMillis()));
// deletes row
rm = new RowMutation(tableName, key.key);
@@ -264,10 +264,10 @@ public class CompactionsPurgeTest extends SchemaLoader
rm.apply();
// Check that the second insert did went in
- ColumnFamily cf = cfs.getColumnFamily(QueryFilter.getIdentityFilter(key, cfName));
+ ColumnFamily cf = cfs.getColumnFamily(QueryFilter.getIdentityFilter(key, cfName, System.currentTimeMillis()));
assertEquals(10, cf.getColumnCount());
for (Column c : cf)
- assert !c.isMarkedForDelete();
+ assert !c.isMarkedForDelete(System.currentTimeMillis());
}
@Test
@@ -309,9 +309,9 @@ public class CompactionsPurgeTest extends SchemaLoader
rm.apply();
// Check that the second insert did went in
- ColumnFamily cf = cfs.getColumnFamily(QueryFilter.getIdentityFilter(key, cfName));
+ ColumnFamily cf = cfs.getColumnFamily(QueryFilter.getIdentityFilter(key, cfName, System.currentTimeMillis()));
assertEquals(10, cf.getColumnCount());
for (Column c : cf)
- assert !c.isMarkedForDelete();
+ assert !c.isMarkedForDelete(System.currentTimeMillis());
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
index 9df5d25..1fe7a46 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
@@ -147,7 +147,7 @@ public class CompactionsTest extends SchemaLoader
// check that the shadowed column is gone
SSTableReader sstable = cfs.getSSTables().iterator().next();
- SSTableScanner scanner = sstable.getScanner(new QueryFilter(null, "Super1", new IdentityQueryFilter()), key);
+ SSTableScanner scanner = sstable.getScanner(new QueryFilter(null, "Super1", new IdentityQueryFilter(), System.currentTimeMillis()), key);
OnDiskAtomIterator iter = scanner.next();
assertEquals(key, iter.getKey());
assert iter.next() instanceof RangeTombstone;
@@ -314,7 +314,7 @@ public class CompactionsTest extends SchemaLoader
Collection<SSTableReader> sstablesBefore = cfs.getSSTables();
- QueryFilter filter = QueryFilter.getIdentityFilter(key, cfname);
+ QueryFilter filter = QueryFilter.getIdentityFilter(key, cfname, System.currentTimeMillis());
assert !(cfs.getColumnFamily(filter).getColumnCount() == 0);
// Remove key
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
index d332ec3..3fb828b 100644
--- a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
@@ -89,7 +89,7 @@ public class LeveledCompactionStrategyTest extends SchemaLoader
ActiveRepairService.CFPair p = new ActiveRepairService.CFPair(ksname, cfname);
Range<Token> range = new Range<Token>(Util.token(""), Util.token(""));
- int gcBefore = (int)(System.currentTimeMillis()/1000) - table.getColumnFamilyStore(cfname).metadata.getGcGraceSeconds();
+ int gcBefore = table.getColumnFamilyStore(cfname).gcBefore(System.currentTimeMillis());
ActiveRepairService.TreeRequest req = new ActiveRepairService.TreeRequest("1", FBUtilities.getLocalAddress(), range, gcBefore, p);
ActiveRepairService.Validator validator = new ActiveRepairService.Validator(req);
CompactionManager.instance.submitValidation(cfs, validator).get();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/test/unit/org/apache/cassandra/db/compaction/TTLExpiryTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/TTLExpiryTest.java b/test/unit/org/apache/cassandra/db/compaction/TTLExpiryTest.java
index da83ffd..c14e860 100644
--- a/test/unit/org/apache/cassandra/db/compaction/TTLExpiryTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/TTLExpiryTest.java
@@ -140,7 +140,7 @@ public class TTLExpiryTest extends SchemaLoader
cfs.enableAutoCompaction(true);
assertEquals(1, cfs.getSSTables().size());
SSTableReader sstable = cfs.getSSTables().iterator().next();
- SSTableScanner scanner = sstable.getScanner(new QueryFilter(null, "Standard1", new IdentityQueryFilter()));
+ SSTableScanner scanner = sstable.getScanner(new QueryFilter(null, "Standard1", new IdentityQueryFilter(), System.currentTimeMillis()));
assertTrue(scanner.hasNext());
while(scanner.hasNext())
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/test/unit/org/apache/cassandra/db/index/PerRowSecondaryIndexTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/index/PerRowSecondaryIndexTest.java b/test/unit/org/apache/cassandra/db/index/PerRowSecondaryIndexTest.java
index 44bf6a1..6ea1554 100644
--- a/test/unit/org/apache/cassandra/db/index/PerRowSecondaryIndexTest.java
+++ b/test/unit/org/apache/cassandra/db/index/PerRowSecondaryIndexTest.java
@@ -90,7 +90,7 @@ public class PerRowSecondaryIndexTest extends SchemaLoader
for (Column column : indexedRow.getSortedColumns())
{
- assertTrue(column.isMarkedForDelete());
+ assertTrue(column.isMarkedForDelete(System.currentTimeMillis()));
}
assertTrue(Arrays.equals("k2".getBytes(), TestIndex.LAST_INDEXED_KEY.array()));
}
@@ -108,7 +108,7 @@ public class PerRowSecondaryIndexTest extends SchemaLoader
assertNotNull(indexedRow);
for (Column column : indexedRow.getSortedColumns())
{
- assertTrue(column.isMarkedForDelete());
+ assertTrue(column.isMarkedForDelete(System.currentTimeMillis()));
}
assertTrue(Arrays.equals("k3".getBytes(), TestIndex.LAST_INDEXED_KEY.array()));
}
@@ -133,7 +133,8 @@ public class PerRowSecondaryIndexTest extends SchemaLoader
public void index(ByteBuffer rowKey)
{
QueryFilter filter = QueryFilter.getIdentityFilter(DatabaseDescriptor.getPartitioner().decorateKey(rowKey),
- baseCfs.getColumnFamilyName());
+ baseCfs.getColumnFamilyName(),
+ System.currentTimeMillis());
LAST_INDEXED_ROW = baseCfs.getColumnFamily(filter);
LAST_INDEXED_KEY = rowKey;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/test/unit/org/apache/cassandra/db/marshal/CompositeTypeTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/marshal/CompositeTypeTest.java b/test/unit/org/apache/cassandra/db/marshal/CompositeTypeTest.java
index 3614980..8314af8 100644
--- a/test/unit/org/apache/cassandra/db/marshal/CompositeTypeTest.java
+++ b/test/unit/org/apache/cassandra/db/marshal/CompositeTypeTest.java
@@ -183,7 +183,7 @@ public class CompositeTypeTest extends SchemaLoader
addColumn(rm, cname3);
rm.apply();
- ColumnFamily cf = cfs.getColumnFamily(QueryFilter.getIdentityFilter(Util.dk("k"), cfName));
+ ColumnFamily cf = cfs.getColumnFamily(QueryFilter.getIdentityFilter(Util.dk("k"), cfName, System.currentTimeMillis()));
Iterator<Column> iter = cf.getSortedColumns().iterator();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/test/unit/org/apache/cassandra/db/marshal/DynamicCompositeTypeTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/marshal/DynamicCompositeTypeTest.java b/test/unit/org/apache/cassandra/db/marshal/DynamicCompositeTypeTest.java
index f681403..14ec219 100644
--- a/test/unit/org/apache/cassandra/db/marshal/DynamicCompositeTypeTest.java
+++ b/test/unit/org/apache/cassandra/db/marshal/DynamicCompositeTypeTest.java
@@ -179,7 +179,7 @@ public class DynamicCompositeTypeTest extends SchemaLoader
addColumn(rm, cname3);
rm.apply();
- ColumnFamily cf = cfs.getColumnFamily(QueryFilter.getIdentityFilter(Util.dk("k"), cfName));
+ ColumnFamily cf = cfs.getColumnFamily(QueryFilter.getIdentityFilter(Util.dk("k"), cfName, System.currentTimeMillis()));
Iterator<Column> iter = cf.getSortedColumns().iterator();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
index 3f2241e..eeec8d9 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
@@ -318,7 +318,7 @@ public class SSTableReaderTest extends SchemaLoader
List<IndexExpression> clause = Arrays.asList(expr);
IPartitioner p = StorageService.getPartitioner();
Range<RowPosition> range = Util.range("", "");
- List<Row> rows = indexedCFS.search(clause, range, 100, new IdentityQueryFilter());
+ List<Row> rows = indexedCFS.search(range, clause, new IdentityQueryFilter(), 100);
assert rows.size() == 1;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java b/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
index b120a3f..79aed42 100644
--- a/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
+++ b/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
@@ -107,7 +107,7 @@ public abstract class AntiEntropyServiceTestAbstract extends SchemaLoader
local_range = StorageService.instance.getPrimaryRangesForEndpoint(tablename, LOCAL).iterator().next();
// (we use REMOTE instead of LOCAL so that the reponses for the validator.complete() get lost)
- int gcBefore = (int)(System.currentTimeMillis()/1000) - store.metadata.getGcGraceSeconds();
+ int gcBefore = store.gcBefore(System.currentTimeMillis());
request = new TreeRequest(UUID.randomUUID().toString(), REMOTE, local_range, gcBefore, new CFPair(tablename, cfname));
// Set a fake session corresponding to this fake request
ActiveRepairService.instance.submitArtificialRepairSession(request, tablename, cfname);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/test/unit/org/apache/cassandra/service/RowResolverTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/RowResolverTest.java b/test/unit/org/apache/cassandra/service/RowResolverTest.java
index c2e0c1d..d19677c 100644
--- a/test/unit/org/apache/cassandra/service/RowResolverTest.java
+++ b/test/unit/org/apache/cassandra/service/RowResolverTest.java
@@ -45,7 +45,7 @@ public class RowResolverTest extends SchemaLoader
ColumnFamily cf2 = TreeMapBackedSortedColumns.factory.create("Keyspace1", "Standard1");
cf2.addColumn(column("c1", "v2", 1));
- ColumnFamily resolved = RowDataResolver.resolveSuperset(Arrays.asList(cf1, cf2));
+ ColumnFamily resolved = RowDataResolver.resolveSuperset(Arrays.asList(cf1, cf2), System.currentTimeMillis());
assertColumns(resolved, "c1");
assertColumns(ColumnFamily.diff(cf1, resolved), "c1");
assertNull(ColumnFamily.diff(cf2, resolved));
@@ -60,7 +60,7 @@ public class RowResolverTest extends SchemaLoader
ColumnFamily cf2 = TreeMapBackedSortedColumns.factory.create("Keyspace1", "Standard1");
cf2.addColumn(column("c2", "v2", 1));
- ColumnFamily resolved = RowDataResolver.resolveSuperset(Arrays.asList(cf1, cf2));
+ ColumnFamily resolved = RowDataResolver.resolveSuperset(Arrays.asList(cf1, cf2), System.currentTimeMillis());
assertColumns(resolved, "c1", "c2");
assertColumns(ColumnFamily.diff(cf1, resolved), "c2");
assertColumns(ColumnFamily.diff(cf2, resolved), "c1");
@@ -72,7 +72,7 @@ public class RowResolverTest extends SchemaLoader
ColumnFamily cf2 = TreeMapBackedSortedColumns.factory.create("Keyspace1", "Standard1");
cf2.addColumn(column("c2", "v2", 1));
- ColumnFamily resolved = RowDataResolver.resolveSuperset(Arrays.asList(null, cf2));
+ ColumnFamily resolved = RowDataResolver.resolveSuperset(Arrays.asList(null, cf2), System.currentTimeMillis());
assertColumns(resolved, "c2");
assertColumns(ColumnFamily.diff(null, resolved), "c2");
assertNull(ColumnFamily.diff(cf2, resolved));
@@ -84,7 +84,7 @@ public class RowResolverTest extends SchemaLoader
ColumnFamily cf1 = TreeMapBackedSortedColumns.factory.create("Keyspace1", "Standard1");
cf1.addColumn(column("c1", "v1", 0));
- ColumnFamily resolved = RowDataResolver.resolveSuperset(Arrays.asList(cf1, null));
+ ColumnFamily resolved = RowDataResolver.resolveSuperset(Arrays.asList(cf1, null), System.currentTimeMillis());
assertColumns(resolved, "c1");
assertNull(ColumnFamily.diff(cf1, resolved));
assertColumns(ColumnFamily.diff(null, resolved), "c1");
@@ -93,7 +93,7 @@ public class RowResolverTest extends SchemaLoader
@Test
public void testResolveSupersetNullBoth()
{
- assertNull(RowDataResolver.resolveSuperset(Arrays.<ColumnFamily>asList(null, null)));
+ assertNull(RowDataResolver.resolveSuperset(Arrays.<ColumnFamily>asList(null, null), System.currentTimeMillis()));
}
@Test
@@ -106,7 +106,7 @@ public class RowResolverTest extends SchemaLoader
ColumnFamily cf2 = TreeMapBackedSortedColumns.factory.create("Keyspace1", "Standard1");
cf2.delete(new DeletionInfo(1L, (int) (System.currentTimeMillis() / 1000)));
- ColumnFamily resolved = RowDataResolver.resolveSuperset(Arrays.asList(cf1, cf2));
+ ColumnFamily resolved = RowDataResolver.resolveSuperset(Arrays.asList(cf1, cf2), System.currentTimeMillis());
// no columns in the cf
assertColumns(resolved);
assertTrue(resolved.isMarkedForDelete());
@@ -133,7 +133,7 @@ public class RowResolverTest extends SchemaLoader
ColumnFamily cf4 = TreeMapBackedSortedColumns.factory.create("Keyspace1", "Standard1");
cf4.delete(new DeletionInfo(2L, (int) (System.currentTimeMillis() / 1000)));
- ColumnFamily resolved = RowDataResolver.resolveSuperset(Arrays.asList(cf1, cf2, cf3, cf4));
+ ColumnFamily resolved = RowDataResolver.resolveSuperset(Arrays.asList(cf1, cf2, cf3, cf4), System.currentTimeMillis());
// will have deleted marker and one column
assertColumns(resolved, "two");
assertColumn(resolved, "two", "B", 3);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java b/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
index e3d9b8a..a86330c 100644
--- a/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
+++ b/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
@@ -100,7 +100,7 @@ public class StreamingTransferTest extends SchemaLoader
{
String key = "key" + offs[i];
String col = "col" + offs[i];
- assert cfs.getColumnFamily(QueryFilter.getIdentityFilter(Util.dk(key), cfs.name)) != null;
+ assert cfs.getColumnFamily(QueryFilter.getIdentityFilter(Util.dk(key), cfs.name, System.currentTimeMillis())) != null;
assert rows.get(i).key.key.equals(ByteBufferUtil.bytes(key));
assert rows.get(i).cf.getColumn(ByteBufferUtil.bytes(col)) != null;
}
@@ -197,7 +197,7 @@ public class StreamingTransferTest extends SchemaLoader
List<IndexExpression> clause = Arrays.asList(expr);
IDiskAtomFilter filter = new IdentityQueryFilter();
Range<RowPosition> range = Util.range("", "");
- List<Row> rows = cfs.search(clause, range, 100, filter);
+ List<Row> rows = cfs.search(range, clause, filter, 100);
assertEquals(1, rows.size());
assert rows.get(0).key.key.equals(ByteBufferUtil.bytes(key));
}
@@ -299,10 +299,10 @@ public class StreamingTransferTest extends SchemaLoader
assert rows.get(1).cf.getColumnCount() == 1;
// these keys fall outside of the ranges and should not be transferred
- assert cfstore.getColumnFamily(QueryFilter.getIdentityFilter(Util.dk("transfer1"), "Standard1")) == null;
- assert cfstore.getColumnFamily(QueryFilter.getIdentityFilter(Util.dk("transfer2"), "Standard1")) == null;
- assert cfstore.getColumnFamily(QueryFilter.getIdentityFilter(Util.dk("test2"), "Standard1")) == null;
- assert cfstore.getColumnFamily(QueryFilter.getIdentityFilter(Util.dk("test3"), "Standard1")) == null;
+ assert cfstore.getColumnFamily(QueryFilter.getIdentityFilter(Util.dk("transfer1"), "Standard1", System.currentTimeMillis())) == null;
+ assert cfstore.getColumnFamily(QueryFilter.getIdentityFilter(Util.dk("transfer2"), "Standard1", System.currentTimeMillis())) == null;
+ assert cfstore.getColumnFamily(QueryFilter.getIdentityFilter(Util.dk("test2"), "Standard1", System.currentTimeMillis())) == null;
+ assert cfstore.getColumnFamily(QueryFilter.getIdentityFilter(Util.dk("test3"), "Standard1", System.currentTimeMillis())) == null;
}
@Test
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/test/unit/org/apache/cassandra/tools/SSTableExportTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/tools/SSTableExportTest.java b/test/unit/org/apache/cassandra/tools/SSTableExportTest.java
index 6fd1764..5528d4c 100644
--- a/test/unit/org/apache/cassandra/tools/SSTableExportTest.java
+++ b/test/unit/org/apache/cassandra/tools/SSTableExportTest.java
@@ -202,13 +202,13 @@ public class SSTableExportTest extends SchemaLoader
new SSTableImport().importJson(tempJson.getPath(), "Keyspace1", "Standard1", tempSS2.getPath());
reader = SSTableReader.open(Descriptor.fromFilename(tempSS2.getPath()));
- QueryFilter qf = QueryFilter.getNamesFilter(Util.dk("rowA"), "Standard1", ByteBufferUtil.bytes("name"));
+ QueryFilter qf = QueryFilter.getNamesFilter(Util.dk("rowA"), "Standard1", ByteBufferUtil.bytes("name"), System.currentTimeMillis());
ColumnFamily cf = qf.getSSTableColumnIterator(reader).getColumnFamily();
qf.collateOnDiskAtom(cf, qf.getSSTableColumnIterator(reader), Integer.MIN_VALUE);
assertTrue(cf != null);
assertTrue(cf.getColumn(ByteBufferUtil.bytes("name")).value().equals(hexToBytes("76616c")));
- qf = QueryFilter.getNamesFilter(Util.dk("rowExclude"), "Standard1", ByteBufferUtil.bytes("name"));
+ qf = QueryFilter.getNamesFilter(Util.dk("rowExclude"), "Standard1", ByteBufferUtil.bytes("name"), System.currentTimeMillis());
cf = qf.getSSTableColumnIterator(reader).getColumnFamily();
assert cf == null;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/test/unit/org/apache/cassandra/tools/SSTableImportTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/tools/SSTableImportTest.java b/test/unit/org/apache/cassandra/tools/SSTableImportTest.java
index a1a7673..b0f22be 100644
--- a/test/unit/org/apache/cassandra/tools/SSTableImportTest.java
+++ b/test/unit/org/apache/cassandra/tools/SSTableImportTest.java
@@ -53,7 +53,7 @@ public class SSTableImportTest extends SchemaLoader
// Verify results
SSTableReader reader = SSTableReader.open(Descriptor.fromFilename(tempSS.getPath()));
- QueryFilter qf = QueryFilter.getIdentityFilter(Util.dk("rowA"), "Standard1");
+ QueryFilter qf = QueryFilter.getIdentityFilter(Util.dk("rowA"), "Standard1", System.currentTimeMillis());
OnDiskAtomIterator iter = qf.getSSTableColumnIterator(reader);
ColumnFamily cf = cloneForAdditions(iter);
while (iter.hasNext()) cf.addAtom(iter.next());
@@ -87,7 +87,7 @@ public class SSTableImportTest extends SchemaLoader
// Verify results
SSTableReader reader = SSTableReader.open(Descriptor.fromFilename(tempSS.getPath()));
- QueryFilter qf = QueryFilter.getIdentityFilter(Util.dk("rowA"), "Standard1");
+ QueryFilter qf = QueryFilter.getIdentityFilter(Util.dk("rowA"), "Standard1", System.currentTimeMillis());
OnDiskAtomIterator iter = qf.getSSTableColumnIterator(reader);
ColumnFamily cf = cloneForAdditions(iter);
while (iter.hasNext()) cf.addAtom(iter.next());
@@ -108,7 +108,7 @@ public class SSTableImportTest extends SchemaLoader
// Verify results
SSTableReader reader = SSTableReader.open(Descriptor.fromFilename(tempSS.getPath()));
- QueryFilter qf = QueryFilter.getIdentityFilter(Util.dk("rowA"), "Super4");
+ QueryFilter qf = QueryFilter.getIdentityFilter(Util.dk("rowA"), "Super4", System.currentTimeMillis());
ColumnFamily cf = cloneForAdditions(qf.getSSTableColumnIterator(reader));
qf.collateOnDiskAtom(cf, qf.getSSTableColumnIterator(reader), Integer.MIN_VALUE);
@@ -137,7 +137,7 @@ public class SSTableImportTest extends SchemaLoader
new SSTableImport().importJson(jsonUrl, "Keyspace1", "Standard1", tempSS.getPath());
SSTableReader reader = SSTableReader.open(Descriptor.fromFilename(tempSS.getPath()));
- QueryFilter qf = QueryFilter.getIdentityFilter(Util.dk("rowA"), "Standard1");
+ QueryFilter qf = QueryFilter.getIdentityFilter(Util.dk("rowA"), "Standard1", System.currentTimeMillis());
OnDiskAtomIterator iter = qf.getSSTableColumnIterator(reader);
ColumnFamily cf = cloneForAdditions(iter);
while (iter.hasNext())
@@ -160,7 +160,7 @@ public class SSTableImportTest extends SchemaLoader
// Verify results
SSTableReader reader = SSTableReader.open(Descriptor.fromFilename(tempSS.getPath()));
- QueryFilter qf = QueryFilter.getIdentityFilter(Util.dk("rowA"), "Standard1");
+ QueryFilter qf = QueryFilter.getIdentityFilter(Util.dk("rowA"), "Standard1", System.currentTimeMillis());
OnDiskAtomIterator iter = qf.getSSTableColumnIterator(reader);
ColumnFamily cf = cloneForAdditions(iter);
assertEquals(cf.deletionInfo(), new DeletionInfo(0, 0));
@@ -184,7 +184,7 @@ public class SSTableImportTest extends SchemaLoader
// Verify results
SSTableReader reader = SSTableReader.open(Descriptor.fromFilename(tempSS.getPath()));
- QueryFilter qf = QueryFilter.getIdentityFilter(Util.dk("rowA"), "Counter1");
+ QueryFilter qf = QueryFilter.getIdentityFilter(Util.dk("rowA"), "Counter1", System.currentTimeMillis());
OnDiskAtomIterator iter = qf.getSSTableColumnIterator(reader);
ColumnFamily cf = cloneForAdditions(iter);
while (iter.hasNext()) cf.addAtom(iter.next());
[4/4] git commit: Include a timestamp with all read commands to
determine column expiration
Posted by al...@apache.org.
Include a timestamp with all read commands to determine column expiration
patch by Aleksey Yeschenko; reviewed by Sylvain Lebresne for
CASSANDRA-5149
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/1f7628ce
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/1f7628ce
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/1f7628ce
Branch: refs/heads/trunk
Commit: 1f7628ce7c1b3f820717eaa44df9b182158eb49e
Parents: 62295f6
Author: Aleksey Yeschenko <al...@apache.org>
Authored: Tue Jun 18 19:15:02 2013 +0300
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Tue Jun 18 19:15:02 2013 +0300
----------------------------------------------------------------------
CHANGES.txt | 2 +
.../org/apache/cassandra/config/CFMetaData.java | 2 +-
.../cassandra/config/ColumnDefinition.java | 3 +-
.../apache/cassandra/cql/QueryProcessor.java | 17 +-
.../cql3/statements/ColumnGroupMap.java | 6 +-
.../cql3/statements/ModificationStatement.java | 4 +-
.../cql3/statements/SelectStatement.java | 49 ++--
.../cassandra/cql3/statements/Selection.java | 12 +-
.../cassandra/db/CollationController.java | 6 +-
src/java/org/apache/cassandra/db/Column.java | 25 +-
.../org/apache/cassandra/db/ColumnFamily.java | 9 +-
.../cassandra/db/ColumnFamilySerializer.java | 22 +-
.../apache/cassandra/db/ColumnFamilyStore.java | 107 ++++----
.../apache/cassandra/db/ColumnSerializer.java | 2 +-
.../org/apache/cassandra/db/CounterColumn.java | 8 +-
.../apache/cassandra/db/CounterMutation.java | 7 +-
.../cassandra/db/CounterUpdateColumn.java | 2 +-
src/java/org/apache/cassandra/db/DefsTable.java | 4 +-
.../org/apache/cassandra/db/DeletedColumn.java | 4 +-
.../org/apache/cassandra/db/DeletionTime.java | 4 +-
.../org/apache/cassandra/db/ExpiringColumn.java | 15 +-
.../cassandra/db/HintedHandOffManager.java | 23 +-
.../org/apache/cassandra/db/OnDiskAtom.java | 2 +-
.../apache/cassandra/db/RangeSliceCommand.java | 83 +++---
.../org/apache/cassandra/db/RangeTombstone.java | 4 +-
.../org/apache/cassandra/db/ReadCommand.java | 18 +-
.../db/RetriedSliceFromReadCommand.java | 9 +-
src/java/org/apache/cassandra/db/Row.java | 4 +-
.../apache/cassandra/db/RowIteratorFactory.java | 16 +-
.../cassandra/db/SliceByNamesReadCommand.java | 28 +-
.../cassandra/db/SliceFromReadCommand.java | 30 ++-
.../apache/cassandra/db/SliceQueryPager.java | 7 +-
.../org/apache/cassandra/db/SuperColumns.java | 4 +-
.../org/apache/cassandra/db/SystemTable.java | 20 +-
.../db/compaction/CompactionManager.java | 6 +-
.../db/compaction/LazilyCompactedRow.java | 4 +-
.../db/compaction/PrecompactedRow.java | 4 +-
.../cassandra/db/filter/ColumnCounter.java | 17 +-
.../cassandra/db/filter/ExtendedFilter.java | 49 +++-
.../cassandra/db/filter/IDiskAtomFilter.java | 4 +-
.../cassandra/db/filter/NamesQueryFilter.java | 8 +-
.../apache/cassandra/db/filter/QueryFilter.java | 31 ++-
.../cassandra/db/filter/SliceQueryFilter.java | 20 +-
.../AbstractSimplePerColumnSecondaryIndex.java | 2 +-
.../db/index/SecondaryIndexManager.java | 20 +-
.../db/index/SecondaryIndexSearcher.java | 17 +-
.../db/index/composites/CompositesIndex.java | 2 +-
.../CompositesIndexOnClusteringKey.java | 4 +-
.../CompositesIndexOnPartitionKey.java | 4 +-
.../composites/CompositesIndexOnRegular.java | 4 +-
.../db/index/composites/CompositesSearcher.java | 31 ++-
.../cassandra/db/index/keys/KeysIndex.java | 4 +-
.../cassandra/db/index/keys/KeysSearcher.java | 27 +-
.../io/sstable/SSTableIdentityIterator.java | 5 +-
.../cassandra/io/sstable/SSTableReader.java | 10 +-
.../cassandra/service/ActiveRepairService.java | 2 +-
.../apache/cassandra/service/CacheService.java | 4 +-
.../service/RangeSliceResponseResolver.java | 6 +-
.../service/RangeSliceVerbHandler.java | 15 +-
.../apache/cassandra/service/ReadCallback.java | 2 +-
.../cassandra/service/RowDataResolver.java | 12 +-
.../apache/cassandra/service/StorageProxy.java | 35 ++-
.../cassandra/thrift/CassandraServer.java | 211 ++++++++-------
.../serialization/2.0/db.RangeSliceCommand.bin | Bin 753 -> 801 bytes
.../2.0/db.SliceByNamesReadCommand.bin | Bin 437 -> 485 bytes
.../2.0/db.SliceFromReadCommand.bin | Bin 437 -> 485 bytes
.../org/apache/cassandra/db/LongTableTest.java | 5 +-
.../unit/org/apache/cassandra/SchemaLoader.java | 2 +-
test/unit/org/apache/cassandra/Util.java | 9 +-
.../org/apache/cassandra/config/DefsTest.java | 6 +-
.../org/apache/cassandra/db/CleanupTest.java | 4 +-
.../cassandra/db/CollationControllerTest.java | 4 +-
.../cassandra/db/ColumnFamilyStoreTest.java | 259 +++++++++++++------
.../apache/cassandra/db/ColumnFamilyTest.java | 4 +-
.../org/apache/cassandra/db/KeyCacheTest.java | 12 +-
.../apache/cassandra/db/KeyCollisionTest.java | 2 +-
.../apache/cassandra/db/RangeTombstoneTest.java | 10 +-
.../apache/cassandra/db/ReadMessageTest.java | 13 +-
.../db/RecoveryManagerTruncateTest.java | 6 +-
.../cassandra/db/RemoveColumnFamilyTest.java | 2 +-
.../db/RemoveColumnFamilyWithFlush1Test.java | 2 +-
.../db/RemoveColumnFamilyWithFlush2Test.java | 2 +-
.../apache/cassandra/db/RemoveColumnTest.java | 18 +-
.../cassandra/db/RemoveSubColumnTest.java | 10 +-
.../org/apache/cassandra/db/RowCacheTest.java | 28 +-
test/unit/org/apache/cassandra/db/RowTest.java | 4 +-
.../apache/cassandra/db/SerializationsTest.java | 24 +-
.../unit/org/apache/cassandra/db/TableTest.java | 94 ++++---
.../org/apache/cassandra/db/TimeSortTest.java | 13 +-
.../db/compaction/CompactionsPurgeTest.java | 22 +-
.../db/compaction/CompactionsTest.java | 4 +-
.../LeveledCompactionStrategyTest.java | 2 +-
.../cassandra/db/compaction/TTLExpiryTest.java | 2 +-
.../db/index/PerRowSecondaryIndexTest.java | 7 +-
.../cassandra/db/marshal/CompositeTypeTest.java | 2 +-
.../db/marshal/DynamicCompositeTypeTest.java | 2 +-
.../cassandra/io/sstable/SSTableReaderTest.java | 2 +-
.../service/AntiEntropyServiceTestAbstract.java | 2 +-
.../cassandra/service/RowResolverTest.java | 14 +-
.../streaming/StreamingTransferTest.java | 12 +-
.../cassandra/tools/SSTableExportTest.java | 4 +-
.../cassandra/tools/SSTableImportTest.java | 12 +-
102 files changed, 1008 insertions(+), 730 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index a620d31..46b67e3 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -58,6 +58,8 @@
* Use SASL authentication in binary protocol v2 (CASSANDRA-5545)
* Replace Thrift HsHa with LMAX Disruptor based implementation (CASSANDRA-5582)
* cqlsh: Add row count to SELECT output (CASSANDRA-5636)
+ * Include a timestamp with all read commands to determine column expiration
+ (CASSANDRA-5149)
1.2.6
* Reduce SSTableLoader memory usage (CASSANDRA-5555)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/config/CFMetaData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/CFMetaData.java b/src/java/org/apache/cassandra/config/CFMetaData.java
index b5ece0c..15713bb 100644
--- a/src/java/org/apache/cassandra/config/CFMetaData.java
+++ b/src/java/org/apache/cassandra/config/CFMetaData.java
@@ -1251,7 +1251,7 @@ public final class CFMetaData
public Iterator<OnDiskAtom> getOnDiskIterator(DataInput in, int count, Descriptor.Version version)
{
- return getOnDiskIterator(in, count, ColumnSerializer.Flag.LOCAL, (int) (System.currentTimeMillis() / 1000), version);
+ return getOnDiskIterator(in, count, ColumnSerializer.Flag.LOCAL, Integer.MIN_VALUE, version);
}
public Iterator<OnDiskAtom> getOnDiskIterator(DataInput in, int count, ColumnSerializer.Flag flag, int expireBefore, Descriptor.Version version)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/config/ColumnDefinition.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/ColumnDefinition.java b/src/java/org/apache/cassandra/config/ColumnDefinition.java
index 9d21435..470e7d7 100644
--- a/src/java/org/apache/cassandra/config/ColumnDefinition.java
+++ b/src/java/org/apache/cassandra/config/ColumnDefinition.java
@@ -306,7 +306,8 @@ public class ColumnDefinition
DefsTable.searchComposite(cfName, true),
DefsTable.searchComposite(cfName, false),
false,
- Integer.MAX_VALUE);
+ Integer.MAX_VALUE,
+ System.currentTimeMillis());
return new Row(key, cf);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/cql/QueryProcessor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql/QueryProcessor.java b/src/java/org/apache/cassandra/cql/QueryProcessor.java
index 57fec4a..a40bfea 100644
--- a/src/java/org/apache/cassandra/cql/QueryProcessor.java
+++ b/src/java/org/apache/cassandra/cql/QueryProcessor.java
@@ -71,7 +71,7 @@ public class QueryProcessor
public static final String DEFAULT_KEY_NAME = bufferToString(CFMetaData.DEFAULT_KEY_NAME);
- private static List<org.apache.cassandra.db.Row> getSlice(CFMetaData metadata, SelectStatement select, List<ByteBuffer> variables)
+ private static List<org.apache.cassandra.db.Row> getSlice(CFMetaData metadata, SelectStatement select, List<ByteBuffer> variables, long now)
throws InvalidRequestException, ReadTimeoutException, UnavailableException, IsBootstrappingException, WriteTimeoutException
{
List<ReadCommand> commands = new ArrayList<ReadCommand>();
@@ -87,7 +87,7 @@ public class QueryProcessor
ByteBuffer key = rawKey.getByteBuffer(metadata.getKeyValidator(),variables);
validateKey(key);
- commands.add(new SliceByNamesReadCommand(metadata.ksName, key, select.getColumnFamily(), new NamesQueryFilter(columnNames)));
+ commands.add(new SliceByNamesReadCommand(metadata.ksName, key, select.getColumnFamily(), now, new NamesQueryFilter(columnNames)));
}
}
// ...a range (slice) of column names
@@ -106,6 +106,7 @@ public class QueryProcessor
commands.add(new SliceFromReadCommand(metadata.ksName,
key,
select.getColumnFamily(),
+ now,
new SliceQueryFilter(start, finish, select.isColumnsReversed(), select.getColumnsLimit())));
}
}
@@ -128,7 +129,7 @@ public class QueryProcessor
return columnNames;
}
- private static List<org.apache.cassandra.db.Row> multiRangeSlice(CFMetaData metadata, SelectStatement select, List<ByteBuffer> variables)
+ private static List<org.apache.cassandra.db.Row> multiRangeSlice(CFMetaData metadata, SelectStatement select, List<ByteBuffer> variables, long now)
throws ReadTimeoutException, UnavailableException, InvalidRequestException
{
IPartitioner<?> p = StorageService.getPartitioner();
@@ -175,6 +176,7 @@ public class QueryProcessor
List<org.apache.cassandra.db.Row> rows = StorageProxy.getRangeSlice(new RangeSliceCommand(metadata.ksName,
select.getColumnFamily(),
+ now,
columnFilter,
bounds,
expressions,
@@ -379,14 +381,15 @@ public class QueryProcessor
List<org.apache.cassandra.db.Row> rows;
+ long now = System.currentTimeMillis();
// By-key
if (!select.isKeyRange() && (select.getKeys().size() > 0))
{
- rows = getSlice(metadata, select, variables);
+ rows = getSlice(metadata, select, variables, now);
}
else
{
- rows = multiRangeSlice(metadata, select, variables);
+ rows = multiRangeSlice(metadata, select, variables, now);
}
// count resultset is a single column named "count"
@@ -429,7 +432,7 @@ public class QueryProcessor
{
for (org.apache.cassandra.db.Column c : row.cf.getSortedColumns())
{
- if (c.isMarkedForDelete())
+ if (c.isMarkedForDelete(now))
continue;
ColumnDefinition cd = metadata.getColumnDefinitionFromColumnName(c.name());
@@ -474,7 +477,7 @@ public class QueryProcessor
if (cd != null)
result.schema.value_types.put(name, TypeParser.getShortName(cd.getValidator()));
org.apache.cassandra.db.Column c = row.cf.getColumn(name);
- if (c == null || c.isMarkedForDelete())
+ if (c == null || c.isMarkedForDelete(now))
thriftColumns.add(new Column().setName(name));
else
thriftColumns.add(thriftify(c));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/cql3/statements/ColumnGroupMap.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/ColumnGroupMap.java b/src/java/org/apache/cassandra/cql3/statements/ColumnGroupMap.java
index 20fa3bd..8974523 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ColumnGroupMap.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ColumnGroupMap.java
@@ -104,20 +104,22 @@ public class ColumnGroupMap
{
private final CompositeType composite;
private final int idx;
+ private final long now;
private ByteBuffer[] previous;
private final List<ColumnGroupMap> groups = new ArrayList<ColumnGroupMap>();
private ColumnGroupMap currentGroup;
- public Builder(CompositeType composite, boolean hasCollections)
+ public Builder(CompositeType composite, boolean hasCollections, long now)
{
this.composite = composite;
this.idx = composite.types.size() - (hasCollections ? 2 : 1);
+ this.now = now;
}
public void add(Column c)
{
- if (c.isMarkedForDelete())
+ if (c.isMarkedForDelete(now))
return;
ByteBuffer[] current = composite.split(c.name());
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
index f6b7140..62f7fbd 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
@@ -297,10 +297,12 @@ public abstract class ModificationStatement implements CQLStatement
}
List<ReadCommand> commands = new ArrayList<ReadCommand>(partitionKeys.size());
+ long now = System.currentTimeMillis();
for (ByteBuffer key : partitionKeys)
commands.add(new SliceFromReadCommand(keyspace(),
key,
columnFamily(),
+ now,
new SliceQueryFilter(slices, false, Integer.MAX_VALUE)));
List<Row> rows = local
@@ -313,7 +315,7 @@ public abstract class ModificationStatement implements CQLStatement
if (row.cf == null || row.cf.getColumnCount() == 0)
continue;
- ColumnGroupMap.Builder groupBuilder = new ColumnGroupMap.Builder(composite, true);
+ ColumnGroupMap.Builder groupBuilder = new ColumnGroupMap.Builder(composite, true, now);
for (Column column : row.cf)
groupBuilder.add(column);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
index a96eb1d..fd47ba8 100644
--- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
@@ -127,17 +127,18 @@ public class SelectStatement implements CQLStatement
cl.validateForRead(keyspace());
int limit = getLimit(variables);
+ long now = System.currentTimeMillis();
List<Row> rows = isKeyRange || usesSecondaryIndexing
- ? StorageProxy.getRangeSlice(getRangeCommand(variables, limit), cl)
- : StorageProxy.read(getSliceCommands(variables, limit), cl);
+ ? StorageProxy.getRangeSlice(getRangeCommand(variables, limit, now), cl)
+ : StorageProxy.read(getSliceCommands(variables, limit, now), cl);
- return processResults(rows, variables, limit);
+ return processResults(rows, variables, limit, now);
}
- private ResultMessage.Rows processResults(List<Row> rows, List<ByteBuffer> variables, int limit) throws RequestValidationException
+ private ResultMessage.Rows processResults(List<Row> rows, List<ByteBuffer> variables, int limit, long now) throws RequestValidationException
{
// Even for count, we need to process the result as it'll group some column together in sparse column families
- ResultSet rset = process(rows, variables, limit);
+ ResultSet rset = process(rows, variables, limit, now);
rset = parameters.isCount ? rset.makeCountResult(parameters.countAlias) : rset;
return new ResultMessage.Rows(rset);
}
@@ -153,19 +154,20 @@ public class SelectStatement implements CQLStatement
public ResultMessage.Rows executeInternal(QueryState state) throws RequestExecutionException, RequestValidationException
{
- List<ByteBuffer> variables = Collections.<ByteBuffer>emptyList();
+ List<ByteBuffer> variables = Collections.emptyList();
int limit = getLimit(variables);
+ long now = System.currentTimeMillis();
List<Row> rows = isKeyRange || usesSecondaryIndexing
- ? RangeSliceVerbHandler.executeLocally(getRangeCommand(variables, limit))
- : readLocally(keyspace(), getSliceCommands(variables, limit));
+ ? RangeSliceVerbHandler.executeLocally(getRangeCommand(variables, limit, now))
+ : readLocally(keyspace(), getSliceCommands(variables, limit, now));
- return processResults(rows, variables, limit);
+ return processResults(rows, variables, limit, now);
}
public ResultSet process(List<Row> rows) throws InvalidRequestException
{
assert !parameters.isCount; // not yet needed
- return process(rows, Collections.<ByteBuffer>emptyList(), getLimit(Collections.<ByteBuffer>emptyList()));
+ return process(rows, Collections.<ByteBuffer>emptyList(), getLimit(Collections.<ByteBuffer>emptyList()), System.currentTimeMillis());
}
public String keyspace()
@@ -178,7 +180,7 @@ public class SelectStatement implements CQLStatement
return cfDef.cfm.cfName;
}
- private List<ReadCommand> getSliceCommands(List<ByteBuffer> variables, int limit) throws RequestValidationException
+ private List<ReadCommand> getSliceCommands(List<ByteBuffer> variables, int limit, long now) throws RequestValidationException
{
Collection<ByteBuffer> keys = getKeys(variables);
List<ReadCommand> commands = new ArrayList<ReadCommand>(keys.size());
@@ -192,25 +194,18 @@ public class SelectStatement implements CQLStatement
// We should not share the slice filter amongst the commands (hence the cloneShallow), due to
// SliceQueryFilter not being immutable due to its columnCounter used by the lastCounted() method
// (this is fairly ugly and we should change that but that's probably not a tiny refactor to do that cleanly)
- commands.add(ReadCommand.create(keyspace(), key, columnFamily(), filter.cloneShallow()));
+ commands.add(ReadCommand.create(keyspace(), key, columnFamily(), now, filter.cloneShallow()));
}
return commands;
}
- private RangeSliceCommand getRangeCommand(List<ByteBuffer> variables, int limit) throws RequestValidationException
+ private RangeSliceCommand getRangeCommand(List<ByteBuffer> variables, int limit, long now) throws RequestValidationException
{
IDiskAtomFilter filter = makeFilter(variables, limit);
List<IndexExpression> expressions = getIndexExpressions(variables);
// The LIMIT provided by the user is the number of CQL row he wants returned.
// We want to have getRangeSlice to count the number of columns, not the number of keys.
- return new RangeSliceCommand(keyspace(),
- columnFamily(),
- filter,
- getKeyBounds(variables),
- expressions,
- limit,
- true,
- false);
+ return new RangeSliceCommand(keyspace(), columnFamily(), now, filter, getKeyBounds(variables), expressions, limit, true, false);
}
private AbstractBounds<RowPosition> getKeyBounds(List<ByteBuffer> variables) throws InvalidRequestException
@@ -661,9 +656,9 @@ public class SelectStatement implements CQLStatement
};
}
- private ResultSet process(List<Row> rows, List<ByteBuffer> variables, int limit) throws InvalidRequestException
+ private ResultSet process(List<Row> rows, List<ByteBuffer> variables, int limit, long now) throws InvalidRequestException
{
- Selection.ResultSetBuilder result = selection.resultSetBuilder();
+ Selection.ResultSetBuilder result = selection.resultSetBuilder(now);
for (org.apache.cassandra.db.Row row : rows)
{
// Not columns match the query, skip
@@ -679,7 +674,7 @@ public class SelectStatement implements CQLStatement
// One cqlRow per column
for (Column c : columnsInOrder(row.cf, variables))
{
- if (c.isMarkedForDelete())
+ if (c.isMarkedForDelete(now))
continue;
ByteBuffer[] components = null;
@@ -728,11 +723,11 @@ public class SelectStatement implements CQLStatement
// Sparse case: group column in cqlRow when composite prefix is equal
CompositeType composite = (CompositeType)cfDef.cfm.comparator;
- ColumnGroupMap.Builder builder = new ColumnGroupMap.Builder(composite, cfDef.hasCollections);
+ ColumnGroupMap.Builder builder = new ColumnGroupMap.Builder(composite, cfDef.hasCollections, now);
for (Column c : row.cf)
{
- if (c.isMarkedForDelete())
+ if (c.isMarkedForDelete(now))
continue;
builder.add(c);
@@ -743,7 +738,7 @@ public class SelectStatement implements CQLStatement
}
else
{
- if (row.cf.hasOnlyTombstones())
+ if (row.cf.hasOnlyTombstones(now))
continue;
// Static case: One cqlRow for all columns
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/cql3/statements/Selection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/Selection.java b/src/java/org/apache/cassandra/cql3/statements/Selection.java
index d3018e5..cf2b62e 100644
--- a/src/java/org/apache/cassandra/cql3/statements/Selection.java
+++ b/src/java/org/apache/cassandra/cql3/statements/Selection.java
@@ -213,9 +213,9 @@ public abstract class Selection
return columnsList;
}
- public ResultSetBuilder resultSetBuilder()
+ public ResultSetBuilder resultSetBuilder(long now)
{
- return new ResultSetBuilder();
+ return new ResultSetBuilder(now);
}
private static ByteBuffer value(Column c)
@@ -240,12 +240,14 @@ public abstract class Selection
List<ByteBuffer> current;
final long[] timestamps;
final int[] ttls;
+ final long now;
- private ResultSetBuilder()
+ private ResultSetBuilder(long now)
{
this.resultSet = new ResultSet(metadata);
this.timestamps = collectTimestamps ? new long[columnsList.size()] : null;
this.ttls = collectTTLs ? new int[columnsList.size()] : null;
+ this.now = now;
}
public void add(ByteBuffer v)
@@ -264,14 +266,14 @@ public abstract class Selection
{
int ttl = -1;
if (!isDead(c) && c instanceof ExpiringColumn)
- ttl = c.getLocalDeletionTime() - (int) (System.currentTimeMillis() / 1000);
+ ttl = c.getLocalDeletionTime() - (int) (now / 1000);
ttls[current.size() - 1] = ttl;
}
}
private boolean isDead(Column c)
{
- return c == null || c.isMarkedForDelete();
+ return c == null || c.isMarkedForDelete(now);
}
public void newRow() throws InvalidRequestException
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/CollationController.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/CollationController.java b/src/java/org/apache/cassandra/db/CollationController.java
index f1fccef..d0d22c5 100644
--- a/src/java/org/apache/cassandra/db/CollationController.java
+++ b/src/java/org/apache/cassandra/db/CollationController.java
@@ -21,8 +21,6 @@ import java.nio.ByteBuffer;
import java.util.*;
import com.google.common.collect.Iterables;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
import org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy;
@@ -37,8 +35,6 @@ import org.apache.cassandra.utils.HeapAllocator;
public class CollationController
{
- private static final Logger logger = LoggerFactory.getLogger(CollationController.class);
-
private final ColumnFamilyStore cfs;
private final QueryFilter filter;
private final int gcBefore;
@@ -104,7 +100,7 @@ public class CollationController
// (reduceNameFilter removes columns that are known to be irrelevant)
NamesQueryFilter namesFilter = (NamesQueryFilter) filter.filter;
TreeSet<ByteBuffer> filterColumns = new TreeSet<ByteBuffer>(namesFilter.columns);
- QueryFilter reducedFilter = new QueryFilter(filter.key, filter.cfName, namesFilter.withUpdatedColumns(filterColumns));
+ QueryFilter reducedFilter = new QueryFilter(filter.key, filter.cfName, namesFilter.withUpdatedColumns(filterColumns), filter.timestamp);
/* add the SSTables on disk */
Collections.sort(view.sstables, SSTable.maxTimestampComparator);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/Column.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Column.java b/src/java/org/apache/cassandra/db/Column.java
index b42097c..b210d22 100644
--- a/src/java/org/apache/cassandra/db/Column.java
+++ b/src/java/org/apache/cassandra/db/Column.java
@@ -143,14 +143,20 @@ public class Column implements OnDiskAtom
return timestamp;
}
- public boolean isMarkedForDelete()
+ public boolean isMarkedForDelete(long now)
{
- return (int) (System.currentTimeMillis() / 1000) >= getLocalDeletionTime();
+ return false;
}
+ public boolean isLive(long now)
+ {
+ return !isMarkedForDelete(now);
+ }
+
+ // Don't call unless the column is actually marked for delete.
public long getMarkedForDeleteAt()
{
- throw new IllegalStateException("column is not marked for delete");
+ return Long.MAX_VALUE;
}
public int dataSize()
@@ -186,9 +192,7 @@ public class Column implements OnDiskAtom
public Column diff(Column column)
{
if (timestamp() < column.timestamp())
- {
return column;
- }
return null;
}
@@ -223,9 +227,9 @@ public class Column implements OnDiskAtom
public Column reconcile(Column column, Allocator allocator)
{
// tombstones take precedence. (if both are tombstones, then it doesn't matter which one we use.)
- if (isMarkedForDelete())
+ if (isMarkedForDelete(System.currentTimeMillis()))
return timestamp() < column.timestamp() ? column : this;
- if (column.isMarkedForDelete())
+ if (column.isMarkedForDelete(System.currentTimeMillis()))
return timestamp() > column.timestamp() ? this : column;
// break ties by comparing values.
if (timestamp() == column.timestamp())
@@ -276,7 +280,7 @@ public class Column implements OnDiskAtom
StringBuilder sb = new StringBuilder();
sb.append(comparator.getString(name));
sb.append(":");
- sb.append(isMarkedForDelete());
+ sb.append(isMarkedForDelete(System.currentTimeMillis()));
sb.append(":");
sb.append(value.remaining());
sb.append("@");
@@ -284,11 +288,6 @@ public class Column implements OnDiskAtom
return sb.toString();
}
- public boolean isLive()
- {
- return !isMarkedForDelete();
- }
-
protected void validateName(CFMetaData metadata) throws MarshalException
{
metadata.comparator.validate(name());
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/ColumnFamily.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamily.java b/src/java/org/apache/cassandra/db/ColumnFamily.java
index cda2d9b..36396f8 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamily.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamily.java
@@ -31,7 +31,6 @@ import java.util.UUID;
import com.google.common.base.Function;
import com.google.common.base.Functions;
import com.google.common.collect.ImmutableMap;
-
import org.apache.commons.lang.builder.HashCodeBuilder;
import org.apache.cassandra.cache.IRowCacheEntry;
@@ -194,7 +193,7 @@ public abstract class ColumnFamily implements Iterable<Column>, IRowCacheEntry
* </code>
* but is potentially faster.
*/
- public abstract void addAll(ColumnFamily cm, Allocator allocator, Function<Column, Column> transformation);
+ public abstract void addAll(ColumnFamily cm, Allocator allocator, Function<Column, Column> transformation);
/**
* Replace oldColumn if present by newColumn.
@@ -426,13 +425,11 @@ public abstract class ColumnFamily implements Iterable<Column>, IRowCacheEntry
return metadata.comparator;
}
- public boolean hasOnlyTombstones()
+ public boolean hasOnlyTombstones(long now)
{
for (Column column : this)
- {
- if (column.isLive())
+ if (column.isLive(now))
return false;
- }
return true;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java b/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java
index f5cf3d4..411b040 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java
@@ -102,11 +102,10 @@ public class ColumnFamilySerializer implements IVersionedSerializer<ColumnFamily
return null;
ColumnFamily cf = factory.create(Schema.instance.getCFMetaData(deserializeCfId(in, version)));
- int expireBefore = (int) (System.currentTimeMillis() / 1000);
if (cf.metadata().isSuper() && version < MessagingService.VERSION_20)
{
- SuperColumns.deserializerSuperColumnFamily(in, cf, flag, expireBefore, version);
+ SuperColumns.deserializerSuperColumnFamily(in, cf, flag, version);
}
else
{
@@ -115,9 +114,7 @@ public class ColumnFamilySerializer implements IVersionedSerializer<ColumnFamily
ColumnSerializer columnSerializer = Column.serializer;
int size = in.readInt();
for (int i = 0; i < size; ++i)
- {
- cf.addColumn(columnSerializer.deserialize(in, flag, expireBefore));
- }
+ cf.addColumn(columnSerializer.deserialize(in, flag));
}
return cf;
}
@@ -170,21 +167,6 @@ public class ColumnFamilySerializer implements IVersionedSerializer<ColumnFamily
throw new UnsupportedOperationException();
}
- public void deserializeColumnsFromSSTable(DataInput in, ColumnFamily cf, int size, ColumnSerializer.Flag flag, int expireBefore, Descriptor.Version version)
- {
- Iterator<OnDiskAtom> iter = cf.metadata().getOnDiskIterator(in, size, flag, expireBefore, version);
- while (iter.hasNext())
- cf.addAtom(iter.next());
- }
-
- public void deserializeFromSSTable(DataInput in, ColumnFamily cf, ColumnSerializer.Flag flag, Descriptor.Version version) throws IOException
- {
- cf.delete(DeletionInfo.serializer().deserializeFromSSTable(in, version));
- int size = in.readInt();
- int expireBefore = (int) (System.currentTimeMillis() / 1000);
- deserializeColumnsFromSSTable(in, cf, size, flag, expireBefore, version);
- }
-
public void serializeCfId(UUID cfId, DataOutput out, int version) throws IOException
{
UUIDSerializer.serializer.serialize(cfId, out, version);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 4287df6..648c25a 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -33,10 +33,6 @@ import com.google.common.base.Function;
import com.google.common.collect.*;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.Uninterruptibles;
-
-import org.apache.cassandra.db.compaction.*;
-
-import org.cliffc.high_scale_lib.NonBlockingHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -52,6 +48,7 @@ import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
import org.apache.cassandra.db.commitlog.CommitLog;
import org.apache.cassandra.db.commitlog.ReplayPosition;
+import org.apache.cassandra.db.compaction.*;
import org.apache.cassandra.db.filter.ExtendedFilter;
import org.apache.cassandra.db.filter.IDiskAtomFilter;
import org.apache.cassandra.db.filter.QueryFilter;
@@ -73,6 +70,7 @@ import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.thrift.IndexExpression;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.utils.*;
+import org.cliffc.high_scale_lib.NonBlockingHashMap;
import static org.apache.cassandra.config.CFMetaData.Caching;
@@ -1180,24 +1178,14 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
return metric.writeLatency.recentLatencyHistogram.getBuckets(true);
}
- public ColumnFamily getColumnFamily(DecoratedKey key, ByteBuffer start, ByteBuffer finish, boolean reversed, int limit)
+ public ColumnFamily getColumnFamily(DecoratedKey key,
+ ByteBuffer start,
+ ByteBuffer finish,
+ boolean reversed,
+ int limit,
+ long timestamp)
{
- return getColumnFamily(QueryFilter.getSliceFilter(key, name, start, finish, reversed, limit));
- }
-
- /**
- * get a list of columns starting from a given column, in a specified order.
- * only the latest version of a column is returned.
- * @return null if there is no data and no tombstones; otherwise a ColumnFamily
- */
- public ColumnFamily getColumnFamily(QueryFilter filter)
- {
- return getColumnFamily(filter, gcBefore());
- }
-
- public int gcBefore()
- {
- return (int) (System.currentTimeMillis() / 1000) - metadata.getGcGraceSeconds();
+ return getColumnFamily(QueryFilter.getSliceFilter(key, name, start, finish, reversed, limit, timestamp));
}
/**
@@ -1236,7 +1224,8 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
try
{
- ColumnFamily data = getTopLevelColumns(QueryFilter.getIdentityFilter(filter.key, name), Integer.MIN_VALUE);
+ ColumnFamily data = getTopLevelColumns(QueryFilter.getIdentityFilter(filter.key, name, filter.timestamp),
+ Integer.MIN_VALUE);
if (sentinelSuccess && data != null)
CacheService.instance.rowCache.replace(key, sentinel, data);
@@ -1249,7 +1238,17 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
}
}
- ColumnFamily getColumnFamily(QueryFilter filter, int gcBefore)
+ public int gcBefore(long now)
+ {
+ return (int) (now / 1000) - metadata.getGcGraceSeconds();
+ }
+
+ /**
+ * get a list of columns starting from a given column, in a specified order.
+ * only the latest version of a column is returned.
+ * @return null if there is no data and no tombstones; otherwise a ColumnFamily
+ */
+ public ColumnFamily getColumnFamily(QueryFilter filter)
{
assert name.equals(filter.getColumnFamilyName()) : filter.getColumnFamilyName();
@@ -1258,6 +1257,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
long start = System.nanoTime();
try
{
+ int gcBefore = gcBefore(filter.timestamp);
if (isRowCacheEnabled())
{
UUID cfId = Schema.instance.getId(table.getName(), name);
@@ -1274,7 +1274,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
return null;
}
- result = filterColumnFamily(cached, filter, gcBefore);
+ result = filterColumnFamily(cached, filter);
}
else
{
@@ -1301,12 +1301,12 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
* tombstones that are no longer relevant.
* The returned column family won't be thread safe.
*/
- ColumnFamily filterColumnFamily(ColumnFamily cached, QueryFilter filter, int gcBefore)
+ ColumnFamily filterColumnFamily(ColumnFamily cached, QueryFilter filter)
{
ColumnFamily cf = cached.cloneMeShallow(ArrayBackedSortedColumns.factory, filter.filter.isReversed());
OnDiskAtomIterator ci = filter.getMemtableColumnIterator(cached, null);
- filter.collateOnDiskAtom(cf, ci, gcBefore);
- return removeDeletedCF(cf, gcBefore);
+ filter.collateOnDiskAtom(cf, ci, gcBefore(filter.timestamp));
+ return removeDeletedCF(cf, gcBefore(filter.timestamp));
}
/**
@@ -1440,7 +1440,8 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
}
return files;
}
- finally {
+ finally
+ {
SSTableReader.releaseReferences(view.sstables);
}
}
@@ -1468,14 +1469,16 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
* @param range Either a Bounds, which includes start key, or a Range, which does not.
* @param columnFilter description of the columns we're interested in for each row
*/
- public AbstractScanIterator getSequentialIterator(final AbstractBounds<RowPosition> range, IDiskAtomFilter columnFilter)
+ private AbstractScanIterator getSequentialIterator(final AbstractBounds<RowPosition> range,
+ IDiskAtomFilter columnFilter,
+ long timestamp)
{
assert !(range instanceof Range) || !((Range)range).isWrapAround() || range.right.isMinimum() : range;
final RowPosition startWith = range.left;
final RowPosition stopAt = range.right;
- QueryFilter filter = new QueryFilter(null, name, columnFilter);
+ QueryFilter filter = new QueryFilter(null, name, columnFilter, timestamp);
final ViewFragment view = markReferenced(range);
Tracing.trace("Executing seq scan across {} sstables for {}", view.sstables.size(), range.getString(metadata.getKeyValidator()));
@@ -1524,25 +1527,43 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
}
}
- public List<Row> getRangeSlice(final AbstractBounds<RowPosition> range, int maxResults, IDiskAtomFilter columnFilter, List<IndexExpression> rowFilter)
+ public List<Row> getRangeSlice(final AbstractBounds<RowPosition> range,
+ List<IndexExpression> rowFilter,
+ IDiskAtomFilter columnFilter,
+ int maxResults)
{
- return getRangeSlice(range, maxResults, columnFilter, rowFilter, false, false);
+ return getRangeSlice(range, rowFilter, columnFilter, maxResults, System.currentTimeMillis(), false, false);
}
- public List<Row> getRangeSlice(final AbstractBounds<RowPosition> range, int maxResults, IDiskAtomFilter columnFilter, List<IndexExpression> rowFilter, boolean countCQL3Rows, boolean isPaging)
+ public List<Row> getRangeSlice(final AbstractBounds<RowPosition> range,
+ List<IndexExpression> rowFilter,
+ IDiskAtomFilter columnFilter,
+ int maxResults,
+ long now,
+ boolean countCQL3Rows,
+ boolean isPaging)
{
- return filter(getSequentialIterator(range, columnFilter), ExtendedFilter.create(this, columnFilter, rowFilter, maxResults, countCQL3Rows, isPaging));
+ return filter(getSequentialIterator(range, columnFilter, now),
+ ExtendedFilter.create(this, rowFilter, columnFilter, maxResults, now, countCQL3Rows, isPaging));
}
- public List<Row> search(List<IndexExpression> clause, AbstractBounds<RowPosition> range, int maxResults, IDiskAtomFilter dataFilter)
+ public List<Row> search(AbstractBounds<RowPosition> range,
+ List<IndexExpression> clause,
+ IDiskAtomFilter columnFilter,
+ int maxResults)
{
- return search(clause, range, maxResults, dataFilter, false);
+ return search(range, clause, columnFilter, maxResults, System.currentTimeMillis(), false);
}
- public List<Row> search(List<IndexExpression> clause, AbstractBounds<RowPosition> range, int maxResults, IDiskAtomFilter dataFilter, boolean countCQL3Rows)
+ public List<Row> search(AbstractBounds<RowPosition> range,
+ List<IndexExpression> clause,
+ IDiskAtomFilter columnFilter,
+ int maxResults,
+ long now,
+ boolean countCQL3Rows)
{
Tracing.trace("Executing indexed scan for {}", range.getString(metadata.getKeyValidator()));
- return indexManager.search(clause, range, maxResults, dataFilter, countCQL3Rows);
+ return indexManager.search(range, clause, columnFilter, maxResults, now, countCQL3Rows);
}
public List<Row> filter(AbstractScanIterator rowIterator, ExtendedFilter filter)
@@ -1566,7 +1587,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
IDiskAtomFilter extraFilter = filter.getExtraFilter(data);
if (extraFilter != null)
{
- ColumnFamily cf = filter.cfs.getColumnFamily(new QueryFilter(rawRow.key, name, extraFilter));
+ ColumnFamily cf = filter.cfs.getColumnFamily(new QueryFilter(rawRow.key, name, extraFilter, filter.timestamp));
if (cf != null)
data.addAll(cf, HeapAllocator.instance);
}
@@ -1751,14 +1772,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
return Iterables.concat(stores);
}
- public static List<ColumnFamilyStore> allUserDefined()
- {
- List<ColumnFamilyStore> cfses = new ArrayList<ColumnFamilyStore>();
- for (Table table : Sets.difference(ImmutableSet.copyOf(Table.all()), Schema.systemKeyspaceNames))
- cfses.addAll(table.getColumnFamilyStores());
- return cfses;
- }
-
public Iterable<DecoratedKey> keySamples(Range<Token> range)
{
Collection<SSTableReader> sstables = getSSTables();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/ColumnSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnSerializer.java b/src/java/org/apache/cassandra/db/ColumnSerializer.java
index 353dda9..fb38b5f 100644
--- a/src/java/org/apache/cassandra/db/ColumnSerializer.java
+++ b/src/java/org/apache/cassandra/db/ColumnSerializer.java
@@ -88,7 +88,7 @@ public class ColumnSerializer implements ISerializer<Column>
*/
public Column deserialize(DataInput in, ColumnSerializer.Flag flag) throws IOException
{
- return deserialize(in, flag, (int) (System.currentTimeMillis() / 1000));
+ return deserialize(in, flag, Integer.MIN_VALUE);
}
public Column deserialize(DataInput in, ColumnSerializer.Flag flag, int expireBefore) throws IOException
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/CounterColumn.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/CounterColumn.java b/src/java/org/apache/cassandra/db/CounterColumn.java
index 15da1df..207ded6 100644
--- a/src/java/org/apache/cassandra/db/CounterColumn.java
+++ b/src/java/org/apache/cassandra/db/CounterColumn.java
@@ -169,9 +169,11 @@ public class CounterColumn extends Column
{
assert (column instanceof CounterColumn) || (column instanceof DeletedColumn) : "Wrong class type: " + column.getClass();
- if (column.isMarkedForDelete()) // live + tombstone: track last tombstone
+ // live + tombstone: track last tombstone
+ if (column.isMarkedForDelete(Long.MIN_VALUE)) // cannot be an expired column, so the current time is irrelevant
{
- if (timestamp() < column.timestamp()) // live < tombstone
+ // live < tombstone
+ if (timestamp() < column.timestamp())
{
return column;
}
@@ -230,7 +232,7 @@ public class CounterColumn extends Column
StringBuilder sb = new StringBuilder();
sb.append(comparator.getString(name));
sb.append(":");
- sb.append(isMarkedForDelete());
+ sb.append(false);
sb.append(":");
sb.append(contextManager.toString(value));
sb.append("@");
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/CounterMutation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/CounterMutation.java b/src/java/org/apache/cassandra/db/CounterMutation.java
index 6f60a26..9ace314 100644
--- a/src/java/org/apache/cassandra/db/CounterMutation.java
+++ b/src/java/org/apache/cassandra/db/CounterMutation.java
@@ -84,11 +84,12 @@ public class CounterMutation implements IMutation
public RowMutation makeReplicationMutation()
{
List<ReadCommand> readCommands = new LinkedList<ReadCommand>();
+ long timestamp = System.currentTimeMillis();
for (ColumnFamily columnFamily : rowMutation.getColumnFamilies())
{
if (!columnFamily.metadata().getReplicateOnWrite())
continue;
- addReadCommandFromColumnFamily(rowMutation.getTable(), rowMutation.key(), columnFamily, readCommands);
+ addReadCommandFromColumnFamily(rowMutation.getTable(), rowMutation.key(), columnFamily, timestamp, readCommands);
}
// create a replication RowMutation
@@ -106,11 +107,11 @@ public class CounterMutation implements IMutation
return replicationMutation;
}
- private void addReadCommandFromColumnFamily(String table, ByteBuffer key, ColumnFamily columnFamily, List<ReadCommand> commands)
+ private void addReadCommandFromColumnFamily(String table, ByteBuffer key, ColumnFamily columnFamily, long timestamp, List<ReadCommand> commands)
{
SortedSet<ByteBuffer> s = new TreeSet<ByteBuffer>(columnFamily.metadata().comparator);
Iterables.addAll(s, columnFamily.getColumnNames());
- commands.add(new SliceByNamesReadCommand(table, key, columnFamily.metadata().cfName, new NamesQueryFilter(s)));
+ commands.add(new SliceByNamesReadCommand(table, key, columnFamily.metadata().cfName, timestamp, new NamesQueryFilter(s)));
}
public MessageOut<CounterMutation> makeMutationMessage()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/CounterUpdateColumn.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/CounterUpdateColumn.java b/src/java/org/apache/cassandra/db/CounterUpdateColumn.java
index 9d9530e..1ae7dd7 100644
--- a/src/java/org/apache/cassandra/db/CounterUpdateColumn.java
+++ b/src/java/org/apache/cassandra/db/CounterUpdateColumn.java
@@ -64,7 +64,7 @@ public class CounterUpdateColumn extends Column
assert (column instanceof CounterUpdateColumn) || (column instanceof DeletedColumn) : "Wrong class type.";
// tombstones take precedence
- if (column.isMarkedForDelete())
+ if (column.isMarkedForDelete(Long.MIN_VALUE)) // can't be an expired column, so the current time is irrelevant
return timestamp() > column.timestamp() ? this : column;
// neither is tombstoned
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/DefsTable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DefsTable.java b/src/java/org/apache/cassandra/db/DefsTable.java
index df551e0..4b6c1c2 100644
--- a/src/java/org/apache/cassandra/db/DefsTable.java
+++ b/src/java/org/apache/cassandra/db/DefsTable.java
@@ -152,7 +152,9 @@ public class DefsTable
private static Row serializedColumnFamilies(DecoratedKey ksNameKey)
{
ColumnFamilyStore cfsStore = SystemTable.schemaCFS(SystemTable.SCHEMA_COLUMNFAMILIES_CF);
- return new Row(ksNameKey, cfsStore.getColumnFamily(QueryFilter.getIdentityFilter(ksNameKey, SystemTable.SCHEMA_COLUMNFAMILIES_CF)));
+ return new Row(ksNameKey, cfsStore.getColumnFamily(QueryFilter.getIdentityFilter(ksNameKey,
+ SystemTable.SCHEMA_COLUMNFAMILIES_CF,
+ System.currentTimeMillis())));
}
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/DeletedColumn.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DeletedColumn.java b/src/java/org/apache/cassandra/db/DeletedColumn.java
index f9bda78..57c9bf9 100644
--- a/src/java/org/apache/cassandra/db/DeletedColumn.java
+++ b/src/java/org/apache/cassandra/db/DeletedColumn.java
@@ -53,10 +53,8 @@ public class DeletedColumn extends Column
}
@Override
- public boolean isMarkedForDelete()
+ public boolean isMarkedForDelete(long now)
{
- // We don't rely on the column implementation because it could mistakenly return false if
- // some node are not exactly synchronized, which is problematic (see #4307)
return true;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/DeletionTime.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DeletionTime.java b/src/java/org/apache/cassandra/db/DeletionTime.java
index d1ce0eb..5296529 100644
--- a/src/java/org/apache/cassandra/db/DeletionTime.java
+++ b/src/java/org/apache/cassandra/db/DeletionTime.java
@@ -83,9 +83,9 @@ public class DeletionTime implements Comparable<DeletionTime>
return localDeletionTime < gcBefore;
}
- public boolean isDeleted(Column column)
+ public boolean isDeleted(Column column, long now)
{
- return column.isMarkedForDelete() && column.getMarkedForDeleteAt() <= markedForDeleteAt;
+ return column.isMarkedForDelete(now) && column.getMarkedForDeleteAt() <= markedForDeleteAt;
}
public long memorySize()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/ExpiringColumn.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ExpiringColumn.java b/src/java/org/apache/cassandra/db/ExpiringColumn.java
index 7660bad..f342310 100644
--- a/src/java/org/apache/cassandra/db/ExpiringColumn.java
+++ b/src/java/org/apache/cassandra/db/ExpiringColumn.java
@@ -158,16 +158,15 @@ public class ExpiringColumn extends Column
}
@Override
+ public boolean isMarkedForDelete(long now)
+ {
+ return (int) (now / 1000) >= getLocalDeletionTime();
+ }
+
+ @Override
public long getMarkedForDeleteAt()
{
- if (isMarkedForDelete())
- {
- return timestamp;
- }
- else
- {
- throw new IllegalStateException("column is not marked for delete");
- }
+ return timestamp;
}
@Override
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/HintedHandOffManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/HintedHandOffManager.java b/src/java/org/apache/cassandra/db/HintedHandOffManager.java
index c10bed6..e89c769 100644
--- a/src/java/org/apache/cassandra/db/HintedHandOffManager.java
+++ b/src/java/org/apache/cassandra/db/HintedHandOffManager.java
@@ -326,15 +326,16 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
delivery:
while (true)
{
+ long now = System.currentTimeMillis();
QueryFilter filter = QueryFilter.getSliceFilter(epkey,
SystemTable.HINTS_CF,
startColumn,
ByteBufferUtil.EMPTY_BYTE_BUFFER,
false,
- pageSize);
+ pageSize,
+ now);
- ColumnFamily hintsPage = ColumnFamilyStore.removeDeleted(hintStore.getColumnFamily(filter),
- (int) (System.currentTimeMillis() / 1000));
+ ColumnFamily hintsPage = ColumnFamilyStore.removeDeleted(hintStore.getColumnFamily(filter), (int) (now / 1000));
if (pagingFinished(hintsPage, startColumn))
break;
@@ -362,7 +363,7 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
// in which the local deletion timestamp was generated on the last column in the old page, in which
// case the hint will have no columns (since it's deleted) but will still be included in the resultset
// since (even with gcgs=0) it's still a "relevant" tombstone.
- if (!hint.isLive())
+ if (!hint.isLive(System.currentTimeMillis()))
continue;
startColumn = hint.name();
@@ -479,7 +480,7 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
RowPosition minPos = p.getMinimumToken().minKeyBound();
Range<RowPosition> range = new Range<RowPosition>(minPos, minPos, p);
IDiskAtomFilter filter = new NamesQueryFilter(ImmutableSortedSet.<ByteBuffer>of());
- List<Row> rows = hintStore.getRangeSlice(range, Integer.MAX_VALUE, filter, null);
+ List<Row> rows = hintStore.getRangeSlice(range, null, filter, Integer.MAX_VALUE);
for (Row row : rows)
{
UUID hostId = UUIDGen.getUUID(row.key.key);
@@ -576,18 +577,22 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
RowPosition minPos = partitioner.getMinimumToken().minKeyBound();
Range<RowPosition> range = new Range<RowPosition>(minPos, minPos);
- // Get a bunch of rows!
- List<Row> rows;
try
{
- rows = StorageProxy.getRangeSlice(new RangeSliceCommand(Table.SYSTEM_KS, SystemTable.HINTS_CF, predicate, range, null, LARGE_NUMBER), ConsistencyLevel.ONE);
+ RangeSliceCommand cmd = new RangeSliceCommand(Table.SYSTEM_KS,
+ SystemTable.HINTS_CF,
+ System.currentTimeMillis(),
+ predicate,
+ range,
+ null,
+ LARGE_NUMBER);
+ return StorageProxy.getRangeSlice(cmd, ConsistencyLevel.ONE);
}
catch (Exception e)
{
logger.info("HintsCF getEPPendingHints timed out.");
throw new RuntimeException(e);
}
- return rows;
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/OnDiskAtom.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/OnDiskAtom.java b/src/java/org/apache/cassandra/db/OnDiskAtom.java
index 2fdb7ad..14a21c8 100644
--- a/src/java/org/apache/cassandra/db/OnDiskAtom.java
+++ b/src/java/org/apache/cassandra/db/OnDiskAtom.java
@@ -66,7 +66,7 @@ public interface OnDiskAtom
public OnDiskAtom deserializeFromSSTable(DataInput in, Descriptor.Version version) throws IOException
{
- return deserializeFromSSTable(in, ColumnSerializer.Flag.LOCAL, (int)(System.currentTimeMillis() / 1000), version);
+ return deserializeFromSSTable(in, ColumnSerializer.Flag.LOCAL, Integer.MIN_VALUE, version);
}
public OnDiskAtom deserializeFromSSTable(DataInput in, ColumnSerializer.Flag flag, int expireBefore, Descriptor.Version version) throws IOException
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/RangeSliceCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RangeSliceCommand.java b/src/java/org/apache/cassandra/db/RangeSliceCommand.java
index 4020a15..c7e71f3 100644
--- a/src/java/org/apache/cassandra/db/RangeSliceCommand.java
+++ b/src/java/org/apache/cassandra/db/RangeSliceCommand.java
@@ -15,24 +15,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
package org.apache.cassandra.db;
import java.io.DataInput;
@@ -46,8 +28,6 @@ import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.filter.IDiskAtomFilter;
-import org.apache.cassandra.db.filter.NamesQueryFilter;
-import org.apache.cassandra.db.filter.SliceQueryFilter;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.marshal.CompositeType;
import org.apache.cassandra.dht.AbstractBounds;
@@ -57,8 +37,6 @@ import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.IReadCommand;
import org.apache.cassandra.thrift.IndexExpression;
import org.apache.cassandra.thrift.IndexOperator;
-import org.apache.cassandra.thrift.SlicePredicate;
-import org.apache.cassandra.thrift.SliceRange;
import org.apache.cassandra.utils.ByteBufferUtil;
public class RangeSliceCommand implements IReadCommand
@@ -69,6 +47,8 @@ public class RangeSliceCommand implements IReadCommand
public final String column_family;
+ public final long timestamp;
+
public final IDiskAtomFilter predicate;
public final List<IndexExpression> row_filter;
@@ -77,20 +57,40 @@ public class RangeSliceCommand implements IReadCommand
public final boolean countCQL3Rows;
public final boolean isPaging;
- public RangeSliceCommand(String keyspace, String column_family, IDiskAtomFilter predicate, AbstractBounds<RowPosition> range, int maxResults)
+ public RangeSliceCommand(String keyspace,
+ String column_family,
+ long timestamp,
+ IDiskAtomFilter predicate,
+ AbstractBounds<RowPosition> range,
+ int maxResults)
{
- this(keyspace, column_family, predicate, range, null, maxResults, false, false);
+ this(keyspace, column_family, timestamp, predicate, range, null, maxResults, false, false);
}
- public RangeSliceCommand(String keyspace, String column_family, IDiskAtomFilter predicate, AbstractBounds<RowPosition> range, List<IndexExpression> row_filter, int maxResults)
+ public RangeSliceCommand(String keyspace,
+ String column_family,
+ long timestamp,
+ IDiskAtomFilter predicate,
+ AbstractBounds<RowPosition> range,
+ List<IndexExpression> row_filter,
+ int maxResults)
{
- this(keyspace, column_family, predicate, range, row_filter, maxResults, false, false);
+ this(keyspace, column_family, timestamp, predicate, range, row_filter, maxResults, false, false);
}
- public RangeSliceCommand(String keyspace, String column_family, IDiskAtomFilter predicate, AbstractBounds<RowPosition> range, List<IndexExpression> row_filter, int maxResults, boolean countCQL3Rows, boolean isPaging)
+ public RangeSliceCommand(String keyspace,
+ String column_family,
+ long timestamp,
+ IDiskAtomFilter predicate,
+ AbstractBounds<RowPosition> range,
+ List<IndexExpression> row_filter,
+ int maxResults,
+ boolean countCQL3Rows,
+ boolean isPaging)
{
this.keyspace = keyspace;
this.column_family = column_family;
+ this.timestamp = timestamp;
this.predicate = predicate;
this.range = range;
this.row_filter = row_filter;
@@ -110,6 +110,7 @@ public class RangeSliceCommand implements IReadCommand
return "RangeSliceCommand{" +
"keyspace='" + keyspace + '\'' +
", column_family='" + column_family + '\'' +
+ ", timestamp='" + timestamp + '\'' +
", predicate=" + predicate +
", range=" + range +
", row_filter =" + row_filter +
@@ -131,27 +132,14 @@ public class RangeSliceCommand implements IReadCommand
class RangeSliceCommandSerializer implements IVersionedSerializer<RangeSliceCommand>
{
- // For compatibility with pre-1.2 sake. We should remove at some point.
- public static SlicePredicate asSlicePredicate(IDiskAtomFilter predicate)
- {
- SlicePredicate sp = new SlicePredicate();
- if (predicate instanceof NamesQueryFilter)
- {
- sp.setColumn_names(new ArrayList<ByteBuffer>(((NamesQueryFilter)predicate).columns));
- }
- else
- {
- SliceQueryFilter sqf = (SliceQueryFilter)predicate;
- sp.setSlice_range(new SliceRange(sqf.start(), sqf.finish(), sqf.reversed, sqf.count));
- }
- return sp;
- }
-
public void serialize(RangeSliceCommand sliceCommand, DataOutput out, int version) throws IOException
{
out.writeUTF(sliceCommand.keyspace);
out.writeUTF(sliceCommand.column_family);
+ if (version >= MessagingService.VERSION_20)
+ out.writeLong(sliceCommand.timestamp);
+
IDiskAtomFilter filter = sliceCommand.predicate;
if (version < MessagingService.VERSION_20)
{
@@ -199,6 +187,8 @@ class RangeSliceCommandSerializer implements IVersionedSerializer<RangeSliceComm
String keyspace = in.readUTF();
String columnFamily = in.readUTF();
+ long timestamp = version < MessagingService.VERSION_20 ? System.currentTimeMillis() : in.readLong();
+
CFMetaData metadata = Schema.instance.getCFMetaData(keyspace, columnFamily);
IDiskAtomFilter predicate;
@@ -234,7 +224,7 @@ class RangeSliceCommandSerializer implements IVersionedSerializer<RangeSliceComm
predicate = IDiskAtomFilter.Serializer.instance.deserialize(in, version, metadata.comparator);
}
- List<IndexExpression> rowFilter = null;
+ List<IndexExpression> rowFilter;
int filterCount = in.readInt();
rowFilter = new ArrayList<IndexExpression>(filterCount);
for (int i = 0; i < filterCount; i++)
@@ -250,7 +240,7 @@ class RangeSliceCommandSerializer implements IVersionedSerializer<RangeSliceComm
int maxResults = in.readInt();
boolean countCQL3Rows = in.readBoolean();
boolean isPaging = in.readBoolean();
- return new RangeSliceCommand(keyspace, columnFamily, predicate, range, rowFilter, maxResults, countCQL3Rows, isPaging);
+ return new RangeSliceCommand(keyspace, columnFamily, timestamp, predicate, range, rowFilter, maxResults, countCQL3Rows, isPaging);
}
public long serializedSize(RangeSliceCommand rsc, int version)
@@ -258,6 +248,9 @@ class RangeSliceCommandSerializer implements IVersionedSerializer<RangeSliceComm
long size = TypeSizes.NATIVE.sizeof(rsc.keyspace);
size += TypeSizes.NATIVE.sizeof(rsc.column_family);
+ if (version >= MessagingService.VERSION_20)
+ size += TypeSizes.NATIVE.sizeof(rsc.timestamp);
+
IDiskAtomFilter filter = rsc.predicate;
if (version < MessagingService.VERSION_20)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/RangeTombstone.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RangeTombstone.java b/src/java/org/apache/cassandra/db/RangeTombstone.java
index 9342570..e30cd5b 100644
--- a/src/java/org/apache/cassandra/db/RangeTombstone.java
+++ b/src/java/org/apache/cassandra/db/RangeTombstone.java
@@ -237,11 +237,11 @@ public class RangeTombstone extends Interval<ByteBuffer, DeletionTime> implement
}
}
- public boolean isDeleted(Column column)
+ public boolean isDeleted(Column column, long now)
{
for (RangeTombstone tombstone : ranges)
{
- if (comparator.compare(column.name(), tombstone.max) <= 0 && tombstone.data.isDeleted(column))
+ if (comparator.compare(column.name(), tombstone.max) <= 0 && tombstone.data.isDeleted(column, now))
return true;
}
return false;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/ReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadCommand.java b/src/java/org/apache/cassandra/db/ReadCommand.java
index 3cff8b6..61a2478 100644
--- a/src/java/org/apache/cassandra/db/ReadCommand.java
+++ b/src/java/org/apache/cassandra/db/ReadCommand.java
@@ -35,10 +35,10 @@ import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.IReadCommand;
import org.apache.cassandra.service.RowDataResolver;
-
public abstract class ReadCommand implements IReadCommand
{
- public enum Type {
+ public enum Type
+ {
GET_BY_NAMES((byte)1),
GET_SLICES((byte)2);
@@ -65,23 +65,25 @@ public abstract class ReadCommand implements IReadCommand
public final String table;
public final String cfName;
public final ByteBuffer key;
+ public final long timestamp;
private boolean isDigestQuery = false;
protected final Type commandType;
- protected ReadCommand(String table, ByteBuffer key, String cfName, Type cmdType)
+ protected ReadCommand(String table, ByteBuffer key, String cfName, long timestamp, Type cmdType)
{
this.table = table;
this.key = key;
this.cfName = cfName;
+ this.timestamp = timestamp;
this.commandType = cmdType;
}
- public static ReadCommand create(String table, ByteBuffer key, String cfName, IDiskAtomFilter filter)
+ public static ReadCommand create(String table, ByteBuffer key, String cfName, long timestamp, IDiskAtomFilter filter)
{
if (filter instanceof SliceQueryFilter)
- return new SliceFromReadCommand(table, key, cfName, (SliceQueryFilter)filter);
+ return new SliceFromReadCommand(table, key, cfName, timestamp, (SliceQueryFilter)filter);
else
- return new SliceByNamesReadCommand(table, key, cfName, (NamesQueryFilter)filter);
+ return new SliceByNamesReadCommand(table, key, cfName, timestamp, (NamesQueryFilter)filter);
}
public boolean isDigestQuery()
@@ -143,7 +145,7 @@ class ReadCommandSerializer implements IVersionedSerializer<ReadCommand>
if (metadata.cfType == ColumnFamilyType.Super)
{
SuperColumns.SCFilter scFilter = SuperColumns.filterToSC((CompositeType)metadata.comparator, command.filter());
- newCommand = ReadCommand.create(command.table, command.key, command.cfName, scFilter.updatedFilter);
+ newCommand = ReadCommand.create(command.table, command.key, command.cfName, command.timestamp, scFilter.updatedFilter);
newCommand.setDigestQuery(command.isDigestQuery());
superColumn = scFilter.scName;
}
@@ -187,7 +189,7 @@ class ReadCommandSerializer implements IVersionedSerializer<ReadCommand>
if (metadata.cfType == ColumnFamilyType.Super)
{
SuperColumns.SCFilter scFilter = SuperColumns.filterToSC((CompositeType)metadata.comparator, command.filter());
- newCommand = ReadCommand.create(command.table, command.key, command.cfName, scFilter.updatedFilter);
+ newCommand = ReadCommand.create(command.table, command.key, command.cfName, command.timestamp, scFilter.updatedFilter);
newCommand.setDigestQuery(command.isDigestQuery());
superColumn = scFilter.scName;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/RetriedSliceFromReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RetriedSliceFromReadCommand.java b/src/java/org/apache/cassandra/db/RetriedSliceFromReadCommand.java
index 52bad89..7ca57a8 100644
--- a/src/java/org/apache/cassandra/db/RetriedSliceFromReadCommand.java
+++ b/src/java/org/apache/cassandra/db/RetriedSliceFromReadCommand.java
@@ -19,25 +19,26 @@ package org.apache.cassandra.db;
import java.nio.ByteBuffer;
-import org.apache.cassandra.db.filter.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.cassandra.db.filter.SliceQueryFilter;
+
public class RetriedSliceFromReadCommand extends SliceFromReadCommand
{
static final Logger logger = LoggerFactory.getLogger(RetriedSliceFromReadCommand.class);
public final int originalCount;
- public RetriedSliceFromReadCommand(String table, ByteBuffer key, String cfName, SliceQueryFilter filter, int originalCount)
+ public RetriedSliceFromReadCommand(String table, ByteBuffer key, String cfName, long timestamp, SliceQueryFilter filter, int originalCount)
{
- super(table, key, cfName, filter);
+ super(table, key, cfName, timestamp, filter);
this.originalCount = originalCount;
}
@Override
public ReadCommand copy()
{
- ReadCommand readCommand = new RetriedSliceFromReadCommand(table, key, cfName, filter, originalCount);
+ ReadCommand readCommand = new RetriedSliceFromReadCommand(table, key, cfName, timestamp, filter, originalCount);
readCommand.setDigestQuery(isDigestQuery());
return readCommand;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/Row.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Row.java b/src/java/org/apache/cassandra/db/Row.java
index 49aa426..13e6f67 100644
--- a/src/java/org/apache/cassandra/db/Row.java
+++ b/src/java/org/apache/cassandra/db/Row.java
@@ -54,9 +54,9 @@ public class Row
')';
}
- public int getLiveCount(IDiskAtomFilter filter)
+ public int getLiveCount(IDiskAtomFilter filter, long now)
{
- return cf == null ? 0 : filter.getLiveCount(cf);
+ return cf == null ? 0 : filter.getLiveCount(cf, now);
}
public static class RowSerializer implements IVersionedSerializer<Row>
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/RowIteratorFactory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RowIteratorFactory.java b/src/java/org/apache/cassandra/db/RowIteratorFactory.java
index 4588146..c9f715c 100644
--- a/src/java/org/apache/cassandra/db/RowIteratorFactory.java
+++ b/src/java/org/apache/cassandra/db/RowIteratorFactory.java
@@ -52,11 +52,11 @@ public class RowIteratorFactory
* @return A row iterator following all the given restrictions
*/
public static CloseableIterator<Row> getIterator(final Iterable<Memtable> memtables,
- final Collection<SSTableReader> sstables,
- final RowPosition startWith,
- final RowPosition stopAt,
- final QueryFilter filter,
- final ColumnFamilyStore cfs)
+ final Collection<SSTableReader> sstables,
+ final RowPosition startWith,
+ final RowPosition stopAt,
+ final QueryFilter filter,
+ final ColumnFamilyStore cfs)
{
// fetch data from current memtable, historical memtables, and SSTables in the correct order.
final List<CloseableIterator<OnDiskAtomIterator>> iterators = new ArrayList<CloseableIterator<OnDiskAtomIterator>>();
@@ -76,7 +76,7 @@ public class RowIteratorFactory
// reduce rows from all sources into a single row
return MergeIterator.get(iterators, COMPARE_BY_KEY, new MergeIterator.Reducer<OnDiskAtomIterator, Row>()
{
- private final int gcBefore = (int) (System.currentTimeMillis() / 1000) - cfs.metadata.getGcGraceSeconds();
+ private final int gcBefore = cfs.gcBefore(filter.timestamp);
private final List<OnDiskAtomIterator> colIters = new ArrayList<OnDiskAtomIterator>();
private DecoratedKey key;
private ColumnFamily returnCF;
@@ -106,8 +106,8 @@ public class RowIteratorFactory
}
else
{
- QueryFilter keyFilter = new QueryFilter(key, filter.cfName, filter.filter);
- returnCF = cfs.filterColumnFamily(cached, keyFilter, gcBefore);
+ QueryFilter keyFilter = new QueryFilter(key, filter.cfName, filter.filter, filter.timestamp);
+ returnCF = cfs.filterColumnFamily(cached, keyFilter);
}
Row rv = new Row(key, returnCF);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/SliceByNamesReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SliceByNamesReadCommand.java b/src/java/org/apache/cassandra/db/SliceByNamesReadCommand.java
index 909ba76..2942249 100644
--- a/src/java/org/apache/cassandra/db/SliceByNamesReadCommand.java
+++ b/src/java/org/apache/cassandra/db/SliceByNamesReadCommand.java
@@ -36,15 +36,15 @@ public class SliceByNamesReadCommand extends ReadCommand
public final NamesQueryFilter filter;
- public SliceByNamesReadCommand(String table, ByteBuffer key, String cfName, NamesQueryFilter filter)
+ public SliceByNamesReadCommand(String table, ByteBuffer key, String cfName, long timestamp, NamesQueryFilter filter)
{
- super(table, key, cfName, Type.GET_BY_NAMES);
+ super(table, key, cfName, timestamp, Type.GET_BY_NAMES);
this.filter = filter;
}
public ReadCommand copy()
{
- ReadCommand readCommand= new SliceByNamesReadCommand(table, key, cfName, filter);
+ ReadCommand readCommand= new SliceByNamesReadCommand(table, key, cfName, timestamp, filter);
readCommand.setDigestQuery(isDigestQuery());
return readCommand;
}
@@ -52,7 +52,7 @@ public class SliceByNamesReadCommand extends ReadCommand
public Row getRow(Table table)
{
DecoratedKey dk = StorageService.getPartitioner().decorateKey(key);
- return table.getRow(new QueryFilter(dk, cfName, filter));
+ return table.getRow(new QueryFilter(dk, cfName, filter, timestamp));
}
@Override
@@ -62,6 +62,7 @@ public class SliceByNamesReadCommand extends ReadCommand
"table='" + table + '\'' +
", key=" + ByteBufferUtil.bytesToHex(key) +
", cfName='" + cfName + '\'' +
+ ", timestamp='" + timestamp + '\'' +
", filter=" + filter +
')';
}
@@ -91,6 +92,9 @@ class SliceByNamesReadCommandSerializer implements IVersionedSerializer<ReadComm
else
out.writeUTF(command.cfName);
+ if (version >= MessagingService.VERSION_20)
+ out.writeLong(cmd.timestamp);
+
NamesQueryFilter.serializer.serialize(command.filter, out, version);
}
@@ -113,6 +117,8 @@ class SliceByNamesReadCommandSerializer implements IVersionedSerializer<ReadComm
cfName = in.readUTF();
}
+ long timestamp = version < MessagingService.VERSION_20 ? System.currentTimeMillis() : in.readLong();
+
CFMetaData metadata = Schema.instance.getCFMetaData(table, cfName);
ReadCommand command;
if (version < MessagingService.VERSION_20)
@@ -135,14 +141,14 @@ class SliceByNamesReadCommandSerializer implements IVersionedSerializer<ReadComm
// Due to SC compat, it's possible we get back a slice filter at this point
if (filter instanceof NamesQueryFilter)
- command = new SliceByNamesReadCommand(table, key, cfName, (NamesQueryFilter)filter);
+ command = new SliceByNamesReadCommand(table, key, cfName, timestamp, (NamesQueryFilter)filter);
else
- command = new SliceFromReadCommand(table, key, cfName, (SliceQueryFilter)filter);
+ command = new SliceFromReadCommand(table, key, cfName, timestamp, (SliceQueryFilter)filter);
}
else
{
NamesQueryFilter filter = NamesQueryFilter.serializer.deserialize(in, version, metadata.comparator);
- command = new SliceByNamesReadCommand(table, key, cfName, filter);
+ command = new SliceByNamesReadCommand(table, key, cfName, timestamp, filter);
}
command.setDigestQuery(isDigest);
@@ -165,15 +171,15 @@ class SliceByNamesReadCommandSerializer implements IVersionedSerializer<ReadComm
size += sizes.sizeof((short)keySize) + keySize;
if (version < MessagingService.VERSION_20)
- {
size += new QueryPath(command.cfName, superColumn).serializedSize(sizes);
- }
else
- {
size += sizes.sizeof(command.cfName);
- }
+
+ if (version >= MessagingService.VERSION_20)
+ size += sizes.sizeof(cmd.timestamp);
size += NamesQueryFilter.serializer.serializedSize(command.filter, version);
+
return size;
}
}
[3/4] Include a timestamp with all read commands to determine column
expiration
Posted by al...@apache.org.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/SliceFromReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SliceFromReadCommand.java b/src/java/org/apache/cassandra/db/SliceFromReadCommand.java
index be64ae1..5c42de5 100644
--- a/src/java/org/apache/cassandra/db/SliceFromReadCommand.java
+++ b/src/java/org/apache/cassandra/db/SliceFromReadCommand.java
@@ -46,15 +46,15 @@ public class SliceFromReadCommand extends ReadCommand
public final SliceQueryFilter filter;
- public SliceFromReadCommand(String table, ByteBuffer key, String cfName, SliceQueryFilter filter)
+ public SliceFromReadCommand(String table, ByteBuffer key, String cfName, long timestamp, SliceQueryFilter filter)
{
- super(table, key, cfName, Type.GET_SLICES);
+ super(table, key, cfName, timestamp, Type.GET_SLICES);
this.filter = filter;
}
public ReadCommand copy()
{
- ReadCommand readCommand = new SliceFromReadCommand(table, key, cfName, filter);
+ ReadCommand readCommand = new SliceFromReadCommand(table, key, cfName, timestamp, filter);
readCommand.setDigestQuery(isDigestQuery());
return readCommand;
}
@@ -62,7 +62,7 @@ public class SliceFromReadCommand extends ReadCommand
public Row getRow(Table table)
{
DecoratedKey dk = StorageService.getPartitioner().decorateKey(key);
- return table.getRow(new QueryFilter(dk, cfName, filter));
+ return table.getRow(new QueryFilter(dk, cfName, filter, timestamp));
}
@Override
@@ -77,7 +77,7 @@ public class SliceFromReadCommand extends ReadCommand
if (maxLiveColumns < count)
return null;
- int liveCountInRow = row == null || row.cf == null ? 0 : filter.getLiveCount(row.cf);
+ int liveCountInRow = row == null || row.cf == null ? 0 : filter.getLiveCount(row.cf, timestamp);
if (liveCountInRow < getOriginalRequestedCount())
{
// We asked t (= count) live columns and got l (=liveCountInRow) ones.
@@ -86,7 +86,7 @@ public class SliceFromReadCommand extends ReadCommand
// round we want to ask x column so that x * (l/t) == t, i.e. x = t^2/l.
int retryCount = liveCountInRow == 0 ? count + 1 : ((count * count) / liveCountInRow) + 1;
SliceQueryFilter newFilter = filter.withUpdatedCount(retryCount);
- return new RetriedSliceFromReadCommand(table, key, cfName, newFilter, getOriginalRequestedCount());
+ return new RetriedSliceFromReadCommand(table, key, cfName, timestamp, newFilter, getOriginalRequestedCount());
}
return null;
@@ -98,7 +98,7 @@ public class SliceFromReadCommand extends ReadCommand
if ((row == null) || (row.cf == null))
return;
- filter.trim(row.cf, getOriginalRequestedCount());
+ filter.trim(row.cf, getOriginalRequestedCount(), timestamp);
}
public IDiskAtomFilter filter()
@@ -123,6 +123,7 @@ public class SliceFromReadCommand extends ReadCommand
"table='" + table + '\'' +
", key='" + ByteBufferUtil.bytesToHex(key) + '\'' +
", cfName='" + cfName + '\'' +
+ ", timestamp='" + timestamp + '\'' +
", filter='" + filter + '\'' +
')';
}
@@ -147,6 +148,9 @@ class SliceFromReadCommandSerializer implements IVersionedSerializer<ReadCommand
else
out.writeUTF(realRM.cfName);
+ if (version >= MessagingService.VERSION_20)
+ out.writeLong(realRM.timestamp);
+
SliceQueryFilter.serializer.serialize(realRM.filter, out, version);
}
@@ -169,6 +173,8 @@ class SliceFromReadCommandSerializer implements IVersionedSerializer<ReadCommand
cfName = in.readUTF();
}
+ long timestamp = version < MessagingService.VERSION_20 ? System.currentTimeMillis() : in.readLong();
+
CFMetaData metadata = Schema.instance.getCFMetaData(table, cfName);
SliceQueryFilter filter;
if (version < MessagingService.VERSION_20)
@@ -183,7 +189,7 @@ class SliceFromReadCommandSerializer implements IVersionedSerializer<ReadCommand
filter = SliceQueryFilter.serializer.deserialize(in, version);
}
- ReadCommand command = new SliceFromReadCommand(table, key, cfName, filter);
+ ReadCommand command = new SliceFromReadCommand(table, key, cfName, timestamp, filter);
command.setDigestQuery(isDigest);
return command;
}
@@ -204,15 +210,15 @@ class SliceFromReadCommandSerializer implements IVersionedSerializer<ReadCommand
size += sizes.sizeof((short) keySize) + keySize;
if (version < MessagingService.VERSION_20)
- {
size += new QueryPath(command.cfName, superColumn).serializedSize(sizes);
- }
else
- {
size += sizes.sizeof(command.cfName);
- }
+
+ if (version >= MessagingService.VERSION_20)
+ size += sizes.sizeof(cmd.timestamp);
size += SliceQueryFilter.serializer.serializedSize(command.filter, version);
+
return size;
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/SliceQueryPager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SliceQueryPager.java b/src/java/org/apache/cassandra/db/SliceQueryPager.java
index 7b8e4d4..c1933ad 100644
--- a/src/java/org/apache/cassandra/db/SliceQueryPager.java
+++ b/src/java/org/apache/cassandra/db/SliceQueryPager.java
@@ -51,10 +51,11 @@ public class SliceQueryPager implements Iterator<ColumnFamily>
if (exhausted)
return null;
+ long now = System.currentTimeMillis();
SliceQueryFilter sliceFilter = new SliceQueryFilter(slices, false, DEFAULT_PAGE_SIZE);
- QueryFilter filter = new QueryFilter(key, cfs.name, sliceFilter);
+ QueryFilter filter = new QueryFilter(key, cfs.name, sliceFilter, now);
ColumnFamily cf = cfs.getColumnFamily(filter);
- if (cf == null || sliceFilter.getLiveCount(cf) < DEFAULT_PAGE_SIZE)
+ if (cf == null || sliceFilter.getLiveCount(cf, now) < DEFAULT_PAGE_SIZE)
{
exhausted = true;
}
@@ -62,7 +63,7 @@ public class SliceQueryPager implements Iterator<ColumnFamily>
{
Iterator<Column> iter = cf.getReverseSortedColumns().iterator();
Column lastColumn = iter.next();
- while (lastColumn.isMarkedForDelete())
+ while (lastColumn.isMarkedForDelete(now))
lastColumn = iter.next();
int i = 0;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/SuperColumns.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SuperColumns.java b/src/java/org/apache/cassandra/db/SuperColumns.java
index 753cd91..54f59d6 100644
--- a/src/java/org/apache/cassandra/db/SuperColumns.java
+++ b/src/java/org/apache/cassandra/db/SuperColumns.java
@@ -102,13 +102,13 @@ public class SuperColumns
return scMap;
}
- public static void deserializerSuperColumnFamily(DataInput in, ColumnFamily cf, ColumnSerializer.Flag flag, int expireBefore, int version) throws IOException
+ public static void deserializerSuperColumnFamily(DataInput in, ColumnFamily cf, ColumnSerializer.Flag flag, int version) throws IOException
{
// Note that there was no way to insert a range tombstone in a SCF in 1.2
cf.delete(DeletionInfo.serializer().deserialize(in, version, cf.getComparator()));
assert !cf.deletionInfo().rangeIterator().hasNext();
- Iterator<OnDiskAtom> iter = onDiskIterator(in, in.readInt(), flag, expireBefore);
+ Iterator<OnDiskAtom> iter = onDiskIterator(in, in.readInt(), flag, Integer.MIN_VALUE);
while (iter.hasNext())
cf.addAtom(iter.next());
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/SystemTable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SystemTable.java b/src/java/org/apache/cassandra/db/SystemTable.java
index e62b258..90bfd8d 100644
--- a/src/java/org/apache/cassandra/db/SystemTable.java
+++ b/src/java/org/apache/cassandra/db/SystemTable.java
@@ -535,7 +535,8 @@ public class SystemTable
ColumnFamilyStore cfs = Table.open(Table.SYSTEM_KS).getColumnFamilyStore(INDEX_CF);
QueryFilter filter = QueryFilter.getNamesFilter(decorate(ByteBufferUtil.bytes(table)),
INDEX_CF,
- ByteBufferUtil.bytes(indexName));
+ ByteBufferUtil.bytes(indexName),
+ System.currentTimeMillis());
return ColumnFamilyStore.removeDeleted(cfs.getColumnFamily(filter), Integer.MAX_VALUE) != null;
}
@@ -596,7 +597,8 @@ public class SystemTable
ByteBufferUtil.EMPTY_BYTE_BUFFER,
ByteBufferUtil.EMPTY_BYTE_BUFFER,
true,
- 1);
+ 1,
+ System.currentTimeMillis());
ColumnFamily cf = table.getColumnFamilyStore(COUNTER_ID_CF).getColumnFamily(filter);
if (cf != null && cf.getColumnCount() != 0)
return CounterId.wrap(cf.iterator().next().name());
@@ -629,7 +631,7 @@ public class SystemTable
List<CounterId.CounterIdRecord> l = new ArrayList<CounterId.CounterIdRecord>();
Table table = Table.open(Table.SYSTEM_KS);
- QueryFilter filter = QueryFilter.getIdentityFilter(decorate(ALL_LOCAL_NODE_ID_KEY), COUNTER_ID_CF);
+ QueryFilter filter = QueryFilter.getIdentityFilter(decorate(ALL_LOCAL_NODE_ID_KEY), COUNTER_ID_CF, System.currentTimeMillis());
ColumnFamily cf = table.getColumnFamilyStore(COUNTER_ID_CF).getColumnFamily(filter);
CounterId previous = null;
@@ -673,11 +675,10 @@ public class SystemTable
{
Token minToken = StorageService.getPartitioner().getMinimumToken();
- return schemaCFS(schemaCfName).getRangeSlice(new Range<RowPosition>(minToken.minKeyBound(),
- minToken.maxKeyBound()),
- Integer.MAX_VALUE,
+ return schemaCFS(schemaCfName).getRangeSlice(new Range<RowPosition>(minToken.minKeyBound(), minToken.maxKeyBound()),
+ null,
new IdentityQueryFilter(),
- null);
+ Integer.MAX_VALUE);
}
public static Collection<RowMutation> serializeSchema()
@@ -729,7 +730,7 @@ public class SystemTable
DecoratedKey key = StorageService.getPartitioner().decorateKey(getSchemaKSKey(ksName));
ColumnFamilyStore schemaCFS = SystemTable.schemaCFS(SCHEMA_KEYSPACES_CF);
- ColumnFamily result = schemaCFS.getColumnFamily(QueryFilter.getIdentityFilter(key, SCHEMA_KEYSPACES_CF));
+ ColumnFamily result = schemaCFS.getColumnFamily(QueryFilter.getIdentityFilter(key, SCHEMA_KEYSPACES_CF, System.currentTimeMillis()));
return new Row(key, result);
}
@@ -743,7 +744,8 @@ public class SystemTable
DefsTable.searchComposite(cfName, true),
DefsTable.searchComposite(cfName, false),
false,
- Integer.MAX_VALUE);
+ Integer.MAX_VALUE,
+ System.currentTimeMillis());
return new Row(key, result);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 90f6dfa..6c9f50d 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -628,7 +628,7 @@ public class CompactionManager implements CompactionManagerMBean
// this at a different time (that's the whole purpose of repair with snapshot). So instead we take the creation
// time of the snapshot, which should give us roughly the same time on each replica (roughly being in that case
// 'as good as in the non-snapshot' case)
- gcBefore = (int) (cfs.getSnapshotCreationTime(validator.request.sessionid) / 1000) - cfs.metadata.getGcGraceSeconds();
+ gcBefore = cfs.gcBefore(cfs.getSnapshotCreationTime(validator.request.sessionid));
}
else
{
@@ -731,9 +731,7 @@ public class CompactionManager implements CompactionManagerMBean
{
// 2ndary indexes have ExpiringColumns too, so we need to purge tombstones deleted before now. We do not need to
// add any GcGrace however since 2ndary indexes are local to a node.
- return cfs.isIndex()
- ? (int) (System.currentTimeMillis() / 1000)
- : (int) (System.currentTimeMillis() / 1000) - cfs.metadata.getGcGraceSeconds();
+ return cfs.isIndex() ? (int) (System.currentTimeMillis() / 1000) : cfs.gcBefore(System.currentTimeMillis());
}
private static class ValidationCompactionIterable extends CompactionIterable
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
index e9cd2f1..32361b2 100644
--- a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
+++ b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
@@ -209,7 +209,7 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable
Column column = (Column) current;
container.addColumn(column);
if (indexer != SecondaryIndexManager.nullUpdater
- && !column.isMarkedForDelete()
+ && !column.isMarkedForDelete(System.currentTimeMillis())
&& !container.getColumn(column.name()).equals(column))
{
indexer.remove(column);
@@ -247,7 +247,7 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable
// PrecompactedRow.removeDeletedAndOldShards has only checked the top-level CF deletion times,
// not the range tombstone. For that we use the columnIndexer tombstone tracker.
// Note that this doesn't work for super columns.
- if (indexBuilder.tombstoneTracker().isDeleted(reduced))
+ if (indexBuilder.tombstoneTracker().isDeleted(reduced, System.currentTimeMillis()))
return null;
columns++;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java b/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java
index f69a955..85f2a23 100644
--- a/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java
+++ b/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java
@@ -133,7 +133,7 @@ public class PrecompactedRow extends AbstractCompactedRow
{
container.addColumn(column);
if (indexer != SecondaryIndexManager.nullUpdater
- && !column.isMarkedForDelete()
+ && !column.isMarkedForDelete(System.currentTimeMillis())
&& !container.getColumn(column.name()).equals(column))
{
indexer.remove(column);
@@ -149,7 +149,7 @@ public class PrecompactedRow extends AbstractCompactedRow
};
Iterator<Column> reduced = MergeIterator.get(data, fcomp, reducer);
- filter.collectReducedColumns(returnCF, reduced, CompactionManager.NO_GC);
+ filter.collectReducedColumns(returnCF, reduced, CompactionManager.NO_GC, System.currentTimeMillis());
}
public RowIndexEntry write(long currentPosition, DataOutput out) throws IOException
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/filter/ColumnCounter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/ColumnCounter.java b/src/java/org/apache/cassandra/db/filter/ColumnCounter.java
index 697ad02..a5d54ce 100644
--- a/src/java/org/apache/cassandra/db/filter/ColumnCounter.java
+++ b/src/java/org/apache/cassandra/db/filter/ColumnCounter.java
@@ -31,18 +31,24 @@ public class ColumnCounter
{
protected int live;
protected int ignored;
+ protected final long timestamp;
+
+ public ColumnCounter(long timestamp)
+ {
+ this.timestamp = timestamp;
+ }
public void count(Column column, ColumnFamily container)
{
- if (!isLive(column, container))
+ if (!isLive(column, container, timestamp))
ignored++;
else
live++;
}
- protected static boolean isLive(Column column, ColumnFamily container)
+ protected static boolean isLive(Column column, ColumnFamily container, long timestamp)
{
- return column.isLive() && (!container.deletionInfo().isDeleted(column));
+ return column.isLive(timestamp) && (!container.deletionInfo().isDeleted(column));
}
public int live()
@@ -71,8 +77,9 @@ public class ColumnCounter
* column. If 0, all columns are grouped, otherwise we group
* those for which the first {@code toGroup} components are equal.
*/
- public GroupByPrefix(CompositeType type, int toGroup)
+ public GroupByPrefix(long timestamp, CompositeType type, int toGroup)
{
+ super(timestamp);
this.type = type;
this.toGroup = toGroup;
@@ -81,7 +88,7 @@ public class ColumnCounter
public void count(Column column, ColumnFamily container)
{
- if (!isLive(column, container))
+ if (!isLive(column, container, timestamp))
{
ignored++;
return;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java b/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java
index 064cc6e..d061ac2 100644
--- a/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java
@@ -44,34 +44,42 @@ public abstract class ExtendedFilter
private static final Logger logger = LoggerFactory.getLogger(ExtendedFilter.class);
public final ColumnFamilyStore cfs;
+ public final long timestamp;
protected final IDiskAtomFilter originalFilter;
private final int maxResults;
private final boolean countCQL3Rows;
private final boolean isPaging;
- public static ExtendedFilter create(ColumnFamilyStore cfs, IDiskAtomFilter filter, List<IndexExpression> clause, int maxResults, boolean countCQL3Rows, boolean isPaging)
+ public static ExtendedFilter create(ColumnFamilyStore cfs,
+ List<IndexExpression> clause,
+ IDiskAtomFilter filter,
+ int maxResults,
+ long timestamp,
+ boolean countCQL3Rows,
+ boolean isPaging)
{
if (clause == null || clause.isEmpty())
{
- return new EmptyClauseFilter(cfs, filter, maxResults, countCQL3Rows, isPaging);
+ return new EmptyClauseFilter(cfs, filter, maxResults, timestamp, countCQL3Rows, isPaging);
}
else
{
if (isPaging)
throw new IllegalArgumentException("Cross-row paging is not supported along with index clauses");
return cfs.getComparator() instanceof CompositeType
- ? new FilterWithCompositeClauses(cfs, filter, clause, maxResults, countCQL3Rows)
- : new FilterWithClauses(cfs, filter, clause, maxResults, countCQL3Rows);
+ ? new FilterWithCompositeClauses(cfs, clause, filter, maxResults, timestamp, countCQL3Rows)
+ : new FilterWithClauses(cfs, clause, filter, maxResults, timestamp, countCQL3Rows);
}
}
- protected ExtendedFilter(ColumnFamilyStore cfs, IDiskAtomFilter filter, int maxResults, boolean countCQL3Rows, boolean isPaging)
+ protected ExtendedFilter(ColumnFamilyStore cfs, IDiskAtomFilter filter, int maxResults, long timestamp, boolean countCQL3Rows, boolean isPaging)
{
assert cfs != null;
assert filter != null;
this.cfs = cfs;
this.originalFilter = filter;
this.maxResults = maxResults;
+ this.timestamp = timestamp;
this.countCQL3Rows = countCQL3Rows;
this.isPaging = isPaging;
if (countCQL3Rows)
@@ -112,7 +120,7 @@ public abstract class ExtendedFilter
if (initialFilter() instanceof SliceQueryFilter)
return ((SliceQueryFilter)initialFilter()).lastCounted();
else
- return initialFilter().getLiveCount(data);
+ return initialFilter().getLiveCount(data, timestamp);
}
/** The initial filter we'll do our first slice with (either the original or a superset of it) */
@@ -167,9 +175,14 @@ public abstract class ExtendedFilter
protected final List<IndexExpression> clause;
protected final IDiskAtomFilter initialFilter;
- public FilterWithClauses(ColumnFamilyStore cfs, IDiskAtomFilter filter, List<IndexExpression> clause, int maxResults, boolean countCQL3Rows)
+ public FilterWithClauses(ColumnFamilyStore cfs,
+ List<IndexExpression> clause,
+ IDiskAtomFilter filter,
+ int maxResults,
+ long timestamp,
+ boolean countCQL3Rows)
{
- super(cfs, filter, maxResults, countCQL3Rows, false);
+ super(cfs, filter, maxResults, timestamp, countCQL3Rows, false);
assert clause != null;
this.clause = clause;
this.initialFilter = computeInitialFilter();
@@ -271,7 +284,7 @@ public abstract class ExtendedFilter
return data;
ColumnFamily pruned = data.cloneMeShallow();
OnDiskAtomIterator iter = originalFilter.getMemtableColumnIterator(data, null);
- originalFilter.collectReducedColumns(pruned, QueryFilter.gatherTombstones(pruned, iter), cfs.gcBefore());
+ originalFilter.collectReducedColumns(pruned, QueryFilter.gatherTombstones(pruned, iter), cfs.gcBefore(timestamp), timestamp);
return pruned;
}
@@ -335,9 +348,14 @@ public abstract class ExtendedFilter
private static class FilterWithCompositeClauses extends FilterWithClauses
{
- public FilterWithCompositeClauses(ColumnFamilyStore cfs, IDiskAtomFilter filter, List<IndexExpression> clause, int maxResults, boolean countCQL3Rows)
+ public FilterWithCompositeClauses(ColumnFamilyStore cfs,
+ List<IndexExpression> clause,
+ IDiskAtomFilter filter,
+ int maxResults,
+ long timestamp,
+ boolean countCQL3Rows)
{
- super(cfs, filter, clause, maxResults, countCQL3Rows);
+ super(cfs, clause, filter, maxResults, timestamp, countCQL3Rows);
}
/*
@@ -359,9 +377,14 @@ public abstract class ExtendedFilter
private static class EmptyClauseFilter extends ExtendedFilter
{
- public EmptyClauseFilter(ColumnFamilyStore cfs, IDiskAtomFilter filter, int maxResults, boolean countCQL3Rows, boolean isPaging)
+ public EmptyClauseFilter(ColumnFamilyStore cfs,
+ IDiskAtomFilter filter,
+ int maxResults,
+ long timestamp,
+ boolean countCQL3Rows,
+ boolean isPaging)
{
- super(cfs, filter, maxResults, countCQL3Rows, isPaging);
+ super(cfs, filter, maxResults, timestamp, countCQL3Rows, isPaging);
}
public IDiskAtomFilter initialFilter()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/filter/IDiskAtomFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/IDiskAtomFilter.java b/src/java/org/apache/cassandra/db/filter/IDiskAtomFilter.java
index e032f1d..35f71e5 100644
--- a/src/java/org/apache/cassandra/db/filter/IDiskAtomFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/IDiskAtomFilter.java
@@ -66,14 +66,14 @@ public interface IDiskAtomFilter
* by the filter code, which should have some limit on the number of columns
* to avoid running out of memory on large rows.
*/
- public void collectReducedColumns(ColumnFamily container, Iterator<Column> reducedColumns, int gcBefore);
+ public void collectReducedColumns(ColumnFamily container, Iterator<Column> reducedColumns, int gcBefore, long now);
public Comparator<Column> getColumnComparator(AbstractType<?> comparator);
public boolean isReversed();
public void updateColumnsLimit(int newLimit);
- public int getLiveCount(ColumnFamily cf);
+ public int getLiveCount(ColumnFamily cf, long now);
public IDiskAtomFilter cloneShallow();
public boolean maySelectPrefix(Comparator<ByteBuffer> cmp, ByteBuffer prefix);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java b/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java
index 570eb29..297f227 100644
--- a/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java
@@ -90,7 +90,7 @@ public class NamesQueryFilter implements IDiskAtomFilter
return new SSTableNamesIterator(sstable, file, key, columns, indexEntry);
}
- public void collectReducedColumns(ColumnFamily container, Iterator<Column> reducedColumns, int gcBefore)
+ public void collectReducedColumns(ColumnFamily container, Iterator<Column> reducedColumns, int gcBefore, long now)
{
while (reducedColumns.hasNext())
container.addIfRelevant(reducedColumns.next(), gcBefore);
@@ -118,15 +118,15 @@ public class NamesQueryFilter implements IDiskAtomFilter
{
}
- public int getLiveCount(ColumnFamily cf)
+ public int getLiveCount(ColumnFamily cf, long now)
{
if (countCQL3Rows)
- return cf.hasOnlyTombstones() ? 0 : 1;
+ return cf.hasOnlyTombstones(now) ? 0 : 1;
int count = 0;
for (Column column : cf)
{
- if (column.isLive())
+ if (column.isLive(now))
count++;
}
return count;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/filter/QueryFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/QueryFilter.java b/src/java/org/apache/cassandra/db/filter/QueryFilter.java
index 126b240..ac0c632 100644
--- a/src/java/org/apache/cassandra/db/filter/QueryFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/QueryFilter.java
@@ -33,12 +33,14 @@ public class QueryFilter
public final DecoratedKey key;
public final String cfName;
public final IDiskAtomFilter filter;
+ public final long timestamp;
- public QueryFilter(DecoratedKey key, String cfName, IDiskAtomFilter filter)
+ public QueryFilter(DecoratedKey key, String cfName, IDiskAtomFilter filter, long timestamp)
{
this.key = key;
this.cfName = cfName;
this.filter = filter;
+ this.timestamp = timestamp;
}
public OnDiskAtomIterator getMemtableColumnIterator(Memtable memtable)
@@ -79,7 +81,7 @@ public class QueryFilter
public void collateOnDiskAtom(ColumnFamily returnCF, Iterator<? extends OnDiskAtom> toCollate, int gcBefore)
{
Iterator<Column> columns = gatherTombstones(returnCF, toCollate);
- filter.collectReducedColumns(returnCF, columns, gcBefore);
+ filter.collectReducedColumns(returnCF, columns, gcBefore, timestamp);
}
public void collateColumns(final ColumnFamily returnCF, List<? extends Iterator<Column>> toCollate, final int gcBefore)
@@ -107,7 +109,7 @@ public class QueryFilter
};
Iterator<Column> reduced = MergeIterator.get(toCollate, fcomp, reducer);
- filter.collectReducedColumns(returnCF, reduced, gcBefore);
+ filter.collectReducedColumns(returnCF, reduced, gcBefore, timestamp);
}
/**
@@ -178,19 +180,26 @@ public class QueryFilter
* @param finish column to stop slice at, inclusive; empty for "the last column"
* @param reversed true to start with the largest column (as determined by configured sort order) instead of smallest
* @param limit maximum number of non-deleted columns to return
+ * @param timestamp time to use for determining expiring columns' state
*/
- public static QueryFilter getSliceFilter(DecoratedKey key, String cfName, ByteBuffer start, ByteBuffer finish, boolean reversed, int limit)
+ public static QueryFilter getSliceFilter(DecoratedKey key,
+ String cfName,
+ ByteBuffer start,
+ ByteBuffer finish,
+ boolean reversed,
+ int limit,
+ long timestamp)
{
- return new QueryFilter(key, cfName, new SliceQueryFilter(start, finish, reversed, limit));
+ return new QueryFilter(key, cfName, new SliceQueryFilter(start, finish, reversed, limit), timestamp);
}
/**
* return a QueryFilter object that includes every column in the row.
* This is dangerous on large rows; avoid except for test code.
*/
- public static QueryFilter getIdentityFilter(DecoratedKey key, String cfName)
+ public static QueryFilter getIdentityFilter(DecoratedKey key, String cfName, long timestamp)
{
- return new QueryFilter(key, cfName, new IdentityQueryFilter());
+ return new QueryFilter(key, cfName, new IdentityQueryFilter(), timestamp);
}
/**
@@ -199,17 +208,17 @@ public class QueryFilter
* @param cfName column family to query
* @param columns the column names to restrict the results to, sorted in comparator order
*/
- public static QueryFilter getNamesFilter(DecoratedKey key, String cfName, SortedSet<ByteBuffer> columns)
+ public static QueryFilter getNamesFilter(DecoratedKey key, String cfName, SortedSet<ByteBuffer> columns, long timestamp)
{
- return new QueryFilter(key, cfName, new NamesQueryFilter(columns));
+ return new QueryFilter(key, cfName, new NamesQueryFilter(columns), timestamp);
}
/**
* convenience method for creating a name filter matching a single column
*/
- public static QueryFilter getNamesFilter(DecoratedKey key, String cfName, ByteBuffer column)
+ public static QueryFilter getNamesFilter(DecoratedKey key, String cfName, ByteBuffer column, long timestamp)
{
- return new QueryFilter(key, cfName, new NamesQueryFilter(column));
+ return new QueryFilter(key, cfName, new NamesQueryFilter(column), timestamp);
}
@Override
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java b/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
index fda7ac5..b76ce04 100644
--- a/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
@@ -120,9 +120,9 @@ public class SliceQueryFilter implements IDiskAtomFilter
return reversed ? comparator.columnReverseComparator : comparator.columnComparator;
}
- public void collectReducedColumns(ColumnFamily container, Iterator<Column> reducedColumns, int gcBefore)
+ public void collectReducedColumns(ColumnFamily container, Iterator<Column> reducedColumns, int gcBefore, long now)
{
- columnCounter = getColumnCounter(container);
+ columnCounter = getColumnCounter(container, now);
while (reducedColumns.hasNext())
{
@@ -142,28 +142,28 @@ public class SliceQueryFilter implements IDiskAtomFilter
Tracing.trace("Read {} live and {} tombstoned cells", columnCounter.live(), columnCounter.ignored());
}
- public int getLiveCount(ColumnFamily cf)
+ public int getLiveCount(ColumnFamily cf, long now)
{
- ColumnCounter counter = getColumnCounter(cf);
+ ColumnCounter counter = getColumnCounter(cf, now);
for (Column column : cf)
counter.count(column, cf);
return counter.live();
}
- private ColumnCounter getColumnCounter(ColumnFamily container)
+ private ColumnCounter getColumnCounter(ColumnFamily container, long now)
{
AbstractType<?> comparator = container.getComparator();
if (compositesToGroup < 0)
- return new ColumnCounter();
+ return new ColumnCounter(now);
else if (compositesToGroup == 0)
- return new ColumnCounter.GroupByPrefix(null, 0);
+ return new ColumnCounter.GroupByPrefix(now, null, 0);
else
- return new ColumnCounter.GroupByPrefix((CompositeType)comparator, compositesToGroup);
+ return new ColumnCounter.GroupByPrefix(now, (CompositeType)comparator, compositesToGroup);
}
- public void trim(ColumnFamily cf, int trimTo)
+ public void trim(ColumnFamily cf, int trimTo, long now)
{
- ColumnCounter counter = getColumnCounter(cf);
+ ColumnCounter counter = getColumnCounter(cf, now);
Collection<Column> columns = reversed
? cf.getReverseSortedColumns()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java b/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java
index f12acdc..4d0914a 100644
--- a/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java
+++ b/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java
@@ -91,7 +91,7 @@ public abstract class AbstractSimplePerColumnSecondaryIndex extends PerColumnSec
public void delete(ByteBuffer rowKey, Column column)
{
- if (column.isMarkedForDelete())
+ if (column.isMarkedForDelete(System.currentTimeMillis()))
return;
DecoratedKey valueKey = getIndexKeyFor(getIndexedValue(rowKey, column));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java b/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
index af370d5..17ac81f 100644
--- a/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
+++ b/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
@@ -509,12 +509,17 @@ public class SecondaryIndexManager
* Performs a search across a number of column indexes
* TODO: add support for querying across index types
*
- * @param clause the index query clause
* @param range the row range to restrict to
- * @param dataFilter the column range to restrict to
+ * @param clause the index query clause
+ * @param columnFilter the column range to restrict to
* @return found indexed rows
*/
- public List<Row> search(List<IndexExpression> clause, AbstractBounds<RowPosition> range, int maxResults, IDiskAtomFilter dataFilter, boolean countCQL3Rows)
+ public List<Row> search(AbstractBounds<RowPosition> range,
+ List<IndexExpression> clause,
+ IDiskAtomFilter columnFilter,
+ int maxResults,
+ long now,
+ boolean countCQL3Rows)
{
List<SecondaryIndexSearcher> indexSearchers = getIndexSearchersForQuery(clause);
@@ -525,8 +530,7 @@ public class SecondaryIndexManager
if (indexSearchers.size() > 1)
throw new RuntimeException("Unable to search across multiple secondary index types");
-
- return indexSearchers.get(0).search(clause, range, maxResults, dataFilter, countCQL3Rows);
+ return indexSearchers.get(0).search(range, clause, columnFilter, maxResults, now, countCQL3Rows);
}
public Collection<SecondaryIndex> getIndexesByNames(Set<String> idxNames)
@@ -584,7 +588,7 @@ public class SecondaryIndexManager
public void insert(Column column)
{
- if (column.isMarkedForDelete())
+ if (column.isMarkedForDelete(System.currentTimeMillis()))
return;
for (SecondaryIndex index : indexFor(column.name()))
@@ -605,7 +609,7 @@ public class SecondaryIndexManager
{
// insert the new value before removing the old one, so we never have a period
// where the row is invisible to both queries (the opposite seems preferable); see CASSANDRA-5540
- if (!column.isMarkedForDelete())
+ if (!column.isMarkedForDelete(System.currentTimeMillis()))
((PerColumnSecondaryIndex) index).insert(key.key, column);
((PerColumnSecondaryIndex) index).delete(key.key, oldColumn);
}
@@ -614,7 +618,7 @@ public class SecondaryIndexManager
public void remove(Column column)
{
- if (column.isMarkedForDelete())
+ if (column.isMarkedForDelete(System.currentTimeMillis()))
return;
for (SecondaryIndex index : indexFor(column.name()))
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java b/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java
index 16ac091..ddd79dd 100644
--- a/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java
+++ b/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java
@@ -39,7 +39,12 @@ public abstract class SecondaryIndexSearcher
this.baseCfs = indexManager.baseCfs;
}
- public abstract List<Row> search(List<IndexExpression> clause, AbstractBounds<RowPosition> range, int maxResults, IDiskAtomFilter dataFilter, boolean countCQL3Rows);
+ public abstract List<Row> search(AbstractBounds<RowPosition> range,
+ List<IndexExpression> clause,
+ IDiskAtomFilter dataFilter,
+ int maxResults,
+ long now,
+ boolean countCQL3Rows);
/**
* @return true this index is able to handle given clauses.
@@ -49,16 +54,6 @@ public abstract class SecondaryIndexSearcher
return highestSelectivityPredicate(clause) != null;
}
- protected boolean isIndexValueStale(ColumnFamily liveData, ByteBuffer indexedColumnName, ByteBuffer indexedValue)
- {
- Column liveColumn = liveData.getColumn(indexedColumnName);
- if (liveColumn == null || liveColumn.isMarkedForDelete())
- return true;
-
- ByteBuffer liveValue = liveColumn.value();
- return 0 != liveData.metadata().getValueValidator(indexedColumnName).compare(indexedValue, liveValue);
- }
-
protected IndexExpression highestSelectivityPredicate(List<IndexExpression> clause)
{
IndexExpression best = null;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java b/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java
index 131b8c6..0720e83 100644
--- a/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java
+++ b/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java
@@ -93,7 +93,7 @@ public abstract class CompositesIndex extends AbstractSimplePerColumnSecondaryIn
public abstract IndexedEntry decodeEntry(DecoratedKey indexedValue, Column indexEntry);
- public abstract boolean isStale(IndexedEntry entry, ColumnFamily data);
+ public abstract boolean isStale(IndexedEntry entry, ColumnFamily data, long now);
public void delete(IndexedEntry entry)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnClusteringKey.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnClusteringKey.java b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnClusteringKey.java
index 8ad990e..b767d6c 100644
--- a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnClusteringKey.java
+++ b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnClusteringKey.java
@@ -107,9 +107,9 @@ public class CompositesIndexOnClusteringKey extends CompositesIndex
return true;
}
- public boolean isStale(IndexedEntry entry, ColumnFamily data)
+ public boolean isStale(IndexedEntry entry, ColumnFamily data, long now)
{
- return data == null || data.hasOnlyTombstones();
+ return data == null || data.hasOnlyTombstones(now);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnPartitionKey.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnPartitionKey.java b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnPartitionKey.java
index 2034c71..7a00de8 100644
--- a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnPartitionKey.java
+++ b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnPartitionKey.java
@@ -95,9 +95,9 @@ public class CompositesIndexOnPartitionKey extends CompositesIndex
return true;
}
- public boolean isStale(IndexedEntry entry, ColumnFamily data)
+ public boolean isStale(IndexedEntry entry, ColumnFamily data, long now)
{
- return data == null || data.hasOnlyTombstones();
+ return data == null || data.hasOnlyTombstones(now);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnRegular.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnRegular.java b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnRegular.java
index 40a2ee1..7159c23 100644
--- a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnRegular.java
+++ b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnRegular.java
@@ -91,11 +91,11 @@ public class CompositesIndexOnRegular extends CompositesIndex
&& comp.compare(components[columnDef.componentIndex], columnDef.name) == 0;
}
- public boolean isStale(IndexedEntry entry, ColumnFamily data)
+ public boolean isStale(IndexedEntry entry, ColumnFamily data, long now)
{
ByteBuffer bb = entry.indexedEntryNameBuilder.copy().add(columnDef.name).build();
Column liveColumn = data.getColumn(bb);
- if (liveColumn == null || liveColumn.isMarkedForDelete())
+ if (liveColumn == null || liveColumn.isMarkedForDelete(now))
return true;
ByteBuffer liveValue = liveColumn.value();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java b/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java
index e16d94d..e8c0a09 100644
--- a/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java
+++ b/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java
@@ -21,18 +21,18 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.cassandra.cql3.ColumnNameBuilder;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.filter.*;
-import org.apache.cassandra.db.index.AbstractSimplePerColumnSecondaryIndex;
import org.apache.cassandra.db.index.SecondaryIndexManager;
import org.apache.cassandra.db.index.SecondaryIndexSearcher;
import org.apache.cassandra.db.marshal.CompositeType;
import org.apache.cassandra.dht.AbstractBounds;
import org.apache.cassandra.thrift.IndexExpression;
import org.apache.cassandra.utils.ByteBufferUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
public class CompositesSearcher extends SecondaryIndexSearcher
{
@@ -44,10 +44,15 @@ public class CompositesSearcher extends SecondaryIndexSearcher
}
@Override
- public List<Row> search(List<IndexExpression> clause, AbstractBounds<RowPosition> range, int maxResults, IDiskAtomFilter dataFilter, boolean countCQL3Rows)
+ public List<Row> search(AbstractBounds<RowPosition> range,
+ List<IndexExpression> clause,
+ IDiskAtomFilter dataFilter,
+ int maxResults,
+ long now,
+ boolean countCQL3Rows)
{
assert clause != null && !clause.isEmpty();
- ExtendedFilter filter = ExtendedFilter.create(baseCfs, dataFilter, clause, maxResults, countCQL3Rows, false);
+ ExtendedFilter filter = ExtendedFilter.create(baseCfs, clause, dataFilter, maxResults, now, countCQL3Rows, false);
return baseCfs.filter(getIndexedIterator(range, filter), filter);
}
@@ -69,7 +74,7 @@ public class CompositesSearcher extends SecondaryIndexSearcher
return isStart ? builder.build() : builder.buildAsEndOfRange();
}
- public ColumnFamilyStore.AbstractScanIterator getIndexedIterator(final AbstractBounds<RowPosition> range, final ExtendedFilter filter)
+ private ColumnFamilyStore.AbstractScanIterator getIndexedIterator(final AbstractBounds<RowPosition> range, final ExtendedFilter filter)
{
// Start with the most-restrictive indexed clause, then apply remaining clauses
// to each row matching that clause.
@@ -80,8 +85,7 @@ public class CompositesSearcher extends SecondaryIndexSearcher
final DecoratedKey indexKey = index.getIndexKeyFor(primary.value);
if (logger.isDebugEnabled())
- logger.debug("Most-selective indexed predicate is {}",
- ((AbstractSimplePerColumnSecondaryIndex) index).expressionString(primary));
+ logger.debug("Most-selective indexed predicate is {}", index.expressionString(primary));
/*
* XXX: If the range requested is a token range, we'll have to start at the beginning (and stop at the end) of
@@ -155,14 +159,15 @@ public class CompositesSearcher extends SecondaryIndexSearcher
if (logger.isTraceEnabled())
logger.trace("Scanning index {} starting with {}",
- ((AbstractSimplePerColumnSecondaryIndex)index).expressionString(primary), indexComparator.getString(startPrefix));
+ index.expressionString(primary), indexComparator.getString(startPrefix));
QueryFilter indexFilter = QueryFilter.getSliceFilter(indexKey,
index.getIndexCfs().name,
lastSeenPrefix,
endPrefix,
false,
- rowsPerQuery);
+ rowsPerQuery,
+ filter.timestamp);
ColumnFamily indexRow = index.getIndexCfs().getColumnFamily(indexFilter);
if (indexRow == null || indexRow.getColumnCount() == 0)
return makeReturn(currentKey, data);
@@ -185,7 +190,7 @@ public class CompositesSearcher extends SecondaryIndexSearcher
{
Column column = indexColumns.poll();
lastSeenPrefix = column.name();
- if (column.isMarkedForDelete())
+ if (column.isMarkedForDelete(filter.timestamp))
{
logger.trace("skipping {}", column.name());
continue;
@@ -242,8 +247,8 @@ public class CompositesSearcher extends SecondaryIndexSearcher
false,
Integer.MAX_VALUE,
baseCfs.metadata.clusteringKeyColumns().size());
- ColumnFamily newData = baseCfs.getColumnFamily(new QueryFilter(dk, baseCfs.name, dataFilter));
- if (index.isStale(entry, newData))
+ ColumnFamily newData = baseCfs.getColumnFamily(new QueryFilter(dk, baseCfs.name, dataFilter, filter.timestamp));
+ if (index.isStale(entry, newData, filter.timestamp))
{
index.delete(entry);
continue;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/index/keys/KeysIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/keys/KeysIndex.java b/src/java/org/apache/cassandra/db/index/keys/KeysIndex.java
index cd89e92..f47b1e1 100644
--- a/src/java/org/apache/cassandra/db/index/keys/KeysIndex.java
+++ b/src/java/org/apache/cassandra/db/index/keys/KeysIndex.java
@@ -48,10 +48,10 @@ public class KeysIndex extends AbstractSimplePerColumnSecondaryIndex
return new KeysSearcher(baseCfs.indexManager, columns);
}
- public boolean isIndexEntryStale(ByteBuffer indexedValue, ColumnFamily data)
+ public boolean isIndexEntryStale(ByteBuffer indexedValue, ColumnFamily data, long now)
{
Column liveColumn = data.getColumn(columnDef.name);
- if (liveColumn == null || liveColumn.isMarkedForDelete())
+ if (liveColumn == null || liveColumn.isMarkedForDelete(now))
return true;
ByteBuffer liveValue = liveColumn.value();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java b/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java
index 8901bf5..e919d8a 100644
--- a/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java
+++ b/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java
@@ -21,6 +21,9 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.filter.*;
import org.apache.cassandra.db.index.AbstractSimplePerColumnSecondaryIndex;
@@ -33,8 +36,6 @@ import org.apache.cassandra.dht.Range;
import org.apache.cassandra.thrift.IndexExpression;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.HeapAllocator;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
public class KeysSearcher extends SecondaryIndexSearcher
{
@@ -46,14 +47,19 @@ public class KeysSearcher extends SecondaryIndexSearcher
}
@Override
- public List<Row> search(List<IndexExpression> clause, AbstractBounds<RowPosition> range, int maxResults, IDiskAtomFilter dataFilter, boolean countCQL3Rows)
+ public List<Row> search(AbstractBounds<RowPosition> range,
+ List<IndexExpression> clause,
+ IDiskAtomFilter dataFilter,
+ int maxResults,
+ long now,
+ boolean countCQL3Rows)
{
assert clause != null && !clause.isEmpty();
- ExtendedFilter filter = ExtendedFilter.create(baseCfs, dataFilter, clause, maxResults, countCQL3Rows, false);
+ ExtendedFilter filter = ExtendedFilter.create(baseCfs, clause, dataFilter, maxResults, now, countCQL3Rows, false);
return baseCfs.filter(getIndexedIterator(range, filter), filter);
}
- public ColumnFamilyStore.AbstractScanIterator getIndexedIterator(final AbstractBounds<RowPosition> range, final ExtendedFilter filter)
+ private ColumnFamilyStore.AbstractScanIterator getIndexedIterator(final AbstractBounds<RowPosition> range, final ExtendedFilter filter)
{
// Start with the most-restrictive indexed clause, then apply remaining clauses
// to each row matching that clause.
@@ -106,7 +112,8 @@ public class KeysSearcher extends SecondaryIndexSearcher
lastSeenKey,
endKey,
false,
- rowsPerQuery);
+ rowsPerQuery,
+ filter.timestamp);
ColumnFamily indexRow = index.getIndexCfs().getColumnFamily(indexFilter);
logger.trace("fetched {}", indexRow);
if (indexRow == null)
@@ -139,7 +146,7 @@ public class KeysSearcher extends SecondaryIndexSearcher
{
Column column = indexColumns.next();
lastSeenKey = column.name();
- if (column.isMarkedForDelete())
+ if (column.isMarkedForDelete(filter.timestamp))
{
logger.trace("skipping {}", column.name());
continue;
@@ -158,7 +165,7 @@ public class KeysSearcher extends SecondaryIndexSearcher
}
logger.trace("Returning index hit for {}", dk);
- ColumnFamily data = baseCfs.getColumnFamily(new QueryFilter(dk, baseCfs.name, filter.initialFilter()));
+ ColumnFamily data = baseCfs.getColumnFamily(new QueryFilter(dk, baseCfs.name, filter.initialFilter(), filter.timestamp));
// While the column family we'll get in the end should contains the primary clause column, the initialFilter may not have found it and can thus be null
if (data == null)
data = TreeMapBackedSortedColumns.factory.create(baseCfs.metadata);
@@ -168,12 +175,12 @@ public class KeysSearcher extends SecondaryIndexSearcher
IDiskAtomFilter extraFilter = filter.getExtraFilter(data);
if (extraFilter != null)
{
- ColumnFamily cf = baseCfs.getColumnFamily(new QueryFilter(dk, baseCfs.name, extraFilter));
+ ColumnFamily cf = baseCfs.getColumnFamily(new QueryFilter(dk, baseCfs.name, extraFilter, filter.timestamp));
if (cf != null)
data.addAll(cf, HeapAllocator.instance);
}
- if (((KeysIndex)index).isIndexEntryStale(indexKey.key, data))
+ if (((KeysIndex)index).isIndexEntryStale(indexKey.key, data, filter.timestamp))
{
// delete the index entry w/ its own timestamp
Column dummyColumn = new Column(primary.column_name, indexKey.key, column.timestamp());
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java b/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
index d35e14d..65164ef 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
@@ -181,7 +181,10 @@ public class SSTableIdentityIterator implements Comparable<SSTableIdentityIterat
{
ColumnFamily cf = columnFamily.cloneMeShallow(containerFactory, false);
// since we already read column count, just pass that value and continue deserialization
- columnFamily.serializer.deserializeColumnsFromSSTable(in, cf, columnCount, flag, expireBefore, dataVersion);
+ Iterator<OnDiskAtom> iter = cf.metadata().getOnDiskIterator(in, columnCount, flag, expireBefore, dataVersion);
+ while (iter.hasNext())
+ cf.addAtom(iter.next());
+
if (validateColumns)
{
try
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
index 8119388..f63ea6a 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
@@ -1076,12 +1076,12 @@ public class SSTableReader extends SSTable
return getScanner((RateLimiter) null);
}
- public SSTableScanner getScanner(RateLimiter limiter)
- {
- return new SSTableScanner(this, null, limiter);
- }
+ public SSTableScanner getScanner(RateLimiter limiter)
+ {
+ return new SSTableScanner(this, null, limiter);
+ }
- /**
+ /**
* Direct I/O SSTableScanner over a defined range of tokens.
*
* @param range the range of keys to cover
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/service/ActiveRepairService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java
index b692ab0..82ecbe9 100644
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@ -836,7 +836,7 @@ public class ActiveRepairService
if (isSequential)
makeSnapshots(endpoints);
- int gcBefore = (int)(System.currentTimeMillis()/1000) - Table.open(tablename).getColumnFamilyStore(cfname).metadata.getGcGraceSeconds();
+ int gcBefore = Table.open(tablename).getColumnFamilyStore(cfname).gcBefore(System.currentTimeMillis());
for (InetAddress endpoint : allEndpoints)
treeRequests.add(new TreeRequest(getName(), endpoint, range, gcBefore, new CFPair(tablename, cfname)));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/service/CacheService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/CacheService.java b/src/java/org/apache/cassandra/service/CacheService.java
index d301507..b3cef49 100644
--- a/src/java/org/apache/cassandra/service/CacheService.java
+++ b/src/java/org/apache/cassandra/service/CacheService.java
@@ -300,7 +300,7 @@ public class CacheService implements CacheServiceMBean
public Pair<RowCacheKey, IRowCacheEntry> call() throws Exception
{
DecoratedKey key = cfs.partitioner.decorateKey(buffer);
- ColumnFamily data = cfs.getTopLevelColumns(QueryFilter.getIdentityFilter(key, cfs.name), Integer.MIN_VALUE);
+ ColumnFamily data = cfs.getTopLevelColumns(QueryFilter.getIdentityFilter(key, cfs.name, Long.MIN_VALUE), Integer.MIN_VALUE);
return Pair.create(new RowCacheKey(cfs.metadata.cfId, key), (IRowCacheEntry) data);
}
});
@@ -311,7 +311,7 @@ public class CacheService implements CacheServiceMBean
for (ByteBuffer key : buffers)
{
DecoratedKey dk = cfs.partitioner.decorateKey(key);
- ColumnFamily data = cfs.getTopLevelColumns(QueryFilter.getIdentityFilter(dk, cfs.name), Integer.MIN_VALUE);
+ ColumnFamily data = cfs.getTopLevelColumns(QueryFilter.getIdentityFilter(dk, cfs.name, Long.MIN_VALUE), Integer.MIN_VALUE);
if (data != null)
rowCache.put(new RowCacheKey(cfs.metadata.cfId, dk), data);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java b/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
index 5ffcdbe..4aab87d 100644
--- a/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
+++ b/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
@@ -48,13 +48,15 @@ public class RangeSliceResponseResolver implements IResponseResolver<RangeSliceR
};
private final String table;
+ private final long timestamp;
private List<InetAddress> sources;
protected final Collection<MessageIn<RangeSliceReply>> responses = new ConcurrentLinkedQueue<MessageIn<RangeSliceReply>>();
public final List<AsyncOneResponse> repairResults = new ArrayList<AsyncOneResponse>();
- public RangeSliceResponseResolver(String table)
+ public RangeSliceResponseResolver(String table, long timestamp)
{
this.table = table;
+ this.timestamp = timestamp;
}
public void setSources(List<InetAddress> endpoints)
@@ -142,7 +144,7 @@ public class RangeSliceResponseResolver implements IResponseResolver<RangeSliceR
protected Row getReduced()
{
ColumnFamily resolved = versions.size() > 1
- ? RowDataResolver.resolveSuperset(versions)
+ ? RowDataResolver.resolveSuperset(versions, timestamp)
: versions.get(0);
if (versions.size() < sources.size())
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java b/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java
index b6f257e..f63fcb1 100644
--- a/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java
+++ b/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java
@@ -35,9 +35,20 @@ public class RangeSliceVerbHandler implements IVerbHandler<RangeSliceCommand>
{
ColumnFamilyStore cfs = Table.open(command.keyspace).getColumnFamilyStore(command.column_family);
if (cfs.indexManager.hasIndexFor(command.row_filter))
- return cfs.search(command.row_filter, command.range, command.maxResults, command.predicate, command.countCQL3Rows);
+ return cfs.search(command.range,
+ command.row_filter,
+ command.predicate,
+ command.maxResults,
+ command.timestamp,
+ command.countCQL3Rows);
else
- return cfs.getRangeSlice(command.range, command.maxResults, command.predicate, command.row_filter, command.countCQL3Rows, command.isPaging);
+ return cfs.getRangeSlice(command.range,
+ command.row_filter,
+ command.predicate,
+ command.maxResults,
+ command.timestamp,
+ command.countCQL3Rows,
+ command.isPaging);
}
public void doVerb(MessageIn<RangeSliceCommand> message, int id)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/service/ReadCallback.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ReadCallback.java b/src/java/org/apache/cassandra/service/ReadCallback.java
index fe7f4d7..7cb5a23 100644
--- a/src/java/org/apache/cassandra/service/ReadCallback.java
+++ b/src/java/org/apache/cassandra/service/ReadCallback.java
@@ -186,7 +186,7 @@ public class ReadCallback<TMessage, TResolved> implements IAsyncCallback<TMessag
ReadRepairMetrics.repairedBackground.mark();
ReadCommand readCommand = (ReadCommand) command;
- final RowDataResolver repairResolver = new RowDataResolver(readCommand.table, readCommand.key, readCommand.filter());
+ final RowDataResolver repairResolver = new RowDataResolver(readCommand.table, readCommand.key, readCommand.filter(), readCommand.timestamp);
AsyncRepairCallback repairHandler = new AsyncRepairCallback(repairResolver, endpoints.size());
MessageOut<ReadCommand> message = ((ReadCommand) command).createMessage();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/service/RowDataResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/RowDataResolver.java b/src/java/org/apache/cassandra/service/RowDataResolver.java
index 8533f4f..69cd381 100644
--- a/src/java/org/apache/cassandra/service/RowDataResolver.java
+++ b/src/java/org/apache/cassandra/service/RowDataResolver.java
@@ -39,11 +39,13 @@ public class RowDataResolver extends AbstractRowResolver
private int maxLiveCount = 0;
public List<AsyncOneResponse> repairResults = Collections.emptyList();
private final IDiskAtomFilter filter;
+ private final long timestamp;
- public RowDataResolver(String table, ByteBuffer key, IDiskAtomFilter qFilter)
+ public RowDataResolver(String table, ByteBuffer key, IDiskAtomFilter qFilter, long timestamp)
{
super(key, table);
this.filter = qFilter;
+ this.timestamp = timestamp;
}
/*
@@ -74,12 +76,12 @@ public class RowDataResolver extends AbstractRowResolver
endpoints.add(message.from);
// compute maxLiveCount to prevent short reads -- see https://issues.apache.org/jira/browse/CASSANDRA-2643
- int liveCount = cf == null ? 0 : filter.getLiveCount(cf);
+ int liveCount = cf == null ? 0 : filter.getLiveCount(cf, timestamp);
if (liveCount > maxLiveCount)
maxLiveCount = liveCount;
}
- resolved = resolveSuperset(versions);
+ resolved = resolveSuperset(versions, timestamp);
if (logger.isDebugEnabled())
logger.debug("versions merged");
@@ -125,7 +127,7 @@ public class RowDataResolver extends AbstractRowResolver
return results;
}
- static ColumnFamily resolveSuperset(Iterable<ColumnFamily> versions)
+ static ColumnFamily resolveSuperset(Iterable<ColumnFamily> versions, long now)
{
assert Iterables.size(versions) > 0;
@@ -146,7 +148,7 @@ public class RowDataResolver extends AbstractRowResolver
// mimic the collectCollatedColumn + removeDeleted path that getColumnFamily takes.
// this will handle removing columns and subcolumns that are supressed by a row or
// supercolumn tombstone.
- QueryFilter filter = new QueryFilter(null, resolved.metadata().cfName, new IdentityQueryFilter());
+ QueryFilter filter = new QueryFilter(null, resolved.metadata().cfName, new IdentityQueryFilter(), now);
List<CloseableIterator<Column>> iters = new ArrayList<CloseableIterator<Column>>();
for (ColumnFamily version : versions)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 1383be7..9d2f6c1 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -212,9 +212,13 @@ public class StorageProxy implements StorageProxyMBean
// read the current value and compare with expected
Tracing.trace("Reading existing values for CAS precondition");
- ReadCommand readCommand = expected == null
- ? new SliceFromReadCommand(table, key, cfName, new SliceQueryFilter(ByteBufferUtil.EMPTY_BYTE_BUFFER, ByteBufferUtil.EMPTY_BYTE_BUFFER, false, 1))
- : new SliceByNamesReadCommand(table, key, cfName, new NamesQueryFilter(ImmutableSortedSet.copyOf(expected.getColumnNames())));
+ long timestamp = System.currentTimeMillis();
+ IDiskAtomFilter filter = expected == null
+ ? new SliceQueryFilter(ByteBufferUtil.EMPTY_BYTE_BUFFER, ByteBufferUtil.EMPTY_BYTE_BUFFER, false, 1)
+ : new NamesQueryFilter(ImmutableSortedSet.copyOf(expected.getColumnNames()));
+ ReadCommand readCommand = filter instanceof SliceQueryFilter
+ ? new SliceFromReadCommand(table, key, cfName, timestamp, (SliceQueryFilter) filter)
+ : new SliceByNamesReadCommand(table, key, cfName, timestamp, (NamesQueryFilter) filter);
List<Row> rows = read(Arrays.asList(readCommand), ConsistencyLevel.QUORUM);
ColumnFamily current = rows.get(0).cf;
if (!casApplies(expected, current))
@@ -245,16 +249,18 @@ public class StorageProxy implements StorageProxyMBean
throw new WriteTimeoutException(WriteType.CAS, ConsistencyLevel.SERIAL, -1, -1);
}
- private static boolean hasLiveColumns(ColumnFamily cf)
+ private static boolean hasLiveColumns(ColumnFamily cf, long now)
{
- return cf != null && !cf.hasOnlyTombstones();
+ return cf != null && !cf.hasOnlyTombstones(now);
}
private static boolean casApplies(ColumnFamily expected, ColumnFamily current)
{
- if (!hasLiveColumns(expected))
- return !hasLiveColumns(current);
- else if (!hasLiveColumns(current))
+ long now = System.currentTimeMillis();
+
+ if (!hasLiveColumns(expected, now))
+ return !hasLiveColumns(current, now);
+ else if (!hasLiveColumns(current, now))
return false;
// current has been built from expected, so we know that it can't have columns
@@ -264,14 +270,14 @@ public class StorageProxy implements StorageProxyMBean
for (Column e : expected)
{
Column c = current.getColumn(e.name());
- if (e.isLive())
+ if (e.isLive(now))
{
- if (!(c != null && c.isLive() && c.value().equals(e.value())))
+ if (!(c != null && c.isLive(now) && c.value().equals(e.value())))
return false;
}
else
{
- if (c != null && c.isLive())
+ if (c != null && c.isLive(now))
return false;
}
}
@@ -1179,7 +1185,7 @@ public class StorageProxy implements StorageProxyMBean
ReadRepairMetrics.repairedBlocking.mark();
// Do a full data read to resolve the correct response (and repair node that need be)
- RowDataResolver resolver = new RowDataResolver(exec.command.table, exec.command.key, exec.command.filter());
+ RowDataResolver resolver = new RowDataResolver(exec.command.table, exec.command.key, exec.command.filter(), exec.command.timestamp);
ReadCallback<ReadResponse, Row> repairHandler = exec.handler.withNewResolver(resolver);
if (repairCommands == null)
@@ -1397,6 +1403,7 @@ public class StorageProxy implements StorageProxyMBean
RangeSliceCommand nodeCmd = new RangeSliceCommand(command.keyspace,
command.column_family,
+ command.timestamp,
commandPredicate,
range,
command.row_filter,
@@ -1405,7 +1412,7 @@ public class StorageProxy implements StorageProxyMBean
command.isPaging);
// collect replies and resolve according to consistency level
- RangeSliceResponseResolver resolver = new RangeSliceResponseResolver(nodeCmd.keyspace);
+ RangeSliceResponseResolver resolver = new RangeSliceResponseResolver(nodeCmd.keyspace, command.timestamp);
ReadCallback<RangeSliceReply, Iterable<Row>> handler = new ReadCallback(resolver, consistency_level, nodeCmd, filteredEndpoints);
handler.assureSufficientLiveNodes();
resolver.setSources(filteredEndpoints);
@@ -1431,7 +1438,7 @@ public class StorageProxy implements StorageProxyMBean
{
rows.add(row);
if (nodeCmd.countCQL3Rows)
- cql3RowCount += row.getLiveCount(commandPredicate);
+ cql3RowCount += row.getLiveCount(commandPredicate, command.timestamp);
}
FBUtilities.waitOnFutures(resolver.repairResults, DatabaseDescriptor.getWriteRpcTimeout());
}
[2/4] Include a timestamp with all read commands to determine column
expiration
Posted by al...@apache.org.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/src/java/org/apache/cassandra/thrift/CassandraServer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/CassandraServer.java b/src/java/org/apache/cassandra/thrift/CassandraServer.java
index 2daa910..8f1fd21 100644
--- a/src/java/org/apache/cassandra/thrift/CassandraServer.java
+++ b/src/java/org/apache/cassandra/thrift/CassandraServer.java
@@ -32,10 +32,10 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
-import org.apache.cassandra.auth.AuthenticatedUser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.cassandra.auth.AuthenticatedUser;
import org.apache.cassandra.auth.Permission;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.DatabaseDescriptor;
@@ -121,15 +121,15 @@ public class CassandraServer implements Cassandra.Iface
return columnFamilyKeyMap;
}
- public List<ColumnOrSuperColumn> thriftifyColumns(Collection<org.apache.cassandra.db.Column> columns, boolean reverseOrder)
+ public List<ColumnOrSuperColumn> thriftifyColumns(Collection<org.apache.cassandra.db.Column> columns, boolean reverseOrder, long now)
{
ArrayList<ColumnOrSuperColumn> thriftColumns = new ArrayList<ColumnOrSuperColumn>(columns.size());
for (org.apache.cassandra.db.Column column : columns)
{
- if (column.isMarkedForDelete())
+ if (column.isMarkedForDelete(now))
continue;
- thriftColumns.add(thriftifyColumn(column));
+ thriftColumns.add(thriftifyColumnWithName(column, column.name()));
}
// we have to do the reversing here, since internally we pass results around in ColumnFamily
@@ -142,22 +142,15 @@ public class CassandraServer implements Cassandra.Iface
private ColumnOrSuperColumn thriftifyColumnWithName(org.apache.cassandra.db.Column column, ByteBuffer newName)
{
- assert !column.isMarkedForDelete();
-
if (column instanceof org.apache.cassandra.db.CounterColumn)
return new ColumnOrSuperColumn().setCounter_column(thriftifySubCounter(column).setName(newName));
else
return new ColumnOrSuperColumn().setColumn(thriftifySubColumn(column).setName(newName));
}
- private ColumnOrSuperColumn thriftifyColumn(org.apache.cassandra.db.Column column)
- {
- return thriftifyColumnWithName(column, column.name());
- }
-
private Column thriftifySubColumn(org.apache.cassandra.db.Column column)
{
- assert !column.isMarkedForDelete() && !(column instanceof org.apache.cassandra.db.CounterColumn);
+ assert !(column instanceof org.apache.cassandra.db.CounterColumn);
Column thrift_column = new Column(column.name()).setValue(column.value()).setTimestamp(column.timestamp());
if (column instanceof ExpiringColumn)
@@ -169,18 +162,22 @@ public class CassandraServer implements Cassandra.Iface
private CounterColumn thriftifySubCounter(org.apache.cassandra.db.Column column)
{
- assert !column.isMarkedForDelete() && (column instanceof org.apache.cassandra.db.CounterColumn);
+ assert column instanceof org.apache.cassandra.db.CounterColumn;
return new CounterColumn(column.name(), CounterContext.instance().total(column.value()));
}
- private List<ColumnOrSuperColumn> thriftifySuperColumns(Collection<org.apache.cassandra.db.Column> columns, boolean reverseOrder, boolean subcolumnsOnly, boolean isCounterCF)
+ private List<ColumnOrSuperColumn> thriftifySuperColumns(Collection<org.apache.cassandra.db.Column> columns,
+ boolean reverseOrder,
+ long now,
+ boolean subcolumnsOnly,
+ boolean isCounterCF)
{
if (subcolumnsOnly)
{
ArrayList<ColumnOrSuperColumn> thriftSuperColumns = new ArrayList<ColumnOrSuperColumn>(columns.size());
for (org.apache.cassandra.db.Column column : columns)
{
- if (column.isMarkedForDelete())
+ if (column.isMarkedForDelete(now))
continue;
thriftSuperColumns.add(thriftifyColumnWithName(column, SuperColumns.subName(column.name())));
@@ -192,19 +189,19 @@ public class CassandraServer implements Cassandra.Iface
else
{
if (isCounterCF)
- return thriftifyCounterSuperColumns(columns, reverseOrder);
+ return thriftifyCounterSuperColumns(columns, reverseOrder, now);
else
- return thriftifySuperColumns(columns, reverseOrder);
+ return thriftifySuperColumns(columns, reverseOrder, now);
}
}
- private List<ColumnOrSuperColumn> thriftifySuperColumns(Collection<org.apache.cassandra.db.Column> columns, boolean reverseOrder)
+ private List<ColumnOrSuperColumn> thriftifySuperColumns(Collection<org.apache.cassandra.db.Column> columns, boolean reverseOrder, long now)
{
ArrayList<ColumnOrSuperColumn> thriftSuperColumns = new ArrayList<ColumnOrSuperColumn>(columns.size());
SuperColumn current = null;
for (org.apache.cassandra.db.Column column : columns)
{
- if (column.isMarkedForDelete())
+ if (column.isMarkedForDelete(now))
continue;
ByteBuffer scName = SuperColumns.scName(column.name());
@@ -222,13 +219,13 @@ public class CassandraServer implements Cassandra.Iface
return thriftSuperColumns;
}
- private List<ColumnOrSuperColumn> thriftifyCounterSuperColumns(Collection<org.apache.cassandra.db.Column> columns, boolean reverseOrder)
+ private List<ColumnOrSuperColumn> thriftifyCounterSuperColumns(Collection<org.apache.cassandra.db.Column> columns, boolean reverseOrder, long now)
{
ArrayList<ColumnOrSuperColumn> thriftSuperColumns = new ArrayList<ColumnOrSuperColumn>(columns.size());
CounterSuperColumn current = null;
for (org.apache.cassandra.db.Column column : columns)
{
- if (column.isMarkedForDelete())
+ if (column.isMarkedForDelete(now))
continue;
ByteBuffer scName = SuperColumns.scName(column.name());
@@ -255,14 +252,14 @@ public class CassandraServer implements Cassandra.Iface
{
ColumnFamily cf = columnFamilies.get(StorageService.getPartitioner().decorateKey(command.key));
boolean reverseOrder = command instanceof SliceFromReadCommand && ((SliceFromReadCommand)command).filter.reversed;
- List<ColumnOrSuperColumn> thriftifiedColumns = thriftifyColumnFamily(cf, subColumnsOnly, reverseOrder);
+ List<ColumnOrSuperColumn> thriftifiedColumns = thriftifyColumnFamily(cf, subColumnsOnly, reverseOrder, command.timestamp);
columnFamiliesMap.put(command.key, thriftifiedColumns);
}
return columnFamiliesMap;
}
- private List<ColumnOrSuperColumn> thriftifyColumnFamily(ColumnFamily cf, boolean subcolumnsOnly, boolean reverseOrder)
+ private List<ColumnOrSuperColumn> thriftifyColumnFamily(ColumnFamily cf, boolean subcolumnsOnly, boolean reverseOrder, long now)
{
if (cf == null || cf.getColumnCount() == 0)
return EMPTY_COLUMNS;
@@ -270,11 +267,11 @@ public class CassandraServer implements Cassandra.Iface
if (cf.metadata().isSuper())
{
boolean isCounterCF = cf.metadata().getDefaultValidator().isCommutative();
- return thriftifySuperColumns(cf.getSortedColumns(), reverseOrder, subcolumnsOnly, isCounterCF);
+ return thriftifySuperColumns(cf.getSortedColumns(), reverseOrder, now, subcolumnsOnly, isCounterCF);
}
else
{
- return thriftifyColumns(cf.getSortedColumns(), reverseOrder);
+ return thriftifyColumns(cf.getSortedColumns(), reverseOrder, now);
}
}
@@ -299,7 +296,7 @@ public class CassandraServer implements Cassandra.Iface
ClientState cState = state();
String keyspace = cState.getKeyspace();
state().hasColumnFamilyAccess(keyspace, column_parent.column_family, Permission.SELECT);
- return multigetSliceInternal(keyspace, Collections.singletonList(key), column_parent, predicate, consistency_level).get(key);
+ return getSliceInternal(keyspace, key, column_parent, System.currentTimeMillis(), predicate, consistency_level);
}
catch (RequestValidationException e)
{
@@ -311,6 +308,17 @@ public class CassandraServer implements Cassandra.Iface
}
}
+ private List<ColumnOrSuperColumn> getSliceInternal(String keyspace,
+ ByteBuffer key,
+ ColumnParent column_parent,
+ long timestamp,
+ SlicePredicate predicate,
+ ConsistencyLevel consistency_level)
+ throws org.apache.cassandra.exceptions.InvalidRequestException, UnavailableException, TimedOutException
+ {
+ return multigetSliceInternal(keyspace, Collections.singletonList(key), column_parent, timestamp, predicate, consistency_level).get(key);
+ }
+
public Map<ByteBuffer, List<ColumnOrSuperColumn>> multiget_slice(List<ByteBuffer> keys, ColumnParent column_parent, SlicePredicate predicate, ConsistencyLevel consistency_level)
throws InvalidRequestException, UnavailableException, TimedOutException
{
@@ -335,7 +343,7 @@ public class CassandraServer implements Cassandra.Iface
ClientState cState = state();
String keyspace = cState.getKeyspace();
cState.hasColumnFamilyAccess(keyspace, column_parent.column_family, Permission.SELECT);
- return multigetSliceInternal(keyspace, keys, column_parent, predicate, consistency_level);
+ return multigetSliceInternal(keyspace, keys, column_parent, System.currentTimeMillis(), predicate, consistency_level);
}
catch (RequestValidationException e)
{
@@ -347,7 +355,12 @@ public class CassandraServer implements Cassandra.Iface
}
}
- private Map<ByteBuffer, List<ColumnOrSuperColumn>> multigetSliceInternal(String keyspace, List<ByteBuffer> keys, ColumnParent column_parent, SlicePredicate predicate, ConsistencyLevel consistency_level)
+ private Map<ByteBuffer, List<ColumnOrSuperColumn>> multigetSliceInternal(String keyspace,
+ List<ByteBuffer> keys,
+ ColumnParent column_parent,
+ long timestamp,
+ SlicePredicate predicate,
+ ConsistencyLevel consistency_level)
throws org.apache.cassandra.exceptions.InvalidRequestException, UnavailableException, TimedOutException
{
CFMetaData metadata = ThriftValidation.validateColumnFamily(keyspace, column_parent.column_family);
@@ -388,56 +401,12 @@ public class CassandraServer implements Cassandra.Iface
ThriftValidation.validateKey(metadata, key);
// Note that we should not share a slice filter amongst the command, due to SliceQueryFilter not being immutable
// due to its columnCounter used by the lastCounted() method (also see SelectStatement.getSliceCommands)
- commands.add(ReadCommand.create(keyspace, key, column_parent.getColumn_family(), filter.cloneShallow()));
+ commands.add(ReadCommand.create(keyspace, key, column_parent.getColumn_family(), timestamp, filter.cloneShallow()));
}
return getSlice(commands, column_parent.isSetSuper_column(), consistencyLevel);
}
- private ColumnOrSuperColumn internal_get(ByteBuffer key, ColumnPath column_path, ConsistencyLevel consistency_level)
- throws RequestValidationException, NotFoundException, UnavailableException, TimedOutException
- {
- ThriftClientState cState = state();
- String keyspace = cState.getKeyspace();
- cState.hasColumnFamilyAccess(keyspace, column_path.column_family, Permission.SELECT);
-
- CFMetaData metadata = ThriftValidation.validateColumnFamily(keyspace, column_path.column_family);
- ThriftValidation.validateColumnPath(metadata, column_path);
- org.apache.cassandra.db.ConsistencyLevel consistencyLevel = ThriftConversion.fromThrift(consistency_level);
- consistencyLevel.validateForRead(keyspace);
-
- ThriftValidation.validateKey(metadata, key);
-
- IDiskAtomFilter filter;
- if (metadata.isSuper())
- {
- CompositeType type = (CompositeType)metadata.comparator;
- SortedSet names = new TreeSet<ByteBuffer>(column_path.column == null ? type.types.get(0) : type.types.get(1));
- names.add(column_path.column == null ? column_path.super_column : column_path.column);
- filter = SuperColumns.fromSCNamesFilter(type, column_path.column == null ? null : column_path.bufferForSuper_column(), new NamesQueryFilter(names));
- }
- else
- {
- SortedSet<ByteBuffer> names = new TreeSet<ByteBuffer>(metadata.comparator);
- names.add(column_path.column);
- filter = new NamesQueryFilter(names);
- }
-
- ReadCommand command = ReadCommand.create(keyspace, key, column_path.column_family, filter);
-
- Map<DecoratedKey, ColumnFamily> cfamilies = readColumnFamily(Arrays.asList(command), consistencyLevel);
-
- ColumnFamily cf = cfamilies.get(StorageService.getPartitioner().decorateKey(command.key));
-
- if (cf == null)
- throw new NotFoundException();
- List<ColumnOrSuperColumn> tcolumns = thriftifyColumnFamily(cf, metadata.isSuper() && column_path.column != null, false);
- if (tcolumns.isEmpty())
- throw new NotFoundException();
- assert tcolumns.size() == 1;
- return tcolumns.get(0);
- }
-
public ColumnOrSuperColumn get(ByteBuffer key, ColumnPath column_path, ConsistencyLevel consistency_level)
throws InvalidRequestException, NotFoundException, UnavailableException, TimedOutException
{
@@ -455,7 +424,46 @@ public class CassandraServer implements Cassandra.Iface
try
{
- return internal_get(key, column_path, consistency_level);
+ ThriftClientState cState = state();
+ String keyspace = cState.getKeyspace();
+ cState.hasColumnFamilyAccess(keyspace, column_path.column_family, Permission.SELECT);
+
+ CFMetaData metadata = ThriftValidation.validateColumnFamily(keyspace, column_path.column_family);
+ ThriftValidation.validateColumnPath(metadata, column_path);
+ org.apache.cassandra.db.ConsistencyLevel consistencyLevel = ThriftConversion.fromThrift(consistency_level);
+ consistencyLevel.validateForRead(keyspace);
+
+ ThriftValidation.validateKey(metadata, key);
+
+ IDiskAtomFilter filter;
+ if (metadata.isSuper())
+ {
+ CompositeType type = (CompositeType)metadata.comparator;
+ SortedSet names = new TreeSet<ByteBuffer>(column_path.column == null ? type.types.get(0) : type.types.get(1));
+ names.add(column_path.column == null ? column_path.super_column : column_path.column);
+ filter = SuperColumns.fromSCNamesFilter(type, column_path.column == null ? null : column_path.bufferForSuper_column(), new NamesQueryFilter(names));
+ }
+ else
+ {
+ SortedSet<ByteBuffer> names = new TreeSet<ByteBuffer>(metadata.comparator);
+ names.add(column_path.column);
+ filter = new NamesQueryFilter(names);
+ }
+
+ long now = System.currentTimeMillis();
+ ReadCommand command = ReadCommand.create(keyspace, key, column_path.column_family, now, filter);
+
+ Map<DecoratedKey, ColumnFamily> cfamilies = readColumnFamily(Arrays.asList(command), consistencyLevel);
+
+ ColumnFamily cf = cfamilies.get(StorageService.getPartitioner().decorateKey(command.key));
+
+ if (cf == null)
+ throw new NotFoundException();
+ List<ColumnOrSuperColumn> tcolumns = thriftifyColumnFamily(cf, metadata.isSuper() && column_path.column != null, false, now);
+ if (tcolumns.isEmpty())
+ throw new NotFoundException();
+ assert tcolumns.size() == 1;
+ return tcolumns.get(0);
}
catch (RequestValidationException e)
{
@@ -490,9 +498,10 @@ public class CassandraServer implements Cassandra.Iface
cState.hasColumnFamilyAccess(keyspace, column_parent.column_family, Permission.SELECT);
Table table = Table.open(keyspace);
ColumnFamilyStore cfs = table.getColumnFamilyStore(column_parent.column_family);
+ long timestamp = System.currentTimeMillis();
if (predicate.column_names != null)
- return get_slice(key, column_parent, predicate, consistency_level).size();
+ return getSliceInternal(keyspace, key, column_parent, timestamp, predicate, consistency_level).size();
int pageSize;
// request by page if this is a large row
@@ -525,7 +534,7 @@ public class CassandraServer implements Cassandra.Iface
while (true)
{
predicate.slice_range.count = Math.min(pageSize, Math.max(2, remaining)); // fetch at least two columns
- columns = get_slice(key, column_parent, predicate, consistency_level);
+ columns = getSliceInternal(keyspace, key, column_parent, timestamp, predicate, consistency_level);
if (columns.isEmpty())
break;
@@ -594,7 +603,12 @@ public class CassandraServer implements Cassandra.Iface
cState.hasColumnFamilyAccess(keyspace, column_parent.column_family, Permission.SELECT);
Map<ByteBuffer, Integer> counts = new HashMap<ByteBuffer, Integer>();
- Map<ByteBuffer, List<ColumnOrSuperColumn>> columnFamiliesMap = multigetSliceInternal(keyspace, keys, column_parent, predicate, consistency_level);
+ Map<ByteBuffer, List<ColumnOrSuperColumn>> columnFamiliesMap = multigetSliceInternal(keyspace,
+ keys,
+ column_parent,
+ System.currentTimeMillis(),
+ predicate,
+ consistency_level);
for (Map.Entry<ByteBuffer, List<ColumnOrSuperColumn>> cf : columnFamiliesMap.entrySet())
counts.put(cf.getKey(), cf.getValue().size());
@@ -1068,14 +1082,11 @@ public class CassandraServer implements Cassandra.Iface
try
{
- String keyspace = null;
- CFMetaData metadata = null;
-
ThriftClientState cState = state();
- keyspace = cState.getKeyspace();
+ String keyspace = cState.getKeyspace();
cState.hasColumnFamilyAccess(keyspace, column_parent.column_family, Permission.SELECT);
- metadata = ThriftValidation.validateColumnFamily(keyspace, column_parent.column_family);
+ CFMetaData metadata = ThriftValidation.validateColumnFamily(keyspace, column_parent.column_family);
ThriftValidation.validateColumnParent(metadata, column_parent);
ThriftValidation.validatePredicate(metadata, column_parent, predicate);
ThriftValidation.validateKeyRange(metadata, column_parent.super_column, range);
@@ -1101,12 +1112,19 @@ public class CassandraServer implements Cassandra.Iface
: RowPosition.forKey(range.end_key, p);
bounds = new Bounds<RowPosition>(RowPosition.forKey(range.start_key, p), end);
}
+ long now = System.currentTimeMillis();
schedule(DatabaseDescriptor.getRangeRpcTimeout());
try
{
IDiskAtomFilter filter = ThriftValidation.asIFilter(predicate, metadata, column_parent.super_column);
- rows = StorageProxy.getRangeSlice(new RangeSliceCommand(keyspace, column_parent.column_family, filter, bounds,
- range.row_filter, range.count), consistencyLevel);
+ rows = StorageProxy.getRangeSlice(new RangeSliceCommand(keyspace,
+ column_parent.column_family,
+ now,
+ filter,
+ bounds,
+ range.row_filter,
+ range.count),
+ consistencyLevel);
}
finally
{
@@ -1114,7 +1132,7 @@ public class CassandraServer implements Cassandra.Iface
}
assert rows != null;
- return thriftifyKeySlices(rows, column_parent, predicate);
+ return thriftifyKeySlices(rows, column_parent, predicate, now);
}
catch (RequestValidationException e)
{
@@ -1185,12 +1203,21 @@ public class CassandraServer implements Cassandra.Iface
}
List<Row> rows;
+ long now = System.currentTimeMillis();
schedule(DatabaseDescriptor.getRangeRpcTimeout());
try
{
IDiskAtomFilter filter = ThriftValidation.asIFilter(predicate, metadata, null);
- rows = StorageProxy.getRangeSlice(new RangeSliceCommand(keyspace, column_family, filter,
- bounds, range.row_filter, range.count, true, true), consistencyLevel);
+ rows = StorageProxy.getRangeSlice(new RangeSliceCommand(keyspace,
+ column_family,
+ now,
+ filter,
+ bounds,
+ range.row_filter,
+ range.count,
+ true,
+ true),
+ consistencyLevel);
}
finally
{
@@ -1198,7 +1225,7 @@ public class CassandraServer implements Cassandra.Iface
}
assert rows != null;
- return thriftifyKeySlices(rows, new ColumnParent(column_family), predicate);
+ return thriftifyKeySlices(rows, new ColumnParent(column_family), predicate, now);
}
catch (RequestValidationException e)
{
@@ -1219,13 +1246,13 @@ public class CassandraServer implements Cassandra.Iface
}
}
- private List<KeySlice> thriftifyKeySlices(List<Row> rows, ColumnParent column_parent, SlicePredicate predicate)
+ private List<KeySlice> thriftifyKeySlices(List<Row> rows, ColumnParent column_parent, SlicePredicate predicate, long now)
{
List<KeySlice> keySlices = new ArrayList<KeySlice>(rows.size());
boolean reversed = predicate.slice_range != null && predicate.slice_range.reversed;
for (Row row : rows)
{
- List<ColumnOrSuperColumn> thriftifiedColumns = thriftifyColumnFamily(row.cf, column_parent.super_column != null, reversed);
+ List<ColumnOrSuperColumn> thriftifiedColumns = thriftifyColumnFamily(row.cf, column_parent.super_column != null, reversed, now);
keySlices.add(new KeySlice(row.key.key, thriftifiedColumns));
}
@@ -1265,15 +1292,17 @@ public class CassandraServer implements Cassandra.Iface
p.getMinimumToken().minKeyBound());
IDiskAtomFilter filter = ThriftValidation.asIFilter(column_predicate, metadata, column_parent.super_column);
+ long now = System.currentTimeMillis();
RangeSliceCommand command = new RangeSliceCommand(keyspace,
column_parent.column_family,
+ now,
filter,
bounds,
index_clause.expressions,
index_clause.count);
List<Row> rows = StorageProxy.getRangeSlice(command, consistencyLevel);
- return thriftifyKeySlices(rows, column_parent, column_predicate);
+ return thriftifyKeySlices(rows, column_parent, column_predicate, now);
}
catch (RequestValidationException e)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/test/data/serialization/2.0/db.RangeSliceCommand.bin
----------------------------------------------------------------------
diff --git a/test/data/serialization/2.0/db.RangeSliceCommand.bin b/test/data/serialization/2.0/db.RangeSliceCommand.bin
index e96746d..81a0c02 100644
Binary files a/test/data/serialization/2.0/db.RangeSliceCommand.bin and b/test/data/serialization/2.0/db.RangeSliceCommand.bin differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/test/data/serialization/2.0/db.SliceByNamesReadCommand.bin
----------------------------------------------------------------------
diff --git a/test/data/serialization/2.0/db.SliceByNamesReadCommand.bin b/test/data/serialization/2.0/db.SliceByNamesReadCommand.bin
index 0e143a6..e9c33a2 100644
Binary files a/test/data/serialization/2.0/db.SliceByNamesReadCommand.bin and b/test/data/serialization/2.0/db.SliceByNamesReadCommand.bin differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/test/data/serialization/2.0/db.SliceFromReadCommand.bin
----------------------------------------------------------------------
diff --git a/test/data/serialization/2.0/db.SliceFromReadCommand.bin b/test/data/serialization/2.0/db.SliceFromReadCommand.bin
index 7b357aa..1beede3 100644
Binary files a/test/data/serialization/2.0/db.SliceFromReadCommand.bin and b/test/data/serialization/2.0/db.SliceFromReadCommand.bin differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/test/long/org/apache/cassandra/db/LongTableTest.java
----------------------------------------------------------------------
diff --git a/test/long/org/apache/cassandra/db/LongTableTest.java b/test/long/org/apache/cassandra/db/LongTableTest.java
index e209b03..70b40d2 100644
--- a/test/long/org/apache/cassandra/db/LongTableTest.java
+++ b/test/long/org/apache/cassandra/db/LongTableTest.java
@@ -56,7 +56,10 @@ public class LongTableTest extends SchemaLoader
{
for (int j = 0; j < i; j++)
{
- cf = cfStore.getColumnFamily(QueryFilter.getNamesFilter(Util.dk("key" + i), "Standard1", ByteBufferUtil.bytes("c" + j)));
+ cf = cfStore.getColumnFamily(QueryFilter.getNamesFilter(Util.dk("key" + i),
+ "Standard1",
+ ByteBufferUtil.bytes("c" + j),
+ System.currentTimeMillis()));
TableTest.assertColumns(cf, "c" + j);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/test/unit/org/apache/cassandra/SchemaLoader.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/SchemaLoader.java b/test/unit/org/apache/cassandra/SchemaLoader.java
index 1ba45e6..a893576 100644
--- a/test/unit/org/apache/cassandra/SchemaLoader.java
+++ b/test/unit/org/apache/cassandra/SchemaLoader.java
@@ -425,7 +425,7 @@ public class SchemaLoader
for (int i = offset; i < offset + numberOfRows; i++)
{
DecoratedKey key = Util.dk("key" + i);
- store.getColumnFamily(QueryFilter.getNamesFilter(key, columnFamily, ByteBufferUtil.bytes("col" + i)));
+ store.getColumnFamily(QueryFilter.getNamesFilter(key, columnFamily, ByteBufferUtil.bytes("col" + i), System.currentTimeMillis()));
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/test/unit/org/apache/cassandra/Util.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/Util.java b/test/unit/org/apache/cassandra/Util.java
index a14fd99..053fa7d 100644
--- a/test/unit/org/apache/cassandra/Util.java
+++ b/test/unit/org/apache/cassandra/Util.java
@@ -150,10 +150,7 @@ public class Util
: new SliceQueryFilter(SuperColumns.startOf(superColumn), SuperColumns.endOf(superColumn), false, Integer.MAX_VALUE);
Token min = StorageService.getPartitioner().getMinimumToken();
- return cfs.getRangeSlice(new Bounds<Token>(min, min).toRowBounds(),
- 10000,
- filter,
- null);
+ return cfs.getRangeSlice(new Bounds<Token>(min, min).toRowBounds(), null, filter, 10000);
}
/**
@@ -180,7 +177,7 @@ public class Util
{
ColumnFamilyStore cfStore = table.getColumnFamilyStore(cfName);
assert cfStore != null : "Column family " + cfName + " has not been defined";
- return cfStore.getColumnFamily(QueryFilter.getIdentityFilter(key, cfName));
+ return cfStore.getColumnFamily(QueryFilter.getIdentityFilter(key, cfName, System.currentTimeMillis()));
}
public static byte[] concatByteArrays(byte[] first, byte[]... remaining)
@@ -258,7 +255,7 @@ public class Util
public static void compact(ColumnFamilyStore cfs, Collection<SSTableReader> sstables)
{
- int gcBefore = (int) (System.currentTimeMillis() / 1000) - cfs.metadata.getGcGraceSeconds();
+ int gcBefore = cfs.gcBefore(System.currentTimeMillis());
AbstractCompactionTask task = cfs.getCompactionStrategy().getUserDefinedTask(sstables, gcBefore);
task.execute(null);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/test/unit/org/apache/cassandra/config/DefsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/config/DefsTest.java b/test/unit/org/apache/cassandra/config/DefsTest.java
index 04fd5f8..52c4d36 100644
--- a/test/unit/org/apache/cassandra/config/DefsTest.java
+++ b/test/unit/org/apache/cassandra/config/DefsTest.java
@@ -179,7 +179,7 @@ public class DefsTest extends SchemaLoader
assert store != null;
store.forceBlockingFlush();
- ColumnFamily cfam = store.getColumnFamily(QueryFilter.getNamesFilter(dk, cf, ByteBufferUtil.bytes("col0")));
+ ColumnFamily cfam = store.getColumnFamily(QueryFilter.getNamesFilter(dk, cf, ByteBufferUtil.bytes("col0"), System.currentTimeMillis()));
assert cfam.getColumn(ByteBufferUtil.bytes("col0")) != null;
Column col = cfam.getColumn(ByteBufferUtil.bytes("col0"));
assert ByteBufferUtil.bytes("value0").equals(col.value());
@@ -252,7 +252,7 @@ public class DefsTest extends SchemaLoader
assert store != null;
store.forceBlockingFlush();
- ColumnFamily cfam = store.getColumnFamily(QueryFilter.getNamesFilter(dk, newCf.cfName, ByteBufferUtil.bytes("col0")));
+ ColumnFamily cfam = store.getColumnFamily(QueryFilter.getNamesFilter(dk, newCf.cfName, ByteBufferUtil.bytes("col0"), System.currentTimeMillis()));
assert cfam.getColumn(ByteBufferUtil.bytes("col0")) != null;
Column col = cfam.getColumn(ByteBufferUtil.bytes("col0"));
assert ByteBufferUtil.bytes("value0").equals(col.value());
@@ -360,7 +360,7 @@ public class DefsTest extends SchemaLoader
assert store != null;
store.forceBlockingFlush();
- ColumnFamily cfam = store.getColumnFamily(QueryFilter.getNamesFilter(dk, newCf.cfName, ByteBufferUtil.bytes("col0")));
+ ColumnFamily cfam = store.getColumnFamily(QueryFilter.getNamesFilter(dk, newCf.cfName, ByteBufferUtil.bytes("col0"), System.currentTimeMillis()));
assert cfam.getColumn(ByteBufferUtil.bytes("col0")) != null;
Column col = cfam.getColumn(ByteBufferUtil.bytes("col0"));
assert ByteBufferUtil.bytes("value0").equals(col.value());
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/test/unit/org/apache/cassandra/db/CleanupTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/CleanupTest.java b/test/unit/org/apache/cassandra/db/CleanupTest.java
index 513db98..4c02a75 100644
--- a/test/unit/org/apache/cassandra/db/CleanupTest.java
+++ b/test/unit/org/apache/cassandra/db/CleanupTest.java
@@ -115,7 +115,7 @@ public class CleanupTest extends SchemaLoader
IDiskAtomFilter filter = new IdentityQueryFilter();
IPartitioner p = StorageService.getPartitioner();
Range<RowPosition> range = Util.range("", "");
- rows = table.getColumnFamilyStore(CF1).search(clause, range, Integer.MAX_VALUE, filter);
+ rows = table.getColumnFamilyStore(CF1).search(range, clause, filter, Integer.MAX_VALUE);
assertEquals(LOOPS, rows.size());
// we don't allow cleanup when the local host has no range to avoid wipping up all data when a node has not join the ring.
@@ -137,7 +137,7 @@ public class CleanupTest extends SchemaLoader
assert cfs.getSSTables().isEmpty();
// 2ary indexes should result in no results, too (although tombstones won't be gone until compacted)
- rows = cfs.search(clause, range, Integer.MAX_VALUE, filter);
+ rows = cfs.search(range, clause, filter, Integer.MAX_VALUE);
assertEquals(0, rows.size());
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/test/unit/org/apache/cassandra/db/CollationControllerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/CollationControllerTest.java b/test/unit/org/apache/cassandra/db/CollationControllerTest.java
index f835ad3..ca0ec6e 100644
--- a/test/unit/org/apache/cassandra/db/CollationControllerTest.java
+++ b/test/unit/org/apache/cassandra/db/CollationControllerTest.java
@@ -69,7 +69,7 @@ public class CollationControllerTest extends SchemaLoader
// A NamesQueryFilter goes down one code path (through collectTimeOrderedData())
// It should only iterate the last flushed sstable, since it probably contains the most recent value for Column1
- QueryFilter filter = QueryFilter.getNamesFilter(dk, "Standard1", ByteBufferUtil.bytes("Column1"));
+ QueryFilter filter = QueryFilter.getNamesFilter(dk, "Standard1", ByteBufferUtil.bytes("Column1"), System.currentTimeMillis());
CollationController controller = new CollationController(store, filter, Integer.MIN_VALUE);
controller.getTopLevelColumns();
assertEquals(1, controller.getSstablesIterated());
@@ -77,7 +77,7 @@ public class CollationControllerTest extends SchemaLoader
// SliceQueryFilter goes down another path (through collectAllData())
// We will read "only" the last sstable in that case, but because the 2nd sstable has a tombstone that is more
// recent than the maxTimestamp of the very first sstable we flushed, we should only read the 2 first sstables.
- filter = QueryFilter.getIdentityFilter(dk, "Standard1");
+ filter = QueryFilter.getIdentityFilter(dk, "Standard1", System.currentTimeMillis());
controller = new CollationController(store, filter, Integer.MIN_VALUE);
controller.getTopLevelColumns();
assertEquals(2, controller.getSstablesIterated());
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
index a0f64de..39802cc 100644
--- a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
+++ b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
@@ -94,7 +94,7 @@ public class ColumnFamilyStoreTest extends SchemaLoader
cfs.forceBlockingFlush();
cfs.getRecentSSTablesPerReadHistogram(); // resets counts
- cfs.getColumnFamily(QueryFilter.getNamesFilter(Util.dk("key1"), "Standard1", ByteBufferUtil.bytes("Column1")));
+ cfs.getColumnFamily(QueryFilter.getNamesFilter(Util.dk("key1"), "Standard1", ByteBufferUtil.bytes("Column1"), System.currentTimeMillis()));
assertEquals(1, cfs.getRecentSSTablesPerReadHistogram()[0]);
}
@@ -116,7 +116,7 @@ public class ColumnFamilyStoreTest extends SchemaLoader
List<SSTableReader> ssTables = table.getAllSSTables();
assertEquals(1, ssTables.size());
ssTables.get(0).forceFilterFailures();
- ColumnFamily cf = cfs.getColumnFamily(QueryFilter.getIdentityFilter(Util.dk("key2"), "Standard1"));
+ ColumnFamily cf = cfs.getColumnFamily(QueryFilter.getIdentityFilter(Util.dk("key2"), "Standard1", System.currentTimeMillis()));
assertNull(cf);
}
@@ -135,12 +135,21 @@ public class ColumnFamilyStoreTest extends SchemaLoader
{
public void runMayThrow() throws IOException
{
- QueryFilter sliceFilter = QueryFilter.getSliceFilter(Util.dk("key1"), "Standard2", ByteBufferUtil.EMPTY_BYTE_BUFFER, ByteBufferUtil.EMPTY_BYTE_BUFFER, false, 1);
+ QueryFilter sliceFilter = QueryFilter.getSliceFilter(Util.dk("key1"),
+ "Standard2",
+ ByteBufferUtil.EMPTY_BYTE_BUFFER,
+ ByteBufferUtil.EMPTY_BYTE_BUFFER,
+ false,
+ 1,
+ System.currentTimeMillis());
ColumnFamily cf = store.getColumnFamily(sliceFilter);
assert cf.isMarkedForDelete();
assert cf.getColumnCount() == 0;
- QueryFilter namesFilter = QueryFilter.getNamesFilter(Util.dk("key1"), "Standard2", ByteBufferUtil.bytes("a"));
+ QueryFilter namesFilter = QueryFilter.getNamesFilter(Util.dk("key1"),
+ "Standard2",
+ ByteBufferUtil.bytes("a"),
+ System.currentTimeMillis());
cf = store.getColumnFamily(namesFilter);
assert cf.isMarkedForDelete();
assert cf.getColumnCount() == 0;
@@ -157,9 +166,9 @@ public class ColumnFamilyStoreTest extends SchemaLoader
IPartitioner p = StorageService.getPartitioner();
List<Row> result = cfs.getRangeSlice(Util.range(p, "key1", "key2"),
- 10,
+ null,
new NamesQueryFilter(ByteBufferUtil.bytes("asdf")),
- null);
+ 10);
assertEquals(1, result.size());
assert result.get(0).key.key.equals(ByteBufferUtil.bytes("key2"));
}
@@ -195,7 +204,7 @@ public class ColumnFamilyStoreTest extends SchemaLoader
IDiskAtomFilter filter = new IdentityQueryFilter();
IPartitioner p = StorageService.getPartitioner();
Range<RowPosition> range = Util.range("", "");
- List<Row> rows = Table.open("Keyspace1").getColumnFamilyStore("Indexed1").search(clause, range, 100, filter);
+ List<Row> rows = Table.open("Keyspace1").getColumnFamilyStore("Indexed1").search(range, clause, filter, 100);
assert rows != null;
assert rows.size() == 2 : StringUtils.join(rows, ",");
@@ -212,14 +221,14 @@ public class ColumnFamilyStoreTest extends SchemaLoader
// add a second expression
IndexExpression expr2 = new IndexExpression(ByteBufferUtil.bytes("notbirthdate"), IndexOperator.GTE, ByteBufferUtil.bytes(2L));
clause = Arrays.asList(expr, expr2);
- rows = Table.open("Keyspace1").getColumnFamilyStore("Indexed1").search(clause, range, 100, filter);
+ rows = Table.open("Keyspace1").getColumnFamilyStore("Indexed1").search(range, clause, filter, 100);
assert rows.size() == 1 : StringUtils.join(rows, ",");
key = new String(rows.get(0).key.key.array(),rows.get(0).key.key.position(),rows.get(0).key.key.remaining());
assert "k3".equals( key );
// same query again, but with resultset not including the subordinate expression
- rows = Table.open("Keyspace1").getColumnFamilyStore("Indexed1").search(clause, range, 100, new NamesQueryFilter(ByteBufferUtil.bytes("birthdate")));
+ rows = Table.open("Keyspace1").getColumnFamilyStore("Indexed1").search(range, clause, new NamesQueryFilter(ByteBufferUtil.bytes("birthdate")), 100);
assert rows.size() == 1 : StringUtils.join(rows, ",");
key = new String(rows.get(0).key.key.array(),rows.get(0).key.key.position(),rows.get(0).key.key.remaining());
@@ -229,7 +238,7 @@ public class ColumnFamilyStoreTest extends SchemaLoader
// once more, this time with a slice rowset that needs to be expanded
SliceQueryFilter emptyFilter = new SliceQueryFilter(ByteBufferUtil.EMPTY_BYTE_BUFFER, ByteBufferUtil.EMPTY_BYTE_BUFFER, false, 0);
- rows = Table.open("Keyspace1").getColumnFamilyStore("Indexed1").search(clause, range, 100, emptyFilter);
+ rows = Table.open("Keyspace1").getColumnFamilyStore("Indexed1").search(range, clause, emptyFilter, 100);
assert rows.size() == 1 : StringUtils.join(rows, ",");
key = new String(rows.get(0).key.key.array(),rows.get(0).key.key.position(),rows.get(0).key.key.remaining());
@@ -241,7 +250,7 @@ public class ColumnFamilyStoreTest extends SchemaLoader
// doesn't tell the scan loop that it's done
IndexExpression expr3 = new IndexExpression(ByteBufferUtil.bytes("notbirthdate"), IndexOperator.EQ, ByteBufferUtil.bytes(-1L));
clause = Arrays.asList(expr, expr3);
- rows = Table.open("Keyspace1").getColumnFamilyStore("Indexed1").search(clause, range, 100, filter);
+ rows = Table.open("Keyspace1").getColumnFamilyStore("Indexed1").search(range, clause, filter, 100);
assert rows.isEmpty();
}
@@ -264,7 +273,7 @@ public class ColumnFamilyStoreTest extends SchemaLoader
IDiskAtomFilter filter = new IdentityQueryFilter();
IPartitioner p = StorageService.getPartitioner();
Range<RowPosition> range = Util.range("", "");
- List<Row> rows = Table.open("Keyspace1").getColumnFamilyStore("Indexed1").search(clause, range, 100, filter);
+ List<Row> rows = Table.open("Keyspace1").getColumnFamilyStore("Indexed1").search(range, clause, filter, 100);
assert rows != null;
assert rows.size() == 50 : rows.size();
@@ -290,7 +299,7 @@ public class ColumnFamilyStoreTest extends SchemaLoader
IDiskAtomFilter filter = new IdentityQueryFilter();
IPartitioner p = StorageService.getPartitioner();
Range<RowPosition> range = Util.range("", "");
- List<Row> rows = cfs.search(clause, range, 100, filter);
+ List<Row> rows = cfs.search(range, clause, filter, 100);
assert rows.size() == 1 : StringUtils.join(rows, ",");
String key = ByteBufferUtil.string(rows.get(0).key.key);
assert "k1".equals( key );
@@ -299,7 +308,7 @@ public class ColumnFamilyStoreTest extends SchemaLoader
rm = new RowMutation("Keyspace3", ByteBufferUtil.bytes("k1"));
rm.delete("Indexed1", ByteBufferUtil.bytes("birthdate"), 1);
rm.apply();
- rows = cfs.search(clause, range, 100, filter);
+ rows = cfs.search(range, clause, filter, 100);
assert rows.isEmpty();
// verify that it's not being indexed under the deletion column value either
@@ -307,14 +316,14 @@ public class ColumnFamilyStoreTest extends SchemaLoader
ByteBuffer deletionLong = ByteBufferUtil.bytes((long) ByteBufferUtil.toInt(deletion.value()));
IndexExpression expr0 = new IndexExpression(ByteBufferUtil.bytes("birthdate"), IndexOperator.EQ, deletionLong);
List<IndexExpression> clause0 = Arrays.asList(expr0);
- rows = cfs.search(clause0, range, 100, filter);
+ rows = cfs.search(range, clause0, filter, 100);
assert rows.isEmpty();
// resurrect w/ a newer timestamp
rm = new RowMutation("Keyspace3", ByteBufferUtil.bytes("k1"));
rm.add("Indexed1", ByteBufferUtil.bytes("birthdate"), ByteBufferUtil.bytes(1L), 2);
rm.apply();
- rows = cfs.search(clause, range, 100, filter);
+ rows = cfs.search(range, clause, filter, 100);
assert rows.size() == 1 : StringUtils.join(rows, ",");
key = ByteBufferUtil.string(rows.get(0).key.key);
assert "k1".equals( key );
@@ -323,7 +332,7 @@ public class ColumnFamilyStoreTest extends SchemaLoader
rm = new RowMutation("Keyspace3", ByteBufferUtil.bytes("k1"));
rm.delete("Indexed1", 1);
rm.apply();
- rows = cfs.search(clause, range, 100, filter);
+ rows = cfs.search(range, clause, filter, 100);
assert rows.size() == 1 : StringUtils.join(rows, ",");
key = ByteBufferUtil.string(rows.get(0).key.key);
assert "k1".equals( key );
@@ -332,7 +341,7 @@ public class ColumnFamilyStoreTest extends SchemaLoader
rm = new RowMutation("Keyspace3", ByteBufferUtil.bytes("k1"));
rm.delete("Indexed1", ByteBufferUtil.bytes("birthdate"), 1);
rm.apply();
- rows = cfs.search(clause, range, 100, filter);
+ rows = cfs.search(range, clause, filter, 100);
assert rows.size() == 1 : StringUtils.join(rows, ",");
key = ByteBufferUtil.string(rows.get(0).key.key);
assert "k1".equals( key );
@@ -341,14 +350,14 @@ public class ColumnFamilyStoreTest extends SchemaLoader
rm = new RowMutation("Keyspace3", ByteBufferUtil.bytes("k1"));
rm.delete("Indexed1", 3);
rm.apply();
- rows = cfs.search(clause, range, 100, filter);
+ rows = cfs.search(range, clause, filter, 100);
assert rows.isEmpty() : StringUtils.join(rows, ",");
// make sure obsolete mutations don't generate an index entry
rm = new RowMutation("Keyspace3", ByteBufferUtil.bytes("k1"));
rm.add("Indexed1", ByteBufferUtil.bytes("birthdate"), ByteBufferUtil.bytes(1L), 3);
rm.apply();
- rows = cfs.search(clause, range, 100, filter);
+ rows = cfs.search(range, clause, filter, 100);
assert rows.isEmpty() : StringUtils.join(rows, ",");
// try insert followed by row delete in the same mutation
@@ -356,7 +365,7 @@ public class ColumnFamilyStoreTest extends SchemaLoader
rm.add("Indexed1", ByteBufferUtil.bytes("birthdate"), ByteBufferUtil.bytes(1L), 1);
rm.delete("Indexed1", 2);
rm.apply();
- rows = cfs.search(clause, range, 100, filter);
+ rows = cfs.search(range, clause, filter, 100);
assert rows.isEmpty() : StringUtils.join(rows, ",");
// try row delete followed by insert in the same mutation
@@ -364,7 +373,7 @@ public class ColumnFamilyStoreTest extends SchemaLoader
rm.delete("Indexed1", 3);
rm.add("Indexed1", ByteBufferUtil.bytes("birthdate"), ByteBufferUtil.bytes(1L), 4);
rm.apply();
- rows = cfs.search(clause, range, 100, filter);
+ rows = cfs.search(range, clause, filter, 100);
assert rows.size() == 1 : StringUtils.join(rows, ",");
key = ByteBufferUtil.string(rows.get(0).key.key);
assert "k1".equals( key );
@@ -389,12 +398,12 @@ public class ColumnFamilyStoreTest extends SchemaLoader
IDiskAtomFilter filter = new IdentityQueryFilter();
IPartitioner p = StorageService.getPartitioner();
Range<RowPosition> range = Util.range("", "");
- List<Row> rows = table.getColumnFamilyStore("Indexed1").search(clause, range, 100, filter);
+ List<Row> rows = table.getColumnFamilyStore("Indexed1").search(range, clause, filter, 100);
assert rows.size() == 0;
expr = new IndexExpression(ByteBufferUtil.bytes("birthdate"), IndexOperator.EQ, ByteBufferUtil.bytes(2L));
clause = Arrays.asList(expr);
- rows = table.getColumnFamilyStore("Indexed1").search(clause, range, 100, filter);
+ rows = table.getColumnFamilyStore("Indexed1").search(range, clause, filter, 100);
String key = ByteBufferUtil.string(rows.get(0).key.key);
assert "k1".equals( key );
@@ -403,7 +412,7 @@ public class ColumnFamilyStoreTest extends SchemaLoader
rm.add("Indexed1", ByteBufferUtil.bytes("birthdate"), ByteBufferUtil.bytes(3L), 0);
rm.apply();
- rows = table.getColumnFamilyStore("Indexed1").search(clause, range, 100, filter);
+ rows = table.getColumnFamilyStore("Indexed1").search(range, clause, filter, 100);
key = ByteBufferUtil.string(rows.get(0).key.key);
assert "k1".equals( key );
@@ -433,7 +442,7 @@ public class ColumnFamilyStoreTest extends SchemaLoader
List<IndexExpression> clause = Arrays.asList(expr);
IDiskAtomFilter filter = new IdentityQueryFilter();
Range<RowPosition> range = Util.range("", "");
- List<Row> rows = table.getColumnFamilyStore(cfName).search(clause, range, 100, filter);
+ List<Row> rows = table.getColumnFamilyStore(cfName).search(range, clause, filter, 100);
assertEquals(1, rows.size());
// force a flush, so our index isn't being read from a memtable
@@ -448,14 +457,14 @@ public class ColumnFamilyStoreTest extends SchemaLoader
// because the new value was not indexed and the old value should be ignored
// (and in fact purged from the index cf).
// first check for the old value
- rows = table.getColumnFamilyStore(cfName).search(clause, range, 100, filter);
+ rows = table.getColumnFamilyStore(cfName).search(range, clause, filter, 100);
assertEquals(0, rows.size());
// now check for the updated value
expr = new IndexExpression(colName, IndexOperator.EQ, val2);
clause = Arrays.asList(expr);
filter = new IdentityQueryFilter();
range = Util.range("", "");
- rows = table.getColumnFamilyStore(cfName).search(clause, range, 100, filter);
+ rows = table.getColumnFamilyStore(cfName).search(range, clause, filter, 100);
assertEquals(0, rows.size());
// now, reset back to the original value, still skipping the index update, to
@@ -468,7 +477,7 @@ public class ColumnFamilyStoreTest extends SchemaLoader
clause = Arrays.asList(expr);
filter = new IdentityQueryFilter();
range = Util.range("", "");
- rows = table.getColumnFamilyStore(cfName).search(clause, range, 100, filter);
+ rows = table.getColumnFamilyStore(cfName).search(range, clause, filter, 100);
assertEquals(0, rows.size());
}
@@ -505,12 +514,12 @@ public class ColumnFamilyStoreTest extends SchemaLoader
List<IndexExpression> clause = Arrays.asList(expr);
IDiskAtomFilter filter = new IdentityQueryFilter();
Range<RowPosition> range = Util.range("", "");
- List<Row> rows = table.getColumnFamilyStore(cfName).search(clause, range, 100, filter);
+ List<Row> rows = table.getColumnFamilyStore(cfName).search(range, clause, filter, 100);
assertEquals(1, rows.size());
// force a flush and retry the query, so our index isn't being read from a memtable
table.getColumnFamilyStore(cfName).forceBlockingFlush();
- rows = table.getColumnFamilyStore(cfName).search(clause, range, 100, filter);
+ rows = table.getColumnFamilyStore(cfName).search(range, clause, filter, 100);
assertEquals(1, rows.size());
// now apply another update, but force the index update to be skipped
@@ -522,14 +531,14 @@ public class ColumnFamilyStoreTest extends SchemaLoader
// because the new value was not indexed and the old value should be ignored
// (and in fact purged from the index cf).
// first check for the old value
- rows = table.getColumnFamilyStore(cfName).search(clause, range, 100, filter);
+ rows = table.getColumnFamilyStore(cfName).search(range, clause, filter, 100);
assertEquals(0, rows.size());
// now check for the updated value
expr = new IndexExpression(colName, IndexOperator.EQ, val2);
clause = Arrays.asList(expr);
filter = new IdentityQueryFilter();
range = Util.range("", "");
- rows = table.getColumnFamilyStore(cfName).search(clause, range, 100, filter);
+ rows = table.getColumnFamilyStore(cfName).search(range, clause, filter, 100);
assertEquals(0, rows.size());
// now, reset back to the original value, still skipping the index update, to
@@ -542,7 +551,7 @@ public class ColumnFamilyStoreTest extends SchemaLoader
clause = Arrays.asList(expr);
filter = new IdentityQueryFilter();
range = Util.range("", "");
- rows = table.getColumnFamilyStore(cfName).search(clause, range, 100, filter);
+ rows = table.getColumnFamilyStore(cfName).search(range, clause, filter, 100);
assertEquals(0, rows.size());
}
@@ -579,7 +588,7 @@ public class ColumnFamilyStoreTest extends SchemaLoader
IDiskAtomFilter filter = new IdentityQueryFilter();
IPartitioner p = StorageService.getPartitioner();
Range<RowPosition> range = Util.range("", "");
- List<Row> rows = Table.open("Keyspace1").getColumnFamilyStore("Indexed1").search(clause, range, 1, filter);
+ List<Row> rows = Table.open("Keyspace1").getColumnFamilyStore("Indexed1").search(range, clause, filter, 1);
assert rows != null;
assert rows.size() == 1 : StringUtils.join(rows, ",");
@@ -623,7 +632,7 @@ public class ColumnFamilyStoreTest extends SchemaLoader
List<IndexExpression> clause = Arrays.asList(expr);
IDiskAtomFilter filter = new IdentityQueryFilter();
IPartitioner p = StorageService.getPartitioner();
- List<Row> rows = table.getColumnFamilyStore("Indexed2").search(clause, Util.range("", ""), 100, filter);
+ List<Row> rows = table.getColumnFamilyStore("Indexed2").search(Util.range("", ""), clause, filter, 100);
assert rows.size() == 1 : StringUtils.join(rows, ",");
assertEquals("k1", ByteBufferUtil.string(rows.get(0).key.key));
}
@@ -635,9 +644,9 @@ public class ColumnFamilyStoreTest extends SchemaLoader
IPartitioner p = StorageService.getPartitioner();
List<Row> result = cfs.getRangeSlice(Util.bounds("key1", "key2"),
- 10,
+ null,
new NamesQueryFilter(ByteBufferUtil.bytes("asdf")),
- null);
+ 10);
assertEquals(2, result.size());
assert result.get(0).key.key.equals(ByteBufferUtil.bytes("key1"));
}
@@ -672,7 +681,7 @@ public class ColumnFamilyStoreTest extends SchemaLoader
sp.getSlice_range().setStart(ArrayUtils.EMPTY_BYTE_ARRAY);
sp.getSlice_range().setFinish(ArrayUtils.EMPTY_BYTE_ARRAY);
- assertRowAndColCount(1, 6, scfName, false, cfs.getRangeSlice(Util.range("f", "g"), 100, ThriftValidation.asIFilter(sp, cfs.metadata, scfName), null));
+ assertRowAndColCount(1, 6, scfName, false, cfs.getRangeSlice(Util.range("f", "g"), null, ThriftValidation.asIFilter(sp, cfs.metadata, scfName), 100));
// delete
RowMutation rm = new RowMutation(table.getName(), key.key);
@@ -680,13 +689,13 @@ public class ColumnFamilyStoreTest extends SchemaLoader
rm.apply();
// verify delete.
- assertRowAndColCount(1, 0, scfName, false, cfs.getRangeSlice(Util.range("f", "g"), 100, ThriftValidation.asIFilter(sp, cfs.metadata, scfName), null));
+ assertRowAndColCount(1, 0, scfName, false, cfs.getRangeSlice(Util.range("f", "g"), null, ThriftValidation.asIFilter(sp, cfs.metadata, scfName), 100));
// flush
cfs.forceBlockingFlush();
// re-verify delete.
- assertRowAndColCount(1, 0, scfName, false, cfs.getRangeSlice(Util.range("f", "g"), 100, ThriftValidation.asIFilter(sp, cfs.metadata, scfName), null));
+ assertRowAndColCount(1, 0, scfName, false, cfs.getRangeSlice(Util.range("f", "g"), null, ThriftValidation.asIFilter(sp, cfs.metadata, scfName), 100));
// late insert.
putColsSuper(cfs, key, scfName,
@@ -694,14 +703,14 @@ public class ColumnFamilyStoreTest extends SchemaLoader
new Column(getBytes(7L), ByteBufferUtil.bytes("val7"), 1L));
// re-verify delete.
- assertRowAndColCount(1, 0, scfName, false, cfs.getRangeSlice(Util.range("f", "g"), 100, ThriftValidation.asIFilter(sp, cfs.metadata, scfName), null));
+ assertRowAndColCount(1, 0, scfName, false, cfs.getRangeSlice(Util.range("f", "g"), null, ThriftValidation.asIFilter(sp, cfs.metadata, scfName), 100));
// make sure new writes are recognized.
putColsSuper(cfs, key, scfName,
new Column(getBytes(3L), ByteBufferUtil.bytes("val3"), 3),
new Column(getBytes(8L), ByteBufferUtil.bytes("val8"), 3),
new Column(getBytes(9L), ByteBufferUtil.bytes("val9"), 3));
- assertRowAndColCount(1, 3, scfName, false, cfs.getRangeSlice(Util.range("f", "g"), 100, ThriftValidation.asIFilter(sp, cfs.metadata, scfName), null));
+ assertRowAndColCount(1, 3, scfName, false, cfs.getRangeSlice(Util.range("f", "g"), null, ThriftValidation.asIFilter(sp, cfs.metadata, scfName), 100));
}
private static void assertRowAndColCount(int rowCount, int colCount, ByteBuffer sc, boolean isDeleted, Collection<Row> rows) throws CharacterCodingException
@@ -760,14 +769,14 @@ public class ColumnFamilyStoreTest extends SchemaLoader
// insert
putColsStandard(cfs, key, column("col1", "val1", 1), column("col2", "val2", 1));
- assertRowAndColCount(1, 2, null, false, cfs.getRangeSlice(Util.range("f", "g"), 100, ThriftValidation.asIFilter(sp, cfs.metadata, null), null));
+ assertRowAndColCount(1, 2, null, false, cfs.getRangeSlice(Util.range("f", "g"), null, ThriftValidation.asIFilter(sp, cfs.metadata, null), 100));
// flush.
cfs.forceBlockingFlush();
// insert, don't flush
putColsStandard(cfs, key, column("col3", "val3", 1), column("col4", "val4", 1));
- assertRowAndColCount(1, 4, null, false, cfs.getRangeSlice(Util.range("f", "g"), 100, ThriftValidation.asIFilter(sp, cfs.metadata, null), null));
+ assertRowAndColCount(1, 4, null, false, cfs.getRangeSlice(Util.range("f", "g"), null, ThriftValidation.asIFilter(sp, cfs.metadata, null), 100));
// delete (from sstable and memtable)
RowMutation rm = new RowMutation(table.getName(), key.key);
@@ -775,27 +784,27 @@ public class ColumnFamilyStoreTest extends SchemaLoader
rm.apply();
// verify delete
- assertRowAndColCount(1, 0, null, true, cfs.getRangeSlice(Util.range("f", "g"), 100, ThriftValidation.asIFilter(sp, cfs.metadata, null), null));
+ assertRowAndColCount(1, 0, null, true, cfs.getRangeSlice(Util.range("f", "g"), null, ThriftValidation.asIFilter(sp, cfs.metadata, null), 100));
// flush
cfs.forceBlockingFlush();
// re-verify delete. // first breakage is right here because of CASSANDRA-1837.
- assertRowAndColCount(1, 0, null, true, cfs.getRangeSlice(Util.range("f", "g"), 100, ThriftValidation.asIFilter(sp, cfs.metadata, null), null));
+ assertRowAndColCount(1, 0, null, true, cfs.getRangeSlice(Util.range("f", "g"), null, ThriftValidation.asIFilter(sp, cfs.metadata, null), 100));
// simulate a 'late' insertion that gets put in after the deletion. should get inserted, but fail on read.
putColsStandard(cfs, key, column("col5", "val5", 1), column("col2", "val2", 1));
// should still be nothing there because we deleted this row. 2nd breakage, but was undetected because of 1837.
- assertRowAndColCount(1, 0, null, true, cfs.getRangeSlice(Util.range("f", "g"), 100, ThriftValidation.asIFilter(sp, cfs.metadata, null), null));
+ assertRowAndColCount(1, 0, null, true, cfs.getRangeSlice(Util.range("f", "g"), null, ThriftValidation.asIFilter(sp, cfs.metadata, null), 100));
// make sure that new writes are recognized.
putColsStandard(cfs, key, column("col6", "val6", 3), column("col7", "val7", 3));
- assertRowAndColCount(1, 2, null, true, cfs.getRangeSlice(Util.range("f", "g"), 100, ThriftValidation.asIFilter(sp, cfs.metadata, null), null));
+ assertRowAndColCount(1, 2, null, true, cfs.getRangeSlice(Util.range("f", "g"), null, ThriftValidation.asIFilter(sp, cfs.metadata, null), 100));
// and it remains so after flush. (this wasn't failing before, but it's good to check.)
cfs.forceBlockingFlush();
- assertRowAndColCount(1, 2, null, true, cfs.getRangeSlice(Util.range("f", "g"), 100, ThriftValidation.asIFilter(sp, cfs.metadata, null), null));
+ assertRowAndColCount(1, 2, null, true, cfs.getRangeSlice(Util.range("f", "g"), null, ThriftValidation.asIFilter(sp, cfs.metadata, null), 100));
}
@@ -844,7 +853,7 @@ public class ColumnFamilyStoreTest extends SchemaLoader
new Column(ByteBufferUtil.bytes("b"), ByteBufferUtil.bytes("B"), 1));
// Get the entire supercolumn like normal
- ColumnFamily cfGet = cfs.getColumnFamily(QueryFilter.getIdentityFilter(key, cfName));
+ ColumnFamily cfGet = cfs.getColumnFamily(QueryFilter.getIdentityFilter(key, cfName, System.currentTimeMillis()));
assertEquals(ByteBufferUtil.bytes("A"), cfGet.getColumn(CompositeType.build(superColName, ByteBufferUtil.bytes("a"))).value());
assertEquals(ByteBufferUtil.bytes("B"), cfGet.getColumn(CompositeType.build(superColName, ByteBufferUtil.bytes("b"))).value());
@@ -852,7 +861,7 @@ public class ColumnFamilyStoreTest extends SchemaLoader
SortedSet<ByteBuffer> sliceColNames = new TreeSet<ByteBuffer>(cfs.metadata.comparator);
sliceColNames.add(CompositeType.build(superColName, ByteBufferUtil.bytes("a")));
sliceColNames.add(CompositeType.build(superColName, ByteBufferUtil.bytes("b")));
- SliceByNamesReadCommand cmd = new SliceByNamesReadCommand(tableName, key.key, cfName, new NamesQueryFilter(sliceColNames));
+ SliceByNamesReadCommand cmd = new SliceByNamesReadCommand(tableName, key.key, cfName, System.currentTimeMillis(), new NamesQueryFilter(sliceColNames));
ColumnFamily cfSliced = cmd.getRow(table).cf;
// Make sure the slice returns the same as the straight get
@@ -888,7 +897,7 @@ public class ColumnFamilyStoreTest extends SchemaLoader
putColsStandard(cfs, key, new Column(cname, ByteBufferUtil.bytes("b"), 1));
// Test fetching the column by name returns the first column
- SliceByNamesReadCommand cmd = new SliceByNamesReadCommand(tableName, key.key, cfName, new NamesQueryFilter(cname));
+ SliceByNamesReadCommand cmd = new SliceByNamesReadCommand(tableName, key.key, cfName, System.currentTimeMillis(), new NamesQueryFilter(cname));
ColumnFamily cf = cmd.getRow(table).cf;
Column column = (Column) cf.getColumn(cname);
assert column.value().equals(ByteBufferUtil.bytes("a")) : "expecting a, got " + ByteBufferUtil.string(column.value());
@@ -899,7 +908,7 @@ public class ColumnFamilyStoreTest extends SchemaLoader
int columns = 0;
for (Row row : rows)
{
- columns += row.getLiveCount(new SliceQueryFilter(ColumnSlice.ALL_COLUMNS_ARRAY, false, expectedCount));
+ columns += row.getLiveCount(new SliceQueryFilter(ColumnSlice.ALL_COLUMNS_ARRAY, false, expectedCount), System.currentTimeMillis());
}
assert columns == expectedCount : "Expected " + expectedCount + " live columns but got " + columns + ": " + rows;
}
@@ -929,11 +938,46 @@ public class ColumnFamilyStoreTest extends SchemaLoader
sp.getSlice_range().setStart(ArrayUtils.EMPTY_BYTE_ARRAY);
sp.getSlice_range().setFinish(ArrayUtils.EMPTY_BYTE_ARRAY);
- assertTotalColCount(cfs.getRangeSlice(Util.range("", ""), 3, ThriftValidation.asIFilter(sp, cfs.metadata, null), null, true, false), 3);
- assertTotalColCount(cfs.getRangeSlice(Util.range("", ""), 5, ThriftValidation.asIFilter(sp, cfs.metadata, null), null, true, false), 5);
- assertTotalColCount(cfs.getRangeSlice(Util.range("", ""), 8, ThriftValidation.asIFilter(sp, cfs.metadata, null), null, true, false), 8);
- assertTotalColCount(cfs.getRangeSlice(Util.range("", ""), 10, ThriftValidation.asIFilter(sp, cfs.metadata, null), null, true, false), 10);
- assertTotalColCount(cfs.getRangeSlice(Util.range("", ""), 100, ThriftValidation.asIFilter(sp, cfs.metadata, null), null, true, false), 11);
+ assertTotalColCount(cfs.getRangeSlice(Util.range("", ""),
+ null,
+ ThriftValidation.asIFilter(sp, cfs.metadata, null),
+ 3,
+ System.currentTimeMillis(),
+ true,
+ false),
+ 3);
+ assertTotalColCount(cfs.getRangeSlice(Util.range("", ""),
+ null,
+ ThriftValidation.asIFilter(sp, cfs.metadata, null),
+ 5,
+ System.currentTimeMillis(),
+ true,
+ false),
+ 5);
+ assertTotalColCount(cfs.getRangeSlice(Util.range("", ""),
+ null,
+ ThriftValidation.asIFilter(sp, cfs.metadata, null),
+ 8,
+ System.currentTimeMillis(),
+ true,
+ false),
+ 8);
+ assertTotalColCount(cfs.getRangeSlice(Util.range("", ""),
+ null,
+ ThriftValidation.asIFilter(sp, cfs.metadata, null),
+ 10,
+ System.currentTimeMillis(),
+ true,
+ false),
+ 10);
+ assertTotalColCount(cfs.getRangeSlice(Util.range("", ""),
+ null,
+ ThriftValidation.asIFilter(sp, cfs.metadata, null),
+ 100,
+ System.currentTimeMillis(),
+ true,
+ false),
+ 11);
// Check that when querying by name, we always include all names for a
// given row even if it means returning more columns than requested (this is necessary for CQL)
@@ -944,11 +988,46 @@ public class ColumnFamilyStoreTest extends SchemaLoader
ByteBufferUtil.bytes("c2")
));
- assertTotalColCount(cfs.getRangeSlice(Util.range("", ""), 1, ThriftValidation.asIFilter(sp, cfs.metadata, null), null, true, false), 3);
- assertTotalColCount(cfs.getRangeSlice(Util.range("", ""), 4, ThriftValidation.asIFilter(sp, cfs.metadata, null), null, true, false), 5);
- assertTotalColCount(cfs.getRangeSlice(Util.range("", ""), 5, ThriftValidation.asIFilter(sp, cfs.metadata, null), null, true, false), 5);
- assertTotalColCount(cfs.getRangeSlice(Util.range("", ""), 6, ThriftValidation.asIFilter(sp, cfs.metadata, null), null, true, false), 8);
- assertTotalColCount(cfs.getRangeSlice(Util.range("", ""), 100, ThriftValidation.asIFilter(sp, cfs.metadata, null), null, true, false), 8);
+ assertTotalColCount(cfs.getRangeSlice(Util.range("", ""),
+ null,
+ ThriftValidation.asIFilter(sp, cfs.metadata, null),
+ 1,
+ System.currentTimeMillis(),
+ true,
+ false),
+ 3);
+ assertTotalColCount(cfs.getRangeSlice(Util.range("", ""),
+ null,
+ ThriftValidation.asIFilter(sp, cfs.metadata, null),
+ 4,
+ System.currentTimeMillis(),
+ true,
+ false),
+ 5);
+ assertTotalColCount(cfs.getRangeSlice(Util.range("", ""),
+ null,
+ ThriftValidation.asIFilter(sp, cfs.metadata, null),
+ 5,
+ System.currentTimeMillis(),
+ true,
+ false),
+ 5);
+ assertTotalColCount(cfs.getRangeSlice(Util.range("", ""),
+ null,
+ ThriftValidation.asIFilter(sp, cfs.metadata, null),
+ 6,
+ System.currentTimeMillis(),
+ true,
+ false),
+ 8);
+ assertTotalColCount(cfs.getRangeSlice(Util.range("", ""),
+ null,
+ ThriftValidation.asIFilter(sp, cfs.metadata, null),
+ 100,
+ System.currentTimeMillis(),
+ true,
+ false),
+ 8);
}
@Test
@@ -975,13 +1054,25 @@ public class ColumnFamilyStoreTest extends SchemaLoader
sp.getSlice_range().setStart(ArrayUtils.EMPTY_BYTE_ARRAY);
sp.getSlice_range().setFinish(ArrayUtils.EMPTY_BYTE_ARRAY);
- Collection<Row> rows = cfs.getRangeSlice(Util.range("", ""), 3, ThriftValidation.asIFilter(sp, cfs.metadata, null), null, true, true);
+ Collection<Row> rows = cfs.getRangeSlice(Util.range("", ""),
+ null,
+ ThriftValidation.asIFilter(sp, cfs.metadata, null),
+ 3,
+ System.currentTimeMillis(),
+ true,
+ true);
assert rows.size() == 1 : "Expected 1 row, got " + rows;
Row row = rows.iterator().next();
assertColumnNames(row, "c0", "c1", "c2");
sp.getSlice_range().setStart(ByteBufferUtil.getArray(ByteBufferUtil.bytes("c2")));
- rows = cfs.getRangeSlice(Util.range("", ""), 3, ThriftValidation.asIFilter(sp, cfs.metadata, null), null, true, true);
+ rows = cfs.getRangeSlice(Util.range("", ""),
+ null,
+ ThriftValidation.asIFilter(sp, cfs.metadata, null),
+ 3,
+ System.currentTimeMillis(),
+ true,
+ true);
assert rows.size() == 2 : "Expected 2 rows, got " + rows;
Iterator<Row> iter = rows.iterator();
Row row1 = iter.next();
@@ -990,13 +1081,25 @@ public class ColumnFamilyStoreTest extends SchemaLoader
assertColumnNames(row2, "c0");
sp.getSlice_range().setStart(ByteBufferUtil.getArray(ByteBufferUtil.bytes("c0")));
- rows = cfs.getRangeSlice(new Bounds<RowPosition>(row2.key, Util.rp("")), 3, ThriftValidation.asIFilter(sp, cfs.metadata, null), null, true, true);
+ rows = cfs.getRangeSlice(new Bounds<RowPosition>(row2.key, Util.rp("")),
+ null,
+ ThriftValidation.asIFilter(sp, cfs.metadata, null),
+ 3,
+ System.currentTimeMillis(),
+ true,
+ true);
assert rows.size() == 1 : "Expected 1 row, got " + rows;
row = rows.iterator().next();
assertColumnNames(row, "c0", "c1", "c2");
sp.getSlice_range().setStart(ByteBufferUtil.getArray(ByteBufferUtil.bytes("c2")));
- rows = cfs.getRangeSlice(new Bounds<RowPosition>(row.key, Util.rp("")), 3, ThriftValidation.asIFilter(sp, cfs.metadata, null), null, true, true);
+ rows = cfs.getRangeSlice(new Bounds<RowPosition>(row.key, Util.rp("")),
+ null,
+ ThriftValidation.asIFilter(sp, cfs.metadata, null),
+ 3,
+ System.currentTimeMillis(),
+ true,
+ true);
assert rows.size() == 2 : "Expected 2 rows, got " + rows;
iter = rows.iterator();
row1 = iter.next();
@@ -1057,25 +1160,25 @@ public class ColumnFamilyStoreTest extends SchemaLoader
List<Row> rows;
// Start and end inclusive
- rows = cfs.getRangeSlice(new Bounds<RowPosition>(rp("2"), rp("7")), 100, qf, null);
+ rows = cfs.getRangeSlice(new Bounds<RowPosition>(rp("2"), rp("7")), null, qf, 100);
assert rows.size() == 6;
assert rows.get(0).key.equals(idk(2));
assert rows.get(rows.size() - 1).key.equals(idk(7));
// Start and end excluded
- rows = cfs.getRangeSlice(new ExcludingBounds<RowPosition>(rp("2"), rp("7")), 100, qf, null);
+ rows = cfs.getRangeSlice(new ExcludingBounds<RowPosition>(rp("2"), rp("7")), null, qf, 100);
assert rows.size() == 4;
assert rows.get(0).key.equals(idk(3));
assert rows.get(rows.size() - 1).key.equals(idk(6));
// Start excluded, end included
- rows = cfs.getRangeSlice(new Range<RowPosition>(rp("2"), rp("7")), 100, qf, null);
+ rows = cfs.getRangeSlice(new Range<RowPosition>(rp("2"), rp("7")), null, qf, 100);
assert rows.size() == 5;
assert rows.get(0).key.equals(idk(3));
assert rows.get(rows.size() - 1).key.equals(idk(7));
// Start included, end excluded
- rows = cfs.getRangeSlice(new IncludingExcludingBounds<RowPosition>(rp("2"), rp("7")), 100, qf, null);
+ rows = cfs.getRangeSlice(new IncludingExcludingBounds<RowPosition>(rp("2"), rp("7")), null, qf, 100);
assert rows.size() == 5;
assert rows.get(0).key.equals(idk(2));
assert rows.get(rows.size() - 1).key.equals(idk(6));
@@ -1102,7 +1205,7 @@ public class ColumnFamilyStoreTest extends SchemaLoader
IndexExpression expr = new IndexExpression(ByteBufferUtil.bytes("birthdate"), IndexOperator.EQ, LongType.instance.decompose(1L));
// explicitly tell the KeysSearcher to use column limiting for rowsPerQuery to trigger bogus columnsRead--; (CASSANDRA-3996)
- List<Row> rows = store.search(Arrays.asList(expr), Util.range("", ""), 10, new IdentityQueryFilter(), true);
+ List<Row> rows = store.search(Util.range("", ""), Arrays.asList(expr), new IdentityQueryFilter(), 10, System.currentTimeMillis(), true);
assert rows.size() == 10;
}
@@ -1485,8 +1588,12 @@ public class ColumnFamilyStoreTest extends SchemaLoader
String... colNames)
{
List<Row> rows = cfs.getRangeSlice(new Bounds<RowPosition>(rp(rowKey), rp(rowKey)),
- Integer.MAX_VALUE,
- filter, null, false, false);
+ null,
+ filter,
+ Integer.MAX_VALUE,
+ System.currentTimeMillis(),
+ false,
+ false);
assertSame("unexpected number of rows ", 1, rows.size());
Row row = rows.get(0);
Collection<Column> cols = !filter.isReversed() ? row.cf.getSortedColumns() : row.cf.getReverseSortedColumns();
@@ -1513,7 +1620,7 @@ public class ColumnFamilyStoreTest extends SchemaLoader
{
DecoratedKey ROW = Util.dk(rowKey);
System.err.println("Original:");
- ColumnFamily cf = cfs.getColumnFamily(QueryFilter.getIdentityFilter(ROW, "Standard1"));
+ ColumnFamily cf = cfs.getColumnFamily(QueryFilter.getIdentityFilter(ROW, "Standard1", System.currentTimeMillis()));
System.err.println("Row key: " + rowKey + " Cols: "
+ Iterables.transform(cf.getSortedColumns(), new Function<Column, String>()
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/test/unit/org/apache/cassandra/db/ColumnFamilyTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/ColumnFamilyTest.java b/test/unit/org/apache/cassandra/db/ColumnFamilyTest.java
index 95caac6..a01c25c 100644
--- a/test/unit/org/apache/cassandra/db/ColumnFamilyTest.java
+++ b/test/unit/org/apache/cassandra/db/ColumnFamilyTest.java
@@ -142,9 +142,9 @@ public class ColumnFamilyTest extends SchemaLoader
// check that tombstone wins timestamp ties
cf_result.addTombstone(ByteBufferUtil.bytes("col1"), 0, 3);
- assert cf_result.getColumn(ByteBufferUtil.bytes("col1")).isMarkedForDelete();
+ assert cf_result.getColumn(ByteBufferUtil.bytes("col1")).isMarkedForDelete(System.currentTimeMillis());
cf_result.addColumn(ByteBufferUtil.bytes("col1"), val2, 3);
- assert cf_result.getColumn(ByteBufferUtil.bytes("col1")).isMarkedForDelete();
+ assert cf_result.getColumn(ByteBufferUtil.bytes("col1")).isMarkedForDelete(System.currentTimeMillis());
// check that column value wins timestamp ties in absence of tombstone
cf_result.addColumn(ByteBufferUtil.bytes("col3"), val, 2);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/test/unit/org/apache/cassandra/db/KeyCacheTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/KeyCacheTest.java b/test/unit/org/apache/cassandra/db/KeyCacheTest.java
index 7f57aae..922e255 100644
--- a/test/unit/org/apache/cassandra/db/KeyCacheTest.java
+++ b/test/unit/org/apache/cassandra/db/KeyCacheTest.java
@@ -119,14 +119,16 @@ public class KeyCacheTest extends SchemaLoader
ByteBufferUtil.EMPTY_BYTE_BUFFER,
ByteBufferUtil.EMPTY_BYTE_BUFFER,
false,
- 10));
+ 10,
+ System.currentTimeMillis()));
cfs.getColumnFamily(QueryFilter.getSliceFilter(key2,
COLUMN_FAMILY1,
ByteBufferUtil.EMPTY_BYTE_BUFFER,
ByteBufferUtil.EMPTY_BYTE_BUFFER,
false,
- 10));
+ 10,
+ System.currentTimeMillis()));
assertEquals(2, CacheService.instance.keyCache.size());
@@ -141,14 +143,16 @@ public class KeyCacheTest extends SchemaLoader
ByteBufferUtil.EMPTY_BYTE_BUFFER,
ByteBufferUtil.EMPTY_BYTE_BUFFER,
false,
- 10));
+ 10,
+ System.currentTimeMillis()));
cfs.getColumnFamily(QueryFilter.getSliceFilter(key2,
COLUMN_FAMILY1,
ByteBufferUtil.EMPTY_BYTE_BUFFER,
ByteBufferUtil.EMPTY_BYTE_BUFFER,
false,
- 10));
+ 10,
+ System.currentTimeMillis()));
assert CacheService.instance.keyCache.size() == 4;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/test/unit/org/apache/cassandra/db/KeyCollisionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/KeyCollisionTest.java b/test/unit/org/apache/cassandra/db/KeyCollisionTest.java
index 48838b9..a6fbecf 100644
--- a/test/unit/org/apache/cassandra/db/KeyCollisionTest.java
+++ b/test/unit/org/apache/cassandra/db/KeyCollisionTest.java
@@ -71,7 +71,7 @@ public class KeyCollisionTest extends SchemaLoader
insert("key1", "key2", "key3"); // token = 4
insert("longKey1", "longKey2"); // token = 8
- List<Row> rows = cfs.getRangeSlice(new Bounds<RowPosition>(dk("k2"), dk("key2")), 10000, new IdentityQueryFilter(), null);
+ List<Row> rows = cfs.getRangeSlice(new Bounds<RowPosition>(dk("k2"), dk("key2")), null, new IdentityQueryFilter(), 10000);
assert rows.size() == 4 : "Expecting 4 keys, got " + rows.size();
assert rows.get(0).key.key.equals(ByteBufferUtil.bytes("k2"));
assert rows.get(1).key.key.equals(ByteBufferUtil.bytes("k3"));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f7628ce/test/unit/org/apache/cassandra/db/RangeTombstoneTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/RangeTombstoneTest.java b/test/unit/org/apache/cassandra/db/RangeTombstoneTest.java
index e5f0acb..0ed0c4a 100644
--- a/test/unit/org/apache/cassandra/db/RangeTombstoneTest.java
+++ b/test/unit/org/apache/cassandra/db/RangeTombstoneTest.java
@@ -78,7 +78,7 @@ public class RangeTombstoneTest extends SchemaLoader
columns.add(b(i));
for (int i : dead)
columns.add(b(i));
- cf = cfs.getColumnFamily(QueryFilter.getNamesFilter(dk(key), CFNAME, columns));
+ cf = cfs.getColumnFamily(QueryFilter.getNamesFilter(dk(key), CFNAME, columns, System.currentTimeMillis()));
for (int i : live)
assert isLive(cf, cf.getColumn(b(i))) : "Column " + i + " should be live";
@@ -86,7 +86,7 @@ public class RangeTombstoneTest extends SchemaLoader
assert !isLive(cf, cf.getColumn(b(i))) : "Column " + i + " shouldn't be live";
// Queries by slices
- cf = cfs.getColumnFamily(QueryFilter.getSliceFilter(dk(key), CFNAME, b(7), b(30), false, Integer.MAX_VALUE));
+ cf = cfs.getColumnFamily(QueryFilter.getSliceFilter(dk(key), CFNAME, b(7), b(30), false, Integer.MAX_VALUE, System.currentTimeMillis()));
for (int i : new int[]{ 7, 8, 9, 11, 13, 15, 17, 28, 29, 30 })
assert isLive(cf, cf.getColumn(b(i))) : "Column " + i + " should be live";
@@ -130,7 +130,7 @@ public class RangeTombstoneTest extends SchemaLoader
rm.apply();
cfs.forceBlockingFlush();
- cf = cfs.getColumnFamily(QueryFilter.getIdentityFilter(dk(key), CFNAME));
+ cf = cfs.getColumnFamily(QueryFilter.getIdentityFilter(dk(key), CFNAME, System.currentTimeMillis()));
for (int i = 0; i < 5; i++)
assert isLive(cf, cf.getColumn(b(i))) : "Column " + i + " should be live";
@@ -141,7 +141,7 @@ public class RangeTombstoneTest extends SchemaLoader
// Compact everything and re-test
CompactionManager.instance.performMaximal(cfs);
- cf = cfs.getColumnFamily(QueryFilter.getIdentityFilter(dk(key), CFNAME));
+ cf = cfs.getColumnFamily(QueryFilter.getIdentityFilter(dk(key), CFNAME, System.currentTimeMillis()));
for (int i = 0; i < 5; i++)
assert isLive(cf, cf.getColumn(b(i))) : "Column " + i + " should be live";
@@ -153,7 +153,7 @@ public class RangeTombstoneTest extends SchemaLoader
private static boolean isLive(ColumnFamily cf, Column c)
{
- return c != null && !c.isMarkedForDelete() && !cf.deletionInfo().isDeleted(c);
+ return c != null && !c.isMarkedForDelete(System.currentTimeMillis()) && !cf.deletionInfo().isDeleted(c);
}
private static ByteBuffer b(int i)