You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sa...@apache.org on 2018/09/21 07:49:51 UTC

cassandra-dtest git commit: Add intial tests for CASSANDRA-14145

Repository: cassandra-dtest
Updated Branches:
  refs/heads/master 97529ccfb -> a5df23d10


Add intial tests for CASSANDRA-14145

Patch by Marcus Eriksson & Sam Tunnicliffe; reviewed by Jordan West for CASSANDRA-14145

closes #37


Project: http://git-wip-us.apache.org/repos/asf/cassandra-dtest/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra-dtest/commit/a5df23d1
Tree: http://git-wip-us.apache.org/repos/asf/cassandra-dtest/tree/a5df23d1
Diff: http://git-wip-us.apache.org/repos/asf/cassandra-dtest/diff/a5df23d1

Branch: refs/heads/master
Commit: a5df23d10af2ecdcc76ebe36649e19c93da830b6
Parents: 97529cc
Author: Marcus Eriksson <ma...@apache.org>
Authored: Thu Aug 23 20:40:43 2018 +0200
Committer: Sam Tunnicliffe <sa...@beobal.com>
Committed: Fri Sep 21 08:46:27 2018 +0100

----------------------------------------------------------------------
 dtest_setup.py                          |   9 +-
 repair_tests/incremental_repair_test.py | 212 +++++++++++++++++++++++++++
 2 files changed, 219 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/a5df23d1/dtest_setup.py
----------------------------------------------------------------------
diff --git a/dtest_setup.py b/dtest_setup.py
index b8e1b23..756f542 100644
--- a/dtest_setup.py
+++ b/dtest_setup.py
@@ -391,11 +391,16 @@ class DTestSetup:
         # the failure detector can be quite slow in such tests with quick start/stop
         phi_values = {'phi_convict_threshold': 5}
 
+        # enable read time tracking of repaired data between replicas by default
+        repaired_data_tracking_values = {'repaired_data_tracking_for_partition_reads_enabled': 'true',
+                                         'repaired_data_tracking_for_range_reads_enabled': 'true',
+                                         'report_unconfirmed_repaired_data_mismatches': 'true'}
+
         timeout = 15000
         if self.cluster_options is not None and len(self.cluster_options) > 0:
-            values = merge_dicts(self.cluster_options, phi_values)
+            values = merge_dicts(self.cluster_options, phi_values, repaired_data_tracking_values)
         else:
