Posted to commits@pulsar.apache.org by "315157973 (via GitHub)" <gi...@apache.org> on 2023/03/01 01:28:58 UTC

[GitHub] [pulsar] 315157973 commented on a diff in pull request #19585: [fix] [client] fix memory leak if enabled pooled messages

315157973 commented on code in PR #19585:
URL: https://github.com/apache/pulsar/pull/19585#discussion_r1121006549


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:
##########
@@ -1075,6 +1076,45 @@ private void closeConsumerTasks() {
         }
         negativeAcksTracker.close();
         stats.getStatTimeout().ifPresent(Timeout::cancel);
+        releaseMsgIfEnabledPooledMsg();
+    }
+
+    /**
+     * If enabled pooled messages, we should release the messages after closing consumer.
+     *   1. Call "clearIncomingMessages" regardless of whether "internalPinnedExecutor" has shutdown or not.
+     *   2. Increment epoch will auto release all messages which received after close.
+     *   3. Call "clearIncomingMessages" in "internalPinnedExecutor" is used to clear messages in flight.
+     */
+    private void releaseMsgIfEnabledPooledMsg() {
+        if (!poolMessages) {
+            return;
+        }
+        // Increment epoch.
+        CONSUMER_EPOCH.incrementAndGet(this);
+        // Try clear the incoming queue in internalPinnedExecutor-thread.
+        boolean executorIsShutdown = false;
+        try {
+            if (!internalPinnedExecutor.isShutdown()) {
+                internalPinnedExecutor.execute(this::clearIncomingMessages);
+            }
+        } catch (RejectedExecutionException rejectedExecutionException) {
+            executorIsShutdown = true;
+        }
+        // If internalPinnedExecutor is shutdown, try await termination and clear the incoming queue.
+        if (executorIsShutdown) {
+            boolean awaitTerminationSuccess = false;
+            try {
+                awaitTerminationSuccess = internalPinnedExecutor.awaitTermination(1, TimeUnit.SECONDS);

Review Comment:
   You are right: the operation as a whole is not atomic.
   I would like to find an approach that is 100% guaranteed.
   Can we disable `incomingMessages` itself?
   For example, let the size of the `GrowableArrayBlockingQueue` become -1. Since all of its internal operations are performed under a lock, atomicity can be maintained.
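
   A minimal sketch of that idea, under stated assumptions: `ClosableQueue`, `disableAndDrain`, and the `disabled` flag are hypothetical names for illustration, not part of `GrowableArrayBlockingQueue` or any Pulsar API. A boolean flag stands in for the "size becomes -1" marker. The point is only that the disable step and every enqueue share one lock, so no message can slip in after the drain.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical stand-in for the incoming queue; NOT Pulsar's GrowableArrayBlockingQueue. */
final class ClosableQueue<T> {
    private final ReentrantLock lock = new ReentrantLock();
    private final ArrayDeque<T> items = new ArrayDeque<>();
    private boolean disabled; // guarded by lock; plays the role of "size == -1"

    /** Returns false once the queue is disabled; the caller must then release the item itself. */
    boolean offer(T item) {
        lock.lock();
        try {
            if (disabled) {
                return false;
            }
            items.addLast(item);
            return true;
        } finally {
            lock.unlock();
        }
    }

    /** Returns the next item, or null if the queue is empty. */
    T poll() {
        lock.lock();
        try {
            return items.pollFirst();
        } finally {
            lock.unlock();
        }
    }

    /** Disables the queue and hands back everything queued so far, all under one lock. */
    List<T> disableAndDrain() {
        lock.lock();
        try {
            disabled = true;
            List<T> drained = new ArrayList<>(items);
            items.clear();
            return drained;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        ClosableQueue<String> queue = new ClosableQueue<>();
        queue.offer("m1");
        queue.offer("m2");
        List<String> toRelease = queue.disableAndDrain();
        System.out.println(toRelease.size());   // prints 2
        System.out.println(queue.offer("m3"));  // prints false: rejected after disable
    }
}
```

   With something like this, the close path would release every element returned by `disableAndDrain()`, and the receive path would release any message whose `offer` returns false, so neither the epoch check nor `awaitTermination` would be needed for correctness.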


