You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ad...@apache.org on 2020/06/05 12:21:08 UTC
[cassandra-dtest] branch master updated: Add tests for
CASSANDRA-8272 and CASSANDRA-8273
This is an automated email from the ASF dual-hosted git repository.
adelapena pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/cassandra-dtest.git
The following commit(s) were added to refs/heads/master by this push:
new 68f05b0 Add tests for CASSANDRA-8272 and CASSANDRA-8273
68f05b0 is described below
commit 68f05b02842ccf4b2859d35a057d3be77d3313ab
Author: Andrés de la Peña <a....@gmail.com>
AuthorDate: Tue Mar 17 20:11:35 2020 +0000
Add tests for CASSANDRA-8272 and CASSANDRA-8273
---
replica_side_filtering_test.py | 558 +++++++++++++++++++++++++++++++++++++++++
1 file changed, 558 insertions(+)
diff --git a/replica_side_filtering_test.py b/replica_side_filtering_test.py
new file mode 100644
index 0000000..c38fcfc
--- /dev/null
+++ b/replica_side_filtering_test.py
@@ -0,0 +1,558 @@
+from abc import abstractmethod
+
+import pytest
+from cassandra import ConsistencyLevel as CL
+from cassandra.query import SimpleStatement
+
+from dtest import Tester, create_ks
+from tools.assertions import (assert_all, assert_none, assert_one)
+
+since = pytest.mark.since
+keyspace = 'ks'
+
+class ReplicaSideFiltering(Tester):
+ """
+ @jira_ticket CASSANDRA-8272, CASSANDRA-8273
+ Base consistency test for queries involving replica-side filtering when some of the replicas have stale data.
+ """
+ __test__ = False
+
+ def _prepare_cluster(self, create_table, create_index=None, both_nodes=None, only_node1=None, only_node2=None):
+ """
+ :param create_table a table creation CQL query
+ :param create_index an index creation CQL query, that will be executed depending on ``_create_index`` method
+ :param both_nodes queries to be executed in both nodes with CL=ALL
+ :param only_node1 queries to be executed in the first node only, with CL=ONE, while the second node is stopped
+ :param only_node2 queries to be executed in the second node only, with CL=ONE, while the first node is stopped
+ :return: a session connected exclusively to the first node with CL=ALL
+ """
+ cluster = self.cluster
+
+ # Disable hinted handoff and set batch commit log so this doesn't interfere with the test
+ if only_node1 or only_node2:
+ cluster.set_configuration_options(values={'hinted_handoff_enabled': False})
+ cluster.set_batch_commitlog(enabled=True)
+
+ cluster.populate(2)
+ node1, node2 = cluster.nodelist()
+ cluster.start()
+
+ session = self.patient_exclusive_cql_connection(node1, consistency_level=CL.ALL)
+ create_ks(session, keyspace, 2)
+ session.execute("USE " + keyspace)
+
+ # create the table
+ session.execute(create_table)
+
+ # create the index if it's required
+ if self.create_index():
+ session.execute(create_index)
+
+ # execute the queries for both nodes with CL=ALL
+ if both_nodes:
+ for q in both_nodes:
+ session.execute(q)
+
+ # execute the queries for the first node only with the second node stopped
+ if only_node1:
+ self._execute_isolated(node_to_update=node1, node_to_stop=node2, queries=only_node1)
+
+ # execute the queries for the second node only with the first node stopped
+ if only_node2:
+ self._execute_isolated(node_to_update=node2, node_to_stop=node1, queries=only_node2)
+
+ # set the session with CL=ALL for testing queries with the created scenario
+ self.session = self.patient_exclusive_cql_connection(node1, keyspace=keyspace, consistency_level=CL.ALL)
+
+ def _execute_isolated(self, node_to_update, node_to_stop, queries):
+ node_to_stop.flush()
+ node_to_stop.stop()
+ session = self.patient_cql_connection(node_to_update, keyspace, consistency_level=CL.ONE)
+ for q in queries:
+ session.execute(q)
+ node_to_stop.start(wait_other_notice=True)
+
+ def _assert_none(self, query):
+ """
+ Assert query returns nothing.
+ @param query Query to run
+ """
+ decorated_query = self._decorate_query(query)
+ assert_none(self.session, decorated_query)
+
+ def _assert_one(self, query, row):
+ """
+ Assert query returns one row.
+ @param query Query to run
+ @param row Expected result row from query
+ """
+ decorated_query = self._decorate_query(query)
+ assert_one(self.session, decorated_query, row)
+
+ def _assert_all(self, query, rows):
+ """
+ Assert query returns all expected rows in the correct order.
+ @param query Query to run
+ @param rows Expected result rows from query
+ """
+ decorated_query = self._decorate_query(query)
+ assert_all(self.session, decorated_query, rows)
+
+ def _decorate_query(self, query):
+ return query if self.create_index() else query + " ALLOW FILTERING"
+
+ def _skip_if_index_on_static_is_not_supported(self):
+ if self.create_index() and self.cluster.version() < '3.4':
+ pytest.skip('Secondary indexes on static column are not supported before 3.4 (CASSANDRA-8103)')
+
+ def _skip_if_filtering_partition_columns_is_not_supported(self):
+ if not self.create_index() and self.cluster.version() < '3.11':
+ pytest.skip('Filtering of partition key parts is not supported before 3.11 (CASSANDRA-13275)')
+
+ @abstractmethod
+ def create_index(self):
+ """
+ :return:``True`` if the tests should create an index, ``False`` if they should use ``ALLOW FILTERING`` instead
+ """
+ pass
+
+ def test_update_on_skinny_table(self):
+ self._prepare_cluster(
+ create_table="CREATE TABLE t (k int PRIMARY KEY, v text)",
+ create_index="CREATE INDEX ON t(v)",
+ both_nodes=["INSERT INTO t(k, v) VALUES (0, 'old')"],
+ only_node1=["UPDATE t SET v = 'new' WHERE k = 0"])
+
+ self._assert_none("SELECT * FROM t WHERE v = 'old'")
+ self._assert_one("SELECT * FROM t WHERE v = 'new'", row=[0, 'new'])
+
+ def test_update_on_wide_table(self):
+ self._prepare_cluster(
+ create_table="CREATE TABLE t (k int, c int, v text, s int STATIC, PRIMARY KEY (k, c))",
+ create_index="CREATE INDEX ON t(v)",
+ both_nodes=["INSERT INTO t(k, s) VALUES (0, 9)",
+ "INSERT INTO t(k, c, v) VALUES (0, -1, 'old')",
+ "INSERT INTO t(k, c, v) VALUES (0, 0, 'old')",
+ "INSERT INTO t(k, c, v) VALUES (0, 1, 'old')"],
+ only_node1=["UPDATE t SET v = 'new' WHERE k = 0 AND c = 0"])
+
+ self._assert_all("SELECT * FROM t WHERE v = 'old'", rows=[[0, -1, 9, 'old'], [0, 1, 9, 'old']])
+ self._assert_all("SELECT * FROM t WHERE v = 'new'", rows=[[0, 0, 9, 'new']])
+
+ def test_update_on_static_column_with_empty_partition(self):
+ self._skip_if_index_on_static_is_not_supported()
+ self._prepare_cluster(
+ create_table="CREATE TABLE t (k int, c int, v int, s text STATIC, PRIMARY KEY (k, c))",
+ create_index="CREATE INDEX ON t(s)",
+ both_nodes=["INSERT INTO t(k, s) VALUES (0, 'old')",
+ "INSERT INTO t(k, s) VALUES (1, 'old')"],
+ only_node1=["UPDATE t SET s = 'new' WHERE k = 0"])
+
+ self._assert_one("SELECT * FROM t WHERE s = 'old'", row=[1, None, 'old', None])
+ self._assert_one("SELECT * FROM t WHERE s = 'new'", row=[0, None, 'new', None])
+
+ def test_update_on_static_column_with_not_empty_partition(self):
+ self._skip_if_index_on_static_is_not_supported()
+ self._prepare_cluster(
+ create_table="CREATE TABLE t (k int, c int, v int, s text STATIC, PRIMARY KEY (k, c))",
+ create_index="CREATE INDEX ON t(s)",
+ both_nodes=["INSERT INTO t(k, s) VALUES (0, 'old')",
+ "INSERT INTO t(k, s) VALUES (1, 'old')",
+ "INSERT INTO t(k, c, v) VALUES (0, 10, 100)",
+ "INSERT INTO t(k, c, v) VALUES (0, 20, 200)",
+ "INSERT INTO t(k, c, v) VALUES (1, 30, 300)",
+ "INSERT INTO t(k, c, v) VALUES (1, 40, 400)"],
+ only_node1=["UPDATE t SET s = 'new' WHERE k = 0"])
+
+ self._assert_all("SELECT * FROM t WHERE s = 'old'", rows=[[1, 30, 'old', 300], [1, 40, 'old', 400]])
+ self._assert_all("SELECT * FROM t WHERE s = 'new'", rows=[[0, 10, 'new', 100], [0, 20, 'new', 200]])
+
+ def test_update_on_collection(self):
+ self._prepare_cluster(
+ create_table="CREATE TABLE t (k int PRIMARY KEY, v set<int>)",
+ create_index="CREATE INDEX ON t(v)",
+ both_nodes=["INSERT INTO t(k, v) VALUES (0, {-1, 0, 1})"],
+ only_node1=["UPDATE t SET v = v - {0} WHERE k = 0"])
+
+ self._assert_none("SELECT * FROM t WHERE v CONTAINS 0")
+ self._assert_one("SELECT * FROM t WHERE v CONTAINS 1", row=[0, [-1, 1]])
+
+ def test_complementary_deletion_with_limit_on_partition_key_column_with_empty_partitions(self):
+ self._skip_if_filtering_partition_columns_is_not_supported()
+ self._prepare_cluster(
+ create_table="CREATE TABLE t (k1 int, k2 int, c int, s int STATIC, PRIMARY KEY ((k1, k2), c))",
+ create_index="CREATE INDEX ON t(k1)",
+ both_nodes=["INSERT INTO t (k1, k2, s) VALUES (0, 1, 10)",
+ "INSERT INTO t (k1, k2, s) VALUES (0, 2, 20)"],
+ only_node1=["DELETE FROM t WHERE k1 = 0 AND k2 = 1"],
+ only_node2=["DELETE FROM t WHERE k1 = 0 AND k2 = 2"])
+
+ self._assert_none("SELECT * FROM t WHERE k1 = 0 LIMIT 1")
+
+ def test_complementary_deletion_with_limit_on_partition_key_column_with_not_empty_partitions(self):
+ self._skip_if_filtering_partition_columns_is_not_supported()
+ self._prepare_cluster(
+ create_table="CREATE TABLE t (k1 int, k2 int, c int, s int STATIC, PRIMARY KEY ((k1, k2), c))",
+ create_index="CREATE INDEX ON t(k1)",
+ both_nodes=["INSERT INTO t (k1, k2, c, s) VALUES (0, 1, 10, 100)",
+ "INSERT INTO t (k1, k2, c, s) VALUES (0, 2, 20, 200)"],
+ only_node1=["DELETE FROM t WHERE k1 = 0 AND k2 = 1"],
+ only_node2=["DELETE FROM t WHERE k1 = 0 AND k2 = 2"])
+
+ self._assert_none("SELECT * FROM t WHERE k1 = 0 LIMIT 1")
+
+ def test_complementary_deletion_with_limit_on_clustering_key_column(self):
+ self._prepare_cluster(
+ create_table="CREATE TABLE t (k int, c int, PRIMARY KEY (k, c))",
+ create_index="CREATE INDEX ON t(c)",
+ both_nodes=["INSERT INTO t (k, c) VALUES (1, 0)",
+ "INSERT INTO t (k, c) VALUES (2, 0)"],
+ only_node1=["DELETE FROM t WHERE k = 1"],
+ only_node2=["DELETE FROM t WHERE k = 2"])
+
+ self._assert_none("SELECT * FROM t WHERE c = 0 LIMIT 1")
+
+ def test_complementary_deletion_with_limit_on_static_column_with_empty_partitions(self):
+ self._skip_if_index_on_static_is_not_supported()
+ self._prepare_cluster(
+ create_table="CREATE TABLE t (k int, c int, s int STATIC, PRIMARY KEY (k, c))",
+ create_index="CREATE INDEX ON t(s)",
+ both_nodes=["INSERT INTO t (k, s) VALUES (1, 0)",
+ "INSERT INTO t (k, s) VALUES (2, 0)"],
+ only_node1=["DELETE FROM t WHERE k = 1"],
+ only_node2=["DELETE FROM t WHERE k = 2"])
+
+ self._assert_none("SELECT * FROM t WHERE s = 0 LIMIT 1")
+
+ def test_complementary_deletion_with_limit_on_static_column_with_empty_partitions_and_rows_after(self):
+ self._skip_if_index_on_static_is_not_supported()
+ self._prepare_cluster(
+ create_table="CREATE TABLE t (k int, c int, s int STATIC, PRIMARY KEY (k, c))",
+ create_index="CREATE INDEX ON t(s)",
+ both_nodes=["INSERT INTO t (k, s) VALUES (1, 0)",
+ "INSERT INTO t (k, s) VALUES (2, 0)",
+ "INSERT INTO t (k, s) VALUES (3, 0)",
+ "INSERT INTO t (k, c) VALUES (3, 1)",
+ "INSERT INTO t (k, c) VALUES (3, 2)"],
+ only_node1=["DELETE FROM t WHERE k = 1"],
+ only_node2=["DELETE FROM t WHERE k = 2"])
+
+ self._assert_one("SELECT * FROM t WHERE s = 0 LIMIT 1", row=[3, 1, 0])
+ self._assert_all("SELECT * FROM t WHERE s = 0 LIMIT 10", rows=[[3, 1, 0], [3, 2, 0]])
+ self._assert_all("SELECT * FROM t WHERE s = 0", rows=[[3, 1, 0], [3, 2, 0]])
+
+ def test_complementary_deletion_with_limit_on_static_column_with_not_empty_partitions(self):
+ self._skip_if_index_on_static_is_not_supported()
+ self._prepare_cluster(
+ create_table="CREATE TABLE t (k int, c int, s int STATIC, v int, PRIMARY KEY (k, c))",
+ create_index="CREATE INDEX ON t(s)",
+ both_nodes=["INSERT INTO t (k, c, v, s) VALUES (1, 10, 100, 0)",
+ "INSERT INTO t (k, c, v, s) VALUES (2, 20, 200, 0)"],
+ only_node1=["DELETE FROM t WHERE k = 1"],
+ only_node2=["DELETE FROM t WHERE k = 2"])
+
+ self._assert_none("SELECT * FROM t WHERE s = 0 LIMIT 1")
+
+ def test_complementary_deletion_with_limit_on_static_column_with_not_empty_partitions_and_rows_after(self):
+ self._skip_if_index_on_static_is_not_supported()
+ self._prepare_cluster(
+ create_table="CREATE TABLE t (k int, c int, s int STATIC, v int, PRIMARY KEY(k, c))",
+ create_index="CREATE INDEX ON t(s)",
+ both_nodes=["INSERT INTO t (k, c, v, s) VALUES (1, 10, 100, 0)",
+ "INSERT INTO t (k, c, v, s) VALUES (2, 20, 200, 0)",
+ "INSERT INTO t (k, s) VALUES (3, 0)",
+ "INSERT INTO t (k, c) VALUES (3, 1)",
+ "INSERT INTO t (k, c) VALUES (3, 2)"],
+ only_node1=["DELETE FROM t WHERE k = 1"],
+ only_node2=["DELETE FROM t WHERE k = 2"])
+
+ self._assert_one("SELECT * FROM t WHERE s = 0 LIMIT 1", row=[3, 1, 0, None])
+ self._assert_all("SELECT * FROM t WHERE s = 0 LIMIT 10", rows=[[3, 1, 0, None], [3, 2, 0, None]])
+ self._assert_all("SELECT * FROM t WHERE s = 0", rows=[[3, 1, 0, None], [3, 2, 0, None]])
+
+ def test_complementary_deletion_with_limit_on_regular_column(self):
+ self._prepare_cluster(
+ create_table="CREATE TABLE t (k int, c int, v int, PRIMARY KEY (k, c))",
+ create_index="CREATE INDEX ON t(v)",
+ both_nodes=["INSERT INTO t (k, c, v) VALUES (0, 1, 0)",
+ "INSERT INTO t (k, c, v) VALUES (0, 2, 0)"],
+ only_node1=["DELETE FROM t WHERE k = 0 AND c = 1"],
+ only_node2=["DELETE FROM t WHERE k = 0 AND c = 2"])
+
+ self._assert_none("SELECT * FROM t WHERE v = 0 LIMIT 1")
+
+ def test_complementary_deletion_with_limit_and_rows_after(self):
+ self._prepare_cluster(
+ create_table="CREATE TABLE t (k int, c int, v int, PRIMARY KEY (k, c))",
+ create_index="CREATE INDEX ON t(v)",
+ both_nodes=["INSERT INTO t (k, c, v) VALUES (0, 1, 0)",
+ "INSERT INTO t (k, c, v) VALUES (0, 2, 0)",
+ "INSERT INTO t (k, c, v) VALUES (0, 3, 0)"],
+ only_node1=["DELETE FROM t WHERE k = 0 AND c = 1",
+ "INSERT INTO t (k, c, v) VALUES (0, 4, 0)"],
+ only_node2=["INSERT INTO t (k, c, v) VALUES (0, 5, 0)",
+ "DELETE FROM t WHERE k = 0 AND c = 2"])
+
+ self._assert_one("SELECT * FROM t WHERE v = 0 LIMIT 1", row=[0, 3, 0])
+ self._assert_all("SELECT * FROM t WHERE v = 0 LIMIT 2", rows=[[0, 3, 0], [0, 4, 0]])
+ self._assert_all("SELECT * FROM t WHERE v = 0 LIMIT 3", rows=[[0, 3, 0], [0, 4, 0], [0, 5, 0]])
+ self._assert_all("SELECT * FROM t WHERE v = 0 LIMIT 4", rows=[[0, 3, 0], [0, 4, 0], [0, 5, 0]])
+
+ def test_complementary_deletion_with_limit_and_rows_between(self):
+ self._prepare_cluster(
+ create_table="CREATE TABLE t (k int, c int, v int, PRIMARY KEY (k, c))",
+ create_index="CREATE INDEX ON t(v)",
+ both_nodes=["INSERT INTO t (k, c, v) VALUES (0, 1, 0)",
+ "INSERT INTO t (k, c, v) VALUES (0, 4, 0)"],
+ only_node1=["DELETE FROM t WHERE k = 0 AND c = 1"],
+ only_node2=["INSERT INTO t (k, c, v) VALUES (0, 2, 0)",
+ "INSERT INTO t (k, c, v) VALUES (0, 3, 0)",
+ "DELETE FROM t WHERE k = 0 AND c = 4"])
+
+ self._assert_one("SELECT * FROM t WHERE v = 0 LIMIT 1", row=[0, 2, 0])
+ self._assert_all("SELECT * FROM t WHERE v = 0 LIMIT 2", rows=[[0, 2, 0], [0, 3, 0]])
+ self._assert_all("SELECT * FROM t WHERE v = 0 LIMIT 3", rows=[[0, 2, 0], [0, 3, 0]])
+
+ def test_complementary_update_with_limit_on_static_column_with_empty_partitions(self):
+ self._skip_if_index_on_static_is_not_supported()
+ self._prepare_cluster(
+ create_table="CREATE TABLE t (k int, c int, s text STATIC, v int, PRIMARY KEY (k, c))",
+ create_index="CREATE INDEX ON t(s)",
+ both_nodes=["INSERT INTO t (k, s) VALUES (1, 'old')",
+ "INSERT INTO t (k, s) VALUES (2, 'old')"],
+ only_node1=["UPDATE t SET s = 'new' WHERE k = 1"],
+ only_node2=["UPDATE t SET s = 'new' WHERE k = 2"])
+
+ self._assert_none("SELECT * FROM t WHERE s = 'old' LIMIT 1")
+ self._assert_one("SELECT k, c, v, s FROM t WHERE s = 'new' LIMIT 1", row=[1, None, None, 'new'])
+ self._assert_all("SELECT k, c, v, s FROM t WHERE s = 'new'",
+ rows=[[1, None, None, 'new'], [2, None, None, 'new']])
+
+ def test_complementary_update_with_limit_on_static_column_with_not_empty_partitions(self):
+ self._skip_if_index_on_static_is_not_supported()
+ self._prepare_cluster(
+ create_table="CREATE TABLE t (k int, c int, s text STATIC, v int, PRIMARY KEY (k, c))",
+ create_index="CREATE INDEX ON t(s)",
+ both_nodes=["INSERT INTO t (k, c, v, s) VALUES (1, 10, 100, 'old')",
+ "INSERT INTO t (k, c, v, s) VALUES (2, 20, 200, 'old')"],
+ only_node1=["UPDATE t SET s = 'new' WHERE k = 1"],
+ only_node2=["UPDATE t SET s = 'new' WHERE k = 2"])
+
+ self._assert_none("SELECT * FROM t WHERE s = 'old' LIMIT 1")
+ self._assert_one("SELECT k, c, v, s FROM t WHERE s = 'new' LIMIT 1", row=[1, 10, 100, 'new'])
+ self._assert_all("SELECT k, c, v, s FROM t WHERE s = 'new'", rows=[[1, 10, 100, 'new'], [2, 20, 200, 'new']])
+
+ def test_complementary_update_with_limit_on_regular_column(self):
+ self._prepare_cluster(
+ create_table="CREATE TABLE t (k int, c int, v text, PRIMARY KEY (k, c))",
+ create_index="CREATE INDEX ON t(v)",
+ both_nodes=["INSERT INTO t (k, c, v) VALUES (0, 1, 'old')",
+ "INSERT INTO t (k, c, v) VALUES (0, 2, 'old')"],
+ only_node1=["UPDATE t SET v = 'new' WHERE k = 0 AND c = 1"],
+ only_node2=["UPDATE t SET v = 'new' WHERE k = 0 AND c = 2"])
+
+ self._assert_none("SELECT * FROM t WHERE v = 'old' LIMIT 1")
+ self._assert_one("SELECT * FROM t WHERE v = 'new' LIMIT 1", row=[0, 1, 'new'])
+ self._assert_all("SELECT * FROM t WHERE v = 'new'", rows=[[0, 1, 'new'], [0, 2, 'new']])
+
+ def test_complementary_update_with_limit_and_rows_between(self):
+ self._prepare_cluster(
+ create_table="CREATE TABLE t (k int, c int, v text, PRIMARY KEY (k, c))",
+ create_index="CREATE INDEX ON t(v)",
+ both_nodes=["INSERT INTO t (k, c, v) VALUES (0, 1, 'old')",
+ "INSERT INTO t (k, c, v) VALUES (0, 4, 'old')"],
+ only_node1=["UPDATE t SET v = 'new' WHERE k = 0 AND c = 1"],
+ only_node2=["INSERT INTO t (k, c, v) VALUES (0, 2, 'old')",
+ "INSERT INTO t (k, c, v) VALUES (0, 3, 'old')",
+ "UPDATE t SET v = 'new' WHERE k = 0 AND c = 4"])
+
+ self._assert_one("SELECT * FROM t WHERE v = 'old' LIMIT 1", row=[0, 2, 'old'])
+ self._assert_all("SELECT * FROM t WHERE v = 'old' LIMIT 2", rows=[[0, 2, 'old'], [0, 3, 'old']])
+ self._assert_all("SELECT * FROM t WHERE v = 'old' LIMIT 3", rows=[[0, 2, 'old'], [0, 3, 'old']])
+ self._assert_one("SELECT * FROM t WHERE v = 'new' LIMIT 1", row=[0, 1, 'new'])
+ self._assert_all("SELECT * FROM t WHERE v = 'new'", rows=[[0, 1, 'new'], [0, 4, 'new']])
+
+ def test_partition_deletion_on_skinny_table(self):
+ self._prepare_cluster(
+ create_table="CREATE TABLE t (k int PRIMARY KEY, v text)",
+ create_index="CREATE INDEX ON t(v)",
+ only_node1=["INSERT INTO t (k, v) VALUES (0, 'old') USING TIMESTAMP 1"],
+ only_node2=["DELETE FROM t WHERE k = 0"])
+
+ self._assert_none("SELECT * FROM t WHERE v = 'old' LIMIT 1")
+ self._assert_none("SELECT * FROM t WHERE v = 'old'")
+
+ def test_partition_deletion_on_wide_table(self):
+ self._prepare_cluster(
+ create_table="CREATE TABLE t (k int, c int, v text, PRIMARY KEY (k, c))",
+ create_index="CREATE INDEX ON t(v)",
+ only_node1=["INSERT INTO t (k, c, v) VALUES (0, 1, 'old') USING TIMESTAMP 1"],
+ only_node2=["DELETE FROM t WHERE k = 0"])
+
+ self._assert_none("SELECT * FROM t WHERE v = 'old' LIMIT 1")
+ self._assert_none("SELECT * FROM t WHERE v = 'old'")
+
+ def test_row_deletion_on_wide_table(self):
+ self._prepare_cluster(
+ create_table="CREATE TABLE t (k int, c int, v text, PRIMARY KEY (k, c))",
+ create_index="CREATE INDEX ON t(v)",
+ only_node1=["INSERT INTO t (k, c, v) VALUES (0, 1, 'old') USING TIMESTAMP 1"],
+ only_node2=["DELETE FROM t WHERE k = 0 AND c = 1"])
+
+ self._assert_none("SELECT * FROM t WHERE v = 'old' LIMIT 1")
+ self._assert_none("SELECT * FROM t WHERE v = 'old'")
+
+ def test_range_deletion_on_wide_table(self):
+ self._prepare_cluster(
+ create_table="CREATE TABLE t (k int, c int, v text, PRIMARY KEY (k, c))",
+ create_index="CREATE INDEX ON t(v)",
+ only_node1=["INSERT INTO t (k, c, v) VALUES (0, 1, 'old') USING TIMESTAMP 1",
+ "INSERT INTO t (k, c, v) VALUES (0, 2, 'old') USING TIMESTAMP 1",
+ "INSERT INTO t (k, c, v) VALUES (0, 3, 'old') USING TIMESTAMP 1",
+ "INSERT INTO t (k, c, v) VALUES (0, 4, 'old') USING TIMESTAMP 1"],
+ only_node2=["DELETE FROM t WHERE k = 0 AND c > 1 AND c < 4"])
+
+ self._assert_one("SELECT * FROM t WHERE v = 'old' LIMIT 1", row=[0, 1, 'old'])
+ self._assert_all("SELECT * FROM t WHERE v = 'old'", rows=[[0, 1, 'old'], [0, 4, 'old']])
+
+ def test_mismatching_insertions_on_skinny_table(self):
+ self._prepare_cluster(
+ create_table="CREATE TABLE t (k int PRIMARY KEY, v text)",
+ create_index="CREATE INDEX ON t(v)",
+ only_node1=["INSERT INTO t (k, v) VALUES (0, 'old') USING TIMESTAMP 1"],
+ only_node2=["INSERT INTO t (k, v) VALUES (0, 'new') USING TIMESTAMP 2"])
+
+ self._assert_none("SELECT * FROM t WHERE v = 'old' LIMIT 1")
+ self._assert_none("SELECT * FROM t WHERE v = 'old'")
+ self._assert_one("SELECT * FROM t WHERE v = 'new'", row=[0, 'new'])
+
+ def test_mismatching_insertions_on_wide_table(self):
+ self._prepare_cluster(
+ create_table="CREATE TABLE t (k int, c int, v text, PRIMARY KEY (k, c))",
+ create_index="CREATE INDEX ON t(v)",
+ only_node1=["INSERT INTO t (k, c, v) VALUES (0, 1, 'old') USING TIMESTAMP 1"],
+ only_node2=["INSERT INTO t (k, c, v) VALUES (0, 1, 'new') USING TIMESTAMP 2"])
+
+ self._assert_none("SELECT * FROM t WHERE v = 'old' LIMIT 1")
+ self._assert_none("SELECT * FROM t WHERE v = 'old'")
+ self._assert_one("SELECT * FROM t WHERE v = 'new'", row=[0, 1, 'new'])
+
+ def test_consistent_skinny_table(self):
+ self._prepare_cluster(
+ create_table="CREATE TABLE t (k int PRIMARY KEY, v text)",
+ create_index="CREATE INDEX ON t(v)",
+ both_nodes=["INSERT INTO t(k, v) VALUES (1, 'old')", # updated to 'new'
+ "INSERT INTO t(k, v) VALUES (2, 'old')",
+ "INSERT INTO t(k, v) VALUES (3, 'old')", # updated to 'new'
+ "INSERT INTO t(k, v) VALUES (4, 'old')",
+ "INSERT INTO t(k, v) VALUES (5, 'old')", # deleted partition
+ "UPDATE t SET v = 'new' WHERE k = 1",
+ "UPDATE t SET v = 'new' WHERE k = 3",
+ "DELETE FROM t WHERE k = 5"])
+
+ self._assert_one("SELECT * FROM t WHERE v = 'old' LIMIT 1", row=[2, 'old'])
+ self._assert_one("SELECT * FROM t WHERE v = 'new' LIMIT 1", row=[1, 'new'])
+ self._assert_all("SELECT * FROM t WHERE v = 'old'", rows=[[2, 'old'], [4, 'old']])
+ self._assert_all("SELECT * FROM t WHERE v = 'new'", rows=[[1, 'new'], [3, 'new']])
+
+ def test_consistent_wide_table(self):
+ self._prepare_cluster(
+ create_table="CREATE TABLE t (k int, c int, v text, PRIMARY KEY (k, c))",
+ create_index="CREATE INDEX ON t(v)",
+ both_nodes=["INSERT INTO t(k, c, v) VALUES (0, 1, 'old')", # updated to 'new'
+ "INSERT INTO t(k, c, v) VALUES (0, 2, 'old')",
+ "INSERT INTO t(k, c, v) VALUES (0, 3, 'old')", # updated to 'new'
+ "INSERT INTO t(k, c, v) VALUES (0, 4, 'old')",
+ "INSERT INTO t(k, c, v) VALUES (0, 5, 'old')", # deleted row
+ "INSERT INTO t(k, c, v) VALUES (1, 1, 'old')", # deleted partition
+ "INSERT INTO t(k, c, v) VALUES (1, 2, 'old')", # deleted partition
+ "UPDATE t SET v = 'new' WHERE k = 0 AND c = 1",
+ "UPDATE t SET v = 'new' WHERE k = 0 AND c = 3",
+ "DELETE FROM t WHERE k = 0 AND c = 5",
+ "DELETE FROM t WHERE k = 1"])
+
+ self._assert_one("SELECT * FROM t WHERE v = 'old' LIMIT 1", row=[0, 2, 'old'])
+ self._assert_one("SELECT * FROM t WHERE v = 'new' LIMIT 1", row=[0, 1, 'new'])
+ self._assert_all("SELECT * FROM t WHERE v = 'old'", rows=[[0, 2, 'old'], [0, 4, 'old']])
+ self._assert_all("SELECT * FROM t WHERE v = 'new'", rows=[[0, 1, 'new'], [0, 3, 'new']])
+
+ def test_count(self):
+ self._prepare_cluster(
+ create_table="CREATE TABLE t (k int PRIMARY KEY, v text)",
+ create_index="CREATE INDEX ON t(v)",
+ both_nodes=["INSERT INTO t(k, v) VALUES (1, 'old')",
+ "INSERT INTO t(k, v) VALUES (2, 'old')",
+ "INSERT INTO t(k, v) VALUES (3, 'old')",
+ "INSERT INTO t(k, v) VALUES (4, 'old')",
+ "INSERT INTO t(k, v) VALUES (5, 'old')"],
+ only_node1=["UPDATE t SET v = 'new' WHERE k = 2",
+ "UPDATE t SET v = 'new' WHERE k = 4"])
+
+ self._assert_one("SELECT COUNT(*) FROM t WHERE v = 'old' LIMIT 1", row=[3])
+ self._assert_one("SELECT COUNT(*) FROM t WHERE v = 'old'", row=[3])
+ self._assert_one("SELECT COUNT(*) FROM t WHERE v = 'new'", row=[2])
+
+
+@since('3.0.21')
+class TestSecondaryIndexes(ReplicaSideFiltering):
+ """
+ @jira_ticket CASSANDRA-8272
+ Tests the consistency of secondary indexes queries when some of the replicas have stale data.
+ """
+ __test__ = True
+
+ def create_index(self):
+ return True
+
+
+@since('3.0.21')
+class TestAllowFiltering(ReplicaSideFiltering):
+ """
+ @jira_ticket CASSANDRA-8273
+ Test the consistency of queries using ``ALLOW FILTERING`` when some of the replicas have stale data.
+ """
+ __test__ = True
+
+ def create_index(self):
+ return False
+
+ def _test_missed_update_with_transient_replicas(self, missed_by_transient):
+ cluster = self.cluster
+ cluster.set_configuration_options(values={'hinted_handoff_enabled': False,
+ 'num_tokens': 1,
+ 'commitlog_sync_period_in_ms': 500,
+ 'enable_transient_replication': True,
+ 'partitioner': 'org.apache.cassandra.dht.OrderPreservingPartitioner'})
+ cluster.set_batch_commitlog(enabled=True)
+ cluster.populate(2, tokens=[0, 1], debug=True, install_byteman=True)
+ node1, node2 = cluster.nodelist()
+ cluster.start()
+
+ self.session = self.patient_exclusive_cql_connection(node1, consistency_level=CL.ALL)
+ self.session.execute("CREATE KEYSPACE %s WITH replication = "
+ "{'class': 'SimpleStrategy', 'replication_factor': '2/1'}" % (keyspace))
+ self.session.execute("USE " + keyspace)
+ self.session.execute("CREATE TABLE t (k int PRIMARY KEY, v text)"
+ " WITH speculative_retry = 'NEVER'"
+ " AND additional_write_policy = 'NEVER'"
+ " AND read_repair = 'NONE'")
+
+ # insert in both nodes with CL=ALL
+ self.session.execute("INSERT INTO t(k, v) VALUES (0, 'old')")
+
+ # update the previous value with CL=ONE only in one replica
+ node = cluster.nodelist()[1 if missed_by_transient else 0]
+ node.byteman_submit(['./byteman/stop_writes.btm'])
+ self.session.execute(SimpleStatement("UPDATE t SET v = 'new' WHERE k = 0", consistency_level=CL.ONE))
+
+ # query with CL=ALL to verify that no old values are resurrected
+ self._assert_none("SELECT * FROM t WHERE v = 'old'")
+ self._assert_one("SELECT * FROM t WHERE v = 'new'", row=[0, 'new'])
+
+ @since('4.0')
+ def test_update_missed_by_transient_replica(self):
+ self._test_missed_update_with_transient_replicas(missed_by_transient=True)
+
+ @since('4.0')
+ def test_update_only_on_transient_replica(self):
+ self._test_missed_update_with_transient_replicas(missed_by_transient=False)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org