You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2020/08/12 14:14:12 UTC

[cassandra-dtest] branch master updated: Add test for the upgrade of KEYS 2i to 4.0 (CASSANDRA-15906)

This is an automated email from the ASF dual-hosted git repository.

slebresne pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/cassandra-dtest.git


The following commit(s) were added to refs/heads/master by this push:
     new f25832a  Add test for the upgrade of KEYS 2i to 4.0 (CASSANDRA-15906)
f25832a is described below

commit f25832aef72d62e9f00a45c54b2cdc17dc9f2d7b
Author: Sylvain Lebresne <le...@gmail.com>
AuthorDate: Thu Jun 25 17:52:14 2020 +0200

    Add test for the upgrade of KEYS 2i to 4.0 (CASSANDRA-15906)
---
 upgrade_tests/thrift_upgrade_test.py | 195 ++++++++++++++++++++++++++++++++++-
 1 file changed, 190 insertions(+), 5 deletions(-)

diff --git a/upgrade_tests/thrift_upgrade_test.py b/upgrade_tests/thrift_upgrade_test.py
index b427660..8846625 100644
--- a/upgrade_tests/thrift_upgrade_test.py
+++ b/upgrade_tests/thrift_upgrade_test.py
@@ -2,15 +2,17 @@ import itertools
 import pytest
 import logging
 
-from cassandra.query import dict_factory
+from cassandra.query import dict_factory, SimpleStatement
 
 from dtest import RUN_STATIC_UPGRADE_MATRIX, Tester
 from thrift_bindings.thrift010 import Cassandra
 from thrift_bindings.thrift010.Cassandra import (Column, ColumnDef,
-                                           ColumnParent, ConsistencyLevel,
-                                           SlicePredicate, SliceRange)
+                                                 ColumnParent, ConsistencyLevel,
+                                                 IndexType,
+                                                 SlicePredicate, SliceRange)
 from thrift_test import _i64, get_thrift_client
-from tools.assertions import assert_length_equal, assert_lists_of_dicts_equal
+from tools.assertions import (assert_all, assert_length_equal,
+                              assert_lists_of_dicts_equal)
 from tools.misc import wait_for_agreement, add_skip
 from .upgrade_base import UpgradeTester
 from .upgrade_manifest import build_upgrade_pairs
@@ -187,7 +189,6 @@ def _validate_dense_thrift(client, cf='dense_super_1'):
         assert cosc.super_column.columns[0].name == _i64(100)
         assert cosc.super_column.columns[0].value == 'value1'.encode()
 
-
 @pytest.mark.upgrade_test
 class TestUpgradeSuperColumnsThrough(Tester):
     def upgrade_to_version(self, tag, nodes=None):
@@ -373,6 +374,190 @@ class TestUpgradeSuperColumnsThrough(Tester):
 
 
 @pytest.mark.upgrade_test
