You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by aw...@apache.org on 2017/11/15 21:46:54 UTC

[26/50] cassandra git commit: CASSANDRA-11500: add dtest for complex update/delete tombstones in MV

CASSANDRA-11500: add dtest for complex update/delete tombstones in MV


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/6d77ace5
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/6d77ace5
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/6d77ace5

Branch: refs/heads/master
Commit: 6d77ace5361f020ba182072ade9f4ab98025c213
Parents: 19b6613
Author: Zhao Yang <zh...@gmail.com>
Authored: Mon May 1 23:24:12 2017 +0800
Committer: Paulo Motta <pa...@apache.org>
Committed: Tue Sep 5 00:39:48 2017 -0500

----------------------------------------------------------------------
 materialized_views_test.py | 308 +++++++++++++++++++++++++++++++++++++++-
 1 file changed, 307 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/6d77ace5/materialized_views_test.py
----------------------------------------------------------------------
diff --git a/materialized_views_test.py b/materialized_views_test.py
index 574d90f..637124d 100644
--- a/materialized_views_test.py
+++ b/materialized_views_test.py
@@ -66,6 +66,14 @@ class TestMaterializedViews(Tester):
 
         return session
 
+    def update_view(self, session, query, flush, compact=False):
+        session.execute(query)
+        self._replay_batchlogs()
+        if flush:
+            self.cluster.flush()
+        if compact:
+            self.cluster.compact()
+
     def _settle_nodes(self):
         debug("Settling all nodes")
         stage_match = re.compile("(?P<name>\S+)\s+(?P<active>\d+)\s+(?P<pending>\d+)\s+(?P<completed>\d+)\s+(?P<blocked>\d+)\s+(?P<alltimeblocked>\d+)")
@@ -334,7 +342,7 @@ class TestMaterializedViews(Tester):
         assert_invalid(
             session,
             "ALTER TABLE ks.users DROP state;",
-            "Cannot drop column state, depended on by materialized views"
+            "Cannot drop column state on base table with materialized views."
         )
 
     def drop_table_test(self):
@@ -974,6 +982,304 @@ class TestMaterializedViews(Tester):
                 cl=ConsistencyLevel.ALL
             )
 
