You are viewing a plain text version of this content. The canonical link for it is not included in this plain text copy.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/08/19 05:59:14 UTC
[pulsar] branch master updated: [improve][broker]add ServerCnx state check before server handle request (#17084)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 694aa135833 [improve][broker]add ServerCnx state check before server handle request (#17084)
694aa135833 is described below
commit 694aa1358333f609a7afc6c00c85a28ea02bf1d9
Author: Qiang Huang <HQ...@users.noreply.github.com>
AuthorDate: Fri Aug 19 13:59:07 2022 +0800
[improve][broker]add ServerCnx state check before server handle request (#17084)
---
.../apache/pulsar/broker/service/ServerCnx.java | 15 +++
.../pulsar/broker/service/ServerCnxTest.java | 135 +++++++++++++++++++++
2 files changed, 150 insertions(+)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index a47fc99d824..40ec486133d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -434,6 +434,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
@Override
protected void handleLookup(CommandLookupTopic lookup) {
+ checkArgument(state == State.Connected);
final long requestId = lookup.getRequestId();
final boolean authoritative = lookup.isAuthoritative();
@@ -504,6 +505,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
@Override
protected void handlePartitionMetadataRequest(CommandPartitionedTopicMetadata partitionMetadata) {
+ checkArgument(state == State.Connected);
final long requestId = partitionMetadata.getRequestId();
if (log.isDebugEnabled()) {
log.debug("[{}] Received PartitionMetadataLookup from {} for {}", partitionMetadata.getTopic(),
@@ -580,6 +582,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
@Override
protected void handleConsumerStats(CommandConsumerStats commandConsumerStats) {
+ checkArgument(state == State.Connected);
if (log.isDebugEnabled()) {
log.debug("Received CommandConsumerStats call from {}", remoteAddress);
}
@@ -1988,6 +1991,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
@Override
protected void handleGetTopicsOfNamespace(CommandGetTopicsOfNamespace commandGetTopicsOfNamespace) {
+ checkArgument(state == State.Connected);
final long requestId = commandGetTopicsOfNamespace.getRequestId();
final String namespace = commandGetTopicsOfNamespace.getNamespace();
final CommandGetTopicsOfNamespace.Mode mode = commandGetTopicsOfNamespace.getMode();
@@ -2076,6 +2080,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
@Override
protected void handleGetSchema(CommandGetSchema commandGetSchema) {
+ checkArgument(state == State.Connected);
if (log.isDebugEnabled()) {
if (commandGetSchema.hasSchemaVersion()) {
log.debug("Received CommandGetSchema call from {}, schemaVersion: {}, topic: {}, requestId: {}",
@@ -2123,6 +2128,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
@Override
protected void handleGetOrCreateSchema(CommandGetOrCreateSchema commandGetOrCreateSchema) {
+ checkArgument(state == State.Connected);
if (log.isDebugEnabled()) {
log.debug("Received CommandGetOrCreateSchema call from {}", remoteAddress);
}
@@ -2158,6 +2164,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
@Override
protected void handleTcClientConnectRequest(CommandTcClientConnectRequest command) {
+ checkArgument(state == State.Connected);
final long requestId = command.getRequestId();
final TransactionCoordinatorID tcId = TransactionCoordinatorID.get(command.getTcId());
if (log.isDebugEnabled()) {
@@ -2219,6 +2226,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
}
@Override
protected void handleNewTxn(CommandNewTxn command) {
+ checkArgument(state == State.Connected);
final long requestId = command.getRequestId();
final TransactionCoordinatorID tcId = TransactionCoordinatorID.get(command.getTcId());
if (log.isDebugEnabled()) {
@@ -2260,6 +2268,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
@Override
protected void handleAddPartitionToTxn(CommandAddPartitionToTxn command) {
+ checkArgument(state == State.Connected);
final TxnID txnID = new TxnID(command.getTxnidMostBits(), command.getTxnidLeastBits());
final TransactionCoordinatorID tcId = TransactionCoordinatorID.get(command.getTxnidMostBits());
final long requestId = command.getRequestId();
@@ -2297,6 +2306,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
@Override
protected void handleEndTxn(CommandEndTxn command) {
+ checkArgument(state == State.Connected);
final long requestId = command.getRequestId();
final int txnAction = command.getTxnAction().getValue();
TxnID txnID = new TxnID(command.getTxnidMostBits(), command.getTxnidLeastBits());
@@ -2327,6 +2337,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
@Override
protected void handleEndTxnOnPartition(CommandEndTxnOnPartition command) {
+ checkArgument(state == State.Connected);
final long requestId = command.getRequestId();
final String topic = command.getTopic();
final int txnAction = command.getTxnAction().getValue();
@@ -2397,6 +2408,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
@Override
protected void handleEndTxnOnSubscription(CommandEndTxnOnSubscription command) {
+ checkArgument(state == State.Connected);
final long requestId = command.getRequestId();
final long txnidMostBits = command.getTxnidMostBits();
final long txnidLeastBits = command.getTxnidLeastBits();
@@ -2503,6 +2515,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
@Override
protected void handleAddSubscriptionToTxn(CommandAddSubscriptionToTxn command) {
+ checkArgument(state == State.Connected);
final TxnID txnID = new TxnID(command.getTxnidMostBits(), command.getTxnidLeastBits());
final long requestId = command.getRequestId();
if (log.isDebugEnabled()) {
@@ -2541,6 +2554,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
}
protected void handleCommandWatchTopicList(CommandWatchTopicList commandWatchTopicList) {
+ checkArgument(state == State.Connected);
final long requestId = commandWatchTopicList.getRequestId();
final long watcherId = commandWatchTopicList.getWatcherId();
final NamespaceName namespaceName = NamespaceName.get(commandWatchTopicList.getNamespace());
@@ -2590,6 +2604,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
}
protected void handleCommandWatchTopicListClose(CommandWatchTopicListClose commandWatchTopicListClose) {
+ checkArgument(state == State.Connected);
topicListService.handleWatchTopicListClose(commandWatchTopicListClose);
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index 363b740c91e..d9a8ce25369 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -2153,4 +2153,139 @@ public class ServerCnxTest {
verify(authResponse, times(1)).hasClientVersion();
verify(authResponse, times(0)).getClientVersion();
}
+
+ @Test(expectedExceptions = IllegalArgumentException.class)
+ public void shouldFailHandleLookup() throws Exception {
+ ServerCnx serverCnx = mock(ServerCnx.class, CALLS_REAL_METHODS);
+ Field stateUpdater = ServerCnx.class.getDeclaredField("state");
+ stateUpdater.setAccessible(true);
+ stateUpdater.set(serverCnx, ServerCnx.State.Failed);
+ serverCnx.handleLookup(any());
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class)
+ public void shouldFailHandlePartitionMetadataRequest() throws Exception {
+ ServerCnx serverCnx = mock(ServerCnx.class, CALLS_REAL_METHODS);
+ Field stateUpdater = ServerCnx.class.getDeclaredField("state");
+ stateUpdater.setAccessible(true);
+ stateUpdater.set(serverCnx, ServerCnx.State.Failed);
+ serverCnx.handlePartitionMetadataRequest(any());
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class)
+ public void shouldFailHandleConsumerStats() throws Exception {
+ ServerCnx serverCnx = mock(ServerCnx.class, CALLS_REAL_METHODS);
+ Field stateUpdater = ServerCnx.class.getDeclaredField("state");
+ stateUpdater.setAccessible(true);
+ stateUpdater.set(serverCnx, ServerCnx.State.Failed);
+ serverCnx.handleConsumerStats(any());
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class)
+ public void shouldFailHandleGetTopicsOfNamespace() throws Exception {
+ ServerCnx serverCnx = mock(ServerCnx.class, CALLS_REAL_METHODS);
+ Field stateUpdater = ServerCnx.class.getDeclaredField("state");
+ stateUpdater.setAccessible(true);
+ stateUpdater.set(serverCnx, ServerCnx.State.Failed);
+ serverCnx.handleGetTopicsOfNamespace(any());
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class)
+ public void shouldFailHandleGetSchema() throws Exception {
+ ServerCnx serverCnx = mock(ServerCnx.class, CALLS_REAL_METHODS);
+ Field stateUpdater = ServerCnx.class.getDeclaredField("state");
+ stateUpdater.setAccessible(true);
+ stateUpdater.set(serverCnx, ServerCnx.State.Failed);
+ serverCnx.handleGetSchema(any());
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class)
+ public void shouldFailHandleGetOrCreateSchema() throws Exception {
+ ServerCnx serverCnx = mock(ServerCnx.class, CALLS_REAL_METHODS);
+ Field stateUpdater = ServerCnx.class.getDeclaredField("state");
+ stateUpdater.setAccessible(true);
+ stateUpdater.set(serverCnx, ServerCnx.State.Failed);
+ serverCnx.handleGetOrCreateSchema(any());
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class)
+ public void shouldFailHandleTcClientConnectRequest() throws Exception {
+ ServerCnx serverCnx = mock(ServerCnx.class, CALLS_REAL_METHODS);
+ Field stateUpdater = ServerCnx.class.getDeclaredField("state");
+ stateUpdater.setAccessible(true);
+ stateUpdater.set(serverCnx, ServerCnx.State.Failed);
+ serverCnx.handleTcClientConnectRequest(any());
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class)
+ public void shouldFailHandleNewTxn() throws Exception {
+ ServerCnx serverCnx = mock(ServerCnx.class, CALLS_REAL_METHODS);
+ Field stateUpdater = ServerCnx.class.getDeclaredField("state");
+ stateUpdater.setAccessible(true);
+ stateUpdater.set(serverCnx, ServerCnx.State.Failed);
+ serverCnx.handleNewTxn(any());
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class)
+ public void shouldFailHandleAddPartitionToTxn() throws Exception {
+ ServerCnx serverCnx = mock(ServerCnx.class, CALLS_REAL_METHODS);
+ Field stateUpdater = ServerCnx.class.getDeclaredField("state");
+ stateUpdater.setAccessible(true);
+ stateUpdater.set(serverCnx, ServerCnx.State.Failed);
+ serverCnx.handleAddPartitionToTxn(any());
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class)
+ public void shouldFailHandleEndTxn() throws Exception {
+ ServerCnx serverCnx = mock(ServerCnx.class, CALLS_REAL_METHODS);
+ Field stateUpdater = ServerCnx.class.getDeclaredField("state");
+ stateUpdater.setAccessible(true);
+ stateUpdater.set(serverCnx, ServerCnx.State.Failed);
+ serverCnx.handleEndTxn(any());
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class)
+ public void shouldFailHandleEndTxnOnPartition() throws Exception {
+ ServerCnx serverCnx = mock(ServerCnx.class, CALLS_REAL_METHODS);
+ Field stateUpdater = ServerCnx.class.getDeclaredField("state");
+ stateUpdater.setAccessible(true);
+ stateUpdater.set(serverCnx, ServerCnx.State.Failed);
+ serverCnx.handleEndTxnOnPartition(any());
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class)
+ public void shouldFailHandleEndTxnOnSubscription() throws Exception {
+ ServerCnx serverCnx = mock(ServerCnx.class, CALLS_REAL_METHODS);
+ Field stateUpdater = ServerCnx.class.getDeclaredField("state");
+ stateUpdater.setAccessible(true);
+ stateUpdater.set(serverCnx, ServerCnx.State.Failed);
+ serverCnx.handleEndTxnOnSubscription(any());
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class)
+ public void shouldFailHandleAddSubscriptionToTxn() throws Exception {
+ ServerCnx serverCnx = mock(ServerCnx.class, CALLS_REAL_METHODS);
+ Field stateUpdater = ServerCnx.class.getDeclaredField("state");
+ stateUpdater.setAccessible(true);
+ stateUpdater.set(serverCnx, ServerCnx.State.Failed);
+ serverCnx.handleAddSubscriptionToTxn(any());
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class)
+ public void shouldFailHandleCommandWatchTopicList() throws Exception {
+ ServerCnx serverCnx = mock(ServerCnx.class, CALLS_REAL_METHODS);
+ Field stateUpdater = ServerCnx.class.getDeclaredField("state");
+ stateUpdater.setAccessible(true);
+ stateUpdater.set(serverCnx, ServerCnx.State.Failed);
+ serverCnx.handleCommandWatchTopicList(any());
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class)
+ public void shouldFailHandleCommandWatchTopicListClose() throws Exception {
+ ServerCnx serverCnx = mock(ServerCnx.class, CALLS_REAL_METHODS);
+ Field stateUpdater = ServerCnx.class.getDeclaredField("state");
+ stateUpdater.setAccessible(true);
+ stateUpdater.set(serverCnx, ServerCnx.State.Failed);
+ serverCnx.handleCommandWatchTopicListClose(any());
+ }
}