You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ad...@apache.org on 2021/08/24 17:17:06 UTC

[cassandra] branch cassandra-3.0 updated: Fix secondary indexes on primary key columns skipping some writes

This is an automated email from the ASF dual-hosted git repository.

adelapena pushed a commit to branch cassandra-3.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cassandra-3.0 by this push:
     new c76ff1b  Fix secondary indexes on primary key columns skipping some writes
c76ff1b is described below

commit c76ff1ba14487d521c49d4b830b2d718d170b2e1
Author: Andrés de la Peña <a....@gmail.com>
AuthorDate: Tue Aug 24 17:57:53 2021 +0100

    Fix secondary indexes on primary key columns skipping some writes
    
    patch by Andrés de la Peña; reviewed by Benjamin Lerer for CASSANDRA-16868
---
 CHANGES.txt                                        |   1 +
 .../cassandra/index/internal/CassandraIndex.java   |   5 +-
 .../validation/entities/SecondaryIndexTest.java    | 140 ++++++++++++++++++++-
 3 files changed, 139 insertions(+), 7 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 4a8288f..9089836 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.26:
+ * Fix secondary indexes on primary key columns skipping some writes (CASSANDRA-16868)
  * Fix incorrect error message in LegacyLayout (CASSANDRA-15136)
  * Use JMX to validate nodetool --jobs parameter (CASSANDRA-16104)
  * Handle properly UnsatisfiedLinkError in NativeLibrary#getProcessID() (CASSANDRA-16578)
diff --git a/src/java/org/apache/cassandra/index/internal/CassandraIndex.java b/src/java/org/apache/cassandra/index/internal/CassandraIndex.java
index ad5dd4b..de3a974 100644
--- a/src/java/org/apache/cassandra/index/internal/CassandraIndex.java
+++ b/src/java/org/apache/cassandra/index/internal/CassandraIndex.java
@@ -436,7 +436,7 @@ public abstract class CassandraIndex implements Index
 
                 if (isPrimaryKeyIndex())
                     indexPrimaryKey(newRow.clustering(),
-                                    newRow.primaryKeyLivenessInfo(),
+                                    getPrimaryKeyIndexLiveness(newRow),
                                     newRow.deletion());
 
                 if (indexedColumn.isComplex())
@@ -514,10 +514,7 @@ public abstract class CassandraIndex implements Index
                     if (cell.isLive(nowInSec))
                     {
                         if (cellTimestamp > timestamp)
-                        {
                             timestamp = cellTimestamp;
-                            ttl = cell.ttl();
-                        }
                     }
                 }
                 return LivenessInfo.create(baseCfs.metadata, timestamp, ttl, nowInSec);
diff --git a/test/unit/org/apache/cassandra/cql3/validation/entities/SecondaryIndexTest.java b/test/unit/org/apache/cassandra/cql3/validation/entities/SecondaryIndexTest.java
index c2640a0..201571e 100644
--- a/test/unit/org/apache/cassandra/cql3/validation/entities/SecondaryIndexTest.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/entities/SecondaryIndexTest.java
@@ -1235,39 +1235,144 @@ public class SecondaryIndexTest extends CQLTester
     @Test
     public void testIndexOnPartitionKeyInsertExpiringColumn() throws Throwable
     {
+        testIndexOnPartitionKeyInsertExpiringColumn(false);
+    }
+
+    @Test
+    public void testIndexOnPartitionKeyInsertExpiringColumnWithFlush() throws Throwable
+    {
+        testIndexOnPartitionKeyInsertExpiringColumn(true);
+    }
+
+    private void testIndexOnPartitionKeyInsertExpiringColumn(boolean flushBeforeUpdate) throws Throwable
+    {
         createTable("CREATE TABLE %s (k1 int, k2 int, a int, b int, PRIMARY KEY ((k1, k2)))");
-        createIndex("CREATE INDEX on %s(k1)");
+        createIndex("CREATE INDEX ON %s(k1)");
         execute("INSERT INTO %s (k1, k2, a, b) VALUES (1, 2, 3, 4)");
         assertRows(execute("SELECT * FROM %s WHERE k1 = 1"), row(1, 2, 3, 4));
+
+        if (flushBeforeUpdate)
+            flush();
+
         execute("UPDATE %s USING TTL 1 SET b = 10 WHERE k1 = 1 AND k2 = 2");
         Thread.sleep(1000);
         assertRows(execute("SELECT * FROM %s WHERE k1 = 1"), row(1, 2, 3, null));
     }
 
     @Test
