You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by aw...@apache.org on 2019/01/09 17:13:22 UTC

[cassandra-dtest] branch master updated: Reenable upgrade tests

This is an automated email from the ASF dual-hosted git repository.

aweisberg pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/cassandra-dtest.git


The following commit(s) were added to refs/heads/master by this push:
     new 84598f1  Reenable upgrade tests
84598f1 is described below

commit 84598f11513f4c1dc0be4d7115a47b59940a649e
Author: Ariel Weisberg <aw...@apple.com>
AuthorDate: Wed Oct 31 16:17:17 2018 -0400

    Reenable upgrade tests
    
    Patch by Ariel Weisberg; Reviewed by Dinesh Joshi for CASSANDRA-14421
    
    Co-authored-by: Ariel Weisberg <aw...@apple.com>
    Co-authored-by: Dinesh A. Joshi <di...@apple.com>
---
 conftest.py                                    |  63 +++--
 cqlsh_tests/cqlsh_copy_tests.py                |   1 +
 cqlsh_tests/cqlsh_tests.py                     |   3 +
 dtest.py                                       |   9 +-
 dtest_setup.py                                 |  28 +-
 pytest.ini                                     |   3 +-
 sstable_generation_loading_test.py             |  41 ++-
 tools/assertions.py                            |   8 +
 tools/misc.py                                  |  22 +-
 tools/paging.py                                |  14 +-
 upgrade_tests/compatibility_flag_test.py       |   2 +
 upgrade_tests/conftest.py                      |   4 +
 upgrade_tests/cql_tests.py                     | 249 +++++++++++++----
 upgrade_tests/paging_test.py                   | 110 ++++----
 upgrade_tests/regression_test.py               |  10 +-
 upgrade_tests/repair_test.py                   |   4 +-
 upgrade_tests/storage_engine_upgrade_test.py   | 120 ++++++---
 upgrade_tests/thrift_upgrade_test.py           | 356 +++++++++++++++++--------
 upgrade_tests/upgrade_base.py                  |  30 ++-
 upgrade_tests/upgrade_compact_storage.py       |   2 +
 upgrade_tests/upgrade_manifest.py              | 131 +++++----
 upgrade_tests/upgrade_schema_agreement_test.py |   2 +
 upgrade_tests/upgrade_supercolumns_test.py     |  23 +-
 upgrade_tests/upgrade_through_versions_test.py |  87 +++---
 24 files changed, 922 insertions(+), 400 deletions(-)

diff --git a/conftest.py b/conftest.py
index 5b1a276..bfd4299 100644
--- a/conftest.py
+++ b/conftest.py
@@ -18,6 +18,7 @@ from netifaces import AF_INET
 from psutil import virtual_memory
 
 import netifaces as ni
+import ccmlib.repository
 from ccmlib.common import validate_install_dir, is_win, get_version_from_build
 
 from dtest_config import DTestConfig
@@ -75,6 +76,8 @@ def pytest_addoption(parser):
                           "after the test completes")
     parser.addoption("--enable-jacoco-code-coverage", action="store_true", default=False,
                      help="Enable JaCoCo Code Coverage Support")
+    parser.addoption("--upgrade-version-selection", action="store", default="indev",
+                     help="Specify whether to run indev, releases, or both")
 
 
 def sufficient_system_resources_for_resource_intensive_tests():
@@ -364,12 +367,29 @@ def fixture_since(request, fixture_dtest_setup):
 
         since_str = request.node.get_closest_marker('since').args[0]
         since = LooseVersion(since_str)
-        # use cassandra_version_from_build as it's guaranteed to be a LooseVersion
-        # whereas cassandra_version may be a string if set in the cli options
-        current_running_version = fixture_dtest_setup.dtest_config.cassandra_version_from_build
-        skip_msg = _skip_msg(current_running_version, since, max_version)
-        if skip_msg:
-            pytest.skip(skip_msg)
+        # For upgrade tests don't run the test if any of the involved versions
+        # are excluded by the annotation
+        if hasattr(request.cls, "UPGRADE_PATH"):
+            upgrade_path = request.cls.UPGRADE_PATH
+            ccm_repo_cache_dir, _ = ccmlib.repository.setup(upgrade_path.starting_meta.version)
+            starting_version = get_version_from_build(ccm_repo_cache_dir)
+            skip_msg = _skip_msg(starting_version, since, max_version)
+            if skip_msg:
+                pytest.skip(skip_msg)
+            ccm_repo_cache_dir, _ = ccmlib.repository.setup(upgrade_path.upgrade_meta.version)
+            ending_version = get_version_from_build(ccm_repo_cache_dir)
+            skip_msg = _skip_msg(ending_version, since, max_version)
+            if skip_msg:
+                pytest.skip(skip_msg)
+        else:
+            # For regular tests the value in the current cluster actually means something so we should
+            # use that to check.
+            # Use cassandra_version_from_build as it's guaranteed to be a LooseVersion
+            # whereas cassandra_version may be a string if set in the cli options
+            current_running_version = fixture_dtest_setup.dtest_config.cassandra_version_from_build
+            skip_msg = _skip_msg(current_running_version, since, max_version)
+            if skip_msg:
+                pytest.skip(skip_msg)
 
 
 @pytest.fixture(autouse=True)
@@ -409,13 +429,16 @@ def pytest_collection_modifyitems(items, config):
     This function is called upon during the pytest test collection phase and allows for modification
     of the test items within the list
     """
-    if not config.getoption("--collect-only") and config.getoption("--cassandra-dir") is None:
-        if config.getoption("--cassandra-version") is None:
+    collect_only = config.getoption("--collect-only")
+    cassandra_dir = config.getoption("--cassandra-dir")
+    cassandra_version = config.getoption("--cassandra-version")
+    if not collect_only and cassandra_dir is None:
+        if  cassandra_version is None:
             raise Exception("Required dtest arguments were missing! You must provide either --cassandra-dir "
                             "or --cassandra-version. Refer to the documentation or invoke the help with --help.")
 
     # Either cassandra_version or cassandra_dir is defined, so figure out the version
-    CASSANDRA_VERSION = config.getoption("--cassandra-version") or get_version_from_build(config.getoption("--cassandra-dir"))
+    CASSANDRA_VERSION = cassandra_version or get_version_from_build(cassandra_dir)
 
     # Check that use_off_heap_memtables is supported in this c* version
     if config.getoption("--use-off-heap-memtables") and ("3.0" <= CASSANDRA_VERSION < "3.4"):
@@ -433,16 +456,17 @@ def pytest_collection_modifyitems(items, config):
     for item in items:
         deselect_test = False
 
-        if item.get_closest_marker("resource_intensive"):
-            if config.getoption("--force-resource-intensive-tests"):
-                pass
-            if config.getoption("--skip-resource-intensive-tests"):
-                deselect_test = True
-                logger.info("SKIP: Deselecting test %s as test marked resource_intensive. To force execution of "
-                      "this test re-run with the --force-resource-intensive-tests command line argument" % item.name)
-            if not sufficient_system_resources_resource_intensive:
-                deselect_test = True
-                logger.info("SKIP: Deselecting resource_intensive test %s due to insufficient system resources" % item.name)
+        if item.get_closest_marker("resource_intensive") and not collect_only:
+            force_resource_intensive = config.getoption("--force-resource-intensive-tests")
+            skip_resource_intensive = config.getoption("--skip-resource-intensive-tests")
+            if not force_resource_intensive:
+                if skip_resource_intensive:
+                    deselect_test = True
+                    logger.info("SKIP: Deselecting test %s as test marked resource_intensive. To force execution of "
+                          "this test re-run with the --force-resource-intensive-tests command line argument" % item.name)
+                if not sufficient_system_resources_resource_intensive:
+                    deselect_test = True
+                    logger.info("SKIP: Deselecting resource_intensive test %s due to insufficient system resources" % item.name)
 
         if item.get_closest_marker("no_vnodes"):
             if config.getoption("--use-vnodes"):
@@ -480,4 +504,3 @@ def pytest_collection_modifyitems(items, config):
 
     config.hook.pytest_deselected(items=deselected_items)
     items[:] = selected_items
-
diff --git a/cqlsh_tests/cqlsh_copy_tests.py b/cqlsh_tests/cqlsh_copy_tests.py
index 9769fd7..662fb31 100644
--- a/cqlsh_tests/cqlsh_copy_tests.py
+++ b/cqlsh_tests/cqlsh_copy_tests.py
@@ -58,6 +58,7 @@ class UTC(datetime.tzinfo):
         return datetime.timedelta(0)
 
 
+@pytest.mark.skip("These aren't functioning just yet")
 class TestCqlshCopy(Tester):
     """
     Tests the COPY TO and COPY FROM features in cqlsh.
diff --git a/cqlsh_tests/cqlsh_tests.py b/cqlsh_tests/cqlsh_tests.py
index ef38ee2..e5c601c 100644
--- a/cqlsh_tests/cqlsh_tests.py
+++ b/cqlsh_tests/cqlsh_tests.py
@@ -27,6 +27,7 @@ since = pytest.mark.since
 logger = logging.getLogger(__name__)
 
 
+@pytest.mark.skip("These aren't functioning just yet")
 class TestCqlsh(Tester):
 
     @classmethod
@@ -1673,6 +1674,7 @@ Tracing session:""")
         return p.communicate()
 
 
+@pytest.mark.skip("These aren't functioning just yet")
 class TestCqlshSmoke(Tester):
     """
     Tests simple use cases for clqsh.
@@ -1945,6 +1947,7 @@ class TestCqlshSmoke(Tester):
         return [table.name for table in list(self.session.cluster.metadata.keyspaces[keyspace].tables.values())]
 
 
+@pytest.mark.skip("These aren't functioning just yet")
 class CqlLoginTest(Tester):
     """
     Tests login which requires password authenticator
diff --git a/dtest.py b/dtest.py
index 095ccd0..9027d75 100644
--- a/dtest.py
+++ b/dtest.py
@@ -8,6 +8,8 @@ import sys
 import threading
 import time
 import traceback
+from distutils.version import LooseVersion
+
 import pytest
 import cassandra
 
@@ -41,6 +43,8 @@ if len(config.read(os.path.expanduser('~/.cassandra-dtest'))) > 0:
 
 RUN_STATIC_UPGRADE_MATRIX = os.environ.get('RUN_STATIC_UPGRADE_MATRIX', '').lower() in ('yes', 'true')
 
+MAJOR_VERSION_4 = LooseVersion('4.0')
+
 logger = logging.getLogger(__name__)
 
 
@@ -236,6 +240,7 @@ class Tester:
     def set_dtest_setup_on_function(self, fixture_dtest_setup):
         self.fixture_dtest_setup = fixture_dtest_setup
         self.dtest_config = fixture_dtest_setup.dtest_config
+        return None
 
     def set_node_to_current_version(self, node):
         version = os.environ.get('CASSANDRA_VERSION')
@@ -454,12 +459,12 @@ def run_scenarios(scenarios, handler, deferred_exceptions=tuple()):
         try:
             handler(scenario)
         except deferred_exceptions as e:
-            tracebacks.append(traceback.format_exc(sys.exc_info()))
+            tracebacks.append(traceback.format_exc())
             errors.append(type(e)('encountered {} {} running scenario:\n  {}\n'.format(e.__class__.__name__, str(e), scenario)))
             logger.debug("scenario {}/{} encountered a deferrable exception, continuing".format(i, len(scenarios)))
         except Exception as e:
             # catch-all for any exceptions not intended to be deferred
-            tracebacks.append(traceback.format_exc(sys.exc_info()))
+            tracebacks.append(traceback.format_exc())
             errors.append(type(e)('encountered {} {} running scenario:\n  {}\n'.format(e.__class__.__name__, str(e), scenario)))
             logger.debug("scenario {}/{} encountered a non-deferrable exception, aborting".format(i, len(scenarios)))
             raise MultiError(errors, tracebacks)
diff --git a/dtest_setup.py b/dtest_setup.py
index 9e3f330..4b2ece9 100644
--- a/dtest_setup.py
+++ b/dtest_setup.py
@@ -420,6 +420,15 @@ class DTestSetup:
         if self.cluster.version() >= '4':
             values['corrupted_tombstone_strategy'] = 'exception'
 
+        if self.dtest_config.use_vnodes:
+            self.cluster.set_configuration_options(
+                values={'initial_token': None, 'num_tokens': self.dtest_config.num_tokens})
+        else:
+            self.cluster.set_configuration_options(values={'num_tokens': None})
+
+        if self.dtest_config.use_off_heap_memtables:
+            self.cluster.set_configuration_options(values={'memtable_allocation_type': 'offheap_objects'})
+
         self.cluster.set_configuration_options(values)
         logger.debug("Done setting configuration options:\n" + pprint.pformat(self.cluster._config_options, indent=4))
 
@@ -454,7 +463,7 @@ class DTestSetup:
 
     @staticmethod
     def create_ccm_cluster(dtest_setup):
-        logger.debug("cluster ccm directory: " + dtest_setup.test_path)
+        logger.info("cluster ccm directory: " + dtest_setup.test_path)
         version = dtest_setup.dtest_config.cassandra_version
 
         if version:
@@ -462,14 +471,6 @@ class DTestSetup:
         else:
             cluster = Cluster(dtest_setup.test_path, dtest_setup.cluster_name, cassandra_dir=dtest_setup.dtest_config.cassandra_dir)
 
-        if dtest_setup.dtest_config.use_vnodes:
-            cluster.set_configuration_options(values={'initial_token': None, 'num_tokens': dtest_setup.dtest_config.num_tokens})
-        else:
-            cluster.set_configuration_options(values={'num_tokens': None})
-
-        if dtest_setup.dtest_config.use_off_heap_memtables:
-            cluster.set_configuration_options(values={'memtable_allocation_type': 'offheap_objects'})
-
         cluster.set_datadir_count(dtest_setup.dtest_config.data_dir_count)
         cluster.set_environment_variable('CASSANDRA_LIBJEMALLOC', dtest_setup.dtest_config.jemalloc_path)
 
@@ -513,3 +514,12 @@ class DTestSetup:
         # write_last_test_file(cls.test_path, cls.cluster)
 
         # cls.post_initialize_cluster()
+
+    def reinitialize_cluster_for_different_version(self):
+        """
+        This method is used by upgrade tests to re-init the cluster to work with a specific
+        version that may not be compatible with the existing configuration options
+        """
+        self.init_default_config()
+
+
diff --git a/pytest.ini b/pytest.ini
index f088fa5..c8e85d4 100644
--- a/pytest.ini
+++ b/pytest.ini
@@ -1,6 +1,7 @@
 [pytest]
+python_files = test_*.py *_test.py *_tests.py
 junit_suite_name = Cassandra dtests
 log_print = True
 log_level = INFO
 log_format = %(asctime)s,%(msecs)d %(name)s %(levelname)s %(message)s
-timeout = 900
\ No newline at end of file
+timeout = 900
diff --git a/sstable_generation_loading_test.py b/sstable_generation_loading_test.py
index ab99a03..901011d 100644
--- a/sstable_generation_loading_test.py
+++ b/sstable_generation_loading_test.py
@@ -2,18 +2,19 @@ import os
 import subprocess
 import time
 import distutils.dir_util
+from distutils.version import LooseVersion
+
 import pytest
 import logging
 
 from ccmlib import common as ccmcommon
 
-from dtest import Tester, create_ks, create_cf
+from dtest import Tester, create_ks, create_cf, MAJOR_VERSION_4
 from tools.assertions import assert_all, assert_none, assert_one
 
 since = pytest.mark.since
 logger = logging.getLogger(__name__)
 
-
 # WARNING: sstableloader tests should be added to TestSSTableGenerationAndLoading (below),
 # and not to BaseSStableLoaderTest (which is shared with upgrade tests)
 
@@ -27,45 +28,62 @@ class TestBaseSStableLoader(Tester):
         fixture_dtest_setup.allow_log_errors = True
 
     upgrade_from = None
-    compact = False
+    test_compact = False
+
+    def compact(self):
+        return self.fixture_dtest_setup.cluster.version() < MAJOR_VERSION_4 and self.test_compact
 
     def create_schema(self, session, ks, compression):
         create_ks(session, ks, rf=2)
-        create_cf(session, "standard1", compression=compression, compact_storage=self.compact)
+        create_cf(session, "standard1", compression=compression, compact_storage=self.compact())
         create_cf(session, "counter1", compression=compression, columns={'v': 'counter'},
-                  compact_storage=self.compact)
+                  compact_storage=self.compact())
+
+    def skip_base_class_test(self):
+        if self.__class__.__name__ != 'TestBasedSSTableLoader' and self.upgrade_from is None:
+            pytest.skip("Don't need to run base class test, only derived classes")
 
     def test_sstableloader_compression_none_to_none(self):
+        self.skip_base_class_test()
         self.load_sstable_with_configuration(None, None)
 
     def test_sstableloader_compression_none_to_snappy(self):
+        self.skip_base_class_test()
         self.load_sstable_with_configuration(None, 'Snappy')
 
     def test_sstableloader_compression_none_to_deflate(self):
+        self.skip_base_class_test()
         self.load_sstable_with_configuration(None, 'Deflate')
 
     def test_sstableloader_compression_snappy_to_none(self):
+        self.skip_base_class_test()
         self.load_sstable_with_configuration('Snappy', None)
 
     def test_sstableloader_compression_snappy_to_snappy(self):
+        self.skip_base_class_test()
         self.load_sstable_with_configuration('Snappy', 'Snappy')
 
     def test_sstableloader_compression_snappy_to_deflate(self):
+        self.skip_base_class_test()
         self.load_sstable_with_configuration('Snappy', 'Deflate')
 
     def test_sstableloader_compression_deflate_to_none(self):
+        self.skip_base_class_test()
         self.load_sstable_with_configuration('Deflate', None)
 
     def test_sstableloader_compression_deflate_to_snappy(self):
+        self.skip_base_class_test()
         self.load_sstable_with_configuration('Deflate', 'Snappy')
 
     def test_sstableloader_compression_deflate_to_deflate(self):
+        self.skip_base_class_test()
         self.load_sstable_with_configuration('Deflate', 'Deflate')
 
     def test_sstableloader_with_mv(self):
         """
         @jira_ticket CASSANDRA-11275
         """
+        self.skip_base_class_test()
         def create_schema_with_mv(session, ks, compression):
             self.create_schema(session, ks, compression)
             # create a materialized view
@@ -125,9 +143,11 @@ class TestBaseSStableLoader(Tester):
         cluster = self.cluster
         if self.upgrade_from:
             logger.debug("Generating sstables with version %s" % (self.upgrade_from))
+            default_install_version = self.cluster.version()
             default_install_dir = self.cluster.get_install_dir()
             # Forcing cluster version on purpose
             cluster.set_install_dir(version=self.upgrade_from)
+            self.fixture_dtest_setup.reinitialize_cluster_for_different_version()
         logger.debug("Using jvm_args={}".format(self.jvm_args))
         cluster.populate(2).start(jvm_args=list(self.jvm_args))
         node1, node2 = cluster.nodelist()
@@ -141,6 +161,16 @@ class TestBaseSStableLoader(Tester):
             session.execute("UPDATE standard1 SET v='{}' WHERE KEY='{}' AND c='col'".format(i, i))
             session.execute("UPDATE counter1 SET v=v+1 WHERE KEY='{}'".format(i))
 
+        #Will upgrade to a version that doesn't support compact storage so revert the compact
+        #storage, this doesn't actually fix it yet
+        if self.compact() and default_install_version >= MAJOR_VERSION_4:
+            session.execute('alter table standard1 drop compact storage');
+            session.execute('alter table counter1 drop compact storage');
+            node1.nodetool('rebuild')
+            node1.nodetool('cleanup')
+            node2.nodetool('rebuild')
+            node2.nodetool('cleanup')
+
         node1.nodetool('drain')
         node1.stop()
         node2.nodetool('drain')
@@ -158,6 +188,7 @@ class TestBaseSStableLoader(Tester):
             logger.debug("Running sstableloader with version from %s" % (default_install_dir))
             # Return to previous version
             cluster.set_install_dir(install_dir=default_install_dir)
+            self.fixture_dtest_setup.reinitialize_cluster_for_different_version()
 
         cluster.start(jvm_args=list(self.jvm_args))
         time.sleep(5)  # let gossip figure out what is going on
diff --git a/tools/assertions.py b/tools/assertions.py
index 2f826ae..bce9b8a 100644
--- a/tools/assertions.py
+++ b/tools/assertions.py
@@ -346,3 +346,11 @@ def assert_lists_equal_ignoring_order(list1, list2, sort_key=None):
                 sorted_list2 = sorted(normalized_list2, key=lambda elm: str(elm[sort_key]))
 
     assert sorted_list1 == sorted_list2
+
+
+def assert_lists_of_dicts_equal(list1, list2):
+    for adict, bdict in zip(list1, list2):
+        assert(len(adict) == len(bdict))
+        for key, value in adict.items():
+            assert key in bdict
+            assert bdict[key] == value
diff --git a/tools/misc.py b/tools/misc.py
index aa2c084..a4502f1 100644
--- a/tools/misc.py
+++ b/tools/misc.py
@@ -3,6 +3,7 @@ import subprocess
 import time
 import hashlib
 import logging
+import pytest
 
 from collections import Mapping
 
@@ -97,7 +98,9 @@ def list_to_hashed_dict(list):
                 normalized_list.append(tmp_list)
             else:
                 normalized_list.append(item)
-        list_digest = hashlib.sha256(str(normalized_list).encode('utf-8', 'ignore')).hexdigest()
+        list_str = str(normalized_list)
+        utf8 = list_str.encode('utf-8', 'ignore')
+        list_digest = hashlib.sha256(utf8).hexdigest()
         hashed_dict[list_digest] = normalized_list
     return hashed_dict
 
@@ -136,3 +139,20 @@ class ImmutableMapping(Mapping):
 
     def __repr__(self):
         return '{cls}({data})'.format(cls=self.__class__.__name__, data=self._data)
+
+
+def wait_for_agreement(thrift, timeout=10):
+    def check_agreement():
+        schemas = thrift.describe_schema_versions()
+        if len([ss for ss in list(schemas.keys()) if ss != 'UNREACHABLE']) > 1:
+            raise Exception("schema agreement not reached")
+    retry_till_success(check_agreement, timeout=timeout)
+
+
+def add_skip(cls, reason=""):
+    if hasattr(cls, "pytestmark"):
+        cls.pytestmark = cls.pytestmark.copy()
+        cls.pytestmark.append(pytest.mark.skip(reason))
+    else:
+        cls.pytestmark = [pytest.mark.skip(reason)]
+    return cls
diff --git a/tools/paging.py b/tools/paging.py
index 0d99bfd..7153ed6 100644
--- a/tools/paging.py
+++ b/tools/paging.py
@@ -165,7 +165,19 @@ class PageAssertionMixin(object):
     """Can be added to subclasses of unittest.Tester"""
 
     def assertEqualIgnoreOrder(self, actual, expected):
-        assert list_to_hashed_dict(actual) == list_to_hashed_dict(expected)
+        hashed_expected = list_to_hashed_dict(expected)
+        hashed_actual = list_to_hashed_dict(actual)
+        for key, expected in hashed_expected.items():
+            assert key in hashed_actual, "expected %s not in actual" % str(expected)
+            actual = hashed_actual[key]
+            assert actual == expected, "actual %s not same as expected %s" % (str(actual), str(expected))
+
+        for key, actual in hashed_actual.items():
+            assert key in hashed_expected, "actual %s not in expected" % str(actual)
+            expected = hashed_expected[key]
+            assert expected == actual, "expected %s not same as actual %s" % (str(expected), str(actual))
+
+        assert hashed_expected == hashed_actual
 
 
     def assertIsSubsetOf(self, subset, superset):
diff --git a/upgrade_tests/compatibility_flag_test.py b/upgrade_tests/compatibility_flag_test.py
index f308174..3711841 100644
--- a/upgrade_tests/compatibility_flag_test.py
+++ b/upgrade_tests/compatibility_flag_test.py
@@ -25,6 +25,7 @@ class TestCompatibilityFlag(Tester):
         cluster.populate(2)
         node1, node2 = cluster.nodelist()
         cluster.set_install_dir(version=from_version)
+        self.fixture_dtest_setup.reinitialize_cluster_for_different_version()
         cluster.start(wait_for_binary_proto=True)
 
         node1.drain()
@@ -46,6 +47,7 @@ class TestCompatibilityFlag(Tester):
         cluster.populate(2)
         node1, node2 = cluster.nodelist()
         cluster.set_install_dir(version=from_version)
+        self.fixture_dtest_setup.reinitialize_cluster_for_different_version()
         cluster.start(wait_for_binary_proto=True)
 
         node1.drain()
diff --git a/upgrade_tests/conftest.py b/upgrade_tests/conftest.py
new file mode 100644
index 0000000..ecd4753
--- /dev/null
+++ b/upgrade_tests/conftest.py
@@ -0,0 +1,4 @@
+from .upgrade_manifest import set_config
+
+def pytest_configure(config):
+    set_config(config)
diff --git a/upgrade_tests/cql_tests.py b/upgrade_tests/cql_tests.py
index f08a141..4350877 100644
--- a/upgrade_tests/cql_tests.py
+++ b/upgrade_tests/cql_tests.py
@@ -16,7 +16,7 @@ from cassandra.protocol import ProtocolException, SyntaxException
 from cassandra.query import SimpleStatement
 from cassandra.util import sortedset
 
-from dtest import RUN_STATIC_UPGRADE_MATRIX
+from dtest import RUN_STATIC_UPGRADE_MATRIX, MAJOR_VERSION_4
 from thrift_bindings.thrift010.ttypes import \
     ConsistencyLevel as ThriftConsistencyLevel
 from thrift_bindings.thrift010.ttypes import (CfDef, Column, ColumnDef,
@@ -27,6 +27,7 @@ from thrift_test import get_thrift_client
 from tools.assertions import (assert_all, assert_invalid, assert_length_equal,
                               assert_none, assert_one, assert_row_count)
 from tools.data import rows_to_list
+from tools.misc import add_skip
 from .upgrade_base import UpgradeTester
 from .upgrade_manifest import build_upgrade_pairs
 
@@ -37,6 +38,9 @@ logger = logging.getLogger(__name__)
 @pytest.mark.upgrade_test
 class TestCQL(UpgradeTester):
 
+    def is_40_or_greater(self):
+        return self.UPGRADE_PATH.upgrade_meta.family in ('trunk', '4.0')
+
     def test_static_cf(self):
         """ Test static CF syntax """
         cursor = self.prepare()
@@ -79,7 +83,7 @@ class TestCQL(UpgradeTester):
 
             assert_all(cursor, "SELECT * FROM users", [[UUID('f47ac10b-58cc-4372-a567-0e02b2c3d479'), 37, None, None], [UUID('550e8400-e29b-41d4-a716-446655440000'), 36, None, None]])
 
-    @since('2.0', max_version='3')  # 3.0+ not compatible with protocol version 2
+    @since('2.0', max_version='2.99')  # 3.0+ not compatible with protocol version 2
     def test_large_collection_errors(self):
         """ For large collections, make sure that we are printing warnings """
         for version in self.get_node_versions():
@@ -128,6 +132,10 @@ class TestCQL(UpgradeTester):
             ) WITH COMPACT STORAGE;
         """)
 
