You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2020/08/04 23:50:10 UTC
[pulsar] branch master updated: Perform producer compression from IO threads (#7733)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 7bf4549 Perform producer compression from IO threads (#7733)
7bf4549 is described below
commit 7bf4549727333d7d6bb37eda852af5b3b47c37f9
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Tue Aug 4 16:49:46 2020 -0700
Perform producer compression from IO threads (#7733)
* Added `zookeeper.snapshot.trust.empty=true` for ZK 3.5 upgrade
* Perform producer compression from IO threads
Co-authored-by: Sanjeev Kulkarni <sa...@splunk.com>
---
.../apache/pulsar/client/impl/ProducerImpl.java | 59 +++++++++-------------
1 file changed, 25 insertions(+), 34 deletions(-)
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index b2f00b2..2609060 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -97,7 +97,6 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
private final BlockingQueue<OpSendMsg> pendingCallbacks;
private final Semaphore semaphore;
private volatile Timeout sendTimeout = null;
- private volatile Timeout batchMessageAndSendTimeout = null;
private long createProducerTimeout;
private final BatchMessageContainerBase batchMessageContainer;
private CompletableFuture<MessageId> lastSendFuture = CompletableFuture.completedFuture(null);
@@ -133,6 +132,8 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
private final ConnectionHandler connectionHandler;
+ private ScheduledFuture<?> batchTimerTask;
+
@SuppressWarnings("rawtypes")
private static final AtomicLongFieldUpdater<ProducerImpl> msgIdGeneratorUpdater = AtomicLongFieldUpdater
.newUpdater(ProducerImpl.class, "msgIdGenerator");
@@ -406,7 +407,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
return;
}
}
-
+
try {
synchronized (this) {
int readStartIndex = 0;
@@ -795,10 +796,10 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
sendTimeout = null;
}
- Timeout batchTimeout = batchMessageAndSendTimeout;
- if (batchTimeout != null) {
- batchTimeout.cancel();
- batchMessageAndSendTimeout = null;
+ ScheduledFuture<?> batchTimerTask = this.batchTimerTask;
+ if (batchTimerTask != null) {
+ batchTimerTask.cancel(false);
+ this.batchTimerTask = null;
}
if (keyGeneratorTask != null && !keyGeneratorTask.isCancelled()) {
@@ -1229,8 +1230,24 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
if (!producerCreatedFuture.isDone() && isBatchMessagingEnabled()) {
// schedule the first batch message task
- client.timer().newTimeout(batchMessageAndSendTask, conf.getBatchingMaxPublishDelayMicros(),
- TimeUnit.MICROSECONDS);
+ batchTimerTask = cnx.ctx().executor().scheduleAtFixedRate(() -> {
+ if (log.isTraceEnabled()) {
+ log.trace(
+ "[{}] [{}] Batching the messages from the batch container from timer thread",
+ topic,
+ producerName);
+ }
+ // semaphore acquired when message was enqueued to container
+ synchronized (ProducerImpl.this) {
+ // If it's closing/closed we need to ignore the send batch timer and not
+ // schedule next timeout.
+ if (getState() == State.Closing || getState() == State.Closed) {
+ return;
+ }
+
+ batchMessageAndSend();
+ }
+ }, 0, conf.getBatchingMaxPublishDelayMicros(), TimeUnit.MICROSECONDS);
}
resendMessages(cnx);
}
@@ -1504,32 +1521,6 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
semaphore.release(numMessagesInBatch);
}
- TimerTask batchMessageAndSendTask = new TimerTask() {
-
- @Override
- public void run(Timeout timeout) throws Exception {
- if (timeout.isCancelled()) {
- return;
- }
- if (log.isTraceEnabled()) {
- log.trace("[{}] [{}] Batching the messages from the batch container from timer thread", topic,
- producerName);
- }
- // semaphore acquired when message was enqueued to container
- synchronized (ProducerImpl.this) {
- // If it's closing/closed we need to ignore the send batch timer and not schedule next timeout.
- if (getState() == State.Closing || getState() == State.Closed) {
- return;
- }
-
- batchMessageAndSend();
- // schedule the next batch message task
- batchMessageAndSendTimeout = client.timer()
- .newTimeout(this, conf.getBatchingMaxPublishDelayMicros(), TimeUnit.MICROSECONDS);
- }
- }
- };
-
@Override
public CompletableFuture<Void> flushAsync() {
CompletableFuture<MessageId> lastSendFuture;