You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/05/20 15:05:48 UTC

[pulsar] 06/31: [Broker] Fix typo in enum name and handle closing of the channel properly since writeAndFlush is asynchronous (#15384)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit f829ca99068736fee5b621e7464553b8d6a11feb
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Sat Apr 30 04:57:42 2022 +0300

    [Broker] Fix typo in enum name and handle closing of the channel properly since writeAndFlush is asynchronous (#15384)
    
    (cherry picked from commit cd3816aa351ba8a1f0e9876eefe019b7f0d282d8)
---
 .../pulsar/broker/service/ConnectionController.java      | 16 ++++++++--------
 .../java/org/apache/pulsar/broker/service/ServerCnx.java | 15 ++++++++-------
 2 files changed, 16 insertions(+), 15 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConnectionController.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConnectionController.java
index 51540e179be..65c3a6c4f2a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConnectionController.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConnectionController.java
@@ -36,7 +36,7 @@ public interface ConnectionController {
      * @param remoteAddress
      * @return
      */
-    Sate increaseConnection(SocketAddress remoteAddress);
+    State increaseConnection(SocketAddress remoteAddress);
 
     /**
      * Decrease the number of connections counter.
@@ -44,7 +44,7 @@ public interface ConnectionController {
      */
     void decreaseConnection(SocketAddress remoteAddress);
 
-    enum Sate {
+    enum State {
         OK, REACH_MAX_CONNECTION_PER_IP, REACH_MAX_CONNECTION;
     }
 
@@ -68,13 +68,13 @@ public interface ConnectionController {
         }
 
         @Override
-        public Sate increaseConnection(SocketAddress remoteAddress) {
+        public State increaseConnection(SocketAddress remoteAddress) {
             if (!maxConnectionsLimitEnabled && !maxConnectionsLimitPerIpEnabled) {
-                return Sate.OK;
+                return State.OK;
             }
             if (!(remoteAddress instanceof InetSocketAddress)
                     || !isLegalIpAddress(((InetSocketAddress) remoteAddress).getHostString())) {
-                return Sate.OK;
+                return State.OK;
             }
             lock.lock();
             try {
@@ -88,20 +88,20 @@ public interface ConnectionController {
                 if (maxConnectionsLimitEnabled && totalConnectionNum > maxConnections) {
                     log.info("Reject connect request from {}, because reached the maximum number of connections {}",
                             remoteAddress, totalConnectionNum);
-                    return Sate.REACH_MAX_CONNECTION;
+                    return State.REACH_MAX_CONNECTION;
                 }
                 if (maxConnectionsLimitPerIpEnabled && CONNECTIONS.get(ip).getValue() > maxConnectionPerIp) {
                     log.info("Reject connect request from {}, because reached the maximum number "
                                     + "of connections per Ip {}",
                             remoteAddress, CONNECTIONS.get(ip).getValue());
-                    return Sate.REACH_MAX_CONNECTION_PER_IP;
+                    return State.REACH_MAX_CONNECTION_PER_IP;
                 }
             } catch (Exception e) {
                 log.error("increase connection failed", e);
             } finally {
                 lock.unlock();
             }
-            return Sate.OK;
+            return State.OK;
         }
 
         @Override
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 7fa6c9dde8a..10a4a2c412d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -28,6 +28,7 @@ import static org.apache.pulsar.common.protocol.Commands.newLookupErrorResponse;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Strings;
 import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelFutureListener;
 import io.netty.channel.ChannelHandler;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelOption;
@@ -272,13 +273,13 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
     @Override
     public void channelActive(ChannelHandlerContext ctx) throws Exception {
         super.channelActive(ctx);
-        ConnectionController.Sate sate = connectionController.increaseConnection(remoteAddress);
-        if (!sate.equals(ConnectionController.Sate.OK)) {
-            ctx.channel().writeAndFlush(Commands.newError(-1, ServerError.NotAllowedError,
-                    sate.equals(ConnectionController.Sate.REACH_MAX_CONNECTION)
-                            ? "Reached the maximum number of connections"
-                            : "Reached the maximum number of connections on address" + remoteAddress));
-            ctx.channel().close();
+        ConnectionController.State state = connectionController.increaseConnection(remoteAddress);
+        if (!state.equals(ConnectionController.State.OK)) {
+            ctx.writeAndFlush(Commands.newError(-1, ServerError.NotAllowedError,
+                            state.equals(ConnectionController.State.REACH_MAX_CONNECTION)
+                                    ? "Reached the maximum number of connections"
+                                    : "Reached the maximum number of connections on address" + remoteAddress))
+                    .addListener(ChannelFutureListener.CLOSE);
             return;
         }
         log.info("New connection from {}", remoteAddress);