You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2018/03/20 06:27:01 UTC

[GitHub] merlimat closed pull request #1405: Timeout task for batchMessageAndSendTask is not cancelled when close

merlimat closed pull request #1405: Timeout task for batchMessageAndSendTask is not cancelled when close
URL: https://github.com/apache/incubator-pulsar/pull/1405
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index 82c087634..e1a505aa0 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -83,6 +83,7 @@
     private final BlockingQueue<OpSendMsg> pendingCallbacks;
     private final Semaphore semaphore;
     private volatile Timeout sendTimeout = null;
+    private volatile Timeout batchMessageAndSendTimeout = null;
     private long createProducerTimeout;
     private final int maxNumMessagesInBatch;
     private final BatchMessageContainer batchMessageContainer;
@@ -520,6 +521,12 @@ protected WriteInEventLoopCallback newObject(Handle<WriteInEventLoopCallback> ha
             sendTimeout = null;
         }
 
+        Timeout batchTimeout = batchMessageAndSendTimeout;
+        if (batchTimeout != null) {
+            batchTimeout.cancel();
+            batchMessageAndSendTimeout = null;
+        }
+
         if (keyGeneratorTask != null && !keyGeneratorTask.isCancelled()) {
             keyGeneratorTask.cancel(false);
         }
@@ -1046,6 +1053,11 @@ public void run(Timeout timeout) throws Exception {
         long timeToWaitMs;
 
         synchronized (this) {
+            // If it's closing/closed we need to ignore this timeout and not schedule next timeout.
+            if (getState() == State.Closing || getState() == State.Closed) {
+                return;
+            }
+
             OpSendMsg firstMsg = pendingMessages.peek();
             if (firstMsg == null) {
                 // If there are no pending messages, reset the timeout to the configured value.
@@ -1071,9 +1083,9 @@ public void run(Timeout timeout) throws Exception {
                     timeToWaitMs = diff;
                 }
             }
-        }
 
-        sendTimeout = client.timer().newTimeout(this, timeToWaitMs, TimeUnit.MILLISECONDS);
+            sendTimeout = client.timer().newTimeout(this, timeToWaitMs, TimeUnit.MILLISECONDS);
+        }
     }
 
     /**
@@ -1147,10 +1159,16 @@ public void run(Timeout timeout) throws Exception {
             }
             // semaphore acquired when message was enqueued to container
             synchronized (ProducerImpl.this) {
+                // If it's closing/closed we need to ignore the send batch timer and not schedule next timeout.
+                if (getState() == State.Closing || getState() == State.Closed) {
+                    return;
+                }
+
                 batchMessageAndSend();
+                // schedule the next batch message task
+                batchMessageAndSendTimeout = client.timer()
+                    .newTimeout(this, conf.getBatchingMaxPublishDelayMicros(), TimeUnit.MICROSECONDS);
             }
-            // schedule the next batch message task
-            client.timer().newTimeout(this, conf.getBatchingMaxPublishDelayMicros(), TimeUnit.MICROSECONDS);
         }
     };
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services