You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by aw...@apache.org on 2017/11/15 21:47:07 UTC

[39/50] cassandra git commit: CASSANDRA-13299: add more repair test for base with MV using different throttle

CASSANDRA-13299: add more repair test for base with MV using different throttle


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f39b468b
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f39b468b
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f39b468b

Branch: refs/heads/master
Commit: f39b468b3661fbe17e9960bdc4f21acea69a6893
Parents: 12dd472
Author: Zhao Yang <zh...@gmail.com>
Authored: Sun Aug 13 21:42:11 2017 +0800
Committer: Paulo Motta <pa...@apache.org>
Committed: Thu Sep 28 05:49:11 2017 -0500

----------------------------------------------------------------------
 materialized_views_test.py | 137 ++++++++++++++++++++++++++++++++++++++--
 1 file changed, 130 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/f39b468b/materialized_views_test.py
----------------------------------------------------------------------
diff --git a/materialized_views_test.py b/materialized_views_test.py
index 7fd8142..12728c6 100644
--- a/materialized_views_test.py
+++ b/materialized_views_test.py
@@ -42,6 +42,10 @@ class TestMaterializedViews(Tester):
     @since 3.0
     """
 
+    def _rows_to_list(self, rows):
+        new_list = [list(row) for row in rows]
+        return new_list
+
     def prepare(self, user_table=False, rf=1, options=None, nodes=3, install_byteman=False, **kwargs):
         cluster = self.cluster
         cluster.populate([nodes, 0], install_byteman=install_byteman)
@@ -1442,7 +1446,13 @@ class TestMaterializedViews(Tester):
             if expect_digest:
                 self.fail("Didn't find digest mismatch")
 
-    def simple_repair_test(self):
+    def simple_repair_test_by_base(self):
+        self._simple_repair_test(repair_base=True)
+
+    def simple_repair_test_by_view(self):
+        self._simple_repair_test(repair_view=True)
+
+    def _simple_repair_test(self, repair_base=False, repair_view=False):
         """
         Test that a materialized view are consistent after a simple repair.
         """
@@ -1486,16 +1496,20 @@ class TestMaterializedViews(Tester):
 
         debug('Start node2, and repair')
         node2.start(wait_other_notice=True, wait_for_binary_proto=True)
-        node1.repair()
+        if repair_base:
+            node1.nodetool("repair ks t")
+        if repair_view:
+            node1.nodetool("repair ks t_by_v")
 
-        debug('Verify the data in the MV with CL=ONE. All should be available now.')
+        debug('Verify the data in the MV with CL=ALL. All should be available now and no digest mismatch')
         for i in xrange(1000):
-            assert_one(
-                session,
+            query = SimpleStatement(
                 "SELECT * FROM t_by_v WHERE v = {}".format(i),
-                [i, i, 'a', 3.0],
-                cl=ConsistencyLevel.ONE
+                consistency_level=ConsistencyLevel.ALL
             )
+            result = session.execute(query, trace=True)
+            self.check_trace_events(result.get_query_trace(), False)
+            self.assertEquals(self._rows_to_list(result.current_rows), [[i, i, 'a', 3.0]])
 
     def base_replica_repair_test(self):
         self._base_replica_repair_test()
@@ -1682,6 +1696,115 @@ class TestMaterializedViews(Tester):
             )
 
     @attr('resource-intensive')
+    def throttled_partition_update_test(self):
+        """
+        @jira_ticket: CASSANDRA-13299, test breaking up a large partition when repairing a base table with an MV.
+
+        Provide a configurable batch size (cassandra.repair.mutation_repair_rows_per_batch=100) to throttle the number
+        of rows to be applied in one mutation.
+        """
+
+        session = self.prepare(rf=5, options={'hinted_handoff_enabled': False}, nodes=5)
+        node1, node2, node3, node4, node5 = self.cluster.nodelist()
+
+        for node in self.cluster.nodelist():
+            node.nodetool("disableautocompaction")
+
+        session.execute("CREATE TABLE ks.t (pk int, ck1 int, ck2 int, v1 int, v2 int, PRIMARY KEY(pk, ck1, ck2))")
+        session.execute(("CREATE MATERIALIZED VIEW ks.t_by_v AS SELECT * FROM t "
+                         "WHERE pk IS NOT NULL AND ck1 IS NOT NULL AND ck2 IS NOT NULL "
+                         "PRIMARY KEY (pk, ck2, ck1)"))
+
+        session.cluster.control_connection.wait_for_schema_agreement()
+
+        debug('Shutdown node2 and node3')
+        node2.stop(wait_other_notice=True)
+        node3.stop(wait_other_notice=True)
+
+        size = 50
+        range_deletion_ts = 30
+        partition_deletion_ts = 10
+
+        for ck1 in xrange(size):
+            for ck2 in xrange(size):
+                session.execute("INSERT INTO ks.t (pk, ck1, ck2, v1, v2)"
+                                " VALUES (1, {}, {}, {}, {}) USING TIMESTAMP {}".format(ck1, ck2, ck1, ck2, ck1))
+
+        self._replay_batchlogs()
+
+        for ck1 in xrange(size):
+            for ck2 in xrange(size):
+                assert_one(session, "SELECT pk,ck1,ck2,v1,v2 FROM ks.t WHERE pk=1 AND ck1={} AND ck2={}".format(ck1, ck2),
+                           [1, ck1, ck2, ck1, ck2])
+                assert_one(session, "SELECT pk,ck1,ck2,v1,v2 FROM ks.t_by_v WHERE pk=1 AND ck1={} AND ck2={}".format(ck1, ck2),
+                           [1, ck1, ck2, ck1, ck2])
+
+        debug('Shutdown node4 and node5')
+        node4.stop(wait_other_notice=True)
+        node5.stop(wait_other_notice=True)
+
+        for ck1 in xrange(size):
+            for ck2 in xrange(size):
+                if ck1 % 2 == 0:  # range tombstone
+                    session.execute("DELETE FROM ks.t USING TIMESTAMP 50 WHERE pk=1 AND ck1={}".format(ck1))
+                elif ck1 == ck2:  # row tombstone
+                    session.execute("DELETE FROM ks.t USING TIMESTAMP 60 WHERE pk=1 AND ck1={} AND ck2={}".format(ck1, ck2))
+                elif ck1 == ck2 - 1:  # cell tombstone
+                    session.execute("DELETE v2 FROM ks.t USING TIMESTAMP 70 WHERE pk=1 AND ck1={} AND ck2={}".format(ck1, ck2))
+
+        # range deletion
+        session.execute("DELETE FROM ks.t USING TIMESTAMP {} WHERE pk=1 and ck1 < 30 and ck1 > 20".format(range_deletion_ts))
+        session.execute("DELETE FROM ks.t USING TIMESTAMP {} WHERE pk=1 and ck1 = 20 and ck2 < 10".format(range_deletion_ts))
+
+        # partition deletion for ck1 <= partition_deletion_ts
+        session.execute("DELETE FROM ks.t USING TIMESTAMP {} WHERE pk=1".format(partition_deletion_ts))
+        self._replay_batchlogs()
+
+        # start nodes with different batch size
+        debug('Starting nodes')
+        node2.start(wait_other_notice=True, wait_for_binary_proto=True, jvm_args=["-Dcassandra.repair.mutation_repair_rows_per_batch={}".format(2)])
+        node3.start(wait_other_notice=True, wait_for_binary_proto=True, jvm_args=["-Dcassandra.repair.mutation_repair_rows_per_batch={}".format(5)])
+        node4.start(wait_other_notice=True, wait_for_binary_proto=True, jvm_args=["-Dcassandra.repair.mutation_repair_rows_per_batch={}".format(50)])
+        node5.start(wait_other_notice=True, wait_for_binary_proto=True, jvm_args=["-Dcassandra.repair.mutation_repair_rows_per_batch={}".format(5000)])
+        self._replay_batchlogs()
+
+        debug('repairing base table')
+        node1.nodetool("repair ks t")
+        self._replay_batchlogs()
+
+        debug('stop cluster')
+        self.cluster.stop()
+
+        debug('rolling restart to check repaired data on each node')
+        for node in self.cluster.nodelist():
+            debug('starting {}'.format(node.name))
+            node.start(wait_other_notice=True, wait_for_binary_proto=True)
+            session = self.patient_cql_connection(node, consistency_level=ConsistencyLevel.ONE)
+            for ck1 in xrange(size):
+                for ck2 in xrange(size):
+                    if (
+                        ck1 <= partition_deletion_ts or  # partition deletion
+                        ck1 == ck2 or ck1 % 2 == 0 or  # row deletion or range tombstone
+                        (ck1 > 20 and ck1 < 30) or (ck1 == 20 and ck2 < 10)  # range tombstone
+                    ):
+                        assert_none(session, "SELECT pk,ck1,ck2,v1,v2 FROM ks.t_by_v WHERE pk=1 AND "
+                                             "ck1={} AND ck2={}".format(ck1, ck2))
+                        assert_none(session, "SELECT pk,ck1,ck2,v1,v2 FROM ks.t WHERE pk=1 AND "
+                                             "ck1={} AND ck2={}".format(ck1, ck2))
+                    elif ck1 == ck2 - 1:  # cell tombstone
+                        assert_one(session, "SELECT pk,ck1,ck2,v1,v2 FROM ks.t_by_v WHERE pk=1 AND "
+                                            "ck1={} AND ck2={}".format(ck1, ck2), [1, ck1, ck2, ck1, None])
+                        assert_one(session, "SELECT pk,ck1,ck2,v1,v2 FROM ks.t WHERE pk=1 AND "
+                                            "ck1={} AND ck2={}".format(ck1, ck2), [1, ck1, ck2, ck1, None])
+                    else:
+                        assert_one(session, "SELECT pk,ck1,ck2,v1,v2 FROM ks.t_by_v WHERE pk=1 AND "
+                                            "ck1={} AND ck2={}".format(ck1, ck2), [1, ck1, ck2, ck1, ck2])
+                        assert_one(session, "SELECT pk,ck1,ck2,v1,v2 FROM ks.t WHERE pk=1 AND "
+                                            "ck1={} AND ck2={}".format(ck1, ck2), [1, ck1, ck2, ck1, ck2])
+            debug('stopping {}'.format(node.name))
+            node.stop(wait_other_notice=True, wait_for_binary_proto=True)
+
+    @attr('resource-intensive')
     def really_complex_repair_test(self):
         """
         Test that a materialized view are consistent after a more complex repair.


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org