Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/02/19 01:08:55 UTC

[GitHub] [pulsar] rdhabalia opened a new pull request #14382: [pulsar-broker] support client configurable message chunk size

rdhabalia opened a new pull request #14382:
URL: https://github.com/apache/pulsar/pull/14382


   ### Motivation
   Currently, the producer client supports chunking to publish large messages. The producer chunks a message based on the broker's `maxMessageSize` configuration, which defaults to 5MB. However, 5MB is still a large block size for a high-throughput topic, and it can directly impact overall bookie performance. In that case, the client wants to publish smaller chunks (e.g. 100KB) without changing the broker's `maxMessageSize`, because other, low-throughput tenants can still use the 5MB message limit. Users therefore need a way to control the maximum chunk size based on message throughput, without impacting broker performance.
   
   ### Modification
   Add a `chunkMaxMessageSize` producer config so that users can control the maximum chunk size.
   
   ### Result
   Users can publish custom-sized chunk messages based on their requirements.
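As a rough illustration of why a smaller chunk size matters, the sketch below reuses the ceiling-division arithmetic of the `totalChunks` expression in `ProducerImpl.sendAsync()` (quoted in the review below, minus the batching shortcut) to compare the broker's 5MB default with a client-chosen 100KB chunk. The class and method names are hypothetical; this is not the Pulsar client API.

```java
// Hypothetical helper: mirrors the ceiling division used for `totalChunks`
// in ProducerImpl.sendAsync(); not part of the Pulsar client API.
final class ChunkCountSketch {

    /** Number of chunks needed to carry payloadBytes at chunkSize bytes per chunk. */
    static int totalChunks(int payloadBytes, int chunkSize) {
        int size = Math.max(1, payloadBytes);          // an empty payload still takes one chunk
        return size / chunkSize + (size % chunkSize == 0 ? 0 : 1);
    }

    public static void main(String[] args) {
        int brokerMax = 5 * 1024 * 1024;               // broker default maxMessageSize (5MB)
        int clientChunk = 100 * 1024;                  // client-chosen chunk size (100KB)
        int payload = 20 * 1024 * 1024;                // a 20MB message

        System.out.println(totalChunks(payload, brokerMax));   // 4
        System.out.println(totalChunks(payload, clientChunk)); // 205
    }
}
```

Smaller chunks mean more, but lighter, entries per message, which is the trade-off the motivation describes for high-throughput topics.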


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] rdhabalia commented on a change in pull request #14382: [pulsar-broker] support client configurable message chunk size

Posted by GitBox <gi...@apache.org>.
rdhabalia commented on a change in pull request #14382:
URL: https://github.com/apache/pulsar/pull/14382#discussion_r823038542



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
##########
@@ -1525,6 +1530,7 @@ public int messagesCount() {
     @Override
     public void connectionOpened(final ClientCnx cnx) {
         previousExceptions.clear();
+        chunkMaxMessageSize = Math.min(chunkMaxMessageSize, ClientCnx.getMaxMessageSize());

Review comment:
       No, the concern is not about changing the chunk size on a new connection. Rather, the Pulsar client first sets `ClientCnx.getMaxMessageSize()` when the first connection is opened and it receives the value from the broker. Anyway, the max size is not broker-specific but BookKeeper-specific, so there is no need to handle a scenario where it changes from broker to broker.







[GitHub] [pulsar] rdhabalia commented on a change in pull request #14382: [pulsar-broker] support client configurable message chunk size

Posted by GitBox <gi...@apache.org>.
rdhabalia commented on a change in pull request #14382:
URL: https://github.com/apache/pulsar/pull/14382#discussion_r823039070



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
##########
@@ -492,6 +496,7 @@ public void sendAsync(Message<?> message, SendCallback callback) {
                 compressedPayload.release();
                 return;
             }
+            payloadChunkSize = Math.min(chunkMaxMessageSize, payloadChunkSize);

Review comment:
       The config can have the default value `-1`, so the right value has to be calculated from the user-configured value. That is why we can't use the value from the config directly.
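A minimal sketch of the calculation described in this comment, assuming that a non-positive configured value (including the default `-1`) means "not set" and falls back to the broker's limit, while a positive value is capped at that limit. `EffectiveChunkSize` and `resolve` are hypothetical names, not the code that was merged.

```java
// Hypothetical sketch: derive the effective chunk size from the user-configured
// value instead of reading the config directly; not the actual ProducerImpl code.
final class EffectiveChunkSize {

    static final int NOT_SET = -1; // default of chunkMaxMessageSize, per this review thread

    /**
     * @param configured user-configured chunkMaxMessageSize (<= 0 is assumed to mean "unset")
     * @param brokerMax  max message size received from the broker when the connection opens
     */
    static int resolve(int configured, int brokerMax) {
        if (configured <= 0) {
            return brokerMax;                   // unset: fall back to the broker limit
        }
        return Math.min(configured, brokerMax); // set: never exceed the broker limit
    }

    public static void main(String[] args) {
        int brokerMax = 5 * 1024 * 1024;
        System.out.println(resolve(NOT_SET, brokerMax));           // 5242880
        System.out.println(resolve(100 * 1024, brokerMax));        // 102400
        System.out.println(resolve(10 * 1024 * 1024, brokerMax));  // 5242880
    }
}
```

Using `Math.min(configured, brokerMax)` without the sentinel check would yield `-1` whenever the option is unset, which is why the configured value cannot be used as-is.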







[GitHub] [pulsar] github-actions[bot] commented on pull request #14382: [pulsar-broker] support client configurable message chunk size

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #14382:
URL: https://github.com/apache/pulsar/pull/14382#issuecomment-1045480464


   @rdhabalia: Thanks for your contribution. For this PR, do we need to update docs?
   (The [PR template contains info about doc](https://github.com/apache/pulsar/blob/master/.github/PULL_REQUEST_TEMPLATE.md#documentation), which helps others know more about the changes. Can you provide doc-related info in this and future PR descriptions? Thanks)





[GitHub] [pulsar] Jason918 commented on a change in pull request #14382: [pulsar-broker] support client configurable message chunk size

Posted by GitBox <gi...@apache.org>.
Jason918 commented on a change in pull request #14382:
URL: https://github.com/apache/pulsar/pull/14382#discussion_r812692432



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
##########
@@ -448,7 +452,7 @@ public void sendAsync(Message<?> message, SendCallback callback) {
                         new PulsarClientException.InvalidMessageException(
                                 format("The producer %s of the topic %s sends a %s message with %d bytes that exceeds"
                                                 + " %d bytes",
-                        producerName, topic, compressedStr, compressedSize, ClientCnx.getMaxMessageSize()));
+                        producerName, topic, compressedStr, compressedSize, chunkMaxMessageSize));

Review comment:
       It seems that we should not change this line. If the user sets `chunkMaxMessageSize` but `isChunkingEnabled` is false, this error message is not accurate.

##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
##########
@@ -471,8 +475,8 @@ public void sendAsync(Message<?> message, SendCallback callback) {
 
         // send in chunks
         int totalChunks = canAddToBatch(msg) ? 1
-                : Math.max(1, compressedPayload.readableBytes()) / ClientCnx.getMaxMessageSize()
-                        + (Math.max(1, compressedPayload.readableBytes()) % ClientCnx.getMaxMessageSize() == 0 ? 0 : 1);
+                : Math.max(1, compressedPayload.readableBytes()) / chunkMaxMessageSize

Review comment:
       We should apply `chunkMaxMessageSize` only if chunking is enabled (`isChunkingEnabled() == true`).
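A sketch of this suggestion: the split size comes from `chunkMaxMessageSize` only when chunking is enabled, and otherwise the broker's limit applies. The parameters `chunkingEnabled`, `configuredChunkSize`, and `brokerMax` are hypothetical stand-ins for `conf.isChunkingEnabled()`, the configured option, and `ClientCnx.getMaxMessageSize()`; this is an illustration, not the merged code.

```java
// Hypothetical sketch: gate the custom chunk size on chunking being enabled.
final class ChunkingGate {

    /** Chunk size to split by: the custom size applies only when chunking is enabled. */
    static int chunkSize(boolean chunkingEnabled, int configuredChunkSize, int brokerMax) {
        if (chunkingEnabled && configuredChunkSize > 0) {
            return Math.min(configuredChunkSize, brokerMax);
        }
        return brokerMax; // chunking disabled or option unset: use the broker limit
    }

    /** Same ceiling division as the totalChunks expression in ProducerImpl. */
    static int totalChunks(int payloadBytes, int chunkSize) {
        int size = Math.max(1, payloadBytes);
        return size / chunkSize + (size % chunkSize == 0 ? 0 : 1);
    }

    public static void main(String[] args) {
        int brokerMax = 5 * 1024 * 1024;
        int payload = 1024 * 1024; // 1MB message
        // chunking enabled: the 100KB custom size is honoured
        System.out.println(totalChunks(payload, chunkSize(true, 100 * 1024, brokerMax)));  // 11
        // chunking disabled: the custom size is ignored and the message stays whole
        System.out.println(totalChunks(payload, chunkSize(false, 100 * 1024, brokerMax))); // 1
    }
}
```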







[GitHub] [pulsar] Jason918 commented on a change in pull request #14382: [pulsar-broker] support client configurable message chunk size

Posted by GitBox <gi...@apache.org>.
Jason918 commented on a change in pull request #14382:
URL: https://github.com/apache/pulsar/pull/14382#discussion_r810464635



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
##########
@@ -439,7 +443,7 @@ public void sendAsync(Message<?> message, SendCallback callback) {
 
             // validate msg-size (For batching this will be check at the batch completion size)
             int compressedSize = compressedPayload.readableBytes();
-            if (compressedSize > ClientCnx.getMaxMessageSize() && !this.conf.isChunkingEnabled()) {
+            if (compressedSize > chunkMaxMessageSize && !this.conf.isChunkingEnabled()) {

Review comment:
       This should not be changed to `chunkMaxMessageSize`; otherwise we won't be able to send messages larger than `chunkMaxMessageSize` when chunking is disabled.
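To make the failure mode concrete, the sketch below compares the original size check with the changed one, assuming a 5MB broker limit, a 100KB custom chunk size, and chunking disabled. The class and method names are hypothetical.

```java
// Hypothetical sketch comparing the two validation conditions from the diff above.
final class SizeValidation {

    /** Original check: reject only messages the broker itself cannot accept. */
    static boolean rejectWithBrokerLimit(int compressedSize, int brokerMax, boolean chunkingEnabled) {
        return compressedSize > brokerMax && !chunkingEnabled;
    }

    /** Changed check in the reviewed diff: compares against the custom chunk size instead. */
    static boolean rejectWithChunkLimit(int compressedSize, int chunkMax, boolean chunkingEnabled) {
        return compressedSize > chunkMax && !chunkingEnabled;
    }

    public static void main(String[] args) {
        int brokerMax = 5 * 1024 * 1024;
        int chunkMax = 100 * 1024;
        int compressedSize = 1024 * 1024; // 1MB message, chunking disabled
        System.out.println(rejectWithBrokerLimit(compressedSize, brokerMax, false)); // false: accepted
        System.out.println(rejectWithChunkLimit(compressedSize, chunkMax, false));   // true: wrongly rejected
    }
}
```

A 1MB message that the broker would accept is rejected by the changed condition, which is the regression this comment points out.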

##########
File path: pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java
##########
@@ -168,7 +168,8 @@
      * will fail unless {@code blockIfQueueFull=true}. Use {@link #blockIfQueueFull(boolean)}
      * to change the blocking behavior.
      *
-     * <p>The producer queue size also determines the max amount of memory that will be required by
+     * <p>The producer queue size also determin@Override
+    es the max amount of memory that will be required by

Review comment:
       Seems not right.







[GitHub] [pulsar] eolivelli commented on a change in pull request #14382: [pulsar-broker] support client configurable message chunk size

Posted by GitBox <gi...@apache.org>.
eolivelli commented on a change in pull request #14382:
URL: https://github.com/apache/pulsar/pull/14382#discussion_r810481445



##########
File path: pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java
##########
@@ -343,6 +344,15 @@
      */
     ProducerBuilder<T> enableChunking(boolean enableChunking);
 
+    /**
+     * Max chunk message size in bytes. Producer chunks the message if chunking is enabled and message size is larger
+     * than max chunk-message size. By default, producer chunks based on max-message size configured at broker.
+     *
+     * @param chunkMaxMessageSize
+     * @return
+     */

Review comment:
       What is the default value? Which value can be used to disable this feature? Can you please document it?







[GitHub] [pulsar] Jason918 commented on a change in pull request #14382: [pulsar-broker] support client configurable message chunk size

Posted by GitBox <gi...@apache.org>.
Jason918 commented on a change in pull request #14382:
URL: https://github.com/apache/pulsar/pull/14382#discussion_r822234916



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
##########
@@ -492,6 +496,7 @@ public void sendAsync(Message<?> message, SendCallback callback) {
                 compressedPayload.release();
                 return;
             }
+            payloadChunkSize = Math.min(chunkMaxMessageSize, payloadChunkSize);

Review comment:
       I think we can remove the field `chunkMaxMessageSize` and use the value from conf directly.

##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
##########
@@ -1525,6 +1530,7 @@ public int messagesCount() {
     @Override
     public void connectionOpened(final ClientCnx cnx) {
         previousExceptions.clear();
+        chunkMaxMessageSize = Math.min(chunkMaxMessageSize, ClientCnx.getMaxMessageSize());

Review comment:
       If we assume that `ClientCnx.getMaxMessageSize()` may change on new connections, consider this case:
   1. Previous connection: `conf.getChunkMaxMessageSize()` == 100 and `ClientCnx.getMaxMessageSize()` == 50, so `chunkMaxMessageSize` is initialized to 50.
   2. New connection: `chunkMaxMessageSize` == 50 and `ClientCnx.getMaxMessageSize()` == 200; the result is 50, but it should be 100.
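The scenario can be replayed with a small sketch. `clampInPlace` mimics the reviewed diff, which overwrites the field with `Math.min(field, brokerMax)` on every `connectionOpened`; `recomputeFromConf` instead starts from the configured value each time. Both names are hypothetical, and the numbers are the ones from the case above.

```java
// Hypothetical sketch of the reconnect scenario described in this comment.
final class ReconnectClamp {

    /** Reviewed diff: reuses the previously clamped field, so a lower limit sticks. */
    static int clampInPlace(int currentField, int brokerMax) {
        return Math.min(currentField, brokerMax);
    }

    /** Alternative: always start from the user-configured value. */
    static int recomputeFromConf(int configured, int brokerMax) {
        return Math.min(configured, brokerMax);
    }

    public static void main(String[] args) {
        int configured = 100;

        // 1. previous connection: broker limit is 50
        int field = clampInPlace(configured, 50);
        // 2. new connection: broker limit is 200, but the field was already clamped to 50
        field = clampInPlace(field, 200);

        System.out.println(field);                               // 50
        System.out.println(recomputeFromConf(configured, 200));  // 100
    }
}
```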







[GitHub] [pulsar] Jason918 commented on a change in pull request #14382: [pulsar-broker] support client configurable message chunk size

Posted by GitBox <gi...@apache.org>.
Jason918 commented on a change in pull request #14382:
URL: https://github.com/apache/pulsar/pull/14382#discussion_r823265673



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
##########
@@ -492,6 +496,7 @@ public void sendAsync(Message<?> message, SendCallback callback) {
                 compressedPayload.release();
                 return;
             }
+            payloadChunkSize = Math.min(chunkMaxMessageSize, payloadChunkSize);

Review comment:
       What I mean is: check whether the config is enabled (`!= -1`) first.







[GitHub] [pulsar] rdhabalia merged pull request #14382: [pulsar-broker] support client configurable message chunk size

Posted by GitBox <gi...@apache.org>.
rdhabalia merged pull request #14382:
URL: https://github.com/apache/pulsar/pull/14382


   





[GitHub] [pulsar] Jason918 commented on a change in pull request #14382: [pulsar-broker] support client configurable message chunk size

Posted by GitBox <gi...@apache.org>.
Jason918 commented on a change in pull request #14382:
URL: https://github.com/apache/pulsar/pull/14382#discussion_r823268153



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
##########
@@ -1525,6 +1530,7 @@ public int messagesCount() {
     @Override
     public void connectionOpened(final ClientCnx cnx) {
         previousExceptions.clear();
+        chunkMaxMessageSize = Math.min(chunkMaxMessageSize, ClientCnx.getMaxMessageSize());

Review comment:
       > max-size is not something broker specific but it's bk specific
   
   We may have different max-size settings in the broker and the bookie, as long as the broker limit is lower than the bookie limit.
   We can also update the broker-level setting by changing the config file and restarting the broker, so this case should be handled.
   
   







[GitHub] [pulsar] rdhabalia commented on a change in pull request #14382: [pulsar-broker] support client configurable message chunk size

Posted by GitBox <gi...@apache.org>.
rdhabalia commented on a change in pull request #14382:
URL: https://github.com/apache/pulsar/pull/14382#discussion_r812254050



##########
File path: pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java
##########
@@ -168,7 +168,8 @@
      * will fail unless {@code blockIfQueueFull=true}. Use {@link #blockIfQueueFull(boolean)}
      * to change the blocking behavior.
      *
-     * <p>The producer queue size also determines the max amount of memory that will be required by
+     * <p>The producer queue size also determin@Override
+    es the max amount of memory that will be required by

Review comment:
       fixed.

##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
##########
@@ -439,7 +443,7 @@ public void sendAsync(Message<?> message, SendCallback callback) {
 
             // validate msg-size (For batching this will be check at the batch completion size)
             int compressedSize = compressedPayload.readableBytes();
-            if (compressedSize > ClientCnx.getMaxMessageSize() && !this.conf.isChunkingEnabled()) {
+            if (compressedSize > chunkMaxMessageSize && !this.conf.isChunkingEnabled()) {

Review comment:
       fixed.

##########
File path: pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java
##########
@@ -343,6 +344,15 @@
      */
     ProducerBuilder<T> enableChunking(boolean enableChunking);
 
+    /**
+     * Max chunk message size in bytes. Producer chunks the message if chunking is enabled and message size is larger
+     * than max chunk-message size. By default, producer chunks based on max-message size configured at broker.
+     *
+     * @param chunkMaxMessageSize
+     * @return
+     */

Review comment:
       added info.



