You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2018/07/12 16:09:01 UTC

[GitHub] merlimat closed pull request #2103: Issue 1433: Expose batch flushAsync() and flush() methods in Producer

merlimat closed pull request #2103: Issue 1433: Expose batch flushAsync() and flush() methods in Producer
URL: https://github.com/apache/incubator-pulsar/pull/2103
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index e3ea2a6de1..79166a18f4 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -2552,4 +2552,76 @@ public void testConsumerSubscriptionInitialize() throws Exception {
         log.info("-- Exiting {} test --", methodName);
     }
 
+    @Test
+    public void testFlushBatchEnabled() throws Exception {
+        log.info("-- Starting {} test --", methodName);
+
+        Consumer<byte[]> consumer = pulsarClient.newConsumer()
+            .topic("persistent://my-property/my-ns/test-flush-enabled")
+            .subscriptionName("my-subscriber-name").subscribe();
+
+        ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer()
+                .topic("persistent://my-property/my-ns/test-flush-enabled")
+                .enableBatching(true)
+                .batchingMaxPublishDelay(1, TimeUnit.HOURS)
+                .batchingMaxMessages(10000);
+
+        try (Producer<byte[]> producer = producerBuilder.create()) {
+            for (int i = 0; i < 10; i++) {
+                String message = "my-message-" + i;
+                producer.sendAsync(message.getBytes());
+            }
+            producer.flush();
+        }
+
+        Message<byte[]> msg = null;
+        Set<String> messageSet = Sets.newHashSet();
+        for (int i = 0; i < 10; i++) {
+            msg = consumer.receive(5, TimeUnit.SECONDS);
+            String receivedMessage = new String(msg.getData());
+            log.debug("Received message: [{}]", receivedMessage);
+            String expectedMessage = "my-message-" + i;
+            testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage);
+        }
+        // Acknowledge the consumption of all messages at once
+        consumer.acknowledgeCumulative(msg);
+        consumer.close();
+        log.info("-- Exiting {} test --", methodName);
+    }
+
+    @Test
+    public void testFlushBatchDisabled() throws Exception {
+        log.info("-- Starting {} test --", methodName);
+
+        Consumer<byte[]> consumer = pulsarClient.newConsumer()
+            .topic("persistent://my-property/my-ns/test-flush-disabled")
+            .subscriptionName("my-subscriber-name").subscribe();
+
+        ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer()
+                .topic("persistent://my-property/my-ns/test-flush-disabled")
+                .enableBatching(false);
+
+        try (Producer<byte[]> producer = producerBuilder.create()) {
+            for (int i = 0; i < 10; i++) {
+                String message = "my-message-" + i;
+                producer.sendAsync(message.getBytes());
+            }
+            producer.flush();
+        }
+
+        Message<byte[]> msg = null;
+        Set<String> messageSet = Sets.newHashSet();
+        for (int i = 0; i < 10; i++) {
+            msg = consumer.receive(5, TimeUnit.SECONDS);
+            String receivedMessage = new String(msg.getData());
+            log.debug("Received message: [{}]", receivedMessage);
+            String expectedMessage = "my-message-" + i;
+            testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage);
+        }
+        // Acknowledge the consumption of all messages at once
+        consumer.acknowledgeCumulative(msg);
+        consumer.close();
+        log.info("-- Exiting {} test --", methodName);
+    }
+
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/Producer.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/Producer.java
index a98073c141..e523484e6e 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/Producer.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/Producer.java
@@ -74,6 +74,24 @@
      */
     CompletableFuture<MessageId> sendAsync(T message);
 
