You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/05/20 15:05:48 UTC
[pulsar] 06/31: [Broker] Fix typo in enum name and handle closing of the channel properly since writeAndFlush is asynchronous (#15384)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit f829ca99068736fee5b621e7464553b8d6a11feb
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Sat Apr 30 04:57:42 2022 +0300
[Broker] Fix typo in enum name and handle closing of the channel properly since writeAndFlush is asynchronous (#15384)
(cherry picked from commit cd3816aa351ba8a1f0e9876eefe019b7f0d282d8)
---
.../pulsar/broker/service/ConnectionController.java | 16 ++++++++--------
.../java/org/apache/pulsar/broker/service/ServerCnx.java | 15 ++++++++-------
2 files changed, 16 insertions(+), 15 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConnectionController.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConnectionController.java
index 51540e179be..65c3a6c4f2a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConnectionController.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConnectionController.java
@@ -36,7 +36,7 @@ public interface ConnectionController {
* @param remoteAddress
* @return
*/
- Sate increaseConnection(SocketAddress remoteAddress);
+ State increaseConnection(SocketAddress remoteAddress);
/**
* Decrease the number of connections counter.
@@ -44,7 +44,7 @@ public interface ConnectionController {
*/
void decreaseConnection(SocketAddress remoteAddress);
- enum Sate {
+ enum State {
OK, REACH_MAX_CONNECTION_PER_IP, REACH_MAX_CONNECTION;
}
@@ -68,13 +68,13 @@ public interface ConnectionController {
}
@Override
- public Sate increaseConnection(SocketAddress remoteAddress) {
+ public State increaseConnection(SocketAddress remoteAddress) {
if (!maxConnectionsLimitEnabled && !maxConnectionsLimitPerIpEnabled) {
- return Sate.OK;
+ return State.OK;
}
if (!(remoteAddress instanceof InetSocketAddress)
|| !isLegalIpAddress(((InetSocketAddress) remoteAddress).getHostString())) {
- return Sate.OK;
+ return State.OK;
}
lock.lock();
try {
@@ -88,20 +88,20 @@ public interface ConnectionController {
if (maxConnectionsLimitEnabled && totalConnectionNum > maxConnections) {
log.info("Reject connect request from {}, because reached the maximum number of connections {}",
remoteAddress, totalConnectionNum);
- return Sate.REACH_MAX_CONNECTION;
+ return State.REACH_MAX_CONNECTION;
}
if (maxConnectionsLimitPerIpEnabled && CONNECTIONS.get(ip).getValue() > maxConnectionPerIp) {
log.info("Reject connect request from {}, because reached the maximum number "
+ "of connections per Ip {}",
remoteAddress, CONNECTIONS.get(ip).getValue());
- return Sate.REACH_MAX_CONNECTION_PER_IP;
+ return State.REACH_MAX_CONNECTION_PER_IP;
}
} catch (Exception e) {
log.error("increase connection failed", e);
} finally {
lock.unlock();
}
- return Sate.OK;
+ return State.OK;
}
@Override
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 7fa6c9dde8a..10a4a2c412d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -28,6 +28,7 @@ import static org.apache.pulsar.common.protocol.Commands.newLookupErrorResponse;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOption;
@@ -272,13 +273,13 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
super.channelActive(ctx);
- ConnectionController.Sate sate = connectionController.increaseConnection(remoteAddress);
- if (!sate.equals(ConnectionController.Sate.OK)) {
- ctx.channel().writeAndFlush(Commands.newError(-1, ServerError.NotAllowedError,
- sate.equals(ConnectionController.Sate.REACH_MAX_CONNECTION)
- ? "Reached the maximum number of connections"
- : "Reached the maximum number of connections on address" + remoteAddress));
- ctx.channel().close();
+ ConnectionController.State state = connectionController.increaseConnection(remoteAddress);
+ if (!state.equals(ConnectionController.State.OK)) {
+ ctx.writeAndFlush(Commands.newError(-1, ServerError.NotAllowedError,
+ state.equals(ConnectionController.State.REACH_MAX_CONNECTION)
+ ? "Reached the maximum number of connections"
+ : "Reached the maximum number of connections on address" + remoteAddress))
+ .addListener(ChannelFutureListener.CLOSE);
return;
}
log.info("New connection from {}", remoteAddress);