You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2014/06/11 16:48:34 UTC
git commit: Explicitly use Long.MAX_VALUE timestamp for counter
deletions
Repository: cassandra
Updated Branches:
refs/heads/cassandra-2.1 deaf5ba15 -> 5fe755762
Explicitly use Long.MAX_VALUE timestamp for counter deletions
patch by Aleksey Yeschenko; reviewed by Sylvain Lebresne for
CASSANDRA-7346
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5fe75576
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5fe75576
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5fe75576
Branch: refs/heads/cassandra-2.1
Commit: 5fe7557627fac6ace2554a4f8ef552c9d9512490
Parents: deaf5ba
Author: Aleksey Yeschenko <al...@apache.org>
Authored: Wed Jun 11 09:47:22 2014 -0500
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Wed Jun 11 09:47:22 2014 -0500
----------------------------------------------------------------------
CHANGES.txt | 2 ++
.../cassandra/cql/AbstractModification.java | 9 ++++--
.../apache/cassandra/cql/DeleteStatement.java | 27 ++++++++++--------
.../apache/cassandra/cql/UpdateStatement.java | 7 -----
.../apache/cassandra/cql3/UpdateParameters.java | 19 ++++++++++---
.../cql3/statements/DeleteStatement.java | 2 +-
.../apache/cassandra/db/CounterMutation.java | 3 ++
.../cassandra/thrift/CassandraServer.java | 29 ++++++++++++--------
8 files changed, 59 insertions(+), 39 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5fe75576/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index b70782b..a8a84d8 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,6 @@
2.1.0
+ * Explicitly use Long.MAX_VALUE timestamp for counter deletions
+ (CASSANDRA-7346)
* Fix native protocol CAS batches (CASSANDRA-7337)
* Reduce likelihood of contention on local paxos locking (CASSANDRA-7359)
* Upgrade to Pig 0.12.1 (CASSANDRA-6556)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5fe75576/src/java/org/apache/cassandra/cql/AbstractModification.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql/AbstractModification.java b/src/java/org/apache/cassandra/cql/AbstractModification.java
index 8da2611..9b88b5e 100644
--- a/src/java/org/apache/cassandra/cql/AbstractModification.java
+++ b/src/java/org/apache/cassandra/cql/AbstractModification.java
@@ -107,8 +107,11 @@ public abstract class AbstractModification
*
* @throws InvalidRequestException on the wrong request
*/
- public abstract List<IMutation> prepareRowMutations(String keyspace, ThriftClientState clientState, List<ByteBuffer> variables)
- throws InvalidRequestException, UnauthorizedException;
+ public List<IMutation> prepareRowMutations(String keyspace, ThriftClientState clientState, List<ByteBuffer> variables)
+ throws InvalidRequestException, UnauthorizedException
+ {
+ return prepareRowMutations(keyspace, clientState, null, variables);
+ }
/**
* Convert statement into a list of mutations to apply on the server
@@ -121,6 +124,6 @@ public abstract class AbstractModification
*
* @throws InvalidRequestException on the wrong request
*/
- public abstract List<IMutation> prepareRowMutations(String keyspace, ThriftClientState clientState, Long timestamp, List<ByteBuffer> variables)
+ public abstract List<IMutation> prepareRowMutations(String keyspace, ThriftClientState clientState, Long batchTimestamp, List<ByteBuffer> variables)
throws InvalidRequestException, UnauthorizedException;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5fe75576/src/java/org/apache/cassandra/cql/DeleteStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql/DeleteStatement.java b/src/java/org/apache/cassandra/cql/DeleteStatement.java
index 71942e4..e00ffc7 100644
--- a/src/java/org/apache/cassandra/cql/DeleteStatement.java
+++ b/src/java/org/apache/cassandra/cql/DeleteStatement.java
@@ -24,6 +24,7 @@ import java.util.List;
import org.apache.cassandra.auth.Permission;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.db.CounterMutation;
import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.db.composites.CellName;
import org.apache.cassandra.db.IMutation;
@@ -62,13 +63,7 @@ public class DeleteStatement extends AbstractModification
return keys;
}
- public List<IMutation> prepareRowMutations(String keyspace, ThriftClientState clientState, List<ByteBuffer> variables)
- throws InvalidRequestException, UnauthorizedException
- {
- return prepareRowMutations(keyspace, clientState, null, variables);
- }
-
- public List<IMutation> prepareRowMutations(String keyspace, ThriftClientState clientState, Long timestamp, List<ByteBuffer> variables)
+ public List<IMutation> prepareRowMutations(String keyspace, ThriftClientState clientState, Long batchTimestamp, List<ByteBuffer> variables)
throws InvalidRequestException, UnauthorizedException
{
CFMetaData metadata = validateColumnFamily(keyspace, columnFamily);
@@ -79,22 +74,22 @@ public class DeleteStatement extends AbstractModification
List<IMutation> mutations = new ArrayList<IMutation>(keys.size());
for (Term key : keys)
- mutations.add(mutationForKey(key.getByteBuffer(keyType, variables), keyspace, timestamp, clientState, variables, metadata));
+ mutations.add(mutationForKey(key.getByteBuffer(keyType, variables), keyspace, batchTimestamp, clientState, variables, metadata));
return mutations;
}
- public Mutation mutationForKey(ByteBuffer key, String keyspace, Long timestamp, ThriftClientState clientState, List<ByteBuffer> variables, CFMetaData metadata)
+ public Mutation mutationForKey(ByteBuffer key, String keyspace, Long batchTimestamp, ThriftClientState clientState, List<ByteBuffer> variables, CFMetaData metadata)
throws InvalidRequestException
{
Mutation mutation = new Mutation(keyspace, key);
QueryProcessor.validateKeyAlias(metadata, keyName);
- if (columns.size() < 1)
+ if (columns.isEmpty())
{
// No columns, delete the partition
- mutation.delete(columnFamily, (timestamp == null) ? getTimestamp(clientState) : timestamp);
+ mutation.delete(columnFamily, batchTimestamp == null ? getTimestamp(clientState) : batchTimestamp);
}
else
{
@@ -104,13 +99,21 @@ public class DeleteStatement extends AbstractModification
{
CellName columnName = metadata.comparator.cellFromByteBuffer(column.getByteBuffer(at, variables));
validateColumnName(columnName);
- mutation.delete(columnFamily, columnName, (timestamp == null) ? getTimestamp(clientState) : timestamp);
+ mutation.delete(columnFamily, columnName, batchTimestamp == null ? getTimestamp(clientState) : batchTimestamp);
}
}
return mutation;
}
+ @Override
+ public long getTimestamp(ThriftClientState clientState)
+ {
+ return Schema.instance.getCFMetaData(keyspace, columnFamily).isCounter()
+ ? CounterMutation.TOMBSTONE_TIMESTAMP
+ : super.getTimestamp(clientState);
+ }
+
public String toString()
{
return String.format("DeleteStatement(columns=%s, keyspace=%s, columnFamily=%s, consistency=%s keys=%s)",
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5fe75576/src/java/org/apache/cassandra/cql/UpdateStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql/UpdateStatement.java b/src/java/org/apache/cassandra/cql/UpdateStatement.java
index 16a0d76..8a995d2 100644
--- a/src/java/org/apache/cassandra/cql/UpdateStatement.java
+++ b/src/java/org/apache/cassandra/cql/UpdateStatement.java
@@ -119,13 +119,6 @@ public class UpdateStatement extends AbstractModification
}
/** {@inheritDoc} */
- public List<IMutation> prepareRowMutations(String keyspace, ThriftClientState clientState, List<ByteBuffer> variables)
- throws InvalidRequestException, UnauthorizedException
- {
- return prepareRowMutations(keyspace, clientState, null, variables);
- }
-
- /** {@inheritDoc} */
public List<IMutation> prepareRowMutations(String keyspace, ThriftClientState clientState, Long timestamp, List<ByteBuffer> variables)
throws InvalidRequestException, UnauthorizedException
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5fe75576/src/java/org/apache/cassandra/cql3/UpdateParameters.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/UpdateParameters.java b/src/java/org/apache/cassandra/cql3/UpdateParameters.java
index 8a47536..3f0c2b9 100644
--- a/src/java/org/apache/cassandra/cql3/UpdateParameters.java
+++ b/src/java/org/apache/cassandra/cql3/UpdateParameters.java
@@ -29,7 +29,7 @@ import org.apache.cassandra.db.filter.ColumnSlice;
import org.apache.cassandra.exceptions.InvalidRequestException;
/**
- * A simple container that simplify passing parameters for collections methods.
+ * A simple container that simplifies passing parameters for (mostly) collections methods.
*/
public class UpdateParameters
{
@@ -61,21 +61,27 @@ public class UpdateParameters
public Cell makeTombstone(CellName name) throws InvalidRequestException
{
QueryProcessor.validateCellName(name, metadata.comparator);
- return new BufferDeletedCell(name, localDeletionTime, timestamp);
+ return new BufferDeletedCell(name, localDeletionTime, tombstoneTimestamp());
}
public RangeTombstone makeRangeTombstone(ColumnSlice slice) throws InvalidRequestException
{
QueryProcessor.validateComposite(slice.start, metadata.comparator);
QueryProcessor.validateComposite(slice.finish, metadata.comparator);
- return new RangeTombstone(slice.start, slice.finish, timestamp, localDeletionTime);
+ return new RangeTombstone(slice.start, slice.finish, tombstoneTimestamp(), localDeletionTime);
}
public RangeTombstone makeTombstoneForOverwrite(ColumnSlice slice) throws InvalidRequestException
{
QueryProcessor.validateComposite(slice.start, metadata.comparator);
QueryProcessor.validateComposite(slice.finish, metadata.comparator);
- return new RangeTombstone(slice.start, slice.finish, timestamp - 1, localDeletionTime);
+ // As of 2.1, will never be called for a counter table. However, in 3.0, CASSANDRA-6506 might change that, so play safe.
+ return new RangeTombstone(slice.start, slice.finish, tombstoneTimestamp() - 1, localDeletionTime);
+ }
+
+ public DeletionInfo makeDeletionInfo()
+ {
+ return new DeletionInfo(tombstoneTimestamp(), localDeletionTime);
}
public List<Cell> getPrefetchedList(ByteBuffer rowKey, ColumnIdentifier cql3ColumnName)
@@ -86,4 +92,9 @@ public class UpdateParameters
CQL3Row row = prefetchedLists.get(rowKey);
return row == null ? Collections.<Cell>emptyList() : row.getCollection(cql3ColumnName);
}
+
+ private long tombstoneTimestamp()
+ {
+ return metadata.isCounter() ? CounterMutation.TOMBSTONE_TIMESTAMP : timestamp;
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5fe75576/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java b/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java
index 49fdfc2..1569ae4 100644
--- a/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java
@@ -66,7 +66,7 @@ public class DeleteStatement extends ModificationStatement
if (prefix.isEmpty())
{
// No columns specified, delete the row
- cf.delete(new DeletionInfo(params.timestamp, params.localDeletionTime));
+ cf.delete(params.makeDeletionInfo());
}
else if (cfm.comparator.isDense() && prefix.size() == cfm.clusteringColumns().size())
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5fe75576/src/java/org/apache/cassandra/db/CounterMutation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/CounterMutation.java b/src/java/org/apache/cassandra/db/CounterMutation.java
index 2bfdd4e..848e4db 100644
--- a/src/java/org/apache/cassandra/db/CounterMutation.java
+++ b/src/java/org/apache/cassandra/db/CounterMutation.java
@@ -44,6 +44,9 @@ import org.apache.cassandra.utils.*;
public class CounterMutation implements IMutation
{
+ // Counter deletions are final in C*, because there is no way to provide reliable deletion otherwise.
+ public static final long TOMBSTONE_TIMESTAMP = Long.MAX_VALUE;
+
public static final CounterMutationSerializer serializer = new CounterMutationSerializer();
private static final Striped<Lock> LOCKS = Striped.lazyWeakLock(DatabaseDescriptor.getConcurrentCounterWriters() * 1024);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5fe75576/src/java/org/apache/cassandra/thrift/CassandraServer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/CassandraServer.java b/src/java/org/apache/cassandra/thrift/CassandraServer.java
index 1a77ffa..49466bd 100644
--- a/src/java/org/apache/cassandra/thrift/CassandraServer.java
+++ b/src/java/org/apache/cassandra/thrift/CassandraServer.java
@@ -907,16 +907,18 @@ public class CassandraServer implements Cassandra.Iface
private void deleteColumnOrSuperColumn(org.apache.cassandra.db.Mutation mutation, CFMetaData cfm, Deletion del)
{
+ long timestamp = cfm.isCounter() ? CounterMutation.TOMBSTONE_TIMESTAMP : del.timestamp;
+
if (del.predicate != null && del.predicate.column_names != null)
{
for (ByteBuffer c : del.predicate.column_names)
{
if (del.super_column == null && cfm.isSuper())
- mutation.deleteRange(cfm.cfName, SuperColumns.startOf(c), SuperColumns.endOf(c), del.timestamp);
+ mutation.deleteRange(cfm.cfName, SuperColumns.startOf(c), SuperColumns.endOf(c), timestamp);
else if (del.super_column != null)
- mutation.delete(cfm.cfName, cfm.comparator.makeCellName(del.super_column, c), del.timestamp);
+ mutation.delete(cfm.cfName, cfm.comparator.makeCellName(del.super_column, c), timestamp);
else
- mutation.delete(cfm.cfName, cfm.comparator.cellFromByteBuffer(c), del.timestamp);
+ mutation.delete(cfm.cfName, cfm.comparator.cellFromByteBuffer(c), timestamp);
}
}
else if (del.predicate != null && del.predicate.slice_range != null)
@@ -925,24 +927,27 @@ public class CassandraServer implements Cassandra.Iface
mutation.deleteRange(cfm.cfName,
SuperColumns.startOf(del.predicate.getSlice_range().start),
SuperColumns.startOf(del.predicate.getSlice_range().finish),
- del.timestamp);
+ timestamp);
else if (del.super_column != null)
- mutation.deleteRange(cfm.cfName,
- cfm.comparator.makeCellName(del.super_column, del.predicate.getSlice_range().start),
- cfm.comparator.makeCellName(del.super_column, del.predicate.getSlice_range().finish),
- del.timestamp);
+ mutation.deleteRange(cfm.cfName,
+ cfm.comparator.makeCellName(del.super_column, del.predicate.getSlice_range().start),
+ cfm.comparator.makeCellName(del.super_column, del.predicate.getSlice_range().finish),
+ timestamp);
else
mutation.deleteRange(cfm.cfName,
cfm.comparator.cellFromByteBuffer(del.predicate.getSlice_range().start),
cfm.comparator.cellFromByteBuffer(del.predicate.getSlice_range().finish),
- del.timestamp);
+ timestamp);
}
else
{
if (del.super_column != null)
- mutation.deleteRange(cfm.cfName, SuperColumns.startOf(del.super_column), SuperColumns.endOf(del.super_column), del.timestamp);
+ mutation.deleteRange(cfm.cfName,
+ SuperColumns.startOf(del.super_column),
+ SuperColumns.endOf(del.super_column),
+ timestamp);
else
- mutation.delete(cfm.cfName, del.timestamp);
+ mutation.delete(cfm.cfName, timestamp);
}
}
@@ -1830,7 +1835,7 @@ public class CassandraServer implements Cassandra.Iface
try
{
- internal_remove(key, path, System.currentTimeMillis(), consistency_level, true);
+ internal_remove(key, path, CounterMutation.TOMBSTONE_TIMESTAMP, consistency_level, true);
}
catch (RequestValidationException e)
{