+    public void testIndexOnPartitionKeyOverridingExpiredRow() throws Throwable
+    {
+        createTable("CREATE TABLE %s (k1 int, k2 int, v int, PRIMARY KEY ((k1, k2)))");
+        createIndex("CREATE INDEX ON %s(k1)");
+
+        execute("UPDATE %s USING TTL 1 SET v = 3 WHERE k1 = 1 AND k2 = 2");
+        Thread.sleep(1000);
+
+        assertEmpty(execute("SELECT * FROM %s WHERE k1 = 1"));
+
+        execute("UPDATE %s SET v = 3 WHERE k1 = 1 AND k2 = 2");
+        assertRows(execute("SELECT * FROM %s WHERE k1 = 1"), row(1, 2, 3));
+    }
+
+    @Test
+    public void testIndexOnPartitionKeyOverridingDeletedRow() throws Throwable
+    {
+        createTable("CREATE TABLE %s (k1 int, k2 int, c int, v int, PRIMARY KEY ((k1, k2), c))");
+        createIndex("CREATE INDEX ON %s(k1)");
+
+        execute("INSERT INTO %s(k1, k2, c, v) VALUES (1, 2, 3, 4)");
+        execute("DELETE FROM %s WHERE k1 = 1 AND k2 = 2 AND c = 3");
+        execute("UPDATE %s SET v = 4 WHERE k1 = 1 AND k2 = 2 AND c = 3");
+
+        assertRows(execute("SELECT * FROM %s WHERE k1 = 1 AND k2 = 2 AND c = 3"), row(1, 2, 3, 4));
+        assertRows(execute("SELECT * FROM %s WHERE k1 = 1"), row(1, 2, 3, 4));
+    }
+
+    @Test
     public void testIndexOnClusteringKeyInsertExpiringColumn() throws Throwable
     {
+        testIndexOnClusteringKeyInsertExpiringColumn(false);
+    }
+
+    @Test
+    public void testIndexOnClusteringKeyInsertExpiringColumnWithFlush() throws Throwable
+    {
+        testIndexOnClusteringKeyInsertExpiringColumn(true);
+    }
+
+    private void testIndexOnClusteringKeyInsertExpiringColumn(boolean flushBeforeUpdate) throws Throwable
+    {
         createTable("CREATE TABLE %s (pk int, ck int, a int, b int, PRIMARY KEY (pk, ck))");
-        createIndex("CREATE INDEX on %s(ck)");
+        createIndex("CREATE INDEX ON %s(ck)");
         execute("INSERT INTO %s (pk, ck, a, b) VALUES (1, 2, 3, 4)");
         assertRows(execute("SELECT * FROM %s WHERE ck = 2"), row(1, 2, 3, 4));
+
+        if (flushBeforeUpdate)
+            flush();
+
         execute("UPDATE %s USING TTL 1 SET b = 10 WHERE pk = 1 AND ck = 2");
         Thread.sleep(1000);
         assertRows(execute("SELECT * FROM %s WHERE ck = 2"), row(1, 2, 3, null));
     }
 
     @Test
