You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by aw...@apache.org on 2017/11/15 21:46:54 UTC
[26/50] cassandra git commit: CASSANDRA-11500: add dtest for complex
update/delete tombstones in MV
CASSANDRA-11500: add dtest for complex update/delete tombstones in MV
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/6d77ace5
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/6d77ace5
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/6d77ace5
Branch: refs/heads/master
Commit: 6d77ace5361f020ba182072ade9f4ab98025c213
Parents: 19b6613
Author: Zhao Yang <zh...@gmail.com>
Authored: Mon May 1 23:24:12 2017 +0800
Committer: Paulo Motta <pa...@apache.org>
Committed: Tue Sep 5 00:39:48 2017 -0500
----------------------------------------------------------------------
materialized_views_test.py | 308 +++++++++++++++++++++++++++++++++++++++-
1 file changed, 307 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6d77ace5/materialized_views_test.py
----------------------------------------------------------------------
diff --git a/materialized_views_test.py b/materialized_views_test.py
index 574d90f..637124d 100644
--- a/materialized_views_test.py
+++ b/materialized_views_test.py
@@ -66,6 +66,14 @@ class TestMaterializedViews(Tester):
return session
+ def update_view(self, session, query, flush, compact=False):
+ session.execute(query)
+ self._replay_batchlogs()
+ if flush:
+ self.cluster.flush()
+ if compact:
+ self.cluster.compact()
+
def _settle_nodes(self):
debug("Settling all nodes")
stage_match = re.compile("(?P<name>\S+)\s+(?P<active>\d+)\s+(?P<pending>\d+)\s+(?P<completed>\d+)\s+(?P<blocked>\d+)\s+(?P<alltimeblocked>\d+)")
@@ -334,7 +342,7 @@ class TestMaterializedViews(Tester):
assert_invalid(
session,
"ALTER TABLE ks.users DROP state;",
- "Cannot drop column state, depended on by materialized views"
+ "Cannot drop column state on base table with materialized views."
)
def drop_table_test(self):
@@ -974,6 +982,304 @@ class TestMaterializedViews(Tester):
cl=ConsistencyLevel.ALL
)
+ @since('3.0')
+ def test_no_base_column_in_view_pk_complex_timestamp_with_flush(self):
+ self._test_no_base_column_in_view_pk_complex_timestamp(flush=True)
+
+ @since('3.0')
+ def test_no_base_column_in_view_pk_complex_timestamp_without_flush(self):
+ self._test_no_base_column_in_view_pk_complex_timestamp(flush=False)
+
+ def _test_no_base_column_in_view_pk_complex_timestamp(self, flush):
+ """
+ Able to shadow old view row if all columns in base are removed including unselected
+ Able to recreate view row if at least one selected column alive
+
+ @jira_ticket CASSANDRA-11500
+ """
+ session = self.prepare(rf=3, nodes=3, options={'hinted_handoff_enabled': False}, consistency_level=ConsistencyLevel.QUORUM)
+ node1, node2, node3 = self.cluster.nodelist()
+
+ session.execute('USE ks')
+ session.execute("CREATE TABLE t (k int, c int, a int, b int, e int, f int, primary key(k, c))")
+ session.execute(("CREATE MATERIALIZED VIEW mv AS SELECT k,c,a,b FROM t "
+ "WHERE k IS NOT NULL AND c IS NOT NULL PRIMARY KEY (c, k)"))
+ session.cluster.control_connection.wait_for_schema_agreement()
+
+ # update unselected, view row should be alive
+ self.update_view(session, "UPDATE t USING TIMESTAMP 1 SET e=1 WHERE k=1 AND c=1;", flush)
+ assert_one(session, "SELECT * FROM t", [1, 1, None, None, 1, None])
+ assert_one(session, "SELECT * FROM mv", [1, 1, None, None])
+
+ # remove unselected, add selected column, view row should be alive
+ self.update_view(session, "UPDATE t USING TIMESTAMP 2 SET e=null, b=1 WHERE k=1 AND c=1;", flush)
+ assert_one(session, "SELECT * FROM t", [1, 1, None, 1, None, None])
+ assert_one(session, "SELECT * FROM mv", [1, 1, None, 1])
+
+ # remove selected column, view row is removed
+ self.update_view(session, "UPDATE t USING TIMESTAMP 2 SET e=null, b=null WHERE k=1 AND c=1;", flush)
+ assert_none(session, "SELECT * FROM t")
+ assert_none(session, "SELECT * FROM mv")
+
+ # update unselected with ts=3, view row should be alive
+ self.update_view(session, "UPDATE t USING TIMESTAMP 3 SET f=1 WHERE k=1 AND c=1;", flush)
+ assert_one(session, "SELECT * FROM t", [1, 1, None, None, None, 1])
+ assert_one(session, "SELECT * FROM mv", [1, 1, None, None])
+
+ # insert livenesssInfo, view row should be alive
+ self.update_view(session, "INSERT INTO t(k,c) VALUES(1,1) USING TIMESTAMP 3", flush)
+ assert_one(session, "SELECT * FROM t", [1, 1, None, None, None, 1])
+ assert_one(session, "SELECT * FROM mv", [1, 1, None, None])
+
+ # remove unselected, view row should be alive because of base livenessInfo alive
+ self.update_view(session, "UPDATE t USING TIMESTAMP 3 SET f=null WHERE k=1 AND c=1;", flush)
+ assert_one(session, "SELECT * FROM t", [1, 1, None, None, None, None])
+ assert_one(session, "SELECT * FROM mv", [1, 1, None, None])
+
+ # add selected column, view row should be alive
+ self.update_view(session, "UPDATE t USING TIMESTAMP 3 SET a=1 WHERE k=1 AND c=1;", flush)
+ assert_one(session, "SELECT * FROM t", [1, 1, 1, None, None, None])
+ assert_one(session, "SELECT * FROM mv", [1, 1, 1, None])
+
+ # update unselected, view row should be alive
+ self.update_view(session, "UPDATE t USING TIMESTAMP 4 SET f=1 WHERE k=1 AND c=1;", flush)
+ assert_one(session, "SELECT * FROM t", [1, 1, 1, None, None, 1])
+ assert_one(session, "SELECT * FROM mv", [1, 1, 1, None])
+
+ # delete with ts=3, view row should be alive due to unselected@ts4
+ self.update_view(session, "DELETE FROM t USING TIMESTAMP 3 WHERE k=1 AND c=1;", flush)
+ assert_one(session, "SELECT * FROM t", [1, 1, None, None, None, 1])
+ assert_one(session, "SELECT * FROM mv", [1, 1, None, None])
+
+ # remove unselected, view row should be removed
+ self.update_view(session, "UPDATE t USING TIMESTAMP 4 SET f=null WHERE k=1 AND c=1;", flush)
+ assert_none(session, "SELECT * FROM t")
+ assert_none(session, "SELECT * FROM mv")
+
+ # add selected with ts=7, view row is alive
+ self.update_view(session, "UPDATE t USING TIMESTAMP 7 SET b=1 WHERE k=1 AND c=1;", flush)
+ assert_one(session, "SELECT * FROM t", [1, 1, None, 1, None, None])
+ assert_one(session, "SELECT * FROM mv", [1, 1, None, 1])
+
+ # remove selected with ts=7, view row is dead
+ self.update_view(session, "UPDATE t USING TIMESTAMP 7 SET b=null WHERE k=1 AND c=1;", flush)
+ assert_none(session, "SELECT * FROM t")
+ assert_none(session, "SELECT * FROM mv")
+
+ # add selected with ts=5, view row is alive (selected column should not affects each other)
+ self.update_view(session, "UPDATE t USING TIMESTAMP 5 SET a=1 WHERE k=1 AND c=1;", flush)
+ assert_one(session, "SELECT * FROM t", [1, 1, 1, None, None, None])
+ assert_one(session, "SELECT * FROM mv", [1, 1, 1, None])
+
+ # add selected with ttl=5
+ self.update_view(session, "UPDATE t USING TTL 10 SET a=1 WHERE k=1 AND c=1;", flush)
+ assert_one(session, "SELECT * FROM t", [1, 1, 1, None, None, None])
+ assert_one(session, "SELECT * FROM mv", [1, 1, 1, None])
+
+ time.sleep(10)
+
+ # update unselected with ttl=10, view row should be alive
+ self.update_view(session, "UPDATE t USING TTL 10 SET f=1 WHERE k=1 AND c=1;", flush)
+ assert_one(session, "SELECT * FROM t", [1, 1, None, None, None, 1])
+ assert_one(session, "SELECT * FROM mv", [1, 1, None, None])
+
+ time.sleep(10)
+
+ # view row still alive due to base livenessInfo
+ assert_none(session, "SELECT * FROM t")
+ assert_none(session, "SELECT * FROM mv")
+
+ @since('3.0')
+ def test_base_column_in_view_pk_complex_timestamp_with_flush(self):
+ self._test_base_column_in_view_pk_complex_timestamp(flush=True)
+
+ @since('3.0')
+ def test_base_column_in_view_pk_complex_timestamp_without_flush(self):
+ self._test_base_column_in_view_pk_complex_timestamp(flush=False)
+
+ def _test_base_column_in_view_pk_complex_timestamp(self, flush):
+ """
+ Able to shadow old view row with column ts greater than pk's ts and re-insert the view row
+ Able to shadow old view row with column ts smaller than pk's ts and re-insert the view row
+
+ @jira_ticket CASSANDRA-11500
+ """
+ session = self.prepare(rf=3, nodes=3, options={'hinted_handoff_enabled': False}, consistency_level=ConsistencyLevel.QUORUM)
+ node1, node2, node3 = self.cluster.nodelist()
+
+ session.execute('USE ks')
+ session.execute("CREATE TABLE t (k int PRIMARY KEY, a int, b int)")
+ session.execute(("CREATE MATERIALIZED VIEW mv AS SELECT * FROM t "
+ "WHERE k IS NOT NULL AND a IS NOT NULL PRIMARY KEY (k, a)"))
+ session.cluster.control_connection.wait_for_schema_agreement()
+
+ # Set initial values TS=1
+ self.update_view(session, "INSERT INTO t (k, a, b) VALUES (1, 1, 1) USING TIMESTAMP 1;", flush)
+ assert_one(session, "SELECT * FROM t", [1, 1, 1])
+ assert_one(session, "SELECT * FROM mv", [1, 1, 1])
+
+ # increase b ts to 10
+ self.update_view(session, "UPDATE t USING TIMESTAMP 10 SET b = 2 WHERE k = 1;", flush)
+ assert_one(session, "SELECT k,a,b,writetime(b) FROM t", [1, 1, 2, 10])
+ assert_one(session, "SELECT k,a,b,writetime(b) FROM mv", [1, 1, 2, 10])
+
+ # switch entries. shadow a = 1, insert a = 2
+ self.update_view(session, "UPDATE t USING TIMESTAMP 2 SET a = 2 WHERE k = 1;", flush)
+ assert_one(session, "SELECT k,a,b,writetime(b) FROM t", [1, 2, 2, 10])
+ assert_one(session, "SELECT k,a,b,writetime(b) FROM mv", [1, 2, 2, 10])
+
+ # switch entries. shadow a = 2, insert a = 1
+ self.update_view(session, "UPDATE t USING TIMESTAMP 3 SET a = 1 WHERE k = 1;", flush)
+ assert_one(session, "SELECT k,a,b,writetime(b) FROM t", [1, 1, 2, 10])
+ assert_one(session, "SELECT k,a,b,writetime(b) FROM mv", [1, 1, 2, 10])
+
+ # switch entries. shadow a = 1, insert a = 2
+ self.update_view(session, "UPDATE t USING TIMESTAMP 4 SET a = 2 WHERE k = 1;", flush, compact=True)
+ assert_one(session, "SELECT k,a,b,writetime(b) FROM t", [1, 2, 2, 10])
+ assert_one(session, "SELECT k,a,b,writetime(b) FROM mv", [1, 2, 2, 10])
+
+ # able to shadow view row even if base-column in view pk's ts is smaller than row timestamp
+ # set row TS = 20, a@6, b@20
+ self.update_view(session, "DELETE FROM t USING TIMESTAMP 5 where k = 1;", flush)
+ assert_one(session, "SELECT k,a,b,writetime(b) FROM t", [1, None, 2, 10])
+ assert_none(session, "SELECT k,a,b,writetime(b) FROM mv")
+ self.update_view(session, "INSERT INTO t (k, a, b) VALUES (1, 1, 1) USING TIMESTAMP 6;", flush)
+ assert_one(session, "SELECT k,a,b,writetime(b) FROM t", [1, 1, 2, 10])
+ assert_one(session, "SELECT k,a,b,writetime(b) FROM mv", [1, 1, 2, 10])
+ self.update_view(session, "INSERT INTO t (k, b) VALUES (1, 1) USING TIMESTAMP 20;", flush)
+ assert_one(session, "SELECT k,a,b,writetime(b) FROM t", [1, 1, 1, 20])
+ assert_one(session, "SELECT k,a,b,writetime(b) FROM mv", [1, 1, 1, 20])
+
+ # switch entries. shadow a = 1, insert a = 2
+ self.update_view(session, "UPDATE t USING TIMESTAMP 7 SET a = 2 WHERE k = 1;", flush)
+ assert_one(session, "SELECT k,a,b,writetime(a),writetime(b) FROM t", [1, 2, 1, 7, 20])
+ assert_one(session, "SELECT k,a,b,writetime(b) FROM mv", [1, 2, 1, 20])
+
+ # switch entries. shadow a = 2, insert a = 1
+ self.update_view(session, "UPDATE t USING TIMESTAMP 8 SET a = 1 WHERE k = 1;", flush)
+ assert_one(session, "SELECT k,a,b,writetime(a),writetime(b) FROM t", [1, 1, 1, 8, 20])
+ assert_one(session, "SELECT k,a,b,writetime(b) FROM mv", [1, 1, 1, 20])
+
+ # create another view row
+ self.update_view(session, "INSERT INTO t (k, a, b) VALUES (2, 2, 2);", flush)
+ assert_one(session, "SELECT k,a,b FROM t WHERE k = 2", [2, 2, 2])
+ assert_one(session, "SELECT k,a,b FROM mv WHERE k = 2", [2, 2, 2])
+
+ # stop node2, node3
+ debug('Shutdown node2')
+ node2.stop(wait_other_notice=True)
+ debug('Shutdown node3')
+ node3.stop(wait_other_notice=True)
+ # shadow a = 1, create a = 2
+ query = SimpleStatement("UPDATE t USING TIMESTAMP 9 SET a = 2 WHERE k = 1", consistency_level=ConsistencyLevel.ONE)
+ self.update_view(session, query, flush)
+ # shadow (a=2, k=2) after 3 second
+ query = SimpleStatement("UPDATE t USING TTL 3 SET a = 2 WHERE k = 2", consistency_level=ConsistencyLevel.ONE)
+ self.update_view(session, query, flush)
+
+ debug('Starting node2')
+ node2.start(wait_other_notice=True)
+ debug('Starting node3')
+ node3.start(wait_other_notice=True)
+
+ # For k = 1 & a = 1, We should get a digest mismatch of tombstones and repaired
+ query = SimpleStatement("SELECT * FROM mv WHERE k = 1 AND a = 1", consistency_level=ConsistencyLevel.ALL)
+ result = session.execute(query, trace=True)
+ self.check_trace_events(result.get_query_trace(), True)
+ self.assertEqual(0, len(result.current_rows))
+
+ # For k = 1 & a = 1, second time no digest mismatch
+ result = session.execute(query, trace=True)
+ self.check_trace_events(result.get_query_trace(), False)
+ assert_none(session, "SELECT * FROM mv WHERE k = 1 AND a = 1")
+ self.assertEqual(0, len(result.current_rows))
+
+ # For k = 1 & a = 2, We should get a digest mismatch of data and repaired for a = 2
+ query = SimpleStatement("SELECT * FROM mv WHERE k = 1 AND a = 2", consistency_level=ConsistencyLevel.ALL)
+ result = session.execute(query, trace=True)
+ self.check_trace_events(result.get_query_trace(), True)
+ self.assertEqual(1, len(result.current_rows))
+
+ # For k = 1 & a = 2, second time no digest mismatch
+ result = session.execute(query, trace=True)
+ self.check_trace_events(result.get_query_trace(), False)
+ self.assertEqual(1, len(result.current_rows))
+ assert_one(session, "SELECT k,a,b,writetime(b) FROM mv WHERE k = 1", [1, 2, 1, 20])
+
+ time.sleep(3)
+ # For k = 2 & a = 2, We should get a digest mismatch of expired and repaired
+ query = SimpleStatement("SELECT * FROM mv WHERE k = 2 AND a = 2", consistency_level=ConsistencyLevel.ALL)
+ result = session.execute(query, trace=True)
+ self.check_trace_events(result.get_query_trace(), True)
+ debug(result.current_rows)
+ self.assertEqual(0, len(result.current_rows))
+
+ # For k = 2 & a = 2, second time no digest mismatch
+ result = session.execute(query, trace=True)
+ self.check_trace_events(result.get_query_trace(), False)
+ self.assertEqual(0, len(result.current_rows))
+
+ @since('4.0')
+ def test_base_column_in_view_pk_commutative_tombstone_with_flush(self):
+ self._test_base_column_in_view_pk_commutative_tombstone_(flush=True)
+
+ @since('4.0')
+ def test_base_column_in_view_pk_commutative_tombstone_without_flush(self):
+ self._test_base_column_in_view_pk_commutative_tombstone_(flush=False)
+
+ def _test_base_column_in_view_pk_commutative_tombstone_(self, flush):
+ """
+ view row deletion should be commutative with newer view livenessInfo, otherwise deleted columns may be resurrected.
+ @jira_ticket CASSANDRA-13409
+ """
+ session = self.prepare(rf=3, nodes=3, options={'hinted_handoff_enabled': False}, consistency_level=ConsistencyLevel.QUORUM)
+ node1 = self.cluster.nodelist()[0]
+
+ session.execute('USE ks')
+ session.execute("CREATE TABLE t (id int PRIMARY KEY, v int, v2 text, v3 decimal)")
+ session.execute(("CREATE MATERIALIZED VIEW t_by_v AS SELECT * FROM t "
+ "WHERE v IS NOT NULL AND id IS NOT NULL PRIMARY KEY (v,id)"))
+ session.cluster.control_connection.wait_for_schema_agreement()
+ for node in self.cluster.nodelist():
+ node.nodetool("disableautocompaction")
+
+ # sstable 1, Set initial values TS=1
+ self.update_view(session, "INSERT INTO t (id, v, v2, v3) VALUES (1, 1, 'a', 3.0) USING TIMESTAMP 1", flush)
+ assert_one(session, "SELECT * FROM t_by_v", [1, 1, 'a', 3.0])
+
+ # sstable 2, change v's value and TS=2, tombstones v=1 and adds v=0 record
+ self.update_view(session, "DELETE FROM t USING TIMESTAMP 2 WHERE id = 1;", flush)
+ assert_none(session, "SELECT * FROM t_by_v")
+ assert_none(session, "SELECT * FROM t")
+
+ # sstable 3, tombstones of mv created by base deletion should remain.
+ self.update_view(session, "INSERT INTO t (id, v) VALUES (1, 1) USING TIMESTAMP 3", flush)
+ assert_one(session, "SELECT * FROM t_by_v", [1, 1, None, None])
+ assert_one(session, "SELECT * FROM t", [1, 1, None, None])
+
+ # sstable 4, shadow view row (id=1, v=1), insert (id=1, v=2, ts=4)
+ self.update_view(session, "UPDATE t USING TIMESTAMP 4 set v = 2 WHERE id = 1;", flush)
+ assert_one(session, "SELECT * FROM t_by_v", [2, 1, None, None])
+ assert_one(session, "SELECT * FROM t", [1, 2, None, None])
+
+ # sstable 5, shadow view row (id=1, v=2), insert (id=1, v=1 ts=5)
+ self.update_view(session, "UPDATE t USING TIMESTAMP 5 set v = 1 WHERE id = 1;", flush)
+ assert_one(session, "SELECT * FROM t_by_v", [1, 1, None, None])
+ assert_one(session, "SELECT * FROM t", [1, 1, None, None]) # data deleted by row-tombstone@2 should not resurrect
+
+ if flush:
+ for node in self.cluster.nodelist():
+ sstable_files = ' '.join(node.get_sstable_data_files('ks', 't_by_v'))
+ debug('Compacting {}'.format(sstable_files))
+ node.nodetool('compact --user-defined {}'.format(sstable_files))
+ assert_one(session, "SELECT * FROM t_by_v", [1, 1, None, None])
+ assert_one(session, "SELECT * FROM t", [1, 1, None, None]) # data deleted by row-tombstone@2 should not resurrect
+
+ # shadow view row (id=1, v=1)
+ self.update_view(session, "UPDATE t USING TIMESTAMP 5 set v = null WHERE id = 1;", flush)
+ assert_none(session, "SELECT * FROM t_by_v")
+ assert_one(session, "SELECT * FROM t", [1, None, None, None])
+
def view_tombstone_test(self):
"""
Test that a materialized views properly tombstone
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org