You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2018/03/15 07:10:14 UTC

[incubator-pulsar] branch master updated: Improve sync send logic for allowing batching (#1386)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 323f4f1  Improve sync send logic for allowing batching (#1386)
323f4f1 is described below

commit 323f4f1ca2d0dee0b9b1aac55533ca8b36a30586
Author: Sijie Guo <gu...@gmail.com>
AuthorDate: Thu Mar 15 00:10:12 2018 -0700

    Improve sync send logic for allowing batching (#1386)
    
    *Motivation*
    
    Batching is required for achieving high-performance messaging. If we want to flip the default behaviour of publishers to batch publishing,
    we need to improve the current send logic to flush out batches after enqueuing the send requests, so that batch settings won't impact sync sends.
    
    *Modification*
    
    - add a flush() method to the single-partition producer
    - break the sync send logic of the single-partition producer into `enqueue-flush-wait` steps
    - change the multi-partition producer's sync send to delegate to the chosen single-partition producer
---
 .../client/impl/PartitionedProducerImpl.java       |  8 +++++
 .../apache/pulsar/client/impl/ProducerBase.java    | 17 -----------
 .../apache/pulsar/client/impl/ProducerImpl.java    | 34 ++++++++++++++++++++++
 3 files changed, 42 insertions(+), 17 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
index 494fbce..ba6fd28 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
@@ -139,6 +139,14 @@ public class PartitionedProducerImpl<T> extends ProducerBase<T> {
     }
 
     @Override
+    public MessageId send(Message<T> message) throws PulsarClientException {
+        int partition = routerPolicy.choosePartition(message, topicMetadata);
+        checkArgument(partition >= 0 && partition < topicMetadata.numPartitions(),
+            "Illegal partition index chosen by the message routing policy");
+        return producers.get(partition).send(message);
+    }
+
+    @Override
     public CompletableFuture<MessageId> sendAsync(Message<T> message) {
 
         switch (getState()) {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBase.java
index 45453ae..0cf8bab 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBase.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBase.java
@@ -55,23 +55,6 @@ public abstract class ProducerBase<T> extends HandlerState implements Producer<T
     }
 
     @Override
-    public MessageId send(Message<T> message) throws PulsarClientException {
-        try {
-            return sendAsync(message).get();
-        } catch (ExecutionException e) {
-            Throwable t = e.getCause();
-            if (t instanceof PulsarClientException) {
-                throw (PulsarClientException) t;
-            } else {
-                throw new PulsarClientException(t);
-            }
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            throw new PulsarClientException(e);
-        }
-    }
-
-    @Override
     abstract public CompletableFuture<MessageId> sendAsync(Message<T> message);
 
     @Override
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index 75661cb..82c0876 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -32,6 +32,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -1153,6 +1154,39 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
         }
     };
 
+    @Override
+    public MessageId send(Message<T> message) throws PulsarClientException {
+        try {
+            // enqueue the message to the buffer
+            CompletableFuture<MessageId> sendFuture = sendAsync(message);
+
+            if (!sendFuture.isDone()) {
+                // the send request wasn't completed yet (e.g. not failing at enqueuing), then attempt to flush it out
+                flush();
+            }
+
+            return sendFuture.get();
+        } catch (ExecutionException e) {
+            Throwable t = e.getCause();
+            if (t instanceof PulsarClientException) {
+                throw (PulsarClientException) t;
+            } else {
+                throw new PulsarClientException(t);
+            }
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarClientException(e);
+        }
+    }
+
+    private void flush() {
+        if (isBatchMessagingEnabled()) {
+            synchronized (ProducerImpl.this) {
+                batchMessageAndSend();
+            }
+        }
+    }
+
     // must acquire semaphore before enqueuing
     private void batchMessageAndSend() {
         if (log.isDebugEnabled()) {

-- 
To stop receiving notification emails like this one, please contact
sijie@apache.org.