You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2020/08/04 23:50:10 UTC

[pulsar] branch master updated: Perform producer compression from IO threads (#7733)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 7bf4549  Perform producer compression from IO threads (#7733)
7bf4549 is described below

commit 7bf4549727333d7d6bb37eda852af5b3b47c37f9
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Tue Aug 4 16:49:46 2020 -0700

    Perform producer compression from IO threads (#7733)
    
    * Added `zookeeper.snapshot.trust.empty=true` for ZK 3.5 upgrade
    
    * Perform producer compression from IO threads
    
    Co-authored-by: Sanjeev Kulkarni <sa...@splunk.com>
---
 .../apache/pulsar/client/impl/ProducerImpl.java    | 59 +++++++++-------------
 1 file changed, 25 insertions(+), 34 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index b2f00b2..2609060 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -97,7 +97,6 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
     private final BlockingQueue<OpSendMsg> pendingCallbacks;
     private final Semaphore semaphore;
     private volatile Timeout sendTimeout = null;
-    private volatile Timeout batchMessageAndSendTimeout = null;
     private long createProducerTimeout;
     private final BatchMessageContainerBase batchMessageContainer;
     private CompletableFuture<MessageId> lastSendFuture = CompletableFuture.completedFuture(null);
@@ -133,6 +132,8 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
 
     private final ConnectionHandler connectionHandler;
 
+    private ScheduledFuture<?> batchTimerTask;
+
     @SuppressWarnings("rawtypes")
     private static final AtomicLongFieldUpdater<ProducerImpl> msgIdGeneratorUpdater = AtomicLongFieldUpdater
             .newUpdater(ProducerImpl.class, "msgIdGenerator");
@@ -406,7 +407,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
                 return;
             }
         }
-        
+
         try {
             synchronized (this) {
                 int readStartIndex = 0;
@@ -795,10 +796,10 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
             sendTimeout = null;
         }
 
-        Timeout batchTimeout = batchMessageAndSendTimeout;
-        if (batchTimeout != null) {
-            batchTimeout.cancel();
-            batchMessageAndSendTimeout = null;
+        ScheduledFuture<?> batchTimerTask = this.batchTimerTask;
+        if (batchTimerTask != null) {
+            batchTimerTask.cancel(false);
+            this.batchTimerTask = null;
         }
 
         if (keyGeneratorTask != null && !keyGeneratorTask.isCancelled()) {
@@ -1229,8 +1230,24 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
 
                         if (!producerCreatedFuture.isDone() && isBatchMessagingEnabled()) {
                             // schedule the first batch message task
-                            client.timer().newTimeout(batchMessageAndSendTask, conf.getBatchingMaxPublishDelayMicros(),
-                                    TimeUnit.MICROSECONDS);
+                            batchTimerTask = cnx.ctx().executor().scheduleAtFixedRate(() -> {
+                                if (log.isTraceEnabled()) {
+                                    log.trace(
+                                            "[{}] [{}] Batching the messages from the batch container from timer thread",
+                                            topic,
+                                            producerName);
+                                }
+                                // semaphore acquired when message was enqueued to container
+                                synchronized (ProducerImpl.this) {
+                                    // If it's closing/closed we need to ignore the send batch timer and not
+                                    // schedule next timeout.
+                                    if (getState() == State.Closing || getState() == State.Closed) {
+                                        return;
+                                    }
+
+                                    batchMessageAndSend();
+                                }
+                            }, 0, conf.getBatchingMaxPublishDelayMicros(), TimeUnit.MICROSECONDS);
                         }
                         resendMessages(cnx);
                     }
@@ -1504,32 +1521,6 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
         semaphore.release(numMessagesInBatch);
     }
 
-    TimerTask batchMessageAndSendTask = new TimerTask() {
-
-        @Override
-        public void run(Timeout timeout) throws Exception {
-            if (timeout.isCancelled()) {
-                return;
-            }
-            if (log.isTraceEnabled()) {
-                log.trace("[{}] [{}] Batching the messages from the batch container from timer thread", topic,
-                        producerName);
-            }
-            // semaphore acquired when message was enqueued to container
-            synchronized (ProducerImpl.this) {
-                // If it's closing/closed we need to ignore the send batch timer and not schedule next timeout.
-                if (getState() == State.Closing || getState() == State.Closed) {
-                    return;
-                }
-
-                batchMessageAndSend();
-                // schedule the next batch message task
-                batchMessageAndSendTimeout = client.timer()
-                    .newTimeout(this, conf.getBatchingMaxPublishDelayMicros(), TimeUnit.MICROSECONDS);
-            }
-        }
-    };
-
     @Override
     public CompletableFuture<Void> flushAsync() {
         CompletableFuture<MessageId> lastSendFuture;