You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by xy...@apache.org on 2022/10/06 13:05:19 UTC
[pulsar] branch master updated: [refactor][java] Unify the acknowledge process for batch and non-batch message IDs (#17833)
This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 83309edab53 [refactor][java] Unify the acknowledge process for batch and non-batch message IDs (#17833)
83309edab53 is described below
commit 83309edab53e090c4126dca8a46b6a5499a3b257
Author: Yunze Xu <xy...@163.com>
AuthorDate: Thu Oct 6 21:05:10 2022 +0800
[refactor][java] Unify the acknowledge process for batch and non-batch message IDs (#17833)
---
.../client/impl/ConsumerAckResponseTest.java | 100 --------
.../apache/pulsar/client/impl/ConsumerAckTest.java | 256 +++++++++++++++++++++
.../pulsar/client/impl/BatchMessageIdImpl.java | 6 +
.../PersistentAcknowledgmentsGroupingTracker.java | 139 +++++------
4 files changed, 333 insertions(+), 168 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerAckResponseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerAckResponseTest.java
deleted file mode 100644
index f86bbabdd88..00000000000
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerAckResponseTest.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.client.impl;
-
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.doNothing;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.testng.Assert.fail;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import lombok.Cleanup;
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.ProducerConsumerBase;
-import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.api.Schema;
-import org.apache.pulsar.client.api.SubscriptionType;
-import org.apache.pulsar.client.impl.transaction.TransactionImpl;
-import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
-
-@Test(groups = "broker-impl")
-public class ConsumerAckResponseTest extends ProducerConsumerBase {
-
- private TransactionImpl transaction;
-
- @BeforeClass(alwaysRun = true)
- public void setup() throws Exception {
- super.internalSetup();
- super.producerBaseSetup();
- transaction = mock(TransactionImpl.class);
- doReturn(1L).when(transaction).getTxnIdLeastBits();
- doReturn(1L).when(transaction).getTxnIdMostBits();
- doReturn(TransactionImpl.State.OPEN).when(transaction).getState();
- CompletableFuture<Void> completableFuture = CompletableFuture.completedFuture(null);
- doNothing().when(transaction).registerAckOp(any());
- doReturn(true).when(transaction).checkIfOpen(any());
- doReturn(completableFuture).when(transaction).registerAckedTopic(any(), any());
-
- Thread.sleep(1000 * 3);
- }
-
- @AfterClass(alwaysRun = true)
- public void cleanup() throws Exception {
- super.internalCleanup();
- }
-
- @Test
- public void testAckResponse() throws PulsarClientException, InterruptedException {
- String topic = "testAckResponse";
- @Cleanup
- Producer<Integer> producer = pulsarClient.newProducer(Schema.INT32)
- .topic(topic)
- .enableBatching(false)
- .create();
- @Cleanup
- ConsumerImpl<Integer> consumer = (ConsumerImpl<Integer>) pulsarClient.newConsumer(Schema.INT32)
- .topic(topic)
- .subscriptionName("sub")
- .subscriptionType(SubscriptionType.Shared)
- .ackTimeout(1, TimeUnit.SECONDS)
- .subscribe();
- producer.send(1);
- producer.send(2);
- try {
- consumer.acknowledgeAsync(new MessageIdImpl(1, 1, 1), transaction).get();
- fail();
- } catch (ExecutionException e) {
- Assert.assertTrue(e.getCause() instanceof PulsarClientException.NotAllowedException);
- }
- Message<Integer> message = consumer.receive();
-
- try {
- consumer.acknowledgeAsync(message.getMessageId(), transaction).get();
- fail();
- } catch (ExecutionException e) {
- Assert.assertTrue(e.getCause() instanceof PulsarClientException.NotAllowedException);
- }
- }
-}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerAckTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerAckTest.java
new file mode 100644
index 00000000000..ea2815641f1
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerAckTest.java
@@ -0,0 +1,256 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import lombok.AllArgsConstructor;
+import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerInterceptor;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.transaction.TransactionImpl;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+import org.testng.collections.Sets;
+
+@Slf4j
+@Test(groups = "broker-impl")
+public class ConsumerAckTest extends ProducerConsumerBase {
+
+ private TransactionImpl transaction;
+ private PulsarClient clientWithStats;
+
+ /**
+ * Starts the broker test harness, creates the extra client used by the ack statistics tests and prepares a mocked
+ * transaction that reports itself as open.
+ *
+ * <p>NOTE(review): the second argument of {@code newPulsarClient} (30) is presumably the stats interval in
+ * seconds - confirm against the helper's signature.
+ */
+ @BeforeClass(alwaysRun = true)
+ public void setup() throws Exception {
+ super.internalSetup();
+ super.producerBaseSetup();
+ clientWithStats = newPulsarClient(lookupUrl.toString(), 30);
+
+ // Stub the mock completely before publishing it through the field.
+ final TransactionImpl txnMock = mock(TransactionImpl.class);
+ doReturn(1L).when(txnMock).getTxnIdMostBits();
+ doReturn(1L).when(txnMock).getTxnIdLeastBits();
+ doReturn(TransactionImpl.State.OPEN).when(txnMock).getState();
+ doReturn(true).when(txnMock).checkIfOpen(any());
+ doNothing().when(txnMock).registerAckOp(any());
+ doReturn(CompletableFuture.<Void>completedFuture(null)).when(txnMock).registerAckedTopic(any(), any());
+ transaction = txnMock;
+ }
+
+ /**
+ * Closes the extra client and then tears down the broker test harness.
+ *
+ * <p>This method runs even when {@link #setup()} failed ({@code alwaysRun = true}), so the client may still be
+ * {@code null}; the harness cleanup is placed in a {@code finally} block so that a failure while closing the
+ * client cannot skip it.
+ *
+ * @throws Exception if closing the client or cleaning up the harness fails
+ */
+ @AfterClass(alwaysRun = true)
+ public void cleanup() throws Exception {
+ try {
+ if (this.clientWithStats != null) {
+ this.clientWithStats.close();
+ }
+ } finally {
+ super.internalCleanup();
+ }
+ }
+
+ /**
+ * Acknowledging with the mocked transaction must fail with {@code NotAllowedException}, both for a hand-made
+ * message id and for the id of a message that was actually received.
+ */
+ @Test
+ public void testAckResponse() throws PulsarClientException, InterruptedException {
+ final String topicName = "testAckResponse";
+ @Cleanup
+ Producer<Integer> intProducer = pulsarClient.newProducer(Schema.INT32)
+ .topic(topicName)
+ .enableBatching(false)
+ .create();
+ @Cleanup
+ ConsumerImpl<Integer> sharedConsumer = (ConsumerImpl<Integer>) pulsarClient.newConsumer(Schema.INT32)
+ .topic(topicName)
+ .subscriptionName("sub")
+ .subscriptionType(SubscriptionType.Shared)
+ .ackTimeout(1, TimeUnit.SECONDS)
+ .subscribe();
+ intProducer.send(1);
+ intProducer.send(2);
+
+ // Case 1: a message id constructed by hand.
+ CompletableFuture<Void> handMadeIdAck = sharedConsumer.acknowledgeAsync(new MessageIdImpl(1, 1, 1), transaction);
+ try {
+ handMadeIdAck.get();
+ fail();
+ } catch (ExecutionException ex) {
+ assertTrue(ex.getCause() instanceof PulsarClientException.NotAllowedException);
+ }
+
+ // Case 2: the id of a message delivered to this consumer.
+ Message<Integer> received = sharedConsumer.receive();
+ CompletableFuture<Void> receivedIdAck = sharedConsumer.acknowledgeAsync(received.getMessageId(), transaction);
+ try {
+ receivedIdAck.get();
+ fail();
+ } catch (ExecutionException ex) {
+ assertTrue(ex.getCause() instanceof PulsarClientException.NotAllowedException);
+ }
+ }
+
+ /**
+ * Acknowledges every received message id one by one, then checks the interceptor's record, the sent-ack
+ * counter and the un-acked message tracker.
+ */
+ @Test
+ public void testIndividualAck() throws Exception {
+ @Cleanup AckTestData data = prepareDataForAck("test-individual-ack");
+ final ConsumerImpl<String> consumer = data.consumer;
+ final int total = data.size();
+ for (int i = 0; i < total; i++) {
+ consumer.acknowledge(data.messageIds.get(i));
+ }
+ // The interceptor must have seen exactly the received ids, in the same order.
+ assertEquals(data.interceptor.individualAckedMessageIdList, data.messageIds);
+ assertEquals(consumer.getStats().getNumAcksSent(), total);
+ assertTrue(consumer.getUnAckedMessageTracker().isEmpty());
+ }
+
+ /**
+ * Acknowledges all received message ids with a single list acknowledgment, then checks the interceptor's
+ * record, the sent-ack counter and the un-acked message tracker.
+ */
+ @Test
+ public void testIndividualAckList() throws Exception {
+ @Cleanup AckTestData data = prepareDataForAck("test-individual-ack-list");
+ final ConsumerImpl<String> consumer = data.consumer;
+ final List<MessageId> allIds = data.messageIds;
+ consumer.acknowledge(allIds);
+ assertEquals(data.interceptor.individualAckedMessageIdList, allIds);
+ assertEquals(consumer.getStats().getNumAcksSent(), data.size());
+ assertTrue(consumer.getUnAckedMessageTracker().isEmpty());
+ }
+
+ /**
+ * Cumulatively acknowledges the last received message id, then checks that the interceptor recorded that id
+ * first, that the sent-ack counter is 2 and that the un-acked message tracker is empty.
+ *
+ * <p>NOTE(review): the expected counter value of 2 presumably corresponds to the two entries produced by
+ * {@code prepareDataForAck} (one non-batched message and one batch) - confirm.
+ */
+ @Test
+ public void testCumulativeAck() throws Exception {
+ @Cleanup AckTestData data = prepareDataForAck("test-cumulative-ack");
+ // Resolve the last id once so the acknowledged id and the asserted id cannot drift apart.
+ final MessageId lastMessageId = data.messageIds.get(data.size() - 1);
+ data.consumer.acknowledgeCumulative(lastMessageId);
+ assertEquals(data.interceptor.cumulativeAckedMessageIdList.get(0), lastMessageId);
+ assertEquals(data.consumer.getStats().getNumAcksSent(), 2);
+ assertTrue(data.consumer.getUnAckedMessageTracker().isEmpty());
+ }
+
+ /**
+ * Subscribes a consumer to {@code topic}, sends 1 non-batched message followed by N-1 messages through a batching
+ * producer (N = 10), receives all N messages and verifies the initial state before any acknowledgment.
+ *
+ * <p>Before returning it asserts that the received ids at index 1 to N-1 map to the same entry, that the
+ * interceptor has recorded no acknowledgment, that the sent-ack counter is 0 and that the un-acked message
+ * tracker holds exactly the two entry-level message ids.
+ *
+ * @param topic the topic to produce to and consume from
+ * @return the consumer, its interceptor and the received message ids; the caller must close the returned object
+ * @throws PulsarClientException if creating the producers or the consumer, sending or receiving fails
+ */
+ private AckTestData prepareDataForAck(String topic) throws PulsarClientException {
+ final int numMessages = 10;
+ // The batch size limit equals the number of async sends below, so they are expected to form a single batch.
+ // Both producers are closed by @Cleanup when this method exits.
+ @Cleanup Producer<String> batchProducer = pulsarClient.newProducer(Schema.STRING)
+ .topic(topic)
+ .enableBatching(true)
+ .batchingMaxMessages(numMessages - 1)
+ .batchingMaxPublishDelay(1, TimeUnit.SECONDS)
+ .create();
+ @Cleanup Producer<String> nonBatchProducer = pulsarClient.newProducer(Schema.STRING)
+ .topic(topic)
+ .enableBatching(false)
+ .create();
+ AckStatsInterceptor interceptor = new AckStatsInterceptor();
+ // The consumer is created from clientWithStats (not pulsarClient) and is NOT closed here: ownership passes to
+ // the returned AckTestData.
+ ConsumerImpl<String> consumer = (ConsumerImpl<String>) clientWithStats.newConsumer(Schema.STRING).topic(topic)
+ .subscriptionName("sub").intercept(interceptor).ackTimeout(10, TimeUnit.SECONDS).subscribe();
+
+ // The non-batched message is sent synchronously first; the remaining N-1 messages are sent asynchronously.
+ nonBatchProducer.send("msg-0");
+ for (int i = 1; i < numMessages; i++) {
+ batchProducer.sendAsync("msg-" + i);
+ }
+ List<MessageId> messageIds = new ArrayList<>();
+ for (int i = 0; i < numMessages; i++) {
+ // Each receive waits at most 3 seconds; a null result means the message did not arrive in time.
+ Message<String> message = consumer.receive(3, TimeUnit.SECONDS);
+ assertNotNull(message);
+ messageIds.add(message.getMessageId());
+ }
+ MessageId firstEntryMessageId = messageIds.get(0);
+ // The cast also checks that the second received id is a batch message id.
+ MessageId secondEntryMessageId = ((BatchMessageIdImpl) messageIds.get(1)).toMessageIdImpl();
+ // Verify that the ids at index 2 to N-1 belong to the same entry as the id at index 1
+ for (int i = 2; i < messageIds.size(); i++) {
+ assertEquals(((BatchMessageIdImpl) messageIds.get(i)).toMessageIdImpl(), secondEntryMessageId);
+ }
+
+ // Nothing has been acknowledged yet, so the interceptor lists and the ack counter must be untouched.
+ assertTrue(interceptor.individualAckedMessageIdList.isEmpty());
+ assertTrue(interceptor.cumulativeAckedMessageIdList.isEmpty());
+ assertEquals(consumer.getStats().getNumAcksSent(), 0);
+ // The un-acked tracker is expected to be keyed by entry-level ids: one per entry, not one per message.
+ assertNotNull(consumer.getUnAckedMessageTracker().messageIdPartitionMap);
+ assertEquals(consumer.getUnAckedMessageTracker().messageIdPartitionMap.keySet(),
+ Sets.newHashSet(firstEntryMessageId, secondEntryMessageId));
+ return new AckTestData(consumer, interceptor, messageIds);
+ }
+
+ /**
+ * Fixture produced by {@code prepareDataForAck}: the subscribed consumer, its ack-recording interceptor and the
+ * ids of the 10 received messages (the 1st message is non-batched, the other messages are in the same batch).
+ *
+ * <p>The field order defines the parameter order of the Lombok-generated constructor.
+ */
+ @AllArgsConstructor
+ private static class AckTestData implements Closeable {
+
+ private final ConsumerImpl<String> consumer;
+ private final AckStatsInterceptor interceptor;
+ private final List<MessageId> messageIds;
+
+ /** Returns the number of message ids held by this fixture. */
+ public int size() {
+ final List<MessageId> ids = this.messageIds;
+ return ids.size();
+ }
+
+ /** Closes the interceptor first and the consumer afterwards. */
+ @Override
+ public void close() throws IOException {
+ this.interceptor.close();
+ this.consumer.close();
+ }
+ }
+
+ /**
+ * Consumer interceptor that records, in callback order, every message id whose individual or cumulative
+ * acknowledgment was reported without an exception. Failed acknowledgments are logged together with their
+ * cause and are not recorded.
+ */
+ private static class AckStatsInterceptor implements ConsumerInterceptor<String> {
+
+ // CopyOnWriteArrayList allows the test thread to read the lists while callbacks append to them.
+ private final List<MessageId> individualAckedMessageIdList = new CopyOnWriteArrayList<>();
+ private final List<MessageId> cumulativeAckedMessageIdList = new CopyOnWriteArrayList<>();
+
+ @Override
+ public void close() {
+ // No ops
+ }
+
+ /** Passes the message through unchanged. */
+ @Override
+ public Message<String> beforeConsume(Consumer<String> consumer, Message<String> message) {
+ return message;
+ }
+
+ /**
+ * Records {@code messageId} as individually acknowledged unless {@code exception} is set.
+ *
+ * @param consumer the consumer that acknowledged the message
+ * @param messageId the acknowledged message id
+ * @param exception the failure, or {@code null} when the acknowledgment succeeded
+ */
+ @Override
+ public void onAcknowledge(Consumer<String> consumer, MessageId messageId, Throwable exception) {
+ if (exception != null) {
+ // The throwable is passed as the last argument so that the cause and stack trace are logged.
+ log.error("[{}] Failed to acknowledge {}", consumer.getConsumerName(), messageId, exception);
+ return;
+ }
+ individualAckedMessageIdList.add(messageId);
+ }
+
+ /**
+ * Records {@code messageId} as cumulatively acknowledged unless {@code exception} is set.
+ *
+ * @param consumer the consumer that acknowledged the message
+ * @param messageId the cumulatively acknowledged message id
+ * @param exception the failure, or {@code null} when the acknowledgment succeeded
+ */
+ @Override
+ public void onAcknowledgeCumulative(Consumer<String> consumer, MessageId messageId, Throwable exception) {
+ if (exception != null) {
+ // The throwable is passed as the last argument so that the cause and stack trace are logged.
+ log.error("[{}] Failed to acknowledge {}", consumer.getConsumerName(), messageId, exception);
+ return;
+ }
+ cumulativeAckedMessageIdList.add(messageId);
+ }
+
+ @Override
+ public void onNegativeAcksSend(Consumer<String> consumer, Set<MessageId> messageIds) {
+ // No ops
+ }
+
+ @Override
+ public void onAckTimeoutSend(Consumer<String> consumer, Set<MessageId> messageIds) {
+ // No ops
+ }
+ }
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageIdImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageIdImpl.java
index 104d36b4b2f..ee25d504cf9 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageIdImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageIdImpl.java
@@ -147,6 +147,12 @@ public class BatchMessageIdImpl extends MessageIdImpl {
ledgerId, entryId - 1, partitionIndex);
}
+ /**
+ * Converts this batch message id into a plain {@link MessageIdImpl} built from the same ledger id, entry id and
+ * partition index; the result carries no batch information.
+ *
+ * <p>{@link MessageIdImpl} is widely used as the key of hash maps. A batch message id should be converted with
+ * this method before being used as such a key so that it has the correct hash code.
+ *
+ * @return a new {@link MessageIdImpl} identifying the entry of this batch message id
+ */
+ public MessageIdImpl toMessageIdImpl() {
+ return new MessageIdImpl(ledgerId, entryId, partitionIndex);
+ }
+
public BatchMessageAcker getAcker() {
return acker;
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
index ae1ac5f7649..f7f43076466 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
@@ -35,6 +35,8 @@ import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Function;
+import javax.annotation.Nullable;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.Triple;
@@ -162,18 +164,20 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments
private void addListAcknowledgment(List<MessageId> messageIds) {
for (MessageId messageId : messageIds) {
- consumer.onAcknowledge(messageId, null);
if (messageId instanceof BatchMessageIdImpl) {
BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) messageId;
- if (!batchMessageId.ackIndividual()) {
- doIndividualBatchAckAsync((BatchMessageIdImpl) messageId);
- } else {
- messageId = modifyBatchMessageIdAndStatesInConsumer(batchMessageId);
- doIndividualAckAsync((MessageIdImpl) messageId);
- }
+ addIndividualAcknowledgment(batchMessageId.toMessageIdImpl(),
+ batchMessageId,
+ this::doIndividualAckAsync,
+ this::doIndividualBatchAckAsync);
+ } else if (messageId instanceof MessageIdImpl) {
+ addIndividualAcknowledgment((MessageIdImpl) messageId,
+ null,
+ this::doIndividualAckAsync,
+ this::doIndividualBatchAckAsync);
} else {
- modifyMessageIdStatesInConsumer((MessageIdImpl) messageId);
- doIndividualAckAsync((MessageIdImpl) messageId);
+ throw new IllegalStateException("Unsupported message id type in addListAcknowledgement: "
+ + messageId.getClass().getCanonicalName());
}
}
}
@@ -183,67 +187,65 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments
Map<String, Long> properties) {
if (msgId instanceof BatchMessageIdImpl) {
BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) msgId;
- if (ackType == AckType.Individual) {
- consumer.onAcknowledge(msgId, null);
- // ack this ack carry bitSet index and judge bit set are all ack
- if (batchMessageId.ackIndividual()) {
- MessageIdImpl messageId = modifyBatchMessageIdAndStatesInConsumer(batchMessageId);
- return doIndividualAck(messageId, properties);
- } else if (batchIndexAckEnabled){
- return doIndividualBatchAck(batchMessageId, properties);
- } else {
- // if we prevent batchIndexAck, we can't send the ack command to broker when the batch message are
- // all ack complete
- return CompletableFuture.completedFuture(null);
- }
- } else {
- consumer.onAcknowledgeCumulative(msgId, null);
- if (batchMessageId.ackCumulative()) {
- return doCumulativeAck(msgId, properties, null);
- } else {
- if (batchIndexAckEnabled) {
- return doCumulativeBatchIndexAck(batchMessageId, properties);
- } else {
- // ack the pre messageId, because we prevent the batchIndexAck, we can ensure pre messageId can
- // ack
- if (AckType.Cumulative == ackType
- && !batchMessageId.getAcker().isPrevBatchCumulativelyAcked()) {
- doCumulativeAck(batchMessageId.prevBatchMessageId(), properties, null);
- batchMessageId.getAcker().setPrevBatchCumulativelyAcked(true);
- }
- return CompletableFuture.completedFuture(null);
- }
- }
- }
+ return addAcknowledgment(batchMessageId.toMessageIdImpl(), ackType, properties, batchMessageId);
} else {
- if (ackType == AckType.Individual) {
- consumer.onAcknowledge(msgId, null);
- modifyMessageIdStatesInConsumer(msgId);
- return doIndividualAck(msgId, properties);
- } else {
- consumer.onAcknowledgeCumulative(msgId, null);
- return doCumulativeAck(msgId, properties, null);
- }
+ return addAcknowledgment(msgId, ackType, properties, null);
}
}
- private MessageIdImpl modifyBatchMessageIdAndStatesInConsumer(BatchMessageIdImpl batchMessageId) {
- MessageIdImpl messageId = new MessageIdImpl(batchMessageId.getLedgerId(),
- batchMessageId.getEntryId(), batchMessageId.getPartitionIndex());
- consumer.getStats().incrementNumAcksSent(batchMessageId.getBatchSize());
- clearMessageIdFromUnAckTrackerAndDeadLetter(messageId);
- return messageId;
- }
-
- private void modifyMessageIdStatesInConsumer(MessageIdImpl messageId) {
- consumer.getStats().incrementNumAcksSent(1);
- clearMessageIdFromUnAckTrackerAndDeadLetter(messageId);
+ private CompletableFuture<Void> addIndividualAcknowledgment(
+ MessageIdImpl msgId,
+ @Nullable BatchMessageIdImpl batchMessageId,
+ Function<MessageIdImpl, CompletableFuture<Void>> individualAckFunction,
+ Function<BatchMessageIdImpl, CompletableFuture<Void>> batchAckFunction) {
+ if (batchMessageId != null) {
+ consumer.onAcknowledge(batchMessageId, null);
+ } else {
+ consumer.onAcknowledge(msgId, null);
+ }
+ if (batchMessageId == null || batchMessageId.ackIndividual()) {
+ consumer.getStats().incrementNumAcksSent((batchMessageId != null) ? batchMessageId.getBatchSize() : 1);
+ consumer.getUnAckedMessageTracker().remove(msgId);
+ if (consumer.getPossibleSendToDeadLetterTopicMessages() != null) {
+ consumer.getPossibleSendToDeadLetterTopicMessages().remove(msgId);
+ }
+ return individualAckFunction.apply(msgId);
+ } else if (batchIndexAckEnabled) {
+ return batchAckFunction.apply(batchMessageId);
+ } else {
+ return CompletableFuture.completedFuture(null);
+ }
}
- private void clearMessageIdFromUnAckTrackerAndDeadLetter(MessageIdImpl messageId) {
- consumer.getUnAckedMessageTracker().remove(messageId);
- if (consumer.getPossibleSendToDeadLetterTopicMessages() != null) {
- consumer.getPossibleSendToDeadLetterTopicMessages().remove(messageId);
+ /**
+ * Dispatches one acknowledgment of the given type for either a plain message id or a message inside a batch.
+ *
+ * @param msgId the entry-level message id
+ * @param ackType whether the acknowledgment is individual or cumulative
+ * @param properties the properties passed on to the underlying ack methods
+ * @param batchMessageId the original batch message id, or {@code null} for a non-batched message
+ * @return the future of the ack that was issued, or a completed future when no ack is issued for {@code msgId}
+ * @throws IllegalStateException if {@code ackType} is neither {@code Individual} nor {@code Cumulative}
+ */
+ private CompletableFuture<Void> addAcknowledgment(MessageIdImpl msgId,
+ AckType ackType,
+ Map<String, Long> properties,
+ @Nullable BatchMessageIdImpl batchMessageId) {
+ switch (ackType) {
+ case Individual:
+ // Same flow as the list acknowledgment, but with the ack functions that take the properties.
+ return addIndividualAcknowledgment(msgId,
+ batchMessageId,
+ __ -> doIndividualAck(__, properties),
+ __ -> doIndividualBatchAck(__, properties));
+ case Cumulative:
+ // The callback receives the batch id when there is one, otherwise the plain id.
+ if (batchMessageId != null) {
+ consumer.onAcknowledgeCumulative(batchMessageId, null);
+ } else {
+ consumer.onAcknowledgeCumulative(msgId, null);
+ }
+ // ackCumulative() is only evaluated for batch ids; presumably it reports whether the whole batch is
+ // now acknowledged - TODO confirm.
+ if (batchMessageId == null || batchMessageId.ackCumulative()) {
+ return doCumulativeAck(msgId, properties, null);
+ } else if (batchIndexAckEnabled) {
+ return doCumulativeBatchIndexAck(batchMessageId, properties);
+ } else {
+ // Without batch index ack only the previous batch message id is acknowledged, at most once per
+ // acker; the future of that ack is not returned to the caller.
+ if (!batchMessageId.getAcker().isPrevBatchCumulativelyAcked()) {
+ doCumulativeAck(batchMessageId.prevBatchMessageId(), properties, null);
+ batchMessageId.getAcker().setPrevBatchCumulativelyAcked(true);
+ }
+ return CompletableFuture.completedFuture(null);
+ }
+ default:
+ throw new IllegalStateException("Unknown AckType: " + ackType);
}
}
@@ -278,9 +280,10 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments
}
- private void doIndividualAckAsync(MessageIdImpl messageId) {
+ private CompletableFuture<Void> doIndividualAckAsync(MessageIdImpl messageId) {
pendingIndividualAcks.add(messageId);
pendingIndividualBatchIndexAcks.remove(messageId);
+ return CompletableFuture.completedFuture(null);
}
private CompletableFuture<Void> doIndividualBatchAck(BatchMessageIdImpl batchMessageId,
@@ -343,10 +346,9 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments
}
}
- private void doIndividualBatchAckAsync(BatchMessageIdImpl batchMessageId) {
+ private CompletableFuture<Void> doIndividualBatchAckAsync(BatchMessageIdImpl batchMessageId) {
ConcurrentBitSetRecyclable bitSet = pendingIndividualBatchIndexAcks.computeIfAbsent(
- new MessageIdImpl(batchMessageId.getLedgerId(), batchMessageId.getEntryId(),
- batchMessageId.getPartitionIndex()), (v) -> {
+ batchMessageId.toMessageIdImpl(), __ -> {
ConcurrentBitSetRecyclable value;
if (batchMessageId.getAcker() != null
&& !(batchMessageId.getAcker() instanceof BatchMessageAckerDisabled)) {
@@ -358,6 +360,7 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments
return value;
});
bitSet.clear(batchMessageId.getBatchIndex());
+ return CompletableFuture.completedFuture(null);
}
private void doCumulativeAckAsync(MessageIdImpl msgId, BitSetRecyclable bitSet) {