You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jm...@apache.org on 2015/03/05 18:40:35 UTC
[1/2] cassandra git commit: Add ability to limit number of native connections
Repository: cassandra
Updated Branches:
refs/heads/cassandra-2.1 790256825 -> b1c2536cc
Add ability to limit number of native connections
Patch by Norman Maurer; reviewed by jmckenzie for CASSANDRA-8086
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e56d9efb
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e56d9efb
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e56d9efb
Branch: refs/heads/cassandra-2.1
Commit: e56d9efb7c18138fac9059207568598bbb964eb9
Parents: 2428b9c
Author: Norman Maurer <no...@apache.org>
Authored: Thu Mar 5 10:37:53 2015 -0600
Committer: Joshua McKenzie <jm...@apache.org>
Committed: Thu Mar 5 10:37:53 2015 -0600
----------------------------------------------------------------------
CHANGES.txt | 1 +
conf/cassandra.yaml | 8 ++
.../org/apache/cassandra/config/Config.java | 2 +
.../cassandra/config/DatabaseDescriptor.java | 19 ++++
.../apache/cassandra/service/StorageProxy.java | 8 +-
.../cassandra/service/StorageProxyMBean.java | 3 +
.../transport/ConnectionLimitHandler.java | 111 +++++++++++++++++++
.../org/apache/cassandra/transport/Server.java | 9 ++
8 files changed, 160 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e56d9efb/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index f98bb3f..4e34c9e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.0.13:
+ * Add ability to limit number of native connections (CASSANDRA-8086)
* Add offline tool to relevel sstables (CASSANDRA-8301)
* Preserve stream ID for more protocol errors (CASSANDRA-8848)
* Fix combining token() function with multi-column relations on
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e56d9efb/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index 163ae9e..f99ade1 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -332,6 +332,14 @@ native_transport_port: 9042
# be rejected as invalid. The default is 256MB.
# native_transport_max_frame_size_in_mb: 256
+# The maximum number of concurrent client connections.
+# The default is -1, which means unlimited.
+# native_transport_max_concurrent_connections: -1
+
+# The maximum number of concurrent client connections per source ip.
+# The default is -1, which means unlimited.
+# native_transport_max_concurrent_connections_per_ip: -1
+
# Whether to start the thrift rpc server.
start_rpc: true
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e56d9efb/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index 4dd71aa..3c223e3 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -112,6 +112,8 @@ public class Config
public Integer native_transport_port = 9042;
public Integer native_transport_max_threads = 128;
public Integer native_transport_max_frame_size_in_mb = 256;
+ public volatile Long native_transport_max_concurrent_connections = -1L;
+ public volatile Long native_transport_max_concurrent_connections_per_ip = -1L;
@Deprecated
public Integer thrift_max_message_length_in_mb = 16;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e56d9efb/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 286014e..b3b10c1 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -1108,6 +1108,25 @@ public class DatabaseDescriptor
return conf.native_transport_max_frame_size_in_mb * 1024 * 1024;
}
+ public static Long getNativeTransportMaxConcurrentConnections()
+ {
+ return conf.native_transport_max_concurrent_connections;
+ }
+
+ public static void setNativeTransportMaxConcurrentConnections(long nativeTransportMaxConcurrentConnections)
+ {
+ conf.native_transport_max_concurrent_connections = nativeTransportMaxConcurrentConnections;
+ }
+
+ public static Long getNativeTransportMaxConcurrentConnectionsPerIp() {
+ return conf.native_transport_max_concurrent_connections_per_ip;
+ }
+
+ public static void setNativeTransportMaxConcurrentConnectionsPerIp(long native_transport_max_concurrent_connections_per_ip)
+ {
+ conf.native_transport_max_concurrent_connections_per_ip = native_transport_max_concurrent_connections_per_ip;
+ }
+
public static double getCommitLogSyncBatchWindow()
{
return conf.commitlog_sync_batch_window_in_ms;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e56d9efb/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index fcc9665..d033929 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -2125,9 +2125,15 @@ public class StorageProxy implements StorageProxyMBean
public Long getTruncateRpcTimeout() { return DatabaseDescriptor.getTruncateRpcTimeout(); }
public void setTruncateRpcTimeout(Long timeoutInMillis) { DatabaseDescriptor.setTruncateRpcTimeout(timeoutInMillis); }
+
+ public Long getNativeTransportMaxConcurrentConnections() { return DatabaseDescriptor.getNativeTransportMaxConcurrentConnections(); }
+ public void setNativeTransportMaxConcurrentConnections(Long nativeTransportMaxConcurrentConnections) { DatabaseDescriptor.setNativeTransportMaxConcurrentConnections(nativeTransportMaxConcurrentConnections); }
+
+ public Long getNativeTransportMaxConcurrentConnectionsPerIp() { return DatabaseDescriptor.getNativeTransportMaxConcurrentConnectionsPerIp(); }
+ public void setNativeTransportMaxConcurrentConnectionsPerIp(Long nativeTransportMaxConcurrentConnections) { DatabaseDescriptor.setNativeTransportMaxConcurrentConnectionsPerIp(nativeTransportMaxConcurrentConnections); }
+
public void reloadTriggerClasses() { TriggerExecutor.instance.reloadClasses(); }
-
public long getReadRepairAttempted() {
return ReadRepairMetrics.attempted.count();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e56d9efb/src/java/org/apache/cassandra/service/StorageProxyMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxyMBean.java b/src/java/org/apache/cassandra/service/StorageProxyMBean.java
index 203cabe..03b9b58 100644
--- a/src/java/org/apache/cassandra/service/StorageProxyMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageProxyMBean.java
@@ -95,6 +95,9 @@ public interface StorageProxyMBean
public Long getTruncateRpcTimeout();
public void setTruncateRpcTimeout(Long timeoutInMillis);
+ public void setNativeTransportMaxConcurrentConnections(Long nativeTransportMaxConcurrentConnections);
+ public Long getNativeTransportMaxConcurrentConnections();
+
public void reloadTriggerClasses();
public long getReadRepairAttempted();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e56d9efb/src/java/org/apache/cassandra/transport/ConnectionLimitHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/ConnectionLimitHandler.java b/src/java/org/apache/cassandra/transport/ConnectionLimitHandler.java
new file mode 100644
index 0000000..c45d2cb
--- /dev/null
+++ b/src/java/org/apache/cassandra/transport/ConnectionLimitHandler.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.transport;
+
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.jboss.netty.channel.ChannelHandler;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ChannelStateEvent;
+import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+
+/**
+ * {@link SimpleChannelUpstreamHandler} implementation which allows to limit the number of concurrent
+ * connections to the Server. Be aware this <strong>MUST</strong> be shared between all child channels.
+ */
+@ChannelHandler.Sharable
+final class ConnectionLimitHandler extends SimpleChannelUpstreamHandler
+{
+ private static final Logger logger = LoggerFactory.getLogger(ConnectionLimitHandler.class);
+ private final ConcurrentMap<InetAddress, AtomicLong> connectionsPerClient = new ConcurrentHashMap<>();
+ private final AtomicLong counter = new AtomicLong(0);
+
+ @Override
+ public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent event) throws Exception
+ {
+ final long count = counter.incrementAndGet();
+ long limit = DatabaseDescriptor.getNativeTransportMaxConcurrentConnections();
+ // Setting the limit to -1 disables it.
+ if(limit < 0)
+ {
+ limit = Long.MAX_VALUE;
+ }
+ if (count > limit)
+ {
+ // The decrement will be done in channelClosed(...)
+ logger.warn("Exceeded maximum native connection limit of {} by using {} connections", limit, count);
+ ctx.getChannel().close();
+ }
+ else
+ {
+ long perIpLimit = DatabaseDescriptor.getNativeTransportMaxConcurrentConnectionsPerIp();
+ if (perIpLimit > 0)
+ {
+ InetAddress address = ((InetSocketAddress) ctx.getChannel().getRemoteAddress()).getAddress();
+
+ AtomicLong perIpCount = connectionsPerClient.get(address);
+ if (perIpCount == null)
+ {
+ perIpCount = new AtomicLong(0);
+
+ AtomicLong old = connectionsPerClient.putIfAbsent(address, perIpCount);
+ if (old != null)
+ {
+ perIpCount = old;
+ }
+ }
+ if (perIpCount.incrementAndGet() > perIpLimit)
+ {
+ // The decrement will be done in channelClosed(...)
+ logger.warn("Exceeded maximum native connection limit per ip of {} by using {} connections", perIpLimit, perIpCount);
+ ctx.getChannel().close();
+ return;
+ }
+ }
+ super.channelOpen(ctx, event);
+ }
+ }
+
+ @Override
+ public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent event) throws Exception
+ {
+ counter.decrementAndGet();
+ InetAddress address = ((InetSocketAddress) ctx.getChannel().getRemoteAddress()).getAddress();
+
+ AtomicLong count = connectionsPerClient.get(address);
+ if (count != null)
+ {
+ if (count.decrementAndGet() <= 0)
+ {
+ connectionsPerClient.remove(address);
+ }
+ }
+ super.channelClosed(ctx, event);
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e56d9efb/src/java/org/apache/cassandra/transport/Server.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Server.java b/src/java/org/apache/cassandra/transport/Server.java
index df4f127..30b8a9d 100644
--- a/src/java/org/apache/cassandra/transport/Server.java
+++ b/src/java/org/apache/cassandra/transport/Server.java
@@ -240,6 +240,7 @@ public class Server implements CassandraDaemon.Server
private static final Frame.Compressor frameCompressor = new Frame.Compressor();
private static final Frame.Encoder frameEncoder = new Frame.Encoder();
private static final Message.Dispatcher dispatcher = new Message.Dispatcher();
+ private static final ConnectionLimitHandler connectionLimitHandler = new ConnectionLimitHandler();
private final Server server;
@@ -252,6 +253,14 @@ public class Server implements CassandraDaemon.Server
{
ChannelPipeline pipeline = Channels.pipeline();
+ // Add the ConnectionLimitHandler to the pipeline if configured to do so.
+ if (DatabaseDescriptor.getNativeTransportMaxConcurrentConnections() > 0
+ || DatabaseDescriptor.getNativeTransportMaxConcurrentConnectionsPerIp() > 0)
+ {
+ // Add as first to the pipeline so the limit is enforced as first action.
+ pipeline.addFirst("connectionLimitHandler", connectionLimitHandler);
+ }
+
//pipeline.addLast("debug", new LoggingHandler());
pipeline.addLast("frameDecoder", new Frame.Decoder(server.connectionFactory));
[2/2] cassandra git commit: Merge branch 'cassandra-2.0' into cassandra-2.1
Posted by jm...@apache.org.
Merge branch 'cassandra-2.0' into cassandra-2.1
Conflicts:
CHANGES.txt
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b1c2536c
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b1c2536c
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b1c2536c
Branch: refs/heads/cassandra-2.1
Commit: b1c2536ccb244be078cddc0da6c788d2b7c0f4ec
Parents: 7902568 e56d9ef
Author: Joshua McKenzie <jm...@apache.org>
Authored: Thu Mar 5 11:32:41 2015 -0600
Committer: Joshua McKenzie <jm...@apache.org>
Committed: Thu Mar 5 11:32:41 2015 -0600
----------------------------------------------------------------------
CHANGES.txt | 1 +
conf/cassandra.yaml | 8 ++
.../org/apache/cassandra/config/Config.java | 2 +
.../cassandra/config/DatabaseDescriptor.java | 19 ++++
.../apache/cassandra/service/StorageProxy.java | 8 +-
.../cassandra/service/StorageProxyMBean.java | 3 +
.../transport/ConnectionLimitHandler.java | 108 +++++++++++++++++++
.../org/apache/cassandra/transport/Server.java | 9 ++
8 files changed, 157 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b1c2536c/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 137c0f1,4e34c9e..aa2e1f9
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,36 -1,5 +1,37 @@@
-2.0.13:
+2.1.4
+ * Make SSTableRewriter.abort() more robust to failure (CASSANDRA-8832)
+ * Remove cold_reads_to_omit from STCS (CASSANDRA-8860)
+ * Make EstimatedHistogram#percentile() use ceil instead of floor (CASSANDRA-8883)
+ * Fix top partitions reporting wrong cardinality (CASSANDRA-8834)
+ * Fix rare NPE in KeyCacheSerializer (CASSANDRA-8067)
+ * Pick sstables for validation as late as possible inc repairs (CASSANDRA-8366)
+ * Fix commitlog getPendingTasks to not increment (CASSANDRA-8856)
+ * Fix parallelism adjustment in range and secondary index queries
+ when the first fetch does not satisfy the limit (CASSANDRA-8856)
+ * Check if the filtered sstables is non-empty in STCS (CASSANDRA-8843)
+ * Upgrade java-driver used for cassandra-stress (CASSANDRA-8842)
+ * Fix CommitLog.forceRecycleAllSegments() memory access error (CASSANDRA-8812)
+ * Improve assertions in Memory (CASSANDRA-8792)
+ * Fix SSTableRewriter cleanup (CASSANDRA-8802)
+ * Introduce SafeMemory for CompressionMetadata.Writer (CASSANDRA-8758)
+ * 'nodetool info' prints exception against older node (CASSANDRA-8796)
+ * Ensure SSTableReader.last corresponds exactly with the file end (CASSANDRA-8750)
+ * Make SSTableWriter.openEarly more robust and obvious (CASSANDRA-8747)
+ * Enforce SSTableReader.first/last (CASSANDRA-8744)
+ * Cleanup SegmentedFile API (CASSANDRA-8749)
+ * Avoid overlap with early compaction replacement (CASSANDRA-8683)
+ * Safer Resource Management++ (CASSANDRA-8707)
+ * Write partition size estimates into a system table (CASSANDRA-7688)
+ * cqlsh: Fix keys() and full() collection indexes in DESCRIBE output
+ (CASSANDRA-8154)
+ * Show progress of streaming in nodetool netstats (CASSANDRA-8886)
+ * IndexSummaryBuilder utilises offheap memory, and shares data between
+ each IndexSummary opened from it (CASSANDRA-8757)
+ * markCompacting only succeeds if the exact SSTableReader instances being
+ marked are in the live set (CASSANDRA-8689)
+ * cassandra-stress support for varint (CASSANDRA-8882)
+Merged from 2.0:
+ * Add ability to limit number of native connections (CASSANDRA-8086)
* Add offline tool to relevel sstables (CASSANDRA-8301)
* Preserve stream ID for more protocol errors (CASSANDRA-8848)
* Fix combining token() function with multi-column relations on
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b1c2536c/conf/cassandra.yaml
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b1c2536c/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b1c2536c/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b1c2536c/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b1c2536c/src/java/org/apache/cassandra/service/StorageProxyMBean.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b1c2536c/src/java/org/apache/cassandra/transport/ConnectionLimitHandler.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/transport/ConnectionLimitHandler.java
index 0000000,c45d2cb..7bcf280
mode 000000,100644..100644
--- a/src/java/org/apache/cassandra/transport/ConnectionLimitHandler.java
+++ b/src/java/org/apache/cassandra/transport/ConnectionLimitHandler.java
@@@ -1,0 -1,111 +1,108 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ package org.apache.cassandra.transport;
+
+
++import io.netty.channel.ChannelHandler;
++import io.netty.channel.ChannelHandlerContext;
++import io.netty.channel.ChannelInboundHandlerAdapter;
+ import org.apache.cassandra.config.DatabaseDescriptor;
-import org.jboss.netty.channel.ChannelHandler;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.ChannelStateEvent;
-import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+
+ import java.net.InetAddress;
+ import java.net.InetSocketAddress;
-import java.util.HashMap;
-import java.util.Map;
+ import java.util.concurrent.ConcurrentHashMap;
+ import java.util.concurrent.ConcurrentMap;
+ import java.util.concurrent.atomic.AtomicLong;
+
+
+ /**
- * {@link SimpleChannelUpstreamHandler} implementation which allows to limit the number of concurrent
++ * {@link ChannelInboundHandlerAdapter} implementation which allows to limit the number of concurrent
+ * connections to the Server. Be aware this <strong>MUST</strong> be shared between all child channels.
+ */
+ @ChannelHandler.Sharable
-final class ConnectionLimitHandler extends SimpleChannelUpstreamHandler
++final class ConnectionLimitHandler extends ChannelInboundHandlerAdapter
+ {
+ private static final Logger logger = LoggerFactory.getLogger(ConnectionLimitHandler.class);
+ private final ConcurrentMap<InetAddress, AtomicLong> connectionsPerClient = new ConcurrentHashMap<>();
+ private final AtomicLong counter = new AtomicLong(0);
+
+ @Override
- public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent event) throws Exception
++ public void channelActive(ChannelHandlerContext ctx) throws Exception
+ {
+ final long count = counter.incrementAndGet();
+ long limit = DatabaseDescriptor.getNativeTransportMaxConcurrentConnections();
+ // Setting the limit to -1 disables it.
+ if(limit < 0)
+ {
+ limit = Long.MAX_VALUE;
+ }
+ if (count > limit)
+ {
+ // The decrement will be done in channelClosed(...)
+ logger.warn("Exceeded maximum native connection limit of {} by using {} connections", limit, count);
- ctx.getChannel().close();
++ ctx.close();
+ }
+ else
+ {
+ long perIpLimit = DatabaseDescriptor.getNativeTransportMaxConcurrentConnectionsPerIp();
+ if (perIpLimit > 0)
+ {
- InetAddress address = ((InetSocketAddress) ctx.getChannel().getRemoteAddress()).getAddress();
++ InetAddress address = ((InetSocketAddress) ctx.channel().remoteAddress()).getAddress();
+
+ AtomicLong perIpCount = connectionsPerClient.get(address);
+ if (perIpCount == null)
+ {
+ perIpCount = new AtomicLong(0);
+
+ AtomicLong old = connectionsPerClient.putIfAbsent(address, perIpCount);
+ if (old != null)
+ {
+ perIpCount = old;
+ }
+ }
+ if (perIpCount.incrementAndGet() > perIpLimit)
+ {
+ // The decrement will be done in channelClosed(...)
+ logger.warn("Exceeded maximum native connection limit per ip of {} by using {} connections", perIpLimit, perIpCount);
- ctx.getChannel().close();
++ ctx.close();
+ return;
+ }
+ }
- super.channelOpen(ctx, event);
++ ctx.fireChannelActive();
+ }
+ }
+
+ @Override
- public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent event) throws Exception
++ public void channelInactive(ChannelHandlerContext ctx) throws Exception
+ {
+ counter.decrementAndGet();
- InetAddress address = ((InetSocketAddress) ctx.getChannel().getRemoteAddress()).getAddress();
++ InetAddress address = ((InetSocketAddress) ctx.channel().remoteAddress()).getAddress();
+
+ AtomicLong count = connectionsPerClient.get(address);
+ if (count != null)
+ {
+ if (count.decrementAndGet() <= 0)
+ {
+ connectionsPerClient.remove(address);
+ }
+ }
- super.channelClosed(ctx, event);
++ ctx.fireChannelInactive();
+ }
+ }
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b1c2536c/src/java/org/apache/cassandra/transport/Server.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/transport/Server.java
index 60d3e70,30b8a9d..f396fd9
--- a/src/java/org/apache/cassandra/transport/Server.java
+++ b/src/java/org/apache/cassandra/transport/Server.java
@@@ -278,10 -249,18 +279,18 @@@ public class Server implements Cassandr
this.server = server;
}
- public ChannelPipeline getPipeline() throws Exception
+ protected void initChannel(Channel channel) throws Exception
{
- ChannelPipeline pipeline = Channels.pipeline();
+ ChannelPipeline pipeline = channel.pipeline();
+ // Add the ConnectionLimitHandler to the pipeline if configured to do so.
+ if (DatabaseDescriptor.getNativeTransportMaxConcurrentConnections() > 0
+ || DatabaseDescriptor.getNativeTransportMaxConcurrentConnectionsPerIp() > 0)
+ {
+ // Add as first to the pipeline so the limit is enforced as first action.
+ pipeline.addFirst("connectionLimitHandler", connectionLimitHandler);
+ }
+
//pipeline.addLast("debug", new LoggingHandler());
pipeline.addLast("frameDecoder", new Frame.Decoder(server.connectionFactory));