You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by xi...@apache.org on 2022/12/13 07:26:10 UTC

[pulsar] branch master updated: [fix][client] moving get sequenceId into the sync code segment (#17836)

This is an automated email from the ASF dual-hosted git repository.

xiangying pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 7e258aff76f [fix][client] moving get sequenceId into the sync code segment (#17836)
7e258aff76f is described below

commit 7e258aff76fd02b8ea08fcf26d161313a82e23c2
Author: Xiangying Meng <55...@users.noreply.github.com>
AuthorDate: Tue Dec 13 15:26:03 2022 +0800

    [fix][client] moving get sequenceId into the sync code segment (#17836)
    
    ### Motivation
    When the producer sends messages in multiple threads, the message with the smaller sequence Id can be pushed later than the message with the bigger sequence Id.
    The `internalSendWithTxnAsync` call `internalSendAsync` Asynchronously when `txn != null`
    https://github.com/apache/pulsar/blob/aeb4503be59f9a9450dfd47cf5dfcb375735d064/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L409
    And the `sendAsync` acquire sequence ID is not included in the synchronized block with `serializeAndSendMessage`.
    https://github.com/apache/pulsar/blob/aeb4503be59f9a9450dfd47cf5dfcb375735d064/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L490
    https://github.com/apache/pulsar/blob/aeb4503be59f9a9450dfd47cf5dfcb375735d064/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L555-L560
    For example:
    We send 4 messages (msg1, msg2, msg3, msg4)  to the broker and then the 4 messages may get 4 sequence Id (1, 3, 2, 4) which is not in order due to the logic to get the sequence ID and send the message is not in the same synchronous code block.
    And then the msg3 with sequence ID 2 will never be persistent successfully.
    ### Modification
    Add a method to update `sequenceId` and move the method in the sync code.
    Via https://github.com/apache/pulsar/pull/16196 we should update message metadata before computing the message size.
---
 .../pulsar/client/api/ClientDeduplicationTest.java | 50 ++++++++++++++++++++++
 .../apache/pulsar/client/impl/ProducerImpl.java    | 34 ++++++++-------
 2 files changed, 68 insertions(+), 16 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationTest.java
index 168886f733a..d2f9617a5fa 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationTest.java
@@ -27,6 +27,9 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import lombok.extern.slf4j.Slf4j;
@@ -372,4 +375,51 @@ public class ClientDeduplicationTest extends ProducerConsumerBase {
         consumer.close();
         producer.close();
     }
+
+    @Test
+    public void testUpdateSequenceIdInSyncCodeSegment() throws Exception {
+        final String topic = "persistent://my-property/my-ns/testUpdateSequenceIdInSyncCodeSegment";
+        int totalMessage = 200;
+        int threadSize = 5;
+        String topicName = "subscription";
+        ExecutorService executorService = Executors.newFixedThreadPool(threadSize);
+        conf.setBrokerDeduplicationEnabled(true);
+
+        //build producer/consumer
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topic)
+                .producerName("producer")
+                .sendTimeout(0, TimeUnit.SECONDS)
+                .create();
+
+        Consumer<byte[]> consumer = pulsarClient.newConsumer()
+                .topic(topic)
+                .subscriptionType(SubscriptionType.Exclusive)
+                .subscriptionName(topicName)
+                .subscribe();
+
+        CountDownLatch countDownLatch = new CountDownLatch(threadSize);
+        //Send messages in multiple-thread
+        for (int i = 0; i < threadSize; i++) {
+            executorService.submit(() -> {
+                try {
+                    for (int j = 0; j < totalMessage; j++) {
+                        //The message will be sent with out-of-order sequence ID.
+                        producer.newMessage().sendAsync();
+                    }
+                } catch (Exception e) {
+                    log.error("Failed to send/ack messages with transaction.", e);
+                } finally {
+                    countDownLatch.countDown();
+                }
+            });
+        }
+        //wait the all send op is executed and store its futures in the arraylist.
+        countDownLatch.await();
+
+        for (int i = 0; i < threadSize * totalMessage; i++) {
+            Message<byte[]> msg = consumer.receive(5, TimeUnit.SECONDS);
+            assertNotNull(msg);
+        }
+    }
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index dceaf502141..ef3ead790bc 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -101,7 +101,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
     // Producer id, used to identify a producer within a single connection
     protected final long producerId;
 
-    // Variable is used through the atomic updater
+    // Variable is updated in a synchronized block
     private volatile long msgIdGenerator;
 
     private final OpSendMsgQueue pendingMessages;
@@ -169,10 +169,6 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
 
     private boolean errorState;
 
-    @SuppressWarnings("rawtypes")
-    private static final AtomicLongFieldUpdater<ProducerImpl> msgIdGeneratorUpdater = AtomicLongFieldUpdater
-            .newUpdater(ProducerImpl.class, "msgIdGenerator");
-
     public ProducerImpl(PulsarClientImpl client, String topic, ProducerConfigurationData conf,
                         CompletableFuture<Producer<T>> producerCreatedFuture, int partitionIndex, Schema<T> schema,
                         ProducerInterceptors interceptors, Optional<String> overrideProducerName) {
@@ -487,7 +483,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
 
         // Update the message metadata before computing the payload chunk size to avoid a large message cannot be split
         // into chunks.
-        final long sequenceId = updateMessageMetadata(msgMetadata, uncompressedSize);
+        updateMessageMetadata(msgMetadata, uncompressedSize);
 
         // send in chunks
         int totalChunks;
@@ -527,7 +523,6 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
 
         try {
             int readStartIndex = 0;
-            String uuid = totalChunks > 1 ? String.format("%s-%d", producerName, sequenceId) : null;
             ChunkedMessageCtx chunkedMessageCtx = totalChunks > 1 ? ChunkedMessageCtx.get(totalChunks) : null;
             byte[] schemaVersion = totalChunks > 1 && msg.getMessageBuilder().hasSchemaVersion()
                     ? msg.getMessageBuilder().getSchemaVersion() : null;
@@ -555,6 +550,11 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
                     return;
                 }
                 synchronized (this) {
+                    // Update the message metadata before computing the payload chunk size
+                    // to avoid a large message cannot be split into chunks.
+                    final long sequenceId = updateMessageMetadataSequenceId(msgMetadata);
+                    String uuid = totalChunks > 1 ? String.format("%s-%d", producerName, sequenceId) : null;
+
                     serializeAndSendMessage(msg, payload, sequenceId, uuid, chunkId, totalChunks,
                             readStartIndex, payloadChunkSize, compressedPayload, compressed,
                             compressedPayload.readableBytes(), callback, chunkedMessageCtx, messageId);
@@ -577,15 +577,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
      * @param uncompressedSize
      * @return the sequence id
      */
-    private long updateMessageMetadata(final MessageMetadata msgMetadata, final int uncompressedSize) {
-        final long sequenceId;
-        if (!msgMetadata.hasSequenceId()) {
-            sequenceId = msgIdGeneratorUpdater.getAndIncrement(this);
-            msgMetadata.setSequenceId(sequenceId);
-        } else {
-            sequenceId = msgMetadata.getSequenceId();
-        }
-
+    private void updateMessageMetadata(final MessageMetadata msgMetadata, final int uncompressedSize) {
         if (!msgMetadata.hasPublishTime()) {
             msgMetadata.setPublishTime(client.getClientClock().millis());
 
@@ -599,6 +591,16 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
             }
             msgMetadata.setUncompressedSize(uncompressedSize);
         }
+    }
+
+    private long updateMessageMetadataSequenceId(final MessageMetadata msgMetadata) {
+        final long sequenceId;
+        if (!msgMetadata.hasSequenceId()) {
+            sequenceId = msgIdGenerator++;
+            msgMetadata.setSequenceId(sequenceId);
+        } else {
+            sequenceId = msgMetadata.getSequenceId();
+        }
         return sequenceId;
     }