You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2020/09/11 10:21:44 UTC

[GitHub] [pulsar] congbobo184 opened a new pull request #8037: Pending ack client implement

congbobo184 opened a new pull request #8037:
URL: https://github.com/apache/pulsar/pull/8037


   to solve #7981 
   
   ### Implement
   
   #### cumulative ack with transaction
   
   if we wan to use cumulative ack with transaction, you must to configure cumulativeAckWithTxn with this consumer
   ```
       @Override
       public ConsumerBuilder<T> cumulativeAckWithTxn() {
           conf.setCumulativeAckWithTxn(true);
           return this;
       }
   ```
   in ConsumerBuilderImpl.
   this only support one thread consume message with transaction and this operation is sync .
   
   #### individual ack with transaction
   
   the batch message with transaction just carry their own batchIndex and client must enable batch index ack.
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] congbobo184 commented on pull request #8037: Pending ack client implement

Posted by GitBox <gi...@apache.org>.
congbobo184 commented on pull request #8037:
URL: https://github.com/apache/pulsar/pull/8037#issuecomment-694250105


   /pulsarbot run-failure-checks


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] congbobo184 removed a comment on pull request #8037: Pending ack client implement

Posted by GitBox <gi...@apache.org>.
congbobo184 removed a comment on pull request #8037:
URL: https://github.com/apache/pulsar/pull/8037#issuecomment-692545667


   /pulsarbot run-failure-checks


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] congbobo184 removed a comment on pull request #8037: Pending ack client implement

Posted by GitBox <gi...@apache.org>.
congbobo184 removed a comment on pull request #8037:
URL: https://github.com/apache/pulsar/pull/8037#issuecomment-692554200


   /pulsarbot run-failure-checks


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] congbobo184 commented on pull request #8037: Pending ack client implement

Posted by GitBox <gi...@apache.org>.
congbobo184 commented on pull request #8037:
URL: https://github.com/apache/pulsar/pull/8037#issuecomment-694599757


   /pulsarbot run-failure-checks


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] congbobo184 commented on pull request #8037: Pending ack client implement

Posted by GitBox <gi...@apache.org>.
congbobo184 commented on pull request #8037:
URL: https://github.com/apache/pulsar/pull/8037#issuecomment-692554200


   /pulsarbot run-failure-checks


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] congbobo184 commented on pull request #8037: Pending ack client implement

Posted by GitBox <gi...@apache.org>.
congbobo184 commented on pull request #8037:
URL: https://github.com/apache/pulsar/pull/8037#issuecomment-693836269


   /pulsarbot run-failure-checks


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] congbobo184 removed a comment on pull request #8037: Pending ack client implement

Posted by GitBox <gi...@apache.org>.
congbobo184 removed a comment on pull request #8037:
URL: https://github.com/apache/pulsar/pull/8037#issuecomment-692554871


   /pulsarbot run-failure-checks


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] congbobo184 commented on pull request #8037: Pending ack client implement

Posted by GitBox <gi...@apache.org>.
congbobo184 commented on pull request #8037:
URL: https://github.com/apache/pulsar/pull/8037#issuecomment-692543981


   /pulsarbot run-failure-checks


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] congbobo184 commented on pull request #8037: Pending ack client implement

Posted by GitBox <gi...@apache.org>.
congbobo184 commented on pull request #8037:
URL: https://github.com/apache/pulsar/pull/8037#issuecomment-694279353


   /pulsarbot run-failure-checks


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] congbobo184 commented on pull request #8037: Pending ack client implement

Posted by GitBox <gi...@apache.org>.
congbobo184 commented on pull request #8037:
URL: https://github.com/apache/pulsar/pull/8037#issuecomment-692555429


   /pulsarbot run-failure-checks


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] congbobo184 removed a comment on pull request #8037: Pending ack client implement

Posted by GitBox <gi...@apache.org>.
congbobo184 removed a comment on pull request #8037:
URL: https://github.com/apache/pulsar/pull/8037#issuecomment-692545583


   pulsarbot run-failure-checks


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] congbobo184 commented on pull request #8037: Pending ack client implement

Posted by GitBox <gi...@apache.org>.
congbobo184 commented on pull request #8037:
URL: https://github.com/apache/pulsar/pull/8037#issuecomment-694044854


   /pulsarbot run-failure-checks


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] congbobo184 commented on pull request #8037: Pending ack client implement

