You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/09/26 11:11:11 UTC
[pulsar] 01/02: [improve][broker]prevent new connection request when broker is closing (#17314)
This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 544538ace2bb153b4c4d6bb118e144c0fde4403f
Author: Qiang Huang <HQ...@users.noreply.github.com>
AuthorDate: Thu Sep 1 15:22:26 2022 +0800
[improve][broker]prevent new connection request when broker is closing (#17314)
---
.../org/apache/pulsar/broker/service/ServerCnx.java | 11 +++++++++++
.../apache/pulsar/broker/service/ServerCnxTest.java | 19 +++++++++++++++++++
2 files changed, 30 insertions(+)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 40ec486133d..0e427436211 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -795,6 +795,17 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
connect.hasOriginalPrincipal() ? connect.getOriginalPrincipal() : null);
}
+ if (!this.service.getPulsar().isRunning()) {
+ if (log.isDebugEnabled()) {
+ log.debug("Failed CONNECT from {} due to pulsar service is not ready: {} state", remoteAddress,
+ this.service.getPulsar().getState().toString());
+ }
+ ctx.writeAndFlush(
+ Commands.newError(-1, ServerError.ServiceNotReady, "Failed due to pulsar service is not ready"));
+ close();
+ return;
+ }
+
String clientVersion = connect.getClientVersion();
int clientProtocolVersion = connect.getProtocolVersion();
features = new FeatureFlags();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index e09087f2a91..fdc32cac750 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -2288,4 +2288,23 @@ public class ServerCnxTest {
stateUpdater.set(serverCnx, ServerCnx.State.Failed);
serverCnx.handleCommandWatchTopicListClose(any());
}
+
+ @Test(timeOut = 30000)
+ public void handleConnectWithServiceNotReady() throws Exception {
+ resetChannel();
+ doReturn(false).when(pulsar).isRunning();
+ assertTrue(channel.isActive());
+ assertEquals(serverCnx.getState(), State.Start);
+
+ // test server response to CONNECT
+ ByteBuf clientCommand = Commands.newConnect("none", "", null);
+ channel.writeInbound(clientCommand);
+
+ assertEquals(serverCnx.getState(), State.Start);
+ Object response = getResponse();
+ assertTrue(response instanceof CommandError);
+ CommandError error = (CommandError) response;
+ assertEquals(error.getError(), ServerError.ServiceNotReady);
+ channel.finish();
+ }
}