You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ni...@apache.org on 2022/12/16 18:26:06 UTC
[pulsar] branch master updated: [fix][txn] producer may reject a message from client but actually writing it to the topic (#18913)
This is an automated email from the ASF dual-hosted git repository.
nicoloboschi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 3bb052a5747 [fix][txn] producer may reject a message from client but actually writing it to the topic (#18913)
3bb052a5747 is described below
commit 3bb052a5747f96b1b699509e0a14cb335dd804e7
Author: Nicolò Boschi <bo...@gmail.com>
AuthorDate: Fri Dec 16 19:25:56 2022 +0100
[fix][txn] producer may reject a message from client but actually writing it to the topic (#18913)
---
.../java/org/apache/pulsar/broker/service/Producer.java | 4 +++-
.../pulsar/broker/service/PersistentTopicTest.java | 17 +++++++++++++++++
2 files changed, 20 insertions(+), 1 deletion(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
index e7034391b14..bc101e31d27 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
@@ -800,7 +800,9 @@ public class Producer {
public void publishTxnMessage(TxnID txnID, long producerId, long sequenceId, long highSequenceId,
ByteBuf headersAndPayload, long batchSize, boolean isChunked, boolean isMarker) {
- checkAndStartPublish(producerId, sequenceId, headersAndPayload, batchSize, null);
+ if (!checkAndStartPublish(producerId, sequenceId, headersAndPayload, batchSize, null)) {
+ return;
+ }
MessagePublishContext messagePublishContext =
MessagePublishContext.get(this, sequenceId, highSequenceId, msgIn,
headersAndPayload.readableBytes(), batchSize, isChunked, System.nanoTime(), isMarker, null);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index 909d6031928..adb31f2d1a5 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -116,6 +116,7 @@ import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
@@ -2363,4 +2364,20 @@ public class PersistentTopicTest extends MockedBookKeeperTestCase {
topic.initialize();
assertEquals(topic.getHierarchyTopicPolicies().getReplicationClusters().get(), namespaceClusters);
}
+
+ @Test
+ public void testSendProducerTxnPrechecks() throws Exception { // regression test: a txn publish on a closed producer must not reach the topic
+ PersistentTopic topic = mock(PersistentTopic.class); // mocked topic, so interactions with it can be verified below
+ String role = "appid1";
+ Producer producer1 = new Producer(topic, serverCnx, 1 /* producer id */, "prod-name",
+ role, false, null, SchemaVersion.Latest, 0, true,
+ ProducerAccessMode.Shared, Optional.empty(), true);
+ producer1.close(false).get(); // close first and wait for completion; presumably this is what makes the publish pre-check fail - confirm in Producer
+ producer1.publishTxnMessage(
+ new TxnID(1L, 0L),
+ 1, 1, 1, null, 1, false, false // headersAndPayload is null: the call must return before the payload is dereferenced
+ );
+ verify(topic, times(0)).publishTxnMessage(any(), any(), any()); // the rejected message must never be handed to the topic
+ }
+
}