You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2018/03/20 06:27:04 UTC
[incubator-pulsar] branch master updated: Timeout task for
batchMessageAndSendTask is not cancelled when close (#1405)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new ef87567 Timeout task for batchMessageAndSendTask is not cancelled when close (#1405)
ef87567 is described below
commit ef87567b11c8d1a1b5c3291cccf077b7a64a563e
Author: Jia Zhai <zh...@gmail.com>
AuthorDate: Mon Mar 19 23:26:59 2018 -0700
Timeout task for batchMessageAndSendTask is not cancelled when close (#1405)
* Timeout task for batchMessageAndSendTask is not cancelled when the producer is closed
* Fix a potential race
* Also fix the potential race when rescheduling the send timeout (sendTimeout)
---
.../apache/pulsar/client/impl/ProducerImpl.java | 26 ++++++++++++++++++----
1 file changed, 22 insertions(+), 4 deletions(-)
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index 82c0876..e1a505a 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -83,6 +83,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
private final BlockingQueue<OpSendMsg> pendingCallbacks;
private final Semaphore semaphore;
private volatile Timeout sendTimeout = null;
+ private volatile Timeout batchMessageAndSendTimeout = null;
private long createProducerTimeout;
private final int maxNumMessagesInBatch;
private final BatchMessageContainer batchMessageContainer;
@@ -520,6 +521,12 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
sendTimeout = null;
}
+ Timeout batchTimeout = batchMessageAndSendTimeout;
+ if (batchTimeout != null) {
+ batchTimeout.cancel();
+ batchMessageAndSendTimeout = null;
+ }
+
if (keyGeneratorTask != null && !keyGeneratorTask.isCancelled()) {
keyGeneratorTask.cancel(false);
}
@@ -1046,6 +1053,11 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
long timeToWaitMs;
synchronized (this) {
+ // If it's closing/closed we need to ignore this timeout and not schedule next timeout.
+ if (getState() == State.Closing || getState() == State.Closed) {
+ return;
+ }
+
OpSendMsg firstMsg = pendingMessages.peek();
if (firstMsg == null) {
// If there are no pending messages, reset the timeout to the configured value.
@@ -1071,9 +1083,9 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
timeToWaitMs = diff;
}
}
- }
- sendTimeout = client.timer().newTimeout(this, timeToWaitMs, TimeUnit.MILLISECONDS);
+ sendTimeout = client.timer().newTimeout(this, timeToWaitMs, TimeUnit.MILLISECONDS);
+ }
}
/**
@@ -1147,10 +1159,16 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
}
// semaphore acquired when message was enqueued to container
synchronized (ProducerImpl.this) {
+ // If it's closing/closed we need to ignore the send batch timer and not schedule next timeout.
+ if (getState() == State.Closing || getState() == State.Closed) {
+ return;
+ }
+
batchMessageAndSend();
+ // schedule the next batch message task
+ batchMessageAndSendTimeout = client.timer()
+ .newTimeout(this, conf.getBatchingMaxPublishDelayMicros(), TimeUnit.MICROSECONDS);
}
- // schedule the next batch message task
- client.timer().newTimeout(this, conf.getBatchingMaxPublishDelayMicros(), TimeUnit.MICROSECONDS);
}
};
--
To stop receiving notification emails like this one, please contact
mmerli@apache.org.