You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by eo...@apache.org on 2021/11/15 08:40:38 UTC
[pulsar] 03/03: [ServerCnx] Close connection after receiving unexpected SendCommand (#12780)
This is an automated email from the ASF dual-hosted git repository.
eolivelli pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit cad93713f7267b420784446a775c2caaaa036619
Author: Michael Marshall <mi...@datastax.com>
AuthorDate: Sat Nov 13 17:09:39 2021 -0600
[ServerCnx] Close connection after receiving unexpected SendCommand (#12780)
(cherry picked from commit ba5809553344f074c5dce15618a70f3b20d368c7)
---
.../apache/pulsar/broker/service/ServerCnx.java | 4 +++-
.../pulsar/broker/service/ServerCnxTest.java | 22 ++++++++++++++++++++++
2 files changed, 25 insertions(+), 1 deletion(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 5be21cc..7762c22 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -1352,7 +1352,9 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
CompletableFuture<Producer> producerFuture = producers.get(send.getProducerId());
if (producerFuture == null || !producerFuture.isDone() || producerFuture.isCompletedExceptionally()) {
- log.warn("[{}] Producer had already been closed: {}", remoteAddress, send.getProducerId());
+ log.warn("[{}] Received message, but the producer is not ready : {}. Closing the connection.",
+ remoteAddress, send.getProducerId());
+ close();
return;
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index 2dc57b5..3d3a30e 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -649,6 +649,28 @@ public class ServerCnxTest {
}
@Test(timeOut = 30000)
+ public void testSendCommandBeforeCreatingProducer() throws Exception {
+ resetChannel();
+ setChannelConnected();
+
+ // Issue a SEND on the connected channel without any producer having been created in this test
+ MessageMetadata messageMetadata = new MessageMetadata()
+ .setPublishTime(System.currentTimeMillis())
+ .setProducerName("prod-name")
+ .setSequenceId(0);
+ ByteBuf data = Unpooled.buffer(1024);
+
+ ByteBuf clientCommand = ByteBufPair.coalesce(Commands.newSend(1, 0, 1,
+ ChecksumType.None, messageMetadata, data));
+ channel.writeInbound(Unpooled.copiedBuffer(clientCommand));
+ clientCommand.release(); // the channel was handed a copy, so the original buffer is released here
+
+ // The connection is expected to be closed in response; wait up to 10 seconds for the channel to go inactive
+ Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() -> !channel.isActive());
+ channel.finish();
+ }
+
+ @Test(timeOut = 30000)
public void testUseSameProducerName() throws Exception {
resetChannel();
setChannelConnected();