You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by mc...@apache.org on 2022/11/09 11:04:26 UTC

[cassandra] branch cassandra-4.1 updated: Restore streaming_keep_alive_period functionality on the netty control streaming channel

This is an automated email from the ASF dual-hosted git repository.

mck pushed a commit to branch cassandra-4.1
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cassandra-4.1 by this push:
     new c100d34a1a Restore streaming_keep_alive_period functionality on the netty control streaming channel
c100d34a1a is described below

commit c100d34a1a1d497eee85ca10c5cc1e43eb4871c4
Author: Aleksey Yeschenko <al...@apache.org>
AuthorDate: Wed Nov 2 17:30:57 2022 +0000

    Restore streaming_keep_alive_period functionality on the netty control streaming channel
    
     patch by Aleksey Yeschenko, Mick Semb Wever; reviewed by Mick Semb Wever, Berenguer Blasi for CASSANDRA-17768
    
    Co-authored-by: Aleksey Yeschenko <aleksey@apache.org>
    Co-authored-by: Mick Semb Wever <mc...@apache.org>
---
 CHANGES.txt                                        |  1 +
 conf/cassandra.yaml                                | 14 ++--
 .../cassandra/net/InboundConnectionInitiator.java  |  7 +-
 .../apache/cassandra/streaming/StreamSession.java  |  6 +-
 .../async/StreamingMultiplexedChannel.java         | 74 +++++++++++++++++++++-
 .../streaming/messages/KeepAliveMessage.java       | 30 ++++-----
 .../streaming/messages/KeepAliveMessageTest.java   | 33 +++-------
 7 files changed, 114 insertions(+), 51 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 10b4ac7016..ffa03b7322 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.1-beta2
+ * Restore streaming_keep_alive_period on the netty control streaming channel (CASSANDRA-17768)
  * Move Schema.FORCE_LOAD_KEYSPACES and Schema.FORCE_LOAD_KEYSPACES_PROP to CassandraRelevantProps (CASSANDRA-17783)
  * Add --resolve-ip option to nodetool gossipinfo (CASSANDRA-17934)
  * Allow pre-V5 global limit on bytes in flight to revert to zero asynchronously in RateLimitingTest (CASSANDRA-17927)
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index 92addc5ae3..ef7cc72605 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -1145,12 +1145,14 @@ slow_query_log_timeout: 500ms
 # since this is a requirement for general correctness of last write wins.
 # internode_timeout: true
 
-# Set keep-alive period for streaming
-# This node will send a keep-alive message periodically with this period.
-# If the node does not receive a keep-alive message from the peer for
-# 2 keep-alive cycles the stream session times out and fail
-# Default value is 300s (5 minutes), which means stalled stream
-# times out in 10 minutes by default
+# Set the period for idle-state control messages, for earlier detection of failed streams.
+# This node will periodically send a keep-alive message on the stream's control channel.
+# This ensures that any eventual SocketTimeoutException will occur within 2 keep-alive cycles.
+# If the node cannot send, or times out sending, the keep-alive message on the netty control channel,
+# the stream session is closed.
+# Default value is 300s (5 minutes), which means stalled streams
+# are detected within 10 minutes.
+# Specify 0 to disable.
 # Min unit: s
 # streaming_keep_alive_period: 300s
 
diff --git a/src/java/org/apache/cassandra/net/InboundConnectionInitiator.java b/src/java/org/apache/cassandra/net/InboundConnectionInitiator.java
index 7ac479fd9f..807d0262d8 100644
--- a/src/java/org/apache/cassandra/net/InboundConnectionInitiator.java
+++ b/src/java/org/apache/cassandra/net/InboundConnectionInitiator.java
@@ -452,7 +452,12 @@ public class InboundConnectionInitiator
             }
 
             BufferPools.forNetworking().setRecycleWhenFreeForCurrentThread(false);
-            NettyStreamingChannel streamingChannel = new NettyStreamingChannel(current_version, channel, StreamingChannel.Kind.CONTROL);
+
+            // we can't infer the type of streaming connection at this point,
+            // so we use CONTROL unconditionally; it's ugly but does what we want
+            // (establishes an AsyncStreamingInputPlus)
+            NettyStreamingChannel streamingChannel =
+                new NettyStreamingChannel(current_version, channel, StreamingChannel.Kind.CONTROL);
             pipeline.replace(this, "streamInbound", streamingChannel);
             executorFactory().startThread(String.format("Stream-Deserializer-%s-%s", from, channel.id()),
                                           new StreamDeserializingTask(null, streamingChannel, current_version));
diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java
index 2da7021968..a540a1b6d0 100644
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@ -682,11 +682,11 @@ public class StreamSession implements IEndpointStateChangeSubscriber
     {
         if (e instanceof SocketTimeoutException)
         {
-            logger.error("[Stream #{}] Did not receive response from peer {}{} for {} secs. Is peer down? " +
-                         "If not, maybe try increasing streaming_keep_alive_period.", planId(),
+            logger.error("[Stream #{}] Timeout from peer {}{}. Is peer down? " +
+                         "If not, and earlier failure detection is required enable (or lower) streaming_keep_alive_period.",
+                         planId(),
                          hostAddressAndPort(channel.peer()),
                          channel.peer().equals(channel.connectedTo()) ? "" : " through " + hostAddressAndPort(channel.connectedTo()),
-                         2 * DatabaseDescriptor.getStreamingKeepAlivePeriod(),
                          e);
         }
         else
diff --git a/src/java/org/apache/cassandra/streaming/async/StreamingMultiplexedChannel.java b/src/java/org/apache/cassandra/streaming/async/StreamingMultiplexedChannel.java
index 38277e6741..560fee9ad2 100644
--- a/src/java/org/apache/cassandra/streaming/async/StreamingMultiplexedChannel.java
+++ b/src/java/org/apache/cassandra/streaming/async/StreamingMultiplexedChannel.java
@@ -26,6 +26,7 @@ import java.util.Collection;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
 
 import javax.annotation.Nullable;
 
@@ -36,6 +37,7 @@ import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
 import io.netty.util.concurrent.Future; // checkstyle: permit this import
 import org.apache.cassandra.concurrent.ExecutorPlus;
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.SystemKeyspace;
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.streaming.StreamDeserializingTask;
@@ -43,6 +45,7 @@ import org.apache.cassandra.streaming.StreamingChannel;
 import org.apache.cassandra.streaming.StreamingDataOutputPlus;
 import org.apache.cassandra.streaming.StreamSession;
 import org.apache.cassandra.streaming.messages.IncomingStreamMessage;
+import org.apache.cassandra.streaming.messages.KeepAliveMessage;
 import org.apache.cassandra.streaming.messages.OutgoingStreamMessage;
 import org.apache.cassandra.streaming.messages.StreamMessage;
 import org.apache.cassandra.utils.concurrent.ImmediateFuture;
@@ -166,9 +169,12 @@ public class StreamingMultiplexedChannel
         StreamingChannel channel = factory.create(to, messagingVersion, StreamingChannel.Kind.CONTROL);
         executorFactory().startThread(String.format("Stream-Deserializer-%s-%s", to.toString(), channel.id()),
                                       new StreamDeserializingTask(session, channel, messagingVersion));
+
         session.attachInbound(channel);
         session.attachOutbound(channel);
 
+        scheduleKeepAliveTask(channel);
+
         logger.debug("Creating control {}", channel.description());
         return channel;
     }
@@ -246,7 +252,7 @@ public class StreamingMultiplexedChannel
      *
      * Note: this is called from the netty event loop.
      *
-     * @return null if the message was processed sucessfully; else, a {@link java.util.concurrent.Future} to indicate
+     * @return null if the message was processed successfully; else, a {@link java.util.concurrent.Future} to indicate
      * the status of aborting any remaining tasks in the session.
      */
     Future<?> onMessageComplete(Future<?> future, StreamMessage msg)
@@ -407,6 +413,72 @@ public class StreamingMultiplexedChannel
         }
     }
 
+    /**
+     * Periodically sends the {@link KeepAliveMessage}.
+     * <p>
+     * NOTE: this task, and the callback function are executed in the netty event loop.
+     */
+    class KeepAliveTask implements Runnable
+    {
+        private final StreamingChannel channel;
+
+        /**
+         * A reference to the scheduled task for this instance so that it may be cancelled.
+         */
+        ScheduledFuture<?> future;
+
+        KeepAliveTask(StreamingChannel channel)
+        {
+            this.channel = channel;
+        }
+
+        @Override
+        public void run()
+        {
+            // if the channel has been closed, cancel the scheduled task and return
+            if (!channel.connected() || closed)
+            {
+                if (null != future)
+                    future.cancel(false);
+                return;
+            }
+
+            if (logger.isTraceEnabled())
+                logger.trace("{} Sending keep-alive to {}.", createLogTag(session, channel), session.peer);
+
+            sendControlMessage(new KeepAliveMessage()).addListener(f ->
+            {
+                if (f.isSuccess() || f.isCancelled())
+                    return;
+
+                if (logger.isDebugEnabled())
+                    logger.debug("{} Could not send keep-alive message (perhaps stream session is finished?).",
+                                 createLogTag(session, channel), f.cause());
+            });
+        }
+    }
+
+    private void scheduleKeepAliveTask(StreamingChannel channel)
+    {
+        if (!(channel instanceof NettyStreamingChannel))
+            return;
+
+        int keepAlivePeriod = DatabaseDescriptor.getStreamingKeepAlivePeriod();
+        if (keepAlivePeriod <= 0)
+            return;
+
+        if (logger.isDebugEnabled())
+            logger.debug("{} Scheduling keep-alive task with {}s period.", createLogTag(session, channel), keepAlivePeriod);
+
+        KeepAliveTask task = new KeepAliveTask(channel);
+        ScheduledFuture<?> scheduledFuture =
+            ((NettyStreamingChannel)channel).channel
+                                            .eventLoop()
+                                            .scheduleAtFixedRate(task, keepAlivePeriod, keepAlivePeriod, TimeUnit.SECONDS);
+        task.future = scheduledFuture;
+        channelKeepAlives.add(scheduledFuture);
+    }
+
     /**
      * For testing purposes only.
      */
