You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2019/04/26 11:17:45 UTC

[GitHub] [pulsar] ivankelly commented on a change in pull request #4062: Delayed message delivery implementation

ivankelly commented on a change in pull request #4062: Delayed message delivery implementation
URL: https://github.com/apache/pulsar/pull/4062#discussion_r278906399
 
 

 ##########
 File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
 ##########
 @@ -311,15 +307,26 @@ void updatePermitsAndPendingAcks(final List<Entry> entries, SendMessageInfo sent
         while (iter.hasNext()) {
             Entry entry = iter.next();
             ByteBuf metadataAndPayload = entry.getDataBuffer();
-            int batchSize = getBatchSizeforEntry(metadataAndPayload, subscription, consumerId);
-            if (batchSize == -1) {
-                // this would suggest that the message might have been corrupted
+            MessageMetadata msgMetadata = peekMessageMetadata(metadataAndPayload, subscription, consumerId);
+            PositionImpl pos = (PositionImpl) entry.getPosition();
+            if (msgMetadata == null) {
+                // Message metadata was corrupted
                 iter.remove();
-                PositionImpl pos = (PositionImpl) entry.getPosition();
                 entry.release();
                 subscription.acknowledgeMessage(Collections.singletonList(pos), AckType.Individual, Collections.emptyMap());
                 continue;
+            } else if (msgMetadata.hasDeliverAtTime()
 
 Review comment:
   This is strange. The dispatcher gets a list of entries from the managed ledger, passes it to the consumer which then passes it back to an interface on the dispatcher which for all but one case is a noop.
   
   It's also weird that tracking happens in a method called updatePermitsAndPendingAcks. And this method doesn't look like it should mutate it's argument, but it does.
   
   The only reason I can see to do this is that you're trying to only call peekMessageMetadata once. However, the mutation means you are having to copy the list before the call to sendMessages, which is creating extra GC pressure even when tracking is disabled.
   
   I think it would be better to contain it all in the dispatcher, and do the second peek. peekMessageMetadata is memory-neutral. So there would be more CPU usage but only in the case where delay is enabled. ```trackDelayedDelivery``` interface is no longer needed, as it can all be done before the call to sendMessages.
   
   Something like:
   ```
   readEntriesComplete(List<Entry> entries) {
       ...
       if (messagesForC > 0) {
           List<Entry> toSend = trackDelayedMessages(entries.subList(...));
           c.sendMessages(toSend);
       ...
   }
   
   private List<Entry> trackDelayedMessages(List<Entries> entries) {
       if (!trackingEnabled) {
           return entries;
       }
       // peek metadata for each entry
   }
   ```

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services