You are viewing a plain-text version of this content. (The hyperlink to the canonical version was not preserved in this copy.)
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/12/21 13:05:45 UTC

[pulsar] 01/02: Add more information in send timeout exception (#8931)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit dd60e6e45969eb2cd0e6332dc5c5dfe1d9b542a4
Author: Sijie Guo <si...@apache.org>
AuthorDate: Tue Dec 15 16:12:09 2020 -0800

    Add more information in send timeout exception (#8931)
    
    *Motivation*
    
    Currently the TimeoutException doesn't provide any useful information
    for troubleshooting. This change appends to the exception message how long ago
    the message was created, first sent, and last sent, plus its send retryCount.
    
    
    (cherry picked from commit c87557310615969114c7b4e9a257ae542d5bee07)
---
 .../apache/pulsar/client/impl/ProducerImpl.java    | 57 +++++++++++++++++++---
 1 file changed, 50 insertions(+), 7 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index 3ab4207..cc76656 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -63,6 +63,7 @@ import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerCryptoFailureAction;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.PulsarClientException.CryptoException;
+import org.apache.pulsar.client.api.PulsarClientException.TimeoutException;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.transaction.Transaction;
 import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
@@ -743,6 +744,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
         private ByteBufPair cmd;
         private long sequenceId;
         private ClientCnx cnx;
+        private OpSendMsg op;
 
         static WriteInEventLoopCallback create(ProducerImpl<?> producer, ClientCnx cnx, OpSendMsg op) {
             WriteInEventLoopCallback c = RECYCLER.get();
@@ -750,6 +752,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
             c.cnx = cnx;
             c.sequenceId = op.sequenceId;
             c.cmd = op.cmd;
+            c.op = op;
             return c;
         }
 
@@ -762,6 +765,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
 
             try {
                 cnx.ctx().writeAndFlush(cmd, cnx.ctx().voidPromise());
+                op.updateSentTimestamp();
             } finally {
                 recycle();
             }
@@ -772,6 +776,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
             cnx = null;
             cmd = null;
             sequenceId = -1;
+            op = null;
             recyclerHandle.recycle(this);
         }
 
@@ -830,7 +835,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
                     format("The producer %s of the topic %s was already closed when closing the producers",
                         producerName, topic));
                 pendingMessages.forEach(msg -> {
-                    msg.callback.sendComplete(ex);
+                    msg.sendComplete(ex);
                     msg.cmd.release();
                     msg.recycle();
                 });
@@ -954,7 +959,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
 
                             // Need to protect ourselves from any exception being thrown in the future handler from the
                             // application
-                            op.callback.sendComplete(null);
+                            op.sendComplete(null);
                         } catch (Throwable t) {
                             log.warn("[{}] [{}] Got exception while completing the callback for msg {}:", topic,
                                     producerName, sequenceId, t);
@@ -1011,7 +1016,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
                     pendingMessages.remove();
                     releaseSemaphoreForSendOp(op);
                     try {
-                        op.callback.sendComplete(
+                        op.sendComplete(
                             new PulsarClientException.ChecksumException(
                                 format("The checksum of the message which is produced by producer %s to the topic " +
                                     "%s is corrupted", producerName, topic)));
@@ -1045,7 +1050,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
             pendingMessages.remove();
             releaseSemaphoreForSendOp(op);
             try {
-                op.callback.sendComplete(
+                op.sendComplete(
                         new PulsarClientException.NotAllowedException(
                                 format("The size of the message which is produced by producer %s to the topic " +
                                         "%s is not allowed", producerName, topic)));
@@ -1107,6 +1112,9 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
         Runnable rePopulate;
         long sequenceId;
         long createdAt;
+        long firstSentAt;
+        long lastSentAt;
+        int retryCount;
         long batchSizeByte = 0;
         int numMessagesInBatch = 1;
         long highestSequenceId;
@@ -1145,6 +1153,38 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
             return op;
         }
 
+        void updateSentTimestamp() {
+            this.lastSentAt = System.nanoTime();
+            if (this.firstSentAt == -1L) {
+                this.firstSentAt = this.lastSentAt;
+            }
+            ++this.retryCount;
+        }
+
+        void sendComplete(final Exception e) {
+            SendCallback callback = this.callback;
+            if (null != callback) {
+                Exception finalEx = e;
+                if (finalEx != null && finalEx instanceof TimeoutException) {
+                    TimeoutException te = (TimeoutException) e;
+                    long sequenceId = te.getSequenceId();
+                    long ns = System.nanoTime();
+                    String errMsg = String.format(
+                        "%s : createdAt %s ns ago, firstSentAt %s ns ago, lastSentAt %s ns ago, retryCount %s",
+                        te.getMessage(),
+                        ns - this.createdAt,
+                        ns - this.firstSentAt,
+                        ns - this.lastSentAt,
+                        retryCount
+                    );
+
+                    finalEx = new TimeoutException(errMsg, sequenceId);
+                }
+
+                callback.sendComplete(finalEx);
+            }
+        }
+
         void recycle() {
             msg = null;
             msgs = null;
@@ -1153,6 +1193,8 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
             rePopulate = null;
             sequenceId = -1L;
             createdAt = -1L;
+            firstSentAt = -1L;
+            lastSentAt = -1L;
             highestSequenceId = -1L;
             totalChunks = 0;
             chunkId = -1;
@@ -1521,7 +1563,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
                     if (op.totalChunks <= 1 || (op.chunkId == op.totalChunks - 1)) {
                         // Need to protect ourselves from any exception being thrown in the future handler from the
                         // application
-                        op.callback.sendComplete(ex);
+                        op.sendComplete(ex);
                     }
                 } catch (Throwable t) {
                     log.warn("[{}] [{}] Got exception while completing the callback for msg {}:", topic, producerName,
@@ -1645,13 +1687,13 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
             Thread.currentThread().interrupt();
             releaseSemaphoreForSendOp(op);
             if (op != null) {
-                op.callback.sendComplete(new PulsarClientException(ie, op.sequenceId));
+                op.sendComplete(new PulsarClientException(ie, op.sequenceId));
             }
         } catch (Throwable t) {
             releaseSemaphoreForSendOp(op);
             log.warn("[{}] [{}] error while closing out batch -- {}", topic, producerName, t);
             if (op != null) {
-                op.callback.sendComplete(new PulsarClientException(t, op.sequenceId));
+                op.sendComplete(new PulsarClientException(t, op.sequenceId));
             }
         }
     }
@@ -1694,6 +1736,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
                           cnx.channel(), op.sequenceId);
             }
             cnx.ctx().write(op.cmd, cnx.ctx().voidPromise());
+            op.updateSentTimestamp();
             stats.updateNumMsgsSent(op.numMessagesInBatch, op.batchSizeByte);
         }
         cnx.ctx().flush();