You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this plain-text copy.)
Posted to commits@cassandra.apache.org by aw...@apache.org on 2017/11/15 21:46:51 UTC
[23/50] cassandra git commit: Add tests for mixed version batchlog replay
Add tests for mixed version batchlog replay
Patch by Jeff Jirsa; reviewed by Aleksey Yeschenko
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/ac9c9560
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/ac9c9560
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/ac9c9560
Branch: refs/heads/master
Commit: ac9c95607ce439de596da41c368d79c67d6dcdda
Parents: 6d5ee37
Author: Jeff Jirsa <jj...@apple.com>
Authored: Mon Aug 14 12:55:17 2017 -0700
Committer: Aleksey Yeschenko <al...@yeschenko.com>
Committed: Sat Aug 26 01:21:00 2017 +0100
----------------------------------------------------------------------
batch_test.py | 96 +++++++++++++++++++++++-------
byteman/fail_after_batchlog_write.btm | 19 ++++++
2 files changed, 94 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ac9c9560/batch_test.py
----------------------------------------------------------------------
diff --git a/batch_test.py b/batch_test.py
index 4194f10..5c25c46 100644
--- a/batch_test.py
+++ b/batch_test.py
@@ -1,6 +1,7 @@
import sys
import time
from unittest import skipIf
+from nose.tools import assert_greater_equal
from cassandra import ConsistencyLevel, Timeout, Unavailable
from cassandra.query import SimpleStatement
@@ -9,6 +10,8 @@ from dtest import Tester, create_ks, debug
from tools.assertions import (assert_all, assert_invalid, assert_one,
assert_unavailable)
from tools.decorators import since
+from tools.jmxutils import (JolokiaAgent, make_mbean,
+ remove_perf_disable_shared_mem)
class TestBatch(Tester):
@@ -295,6 +298,15 @@ class TestBatch(Tester):
self._logged_batch_compatibility_test(0, 1, 'github:apache/cassandra-2.2', 2, 4)
@since('3.0', max_version='3.x')
+ def batchlog_replay_compatibility_1_test(self):
+ """
+ @jira_ticket CASSANDRA-9673, test that batchlog replay still works with a mixed version cluster.
+
+ Here we have one 3.0/3.x node and two 2.2 nodes and we send the batch request to the 3.0 node; every non-coordinator node is then forced to replay its batchlog.
+ """
+ self._batchlog_replay_compatibility_test(0, 1, 'github:apache/cassandra-2.2', 2, 4)
+
+ @since('3.0', max_version='3.x')
@skipIf(sys.platform == 'win32', 'Windows production support only on 2.2+')
def logged_batch_compatibility_2_test(self):
"""
@@ -324,6 +336,15 @@ class TestBatch(Tester):
self._logged_batch_compatibility_test(2, 2, 'github:apache/cassandra-2.2', 1, 4)
@since('3.0', max_version='3.x')
+ def batchlog_replay_compatibility_4_test(self):
+ """
+ @jira_ticket CASSANDRA-9673, test that batchlog replay still works with a mixed version cluster.
+
+ Here we have two 3.0/3.x nodes and one 2.2 node and we send the batch request to the 2.2 node; every non-coordinator node is then forced to replay its batchlog.
+ """
+ self._batchlog_replay_compatibility_test(2, 2, 'github:apache/cassandra-2.2', 1, 4)
+
+ @since('3.0', max_version='3.x')
@skipIf(sys.platform == 'win32', 'Windows production support only on 2.2+')
def logged_batch_compatibility_5_test(self):
"""
@@ -346,6 +367,43 @@ class TestBatch(Tester):
res = sorted(rows)
self.assertEquals([[0, 'Jack', 'Sparrow'], [1, 'Will', 'Turner']], [list(res[0]), list(res[1])])
+ def _batchlog_replay_compatibility_test(self, coordinator_idx, current_nodes, previous_version, previous_nodes, protocol_version):  # batch via coordinator, force replay elsewhere, verify rows on all nodes
+ session = self.prepare_mixed(coordinator_idx, current_nodes, previous_version, previous_nodes,
+ protocol_version=protocol_version, install_byteman=True)  # byteman agent needed for byteman_submit below
+
+ coordinator = self.cluster.nodelist()[coordinator_idx]
+ coordinator.byteman_submit(['./byteman/fail_after_batchlog_write.btm'])  # skips syncWriteBatchedMutations and asyncRemoveFromBatchlog on the coordinator
+ debug("Injected byteman scripts to enable batchlog replay {}".format(coordinator.name))
+
+ query = """
+ BEGIN BATCH
+ INSERT INTO users (id, firstname, lastname) VALUES (0, 'Jack', 'Sparrow')
+ INSERT INTO users (id, firstname, lastname) VALUES (1, 'Will', 'Turner')
+ APPLY BATCH
+ """
+ session.execute(query)  # presumably leaves the batch only in the batchlog, given the byteman rules
+
+ total_batches_replayed = 0
+ blm = make_mbean('db', type='BatchlogManager')
+
+ for n in self.cluster.nodelist():
+ if n == coordinator:  # replay is forced only on the non-coordinator nodes
+ continue
+
+ with JolokiaAgent(n) as jmx:
+ debug('Forcing batchlog replay for {}'.format(n.name))
+ jmx.execute_method(blm, 'forceBatchlogReplay')
+ batches_replayed = jmx.read_attribute(blm, 'TotalBatchesReplayed')
+ debug('{} batches replayed on node {}'.format(batches_replayed, n.name))
+ total_batches_replayed += batches_replayed
+
+ assert_greater_equal(total_batches_replayed, 2)  # NOTE(review): why 2 for a single batch? presumably one replay per batchlog copy -- confirm
+
+ for node in self.cluster.nodelist():  # every node must now return both batched rows
+ session = self.patient_exclusive_cql_connection(node, protocol_version=protocol_version)
+ rows = sorted(session.execute('SELECT id, firstname, lastname FROM ks.users'))
+ self.assertEqual([[0, 'Jack', 'Sparrow'], [1, 'Will', 'Turner']], [list(rows[0]), list(rows[1])])  # NOTE(review): IndexError (not a clean failure) if < 2 rows; extra rows go undetected
+
def assert_timedout(self, session, query, cl, acknowledged_by=None,
received_responses=None):
try:
@@ -366,15 +424,17 @@ class TestBatch(Tester):
else:
assert False, "Expecting TimedOutException but no exception was raised"
- def prepare(self, nodes=1, compression=True, version=None, protocol_version=None):
- if not self.cluster.nodelist():
- self.cluster.populate(nodes)
- if version:
- for node in self.cluster.nodelist():
- node.set_install_dir(version=version)
- debug("Set cassandra dir for {} to {}".format(node.name, node.get_install_dir()))
+ def prepare(self, nodes=1, compression=True, version=None, protocol_version=None, install_byteman=False):
+ if version:
+ self.cluster.set_install_dir(version=version)
+ debug("Set cassandra dir to {}".format(self.cluster.get_install_dir()))
+
+ self.cluster.populate(nodes, install_byteman=install_byteman)
+
+ for n in self.cluster.nodelist():
+ remove_perf_disable_shared_mem(n)
- self.cluster.start(wait_other_notice=True)
+ self.cluster.start(wait_other_notice=True)
node1 = self.cluster.nodelist()[0]
session = self.patient_cql_connection(node1, protocol_version=protocol_version)
@@ -405,13 +465,12 @@ class TestBatch(Tester):
time.sleep(.5)
- def prepare_mixed(self, coordinator_idx, current_nodes, previous_version, previous_nodes, compression=True, protocol_version=None):
-
+ def prepare_mixed(self, coordinator_idx, current_nodes, previous_version, previous_nodes, compression=True, protocol_version=None, install_byteman=False):
debug("Testing with {} node(s) at version '{}', {} node(s) at current version"
.format(previous_nodes, previous_version, current_nodes))
# start a cluster using the previous version
- self.prepare(previous_nodes + current_nodes, compression, previous_version, protocol_version=protocol_version)
+ self.prepare(previous_nodes + current_nodes, compression, previous_version, protocol_version=protocol_version, install_byteman=install_byteman)
# then upgrade the current nodes to the current version but not the previous nodes
for i in xrange(current_nodes):
@@ -426,18 +485,13 @@ class TestBatch(Tester):
"""
Upgrade a node to the current version
"""
- debug('Upgrading {}'.format(node.name))
-
- debug('Shutting down node: ' + node.name)
- node.drain()
- node.watch_log_for("DRAINED")
+ debug('Upgrading {} to the current version'.format(node.name))
+ debug('Shutting down {}'.format(node.name))
node.stop(wait_other_notice=False)
-
self.set_node_to_current_version(node)
- debug("Set new cassandra dir for {}: {}".format(node.name, node.get_install_dir()))
-
+ debug("Set cassandra dir for {} to {}".format(node.name, node.get_install_dir()))
+ # needed for jmx
+ remove_perf_disable_shared_mem(node)
# Restart nodes on new version
debug('Starting {} on new version ({})'.format(node.name, node.get_cassandra_version()))
node.start(wait_other_notice=True, wait_for_binary_proto=True)
- debug('Upgrading sstables')
- node.nodetool('upgradesstables -a')
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ac9c9560/byteman/fail_after_batchlog_write.btm
----------------------------------------------------------------------
diff --git a/byteman/fail_after_batchlog_write.btm b/byteman/fail_after_batchlog_write.btm
new file mode 100644
index 0000000..8574b00
--- /dev/null
+++ b/byteman/fail_after_batchlog_write.btm
@@ -0,0 +1,19 @@
+#
+# Simulate a coordinator failure immediately after the batchlog write: sending the batched mutations (syncWriteBatchedMutations) and removing the batch from the batchlog (asyncRemoveFromBatchlog) are both skipped, leaving the batch for replay.
+# Method signature required in 3.x to avoid pausing before legacy mutations sent. NOTE(review): no method signature is given in the METHOD lines below -- confirm this is still accurate.
+#
+RULE skip writing batched mutations
+CLASS org.apache.cassandra.service.StorageProxy
+METHOD syncWriteBatchedMutations
+AT ENTRY
+IF TRUE
+DO return
+ENDRULE
+
+RULE skip removing from batchlog
+CLASS org.apache.cassandra.service.StorageProxy
+METHOD asyncRemoveFromBatchlog
+AT ENTRY
+IF TRUE
+DO return
+ENDRULE
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org