You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by bl...@apache.org on 2017/03/10 09:17:06 UTC
[02/10] cassandra git commit: Fix queries updating multiple time the
same list
Fix queries updating multiple time the same list
patch by Benjamin Lerer; reviewed by Sylvain Lebresne for CASSANDRA-13130
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5ef8a8b4
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5ef8a8b4
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5ef8a8b4
Branch: refs/heads/cassandra-3.0
Commit: 5ef8a8b408d4c492f7f2ffbbbe6fce237140c7cb
Parents: e4be2d0
Author: Benjamin Lerer <b....@gmail.com>
Authored: Fri Mar 10 09:57:20 2017 +0100
Committer: Benjamin Lerer <b....@gmail.com>
Committed: Fri Mar 10 09:57:20 2017 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
src/java/org/apache/cassandra/cql3/Lists.java | 10 +-
.../apache/cassandra/cql3/UpdateParameters.java | 31 +++++-
.../validation/entities/CollectionsTest.java | 100 +++++++++++++++++++
4 files changed, 135 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5ef8a8b4/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 0982de9..09e4039 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.2.10
+ * Fix queries updating multiple time the same list (CASSANDRA-13130)
* Fix GRANT/REVOKE when keyspace isn't specified (CASSANDRA-13053)
* Avoid race on receiver by starting streaming sender thread after sending init message (CASSANDRA-12886)
* Fix "multiple versions of ant detected..." when running ant test (CASSANDRA-13232)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5ef8a8b4/src/java/org/apache/cassandra/cql3/Lists.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/Lists.java b/src/java/org/apache/cassandra/cql3/Lists.java
index da8c48a..cc75476 100644
--- a/src/java/org/apache/cassandra/cql3/Lists.java
+++ b/src/java/org/apache/cassandra/cql3/Lists.java
@@ -21,15 +21,18 @@ import static org.apache.cassandra.cql3.Constants.UNSET_VALUE;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
+import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.cql3.functions.Function;
import org.apache.cassandra.db.Cell;
import org.apache.cassandra.db.ColumnFamily;
import org.apache.cassandra.db.composites.CellName;
import org.apache.cassandra.db.composites.Composite;
+import org.apache.cassandra.db.composites.CompositesBuilder;
import org.apache.cassandra.db.marshal.Int32Type;
import org.apache.cassandra.db.marshal.ListType;
import org.apache.cassandra.exceptions.InvalidRequestException;
@@ -349,7 +352,7 @@ public abstract class Lists
if (index == ByteBufferUtil.UNSET_BYTE_BUFFER)
throw new InvalidRequestException("Invalid unset value for list index");
- List<Cell> existingList = params.getPrefetchedList(rowKey, column.name);
+ List<Cell> existingList = params.getPrefetchedList(rowKey, column.name, cf);
int idx = ByteBufferUtil.toInt(index);
if (existingList == null || existingList.size() == 0)
throw new InvalidRequestException("Attempted to set an element on a list which is null");
@@ -458,7 +461,7 @@ public abstract class Lists
public void execute(ByteBuffer rowKey, ColumnFamily cf, Composite prefix, UpdateParameters params) throws InvalidRequestException
{
assert column.type.isMultiCell() : "Attempted to delete from a frozen list";
- List<Cell> existingList = params.getPrefetchedList(rowKey, column.name);
+ List<Cell> existingList = params.getPrefetchedList(rowKey, column.name, cf);
// We want to call bind before possibly returning to reject queries where the value provided is not a list.
Term.Terminal value = t.bind(params.options);
@@ -505,7 +508,8 @@ public abstract class Lists
if (index == Constants.UNSET_VALUE)
return;
- List<Cell> existingList = params.getPrefetchedList(rowKey, column.name);
+ List<Cell> existingList = params.getPrefetchedList(rowKey, column.name, cf);
+
int idx = ByteBufferUtil.toInt(index.get(params.options.getProtocolVersion()));
if (existingList == null || existingList.size() == 0)
throw new InvalidRequestException("Attempted to delete an element from a list which is null");
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5ef8a8b4/src/java/org/apache/cassandra/cql3/UpdateParameters.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/UpdateParameters.java b/src/java/org/apache/cassandra/cql3/UpdateParameters.java
index e412585..65edef7 100644
--- a/src/java/org/apache/cassandra/cql3/UpdateParameters.java
+++ b/src/java/org/apache/cassandra/cql3/UpdateParameters.java
@@ -91,16 +91,39 @@ public class UpdateParameters
return new RangeTombstone(slice.start, slice.finish, timestamp - 1, localDeletionTime);
}
- public List<Cell> getPrefetchedList(ByteBuffer rowKey, ColumnIdentifier cql3ColumnName)
+ /**
+ * Returns the prefetched list with the already performed modifications.
+ * <p>If no modifications have been performed yet, this method returns the fetched list.
+ * If some modifications (updates or deletions) have already been made, the returned list
+ * is the result of merging the fetched list with the pending mutations.</p>
+ *
+ * @param rowKey the row key
+ * @param cql3ColumnName the column name
+ * @param cf the pending modifications
+ * @return the prefetched list with the already performed modifications
+ */
+ public List<Cell> getPrefetchedList(ByteBuffer rowKey, ColumnIdentifier cql3ColumnName, ColumnFamily cf)
{
if (prefetchedLists == null)
return Collections.emptyList();
CQL3Row row = prefetchedLists.get(rowKey);
- if (row == null)
- return Collections.<Cell>emptyList();
- List<Cell> cql3List = row.getMultiCellColumn(cql3ColumnName);
+ List<Cell> cql3List = row == null ? Collections.<Cell>emptyList() : row.getMultiCellColumn(cql3ColumnName);
+
+ if (!cf.isEmpty())
+ {
+ ColumnFamily currentCf = cf.cloneMe();
+
+ for (Cell c : cql3List)
+ currentCf.addColumn(c);
+
+ CFMetaData cfm = currentCf.metadata();
+ CQL3Row.RowIterator iterator = cfm.comparator.CQL3RowBuilder(cfm, timestamp).group(currentCf.iterator());
+ // We can only update one CQL3Row per partition key at a time (we don't allow IN for clustering key)
+ cql3List = iterator.hasNext() ? iterator.next().getMultiCellColumn(cql3ColumnName) : null;
+ }
+
return (cql3List == null) ? Collections.<Cell>emptyList() : cql3List;
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5ef8a8b4/test/unit/org/apache/cassandra/cql3/validation/entities/CollectionsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/validation/entities/CollectionsTest.java b/test/unit/org/apache/cassandra/cql3/validation/entities/CollectionsTest.java
index 115b755..99d9695 100644
--- a/test/unit/org/apache/cassandra/cql3/validation/entities/CollectionsTest.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/entities/CollectionsTest.java
@@ -648,4 +648,104 @@ public class CollectionsTest extends CQLTester
assertInvalidMessage("The data cannot be deserialized as a map",
"INSERT INTO %s (pk, m) VALUES (?, ?)", 1, -1);
}
+
+ @Test
+ public void testMultipleOperationOnListWithinTheSameQuery() throws Throwable
+ {
+ createTable("CREATE TABLE %s (pk int PRIMARY KEY, l list<int>)");
+ execute("INSERT INTO %s (pk, l) VALUES (1, [1, 2, 3, 4])");
+
+ // Checks that when the same element is updated twice the update with the greatest value is the one taken into account
+ execute("UPDATE %s SET l[?] = ?, l[?] = ? WHERE pk = ?", 2, 7, 2, 8, 1);
+ assertRows(execute("SELECT * FROM %s WHERE pk = ?", 1) , row(1, list(1, 2, 8, 4)));
+
+ execute("UPDATE %s SET l[?] = ?, l[?] = ? WHERE pk = ?", 2, 9, 2, 6, 1);
+ assertRows(execute("SELECT * FROM %s WHERE pk = ?", 1) , row(1, list(1, 2, 9, 4)));
+
+ // Checks that deleting the same index twice deletes the element at that index
+ // and the element that follows it.
+ execute("DELETE l[?], l[?] FROM %s WHERE pk = ?", 2, 2, 1);
+ assertRows(execute("SELECT * FROM %s WHERE pk = ?", 1) , row(1, list(1, 2)));
+
+ // Checks that the set operation is performed on the added elements and that the greatest value wins
+ execute("UPDATE %s SET l = l + ?, l[?] = ? WHERE pk = ?", list(3, 4), 3, 7, 1);
+ assertRows(execute("SELECT * FROM %s WHERE pk = ?", 1) , row(1, list(1, 2, 3, 7)));
+
+ execute("UPDATE %s SET l = l + ?, l[?] = ? WHERE pk = ?", list(6, 8), 4, 5, 1);
+ assertRows(execute("SELECT * FROM %s WHERE pk = ?", 1) , row(1, list(1, 2, 3, 7, 6, 8)));
+
+ // Checks that the order of the operations matters
+ assertInvalidMessage("List index 6 out of bound, list has size 6",
+ "UPDATE %s SET l[?] = ?, l = l + ? WHERE pk = ?", 6, 5, list(9), 1);
+
+ // Checks that the updated element is deleted.
+ execute("UPDATE %s SET l[?] = ? , l = l - ? WHERE pk = ?", 2, 8, list(8), 1);
+ assertRows(execute("SELECT * FROM %s WHERE pk = ?", 1) , row(1, list(1, 2, 7, 6)));
+
+ // Checks that we cannot update an element that has been removed.
+ assertInvalidMessage("List index 3 out of bound, list has size 3",
+ "UPDATE %s SET l = l - ?, l[?] = ? WHERE pk = ?", list(6), 3, 4, 1);
+
+ // Checks that the element is updated before the other ones are shifted.
+ execute("UPDATE %s SET l[?] = ? , l = l - ? WHERE pk = ?", 2, 8, list(1), 1);
+ assertRows(execute("SELECT * FROM %s WHERE pk = ?", 1) , row(1, list(2, 8, 6)));
+
+ // Checks that the elements are shifted before the element is updated.
+ execute("UPDATE %s SET l = l - ?, l[?] = ? WHERE pk = ?", list(2, 6), 0, 9, 1);
+ assertRows(execute("SELECT * FROM %s WHERE pk = ?", 1) , row(1, list(9)));
+ }
+
+ @Test
+ public void testMultipleOperationOnMapWithinTheSameQuery() throws Throwable
+ {
+ createTable("CREATE TABLE %s (pk int PRIMARY KEY, m map<int, int>)");
+ execute("INSERT INTO %s (pk, m) VALUES (1, {0 : 1, 1 : 2, 2 : 3, 3 : 4})");
+
+ // Checks that when the same element is updated twice the update with the greatest value is the one taken into account
+ execute("UPDATE %s SET m[?] = ?, m[?] = ? WHERE pk = ?", 2, 7, 2, 8, 1);
+ assertRows(execute("SELECT * FROM %s WHERE pk = 1") , row(1, map(0, 1, 1, 2, 2, 8, 3, 4)));
+
+ execute("UPDATE %s SET m[?] = ?, m[?] = ? WHERE pk = ?", 2, 9, 2, 6, 1);
+ assertRows(execute("SELECT * FROM %s WHERE pk = 1") , row(1, map(0, 1, 1, 2, 2, 9, 3, 4)));
+
+ // Checks that deleting twice the same element has no side effect
+ execute("DELETE m[?], m[?] FROM %s WHERE pk = ?", 2, 2, 1);
+ assertRows(execute("SELECT * FROM %s WHERE pk = ?", 1) , row(1, map(0, 1, 1, 2, 3, 4)));
+
+ // Checks that the set operation is performed on the added elements and that the greatest value wins
+ execute("UPDATE %s SET m = m + ?, m[?] = ? WHERE pk = ?", map(4, 5), 4, 7, 1);
+ assertRows(execute("SELECT * FROM %s WHERE pk = ?", 1) , row(1, map(0, 1, 1, 2, 3, 4, 4, 7)));
+
+ execute("UPDATE %s SET m = m + ?, m[?] = ? WHERE pk = ?", map(4, 8), 4, 6, 1);
+ assertRows(execute("SELECT * FROM %s WHERE pk = ?", 1) , row(1, map(0, 1, 1, 2, 3, 4, 4, 8)));
+
+ // Checks that, as tombstones win over updates for the same timestamp, the removed element is not readded
+ execute("UPDATE %s SET m = m - ?, m[?] = ? WHERE pk = ?", set(4), 4, 9, 1);
+ assertRows(execute("SELECT * FROM %s WHERE pk = ?", 1) , row(1, map(0, 1, 1, 2, 3, 4)));
+
+ // Checks that the update is taken into account before the removal
+ execute("UPDATE %s SET m[?] = ?, m = m - ? WHERE pk = ?", 5, 9, set(5), 1);
+ assertRows(execute("SELECT * FROM %s WHERE pk = ?", 1) , row(1, map(0, 1, 1, 2, 3, 4)));
+
+ // Checks that the set operation is merged with the changes from the append and that the greatest value wins
+ execute("UPDATE %s SET m[?] = ?, m = m + ? WHERE pk = ?", 5, 9, map(5, 8, 6, 9), 1);
+ assertRows(execute("SELECT * FROM %s WHERE pk = ?", 1) , row(1, map(0, 1, 1, 2, 3, 4, 5, 9, 6, 9)));
+
+ execute("UPDATE %s SET m[?] = ?, m = m + ? WHERE pk = ?", 7, 1, map(7, 2), 1);
+ assertRows(execute("SELECT * FROM %s WHERE pk = ?", 1) , row(1, map(0, 1, 1, 2, 3, 4, 5, 9, 6, 9, 7, 2)));
+ }
+
+ @Test
+ public void testMultipleOperationOnSetWithinTheSameQuery() throws Throwable
+ {
+ createTable("CREATE TABLE %s (pk int PRIMARY KEY, s set<int>)");
+ execute("INSERT INTO %s (pk, s) VALUES (1, {0, 1, 2})");
+
+ // Checks that the two operations are merged and that the tombstone always wins
+ execute("UPDATE %s SET s = s + ? , s = s - ? WHERE pk = ?", set(3, 4), set(3), 1);
+ assertRows(execute("SELECT * FROM %s WHERE pk = 1") , row(1, set(0, 1, 2, 4)));
+
+ execute("UPDATE %s SET s = s - ? , s = s + ? WHERE pk = ?", set(3), set(3, 4), 1);
+ assertRows(execute("SELECT * FROM %s WHERE pk = 1") , row(1, set(0, 1, 2, 4)));
+ }
}