You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/08/29 00:50:16 UTC

[GitHub] [pulsar] codelipenghui commented on a diff in pull request #17167: [feat][proxy] Support proxy limit maximum connections per IP

codelipenghui commented on code in PR #17167:
URL: https://github.com/apache/pulsar/pull/17167#discussion_r956810174


##########
pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java:
##########
@@ -144,22 +146,29 @@ public ProxyConnection(ProxyService proxyService, DnsAddressResolverGroup dnsAdd
         this.dnsAddressResolverGroup = dnsAddressResolverGroup;
         this.state = State.Init;
         this.brokerProxyValidator = service.getBrokerProxyValidator();
+        this.connectionController = proxyService.getConnectionController();
     }
 
     @Override
     public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
         super.channelRegistered(ctx);
         ProxyService.ACTIVE_CONNECTIONS.inc();
-        if (ProxyService.ACTIVE_CONNECTIONS.get() > service.getConfiguration().getMaxConcurrentInboundConnections()) {
-            state = State.Closing;
-            ctx.close();
+        SocketAddress rmAddress = ctx.channel().remoteAddress();
+        ConnectionController.State state = connectionController.increaseConnection(rmAddress);
+        if (!state.equals(ConnectionController.State.OK)) {
+            ctx.writeAndFlush(Commands.newError(-1, ServerError.NotAllowedError,
+                    state.equals(ConnectionController.State.REACH_MAX_CONNECTION)
+                            ? "Reached the maximum number of connections"
+                            : "Reached the maximum number of connections on address" + rmAddress))
+                            .addListener(result -> ctx.close());
             ProxyService.REJECTED_CONNECTIONS.inc();
         }
     }
 
     @Override
     public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
         super.channelUnregistered(ctx);
+        connectionController.decreaseConnection(ctx.channel().remoteAddress());

Review Comment:
   We should add tests to ensure the connection count of the same remote address will decrease after the connection is closed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org