You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jm...@apache.org on 2015/03/05 18:40:35 UTC

[1/2] cassandra git commit: Add ability to limit number of native connections

Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.1 790256825 -> b1c2536cc


Add ability to limit number of native connections

Patch by Norman Maurer; reviewed by jmckenzie for CASSANDRA-8086


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e56d9efb
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e56d9efb
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e56d9efb

Branch: refs/heads/cassandra-2.1
Commit: e56d9efb7c18138fac9059207568598bbb964eb9
Parents: 2428b9c
Author: Norman Maurer <no...@apache.org>
Authored: Thu Mar 5 10:37:53 2015 -0600
Committer: Joshua McKenzie <jm...@apache.org>
Committed: Thu Mar 5 10:37:53 2015 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 conf/cassandra.yaml                             |   8 ++
 .../org/apache/cassandra/config/Config.java     |   2 +
 .../cassandra/config/DatabaseDescriptor.java    |  19 ++++
 .../apache/cassandra/service/StorageProxy.java  |   8 +-
 .../cassandra/service/StorageProxyMBean.java    |   3 +
 .../transport/ConnectionLimitHandler.java       | 111 +++++++++++++++++++
 .../org/apache/cassandra/transport/Server.java  |   9 ++
 8 files changed, 160 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/e56d9efb/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index f98bb3f..4e34c9e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.0.13:
+ * Add ability to limit number of native connections (CASSANDRA-8086)
  * Add offline tool to relevel sstables (CASSANDRA-8301)
  * Preserve stream ID for more protocol errors (CASSANDRA-8848)
  * Fix combining token() function with multi-column relations on

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e56d9efb/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index 163ae9e..f99ade1 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -332,6 +332,14 @@ native_transport_port: 9042
 # be rejected as invalid. The default is 256MB.
 # native_transport_max_frame_size_in_mb: 256
 
+# The maximum number of concurrent client connections.
+# The default is -1, which means unlimited.
+# native_transport_max_concurrent_connections: -1
+
+# The maximum number of concurrent client connections per source ip.
+# The default is -1, which means unlimited.
+# native_transport_max_concurrent_connections_per_ip: -1
+
 # Whether to start the thrift rpc server.
 start_rpc: true
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e56d9efb/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index 4dd71aa..3c223e3 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -112,6 +112,8 @@ public class Config
     public Integer native_transport_port = 9042;
     public Integer native_transport_max_threads = 128;
     public Integer native_transport_max_frame_size_in_mb = 256;
+    public volatile Long native_transport_max_concurrent_connections = -1L;
+    public volatile Long native_transport_max_concurrent_connections_per_ip = -1L;
 
     @Deprecated
     public Integer thrift_max_message_length_in_mb = 16;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e56d9efb/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 286014e..b3b10c1 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -1108,6 +1108,25 @@ public class DatabaseDescriptor
         return conf.native_transport_max_frame_size_in_mb * 1024 * 1024;
     }
 
