You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by dc...@apache.org on 2021/10/25 15:43:51 UTC
[cassandra] branch trunk updated: v4+ protocol did not clean up
client warnings, which caused leaking the state
This is an automated email from the ASF dual-hosted git repository.
dcapwell pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new 8e225c5 v4+ protocol did not clean up client warnings, which caused leaking the state
8e225c5 is described below
commit 8e225c55c49493f00fc9bc0b5809ab026d60c767
Author: David Capwell <dc...@apache.org>
AuthorDate: Mon Oct 25 07:28:08 2021 -0700
v4+ protocol did not clean up client warnings, which caused leaking the state
patch by David Capwell; reviewed by Caleb Rackliffe, Jon Meredith, Sam Tunnicliffe for CASSANDRA-17054
---
CHANGES.txt | 1 +
.../org/apache/cassandra/transport/Dispatcher.java | 38 ++++-----
.../transport/InitialConnectionHandler.java | 2 +-
.../org/apache/cassandra/transport/Message.java | 11 ++-
.../distributed/test/JavaDriverUtils.java | 9 +++
.../distributed/test/NativeMixedVersionTest.java | 89 ++++++++++++++++++++++
6 files changed, 130 insertions(+), 20 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 68aeb04..8ef8531 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
4.1
+ * v4+ protocol did not clean up client warnings, which caused leaking the state (CASSANDRA-17054)
* Remove duplicate toCQLString in ReadCommand (CASSANDRA-17023)
* Ensure hint window is persistent across restarts of a node (CASSANDRA-14309)
* Allow to GRANT or REVOKE multiple permissions in a single statement (CASSANDRA-17030)
diff --git a/src/java/org/apache/cassandra/transport/Dispatcher.java b/src/java/org/apache/cassandra/transport/Dispatcher.java
index 31b750e..3aff2d2 100644
--- a/src/java/org/apache/cassandra/transport/Dispatcher.java
+++ b/src/java/org/apache/cassandra/transport/Dispatcher.java
@@ -82,9 +82,10 @@ public class Dispatcher
}
/**
- * Note: this method may be executed on the netty event loop, during initial protocol negotiation
+ * Note: this method may be executed on the netty event loop, during initial protocol negotiation; the caller is
+ * responsible for cleaning up any global or thread-local state. (ex. tracing, client warnings, etc.).
*/
- static Message.Response processRequest(ServerConnection connection, Message.Request request, Overload backpressure)
+ private static Message.Response processRequest(ServerConnection connection, Message.Request request, Overload backpressure)
{
long queryStartNanoTime = nanoTime();
if (connection.getVersion().isGreaterOrEqualTo(ProtocolVersion.V4))
@@ -99,7 +100,7 @@ public class Dispatcher
{
String message = String.format("Request breached global limit of %d requests/second and triggered backpressure.",
ClientResourceLimits.getNativeTransportMaxRequestsPerSecond());
-
+
NoSpamLogger.log(logger, NoSpamLogger.Level.INFO, 1, TimeUnit.MINUTES, message);
ClientWarn.instance.warn(message);
}
@@ -107,7 +108,7 @@ public class Dispatcher
{
String message = String.format("Request breached limit(s) on bytes in flight (Endpoint: %d, Global: %d) and triggered backpressure.",
ClientResourceLimits.getEndpointLimit(), ClientResourceLimits.getGlobalLimit());
-
+
NoSpamLogger.log(logger, NoSpamLogger.Level.INFO, 1, TimeUnit.MINUTES, message);
ClientWarn.instance.warn(message);
}
@@ -129,39 +130,42 @@ public class Dispatcher
}
/**
- * Note: this method is not expected to execute on the netty event loop.
+ * Note: this method may be executed on the netty event loop.
*/
- void processRequest(Channel channel, Message.Request request, FlushItemConverter forFlusher, Overload backpressure)
+ static Message.Response processRequest(Channel channel, Message.Request request, Overload backpressure)
{
- final Message.Response response;
- final ServerConnection connection;
- FlushItem<?> toFlush;
try
{
- assert request.connection() instanceof ServerConnection;
- connection = (ServerConnection) request.connection();
- response = processRequest(connection, request, backpressure);
- toFlush = forFlusher.toFlushItem(channel, request, response);
- Message.logger.trace("Responding: {}, v={}", response, connection.getVersion());
+ return processRequest((ServerConnection) request.connection(), request, backpressure);
}
catch (Throwable t)
{
JVMStabilityInspector.inspectThrowable(t);
-
+
if (request.isTrackable())
CoordinatorWarnings.done();
-
+
Predicate<Throwable> handler = ExceptionHandlers.getUnexpectedExceptionHandler(channel, true);
ErrorMessage error = ErrorMessage.fromException(t, handler);
error.setStreamId(request.getStreamId());
error.setWarnings(ClientWarn.instance.getWarnings());
- toFlush = forFlusher.toFlushItem(channel, request, error);
+ return error;
}
finally
{
CoordinatorWarnings.reset();
ClientWarn.instance.resetWarnings();
}
+ }
+
+ /**
+ * Note: this method is not expected to execute on the netty event loop.
+ */
+ void processRequest(Channel channel, Message.Request request, FlushItemConverter forFlusher, Overload backpressure)
+ {
+ Message.Response response = processRequest(channel, request, backpressure);
+ FlushItem<?> toFlush = forFlusher.toFlushItem(channel, request, response);
+ Message.logger.trace("Responding: {}, v={}", response, request.connection().getVersion());
flush(toFlush);
}
diff --git a/src/java/org/apache/cassandra/transport/InitialConnectionHandler.java b/src/java/org/apache/cassandra/transport/InitialConnectionHandler.java
index e122b6e..75cb72e 100644
--- a/src/java/org/apache/cassandra/transport/InitialConnectionHandler.java
+++ b/src/java/org/apache/cassandra/transport/InitialConnectionHandler.java
@@ -148,7 +148,7 @@ public class InitialConnectionHandler extends ByteToMessageDecoder
promise = new VoidChannelPromise(ctx.channel(), false);
}
- final Message.Response response = Dispatcher.processRequest((ServerConnection) connection, startup, Overload.NONE);
+ final Message.Response response = Dispatcher.processRequest(ctx.channel(), startup, Overload.NONE);
outbound = response.encode(inbound.header.version);
ctx.writeAndFlush(outbound, promise);
logger.trace("Configured pipeline: {}", ctx.pipeline());
diff --git a/src/java/org/apache/cassandra/transport/Message.java b/src/java/org/apache/cassandra/transport/Message.java
index 0f8002f..c40aa7a 100644
--- a/src/java/org/apache/cassandra/transport/Message.java
+++ b/src/java/org/apache/cassandra/transport/Message.java
@@ -330,9 +330,16 @@ public abstract class Message
List<String> warnings = message.getWarnings();
if (warnings != null)
{
+ // if cassandra populates warnings for <= v3 protocol, this is a bug
if (version.isSmallerThan(ProtocolVersion.V4))
- throw new ProtocolException("Must not send frame with WARNING flag for native protocol version < 4");
- messageSize += CBUtil.sizeOfStringList(warnings);
+ {
+ logger.warn("Warnings present in message with version less than v4 (it is {}); warnings={}", version, warnings);
+ warnings = null;
+ }
+ else
+ {
+ messageSize += CBUtil.sizeOfStringList(warnings);
+ }
}
if (customPayload != null)
{
diff --git a/test/distributed/org/apache/cassandra/distributed/test/JavaDriverUtils.java b/test/distributed/org/apache/cassandra/distributed/test/JavaDriverUtils.java
index bc39ba1..c7c478b 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/JavaDriverUtils.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/JavaDriverUtils.java
@@ -18,6 +18,7 @@
package org.apache.cassandra.distributed.test;
+import com.datastax.driver.core.ProtocolVersion;
import org.apache.cassandra.distributed.api.Feature;
import org.apache.cassandra.distributed.api.ICluster;
import org.apache.cassandra.distributed.api.IInstance;
@@ -30,6 +31,11 @@ public final class JavaDriverUtils
public static com.datastax.driver.core.Cluster create(ICluster<? extends IInstance> dtest)
{
+ return create(dtest, null);
+ }
+
+ public static com.datastax.driver.core.Cluster create(ICluster<? extends IInstance> dtest, ProtocolVersion version)
+ {
if (dtest.size() == 0)
throw new IllegalArgumentException("Attempted to open java driver for empty cluster");
@@ -45,6 +51,9 @@ public final class JavaDriverUtils
//TODO support auth
dtest.stream().forEach(i -> builder.addContactPoint(i.broadcastAddress().getAddress().getHostAddress()));
+ if (version != null)
+ builder.withProtocolVersion(version);
+
return builder.build();
}
}
diff --git a/test/distributed/org/apache/cassandra/distributed/test/NativeMixedVersionTest.java b/test/distributed/org/apache/cassandra/distributed/test/NativeMixedVersionTest.java
new file mode 100644
index 0000000..b2391e3
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/NativeMixedVersionTest.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.distributed.test;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+
+import com.google.common.collect.ImmutableMap;
+import org.junit.Test;
+
+import com.datastax.driver.core.ProtocolVersion;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Session;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.assertj.core.api.Assertions;
+
+public class NativeMixedVersionTest extends TestBaseImpl
+{
+ @Test
+ public void v4ConnectionCleansUpThreadLocalState() throws IOException
+ {
+ // make sure to limit the netty thread pool to size 1, this will make the test determanistic as all work
+ // will happen on the single thread.
+ System.setProperty("io.netty.eventLoopThreads", "1");
+ try (Cluster cluster = Cluster.build(1)
+ .withConfig(c ->
+ c.with(Feature.values())
+ .set("track_warnings", ImmutableMap.of(
+ "enabled", true,
+ "local_read_size", ImmutableMap.of("warn_threshold_kb", 1)
+ ))
+ )
+ .start())
+ {
+ init(cluster);
+ cluster.schemaChange(withKeyspace("CREATE TABLE %s.tbl (pk int, ck1 int, value blob, PRIMARY KEY (pk, ck1))"));
+ IInvokableInstance node = cluster.get(1);
+
+ ByteBuffer blob = ByteBuffer.wrap("This is just some large string to get some number of bytes".getBytes(StandardCharsets.UTF_8));
+
+ for (int i = 0; i < 100; i++)
+ node.executeInternal(withKeyspace("INSERT INTO %s.tbl (pk, ck1, value) VALUES (?, ?, ?)"), 0, i, blob);
+
+ // v4+ process STARTUP message on the netty thread. To make sure we do not leak the ClientWarn state,
+ // make sure a warning will be generated by a query then run on the same threads on the v3 protocol (which
+ // does not support warnings)
+ try (com.datastax.driver.core.Cluster driver = JavaDriverUtils.create(cluster, ProtocolVersion.V5);
+ Session session = driver.connect())
+ {
+ ResultSet rs = session.execute(withKeyspace("SELECT * FROM %s.tbl"));
+ Assertions.assertThat(rs.getExecutionInfo().getWarnings()).isNotEmpty();
+ }
+
+ try (com.datastax.driver.core.Cluster driver = JavaDriverUtils.create(cluster, ProtocolVersion.V3);
+ Session session = driver.connect())
+ {
+ ResultSet rs = session.execute(withKeyspace("SELECT * FROM %s.tbl"));
+ Assertions.assertThat(rs.getExecutionInfo().getWarnings()).isEmpty();
+ }
+
+ // this should not happen; so make sure no logs are found
+ List<String> result = node.logs().grep("Warnings present in message with version less than").getResult();
+ Assertions.assertThat(result).isEmpty();
+ }
+ finally
+ {
+ System.clearProperty("io.netty.eventLoopThreads");
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org