You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by if...@apache.org on 2019/07/10 21:05:36 UTC

[cassandra] branch trunk updated: Introduce optional timeouts for idle client sessions

This is an automated email from the ASF dual-hosted git repository.

ifesdjeen pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 0240a46  Introduce optional timeouts for idle client sessions
0240a46 is described below

commit 0240a4659d761f06f94f8cd97097f2d0ad2d220c
Author: Alex Petrov <ol...@gmail.com>
AuthorDate: Wed Jun 26 14:44:15 2019 +0200

    Introduce optional timeouts for idle client sessions
    
    Patch by Alex Petrov, reviewed by Aleksey Yeshchenko for CASSANDRA-11097
---
 conf/cassandra.yaml                                | 10 +++
 src/java/org/apache/cassandra/config/Config.java   |  2 +
 .../cassandra/config/DatabaseDescriptor.java       | 10 +++
 .../org/apache/cassandra/transport/Server.java     | 18 +++++
 .../cassandra/transport/IdleDisconnectTest.java    | 78 ++++++++++++++++++++++
 5 files changed, 118 insertions(+)

diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index 3002857..28d86fd 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -683,6 +683,16 @@ native_transport_port: 9042
 # The default is true, which means all supported protocols will be honored.
 native_transport_allow_older_protocols: true
 
+# Controls when idle client connections are closed. A connection is idle when it has had neither reads
+# nor writes for the configured time period.
+#
+# Clients may implement heartbeats by sending an OPTIONS native protocol message after a timeout, which
+# will reset the idle timeout timer on the server side. For idle client connections to be closed, the
+# heartbeat interval on the client side has to be set accordingly (i.e. longer than this timeout).
+#
+# Idle connection timeouts are disabled by default.
+# native_transport_idle_timeout_in_ms: 60000
+
 # The address or interface to bind the native transport server to.
 #
 # Set rpc_address OR rpc_interface, not both.
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index 9713ea2..6b487fe 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -87,6 +87,8 @@ public class Config
     /** Triggers automatic allocation of tokens if set, using the replication strategy of the referenced keyspace */
     public String allocate_tokens_for_keyspace = null;
 
+    public long native_transport_idle_timeout_in_ms = 0L;
+
     public volatile long request_timeout_in_ms = 10000L;
 
     public volatile long read_request_timeout_in_ms = 5000L;
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 3b7009b..bb92716 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -1473,6 +1473,16 @@ public class DatabaseDescriptor
         return Integer.parseInt(System.getProperty(Config.PROPERTY_PREFIX + "ssl_storage_port", Integer.toString(conf.ssl_storage_port)));
     }
 
+    public static long nativeTransportIdleTimeout() // idle-connection timeout for the native transport, in milliseconds
+    {
+        return conf.native_transport_idle_timeout_in_ms; // Config default is 0L; Server adds its idle handler only when this is > 0
+    }
+
+    public static void setNativeTransportIdleTimeout(long nativeTransportTimeout) // value is in milliseconds
+    {
+        conf.native_transport_idle_timeout_in_ms = nativeTransportTimeout; // NOTE(review): Server reads this while building a pipeline, so presumably only new connections see it - confirm
+    }
+
     public static long getRpcTimeout(TimeUnit unit)
     {
         return unit.convert(conf.request_timeout_in_ms, MILLISECONDS);
diff --git a/src/java/org/apache/cassandra/transport/Server.java b/src/java/org/apache/cassandra/transport/Server.java
index 33cd0fb..f16aa88 100644
--- a/src/java/org/apache/cassandra/transport/Server.java
+++ b/src/java/org/apache/cassandra/transport/Server.java
@@ -23,6 +23,7 @@ import java.net.InetSocketAddress;
 import java.net.UnknownHostException;
 import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.slf4j.Logger;
@@ -41,6 +42,9 @@ import io.netty.channel.socket.nio.NioServerSocketChannel;
 import io.netty.handler.codec.ByteToMessageDecoder;
 import io.netty.handler.ssl.SslContext;
 import io.netty.handler.ssl.SslHandler;
+import io.netty.handler.timeout.IdleState;
+import io.netty.handler.timeout.IdleStateEvent;
+import io.netty.handler.timeout.IdleStateHandler;
 import io.netty.util.Version;
 import io.netty.util.concurrent.EventExecutor;
 import io.netty.util.concurrent.GlobalEventExecutor;
@@ -368,6 +372,20 @@ public class Server implements CassandraDaemon.Server
                 pipeline.addFirst("connectionLimitHandler", connectionLimitHandler);
             }
 
