Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/05/04 07:29:57 UTC

[GitHub] [pulsar] congbobo184 opened a new pull request #10478: [Broker] PIP:84 Redeliver command add epoch.

congbobo184 opened a new pull request #10478:
URL: https://github.com/apache/pulsar/pull/10478


   ## Motivation
   Details are in https://github.com/apache/pulsar/wiki/PIP-84-%3A-Pulsar-client%3A-Redeliver-command-add-epoch.
   ### Verifying this change
   Tests are added for this change.
   
   Does this pull request potentially affect one of the following parts:
   If yes was chosen, please highlight the changes
   
   Dependencies (does it add or upgrade a dependency): (no)
   The public API: (no)
   The schema: (no)
   The default values of configurations: (no)
   The wire protocol: (yes)
   The rest endpoints: (no)
   The admin cli options: (no)
   Anything that affects deployment: (no)
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] hangc0276 commented on a change in pull request #10478: [Broker] PIP:84 Redeliver command add epoch.

Posted by GitBox <gi...@apache.org>.
hangc0276 commented on a change in pull request #10478:
URL: https://github.com/apache/pulsar/pull/10478#discussion_r795007153



##########
File path: pulsar-common/src/main/proto/PulsarApi.proto
##########
@@ -526,6 +529,7 @@ message CommandMessage {
     required MessageIdData message_id = 2;
     optional uint32 redelivery_count  = 3 [default = 0];
     repeated int64 ack_set = 4;
+    optional uint64 consumer_epoch = 5;

Review comment:
       The default epoch is -1L, but `consumer_epoch` is defined as uint64 in the protobuf schema. Will -1L become the max value of uint64?
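
A minimal sketch of the concern above, using only the JDK. The class name and the `DEFAULT_EPOCH` constant are hypothetical stand-ins for the -1L default mentioned in the comment, not Pulsar identifiers. It shows that the bit pattern of -1L, read as an unsigned 64-bit value, is the maximum uint64, so signed and unsigned comparisons against 0 disagree.

```java
final class UnsignedEpochDemo {
    // Hypothetical sentinel mirroring the -1L default mentioned in the review.
    static final long DEFAULT_EPOCH = -1L;

    // Renders the 64 bits of a Java long as an unsigned decimal string.
    static String asUnsignedDecimal(long value) {
        return Long.toUnsignedString(value);
    }

    // Compares two epochs as unsigned 64-bit values.
    static boolean isGreaterUnsigned(long a, long b) {
        return Long.compareUnsigned(a, b) > 0;
    }

    public static void main(String[] args) {
        // Unsigned view of -1L: 18446744073709551615 (2^64 - 1).
        System.out.println(asUnsignedDecimal(DEFAULT_EPOCH));
        // Signed comparison treats the sentinel as smaller than 0.
        System.out.println(DEFAULT_EPOCH > 0L);
        // Unsigned comparison treats the same bits as larger than 0.
        System.out.println(isGreaterUnsigned(DEFAULT_EPOCH, 0L));
    }
}
```

Java code that keeps the value in a `long` and compares it with signed operators still sees -1, so the question mainly matters for clients that read the field as a true unsigned integer.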

##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
##########
@@ -442,12 +454,21 @@ public UnAckedMessageTracker getUnAckedMessageTracker() {
     @Override
     protected Message<T> internalReceive(int timeout, TimeUnit unit) throws PulsarClientException {
         Message<T> message;
+        long callTime = System.currentTimeMillis();

Review comment:
       We'd better use NANOSECONDS instead of MILLISECONDS
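
A minimal sketch of what the suggestion could look like, assuming the goal is to measure how much of a receive timeout has elapsed (the comment does not state its motivation). `System.nanoTime()` is intended for elapsed-time measurement, while `System.currentTimeMillis()` follows the wall clock and can jump when the clock is adjusted. The class and method names are hypothetical.

```java
import java.util.concurrent.TimeUnit;

final class ElapsedTimeDemo {
    // Returns how many nanoseconds of the timeout are left, given two nanoTime readings.
    static long remainingNanos(long startNanos, long nowNanos, long timeout, TimeUnit unit) {
        // Subtracting two nanoTime readings is the documented way to get elapsed time.
        long elapsedNanos = nowNanos - startNanos;
        return unit.toNanos(timeout) - elapsedNanos;
    }

    public static void main(String[] args) throws InterruptedException {
        long start = System.nanoTime();
        Thread.sleep(5);
        long left = remainingNanos(start, System.nanoTime(), 100, TimeUnit.MILLISECONDS);
        System.out.println("remaining ms: " + TimeUnit.NANOSECONDS.toMillis(left));
    }
}
```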

##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
##########
@@ -1734,29 +1786,54 @@ public int numMessagesInQueue() {
 
     @Override
     public void redeliverUnacknowledgedMessages() {
-        ClientCnx cnx = cnx();
-        if (isConnected() && cnx.getRemoteEndpointProtocolVersion() >= ProtocolVersion.v2.getValue()) {
-            int currentSize = 0;
-            synchronized (this) {
-                currentSize = incomingMessages.size();
-                clearIncomingMessages();
-                unAckedMessageTracker.clear();
-            }
-            cnx.ctx().writeAndFlush(Commands.newRedeliverUnacknowledgedMessages(consumerId), cnx.ctx().voidPromise());
-            if (currentSize > 0) {
-                increaseAvailablePermits(cnx, currentSize);
+        // First : synchronized in order to handle consumer reconnect produce race condition, when broker receive
+        // redeliverUnacknowledgedMessages and consumer have not be created and
+        // then receive reconnect epoch change the broker is smaller than the client epoch, this will cause client epoch
+        // smaller than broker epoch forever. client will not receive message anymore.
+        // Second : we should synchronized `ClientCnx cnx = cnx()` to
+        // prevent use old cnx to send redeliverUnacknowledgedMessages to a old broker
+        synchronized (ConsumerImpl.this) {
+            ClientCnx cnx = cnx();
+            // V1 don't support redeliverUnacknowledgedMessages
+            if (cnx != null && cnx.getRemoteEndpointProtocolVersion() < ProtocolVersion.v2.getValue()) {
+                if ((getState() == State.Connecting)) {
+                    log.warn("[{}] Client Connection needs to be established "
+                            + "for redelivery of unacknowledged messages", this);
+                } else {
+                    log.warn("[{}] Reconnecting the client to redeliver the messages.", this);
+                    cnx.ctx().close();
+                }
+
+                return;
             }
-            if (log.isDebugEnabled()) {
-                log.debug("[{}] [{}] [{}] Redeliver unacked messages and send {} permits", subscription, topic,
-                        consumerName, currentSize);
+
+            // clear local message
+            int currentSize = 0;
+            currentSize = incomingMessages.size();
+            clearIncomingMessages();
+            unAckedMessageTracker.clear();
+
+            // we should increase epoch every time, because MultiTopicsConsumerImpl also increase it,
+            // we need to keep both epochs the same
+            if (conf.getSubscriptionType() == SubscriptionType.Failover
+                    || conf.getSubscriptionType() == SubscriptionType.Exclusive) {
+                CONSUMER_EPOCH.incrementAndGet(this);

Review comment:
       MultiTopicsConsumer increases CONSUMER_EPOCH and then calls `redeliverUnacknowledgedMessages` on each consumer. Will that lead to inconsistent epochs between consumers?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] eolivelli commented on a change in pull request #10478: [Broker] PIP:84 Redeliver command add epoch.

Posted by GitBox <gi...@apache.org>.
eolivelli commented on a change in pull request #10478:
URL: https://github.com/apache/pulsar/pull/10478#discussion_r794541931



##########
File path: pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
##########
@@ -545,15 +546,15 @@ public static ByteBuf newSubscribe(String topic, String subscription, long consu
         return newSubscribe(topic, subscription, consumerId, requestId, subType, priorityLevel, consumerName,
                 isDurable, startMessageId, metadata, readCompacted, isReplicated, subscriptionInitialPosition,
                 startMessageRollbackDurationInSec, schemaInfo, createTopicIfDoesNotExist, null,
-                Collections.emptyMap());
+                Collections.emptyMap(), -1L);

Review comment:
       Can we use a constant for this "magic number" -1?

##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
##########
@@ -352,11 +364,22 @@ private void resumeReceivingFromPausedConsumersIfNeeded() {
     @Override
     protected Message<T> internalReceive(int timeout, TimeUnit unit) throws PulsarClientException {
         Message<T> message;
+
+        long callTime = System.currentTimeMillis();

Review comment:
       What about using nanoTime?

##########
File path: pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
##########
@@ -479,7 +480,7 @@ public static BaseCommand newMessageCommand(long consumerId, long ledgerId, long
     public static ByteBufPair newMessage(long consumerId, long ledgerId, long entryId, int partition,
             int redeliveryCount, ByteBuf metadataAndPayload, long[] ackSet) {
         return serializeCommandMessageWithSize(
-                newMessageCommand(consumerId, ledgerId, entryId, partition, redeliveryCount, ackSet),
+                newMessageCommand(consumerId, ledgerId, entryId, partition, redeliveryCount, ackSet, 0L),

Review comment:
       Can we use a constant for this "magic number" 0?







[GitHub] [pulsar] codelipenghui commented on a change in pull request #10478: [Broker] PIP:84 Redeliver command add epoch.

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on a change in pull request #10478:
URL: https://github.com/apache/pulsar/pull/10478#discussion_r776394773



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
##########
@@ -1114,11 +1114,13 @@ public SubscriptionStatsImpl getStats(Boolean getPreciseBacklog, boolean subscri
     }
 
     @Override
-    public void redeliverUnacknowledgedMessages(Consumer consumer) {
+    public CompletableFuture<Void> redeliverUnacknowledgedMessages(Consumer consumer, long consumerEpoch) {
         Dispatcher dispatcher = getDispatcher();
         if (dispatcher != null) {
-            dispatcher.redeliverUnacknowledgedMessages(consumer);
+            return dispatcher.redeliverUnacknowledgedMessages(consumer, consumerEpoch);
         }
+        return FutureUtil.failedFuture(

Review comment:
       This looks like a behavior change. Is it necessary?







[GitHub] [pulsar] congbobo184 commented on a change in pull request #10478: [Broker] PIP:84 Redeliver command add epoch.

Posted by GitBox <gi...@apache.org>.
congbobo184 commented on a change in pull request #10478:
URL: https://github.com/apache/pulsar/pull/10478#discussion_r795071110



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
##########
@@ -1734,29 +1786,54 @@ public int numMessagesInQueue() {
 
     @Override
     public void redeliverUnacknowledgedMessages() {
-        ClientCnx cnx = cnx();
-        if (isConnected() && cnx.getRemoteEndpointProtocolVersion() >= ProtocolVersion.v2.getValue()) {
-            int currentSize = 0;
-            synchronized (this) {
-                currentSize = incomingMessages.size();
-                clearIncomingMessages();
-                unAckedMessageTracker.clear();
-            }
-            cnx.ctx().writeAndFlush(Commands.newRedeliverUnacknowledgedMessages(consumerId), cnx.ctx().voidPromise());
-            if (currentSize > 0) {
-                increaseAvailablePermits(cnx, currentSize);
+        // First : synchronized in order to handle consumer reconnect produce race condition, when broker receive
+        // redeliverUnacknowledgedMessages and consumer have not be created and
+        // then receive reconnect epoch change the broker is smaller than the client epoch, this will cause client epoch
+        // smaller than broker epoch forever. client will not receive message anymore.
+        // Second : we should synchronized `ClientCnx cnx = cnx()` to
+        // prevent use old cnx to send redeliverUnacknowledgedMessages to a old broker
+        synchronized (ConsumerImpl.this) {
+            ClientCnx cnx = cnx();
+            // V1 don't support redeliverUnacknowledgedMessages
+            if (cnx != null && cnx.getRemoteEndpointProtocolVersion() < ProtocolVersion.v2.getValue()) {
+                if ((getState() == State.Connecting)) {
+                    log.warn("[{}] Client Connection needs to be established "
+                            + "for redelivery of unacknowledged messages", this);
+                } else {
+                    log.warn("[{}] Reconnecting the client to redeliver the messages.", this);
+                    cnx.ctx().close();
+                }
+
+                return;
             }
-            if (log.isDebugEnabled()) {
-                log.debug("[{}] [{}] [{}] Redeliver unacked messages and send {} permits", subscription, topic,
-                        consumerName, currentSize);
+
+            // clear local message
+            int currentSize = 0;
+            currentSize = incomingMessages.size();
+            clearIncomingMessages();
+            unAckedMessageTracker.clear();
+
+            // we should increase epoch every time, because MultiTopicsConsumerImpl also increase it,
+            // we need to keep both epochs the same
+            if (conf.getSubscriptionType() == SubscriptionType.Failover
+                    || conf.getSubscriptionType() == SubscriptionType.Exclusive) {
+                CONSUMER_EPOCH.incrementAndGet(this);

Review comment:
       Every `ConsumerImpl` in `MultiTopicsConsumerImpl` increases its CONSUMER_EPOCH, so they all change at the same time.







[GitHub] [pulsar] codelipenghui commented on a change in pull request #10478: [Broker] PIP:84 Redeliver command add epoch.

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on a change in pull request #10478:
URL: https://github.com/apache/pulsar/pull/10478#discussion_r776647452



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
##########
@@ -217,7 +224,7 @@ public boolean readCompacted() {
     public Future<Void> sendMessages(final List<Entry> entries, EntryBatchSizes batchSizes,

Review comment:
       Adding an overloaded method here can avoid many changes:
   
   ```java
   public Future<Void> sendMessages(final List<Entry> entries, EntryBatchSizes batchSizes, EntryBatchIndexesAcks batchIndexesAcks,
                                        int totalMessages, long totalBytes, long totalChunkedMessages,
                                        RedeliveryTracker redeliveryTracker) {
       // Delegate to the new overload, passing this consumer's current epoch.
       return sendMessages(entries, batchSizes, batchIndexesAcks, totalMessages, totalBytes, totalChunkedMessages, redeliveryTracker, this.consumerEpoch);
   }
   ```

##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
##########
@@ -284,21 +284,35 @@ private void receiveMessageFromConsumer(ConsumerImpl<T> consumer) {
     // Must be called from the internalPinnedExecutor thread
     private void messageReceived(ConsumerImpl<T> consumer, Message<T> message) {
         checkArgument(message instanceof MessageImpl);
-        TopicMessageImpl<T> topicMessage = new TopicMessageImpl<>(consumer.getTopic(),
-                consumer.getTopicNameWithoutPartition(), message, consumer);
 
-        if (log.isDebugEnabled()) {
-            log.debug("[{}][{}] Received message from topics-consumer {}",
-                    topic, subscription, message.getMessageId());
-        }
+        // Mutually exclusive with redeliver to prevent the message
+        //from being cleared after being added to the incomingMessages
+        lock.readLock().lock();

Review comment:
       We could add a check only when polling messages out of the receiver queue. The queue might then hold messages with a lower epoch, but they would be skipped when polled from the queue.
   
   I think it will simplify the implementation.
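
The idea above can be sketched roughly as follows. This is not the PR's code: `EpochFilteringQueue`, `Msg`, and the method names are hypothetical, and the queue is a plain `ArrayDeque` rather than the client's real receiver queue. Redelivery only bumps the epoch, and stale messages are dropped lazily when they are polled.

```java
import java.util.ArrayDeque;
import java.util.Queue;

final class EpochFilteringQueue {
    // Hypothetical message holder: a payload tagged with the epoch it was sent under.
    static final class Msg {
        final String payload;
        final long epoch;

        Msg(String payload, long epoch) {
            this.payload = payload;
            this.epoch = epoch;
        }
    }

    private final Queue<Msg> incoming = new ArrayDeque<>();
    private long currentEpoch = 0L;

    synchronized void offer(String payload, long epoch) {
        incoming.add(new Msg(payload, epoch));
    }

    // Redelivery only advances the epoch; the queue is not cleared here.
    synchronized void redeliver() {
        currentEpoch++;
    }

    // Returns the next message from the current epoch, or null if none is queued.
    synchronized String poll() {
        Msg m;
        while ((m = incoming.poll()) != null) {
            if (m.epoch >= currentEpoch) {
                return m.payload;
            }
            // Message belongs to an older epoch: skip it.
        }
        return null;
    }

    public static void main(String[] args) {
        EpochFilteringQueue q = new EpochFilteringQueue();
        q.offer("stale", 0L);
        q.redeliver();
        q.offer("fresh", 1L);
        System.out.println(q.poll());
    }
}
```

With this shape, the receive path needs no read/write lock against redelivery, which seems to be the simplification the comment is after.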

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
##########
@@ -561,7 +562,16 @@ private void addAbortTxnRequest(TxnID txnId, Consumer consumer, long lowWaterMar
                         if (cumulativeAckOfTransaction.getKey().equals(txnId)) {
                             cumulativeAckOfTransaction = null;
                         }
-                        persistentSubscription.redeliverUnacknowledgedMessages(consumer);
+                        // pendingAck handle next pr will fix

Review comment:
       Can you add a //TODO here with a description of what we should do?

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
##########
@@ -95,9 +97,10 @@ public void asyncReadEntriesOrWait(ManagedCursor cursor,
             } else {
                 cursorPosition = (PositionImpl) cursor.getReadPosition();
             }
+            ReadEntriesCallBackWrapper wrapper = ReadEntriesCallBackWrapper.create(consumer, DEFAULT_READ_EPOCH);

Review comment:
       Can you add a //TODO here? The compacted topic also needs this fix.

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
##########
@@ -1456,18 +1456,42 @@ protected void handleFlow(CommandFlow flow) {
     @Override
     protected void handleRedeliverUnacknowledged(CommandRedeliverUnacknowledgedMessages redeliver) {
         checkArgument(state == State.Connected);
+        CompletableFuture<Consumer> consumerFuture = consumers.get(redeliver.getConsumerId());
+        final boolean hasRequestId = redeliver.hasRequestId();
+        final long requestId = hasRequestId ? redeliver.getRequestId() : 0;
+        final long consumerId = redeliver.getConsumerId();
+
         if (log.isDebugEnabled()) {
-            log.debug("[{}] Received Resend Command from consumer {} ", remoteAddress, redeliver.getConsumerId());
+            log.debug("[{}] redeliverUnacknowledged from consumer {} , requestId {}, consumerEpoch {}",
+                    remoteAddress, redeliver.getConsumerId(), requestId, redeliver.getConsumerEpoch());
         }
 
-        CompletableFuture<Consumer> consumerFuture = consumers.get(redeliver.getConsumerId());
-
         if (consumerFuture != null && consumerFuture.isDone() && !consumerFuture.isCompletedExceptionally()) {
             Consumer consumer = consumerFuture.getNow(null);
             if (redeliver.getMessageIdsCount() > 0 && Subscription.isIndividualAckMode(consumer.subType())) {
                 consumer.redeliverUnacknowledgedMessages(redeliver.getMessageIdsList());
             } else {
-                consumer.redeliverUnacknowledgedMessages();
+                consumer.redeliverUnacknowledgedMessages(redeliver.getConsumerEpoch()).whenComplete((v, e) -> {
+                    if (hasRequestId) {
+                        if (e != null) {
+                            log.error("redeliverUnacknowledgedMessages error! "
+                                    + "consumerId : {}, requestId: {}", consumerId, requestId, e);
+                            ctx.writeAndFlush(Commands.newError(requestId,
+                                    BrokerServiceException.getClientErrorCode(e), e.getMessage()));
+                        } else {
+                            ctx.writeAndFlush(Commands.newSuccess(requestId));

Review comment:
       Any reason for introducing the redelivery response here? If it is not directly related to the consumer epoch introduced in PIP 84, we can separate it into 2 parts.

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
##########
@@ -217,7 +224,7 @@ public boolean readCompacted() {
     public Future<Void> sendMessages(final List<Entry> entries, EntryBatchSizes batchSizes,

Review comment:
       The protocol handlers also call this method, so an overload would avoid modifications in the protocol handlers as well.

##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
##########
@@ -269,7 +269,7 @@ private void receiveMessageFromConsumer(ConsumerImpl<T> consumer) {
                 // thenAcceptAsync, there is no chance for recursion that would lead to stack overflow.
                 receiveMessageFromConsumer(consumer);
             }
-        }, internalPinnedExecutor).exceptionally(ex -> {
+        }, externalPinnedExecutor).exceptionally(ex -> {

Review comment:
       Why change to the external pinned executor? The `externalPinnedExecutor` is used to trigger message listeners.







[GitHub] [pulsar] codelipenghui commented on a change in pull request #10478: [Broker] PIP:84 Redeliver command add epoch.

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on a change in pull request #10478:
URL: https://github.com/apache/pulsar/pull/10478#discussion_r719464344



##########
File path: managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ReadOnlyCursor.java
##########
@@ -46,10 +46,11 @@
      * @param callback              callback object
      * @param ctx                   opaque context
      * @param maxPosition           max position can read
+     * @param epoch                 epoch of this read
      * @see #readEntries(int)
      */
     void asyncReadEntries(int numberOfEntriesToRead, ReadEntriesCallback callback,
-                          Object ctx, PositionImpl maxPosition);
+                          Object ctx, PositionImpl maxPosition, long epoch);

Review comment:
       Can we move the epoch into the ctx? I think it should be a context or a carrier. If we want to pass more context in the future, we do not want to keep adding parameters. Maybe we can create a ReadEntryContext (with recycling) that carries both the consumer reference and the epoch.
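
A rough sketch of the shape being proposed, under stated assumptions: `ReadEntryContext` is the name from the comment, while `ReadCallback`, `FakeCursor`, and `ReadEntryContextDemo` are hypothetical stand-ins for the managed-ledger types. Object recycling is left out to keep the example self-contained. The method signature keeps its single opaque `ctx` parameter, and the epoch travels inside it.

```java
// Carrier for everything a read needs to hand back to its callback.
final class ReadEntryContext {
    private final Object consumer; // stand-in for the broker-side consumer reference
    private final long epoch;

    private ReadEntryContext(Object consumer, long epoch) {
        this.consumer = consumer;
        this.epoch = epoch;
    }

    static ReadEntryContext create(Object consumer, long epoch) {
        return new ReadEntryContext(consumer, epoch);
    }

    Object getConsumer() {
        return consumer;
    }

    long getEpoch() {
        return epoch;
    }
}

// Hypothetical callback with an opaque context, like the interface in the diff.
interface ReadCallback {
    void readEntriesComplete(int entryCount, Object ctx);
}

// Hypothetical cursor that completes immediately and passes ctx through untouched.
final class FakeCursor {
    void asyncReadEntries(int numberOfEntriesToRead, ReadCallback callback, Object ctx) {
        callback.readEntriesComplete(numberOfEntriesToRead, ctx);
    }
}

final class ReadEntryContextDemo {
    // Issues a read and returns the epoch recovered from the context in the callback.
    static long readAndReturnEpoch(Object consumer, long epoch) {
        long[] seen = new long[1];
        new FakeCursor().asyncReadEntries(10, (count, ctx) -> {
            ReadEntryContext readCtx = (ReadEntryContext) ctx;
            seen[0] = readCtx.getEpoch();
        }, ReadEntryContext.create(consumer, epoch));
        return seen[0];
    }

    public static void main(String[] args) {
        System.out.println(readAndReturnEpoch("consumer-1", 7L));
    }
}
```

Adding another field later only changes `ReadEntryContext`, not every `asyncReadEntries` caller.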







[GitHub] [pulsar] 315157973 commented on pull request #10478: [Broker] PIP:84 Redeliver command add epoch.

Posted by GitBox <gi...@apache.org>.
315157973 commented on pull request #10478:
URL: https://github.com/apache/pulsar/pull/10478#issuecomment-956098211


   @codelipenghui This is a feature, so it should not go into 2.8.x, right?





[GitHub] [pulsar] codelipenghui commented on a change in pull request #10478: [Broker] PIP:84 Redeliver command add epoch.

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on a change in pull request #10478:
URL: https://github.com/apache/pulsar/pull/10478#discussion_r780122613



##########
File path: pulsar-common/src/main/proto/PulsarApi.proto
##########
@@ -387,6 +387,9 @@ message CommandSubscribe {
     optional KeySharedMeta keySharedMeta = 17;
 
     repeated KeyValue subscription_properties = 18;
+
+    // The consumer epoch, when exclusive and failover consumer redeliver unack message will increase the epoch
+    optional uint64 epoch = 19 [default = 0];

Review comment:
       I see that DEFAULT_CONSUMER_EPOCH is -1, but the default here is 0. I think it would be simpler to either make them consistent or not assign a default value in the protocol.







[GitHub] [pulsar] codelipenghui commented on a change in pull request #10478: [Broker] PIP:84 Redeliver command add epoch.

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on a change in pull request #10478:
URL: https://github.com/apache/pulsar/pull/10478#discussion_r776373051



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
##########
@@ -127,6 +129,11 @@
     private final String clientAddress; // IP address only, no port number included
     private final MessageId startMessageId;
 
+    @Getter
+    @Setter
+    private volatile long consumerEpoch = DEFAULT_READ_EPOCH;
+    public static final long DEFAULT_READ_EPOCH = 0L;

Review comment:
       Can we remove this one and just provide a getter method for `consumerEpoch`?
   The flow would look like:
   
   1. get epoch from consumer
   2. use the epoch to read data
   3. dispatch messages along with the epoch
   
   

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
##########
@@ -575,4 +595,47 @@ public boolean checkAndUnblockIfStuck() {
     }
 
     private static final Logger log = LoggerFactory.getLogger(PersistentDispatcherSingleActiveConsumer.class);
+
+    public static class ReadEntriesCallBackWrapper {

Review comment:
       ```suggestion
       public static class ReadEntriesCtx {
   ```

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
##########
@@ -267,7 +268,7 @@ protected void sendMessagesToConsumers(ReadType readType, List<Entry> entries) {
                 consumer.sendMessages(entriesWithSameKey, batchSizes, batchIndexesAcks,
                         sendMessageInfo.getTotalMessages(),
                         sendMessageInfo.getTotalBytes(), sendMessageInfo.getTotalChunkedMessages(),
-                        getRedeliveryTracker()).addListener(future -> {
+                        getRedeliveryTracker(), DEFAULT_READ_EPOCH).addListener(future -> {

Review comment:
       Key_Shared also needs to avoid out-of-order messages. Fixing it separately in another PR is OK. I'm not sure whether there are other ordering issues with Key_Shared subscriptions when redelivering messages; that needs further investigation.

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
##########
@@ -115,16 +117,18 @@ protected void scheduleReadOnActiveConsumer() {
         }
 
         readOnActiveConsumerTask = topic.getBrokerService().executor().schedule(() -> {
-            if (log.isDebugEnabled()) {
-                log.debug("[{}] Rewind cursor and read more entries after {} ms delay", name,
-                        serviceConfig.getActiveConsumerFailoverDelayTimeMillis());
+            // in order to prevent redeliverUnacknowledgedMessages rewind cursor again
+            synchronized (PersistentDispatcherSingleActiveConsumer.this) {

Review comment:
       Could you please provide more context here? I'm not sure why adding `synchronized` here can prevent rewinding the cursor again. The Runnable instance waits here, but after the previous one completes, it will still rewind the cursor.
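
The point above can be illustrated with a small sketch. All names are hypothetical and the counter stands in for the cursor rewind. `synchronized` only serializes the two tasks, so both still run. The guarded variant, which skips a task whose recorded epoch is stale, is one possible alternative and an assumption for illustration, not what the PR does.

```java
final class RewindDemo {
    private int rewinds = 0;
    private long epoch = 0L;

    // Plain mutual exclusion: a waiting caller still runs the body once the lock is free.
    synchronized void rewindUnconditionally() {
        rewinds++;
    }

    // Guarded variant (assumption): skip the rewind if another one happened since scheduling.
    synchronized void rewindIfCurrent(long scheduledEpoch) {
        if (scheduledEpoch != epoch) {
            return;
        }
        epoch++;
        rewinds++;
    }

    synchronized int rewinds() {
        return rewinds;
    }

    synchronized long epoch() {
        return epoch;
    }

    // Runs the same task on two threads and waits for both to finish.
    private static void runBoth(Runnable task) throws InterruptedException {
        Thread a = new Thread(task);
        Thread b = new Thread(task);
        a.start();
        b.start();
        a.join();
        b.join();
    }

    static int countUnconditional() throws InterruptedException {
        RewindDemo d = new RewindDemo();
        runBoth(d::rewindUnconditionally);
        return d.rewinds();
    }

    static int countGuarded() throws InterruptedException {
        RewindDemo d = new RewindDemo();
        long scheduled = d.epoch(); // both tasks were scheduled under the same epoch
        runBoth(() -> d.rewindIfCurrent(scheduled));
        return d.rewinds();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("synchronized only: " + countUnconditional());
        System.out.println("with epoch guard:  " + countGuarded());
    }
}
```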

##########
File path: pulsar-common/src/main/proto/PulsarApi.proto
##########
@@ -604,6 +605,8 @@ message CommandCloseConsumer {
 message CommandRedeliverUnacknowledgedMessages {
     required uint64 consumer_id = 1;
     repeated MessageIdData message_ids = 2;
+    optional uint64 consumer_epoch = 3 [default = 0];
+    optional uint64 request_id = 4;

Review comment:
       @congbobo184 do we need this one?

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
##########
@@ -575,4 +595,47 @@ public boolean checkAndUnblockIfStuck() {
     }
 
     private static final Logger log = LoggerFactory.getLogger(PersistentDispatcherSingleActiveConsumer.class);
+
+    public static class ReadEntriesCallBackWrapper {

Review comment:
       We don't wrap the read entry callback; we just provide a combined ctx.







[GitHub] [pulsar] congbobo184 commented on a change in pull request #10478: [Broker] PIP:84 Redeliver command add epoch.

Posted by GitBox <gi...@apache.org>.
congbobo184 commented on a change in pull request #10478:
URL: https://github.com/apache/pulsar/pull/10478#discussion_r795070658



##########
File path: pulsar-common/src/main/proto/PulsarApi.proto
##########
@@ -526,6 +529,7 @@ message CommandMessage {
     required MessageIdData message_id = 2;
     optional uint32 redelivery_count  = 3 [default = 0];
     repeated int64 ack_set = 4;
+    optional uint64 consumer_epoch = 5;

Review comment:
       We set this to uint64 to prevent the epoch from being used in other ways in the future.







[GitHub] [pulsar] codelipenghui commented on a change in pull request #10478: [Broker] PIP:84 Redeliver command add epoch.

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on a change in pull request #10478:
URL: https://github.com/apache/pulsar/pull/10478#discussion_r787417655



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
##########
@@ -760,75 +785,86 @@ public void connectionOpened(final ClientCnx cnx) {
         }
         // startMessageRollbackDurationInSec should be consider only once when consumer connects to first time
         long startMessageRollbackDuration = (startMessageRollbackDurationInSec > 0
-                && startMessageId != null && startMessageId.equals(initialStartMessageId)) ? startMessageRollbackDurationInSec : 0;
-        ByteBuf request = Commands.newSubscribe(topic, subscription, consumerId, requestId, getSubType(), priorityLevel,
-                consumerName, isDurable, startMessageIdData, metadata, readCompacted,
-                conf.isReplicateSubscriptionState(), InitialPosition.valueOf(subscriptionInitialPosition.getValue()),
-                startMessageRollbackDuration, si, createTopicIfDoesNotExist, conf.getKeySharedPolicy(),
-                conf.getSubscriptionProperties());
-
-        cnx.sendRequestWithId(request, requestId).thenRun(() -> {
-            synchronized (ConsumerImpl.this) {
-                if (changeToReadyState()) {
-                    consumerIsReconnectedToBroker(cnx, currentSize);
-                } else {
+                && startMessageId != null
+                && startMessageId.equals(initialStartMessageId)) ? startMessageRollbackDurationInSec : 0;
+
+        // synchronized this, because redeliverUnAckMessage eliminate the epoch inconsistency between them
+        synchronized (this) {
+            setClientCnx(cnx);
+            ByteBuf request = Commands.newSubscribe(topic, subscription, consumerId, requestId, getSubType(),
+                    priorityLevel, consumerName, isDurable, startMessageIdData, metadata, readCompacted,
+                    conf.isReplicateSubscriptionState(),
+                    InitialPosition.valueOf(subscriptionInitialPosition.getValue()),
+                    startMessageRollbackDuration, si, createTopicIfDoesNotExist, conf.getKeySharedPolicy(),
+                    // this.consumerEpoch will increase

Review comment:
       Use the current epoch to subscribe.

##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
##########
@@ -1705,29 +1762,52 @@ public int numMessagesInQueue() {
 
     @Override
     public void redeliverUnacknowledgedMessages() {
-        ClientCnx cnx = cnx();
-        if (isConnected() && cnx.getRemoteEndpointProtocolVersion() >= ProtocolVersion.v2.getValue()) {
-            int currentSize = 0;
-            synchronized (this) {
-                currentSize = incomingMessages.size();
-                clearIncomingMessages();
-                unAckedMessageTracker.clear();
-            }
-            cnx.ctx().writeAndFlush(Commands.newRedeliverUnacknowledgedMessages(consumerId), cnx.ctx().voidPromise());
-            if (currentSize > 0) {
-                increaseAvailablePermits(cnx, currentSize);
+        // First : synchronized in order to handle consumer reconnect produce race condition, when broker receive
+        // redeliverUnacknowledgedMessages and consumer have not be created and
+        // then receive reconnect epoch change the broker is smaller than the client epoch, this will cause client epoch
+        // smaller than broker epoch forever. client will not receive message anymore.
+        // Second : we should synchronized `ClientCnx cnx = cnx()` to
+        // prevent use old cnx to send redeliverUnacknowledgedMessages to a old broker
+        synchronized (ConsumerImpl.this) {
+            ClientCnx cnx = cnx();
+            // V1 don't support redeliverUnacknowledgedMessages
+            if (cnx != null && cnx.getRemoteEndpointProtocolVersion() < ProtocolVersion.v2.getValue()) {
+                if ((getState() == State.Connecting)) {
+                    log.warn("[{}] Client Connection needs to be established " +
+                            "for redelivery of unacknowledged messages", this);
+                } else {
+                    log.warn("[{}] Reconnecting the client to redeliver the messages.", this);
+                    cnx.ctx().close();
+                }
+
+                return;
             }
-            if (log.isDebugEnabled()) {
-                log.debug("[{}] [{}] [{}] Redeliver unacked messages and send {} permits", subscription, topic,
-                        consumerName, currentSize);
+
+            // clear local message
+            int currentSize = 0;
+            currentSize = incomingMessages.size();
+            clearIncomingMessages();
+            unAckedMessageTracker.clear();
+
+            // is channel is connected, we should send redeliver command to broker
+            if (isConnected(cnx) && cnx != null) {

Review comment:
       ```suggestion
               if (cnx != null && isConnected(cnx)) {
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] codelipenghui commented on a change in pull request #10478: [Broker] PIP:84 Redeliver command add epoch.

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on a change in pull request #10478:
URL: https://github.com/apache/pulsar/pull/10478#discussion_r718509808



##########
File path: pulsar-common/src/main/proto/PulsarApi.proto
##########
@@ -604,6 +605,8 @@ message CommandCloseConsumer {
 message CommandRedeliverUnacknowledgedMessages {
     required uint64 consumer_id = 1;
     repeated MessageIdData message_ids = 2;
+    optional uint64 consumer_epoch = 3 [default = 0];
+    optional uint64 request_id = 4;

Review comment:
       Do we need the `request_id`? It looks like we don't have a redelivery response.







[GitHub] [pulsar] codelipenghui commented on a change in pull request #10478: [Broker] PIP:84 Redeliver command add epoch.

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on a change in pull request #10478:
URL: https://github.com/apache/pulsar/pull/10478#discussion_r780097534



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
##########
@@ -127,11 +130,15 @@
     private final String clientAddress; // IP address only, no port number included
     private final MessageId startMessageId;
 
+    @Getter
+    @Setter
+    private volatile long consumerEpoch = DEFAULT_CONSUMER_EPOCH;
+
     public Consumer(Subscription subscription, SubType subType, String topicName, long consumerId,
                     int priorityLevel, String consumerName,
                     int maxUnackedMessages, TransportCnx cnx, String appId,
                     Map<String, String> metadata, boolean readCompacted, InitialPosition subscriptionInitialPosition,
-                    KeySharedMeta keySharedMeta, MessageId startMessageId) {
+                    KeySharedMeta keySharedMeta, MessageId startMessageId, long consumerEpoch) {

Review comment:
       I noticed that many of the test changes are related to the newly added parameter. It would be better to keep the old constructor to avoid so many changes in this PR.







[GitHub] [pulsar] codelipenghui commented on a change in pull request #10478: [Broker] PIP:84 Redeliver command add epoch.

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on a change in pull request #10478:
URL: https://github.com/apache/pulsar/pull/10478#discussion_r780077192



##########
File path: pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
##########
@@ -546,15 +547,15 @@ public static ByteBuf newSubscribe(String topic, String subscription, long consu
         return newSubscribe(topic, subscription, consumerId, requestId, subType, priorityLevel, consumerName,
                 isDurable, startMessageId, metadata, readCompacted, isReplicated, subscriptionInitialPosition,
                 startMessageRollbackDurationInSec, schemaInfo, createTopicIfDoesNotExist, null,
-                Collections.emptyMap());
+                Collections.emptyMap(), -1L);
     }
 
     public static ByteBuf newSubscribe(String topic, String subscription, long consumerId, long requestId,
                SubType subType, int priorityLevel, String consumerName, boolean isDurable, MessageIdData startMessageId,
                Map<String, String> metadata, boolean readCompacted, boolean isReplicated,
                InitialPosition subscriptionInitialPosition, long startMessageRollbackDurationInSec,
                SchemaInfo schemaInfo, boolean createTopicIfDoesNotExist, KeySharedPolicy keySharedPolicy,
-               Map<String, String> subscriptionProperties) {
+               Map<String, String> subscriptionProperties, long epoch) {

Review comment:
       ```suggestion
                  Map<String, String> subscriptionProperties, long consumerEpoch) {
   ```

##########
File path: pulsar-common/src/main/proto/PulsarApi.proto
##########
@@ -526,6 +529,7 @@ message CommandMessage {
     required MessageIdData message_id = 2;
     optional uint32 redelivery_count  = 3 [default = 0];
     repeated int64 ack_set = 4;
+    optional uint64 consumer_epoch = 5 [default = 0];

Review comment:
       I don't think we need the default value.
   
   If `consumer_epoch` is not present, it means either the broker did not assign a `consumer_epoch` to the message, or the message came from an older broker without the consumer epoch feature.

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
##########
@@ -94,9 +96,12 @@ public void asyncReadEntriesOrWait(ManagedCursor cursor,
             } else {
                 cursorPosition = (PositionImpl) cursor.getReadPosition();
             }
+
+            // TODO: redeliver epoch

Review comment:
       Could you please create an issue for tracking the task?

##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
##########
@@ -937,10 +937,13 @@ public void testPooledMessageWithAckTimeout(boolean isBatchingEnabled) throws Ex
         retryStrategically((test) -> consumer.incomingMessages.peek() != null, 5, 500);
         MessageImpl<ByteBuffer> msg = (MessageImpl) consumer.incomingMessages.peek();
         assertNotNull(msg);
-        ByteBuf payload = ((MessageImpl) msg).getPayload();
+        ByteBuf payload = msg.getPayload();
         assertNotEquals(payload.refCnt(), 0);
         consumer.redeliverUnacknowledgedMessages();
-        assertEquals(payload.refCnt(), 0);
+        consumer.clearIncomingMessagesAndGetMessageNumber();
+        if (payload.refCnt() != 0) {

Review comment:
       We already release the payload in `clearIncomingMessagesAndGetMessageNumber`, so why is this check needed here? Are there cases where the payload ref count is not 0 after the message is released?

##########
File path: pulsar-common/src/main/proto/PulsarApi.proto
##########
@@ -616,6 +620,7 @@ message CommandCloseConsumer {
 message CommandRedeliverUnacknowledgedMessages {
     required uint64 consumer_id = 1;
     repeated MessageIdData message_ids = 2;
+    optional uint64 consumer_epoch = 3 [default = 0];

Review comment:
       We don't need the default value?

##########
File path: pulsar-common/src/main/proto/PulsarApi.proto
##########
@@ -387,6 +387,9 @@ message CommandSubscribe {
     optional KeySharedMeta keySharedMeta = 17;
 
     repeated KeyValue subscription_properties = 18;
+
+    // The consumer epoch, when exclusive and failover consumer redeliver unack message will increase the epoch
+    optional uint64 epoch = 19 [default = 0];

Review comment:
       ```suggestion
       optional uint64 consumer_epoch = 19 [default = 0];
   ```

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
##########
@@ -1492,6 +1499,7 @@ protected void handleRedeliverUnacknowledged(CommandRedeliverUnacknowledgedMessa
             if (redeliver.getMessageIdsCount() > 0 && Subscription.isIndividualAckMode(consumer.subType())) {
                 consumer.redeliverUnacknowledgedMessages(redeliver.getMessageIdsList());
             } else {
+                consumer.setConsumerEpoch(redeliver.getConsumerEpoch());

Review comment:
       We should avoid situations where the epoch can be reduced
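
One way to keep the epoch from ever going backwards is a compare-and-set loop that only accepts larger values. The following is a minimal sketch under assumptions: `MonotonicEpoch` and `advanceTo` are hypothetical names for illustration and are not part of the PR or of the broker's `Consumer` class.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical holder used only for illustration; not the PR's Consumer class.
class MonotonicEpoch {
    // -1 mirrors the DEFAULT_CONSUMER_EPOCH value mentioned in the review.
    private final AtomicLong epoch = new AtomicLong(-1L);

    /** Raises the epoch to candidate only if it is larger; returns true when updated. */
    boolean advanceTo(long candidate) {
        while (true) {
            long current = epoch.get();
            if (candidate <= current) {
                return false; // a stale or repeated epoch never lowers the stored value
            }
            if (epoch.compareAndSet(current, candidate)) {
                return true;
            }
            // Another thread changed the value in between; re-read and retry.
        }
    }

    long get() {
        return epoch.get();
    }
}
```

With this shape, a redeliver command carrying an older epoch becomes a no-op instead of overwriting a newer value.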

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
##########
@@ -949,6 +949,11 @@ protected void handleSubscribe(final CommandSubscribe subscribe) {
                 subscriptionName,
                 TopicOperation.CONSUME
         );
+
+        // move this because we should make the sub in this channel use one consumer future and do such as redeliver op

Review comment:
       The comment here should not describe only this change, because it will stay in the codebase. I think the correct description is:
   
   ```
   Make sure the consumer future is put into the consumers map first to avoid the same consumer ID using different consumer futures, and only remove the consumer future from the map if the subscribe fails.
   ```

##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
##########
@@ -412,13 +411,20 @@ public UnAckedMessageTracker getUnAckedMessageTracker() {
         try {
             message = incomingMessages.take();
             messageProcessed(message);
+            if (checkMessageImplConsumerEpochIsSmallerThanConsumer(message)) {

Review comment:
       ```suggestion
               if (!isValidConsumerEpoch(message)) {
   ```
   
   More straightforward and understandable

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
##########
@@ -127,11 +130,15 @@
     private final String clientAddress; // IP address only, no port number included
     private final MessageId startMessageId;
 
+    @Getter
+    @Setter
+    private volatile long consumerEpoch = DEFAULT_CONSUMER_EPOCH;
+
     public Consumer(Subscription subscription, SubType subType, String topicName, long consumerId,
                     int priorityLevel, String consumerName,
                     int maxUnackedMessages, TransportCnx cnx, String appId,
                     Map<String, String> metadata, boolean readCompacted, InitialPosition subscriptionInitialPosition,
-                    KeySharedMeta keySharedMeta, MessageId startMessageId) {
+                    KeySharedMeta keySharedMeta, MessageId startMessageId, long consumerEpoch) {

Review comment:
       I noticed that many of the test changes are related to the newly added parameter. It would be better to keep the old constructor to avoid so many changes in this PR.

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
##########
@@ -127,11 +130,15 @@
     private final String clientAddress; // IP address only, no port number included
     private final MessageId startMessageId;
 
+    @Getter
+    @Setter
+    private volatile long consumerEpoch = DEFAULT_CONSUMER_EPOCH;

Review comment:
       The constructor already initializes `consumerEpoch`.

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
##########
@@ -949,6 +949,11 @@ protected void handleSubscribe(final CommandSubscribe subscribe) {
                 subscriptionName,
                 TopicOperation.CONSUME
         );
+
+        // move this because we should make the sub in this channel use one consumer future and do such as redeliver op
+        CompletableFuture<Consumer> consumerFuture = new CompletableFuture<>();
+        CompletableFuture<Consumer> existingConsumerFuture = consumers.putIfAbsent(consumerId,
+                consumerFuture);

Review comment:
       How about:
   
   ```java
   CompletableFuture<Consumer> existingConsumerFuture = consumers.putIfAbsent(consumerId,
                   new CompletableFuture<>());
   ```
   
   And, `computeIfAbsent` is more elegant here.
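
The two calls differ in what they return, which matters for the "existing future" check. The sketch below uses `java.util.concurrent.ConcurrentHashMap` as a stand-in for the broker's consumers map, and `ConsumerFutureRegistry` with a `String` payload is a hypothetical simplification, not code from the PR.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical registry; a String stands in for the broker-side Consumer type.
class ConsumerFutureRegistry {
    private final ConcurrentHashMap<Long, CompletableFuture<String>> consumers =
            new ConcurrentHashMap<>();

    /** putIfAbsent returns null when this call registered the new future. */
    CompletableFuture<String> registerWithPutIfAbsent(long consumerId) {
        return consumers.putIfAbsent(consumerId, new CompletableFuture<>());
    }

    /** computeIfAbsent always returns the future that is now in the map. */
    CompletableFuture<String> registerWithComputeIfAbsent(long consumerId) {
        return consumers.computeIfAbsent(consumerId, id -> new CompletableFuture<>());
    }
}
```

Because `computeIfAbsent` never returns null, a caller that needs to know whether the future already existed would have to detect that some other way, for example by comparing against a future it created itself.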

##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
##########
@@ -1041,5 +1052,20 @@ private ExecutorService getInternalExecutor(Message<T> msg) {
         return executor;
     }
 
+    // If message consumer epoch is smaller than consumer epoch present that
+    // it has been sent to the client before the user calls redeliverUnacknowledgedMessages, this message is invalid.
+    // so we should release this message and receive again
+    protected boolean checkMessageConsumerEpochIsSmallerThanConsumer(MessageImpl<T> message) {
+        if ((getSubType() == CommandSubscribe.SubType.Failover

Review comment:
       We should print a warn log here, which will help with troubleshooting.

##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
##########
@@ -1041,5 +1052,20 @@ private ExecutorService getInternalExecutor(Message<T> msg) {
         return executor;
     }
 
+    // If message consumer epoch is smaller than consumer epoch present that
+    // it has been sent to the client before the user calls redeliverUnacknowledgedMessages, this message is invalid.
+    // so we should release this message and receive again
+    protected boolean checkMessageConsumerEpochIsSmallerThanConsumer(MessageImpl<T> message) {
+        if ((getSubType() == CommandSubscribe.SubType.Failover
+                || getSubType() == CommandSubscribe.SubType.Exclusive)
+                && (message).getConsumerEpoch() != DEFAULT_CONSUMER_EPOCH

Review comment:
       Why do we need `(message)` here? Is there any difference from `message.getConsumerEpoch()`?

##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
##########
@@ -1041,5 +1052,20 @@ private ExecutorService getInternalExecutor(Message<T> msg) {
         return executor;
     }
 
+    // If message consumer epoch is smaller than consumer epoch present that
+    // it has been sent to the client before the user calls redeliverUnacknowledgedMessages, this message is invalid.
+    // so we should release this message and receive again
+    protected boolean checkMessageConsumerEpochIsSmallerThanConsumer(MessageImpl<T> message) {

Review comment:
       To be safe, we should not filter out any messages from a broker with an older protocol version.
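
The filtering rule discussed in these comments can be sketched as a pure predicate. This is a simplified illustration under assumptions: `EpochFilter` and its local `SubType` enum are hypothetical, the name `isValidConsumerEpoch` follows the reviewer's suggestion, and a message epoch of -1 is taken to mean that the broker did not set one (for example, an older broker).

```java
// Hypothetical, self-contained form of the check; the PR's method lives in ConsumerBase.
class EpochFilter {
    static final long DEFAULT_CONSUMER_EPOCH = -1L;

    // Local stand-in for CommandSubscribe.SubType.
    enum SubType { Exclusive, Shared, Failover, Key_Shared }

    /** Returns false only when the message predates the consumer's current epoch. */
    static boolean isValidConsumerEpoch(SubType subType, long messageEpoch, long consumerEpoch) {
        boolean epochApplies = subType == SubType.Exclusive || subType == SubType.Failover;
        if (!epochApplies || messageEpoch == DEFAULT_CONSUMER_EPOCH) {
            // Never filter when the epoch is not in use or the broker did not send one.
            return true;
        }
        return messageEpoch >= consumerEpoch;
    }
}
```

Treating the default epoch as "always valid" is what keeps a new client from dropping messages sent by a broker that does not know about the field.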

##########
File path: pulsar-common/src/main/proto/PulsarApi.proto
##########
@@ -387,6 +387,9 @@ message CommandSubscribe {
     optional KeySharedMeta keySharedMeta = 17;
 
     repeated KeyValue subscription_properties = 18;
+
+    // The consumer epoch, when exclusive and failover consumer redeliver unack message will increase the epoch
+    optional uint64 epoch = 19 [default = 0];

Review comment:
       Do we need the default value here?

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
##########
@@ -1492,6 +1499,7 @@ protected void handleRedeliverUnacknowledged(CommandRedeliverUnacknowledgedMessa
             if (redeliver.getMessageIdsCount() > 0 && Subscription.isIndividualAckMode(consumer.subType())) {
                 consumer.redeliverUnacknowledgedMessages(redeliver.getMessageIdsList());
             } else {
+                consumer.setConsumerEpoch(redeliver.getConsumerEpoch());

Review comment:
       For non-persistent topics and shared subscriptions, the consumer epoch is disabled. This also means we should prevent message redelivery from modifying the consumer epoch in those cases.







[GitHub] [pulsar] codelipenghui commented on a change in pull request #10478: [Broker] PIP:84 Redeliver command add epoch.

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on a change in pull request #10478:
URL: https://github.com/apache/pulsar/pull/10478#discussion_r780077192



##########
File path: pulsar-common/src/main/proto/PulsarApi.proto
##########
@@ -387,6 +387,9 @@ message CommandSubscribe {
     optional KeySharedMeta keySharedMeta = 17;
 
     repeated KeyValue subscription_properties = 18;
+
+    // The consumer epoch, when exclusive and failover consumer redeliver unack message will increase the epoch
+    optional uint64 epoch = 19 [default = 0];

Review comment:
       I see that `DEFAULT_CONSUMER_EPOCH` is -1, but the default here is 0. I think it would be simpler either to make them consistent or to not assign a default value in the protocol at all.
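
The mismatch is easier to see when the Java value is read as an unsigned 64-bit number, which is how a `uint64` field is interpreted on the wire (protobuf's Java mapping stores `uint64` in a signed `long`). The sketch below is illustrative only, and `UnsignedEpochDemo` is a hypothetical name.

```java
// Hypothetical demo of how a signed -1 looks when treated as uint64.
class UnsignedEpochDemo {
    static final long DEFAULT_CONSUMER_EPOCH = -1L;

    /** Renders a Java long the way an unsigned 64-bit reader would see it. */
    static String asUint64(long value) {
        return Long.toUnsignedString(value);
    }

    /** Unsigned comparison: true when a is greater than b as uint64 values. */
    static boolean greaterUnsigned(long a, long b) {
        return Long.compareUnsigned(a, b) > 0;
    }
}
```

Here `asUint64(DEFAULT_CONSUMER_EPOCH)` yields `18446744073709551615`, the maximum `uint64`, so a peer that compares epochs as unsigned values would see the "default" as larger than every real epoch. Leaving the field unset and checking for presence avoids that ambiguity.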

##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
##########
@@ -86,6 +90,13 @@
     protected final Lock reentrantLock = new ReentrantLock();
     private volatile boolean isListenerHandlingMessage = false;
 
+    @Getter
+    protected final AtomicLong consumerEpoch = new AtomicLong(0);

Review comment:
       `AtomicLongFieldUpdater` is more efficient.
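
Assuming the suggestion refers to `java.util.concurrent.atomic.AtomicLongFieldUpdater`, the pattern looks roughly like this. `EpochHolder` is a hypothetical class for illustration; the field name and initial value mirror the diff above, but this is not the PR's code.

```java
import java.util.concurrent.atomic.AtomicLongFieldUpdater;

// Hypothetical holder showing the field-updater pattern on a volatile long.
class EpochHolder {
    // One shared static updater instead of one AtomicLong object per consumer.
    private static final AtomicLongFieldUpdater<EpochHolder> CONSUMER_EPOCH =
            AtomicLongFieldUpdater.newUpdater(EpochHolder.class, "consumerEpoch");

    // The updater requires a non-static volatile long field.
    private volatile long consumerEpoch = 0L;

    long incrementAndGetEpoch() {
        return CONSUMER_EPOCH.incrementAndGet(this);
    }

    long getEpoch() {
        return consumerEpoch;
    }
}
```

The gain is mainly in memory: each instance carries a plain `long` rather than a reference to a separate `AtomicLong` object, while reads stay a simple volatile read.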







[GitHub] [pulsar] codelipenghui commented on a change in pull request #10478: [Broker] PIP:84 Redeliver command add epoch.

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on a change in pull request #10478:
URL: https://github.com/apache/pulsar/pull/10478#discussion_r718509808



##########
File path: pulsar-common/src/main/proto/PulsarApi.proto
##########
@@ -604,6 +605,8 @@ message CommandCloseConsumer {
 message CommandRedeliverUnacknowledgedMessages {
     required uint64 consumer_id = 1;
     repeated MessageIdData message_ids = 2;
+    optional uint64 consumer_epoch = 3 [default = 0];
+    optional uint64 request_id = 4;

Review comment:
       Do we need the `request_id`? It looks like we don't have a redelivery response.







[GitHub] [pulsar] codelipenghui commented on a change in pull request #10478: [Broker] PIP:84 Redeliver command add epoch.

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on a change in pull request #10478:
URL: https://github.com/apache/pulsar/pull/10478#discussion_r780123526



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
##########
@@ -86,6 +90,13 @@
     protected final Lock reentrantLock = new ReentrantLock();
     private volatile boolean isListenerHandlingMessage = false;
 
+    @Getter
+    protected final AtomicLong consumerEpoch = new AtomicLong(0);

Review comment:
       An `AtomicLongFieldUpdater` is more efficient.







[GitHub] [pulsar] codelipenghui merged pull request #10478: [Broker] PIP:84 Redeliver command add epoch.

Posted by GitBox <gi...@apache.org>.
codelipenghui merged pull request #10478:
URL: https://github.com/apache/pulsar/pull/10478


   





[GitHub] [pulsar] codelipenghui commented on a change in pull request #10478: [Broker] PIP:84 Redeliver command add epoch.

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on a change in pull request #10478:
URL: https://github.com/apache/pulsar/pull/10478#discussion_r787353171



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
##########
@@ -643,6 +670,7 @@ String getHandlerName() {
     public void redeliverUnacknowledgedMessages() {
         lock.writeLock().lock();
         try {
+            CONSUMER_EPOCH.incrementAndGet(this);

Review comment:
       The `CONSUMER_EPOCH` of `MultiTopicsConsumerImpl` might follow a different sequence from that of `ConsumerImpl`, because the epoch increases whenever `ConsumerImpl` reconnects to the topic. Here we assume that `MultiTopicsConsumerImpl` and `ConsumerImpl` hold the same value.
   
   Can we check the message epoch against the epoch of the internal consumer instead?

##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
##########
@@ -1041,5 +1054,22 @@ private ExecutorService getInternalExecutor(Message<T> msg) {
         return executor;
     }
 
+    // If message consumer epoch is smaller than consumer epoch present that
+    // it has been sent to the client before the user calls redeliverUnacknowledgedMessages, this message is invalid.
+    // so we should release this message and receive again
+    protected boolean isValidConsumerEpoch(MessageImpl<T> message) {

Review comment:
       Alternatively, we should return false if the epoch is invalid. I prefer keeping the name `isValidConsumerEpoch` and returning false when the epoch is invalid.
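
   A sketch of the preferred semantics, where `true` means the message may be delivered. The class name, the plain `long` parameters, and the -1 sentinel handling are assumptions made for illustration. The only rule taken from the code comment in the diff is that a message epoch smaller than the consumer epoch is stale.

```java
// Hypothetical helper; the real method takes a message object rather than raw longs.
class EpochCheckSketch {
    // Assumed sentinel for messages that carry no epoch.
    static final long DEFAULT_CONSUMER_EPOCH = -1L;

    // Returns false when the message was dispatched under an older epoch
    // than the consumer's current one, i.e. before the last redelivery request.
    static boolean isValidConsumerEpoch(long messageEpoch, long consumerEpoch) {
        if (messageEpoch == DEFAULT_CONSUMER_EPOCH) {
            // Assumption: messages without an epoch are always accepted.
            return true;
        }
        return messageEpoch >= consumerEpoch;
    }
}
```

   A caller would then release and skip the message whenever the check returns false.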

##########
File path: pulsar-common/src/main/proto/PulsarApi.proto
##########
@@ -387,6 +387,9 @@ message CommandSubscribe {
     optional KeySharedMeta keySharedMeta = 17;
 
     repeated KeyValue subscription_properties = 18;
+
+    // The consumer epoch, when exclusive and failover consumer redeliver unack message will increase the epoch

Review comment:
       Isn't this for consumer reconnection rather than for redelivery?

##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
##########
@@ -328,13 +327,26 @@ private void resumeReceivingFromPausedConsumersIfNeeded() {
         }
     }
 
+    // If message consumer epoch is smaller than consumer epoch present that
+    // it has been sent to the client before the user calls redeliverUnacknowledgedMessages, this message is invalid.
+    // so we should release this message and receive again
+    private boolean checkTopicMessageConsumerEpochIsSmallerThanConsumer(Message<T> message) {

Review comment:
       ```suggestion
       private boolean isValidConsumerEpoch(Message<T> message) {
   ```

##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
##########
@@ -1041,5 +1054,22 @@ private ExecutorService getInternalExecutor(Message<T> msg) {
         return executor;
     }
 
+    // If message consumer epoch is smaller than consumer epoch present that
+    // it has been sent to the client before the user calls redeliverUnacknowledgedMessages, this message is invalid.
+    // so we should release this message and receive again
+    protected boolean isValidConsumerEpoch(MessageImpl<T> message) {

Review comment:
       If `true` means the message is invalid, we should name the method `isInvalidConsumerEpoch`.







[GitHub] [pulsar] codelipenghui commented on a change in pull request #10478: [Broker] PIP:84 Redeliver command add epoch.

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on a change in pull request #10478:
URL: https://github.com/apache/pulsar/pull/10478#discussion_r787441104



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
##########
@@ -643,6 +670,7 @@ String getHandlerName() {
     public void redeliverUnacknowledgedMessages() {
         lock.writeLock().lock();
         try {
+            CONSUMER_EPOCH.incrementAndGet(this);

Review comment:
       Another point: if users call both `MultiTopicsConsumerImpl.redeliverUnacknowledgedMessages()` and `ConsumerImpl.redeliverUnacknowledgedMessages()`, that will also make the consumer epochs inconsistent.



