You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by aw...@apache.org on 2018/01/29 21:10:22 UTC

[19/36] cassandra-dtest git commit: Migrate dtests to use pytest and python3

http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/49b2dda4/replication_test.py
----------------------------------------------------------------------
diff --git a/replication_test.py b/replication_test.py
index edc6fc0..8c29074 100644
--- a/replication_test.py
+++ b/replication_test.py
@@ -1,14 +1,18 @@
 import os
 import re
 import time
+import pytest
+import logging
+
 from collections import defaultdict
 
 from cassandra import ConsistencyLevel
 from cassandra.query import SimpleStatement
-from nose.plugins.attrib import attr
 
-from dtest import PRINT_DEBUG, DtestTimeoutError, Tester, debug, create_ks
-from tools.decorators import no_vnodes, since
+from dtest import DtestTimeoutError, Tester, create_ks
+
+since = pytest.mark.since
+logger = logging.getLogger(__name__)
 
 TRACE_DETERMINE_REPLICAS = re.compile('Determining replicas for mutation')
 TRACE_SEND_MESSAGE = re.compile('Sending (?:MUTATION|REQUEST_RESPONSE) message to /([0-9]+\.[0-9]+\.[0-9]+\.[0-9]+)')
@@ -73,8 +77,8 @@ def block_on_trace(session):
             raise DtestTimeoutError()
 
 
-@no_vnodes()
-class ReplicationTest(Tester):
+@pytest.mark.no_vnodes
+class TestReplication(Tester):
     """
     This test suite looks at how data is replicated across a cluster
     and who the coordinator, replicas and forwarders involved are.
@@ -176,7 +180,7 @@ class ReplicationTest(Tester):
         elif strategy == 'NetworkTopologyStrategy':
             # NetworkTopologyStrategy can be broken down into multiple
             # SimpleStrategies, just once per datacenter:
-            for dc, rf in replication_factor.items():
+            for dc, rf in list(replication_factor.items()):
                 dc_nodes = [n for n in nodes if n.data_center == dc]
                 replicas.extend(self.get_replicas_for_token(
                     token, rf, nodes=dc_nodes))
@@ -190,13 +194,13 @@ class ReplicationTest(Tester):
         """
         Pretty print a trace
         """
-        if PRINT_DEBUG:
-            print("-" * 40)
+        if logging.root.level == logging.DEBUG:
+            print(("-" * 40))
             for t in trace.events:
-                print("%s\t%s\t%s\t%s" % (t.source, t.source_elapsed, t.description, t.thread_name))
-            print("-" * 40)
+                print(("%s\t%s\t%s\t%s" % (t.source, t.source_elapsed, t.description, t.thread_name)))
+            print(("-" * 40))
 
-    def simple_test(self):
+    def test_simple(self):
         """
         Test the SimpleStrategy on a 3 node cluster
         """
@@ -209,8 +213,8 @@ class ReplicationTest(Tester):
         create_ks(session, 'test', replication_factor)
         session.execute('CREATE TABLE test.test (id int PRIMARY KEY, value text)', trace=False)
 
-        for key, token in murmur3_hashes.items():
-            debug('murmur3 hash key={key},token={token}'.format(key=key, token=token))
+        for key, token in list(murmur3_hashes.items()):
+            logger.debug('murmur3 hash key={key},token={token}'.format(key=key, token=token))
             query = SimpleStatement("INSERT INTO test (id, value) VALUES ({}, 'asdf')".format(key), consistency_level=ConsistencyLevel.ALL)
             future = session.execute_async(query, trace=True)
             future.result()
@@ -222,17 +226,17 @@ class ReplicationTest(Tester):
             stats = self.get_replicas_from_trace(trace)
             replicas_should_be = set(self.get_replicas_for_token(
                 token, replication_factor))
-            debug('\nreplicas should be: %s' % replicas_should_be)
-            debug('replicas were: %s' % stats['replicas'])
+            logger.debug('\nreplicas should be: %s' % replicas_should_be)
+            logger.debug('replicas were: %s' % stats['replicas'])
 
             # Make sure the correct nodes are replicas:
-            self.assertEqual(stats['replicas'], replicas_should_be)
+            assert stats['replicas'] == replicas_should_be
             # Make sure that each replica node was contacted and
             # acknowledged the write:
-            self.assertEqual(stats['nodes_sent_write'], stats['nodes_responded_write'])
+            assert stats['nodes_sent_write'] == stats['nodes_responded_write']
 
-    @attr("resource-intensive")
-    def network_topology_test(self):
+    @pytest.mark.resource_intensive
+    def test_network_topology(self):
         """
         Test the NetworkTopologyStrategy on a 2DC 3:3 node cluster
         """
@@ -248,7 +252,7 @@ class ReplicationTest(Tester):
 
         forwarders_used = set()
 
-        for key, token in murmur3_hashes.items():
+        for key, token in list(murmur3_hashes.items()):
             query = SimpleStatement("INSERT INTO test (id, value) VALUES ({}, 'asdf')".format(key), consistency_level=ConsistencyLevel.ALL)
             future = session.execute_async(query, trace=True)
             future.result()
@@ -260,9 +264,9 @@ class ReplicationTest(Tester):
             stats = self.get_replicas_from_trace(trace)
             replicas_should_be = set(self.get_replicas_for_token(
                 token, replication_factor, strategy='NetworkTopologyStrategy'))
-            debug('Current token is %s' % token)
-            debug('\nreplicas should be: %s' % replicas_should_be)
-            debug('replicas were: %s' % stats['replicas'])
+            logger.debug('Current token is %s' % token)
+            logger.debug('\nreplicas should be: %s' % replicas_should_be)
+            logger.debug('replicas were: %s' % stats['replicas'])
 
             # Make sure the coordinator only talked to a single node in
             # the second datacenter - CASSANDRA-5632:
@@ -270,27 +274,27 @@ class ReplicationTest(Tester):
             for node_contacted in stats['nodes_contacted'][node1.address()]:
                 if ip_nodes[node_contacted].data_center != node1.data_center:
                     num_in_other_dcs_contacted += 1
-            self.assertEqual(num_in_other_dcs_contacted, 1)
+            assert num_in_other_dcs_contacted == 1
 
             # Record the forwarder used for each INSERT:
             forwarders_used = forwarders_used.union(stats['forwarders'])
 
             try:
                 # Make sure the correct nodes are replicas:
-                self.assertEqual(stats['replicas'], replicas_should_be)
+                assert stats['replicas'] == replicas_should_be
                 # Make sure that each replica node was contacted and
                 # acknowledged the write:
-                self.assertEqual(stats['nodes_sent_write'], stats['nodes_responded_write'])
+                assert stats['nodes_sent_write'] == stats['nodes_responded_write']
             except AssertionError as e:
-                debug("Failed on key %s and token %s." % (key, token))
+                logger.debug("Failed on key %s and token %s." % (key, token))
                 raise e
 
         # Given a diverse enough keyset, each node in the second
         # datacenter should get a chance to be a forwarder:
-        self.assertEqual(len(forwarders_used), 3)
+        assert len(forwarders_used) == 3
 
 
-class SnitchConfigurationUpdateTest(Tester):
+class TestSnitchConfigurationUpdate(Tester):
     """
     Test to reproduce CASSANDRA-10238, wherein changing snitch properties to change racks without a restart
     could violate RF contract.
@@ -299,27 +303,31 @@ class SnitchConfigurationUpdateTest(Tester):
     which nodes should be shutdown in order to have the rack changed.
     """
 
-    ignore_log_patterns = ["Fatal exception during initialization",
-                           "Cannot start node if snitch's rack(.*) differs from previous rack(.*)",
-                           "Cannot update data center or rack"]
+    @pytest.fixture(autouse=True)
+    def fixture_add_additional_log_patterns(self, fixture_dtest_setup):
+        fixture_dtest_setup.ignore_log_patterns = (
+            "Fatal exception during initialization",
+            "Cannot start node if snitch's rack(.*) differs from previous rack(.*)",
+            "Cannot update data center or rack"
+        )
 
     def check_endpoint_count(self, ks, table, nodes, rf):
         """
         Check a dummy key expecting it to have replication factor as the sum of rf on all dcs.
         """
-        expected_count = sum([int(r) for d, r in rf.iteritems() if d != 'class'])
+        expected_count = sum([int(r) for d, r in rf.items() if d != 'class'])
         for node in nodes:
             cmd = "getendpoints {} {} dummy".format(ks, table)
             out, err, _ = node.nodetool(cmd)
 
             if len(err.strip()) > 0:
-                debug("Error running 'nodetool {}': {}".format(cmd, err))
+                logger.debug("Error running 'nodetool {}': {}".format(cmd, err))
 
-            debug("Endpoints for node {}, expected count is {}".format(node.address(), expected_count))
-            debug(out)
+            logger.debug("Endpoints for node {}, expected count is {}".format(node.address(), expected_count))
+            logger.debug(out)
             ips_found = re.findall('(\d+\.\d+\.\d+\.\d+)', out)
 
-            self.assertEqual(len(ips_found), expected_count, "wrong number of endpoints found ({}), should be: {}".format(len(ips_found), expected_count))
+            assert len(ips_found) == expected_count, "wrong number of endpoints found ({}), should be: {}".format(len(ips_found), expected_count)
 
     def wait_for_nodes_on_racks(self, nodes, expected_racks):
         """
@@ -331,9 +339,9 @@ class SnitchConfigurationUpdateTest(Tester):
             while time.time() < wait_expire:
                 out, err, _ = node.nodetool("status")
 
-                debug(out)
+                logger.debug(out)
                 if len(err.strip()) > 0:
-                    debug("Error trying to run nodetool status: {}".format(err))
+                    logger.debug("Error trying to run nodetool status: {}".format(err))
 
                 racks = []
                 for line in out.split(os.linesep):
@@ -343,10 +351,10 @@ class SnitchConfigurationUpdateTest(Tester):
 
                 if racks == expected_racks:
                     # great, the topology change is propagated
-                    debug("Topology change detected on node {}".format(i))
+                    logger.debug("Topology change detected on node {}".format(i))
                     break
                 else:
-                    debug("Waiting for topology change on node {}".format(i))
+                    logger.debug("Waiting for topology change on node {}".format(i))
                     time.sleep(5)
             else:
                 raise RuntimeError("Ran out of time waiting for topology to change on node {}".format(i))
@@ -383,7 +391,7 @@ class SnitchConfigurationUpdateTest(Tester):
                                        final_racks=["rack0", "rack1", "rack2"],
                                        nodes_to_shutdown=[0, 2])
 
