You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/10/22 03:08:15 UTC
[pulsar] 02/04: [Issue 8260] Support reset cursor to a batch index of the batching message (#8285)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit ba3f21ce95f129fbdd2fd30595daf50700deb92b
Author: Renkai <ga...@gmail.com>
AuthorDate: Wed Oct 21 19:16:29 2020 +0800
[Issue 8260] Support reset cursor to a batch index of the batching message (#8285)
### Motivation
Make the reset-cursor (seek) command able to reset to a specific batch index within a batched message, rather than only to the start of the entry.
### Modifications
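The seek (reset-cursor) command now carries an ack set for the target message ID, and the broker applies that ack set to the cursor's batch index state when it resets the cursor.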
(cherry picked from commit a5b4146ca0f6f333741763ed3a1dda3495dccb65)
---
.../bookkeeper/mledger/impl/ManagedCursorImpl.java | 5 ++
.../apache/pulsar/broker/service/ServerCnx.java | 10 +++-
.../broker/service/SubscriptionSeekTest.java | 62 ++++++++++++++++++++++
.../pulsar/client/impl/BatchMessageAcker.java | 8 +++
.../apache/pulsar/client/impl/ConsumerImpl.java | 24 +++++++--
.../apache/pulsar/common/protocol/Commands.java | 9 ++--
6 files changed, 109 insertions(+), 9 deletions(-)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 26f1bf3..19a9e9a 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -967,6 +967,11 @@ public class ManagedCursorImpl implements ManagedCursor {
if (config.isDeletionAtBatchIndexLevelEnabled() && batchDeletedIndexes != null) {
batchDeletedIndexes.values().forEach(BitSetRecyclable::recycle);
batchDeletedIndexes.clear();
+ long[] resetWords = newPosition.ackSet;
+ if (resetWords != null) {
+ BitSetRecyclable ackSet = BitSetRecyclable.create().resetWords(resetWords);
+ batchDeletedIndexes.put(newPosition, ackSet);
+ }
}
PositionImpl oldReadPosition = readPosition;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 5d49f87..2e1df72 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -121,6 +121,8 @@ import org.apache.pulsar.common.protocol.schema.SchemaInfoUtil;
import org.apache.pulsar.common.protocol.schema.SchemaVersion;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.common.util.SafeCollectionUtils;
+import org.apache.pulsar.common.util.collections.BitSetRecyclable;
import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
import org.apache.pulsar.shaded.com.google.protobuf.v241.GeneratedMessageLite;
import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
@@ -1331,7 +1333,13 @@ public class ServerCnx extends PulsarHandler {
Subscription subscription = consumer.getSubscription();
MessageIdData msgIdData = seek.getMessageId();
- Position position = new PositionImpl(msgIdData.getLedgerId(), msgIdData.getEntryId());
+ long[] ackSet = null;
+ if (msgIdData.getAckSetCount() > 0) {
+ ackSet = SafeCollectionUtils.longListToArray(msgIdData.getAckSetList());
+ }
+
+ Position position = new PositionImpl(msgIdData.getLedgerId(),
+ msgIdData.getEntryId(), ackSet);
subscription.resetCursor(position).thenRun(() -> {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java
index 0f5986d..31a46cf 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java
@@ -21,23 +21,28 @@ package org.apache.pulsar.broker.service;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import com.google.common.collect.Lists;
+
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.naming.TopicName;
@@ -54,6 +59,7 @@ public class SubscriptionSeekTest extends BrokerTestBase {
@Override
protected void setup() throws Exception {
super.baseSetup();
+ conf.setAcknowledgmentAtBatchIndexLevelEnabled(true);
}
@AfterClass
@@ -117,6 +123,62 @@ public class SubscriptionSeekTest extends BrokerTestBase {
}
@Test
+ public void testSeekForBatch() throws Exception {
+ final String topicName = "persistent://prop/use/ns-abcd/testSeekForBatch";
+ String subscriptionName = "my-subscription-batch";
+
+ Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+ .enableBatching(true)
+ .batchingMaxMessages(3)
+ .topic(topicName).create();
+
+
+ List<MessageId> messageIds = new ArrayList<>();
+ List<CompletableFuture<MessageId>> futureMessageIds = new ArrayList<>();
+
+ List<String> messages = new ArrayList<>();
+ for (int i = 0; i < 10; i++) {
+ String message = "my-message-" + i;
+ messages.add(message);
+ CompletableFuture<MessageId> messageIdCompletableFuture = producer.sendAsync(message);
+ futureMessageIds.add(messageIdCompletableFuture);
+ }
+
+ for (CompletableFuture<MessageId> futureMessageId : futureMessageIds) {
+ MessageId messageId = futureMessageId.get();
+ messageIds.add(messageId);
+ }
+
+ producer.close();
+
+
+ org.apache.pulsar.client.api.Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+ .topic(topicName)
+ .subscriptionName(subscriptionName)
+ .startMessageIdInclusive()
+ .subscribe();
+
+ PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get();
+ assertNotNull(topicRef);
+
+ assertEquals(topicRef.getSubscriptions().size(), 1);
+
+ consumer.seek(MessageId.earliest);
+ Message<String> receiveBeforEarliest = consumer.receive();
+ assertEquals(receiveBeforEarliest.getValue(), messages.get(0));
+ consumer.seek(MessageId.latest);
+ Message<String> receiveAfterLatest = consumer.receive(1, TimeUnit.SECONDS);
+ assertNull(receiveAfterLatest);
+
+ for (MessageId messageId : messageIds) {
+ consumer.seek(messageId);
+ MessageId receiveId = consumer.receive().getMessageId();
+ assertEquals(receiveId, messageId);
+ }
+ }
+
+
+ @Test
public void testConcurrentResetCursor() throws Exception {
final String topicName = "persistent://prop/use/ns-abc/testConcurrentReset_" + System.currentTimeMillis();
final String subscriptionName = "test-sub-name";
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageAcker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageAcker.java
index d46d3b3..86dbfe1 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageAcker.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageAcker.java
@@ -77,4 +77,12 @@ class BatchMessageAcker {
return prevBatchCumulativelyAcked;
}
+ @Override
+ public String toString() {
+ return "BatchMessageAcker{" +
+ "batchSize=" + batchSize +
+ ", bitSet=" + bitSet +
+ ", prevBatchCumulativelyAcked=" + prevBatchCumulativelyAcked +
+ '}';
+ }
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index f9546e9..74fe79d 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -1823,15 +1823,29 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
if (!isConnected()) {
return FutureUtil.failedFuture(new PulsarClientException(
- String.format("The client is not connected to the broker when seeking the subscription %s of the " +
- "topic %s to the message %s", subscription, topicName.toString(), messageId.toString())));
+ String.format("The client is not connected to the broker when seeking the subscription %s of the " +
+ "topic %s to the message %s", subscription, topicName.toString(), messageId.toString())));
}
final CompletableFuture<Void> seekFuture = new CompletableFuture<>();
long requestId = client.newRequestId();
- MessageIdImpl msgId = (MessageIdImpl) messageId;
- ByteBuf seek = Commands.newSeek(consumerId, requestId, msgId.getLedgerId(), msgId.getEntryId());
+ ByteBuf seek = null;
+ if (messageId instanceof BatchMessageIdImpl) {
+ BatchMessageIdImpl msgId = (BatchMessageIdImpl) messageId;
+ // Initialize ack set
+ BitSetRecyclable ackSet = BitSetRecyclable.create();
+ ackSet.set(0, msgId.getBatchSize());
+ ackSet.clear(0, Math.max(msgId.getBatchIndex(), 0));
+ long[] ackSetArr = ackSet.toLongArray();
+ ackSet.recycle();
+
+ seek = Commands.newSeek(consumerId, requestId, msgId.getLedgerId(), msgId.getEntryId(), ackSetArr);
+ } else {
+ MessageIdImpl msgId = (MessageIdImpl) messageId;
+ seek = Commands.newSeek(consumerId, requestId, msgId.getLedgerId(), msgId.getEntryId(), new long[0]);
+ }
+
ClientCnx cnx = cnx();
log.info("[{}][{}] Seek subscription to message id {}", topic, subscription, messageId);
@@ -2259,4 +2273,4 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
private static final Logger log = LoggerFactory.getLogger(ConsumerImpl.class);
-}
\ No newline at end of file
+}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
index d867139..488460f 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
@@ -634,17 +634,18 @@ public class Commands {
public static ByteBuf newActiveConsumerChange(long consumerId, boolean isActive) {
CommandActiveConsumerChange.Builder changeBuilder = CommandActiveConsumerChange.newBuilder()
.setConsumerId(consumerId)
- .setIsActive(isActive);
+ .setIsActive(isActive);
CommandActiveConsumerChange change = changeBuilder.build();
ByteBuf res = serializeWithSize(
- BaseCommand.newBuilder().setType(Type.ACTIVE_CONSUMER_CHANGE).setActiveConsumerChange(change));
+ BaseCommand.newBuilder().setType(Type.ACTIVE_CONSUMER_CHANGE).setActiveConsumerChange(change));
changeBuilder.recycle();
change.recycle();
return res;
}
- public static ByteBuf newSeek(long consumerId, long requestId, long ledgerId, long entryId) {
+ public static ByteBuf newSeek(long consumerId, long requestId,
+ long ledgerId, long entryId, long[] ackSet) {
CommandSeek.Builder seekBuilder = CommandSeek.newBuilder();
seekBuilder.setConsumerId(consumerId);
seekBuilder.setRequestId(requestId);
@@ -652,6 +653,8 @@ public class Commands {
MessageIdData.Builder messageIdBuilder = MessageIdData.newBuilder();
messageIdBuilder.setLedgerId(ledgerId);
messageIdBuilder.setEntryId(entryId);
+ messageIdBuilder.addAllAckSet(SafeCollectionUtils.longArrayToList(ackSet));
+
MessageIdData messageId = messageIdBuilder.build();
seekBuilder.setMessageId(messageId);