+        #4.0 doesn't support compact storage
+        if self.is_40_or_greater():
+            cursor.execute("ALTER TABLE users DROP COMPACT STORAGE;")
+
         for is_upgraded, cursor in self.do_upgrade(cursor):
             logger.debug("Querying {} node".format("upgraded" if is_upgraded else "old"))
             cursor.execute("TRUNCATE users")
@@ -139,15 +147,25 @@ class TestCQL(UpgradeTester):
             # Queries
             assert_one(cursor, "SELECT firstname, lastname FROM users WHERE userid = 550e8400-e29b-41d4-a716-446655440000", ['Frodo', 'Baggins'])
 
-            assert_one(cursor, "SELECT * FROM users WHERE userid = 550e8400-e29b-41d4-a716-446655440000", [UUID('550e8400-e29b-41d4-a716-446655440000'), 32, 'Frodo', 'Baggins'])
-
-            # FIXME There appears to be some sort of problem with reusable cells
-            # when executing this query.  It's likely that CASSANDRA-9705 will
-            # fix this, but I'm not 100% sure.
-            assert_one(cursor, "SELECT * FROM users WHERE userid = f47ac10b-58cc-4372-a567-0e02b2c3d479", [UUID('f47ac10b-58cc-4372-a567-0e02b2c3d479'), 33, 'Samwise', 'Gamgee'])
-
-            assert_all(cursor, "SELECT * FROM users", [[UUID('f47ac10b-58cc-4372-a567-0e02b2c3d479'), 33, 'Samwise', 'Gamgee'],
-                                                       [UUID('550e8400-e29b-41d4-a716-446655440000'), 32, 'Frodo', 'Baggins']])
+            if self.is_40_or_greater():
+                assert_one(cursor, "SELECT * FROM users WHERE userid = 550e8400-e29b-41d4-a716-446655440000",
+                           [UUID('550e8400-e29b-41d4-a716-446655440000'), None, 32, 'Frodo', 'Baggins', None])
+                assert_one(cursor, "SELECT * FROM users WHERE userid = f47ac10b-58cc-4372-a567-0e02b2c3d479",
+                           [UUID('f47ac10b-58cc-4372-a567-0e02b2c3d479'), None, 33, 'Samwise', 'Gamgee', None])
+                assert_all(cursor, "SELECT * FROM users",
+                           [[UUID('f47ac10b-58cc-4372-a567-0e02b2c3d479'), None, 33, 'Samwise', 'Gamgee', None],
+                            [UUID('550e8400-e29b-41d4-a716-446655440000'), None, 32, 'Frodo', 'Baggins', None]])
+            else:
+                assert_one(cursor, "SELECT * FROM users WHERE userid = 550e8400-e29b-41d4-a716-446655440000",
+                           [UUID('550e8400-e29b-41d4-a716-446655440000'), 32, 'Frodo', 'Baggins'])
+                # FIXME There appears to be some sort of problem with reusable cells
+                # when executing this query.  It's likely that CASSANDRA-9705 will
+                # fix this, but I'm not 100% sure.
+                assert_one(cursor, "SELECT * FROM users WHERE userid = f47ac10b-58cc-4372-a567-0e02b2c3d479",
+                           [UUID('f47ac10b-58cc-4372-a567-0e02b2c3d479'), 33, 'Samwise', 'Gamgee'])
+                assert_all(cursor, "SELECT * FROM users",
+                           [[UUID('f47ac10b-58cc-4372-a567-0e02b2c3d479'), 33, 'Samwise', 'Gamgee'],
+                            [UUID('550e8400-e29b-41d4-a716-446655440000'), 32, 'Frodo', 'Baggins']])
 
             # Test batch inserts
             cursor.execute("""
@@ -159,8 +177,12 @@ class TestCQL(UpgradeTester):
                 APPLY BATCH
             """)
 
-            assert_all(cursor, "SELECT * FROM users", [[UUID('f47ac10b-58cc-4372-a567-0e02b2c3d479'), 37, None, None],
-                                                       [UUID('550e8400-e29b-41d4-a716-446655440000'), 36, None, None]])
+            if self.is_40_or_greater():
+                assert_all(cursor, "SELECT * FROM users", [[UUID('f47ac10b-58cc-4372-a567-0e02b2c3d479'), None, 37, None, None, None],
+                                                           [UUID('550e8400-e29b-41d4-a716-446655440000'), None, 36, None, None, None]])
+            else:
+                assert_all(cursor, "SELECT * FROM users", [[UUID('f47ac10b-58cc-4372-a567-0e02b2c3d479'), 37, None, None],
+                                                           [UUID('550e8400-e29b-41d4-a716-446655440000'), 36, None, None]])
 
     def test_dynamic_cf(self):
         """ Test non-composite dynamic CF syntax """
@@ -175,6 +197,10 @@ class TestCQL(UpgradeTester):
             ) WITH COMPACT STORAGE;
         """)
 
+        #4.0 doesn't support compact storage
+        if self.is_40_or_greater():
+            cursor.execute("ALTER TABLE clicks DROP COMPACT STORAGE;")
+
         for is_upgraded, cursor in self.do_upgrade(cursor):
             logger.debug("Querying {} node".format("upgraded" if is_upgraded else "old"))
             cursor.execute("TRUNCATE clicks")
@@ -196,8 +222,9 @@ class TestCQL(UpgradeTester):
 
             assert_all(cursor, "SELECT time FROM clicks", [[24], [12], [128], [24], [12], [42]])
 
-            # Check we don't allow empty values for url since this is the full underlying cell name (#6152)
-            assert_invalid(cursor, "INSERT INTO clicks (userid, url, time) VALUES (810e8500-e29b-41d4-a716-446655440000, '', 42)")
+            if not self.is_40_or_greater():
+                # Check we don't allow empty values for url since this is the full underlying cell name (#6152)
+                assert_invalid(cursor, "INSERT INTO clicks (userid, url, time) VALUES (810e8500-e29b-41d4-a716-446655440000, '', 42)")
 
     def test_dense_cf(self):
         """ Test composite 'dense' CF syntax """
@@ -213,6 +240,10 @@ class TestCQL(UpgradeTester):
             ) WITH COMPACT STORAGE;
         """)
 
+        #4.0 doesn't support compact storage
+        if self.is_40_or_greater():
+            cursor.execute("ALTER TABLE connections DROP COMPACT STORAGE;")
+
         for is_upgraded, cursor in self.do_upgrade(cursor):
             logger.debug("Querying {} node".format("upgraded" if is_upgraded else "old"))
             cursor.execute("TRUNCATE connections")
@@ -224,8 +255,12 @@ class TestCQL(UpgradeTester):
             cursor.execute("UPDATE connections SET time = 24 WHERE userid = f47ac10b-58cc-4372-a567-0e02b2c3d479 AND ip = '192.168.0.2' AND port = 80")
 
             # we don't have to include all of the clustering columns (see CASSANDRA-7990)
-            cursor.execute("INSERT INTO connections (userid, ip, time) VALUES (f47ac10b-58cc-4372-a567-0e02b2c3d479, '192.168.0.3', 42)")
-            cursor.execute("UPDATE connections SET time = 42 WHERE userid = f47ac10b-58cc-4372-a567-0e02b2c3d479 AND ip = '192.168.0.4'")
+            if self.is_40_or_greater():
+                cursor.execute("INSERT INTO connections (userid, ip, port, time) VALUES (f47ac10b-58cc-4372-a567-0e02b2c3d479, '192.168.0.3', 80, 42)")
+                cursor.execute("UPDATE connections SET time = 42 WHERE userid = f47ac10b-58cc-4372-a567-0e02b2c3d479 AND ip = '192.168.0.4' AND port = 90")
+            else:
+                cursor.execute("INSERT INTO connections (userid, ip, time) VALUES (f47ac10b-58cc-4372-a567-0e02b2c3d479, '192.168.0.3', 42)")
+                cursor.execute("UPDATE connections SET time = 42 WHERE userid = f47ac10b-58cc-4372-a567-0e02b2c3d479 AND ip = '192.168.0.4'")
 
             # Queries
             assert_all(cursor, "SELECT ip, port, time FROM connections WHERE userid = 550e8400-e29b-41d4-a716-446655440000",
@@ -239,17 +274,23 @@ class TestCQL(UpgradeTester):
 
             assert_none(cursor, "SELECT ip, port, time FROM connections WHERE userid = 550e8400-e29b-41d4-a716-446655440000 and ip > '192.168.0.2'")
 
-            assert_one(cursor, "SELECT ip, port, time FROM connections WHERE userid = f47ac10b-58cc-4372-a567-0e02b2c3d479 AND ip = '192.168.0.3'",
-                       ['192.168.0.3', None, 42])
-
-            assert_one(cursor, "SELECT ip, port, time FROM connections WHERE userid = f47ac10b-58cc-4372-a567-0e02b2c3d479 AND ip = '192.168.0.4'",
-                       ['192.168.0.4', None, 42])
+            if self.is_40_or_greater():
+                assert_one(cursor, "SELECT ip, port, time FROM connections WHERE userid = f47ac10b-58cc-4372-a567-0e02b2c3d479 AND ip = '192.168.0.3'",
+                           ['192.168.0.3', 80, 42])
+                assert_one(cursor, "SELECT ip, port, time FROM connections WHERE userid = f47ac10b-58cc-4372-a567-0e02b2c3d479 AND ip = '192.168.0.4'",
+                           ['192.168.0.4', 90, 42])
+            else:
+                assert_one(cursor, "SELECT ip, port, time FROM connections WHERE userid = f47ac10b-58cc-4372-a567-0e02b2c3d479 AND ip = '192.168.0.3'",
+                           ['192.168.0.3', None, 42])
+                assert_one(cursor, "SELECT ip, port, time FROM connections WHERE userid = f47ac10b-58cc-4372-a567-0e02b2c3d479 AND ip = '192.168.0.4'",
+                           ['192.168.0.4', None, 42])
 
             # Deletion
             cursor.execute("DELETE time FROM connections WHERE userid = 550e8400-e29b-41d4-a716-446655440000 AND ip = '192.168.0.2' AND port = 80")
 
             res = list(cursor.execute("SELECT * FROM connections WHERE userid = 550e8400-e29b-41d4-a716-446655440000"))
-            assert_length_equal(res, 2)
+            #Without compact storage deleting just the column leaves the row behind
+            assert_length_equal(res, 3 if self.is_40_or_greater() else 2)
 
             cursor.execute("DELETE FROM connections WHERE userid = 550e8400-e29b-41d4-a716-446655440000")
             assert_none(cursor, "SELECT * FROM connections WHERE userid = 550e8400-e29b-41d4-a716-446655440000")
@@ -310,6 +351,10 @@ class TestCQL(UpgradeTester):
             ) WITH COMPACT STORAGE;
         """)
 
+        #4.0 doesn't support compact storage
+        if self.is_40_or_greater():
+            cursor.execute("ALTER TABLE clicks DROP COMPACT STORAGE;")
+
         for is_upgraded, cursor in self.do_upgrade(cursor):
             logger.debug("Querying {} node".format("upgraded" if is_upgraded else "old"))
             cursor.execute("TRUNCATE clicks")
@@ -337,6 +382,10 @@ class TestCQL(UpgradeTester):
             ) WITH COMPACT STORAGE;
         """)
 
+        #4.0 doesn't support compact storage
+        if self.is_40_or_greater():
+            cursor.execute("ALTER TABLE clicks DROP COMPACT STORAGE;")
+
         for is_upgraded, cursor in self.do_upgrade(cursor):
             logger.debug("Querying {} node".format("upgraded" if is_upgraded else "old"))
             cursor.execute("TRUNCATE clicks")
@@ -408,6 +457,7 @@ class TestCQL(UpgradeTester):
             res = list(cursor.execute("SELECT * FROM clicks LIMIT 4"))
             assert_length_equal(res, 4)
 
+    @pytest.mark.skip("https://issues.apache.org/jira/browse/CASSANDRA-14958")
     def test_counters(self):
         """ Validate counter support """
         cursor = self.prepare()
@@ -421,6 +471,10 @@ class TestCQL(UpgradeTester):
             ) WITH COMPACT STORAGE;
         """)
 
+        #4.0 doesn't support compact storage
+        if self.is_40_or_greater():
+            cursor.execute("ALTER TABLE clicks DROP COMPACT STORAGE;")
+
         for is_upgraded, cursor in self.do_upgrade(cursor):
             logger.debug("Querying {} node".format("upgraded" if is_upgraded else "old"))
             cursor.execute("TRUNCATE clicks")
@@ -516,6 +570,10 @@ class TestCQL(UpgradeTester):
             ) WITH COMPACT STORAGE;
         """)
 
