You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sa...@apache.org on 2021/01/19 14:35:59 UTC

[cassandra] branch trunk updated: Fix client notifications in v5

This is an automated email from the ASF dual-hosted git repository.

samt pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new c9d2258  Fix client notifications in v5
c9d2258 is described below

commit c9d22583d22d566807e76fa10c65af29104ae16c
Author: Sam Tunnicliffe <sa...@beobal.com>
AuthorDate: Tue Dec 15 17:37:18 2020 +0000

    Fix client notifications in v5
    
    Patch by Sam Tunnicliffe; reviewed by Benjamin Lerer for CASSANDRA-16353
---
 CHANGES.txt                                        |   1 +
 .../org/apache/cassandra/transport/Dispatcher.java |  33 +++++-
 .../cassandra/transport/PipelineConfigurator.java  |   5 +
 .../org/apache/cassandra/transport/Server.java     |  37 +++++--
 .../transport/ClientNotificiationsTest.java        | 117 +++++++++++++++++++++
 5 files changed, 185 insertions(+), 8 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 426de4c..09b02d5 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0-beta5
+ * Fix client notifications in CQL protocol v5 (CASSANDRA-16353)
  * Too defensive check when picking sstables for preview repair (CASSANDRA-16284)
  * Ensure pre-negotiation native protocol responses have correct stream id (CASSANDRA-16376)
  * Fix check for -Xlog in cassandra-env.sh (CASSANDRA-16279)
diff --git a/src/java/org/apache/cassandra/transport/Dispatcher.java b/src/java/org/apache/cassandra/transport/Dispatcher.java
index 65093f3..05b55e8 100644
--- a/src/java/org/apache/cassandra/transport/Dispatcher.java
+++ b/src/java/org/apache/cassandra/transport/Dispatcher.java
@@ -20,15 +20,19 @@ package org.apache.cassandra.transport;
 
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.function.Consumer;
 
 import io.netty.channel.Channel;
 import io.netty.channel.EventLoop;
+import io.netty.util.AttributeKey;
 import org.apache.cassandra.concurrent.LocalAwareExecutorService;
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.net.FrameEncoder;
 import org.apache.cassandra.service.ClientWarn;
 import org.apache.cassandra.service.QueryState;
 import org.apache.cassandra.transport.Flusher.FlushItem;
 import org.apache.cassandra.transport.messages.ErrorMessage;
+import org.apache.cassandra.transport.messages.EventMessage;
 import org.apache.cassandra.utils.JVMStabilityInspector;
 
 import static org.apache.cassandra.concurrent.SharedExecutorPool.SHARED;
@@ -61,7 +65,8 @@ public class Dispatcher
         this.useLegacyFlusher = useLegacyFlusher;
     }
 
-    public void dispatch(Channel channel, Message.Request request, FlushItemConverter forFlusher) {
+    public void dispatch(Channel channel, Message.Request request, FlushItemConverter forFlusher)
+    {
         requestExecutor.submit(() -> processRequest(channel, request, forFlusher));
     }
 
@@ -140,4 +145,30 @@ public class Dispatcher
             requestExecutor.shutdown();
         }
     }
+
+
+    /**
+     * Dispatcher for EventMessages. In {@link Server.ConnectionTracker#send(Event)}, the strategy
+     * for delivering events to registered clients is dependent on protocol version and the configuration
+     * of the pipeline. For v5 and newer connections, the event message is encoded into an Envelope,
+     * wrapped in a FlushItem and then delivered via the pipeline's flusher, in a similar way to
+     * a Response returned from {@link #processRequest(Channel, Message.Request, FlushItemConverter)}.
+     * It's worth noting that events are not generally fired as a direct response to a client request,
+     * so this flush item has a null request attribute. The dispatcher itself is created when the
+     * pipeline is first configured during protocol negotiation and is attached to the channel for
+     * later retrieval.
+     *
+     * Pre-v5 connections simply write the EventMessage directly to the pipeline.
+     */
+    static final AttributeKey<Consumer<EventMessage>> EVENT_DISPATCHER = AttributeKey.valueOf("EVTDISP");
+    Consumer<EventMessage> eventDispatcher(final Channel channel,
+                                           final ProtocolVersion version,
+                                           final FrameEncoder.PayloadAllocator allocator)
+    {
+        return eventMessage -> flush(new FlushItem.Framed(channel,
+                                                          eventMessage.encode(version),
+                                                          null,
+                                                          allocator,
+                                                          f -> f.response.release()));
+    }
 }
