Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/09/22 07:26:52 UTC

[GitHub] [pulsar] RobertIndie opened a new pull request, #17795: [fix][client] Fix deadlock when sending chunked messages with BlockIFQueueFull enabled

RobertIndie opened a new pull request, #17795:
URL: https://github.com/apache/pulsar/pull/17795

   
   
   Fixes #17446
   
   
   ### Motivation
   
   When `isBlockIfQueueFull` is enabled for the producer, and the producer sends a very large message (number of chunks > maxPendingMessages) or sends many large messages concurrently, a deadlock occurs while the producer tries to acquire the permits before publishing these messages.
   
   The root cause is that the producer tries to acquire permits for all chunks before publishing any of them, and it blocks if there are not enough permits. For example, a 55 MB payload is split into 11 chunks; with a maxPendingMessages of 10, the producer blocks forever. Even if the message is not that large (e.g., 30 MB), sending 5 such messages concurrently may also block forever.
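   For illustration, here is a minimal sketch of a configuration that can trigger the deadlock (the service URL and topic are placeholders, and the 55 MB payload assumes a 5 MB max message size so that it splits into 11 chunks):
   
   ```java
   import org.apache.pulsar.client.api.Producer;
   import org.apache.pulsar.client.api.PulsarClient;
   
   public class ChunkingDeadlockRepro {
       public static void main(String[] args) throws Exception {
           PulsarClient client = PulsarClient.builder()
                   .serviceUrl("pulsar://localhost:6650")
                   .build();
   
           Producer<byte[]> producer = client.newProducer()
                   .topic("my-topic")
                   .enableChunking(true)
                   .enableBatching(false)   // chunking requires batching to be disabled
                   .blockIfQueueFull(true)
                   .maxPendingMessages(10)
                   .create();
   
           // A 55 MB payload is split into 11 chunks, but only 10 permits exist,
           // so the producer blocks forever while pre-acquiring permits.
           producer.send(new byte[55 * 1024 * 1024]);
   
           producer.close();
           client.close();
       }
   }
   ```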
   
   ### Modifications
   
   * If `BlockIfQueueFull` is enabled, the producer now acquires a single permit for each chunk right before publishing that chunk, instead of reserving permits for all chunks up front (see the sketch below).
   
   This PR only changes the behavior of sending chunked messages with `BlockIfQueueFull` enabled; it does not affect other existing behaviors.
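   The following self-contained sketch (plain `java.util.concurrent` types with hypothetical names, not the actual `ProducerImpl` code) contrasts the two strategies: reserving all permits up front blocks forever once the chunk count exceeds the permit count, while taking one permit per chunk and returning it after that chunk completes always makes progress.
   
   ```java
   import java.util.concurrent.Semaphore;
   
   public class PermitPerChunkSketch {
       public static void main(String[] args) throws InterruptedException {
           int maxPendingMessages = 10;
           int totalChunks = 11;
           Semaphore permits = new Semaphore(maxPendingMessages);
   
           // Old behavior (deadlock-prone): reserve a permit for every chunk before
           // sending anything. With 11 chunks and 10 permits this would block forever:
           // for (int i = 0; i < totalChunks; i++) { permits.acquire(); }
   
           // New behavior: take one permit right before each chunk is sent and give
           // it back once the chunk completes, so the loop always advances.
           for (int chunkId = 0; chunkId < totalChunks; chunkId++) {
               permits.acquire();
               System.out.println("sending chunk " + chunkId);
               permits.release(); // in the real client this happens in the send callback
           }
       }
   }
   ```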
   
   ### Verifying this change
   
   - [ ] Make sure that the change passes the CI checks.
   
   This change added tests.
   
   ### Does this pull request potentially affect one of the following parts:
   
   *If the box was checked, please highlight the changes*
   
   - [ ] Dependencies (add or upgrade a dependency)
   - [ ] The public API
   - [ ] The schema
   - [ ] The default values of configurations
   - [ ] The binary protocol
   - [ ] The REST endpoints
   - [ ] The admin CLI options
   - [ ] Anything that affects deployment
   
   ### Documentation
   
   <!-- DO NOT REMOVE THIS SECTION. CHECK THE PROPER BOX ONLY. -->
   
   - [ ] `doc-required` 
   (Your PR needs to update docs and you will update later)
   
   - [x] `doc-not-needed` 
   (Please explain why)
   
   - [ ] `doc` 
   (Your PR contains doc changes)
   
   - [ ] `doc-complete`
   (Docs have been already added)
   
   ### Matching PR in forked repository
   
   PR in forked repository: https://github.com/RobertIndie/pulsar/pull/5
   
   After opening this PR, the build in apache/pulsar will fail and instructions will
   be provided for opening a PR in the PR author's forked repository.
   
apache/pulsar pull requests should first be tested in your own fork, since the
apache/pulsar CI, which is based on GitHub Actions, has constrained resources and quota.
GitHub Actions provides a separate quota for pull requests that are executed in
a forked repository.
   
The tests will be run in the forked repository until all PR review comments have
been handled, the tests pass, and the PR is approved by a reviewer.
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] RobertIndie commented on a diff in pull request #17795: [fix][client] Fix deadlock when sending chunked messages with BlockIFQueueFull enabled

Posted by GitBox <gi...@apache.org>.
RobertIndie commented on code in PR #17795:
URL: https://github.com/apache/pulsar/pull/17795#discussion_r991779036


##########
pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java:
##########
@@ -573,6 +574,55 @@ public void testChunkSize() throws Exception {
         }
     }
 
+    @Test
+    public void testBlockIfQueueFUllWhenChunking() throws Exception {
+        this.conf.setMaxMessageSize(50);
+
+        @Cleanup
+        final Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .topic("my-property/my-ns/test-chunk-size")
+                .enableChunking(true)
+                .enableBatching(false)
+                .blockIfQueueFull(true)
+                .maxPendingMessages(3)
+                .create();
+
+        // Test sending large message (totalChunks > maxPendingMessages) should not cause deadlock
+        // We need to use a separate thread to send the message instead of using the sendAsync, because the deadlock
+        // might happen before publishing messages to the broker.
+        CompletableFuture<Void> sendMsg = CompletableFuture.runAsync(() -> {
+            try {
+                producer.send(createMessagePayload(200));
+            } catch (PulsarClientException e) {
+                throw new RuntimeException(e);
+            }
+        });
+        try {
+            sendMsg.get(5, TimeUnit.SECONDS);
+        } catch (TimeoutException e) {
+            Assert.fail("Deadlock detected when sending large message.");
+        }
+
+        // Test sending multiple large messages (For every message, totalChunks < maxPendingMessages) concurrently
+        // should not cause the deadlock.
+        List<CompletableFuture<Void>> sendMsgFutures = Lists.newArrayList();

Review Comment:
   It's already imported. The compilation was fine in my repo: https://github.com/RobertIndie/pulsar/pull/5. I am investigating the CI problem.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] RobertIndie commented on a diff in pull request #17795: [fix][client] Fix deadlock when sending chunked messages with BlockIFQueueFull enabled

Posted by GitBox <gi...@apache.org>.
RobertIndie commented on code in PR #17795:
URL: https://github.com/apache/pulsar/pull/17795#discussion_r991784145


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java:
##########
@@ -519,35 +519,42 @@ public void sendAsync(Message<?> message, SendCallback callback) {
 
         // chunked message also sent individually so, try to acquire send-permits
         for (int i = 0; i < (totalChunks - 1); i++) {
-            if (!canEnqueueRequest(callback, message.getSequenceId(), 0 /* The memory was already reserved */)) {
+            if (!conf.isBlockIfQueueFull() && !canEnqueueRequest(callback, message.getSequenceId(),
+                    0 /* The memory was already reserved */)) {
                 client.getMemoryLimitController().releaseMemory(uncompressedSize);
                 semaphoreRelease(i + 1);
                 return;
             }
         }

Review Comment:
   @BewareMyPower Yes, chunks are sent asynchronously one by one in the same thread. But in your example they appear to be sent synchronously one by one. My understanding is that your approach may lead to a case like this:
   
   Chunk 0 before send: Acquire 1 semaphore and max(size...) memory
   Chunk 1 before send: 
   // Here we already occupy 2 semaphores and (size0+size1) memory. It's not the correct behavior.
   Chunk 0 after send: 
   Chunk 1 after send: Release 1 semaphore and max(size...) memory.
   ...
   
   Please correct me if I'm wrong.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] codelipenghui merged pull request #17795: [fix][client] Fix deadlock when sending chunked messages with BlockIFQueueFull enabled

Posted by GitBox <gi...@apache.org>.
codelipenghui merged PR #17795:
URL: https://github.com/apache/pulsar/pull/17795


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] BewareMyPower commented on a diff in pull request #17795: [fix][client] Fix deadlock when sending chunked messages with BlockIFQueueFull enabled

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on code in PR #17795:
URL: https://github.com/apache/pulsar/pull/17795#discussion_r991522490


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java:
##########
@@ -519,35 +519,42 @@ public void sendAsync(Message<?> message, SendCallback callback) {
 
         // chunked message also sent individually so, try to acquire send-permits
         for (int i = 0; i < (totalChunks - 1); i++) {
-            if (!canEnqueueRequest(callback, message.getSequenceId(), 0 /* The memory was already reserved */)) {
+            if (!conf.isBlockIfQueueFull() && !canEnqueueRequest(callback, message.getSequenceId(),
+                    0 /* The memory was already reserved */)) {
                 client.getMemoryLimitController().releaseMemory(uncompressedSize);
                 semaphoreRelease(i + 1);
                 return;
             }
         }

Review Comment:
   The root cause is that for a chunked message with N chunks, since chunks are sent asynchronously one by one in the same thread, the time order will be:
   
   1. Chunk 0 before send: Acquire 1 semaphore and `size0` memory.
   2. Chunk 0 after send: Release 1 semaphore and `size0` memory.
   3. Chunk 1 before send: Acquire 1 semaphore and `size1` memory.
   4. Chunk 1 after send: Release 1 semaphore and `size1` memory. 
   5. ...
   
   In this case, we don't need N spots from the semaphore and `sum(size0, size1, ...)` bytes from the memory limiter. Instead, we only need 1 spot from the semaphore and `max(size0, size1, ...)` bytes from the memory limiter.
   
   Therefore, we should not call `canEnqueueRequest` for chunk 1 and later in this up-front loop, no matter whether BlockIfQueueFull is enabled. Instead, we should call `canEnqueueRequest` once per iteration of the sending loop.
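   To make the accounting concrete, here is a toy sketch with plain JDK types (not the Pulsar memory limiter), assuming each chunk's release happens before the next chunk's acquire, as in the list above:
   
   ```java
   import java.util.concurrent.Semaphore;
   import java.util.concurrent.atomic.AtomicLong;
   
   public class PeakUsageSketch {
       public static void main(String[] args) throws InterruptedException {
           long[] chunkSizes = {5_000_000, 5_000_000, 3_000_000};
           Semaphore permits = new Semaphore(10);
           AtomicLong reservedBytes = new AtomicLong();
   
           for (int i = 0; i < chunkSizes.length; i++) {
               permits.acquire();                       // "Chunk i before send"
               reservedBytes.addAndGet(chunkSizes[i]);
               System.out.printf("in flight: %d permit(s), %d bytes%n",
                       10 - permits.availablePermits(), reservedBytes.get());
               reservedBytes.addAndGet(-chunkSizes[i]); // "Chunk i after send"
               permits.release();
           }
           // Because each release happens before the next acquire, the peak usage
           // is 1 permit and max(size0, size1, ...) bytes, as argued above.
       }
   }
   ```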



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] BewareMyPower commented on a diff in pull request #17795: [fix][client] Fix deadlock when sending chunked messages with BlockIFQueueFull enabled

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on code in PR #17795:
URL: https://github.com/apache/pulsar/pull/17795#discussion_r991783023


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java:
##########
@@ -519,35 +519,42 @@ public void sendAsync(Message<?> message, SendCallback callback) {
 
         // chunked message also sent individually so, try to acquire send-permits
         for (int i = 0; i < (totalChunks - 1); i++) {
-            if (!canEnqueueRequest(callback, message.getSequenceId(), 0 /* The memory was already reserved */)) {
+            if (!conf.isBlockIfQueueFull() && !canEnqueueRequest(callback, message.getSequenceId(),
+                    0 /* The memory was already reserved */)) {
                 client.getMemoryLimitController().releaseMemory(uncompressedSize);
                 semaphoreRelease(i + 1);
                 return;
             }
         }
 
         try {
-            synchronized (this) {
-                int readStartIndex = 0;
-                String uuid = totalChunks > 1 ? String.format("%s-%d", producerName, sequenceId) : null;
-                ChunkedMessageCtx chunkedMessageCtx = totalChunks > 1 ? ChunkedMessageCtx.get(totalChunks) : null;
-                byte[] schemaVersion = totalChunks > 1 && msg.getMessageBuilder().hasSchemaVersion()
-                        ? msg.getMessageBuilder().getSchemaVersion() : null;
-                byte[] orderingKey = totalChunks > 1 && msg.getMessageBuilder().hasOrderingKey()
-                        ? msg.getMessageBuilder().getOrderingKey() : null;
-                for (int chunkId = 0; chunkId < totalChunks; chunkId++) {
-                    // Need to reset the schemaVersion, because the schemaVersion is based on a ByteBuf object in
-                    // `MessageMetadata`, if we want to re-serialize the `SEND` command using a same `MessageMetadata`,
-                    // we need to reset the ByteBuf of the schemaVersion in `MessageMetadata`, I think we need to
-                    // reset `ByteBuf` objects in `MessageMetadata` after call the method `MessageMetadata#writeTo()`.
-                    if (chunkId > 0) {
-                        if (schemaVersion != null) {
-                            msg.getMessageBuilder().setSchemaVersion(schemaVersion);
-                        }
-                        if (orderingKey != null) {
-                            msg.getMessageBuilder().setOrderingKey(orderingKey);
-                        }
+            int readStartIndex = 0;
+            String uuid = totalChunks > 1 ? String.format("%s-%d", producerName, sequenceId) : null;
+            ChunkedMessageCtx chunkedMessageCtx = totalChunks > 1 ? ChunkedMessageCtx.get(totalChunks) : null;
+            byte[] schemaVersion = totalChunks > 1 && msg.getMessageBuilder().hasSchemaVersion()
+                    ? msg.getMessageBuilder().getSchemaVersion() : null;
+            byte[] orderingKey = totalChunks > 1 && msg.getMessageBuilder().hasOrderingKey()
+                    ? msg.getMessageBuilder().getOrderingKey() : null;
+            for (int chunkId = 0; chunkId < totalChunks; chunkId++) {
+                // Need to reset the schemaVersion, because the schemaVersion is based on a ByteBuf object in
+                // `MessageMetadata`, if we want to re-serialize the `SEND` command using a same `MessageMetadata`,
+                // we need to reset the ByteBuf of the schemaVersion in `MessageMetadata`, I think we need to
+                // reset `ByteBuf` objects in `MessageMetadata` after call the method `MessageMetadata#writeTo()`.
+                if (chunkId > 0) {
+                    if (schemaVersion != null) {
+                        msg.getMessageBuilder().setSchemaVersion(schemaVersion);
+                    }
+                    if (orderingKey != null) {
+                        msg.getMessageBuilder().setOrderingKey(orderingKey);
                     }
+                }
+                if (chunkId > 0 && conf.isBlockIfQueueFull() && !canEnqueueRequest(callback,
+                        message.getSequenceId(), 0 /* The memory was already reserved */)) {
+                    client.getMemoryLimitController().releaseMemory(uncompressedSize - readStartIndex);
+                    semaphoreRelease(totalChunks - chunkId);
+                    return;
+                }
+                synchronized (this) {

Review Comment:
   I just noticed your branch is far behind master; please rebase onto the latest master.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] BewareMyPower commented on a diff in pull request #17795: [fix][client] Fix deadlock when sending chunked messages with BlockIFQueueFull enabled

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on code in PR #17795:
URL: https://github.com/apache/pulsar/pull/17795#discussion_r991804936


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java:
##########
@@ -519,35 +519,42 @@ public void sendAsync(Message<?> message, SendCallback callback) {
 
         // chunked message also sent individually so, try to acquire send-permits
         for (int i = 0; i < (totalChunks - 1); i++) {
-            if (!canEnqueueRequest(callback, message.getSequenceId(), 0 /* The memory was already reserved */)) {
+            if (!conf.isBlockIfQueueFull() && !canEnqueueRequest(callback, message.getSequenceId(),
+                    0 /* The memory was already reserved */)) {
                 client.getMemoryLimitController().releaseMemory(uncompressedSize);
                 semaphoreRelease(i + 1);
                 return;
             }
         }

Review Comment:
   Oh I see. Mark it as resolved.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] BewareMyPower commented on a diff in pull request #17795: [fix][client] Fix deadlock when sending chunked messages with BlockIFQueueFull enabled

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on code in PR #17795:
URL: https://github.com/apache/pulsar/pull/17795#discussion_r991511684


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java:
##########
@@ -519,35 +519,42 @@ public void sendAsync(Message<?> message, SendCallback callback) {
 
         // chunked message also sent individually so, try to acquire send-permits
         for (int i = 0; i < (totalChunks - 1); i++) {
-            if (!canEnqueueRequest(callback, message.getSequenceId(), 0 /* The memory was already reserved */)) {
+            if (!conf.isBlockIfQueueFull() && !canEnqueueRequest(callback, message.getSequenceId(),
+                    0 /* The memory was already reserved */)) {
                 client.getMemoryLimitController().releaseMemory(uncompressedSize);
                 semaphoreRelease(i + 1);
                 return;
             }
         }
 
         try {
-            synchronized (this) {
-                int readStartIndex = 0;
-                String uuid = totalChunks > 1 ? String.format("%s-%d", producerName, sequenceId) : null;
-                ChunkedMessageCtx chunkedMessageCtx = totalChunks > 1 ? ChunkedMessageCtx.get(totalChunks) : null;
-                byte[] schemaVersion = totalChunks > 1 && msg.getMessageBuilder().hasSchemaVersion()
-                        ? msg.getMessageBuilder().getSchemaVersion() : null;
-                byte[] orderingKey = totalChunks > 1 && msg.getMessageBuilder().hasOrderingKey()
-                        ? msg.getMessageBuilder().getOrderingKey() : null;
-                for (int chunkId = 0; chunkId < totalChunks; chunkId++) {
-                    // Need to reset the schemaVersion, because the schemaVersion is based on a ByteBuf object in
-                    // `MessageMetadata`, if we want to re-serialize the `SEND` command using a same `MessageMetadata`,
-                    // we need to reset the ByteBuf of the schemaVersion in `MessageMetadata`, I think we need to
-                    // reset `ByteBuf` objects in `MessageMetadata` after call the method `MessageMetadata#writeTo()`.
-                    if (chunkId > 0) {
-                        if (schemaVersion != null) {
-                            msg.getMessageBuilder().setSchemaVersion(schemaVersion);
-                        }
-                        if (orderingKey != null) {
-                            msg.getMessageBuilder().setOrderingKey(orderingKey);
-                        }
+            int readStartIndex = 0;
+            String uuid = totalChunks > 1 ? String.format("%s-%d", producerName, sequenceId) : null;
+            ChunkedMessageCtx chunkedMessageCtx = totalChunks > 1 ? ChunkedMessageCtx.get(totalChunks) : null;
+            byte[] schemaVersion = totalChunks > 1 && msg.getMessageBuilder().hasSchemaVersion()
+                    ? msg.getMessageBuilder().getSchemaVersion() : null;
+            byte[] orderingKey = totalChunks > 1 && msg.getMessageBuilder().hasOrderingKey()
+                    ? msg.getMessageBuilder().getOrderingKey() : null;
+            for (int chunkId = 0; chunkId < totalChunks; chunkId++) {
+                // Need to reset the schemaVersion, because the schemaVersion is based on a ByteBuf object in
+                // `MessageMetadata`, if we want to re-serialize the `SEND` command using a same `MessageMetadata`,
+                // we need to reset the ByteBuf of the schemaVersion in `MessageMetadata`, I think we need to
+                // reset `ByteBuf` objects in `MessageMetadata` after call the method `MessageMetadata#writeTo()`.
+                if (chunkId > 0) {
+                    if (schemaVersion != null) {
+                        msg.getMessageBuilder().setSchemaVersion(schemaVersion);
+                    }
+                    if (orderingKey != null) {
+                        msg.getMessageBuilder().setOrderingKey(orderingKey);
                     }
+                }
+                if (chunkId > 0 && conf.isBlockIfQueueFull() && !canEnqueueRequest(callback,
+                        message.getSequenceId(), 0 /* The memory was already reserved */)) {
+                    client.getMemoryLimitController().releaseMemory(uncompressedSize - readStartIndex);
+                    semaphoreRelease(totalChunks - chunkId);
+                    return;
+                }

Review Comment:
   `canEnqueueRequest` always returns true when `conf.isBlockIfQueueFull()` returns true, so execution can never enter this if block.
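   For readers following the thread, here is a paraphrased sketch of the behavior being described (a plain JDK semaphore, not the actual `ProducerImpl` source): in blocking mode the method waits for a permit and only returns false if the wait is interrupted, so this if branch is effectively unreachable.
   
   ```java
   import java.util.concurrent.Semaphore;
   
   public class CanEnqueueSketch {
       private final Semaphore semaphore = new Semaphore(10);
       private final boolean blockIfQueueFull;
   
       public CanEnqueueSketch(boolean blockIfQueueFull) {
           this.blockIfQueueFull = blockIfQueueFull;
       }
   
       boolean canEnqueueRequest() {
           try {
               if (blockIfQueueFull) {
                   semaphore.acquire();        // blocks; never reports "queue is full"
                   return true;
               }
               return semaphore.tryAcquire();  // non-blocking; false when the queue is full
           } catch (InterruptedException e) {
               Thread.currentThread().interrupt();
               return false;                   // the only false path in blocking mode
           }
       }
   }
   ```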



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] BewareMyPower commented on a diff in pull request #17795: [fix][client] Fix deadlock when sending chunked messages with BlockIFQueueFull enabled

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on code in PR #17795:
URL: https://github.com/apache/pulsar/pull/17795#discussion_r991784916


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java:
##########
@@ -519,35 +519,42 @@ public void sendAsync(Message<?> message, SendCallback callback) {
 
         // chunked message also sent individually so, try to acquire send-permits
         for (int i = 0; i < (totalChunks - 1); i++) {
-            if (!canEnqueueRequest(callback, message.getSequenceId(), 0 /* The memory was already reserved */)) {
+            if (!conf.isBlockIfQueueFull() && !canEnqueueRequest(callback, message.getSequenceId(),
+                    0 /* The memory was already reserved */)) {
                 client.getMemoryLimitController().releaseMemory(uncompressedSize);
                 semaphoreRelease(i + 1);
                 return;
             }
         }
 
         try {
-            synchronized (this) {
-                int readStartIndex = 0;
-                String uuid = totalChunks > 1 ? String.format("%s-%d", producerName, sequenceId) : null;
-                ChunkedMessageCtx chunkedMessageCtx = totalChunks > 1 ? ChunkedMessageCtx.get(totalChunks) : null;
-                byte[] schemaVersion = totalChunks > 1 && msg.getMessageBuilder().hasSchemaVersion()
-                        ? msg.getMessageBuilder().getSchemaVersion() : null;
-                byte[] orderingKey = totalChunks > 1 && msg.getMessageBuilder().hasOrderingKey()
-                        ? msg.getMessageBuilder().getOrderingKey() : null;
-                for (int chunkId = 0; chunkId < totalChunks; chunkId++) {
-                    // Need to reset the schemaVersion, because the schemaVersion is based on a ByteBuf object in
-                    // `MessageMetadata`, if we want to re-serialize the `SEND` command using a same `MessageMetadata`,
-                    // we need to reset the ByteBuf of the schemaVersion in `MessageMetadata`, I think we need to
-                    // reset `ByteBuf` objects in `MessageMetadata` after call the method `MessageMetadata#writeTo()`.
-                    if (chunkId > 0) {
-                        if (schemaVersion != null) {
-                            msg.getMessageBuilder().setSchemaVersion(schemaVersion);
-                        }
-                        if (orderingKey != null) {
-                            msg.getMessageBuilder().setOrderingKey(orderingKey);
-                        }
+            int readStartIndex = 0;
+            String uuid = totalChunks > 1 ? String.format("%s-%d", producerName, sequenceId) : null;
+            ChunkedMessageCtx chunkedMessageCtx = totalChunks > 1 ? ChunkedMessageCtx.get(totalChunks) : null;
+            byte[] schemaVersion = totalChunks > 1 && msg.getMessageBuilder().hasSchemaVersion()
+                    ? msg.getMessageBuilder().getSchemaVersion() : null;
+            byte[] orderingKey = totalChunks > 1 && msg.getMessageBuilder().hasOrderingKey()
+                    ? msg.getMessageBuilder().getOrderingKey() : null;
+            for (int chunkId = 0; chunkId < totalChunks; chunkId++) {
+                // Need to reset the schemaVersion, because the schemaVersion is based on a ByteBuf object in
+                // `MessageMetadata`, if we want to re-serialize the `SEND` command using a same `MessageMetadata`,
+                // we need to reset the ByteBuf of the schemaVersion in `MessageMetadata`, I think we need to
+                // reset `ByteBuf` objects in `MessageMetadata` after call the method `MessageMetadata#writeTo()`.
+                if (chunkId > 0) {
+                    if (schemaVersion != null) {
+                        msg.getMessageBuilder().setSchemaVersion(schemaVersion);
+                    }
+                    if (orderingKey != null) {
+                        msg.getMessageBuilder().setOrderingKey(orderingKey);
                     }
+                }
+                if (chunkId > 0 && conf.isBlockIfQueueFull() && !canEnqueueRequest(callback,
+                        message.getSequenceId(), 0 /* The memory was already reserved */)) {
+                    client.getMemoryLimitController().releaseMemory(uncompressedSize - readStartIndex);
+                    semaphoreRelease(totalChunks - chunkId);
+                    return;
+                }

Review Comment:
   I see now. However, when an `InterruptedException` is thrown, it means either `Semaphore#acquire` or `MemoryLimitController#reserveMemory` was interrupted.
   - If the `Semaphore` acquisition was interrupted, we should release neither of them.
   - If the `MemoryLimitController` reservation was interrupted, we should release only the semaphore.
   
   BTW, I think it's a bad design to just return false for these two different cases.
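   A minimal sketch of the rollback rule suggested here (plain JDK types and hypothetical method names, not the real client code): release only what was actually acquired before the interruption.
   
   ```java
   import java.util.concurrent.Semaphore;
   import java.util.concurrent.atomic.AtomicLong;
   
   public class AcquireRollbackSketch {
       private final Semaphore permits = new Semaphore(10);
       private final AtomicLong reservedBytes = new AtomicLong();
   
       boolean acquireForChunk(long chunkSize) {
           try {
               permits.acquire();
           } catch (InterruptedException e) {
               Thread.currentThread().interrupt();
               return false;                 // nothing acquired yet: release nothing
           }
           try {
               reserveMemoryInterruptibly(chunkSize);
               return true;
           } catch (InterruptedException e) {
               Thread.currentThread().interrupt();
               permits.release();            // the permit was acquired: release only that
               return false;
           }
       }
   
       // Stand-in for a blocking, interruptible memory reservation.
       private void reserveMemoryInterruptibly(long bytes) throws InterruptedException {
           reservedBytes.addAndGet(bytes);
       }
   }
   ```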



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] RobertIndie commented on a diff in pull request #17795: [fix][client] Fix deadlock when sending chunked messages with BlockIFQueueFull enabled

Posted by GitBox <gi...@apache.org>.
RobertIndie commented on code in PR #17795:
URL: https://github.com/apache/pulsar/pull/17795#discussion_r991792328


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java:
##########
@@ -519,35 +519,42 @@ public void sendAsync(Message<?> message, SendCallback callback) {
 
         // chunked message also sent individually so, try to acquire send-permits
         for (int i = 0; i < (totalChunks - 1); i++) {
-            if (!canEnqueueRequest(callback, message.getSequenceId(), 0 /* The memory was already reserved */)) {
+            if (!conf.isBlockIfQueueFull() && !canEnqueueRequest(callback, message.getSequenceId(),
+                    0 /* The memory was already reserved */)) {
                 client.getMemoryLimitController().releaseMemory(uncompressedSize);
                 semaphoreRelease(i + 1);
                 return;
             }
         }
 
         try {
-            synchronized (this) {
-                int readStartIndex = 0;
-                String uuid = totalChunks > 1 ? String.format("%s-%d", producerName, sequenceId) : null;
-                ChunkedMessageCtx chunkedMessageCtx = totalChunks > 1 ? ChunkedMessageCtx.get(totalChunks) : null;
-                byte[] schemaVersion = totalChunks > 1 && msg.getMessageBuilder().hasSchemaVersion()
-                        ? msg.getMessageBuilder().getSchemaVersion() : null;
-                byte[] orderingKey = totalChunks > 1 && msg.getMessageBuilder().hasOrderingKey()
-                        ? msg.getMessageBuilder().getOrderingKey() : null;
-                for (int chunkId = 0; chunkId < totalChunks; chunkId++) {
-                    // Need to reset the schemaVersion, because the schemaVersion is based on a ByteBuf object in
-                    // `MessageMetadata`, if we want to re-serialize the `SEND` command using a same `MessageMetadata`,
-                    // we need to reset the ByteBuf of the schemaVersion in `MessageMetadata`, I think we need to
-                    // reset `ByteBuf` objects in `MessageMetadata` after call the method `MessageMetadata#writeTo()`.
-                    if (chunkId > 0) {
-                        if (schemaVersion != null) {
-                            msg.getMessageBuilder().setSchemaVersion(schemaVersion);
-                        }
-                        if (orderingKey != null) {
-                            msg.getMessageBuilder().setOrderingKey(orderingKey);
-                        }
+            int readStartIndex = 0;
+            String uuid = totalChunks > 1 ? String.format("%s-%d", producerName, sequenceId) : null;
+            ChunkedMessageCtx chunkedMessageCtx = totalChunks > 1 ? ChunkedMessageCtx.get(totalChunks) : null;
+            byte[] schemaVersion = totalChunks > 1 && msg.getMessageBuilder().hasSchemaVersion()
+                    ? msg.getMessageBuilder().getSchemaVersion() : null;
+            byte[] orderingKey = totalChunks > 1 && msg.getMessageBuilder().hasOrderingKey()
+                    ? msg.getMessageBuilder().getOrderingKey() : null;
+            for (int chunkId = 0; chunkId < totalChunks; chunkId++) {
+                // Need to reset the schemaVersion, because the schemaVersion is based on a ByteBuf object in
+                // `MessageMetadata`, if we want to re-serialize the `SEND` command using a same `MessageMetadata`,
+                // we need to reset the ByteBuf of the schemaVersion in `MessageMetadata`, I think we need to
+                // reset `ByteBuf` objects in `MessageMetadata` after call the method `MessageMetadata#writeTo()`.
+                if (chunkId > 0) {
+                    if (schemaVersion != null) {
+                        msg.getMessageBuilder().setSchemaVersion(schemaVersion);
+                    }
+                    if (orderingKey != null) {
+                        msg.getMessageBuilder().setOrderingKey(orderingKey);
                     }
+                }
+                if (chunkId > 0 && conf.isBlockIfQueueFull() && !canEnqueueRequest(callback,
+                        message.getSequenceId(), 0 /* The memory was already reserved */)) {
+                    client.getMemoryLimitController().releaseMemory(uncompressedSize - readStartIndex);
+                    semaphoreRelease(totalChunks - chunkId);
+                    return;
+                }
+                synchronized (this) {

Review Comment:
   If the thread is blocked at `canEnqueueRequest`, the IO thread will try to acquire this lock after receiving the ack of the send request, which results in a deadlock [here](https://github.com/RobertIndie/pulsar/blob/c6ebe063fc1c4088f5500ffd7ab5645c14bd581c/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L1126).
   
   I can also reproduce the deadlock after rebasing to the latest master branch.
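   To show the shape of that deadlock in isolation, here is a self-contained toy using JDK threads and a semaphore (not Pulsar code): the sender blocks on a permit while holding the producer lock, and the thread that would return the permit needs the same lock first.
   
   ```java
   import java.util.concurrent.Semaphore;
   
   public class SyncBlockDeadlockSketch {
       public static void main(String[] args) throws InterruptedException {
           Object producerLock = new Object();
           Semaphore permits = new Semaphore(0); // queue already full: no permits left
   
           Thread sender = new Thread(() -> {
               synchronized (producerLock) {     // like blocking inside synchronized(this)
                   try {
                       permits.acquire();        // waits forever while holding the lock
                   } catch (InterruptedException ignored) {
                   }
               }
           });
           Thread ioThread = new Thread(() -> {
               synchronized (producerLock) {     // ack handling also needs the lock...
                   permits.release();            // ...so the permit is never returned
               }
           });
           sender.setDaemon(true);
           ioThread.setDaemon(true);
   
           sender.start();
           Thread.sleep(200);                    // let the sender grab the lock first
           ioThread.start();
   
           sender.join(2000);
           System.out.println(sender.isAlive() ? "deadlocked" : "finished");
       }
   }
   ```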



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] BewareMyPower commented on a diff in pull request #17795: [fix][client] Fix deadlock when sending chunked messages with BlockIFQueueFull enabled

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on code in PR #17795:
URL: https://github.com/apache/pulsar/pull/17795#discussion_r991805850


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java:
##########
@@ -519,35 +519,42 @@ public void sendAsync(Message<?> message, SendCallback callback) {
 
         // chunked message also sent individually so, try to acquire send-permits
         for (int i = 0; i < (totalChunks - 1); i++) {
-            if (!canEnqueueRequest(callback, message.getSequenceId(), 0 /* The memory was already reserved */)) {
+            if (!conf.isBlockIfQueueFull() && !canEnqueueRequest(callback, message.getSequenceId(),
+                    0 /* The memory was already reserved */)) {
                 client.getMemoryLimitController().releaseMemory(uncompressedSize);
                 semaphoreRelease(i + 1);
                 return;
             }
         }
 
         try {
-            synchronized (this) {
-                int readStartIndex = 0;
-                String uuid = totalChunks > 1 ? String.format("%s-%d", producerName, sequenceId) : null;
-                ChunkedMessageCtx chunkedMessageCtx = totalChunks > 1 ? ChunkedMessageCtx.get(totalChunks) : null;
-                byte[] schemaVersion = totalChunks > 1 && msg.getMessageBuilder().hasSchemaVersion()
-                        ? msg.getMessageBuilder().getSchemaVersion() : null;
-                byte[] orderingKey = totalChunks > 1 && msg.getMessageBuilder().hasOrderingKey()
-                        ? msg.getMessageBuilder().getOrderingKey() : null;
-                for (int chunkId = 0; chunkId < totalChunks; chunkId++) {
-                    // Need to reset the schemaVersion, because the schemaVersion is based on a ByteBuf object in
-                    // `MessageMetadata`, if we want to re-serialize the `SEND` command using a same `MessageMetadata`,
-                    // we need to reset the ByteBuf of the schemaVersion in `MessageMetadata`, I think we need to
-                    // reset `ByteBuf` objects in `MessageMetadata` after call the method `MessageMetadata#writeTo()`.
-                    if (chunkId > 0) {
-                        if (schemaVersion != null) {
-                            msg.getMessageBuilder().setSchemaVersion(schemaVersion);
-                        }
-                        if (orderingKey != null) {
-                            msg.getMessageBuilder().setOrderingKey(orderingKey);
-                        }
+            int readStartIndex = 0;
+            String uuid = totalChunks > 1 ? String.format("%s-%d", producerName, sequenceId) : null;
+            ChunkedMessageCtx chunkedMessageCtx = totalChunks > 1 ? ChunkedMessageCtx.get(totalChunks) : null;
+            byte[] schemaVersion = totalChunks > 1 && msg.getMessageBuilder().hasSchemaVersion()
+                    ? msg.getMessageBuilder().getSchemaVersion() : null;
+            byte[] orderingKey = totalChunks > 1 && msg.getMessageBuilder().hasOrderingKey()
+                    ? msg.getMessageBuilder().getOrderingKey() : null;
+            for (int chunkId = 0; chunkId < totalChunks; chunkId++) {
+                // Need to reset the schemaVersion, because the schemaVersion is based on a ByteBuf object in
+                // `MessageMetadata`, if we want to re-serialize the `SEND` command using a same `MessageMetadata`,
+                // we need to reset the ByteBuf of the schemaVersion in `MessageMetadata`, I think we need to
+                // reset `ByteBuf` objects in `MessageMetadata` after call the method `MessageMetadata#writeTo()`.
+                if (chunkId > 0) {
+                    if (schemaVersion != null) {
+                        msg.getMessageBuilder().setSchemaVersion(schemaVersion);
+                    }
+                    if (orderingKey != null) {
+                        msg.getMessageBuilder().setOrderingKey(orderingKey);
                     }
+                }
+                if (chunkId > 0 && conf.isBlockIfQueueFull() && !canEnqueueRequest(callback,
+                        message.getSequenceId(), 0 /* The memory was already reserved */)) {
+                    client.getMemoryLimitController().releaseMemory(uncompressedSize - readStartIndex);
+                    semaphoreRelease(totalChunks - chunkId);
+                    return;
+                }
+                synchronized (this) {

Review Comment:
   I see. Yeah, we need to be careful about putting a blocking operation inside a synchronized block.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] RobertIndie commented on a diff in pull request #17795: [fix][client] Fix deadlock when sending chunked messages with BlockIFQueueFull enabled

Posted by GitBox <gi...@apache.org>.
RobertIndie commented on code in PR #17795:
URL: https://github.com/apache/pulsar/pull/17795#discussion_r977318601


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java:
##########
@@ -519,35 +519,42 @@ public void sendAsync(Message<?> message, SendCallback callback) {
 
         // chunked message also sent individually so, try to acquire send-permits
         for (int i = 0; i < (totalChunks - 1); i++) {
-            if (!canEnqueueRequest(callback, message.getSequenceId(), 0 /* The memory was already reserved */)) {
+            if (!conf.isBlockIfQueueFull() && !canEnqueueRequest(callback, message.getSequenceId(),
+                    0 /* The memory was already reserved */)) {
                 client.getMemoryLimitController().releaseMemory(uncompressedSize);
                 semaphoreRelease(i + 1);
                 return;
             }
         }
 
         try {
-            synchronized (this) {
-                int readStartIndex = 0;
-                String uuid = totalChunks > 1 ? String.format("%s-%d", producerName, sequenceId) : null;
-                ChunkedMessageCtx chunkedMessageCtx = totalChunks > 1 ? ChunkedMessageCtx.get(totalChunks) : null;
-                byte[] schemaVersion = totalChunks > 1 && msg.getMessageBuilder().hasSchemaVersion()
-                        ? msg.getMessageBuilder().getSchemaVersion() : null;
-                byte[] orderingKey = totalChunks > 1 && msg.getMessageBuilder().hasOrderingKey()
-                        ? msg.getMessageBuilder().getOrderingKey() : null;
-                for (int chunkId = 0; chunkId < totalChunks; chunkId++) {
-                    // Need to reset the schemaVersion, because the schemaVersion is based on a ByteBuf object in
-                    // `MessageMetadata`, if we want to re-serialize the `SEND` command using a same `MessageMetadata`,
-                    // we need to reset the ByteBuf of the schemaVersion in `MessageMetadata`, I think we need to
-                    // reset `ByteBuf` objects in `MessageMetadata` after call the method `MessageMetadata#writeTo()`.
-                    if (chunkId > 0) {
-                        if (schemaVersion != null) {
-                            msg.getMessageBuilder().setSchemaVersion(schemaVersion);
-                        }
-                        if (orderingKey != null) {
-                            msg.getMessageBuilder().setOrderingKey(orderingKey);
-                        }
+            int readStartIndex = 0;
+            String uuid = totalChunks > 1 ? String.format("%s-%d", producerName, sequenceId) : null;
+            ChunkedMessageCtx chunkedMessageCtx = totalChunks > 1 ? ChunkedMessageCtx.get(totalChunks) : null;
+            byte[] schemaVersion = totalChunks > 1 && msg.getMessageBuilder().hasSchemaVersion()
+                    ? msg.getMessageBuilder().getSchemaVersion() : null;
+            byte[] orderingKey = totalChunks > 1 && msg.getMessageBuilder().hasOrderingKey()
+                    ? msg.getMessageBuilder().getOrderingKey() : null;
+            for (int chunkId = 0; chunkId < totalChunks; chunkId++) {
+                // Need to reset the schemaVersion, because the schemaVersion is based on a ByteBuf object in
+                // `MessageMetadata`, if we want to re-serialize the `SEND` command using a same `MessageMetadata`,
+                // we need to reset the ByteBuf of the schemaVersion in `MessageMetadata`, I think we need to
+                // reset `ByteBuf` objects in `MessageMetadata` after call the method `MessageMetadata#writeTo()`.
+                if (chunkId > 0) {
+                    if (schemaVersion != null) {
+                        msg.getMessageBuilder().setSchemaVersion(schemaVersion);
+                    }
+                    if (orderingKey != null) {
+                        msg.getMessageBuilder().setOrderingKey(orderingKey);
                     }
+                }
+                if (chunkId > 0 && conf.isBlockIfQueueFull() && !canEnqueueRequest(callback,
+                        message.getSequenceId(), 0 /* The memory was already reserved */)) {
+                    client.getMemoryLimitController().releaseMemory(uncompressedSize - readStartIndex);
+                    semaphoreRelease(totalChunks - chunkId);
+                    return;
+                }
+                synchronized (this) {

Review Comment:
   We need to narrow the synchronized block and must not acquire permits inside the synchronized code; otherwise, it will cause another deadlock.
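   A sketch of the safer ordering described here (toy types and hypothetical method names, not the real `ProducerImpl`): perform the potentially blocking permit acquisition first, then enter the synchronized block only for the short, non-blocking part.
   
   ```java
   import java.util.concurrent.Semaphore;
   
   public class NarrowSyncSketch {
       private final Object producerLock = new Object();
       private final Semaphore permits = new Semaphore(10);
   
       void sendChunk(int chunkId) throws InterruptedException {
           // Blocking work happens outside the lock, so the IO thread can still
           // take producerLock, complete earlier chunks, and return permits.
           permits.acquire();
           synchronized (producerLock) {
               enqueueChunk(chunkId); // short, non-blocking critical section
           }
       }
   
       // Stand-in for serializing the chunk and writing it to the connection.
       private void enqueueChunk(int chunkId) {
       }
   }
   ```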



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] RobertIndie commented on a diff in pull request #17795: [fix][client] Fix deadlock when sending chunked messages with BlockIFQueueFull enabled

Posted by GitBox <gi...@apache.org>.
RobertIndie commented on code in PR #17795:
URL: https://github.com/apache/pulsar/pull/17795#discussion_r991778411


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java:
##########
@@ -519,35 +519,42 @@ public void sendAsync(Message<?> message, SendCallback callback) {
 
         // chunked message also sent individually so, try to acquire send-permits
         for (int i = 0; i < (totalChunks - 1); i++) {
-            if (!canEnqueueRequest(callback, message.getSequenceId(), 0 /* The memory was already reserved */)) {
+            if (!conf.isBlockIfQueueFull() && !canEnqueueRequest(callback, message.getSequenceId(),
+                    0 /* The memory was already reserved */)) {
                 client.getMemoryLimitController().releaseMemory(uncompressedSize);
                 semaphoreRelease(i + 1);
                 return;
             }
         }
 
         try {
-            synchronized (this) {
-                int readStartIndex = 0;
-                String uuid = totalChunks > 1 ? String.format("%s-%d", producerName, sequenceId) : null;
-                ChunkedMessageCtx chunkedMessageCtx = totalChunks > 1 ? ChunkedMessageCtx.get(totalChunks) : null;
-                byte[] schemaVersion = totalChunks > 1 && msg.getMessageBuilder().hasSchemaVersion()
-                        ? msg.getMessageBuilder().getSchemaVersion() : null;
-                byte[] orderingKey = totalChunks > 1 && msg.getMessageBuilder().hasOrderingKey()
-                        ? msg.getMessageBuilder().getOrderingKey() : null;
-                for (int chunkId = 0; chunkId < totalChunks; chunkId++) {
-                    // Need to reset the schemaVersion, because the schemaVersion is based on a ByteBuf object in
-                    // `MessageMetadata`, if we want to re-serialize the `SEND` command using a same `MessageMetadata`,
-                    // we need to reset the ByteBuf of the schemaVersion in `MessageMetadata`, I think we need to
-                    // reset `ByteBuf` objects in `MessageMetadata` after call the method `MessageMetadata#writeTo()`.
-                    if (chunkId > 0) {
-                        if (schemaVersion != null) {
-                            msg.getMessageBuilder().setSchemaVersion(schemaVersion);
-                        }
-                        if (orderingKey != null) {
-                            msg.getMessageBuilder().setOrderingKey(orderingKey);
-                        }
+            int readStartIndex = 0;
+            String uuid = totalChunks > 1 ? String.format("%s-%d", producerName, sequenceId) : null;
+            ChunkedMessageCtx chunkedMessageCtx = totalChunks > 1 ? ChunkedMessageCtx.get(totalChunks) : null;
+            byte[] schemaVersion = totalChunks > 1 && msg.getMessageBuilder().hasSchemaVersion()
+                    ? msg.getMessageBuilder().getSchemaVersion() : null;
+            byte[] orderingKey = totalChunks > 1 && msg.getMessageBuilder().hasOrderingKey()
+                    ? msg.getMessageBuilder().getOrderingKey() : null;
+            for (int chunkId = 0; chunkId < totalChunks; chunkId++) {
+                // Need to reset the schemaVersion, because the schemaVersion is based on a ByteBuf object in
+                // `MessageMetadata`, if we want to re-serialize the `SEND` command using a same `MessageMetadata`,
+                // we need to reset the ByteBuf of the schemaVersion in `MessageMetadata`, I think we need to
+                // reset `ByteBuf` objects in `MessageMetadata` after call the method `MessageMetadata#writeTo()`.
+                if (chunkId > 0) {
+                    if (schemaVersion != null) {
+                        msg.getMessageBuilder().setSchemaVersion(schemaVersion);
+                    }
+                    if (orderingKey != null) {
+                        msg.getMessageBuilder().setOrderingKey(orderingKey);
                     }
+                }
+                if (chunkId > 0 && conf.isBlockIfQueueFull() && !canEnqueueRequest(callback,
+                        message.getSequenceId(), 0 /* The memory was already reserved */)) {
+                    client.getMemoryLimitController().releaseMemory(uncompressedSize - readStartIndex);
+                    semaphoreRelease(totalChunks - chunkId);
+                    return;
+                }

Review Comment:
   It does not always return true in that case. We need to handle the exception case. https://github.com/RobertIndie/pulsar/blob/c6ebe063fc1c4088f5500ffd7ab5645c14bd581c/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L937-L940
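   For context, a standalone sketch (the names here are made up, not Pulsar's API) of why a blocking acquire does not always return true: the wait can be interrupted, and the caller still has to treat that as a failure and release whatever it reserved earlier.
   
   ```java
   import java.util.concurrent.Semaphore;
   
   // Hypothetical sketch: even with a "block if full" policy, the blocking wait can be
   // interrupted, so the gate reports failure instead of always returning true.
   class PermitGate {
       private final Semaphore permits = new Semaphore(3);
   
       boolean tryEnterBlocking() {
           try {
               permits.acquire();   // blocks until a permit becomes available...
               return true;
           } catch (InterruptedException e) {
               Thread.currentThread().interrupt();
               return false;        // ...but fails if the waiting thread is interrupted
           }
       }
   
       void leave() {
           permits.release();
       }
   }
   ```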





[GitHub] [pulsar] BewareMyPower commented on a diff in pull request #17795: [fix][client] Fix deadlock when sending chunked messages with BlockIFQueueFull enabled

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on code in PR #17795:
URL: https://github.com/apache/pulsar/pull/17795#discussion_r991782699


##########
pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java:
##########
@@ -573,6 +574,55 @@ public void testChunkSize() throws Exception {
         }
     }
 
+    @Test
+    public void testBlockIfQueueFUllWhenChunking() throws Exception {
+        this.conf.setMaxMessageSize(50);
+
+        @Cleanup
+        final Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .topic("my-property/my-ns/test-chunk-size")
+                .enableChunking(true)
+                .enableBatching(false)
+                .blockIfQueueFull(true)
+                .maxPendingMessages(3)
+                .create();
+
+        // Test sending large message (totalChunks > maxPendingMessages) should not cause deadlock
+        // We need to use a separate thread to send the message instead of using the sendAsync, because the deadlock
+        // might happen before publishing messages to the broker.
+        CompletableFuture<Void> sendMsg = CompletableFuture.runAsync(() -> {
+            try {
+                producer.send(createMessagePayload(200));
+            } catch (PulsarClientException e) {
+                throw new RuntimeException(e);
+            }
+        });
+        try {
+            sendMsg.get(5, TimeUnit.SECONDS);
+        } catch (TimeoutException e) {
+            Assert.fail("Deadlock detected when sending large message.");
+        }
+
+        // Test sending multiple large messages (For every message, totalChunks < maxPendingMessages) concurrently
+        // should not cause the deadlock.
+        List<CompletableFuture<Void>> sendMsgFutures = Lists.newArrayList();

Review Comment:
   ![image](https://user-images.githubusercontent.com/18204803/194990864-54d6f5e5-6c68-405f-b4a7-a768923ccdf7.png)
   
   It also failed in my IntelliJ IDEA on Windows.





[GitHub] [pulsar] codelipenghui commented on a diff in pull request #17795: [fix][client] Fix deadlock when sending chunked messages with BlockIFQueueFull enabled

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on code in PR #17795:
URL: https://github.com/apache/pulsar/pull/17795#discussion_r991293309


##########
pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java:
##########
@@ -573,6 +574,54 @@ public void testChunkSize() throws Exception {
         }
     }
 
+    @Test
+    public void testBlockIfQueueFUllWhenChunking() throws Exception {
+        this.conf.setMaxMessageSize(50);
+
+        final Producer<String> producer = pulsarClient.newProducer(Schema.STRING)

Review Comment:
   ```suggestion
           @Cleanup
           final Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
   ```





[GitHub] [pulsar] BewareMyPower commented on a diff in pull request #17795: [fix][client] Fix deadlock when sending chunked messages with BlockIFQueueFull enabled

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on code in PR #17795:
URL: https://github.com/apache/pulsar/pull/17795#discussion_r991387944


##########
pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java:
##########
@@ -573,6 +574,55 @@ public void testChunkSize() throws Exception {
         }
     }
 
+    @Test
+    public void testBlockIfQueueFUllWhenChunking() throws Exception {
+        this.conf.setMaxMessageSize(50);
+
+        @Cleanup
+        final Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .topic("my-property/my-ns/test-chunk-size")
+                .enableChunking(true)
+                .enableBatching(false)
+                .blockIfQueueFull(true)
+                .maxPendingMessages(3)
+                .create();
+
+        // Test sending large message (totalChunks > maxPendingMessages) should not cause deadlock
+        // We need to use a separate thread to send the message instead of using the sendAsync, because the deadlock
+        // might happen before publishing messages to the broker.
+        CompletableFuture<Void> sendMsg = CompletableFuture.runAsync(() -> {
+            try {
+                producer.send(createMessagePayload(200));
+            } catch (PulsarClientException e) {
+                throw new RuntimeException(e);
+            }
+        });
+        try {
+            sendMsg.get(5, TimeUnit.SECONDS);
+        } catch (TimeoutException e) {
+            Assert.fail("Deadlock detected when sending large message.");
+        }
+
+        // Test sending multiple large messages (For every message, totalChunks < maxPendingMessages) concurrently
+        // should not cause the deadlock.
+        List<CompletableFuture<Void>> sendMsgFutures = Lists.newArrayList();

Review Comment:
   It looks like `Lists` is not imported?





[GitHub] [pulsar] BewareMyPower commented on a diff in pull request #17795: [fix][client] Fix deadlock when sending chunked messages with BlockIFQueueFull enabled

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on code in PR #17795:
URL: https://github.com/apache/pulsar/pull/17795#discussion_r991786119


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java:
##########
@@ -519,35 +519,42 @@ public void sendAsync(Message<?> message, SendCallback callback) {
 
         // chunked message also sent individually so, try to acquire send-permits
         for (int i = 0; i < (totalChunks - 1); i++) {
-            if (!canEnqueueRequest(callback, message.getSequenceId(), 0 /* The memory was already reserved */)) {
+            if (!conf.isBlockIfQueueFull() && !canEnqueueRequest(callback, message.getSequenceId(),
+                    0 /* The memory was already reserved */)) {
                 client.getMemoryLimitController().releaseMemory(uncompressedSize);
                 semaphoreRelease(i + 1);
                 return;
             }
         }
 
         try {
-            synchronized (this) {
-                int readStartIndex = 0;
-                String uuid = totalChunks > 1 ? String.format("%s-%d", producerName, sequenceId) : null;
-                ChunkedMessageCtx chunkedMessageCtx = totalChunks > 1 ? ChunkedMessageCtx.get(totalChunks) : null;
-                byte[] schemaVersion = totalChunks > 1 && msg.getMessageBuilder().hasSchemaVersion()
-                        ? msg.getMessageBuilder().getSchemaVersion() : null;
-                byte[] orderingKey = totalChunks > 1 && msg.getMessageBuilder().hasOrderingKey()
-                        ? msg.getMessageBuilder().getOrderingKey() : null;
-                for (int chunkId = 0; chunkId < totalChunks; chunkId++) {
-                    // Need to reset the schemaVersion, because the schemaVersion is based on a ByteBuf object in
-                    // `MessageMetadata`, if we want to re-serialize the `SEND` command using a same `MessageMetadata`,
-                    // we need to reset the ByteBuf of the schemaVersion in `MessageMetadata`, I think we need to
-                    // reset `ByteBuf` objects in `MessageMetadata` after call the method `MessageMetadata#writeTo()`.
-                    if (chunkId > 0) {
-                        if (schemaVersion != null) {
-                            msg.getMessageBuilder().setSchemaVersion(schemaVersion);
-                        }
-                        if (orderingKey != null) {
-                            msg.getMessageBuilder().setOrderingKey(orderingKey);
-                        }
+            int readStartIndex = 0;
+            String uuid = totalChunks > 1 ? String.format("%s-%d", producerName, sequenceId) : null;
+            ChunkedMessageCtx chunkedMessageCtx = totalChunks > 1 ? ChunkedMessageCtx.get(totalChunks) : null;
+            byte[] schemaVersion = totalChunks > 1 && msg.getMessageBuilder().hasSchemaVersion()
+                    ? msg.getMessageBuilder().getSchemaVersion() : null;
+            byte[] orderingKey = totalChunks > 1 && msg.getMessageBuilder().hasOrderingKey()
+                    ? msg.getMessageBuilder().getOrderingKey() : null;
+            for (int chunkId = 0; chunkId < totalChunks; chunkId++) {
+                // Need to reset the schemaVersion, because the schemaVersion is based on a ByteBuf object in
+                // `MessageMetadata`, if we want to re-serialize the `SEND` command using a same `MessageMetadata`,
+                // we need to reset the ByteBuf of the schemaVersion in `MessageMetadata`, I think we need to
+                // reset `ByteBuf` objects in `MessageMetadata` after call the method `MessageMetadata#writeTo()`.
+                if (chunkId > 0) {
+                    if (schemaVersion != null) {
+                        msg.getMessageBuilder().setSchemaVersion(schemaVersion);
+                    }
+                    if (orderingKey != null) {
+                        msg.getMessageBuilder().setOrderingKey(orderingKey);
                     }
+                }
+                if (chunkId > 0 && conf.isBlockIfQueueFull() && !canEnqueueRequest(callback,
+                        message.getSequenceId(), 0 /* The memory was already reserved */)) {
+                    client.getMemoryLimitController().releaseMemory(uncompressedSize - readStartIndex);
+                    semaphoreRelease(totalChunks - chunkId);
+                    return;
+                }

Review Comment:
   Anyway, it's just a corner case. It's not closely related to this PR; I will open a separate PR for it.





[GitHub] [pulsar] RobertIndie commented on a diff in pull request #17795: [fix][client] Fix deadlock when sending chunked messages with BlockIFQueueFull enabled

Posted by GitBox <gi...@apache.org>.
RobertIndie commented on code in PR #17795:
URL: https://github.com/apache/pulsar/pull/17795#discussion_r991788004


##########
pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java:
##########
@@ -573,6 +574,55 @@ public void testChunkSize() throws Exception {
         }
     }
 
+    @Test
+    public void testBlockIfQueueFUllWhenChunking() throws Exception {
+        this.conf.setMaxMessageSize(50);
+
+        @Cleanup
+        final Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .topic("my-property/my-ns/test-chunk-size")
+                .enableChunking(true)
+                .enableBatching(false)
+                .blockIfQueueFull(true)
+                .maxPendingMessages(3)
+                .create();
+
+        // Test sending large message (totalChunks > maxPendingMessages) should not cause deadlock
+        // We need to use a separate thread to send the message instead of using the sendAsync, because the deadlock
+        // might happen before publishing messages to the broker.
+        CompletableFuture<Void> sendMsg = CompletableFuture.runAsync(() -> {
+            try {
+                producer.send(createMessagePayload(200));
+            } catch (PulsarClientException e) {
+                throw new RuntimeException(e);
+            }
+        });
+        try {
+            sendMsg.get(5, TimeUnit.SECONDS);
+        } catch (TimeoutException e) {
+            Assert.fail("Deadlock detected when sending large message.");
+        }
+
+        // Test sending multiple large messages (For every message, totalChunks < maxPendingMessages) concurrently
+        // should not cause the deadlock.
+        List<CompletableFuture<Void>> sendMsgFutures = Lists.newArrayList();

Review Comment:
   It's imported here: https://github.com/RobertIndie/pulsar/blob/c6ebe063fc1c4088f5500ffd7ab5645c14bd581c/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java#L26
   
   I think it's related to the branch. I am rebasing the branch.





[GitHub] [pulsar] BewareMyPower commented on a diff in pull request #17795: [fix][client] Fix deadlock when sending chunked messages with BlockIFQueueFull enabled

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on code in PR #17795:
URL: https://github.com/apache/pulsar/pull/17795#discussion_r991809725


##########
pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java:
##########
@@ -573,6 +574,55 @@ public void testChunkSize() throws Exception {
         }
     }
 
+    @Test
+    public void testBlockIfQueueFUllWhenChunking() throws Exception {
+        this.conf.setMaxMessageSize(50);
+
+        @Cleanup
+        final Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .topic("my-property/my-ns/test-chunk-size")
+                .enableChunking(true)
+                .enableBatching(false)
+                .blockIfQueueFull(true)
+                .maxPendingMessages(3)
+                .create();
+
+        // Test sending large message (totalChunks > maxPendingMessages) should not cause deadlock
+        // We need to use a separate thread to send the message instead of using the sendAsync, because the deadlock
+        // might happen before publishing messages to the broker.
+        CompletableFuture<Void> sendMsg = CompletableFuture.runAsync(() -> {
+            try {
+                producer.send(createMessagePayload(200));
+            } catch (PulsarClientException e) {
+                throw new RuntimeException(e);
+            }
+        });
+        try {
+            sendMsg.get(5, TimeUnit.SECONDS);
+        } catch (TimeoutException e) {
+            Assert.fail("Deadlock detected when sending large message.");
+        }
+
+        // Test sending multiple large messages (For every message, totalChunks < maxPendingMessages) concurrently
+        // should not cause the deadlock.
+        List<CompletableFuture<Void>> sendMsgFutures = Lists.newArrayList();

Review Comment:
   ```
   [INFO] --- modernizer-maven-plugin:2.3.0:modernizer (modernizer) @ pulsar-broker ---
   Error:  /home/runner/work/pulsar/pulsar/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java:609: Prefer java.util.ArrayList<>()
   ```
   
   It looks like the modernizer-maven-plugin prefers constructing a plain `java.util.ArrayList` directly.
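   For reference, the change the plugin is asking for would look roughly like this (assuming only the list construction needs to change):
   
   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.concurrent.CompletableFuture;
   
   // Before (flagged by modernizer-maven-plugin, needs the Guava Lists import):
   // List<CompletableFuture<Void>> sendMsgFutures = Lists.newArrayList();
   
   // After (plain JDK constructor, no Guava dependency):
   List<CompletableFuture<Void>> sendMsgFutures = new ArrayList<>();
   ```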





[GitHub] [pulsar] BewareMyPower commented on a diff in pull request #17795: [fix][client] Fix deadlock when sending chunked messages with BlockIFQueueFull enabled

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on code in PR #17795:
URL: https://github.com/apache/pulsar/pull/17795#discussion_r991350988


##########
pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java:
##########
@@ -573,6 +574,55 @@ public void testChunkSize() throws Exception {
         }
     }
 
+    @Test
+    public void testBlockIfQueueFUllWhenChunking() throws Exception {

Review Comment:
   ```suggestion
       public void testBlockIfQueueFullWhenChunking() throws Exception {
   ```
   
   A typo





[GitHub] [pulsar] BewareMyPower commented on a diff in pull request #17795: [fix][client] Fix deadlock when sending chunked messages with BlockIFQueueFull enabled

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on code in PR #17795:
URL: https://github.com/apache/pulsar/pull/17795#discussion_r991503395


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java:
##########
@@ -519,35 +519,42 @@ public void sendAsync(Message<?> message, SendCallback callback) {
 
         // chunked message also sent individually so, try to acquire send-permits
         for (int i = 0; i < (totalChunks - 1); i++) {
-            if (!canEnqueueRequest(callback, message.getSequenceId(), 0 /* The memory was already reserved */)) {
+            if (!conf.isBlockIfQueueFull() && !canEnqueueRequest(callback, message.getSequenceId(),
+                    0 /* The memory was already reserved */)) {
                 client.getMemoryLimitController().releaseMemory(uncompressedSize);
                 semaphoreRelease(i + 1);
                 return;
             }
         }
 
         try {
-            synchronized (this) {
-                int readStartIndex = 0;
-                String uuid = totalChunks > 1 ? String.format("%s-%d", producerName, sequenceId) : null;
-                ChunkedMessageCtx chunkedMessageCtx = totalChunks > 1 ? ChunkedMessageCtx.get(totalChunks) : null;
-                byte[] schemaVersion = totalChunks > 1 && msg.getMessageBuilder().hasSchemaVersion()
-                        ? msg.getMessageBuilder().getSchemaVersion() : null;
-                byte[] orderingKey = totalChunks > 1 && msg.getMessageBuilder().hasOrderingKey()
-                        ? msg.getMessageBuilder().getOrderingKey() : null;
-                for (int chunkId = 0; chunkId < totalChunks; chunkId++) {
-                    // Need to reset the schemaVersion, because the schemaVersion is based on a ByteBuf object in
-                    // `MessageMetadata`, if we want to re-serialize the `SEND` command using a same `MessageMetadata`,
-                    // we need to reset the ByteBuf of the schemaVersion in `MessageMetadata`, I think we need to
-                    // reset `ByteBuf` objects in `MessageMetadata` after call the method `MessageMetadata#writeTo()`.
-                    if (chunkId > 0) {
-                        if (schemaVersion != null) {
-                            msg.getMessageBuilder().setSchemaVersion(schemaVersion);
-                        }
-                        if (orderingKey != null) {
-                            msg.getMessageBuilder().setOrderingKey(orderingKey);
-                        }
+            int readStartIndex = 0;
+            String uuid = totalChunks > 1 ? String.format("%s-%d", producerName, sequenceId) : null;
+            ChunkedMessageCtx chunkedMessageCtx = totalChunks > 1 ? ChunkedMessageCtx.get(totalChunks) : null;
+            byte[] schemaVersion = totalChunks > 1 && msg.getMessageBuilder().hasSchemaVersion()
+                    ? msg.getMessageBuilder().getSchemaVersion() : null;
+            byte[] orderingKey = totalChunks > 1 && msg.getMessageBuilder().hasOrderingKey()
+                    ? msg.getMessageBuilder().getOrderingKey() : null;
+            for (int chunkId = 0; chunkId < totalChunks; chunkId++) {
+                // Need to reset the schemaVersion, because the schemaVersion is based on a ByteBuf object in
+                // `MessageMetadata`, if we want to re-serialize the `SEND` command using a same `MessageMetadata`,
+                // we need to reset the ByteBuf of the schemaVersion in `MessageMetadata`, I think we need to
+                // reset `ByteBuf` objects in `MessageMetadata` after call the method `MessageMetadata#writeTo()`.
+                if (chunkId > 0) {
+                    if (schemaVersion != null) {
+                        msg.getMessageBuilder().setSchemaVersion(schemaVersion);
+                    }
+                    if (orderingKey != null) {
+                        msg.getMessageBuilder().setOrderingKey(orderingKey);
                     }
+                }
+                if (chunkId > 0 && conf.isBlockIfQueueFull() && !canEnqueueRequest(callback,
+                        message.getSequenceId(), 0 /* The memory was already reserved */)) {
+                    client.getMemoryLimitController().releaseMemory(uncompressedSize - readStartIndex);
+                    semaphoreRelease(totalChunks - chunkId);
+                    return;
+                }
+                synchronized (this) {

Review Comment:
   I agree that the synchronized block should be shrunk. But could you explain in detail why it would cause another deadlock? With the previous synchronized block, the test still succeeded.





[GitHub] [pulsar] codelipenghui commented on a diff in pull request #17795: [fix][client] Fix deadlock when sending chunked messages with BlockIFQueueFull enabled

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on code in PR #17795:
URL: https://github.com/apache/pulsar/pull/17795#discussion_r991734795


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java:
##########
@@ -519,35 +519,42 @@ public void sendAsync(Message<?> message, SendCallback callback) {
 
         // chunked message also sent individually so, try to acquire send-permits
         for (int i = 0; i < (totalChunks - 1); i++) {
-            if (!canEnqueueRequest(callback, message.getSequenceId(), 0 /* The memory was already reserved */)) {
+            if (!conf.isBlockIfQueueFull() && !canEnqueueRequest(callback, message.getSequenceId(),
+                    0 /* The memory was already reserved */)) {
                 client.getMemoryLimitController().releaseMemory(uncompressedSize);
                 semaphoreRelease(i + 1);
                 return;
             }
         }

Review Comment:
   I think that's a great point; it makes the implementation simpler and easier to understand.





[GitHub] [pulsar] BewareMyPower commented on a diff in pull request #17795: [fix][client] Fix deadlock when sending chunked messages with BlockIFQueueFull enabled

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on code in PR #17795:
URL: https://github.com/apache/pulsar/pull/17795#discussion_r991781099


##########
pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java:
##########
@@ -573,6 +574,55 @@ public void testChunkSize() throws Exception {
         }
     }
 
+    @Test
+    public void testBlockIfQueueFUllWhenChunking() throws Exception {
+        this.conf.setMaxMessageSize(50);
+
+        @Cleanup
+        final Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .topic("my-property/my-ns/test-chunk-size")
+                .enableChunking(true)
+                .enableBatching(false)
+                .blockIfQueueFull(true)
+                .maxPendingMessages(3)
+                .create();
+
+        // Test sending large message (totalChunks > maxPendingMessages) should not cause deadlock
+        // We need to use a separate thread to send the message instead of using the sendAsync, because the deadlock
+        // might happen before publishing messages to the broker.
+        CompletableFuture<Void> sendMsg = CompletableFuture.runAsync(() -> {
+            try {
+                producer.send(createMessagePayload(200));
+            } catch (PulsarClientException e) {
+                throw new RuntimeException(e);
+            }
+        });
+        try {
+            sendMsg.get(5, TimeUnit.SECONDS);
+        } catch (TimeoutException e) {
+            Assert.fail("Deadlock detected when sending large message.");
+        }
+
+        // Test sending multiple large messages (For every message, totalChunks < maxPendingMessages) concurrently
+        // should not cause the deadlock.
+        List<CompletableFuture<Void>> sendMsgFutures = Lists.newArrayList();

Review Comment:
   Could you point out in which line it's imported? I cannot find any `import xx.xx.Lists` in this file.





[GitHub] [pulsar] RobertIndie commented on a diff in pull request #17795: [fix][client] Fix deadlock when sending chunked messages with BlockIFQueueFull enabled

Posted by GitBox <gi...@apache.org>.
RobertIndie commented on code in PR #17795:
URL: https://github.com/apache/pulsar/pull/17795#discussion_r991792887


##########
pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java:
##########
@@ -573,6 +574,55 @@ public void testChunkSize() throws Exception {
         }
     }
 
+    @Test
+    public void testBlockIfQueueFUllWhenChunking() throws Exception {
+        this.conf.setMaxMessageSize(50);
+
+        @Cleanup
+        final Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .topic("my-property/my-ns/test-chunk-size")
+                .enableChunking(true)
+                .enableBatching(false)
+                .blockIfQueueFull(true)
+                .maxPendingMessages(3)
+                .create();
+
+        // Test sending large message (totalChunks > maxPendingMessages) should not cause deadlock
+        // We need to use a separate thread to send the message instead of using the sendAsync, because the deadlock
+        // might happen before publishing messages to the broker.
+        CompletableFuture<Void> sendMsg = CompletableFuture.runAsync(() -> {
+            try {
+                producer.send(createMessagePayload(200));
+            } catch (PulsarClientException e) {
+                throw new RuntimeException(e);
+            }
+        });
+        try {
+            sendMsg.get(5, TimeUnit.SECONDS);
+        } catch (TimeoutException e) {
+            Assert.fail("Deadlock detected when sending large message.");
+        }
+
+        // Test sending multiple large messages (For every message, totalChunks < maxPendingMessages) concurrently
+        // should not cause the deadlock.
+        List<CompletableFuture<Void>> sendMsgFutures = Lists.newArrayList();

Review Comment:
   Fixed it.