+        #4.0 doesn't support compact storage
+        if self.is_40_or_greater():
+            cursor.execute("ALTER TABLE test DROP COMPACT STORAGE;")
+
         for is_upgraded, cursor in self.do_upgrade(cursor):
             logger.debug("Querying {} node".format("upgraded" if is_upgraded else "old"))
             cursor.execute("TRUNCATE test")
@@ -564,6 +622,11 @@ class TestCQL(UpgradeTester):
             ) WITH COMPACT STORAGE;
         """)
 
+        #4.0 doesn't support compact storage
+        if self.is_40_or_greater():
+            cursor.execute("ALTER TABLE test1 DROP COMPACT STORAGE;")
+            cursor.execute("ALTER TABLE test2 DROP COMPACT STORAGE;")
+
         for is_upgraded, cursor in self.do_upgrade(cursor):
             logger.debug("Querying {} node".format("upgraded" if is_upgraded else "old"))
             cursor.execute("TRUNCATE test1")
@@ -613,6 +676,10 @@ class TestCQL(UpgradeTester):
             );
         """)
 
+        #4.0 doesn't support compact storage
+        if self.is_40_or_greater():
+            cursor.execute("ALTER TABLE test1 DROP COMPACT STORAGE;")
+
         for is_upgraded, cursor in self.do_upgrade(cursor):
             logger.debug("Querying {} node".format("upgraded" if is_upgraded else "old"))
             cursor.execute("TRUNCATE test1")
@@ -664,6 +731,11 @@ class TestCQL(UpgradeTester):
             ) WITH COMPACT STORAGE
         """)
 
+        #4.0 doesn't support compact storage
+        if self.is_40_or_greater():
+            cursor.execute("ALTER TABLE Test DROP COMPACT STORAGE;")
+            cursor.execute("ALTER TABLE test2 DROP COMPACT STORAGE;")
+
         for is_upgraded, cursor in self.do_upgrade(cursor):
             logger.debug("Querying {} node".format("upgraded" if is_upgraded else "old"))
             cursor.execute("TRUNCATE test")
@@ -901,6 +973,10 @@ class TestCQL(UpgradeTester):
             ) WITH COMPACT STORAGE;
         """)
 
+        #4.0 doesn't support compact storage
+        if self.is_40_or_greater():
+            cursor.execute("ALTER TABLE testcf2 DROP COMPACT STORAGE;")
+
         for is_upgraded, cursor in self.do_upgrade(cursor):
             logger.debug("Querying {} node".format("upgraded" if is_upgraded else "old"))
             cursor.execute("TRUNCATE testcf")
@@ -1485,7 +1561,7 @@ class TestCQL(UpgradeTester):
 
             assert_all(cursor, "SELECT * FROM test WHERE token(k1, k2) > " + str(-((2 ** 63) - 1)), [[0, 2, 2, 2], [0, 3, 3, 3], [0, 0, 0, 0], [0, 1, 1, 1]])
 
-    @since('2', max_version='4')
+    @since('2', max_version='3.99')
     def test_cql3_insert_thrift(self):
         """
         Check that we can insert from thrift into a CQL3 table
@@ -1514,7 +1590,7 @@ class TestCQL(UpgradeTester):
             key = struct.pack('>i', 2)
             column_name_component = struct.pack('>i', 4)
             # component length + component + EOC + component length + component + EOC
-            column_name = '\x00\x04' + column_name_component + '\x00' + '\x00\x01' + 'v' + '\x00'
+            column_name = b'\x00\x04' + column_name_component + b'\x00' + b'\x00\x01' + 'v'.encode() + b'\x00'
             value = struct.pack('>i', 8)
             client.batch_mutate(
                 {key: {'test': [Mutation(ColumnOrSuperColumn(column=Column(name=column_name, value=value, timestamp=100)))]}},
@@ -1522,7 +1598,7 @@ class TestCQL(UpgradeTester):
 
             assert_one(cursor, "SELECT * FROM test", [2, 4, 8])
 
-    @since('2', max_version='4')
+    @since('2', max_version='3.99')
     def test_cql3_non_compound_range_tombstones(self):
         """
         Checks that 3.0 serializes RangeTombstoneLists correctly
@@ -1538,7 +1614,7 @@ class TestCQL(UpgradeTester):
         client.set_keyspace('ks')
 
         # create a CF with mixed static and dynamic cols
-        column_defs = [ColumnDef('static1', 'Int32Type', None, None, None)]
+        column_defs = [ColumnDef('static1'.encode(), 'Int32Type', None, None, None)]
         cfdef = CfDef(
             keyspace='ks',
             name='cf',
@@ -1568,34 +1644,34 @@ class TestCQL(UpgradeTester):
             client.set_keyspace('ks')
 
             # insert a number of keys so that we'll get rows on both the old and upgraded nodes
-            for key in ['key{}'.format(i) for i in range(10)]:
-                logger.debug("Using key " + key)
+            for key in ['key{}'.format(i).encode() for i in range(10)]:
+                logger.debug("Using key " + key.decode())
 
                 # insert "static" column
                 client.batch_mutate(
-                    {key: {'cf': [Mutation(ColumnOrSuperColumn(column=Column(name='static1', value=struct.pack('>i', 1), timestamp=100)))]}},
+                    {key: {'cf': [Mutation(ColumnOrSuperColumn(column=Column(name='static1'.encode(), value=struct.pack('>i', 1), timestamp=100)))]}},
                     ThriftConsistencyLevel.ALL)
 
                 # insert "dynamic" columns
                 for i, column_name in enumerate(('a', 'b', 'c', 'd', 'e')):
                     column_value = 'val{}'.format(i)
                     client.batch_mutate(
-                        {key: {'cf': [Mutation(ColumnOrSuperColumn(column=Column(name=column_name, value=column_value, timestamp=100)))]}},
+                        {key: {'cf': [Mutation(ColumnOrSuperColumn(column=Column(name=column_name.encode(), value=column_value.encode(), timestamp=100)))]}},
                         ThriftConsistencyLevel.ALL)
 
                 # sanity check on the query
-                fetch_slice = SlicePredicate(slice_range=SliceRange('', '', False, 100))
+                fetch_slice = SlicePredicate(slice_range=SliceRange(''.encode(), ''.encode(), False, 100))
                 row = client.get_slice(key, ColumnParent(column_family='cf'), fetch_slice, ThriftConsistencyLevel.ALL)
                 assert 6 == len(row), row
-                cols = OrderedDict([(cosc.column.name, cosc.column.value) for cosc in row])
+                cols = OrderedDict([(cosc.column.name.decode(), cosc.column.value) for cosc in row])
                 logger.debug(cols)
                 assert ['a', 'b', 'c', 'd', 'e', 'static1'] == list(cols.keys())
-                assert 'val0' == cols['a']
-                assert 'val4' == cols['e']
+                assert 'val0'.encode() == cols['a']
+                assert 'val4'.encode() == cols['e']
                 assert struct.pack('>i', 1) == cols['static1']
 
                 # delete a slice of dynamic columns
-                slice_range = SliceRange('b', 'd', False, 100)
+                slice_range = SliceRange('b'.encode(), 'd'.encode(), False, 100)
                 client.batch_mutate(
                     {key: {'cf': [Mutation(deletion=Deletion(timestamp=101, predicate=SlicePredicate(slice_range=slice_range)))]}},
                     ThriftConsistencyLevel.ALL)
@@ -1603,11 +1679,11 @@ class TestCQL(UpgradeTester):
                 # check remaining columns
                 row = client.get_slice(key, ColumnParent(column_family='cf'), fetch_slice, ThriftConsistencyLevel.ALL)
                 assert 3 == len(row), row
-                cols = OrderedDict([(cosc.column.name, cosc.column.value) for cosc in row])
+                cols = OrderedDict([(cosc.column.name.decode(), cosc.column.value) for cosc in row])
                 logger.debug(cols)
                 assert ['a', 'e', 'static1'] == list(cols.keys())
-                assert 'val0' == cols['a']
-                assert 'val4' == cols['e']
+                assert 'val0'.encode() == cols['a']
+                assert 'val4'.encode() == cols['e']
                 assert struct.pack('>i', 1) == cols['static1']
 
     def test_row_existence(self):
@@ -1673,6 +1749,10 @@ class TestCQL(UpgradeTester):
             ) WITH COMPACT STORAGE
         """)
 
+        #4.0 doesn't support compact storage
+        if self.is_40_or_greater():
+            cursor.execute("ALTER TABLE test2 DROP COMPACT STORAGE;")
+
         for is_upgraded, cursor in self.do_upgrade(cursor):
             logger.debug("Querying {} node".format("upgraded" if is_upgraded else "old"))
             cursor.execute("TRUNCATE test")
@@ -1692,7 +1772,9 @@ class TestCQL(UpgradeTester):
                     cursor.execute(q, (k, c))
 
             query = "SELECT * FROM test2"
-            assert_all(cursor, query, [[x, y] for x in range(0, 2) for y in range(0, 2)])
+            expected = [[x, y, None] for x in range(0, 2) for y in range(0, 2)] if self.is_40_or_greater() else [
+                [x, y] for x in range(0, 2) for y in range(0, 2)]
+            assert_all(cursor, query, expected)
 
     def test_no_clustering(self):
         cursor = self.prepare()
@@ -2043,6 +2125,11 @@ class TestCQL(UpgradeTester):
             ) WITH COMPACT STORAGE;
         """)
 
+        #4.0 doesn't support compact storage
+        if self.is_40_or_greater():
+            cursor.execute("ALTER TABLE test1 DROP COMPACT STORAGE;")
+            cursor.execute("ALTER TABLE test2 DROP COMPACT STORAGE;")
+
         for is_upgraded, cursor in self.do_upgrade(cursor):
             logger.debug("Querying {} node".format("upgraded" if is_upgraded else "old"))
             cursor.execute("TRUNCATE test1")
@@ -2108,6 +2195,10 @@ class TestCQL(UpgradeTester):
                   AND CLUSTERING ORDER BY(c1 DESC, c2 DESC);
         """)
 
+        #4.0 doesn't support compact storage
+        if self.is_40_or_greater():
+            cursor.execute("ALTER TABLE test DROP COMPACT STORAGE;")
+
         for is_upgraded, cursor in self.do_upgrade(cursor):
             logger.debug("Querying {} node".format("upgraded" if is_upgraded else "old"))
             cursor.execute("TRUNCATE test")
@@ -2354,6 +2445,7 @@ class TestCQL(UpgradeTester):
             query = "SELECT blog_id, content FROM blogs WHERE author='foo'"
             assert_all(cursor, query, [[1, set(['bar1', 'bar2'])], [1, set(['bar2', 'bar3'])], [2, set(['baz'])]])
 
+    @pytest.mark.skip("https://issues.apache.org/jira/browse/CASSANDRA-14961")
     def test_truncate_clean_cache(self):
         cursor = self.prepare(ordered=True, use_cache=True)
 
@@ -2412,7 +2504,7 @@ class TestCQL(UpgradeTester):
             for i in random.sample(range(nb_keys), nb_deletes):
                 cursor.execute("DELETE FROM test WHERE k = {}".format(i))
 
-            res = list(cursor.execute("SELECT * FROM test LIMIT {}".format(nb_keys / 2)))
+            res = list(cursor.execute("SELECT * FROM test LIMIT {}".format(int(nb_keys / 2))))
             assert_length_equal(res, nb_keys / 2)
 
     def test_collection_function(self):
@@ -2497,6 +2589,10 @@ class TestCQL(UpgradeTester):
 
         cursor.execute(create)
 
+        #4.0 doesn't support compact storage
+        if compact and self.is_40_or_greater():
+            cursor.execute("ALTER TABLE zipcodes DROP COMPACT STORAGE;")
+
         for is_upgraded, cursor in self.do_upgrade(cursor):
             logger.debug("Querying {} node".format("upgraded" if is_upgraded else "old"))
             cursor.execute("TRUNCATE zipcodes")
@@ -2551,6 +2647,10 @@ class TestCQL(UpgradeTester):
             ) WITH COMPACT STORAGE
         """)
 
+        #4.0 doesn't support compact storage
+        if self.is_40_or_greater():
+            cursor.execute("ALTER TABLE test DROP COMPACT STORAGE;")
+
         for is_upgraded, cursor in self.do_upgrade(cursor):
             logger.debug("Querying {} node".format("upgraded" if is_upgraded else "old"))
             cursor.execute("TRUNCATE test")
@@ -2675,12 +2775,16 @@ class TestCQL(UpgradeTester):
             ) WITH COMPACT STORAGE;
         """)
 
+        #4.0 doesn't support compact storage
+        if self.is_40_or_greater():
+            cursor.execute("ALTER TABLE bar DROP COMPACT STORAGE;")
+
         for is_upgraded, cursor in self.do_upgrade(cursor):
             logger.debug("Querying {} node".format("upgraded" if is_upgraded else "old"))
             cursor.execute("TRUNCATE bar")
 
             cursor.execute("INSERT INTO bar (id, i) VALUES (1, 2);")
-            assert_one(cursor, "SELECT * FROM bar", [1, 2])
+            assert_one(cursor, "SELECT * FROM bar", [1, None, 2, None] if self.is_40_or_greater() else [1, 2])
 
     def test_query_compact_tables_during_upgrade(self):
         """
@@ -2708,6 +2812,10 @@ class TestCQL(UpgradeTester):
                                      [(i, i) for i in range(100)])
         self.cluster.flush()
 
+        #4.0 doesn't support compact storage
+        if self.is_40_or_greater():
+            cursor.execute("ALTER TABLE t1 DROP COMPACT STORAGE;")
+
         def check_read_all(cursor):
             read_count = 0
             # first read each row separately - obviously, we should be able to retrieve all 100
@@ -2895,7 +3003,7 @@ class TestCQL(UpgradeTester):
             assert_one(cursor, query, [3, 'foobar'])
 
     # Fixed by CASSANDRA-12654 in 3.12
-    @since('2.0', max_version='3.12')
+    @since('2.0', max_version='3.11.99')
     def test_IN_clause_on_last_key(self):
         """
         Tests patch to improve validation by not throwing an assertion when using map, list, or set
@@ -2960,9 +3068,9 @@ class TestCQL(UpgradeTester):
             cursor.execute("TRUNCATE test")
 
             cursor.execute("INSERT INTO test (k, b) VALUES (0, 0x)")
-            assert_one(cursor, "SELECT * FROM test", [0, ''])
+            assert_one(cursor, "SELECT * FROM test", [0, ''.encode()])
 
-    @since('2', max_version='4')
+    @since('2', max_version='3.99')
     def test_rename(self):
         cursor = self.prepare(start_rpc=True)
 
@@ -3225,7 +3333,7 @@ class TestCQL(UpgradeTester):
             # test aliasing a regular function
             res = cursor.execute('SELECT intAsBlob(id) AS id_blob FROM users WHERE id = 0')
             assert 'id_blob' == res[0]._fields[0]
-            assert '\x00\x00\x00\x00' == res[0].id_blob
+            assert '\x00\x00\x00\x00' == res[0].id_blob.decode()
 
             logger.debug("Current node version is {}".format(self.get_node_version(is_upgraded)))
 
@@ -3261,6 +3369,10 @@ class TestCQL(UpgradeTester):
         # Same test, but for compact
         cursor.execute("CREATE TABLE test_compact (k1 int, k2 int, v int, PRIMARY KEY (k1, k2)) WITH COMPACT STORAGE")
 
+        #4.0 doesn't support compact storage
+        if self.is_40_or_greater():
+            cursor.execute("ALTER TABLE test_compact DROP COMPACT STORAGE;")
+
         for is_upgraded, cursor in self.do_upgrade(cursor):
             logger.debug("Querying {} node".format("upgraded" if is_upgraded else "old"))
             cursor.execute("TRUNCATE test")
@@ -3331,6 +3443,11 @@ class TestCQL(UpgradeTester):
         # Test a 'wide row' thrift table.
         cursor.execute('CREATE TABLE wide (pk int, name text, val int, PRIMARY KEY(pk, name)) WITH COMPACT STORAGE')
 
+        #4.0 doesn't support compact storage
+        if self.is_40_or_greater():
+            cursor.execute("ALTER TABLE compact DROP COMPACT STORAGE;")
+            cursor.execute("ALTER TABLE wide DROP COMPACT STORAGE;")
+
         for is_upgraded, cursor in self.do_upgrade(cursor):
             logger.debug("Querying {} node".format("upgraded" if is_upgraded else "old"))
             cursor.execute("TRUNCATE regular")
@@ -4194,7 +4311,7 @@ class TestCQL(UpgradeTester):
 
             assert_invalid(cursor, "SELECT v1, v2, v3 FROM test WHERE k = 0 AND (v1, v3) > (1, 0)")
 
-    @since('2.0', max_version='3')  # 3.0+ not compatible with protocol version 2
+    @since('2.0', max_version='2.99')  # 3.0+ not compatible with protocol version 2
     def test_v2_protocol_IN_with_tuples(self):
         """
         @jira_ticket CASSANDRA-8062
@@ -4296,6 +4413,10 @@ class TestCQL(UpgradeTester):
             ) WITH COMPACT STORAGE
         """)
 
+        #4.0 doesn't support compact storage
+        if self.is_40_or_greater():
+            cursor.execute("ALTER TABLE lock DROP COMPACT STORAGE;")
+
         for is_upgraded, cursor in self.do_upgrade(cursor):
             logger.debug("Querying {} node".format("upgraded" if is_upgraded else "old"))
             cursor.execute("TRUNCATE lock")
@@ -4868,6 +4989,10 @@ class TestCQL(UpgradeTester):
             ) WITH COMPACT STORAGE
         """)
 
+        #4.0 doesn't support compact storage
+        if self.is_40_or_greater():
+            cursor.execute("ALTER TABLE test DROP COMPACT STORAGE;")
+
         for is_upgraded, cursor in self.do_upgrade(cursor):
             logger.debug("Querying {} node".format("upgraded" if is_upgraded else "old"))
             cursor.execute("TRUNCATE test")
@@ -4879,9 +5004,14 @@ class TestCQL(UpgradeTester):
             assert_all(cursor, "SELECT v FROM test WHERE k=0 AND v > 0 AND v <= 4 LIMIT 2", [[1], [2]])
             assert_all(cursor, "SELECT v FROM test WHERE k=0 AND v > -1 AND v <= 4 LIMIT 2", [[0], [1]])
 
-            assert_all(cursor, "SELECT * FROM test WHERE k IN (0, 1, 2) AND v > 0 AND v <= 4 LIMIT 2", [[0, 1], [0, 2]])
-            assert_all(cursor, "SELECT * FROM test WHERE k IN (0, 1, 2) AND v > -1 AND v <= 4 LIMIT 2", [[0, 0], [0, 1]])
-            assert_all(cursor, "SELECT * FROM test WHERE k IN (0, 1, 2) AND v > 0 AND v <= 4 LIMIT 6", [[0, 1], [0, 2], [0, 3], [1, 1], [1, 2], [1, 3]])
+            if self.is_40_or_greater():
+                assert_all(cursor, "SELECT * FROM test WHERE k IN (0, 1, 2) AND v > 0 AND v <= 4 LIMIT 2", [[0, 1, None], [0, 2, None]])
+                assert_all(cursor, "SELECT * FROM test WHERE k IN (0, 1, 2) AND v > -1 AND v <= 4 LIMIT 2", [[0, 0, None], [0, 1, None]])
+                assert_all(cursor, "SELECT * FROM test WHERE k IN (0, 1, 2) AND v > 0 AND v <= 4 LIMIT 6", [[0, 1, None], [0, 2, None], [0, 3, None], [1, 1, None], [1, 2, None], [1, 3, None]])
+            else:
+                assert_all(cursor, "SELECT * FROM test WHERE k IN (0, 1, 2) AND v > 0 AND v <= 4 LIMIT 2", [[0, 1], [0, 2]])
+                assert_all(cursor, "SELECT * FROM test WHERE k IN (0, 1, 2) AND v > -1 AND v <= 4 LIMIT 2", [[0, 0], [0, 1]])
+                assert_all(cursor, "SELECT * FROM test WHERE k IN (0, 1, 2) AND v > 0 AND v <= 4 LIMIT 6", [[0, 1], [0, 2], [0, 3], [1, 1], [1, 2], [1, 3]])
 
             # This doesn't work -- see #7059
             # assert_all(cursor, "SELECT * FROM test WHERE v > 1 AND v <= 3 LIMIT 6 ALLOW FILTERING", [[1, 2], [1, 3], [0, 2], [0, 3], [2, 2], [2, 3]])
