You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rx...@apache.org on 2020/07/29 10:41:30 UTC
[pulsar] 21/25: Fix batch index filter issue in Consumer. (#7654)
This is an automated email from the ASF dual-hosted git repository.
rxl pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 0be16ea566084706ae34b388e7c1d5b0fcca9e4d
Author: lipenghui <pe...@apache.org>
AuthorDate: Mon Jul 27 21:12:10 2020 +0800
Fix batch index filter issue in Consumer. (#7654)
### Motivation
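Fix the batch index filter issue in Consumer. The previous logic at https://github.com/apache/pulsar/blob/master/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L1344 is wrong: the condition should be the opposite, i.e. a message in the batch must be skipped when its bit in the ack set is not set, rather than when it is set.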
(cherry picked from commit e9a0fd1e9415b1d19e877e315261f923a86fe073)
---
.../client/impl/BatchMessageIndexAckTest.java | 23 +++++++++++++++++-----
.../apache/pulsar/client/impl/ConsumerImpl.java | 10 +++++++++-
2 files changed, 27 insertions(+), 6 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BatchMessageIndexAckTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BatchMessageIndexAckTest.java
index 582d461..8f76561 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BatchMessageIndexAckTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BatchMessageIndexAckTest.java
@@ -84,22 +84,35 @@ public class BatchMessageIndexAckTest extends ProducerConsumerBase {
}
FutureUtil.waitForAll(futures).get();
+ List<MessageId> acked = new ArrayList<>(50);
for (int i = 0; i < messages; i++) {
+ Message<Integer> msg = consumer.receive();
if (i % 2 == 0) {
- consumer.acknowledge(consumer.receive());
+ consumer.acknowledge(msg);
+ acked.add(msg.getMessageId());
} else {
consumer.negativeAcknowledge(consumer.receive());
}
}
- List<Message<Integer>> received = new ArrayList<>(50);
+ List<MessageId> received = new ArrayList<>(50);
for (int i = 0; i < 50; i++) {
- received.add(consumer.receive());
+ received.add(consumer.receive().getMessageId());
}
Assert.assertEquals(received.size(), 50);
+ acked.retainAll(received);
+ Assert.assertEquals(acked.size(), 0);
- Message<Integer> moreMessage = consumer.receive(1, TimeUnit.SECONDS);
+ for (MessageId messageId : received) {
+ consumer.acknowledge(messageId);
+ }
+
+ Thread.sleep(1000);
+
+ consumer.redeliverUnacknowledgedMessages();
+
+ Message<Integer> moreMessage = consumer.receive(2, TimeUnit.SECONDS);
Assert.assertNull(moreMessage);
futures.clear();
@@ -109,7 +122,7 @@ public class BatchMessageIndexAckTest extends ProducerConsumerBase {
FutureUtil.waitForAll(futures).get();
for (int i = 0; i < 50; i++) {
- received.add(consumer.receive());
+ received.add(consumer.receive().getMessageId());
}
// Ensure the flow permit is work well since the client skip the acked batch index,
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 28995ad..f9546e9 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -101,6 +101,7 @@ import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.SafeCollectionUtils;
+import org.apache.pulsar.common.util.collections.BitSetRecyclable;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue;
import org.slf4j.Logger;
@@ -1304,6 +1305,10 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
possibleToDeadLetter = new ArrayList<>();
}
int skippedMessages = 0;
+ BitSetRecyclable ackBitSet = null;
+ if (ackSet != null && ackSet.size() > 0) {
+ ackBitSet = BitSetRecyclable.valueOf(SafeCollectionUtils.longListToArray(ackSet));
+ }
try {
for (int i = 0; i < batchSize; ++i) {
if (log.isDebugEnabled()) {
@@ -1337,7 +1342,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
continue;
}
- if (ackSet != null && BitSet.valueOf(SafeCollectionUtils.longListToArray(ackSet)).get(i)) {
+ if (ackBitSet != null && !ackBitSet.get(i)) {
singleMessagePayload.release();
singleMessageMetadataBuilder.recycle();
++skippedMessages;
@@ -1367,6 +1372,9 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
singleMessagePayload.release();
singleMessageMetadataBuilder.recycle();
}
+ if (ackBitSet != null) {
+ ackBitSet.recycle();
+ }
} catch (IOException e) {
log.warn("[{}] [{}] unable to obtain message in batch", subscription, consumerName);
discardCorruptedMessage(messageId, cnx, ValidationError.BatchDeSerializeError);