You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/10/22 03:08:15 UTC

[pulsar] 02/04: [Issue 8260] Support reset cursor to a batch index of the batching message (#8285)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit ba3f21ce95f129fbdd2fd30595daf50700deb92b
Author: Renkai <ga...@gmail.com>
AuthorDate: Wed Oct 21 19:16:29 2020 +0800

    [Issue 8260] Support reset cursor to a batch index of the batching message (#8285)
    
    ### Motivation
    
    Make the reset-cursor (seek) command able to reset to a specific index within a batched message.
    
    ### Modifications
    
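    The seek (reset cursor) command now carries an ack set describing the batch index; the broker passes it into the reset position and the managed cursor records it in its batch-deleted indexes when the cursor is reset.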
    
    (cherry picked from commit a5b4146ca0f6f333741763ed3a1dda3495dccb65)
---
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java |  5 ++
 .../apache/pulsar/broker/service/ServerCnx.java    | 10 +++-
 .../broker/service/SubscriptionSeekTest.java       | 62 ++++++++++++++++++++++
 .../pulsar/client/impl/BatchMessageAcker.java      |  8 +++
 .../apache/pulsar/client/impl/ConsumerImpl.java    | 24 +++++++--
 .../apache/pulsar/common/protocol/Commands.java    |  9 ++--
 6 files changed, 109 insertions(+), 9 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 26f1bf3..19a9e9a 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -967,6 +967,11 @@ public class ManagedCursorImpl implements ManagedCursor {
                     if (config.isDeletionAtBatchIndexLevelEnabled() && batchDeletedIndexes != null) {
                         batchDeletedIndexes.values().forEach(BitSetRecyclable::recycle);
                         batchDeletedIndexes.clear();
+                        long[] resetWords = newPosition.ackSet;
+                        if (resetWords != null) {
+                            BitSetRecyclable ackSet = BitSetRecyclable.create().resetWords(resetWords);
+                            batchDeletedIndexes.put(newPosition, ackSet);
+                        }
                     }
 
                     PositionImpl oldReadPosition = readPosition;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 5d49f87..2e1df72 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -121,6 +121,8 @@ import org.apache.pulsar.common.protocol.schema.SchemaInfoUtil;
 import org.apache.pulsar.common.protocol.schema.SchemaVersion;
 import org.apache.pulsar.common.schema.SchemaType;
 import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.common.util.SafeCollectionUtils;
+import org.apache.pulsar.common.util.collections.BitSetRecyclable;
 import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
 import org.apache.pulsar.shaded.com.google.protobuf.v241.GeneratedMessageLite;
 import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
@@ -1331,7 +1333,13 @@ public class ServerCnx extends PulsarHandler {
             Subscription subscription = consumer.getSubscription();
             MessageIdData msgIdData = seek.getMessageId();
 
-            Position position = new PositionImpl(msgIdData.getLedgerId(), msgIdData.getEntryId());
+            long[] ackSet = null;
+            if (msgIdData.getAckSetCount() > 0) {
+                ackSet = SafeCollectionUtils.longListToArray(msgIdData.getAckSetList());
+            }
+
+            Position position = new PositionImpl(msgIdData.getLedgerId(),
+                    msgIdData.getEntryId(), ackSet);
 
 
             subscription.resetCursor(position).thenRun(() -> {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java
index 0f5986d..31a46cf 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java
@@ -21,23 +21,28 @@ package org.apache.pulsar.broker.service;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 
 import com.google.common.collect.Lists;
+
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.common.naming.TopicName;
@@ -54,6 +59,7 @@ public class SubscriptionSeekTest extends BrokerTestBase {
     @Override
     protected void setup() throws Exception {
         super.baseSetup();
+        conf.setAcknowledgmentAtBatchIndexLevelEnabled(true);
     }
 
     @AfterClass
@@ -117,6 +123,62 @@ public class SubscriptionSeekTest extends BrokerTestBase {
     }
 
     @Test
+    public void testSeekForBatch() throws Exception {
+        final String topicName = "persistent://prop/use/ns-abcd/testSeekForBatch";
+        String subscriptionName = "my-subscription-batch";
+
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .enableBatching(true)
+                .batchingMaxMessages(3)
+                .topic(topicName).create();
+
+
+        List<MessageId> messageIds = new ArrayList<>();
+        List<CompletableFuture<MessageId>> futureMessageIds = new ArrayList<>();
+
+        List<String> messages = new ArrayList<>();
+        for (int i = 0; i < 10; i++) {
+            String message = "my-message-" + i;
+            messages.add(message);
+            CompletableFuture<MessageId> messageIdCompletableFuture = producer.sendAsync(message);
+            futureMessageIds.add(messageIdCompletableFuture);
+        }
+
+        for (CompletableFuture<MessageId> futureMessageId : futureMessageIds) {
+            MessageId messageId = futureMessageId.get();
+            messageIds.add(messageId);
+        }
+
+        producer.close();
+
+
+        org.apache.pulsar.client.api.Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+                .topic(topicName)
+                .subscriptionName(subscriptionName)
+                .startMessageIdInclusive()
+                .subscribe();
+
+        PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get();
+        assertNotNull(topicRef);
+
+        assertEquals(topicRef.getSubscriptions().size(), 1);
+
+        consumer.seek(MessageId.earliest);
+        Message<String> receiveBeforEarliest = consumer.receive();
+        assertEquals(receiveBeforEarliest.getValue(), messages.get(0));
+        consumer.seek(MessageId.latest);
+        Message<String> receiveAfterLatest = consumer.receive(1, TimeUnit.SECONDS);
+        assertNull(receiveAfterLatest);
+
+        for (MessageId messageId : messageIds) {
+            consumer.seek(messageId);
+            MessageId receiveId = consumer.receive().getMessageId();
+            assertEquals(receiveId, messageId);
+        }
+    }
+
+
+    @Test
     public void testConcurrentResetCursor() throws Exception {
         final String topicName = "persistent://prop/use/ns-abc/testConcurrentReset_" + System.currentTimeMillis();
         final String subscriptionName = "test-sub-name";
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageAcker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageAcker.java
index d46d3b3..86dbfe1 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageAcker.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageAcker.java
@@ -77,4 +77,12 @@ class BatchMessageAcker {
         return prevBatchCumulativelyAcked;
     }
 
+    @Override
+    public String toString() {
+        return "BatchMessageAcker{" +
+                "batchSize=" + batchSize +
+                ", bitSet=" + bitSet +
+                ", prevBatchCumulativelyAcked=" + prevBatchCumulativelyAcked +
+                '}';
+    }
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index f9546e9..74fe79d 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -1823,15 +1823,29 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
 
         if (!isConnected()) {
             return FutureUtil.failedFuture(new PulsarClientException(
-                String.format("The client is not connected to the broker when seeking the subscription %s of the " +
-                    "topic %s to the message %s", subscription, topicName.toString(), messageId.toString())));
+                    String.format("The client is not connected to the broker when seeking the subscription %s of the " +
+                            "topic %s to the message %s", subscription, topicName.toString(), messageId.toString())));
         }
 
         final CompletableFuture<Void> seekFuture = new CompletableFuture<>();
 
         long requestId = client.newRequestId();
-        MessageIdImpl msgId = (MessageIdImpl) messageId;
-        ByteBuf seek = Commands.newSeek(consumerId, requestId, msgId.getLedgerId(), msgId.getEntryId());
+        ByteBuf seek = null;
+        if (messageId instanceof BatchMessageIdImpl) {
+            BatchMessageIdImpl msgId = (BatchMessageIdImpl) messageId;
+            // Initialize ack set
+            BitSetRecyclable ackSet = BitSetRecyclable.create();
+            ackSet.set(0, msgId.getBatchSize());
+            ackSet.clear(0, Math.max(msgId.getBatchIndex(), 0));
+            long[] ackSetArr = ackSet.toLongArray();
+            ackSet.recycle();
+
+            seek = Commands.newSeek(consumerId, requestId, msgId.getLedgerId(), msgId.getEntryId(), ackSetArr);
+        } else {
+            MessageIdImpl msgId = (MessageIdImpl) messageId;
+            seek = Commands.newSeek(consumerId, requestId, msgId.getLedgerId(), msgId.getEntryId(), new long[0]);
+        }
+
         ClientCnx cnx = cnx();
 
         log.info("[{}][{}] Seek subscription to message id {}", topic, subscription, messageId);
@@ -2259,4 +2273,4 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
 
     private static final Logger log = LoggerFactory.getLogger(ConsumerImpl.class);
 
-}
\ No newline at end of file
+}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
index d867139..488460f 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
@@ -634,17 +634,18 @@ public class Commands {
     public static ByteBuf newActiveConsumerChange(long consumerId, boolean isActive) {
         CommandActiveConsumerChange.Builder changeBuilder = CommandActiveConsumerChange.newBuilder()
             .setConsumerId(consumerId)
-            .setIsActive(isActive);
+                .setIsActive(isActive);
 
         CommandActiveConsumerChange change = changeBuilder.build();
         ByteBuf res = serializeWithSize(
-            BaseCommand.newBuilder().setType(Type.ACTIVE_CONSUMER_CHANGE).setActiveConsumerChange(change));
+                BaseCommand.newBuilder().setType(Type.ACTIVE_CONSUMER_CHANGE).setActiveConsumerChange(change));
         changeBuilder.recycle();
         change.recycle();
         return res;
     }
 
-    public static ByteBuf newSeek(long consumerId, long requestId, long ledgerId, long entryId) {
+    public static ByteBuf newSeek(long consumerId, long requestId,
+                                  long ledgerId, long entryId, long[] ackSet) {
         CommandSeek.Builder seekBuilder = CommandSeek.newBuilder();
         seekBuilder.setConsumerId(consumerId);
         seekBuilder.setRequestId(requestId);
@@ -652,6 +653,8 @@ public class Commands {
         MessageIdData.Builder messageIdBuilder = MessageIdData.newBuilder();
         messageIdBuilder.setLedgerId(ledgerId);
         messageIdBuilder.setEntryId(entryId);
+        messageIdBuilder.addAllAckSet(SafeCollectionUtils.longArrayToList(ackSet));
+
         MessageIdData messageId = messageIdBuilder.build();
         seekBuilder.setMessageId(messageId);