You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by ha...@apache.org on 2009/12/15 07:27:20 UTC
svn commit: r890664 - in /hadoop/hdfs/branches/branch-0.21: ./
.eclipse.templates/.launches/ src/contrib/ src/contrib/hdfsproxy/ src/java/
src/java/org/apache/hadoop/hdfs/ src/java/org/apache/hadoop/hdfs/protocol/
src/java/org/apache/hadoop/hdfs/server...
Author: hairong
Date: Tue Dec 15 06:27:19 2009
New Revision: 890664
URL: http://svn.apache.org/viewvc?rev=890664&view=rev
Log:
Merge -r 890655 to move the change of HDFS-724 from trunk to branch 0.21.
Added:
hadoop/hdfs/branches/branch-0.21/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiPipelineClose.java
- copied unchanged from r890655, hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiPipelineClose.java
Modified:
hadoop/hdfs/branches/branch-0.21/ (props changed)
hadoop/hdfs/branches/branch-0.21/.eclipse.templates/.launches/ (props changed)
hadoop/hdfs/branches/branch-0.21/CHANGES.txt (contents, props changed)
hadoop/hdfs/branches/branch-0.21/build.xml (props changed)
hadoop/hdfs/branches/branch-0.21/src/contrib/build.xml (props changed)
hadoop/hdfs/branches/branch-0.21/src/contrib/hdfsproxy/ (props changed)
hadoop/hdfs/branches/branch-0.21/src/java/ (props changed)
hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/DFSClient.java
hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java
hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/protocol/RecoveryInProgressException.java (props changed)
hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java (props changed)
hadoop/hdfs/branches/branch-0.21/src/test/aop/org/apache/hadoop/hdfs/protocol/ (props changed)
hadoop/hdfs/branches/branch-0.21/src/test/aop/org/apache/hadoop/hdfs/server/datanode/BlockReceiverAspects.aj
hadoop/hdfs/branches/branch-0.21/src/test/hdfs/ (props changed)
hadoop/hdfs/branches/branch-0.21/src/test/hdfs/org/apache/hadoop/hdfs/TestHFlush.java
hadoop/hdfs/branches/branch-0.21/src/webapps/datanode/ (props changed)
hadoop/hdfs/branches/branch-0.21/src/webapps/hdfs/ (props changed)
hadoop/hdfs/branches/branch-0.21/src/webapps/secondary/ (props changed)
Propchange: hadoop/hdfs/branches/branch-0.21/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Dec 15 06:27:19 2009
@@ -1,3 +1,3 @@
/hadoop/core/branches/branch-0.19/hdfs:713112
/hadoop/hdfs/branches/HDFS-265:796829-820463
-/hadoop/hdfs/trunk:817853-817863,818294-818298,824552,824944,826149,828116,828926,829880,829894,830003,831436,831455-831490,832043,833499,835728,880971,881014,881017,884432,888084,888507,888519,889002
+/hadoop/hdfs/trunk:817853-817863,818294-818298,824552,824944,826149,828116,828926,829880,829894,830003,831436,831455-831490,832043,833499,835728,880971,881014,881017,884432,888084,888507,888519,889002,890655
Propchange: hadoop/hdfs/branches/branch-0.21/.eclipse.templates/.launches/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Dec 15 06:27:19 2009
@@ -1 +1 @@
-/hadoop/hdfs/trunk/.eclipse.templates/.launches:817853-817863,824552,824944,826149,828116,828926,829880,829894,830003,831436,831455-831490,832043,833499,835728,880971,881014,881017,884432,888084,888507,888519,889002
+/hadoop/hdfs/trunk/.eclipse.templates/.launches:817853-817863,824552,824944,826149,828116,828926,829880,829894,830003,831436,831455-831490,832043,833499,835728,880971,881014,881017,884432,888084,888507,888519,889002,890655
Modified: hadoop/hdfs/branches/branch-0.21/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/branch-0.21/CHANGES.txt?rev=890664&r1=890663&r2=890664&view=diff
==============================================================================
--- hadoop/hdfs/branches/branch-0.21/CHANGES.txt (original)
+++ hadoop/hdfs/branches/branch-0.21/CHANGES.txt Tue Dec 15 06:27:19 2009
@@ -525,6 +525,9 @@
HDFS-185. Disallow chown, chgrp, chmod, setQuota, and setSpaceQuota when
name-node is in safemode. (Ravi Phulari via shv)
+ HDFS-724. Pipeline hangs if one of the block receiver is not responsive.
+ (hairong)
+
Release 0.20.1 - 2009-09-01
IMPROVEMENTS
Propchange: hadoop/hdfs/branches/branch-0.21/CHANGES.txt
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Dec 15 06:27:19 2009
@@ -1,3 +1,3 @@
/hadoop/core/branches/branch-0.19/hdfs/CHANGES.txt:713112
/hadoop/hdfs/branches/HDFS-265/CHANGES.txt:796829-820463
-/hadoop/hdfs/trunk/CHANGES.txt:817853-817863,818294-818298,818801,824552,824944,826149,828116,828926,829880,829894,830003,831436,831455-831490,832043,833499,835728,880971,881014,881017,884432,888084,888507,888519,889002
+/hadoop/hdfs/trunk/CHANGES.txt:817853-817863,818294-818298,818801,824552,824944,826149,828116,828926,829880,829894,830003,831436,831455-831490,832043,833499,835728,880971,881014,881017,884432,888084,888507,888519,889002,890655
Propchange: hadoop/hdfs/branches/branch-0.21/build.xml
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Dec 15 06:27:19 2009
@@ -1,4 +1,4 @@
/hadoop/core/branches/branch-0.19/hdfs/build.xml:713112
/hadoop/core/trunk/build.xml:779102
/hadoop/hdfs/branches/HDFS-265/build.xml:796829-820463
-/hadoop/hdfs/trunk/build.xml:817853-817863,818294-818298,818801,824552,824944,825229,826149,828116,828926,829258,829880,829894,830003,831436,831455-831490,832043,833499,835728,880971,881014,881017,884432,888084,888507,888519,889002
+/hadoop/hdfs/trunk/build.xml:817853-817863,818294-818298,818801,824552,824944,825229,826149,828116,828926,829258,829880,829894,830003,831436,831455-831490,832043,833499,835728,880971,881014,881017,884432,888084,888507,888519,889002,890655
Propchange: hadoop/hdfs/branches/branch-0.21/src/contrib/build.xml
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Dec 15 06:27:19 2009
@@ -1,3 +1,3 @@
/hadoop/core/branches/branch-0.19/hdfs/src/contrib/build.xml:713112
/hadoop/hdfs/branches/HDFS-265/src/contrib/build.xml:796829-820463
-/hadoop/hdfs/trunk/src/contrib/build.xml:817853-817863,818294-818298,818801,824552,824944,826149,828116,828926,829880,829894,830003,831436,831455-831490,832043,833499,835728,880971,881014,881017,884432,888084,888507,888519,889002
+/hadoop/hdfs/trunk/src/contrib/build.xml:817853-817863,818294-818298,818801,824552,824944,826149,828116,828926,829880,829894,830003,831436,831455-831490,832043,833499,835728,880971,881014,881017,884432,888084,888507,888519,889002,890655
Propchange: hadoop/hdfs/branches/branch-0.21/src/contrib/hdfsproxy/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Dec 15 06:27:19 2009
@@ -1,4 +1,4 @@
/hadoop/core/branches/branch-0.19/hdfs/src/contrib/hdfsproxy:713112
/hadoop/core/trunk/src/contrib/hdfsproxy:776175-784663
/hadoop/hdfs/branches/HDFS-265/src/contrib/hdfsproxy:796829-820463
-/hadoop/hdfs/trunk/src/contrib/hdfsproxy:817853-817863,818294-818298,824552,824944,826149,828116,828926,829880,829894,830003,831436,831455-831490,832043,833499,835728,880971,881014,881017,884432,888084,888507,888519,889002
+/hadoop/hdfs/trunk/src/contrib/hdfsproxy:817853-817863,818294-818298,824552,824944,826149,828116,828926,829880,829894,830003,831436,831455-831490,832043,833499,835728,880971,881014,881017,884432,888084,888507,888519,889002,890655
Propchange: hadoop/hdfs/branches/branch-0.21/src/java/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Dec 15 06:27:19 2009
@@ -1,4 +1,4 @@
/hadoop/core/branches/branch-0.19/hdfs/src/java:713112
/hadoop/core/trunk/src/hdfs:776175-785643,785929-786278
/hadoop/hdfs/branches/HDFS-265/src/java:796829-820463
-/hadoop/hdfs/trunk/src/java:817853-817863,818294-818298,824552,824944,826149,828116,828926,829880,829894,830003,831436,831455-831490,832043,833499,835728,880971,881014,881017,884432,888084,888507,888519,889002
+/hadoop/hdfs/trunk/src/java:817853-817863,818294-818298,824552,824944,826149,828116,828926,829880,829894,830003,831436,831455-831490,832043,833499,835728,880971,881014,881017,884432,888084,888507,888519,889002,890655
Modified: hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/DFSClient.java?rev=890664&r1=890663&r2=890664&view=diff
==============================================================================
--- hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/DFSClient.java Tue Dec 15 06:27:19 2009
@@ -2434,7 +2434,27 @@
int dataPos;
int checksumStart;
int checksumPos;
-
+ private static final long HEART_BEAT_SEQNO = -1L;
+
+ /**
+ * Create a heartbeat packet.
+ */
+ Packet() {
+ this.lastPacketInBlock = false;
+ this.numChunks = 0;
+ this.offsetInBlock = 0;
+ this.seqno = HEART_BEAT_SEQNO;
+
+ buffer = null;
+ int packetSize = DataNode.PKT_HEADER_LEN + SIZE_OF_INTEGER;
+ buf = new byte[packetSize];
+
+ checksumStart = dataStart = packetSize;
+ checksumPos = checksumStart;
+ dataPos = dataStart;
+ maxChunks = 0;
+ }
+
// create a new packet
Packet(int pktSize, int chunksPerPkt, long offsetInBlock) {
this.lastPacketInBlock = false;
@@ -2521,6 +2541,14 @@
return offsetInBlock + dataPos - dataStart;
}
+ /**
+ * Check if this packet is a heartbeat packet.
+ * @return true if the sequence number is HEART_BEAT_SEQNO
+ */
+ private boolean isHeartbeatPacket() {
+ return seqno == HEART_BEAT_SEQNO;
+ }
+
public String toString() {
return "packet seqno:" + this.seqno +
" offsetInBlock:" + this.offsetInBlock +
@@ -2638,6 +2666,7 @@
* and closes them. Any error recovery is also done by this thread.
*/
public void run() {
+ long lastPacket = System.currentTimeMillis();
while (!streamerClosed && clientRunning) {
// if the Responder encountered an error, shutdown Responder
@@ -2661,19 +2690,32 @@
synchronized (dataQueue) {
// wait for a packet to be sent.
+ long now = System.currentTimeMillis();
while ((!streamerClosed && !hasError && clientRunning
- && dataQueue.size() == 0) || doSleep) {
+ && dataQueue.size() == 0 &&
+ (stage != BlockConstructionStage.DATA_STREAMING ||
+ stage == BlockConstructionStage.DATA_STREAMING &&
+ now - lastPacket < socketTimeout/2)) || doSleep ) {
+ long timeout = socketTimeout/2 - (now-lastPacket);
+ timeout = timeout <= 0 ? 1000 : timeout;
+ timeout = (stage == BlockConstructionStage.DATA_STREAMING)?
+ timeout : 1000;
try {
- dataQueue.wait(1000);
+ dataQueue.wait(timeout);
} catch (InterruptedException e) {
}
doSleep = false;
+ now = System.currentTimeMillis();
}
- if (streamerClosed || hasError || dataQueue.size() == 0 || !clientRunning) {
+ if (streamerClosed || hasError || !clientRunning) {
continue;
}
// get packet to be sent.
- one = dataQueue.getFirst();
+ if (dataQueue.isEmpty()) {
+ one = new Packet(); // heartbeat packet
+ } else {
+ one = dataQueue.getFirst(); // regular data packet
+ }
}
// get new block from namenode.
@@ -2719,9 +2761,11 @@
synchronized (dataQueue) {
// move packet from dataQueue to ackQueue
- dataQueue.removeFirst();
- ackQueue.addLast(one);
- dataQueue.notifyAll();
+ if (!one.isHeartbeatPacket()) {
+ dataQueue.removeFirst();
+ ackQueue.addLast(one);
+ dataQueue.notifyAll();
+ }
}
if (LOG.isDebugEnabled()) {
@@ -2732,6 +2776,10 @@
// write out data to remote datanode
blockStream.write(buf.array(), buf.position(), buf.remaining());
blockStream.flush();
+ lastPacket = System.currentTimeMillis();
+
+ if (one.isHeartbeatPacket()) { //heartbeat packet
+ }
// update bytesSent
long tmpBytesSent = one.getLastByteOffsetBlock();
@@ -2868,24 +2916,7 @@
}
long seqno = ack.getSeqno();
- Packet one = null;
- if (seqno == PipelineAck.HEART_BEAT.getSeqno()) {
- continue;
- } else if (seqno == -2) {
- // no nothing
- } else {
- synchronized (dataQueue) {
- one = ackQueue.getFirst();
- }
- if (one.seqno != seqno) {
- throw new IOException("Responseprocessor: Expecting seqno " +
- " for block " + block +
- one.seqno + " but received " + seqno);
- }
- isLastPacketInBlock = one.lastPacketInBlock;
- }
-
- // processes response status from all datanodes.
+ // processes response status from datanodes.
for (int i = ack.getNumOfReplies()-1; i >=0 && clientRunning; i--) {
final DataTransferProtocol.Status reply = ack.getReply(i);
if (reply != SUCCESS) {
@@ -2896,12 +2927,24 @@
targets[i].getName());
}
}
+
+ assert seqno != PipelineAck.UNKOWN_SEQNO :
+ "Ack for unkown seqno should be a failed ack: " + ack;
+ if (seqno == Packet.HEART_BEAT_SEQNO) { // a heartbeat ack
+ continue;
+ }
- if (one == null) {
- throw new IOException("Panic: responder did not receive " +
- "an ack for a packet: " + seqno);
+ // a success ack for a data packet
+ Packet one = null;
+ synchronized (dataQueue) {
+ one = ackQueue.getFirst();
}
-
+ if (one.seqno != seqno) {
+ throw new IOException("Responseprocessor: Expecting seqno " +
+ " for block " + block +
+ one.seqno + " but received " + seqno);
+ }
+ isLastPacketInBlock = one.lastPacketInBlock;
// update bytesAcked
block.setNumBytes(one.getLastByteOffsetBlock());
Modified: hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java?rev=890664&r1=890663&r2=890664&view=diff
==============================================================================
--- hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java (original)
+++ hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java Tue Dec 15 06:27:19 2009
@@ -458,7 +458,7 @@
public static class PipelineAck implements Writable {
private long seqno;
private Status replies[];
- final public static PipelineAck HEART_BEAT = new PipelineAck(-1, new Status[0]);
+ public final static long UNKOWN_SEQNO = -2;
/** default constructor **/
public PipelineAck() {
@@ -495,6 +495,10 @@
* @return the the ith reply
*/
public Status getReply(int i) {
+ if (i<0 || i>=replies.length) {
+ throw new IllegalArgumentException("The input parameter " + i +
+ " should in the range of [0, " + replies.length);
+ }
return replies[i];
}
Propchange: hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/protocol/RecoveryInProgressException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Dec 15 06:27:19 2009
@@ -1,5 +1,5 @@
/hadoop/core/branches/branch-0.19/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/RecoveryInProgressException.java:713112
/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/RecoveryInProgressException.java:776175-785643,785929-786278
/hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/datanode/RecoveryInProgressException.java:817353-818319,818321-818553
-/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/RecoveryInProgressException.java:817853-817863,824552,824944,826149,828116,828926,829880,829894,830003,831436,831455-831490,832043,833499,835728,880971,881014,881017,884432,888084,888507,888519,889002
+/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/RecoveryInProgressException.java:817853-817863,824552,824944,826149,828116,828926,829880,829894,830003,831436,831455-831490,832043,833499,835728,880971,881014,881017,884432,888084,888507,888519,889002,890655
/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/RecoveryInProgressException.java:796829-800617,800619-803337,804756-805652,808672-809439,811495-813103,813105-813630,814223-815964,818294-818298
Modified: hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java?rev=890664&r1=890663&r2=890664&view=diff
==============================================================================
--- hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java (original)
+++ hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java Tue Dec 15 06:27:19 2009
@@ -560,7 +560,7 @@
throttler.throttle(len);
}
- return len;
+ return lastPacketInBlock?-1:len;
}
void writeChecksumHeader(DataOutputStream mirrorOut) throws IOException {
@@ -589,9 +589,9 @@
}
/*
- * Receive until packet has zero bytes of data.
+ * Receive until the last packet.
*/
- while (receivePacket() > 0) {}
+ while (receivePacket() >= 0) {}
// wait for all outstanding packet responses. And then
// indicate responder to gracefully shutdown.
@@ -775,118 +775,11 @@
notifyAll();
}
- private synchronized void lastDataNodeRun() {
- long lastHeartbeat = System.currentTimeMillis();
- boolean lastPacket = false;
- final long startTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
-
- while (running && datanode.shouldRun && !lastPacket) {
- long now = System.currentTimeMillis();
- try {
-
- // wait for a packet to be sent to downstream datanode
- while (running && datanode.shouldRun && ackQueue.size() == 0) {
- long idle = now - lastHeartbeat;
- long timeout = (datanode.socketTimeout/2) - idle;
- if (timeout <= 0) {
- timeout = 1000;
- }
- try {
- wait(timeout);
- } catch (InterruptedException e) {
- if (running) {
- LOG.info("PacketResponder " + numTargets +
- " for block " + block + " Interrupted.");
- running = false;
- }
- break;
- }
-
- // send a heartbeat if it is time.
- now = System.currentTimeMillis();
- if (now - lastHeartbeat > datanode.socketTimeout/2) {
- PipelineAck.HEART_BEAT.write(replyOut); // send heart beat
- replyOut.flush();
- if (LOG.isDebugEnabled()) {
- LOG.debug("PacketResponder " + numTargets +
- " for block " + block +
- " sent a heartbeat");
- }
- lastHeartbeat = now;
- }
- }
-
- if (!running || !datanode.shouldRun) {
- break;
- }
- Packet pkt = ackQueue.getFirst();
- long expected = pkt.seqno;
- LOG.debug("PacketResponder " + numTargets +
- " for block " + block +
- " acking for packet " + expected);
-
- // If this is the last packet in block, then close block
- // file and finalize the block before responding success
- if (pkt.lastPacketInBlock) {
- receiver.close();
- final long endTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
- block.setNumBytes(replicaInfo.getNumBytes());
- datanode.data.finalizeBlock(block);
- datanode.closeBlock(block, DataNode.EMPTY_DEL_HINT);
- if (ClientTraceLog.isInfoEnabled() &&
- receiver.clientName.length() > 0) {
- long offset = 0;
- ClientTraceLog.info(String.format(DN_CLIENTTRACE_FORMAT,
- receiver.inAddr, receiver.myAddr, block.getNumBytes(),
- "HDFS_WRITE", receiver.clientName, offset,
- datanode.dnRegistration.getStorageID(), block, endTime-startTime));
- } else {
- LOG.info("Received block " + block +
- " of size " + block.getNumBytes() +
- " from " + receiver.inAddr);
- }
- lastPacket = true;
- }
-
- new PipelineAck(expected, new Status[]{SUCCESS}).write(replyOut);
- replyOut.flush();
- // remove the packet from the ack queue
- removeAckHead();
- // update the bytes acked
- if (pkt.lastByteInBlock>replicaInfo.getBytesAcked()) {
- replicaInfo.setBytesAcked(pkt.lastByteInBlock);
- }
- } catch (Exception e) {
- LOG.warn("IOException in BlockReceiver.lastNodeRun: ", e);
- if (running) {
- try {
- datanode.checkDiskError(e); // may throw an exception here
- } catch (IOException ioe) {
- LOG.warn("DataNode.chekDiskError failed in lastDataNodeRun with: ",
- ioe);
- }
- LOG.info("PacketResponder " + block + " " + numTargets +
- " Exception " + StringUtils.stringifyException(e));
- running = false;
- }
- }
- }
- LOG.info("PacketResponder " + numTargets +
- " for block " + block + " terminating");
- }
-
/**
* Thread to process incoming acks.
* @see java.lang.Runnable#run()
*/
public void run() {
-
- // If this is the last datanode in pipeline, then handle differently
- if (numTargets == 0) {
- lastDataNodeRun();
- return;
- }
-
boolean lastPacketInBlock = false;
final long startTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
while (running && datanode.shouldRun && !lastPacketInBlock) {
@@ -897,19 +790,18 @@
Packet pkt = null;
long expected = -2;
PipelineAck ack = new PipelineAck();
- try {
- // read an ack from downstream datanode
- ack.readFields(mirrorIn);
- if (LOG.isDebugEnabled()) {
- LOG.debug("PacketResponder " + numTargets + " got " + ack);
+ long seqno = PipelineAck.UNKOWN_SEQNO;
+ try {
+ if (numTargets != 0) {// not the last DN
+ // read an ack from downstream datanode
+ ack.readFields(mirrorIn);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("PacketResponder " + numTargets + " got " + ack);
+ }
+ seqno = ack.getSeqno();
+ didRead = true;
}
- long seqno = ack.getSeqno();
- didRead = true;
- if (seqno == PipelineAck.HEART_BEAT.getSeqno()) {
- ack.write(replyOut);
- replyOut.flush();
- continue;
- } else if (seqno >= 0) {
+ if (seqno != PipelineAck.UNKOWN_SEQNO || numTargets == 0) {
synchronized (this) {
while (running && datanode.shouldRun && ackQueue.size() == 0) {
if (LOG.isDebugEnabled()) {
@@ -925,9 +817,12 @@
throw e;
}
}
+ if (!running || !datanode.shouldRun) {
+ break;
+ }
pkt = ackQueue.getFirst();
expected = pkt.seqno;
- if (seqno != expected) {
+ if (numTargets > 0 && seqno != expected) {
throw new IOException("PacketResponder " + numTargets +
" for block " + block +
" expected seqno:" + expected +
@@ -983,14 +878,15 @@
// construct my ack message
Status[] replies = null;
- if (!didRead) { // no ack is read
+ if (!didRead && numTargets != 0) { // ack read error
replies = new Status[2];
replies[0] = SUCCESS;
replies[1] = ERROR;
} else {
- replies = new Status[1+ack.getNumOfReplies()];
+ short ackLen = numTargets == 0 ? 0 : ack.getNumOfReplies();
+ replies = new Status[1+ackLen];
replies[0] = SUCCESS;
- for (int i=0; i<ack.getNumOfReplies(); i++) {
+ for (int i=0; i<ackLen; i++) {
replies[i+1] = ack.getReply(i);
}
}
Propchange: hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Dec 15 06:27:19 2009
@@ -3,4 +3,4 @@
/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DatanodeBlockInfo.java:776175-785643,785929-786278
/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java:776175-785643,785929-786278
/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java:796829-820463
-/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java:817853-817863,818294-818298,824552,824944,826149,828116,828926,829880,829894,830003,831436,831455-831490,832043,833499,835728,880971,881014,881017,884432,888084,888507,888519,889002
+/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java:817853-817863,818294-818298,824552,824944,826149,828116,828926,829880,829894,830003,831436,831455-831490,832043,833499,835728,880971,881014,881017,884432,888084,888507,888519,889002,890655
Propchange: hadoop/hdfs/branches/branch-0.21/src/test/aop/org/apache/hadoop/hdfs/protocol/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Dec 15 06:27:19 2009
@@ -1 +1 @@
-/hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/protocol:817853-817863,824552,824944,826149,828116,828926,829880,829894,830003,831436,831455-831490,832043,833499,835728,880971,881014,881017,884432,888084,888507,888519,889002
+/hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/protocol:817853-817863,824552,824944,826149,828116,828926,829880,829894,830003,831436,831455-831490,832043,833499,835728,880971,881014,881017,884432,888084,888507,888519,889002,890655
Modified: hadoop/hdfs/branches/branch-0.21/src/test/aop/org/apache/hadoop/hdfs/server/datanode/BlockReceiverAspects.aj
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/branch-0.21/src/test/aop/org/apache/hadoop/hdfs/server/datanode/BlockReceiverAspects.aj?rev=890664&r1=890663&r2=890664&view=diff
==============================================================================
--- hadoop/hdfs/branches/branch-0.21/src/test/aop/org/apache/hadoop/hdfs/server/datanode/BlockReceiverAspects.aj (original)
+++ hadoop/hdfs/branches/branch-0.21/src/test/aop/org/apache/hadoop/hdfs/server/datanode/BlockReceiverAspects.aj Tue Dec 15 06:27:19 2009
@@ -99,12 +99,6 @@
&& args(acked)
&& this(pr);
- pointcut callSetBytesAckedLastDN(PacketResponder pr, long acked) :
- call (void ReplicaInPipelineInterface.setBytesAcked(long))
- && withincode (void PacketResponder.lastDataNodeRun())
- && args(acked)
- && this(pr);
-
after (PacketResponder pr, long acked) : callSetBytesAcked (pr, acked) {
PipelineTest pTest = DataTransferTestUtil.getDataTransferTest();
if (pTest == null) {
@@ -117,19 +111,7 @@
bytesAckedService((PipelinesTest)pTest, pr, acked);
}
}
- after (PacketResponder pr, long acked) : callSetBytesAckedLastDN (pr, acked) {
- PipelineTest pTest = DataTransferTestUtil.getDataTransferTest();
- if (pTest == null) {
- LOG.debug("FI: no pipeline has been found in acking");
- return;
- }
- LOG.debug("FI: Acked total bytes from (last DN): " +
- pr.receiver.datanode.dnRegistration.getStorageID() + ": " + acked);
- if (pTest instanceof PipelinesTest) {
- bytesAckedService((PipelinesTest)pTest, pr, acked);
- }
- }
-
+
private void bytesAckedService
(final PipelinesTest pTest, final PacketResponder pr, final long acked) {
NodeBytes nb = new NodeBytes(pr.receiver.datanode.dnRegistration, acked);
Propchange: hadoop/hdfs/branches/branch-0.21/src/test/hdfs/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Dec 15 06:27:19 2009
@@ -1,4 +1,4 @@
/hadoop/core/branches/branch-0.19/hdfs/src/test/hdfs:713112
/hadoop/core/trunk/src/test/hdfs:776175-785643
/hadoop/hdfs/branches/HDFS-265/src/test/hdfs:796829-820463
-/hadoop/hdfs/trunk/src/test/hdfs:817853-817863,818294-818298,824552,824944,826149,828116,828926,829880,829894,830003,831436,831455-831490,832043,833499,835728,880971,881014,881017,884432,888084,888507,888519,889002
+/hadoop/hdfs/trunk/src/test/hdfs:817853-817863,818294-818298,824552,824944,826149,828116,828926,829880,829894,830003,831436,831455-831490,832043,833499,835728,880971,881014,881017,884432,888084,888507,888519,889002,890655
Modified: hadoop/hdfs/branches/branch-0.21/src/test/hdfs/org/apache/hadoop/hdfs/TestHFlush.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/branch-0.21/src/test/hdfs/org/apache/hadoop/hdfs/TestHFlush.java?rev=890664&r1=890663&r2=890664&view=diff
==============================================================================
--- hadoop/hdfs/branches/branch-0.21/src/test/hdfs/org/apache/hadoop/hdfs/TestHFlush.java (original)
+++ hadoop/hdfs/branches/branch-0.21/src/test/hdfs/org/apache/hadoop/hdfs/TestHFlush.java Tue Dec 15 06:27:19 2009
@@ -17,10 +17,14 @@
*/
package org.apache.hadoop.hdfs;
+import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.log4j.Level;
+
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import org.junit.Test;
@@ -30,6 +34,11 @@
/** Class contains a set of tests to verify the correctness of
* newly introduced {@link FSDataOutputStream#hflush()} method */
public class TestHFlush {
+ {
+ ((Log4JLogger)DataNode.LOG).getLogger().setLevel(Level.ALL);
+ ((Log4JLogger)DFSClient.LOG).getLogger().setLevel(Level.ALL);
+ }
+
private final String fName = "hflushtest.dat";
/** The test uses {@link #doTheJob(Configuration, String, long, short)
@@ -143,4 +152,55 @@
actual[idx] = 0;
}
}
+
+ /** This creates a slow writer and checks to see
+ * if pipeline heartbeats work fine.
+ */
+ @Test
+ public void testPipelineHeartbeat() throws Exception {
+ final int DATANODE_NUM = 2;
+ final int fileLen = 6;
+ Configuration conf = new HdfsConfiguration();
+ final int timeout = 2000;
+ conf.setInt(DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY,
+ timeout);
+
+ final Path p = new Path("/pipelineHeartbeat/foo");
+ System.out.println("p=" + p);
+
+ MiniDFSCluster cluster = new MiniDFSCluster(conf, DATANODE_NUM, true, null);
+ DistributedFileSystem fs = (DistributedFileSystem)cluster.getFileSystem();
+
+ byte[] fileContents = AppendTestUtil.initBuffer(fileLen);
+
+ // create a new file.
+ FSDataOutputStream stm = AppendTestUtil.createFile(fs, p, DATANODE_NUM);
+
+ stm.write(fileContents, 0, 1);
+ Thread.sleep(timeout);
+ stm.hflush();
+ System.out.println("Wrote 1 byte and hflush " + p);
+
+ // write another byte
+ Thread.sleep(timeout);
+ stm.write(fileContents, 1, 1);
+ stm.hflush();
+
+ stm.write(fileContents, 2, 1);
+ Thread.sleep(timeout);
+ stm.hflush();
+
+ stm.write(fileContents, 3, 1);
+ Thread.sleep(timeout);
+ stm.write(fileContents, 4, 1);
+ stm.hflush();
+
+ stm.write(fileContents, 5, 1);
+ Thread.sleep(timeout);
+ stm.close();
+
+ // verify that entire file is good
+ AppendTestUtil.checkFullFile(fs, p, fileLen,
+ fileContents, "Failed to slowly write to a file");
+ }
}
Propchange: hadoop/hdfs/branches/branch-0.21/src/webapps/datanode/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Dec 15 06:27:19 2009
@@ -1,4 +1,4 @@
/hadoop/core/branches/branch-0.19/hdfs/src/webapps/datanode:713112
/hadoop/core/trunk/src/webapps/datanode:776175-784663
/hadoop/hdfs/branches/HDFS-265/src/webapps/datanode:796829-820463
-/hadoop/hdfs/trunk/src/webapps/datanode:817853-817863,818294-818298,824552,824944,826149,828116,828926,829880,829894,830003,831436,831455-831490,832043,833499,835728,880971,881014,881017,884432,888084,888507,888519,889002
+/hadoop/hdfs/trunk/src/webapps/datanode:817853-817863,818294-818298,824552,824944,826149,828116,828926,829880,829894,830003,831436,831455-831490,832043,833499,835728,880971,881014,881017,884432,888084,888507,888519,889002,890655
Propchange: hadoop/hdfs/branches/branch-0.21/src/webapps/hdfs/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Dec 15 06:27:19 2009
@@ -1,4 +1,4 @@
/hadoop/core/branches/branch-0.19/hdfs/src/webapps/hdfs:713112
/hadoop/core/trunk/src/webapps/hdfs:776175-784663
/hadoop/hdfs/branches/HDFS-265/src/webapps/hdfs:796829-820463
-/hadoop/hdfs/trunk/src/webapps/hdfs:817853-817863,818294-818298,824552,824944,826149,828116,828926,829880,829894,830003,831436,831455-831490,832043,833499,835728,880971,881014,881017,884432,888084,888507,888519,889002
+/hadoop/hdfs/trunk/src/webapps/hdfs:817853-817863,818294-818298,824552,824944,826149,828116,828926,829880,829894,830003,831436,831455-831490,832043,833499,835728,880971,881014,881017,884432,888084,888507,888519,889002,890655
Propchange: hadoop/hdfs/branches/branch-0.21/src/webapps/secondary/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Dec 15 06:27:19 2009
@@ -1,4 +1,4 @@
/hadoop/core/branches/branch-0.19/hdfs/src/webapps/secondary:713112
/hadoop/core/trunk/src/webapps/secondary:776175-784663
/hadoop/hdfs/branches/HDFS-265/src/webapps/secondary:796829-820463
-/hadoop/hdfs/trunk/src/webapps/secondary:817853-817863,818294-818298,824552,824944,826149,828116,828926,829880,829894,830003,831436,831455-831490,832043,833499,835728,880971,881014,881017,884432,888084,888507,888519,889002
+/hadoop/hdfs/trunk/src/webapps/secondary:817853-817863,818294-818298,824552,824944,826149,828116,828926,829880,829894,830003,831436,831455-831490,832043,833499,835728,880971,881014,881017,884432,888084,888507,888519,889002,890655