You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by mc...@apache.org on 2020/05/26 08:24:40 UTC

[cassandra-dtest] branch master updated: Add legacy streaming test for transient replica repair tests, and test for lcs

This is an automated email from the ASF dual-hosted git repository.

mck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/cassandra-dtest.git


The following commit(s) were added to refs/heads/master by this push:
     new d7aacd3  Add legacy streaming test for transient replica repair tests, and test for lcs
d7aacd3 is described below

commit d7aacd3fa9b7d4c4fef80f5550a2576303e29890
Author: Zhao Yang <zh...@gmail.com>
AuthorDate: Mon May 11 15:51:22 2020 +0800

    Add legacy streaming test for transient replica repair tests, and test for lcs
    
     patch by Zhao Yang; reviewed by Blake Eggleston, Dinesh Joshi, Ekaterina Dimitrova for CASSANDRA-15783
---
 transient_replication_test.py | 126 +++++++++++++++++++++++++-----------------
 1 file changed, 75 insertions(+), 51 deletions(-)

diff --git a/transient_replication_test.py b/transient_replication_test.py
index 990a984..e04162f 100644
--- a/transient_replication_test.py
+++ b/transient_replication_test.py
@@ -179,6 +179,8 @@ class TransientReplicationBase(Tester):
         # Make sure digest is not attempted against the transient node
         self.node3.byteman_submit(['./byteman/throw_on_digest.btm'])
 
+    def stream_entire_sstables(self):
+        return True
 
     def replication_factor(self):
         return '3/1'
@@ -186,6 +188,10 @@ class TransientReplicationBase(Tester):
     def tokens(self):
         return [0, 1, 2]
 
+    def use_lcs(self):
+        session = self.exclusive_cql_connection(self.node1)
+        session.execute("ALTER TABLE %s.%s with compaction={'class': 'LeveledCompactionStrategy'};" % (self.keyspace, self.table))
+
     def setup_schema(self):
         session = self.exclusive_cql_connection(self.node1)
         replication_params = OrderedDict()
@@ -202,6 +208,7 @@ class TransientReplicationBase(Tester):
         patch_start(self.cluster)
         self.cluster.set_configuration_options(values={'hinted_handoff_enabled': False,
                                                        'num_tokens': 1,
+                                                       'stream_entire_sstables': self.stream_entire_sstables(),
                                                        'commitlog_sync_period_in_ms': 500,
                                                        'enable_transient_replication': True,
                                                        'dynamic_snitch': False})
@@ -403,13 +410,63 @@ class TestTransientReplication(TransientReplicationBase):
                    [[1, 1, 1]],
                    cl=ConsistencyLevel.QUORUM)
 
-    def _test_speculative_write_repair_cycle(self, primary_range, optimized_repair, repair_coordinator, expect_node3_data):
+    @pytest.mark.no_vnodes
+    def test_cheap_quorums(self):
+        """ writes shouldn't make it to transient nodes """
+        session = self.exclusive_cql_connection(self.node1)
+        for node in self.nodes:
+            self.assert_has_no_sstables(node)
+
+        tm = lambda n: self.table_metrics(n)
+
+        with tm(self.node1) as tm1, tm(self.node2) as tm2, tm(self.node3) as tm3:
+            assert tm1.write_count == 0
+            assert tm2.write_count == 0
+            assert tm3.write_count == 0
+            self.insert_row(1, 1, 1, session=session)
+            assert tm1.write_count == 1
+            assert tm2.write_count == 1
+            assert tm3.write_count == 0
+
+    @pytest.mark.no_vnodes
+    def test_speculative_write(self):
+        """ if a full replica isn't responding, we should send the write to the transient replica """
+        session = self.exclusive_cql_connection(self.node1)
+        self.node2.byteman_submit(['./byteman/slow_writes.btm'])
+
+        self.insert_row(1, 1, 1, session=session)
+        self.assert_local_rows(self.node1, [[1,1,1]])
+        self.assert_local_rows(self.node2, [])
+        self.assert_local_rows(self.node3, [[1,1,1]])
+
+    @pytest.mark.skip(reason="Doesn't test quite the right combination of forbidden RF changes right now")
+    def test_keyspace_rf_changes(self):
+        """ they should throw an exception """
+        session = self.exclusive_cql_connection(self.node1)
+        replication_params = OrderedDict()
+        replication_params['class'] = 'NetworkTopologyStrategy'
+        assert self.replication_factor() == '3/1'
+        replication_params['datacenter1'] = '5/2'
+        replication_params = ', '.join("'%s': '%s'" % (k, v) for k, v in replication_params.items())
+        with pytest.raises(ConfigurationException):
+            session.execute("ALTER KEYSPACE %s WITH REPLICATION={%s}" % (self.keyspace, replication_params))
+
+@since('4.0')
+class TestTransientReplicationRepairStreamEntireSSTable(TransientReplicationBase):
+
+    def stream_entire_sstables(self):
+        return True
+
+    def _test_speculative_write_repair_cycle(self, primary_range, optimized_repair, repair_coordinator, expect_node3_data, use_lcs=False):
         """
         if one of the full replicas is not available, data should be written to the transient replica, but removed after incremental repair
         """
         for node in self.nodes:
             self.assert_has_no_sstables(node)
 
