Posted to hdfs-commits@hadoop.apache.org by su...@apache.org on 2012/08/26 06:00:27 UTC
svn commit: r1377372 - in /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs: CHANGES.txt src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
Author: suresh
Date: Sun Aug 26 04:00:26 2012
New Revision: 1377372
URL: http://svn.apache.org/viewvc?rev=1377372&view=rev
Log:
HDFS-3851. DFSOutputStream class code cleanup. Contributed by Jing Zhao.
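
In substance, the cleanup below makes Packet a static nested class: its constructor now receives the sequence number and checksum size as parameters instead of reading currentSeqno and checksum from the enclosing DFSOutputStream, and several previously-empty InterruptedException catch blocks now log a warning. A minimal sketch of the constructor pattern follows; it is not the actual Hadoop source, the buffer layout is simplified, and OutputStreamSketch and newDataPacket are illustrative names rather than Hadoop identifiers.

    // Sketch only: a static nested Packet decoupled from its enclosing stream.
    // Everything it needs (seqno, checksum size) arrives through the constructor,
    // so the outer class keeps ownership of currentSeqno and the checksum object.
    public class OutputStreamSketch {                    // hypothetical stand-in for DFSOutputStream
      private long currentSeqno = 0;
      private final int checksumSize = 4;                // e.g. a CRC32 checksum is 4 bytes

      private static class Packet {
        private static final long HEART_BEAT_SEQNO = -1L;
        final long seqno;
        final long offsetInBlock;
        final int dataStart;                             // checksums precede data in the buffer
        final byte[] buf;

        /** Heartbeat packet: empty payload, sentinel sequence number. */
        Packet(int checksumSize) {
          this(0, 0, 0, HEART_BEAT_SEQNO, checksumSize);
        }

        /** Regular packet: every dependency is passed in explicitly. */
        Packet(int pktSize, int chunksPerPkt, long offsetInBlock,
               long seqno, int checksumSize) {
          this.seqno = seqno;
          this.offsetInBlock = offsetInBlock;
          this.buf = new byte[pktSize];                  // header space omitted in this sketch
          this.dataStart = chunksPerPkt * checksumSize;
        }
      }

      // Call sites thread the outer-class state in, as the diff does with
      // "new Packet(packetSize, chunksPerPacket, bytesCurBlock, currentSeqno++, ...)".
      Packet newDataPacket(int pktSize, int chunksPerPkt, long bytesCurBlock) {
        return new Packet(pktSize, chunksPerPkt, bytesCurBlock,
            currentSeqno++, checksumSize);
      }
    }

Passing seqno in from the call sites (currentSeqno++ at the points shown further down) keeps the counter in one place while letting Packet be constructed without an implicit reference to the outer instance.
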
Modified:
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1377372&r1=1377371&r2=1377372&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Sun Aug 26 04:00:26 2012
@@ -129,6 +129,8 @@ Trunk (unreleased changes)
HDFS-3844. Add @Override and remove {@inheritdoc} and unnecessary
imports. (Jing Zhao via suresh)
+ HDFS-3851. DFSOutputStream class code cleanup. (Jing Zhao via suresh)
+
OPTIMIZATIONS
BUG FIXES
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java?rev=1377372&r1=1377371&r2=1377372&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java Sun Aug 26 04:00:26 2012
@@ -56,8 +56,8 @@ import org.apache.hadoop.hdfs.protocol.L
import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
-import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor;
+import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
@@ -107,8 +107,8 @@ import com.google.common.annotations.Vis
****************************************************************/
@InterfaceAudience.Private
public class DFSOutputStream extends FSOutputSummer implements Syncable {
- private final DFSClient dfsClient;
private static final int MAX_PACKETS = 80; // each packet 64K, total 5MB
+ private final DFSClient dfsClient;
private Socket s;
// closed is accessed by different threads under different locks.
private volatile boolean closed = false;
@@ -138,15 +138,15 @@ public class DFSOutputStream extends FSO
private final short blockReplication; // replication factor of file
private boolean shouldSyncBlock = false; // force blocks to disk upon close
- private class Packet {
- long seqno; // sequencenumber of buffer in block
- long offsetInBlock; // offset in block
- private boolean lastPacketInBlock; // is this the last packet in block?
- boolean syncBlock; // this packet forces the current block to disk
- int numChunks; // number of chunks currently in packet
- int maxChunks; // max chunks in packet
-
+ private static class Packet {
+ private static final long HEART_BEAT_SEQNO = -1L;
+ long seqno; // sequencenumber of buffer in block
+ final long offsetInBlock; // offset in block
+ boolean syncBlock; // this packet forces the current block to disk
+ int numChunks; // number of chunks currently in packet
+ final int maxChunks; // max chunks in packet
byte[] buf;
+ private boolean lastPacketInBlock; // is this the last packet in block?
/**
* buf is pointed into like follows:
@@ -164,45 +164,36 @@ public class DFSOutputStream extends FSO
*/
int checksumStart;
int checksumPos;
- int dataStart;
+ final int dataStart;
int dataPos;
- private static final long HEART_BEAT_SEQNO = -1L;
-
/**
* Create a heartbeat packet.
*/
- Packet() {
- this.lastPacketInBlock = false;
- this.numChunks = 0;
- this.offsetInBlock = 0;
- this.seqno = HEART_BEAT_SEQNO;
-
- buf = new byte[PacketHeader.PKT_MAX_HEADER_LEN];
-
- checksumStart = checksumPos = dataPos = dataStart = PacketHeader.PKT_MAX_HEADER_LEN;
- maxChunks = 0;
+ Packet(int checksumSize) {
+ this(0, 0, 0, HEART_BEAT_SEQNO, checksumSize);
}
/**
* Create a new packet.
*
- * @param pktSize maximum size of the packet, including checksum data and actual data.
+ * @param pktSize maximum size of the packet,
+ * including checksum data and actual data.
* @param chunksPerPkt maximum number of chunks per packet.
* @param offsetInBlock offset in bytes into the HDFS block.
*/
- Packet(int pktSize, int chunksPerPkt, long offsetInBlock) {
+ Packet(int pktSize, int chunksPerPkt, long offsetInBlock,
+ long seqno, int checksumSize) {
this.lastPacketInBlock = false;
this.numChunks = 0;
this.offsetInBlock = offsetInBlock;
- this.seqno = currentSeqno;
- currentSeqno++;
+ this.seqno = seqno;
buf = new byte[PacketHeader.PKT_MAX_HEADER_LEN + pktSize];
checksumStart = PacketHeader.PKT_MAX_HEADER_LEN;
checksumPos = checksumStart;
- dataStart = checksumStart + (chunksPerPkt * checksum.getChecksumSize());
+ dataStart = checksumStart + (chunksPerPkt * checksumSize);
dataPos = dataStart;
maxChunks = chunksPerPkt;
}
@@ -412,6 +403,7 @@ public class DFSOutputStream extends FSO
response.join();
response = null;
} catch (InterruptedException e) {
+ DFSClient.LOG.warn("Caught exception ", e);
}
}
@@ -439,6 +431,7 @@ public class DFSOutputStream extends FSO
try {
dataQueue.wait(timeout);
} catch (InterruptedException e) {
+ DFSClient.LOG.warn("Caught exception ", e);
}
doSleep = false;
now = Time.now();
@@ -448,7 +441,7 @@ public class DFSOutputStream extends FSO
}
// get packet to be sent.
if (dataQueue.isEmpty()) {
- one = new Packet(); // heartbeat packet
+ one = new Packet(checksum.getChecksumSize()); // heartbeat packet
} else {
one = dataQueue.getFirst(); // regular data packet
}
@@ -488,6 +481,7 @@ public class DFSOutputStream extends FSO
// wait for acks to arrive from datanodes
dataQueue.wait(1000);
} catch (InterruptedException e) {
+ DFSClient.LOG.warn("Caught exception ", e);
}
}
}
@@ -518,7 +512,7 @@ public class DFSOutputStream extends FSO
blockStream.flush();
} catch (IOException e) {
// HDFS-3398 treat primary DN is down since client is unable to
- // write to primary DN
+ // write to primary DN
errorIndex = 0;
throw e;
}
@@ -607,6 +601,7 @@ public class DFSOutputStream extends FSO
response.close();
response.join();
} catch (InterruptedException e) {
+ DFSClient.LOG.warn("Caught exception ", e);
} finally {
response = null;
}
@@ -1178,6 +1173,7 @@ public class DFSOutputStream extends FSO
Thread.sleep(sleeptime);
sleeptime *= 2;
} catch (InterruptedException ie) {
+ DFSClient.LOG.warn("Caught exception ", ie);
}
}
} else {
@@ -1421,7 +1417,7 @@ public class DFSOutputStream extends FSO
if (currentPacket == null) {
currentPacket = new Packet(packetSize, chunksPerPacket,
- bytesCurBlock);
+ bytesCurBlock, currentSeqno++, this.checksum.getChecksumSize());
if (DFSClient.LOG.isDebugEnabled()) {
DFSClient.LOG.debug("DFSClient writeChunk allocating new packet seqno=" +
currentPacket.seqno +
@@ -1468,7 +1464,8 @@ public class DFSOutputStream extends FSO
// indicate the end of block and reset bytesCurBlock.
//
if (bytesCurBlock == blockSize) {
- currentPacket = new Packet(0, 0, bytesCurBlock);
+ currentPacket = new Packet(0, 0, bytesCurBlock,
+ currentSeqno++, this.checksum.getChecksumSize());
currentPacket.lastPacketInBlock = true;
currentPacket.syncBlock = shouldSyncBlock;
waitAndQueueCurrentPacket();
@@ -1540,7 +1537,7 @@ public class DFSOutputStream extends FSO
// but sync was requested.
// Send an empty packet
currentPacket = new Packet(packetSize, chunksPerPacket,
- bytesCurBlock);
+ bytesCurBlock, currentSeqno++, this.checksum.getChecksumSize());
}
} else {
// We already flushed up to this offset.
@@ -1557,7 +1554,7 @@ public class DFSOutputStream extends FSO
// and sync was requested.
// So send an empty sync packet.
currentPacket = new Packet(packetSize, chunksPerPacket,
- bytesCurBlock);
+ bytesCurBlock, currentSeqno++, this.checksum.getChecksumSize());
} else {
// just discard the current packet since it is already been sent.
currentPacket = null;
@@ -1738,7 +1735,8 @@ public class DFSOutputStream extends FSO
if (bytesCurBlock != 0) {
// send an empty packet to mark the end of the block
- currentPacket = new Packet(0, 0, bytesCurBlock);
+ currentPacket = new Packet(0, 0, bytesCurBlock,
+ currentSeqno++, this.checksum.getChecksumSize());
currentPacket.lastPacketInBlock = true;
currentPacket.syncBlock = shouldSyncBlock;
}
@@ -1778,6 +1776,7 @@ public class DFSOutputStream extends FSO
DFSClient.LOG.info("Could not complete file " + src + " retrying...");
}
} catch (InterruptedException ie) {
+ DFSClient.LOG.warn("Caught exception ", ie);
}
}
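
The other recurring change in this commit is that InterruptedException handlers which used to be empty now record the interrupt before continuing. A self-contained sketch of that pattern, using an illustrative queue and a JDK logger rather than the real DFSOutputStream fields and DFSClient.LOG:

    import java.util.LinkedList;
    import java.util.logging.Level;
    import java.util.logging.Logger;

    class InterruptLoggingSketch {
      private static final Logger LOG =
          Logger.getLogger(InterruptLoggingSketch.class.getName());
      private final LinkedList<byte[]> dataQueue = new LinkedList<byte[]>();

      void waitForWork(long timeoutMillis) {
        synchronized (dataQueue) {
          try {
            dataQueue.wait(timeoutMillis);               // woken by notify() when a packet is queued
          } catch (InterruptedException e) {
            // Was an empty catch block before this change; the commit's choice
            // is to log the interrupt and let the loop re-check its condition.
            LOG.log(Level.WARNING, "Caught exception ", e);
          }
        }
      }
    }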
}