You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by aw...@apache.org on 2018/01/29 21:10:35 UTC

[32/36] cassandra-dtest git commit: Migrate dtests to use pytest and python3

http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/49b2dda4/conftest.py
----------------------------------------------------------------------
diff --git a/conftest.py b/conftest.py
new file mode 100644
index 0000000..650e9c8
--- /dev/null
+++ b/conftest.py
@@ -0,0 +1,489 @@
+import pytest
+import logging
+import os
+import shutil
+import time
+import re
+import platform
+import copy
+import inspect
+import subprocess
+
+from dtest import running_in_docker, cleanup_docker_environment_before_test_execution
+
+from datetime import datetime
+from distutils.version import LooseVersion
+from netifaces import AF_INET
+from psutil import virtual_memory
+
+import netifaces as ni
+
+from ccmlib.common import validate_install_dir, get_version_from_build, is_win
+
+from dtest_setup import DTestSetup
+from dtest_setup_overrides import DTestSetupOverrides
+
+logger = logging.getLogger(__name__)
+
+
+class DTestConfig:
+    """Settings for a dtest run: defaults are set in __init__ and setup() overrides them from the pytest options"""
+    def __init__(self):
+        self.use_vnodes = True
+        self.use_off_heap_memtables = False
+        self.num_tokens = self.data_dir_count = -1
+        self.force_execution_of_resource_intensive_tests = self.skip_resource_intensive_tests = False
+        self.cassandra_dir = self.cassandra_version = None
+        self.delete_logs = False
+        self.execute_upgrade_tests = False
+        self.disable_active_log_watching = False
+        self.keep_test_dir = False
+        self.enable_jacoco_code_coverage = False
+        self.jemalloc_path = find_libjemalloc()
+
+    def setup(self, request):
+        """Read the dtest command line options from request.config into this object's attributes"""
+        option = request.config.getoption
+        self.use_vnodes = option("--use-vnodes")
+        self.use_off_heap_memtables = option("--use-off-heap-memtables")
+        self.num_tokens = option("--num-tokens")
+        self.data_dir_count = option("--data-dir-count-per-instance")
+        self.force_execution_of_resource_intensive_tests = option("--force-resource-intensive-tests")
+        self.skip_resource_intensive_tests = option("--skip-resource-intensive-tests")
+        if option("--cassandra-dir") is not None:
+            self.cassandra_dir = os.path.expanduser(option("--cassandra-dir"))
+        self.cassandra_version = option("--cassandra-version")
+        self.delete_logs = option("--delete-logs")
+        self.execute_upgrade_tests = option("--execute-upgrade-tests")
+        self.disable_active_log_watching = option("--disable-active-log-watching")
+        self.keep_test_dir = option("--keep-test-dir")
+        self.enable_jacoco_code_coverage = option("--enable-jacoco-code-coverage")
+
+
+def check_required_loopback_interfaces_available():
+    """
+    We need at least 9 loopback interfaces configured to run almost all dtests. On Linux, loopback
+    interfaces are automatically created as they are used, but on Mac they need to be explicitly
+    created. Check if we're running on Mac (Darwin), and if so check we have at least 9 loopback
+    interfaces available, otherwise bail out so we don't run the tests in a known bad config and
+    give the user some helpful advice on how to get their machine into a good known config
+    """
+    if platform.system() == "Darwin":
+        if len(ni.ifaddresses('lo0').get(AF_INET, [])) < 9:
+            pytest.exit("At least 9 loopback interfaces are required to run dtests. "
+                        "On Mac you can create the required loopback interfaces by running "
+                        "'for i in {1..9}; do sudo ifconfig lo0 alias 127.0.0.$i up; done;'")
+
+
+def pytest_addoption(parser):
+    """Register the dtest specific command line options with pytest's option parser"""
+    parser.addoption("--use-vnodes", action="store_true", default=False,
+                     help="Determines whether or not to setup clusters using vnodes for tests")
+    parser.addoption("--use-off-heap-memtables", action="store_true", default=False,
+                     help="Enable Off Heap Memtables when creating test clusters for tests")
+    parser.addoption("--num-tokens", action="store", default=256, type=int,
+                     help="Number of tokens to set num_tokens yaml setting to when creating instances "
+                          "with vnodes enabled")
+    parser.addoption("--data-dir-count-per-instance", action="store", default=3, type=int,
+                     help="Control the number of data directories to create per instance")
+    parser.addoption("--force-resource-intensive-tests", action="store_true", default=False,
+                     help="Forces the execution of tests marked as resource_intensive")
+    parser.addoption("--skip-resource-intensive-tests", action="store_true", default=False,
+                     help="Skip all tests marked as resource_intensive")
+    parser.addoption("--cassandra-dir", action="store", default=None,
+                     help="The directory containing the built C* artifacts to run the tests against. "
+                          "(e.g. the path to the root of a cloned C* git directory. Before executing dtests using "
+                          "this directory you must build C* with 'ant clean jar'). If you're doing C* development and "
+                          "want to run the tests this is almost always going to be the correct option.")
+    parser.addoption("--cassandra-version", action="store", default=None,
+                     help="A specific C* version to run the dtests against. The dtest framework will "
+                          "pull the required artifacts for this version.")
+    parser.addoption("--delete-logs", action="store_true", default=False,
+                     help="Delete all generated logs created by a test after the completion of a test.")
+    parser.addoption("--execute-upgrade-tests", action="store_true", default=False,
+                     help="Execute Cassandra Upgrade Tests (e.g. tests annotated with the upgrade_test mark)")
+    parser.addoption("--disable-active-log-watching", action="store_true", default=False,
+                     help="Disable ccm active log watching, which will cause dtests to check for errors in the "
+                          "logs in a single operation instead of semi-realtime processing by consuming "
+                          "ccm _log_error_handler callbacks")
+    parser.addoption("--keep-test-dir", action="store_true", default=False,
+                     help="Do not remove/cleanup the test ccm cluster directory and its artifacts after the test completes")
+    parser.addoption("--enable-jacoco-code-coverage", action="store_true", default=False,
+                     help="Enable JaCoCo Code Coverage Support")
+
+
+def sufficient_system_resources_for_resource_intensive_tests():
+    """Return True when the total system memory is at least 27GB (9 instances at 3GB a piece)"""
+    total_mem_gb = virtual_memory().total / 1024 / 1024 / 1024
+    logger.info("total available system memory is %dGB" % total_mem_gb)
+    # todo kjkj: do not hard code our bound.. for now just do 9 instances at 3gb a piece
+    return 9 * 3 <= total_mem_gb
+
+
+@pytest.fixture(scope='function', autouse=True)
+def fixture_dtest_setup_overrides():
+    """
+    No-op default implementation of fixture_dtest_setup_overrides: it returns a fresh
+    DTestSetupOverrides. We run this when a test class hasn't implemented its own
+    fixture_dtest_setup_overrides
+    """
+    return DTestSetupOverrides()
+
+
+"""
+Not exactly sure why, but this fixture needs to be scoped to function level and not
+session or class. If you invoke pytest with tests across multiple test classes, when scoped
+at session, the root logger appears to get reset between each test class invocation.
+This means that the first test to run not from the first test class (and all subsequent
+tests), will have the root logger reset and see a level of NOTSET. Scoping it at the
+function level seems to work, and I guess it's not that much extra overhead to set up the
+logger once per test vs. once per session in the grand scheme of things.
+"""
+@pytest.fixture(scope="function", autouse=True)
+def fixture_logging_setup(request):
+    """
+    Configure the root logger, and the "cassandra" logger, before every test.
+
+    The level and format come from the --log-level / --log-format command line arguments
+    when given, otherwise from the log_level / log_format entries of the loaded ini file.
+
+    Function scoped, as the root logger appears to get reset between test classes when
+    this fixture is scoped at session level.
+    """
+    def option_or_ini(option_name):
+        # first see if the value was overridden by the user as a command line argument; the
+        # request's own config is used as the pytest.config global is deprecated by pytest
+        value = request.config.getoption("--" + option_name, default=None)
+        if value is None:
+            # nope, user didn't specify it as a command line argument to pytest, check if
+            # we have a default in the loaded pytest.ini. Note: words are separated in variables
+            # in .ini land with a "_" while the command line arguments use "-"
+            value = request.config.inicfg.get(option_name.replace("-", "_"))
+        return value
+
+    # set the root logger level to whatever the user asked for
+    # all new loggers created will use the root logger as a template
+    # essentially making this the "default" active log level
+    log_level = logging.INFO
+    requested_log_level = option_or_ini("log-level")
+    if requested_log_level is not None:
+        log_level = logging.getLevelName(requested_log_level)
+
+    logging.root.setLevel(log_level)
+
+    # the log format is looked up the same way as the level (None when not configured)
+    logging_format = option_or_ini("log-format")
+    logging.basicConfig(level=log_level,
+                        format=logging_format)
+
+    # next, regardless of the level we set above (and requested by the user),
+    # reconfigure the "cassandra" logger to minimum INFO level to override the
+    # logging level that the "cassandra.*" imports should use; DEBUG is just
+    # insanely noisy and verbose, with the extra logging of very limited help
+    # in the context of dtest execution
+    if log_level == logging.DEBUG:
+        cassandra_module_log_level = logging.INFO
+    else:
+        cassandra_module_log_level = log_level
+    logging.getLogger("cassandra").setLevel(cassandra_module_log_level)
+
+
+@pytest.fixture(scope="session")
+def log_global_env_facts(fixture_dtest_config):
+    my_junit = getattr(pytest.config, '_xml', None)  # stays None when no '_xml' attribute is present
+    if my_junit is not None:
+        my_junit.add_global_property('USE_VNODES', fixture_dtest_config.use_vnodes)
+
+
+@pytest.fixture
+def fixture_dtest_config(request, fixture_logging_setup):
+    # builds a DTestConfig from the pytest command line options. although we don't use
+    # fixture_logging_setup here, we do want to have that fixture run as a prerequisite
+    # to this one.. and requesting it as an argument is how that is done with pytest
+    dtest_config = DTestConfig()
+    dtest_config.setup(request)
+    return dtest_config
+
+
+@pytest.fixture(scope='function', autouse=True)
+def fixture_maybe_skip_tests_requiring_novnodes(request):
+    """
+    Runs before every test function and skips the test when it carries the no_vnodes mark
+    while the run was started with vnodes enabled (--use-vnodes). Tests matching this
+    configuration are already deselected in pytest_collection_modifyitems, so this is
+    expected to be a no-op -- but it makes the rule explicit :)
+    """
+    marked_no_vnodes = request.node.get_marker('no_vnodes')
+    if marked_no_vnodes and request.config.getoption("--use-vnodes"):
+        pytest.skip("Skipping test marked with no_vnodes as tests executed with vnodes enabled via the "
+                    "--use-vnodes command line argument")
+
+
+@pytest.fixture(scope='function', autouse=True)
+def fixture_log_test_name_and_date(request):
+    logger.info("Starting execution of %s at %s", request.node.name, str(datetime.now()))  # logged before every test
+
+
+def _filter_errors(dtest_setup, errors):
+    """
+    Lazily yield the errors whose repr() matches none of the current DTestSetup's ignore_log_patterns
+    """
+    for err in errors:
+        err_repr = repr(err)
+        if not any(re.search(ignored, err_repr) for ignored in dtest_setup.ignore_log_patterns):
+            yield err
+
+
+def check_logs_for_errors(dtest_setup):
+    """Log and return, as stripped strings, the non-ignored errors found in the logs of every cluster node"""
+    all_errors = []
+    for node in dtest_setup.cluster.nodelist():
+        # accumulate across all the nodes (a clean last node must not hide the errors of an
+        # earlier one) and never append to the very list that is being iterated
+        node_errors = _filter_errors(dtest_setup, ['\n'.join(msg) for msg in node.grep_log_for_errors()])
+        for error in node_errors:
+            if isinstance(error, (bytes, bytearray)):
+                error_str = error.decode("utf-8").strip()
+            else:
+                error_str = error.strip()
+
+            if error_str:
+                logger.error("Unexpected error in {node_name} log, error: \n{error}"
+                             .format(node_name=node.name, error=error_str))
+                all_errors.append(error_str)
+    return all_errors
+
+
+def copy_logs(request, cluster, directory=None, name=None):
+    """
+    Copy the current cluster's log files into a new timestamped directory and, except on
+    Windows, (re)create a symlink that points at that directory.
+
+    :param request: the pytest request of the test; its node name becomes part of the directory name
+    :param cluster: the cluster whose per node system, debug, gc and compaction logs are copied
+    :param directory: where the timestamped directory is created (default: "logs")
+    :param name: name of the symlink, relative to directory (default: "last" inside "logs")
+    """
+    log_saved_dir = "logs"
+    os.makedirs(log_saved_dir, exist_ok=True)
+
+    if directory is None:
+        directory = log_saved_dir
+    if name is None:
+        name = os.path.join(log_saved_dir, "last")
+    else:
+        name = os.path.join(directory, name)
+    os.makedirs(directory, exist_ok=True)
+
+    nodes = list(cluster.nodes.values())
+    if not nodes:
+        return
+
+    basedir = str(int(time.time() * 1000)) + '_' + request.node.name
+    logdir = os.path.join(directory, basedir)
+    os.mkdir(logdir)
+    for node in nodes:
+        for source, suffix in ((node.logfilename(), ".log"), (node.debuglogfilename(), "_debug.log"),
+                               (node.gclogfilename(), "_gc.log"), (node.compactionlogfilename(), "_compaction.log")):
+            if os.path.exists(source):
+                shutil.copyfile(source, os.path.join(logdir, node.name + suffix))
+
+    # lexists (unlike exists) also sees a dangling symlink left behind by an earlier run
+    if os.path.lexists(name):
+        os.unlink(name)
+    if not is_win():
+        os.symlink(basedir, name)
+
+
+def reset_environment_vars(initial_environment):
+    """Restore os.environ to initial_environment while keeping pytest's PYTEST_CURRENT_TEST entry"""
+    pytest_current_test = os.environ.get('PYTEST_CURRENT_TEST')
+    os.environ.clear()
+    os.environ.update(initial_environment)
+    # os.environ only accepts str values, so only put the entry back when it was actually set
+    if pytest_current_test is not None:
+        os.environ['PYTEST_CURRENT_TEST'] = pytest_current_test
+
+
+@pytest.fixture(scope='function', autouse=False)
+def fixture_dtest_setup(request, parse_dtest_config, fixture_dtest_setup_overrides, fixture_logging_setup):
+    """Initialize the cluster for a test, yield the DTestSetup, then check the node logs and clean up"""
+    if running_in_docker():
+        cleanup_docker_environment_before_test_execution()
+
+    # do all of our setup operations to get the environment ready for the actual test
+    # to run (e.g. bring up a cluster with the necessary config, populate variables, etc)
+    initial_environment = copy.deepcopy(os.environ)
+    dtest_setup = DTestSetup(dtest_config=parse_dtest_config, setup_overrides=fixture_dtest_setup_overrides)
+    dtest_setup.initialize_cluster()
+
+    if not parse_dtest_config.disable_active_log_watching:
+        dtest_setup.log_watch_thread = dtest_setup.begin_active_log_watch()
+
+    # at this point we're done with our setup operations in this fixture
+    # yield to allow the actual test to run
+    yield dtest_setup
+
+    # phew! we're back after executing the test, now we need to do
+    # all of our teardown and cleanup operations
+    reset_environment_vars(initial_environment)
+    dtest_setup.jvm_args = []
+
+    for con in dtest_setup.connections:
+        con.cluster.shutdown()
+    dtest_setup.connections = []
+
+    failed = False
+    try:
+        if not dtest_setup.allow_log_errors:
+            errors = check_logs_for_errors(dtest_setup)
+            if errors:
+                failed = True
+                pytest.fail('Unexpected error found in node logs (see stdout for full details). Errors: [{errors}]'
+                            .format(errors=str.join(", ", errors)), pytrace=False)
+    finally:
+        try:
+            # save the logs for inspection
+            if failed or not parse_dtest_config.delete_logs:
+                copy_logs(request, dtest_setup.cluster)
+        except Exception as e:
+            logger.error("Error saving log: %s", str(e))
+        finally:
+            dtest_setup.cleanup_cluster()
+
+
+def _skip_msg(current_running_version, since_version, max_version):
+    """Return why the running version falls outside of [since_version, max_version], or None when it is inside"""
+    if current_running_version < since_version:
+        return "%s < %s" % (current_running_version, since_version)
+    return "%s > %s" % (current_running_version, max_version) if max_version and current_running_version > max_version else None
+
+
+@pytest.fixture(autouse=True)
+def fixture_since(request, fixture_dtest_setup):
+    """Skip the test when the cluster's version is outside of the range given by the test's 'since' mark"""
+    since_marker = request.node.get_marker('since')
+    if not since_marker:
+        return
+
+    max_version_str = since_marker.kwargs.get('max_version', None)
+    max_version = LooseVersion(max_version_str) if max_version_str else None
+    since = LooseVersion(since_marker.args[0])
+
+    skip_msg = _skip_msg(fixture_dtest_setup.cluster.version(), since, max_version)
+    if skip_msg:
+        pytest.skip(skip_msg)
+
+
+@pytest.fixture(scope='session', autouse=True)
+def install_debugging_signal_handler():
+    import faulthandler  # stdlib module that dumps the Python tracebacks on a fatal error
+    faulthandler.enable()  # session scoped + autouse: enabled once for the whole test session
+
+
+@pytest.fixture(scope='function')
+def parse_dtest_config(request):
+    """Yield the DTestConfig built from the pytest options, exiting pytest when the environment is unusable"""
+    dtest_config = DTestConfig()
+    dtest_config.setup(request)
+
+    # if we're on mac, check that we have the required loopback interfaces before doing anything!
+    check_required_loopback_interfaces_available()
+
+    if dtest_config.cassandra_dir is not None:
+        try:
+            validate_install_dir(dtest_config.cassandra_dir)
+        except Exception as e:
+            pytest.exit("{}. Did you remember to build C*? ('ant clean jar')".format(e))
+    yield dtest_config
+
+
+def pytest_collection_modifyitems(items, config):
+    """
+    This function is called upon during the pytest test collection phase and allows for modification
+    of the test items within the list: deselected tests are reported and removed from items in place
+    """
+    if not config.getoption("--collect-only") and config.getoption("--cassandra-dir") is None:
+        if config.getoption("--cassandra-version") is None:
+            raise Exception("Required dtest arguments were missing! You must provide either --cassandra-dir "
+                            "or --cassandra-version. Refer to the documentation or invoke the help with --help.")
+
+    selected_items = []
+    deselected_items = []
+
+    sufficient_system_resources_resource_intensive = sufficient_system_resources_for_resource_intensive_tests()
+    logger.debug("has sufficient resources? %s" % sufficient_system_resources_resource_intensive)
+
+    for item in items:
+        #  set a timeout for all tests, it may be overwritten at the test level with an additional marker
+        if not item.get_marker("timeout"):
+            item.add_marker(pytest.mark.timeout(60*15))
+
+        deselect_test = False
+
+        if item.get_marker("resource_intensive"):
+            # --force-resource-intensive-tests overrides both of the reasons for deselecting the test
+            if not config.getoption("--force-resource-intensive-tests"):
+                if config.getoption("--skip-resource-intensive-tests"):
+                    deselect_test = True
+                    logger.info("SKIP: Deselecting test %s as test marked resource_intensive. To force execution of "
+                                "this test re-run with the --force-resource-intensive-tests command line argument" % item.name)
+                if not sufficient_system_resources_resource_intensive:
+                    deselect_test = True
+                    logger.info("SKIP: Deselecting resource_intensive test %s due to insufficient system resources" % item.name)
+
+        if item.get_marker("no_vnodes"):
+            if config.getoption("--use-vnodes"):
+                deselect_test = True
+                logger.info("SKIP: Deselecting test %s as the test requires vnodes to be disabled. To run this test, "
+                            "re-run without the --use-vnodes command line argument" % item.name)
+
+        if item.get_marker("vnodes"):
+            if not config.getoption("--use-vnodes"):
+                deselect_test = True
+                logger.info("SKIP: Deselecting test %s as the test requires vnodes to be enabled. To run this test, "
+                            "re-run with the --use-vnodes command line argument" % item.name)
+
+        for test_item_class in inspect.getmembers(item.module, inspect.isclass):
+            if not hasattr(test_item_class[1], "pytestmark"):
+                continue
+
+            for module_pytest_mark in test_item_class[1].pytestmark:
+                if module_pytest_mark.name == "upgrade_test":
+                    if not config.getoption("--execute-upgrade-tests"):
+                        deselect_test = True
+
+        if item.get_marker("upgrade_test"):
+            if not config.getoption("--execute-upgrade-tests"):
+                deselect_test = True
+
+        # todo kjkj: deal with no_offheap_memtables mark
+
+        if deselect_test:
+            deselected_items.append(item)
+        else:
+            selected_items.append(item)
+
+    config.hook.pytest_deselected(items=deselected_items)
+    items[:] = selected_items
+
+
+# Determine the location of the libjemalloc library so that we can specify it
+# through environment variables when start Cassandra.  This reduces startup
+# time, making the dtests run faster.
+def find_libjemalloc():
+    """Return what findlibjemalloc.sh printed as a str, "-" when it printed nothing, "" on Windows or failure"""
+    if is_win():
+        # let the normal bat script handle finding libjemalloc
+        return ""
+
+    this_dir = os.path.dirname(os.path.realpath(__file__))
+    script = os.path.join(this_dir, "findlibjemalloc.sh")
+    try:
+        p = subprocess.Popen([script], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+        stdout, stderr = p.communicate()
+        if stderr or not stdout:
+            return "-"  # tells C* not to look for libjemalloc
+        return stdout.decode("utf-8").strip()  # communicate() hands back bytes; every other branch returns str
+    except Exception as exc:
+        print("Failed to run script to prelocate libjemalloc ({}): {}".format(script, exc))
+        return ""

http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/49b2dda4/consistency_test.py
----------------------------------------------------------------------
diff --git a/consistency_test.py b/consistency_test.py
index 2eaa2ca..368dba0 100644
--- a/consistency_test.py
+++ b/consistency_test.py
@@ -1,36 +1,30 @@
-import Queue
+import queue
 import sys
 import threading
 import time
+import pytest
+import logging
 from collections import OrderedDict, namedtuple
 from copy import deepcopy
 
 from cassandra import ConsistencyLevel, consistency_value_to_name
 from cassandra.query import SimpleStatement
-from nose.plugins.attrib import attr
-from nose.tools import assert_greater_equal
 
 from tools.assertions import (assert_all, assert_length_equal, assert_none,
                               assert_unavailable)
-from dtest import DISABLE_VNODES, MultiError, Tester, debug, create_ks, create_cf
+from dtest import MultiError, Tester, create_ks, create_cf
 from tools.data import (create_c1c2_table, insert_c1c2, insert_columns,
                         query_c1c2, rows_to_list)
-from tools.decorators import since
 from tools.jmxutils import JolokiaAgent, make_mbean, remove_perf_disable_shared_mem
 
+since = pytest.mark.since
+logger = logging.getLogger(__name__)
+
 ExpectedConsistency = namedtuple('ExpectedConsistency', ('num_write_nodes', 'num_read_nodes', 'is_strong'))
 
 
 class TestHelper(Tester):
 
-    def __init__(self, *args, **kwargs):
-        Tester.__init__(self, *args, **kwargs)
-        self.lock = threading.Lock()
-
-    def log(self, message):
-        with self.lock:
-            debug(message)
-
     def _is_local(self, cl):
         return (cl == ConsistencyLevel.LOCAL_QUORUM or
                 cl == ConsistencyLevel.LOCAL_ONE or
@@ -51,12 +45,12 @@ class TestHelper(Tester):
             ConsistencyLevel.ONE: 1,
             ConsistencyLevel.TWO: 2,
             ConsistencyLevel.THREE: 3,
-            ConsistencyLevel.QUORUM: sum(rf_factors) / 2 + 1,
+            ConsistencyLevel.QUORUM: sum(rf_factors) // 2 + 1,
             ConsistencyLevel.ALL: sum(rf_factors),
-            ConsistencyLevel.LOCAL_QUORUM: rf_factors[dc] / 2 + 1,
-            ConsistencyLevel.EACH_QUORUM: rf_factors[dc] / 2 + 1,
-            ConsistencyLevel.SERIAL: sum(rf_factors) / 2 + 1,
-            ConsistencyLevel.LOCAL_SERIAL: rf_factors[dc] / 2 + 1,
+            ConsistencyLevel.LOCAL_QUORUM: rf_factors[dc] // 2 + 1,
+            ConsistencyLevel.EACH_QUORUM: rf_factors[dc] // 2 + 1,
+            ConsistencyLevel.SERIAL: sum(rf_factors) // 2 + 1,
+            ConsistencyLevel.LOCAL_SERIAL: rf_factors[dc] // 2 + 1,
             ConsistencyLevel.LOCAL_ONE: 1,
         }[cl]
 
@@ -73,7 +67,7 @@ class TestHelper(Tester):
                 :return: the data center corresponding to this node
                 """
                 dc = 0
-                for i in xrange(1, len(nodes)):
+                for i in range(1, len(nodes)):
                     if idx < sum(nodes[:i]):
                         break
                     dc += 1
@@ -101,7 +95,7 @@ class TestHelper(Tester):
         if self._is_local(cl):
             return num_nodes_alive[current] >= self._required_nodes(cl, rf_factors, current)
         elif cl == ConsistencyLevel.EACH_QUORUM:
-            for i in xrange(0, len(rf_factors)):
+            for i in range(0, len(rf_factors)):
                 if num_nodes_alive[i] < self._required_nodes(cl, rf_factors, i):
                     return False
             return True
@@ -132,7 +126,7 @@ class TestHelper(Tester):
             # StorageProxy.getLiveSortedEndpoints(), which is called by the AbstractReadExecutor
             # to determine the target replicas. The default case, a SimpleSnitch wrapped in
             # a dynamic snitch, may rarely choose a different replica.
-            debug('Changing snitch for single dc case')
+            logger.debug('Changing snitch for single dc case')
             for node in cluster.nodelist():
                 node.data_center = 'dc1'
             cluster.set_configuration_options(values={
@@ -208,7 +202,7 @@ class TestHelper(Tester):
         expected = [[userid, age]] if age else []
         ret = rows_to_list(res) == expected
         if check_ret:
-            self.assertTrue(ret, "Got {} from {}, expected {} at {}".format(rows_to_list(res), session.cluster.contact_points, expected, consistency_value_to_name(consistency)))
+            assert ret, "Got {} from {}, expected {} at {}".format(rows_to_list(res), session.cluster.contact_points, expected, consistency_value_to_name(consistency))
         return ret
 
     def create_counters_table(self, session, requires_local_reads):
@@ -233,10 +227,10 @@ class TestHelper(Tester):
         statement = SimpleStatement("SELECT * from counters WHERE id = {}".format(id), consistency_level=consistency)
         ret = rows_to_list(session.execute(statement))
         if check_ret:
-            self.assertEqual(ret[0][1], val, "Got {} from {}, expected {} at {}".format(ret[0][1],
+            assert ret[0][1] == val, "Got {} from {}, expected {} at {}".format(ret[0][1],
                                                                                         session.cluster.contact_points,
                                                                                         val,
-                                                                                        consistency_value_to_name(consistency)))
+                                                                                        consistency_value_to_name(consistency))
         return ret[0][1] if ret else 0
 
 
@@ -255,8 +249,8 @@ class TestAvailability(TestHelper):
         rf = self.rf
 
         num_alive = nodes
-        for node in xrange(nodes):
-            debug('Testing node {} in single dc with {} nodes alive'.format(node, num_alive))
+        for node in range(nodes):
+            logger.debug('Testing node {} in single dc with {} nodes alive'.format(node, num_alive))
             session = self.patient_exclusive_cql_connection(cluster.nodelist()[node], self.ksname)
             for combination in combinations:
                 self._test_insert_query_from_node(session, 0, [rf], [num_alive], *combination)
@@ -274,12 +268,12 @@ class TestAvailability(TestHelper):
         rf = self.rf
 
         nodes_alive = deepcopy(nodes)
-        rf_factors = rf.values()
+        rf_factors = list(rf.values())
 
-        for i in xrange(0, len(nodes)):  # for each dc
-            self.log('Testing dc {} with rf {} and {} nodes alive'.format(i, rf_factors[i], nodes_alive))
-            for n in xrange(nodes[i]):  # for each node in this dc
-                self.log('Testing node {} in dc {} with {} nodes alive'.format(n, i, nodes_alive))
+        for i in range(0, len(nodes)):  # for each dc
+            logger.debug('Testing dc {} with rf {} and {} nodes alive'.format(i, rf_factors[i], nodes_alive))
+            for n in range(nodes[i]):  # for each node in this dc
+                logger.debug('Testing node {} in dc {} with {} nodes alive'.format(n, i, nodes_alive))
                 node = n + sum(nodes[:i])
                 session = self.patient_exclusive_cql_connection(cluster.nodelist()[node], self.ksname)
                 for combination in combinations:
@@ -292,7 +286,7 @@ class TestAvailability(TestHelper):
         """
         Test availability for read and write via the session passed in as a parameter.
         """
-        self.log("Connected to %s for %s/%s/%s" %
+        logger.debug("Connected to %s for %s/%s/%s" %
                  (session.cluster.contact_points, consistency_value_to_name(write_cl), consistency_value_to_name(read_cl), consistency_value_to_name(serial_cl)))
 
         start = 0
@@ -300,13 +294,13 @@ class TestAvailability(TestHelper):
         age = 30
 
         if self._should_succeed(write_cl, rf_factors, num_nodes_alive, dc_idx):
-            for n in xrange(start, end):
+            for n in range(start, end):
                 self.insert_user(session, n, age, write_cl, serial_cl)
         else:
             assert_unavailable(self.insert_user, session, end, age, write_cl, serial_cl)
 
         if self._should_succeed(read_cl, rf_factors, num_nodes_alive, dc_idx):
-            for n in xrange(start, end):
+            for n in range(start, end):
                 self.query_user(session, n, age, read_cl, check_ret)
         else:
             assert_unavailable(self.query_user, session, end, age, read_cl, check_ret)
@@ -361,7 +355,7 @@ class TestAvailability(TestHelper):
 
         self._test_simple_strategy(combinations)
 
-    @attr("resource-intensive")
+    @pytest.mark.resource_intensive
     def test_network_topology_strategy(self):
         """
         Test for multiple datacenters, using network topology replication strategy.
@@ -393,7 +387,7 @@ class TestAvailability(TestHelper):
 
         self._test_network_topology_strategy(combinations)
 
-    @attr("resource-intensive")
+    @pytest.mark.resource_intensive
     @since("3.0")
     def test_network_topology_strategy_each_quorum(self):
         """
@@ -432,7 +426,7 @@ class TestAccuracy(TestHelper):
             self.read_cl = read_cl
             self.serial_cl = serial_cl
 
-            outer.log('Testing accuracy with WRITE/READ/SERIAL consistency set to {}/{}/{} (keys : {} to {})'
+            logger.debug('Testing accuracy with WRITE/READ/SERIAL consistency set to {}/{}/{} (keys : {} to {})'
                       .format(consistency_value_to_name(write_cl), consistency_value_to_name(read_cl), consistency_value_to_name(serial_cl), start, end - 1))
 
         def get_expected_consistency(self, idx):
@@ -459,12 +453,10 @@ class TestAccuracy(TestHelper):
                 for s in sessions:
                     if outer.query_user(s, n, val, read_cl, check_ret=expected_consistency.is_strong):
                         num += 1
-                assert_greater_equal(num, expected_consistency.num_write_nodes,
-                                     "Failed to read value from sufficient number of nodes,"
-                                     " required {} but got {} - [{}, {}]"
-                                     .format(expected_consistency.num_write_nodes, num, n, val))
+                assert num >= expected_consistency.num_write_nodes, "Failed to read value from sufficient number of nodes," + \
+                                     " required {} but got {} - [{}, {}]".format(expected_consistency.num_write_nodes, num, n, val)
 
-            for n in xrange(start, end):
+            for n in range(start, end):
                 age = 30
                 for s in range(0, len(sessions)):
                     outer.insert_user(sessions[s], n, age, write_cl, serial_cl)
@@ -499,12 +491,10 @@ class TestAccuracy(TestHelper):
                 for s in sessions:
                     results.append(outer.query_counter(s, n, val, read_cl, check_ret=expected_consistency.is_strong))
 
-                assert_greater_equal(results.count(val), expected_consistency.num_write_nodes,
-                                     "Failed to read value from sufficient number of nodes, required {} nodes to have a"
-                                     " counter value of {} at key {}, instead got these values: {}"
-                                     .format(expected_consistency.num_write_nodes, val, n, results))
+                assert results.count(val) >= expected_consistency.num_write_nodes, ("Failed to read value from sufficient number of nodes, required {} nodes to have a"
+                                     " counter value of {} at key {}, instead got these values: {}").format(expected_consistency.num_write_nodes, val, n, results)
 
-            for n in xrange(start, end):
+            for n in range(start, end):
                 c = 1
                 for s in range(0, len(sessions)):
                     outer.update_counter(sessions[s], n, write_cl, serial_cl)
@@ -534,15 +524,15 @@ class TestAccuracy(TestHelper):
 
         self._start_cluster(save_sessions=True, requires_local_reads=requires_local_reads)
 
-        input_queue = Queue.Queue()
-        exceptions_queue = Queue.Queue()
+        input_queue = queue.Queue()
+        exceptions_queue = queue.Queue()
 
         def run():
             while not input_queue.empty():
                 try:
                     v = TestAccuracy.Validation(self, self.sessions, nodes, rf_factors, *input_queue.get(block=False))
                     valid_fcn(v)
-                except Queue.Empty:
+                except queue.Empty:
                     pass
                 except Exception:
                     exceptions_queue.put(sys.exc_info())
@@ -560,17 +550,17 @@ class TestAccuracy(TestHelper):
             t.start()
             threads.append(t)
 
-        self.log("Waiting for workers to complete")
+        logger.debug("Waiting for workers to complete")
         while exceptions_queue.empty():
             time.sleep(0.1)
-            if len(filter(lambda t: t.isAlive(), threads)) == 0:
+            if not any(t.is_alive() for t in threads):
                 break
 
         if not exceptions_queue.empty():
-            _, exceptions, tracebacks = zip(*exceptions_queue.queue)
+            _, exceptions, tracebacks = list(zip(*exceptions_queue.queue))
             raise MultiError(exceptions=exceptions, tracebacks=tracebacks)
 
-    @attr("resource-intensive")
+    @pytest.mark.resource_intensive
     def test_simple_strategy_users(self):
         """
         Test for a single datacenter, users table, only the each quorum reads.
@@ -599,10 +589,10 @@ class TestAccuracy(TestHelper):
             (ConsistencyLevel.QUORUM, ConsistencyLevel.LOCAL_SERIAL, ConsistencyLevel.SERIAL),
         ]
 
