You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ms...@apache.org on 2021/05/01 18:06:04 UTC
[hadoop] branch trunk updated: HDFS-15865. Interrupt DataStreamer thread if no ack (#2728)
This is an automated email from the ASF dual-hosted git repository.
msingh pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new bd3da73 HDFS-15865. Interrupt DataStreamer thread if no ack (#2728)
bd3da73 is described below
commit bd3da73a0ff75231340b1168f7805164710bf4fe
Author: Karthik Palanisamy <kp...@cloudera.com>
AuthorDate: Sat May 1 11:05:31 2021 -0700
HDFS-15865. Interrupt DataStreamer thread if no ack (#2728)
---
.../src/main/java/org/apache/hadoop/hdfs/DataStreamer.java | 12 ++++++++++++
1 file changed, 12 insertions(+)
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
index 96c86c3..e04268e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
@@ -897,6 +897,8 @@ class DataStreamer extends Daemon {
try (TraceScope ignored = dfsClient.getTracer().
newScope("waitForAckedSeqno")) {
LOG.debug("{} waiting for ack for: {}", this, seqno);
+ int dnodes = nodes != null ? nodes.length : 3;
+ int writeTimeout = dfsClient.getDatanodeWriteTimeout(dnodes);
long begin = Time.monotonicNow();
try {
synchronized (dataQueue) {
@@ -907,6 +909,16 @@ class DataStreamer extends Daemon {
}
try {
dataQueue.wait(1000); // when we receive an ack, we notify on
+ long duration = Time.monotonicNow() - begin;
+ if (duration > writeTimeout) {
+ LOG.error("No ack received, took {}ms (threshold={}ms). "
+ + "File being written: {}, block: {}, "
+ + "Write pipeline datanodes: {}.",
+ duration, writeTimeout, src, block, nodes);
+ throw new InterruptedIOException("No ack received after " +
+ duration / 1000 + "s and a timeout of " +
+ writeTimeout / 1000 + "s");
+ }
// dataQueue
} catch (InterruptedException ie) {
throw new InterruptedIOException(
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org