You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by bl...@apache.org on 2017/03/10 09:17:06 UTC

[02/10] cassandra git commit: Fix queries updating multiple time the same list

Fix queries updating multiple time the same list

patch by Benjamin Lerer; reviewed by Sylvain Lebresne for CASSANDRA-13130


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5ef8a8b4
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5ef8a8b4
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5ef8a8b4

Branch: refs/heads/cassandra-3.0
Commit: 5ef8a8b408d4c492f7f2ffbbbe6fce237140c7cb
Parents: e4be2d0
Author: Benjamin Lerer <b....@gmail.com>
Authored: Fri Mar 10 09:57:20 2017 +0100
Committer: Benjamin Lerer <b....@gmail.com>
Committed: Fri Mar 10 09:57:20 2017 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 src/java/org/apache/cassandra/cql3/Lists.java   |  10 +-
 .../apache/cassandra/cql3/UpdateParameters.java |  31 +++++-
 .../validation/entities/CollectionsTest.java    | 100 +++++++++++++++++++
 4 files changed, 135 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/5ef8a8b4/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 0982de9..09e4039 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.2.10
+ * Fix queries updating multiple time the same list (CASSANDRA-13130)
  * Fix GRANT/REVOKE when keyspace isn't specified (CASSANDRA-13053)
  * Avoid race on receiver by starting streaming sender thread after sending init message (CASSANDRA-12886)
  * Fix "multiple versions of ant detected..." when running ant test (CASSANDRA-13232)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5ef8a8b4/src/java/org/apache/cassandra/cql3/Lists.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/Lists.java b/src/java/org/apache/cassandra/cql3/Lists.java
index da8c48a..cc75476 100644
--- a/src/java/org/apache/cassandra/cql3/Lists.java
+++ b/src/java/org/apache/cassandra/cql3/Lists.java
@@ -21,15 +21,18 @@ import static org.apache.cassandra.cql3.Constants.UNSET_VALUE;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicReference;
 
+import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.cql3.functions.Function;
 import org.apache.cassandra.db.Cell;
 import org.apache.cassandra.db.ColumnFamily;
 import org.apache.cassandra.db.composites.CellName;
 import org.apache.cassandra.db.composites.Composite;
+import org.apache.cassandra.db.composites.CompositesBuilder;
 import org.apache.cassandra.db.marshal.Int32Type;
 import org.apache.cassandra.db.marshal.ListType;
 import org.apache.cassandra.exceptions.InvalidRequestException;
@@ -349,7 +352,7 @@ public abstract class Lists
             if (index == ByteBufferUtil.UNSET_BYTE_BUFFER)
                 throw new InvalidRequestException("Invalid unset value for list index");
 
-            List<Cell> existingList = params.getPrefetchedList(rowKey, column.name);
+            List<Cell> existingList = params.getPrefetchedList(rowKey, column.name, cf);
             int idx = ByteBufferUtil.toInt(index);
             if (existingList == null || existingList.size() == 0)
                 throw new InvalidRequestException("Attempted to set an element on a list which is null");
@@ -458,7 +461,7 @@ public abstract class Lists
         public void execute(ByteBuffer rowKey, ColumnFamily cf, Composite prefix, UpdateParameters params) throws InvalidRequestException
         {
             assert column.type.isMultiCell() : "Attempted to delete from a frozen list";
-            List<Cell> existingList = params.getPrefetchedList(rowKey, column.name);
+            List<Cell> existingList = params.getPrefetchedList(rowKey, column.name, cf);
             // We want to call bind before possibly returning to reject queries where the value provided is not a list.
             Term.Terminal value = t.bind(params.options);
 
@@ -505,7 +508,8 @@ public abstract class Lists
             if (index == Constants.UNSET_VALUE)
                 return;
 
-            List<Cell> existingList = params.getPrefetchedList(rowKey, column.name);
+            List<Cell> existingList = params.getPrefetchedList(rowKey, column.name, cf);
+
             int idx = ByteBufferUtil.toInt(index.get(params.options.getProtocolVersion()));
             if (existingList == null || existingList.size() == 0)
                 throw new InvalidRequestException("Attempted to delete an element from a list which is null");

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5ef8a8b4/src/java/org/apache/cassandra/cql3/UpdateParameters.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/UpdateParameters.java b/src/java/org/apache/cassandra/cql3/UpdateParameters.java
index e412585..65edef7 100644
--- a/src/java/org/apache/cassandra/cql3/UpdateParameters.java
+++ b/src/java/org/apache/cassandra/cql3/UpdateParameters.java
@@ -91,16 +91,39 @@ public class UpdateParameters
         return new RangeTombstone(slice.start, slice.finish, timestamp - 1, localDeletionTime);
     }
 
