You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/04/07 06:08:58 UTC

[GitHub] [pulsar] michaeljmarshall opened a new pull request #10159: [Client][Java] Close deadLetterProducer in Java consumer client (prevent potential leak)

michaeljmarshall opened a new pull request #10159:
URL: https://github.com/apache/pulsar/pull/10159


   While investigating https://github.com/apache/pulsar/issues/9916, I noticed that the `deadLetterProducer` is never closed, which could lead to a leak if enough consumers are configured to deliver DLQ messages.
   
   This change does not fix the issue raised here: https://github.com/apache/pulsar/issues/9916.
   
   ### Motivation
   
   The `deadLetterProducer` should be closed when closing consumer tasks.
   
   ### Modifications
   
   Added an asynchronous close to the `deadLetterProducer`.
   
   ### Alternative Implementations
   
   The close could be synchronous, but the method, `closeConsumerTasks`, is void and contains other methods that asynchronously close resources, so I am matching that pattern. I'm happy to change the implementation if others think we want to enforce some type of ordering.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] michaeljmarshall commented on a change in pull request #10159: [Client][Java] Close deadLetterProducer in Java consumer client (prevent potential leak)

Posted by GitBox <gi...@apache.org>.
michaeljmarshall commented on a change in pull request #10159:
URL: https://github.com/apache/pulsar/pull/10159#discussion_r609228351



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
##########
@@ -938,6 +938,21 @@ private void closeConsumerTasks() {
         if (possibleSendToDeadLetterTopicMessages != null) {
             possibleSendToDeadLetterTopicMessages.clear();
         }
+
+        if (deadLetterProducer != null) {
+            try {
+                createProducerLock.writeLock().lock();

Review comment:
       I'm happy to change it, but I was following the apparent convention in the file: the two other times where the line appears in this file, it is within the `try` block. Should I update those as well?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] michaeljmarshall commented on a change in pull request #10159: [Client][Java] Close deadLetterProducer in Java consumer client (prevent potential leak)

Posted by GitBox <gi...@apache.org>.
michaeljmarshall commented on a change in pull request #10159:
URL: https://github.com/apache/pulsar/pull/10159#discussion_r609235715



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
##########
@@ -938,6 +938,21 @@ private void closeConsumerTasks() {
         if (possibleSendToDeadLetterTopicMessages != null) {
             possibleSendToDeadLetterTopicMessages.clear();
         }
+
+        if (deadLetterProducer != null) {
+            try {
+                createProducerLock.writeLock().lock();

Review comment:
       I can see in other classes that we acquire the lock before entering the `try` block. I'll submit a new commit that changes the behavior for each reference in this file.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] michaeljmarshall commented on pull request #10159: [Client][Java] Close deadLetterProducer in Java consumer client (prevent potential leak)

Posted by GitBox <gi...@apache.org>.
michaeljmarshall commented on pull request #10159:
URL: https://github.com/apache/pulsar/pull/10159#issuecomment-814631517


   @addisonj - I referenced this producer leak during Thursday's community meeting. Tagging you in case you're interested in reviewing this. Thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] eolivelli commented on a change in pull request #10159: [Client][Java] Close deadLetterProducer in Java consumer client (prevent potential leak)

Posted by GitBox <gi...@apache.org>.
eolivelli commented on a change in pull request #10159:
URL: https://github.com/apache/pulsar/pull/10159#discussion_r609359089



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
##########
@@ -938,6 +938,21 @@ private void closeConsumerTasks() {
         if (possibleSendToDeadLetterTopicMessages != null) {
             possibleSendToDeadLetterTopicMessages.clear();
         }
+
+        if (deadLetterProducer != null) {
+            createProducerLock.writeLock().lock();
+            try {
+                if (deadLetterProducer != null) {
+                    deadLetterProducer.thenApplyAsync(Producer::closeAsync);
+                    deadLetterProducer = null;
+                }
+            } catch (Exception e) {
+                log.error("Error closing deadLetterProducer for topic: {}", deadLetterPolicy.getDeadLetterTopic(), e);

Review comment:
       should we handle InterruptedException here ?
   

##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
##########
@@ -938,6 +938,21 @@ private void closeConsumerTasks() {
         if (possibleSendToDeadLetterTopicMessages != null) {
             possibleSendToDeadLetterTopicMessages.clear();
         }
+
+        if (deadLetterProducer != null) {
+            try {
+                createProducerLock.writeLock().lock();
+                if (deadLetterProducer != null) {
+                    deadLetterProducer.thenApplyAsync(Producer::closeAsync);

Review comment:
       +1 for blocking until the close operation finishes




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] michaeljmarshall commented on a change in pull request #10159: [Client][Java] Close deadLetterProducer in Java consumer client (prevent potential leak)

Posted by GitBox <gi...@apache.org>.
michaeljmarshall commented on a change in pull request #10159:
URL: https://github.com/apache/pulsar/pull/10159#discussion_r612122925



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
##########
@@ -938,6 +938,21 @@ private void closeConsumerTasks() {
         if (possibleSendToDeadLetterTopicMessages != null) {
             possibleSendToDeadLetterTopicMessages.clear();
         }
+
+        if (deadLetterProducer != null) {
+            try {
+                createProducerLock.writeLock().lock();
+                if (deadLetterProducer != null) {
+                    deadLetterProducer.thenApplyAsync(Producer::closeAsync);

Review comment:
       @eolivelli - given that this method incurs a network call to the broker, I think it'd be preferable to use the `closeAsync`, if possible. I added a commit to show what I'm thinking. Let me know if this works, or if you'd still prefer to block.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] michaeljmarshall closed pull request #10159: [Client][Java] Close deadLetterProducer in Java consumer client (prevent potential leak)

Posted by GitBox <gi...@apache.org>.
michaeljmarshall closed pull request #10159:
URL: https://github.com/apache/pulsar/pull/10159


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] merlimat commented on a change in pull request #10159: [Client][Java] Close deadLetterProducer in Java consumer client (prevent potential leak)

Posted by GitBox <gi...@apache.org>.
merlimat commented on a change in pull request #10159:
URL: https://github.com/apache/pulsar/pull/10159#discussion_r608795618



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
##########
@@ -938,6 +938,21 @@ private void closeConsumerTasks() {
         if (possibleSendToDeadLetterTopicMessages != null) {
             possibleSendToDeadLetterTopicMessages.clear();
         }
+
+        if (deadLetterProducer != null) {
+            try {
+                createProducerLock.writeLock().lock();

Review comment:
       This should typically go before the `try {`

##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
##########
@@ -938,6 +938,21 @@ private void closeConsumerTasks() {
         if (possibleSendToDeadLetterTopicMessages != null) {
             possibleSendToDeadLetterTopicMessages.clear();
         }
+
+        if (deadLetterProducer != null) {
+            try {
+                createProducerLock.writeLock().lock();
+                if (deadLetterProducer != null) {
+                    deadLetterProducer.thenApplyAsync(Producer::closeAsync);

Review comment:
       This would be done asynchronously. I think we should instead be waiting until the producer is close.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] michaeljmarshall commented on a change in pull request #10159: [Client][Java] Close deadLetterProducer in Java consumer client (prevent potential leak)

Posted by GitBox <gi...@apache.org>.
michaeljmarshall commented on a change in pull request #10159:
URL: https://github.com/apache/pulsar/pull/10159#discussion_r609239453



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
##########
@@ -938,6 +938,21 @@ private void closeConsumerTasks() {
         if (possibleSendToDeadLetterTopicMessages != null) {
             possibleSendToDeadLetterTopicMessages.clear();
         }
+
+        if (deadLetterProducer != null) {
+            try {
+                createProducerLock.writeLock().lock();
+                if (deadLetterProducer != null) {
+                    deadLetterProducer.thenApplyAsync(Producer::closeAsync);

Review comment:
       I wasn't sure whether or not this should be closed synchronously or asynchronously (I called this out in the PR's description). Thank you for this feedback. I am okay with changing the behavior but first want to explain why I originally proposed it this way. 
   
   Notice that on the line right after my addition, we call `acknowledgmentsGroupingTracker.close();`, which ultimately calls the `flushAsync()` method in the `PersistentAcknowledgmentsGroupingTracker` class. That method returns a future, but we do not capture it or wait for its completion. Further, this `closeConsumerTasks` method is called by the `closeAsync` method for the consumer, and it is on the calling thread, so it will block the end user calling `closeAsync`. That seems suboptimal to me.
   
   An alternate approach could be to use futures here to asynchronously close resources and then use call backs to enforce a proper ordering on the closure of different resources.
   
   @merlimat - let me know what you think. Thanks!




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] michaeljmarshall commented on pull request #10159: [Client][Java] Close deadLetterProducer in Java consumer client (prevent potential leak)

Posted by GitBox <gi...@apache.org>.
michaeljmarshall commented on pull request #10159:
URL: https://github.com/apache/pulsar/pull/10159#issuecomment-968211965


   Closing for now. The underlying leak still exists.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org