You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2012/10/25 08:52:52 UTC

git commit: (CQL3) Fix prepend logic and ensure batches have a unique timestamp

Updated Branches:
  refs/heads/trunk 8f46176e9 -> c54fe4c41


(CQL3) Fix prepend logic and ensure batches have a unique timestamp

patch by slebresne; reviewed by jbellis for CASSANDRA-4835


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c54fe4c4
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c54fe4c4
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c54fe4c4

Branch: refs/heads/trunk
Commit: c54fe4c41e18ffcedb3a849aead4d7576cd73167
Parents: 8f46176
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Thu Oct 25 08:51:48 2012 +0200
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Thu Oct 25 08:51:48 2012 +0200

----------------------------------------------------------------------
 CHANGES.txt                                        |    1 +
 .../cassandra/cql3/operations/ListOperation.java   |   18 +++++++++-----
 .../cassandra/cql3/statements/BatchStatement.java  |    6 ++--
 .../cassandra/cql3/statements/DeleteStatement.java |    4 +-
 .../cql3/statements/ModificationStatement.java     |   12 +++++----
 .../cassandra/cql3/statements/UpdateStatement.java |    6 ++--
 6 files changed, 27 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/c54fe4c4/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 3ef9530..e067d9c 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -35,6 +35,7 @@
  * Move consistency level to the protocol level (CASSANDRA-4734, 4824)
  * Fix Subcolumn slice ends not respected (CASSANDRA-4826)
  * Fix Assertion error in cql3 select (CASSANDRA-4783)
+ * Fix list prepend logic (CQL3) (CASSANDRA-4835)
 Merged from 1.1:
  * fix get_paged_slice to wrap to next row correctly (CASSANDRA-4816)
  * fix indexing empty column values (CASSANDRA-4832)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c54fe4c4/src/java/org/apache/cassandra/cql3/operations/ListOperation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/operations/ListOperation.java b/src/java/org/apache/cassandra/cql3/operations/ListOperation.java
index b40cc18..64dcdb2 100644
--- a/src/java/org/apache/cassandra/cql3/operations/ListOperation.java
+++ b/src/java/org/apache/cassandra/cql3/operations/ListOperation.java
@@ -46,7 +46,7 @@ public class ListOperation implements Operation
      * For prepend, we need to be able to generate unique but decreasing time
      * UUID, which is a bit challenging. To do that, given a time in milliseconds,
      * we adds a number representing the 100-nanoseconds precision and make sure
-     * that within the same millisecond, that number is always increasing. We
+     * that within the same millisecond, that number is always decreasing. We
      * do rely on the fact that the user will only provide decreasing
      * milliseconds timestamp for that purpose.
      */
@@ -72,8 +72,8 @@ public class ListOperation implements Operation
 
             assert millis <= current.millis;
             PrecisionTime next = millis < current.millis
-                    ? new PrecisionTime(millis, 0)
-                    : new PrecisionTime(millis, current.nanos + 1);
+                    ? new PrecisionTime(millis, 9999)
+                    : new PrecisionTime(millis, Math.max(0, current.nanos - 1));
 
             if (last.compareAndSet(current, next))
                 return next;