-    @attr("resource-intensive")
+    @pytest.mark.resource_intensive
     def test_rf_collapse_gossiping_property_file_snitch_multi_dc(self):
         """
         @jira_ticket CASSANDRA-10238
@@ -400,7 +408,7 @@ class SnitchConfigurationUpdateTest(Tester):
                                        final_racks=["rack1", "rack1", "rack1", "rack1", "rack1", "rack1"],
                                        nodes_to_shutdown=[0, 2, 3, 5])
 
-    @attr("resource-intensive")
+    @pytest.mark.resource_intensive
     def test_rf_expand_gossiping_property_file_snitch_multi_dc(self):
         """
         @jira_ticket CASSANDRA-10238
@@ -537,7 +545,7 @@ class SnitchConfigurationUpdateTest(Tester):
 
         session = self.patient_cql_connection(cluster.nodelist()[0])
 
-        options = (', ').join(['\'{}\': {}'.format(d, r) for d, r in rf.iteritems()])
+        options = (', ').join(['\'{}\': {}'.format(d, r) for d, r in rf.items()])
         session.execute("CREATE KEYSPACE testing WITH replication = {{{}}}".format(options))
         session.execute("CREATE TABLE testing.rf_test (key text PRIMARY KEY, value text)")
 
@@ -548,10 +556,10 @@ class SnitchConfigurationUpdateTest(Tester):
 
         for i in nodes_to_shutdown:
             node = cluster.nodelist()[i]
-            debug("Shutting down node {}".format(node.address()))
+            logger.debug("Shutting down node {}".format(node.address()))
             node.stop(wait_other_notice=True)
 
-        debug("Updating snitch file")
+        logger.debug("Updating snitch file")
         for i, node in enumerate(cluster.nodelist()):
             with open(os.path.join(node.get_conf_dir(), snitch_config_file), 'w') as topo_file:
                 for line in snitch_lines_after(i, node):
@@ -559,12 +567,12 @@ class SnitchConfigurationUpdateTest(Tester):
 
         # wait until the config is reloaded before we restart the nodes, the default check period is
         # 5 seconds so we wait for 10 seconds to be sure
-        debug("Waiting 10 seconds to make sure snitch file is reloaded...")
+        logger.debug("Waiting 10 seconds to make sure snitch file is reloaded...")
         time.sleep(10)
 
         for i in nodes_to_shutdown:
             node = cluster.nodelist()[i]
-            debug("Restarting node {}".format(node.address()))
+            logger.debug("Restarting node {}".format(node.address()))
             # Since CASSANDRA-10242 it is no longer
             # possible to start a node with a different rack unless we specify -Dcassandra.ignore_rack and since
             # CASSANDRA-9474 it is no longer possible to start a node with a different dc unless we specify
@@ -594,24 +602,24 @@ class SnitchConfigurationUpdateTest(Tester):
             for line in ["dc={}".format(node1.data_center), "rack=rack1"]:
                 topo_file.write(line + os.linesep)
 
-        debug("Starting node {} with rack1".format(node1.address()))
+        logger.debug("Starting node {} with rack1".format(node1.address()))
         node1.start(wait_for_binary_proto=True)
 
-        debug("Shutting down node {}".format(node1.address()))
+        logger.debug("Shutting down node {}".format(node1.address()))
         node1.stop(wait_other_notice=True)
 
-        debug("Updating snitch file with rack2")
+        logger.debug("Updating snitch file with rack2")
         for node in cluster.nodelist():
             with open(os.path.join(node.get_conf_dir(), 'cassandra-rackdc.properties'), 'w') as topo_file:
                 for line in ["dc={}".format(node.data_center), "rack=rack2"]:
                     topo_file.write(line + os.linesep)
 
-        debug("Restarting node {} with rack2".format(node1.address()))
+        logger.debug("Restarting node {} with rack2".format(node1.address()))
         mark = node1.mark_log()
         node1.start()
 
         # check node not running
