Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/09/28 03:43:26 UTC

[GitHub] [pulsar] tisonkun commented on a diff in pull request #17854: [improve][java-client]Shrink BatchMessageContainer maxBatchSize

tisonkun commented on code in PR #17854:
URL: https://github.com/apache/pulsar/pull/17854#discussion_r981912860


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java:
##########
@@ -98,7 +98,8 @@ public boolean add(MessageImpl<?> msg, SendCallback callback) {
                 messageMetadata.setSequenceId(msg.getSequenceId());
                 lowestSequenceId = Commands.initBatchMessageMetadata(messageMetadata, msg.getMessageBuilder());
                 this.firstCallback = callback;
-                batchedMessageMetadataAndPayload = allocator.compositeBuffer();
+                batchedMessageMetadataAndPayload = allocator.buffer(
+                        Math.min(maxBatchSize, ClientCnx.getMaxMessageSize()));

Review Comment:
   How can you guarantee that this buffer won't overflow?
   
   BTW, it's interesting that `maxBatchSize` was actually unused before this change :)



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/AbstractBatchMessageContainer.java:
##########
@@ -46,6 +47,8 @@ public abstract class AbstractBatchMessageContainer implements BatchMessageConta
     protected long currentTxnidLeastBits = -1L;
 
     protected static final int INITIAL_BATCH_BUFFER_SIZE = 1024;
+    protected static final int SHRINK_COOLING_OFF_PERIOD = 10;
+    protected int consecutiveShrinkTime = 0;

Review Comment:
   Should these two fields be private fields in `BatchMessageContainerImpl`?



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java:
##########
@@ -167,10 +168,28 @@ private ByteBuf getCompressedBatchMetadataAndPayload() {
 
         // Update the current max batch size using the uncompressed size, which is what we need in any case to
         // accumulate the batch content
-        maxBatchSize = Math.max(maxBatchSize, uncompressedSize);
+        updateMaxBatchSize(uncompressedSize);
         return compressedPayload;
     }
 
+    @VisibleForTesting
+    public void updateMaxBatchSize(int uncompressedSize) {

Review Comment:
   ```suggestion
       void updateMaxBatchSize(int uncompressedSize) {
   ```
   
   Ditto: a method that exists only for testing should not be `public`.
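
For readers following the thread, here is a minimal sketch of how a cooling-off counter could gate shrinking of `maxBatchSize`. It is not the code from the PR: the body of `updateMaxBatchSize` is not shown in the diff above, so the halving policy, the lower bound, and the class name `BatchSizeTracker` are assumptions made purely for illustration. Only the names `SHRINK_COOLING_OFF_PERIOD`, `consecutiveShrinkTime`, `INITIAL_BATCH_BUFFER_SIZE`, and `maxBatchSize` come from the diff.

```java
// Hypothetical sketch, NOT the implementation in PR #17854.
final class BatchSizeTracker {
    // Names and values appear in the diff; how the PR uses them is assumed here.
    static final int SHRINK_COOLING_OFF_PERIOD = 10;
    static final int INITIAL_BATCH_BUFFER_SIZE = 1024;

    private int maxBatchSize = INITIAL_BATCH_BUFFER_SIZE;
    private int consecutiveShrinkTime = 0;

    // Package-private, as suggested in the review comment.
    void updateMaxBatchSize(int uncompressedSize) {
        if (uncompressedSize >= maxBatchSize) {
            // Grow immediately and restart the cooling-off count.
            maxBatchSize = uncompressedSize;
            consecutiveShrinkTime = 0;
            return;
        }
        // The batch was smaller than the current estimate.
        consecutiveShrinkTime++;
        if (consecutiveShrinkTime > SHRINK_COOLING_OFF_PERIOD) {
            // Assumed policy: halve the estimate, but never go below the
            // observed size or the initial buffer size.
            int halved = Math.max(uncompressedSize, maxBatchSize / 2);
            maxBatchSize = Math.max(INITIAL_BATCH_BUFFER_SIZE, halved);
            consecutiveShrinkTime = 0;
        }
    }

    int getMaxBatchSize() {
        return maxBatchSize;
    }

    public static void main(String[] args) {
        BatchSizeTracker tracker = new BatchSizeTracker();
        tracker.updateMaxBatchSize(8192);
        for (int i = 0; i <= SHRINK_COOLING_OFF_PERIOD; i++) {
            tracker.updateMaxBatchSize(100);
        }
        System.out.println("maxBatchSize = " + tracker.getMaxBatchSize());
    }
}
```

The point of the counter in this sketch is that a single small batch does not shrink the estimate; only a run of small batches longer than the cooling-off period does, which avoids oscillating buffer sizes under bursty traffic.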



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/AbstractBatchMessageContainer.java:
##########
@@ -76,6 +79,11 @@ public long getCurrentBatchSize() {
         return currentBatchSizeBytes;
     }
 
+    @VisibleForTesting
+    public int getMaxBatchSize() {

Review Comment:
   ```suggestion
      int getMaxBatchSize() {
   ```
   
   A method that exists only for testing should have the narrowest possible visibility :)
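
The visibility suggestion can be illustrated with a small dependency-free sketch. The `VisibleForTesting` annotation below is a local stand-in (the real client would presumably use an existing annotation such as Guava's), and the class names `SketchBatchContainer` and `SketchBatchContainerImpl` as well as the `recordBatch` method are hypothetical. What it shows is only the Java rule the comment relies on: a method with no access modifier is reachable from code in the same package, which is enough for a unit test placed in that package.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

// Local stand-in so the sketch compiles without third-party libraries.
@Retention(RetentionPolicy.SOURCE)
@Target({ElementType.METHOD, ElementType.FIELD})
@interface VisibleForTesting {
}

// Hypothetical simplification of an abstract batch container.
abstract class SketchBatchContainer {
    protected int maxBatchSize = 1024;

    // No modifier: package-private, so it stays out of the public API
    // while remaining callable from tests in the same package.
    @VisibleForTesting
    int getMaxBatchSize() {
        return maxBatchSize;
    }
}

// Hypothetical concrete container used only to exercise the accessor.
final class SketchBatchContainerImpl extends SketchBatchContainer {
    void recordBatch(int uncompressedSize) {
        maxBatchSize = Math.max(maxBatchSize, uncompressedSize);
    }
}
```

A test in the same package can then call `getMaxBatchSize()` directly, while callers in other packages cannot see it at all.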