+            long idleTimeout = DatabaseDescriptor.nativeTransportIdleTimeout();
+            if (idleTimeout > 0)
+            {
+                pipeline.addLast("idleStateHandler", new IdleStateHandler(false, 0, 0, idleTimeout, TimeUnit.MILLISECONDS)
+                {
+                    @Override
+                    protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt)
+                    {
+                        logger.info("Closing client connection {} after timeout of {}ms", channel.remoteAddress(), idleTimeout);
+                        ctx.close();
+                    }
+                });
+            }
+
             //pipeline.addLast("debug", new LoggingHandler());
 
             pipeline.addLast("frameDecoder", new Frame.Decoder(server.connectionFactory));
diff --git a/test/unit/org/apache/cassandra/transport/IdleDisconnectTest.java b/test/unit/org/apache/cassandra/transport/IdleDisconnectTest.java
new file mode 100644
index 0000000..2c8adea
--- /dev/null
+++ b/test/unit/org/apache/cassandra/transport/IdleDisconnectTest.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.transport;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.cql3.CQLTester;
+import org.apache.cassandra.db.ConsistencyLevel;
+
+/** Checks that the server closes native-protocol connections that stay idle past the configured timeout. */
+public class IdleDisconnectTest extends CQLTester
+{
+    private static final long TIMEOUT = 2000L;
+
+    @BeforeClass
+    public static void setUp()
+    {
+        requireNetwork();
+        DatabaseDescriptor.setNativeTransportIdleTimeout(TIMEOUT);
+    }
+
+    /** An untouched connection is closed by the server, and no sooner than TIMEOUT ms after the client began connecting. */
+    @Test
+    public void testIdleDisconnect() throws Throwable
+    {
+        try (SimpleClient client = new SimpleClient(nativeAddr.getHostAddress(), nativePort))
+        {
+            long start = System.nanoTime();
+            client.connect(false, false);
+            Assert.assertTrue(client.channel.isOpen());
+            // Park on the channel's close future rather than busy-spinning on isOpen() in a common-pool thread.
+            Assert.assertTrue(client.channel.closeFuture().await(30, TimeUnit.SECONDS));
+            Assert.assertFalse(client.channel.isOpen());
+            Assert.assertTrue(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start) >= TIMEOUT);
+        }
+    }
+
+    /** A request sent after sleepTime counts as activity, so the disconnect comes no sooner than sleepTime + TIMEOUT. */
+    @Test
+    public void testIdleDisconnectProlonged() throws Throwable
+    {
+        long sleepTime = 1000;
+        try (SimpleClient client = new SimpleClient(nativeAddr.getHostAddress(), nativePort))
+        {
+            client.connect(false, false);
+            Assert.assertTrue(client.channel.isOpen());
+            long start = System.nanoTime();
+            Thread.sleep(sleepTime);
+            client.execute("SELECT * FROM system.peers", ConsistencyLevel.ONE);
+            // Park on the channel's close future rather than busy-spinning on isOpen() in a common-pool thread.
+            Assert.assertTrue(client.channel.closeFuture().await(30, TimeUnit.SECONDS));
+            Assert.assertFalse(client.channel.isOpen());
+            Assert.assertTrue(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start) >= TIMEOUT + sleepTime);
+        }
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org