You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by aw...@apache.org on 2017/11/15 21:47:07 UTC
[39/50] cassandra git commit: CASSANDRA-13299: add more repair test
for base with MV using different throttle
CASSANDRA-13299: add more repair test for base with MV using different throttle
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f39b468b
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f39b468b
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f39b468b
Branch: refs/heads/master
Commit: f39b468b3661fbe17e9960bdc4f21acea69a6893
Parents: 12dd472
Author: Zhao Yang <zh...@gmail.com>
Authored: Sun Aug 13 21:42:11 2017 +0800
Committer: Paulo Motta <pa...@apache.org>
Committed: Thu Sep 28 05:49:11 2017 -0500
----------------------------------------------------------------------
materialized_views_test.py | 137 ++++++++++++++++++++++++++++++++++++++--
1 file changed, 130 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f39b468b/materialized_views_test.py
----------------------------------------------------------------------
diff --git a/materialized_views_test.py b/materialized_views_test.py
index 7fd8142..12728c6 100644
--- a/materialized_views_test.py
+++ b/materialized_views_test.py
@@ -42,6 +42,10 @@ class TestMaterializedViews(Tester):
@since 3.0
"""
+ def _rows_to_list(self, rows):
+ new_list = [list(row) for row in rows]
+ return new_list
+
def prepare(self, user_table=False, rf=1, options=None, nodes=3, install_byteman=False, **kwargs):
cluster = self.cluster
cluster.populate([nodes, 0], install_byteman=install_byteman)
@@ -1442,7 +1446,13 @@ class TestMaterializedViews(Tester):
if expect_digest:
self.fail("Didn't find digest mismatch")
- def simple_repair_test(self):
+ def simple_repair_test_by_base(self):
+ self._simple_repair_test(repair_base=True)
+
+ def simple_repair_test_by_view(self):
+ self._simple_repair_test(repair_view=True)
+
+ def _simple_repair_test(self, repair_base=False, repair_view=False):
"""
Test that a materialized view are consistent after a simple repair.
"""
@@ -1486,16 +1496,20 @@ class TestMaterializedViews(Tester):
debug('Start node2, and repair')
node2.start(wait_other_notice=True, wait_for_binary_proto=True)
- node1.repair()
+ if repair_base:
+ node1.nodetool("repair ks t")
+ if repair_view:
+ node1.nodetool("repair ks t_by_v")
- debug('Verify the data in the MV with CL=ONE. All should be available now.')
+ debug('Verify the data in the MV with CL=ALL. All should be available now and no digest mismatch')
for i in xrange(1000):
- assert_one(
- session,
+ query = SimpleStatement(
"SELECT * FROM t_by_v WHERE v = {}".format(i),
- [i, i, 'a', 3.0],
- cl=ConsistencyLevel.ONE
+ consistency_level=ConsistencyLevel.ALL
)
+ result = session.execute(query, trace=True)
+ self.check_trace_events(result.get_query_trace(), False)
+ self.assertEquals(self._rows_to_list(result.current_rows), [[i, i, 'a', 3.0]])
def base_replica_repair_test(self):
self._base_replica_repair_test()
@@ -1682,6 +1696,115 @@ class TestMaterializedViews(Tester):
)
@attr('resource-intensive')
+ def throttled_partition_update_test(self):
+ """
+ @jira_ticket: CASSANDRA-13299, test break up large partition when repairing base with mv.
+
+ Provide a configuable batch size(cassandra.mv.mutation.row.count=100) to trottle number
+ of rows to be applied in one mutation
+ """
+
+ session = self.prepare(rf=5, options={'hinted_handoff_enabled': False}, nodes=5)
+ node1, node2, node3, node4, node5 = self.cluster.nodelist()
+
+ for node in self.cluster.nodelist():
+ node.nodetool("disableautocompaction")
+
+ session.execute("CREATE TABLE ks.t (pk int, ck1 int, ck2 int, v1 int, v2 int, PRIMARY KEY(pk, ck1, ck2))")
+ session.execute(("CREATE MATERIALIZED VIEW ks.t_by_v AS SELECT * FROM t "
+ "WHERE pk IS NOT NULL AND ck1 IS NOT NULL AND ck2 IS NOT NULL "
+ "PRIMARY KEY (pk, ck2, ck1)"))
+
+ session.cluster.control_connection.wait_for_schema_agreement()
+
+ debug('Shutdown node2 and node3')
+ node2.stop(wait_other_notice=True)
+ node3.stop(wait_other_notice=True)
+
+ size = 50
+ range_deletion_ts = 30
+ partition_deletion_ts = 10
+
+ for ck1 in xrange(size):
+ for ck2 in xrange(size):
+ session.execute("INSERT INTO ks.t (pk, ck1, ck2, v1, v2)"
+ " VALUES (1, {}, {}, {}, {}) USING TIMESTAMP {}".format(ck1, ck2, ck1, ck2, ck1))
+
+ self._replay_batchlogs()
+
+ for ck1 in xrange(size):
+ for ck2 in xrange(size):
+ assert_one(session, "SELECT pk,ck1,ck2,v1,v2 FROM ks.t WHERE pk=1 AND ck1={} AND ck2={}".format(ck1, ck2),
+ [1, ck1, ck2, ck1, ck2])
+ assert_one(session, "SELECT pk,ck1,ck2,v1,v2 FROM ks.t_by_v WHERE pk=1 AND ck1={} AND ck2={}".format(ck1, ck2),
+ [1, ck1, ck2, ck1, ck2])
+
+ debug('Shutdown node4 and node5')
+ node4.stop(wait_other_notice=True)
+ node5.stop(wait_other_notice=True)
+
+ for ck1 in xrange(size):
+ for ck2 in xrange(size):
+ if ck1 % 2 == 0: # range tombstone
+ session.execute("DELETE FROM ks.t USING TIMESTAMP 50 WHERE pk=1 AND ck1={}".format(ck1))
+ elif ck1 == ck2: # row tombstone
+ session.execute("DELETE FROM ks.t USING TIMESTAMP 60 WHERE pk=1 AND ck1={} AND ck2={}".format(ck1, ck2))
+ elif ck1 == ck2 - 1: # cell tombstone
+ session.execute("DELETE v2 FROM ks.t USING TIMESTAMP 70 WHERE pk=1 AND ck1={} AND ck2={}".format(ck1, ck2))
+
+ # range deletion
+ session.execute("DELETE FROM ks.t USING TIMESTAMP {} WHERE pk=1 and ck1 < 30 and ck1 > 20".format(range_deletion_ts))
+ session.execute("DELETE FROM ks.t USING TIMESTAMP {} WHERE pk=1 and ck1 = 20 and ck2 < 10".format(range_deletion_ts))
+
+ # partition deletion for ck1 <= partition_deletion_ts
+ session.execute("DELETE FROM ks.t USING TIMESTAMP {} WHERE pk=1".format(partition_deletion_ts))
+ self._replay_batchlogs()
+
+ # start nodes with different batch size
+ debug('Starting nodes')
+ node2.start(wait_other_notice=True, wait_for_binary_proto=True, jvm_args=["-Dcassandra.repair.mutation_repair_rows_per_batch={}".format(2)])
+ node3.start(wait_other_notice=True, wait_for_binary_proto=True, jvm_args=["-Dcassandra.repair.mutation_repair_rows_per_batch={}".format(5)])
+ node4.start(wait_other_notice=True, wait_for_binary_proto=True, jvm_args=["-Dcassandra.repair.mutation_repair_rows_per_batch={}".format(50)])
+ node5.start(wait_other_notice=True, wait_for_binary_proto=True, jvm_args=["-Dcassandra.repair.mutation_repair_rows_per_batch={}".format(5000)])
+ self._replay_batchlogs()
+
+ debug('repairing base table')
+ node1.nodetool("repair ks t")
+ self._replay_batchlogs()
+
+ debug('stop cluster')
+ self.cluster.stop()
+
+ debug('rolling restart to check repaired data on each node')
+ for node in self.cluster.nodelist():
+ debug('starting {}'.format(node.name))
+ node.start(wait_other_notice=True, wait_for_binary_proto=True)
+ session = self.patient_cql_connection(node, consistency_level=ConsistencyLevel.ONE)
+ for ck1 in xrange(size):
+ for ck2 in xrange(size):
+ if (
+ ck1 <= partition_deletion_ts or # partition deletion
+ ck1 == ck2 or ck1 % 2 == 0 or # row deletion or range tombstone
+ (ck1 > 20 and ck1 < 30) or (ck1 == 20 and ck2 < 10) # range tombstone
+ ):
+ assert_none(session, "SELECT pk,ck1,ck2,v1,v2 FROM ks.t_by_v WHERE pk=1 AND "
+ "ck1={} AND ck2={}".format(ck1, ck2))
+ assert_none(session, "SELECT pk,ck1,ck2,v1,v2 FROM ks.t WHERE pk=1 AND "
+ "ck1={} AND ck2={}".format(ck1, ck2))
+ elif ck1 == ck2 - 1: # cell tombstone
+ assert_one(session, "SELECT pk,ck1,ck2,v1,v2 FROM ks.t_by_v WHERE pk=1 AND "
+ "ck1={} AND ck2={}".format(ck1, ck2), [1, ck1, ck2, ck1, None])
+ assert_one(session, "SELECT pk,ck1,ck2,v1,v2 FROM ks.t WHERE pk=1 AND "
+ "ck1={} AND ck2={}".format(ck1, ck2), [1, ck1, ck2, ck1, None])
+ else:
+ assert_one(session, "SELECT pk,ck1,ck2,v1,v2 FROM ks.t_by_v WHERE pk=1 AND "
+ "ck1={} AND ck2={}".format(ck1, ck2), [1, ck1, ck2, ck1, ck2])
+ assert_one(session, "SELECT pk,ck1,ck2,v1,v2 FROM ks.t WHERE pk=1 AND "
+ "ck1={} AND ck2={}".format(ck1, ck2), [1, ck1, ck2, ck1, ck2])
+ debug('stopping {}'.format(node.name))
+ node.stop(wait_other_notice=True, wait_for_binary_proto=True)
+
+ @attr('resource-intensive')
def really_complex_repair_test(self):
"""
Test that a materialized view are consistent after a more complex repair.
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org