Posted to hdfs-commits@hadoop.apache.org by ha...@apache.org on 2009/12/08 19:37:25 UTC
svn commit: r888507 - in /hadoop/hdfs/trunk: ./ src/java/org/apache/hadoop/hdfs/ src/java/org/apache/hadoop/hdfs/protocol/ src/java/org/apache/hadoop/hdfs/server/datanode/ src/test/aop/org/apache/hadoop/hdfs/server/datanode/ src/test/hdfs/org/apache/ha...
Author: hairong
Date: Tue Dec 8 18:37:25 2009
New Revision: 888507
URL: http://svn.apache.org/viewvc?rev=888507&view=rev
Log:
HDFS-793. Data node should receive the whole packet ack message before it constructs and sends its own ack message for the packet. Contributed by Hairong Kuang.
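In short, this changes the ack from a bare seqno followed by one status read per datanode into a single PipelineAck message, laid out on the wire as: seqno (long), numOfReplies (short), reply0, reply1, ... A minimal sketch of a reader for that layout, assuming plain java.io streams, with readShort() standing in for the Status encoding that DataTransferProtocol defines elsewhere:

    import java.io.DataInput;
    import java.io.IOException;

    class AckLayoutSketch {
      // Illustrative only; the real decoder is PipelineAck.readFields in the diff below.
      static void readAck(DataInput in) throws IOException {
        long seqno = in.readLong();          // packet sequence number; -1 marks a heartbeat
        short numOfReplies = in.readShort(); // one reply per datanode in the pipeline
        for (int i = 0; i < numOfReplies; i++) {
          short status = in.readShort();     // stand-in for Status.read(in)
        }
      }
    }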
Modified:
hadoop/hdfs/trunk/CHANGES.txt
hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java
hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java
hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/server/datanode/BlockReceiverAspects.aj
hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
Modified: hadoop/hdfs/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/CHANGES.txt?rev=888507&r1=888506&r2=888507&view=diff
==============================================================================
--- hadoop/hdfs/trunk/CHANGES.txt (original)
+++ hadoop/hdfs/trunk/CHANGES.txt Tue Dec 8 18:37:25 2009
@@ -584,6 +584,9 @@
HDFS-596. Fix memory leak in hdfsFreeFileInfo() for libhdfs.
(Zhang Bingjun via dhruba)
+ HDFS-793. Data node should receive the whole packet ack message before it
+ constructs and sends its own ack message for the packet. (hairong)
+
Release 0.20.1 - 2009-09-01
IMPROVEMENTS
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java?rev=888507&r1=888506&r2=888507&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java Tue Dec 8 18:37:25 2009
@@ -87,6 +87,7 @@
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.BlockConstructionStage;
+import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.PipelineAck;
import org.apache.hadoop.hdfs.security.BlockAccessToken;
import org.apache.hadoop.hdfs.security.InvalidAccessTokenException;
import org.apache.hadoop.hdfs.server.common.HdfsConstants;
@@ -2894,15 +2895,20 @@
public void run() {
this.setName("ResponseProcessor for block " + block);
+ PipelineAck ack = new PipelineAck();
while (!responderClosed && clientRunning && !isLastPacketInBlock) {
// process responses from datanodes.
try {
- // verify seqno from datanode
- long seqno = blockReplyStream.readLong();
- LOG.debug("DFSClient received ack for seqno " + seqno);
+ // read an ack from the pipeline
+ ack.readFields(blockReplyStream);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("DFSClient " + ack);
+ }
+
+ long seqno = ack.getSeqno();
Packet one = null;
- if (seqno == -1) {
+ if (seqno == PipelineAck.HEART_BEAT.getSeqno()) {
continue;
} else if (seqno == -2) {
// do nothing
@@ -2919,20 +2925,9 @@
}
// processes response status from all datanodes.
- String replies = null;
- if (LOG.isDebugEnabled()) {
- replies = "DFSClient Replies for seqno " + seqno + " are";
- }
- for (int i = 0; i < targets.length && clientRunning; i++) {
- final DataTransferProtocol.Status reply
- = DataTransferProtocol.Status.read(blockReplyStream);
- if (LOG.isDebugEnabled()) {
- replies += " " + reply;
- }
+ for (int i = ack.getNumOfReplies()-1; i >=0 && clientRunning; i--) {
+ final DataTransferProtocol.Status reply = ack.getReply(i);
if (reply != SUCCESS) {
- if (LOG.isDebugEnabled()) {
- LOG.debug(replies);
- }
errorIndex = i; // first bad datanode
throw new IOException("Bad response " + reply +
" for block " + block +
@@ -2941,10 +2936,6 @@
}
}
- if (LOG.isDebugEnabled()) {
- LOG.debug(replies);
- }
-
if (one == null) {
throw new IOException("Panic: responder did not receive " +
"an ack for a packet: " + seqno);
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java?rev=888507&r1=888506&r2=888507&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java Tue Dec 8 18:37:25 2009
@@ -26,6 +26,7 @@
import org.apache.hadoop.hdfs.security.BlockAccessToken;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
/**
@@ -39,12 +40,11 @@
* when protocol changes. It is not very obvious.
*/
/*
- * Version 17:
- * Change the block write protocol to support pipeline recovery.
- * Additional fields, like recovery flags, new GS, minBytesRcvd,
- * and maxBytesRcvd are included.
+ * Version 18:
+ * Change the block packet ack protocol to include seqno,
+ * numberOfReplies, reply0, reply1, ...
*/
- public static final int DATA_TRANSFER_VERSION = 17;
+ public static final int DATA_TRANSFER_VERSION = 18;
/** Operation */
public enum Op {
@@ -453,4 +453,94 @@
return t;
}
}
+
+ /** pipeline ack **/
+ public static class PipelineAck implements Writable {
+ private long seqno;
+ private Status replies[];
+ final public static PipelineAck HEART_BEAT = new PipelineAck(-1, new Status[0]);
+
+ /** default constructor **/
+ public PipelineAck() {
+ }
+
+ /**
+ * Constructor
+ * @param seqno sequence number
+ * @param replies an array of replies
+ */
+ public PipelineAck(long seqno, Status[] replies) {
+ this.seqno = seqno;
+ this.replies = replies;
+ }
+
+ /**
+ * Get the sequence number
+ * @return the sequence number
+ */
+ public long getSeqno() {
+ return seqno;
+ }
+
+ /**
+ * Get the number of replies
+ * @return the number of replies
+ */
+ public short getNumOfReplies() {
+ return (short)replies.length;
+ }
+
+ /**
+ * Get the ith reply
+ * @return the ith reply
+ */
+ public Status getReply(int i) {
+ return replies[i];
+ }
+
+ /**
+ * Check if this ack contains an error status
+ * @return true if all statuses are SUCCESS
+ */
+ public boolean isSuccess() {
+ for (Status reply : replies) {
+ if (reply != Status.SUCCESS) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ /**** Writable interface ****/
+ @Override // Writable
+ public void readFields(DataInput in) throws IOException {
+ seqno = in.readLong();
+ short numOfReplies = in.readShort();
+ replies = new Status[numOfReplies];
+ for (int i=0; i<numOfReplies; i++) {
+ replies[i] = Status.read(in);
+ }
+ }
+
+ @Override // Writable
+ public void write(DataOutput out) throws IOException {
+ //WritableUtils.writeVLong(out, seqno);
+ out.writeLong(seqno);
+ out.writeShort((short)replies.length);
+ for(Status reply : replies) {
+ reply.write(out);
+ }
+ }
+
+ @Override //Object
+ public String toString() {
+ StringBuilder ack = new StringBuilder("Replies for seqno ");
+ ack.append( seqno ).append( " are" );
+ for(Status reply : replies) {
+ ack.append(" ");
+ ack.append(reply);
+ }
+ return ack.toString();
+ }
+ }
}
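A round-trip sketch for the PipelineAck class added above, assuming the java.io stream classes and the Status enum from this file are in scope:

    ByteArrayOutputStream buf = new ByteArrayOutputStream();
    new PipelineAck(100, new Status[]{Status.SUCCESS, Status.SUCCESS})
        .write(new DataOutputStream(buf));

    PipelineAck ack = new PipelineAck();
    ack.readFields(new DataInputStream(new ByteArrayInputStream(buf.toByteArray())));
    assert ack.getSeqno() == 100;      // seqno survives the round trip
    assert ack.getNumOfReplies() == 2; // both replies preserved
    assert ack.isSuccess();            // no ERROR among them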
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java?rev=888507&r1=888506&r2=888507&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java Tue Dec 8 18:37:25 2009
@@ -35,11 +35,11 @@
import org.apache.hadoop.fs.FSInputChecker;
import org.apache.hadoop.fs.FSOutputSummer;
import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.FSConstants;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.BlockConstructionStage;
+import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.PipelineAck;
import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.Daemon;
@@ -805,8 +805,13 @@
// send a heartbeat if it is time.
now = System.currentTimeMillis();
if (now - lastHeartbeat > datanode.socketTimeout/2) {
- replyOut.writeLong(-1); // send heartbeat
+ PipelineAck.HEART_BEAT.write(replyOut); // send heart beat
replyOut.flush();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("PacketResponder " + numTargets +
+ " for block " + block +
+ " sent a heartbeat");
+ }
lastHeartbeat = now;
}
}
@@ -843,7 +848,7 @@
lastPacket = true;
}
- ackReply(expected);
+ new PipelineAck(expected, new Status[]{SUCCESS}).write(replyOut);
replyOut.flush();
// remove the packet from the ack queue
removeAckHead();
@@ -870,14 +875,6 @@
" for block " + block + " terminating");
}
- // This method is introduced to facilitate testing. Otherwise
- // there was a little chance to bind an AspectJ advice to such a sequence
- // of calls
- private void ackReply(long expected) throws IOException {
- replyOut.writeLong(expected);
- SUCCESS.write(replyOut);
- }
-
/**
* Thread to process incoming acks.
* @see java.lang.Runnable#run()
@@ -896,24 +893,23 @@
boolean isInterrupted = false;
try {
- DataTransferProtocol.Status op = SUCCESS;
boolean didRead = false;
Packet pkt = null;
long expected = -2;
+ PipelineAck ack = new PipelineAck();
try {
- // read seqno from downstream datanode
- long seqno = mirrorIn.readLong();
+ // read an ack from downstream datanode
+ ack.readFields(mirrorIn);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("PacketResponder " + numTargets + " got " + ack);
+ }
+ long seqno = ack.getSeqno();
didRead = true;
- if (seqno == -1) {
- replyOut.writeLong(-1); // send keepalive
+ if (seqno == PipelineAck.HEART_BEAT.getSeqno()) {
+ ack.write(replyOut);
replyOut.flush();
- LOG.debug("PacketResponder " + numTargets + " got -1");
continue;
- } else if (seqno == -2) {
- LOG.debug("PacketResponder " + numTargets + " got -2");
- } else {
- LOG.debug("PacketResponder " + numTargets + " got seqno = " +
- seqno);
+ } else if (seqno >= 0) {
synchronized (this) {
while (running && datanode.shouldRun && ackQueue.size() == 0) {
if (LOG.isDebugEnabled()) {
@@ -931,7 +927,6 @@
}
pkt = ackQueue.getFirst();
expected = pkt.seqno;
- LOG.debug("PacketResponder " + numTargets + " seqno = " + seqno);
if (seqno != expected) {
throw new IOException("PacketResponder " + numTargets +
" for block " + block +
@@ -964,10 +959,6 @@
continue;
}
- if (!didRead) {
- op = ERROR;
- }
-
// If this is the last packet in block, then close block
// file and finalize the block before responding success
if (lastPacketInBlock) {
@@ -990,54 +981,42 @@
}
}
- // send my status back to upstream datanode
- ackReply(expected);
-
- LOG.debug("PacketResponder " + numTargets +
- " for block " + block +
- " responded my status " +
- " for seqno " + expected);
-
- boolean success = true;
- // forward responses from downstream datanodes.
- for (int i = 0; i < numTargets && datanode.shouldRun; i++) {
- try {
- if (op == SUCCESS) {
- op = Status.read(mirrorIn);
- if (op != SUCCESS) {
- success = false;
- LOG.debug("PacketResponder for block " + block +
- ": error code received from downstream " +
- " datanode[" + i + "] " + op);
- }
- }
- } catch (Throwable e) {
- op = ERROR;
- success = false;
+ // construct my ack message
+ Status[] replies = null;
+ if (!didRead) { // no ack was read
+ replies = new Status[2];
+ replies[0] = SUCCESS;
+ replies[1] = ERROR;
+ } else {
+ replies = new Status[1+ack.getNumOfReplies()];
+ replies[0] = SUCCESS;
+ for (int i=0; i<ack.getNumOfReplies(); i++) {
+ replies[i+1] = ack.getReply(i);
}
- op.write(replyOut);
}
- replyOut.flush();
+ PipelineAck replyAck = new PipelineAck(expected, replies);
- LOG.debug("PacketResponder " + block + " " + numTargets +
- " responded other status " + " for seqno " + expected);
-
+ // send my ack back to upstream datanode
+ replyAck.write(replyOut);
+ replyOut.flush();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("PacketResponder " + numTargets +
+ " for block " + block +
+ " responded an ack: " + replyAck);
+ }
if (pkt != null) {
// remove the packet from the ack queue
removeAckHead();
// update bytes acked
- if (success && pkt.lastByteInBlock>replicaInfo.getBytesAcked()) {
+ if (replyAck.isSuccess() &&
+ pkt.lastByteInBlock>replicaInfo.getBytesAcked()) {
replicaInfo.setBytesAcked(pkt.lastByteInBlock);
}
}
- // If we were unable to read the seqno from downstream, then stop.
- if (expected == -2) {
- running = false;
- }
// If we forwarded an error response from a downstream datanode
// and we are acting on behalf of a client, then we quit. The
// client will drive the recovery mechanism.
- if (op == ERROR && receiver.clientName.length() > 0) {
+ if (!replyAck.isSuccess() && receiver.clientName.length() > 0) {
running = false;
}
} catch (IOException e) {
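To make the new aggregation concrete, here is a hypothetical trace of an ack flowing back through a three-datanode pipeline DN0 -> DN1 -> DN2 when DN2 dies mid-packet, following the construction logic above:

    DN1's ack.readFields(mirrorIn) fails, so didRead stays false:
        DN1 sends upstream: seqno=7, replies = [SUCCESS, ERROR]
    DN0 reads DN1's ack successfully and prepends its own status:
        DN0 sends upstream: seqno=7, replies = [SUCCESS, SUCCESS, ERROR]
    The client's ResponseProcessor finds ERROR at index 2, sets
    errorIndex = 2 (DN2), and throws to start pipeline recovery.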
Modified: hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/server/datanode/BlockReceiverAspects.aj
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/server/datanode/BlockReceiverAspects.aj?rev=888507&r1=888506&r2=888507&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/server/datanode/BlockReceiverAspects.aj (original)
+++ hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/server/datanode/BlockReceiverAspects.aj Tue Dec 8 18:37:25 2009
@@ -18,6 +18,7 @@
package org.apache.hadoop.hdfs.server.datanode;
import java.io.DataInput;
+import java.io.DataOutput;
import java.io.IOException;
import java.io.OutputStream;
@@ -31,6 +32,7 @@
import org.apache.hadoop.hdfs.PipelinesTestUtil.PipelinesTest;
import org.apache.hadoop.hdfs.PipelinesTestUtil.NodeBytes;
import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status;
+import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.PipelineAck;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
@@ -140,7 +142,7 @@
}
pointcut preventAckSending () :
- call (void ackReply(long))
+ call (void PipelineAck.write(DataOutput))
&& within (PacketResponder);
static int ackCounter = 0;
@@ -193,7 +195,7 @@
}
pointcut pipelineAck(BlockReceiver.PacketResponder packetresponder) :
- call (Status Status.read(DataInput))
+ call (void PipelineAck.readFields(DataInput))
&& this(packetresponder);
after(BlockReceiver.PacketResponder packetresponder) throws IOException
Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java?rev=888507&r1=888506&r2=888507&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java Tue Dec 8 18:37:25 2009
@@ -19,6 +19,8 @@
import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Op.READ_BLOCK;
import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Op.WRITE_BLOCK;
+import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.PipelineAck;
+import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status;
import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.ERROR;
import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.SUCCESS;
@@ -157,9 +159,8 @@
//ok finally write a block with 0 len
SUCCESS.write(recvOut);
- Text.writeString(recvOut, ""); // first bad node
- recvOut.writeLong(100); // sequencenumber
- SUCCESS.write(recvOut);
+ Text.writeString(recvOut, "");
+ new PipelineAck(100, new Status[]{SUCCESS}).write(recvOut);
sendRecvData(description, false);
}
@@ -381,9 +382,8 @@
// bad data chunk length
sendOut.writeInt(-1-random.nextInt(oneMil));
SUCCESS.write(recvOut);
- Text.writeString(recvOut, ""); // first bad node
- recvOut.writeLong(100); // sequencenumber
- ERROR.write(recvOut);
+ Text.writeString(recvOut, "");
+ new PipelineAck(100, new Status[]{ERROR}).write(recvOut);
sendRecvData("negative DATA_CHUNK len while writing block " + newBlockId,
true);
@@ -406,9 +406,8 @@
sendOut.flush();
//ok finally write a block with 0 len
SUCCESS.write(recvOut);
- Text.writeString(recvOut, ""); // first bad node
- recvOut.writeLong(100); // sequencenumber
- SUCCESS.write(recvOut);
+ Text.writeString(recvOut, "");
+ new PipelineAck(100, new Status[]{SUCCESS}).write(recvOut);
sendRecvData("Writing a zero len block blockid " + newBlockId, false);
/* Test OP_READ_BLOCK */