-            values = merge_dicts(phi_values, {
+            values = merge_dicts(phi_values, repaired_data_tracking_values, {
                 'read_request_timeout_in_ms': timeout,
                 'range_request_timeout_in_ms': timeout,
                 'write_request_timeout_in_ms': timeout,

http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/a5df23d1/repair_tests/incremental_repair_test.py
----------------------------------------------------------------------
diff --git a/repair_tests/incremental_repair_test.py b/repair_tests/incremental_repair_test.py
index a4fa5a9..42c7705 100644
--- a/repair_tests/incremental_repair_test.py
+++ b/repair_tests/incremental_repair_test.py
@@ -18,6 +18,7 @@ from dtest import Tester, create_ks, create_cf
 from tools.assertions import assert_almost_equal, assert_one
 from tools.data import insert_c1c2
 from tools.misc import new_node, ImmutableMapping
+from tools.jmxutils import make_mbean, JolokiaAgent, remove_perf_disable_shared_mem
 
 since = pytest.mark.since
 logger = logging.getLogger(__name__)
@@ -207,6 +208,7 @@ class TestIncRepair(Tester):
         self.fixture_dtest_setup.setup_overrides.cluster_options = ImmutableMapping({'hinted_handoff_enabled': 'false',
                                                                                      'num_tokens': 1,
                                                                                      'commitlog_sync_period_in_ms': 500})
+        self.init_default_config()
         self.cluster.populate(3).start()
         node1, node2, node3 = self.cluster.nodelist()
 
@@ -246,6 +248,7 @@ class TestIncRepair(Tester):
                                                                                      'num_tokens': 1,
                                                                                      'commitlog_sync_period_in_ms': 500})
 
+        self.init_default_config()
         self.cluster.populate(3).start()
         node1, node2, node3 = self.cluster.nodelist()
 
@@ -289,6 +292,7 @@ class TestIncRepair(Tester):
         self.fixture_dtest_setup.setup_overrides.cluster_options = ImmutableMapping({'hinted_handoff_enabled': 'false',
                                                                                      'num_tokens': 1,
                                                                                      'commitlog_sync_period_in_ms': 500})
+        self.init_default_config()
         self.cluster.populate(3).start()
         node1, node2, node3 = self.cluster.nodelist()
 
@@ -332,6 +336,7 @@ class TestIncRepair(Tester):
         """
         # hinted handoff can create SSTable that we don't need after node3 restarted
         self.fixture_dtest_setup.setup_overrides.cluster_options = ImmutableMapping({'hinted_handoff_enabled': 'false'})
+        self.init_default_config()
         self.cluster.populate(3).start()
         node1, node2, node3 = self.cluster.nodelist()
 
@@ -459,6 +464,7 @@ class TestIncRepair(Tester):
         * Verify repairs occurred and repairedAt was updated
         """
         self.fixture_dtest_setup.setup_overrides.cluster_options = ImmutableMapping({'hinted_handoff_enabled': 'false'})
+        self.init_default_config()
         self.cluster.populate(2).start()
         node1, node2 = self.cluster.nodelist()
         node1.stress(['write', 'n=10K', 'no-warmup', '-schema', 'replication(factor=2)', 'compaction(strategy=SizeTieredCompactionStrategy,enabled=false)', '-rate', 'threads=50'])
@@ -691,6 +697,7 @@ class TestIncRepair(Tester):
         """ Test repaired data remains in sync after a move """
         self.fixture_dtest_setup.setup_overrides.cluster_options = ImmutableMapping({'hinted_handoff_enabled': 'false',
                                                                                      'commitlog_sync_period_in_ms': 500})
+        self.init_default_config()
         self.cluster.populate(4, tokens=[0, 2**32, 2**48, -(2**32)]).start()
         node1, node2, node3, node4 = self.cluster.nodelist()
 
@@ -727,6 +734,7 @@ class TestIncRepair(Tester):
         """ Test repaired data remains in sync after a decommission """
         self.fixture_dtest_setup.setup_overrides.cluster_options = ImmutableMapping({'hinted_handoff_enabled': 'false',
                                                                                      'commitlog_sync_period_in_ms': 500})
+        self.init_default_config()
         self.cluster.populate(4).start()
         node1, node2, node3, node4 = self.cluster.nodelist()
 
@@ -763,6 +771,7 @@ class TestIncRepair(Tester):
         """ Test repaired data remains in sync after a bootstrap """
         self.fixture_dtest_setup.setup_overrides.cluster_options = ImmutableMapping({'hinted_handoff_enabled': 'false',
                                                                                      'commitlog_sync_period_in_ms': 500})
+        self.init_default_config()
         self.cluster.populate(3).start()
         node1, node2, node3 = self.cluster.nodelist()
 
@@ -804,6 +813,7 @@ class TestIncRepair(Tester):
         self.fixture_dtest_setup.setup_overrides.cluster_options = ImmutableMapping({'hinted_handoff_enabled': 'false',
                                                                                      'num_tokens': 1,
                                                                                      'commitlog_sync_period_in_ms': 500})
+        self.init_default_config()
         self.cluster.populate(3).start()
         node1, node2, node3 = self.cluster.nodelist()
 
@@ -837,6 +847,7 @@ class TestIncRepair(Tester):
         self.fixture_dtest_setup.setup_overrides.cluster_options = ImmutableMapping({'hinted_handoff_enabled': 'false',
                                                                                      'num_tokens': 1,
                                                                                      'commitlog_sync_period_in_ms': 500})
+        self.init_default_config()
         self.cluster.populate(3).start()
         node1, node2, node3 = self.cluster.nodelist()
 
@@ -865,6 +876,7 @@ class TestIncRepair(Tester):
         self.fixture_dtest_setup.setup_overrides.cluster_options = ImmutableMapping({'hinted_handoff_enabled': 'false',
                                                                                      'num_tokens': 1,
                                                                                      'commitlog_sync_period_in_ms': 500})
+        self.init_default_config()
         self.cluster.populate(3).start()
         node1, node2, node3 = self.cluster.nodelist()
 
@@ -893,6 +905,7 @@ class TestIncRepair(Tester):
                                                                                      'num_tokens': 1,
                                                                                      'commitlog_sync_period_in_ms': 500,
                                                                                      'partitioner': 'org.apache.cassandra.dht.Murmur3Partitioner'})
