You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by wh...@apache.org on 2015/06/25 03:07:42 UTC
[09/50] [abbrv] hadoop git commit: HDFS-8605. Merge Refactor of DFSOutputStream from HDFS-7285 branch. (vinayakumarb)
HDFS-8605. Merge Refactor of DFSOutputStream from HDFS-7285 branch. (vinayakumarb)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/1c13519e
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/1c13519e
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/1c13519e
Branch: refs/heads/HDFS-7966
Commit: 1c13519e1e7588c3e2974138d37bf3449ca8b3df
Parents: 2ad6687
Author: Andrew Wang <wa...@apache.org>
Authored: Thu Jun 18 08:48:09 2015 -0700
Committer: Andrew Wang <wa...@apache.org>
Committed: Thu Jun 18 08:48:09 2015 -0700
----------------------------------------------------------------------
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 +
.../org/apache/hadoop/hdfs/DFSOutputStream.java | 59 ++++++++++----------
.../org/apache/hadoop/hdfs/DataStreamer.java | 7 ++-
3 files changed, 40 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1c13519e/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 2545bcf..a61cf78 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -656,6 +656,9 @@ Release 2.8.0 - UNRELEASED
HDFS-6249. Output AclEntry in PBImageXmlWriter.
(surendra singh lilhore via aajisaka)
+ HDFS-8605. Merge Refactor of DFSOutputStream from HDFS-7285 branch.
+ (vinayakumarb via wang)
+
OPTIMIZATIONS
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1c13519e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
index 4622be6..c16aef2 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
@@ -64,6 +64,8 @@ import org.apache.hadoop.util.Time;
import org.apache.htrace.Sampler;
import org.apache.htrace.Trace;
import org.apache.htrace.TraceScope;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
@@ -86,6 +88,7 @@ import com.google.common.base.Preconditions;
@InterfaceAudience.Private
public class DFSOutputStream extends FSOutputSummer
implements Syncable, CanSetDropBehind {
+ static final Logger LOG = LoggerFactory.getLogger(DFSOutputStream.class);
/**
* Number of times to retry creating a file when there are transient
* errors (typically related to encryption zones and KeyProvider operations).
@@ -413,21 +416,30 @@ public class DFSOutputStream extends FSOutputSummer
//
if (currentPacket.getNumChunks() == currentPacket.getMaxChunks() ||
getStreamer().getBytesCurBlock() == blockSize) {
- if (DFSClient.LOG.isDebugEnabled()) {
- DFSClient.LOG.debug("DFSClient writeChunk packet full seqno=" +
- currentPacket.getSeqno() +
- ", src=" + src +
- ", bytesCurBlock=" + getStreamer().getBytesCurBlock() +
- ", blockSize=" + blockSize +
- ", appendChunk=" + getStreamer().getAppendChunk());
- }
- getStreamer().waitAndQueuePacket(currentPacket);
- currentPacket = null;
+ enqueueCurrentPacketFull();
+ }
+ }
- adjustChunkBoundary();
+ void enqueueCurrentPacket() throws IOException {
+ getStreamer().waitAndQueuePacket(currentPacket);
+ currentPacket = null;
+ }
- endBlock();
- }
+ void enqueueCurrentPacketFull() throws IOException {
+ LOG.debug("enqueue full {}, src={}, bytesCurBlock={}, blockSize={},"
+ + " appendChunk={}, {}", currentPacket, src, getStreamer()
+ .getBytesCurBlock(), blockSize, getStreamer().getAppendChunk(),
+ getStreamer());
+ enqueueCurrentPacket();
+ adjustChunkBoundary();
+ endBlock();
+ }
+
+ /** create an empty packet to mark the end of the block. */
+ void setCurrentPacketToEmpty() throws InterruptedIOException {
+ currentPacket = createPacket(0, 0, getStreamer().getBytesCurBlock(),
+ getStreamer().getAndIncCurrentSeqno(), true);
+ currentPacket.setSyncBlock(shouldSyncBlock);
}
/**
@@ -457,11 +469,8 @@ public class DFSOutputStream extends FSOutputSummer
*/
protected void endBlock() throws IOException {
if (getStreamer().getBytesCurBlock() == blockSize) {
- currentPacket = createPacket(0, 0, getStreamer().getBytesCurBlock(),
- getStreamer().getAndIncCurrentSeqno(), true);
- currentPacket.setSyncBlock(shouldSyncBlock);
- getStreamer().waitAndQueuePacket(currentPacket);
- currentPacket = null;
+ setCurrentPacketToEmpty();
+ enqueueCurrentPacket();
getStreamer().setBytesCurBlock(0);
lastFlushOffset = 0;
}
@@ -586,8 +595,7 @@ public class DFSOutputStream extends FSOutputSummer
}
if (currentPacket != null) {
currentPacket.setSyncBlock(isSync);
- getStreamer().waitAndQueuePacket(currentPacket);
- currentPacket = null;
+ enqueueCurrentPacket();
}
if (endBlock && getStreamer().getBytesCurBlock() > 0) {
// Need to end the current block, thus send an empty packet to
@@ -595,8 +603,7 @@ public class DFSOutputStream extends FSOutputSummer
currentPacket = createPacket(0, 0, getStreamer().getBytesCurBlock(),
getStreamer().getAndIncCurrentSeqno(), true);
currentPacket.setSyncBlock(shouldSyncBlock || isSync);
- getStreamer().waitAndQueuePacket(currentPacket);
- currentPacket = null;
+ enqueueCurrentPacket();
getStreamer().setBytesCurBlock(0);
lastFlushOffset = 0;
} else {
@@ -775,15 +782,11 @@ public class DFSOutputStream extends FSOutputSummer
flushBuffer(); // flush from all upper layers
if (currentPacket != null) {
- getStreamer().waitAndQueuePacket(currentPacket);
- currentPacket = null;
+ enqueueCurrentPacket();
}
if (getStreamer().getBytesCurBlock() != 0) {
- // send an empty packet to mark the end of the block
- currentPacket = createPacket(0, 0, getStreamer().getBytesCurBlock(),
- getStreamer().getAndIncCurrentSeqno(), true);
- currentPacket.setSyncBlock(shouldSyncBlock);
+ setCurrentPacketToEmpty();
}
flushInternal(); // flush all data to Datanodes
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1c13519e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
index cecd5a0..8dd85b7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
@@ -44,7 +44,6 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.StorageType;
-import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.BlockWrite;
import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
@@ -1901,4 +1900,10 @@ class DataStreamer extends Daemon {
s.close();
}
}
+
+ @Override
+ public String toString() {
+ return (block == null? null: block.getLocalBlock())
+ + "@" + Arrays.toString(getNodes());
+ }
}