You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jm...@apache.org on 2018/02/11 19:45:39 UTC

cassandra-dtest git commit: CASSANDRA-12148 cdc hard-link support

Repository: cassandra-dtest
Updated Branches:
  refs/heads/master 781c4ecdf -> 19a17fa90


CASSANDRA-12148 cdc hard-link support


Project: http://git-wip-us.apache.org/repos/asf/cassandra-dtest/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra-dtest/commit/19a17fa9
Tree: http://git-wip-us.apache.org/repos/asf/cassandra-dtest/tree/19a17fa9
Diff: http://git-wip-us.apache.org/repos/asf/cassandra-dtest/diff/19a17fa9

Branch: refs/heads/master
Commit: 19a17fa90c1156ce1306e459249d67b759cb0f01
Parents: 781c4ec
Author: Josh McKenzie <jm...@apache.org>
Authored: Wed Aug 3 13:30:25 2016 -0400
Committer: Josh McKenzie <jo...@datastax.com>
Committed: Sun Feb 11 14:23:43 2018 -0500

----------------------------------------------------------------------
 cdc_test.py                | 288 ++++++++++++++++++++++++++++++++--------
 cqlsh_tests/cqlsh_tools.py |  30 +++++
 2 files changed, 259 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/19a17fa9/cdc_test.py
----------------------------------------------------------------------
diff --git a/cdc_test.py b/cdc_test.py
index 28b66d1..4cbbb81 100644
--- a/cdc_test.py
+++ b/cdc_test.py
@@ -1,29 +1,32 @@
+from __future__ import division
+
 import errno
 import os
+import re
 import shutil
 import time
 import uuid
 from collections import namedtuple
-import pytest
-import logging
-
 from itertools import repeat
+from pprint import pformat
 
+import pytest
 from cassandra import WriteFailure
 from cassandra.concurrent import (execute_concurrent,
                                   execute_concurrent_with_args)
 from ccmlib.node import Node
+from nose.tools import assert_equal, assert_less_equal, assert_not_equal, assert_less, assert_true, \
+    assert_greater_equal, assert_not_in, assert_is_not_none, assert_raises
 
-from dtest import Tester, create_ks
+from cqlsh_tests.cqlsh_tools import assert_resultset_contains
+from dtest import Tester, create_ks, logger
+from tools.assertions import assert_length_equal
 from tools.data import rows_to_list
 from tools.files import size_of_files_in_dir
 from tools.funcutils import get_rate_limited_function
 from tools.hacks import advance_to_next_cl_segment
-from tools.assertions import assert_lists_equal_ignoring_order
 
 since = pytest.mark.since
