You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ad...@apache.org on 2021/08/24 17:17:06 UTC
[cassandra] branch cassandra-3.0 updated: Fix secondary indexes on
primary key columns skipping some writes
This is an automated email from the ASF dual-hosted git repository.
adelapena pushed a commit to branch cassandra-3.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cassandra-3.0 by this push:
new c76ff1b Fix secondary indexes on primary key columns skipping some writes
c76ff1b is described below
commit c76ff1ba14487d521c49d4b830b2d718d170b2e1
Author: Andrés de la Peña <a....@gmail.com>
AuthorDate: Tue Aug 24 17:57:53 2021 +0100
Fix secondary indexes on primary key columns skipping some writes
patch by Andrés de la Peña; reviewed by Benjamin Lerer for CASSANDRA-16868
---
CHANGES.txt | 1 +
.../cassandra/index/internal/CassandraIndex.java | 5 +-
.../validation/entities/SecondaryIndexTest.java | 140 ++++++++++++++++++++-
3 files changed, 139 insertions(+), 7 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 4a8288f..9089836 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.0.26:
+ * Fix secondary indexes on primary key columns skipping some writes (CASSANDRA-16868)
* Fix incorrect error message in LegacyLayout (CASSANDRA-15136)
* Use JMX to validate nodetool --jobs parameter (CASSANDRA-16104)
* Handle properly UnsatisfiedLinkError in NativeLibrary#getProcessID() (CASSANDRA-16578)
diff --git a/src/java/org/apache/cassandra/index/internal/CassandraIndex.java b/src/java/org/apache/cassandra/index/internal/CassandraIndex.java
index ad5dd4b..de3a974 100644
--- a/src/java/org/apache/cassandra/index/internal/CassandraIndex.java
+++ b/src/java/org/apache/cassandra/index/internal/CassandraIndex.java
@@ -436,7 +436,7 @@ public abstract class CassandraIndex implements Index
if (isPrimaryKeyIndex())
indexPrimaryKey(newRow.clustering(),
- newRow.primaryKeyLivenessInfo(),
+ getPrimaryKeyIndexLiveness(newRow),
newRow.deletion());
if (indexedColumn.isComplex())
@@ -514,10 +514,7 @@ public abstract class CassandraIndex implements Index
if (cell.isLive(nowInSec))
{
if (cellTimestamp > timestamp)
- {
timestamp = cellTimestamp;
- ttl = cell.ttl();
- }
}
}
return LivenessInfo.create(baseCfs.metadata, timestamp, ttl, nowInSec);
diff --git a/test/unit/org/apache/cassandra/cql3/validation/entities/SecondaryIndexTest.java b/test/unit/org/apache/cassandra/cql3/validation/entities/SecondaryIndexTest.java
index c2640a0..201571e 100644
--- a/test/unit/org/apache/cassandra/cql3/validation/entities/SecondaryIndexTest.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/entities/SecondaryIndexTest.java
@@ -1235,39 +1235,144 @@ public class SecondaryIndexTest extends CQLTester
@Test
public void testIndexOnPartitionKeyInsertExpiringColumn() throws Throwable
{
+ testIndexOnPartitionKeyInsertExpiringColumn(false);
+ }
+
+ @Test
+ public void testIndexOnPartitionKeyInsertExpiringColumnWithFlush() throws Throwable
+ {
+ testIndexOnPartitionKeyInsertExpiringColumn(true);
+ }
+
+ private void testIndexOnPartitionKeyInsertExpiringColumn(boolean flushBeforeUpdate) throws Throwable
+ {
createTable("CREATE TABLE %s (k1 int, k2 int, a int, b int, PRIMARY KEY ((k1, k2)))");
- createIndex("CREATE INDEX on %s(k1)");
+ createIndex("CREATE INDEX ON %s(k1)");
execute("INSERT INTO %s (k1, k2, a, b) VALUES (1, 2, 3, 4)");
assertRows(execute("SELECT * FROM %s WHERE k1 = 1"), row(1, 2, 3, 4));
+
+ if (flushBeforeUpdate)
+ flush();
+
execute("UPDATE %s USING TTL 1 SET b = 10 WHERE k1 = 1 AND k2 = 2");
Thread.sleep(1000);
assertRows(execute("SELECT * FROM %s WHERE k1 = 1"), row(1, 2, 3, null));
}
@Test
+ public void testIndexOnPartitionKeyOverridingExpiredRow() throws Throwable
+ {
+ createTable("CREATE TABLE %s (k1 int, k2 int, v int, PRIMARY KEY ((k1, k2)))");
+ createIndex("CREATE INDEX ON %s(k1)");
+
+ execute("UPDATE %s USING TTL 1 SET v = 3 WHERE k1 = 1 AND k2 = 2");
+ Thread.sleep(1000);
+
+ assertEmpty(execute("SELECT * FROM %s WHERE k1 = 1"));
+
+ execute("UPDATE %s SET v = 3 WHERE k1 = 1 AND k2 = 2");
+ assertRows(execute("SELECT * FROM %s WHERE k1 = 1"), row(1, 2, 3));
+ }
+
+ @Test
+ public void testIndexOnPartitionKeyOverridingDeletedRow() throws Throwable
+ {
+ createTable("CREATE TABLE %s (k1 int, k2 int, c int, v int, PRIMARY KEY ((k1, k2), c))");
+ createIndex("CREATE INDEX ON %s(k1)");
+
+ execute("INSERT INTO %s(k1, k2, c, v) VALUES (1, 2, 3, 4)");
+ execute("DELETE FROM %s WHERE k1 = 1 AND k2 = 2 AND c = 3");
+ execute("UPDATE %s SET v = 4 WHERE k1 = 1 AND k2 = 2 AND c = 3");
+
+ assertRows(execute("SELECT * FROM %s WHERE k1 = 1 AND k2 = 2 AND c = 3"), row(1, 2, 3, 4));
+ assertRows(execute("SELECT * FROM %s WHERE k1 = 1"), row(1, 2, 3, 4));
+ }
+
+ @Test
public void testIndexOnClusteringKeyInsertExpiringColumn() throws Throwable
{
+ testIndexOnClusteringKeyInsertExpiringColumn(false);
+ }
+
+ @Test
+ public void testIndexOnClusteringKeyInsertExpiringColumnWithFlush() throws Throwable
+ {
+ testIndexOnClusteringKeyInsertExpiringColumn(true);
+ }
+
+ private void testIndexOnClusteringKeyInsertExpiringColumn(boolean flushBeforeUpdate) throws Throwable
+ {
createTable("CREATE TABLE %s (pk int, ck int, a int, b int, PRIMARY KEY (pk, ck))");
- createIndex("CREATE INDEX on %s(ck)");
+ createIndex("CREATE INDEX ON %s(ck)");
execute("INSERT INTO %s (pk, ck, a, b) VALUES (1, 2, 3, 4)");
assertRows(execute("SELECT * FROM %s WHERE ck = 2"), row(1, 2, 3, 4));
+
+ if (flushBeforeUpdate)
+ flush();
+
execute("UPDATE %s USING TTL 1 SET b = 10 WHERE pk = 1 AND ck = 2");
Thread.sleep(1000);
assertRows(execute("SELECT * FROM %s WHERE ck = 2"), row(1, 2, 3, null));
}
@Test
+ public void testIndexOnClusteringKeyOverridingExpiredRow() throws Throwable
+ {
+ createTable("CREATE TABLE %s (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
+ createIndex("CREATE INDEX ON %s(ck)");
+
+ execute("UPDATE %s USING TTL 1 SET v = 3 WHERE pk = 1 AND ck = 2");
+ Thread.sleep(1000);
+
+ assertEmpty(execute("SELECT * FROM %s WHERE ck = 2"));
+
+ execute("UPDATE %s SET v = 3 WHERE pk = 1 AND ck = 2");
+ assertRows(execute("SELECT * FROM %s WHERE ck = 2"), row(1, 2, 3));
+ }
+
+ @Test
+ public void testIndexOnClusteringKeyOverridingDeletedRow() throws Throwable
+ {
+ createTable("CREATE TABLE %s (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
+ createIndex("CREATE INDEX ON %s(ck)");
+
+ execute("INSERT INTO %s(pk, ck, v) VALUES (1, 2, 3)");
+ execute("DELETE FROM %s WHERE pk = 1 AND ck = 2");
+ execute("UPDATE %s SET v = 3 WHERE pk = 1 AND ck = 2");
+
+ assertRows(execute("SELECT * FROM %s WHERE pk = 1 AND ck = 2"), row(1, 2, 3));
+ assertRows(execute("SELECT * FROM %s WHERE ck = 2"), row(1, 2, 3));
+ }
+
+ @Test
public void testIndexOnRegularColumnInsertExpiringColumn() throws Throwable
{
+ testIndexOnRegularColumnInsertExpiringColumn(false);
+ }
+
+ @Test
+ public void testIndexOnRegularColumnInsertExpiringColumnWithFlush() throws Throwable
+ {
+ testIndexOnRegularColumnInsertExpiringColumn(true);
+ }
+
+ private void testIndexOnRegularColumnInsertExpiringColumn(boolean flushBeforeUpdate) throws Throwable
+ {
createTable("CREATE TABLE %s (pk int, ck int, a int, b int, PRIMARY KEY (pk, ck))");
- createIndex("CREATE INDEX on %s(a)");
+ createIndex("CREATE INDEX ON %s(a)");
execute("INSERT INTO %s (pk, ck, a, b) VALUES (1, 2, 3, 4)");
assertRows(execute("SELECT * FROM %s WHERE a = 3"), row(1, 2, 3, 4));
+ if (flushBeforeUpdate)
+ flush();
+
execute("UPDATE %s USING TTL 1 SET b = 10 WHERE pk = 1 AND ck = 2");
Thread.sleep(1000);
assertRows(execute("SELECT * FROM %s WHERE a = 3"), row(1, 2, 3, null));
+ if (flushBeforeUpdate)
+ flush();
+
execute("UPDATE %s USING TTL 1 SET a = 5 WHERE pk = 1 AND ck = 2");
Thread.sleep(1000);
assertEmpty(execute("SELECT * FROM %s WHERE a = 3"));
@@ -1275,6 +1380,35 @@ public class SecondaryIndexTest extends CQLTester
}
@Test
+ public void testIndexOnRegularColumnOverridingExpiredRow() throws Throwable
+ {
+ createTable("CREATE TABLE %s (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
+ createIndex("CREATE INDEX ON %s(v)");
+
+ execute("UPDATE %s USING TTL 1 SET v = 3 WHERE pk = 1 AND ck = 2");
+ Thread.sleep(1000);
+
+ assertEmpty(execute("SELECT * FROM %s WHERE v = 3"));
+
+ execute("UPDATE %s SET v = 3 WHERE pk = 1 AND ck = 2");
+ assertRows(execute("SELECT * FROM %s WHERE v = 3"), row(1, 2, 3));
+ }
+
+ @Test
+ public void testIndexOnRegularColumnOverridingDeletedRow() throws Throwable
+ {
+ createTable("CREATE TABLE %s (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
+ createIndex("CREATE INDEX ON %s(v)");
+
+ execute("INSERT INTO %s(pk, ck, v) VALUES (1, 2, 3)");
+ execute("DELETE FROM %s WHERE pk=1 AND ck=2");
+ execute("UPDATE %s SET v=3 WHERE pk=1 AND ck=2");
+
+ assertRows(execute("SELECT * FROM %s WHERE pk=1 AND ck=2"), row(1, 2, 3));
+ assertRows(execute("SELECT * FROM %s WHERE v=3"), row(1, 2, 3));
+ }
+
+ @Test
public void testIndicesOnCompactTable() throws Throwable
{
assertInvalidMessage("COMPACT STORAGE with composite PRIMARY KEY allows no more than one column not part of the PRIMARY KEY (got: v1, v2)",
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org