You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by aw...@apache.org on 2017/11/15 21:46:51 UTC

[23/50] cassandra git commit: Add tests for mixed version batchlog replay

Add tests for mixed version batchlog replay

Patch by Jeff Jirsa; reviewed by Aleksey Yeschenko


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/ac9c9560
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/ac9c9560
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/ac9c9560

Branch: refs/heads/master
Commit: ac9c95607ce439de596da41c368d79c67d6dcdda
Parents: 6d5ee37
Author: Jeff Jirsa <jj...@apple.com>
Authored: Mon Aug 14 12:55:17 2017 -0700
Committer: Aleksey Yeschenko <al...@yeschenko.com>
Committed: Sat Aug 26 01:21:00 2017 +0100

----------------------------------------------------------------------
 batch_test.py                         | 96 +++++++++++++++++++++++-------
 byteman/fail_after_batchlog_write.btm | 19 ++++++
 2 files changed, 94 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/ac9c9560/batch_test.py
----------------------------------------------------------------------
diff --git a/batch_test.py b/batch_test.py
index 4194f10..5c25c46 100644
--- a/batch_test.py
+++ b/batch_test.py
@@ -1,6 +1,7 @@
 import sys
 import time
 from unittest import skipIf
+from nose.tools import assert_greater_equal
 
 from cassandra import ConsistencyLevel, Timeout, Unavailable
 from cassandra.query import SimpleStatement
@@ -9,6 +10,8 @@ from dtest import Tester, create_ks, debug
 from tools.assertions import (assert_all, assert_invalid, assert_one,
                               assert_unavailable)
 from tools.decorators import since
+from tools.jmxutils import (JolokiaAgent, make_mbean,
+                            remove_perf_disable_shared_mem)
 
 
 class TestBatch(Tester):
@@ -295,6 +298,15 @@ class TestBatch(Tester):
         self._logged_batch_compatibility_test(0, 1, 'github:apache/cassandra-2.2', 2, 4)
 
     @since('3.0', max_version='3.x')
+    def batchlog_replay_compatibility_1_test(self):
+        """
+        @jira_ticket CASSANDRA-9673, test that batchlog replay still works with a mixed version cluster.
+
+        Here we have one 3.0/3.x node and two 2.2 nodes and we send the batch request to the 3.0 node.
+        """
+        self._batchlog_replay_compatibility_test(0, 1, 'github:apache/cassandra-2.2', 2, 4)
+
+    @since('3.0', max_version='3.x')
     @skipIf(sys.platform == 'win32', 'Windows production support only on 2.2+')
     def logged_batch_compatibility_2_test(self):
         """
@@ -324,6 +336,15 @@ class TestBatch(Tester):
         self._logged_batch_compatibility_test(2, 2, 'github:apache/cassandra-2.2', 1, 4)
 
     @since('3.0', max_version='3.x')
+    def batchlog_replay_compatibility_4_test(self):
+        """
+        @jira_ticket CASSANDRA-9673, test that batchlog replay still works with a mixed version cluster.
+
+        Here we have two 3.0/3.x nodes and one 2.2 node and we send the batch request to the 2.2 node.
+        """
+        self._batchlog_replay_compatibility_test(2, 2, 'github:apache/cassandra-2.2', 1, 4)
+
+    @since('3.0', max_version='3.x')
     @skipIf(sys.platform == 'win32', 'Windows production support only on 2.2+')
     def logged_batch_compatibility_5_test(self):
         """
@@ -346,6 +367,43 @@ class TestBatch(Tester):
         res = sorted(rows)
         self.assertEquals([[0, 'Jack', 'Sparrow'], [1, 'Will', 'Turner']], [list(res[0]), list(res[1])])
 
+    def _batchlog_replay_compatibility_test(self, coordinator_idx, current_nodes, previous_version, previous_nodes, protocol_version):
+        session = self.prepare_mixed(coordinator_idx, current_nodes, previous_version, previous_nodes,
+                                     protocol_version=protocol_version, install_byteman=True)
+
+        coordinator = self.cluster.nodelist()[coordinator_idx]
+        coordinator.byteman_submit(['./byteman/fail_after_batchlog_write.btm'])
+        debug("Injected byteman scripts to enable batchlog replay {}".format(coordinator.name))
+
+        query = """
+            BEGIN BATCH
+            INSERT INTO users (id, firstname, lastname) VALUES (0, 'Jack', 'Sparrow')
+            INSERT INTO users (id, firstname, lastname) VALUES (1, 'Will', 'Turner')
+            APPLY BATCH
+        """
+        session.execute(query)
+
+        total_batches_replayed = 0
+        blm = make_mbean('db', type='BatchlogManager')
+
+        for n in self.cluster.nodelist():
+            if n == coordinator:
+                continue
+
+            with JolokiaAgent(n) as jmx:
+                debug('Forcing batchlog replay for {}'.format(n.name))
+                jmx.execute_method(blm, 'forceBatchlogReplay')
+                batches_replayed = jmx.read_attribute(blm, 'TotalBatchesReplayed')
+                debug('{} batches replayed on node {}'.format(batches_replayed, n.name))
+                total_batches_replayed += batches_replayed
+
+        assert_greater_equal(total_batches_replayed, 2)
+
+        for node in self.cluster.nodelist():
+            session = self.patient_exclusive_cql_connection(node, protocol_version=protocol_version)
+            rows = sorted(session.execute('SELECT id, firstname, lastname FROM ks.users'))
+            self.assertEqual([[0, 'Jack', 'Sparrow'], [1, 'Will', 'Turner']], [list(rows[0]), list(rows[1])])
+
     def assert_timedout(self, session, query, cl, acknowledged_by=None,
                         received_responses=None):
         try:
