You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ho...@apache.org on 2022/09/15 03:02:49 UTC
[pulsar] branch master updated: [improve][broker]prevent partitioned metadata lookup request when broker is closing (#17315)
This is an automated email from the ASF dual-hosted git repository.
houxiaoyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 256bd68770d [improve][broker]prevent partitioned metadata lookup request when broker is closing (#17315)
256bd68770d is described below
commit 256bd68770de5e85a51a763cd0d843bc32407169
Author: Qiang Huang <HQ...@users.noreply.github.com>
AuthorDate: Thu Sep 15 11:02:41 2022 +0800
[improve][broker]prevent partitioned metadata lookup request when broker is closing (#17315)
---
.../java/org/apache/pulsar/broker/service/ServerCnx.java | 12 ++++++++++++
.../org/apache/pulsar/broker/service/ServerCnxTest.java | 16 ++++++++++++++++
.../pulsar/broker/service/utils/ClientChannelHelper.java | 6 ++++++
3 files changed, 34 insertions(+)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 70f2e2bc40d..4fe703de878 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -530,6 +530,18 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
return;
}
+ if (!this.service.getPulsar().isRunning()) {
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Failed PartitionMetadataLookup from {} for {} "
+ + "due to pulsar service is not ready: {} state",
+ partitionMetadata.getTopic(), remoteAddress, requestId,
+ this.service.getPulsar().getState().toString());
+ }
+ ctx.writeAndFlush(Commands.newPartitionMetadataResponse(ServerError.ServiceNotReady,
+ "Failed due to pulsar service is not ready", requestId));
+ return;
+ }
+
final Semaphore lookupSemaphore = service.getLookupRequestSemaphore();
if (lookupSemaphore.tryAcquire()) {
if (invalidOriginalPrincipal(originalPrincipal)) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index fdc32cac750..78e994568e8 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -102,6 +102,7 @@ import org.apache.pulsar.common.api.proto.CommandError;
import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace;
import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespaceResponse;
import org.apache.pulsar.common.api.proto.CommandLookupTopicResponse;
+import org.apache.pulsar.common.api.proto.CommandPartitionedTopicMetadataResponse;
import org.apache.pulsar.common.api.proto.CommandProducerSuccess;
import org.apache.pulsar.common.api.proto.CommandSendError;
import org.apache.pulsar.common.api.proto.CommandSendReceipt;
@@ -2307,4 +2308,19 @@ public class ServerCnxTest {
assertEquals(error.getError(), ServerError.ServiceNotReady);
channel.finish();
}
+
+ @Test(timeOut = 30000) // Checks that a partition-metadata request is answered with ServiceNotReady when the broker reports it is not running.
+ public void handlePartitionMetadataRequestWithServiceNotReady() throws Exception {
+ resetChannel();
+ setChannelConnected();
+ doReturn(false).when(pulsar).isRunning(); // stub the mocked broker as not running; this is the condition the new ServerCnx guard tests
+ assertTrue(channel.isActive());
+
+ ByteBuf clientCommand = Commands.newPartitionMetadataRequest(successTopicName, 1); // second argument is presumably the request id -- confirm against Commands
+ channel.writeInbound(clientCommand); // feed the request to the channel as inbound data
+ Object response = getResponse();
+ assertTrue(response instanceof CommandPartitionedTopicMetadataResponse); // the reply must be a partition-metadata response carrying the ServiceNotReady error checked below
+ assertEquals(((CommandPartitionedTopicMetadataResponse) response).getError(), ServerError.ServiceNotReady);
+ channel.finish();
+ }
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/utils/ClientChannelHelper.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/utils/ClientChannelHelper.java
index a1782278966..afd071a858b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/utils/ClientChannelHelper.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/utils/ClientChannelHelper.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.broker.service.utils;
import java.util.Queue;
import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespaceResponse;
+import org.apache.pulsar.common.api.proto.CommandPartitionedTopicMetadataResponse;
import org.apache.pulsar.common.api.proto.CommandWatchTopicListSuccess;
import org.apache.pulsar.common.protocol.PulsarDecoder;
import org.apache.pulsar.common.api.proto.CommandAck;
@@ -162,6 +163,11 @@ public class ClientChannelHelper {
protected void handleCommandWatchTopicListSuccess(CommandWatchTopicListSuccess commandWatchTopicListSuccess) {
queue.offer(new CommandWatchTopicListSuccess().copyFrom(commandWatchTopicListSuccess));
}
+
+ @Override
+ protected void handlePartitionResponse(CommandPartitionedTopicMetadataResponse response) { // captures partition-metadata responses so tests can read them from the queue
+ queue.offer(new CommandPartitionedTopicMetadataResponse().copyFrom(response)); // enqueue a copy, same pattern as the handler above; presumably the decoder reuses the command instance -- confirm
+ }
};
}