diff --git a/src/java/org/apache/cassandra/transport/PipelineConfigurator.java b/src/java/org/apache/cassandra/transport/PipelineConfigurator.java
index d2a5cad..82865f2 100644
--- a/src/java/org/apache/cassandra/transport/PipelineConfigurator.java
+++ b/src/java/org/apache/cassandra/transport/PipelineConfigurator.java
@@ -300,6 +300,11 @@ public class PipelineConfigurator
         pipeline.addBefore(INITIAL_HANDLER, MESSAGE_PROCESSOR, processor);
         pipeline.replace(EXCEPTION_HANDLER, EXCEPTION_HANDLER, exceptionHandler);
         pipeline.remove(INITIAL_HANDLER);
+
+        // Handles delivering event messages to registered clients
+        ctx.channel()
+           .attr(Dispatcher.EVENT_DISPATCHER)
+           .set(dispatcher.eventDispatcher(ctx.channel(), version, payloadAllocator));
         onNegotiationComplete(pipeline);
     }
 
diff --git a/src/java/org/apache/cassandra/transport/Server.java b/src/java/org/apache/cassandra/transport/Server.java
index 2e317e5..5c9e575 100644
--- a/src/java/org/apache/cassandra/transport/Server.java
+++ b/src/java/org/apache/cassandra/transport/Server.java
@@ -32,6 +32,7 @@ import io.netty.channel.ChannelFuture;
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.epoll.EpollEventLoopGroup;
 import io.netty.channel.group.ChannelGroup;
+import io.netty.channel.group.ChannelMatcher;
 import io.netty.channel.group.DefaultChannelGroup;
 import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.util.concurrent.GlobalEventExecutor;
@@ -97,7 +98,8 @@ public class Server implements CassandraDaemon.Server
                                                           DatabaseDescriptor.useNativeTransportLegacyFlusher(),
                                                           builder.tlsEncryptionPolicy);
 
-        EventNotifier notifier = new EventNotifier(this);
+        EventNotifier notifier = builder.eventNotifier != null ? builder.eventNotifier : new EventNotifier();
+        notifier.registerConnectionTracker(connectionTracker);
         StorageService.instance.register(notifier);
         Schema.instance.registerListener(notifier);
     }
@@ -177,6 +179,7 @@ public class Server implements CassandraDaemon.Server
         private int port = -1;
         private InetSocketAddress socket;
         private PipelineConfigurator pipelineConfigurator;
+        private EventNotifier eventNotifier;
 
         public Builder withTlsEncryptionPolicy(EncryptionOptions.TlsEncryptionPolicy tlsEncryptionPolicy)
         {
@@ -210,6 +213,12 @@ public class Server implements CassandraDaemon.Server
             return this;
         }
 
+        public Builder withEventNotifier(EventNotifier eventNotifier)
+        {
+            this.eventNotifier = eventNotifier;
+            return this;
+        }
+
         public Server build()
         {
             return new Server(this);
@@ -234,6 +243,11 @@ public class Server implements CassandraDaemon.Server
 
     public static class ConnectionTracker implements Connection.Tracker
     {
+        private static final ChannelMatcher PRE_V5_CHANNEL = channel -> channel.attr(Connection.attributeKey)
+                                                                               .get()
+                                                                               .getVersion()
+                                                                               .isSmallerThan(ProtocolVersion.V5);
+
         // TODO: should we be using the GlobalEventExecutor or defining our own?
         public final ChannelGroup allChannels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
         private final EnumMap<Event.Type, ChannelGroup> groups = new EnumMap<>(Event.Type.class);
@@ -260,7 +274,16 @@ public class Server implements CassandraDaemon.Server
 
         public void send(Event event)
         {
-            groups.get(event.type).writeAndFlush(new EventMessage(event));
+            ChannelGroup registered = groups.get(event.type);
+            EventMessage message = new EventMessage(event);
+
+            // Deliver event to pre-v5 channels
+            registered.writeAndFlush(message, PRE_V5_CHANNEL);
+
+            // Deliver event to post-v5 channels
+            for (Channel c : registered)
+                if (!PRE_V5_CHANNEL.matches(c))
+                    c.attr(Dispatcher.EVENT_DISPATCHER).get().accept(message);
         }
 
         void closeAll()
@@ -331,9 +354,9 @@ public class Server implements CassandraDaemon.Server
         }
     }
 
