You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/12/21 11:59:11 UTC

[pulsar] branch branch-2.6 updated: Add more information in send timeout exception (#8931)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.6 by this push:
     new 8bce823  Add more information in send timeout exception (#8931)
8bce823 is described below

commit 8bce82321409f37f948e1b89447e332bc67e9c51
Author: Sijie Guo <si...@apache.org>
AuthorDate: Tue Dec 15 16:12:09 2020 -0800

    Add more information in send timeout exception (#8931)
    
    *Motivation*
    
    Currently the TimeoutException doesn't provide any useful information
    for troubleshooting. This change adds more information for troubleshooting.
    
    (cherry picked from commit c87557310615969114c7b4e9a257ae542d5bee07)
---
 .../apache/pulsar/client/impl/ProducerImpl.java    | 55 +++++++++++++++++++---
 1 file changed, 49 insertions(+), 6 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index 65e605c..2367665 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -64,6 +64,7 @@ import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerCryptoFailureAction;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.PulsarClientException.CryptoException;
+import org.apache.pulsar.client.api.PulsarClientException.TimeoutException;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
 import org.apache.pulsar.client.impl.crypto.MessageCryptoBc;
@@ -729,6 +730,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
         private ByteBufPair cmd;
         private long sequenceId;
         private ClientCnx cnx;
+        private OpSendMsg op;
 
         static WriteInEventLoopCallback create(ProducerImpl<?> producer, ClientCnx cnx, OpSendMsg op) {
             WriteInEventLoopCallback c = RECYCLER.get();
@@ -736,6 +738,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
             c.cnx = cnx;
             c.sequenceId = op.sequenceId;
             c.cmd = op.cmd;
+            c.op = op;
             return c;
         }
 
@@ -748,6 +751,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
 
             try {
                 cnx.ctx().writeAndFlush(cmd, cnx.ctx().voidPromise());
+                op.updateSentTimestamp();
             } finally {
                 recycle();
             }
@@ -758,6 +762,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
             cnx = null;
             cmd = null;
             sequenceId = -1;
+            op = null;
             recyclerHandle.recycle(this);
         }
 
@@ -816,7 +821,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
                     format("The producer %s of the topic %s was already closed when closing the producers",
                         producerName, topic));
                 pendingMessages.forEach(msg -> {
-                    msg.callback.sendComplete(ex);
+                    msg.sendComplete(ex);
                     msg.cmd.release();
                     msg.recycle();
                 });
@@ -940,7 +945,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
 
                             // Need to protect ourselves from any exception being thrown in the future handler from the
                             // application
-                            op.callback.sendComplete(null);
+                            op.sendComplete(null);
                         } catch (Throwable t) {
                             log.warn("[{}] [{}] Got exception while completing the callback for msg {}:", topic,
                                     producerName, sequenceId, t);
@@ -997,7 +1002,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
                     pendingMessages.remove();
                     releaseSemaphoreForSendOp(op);
                     try {
-                        op.callback.sendComplete(
+                        op.sendComplete(
                             new PulsarClientException.ChecksumException(
                                 format("The checksum of the message which is produced by producer %s to the topic " +
                                     "%s is corrupted", producerName, topic)));
@@ -1074,6 +1079,9 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
         Runnable rePopulate;
         long sequenceId;
         long createdAt;
+        long firstSentAt;
+        long lastSentAt;
+        int retryCount;
         long batchSizeByte = 0;
         int numMessagesInBatch = 1;
         long highestSequenceId;
@@ -1112,6 +1120,38 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
             return op;
         }
 
+        void updateSentTimestamp() {
+            this.lastSentAt = System.nanoTime();
+            if (this.firstSentAt == -1L) {
+                this.firstSentAt = this.lastSentAt;
+            }
+            ++this.retryCount;
+        }
+
+        void sendComplete(final Exception e) {
+            SendCallback callback = this.callback;
+            if (null != callback) {
+                Exception finalEx = e;
+                if (finalEx != null && finalEx instanceof TimeoutException) {
+                    TimeoutException te = (TimeoutException) e;
+                    long sequenceId = te.getSequenceId();
+                    long ns = System.nanoTime();
+                    String errMsg = String.format(
+                        "%s : createdAt %s ns ago, firstSentAt %s ns ago, lastSentAt %s ns ago, retryCount %s",
+                        te.getMessage(),
+                        ns - this.createdAt,
+                        ns - this.firstSentAt,
+                        ns - this.lastSentAt,
+                        retryCount
+                    );
+
+                    finalEx = new TimeoutException(errMsg, sequenceId);
+                }
+
+                callback.sendComplete(finalEx);
+            }
+        }
+
         void recycle() {
             msg = null;
             msgs = null;
@@ -1120,6 +1160,8 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
             rePopulate = null;
             sequenceId = -1L;
             createdAt = -1L;
+            firstSentAt = -1L;
+            lastSentAt = -1L;
             highestSequenceId = -1L;
             totalChunks = 0;
             chunkId = -1;
@@ -1472,7 +1514,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
                     if (op.totalChunks <= 1 || (op.chunkId == op.totalChunks - 1)) {
                         // Need to protect ourselves from any exception being thrown in the future handler from the
                         // application
-                        op.callback.sendComplete(ex);
+                        op.sendComplete(ex);
                     }
                 } catch (Throwable t) {
                     log.warn("[{}] [{}] Got exception while completing the callback for msg {}:", topic, producerName,
@@ -1622,13 +1664,13 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
             Thread.currentThread().interrupt();
             releaseSemaphoreForSendOp(op);
             if (op != null) {
-                op.callback.sendComplete(new PulsarClientException(ie, op.sequenceId));
+                op.sendComplete(new PulsarClientException(ie, op.sequenceId));
             }
         } catch (Throwable t) {
             releaseSemaphoreForSendOp(op);
             log.warn("[{}] [{}] error while closing out batch -- {}", topic, producerName, t);
             if (op != null) {
-                op.callback.sendComplete(new PulsarClientException(t, op.sequenceId));
+                op.sendComplete(new PulsarClientException(t, op.sequenceId));
             }
         }
     }
@@ -1671,6 +1713,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
                           cnx.channel(), op.sequenceId);
             }
             cnx.ctx().write(op.cmd, cnx.ctx().voidPromise());
+            op.updateSentTimestamp();
             stats.updateNumMsgsSent(op.numMessagesInBatch, op.batchSizeByte);
         }
         cnx.ctx().flush();