+    @since('3.0')
+    def test_no_base_column_in_view_pk_complex_timestamp_with_flush(self):
+        self._test_no_base_column_in_view_pk_complex_timestamp(flush=True)
+
+    @since('3.0')
+    def test_no_base_column_in_view_pk_complex_timestamp_without_flush(self):
+        self._test_no_base_column_in_view_pk_complex_timestamp(flush=False)
+
+    def _test_no_base_column_in_view_pk_complex_timestamp(self, flush):
+        """
+        Able to shadow old view row if all columns in base are removed including unselected
+        Able to recreate view row if at least one selected column alive
+
+        @jira_ticket CASSANDRA-11500
+        """
+        session = self.prepare(rf=3, nodes=3, options={'hinted_handoff_enabled': False}, consistency_level=ConsistencyLevel.QUORUM)
+        node1, node2, node3 = self.cluster.nodelist()
+
+        session.execute('USE ks')
+        session.execute("CREATE TABLE t (k int, c int, a int, b int, e int, f int, primary key(k, c))")
+        session.execute(("CREATE MATERIALIZED VIEW mv AS SELECT k,c,a,b FROM t "
+                         "WHERE k IS NOT NULL AND c IS NOT NULL PRIMARY KEY (c, k)"))
+        session.cluster.control_connection.wait_for_schema_agreement()
+
+        # update unselected, view row should be alive
+        self.update_view(session, "UPDATE t USING TIMESTAMP 1 SET e=1 WHERE k=1 AND c=1;", flush)
+        assert_one(session, "SELECT * FROM t", [1, 1, None, None, 1, None])
+        assert_one(session, "SELECT * FROM mv", [1, 1, None, None])
+
+        # remove unselected, add selected column, view row should be alive
+        self.update_view(session, "UPDATE t USING TIMESTAMP 2 SET e=null, b=1 WHERE k=1 AND c=1;", flush)
+        assert_one(session, "SELECT * FROM t", [1, 1, None, 1, None, None])
+        assert_one(session, "SELECT * FROM mv", [1, 1, None, 1])
+
+        # remove selected column, view row is removed
+        self.update_view(session, "UPDATE t USING TIMESTAMP 2 SET e=null, b=null WHERE k=1 AND c=1;", flush)
+        assert_none(session, "SELECT * FROM t")
+        assert_none(session, "SELECT * FROM mv")
+
+        # update unselected with ts=3, view row should be alive
+        self.update_view(session, "UPDATE t USING TIMESTAMP 3 SET f=1 WHERE k=1 AND c=1;", flush)
+        assert_one(session, "SELECT * FROM t", [1, 1, None, None, None, 1])
+        assert_one(session, "SELECT * FROM mv", [1, 1, None, None])
+
+        # insert livenessInfo, view row should be alive
+        self.update_view(session, "INSERT INTO t(k,c) VALUES(1,1) USING TIMESTAMP 3", flush)
+        assert_one(session, "SELECT * FROM t", [1, 1, None, None, None, 1])
+        assert_one(session, "SELECT * FROM mv", [1, 1, None, None])
+
+        # remove unselected, view row should be alive because of base livenessInfo alive
+        self.update_view(session, "UPDATE t USING TIMESTAMP 3 SET f=null WHERE k=1 AND c=1;", flush)
+        assert_one(session, "SELECT * FROM t", [1, 1, None, None, None, None])
+        assert_one(session, "SELECT * FROM mv", [1, 1, None, None])
+
+        # add selected column, view row should be alive
+        self.update_view(session, "UPDATE t USING TIMESTAMP 3 SET a=1 WHERE k=1 AND c=1;", flush)
+        assert_one(session, "SELECT * FROM t", [1, 1, 1, None, None, None])
+        assert_one(session, "SELECT * FROM mv", [1, 1, 1, None])
+
+        # update unselected, view row should be alive
+        self.update_view(session, "UPDATE t USING TIMESTAMP 4 SET f=1 WHERE k=1 AND c=1;", flush)
+        assert_one(session, "SELECT * FROM t", [1, 1, 1, None, None, 1])
+        assert_one(session, "SELECT * FROM mv", [1, 1, 1, None])
+
+        # delete with ts=3, view row should be alive due to unselected@ts4
+        self.update_view(session, "DELETE FROM t USING TIMESTAMP 3 WHERE k=1 AND c=1;", flush)
+        assert_one(session, "SELECT * FROM t", [1, 1, None, None, None, 1])
+        assert_one(session, "SELECT * FROM mv", [1, 1, None, None])
+
+        # remove unselected, view row should be removed
+        self.update_view(session, "UPDATE t USING TIMESTAMP 4 SET f=null WHERE k=1 AND c=1;", flush)
+        assert_none(session, "SELECT * FROM t")
+        assert_none(session, "SELECT * FROM mv")
+
+        # add selected with ts=7, view row is alive
+        self.update_view(session, "UPDATE t USING TIMESTAMP 7 SET b=1 WHERE k=1 AND c=1;", flush)
+        assert_one(session, "SELECT * FROM t", [1, 1, None, 1, None, None])
+        assert_one(session, "SELECT * FROM mv", [1, 1, None, 1])
+
+        # remove selected with ts=7, view row is dead
+        self.update_view(session, "UPDATE t USING TIMESTAMP 7 SET b=null WHERE k=1 AND c=1;", flush)
+        assert_none(session, "SELECT * FROM t")
+        assert_none(session, "SELECT * FROM mv")
+
+        # add selected with ts=5, view row is alive (selected columns should not affect each other)
+        self.update_view(session, "UPDATE t USING TIMESTAMP 5 SET a=1 WHERE k=1 AND c=1;", flush)
+        assert_one(session, "SELECT * FROM t", [1, 1, 1, None, None, None])
+        assert_one(session, "SELECT * FROM mv", [1, 1, 1, None])
+
+        # add selected with ttl=10
+        self.update_view(session, "UPDATE t USING TTL 10 SET a=1 WHERE k=1 AND c=1;", flush)
+        assert_one(session, "SELECT * FROM t", [1, 1, 1, None, None, None])
+        assert_one(session, "SELECT * FROM mv", [1, 1, 1, None])
+
+        time.sleep(10)
+
+        # update unselected with ttl=10, view row should be alive
+        self.update_view(session, "UPDATE t USING TTL 10 SET f=1 WHERE k=1 AND c=1;", flush)
+        assert_one(session, "SELECT * FROM t", [1, 1, None, None, None, 1])
+        assert_one(session, "SELECT * FROM mv", [1, 1, None, None])
+
+        time.sleep(10)
+
+        # base and view rows are both gone once the TTL'd columns expire (no live base livenessInfo remains)
+        assert_none(session, "SELECT * FROM t")
+        assert_none(session, "SELECT * FROM mv")
+
+    @since('3.0')
+    def test_base_column_in_view_pk_complex_timestamp_with_flush(self):
+        self._test_base_column_in_view_pk_complex_timestamp(flush=True)
+
+    @since('3.0')
+    def test_base_column_in_view_pk_complex_timestamp_without_flush(self):
+        self._test_base_column_in_view_pk_complex_timestamp(flush=False)
+
+    def _test_base_column_in_view_pk_complex_timestamp(self, flush):
+        """
+        Able to shadow old view row with column ts greater than pk's ts and re-insert the view row
+        Able to shadow old view row with column ts smaller than pk's ts and re-insert the view row
+
+        @jira_ticket CASSANDRA-11500
+        """
+        session = self.prepare(rf=3, nodes=3, options={'hinted_handoff_enabled': False}, consistency_level=ConsistencyLevel.QUORUM)
+        node1, node2, node3 = self.cluster.nodelist()
+
+        session.execute('USE ks')
+        session.execute("CREATE TABLE t (k int PRIMARY KEY, a int, b int)")
+        session.execute(("CREATE MATERIALIZED VIEW mv AS SELECT * FROM t "
+                         "WHERE k IS NOT NULL AND a IS NOT NULL PRIMARY KEY (k, a)"))
+        session.cluster.control_connection.wait_for_schema_agreement()
+
+        # Set initial values TS=1
+        self.update_view(session, "INSERT INTO t (k, a, b) VALUES (1, 1, 1) USING TIMESTAMP 1;", flush)
+        assert_one(session, "SELECT * FROM t", [1, 1, 1])
+        assert_one(session, "SELECT * FROM mv", [1, 1, 1])
+
+        # increase b ts to 10
+        self.update_view(session, "UPDATE t USING TIMESTAMP 10 SET b = 2 WHERE k = 1;", flush)
+        assert_one(session, "SELECT k,a,b,writetime(b) FROM t", [1, 1, 2, 10])
+        assert_one(session, "SELECT k,a,b,writetime(b) FROM mv", [1, 1, 2, 10])
+
+        # switch entries. shadow a = 1, insert a = 2
+        self.update_view(session, "UPDATE t USING TIMESTAMP 2 SET a = 2 WHERE k = 1;", flush)
+        assert_one(session, "SELECT k,a,b,writetime(b) FROM t", [1, 2, 2, 10])
+        assert_one(session, "SELECT k,a,b,writetime(b) FROM mv", [1, 2, 2, 10])
+
+        # switch entries. shadow a = 2, insert a = 1
+        self.update_view(session, "UPDATE t USING TIMESTAMP 3 SET a = 1 WHERE k = 1;", flush)
+        assert_one(session, "SELECT k,a,b,writetime(b) FROM t", [1, 1, 2, 10])
+        assert_one(session, "SELECT k,a,b,writetime(b) FROM mv", [1, 1, 2, 10])
+
+        # switch entries. shadow a = 1, insert a = 2
+        self.update_view(session, "UPDATE t USING TIMESTAMP 4 SET a = 2 WHERE k = 1;", flush, compact=True)
+        assert_one(session, "SELECT k,a,b,writetime(b) FROM t", [1, 2, 2, 10])
+        assert_one(session, "SELECT k,a,b,writetime(b) FROM mv", [1, 2, 2, 10])
+
+        # able to shadow view row even if base-column in view pk's ts is smaller than row timestamp
+        # set row TS = 20, a@6, b@20
+        self.update_view(session, "DELETE FROM t USING TIMESTAMP 5 where k = 1;", flush)
+        assert_one(session, "SELECT k,a,b,writetime(b) FROM t", [1, None, 2, 10])
+        assert_none(session, "SELECT k,a,b,writetime(b) FROM mv")
+        self.update_view(session, "INSERT INTO t (k, a, b) VALUES (1, 1, 1) USING TIMESTAMP 6;", flush)
+        assert_one(session, "SELECT k,a,b,writetime(b) FROM t", [1, 1, 2, 10])
+        assert_one(session, "SELECT k,a,b,writetime(b) FROM mv", [1, 1, 2, 10])
+        self.update_view(session, "INSERT INTO t (k, b) VALUES (1, 1) USING TIMESTAMP 20;", flush)
+        assert_one(session, "SELECT k,a,b,writetime(b) FROM t", [1, 1, 1, 20])
+        assert_one(session, "SELECT k,a,b,writetime(b) FROM mv", [1, 1, 1, 20])
+
+        # switch entries. shadow a = 1, insert a = 2
+        self.update_view(session, "UPDATE t USING TIMESTAMP 7 SET a = 2 WHERE k = 1;", flush)
+        assert_one(session, "SELECT k,a,b,writetime(a),writetime(b) FROM t", [1, 2, 1, 7, 20])
+        assert_one(session, "SELECT k,a,b,writetime(b) FROM mv", [1, 2, 1, 20])
+
+        # switch entries. shadow a = 2, insert a = 1
+        self.update_view(session, "UPDATE t USING TIMESTAMP 8 SET a = 1 WHERE k = 1;", flush)
+        assert_one(session, "SELECT k,a,b,writetime(a),writetime(b) FROM t", [1, 1, 1, 8, 20])
+        assert_one(session, "SELECT k,a,b,writetime(b) FROM mv", [1, 1, 1, 20])
+
+        # create another view row
+        self.update_view(session, "INSERT INTO t (k, a, b) VALUES (2, 2, 2);", flush)
+        assert_one(session, "SELECT k,a,b FROM t WHERE k = 2", [2, 2, 2])
+        assert_one(session, "SELECT k,a,b FROM mv WHERE k = 2", [2, 2, 2])
+
+        # stop node2, node3
+        debug('Shutdown node2')
+        node2.stop(wait_other_notice=True)
+        debug('Shutdown node3')
+        node3.stop(wait_other_notice=True)
+        # shadow a = 1, create a = 2
+        query = SimpleStatement("UPDATE t USING TIMESTAMP 9 SET a = 2 WHERE k = 1", consistency_level=ConsistencyLevel.ONE)
+        self.update_view(session, query, flush)
+        # shadow (a=2, k=2) after 3 seconds
+        query = SimpleStatement("UPDATE t USING TTL 3 SET a = 2 WHERE k = 2", consistency_level=ConsistencyLevel.ONE)
+        self.update_view(session, query, flush)
+
+        debug('Starting node2')
+        node2.start(wait_other_notice=True)
+        debug('Starting node3')
+        node3.start(wait_other_notice=True)
+
+        # For k = 1 & a = 1, We should get a digest mismatch of tombstones and repaired
+        query = SimpleStatement("SELECT * FROM mv WHERE k = 1 AND a = 1", consistency_level=ConsistencyLevel.ALL)
+        result = session.execute(query, trace=True)
+        self.check_trace_events(result.get_query_trace(), True)
+        self.assertEqual(0, len(result.current_rows))
+
+        # For k = 1 & a = 1, second time no digest mismatch
+        result = session.execute(query, trace=True)
+        self.check_trace_events(result.get_query_trace(), False)
+        assert_none(session, "SELECT * FROM mv WHERE k = 1 AND a = 1")
+        self.assertEqual(0, len(result.current_rows))
+
+        # For k = 1 & a = 2, We should get a digest mismatch of data and repaired for a = 2
+        query = SimpleStatement("SELECT * FROM mv WHERE k = 1 AND a = 2", consistency_level=ConsistencyLevel.ALL)
+        result = session.execute(query, trace=True)
+        self.check_trace_events(result.get_query_trace(), True)
+        self.assertEqual(1, len(result.current_rows))
+
+        # For k = 1 & a = 2, second time no digest mismatch
+        result = session.execute(query, trace=True)
+        self.check_trace_events(result.get_query_trace(), False)
+        self.assertEqual(1, len(result.current_rows))
+        assert_one(session, "SELECT k,a,b,writetime(b) FROM mv WHERE k = 1", [1, 2, 1, 20])
+
+        time.sleep(3)
+        # For k = 2 & a = 2, We should get a digest mismatch of expired and repaired
+        query = SimpleStatement("SELECT * FROM mv WHERE k = 2 AND a = 2", consistency_level=ConsistencyLevel.ALL)
+        result = session.execute(query, trace=True)
+        self.check_trace_events(result.get_query_trace(), True)
+        debug(result.current_rows)
+        self.assertEqual(0, len(result.current_rows))
+
+        # For k = 2 & a = 2, second time no digest mismatch
+        result = session.execute(query, trace=True)
+        self.check_trace_events(result.get_query_trace(), False)
+        self.assertEqual(0, len(result.current_rows))
+
+    @since('4.0')
+    def test_base_column_in_view_pk_commutative_tombstone_with_flush(self):
+        self._test_base_column_in_view_pk_commutative_tombstone_(flush=True)
+
+    @since('4.0')
+    def test_base_column_in_view_pk_commutative_tombstone_without_flush(self):
+        self._test_base_column_in_view_pk_commutative_tombstone_(flush=False)
+
+    def _test_base_column_in_view_pk_commutative_tombstone_(self, flush):
+        """
+        view row deletion should be commutative with newer view livenessInfo, otherwise deleted columns may be resurrected.
+        @jira_ticket CASSANDRA-13409
+        """
+        session = self.prepare(rf=3, nodes=3, options={'hinted_handoff_enabled': False}, consistency_level=ConsistencyLevel.QUORUM)
+        node1 = self.cluster.nodelist()[0]
+
+        session.execute('USE ks')
+        session.execute("CREATE TABLE t (id int PRIMARY KEY, v int, v2 text, v3 decimal)")
+        session.execute(("CREATE MATERIALIZED VIEW t_by_v AS SELECT * FROM t "
+                         "WHERE v IS NOT NULL AND id IS NOT NULL PRIMARY KEY (v,id)"))
+        session.cluster.control_connection.wait_for_schema_agreement()
+        for node in self.cluster.nodelist():
+            node.nodetool("disableautocompaction")
+
+        # sstable 1, Set initial values TS=1
+        self.update_view(session, "INSERT INTO t (id, v, v2, v3) VALUES (1, 1, 'a', 3.0) USING TIMESTAMP 1", flush)
+        assert_one(session, "SELECT * FROM t_by_v", [1, 1, 'a', 3.0])
+
+        # sstable 2, delete the base row with TS=2, which tombstones the view row (v=1, id=1)
+        self.update_view(session, "DELETE FROM t USING TIMESTAMP 2 WHERE id = 1;", flush)
+        assert_none(session, "SELECT * FROM t_by_v")
+        assert_none(session, "SELECT * FROM t")
+
+        # sstable 3, tombstones of mv created by base deletion should remain.
+        self.update_view(session, "INSERT INTO t (id, v) VALUES (1, 1) USING TIMESTAMP 3", flush)
+        assert_one(session, "SELECT * FROM t_by_v", [1, 1, None, None])
+        assert_one(session, "SELECT * FROM t", [1, 1, None, None])
+
+        # sstable 4, shadow view row (id=1, v=1), insert (id=1, v=2, ts=4)
+        self.update_view(session, "UPDATE t USING TIMESTAMP 4 set v = 2 WHERE id = 1;", flush)
+        assert_one(session, "SELECT * FROM t_by_v", [2, 1, None, None])
+        assert_one(session, "SELECT * FROM t", [1, 2, None, None])
+
+        # sstable 5, shadow view row (id=1, v=2), insert (id=1, v=1 ts=5)
+        self.update_view(session, "UPDATE t USING TIMESTAMP 5 set v = 1 WHERE id = 1;", flush)
+        assert_one(session, "SELECT * FROM t_by_v", [1, 1, None, None])
+        assert_one(session, "SELECT * FROM t", [1, 1, None, None])  # data deleted by row-tombstone@2 should not resurrect
+
+        if flush:
+            for node in self.cluster.nodelist():
+                sstable_files = ' '.join(node.get_sstable_data_files('ks', 't_by_v'))
+                debug('Compacting {}'.format(sstable_files))
+                node.nodetool('compact --user-defined {}'.format(sstable_files))
+            assert_one(session, "SELECT * FROM t_by_v", [1, 1, None, None])
+            assert_one(session, "SELECT * FROM t", [1, 1, None, None])  # data deleted by row-tombstone@2 should not resurrect
+
+        # shadow view row (id=1, v=1)
+        self.update_view(session, "UPDATE t USING TIMESTAMP 5 set v = null WHERE id = 1;", flush)
+        assert_none(session, "SELECT * FROM t_by_v")
+        assert_one(session, "SELECT * FROM t", [1, None, None, None])
+
     def view_tombstone_test(self):
         """
         Test that a materialized views properly tombstone


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org