-    private static class EventNotifier extends SchemaChangeListener implements IEndpointLifecycleSubscriber
+    public static class EventNotifier extends SchemaChangeListener implements IEndpointLifecycleSubscriber
     {
-        private final Server server;
+        private ConnectionTracker connectionTracker;
 
         // We keep track of the latest status change events we have sent to avoid sending duplicates
         // since StorageService may send duplicate notifications (CASSANDRA-7816, CASSANDRA-8236, CASSANDRA-9156)
@@ -342,9 +365,9 @@ public class Server implements CassandraDaemon.Server
         // state. This tracks the endpoints which have joined, but not yet signalled they're ready for clients
         private final Set<InetAddressAndPort> endpointsPendingJoinedNotification = ConcurrentHashMap.newKeySet();
 
-        private EventNotifier(Server server)
+        private void registerConnectionTracker(ConnectionTracker connectionTracker)
         {
-            this.server = server;
+            this.connectionTracker = connectionTracker;
         }
 
         private InetAddressAndPort getNativeAddress(InetAddressAndPort endpoint)
@@ -381,7 +404,7 @@ public class Server implements CassandraDaemon.Server
 
         private void send(Event event)
         {
-            server.connectionTracker.send(event);
+            connectionTracker.send(event);
         }
 
         public void onJoinCluster(InetAddressAndPort endpoint)
diff --git a/test/unit/org/apache/cassandra/transport/ClientNotificiationsTest.java b/test/unit/org/apache/cassandra/transport/ClientNotificiationsTest.java
new file mode 100644
index 0000000..bd1ec63
--- /dev/null
+++ b/test/unit/org/apache/cassandra/transport/ClientNotificiationsTest.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.transport;
+
+import java.util.Collections;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import org.apache.cassandra.cql3.CQLTester;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.transport.messages.RegisterMessage;
+import org.apache.cassandra.utils.FBUtilities;
+
+import static org.junit.Assert.assertEquals;
+
+@RunWith(Parameterized.class)
+public class ClientNotificiationsTest extends CQLTester
+{
+    private static Server.EventNotifier notifier = new Server.EventNotifier();
+
+    @Before
+    public void setup()
+    {
+        requireNetwork(builder -> builder.withEventNotifier(notifier));
+    }
+
+    @Parameterized.Parameter(0)
+    public ProtocolVersion version;
+
+    @Parameterized.Parameters(name = "{index}: protocol version={0}")
+    public static Iterable<ProtocolVersion> params()
+    {
+        return ProtocolVersion.SUPPORTED;
+    }
+
+    @Test
+    public void testNotifications() throws Exception
+    {
+        SimpleClient.Builder builder = SimpleClient.builder(nativeAddr.getHostAddress(), nativePort)
+                                                   .protocolVersion(version);
+        if (version.isBeta())
+            builder.useBeta();
+
+        try (SimpleClient client = builder.build())
+        {
+            EventHandler handler = new EventHandler();
+            client.setEventHandler(handler);
+            client.connect(false);
+            client.execute(new RegisterMessage(Collections.singletonList(Event.Type.STATUS_CHANGE)));
+            client.execute(new RegisterMessage(Collections.singletonList(Event.Type.TOPOLOGY_CHANGE)));
+            client.execute(new RegisterMessage(Collections.singletonList(Event.Type.SCHEMA_CHANGE)));
+
+            InetAddressAndPort broadcastAddress = FBUtilities.getBroadcastAddressAndPort();
+            InetAddressAndPort nativeAddress = FBUtilities.getBroadcastNativeAddressAndPort();
+
+            // Necessary or else the NEW_NODE notification is deferred (CASSANDRA-11038)
+            // (note: this works because the notifications are for the local address)
+            StorageService.instance.setRpcReady(true);
+
+            notifier.onUp(broadcastAddress);
+            notifier.onDown(broadcastAddress);
+            notifier.onJoinCluster(broadcastAddress);
+            notifier.onMove(broadcastAddress);
+            notifier.onLeaveCluster(broadcastAddress);
+            notifier.onCreateKeyspace("ks");
+            notifier.onAlterKeyspace("ks");
+            notifier.onDropKeyspace("ks");
+
+            handler.assertNextEvent(Event.StatusChange.nodeUp(nativeAddress));
+            handler.assertNextEvent(Event.StatusChange.nodeDown(nativeAddress));
+            handler.assertNextEvent(Event.TopologyChange.newNode(nativeAddress));
+            handler.assertNextEvent(Event.TopologyChange.movedNode(nativeAddress));
+            handler.assertNextEvent(Event.TopologyChange.removedNode(nativeAddress));
+            handler.assertNextEvent(new Event.SchemaChange(Event.SchemaChange.Change.CREATED, "ks"));
+            handler.assertNextEvent(new Event.SchemaChange(Event.SchemaChange.Change.UPDATED, "ks"));
+            handler.assertNextEvent(new Event.SchemaChange(Event.SchemaChange.Change.DROPPED, "ks"));
+        }
+    }
+
+    static class EventHandler extends SimpleClient.SimpleEventHandler
+    {
+        public void assertNextEvent(Event expected)
+        {
+            try
+            {
+                Event actual = queue.poll(100, TimeUnit.MILLISECONDS);
+                assertEquals(expected, actual);
+            }
+            catch (InterruptedException e)
+            {
+                throw new AssertionError(String.format("Expected event %s, but not received withing timeout", expected));
+            }
+        }
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org