+        if use_lcs:
+            self.use_lcs()
+
         self.node2.byteman_submit(['./byteman/stop_writes.btm'])
         # self.insert_row(1)
         tm = lambda n: self.table_metrics(n)
@@ -438,6 +495,9 @@ class TestTransientReplication(TransientReplicationBase):
         else:
             self.assert_has_no_sstables(self.node3, compact=True)
 
+        entire_sstable = "true" if self.stream_entire_sstables() else "false"
+        assert self.node2.grep_log('Incoming stream entireSSTable={}'.format(entire_sstable), filename='debug.log')
+
     @pytest.mark.no_vnodes
     def test_speculative_write_repair_cycle(self):
         """ incremental repair from full replica should remove data on node3 """
@@ -448,7 +508,7 @@ class TestTransientReplication(TransientReplicationBase):
 
     @pytest.mark.no_vnodes
     def test_primary_range_repair(self):
-        """ optimized primary range incremental repair from full replica should remove data on node3 """
+        """ primary range incremental repair from full replica should remove data on node3 """
         self._test_speculative_write_repair_cycle(primary_range=True,
                                                   optimized_repair=False,
                                                   repair_coordinator=self.node1,
@@ -463,6 +523,15 @@ class TestTransientReplication(TransientReplicationBase):
                                                   expect_node3_data=False)
 
     @pytest.mark.no_vnodes
+    def test_optimized_primary_range_repair_with_lcs(self):
+        """ optimized primary range incremental repair from full replica should remove data on node3 using LCS """
+        self._test_speculative_write_repair_cycle(primary_range=True,
+                                                  optimized_repair=True,
+                                                  repair_coordinator=self.node1,
+                                                  expect_node3_data=False,
+                                                  use_lcs=True)
+
+    @pytest.mark.no_vnodes
     def test_transient_incremental_repair(self):
         """ transiently replicated ranges should be skipped when coordinating repairs """
         self._test_speculative_write_repair_cycle(primary_range=True,
@@ -471,35 +540,6 @@ class TestTransientReplication(TransientReplicationBase):
                                                   expect_node3_data=False)
 
     @pytest.mark.no_vnodes
-    def test_cheap_quorums(self):
-        """ writes shouldn't make it to transient nodes """
-        session = self.exclusive_cql_connection(self.node1)
-        for node in self.nodes:
-            self.assert_has_no_sstables(node)
-
-        tm = lambda n: self.table_metrics(n)
-
-        with tm(self.node1) as tm1, tm(self.node2) as tm2, tm(self.node3) as tm3:
-            assert tm1.write_count == 0
-            assert tm2.write_count == 0
-            assert tm3.write_count == 0
-            self.insert_row(1, 1, 1, session=session)
-            assert tm1.write_count == 1
-            assert tm2.write_count == 1
-            assert tm3.write_count == 0
-
-    @pytest.mark.no_vnodes
-    def test_speculative_write(self):
-        """ if a full replica isn't responding, we should send the write to the transient replica """
-        session = self.exclusive_cql_connection(self.node1)
-        self.node2.byteman_submit(['./byteman/slow_writes.btm'])
-
-        self.insert_row(1, 1, 1, session=session)
-        self.assert_local_rows(self.node1, [[1,1,1]])
-        self.assert_local_rows(self.node2, [])
-        self.assert_local_rows(self.node3, [[1,1,1]])
-
-    @pytest.mark.no_vnodes
     def test_full_repair_from_full_replica(self):
         """ full repairs shouldn't replicate data to transient replicas """
         session = self.exclusive_cql_connection(self.node1)
@@ -537,27 +577,11 @@ class TestTransientReplication(TransientReplicationBase):
         self.assert_has_sstables(self.node2, flush=True)
         self.assert_has_no_sstables(self.node3, flush=True)
 
-    @pytest.mark.skip(reason="Doesn't test quite the right combination of forbidden RF changes right now")
-    def test_keyspace_rf_changes(self):
-        """ they should throw an exception """
-        session = self.exclusive_cql_connection(self.node1)
-        replication_params = OrderedDict()
-        replication_params['class'] = 'NetworkTopologyStrategy'
-        assert self.replication_factor() == '3/1'
-        replication_params['datacenter1'] = '5/2'
-        replication_params = ', '.join("'%s': '%s'" % (k, v) for k, v in replication_params.items())
-        with pytest.raises(ConfigurationException):
-            session.execute("ALTER KEYSPACE %s WITH REPLICATION={%s}" % (self.keyspace, replication_params))
-
-    def test_disabled_read_repair(self):
-        """ shouldn't allow creating tables without read repair disabled """
-        session = self.exclusive_cql_connection(self.node1)
-        with pytest.raises(InvalidRequest):
-            session.execute("CREATE TABLE %s.tbl2 (pk int, ck int, value int, PRIMARY KEY (pk, ck))" % self.keyspace)
-
-        with pytest.raises(InvalidRequest):
-            session.execute("ALTER TABLE %s.%s WITH read_repair = 'BLOCKING'" % (self.keyspace, self.table))
+@since('4.0')
+class TestTransientReplicationRepairLegacyStreaming(TestTransientReplicationRepairStreamEntireSSTable):
 
+    def stream_entire_sstables(self):
+        return False
 
 @since('4.0')
 class TestTransientReplicationSpeculativeQueries(TransientReplicationBase):


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org