You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/09/26 11:11:10 UTC

[pulsar] branch branch-2.11 updated (aff59a7ed19 -> eb5f5b001f0)

This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a change to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git


    from aff59a7ed19 [fix][broker] Extract additional servlets to the default directory by… (#17477)
     new 544538ace2b [improve][broker]prevent new connection request when broker is closing (#17314)
     new eb5f5b001f0 [improve][broker]prevent partitioned metadata lookup request when broker is closing (#17315)

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../apache/pulsar/broker/service/ServerCnx.java    | 23 ++++++++++++++
 .../pulsar/broker/service/ServerCnxTest.java       | 35 ++++++++++++++++++++++
 .../broker/service/utils/ClientChannelHelper.java  |  6 ++++
 3 files changed, 64 insertions(+)


[pulsar] 02/02: [improve][broker]prevent partitioned metadata lookup request when broker is closing (#17315)

Posted by te...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit eb5f5b001f0fad10b1e18bc48bbe1861db139799
Author: Qiang Huang <HQ...@users.noreply.github.com>
AuthorDate: Thu Sep 15 11:02:41 2022 +0800

    [improve][broker]prevent partitioned metadata lookup request when broker is closing (#17315)
---
 .../java/org/apache/pulsar/broker/service/ServerCnx.java | 12 ++++++++++++
 .../org/apache/pulsar/broker/service/ServerCnxTest.java  | 16 ++++++++++++++++
 .../pulsar/broker/service/utils/ClientChannelHelper.java |  6 ++++++
 3 files changed, 34 insertions(+)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 0e427436211..e3df87eeb02 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -517,6 +517,18 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
             return;
         }
 
+        if (!this.service.getPulsar().isRunning()) {
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] Failed PartitionMetadataLookup from {} for {} "
+                                + "due to pulsar service is not ready: {} state",
+                        partitionMetadata.getTopic(), remoteAddress, requestId,
+                        this.service.getPulsar().getState().toString());
+            }
+            ctx.writeAndFlush(Commands.newPartitionMetadataResponse(ServerError.ServiceNotReady,
+                    "Failed due to pulsar service is not ready", requestId));
+            return;
+        }
+
         final Semaphore lookupSemaphore = service.getLookupRequestSemaphore();
         if (lookupSemaphore.tryAcquire()) {
             if (invalidOriginalPrincipal(originalPrincipal)) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index fdc32cac750..78e994568e8 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -102,6 +102,7 @@ import org.apache.pulsar.common.api.proto.CommandError;
 import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace;
 import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespaceResponse;
 import org.apache.pulsar.common.api.proto.CommandLookupTopicResponse;
+import org.apache.pulsar.common.api.proto.CommandPartitionedTopicMetadataResponse;
 import org.apache.pulsar.common.api.proto.CommandProducerSuccess;
 import org.apache.pulsar.common.api.proto.CommandSendError;
 import org.apache.pulsar.common.api.proto.CommandSendReceipt;
@@ -2307,4 +2308,19 @@ public class ServerCnxTest {
         assertEquals(error.getError(), ServerError.ServiceNotReady);
         channel.finish();
     }
+
+    @Test(timeOut = 30000)
+    public void handlePartitionMetadataRequestWithServiceNotReady() throws Exception {
+        resetChannel();
+        setChannelConnected();
+        doReturn(false).when(pulsar).isRunning();
+        assertTrue(channel.isActive());
+
+        ByteBuf clientCommand = Commands.newPartitionMetadataRequest(successTopicName, 1);
+        channel.writeInbound(clientCommand);
+        Object response = getResponse();
+        assertTrue(response instanceof CommandPartitionedTopicMetadataResponse);
+        assertEquals(((CommandPartitionedTopicMetadataResponse) response).getError(), ServerError.ServiceNotReady);
+        channel.finish();
+    }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/utils/ClientChannelHelper.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/utils/ClientChannelHelper.java
index a1782278966..afd071a858b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/utils/ClientChannelHelper.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/utils/ClientChannelHelper.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.broker.service.utils;
 import java.util.Queue;
 
 import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespaceResponse;
+import org.apache.pulsar.common.api.proto.CommandPartitionedTopicMetadataResponse;
 import org.apache.pulsar.common.api.proto.CommandWatchTopicListSuccess;
 import org.apache.pulsar.common.protocol.PulsarDecoder;
 import org.apache.pulsar.common.api.proto.CommandAck;
@@ -162,6 +163,11 @@ public class ClientChannelHelper {
         protected void handleCommandWatchTopicListSuccess(CommandWatchTopicListSuccess commandWatchTopicListSuccess) {
             queue.offer(new CommandWatchTopicListSuccess().copyFrom(commandWatchTopicListSuccess));
         }
+
+        @Override
+        protected void handlePartitionResponse(CommandPartitionedTopicMetadataResponse response) {
+            queue.offer(new CommandPartitionedTopicMetadataResponse().copyFrom(response));
+        }
     };
 
 }


[pulsar] 01/02: [improve][broker]prevent new connection request when broker is closing (#17314)

Posted by te...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 544538ace2bb153b4c4d6bb118e144c0fde4403f
Author: Qiang Huang <HQ...@users.noreply.github.com>
AuthorDate: Thu Sep 1 15:22:26 2022 +0800

    [improve][broker]prevent new connection request when broker is closing (#17314)
---
 .../org/apache/pulsar/broker/service/ServerCnx.java   | 11 +++++++++++
 .../apache/pulsar/broker/service/ServerCnxTest.java   | 19 +++++++++++++++++++
 2 files changed, 30 insertions(+)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 40ec486133d..0e427436211 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -795,6 +795,17 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
                 connect.hasOriginalPrincipal() ? connect.getOriginalPrincipal() : null);
         }
 
+        if (!this.service.getPulsar().isRunning()) {
+            if (log.isDebugEnabled()) {
+                log.debug("Failed CONNECT from {} due to pulsar service is not ready: {} state", remoteAddress,
+                        this.service.getPulsar().getState().toString());
+            }
+            ctx.writeAndFlush(
+                    Commands.newError(-1, ServerError.ServiceNotReady, "Failed due to pulsar service is not ready"));
+            close();
+            return;
+        }
+
         String clientVersion = connect.getClientVersion();
         int clientProtocolVersion = connect.getProtocolVersion();
         features = new FeatureFlags();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index e09087f2a91..fdc32cac750 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -2288,4 +2288,23 @@ public class ServerCnxTest {
         stateUpdater.set(serverCnx, ServerCnx.State.Failed);
         serverCnx.handleCommandWatchTopicListClose(any());
     }
+
+    @Test(timeOut = 30000)
+    public void handleConnectWithServiceNotReady() throws Exception {
+        resetChannel();
+        doReturn(false).when(pulsar).isRunning();
+        assertTrue(channel.isActive());
+        assertEquals(serverCnx.getState(), State.Start);
+
+        // test server response to CONNECT
+        ByteBuf clientCommand = Commands.newConnect("none", "", null);
+        channel.writeInbound(clientCommand);
+
+        assertEquals(serverCnx.getState(), State.Start);
+        Object response = getResponse();
+        assertTrue(response instanceof CommandError);
+        CommandError error = (CommandError) response;
+        assertEquals(error.getError(), ServerError.ServiceNotReady);
+        channel.finish();
+    }
 }