-        self.log("Testing single dc, users")
+        logger.debug("Testing single dc, users")
         self._run_test_function_in_parallel(TestAccuracy.Validation.validate_users, [self.nodes], [self.rf], combinations)
 
-    @attr("resource-intensive")
+    @pytest.mark.resource_intensive
     @since("3.0")
     def test_simple_strategy_each_quorum_users(self):
         """
@@ -617,10 +607,10 @@ class TestAccuracy(TestHelper):
             (ConsistencyLevel.EACH_QUORUM, ConsistencyLevel.EACH_QUORUM),
         ]
 
-        self.log("Testing single dc, users, each quorum reads")
+        logger.debug("Testing single dc, users, each quorum reads")
         self._run_test_function_in_parallel(TestAccuracy.Validation.validate_users, [self.nodes], [self.rf], combinations)
 
-    @attr("resource-intensive")
+    @pytest.mark.resource_intensive
     def test_network_topology_strategy_users(self):
         """
         Test for multiple datacenters, users table.
@@ -653,10 +643,10 @@ class TestAccuracy(TestHelper):
             (ConsistencyLevel.LOCAL_QUORUM, ConsistencyLevel.SERIAL, ConsistencyLevel.LOCAL_SERIAL),
         ]
 
-        self.log("Testing multiple dcs, users")
-        self._run_test_function_in_parallel(TestAccuracy.Validation.validate_users, self.nodes, self.rf.values(), combinations),
+        logger.debug("Testing multiple dcs, users")
+        self._run_test_function_in_parallel(TestAccuracy.Validation.validate_users, self.nodes, list(self.rf.values()), combinations),
 