-logger = logging.getLogger(__name__)
-
 
 _16_uuid_column_spec = (
     'a uuid PRIMARY KEY, b uuid, c uuid, d uuid, e uuid, f uuid, g uuid, '
@@ -42,12 +45,12 @@ def _insert_rows(session, table_name, insert_stmt, values):
     logger.debug('{n} rows inserted into {table_name}'.format(n=len(data_loaded), table_name=table_name))
     # use assert_equal over assert_length_equal to avoid printing out
     # potentially large lists
-    assert len(values) == len(data_loaded)
+    assert_equal(len(values), len(data_loaded))
     return data_loaded
 
 
-def _move_contents(source_dir, dest_dir, verbose=True):
-    for source_filename in os.listdir(source_dir):
+def _move_commitlog_segments(source_dir, dest_dir, verbose=True):
+    for source_filename in [name for name in os.listdir(source_dir) if not name.endswith('_cdc.idx')]:
         source_path, dest_path = (os.path.join(source_dir, source_filename),
                                   os.path.join(dest_dir, source_filename))
         if verbose:
@@ -74,28 +77,30 @@ def _get_create_table_statement(ks_name, table_name, column_spec, options=None):
 
     return (
         'CREATE TABLE ' + ks_name + '.' + table_name + ' '
-        '(' + column_spec + ') ' + options_string
+                                                       '(' + column_spec + ') ' + options_string
     )
 
 
-def _write_to_cdc_WriteFailure(session, insert_stmt):
+def _write_to_cdc_write_failure(session, insert_stmt):
     prepared = session.prepare(insert_stmt)
     start, rows_loaded, error_found = time.time(), 0, False
-    rate_limited_debug_logger = get_rate_limited_function(logger.debug, 5)
+    rate_limited_debug = get_rate_limited_function(logger.debug, 5)
     while not error_found:
         # We want to fail if inserting data takes too long. Locally this
         # takes about 10s, but let's be generous.
-        assert (
-            (time.time() - start) <= 600), "It's taken more than 10 minutes to reach a WriteFailure trying " + \
-            "to overrun the space designated for CDC commitlogs. This could " + \
-            "be because data isn't being written quickly enough in this " + \
-            "environment, or because C* is failing to reject writes when " + \
-            "it should."
+        assert_less_equal(
+            (time.time() - start), 600,
+            "It's taken more than 10 minutes to reach a WriteFailure trying "
+            'to overrun the space designated for CDC commitlogs. This could '
+            "be because data isn't being written quickly enough in this "
+            'environment, or because C* is failing to reject writes when '
+            'it should.'
+        )
 
         # If we haven't logged from here in the last 5s, do so.
-        rate_limited_debug_logger(
-            "  data load step has lasted {s:.2f}s, " +
-            "loaded {r} rows".format(s=(time.time() - start), r=rows_loaded))
+        rate_limited_debug(
+            '  data load step has lasted {s:.2f}s, '
+            'loaded {r} rows'.format(s=(time.time() - start), r=rows_loaded))
 
         batch_results = list(execute_concurrent(
             session,
@@ -111,17 +116,18 @@ def _write_to_cdc_WriteFailure(session, insert_stmt):
         # number of successfully completed statements...
         rows_loaded += len([br for br in batch_results if br[0]])
         # then, we make sure that the only failures are the expected
-        # WriteFailures.
-        assert [] == [result for (success, result) in batch_results
-                       if not success and not isinstance(result, WriteFailure)]
+        # WriteFailure.
+        assert_equal([],
+                     [result for (success, result) in batch_results
+                      if not success and not isinstance(result, WriteFailure)])
         # Finally, if we find a WriteFailure, that means we've inserted all
         # the CDC data we can and so we flip error_found to exit the loop.
         if any(isinstance(result, WriteFailure) for (_, result) in batch_results):
             logger.debug("write failed (presumably because we've overrun "
-                  'designated CDC commitlog space) after '
-                  'loading {r} rows in {s:.2f}s'.format(
-                      r=rows_loaded,
-                      s=time.time() - start))
+                         'designated CDC commitlog space) after '
+                         'loading {r} rows in {s:.2f}s'.format(
+                             r=rows_loaded,
+                             s=time.time() - start))
             error_found = True
     return rows_loaded
 
@@ -172,12 +178,14 @@ def _get_set_cdc_func(session, ks_name, table_name):
     that takes enables CDC on that keyspace if its argument is truthy and
     otherwise disables it.
     """
+
     def set_cdc(value):
         return _set_cdc_on_table(
             session=session,
             ks_name=ks_name, table_name=table_name,
             value=value
         )
+
     return set_cdc
 
 
@@ -201,11 +209,20 @@ def _get_cdc_raw_files(node_path, cdc_raw_dir_name='cdc_raw'):
 class TestCDC(Tester):
     """
     @jira_ticket CASSANDRA-8844
+    @jira_ticket CASSANDRA-12148
 
     Test the correctness of some features of CDC, Change Data Capture, which
     provides a view of the commitlog on tables for which it is enabled.
     """
 
+    @pytest.fixture(autouse=True)
+    def fixture_add_additional_log_patterns(self, fixture_dtest_setup):
+        fixture_dtest_setup.allow_log_errors = True
+        fixture_dtest_setup.ignore_log_patterns = (
+            # We expect to see this error in the logs when we reach CDC limit
+            r'Failed to apply mutation locally'
+        )
+
     def _create_temp_dir(self, dir_name, verbose=True):
         """
         Create a directory that will be deleted when this test class is torn
@@ -259,8 +276,8 @@ class TestCDC(Tester):
         create_ks(session, ks_name, rf=1)
 
         if table_name is not None:
-            assert cdc_enabled_table is not None, 'if creating a table in prepare is not None, must specify whether or not CDC is enabled on it'
-            assert column_spec is not None, 'if creating a table in prepare is not None, must specify its schema'
+            assert_is_not_none(cdc_enabled_table, 'if creating a table in prepare, must specify whether or not CDC is enabled on it')
+            assert_is_not_none(column_spec, 'if creating a table in prepare, must specify its schema')
             options = {}
             if gc_grace_seconds is not None:
                 options['gc_grace_seconds'] = gc_grace_seconds
@@ -294,15 +311,20 @@ class TestCDC(Tester):
         set_cdc = _get_set_cdc_func(session=session, ks_name=ks_name, table_name=table_name)
 
         insert_stmt = session.prepare('INSERT INTO ' + table_name + ' (a, b) VALUES (?, ?)')
-        data = tuple(zip(list(range(1000)), list(range(1000))))
+        # data = zip(list(range(1000)), list(range(1000)))
+        start = 0
+        stop = 1000
+        step = 1
+        data = [(n, min(n+step, stop)) for n in range(start, stop, step)]
+
         execute_concurrent_with_args(session, insert_stmt, data)
 
         # We need data to be in commitlogs, not sstables.
-        assert [] == list(node.get_sstables(ks_name, table_name))
+        assert_equal([], list(node.get_sstables(ks_name, table_name)))
 
         for enable in alter_path:
             set_cdc(enable)
-            assert_lists_equal_ignoring_order(session.execute('SELECT * FROM ' + table_name), data)
+            assert_resultset_contains(session.execute('SELECT * FROM ' + table_name), data)
 
     def test_cdc_enabled_data_readable_on_round_trip(self):
         """
@@ -318,18 +340,15 @@ class TestCDC(Tester):
         """
         self._assert_cdc_data_readable_on_round_trip(start_with_cdc_enabled=False)
 
-    @pytest.mark.skip(reason="Test always fails so skipping until fixed. Tracked with CASSANDRA-14146")
     def test_insertion_and_commitlog_behavior_after_reaching_cdc_total_space(self):
         """
         Test that C* behaves correctly when CDC tables have consumed all the
         space available to them. In particular: after writing
         cdc_total_space_in_mb MB into CDC commitlogs:
-
         - CDC writes are rejected
         - non-CDC writes are accepted
         - on flush, CDC commitlogs are copied to cdc_raw
         - on flush, non-CDC commitlogs are not copied to cdc_raw
-
         This is a lot of behavior to validate in one test, but we do so to
         avoid running multiple tests that each write 1MB of data to fill
         cdc_total_space_in_mb.
@@ -346,11 +365,6 @@ class TestCDC(Tester):
             # Make CDC space as small as possible so we can fill it quickly.
             'cdc_total_space_in_mb': 4,
         }
-
-        self.fixture_dtest_setup.ignore_log_patterns = list(self.fixture_dtest_setup.ignore_log_patterns) + [
-            r'org.apache.cassandra.exceptions.CDCWriteException: Rejecting mutation to keyspace ks. '
-            r'Free up space in .* by processing CDC logs']
-
         node, session = self.prepare(
             ks_name=ks_name,
             configuration_overrides=configuration_overrides
@@ -383,10 +397,11 @@ class TestCDC(Tester):
         node.flush()
         # Then, we insert rows into the CDC table until we can't anymore.
         logger.debug('beginning data insert to fill CDC commitlogs')
-        rows_loaded = _write_to_cdc_WriteFailure(session, full_cdc_table_info.insert_stmt)
+        rows_loaded = _write_to_cdc_write_failure(session, full_cdc_table_info.insert_stmt)
 
-        assert 0 < rows_loaded, 'No CDC rows inserted. ' \
-                                'This may happen when cdc_total_space_in_mb > commitlog_segment_size_in_mb'
+        assert_less(0, rows_loaded,
+                    'No CDC rows inserted. This may happen when '
+                    'cdc_total_space_in_mb > commitlog_segment_size_in_mb')
 
         commitlog_dir = os.path.join(node.get_path(), 'commitlogs')
         commitlogs_size = size_of_files_in_dir(commitlog_dir)
@@ -394,10 +409,10 @@ class TestCDC(Tester):
 
         # We should get a WriteFailure when trying to write to the CDC table
         # that's filled the designated CDC space...
-        with pytest.raises(WriteFailure):
+        with assert_raises(WriteFailure):
             session.execute(full_cdc_table_info.insert_stmt)
         # or any CDC table.
-        with pytest.raises(WriteFailure):
+        with assert_raises(WriteFailure):
             session.execute(empty_cdc_table_info.insert_stmt)
 
         # Now we test for behaviors of non-CDC tables when we've exceeded
@@ -411,6 +426,15 @@ class TestCDC(Tester):
         node.start(wait_for_binary_proto=True)
         session = self.patient_cql_connection(node)
         pre_non_cdc_write_cdc_raw_segments = _get_cdc_raw_files(node.get_path())
+
+        # Snapshot the _cdc.idx file if > 4.0 for comparison at end
+        before_cdc_state = []  # init empty here to quiet PEP
+        if self.cluster.version() >= '4.0':
+            # Create ReplayData objects for each index file found in loading cluster
+            node1_path = os.path.join(node.get_path(), 'cdc_raw')
+            before_cdc_state = [ReplayData.load(node1_path, name)
+                                for name in os.listdir(node1_path) if name.endswith('_cdc.idx')]
+
         # save the names of all the commitlog segments written up to this
         # point:
         pre_non_cdc_write_segments = _get_commitlog_files(node.get_path())
@@ -427,15 +451,16 @@ class TestCDC(Tester):
         #
         # First, write to non-cdc tables.
         start, time_limit = time.time(), 600
-        rate_limited_debug_logger = get_rate_limited_function(logger.debug, 5)
+        rate_limited_debug = get_rate_limited_function(logger.debug, 5)
         logger.debug('writing to non-cdc table')
         # We write until we get a new commitlog segment.
         while _get_commitlog_files(node.get_path()) <= pre_non_cdc_write_segments:
             elapsed = time.time() - start
-            rate_limited_debug_logger('  non-cdc load step has lasted {s:.2f}s'.format(s=elapsed))
-            assert elapsed <= time_limit, \
-                "It's been over a {s}s and we haven't written a new commitlog segment. Something is wrong.".format(s=time_limit)
-
+            rate_limited_debug('  non-cdc load step has lasted {s:.2f}s'.format(s=elapsed))
+            assert_less_equal(elapsed, time_limit,
+                              "It's been over a {s}s and we haven't written a new "
+                              "commitlog segment. Something is wrong.".format(s=time_limit)
+                              )
             execute_concurrent(
                 session,
                 ((non_cdc_prepared_insert, ()) for _ in range(1000)),
@@ -446,7 +471,51 @@ class TestCDC(Tester):
         # Finally, we check that draining doesn't move any new segments to cdc_raw:
         node.drain()
         session.cluster.shutdown()
-        assert pre_non_cdc_write_cdc_raw_segments == _get_cdc_raw_files(node.get_path())
+
+        if self.cluster.version() < '4.0':
+            assert_equal(pre_non_cdc_write_cdc_raw_segments, _get_cdc_raw_files(node.get_path()))
+        else:
+            # Create ReplayData objects for each index file found in loading cluster
+            node2_path = os.path.join(node.get_path(), 'cdc_raw')
+            after_cdc_state = [ReplayData.load(node2_path, name)
+                               for name in os.listdir(node2_path) if name.endswith('_cdc.idx')]
+
+            # Confirm all indexes in 1st are accounted for and match corresponding entry in 2nd.
+            found = True
+            for idx in before_cdc_state:
+                idx_found = False
+                for idx_two in after_cdc_state:
+                    if compare_replay_data(idx, idx_two):
+                        idx_found = True
+                if not idx_found:
+                    found = False
+                    break
+            if not found:
+                self._fail_and_print_sets(before_cdc_state, after_cdc_state,
+                                          'Found CDC index in before not matched in after (non-CDC write test)')
+
+            # Now we confirm we don't have anything that showed up in 2nd not accounted for in 1st
+            orphan_found = False
+            for idx_two in after_cdc_state:
+                index_found = False
+                for idx in before_cdc_state:
+                    if compare_replay_data(idx_two, idx):
+                        index_found = True
+                if not index_found:
+                    orphan_found = True
+                    break
+            if orphan_found:
+                self._fail_and_print_sets(before_cdc_state, after_cdc_state,
+                                          'Found orphaned index file in after CDC state not in former.')
+
+    def _fail_and_print_sets(self, rd_one, rd_two, msg):
+        print('Set One:')
+        for idx in rd_one:
+            print('   {},{},{},{}'.format(idx.name, idx.completed, idx.offset, idx.log_name))
+        print('Set Two:')
+        for idx_two in rd_two:
+            print('   {},{},{},{}'.format(idx_two.name, idx_two.completed, idx_two.offset, idx_two.log_name))
+        self.fail(msg)
 
     def _init_new_loading_node(self, ks_name, create_stmt, use_thrift=False):
         loading_node = Node(
@@ -470,8 +539,8 @@ class TestCDC(Tester):
         logger.debug('creating new table')
         loading_session.execute(create_stmt)
         logger.debug('stopping new node')
-        loading_node.stop()
         loading_session.cluster.shutdown()
+        loading_node.stop()
         return loading_node
 
     def test_cdc_data_available_in_cdc_raw(self):
@@ -502,15 +571,23 @@ class TestCDC(Tester):
         generation_session.execute(cdc_table_info.create_stmt)
 
         # insert 10000 rows
-        inserted_rows = _insert_rows(generation_session, cdc_table_info.name, cdc_table_info.insert_stmt, repeat((), 10000))
+        inserted_rows = _insert_rows(generation_session, cdc_table_info.name, cdc_table_info.insert_stmt,
+                                     repeat((), 10000))
 
-        # drain the node to guarantee all cl segements will be recycled
+        # drain the node to guarantee all cl segments will be recycled
         logger.debug('draining')
         generation_node.drain()
         logger.debug('stopping')
         # stop the node and clean up all sessions attached to it
-        generation_node.stop()
         generation_session.cluster.shutdown()
+        generation_node.stop()
+
+        # We can rely on the existing _cdc.idx files to determine which .log files contain cdc data.
+        source_path = os.path.join(generation_node.get_path(), 'cdc_raw')
+        source_cdc_indexes = {ReplayData.load(source_path, name)
+                              for name in source_path if name.endswith('_cdc.idx')}
+        # assertNotEqual(source_cdc_indexes, {})
+        assert_not_equal(source_cdc_indexes, {})
 
         # create a new node to use for cdc_raw cl segment replay
         loading_node = self._init_new_loading_node(ks_name, cdc_table_info.create_stmt, self.cluster.version() < '4')
@@ -519,7 +596,7 @@ class TestCDC(Tester):
         # node again to trigger commitlog replay, which should replay the
         # cdc_raw files we moved to commitlogs into memtables.
         logger.debug('moving cdc_raw and restarting node')
-        _move_contents(
+        _move_commitlog_segments(
             os.path.join(generation_node.get_path(), 'cdc_raw'),
             os.path.join(loading_node.get_path(), 'commitlogs')
         )
@@ -536,7 +613,100 @@ class TestCDC(Tester):
         logger.debug('found {cdc} values in CDC table'.format(
             cdc=len(data_in_cdc_table_after_restart)
         ))
+
         # Then we assert that the CDC data that we expect to be there is there.
         # All data that was in CDC tables should have been copied to cdc_raw,
         # then used in commitlog replay, so it should be back in the cluster.
-        assert inserted_rows == data_in_cdc_table_after_restart, 'not all expected data selected'
+        assert_equal(inserted_rows,
+                     data_in_cdc_table_after_restart,
+                     # The message on failure is too long, since cdc_data is thousands
+                     # of items, so we print something else here
+                     msg='not all expected data selected')
+
+        if self.cluster.version() >= '4.0':
+            # Create ReplayData objects for each index file found in loading cluster
+            loading_path = os.path.join(loading_node.get_path(), 'cdc_raw')
+            dest_cdc_indexes = [ReplayData.load(loading_path, name)
+                                for name in os.listdir(loading_path) if name.endswith('_cdc.idx')]
+
+            # Compare source replay data to dest to ensure replay process created both hard links and index files.
+            for srd in source_cdc_indexes:
+                # Confirm both log and index are in dest
+                assert_true(os.path.isfile(os.path.join(loading_path, srd.idx_name)))
+                assert_true(os.path.isfile(os.path.join(loading_path, srd.log_name)))
+
+                # Find dest ReplayData that corresponds to the source (should be exactly 1)
+                corresponding_dest_replay_datae = [x for x in dest_cdc_indexes
+                                                   if srd.idx_name == x.idx_name]
+                assert_length_equal(corresponding_dest_replay_datae, 1)
+                drd = corresponding_dest_replay_datae[0]
+
+                # We can't compare equality on offsets since replay uses the raw file length as the written
+                # cdc offset. We *can*, however, confirm that the offset in the replayed file is >=
+                # the source file, ensuring clients are signaled to replay at least all the data in the
+                # log.
+                assert_greater_equal(drd.offset, srd.offset)
+
+                # Confirm completed flag is the same in both
+                assert_equal(srd.completed, drd.completed)
+
+            # Confirm that the relationship between index files on the source
+            # and destination looks like we expect.
+            # First, grab the mapping between the two, make sure it's a 1-1
+            # mapping, and transform the dict to reflect that:
+            src_to_dest_idx_map = {
+                src_rd: [dest_rd for dest_rd in dest_cdc_indexes
+                         if dest_rd.idx_name == src_rd.idx_name]
+                for src_rd in source_cdc_indexes
+            }
+            for src_rd, dest_rds in src_to_dest_idx_map.items():
+                assert_length_equal(dest_rds, 1)
+                src_to_dest_idx_map[src_rd] = dest_rds[0]
+            # All offsets in idx files that were copied should be >0 on the
+            # destination node.
+            assert_not_in(
+                0, {i.offset for i in src_to_dest_idx_map.values()},
+                ('Found index offsets == 0 in an index file on the '
+                 'destination node that corresponds to an index file on the '
+                 'source node:\n'
+                 '{}').format(pformat(src_to_dest_idx_map))
+            )
+            # Offsets of all shared indexes should be >= on the destination
+            # than on the source.
+            for src_rd, dest_rd in src_to_dest_idx_map.items():
+                assert_greater_equal(dest_rd.offset, src_rd.offset)
+
+            src_to_dest_idx_map = {
+                src_rd: [dest_rd for dest_rd in dest_cdc_indexes
+                         if dest_rd.idx_name == src_rd.idx_name]
+                for src_rd in source_cdc_indexes
+            }
+            for k, v in src_to_dest_idx_map.items():
+                assert_length_equal(v, 1)
+                assert_greater_equal(k.offset, v.offset)
+
+
+def compare_replay_data(rd_one, rd_two):
+    return rd_one.idx_name == rd_two.idx_name and \
+           rd_one.completed == rd_two.completed and \
+           rd_one.offset == rd_two.offset and \
+           rd_one.log_name == rd_two.log_name
+
+
+class ReplayData(namedtuple('ReplayData', ['idx_name', 'completed', 'offset', 'log_name'])):
+    """
+    Replay data class containing data from a _cdc.idx file. Build one with the load method.
+    """
+
+    @classmethod
+    def load(cls, path, name):
+        assert '_cdc' in name, 'expected to find _cdc in passed in index name. Did not: ' + name
+        with open(os.path.join(path, name), 'r') as f:
+            offset, completed = [line.strip() for line in f.readlines()]
+
+        return cls(
+            idx_name=name,
+            completed=completed,
+            offset=int(offset),
+            log_name=re.sub('_cdc.idx', '.log', name)
+        )

http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/19a17fa9/cqlsh_tests/cqlsh_tools.py
----------------------------------------------------------------------
diff --git a/cqlsh_tests/cqlsh_tools.py b/cqlsh_tests/cqlsh_tools.py
index d7aca97..762d360 100644
--- a/cqlsh_tests/cqlsh_tools.py
+++ b/cqlsh_tests/cqlsh_tools.py
@@ -3,6 +3,10 @@ import random
 
 import cassandra
 
+from cassandra.cluster import ResultSet
+from nose.tools import assert_true
+from typing import List
+
 
 class DummyColorMap(object):
 
@@ -89,3 +93,29 @@ def unmonkeypatch_driver(cache):
 
     if hasattr(cassandra, 'deserializers'):
         cassandra.deserializers.DesDateType = cache['DesDateType']
+
+
+def assert_resultset_contains(got: ResultSet, expected: List[tuple]) -> None:
+    """
+    So this is slow. I would hope a ResultSet has the capability of pulling data by PK or clustering,
+    however I'm not finding it atm. As such, this method isn't intended for use with large datasets.
+    :param got: ResultSet, expect schema of [a, b]
+    :param expected: list of tuples with 2 members corresponding with a/b schema of ResultSet
+    """
+    # Adding a touch of sanity check so people don't mis-use this. n^2 is bad.
+    assert len(expected) <= 1000, 'This is a slow comparison method. Don\'t use for > 1000 tuples.'
+
+    # First quick check: if we have a different count, we can just die.
+    assert_true(len(got.current_rows) == len(expected))
+
+    for t in expected:
+        assert len(t) == 2, 'Got unexpected tuple len. Expected 2, got tuple: {}'.format(t)
+        found = False
+        for row in got.current_rows:
+            if found:
+                break
+            if row.a == t[0] and row.b == t[1]:
+                found = True
+        assert_true(found, 'Failed to find expected row: {}'.format(t))
+
+


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org