You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ho...@apache.org on 2022/09/15 03:02:49 UTC

[pulsar] branch master updated: [improve][broker]prevent partitioned metadata lookup request when broker is closing (#17315)

This is an automated email from the ASF dual-hosted git repository.

houxiaoyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 256bd68770d [improve][broker]prevent partitioned metadata lookup request when broker is closing (#17315)
256bd68770d is described below

commit 256bd68770de5e85a51a763cd0d843bc32407169
Author: Qiang Huang <HQ...@users.noreply.github.com>
AuthorDate: Thu Sep 15 11:02:41 2022 +0800

    [improve][broker]prevent partitioned metadata lookup request when broker is closing (#17315)
---
 .../java/org/apache/pulsar/broker/service/ServerCnx.java | 12 ++++++++++++
 .../org/apache/pulsar/broker/service/ServerCnxTest.java  | 16 ++++++++++++++++
 .../pulsar/broker/service/utils/ClientChannelHelper.java |  6 ++++++
 3 files changed, 34 insertions(+)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 70f2e2bc40d..4fe703de878 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -530,6 +530,18 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
             return;
         }
 
+        if (!this.service.getPulsar().isRunning()) {
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] Failed PartitionMetadataLookup from {} for {} "
+                                + "due to pulsar service is not ready: {} state",
+                        partitionMetadata.getTopic(), remoteAddress, requestId,
+                        this.service.getPulsar().getState().toString());
+            }
+            ctx.writeAndFlush(Commands.newPartitionMetadataResponse(ServerError.ServiceNotReady,
+                    "Failed due to pulsar service is not ready", requestId));
+            return;
+        }
+
         final Semaphore lookupSemaphore = service.getLookupRequestSemaphore();
         if (lookupSemaphore.tryAcquire()) {
             if (invalidOriginalPrincipal(originalPrincipal)) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index fdc32cac750..78e994568e8 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -102,6 +102,7 @@ import org.apache.pulsar.common.api.proto.CommandError;
 import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace;
 import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespaceResponse;
 import org.apache.pulsar.common.api.proto.CommandLookupTopicResponse;
+import org.apache.pulsar.common.api.proto.CommandPartitionedTopicMetadataResponse;
 import org.apache.pulsar.common.api.proto.CommandProducerSuccess;
 import org.apache.pulsar.common.api.proto.CommandSendError;
 import org.apache.pulsar.common.api.proto.CommandSendReceipt;
@@ -2307,4 +2308,19 @@ public class ServerCnxTest {
         assertEquals(error.getError(), ServerError.ServiceNotReady);
         channel.finish();
     }
+
+    @Test(timeOut = 30000)
+    public void handlePartitionMetadataRequestWithServiceNotReady() throws Exception {
+        resetChannel();
+        setChannelConnected();
+        doReturn(false).when(pulsar).isRunning();
+        assertTrue(channel.isActive());
+
+        ByteBuf clientCommand = Commands.newPartitionMetadataRequest(successTopicName, 1);
+        channel.writeInbound(clientCommand);
+        Object response = getResponse();
+        assertTrue(response instanceof CommandPartitionedTopicMetadataResponse);
+        assertEquals(((CommandPartitionedTopicMetadataResponse) response).getError(), ServerError.ServiceNotReady);
+        channel.finish();
+    }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/utils/ClientChannelHelper.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/utils/ClientChannelHelper.java
index a1782278966..afd071a858b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/utils/ClientChannelHelper.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/utils/ClientChannelHelper.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.broker.service.utils;
 import java.util.Queue;
 
 import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespaceResponse;
+import org.apache.pulsar.common.api.proto.CommandPartitionedTopicMetadataResponse;
 import org.apache.pulsar.common.api.proto.CommandWatchTopicListSuccess;
 import org.apache.pulsar.common.protocol.PulsarDecoder;
 import org.apache.pulsar.common.api.proto.CommandAck;
@@ -162,6 +163,11 @@ public class ClientChannelHelper {
         protected void handleCommandWatchTopicListSuccess(CommandWatchTopicListSuccess commandWatchTopicListSuccess) {
             queue.offer(new CommandWatchTopicListSuccess().copyFrom(commandWatchTopicListSuccess));
         }
+
+        @Override
+        protected void handlePartitionResponse(CommandPartitionedTopicMetadataResponse response) {
+            queue.offer(new CommandPartitionedTopicMetadataResponse().copyFrom(response));
+        }
     };
 
 }