-    public List<Cell> getPrefetchedList(ByteBuffer rowKey, ColumnIdentifier cql3ColumnName)
+    /**
+     * Returns the prefetched list with the already performed modifications.
+     * <p>If no modification have yet been performed this method will return the fetched list.
+     * If some modifications (updates or deletions) have already been done the list returned
+     * will be the result of the merge of the fetched list and of the pending mutations.</p>
+     *
+     * @param rowKey the row key
+     * @param cql3ColumnName the column name
+     * @param cf the pending modifications
+     * @return the prefetched list with the already performed modifications
+     */
+    public List<Cell> getPrefetchedList(ByteBuffer rowKey, ColumnIdentifier cql3ColumnName, ColumnFamily cf)
     {
         if (prefetchedLists == null)
             return Collections.emptyList();
 
         CQL3Row row = prefetchedLists.get(rowKey);
-        if (row == null)
-            return Collections.<Cell>emptyList();
 
-        List<Cell> cql3List = row.getMultiCellColumn(cql3ColumnName);
+        List<Cell> cql3List = row == null ? Collections.<Cell>emptyList() : row.getMultiCellColumn(cql3ColumnName);
+
+        if (!cf.isEmpty())
+        {
+            ColumnFamily currentCf = cf.cloneMe();
+
+            for (Cell c : cql3List)
+                currentCf.addColumn(c);
+
+            CFMetaData cfm = currentCf.metadata();
+            CQL3Row.RowIterator iterator = cfm.comparator.CQL3RowBuilder(cfm, timestamp).group(currentCf.iterator());
+            // We can only update one CQ3Row per partition key at a time (we don't allow IN for clustering key)
+            cql3List = iterator.hasNext() ? iterator.next().getMultiCellColumn(cql3ColumnName) : null;
+        }
+
         return (cql3List == null) ? Collections.<Cell>emptyList() : cql3List;
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5ef8a8b4/test/unit/org/apache/cassandra/cql3/validation/entities/CollectionsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/validation/entities/CollectionsTest.java b/test/unit/org/apache/cassandra/cql3/validation/entities/CollectionsTest.java
index 115b755..99d9695 100644
--- a/test/unit/org/apache/cassandra/cql3/validation/entities/CollectionsTest.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/entities/CollectionsTest.java
@@ -648,4 +648,104 @@ public class CollectionsTest extends CQLTester
         assertInvalidMessage("The data cannot be deserialized as a map",
                              "INSERT INTO %s (pk, m) VALUES (?, ?)", 1, -1);
     }
