You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by as...@apache.org on 2016/03/08 20:06:41 UTC
[40/50] [abbrv] hadoop git commit: HDFS-9812. Streamer threads leak
if failure happens when closing DFSOutputStream. Contributed by Lin Yiqun.
HDFS-9812. Streamer threads leak if failure happens when closing DFSOutputStream. Contributed by Lin Yiqun.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/352d299c
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/352d299c
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/352d299c
Branch: refs/heads/yarn-2877
Commit: 352d299cf8ebe330d24117df98d1e6a64ae38c26
Parents: 391da36
Author: Akira Ajisaka <aa...@apache.org>
Authored: Tue Mar 8 10:43:17 2016 +0900
Committer: Akira Ajisaka <aa...@apache.org>
Committed: Tue Mar 8 10:43:17 2016 +0900
----------------------------------------------------------------------
.../main/java/org/apache/hadoop/hdfs/DFSOutputStream.java | 9 +++++++--
.../src/main/java/org/apache/hadoop/hdfs/DataStreamer.java | 8 ++++----
2 files changed, 11 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/352d299c/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
index 1c58b28..dc88e08 100755
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
@@ -770,14 +770,19 @@ public class DFSOutputStream extends FSOutputSummer
flushInternal(); // flush all data to Datanodes
// get last block before destroying the streamer
ExtendedBlock lastBlock = getStreamer().getBlock();
- closeThreads(false);
+
try (TraceScope ignored =
dfsClient.getTracer().newScope("completeFile")) {
completeFile(lastBlock);
}
} catch (ClosedChannelException ignored) {
} finally {
- setClosed();
+ // Failures may happen when flushing data.
+ // Streamers may keep waiting for the new block information.
+ // Thus need to force closing these threads.
+ // Don't need to call setClosed() because closeThreads(true)
+ // calls setClosed() in the finally block.
+ closeThreads(true);
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/352d299c/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
index 9d3cb55..9ae443d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
@@ -507,7 +507,7 @@ class DataStreamer extends Daemon {
}
protected void endBlock() {
- LOG.debug("Closing old block " + block);
+ LOG.debug("Closing old block {}", block);
this.setName("DataStreamer for file " + src);
closeResponder();
closeStream();
@@ -591,7 +591,7 @@ class DataStreamer extends Daemon {
LOG.debug("stage=" + stage + ", " + this);
}
if (stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) {
- LOG.debug("Allocating new block: " + this);
+ LOG.debug("Allocating new block: {}", this);
setPipeline(nextBlockOutputStream());
initDataStreaming();
} else if (stage == BlockConstructionStage.PIPELINE_SETUP_APPEND) {
@@ -644,7 +644,7 @@ class DataStreamer extends Daemon {
}
}
- LOG.debug(this + " sending " + one);
+ LOG.debug("{} sending {}", this, one);
// write out data to remote datanode
try (TraceScope ignored = dfsClient.getTracer().
@@ -1766,7 +1766,7 @@ class DataStreamer extends Daemon {
packet.addTraceParent(Tracer.getCurrentSpanId());
dataQueue.addLast(packet);
lastQueuedSeqno = packet.getSeqno();
- LOG.debug("Queued " + packet + ", " + this);
+ LOG.debug("Queued {}, {}", packet, this);
dataQueue.notifyAll();
}
}