Posted by GitBox <gi...@apache.org>.
congbobo184 commented on pull request #8037:
URL: https://github.com/apache/pulsar/pull/8037#issuecomment-694238026


   /pulsarbot run-failure-checks


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] congbobo184 removed a comment on pull request #8037: Pending ack client implement

Posted by GitBox <gi...@apache.org>.
congbobo184 removed a comment on pull request #8037:
URL: https://github.com/apache/pulsar/pull/8037#issuecomment-694043709


   /pulsarbot run-failure-checks


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] congbobo184 commented on pull request #8037: Pending ack client implement

Posted by GitBox <gi...@apache.org>.
congbobo184 commented on pull request #8037:
URL: https://github.com/apache/pulsar/pull/8037#issuecomment-694599494


   /pulsarbot run-failure-checks


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] congbobo184 commented on pull request #8037: Pending ack client implement

Posted by GitBox <gi...@apache.org>.
congbobo184 commented on pull request #8037:
URL: https://github.com/apache/pulsar/pull/8037#issuecomment-694314294


   /pulsarbot run-failure-checks


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] congbobo184 commented on pull request #8037: Pending ack client implement

Posted by GitBox <gi...@apache.org>.
congbobo184 commented on pull request #8037:
URL: https://github.com/apache/pulsar/pull/8037#issuecomment-692554871


   /pulsarbot run-failure-checks


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] congbobo184 removed a comment on pull request #8037: Pending ack client implement

Posted by GitBox <gi...@apache.org>.
congbobo184 removed a comment on pull request #8037:
URL: https://github.com/apache/pulsar/pull/8037#issuecomment-692544947


   /pulsarbot run-failure-checks


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] congbobo184 commented on pull request #8037: Pending ack client implement

Posted by GitBox <gi...@apache.org>.
congbobo184 commented on pull request #8037:
URL: https://github.com/apache/pulsar/pull/8037#issuecomment-694092196


   /pulsarbot run-failure-checks


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] congbobo184 removed a comment on pull request #8037: Pending ack client implement

Posted by GitBox <gi...@apache.org>.
congbobo184 removed a comment on pull request #8037:
URL: https://github.com/apache/pulsar/pull/8037#issuecomment-694238026






----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] congbobo184 removed a comment on pull request #8037: Pending ack client implement

Posted by GitBox <gi...@apache.org>.
congbobo184 removed a comment on pull request #8037:
URL: https://github.com/apache/pulsar/pull/8037#issuecomment-692543981






----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] congbobo184 commented on pull request #8037: Pending ack client implement

Posted by GitBox <gi...@apache.org>.
congbobo184 commented on pull request #8037:
URL: https://github.com/apache/pulsar/pull/8037#issuecomment-692543597


   /pulsarbot run-failure-checks


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] wolfstudy merged pull request #8037: Pending ack client implement

Posted by GitBox <gi...@apache.org>.
wolfstudy merged pull request #8037:
URL: https://github.com/apache/pulsar/pull/8037


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] congbobo184 commented on pull request #8037: Pending ack client implement

Posted by GitBox <gi...@apache.org>.
congbobo184 commented on pull request #8037:
URL: https://github.com/apache/pulsar/pull/8037#issuecomment-692545583






----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] codelipenghui commented on a change in pull request #8037: Pending ack client implement

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on a change in pull request #8037:
URL: https://github.com/apache/pulsar/pull/8037#discussion_r489316644



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
##########
@@ -326,6 +388,86 @@ public void flush() {
             }
         }
 
