You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by zh...@apache.org on 2015/04/02 20:09:48 UTC
[10/50] [abbrv] hadoop git commit: HDFS-7888. Change DFSOutputStream
and DataStreamer for convenience of subclassing. Contributed by Li Bo
HDFS-7888. Change DFSOutputStream and DataStreamer for convenience of subclassing. Contributed by Li Bo
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/9ed43f21
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/9ed43f21
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/9ed43f21
Branch: refs/heads/HDFS-7285
Commit: 9ed43f2189fb4674b7379e8e995d53d4970d5c3a
Parents: 867d5d2
Author: Tsz-Wo Nicholas Sze <sz...@hortonworks.com>
Authored: Thu Apr 2 10:59:26 2015 -0700
Committer: Tsz-Wo Nicholas Sze <sz...@hortonworks.com>
Committed: Thu Apr 2 10:59:26 2015 -0700
----------------------------------------------------------------------
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 +
.../org/apache/hadoop/hdfs/DFSOutputStream.java | 116 ++++++++++---------
.../org/apache/hadoop/hdfs/DataStreamer.java | 13 +--
3 files changed, 69 insertions(+), 63 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9ed43f21/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 80d958d..0c66309 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -376,6 +376,9 @@ Release 2.8.0 - UNRELEASED
HDFS-7978. Add LOG.isDebugEnabled() guard for some LOG.debug(..).
(Walter Su via wang)
+ HDFS-7888. Change DFSOutputStream and DataStreamer for convenience of
+ subclassing. (Li Bo via szetszwo)
+
OPTIMIZATIONS
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9ed43f21/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
index c88639d..f6733e3 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.hdfs;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InterruptedIOException;
-import java.net.Socket;
import java.nio.channels.ClosedChannelException;
import java.util.EnumSet;
import java.util.concurrent.atomic.AtomicReference;
@@ -95,29 +94,29 @@ public class DFSOutputStream extends FSOutputSummer
static CryptoProtocolVersion[] SUPPORTED_CRYPTO_VERSIONS =
CryptoProtocolVersion.supported();
- private final DFSClient dfsClient;
- private final ByteArrayManager byteArrayManager;
+ protected final DFSClient dfsClient;
+ protected final ByteArrayManager byteArrayManager;
// closed is accessed by different threads under different locks.
- private volatile boolean closed = false;
-
- private final String src;
- private final long fileId;
- private final long blockSize;
- private final int bytesPerChecksum;
-
- private DFSPacket currentPacket = null;
- private DataStreamer streamer;
- private int packetSize = 0; // write packet size, not including the header.
- private int chunksPerPacket = 0;
- private long lastFlushOffset = 0; // offset when flush was invoked
+ protected volatile boolean closed = false;
+
+ protected final String src;
+ protected final long fileId;
+ protected final long blockSize;
+ protected final int bytesPerChecksum;
+
+ protected DFSPacket currentPacket = null;
+ protected DataStreamer streamer;
+ protected int packetSize = 0; // write packet size, not including the header.
+ protected int chunksPerPacket = 0;
+ protected long lastFlushOffset = 0; // offset when flush was invoked
private long initialFileSize = 0; // at time of file open
private final short blockReplication; // replication factor of file
- private boolean shouldSyncBlock = false; // force blocks to disk upon close
- private final AtomicReference<CachingStrategy> cachingStrategy;
+ protected boolean shouldSyncBlock = false; // force blocks to disk upon close
+ protected final AtomicReference<CachingStrategy> cachingStrategy;
private FileEncryptionInfo fileEncryptionInfo;
/** Use {@link ByteArrayManager} to create buffer for non-heartbeat packets.*/
- private DFSPacket createPacket(int packetSize, int chunksPerPkt, long offsetInBlock,
+ protected DFSPacket createPacket(int packetSize, int chunksPerPkt, long offsetInBlock,
long seqno, boolean lastPacketInBlock) throws InterruptedIOException {
final byte[] buf;
final int bufferSize = PacketHeader.PKT_MAX_HEADER_LEN + packetSize;
@@ -206,7 +205,7 @@ public class DFSOutputStream extends FSOutputSummer
}
/** Construct a new output stream for creating a file. */
- private DFSOutputStream(DFSClient dfsClient, String src, HdfsFileStatus stat,
+ protected DFSOutputStream(DFSClient dfsClient, String src, HdfsFileStatus stat,
EnumSet<CreateFlag> flag, Progressable progress,
DataChecksum checksum, String[] favoredNodes) throws IOException {
this(dfsClient, src, progress, stat, checksum);
@@ -359,7 +358,7 @@ public class DFSOutputStream extends FSOutputSummer
}
}
- private void computePacketChunkSize(int psize, int csize) {
+ protected void computePacketChunkSize(int psize, int csize) {
final int bodySize = psize - PacketHeader.PKT_MAX_HEADER_LEN;
final int chunkSize = csize + getChecksumSize();
chunksPerPacket = Math.max(bodySize/chunkSize, 1);
@@ -426,33 +425,46 @@ public class DFSOutputStream extends FSOutputSummer
streamer.waitAndQueuePacket(currentPacket);
currentPacket = null;
- // If the reopened file did not end at chunk boundary and the above
- // write filled up its partial chunk. Tell the summer to generate full
- // crc chunks from now on.
- if (streamer.getAppendChunk() &&
- streamer.getBytesCurBlock() % bytesPerChecksum == 0) {
- streamer.setAppendChunk(false);
- resetChecksumBufSize();
- }
+ adjustChunkBoundary();
- if (!streamer.getAppendChunk()) {
- int psize = Math.min((int)(blockSize-streamer.getBytesCurBlock()),
- dfsClient.getConf().writePacketSize);
- computePacketChunkSize(psize, bytesPerChecksum);
- }
- //
- // if encountering a block boundary, send an empty packet to
- // indicate the end of block and reset bytesCurBlock.
- //
- if (streamer.getBytesCurBlock() == blockSize) {
- currentPacket = createPacket(0, 0, streamer.getBytesCurBlock(),
- streamer.getAndIncCurrentSeqno(), true);
- currentPacket.setSyncBlock(shouldSyncBlock);
- streamer.waitAndQueuePacket(currentPacket);
- currentPacket = null;
- streamer.setBytesCurBlock(0);
- lastFlushOffset = 0;
- }
+ endBlock();
+ }
+ }
+
+ /**
+ * If the reopened file did not end at a chunk boundary and the preceding
+ * write has filled up its partial chunk, tell the summer to generate full
+ * CRC chunks from now on.
+ */
+ protected void adjustChunkBoundary() {
+ if (streamer.getAppendChunk() &&
+ streamer.getBytesCurBlock() % bytesPerChecksum == 0) {
+ streamer.setAppendChunk(false);
+ resetChecksumBufSize();
+ }
+
+ if (!streamer.getAppendChunk()) {
+ int psize = Math.min((int)(blockSize- streamer.getBytesCurBlock()),
+ dfsClient.getConf().writePacketSize);
+ computePacketChunkSize(psize, bytesPerChecksum);
+ }
+ }
+
+ /**
+ * If a block boundary has been reached, send an empty packet to
+ * indicate the end of the block and reset bytesCurBlock.
+ *
+ * @throws IOException if the end-of-block packet cannot be created or queued
+ */
+ protected void endBlock() throws IOException {
+ if (streamer.getBytesCurBlock() == blockSize) {
+ currentPacket = createPacket(0, 0, streamer.getBytesCurBlock(),
+ streamer.getAndIncCurrentSeqno(), true);
+ currentPacket.setSyncBlock(shouldSyncBlock);
+ streamer.waitAndQueuePacket(currentPacket);
+ currentPacket = null;
+ streamer.setBytesCurBlock(0);
+ lastFlushOffset = 0;
}
}
@@ -676,7 +688,7 @@ public class DFSOutputStream extends FSOutputSummer
* Waits till all existing data is flushed and confirmations
* received from datanodes.
*/
- private void flushInternal() throws IOException {
+ protected void flushInternal() throws IOException {
long toWaitFor;
synchronized (this) {
dfsClient.checkOpen();
@@ -692,7 +704,7 @@ public class DFSOutputStream extends FSOutputSummer
streamer.waitForAckedSeqno(toWaitFor);
}
- private synchronized void start() {
+ protected synchronized void start() {
streamer.start();
}
@@ -721,7 +733,7 @@ public class DFSOutputStream extends FSOutputSummer
// shutdown datastreamer and responseprocessor threads.
// interrupt datastreamer if force is true
- private void closeThreads(boolean force) throws IOException {
+ protected void closeThreads(boolean force) throws IOException {
try {
streamer.close(force);
streamer.join();
@@ -749,7 +761,7 @@ public class DFSOutputStream extends FSOutputSummer
}
}
- private synchronized void closeImpl() throws IOException {
+ protected synchronized void closeImpl() throws IOException {
if (isClosed()) {
IOException e = streamer.getLastException().getAndSet(null);
if (e == null)
@@ -761,7 +773,7 @@ public class DFSOutputStream extends FSOutputSummer
try {
flushBuffer(); // flush from all upper layers
- if (currentPacket != null) {
+ if (currentPacket != null) {
streamer.waitAndQueuePacket(currentPacket);
currentPacket = null;
}
@@ -792,7 +804,7 @@ public class DFSOutputStream extends FSOutputSummer
// should be called holding (this) lock since setTestFilename() may
// be called during unit tests
- private void completeFile(ExtendedBlock last) throws IOException {
+ protected void completeFile(ExtendedBlock last) throws IOException {
long localstart = Time.monotonicNow();
long sleeptime = dfsClient.getConf().
blockWriteLocateFollowingInitialDelayMs;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9ed43f21/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
index 6ff4c24..6bcbfde 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
@@ -1519,7 +1519,7 @@ class DataStreamer extends Daemon {
}
}
- private LocatedBlock locateFollowingBlock(DatanodeInfo[] excludedNodes)
+ protected LocatedBlock locateFollowingBlock(DatanodeInfo[] excludedNodes)
throws IOException {
int retries = dfsClient.getConf().nBlockWriteLocateFollowingRetry;
long sleeptime = dfsClient.getConf().
@@ -1729,15 +1729,6 @@ class DataStreamer extends Daemon {
}
/**
- * get the socket connecting to the first datanode in pipeline
- *
- * @return socket connecting to the first datanode in pipeline
- */
- Socket getSocket() {
- return s;
- }
-
- /**
* set socket to null
*/
void setSocketToNull() {
@@ -1814,4 +1805,4 @@ class DataStreamer extends Daemon {
s.close();
}
}
-}
\ No newline at end of file
+}