+@since('4')
+class TestUpgradeTo40(Tester):
+    """
+    Thrift is dead in 4.0. However, we still want to ensure users that used thrift
+     in 3.0 or earlier have an upgrade path to 4.0 and this class provides tests
+     cases for this.
+
+     Note that we don't want to run this if the "current" version (the one we're
+     upgrading to) is not 4.0 or more, as the tests makes assumptions on that.
+    """
+    def prepare(self, start_version, num_nodes=1, rf=1):
+        """
+        Prepare the test, starting a cluster on the initial version, creating
+        a keyspace (named 'ks') and returning a CQL and a thrift connection to
+        the first node (and set on the created keyspace).
+
+        :param start_version: the version to set the node at initially.
+        :param num_nodes: the number of nodes to use.
+        :param rf: replication factor for the keyspace created.
+        :return: a pair (cql, thrift) of a CQL connection and an open thrift
+            connection to the first node in the cluster.
+        """
+        self.cluster.set_install_dir(version=start_version)
+        self.fixture_dtest_setup.reinitialize_cluster_for_different_version()
+
+        self.cluster.populate(num_nodes)
+        for node in self.cluster.nodelist():
+            node.set_configuration_options(values={'start_rpc': 'true'})
+
+        self.cluster.start()
+        logger.debug("Started node on %s", start_version)
+
+        node = self.cluster.nodelist()[0]
+        cql = self.patient_cql_connection(node)
+
+        cql.execute("CREATE KEYSPACE ks WITH replication = {{ 'class': 'SimpleStrategy', 'replication_factor': '{}' }}".format(rf))
+        cql.execute("USE ks")
+
+        host, port = node.network_interfaces['thrift']
+        thrift = get_thrift_client(host, port)
+        thrift.transport.open()
+        thrift.set_keyspace('ks')
+        return cql, thrift
+
+    def _connect(self, node):
+        connection = self.patient_cql_connection(node)
+        connection.execute("USE ks")
+        return connection
+
+    def _do_rolling_upgrade(self, after_each_upgrade=None):
+        """
+        Upgrade all the nodes in the cluster to the "current" version (so 4.0+)
+        in a rolling fashion.
+
+        :param after_each_upgrade: if not None, a function that is called with 2
+          arguments (in that order):
+          - the index of the node we just upgraded.
+          - a CQL connection to that node we just upgraded
+        :return: a CQL connection to the first node in the cluster (now upgraded).
+        """
+        for idx, node in enumerate(self.cluster.nodelist(), start=1):
+            self.set_node_to_current_version(node)
+            upgraded_version = node.get_cassandra_version()
+            logger.debug("Upgrading node %i (%s) to %s", idx, node.address(), upgraded_version)
+            node.stop()
+            node.start()
+            if after_each_upgrade:
+                after_each_upgrade(idx, self._connect(node))
+
+        return self._connect(self.cluster.nodelist()[0])
+
+    def test_keys_index_3_0_created(self):
+        self.test_keys_index_3_x_created('github:apache/cassandra-3.0')
+
+    def test_keys_index_3_11_created(self):
+        self.test_keys_index_3_x_created('github:apache/cassandra-3.11')
+
+    def test_keys_index_3_x_created(self, from_version):
+        cql, thrift = self.prepare(start_version=from_version,
+                                   num_nodes=3,
+                                   rf=3)
+
+        # Create a table with a KEYS index. This can only be done from thrift.
+        logger.debug("Creating table with index from thrift")
+
+        indexed_column = ColumnDef('c1'.encode(), 'UTF8Type', IndexType.KEYS, 'idx')
+        other_column = ColumnDef('c2'.encode(), 'UTF8Type', None, None)
+        table_def = Cassandra.CfDef(
+            'ks',
+            'ti',
+            key_validation_class='UTF8Type',
+            comparator_type='UTF8Type',
+            default_validation_class='UTF8Type',
+            column_metadata=[indexed_column, other_column],
+        )
+        thrift.system_add_column_family(table_def)
+        logger.debug("Waiting for schema agreement")
+        wait_for_agreement(thrift)
+
+        # We're going to insert and delete some rows, and need to validate the
+        # indexed entries are what we expect. To make this easier, we define
+        # _insert and _delete methods that not only insert/delete the provided
+        # rows, but also keep track of all the entries whose 'c1 == v1' in
+        # `expected_entries`, as that is the index value we'll use for
+        # validation.
+
+        expected_entries = []
+
+        def _insert(connection, r):
+            logger.debug("Inserting %s", r)
+            q = "INSERT INTO ti(key, c1, c2) VALUES ('{}', '{}', '{}')".format(r[0], r[1], r[2])
+            connection.execute(SimpleStatement(q, consistency_level=ConsistencyLevel.QUORUM))
+            if r[1] == 'v1':
+                expected_entries.append(r)
+
+        def _delete(connection, r):
+            logger.debug("Deleting %s", r)
+            q = "DELETE FROM ti WHERE key='{}'".format(r[0])
+            connection.execute(SimpleStatement(q, consistency_level=ConsistencyLevel.QUORUM))
+            if r[1] == 'v1':
+                expected_entries.remove(r)
+
+        def _validate_entries(connection):
+            logger.debug("Expecting entries %s", expected_entries)
+            assert_all(connection, "SELECT key, c2 FROM ti WHERE c1='v1'",
+                       [[key, c2] for [key, _, c2] in expected_entries],
+                       ignore_order=True, cl=ConsistencyLevel.QUORUM)
+
+        to_insert = [
+            ['k0', 'v1', 'goo'],
+            ['k1', 'v1', 'foo'],
+            ['k2', 'v2', 'bar'],
+            ['k3', 'v1', 'baz'],
+            ['k4', 'v3', 'oof'],
+            ['k5', 'v0', 'zab'],
+        ]
+        for row in to_insert:
+            _insert(cql, row)
+
+        # Sanity check that we can query the index properly
+        logger.debug("Checking index before upgrade")
+        _validate_entries(cql)
+
+        # Delete one entry, so we test upgrade with a tombstone in
+        _delete(cql, to_insert[1])
+        _validate_entries(cql)
+
+        # Before upgrading, we need to DROP COMPACT first, or this won't work.
+        cql.execute("ALTER TABLE ti DROP COMPACT STORAGE")
+
+        # Let's make sure our DROP COMPACT STORAGE didn't break our index even
+        # before upgrade.
+        _validate_entries(cql)
+
+        # At every step, we'll add a few entries and ensure we can query the
+        # index. Specifically, each node will add 4 keys, 2 indexed, 2 non
+        # indexed, query from all nodes, then remove one of the indexed entry
+        # and query again.
+        def _after_upgrade(idx, client):
+            logger.debug("Checking index after upgrade of node %i", idx)
+
+            added = []
+            for i in range(4):
+                key = 'k{}{}'.format(idx, i)
+                c1 = 'v1' if i % 2 == 0 else 'v2'
+                c2 = 'val{}{}'.format(idx, i)
+                to_add = [key, c1, c2]
+                _insert(client, to_add)
+                added.append(to_add)
+
+            # Test querying from every node, so we hit both upgraded and
+            # non-upgraded in general
+            for idx, node in enumerate(self.cluster.nodelist(), start=1):
+                _validate_entries(self._connect(node))
+
+            _delete(client, added[0])
+
+            for idx, node in enumerate(self.cluster.nodelist(), start=1):
+                _validate_entries(self._connect(node))
+
+        self._do_rolling_upgrade(_after_upgrade)
+
+
+@pytest.mark.upgrade_test
 @since('2.1', max_version='3.99')
 class TestThrift(UpgradeTester):
     """


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org