You are viewing a plain-text version of this content. The hyperlink to its canonical version was lost in this plain-text copy.
Posted to common-commits@hadoop.apache.org by ms...@apache.org on 2021/05/01 18:06:04 UTC

[hadoop] branch trunk updated: HDFS-15865. Interrupt DataStreamer thread if no ack (#2728)

This is an automated email from the ASF dual-hosted git repository.

msingh pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new bd3da73  HDFS-15865. Interrupt DataStreamer thread if no ack (#2728)
bd3da73 is described below

commit bd3da73a0ff75231340b1168f7805164710bf4fe
Author: Karthik Palanisamy <kp...@cloudera.com>
AuthorDate: Sat May 1 11:05:31 2021 -0700

    HDFS-15865. Interrupt DataStreamer thread if no ack (#2728)
---
 .../src/main/java/org/apache/hadoop/hdfs/DataStreamer.java   | 12 ++++++++++++
 1 file changed, 12 insertions(+)

diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
index 96c86c3..e04268e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
@@ -897,6 +897,8 @@ class DataStreamer extends Daemon {
     try (TraceScope ignored = dfsClient.getTracer().
         newScope("waitForAckedSeqno")) {
       LOG.debug("{} waiting for ack for: {}", this, seqno);
+      int dnodes = nodes != null ? nodes.length : 3;
+      int writeTimeout = dfsClient.getDatanodeWriteTimeout(dnodes);
       long begin = Time.monotonicNow();
       try {
         synchronized (dataQueue) {
@@ -907,6 +909,16 @@ class DataStreamer extends Daemon {
             }
             try {
               dataQueue.wait(1000); // when we receive an ack, we notify on
+              long duration = Time.monotonicNow() - begin;
+              if (duration > writeTimeout) {
+                LOG.error("No ack received, took {}ms (threshold={}ms). "
+                    + "File being written: {}, block: {}, "
+                    + "Write pipeline datanodes: {}.",
+                    duration, writeTimeout, src, block, nodes);
+                throw new InterruptedIOException("No ack received after " +
+                    duration / 1000 + "s and a timeout of " +
+                    writeTimeout / 1000 + "s");
+              }
               // dataQueue
             } catch (InterruptedException ie) {
               throw new InterruptedIOException(

---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org