You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2012/10/25 08:52:52 UTC
git commit: (CQL3) Fix prepend logic and ensure batches have a unique
timestamp
Updated Branches:
refs/heads/trunk 8f46176e9 -> c54fe4c41
(CQL3) Fix prepend logic and ensure batches have a unique timestamp
patch by slebresne; reviewed by jbellis for CASSANDRA-4835
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c54fe4c4
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c54fe4c4
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c54fe4c4
Branch: refs/heads/trunk
Commit: c54fe4c41e18ffcedb3a849aead4d7576cd73167
Parents: 8f46176
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Thu Oct 25 08:51:48 2012 +0200
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Thu Oct 25 08:51:48 2012 +0200
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/cql3/operations/ListOperation.java | 18 +++++++++-----
.../cassandra/cql3/statements/BatchStatement.java | 6 ++--
.../cassandra/cql3/statements/DeleteStatement.java | 4 +-
.../cql3/statements/ModificationStatement.java | 12 +++++----
.../cassandra/cql3/statements/UpdateStatement.java | 6 ++--
6 files changed, 27 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c54fe4c4/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 3ef9530..e067d9c 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -35,6 +35,7 @@
* Move consistency level to the protocol level (CASSANDRA-4734, 4824)
* Fix Subcolumn slice ends not respected (CASSANDRA-4826)
* Fix Assertion error in cql3 select (CASSANDRA-4783)
+ * Fix list prepend logic (CQL3) (CASSANDRA-4835)
Merged from 1.1:
* fix get_paged_slice to wrap to next row correctly (CASSANDRA-4816)
* fix indexing empty column values (CASSANDRA-4832)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c54fe4c4/src/java/org/apache/cassandra/cql3/operations/ListOperation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/operations/ListOperation.java b/src/java/org/apache/cassandra/cql3/operations/ListOperation.java
index b40cc18..64dcdb2 100644
--- a/src/java/org/apache/cassandra/cql3/operations/ListOperation.java
+++ b/src/java/org/apache/cassandra/cql3/operations/ListOperation.java
@@ -46,7 +46,7 @@ public class ListOperation implements Operation
* For prepend, we need to be able to generate unique but decreasing time
* UUID, which is a bit challenging. To do that, given a time in milliseconds,
* we adds a number representing the 100-nanoseconds precision and make sure
- * that within the same millisecond, that number is always increasing. We
+ * that within the same millisecond, that number is always decreasing. We
* do rely on the fact that the user will only provide decreasing
* milliseconds timestamp for that purpose.
*/
@@ -72,8 +72,8 @@ public class ListOperation implements Operation
assert millis <= current.millis;
PrecisionTime next = millis < current.millis
- ? new PrecisionTime(millis, 0)
- : new PrecisionTime(millis, current.nanos + 1);
+ ? new PrecisionTime(millis, 9999)
+ : new PrecisionTime(millis, Math.max(0, current.nanos - 1));
if (last.compareAndSet(current, next))
return next;
@@ -180,11 +180,9 @@ public class ListOperation implements Operation
{
long time = REFERENCE_TIME - (System.currentTimeMillis() - REFERENCE_TIME);
- // We do the loop in reverse order because getNext() will create increasing time but we want the last
- // value in the prepended list to have the lower time
- for (int i = values.size() - 1; i >= 0; i--)
+ for (int i = 0; i < values.size(); i++)
{
- ColumnNameBuilder b = i == 0 ? builder : builder.copy();
+ ColumnNameBuilder b = i == values.size() - 1 ? builder : builder.copy();
PrecisionTime pt = getNextTime(time);
ByteBuffer uuid = ByteBuffer.wrap(UUIDGen.getTimeUUIDBytes(pt.millis, pt.nanos));
ByteBuffer name = b.add(uuid).build();
@@ -263,6 +261,12 @@ public class ListOperation implements Operation
return new ListOperation(values, Kind.DISCARD_IDX);
}
+ @Override
+ public String toString()
+ {
+ return "ListOperation(" + kind + ", " + values + ")";
+ }
+
private int validateListIdx(Term value, List<Pair<ByteBuffer, IColumn>> list) throws InvalidRequestException
{
try
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c54fe4c4/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
index ff1d730..6ab0271 100644
--- a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
@@ -97,17 +97,17 @@ public class BatchStatement extends ModificationStatement
statement.validateConsistency(cl);
}
- public Collection<? extends IMutation> getMutations(ClientState clientState, List<ByteBuffer> variables, boolean local, ConsistencyLevel cl)
+ public Collection<? extends IMutation> getMutations(ClientState clientState, List<ByteBuffer> variables, boolean local, ConsistencyLevel cl, long now)
throws RequestExecutionException, RequestValidationException
{
Map<Pair<String, ByteBuffer>, IMutation> mutations = new HashMap<Pair<String, ByteBuffer>, IMutation>();
for (ModificationStatement statement : statements)
{
if (isSetTimestamp())
- statement.setTimestamp(getTimestamp(clientState));
+ statement.setTimestamp(getTimestamp(now));
// Group mutation together, otherwise they won't get applied atomically
- for (IMutation m : statement.getMutations(clientState, variables, local, cl))
+ for (IMutation m : statement.getMutations(clientState, variables, local, cl, now))
{
if (m instanceof CounterMutation && type != Type.COUNTER)
throw new InvalidRequestException("Counter mutations are only allowed in COUNTER batches");
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c54fe4c4/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java b/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java
index 1015a90..76c4374 100644
--- a/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java
@@ -69,7 +69,7 @@ public class DeleteStatement extends ModificationStatement
cl.validateForWrite(cfDef.cfm.ksName);
}
- public Collection<RowMutation> getMutations(ClientState clientState, List<ByteBuffer> variables, boolean local, ConsistencyLevel cl)
+ public Collection<RowMutation> getMutations(ClientState clientState, List<ByteBuffer> variables, boolean local, ConsistencyLevel cl, long now)
throws RequestExecutionException, RequestValidationException
{
// keys
@@ -104,7 +104,7 @@ public class DeleteStatement extends ModificationStatement
Map<ByteBuffer, ColumnGroupMap> rows = toRead != null ? readRows(keys, builder, toRead, (CompositeType)cfDef.cfm.comparator, local, cl) : null;
Collection<RowMutation> rowMutations = new ArrayList<RowMutation>(keys.size());
- UpdateParameters params = new UpdateParameters(variables, getTimestamp(clientState), -1);
+ UpdateParameters params = new UpdateParameters(variables, getTimestamp(now), -1);
for (ByteBuffer key : keys)
rowMutations.add(mutationForKey(cfDef, key, builder, isRange, params, rows == null ? null : rows.get(key)));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c54fe4c4/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
index 426274b..afdff22 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
@@ -89,7 +89,7 @@ public abstract class ModificationStatement extends CFStatement implements CQLSt
validateConsistency(cl);
- Collection<? extends IMutation> mutations = getMutations(state, variables, false, cl);
+ Collection<? extends IMutation> mutations = getMutations(state, variables, false, cl, state.getTimestamp());
// The type should have been set by now or we have a bug
assert type != null;
@@ -115,14 +115,14 @@ public abstract class ModificationStatement extends CFStatement implements CQLSt
public ResultMessage executeInternal(ClientState state) throws RequestValidationException, RequestExecutionException
{
- for (IMutation mutation : getMutations(state, Collections.<ByteBuffer>emptyList(), true, null))
+ for (IMutation mutation : getMutations(state, Collections.<ByteBuffer>emptyList(), true, null, state.getTimestamp()))
mutation.apply();
return null;
}
- public long getTimestamp(ClientState clientState)
+ public long getTimestamp(long now)
{
- return timestamp == null ? clientState.getTimestamp() : timestamp;
+ return timestamp == null ? now : timestamp;
}
public void setTimestamp(long timestamp)
@@ -202,11 +202,13 @@ public abstract class ModificationStatement extends CFStatement implements CQLSt
* @param clientState current client status
* @param variables value for prepared statement markers
* @param local if true, any requests (for collections) performed by getMutation should be done locally only.
+ * @param cl the consistency to use for the potential reads involved in generating the mutations (for lists set/delete operations)
+ * @param now the current timestamp in microseconds to use if no timestamp is user provided.
*
* @return list of the mutations
* @throws InvalidRequestException on invalid requests
*/
- protected abstract Collection<? extends IMutation> getMutations(ClientState clientState, List<ByteBuffer> variables, boolean local, ConsistencyLevel cl)
+ protected abstract Collection<? extends IMutation> getMutations(ClientState clientState, List<ByteBuffer> variables, boolean local, ConsistencyLevel cl, long now)
throws RequestExecutionException, RequestValidationException;
public abstract ParsedStatement.Prepared prepare(CFDefinition.Name[] boundNames) throws InvalidRequestException;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c54fe4c4/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java b/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
index 2a99b3f..8c19b10 100644
--- a/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
@@ -105,7 +105,7 @@ public class UpdateStatement extends ModificationStatement
}
/** {@inheritDoc} */
- public Collection<IMutation> getMutations(ClientState clientState, List<ByteBuffer> variables, boolean local, ConsistencyLevel cl)
+ public Collection<IMutation> getMutations(ClientState clientState, List<ByteBuffer> variables, boolean local, ConsistencyLevel cl, long now)
throws RequestExecutionException, RequestValidationException
{
List<ByteBuffer> keys = buildKeyNames(cfDef, processedKeys, variables);
@@ -135,7 +135,7 @@ public class UpdateStatement extends ModificationStatement
Map<ByteBuffer, ColumnGroupMap> rows = toRead != null ? readRows(keys, builder, toRead, (CompositeType)cfDef.cfm.comparator, local, cl) : null;
Collection<IMutation> mutations = new LinkedList<IMutation>();
- UpdateParameters params = new UpdateParameters(variables, getTimestamp(clientState), getTimeToLive());
+ UpdateParameters params = new UpdateParameters(variables, getTimestamp(now), getTimeToLive());
for (ByteBuffer key: keys)
mutations.add(mutationForKey(cfDef, key, builder, params, rows == null ? null : rows.get(key), cl));
@@ -449,7 +449,7 @@ public class UpdateStatement extends ModificationStatement
cfName,
whereClause,
columns,
- isSetTimestamp() ? getTimestamp(null) : "<now>",
+ isSetTimestamp() ? getTimestamp(-1) : "<now>",
getTimeToLive());
}
}