You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/04/30 13:27:03 UTC

[pulsar] branch master updated: support async send msg return sequenceId when throw Exception (#6825)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 42ed9bf  support async send  msg return sequenceId when throw Exception (#6825)
42ed9bf is described below

commit 42ed9bf12e29099e5c1655484311441402dde3d7
Author: liudezhi <33...@users.noreply.github.com>
AuthorDate: Thu Apr 30 21:26:54 2020 +0800

    support async send  msg return sequenceId when throw Exception (#6825)
    
    Master Issue: #6824
    ## Motivation
    
    When an asynchronous send fails, an exception is raised, but it does not indicate which message failed, so the user cannot tell which messages need to be retried.
    
    ## Modifications
    
    This change is made on the client side: when a send fails, the sequenceId of the message is set on the
    org.apache.pulsar.client.api.PulsarClientException that is raised.
    
    
    ```java
    public class PulsarClientException extends IOException {
        private long sequenceId = -1;
    
        public PulsarClientException(String msg, long sequenceId) {
            super(msg);
            this.sequenceId = sequenceId;
        }
    ```
    Client example:
    ```java
      producer.newMessage().sequenceId(1).value(value.getBytes())
                    .sendAsync().thenAccept(msgId -> {
                        System.out.println(msgId);
                    }).exceptionally(ex -> {
                        System.out.println( ((PulsarClientException)ex.getCause()).getSequenceId());
                        return null;
                    });
    ```
---
 .../client/api/SimpleProducerConsumerTest.java     |  39 +++++++
 .../pulsar/client/api/PulsarClientException.java   | 121 ++++++++++++++++++++-
 .../apache/pulsar/client/impl/ProducerImpl.java    |  33 +++---
 3 files changed, 177 insertions(+), 16 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index 51a7eef..a2fde6c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -47,6 +47,7 @@ import java.nio.file.Paths;
 import java.time.Clock;
 import java.time.Instant;
 import java.time.ZoneId;
+import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -1143,6 +1144,44 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
     }
 
     @Test
+    public void testtSendCallBackReturnSequenceId() throws Exception {
+        log.info("-- Starting {} test --", methodName);
+
+        ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer()
+                .enableBatching(false)
+                .topic("persistent://my-property/my-ns/my-topic5")
+                .sendTimeout(1, TimeUnit.SECONDS);
+
+        Producer<byte[]> producer = producerBuilder.create();
+        final String message = "my-message";
+
+        // Trigger the send timeout
+        stopBroker();
+        List<CompletableFuture<MessageId>> futures = new ArrayList<CompletableFuture<MessageId>>();
+        for(int i = 0 ; i < 3 ; i++) {
+             CompletableFuture<MessageId> future = producer.newMessage().sequenceId(i).value(message.getBytes()).sendAsync();
+             futures.add(future);
+        }
+        Thread.sleep(3000);
+        futures.get(0).exceptionally(ex -> {
+            long sequenceId = ((PulsarClientException) ex.getCause()).getSequenceId();
+            Assert.assertEquals(sequenceId, 0L);
+            return null;
+        });
+        futures.get(1).exceptionally(ex -> {
+            long sequenceId = ((PulsarClientException) ex.getCause()).getSequenceId();
+            Assert.assertEquals(sequenceId, 1L);
+            return null;
+        });
+        futures.get(2).exceptionally(ex -> {
+            long sequenceId = ((PulsarClientException) ex.getCause()).getSequenceId();
+            Assert.assertEquals(sequenceId, 2L);
+            return null;
+        });
+        log.info("-- Exiting {} test --", methodName);
+    }
+
+    @Test
     public void testSendCallBack() throws Exception {
         log.info("-- Starting {} test --", methodName);
 
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
index 607c3db..16af009 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
@@ -27,7 +27,7 @@ import java.util.concurrent.ExecutionException;
  */
 @SuppressWarnings("serial")
 public class PulsarClientException extends IOException {
-
+    private long sequenceId = -1;
     /**
      * Constructs an {@code PulsarClientException} with the specified detail message.
      *
@@ -40,6 +40,20 @@ public class PulsarClientException extends IOException {
     }
 
     /**
+     * Constructs an {@code PulsarClientException} with the specified detail message.
+     *
+     * @param msg
+     *        The detail message (which is saved for later retrieval
+     *        by the {@link #getMessage()} method)
+     * @param sequenceId
+     *        The sequenceId of the message
+     */
+    public PulsarClientException(String msg, long sequenceId) {
+        super(msg);
+        this.sequenceId = sequenceId;
+    }
+
+    /**
      * Constructs an {@code PulsarClientException} with the specified cause.
      *
      * @param t
@@ -52,6 +66,21 @@ public class PulsarClientException extends IOException {
     }
 
     /**
+     * Constructs an {@code PulsarClientException} with the specified cause.
+     *
+     * @param t
+     *        The cause (which is saved for later retrieval by the
+     *        {@link #getCause()} method).  (A null value is permitted,
+     *        and indicates that the cause is nonexistent or unknown.)
+     * @param sequenceId
+     *        The sequenceId of the message
+     */
+    public PulsarClientException(Throwable t, long sequenceId) {
+        super(t);
+        this.sequenceId = sequenceId;
+    }
+
+    /**
      * Invalid Service URL exception thrown by Pulsar client.
      */
     public static class InvalidServiceURL extends PulsarClientException {
@@ -141,6 +170,20 @@ public class PulsarClientException extends IOException {
         }
 
         /**
+         * Constructs an {@code TimeoutException} with the specified cause.
+         *
+         * @param t
+         *        The cause (which is saved for later retrieval by the
+         *        {@link #getCause()} method).  (A null value is permitted,
+         *        and indicates that the cause is nonexistent or unknown.)
+         * @param sequenceId
+         *        The sequenceId of the message
+         */
+        public TimeoutException(Throwable t, long sequenceId) {
+            super(t, sequenceId);
+        }
+
+        /**
          * Constructs an {@code TimeoutException} with the specified detail message.
          *
          * @param msg
@@ -150,6 +193,18 @@ public class PulsarClientException extends IOException {
         public TimeoutException(String msg) {
             super(msg);
         }
+
+        /**
+         * Constructs an {@code TimeoutException} with the specified detail message.
+         *
+         * @param msg
+         *        The detail message (which is saved for later retrieval
+         *        by the {@link #getMessage()} method)
+         */
+        public TimeoutException(String msg, long sequenceId) {
+            super(msg, sequenceId);
+        }
+
     }
 
     /**
@@ -270,6 +325,19 @@ public class PulsarClientException extends IOException {
         public AlreadyClosedException(String msg) {
             super(msg);
         }
+
+        /**
+         * Constructs an {@code AlreadyClosedException} with the specified detail message.
+         *
+         * @param msg
+         *        The detail message (which is saved for later retrieval
+         *        by the {@link #getMessage()} method)
+         * @param sequenceId
+         *        The sequenceId of the message
+         */
+        public AlreadyClosedException(String msg, long sequenceId) {
+            super(msg, sequenceId);
+        }
     }
 
     /**
@@ -286,6 +354,19 @@ public class PulsarClientException extends IOException {
         public TopicTerminatedException(String msg) {
             super(msg);
         }
+
+        /**
+         * Constructs an {@code TopicTerminatedException} with the specified detail message.
+         *
+         * @param msg
+         *        The detail message (which is saved for later retrieval
+         *        by the {@link #getMessage()} method)
+         * @param sequenceId
+         *        The sequenceId of the message
+         */
+        public TopicTerminatedException(String msg, long sequenceId) {
+            super(msg, sequenceId);
+        }
     }
 
     /**
@@ -448,6 +529,10 @@ public class PulsarClientException extends IOException {
         public NotConnectedException() {
             super("Not connected to broker");
         }
+
+        public NotConnectedException(long sequenceId) {
+            super("Not connected to broker", sequenceId);
+        }
     }
 
     /**
@@ -464,6 +549,19 @@ public class PulsarClientException extends IOException {
         public InvalidMessageException(String msg) {
             super(msg);
         }
+
+        /**
+         * Constructs an {@code InvalidMessageException} with the specified detail message.
+         *
+         * @param msg
+         *        The detail message (which is saved for later retrieval
+         *        by the {@link #getMessage()} method)
+         * @param sequenceId
+         *        The sequenceId of the message
+         */
+        public InvalidMessageException(String msg, long sequenceId) {
+            super(msg, sequenceId);
+        }
     }
 
     /**
@@ -512,6 +610,19 @@ public class PulsarClientException extends IOException {
         public ProducerQueueIsFullError(String msg) {
             super(msg);
         }
+
+        /**
+         * Constructs an {@code ProducerQueueIsFullError} with the specified detail message.
+         *
+         * @param msg
+         *        The detail message (which is saved for later retrieval
+         *        by the {@link #getMessage()} method)
+         * @param sequenceId
+         *        The sequenceId of the message
+         */
+        public ProducerQueueIsFullError(String msg, long sequenceId) {
+            super(msg, sequenceId);
+        }
     }
 
     /**
@@ -720,6 +831,14 @@ public class PulsarClientException extends IOException {
         }
     }
 
+    public long getSequenceId() {
+        return sequenceId;
+    }
+
+    public void setSequenceId(long sequenceId) {
+        this.sequenceId = sequenceId;
+    }
+
     public static boolean isRetriableError(Throwable t) {
         if (t instanceof AuthorizationException
                 || t instanceof InvalidServiceURL
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index 6585842..f7768a4 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -342,11 +342,11 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
     public void sendAsync(Message<?> message, SendCallback callback) {
         checkArgument(message instanceof MessageImpl);
 
-        if (!isValidProducerState(callback)) {
+        if (!isValidProducerState(callback, message.getSequenceId())) {
             return;
         }
 
-        if (!canEnqueueRequest(callback)) {
+        if (!canEnqueueRequest(callback, message.getSequenceId())) {
             return;
         }
 
@@ -381,7 +381,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
         if (!msg.isReplicated() && msgMetadataBuilder.hasProducerName()) {
             PulsarClientException.InvalidMessageException invalidMessageException =
                 new PulsarClientException.InvalidMessageException(
-                    format("The producer %s of the topic %s can not reuse the same message", producerName, topic));
+                    format("The producer %s of the topic %s can not reuse the same message", producerName, topic), msg.getSequenceId());
             completeCallbackAndReleaseSemaphore(callback, invalidMessageException);
             compressedPayload.release();
             return;
@@ -477,9 +477,10 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
                 }
             }
         } catch (PulsarClientException e) {
+            e.setSequenceId(msg.getSequenceId());
             completeCallbackAndReleaseSemaphore(callback, e);
         } catch (Throwable t) {
-            completeCallbackAndReleaseSemaphore(callback, new PulsarClientException(t));
+            completeCallbackAndReleaseSemaphore(callback, new PulsarClientException(t, msg.getSequenceId()));
         }
     }
 
@@ -492,7 +493,8 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
         }
         if (!isMultiSchemaEnabled(true)) {
             PulsarClientException.InvalidMessageException e = new PulsarClientException.InvalidMessageException(
-                    format("The producer %s of the topic %s is disabled the `MultiSchema`", producerName, topic));
+                    format("The producer %s of the topic %s is disabled the `MultiSchema`", producerName, topic)
+                    , msg.getSequenceId());
             completeCallbackAndReleaseSemaphore(callback, e);
             return false;
         }
@@ -626,7 +628,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
         }
     }
 
-    private boolean isValidProducerState(SendCallback callback) {
+    private boolean isValidProducerState(SendCallback callback, long sequenceId) {
         switch (getState()) {
         case Ready:
             // OK
@@ -637,32 +639,32 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
             return true;
         case Closing:
         case Closed:
-            callback.sendComplete(new PulsarClientException.AlreadyClosedException("Producer already closed"));
+            callback.sendComplete(new PulsarClientException.AlreadyClosedException("Producer already closed", sequenceId));
             return false;
         case Terminated:
-            callback.sendComplete(new PulsarClientException.TopicTerminatedException("Topic was terminated"));
+            callback.sendComplete(new PulsarClientException.TopicTerminatedException("Topic was terminated", sequenceId));
             return false;
         case Failed:
         case Uninitialized:
         default:
-            callback.sendComplete(new PulsarClientException.NotConnectedException());
+            callback.sendComplete(new PulsarClientException.NotConnectedException(sequenceId));
             return false;
         }
     }
 
-    private boolean canEnqueueRequest(SendCallback callback) {
+    private boolean canEnqueueRequest(SendCallback callback, long sequenceId) {
         try {
             if (conf.isBlockIfQueueFull()) {
                 semaphore.acquire();
             } else {
                 if (!semaphore.tryAcquire()) {
-                    callback.sendComplete(new PulsarClientException.ProducerQueueIsFullError("Producer send queue is full"));
+                    callback.sendComplete(new PulsarClientException.ProducerQueueIsFullError("Producer send queue is full", sequenceId));
                     return false;
                 }
             }
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
-            callback.sendComplete(new PulsarClientException(e));
+            callback.sendComplete(new PulsarClientException(e, sequenceId));
             return false;
         }
 
@@ -1352,7 +1354,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
 
                     PulsarClientException te = new PulsarClientException.TimeoutException(
                         format("The producer %s can not send message to the topic %s within given timeout",
-                            producerName, topic));
+                            producerName, topic), firstMsg.sequenceId);
                     failPendingMessages(cnx(), te);
                     stats.incrementSendFailed(pendingMessages.size());
                     // Since the pending queue is cleared now, set timer to expire after configured value.
@@ -1380,6 +1382,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
                 try {
                     // Need to protect ourselves from any exception being thrown in the future handler from the
                     // application
+                    ex.setSequenceId(op.sequenceId);
                     op.callback.sendComplete(ex);
                 } catch (Throwable t) {
                     log.warn("[{}] [{}] Got exception while completing the callback for msg {}:", topic, producerName,
@@ -1529,13 +1532,13 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
             Thread.currentThread().interrupt();
             releaseSemaphoreForSendOp(op);
             if (op != null) {
-                op.callback.sendComplete(new PulsarClientException(ie));
+                op.callback.sendComplete(new PulsarClientException(ie, op.sequenceId));
             }
         } catch (Throwable t) {
             releaseSemaphoreForSendOp(op);
             log.warn("[{}] [{}] error while closing out batch -- {}", topic, producerName, t);
             if (op != null) {
-                op.callback.sendComplete(new PulsarClientException(t));
+                op.callback.sendComplete(new PulsarClientException(t, op.sequenceId));
             }
         }
     }