-    @attr("resource-intensive")
+    @pytest.mark.resource_intensive
     @since("3.0")
     def test_network_topology_strategy_each_quorum_users(self):
         """
@@ -672,8 +662,8 @@ class TestAccuracy(TestHelper):
             (ConsistencyLevel.EACH_QUORUM, ConsistencyLevel.EACH_QUORUM),
         ]
 
-        self.log("Testing multiple dcs, users, each quorum reads")
-        self._run_test_function_in_parallel(TestAccuracy.Validation.validate_users, self.nodes, self.rf.values(), combinations)
+        logger.debug("Testing multiple dcs, users, each quorum reads")
+        self._run_test_function_in_parallel(TestAccuracy.Validation.validate_users, self.nodes, list(self.rf.values()), combinations)
 
     def test_simple_strategy_counters(self):
         """
@@ -700,7 +690,7 @@ class TestAccuracy(TestHelper):
             (ConsistencyLevel.LOCAL_QUORUM, ConsistencyLevel.LOCAL_QUORUM),
         ]
 
-        self.log("Testing single dc, counters")
+        logger.debug("Testing single dc, counters")
         self._run_test_function_in_parallel(TestAccuracy.Validation.validate_counters, [self.nodes], [self.rf], combinations)
 
     @since("3.0")
