You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by mc...@apache.org on 2022/11/09 11:04:26 UTC
[cassandra] branch cassandra-4.1 updated: Restore streaming_keep_alive_period functionality on the netty control streaming channel
This is an automated email from the ASF dual-hosted git repository.
mck pushed a commit to branch cassandra-4.1
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cassandra-4.1 by this push:
new c100d34a1a Restore streaming_keep_alive_period functionality on the netty control streaming channel
c100d34a1a is described below
commit c100d34a1a1d497eee85ca10c5cc1e43eb4871c4
Author: Aleksey Yeschenko <al...@apache.org>
AuthorDate: Wed Nov 2 17:30:57 2022 +0000
Restore streaming_keep_alive_period functionality on the netty control streaming channel
patch by Aleksey Yeschenko, Mick Semb Wever; reviewed by Mick Semb Wever, Berenguer Blasi for CASSANDRA-17768
Co-authored-by: Aleksey Yeschenko <aleksey@apache.org>
Co-authored-by: Mick Semb Wever <mc...@apache.org>
---
CHANGES.txt | 1 +
conf/cassandra.yaml | 14 ++--
.../cassandra/net/InboundConnectionInitiator.java | 7 +-
.../apache/cassandra/streaming/StreamSession.java | 6 +-
.../async/StreamingMultiplexedChannel.java | 74 +++++++++++++++++++++-
.../streaming/messages/KeepAliveMessage.java | 30 ++++-----
.../streaming/messages/KeepAliveMessageTest.java | 33 +++-------
7 files changed, 114 insertions(+), 51 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 10b4ac7016..ffa03b7322 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
4.1-beta2
+ * Restore streaming_keep_alive_period on the netty control streaming channel (CASSANDRA-17768)
* Move Schema.FORCE_LOAD_KEYSPACES and Schema.FORCE_LOAD_KEYSPACES_PROP to CassandraRelevantProps (CASSANDRA-17783)
* Add --resolve-ip option to nodetool gossipinfo (CASSANDRA-17934)
* Allow pre-V5 global limit on bytes in flight to revert to zero asynchronously in RateLimitingTest (CASSANDRA-17927)
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index 92addc5ae3..ef7cc72605 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -1145,12 +1145,14 @@ slow_query_log_timeout: 500ms
# since this is a requirement for general correctness of last write wins.
# internode_timeout: true
-# Set keep-alive period for streaming
-# This node will send a keep-alive message periodically with this period.
-# If the node does not receive a keep-alive message from the peer for
-# 2 keep-alive cycles the stream session times out and fail
-# Default value is 300s (5 minutes), which means stalled stream
-# times out in 10 minutes by default
+# Set the period for idle-state control messages, allowing earlier detection of failed streams.
+# This node will periodically send a keep-alive message on the stream's control channel.
+# This ensures that any eventual SocketTimeoutException will occur within 2 keep-alive cycles.
+# If the node cannot send the keep-alive message on the netty control channel, or times out
+# sending it, the stream session is closed.
+# Default value is 300s (5 minutes), which means stalled streams
+# are detected within 10 minutes.
+# Specify 0 to disable.
# Min unit: s
# streaming_keep_alive_period: 300s
diff --git a/src/java/org/apache/cassandra/net/InboundConnectionInitiator.java b/src/java/org/apache/cassandra/net/InboundConnectionInitiator.java
index 7ac479fd9f..807d0262d8 100644
--- a/src/java/org/apache/cassandra/net/InboundConnectionInitiator.java
+++ b/src/java/org/apache/cassandra/net/InboundConnectionInitiator.java
@@ -452,7 +452,12 @@ public class InboundConnectionInitiator
}
BufferPools.forNetworking().setRecycleWhenFreeForCurrentThread(false);
- NettyStreamingChannel streamingChannel = new NettyStreamingChannel(current_version, channel, StreamingChannel.Kind.CONTROL);
+
+ // we can't infer the type of streaming connection at this point,
+ // so we use CONTROL unconditionally; it's ugly but does what we want
+ // (establishes an AsyncStreamingInputPlus)
+ NettyStreamingChannel streamingChannel =
+ new NettyStreamingChannel(current_version, channel, StreamingChannel.Kind.CONTROL);
pipeline.replace(this, "streamInbound", streamingChannel);
executorFactory().startThread(String.format("Stream-Deserializer-%s-%s", from, channel.id()),
new StreamDeserializingTask(null, streamingChannel, current_version));
diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java
index 2da7021968..a540a1b6d0 100644
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@ -682,11 +682,11 @@ public class StreamSession implements IEndpointStateChangeSubscriber
{
if (e instanceof SocketTimeoutException)
{
- logger.error("[Stream #{}] Did not receive response from peer {}{} for {} secs. Is peer down? " +
- "If not, maybe try increasing streaming_keep_alive_period.", planId(),
+ logger.error("[Stream #{}] Timeout from peer {}{}. Is peer down? " +
+ "If not, and earlier failure detection is required enable (or lower) streaming_keep_alive_period.",
+ planId(),
hostAddressAndPort(channel.peer()),
channel.peer().equals(channel.connectedTo()) ? "" : " through " + hostAddressAndPort(channel.connectedTo()),
- 2 * DatabaseDescriptor.getStreamingKeepAlivePeriod(),
e);
}
else
diff --git a/src/java/org/apache/cassandra/streaming/async/StreamingMultiplexedChannel.java b/src/java/org/apache/cassandra/streaming/async/StreamingMultiplexedChannel.java
index 38277e6741..560fee9ad2 100644
--- a/src/java/org/apache/cassandra/streaming/async/StreamingMultiplexedChannel.java
+++ b/src/java/org/apache/cassandra/streaming/async/StreamingMultiplexedChannel.java
@@ -26,6 +26,7 @@ import java.util.Collection;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
@@ -36,6 +37,7 @@ import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.util.concurrent.Future; // checkstyle: permit this import
import org.apache.cassandra.concurrent.ExecutorPlus;
+import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.SystemKeyspace;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.streaming.StreamDeserializingTask;
@@ -43,6 +45,7 @@ import org.apache.cassandra.streaming.StreamingChannel;
import org.apache.cassandra.streaming.StreamingDataOutputPlus;
import org.apache.cassandra.streaming.StreamSession;
import org.apache.cassandra.streaming.messages.IncomingStreamMessage;
+import org.apache.cassandra.streaming.messages.KeepAliveMessage;
import org.apache.cassandra.streaming.messages.OutgoingStreamMessage;
import org.apache.cassandra.streaming.messages.StreamMessage;
import org.apache.cassandra.utils.concurrent.ImmediateFuture;
@@ -166,9 +169,12 @@ public class StreamingMultiplexedChannel
StreamingChannel channel = factory.create(to, messagingVersion, StreamingChannel.Kind.CONTROL);
executorFactory().startThread(String.format("Stream-Deserializer-%s-%s", to.toString(), channel.id()),
new StreamDeserializingTask(session, channel, messagingVersion));
+
session.attachInbound(channel);
session.attachOutbound(channel);
+ scheduleKeepAliveTask(channel);
+
logger.debug("Creating control {}", channel.description());
return channel;
}
@@ -246,7 +252,7 @@ public class StreamingMultiplexedChannel
*
* Note: this is called from the netty event loop.
*
- * @return null if the message was processed sucessfully; else, a {@link java.util.concurrent.Future} to indicate
+ * @return null if the message was processed successfully; else, a {@link java.util.concurrent.Future} to indicate
* the status of aborting any remaining tasks in the session.
*/
Future<?> onMessageComplete(Future<?> future, StreamMessage msg)
@@ -407,6 +413,72 @@ public class StreamingMultiplexedChannel
}
}
+ /**
+ * Periodically sends the {@link KeepAliveMessage}.
+ * <p>
+ * NOTE: this task, and the callback function are executed in the netty event loop.
+ */
+ class KeepAliveTask implements Runnable
+ {
+ private final StreamingChannel channel;
+
+ /**
+ * A reference to the scheduled task for this instance so that it may be cancelled.
+ */
+ ScheduledFuture<?> future;
+
+ KeepAliveTask(StreamingChannel channel)
+ {
+ this.channel = channel;
+ }
+
+ @Override
+ public void run()
+ {
+ // if the channel has been closed, cancel the scheduled task and return
+ if (!channel.connected() || closed)
+ {
+ if (null != future)
+ future.cancel(false);
+ return;
+ }
+
+ if (logger.isTraceEnabled())
+ logger.trace("{} Sending keep-alive to {}.", createLogTag(session, channel), session.peer);
+
+ sendControlMessage(new KeepAliveMessage()).addListener(f ->
+ {
+ if (f.isSuccess() || f.isCancelled())
+ return;
+
+ if (logger.isDebugEnabled())
+ logger.debug("{} Could not send keep-alive message (perhaps stream session is finished?).",
+ createLogTag(session, channel), f.cause());
+ });
+ }
+ }
+
+ private void scheduleKeepAliveTask(StreamingChannel channel)
+ {
+ if (!(channel instanceof NettyStreamingChannel))
+ return;
+
+ int keepAlivePeriod = DatabaseDescriptor.getStreamingKeepAlivePeriod();
+ if (keepAlivePeriod <= 0)
+ return;
+
+ if (logger.isDebugEnabled())
+ logger.debug("{} Scheduling keep-alive task with {}s period.", createLogTag(session, channel), keepAlivePeriod);
+
+ KeepAliveTask task = new KeepAliveTask(channel);
+ ScheduledFuture<?> scheduledFuture =
+ ((NettyStreamingChannel)channel).channel
+ .eventLoop()
+ .scheduleAtFixedRate(task, keepAlivePeriod, keepAlivePeriod, TimeUnit.SECONDS);
+ task.future = scheduledFuture;
+ channelKeepAlives.add(scheduledFuture);
+ }
+
/**
* For testing purposes only.
*/
diff --git a/src/java/org/apache/cassandra/streaming/messages/KeepAliveMessage.java b/src/java/org/apache/cassandra/streaming/messages/KeepAliveMessage.java
index 7333f25bb8..a09cfcae82 100644
--- a/src/java/org/apache/cassandra/streaming/messages/KeepAliveMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/KeepAliveMessage.java
@@ -15,40 +15,40 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.cassandra.streaming.messages;
-import java.io.IOException;
-
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.streaming.StreamingDataOutputPlus;
import org.apache.cassandra.streaming.StreamSession;
public class KeepAliveMessage extends StreamMessage
{
+
+ public KeepAliveMessage()
+ {
+ super(Type.KEEP_ALIVE);
+ }
+
+ @Override
+ public String toString()
+ {
+ return "keep-alive";
+ }
+
public static Serializer<KeepAliveMessage> serializer = new Serializer<KeepAliveMessage>()
{
- public KeepAliveMessage deserialize(DataInputPlus in, int version) throws IOException
+ public KeepAliveMessage deserialize(DataInputPlus in, int version)
{
return new KeepAliveMessage();
}
public void serialize(KeepAliveMessage message, StreamingDataOutputPlus out, int version, StreamSession session)
- {}
+ {
+ }
public long serializedSize(KeepAliveMessage message, int version)
{
return 0;
}
};
-
- public KeepAliveMessage()
- {
- super(Type.KEEP_ALIVE);
- }
-
- public String toString()
- {
- return "keep-alive";
- }
}
diff --git a/src/java/org/apache/cassandra/streaming/messages/KeepAliveMessage.java b/test/unit/org/apache/cassandra/streaming/messages/KeepAliveMessageTest.java
similarity index 51%
copy from src/java/org/apache/cassandra/streaming/messages/KeepAliveMessage.java
copy to test/unit/org/apache/cassandra/streaming/messages/KeepAliveMessageTest.java
index 7333f25bb8..7ba637be21 100644
--- a/src/java/org/apache/cassandra/streaming/messages/KeepAliveMessage.java
+++ b/test/unit/org/apache/cassandra/streaming/messages/KeepAliveMessageTest.java
@@ -20,35 +20,18 @@ package org.apache.cassandra.streaming.messages;
import java.io.IOException;
-import org.apache.cassandra.io.util.DataInputPlus;
-import org.apache.cassandra.streaming.StreamingDataOutputPlus;
-import org.apache.cassandra.streaming.StreamSession;
+import org.junit.Test;
-public class KeepAliveMessage extends StreamMessage
-{
- public static Serializer<KeepAliveMessage> serializer = new Serializer<KeepAliveMessage>()
- {
- public KeepAliveMessage deserialize(DataInputPlus in, int version) throws IOException
- {
- return new KeepAliveMessage();
- }
-
- public void serialize(KeepAliveMessage message, StreamingDataOutputPlus out, int version, StreamSession session)
- {}
+import static org.junit.Assert.assertThat;
+import static org.hamcrest.CoreMatchers.equalTo;
- public long serializedSize(KeepAliveMessage message, int version)
- {
- return 0;
- }
- };
+public class KeepAliveMessageTest
+{
- public KeepAliveMessage()
+ @Test
+ public void testSerializedSize() throws IOException
{
- super(Type.KEEP_ALIVE);
+ assertThat(StreamMessage.serializedSize(new KeepAliveMessage(), 0), equalTo(1L));
}
- public String toString()
- {
- return "keep-alive";
- }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org