+    /**
+     * Flush all the messages buffered in the client and wait until all messages have been successfully persisted.
+     *
+     * @throws PulsarClientException
+     * @since 2.1.0
+     * @see #flushAsync()
+     */
+    void flush() throws PulsarClientException;
+
+    /**
+     * Flush all the messages buffered in the client and wait until all messages have been successfully persisted.
+     *
+     * @return a future that can be used to track when all the messages have been safely persisted.
+     * @since 2.1.0
+     * @see #flush()
+     */
+    CompletableFuture<Void> flushAsync();
+
     /**
      * Create a new message builder
      *
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
index 894c875e82..3e159e8f48 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
@@ -28,6 +28,7 @@
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
+import java.util.stream.Collectors;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.MessageRouter;
@@ -167,8 +168,15 @@ private void start() {
     }
 
     @Override
-    void flush() {
-        producers.forEach(ProducerImpl::flush);
+    public CompletableFuture<Void> flushAsync() {
+        List<CompletableFuture<Void>> flushFutures =
+            producers.stream().map(ProducerImpl::flushAsync).collect(Collectors.toList());
+        return CompletableFuture.allOf(flushFutures.toArray(new CompletableFuture[flushFutures.size()]));
+    }
+
+    @Override
+    void triggerFlush() {
+        producers.forEach(ProducerImpl::triggerFlush);
     }
 
     @Override
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBase.java
index a75aa01c38..eb02b6bd8f 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBase.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBase.java
@@ -78,8 +78,8 @@ public MessageId send(Message<T> message) throws PulsarClientException {
             CompletableFuture<MessageId> sendFuture = internalSendAsync(message);
 
             if (!sendFuture.isDone()) {
-                // the send request wasn't completed yet (e.g. not failing at enqueuing), then attempt to flush it out
-                flush();
+                // the send request wasn't completed yet (e.g. not failing at enqueuing), then attempt to triggerFlush it out
+                triggerFlush();
             }
 
             return sendFuture.get();
@@ -96,7 +96,24 @@ public MessageId send(Message<T> message) throws PulsarClientException {
         }
     }
 
-    abstract void flush();
+    @Override
+    public void flush() throws PulsarClientException {
+        try {
+            flushAsync().get();
+        } catch (ExecutionException e) {
+            Throwable cause = e.getCause();
+            if (cause instanceof PulsarClientException) {
+                throw (PulsarClientException) cause;
+            } else {
+                throw new PulsarClientException(cause);
+            }
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarClientException(e);
+        }
+    }
+
+    abstract void triggerFlush();
 
     @Override
     public void close() throws PulsarClientException {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index a621a0ec3e..12a49c0f0f 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -70,6 +70,7 @@
 import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.schema.SchemaType;
 import org.apache.pulsar.common.util.DateFormatter;
+import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -91,6 +92,7 @@
     private long createProducerTimeout;
     private final int maxNumMessagesInBatch;
     private final BatchMessageContainer batchMessageContainer;
+    private CompletableFuture<MessageId> lastSendFuture = CompletableFuture.completedFuture(null);
 
     // Globally unique producer name
     private String producerName;
@@ -333,6 +335,7 @@ public void sendAsync(Message<T> message, SendCallback callback) {
                     // batch size and/or max message size
                     if (batchMessageContainer.hasSpaceInBatch(msg)) {
                         batchMessageContainer.add(msg, callback);
+                        lastSendFuture = callback.getFuture();
                         payload.release();
                         if (batchMessageContainer.numMessagesInBatch == maxNumMessagesInBatch
                                 || batchMessageContainer.currentBatchSizeBytes >= BatchMessageContainer.MAX_MESSAGE_BATCH_SIZE_BYTES) {
@@ -353,6 +356,7 @@ public void sendAsync(Message<T> message, SendCallback callback) {
                     op.setNumMessagesInBatch(1);
                     op.setBatchSizeByte(encryptedPayload.readableBytes());
                     pendingMessages.put(op);
+                    lastSendFuture = callback.getFuture();
 
                     // Read the connection before validating if it's still connected, so that we avoid reading a null
                     // value
@@ -427,6 +431,7 @@ private void doBatchSendAndAdd(MessageImpl<T> msg, SendCallback callback, ByteBu
         }
         batchMessageAndSend();
         batchMessageContainer.add(msg, callback);
+        lastSendFuture = callback.getFuture();
         payload.release();
     }
 
@@ -1218,7 +1223,19 @@ public void run(Timeout timeout) throws Exception {
     };
 
     @Override
-    protected void flush() {
+    public CompletableFuture<Void> flushAsync() {
+        CompletableFuture<MessageId> lastSendFuture;
+        synchronized (ProducerImpl.this) {
+            if (isBatchMessagingEnabled()) {
+                batchMessageAndSend();
+            }
+            lastSendFuture = this.lastSendFuture;
+        }
+        return lastSendFuture.thenApply(ignored -> null);
+    }
+
+    @Override
+    protected void triggerFlush() {
         if (isBatchMessagingEnabled()) {
             synchronized (ProducerImpl.this) {
                 batchMessageAndSend();


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services