+        self.init_default_config()
         self.cluster.populate(3).start()
         node1, node2, node3 = self.cluster.nodelist()
 
@@ -918,3 +931,202 @@ class TestIncRepair(Tester):
         self.assertRepairedAndUnrepaired(node1, 'ks')
         self.assertRepairedAndUnrepaired(node2, 'ks')
         self.assertRepairedAndUnrepaired(node3, 'ks')
+
+    @since('4.0')
+    def test_repaired_tracking_with_partition_deletes(self):
+        """
+        check that when an tracking repaired data status following a digest mismatch,
+        repaired data mismatches are marked as unconfirmed as we may skip sstables
+        after the partition delete are encountered.
+        @jira_ticket CASSANDRA-14145
+        """
+        session, node1, node2 = self.setup_for_repaired_data_tracking()
+        stmt = SimpleStatement("INSERT INTO ks.tbl (k, c, v) VALUES (%s, %s, %s)")
+        stmt.consistency_level = ConsistencyLevel.ALL
+        for i in range(10):
+            session.execute(stmt, (i, i, i))
+
+        for node in self.cluster.nodelist():
+            node.flush()
+            self.assertNoRepairedSSTables(node, 'ks')
+
+        node1.repair(options=['ks'])
+        node2.stop(wait_other_notice=True)
+
+        session.execute("delete from ks.tbl where k = 5")
+
+        node1.flush()
+        node2.start(wait_other_notice=True)
+
+        # expect unconfirmed inconsistencies as the partition deletes cause some sstables to be skipped
+        with JolokiaAgent(node1) as jmx:
+            self.query_and_check_repaired_mismatches(jmx, session, "SELECT * FROM ks.tbl WHERE k = 5",
+                                                     expect_unconfirmed_inconsistencies=True)
+            self.query_and_check_repaired_mismatches(jmx, session, "SELECT * FROM ks.tbl WHERE k = 5 AND c = 5",
+                                                     expect_unconfirmed_inconsistencies=True)
+            # no digest reads for range queries so blocking read repair metric isn't incremented
+            # *all* sstables are read for partition ranges too, and as the repaired set is still in sync there should
+            # be no inconsistencies
+            self.query_and_check_repaired_mismatches(jmx, session, "SELECT * FROM ks.tbl", expect_read_repair=False)
+
+    @since('4.0')
+    def test_repaired_tracking_with_varying_sstable_sets(self):
+        """
+        verify that repaired data digests are computed over the merged data for each replica
+        and that the particular number of sstables on each doesn't affect the comparisons
+        both replicas start with the same repaired set, comprising 2 sstables. node1's is
+        then compacted and additional unrepaired data added (which overwrites some in the
+        repaired set). We expect the repaired digests to still match as the tracking will
+        force all sstables containing the partitions to be read
+        there are two variants of this, for single partition slice & names reads and range reads
+        @jira_ticket CASSANDRA-14145
+        """
+        session, node1, node2 = self.setup_for_repaired_data_tracking()
+        stmt = SimpleStatement("INSERT INTO ks.tbl (k, c, v) VALUES (%s, %s, %s)")
+        stmt.consistency_level = ConsistencyLevel.ALL
+        for i in range(10):
+            session.execute(stmt, (i, i, i))
+
+        for node in self.cluster.nodelist():
+            node.flush()
+
+        for i in range(10,20):
+            session.execute(stmt, (i, i, i))
+
+        for node in self.cluster.nodelist():
+            node.flush()
+            self.assertNoRepairedSSTables(node, 'ks')
+
+        node1.repair(options=['ks'])
+        node2.stop(wait_other_notice=True)
+
+        session.execute("insert into ks.tbl (k, c, v) values (5, 5, 55)")
+        session.execute("insert into ks.tbl (k, c, v) values (15, 15, 155)")
+        node1.flush()
+        node1.compact()
+        node1.compact()
+        node2.start(wait_other_notice=True)
+
+        # we don't expect any inconsistencies as all repaired data is read on both replicas
+        with JolokiaAgent(node1) as jmx:
+            self.query_and_check_repaired_mismatches(jmx, session, "SELECT * FROM ks.tbl WHERE k = 5")
+            self.query_and_check_repaired_mismatches(jmx, session, "SELECT * FROM ks.tbl WHERE k = 5 AND c = 5")
+            # no digest reads for range queries so read repair metric isn't incremented
+            self.query_and_check_repaired_mismatches(jmx, session, "SELECT * FROM ks.tbl", expect_read_repair=False)
+
+    @since('4.0')
+    def test_repaired_tracking_with_mismatching_replicas(self):
+        """
+        verify that when replicas have different repaired sets, this can be detected via the digests
+        computed at read time. All nodes have start with the same data, but only 1 replica's sstables
+        are marked repaired. Then a divergence is introduced by overwriting on 1 replica only, which
+        is required to trigger a digest mismatch & full data read (for single partition reads).
+        As the repaired sets are different between the replicas, but no other shortcutting occurs
+        (no partition tombstones or sstable skipping) and no sstables are involved in pending repair
+        session, we expect confirmed inconsistencies to be reported.
+        there are two variants of this, for single partition slice & names reads and range reads
+        @jira_ticket CASSANDRA-14145
+        """
+        session, node1, node2 = self.setup_for_repaired_data_tracking()
+        stmt = SimpleStatement("INSERT INTO ks.tbl (k, c, v) VALUES (%s, %s, %s)")
+        stmt.consistency_level = ConsistencyLevel.ALL
+        for i in range(10):
+            session.execute(stmt, (i, i, i))
+
+        for node in self.cluster.nodelist():
+            node.flush()
+
+        for i in range(10,20):
+            session.execute(stmt, (i, i, i))
+
+        for node in self.cluster.nodelist():
+            node.flush()
+            self.assertNoRepairedSSTables(node, 'ks')
+
+        # stop node 2 and mark its sstables repaired
+        node2.stop(wait_other_notice=True)
+        node2.run_sstablerepairedset(keyspace='ks')
+        # before restarting node2 overwrite some data on node1 to trigger digest mismatches
+        session.execute("insert into ks.tbl (k, c, v) values (5, 5, 55)")
+        node2.start(wait_for_binary_proto=True)
+
+        out1 = node1.run_sstablemetadata(keyspace='ks').stdout
+        out2 = node2.run_sstablemetadata(keyspace='ks').stdout
+
+        # verify the repaired at times for the sstables on node1/node2
+        assert all(t == 0 for t in [int(x) for x in [y.split(' ')[0] for y in findall('(?<=Repaired at: ).*', out1)]])
+        assert all(t > 0 for t in [int(x) for x in [y.split(' ')[0] for y in findall('(?<=Repaired at: ).*', out2)]])
+
+        # we expect inconsistencies due to sstables being marked repaired on one replica only
+        # these are marked confirmed because no sessions are pending & all sstables are
+        # skipped due to partition deletes
+        with JolokiaAgent(node1) as jmx:
+            self.query_and_check_repaired_mismatches(jmx, session, "SELECT * FROM ks.tbl WHERE k = 5",
+                                                     expect_confirmed_inconsistencies=True)
+            self.query_and_check_repaired_mismatches(jmx, session, "SELECT * FROM ks.tbl WHERE k = 5 AND c = 5",
+                                                     expect_confirmed_inconsistencies=True)
+            # no digest reads for range queries so read repair metric isn't incremented
+            self.query_and_check_repaired_mismatches(jmx, session, "SELECT * FROM ks.tbl", expect_read_repair=False)
+
+    def setup_for_repaired_data_tracking(self):
+        self.fixture_dtest_setup.setup_overrides.cluster_options = ImmutableMapping({'hinted_handoff_enabled': 'false',
+                                                                                     'num_tokens': 1,
+                                                                                     'commitlog_sync_period_in_ms': 500})
+        self.init_default_config()
+        self.cluster.populate(2)
+        node1, node2 = self.cluster.nodelist()
+        remove_perf_disable_shared_mem(node1)  # necessary for jmx
+        self.cluster.start()
+
+        session = self.patient_exclusive_cql_connection(node1)
+        session.execute("CREATE KEYSPACE ks WITH REPLICATION={'class':'SimpleStrategy', 'replication_factor': 2}")
+        session.execute("CREATE TABLE ks.tbl (k INT, c INT, v INT, PRIMARY KEY (k,c)) with read_repair='NONE'")
+        return session, node1, node2
+
+    def query_and_check_repaired_mismatches(self, jmx, session, query,
+                                            expect_read_repair=True,
+                                            expect_unconfirmed_inconsistencies=False,
+                                            expect_confirmed_inconsistencies=False):
+
+        rr_count = make_mbean('metrics', type='ReadRepair', name='ReconcileRead')
+        unconfirmed_count = make_mbean('metrics', type='Table,keyspace=ks', name='RepairedDataInconsistenciesUnconfirmed,scope=tbl')
+        confirmed_count = make_mbean('metrics', type='Table,keyspace=ks', name='RepairedDataInconsistenciesConfirmed,scope=tbl')
+
+        rr_before = self.get_attribute_count(jmx, rr_count)
+        uc_before = self.get_attribute_count(jmx, unconfirmed_count)
+        cc_before = self.get_attribute_count(jmx, confirmed_count)
+
+        stmt = SimpleStatement(query)
+        stmt.consistency_level = ConsistencyLevel.ALL
+        session.execute(stmt)
+
+        rr_after = self.get_attribute_count(jmx, rr_count)
+        uc_after = self.get_attribute_count(jmx, unconfirmed_count)
+        cc_after = self.get_attribute_count(jmx, confirmed_count)
+
+        logger.debug("Read Repair Count: {before}, {after}".format(before=rr_before, after=rr_after))
+        logger.debug("Unconfirmed Inconsistency Count: {before}, {after}".format(before=uc_before, after=uc_after))
+        logger.debug("Confirmed Inconsistency Count: {before}, {after}".format(before=cc_before, after=cc_after))
+
+        if expect_read_repair:
+            assert rr_after > rr_before
+        else:
+            assert rr_after == rr_before
+
+        if expect_unconfirmed_inconsistencies:
+            assert uc_after > uc_before
+        else:
+            assert uc_after == uc_before
+
+        if expect_confirmed_inconsistencies:
+            assert cc_after > cc_before
+        else:
+            assert cc_after == cc_before
+
+    def get_attribute_count(self, jmx, bean):
+        # the MBean may not have been initialized, in which case Jolokia agent will return
+        # a HTTP 404 response. If we receive such, we know that the count can only be 0
+        if jmx.has_mbean(bean):
+            return jmx.read_attribute(bean, 'Count')
+        else:
+            return 0


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org