@@ -718,10 +708,10 @@ class TestAccuracy(TestHelper):
             (ConsistencyLevel.EACH_QUORUM, ConsistencyLevel.EACH_QUORUM),
         ]
 
-        self.log("Testing single dc, counters, each quorum reads")
+        logger.debug("Testing single dc, counters, each quorum reads")
         self._run_test_function_in_parallel(TestAccuracy.Validation.validate_counters, [self.nodes], [self.rf], combinations)
 
-    @attr("resource-intensive")
+    @pytest.mark.resource_intensive
     def test_network_topology_strategy_counters(self):
         """
         Test for multiple datacenters, counters table.
@@ -749,10 +739,10 @@ class TestAccuracy(TestHelper):
             (ConsistencyLevel.TWO, ConsistencyLevel.ONE),
         ]
 
-        self.log("Testing multiple dcs, counters")
-        self._run_test_function_in_parallel(TestAccuracy.Validation.validate_counters, self.nodes, self.rf.values(), combinations),
+        logger.debug("Testing multiple dcs, counters")
+        self._run_test_function_in_parallel(TestAccuracy.Validation.validate_counters, self.nodes, list(self.rf.values()), combinations),
 
-    @attr("resource-intensive")
+    @pytest.mark.resource_intensive
     @since("3.0")
     def test_network_topology_strategy_each_quorum_counters(self):
         """
@@ -768,8 +758,8 @@ class TestAccuracy(TestHelper):
             (ConsistencyLevel.EACH_QUORUM, ConsistencyLevel.EACH_QUORUM),
         ]
 
-        self.log("Testing multiple dcs, counters, each quorum reads")
-        self._run_test_function_in_parallel(TestAccuracy.Validation.validate_counters, self.nodes, self.rf.values(), combinations),
+        logger.debug("Testing multiple dcs, counters, each quorum reads")
+        self._run_test_function_in_parallel(TestAccuracy.Validation.validate_counters, self.nodes, list(self.rf.values()), combinations),
 
 
 class TestConsistency(Tester):
@@ -1105,7 +1095,7 @@ class TestConsistency(Tester):
         srp = make_mbean('metrics', type='Table', name='ShortReadProtectionRequests', keyspace='test', scope='test')
         with JolokiaAgent(node1) as jmx:
             # 4 srp requests for node1 and 5 for node2, total of 9
-            self.assertEqual(9, jmx.read_attribute(srp, 'Count'))
+            assert 9 == jmx.read_attribute(srp, 'Count')
 
     @since('3.0')
     def test_12872(self):
@@ -1174,12 +1164,17 @@ class TestConsistency(Tester):
                    [[0], [4]],
                    cl=ConsistencyLevel.ALL)
 
-    def short_read_test(self):
+    def test_short_read(self):
         """
         @jira_ticket CASSANDRA-9460
         """
         cluster = self.cluster
 
+        # this test causes the python driver to be extremely noisy due to
+        # frequent starting and stopping of nodes. let's move the log level
+        # of the driver to ERROR for this test only
+        logging.getLogger("cassandra").setLevel('ERROR')
+
         # Disable hinted handoff and set batch commit log so this doesn't
         # interfer with the test
         cluster.set_configuration_options(values={'hinted_handoff_enabled': False})
@@ -1196,13 +1191,13 @@ class TestConsistency(Tester):
         reversed_key = 'reversed'
 
         # Repeat this test 10 times to make it more easy to spot a null pointer exception caused by a race, see CASSANDRA-9460
-        for k in xrange(10):
+        for k in range(10):
             # insert 9 columns in two rows
             insert_columns(self, session, normal_key, 9)
             insert_columns(self, session, reversed_key, 9)
 
             # Delete 3 first columns (and 3 last columns, for the reversed version) with a different node dead each time
-            for node, column_number_to_delete in zip(range(1, 4), range(3)):
+            for node, column_number_to_delete in zip(list(range(1, 4)), list(range(3))):
                 self.stop_node(node)
                 self.delete(node, normal_key, column_number_to_delete)
                 self.delete(node, reversed_key, 8 - column_number_to_delete)
@@ -1218,8 +1213,8 @@ class TestConsistency(Tester):
             assert_length_equal(res, 3)
 
             # value 0, 1 and 2 have been deleted
-            for i in xrange(1, 4):
-                self.assertEqual('value{}'.format(i + 2), res[i - 1][1])
+            for i in range(1, 4):
+                assert 'value{}'.format(i + 2) == res[i - 1][1]
 
             # Query 3 firsts columns in reverse order
             session = self.patient_cql_connection(node1, 'ks')
@@ -1231,12 +1226,12 @@ class TestConsistency(Tester):
             assert_length_equal(res, 3)
 
             # value 6, 7 and 8 have been deleted
-            for i in xrange(0, 3):
-                self.assertEqual('value{}'.format(5 - i), res[i][1])
+            for i in range(0, 3):
+                assert 'value{}'.format(5 - i) == res[i][1]
 
             session.execute('TRUNCATE cf')
 
