You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zh...@apache.org on 2020/03/13 06:10:44 UTC

[pulsar] 03/17: [Java Reader Client] Start reader inside batch result in read first message in batch. (#6345)

This is an automated email from the ASF dual-hosted git repository.

zhaijia pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 8a64dc6669cec5698bb0e2ef4a7fb75631c0d72c
Author: Yijie Shen <he...@gmail.com>
AuthorDate: Mon Feb 24 12:27:08 2020 +0800

    [Java Reader Client] Start reader inside batch result in read first message in batch. (#6345)
    
    Fixes #6344
    Fixes #6350
    
    The bug was brought in https://github.com/apache/pulsar/pull/5622 by changing the skip logic wrongly.
    
    (cherry picked from commit 63ccd43e1a3294d696a4af37c76261eed1bb3124)
---
 .../client/api/SimpleProducerConsumerTest.java     | 38 ++++++++---
 .../apache/pulsar/client/api/TopicReaderTest.java  | 74 ++++++++++++++++++++--
 .../pulsar/client/api/BatchReceivePolicy.java      |  2 +-
 .../apache/pulsar/client/impl/ConsumerBase.java    |  2 +-
 .../apache/pulsar/client/impl/ConsumerImpl.java    | 31 +++++++--
 5 files changed, 123 insertions(+), 24 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index 59910a8..e907197 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -47,6 +47,8 @@ import java.nio.file.Paths;
 import java.time.Clock;
 import java.time.Instant;
 import java.time.ZoneId;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -69,8 +71,10 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 
 import lombok.Cleanup;
+import org.apache.bookkeeper.common.concurrent.FutureUtils;
 import org.apache.bookkeeper.mledger.impl.EntryCacheImpl;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.impl.ConsumerImpl;
@@ -1419,7 +1423,6 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
      * Verify: Consumer1 which doesn't send ack will not impact Consumer2 which sends ack for consumed message.
      *
      *
-     * @param batchMessageDelayMs
      * @throws Exception
      */
     @Test
@@ -1736,7 +1739,6 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
      * Verify: Consumer2 sends ack of Consumer1 and consumer1 should be unblock if it is blocked due to unack-messages
      *
      *
-     * @param batchMessageDelayMs
      * @throws Exception
      */
     @Test
@@ -3219,7 +3221,7 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
     }
 
     @Test(dataProvider = "variationsForExpectedPos")
-    public void testReaderStartMessageIdAtExpectedPos(boolean batching, boolean startInclusive, int numOfMessages)
+    public void testConsumerStartMessageIdAtExpectedPos(boolean batching, boolean startInclusive, int numOfMessages)
             throws Exception {
         final String topicName = "persistent://my-property/my-ns/ConsumerStartMessageIdAtExpectedPos";
         final int resetIndex = new Random().nextInt(numOfMessages); // Choose some random index to reset
@@ -3230,14 +3232,31 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
                 .enableBatching(batching)
                 .create();
 
-        MessageId resetPos = null;
+        CountDownLatch latch = new CountDownLatch(numOfMessages);
+
+        final AtomicReference<MessageId> resetPos = new AtomicReference<>();
+
         for (int i = 0; i < numOfMessages; i++) {
-            MessageId msgId = producer.send(String.format("msg num %d", i).getBytes());
-            if (resetIndex == i) {
-                resetPos = msgId;
-            }
+
+            final int j = i;
+
+            producer.sendAsync(String.format("msg num %d", i).getBytes())
+                    .thenCompose(messageId -> FutureUtils.value(Pair.of(j, messageId)))
+                    .whenComplete((p, e) -> {
+                        if (e != null) {
+                            fail("send msg failed due to " + e.getMessage());
+                        } else {
+                            log.info("send msg with id {}", p.getRight());
+                            if (p.getLeft() == resetIndex) {
+                                resetPos.set(p.getRight());
+                            }
+                        }
+                        latch.countDown();
+                    });
         }
 
+        latch.await();
+
         ConsumerBuilder<byte[]> consumerBuilder = pulsarClient.newConsumer()
                 .topic(topicName);
 
@@ -3246,7 +3265,8 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
         }
 
         Consumer<byte[]> consumer = consumerBuilder.subscriptionName("my-subscriber-name").subscribe();
-        consumer.seek(resetPos);
+        consumer.seek(resetPos.get());
+        log.info("reset cursor to {}", resetPos.get());
         Set<String> messageSet = Sets.newHashSet();
         for (int i = firstMessage; i < numOfMessages; i++) {
             Message<byte[]> message = consumer.receive();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java
index 69139b5..afc8871 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java
@@ -22,6 +22,7 @@ import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
@@ -30,12 +31,16 @@ import java.nio.file.Files;
 import java.nio.file.Paths;
 import java.util.*;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.client.impl.BatchMessageIdImpl;
 import org.apache.pulsar.client.impl.MessageImpl;
 import org.apache.pulsar.client.impl.ReaderImpl;
 import org.apache.pulsar.common.policies.data.TopicStats;
-import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.RelativeTimeUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -699,17 +704,33 @@ public class TopicReaderTest extends ProducerConsumerBase {
                 .enableBatching(batching)
                 .create();
 
-        MessageId resetPos = null;
+        CountDownLatch latch = new CountDownLatch(numOfMessages);
+
+        final AtomicReference<MessageId> resetPos = new AtomicReference<>();
+
         for (int i = 0; i < numOfMessages; i++) {
-            MessageId msgId = producer.send(String.format("msg num %d", i).getBytes());
-            if (resetIndex == i) {
-                resetPos = msgId;
-            }
+
+            final int j = i;
+
+            producer.sendAsync(String.format("msg num %d", i).getBytes())
+                    .thenCompose(messageId -> FutureUtils.value(Pair.of(j, messageId)))
+                    .whenComplete((p, e) -> {
+                        if (e != null) {
+                            fail("send msg failed due to " + e.getMessage());
+                        } else {
+                            if (p.getLeft() == resetIndex) {
+                                resetPos.set(p.getRight());
+                            }
+                        }
+                        latch.countDown();
+                    });
         }
 
+        latch.await();
+
         ReaderBuilder<byte[]> readerBuilder = pulsarClient.newReader()
                 .topic(topicName)
-                .startMessageId(resetPos);
+                .startMessageId(resetPos.get());
 
         if (startInclusive) {
             readerBuilder.startMessageIdInclusive();
@@ -761,4 +782,43 @@ public class TopicReaderTest extends ProducerConsumerBase {
             producers.get(i).close();
         }
     }
+
+    @Test
+    public void testReaderStartInMiddleOfBatch() throws Exception {
+        final String topicName = "persistent://my-property/my-ns/ReaderStartInMiddleOfBatch";
+        final int numOfMessage = 100;
+
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topicName)
+                .enableBatching(true)
+                .batchingMaxMessages(10)
+                .create();
+
+        CountDownLatch latch = new CountDownLatch(100);
+
+        List<MessageId> allIds = Collections.synchronizedList(new ArrayList<>());
+
+        for (int i = 0; i < numOfMessage; i++) {
+            producer.sendAsync(String.format("msg num %d", i).getBytes()).whenComplete((mid, e) -> {
+                if (e != null) {
+                    fail();
+                } else {
+                    allIds.add(mid);
+                }
+                latch.countDown();
+            });
+        }
+
+        latch.await();
+
+        for (MessageId id : allIds) {
+            Reader<byte[]> reader = pulsarClient.newReader().topic(topicName)
+                    .startMessageId(id).startMessageIdInclusive().create();
+            MessageId idGot = reader.readNext().getMessageId();
+            assertEquals(idGot, id);
+            reader.close();
+        }
+
+        producer.close();
+    }
 }
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/BatchReceivePolicy.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/BatchReceivePolicy.java
index 2b9be71..7a5a1bd 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/BatchReceivePolicy.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/BatchReceivePolicy.java
@@ -46,7 +46,7 @@ public class BatchReceivePolicy {
     /**
      * Default batch receive policy.
      *
-     * <p>Max number of messages: 100
+     * <p>Max number of messages: no limit
      * Max number of bytes: 10MB
      * Timeout: 100ms<p/>
      */
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index 86e7fb2..0f5219b 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -464,7 +464,7 @@ public abstract class ConsumerBase<T> extends HandlerState implements TimerTask,
     }
 
     protected boolean hasEnoughMessagesForBatchReceive() {
-        if (batchReceivePolicy.getMaxNumMessages() <= 0 && batchReceivePolicy.getMaxNumMessages() <= 0) {
+        if (batchReceivePolicy.getMaxNumMessages() <= 0 && batchReceivePolicy.getMaxNumBytes() <= 0) {
             return false;
         }
         return (batchReceivePolicy.getMaxNumMessages() > 0 && incomingMessages.size() >= batchReceivePolicy.getMaxNumMessages())
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 08049a7..9431496 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -45,6 +45,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReadWriteLock;
@@ -123,6 +124,9 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
     private final SubscriptionMode subscriptionMode;
     private volatile BatchMessageIdImpl startMessageId;
 
+    private volatile BatchMessageIdImpl seekMessageId;
+    private final AtomicBoolean duringSeek;
+
     private final BatchMessageIdImpl initialStartMessageId;
     private final long startMessageRollbackDurationInSec;
 
@@ -205,6 +209,8 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
             stats = ConsumerStatsDisabled.INSTANCE;
         }
 
+        duringSeek = new AtomicBoolean(false);
+
         if (conf.getAckTimeoutMillis() != 0) {
             if (conf.getTickDurationMillis() > 0) {
                 this.unAckedMessageTracker = new UnAckedMessageTracker(client, this, conf.getAckTimeoutMillis(),
@@ -667,6 +673,13 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
         List<Message<?>> currentMessageQueue = new ArrayList<>(incomingMessages.size());
         incomingMessages.drainTo(currentMessageQueue);
         INCOMING_MESSAGES_SIZE_UPDATER.set(this, 0);
+
+        if (duringSeek.compareAndSet(true, false)) {
+            return seekMessageId;
+        } else if (subscriptionMode == SubscriptionMode.Durable) {
+            return null;
+        }
+
         if (!currentMessageQueue.isEmpty()) {
             MessageIdImpl nextMessageInQueue = (MessageIdImpl) currentMessageQueue.get(0).getMessageId();
             BatchMessageIdImpl previousMessage;
@@ -867,7 +880,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
         // and return undecrypted payload
         if (isMessageUndecryptable || (numMessages == 1 && !msgMetadata.hasNumMessagesInBatch())) {
 
-            if (isResetIncludedAndSameEntryLedger(messageId) && isPriorEntryIndex(messageId.getEntryId())) {
+            if (isSameEntry(messageId) && isPriorEntryIndex(messageId.getEntryId())) {
                 // We need to discard entries that were prior to startMessageId
                 if (log.isDebugEnabled()) {
                     log.debug("[{}] [{}] Ignoring message from before the startMessageId: {}", subscription,
@@ -1018,7 +1031,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
                 ByteBuf singleMessagePayload = Commands.deSerializeSingleMessageInBatch(uncompressedPayload,
                         singleMessageMetadataBuilder, i, batchSize);
 
-                if (isResetIncludedAndSameEntryLedger(messageId) && isPriorBatchIndex(i)) {
+                if (isSameEntry(messageId) && isPriorBatchIndex(i)) {
                     // If we are receiving a batch message, we need to discard messages that were prior
                     // to the startMessageId
                     if (log.isDebugEnabled()) {
@@ -1091,8 +1104,8 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
         return resetIncludeHead ? idx < startMessageId.getBatchIndex() : idx <= startMessageId.getBatchIndex();
     }
 
-    private boolean isResetIncludedAndSameEntryLedger(MessageIdData messageId) {
-        return !resetIncludeHead && startMessageId != null
+    private boolean isSameEntry(MessageIdData messageId) {
+        return startMessageId != null
                 && messageId.getLedgerId() == startMessageId.getLedgerId()
                 && messageId.getEntryId() == startMessageId.getEntryId();
     }
@@ -1477,7 +1490,10 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
         cnx.sendRequestWithId(seek, requestId).thenRun(() -> {
             log.info("[{}][{}] Successfully reset subscription to publish time {}", topic, subscription, timestamp);
             acknowledgmentsGroupingTracker.flushAndClean();
-            lastDequeuedMessage = MessageId.earliest;
+
+            seekMessageId = new BatchMessageIdImpl((MessageIdImpl) MessageId.earliest);
+            duringSeek.set(true);
+
             incomingMessages.clear();
             INCOMING_MESSAGES_SIZE_UPDATER.set(this, 0);
             seekFuture.complete(null);
@@ -1520,7 +1536,10 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
         cnx.sendRequestWithId(seek, requestId).thenRun(() -> {
             log.info("[{}][{}] Successfully reset subscription to message id {}", topic, subscription, messageId);
             acknowledgmentsGroupingTracker.flushAndClean();
-            lastDequeuedMessage = messageId;
+
+            seekMessageId = new BatchMessageIdImpl((MessageIdImpl) messageId);
+            duringSeek.set(true);
+
             incomingMessages.clear();
             INCOMING_MESSAGES_SIZE_UPDATER.set(this, 0);
             seekFuture.complete(null);