You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sa...@apache.org on 2021/01/19 14:35:59 UTC
[cassandra] branch trunk updated: Fix client notifications in v5
This is an automated email from the ASF dual-hosted git repository.
samt pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new c9d2258 Fix client notifications in v5
c9d2258 is described below
commit c9d22583d22d566807e76fa10c65af29104ae16c
Author: Sam Tunnicliffe <sa...@beobal.com>
AuthorDate: Tue Dec 15 17:37:18 2020 +0000
Fix client notifications in v5
Patch by Sam Tunnicliffe; reviewed by Benjamin Lerer for CASSANDRA-16353
---
CHANGES.txt | 1 +
.../org/apache/cassandra/transport/Dispatcher.java | 33 +++++-
.../cassandra/transport/PipelineConfigurator.java | 5 +
.../org/apache/cassandra/transport/Server.java | 37 +++++--
.../transport/ClientNotificiationsTest.java | 117 +++++++++++++++++++++
5 files changed, 185 insertions(+), 8 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 426de4c..09b02d5 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
4.0-beta5
+ * Fix client notifications in CQL protocol v5 (CASSANDRA-16353)
* Too defensive check when picking sstables for preview repair (CASSANDRA-16284)
* Ensure pre-negotiation native protocol responses have correct stream id (CASSANDRA-16376)
* Fix check for -Xlog in cassandra-env.sh (CASSANDRA-16279)
diff --git a/src/java/org/apache/cassandra/transport/Dispatcher.java b/src/java/org/apache/cassandra/transport/Dispatcher.java
index 65093f3..05b55e8 100644
--- a/src/java/org/apache/cassandra/transport/Dispatcher.java
+++ b/src/java/org/apache/cassandra/transport/Dispatcher.java
@@ -20,15 +20,19 @@ package org.apache.cassandra.transport;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.function.Consumer;
import io.netty.channel.Channel;
import io.netty.channel.EventLoop;
+import io.netty.util.AttributeKey;
import org.apache.cassandra.concurrent.LocalAwareExecutorService;
import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.net.FrameEncoder;
import org.apache.cassandra.service.ClientWarn;
import org.apache.cassandra.service.QueryState;
import org.apache.cassandra.transport.Flusher.FlushItem;
import org.apache.cassandra.transport.messages.ErrorMessage;
+import org.apache.cassandra.transport.messages.EventMessage;
import org.apache.cassandra.utils.JVMStabilityInspector;
import static org.apache.cassandra.concurrent.SharedExecutorPool.SHARED;
@@ -61,7 +65,8 @@ public class Dispatcher
this.useLegacyFlusher = useLegacyFlusher;
}
- public void dispatch(Channel channel, Message.Request request, FlushItemConverter forFlusher) {
+ public void dispatch(Channel channel, Message.Request request, FlushItemConverter forFlusher)
+ {
requestExecutor.submit(() -> processRequest(channel, request, forFlusher)); // the request is handed to requestExecutor; processRequest runs as the submitted task
}
@@ -140,4 +145,30 @@ public class Dispatcher
requestExecutor.shutdown();
}
}
+
+
+ /**
+ * Dispatcher for EventMessages. In {@link Server.ConnectionTracker#send(Event)}, the strategy
+ * for delivering events to registered clients is dependent on protocol version and the configuration
+ * of the pipeline. For v5 and newer connections, the event message is encoded into an Envelope,
+ * wrapped in a FlushItem and then delivered via the pipeline's flusher, in a similar way to
+ * a Response returned from {@link #processRequest(Channel, Message.Request, FlushItemConverter)}.
+ * It's worth noting that events are not generally fired as a direct response to a client request,
+ * so this flush item has a null request attribute. The dispatcher itself is created when the
+ * pipeline is first configured during protocol negotiation and is attached to the channel for
+ * later retrieval.
+ *
+ * Pre-v5 connections simply write the EventMessage directly to the pipeline.
+ */
+ static final AttributeKey<Consumer<EventMessage>> EVENT_DISPATCHER = AttributeKey.valueOf("EVTDISP");
+ Consumer<EventMessage> eventDispatcher(final Channel channel, // channel that accepted events are flushed to
+ final ProtocolVersion version, // version used to encode each EventMessage
+ final FrameEncoder.PayloadAllocator allocator) // passed through unchanged to FlushItem.Framed
+ {
+ return eventMessage -> flush(new FlushItem.Framed(channel, // one flush item is built per accepted event
+ eventMessage.encode(version),
+ null, // no originating request: events are not replies to a client request
+ allocator,
+ f -> f.response.release())); // releases the item's response; presumably invoked by the flusher after the write - confirm
+ }
}
diff --git a/src/java/org/apache/cassandra/transport/PipelineConfigurator.java b/src/java/org/apache/cassandra/transport/PipelineConfigurator.java
index d2a5cad..82865f2 100644
--- a/src/java/org/apache/cassandra/transport/PipelineConfigurator.java
+++ b/src/java/org/apache/cassandra/transport/PipelineConfigurator.java
@@ -300,6 +300,11 @@ public class PipelineConfigurator
pipeline.addBefore(INITIAL_HANDLER, MESSAGE_PROCESSOR, processor);
pipeline.replace(EXCEPTION_HANDLER, EXCEPTION_HANDLER, exceptionHandler);
pipeline.remove(INITIAL_HANDLER);
+
+ // Handles delivering event messages to registered clients
+ ctx.channel()
+ .attr(Dispatcher.EVENT_DISPATCHER)
+ .set(dispatcher.eventDispatcher(ctx.channel(), version, payloadAllocator));
onNegotiationComplete(pipeline);
}
diff --git a/src/java/org/apache/cassandra/transport/Server.java b/src/java/org/apache/cassandra/transport/Server.java
index 2e317e5..5c9e575 100644
--- a/src/java/org/apache/cassandra/transport/Server.java
+++ b/src/java/org/apache/cassandra/transport/Server.java
@@ -32,6 +32,7 @@ import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.group.ChannelGroup;
+import io.netty.channel.group.ChannelMatcher;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.concurrent.GlobalEventExecutor;
@@ -97,7 +98,8 @@ public class Server implements CassandraDaemon.Server
DatabaseDescriptor.useNativeTransportLegacyFlusher(),
builder.tlsEncryptionPolicy);
- EventNotifier notifier = new EventNotifier(this);
+ EventNotifier notifier = builder.eventNotifier != null ? builder.eventNotifier : new EventNotifier();
+ notifier.registerConnectionTracker(connectionTracker);
StorageService.instance.register(notifier);
Schema.instance.registerListener(notifier);
}
@@ -177,6 +179,7 @@ public class Server implements CassandraDaemon.Server
private int port = -1;
private InetSocketAddress socket;
private PipelineConfigurator pipelineConfigurator;
+ private EventNotifier eventNotifier;
public Builder withTlsEncryptionPolicy(EncryptionOptions.TlsEncryptionPolicy tlsEncryptionPolicy)
{
@@ -210,6 +213,12 @@ public class Server implements CassandraDaemon.Server
return this;
}
+ public Builder withEventNotifier(EventNotifier eventNotifier) // optional: when this stays null the Server constructor creates its own EventNotifier
+ {
+ this.eventNotifier = eventNotifier;
+ return this; // same builder instance, so calls can be chained
+ }
+
public Server build()
{
return new Server(this);
@@ -234,6 +243,11 @@ public class Server implements CassandraDaemon.Server
public static class ConnectionTracker implements Connection.Tracker
{
+ private static final ChannelMatcher PRE_V5_CHANNEL = channel -> channel.attr(Connection.attributeKey) // the Connection stored on the channel
+ .get() // NOTE(review): would throw NPE if the attribute is unset - confirm it is always set before a channel joins an event group
+ .getVersion()
+ .isSmallerThan(ProtocolVersion.V5); // matching channels get events written directly by send(); all others go through the event dispatcher
+
// TODO: should we be using the GlobalEventExecutor or defining our own?
public final ChannelGroup allChannels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
private final EnumMap<Event.Type, ChannelGroup> groups = new EnumMap<>(Event.Type.class);
@@ -260,7 +274,16 @@ public class Server implements CassandraDaemon.Server
public void send(Event event)
{
- groups.get(event.type).writeAndFlush(new EventMessage(event));
+ ChannelGroup registered = groups.get(event.type);
+ EventMessage message = new EventMessage(event);
+
+ // Deliver event to pre-v5 channels by writing the message directly to the channel group
+ registered.writeAndFlush(message, PRE_V5_CHANNEL);
+
+ // Deliver event to v5 and later channels (every channel PRE_V5_CHANNEL rejects) via the dispatcher stored on the channel
+ for (Channel c : registered)
+ if (!PRE_V5_CHANNEL.matches(c))
+ c.attr(Dispatcher.EVENT_DISPATCHER).get().accept(message); // NOTE(review): assumes the attribute was set when the pipeline was configured - confirm it cannot be null here
}
void closeAll()
@@ -331,9 +354,9 @@ public class Server implements CassandraDaemon.Server
}
}
- private static class EventNotifier extends SchemaChangeListener implements IEndpointLifecycleSubscriber
+ public static class EventNotifier extends SchemaChangeListener implements IEndpointLifecycleSubscriber
{
- private final Server server;
+ private ConnectionTracker connectionTracker;
// We keep track of the latest status change events we have sent to avoid sending duplicates
// since StorageService may send duplicate notifications (CASSANDRA-7816, CASSANDRA-8236, CASSANDRA-9156)
@@ -342,9 +365,9 @@ public class Server implements CassandraDaemon.Server
// state. This tracks the endpoints which have joined, but not yet signalled they're ready for clients
private final Set<InetAddressAndPort> endpointsPendingJoinedNotification = ConcurrentHashMap.newKeySet();
- private EventNotifier(Server server)
+ private void registerConnectionTracker(ConnectionTracker connectionTracker) // the Server constructor calls this before registering the notifier as a listener
{
- this.server = server;
+ this.connectionTracker = connectionTracker; // plain non-final field: a later registration replaces the earlier tracker
}
private InetAddressAndPort getNativeAddress(InetAddressAndPort endpoint)
@@ -381,7 +404,7 @@ public class Server implements CassandraDaemon.Server
private void send(Event event)
{
- server.connectionTracker.send(event);
+ connectionTracker.send(event); // delegates to the tracker set by registerConnectionTracker; the field has no other initializer
}
public void onJoinCluster(InetAddressAndPort endpoint)
diff --git a/test/unit/org/apache/cassandra/transport/ClientNotificiationsTest.java b/test/unit/org/apache/cassandra/transport/ClientNotificiationsTest.java
new file mode 100644
index 0000000..bd1ec63
--- /dev/null
+++ b/test/unit/org/apache/cassandra/transport/ClientNotificiationsTest.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.transport;
+
+import java.util.Collections;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import org.apache.cassandra.cql3.CQLTester;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.transport.messages.RegisterMessage;
+import org.apache.cassandra.utils.FBUtilities;
+
+import static org.junit.Assert.assertEquals;
+
+@RunWith(Parameterized.class)
+public class ClientNotificiationsTest extends CQLTester
+{
+ private static Server.EventNotifier notifier = new Server.EventNotifier();
+
+ @Before
+ public void setup()
+ {
+ requireNetwork(builder -> builder.withEventNotifier(notifier)); // inject the shared static notifier so the test can fire events on it directly
+ }
+
+ @Parameterized.Parameter(0)
+ public ProtocolVersion version;
+
+ @Parameterized.Parameters(name = "{index}: protocol version={0}")
+ public static Iterable<ProtocolVersion> params()
+ {
+ return ProtocolVersion.SUPPORTED; // each entry becomes the 'version' parameter of one test run
+ }
+
+ @Test
+ public void testNotifications() throws Exception
+ {
+ SimpleClient.Builder builder = SimpleClient.builder(nativeAddr.getHostAddress(), nativePort)
+ .protocolVersion(version);
+ if (version.isBeta())
+ builder.useBeta(); // opt in on the client builder when the version under test is flagged beta
+
+ try (SimpleClient client = builder.build())
+ {
+ EventHandler handler = new EventHandler();
+ client.setEventHandler(handler); // handler is installed before connecting
+ client.connect(false);
+ client.execute(new RegisterMessage(Collections.singletonList(Event.Type.STATUS_CHANGE))); // one RegisterMessage per event type
+ client.execute(new RegisterMessage(Collections.singletonList(Event.Type.TOPOLOGY_CHANGE)));
+ client.execute(new RegisterMessage(Collections.singletonList(Event.Type.SCHEMA_CHANGE)));
+
+ InetAddressAndPort broadcastAddress = FBUtilities.getBroadcastAddressAndPort();
+ InetAddressAndPort nativeAddress = FBUtilities.getBroadcastNativeAddressAndPort();
+
+ // Necessary or else the NEW_NODE notification is deferred (CASSANDRA-11038)
+ // (note: this works because the notifications are for the local address)
+ StorageService.instance.setRpcReady(true);
+
+ notifier.onUp(broadcastAddress); // events are fired directly on the injected notifier, all for the local broadcast address
+ notifier.onDown(broadcastAddress);
+ notifier.onJoinCluster(broadcastAddress);
+ notifier.onMove(broadcastAddress);
+ notifier.onLeaveCluster(broadcastAddress);
+ notifier.onCreateKeyspace("ks");
+ notifier.onAlterKeyspace("ks");
+ notifier.onDropKeyspace("ks");
+
+ handler.assertNextEvent(Event.StatusChange.nodeUp(nativeAddress)); // expected in the order fired above, carrying the native address rather than the broadcast address
+ handler.assertNextEvent(Event.StatusChange.nodeDown(nativeAddress));
+ handler.assertNextEvent(Event.TopologyChange.newNode(nativeAddress));
+ handler.assertNextEvent(Event.TopologyChange.movedNode(nativeAddress));
+ handler.assertNextEvent(Event.TopologyChange.removedNode(nativeAddress));
+ handler.assertNextEvent(new Event.SchemaChange(Event.SchemaChange.Change.CREATED, "ks"));
+ handler.assertNextEvent(new Event.SchemaChange(Event.SchemaChange.Change.UPDATED, "ks"));
+ handler.assertNextEvent(new Event.SchemaChange(Event.SchemaChange.Change.DROPPED, "ks"));
+ }
+ }
+
+ /**
+  * Event handler used by the test to check, one event at a time, what the client received.
+  */
+ static class EventHandler extends SimpleClient.SimpleEventHandler
+ {
+     /**
+      * Waits up to 100ms for the next queued event and asserts that it equals {@code expected}.
+      *
+      * @param expected the event that should be next in the queue
+      * @throws AssertionError if no event arrives in time, if the waiting thread is interrupted,
+      *                        or if a different event arrives
+      */
+     public void assertNextEvent(Event expected)
+     {
+         Event actual;
+         try
+         {
+             actual = queue.poll(100, TimeUnit.MILLISECONDS);
+         }
+         catch (InterruptedException e)
+         {
+             // An interrupt is not a timeout: restore the flag and keep the cause
+             Thread.currentThread().interrupt();
+             throw new AssertionError(String.format("Interrupted while waiting for event %s", expected), e);
+         }
+
+         // poll signals a timeout by returning null, not by throwing
+         if (actual == null)
+             throw new AssertionError(String.format("Expected event %s, but not received within timeout", expected));
+
+         assertEquals(expected, actual);
+     }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org