You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gw...@apache.org on 2019/03/14 23:19:14 UTC

[kafka] branch trunk updated: KAFKA-7730; Limit number of active connections per listener in brokers (KIP-402)

This is an automated email from the ASF dual-hosted git repository.

gwenshap pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 3ec2ca5  KAFKA-7730; Limit number of active connections per listener in brokers (KIP-402)
3ec2ca5 is described below

commit 3ec2ca5e334ef42683263ed50c718c62917046af
Author: Rajini Sivaram <ra...@googlemail.com>
AuthorDate: Thu Mar 14 16:18:52 2019 -0700

    KAFKA-7730; Limit number of active connections per listener in brokers (KIP-402)
    
    Adds a new listener config `max.connections` to limit the number of active connections on each listener. The config may be prefixed with listener prefix. This limit may be dynamically reconfigured without restarting the broker.
    
    This is one of the PRs for KIP-402 (https://cwiki.apache.org/confluence/display/KAFKA/KIP-402%3A+Improve+fairness+in+SocketServer+processors). Note that this is currently built on top of PR #6022
    
    Author: Rajini Sivaram <ra...@googlemail.com>
    
    Reviewers: Gwen Shapira <cs...@gmail.com>
    
    Closes #6034 from rajinisivaram/KAFKA-7730-max-connections
---
 .../org/apache/kafka/common/network/Selector.java  |  22 +++
 .../apache/kafka/common/network/SelectorTest.java  |  30 +++
 .../main/scala/kafka/network/SocketServer.scala    | 209 +++++++++++++++++---
 .../scala/kafka/server/DynamicBrokerConfig.scala   |  40 ++--
 core/src/main/scala/kafka/server/KafkaConfig.scala |  19 +-
 .../kafka/network/DynamicConnectionQuotaTest.scala | 217 ++++++++++++++++-----
 .../kafka/server/DynamicBrokerConfigTest.scala     |   6 +
 7 files changed, 441 insertions(+), 102 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
index e431e27..b349797 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
@@ -920,6 +920,28 @@ public class Selector implements Selectable, AutoCloseable {
     }
 
     /**
+     * Returns the lowest priority channel chosen using the following sequence:
+     *   1) If one or more channels are in closing state, return any one of them
+     *   2) If idle expiry manager is enabled, return the least recently updated channel
+     *   3) Otherwise return any of the channels
+     *
+     * This method is used to close a channel to accommodate a new channel on the inter-broker listener
+     * when broker-wide `max.connections` limit is enabled.
+     */
+    public KafkaChannel lowestPriorityChannel() {
+        // Channels that are already closing are given up first.
+        if (!closingChannels.isEmpty()) {
+            return closingChannels.values().iterator().next();
+        }
+        // Next choice: the first key in the idle expiry manager's LRU map, when that manager exists.
+        if (idleExpiryManager != null && !idleExpiryManager.lruConnections.isEmpty()) {
+            return channel(idleExpiryManager.lruConnections.keySet().iterator().next());
+        }
+        // Last resort: any open channel, or null when the selector has none.
+        return channels.isEmpty() ? null : channels.values().iterator().next();
+    }
+
+    /**
      * Get the channel associated with selectionKey
      */
     private KafkaChannel channel(SelectionKey key) {
diff --git a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
index 7cb89c2..8cbada7 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
@@ -688,6 +688,36 @@ public class SelectorTest {
         assertEquals((double) conns, getMetric("connection-count").metricValue());
     }
 
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testLowestPriorityChannel() throws Exception {
+        int conns = 5;
+        InetSocketAddress addr = new InetSocketAddress("localhost", server.port);
+        for (int i = 0; i < conns; i++) {
+            connect(String.valueOf(i), addr);
+        }
+        assertNotNull(selector.lowestPriorityChannel()); // with open channels, some channel must be returned
+        for (int i = conns - 1; i >= 0; i--) {
+            if (i != 2) // channel "2" is deliberately never sent a request
+              assertEquals("", blockingRequest(String.valueOf(i), ""));
+            time.sleep(10);
+        }
+        assertEquals("2", selector.lowestPriorityChannel().id()); // the channel that saw no request is expected to be chosen
+
+        Field field = Selector.class.getDeclaredField("closingChannels"); // reflective access to the selector's closingChannels map
+        field.setAccessible(true);
+        Map<String, KafkaChannel> closingChannels = (Map<String, KafkaChannel>) field.get(selector);
+        closingChannels.put("3", selector.channel("3")); // make channel "3" look like it is closing
+        assertEquals("3", selector.lowestPriorityChannel().id()); // a closing channel takes precedence over the LRU choice
+        closingChannels.remove("3");
+
+        for (int i = 0; i < conns; i++) {
+            selector.close(String.valueOf(i));
+        }
+        assertNull(selector.lowestPriorityChannel()); // nothing is returned once every channel has been closed
+    }
+
+
     private String blockingRequest(String node, String s) throws IOException {
         selector.send(createSend(node, s));
         selector.poll(1000L);
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index d8c4a74..e06cee7 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -21,6 +21,7 @@ import java.io.IOException
 import java.net._
 import java.nio.channels._
 import java.nio.channels.{Selector => NSelector}
+import java.util
 import java.util.concurrent._
 import java.util.concurrent.atomic._
 import java.util.function.Supplier
@@ -32,15 +33,16 @@ import kafka.network.RequestChannel.{CloseConnectionResponse, EndThrottlingRespo
 import kafka.network.Processor._
 import kafka.network.SocketServer._
 import kafka.security.CredentialProvider
-import kafka.server.KafkaConfig
+import kafka.server.{BrokerReconfigurable, KafkaConfig}
 import kafka.utils._
+import org.apache.kafka.common.config.ConfigException
 import org.apache.kafka.common.{KafkaException, Reconfigurable}
 import org.apache.kafka.common.memory.{MemoryPool, SimpleMemoryPool}
 import org.apache.kafka.common.metrics._
 import org.apache.kafka.common.metrics.stats.Meter
 import org.apache.kafka.common.metrics.stats.Total
 import org.apache.kafka.common.network.KafkaChannel.ChannelMuteEvent
-import org.apache.kafka.common.network.{ChannelBuilder, ChannelBuilders, KafkaChannel, ListenerName, Selectable, Send, Selector => KSelector}
+import org.apache.kafka.common.network.{ChannelBuilder, ChannelBuilders, KafkaChannel, ListenerName, ListenerReconfigurable, Selectable, Send, Selector => KSelector}
 import org.apache.kafka.common.protocol.ApiKeys
 import org.apache.kafka.common.requests.{RequestContext, RequestHeader}
 import org.apache.kafka.common.security.auth.SecurityProtocol
@@ -70,7 +72,11 @@ import scala.util.control.ControlThrowable
  *      Acceptor has 1 Processor thread that has its own selector and read requests from the socket.
  *      1 Handler thread that handles requests and produce responses back to the processor thread for writing.
  */
-class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time, val credentialProvider: CredentialProvider) extends Logging with KafkaMetricsGroup {
+class SocketServer(val config: KafkaConfig,
+                   val metrics: Metrics,
+                   val time: Time,
+                   val credentialProvider: CredentialProvider)
+  extends Logging with KafkaMetricsGroup with BrokerReconfigurable {
 
   private val maxQueuedRequests = config.queuedMaxRequests
 
@@ -109,7 +115,7 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
    */
   def startup(startupProcessors: Boolean = true) {
     this.synchronized {
-      connectionQuotas = new ConnectionQuotas(config.maxConnectionsPerIp, config.maxConnectionsPerIpOverrides)
+      connectionQuotas = new ConnectionQuotas(config, time)
       createControlPlaneAcceptorAndProcessor(config.controlPlaneListener)
       createDataPlaneAcceptorsAndProcessors(config.numNetworkThreads, config.dataPlaneListeners)
       if (startupProcessors) {
@@ -212,6 +218,7 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
   private def createDataPlaneAcceptorsAndProcessors(dataProcessorsPerListener: Int,
                                                     endpoints: Seq[EndPoint]): Unit = synchronized {
     endpoints.foreach { endpoint =>
+      connectionQuotas.addListener(config, endpoint.listenerName)
       val dataPlaneAcceptor = createAcceptor(endpoint, DataPlaneMetricPrefix)
       addDataPlaneProcessors(dataPlaneAcceptor, endpoint, dataProcessorsPerListener)
       KafkaThread.nonDaemon(s"data-plane-kafka-socket-acceptor-${endpoint.listenerName}-${endpoint.securityProtocol}-${endpoint.port}", dataPlaneAcceptor).start()
@@ -223,6 +230,7 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
 
   private def createControlPlaneAcceptorAndProcessor(endpointOpt: Option[EndPoint]): Unit = synchronized {
     endpointOpt.foreach { endpoint =>
+      connectionQuotas.addListener(config, endpoint.listenerName)
       val controlPlaneAcceptor = createAcceptor(endpoint, ControlPlaneMetricPrefix)
       val controlPlaneProcessor = newProcessor(nextProcessorId, controlPlaneRequestChannelOpt.get, connectionQuotas, endpoint.listenerName, endpoint.securityProtocol, memoryPool)
       controlPlaneAcceptorOpt = Some(controlPlaneAcceptor)
@@ -324,18 +332,33 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
   def removeListeners(listenersRemoved: Seq[EndPoint]): Unit = synchronized {
     info(s"Removing data-plane listeners for endpoints $listenersRemoved")
     listenersRemoved.foreach { endpoint =>
+      connectionQuotas.removeListener(config, endpoint.listenerName)
       dataPlaneAcceptors.asScala.remove(endpoint).foreach(_.shutdown())
     }
   }
 
-  def updateMaxConnectionsPerIp(maxConnectionsPerIp: Int): Unit = {
-    info(s"Updating maxConnectionsPerIp: $maxConnectionsPerIp")
-    connectionQuotas.updateMaxConnectionsPerIp(maxConnectionsPerIp)
+  override def reconfigurableConfigs: Set[String] = SocketServer.ReconfigurableConfigs
+
+  override def validateReconfiguration(newConfig: KafkaConfig): Unit = {
+
   }
 
-  def updateMaxConnectionsPerIpOverride(maxConnectionsPerIpOverrides: Map[String, Int]): Unit = {
-    info(s"Updating maxConnectionsPerIpOverrides: ${maxConnectionsPerIpOverrides.map { case (k, v) => s"$k=$v" }.mkString(",")}")
-    connectionQuotas.updateMaxConnectionsPerIpOverride(maxConnectionsPerIpOverrides)
+  override def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): Unit = {
+    val maxConnectionsPerIp = newConfig.maxConnectionsPerIp
+    if (maxConnectionsPerIp != oldConfig.maxConnectionsPerIp) {
+      info(s"Updating maxConnectionsPerIp: $maxConnectionsPerIp")
+      connectionQuotas.updateMaxConnectionsPerIp(maxConnectionsPerIp)
+    }
+    val maxConnectionsPerIpOverrides = newConfig.maxConnectionsPerIpOverrides
+    if (maxConnectionsPerIpOverrides != oldConfig.maxConnectionsPerIpOverrides) {
+      info(s"Updating maxConnectionsPerIpOverrides: ${maxConnectionsPerIpOverrides.map { case (k, v) => s"$k=$v" }.mkString(",")}")
+      connectionQuotas.updateMaxConnectionsPerIpOverride(maxConnectionsPerIpOverrides)
+    }
+    val maxConnections = newConfig.maxConnections
+    if (maxConnections != oldConfig.maxConnections) {
+      info(s"Updating broker-wide maxConnections: $maxConnections")
+      connectionQuotas.updateBrokerMaxConnections(maxConnections)
+    }
   }
 
   // `protected` for test usage
@@ -373,6 +396,13 @@ object SocketServer {
   val ControlPlaneThreadPrefix = "control-plane"
   val DataPlaneMetricPrefix = ""
   val ControlPlaneMetricPrefix = "ControlPlane"
+
+  val ReconfigurableConfigs = Set(
+    KafkaConfig.MaxConnectionsPerIpProp,
+    KafkaConfig.MaxConnectionsPerIpOverridesProp,
+    KafkaConfig.MaxConnectionsProp)
+
+  val ListenerReconfigurableConfigs = Set(KafkaConfig.MaxConnectionsProp)
 }
 
 /**
@@ -427,10 +457,10 @@ private[kafka] abstract class AbstractServerThread(connectionQuotas: ConnectionQ
   /**
    * Close `channel` and decrement the connection count.
    */
-  def close(channel: SocketChannel): Unit = {
+  def close(listenerName: ListenerName, channel: SocketChannel): Unit = {
     if (channel != null) {
       debug(s"Closing connection from ${channel.socket.getRemoteSocketAddress()}")
-      connectionQuotas.dec(channel.socket.getInetAddress)
+      connectionQuotas.dec(listenerName, channel.socket.getInetAddress)
       CoreUtils.swallow(channel.socket().close(), this, Level.ERROR)
       CoreUtils.swallow(channel.close(), this, Level.ERROR)
     }
@@ -500,6 +530,7 @@ private[kafka] class Acceptor(val endPoint: EndPoint,
       var currentProcessorIndex = 0
       while (isRunning) {
         try {
+
           val ready = nioSelector.select(500)
           if (ready > 0) {
             val keys = nioSelector.selectedKeys()
@@ -508,6 +539,7 @@ private[kafka] class Acceptor(val endPoint: EndPoint,
               try {
                 val key = iter.next
                 iter.remove()
+
                 if (key.isAcceptable) {
                   accept(key).foreach { socketChannel =>
 
@@ -582,7 +614,7 @@ private[kafka] class Acceptor(val endPoint: EndPoint,
     val serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel]
     val socketChannel = serverSocketChannel.accept()
     try {
-      connectionQuotas.inc(socketChannel.socket().getInetAddress)
+      connectionQuotas.inc(endPoint.listenerName, socketChannel.socket.getInetAddress, blockedPercentMeter)
       socketChannel.configureBlocking(false)
       socketChannel.socket().setTcpNoDelay(true)
       socketChannel.socket().setKeepAlive(true)
@@ -592,7 +624,7 @@ private[kafka] class Acceptor(val endPoint: EndPoint,
     } catch {
       case e: TooManyConnectionsException =>
         info(s"Rejected connection from ${e.ip}, address already has the configured maximum of ${e.count} connections.")
-        close(socketChannel)
+        close(endPoint.listenerName, socketChannel)
         None
     }
   }
@@ -731,6 +763,7 @@ private[kafka] class Processor(val id: Int,
           processCompletedReceives()
           processCompletedSends()
           processDisconnected()
+          closeExcessConnections()
         } catch {
           // We catch all the throwables here to prevent the processor thread from exiting. We do this because
           // letting a processor exit might cause a bigger impact on the broker. This behavior might need to be
@@ -911,13 +944,21 @@ private[kafka] class Processor(val id: Int,
         }.remoteHost
         inflightResponses.remove(connectionId).foreach(updateRequestMetrics)
         // the channel has been closed by the selector but the quotas still need to be updated
-        connectionQuotas.dec(InetAddress.getByName(remoteHost))
+        connectionQuotas.dec(listenerName, InetAddress.getByName(remoteHost))
       } catch {
         case e: Throwable => processException(s"Exception while processing disconnection of $connectionId", e)
       }
     }
   }
 
+  private def closeExcessConnections(): Unit = {
+    // Closes at most one channel per call, and only while the quota reports
+    // that the connection limit is exceeded for this processor's listener.
+    if (connectionQuotas.maxConnectionsExceeded(listenerName)) {
+      Option(selector.lowestPriorityChannel()).foreach(excess => close(excess.id))
+    }
+  }
+
   /**
    * Close the connection identified by `connectionId` and decrement the connection count.
    * The channel will be immediately removed from the selector's `channels` or `closingChannels`
@@ -930,7 +971,7 @@ private[kafka] class Processor(val id: Int,
       debug(s"Closing selector connection $connectionId")
       val address = channel.socketAddress
       if (address != null)
-        connectionQuotas.dec(address)
+        connectionQuotas.dec(listenerName, address)
       selector.close(connectionId)
 
       inflightResponses.remove(connectionId).foreach(response => updateRequestMetrics(response))
@@ -977,7 +1018,7 @@ private[kafka] class Processor(val id: Int,
         case e: Throwable =>
           val remoteAddress = channel.socket.getRemoteSocketAddress
           // need to close the channel here to avoid a socket leak.
-          close(channel)
+          close(listenerName, channel)
           processException(s"Processor $id closed connection from $remoteAddress", e)
       }
     }
@@ -1058,31 +1099,72 @@ private[kafka] class Processor(val id: Int,
 
 }
 
-class ConnectionQuotas(val defaultMax: Int, overrideQuotas: Map[String, Int]) {
+class ConnectionQuotas(config: KafkaConfig, time: Time) extends Logging {
 
-  @volatile private var defaultMaxConnectionsPerIp = defaultMax
-  @volatile private var maxConnectionsPerIpOverrides = overrideQuotas.map { case (host, count) => (InetAddress.getByName(host), count) }
+  @volatile private var defaultMaxConnectionsPerIp: Int = config.maxConnectionsPerIp
+  @volatile private var maxConnectionsPerIpOverrides = config.maxConnectionsPerIpOverrides.map { case (host, count) => (InetAddress.getByName(host), count) }
+  @volatile private var brokerMaxConnections = config.maxConnections
   private val counts = mutable.Map[InetAddress, Int]()
 
-  def inc(address: InetAddress) {
+  // Listener counts and configs are synchronized on `counts`
+  private val listenerCounts = mutable.Map[ListenerName, Int]()
+  private val maxConnectionsPerListener = mutable.Map[ListenerName, ListenerConnectionQuota]()
+  @volatile private var totalCount = 0
+
+  def inc(listenerName: ListenerName, address: InetAddress, acceptorBlockedPercentMeter: com.yammer.metrics.core.Meter) {
     counts.synchronized {
+      waitForConnectionSlot(listenerName, acceptorBlockedPercentMeter)
+
       val count = counts.getOrElseUpdate(address, 0)
       counts.put(address, count + 1)
+      totalCount += 1
+      if (listenerCounts.contains(listenerName)) {
+        listenerCounts.put(listenerName, listenerCounts(listenerName) + 1)
+      }
       val max = maxConnectionsPerIpOverrides.getOrElse(address, defaultMaxConnectionsPerIp)
       if (count >= max)
         throw new TooManyConnectionsException(address, max)
     }
   }
 
-  def updateMaxConnectionsPerIp(maxConnectionsPerIp: Int): Unit = {
+  private[network] def updateMaxConnectionsPerIp(maxConnectionsPerIp: Int): Unit = {
     defaultMaxConnectionsPerIp = maxConnectionsPerIp
   }
 
-  def updateMaxConnectionsPerIpOverride(overrideQuotas: Map[String, Int]): Unit = {
+  private[network] def updateMaxConnectionsPerIpOverride(overrideQuotas: Map[String, Int]): Unit = {
     maxConnectionsPerIpOverrides = overrideQuotas.map { case (host, count) => (InetAddress.getByName(host), count) }
   }
 
-  def dec(address: InetAddress) {
+  private[network] def updateBrokerMaxConnections(maxConnections: Int): Unit =
+    counts.synchronized {
+      // Publish the new broker-wide limit, then wake threads waiting on `counts` so they re-check it.
+      brokerMaxConnections = maxConnections
+      counts.notifyAll()
+    }
+
+  private[network] def addListener(config: KafkaConfig, listenerName: ListenerName): Unit =
+    counts.synchronized {
+      // Register a quota only for listeners not seen before; waiters are woken in either case.
+      if (maxConnectionsPerListener.get(listenerName).isEmpty) {
+        val quota = new ListenerConnectionQuota(counts, listenerName)
+        maxConnectionsPerListener.put(listenerName, quota)
+        listenerCounts.put(listenerName, 0)
+        config.addReconfigurable(quota)
+      }
+      counts.notifyAll()
+    }
+
+  private[network] def removeListener(config: KafkaConfig, listenerName: ListenerName): Unit = counts.synchronized {
+    maxConnectionsPerListener.remove(listenerName) match {
+      case Some(removedQuota) =>
+        listenerCounts.remove(listenerName)
+        counts.notifyAll() // wake up any waiting acceptors to close cleanly
+        config.removeReconfigurable(removedQuota)
+      case None => // no quota was registered for this listener, so there is nothing to undo
+    }
+  }
+
+  def dec(listenerName: ListenerName, address: InetAddress) {
     counts.synchronized {
       val count = counts.getOrElse(address,
         throw new IllegalArgumentException(s"Attempted to decrease connection count for address with no connections, address: $address"))
@@ -1090,6 +1172,19 @@ class ConnectionQuotas(val defaultMax: Int, overrideQuotas: Map[String, Int]) {
         counts.remove(address)
       else
         counts.put(address, count - 1)
+
+      if (totalCount <= 0)
+        error(s"Attempted to decrease total connection count for broker with no connections")
+      totalCount -= 1
+
+      if (maxConnectionsPerListener.contains(listenerName)) {
+        val listenerCount = listenerCounts(listenerName)
+        if (listenerCount == 0)
+          error(s"Attempted to decrease connection count for listener $listenerName with no connections")
+        else
+          listenerCounts.put(listenerName, listenerCount - 1)
+      }
+      counts.notifyAll() // wake up any acceptors waiting to process a new connection since listener connection limit was reached
     }
   }
 
@@ -1097,6 +1192,72 @@ class ConnectionQuotas(val defaultMax: Int, overrideQuotas: Map[String, Int]) {
     counts.getOrElse(address, 0)
   }
 
+  private def waitForConnectionSlot(listenerName: ListenerName,
+                                    acceptorBlockedPercentMeter: com.yammer.metrics.core.Meter): Unit = {
+    counts.synchronized {
+      // Returns immediately when a slot is free; otherwise blocks on `counts` and records the time spent blocked.
+      if (!connectionSlotAvailable(listenerName)) {
+        val blockedSinceNs = time.nanoseconds
+        // The condition is re-evaluated after every wake-up before the wait is given up.
+        do counts.wait() while (!connectionSlotAvailable(listenerName))
+        acceptorBlockedPercentMeter.mark(time.nanoseconds - blockedSinceNs)
+      }
+    }
+  }
+
+  // Checked on every poll iteration; when it returns true the caller closes one connection.
+  // A protected listener never reports the broker-wide limit as exceeded.
+  def maxConnectionsExceeded(listenerName: ListenerName): Boolean = {
+    if (totalCount <= brokerMaxConnections) false else !protectedListener(listenerName)
+  }
+
+  private def connectionSlotAvailable(listenerName: ListenerName): Boolean = {
+    // The listener's own limit is checked first and applies to every listener.
+    val listenerHasRoom = listenerCounts(listenerName) < maxListenerConnections(listenerName)
+    // A protected listener is exempt from the broker-wide limit; all other listeners
+    // additionally need the broker total to be below that limit.
+    listenerHasRoom &&
+      (protectedListener(listenerName) || totalCount < brokerMaxConnections)
+  }
+
+  private def protectedListener(listenerName: ListenerName): Boolean = // only the inter-broker listener, and only if other listeners exist
+    if (config.interBrokerListenerName == listenerName) config.listeners.size > 1 else false
+
+  private def maxListenerConnections(listenerName: ListenerName): Int = // Int.MaxValue when the listener has no quota registered
+    maxConnectionsPerListener.get(listenerName).fold(Int.MaxValue)(_.maxConnections)
+
+  // Per-listener limit on active connections; `lock` is notified whenever the limit is reconfigured.
+  class ListenerConnectionQuota(lock: Object, listener: ListenerName) extends ListenerReconfigurable {
+    @volatile private var _maxConnections = Int.MaxValue
+    private val listenerPropName = s"${listener.configPrefix}${KafkaConfig.MaxConnectionsProp}"
+    def maxConnections: Int = _maxConnections
+    override def listenerName(): ListenerName = listener
+    // Applies the limit found in `configs`.
+    override def configure(configs: util.Map[String, _]): Unit = {
+      _maxConnections = maxConnections(configs)
+    }
+    // Config names this quota can be reconfigured with.
+    override def reconfigurableConfigs(): util.Set[String] = {
+      SocketServer.ListenerReconfigurableConfigs.asJava
+    }
+    // Rejects a limit that is not strictly positive; the message is interpolated so it reports the offending value.
+    override def validateReconfiguration(configs: util.Map[String, _]): Unit = {
+      val value = maxConnections(configs)
+      if (value <= 0)
+        throw new ConfigException(s"Invalid max.connections $value")
+    }
+    // Updates the limit while holding `lock` and wakes every thread waiting on it.
+    override def reconfigure(configs: util.Map[String, _]): Unit = {
+      lock.synchronized {
+        _maxConnections = maxConnections(configs)
+        lock.notifyAll()
+      }
+    }
+    // Reads the limit from `configs`, falling back to Int.MaxValue when the property is absent.
+    private def maxConnections(configs: util.Map[String, _]): Int = {
+      Option(configs.get(KafkaConfig.MaxConnectionsProp)).map(_.toString.toInt).getOrElse(Int.MaxValue)
+    }
+  }
 }
 
 class TooManyConnectionsException(val ip: InetAddress, val count: Int) extends KafkaException(s"Too many connections from $ip (maximum = $count)")
diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
index b02b842..4f27226 100755
--- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -23,6 +23,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock
 
 import kafka.cluster.EndPoint
 import kafka.log.{LogCleaner, LogConfig, LogManager}
+import kafka.network.SocketServer
 import kafka.server.DynamicBrokerConfig._
 import kafka.utils.{CoreUtils, Logging, PasswordEncoder}
 import kafka.zk.{AdminZkClient, KafkaZkClient}
@@ -81,10 +82,11 @@ object DynamicBrokerConfig {
     DynamicThreadPool.ReconfigurableConfigs ++
     Set(KafkaConfig.MetricReporterClassesProp) ++
     DynamicListenerConfig.ReconfigurableConfigs ++
-    DynamicConnectionQuota.ReconfigurableConfigs
+    SocketServer.ReconfigurableConfigs
 
+  private val ClusterLevelListenerConfigs = Set(KafkaConfig.MaxConnectionsProp)
   private val PerBrokerConfigs = DynamicSecurityConfigs  ++
-    DynamicListenerConfig.ReconfigurableConfigs
+    DynamicListenerConfig.ReconfigurableConfigs -- ClusterLevelListenerConfigs
   private val ListenerMechanismConfigs = Set(KafkaConfig.SaslJaasConfigProp)
 
   private val ReloadableFileConfigs = Set(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG)
@@ -135,7 +137,13 @@ object DynamicBrokerConfig {
 
   private def perBrokerConfigs(props: Properties): Set[String] = {
     val configNames = props.asScala.keySet
-    configNames.intersect(PerBrokerConfigs) ++ configNames.filter(ListenerConfigRegex.findFirstIn(_).nonEmpty)
+    // A listener-prefixed config is per-broker unless its base name is a cluster-level listener config.
+    def perBrokerListenerConfig(name: String): Boolean = name match {
+      case ListenerConfigRegex(baseName) if ClusterLevelListenerConfigs.contains(baseName) => false
+      case ListenerConfigRegex(_) => true
+      case _ => false
+    }
+    configNames.intersect(PerBrokerConfigs) ++ configNames.filter(perBrokerListenerConfig)
   }
 
   private def nonDynamicConfigs(props: Properties): Set[String] = {
@@ -224,7 +232,7 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
       addBrokerReconfigurable(kafkaServer.logManager.cleaner)
     addBrokerReconfigurable(new DynamicLogConfig(kafkaServer.logManager, kafkaServer))
     addBrokerReconfigurable(new DynamicListenerConfig(kafkaServer))
-    addBrokerReconfigurable(new DynamicConnectionQuota(kafkaServer))
+    addBrokerReconfigurable(kafkaServer.socketServer)
   }
 
   def addReconfigurable(reconfigurable: Reconfigurable): Unit = CoreUtils.inWriteLock(lock) {
@@ -789,7 +797,10 @@ object DynamicListenerConfig {
     KafkaConfig.SaslLoginRefreshWindowFactorProp,
     KafkaConfig.SaslLoginRefreshWindowJitterProp,
     KafkaConfig.SaslLoginRefreshMinPeriodSecondsProp,
-    KafkaConfig.SaslLoginRefreshBufferSecondsProp
+    KafkaConfig.SaslLoginRefreshBufferSecondsProp,
+
+    // Connection limit configs
+    KafkaConfig.MaxConnectionsProp
   )
 }
 
@@ -884,24 +895,5 @@ class DynamicListenerConfig(server: KafkaServer) extends BrokerReconfigurable wi
 
 }
 
-object DynamicConnectionQuota {
-  val ReconfigurableConfigs = Set(KafkaConfig.MaxConnectionsPerIpProp, KafkaConfig.MaxConnectionsPerIpOverridesProp)
-}
-
-class DynamicConnectionQuota(server: KafkaServer) extends BrokerReconfigurable {
-
-  override def reconfigurableConfigs: Set[String] = {
-    DynamicConnectionQuota.ReconfigurableConfigs
-  }
-
-  override def validateReconfiguration(newConfig: KafkaConfig): Unit = {
-  }
-
-  override def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): Unit = {
-    server.socketServer.updateMaxConnectionsPerIpOverride(newConfig.maxConnectionsPerIpOverrides)
 
-    if (newConfig.maxConnectionsPerIp != oldConfig.maxConnectionsPerIp)
-      server.socketServer.updateMaxConnectionsPerIp(newConfig.maxConnectionsPerIp)
-  }
-}
 
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 02e1715..8ab7b43 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -75,6 +75,7 @@ object Defaults {
   val SocketRequestMaxBytes: Int = 100 * 1024 * 1024
   val MaxConnectionsPerIp: Int = Int.MaxValue
   val MaxConnectionsPerIpOverrides: String = ""
+  val MaxConnections: Int = Int.MaxValue
   val ConnectionsMaxIdleMs = 10 * 60 * 1000L
   val RequestTimeoutMs = 30000
   val FailedAuthenticationDelayMs = 100
@@ -293,6 +294,7 @@ object KafkaConfig {
   val SocketRequestMaxBytesProp = "socket.request.max.bytes"
   val MaxConnectionsPerIpProp = "max.connections.per.ip"
   val MaxConnectionsPerIpOverridesProp = "max.connections.per.ip.overrides"
+  val MaxConnectionsProp = "max.connections"
   val ConnectionsMaxIdleMsProp = "connections.max.idle.ms"
   val FailedAuthenticationDelayMsProp = "connection.failed.authentication.delay.ms"
   /***************** rack configuration *************/
@@ -570,8 +572,15 @@ object KafkaConfig {
   val SocketReceiveBufferBytesDoc = "The SO_RCVBUF buffer of the socket server sockets. If the value is -1, the OS default will be used."
   val SocketRequestMaxBytesDoc = "The maximum number of bytes in a socket request"
   val MaxConnectionsPerIpDoc = "The maximum number of connections we allow from each ip address. This can be set to 0 if there are overrides " +
-    "configured using " + MaxConnectionsPerIpOverridesProp + " property"
-  val MaxConnectionsPerIpOverridesDoc = "A comma-separated list of per-ip or hostname overrides to the default maximum number of connections. An example value is \"hostName:100,127.0.0.1:200\""
+    s"configured using $MaxConnectionsPerIpOverridesProp property. New connections from the ip address are dropped if the limit is reached."
+  val MaxConnectionsPerIpOverridesDoc = "A comma-separated list of per-ip or hostname overrides to the default maximum number of connections. " +
+    "An example value is \"hostName:100,127.0.0.1:200\""
+  val MaxConnectionsDoc = "The maximum number of connections we allow in the broker at any time. This limit is applied in addition " +
+    s"to any per-ip limits configured using $MaxConnectionsPerIpProp. Listener-level limits may also be configured by prefixing the " +
+    s"config name with the listener prefix, for example, <code>listener.name.internal.$MaxConnectionsProp</code>. Broker-wide limit " +
+    "should be configured based on broker capacity while listener limits should be configured based on application requirements. " +
+    "New connections are blocked if either the listener or broker limit is reached. Connections on the inter-broker listener are " +
+    "permitted even if broker-wide limit is reached. The least recently used connection on another listener will be closed in this case."
   val ConnectionsMaxIdleMsDoc = "Idle connections timeout: the server socket processor threads close the connections that idle more than this"
   val FailedAuthenticationDelayMsDoc = "Connection close delay on failed authentication: this is the time (in milliseconds) by which connection close will be delayed on authentication failure. " +
     s"This must be configured to be less than $ConnectionsMaxIdleMsProp to prevent connection timeout."
@@ -872,6 +881,7 @@ object KafkaConfig {
       .define(SocketRequestMaxBytesProp, INT, Defaults.SocketRequestMaxBytes, atLeast(1), HIGH, SocketRequestMaxBytesDoc)
       .define(MaxConnectionsPerIpProp, INT, Defaults.MaxConnectionsPerIp, atLeast(0), MEDIUM, MaxConnectionsPerIpDoc)
       .define(MaxConnectionsPerIpOverridesProp, STRING, Defaults.MaxConnectionsPerIpOverrides, MEDIUM, MaxConnectionsPerIpOverridesDoc)
+      .define(MaxConnectionsProp, INT, Defaults.MaxConnections, atLeast(0), MEDIUM, MaxConnectionsDoc)
       .define(ConnectionsMaxIdleMsProp, LONG, Defaults.ConnectionsMaxIdleMs, MEDIUM, ConnectionsMaxIdleMsDoc)
       .define(FailedAuthenticationDelayMsProp, INT, Defaults.FailedAuthenticationDelayMs, atLeast(0), LOW, FailedAuthenticationDelayMsDoc)
 
@@ -1163,6 +1173,7 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
   val maxConnectionsPerIp = getInt(KafkaConfig.MaxConnectionsPerIpProp)
   val maxConnectionsPerIpOverrides: Map[String, Int] =
     getMap(KafkaConfig.MaxConnectionsPerIpOverridesProp, getString(KafkaConfig.MaxConnectionsPerIpOverridesProp)).map { case (k, v) => (k, v.toInt)}
+  def maxConnections = getInt(KafkaConfig.MaxConnectionsProp)
   val connectionsMaxIdleMs = getLong(KafkaConfig.ConnectionsMaxIdleMsProp)
   val failedAuthenticationDelayMs = getInt(KafkaConfig.FailedAuthenticationDelayMsProp)
 
@@ -1331,6 +1342,10 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
     dynamicConfig.addReconfigurable(reconfigurable)
   }
 
+  def removeReconfigurable(reconfigurable: Reconfigurable): Unit = {
+    dynamicConfig.removeReconfigurable(reconfigurable)
+  }
+
   def logRetentionTimeMillis: Long = {
     val millisInMinute = 60L * 1000L
     val millisInHour = 60L * millisInMinute
diff --git a/core/src/test/scala/integration/kafka/network/DynamicConnectionQuotaTest.scala b/core/src/test/scala/integration/kafka/network/DynamicConnectionQuotaTest.scala
index 78d9af7..514d879 100644
--- a/core/src/test/scala/integration/kafka/network/DynamicConnectionQuotaTest.scala
+++ b/core/src/test/scala/integration/kafka/network/DynamicConnectionQuotaTest.scala
@@ -21,18 +21,19 @@ package kafka.network
 import java.io.IOException
 import java.net.{InetAddress, Socket}
 import java.util.Properties
+import java.util.concurrent._
 
 import kafka.server.{BaseRequestTest, KafkaConfig}
-import kafka.utils.TestUtils
+import kafka.utils.{CoreUtils, TestUtils}
 import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig}
-import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.{KafkaException, TopicPartition}
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.record.{CompressionType, MemoryRecords, SimpleRecord}
 import org.apache.kafka.common.requests.{ProduceRequest, ProduceResponse}
 import org.apache.kafka.common.security.auth.SecurityProtocol
-import org.junit.Assert.assertEquals
-import org.junit.{Before, Test}
+import org.junit.Assert._
+import org.junit.{After, Before, Test}
 
 import scala.collection.JavaConverters._
 
@@ -41,6 +42,9 @@ class DynamicConnectionQuotaTest extends BaseRequestTest {
   override def numBrokers = 1
 
   val topic = "test"
+  val listener = ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)
+  val localAddress = InetAddress.getByName("127.0.0.1")
+  var executor: ExecutorService = _
 
   @Before
   override def setUp(): Unit = {
@@ -48,76 +52,138 @@ class DynamicConnectionQuotaTest extends BaseRequestTest {
     TestUtils.createTopic(zkClient, topic, numBrokers, numBrokers, servers)
   }
 
-  @Test
-  def testDynamicConnectionQuota(): Unit = {
-    def connect(socketServer: SocketServer, protocol: SecurityProtocol = SecurityProtocol.PLAINTEXT, localAddr: InetAddress = null) = {
-      new Socket("localhost", socketServer.boundPort(ListenerName.forSecurityProtocol(protocol)), localAddr, 0)
+  @After
+  override def tearDown(): Unit = {
+    try {
+      if (executor != null) {
+        executor.shutdownNow()
+        assertTrue(executor.awaitTermination(10, TimeUnit.SECONDS))
+      }
+    } finally {
+      super.tearDown()
     }
+  }
 
-    val socketServer = servers.head.socketServer
-    val localAddress = InetAddress.getByName("127.0.0.1")
-    def connectionCount = socketServer.connectionCount(localAddress)
+  override protected def propertyOverrides(properties: Properties): Unit = {
+    super.propertyOverrides(properties)
+  }
+
+  @Test
+  def testDynamicConnectionQuota(): Unit = {
     val initialConnectionCount = connectionCount
     val maxConnectionsPerIP = 5
 
+    def connectAndVerify: Unit = {
+      val socket = connect()
+      try {
+        sendAndReceive(produceRequest, ApiKeys.PRODUCE, socket)
+      } finally {
+        socket.close()
+      }
+    }
+
     val props = new Properties
     props.put(KafkaConfig.MaxConnectionsPerIpProp, maxConnectionsPerIP.toString)
     reconfigureServers(props, perBrokerConfig = false, (KafkaConfig.MaxConnectionsPerIpProp, maxConnectionsPerIP.toString))
 
-    //wait for adminClient connections to close
-    TestUtils.waitUntilTrue(() => initialConnectionCount == connectionCount, "Connection count mismatch")
-
-    //create connections up to maxConnectionsPerIP - 1, leave space for one connection
-    var conns = (connectionCount until (maxConnectionsPerIP - 1)).map(_ => connect(socketServer))
-
-    // produce should succeed
-    var produceResponse = sendProduceRequest()
-    assertEquals(1, produceResponse.responses.size)
-    val (tp, partitionResponse) = produceResponse.responses.asScala.head
-    assertEquals(Errors.NONE, partitionResponse.error)
-
-    TestUtils.waitUntilTrue(() => connectionCount == (maxConnectionsPerIP - 1), "produce request connection is not closed")
-    conns = conns :+ connect(socketServer)
-    // now try one more (should fail)
-    intercept[IOException](sendProduceRequest())
-
-    conns.foreach(conn => conn.close())
-    TestUtils.waitUntilTrue(() => initialConnectionCount == connectionCount, "Connection count mismatch")
+    verifyMaxConnections(maxConnectionsPerIP, () => connectAndVerify)
 
     // Increase MaxConnectionsPerIpOverrides for localhost to 7
     val maxConnectionsPerIPOverride = 7
     props.put(KafkaConfig.MaxConnectionsPerIpOverridesProp, s"localhost:$maxConnectionsPerIPOverride")
     reconfigureServers(props, perBrokerConfig = false, (KafkaConfig.MaxConnectionsPerIpOverridesProp, s"localhost:$maxConnectionsPerIPOverride"))
 
-    //wait for adminClient connections to close
-    TestUtils.waitUntilTrue(() => initialConnectionCount == connectionCount, "Connection count mismatch")
+    verifyMaxConnections(maxConnectionsPerIPOverride, () => connectAndVerify)
+  }
 
-    //create connections up to maxConnectionsPerIPOverride - 1, leave space for one connection
-    conns = (connectionCount until maxConnectionsPerIPOverride - 1).map(_ => connect(socketServer))
+  @Test
+  def testDynamicListenerConnectionQuota(): Unit = {
+    val socketServer = servers.head.socketServer
+    val initialConnectionCount = connectionCount
 
-    // send should succeed
-    produceResponse = sendProduceRequest()
-    assertEquals(1, produceResponse.responses.size)
-    val (tp1, partitionResponse1) = produceResponse.responses.asScala.head
-    assertEquals(Errors.NONE, partitionResponse1.error)
+    def connectAndVerify(): Unit = {
+      val socket = connect("PLAINTEXT")
+      socket.setSoTimeout(1000)
+      try {
+        val response = sendAndReceive(produceRequest, ApiKeys.PRODUCE, socket)
+        assertEquals(0, response.remaining)
+      } finally {
+        socket.close()
+      }
+    }
 
-    TestUtils.waitUntilTrue(() => connectionCount == (maxConnectionsPerIPOverride - 1), "produce request connection is not closed")
-    conns = conns :+ connect(socketServer)
-    // now try one more (should fail)
-    intercept[IOException](sendProduceRequest())
+    // Reduce total broker MaxConnections to 5 at the cluster level
+    val props = new Properties
+    props.put(KafkaConfig.MaxConnectionsProp, "5")
+    reconfigureServers(props, perBrokerConfig = false, (KafkaConfig.MaxConnectionsProp, "5"))
+    verifyMaxConnections(5, () => connectAndVerify)
 
-    //close one connection
-    conns.head.close()
-    TestUtils.waitUntilTrue(() => connectionCount == (maxConnectionsPerIPOverride - 1), "connection is not closed")
-    // send should succeed
-    sendProduceRequest()
+    // Create another listener and verify listener connection limit of 5 for each listener
+    val newListeners = "PLAINTEXT://localhost:0,INTERNAL://localhost:0"
+    props.put(KafkaConfig.ListenersProp, newListeners)
+    props.put(KafkaConfig.ListenerSecurityProtocolMapProp, "PLAINTEXT:PLAINTEXT,INTERNAL:PLAINTEXT")
+    props.put(KafkaConfig.MaxConnectionsProp, "10")
+    props.put("listener.name.internal.max.connections", "5")
+    props.put("listener.name.plaintext.max.connections", "5")
+    reconfigureServers(props, perBrokerConfig = true, (KafkaConfig.ListenersProp, newListeners))
+    waitForListener("INTERNAL")
+
+    var conns = (connectionCount until 5).map(_ => connect("PLAINTEXT"))
+    conns ++= (5 until 10).map(_ => connect("INTERNAL"))
+    conns.foreach(verifyConnection)
+    conns.foreach(_.close())
+    TestUtils.waitUntilTrue(() => initialConnectionCount == connectionCount, "Connections not closed")
+
+    // Increase MaxConnections for PLAINTEXT listener to 7 at the broker level
+    val maxConnectionsPlaintext = 7
+    val listenerProp = s"${listener.configPrefix}${KafkaConfig.MaxConnectionsProp}"
+    props.put(listenerProp, maxConnectionsPlaintext.toString)
+    reconfigureServers(props, perBrokerConfig = true, (listenerProp, maxConnectionsPlaintext.toString))
+    verifyMaxConnections(maxConnectionsPlaintext, () => connectAndVerify)
+
+    // Verify that a connection blocked on the limit connects successfully once an existing connection is closed
+    val plaintextConnections = (connectionCount until maxConnectionsPlaintext).map(_ => connect("PLAINTEXT"))
+    executor = Executors.newSingleThreadExecutor
+    val future = executor.submit(CoreUtils.runnable { createAndVerifyConnection() })
+    Thread.sleep(100)
+    assertFalse(future.isDone)
+    plaintextConnections.head.close()
+    future.get(30, TimeUnit.SECONDS)
+    plaintextConnections.foreach(_.close())
+    TestUtils.waitUntilTrue(() => initialConnectionCount == connectionCount, "Connections not closed")
+
+    // Verify that connections on the inter-broker listener succeed even if the broker-wide max connections limit
+    // has been reached; the broker makes room by closing connections on another listener
+    var plaintextConns = (connectionCount until 5).map(_ => connect("PLAINTEXT"))
+    val internalConns = (5 until 10).map(_ => connect("INTERNAL"))
+    plaintextConns.foreach(verifyConnection)
+    internalConns.foreach(verifyConnection)
+    plaintextConns ++= (0 until 2).map(_ => connect("PLAINTEXT"))
+    TestUtils.waitUntilTrue(() => connectionCount <= 10, "Internal connections not closed")
+    plaintextConns.foreach(verifyConnection)
+    intercept[IOException](internalConns.foreach { socket => sendAndReceive(produceRequest, ApiKeys.PRODUCE, socket) })
+    plaintextConns.foreach(_.close())
+    internalConns.foreach(_.close())
+    TestUtils.waitUntilTrue(() => initialConnectionCount == connectionCount, "Connections not closed")
   }
 
   private def reconfigureServers(newProps: Properties, perBrokerConfig: Boolean, aPropToVerify: (String, String)): Unit = {
+    val initialConnectionCount = connectionCount
     val adminClient = createAdminClient()
     TestUtils.alterConfigs(servers, adminClient, newProps, perBrokerConfig).all.get()
     waitForConfigOnServer(aPropToVerify._1, aPropToVerify._2)
     adminClient.close()
+    TestUtils.waitUntilTrue(() => initialConnectionCount == connectionCount, "Admin client connection not closed")
+  }
+
+  private def waitForListener(listenerName: String): Unit = {
+    TestUtils.retry(maxWaitMs = 10000) {
+      try {
+        assertTrue(servers.head.socketServer.boundPort(ListenerName.normalised(listenerName)) > 0)
+      } catch {
+        case e: KafkaException => throw new AssertionError(e)
+      }
+    }
   }
 
   private def createAdminClient(): AdminClient = {
@@ -135,12 +201,59 @@ class DynamicConnectionQuotaTest extends BaseRequestTest {
     }
   }
 
-  private def sendProduceRequest(): ProduceResponse = {
+  private def produceRequest: ProduceRequest = {
     val topicPartition = new TopicPartition(topic, 0)
     val memoryRecords = MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord(System.currentTimeMillis(), "key".getBytes, "value".getBytes))
     val partitionRecords = Map(topicPartition -> memoryRecords)
-    val request = ProduceRequest.Builder.forCurrentMagic(-1, 3000, partitionRecords.asJava).build()
-    val response = connectAndSend(request, ApiKeys.PRODUCE, servers.head.socketServer)
-    ProduceResponse.parse(response, request.version)
+    ProduceRequest.Builder.forCurrentMagic(-1, 3000, partitionRecords.asJava).build()
+  }
+
+  def connectionCount: Int = servers.head.socketServer.connectionCount(localAddress)
+
+  def connect(listener: String): Socket = {
+    val listenerName = ListenerName.normalised(listener)
+    new Socket("localhost", servers.head.socketServer.boundPort(listenerName))
+  }
+
+  private def createAndVerifyConnection(listener: String = "PLAINTEXT"): Unit = {
+    val socket = connect(listener)
+    try {
+      verifyConnection(socket)
+    } finally {
+      socket.close()
+    }
+  }
+
+  private def verifyConnection(socket: Socket): Unit = {
+    val request = produceRequest
+    val response = sendAndReceive(request, ApiKeys.PRODUCE, socket)
+    val produceResponse = ProduceResponse.parse(response, request.version)
+    assertEquals(1, produceResponse.responses.size)
+    val (_, partitionResponse) = produceResponse.responses.asScala.head
+    assertEquals(Errors.NONE, partitionResponse.error)
+  }
+
+  private def verifyMaxConnections(maxConnections: Int, connectWithFailure: () => Unit): Unit = {
+    val initialConnectionCount = connectionCount
+
+    //create connections up to maxConnections - 1, leave space for one connection
+    var conns = (connectionCount until (maxConnections - 1)).map(_ => connect("PLAINTEXT"))
+
+    // produce should succeed on a new connection
+    createAndVerifyConnection()
+
+    TestUtils.waitUntilTrue(() => connectionCount == (maxConnections - 1), "produce request connection is not closed")
+    conns = conns :+ connect("PLAINTEXT")
+
+    // now try one more (should fail)
+    intercept[IOException](connectWithFailure.apply())
+
+    //close one connection
+    conns.head.close()
+    TestUtils.waitUntilTrue(() => connectionCount == (maxConnections - 1), "connection is not closed")
+    createAndVerifyConnection()
+
+    conns.foreach(_.close())
+    TestUtils.waitUntilTrue(() => initialConnectionCount == connectionCount, "Connections not closed")
   }
 }
diff --git a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
index 45ef18f..5d20da6 100755
--- a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
@@ -186,6 +186,12 @@ class DynamicBrokerConfigTest extends JUnitSuite {
     //test invalid address
     verifyConfigUpdate(KafkaConfig.MaxConnectionsPerIpOverridesProp, "hostName#:100", perBrokerConfig = true,
       expectFailure = true)
+
+    verifyConfigUpdate(KafkaConfig.MaxConnectionsProp, "100", perBrokerConfig = true, expectFailure = false)
+    verifyConfigUpdate(KafkaConfig.MaxConnectionsProp, "100", perBrokerConfig = false, expectFailure = false)
+    val listenerMaxConnectionsProp = s"listener.name.external.${KafkaConfig.MaxConnectionsProp}"
+    verifyConfigUpdate(listenerMaxConnectionsProp, "10", perBrokerConfig = true, expectFailure = false)
+    verifyConfigUpdate(listenerMaxConnectionsProp, "10", perBrokerConfig = false, expectFailure = false)
   }
 
   private def verifyConfigUpdate(name: String, value: Object, perBrokerConfig: Boolean, expectFailure: Boolean) {