You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ar...@apache.org on 2016/03/09 18:45:14 UTC

[14/34] hadoop git commit: HDFS-9812. Streamer threads leak if failure happens when closing DFSOutputStream. Contributed by Lin Yiqun.

HDFS-9812. Streamer threads leak if failure happens when closing DFSOutputStream. Contributed by Lin Yiqun.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/352d299c
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/352d299c
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/352d299c

Branch: refs/heads/HDFS-1312
Commit: 352d299cf8ebe330d24117df98d1e6a64ae38c26
Parents: 391da36
Author: Akira Ajisaka <aa...@apache.org>
Authored: Tue Mar 8 10:43:17 2016 +0900
Committer: Akira Ajisaka <aa...@apache.org>
Committed: Tue Mar 8 10:43:17 2016 +0900

----------------------------------------------------------------------
 .../main/java/org/apache/hadoop/hdfs/DFSOutputStream.java   | 9 +++++++--
 .../src/main/java/org/apache/hadoop/hdfs/DataStreamer.java  | 8 ++++----
 2 files changed, 11 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/352d299c/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
index 1c58b28..dc88e08 100755
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
@@ -770,14 +770,19 @@ public class DFSOutputStream extends FSOutputSummer
       flushInternal();             // flush all data to Datanodes
       // get last block before destroying the streamer
       ExtendedBlock lastBlock = getStreamer().getBlock();
-      closeThreads(false);
+
       try (TraceScope ignored =
                dfsClient.getTracer().newScope("completeFile")) {
         completeFile(lastBlock);
       }
     } catch (ClosedChannelException ignored) {
     } finally {
-      setClosed();
+      // Failures may happen while flushing data, leaving the streamers
+      // waiting indefinitely for new block information, so we must
+      // force these threads to close.
+      // There is no need to call setClosed() here because
+      // closeThreads(true) calls setClosed() in its own finally block.
+      closeThreads(true);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/352d299c/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
index 9d3cb55..9ae443d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
@@ -507,7 +507,7 @@ class DataStreamer extends Daemon {
   }
 
   protected void endBlock() {
-    LOG.debug("Closing old block " + block);
+    LOG.debug("Closing old block {}", block);
     this.setName("DataStreamer for file " + src);
     closeResponder();
     closeStream();
@@ -591,7 +591,7 @@ class DataStreamer extends Daemon {
           LOG.debug("stage=" + stage + ", " + this);
         }
         if (stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) {
-          LOG.debug("Allocating new block: " + this);
+          LOG.debug("Allocating new block: {}", this);
           setPipeline(nextBlockOutputStream());
           initDataStreaming();
         } else if (stage == BlockConstructionStage.PIPELINE_SETUP_APPEND) {
@@ -644,7 +644,7 @@ class DataStreamer extends Daemon {
           }
         }
 
-        LOG.debug(this + " sending " + one);
+        LOG.debug("{} sending {}", this, one);
 
         // write out data to remote datanode
         try (TraceScope ignored = dfsClient.getTracer().
@@ -1766,7 +1766,7 @@ class DataStreamer extends Daemon {
       packet.addTraceParent(Tracer.getCurrentSpanId());
       dataQueue.addLast(packet);
       lastQueuedSeqno = packet.getSeqno();
-      LOG.debug("Queued " + packet + ", " + this);
+      LOG.debug("Queued {}, {}", packet, this);
       dataQueue.notifyAll();
     }
   }