-    def short_read_delete_test(self):
+    def test_short_read_delete(self):
         """ Test short reads ultimately leaving no columns alive [#4000] """
         cluster = self.cluster
 
@@ -1269,7 +1264,7 @@ class TestConsistency(Tester):
 
         assert_none(session, "SELECT c, v FROM cf WHERE key=\'k0\' LIMIT 1", cl=ConsistencyLevel.QUORUM)
 
-    def short_read_quorum_delete_test(self):
+    def test_short_read_quorum_delete(self):
         """
         @jira_ticket CASSANDRA-8933
         """
@@ -1311,11 +1306,11 @@ class TestConsistency(Tester):
         node3.stop(wait_other_notice=True)
         assert_none(session, "SELECT * FROM t WHERE id = 0 LIMIT 1", cl=ConsistencyLevel.QUORUM)
 
-    def readrepair_test(self):
+    def test_readrepair(self):
         cluster = self.cluster
         cluster.set_configuration_options(values={'hinted_handoff_enabled': False})
 
-        if DISABLE_VNODES:
+        if not self.dtest_config.use_vnodes:
             cluster.populate(2).start()
         else:
             tokens = cluster.balanced_tokens(2)
@@ -1333,43 +1328,43 @@ class TestConsistency(Tester):
         node2.start(wait_for_binary_proto=True, wait_other_notice=True)
 
         # query everything to cause RR
-        for n in xrange(0, 10000):
+        for n in range(0, 10000):
             query_c1c2(session, n, ConsistencyLevel.QUORUM)
 
         node1.stop(wait_other_notice=True)
 
         # Check node2 for all the keys that should have been repaired
         session = self.patient_cql_connection(node2, keyspace='ks')
-        for n in xrange(0, 10000):
+        for n in range(0, 10000):
             query_c1c2(session, n, ConsistencyLevel.ONE)
 
-    def quorum_available_during_failure_test(self):
-        CL = ConsistencyLevel.QUORUM
-        RF = 3
+    def test_quorum_available_during_failure(self):
+        cl = ConsistencyLevel.QUORUM
+        rf = 3
 
-        debug("Creating a ring")
+        logger.debug("Creating a ring")
         cluster = self.cluster
-        if DISABLE_VNODES:
+        if not self.dtest_config.use_vnodes:
             cluster.populate(3).start()
         else:
             tokens = cluster.balanced_tokens(3)
             cluster.populate(3, tokens=tokens).start()
         node1, node2, node3 = cluster.nodelist()
 
-        debug("Set to talk to node 2")
+        logger.debug("Set to talk to node 2")
         session = self.patient_cql_connection(node2)
-        create_ks(session, 'ks', RF)
+        create_ks(session, 'ks', rf)
         create_c1c2_table(self, session)
 
-        debug("Generating some data")
-        insert_c1c2(session, n=100, consistency=CL)
+        logger.debug("Generating some data")
+        insert_c1c2(session, n=100, consistency=cl)
 
-        debug("Taking down node1")
+        logger.debug("Taking down node1")
         node1.stop(wait_other_notice=True)
 
-        debug("Reading back data.")
-        for n in xrange(100):
-            query_c1c2(session, n, CL)
+        logger.debug("Reading back data.")
+        for n in range(100):
+            query_c1c2(session, n, cl)
 
     def stop_node(self, node_number):
         to_stop = self.cluster.nodes["node%d" % node_number]

http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/49b2dda4/consistent_bootstrap_test.py
----------------------------------------------------------------------
diff --git a/consistent_bootstrap_test.py b/consistent_bootstrap_test.py
index ada9b39..24626f3 100644
--- a/consistent_bootstrap_test.py
+++ b/consistent_bootstrap_test.py
@@ -1,92 +1,100 @@
+import pytest
+import logging
+
 from cassandra import ConsistencyLevel
 
-from dtest import Tester, debug, create_ks
+from dtest import Tester, create_ks
 from tools.data import create_c1c2_table, insert_c1c2, query_c1c2
-from tools.decorators import no_vnodes
 from tools.misc import new_node
 
+logger = logging.getLogger(__name__)
+
 
 class TestBootstrapConsistency(Tester):
 
-    @no_vnodes()
-    def consistent_reads_after_move_test(self):
-        debug("Creating a ring")
+    @pytest.mark.no_vnodes
+    def test_consistent_reads_after_move(self):
+        logger.debug("Creating a ring")
         cluster = self.cluster
-        cluster.set_configuration_options(values={'hinted_handoff_enabled': False, 'write_request_timeout_in_ms': 60000,
-                                                  'read_request_timeout_in_ms': 60000, 'dynamic_snitch_badness_threshold': 0.0})
+        cluster.set_configuration_options(values={'hinted_handoff_enabled': False,
+                                                  'write_request_timeout_in_ms': 60000,
+                                                  'read_request_timeout_in_ms': 60000,
+                                                  'dynamic_snitch_badness_threshold': 0.0})
         cluster.set_batch_commitlog(enabled=True)
 
         cluster.populate(3, tokens=[0, 2**48, 2**62]).start()
         node1, node2, node3 = cluster.nodelist()
 
-        debug("Set to talk to node 2")
+        logger.debug("Set to talk to node 2")
         n2session = self.patient_cql_connection(node2)
         create_ks(n2session, 'ks', 2)
         create_c1c2_table(self, n2session)
 
-        debug("Generating some data for all nodes")
-        insert_c1c2(n2session, keys=range(10, 20), consistency=ConsistencyLevel.ALL)
+        logger.debug("Generating some data for all nodes")
+        insert_c1c2(n2session, keys=list(range(10, 20)), consistency=ConsistencyLevel.ALL)
 
         node1.flush()
-        debug("Taking down node1")
+        logger.debug("Taking down node1")
         node1.stop(wait_other_notice=True)
 
-        debug("Writing data to node2")
-        insert_c1c2(n2session, keys=range(30, 1000), consistency=ConsistencyLevel.ONE)
+        logger.debug("Writing data to node2")
+        insert_c1c2(n2session, keys=list(range(30, 1000)), consistency=ConsistencyLevel.ONE)
         node2.flush()
 
-        debug("Restart node1")
+        logger.debug("Restart node1")
         node1.start(wait_other_notice=True)
 
-        debug("Move token on node3")
+        logger.debug("Move token on node3")
         node3.move(2)
 
-        debug("Checking that no data was lost")
-        for n in xrange(10, 20):
+        logger.debug("Checking that no data was lost")
+        for n in range(10, 20):
             query_c1c2(n2session, n, ConsistencyLevel.ALL)
 
-        for n in xrange(30, 1000):
+        for n in range(30, 1000):
             query_c1c2(n2session, n, ConsistencyLevel.ALL)
 
-    def consistent_reads_after_bootstrap_test(self):
-        debug("Creating a ring")
+    def test_consistent_reads_after_bootstrap(self):
+        logger.debug("Creating a ring")
         cluster = self.cluster
-        cluster.set_configuration_options(values={'hinted_handoff_enabled': False, 'write_request_timeout_in_ms': 60000,
-                                                  'read_request_timeout_in_ms': 60000, 'dynamic_snitch_badness_threshold': 0.0})
+        cluster.set_configuration_options(values={'hinted_handoff_enabled': False,
+                                                  'write_request_timeout_in_ms': 60000,
+                                                  'read_request_timeout_in_ms': 60000,
+                                                  'dynamic_snitch_badness_threshold': 0.0})
         cluster.set_batch_commitlog(enabled=True)
 
         cluster.populate(2)
         node1, node2 = cluster.nodelist()
         cluster.start(wait_for_binary_proto=True, wait_other_notice=True)
 
-        debug("Set to talk to node 2")
+        logger.debug("Set to talk to node 2")
         n2session = self.patient_cql_connection(node2)
         create_ks(n2session, 'ks', 2)
         create_c1c2_table(self, n2session)
 
-        debug("Generating some data for all nodes")
-        insert_c1c2(n2session, keys=range(10, 20), consistency=ConsistencyLevel.ALL)
+        logger.debug("Generating some data for all nodes")
+        insert_c1c2(n2session, keys=list(range(10, 20)), consistency=ConsistencyLevel.ALL)
 
         node1.flush()
-        debug("Taking down node1")
+        logger.debug("Taking down node1")
         node1.stop(wait_other_notice=True)
 
-        debug("Writing data to only node2")
-        insert_c1c2(n2session, keys=range(30, 1000), consistency=ConsistencyLevel.ONE)
+        logger.debug("Writing data to only node2")
+        insert_c1c2(n2session, keys=list(range(30, 1000)), consistency=ConsistencyLevel.ONE)
         node2.flush()
 
-        debug("Restart node1")
+        logger.debug("Restart node1")
         node1.start(wait_other_notice=True)
 
-        debug("Bootstraping node3")
+        logger.debug("Bootstraping node3")
         node3 = new_node(cluster)
         node3.start(wait_for_binary_proto=True)
 
         n3session = self.patient_cql_connection(node3)
         n3session.execute("USE ks")
-        debug("Checking that no data was lost")
-        for n in xrange(10, 20):
+        logger.debug("Checking that no data was lost")
+        for n in range(10, 20):
             query_c1c2(n3session, n, ConsistencyLevel.ALL)
 
-        for n in xrange(30, 1000):
+        for n in range(30, 1000):
             query_c1c2(n3session, n, ConsistencyLevel.ALL)

