You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2020/10/20 00:45:30 UTC

[GitHub] [pulsar] codelipenghui commented on a change in pull request #8285: [Issue 8260] Support reset cursor to a batch index of the batching message

codelipenghui commented on a change in pull request #8285:
URL: https://github.com/apache/pulsar/pull/8285#discussion_r508138203



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
##########
@@ -36,10 +36,7 @@
 import io.netty.handler.ssl.SslHandler;
 
 import java.net.SocketAddress;
-import java.util.List;
-import java.util.Map;
-import java.util.NoSuchElementException;
-import java.util.Set;
+import java.util.*;

Review comment:
       Avoid use import .* here.

##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java
##########
@@ -25,20 +25,21 @@
 import static org.testng.Assert.fail;
 
 import com.google.common.collect.Lists;
+
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.admin.PulsarAdminException;
-import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.*;

Review comment:
       Avoid use import .* here.

##########
File path: pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
##########
@@ -770,14 +770,15 @@ public static ByteBuf newActiveConsumerChange(long consumerId, boolean isActive)
         return res;
     }
 
-    public static ByteBuf newSeek(long consumerId, long requestId, long ledgerId, long entryId) {
+    public static ByteBuf newSeek(long consumerId, long requestId, long ledgerId, long entryId, int batchIndex) {
         CommandSeek.Builder seekBuilder = CommandSeek.newBuilder();
         seekBuilder.setConsumerId(consumerId);
         seekBuilder.setRequestId(requestId);
 
         MessageIdData.Builder messageIdBuilder = MessageIdData.newBuilder();
         messageIdBuilder.setLedgerId(ledgerId);
         messageIdBuilder.setEntryId(entryId);
+        messageIdBuilder.setBatchIndex(batchIndex);

Review comment:
       The `batchIndex` of the MessageIdData is an optional field, we shouldn't always use '0' here. It better to change the params type of the `batchIndex` to `Integer`. If the the `batchIndex` no a null value, set the value for the `MessageIdBuilder`

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
##########
@@ -1349,8 +1347,14 @@ protected void handleSeek(CommandSeek seek) {
             Consumer consumer = consumerFuture.getNow(null);
             Subscription subscription = consumer.getSubscription();
             MessageIdData msgIdData = seek.getMessageId();
+            BitSetRecyclable ackSet = BitSetRecyclable.create();
+            if (msgIdData.hasBatchIndex()) {
+                ackSet.set(0, Math.max(msgIdData.getBatchIndex(), 0));
+            }
 
-            Position position = new PositionImpl(msgIdData.getLedgerId(), msgIdData.getEntryId());
+            Position position = new PositionImpl(msgIdData.getLedgerId(),
+                    msgIdData.getEntryId(), ackSet.toLongArray());
+            ackSet.recycle();

Review comment:
       It's better tot check `msgIdData.hasBatchIndex()` first, If the conditions are not met, we don't need to create the BitSet instance.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org