+
+    @Test
+    public void testMultipleOperationOnListWithinTheSameQuery() throws Throwable
+    {
+        createTable("CREATE TABLE %s (pk int PRIMARY KEY, l list<int>)");
+        execute("INSERT INTO %s (pk, l) VALUES (1, [1, 2, 3, 4])");
+
+        // Checks that when the same element is updated twice the update with the greatest value is the one taken into account
+        execute("UPDATE %s SET l[?] = ?, l[?] = ?  WHERE pk = ?", 2, 7, 2, 8, 1);
+        assertRows(execute("SELECT * FROM %s WHERE pk = ?", 1) , row(1, list(1, 2, 8, 4)));
+
+        execute("UPDATE %s SET l[?] = ?, l[?] = ?  WHERE pk = ?", 2, 9, 2, 6, 1);
+        assertRows(execute("SELECT * FROM %s WHERE pk = ?", 1) , row(1, list(1, 2, 9, 4)));
+
+        // Checks that deleting twice the same element will result in the deletion of the element with the index
+        // and of the following element.
+        execute("DELETE l[?], l[?] FROM %s WHERE pk = ?", 2, 2, 1);
+        assertRows(execute("SELECT * FROM %s WHERE pk = ?", 1) , row(1, list(1, 2)));
+
+        // Checks that the set operation is performed on the added elements and that the greatest value win
+        execute("UPDATE %s SET l = l + ?, l[?] = ?  WHERE pk = ?", list(3, 4), 3, 7, 1);
+        assertRows(execute("SELECT * FROM %s WHERE pk = ?", 1) , row(1, list(1, 2, 3, 7)));
+
+        execute("UPDATE %s SET l = l + ?, l[?] = ?  WHERE pk = ?", list(6, 8), 4, 5, 1);
+        assertRows(execute("SELECT * FROM %s WHERE pk = ?", 1) , row(1, list(1, 2, 3, 7, 6, 8)));
+
+        // Checks that the order of the operations matters
+        assertInvalidMessage("List index 6 out of bound, list has size 6",
+                             "UPDATE %s SET l[?] = ?, l = l + ? WHERE pk = ?", 6, 5, list(9), 1);
+
+        // Checks that the updated element is deleted.
+        execute("UPDATE %s SET l[?] = ? , l = l - ? WHERE pk = ?", 2, 8, list(8), 1);
+        assertRows(execute("SELECT * FROM %s WHERE pk = ?", 1) , row(1, list(1, 2, 7, 6)));
+
+        // Checks that we cannot update an element that has been removed.
+        assertInvalidMessage("List index 3 out of bound, list has size 3",
+                             "UPDATE %s SET l = l - ?, l[?] = ?  WHERE pk = ?", list(6), 3, 4, 1);
+
+        // Checks that the element is updated before the other ones are shifted.
+        execute("UPDATE %s SET l[?] = ? , l = l - ? WHERE pk = ?", 2, 8, list(1), 1);
+        assertRows(execute("SELECT * FROM %s WHERE pk = ?", 1) , row(1, list(2, 8, 6)));
+
+        // Checks that the element are shifted before the element is updated.
+        execute("UPDATE %s SET l = l - ?, l[?] = ?  WHERE pk = ?", list(2, 6), 0, 9, 1);
+        assertRows(execute("SELECT * FROM %s WHERE pk = ?", 1) , row(1, list(9)));
+    }
+
+    @Test
+    public void testMultipleOperationOnMapWithinTheSameQuery() throws Throwable
+    {
+        createTable("CREATE TABLE %s (pk int PRIMARY KEY, m map<int, int>)");
+        execute("INSERT INTO %s (pk, m) VALUES (1, {0 : 1, 1 : 2, 2 : 3, 3 : 4})");
+
+        // Checks that when the same element is updated twice the update with the greatest value is the one taken into account
+        execute("UPDATE %s SET m[?] = ?, m[?] = ?  WHERE pk = ?", 2, 7, 2, 8, 1);
+        assertRows(execute("SELECT * FROM %s WHERE pk = 1") , row(1, map(0, 1, 1, 2, 2, 8, 3, 4)));
+
+        execute("UPDATE %s SET m[?] = ?, m[?] = ?  WHERE pk = ?", 2, 9, 2, 6, 1);
+        assertRows(execute("SELECT * FROM %s WHERE pk = 1") , row(1, map(0, 1, 1, 2, 2, 9, 3, 4)));
+
+        // Checks that deleting twice the same element has no side effect
+        execute("DELETE m[?], m[?] FROM %s WHERE pk = ?", 2, 2, 1);
+        assertRows(execute("SELECT * FROM %s WHERE pk = ?", 1) , row(1, map(0, 1, 1, 2, 3, 4)));
+
+        // Checks that the set operation is performed on the added elements and that the greatest value win
+        execute("UPDATE %s SET m = m + ?, m[?] = ?  WHERE pk = ?", map(4, 5), 4, 7, 1);
+        assertRows(execute("SELECT * FROM %s WHERE pk = ?", 1) , row(1, map(0, 1, 1, 2, 3, 4, 4, 7)));
+
+        execute("UPDATE %s SET m = m + ?, m[?] = ?  WHERE pk = ?", map(4, 8), 4, 6, 1);
+        assertRows(execute("SELECT * FROM %s WHERE pk = ?", 1) , row(1, map(0, 1, 1, 2, 3, 4, 4, 8)));
+
+        // Checks that, as tombstones win over updates for the same timestamp, the removed element is not readded
+        execute("UPDATE %s SET m = m - ?, m[?] = ?  WHERE pk = ?", set(4), 4, 9, 1);
+        assertRows(execute("SELECT * FROM %s WHERE pk = ?", 1) , row(1, map(0, 1, 1, 2, 3, 4)));
+
+        // Checks that the update is taken into account before the removal
+        execute("UPDATE %s SET m[?] = ?,  m = m - ?  WHERE pk = ?", 5, 9, set(5), 1);
+        assertRows(execute("SELECT * FROM %s WHERE pk = ?", 1) , row(1, map(0, 1, 1, 2, 3, 4)));
+
+        // Checks that the set operation is merged with the change of the append and that the greatest value win
+        execute("UPDATE %s SET m[?] = ?, m = m + ?  WHERE pk = ?", 5, 9, map(5, 8, 6, 9), 1);
+        assertRows(execute("SELECT * FROM %s WHERE pk = ?", 1) , row(1, map(0, 1, 1, 2, 3, 4, 5, 9, 6, 9)));
+
+        execute("UPDATE %s SET m[?] = ?, m = m + ?  WHERE pk = ?", 7, 1, map(7, 2), 1);
+        assertRows(execute("SELECT * FROM %s WHERE pk = ?", 1) , row(1, map(0, 1, 1, 2, 3, 4, 5, 9, 6, 9, 7, 2)));
+    }
+
+    @Test
+    public void testMultipleOperationOnSetWithinTheSameQuery() throws Throwable
+    {
+        createTable("CREATE TABLE %s (pk int PRIMARY KEY, s set<int>)");
+        execute("INSERT INTO %s (pk, s) VALUES (1, {0, 1, 2})");
+
+        // Checks that the two operation are merged and that the tombstone always win
+        execute("UPDATE %s SET s = s + ? , s = s - ?  WHERE pk = ?", set(3, 4), set(3), 1);
+        assertRows(execute("SELECT * FROM %s WHERE pk = 1") , row(1, set(0, 1, 2, 4)));
+
+        execute("UPDATE %s SET s = s - ? , s = s + ?  WHERE pk = ?", set(3), set(3, 4), 1);
+        assertRows(execute("SELECT * FROM %s WHERE pk = 1") , row(1, set(0, 1, 2, 4)));
+    }
 }