http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/49b2dda4/counter_test.py
----------------------------------------------------------------------
diff --git a/counter_test.py b/counter_test.py
new file mode 100644
index 0000000..ae87d95
--- /dev/null
+++ b/counter_test.py
@@ -0,0 +1,417 @@
+import random
+import time
+import uuid
+import pytest
+import logging
+
+from cassandra import ConsistencyLevel
+from cassandra.query import SimpleStatement
+
+from tools.assertions import assert_invalid, assert_length_equal, assert_one
+from dtest import Tester, create_ks, create_cf
+from tools.data import rows_to_list
+
+since = pytest.mark.since
+logger = logging.getLogger(__name__)
+
+
+class TestCounters(Tester):
+
+    @since('3.0', max_version='3.12')
+    def test_13691(self):
+        """
+        2.0 -> 2.1 -> 3.0 counters upgrade test
+        @jira_ticket CASSANDRA-13691
+        """
+        cluster = self.cluster
+        default_install_dir = cluster.get_install_dir()
+
+        #
+        # set up a 2.0 cluster with 3 nodes and set up schema
+        #
+
+        cluster.set_install_dir(version='2.0.17')
+        cluster.populate(3)
+        cluster.start()
+
+        node1, node2, node3 = cluster.nodelist()
+
+        session = self.patient_cql_connection(node1)
+        session.execute("""
+            CREATE KEYSPACE test
+                WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 3};
+            """)
+        session.execute("CREATE TABLE test.test (id int PRIMARY KEY, c counter);")
+
+        #
+        # generate some 2.0 counter columns with local shards (one increment for each of 1000 ids)
+        #
+
+        query = "UPDATE test.test SET c = c + 1 WHERE id = ?"
+        prepared = session.prepare(query)
+        for i in range(0, 1000):
+            session.execute(prepared, [i])
+
+        cluster.flush()
+        cluster.stop()
+
+        #
+        # upgrade cluster to 2.1
+        #
+
+        cluster.set_install_dir(version='2.1.17')
+        cluster.start()
+        cluster.nodetool("upgradesstables")
+
+        #
+        # upgrade node3 to current (3.0.x or 3.11.x)
+        #
+
+        node3.stop(wait_other_notice=True)
+        node3.set_install_dir(install_dir=default_install_dir)
+        node3.start(wait_other_notice=True)
+
+        #
+        # with a 2.1 coordinator (node1), read the table with CL.ALL; all 1000 rows must be counted
+        #
+
+        session = self.patient_cql_connection(node1, consistency_level=ConsistencyLevel.ALL)
+        assert_one(session, "SELECT COUNT(*) FROM test.test", [1000])
+
+    @pytest.mark.vnodes
+    def test_counter_leader_with_partial_view(self):
+        """
+        Test leader election with a starting node.
+
+        Testing that nodes do not elect as mutation leader a node with a partial view on the cluster.
+        Note that byteman rules can be syntax checked via the following command:
+            sh ./bin/bytemancheck.sh -cp ~/path_to/apache-cassandra-3.0.14-SNAPSHOT.jar ~/path_to/rule.btm
+
+        @jira_ticket CASSANDRA-13043
+        """
+        cluster = self.cluster
+
+        cluster.populate(3, use_vnodes=True, install_byteman=True)
+        nodes = cluster.nodelist()
+        # Have node 1 and 3 cheat a bit during the leader election for a counter mutation; note that cheating
+        # takes place iff there is an actual chance for node 2 to be picked.
+        if cluster.version() < '4.0':
+            nodes[0].update_startup_byteman_script('./byteman/pre4.0/election_counter_leader_favor_node2.btm')
+            nodes[2].update_startup_byteman_script('./byteman/pre4.0/election_counter_leader_favor_node2.btm')
+        else:
+            nodes[0].update_startup_byteman_script('./byteman/4.0/election_counter_leader_favor_node2.btm')
+            nodes[2].update_startup_byteman_script('./byteman/4.0/election_counter_leader_favor_node2.btm')
+
+        cluster.start(wait_for_binary_proto=True)
+        session = self.patient_cql_connection(nodes[0])
+        create_ks(session, 'ks', 3)
+        create_cf(session, 'cf', validation="CounterColumnType", columns={'c': 'counter'})
+
+        # Now stop node 2 and restart it, but first install a rule to slow down how fast node 2 will update
+        # the list of nodes that are alive
+        nodes[1].stop(wait=True, wait_other_notice=False)
+        nodes[1].update_startup_byteman_script('./byteman/gossip_alive_callback_sleep.btm')
+        nodes[1].start(no_wait=True, wait_other_notice=False)
+
+        # Until node 2 is fully alive try to force other nodes to pick it as mutation leader.
+        # If CASSANDRA-13043 is fixed, they will not. Otherwise they will, but since we are slowing down how
+        # fast node 2 updates the list of nodes that are alive, it will just have a partial view on the cluster
+        # and thus will raise an 'UnavailableException' exception.
+        nb_attempts = 50000
+        for i in range(0, nb_attempts):
+            # Change the name of the counter for the sake of randomization
+            q = SimpleStatement(
+                query_string="UPDATE ks.cf SET c = c + 1 WHERE key = 'counter_%d'" % i,
+                consistency_level=ConsistencyLevel.QUORUM
+            )
+            session.execute(q)
+
+    def test_simple_increment(self):
+        """ Increment 10 counters 50 times at QUORUM, rotating over the nodes, and verify every value after each round (Created for #3465, that wasn't a bug) """
+        cluster = self.cluster
+
+        cluster.populate(3).start()
+        nodes = cluster.nodelist()
+
+        session = self.patient_cql_connection(nodes[0])
+        create_ks(session, 'ks', 3)
+        create_cf(session, 'cf', validation="CounterColumnType", columns={'c': 'counter'})
+
+        sessions = [self.patient_cql_connection(node, 'ks') for node in nodes]
+        nb_increment = 50
+        nb_counter = 10
+
+        for i in range(0, nb_increment):
+            for c in range(0, nb_counter):
+                session = sessions[(i + c) % len(nodes)]
+                query = SimpleStatement("UPDATE cf SET c = c + 1 WHERE key = 'counter%i'" % c, consistency_level=ConsistencyLevel.QUORUM)
+                session.execute(query)
+
+            session = sessions[i % len(nodes)]
+            keys = ",".join(["'counter%i'" % c for c in range(0, nb_counter)])
+            query = SimpleStatement("SELECT key, c FROM cf WHERE key IN (%s)" % keys, consistency_level=ConsistencyLevel.QUORUM)
+            res = list(session.execute(query))
+
+            assert_length_equal(res, nb_counter)
+            for c in range(0, nb_counter):
+                assert len(res[c]) == 2, "Expecting key and counter for counter {}, got {}".format(c, str(res[c]))
+                assert res[c][1] == i + 1, "Expecting counter {} = {}, got {}".format(c, i + 1, res[c][1])
+
+    def test_upgrade(self):
+        """ Counter values must be exact after batched updates and survive rolling restarts (Test for bug of #4436) """
+        cluster = self.cluster
+
+        cluster.populate(2).start()
+        nodes = cluster.nodelist()
+
+        session = self.patient_cql_connection(nodes[0])
+        create_ks(session, 'ks', 2)
+
+        query = """
+            CREATE TABLE counterTable (
+                k int PRIMARY KEY,
+                c counter
+            )
+        """
+        query = query + "WITH compression = { 'sstable_compression' : 'SnappyCompressor' }"
+
+        session.execute(query)
+        time.sleep(2)
+
+        keys = list(range(0, 4))
+        updates = 50
+
+        def make_updates():
+            session = self.patient_cql_connection(nodes[0], keyspace='ks')
+            upd = "UPDATE counterTable SET c = c + 1 WHERE k = %d;"
+            batch = " ".join(["BEGIN COUNTER BATCH"] + [upd % x for x in keys] + ["APPLY BATCH;"])
+
+            for i in range(0, updates):
+                query = SimpleStatement(batch, consistency_level=ConsistencyLevel.QUORUM)
+                session.execute(query)
+
+        def check(i):
+            session = self.patient_cql_connection(nodes[0], keyspace='ks')
+            query = SimpleStatement("SELECT * FROM counterTable", consistency_level=ConsistencyLevel.QUORUM)
+            rows = list(session.execute(query))
+
+            assert len(rows) == len(keys), "Expected {} rows, got {}: {}".format(len(keys), len(rows), str(rows))
+            for row in rows:
+                assert row[1] == i * updates, "Unexpected value {}".format(str(row))
+
+        def rolling_restart():
+            # Drain, stop and restart each of the two nodes in turn
+            for i in range(0, 2):
+                time.sleep(.2)
+                nodes[i].nodetool("drain")
+                nodes[i].stop(wait_other_notice=False)
+                nodes[i].start(wait_other_notice=True, wait_for_binary_proto=True)
+                time.sleep(.2)
+
+        make_updates()
+        check(1)
+        rolling_restart()
+
+        make_updates()
+        check(2)
+        rolling_restart()
+
+        make_updates()
+        check(3)
+        rolling_restart()
+
+        check(3)
+
+    def test_counter_consistency(self):
+        """
+        Do a bunch of writes with ONE, read back with ALL and check results.
+        """
+        cluster = self.cluster
+        cluster.populate(3).start()
+        node1, node2, node3 = cluster.nodelist()
+        session = self.patient_cql_connection(node1)
+        create_ks(session, 'counter_tests', 3)
+
+        stmt = """
+              CREATE TABLE counter_table (
+              id uuid PRIMARY KEY,
+              counter_one COUNTER,
+              counter_two COUNTER,
+              )
+           """
+        session.execute(stmt)
+
+        counters = []
+        # establish 50 counters (2 counter columns x 25 rows), each incremented once so it starts at 1
+        for i in range(25):
+            _id = str(uuid.uuid4())
+            counters.append(
+                {_id: {'counter_one': 1, 'counter_two': 1}}
+            )
+
+            query = SimpleStatement("""
+                UPDATE counter_table
+                SET counter_one = counter_one + 1, counter_two = counter_two + 1
+                where id = {uuid}""".format(uuid=_id), consistency_level=ConsistencyLevel.ONE)
+            session.execute(query)
+
+        # increment a bunch of counters with CL.ONE
+        for i in range(10000):
+            counter = counters[random.randint(0, len(counters) - 1)]
+            counter_id = list(counter.keys())[0]
+
+            query = SimpleStatement("""
+                UPDATE counter_table
+                SET counter_one = counter_one + 2
+                where id = {uuid}""".format(uuid=counter_id), consistency_level=ConsistencyLevel.ONE)
+            session.execute(query)
+
+            query = SimpleStatement("""
+                UPDATE counter_table
+                SET counter_two = counter_two + 10
+                where id = {uuid}""".format(uuid=counter_id), consistency_level=ConsistencyLevel.ONE)
+            session.execute(query)
+
+            query = SimpleStatement("""
+                UPDATE counter_table
+                SET counter_one = counter_one - 1
+                where id = {uuid}""".format(uuid=counter_id), consistency_level=ConsistencyLevel.ONE)
+            session.execute(query)
+
+            query = SimpleStatement("""
+                UPDATE counter_table
+                SET counter_two = counter_two - 5
+                where id = {uuid}""".format(uuid=counter_id), consistency_level=ConsistencyLevel.ONE)
+            session.execute(query)
+
+            # update expectations to match (assumed) db state: net +1 (2 - 1) and net +5 (10 - 5)
+            counter[counter_id]['counter_one'] += 1
+            counter[counter_id]['counter_two'] += 5
+
+        # let's verify the counts are correct, using CL.ALL
+        for counter_dict in counters:
+            counter_id = list(counter_dict.keys())[0]
+
+            query = SimpleStatement("""
+                SELECT counter_one, counter_two
+                FROM counter_table WHERE id = {uuid}
+                """.format(uuid=counter_id), consistency_level=ConsistencyLevel.ALL)
+            rows = list(session.execute(query))
+
+            counter_one_actual, counter_two_actual = rows[0]
+
+            assert counter_one_actual == counter_dict[counter_id]['counter_one']
+            assert counter_two_actual == counter_dict[counter_id]['counter_two']
+
+    def test_multi_counter_update(self):
+        """
+        Test single UPDATE statements against several counters of one partition.
+
+        Five rows under id 'foo' are incremented by 1..5, one statement per row,
+        and each counter is then read back on its own.
+        """
+        cluster = self.cluster
+        cluster.populate(3).start()
+        node1, node2, node3 = cluster.nodelist()
+        session = self.patient_cql_connection(node1)
+        create_ks(session, 'counter_tests', 3)
+
+        session.execute("""
+            CREATE TABLE counter_table (
+            id text,
+            myuuid uuid,
+            counter_one COUNTER,
+            PRIMARY KEY (id, myuuid))
+            """)
+
+        # clustering key -> amount its counter is bumped by, and so its expected value
+        expected_counts = {
+            uuid.uuid4(): amount for amount in range(1, 6)
+        }
+
+        for row_uuid, amount in expected_counts.items():
+            session.execute("""
+                UPDATE counter_table set counter_one = counter_one + {v}
+                WHERE id='foo' and myuuid = {k}
+                """.format(k=row_uuid, v=amount))
+
+        for row_uuid, amount in expected_counts.items():
+            rows = list(session.execute("""
+                SELECT counter_one FROM counter_table
+                WHERE id = 'foo' and myuuid = {k}
+                """.format(k=row_uuid)))
+
+            assert amount == rows[0][0]
+
+    @since("2.0", max_version="3.X")
+    def test_validate_empty_column_name(self):
+        """Counter updates on a compact table are rejected when the clustering value is empty."""
+        cluster = self.cluster
+        cluster.populate(1).start()
+        session = self.patient_cql_connection(cluster.nodelist()[0])
+        create_ks(session, 'counter_tests', 1)
+
+        session.execute("""
+            CREATE TABLE compact_counter_table (
+                pk int,
+                ck text,
+                value counter,
+                PRIMARY KEY (pk, ck))
+            WITH COMPACT STORAGE
+            """)
+        # an empty clustering value must be refused for increments and decrements alike
+        assert_invalid(session, "UPDATE compact_counter_table SET value = value + 1 WHERE pk = 0 AND ck = ''")
+        assert_invalid(session, "UPDATE compact_counter_table SET value = value - 1 WHERE pk = 0 AND ck = ''")
+        # a non-empty clustering value is accepted; +5 then -2 nets 3
+        session.execute("UPDATE compact_counter_table SET value = value + 5 WHERE pk = 0 AND ck = 'ck'")
+        session.execute("UPDATE compact_counter_table SET value = value - 2 WHERE pk = 0 AND ck = 'ck'")
+
+        assert_one(session, "SELECT pk, ck, value FROM compact_counter_table", [0, 'ck', 3])
+
+    @since('2.0')
+    def test_drop_counter_column(self):
+        """Test for CASSANDRA-7831: a dropped counter column must not be re-addable."""
+        cluster = self.cluster
+        cluster.populate(1).start()
+        (node1,) = cluster.nodelist()
+        session = self.patient_cql_connection(node1)
+        create_ks(session, 'counter_tests', 1)
+
+        session.execute("CREATE TABLE counter_bug (t int, c counter, primary key(t))")
+
+        session.execute("UPDATE counter_bug SET c = c + 1 where t = 1")
+        rows = list(session.execute("SELECT * from counter_bug"))
+        # check the count first: an empty result then fails the assert instead of raising IndexError
+        assert len(rows) == 1
+        assert rows_to_list(rows)[0] == [1, 1]
+
+        session.execute("ALTER TABLE counter_bug drop c")
+
+        assert_invalid(session, "ALTER TABLE counter_bug add c counter", "Cannot re-add previously dropped counter column c")
+
+    @since("2.0", max_version="3.X")  # Compact Storage
+    def test_compact_counter_cluster(self):
+        """
+        Five increments on each of five compact-storage counters must read back as 5.
+        @jira_ticket CASSANDRA-12219
+        This test will fail on 3.0.0 - 3.0.8, and 3.1 - 3.8
+        """
+        cluster = self.cluster
+        cluster.populate(3).start()
+        session = self.patient_cql_connection(cluster.nodelist()[0])
+        create_ks(session, 'counter_tests', 1)
+
+        session.execute("""
+            CREATE TABLE IF NOT EXISTS counter_cs (
+                key bigint PRIMARY KEY,
+                data counter
+            ) WITH COMPACT STORAGE
+            """)
+
+        for _ in range(5):
+            for key in range(5):
+                session.execute("UPDATE counter_cs SET data = data + 1 WHERE key = {k}".format(k=key))
+
+        for key in range(5):
+            rows = list(session.execute("SELECT data from counter_cs where key = {k}".format(k=key)))
+            assert rows_to_list(rows)[0][0] == 5

