You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2018/03/20 06:27:04 UTC

[incubator-pulsar] branch master updated: Timeout task for batchMessageAndSendTask is not cancelled when close (#1405)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new ef87567  Timeout task for batchMessageAndSendTask is not cancelled when close (#1405)
ef87567 is described below

commit ef87567b11c8d1a1b5c3291cccf077b7a64a563e
Author: Jia Zhai <zh...@gmail.com>
AuthorDate: Mon Mar 19 23:26:59 2018 -0700

    Timeout task for batchMessageAndSendTask is not cancelled when close (#1405)
    
    * Cancel the batchMessageAndSendTask timeout task when the producer is closed
    
    * fix potential race where the batch timer task could reschedule itself while the producer is closing/closed
    
    * also fix the same potential race in the sendTimeout timer task
---
 .../apache/pulsar/client/impl/ProducerImpl.java    | 26 ++++++++++++++++++----
 1 file changed, 22 insertions(+), 4 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index 82c0876..e1a505a 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -83,6 +83,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
     private final BlockingQueue<OpSendMsg> pendingCallbacks;
     private final Semaphore semaphore;
     private volatile Timeout sendTimeout = null;
+    private volatile Timeout batchMessageAndSendTimeout = null;
     private long createProducerTimeout;
     private final int maxNumMessagesInBatch;
     private final BatchMessageContainer batchMessageContainer;
@@ -520,6 +521,12 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
             sendTimeout = null;
         }
 
+        Timeout batchTimeout = batchMessageAndSendTimeout;
+        if (batchTimeout != null) {
+            batchTimeout.cancel();
+            batchMessageAndSendTimeout = null;
+        }
+
         if (keyGeneratorTask != null && !keyGeneratorTask.isCancelled()) {
             keyGeneratorTask.cancel(false);
         }
@@ -1046,6 +1053,11 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
         long timeToWaitMs;
 
         synchronized (this) {
+            // If it's closing/closed we need to ignore this timeout and not schedule next timeout.
+            if (getState() == State.Closing || getState() == State.Closed) {
+                return;
+            }
+
             OpSendMsg firstMsg = pendingMessages.peek();
             if (firstMsg == null) {
                 // If there are no pending messages, reset the timeout to the configured value.
@@ -1071,9 +1083,9 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
                     timeToWaitMs = diff;
                 }
             }
-        }
 
-        sendTimeout = client.timer().newTimeout(this, timeToWaitMs, TimeUnit.MILLISECONDS);
+            sendTimeout = client.timer().newTimeout(this, timeToWaitMs, TimeUnit.MILLISECONDS);
+        }
     }
 
     /**
@@ -1147,10 +1159,16 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
             }
             // semaphore acquired when message was enqueued to container
             synchronized (ProducerImpl.this) {
+                // If it's closing/closed we need to ignore the send batch timer and not schedule next timeout.
+                if (getState() == State.Closing || getState() == State.Closed) {
+                    return;
+                }
+
                 batchMessageAndSend();
+                // schedule the next batch message task
+                batchMessageAndSendTimeout = client.timer()
+                    .newTimeout(this, conf.getBatchingMaxPublishDelayMicros(), TimeUnit.MICROSECONDS);
             }
-            // schedule the next batch message task
-            client.timer().newTimeout(this, conf.getBatchingMaxPublishDelayMicros(), TimeUnit.MICROSECONDS);
         }
     };
 

-- 
To stop receiving notification emails like this one, please contact
mmerli@apache.org.