+    public void testIndexOnClusteringKeyOverridingExpiredRow() throws Throwable
+    {
+        createTable("CREATE TABLE %s (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
+        createIndex("CREATE INDEX ON %s(ck)");
+
+        execute("UPDATE %s USING TTL 1 SET v = 3 WHERE pk = 1 AND ck = 2");
+        Thread.sleep(1000);
+
+        assertEmpty(execute("SELECT * FROM %s WHERE ck = 2"));
+
+        execute("UPDATE %s SET v = 3 WHERE pk = 1 AND ck = 2");
+        assertRows(execute("SELECT * FROM %s WHERE ck = 2"), row(1, 2, 3));
+    }
+
+    @Test
+    public void testIndexOnClusteringKeyOverridingDeletedRow() throws Throwable
+    {
+        createTable("CREATE TABLE %s (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
+        createIndex("CREATE INDEX ON %s(ck)");
+
+        execute("INSERT INTO %s(pk, ck, v) VALUES (1, 2, 3)");
+        execute("DELETE FROM %s WHERE pk = 1 AND ck = 2");
+        execute("UPDATE %s SET v = 3 WHERE pk = 1 AND ck = 2");
+
+        assertRows(execute("SELECT * FROM %s WHERE pk = 1 AND ck = 2"), row(1, 2, 3));
+        assertRows(execute("SELECT * FROM %s WHERE ck = 2"), row(1, 2, 3));
+    }
+
+    @Test
     public void testIndexOnRegularColumnInsertExpiringColumn() throws Throwable
     {
+        testIndexOnRegularColumnInsertExpiringColumn(false);
+    }
+
+    @Test
+    public void testIndexOnRegularColumnInsertExpiringColumnWithFlush() throws Throwable
+    {
+        testIndexOnRegularColumnInsertExpiringColumn(true);
+    }
+
+    private void testIndexOnRegularColumnInsertExpiringColumn(boolean flushBeforeUpdate) throws Throwable
+    {
         createTable("CREATE TABLE %s (pk int, ck int, a int, b int, PRIMARY KEY (pk, ck))");
-        createIndex("CREATE INDEX on %s(a)");
+        createIndex("CREATE INDEX ON %s(a)");
         execute("INSERT INTO %s (pk, ck, a, b) VALUES (1, 2, 3, 4)");
         assertRows(execute("SELECT * FROM %s WHERE a = 3"), row(1, 2, 3, 4));
 
+        if (flushBeforeUpdate)
+            flush();
+
         execute("UPDATE %s USING TTL 1 SET b = 10 WHERE pk = 1 AND ck = 2");
         Thread.sleep(1000);
         assertRows(execute("SELECT * FROM %s WHERE a = 3"), row(1, 2, 3, null));
 
+        if (flushBeforeUpdate)
+            flush();
+
         execute("UPDATE %s USING TTL 1 SET a = 5 WHERE pk = 1 AND ck = 2");
         Thread.sleep(1000);
         assertEmpty(execute("SELECT * FROM %s WHERE a = 3"));
@@ -1275,6 +1380,35 @@ public class SecondaryIndexTest extends CQLTester
     }
 
     @Test
+    public void testIndexOnRegularColumnOverridingExpiredRow() throws Throwable
+    {
+        createTable("CREATE TABLE %s (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
+        createIndex("CREATE INDEX ON %s(v)");
+
+        execute("UPDATE %s USING TTL 1 SET v = 3 WHERE pk = 1 AND ck = 2");
+        Thread.sleep(1000);
+
+        assertEmpty(execute("SELECT * FROM %s WHERE v = 3"));
+
+        execute("UPDATE %s SET v = 3 WHERE pk = 1 AND ck = 2");
+        assertRows(execute("SELECT * FROM %s WHERE v = 3"), row(1, 2, 3));
+    }
+
+    @Test
+    public void testIndexOnRegularColumnOverridingDeletedRow() throws Throwable
+    {
+        createTable("CREATE TABLE %s (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
+        createIndex("CREATE INDEX ON %s(v)");
+
+        execute("INSERT INTO %s(pk, ck, v) VALUES (1, 2, 3)");
+        execute("DELETE FROM %s WHERE pk=1 AND ck=2");
+        execute("UPDATE %s SET v=3 WHERE pk=1 AND ck=2");
+
+        assertRows(execute("SELECT * FROM %s WHERE pk=1 AND ck=2"), row(1, 2, 3));
+        assertRows(execute("SELECT * FROM %s WHERE v=3"), row(1, 2, 3));
+    }
+
+    @Test
     public void testIndicesOnCompactTable() throws Throwable
     {
         assertInvalidMessage("COMPACT STORAGE with composite PRIMARY KEY allows no more than one column not part of the PRIMARY KEY (got: v1, v2)",

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org