Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/02/11 12:12:33 UTC

[GitHub] [pulsar] andrekramer1 opened a new issue #14242: WebSocket persistent subscriptions have batch messages not consumed.

andrekramer1 opened a new issue #14242:
URL: https://github.com/apache/pulsar/issues/14242


   With acknowledgmentAtBatchIndexLevelEnabled = false (the broker default), the WebSocket consumer built with builder.enableBatchIndexAcknowledgment(false) (again the default), and every message acknowledged by the WebSocket client,
   we still see some messages that are not properly acknowledged and removed from the subscription. If the subscribing WebSocket client disconnects and reconnects to the WebSocket tier, it receives those messages again, and again on every subsequent reconnect.
   
   The bug can be sidestepped by setting both configs above to true, so that batch index level acks are fully enabled.
   If batch index acks are not enabled in the WebSocket tier's consumer (regardless of the broker setting), the following happens in the WebSocket servlet:
   
   When a message acknowledgement is deserialized, its message id object gets its own bitset. So even when an ack belongs to the same
   batch as other acks, it has a separate bitset, and batches with more than one message are never acknowledged to the broker
   (each bitset only ever has one bit cleared out of the expected batch size, so it never becomes empty).
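   The effect can be reproduced without Pulsar. The sketch below is a simplified model, assuming a set bit means "batch index still outstanding"; ToyBatchAcker and BatchAckDemo are hypothetical names, not Pulsar classes. Two ids from one 2-message batch each clear their own index: with separate bitsets neither ever becomes empty, while with one shared bitset the second ack empties it.

```java
import java.util.BitSet;

// ToyBatchAcker is a hypothetical stand-in, not Pulsar's BatchMessageAcker.
// A set bit means "this batch index is still unacknowledged".
class ToyBatchAcker {
    private final BitSet outstanding;

    ToyBatchAcker(BitSet outstanding) {
        this.outstanding = outstanding;
    }

    // A fresh bitset for a batch: every index starts out unacknowledged.
    static BitSet fullBatch(int batchSize) {
        BitSet bits = new BitSet(batchSize);
        bits.set(0, batchSize);
        return bits;
    }

    // Clears this message's bit; returns true once no index in the batch is outstanding.
    boolean ackIndex(int batchIndex) {
        outstanding.clear(batchIndex);
        return outstanding.isEmpty();
    }
}

class BatchAckDemo {
    // Acks both messages of a 2-message batch through two separately deserialized ids.
    // Returns true if either ack leaves the batch fully acknowledged.
    static boolean batchCompletes(boolean shareBitset) {
        int batchSize = 2;
        BitSet forFirstId = ToyBatchAcker.fullBatch(batchSize);
        BitSet forSecondId = shareBitset ? forFirstId : ToyBatchAcker.fullBatch(batchSize);
        boolean doneAfterFirst = new ToyBatchAcker(forFirstId).ackIndex(0);
        boolean doneAfterSecond = new ToyBatchAcker(forSecondId).ackIndex(1);
        return doneAfterFirst || doneAfterSecond;
    }

    public static void main(String[] args) {
        System.out.println("separate bitsets, batch acked: " + batchCompletes(false)); // false
        System.out.println("shared bitset, batch acked: " + batchCompletes(true)); // true
    }
}
```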
   
   We attempted a fix that makes the message ids coming into the WebSocket tier share bitsets when they belong to the same batch (after message ack
   deserialization). This seems to work and is similar to what happens in the batch index case, where the bitsets are duplicated and shared at the batch level. The suggested fix is outlined below.
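   A minimal sketch of the sharing idea using only JDK types: SharedBatchBitsets is a hypothetical class, and List.of(ledgerId, entryId) stands in for the Pair key used in the patch. ConcurrentHashMap.computeIfAbsent applies the creation function at most once per key, so every message id from the same batch gets the same BitSet.

```java
import java.util.BitSet;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical registry handing every message id of one batch the same BitSet,
// keyed by (ledgerId, entryId) as in the suggested fix.
class SharedBatchBitsets {
    private final ConcurrentHashMap<List<Long>, BitSet> byBatch = new ConcurrentHashMap<>();

    BitSet bitsetFor(long ledgerId, long entryId, int batchSize) {
        // Atomic per key: the first caller creates the bitset, later callers reuse it.
        return byBatch.computeIfAbsent(List.of(ledgerId, entryId), key -> {
            BitSet bits = new BitSet(batchSize);
            bits.set(0, batchSize); // all indexes start out unacknowledged
            return bits;
        });
    }

    int trackedBatches() {
        return byBatch.size();
    }

    public static void main(String[] args) {
        SharedBatchBitsets registry = new SharedBatchBitsets();
        BitSet a = registry.bitsetFor(42L, 7L, 2);
        BitSet b = registry.bitsetFor(42L, 7L, 2);
        System.out.println(a == b); // true: same batch, same BitSet
    }
}
```

   Note that java.util.BitSet itself is not thread-safe, so if acks for the same batch can be processed concurrently, clearing bits on the shared instance still needs synchronization.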
   
   Also, for the true/true case (batch index acks enabled), we spotted a data race: an ack recorded in the shared bitset can be missed when the flush has just read the bits and then removes that same bitset from the concurrent hash map. We would advise always using the read-write lock that already exists in PersistentAcknowledgmentsGroupingTracker (it is currently used only if isAckReceiptEnabled(consumer.getClientCnx()) returns true). Otherwise, acks could occasionally be dropped.
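   The interleaving can be replayed deterministically in a single thread. This is a simplified model, not the tracker's actual code: UnlockedFlushRace and its pending map are hypothetical, and cloning the BitSet stands in for flush reading the bits into the ack command. After index 0 is acked, flush reads the bits, an ack for index 1 arrives, and flush then removes the entry, so the broker is told index 1 is still outstanding even though it was acked.

```java
import java.util.BitSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical pending-ack map (batch key -> outstanding batch indexes), with no locking.
class UnlockedFlushRace {
    static final int BATCH_SIZE = 2;

    static void ack(Map<List<Long>, BitSet> pending, List<Long> batch, int index) {
        pending.computeIfAbsent(batch, key -> {
            BitSet bits = new BitSet(BATCH_SIZE);
            bits.set(0, BATCH_SIZE);
            return bits;
        }).clear(index);
    }

    // Returns the outstanding indexes that flush would report to the broker.
    static BitSet replay() {
        Map<List<Long>, BitSet> pending = new ConcurrentHashMap<>();
        List<Long> batch = List.of(42L, 7L);
        ack(pending, batch, 0);
        // flush, step 1: read the bits that go into the ack command
        BitSet sent = (BitSet) pending.get(batch).clone();
        // an ack for index 1 lands before flush has removed the entry
        ack(pending, batch, 1);
        // flush, step 2: remove the entry; the index-1 ack is lost with it
        pending.remove(batch);
        return sent;
    }

    public static void main(String[] args) {
        System.out.println("outstanding indexes sent: " + replay()); // {1}
    }
}
```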
   
   
   **To Reproduce**
   
   We had 2 producers sending messages (say 10) that usually ended up in batches of 2 messages, and one subscribing WebSocket consumer connected to the Web Proxy tier. The consumer would see all messages, but after disconnecting and reconnecting it would see some of them again. These messages could never be consumed through the WebSocket tier, but they could be consumed with a normal (binary protocol) Pulsar client such as bin/pulsar-client, and we noted that the unconsumed messages were in batches of more than one. Ledgers also could not be deleted while the messages remained unconsumed, so even if the application could tolerate the replays, we quickly ran out of bookie storage space.
   
   **Expected behavior**
   All acknowledged messages should be marked as consumed in the subscription; occasional redeliveries are acceptable (at-least-once semantics).
   
   **Environment:**
    - All deployed in Kubernetes. Private build of 2.8.0.
   
   **Additional context**
   We observed the problem in 2.8.0, but later versions are likely affected too, since the relevant code has not changed in the WebSocket module or in the parts of the Pulsar client used inside the WebSocket servlet.
   
   -------------------------------------------------------------------------
   Suggested fix: 
   
   ConsumerHandler.java 
   
   current:
           MessageId msgId = MessageId.fromByteArrayWithTopic(Base64.getDecoder().decode(command.messageId),
                   topic.toString());
           consumer.acknowledgeAsync(msgId).thenAccept(consumer -> numMsgsAcked.increment());
   
   with fix:
           MessageId msgId = MessageId.fromByteArrayWithTopic(Base64.getDecoder().decode(command.messageId),
                   topic.toString());

           if (!enableBatchIndexAcknowledgment && msgId instanceof BatchMessageIdImpl) {
               // Give every message id from the same batch (same ledgerId/entryId) one shared bitset.
               BatchMessageIdImpl batchMsgId = (BatchMessageIdImpl) msgId;
               BatchMessageAcker acker = batchMsgId.getAcker();
               int size = acker.getBatchSize();
               BitSet bitset = ackers.computeIfAbsent(Pair.of(batchMsgId.getLedgerId(), batchMsgId.getEntryId()), key -> {
                   BitSet bitSet = new BitSet(size);
                   bitSet.set(0, size); // all batch indexes start out unacknowledged
                   return bitSet;
               });
               acker.setBitSet(bitset);
           }

           consumer.acknowledgeAsync(msgId).thenAccept(consumer -> {
               if (log.isDebugEnabled()) {
                   log.debug("[{}] Ack'ed asynchronously", msgId);
               }

               if (!enableBatchIndexAcknowledgment && msgId instanceof BatchMessageIdImpl) {
                   BatchMessageIdImpl batchMsgId = (BatchMessageIdImpl) msgId;
                   ackers.computeIfPresent(Pair.of(batchMsgId.getLedgerId(), batchMsgId.getEntryId()), (key, value) -> {
                       if (value.isEmpty()) {
                           // Infer that the batch ack was sent, so drop the bitset from the map.
                           return null;
                       }
                       return value;
                   });
               }
               numMsgsAcked.increment();
           });
   
   where:
       private final ConcurrentHashMap<Pair<Long, Long>, BitSet> ackers = new ConcurrentHashMap<>(); // class member
       // Could become a new WebSocket proxy config, passed to the consumer builder's
       // enableBatchIndexAcknowledgment() in the WebSocket servlet.
       private final boolean enableBatchIndexAcknowledgment = false;
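   The lifecycle of the ackers map in the patch above can be modeled with plain JDK types. AckerCleanup is a hypothetical class, List.of(ledgerId, entryId) replaces Pair, and clearing the bit directly stands in for what the client's acker does. The key point is the Map contract: when the function passed to computeIfPresent returns null, the entry is removed.

```java
import java.util.BitSet;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical model of the "ackers" map lifecycle from the suggested fix.
class AckerCleanup {
    final ConcurrentHashMap<List<Long>, BitSet> ackers = new ConcurrentHashMap<>();

    // An ack arrives: reuse or create the batch's shared bitset, then clear this index.
    BitSet onAck(long ledgerId, long entryId, int batchIndex, int batchSize) {
        BitSet bits = ackers.computeIfAbsent(List.of(ledgerId, entryId), key -> {
            BitSet b = new BitSet(batchSize);
            b.set(0, batchSize);
            return b;
        });
        bits.clear(batchIndex);
        return bits;
    }

    // acknowledgeAsync completed: returning null from computeIfPresent removes the mapping,
    // so fully acknowledged batches do not accumulate in the map.
    void onAckCompleted(long ledgerId, long entryId) {
        ackers.computeIfPresent(List.of(ledgerId, entryId), (key, bits) -> bits.isEmpty() ? null : bits);
    }

    public static void main(String[] args) {
        AckerCleanup model = new AckerCleanup();
        model.onAck(42L, 7L, 0, 2);
        model.onAckCompleted(42L, 7L);
        System.out.println(model.ackers.size()); // 1: index 1 is still outstanding
        model.onAck(42L, 7L, 1, 2);
        model.onAckCompleted(42L, 7L);
        System.out.println(model.ackers.size()); // 0: entry dropped
    }
}
```

   In this model, an ack for a batch whose entry was already dropped (for example, a duplicate ack from the client) creates a fresh bitset that never becomes empty and therefore stays in the map.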
   
   --------------------------------------------------------------------------
   Race condition:
   
   There are currently 2 code segments in PersistentAcknowledgmentsGroupingTracker.java that take the lock only conditionally:
   
             if (isAckReceiptEnabled(consumer.getClientCnx())) {
                   ...
                   this.lock.readLock().lock();
             ... 
   
   We suggest always taking the lock in doIndividualAck and in flush (which is called from a scheduled task).
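   A minimal sketch of this locking discipline, assuming a tracker reduced to a map of pending batch bitsets. LockedGroupingTracker, ack and flush are hypothetical names; the real tracker's fields and methods differ. Acks share the read lock, so they stay concurrent with each other; flush takes the write lock, so the read-then-remove sequence described above can no longer interleave with an ack.

```java
import java.util.BitSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical tracker, not Pulsar's PersistentAcknowledgmentsGroupingTracker.
// ack() plays the role of doIndividualAck and always takes the read lock;
// flush() always takes the write lock.
class LockedGroupingTracker {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private final Map<List<Long>, BitSet> pending = new ConcurrentHashMap<>();

    void ack(long ledgerId, long entryId, int batchIndex, int batchSize) {
        lock.readLock().lock();
        try {
            BitSet bits = pending.computeIfAbsent(List.of(ledgerId, entryId), key -> {
                BitSet b = new BitSet(batchSize);
                b.set(0, batchSize);
                return b;
            });
            // Many acks may hold the read lock at once and java.util.BitSet is not
            // thread-safe, so mutations of one batch's bits are serialized here.
            synchronized (bits) {
                bits.clear(batchIndex);
            }
        } finally {
            lock.readLock().unlock();
        }
    }

    // Returns what would be sent to the broker: a copy of each batch's outstanding bits.
    Map<List<Long>, BitSet> flush() {
        lock.writeLock().lock();
        try {
            Map<List<Long>, BitSet> sent = new HashMap<>();
            for (Map.Entry<List<Long>, BitSet> entry : pending.entrySet()) {
                sent.put(entry.getKey(), (BitSet) entry.getValue().clone());
            }
            pending.clear(); // no ack can run between the read above and this removal
            return sent;
        } finally {
            lock.writeLock().unlock();
        }
    }
}
```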
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] github-actions[bot] commented on issue #14242: WebSocket persistent subscriptions have batch messages not consumed.

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on issue #14242:
URL: https://github.com/apache/pulsar/issues/14242#issuecomment-1066259086


   The issue had no activity for 30 days, mark with Stale label.

