You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zh...@apache.org on 2020/03/13 06:10:44 UTC
[pulsar] 03/17: [Java Reader Client] Start reader inside batch
result in read first message in batch. (#6345)
This is an automated email from the ASF dual-hosted git repository.
zhaijia pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 8a64dc6669cec5698bb0e2ef4a7fb75631c0d72c
Author: Yijie Shen <he...@gmail.com>
AuthorDate: Mon Feb 24 12:27:08 2020 +0800
[Java Reader Client] Start reader inside batch result in read first message in batch. (#6345)
Fixes #6344
Fixes #6350
The bug was brought in https://github.com/apache/pulsar/pull/5622 by changing the skip logic wrongly.
(cherry picked from commit 63ccd43e1a3294d696a4af37c76261eed1bb3124)
---
.../client/api/SimpleProducerConsumerTest.java | 38 ++++++++---
.../apache/pulsar/client/api/TopicReaderTest.java | 74 ++++++++++++++++++++--
.../pulsar/client/api/BatchReceivePolicy.java | 2 +-
.../apache/pulsar/client/impl/ConsumerBase.java | 2 +-
.../apache/pulsar/client/impl/ConsumerImpl.java | 31 +++++++--
5 files changed, 123 insertions(+), 24 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index 59910a8..e907197 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -47,6 +47,8 @@ import java.nio.file.Paths;
import java.time.Clock;
import java.time.Instant;
import java.time.ZoneId;
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -69,8 +71,10 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import lombok.Cleanup;
+import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.bookkeeper.mledger.impl.EntryCacheImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.impl.ConsumerImpl;
@@ -1419,7 +1423,6 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
* Verify: Consumer1 which doesn't send ack will not impact Consumer2 which sends ack for consumed message.
*
*
- * @param batchMessageDelayMs
* @throws Exception
*/
@Test
@@ -1736,7 +1739,6 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
* Verify: Consumer2 sends ack of Consumer1 and consumer1 should be unblock if it is blocked due to unack-messages
*
*
- * @param batchMessageDelayMs
* @throws Exception
*/
@Test
@@ -3219,7 +3221,7 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
}
@Test(dataProvider = "variationsForExpectedPos")
- public void testReaderStartMessageIdAtExpectedPos(boolean batching, boolean startInclusive, int numOfMessages)
+ public void testConsumerStartMessageIdAtExpectedPos(boolean batching, boolean startInclusive, int numOfMessages)
throws Exception {
final String topicName = "persistent://my-property/my-ns/ConsumerStartMessageIdAtExpectedPos";
final int resetIndex = new Random().nextInt(numOfMessages); // Choose some random index to reset
@@ -3230,14 +3232,31 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
.enableBatching(batching)
.create();
- MessageId resetPos = null;
+ CountDownLatch latch = new CountDownLatch(numOfMessages);
+
+ final AtomicReference<MessageId> resetPos = new AtomicReference<>();
+
for (int i = 0; i < numOfMessages; i++) {
- MessageId msgId = producer.send(String.format("msg num %d", i).getBytes());
- if (resetIndex == i) {
- resetPos = msgId;
- }
+
+ final int j = i;
+
+ producer.sendAsync(String.format("msg num %d", i).getBytes())
+ .thenCompose(messageId -> FutureUtils.value(Pair.of(j, messageId)))
+ .whenComplete((p, e) -> {
+ if (e != null) {
+ fail("send msg failed due to " + e.getMessage());
+ } else {
+ log.info("send msg with id {}", p.getRight());
+ if (p.getLeft() == resetIndex) {
+ resetPos.set(p.getRight());
+ }
+ }
+ latch.countDown();
+ });
}
+ latch.await();
+
ConsumerBuilder<byte[]> consumerBuilder = pulsarClient.newConsumer()
.topic(topicName);
@@ -3246,7 +3265,8 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
}
Consumer<byte[]> consumer = consumerBuilder.subscriptionName("my-subscriber-name").subscribe();
- consumer.seek(resetPos);
+ consumer.seek(resetPos.get());
+ log.info("reset cursor to {}", resetPos.get());
Set<String> messageSet = Sets.newHashSet();
for (int i = firstMessage; i < numOfMessages; i++) {
Message<byte[]> message = consumer.receive();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java
index 69139b5..afc8871 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java
@@ -22,6 +22,7 @@ import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
@@ -30,12 +31,16 @@ import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.*;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.client.impl.ReaderImpl;
import org.apache.pulsar.common.policies.data.TopicStats;
-import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.RelativeTimeUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -699,17 +704,33 @@ public class TopicReaderTest extends ProducerConsumerBase {
.enableBatching(batching)
.create();
- MessageId resetPos = null;
+ CountDownLatch latch = new CountDownLatch(numOfMessages);
+
+ final AtomicReference<MessageId> resetPos = new AtomicReference<>();
+
for (int i = 0; i < numOfMessages; i++) {
- MessageId msgId = producer.send(String.format("msg num %d", i).getBytes());
- if (resetIndex == i) {
- resetPos = msgId;
- }
+
+ final int j = i;
+
+ producer.sendAsync(String.format("msg num %d", i).getBytes())
+ .thenCompose(messageId -> FutureUtils.value(Pair.of(j, messageId)))
+ .whenComplete((p, e) -> {
+ if (e != null) {
+ fail("send msg failed due to " + e.getMessage());
+ } else {
+ if (p.getLeft() == resetIndex) {
+ resetPos.set(p.getRight());
+ }
+ }
+ latch.countDown();
+ });
}
+ latch.await();
+
ReaderBuilder<byte[]> readerBuilder = pulsarClient.newReader()
.topic(topicName)
- .startMessageId(resetPos);
+ .startMessageId(resetPos.get());
if (startInclusive) {
readerBuilder.startMessageIdInclusive();
@@ -761,4 +782,43 @@ public class TopicReaderTest extends ProducerConsumerBase {
producers.get(i).close();
}
}
+
+ @Test
+ public void testReaderStartInMiddleOfBatch() throws Exception {
+ final String topicName = "persistent://my-property/my-ns/ReaderStartInMiddleOfBatch";
+ final int numOfMessage = 100;
+
+ Producer<byte[]> producer = pulsarClient.newProducer()
+ .topic(topicName)
+ .enableBatching(true)
+ .batchingMaxMessages(10)
+ .create();
+
+ CountDownLatch latch = new CountDownLatch(100);
+
+ List<MessageId> allIds = Collections.synchronizedList(new ArrayList<>());
+
+ for (int i = 0; i < numOfMessage; i++) {
+ producer.sendAsync(String.format("msg num %d", i).getBytes()).whenComplete((mid, e) -> {
+ if (e != null) {
+ fail();
+ } else {
+ allIds.add(mid);
+ }
+ latch.countDown();
+ });
+ }
+
+ latch.await();
+
+ for (MessageId id : allIds) {
+ Reader<byte[]> reader = pulsarClient.newReader().topic(topicName)
+ .startMessageId(id).startMessageIdInclusive().create();
+ MessageId idGot = reader.readNext().getMessageId();
+ assertEquals(idGot, id);
+ reader.close();
+ }
+
+ producer.close();
+ }
}
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/BatchReceivePolicy.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/BatchReceivePolicy.java
index 2b9be71..7a5a1bd 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/BatchReceivePolicy.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/BatchReceivePolicy.java
@@ -46,7 +46,7 @@ public class BatchReceivePolicy {
/**
* Default batch receive policy.
*
- * <p>Max number of messages: 100
+ * <p>Max number of messages: no limit
* Max number of bytes: 10MB
* Timeout: 100ms<p/>
*/
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index 86e7fb2..0f5219b 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -464,7 +464,7 @@ public abstract class ConsumerBase<T> extends HandlerState implements TimerTask,
}
protected boolean hasEnoughMessagesForBatchReceive() {
- if (batchReceivePolicy.getMaxNumMessages() <= 0 && batchReceivePolicy.getMaxNumMessages() <= 0) {
+ if (batchReceivePolicy.getMaxNumMessages() <= 0 && batchReceivePolicy.getMaxNumBytes() <= 0) {
return false;
}
return (batchReceivePolicy.getMaxNumMessages() > 0 && incomingMessages.size() >= batchReceivePolicy.getMaxNumMessages())
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 08049a7..9431496 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -45,6 +45,7 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReadWriteLock;
@@ -123,6 +124,9 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
private final SubscriptionMode subscriptionMode;
private volatile BatchMessageIdImpl startMessageId;
+ private volatile BatchMessageIdImpl seekMessageId;
+ private final AtomicBoolean duringSeek;
+
private final BatchMessageIdImpl initialStartMessageId;
private final long startMessageRollbackDurationInSec;
@@ -205,6 +209,8 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
stats = ConsumerStatsDisabled.INSTANCE;
}
+ duringSeek = new AtomicBoolean(false);
+
if (conf.getAckTimeoutMillis() != 0) {
if (conf.getTickDurationMillis() > 0) {
this.unAckedMessageTracker = new UnAckedMessageTracker(client, this, conf.getAckTimeoutMillis(),
@@ -667,6 +673,13 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
List<Message<?>> currentMessageQueue = new ArrayList<>(incomingMessages.size());
incomingMessages.drainTo(currentMessageQueue);
INCOMING_MESSAGES_SIZE_UPDATER.set(this, 0);
+
+ if (duringSeek.compareAndSet(true, false)) {
+ return seekMessageId;
+ } else if (subscriptionMode == SubscriptionMode.Durable) {
+ return null;
+ }
+
if (!currentMessageQueue.isEmpty()) {
MessageIdImpl nextMessageInQueue = (MessageIdImpl) currentMessageQueue.get(0).getMessageId();
BatchMessageIdImpl previousMessage;
@@ -867,7 +880,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
// and return undecrypted payload
if (isMessageUndecryptable || (numMessages == 1 && !msgMetadata.hasNumMessagesInBatch())) {
- if (isResetIncludedAndSameEntryLedger(messageId) && isPriorEntryIndex(messageId.getEntryId())) {
+ if (isSameEntry(messageId) && isPriorEntryIndex(messageId.getEntryId())) {
// We need to discard entries that were prior to startMessageId
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Ignoring message from before the startMessageId: {}", subscription,
@@ -1018,7 +1031,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
ByteBuf singleMessagePayload = Commands.deSerializeSingleMessageInBatch(uncompressedPayload,
singleMessageMetadataBuilder, i, batchSize);
- if (isResetIncludedAndSameEntryLedger(messageId) && isPriorBatchIndex(i)) {
+ if (isSameEntry(messageId) && isPriorBatchIndex(i)) {
// If we are receiving a batch message, we need to discard messages that were prior
// to the startMessageId
if (log.isDebugEnabled()) {
@@ -1091,8 +1104,8 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
return resetIncludeHead ? idx < startMessageId.getBatchIndex() : idx <= startMessageId.getBatchIndex();
}
- private boolean isResetIncludedAndSameEntryLedger(MessageIdData messageId) {
- return !resetIncludeHead && startMessageId != null
+ private boolean isSameEntry(MessageIdData messageId) {
+ return startMessageId != null
&& messageId.getLedgerId() == startMessageId.getLedgerId()
&& messageId.getEntryId() == startMessageId.getEntryId();
}
@@ -1477,7 +1490,10 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
cnx.sendRequestWithId(seek, requestId).thenRun(() -> {
log.info("[{}][{}] Successfully reset subscription to publish time {}", topic, subscription, timestamp);
acknowledgmentsGroupingTracker.flushAndClean();
- lastDequeuedMessage = MessageId.earliest;
+
+ seekMessageId = new BatchMessageIdImpl((MessageIdImpl) MessageId.earliest);
+ duringSeek.set(true);
+
incomingMessages.clear();
INCOMING_MESSAGES_SIZE_UPDATER.set(this, 0);
seekFuture.complete(null);
@@ -1520,7 +1536,10 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
cnx.sendRequestWithId(seek, requestId).thenRun(() -> {
log.info("[{}][{}] Successfully reset subscription to message id {}", topic, subscription, messageId);
acknowledgmentsGroupingTracker.flushAndClean();
- lastDequeuedMessage = messageId;
+
+ seekMessageId = new BatchMessageIdImpl((MessageIdImpl) messageId);
+ duringSeek.set(true);
+
incomingMessages.clear();
INCOMING_MESSAGES_SIZE_UPDATER.set(this, 0);
seekFuture.complete(null);