@@ -366,15 +424,17 @@ class TestBatch(Tester):
         else:
             assert False, "Expecting TimedOutException but no exception was raised"
 
-    def prepare(self, nodes=1, compression=True, version=None, protocol_version=None):
-        if not self.cluster.nodelist():
-            self.cluster.populate(nodes)
-            if version:
-                for node in self.cluster.nodelist():
-                    node.set_install_dir(version=version)
-                    debug("Set cassandra dir for {} to {}".format(node.name, node.get_install_dir()))
+    def prepare(self, nodes=1, compression=True, version=None, protocol_version=None, install_byteman=False):
+        if version:
+            self.cluster.set_install_dir(version=version)
+            debug("Set cassandra dir to {}".format(self.cluster.get_install_dir()))
+
+        self.cluster.populate(nodes, install_byteman=install_byteman)
+
+        for n in self.cluster.nodelist():
+            remove_perf_disable_shared_mem(n)
 
-            self.cluster.start(wait_other_notice=True)
+        self.cluster.start(wait_other_notice=True)
 
         node1 = self.cluster.nodelist()[0]
         session = self.patient_cql_connection(node1, protocol_version=protocol_version)
@@ -405,13 +465,12 @@ class TestBatch(Tester):
 
         time.sleep(.5)
 
-    def prepare_mixed(self, coordinator_idx, current_nodes, previous_version, previous_nodes, compression=True, protocol_version=None):
-
+    def prepare_mixed(self, coordinator_idx, current_nodes, previous_version, previous_nodes, compression=True, protocol_version=None, install_byteman=False):
         debug("Testing with {} node(s) at version '{}', {} node(s) at current version"
               .format(previous_nodes, previous_version, current_nodes))
 
         # start a cluster using the previous version
-        self.prepare(previous_nodes + current_nodes, compression, previous_version, protocol_version=protocol_version)
+        self.prepare(previous_nodes + current_nodes, compression, previous_version, protocol_version=protocol_version, install_byteman=install_byteman)
 
         # then upgrade the current nodes to the current version but not the previous nodes
         for i in xrange(current_nodes):
@@ -426,18 +485,13 @@ class TestBatch(Tester):
         """
         Upgrade a node to the current version
         """
-        debug('Upgrading {}'.format(node.name))
-
-        debug('Shutting down node: ' + node.name)
-        node.drain()
-        node.watch_log_for("DRAINED")
+        debug('Upgrading {} to the current version'.format(node.name))
+        debug('Shutting down {}'.format(node.name))
         node.stop(wait_other_notice=False)
-
         self.set_node_to_current_version(node)
-        debug("Set new cassandra dir for {}: {}".format(node.name, node.get_install_dir()))
-
+        debug("Set cassandra dir for {} to {}".format(node.name, node.get_install_dir()))
+        # needed for jmx
+        remove_perf_disable_shared_mem(node)
         # Restart nodes on new version
         debug('Starting {} on new version ({})'.format(node.name, node.get_cassandra_version()))
         node.start(wait_other_notice=True, wait_for_binary_proto=True)
-        debug('Upgrading sstables')
-        node.nodetool('upgradesstables -a')

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ac9c9560/byteman/fail_after_batchlog_write.btm
----------------------------------------------------------------------
diff --git a/byteman/fail_after_batchlog_write.btm b/byteman/fail_after_batchlog_write.btm
new file mode 100644
index 0000000..8574b00
--- /dev/null
+++ b/byteman/fail_after_batchlog_write.btm
@@ -0,0 +1,19 @@
+#
+# Inject node failure immediately after batchlog write.
+# Method signature required in 3.x to avoid pausing before legacy mutations are sent
+#
+RULE skip writing batched mutations
+CLASS org.apache.cassandra.service.StorageProxy
+METHOD syncWriteBatchedMutations
+AT ENTRY
+IF TRUE
+DO return
+ENDRULE
+
+RULE skip removing from batchlog
+CLASS org.apache.cassandra.service.StorageProxy
+METHOD asyncRemoveFromBatchlog
+AT ENTRY
+IF TRUE
+DO return
+ENDRULE


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org