You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by xi...@apache.org on 2023/03/16 08:54:40 UTC

[pulsar] 01/01: [fix][client] moving get sequenceId into the sync code segment (#17836)

This is an automated email from the ASF dual-hosted git repository.

xiangying pushed a commit to branch fix_deduplication
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 17b3f2aa97e2391a566a2363295e13551d7c6cb2
Author: Xiangying Meng <55...@users.noreply.github.com>
AuthorDate: Tue Dec 13 15:26:03 2022 +0800

    [fix][client] moving get sequenceId into the sync code segment (#17836)
    
    When a producer sends messages from multiple threads, a message with a smaller sequence ID can be sent to the broker after a message with a larger sequence ID.
    `internalSendWithTxnAsync` calls `internalSendAsync` asynchronously when `txn != null`:
    https://github.com/apache/pulsar/blob/aeb4503be59f9a9450dfd47cf5dfcb375735d064/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L409
    In addition, `sendAsync` acquires the sequence ID outside the synchronized block that contains `serializeAndSendMessage`:
    https://github.com/apache/pulsar/blob/aeb4503be59f9a9450dfd47cf5dfcb375735d064/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L490
    https://github.com/apache/pulsar/blob/aeb4503be59f9a9450dfd47cf5dfcb375735d064/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L555-L560
    For example:
    We send 4 messages (msg1, msg2, msg3, msg4) to the broker, and they may be sent with the sequence IDs (1, 3, 2, 4). The IDs are out of order because acquiring the sequence ID and sending the message do not happen in the same synchronized block.
    As a result, msg3, which carries sequence ID 2 and is sent after sequence ID 3, will never be persisted successfully.
    Add a method that updates `sequenceId` and call it inside the synchronized block.
    Per https://github.com/apache/pulsar/pull/16196, the message metadata must still be updated before the message size is computed.
    
    (cherry picked from commit 7e258aff76fd02b8ea08fcf26d161313a82e23c2)
---
 .../pulsar/client/api/ClientDeduplicationTest.java | 50 ++++++++++++++++++++++
 .../apache/pulsar/client/impl/ProducerImpl.java    | 29 ++++++-------
 2 files changed, 64 insertions(+), 15 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationTest.java
index c8acc7d46f8..0ac0440a7a8 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationTest.java
@@ -28,6 +28,9 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import lombok.extern.slf4j.Slf4j;
@@ -373,4 +376,51 @@ public class ClientDeduplicationTest extends ProducerConsumerBase {
         consumer.close();
         producer.close();
     }
+
+    @Test
+    public void testUpdateSequenceIdInSyncCodeSegment() throws Exception {
+        final String topic = "persistent://my-property/my-ns/testUpdateSequenceIdInSyncCodeSegment";
+        int totalMessage = 200; // number of messages sent by each thread
+        int threadSize = 5;
+        String topicName = "subscription"; // used below as the subscription name
+        ExecutorService executorService = Executors.newFixedThreadPool(threadSize); // NOTE(review): never shut down
+        conf.setBrokerDeduplicationEnabled(true);
+
+        //build producer/consumer
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topic)
+                .producerName("producer")
+                .sendTimeout(0, TimeUnit.SECONDS)
+                .create();
+
+        Consumer<byte[]> consumer = pulsarClient.newConsumer()
+                .topic(topic)
+                .subscriptionType(SubscriptionType.Exclusive)
+                .subscriptionName(topicName)
+                .subscribe();
+
+        CountDownLatch countDownLatch = new CountDownLatch(threadSize);
+        //Send messages from several threads that share the same producer
+        for (int i = 0; i < threadSize; i++) {
+            executorService.submit(() -> {
+                try {
+                    for (int j = 0; j < totalMessage; j++) {
+                        //No explicit sequence ID is set here, so the producer assigns one for each message.
+                        producer.newMessage().sendAsync();
+                    }
+                } catch (Exception e) {
+                    log.error("Failed to send/ack messages with transaction.", e);
+                } finally {
+                    countDownLatch.countDown();
+                }
+            });
+        }
+        //wait until every thread has issued all of its sendAsync calls; the returned futures are not kept.
+        countDownLatch.await();
+
+        for (int i = 0; i < threadSize * totalMessage; i++) { // expect every sent message to be delivered
+            Message<byte[]> msg = consumer.receive(5, TimeUnit.SECONDS);
+            assertNotNull(msg);
+        }
+    }
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index 6f7d7e6a148..3969d1b2afe 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -101,7 +101,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
     // Producer id, used to identify a producer within a single connection
     protected final long producerId;
 
-    // Variable is used through the atomic updater
+    // Variable is updated in a synchronized block
     private volatile long msgIdGenerator;
 
     private final OpSendMsgQueue pendingMessages;
@@ -169,10 +169,6 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
 
     private boolean errorState;
 
-    @SuppressWarnings("rawtypes")
-    private static final AtomicLongFieldUpdater<ProducerImpl> msgIdGeneratorUpdater = AtomicLongFieldUpdater
-            .newUpdater(ProducerImpl.class, "msgIdGenerator");
-
     public ProducerImpl(PulsarClientImpl client, String topic, ProducerConfigurationData conf,
                         CompletableFuture<Producer<T>> producerCreatedFuture, int partitionIndex, Schema<T> schema,
                         ProducerInterceptors interceptors, Optional<String> overrideProducerName) {
@@ -489,7 +485,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
 
         // Update the message metadata before computing the payload chunk size to avoid a large message cannot be split
         // into chunks.
-        final long sequenceId = updateMessageMetadata(msgMetadata, uncompressedSize);
+        updateMessageMetadata(msgMetadata, uncompressedSize);
 
         // send in chunks
         int totalChunks;
@@ -529,6 +525,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
         try {
             synchronized (this) {
                 int readStartIndex = 0;
+                final long sequenceId = updateMessageMetadataSequenceId(msgMetadata);
                 String uuid = totalChunks > 1 ? String.format("%s-%d", producerName, sequenceId) : null;
                 ChunkedMessageCtx chunkedMessageCtx = totalChunks > 1 ? ChunkedMessageCtx.get(totalChunks) : null;
                 byte[] schemaVersion = totalChunks > 1 && msg.getMessageBuilder().hasSchemaVersion()
@@ -570,15 +567,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
      * @param uncompressedSize
      * @return the sequence id
      */
-    private long updateMessageMetadata(final MessageMetadata msgMetadata, final int uncompressedSize) {
-        final long sequenceId;
-        if (!msgMetadata.hasSequenceId()) {
-            sequenceId = msgIdGeneratorUpdater.getAndIncrement(this);
-            msgMetadata.setSequenceId(sequenceId);
-        } else {
-            sequenceId = msgMetadata.getSequenceId();
-        }
-
+    private void updateMessageMetadata(final MessageMetadata msgMetadata, final int uncompressedSize) {
         if (!msgMetadata.hasPublishTime()) {
             msgMetadata.setPublishTime(client.getClientClock().millis());
 
@@ -592,6 +581,16 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
             }
             msgMetadata.setUncompressedSize(uncompressedSize);
         }
+    }
+
+    private long updateMessageMetadataSequenceId(final MessageMetadata msgMetadata) {
+        final long sequenceId;
+        if (!msgMetadata.hasSequenceId()) {
+            sequenceId = msgIdGenerator++; // plain, non-atomic increment: the caller holds the lock on this producer
+            msgMetadata.setSequenceId(sequenceId);
+        } else {
+            sequenceId = msgMetadata.getSequenceId(); // an ID already present on the metadata is kept as is
+        }
         return sequenceId;
     }