You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/09/26 11:11:11 UTC

[pulsar] 01/02: [improve][broker]prevent new connection request when broker is closing (#17314)

This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 544538ace2bb153b4c4d6bb118e144c0fde4403f
Author: Qiang Huang <HQ...@users.noreply.github.com>
AuthorDate: Thu Sep 1 15:22:26 2022 +0800

    [improve][broker]prevent new connection request when broker is closing (#17314)
---
 .../org/apache/pulsar/broker/service/ServerCnx.java   | 11 +++++++++++
 .../apache/pulsar/broker/service/ServerCnxTest.java   | 19 +++++++++++++++++++
 2 files changed, 30 insertions(+)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 40ec486133d..0e427436211 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -795,6 +795,17 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
                 connect.hasOriginalPrincipal() ? connect.getOriginalPrincipal() : null);
         }
 
+        if (!this.service.getPulsar().isRunning()) {
+            if (log.isDebugEnabled()) {
+                log.debug("Failed CONNECT from {} due to pulsar service is not ready: {} state", remoteAddress,
+                        this.service.getPulsar().getState().toString());
+            }
+            ctx.writeAndFlush(
+                    Commands.newError(-1, ServerError.ServiceNotReady, "Failed due to pulsar service is not ready"));
+            close();
+            return;
+        }
+
         String clientVersion = connect.getClientVersion();
         int clientProtocolVersion = connect.getProtocolVersion();
         features = new FeatureFlags();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index e09087f2a91..fdc32cac750 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -2288,4 +2288,23 @@ public class ServerCnxTest {
         stateUpdater.set(serverCnx, ServerCnx.State.Failed);
         serverCnx.handleCommandWatchTopicListClose(any());
     }
+
+    @Test(timeOut = 30000)
+    public void handleConnectWithServiceNotReady() throws Exception {
+        resetChannel();
+        doReturn(false).when(pulsar).isRunning();
+        assertTrue(channel.isActive());
+        assertEquals(serverCnx.getState(), State.Start);
+
+        // test server response to CONNECT
+        ByteBuf clientCommand = Commands.newConnect("none", "", null);
+        channel.writeInbound(clientCommand);
+
+        assertEquals(serverCnx.getState(), State.Start);
+        Object response = getResponse();
+        assertTrue(response instanceof CommandError);
+        CommandError error = (CommandError) response;
+        assertEquals(error.getError(), ServerError.ServiceNotReady);
+        channel.finish();
+    }
 }