http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/49b2dda4/counter_tests.py
----------------------------------------------------------------------
diff --git a/counter_tests.py b/counter_tests.py
deleted file mode 100644
index 1de495d..0000000
--- a/counter_tests.py
+++ /dev/null
@@ -1,414 +0,0 @@
-import random
-import time
-import uuid
-
-from cassandra import ConsistencyLevel
-from cassandra.query import SimpleStatement
-
-from tools.assertions import assert_invalid, assert_length_equal, assert_one
-from dtest import Tester, create_ks, create_cf
-from tools.data import rows_to_list
-from tools.decorators import since
-
-
-class TestCounters(Tester):
-
-    @since('3.0', max_version='3.12')
-    def test_13691(self):
-        """
-        2.0 -> 2.1 -> 3.0 counters upgrade test
-        @jira_ticket CASSANDRA-13691
-        """
-        cluster = self.cluster
-        default_install_dir = cluster.get_install_dir()
-
-        #
-        # set up a 2.0 cluster with 3 nodes and set up schema
-        #
-
-        cluster.set_install_dir(version='2.0.17')
-        cluster.populate(3)
-        cluster.start()
-
-        node1, node2, node3 = cluster.nodelist()
-
-        session = self.patient_cql_connection(node1)
-        session.execute("""
-            CREATE KEYSPACE test
-                WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 3};
-            """)
-        session.execute("CREATE TABLE test.test (id int PRIMARY KEY, c counter);")
-
-        #
-        # generate some 2.0 counter columns with local shards
-        #
-
-        query = "UPDATE test.test SET c = c + 1 WHERE id = ?"
-        prepared = session.prepare(query)
-        for i in range(0, 1000):
-            session.execute(prepared, [i])
-
-        cluster.flush()
-        cluster.stop()
-
-        #
-        # upgrade cluster to 2.1
-        #
-
-        cluster.set_install_dir(version='2.1.17')
-        cluster.start()
-        cluster.nodetool("upgradesstables")
-
-        #
-        # upgrade node3 to current (3.0.x or 3.11.x)
-        #
-
-        node3.stop(wait_other_notice=True)
-        node3.set_install_dir(install_dir=default_install_dir)
-        node3.start(wait_other_notice=True)
-
-        #
-        # with a 2.1 coordinator, try to read the table with CL.ALL
-        #
-
-        session = self.patient_cql_connection(node1, consistency_level=ConsistencyLevel.ALL)
-        assert_one(session, "SELECT COUNT(*) FROM test.test", [1000])
-
-    def counter_leader_with_partial_view_test(self):
-        """
-        Test leader election with a starting node.
-
-        Testing that nodes do not elect as mutation leader a node with a partial view on the cluster.
-        Note that byteman rules can be syntax checked via the following command:
-            sh ./bin/bytemancheck.sh -cp ~/path_to/apache-cassandra-3.0.14-SNAPSHOT.jar ~/path_to/rule.btm
-
-        @jira_ticket CASSANDRA-13043
-        """
-        cluster = self.cluster
-
-        cluster.populate(3, use_vnodes=True, install_byteman=True)
-        nodes = cluster.nodelist()
-        # Have node 1 and 3 cheat a bit during the leader election for a counter mutation; note that cheating
-        # takes place iff there is an actual chance for node 2 to be picked.
-        if cluster.version() < '4.0':
-            nodes[0].update_startup_byteman_script('./byteman/pre4.0/election_counter_leader_favor_node2.btm')
-            nodes[2].update_startup_byteman_script('./byteman/pre4.0/election_counter_leader_favor_node2.btm')
-        else:
-            nodes[0].update_startup_byteman_script('./byteman/4.0/election_counter_leader_favor_node2.btm')
-            nodes[2].update_startup_byteman_script('./byteman/4.0/election_counter_leader_favor_node2.btm')
-
-        cluster.start(wait_for_binary_proto=True)
-        session = self.patient_cql_connection(nodes[0])
-        create_ks(session, 'ks', 3)
-        create_cf(session, 'cf', validation="CounterColumnType", columns={'c': 'counter'})
-
-        # Now stop the node and restart but first install a rule to slow down how fast node 2 will update the list
-        # nodes that are alive
-        nodes[1].stop(wait=True, wait_other_notice=False)
-        nodes[1].update_startup_byteman_script('./byteman/gossip_alive_callback_sleep.btm')
-        nodes[1].start(no_wait=True, wait_other_notice=False)
-
-        # Until node 2 is fully alive try to force other nodes to pick him as mutation leader.
-        # If CASSANDRA-13043 is fixed, they will not. Otherwise they will do, but since we are slowing down how
-        # fast node 2 updates the list of nodes that are alive, it will just have a partial view on the cluster
-        # and thus will raise an 'UnavailableException' exception.
-        nb_attempts = 50000
-        for i in xrange(0, nb_attempts):
-            # Change the name of the counter for the sake of randomization
-            q = SimpleStatement(
-                query_string="UPDATE ks.cf SET c = c + 1 WHERE key = 'counter_%d'" % i,
-                consistency_level=ConsistencyLevel.QUORUM
-            )
-            session.execute(q)
-
-    def simple_increment_test(self):
-        """ Simple incrementation test (Created for #3465, that wasn't a bug) """
-        cluster = self.cluster
-
-        cluster.populate(3).start()
-        nodes = cluster.nodelist()
-
-        session = self.patient_cql_connection(nodes[0])
-        create_ks(session, 'ks', 3)
-        create_cf(session, 'cf', validation="CounterColumnType", columns={'c': 'counter'})
-
-        sessions = [self.patient_cql_connection(node, 'ks') for node in nodes]
-        nb_increment = 50
-        nb_counter = 10
-
-        for i in xrange(0, nb_increment):
-            for c in xrange(0, nb_counter):
-                session = sessions[(i + c) % len(nodes)]
-                query = SimpleStatement("UPDATE cf SET c = c + 1 WHERE key = 'counter%i'" % c, consistency_level=ConsistencyLevel.QUORUM)
-                session.execute(query)
-
-            session = sessions[i % len(nodes)]
-            keys = ",".join(["'counter%i'" % c for c in xrange(0, nb_counter)])
-            query = SimpleStatement("SELECT key, c FROM cf WHERE key IN (%s)" % keys, consistency_level=ConsistencyLevel.QUORUM)
-            res = list(session.execute(query))
-
-            assert_length_equal(res, nb_counter)
-            for c in xrange(0, nb_counter):
-                self.assertEqual(len(res[c]), 2, "Expecting key and counter for counter {}, got {}".format(c, str(res[c])))
-                self.assertEqual(res[c][1], i + 1, "Expecting counter {} = {}, got {}".format(c, i + 1, res[c][0]))
-
-    def upgrade_test(self):
-        """ Test for bug of #4436 """
-
-        cluster = self.cluster
-
-        cluster.populate(2).start()
-        nodes = cluster.nodelist()
-
-        session = self.patient_cql_connection(nodes[0])
-        create_ks(session, 'ks', 2)
-
-        query = """
-            CREATE TABLE counterTable (
-                k int PRIMARY KEY,
-                c counter
-            )
-        """
-        query = query + "WITH compression = { 'sstable_compression' : 'SnappyCompressor' }"
-
-        session.execute(query)
-        time.sleep(2)
-
-        keys = range(0, 4)
-        updates = 50
-
-        def make_updates():
-            session = self.patient_cql_connection(nodes[0], keyspace='ks')
-            upd = "UPDATE counterTable SET c = c + 1 WHERE k = %d;"
-            batch = " ".join(["BEGIN COUNTER BATCH"] + [upd % x for x in keys] + ["APPLY BATCH;"])
-
-            for i in range(0, updates):
-                query = SimpleStatement(batch, consistency_level=ConsistencyLevel.QUORUM)
-                session.execute(query)
-
-        def check(i):
-            session = self.patient_cql_connection(nodes[0], keyspace='ks')
-            query = SimpleStatement("SELECT * FROM counterTable", consistency_level=ConsistencyLevel.QUORUM)
-            rows = list(session.execute(query))
-
-            self.assertEqual(len(rows), len(keys), "Expected {} rows, got {}: {}".format(len(keys), len(rows), str(rows)))
-            for row in rows:
-                self.assertEqual(row[1], i * updates, "Unexpected value {}".format(str(row)))
-
-        def rolling_restart():
-            # Rolling restart
-            for i in range(0, 2):
-                time.sleep(.2)
-                nodes[i].nodetool("drain")
-                nodes[i].stop(wait_other_notice=False)
-                nodes[i].start(wait_other_notice=True, wait_for_binary_proto=True)
-                time.sleep(.2)
-
-        make_updates()
-        check(1)
-        rolling_restart()
-
-        make_updates()
-        check(2)
-        rolling_restart()
-
-        make_updates()
-        check(3)
-        rolling_restart()
-
-        check(3)
-
-    def counter_consistency_test(self):
-        """
-        Do a bunch of writes with ONE, read back with ALL and check results.
-        """
-        cluster = self.cluster
-        cluster.populate(3).start()
-        node1, node2, node3 = cluster.nodelist()
-        session = self.patient_cql_connection(node1)
-        create_ks(session, 'counter_tests', 3)
-
-        stmt = """
-              CREATE TABLE counter_table (
-              id uuid PRIMARY KEY,
-              counter_one COUNTER,
-              counter_two COUNTER,
-              )
-           """
-        session.execute(stmt)
-
-        counters = []
-        # establish 50 counters (2x25 rows)
-        for i in xrange(25):
-            _id = str(uuid.uuid4())
-            counters.append(
-                {_id: {'counter_one': 1, 'counter_two': 1}}
-            )
-
-            query = SimpleStatement("""
-                UPDATE counter_table
-                SET counter_one = counter_one + 1, counter_two = counter_two + 1
-                where id = {uuid}""".format(uuid=_id), consistency_level=ConsistencyLevel.ONE)
-            session.execute(query)
-
-        # increment a bunch of counters with CL.ONE
-        for i in xrange(10000):
-            counter = counters[random.randint(0, len(counters) - 1)]
-            counter_id = counter.keys()[0]
-
-            query = SimpleStatement("""
-                UPDATE counter_table
-                SET counter_one = counter_one + 2
-                where id = {uuid}""".format(uuid=counter_id), consistency_level=ConsistencyLevel.ONE)
-            session.execute(query)
-
-            query = SimpleStatement("""
-                UPDATE counter_table
-                SET counter_two = counter_two + 10
-                where id = {uuid}""".format(uuid=counter_id), consistency_level=ConsistencyLevel.ONE)
-            session.execute(query)
-
-            query = SimpleStatement("""
-                UPDATE counter_table
-                SET counter_one = counter_one - 1
-                where id = {uuid}""".format(uuid=counter_id), consistency_level=ConsistencyLevel.ONE)
-            session.execute(query)
-
-            query = SimpleStatement("""
-                UPDATE counter_table
-                SET counter_two = counter_two - 5
-                where id = {uuid}""".format(uuid=counter_id), consistency_level=ConsistencyLevel.ONE)
-            session.execute(query)
-
-            # update expectations to match (assumed) db state
-            counter[counter_id]['counter_one'] += 1
-            counter[counter_id]['counter_two'] += 5
-
-        # let's verify the counts are correct, using CL.ALL
-        for counter_dict in counters:
-            counter_id = counter_dict.keys()[0]
-
-            query = SimpleStatement("""
-                SELECT counter_one, counter_two
-                FROM counter_table WHERE id = {uuid}
-                """.format(uuid=counter_id), consistency_level=ConsistencyLevel.ALL)
-            rows = list(session.execute(query))
-
-            counter_one_actual, counter_two_actual = rows[0]
-
-            self.assertEqual(counter_one_actual, counter_dict[counter_id]['counter_one'])
-            self.assertEqual(counter_two_actual, counter_dict[counter_id]['counter_two'])
-
-    def multi_counter_update_test(self):
-        """
-        Test for singlular update statements that will affect multiple counters.
-        """
-        cluster = self.cluster
-        cluster.populate(3).start()
-        node1, node2, node3 = cluster.nodelist()
-        session = self.patient_cql_connection(node1)
-        create_ks(session, 'counter_tests', 3)
-
-        session.execute("""
-            CREATE TABLE counter_table (
-            id text,
-            myuuid uuid,
-            counter_one COUNTER,
-            PRIMARY KEY (id, myuuid))
-            """)
-
-        expected_counts = {}
-
-        # set up expectations
-        for i in range(1, 6):
-            _id = uuid.uuid4()
-
-            expected_counts[_id] = i
-
-        for k, v in expected_counts.items():
-            session.execute("""
-                UPDATE counter_table set counter_one = counter_one + {v}
-                WHERE id='foo' and myuuid = {k}
-                """.format(k=k, v=v))
-
-        for k, v in expected_counts.items():
-            count = list(session.execute("""
-                SELECT counter_one FROM counter_table
-                WHERE id = 'foo' and myuuid = {k}
-                """.format(k=k)))
-
-            self.assertEqual(v, count[0][0])
-
-    @since("2.0", max_version="3.X")
-    def validate_empty_column_name_test(self):
-        cluster = self.cluster
-        cluster.populate(1).start()
-        node1 = cluster.nodelist()[0]
-        session = self.patient_cql_connection(node1)
-        create_ks(session, 'counter_tests', 1)
-
-        session.execute("""
-            CREATE TABLE compact_counter_table (
-                pk int,
-                ck text,
-                value counter,
-                PRIMARY KEY (pk, ck))
-            WITH COMPACT STORAGE
-            """)
-
-        assert_invalid(session, "UPDATE compact_counter_table SET value = value + 1 WHERE pk = 0 AND ck = ''")
-        assert_invalid(session, "UPDATE compact_counter_table SET value = value - 1 WHERE pk = 0 AND ck = ''")
-
-        session.execute("UPDATE compact_counter_table SET value = value + 5 WHERE pk = 0 AND ck = 'ck'")
-        session.execute("UPDATE compact_counter_table SET value = value - 2 WHERE pk = 0 AND ck = 'ck'")
-
-        assert_one(session, "SELECT pk, ck, value FROM compact_counter_table", [0, 'ck', 3])
-
-    @since('2.0')
-    def drop_counter_column_test(self):
-        """Test for CASSANDRA-7831"""
-        cluster = self.cluster
-        cluster.populate(1).start()
-        node1, = cluster.nodelist()
-        session = self.patient_cql_connection(node1)
-        create_ks(session, 'counter_tests', 1)
-
-        session.execute("CREATE TABLE counter_bug (t int, c counter, primary key(t))")
-
-        session.execute("UPDATE counter_bug SET c = c + 1 where t = 1")
-        row = list(session.execute("SELECT * from counter_bug"))
-
-        self.assertEqual(rows_to_list(row)[0], [1, 1])
-        self.assertEqual(len(row), 1)
-
-        session.execute("ALTER TABLE counter_bug drop c")
-
-        assert_invalid(session, "ALTER TABLE counter_bug add c counter", "Cannot re-add previously dropped counter column c")
-
-    @since("2.0", max_version="3.X")  # Compact Storage
-    def compact_counter_cluster_test(self):
-        """
-        @jira_ticket CASSANDRA-12219
-        This test will fail on 3.0.0 - 3.0.8, and 3.1 - 3.8
-        """
-
-        cluster = self.cluster
-        cluster.populate(3).start()
-        node1 = cluster.nodelist()[0]
-        session = self.patient_cql_connection(node1)
-        create_ks(session, 'counter_tests', 1)
-
-        session.execute("""
-            CREATE TABLE IF NOT EXISTS counter_cs (
-                key bigint PRIMARY KEY,
-                data counter
-            ) WITH COMPACT STORAGE
-            """)
-
-        for outer in range(0, 5):
-            for idx in range(0, 5):
-                session.execute("UPDATE counter_cs SET data = data + 1 WHERE key = {k}".format(k=idx))
-
-        for idx in range(0, 5):
-            row = list(session.execute("SELECT data from counter_cs where key = {k}".format(k=idx)))
-            self.assertEqual(rows_to_list(row)[0][0], 5)

http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/49b2dda4/cql_prepared_test.py
----------------------------------------------------------------------
diff --git a/cql_prepared_test.py b/cql_prepared_test.py
index 0dfe6f0..c039b90 100644
--- a/cql_prepared_test.py
+++ b/cql_prepared_test.py
@@ -1,7 +1,11 @@
 import time
+import pytest
+import logging
 
 from dtest import Tester, create_ks
-from tools.decorators import since
+
+since = pytest.mark.since
+logger = logging.getLogger(__name__)
 
 
 @since("1.2")
@@ -18,7 +22,7 @@ class TestCQL(Tester):
         create_ks(session, 'ks', 1)
         return session
 
-    def batch_preparation_test(self):
+    def test_batch_preparation(self):
         """ Test preparation of batch statement (#4202) """
         session = self.prepare()
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org