@@ -5050,6 +5180,7 @@ class TestCQL(UpgradeTester):
             # A blob that is not 4 bytes should be rejected
             assert_invalid(cursor, "INSERT INTO test(k, v) VALUES (0, blobAsInt(0x01))")
 
+    @pytest.mark.skip("https://issues.apache.org/jira/browse/CASSANDRA-14960")
     def test_invalid_string_literals(self):
         """
         @jira_ticket CASSANDRA-8101
@@ -5062,10 +5193,9 @@ class TestCQL(UpgradeTester):
             cursor.execute("TRUNCATE invalid_string_literals")
 
             assert_invalid(cursor, "insert into ks.invalid_string_literals (k, a) VALUES (0, '\u038E\u0394\u03B4\u03E0')")
-
             # since the protocol requires strings to be valid UTF-8, the error response to this is a ProtocolError
             try:
-                cursor.execute("insert into ks.invalid_string_literals (k, c) VALUES (0, '\xc2\x01')")
+                cursor.execute("insert into ks.invalid_string_literals (k, b) VALUES (0, '\xc2\x01')")
                 self.fail("Expected error")
             except ProtocolException as e:
                 assert "Cannot decode string as UTF8" in str(e)
@@ -5279,6 +5409,7 @@ class TestCQL(UpgradeTester):
 
             assert_none(cursor, "select * from space1.table1 where a=1 and b=1")
 
+    @pytest.mark.skip("https://issues.apache.org/jira/browse/CASSANDRA-14961")
     def test_secondary_index_query(self):
         """
         Test for fix to bug where secondary index cannot be queried due to Column Family caching changes.
@@ -5335,7 +5466,7 @@ class TestCQL(UpgradeTester):
             logger.debug("Querying {} node".format("upgraded" if is_upgraded else "old"))
             assert_all(cursor, "SELECT k FROM ks.test WHERE v = 0", [[0]])
 
-    def test_tracing_prevents_startup_after_upgrading(self):
+    def test_tracing_prevents_startup_after_upgrading(self, fixture_dtest_setup):
         """
         Test that after upgrading from 2.1 to 3.0, the system_traces.sessions table is properly upgraded to include
         the client column.
@@ -5346,6 +5477,13 @@ class TestCQL(UpgradeTester):
         cursor.execute("CREATE KEYSPACE foo WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}")
         cursor.execute("CREATE TABLE foo.bar (k int PRIMARY KEY, v int)")
 
+        #It's possible to log an error when reading trace information because the schema at node differs
+        #between versions
+        if self.is_40_or_greater():
+            fixture_dtest_setup.ignore_log_patterns = fixture_dtest_setup.ignore_log_patterns +\
+                                                      ["Unknown column coordinator_port during deserialization",
+                                                       "Unknown column source_port during deserialization"]
+
         for is_upgraded, cursor in self.do_upgrade(cursor):
             logger.debug("Querying {} node".format("upgraded" if is_upgraded else "old"))
 
@@ -5401,6 +5539,7 @@ for spec in specs:
     assert gen_class_name not in globals()
 
     upgrade_applies_to_env = RUN_STATIC_UPGRADE_MATRIX or spec['UPGRADE_PATH'].upgrade_meta.matches_current_env_version_family
+    cls = type(gen_class_name, (TestCQL,), spec)
     if not upgrade_applies_to_env:
-        pytest.skip('test not applicable to env.')
-    globals()[gen_class_name] = type(gen_class_name, (TestCQL,), spec)
+        add_skip(cls, 'test not applicable to env.')
+    globals()[gen_class_name] = cls
diff --git a/upgrade_tests/paging_test.py b/upgrade_tests/paging_test.py
index e21ba88..11df0b9 100644
--- a/upgrade_tests/paging_test.py
+++ b/upgrade_tests/paging_test.py
@@ -9,10 +9,12 @@ from cassandra import InvalidRequest
 from cassandra.query import SimpleStatement, dict_factory, named_tuple_factory
 from ccmlib.common import LogPatternToVersion
 
-from dtest import RUN_STATIC_UPGRADE_MATRIX, run_scenarios
-from tools.assertions import assert_read_timeout_or_failure
+from dtest import RUN_STATIC_UPGRADE_MATRIX, run_scenarios, MAJOR_VERSION_4
+
+from tools.assertions import (assert_read_timeout_or_failure, assert_lists_equal_ignoring_order)
 from tools.data import rows_to_list
 from tools.datahelp import create_rows, flatten_into_set, parse_data_into_dicts
+from tools.misc import add_skip
 from tools.paging import PageAssertionMixin, PageFetcher
 from .upgrade_base import UpgradeTester
 from .upgrade_manifest import build_upgrade_pairs
@@ -121,7 +123,7 @@ class TestPagingSize(BasePagingTester, PageAssertionMixin):
             assert pf.num_results_all() == [5, 4]
 
             # make sure expected and actual have same data elements (ignoring order)
-            self.assertEqualIgnoreOrder(pf.all_data(), expected_data)
+            assert_lists_equal_ignoring_order(pf.all_data(), expected_data, sort_key='value')
 
     def test_with_equal_results_to_page_size(self):
         cursor = self.prepare()
@@ -151,7 +153,7 @@ class TestPagingSize(BasePagingTester, PageAssertionMixin):
             assert pf.pagecount() == 1
 
             # make sure expected and actual have same data elements (ignoring order)
-            self.assertEqualIgnoreOrder(pf.all_data(), expected_data)
+            assert_lists_equal_ignoring_order(pf.all_data(), expected_data, sort_key='value')
 
     def test_undefined_page_size_default(self):
         """
@@ -183,7 +185,7 @@ class TestPagingSize(BasePagingTester, PageAssertionMixin):
 
             self.maxDiff = None
             # make sure expected and actual have same data elements (ignoring order)
-            self.assertEqualIgnoreOrder(pf.all_data(), expected_data)
+            assert_lists_equal_ignoring_order(pf.all_data(), expected_data, sort_key='value')
 
 
 class TestPagingWithModifiers(BasePagingTester, PageAssertionMixin):
@@ -422,7 +424,7 @@ class TestPagingWithModifiers(BasePagingTester, PageAssertionMixin):
             assert pf.num_results_all() == [4, 3]
 
             # make sure the allow filtering query matches the expected results (ignoring order)
-            self.assertEqualIgnoreOrder(
+            assert_lists_equal_ignoring_order(
                 pf.all_data(),
                 parse_data_into_dicts(
                     """
@@ -435,7 +437,8 @@ class TestPagingWithModifiers(BasePagingTester, PageAssertionMixin):
                     |8 |and more testing|
                     |9 |and more testing|
                     """, format_funcs={'id': int, 'value': str}
-                )
+                ),
+                sort_key='value'
             )
 
 
@@ -457,21 +460,30 @@ class TestPagingData(BasePagingTester, PageAssertionMixin):
             );
         """)
 
-        cursor.execute("""
-            CREATE TABLE test2 (
-                k int,
-                c int,
-                v text,
-                PRIMARY KEY (k, c)
-            ) WITH COMPACT STORAGE;
-        """)
+        testing_compact_storage = self.cluster.version() <= MAJOR_VERSION_4
+        if testing_compact_storage:
+            cursor.execute("""
+                CREATE TABLE test2 (
+                    k int,
+                    c int,
+                    v text,
+                    PRIMARY KEY (k, c)
+                ) WITH COMPACT STORAGE;
+            """)
+
+        version_string = self.upgrade_version_string()
+        #4.0 doesn't support compact storage
+        if version_string == 'trunk' or version_string >= MAJOR_VERSION_4:
+            cursor.execute("ALTER TABLE test2 DROP COMPACT STORAGE;")
 
         for is_upgraded, cursor in self.do_upgrade(cursor):
             logger.debug("Querying %s node" % ("upgraded" if is_upgraded else "old",))
             cursor.execute("TRUNCATE test")
-            cursor.execute("TRUNCATE test2")
+            if testing_compact_storage:
+                cursor.execute("TRUNCATE test2")
 
-            for table in ("test", "test2"):
+            tables = ("test", "test2") if testing_compact_storage else ("test")
+            for table in tables:
                 logger.debug("Querying table %s" % (table,))
                 expected = []
                 # match the key ordering for murmur3
@@ -503,22 +515,31 @@ class TestPagingData(BasePagingTester, PageAssertionMixin):
             );
         """)
 
-        cursor.execute("""
-            CREATE TABLE test2 (
-                k int,
-                c1 int,
-                c2 int,
-                v text,
-                PRIMARY KEY (k, c1, c2)
-            ) WITH COMPACT STORAGE;
-        """)
+        testing_compact_storage = self.cluster.version() <= MAJOR_VERSION_4
+        if testing_compact_storage:
+            cursor.execute("""
+                CREATE TABLE test2 (
+                    k int,
+                    c1 int,
+                    c2 int,
+                    v text,
+                    PRIMARY KEY (k, c1, c2)
+                ) WITH COMPACT STORAGE;
+            """)
+
+        version_string = self.upgrade_version_string()
+        #4.0 doesn't support compact storage
+        if version_string == 'trunk' or version_string >= MAJOR_VERSION_4:
+            cursor.execute("ALTER TABLE test2 DROP COMPACT STORAGE;")
 
         for is_upgraded, cursor in self.do_upgrade(cursor):
             logger.debug("Querying %s node" % ("upgraded" if is_upgraded else "old",))
             cursor.execute("TRUNCATE test")
-            cursor.execute("TRUNCATE test2")
+            if testing_compact_storage:
+              cursor.execute("TRUNCATE test2")
 
-            for table in ("test", "test2"):
+            tables = ("test", "test2") if testing_compact_storage else ("test")
+            for table in tables:
                 logger.debug("Querying table %s" % (table,))
                 expected = []
                 # match the key ordering for murmur3
@@ -566,7 +587,7 @@ class TestPagingData(BasePagingTester, PageAssertionMixin):
             all_results = pf.all_data()
             assert len(expected_data) == len(all_results)
             self.maxDiff = None
-            self.assertEqualIgnoreOrder(expected_data, all_results)
+            assert_lists_equal_ignoring_order(expected_data, all_results, sort_key='value')
 
     def test_paging_across_multi_wide_rows(self):
         cursor = self.prepare()
@@ -635,7 +656,7 @@ class TestPagingData(BasePagingTester, PageAssertionMixin):
 
             assert pf.pagecount() == 2
             assert pf.num_results_all() == [400, 200]
-            self.assertEqualIgnoreOrder(expected_data, pf.all_data())
+            assert_lists_equal_ignoring_order(expected_data, pf.all_data(), sort_key='sometext')
 
     @since('2.0.6')
     def test_static_columns_paging(self):
@@ -902,7 +923,7 @@ class TestPagingData(BasePagingTester, PageAssertionMixin):
 
             assert pf.pagecount() == 2
             assert pf.num_results_all() == [400, 200]
-            self.assertEqualIgnoreOrder(expected_data, pf.all_data())
+            assert_lists_equal_ignoring_order(expected_data, pf.all_data(), sort_key='sometext')
 
 
 class TestPagingDatasetChanges(BasePagingTester, PageAssertionMixin):
@@ -944,7 +965,7 @@ class TestPagingDatasetChanges(BasePagingTester, PageAssertionMixin):
             assert pf.pagecount() == 2
             assert pf.num_results_all(), [501 == 499]
 
-            self.assertEqualIgnoreOrder(pf.all_data(), expected_data)
+            assert_lists_equal_ignoring_order(pf.all_data(), expected_data, sort_key='mytext')
 
     def test_data_change_impacting_later_page(self):
         cursor = self.prepare()
@@ -981,7 +1002,7 @@ class TestPagingDatasetChanges(BasePagingTester, PageAssertionMixin):
 
             # add the new row to the expected data and then do a compare
             expected_data.append({'id': 2, 'mytext': 'foo'})
-            self.assertEqualIgnoreOrder(pf.all_data(), expected_data)
+            assert_lists_equal_ignoring_order(pf.all_data(), expected_data, sort_key='mytext')
 
     def test_row_TTL_expiry_during_paging(self):
         cursor = self.prepare()
@@ -1064,7 +1085,7 @@ class TestPagingDatasetChanges(BasePagingTester, PageAssertionMixin):
 
             # no need to request page here, because the first page is automatically retrieved
             page1 = pf.page_data(1)
-            self.assertEqualIgnoreOrder(page1, data[:500])
+            assert_lists_equal_ignoring_order(page1, data[:500], sort_key="mytext")
 
             # set some TTLs for data on page 3
             for row in data[1000:1500]:
@@ -1080,7 +1101,7 @@ class TestPagingDatasetChanges(BasePagingTester, PageAssertionMixin):
             # check page two
             pf.request_one()
             page2 = pf.page_data(2)
-            self.assertEqualIgnoreOrder(page2, data[500:1000])
+            assert_lists_equal_ignoring_order(page2, data[500:1000], sort_key="mytext")
 
             page3expected = []
             for row in data[1000:1500]:
@@ -1093,7 +1114,7 @@ class TestPagingDatasetChanges(BasePagingTester, PageAssertionMixin):
 
             pf.request_one()
             page3 = pf.page_data(3)
-            self.assertEqualIgnoreOrder(page3, page3expected)
+            assert_lists_equal_ignoring_order(page3, page3expected, sort_key="mytext")
 
 
 class TestPagingQueryIsolation(BasePagingTester, PageAssertionMixin):
@@ -1476,13 +1497,11 @@ class TestPagingWithDeletions(BasePagingTester, PageAssertionMixin):
     def test_failure_threshold_deletions(self):
         """Test that paging throws a failure in case of tombstone threshold """
         self.fixture_dtest_setup.allow_log_errors = True
-        self.cluster.set_configuration_options(
-            values={'tombstone_failure_threshold': 500,
-                    'read_request_timeout_in_ms': 1000,
-                    'request_timeout_in_ms': 1000,
-                    'range_request_timeout_in_ms': 1000}
-        )
-        cursor = self.prepare()
+        cursor = self.prepare(
+            extra_config_options={'tombstone_failure_threshold': 500,
+                                  'read_request_timeout_in_ms': 1000,
+                                  'request_timeout_in_ms': 1000,
+                                  'range_request_timeout_in_ms': 1000})
         nodes = self.cluster.nodelist()
         self.setup_schema(cursor)
 
@@ -1530,6 +1549,7 @@ for klaus in BasePagingTester.__subclasses__():
         assert gen_class_name not in globals()
 
         upgrade_applies_to_env = RUN_STATIC_UPGRADE_MATRIX or spec['UPGRADE_PATH'].upgrade_meta.matches_current_env_version_family
+        cls = type(gen_class_name, (klaus,), spec)
         if not upgrade_applies_to_env:
-            pytest.mark.skip(reason='test not applicable to env.')
-        globals()[gen_class_name] = type(gen_class_name, (klaus,), spec)
+            add_skip(cls, 'test not applicable to env.')
+        globals()[gen_class_name] = cls
diff --git a/upgrade_tests/regression_test.py b/upgrade_tests/regression_test.py
index 2e16645..e36e19a 100644
--- a/upgrade_tests/regression_test.py
+++ b/upgrade_tests/regression_test.py
@@ -12,6 +12,7 @@ from cassandra import ConsistencyLevel as CL
 
 from dtest import RUN_STATIC_UPGRADE_MATRIX
 from tools.jmxutils import (JolokiaAgent, make_mbean)
+from tools.misc import add_skip
 from .upgrade_base import UpgradeTester
 from .upgrade_manifest import build_upgrade_pairs
 
@@ -145,8 +146,8 @@ class TestForRegressions(UpgradeTester):
             logger.debug(response)
             schemas = response.split('Schema versions:')[1].strip()
             num_schemas = len(re.findall(r'\[.*?\]', schemas))
-            self.assertEqual(num_schemas, 1, "There were multiple schema versions during an upgrade: {}"
-                             .format(schemas))
+            assert num_schemas == 1, "There were multiple schema versions during an upgrade: {}" \
+                             .format(schemas)
 
         for node in self.cluster.nodelist():
             validate_schema_agreement(node, False)
@@ -180,6 +181,7 @@ for path in build_upgrade_pairs():
             '__test__': True}
 
     upgrade_applies_to_env = RUN_STATIC_UPGRADE_MATRIX or path.upgrade_meta.matches_current_env_version_family
+    cls = type(gen_class_name, (TestForRegressions,), spec)
     if not upgrade_applies_to_env:
