You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/08/19 05:59:14 UTC

[pulsar] branch master updated: [improve][broker]add ServerCnx state check before server handle request (#17084)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 694aa135833 [improve][broker]add ServerCnx state check before server handle request (#17084)
694aa135833 is described below

commit 694aa1358333f609a7afc6c00c85a28ea02bf1d9
Author: Qiang Huang <HQ...@users.noreply.github.com>
AuthorDate: Fri Aug 19 13:59:07 2022 +0800

    [improve][broker]add ServerCnx state check before server handle request (#17084)
---
 .../apache/pulsar/broker/service/ServerCnx.java    |  15 +++
 .../pulsar/broker/service/ServerCnxTest.java       | 135 +++++++++++++++++++++
 2 files changed, 150 insertions(+)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index a47fc99d824..40ec486133d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -434,6 +434,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
 
     @Override
     protected void handleLookup(CommandLookupTopic lookup) {
+        checkArgument(state == State.Connected);
         final long requestId = lookup.getRequestId();
         final boolean authoritative = lookup.isAuthoritative();
 
@@ -504,6 +505,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
 
     @Override
     protected void handlePartitionMetadataRequest(CommandPartitionedTopicMetadata partitionMetadata) {
+        checkArgument(state == State.Connected);
         final long requestId = partitionMetadata.getRequestId();
         if (log.isDebugEnabled()) {
             log.debug("[{}] Received PartitionMetadataLookup from {} for {}", partitionMetadata.getTopic(),
@@ -580,6 +582,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
 
     @Override
     protected void handleConsumerStats(CommandConsumerStats commandConsumerStats) {
+        checkArgument(state == State.Connected);
         if (log.isDebugEnabled()) {
             log.debug("Received CommandConsumerStats call from {}", remoteAddress);
         }
@@ -1988,6 +1991,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
 
     @Override
     protected void handleGetTopicsOfNamespace(CommandGetTopicsOfNamespace commandGetTopicsOfNamespace) {
+        checkArgument(state == State.Connected);
         final long requestId = commandGetTopicsOfNamespace.getRequestId();
         final String namespace = commandGetTopicsOfNamespace.getNamespace();
         final CommandGetTopicsOfNamespace.Mode mode = commandGetTopicsOfNamespace.getMode();
@@ -2076,6 +2080,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
 
     @Override
     protected void handleGetSchema(CommandGetSchema commandGetSchema) {
+        checkArgument(state == State.Connected);
         if (log.isDebugEnabled()) {
             if (commandGetSchema.hasSchemaVersion()) {
                 log.debug("Received CommandGetSchema call from {}, schemaVersion: {}, topic: {}, requestId: {}",
@@ -2123,6 +2128,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
 
     @Override
     protected void handleGetOrCreateSchema(CommandGetOrCreateSchema commandGetOrCreateSchema) {
+        checkArgument(state == State.Connected);
         if (log.isDebugEnabled()) {
             log.debug("Received CommandGetOrCreateSchema call from {}", remoteAddress);
         }
@@ -2158,6 +2164,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
 
     @Override
     protected void handleTcClientConnectRequest(CommandTcClientConnectRequest command) {
+        checkArgument(state == State.Connected);
         final long requestId = command.getRequestId();
         final TransactionCoordinatorID tcId = TransactionCoordinatorID.get(command.getTcId());
         if (log.isDebugEnabled()) {
@@ -2219,6 +2226,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
     }
     @Override
     protected void handleNewTxn(CommandNewTxn command) {
+        checkArgument(state == State.Connected);
         final long requestId = command.getRequestId();
         final TransactionCoordinatorID tcId = TransactionCoordinatorID.get(command.getTcId());
         if (log.isDebugEnabled()) {
@@ -2260,6 +2268,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
 
     @Override
     protected void handleAddPartitionToTxn(CommandAddPartitionToTxn command) {
+        checkArgument(state == State.Connected);
         final TxnID txnID = new TxnID(command.getTxnidMostBits(), command.getTxnidLeastBits());
         final TransactionCoordinatorID tcId = TransactionCoordinatorID.get(command.getTxnidMostBits());
         final long requestId = command.getRequestId();
@@ -2297,6 +2306,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
 
     @Override
     protected void handleEndTxn(CommandEndTxn command) {
+        checkArgument(state == State.Connected);
         final long requestId = command.getRequestId();
         final int txnAction = command.getTxnAction().getValue();
         TxnID txnID = new TxnID(command.getTxnidMostBits(), command.getTxnidLeastBits());
@@ -2327,6 +2337,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
 
     @Override
     protected void handleEndTxnOnPartition(CommandEndTxnOnPartition command) {
+        checkArgument(state == State.Connected);
         final long requestId = command.getRequestId();
         final String topic = command.getTopic();
         final int txnAction = command.getTxnAction().getValue();
@@ -2397,6 +2408,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
 
     @Override
     protected void handleEndTxnOnSubscription(CommandEndTxnOnSubscription command) {
+        checkArgument(state == State.Connected);
         final long requestId = command.getRequestId();
         final long txnidMostBits = command.getTxnidMostBits();
         final long txnidLeastBits = command.getTxnidLeastBits();
@@ -2503,6 +2515,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
 
     @Override
     protected void handleAddSubscriptionToTxn(CommandAddSubscriptionToTxn command) {
+        checkArgument(state == State.Connected);
         final TxnID txnID = new TxnID(command.getTxnidMostBits(), command.getTxnidLeastBits());
         final long requestId = command.getRequestId();
         if (log.isDebugEnabled()) {
@@ -2541,6 +2554,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
     }
 
     protected void handleCommandWatchTopicList(CommandWatchTopicList commandWatchTopicList) {
+        checkArgument(state == State.Connected);
         final long requestId = commandWatchTopicList.getRequestId();
         final long watcherId = commandWatchTopicList.getWatcherId();
         final NamespaceName namespaceName = NamespaceName.get(commandWatchTopicList.getNamespace());
@@ -2590,6 +2604,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
     }
 
     protected void handleCommandWatchTopicListClose(CommandWatchTopicListClose commandWatchTopicListClose) {
+        checkArgument(state == State.Connected); // throws IllegalArgumentException unless state is Connected
         topicListService.handleWatchTopicListClose(commandWatchTopicListClose);
     }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index 363b740c91e..d9a8ce25369 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -2153,4 +2153,139 @@ public class ServerCnxTest {
         verify(authResponse, times(1)).hasClientVersion();
         verify(authResponse, times(0)).getClientVersion();
     }
+
+    @Test(expectedExceptions = IllegalArgumentException.class)
+    public void shouldFailHandleLookup() throws Exception {
+        ServerCnx serverCnx = mock(ServerCnx.class, CALLS_REAL_METHODS);
+        Field stateField = ServerCnx.class.getDeclaredField("state");
+        stateField.setAccessible(true);
+        stateField.set(serverCnx, ServerCnx.State.Failed);
+        serverCnx.handleLookup(null); // not any(): matchers are for stubbing/verification only
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class)
+    public void shouldFailHandlePartitionMetadataRequest() throws Exception {
+        ServerCnx serverCnx = mock(ServerCnx.class, CALLS_REAL_METHODS);
+        Field stateField = ServerCnx.class.getDeclaredField("state");
+        stateField.setAccessible(true);
+        stateField.set(serverCnx, ServerCnx.State.Failed);
+        serverCnx.handlePartitionMetadataRequest(null); // not any(): matchers are for stubbing/verification only
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class)
+    public void shouldFailHandleConsumerStats() throws Exception {
+        ServerCnx serverCnx = mock(ServerCnx.class, CALLS_REAL_METHODS);
+        Field stateField = ServerCnx.class.getDeclaredField("state");
+        stateField.setAccessible(true);
+        stateField.set(serverCnx, ServerCnx.State.Failed);
+        serverCnx.handleConsumerStats(null); // not any(): matchers are for stubbing/verification only
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class)
+    public void shouldFailHandleGetTopicsOfNamespace() throws Exception {
+        ServerCnx serverCnx = mock(ServerCnx.class, CALLS_REAL_METHODS);
+        Field stateField = ServerCnx.class.getDeclaredField("state");
+        stateField.setAccessible(true);
+        stateField.set(serverCnx, ServerCnx.State.Failed);
+        serverCnx.handleGetTopicsOfNamespace(null); // not any(): matchers are for stubbing/verification only
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class)
+    public void shouldFailHandleGetSchema() throws Exception {
+        ServerCnx serverCnx = mock(ServerCnx.class, CALLS_REAL_METHODS);
+        Field stateField = ServerCnx.class.getDeclaredField("state");
+        stateField.setAccessible(true);
+        stateField.set(serverCnx, ServerCnx.State.Failed);
+        serverCnx.handleGetSchema(null); // not any(): matchers are for stubbing/verification only
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class)
+    public void shouldFailHandleGetOrCreateSchema() throws Exception {
+        ServerCnx serverCnx = mock(ServerCnx.class, CALLS_REAL_METHODS);
+        Field stateField = ServerCnx.class.getDeclaredField("state");
+        stateField.setAccessible(true);
+        stateField.set(serverCnx, ServerCnx.State.Failed);
+        serverCnx.handleGetOrCreateSchema(null); // not any(): matchers are for stubbing/verification only
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class)
+    public void shouldFailHandleTcClientConnectRequest() throws Exception {
+        ServerCnx serverCnx = mock(ServerCnx.class, CALLS_REAL_METHODS);
+        Field stateField = ServerCnx.class.getDeclaredField("state");
+        stateField.setAccessible(true);
+        stateField.set(serverCnx, ServerCnx.State.Failed);
+        serverCnx.handleTcClientConnectRequest(null); // not any(): matchers are for stubbing/verification only
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class)
+    public void shouldFailHandleNewTxn() throws Exception {
+        ServerCnx serverCnx = mock(ServerCnx.class, CALLS_REAL_METHODS);
+        Field stateField = ServerCnx.class.getDeclaredField("state");
+        stateField.setAccessible(true);
+        stateField.set(serverCnx, ServerCnx.State.Failed);
+        serverCnx.handleNewTxn(null); // not any(): matchers are for stubbing/verification only
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class)
+    public void shouldFailHandleAddPartitionToTxn() throws Exception {
+        ServerCnx serverCnx = mock(ServerCnx.class, CALLS_REAL_METHODS);
+        Field stateField = ServerCnx.class.getDeclaredField("state");
+        stateField.setAccessible(true);
+        stateField.set(serverCnx, ServerCnx.State.Failed);
+        serverCnx.handleAddPartitionToTxn(null); // not any(): matchers are for stubbing/verification only
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class)
+    public void shouldFailHandleEndTxn() throws Exception {
+        ServerCnx serverCnx = mock(ServerCnx.class, CALLS_REAL_METHODS);
+        Field stateField = ServerCnx.class.getDeclaredField("state");
+        stateField.setAccessible(true);
+        stateField.set(serverCnx, ServerCnx.State.Failed);
+        serverCnx.handleEndTxn(null); // not any(): matchers are for stubbing/verification only
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class)
+    public void shouldFailHandleEndTxnOnPartition() throws Exception {
+        ServerCnx serverCnx = mock(ServerCnx.class, CALLS_REAL_METHODS);
+        Field stateField = ServerCnx.class.getDeclaredField("state");
+        stateField.setAccessible(true);
+        stateField.set(serverCnx, ServerCnx.State.Failed);
+        serverCnx.handleEndTxnOnPartition(null); // not any(): matchers are for stubbing/verification only
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class)
+    public void shouldFailHandleEndTxnOnSubscription() throws Exception {
+        ServerCnx serverCnx = mock(ServerCnx.class, CALLS_REAL_METHODS);
+        Field stateField = ServerCnx.class.getDeclaredField("state");
+        stateField.setAccessible(true);
+        stateField.set(serverCnx, ServerCnx.State.Failed);
+        serverCnx.handleEndTxnOnSubscription(null); // not any(): matchers are for stubbing/verification only
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class)
+    public void shouldFailHandleAddSubscriptionToTxn() throws Exception {
+        ServerCnx serverCnx = mock(ServerCnx.class, CALLS_REAL_METHODS);
+        Field stateField = ServerCnx.class.getDeclaredField("state");
+        stateField.setAccessible(true);
+        stateField.set(serverCnx, ServerCnx.State.Failed);
+        serverCnx.handleAddSubscriptionToTxn(null); // not any(): matchers are for stubbing/verification only
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class)
+    public void shouldFailHandleCommandWatchTopicList() throws Exception {
+        ServerCnx serverCnx = mock(ServerCnx.class, CALLS_REAL_METHODS);
+        Field stateField = ServerCnx.class.getDeclaredField("state");
+        stateField.setAccessible(true);
+        stateField.set(serverCnx, ServerCnx.State.Failed);
+        serverCnx.handleCommandWatchTopicList(null); // not any(): matchers are for stubbing/verification only
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class)
+    public void shouldFailHandleCommandWatchTopicListClose() throws Exception {
+        ServerCnx serverCnx = mock(ServerCnx.class, CALLS_REAL_METHODS);
+        Field stateField = ServerCnx.class.getDeclaredField("state");
+        stateField.setAccessible(true);
+        stateField.set(serverCnx, ServerCnx.State.Failed);
+        serverCnx.handleCommandWatchTopicListClose(null); // not any(): matchers are for stubbing/verification only
+    }
 }