@@ -180,11 +180,9 @@ public class ListOperation implements Operation
     {
         long time = REFERENCE_TIME - (System.currentTimeMillis() - REFERENCE_TIME);
 
-        // We do the loop in reverse order because getNext() will create increasing time but we want the last
-        // value in the prepended list to have the lower time
-        for (int i = values.size() - 1; i >= 0; i--)
+        for (int i = 0; i < values.size(); i++)
         {
-            ColumnNameBuilder b = i == 0 ? builder : builder.copy();
+            ColumnNameBuilder b = i == values.size() - 1 ? builder : builder.copy();
             PrecisionTime pt = getNextTime(time);
             ByteBuffer uuid = ByteBuffer.wrap(UUIDGen.getTimeUUIDBytes(pt.millis, pt.nanos));
             ByteBuffer name = b.add(uuid).build();
@@ -263,6 +261,12 @@ public class ListOperation implements Operation
         return new ListOperation(values, Kind.DISCARD_IDX);
     }
 
+    @Override
+    public String toString()
+    {
+        return "ListOperation(" + kind + ", " + values + ")";
+    }
+
     private int validateListIdx(Term value, List<Pair<ByteBuffer, IColumn>> list) throws InvalidRequestException
     {
         try

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c54fe4c4/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
index ff1d730..6ab0271 100644
--- a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
@@ -97,17 +97,17 @@ public class BatchStatement extends ModificationStatement
             statement.validateConsistency(cl);
     }
 
-    public Collection<? extends IMutation> getMutations(ClientState clientState, List<ByteBuffer> variables, boolean local, ConsistencyLevel cl)
+    public Collection<? extends IMutation> getMutations(ClientState clientState, List<ByteBuffer> variables, boolean local, ConsistencyLevel cl, long now)
     throws RequestExecutionException, RequestValidationException
     {
         Map<Pair<String, ByteBuffer>, IMutation> mutations = new HashMap<Pair<String, ByteBuffer>, IMutation>();
         for (ModificationStatement statement : statements)
         {
             if (isSetTimestamp())
-                statement.setTimestamp(getTimestamp(clientState));
+                statement.setTimestamp(getTimestamp(now));
 
             // Group mutation together, otherwise they won't get applied atomically
-            for (IMutation m : statement.getMutations(clientState, variables, local, cl))
+            for (IMutation m : statement.getMutations(clientState, variables, local, cl, now))
             {
                 if (m instanceof CounterMutation && type != Type.COUNTER)
                     throw new InvalidRequestException("Counter mutations are only allowed in COUNTER batches");

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c54fe4c4/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java b/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java
index 1015a90..76c4374 100644
--- a/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java
@@ -69,7 +69,7 @@ public class DeleteStatement extends ModificationStatement
             cl.validateForWrite(cfDef.cfm.ksName);
     }
 
-    public Collection<RowMutation> getMutations(ClientState clientState, List<ByteBuffer> variables, boolean local, ConsistencyLevel cl)
+    public Collection<RowMutation> getMutations(ClientState clientState, List<ByteBuffer> variables, boolean local, ConsistencyLevel cl, long now)
     throws RequestExecutionException, RequestValidationException
     {
         // keys
@@ -104,7 +104,7 @@ public class DeleteStatement extends ModificationStatement
         Map<ByteBuffer, ColumnGroupMap> rows = toRead != null ? readRows(keys, builder, toRead, (CompositeType)cfDef.cfm.comparator, local, cl) : null;
 
         Collection<RowMutation> rowMutations = new ArrayList<RowMutation>(keys.size());
-        UpdateParameters params = new UpdateParameters(variables, getTimestamp(clientState), -1);
+        UpdateParameters params = new UpdateParameters(variables, getTimestamp(now), -1);
 
         for (ByteBuffer key : keys)
             rowMutations.add(mutationForKey(cfDef, key, builder, isRange, params, rows == null ? null : rows.get(key)));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c54fe4c4/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
index 426274b..afdff22 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
@@ -89,7 +89,7 @@ public abstract class ModificationStatement extends CFStatement implements CQLSt
 
         validateConsistency(cl);
 
-        Collection<? extends IMutation> mutations = getMutations(state, variables, false, cl);
+        Collection<? extends IMutation> mutations = getMutations(state, variables, false, cl, state.getTimestamp());
 
         // The type should have been set by now or we have a bug
         assert type != null;
@@ -115,14 +115,14 @@ public abstract class ModificationStatement extends CFStatement implements CQLSt
 
     public ResultMessage executeInternal(ClientState state) throws RequestValidationException, RequestExecutionException
     {
-        for (IMutation mutation : getMutations(state, Collections.<ByteBuffer>emptyList(), true, null))
+        for (IMutation mutation : getMutations(state, Collections.<ByteBuffer>emptyList(), true, null, state.getTimestamp()))
             mutation.apply();
         return null;
     }
 
-    public long getTimestamp(ClientState clientState)
+    public long getTimestamp(long now)
     {
-        return timestamp == null ? clientState.getTimestamp() : timestamp;
+        return timestamp == null ? now : timestamp;
     }
 
     public void setTimestamp(long timestamp)
@@ -202,11 +202,13 @@ public abstract class ModificationStatement extends CFStatement implements CQLSt
      * @param clientState current client status
      * @param variables value for prepared statement markers
      * @param local if true, any requests (for collections) performed by getMutation should be done locally only.
+     * @param cl the consistency to use for the potential reads involved in generating the mutations (for lists set/delete operations)
+     * @param now the current timestamp in microseconds to use if no timestamp is user provided.
      *
      * @return list of the mutations
      * @throws InvalidRequestException on invalid requests
      */
-    protected abstract Collection<? extends IMutation> getMutations(ClientState clientState, List<ByteBuffer> variables, boolean local, ConsistencyLevel cl)
+    protected abstract Collection<? extends IMutation> getMutations(ClientState clientState, List<ByteBuffer> variables, boolean local, ConsistencyLevel cl, long now)
     throws RequestExecutionException, RequestValidationException;
 
     public abstract ParsedStatement.Prepared prepare(CFDefinition.Name[] boundNames) throws InvalidRequestException;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c54fe4c4/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java b/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
index 2a99b3f..8c19b10 100644
--- a/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
@@ -105,7 +105,7 @@ public class UpdateStatement extends ModificationStatement
     }
 
     /** {@inheritDoc} */
-    public Collection<IMutation> getMutations(ClientState clientState, List<ByteBuffer> variables, boolean local, ConsistencyLevel cl)
+    public Collection<IMutation> getMutations(ClientState clientState, List<ByteBuffer> variables, boolean local, ConsistencyLevel cl, long now)
     throws RequestExecutionException, RequestValidationException
     {
         List<ByteBuffer> keys = buildKeyNames(cfDef, processedKeys, variables);
@@ -135,7 +135,7 @@ public class UpdateStatement extends ModificationStatement
         Map<ByteBuffer, ColumnGroupMap> rows = toRead != null ? readRows(keys, builder, toRead, (CompositeType)cfDef.cfm.comparator, local, cl) : null;
 
         Collection<IMutation> mutations = new LinkedList<IMutation>();
-        UpdateParameters params = new UpdateParameters(variables, getTimestamp(clientState), getTimeToLive());
+        UpdateParameters params = new UpdateParameters(variables, getTimestamp(now), getTimeToLive());
 
         for (ByteBuffer key: keys)
             mutations.add(mutationForKey(cfDef, key, builder, params, rows == null ? null : rows.get(key), cl));
@@ -449,7 +449,7 @@ public class UpdateStatement extends ModificationStatement
                              cfName,
                              whereClause,
                              columns,
-                             isSetTimestamp() ? getTimestamp(null) : "<now>",
+                             isSetTimestamp() ? getTimestamp(-1) : "<now>",
                              getTimeToLive());
     }
 }