+        if (!pendingIndividualTransactionAcks.isEmpty()) {
+            if (Commands.peerSupportsMultiMessageAcknowledgment(cnx.getRemoteEndpointProtocolVersion())) {
+                // We can send 1 single protobuf command with all individual acks
+                while (true) {
+                    Triple<Long, Long, MessageIdImpl> entry = pendingIndividualTransactionAcks.pollFirst();
+                    if (entry == null) {
+                        break;
+                    }
+
+                    // if messageId is checked then all the chunked related to that msg also processed so, ack all of
+                    // them
+                    MessageIdImpl[] chunkMsgIds = this.consumer.unAckedChunckedMessageIdSequenceMap.get(entry.getRight());
+                    long mostSigBits = entry.getLeft();
+                    long leastSigBits = entry.getMiddle();
+                    MessageIdImpl messageId = entry.getRight();
+                    if (chunkMsgIds != null && chunkMsgIds.length > 1) {
+                        for (MessageIdImpl cMsgId : chunkMsgIds) {
+                            if (cMsgId != null) {
+                                newAckCommand(consumer.consumerId, cMsgId, null, AckType.Individual, null, Collections.emptyMap(), cnx, false, mostSigBits, leastSigBits);
+                            }
+                        }
+                        // messages will be acked so, remove checked message sequence
+                        this.consumer.unAckedChunckedMessageIdSequenceMap.remove(messageId);
+                    } else {
+                        newAckCommand(consumer.consumerId, messageId, null, AckType.Individual, null, Collections.emptyMap(), cnx, false, mostSigBits, leastSigBits);
+                    }
+                }
+            } else {
+                // When talking to older brokers, send the acknowledgements individually
+                while (true) {
+                    Triple<Long, Long, MessageIdImpl> entry = pendingIndividualTransactionAcks.pollFirst();
+                    if (entry == null) {
+                        break;
+                    }
+
+                    newAckCommand(consumer.consumerId, entry.getRight(), null, AckType.Individual,
+                            null, Collections.emptyMap(), cnx, false, entry.getLeft(), entry.getMiddle());
+                    shouldFlush = true;
+                }
+            }
+        }
+
+        if (!pendingIndividualTransactionBatchIndexAcks.isEmpty()) {
+            Iterator<Map.Entry<TransactionImpl, ConcurrentHashMap<MessageIdImpl, ConcurrentBitSetRecyclable>>> transactionIterator = pendingIndividualTransactionBatchIndexAcks.entrySet().iterator();
+            while (transactionIterator.hasNext()) {
+                Map.Entry<TransactionImpl, ConcurrentHashMap<MessageIdImpl, ConcurrentBitSetRecyclable>> transactionEntry = transactionIterator.next();
+                TransactionImpl txn = transactionEntry.getKey();
+                synchronized (txn) {
+                    if (pendingIndividualTransactionBatchIndexAcks.containsKey(txn)) {
+                        List<Triple<Long, Long, ConcurrentBitSetRecyclable>> messageIdBitSetList = new ArrayList<>();
+                        transactionEntriesToAck.put(txn, messageIdBitSetList);
+                        Iterator<Map.Entry<MessageIdImpl, ConcurrentBitSetRecyclable>> messageIdIterator = transactionEntry.getValue().entrySet().iterator();
+                        while (messageIdIterator.hasNext()) {
+                            Map.Entry<MessageIdImpl, ConcurrentBitSetRecyclable> messageIdEntry = messageIdIterator.next();
+                            ConcurrentBitSetRecyclable concurrentBitSetRecyclable =
+                                    ConcurrentBitSetRecyclable.create(messageIdEntry.getValue());
+                            MessageIdImpl messageId = messageIdEntry.getKey();
+                            messageIdBitSetList.add(Triple.of(messageId.ledgerId, messageId.entryId, concurrentBitSetRecyclable));
+                            messageIdEntry.getValue().set(0, messageIdEntry.getValue().size());
+                            messageIdIterator.remove();
+                        }
+                        transactionIterator.remove();
+                        pendingIndividualTransactionBatchIndexAcks.remove(transactionEntry.getKey());

Review comment:
       Looks don't need to remove again.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] congbobo184 commented on pull request #8037: Pending ack client implement

Posted by GitBox <gi...@apache.org>.
congbobo184 commented on pull request #8037:
URL: https://github.com/apache/pulsar/pull/8037#issuecomment-692544947


   /pulsarbot run-failure-checks


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] congbobo184 commented on pull request #8037: Pending ack client implement

Posted by GitBox <gi...@apache.org>.
congbobo184 commented on pull request #8037:
URL: https://github.com/apache/pulsar/pull/8037#issuecomment-694256305


   /pulsarbot run-failure-checks


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] congbobo184 commented on pull request #8037: Pending ack client implement

Posted by GitBox <gi...@apache.org>.
congbobo184 commented on pull request #8037:
URL: https://github.com/apache/pulsar/pull/8037#issuecomment-694043709


   /pulsarbot run-failure-checks


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org