+    public static Long getNativeTransportMaxConcurrentConnections()
+    {
+        return conf.native_transport_max_concurrent_connections;
+    }
+
+    public static void setNativeTransportMaxConcurrentConnections(long nativeTransportMaxConcurrentConnections)
+    {
+        conf.native_transport_max_concurrent_connections = nativeTransportMaxConcurrentConnections;
+    }
+
+    public static Long getNativeTransportMaxConcurrentConnectionsPerIp() {
+        return conf.native_transport_max_concurrent_connections_per_ip;
+    }
+
+    public static void setNativeTransportMaxConcurrentConnectionsPerIp(long native_transport_max_concurrent_connections_per_ip)
+    {
+        conf.native_transport_max_concurrent_connections_per_ip = native_transport_max_concurrent_connections_per_ip;
+    }
+
     public static double getCommitLogSyncBatchWindow()
     {
         return conf.commitlog_sync_batch_window_in_ms;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e56d9efb/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index fcc9665..d033929 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -2125,9 +2125,15 @@ public class StorageProxy implements StorageProxyMBean
 
     public Long getTruncateRpcTimeout() { return DatabaseDescriptor.getTruncateRpcTimeout(); }
     public void setTruncateRpcTimeout(Long timeoutInMillis) { DatabaseDescriptor.setTruncateRpcTimeout(timeoutInMillis); }
+
+    public Long getNativeTransportMaxConcurrentConnections() { return DatabaseDescriptor.getNativeTransportMaxConcurrentConnections(); }
+    public void setNativeTransportMaxConcurrentConnections(Long nativeTransportMaxConcurrentConnections) { DatabaseDescriptor.setNativeTransportMaxConcurrentConnections(nativeTransportMaxConcurrentConnections); }
+
+    public Long getNativeTransportMaxConcurrentConnectionsPerIp() { return DatabaseDescriptor.getNativeTransportMaxConcurrentConnectionsPerIp(); }
+    public void setNativeTransportMaxConcurrentConnectionsPerIp(Long nativeTransportMaxConcurrentConnections) { DatabaseDescriptor.setNativeTransportMaxConcurrentConnectionsPerIp(nativeTransportMaxConcurrentConnections); }
+
     public void reloadTriggerClasses() { TriggerExecutor.instance.reloadClasses(); }
 
-    
     public long getReadRepairAttempted() {
         return ReadRepairMetrics.attempted.count();
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e56d9efb/src/java/org/apache/cassandra/service/StorageProxyMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxyMBean.java b/src/java/org/apache/cassandra/service/StorageProxyMBean.java
index 203cabe..03b9b58 100644
--- a/src/java/org/apache/cassandra/service/StorageProxyMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageProxyMBean.java
@@ -95,6 +95,9 @@ public interface StorageProxyMBean
     public Long getTruncateRpcTimeout();
     public void setTruncateRpcTimeout(Long timeoutInMillis);
 
+    public void setNativeTransportMaxConcurrentConnections(Long nativeTransportMaxConcurrentConnections);
+    public Long getNativeTransportMaxConcurrentConnections();
+
     public void reloadTriggerClasses();
 
     public long getReadRepairAttempted();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e56d9efb/src/java/org/apache/cassandra/transport/ConnectionLimitHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/ConnectionLimitHandler.java b/src/java/org/apache/cassandra/transport/ConnectionLimitHandler.java
new file mode 100644
index 0000000..c45d2cb
--- /dev/null
+++ b/src/java/org/apache/cassandra/transport/ConnectionLimitHandler.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.transport;
+
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.jboss.netty.channel.ChannelHandler;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ChannelStateEvent;
+import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+
+/**
+ * {@link SimpleChannelUpstreamHandler} implementation which allows to limit the number of concurrent
+ * connections to the Server. Be aware this <strong>MUST</strong> be shared between all child channels.
+ */
+@ChannelHandler.Sharable
+final class ConnectionLimitHandler extends SimpleChannelUpstreamHandler
+{
+    private static final Logger logger = LoggerFactory.getLogger(ConnectionLimitHandler.class);
+    private final ConcurrentMap<InetAddress, AtomicLong> connectionsPerClient = new ConcurrentHashMap<>();
+    private final AtomicLong counter = new AtomicLong(0);
+
+    @Override
+    public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent event) throws Exception
+    {
+        final long count = counter.incrementAndGet();
+        long limit = DatabaseDescriptor.getNativeTransportMaxConcurrentConnections();
+        // Setting the limit to -1 disables it.
+        if(limit < 0)
+        {
+            limit = Long.MAX_VALUE;
+        }
+        if (count > limit)
+        {
+            // The decrement will be done in channelClosed(...)
+            logger.warn("Exceeded maximum native connection limit of {} by using {} connections", limit, count);
+            ctx.getChannel().close();
+        }
+        else
+        {
+            long perIpLimit = DatabaseDescriptor.getNativeTransportMaxConcurrentConnectionsPerIp();
+            if (perIpLimit > 0)
+            {
+                InetAddress address = ((InetSocketAddress) ctx.getChannel().getRemoteAddress()).getAddress();
+
+                AtomicLong perIpCount = connectionsPerClient.get(address);
+                if (perIpCount == null)
+                {
+                    perIpCount = new AtomicLong(0);
+
+                    AtomicLong old = connectionsPerClient.putIfAbsent(address, perIpCount);
+                    if (old != null)
+                    {
+                        perIpCount = old;
+                    }
+                }
+                if (perIpCount.incrementAndGet() > perIpLimit)
+                {
+                    // The decrement will be done in channelClosed(...)
+                    logger.warn("Exceeded maximum native connection limit per ip of {} by using {} connections", perIpLimit, perIpCount);
+                    ctx.getChannel().close();
+                    return;
+                }
+            }
+            super.channelOpen(ctx, event);
+        }
+    }
+
+    @Override
+    public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent event) throws Exception
+    {
+        counter.decrementAndGet();
+        InetAddress address = ((InetSocketAddress) ctx.getChannel().getRemoteAddress()).getAddress();
+
+        AtomicLong count = connectionsPerClient.get(address);
+        if (count != null)
+        {
+            if (count.decrementAndGet() <= 0)
+            {
+                connectionsPerClient.remove(address);
+            }
+        }
+        super.channelClosed(ctx, event);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e56d9efb/src/java/org/apache/cassandra/transport/Server.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Server.java b/src/java/org/apache/cassandra/transport/Server.java
index df4f127..30b8a9d 100644
--- a/src/java/org/apache/cassandra/transport/Server.java
+++ b/src/java/org/apache/cassandra/transport/Server.java
@@ -240,6 +240,7 @@ public class Server implements CassandraDaemon.Server
         private static final Frame.Compressor frameCompressor = new Frame.Compressor();
         private static final Frame.Encoder frameEncoder = new Frame.Encoder();
         private static final Message.Dispatcher dispatcher = new Message.Dispatcher();
+        private static final ConnectionLimitHandler connectionLimitHandler = new ConnectionLimitHandler();
 
         private final Server server;
 
@@ -252,6 +253,14 @@ public class Server implements CassandraDaemon.Server
         {
             ChannelPipeline pipeline = Channels.pipeline();
 
+            // Add the ConnectionLimitHandler to the pipeline if configured to do so.
+            if (DatabaseDescriptor.getNativeTransportMaxConcurrentConnections() > 0
+                    || DatabaseDescriptor.getNativeTransportMaxConcurrentConnectionsPerIp() > 0)
+            {
+                // Add as first to the pipeline so the limit is enforced as first action.
+                pipeline.addFirst("connectionLimitHandler", connectionLimitHandler);
+            }
+
             //pipeline.addLast("debug", new LoggingHandler());
 
             pipeline.addLast("frameDecoder", new Frame.Decoder(server.connectionFactory));


[2/2] cassandra git commit: Merge branch 'cassandra-2.0' into cassandra-2.1

Posted by jm...@apache.org.
Merge branch 'cassandra-2.0' into cassandra-2.1

Conflicts:
	CHANGES.txt


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b1c2536c
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b1c2536c
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b1c2536c

Branch: refs/heads/cassandra-2.1
Commit: b1c2536ccb244be078cddc0da6c788d2b7c0f4ec
Parents: 7902568 e56d9ef
Author: Joshua McKenzie <jm...@apache.org>
Authored: Thu Mar 5 11:32:41 2015 -0600
Committer: Joshua McKenzie <jm...@apache.org>
Committed: Thu Mar 5 11:32:41 2015 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 conf/cassandra.yaml                             |   8 ++
 .../org/apache/cassandra/config/Config.java     |   2 +
 .../cassandra/config/DatabaseDescriptor.java    |  19 ++++
 .../apache/cassandra/service/StorageProxy.java  |   8 +-
 .../cassandra/service/StorageProxyMBean.java    |   3 +
 .../transport/ConnectionLimitHandler.java       | 108 +++++++++++++++++++
 .../org/apache/cassandra/transport/Server.java  |   9 ++
 8 files changed, 157 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/b1c2536c/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 137c0f1,4e34c9e..aa2e1f9
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,36 -1,5 +1,37 @@@
 -2.0.13:
 +2.1.4
 + * Make SSTableRewriter.abort() more robust to failure (CASSANDRA-8832)
 + * Remove cold_reads_to_omit from STCS (CASSANDRA-8860)
 + * Make EstimatedHistogram#percentile() use ceil instead of floor (CASSANDRA-8883)
 + * Fix top partitions reporting wrong cardinality (CASSANDRA-8834)
 + * Fix rare NPE in KeyCacheSerializer (CASSANDRA-8067)
 + * Pick sstables for validation as late as possible inc repairs (CASSANDRA-8366)
 + * Fix commitlog getPendingTasks to not increment (CASSANDRA-8856)
 + * Fix parallelism adjustment in range and secondary index queries
 +   when the first fetch does not satisfy the limit (CASSANDRA-8856)
 + * Check if the filtered sstables is non-empty in STCS (CASSANDRA-8843)
 + * Upgrade java-driver used for cassandra-stress (CASSANDRA-8842)
 + * Fix CommitLog.forceRecycleAllSegments() memory access error (CASSANDRA-8812)
 + * Improve assertions in Memory (CASSANDRA-8792)
 + * Fix SSTableRewriter cleanup (CASSANDRA-8802)
 + * Introduce SafeMemory for CompressionMetadata.Writer (CASSANDRA-8758)
 + * 'nodetool info' prints exception against older node (CASSANDRA-8796)
 + * Ensure SSTableReader.last corresponds exactly with the file end (CASSANDRA-8750)
 + * Make SSTableWriter.openEarly more robust and obvious (CASSANDRA-8747)
 + * Enforce SSTableReader.first/last (CASSANDRA-8744)
 + * Cleanup SegmentedFile API (CASSANDRA-8749)
 + * Avoid overlap with early compaction replacement (CASSANDRA-8683)
 + * Safer Resource Management++ (CASSANDRA-8707)
 + * Write partition size estimates into a system table (CASSANDRA-7688)
 + * cqlsh: Fix keys() and full() collection indexes in DESCRIBE output
 +   (CASSANDRA-8154)
 + * Show progress of streaming in nodetool netstats (CASSANDRA-8886)
 + * IndexSummaryBuilder utilises offheap memory, and shares data between
 +   each IndexSummary opened from it (CASSANDRA-8757)
 + * markCompacting only succeeds if the exact SSTableReader instances being 
 +   marked are in the live set (CASSANDRA-8689)
 + * cassandra-stress support for varint (CASSANDRA-8882)
 +Merged from 2.0:
+  * Add ability to limit number of native connections (CASSANDRA-8086)
   * Add offline tool to relevel sstables (CASSANDRA-8301)
   * Preserve stream ID for more protocol errors (CASSANDRA-8848)
   * Fix combining token() function with multi-column relations on

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b1c2536c/conf/cassandra.yaml
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b1c2536c/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b1c2536c/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b1c2536c/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b1c2536c/src/java/org/apache/cassandra/service/StorageProxyMBean.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b1c2536c/src/java/org/apache/cassandra/transport/ConnectionLimitHandler.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/transport/ConnectionLimitHandler.java
index 0000000,c45d2cb..7bcf280
mode 000000,100644..100644
--- a/src/java/org/apache/cassandra/transport/ConnectionLimitHandler.java
+++ b/src/java/org/apache/cassandra/transport/ConnectionLimitHandler.java
@@@ -1,0 -1,111 +1,108 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.cassandra.transport;
+ 
+ 
++import io.netty.channel.ChannelHandler;
++import io.netty.channel.ChannelHandlerContext;
++import io.netty.channel.ChannelInboundHandlerAdapter;
+ import org.apache.cassandra.config.DatabaseDescriptor;
 -import org.jboss.netty.channel.ChannelHandler;
 -import org.jboss.netty.channel.ChannelHandlerContext;
 -import org.jboss.netty.channel.ChannelStateEvent;
 -import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+ 
+ import java.net.InetAddress;
+ import java.net.InetSocketAddress;
 -import java.util.HashMap;
 -import java.util.Map;
+ import java.util.concurrent.ConcurrentHashMap;
+ import java.util.concurrent.ConcurrentMap;
+ import java.util.concurrent.atomic.AtomicLong;
+ 
+ 
+ /**
 - * {@link SimpleChannelUpstreamHandler} implementation which allows to limit the number of concurrent
++ * {@link ChannelInboundHandlerAdapter} implementation which allows to limit the number of concurrent
+  * connections to the Server. Be aware this <strong>MUST</strong> be shared between all child channels.
+  */
+ @ChannelHandler.Sharable
 -final class ConnectionLimitHandler extends SimpleChannelUpstreamHandler
++final class ConnectionLimitHandler extends ChannelInboundHandlerAdapter
+ {
+     private static final Logger logger = LoggerFactory.getLogger(ConnectionLimitHandler.class);
+     private final ConcurrentMap<InetAddress, AtomicLong> connectionsPerClient = new ConcurrentHashMap<>();
+     private final AtomicLong counter = new AtomicLong(0);
+ 
+     @Override
 -    public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent event) throws Exception
++    public void channelActive(ChannelHandlerContext ctx) throws Exception
+     {
+         final long count = counter.incrementAndGet();
+         long limit = DatabaseDescriptor.getNativeTransportMaxConcurrentConnections();
+         // Setting the limit to -1 disables it.
+         if(limit < 0)
+         {
+             limit = Long.MAX_VALUE;
+         }
+         if (count > limit)
+         {
+             // The decrement will be done in channelClosed(...)
+             logger.warn("Exceeded maximum native connection limit of {} by using {} connections", limit, count);
 -            ctx.getChannel().close();
++            ctx.close();
+         }
+         else
+         {
+             long perIpLimit = DatabaseDescriptor.getNativeTransportMaxConcurrentConnectionsPerIp();
+             if (perIpLimit > 0)
+             {
 -                InetAddress address = ((InetSocketAddress) ctx.getChannel().getRemoteAddress()).getAddress();
++                InetAddress address = ((InetSocketAddress) ctx.channel().remoteAddress()).getAddress();
+ 
+                 AtomicLong perIpCount = connectionsPerClient.get(address);
+                 if (perIpCount == null)
+                 {
+                     perIpCount = new AtomicLong(0);
+ 
+                     AtomicLong old = connectionsPerClient.putIfAbsent(address, perIpCount);
+                     if (old != null)
+                     {
+                         perIpCount = old;
+                     }
+                 }
+                 if (perIpCount.incrementAndGet() > perIpLimit)
+                 {
+                     // The decrement will be done in channelClosed(...)
+                     logger.warn("Exceeded maximum native connection limit per ip of {} by using {} connections", perIpLimit, perIpCount);
 -                    ctx.getChannel().close();
++                    ctx.close();
+                     return;
+                 }
+             }
 -            super.channelOpen(ctx, event);
++            ctx.fireChannelActive();
+         }
+     }
+ 
+     @Override
 -    public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent event) throws Exception
++    public void channelInactive(ChannelHandlerContext ctx) throws Exception
+     {
+         counter.decrementAndGet();
 -        InetAddress address = ((InetSocketAddress) ctx.getChannel().getRemoteAddress()).getAddress();
++        InetAddress address = ((InetSocketAddress) ctx.channel().remoteAddress()).getAddress();
+ 
+         AtomicLong count = connectionsPerClient.get(address);
+         if (count != null)
+         {
+             if (count.decrementAndGet() <= 0)
+             {
+                 connectionsPerClient.remove(address);
+             }
+         }
 -        super.channelClosed(ctx, event);
++        ctx.fireChannelInactive();
+     }
+ }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b1c2536c/src/java/org/apache/cassandra/transport/Server.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/transport/Server.java
index 60d3e70,30b8a9d..f396fd9
--- a/src/java/org/apache/cassandra/transport/Server.java
+++ b/src/java/org/apache/cassandra/transport/Server.java
@@@ -278,10 -249,18 +279,18 @@@ public class Server implements Cassandr
              this.server = server;
          }
  
 -        public ChannelPipeline getPipeline() throws Exception
 +        protected void initChannel(Channel channel) throws Exception
          {
 -            ChannelPipeline pipeline = Channels.pipeline();
 +            ChannelPipeline pipeline = channel.pipeline();
  
+             // Add the ConnectionLimitHandler to the pipeline if configured to do so.
+             if (DatabaseDescriptor.getNativeTransportMaxConcurrentConnections() > 0
+                     || DatabaseDescriptor.getNativeTransportMaxConcurrentConnectionsPerIp() > 0)
+             {
+                 // Add as first to the pipeline so the limit is enforced as first action.
+                 pipeline.addFirst("connectionLimitHandler", connectionLimitHandler);
+             }
+ 
              //pipeline.addLast("debug", new LoggingHandler());
  
              pipeline.addLast("frameDecoder", new Frame.Decoder(server.connectionFactory));