Posted to commits@cassandra.apache.org by aw...@apache.org on 2018/01/29 21:10:35 UTC
[32/36] cassandra-dtest git commit: Migrate dtests to use pytest and python3
http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/49b2dda4/conftest.py
----------------------------------------------------------------------
diff --git a/conftest.py b/conftest.py
new file mode 100644
index 0000000..650e9c8
--- /dev/null
+++ b/conftest.py
@@ -0,0 +1,489 @@
+import pytest
+import logging
+import os
+import shutil
+import time
+import re
+import platform
+import copy
+import inspect
+import subprocess
+
+from dtest import running_in_docker, cleanup_docker_environment_before_test_execution
+
+from datetime import datetime
+from distutils.version import LooseVersion
+from netifaces import AF_INET
+from psutil import virtual_memory
+
+import netifaces as ni
+
+from ccmlib.common import validate_install_dir, get_version_from_build, is_win
+
+from dtest_setup import DTestSetup
+from dtest_setup_overrides import DTestSetupOverrides
+
+logger = logging.getLogger(__name__)
+
+
+class DTestConfig:
+ def __init__(self):
+ self.use_vnodes = True
+ self.use_off_heap_memtables = False
+ self.num_tokens = -1
+ self.data_dir_count = -1
+ self.force_execution_of_resource_intensive_tests = False
+ self.skip_resource_intensive_tests = False
+ self.cassandra_dir = None
+ self.cassandra_version = None
+ self.delete_logs = False
+ self.execute_upgrade_tests = False
+ self.disable_active_log_watching = False
+ self.keep_test_dir = False
+ self.enable_jacoco_code_coverage = False
+ self.jemalloc_path = find_libjemalloc()
+
+ def setup(self, request):
+ self.use_vnodes = request.config.getoption("--use-vnodes")
+ self.use_off_heap_memtables = request.config.getoption("--use-off-heap-memtables")
+ self.num_tokens = request.config.getoption("--num-tokens")
+ self.data_dir_count = request.config.getoption("--data-dir-count-per-instance")
+ self.force_execution_of_resource_intensive_tests = request.config.getoption("--force-resource-intensive-tests")
+ self.skip_resource_intensive_tests = request.config.getoption("--skip-resource-intensive-tests")
+ if request.config.getoption("--cassandra-dir") is not None:
+ self.cassandra_dir = os.path.expanduser(request.config.getoption("--cassandra-dir"))
+ self.cassandra_version = request.config.getoption("--cassandra-version")
+ self.delete_logs = request.config.getoption("--delete-logs")
+ self.execute_upgrade_tests = request.config.getoption("--execute-upgrade-tests")
+ self.disable_active_log_watching = request.config.getoption("--disable-active-log-watching")
+ self.keep_test_dir = request.config.getoption("--keep-test-dir")
+ self.enable_jacoco_code_coverage = request.config.getoption("--enable-jacoco-code-coverage")
+
+
+def check_required_loopback_interfaces_available():
+    """
+    We need multiple loopback interfaces configured to run almost all dtests. On Linux, loopback
+    interfaces are automatically created as they are used, but on Mac they need to be explicitly
+    created. Check if we're running on Mac (Darwin), and if so check that we have at least 9
+    loopback interfaces available; otherwise bail out so we don't run the tests in a known bad
+    config, and give the user some helpful advice on how to get their machine into a good known
+    config.
+    """
+ if platform.system() == "Darwin":
+ if len(ni.ifaddresses('lo0')[AF_INET]) < 9:
+ pytest.exit("At least 9 loopback interfaces are required to run dtests. "
+ "On Mac you can create the required loopback interfaces by running "
+ "'for i in {1..9}; do sudo ifconfig lo0 alias 127.0.0.$i up; done;'")
+
+
+def pytest_addoption(parser):
+ parser.addoption("--use-vnodes", action="store_true", default=False,
+                     help="Determines whether or not to set up clusters using vnodes for tests")
+ parser.addoption("--use-off-heap-memtables", action="store_true", default=False,
+ help="Enable Off Heap Memtables when creating test clusters for tests")
+ parser.addoption("--num-tokens", action="store", default=256,
+                     help="Number of tokens to assign to the num_tokens yaml setting when creating "
+                          "instances with vnodes enabled")
+ parser.addoption("--data-dir-count-per-instance", action="store", default=3,
+ help="Control the number of data directories to create per instance")
+ parser.addoption("--force-resource-intensive-tests", action="store_true", default=False,
+ help="Forces the execution of tests marked as resource_intensive")
+ parser.addoption("--skip-resource-intensive-tests", action="store_true", default=False,
+ help="Skip all tests marked as resource_intensive")
+ parser.addoption("--cassandra-dir", action="store", default=None,
+ help="The directory containing the built C* artifacts to run the tests against. "
+                          "(e.g. the path to the root of a cloned C* git directory). Before executing dtests using "
+                          "this directory you must build C* with 'ant clean jar'. If you're doing C* development and "
+                          "want to run the tests, this is almost always the correct option.")
+ parser.addoption("--cassandra-version", action="store", default=None,
+ help="A specific C* version to run the dtests against. The dtest framework will "
+ "pull the required artifacts for this version.")
+ parser.addoption("--delete-logs", action="store_true", default=False,
+                     help="Delete all logs generated by a test after the test completes.")
+ parser.addoption("--execute-upgrade-tests", action="store_true", default=False,
+ help="Execute Cassandra Upgrade Tests (e.g. tests annotated with the upgrade_test mark)")
+ parser.addoption("--disable-active-log-watching", action="store_true", default=False,
+ help="Disable ccm active log watching, which will cause dtests to check for errors in the "
+ "logs in a single operation instead of semi-realtime processing by consuming "
+ "ccm _log_error_handler callbacks")
+ parser.addoption("--keep-test-dir", action="store_true", default=False,
+                     help="Do not remove/cleanup the test ccm cluster directory and its artifacts "
+                          "after the test completes")
+ parser.addoption("--enable-jacoco-code-coverage", action="store_true", default=False,
+ help="Enable JaCoCo Code Coverage Support")
+
+
+def sufficient_system_resources_for_resource_intensive_tests():
+ mem = virtual_memory()
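+    # psutil reports total memory in bytes; convert to GiB before comparing against the bound below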
+ total_mem_gb = mem.total/1024/1024/1024
+ logger.info("total available system memory is %dGB" % total_mem_gb)
+    # TODO (kjkj): do not hard-code this bound; for now just assume 9 instances at 3GB apiece
+ return total_mem_gb >= 9*3
+
+
+@pytest.fixture(scope='function', autouse=True)
+def fixture_dtest_setup_overrides():
+ """
+    No-op default implementation of fixture_dtest_setup_overrides.
+    We run this when a test class hasn't provided its own
+    fixture_dtest_setup_overrides.
+ """
+ return DTestSetupOverrides()
+
+
+"""
+Not exactly sure why, but this fixture needs to be scoped at the function level and not
+session or class. If you invoke pytest with tests across multiple test classes, when scoped
+at session the root logger appears to get reset between each test class invocation.
+This means that the first test to run from any class other than the first (and all
+subsequent tests) will see the root logger reset to a level of NOTSET. Scoping it at the
+class level seems to work, and I guess it's not that much extra overhead to set up the
+logger once per test class vs. once per session in the grand scheme of things.
+"""
+@pytest.fixture(scope="function", autouse=True)
+def fixture_logging_setup(request):
+ # set the root logger level to whatever the user asked for
+ # all new loggers created will use the root logger as a template
+ # essentially making this the "default" active log level
+ log_level = logging.INFO
+ try:
+ # first see if logging level overridden by user as command line argument
+ log_level_from_option = pytest.config.getoption("--log-level")
+ if log_level_from_option is not None:
+ log_level = logging.getLevelName(log_level_from_option)
+ else:
+ raise ValueError
+ except ValueError:
+        # nope, the user didn't specify it as a command line argument to pytest; check if
+        # we have a default in the loaded pytest.ini. Note: words are separated with "_"
+        # in .ini variables, while the command line arguments use "-"
+ if pytest.config.inicfg.get("log_level") is not None:
+ log_level = logging.getLevelName(pytest.config.inicfg.get("log_level"))
+
+ logging.root.setLevel(log_level)
+
+ logging_format = None
+ try:
+        # first see if the log format was overridden by the user as a command line argument
+ log_format_from_option = pytest.config.getoption("--log-format")
+ if log_format_from_option is not None:
+ logging_format = log_format_from_option
+ else:
+ raise ValueError
+ except ValueError:
+ if pytest.config.inicfg.get("log_format") is not None:
+ logging_format = pytest.config.inicfg.get("log_format")
+
+ logging.basicConfig(level=log_level,
+ format=logging_format)
+
+    # next, regardless of the level set above (and requested by the user),
+    # cap the "cassandra" driver logger at INFO: the "cassandra.*" modules at
+    # DEBUG are just insanely noisy and verbose, and the extra logging is of
+    # very limited help in the context of dtest execution
+ if log_level == logging.DEBUG:
+ cassandra_module_log_level = logging.INFO
+ else:
+ cassandra_module_log_level = log_level
+ logging.getLogger("cassandra").setLevel(cassandra_module_log_level)
+
+
+@pytest.fixture(scope="session")
+def log_global_env_facts(fixture_dtest_config):
+ if pytest.config.pluginmanager.hasplugin('junitxml'):
+ my_junit = getattr(pytest.config, '_xml', None)
+ my_junit.add_global_property('USE_VNODES', fixture_dtest_config.use_vnodes)
+
+
+@pytest.fixture
+def fixture_dtest_config(request, fixture_logging_setup):
+    # although we don't use fixture_logging_setup here, we do want that fixture
+    # to run as a prerequisite to this one, and right now this is the only way
+    # that can be expressed with pytest
+ dtest_config = DTestConfig()
+ dtest_config.setup(request)
+ return dtest_config
+
+
+@pytest.fixture(scope='function', autouse=True)
+def fixture_maybe_skip_tests_requiring_novnodes(request):
+ """
+    Fixture run before the start of every test function that checks if the test is marked
+    with the no_vnodes annotation while the tests were started with a configuration that
+    has vnodes enabled. This should always be a no-op, as we explicitly deselect such tests
+    in pytest_collection_modifyitems -- but better to be explicit :)
+ """
+ if request.node.get_marker('no_vnodes'):
+ if request.config.getoption("--use-vnodes"):
+ pytest.skip("Skipping test marked with no_vnodes as tests executed with vnodes enabled via the "
+ "--use-vnodes command line argument")
+
+
+@pytest.fixture(scope='function', autouse=True)
+def fixture_log_test_name_and_date(request):
+ logger.info("Starting execution of %s at %s" % (request.node.name, str(datetime.now())))
+
+
+def _filter_errors(dtest_setup, errors):
+ """Filter errors, removing those that match ignore_log_patterns in the current DTestSetup"""
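+    # note: the inner loop relies on Python's for/else -- the else clause runs
+    # only when no ignore pattern matched (i.e. the loop finished without break)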
+ for e in errors:
+ for pattern in dtest_setup.ignore_log_patterns:
+ if re.search(pattern, repr(e)):
+ break
+ else:
+ yield e
+
+
+def check_logs_for_errors(dtest_setup):
+    all_errors = []
+    for node in dtest_setup.cluster.nodelist():
+        errors = list(_filter_errors(dtest_setup, ['\n'.join(msg) for msg in node.grep_log_for_errors()]))
+        for error in errors:
+            if isinstance(error, (bytes, bytearray)):
+                error_str = error.decode("utf-8").strip()
+            else:
+                error_str = error.strip()
+
+            if error_str:
+                logger.error("Unexpected error in {node_name} log, error: \n{error}"
+                             .format(node_name=node.name, error=error_str))
+                all_errors.append(error_str)
+    return all_errors
+
+
+def copy_logs(request, cluster, directory=None, name=None):
+ """Copy the current cluster's log files somewhere, by default to LOG_SAVED_DIR with a name of 'last'"""
+ log_saved_dir = "logs"
+ try:
+ os.mkdir(log_saved_dir)
+ except OSError:
+ pass
+
+ if directory is None:
+ directory = log_saved_dir
+ if name is None:
+ name = os.path.join(log_saved_dir, "last")
+ else:
+ name = os.path.join(directory, name)
+ if not os.path.exists(directory):
+ os.mkdir(directory)
+ logs = [(node.name, node.logfilename(), node.debuglogfilename(), node.gclogfilename(), node.compactionlogfilename())
+ for node in list(cluster.nodes.values())]
+    if len(logs) != 0:
+ basedir = str(int(time.time() * 1000)) + '_' + request.node.name
+ logdir = os.path.join(directory, basedir)
+ os.mkdir(logdir)
+ for n, log, debuglog, gclog, compactionlog in logs:
+ if os.path.exists(log):
+ assert os.path.getsize(log) >= 0
+ shutil.copyfile(log, os.path.join(logdir, n + ".log"))
+ if os.path.exists(debuglog):
+ assert os.path.getsize(debuglog) >= 0
+ shutil.copyfile(debuglog, os.path.join(logdir, n + "_debug.log"))
+ if os.path.exists(gclog):
+ assert os.path.getsize(gclog) >= 0
+ shutil.copyfile(gclog, os.path.join(logdir, n + "_gc.log"))
+ if os.path.exists(compactionlog):
+ assert os.path.getsize(compactionlog) >= 0
+ shutil.copyfile(compactionlog, os.path.join(logdir, n + "_compaction.log"))
+ if os.path.exists(name):
+ os.unlink(name)
+ if not is_win():
+ os.symlink(basedir, name)
+
+
+def reset_environment_vars(initial_environment):
+ pytest_current_test = os.environ.get('PYTEST_CURRENT_TEST')
+ os.environ.clear()
+ os.environ.update(initial_environment)
+ os.environ['PYTEST_CURRENT_TEST'] = pytest_current_test
+
+
+@pytest.fixture(scope='function', autouse=False)
+def fixture_dtest_setup(request, parse_dtest_config, fixture_dtest_setup_overrides, fixture_logging_setup):
+ if running_in_docker():
+ cleanup_docker_environment_before_test_execution()
+
+    # do all of our setup operations to get the environment ready for the actual test
+    # to run (e.g. bring up a cluster with the necessary config, populate variables, etc.)
+ initial_environment = copy.deepcopy(os.environ)
+ dtest_setup = DTestSetup(dtest_config=parse_dtest_config, setup_overrides=fixture_dtest_setup_overrides)
+ dtest_setup.initialize_cluster()
+
+ if not parse_dtest_config.disable_active_log_watching:
+ dtest_setup.log_watch_thread = dtest_setup.begin_active_log_watch()
+
+    # at this point we're done with our setup operations in this fixture;
+    # yield to allow the actual test to run
+ yield dtest_setup
+
+ # phew! we're back after executing the test, now we need to do
+ # all of our teardown and cleanup operations
+
+ reset_environment_vars(initial_environment)
+ dtest_setup.jvm_args = []
+
+ for con in dtest_setup.connections:
+ con.cluster.shutdown()
+ dtest_setup.connections = []
+
+ failed = False
+ try:
+ if not dtest_setup.allow_log_errors:
+ errors = check_logs_for_errors(dtest_setup)
+ if len(errors) > 0:
+ failed = True
+ pytest.fail(msg='Unexpected error found in node logs (see stdout for full details). Errors: [{errors}]'
+ .format(errors=str.join(", ", errors)), pytrace=False)
+ finally:
+ try:
+ # save the logs for inspection
+ if failed or not parse_dtest_config.delete_logs:
+ copy_logs(request, dtest_setup.cluster)
+ except Exception as e:
+            logger.error("Error saving log: %s", str(e))
+ finally:
+ dtest_setup.cleanup_cluster()
+
+
+def _skip_msg(current_running_version, since_version, max_version):
+ if current_running_version < since_version:
+ return "%s < %s" % (current_running_version, since_version)
+ if max_version and current_running_version > max_version:
+ return "%s > %s" % (current_running_version, max_version)
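+    return None  # the current version is within the supported range; no reason to skip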
+
+
+@pytest.fixture(autouse=True)
+def fixture_since(request, fixture_dtest_setup):
+ if request.node.get_marker('since'):
+ max_version_str = request.node.get_marker('since').kwargs.get('max_version', None)
+ max_version = None
+ if max_version_str:
+ max_version = LooseVersion(max_version_str)
+
+ since_str = request.node.get_marker('since').args[0]
+ since = LooseVersion(since_str)
+ current_running_version = fixture_dtest_setup.cluster.version()
+ skip_msg = _skip_msg(current_running_version, since, max_version)
+ if skip_msg:
+ pytest.skip(skip_msg)
+
+
+@pytest.fixture(scope='session', autouse=True)
+def install_debugging_signal_handler():
+ import faulthandler
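+    # dump Python tracebacks on fatal signals (SIGSEGV, SIGABRT, ...) so that
+    # crashed or killed dtest runs still leave useful diagnostics behind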
+ faulthandler.enable()
+
+
+@pytest.fixture(scope='function')
+def parse_dtest_config(request):
+ dtest_config = DTestConfig()
+ dtest_config.setup(request)
+
+ # if we're on mac, check that we have the required loopback interfaces before doing anything!
+ check_required_loopback_interfaces_available()
+
+ try:
+ if dtest_config.cassandra_dir is not None:
+ validate_install_dir(dtest_config.cassandra_dir)
+ except Exception as e:
+ pytest.exit("{}. Did you remember to build C*? ('ant clean jar')".format(e))
+
+ yield dtest_config
+
+
+def pytest_collection_modifyitems(items, config):
+ """
+    This function is called during the pytest test collection phase and allows for
+    modification of the collected test items
+ """
+ if not config.getoption("--collect-only") and config.getoption("--cassandra-dir") is None:
+ if config.getoption("--cassandra-version") is None:
+ raise Exception("Required dtest arguments were missing! You must provide either --cassandra-dir "
+ "or --cassandra-version. Refer to the documentation or invoke the help with --help.")
+
+ selected_items = []
+ deselected_items = []
+
+ sufficient_system_resources_resource_intensive = sufficient_system_resources_for_resource_intensive_tests()
+ logger.debug("has sufficient resources? %s" % sufficient_system_resources_resource_intensive)
+
+ for item in items:
+        # set a timeout for all tests; it may be overridden at the test level with an additional marker
+ if not item.get_marker("timeout"):
+ item.add_marker(pytest.mark.timeout(60*15))
+
+ deselect_test = False
+
+        if item.get_marker("resource_intensive"):
+            if config.getoption("--force-resource-intensive-tests"):
+                # forced execution overrides both the skip flag and the resource check below
+                pass
+            elif config.getoption("--skip-resource-intensive-tests"):
+                deselect_test = True
+                logger.info("SKIP: Deselecting test %s as test marked resource_intensive. To force execution of "
+                            "this test re-run with the --force-resource-intensive-tests command line argument" % item.name)
+            elif not sufficient_system_resources_resource_intensive:
+                deselect_test = True
+                logger.info("SKIP: Deselecting resource_intensive test %s due to insufficient system resources" % item.name)
+
+ if item.get_marker("no_vnodes"):
+ if config.getoption("--use-vnodes"):
+ deselect_test = True
+ logger.info("SKIP: Deselecting test %s as the test requires vnodes to be disabled. To run this test, "
+ "re-run without the --use-vnodes command line argument" % item.name)
+
+ if item.get_marker("vnodes"):
+ if not config.getoption("--use-vnodes"):
+ deselect_test = True
+ logger.info("SKIP: Deselecting test %s as the test requires vnodes to be enabled. To run this test, "
+ "re-run with the --use-vnodes command line argument" % item.name)
+
+ for test_item_class in inspect.getmembers(item.module, inspect.isclass):
+ if not hasattr(test_item_class[1], "pytestmark"):
+ continue
+
+ for module_pytest_mark in test_item_class[1].pytestmark:
+ if module_pytest_mark.name == "upgrade_test":
+ if not config.getoption("--execute-upgrade-tests"):
+ deselect_test = True
+
+ if item.get_marker("upgrade_test"):
+ if not config.getoption("--execute-upgrade-tests"):
+ deselect_test = True
+
+        # TODO (kjkj): deal with the no_offheap_memtables mark
+
+ if deselect_test:
+ deselected_items.append(item)
+ else:
+ selected_items.append(item)
+
+ config.hook.pytest_deselected(items=deselected_items)
+ items[:] = selected_items
+
+
+# Determine the location of the libjemalloc jar so that we can specify it
+# through environment variables when starting Cassandra. This reduces startup
+# time, making the dtests run faster.
+def find_libjemalloc():
+ if is_win():
+ # let the normal bat script handle finding libjemalloc
+ return ""
+
+ this_dir = os.path.dirname(os.path.realpath(__file__))
+ script = os.path.join(this_dir, "findlibjemalloc.sh")
+ try:
+ p = subprocess.Popen([script], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+ stdout, stderr = p.communicate()
+ if stderr or not stdout:
+ return "-" # tells C* not to look for libjemalloc
+ else:
+            return stdout.decode("utf-8").strip()
+ except Exception as exc:
+        print("Failed to run script to locate libjemalloc ({}): {}".format(script, exc))
+ return ""
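
For a sense of how a test module consumes this conftest, here is a minimal sketch
(the class, test name, and body are illustrative only; the marks and fixtures are
the ones defined above):

    import logging
    import pytest

    from dtest import Tester

    since = pytest.mark.since
    logger = logging.getLogger(__name__)


    class TestExample(Tester):

        @since('3.0')                    # enforced by the fixture_since fixture
        @pytest.mark.resource_intensive  # subject to deselection in pytest_collection_modifyitems
        def test_example(self):
            cluster = self.cluster       # provisioned by fixture_dtest_setup
            cluster.populate(3).start()
            logger.debug("cluster is up")
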
http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/49b2dda4/consistency_test.py
----------------------------------------------------------------------
diff --git a/consistency_test.py b/consistency_test.py
index 2eaa2ca..368dba0 100644
--- a/consistency_test.py
+++ b/consistency_test.py
@@ -1,36 +1,30 @@
-import Queue
+import queue
import sys
import threading
import time
+import pytest
+import logging
from collections import OrderedDict, namedtuple
from copy import deepcopy
from cassandra import ConsistencyLevel, consistency_value_to_name
from cassandra.query import SimpleStatement
-from nose.plugins.attrib import attr
-from nose.tools import assert_greater_equal
from tools.assertions import (assert_all, assert_length_equal, assert_none,
assert_unavailable)
-from dtest import DISABLE_VNODES, MultiError, Tester, debug, create_ks, create_cf
+from dtest import MultiError, Tester, create_ks, create_cf
from tools.data import (create_c1c2_table, insert_c1c2, insert_columns,
query_c1c2, rows_to_list)
-from tools.decorators import since
from tools.jmxutils import JolokiaAgent, make_mbean, remove_perf_disable_shared_mem
+since = pytest.mark.since
+logger = logging.getLogger(__name__)
+
ExpectedConsistency = namedtuple('ExpectedConsistency', ('num_write_nodes', 'num_read_nodes', 'is_strong'))
class TestHelper(Tester):
- def __init__(self, *args, **kwargs):
- Tester.__init__(self, *args, **kwargs)
- self.lock = threading.Lock()
-
- def log(self, message):
- with self.lock:
- debug(message)
-
def _is_local(self, cl):
return (cl == ConsistencyLevel.LOCAL_QUORUM or
cl == ConsistencyLevel.LOCAL_ONE or
@@ -51,12 +45,12 @@ class TestHelper(Tester):
ConsistencyLevel.ONE: 1,
ConsistencyLevel.TWO: 2,
ConsistencyLevel.THREE: 3,
- ConsistencyLevel.QUORUM: sum(rf_factors) / 2 + 1,
+ ConsistencyLevel.QUORUM: sum(rf_factors) // 2 + 1,
ConsistencyLevel.ALL: sum(rf_factors),
- ConsistencyLevel.LOCAL_QUORUM: rf_factors[dc] / 2 + 1,
- ConsistencyLevel.EACH_QUORUM: rf_factors[dc] / 2 + 1,
- ConsistencyLevel.SERIAL: sum(rf_factors) / 2 + 1,
- ConsistencyLevel.LOCAL_SERIAL: rf_factors[dc] / 2 + 1,
+ ConsistencyLevel.LOCAL_QUORUM: rf_factors[dc] // 2 + 1,
+ ConsistencyLevel.EACH_QUORUM: rf_factors[dc] // 2 + 1,
+ ConsistencyLevel.SERIAL: sum(rf_factors) // 2 + 1,
+ ConsistencyLevel.LOCAL_SERIAL: rf_factors[dc] // 2 + 1,
ConsistencyLevel.LOCAL_ONE: 1,
}[cl]
@@ -73,7 +67,7 @@ class TestHelper(Tester):
:return: the data center corresponding to this node
"""
dc = 0
- for i in xrange(1, len(nodes)):
+ for i in range(1, len(nodes)):
if idx < sum(nodes[:i]):
break
dc += 1
@@ -101,7 +95,7 @@ class TestHelper(Tester):
if self._is_local(cl):
return num_nodes_alive[current] >= self._required_nodes(cl, rf_factors, current)
elif cl == ConsistencyLevel.EACH_QUORUM:
- for i in xrange(0, len(rf_factors)):
+ for i in range(0, len(rf_factors)):
if num_nodes_alive[i] < self._required_nodes(cl, rf_factors, i):
return False
return True
@@ -132,7 +126,7 @@ class TestHelper(Tester):
# StorageProxy.getLiveSortedEndpoints(), which is called by the AbstractReadExecutor
# to determine the target replicas. The default case, a SimpleSnitch wrapped in
# a dynamic snitch, may rarely choose a different replica.
- debug('Changing snitch for single dc case')
+ logger.debug('Changing snitch for single dc case')
for node in cluster.nodelist():
node.data_center = 'dc1'
cluster.set_configuration_options(values={
@@ -208,7 +202,7 @@ class TestHelper(Tester):
expected = [[userid, age]] if age else []
ret = rows_to_list(res) == expected
if check_ret:
- self.assertTrue(ret, "Got {} from {}, expected {} at {}".format(rows_to_list(res), session.cluster.contact_points, expected, consistency_value_to_name(consistency)))
+ assert ret, "Got {} from {}, expected {} at {}".format(rows_to_list(res), session.cluster.contact_points, expected, consistency_value_to_name(consistency))
return ret
def create_counters_table(self, session, requires_local_reads):
@@ -233,10 +227,10 @@ class TestHelper(Tester):
statement = SimpleStatement("SELECT * from counters WHERE id = {}".format(id), consistency_level=consistency)
ret = rows_to_list(session.execute(statement))
if check_ret:
- self.assertEqual(ret[0][1], val, "Got {} from {}, expected {} at {}".format(ret[0][1],
+ assert ret[0][1] == val, "Got {} from {}, expected {} at {}".format(ret[0][1],
session.cluster.contact_points,
val,
- consistency_value_to_name(consistency)))
+ consistency_value_to_name(consistency))
return ret[0][1] if ret else 0
@@ -255,8 +249,8 @@ class TestAvailability(TestHelper):
rf = self.rf
num_alive = nodes
- for node in xrange(nodes):
- debug('Testing node {} in single dc with {} nodes alive'.format(node, num_alive))
+ for node in range(nodes):
+ logger.debug('Testing node {} in single dc with {} nodes alive'.format(node, num_alive))
session = self.patient_exclusive_cql_connection(cluster.nodelist()[node], self.ksname)
for combination in combinations:
self._test_insert_query_from_node(session, 0, [rf], [num_alive], *combination)
@@ -274,12 +268,12 @@ class TestAvailability(TestHelper):
rf = self.rf
nodes_alive = deepcopy(nodes)
- rf_factors = rf.values()
+ rf_factors = list(rf.values())
- for i in xrange(0, len(nodes)): # for each dc
- self.log('Testing dc {} with rf {} and {} nodes alive'.format(i, rf_factors[i], nodes_alive))
- for n in xrange(nodes[i]): # for each node in this dc
- self.log('Testing node {} in dc {} with {} nodes alive'.format(n, i, nodes_alive))
+ for i in range(0, len(nodes)): # for each dc
+ logger.debug('Testing dc {} with rf {} and {} nodes alive'.format(i, rf_factors[i], nodes_alive))
+ for n in range(nodes[i]): # for each node in this dc
+ logger.debug('Testing node {} in dc {} with {} nodes alive'.format(n, i, nodes_alive))
node = n + sum(nodes[:i])
session = self.patient_exclusive_cql_connection(cluster.nodelist()[node], self.ksname)
for combination in combinations:
@@ -292,7 +286,7 @@ class TestAvailability(TestHelper):
"""
Test availability for read and write via the session passed in as a parameter.
"""
- self.log("Connected to %s for %s/%s/%s" %
+ logger.debug("Connected to %s for %s/%s/%s" %
(session.cluster.contact_points, consistency_value_to_name(write_cl), consistency_value_to_name(read_cl), consistency_value_to_name(serial_cl)))
start = 0
@@ -300,13 +294,13 @@ class TestAvailability(TestHelper):
age = 30
if self._should_succeed(write_cl, rf_factors, num_nodes_alive, dc_idx):
- for n in xrange(start, end):
+ for n in range(start, end):
self.insert_user(session, n, age, write_cl, serial_cl)
else:
assert_unavailable(self.insert_user, session, end, age, write_cl, serial_cl)
if self._should_succeed(read_cl, rf_factors, num_nodes_alive, dc_idx):
- for n in xrange(start, end):
+ for n in range(start, end):
self.query_user(session, n, age, read_cl, check_ret)
else:
assert_unavailable(self.query_user, session, end, age, read_cl, check_ret)
@@ -361,7 +355,7 @@ class TestAvailability(TestHelper):
self._test_simple_strategy(combinations)
- @attr("resource-intensive")
+ @pytest.mark.resource_intensive
def test_network_topology_strategy(self):
"""
Test for multiple datacenters, using network topology replication strategy.
@@ -393,7 +387,7 @@ class TestAvailability(TestHelper):
self._test_network_topology_strategy(combinations)
- @attr("resource-intensive")
+ @pytest.mark.resource_intensive
@since("3.0")
def test_network_topology_strategy_each_quorum(self):
"""
@@ -432,7 +426,7 @@ class TestAccuracy(TestHelper):
self.read_cl = read_cl
self.serial_cl = serial_cl
- outer.log('Testing accuracy with WRITE/READ/SERIAL consistency set to {}/{}/{} (keys : {} to {})'
+ logger.debug('Testing accuracy with WRITE/READ/SERIAL consistency set to {}/{}/{} (keys : {} to {})'
.format(consistency_value_to_name(write_cl), consistency_value_to_name(read_cl), consistency_value_to_name(serial_cl), start, end - 1))
def get_expected_consistency(self, idx):
@@ -459,12 +453,10 @@ class TestAccuracy(TestHelper):
for s in sessions:
if outer.query_user(s, n, val, read_cl, check_ret=expected_consistency.is_strong):
num += 1
- assert_greater_equal(num, expected_consistency.num_write_nodes,
- "Failed to read value from sufficient number of nodes,"
- " required {} but got {} - [{}, {}]"
- .format(expected_consistency.num_write_nodes, num, n, val))
+                assert num >= expected_consistency.num_write_nodes, \
+                    ("Failed to read value from sufficient number of nodes,"
+                     " required {} but got {} - [{}, {}]"
+                     .format(expected_consistency.num_write_nodes, num, n, val))
- for n in xrange(start, end):
+ for n in range(start, end):
age = 30
for s in range(0, len(sessions)):
outer.insert_user(sessions[s], n, age, write_cl, serial_cl)
@@ -499,12 +491,10 @@ class TestAccuracy(TestHelper):
for s in sessions:
results.append(outer.query_counter(s, n, val, read_cl, check_ret=expected_consistency.is_strong))
- assert_greater_equal(results.count(val), expected_consistency.num_write_nodes,
- "Failed to read value from sufficient number of nodes, required {} nodes to have a"
- " counter value of {} at key {}, instead got these values: {}"
- .format(expected_consistency.num_write_nodes, val, n, results))
+                assert results.count(val) >= expected_consistency.num_write_nodes, \
+                    ("Failed to read value from sufficient number of nodes, required {} nodes to have a"
+                     " counter value of {} at key {}, instead got these values: {}"
+                     .format(expected_consistency.num_write_nodes, val, n, results))
- for n in xrange(start, end):
+ for n in range(start, end):
c = 1
for s in range(0, len(sessions)):
outer.update_counter(sessions[s], n, write_cl, serial_cl)
@@ -534,15 +524,15 @@ class TestAccuracy(TestHelper):
self._start_cluster(save_sessions=True, requires_local_reads=requires_local_reads)
- input_queue = Queue.Queue()
- exceptions_queue = Queue.Queue()
+ input_queue = queue.Queue()
+ exceptions_queue = queue.Queue()
def run():
while not input_queue.empty():
try:
v = TestAccuracy.Validation(self, self.sessions, nodes, rf_factors, *input_queue.get(block=False))
valid_fcn(v)
- except Queue.Empty:
+ except queue.Empty:
pass
except Exception:
exceptions_queue.put(sys.exc_info())
@@ -560,17 +550,17 @@ class TestAccuracy(TestHelper):
t.start()
threads.append(t)
- self.log("Waiting for workers to complete")
+ logger.debug("Waiting for workers to complete")
while exceptions_queue.empty():
time.sleep(0.1)
- if len(filter(lambda t: t.isAlive(), threads)) == 0:
+            if len([t for t in threads if t.is_alive()]) == 0:
break
if not exceptions_queue.empty():
- _, exceptions, tracebacks = zip(*exceptions_queue.queue)
+ _, exceptions, tracebacks = list(zip(*exceptions_queue.queue))
raise MultiError(exceptions=exceptions, tracebacks=tracebacks)
- @attr("resource-intensive")
+ @pytest.mark.resource_intensive
def test_simple_strategy_users(self):
"""
Test for a single datacenter, users table, only the each quorum reads.
@@ -599,10 +589,10 @@ class TestAccuracy(TestHelper):
(ConsistencyLevel.QUORUM, ConsistencyLevel.LOCAL_SERIAL, ConsistencyLevel.SERIAL),
]
- self.log("Testing single dc, users")
+ logger.debug("Testing single dc, users")
self._run_test_function_in_parallel(TestAccuracy.Validation.validate_users, [self.nodes], [self.rf], combinations)
- @attr("resource-intensive")
+ @pytest.mark.resource_intensive
@since("3.0")
def test_simple_strategy_each_quorum_users(self):
"""
@@ -617,10 +607,10 @@ class TestAccuracy(TestHelper):
(ConsistencyLevel.EACH_QUORUM, ConsistencyLevel.EACH_QUORUM),
]
- self.log("Testing single dc, users, each quorum reads")
+ logger.debug("Testing single dc, users, each quorum reads")
self._run_test_function_in_parallel(TestAccuracy.Validation.validate_users, [self.nodes], [self.rf], combinations)
- @attr("resource-intensive")
+ @pytest.mark.resource_intensive
def test_network_topology_strategy_users(self):
"""
Test for multiple datacenters, users table.
@@ -653,10 +643,10 @@ class TestAccuracy(TestHelper):
(ConsistencyLevel.LOCAL_QUORUM, ConsistencyLevel.SERIAL, ConsistencyLevel.LOCAL_SERIAL),
]
- self.log("Testing multiple dcs, users")
- self._run_test_function_in_parallel(TestAccuracy.Validation.validate_users, self.nodes, self.rf.values(), combinations),
+ logger.debug("Testing multiple dcs, users")
+        self._run_test_function_in_parallel(TestAccuracy.Validation.validate_users, self.nodes, list(self.rf.values()), combinations)
- @attr("resource-intensive")
+ @pytest.mark.resource_intensive
@since("3.0")
def test_network_topology_strategy_each_quorum_users(self):
"""
@@ -672,8 +662,8 @@ class TestAccuracy(TestHelper):
(ConsistencyLevel.EACH_QUORUM, ConsistencyLevel.EACH_QUORUM),
]
- self.log("Testing multiple dcs, users, each quorum reads")
- self._run_test_function_in_parallel(TestAccuracy.Validation.validate_users, self.nodes, self.rf.values(), combinations)
+ logger.debug("Testing multiple dcs, users, each quorum reads")
+ self._run_test_function_in_parallel(TestAccuracy.Validation.validate_users, self.nodes, list(self.rf.values()), combinations)
def test_simple_strategy_counters(self):
"""
@@ -700,7 +690,7 @@ class TestAccuracy(TestHelper):
(ConsistencyLevel.LOCAL_QUORUM, ConsistencyLevel.LOCAL_QUORUM),
]
- self.log("Testing single dc, counters")
+ logger.debug("Testing single dc, counters")
self._run_test_function_in_parallel(TestAccuracy.Validation.validate_counters, [self.nodes], [self.rf], combinations)
@since("3.0")
@@ -718,10 +708,10 @@ class TestAccuracy(TestHelper):
(ConsistencyLevel.EACH_QUORUM, ConsistencyLevel.EACH_QUORUM),
]
- self.log("Testing single dc, counters, each quorum reads")
+ logger.debug("Testing single dc, counters, each quorum reads")
self._run_test_function_in_parallel(TestAccuracy.Validation.validate_counters, [self.nodes], [self.rf], combinations)
- @attr("resource-intensive")
+ @pytest.mark.resource_intensive
def test_network_topology_strategy_counters(self):
"""
Test for multiple datacenters, counters table.
@@ -749,10 +739,10 @@ class TestAccuracy(TestHelper):
(ConsistencyLevel.TWO, ConsistencyLevel.ONE),
]
- self.log("Testing multiple dcs, counters")
- self._run_test_function_in_parallel(TestAccuracy.Validation.validate_counters, self.nodes, self.rf.values(), combinations),
+ logger.debug("Testing multiple dcs, counters")
+        self._run_test_function_in_parallel(TestAccuracy.Validation.validate_counters, self.nodes, list(self.rf.values()), combinations)
- @attr("resource-intensive")
+ @pytest.mark.resource_intensive
@since("3.0")
def test_network_topology_strategy_each_quorum_counters(self):
"""
@@ -768,8 +758,8 @@ class TestAccuracy(TestHelper):
(ConsistencyLevel.EACH_QUORUM, ConsistencyLevel.EACH_QUORUM),
]
- self.log("Testing multiple dcs, counters, each quorum reads")
- self._run_test_function_in_parallel(TestAccuracy.Validation.validate_counters, self.nodes, self.rf.values(), combinations),
+ logger.debug("Testing multiple dcs, counters, each quorum reads")
+        self._run_test_function_in_parallel(TestAccuracy.Validation.validate_counters, self.nodes, list(self.rf.values()), combinations)
class TestConsistency(Tester):
@@ -1105,7 +1095,7 @@ class TestConsistency(Tester):
srp = make_mbean('metrics', type='Table', name='ShortReadProtectionRequests', keyspace='test', scope='test')
with JolokiaAgent(node1) as jmx:
# 4 srp requests for node1 and 5 for node2, total of 9
- self.assertEqual(9, jmx.read_attribute(srp, 'Count'))
+ assert 9 == jmx.read_attribute(srp, 'Count')
@since('3.0')
def test_12872(self):
@@ -1174,12 +1164,17 @@ class TestConsistency(Tester):
[[0], [4]],
cl=ConsistencyLevel.ALL)
- def short_read_test(self):
+ def test_short_read(self):
"""
@jira_ticket CASSANDRA-9460
"""
cluster = self.cluster
+ # this test causes the python driver to be extremely noisy due to
+ # frequent starting and stopping of nodes. let's move the log level
+ # of the driver to ERROR for this test only
+ logging.getLogger("cassandra").setLevel('ERROR')
+
# Disable hinted handoff and set batch commit log so this doesn't
        # interfere with the test
cluster.set_configuration_options(values={'hinted_handoff_enabled': False})
@@ -1196,13 +1191,13 @@ class TestConsistency(Tester):
reversed_key = 'reversed'
        # Repeat this test 10 times to make it easier to spot a null pointer exception caused by a race, see CASSANDRA-9460
- for k in xrange(10):
+ for k in range(10):
# insert 9 columns in two rows
insert_columns(self, session, normal_key, 9)
insert_columns(self, session, reversed_key, 9)
# Delete 3 first columns (and 3 last columns, for the reversed version) with a different node dead each time
- for node, column_number_to_delete in zip(range(1, 4), range(3)):
+ for node, column_number_to_delete in zip(list(range(1, 4)), list(range(3))):
self.stop_node(node)
self.delete(node, normal_key, column_number_to_delete)
self.delete(node, reversed_key, 8 - column_number_to_delete)
@@ -1218,8 +1213,8 @@ class TestConsistency(Tester):
assert_length_equal(res, 3)
# value 0, 1 and 2 have been deleted
- for i in xrange(1, 4):
- self.assertEqual('value{}'.format(i + 2), res[i - 1][1])
+ for i in range(1, 4):
+ assert 'value{}'.format(i + 2) == res[i - 1][1]
# Query 3 firsts columns in reverse order
session = self.patient_cql_connection(node1, 'ks')
@@ -1231,12 +1226,12 @@ class TestConsistency(Tester):
assert_length_equal(res, 3)
# value 6, 7 and 8 have been deleted
- for i in xrange(0, 3):
- self.assertEqual('value{}'.format(5 - i), res[i][1])
+ for i in range(0, 3):
+ assert 'value{}'.format(5 - i) == res[i][1]
session.execute('TRUNCATE cf')
- def short_read_delete_test(self):
+ def test_short_read_delete(self):
""" Test short reads ultimately leaving no columns alive [#4000] """
cluster = self.cluster
@@ -1269,7 +1264,7 @@ class TestConsistency(Tester):
assert_none(session, "SELECT c, v FROM cf WHERE key=\'k0\' LIMIT 1", cl=ConsistencyLevel.QUORUM)
- def short_read_quorum_delete_test(self):
+ def test_short_read_quorum_delete(self):
"""
@jira_ticket CASSANDRA-8933
"""
@@ -1311,11 +1306,11 @@ class TestConsistency(Tester):
node3.stop(wait_other_notice=True)
assert_none(session, "SELECT * FROM t WHERE id = 0 LIMIT 1", cl=ConsistencyLevel.QUORUM)
- def readrepair_test(self):
+ def test_readrepair(self):
cluster = self.cluster
cluster.set_configuration_options(values={'hinted_handoff_enabled': False})
- if DISABLE_VNODES:
+ if not self.dtest_config.use_vnodes:
cluster.populate(2).start()
else:
tokens = cluster.balanced_tokens(2)
@@ -1333,43 +1328,43 @@ class TestConsistency(Tester):
node2.start(wait_for_binary_proto=True, wait_other_notice=True)
# query everything to cause RR
- for n in xrange(0, 10000):
+ for n in range(0, 10000):
query_c1c2(session, n, ConsistencyLevel.QUORUM)
node1.stop(wait_other_notice=True)
# Check node2 for all the keys that should have been repaired
session = self.patient_cql_connection(node2, keyspace='ks')
- for n in xrange(0, 10000):
+ for n in range(0, 10000):
query_c1c2(session, n, ConsistencyLevel.ONE)
- def quorum_available_during_failure_test(self):
- CL = ConsistencyLevel.QUORUM
- RF = 3
+ def test_quorum_available_during_failure(self):
+ cl = ConsistencyLevel.QUORUM
+ rf = 3
- debug("Creating a ring")
+ logger.debug("Creating a ring")
cluster = self.cluster
- if DISABLE_VNODES:
+ if not self.dtest_config.use_vnodes:
cluster.populate(3).start()
else:
tokens = cluster.balanced_tokens(3)
cluster.populate(3, tokens=tokens).start()
node1, node2, node3 = cluster.nodelist()
- debug("Set to talk to node 2")
+ logger.debug("Set to talk to node 2")
session = self.patient_cql_connection(node2)
- create_ks(session, 'ks', RF)
+ create_ks(session, 'ks', rf)
create_c1c2_table(self, session)
- debug("Generating some data")
- insert_c1c2(session, n=100, consistency=CL)
+ logger.debug("Generating some data")
+ insert_c1c2(session, n=100, consistency=cl)
- debug("Taking down node1")
+ logger.debug("Taking down node1")
node1.stop(wait_other_notice=True)
- debug("Reading back data.")
- for n in xrange(100):
- query_c1c2(session, n, CL)
+ logger.debug("Reading back data.")
+ for n in range(100):
+ query_c1c2(session, n, cl)
def stop_node(self, node_number):
to_stop = self.cluster.nodes["node%d" % node_number]
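
A note on the / to // changes in _required_nodes above: python3's / always returns
a float, so floor division is needed to preserve the python2 integer quorum math.
A quick worked example:

    rf_factors = [3, 3]                    # two datacenters, RF=3 each
    quorum = sum(rf_factors) // 2 + 1      # 6 // 2 + 1 == 4
    local_quorum = rf_factors[0] // 2 + 1  # 3 // 2 + 1 == 2
    assert (quorum, local_quorum) == (4, 2)
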
http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/49b2dda4/consistent_bootstrap_test.py
----------------------------------------------------------------------
diff --git a/consistent_bootstrap_test.py b/consistent_bootstrap_test.py
index ada9b39..24626f3 100644
--- a/consistent_bootstrap_test.py
+++ b/consistent_bootstrap_test.py
@@ -1,92 +1,100 @@
+import pytest
+import logging
+
from cassandra import ConsistencyLevel
-from dtest import Tester, debug, create_ks
+from dtest import Tester, create_ks
from tools.data import create_c1c2_table, insert_c1c2, query_c1c2
-from tools.decorators import no_vnodes
from tools.misc import new_node
+logger = logging.getLogger(__name__)
+
class TestBootstrapConsistency(Tester):
- @no_vnodes()
- def consistent_reads_after_move_test(self):
- debug("Creating a ring")
+ @pytest.mark.no_vnodes
+ def test_consistent_reads_after_move(self):
+ logger.debug("Creating a ring")
cluster = self.cluster
- cluster.set_configuration_options(values={'hinted_handoff_enabled': False, 'write_request_timeout_in_ms': 60000,
- 'read_request_timeout_in_ms': 60000, 'dynamic_snitch_badness_threshold': 0.0})
+ cluster.set_configuration_options(values={'hinted_handoff_enabled': False,
+ 'write_request_timeout_in_ms': 60000,
+ 'read_request_timeout_in_ms': 60000,
+ 'dynamic_snitch_badness_threshold': 0.0})
cluster.set_batch_commitlog(enabled=True)
cluster.populate(3, tokens=[0, 2**48, 2**62]).start()
node1, node2, node3 = cluster.nodelist()
- debug("Set to talk to node 2")
+ logger.debug("Set to talk to node 2")
n2session = self.patient_cql_connection(node2)
create_ks(n2session, 'ks', 2)
create_c1c2_table(self, n2session)
- debug("Generating some data for all nodes")
- insert_c1c2(n2session, keys=range(10, 20), consistency=ConsistencyLevel.ALL)
+ logger.debug("Generating some data for all nodes")
+ insert_c1c2(n2session, keys=list(range(10, 20)), consistency=ConsistencyLevel.ALL)
node1.flush()
- debug("Taking down node1")
+ logger.debug("Taking down node1")
node1.stop(wait_other_notice=True)
- debug("Writing data to node2")
- insert_c1c2(n2session, keys=range(30, 1000), consistency=ConsistencyLevel.ONE)
+ logger.debug("Writing data to node2")
+ insert_c1c2(n2session, keys=list(range(30, 1000)), consistency=ConsistencyLevel.ONE)
node2.flush()
- debug("Restart node1")
+ logger.debug("Restart node1")
node1.start(wait_other_notice=True)
- debug("Move token on node3")
+ logger.debug("Move token on node3")
node3.move(2)
- debug("Checking that no data was lost")
- for n in xrange(10, 20):
+ logger.debug("Checking that no data was lost")
+ for n in range(10, 20):
query_c1c2(n2session, n, ConsistencyLevel.ALL)
- for n in xrange(30, 1000):
+ for n in range(30, 1000):
query_c1c2(n2session, n, ConsistencyLevel.ALL)
- def consistent_reads_after_bootstrap_test(self):
- debug("Creating a ring")
+ def test_consistent_reads_after_bootstrap(self):
+ logger.debug("Creating a ring")
cluster = self.cluster
- cluster.set_configuration_options(values={'hinted_handoff_enabled': False, 'write_request_timeout_in_ms': 60000,
- 'read_request_timeout_in_ms': 60000, 'dynamic_snitch_badness_threshold': 0.0})
+ cluster.set_configuration_options(values={'hinted_handoff_enabled': False,
+ 'write_request_timeout_in_ms': 60000,
+ 'read_request_timeout_in_ms': 60000,
+ 'dynamic_snitch_badness_threshold': 0.0})
cluster.set_batch_commitlog(enabled=True)
cluster.populate(2)
node1, node2 = cluster.nodelist()
cluster.start(wait_for_binary_proto=True, wait_other_notice=True)
- debug("Set to talk to node 2")
+ logger.debug("Set to talk to node 2")
n2session = self.patient_cql_connection(node2)
create_ks(n2session, 'ks', 2)
create_c1c2_table(self, n2session)
- debug("Generating some data for all nodes")
- insert_c1c2(n2session, keys=range(10, 20), consistency=ConsistencyLevel.ALL)
+ logger.debug("Generating some data for all nodes")
+ insert_c1c2(n2session, keys=list(range(10, 20)), consistency=ConsistencyLevel.ALL)
node1.flush()
- debug("Taking down node1")
+ logger.debug("Taking down node1")
node1.stop(wait_other_notice=True)
- debug("Writing data to only node2")
- insert_c1c2(n2session, keys=range(30, 1000), consistency=ConsistencyLevel.ONE)
+ logger.debug("Writing data to only node2")
+ insert_c1c2(n2session, keys=list(range(30, 1000)), consistency=ConsistencyLevel.ONE)
node2.flush()
- debug("Restart node1")
+ logger.debug("Restart node1")
node1.start(wait_other_notice=True)
- debug("Bootstraping node3")
+        logger.debug("Bootstrapping node3")
node3 = new_node(cluster)
node3.start(wait_for_binary_proto=True)
n3session = self.patient_cql_connection(node3)
n3session.execute("USE ks")
- debug("Checking that no data was lost")
- for n in xrange(10, 20):
+ logger.debug("Checking that no data was lost")
+ for n in range(10, 20):
query_c1c2(n3session, n, ConsistencyLevel.ALL)
- for n in xrange(30, 1000):
+ for n in range(30, 1000):
query_c1c2(n3session, n, ConsistencyLevel.ALL)
http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/49b2dda4/counter_test.py
----------------------------------------------------------------------
diff --git a/counter_test.py b/counter_test.py
new file mode 100644
index 0000000..ae87d95
--- /dev/null
+++ b/counter_test.py
@@ -0,0 +1,417 @@
+import random
+import time
+import uuid
+import pytest
+import logging
+
+from cassandra import ConsistencyLevel
+from cassandra.query import SimpleStatement
+
+from tools.assertions import assert_invalid, assert_length_equal, assert_one
+from dtest import Tester, create_ks, create_cf
+from tools.data import rows_to_list
+
+since = pytest.mark.since
+logger = logging.getLogger(__name__)
+
+
+class TestCounters(Tester):
+
+ @since('3.0', max_version='3.12')
+ def test_13691(self):
+ """
+ 2.0 -> 2.1 -> 3.0 counters upgrade test
+ @jira_ticket CASSANDRA-13691
+ """
+ cluster = self.cluster
+ default_install_dir = cluster.get_install_dir()
+
+ #
+ # set up a 2.0 cluster with 3 nodes and set up schema
+ #
+
+ cluster.set_install_dir(version='2.0.17')
+ cluster.populate(3)
+ cluster.start()
+
+ node1, node2, node3 = cluster.nodelist()
+
+ session = self.patient_cql_connection(node1)
+ session.execute("""
+ CREATE KEYSPACE test
+ WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 3};
+ """)
+ session.execute("CREATE TABLE test.test (id int PRIMARY KEY, c counter);")
+
+ #
+ # generate some 2.0 counter columns with local shards
+ #
+
+ query = "UPDATE test.test SET c = c + 1 WHERE id = ?"
+ prepared = session.prepare(query)
+ for i in range(0, 1000):
+ session.execute(prepared, [i])
+
+ cluster.flush()
+ cluster.stop()
+
+ #
+ # upgrade cluster to 2.1
+ #
+
+ cluster.set_install_dir(version='2.1.17')
+ cluster.start()
+ cluster.nodetool("upgradesstables")
+
+ #
+ # upgrade node3 to current (3.0.x or 3.11.x)
+ #
+
+ node3.stop(wait_other_notice=True)
+ node3.set_install_dir(install_dir=default_install_dir)
+ node3.start(wait_other_notice=True)
+
+ #
+ # with a 2.1 coordinator, try to read the table with CL.ALL
+ #
+
+ session = self.patient_cql_connection(node1, consistency_level=ConsistencyLevel.ALL)
+ assert_one(session, "SELECT COUNT(*) FROM test.test", [1000])
+
+ @pytest.mark.vnodes
+ def test_counter_leader_with_partial_view(self):
+ """
+ Test leader election with a starting node.
+
+ Testing that nodes do not elect as mutation leader a node with a partial view on the cluster.
+ Note that byteman rules can be syntax checked via the following command:
+ sh ./bin/bytemancheck.sh -cp ~/path_to/apache-cassandra-3.0.14-SNAPSHOT.jar ~/path_to/rule.btm
+
+ @jira_ticket CASSANDRA-13043
+ """
+ cluster = self.cluster
+
+ cluster.populate(3, use_vnodes=True, install_byteman=True)
+ nodes = cluster.nodelist()
+ # Have node 1 and 3 cheat a bit during the leader election for a counter mutation; note that cheating
+ # takes place iff there is an actual chance for node 2 to be picked.
+ if cluster.version() < '4.0':
+ nodes[0].update_startup_byteman_script('./byteman/pre4.0/election_counter_leader_favor_node2.btm')
+ nodes[2].update_startup_byteman_script('./byteman/pre4.0/election_counter_leader_favor_node2.btm')
+ else:
+ nodes[0].update_startup_byteman_script('./byteman/4.0/election_counter_leader_favor_node2.btm')
+ nodes[2].update_startup_byteman_script('./byteman/4.0/election_counter_leader_favor_node2.btm')
+
+ cluster.start(wait_for_binary_proto=True)
+ session = self.patient_cql_connection(nodes[0])
+ create_ks(session, 'ks', 3)
+ create_cf(session, 'cf', validation="CounterColumnType", columns={'c': 'counter'})
+
+        # Now stop the node and restart it, but first install a rule to slow down how fast node 2
+        # will update its list of nodes that are alive
+ nodes[1].stop(wait=True, wait_other_notice=False)
+ nodes[1].update_startup_byteman_script('./byteman/gossip_alive_callback_sleep.btm')
+ nodes[1].start(no_wait=True, wait_other_notice=False)
+
+        # Until node 2 is fully alive, try to force the other nodes to pick it as mutation leader.
+        # If CASSANDRA-13043 is fixed, they will not. Otherwise they will, but since we are slowing
+        # down how fast node 2 updates its list of alive nodes, it will only have a partial view of
+        # the cluster and thus will raise an 'UnavailableException'.
+ nb_attempts = 50000
+ for i in range(0, nb_attempts):
+ # Change the name of the counter for the sake of randomization
+ q = SimpleStatement(
+ query_string="UPDATE ks.cf SET c = c + 1 WHERE key = 'counter_%d'" % i,
+ consistency_level=ConsistencyLevel.QUORUM
+ )
+ session.execute(q)
+
+ def test_simple_increment(self):
+        """ Simple incrementation test (created for #3465, which wasn't a bug) """
+ cluster = self.cluster
+
+ cluster.populate(3).start()
+ nodes = cluster.nodelist()
+
+ session = self.patient_cql_connection(nodes[0])
+ create_ks(session, 'ks', 3)
+ create_cf(session, 'cf', validation="CounterColumnType", columns={'c': 'counter'})
+
+ sessions = [self.patient_cql_connection(node, 'ks') for node in nodes]
+ nb_increment = 50
+ nb_counter = 10
+
+ for i in range(0, nb_increment):
+ for c in range(0, nb_counter):
+ session = sessions[(i + c) % len(nodes)]
+ query = SimpleStatement("UPDATE cf SET c = c + 1 WHERE key = 'counter%i'" % c, consistency_level=ConsistencyLevel.QUORUM)
+ session.execute(query)
+
+ session = sessions[i % len(nodes)]
+ keys = ",".join(["'counter%i'" % c for c in range(0, nb_counter)])
+ query = SimpleStatement("SELECT key, c FROM cf WHERE key IN (%s)" % keys, consistency_level=ConsistencyLevel.QUORUM)
+ res = list(session.execute(query))
+
+ assert_length_equal(res, nb_counter)
+ for c in range(0, nb_counter):
+ assert len(res[c]) == 2, "Expecting key and counter for counter {}, got {}".format(c, str(res[c]))
+            assert res[c][1] == i + 1, "Expecting counter {} = {}, got {}".format(c, i + 1, res[c][1])
+
+ def test_upgrade(self):
+ """ Test for bug of #4436 """
+ cluster = self.cluster
+
+ cluster.populate(2).start()
+ nodes = cluster.nodelist()
+
+ session = self.patient_cql_connection(nodes[0])
+ create_ks(session, 'ks', 2)
+
+ query = """
+ CREATE TABLE counterTable (
+ k int PRIMARY KEY,
+ c counter
+ )
+ """
+ query = query + "WITH compression = { 'sstable_compression' : 'SnappyCompressor' }"
+
+ session.execute(query)
+ time.sleep(2)
+
+ keys = list(range(0, 4))
+ updates = 50
+
+ def make_updates():
+ session = self.patient_cql_connection(nodes[0], keyspace='ks')
+ upd = "UPDATE counterTable SET c = c + 1 WHERE k = %d;"
+ batch = " ".join(["BEGIN COUNTER BATCH"] + [upd % x for x in keys] + ["APPLY BATCH;"])
+
+ for i in range(0, updates):
+ query = SimpleStatement(batch, consistency_level=ConsistencyLevel.QUORUM)
+ session.execute(query)
+
+ def check(i):
+ session = self.patient_cql_connection(nodes[0], keyspace='ks')
+ query = SimpleStatement("SELECT * FROM counterTable", consistency_level=ConsistencyLevel.QUORUM)
+ rows = list(session.execute(query))
+
+ assert len(rows) == len(keys), "Expected {} rows, got {}: {}".format(len(keys), len(rows), str(rows))
+ for row in rows:
+                assert row[1] == i * updates, "Unexpected value {}".format(str(row))
+
+ def rolling_restart():
+ # Rolling restart
+ for i in range(0, 2):
+ time.sleep(.2)
+ nodes[i].nodetool("drain")
+ nodes[i].stop(wait_other_notice=False)
+ nodes[i].start(wait_other_notice=True, wait_for_binary_proto=True)
+ time.sleep(.2)
+
+ make_updates()
+ check(1)
+ rolling_restart()
+
+ make_updates()
+ check(2)
+ rolling_restart()
+
+ make_updates()
+ check(3)
+ rolling_restart()
+
+ check(3)
+
+ def test_counter_consistency(self):
+ """
+ Do a bunch of writes with ONE, read back with ALL and check results.
+ """
+ cluster = self.cluster
+ cluster.populate(3).start()
+ node1, node2, node3 = cluster.nodelist()
+ session = self.patient_cql_connection(node1)
+ create_ks(session, 'counter_tests', 3)
+
+ stmt = """
+ CREATE TABLE counter_table (
+ id uuid PRIMARY KEY,
+ counter_one COUNTER,
+ counter_two COUNTER,
+ )
+ """
+ session.execute(stmt)
+
+ counters = []
+        # establish 50 counters (2 per row x 25 rows)
+ for i in range(25):
+ _id = str(uuid.uuid4())
+ counters.append(
+ {_id: {'counter_one': 1, 'counter_two': 1}}
+ )
+
+ query = SimpleStatement("""
+ UPDATE counter_table
+ SET counter_one = counter_one + 1, counter_two = counter_two + 1
+ where id = {uuid}""".format(uuid=_id), consistency_level=ConsistencyLevel.ONE)
+ session.execute(query)
+
+ # increment a bunch of counters with CL.ONE
+ for i in range(10000):
+ counter = counters[random.randint(0, len(counters) - 1)]
+ counter_id = list(counter.keys())[0]
+
+ query = SimpleStatement("""
+ UPDATE counter_table
+ SET counter_one = counter_one + 2
+ where id = {uuid}""".format(uuid=counter_id), consistency_level=ConsistencyLevel.ONE)
+ session.execute(query)
+
+ query = SimpleStatement("""
+ UPDATE counter_table
+ SET counter_two = counter_two + 10
+ where id = {uuid}""".format(uuid=counter_id), consistency_level=ConsistencyLevel.ONE)
+ session.execute(query)
+
+ query = SimpleStatement("""
+ UPDATE counter_table
+ SET counter_one = counter_one - 1
+ where id = {uuid}""".format(uuid=counter_id), consistency_level=ConsistencyLevel.ONE)
+ session.execute(query)
+
+ query = SimpleStatement("""
+ UPDATE counter_table
+ SET counter_two = counter_two - 5
+ where id = {uuid}""".format(uuid=counter_id), consistency_level=ConsistencyLevel.ONE)
+ session.execute(query)
+
+ # update expectations to match (assumed) db state
+ counter[counter_id]['counter_one'] += 1
+ counter[counter_id]['counter_two'] += 5
+
+ # let's verify the counts are correct, using CL.ALL
+ for counter_dict in counters:
+ counter_id = list(counter_dict.keys())[0]
+
+ query = SimpleStatement("""
+ SELECT counter_one, counter_two
+ FROM counter_table WHERE id = {uuid}
+ """.format(uuid=counter_id), consistency_level=ConsistencyLevel.ALL)
+ rows = list(session.execute(query))
+
+ counter_one_actual, counter_two_actual = rows[0]
+
+ assert counter_one_actual == counter_dict[counter_id]['counter_one']
+ assert counter_two_actual == counter_dict[counter_id]['counter_two']
+
+ def test_multi_counter_update(self):
+ """
+        Test for single update statements that affect multiple counters.
+ """
+ cluster = self.cluster
+ cluster.populate(3).start()
+ node1, node2, node3 = cluster.nodelist()
+ session = self.patient_cql_connection(node1)
+ create_ks(session, 'counter_tests', 3)
+
+ session.execute("""
+ CREATE TABLE counter_table (
+ id text,
+ myuuid uuid,
+ counter_one COUNTER,
+ PRIMARY KEY (id, myuuid))
+ """)
+
+ expected_counts = {}
+
+ # set up expectations
+ for i in range(1, 6):
+ _id = uuid.uuid4()
+
+ expected_counts[_id] = i
+
+ for k, v in list(expected_counts.items()):
+ session.execute("""
+ UPDATE counter_table set counter_one = counter_one + {v}
+ WHERE id='foo' and myuuid = {k}
+ """.format(k=k, v=v))
+
+ for k, v in list(expected_counts.items()):
+ count = list(session.execute("""
+ SELECT counter_one FROM counter_table
+ WHERE id = 'foo' and myuuid = {k}
+ """.format(k=k)))
+
+ assert v == count[0][0]
+
+ @since("2.0", max_version="3.X")
+ def test_validate_empty_column_name(self):
+ cluster = self.cluster
+ cluster.populate(1).start()
+ node1 = cluster.nodelist()[0]
+ session = self.patient_cql_connection(node1)
+ create_ks(session, 'counter_tests', 1)
+
+ session.execute("""
+ CREATE TABLE compact_counter_table (
+ pk int,
+ ck text,
+ value counter,
+ PRIMARY KEY (pk, ck))
+ WITH COMPACT STORAGE
+ """)
+
+ assert_invalid(session, "UPDATE compact_counter_table SET value = value + 1 WHERE pk = 0 AND ck = ''")
+ assert_invalid(session, "UPDATE compact_counter_table SET value = value - 1 WHERE pk = 0 AND ck = ''")
+
+ session.execute("UPDATE compact_counter_table SET value = value + 5 WHERE pk = 0 AND ck = 'ck'")
+ session.execute("UPDATE compact_counter_table SET value = value - 2 WHERE pk = 0 AND ck = 'ck'")
+
+ assert_one(session, "SELECT pk, ck, value FROM compact_counter_table", [0, 'ck', 3])
+
+ @since('2.0')
+ def test_drop_counter_column(self):
+ """Test for CASSANDRA-7831"""
+ cluster = self.cluster
+ cluster.populate(1).start()
+ node1, = cluster.nodelist()
+ session = self.patient_cql_connection(node1)
+ create_ks(session, 'counter_tests', 1)
+
+ session.execute("CREATE TABLE counter_bug (t int, c counter, primary key(t))")
+
+ session.execute("UPDATE counter_bug SET c = c + 1 where t = 1")
+ row = list(session.execute("SELECT * from counter_bug"))
+
+ assert rows_to_list(row)[0] == [1, 1]
+ assert len(row) == 1
+
+ session.execute("ALTER TABLE counter_bug drop c")
+
+ assert_invalid(session, "ALTER TABLE counter_bug add c counter", "Cannot re-add previously dropped counter column c")
+
+ @since("2.0", max_version="3.X") # Compact Storage
+ def test_compact_counter_cluster(self):
+ """
+ @jira_ticket CASSANDRA-12219
+ This test will fail on 3.0.0 - 3.0.8, and 3.1 - 3.8
+ """
+ cluster = self.cluster
+ cluster.populate(3).start()
+ node1 = cluster.nodelist()[0]
+ session = self.patient_cql_connection(node1)
+ create_ks(session, 'counter_tests', 1)
+
+ session.execute("""
+ CREATE TABLE IF NOT EXISTS counter_cs (
+ key bigint PRIMARY KEY,
+ data counter
+ ) WITH COMPACT STORAGE
+ """)
+
+ for outer in range(0, 5):
+ for idx in range(0, 5):
+ session.execute("UPDATE counter_cs SET data = data + 1 WHERE key = {k}".format(k=idx))
+
+ for idx in range(0, 5):
+ row = list(session.execute("SELECT data from counter_cs where key = {k}".format(k=idx)))
+ assert rows_to_list(row)[0][0] == 5
http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/49b2dda4/counter_tests.py
----------------------------------------------------------------------
diff --git a/counter_tests.py b/counter_tests.py
deleted file mode 100644
index 1de495d..0000000
--- a/counter_tests.py
+++ /dev/null
@@ -1,414 +0,0 @@
-import random
-import time
-import uuid
-
-from cassandra import ConsistencyLevel
-from cassandra.query import SimpleStatement
-
-from tools.assertions import assert_invalid, assert_length_equal, assert_one
-from dtest import Tester, create_ks, create_cf
-from tools.data import rows_to_list
-from tools.decorators import since
-
-
-class TestCounters(Tester):
-
- @since('3.0', max_version='3.12')
- def test_13691(self):
- """
- 2.0 -> 2.1 -> 3.0 counters upgrade test
- @jira_ticket CASSANDRA-13691
- """
- cluster = self.cluster
- default_install_dir = cluster.get_install_dir()
-
- #
- # set up a 2.0 cluster with 3 nodes and set up schema
- #
-
- cluster.set_install_dir(version='2.0.17')
- cluster.populate(3)
- cluster.start()
-
- node1, node2, node3 = cluster.nodelist()
-
- session = self.patient_cql_connection(node1)
- session.execute("""
- CREATE KEYSPACE test
- WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 3};
- """)
- session.execute("CREATE TABLE test.test (id int PRIMARY KEY, c counter);")
-
- #
- # generate some 2.0 counter columns with local shards
- #
-
- query = "UPDATE test.test SET c = c + 1 WHERE id = ?"
- prepared = session.prepare(query)
- for i in range(0, 1000):
- session.execute(prepared, [i])
-
- cluster.flush()
- cluster.stop()
-
- #
- # upgrade cluster to 2.1
- #
-
- cluster.set_install_dir(version='2.1.17')
- cluster.start()
- cluster.nodetool("upgradesstables")
-
- #
- # upgrade node3 to current (3.0.x or 3.11.x)
- #
-
- node3.stop(wait_other_notice=True)
- node3.set_install_dir(install_dir=default_install_dir)
- node3.start(wait_other_notice=True)
-
- #
- # with a 2.1 coordinator, try to read the table with CL.ALL
- #
-
- session = self.patient_cql_connection(node1, consistency_level=ConsistencyLevel.ALL)
- assert_one(session, "SELECT COUNT(*) FROM test.test", [1000])
-
- def counter_leader_with_partial_view_test(self):
- """
- Test leader election with a starting node.
-
- Testing that nodes do not elect as mutation leader a node with a partial view on the cluster.
- Note that byteman rules can be syntax checked via the following command:
- sh ./bin/bytemancheck.sh -cp ~/path_to/apache-cassandra-3.0.14-SNAPSHOT.jar ~/path_to/rule.btm
-
- @jira_ticket CASSANDRA-13043
- """
- cluster = self.cluster
-
- cluster.populate(3, use_vnodes=True, install_byteman=True)
- nodes = cluster.nodelist()
- # Have node 1 and 3 cheat a bit during the leader election for a counter mutation; note that cheating
- # takes place iff there is an actual chance for node 2 to be picked.
- if cluster.version() < '4.0':
- nodes[0].update_startup_byteman_script('./byteman/pre4.0/election_counter_leader_favor_node2.btm')
- nodes[2].update_startup_byteman_script('./byteman/pre4.0/election_counter_leader_favor_node2.btm')
- else:
- nodes[0].update_startup_byteman_script('./byteman/4.0/election_counter_leader_favor_node2.btm')
- nodes[2].update_startup_byteman_script('./byteman/4.0/election_counter_leader_favor_node2.btm')
-
- cluster.start(wait_for_binary_proto=True)
- session = self.patient_cql_connection(nodes[0])
- create_ks(session, 'ks', 3)
- create_cf(session, 'cf', validation="CounterColumnType", columns={'c': 'counter'})
-
- # Now stop the node and restart but first install a rule to slow down how fast node 2 will update the list
- # nodes that are alive
- nodes[1].stop(wait=True, wait_other_notice=False)
- nodes[1].update_startup_byteman_script('./byteman/gossip_alive_callback_sleep.btm')
- nodes[1].start(no_wait=True, wait_other_notice=False)
-
- # Until node 2 is fully alive try to force other nodes to pick him as mutation leader.
- # If CASSANDRA-13043 is fixed, they will not. Otherwise they will do, but since we are slowing down how
- # fast node 2 updates the list of nodes that are alive, it will just have a partial view on the cluster
- # and thus will raise an 'UnavailableException' exception.
- nb_attempts = 50000
- for i in xrange(0, nb_attempts):
- # Change the name of the counter for the sake of randomization
- q = SimpleStatement(
- query_string="UPDATE ks.cf SET c = c + 1 WHERE key = 'counter_%d'" % i,
- consistency_level=ConsistencyLevel.QUORUM
- )
- session.execute(q)
-
- def simple_increment_test(self):
- """ Simple incrementation test (Created for #3465, that wasn't a bug) """
- cluster = self.cluster
-
- cluster.populate(3).start()
- nodes = cluster.nodelist()
-
- session = self.patient_cql_connection(nodes[0])
- create_ks(session, 'ks', 3)
- create_cf(session, 'cf', validation="CounterColumnType", columns={'c': 'counter'})
-
- sessions = [self.patient_cql_connection(node, 'ks') for node in nodes]
- nb_increment = 50
- nb_counter = 10
-
- for i in xrange(0, nb_increment):
- for c in xrange(0, nb_counter):
- session = sessions[(i + c) % len(nodes)]
- query = SimpleStatement("UPDATE cf SET c = c + 1 WHERE key = 'counter%i'" % c, consistency_level=ConsistencyLevel.QUORUM)
- session.execute(query)
-
- session = sessions[i % len(nodes)]
- keys = ",".join(["'counter%i'" % c for c in xrange(0, nb_counter)])
- query = SimpleStatement("SELECT key, c FROM cf WHERE key IN (%s)" % keys, consistency_level=ConsistencyLevel.QUORUM)
- res = list(session.execute(query))
-
- assert_length_equal(res, nb_counter)
- for c in xrange(0, nb_counter):
- self.assertEqual(len(res[c]), 2, "Expecting key and counter for counter {}, got {}".format(c, str(res[c])))
- self.assertEqual(res[c][1], i + 1, "Expecting counter {} = {}, got {}".format(c, i + 1, res[c][0]))
-
- def upgrade_test(self):
- """ Test for bug of #4436 """
-
- cluster = self.cluster
-
- cluster.populate(2).start()
- nodes = cluster.nodelist()
-
- session = self.patient_cql_connection(nodes[0])
- create_ks(session, 'ks', 2)
-
- query = """
- CREATE TABLE counterTable (
- k int PRIMARY KEY,
- c counter
- )
- """
- query = query + "WITH compression = { 'sstable_compression' : 'SnappyCompressor' }"
-
- session.execute(query)
- time.sleep(2)
-
- keys = range(0, 4)
- updates = 50
-
- def make_updates():
- session = self.patient_cql_connection(nodes[0], keyspace='ks')
- upd = "UPDATE counterTable SET c = c + 1 WHERE k = %d;"
- batch = " ".join(["BEGIN COUNTER BATCH"] + [upd % x for x in keys] + ["APPLY BATCH;"])
-
- for i in range(0, updates):
- query = SimpleStatement(batch, consistency_level=ConsistencyLevel.QUORUM)
- session.execute(query)
-
- def check(i):
- session = self.patient_cql_connection(nodes[0], keyspace='ks')
- query = SimpleStatement("SELECT * FROM counterTable", consistency_level=ConsistencyLevel.QUORUM)
- rows = list(session.execute(query))
-
- self.assertEqual(len(rows), len(keys), "Expected {} rows, got {}: {}".format(len(keys), len(rows), str(rows)))
- for row in rows:
- self.assertEqual(row[1], i * updates, "Unexpected value {}".format(str(row)))
-
- def rolling_restart():
- # Rolling restart
- for i in range(0, 2):
- time.sleep(.2)
- nodes[i].nodetool("drain")
- nodes[i].stop(wait_other_notice=False)
- nodes[i].start(wait_other_notice=True, wait_for_binary_proto=True)
- time.sleep(.2)
-
- make_updates()
- check(1)
- rolling_restart()
-
- make_updates()
- check(2)
- rolling_restart()
-
- make_updates()
- check(3)
- rolling_restart()
-
- check(3)
-
- def counter_consistency_test(self):
- """
- Do a bunch of writes with ONE, read back with ALL and check results.
- """
- cluster = self.cluster
- cluster.populate(3).start()
- node1, node2, node3 = cluster.nodelist()
- session = self.patient_cql_connection(node1)
- create_ks(session, 'counter_tests', 3)
-
- stmt = """
- CREATE TABLE counter_table (
- id uuid PRIMARY KEY,
- counter_one COUNTER,
- counter_two COUNTER,
- )
- """
- session.execute(stmt)
-
- counters = []
- # establish 50 counters (2x25 rows)
- for i in xrange(25):
- _id = str(uuid.uuid4())
- counters.append(
- {_id: {'counter_one': 1, 'counter_two': 1}}
- )
-
- query = SimpleStatement("""
- UPDATE counter_table
- SET counter_one = counter_one + 1, counter_two = counter_two + 1
- where id = {uuid}""".format(uuid=_id), consistency_level=ConsistencyLevel.ONE)
- session.execute(query)
-
- # increment a bunch of counters with CL.ONE
- for i in xrange(10000):
- counter = counters[random.randint(0, len(counters) - 1)]
- counter_id = counter.keys()[0]
-
- query = SimpleStatement("""
- UPDATE counter_table
- SET counter_one = counter_one + 2
- where id = {uuid}""".format(uuid=counter_id), consistency_level=ConsistencyLevel.ONE)
- session.execute(query)
-
- query = SimpleStatement("""
- UPDATE counter_table
- SET counter_two = counter_two + 10
- where id = {uuid}""".format(uuid=counter_id), consistency_level=ConsistencyLevel.ONE)
- session.execute(query)
-
- query = SimpleStatement("""
- UPDATE counter_table
- SET counter_one = counter_one - 1
- where id = {uuid}""".format(uuid=counter_id), consistency_level=ConsistencyLevel.ONE)
- session.execute(query)
-
- query = SimpleStatement("""
- UPDATE counter_table
- SET counter_two = counter_two - 5
- where id = {uuid}""".format(uuid=counter_id), consistency_level=ConsistencyLevel.ONE)
- session.execute(query)
-
- # update expectations to match (assumed) db state
- counter[counter_id]['counter_one'] += 1
- counter[counter_id]['counter_two'] += 5
-
- # let's verify the counts are correct, using CL.ALL
- for counter_dict in counters:
- counter_id = counter_dict.keys()[0]
-
- query = SimpleStatement("""
- SELECT counter_one, counter_two
- FROM counter_table WHERE id = {uuid}
- """.format(uuid=counter_id), consistency_level=ConsistencyLevel.ALL)
- rows = list(session.execute(query))
-
- counter_one_actual, counter_two_actual = rows[0]
-
- self.assertEqual(counter_one_actual, counter_dict[counter_id]['counter_one'])
- self.assertEqual(counter_two_actual, counter_dict[counter_id]['counter_two'])
-
- def multi_counter_update_test(self):
- """
- Test for singular update statements that will affect multiple counters.
- """
- cluster = self.cluster
- cluster.populate(3).start()
- node1, node2, node3 = cluster.nodelist()
- session = self.patient_cql_connection(node1)
- create_ks(session, 'counter_tests', 3)
-
- session.execute("""
- CREATE TABLE counter_table (
- id text,
- myuuid uuid,
- counter_one COUNTER,
- PRIMARY KEY (id, myuuid))
- """)
-
- expected_counts = {}
-
- # set up expectations
- for i in range(1, 6):
- _id = uuid.uuid4()
-
- expected_counts[_id] = i
-
- for k, v in expected_counts.items():
- session.execute("""
- UPDATE counter_table set counter_one = counter_one + {v}
- WHERE id='foo' and myuuid = {k}
- """.format(k=k, v=v))
-
- for k, v in expected_counts.items():
- count = list(session.execute("""
- SELECT counter_one FROM counter_table
- WHERE id = 'foo' and myuuid = {k}
- """.format(k=k)))
-
- self.assertEqual(v, count[0][0])
-
- @since("2.0", max_version="3.X")
- def validate_empty_column_name_test(self):
- cluster = self.cluster
- cluster.populate(1).start()
- node1 = cluster.nodelist()[0]
- session = self.patient_cql_connection(node1)
- create_ks(session, 'counter_tests', 1)
-
- session.execute("""
- CREATE TABLE compact_counter_table (
- pk int,
- ck text,
- value counter,
- PRIMARY KEY (pk, ck))
- WITH COMPACT STORAGE
- """)
-
- assert_invalid(session, "UPDATE compact_counter_table SET value = value + 1 WHERE pk = 0 AND ck = ''")
- assert_invalid(session, "UPDATE compact_counter_table SET value = value - 1 WHERE pk = 0 AND ck = ''")
-
- session.execute("UPDATE compact_counter_table SET value = value + 5 WHERE pk = 0 AND ck = 'ck'")
- session.execute("UPDATE compact_counter_table SET value = value - 2 WHERE pk = 0 AND ck = 'ck'")
-
- assert_one(session, "SELECT pk, ck, value FROM compact_counter_table", [0, 'ck', 3])
-
- @since('2.0')
- def drop_counter_column_test(self):
- """Test for CASSANDRA-7831"""
- cluster = self.cluster
- cluster.populate(1).start()
- node1, = cluster.nodelist()
- session = self.patient_cql_connection(node1)
- create_ks(session, 'counter_tests', 1)
-
- session.execute("CREATE TABLE counter_bug (t int, c counter, primary key(t))")
-
- session.execute("UPDATE counter_bug SET c = c + 1 where t = 1")
- row = list(session.execute("SELECT * from counter_bug"))
-
- self.assertEqual(rows_to_list(row)[0], [1, 1])
- self.assertEqual(len(row), 1)
-
- session.execute("ALTER TABLE counter_bug drop c")
-
- assert_invalid(session, "ALTER TABLE counter_bug add c counter", "Cannot re-add previously dropped counter column c")
-
- @since("2.0", max_version="3.X") # Compact Storage
- def compact_counter_cluster_test(self):
- """
- @jira_ticket CASSANDRA-12219
- This test will fail on 3.0.0 - 3.0.8, and 3.1 - 3.8
- """
-
- cluster = self.cluster
- cluster.populate(3).start()
- node1 = cluster.nodelist()[0]
- session = self.patient_cql_connection(node1)
- create_ks(session, 'counter_tests', 1)
-
- session.execute("""
- CREATE TABLE IF NOT EXISTS counter_cs (
- key bigint PRIMARY KEY,
- data counter
- ) WITH COMPACT STORAGE
- """)
-
- for outer in range(0, 5):
- for idx in range(0, 5):
- session.execute("UPDATE counter_cs SET data = data + 1 WHERE key = {k}".format(k=idx))
-
- for idx in range(0, 5):
- row = list(session.execute("SELECT data from counter_cs where key = {k}".format(k=idx)))
- self.assertEqual(rows_to_list(row)[0][0], 5)
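
The deleted module above differs from its replacement mainly in mechanical Python 3 conversions: xrange becomes range, unittest-style self.assertEqual calls become plain assert statements that pytest introspects, and dict views must be materialized before indexing. The last of these is the least obvious; a small illustrative snippet (values are made up):

    # Python 3: dict.keys() returns a view, which cannot be indexed.
    counter = {'some-uuid': {'counter_one': 1, 'counter_two': 1}}

    # Python 2 allowed counter.keys()[0]; the migrated tests materialize
    # the view first:
    counter_id = list(counter.keys())[0]

    # An equivalent idiom when only the first key is needed:
    counter_id_alt = next(iter(counter))

    assert counter_id == counter_id_alt == 'some-uuid'
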
http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/49b2dda4/cql_prepared_test.py
----------------------------------------------------------------------
diff --git a/cql_prepared_test.py b/cql_prepared_test.py
index 0dfe6f0..c039b90 100644
--- a/cql_prepared_test.py
+++ b/cql_prepared_test.py
@@ -1,7 +1,11 @@
import time
+import pytest
+import logging
from dtest import Tester, create_ks
-from tools.decorators import since
+
+since = pytest.mark.since
+logger = logging.getLogger(__name__)
@since("1.2")
@@ -18,7 +22,7 @@ class TestCQL(Tester):
create_ks(session, 'ks', 1)
return session
- def batch_preparation_test(self):
+ def test_batch_preparation(self):
""" Test preparation of batch statement (#4202) """
session = self.prepare()
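
The cql_prepared_test.py hunk shows the decorator change applied across the whole migration: tools.decorators.since is replaced by a pytest marker of the same name, so version gating moves out of the test modules and into collection-time logic in conftest.py. The following is a hedged sketch of how such a marker can be honored, not the actual dtest implementation; the hard-coded version stands in for the one dtest derives from the build under test:

    # conftest.py (sketch)
    import pytest
    from distutils.version import LooseVersion

    # Hypothetical constant; the real conftest computes this from the
    # Cassandra install dir via get_version_from_build.
    CURRENT_VERSION = LooseVersion('3.11')

    def pytest_collection_modifyitems(items):
        # 'since' would also be registered (pytest.ini or pytest_configure)
        # to silence unknown-marker warnings.
        for item in items:
            marker = item.get_closest_marker('since')
            if marker is None:
                continue
            if LooseVersion(marker.args[0]) > CURRENT_VERSION:
                item.add_marker(pytest.mark.skip(
                    reason='requires Cassandra >= %s' % marker.args[0]))
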