You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ni...@apache.org on 2022/12/16 18:26:06 UTC

[pulsar] branch master updated: [fix][txn] producer may reject a message from client but actually writing it to the topic (#18913)

This is an automated email from the ASF dual-hosted git repository.

nicoloboschi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 3bb052a5747 [fix][txn] producer may reject a message from client but actually writing it to the topic (#18913)
3bb052a5747 is described below

commit 3bb052a5747f96b1b699509e0a14cb335dd804e7
Author: Nicolò Boschi <bo...@gmail.com>
AuthorDate: Fri Dec 16 19:25:56 2022 +0100

    [fix][txn] producer may reject a message from client but actually writing it to the topic (#18913)
---
 .../java/org/apache/pulsar/broker/service/Producer.java |  4 +++-
 .../pulsar/broker/service/PersistentTopicTest.java      | 17 +++++++++++++++++
 2 files changed, 20 insertions(+), 1 deletion(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
index e7034391b14..bc101e31d27 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
@@ -800,7 +800,9 @@ public class Producer {
 
     public void publishTxnMessage(TxnID txnID, long producerId, long sequenceId, long highSequenceId,
                                   ByteBuf headersAndPayload, long batchSize, boolean isChunked, boolean isMarker) {
-        checkAndStartPublish(producerId, sequenceId, headersAndPayload, batchSize, null);
+        if (!checkAndStartPublish(producerId, sequenceId, headersAndPayload, batchSize, null)) {
+            return;
+        }
         MessagePublishContext messagePublishContext =
                 MessagePublishContext.get(this, sequenceId, highSequenceId, msgIn,
                         headersAndPayload.readableBytes(), batchSize, isChunked, System.nanoTime(), isMarker, null);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index 909d6031928..adb31f2d1a5 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -116,6 +116,7 @@ import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.transaction.TxnID;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
 import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
@@ -2363,4 +2364,20 @@ public class PersistentTopicTest extends MockedBookKeeperTestCase {
         topic.initialize();
         assertEquals(topic.getHierarchyTopicPolicies().getReplicationClusters().get(), namespaceClusters);
     }
+
+    @Test
+    public void testSendProducerTxnPrechecks() throws Exception {
+        PersistentTopic topic = mock(PersistentTopic.class);
+        String role = "appid1";
+        Producer producer1 = new Producer(topic, serverCnx, 1 /* producer id */, "prod-name",
+                role, false, null, SchemaVersion.Latest, 0, true,
+                ProducerAccessMode.Shared, Optional.empty(), true);
+        producer1.close(false).get();
+        producer1.publishTxnMessage(
+                new TxnID(1L, 0L),
+                1, 1, 1, null, 1, false, false
+        );
+        verify(topic, times(0)).publishTxnMessage(any(), any(), any());
+    }
+
 }