Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/09/24 22:37:12 UTC

[GitHub] [pulsar] aahmed-se opened a new pull request #12181: [Draft] Limit the semaphore count of the pulsar producer to prevent overflow

aahmed-se opened a new pull request #12181:
URL: https://github.com/apache/pulsar/pull/12181


   *(If this PR fixes a github issue, please add `Fixes #<xyz>`.)*
   
   Fixes #<xyz>
   
   *(or if this PR is one task of a github issue, please add `Master Issue: #<xyz>` to link to the master issue.)*
   
   Master Issue: #<xyz>
   
   ### Motivation
   
   
   *Explain here the context, and why you're making that change. What is the problem you're trying to solve.*
   
   ### Modifications
   
   *Describe the modifications you've done.*
   
   ### Verifying this change
   
   - [ ] Make sure that the change passes the CI checks.
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This change is already covered by existing tests, such as *(please describe tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
     - *Added integration tests for end-to-end deployment with large payloads (10MB)*
     - *Extended integration test for recovery after broker failure*
   
   ### Does this pull request potentially affect one of the following parts:
   
   *If `yes` was chosen, please highlight the changes*
   
     - Dependencies (does it add or upgrade a dependency): (yes / no)
     - The public API: (yes / no)
     - The schema: (yes / no / don't know)
     - The default values of configurations: (yes / no)
     - The wire protocol: (yes / no)
     - The rest endpoints: (yes / no)
     - The admin cli options: (yes / no)
     - Anything that affects deployment: (yes / no / don't know)
   
   ### Documentation
   
   Check the box below and label this PR (if you have committer privilege).
   
   Need to update docs? 
   
   - [ ] doc-required 
     
     (If you need help on updating docs, create a doc issue)
     
   - [ ] no-need-doc 
     
     (Please explain why)
     
   - [ ] doc 
     
     (If this PR contains doc changes)
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] merlimat commented on a change in pull request #12181: Limit the semaphore count of the producer to prevent int overflow

Posted by GitBox <gi...@apache.org>.
merlimat commented on a change in pull request #12181:
URL: https://github.com/apache/pulsar/pull/12181#discussion_r718811046



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
##########
@@ -1790,9 +1796,15 @@ private void batchMessageAndSend() {
                 }
             } catch (PulsarClientException e) {
                 Thread.currentThread().interrupt();
-                semaphore.ifPresent(s -> s.release(batchMessageContainer.getNumMessagesInBatch()));
+                semaphore.ifPresent(s -> s.release(Math.min(
+                        maxPermits - s.availablePermits(),
+                        batchMessageContainer.getNumMessagesInBatch()))

Review comment:
       I don't think it limits the issue. It simply masks it by making the semaphore useless, in that acquire/release will end up either always blocking or never blocking.
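A minimal standalone sketch of the clamped release from the diff above, applied directly to a `java.util.concurrent.Semaphore`. `ClampedRelease`, `MAX_PERMITS` and `clampedRelease` are hypothetical names for illustration; the real `ProducerImpl` wraps the semaphore in an `Optional` and reads `maxPermits` from its own field. It shows the reviewer's point: a duplicated release is silently absorbed, so the counter looks healthy and nothing signals that acquire and release are out of balance.

```java
import java.util.concurrent.Semaphore;

final class ClampedRelease {
    // Hypothetical cap standing in for the producer's maxPermits field.
    static final int MAX_PERMITS = 10;

    /**
     * Mirrors the diff: release Math.min(maxPermits - availablePermits(), n).
     * Returns how many permits were actually released. If availablePermits() were
     * already above MAX_PERMITS, the argument would be negative and
     * Semaphore.release(int) would throw IllegalArgumentException.
     */
    static int clampedRelease(Semaphore s, int n) {
        int toRelease = Math.min(MAX_PERMITS - s.availablePermits(), n);
        s.release(toRelease);
        return toRelease;
    }

    public static void main(String[] args) {
        Semaphore s = new Semaphore(MAX_PERMITS);
        s.acquireUninterruptibly(3);        // one batch of 3 messages in flight
        int first = clampedRelease(s, 3);   // legitimate release: 3 permits returned
        int second = clampedRelease(s, 3);  // duplicate release: silently becomes 0
        System.out.println(first + " " + second + " " + s.availablePermits()); // prints "3 0 10"
    }
}
```

Nothing distinguishes the second call from a healthy one except its return value, which the diff discards; that is the masking concern.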



[GitHub] [pulsar] Anonymitaet commented on pull request #12181: Limit the semaphore count of the producer to prevent int overflow

Posted by GitBox <gi...@apache.org>.
Anonymitaet commented on pull request #12181:
URL: https://github.com/apache/pulsar/pull/12181#issuecomment-930944129


   Hi @aahmed-se for this PR, do we need to update docs?



[GitHub] [pulsar] aahmed-se commented on a change in pull request #12181: Limit the semaphore count of the producer to prevent int overflow

Posted by GitBox <gi...@apache.org>.
aahmed-se commented on a change in pull request #12181:
URL: https://github.com/apache/pulsar/pull/12181#discussion_r718786230



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
##########
@@ -1018,14 +1021,17 @@ private long getHighestSequenceId(OpSendMsg op) {
     }
 
     private void releaseSemaphoreForSendOp(OpSendMsg op) {
-        if (semaphore.isPresent()) {
-            semaphore.get().release(isBatchMessagingEnabled() ? op.numMessagesInBatch : 1);
-        }
+        semaphore.ifPresent(semaphore -> semaphore.release(
+                Math.min(
+                        maxPermits - semaphore.availablePermits(),

Review comment:
       We are not trying to solve all of the issues here; this is a simple overflow-protection fix.
   The root cause still has to be figured out, but this is a simpler alternative to implementing a bounded semaphore.
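For context on the overflow this fix guards against: `java.util.concurrent.Semaphore` does not cap `release()` at the initial permit count (its Javadoc states that a releasing thread need not have acquired the permit), so surplus releases simply accumulate. In current OpenJDK builds, once the internal `int` count would wrap, `release` throws a `java.lang.Error` ("Maximum permit count exceeded"). The sketch below triggers that by starting near `Integer.MAX_VALUE`; the class and method names are illustrative.

```java
import java.util.concurrent.Semaphore;

final class SemaphoreOverflowDemo {
    /** Returns true if releasing `extra` permits on top of `initial` throws the overflow Error. */
    static boolean releaseOverflows(int initial, int extra) {
        Semaphore s = new Semaphore(initial);
        try {
            s.release(extra);
            return false;
        } catch (Error e) {
            // OpenJDK reports permit-count overflow with a java.lang.Error, not an Exception,
            // so an ordinary catch (Exception e) block will not intercept it.
            return true;
        }
    }

    public static void main(String[] args) {
        // Releasing past the initial count is allowed and silently grows the count.
        Semaphore s = new Semaphore(10);
        s.release(5);
        System.out.println(s.availablePermits());                       // 15
        System.out.println(releaseOverflows(Integer.MAX_VALUE - 1, 2)); // true
        System.out.println(releaseOverflows(Integer.MAX_VALUE - 1, 1)); // false
    }
}
```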



[GitHub] [pulsar] aahmed-se commented on a change in pull request #12181: Limit the semaphore count of the producer to prevent int overflow

Posted by GitBox <gi...@apache.org>.
aahmed-se commented on a change in pull request #12181:
URL: https://github.com/apache/pulsar/pull/12181#discussion_r718819655



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
##########
@@ -1790,9 +1796,15 @@ private void batchMessageAndSend() {
                 }
             } catch (PulsarClientException e) {
                 Thread.currentThread().interrupt();
-                semaphore.ifPresent(s -> s.release(batchMessageContainer.getNumMessagesInBatch()));
+                semaphore.ifPresent(s -> s.release(Math.min(
+                        maxPermits - s.availablePermits(),
+                        batchMessageContainer.getNumMessagesInBatch()))

Review comment:
       Creating a bounded semaphore seems to be a popular approach to safeguarding production; that's what I tried to emulate.
   https://github.com/googleapis/gax-java/blob/49efdc38f2d6ca3838dcecee63dc6cd07f6c2b23/gax/src/main/java/com/google/api/gax/batching/BlockingSemaphore.java
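A rough sketch of the bounded-semaphore idea described in this comment: acquire blocks while no permits are free, and release never raises the count above the configured limit. This is not the gax-java `BlockingSemaphore` code; `BoundedBlockingSemaphore` and its methods are hypothetical, and it uses a plain Java monitor for simplicity. Because acquire and release share one lock, the cap check cannot race with other releases.

```java
final class BoundedBlockingSemaphore {
    private final int limit;
    private int available;

    BoundedBlockingSemaphore(int limit) {
        if (limit <= 0) throw new IllegalArgumentException("limit must be positive");
        this.limit = limit;
        this.available = limit;
    }

    /** Blocks until `n` permits are free, then takes them. */
    synchronized void acquire(int n) throws InterruptedException {
        if (n < 0 || n > limit) throw new IllegalArgumentException("bad permit count: " + n);
        while (available < n) {
            wait(); // released permits trigger notifyAll below
        }
        available -= n;
    }

    /**
     * Returns up to `n` permits without ever exceeding `limit`.
     * The return value is the number of requested permits discarded by the cap.
     */
    synchronized int release(int n) {
        if (n < 0) throw new IllegalArgumentException("bad permit count: " + n);
        int accepted = Math.min(n, limit - available);
        available += accepted;
        notifyAll(); // wake any acquirers waiting for permits
        return n - accepted;
    }

    synchronized int availablePermits() {
        return available;
    }
}
```

Returning the discarded count gives a caller something to log; the cap on its own still does not explain why releases outnumber acquires.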



[GitHub] [pulsar] aahmed-se commented on a change in pull request #12181: Limit the semaphore count of the producer to prevent int overflow

Posted by GitBox <gi...@apache.org>.
aahmed-se commented on a change in pull request #12181:
URL: https://github.com/apache/pulsar/pull/12181#discussion_r718808933



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
##########
@@ -1790,9 +1796,15 @@ private void batchMessageAndSend() {
                 }
             } catch (PulsarClientException e) {
                 Thread.currentThread().interrupt();
-                semaphore.ifPresent(s -> s.release(batchMessageContainer.getNumMessagesInBatch()));
+                semaphore.ifPresent(s -> s.release(Math.min(
+                        maxPermits - s.availablePermits(),
+                        batchMessageContainer.getNumMessagesInBatch()))

Review comment:
       This limits the impact of issue https://github.com/apache/pulsar/issues/12151 in prod, since we are implementing a max limit.



[GitHub] [pulsar] Anonymitaet commented on pull request #12181: Limit the semaphore count of the producer to prevent int overflow

Posted by GitBox <gi...@apache.org>.
Anonymitaet commented on pull request #12181:
URL: https://github.com/apache/pulsar/pull/12181#issuecomment-927247949


   @aahmed-se Thanks for your contribution. For this PR, do we need to update docs?
   
   (The [PR template contains info about doc](https://github.com/apache/pulsar/blob/master/.github/PULL_REQUEST_TEMPLATE.md#documentation), which helps others know more about the changes. Can you provide doc-related info in this and future PR descriptions? Thanks) 



[GitHub] [pulsar] merlimat commented on a change in pull request #12181: Limit the semaphore count of the producer to prevent int overflow

Posted by GitBox <gi...@apache.org>.
merlimat commented on a change in pull request #12181:
URL: https://github.com/apache/pulsar/pull/12181#discussion_r718811926



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
##########
@@ -1790,9 +1796,15 @@ private void batchMessageAndSend() {
                 }
             } catch (PulsarClientException e) {
                 Thread.currentThread().interrupt();
-                semaphore.ifPresent(s -> s.release(batchMessageContainer.getNumMessagesInBatch()));
+                semaphore.ifPresent(s -> s.release(Math.min(
+                        maxPermits - s.availablePermits(),
+                        batchMessageContainer.getNumMessagesInBatch()))

Review comment:
       So, it's actually preferable to have an exception that identifies that we have an issue (provided that we can restart from there) rather than just masking the problem. That is, until a proper solution for the root cause is implemented.
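One way to follow this suggestion is to make an over-release fail loudly instead of being clamped. A minimal sketch, assuming a hypothetical `StrictPermits` counter built on `AtomicInteger` (not Pulsar code): the bound check and the update happen in a single compare-and-set loop, so a release that would exceed the permits handed out throws `IllegalStateException` and leaves the count unchanged, giving the caller a clear point to log and recover.

```java
import java.util.concurrent.atomic.AtomicInteger;

final class StrictPermits {
    private final int limit;
    private final AtomicInteger available;

    StrictPermits(int limit) {
        this.limit = limit;
        this.available = new AtomicInteger(limit);
    }

    /** Non-blocking acquire: takes `n` permits if they are free. */
    boolean tryAcquire(int n) {
        while (true) {
            int current = available.get();
            if (current < n) return false;
            if (available.compareAndSet(current, current - n)) return true;
        }
    }

    /** Returns `n` permits, or throws if that exceeds the permits currently handed out. */
    void release(int n) {
        if (n < 0) throw new IllegalArgumentException("bad permit count: " + n);
        while (true) {
            int current = available.get();
            int outstanding = limit - current; // subtraction avoids overflow in current + n
            if (n > outstanding) {
                throw new IllegalStateException(
                        "release(" + n + ") exceeds outstanding permits (" + outstanding + ")");
            }
            if (available.compareAndSet(current, current + n)) return;
        }
    }

    int availablePermits() {
        return available.get();
    }
}
```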



[GitHub] [pulsar] aahmed-se commented on a change in pull request #12181: Limit the semaphore count of the producer to prevent int overflow

Posted by GitBox <gi...@apache.org>.
aahmed-se commented on a change in pull request #12181:
URL: https://github.com/apache/pulsar/pull/12181#discussion_r718813746



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
##########
@@ -1790,9 +1796,15 @@ private void batchMessageAndSend() {
                 }
             } catch (PulsarClientException e) {
                 Thread.currentThread().interrupt();
-                semaphore.ifPresent(s -> s.release(batchMessageContainer.getNumMessagesInBatch()));
+                semaphore.ifPresent(s -> s.release(Math.min(
+                        maxPermits - s.availablePermits(),
+                        batchMessageContainer.getNumMessagesInBatch()))

Review comment:
       Let's limit the count but add a logger; that will prevent the blow-up in production but still record the issue.
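A sketch of the "clamp but log" compromise proposed here, using `java.util.logging` as a stand-in for whatever logger the client actually uses (Pulsar's own code typically uses SLF4J). The hypothetical `ClampingPermits.release` applies the cap inside a compare-and-set loop, so concurrent releases cannot jointly overshoot it, and it emits a warning with the number of discarded permits whenever the cap takes effect.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Logger;

final class ClampingPermits {
    private static final Logger LOG = Logger.getLogger(ClampingPermits.class.getName());

    private final int limit;
    private final AtomicInteger available;

    ClampingPermits(int limit) {
        this.limit = limit;
        this.available = new AtomicInteger(limit);
    }

    /** Non-blocking acquire: takes `n` permits if they are free. */
    boolean tryAcquire(int n) {
        int current;
        do {
            current = available.get();
            if (current < n) return false;
        } while (!available.compareAndSet(current, current - n));
        return true;
    }

    /** Returns up to `n` permits without exceeding `limit`; returns how many were discarded. */
    int release(int n) {
        if (n < 0) throw new IllegalArgumentException("bad permit count: " + n);
        while (true) {
            int current = available.get();
            int accepted = Math.min(n, limit - current);
            if (available.compareAndSet(current, current + accepted)) {
                int discarded = n - accepted;
                if (discarded > 0) {
                    // The cap prevents overflow, but a discard means acquire/release are
                    // out of balance, so leave a trace instead of hiding it completely.
                    LOG.warning("Discarded " + discarded + " excess permit(s) on release; "
                            + "acquire/release counts are out of sync");
                }
                return discarded;
            }
        }
    }

    int availablePermits() {
        return available.get();
    }
}
```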



[GitHub] [pulsar] merlimat commented on a change in pull request #12181: Limit the semaphore count of the producer to prevent int overflow

Posted by GitBox <gi...@apache.org>.
merlimat commented on a change in pull request #12181:
URL: https://github.com/apache/pulsar/pull/12181#discussion_r718806516



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
##########
@@ -1790,9 +1796,15 @@ private void batchMessageAndSend() {
                 }
             } catch (PulsarClientException e) {
                 Thread.currentThread().interrupt();
-                semaphore.ifPresent(s -> s.release(batchMessageContainer.getNumMessagesInBatch()));
+                semaphore.ifPresent(s -> s.release(Math.min(
+                        maxPermits - s.availablePermits(),
+                        batchMessageContainer.getNumMessagesInBatch()))

Review comment:
       This is just masking the problem. If the counts are out of sync, this won't be helping either as it will not put them back in sync. We need to identify the reason for why they're out of sync.



[GitHub] [pulsar] eolivelli commented on a change in pull request #12181: Limit the semaphore count of the producer to prevent int overflow

Posted by GitBox <gi...@apache.org>.
eolivelli commented on a change in pull request #12181:
URL: https://github.com/apache/pulsar/pull/12181#discussion_r716997559



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
##########
@@ -1018,14 +1021,17 @@ private long getHighestSequenceId(OpSendMsg op) {
     }
 
     private void releaseSemaphoreForSendOp(OpSendMsg op) {
-        if (semaphore.isPresent()) {
-            semaphore.get().release(isBatchMessagingEnabled() ? op.numMessagesInBatch : 1);
-        }
+        semaphore.ifPresent(semaphore -> semaphore.release(
+                Math.min(
+                        maxPermits - semaphore.availablePermits(),

Review comment:
       I don't understand how we are dealing with concurrency here.
   After you capture the number of available permits, another thread could operate on the semaphore before you release.
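The race described here can be reproduced without threads by replaying one unlucky interleaving step by step: two releasers both read `availablePermits()` before either calls `release`, so each computes the same headroom and together they overshoot the cap. A minimal sketch on `java.util.concurrent.Semaphore`; `RaceReplay` and the numbers are illustrative.

```java
import java.util.concurrent.Semaphore;

final class RaceReplay {
    /** Replays "both threads read, then both release" and returns the final permit count. */
    static int replay(int maxPermits, int inFlight, int batch) {
        Semaphore s = new Semaphore(maxPermits);
        s.acquireUninterruptibly(inFlight);

        // Thread A and thread B each evaluate the clamp against the same snapshot.
        int headroomA = Math.min(maxPermits - s.availablePermits(), batch);
        int headroomB = Math.min(maxPermits - s.availablePermits(), batch);

        // Only afterwards do both releases land.
        s.release(headroomA);
        s.release(headroomB);
        return s.availablePermits();
    }

    public static void main(String[] args) {
        // 10 permits, 2 in flight: each "thread" sees headroom 2 and releases 2.
        System.out.println(replay(10, 2, 2)); // prints 12, above the intended cap of 10
    }
}
```

Run sequentially, the second clamp would have computed 0; only doing the read and the update atomically (a compare-and-set loop, or one lock shared by every releaser) closes this window.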



[GitHub] [pulsar] aahmed-se commented on a change in pull request #12181: Limit the semaphore count of the producer to prevent int overflow

Posted by GitBox <gi...@apache.org>.
aahmed-se commented on a change in pull request #12181:
URL: https://github.com/apache/pulsar/pull/12181#discussion_r718831496



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
##########
@@ -1790,9 +1796,15 @@ private void batchMessageAndSend() {
                 }
             } catch (PulsarClientException e) {
                 Thread.currentThread().interrupt();
-                semaphore.ifPresent(s -> s.release(batchMessageContainer.getNumMessagesInBatch()));
+                semaphore.ifPresent(s -> s.release(Math.min(
+                        maxPermits - s.availablePermits(),
+                        batchMessageContainer.getNumMessagesInBatch()))

Review comment:
       I try to limit the total number of permits that can be held, regardless of release requests.
   
   It's similar to https://github.com/googleapis/gax-java/blob/49efdc38f2/gax/src/main/java/com/google/api/gax/batching/NonBlockingSemaphore.java but, instead of throwing exceptions, I silently discard the excess requests. I think logging them makes sense, though.



[GitHub] [pulsar] aahmed-se commented on pull request #12181: Limit the semaphore count of the producer to prevent int overflow

Posted by GitBox <gi...@apache.org>.
aahmed-se commented on pull request #12181:
URL: https://github.com/apache/pulsar/pull/12181#issuecomment-931494293


   > Hi @aahmed-se for this PR, do we need to update docs?
   
   No, we don't.



[GitHub] [pulsar] merlimat commented on a change in pull request #12181: Limit the semaphore count of the producer to prevent int overflow

Posted by GitBox <gi...@apache.org>.
merlimat commented on a change in pull request #12181:
URL: https://github.com/apache/pulsar/pull/12181#discussion_r718818635



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
##########
@@ -1790,9 +1796,15 @@ private void batchMessageAndSend() {
                 }
             } catch (PulsarClientException e) {
                 Thread.currentThread().interrupt();
-                semaphore.ifPresent(s -> s.release(batchMessageContainer.getNumMessagesInBatch()));
+                semaphore.ifPresent(s -> s.release(Math.min(
+                        maxPermits - s.availablePermits(),
+                        batchMessageContainer.getNumMessagesInBatch()))

Review comment:
       I don't think limiting the counter is a good idea. Making sure we recover from the exception is a better approach. 



[GitHub] [pulsar] merlimat commented on a change in pull request #12181: Limit the semaphore count of the producer to prevent int overflow

Posted by GitBox <gi...@apache.org>.
merlimat commented on a change in pull request #12181:
URL: https://github.com/apache/pulsar/pull/12181#discussion_r718826974



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
##########
@@ -1790,9 +1796,15 @@ private void batchMessageAndSend() {
                 }
             } catch (PulsarClientException e) {
                 Thread.currentThread().interrupt();
-                semaphore.ifPresent(s -> s.release(batchMessageContainer.getNumMessagesInBatch()));
+                semaphore.ifPresent(s -> s.release(Math.min(
+                        maxPermits - s.availablePermits(),
+                        batchMessageContainer.getNumMessagesInBatch()))

Review comment:
       The terminology is not correct here. The semaphore *is* bounded (that's its only purpose): it only allows a fixed number of acquires.
   
   The problem arises if we try to release it more times than it was acquired, but that doesn't make it unbounded.
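Since the problem is releasing more times than acquiring, another angle (not what this PR does) is to make each operation's release idempotent, so that a second release on the same operation becomes a detectable no-op. A minimal sketch with a hypothetical `PermitHandle`; attaching such a handle to each `OpSendMsg` in `ProducerImpl` is an assumption for illustration, not how the client is implemented.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;

final class PermitHandle {
    private final Semaphore semaphore;
    private final int permits;
    private final AtomicBoolean released = new AtomicBoolean(false);

    private PermitHandle(Semaphore semaphore, int permits) {
        this.semaphore = semaphore;
        this.permits = permits;
    }

    /** Acquires `permits` and returns a handle that can give them back exactly once. */
    static PermitHandle acquire(Semaphore semaphore, int permits) throws InterruptedException {
        semaphore.acquire(permits);
        return new PermitHandle(semaphore, permits);
    }

    /** Releases the permits on the first call; later calls release nothing and return false. */
    boolean release() {
        if (!released.compareAndSet(false, true)) {
            return false; // duplicate release: nothing is returned twice; worth logging as a caller bug
        }
        semaphore.release(permits);
        return true;
    }
}
```

With this shape, the permit count can only drift if some path never calls `release`, which points the investigation at leaks rather than at over-releases.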



[GitHub] [pulsar] aahmed-se closed pull request #12181: Limit the semaphore count of the producer to prevent int overflow

Posted by GitBox <gi...@apache.org>.
aahmed-se closed pull request #12181:
URL: https://github.com/apache/pulsar/pull/12181