diff --git a/src/java/org/apache/cassandra/streaming/messages/KeepAliveMessage.java b/src/java/org/apache/cassandra/streaming/messages/KeepAliveMessage.java
index 7333f25bb8..a09cfcae82 100644
--- a/src/java/org/apache/cassandra/streaming/messages/KeepAliveMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/KeepAliveMessage.java
@@ -15,40 +15,40 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.cassandra.streaming.messages;
 
-import java.io.IOException;
-
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.streaming.StreamingDataOutputPlus;
 import org.apache.cassandra.streaming.StreamSession;
 
 public class KeepAliveMessage extends StreamMessage
 {
+
+    public KeepAliveMessage()
+    {
+        super(Type.KEEP_ALIVE);
+    }
+
+    @Override
+    public String toString()
+    {
+        return "keep-alive";
+    }
+
     public static Serializer<KeepAliveMessage> serializer = new Serializer<KeepAliveMessage>()
     {
-        public KeepAliveMessage deserialize(DataInputPlus in, int version) throws IOException
+        public KeepAliveMessage deserialize(DataInputPlus in, int version)
         {
             return new KeepAliveMessage();
         }
 
         public void serialize(KeepAliveMessage message, StreamingDataOutputPlus out, int version, StreamSession session)
-        {}
+        {
+        }
 
         public long serializedSize(KeepAliveMessage message, int version)
         {
             return 0;
         }
     };
-
-    public KeepAliveMessage()
-    {
-        super(Type.KEEP_ALIVE);
-    }
-
-    public String toString()
-    {
-        return "keep-alive";
-    }
 }
diff --git a/src/java/org/apache/cassandra/streaming/messages/KeepAliveMessage.java b/test/unit/org/apache/cassandra/streaming/messages/KeepAliveMessageTest.java
similarity index 51%
copy from src/java/org/apache/cassandra/streaming/messages/KeepAliveMessage.java
copy to test/unit/org/apache/cassandra/streaming/messages/KeepAliveMessageTest.java
index 7333f25bb8..7ba637be21 100644
--- a/src/java/org/apache/cassandra/streaming/messages/KeepAliveMessage.java
+++ b/test/unit/org/apache/cassandra/streaming/messages/KeepAliveMessageTest.java
@@ -20,35 +20,18 @@ package org.apache.cassandra.streaming.messages;
 
 import java.io.IOException;
 
-import org.apache.cassandra.io.util.DataInputPlus;
-import org.apache.cassandra.streaming.StreamingDataOutputPlus;
-import org.apache.cassandra.streaming.StreamSession;
+import org.junit.Test;
 
-public class KeepAliveMessage extends StreamMessage
-{
-    public static Serializer<KeepAliveMessage> serializer = new Serializer<KeepAliveMessage>()
-    {
-        public KeepAliveMessage deserialize(DataInputPlus in, int version) throws IOException
-        {
-            return new KeepAliveMessage();
-        }
-
-        public void serialize(KeepAliveMessage message, StreamingDataOutputPlus out, int version, StreamSession session)
-        {}
+import static org.junit.Assert.assertThat;
+import static org.hamcrest.CoreMatchers.equalTo;
 
-        public long serializedSize(KeepAliveMessage message, int version)
-        {
-            return 0;
-        }
-    };
+public class KeepAliveMessageTest
+{
 
-    public KeepAliveMessage()
+    @Test
+    public void testSerializedSize() throws IOException
     {
-        super(Type.KEEP_ALIVE);
+        assertThat(StreamMessage.serializedSize(new KeepAliveMessage(), 0), equalTo(1L));
     }
 
-    public String toString()
-    {
-        return "keep-alive";
-    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org