-        debug("Waiting for error message in log file")
+        logger.debug("Waiting for error message in log file")
 
         if cluster.version() >= '2.2':
             node1.watch_log_for("Cannot start node if snitch's rack(.*) differs from previous rack(.*)",
@@ -696,7 +704,7 @@ class SnitchConfigurationUpdateTest(Tester):
 
         marks = [node.mark_log() for node in cluster.nodelist()]
 
-        debug("Updating snitch file")
+        logger.debug("Updating snitch file")
         for node in cluster.nodelist():
             with open(os.path.join(node.get_conf_dir(), snitch_config_file), 'w') as topo_file:
                 for line in snitch_lines_after:
@@ -704,7 +712,7 @@ class SnitchConfigurationUpdateTest(Tester):
 
         # wait until the config is reloaded, the default check period is
         # 5 seconds so we wait for 10 seconds to be sure
-        debug("Waiting 10 seconds to make sure snitch file is reloaded...")
+        logger.debug("Waiting 10 seconds to make sure snitch file is reloaded...")
         time.sleep(10)
 
         # check racks have not changed
@@ -723,7 +731,7 @@ class SnitchConfigurationUpdateTest(Tester):
         """
         expected_error = (r"Cannot start node if snitch's data center (.*) differs from previous data center (.*)\. "
                           "Please fix the snitch configuration, decommission and rebootstrap this node or use the flag -Dcassandra.ignore_dc=true.")
-        self.ignore_log_patterns = [expected_error]
+        self.fixture_dtest_setup.ignore_log_patterns = [expected_error]
 
         cluster = self.cluster
         cluster.populate(1)
@@ -744,4 +752,4 @@ class SnitchConfigurationUpdateTest(Tester):
 
         mark = node.mark_log()
         node.start()
-        node.watch_log_for(expected_error, from_mark=mark, timeout=10)
+        node.watch_log_for(expected_error, from_mark=mark, timeout=120)

http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/49b2dda4/requirements.txt
----------------------------------------------------------------------
diff --git a/requirements.txt b/requirements.txt
index bf46e38..388a8a9 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,21 +1,18 @@
-# See python driver docs: futures and six have to be installed before
-# cythonizing the driver, perhaps only on old pips.
-# http://datastax.github.io/python-driver/installation.html#cython-based-extensions
-futures
-six
 -e git+https://github.com/datastax/python-driver.git@cassandra-test#egg=cassandra-driver
 # Used ccm version is tracked by cassandra-test branch in ccm repo. Please create a PR there for fixes or upgrades to new releases.
--e git+https://github.com/pcmanus/ccm.git@cassandra-test#egg=ccm
-cql
+-e git+https://github.com/riptano/ccm.git@cassandra-test#egg=ccm
+cqlsh
 decorator
 docopt
 enum34
 flaky
 mock
-nose
-nose-test-select
+pytest
+pytest-timeout
 parse
 pycodestyle
 psutil
-pycassa
-thrift==0.9.3
+thrift==0.10.0
+netifaces
+beautifulsoup4
+lxml

http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/49b2dda4/run_dtests.py
----------------------------------------------------------------------
diff --git a/run_dtests.py b/run_dtests.py
index e7165af..198cde2 100755
--- a/run_dtests.py
+++ b/run_dtests.py
@@ -1,264 +1,280 @@
 #!/usr/bin/env python
 """
-Usage: run_dtests.py [--nose-options NOSE_OPTIONS] [TESTS...] [--vnodes VNODES_OPTIONS...]
-                 [--runner-debug | --runner-quiet] [--dry-run]
-
-nosetests options:
-    --nose-options NOSE_OPTIONS  specify options to pass to `nosetests`.
-    TESTS                        space-separated list of tests to pass to `nosetests`
-
-script configuration options:
-    --runner-debug -d            print debug statements in this script
-    --runner-quiet -q            quiet all output from this script
-
-cluster configuration options:
-    --vnodes VNODES_OPTIONS...   specify whether to run with or without vnodes.
-                                 valid values: 'true' and 'false'
-
-example:
-    The following command will execute nosetests with the '-v' (verbose) option, vnodes disabled, and run a single test:
-    ./run_dtests.py --nose-options -v --vnodes false repair_tests/repair_test.py:TestRepair.token_range_repair_test_with_cf
-
+usage: run_dtests.py [-h] [--use-vnodes] [--use-off-heap-memtables] [--num-tokens NUM_TOKENS] [--data-dir-count-per-instance DATA_DIR_COUNT_PER_INSTANCE] [--force-resource-intensive-tests]
+                     [--skip-resource-intensive-tests] [--cassandra-dir CASSANDRA_DIR] [--cassandra-version CASSANDRA_VERSION] [--delete-logs] [--execute-upgrade-tests] [--disable-active-log-watching]
+                     [--keep-test-dir] [--enable-jacoco-code-coverage] [--dtest-enable-debug-logging] [--dtest-print-tests-only] [--dtest-print-tests-output DTEST_PRINT_TESTS_OUTPUT]
+                     [--pytest-options PYTEST_OPTIONS] [--dtest-tests DTEST_TESTS]
+
+optional arguments:
+  -h, --help                                                 show this help message and exit
+  --use-vnodes                                               Determines whether or not to set up clusters using vnodes for tests (default: False)
+  --use-off-heap-memtables                                   Enable Off Heap Memtables when creating test clusters for tests (default: False)
+  --num-tokens NUM_TOKENS                                    Number of tokens to set num_tokens yaml setting to when creating instances with vnodes enabled (default: 256)
+  --data-dir-count-per-instance DATA_DIR_COUNT_PER_INSTANCE  Control the number of data directories to create per instance (default: 3)
+  --force-resource-intensive-tests                           Forces the execution of tests marked as resource_intensive (default: False)
+  --skip-resource-intensive-tests                            Skip all tests marked as resource_intensive (default: False)
+  --cassandra-dir CASSANDRA_DIR
+  --cassandra-version CASSANDRA_VERSION
+  --delete-logs
+  --execute-upgrade-tests                                    Execute Cassandra Upgrade Tests (e.g. tests annotated with the upgrade_test mark) (default: False)
+  --disable-active-log-watching                              Disable ccm active log watching, which will cause dtests to check for errors in the logs in a single operation instead of semi-realtime
+                                                             processing by consuming ccm _log_error_handler callbacks (default: False)
+  --keep-test-dir                                            Do not remove/cleanup the test ccm cluster directory and its artifacts after the test completes (default: False)
+  --enable-jacoco-code-coverage                              Enable JaCoCo Code Coverage Support (default: False)
+  --dtest-enable-debug-logging                               Enable debug logging (for this script, pytest, and during execution of test functions) (default: False)
+  --dtest-print-tests-only                                   Print list of all tests found eligible for execution given the provided options. (default: False)
+  --dtest-print-tests-output DTEST_PRINT_TESTS_OUTPUT        Path to file where the output of --dtest-print-tests-only should be written to (default: False)
+  --pytest-options PYTEST_OPTIONS                            Additional command line arguments to proxy directly thru when invoking pytest. (default: None)
+  --dtest-tests DTEST_TESTS                                  Comma separated list of test files, test classes, or test methods to execute. (default: None)
 """
-from __future__ import print_function
-
 import subprocess
 import sys
 import os
-from collections import namedtuple
-from itertools import product
-from os import getcwd, environ
-from tempfile import NamedTemporaryFile
-
-from docopt import docopt
-
-from plugins.dtestconfig import GlobalConfigObject
-
-
-# Generate values in a matrix from these lists of values for each attribute
-# not defined in arguments to the runner script.
-default_config_matrix = GlobalConfigObject(
-    vnodes=(True, False),
-)
-
+import re
+import logging
 
-def _noop(*args, **kwargs):
-    pass
-
-
-class ValidationResult(namedtuple('_ValidationResult', ['serialized', 'error_messages'])):
-    """
-    A value to be returned from validation functions. If serialization works,
-    return one with 'serialized' set, otherwise return a list of string on the
-    'error_messages' attribute.
-    """
-    __slots__ = ()
+from os import getcwd
+from tempfile import NamedTemporaryFile
+from bs4 import BeautifulSoup
 
-    def __new__(cls, serialized=None, error_messages=None):
-        if error_messages is None:
-            error_messages = []
+from _pytest.config import Parser
+import argparse
 
-        success_result = serialized is not None
-        failure_result = bool(error_messages)
+from conftest import pytest_addoption
 
-        if success_result + failure_result != 1:
-            msg = ('attempted to instantiate a {cls_name} with serialized='
-                   '{serialized} and error_messages={error_messages}. {cls_name} '
-                   'objects must be instantiated with either a serialized or '
-                   'error_messages argument, but not both.')
-            msg = msg.format(cls_name=cls.__name__,
-                             serialized=serialized,
-                             error_messages=error_messages)
-            raise ValueError(msg)
+logger = logging.getLogger(__name__)
 
-        return super(ValidationResult, cls).__new__(cls, serialized=serialized, error_messages=error_messages)
 
+class RunDTests():
+    def run(self, argv):
+        parser = argparse.ArgumentParser(formatter_class=lambda prog: argparse.ArgumentDefaultsHelpFormatter(prog,
+                                                                                                             max_help_position=100,
+                                                                                                             width=200))
 
-def _validate_and_serialize_vnodes(vnodes_value):
-    """
-    Validate the values received for vnodes configuration. Returns a
-    ValidationResult.
+        # this is a bit ugly: all of our command line arguments are added and configured as part
+        # of pytest. however, we also have this wrapper script to make it easier for those who
+        # aren't comfortable calling pytest directly. To avoid duplicating code (e.g. have the options
+        # in two separate places) we directly use the pytest_addoption fixture from conftest.py. Unfortunately,
+        # pytest wraps ArgumentParser, so, first we add the options to a pytest Parser, and then we pull
+        # all of those custom options out and add them to the unwrapped ArgumentParser we want to use
+        # here inside of run_dtests.py.
+        #
+        # So NOTE: to add a command line argument, if you're trying to do so by adding it here, you're doing it wrong!
+        # add it to conftest.py:pytest_addoption
+        pytest_parser = Parser()
+        pytest_addoption(pytest_parser)
+
+        # add all of the options from the pytest Parser we created, and add them into our ArgumentParser instance
+        pytest_custom_opts = pytest_parser._anonymous
+        for opt in pytest_custom_opts.options:
+            parser.add_argument(opt._long_opts[0], action=opt._attrs['action'],
+                                default=opt._attrs.get('default', None),
+                                help=opt._attrs.get('help', None))
+
+        parser.add_argument("--dtest-enable-debug-logging", action="store_true", default=False,
+                            help="Enable debug logging (for this script, pytest, and during execution "
+                                 "of test functions)")
+        parser.add_argument("--dtest-print-tests-only", action="store_true", default=False,
+                            help="Print list of all tests found eligible for execution given the provided options.")
+        parser.add_argument("--dtest-print-tests-output", action="store", default=False,
+                            help="Path to file where the output of --dtest-print-tests-only should be written to")
+        parser.add_argument("--pytest-options", action="store", default=None,
+                            help="Additional command line arguments to proxy directly thru when invoking pytest.")
+        parser.add_argument("--dtest-tests", action="store", default=None,
+                            help="Comma separated list of test files, test classes, or test methods to execute.")
+
+        args = parser.parse_args()
+
+        if not args.dtest_print_tests_only and args.cassandra_dir is None:
+            if args.cassandra_version is None:
+                raise Exception("Required dtest arguments were missing! You must provide either --cassandra-dir "
+                                "or --cassandra-version. Refer to the documentation or invoke the help with --help.")
+
+        if args.dtest_enable_debug_logging:
+            logging.root.setLevel(logging.DEBUG)
+            logger.setLevel(logging.DEBUG)
+
+        # Get dictionaries corresponding to each point in the configuration matrix
+        # we want to run, then generate a config object for each of them.
+        logger.debug('Generating configurations from the following matrix:\n\t{}'.format(args))
+
+        args_to_invoke_pytest = []
+        if args.pytest_options:
+            for arg in args.pytest_options.split(" "):
+                args_to_invoke_pytest.append("'{the_arg}'".format(the_arg=arg))
+
+        for arg in argv:
+            if arg.startswith("--pytest-options") or arg.startswith("--dtest-"):
+                continue
+            args_to_invoke_pytest.append("'{the_arg}'".format(the_arg=arg))
+
+        if args.dtest_print_tests_only:
+            args_to_invoke_pytest.append("'--collect-only'")
+
+        if args.dtest_tests:
+            for test in args.dtest_tests.split(","):
+                args_to_invoke_pytest.append("'{test_name}'".format(test_name=test))
+
+        original_raw_cmd_args = ", ".join(args_to_invoke_pytest)
+
+        logger.debug("args to call with: [%s]" % original_raw_cmd_args)
+
+        # the original run_dtests.py script did it like this to hack around nosetest
+        # limitations -- i'm not sure if they still apply or not in a pytest world
+        # but for now just leaving it as is, because it does the job (although
+        # certainly is still pretty complicated code and has a hacky feeling)
+        to_execute = (
+                "import pytest\n" +
+                (
+                "pytest.main([{options}])\n").format(options=original_raw_cmd_args)
+        )
+        temp = NamedTemporaryFile(dir=getcwd())
+        logger.debug('Writing the following to {}:'.format(temp.name))
 
-    If the values validate, return a ValidationResult with 'serialized' set to
-    the equivalent of:
+        logger.debug('```\n{to_execute}```\n'.format(to_execute=to_execute))
+        temp.write(to_execute.encode("utf-8"))
+        temp.flush()
 
-        tuple(set({'true': True, 'false':False}[v.lower()] for v in vnodes_value))
+        # We pass nose_argv as options to the python call to maintain
+        # compatibility with the nosetests command. Arguments passed in via the
+        # command line are treated one way, args passed in as
+        # nose.main(argv=...) are treated another. Compare with the options
+        # -xsv for an example.
+        cmd_list = [sys.executable, temp.name]
+        logger.debug('subprocess.call-ing {cmd_list}'.format(cmd_list=cmd_list))
 
-    If the values don't validate, return a ValidationResult with 'messages' set
-    to a list of strings, each of which points out an invalid value.
-    """
-    messages = []
-    vnodes_value = set(v.lower() for v in vnodes_value)
-    value_map = {'true': True, 'false': False}
+        sp = subprocess.Popen(cmd_list, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=os.environ.copy())
 
-    for v in vnodes_value:
-        if v not in value_map:
-            messages.append('{} not a valid value for --vnodes option. '
-                            'valid values are {} (case-insensitive)'.format(v, ', '.join(list(value_map))))
+        if args.dtest_print_tests_only:
+            stdout, stderr = sp.communicate()
 
-    if messages:
-        return ValidationResult(error_messages=messages)
+            if stderr:
+                print(stderr.decode("utf-8"))
+                result = sp.returncode
+                exit(result)
 
-    serialized = tuple({value_map[v] for v in vnodes_value})
-    return ValidationResult(serialized=serialized)
+            all_collected_test_modules = collect_test_modules(stdout)
+            joined_test_modules = "\n".join(all_collected_test_modules)
+            #print("Collected %d Test Modules" % len(all_collected_test_modules))
+            if args.dtest_print_tests_output is not None:
+                collected_tests_output_file = open(args.dtest_print_tests_output, "w")
+                collected_tests_output_file.write(joined_test_modules)
+                collected_tests_output_file.close()
 
+            print(joined_test_modules)
+        else:
+            while True:
+                stdout_output = sp.stdout.readline()
+                stdout_output_str = stdout_output.decode("utf-8")
+                if stdout_output_str == '' and sp.poll() is not None:
+                    break
+                if stdout_output_str:
+                    print(stdout_output_str.strip())
 
-def validate_and_serialize_options(docopt_options):
-    """
-    For each value that should be configured for a config object, attempt to
-    serialize the passed-in strings into objects that can be used for
-    configuration. If no values were passed in, use the list of options from
-    the defaults above.
+                stderr_output = sp.stderr.readline()
+                stderr_output_str = stderr_output.decode("utf-8")
+                if stderr_output_str == '' and sp.poll() is not None:
+                    break
+                if stderr_output_str:
+                    print(stderr_output_str.strip())
 
-    Raises a ValueError and prints an error message if any values are invalid
-    or didn't serialize correctly.
-    """
-    vnodes = _validate_and_serialize_vnodes(docopt_options['--vnodes'])
-    if vnodes.error_messages:
-        raise ValueError('Validation error:\n{}'.format('\t\n'.join(list(vnodes.error_messages))))
-    return GlobalConfigObject(
-        vnodes=vnodes.serialized or default_config_matrix.vnodes
-    )
+        exit(sp.returncode)
 
 
-def product_of_values(d):
+def collect_test_modules(stdout):
     """
-    Transforms a dictionary of {key: list(configuration_options} into a tuple
-    of dictionaries, each corresponding to a point in the product, with the
-    values preserved at the keys where they were found in the argument.
-
-    This is difficult to explain and is probably best demonstrated with an
-    example:
-
-        >>> from pprint import pprint
-        >>> from runner import product_of_values
-        >>> pprint(product_of_values(
-        ...     {'a': [1, 2, 3],
-        ...      'b': [4, 5, 6]}
-        ... ))
-        ({'a': 1, 'b': 4},
-         {'a': 1, 'b': 5},
-         {'a': 1, 'b': 6},
-         {'a': 2, 'b': 4},
-         {'a': 2, 'b': 5},
-         {'a': 2, 'b': 6},
-         {'a': 3, 'b': 4},
-         {'a': 3, 'b': 5},
-         {'a': 3, 'b': 6})
-
-    So, in this case, we get something like
-
-        for a_value in d['a']:
-            for b_value in d['b']:
-                yield {'a': a_value, 'b': b_value}
-
-    This method does that, but for dictionaries with arbitrary iterables at
-    arbitrary numbers of keys.
+    Takes the xml-ish (no, it's not actually xml so we need to format it a bit) --collect-only output as printed
+    by pytest to stdout and normalizes it to get a list of all collected tests in a human friendly format
+    :param stdout: the stdout from pytest (should have been invoked with the --collect-only cmdline argument)
+    :return: a formatted list of collected test modules in format test_file.py::TestClass::test_function
     """
-
-    # transform, e.g., {'a': [1, 2, 3], 'b': [4, 5, 6]} into
-    # [[('a', 1), ('a', 2), ('a', 3)],
-    #  [('b', 4), ('b', 5), ('b', 6)]]
-    tuple_list = [[(k, v) for v in v_list] for k, v_list in d.items()]
-
-    # return the cartesian product of the flattened dict
-    return tuple(dict(result) for result in product(*tuple_list))
+    # unfortunately, pytest emits xml like output -- but it's not actually xml, so we'll fail to parse
+    # if we try. first step is to fix up the pytest output to create well formatted xml
+    xml_line_regex_pattern = re.compile("^([\s])*<(Module|Class|Function|Instance) '(.*)'>")
+    is_first_module = True
+    is_first_class = True
+    has_closed_class = False
+    section_has_instance = False
+    section_has_class = False
+    test_collect_xml_lines = []
+
+    test_collect_xml_lines.append("<?xml version=\"1.0\" encoding=\"UTF-8\"?>")
+    test_collect_xml_lines.append("<Modules>")
+    for line in stdout.decode("utf-8").split('\n'):
+        re_ret = re.search(xml_line_regex_pattern, line)
+        if re_ret:
+            if not is_first_module and re_ret.group(2) == "Module":
+                if section_has_instance:
+                    test_collect_xml_lines.append("      </Instance>")
+                if section_has_class:
+                    test_collect_xml_lines.append("    </Class>")
+
+                test_collect_xml_lines.append("  </Module>")
+                is_first_class = True
+                has_closed_class= False
+                section_has_instance = False
+                section_has_class = False
+                is_first_module = False
+            elif is_first_module and re_ret.group(2) == "Module":
+                if not has_closed_class and section_has_instance:
+                    test_collect_xml_lines.append("      </Instance>")
+                if not has_closed_class and section_has_class:
+                    test_collect_xml_lines.append("    </Class>")
+
+                is_first_class = True
+                is_first_module = False
+                has_closed_class = False
+                section_has_instance = False
+                section_has_class = False
+            elif re_ret.group(2) == "Instance":
+                section_has_instance = True
+            elif not is_first_class and re_ret.group(2) == "Class":
+                if section_has_instance:
+                    test_collect_xml_lines.append("      </Instance>")
+                if section_has_class:
+                    test_collect_xml_lines.append("    </Class>")
+                has_closed_class = True
+                section_has_class = True
+            elif re_ret.group(2) == "Class":
+                is_first_class = False
+                section_has_class = True
+                has_closed_class = False
+
+            if re_ret.group(2) == "Function":
+                test_collect_xml_lines.append("          <Function name=\"{name}\"></Function>"
+                                              .format(name=re_ret.group(3)))
+            elif re_ret.group(2) == "Class":
+                test_collect_xml_lines.append("    <Class name=\"{name}\">".format(name=re_ret.group(3)))
+            elif re_ret.group(2) == "Module":
+                test_collect_xml_lines.append("  <Module name=\"{name}\">".format(name=re_ret.group(3)))
+            elif re_ret.group(2) == "Instance":
+                test_collect_xml_lines.append("      <Instance name=\"\">".format(name=re_ret.group(3)))
+            else:
+                test_collect_xml_lines.append(line)
+
+    test_collect_xml_lines.append("      </Instance>")
+    test_collect_xml_lines.append("    </Class>")
+    test_collect_xml_lines.append("  </Module>")
+    test_collect_xml_lines.append("</Modules>")
+
+    all_collected_test_modules = []
+
+    # parse the now valid xml
+    print("\n".join(test_collect_xml_lines))
+    test_collect_xml = BeautifulSoup("\n".join(test_collect_xml_lines), "lxml-xml")
+
+    # find all Modules (followed by classes in those modules, and then finally functions)
+    for pytest_module in test_collect_xml.findAll("Module"):
+        for test_class_name in pytest_module.findAll("Class"):
+            for function_name in test_class_name.findAll("Function"):
+                # adds to test list in format like test_file.py::TestClass::test_function for every test function found
+                all_collected_test_modules.append("{module_name}::{class_name}::{function_name}"
+                                                  .format(module_name=pytest_module.attrs['name'],
+                                                          class_name=test_class_name.attrs['name'],
+                                                          function_name=function_name.attrs['name']))
+
+    return all_collected_test_modules
 
 
 if __name__ == '__main__':
-    options = docopt(__doc__)
-    validated_options = validate_and_serialize_options(options)
-
-    nose_options = options['--nose-options'] or ''
-    nose_option_list = nose_options.split()
-    test_list = options['TESTS']
-    nose_argv = nose_option_list + test_list
-
-    verbosity = 1  # default verbosity level
-    if options['--runner-debug']:
-        verbosity = 2
-    if options['--runner-quiet']:  # --debug and --quiet are mutually exclusive, enforced by docopt
-        verbosity = 0
-
-    debug = print if verbosity >= 2 else _noop
-    output = print if verbosity >= 1 else _noop
-
-    # Get dictionaries corresponding to each point in the configuration matrix
-    # we want to run, then generate a config object for each of them.
-    debug('Generating configurations from the following matrix:\n\t{}'.format(validated_options))
-    all_configs = tuple(GlobalConfigObject(**d) for d in
-                        product_of_values(validated_options._asdict()))
-    output('About to run nosetests with config objects:\n'
-           '\t{configs}\n'.format(configs='\n\t'.join(map(repr, all_configs))))
-
-    results = []
-    for config in all_configs:
-        # These properties have to hold if we want to evaluate their reprs
-        # below in the generated file.
-        assert eval(repr(config), {'GlobalConfigObject': GlobalConfigObject}, {}) == config
-        assert eval(repr(nose_argv), {}, {}) == nose_argv
-
-        output('Running dtests with config object {}'.format(config))
-
-        # Generate a file that runs nose, passing in config as the
-        # configuration object.
-        #
-        # Yes, this is icky. The reason we do it is because we're dealing with
-        # global configuration. We've decided global, nosetests-run-level
-        # configuration is the way to go. This means we don't want to call
-        # nose.main() multiple times in the same Python interpreter -- I have
-        # not yet found a way to re-execute modules (thus getting new
-        # module-level configuration) for each call. This didn't even work for
-        # me with exec(script, {}, {}). So, here we are.
-        #
-        # How do we execute code in a new interpreter each time? Generate the
-        # code as text, then shell out to a new interpreter.
-        to_execute = (
-            "import nose\n" +
-            "from plugins.dtestconfig import DtestConfigPlugin, GlobalConfigObject\n" +
-            "from plugins.dtestxunit import DTestXunit\n" +
-            "from plugins.dtesttag import DTestTag\n" +
-            "from plugins.dtestcollect import DTestCollect\n" +
-            "import sys\n" +
-            "print sys.getrecursionlimit()\n" +
-            "print sys.setrecursionlimit(8000)\n" +
-            ("nose.main(addplugins=[DtestConfigPlugin({config}), DTestXunit(), DTestCollect(), DTestTag()])\n" if "TEST_TAG" in environ else "nose.main(addplugins=[DtestConfigPlugin({config}), DTestCollect(), DTestXunit()])\n")
-        ).format(config=repr(config))
-        temp = NamedTemporaryFile(dir=getcwd())
-        debug('Writing the following to {}:'.format(temp.name))
-
-        debug('```\n{to_execute}```\n'.format(to_execute=to_execute))
-        temp.write(to_execute)
-        temp.flush()
-
-        # We pass nose_argv as options to the python call to maintain
-        # compatibility with the nosetests command. Arguments passed in via the
-        # command line are treated one way, args passed in as
-        # nose.main(argv=...) are treated another. Compare with the options
-        # -xsv for an example.
-        cmd_list = [sys.executable, temp.name] + nose_argv
-        debug('subprocess.call-ing {cmd_list}'.format(cmd_list=cmd_list))
-
-        if options['--dry-run']:
-            print('Would run the following command:\n\t{}'.format(cmd_list))
-            with open(temp.name, 'r') as f:
-                contents = f.read()
-            print('{temp_name} contains:\n```\n{contents}```\n'.format(
-                temp_name=temp.name,
-                contents=contents
-            ))
-        else:
-            results.append(subprocess.call(cmd_list, env=os.environ.copy()))
-        # separate the end of the last subprocess.call output from the
-        # beginning of the next by printing a newline.
-        print()
-
-    # If this answer:
-    # http://stackoverflow.com/a/21788998/3408454
-    # is to be believed, nosetests will exit with 0 on success, 1 on test or
-    # other failure, and 2 on printing usage. We'll just grab the max of the
-    # runs we saw -- if one printed usage, the whole run "printed usage", if
-    # none printed usage, and one or more failed, we failed, else success.
-    if not results:
-        results = [0]
-    exit(max(results))
+    RunDTests().run(sys.argv[1:])

http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/49b2dda4/schema_metadata_test.py
----------------------------------------------------------------------
diff --git a/schema_metadata_test.py b/schema_metadata_test.py
index baf8b5a..fdfcf56 100644
--- a/schema_metadata_test.py
+++ b/schema_metadata_test.py
@@ -1,10 +1,13 @@
+import pytest
+import logging
+
 from collections import defaultdict
 from uuid import uuid4
 
-from nose.tools import assert_equal, assert_in
+from dtest import Tester, create_ks
 
-from dtest import Tester, debug, create_ks
-from tools.decorators import since
+since = pytest.mark.since
+logger = logging.getLogger(__name__)
 
 
 def establish_durable_writes_keyspace(version, session, table_name_prefix=""):
@@ -32,11 +35,10 @@ def verify_durable_writes_keyspace(created_on_version, current_version, keyspace
         "durable_writes_true": True,
         "durable_writes_false": False
     }
-    for keyspace, is_durable in expected.iteritems():
+    for keyspace, is_durable in expected.items():
         keyspace_name = _cql_name_builder(table_name_prefix, keyspace)
         meta = session.cluster.metadata.keyspaces[keyspace_name]
-        assert_equal(is_durable, meta.durable_writes,
-                     "keyspace [{}] had durable_writes of [{}] should be [{}]".format(keyspace_name, meta.durable_writes, is_durable))
+        assert is_durable == meta.durable_writes, "keyspace [{}] had durable_writes of [{}] should be [{}]".format(keyspace_name, meta.durable_writes, is_durable)
 
 
 def establish_indexes_table(version, session, table_name_prefix=""):
@@ -53,29 +55,29 @@ def establish_indexes_table(version, session, table_name_prefix=""):
 
     session.execute(cql.format(table_name))
     index_name = _cql_name_builder("idx_" + table_name_prefix, table_name)
-    debug("table name: [{}], index name: [{}], prefix: [{}]".format(table_name, index_name, table_name_prefix))
+    logger.debug("table name: [{}], index name: [{}], prefix: [{}]".format(table_name, index_name, table_name_prefix))
     session.execute("CREATE INDEX {0} ON {1}( d )".format(index_name, table_name))
 
 
 def verify_indexes_table(created_on_version, current_version, keyspace, session, table_name_prefix=""):
     table_name = _cql_name_builder(table_name_prefix, "test_indexes")
     index_name = _cql_name_builder("idx_" + table_name_prefix, table_name)
-    debug("table name: [{}], index name: [{}], prefix: [{}]".format(table_name, index_name, table_name_prefix))
+    logger.debug("table name: [{}], index name: [{}], prefix: [{}]".format(table_name, index_name, table_name_prefix))
     meta = session.cluster.metadata.keyspaces[keyspace].indexes[index_name]
 
-    assert_equal('d', meta.index_options['target'])
+    assert 'd' == meta.index_options['target']
 
     meta = session.cluster.metadata.keyspaces[keyspace].tables[table_name]
-    assert_equal(1, len(meta.clustering_key))
-    assert_equal('c', meta.clustering_key[0].name)
+    assert 1 == len(meta.clustering_key)
+    assert 'c' == meta.clustering_key[0].name
 
-    assert_equal(1, len(meta.indexes))
+    assert 1 == len(meta.indexes)
 
-    assert_equal({'target': 'd'}, meta.indexes[index_name].index_options)
-    assert_equal(3, len(meta.primary_key))
-    assert_equal('a', meta.primary_key[0].name)
-    assert_equal('b', meta.primary_key[1].name)
-    assert_equal('c', meta.primary_key[2].name)
+    assert {'target': 'd'} == meta.indexes[index_name].index_options
+    assert 3 == len(meta.primary_key)
+    assert 'a' == meta.primary_key[0].name
+    assert 'b' == meta.primary_key[1].name
+    assert 'c' == meta.primary_key[2].name
 
 
 def establish_clustering_order_table(version, session, table_name_prefix=""):
@@ -96,13 +98,13 @@ def establish_clustering_order_table(version, session, table_name_prefix=""):
 def verify_clustering_order_table(created_on_version, current_version, keyspace, session, table_name_prefix=""):
     table_name = _cql_name_builder(table_name_prefix, "test_clustering_order")
     meta = session.cluster.metadata.keyspaces[keyspace].tables[table_name]
-    assert_equal(0, len(meta.indexes))
-    assert_equal(2, len(meta.primary_key))
-    assert_equal('event_type', meta.primary_key[0].name)
-    assert_equal('insertion_time', meta.primary_key[1].name)
-    assert_equal(1, len(meta.clustering_key))
-    assert_equal('insertion_time', meta.clustering_key[0].name)
-    assert_in('insertion_time DESC', meta.as_cql_query())
+    assert 0 == len(meta.indexes)
+    assert 2 == len(meta.primary_key)
+    assert 'event_type' == meta.primary_key[0].name
+    assert 'insertion_time' == meta.primary_key[1].name
+    assert 1 == len(meta.clustering_key)
+    assert 'insertion_time' == meta.clustering_key[0].name
+    assert 'insertion_time DESC' in meta.as_cql_query()
 
 
 def establish_compact_storage_table(version, session, table_name_prefix=""):
@@ -123,16 +125,16 @@ def establish_compact_storage_table(version, session, table_name_prefix=""):
 def verify_compact_storage_table(created_on_version, current_version, keyspace, session, table_name_prefix=""):
     table_name = _cql_name_builder(table_name_prefix, "test_compact_storage")
     meta = session.cluster.metadata.keyspaces[keyspace].tables[table_name]
-    assert_equal(3, len(meta.columns))
-    assert_equal(2, len(meta.primary_key))
-    assert_equal(1, len(meta.clustering_key))
-    assert_equal('sub_block_id', meta.clustering_key[0].name)
-    assert_equal('block_id', meta.primary_key[0].name)
-    assert_equal('uuid', meta.primary_key[0].cql_type)
-    assert_equal('sub_block_id', meta.primary_key[1].name)
-    assert_equal('int', meta.primary_key[1].cql_type)
-    assert_equal(1, len(meta.clustering_key))
-    assert_equal('sub_block_id', meta.clustering_key[0].name)
+    assert 3 == len(meta.columns)
+    assert 2 == len(meta.primary_key)
+    assert 1 == len(meta.clustering_key)
+    assert 'sub_block_id' == meta.clustering_key[0].name
+    assert 'block_id' == meta.primary_key[0].name
+    assert 'uuid' == meta.primary_key[0].cql_type
+    assert 'sub_block_id' == meta.primary_key[1].name
+    assert 'int' == meta.primary_key[1].cql_type
+    assert 1 == len(meta.clustering_key)
+    assert 'sub_block_id' == meta.clustering_key[0].name
 
 
 def establish_compact_storage_composite_table(version, session, table_name_prefix=""):
@@ -153,19 +155,19 @@ def establish_compact_storage_composite_table(version, session, table_name_prefi
 def verify_compact_storage_composite_table(created_on_version, current_version, keyspace, session, table_name_prefix=""):
     table_name = _cql_name_builder(table_name_prefix, "test_compact_storage_composite")
     meta = session.cluster.metadata.keyspaces[keyspace].tables[table_name]
-    assert_equal(4, len(meta.columns))
-    assert_equal(3, len(meta.primary_key))
-    assert_equal('key', meta.primary_key[0].name)
-    assert_equal('text', meta.primary_key[0].cql_type)
-    assert_equal('column1', meta.primary_key[1].name)
-    assert_equal('int', meta.primary_key[1].cql_type)
-    assert_equal('column2', meta.primary_key[2].name)
-    assert_equal('int', meta.primary_key[2].cql_type)
-    assert_equal(2, len(meta.clustering_key))
-    assert_equal('column1', meta.clustering_key[0].name)
-    assert_equal('int', meta.clustering_key[0].cql_type)
-    assert_equal('column2', meta.clustering_key[1].name)
-    assert_equal('int', meta.clustering_key[1].cql_type)
+    assert 4 == len(meta.columns)
+    assert 3 == len(meta.primary_key)
+    assert 'key' == meta.primary_key[0].name
+    assert 'text' == meta.primary_key[0].cql_type
+    assert 'column1' == meta.primary_key[1].name
+    assert 'int' == meta.primary_key[1].cql_type
+    assert 'column2' == meta.primary_key[2].name
+    assert 'int' == meta.primary_key[2].cql_type
+    assert 2 == len(meta.clustering_key)
+    assert 'column1' == meta.clustering_key[0].name
+    assert 'int' == meta.clustering_key[0].cql_type
+    assert 'column2' == meta.clustering_key[1].name
+    assert 'int' == meta.clustering_key[1].cql_type
 
 
 def establish_nondefault_table_settings(version, session, table_name_prefix=""):
@@ -213,45 +215,45 @@ def verify_nondefault_table_settings(created_on_version, current_version, keyspa
     table_name = _cql_name_builder(table_name_prefix, "test_nondefault_settings")
     meta = session.cluster.metadata.keyspaces[keyspace].tables[table_name]
 
-    assert_equal('insightful information', meta.options['comment'])
-    assert_equal(0.88, meta.options['dclocal_read_repair_chance'])
-    assert_equal(9999, meta.options['gc_grace_seconds'])
-    assert_equal(0.99, meta.options['read_repair_chance'])
-    assert_equal(0.5, meta.options['bloom_filter_fp_chance'])
+    assert 'insightful information' == meta.options['comment']
+    assert 0.88 == meta.options['dclocal_read_repair_chance']
+    assert 9999 == meta.options['gc_grace_seconds']
+    assert 0.99 == meta.options['read_repair_chance']
+    assert 0.5 == meta.options['bloom_filter_fp_chance']
 
     if created_on_version >= '2.1':
-        assert_equal(86400, meta.options['default_time_to_live'])
-        assert_equal(1, meta.options['min_index_interval'])
-        assert_equal(20, meta.options['max_index_interval'])
+        assert 86400 == meta.options['default_time_to_live']
+        assert 1 == meta.options['min_index_interval']
+        assert 20 == meta.options['max_index_interval']
 
     if created_on_version >= '3.0':
-        assert_equal('55PERCENTILE', meta.options['speculative_retry'])
-        assert_equal(2121, meta.options['memtable_flush_period_in_ms'])
+        assert '55PERCENTILE' == meta.options['speculative_retry']
+        assert 2121 == meta.options['memtable_flush_period_in_ms']
 
     if current_version >= '3.0':
-        assert_equal('org.apache.cassandra.io.compress.DeflateCompressor', meta.options['compression']['class'])
-        assert_equal('128', meta.options['compression']['chunk_length_in_kb'])
-        assert_equal('org.apache.cassandra.db.compaction.LeveledCompactionStrategy', meta.options['compaction']['class'])
+        assert 'org.apache.cassandra.io.compress.DeflateCompressor' == meta.options['compression']['class']
+        assert '128' == meta.options['compression']['chunk_length_in_kb']
+        assert 'org.apache.cassandra.db.compaction.LeveledCompactionStrategy' == meta.options['compaction']['class']
 
     if '2.1' <= current_version < '3.0':
-        assert_equal('{"keys":"NONE", "rows_per_partition":"ALL"}', meta.options['caching'])
-        assert_in('"chunk_length_kb":"128"', meta.options['compression_parameters'])
-        assert_in('"sstable_compression":"org.apache.cassandra.io.compress.DeflateCompressor"', meta.options['compression_parameters'])
+        assert '{"keys":"NONE", "rows_per_partition":"ALL"}' == meta.options['caching']
+        assert '"chunk_length_kb":"128"' in meta.options['compression_parameters']
+        assert '"sstable_compression":"org.apache.cassandra.io.compress.DeflateCompressor"' in meta.options['compression_parameters']
     elif current_version >= '3.0':
-        assert_equal('NONE', meta.options['caching']['keys'])
-        assert_equal('ALL', meta.options['caching']['rows_per_partition'])
-        assert_equal('org.apache.cassandra.io.compress.DeflateCompressor', meta.options['compression']['class'])
-        assert_equal('128', meta.options['compression']['chunk_length_in_kb'])
-        assert_equal('org.apache.cassandra.db.compaction.LeveledCompactionStrategy', meta.options['compaction']['class'])
+        assert 'NONE' == meta.options['caching']['keys']
+        assert 'ALL' == meta.options['caching']['rows_per_partition']
+        assert 'org.apache.cassandra.io.compress.DeflateCompressor' == meta.options['compression']['class']
+        assert '128' == meta.options['compression']['chunk_length_in_kb']
+        assert 'org.apache.cassandra.db.compaction.LeveledCompactionStrategy' == meta.options['compaction']['class']
     else:
-        assert_equal('ROWS_ONLY', meta.options['caching'])
+        assert 'ROWS_ONLY' == meta.options['caching']
 
-    assert_equal(2, len(meta.partition_key))
-    assert_equal(meta.partition_key[0].name, 'a')
-    assert_equal(meta.partition_key[1].name, 'b')
+    assert 2 == len(meta.partition_key)
+    assert meta.partition_key[0].name == 'a'
+    assert meta.partition_key[1].name == 'b'
 
-    assert_equal(1, len(meta.clustering_key))
-    assert_equal(meta.clustering_key[0].name, 'c')
+    assert 1 == len(meta.clustering_key)
+    assert meta.clustering_key[0].name == 'c'
 
 
 def establish_uda(version, session, table_name_prefix=""):
@@ -281,13 +283,13 @@ def verify_uda(created_on_version, current_version, keyspace, session, table_nam
     function_name = _cql_name_builder(table_name_prefix, "test_uda_function")
     aggregate_name = _cql_name_builder(table_name_prefix, "test_uda_aggregate")
 
-    assert_in(function_name + "(int,int)", session.cluster.metadata.keyspaces[keyspace].functions.keys())
-    assert_in(aggregate_name + "(int)", session.cluster.metadata.keyspaces[keyspace].aggregates.keys())
+    assert function_name + "(int,int)" in list(session.cluster.metadata.keyspaces[keyspace].functions.keys())
+    assert aggregate_name + "(int)" in list(session.cluster.metadata.keyspaces[keyspace].aggregates.keys())
 
     aggr_meta = session.cluster.metadata.keyspaces[keyspace].aggregates[aggregate_name + "(int)"]
-    assert_equal(function_name, aggr_meta.state_func)
-    assert_equal('int', aggr_meta.state_type)
-    assert_equal('int', aggr_meta.return_type)
+    assert function_name == aggr_meta.state_func
+    assert 'int' == aggr_meta.state_type
+    assert 'int' == aggr_meta.return_type
 
 
 def establish_udf(version, session, table_name_prefix=""):
@@ -303,13 +305,13 @@ def verify_udf(created_on_version, current_version, keyspace, session, table_nam
     if created_on_version < '2.2':
         return
     function_name = _cql_name_builder(table_name_prefix, "test_udf")
-    assert_in(function_name + "(double)", session.cluster.metadata.keyspaces[keyspace].functions.keys())
+    assert function_name + "(double)" in list(session.cluster.metadata.keyspaces[keyspace].functions.keys())
     meta = session.cluster.metadata.keyspaces[keyspace].functions[function_name + "(double)"]
-    assert_equal('java', meta.language)
-    assert_equal('double', meta.return_type)
-    assert_equal(['double'], meta.argument_types)
-    assert_equal(['input'], meta.argument_names)
-    assert_equal('return Double.valueOf(Math.log(input.doubleValue()));', meta.body)
+    assert 'java' == meta.language
+    assert 'double' == meta.return_type
+    assert ['double'] == meta.argument_types
+    assert ['input'] == meta.argument_names
+    assert 'return Double.valueOf(Math.log(input.doubleValue()));' == meta.body
 
 
 def establish_udt_table(version, session, table_name_prefix=""):
@@ -330,13 +332,13 @@ def verify_udt_table(created_on_version, current_version, keyspace, session, tab
     table_name = _cql_name_builder(table_name_prefix, "test_udt")
     meta = session.cluster.metadata.keyspaces[keyspace].user_types[table_name]
 
-    assert_equal(meta.field_names, ['street', 'city', 'zip'])
-    assert_equal('street', meta.field_names[0])
-    assert_equal('text', meta.field_types[0])
-    assert_equal('city', meta.field_names[1])
-    assert_equal('text', meta.field_types[1])
-    assert_equal('zip', meta.field_names[2])
-    assert_equal('int', meta.field_types[2])
+    assert meta.field_names == ['street', 'city', 'zip']
+    assert 'street' == meta.field_names[0]
+    assert 'text' == meta.field_types[0]
+    assert 'city' == meta.field_names[1]
+    assert 'text' == meta.field_types[1]
+    assert 'zip' == meta.field_names[2]
+    assert 'int' == meta.field_types[2]
 
 
 def establish_static_column_table(version, session, table_name_prefix=""):
@@ -358,15 +360,15 @@ def verify_static_column_table(created_on_version, current_version, keyspace, se
         return
     table_name = _cql_name_builder(table_name_prefix, "test_static_column")
     meta = session.cluster.metadata.keyspaces[keyspace].tables[table_name]
-    assert_equal(4, len(meta.columns))
-    assert_equal('text', meta.columns['user'].cql_type)
-    assert_equal(False, meta.columns['user'].is_static)
-    assert_equal('int', meta.columns['balance'].cql_type)
-    assert_equal(True, meta.columns['balance'].is_static)
-    assert_equal('int', meta.columns['expense_id'].cql_type)
-    assert_equal(False, meta.columns['expense_id'].is_static)
-    assert_equal('int', meta.columns['amount'].cql_type)
-    assert_equal(False, meta.columns['amount'].is_static)
+    assert 4 == len(meta.columns)
+    assert 'text' == meta.columns['user'].cql_type
+    assert False == meta.columns['user'].is_static
+    assert 'int' == meta.columns['balance'].cql_type
+    assert True == meta.columns['balance'].is_static
+    assert 'int' == meta.columns['expense_id'].cql_type
+    assert False == meta.columns['expense_id'].is_static
+    assert 'int' == meta.columns['amount'].cql_type
+    assert False == meta.columns['amount'].is_static
 
 
 def establish_collection_datatype_table(version, session, table_name_prefix=""):
@@ -399,24 +401,24 @@ def verify_collection_datatype_table(created_on_version, current_version, keyspa
     table_name = _cql_name_builder(table_name_prefix, "test_collection_datatypes")
     meta = session.cluster.metadata.keyspaces[keyspace].tables[table_name]
     if created_on_version > '2.1':
-        assert_equal(13, len(meta.columns))
+        assert 13 == len(meta.columns)
     else:
-        assert_equal(7, len(meta.columns))
+        assert 7 == len(meta.columns)
 
-    assert_equal('list<int>', meta.columns['a'].cql_type)
-    assert_equal('list<text>', meta.columns['b'].cql_type)
-    assert_equal('set<int>', meta.columns['c'].cql_type)
-    assert_equal('set<text>', meta.columns['d'].cql_type)
-    assert_equal('map<text, text>', meta.columns['e'].cql_type)
-    assert_equal('map<text, int>', meta.columns['f'].cql_type)
+    assert 'list<int>' == meta.columns['a'].cql_type
+    assert 'list<text>' == meta.columns['b'].cql_type
+    assert 'set<int>' == meta.columns['c'].cql_type
+    assert 'set<text>' == meta.columns['d'].cql_type
+    assert 'map<text, text>' == meta.columns['e'].cql_type
+    assert 'map<text, int>' == meta.columns['f'].cql_type
 
     if created_on_version > '2.1':
-        assert_equal('frozen<list<int>>', meta.columns['g'].cql_type)
-        assert_equal('frozen<list<text>>', meta.columns['h'].cql_type)
-        assert_equal('frozen<set<int>>', meta.columns['i'].cql_type)
-        assert_equal('frozen<set<text>>', meta.columns['j'].cql_type)
-        assert_equal('frozen<map<text, text>>', meta.columns['k'].cql_type)
-        assert_equal('frozen<map<text, int>>', meta.columns['l'].cql_type)
+        assert 'frozen<list<int>>' == meta.columns['g'].cql_type
+        assert 'frozen<list<text>>' == meta.columns['h'].cql_type
+        assert 'frozen<set<int>>' == meta.columns['i'].cql_type
+        assert 'frozen<set<text>>' == meta.columns['j'].cql_type
+        assert 'frozen<map<text, text>>' == meta.columns['k'].cql_type
+        assert 'frozen<map<text, int>>' == meta.columns['l'].cql_type
 
 
 def establish_basic_datatype_table(version, session, table_name_prefix=""):
@@ -453,33 +455,33 @@ def verify_basic_datatype_table(created_on_version, current_version, keyspace, s
     table_name = _cql_name_builder(table_name_prefix, "test_basic_datatypes")
     meta = session.cluster.metadata.keyspaces[keyspace].tables[table_name]
     if created_on_version > '2.2':
-        assert_equal(19, len(meta.columns))
+        assert 19 == len(meta.columns)
     else:
-        assert_equal(15, len(meta.columns))
-
-    assert_equal(1, len(meta.primary_key))
-    assert_equal('b', meta.primary_key[0].name)
-
-    assert_equal('ascii', meta.columns['a'].cql_type)
-    assert_equal('bigint', meta.columns['b'].cql_type)
-    assert_equal('blob', meta.columns['c'].cql_type)
-    assert_equal('boolean', meta.columns['d'].cql_type)
-    assert_equal('decimal', meta.columns['e'].cql_type)
-    assert_equal('double', meta.columns['f'].cql_type)
-    assert_equal('float', meta.columns['g'].cql_type)
-    assert_equal('inet', meta.columns['h'].cql_type)
-    assert_equal('int', meta.columns['i'].cql_type)
-    assert_equal('text', meta.columns['j'].cql_type)
-    assert_equal('timestamp', meta.columns['k'].cql_type)
-    assert_equal('timeuuid', meta.columns['l'].cql_type)
-    assert_equal('uuid', meta.columns['m'].cql_type)
-    assert_equal('text', meta.columns['n'].cql_type)
-    assert_equal('varint', meta.columns['o'].cql_type)
+        assert 15 == len(meta.columns)
+
+    assert 1 == len(meta.primary_key)
+    assert 'b' == meta.primary_key[0].name
+
+    assert 'ascii' == meta.columns['a'].cql_type
+    assert 'bigint' == meta.columns['b'].cql_type
+    assert 'blob' == meta.columns['c'].cql_type
+    assert 'boolean' == meta.columns['d'].cql_type
+    assert 'decimal' == meta.columns['e'].cql_type
+    assert 'double' == meta.columns['f'].cql_type
+    assert 'float' == meta.columns['g'].cql_type
+    assert 'inet' == meta.columns['h'].cql_type
+    assert 'int' == meta.columns['i'].cql_type
+    assert 'text' == meta.columns['j'].cql_type
+    assert 'timestamp' == meta.columns['k'].cql_type
+    assert 'timeuuid' == meta.columns['l'].cql_type
+    assert 'uuid' == meta.columns['m'].cql_type
+    assert 'text' == meta.columns['n'].cql_type
+    assert 'varint' == meta.columns['o'].cql_type
     if created_on_version > '2.2':
-        assert_equal('date', meta.columns['p'].cql_type)
-        assert_equal('smallint', meta.columns['q'].cql_type)
-        assert_equal('time', meta.columns['r'].cql_type)
-        assert_equal('tinyint', meta.columns['s'].cql_type)
+        assert 'date' == meta.columns['p'].cql_type
+        assert 'smallint' == meta.columns['q'].cql_type
+        assert 'time' == meta.columns['r'].cql_type
+        assert 'tinyint' == meta.columns['s'].cql_type
 
 
 def _cql_name_builder(prefix, table_name):
@@ -495,10 +497,9 @@ def _cql_name_builder(prefix, table_name):
 
 
 class TestSchemaMetadata(Tester):
-
-    def setUp(self):
-        Tester.setUp(self)
-        cluster = self.cluster
+    @pytest.fixture(scope='function', autouse=True)
+    def fixture_set_cluster_settings(self, fixture_dtest_setup):
+        cluster = fixture_dtest_setup.cluster
         cluster.schema_event_refresh_window = 0
 
         if cluster.version() >= '3.0':
@@ -508,101 +509,101 @@ class TestSchemaMetadata(Tester):
             cluster.set_configuration_options({'enable_user_defined_functions': 'true'})
         cluster.populate(1).start()
 
-        self.session = self.patient_cql_connection(cluster.nodelist()[0])
+        self.session = fixture_dtest_setup.patient_cql_connection(cluster.nodelist()[0])
         create_ks(self.session, 'ks', 1)
 
     def _keyspace_meta(self, keyspace_name="ks"):
         self.session.cluster.refresh_schema_metadata()
         return self.session.cluster.metadata.keyspaces[keyspace_name]
 
-    def creating_and_dropping_keyspace_test(self):
+    def test_creating_and_dropping_keyspace(self):
         starting_keyspace_count = len(self.session.cluster.metadata.keyspaces)
-        self.assertEqual(True, self._keyspace_meta().durable_writes)
+        assert True == self._keyspace_meta().durable_writes
         self.session.execute("""
                 CREATE KEYSPACE so_long
                     WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}
                     AND durable_writes = false
             """)
-        self.assertEqual(False, self._keyspace_meta('so_long').durable_writes)
+        assert False == self._keyspace_meta('so_long').durable_writes
         self.session.execute("DROP KEYSPACE so_long")
-        self.assertEqual(starting_keyspace_count, len(self.session.cluster.metadata.keyspaces))
+        assert starting_keyspace_count == len(self.session.cluster.metadata.keyspaces)
 
-    def creating_and_dropping_table_test(self):
+    def test_creating_and_dropping_table(self):
         self.session.execute("create table born_to_die (id uuid primary key, name varchar)")
         meta = self._keyspace_meta().tables['born_to_die']
-        self.assertEqual('ks', meta.keyspace_name)
-        self.assertEqual('born_to_die', meta.name)
-        self.assertEqual(1, len(meta.partition_key))
-        self.assertEqual('id', meta.partition_key[0].name)
-        self.assertEqual(2, len(meta.columns))
-        self.assertIsNotNone(meta.columns.get('id'))
-        self.assertEqual('uuid', meta.columns['id'].cql_type)
-        self.assertIsNotNone(meta.columns.get('name'))
-        self.assertEqual('text', meta.columns['name'].cql_type)
-        self.assertEqual(0, len(meta.clustering_key))
-        self.assertEqual(0, len(meta.triggers))
-        self.assertEqual(0, len(meta.indexes))
+        assert 'ks' == meta.keyspace_name
+        assert 'born_to_die' == meta.name
+        assert 1 == len(meta.partition_key)
+        assert 'id' == meta.partition_key[0].name
+        assert 2 == len(meta.columns)
+        assert meta.columns.get('id') is not None
+        assert 'uuid' == meta.columns['id'].cql_type
+        assert meta.columns.get('name') is not None
+        assert 'text' == meta.columns['name'].cql_type
+        assert 0 == len(meta.clustering_key)
+        assert 0 == len(meta.triggers)
+        assert 0 == len(meta.indexes)
         self.session.execute("drop table born_to_die")
-        self.assertIsNone(self._keyspace_meta().tables.get('born_to_die'))
+        assert self._keyspace_meta().tables.get('born_to_die') is None
 
-    def creating_and_dropping_table_with_2ary_indexes_test(self):
-        self.assertEqual(0, len(self._keyspace_meta().indexes))
+    def test_creating_and_dropping_table_with_2ary_indexes(self):
+        assert 0 == len(self._keyspace_meta().indexes)
         self.session.execute("create table born_to_die (id uuid primary key, name varchar)")
         self.session.execute("create index ix_born_to_die_name on born_to_die(name)")
 
-        self.assertEqual(1, len(self._keyspace_meta().indexes))
+        assert 1 == len(self._keyspace_meta().indexes)
         ix_meta = self._keyspace_meta().indexes['ix_born_to_die_name']
-        self.assertEqual('ix_born_to_die_name', ix_meta.name)
+        assert 'ix_born_to_die_name' == ix_meta.name
 
-        self.assertEqual({'target': 'name'}, ix_meta.index_options)
-        self.assertEqual('COMPOSITES', ix_meta.kind)
+        assert {'target': 'name'} == ix_meta.index_options
+        assert 'COMPOSITES' == ix_meta.kind
 
         self.session.execute("drop table born_to_die")
-        self.assertIsNone(self._keyspace_meta().tables.get('born_to_die'))
-        self.assertIsNone(self._keyspace_meta().indexes.get('ix_born_to_die_name'))
-        self.assertEqual(0, len(self._keyspace_meta().indexes))
+        assert self._keyspace_meta().tables.get('born_to_die') is None
+        assert self._keyspace_meta().indexes.get('ix_born_to_die_name') is None
+        assert 0 == len(self._keyspace_meta().indexes)
 
     @since('2.1')
-    def creating_and_dropping_user_types_test(self):
-        self.assertEqual(0, len(self._keyspace_meta().user_types))
+    def test_creating_and_dropping_user_types(self):
+        assert 0 == len(self._keyspace_meta().user_types)
         self.session.execute("CREATE TYPE soon_to_die (foo text, bar int)")
-        self.assertEqual(1, len(self._keyspace_meta().user_types))
+        assert 1 == len(self._keyspace_meta().user_types)
 
         ut_meta = self._keyspace_meta().user_types['soon_to_die']
-        self.assertEqual('ks', ut_meta.keyspace)
-        self.assertEqual('soon_to_die', ut_meta.name)
-        self.assertEqual(['foo', 'bar'], ut_meta.field_names)
-        self.assertEqual(['text', 'int'], ut_meta.field_types)
+        assert 'ks' == ut_meta.keyspace
+        assert 'soon_to_die' == ut_meta.name
+        assert ['foo', 'bar'] == ut_meta.field_names
+        assert ['text', 'int'] == ut_meta.field_types
 
         self.session.execute("DROP TYPE soon_to_die")
-        self.assertEqual(0, len(self._keyspace_meta().user_types))
+        assert 0 == len(self._keyspace_meta().user_types)
 
     @since('2.2')
-    def creating_and_dropping_udf_test(self):
-        self.assertEqual(0, len(self._keyspace_meta().functions), "expected to start with no indexes")
+    def test_creating_and_dropping_udf(self):
+        assert 0 == len(self._keyspace_meta().functions), "expected to start with no functions"
         self.session.execute("""
                 CREATE OR REPLACE FUNCTION ks.wasteful_function (input double)
                     CALLED ON NULL INPUT
                     RETURNS double
                     LANGUAGE java AS 'return Double.valueOf(Math.log(input.doubleValue()));';
             """)
-        self.assertEqual(1, len(self._keyspace_meta().functions), "udf count should be 1")
+        assert 1 == len(self._keyspace_meta().functions), "udf count should be 1"
         udf_meta = self._keyspace_meta().functions['wasteful_function(double)']
-        self.assertEqual('ks', udf_meta.keyspace)
-        self.assertEqual('wasteful_function', udf_meta.name)
-        self.assertEqual(['double'], udf_meta.argument_types)
-        self.assertEqual(['input'], udf_meta.argument_names)
-        self.assertEqual('double', udf_meta.return_type)
-        self.assertEqual('java', udf_meta.language)
-        self.assertEqual('return Double.valueOf(Math.log(input.doubleValue()));', udf_meta.body)
-        self.assertTrue(udf_meta.called_on_null_input)
+        assert 'ks' == udf_meta.keyspace
+        assert 'wasteful_function' == udf_meta.name
+        assert ['double'] == udf_meta.argument_types
+        assert ['input'] == udf_meta.argument_names
+        assert 'double' == udf_meta.return_type
+        assert 'java' == udf_meta.language
+        assert 'return Double.valueOf(Math.log(input.doubleValue()));' == udf_meta.body
+        assert udf_meta.called_on_null_input
         self.session.execute("DROP FUNCTION ks.wasteful_function")
-        self.assertEqual(0, len(self._keyspace_meta().functions), "expected udf list to be back to zero")
+        assert 0 == len(self._keyspace_meta().functions), "expected udf list to be back to zero"
 
     @since('2.2')
-    def creating_and_dropping_uda_test(self):
-        self.assertEqual(0, len(self._keyspace_meta().functions), "expected to start with no indexes")
-        self.assertEqual(0, len(self._keyspace_meta().aggregates), "expected to start with no aggregates")
+    def test_creating_and_dropping_uda(self):
+        assert 0 == len(self._keyspace_meta().functions), "expected to start with no functions"
+        assert 0 == len(self._keyspace_meta().aggregates), "expected to start with no aggregates"
         self.session.execute('''
                 CREATE FUNCTION ks.max_val(current int, candidate int)
                 CALLED ON NULL INPUT
@@ -615,86 +616,86 @@ class TestSchemaMetadata(Tester):
                 STYPE int
                 INITCOND -1
             ''')
-        self.assertEqual(1, len(self._keyspace_meta().functions), "udf count should be 1")
-        self.assertEqual(1, len(self._keyspace_meta().aggregates), "uda count should be 1")
+        assert 1 == len(self._keyspace_meta().functions), "udf count should be 1"
+        assert 1 == len(self._keyspace_meta().aggregates), "uda count should be 1"
         udf_meta = self._keyspace_meta().functions['max_val(int,int)']
         uda_meta = self._keyspace_meta().aggregates['kind_of_max_agg(int)']
 
-        self.assertEqual('ks', udf_meta.keyspace)
-        self.assertEqual('max_val', udf_meta.name)
-        self.assertEqual(['int', 'int'], udf_meta.argument_types)
-        self.assertEqual(['current', 'candidate'], udf_meta.argument_names)
-        self.assertEqual('int', udf_meta.return_type)
-        self.assertEqual('java', udf_meta.language)
-        self.assertEqual('if (current == null) return candidate; else return Math.max(current, candidate);', udf_meta.body)
-        self.assertTrue(udf_meta.called_on_null_input)
-
-        self.assertEqual('ks', uda_meta.keyspace)
-        self.assertEqual('kind_of_max_agg', uda_meta.name)
-        self.assertEqual(['int'], uda_meta.argument_types)
-        self.assertEqual('max_val', uda_meta.state_func)
-        self.assertEqual('int', uda_meta.state_type)
-        self.assertEqual(None, uda_meta.final_func)
-        self.assertEqual('-1', uda_meta.initial_condition)
-        self.assertEqual('int', uda_meta.return_type)
+        assert 'ks' == udf_meta.keyspace
+        assert 'max_val' == udf_meta.name
+        assert ['int', 'int'] == udf_meta.argument_types
+        assert ['current', 'candidate'] == udf_meta.argument_names
+        assert 'int' == udf_meta.return_type
+        assert 'java' == udf_meta.language
+        assert 'if (current == null) return candidate; else return Math.max(current, candidate);' == udf_meta.body
+        assert udf_meta.called_on_null_input
+
+        assert 'ks' == uda_meta.keyspace
+        assert 'kind_of_max_agg' == uda_meta.name
+        assert ['int'] == uda_meta.argument_types
+        assert 'max_val' == uda_meta.state_func
+        assert 'int' == uda_meta.state_type
+        assert None == uda_meta.final_func
+        assert '-1' == uda_meta.initial_condition
+        assert 'int' == uda_meta.return_type
 
         self.session.execute("DROP AGGREGATE ks.kind_of_max_agg")
-        self.assertEqual(0, len(self._keyspace_meta().aggregates), "expected uda list to be back to zero")
+        assert 0 == len(self._keyspace_meta().aggregates), "expected uda list to be back to zero"
         self.session.execute("DROP FUNCTION ks.max_val")
-        self.assertEqual(0, len(self._keyspace_meta().functions), "expected udf list to be back to zero")
+        assert 0 == len(self._keyspace_meta().functions), "expected udf list to be back to zero"
 
-    def basic_table_datatype_test(self):
+    def test_basic_table_datatype(self):
         establish_basic_datatype_table(self.cluster.version(), self.session)
         verify_basic_datatype_table(self.cluster.version(), self.cluster.version(), 'ks', self.session)
 
-    def collection_table_datatype_test(self):
+    def test_collection_table_datatype(self):
         establish_collection_datatype_table(self.cluster.version(), self.session)
         verify_collection_datatype_table(self.cluster.version(), self.cluster.version(), 'ks', self.session)
 
-    def clustering_order_test(self):
+    def test_clustering_order(self):
         establish_clustering_order_table(self.cluster.version(), self.session)
         verify_clustering_order_table(self.cluster.version(), self.cluster.version(), 'ks', self.session)
 
     @since("2.0", max_version="3.X")  # Compact Storage
-    def compact_storage_test(self):
+    def test_compact_storage(self):
         establish_compact_storage_table(self.cluster.version(), self.session)
         verify_compact_storage_table(self.cluster.version(), self.cluster.version(), 'ks', self.session)
 
     @since("2.0", max_version="3.X")  # Compact Storage
-    def compact_storage_composite_test(self):
+    def test_compact_storage_composite(self):
         establish_compact_storage_composite_table(self.cluster.version(), self.session)
         verify_compact_storage_composite_table(self.cluster.version(), self.cluster.version(), 'ks', self.session)
 
-    def nondefault_table_settings_test(self):
+    def test_nondefault_table_settings(self):
         establish_nondefault_table_settings(self.cluster.version(), self.session)
         verify_nondefault_table_settings(self.cluster.version(), self.cluster.version(), 'ks', self.session)
 
-    def indexes_test(self):
+    def test_indexes(self):
         establish_indexes_table(self.cluster.version(), self.session)
         verify_indexes_table(self.cluster.version(), self.cluster.version(), 'ks', self.session)
 
-    def durable_writes_test(self):
+    def test_durable_writes(self):
         establish_durable_writes_keyspace(self.cluster.version(), self.session)
         verify_durable_writes_keyspace(self.cluster.version(), self.cluster.version(), 'ks', self.session)
 
     @since('2.0')
-    def static_column_test(self):
+    def test_static_column(self):
         establish_static_column_table(self.cluster.version(), self.session)
         verify_static_column_table(self.cluster.version(), self.cluster.version(), 'ks', self.session)
 
     @since('2.1')
-    def udt_table_test(self):
+    def test_udt_table(self):
         establish_udt_table(self.cluster.version(), self.session)
         verify_udt_table(self.cluster.version(), self.cluster.version(), 'ks', self.session)
 
     @since('2.2')
-    def udf_test(self):
+    def test_udf(self):
         establish_udf(self.cluster.version(), self.session)
         self.session.cluster.refresh_schema_metadata()
         verify_udf(self.cluster.version(), self.cluster.version(), 'ks', self.session)
 
     @since('2.2')
-    def uda_test(self):
+    def test_uda(self):
         establish_uda(self.cluster.version(), self.session)
         self.session.cluster.refresh_schema_metadata()
         verify_uda(self.cluster.version(), self.cluster.version(), 'ks', self.session)

http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/49b2dda4/schema_test.py
----------------------------------------------------------------------
diff --git a/schema_test.py b/schema_test.py
index 1553adc..8aaea3d 100644
--- a/schema_test.py
+++ b/schema_test.py
@@ -1,15 +1,19 @@
 import time
+import pytest
+import logging
 
 from cassandra.concurrent import execute_concurrent_with_args
 
 from tools.assertions import assert_invalid, assert_all, assert_one
-from tools.decorators import since
 from dtest import Tester, create_ks
 
+since = pytest.mark.since
+logger = logging.getLogger(__name__)
+
 
 class TestSchema(Tester):
 
-    def table_alteration_test(self):
+    def test_table_alteration(self):
         """
         Tests that table alters return as expected with many sstables at different schema points
         """
@@ -42,16 +46,16 @@ class TestSchema(Tester):
         rows = session.execute("select * from tbl_o_churn")
         for row in rows:
             if row.id < rows_to_insert * 5:
-                self.assertEqual(row.c1, 'bbb')
-                self.assertIsNone(row.c2)
-                self.assertFalse(hasattr(row, 'c0'))
+                assert row.c1 == 'bbb'
+                assert row.c2 is None
+                assert not hasattr(row, 'c0')
             else:
-                self.assertEqual(row.c1, 'ccc')
-                self.assertEqual(row.c2, 'ddd')
-                self.assertFalse(hasattr(row, 'c0'))
+                assert row.c1 == 'ccc'
+                assert row.c2 == 'ddd'
+                assert not hasattr(row, 'c0')
 
     @since("2.0", max_version="3.X")  # Compact Storage
-    def drop_column_compact_test(self):
+    def test_drop_column_compact(self):
         session = self.prepare()
 
         session.execute("USE ks")
@@ -59,7 +63,7 @@ class TestSchema(Tester):
 
         assert_invalid(session, "ALTER TABLE cf DROP c1", "Cannot drop columns from a")
 
-    def drop_column_compaction_test(self):
+    def test_drop_column_compaction(self):
         session = self.prepare()
         session.execute("USE ks")
         session.execute("CREATE TABLE cf (key int PRIMARY KEY, c1 int, c2 int)")
@@ -84,7 +88,7 @@ class TestSchema(Tester):
         session = self.patient_cql_connection(node)
         assert_all(session, "SELECT c1 FROM ks.cf", [[None], [None], [None], [4]], ignore_order=True)
 
-    def drop_column_queries_test(self):
+    def test_drop_column_queries(self):
         session = self.prepare()
 
         session.execute("USE ks")
@@ -116,7 +120,7 @@ class TestSchema(Tester):
 
         assert_one(session, "SELECT * FROM cf WHERE c2 = 5", [3, 4, 5])
 
-    def drop_column_and_restart_test(self):
+    def test_drop_column_and_restart(self):
         """
         Simply insert data in a table, drop a column involved in the insert and restart the node afterwards.
         This ensures that the dropped_columns system table is properly flushed on the alter or the restart
@@ -142,7 +146,7 @@ class TestSchema(Tester):
         session.execute("USE ks")
         assert_one(session, "SELECT * FROM t", [0, 0])
 
-    def drop_static_column_and_restart_test(self):
+    def test_drop_static_column_and_restart(self):
         """
         Dropping a static column caused an sstable corrupt exception after restarting, here
         we test that we can drop a static column and restart safely.


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org