You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by mc...@apache.org on 2020/05/26 08:24:40 UTC
[cassandra-dtest] branch master updated: Add legacy streaming test
for transient replica repair tests, and test for lcs
This is an automated email from the ASF dual-hosted git repository.
mck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/cassandra-dtest.git
The following commit(s) were added to refs/heads/master by this push:
new d7aacd3 Add legacy streaming test for transient replica repair tests, and test for lcs
d7aacd3 is described below
commit d7aacd3fa9b7d4c4fef80f5550a2576303e29890
Author: Zhao Yang <zh...@gmail.com>
AuthorDate: Mon May 11 15:51:22 2020 +0800
Add legacy streaming test for transient replica repair tests, and test for lcs
patch by Zhao Yang; reviewed by Blake Eggleston, Dinesh Joshi, Ekaterina Dimitrova for CASSANDRA-15783
---
transient_replication_test.py | 126 +++++++++++++++++++++++++-----------------
1 file changed, 75 insertions(+), 51 deletions(-)
diff --git a/transient_replication_test.py b/transient_replication_test.py
index 990a984..e04162f 100644
--- a/transient_replication_test.py
+++ b/transient_replication_test.py
@@ -179,6 +179,8 @@ class TransientReplicationBase(Tester):
# Make sure digest is not attempted against the transient node
self.node3.byteman_submit(['./byteman/throw_on_digest.btm'])
+ def stream_entire_sstables(self):
+ return True
def replication_factor(self):
return '3/1'
@@ -186,6 +188,10 @@ class TransientReplicationBase(Tester):
def tokens(self):
return [0, 1, 2]
+ def use_lcs(self):
+ session = self.exclusive_cql_connection(self.node1)
+ session.execute("ALTER TABLE %s.%s with compaction={'class': 'LeveledCompactionStrategy'};" % (self.keyspace, self.table))
+
def setup_schema(self):
session = self.exclusive_cql_connection(self.node1)
replication_params = OrderedDict()
@@ -202,6 +208,7 @@ class TransientReplicationBase(Tester):
patch_start(self.cluster)
self.cluster.set_configuration_options(values={'hinted_handoff_enabled': False,
'num_tokens': 1,
+ 'stream_entire_sstables': self.stream_entire_sstables(),
'commitlog_sync_period_in_ms': 500,
'enable_transient_replication': True,
'dynamic_snitch': False})
@@ -403,13 +410,63 @@ class TestTransientReplication(TransientReplicationBase):
[[1, 1, 1]],
cl=ConsistencyLevel.QUORUM)
- def _test_speculative_write_repair_cycle(self, primary_range, optimized_repair, repair_coordinator, expect_node3_data):
+ @pytest.mark.no_vnodes
+ def test_cheap_quorums(self):
+ """ writes shouldn't make it to transient nodes """
+ session = self.exclusive_cql_connection(self.node1)
+ for node in self.nodes:
+ self.assert_has_no_sstables(node)
+
+ tm = lambda n: self.table_metrics(n)
+
+ with tm(self.node1) as tm1, tm(self.node2) as tm2, tm(self.node3) as tm3:
+ assert tm1.write_count == 0
+ assert tm2.write_count == 0
+ assert tm3.write_count == 0
+ self.insert_row(1, 1, 1, session=session)
+ assert tm1.write_count == 1
+ assert tm2.write_count == 1
+ assert tm3.write_count == 0
+
+ @pytest.mark.no_vnodes
+ def test_speculative_write(self):
+ """ if a full replica isn't responding, we should send the write to the transient replica """
+ session = self.exclusive_cql_connection(self.node1)
+ self.node2.byteman_submit(['./byteman/slow_writes.btm'])
+
+ self.insert_row(1, 1, 1, session=session)
+ self.assert_local_rows(self.node1, [[1,1,1]])
+ self.assert_local_rows(self.node2, [])
+ self.assert_local_rows(self.node3, [[1,1,1]])
+
+ @pytest.mark.skip(reason="Doesn't test quite the right combination of forbidden RF changes right now")
+ def test_keyspace_rf_changes(self):
+ """ they should throw an exception """
+ session = self.exclusive_cql_connection(self.node1)
+ replication_params = OrderedDict()
+ replication_params['class'] = 'NetworkTopologyStrategy'
+ assert self.replication_factor() == '3/1'
+ replication_params['datacenter1'] = '5/2'
+ replication_params = ', '.join("'%s': '%s'" % (k, v) for k, v in replication_params.items())
+ with pytest.raises(ConfigurationException):
+ session.execute("ALTER KEYSPACE %s WITH REPLICATION={%s}" % (self.keyspace, replication_params))
+
+@since('4.0')
+class TestTransientReplicationRepairStreamEntireSSTable(TransientReplicationBase):
+
+ def stream_entire_sstables(self):
+ return True
+
+ def _test_speculative_write_repair_cycle(self, primary_range, optimized_repair, repair_coordinator, expect_node3_data, use_lcs=False):
"""
if one of the full replicas is not available, data should be written to the transient replica, but removed after incremental repair
"""
for node in self.nodes:
self.assert_has_no_sstables(node)
+ if use_lcs:
+ self.use_lcs()
+
self.node2.byteman_submit(['./byteman/stop_writes.btm'])
# self.insert_row(1)
tm = lambda n: self.table_metrics(n)
@@ -438,6 +495,9 @@ class TestTransientReplication(TransientReplicationBase):
else:
self.assert_has_no_sstables(self.node3, compact=True)
+ entire_sstable = "true" if self.stream_entire_sstables() else "false"
+ assert self.node2.grep_log('Incoming stream entireSSTable={}'.format(entire_sstable), filename='debug.log')
+
@pytest.mark.no_vnodes
def test_speculative_write_repair_cycle(self):
""" incremental repair from full replica should remove data on node3 """
@@ -448,7 +508,7 @@ class TestTransientReplication(TransientReplicationBase):
@pytest.mark.no_vnodes
def test_primary_range_repair(self):
- """ optimized primary range incremental repair from full replica should remove data on node3 """
+ """ primary range incremental repair from full replica should remove data on node3 """
self._test_speculative_write_repair_cycle(primary_range=True,
optimized_repair=False,
repair_coordinator=self.node1,
@@ -463,6 +523,15 @@ class TestTransientReplication(TransientReplicationBase):
expect_node3_data=False)
@pytest.mark.no_vnodes
+ def test_optimized_primary_range_repair_with_lcs(self):
+ """ optimized primary range incremental repair from full replica should remove data on node3 using LCS """
+ self._test_speculative_write_repair_cycle(primary_range=True,
+ optimized_repair=True,
+ repair_coordinator=self.node1,
+ expect_node3_data=False,
+ use_lcs=True)
+
+ @pytest.mark.no_vnodes
def test_transient_incremental_repair(self):
""" transiently replicated ranges should be skipped when coordinating repairs """
self._test_speculative_write_repair_cycle(primary_range=True,
@@ -471,35 +540,6 @@ class TestTransientReplication(TransientReplicationBase):
expect_node3_data=False)
@pytest.mark.no_vnodes
- def test_cheap_quorums(self):
- """ writes shouldn't make it to transient nodes """
- session = self.exclusive_cql_connection(self.node1)
- for node in self.nodes:
- self.assert_has_no_sstables(node)
-
- tm = lambda n: self.table_metrics(n)
-
- with tm(self.node1) as tm1, tm(self.node2) as tm2, tm(self.node3) as tm3:
- assert tm1.write_count == 0
- assert tm2.write_count == 0
- assert tm3.write_count == 0
- self.insert_row(1, 1, 1, session=session)
- assert tm1.write_count == 1
- assert tm2.write_count == 1
- assert tm3.write_count == 0
-
- @pytest.mark.no_vnodes
- def test_speculative_write(self):
- """ if a full replica isn't responding, we should send the write to the transient replica """
- session = self.exclusive_cql_connection(self.node1)
- self.node2.byteman_submit(['./byteman/slow_writes.btm'])
-
- self.insert_row(1, 1, 1, session=session)
- self.assert_local_rows(self.node1, [[1,1,1]])
- self.assert_local_rows(self.node2, [])
- self.assert_local_rows(self.node3, [[1,1,1]])
-
- @pytest.mark.no_vnodes
def test_full_repair_from_full_replica(self):
""" full repairs shouldn't replicate data to transient replicas """
session = self.exclusive_cql_connection(self.node1)
@@ -537,27 +577,11 @@ class TestTransientReplication(TransientReplicationBase):
self.assert_has_sstables(self.node2, flush=True)
self.assert_has_no_sstables(self.node3, flush=True)
- @pytest.mark.skip(reason="Doesn't test quite the right combination of forbidden RF changes right now")
- def test_keyspace_rf_changes(self):
- """ they should throw an exception """
- session = self.exclusive_cql_connection(self.node1)
- replication_params = OrderedDict()
- replication_params['class'] = 'NetworkTopologyStrategy'
- assert self.replication_factor() == '3/1'
- replication_params['datacenter1'] = '5/2'
- replication_params = ', '.join("'%s': '%s'" % (k, v) for k, v in replication_params.items())
- with pytest.raises(ConfigurationException):
- session.execute("ALTER KEYSPACE %s WITH REPLICATION={%s}" % (self.keyspace, replication_params))
-
- def test_disabled_read_repair(self):
- """ shouldn't allow creating tables without read repair disabled """
- session = self.exclusive_cql_connection(self.node1)
- with pytest.raises(InvalidRequest):
- session.execute("CREATE TABLE %s.tbl2 (pk int, ck int, value int, PRIMARY KEY (pk, ck))" % self.keyspace)
-
- with pytest.raises(InvalidRequest):
- session.execute("ALTER TABLE %s.%s WITH read_repair = 'BLOCKING'" % (self.keyspace, self.table))
+@since('4.0')
+class TestTransientReplicationRepairLegacyStreaming(TestTransientReplicationRepairStreamEntireSSTable):
+ def stream_entire_sstables(self):
+ return False
@since('4.0')
class TestTransientReplicationSpeculativeQueries(TransientReplicationBase):
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org