-        pytest.mark.skip(reason='test not applicable to env.')
-    globals()[gen_class_name] = type(gen_class_name, (TestForRegressions,), spec)
+        add_skip(cls, 'test not applicable to env.')
+    globals()[gen_class_name] = cls
diff --git a/upgrade_tests/repair_test.py b/upgrade_tests/repair_test.py
index 9ac45a9..36eb5cc 100644
--- a/upgrade_tests/repair_test.py
+++ b/upgrade_tests/repair_test.py
@@ -15,10 +15,10 @@ LEGACY_SSTABLES_JVM_ARGS = ["-Dcassandra.streamdes.initial_mem_buffer_size=1",
 # We don't support directly upgrading from 2.2 to 4.0 so disabling this on 4.0.
 # TODO: we should probably not hardcode versions?
 @pytest.mark.upgrade_test
-@since('3.0', max_version='4')
+@since('3.0', max_version='3.99')
 class TestUpgradeRepair(BaseRepairTest):
 
-    @since('3.0')
+    @since('3.0', max_version='3.99')
     def test_repair_after_upgrade(self):
         """
         @jira_ticket CASSANDRA-10990
diff --git a/upgrade_tests/storage_engine_upgrade_test.py b/upgrade_tests/storage_engine_upgrade_test.py
index 4cc718c..c1fc228 100644
--- a/upgrade_tests/storage_engine_upgrade_test.py
+++ b/upgrade_tests/storage_engine_upgrade_test.py
@@ -3,7 +3,7 @@ import time
 import pytest
 import logging
 
-from dtest import Tester
+from dtest import Tester, MAJOR_VERSION_4
 from sstable_generation_loading_test import TestBaseSStableLoader
 from thrift_bindings.thrift010.Cassandra import (ConsistencyLevel, Deletion,
                                            Mutation, SlicePredicate,
@@ -25,13 +25,11 @@ LEGACY_SSTABLES_JVM_ARGS = ["-Dcassandra.streamdes.initial_mem_buffer_size=1",
 @since('3.0')
 class TestStorageEngineUpgrade(Tester):
 
-    def setUp(self, bootstrap=False, jvm_args=None):
-        super(TestStorageEngineUpgrade, self).setUp()
-        self.default_install_dir = self.cluster.get_install_dir()
-        self.bootstrap = bootstrap
-        if jvm_args is None:
-            jvm_args = []
-        self.jvm_args = jvm_args
+    @pytest.fixture(autouse=True)
+    def storage_engine_upgrade_setup(self, fixture_dtest_setup):
+        self.fixture_dtest_setup.default_install_dir = fixture_dtest_setup.cluster.get_install_dir()
+        self.fixture_dtest_setup.bootstrap = False
+        return ()
 
     def _setup_cluster(self, create_keyspace=True, cluster_options=None):
         cluster = self.cluster
@@ -40,10 +38,11 @@ class TestStorageEngineUpgrade(Tester):
             cluster.set_configuration_options(cluster_options)
 
         # Forcing cluster version on purpose
-        if self.dtest_config.cassandra_version_from_build >= '4':
+        if self.dtest_config.cassandra_version_from_build >= MAJOR_VERSION_4:
             cluster.set_install_dir(version="git:cassandra-3.0")
         else:
             cluster.set_install_dir(version="git:cassandra-2.1")
+        self.fixture_dtest_setup.reinitialize_cluster_for_different_version()
         cluster.populate(1).start()
 
         node1 = cluster.nodelist()[0]
@@ -62,14 +61,14 @@ class TestStorageEngineUpgrade(Tester):
         time.sleep(.5)
         node1.stop(wait_other_notice=True)
 
-        node1.set_install_dir(install_dir=self.default_install_dir)
+        node1.set_install_dir(install_dir=self.fixture_dtest_setup.default_install_dir)
         node1.start(wait_other_notice=True, wait_for_binary_proto=True)
 
-        if self.bootstrap:
-            cluster.set_install_dir(install_dir=self.default_install_dir)
+        if self.fixture_dtest_setup.bootstrap:
+            cluster.set_install_dir(install_dir=self.fixture_dtest_setup.default_install_dir)
             # Add a new node, bootstrap=True ensures that it is not a seed
             node2 = new_node(cluster, bootstrap=True)
-            node2.start(wait_for_binary_proto=True, jvm_args=self.jvm_args)
+            node2.start(wait_for_binary_proto=True, jvm_args=self.fixture_dtest_setup.jvm_args)
 
             temp_files = self.glob_data_dirs(os.path.join('*', "tmp", "*.dat"))
             logger.debug("temp files: " + str(temp_files))
@@ -140,6 +139,10 @@ class TestStorageEngineUpgrade(Tester):
             for r in range(ROWS):
                 session.execute("INSERT INTO t(k, t, v) VALUES ({n}, {r}, {r})".format(n=n, r=r))
 
+        #4.0 doesn't support compact storage
+        if compact_storage and self.dtest_config.cassandra_version_from_build >= MAJOR_VERSION_4:
+            session.execute("ALTER TABLE t DROP COMPACT STORAGE;")
+
         session = self._do_upgrade()
 
         for n in range(PARTITIONS):
@@ -151,8 +154,8 @@ class TestStorageEngineUpgrade(Tester):
                        [[n, v, v] for v in range(ROWS - 1, -1, -1)])
 
             # Querying a "large" slice
-            start = ROWS / 10
-            end = ROWS - 1 - (ROWS / 10)
+            start = ROWS // 10
+            end = ROWS - 1 - (ROWS // 10)
             assert_all(session,
                        "SELECT * FROM t WHERE k = {n} AND t >= {start} AND t < {end}".format(n=n, start=start, end=end),
                        [[n, v, v] for v in range(start, end)])
@@ -161,8 +164,8 @@ class TestStorageEngineUpgrade(Tester):
                        [[n, v, v] for v in range(end - 1, start - 1, -1)])
 
             # Querying a "small" slice
-            start = ROWS / 2
-            end = ROWS / 2 + 5
+            start = ROWS // 2
+            end = ROWS // 2 + 5
             assert_all(session,
                        "SELECT * FROM t WHERE k = {n} AND t >= {start} AND t < {end}".format(n=n, start=start, end=end),
                        [[n, v, v] for v in range(start, end)])
@@ -179,8 +182,8 @@ class TestStorageEngineUpgrade(Tester):
                        [[n, v, v] for v in range(ROWS - 1, -1, -1)])
 
             # Querying a "large" slice
-            start = ROWS / 10
-            end = ROWS - 1 - (ROWS / 10)
+            start = ROWS // 10
+            end = ROWS - 1 - (ROWS // 10)
             assert_all(session,
                        "SELECT * FROM t WHERE k = {n} AND t >= {start} AND t < {end}".format(n=n, start=start, end=end),
                        [[n, v, v] for v in range(start, end)])
@@ -189,8 +192,8 @@ class TestStorageEngineUpgrade(Tester):
                        [[n, v, v] for v in range(end - 1, start - 1, -1)])
 
             # Querying a "small" slice
-            start = ROWS / 2
-            end = ROWS / 2 + 5
+            start = ROWS // 2
+            end = ROWS // 2 + 5
             assert_all(session,
                        "SELECT * FROM t WHERE k = {n} AND t >= {start} AND t < {end}".format(n=n, start=start, end=end),
                        [[n, v, v] for v in range(start, end)])
@@ -209,15 +212,25 @@ class TestStorageEngineUpgrade(Tester):
         for n in range(PARTITIONS):
             session.execute("INSERT INTO t(k, v1, v2, v3, v4) VALUES ({}, {}, {}, {}, {})".format(n, n + 1, n + 2, n + 3, n + 4))
 
+        is40 = self.dtest_config.cassandra_version_from_build >= MAJOR_VERSION_4
+        if compact_storage and is40:
+            session.execute("ALTER TABLE t DROP COMPACT STORAGE;")
+
         session = self._do_upgrade()
 
+        def maybe_add_compact_columns(expected):
+            if is40 and compact_storage:
+                expected.insert(1, None)
+                expected.append(None)
+            return expected
+
         for n in range(PARTITIONS):
-            assert_one(session, "SELECT * FROM t WHERE k = {}".format(n), [n, n + 1, n + 2, n + 3, n + 4])
+            assert_one(session, "SELECT * FROM t WHERE k = {}".format(n), maybe_add_compact_columns([n, n + 1, n + 2, n + 3, n + 4]))
 
         self.cluster.compact()
 
         for n in range(PARTITIONS):
-            assert_one(session, "SELECT * FROM t WHERE k = {}".format(n), [n, n + 1, n + 2, n + 3, n + 4])
+            assert_one(session, "SELECT * FROM t WHERE k = {}".format(n), maybe_add_compact_columns([n, n + 1, n + 2, n + 3, n + 4]))
 
     def test_upgrade_with_statics(self):
         self.upgrade_with_statics(rows=10)
@@ -402,7 +415,7 @@ class TestStorageEngineUpgrade(Tester):
 
         assert_one(session, "SELECT k FROM t", ['some_key'])
 
-    @since('3.0', max_version='4')
+    @since('3.0', max_version='3.99')
     def test_upgrade_with_range_tombstone_eoc_0(self):
         """
         Check sstable upgrading when the sstable contains a range tombstone with EOC=0.
@@ -457,35 +470,68 @@ class TestStorageEngineUpgrade(Tester):
 @since('3.0')
 class TestBootstrapAfterUpgrade(TestStorageEngineUpgrade):
 
-    def setUp(self):
-        super(TestBootstrapAfterUpgrade, self).setUp(bootstrap=True, jvm_args=LEGACY_SSTABLES_JVM_ARGS)
-
+    @pytest.fixture(autouse=True)
+    def set_up(self, storage_engine_upgrade_setup):
+        self.fixture_dtest_setup.bootstrap=True
+        self.fixture_dtest_setup.jvm_args=LEGACY_SSTABLES_JVM_ARGS
 
 @pytest.mark.upgrade_test
-@since('3.0', max_version='4')
+@since('3.0', max_version='3.99')
 class TestLoadKaSStables(TestBaseSStableLoader):
-    upgrade_from = '2.1.6'
+    upgrade_test = True
+    upgrade_from = '2.1.20'
     jvm_args = LEGACY_SSTABLES_JVM_ARGS
 
 
 @pytest.mark.upgrade_test
-@since('3.0', max_version='4')
+@since('3.0', max_version='3.99')
 class TestLoadKaCompactSStables(TestBaseSStableLoader):
-    upgrade_from = '2.1.6'
+    upgrade_test = True
+    upgrade_from = '2.1.20'
     jvm_args = LEGACY_SSTABLES_JVM_ARGS
-    compact = True
+    test_compact = True
 
 
 @pytest.mark.upgrade_test
-@since('3.0', max_version='4')
+@since('3.0', max_version='3.99')
 class TestLoadLaSStables(TestBaseSStableLoader):
-    upgrade_from = '2.2.4'
+    upgrade_test = True
+    upgrade_from = '2.2.13'
     jvm_args = LEGACY_SSTABLES_JVM_ARGS
 
 
 @pytest.mark.upgrade_test
-@since('3.0', max_version='4')
+@since('3.0', max_version='3.99')
 class TestLoadLaCompactSStables(TestBaseSStableLoader):
-    upgrade_from = '2.2.4'
+    upgrade_test = True
+    upgrade_from = '2.2.13'
     jvm_args = LEGACY_SSTABLES_JVM_ARGS
-    compact = True
+    test_compact = True
+
+
+@pytest.mark.upgrade_test
+@since('4.0', max_version='4.99')
+class TestLoadMdSStables(TestBaseSStableLoader):
+    upgrade_from = '3.0.17'
+
+
+@pytest.mark.upgrade_test
+@pytest.mark.skip("4.0 sstableloader can't handle formerly compact tables even after drop compact storage, rebuild, cleanup")
+@since('4.0', max_version='4.99')
+class TestLoadMdCompactSStables(TestBaseSStableLoader):
+    upgrade_from = '3.0.17'
+    test_compact = True
+
+
+@pytest.mark.upgrade_test
+@since('4.0', max_version='4.99')
+class TestLoadMdThreeOneOneSStables(TestBaseSStableLoader):
+    upgrade_from = '3.11.3'
+
+
+@pytest.mark.upgrade_test
+@pytest.mark.skip("4.0 sstableloader can't handle formerly compact tables even after drop compact storage, rebuild, cleanup")
+@since('4.0', max_version='4.99')
+class TestLoadMdThreeOneOneCompactSStables(TestBaseSStableLoader):
+    upgrade_from = '3.11.3'
+    test_compact = True
diff --git a/upgrade_tests/thrift_upgrade_test.py b/upgrade_tests/thrift_upgrade_test.py
index 42343a2..b427660 100644
--- a/upgrade_tests/thrift_upgrade_test.py
+++ b/upgrade_tests/thrift_upgrade_test.py
@@ -10,7 +10,8 @@ from thrift_bindings.thrift010.Cassandra import (Column, ColumnDef,
                                            ColumnParent, ConsistencyLevel,
                                            SlicePredicate, SliceRange)
 from thrift_test import _i64, get_thrift_client
-from tools.assertions import assert_length_equal
+from tools.assertions import assert_length_equal, assert_lists_of_dicts_equal
+from tools.misc import wait_for_agreement, add_skip
 from .upgrade_base import UpgradeTester
 from .upgrade_manifest import build_upgrade_pairs
 
@@ -18,91 +19,173 @@ since = pytest.mark.since
 logger = logging.getLogger(__name__)
 
 
-def _create_dense_super_cf(name):
-    return Cassandra.CfDef('ks', name, column_type='Super',
+def _create_dense_super_cf(thrift, name):
+    cfdef = Cassandra.CfDef('ks', name, column_type='Super',
                            key_validation_class='AsciiType',        # pk
                            comparator_type='AsciiType',             # ck
                            default_validation_class='AsciiType',    # SC value
                            subcomparator_type='LongType')           # SC key
+    thrift.system_add_column_family(cfdef)
+    wait_for_agreement(thrift)
 
 
-def _create_sparse_super_cf(name):
-    cd1 = ColumnDef('col1', 'LongType', None, None)
-    cd2 = ColumnDef('col2', 'LongType', None, None)
-    return Cassandra.CfDef('ks', name, column_type='Super',
+def _create_sparse_super_cf(thrift, name):
+    cd1 = ColumnDef('col1'.encode(), 'LongType', None, None)
+    cd2 = ColumnDef('col2'.encode(), 'LongType', None, None)
+    cfdef = Cassandra.CfDef('ks', name, column_type='Super',
                            column_metadata=[cd1, cd2],
                            key_validation_class='AsciiType',
                            comparator_type='AsciiType',
                            subcomparator_type='AsciiType')
+    thrift.system_add_column_family(cfdef)
+    wait_for_agreement(thrift)
 
 
-def _validate_sparse_cql(cursor, cf='sparse_super_1', column1='column1', col1='col1', col2='col2', key='key'):
+def unpack(lst):
+    result_list = []
+    for item_dict in lst:
+        normalized_dict = {}
+        for key, value in item_dict.items():
+            if hasattr(value, "items"):
+                assert(key == '')
+                for a, b in value.items():
+                    normalized_dict[a] = b
+            else:
+                normalized_dict[key] = value
+        result_list.append(normalized_dict)
+    return result_list
+
+
+def add_value(list):
+    """Helper for _validate_sparse_cql to modify expected results based"""
+    for item in list:
+        key = item.get('key', None)
+        if key is None:
+            key = item.get('renamed_key')
+
+        value_key = 'value1' if key == 'k1' else 'value2'
+        item[value_key]=_i64(100)
+
+
+def _validate_sparse_cql(cursor, cf='sparse_super_1', column1='column1', col1='col1', col2='col2', key='key', is_version_4_or_greater=False):
     cursor.execute('use ks')
 
-    assert (list(cursor.execute("SELECT * FROM {}".format(cf))) ==
-                 [{key: 'k1', column1: 'key1', col1: 200, col2: 300},
-                  {key: 'k1', column1: 'key2', col1: 200, col2: 300},
-                  {key: 'k2', column1: 'key1', col1: 200, col2: 300},
-                  {key: 'k2', column1: 'key2', col1: 200, col2: 300}])
+    result = unpack(list(cursor.execute("SELECT * FROM {}".format(cf))))
+
+    expected = [{key: 'k1', column1: 'key1', col1: 200, col2: 300},
+     {key: 'k1', column1: 'key2', col1: 200, col2: 300},
+     {key: 'k2', column1: 'key1', col1: 200, col2: 300},
+     {key: 'k2', column1: 'key2', col1: 200, col2: 300}]
+    if is_version_4_or_greater:
+        add_value(expected)
+    assert_lists_of_dicts_equal(result, expected)
 
-    assert (list(cursor.execute("SELECT * FROM {} WHERE {} = 'k1'".format(cf, key))) ==
-                 [{key: 'k1', column1: 'key1', col1: 200, col2: 300},
-                  {key: 'k1', column1: 'key2', col1: 200, col2: 300}])
+    result = unpack(list(cursor.execute("SELECT * FROM {} WHERE {} = 'k1'".format(cf, key))))
+    expected =  [{key: 'k1', column1: 'key1', col1: 200, col2: 300},
+                  {key: 'k1', column1: 'key2', col1: 200, col2: 300}]
+    if is_version_4_or_greater:
+        add_value(expected)
+    assert_lists_of_dicts_equal(result, expected)
 
-    assert (list(cursor.execute("SELECT * FROM {} WHERE {} = 'k2' AND {} = 'key1'".format(cf, key, column1))) ==
-                 [{key: 'k2', column1: 'key1', col1: 200, col2: 300}])
+    result = unpack(list(cursor.execute("SELECT * FROM {} WHERE {} = 'k2' AND {} = 'key1'".format(cf, key, column1))))
+    expected = [{key: 'k2', column1: 'key1', col1: 200, col2: 300}]
+    if is_version_4_or_greater:
+        add_value(expected)
+    assert_lists_of_dicts_equal(result, expected)
 
 
 def _validate_sparse_thrift(client, cf='sparse_super_1'):
-    client.transport.open()
+    try:
+        client.transport.open()
+    except:
+        pass
     client.set_keyspace('ks')
-    result = client.get_slice('k1', ColumnParent(cf), SlicePredicate(slice_range=SliceRange('', '', False, 5)), ConsistencyLevel.ONE)
+    result = client.get_slice('k1'.encode(), ColumnParent(cf), SlicePredicate(slice_range=SliceRange(''.encode(), ''.encode(), False, 5)), ConsistencyLevel.ONE)
     assert_length_equal(result, 2)
-    assert result[0].super_column.name == 'key1'
-    assert result[1].super_column.name == 'key2'
+    assert result[0].super_column.name == 'key1'.encode()
+    assert result[1].super_column.name == 'key2'.encode()
 
     for cosc in result:
-        assert cosc.super_column.columns[0].name == 'col1'
+        assert cosc.super_column.columns[0].name == 'col1'.encode()
         assert cosc.super_column.columns[0].value == _i64(200)
-        assert cosc.super_column.columns[1].name == 'col2'
+        assert cosc.super_column.columns[1].name == 'col2'.encode()
         assert cosc.super_column.columns[1].value == _i64(300)
-        assert cosc.super_column.columns[2].name == 'value1'
+        assert cosc.super_column.columns[2].name == 'value1'.encode()
         assert cosc.super_column.columns[2].value == _i64(100)
 
 
-def _validate_dense_cql(cursor, cf='dense_super_1', key='key', column1='column1', column2='column2', value='value'):
+def _validate_dense_cql(cursor, cf='dense_super_1', key='key', column1='column1', column2='column2', value='value', is_version_4_or_greater=False):
     cursor.execute('use ks')
 
-    assert (list(cursor.execute("SELECT * FROM {}".format(cf))) ==
-                 [{key: 'k1', column1: 'key1', column2: 100, value: 'value1'},
+    expected = [{key: 'k1', column1: 'key1', column2: 100, value: 'value1'},
                   {key: 'k1', column1: 'key2', column2: 100, value: 'value1'},
                   {key: 'k2', column1: 'key1', column2: 200, value: 'value2'},
-                  {key: 'k2', column1: 'key2', column2: 200, value: 'value2'}])
-
-    assert (list(cursor.execute("SELECT * FROM {} WHERE {} = 'k1'".format(cf, key))) ==
-                 [{key: 'k1', column1: 'key1', column2: 100, value: 'value1'},
-                  {key: 'k1', column1: 'key2', column2: 100, value: 'value1'}])
-
-    assert (list(cursor.execute("SELECT * FROM {} WHERE {} = 'k1' AND {} = 'key1'".format(cf, key, column1))) ==
-                 [{key: 'k1', column1: 'key1', column2: 100, value: 'value1'}])
-
-    assert (list(cursor.execute("SELECT * FROM {} WHERE {} = 'k1' AND {} = 'key1' AND {} = 100".format(cf, key, column1, column2))) ==
-                 [{key: 'k1', column1: 'key1', column2: 100, value: 'value1'}])
+                  {key: 'k2', column1: 'key2', column2: 200, value: 'value2'}]
+    if is_version_4_or_greater:
+        expected[0][100]='value1'
+        expected[1][100]='value1'
+        expected[2][200]='value2'
+        expected[3][200]='value2'
+        for dict in expected:
+            del dict[value]
+        for dict in expected:
+            del dict[column2]
+    result = unpack(list(cursor.execute("SELECT * FROM {}".format(cf))))
+    assert_lists_of_dicts_equal(result, expected)
+
+    result = unpack(list(cursor.execute("SELECT * FROM {} WHERE {} = 'k1'".format(cf, key))))
+    expected = [{key: 'k1', column1: 'key1', column2: 100, value: 'value1'},
+                  {key: 'k1', column1: 'key2', column2: 100, value: 'value1'}]
+    if is_version_4_or_greater:
+        expected[0][100]='value1'
+        expected[1][100]='value1'
+        for dict in expected:
+            del dict[value]
+        for dict in expected:
+            del dict[column2]
+    assert_lists_of_dicts_equal(result, expected)
+
+    result = unpack(list(cursor.execute("SELECT * FROM {} WHERE {} = 'k1' AND {} = 'key1'".format(cf, key, column1))))
+    expected = [{key: 'k1', column1: 'key1', column2: 100, value: 'value1'}]
+    if is_version_4_or_greater:
+        expected[0][100]='value1'
+        for dict in expected:
+            del dict[value]
+        for dict in expected:
+            del dict[column2]
+    assert_lists_of_dicts_equal(result, expected)
+
+    if is_version_4_or_greater:
+        result = unpack(list(cursor.execute("SELECT * FROM {} WHERE {} = 'k1' AND {} = 'key1' AND \"\" CONTAINS KEY 100 ALLOW FILTERING".format(cf, key, column1, column2))))
+    else:
+        result = list(cursor.execute("SELECT * FROM {} WHERE {} = 'k1' AND {} = 'key1' AND {} = 100".format(cf, key, column1, column2)))
+
+    expected = [{key: 'k1', column1: 'key1', column2: 100, value: 'value1'}]
+    if is_version_4_or_greater:
+        expected[0][100]='value1'
+        for dict in expected:
+            del dict[value]
+        for dict in expected:
+            del dict[column2]
+    assert_lists_of_dicts_equal(result, expected)
 
 
 def _validate_dense_thrift(client, cf='dense_super_1'):
-    client.transport.open()
+    try:
+        client.transport.open()
+    except:
+        pass
     client.set_keyspace('ks')
-    result = client.get_slice('k1', ColumnParent(cf), SlicePredicate(slice_range=SliceRange('', '', False, 5)), ConsistencyLevel.ONE)
+    result = client.get_slice('k1'.encode(), ColumnParent(cf), SlicePredicate(slice_range=SliceRange(''.encode(), ''.encode(), False, 5)), ConsistencyLevel.ONE)
     assert_length_equal(result, 2)
-    assert result[0].super_column.name == 'key1'
-    assert result[1].super_column.name == 'key2'
+    assert result[0].super_column.name == 'key1'.encode()
+    assert result[1].super_column.name == 'key2'.encode()
 
     print((result[0]))
     print((result[1]))
     for cosc in result:
         assert cosc.super_column.columns[0].name == _i64(100)
-        assert cosc.super_column.columns[0].value == 'value1'
+        assert cosc.super_column.columns[0].value == 'value1'.encode()
 
 
 @pytest.mark.upgrade_test
@@ -124,6 +207,9 @@ class TestUpgradeSuperColumnsThrough(Tester):
             node.set_configuration_options(values={'start_rpc': 'true'})
             logger.debug("Set new cassandra dir for %s: %s" % (node.name, node.get_install_dir()))
         self.cluster.set_install_dir(version=tag)
+        self.fixture_dtest_setup.reinitialize_cluster_for_different_version()
+        for node in nodes:
+            node.set_configuration_options(values={'start_rpc': 'true'})
 
         # Restart nodes on new version
         for node in nodes:
@@ -138,6 +224,7 @@ class TestUpgradeSuperColumnsThrough(Tester):
 
         # Forcing cluster version on purpose
         cluster.set_install_dir(version=cassandra_version)
+        self.fixture_dtest_setup.reinitialize_cluster_for_different_version()
 
         cluster.populate(num_nodes)
         for node in self.cluster.nodelist():
@@ -159,24 +246,30 @@ class TestUpgradeSuperColumnsThrough(Tester):
         client.transport.open()
         client.set_keyspace('ks')
 
-        client.system_add_column_family(_create_dense_super_cf('dense_super_1'))
+        _create_dense_super_cf(client, 'dense_super_1')
 
         for i in range(1, 3):
-            client.insert('k1', ColumnParent('dense_super_1', 'key{}'.format(i)), Column(_i64(100), 'value1', 0), ConsistencyLevel.ONE)
-            client.insert('k2', ColumnParent('dense_super_1', 'key{}'.format(i)), Column(_i64(200), 'value2', 0), ConsistencyLevel.ONE)
+            client.insert('k1'.encode(), ColumnParent('dense_super_1', 'key{}'.format(i).encode()), Column(_i64(100), 'value1'.encode(), 0), ConsistencyLevel.ONE)
+            client.insert('k2'.encode(), ColumnParent('dense_super_1', 'key{}'.format(i).encode()), Column(_i64(200), 'value2'.encode(), 0), ConsistencyLevel.ONE)
 
         _validate_dense_thrift(client, cf='dense_super_1')
 
-        node.stop()
         self.set_node_to_current_version(node)
-        node.set_configuration_options(values={'start_rpc': 'true'})
+        #4.0 doesn't support compact storage
+        if node.get_cassandra_version() >= '4':
+            cursor.execute("ALTER TABLE ks.dense_super_1 DROP COMPACT STORAGE;")
+
+        node.stop()
+        if node.get_cassandra_version() < '4':
+           node.set_configuration_options(values={'start_rpc': 'true'})
         node.start()
 
         cursor = self.patient_cql_connection(node, row_factory=dict_factory)
-        client = get_thrift_client(host, port)
 
-        _validate_dense_thrift(client, cf='dense_super_1')
-        _validate_dense_cql(cursor, cf='dense_super_1')
+        if node.get_cassandra_version() < '4':
+            client = get_thrift_client(host, port)
+            _validate_dense_thrift(client, cf='dense_super_1')
+        _validate_dense_cql(cursor, cf='dense_super_1', is_version_4_or_greater=node.get_cassandra_version() >= '4')
 
     def test_dense_supercolumn(self):
         cluster = self.prepare()
@@ -192,11 +285,11 @@ class TestUpgradeSuperColumnsThrough(Tester):
         client.transport.open()
         client.set_keyspace('ks')
 
-        client.system_add_column_family(_create_dense_super_cf('dense_super_1'))
+        _create_dense_super_cf(client, 'dense_super_1')
 
         for i in range(1, 3):
-            client.insert('k1', ColumnParent('dense_super_1', 'key{}'.format(i)), Column(_i64(100), 'value1', 0), ConsistencyLevel.ONE)
-            client.insert('k2', ColumnParent('dense_super_1', 'key{}'.format(i)), Column(_i64(200), 'value2', 0), ConsistencyLevel.ONE)
+            client.insert('k1'.encode(), ColumnParent('dense_super_1', 'key{}'.format(i).encode()), Column(_i64(100), 'value1'.encode(), 0), ConsistencyLevel.ONE)
+            client.insert('k2'.encode(), ColumnParent('dense_super_1', 'key{}'.format(i).encode()), Column(_i64(200), 'value2'.encode(), 0), ConsistencyLevel.ONE)
 
         _validate_dense_thrift(client, cf='dense_super_1')
         _validate_dense_cql(cursor, cf='dense_super_1')
@@ -208,16 +301,22 @@ class TestUpgradeSuperColumnsThrough(Tester):
 
         _validate_dense_thrift(client, cf='dense_super_1')
 
-        node.stop()
         self.set_node_to_current_version(node)
-        node.set_configuration_options(values={'start_rpc': 'true'})
+        #4.0 doesn't support compact storage
+        if node.get_cassandra_version() >= '4':
+            cursor.execute("ALTER TABLE ks.dense_super_1 DROP COMPACT STORAGE;")
+
+        node.stop()
+        if node.get_cassandra_version() < '4':
+            node.set_configuration_options(values={'start_rpc': 'true'})
         node.start()
 
-        cursor = self.patient_cql_connection(node, row_factory=dict_factory)
-        client = get_thrift_client(host, port)
+        if node.get_cassandra_version() < '4':
+            client = get_thrift_client(host, port)
+            _validate_dense_thrift(client, cf='dense_super_1')
 
-        _validate_dense_thrift(client, cf='dense_super_1')
-        _validate_dense_cql(cursor, cf='dense_super_1')
+        cursor = self.patient_cql_connection(node, row_factory=dict_factory)
+        _validate_dense_cql(cursor, cf='dense_super_1', is_version_4_or_greater=node.get_cassandra_version() >= '4')
 
     def test_sparse_supercolumn(self):
         cluster = self.prepare()
@@ -233,17 +332,16 @@ class TestUpgradeSuperColumnsThrough(Tester):
         client.transport.open()
         client.set_keyspace('ks')
 
-        cf = _create_sparse_super_cf('sparse_super_2')
-        client.system_add_column_family(cf)
+        _create_sparse_super_cf(client, 'sparse_super_2')
 
         for i in range(1, 3):
-            client.insert('k1', ColumnParent('sparse_super_2', 'key{}'.format(i)), Column("value1", _i64(100), 0), ConsistencyLevel.ONE)
-            client.insert('k1', ColumnParent('sparse_super_2', 'key{}'.format(i)), Column("col1", _i64(200), 0), ConsistencyLevel.ONE)
-            client.insert('k1', ColumnParent('sparse_super_2', 'key{}'.format(i)), Column("col2", _i64(300), 0), ConsistencyLevel.ONE)
+            client.insert('k1'.encode(), ColumnParent('sparse_super_2', 'key{}'.format(i).encode()), Column("value1".encode(), _i64(100), 0), ConsistencyLevel.ONE)
+            client.insert('k1'.encode(), ColumnParent('sparse_super_2', 'key{}'.format(i).encode()), Column("col1".encode(), _i64(200), 0), ConsistencyLevel.ONE)
+            client.insert('k1'.encode(), ColumnParent('sparse_super_2', 'key{}'.format(i).encode()), Column("col2".encode(), _i64(300), 0), ConsistencyLevel.ONE)
 
-            client.insert('k2', ColumnParent('sparse_super_2', 'key{}'.format(i)), Column("value2", _i64(100), 0), ConsistencyLevel.ONE)
-            client.insert('k2', ColumnParent('sparse_super_2', 'key{}'.format(i)), Column("col1", _i64(200), 0), ConsistencyLevel.ONE)
-            client.insert('k2', ColumnParent('sparse_super_2', 'key{}'.format(i)), Column("col2", _i64(300), 0), ConsistencyLevel.ONE)
+            client.insert('k2'.encode(), ColumnParent('sparse_super_2', 'key{}'.format(i).encode()), Column("value2".encode(), _i64(100), 0), ConsistencyLevel.ONE)
+            client.insert('k2'.encode(), ColumnParent('sparse_super_2', 'key{}'.format(i).encode()), Column("col1".encode(), _i64(200), 0), ConsistencyLevel.ONE)
+            client.insert('k2'.encode(), ColumnParent('sparse_super_2', 'key{}'.format(i).encode()), Column("col2".encode(), _i64(300), 0), ConsistencyLevel.ONE)
 
         _validate_sparse_thrift(client, cf='sparse_super_2')
         _validate_sparse_cql(cursor, cf='sparse_super_2')
@@ -255,20 +353,27 @@ class TestUpgradeSuperColumnsThrough(Tester):
 
         _validate_sparse_thrift(client, cf='sparse_super_2')
 
-        node.stop()
         self.set_node_to_current_version(node)
-        node.set_configuration_options(values={'start_rpc': 'true'})
+        is_version_4_or_greater = node.get_cassandra_version() >= '4'
+        #4.0 doesn't support compact storage
+        if is_version_4_or_greater:
+            cursor.execute("ALTER TABLE ks.sparse_super_2 DROP COMPACT STORAGE;")
+
+        node.stop()
+        if not is_version_4_or_greater:
+            node.set_configuration_options(values={'start_rpc': 'true'})
         node.start()
 
-        cursor = self.patient_cql_connection(node, row_factory=dict_factory)
-        client = get_thrift_client(host, port)
+        if not is_version_4_or_greater:
+            client = get_thrift_client(host, port)
+            _validate_sparse_thrift(client, cf='sparse_super_2')
 
-        _validate_sparse_thrift(client, cf='sparse_super_2')
-        _validate_sparse_cql(cursor, cf='sparse_super_2')
+        cursor = self.patient_cql_connection(node, row_factory=dict_factory)
+        _validate_sparse_cql(cursor, cf='sparse_super_2', is_version_4_or_greater=is_version_4_or_greater)
 
 
 @pytest.mark.upgrade_test
-@since('2.1', max_version='4.0.0')
+@since('2.1', max_version='3.99')
 class TestThrift(UpgradeTester):
     """
     Verify dense and sparse supercolumn functionality with and without renamed columns
@@ -289,20 +394,27 @@ class TestThrift(UpgradeTester):
         client.transport.open()
         client.set_keyspace('ks')
 
-        client.system_add_column_family(_create_dense_super_cf('dense_super_1'))
+        _create_dense_super_cf(client, 'dense_super_1')
 
         for i in range(1, 3):
-            client.insert('k1', ColumnParent('dense_super_1', 'key{}'.format(i)), Column(_i64(100), 'value1', 0), ConsistencyLevel.ONE)
-            client.insert('k2', ColumnParent('dense_super_1', 'key{}'.format(i)), Column(_i64(200), 'value2', 0), ConsistencyLevel.ONE)
+            client.insert('k1'.encode(), ColumnParent('dense_super_1', 'key{}'.format(i).encode()), Column(_i64(100), 'value1'.encode(), 0), ConsistencyLevel.ONE)
+            client.insert('k2'.encode(), ColumnParent('dense_super_1', 'key{}'.format(i).encode()), Column(_i64(200), 'value2'.encode(), 0), ConsistencyLevel.ONE)
 
         _validate_dense_cql(cursor)
         _validate_dense_thrift(client)
 
+        version_string = self.upgrade_version_string()
+        is_version_4_or_greater = version_string == 'trunk' or version_string >= '4.0'
+        #4.0 doesn't support compact storage
+        if is_version_4_or_greater:
+            cursor.execute("ALTER TABLE ks.dense_super_1 DROP COMPACT STORAGE;")
+
         for is_upgraded, cursor in self.do_upgrade(cursor, row_factory=dict_factory, use_thrift=True):
             logger.debug("Querying {} node".format("upgraded" if is_upgraded else "old"))
-            client = get_thrift_client(host, port)
-            _validate_dense_cql(cursor)
-            _validate_dense_thrift(client)
+            if not is_version_4_or_greater:
+                client = get_thrift_client(host, port)
+                _validate_dense_thrift(client)
+            _validate_dense_cql(cursor, is_version_4_or_greater=is_version_4_or_greater)
 
     def test_dense_supercolumn_with_renames(self):
         cursor = self.prepare(row_factory=dict_factory)
@@ -317,11 +429,11 @@ class TestThrift(UpgradeTester):
         client.transport.open()
         client.set_keyspace('ks')
 
-        client.system_add_column_family(_create_dense_super_cf('dense_super_2'))
+        _create_dense_super_cf(client, 'dense_super_2')
 
         for i in range(1, 3):
-            client.insert('k1', ColumnParent('dense_super_2', 'key{}'.format(i)), Column(_i64(100), 'value1', 0), ConsistencyLevel.ONE)
-            client.insert('k2', ColumnParent('dense_super_2', 'key{}'.format(i)), Column(_i64(200), 'value2', 0), ConsistencyLevel.ONE)
+            client.insert('k1'.encode(), ColumnParent('dense_super_2', 'key{}'.format(i).encode()), Column(_i64(100), 'value1'.encode(), 0), ConsistencyLevel.ONE)
+            client.insert('k2'.encode(), ColumnParent('dense_super_2', 'key{}'.format(i).encode()), Column(_i64(200), 'value2'.encode(), 0), ConsistencyLevel.ONE)
 
         cursor.execute("ALTER TABLE ks.dense_super_2 RENAME key TO renamed_key")
         cursor.execute("ALTER TABLE ks.dense_super_2 RENAME column1 TO renamed_column1")
@@ -331,11 +443,18 @@ class TestThrift(UpgradeTester):
         _validate_dense_cql(cursor, cf='dense_super_2', key='renamed_key', column1='renamed_column1', column2='renamed_column2', value='renamed_value')
         _validate_dense_thrift(client, cf='dense_super_2')
 
+        version_string = self.upgrade_version_string()
+        is_version_4_or_greater = version_string == 'trunk' or version_string >= '4.0'
+        #4.0 doesn't support compact storage
+        if is_version_4_or_greater:
+            cursor.execute("ALTER TABLE ks.dense_super_2 DROP COMPACT STORAGE;")
+
         for is_upgraded, cursor in self.do_upgrade(cursor, row_factory=dict_factory, use_thrift=True):
             logger.debug("Querying {} node".format("upgraded" if is_upgraded else "old"))
-            client = get_thrift_client(host, port)
-            _validate_dense_cql(cursor, cf='dense_super_2', key='renamed_key', column1='renamed_column1', column2='renamed_column2', value='renamed_value')
-            _validate_dense_thrift(client, cf='dense_super_2')
+            if not is_version_4_or_greater:
+                client = get_thrift_client(host, port)
+                _validate_dense_thrift(client, cf='dense_super_2')
+            _validate_dense_cql(cursor, cf='dense_super_2', key='renamed_key', column1='renamed_column1', column2='renamed_column2', value='renamed_value', is_version_4_or_greater=is_version_4_or_greater)
 
     def test_sparse_supercolumn_with_renames(self):
         cursor = self.prepare(row_factory=dict_factory)
@@ -350,29 +469,35 @@ class TestThrift(UpgradeTester):
         client.transport.open()
         client.set_keyspace('ks')
 
-        cf = _create_sparse_super_cf('sparse_super_1')
-        client.system_add_column_family(cf)
+        _create_sparse_super_cf(client, 'sparse_super_1')
 
         cursor.execute("ALTER TABLE ks.sparse_super_1 RENAME key TO renamed_key")
         cursor.execute("ALTER TABLE ks.sparse_super_1 RENAME column1 TO renamed_column1")
 
         for i in range(1, 3):
-            client.insert('k1', ColumnParent('sparse_super_1', 'key{}'.format(i)), Column("value1", _i64(100), 0), ConsistencyLevel.ONE)
-            client.insert('k1', ColumnParent('sparse_super_1', 'key{}'.format(i)), Column("col1", _i64(200), 0), ConsistencyLevel.ONE)
-            client.insert('k1', ColumnParent('sparse_super_1', 'key{}'.format(i)), Column("col2", _i64(300), 0), ConsistencyLevel.ONE)
+            client.insert('k1'.encode(), ColumnParent('sparse_super_1', 'key{}'.format(i).encode()), Column("value1".encode(), _i64(100), 0), ConsistencyLevel.ONE)
+            client.insert('k1'.encode(), ColumnParent('sparse_super_1', 'key{}'.format(i).encode()), Column("col1".encode(), _i64(200), 0), ConsistencyLevel.ONE)
+            client.insert('k1'.encode(), ColumnParent('sparse_super_1', 'key{}'.format(i).encode()), Column("col2".encode(), _i64(300), 0), ConsistencyLevel.ONE)
 
-            client.insert('k2', ColumnParent('sparse_super_1', 'key{}'.format(i)), Column("value2", _i64(100), 0), ConsistencyLevel.ONE)
-            client.insert('k2', ColumnParent('sparse_super_1', 'key{}'.format(i)), Column("col1", _i64(200), 0), ConsistencyLevel.ONE)
-            client.insert('k2', ColumnParent('sparse_super_1', 'key{}'.format(i)), Column("col2", _i64(300), 0), ConsistencyLevel.ONE)
+            client.insert('k2'.encode(), ColumnParent('sparse_super_1', 'key{}'.format(i).encode()), Column("value2".encode(), _i64(100), 0), ConsistencyLevel.ONE)
+            client.insert('k2'.encode(), ColumnParent('sparse_super_1', 'key{}'.format(i).encode()), Column("col1".encode(), _i64(200), 0), ConsistencyLevel.ONE)
+            client.insert('k2'.encode(), ColumnParent('sparse_super_1', 'key{}'.format(i).encode()), Column("col2".encode(), _i64(300), 0), ConsistencyLevel.ONE)
 
         _validate_sparse_thrift(client)
         _validate_sparse_cql(cursor, column1='renamed_column1', key='renamed_key')
 
+        version_string = self.upgrade_version_string()
+        is_version_4_or_greater = version_string == 'trunk' or version_string >= '4.0'
+        #4.0 doesn't support compact storage
+        if is_version_4_or_greater:
+            cursor.execute("ALTER TABLE ks.sparse_super_1 DROP COMPACT STORAGE;")
+
         for is_upgraded, cursor in self.do_upgrade(cursor, row_factory=dict_factory, use_thrift=True):
             logger.debug("Querying {} node".format("upgraded" if is_upgraded else "old"))
-            client = get_thrift_client(host, port)
-            _validate_sparse_cql(cursor, column1='renamed_column1', key='renamed_key')
-            _validate_sparse_thrift(client)
+            if not is_version_4_or_greater:
+                client = get_thrift_client(host, port)
+                _validate_sparse_thrift(client)
+            _validate_sparse_cql(cursor, column1='renamed_column1', key='renamed_key', is_version_4_or_greater=is_version_4_or_greater)
 
     def test_sparse_supercolumn(self):
         cursor = self.prepare(row_factory=dict_factory)
@@ -387,26 +512,32 @@ class TestThrift(UpgradeTester):
         client.transport.open()
         client.set_keyspace('ks')
 
-        cf = _create_sparse_super_cf('sparse_super_2')
-        client.system_add_column_family(cf)
+        _create_sparse_super_cf(client, 'sparse_super_2')
 
         for i in range(1, 3):
-            client.insert('k1', ColumnParent('sparse_super_2', 'key{}'.format(i)), Column("value1", _i64(100), 0), ConsistencyLevel.ONE)
-            client.insert('k1', ColumnParent('sparse_super_2', 'key{}'.format(i)), Column("col1", _i64(200), 0), ConsistencyLevel.ONE)
-            client.insert('k1', ColumnParent('sparse_super_2', 'key{}'.format(i)), Column("col2", _i64(300), 0), ConsistencyLevel.ONE)
+            client.insert('k1'.encode(), ColumnParent('sparse_super_2', 'key{}'.format(i).encode()), Column("value1".encode(), _i64(100), 0), ConsistencyLevel.ONE)
+            client.insert('k1'.encode(), ColumnParent('sparse_super_2', 'key{}'.format(i).encode()), Column("col1".encode(), _i64(200), 0), ConsistencyLevel.ONE)
+            client.insert('k1'.encode(), ColumnParent('sparse_super_2', 'key{}'.format(i).encode()), Column("col2".encode(), _i64(300), 0), ConsistencyLevel.ONE)
 
-            client.insert('k2', ColumnParent('sparse_super_2', 'key{}'.format(i)), Column("value2", _i64(100), 0), ConsistencyLevel.ONE)
-            client.insert('k2', ColumnParent('sparse_super_2', 'key{}'.format(i)), Column("col1", _i64(200), 0), ConsistencyLevel.ONE)
-            client.insert('k2', ColumnParent('sparse_super_2', 'key{}'.format(i)), Column("col2", _i64(300), 0), ConsistencyLevel.ONE)
+            client.insert('k2'.encode(), ColumnParent('sparse_super_2', 'key{}'.format(i).encode()), Column("value2".encode(), _i64(100), 0), ConsistencyLevel.ONE)
+            client.insert('k2'.encode(), ColumnParent('sparse_super_2', 'key{}'.format(i).encode()), Column("col1".encode(), _i64(200), 0), ConsistencyLevel.ONE)
+            client.insert('k2'.encode(), ColumnParent('sparse_super_2', 'key{}'.format(i).encode()), Column("col2".encode(), _i64(300), 0), ConsistencyLevel.ONE)
 
         _validate_sparse_thrift(client, cf='sparse_super_2')
         _validate_sparse_cql(cursor, cf='sparse_super_2')
 
+        version_string = self.upgrade_version_string()
+        is_version_4_or_greater = version_string == 'trunk' or version_string >= '4.0'
+        #4.0 doesn't support compact storage
+        if is_version_4_or_greater:
+            cursor.execute("ALTER TABLE ks.sparse_super_2 DROP COMPACT STORAGE;")
+
         for is_upgraded, cursor in self.do_upgrade(cursor, row_factory=dict_factory, use_thrift=True):
             logger.debug("Querying {} node".format("upgraded" if is_upgraded else "old"))
-            client = get_thrift_client(host, port)
-            _validate_sparse_thrift(client, cf='sparse_super_2')
-            _validate_sparse_cql(cursor, cf='sparse_super_2')
+            if not is_version_4_or_greater:
+                client = get_thrift_client(host, port)
+                _validate_sparse_thrift(client, cf='sparse_super_2')
+            _validate_sparse_cql(cursor, cf='sparse_super_2', is_version_4_or_greater=is_version_4_or_greater)
 
 
 topology_specs = [
@@ -427,6 +558,7 @@ for spec in specs:
     assert gen_class_name not in globals()
 
     upgrade_applies_to_env = RUN_STATIC_UPGRADE_MATRIX or spec['UPGRADE_PATH'].upgrade_meta.matches_current_env_version_family
+    cls = type(gen_class_name, (TestThrift,), spec)
     if not upgrade_applies_to_env:
-        pytest.mark.skip(reason='test not applicable to env.')
-    globals()[gen_class_name] = type(gen_class_name, (TestThrift,), spec)
+        add_skip(cls, 'test not applicable to env.')
+    globals()[gen_class_name] = cls
diff --git a/upgrade_tests/upgrade_base.py b/upgrade_tests/upgrade_base.py
index a403835..b831084 100644
--- a/upgrade_tests/upgrade_base.py
+++ b/upgrade_tests/upgrade_base.py
@@ -70,7 +70,7 @@ class UpgradeTester(Tester, metaclass=ABCMeta):
         super(UpgradeTester, self).setUp()
 
     def prepare(self, ordered=False, create_keyspace=True, use_cache=False, use_thrift=False,
-                nodes=None, rf=None, protocol_version=None, cl=None, **kwargs):
+                nodes=None, rf=None, protocol_version=None, cl=None, extra_config_options=None, **kwargs):
         nodes = self.NODES if nodes is None else nodes
         rf = self.RF if rf is None else rf
 
@@ -83,6 +83,9 @@ class UpgradeTester(Tester, metaclass=ABCMeta):
 
         cluster = self.cluster
 
+        cluster.set_install_dir(version=self.UPGRADE_PATH.starting_version)
+        self.fixture_dtest_setup.reinitialize_cluster_for_different_version()
+
         if ordered:
             cluster.set_partitioner("org.apache.cassandra.dht.ByteOrderedPartitioner")
 
@@ -98,9 +101,11 @@ class UpgradeTester(Tester, metaclass=ABCMeta):
 
         cluster.set_configuration_options(values={'internode_compression': 'none'})
 
+        if extra_config_options:
+            cluster.set_configuration_options(values=extra_config_options)
+
         cluster.populate(nodes)
         node1 = cluster.nodelist()[0]
-        cluster.set_install_dir(version=self.UPGRADE_PATH.starting_version)
         self.fixture_dtest_setup.enable_for_jolokia = kwargs.pop('jolokia', False)
         if self.fixture_dtest_setup.enable_for_jolokia:
             remove_perf_disable_shared_mem(node1)
@@ -131,7 +136,8 @@ class UpgradeTester(Tester, metaclass=ABCMeta):
         node1 = self.cluster.nodelist()[0]
         node2 = self.cluster.nodelist()[1]
 
-        # stop the nodes
+        # stop the nodes, this can fail due to https://issues.apache.org/jira/browse/CASSANDRA-8220 on MacOS
+        # for the tests that run against 2.0. You will need to run those in Linux.
         node1.drain()
         node1.stop(gently=True)
 
@@ -165,7 +171,7 @@ class UpgradeTester(Tester, metaclass=ABCMeta):
         node1.set_log_level(logging.getLevelName(logging.root.level))
         node1.set_configuration_options(values={'internode_compression': 'none'})
 
-        if use_thrift:
+        if use_thrift and node1.get_cassandra_version() < '4':
             node1.set_configuration_options(values={'start_rpc': 'true'})
 
         if self.fixture_dtest_setup.enable_for_jolokia:
@@ -245,3 +251,19 @@ class UpgradeTester(Tester, metaclass=ABCMeta):
              '')
         )
         assert self.UPGRADE_PATH is not None, no_upgrade_path_error
+
+    def upgrade_version_string(self):
+        """
+        Returns a hopefully useful version string that can be compared
+        to tune test behavior. For trunk this returns trunk, for an earlier
+        version like github:apache/cassandra-3.11 it returns a version number
+        as a string
+        :return:
+        """
+        version_string = self.UPGRADE_PATH.upgrade_version
+        if version_string.startswith('github'):
+            version_string = version_string.partition('/')[2]
+        if "-" in version_string:
+            version_string = version_string.partition('-')[2]
+        return version_string
+
diff --git a/upgrade_tests/upgrade_compact_storage.py b/upgrade_tests/upgrade_compact_storage.py
index ed85515..f3fe12a 100644
--- a/upgrade_tests/upgrade_compact_storage.py
+++ b/upgrade_tests/upgrade_compact_storage.py
@@ -34,6 +34,7 @@ class TestUpgradeSuperColumnsThrough(Tester):
             node.set_install_dir(version=tag)
             logger.debug("Set new cassandra dir for %s: %s" % (node.name, node.get_install_dir()))
         self.cluster.set_install_dir(version=tag)
+        self.fixture_dtest_setup.reinitialize_cluster_for_different_version()
 
         # Restart nodes on new version
         for node in nodes:
@@ -45,6 +46,7 @@ class TestUpgradeSuperColumnsThrough(Tester):
 
         # Forcing cluster version on purpose
         cluster.set_install_dir(version=cassandra_version)
+        self.fixture_dtest_setup.reinitialize_cluster_for_different_version()
 
         cluster.populate(num_nodes)
 
diff --git a/upgrade_tests/upgrade_manifest.py b/upgrade_tests/upgrade_manifest.py
index d26fd34..1cd9b2c 100644
--- a/upgrade_tests/upgrade_manifest.py
+++ b/upgrade_tests/upgrade_manifest.py
@@ -4,22 +4,73 @@ from collections import namedtuple
 
 from dtest import RUN_STATIC_UPGRADE_MATRIX
 
+import ccmlib.repository
+from ccmlib.common import get_version_from_build
+
+from enum import Enum
+
 logger = logging.getLogger(__name__)
 
 # UpgradePath's contain data about upgrade paths we wish to test
 # They also contain VersionMeta's for each version the path is testing
 UpgradePath = namedtuple('UpgradePath', ('name', 'starting_version', 'upgrade_version', 'starting_meta', 'upgrade_meta'))
 
+VERSION_FAMILY = None
+CONFIG = None
+
+
+def is_same_family_current_to_indev(origin, destination):
+    """
+    Within a version family it is useful to test that a prior release can upgrade to the indev version
+    """
+    return origin.family == destination.family and origin.variant == "current" and destination.variant == "indev"
+
+
+class VersionSelectionStrategies(Enum):
+    """
+    Test upgrading from indev -> indev, current -> current across versions, and current -> indev within a version
+    """
+    BOTH=(lambda origin, destination: (origin.variant == destination.variant) or is_same_family_current_to_indev(origin,destination))
+    """
+    Exclusively test in development branches so your bug fixes show up
+    """
+    INDEV=(lambda origin, destination: origin.variant == 'indev' and destination.variant == 'indev' or is_same_family_current_to_indev(origin, destination),)
+    """
+    Test upgrading from releases to the latest release as well as from the current release to the indev tip 
+    within the same version.
+    """
+    RELEASES=(lambda origin, destination: not VersionSelectionStrategies.INDEV.value[0](origin, destination) or is_same_family_current_to_indev(origin, destination),)
+
+
+def set_config(config):
+    global CONFIG
+    CONFIG = config
+    set_version_family()
 
-def _get_version_family():
+
+def set_version_family():
     """
     Detects the version family (line) using dtest.py:CASSANDRA_VERSION_FROM_BUILD
     """
     # todo CASSANDRA-14421
     # current_version = CASSANDRA_VERSION_FROM_BUILD
-    current_version = '4.0'
+    # There are times when we want to know the C* version we're testing against
+    # before we call Tester.setUp. In the general case, we can't know that -- the
+    # test method could use any version it wants for self.cluster. However, we can
+    # get the version from build.xml in the C* repository specified by
+    # CASSANDRA_VERSION or CASSANDRA_DIR. This should use the same resolution
+    # strategy as the actual checkout code in Tester.setUp; if it does not, that is
+    # a bug.
+    cassandra_version_slug = CONFIG.getoption("--cassandra-version")
+    cassandra_dir = CONFIG.getoption("--cassandra-dir")
+    # Prefer CASSANDRA_VERSION if it's set in the environment. If not, use CASSANDRA_DIR
+    if cassandra_version_slug:
+        # fetch but don't build the specified C* version
+        ccm_repo_cache_dir, _ = ccmlib.repository.setup(cassandra_version_slug)
+        current_version = get_version_from_build(ccm_repo_cache_dir)
+    else:
+        current_version = get_version_from_build(cassandra_dir)
 
-    version_family = 'unknown'
     if current_version.vstring.startswith('2.0'):
         version_family = '2.0.x'
     elif current_version.vstring.startswith('2.1'):
@@ -36,10 +87,9 @@ def _get_version_family():
         # when this occurs, it's time to update this manifest a bit!
         raise RuntimeError("4.1+ not yet supported on upgrade tests!")
 
-    return version_family
-
-
-VERSION_FAMILY = _get_version_family()
+    global VERSION_FAMILY
+    VERSION_FAMILY = version_family
+    logger.info("Setting version family to %s\n" % VERSION_FAMILY)
 
 
 class VersionMeta(namedtuple('_VersionMeta', ('name', 'family', 'variant', 'version', 'min_proto_v', 'max_proto_v', 'java_versions'))):
@@ -70,22 +120,19 @@ class VersionMeta(namedtuple('_VersionMeta', ('name', 'family', 'variant', 'vers
         return self
 
 
-indev_2_0_x = None  # None if release not likely
-current_2_0_x = VersionMeta(name='current_2_0_x', family='2.0.x', variant='current', version='2.0.17', min_proto_v=1, max_proto_v=2, java_versions=(7,))
-
-indev_2_1_x = VersionMeta(name='indev_2_1_x', family='2.1.x', variant='indev', version='github:apache/cassandra-2.1', min_proto_v=1, max_proto_v=3, java_versions=(7, 8))
-current_2_1_x = VersionMeta(name='current_2_1_x', family='2.1.x', variant='current', version='2.1.17', min_proto_v=1, max_proto_v=3, java_versions=(7, 8))
+indev_2_1_x = VersionMeta(name='indev_2_1_x', family='2.1', variant='indev', version='github:apache/cassandra-2.1', min_proto_v=1, max_proto_v=3, java_versions=(7, 8))
+current_2_1_x = VersionMeta(name='current_2_1_x', family='2.1', variant='current', version='2.1.20', min_proto_v=1, max_proto_v=3, java_versions=(7, 8))
 
-indev_2_2_x = VersionMeta(name='indev_2_2_x', family='2.2.x', variant='indev', version='github:apache/cassandra-2.2', min_proto_v=1, max_proto_v=4, java_versions=(7, 8))
-current_2_2_x = VersionMeta(name='current_2_2_x', family='2.2.x', variant='current', version='2.2.9', min_proto_v=1, max_proto_v=4, java_versions=(7, 8))
+indev_2_2_x = VersionMeta(name='indev_2_2_x', family='2.2', variant='indev', version='github:apache/cassandra-2.2', min_proto_v=1, max_proto_v=4, java_versions=(7, 8))
+current_2_2_x = VersionMeta(name='current_2_2_x', family='2.2', variant='current', version='2.2.13', min_proto_v=1, max_proto_v=4, java_versions=(7, 8))
 
-indev_3_0_x = VersionMeta(name='indev_3_0_x', family='3.0.x', variant='indev', version='github:apache/cassandra-3.0', min_proto_v=3, max_proto_v=4, java_versions=(8,))
-current_3_0_x = VersionMeta(name='current_3_0_x', family='3.0.x', variant='current', version='3.0.12', min_proto_v=3, max_proto_v=4, java_versions=(8,))
+indev_3_0_x = VersionMeta(name='indev_3_0_x', family='3.0', variant='indev', version='github:apache/cassandra-3.0', min_proto_v=3, max_proto_v=4, java_versions=(8,))
+current_3_0_x = VersionMeta(name='current_3_0_x', family='3.0', variant='current', version='3.0.17', min_proto_v=3, max_proto_v=4, java_versions=(8,))
 
-indev_3_x = VersionMeta(name='indev_3_x', family='3.x', variant='indev', version='github:apache/cassandra-3.11', min_proto_v=3, max_proto_v=4, java_versions=(8,))
-current_3_x = VersionMeta(name='current_3_x', family='3.x', variant='current', version='3.10', min_proto_v=3, max_proto_v=4, java_versions=(8,))
+indev_3_11_x = VersionMeta(name='indev_3_11_x', family='3.11', variant='indev', version='github:apache/cassandra-3.11', min_proto_v=3, max_proto_v=4, java_versions=(8,))
+current_3_11_x = VersionMeta(name='current_3_11_x', family='3.11', variant='current', version='github:apache/cassandra-3.11', min_proto_v=3, max_proto_v=4, java_versions=(8,))
 
-indev_trunk = VersionMeta(name='indev_trunk', family='trunk', variant='indev', version='github:apache/trunk', min_proto_v=3, max_proto_v=4, java_versions=(8,))
+indev_trunk = VersionMeta(name='indev_trunk', family='trunk', variant='indev', version='github:apache/trunk', min_proto_v=4, max_proto_v=5, java_versions=(8,))
 
 
 # MANIFEST maps a VersionMeta representing a line/variant to a list of other VersionMeta's representing supported upgrades
@@ -96,20 +143,17 @@ indev_trunk = VersionMeta(name='indev_trunk', family='trunk', variant='indev', v
 #   3) Nodes upgraded to version B can read data stored by the predecessor version A, and from a data standpoint will function the same as if they always ran version B.
 #   4) If a new sstable format is present in version B, writes will occur in that format after upgrade. Running sstableupgrade on version B will proactively convert version A sstables to version B.
 MANIFEST = {
-    indev_2_0_x: [indev_2_1_x, current_2_1_x],
-    current_2_0_x: [indev_2_0_x, indev_2_1_x, current_2_1_x],
+    indev_2_1_x: [indev_2_2_x, current_2_2_x, indev_3_0_x, current_3_0_x, indev_3_11_x, current_3_11_x],
+    current_2_1_x: [indev_2_1_x, indev_2_2_x, current_2_2_x, indev_3_0_x, current_3_0_x, indev_3_11_x, current_3_11_x],
 
-    indev_2_1_x: [indev_2_2_x, current_2_2_x, indev_3_0_x, current_3_0_x, indev_3_x, current_3_x],
-    current_2_1_x: [indev_2_1_x, indev_2_2_x, current_2_2_x, indev_3_0_x, current_3_0_x, indev_3_x, current_3_x],
+    indev_2_2_x: [indev_3_0_x, current_3_0_x, indev_3_11_x, current_3_11_x],
+    current_2_2_x: [indev_2_2_x, indev_3_0_x, current_3_0_x, indev_3_11_x, current_3_11_x],
 
-    indev_2_2_x: [indev_3_0_x, current_3_0_x, indev_3_x, current_3_x],
-    current_2_2_x: [indev_2_2_x, indev_3_0_x, current_3_0_x, indev_3_x, current_3_x],
+    indev_3_0_x: [indev_3_11_x, current_3_11_x],
+    current_3_0_x: [indev_3_0_x, indev_3_11_x, current_3_11_x, indev_trunk],
 
-    indev_3_0_x: [indev_3_x, current_3_x],
-    current_3_0_x: [indev_3_0_x, indev_3_x, current_3_x, indev_trunk],
-
-    current_3_x: [indev_3_x, indev_trunk],
-    indev_3_x: [indev_trunk]
+    current_3_11_x: [indev_3_11_x, indev_trunk],
+    indev_3_11_x: [indev_trunk]
 }
 
 # Local env and custom path testing instructions. Use these steps to REPLACE the normal upgrade test cases with your own.
@@ -137,23 +181,6 @@ def _have_common_proto(origin_meta, destination_meta):
     """
     return origin_meta.max_proto_v >= destination_meta.min_proto_v
 
-
-def _is_targeted_variant_combo(origin_meta, destination_meta):
-    """
-    Takes two VersionMeta objects, in order of test from start version to next version.
-    Returns a boolean indicating if this is a test pair we care about.
-
-    for now we only test upgrades of these types:
-      current -> in-dev (aka: released -> branch)
-    """
-    # if we're overriding the test manifest, we don't want to filter anything out
-    if OVERRIDE_MANIFEST:
-        return True
-
-    # is this an upgrade variant combination we care about?
-    return (origin_meta.variant == 'current' and destination_meta.variant == 'indev')
-
-
 def build_upgrade_pairs():
     """
     Using the manifest (above), builds a set of valid upgrades, according to current testing practices.
@@ -163,14 +190,16 @@ def build_upgrade_pairs():
     valid_upgrade_pairs = []
     manifest = OVERRIDE_MANIFEST or MANIFEST
 
+    configured_strategy = CONFIG.getoption("--upgrade-version-selection").upper()
+    version_select_strategy = VersionSelectionStrategies[configured_strategy].value[0]
+
     for origin_meta, destination_metas in list(manifest.items()):
         for destination_meta in destination_metas:
-            if not (origin_meta and destination_meta):  # None means we don't care about that version, which means we don't care about iterations involving it either
-                logger.debug("skipping class creation as a version is undefined (this is normal), versions: {} and {}".format(origin_meta, destination_meta))
+            if not version_select_strategy(origin_meta, destination_meta):
                 continue
 
-            if not _is_targeted_variant_combo(origin_meta, destination_meta):
-                logger.debug("skipping class creation, no testing of '{}' to '{}' (for {} upgrade to {})".format(origin_meta.variant, destination_meta.variant, origin_meta.name, destination_meta.name))
+            if not (origin_meta and destination_meta):  # None means we don't care about that version, which means we don't care about iterations involving it either
+                logger.debug("skipping class creation as a version is undefined (this is normal), versions: {} and {}".format(origin_meta, destination_meta))
                 continue
 
             if not _have_common_proto(origin_meta, destination_meta):
diff --git a/upgrade_tests/upgrade_schema_agreement_test.py b/upgrade_tests/upgrade_schema_agreement_test.py
index 13ff744..7b07a81 100644
--- a/upgrade_tests/upgrade_schema_agreement_test.py
+++ b/upgrade_tests/upgrade_schema_agreement_test.py
@@ -52,6 +52,7 @@ class TestSchemaAgreementUpgrade(Tester):
 
         # Forcing cluster version on purpose
         cluster.set_install_dir(version=version)
+        self.fixture_dtest_setup.reinitialize_cluster_for_different_version()
         cluster.populate(num_nodes).start()
 
         return cluster
@@ -145,6 +146,7 @@ class TestSchemaAgreementUpgrade(Tester):
             logger.debug("")
             logger.debug("Upgrading cluster to {}".format(version))
             cluster.set_install_dir(version=version)
+            self.fixture_dtest_setup.reinitialize_cluster_for_different_version()
 
             for node in nodes:
                 other_nodes = [n for n in nodes if n != node]
diff --git a/upgrade_tests/upgrade_supercolumns_test.py b/upgrade_tests/upgrade_supercolumns_test.py
index e216f5c..40ac11c 100644
--- a/upgrade_tests/upgrade_supercolumns_test.py
+++ b/upgrade_tests/upgrade_supercolumns_test.py
@@ -24,7 +24,7 @@ logger = logging.getLogger(__name__)
 # The data contained in the SSTables is (name, {'attr': {'name': name}}) for the name in NAMES.
 SCHEMA_PATH = os.path.join("./", "upgrade_tests", "supercolumn-data", "cassandra-2.0", "schema-2.0.cql")
 TABLES_PATH = os.path.join("./", "upgrade_tests", "supercolumn-data", "cassandra-2.0", "supcols", "cols")
-NAMES = ["Alice", "Bob", "Claire", "Dave", "Ed", "Frank", "Grace"]
+NAMES = [name.encode() for name in ["Alice", "Bob", "Claire", "Dave", "Ed", "Frank", "Grace"]]
 
 
 @pytest.mark.upgrade_test
@@ -55,6 +55,7 @@ class TestSCUpgrade(Tester):
 
         # Forcing cluster version on purpose
         cluster.set_install_dir(version=cassandra_version)
+        self.fixture_dtest_setup.reinitialize_cluster_for_different_version()
         if "memtable_allocation_type" in cluster._config_options:
             del cluster._config_options['memtable_allocation_type']
         cluster.populate(num_nodes).start()
@@ -71,7 +72,7 @@ class TestSCUpgrade(Tester):
         client = get_thrift_client(host, port)
         client.transport.open()
         client.set_keyspace('supcols')
-        p = SlicePredicate(slice_range=SliceRange('', '', False, 1000))
+        p = SlicePredicate(slice_range=SliceRange(''.encode(), ''.encode(), False, 1000))
         for name in NAMES:
             super_col_value = client.get_slice(name, ColumnParent("cols"), p, ConsistencyLevel.ONE)
             logger.debug("get_slice(%s) returned %s" % (name, super_col_value))
@@ -79,7 +80,7 @@ class TestSCUpgrade(Tester):
 
     def verify_with_cql(self, session):
         session.execute("USE supcols")
-        expected = [[name, 'attr', 'name', name] for name in ['Grace', 'Claire', 'Dave', 'Frank', 'Ed', 'Bob', 'Alice']]
+        expected = [[name.encode(), 'attr'.encode(), 'name', name.encode()] for name in ['Grace', 'Claire', 'Dave', 'Frank', 'Ed', 'Bob', 'Alice']]
         assert_all(session, "SELECT * FROM cols", expected)
 
     def _upgrade_super_columns_through_versions_test(self, upgrade_path):
@@ -123,15 +124,17 @@ class TestSCUpgrade(Tester):
         self.verify_with_thrift()
 
         for version in upgrade_path:
+            if version == 'git:cassandra-4.0' or version == 'git:trunk':
+                session.execute("ALTER TABLE supcols.cols DROP COMPACT STORAGE")
             self.upgrade_to_version(version)
 
-            if self.cluster.version() < '4':
-                node1.nodetool("enablethrift")
-
             session = self.patient_exclusive_cql_connection(node1)
 
             self.verify_with_cql(session)
-            self.verify_with_thrift()
+
+            if self.cluster.version() < '4':
+                node1.nodetool("enablethrift")
+                self.verify_with_thrift()
 
         cluster.remove(node=node1)
 
@@ -156,11 +159,13 @@ class TestSCUpgrade(Tester):
         # Update Cassandra Directory
         for node in nodes:
             node.set_install_dir(version=tag)
+            logger.debug("Set new cassandra dir for %s: %s" % (node.name, node.get_install_dir()))
+        self.cluster.set_install_dir(version=tag)
+        self.fixture_dtest_setup.reinitialize_cluster_for_different_version()
+        for node in nodes:
             if tag < "2.1":
                 if "memtable_allocation_type" in node.config_options:
                     node.config_options.__delitem__("memtable_allocation_type")
-            logger.debug("Set new cassandra dir for %s: %s" % (node.name, node.get_install_dir()))
-        self.cluster.set_install_dir(version=tag)
 
         # Restart nodes on new version
         for node in nodes:
diff --git a/upgrade_tests/upgrade_through_versions_test.py b/upgrade_tests/upgrade_through_versions_test.py
index 397ea15..41bb7ec 100644
--- a/upgrade_tests/upgrade_through_versions_test.py
+++ b/upgrade_tests/upgrade_through_versions_test.py
@@ -19,9 +19,10 @@ from cassandra.query import SimpleStatement
 from dtest import RUN_STATIC_UPGRADE_MATRIX, Tester
 from tools.misc import generate_ssl_stores, new_node
 from .upgrade_base import switch_jdks
-from .upgrade_manifest import (build_upgrade_pairs, current_2_0_x,
-                              current_2_1_x, current_2_2_x, current_3_0_x,
-                              indev_2_2_x, indev_3_x)
+from .upgrade_manifest import (build_upgrade_pairs,
+                               current_2_1_x, current_2_2_x, current_3_0_x,
+                               indev_3_11_x,
+                               current_3_11_x, indev_trunk)
 
 logger = logging.getLogger(__name__)
 
@@ -67,7 +68,7 @@ def data_writer(tester, to_verify_queue, verification_done_queue, rewrite_probab
 
             session.execute(prepared, (val, key))
 
-            to_verify_queue.put_nowait((key, val,))
+            to_verify_queue.put((key, val,))
         except Exception:
             logger.debug("Error in data writer process!")
             to_verify_queue.close()
@@ -122,7 +123,7 @@ def data_checker(tester, to_verify_queue, verification_done_queue):
                 # rewrite rows in the same sequence as originally written
                 pass
 
-        tester.assertEqual(expected_val, actual_val, "Data did not match expected value!")
+        assert expected_val == actual_val, "Data did not match expected value!"
 
 
 def counter_incrementer(tester, to_verify_queue, verification_done_queue, rewrite_probability=0):
@@ -225,6 +226,7 @@ def counter_checker(tester, to_verify_queue, verification_done_queue):
 
 @pytest.mark.upgrade_test
 @pytest.mark.resource_intensive
+@pytest.mark.skip("Fake skip so that this isn't run outside of a generated class that removes this annotation")
 class TestUpgrade(Tester):
     """
     Upgrades a 3-node Murmur3Partitioner cluster through versions specified in test_version_metas.
@@ -247,13 +249,13 @@ class TestUpgrade(Tester):
             r'Unknown column cdc during deserialization',
         )
 
-    def setUp(self):
+    def prepare(self):
         logger.debug("Upgrade test beginning, setting CASSANDRA_VERSION to {}, and jdk to {}. (Prior values will be restored after test)."
               .format(self.test_version_metas[0].version, self.test_version_metas[0].java_version))
-        os.environ['CASSANDRA_VERSION'] = self.test_version_metas[0].version
+        cluster = self.cluster
+        cluster.set_install_dir(version=self.test_version_metas[0].version)
         switch_jdks(self.test_version_metas[0].java_version)
-
-        super(TestUpgrade, self).setUp()
+        self.fixture_dtest_setup.reinitialize_cluster_for_different_version()
         logger.debug("Versions to test (%s): %s" % (type(self), str([v.version for v in self.test_version_metas])))
 
     def init_config(self):
@@ -273,6 +275,7 @@ class TestUpgrade(Tester):
         """
         self.upgrade_scenario()
 
+    @pytest.mark.timeout(3000)
     def test_rolling_upgrade(self):
         """
         Test rolling upgrade of the cluster, so we have mixed versions part way through.
@@ -285,6 +288,7 @@ class TestUpgrade(Tester):
         """
         self.upgrade_scenario(internode_ssl=True)
 
+    @pytest.mark.timeout(3000)
     def test_rolling_upgrade_with_internode_ssl(self):
         """
         Rolling upgrade test using internode ssl.
@@ -293,6 +297,8 @@ class TestUpgrade(Tester):
 
     def upgrade_scenario(self, populate=True, create_schema=True, rolling=False, after_upgrade_call=(), internode_ssl=False):
         # Record the rows we write as we go:
+        if populate:
+            self.prepare()
         self.row_values = set()
         cluster = self.cluster
         if cluster.version() >= '3.0':
@@ -350,6 +356,7 @@ class TestUpgrade(Tester):
                           (num + 1, len(self.cluster.nodelist()), version_meta.version))
 
                 self.cluster.set_install_dir(version=version_meta.version)
+                self.fixture_dtest_setup.reinitialize_cluster_for_different_version()
 
             # Stop write processes
             write_proc.terminate()
@@ -367,6 +374,7 @@ class TestUpgrade(Tester):
 
                 self.upgrade_to_version(version_meta, internode_ssl=internode_ssl)
                 self.cluster.set_install_dir(version=version_meta.version)
+                self.fixture_dtest_setup.reinitialize_cluster_for_different_version()
 
                 self._check_values()
                 self._check_counters()
@@ -432,7 +440,7 @@ class TestUpgrade(Tester):
         for node in nodes:
             node.set_install_dir(version=version_meta.version)
             logger.debug("Set new cassandra dir for %s: %s" % (node.name, node.get_install_dir()))
-            if internode_ssl and version_meta.version >= '4.0':
+            if internode_ssl and (version_meta.family == 'trunk' or version_meta.family >= '4.0'):
                 node.set_configuration_options({'server_encryption_options': {'enabled': True, 'enable_legacy_ssl_storage_port': True}})
 
         # hacky? yes. We could probably extend ccm to allow this publicly.
@@ -553,7 +561,7 @@ class TestUpgrade(Tester):
         Returns the writer process, verifier process, and the to_verify_queue.
         """
         # queue of writes to be verified
-        to_verify_queue = Queue()
+        to_verify_queue = Queue(10000)
         # queue of verified writes, which are update candidates
         verification_done_queue = Queue(maxsize=500)
 
@@ -629,7 +637,7 @@ class TestUpgrade(Tester):
             if fail_count > 100:
                 break
 
-        assert fail_count, 100 < "Too many counter increment failures"
+        assert fail_count < 100, "Too many counter increment failures"
 
     def _check_counters(self):
         logger.debug("Checking counter values...")
@@ -668,7 +676,6 @@ class TestUpgrade(Tester):
         else:
             self.fail("Count query did not return")
 
-
 class BootstrapMixin(object):
     """
     Can be mixed into UpgradeTester or a subclass thereof to add bootstrap tests.
@@ -705,6 +712,7 @@ class BootstrapMixin(object):
     def test_bootstrap_multidc(self):
         # try and add a new node
         # multi dc, 2 nodes in each dc
+        self.prepare()
         cluster = self.cluster
 
         if cluster.version() >= '3.0':
@@ -773,13 +781,16 @@ def create_upgrade_class(clsname, version_metas, protocol_version,
     print("  to run these tests alone, use `nosetests {}.py:{}`".format(__name__, clsname))
 
     upgrade_applies_to_env = RUN_STATIC_UPGRADE_MATRIX or version_metas[-1].matches_current_env_version_family
-    if not upgrade_applies_to_env:
-        pytest.mark.skip(reason='test not applicable to env.')
     newcls = type(
             clsname,
             parent_classes,
             {'test_version_metas': version_metas, '__test__': True, 'protocol_version': protocol_version, 'extra_config': extra_config}
         )
+    # Remove the skip annotation in the superclass we just derived from, we will add it back if we actually intend
+    # to skip with a better message
+    newcls.pytestmark = [mark for mark in newcls.pytestmark if not mark.name == "skip"]
+    if not upgrade_applies_to_env:
+        newcls.pytestmark.append(pytest.mark.skip("test not applicable to env"))
 
     if clsname in globals():
         raise RuntimeError("Class by name already exists!")
@@ -791,41 +802,33 @@ def create_upgrade_class(clsname, version_metas, protocol_version,
 MultiUpgrade = namedtuple('MultiUpgrade', ('name', 'version_metas', 'protocol_version', 'extra_config'))
 
 MULTI_UPGRADES = (
-    # Proto v1 upgrades (v1 supported on 2.0, 2.1, 2.2)
-    MultiUpgrade(name='ProtoV1Upgrade_AllVersions_EndsAt_indev_2_2_x',
-                 version_metas=[current_2_0_x, current_2_1_x, indev_2_2_x], protocol_version=1, extra_config=None),
-    MultiUpgrade(name='ProtoV1Upgrade_AllVersions_RandomPartitioner_EndsAt_indev_2_2_x',
-                 version_metas=[current_2_0_x, current_2_1_x, indev_2_2_x], protocol_version=1,
-                 extra_config=(
-                     ('partitioner', 'org.apache.cassandra.dht.RandomPartitioner'),
-                 )),
-
-    # Proto v2 upgrades (v2 is supported on 2.0, 2.1, 2.2)
-    MultiUpgrade(name='ProtoV2Upgrade_AllVersions_EndsAt_indev_2_2_x',
-                 version_metas=[current_2_0_x, current_2_1_x, indev_2_2_x], protocol_version=2, extra_config=None),
-    MultiUpgrade(name='ProtoV2Upgrade_AllVersions_RandomPartitioner_EndsAt_indev_2_2_x',
-                 version_metas=[current_2_0_x, current_2_1_x, indev_2_2_x], protocol_version=2,
-                 extra_config=(
-                     ('partitioner', 'org.apache.cassandra.dht.RandomPartitioner'),
-                 )),
-
-    # Proto v3 upgrades (v3 is supported on 2.1, 2.2, 3.0, 3.1, trunk)
-    MultiUpgrade(name='ProtoV3Upgrade_AllVersions_EndsAt_Trunk_HEAD',
-                 version_metas=[current_2_1_x, current_2_2_x, current_3_0_x, indev_3_x], protocol_version=3, extra_config=None),
-    MultiUpgrade(name='ProtoV3Upgrade_AllVersions_RandomPartitioner_EndsAt_Trunk_HEAD',
-                 version_metas=[current_2_1_x, current_2_2_x, current_3_0_x, indev_3_x], protocol_version=3,
+    # Proto v3 upgrades (v3 is supported on 2.1, 2.2, 3.0, 3.11)
+    MultiUpgrade(name='TestProtoV3Upgrade_AllVersions_EndsAt_3_11_X',
+                 version_metas=[current_2_1_x, current_2_2_x, current_3_0_x, indev_3_11_x], protocol_version=3, extra_config=None),
+    MultiUpgrade(name='TestProtoV3Upgrade_AllVersions_RandomPartitioner_EndsAt_3_11_X_HEAD',
+                 version_metas=[current_2_1_x, current_2_2_x, current_3_0_x, indev_3_11_x], protocol_version=3,
                  extra_config=(
                      ('partitioner', 'org.apache.cassandra.dht.RandomPartitioner'),
                  )),
 
     # Proto v4 upgrades (v4 is supported on 2.2, 3.0, 3.1, trunk)
-    MultiUpgrade(name='ProtoV4Upgrade_AllVersions_EndsAt_Trunk_HEAD',
-                 version_metas=[current_2_2_x, current_3_0_x, indev_3_x], protocol_version=4, extra_config=None),
-    MultiUpgrade(name='ProtoV4Upgrade_AllVersions_RandomPartitioner_EndsAt_Trunk_HEAD',
-                 version_metas=[current_2_2_x, current_3_0_x, indev_3_x], protocol_version=4,
+    MultiUpgrade(name='TestProtoV4Upgrade_AllVersions_EndsAt_Trunk_HEAD',
+                 version_metas=[current_2_2_x, current_3_0_x, current_3_11_x, indev_trunk], protocol_version=4, extra_config=None),
+    MultiUpgrade(name='TestProtoV4Upgrade_AllVersions_RandomPartitioner_EndsAt_Trunk_HEAD',
+                 version_metas=[current_2_2_x, current_3_0_x, current_3_11_x, indev_trunk], protocol_version=4,
                  extra_config=(
                      ('partitioner', 'org.apache.cassandra.dht.RandomPartitioner'),
                  )),
+    #Beta versions don't work with this test since it doesn't specify use beta in the client
+    #It's fine I guess for now? Can update on release
+    # Proto v5 upgrades (v5 is supported on 3.0, 3.11, trunk)
+    # MultiUpgrade(name='TestProtoV5Upgrade_AllVersions_EndsAt_Trunk_HEAD',
+    #              version_metas=[current_3_0_x, current_3_x, indev_trunk], protocol_version=5, extra_config=None),
+    # MultiUpgrade(name='TestProtoV5Upgrade_AllVersions_RandomPartitioner_EndsAt_Trunk_HEAD',
+    #              version_metas=[current_3_0_x, current_3_x, indev_trunk], protocol_version=5,
+    #              extra_config=(
+    #                  ('partitioner', 'org.apache.cassandra.dht.RandomPartitioner'),
+    #              )),
 )
 
 for upgrade in MULTI_UPGRADES:


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org