You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2020/12/16 00:12:33 UTC
[pulsar] branch master updated: Add more information in send timeout exception (#8931)
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new c875573 Add more information in send timeout exception (#8931)
c875573 is described below
commit c87557310615969114c7b4e9a257ae542d5bee07
Author: Sijie Guo <si...@apache.org>
AuthorDate: Tue Dec 15 16:12:09 2020 -0800
Add more information in send timeout exception (#8931)
*Motivation*
Currently the TimeoutException doesn't provide any useful information
for troubleshooting. This change adds more information for troubleshooting.
---
.../apache/pulsar/client/impl/ProducerImpl.java | 57 +++++++++++++++++++---
1 file changed, 50 insertions(+), 7 deletions(-)
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index 9ae4168..f634745 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -64,6 +64,7 @@ import org.apache.pulsar.client.api.ProducerAccessMode;
import org.apache.pulsar.client.api.ProducerCryptoFailureAction;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.PulsarClientException.CryptoException;
+import org.apache.pulsar.client.api.PulsarClientException.TimeoutException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.transaction.Transaction;
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
@@ -749,6 +750,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
private ByteBufPair cmd;
private long sequenceId;
private ClientCnx cnx;
+ private OpSendMsg op;
static WriteInEventLoopCallback create(ProducerImpl<?> producer, ClientCnx cnx, OpSendMsg op) {
WriteInEventLoopCallback c = RECYCLER.get();
@@ -756,6 +758,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
c.cnx = cnx;
c.sequenceId = op.sequenceId;
c.cmd = op.cmd;
+ c.op = op;
return c;
}
@@ -768,6 +771,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
try {
cnx.ctx().writeAndFlush(cmd, cnx.ctx().voidPromise());
+ op.updateSentTimestamp();
} finally {
recycle();
}
@@ -778,6 +782,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
cnx = null;
cmd = null;
sequenceId = -1;
+ op = null;
recyclerHandle.recycle(this);
}
@@ -836,7 +841,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
format("The producer %s of the topic %s was already closed when closing the producers",
producerName, topic));
pendingMessages.forEach(msg -> {
- msg.callback.sendComplete(ex);
+ msg.sendComplete(ex);
msg.cmd.release();
msg.recycle();
});
@@ -960,7 +965,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
// Need to protect ourselves from any exception being thrown in the future handler from the
// application
- op.callback.sendComplete(null);
+ op.sendComplete(null);
} catch (Throwable t) {
log.warn("[{}] [{}] Got exception while completing the callback for msg {}:", topic,
producerName, sequenceId, t);
@@ -1017,7 +1022,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
pendingMessages.remove();
releaseSemaphoreForSendOp(op);
try {
- op.callback.sendComplete(
+ op.sendComplete(
new PulsarClientException.ChecksumException(
format("The checksum of the message which is produced by producer %s to the topic " +
"%s is corrupted", producerName, topic)));
@@ -1051,7 +1056,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
pendingMessages.remove();
releaseSemaphoreForSendOp(op);
try {
- op.callback.sendComplete(
+ op.sendComplete(
new PulsarClientException.NotAllowedException(
format("The size of the message which is produced by producer %s to the topic " +
"%s is not allowed", producerName, topic)));
@@ -1113,6 +1118,9 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
Runnable rePopulate;
long sequenceId;
long createdAt;
+ long firstSentAt;
+ long lastSentAt;
+ int retryCount;
long batchSizeByte = 0;
int numMessagesInBatch = 1;
long highestSequenceId;
@@ -1151,6 +1159,38 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
return op;
}
+ /**
+  * Records that this op has just been written to the connection.
+  *
+  * <p>{@code lastSentAt} is refreshed on every call. On the first call of the op's current
+  * use, {@code firstSentAt} is set as well and {@code retryCount} restarts at 1; every later
+  * call only increments {@code retryCount}.
+  */
+ void updateSentTimestamp() {
+     this.lastSentAt = System.nanoTime();
+     // "Not sent yet" shows up as -1 (the value recycle() stores) or as 0 (the JVM default of
+     // a long field declared without an initializer, i.e. an instance that has not been
+     // through recycle() yet). Testing only for -1 would leave firstSentAt stuck at 0 there.
+     // NOTE(review): assumes no constructor/factory pre-sets firstSentAt - confirm.
+     if (this.firstSentAt == -1L || this.firstSentAt == 0L) {
+         this.firstSentAt = this.lastSentAt;
+         // Restart rather than increment, so a count left over from an earlier use of a
+         // pooled instance cannot leak into this one.
+         // NOTE(review): assumes retryCount is not reset elsewhere on reuse - confirm.
+         this.retryCount = 1;
+     } else {
+         ++this.retryCount;
+     }
+ }
+
+ void sendComplete(final Exception e) {
+ SendCallback callback = this.callback;
+ if (null != callback) {
+ Exception finalEx = e;
+ if (finalEx != null && finalEx instanceof TimeoutException) {
+ TimeoutException te = (TimeoutException) e;
+ long sequenceId = te.getSequenceId();
+ long ns = System.nanoTime();
+ String errMsg = String.format(
+ "%s : createdAt %s ns ago, firstSentAt %s ns ago, lastSentAt %s ns ago, retryCount %s",
+ te.getMessage(),
+ ns - this.createdAt,
+ ns - this.firstSentAt,
+ ns - this.lastSentAt,
+ retryCount
+ );
+
+ finalEx = new TimeoutException(errMsg, sequenceId);
+ }
+
+ callback.sendComplete(finalEx);
+ }
+ }
+
void recycle() {
msg = null;
msgs = null;
@@ -1159,6 +1199,8 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
rePopulate = null;
sequenceId = -1L;
createdAt = -1L;
+ firstSentAt = -1L;
+ lastSentAt = -1L;
highestSequenceId = -1L;
totalChunks = 0;
chunkId = -1;
@@ -1538,7 +1580,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
if (op.totalChunks <= 1 || (op.chunkId == op.totalChunks - 1)) {
// Need to protect ourselves from any exception being thrown in the future handler from the
// application
- op.callback.sendComplete(ex);
+ op.sendComplete(ex);
}
} catch (Throwable t) {
log.warn("[{}] [{}] Got exception while completing the callback for msg {}:", topic, producerName,
@@ -1662,13 +1704,13 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
Thread.currentThread().interrupt();
releaseSemaphoreForSendOp(op);
if (op != null) {
- op.callback.sendComplete(new PulsarClientException(ie, op.sequenceId));
+ op.sendComplete(new PulsarClientException(ie, op.sequenceId));
}
} catch (Throwable t) {
releaseSemaphoreForSendOp(op);
log.warn("[{}] [{}] error while closing out batch -- {}", topic, producerName, t);
if (op != null) {
- op.callback.sendComplete(new PulsarClientException(t, op.sequenceId));
+ op.sendComplete(new PulsarClientException(t, op.sequenceId));
}
}
}
@@ -1711,6 +1753,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
cnx.channel(), op.sequenceId);
}
cnx.ctx().write(op.cmd, cnx.ctx().voidPromise());
+ op.updateSentTimestamp();
stats.updateNumMsgsSent(op.numMessagesInBatch, op.batchSizeByte);
}
cnx.ctx().flush();