You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sa...@apache.org on 2019/10/08 19:29:55 UTC
[cassandra] branch cassandra-3.0 updated: Allow max protocol
version to be capped
This is an automated email from the ASF dual-hosted git repository.
samt pushed a commit to branch cassandra-3.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cassandra-3.0 by this push:
new 0388d89 Allow max protocol version to be capped
0388d89 is described below
commit 0388d89e29393d0b1f50baa24848bc8cb0a7c9a3
Author: Sam Tunnicliffe <sa...@beobal.com>
AuthorDate: Tue Jul 9 12:51:16 2019 +0100
Allow max protocol version to be capped
Patch by Sam Tunnicliffe; reviewed by Alex Petrov and Aleksey
Yeschenko for CASSANDRA-15193
---
CHANGES.txt | 1 +
NEWS.txt | 5 +
bin/cqlsh.py | 47 +++---
pylib/cqlshlib/test/cassconnect.py | 8 +-
pylib/cqlshlib/test/test_cqlsh_completion.py | 2 +-
pylib/cqlshlib/test/test_cqlsh_output.py | 86 +++++------
src/java/org/apache/cassandra/config/Config.java | 2 +-
.../cassandra/config/DatabaseDescriptor.java | 20 +++
.../org/apache/cassandra/db/SystemKeyspace.java | 43 ++++++
.../apache/cassandra/service/CassandraDaemon.java | 10 ++
.../cassandra/service/NativeTransportService.java | 19 +++
.../apache/cassandra/service/StorageService.java | 21 ++-
.../cassandra/service/StorageServiceMBean.java | 3 +
.../cassandra/transport/ConfiguredLimit.java | 117 +++++++++++++++
src/java/org/apache/cassandra/transport/Frame.java | 10 +-
.../org/apache/cassandra/transport/Message.java | 9 +-
.../cassandra/transport/ProtocolVersionLimit.java | 27 ++++
.../org/apache/cassandra/transport/Server.java | 22 ++-
.../apache/cassandra/transport/SimpleClient.java | 4 +-
test/unit/org/apache/cassandra/cql3/CQLTester.java | 49 +++++-
.../cassandra/transport/DynamicLimitTest.java | 111 ++++++++++++++
.../cassandra/transport/ProtocolErrorTest.java | 8 +-
.../transport/ProtocolNegotiationTest.java | 166 +++++++++++++++++++++
.../cassandra/transport/ProtocolTestHelper.java | 95 ++++++++++++
24 files changed, 794 insertions(+), 91 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index ca6ea2e..925a90a 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.0.19
+ * Add ability to cap max negotiable protocol version (CASSANDRA-15193)
* Gossip tokens on startup if available (CASSANDRA-15335)
* Fix resource leak in CompressedSequentialWriter (CASSANDRA-15340)
* Fix merge which reverted CASSANDRA-14993 (CASSANDRA-15289)
diff --git a/NEWS.txt b/NEWS.txt
index 704fde1..c03284b 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -49,6 +49,11 @@ Upgrading
---------
- repair_session_max_tree_depth setting has been added to cassandra.yaml to allow operators to reduce
merkle tree size if repair is creating too much heap pressure. See CASSANDRA-14096 for details.
+ - native_transport_max_negotiable_protocol_version has been added to cassandra.yaml to allow operators to
+ enforce an upper limit on the version of the native protocol that servers will negotiate with clients.
+ This can be used during upgrades from 2.1 to 3.0 to prevent errors due to incompatible paging state formats
+ between the two versions. See CASSANDRA-15193 for details.
+
3.0.18
======
diff --git a/bin/cqlsh.py b/bin/cqlsh.py
index 1f1fa47..08b026c 100644
--- a/bin/cqlsh.py
+++ b/bin/cqlsh.py
@@ -174,8 +174,6 @@ from cqlshlib.util import get_file_encoding_bomsize, trim_if_present
DEFAULT_HOST = '127.0.0.1'
DEFAULT_PORT = 9042
-DEFAULT_CQLVER = '3.4.0'
-DEFAULT_PROTOCOL_VERSION = 4
DEFAULT_CONNECT_TIMEOUT_SECONDS = 5
DEFAULT_REQUEST_TIMEOUT_SECONDS = 10
@@ -216,9 +214,13 @@ parser.add_option('--debug', action='store_true',
parser.add_option("--encoding", help="Specify a non-default encoding for output." +
" (Default: %s)" % (UTF8,))
parser.add_option("--cqlshrc", help="Specify an alternative cqlshrc file location.")
-parser.add_option('--cqlversion', default=DEFAULT_CQLVER,
- help='Specify a particular CQL version (default: %default).'
+parser.add_option('--cqlversion', default=None,
+ help='Specify a particular CQL version, '
+ 'by default the highest version supported by the server will be used.'
' Examples: "3.0.3", "3.1.0"')
+parser.add_option("--protocol-version", type="int", default=None,
+ help='Specify a specific protcol version otherwise the client will default and downgrade as necessary')
+
parser.add_option("-e", "--execute", help='Execute the statement and quit.')
parser.add_option("--connect-timeout", default=DEFAULT_CONNECT_TIMEOUT_SECONDS, dest='connect_timeout',
help='Specify the connection timeout in seconds (default: %default seconds).')
@@ -704,7 +706,7 @@ class Shell(cmd.Cmd):
def __init__(self, hostname, port, color=False,
username=None, password=None, encoding=None, stdin=None, tty=True,
completekey=DEFAULT_COMPLETEKEY, browser=None, use_conn=None,
- cqlver=DEFAULT_CQLVER, keyspace=None,
+ cqlver=None, keyspace=None,
tracing_enabled=False, expand_enabled=False,
no_compact=False,
display_nanotime_format=DEFAULT_NANOTIME_FORMAT,
@@ -716,7 +718,7 @@ class Shell(cmd.Cmd):
ssl=False,
single_statement=None,
request_timeout=DEFAULT_REQUEST_TIMEOUT_SECONDS,
- protocol_version=DEFAULT_PROTOCOL_VERSION,
+ protocol_version=None,
connect_timeout=DEFAULT_CONNECT_TIMEOUT_SECONDS):
cmd.Cmd.__init__(self, completekey=completekey)
self.hostname = hostname
@@ -735,15 +737,19 @@ class Shell(cmd.Cmd):
if use_conn:
self.conn = use_conn
else:
- self.conn = Cluster(contact_points=(self.hostname,), port=self.port, cql_version=cqlver,
- protocol_version=protocol_version,
+ kwargs = {}
+ if protocol_version is not None:
+ kwargs['protocol_version'] = protocol_version
+ if cqlver is not None:
+ kwargs['cql_version'] = cqlver
+ self.conn = Cluster(contact_points=(self.hostname,), port=self.port,
auth_provider=self.auth_provider, no_compact=no_compact,
ssl_options=sslhandling.ssl_settings(hostname, CONFIG_FILE) if ssl else None,
load_balancing_policy=WhiteListRoundRobinPolicy([self.hostname]),
control_connection_timeout=connect_timeout,
- connect_timeout=connect_timeout)
+ connect_timeout=connect_timeout,
+ **kwargs)
self.owns_connection = not use_conn
- self.set_expanded_cql_version(cqlver)
if keyspace:
self.session = self.conn.connect(keyspace)
@@ -767,6 +773,7 @@ class Shell(cmd.Cmd):
self.session.row_factory = ordered_dict_factory
self.session.default_consistency_level = cassandra.ConsistencyLevel.ONE
self.get_connection_versions()
+ self.set_expanded_cql_version(self.connection_versions['cql'])
self.current_keyspace = keyspace
@@ -877,9 +884,9 @@ class Shell(cmd.Cmd):
result, = self.session.execute("select * from system.local where key = 'local'")
vers = {
'build': result['release_version'],
- 'protocol': result['native_protocol_version'],
'cql': result['cql_version'],
}
+ vers['protocol'] = self.conn.protocol_version
self.connection_versions = vers
def get_keyspace_names(self):
@@ -1933,9 +1940,9 @@ class Shell(cmd.Cmd):
direction = parsed.get_binding('dir').upper()
if direction == 'FROM':
- task = ImportTask(self, ks, table, columns, fname, opts, DEFAULT_PROTOCOL_VERSION, CONFIG_FILE)
+ task = ImportTask(self, ks, table, columns, fname, opts, self.conn.protocol_version, CONFIG_FILE)
elif direction == 'TO':
- task = ExportTask(self, ks, table, columns, fname, opts, DEFAULT_PROTOCOL_VERSION, CONFIG_FILE)
+ task = ExportTask(self, ks, table, columns, fname, opts, self.conn.protocol_version, CONFIG_FILE)
else:
raise SyntaxError("Unknown direction %s" % direction)
@@ -2495,7 +2502,8 @@ def read_options(cmdlineargs, environment):
optvalues.encoding = option_with_default(configs.get, 'ui', 'encoding', UTF8)
optvalues.tty = option_with_default(configs.getboolean, 'ui', 'tty', sys.stdin.isatty())
- optvalues.cqlversion = option_with_default(configs.get, 'cql', 'version', DEFAULT_CQLVER)
+ optvalues.cqlversion = option_with_default(configs.get, 'cql', 'version', None)
+ optvalues.protocol_version = option_with_default(configs.getint, 'protocol', 'version', None)
optvalues.connect_timeout = option_with_default(configs.getint, 'connection', 'timeout', DEFAULT_CONNECT_TIMEOUT_SECONDS)
optvalues.request_timeout = option_with_default(configs.getint, 'connection', 'request_timeout', DEFAULT_REQUEST_TIMEOUT_SECONDS)
optvalues.execute = None
@@ -2539,11 +2547,11 @@ def read_options(cmdlineargs, environment):
else:
options.color = should_use_color()
- options.cqlversion, cqlvertup = full_cql_version(options.cqlversion)
- if cqlvertup[0] < 3:
- parser.error('%r is not a supported CQL version.' % options.cqlversion)
- else:
- options.cqlmodule = cql3handling
+ if options.cqlversion is not None:
+ options.cqlversion, cqlvertup = full_cql_version(options.cqlversion)
+ if cqlvertup[0] < 3:
+ parser.error('%r is not a supported CQL version.' % options.cqlversion)
+ options.cqlmodule = cql3handling
try:
port = int(port)
@@ -2647,6 +2655,7 @@ def main(options, hostname, port):
tty=options.tty,
completekey=options.completekey,
browser=options.browser,
+ protocol_version=options.protocol_version,
cqlver=options.cqlversion,
keyspace=options.keyspace,
no_compact=options.no_compact,
diff --git a/pylib/cqlshlib/test/cassconnect.py b/pylib/cqlshlib/test/cassconnect.py
index 71f7565..501850c 100644
--- a/pylib/cqlshlib/test/cassconnect.py
+++ b/pylib/cqlshlib/test/cassconnect.py
@@ -24,15 +24,13 @@ from .run_cqlsh import run_cqlsh, call_cqlsh
test_keyspace_init = os.path.join(rundir, 'test_keyspace_init.cql')
-def get_cassandra_connection(cql_version=cqlsh.DEFAULT_CQLVER):
- if cql_version is None:
- cql_version = cqlsh.DEFAULT_CQLVER
+def get_cassandra_connection(cql_version=None):
conn = cql((TEST_HOST,), TEST_PORT, cql_version=cql_version, load_balancing_policy=policy)
# until the cql lib does this for us
conn.cql_version = cql_version
return conn
-def get_cassandra_cursor(cql_version=cqlsh.DEFAULT_CQLVER):
+def get_cassandra_cursor(cql_version=None):
return get_cassandra_connection(cql_version=cql_version).cursor()
TEST_KEYSPACES_CREATED = []
@@ -83,7 +81,7 @@ def remove_db():
c.execute('DROP KEYSPACE %s' % quote_name(TEST_KEYSPACES_CREATED.pop(-1)))
@contextlib.contextmanager
-def cassandra_connection(cql_version=cqlsh.DEFAULT_CQLVER):
+def cassandra_connection(cql_version=None):
"""
Make a Cassandra CQL connection with the given CQL version and get a cursor
for it, and optionally connect to a given keyspace.
diff --git a/pylib/cqlshlib/test/test_cqlsh_completion.py b/pylib/cqlshlib/test/test_cqlsh_completion.py
index e736ea7..75198b6 100644
--- a/pylib/cqlshlib/test/test_cqlsh_completion.py
+++ b/pylib/cqlshlib/test/test_cqlsh_completion.py
@@ -42,7 +42,7 @@ completion_separation_re = re.compile(r'\s+')
class CqlshCompletionCase(BaseTestCase):
def setUp(self):
- self.cqlsh_runner = testrun_cqlsh(cqlver=cqlsh.DEFAULT_CQLVER, env={'COLUMNS': '100000'})
+ self.cqlsh_runner = testrun_cqlsh(cqlver=None, env={'COLUMNS': '100000'})
self.cqlsh = self.cqlsh_runner.__enter__()
def tearDown(self):
diff --git a/pylib/cqlshlib/test/test_cqlsh_output.py b/pylib/cqlshlib/test/test_cqlsh_output.py
index d905095..50849d4 100644
--- a/pylib/cqlshlib/test/test_cqlsh_output.py
+++ b/pylib/cqlshlib/test/test_cqlsh_output.py
@@ -67,13 +67,6 @@ class TestCqlshOutput(BaseTestCase):
'Actually got: %s\ncolor code: %s'
% (tags, coloredtext.colored_version(), coloredtext.colortags()))
- def assertCqlverQueriesGiveColoredOutput(self, queries_and_expected_outputs,
- cqlver=(cqlsh.DEFAULT_CQLVER,), **kwargs):
- if not isinstance(cqlver, (tuple, list)):
- cqlver = (cqlver,)
- for ver in cqlver:
- self.assertQueriesGiveColoredOutput(queries_and_expected_outputs, cqlver=ver, **kwargs)
-
def assertQueriesGiveColoredOutput(self, queries_and_expected_outputs, **kwargs):
"""
Allow queries and expected output to be specified in structured tuples,
@@ -133,7 +126,7 @@ class TestCqlshOutput(BaseTestCase):
self.assertHasColors(c.read_to_next_prompt())
def test_count_output(self):
- self.assertCqlverQueriesGiveColoredOutput((
+ self.assertQueriesGiveColoredOutput((
('select count(*) from has_all_types;', """
count
MMMMM
@@ -198,7 +191,7 @@ class TestCqlshOutput(BaseTestCase):
(1 rows)
nnnnnnnn
"""),
- ), cqlver=cqlsh.DEFAULT_CQLVER)
+ ))
q = 'select COUNT(*) FROM twenty_rows_composite_table limit 1000000;'
self.assertQueriesGiveColoredOutput((
@@ -214,10 +207,10 @@ class TestCqlshOutput(BaseTestCase):
(1 rows)
nnnnnnnn
"""),
- ), cqlver=cqlsh.DEFAULT_CQLVER)
+ ))
def test_static_cf_output(self):
- self.assertCqlverQueriesGiveColoredOutput((
+ self.assertQueriesGiveColoredOutput((
("select a, b from twenty_rows_table where a in ('1', '13', '2');", """
a | b
RR MM
@@ -234,7 +227,7 @@ class TestCqlshOutput(BaseTestCase):
(3 rows)
nnnnnnnn
"""),
- ), cqlver=cqlsh.DEFAULT_CQLVER)
+ ))
self.assertQueriesGiveColoredOutput((
('select * from dynamic_columns;', """
@@ -257,11 +250,11 @@ class TestCqlshOutput(BaseTestCase):
(5 rows)
nnnnnnnn
"""),
- ), cqlver=cqlsh.DEFAULT_CQLVER)
+ ))
def test_empty_cf_output(self):
# we print the header after CASSANDRA-6910
- self.assertCqlverQueriesGiveColoredOutput((
+ self.assertQueriesGiveColoredOutput((
('select * from empty_table;', """
lonelykey | lonelycol
RRRRRRRRR MMMMMMMMM
@@ -270,7 +263,7 @@ class TestCqlshOutput(BaseTestCase):
(0 rows)
"""),
- ), cqlver=cqlsh.DEFAULT_CQLVER)
+ ))
q = 'select * from has_all_types where num = 999;'
@@ -284,7 +277,7 @@ class TestCqlshOutput(BaseTestCase):
(0 rows)
"""),
- ), cqlver=cqlsh.DEFAULT_CQLVER)
+ ))
def test_columnless_key_output(self):
q = "select a from twenty_rows_table where a in ('1', '2', '-9192');"
@@ -304,10 +297,10 @@ class TestCqlshOutput(BaseTestCase):
(2 rows)
nnnnnnnn
"""),
- ), cqlver=cqlsh.DEFAULT_CQLVER)
+ ))
def test_numeric_output(self):
- self.assertCqlverQueriesGiveColoredOutput((
+ self.assertQueriesGiveColoredOutput((
('''select intcol, bigintcol, varintcol \
from has_all_types \
where num in (0, 1, 2, 3, 4);''', """
@@ -353,7 +346,7 @@ class TestCqlshOutput(BaseTestCase):
(5 rows)
nnnnnnnn
"""),
- ), cqlver=cqlsh.DEFAULT_CQLVER)
+ ))
def test_timestamp_output(self):
self.assertQueriesGiveColoredOutput((
@@ -390,7 +383,7 @@ class TestCqlshOutput(BaseTestCase):
pass
def test_boolean_output(self):
- self.assertCqlverQueriesGiveColoredOutput((
+ self.assertQueriesGiveColoredOutput((
('select num, booleancol from has_all_types where num in (0, 1, 2, 3);', """
num | booleancol
RRR MMMMMMMMMM
@@ -409,11 +402,11 @@ class TestCqlshOutput(BaseTestCase):
(4 rows)
nnnnnnnn
"""),
- ), cqlver=cqlsh.DEFAULT_CQLVER)
+ ))
def test_null_output(self):
# column with metainfo but no values
- self.assertCqlverQueriesGiveColoredOutput((
+ self.assertQueriesGiveColoredOutput((
("select k, c, notthere from undefined_values_table where k in ('k1', 'k2');", """
k | c | notthere
R M MMMMMMMM
@@ -428,7 +421,7 @@ class TestCqlshOutput(BaseTestCase):
(2 rows)
nnnnnnnn
"""),
- ), cqlver=cqlsh.DEFAULT_CQLVER)
+ ))
# all-columns, including a metainfo column has no values (cql3)
self.assertQueriesGiveColoredOutput((
@@ -446,10 +439,10 @@ class TestCqlshOutput(BaseTestCase):
(2 rows)
nnnnnnnn
"""),
- ), cqlver=cqlsh.DEFAULT_CQLVER)
+ ))
def test_string_output_ascii(self):
- self.assertCqlverQueriesGiveColoredOutput((
+ self.assertQueriesGiveColoredOutput((
("select * from ascii_with_special_chars where k in (0, 1, 2, 3);", r"""
k | val
R MMM
@@ -468,7 +461,7 @@ class TestCqlshOutput(BaseTestCase):
(4 rows)
nnnnnnnn
"""),
- ), cqlver=cqlsh.DEFAULT_CQLVER)
+ ))
def test_string_output_utf8(self):
# many of these won't line up visually here, to keep the source code
@@ -477,7 +470,7 @@ class TestCqlshOutput(BaseTestCase):
# terminals, but the color-checking machinery here will still treat
# it as one character, so those won't seem to line up visually either.
- self.assertCqlverQueriesGiveColoredOutput((
+ self.assertQueriesGiveColoredOutput((
("select * from utf8_with_special_chars where k in (0, 1, 2, 3, 4, 5, 6);", u"""
k | val
R MMM
@@ -502,10 +495,10 @@ class TestCqlshOutput(BaseTestCase):
(7 rows)
nnnnnnnn
""".encode('utf-8')),
- ), cqlver=cqlsh.DEFAULT_CQLVER, env={'LANG': 'en_US.UTF-8'})
+ ), env={'LANG': 'en_US.UTF-8'})
def test_blob_output(self):
- self.assertCqlverQueriesGiveColoredOutput((
+ self.assertQueriesGiveColoredOutput((
("select num, blobcol from has_all_types where num in (0, 1, 2, 3);", r"""
num | blobcol
RRR MMMMMMM
@@ -524,10 +517,10 @@ class TestCqlshOutput(BaseTestCase):
(4 rows)
nnnnnnnn
"""),
- ), cqlver=cqlsh.DEFAULT_CQLVER)
+ ))
def test_prompt(self):
- with testrun_cqlsh(tty=True, keyspace=None, cqlver=cqlsh.DEFAULT_CQLVER) as c:
+ with testrun_cqlsh(tty=True, keyspace=None) as c:
self.assertTrue(c.output_header.splitlines()[-1].endswith('cqlsh> '))
c.send('\n')
@@ -559,8 +552,7 @@ class TestCqlshOutput(BaseTestCase):
"RRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRR")
def test_describe_keyspace_output(self):
- fullcqlver = cqlsh.DEFAULT_CQLVER
- with testrun_cqlsh(tty=True, cqlver=fullcqlver) as c:
+ with testrun_cqlsh(tty=True) as c:
ks = get_keyspace()
qks = quote_name(ks)
for cmd in ('describe keyspace', 'desc keyspace'):
@@ -568,7 +560,7 @@ class TestCqlshOutput(BaseTestCase):
for semicolon in ('', ';'):
fullcmd = cmd + (' ' if givename else '') + givename + semicolon
desc = c.cmd_and_response(fullcmd)
- self.check_describe_keyspace_output(desc, givename or qks, fullcqlver)
+ self.check_describe_keyspace_output(desc, givename or qks)
# try to actually execute that last keyspace description, with a
# new keyspace name
@@ -577,7 +569,7 @@ class TestCqlshOutput(BaseTestCase):
statements = split_cql_commands(copy_desc)
do_drop = True
- with cassandra_cursor(cql_version=fullcqlver) as curs:
+ with cassandra_cursor() as curs:
try:
for stmt in statements:
cqlshlog.debug('TEST EXEC: %s' % stmt)
@@ -587,7 +579,7 @@ class TestCqlshOutput(BaseTestCase):
if do_drop:
curs.execute('drop keyspace %s' % quote_name(new_ks_name))
- def check_describe_keyspace_output(self, output, qksname, fullcqlver):
+ def check_describe_keyspace_output(self, output, qksname):
expected_bits = [r'(?im)^CREATE KEYSPACE %s WITH\b' % re.escape(qksname),
r';\s*$',
r'\breplication = {\'class\':']
@@ -635,7 +627,7 @@ class TestCqlshOutput(BaseTestCase):
""" % quote_name(get_keyspace()))
- with testrun_cqlsh(tty=True, cqlver=cqlsh.DEFAULT_CQLVER) as c:
+ with testrun_cqlsh(tty=True) as c:
for cmdword in ('describe table', 'desc columnfamily'):
for semicolon in (';', ''):
output = c.cmd_and_response('%s has_all_types%s' % (cmdword, semicolon))
@@ -653,7 +645,7 @@ class TestCqlshOutput(BaseTestCase):
ks = get_keyspace()
- with testrun_cqlsh(tty=True, keyspace=None, cqlver=cqlsh.DEFAULT_CQLVER) as c:
+ with testrun_cqlsh(tty=True, keyspace=None) as c:
# when not in a keyspace
for cmdword in ('DESCRIBE COLUMNFAMILIES', 'desc tables'):
@@ -704,7 +696,7 @@ class TestCqlshOutput(BaseTestCase):
\n
'''
- with testrun_cqlsh(tty=True, keyspace=None, cqlver=cqlsh.DEFAULT_CQLVER) as c:
+ with testrun_cqlsh(tty=True, keyspace=None) as c:
# not in a keyspace
for semicolon in ('', ';'):
@@ -792,7 +784,7 @@ class TestCqlshOutput(BaseTestCase):
pass
def test_user_types_output(self):
- self.assertCqlverQueriesGiveColoredOutput((
+ self.assertQueriesGiveColoredOutput((
("select addresses from users;", r"""
addresses
MMMMMMMMM
@@ -807,8 +799,8 @@ class TestCqlshOutput(BaseTestCase):
(2 rows)
nnnnnnnn
"""),
- ), cqlver=cqlsh.DEFAULT_CQLVER)
- self.assertCqlverQueriesGiveColoredOutput((
+ ))
+ self.assertQueriesGiveColoredOutput((
("select phone_numbers from users;", r"""
phone_numbers
MMMMMMMMMMMMM
@@ -823,10 +815,10 @@ class TestCqlshOutput(BaseTestCase):
(2 rows)
nnnnnnnn
"""),
- ), cqlver=cqlsh.DEFAULT_CQLVER)
+ ))
def test_user_types_with_collections(self):
- self.assertCqlverQueriesGiveColoredOutput((
+ self.assertQueriesGiveColoredOutput((
("select info from songs;", r"""
info
MMMM
@@ -839,8 +831,8 @@ class TestCqlshOutput(BaseTestCase):
(1 rows)
nnnnnnnn
"""),
- ), cqlver=cqlsh.DEFAULT_CQLVER)
- self.assertCqlverQueriesGiveColoredOutput((
+ ))
+ self.assertQueriesGiveColoredOutput((
("select tags from songs;", r"""
tags
MMMM
@@ -853,4 +845,4 @@ class TestCqlshOutput(BaseTestCase):
(1 rows)
nnnnnnnn
"""),
- ), cqlver=cqlsh.DEFAULT_CQLVER)
+ ))
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index 830d3e1..bc3e3bf 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -156,7 +156,7 @@ public class Config
public boolean native_transport_flush_in_batches_legacy = true;
public volatile long native_transport_max_concurrent_requests_in_bytes_per_ip = -1L;
public volatile long native_transport_max_concurrent_requests_in_bytes = -1L;
-
+ public Integer native_transport_max_negotiable_protocol_version = Integer.MIN_VALUE;
@Deprecated
public Integer thrift_max_message_length_in_mb = 16;
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 8417c39..a161a2a 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -52,6 +52,7 @@ import org.apache.cassandra.scheduler.IRequestScheduler;
import org.apache.cassandra.scheduler.NoScheduler;
import org.apache.cassandra.service.CacheService;
import org.apache.cassandra.thrift.ThriftServer;
+import org.apache.cassandra.transport.Server;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.memory.*;
@@ -764,6 +765,20 @@ public class DatabaseDescriptor
throw new ConfigurationException("Encryption must be enabled in client_encryption_options for native_transport_port_ssl", false);
}
+ // If max protocol version has been set, just validate it's within an acceptable range
+ if (conf.native_transport_max_negotiable_protocol_version != Integer.MIN_VALUE)
+ {
+ if (conf.native_transport_max_negotiable_protocol_version < Server.MIN_SUPPORTED_VERSION
+ || conf.native_transport_max_negotiable_protocol_version > Server.CURRENT_VERSION)
+ {
+ throw new ConfigurationException(String.format("Invalid setting for native_transport_max_negotiable_version (%d); " +
+ "Values between %s and %s are supported",
+ conf.native_transport_max_negotiable_protocol_version,
+ Server.MIN_SUPPORTED_VERSION,
+ Server.CURRENT_VERSION));
+ }
+ }
+
if (conf.max_value_size_in_mb == null || conf.max_value_size_in_mb <= 0)
throw new ConfigurationException("max_value_size_in_mb must be positive", false);
else if (conf.max_value_size_in_mb >= 2048)
@@ -1525,6 +1540,11 @@ public class DatabaseDescriptor
return conf.native_transport_flush_in_batches_legacy;
}
+ public static int getNativeProtocolMaxVersionOverride()
+ {
+ return conf.native_transport_max_negotiable_protocol_version;
+ }
+
public static double getCommitLogSyncBatchWindow()
{
return conf.commitlog_sync_batch_window_in_ms;
diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java
index 541dd34..7c222dd 100644
--- a/src/java/org/apache/cassandra/db/SystemKeyspace.java
+++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java
@@ -716,6 +716,18 @@ public final class SystemKeyspace
return executorService.submit((Runnable) () -> executeInternal(String.format(req, PEERS, columnName), ep, value));
}
+ public static void updatePeerReleaseVersion(final InetAddress ep, final Object value, Runnable postUpdateTask, ExecutorService executorService)
+ {
+ if (ep.equals(FBUtilities.getBroadcastAddress()))
+ return;
+
+ String req = "INSERT INTO system.%s (peer, %s) VALUES (?, ?)";
+ executorService.execute(() -> {
+ executeInternal(String.format(req, PEERS, "release_version"), ep, value);
+ postUpdateTask.run();
+ });
+ }
+
public static synchronized void updateHintsDropped(InetAddress ep, UUID timePeriod, int value)
{
// with 30 day TTL
@@ -812,6 +824,37 @@ public final class SystemKeyspace
}
/**
+ * Return a map of IP address to C* version. If an invalid version string, or no version
+ * at all is stored for a given peer IP, then NULL_VERSION will be reported for that peer
+ */
+ public static Map<InetAddress, CassandraVersion> loadPeerVersions()
+ {
+ Map<InetAddress, CassandraVersion> releaseVersionMap = new HashMap<>();
+ for (UntypedResultSet.Row row : executeInternal("SELECT peer, release_version FROM system." + PEERS))
+ {
+ InetAddress peer = row.getInetAddress("peer");
+ if (row.has("release_version"))
+ {
+ try
+ {
+ releaseVersionMap.put(peer, new CassandraVersion(row.getString("release_version")));
+ }
+ catch (IllegalArgumentException e)
+ {
+ logger.info("Invalid version string found for {}", peer);
+ releaseVersionMap.put(peer, NULL_VERSION);
+ }
+ }
+ else
+ {
+ logger.info("No version string found for {}", peer);
+ releaseVersionMap.put(peer, NULL_VERSION);
+ }
+ }
+ return releaseVersionMap;
+ }
+
+ /**
* Get preferred IP for given endpoint if it is known. Otherwise this returns given endpoint itself.
*
* @param ep endpoint address to check
diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java b/src/java/org/apache/cassandra/service/CassandraDaemon.java
index ad4a344..cc8b2ae 100644
--- a/src/java/org/apache/cassandra/service/CassandraDaemon.java
+++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java
@@ -668,6 +668,16 @@ public class CassandraDaemon
return nativeTransportService != null ? nativeTransportService.isRunning() : false;
}
+ public int getMaxNativeProtocolVersion()
+ {
+ return nativeTransportService.getMaxProtocolVersion();
+ }
+
+ public void refreshMaxNativeProtocolVersion()
+ {
+ if (nativeTransportService != null)
+ nativeTransportService.refreshMaxNegotiableProtocolVersion();
+ }
/**
* A convenience method to stop and destroy the daemon in one shot.
diff --git a/src/java/org/apache/cassandra/service/NativeTransportService.java b/src/java/org/apache/cassandra/service/NativeTransportService.java
index 2280818..587f781 100644
--- a/src/java/org/apache/cassandra/service/NativeTransportService.java
+++ b/src/java/org/apache/cassandra/service/NativeTransportService.java
@@ -33,6 +33,7 @@ import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.metrics.ClientMetrics;
+import org.apache.cassandra.transport.ConfiguredLimit;
import org.apache.cassandra.transport.Message;
import org.apache.cassandra.transport.Server;
@@ -48,6 +49,7 @@ public class NativeTransportService
private boolean initialized = false;
private EventLoopGroup workerGroup;
+ private ConfiguredLimit protocolVersionLimit;
/**
* Creates netty thread pools and event loops.
@@ -69,12 +71,15 @@ public class NativeTransportService
logger.info("Netty using Java NIO event loop");
}
+ protocolVersionLimit = ConfiguredLimit.newLimit();
+
int nativePort = DatabaseDescriptor.getNativeTransportPort();
int nativePortSSL = DatabaseDescriptor.getNativeTransportPortSSL();
InetAddress nativeAddr = DatabaseDescriptor.getRpcAddress();
org.apache.cassandra.transport.Server.Builder builder = new org.apache.cassandra.transport.Server.Builder()
.withEventLoopGroup(workerGroup)
+ .withProtocolVersionLimit(protocolVersionLimit)
.withHost(nativeAddr);
if (!DatabaseDescriptor.getClientEncryptionOptions().enabled)
@@ -137,6 +142,20 @@ public class NativeTransportService
Message.Dispatcher.shutdown();
}
+ public int getMaxProtocolVersion()
+ {
+ return protocolVersionLimit.getMaxVersion();
+ }
+
+ public void refreshMaxNegotiableProtocolVersion()
+ {
+ // lowering the max negotiable protocol version is only safe if we haven't already
+ // allowed clients to connect with a higher version. This still allows the max
+ // version to be raised, as that is safe.
+ if (initialized)
+ protocolVersionLimit.updateMaxSupportedVersion();
+ }
+
/**
* @return intend to use epoll bassed event looping
*/
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 2af7fb7..8c29601 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -442,6 +442,23 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
return daemon.isNativeTransportRunning();
}
+ public int getMaxNativeProtocolVersion()
+ {
+ if (daemon == null)
+ {
+ throw new IllegalStateException("No configured daemon");
+ }
+ return daemon.getMaxNativeProtocolVersion();
+ }
+
+ private void refreshMaxNativeProtocolVersion()
+ {
+ if (daemon != null)
+ {
+ daemon.refreshMaxNativeProtocolVersion();
+ }
+ }
+
public void stopTransports()
{
if (isInitialized())
@@ -1797,7 +1814,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
switch (state)
{
case RELEASE_VERSION:
- SystemKeyspace.updatePeerInfo(endpoint, "release_version", value.value, executor);
+ SystemKeyspace.updatePeerReleaseVersion(endpoint, value.value, this::refreshMaxNativeProtocolVersion, executor);
break;
case DC:
updateTopology(endpoint);
@@ -1874,7 +1891,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
switch (entry.getKey())
{
case RELEASE_VERSION:
- SystemKeyspace.updatePeerInfo(endpoint, "release_version", entry.getValue().value, executor);
+ SystemKeyspace.updatePeerReleaseVersion(endpoint, entry.getValue().value, this::refreshMaxNativeProtocolVersion, executor);
break;
case DC:
SystemKeyspace.updatePeerInfo(endpoint, "data_center", entry.getValue().value, executor);
diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
index ddd2da0..e22b094 100644
--- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
@@ -619,4 +619,7 @@ public interface StorageServiceMBean extends NotificationEmitter
* @return true if the node successfully starts resuming. (this does not mean bootstrap streaming was success.)
*/
public boolean resumeBootstrap();
+
+ /** Returns the max version that this node will negotiate for native protocol connections */
+ public int getMaxNativeProtocolVersion();
}
diff --git a/src/java/org/apache/cassandra/transport/ConfiguredLimit.java b/src/java/org/apache/cassandra/transport/ConfiguredLimit.java
new file mode 100644
index 0000000..98518b8
--- /dev/null
+++ b/src/java/org/apache/cassandra/transport/ConfiguredLimit.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.transport;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.SystemKeyspace;
+import org.apache.cassandra.utils.CassandraVersion;
+
+public abstract class ConfiguredLimit implements ProtocolVersionLimit
+{
+ private static final Logger logger = LoggerFactory.getLogger(ConfiguredLimit.class);
+ static final String DISABLE_MAX_PROTOCOL_AUTO_OVERRIDE = "cassandra.disable_max_protocol_auto_override";
+ static final CassandraVersion MIN_VERSION_FOR_V4 = new CassandraVersion("3.0.0");
+
+ public abstract int getMaxVersion();
+ public abstract void updateMaxSupportedVersion();
+
+ public static ConfiguredLimit newLimit()
+ {
+ if (Boolean.getBoolean(DISABLE_MAX_PROTOCOL_AUTO_OVERRIDE))
+ return new StaticLimit(Server.CURRENT_VERSION);
+
+ int fromConfig = DatabaseDescriptor.getNativeProtocolMaxVersionOverride();
+ return fromConfig != Integer.MIN_VALUE
+ ? new StaticLimit(fromConfig)
+ : new DynamicLimit(Server.CURRENT_VERSION);
+ }
+
+ private static class StaticLimit extends ConfiguredLimit
+ {
+ private final int maxVersion;
+ private StaticLimit(int maxVersion)
+ {
+ if (maxVersion < Server.MIN_SUPPORTED_VERSION || maxVersion > Server.CURRENT_VERSION)
+ throw new IllegalArgumentException(String.format("Invalid max protocol version supplied (%s); " +
+ "Values between %s and %s are supported",
+ maxVersion,
+ Server.MIN_SUPPORTED_VERSION,
+ Server.CURRENT_VERSION));
+ this.maxVersion = maxVersion;
+ logger.info("Native transport max negotiable version statically limited to {}", maxVersion);
+ }
+
+ public int getMaxVersion()
+ {
+ return maxVersion;
+ }
+
+ public void updateMaxSupportedVersion()
+ {
+ // statically configured, so this is a no-op
+ }
+ }
+
+ private static class DynamicLimit extends ConfiguredLimit
+ {
+ private volatile int maxVersion;
+ private DynamicLimit(int initialLimit)
+ {
+ maxVersion = initialLimit;
+ maybeUpdateVersion(true);
+ }
+
+ public int getMaxVersion()
+ {
+ return maxVersion;
+ }
+
+ public void updateMaxSupportedVersion()
+ {
+ maybeUpdateVersion(false);
+ }
+
+ private void maybeUpdateVersion(boolean allowLowering)
+ {
+ boolean enforceV3Cap = SystemKeyspace.loadPeerVersions()
+ .values()
+ .stream()
+ .anyMatch(v -> v.compareTo(MIN_VERSION_FOR_V4) < 0);
+
+ if (!enforceV3Cap)
+ {
+ maxVersion = Server.CURRENT_VERSION;
+ return;
+ }
+
+ if (maxVersion > Server.VERSION_3 && !allowLowering)
+ {
+ logger.info("Detected peers which do not fully support protocol V4, but V4 was previously negotiable. " +
+ "Not enforcing cap as this can cause issues for older client versions. After the next " +
+ "restart the server will apply the cap");
+ return;
+ }
+ logger.info("Detected peers which do not fully support protocol V4. Capping max negotiable version to V3");
+ maxVersion = Server.VERSION_3;
+ }
+ }
+}
diff --git a/src/java/org/apache/cassandra/transport/Frame.java b/src/java/org/apache/cassandra/transport/Frame.java
index c28be9f..a07551f 100644
--- a/src/java/org/apache/cassandra/transport/Frame.java
+++ b/src/java/org/apache/cassandra/transport/Frame.java
@@ -145,10 +145,12 @@ public class Frame
private int tooLongStreamId;
private final Connection.Factory factory;
+ private final ProtocolVersionLimit versionCap;
- public Decoder(Connection.Factory factory)
+ public Decoder(Connection.Factory factory, ProtocolVersionLimit versionCap)
{
this.factory = factory;
+ this.versionCap = versionCap;
}
@Override
@@ -175,10 +177,10 @@ public class Frame
int firstByte = buffer.getByte(idx++);
Message.Direction direction = Message.Direction.extractFromVersion(firstByte);
int version = firstByte & PROTOCOL_VERSION_MASK;
- if (version < Server.MIN_SUPPORTED_VERSION || version > Server.CURRENT_VERSION)
+ if (version < Server.MIN_SUPPORTED_VERSION || version > versionCap.getMaxVersion())
throw new ProtocolException(String.format("Invalid or unsupported protocol version (%d); the lowest supported version is %d and the greatest is %d",
- version, Server.MIN_SUPPORTED_VERSION, Server.CURRENT_VERSION),
- version);
+ version, Server.MIN_SUPPORTED_VERSION, versionCap.getMaxVersion()),
+ version < Server.MIN_SUPPORTED_VERSION ? version : null);
// Wait until we have the complete header
if (readableBytes < Header.LENGTH)
diff --git a/src/java/org/apache/cassandra/transport/Message.java b/src/java/org/apache/cassandra/transport/Message.java
index 08a8600..5202578 100644
--- a/src/java/org/apache/cassandra/transport/Message.java
+++ b/src/java/org/apache/cassandra/transport/Message.java
@@ -322,11 +322,18 @@ public abstract class Message
@ChannelHandler.Sharable
public static class ProtocolEncoder extends MessageToMessageEncoder<Message>
{
+ private final ProtocolVersionLimit versionCap;
+
+ ProtocolEncoder(ProtocolVersionLimit versionCap)
+ {
+ this.versionCap = versionCap;
+ }
+
public void encode(ChannelHandlerContext ctx, Message message, List results)
{
Connection connection = ctx.channel().attr(Connection.attributeKey).get();
// The only case the connection can be null is when we send the initial STARTUP message (client side thus)
- int version = connection == null ? Server.CURRENT_VERSION : connection.getVersion();
+ int version = connection == null ? versionCap.getMaxVersion() : connection.getVersion();
EnumSet<Frame.Header.Flag> flags = EnumSet.noneOf(Frame.Header.Flag.class);
diff --git a/src/java/org/apache/cassandra/transport/ProtocolVersionLimit.java b/src/java/org/apache/cassandra/transport/ProtocolVersionLimit.java
new file mode 100644
index 0000000..c476efb
--- /dev/null
+++ b/src/java/org/apache/cassandra/transport/ProtocolVersionLimit.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.transport;
+
+@FunctionalInterface
+public interface ProtocolVersionLimit
+{
+ public int getMaxVersion();
+
+ public static final ProtocolVersionLimit SERVER_DEFAULT = () -> Server.CURRENT_VERSION;
+}
diff --git a/src/java/org/apache/cassandra/transport/Server.java b/src/java/org/apache/cassandra/transport/Server.java
index 83a676c..012b326 100644
--- a/src/java/org/apache/cassandra/transport/Server.java
+++ b/src/java/org/apache/cassandra/transport/Server.java
@@ -87,11 +87,14 @@ public class Server implements CassandraDaemon.Server
private final AtomicBoolean isRunning = new AtomicBoolean(false);
private EventLoopGroup workerGroup;
+ private final ProtocolVersionLimit protocolVersionLimit;
private Server (Builder builder)
{
this.socket = builder.getSocket();
this.useSSL = builder.useSSL;
+ this.protocolVersionLimit = builder.getProtocolVersionLimit();
+
if (builder.workerGroup != null)
{
workerGroup = builder.workerGroup;
@@ -188,6 +191,7 @@ public class Server implements CassandraDaemon.Server
private InetAddress hostAddr;
private int port = -1;
private InetSocketAddress socket;
+ private ProtocolVersionLimit versionLimit;
public Builder withSSL(boolean useSSL)
{
@@ -215,6 +219,19 @@ public class Server implements CassandraDaemon.Server
return this;
}
+ public Builder withProtocolVersionLimit(ProtocolVersionLimit limit)
+ {
+ this.versionLimit = limit;
+ return this;
+ }
+
+ ProtocolVersionLimit getProtocolVersionLimit()
+ {
+ if (versionLimit == null)
+ throw new IllegalArgumentException("Missing protocol version limiter");
+ return versionLimit;
+ }
+
public Server build()
{
return new Server(this);
@@ -327,7 +344,6 @@ public class Server implements CassandraDaemon.Server
{
// Stateless handlers
private static final Message.ProtocolDecoder messageDecoder = new Message.ProtocolDecoder();
- private static final Message.ProtocolEncoder messageEncoder = new Message.ProtocolEncoder();
private static final Frame.Decompressor frameDecompressor = new Frame.Decompressor();
private static final Frame.Compressor frameCompressor = new Frame.Compressor();
private static final Frame.Encoder frameEncoder = new Frame.Encoder();
@@ -355,14 +371,14 @@ public class Server implements CassandraDaemon.Server
//pipeline.addLast("debug", new LoggingHandler());
- pipeline.addLast("frameDecoder", new Frame.Decoder(server.connectionFactory));
+ pipeline.addLast("frameDecoder", new Frame.Decoder(server.connectionFactory, server.protocolVersionLimit));
pipeline.addLast("frameEncoder", frameEncoder);
pipeline.addLast("frameDecompressor", frameDecompressor);
pipeline.addLast("frameCompressor", frameCompressor);
pipeline.addLast("messageDecoder", messageDecoder);
- pipeline.addLast("messageEncoder", messageEncoder);
+ pipeline.addLast("messageEncoder", new Message.ProtocolEncoder(server.protocolVersionLimit));
pipeline.addLast("executor", new Message.Dispatcher(DatabaseDescriptor.useNativeTransportLegacyFlusher(),
EndpointPayloadTracker.get(((InetSocketAddress) channel.remoteAddress()).getAddress())));
diff --git a/src/java/org/apache/cassandra/transport/SimpleClient.java b/src/java/org/apache/cassandra/transport/SimpleClient.java
index 7916deb..7d34d98 100644
--- a/src/java/org/apache/cassandra/transport/SimpleClient.java
+++ b/src/java/org/apache/cassandra/transport/SimpleClient.java
@@ -251,7 +251,7 @@ public class SimpleClient implements Closeable
// Stateless handlers
private static final Message.ProtocolDecoder messageDecoder = new Message.ProtocolDecoder();
- private static final Message.ProtocolEncoder messageEncoder = new Message.ProtocolEncoder();
+ private static final Message.ProtocolEncoder messageEncoder = new Message.ProtocolEncoder(ProtocolVersionLimit.SERVER_DEFAULT);
private static final Frame.Decompressor frameDecompressor = new Frame.Decompressor();
private static final Frame.Compressor frameCompressor = new Frame.Compressor();
private static final Frame.Encoder frameEncoder = new Frame.Encoder();
@@ -274,7 +274,7 @@ public class SimpleClient implements Closeable
channel.attr(Connection.attributeKey).set(connection);
ChannelPipeline pipeline = channel.pipeline();
- pipeline.addLast("frameDecoder", new Frame.Decoder(connectionFactory));
+ pipeline.addLast("frameDecoder", new Frame.Decoder(connectionFactory, ProtocolVersionLimit.SERVER_DEFAULT));
pipeline.addLast("frameEncoder", frameEncoder);
pipeline.addLast("frameDecompressor", frameDecompressor);
diff --git a/test/unit/org/apache/cassandra/cql3/CQLTester.java b/test/unit/org/apache/cassandra/cql3/CQLTester.java
index 999404e..95366c2 100644
--- a/test/unit/org/apache/cassandra/cql3/CQLTester.java
+++ b/test/unit/org/apache/cassandra/cql3/CQLTester.java
@@ -62,6 +62,7 @@ import org.apache.cassandra.serializers.TypeSerializer;
import org.apache.cassandra.service.ClientState;
import org.apache.cassandra.service.QueryState;
import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.transport.ConfiguredLimit;
import org.apache.cassandra.transport.Event;
import org.apache.cassandra.transport.Server;
import org.apache.cassandra.transport.messages.ResultMessage;
@@ -88,6 +89,7 @@ public abstract class CQLTester
private static org.apache.cassandra.transport.Server server;
protected static final int nativePort;
protected static final InetAddress nativeAddr;
+ protected static ConfiguredLimit protocolVersionLimit;
private static final Map<Integer, Cluster> clusters = new HashMap<>();
private static final Map<Integer, Session> sessions = new HashMap<>();
@@ -330,11 +332,43 @@ public abstract class CQLTester
if (server != null)
return;
+ prepareNetwork();
+ initializeNetwork();
+ }
+
+ protected static void prepareNetwork()
+ {
SystemKeyspace.finishStartup();
StorageService.instance.initServer();
SchemaLoader.startGossiper();
+ }
+
+ protected static void reinitializeNetwork()
+ {
+ if (server != null && server.isRunning())
+ {
+ server.stop();
+ server = null;
+ }
+ List<CloseFuture> futures = new ArrayList<>();
+ for (Cluster cluster : clusters.values())
+ futures.add(cluster.closeAsync());
+ for (Session session : sessions.values())
+ futures.add(session.closeAsync());
+ FBUtilities.waitOnFutures(futures);
+ clusters.clear();
+ sessions.clear();
+
+ initializeNetwork();
+ }
- server = new Server.Builder().withHost(nativeAddr).withPort(nativePort).build();
+ private static void initializeNetwork()
+ {
+ protocolVersionLimit = ConfiguredLimit.newLimit();
+ server = new Server.Builder().withHost(nativeAddr)
+ .withPort(nativePort)
+ .withProtocolVersionLimit(protocolVersionLimit)
+ .build();
ClientMetrics.instance.init(Collections.singleton(server));
server.start();
@@ -343,9 +377,12 @@ public abstract class CQLTester
if (clusters.containsKey(version))
continue;
+ if (version > protocolVersionLimit.getMaxVersion())
+ continue;
+
Cluster cluster = Cluster.builder()
.addContactPoints(nativeAddr)
- .withClusterName("Test Cluster")
+ .withClusterName("Test Cluster-v" + version)
.withPort(nativePort)
.withProtocolVersion(ProtocolVersion.fromInt(version))
.build();
@@ -356,6 +393,14 @@ public abstract class CQLTester
}
}
+ protected void updateMaxNegotiableProtocolVersion()
+ {
+ if (protocolVersionLimit == null)
+ throw new IllegalStateException("Native transport server has not been initialized");
+
+ protocolVersionLimit.updateMaxSupportedVersion();
+ }
+
protected void dropPerTestKeyspace() throws Throwable
{
execute(String.format("DROP KEYSPACE IF EXISTS %s", KEYSPACE_PER_TEST));
diff --git a/test/unit/org/apache/cassandra/transport/DynamicLimitTest.java b/test/unit/org/apache/cassandra/transport/DynamicLimitTest.java
new file mode 100644
index 0000000..83a0dd9
--- /dev/null
+++ b/test/unit/org/apache/cassandra/transport/DynamicLimitTest.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.transport;
+
+import java.net.InetAddress;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.cql3.CQLTester;
+
+import static org.apache.cassandra.transport.ProtocolTestHelper.cleanupPeers;
+import static org.apache.cassandra.transport.ProtocolTestHelper.setStaticLimitInConfig;
+import static org.apache.cassandra.transport.ProtocolTestHelper.setupPeer;
+import static org.apache.cassandra.transport.ProtocolTestHelper.updatePeerInfo;
+import static org.junit.Assert.assertEquals;
+
+public class DynamicLimitTest
+{
+ @BeforeClass
+ public static void setup()
+ {
+ CQLTester.prepareServer();
+ }
+
+ @Test
+ public void disableDynamicLimitWithSystemProperty() throws Throwable
+ {
+ // Dynamic limiting of the max negotiable protocol version can be
+ // disabled with a system property
+
+ // ensure that no static limit is configured
+ setStaticLimitInConfig(null);
+
+ // set the property which disables dynamic limiting
+ System.setProperty(ConfiguredLimit.DISABLE_MAX_PROTOCOL_AUTO_OVERRIDE, "true");
+ // insert a legacy peer into system.peers and also
+ InetAddress peer = null;
+ try
+ {
+ peer = setupPeer("127.1.0.1", "2.2.0");
+ ConfiguredLimit limit = ConfiguredLimit.newLimit();
+ assertEquals(Server.CURRENT_VERSION, limit.getMaxVersion());
+
+ // clearing the property after the limit has been returned has no effect
+ System.clearProperty(ConfiguredLimit.DISABLE_MAX_PROTOCOL_AUTO_OVERRIDE);
+ limit.updateMaxSupportedVersion();
+ assertEquals(Server.CURRENT_VERSION, limit.getMaxVersion());
+
+ // a new limit should now be dynamic
+ limit = ConfiguredLimit.newLimit();
+ assertEquals(Server.VERSION_3, limit.getMaxVersion());
+ }
+ finally
+ {
+ System.clearProperty(ConfiguredLimit.DISABLE_MAX_PROTOCOL_AUTO_OVERRIDE);
+ cleanupPeers(peer);
+ }
+ }
+
+ @Test
+ public void disallowLoweringMaxVersion() throws Throwable
+ {
+ // Lowering the max version once connections have been established is a problem
+ // for some clients. So for a dynamic limit, if notifications of peer versions
+ // trigger a change to the max version, it's only allowed to increase the max
+ // negotiable version
+
+ InetAddress peer = null;
+ try
+ {
+ // ensure that no static limit is configured
+ setStaticLimitInConfig(null);
+ ConfiguredLimit limit = ConfiguredLimit.newLimit();
+ assertEquals(Server.CURRENT_VERSION, limit.getMaxVersion());
+
+ peer = setupPeer("127.1.0.1", "3.0.0");
+ limit.updateMaxSupportedVersion();
+ assertEquals(Server.CURRENT_VERSION, limit.getMaxVersion());
+
+ // learn that peer doesn't actually fully support V4, behaviour should remain the same
+ updatePeerInfo(peer, "2.2.0");
+ limit.updateMaxSupportedVersion();
+ assertEquals(Server.CURRENT_VERSION, limit.getMaxVersion());
+
+ // finally learn that peer2 has been upgraded, just for completeness
+ updatePeerInfo(peer, "3.3.0");
+ limit.updateMaxSupportedVersion();
+ assertEquals(Server.CURRENT_VERSION, limit.getMaxVersion());
+
+ } finally {
+ cleanupPeers(peer);
+ }
+ }
+}
diff --git a/test/unit/org/apache/cassandra/transport/ProtocolErrorTest.java b/test/unit/org/apache/cassandra/transport/ProtocolErrorTest.java
index 599087c..e212c4c 100644
--- a/test/unit/org/apache/cassandra/transport/ProtocolErrorTest.java
+++ b/test/unit/org/apache/cassandra/transport/ProtocolErrorTest.java
@@ -43,7 +43,7 @@ public class ProtocolErrorTest {
public void testInvalidProtocolVersion(int version) throws Exception
{
- Frame.Decoder dec = new Frame.Decoder(null);
+ Frame.Decoder dec = new Frame.Decoder(null, ProtocolVersionLimit.SERVER_DEFAULT);
List<Object> results = new ArrayList<>();
byte[] frame = new byte[] {
@@ -71,7 +71,7 @@ public class ProtocolErrorTest {
public void testInvalidProtocolVersionShortFrame() throws Exception
{
// test for CASSANDRA-11464
- Frame.Decoder dec = new Frame.Decoder(null);
+ Frame.Decoder dec = new Frame.Decoder(null, ProtocolVersionLimit.SERVER_DEFAULT);
List<Object> results = new ArrayList<>();
byte[] frame = new byte[] {
@@ -93,7 +93,7 @@ public class ProtocolErrorTest {
@Test
public void testInvalidDirection() throws Exception
{
- Frame.Decoder dec = new Frame.Decoder(null);
+ Frame.Decoder dec = new Frame.Decoder(null, ProtocolVersionLimit.SERVER_DEFAULT);
List<Object> results = new ArrayList<>();
// should generate a protocol exception for using a response frame with
@@ -124,7 +124,7 @@ public class ProtocolErrorTest {
@Test
public void testBodyLengthOverLimit() throws Exception
{
- Frame.Decoder dec = new Frame.Decoder(null);
+ Frame.Decoder dec = new Frame.Decoder(null, ProtocolVersionLimit.SERVER_DEFAULT);
List<Object> results = new ArrayList<>();
byte[] frame = new byte[] {
diff --git a/test/unit/org/apache/cassandra/transport/ProtocolNegotiationTest.java b/test/unit/org/apache/cassandra/transport/ProtocolNegotiationTest.java
new file mode 100644
index 0000000..91c1d6a
--- /dev/null
+++ b/test/unit/org/apache/cassandra/transport/ProtocolNegotiationTest.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.transport;
+
+import java.net.InetAddress;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ProtocolVersion;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.cql3.CQLTester;
+
+import static org.apache.cassandra.transport.ProtocolTestHelper.cleanupPeers;
+import static org.apache.cassandra.transport.ProtocolTestHelper.setStaticLimitInConfig;
+import static org.apache.cassandra.transport.ProtocolTestHelper.setupPeer;
+import static org.apache.cassandra.transport.ProtocolTestHelper.updatePeerInfo;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class ProtocolNegotiationTest extends CQLTester
+{
+ // to avoid JMX naming clashes between cluster metrics
+ private int clusterId = 0;
+
+ @BeforeClass
+ public static void setup()
+ {
+ prepareNetwork();
+ }
+
+ @Before
+ public void clearConfig()
+ {
+ setStaticLimitInConfig(null);
+ }
+
+ @Test
+ public void serverSupportsV3AndV4ByDefault() throws Throwable
+ {
+ reinitializeNetwork();
+ // client can explicitly request either V3 or V4
+ testConnection(ProtocolVersion.V3, ProtocolVersion.V3);
+ testConnection(ProtocolVersion.V4, ProtocolVersion.V4);
+
+ // if not specified, V4 is the default
+ testConnection(null, ProtocolVersion.V4);
+ }
+
+ @Test
+ public void testStaticLimit() throws Throwable
+ {
+ try
+ {
+ reinitializeNetwork();
+ // No limit enforced to start
+ assertEquals(Integer.MIN_VALUE, DatabaseDescriptor.getNativeProtocolMaxVersionOverride());
+ testConnection(null, ProtocolVersion.V4);
+
+ // Update DatabaseDescriptor, then re-initialise the server to force it to read it
+ setStaticLimitInConfig(ProtocolVersion.V3.toInt());
+ reinitializeNetwork();
+ assertEquals(3, DatabaseDescriptor.getNativeProtocolMaxVersionOverride());
+ testConnection(ProtocolVersion.V4, ProtocolVersion.V3);
+ testConnection(ProtocolVersion.V3, ProtocolVersion.V3);
+ testConnection(null, ProtocolVersion.V3);
+ } finally {
+ setStaticLimitInConfig(null);
+ }
+ }
+
+ @Test
+ public void testDynamicLimit() throws Throwable
+ {
+ InetAddress peer1 = setupPeer("127.1.0.1", "2.2.0");
+ InetAddress peer2 = setupPeer("127.1.0.2", "2.2.0");
+ InetAddress peer3 = setupPeer("127.1.0.3", "2.2.0");
+ reinitializeNetwork();
+ try
+ {
+ // legacy peers means max negotiable version is V3
+ testConnection(ProtocolVersion.V4, ProtocolVersion.V3);
+ testConnection(ProtocolVersion.V3, ProtocolVersion.V3);
+ testConnection(null, ProtocolVersion.V3);
+
+ // receive notification that 2 peers have upgraded to a version that fully supports V4
+ updatePeerInfo(peer1, "3.0.0");
+ updatePeerInfo(peer2, "3.0.0");
+ updateMaxNegotiableProtocolVersion();
+ // version should still be capped
+ testConnection(ProtocolVersion.V4, ProtocolVersion.V3);
+ testConnection(ProtocolVersion.V3, ProtocolVersion.V3);
+ testConnection(null, ProtocolVersion.V3);
+
+ // no legacy peers so V4 is negotiable
+ // after the last peer upgrades, cap should be lifted
+ updatePeerInfo(peer3, "3.0.0");
+ updateMaxNegotiableProtocolVersion();
+ testConnection(ProtocolVersion.V4, ProtocolVersion.V4);
+ testConnection(ProtocolVersion.V3, ProtocolVersion.V3);
+ testConnection(null, ProtocolVersion.V4);
+ } finally {
+ cleanupPeers(peer1, peer2, peer3);
+ }
+ }
+
+ private void testConnection(com.datastax.driver.core.ProtocolVersion requestedVersion,
+ com.datastax.driver.core.ProtocolVersion expectedVersion)
+ {
+ long start = System.nanoTime();
+ boolean expectError = requestedVersion != null && requestedVersion != expectedVersion;
+ Cluster.Builder builder = Cluster.builder()
+ .addContactPoints(nativeAddr)
+ .withClusterName("Test Cluster" + clusterId++)
+ .withPort(nativePort);
+
+ if (requestedVersion != null)
+ builder = builder.withProtocolVersion(requestedVersion) ;
+
+ Cluster cluster = builder.build();
+ logger.info("Setting up cluster took {}ms", TimeUnit.MILLISECONDS.convert(System.nanoTime() - start, TimeUnit.NANOSECONDS));
+ start = System.nanoTime();
+ try {
+ cluster.connect();
+ if (expectError)
+ fail("Expected a protocol exception");
+ }
+ catch (Exception e)
+ {
+ if (!expectError)
+ {
+ e.printStackTrace();
+ fail("Did not expect any exception");
+ }
+
+ assertTrue(e.getMessage().contains(String.format("Host does not support protocol version %s but %s", requestedVersion, expectedVersion)));
+ } finally {
+ logger.info("Testing connection took {}ms", TimeUnit.MILLISECONDS.convert(System.nanoTime() - start, TimeUnit.NANOSECONDS));
+ start = System.nanoTime();
+ cluster.closeAsync();
+ logger.info("Tearing down cluster connection took {}ms", TimeUnit.MILLISECONDS.convert(System.nanoTime() - start, TimeUnit.NANOSECONDS));
+
+ }
+ }
+
+}
diff --git a/test/unit/org/apache/cassandra/transport/ProtocolTestHelper.java b/test/unit/org/apache/cassandra/transport/ProtocolTestHelper.java
new file mode 100644
index 0000000..90a2801
--- /dev/null
+++ b/test/unit/org/apache/cassandra/transport/ProtocolTestHelper.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.transport;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Field;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.concurrent.ExecutorService;
+
+import com.google.common.util.concurrent.MoreExecutors;
+
+import org.apache.cassandra.config.Config;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.SystemKeyspace;
+import org.apache.cassandra.gms.VersionedValue;
+import org.apache.cassandra.utils.FBUtilities;
+
+public class ProtocolTestHelper
+{
+ static ExecutorService executor = MoreExecutors.newDirectExecutorService();
+ static InetAddress setupPeer(String address, String version) throws Throwable
+ {
+ InetAddress peer = peer(address);
+ updatePeerInfo(peer, version);
+ return peer;
+ }
+
+ static void updatePeerInfo(InetAddress peer, String version) throws Throwable
+ {
+ SystemKeyspace.updatePeerInfo(peer, "release_version", version, executor);
+ }
+
+ static InetAddress peer(String address)
+ {
+ try
+ {
+ return InetAddress.getByName(address);
+ }
+ catch (UnknownHostException e)
+ {
+ throw new RuntimeException("Error creating peer", e);
+ }
+ }
+
+ static void cleanupPeers(InetAddress...peers) throws Throwable
+ {
+ for (InetAddress peer : peers)
+ if (peer != null)
+ SystemKeyspace.removeEndpoint(peer);
+ }
+
+ static void setStaticLimitInConfig(Integer version)
+ {
+ try
+ {
+ Field field = FBUtilities.getProtectedField(DatabaseDescriptor.class, "conf");
+ ((Config)field.get(null)).native_transport_max_negotiable_protocol_version = version == null ? Integer.MIN_VALUE : version;
+ }
+ catch (IllegalAccessException e)
+ {
+ throw new RuntimeException("Error setting native_transport_max_protocol_version on Config", e);
+ }
+ }
+
+ static VersionedValue releaseVersion(String versionString)
+ {
+ try
+ {
+ Constructor<VersionedValue> ctor = VersionedValue.class.getDeclaredConstructor(String.class);
+ ctor.setAccessible(true);
+ return ctor.newInstance(versionString);
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException("Error constructing VersionedValue for release version", e);
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org