You are viewing a plain text version of this content. (The link to the canonical version was lost in the plain-text conversion.)
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 04:58:47 UTC
svn commit: r1077274 - in
/hadoop/common/branches/branch-0.20-security-patches: ./
src/hdfs/org/apache/hadoop/hdfs/ src/hdfs/org/apache/hadoop/hdfs/protocol/
src/hdfs/org/apache/hadoop/hdfs/server/datanode/
src/test/org/apache/hadoop/hdfs/
Author: omalley
Date: Fri Mar 4 03:58:47 2011
New Revision: 1077274
URL: http://svn.apache.org/viewvc?rev=1077274&view=rev
Log:
commit 37703856e56e0acdbb846a335b31eafa58d5f2bd
Author: Hairong Kuang <ha...@ucdev21.inktomisearch.com>
Date: Wed Mar 3 23:41:07 2010 +0000
HDFS-793 from http://issues.apache.org/jira/secure/attachment/12437808/separateSendRcvAck-0.20-yahoo.patch
Modified:
hadoop/common/branches/branch-0.20-security-patches/CHANGES.txt
hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java
hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java
hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
Modified: hadoop/common/branches/branch-0.20-security-patches/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/CHANGES.txt?rev=1077274&r1=1077273&r2=1077274&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.20-security-patches/CHANGES.txt Fri Mar 4 03:58:47 2011
@@ -13,6 +13,10 @@ Release 0.20.2 - Unreleased
HDFS-927. DFSInputStream retries too many times for new block locations
(Todd Lipcon via Stack)
+ HDFS-793. DataNode should first receive the whole packet ack message
+ before it constructs and sends its own ack message for the packet.
+ (hairong)
+
Release 0.20.1 - 2009-09-01
INCOMPATIBLE CHANGES
Modified: hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java?rev=1077274&r1=1077273&r2=1077274&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java Fri Mar 4 03:58:47 2011
@@ -29,6 +29,7 @@ import org.apache.hadoop.net.NodeBase;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.hdfs.DistributedFileSystem.DiskStatus;
import org.apache.hadoop.hdfs.protocol.*;
+import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.PipelineAck;
import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
@@ -2535,14 +2536,18 @@ public class DFSClient implements FSCons
public void run() {
this.setName("ResponseProcessor for block " + block);
+ PipelineAck ack = new PipelineAck();
while (!closed && clientRunning && !lastPacketInBlock) {
// process responses from datanodes.
try {
- // verify seqno from datanode
- long seqno = blockReplyStream.readLong();
- LOG.debug("DFSClient received ack for seqno " + seqno);
- if (seqno == -1) {
+ // read an ack from the pipeline
+ ack.readFields(blockReplyStream);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("DFSClient " + ack);
+ }
+ long seqno = ack.getSeqno();
+ if (seqno == PipelineAck.HEART_BEAT.getSeqno()) {
continue;
} else if (seqno == -2) {
// no nothing
@@ -2560,8 +2565,8 @@ public class DFSClient implements FSCons
}
// processes response status from all datanodes.
- for (int i = 0; i < targets.length && clientRunning; i++) {
- short reply = blockReplyStream.readShort();
+ for (int i = ack.getNumOfReplies()-1; i >=0 && clientRunning; i--) {
+ short reply = ack.getReply(i);
if (reply != DataTransferProtocol.OP_STATUS_SUCCESS) {
errorIndex = i; // first bad datanode
throw new IOException("Bad response " + reply +
Modified: hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java?rev=1077274&r1=1077273&r2=1077274&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java Fri Mar 4 03:58:47 2011
@@ -17,6 +17,11 @@
*/
package org.apache.hadoop.hdfs.protocol;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
/**
*
@@ -31,12 +36,11 @@ public interface DataTransferProtocol {
* when protocol changes. It is not very obvious.
*/
/*
- * Version 16:
- * Datanode now needs to send back a status code together
- * with firstBadLink during pipeline setup for dfs write
- * (only for DFSClients, not for other datanodes).
+ * Version 17:
+ * Change the block packet ack protocol to include seqno,
+ * numberOfReplies, reply0, reply1, ...
*/
- public static final int DATA_TRANSFER_VERSION = 16;
+ public static final int DATA_TRANSFER_VERSION = 17;
// Processed at datanode stream-handler
public static final byte OP_WRITE_BLOCK = (byte) 80;
@@ -57,6 +61,97 @@ public interface DataTransferProtocol {
public static final int OP_STATUS_ERROR_ACCESS_TOKEN = 5;
public static final int OP_STATUS_CHECKSUM_OK = 6;
-
-
+ /** A pipeline ack message: a packet sequence number followed by an array of status replies. **/
+ public static class PipelineAck implements Writable {
+ private long seqno; // seqno of the acked packet; -1 for HEART_BEAT
+ private short replies[]; // status codes compared against OP_STATUS_SUCCESS
+ final public static PipelineAck HEART_BEAT = new PipelineAck(-1, new short[0]); // keepalive: seqno -1, no replies
+
+ /** Default constructor; replies is unset (null) until readFields() is called. **/
+ public PipelineAck() {
+ }
+
+ /**
+ * Construct an ack with the given sequence number and replies.
+ * @param seqno sequence number
+ * @param replies an array of replies (stored by reference, not copied)
+ */
+ public PipelineAck(long seqno, short[] replies) {
+ this.seqno = seqno;
+ this.replies = replies;
+ }
+
+ /**
+ * Get the sequence number
+ * @return the sequence number
+ */
+ public long getSeqno() {
+ return seqno;
+ }
+
+ /**
+ * Get the number of replies
+ * @return the number of replies
+ */
+ public short getNumOfReplies() {
+ return (short)replies.length;
+ }
+
+ /**
+ * Get the ith reply; {@code i} must be in [0, getNumOfReplies()).
+ * @return the ith reply
+ */
+ public short getReply(int i) {
+ return replies[i];
+ }
+
+ /**
+ * Check whether every reply in this ack is a success.
+ * @return true if all replies are OP_STATUS_SUCCESS (also true when there are none)
+ */
+ public boolean isSuccess() {
+ for (short reply : replies) {
+ if (reply != OP_STATUS_SUCCESS) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ /**** Writable interface ****/
+ @Override // Writable
+ public void readFields(DataInput in) throws IOException {
+ seqno = in.readLong();
+ short numOfReplies = in.readShort();
+ replies = new short[numOfReplies]; // NOTE(review): count is not validated; a negative value throws NegativeArraySizeException
+ for (int i=0; i<numOfReplies; i++) {
+ replies[i] = in.readShort();
+ }
+ }
+
+ @Override // Writable
+ public void write(DataOutput out) throws IOException {
+ // wire format: long seqno, short count, then count shorts (mirrors readFields)
+ out.writeLong(seqno);
+ out.writeShort((short)replies.length);
+ for(short reply : replies) {
+ out.writeShort(reply);
+ }
+ }
+
+ @Override //Object
+ public String toString() {
+ StringBuilder ack = new StringBuilder("Replies for seqno ");
+ ack.append( seqno ).append( " are" );
+ for(short reply : replies) {
+ ack.append(" ");
+ if (reply == OP_STATUS_SUCCESS) {
+ ack.append("SUCCESS");
+ } else {
+ ack.append("FAILED"); // any non-SUCCESS status code
+ }
+ }
+ return ack.toString();
+ }
+ }
}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java?rev=1077274&r1=1077273&r2=1077274&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java Fri Mar 4 03:58:47 2011
@@ -36,6 +36,7 @@ import org.apache.hadoop.hdfs.protocol.D
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.FSConstants;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.PipelineAck;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.DataChecksum;
@@ -777,8 +778,13 @@ class BlockReceiver implements java.io.C
// send a heartbeat if it is time.
now = System.currentTimeMillis();
if (now - lastHeartbeat > datanode.socketTimeout/2) {
- replyOut.writeLong(-1); // send heartbeat
+ PipelineAck.HEART_BEAT.write(replyOut); // send heart beat
replyOut.flush();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("PacketResponder " + numTargets +
+ " for block " + block +
+ " sent a heartbeat");
+ }
lastHeartbeat = now;
}
}
@@ -820,8 +826,8 @@ class BlockReceiver implements java.io.C
lastPacket = true;
}
- replyOut.writeLong(expected);
- replyOut.writeShort(DataTransferProtocol.OP_STATUS_SUCCESS);
+ new PipelineAck(expected, new short[]{
+ DataTransferProtocol.OP_STATUS_SUCCESS}).write(replyOut);
replyOut.flush();
} catch (Exception e) {
LOG.warn("IOException in BlockReceiver.lastNodeRun: ", e);
@@ -859,23 +865,21 @@ class BlockReceiver implements java.io.C
while (running && datanode.shouldRun && !lastPacketInBlock) {
try {
- short op = DataTransferProtocol.OP_STATUS_SUCCESS;
boolean didRead = false;
long expected = -2;
+ PipelineAck ack = new PipelineAck();
try {
- // read seqno from downstream datanode
- long seqno = mirrorIn.readLong();
+ // read an ack from downstream datanode
+ ack.readFields(mirrorIn);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("PacketResponder " + numTargets + " got " + ack);
+ }
+ long seqno = ack.getSeqno();
didRead = true;
- if (seqno == -1) {
- replyOut.writeLong(-1); // send keepalive
+ if (seqno == PipelineAck.HEART_BEAT.getSeqno()) {
+ ack.write(replyOut); // send keepalive
replyOut.flush();
- LOG.debug("PacketResponder " + numTargets + " got -1");
- continue;
- } else if (seqno == -2) {
- LOG.debug("PacketResponder " + numTargets + " got -2");
- } else {
- LOG.debug("PacketResponder " + numTargets + " got seqno = " +
- seqno);
+ } else if (seqno >= 0) {
Packet pkt = null;
synchronized (this) {
while (running && datanode.shouldRun && ackQueue.size() == 0) {
@@ -890,7 +894,6 @@ class BlockReceiver implements java.io.C
pkt = ackQueue.removeFirst();
expected = pkt.seqno;
notifyAll();
- LOG.debug("PacketResponder " + numTargets + " seqno = " + seqno);
if (seqno != expected) {
throw new IOException("PacketResponder " + numTargets +
" for block " + block +
@@ -923,10 +926,6 @@ class BlockReceiver implements java.io.C
continue;
}
- if (!didRead) {
- op = DataTransferProtocol.OP_STATUS_ERROR;
- }
-
// If this is the last packet in block, then close block
// file and finalize the block before responding success
if (lastPacketInBlock && !receiver.finalized) {
@@ -951,43 +950,34 @@ class BlockReceiver implements java.io.C
}
}
- // send my status back to upstream datanode
- replyOut.writeLong(expected); // send seqno upstream
- replyOut.writeShort(DataTransferProtocol.OP_STATUS_SUCCESS);
-
- LOG.debug("PacketResponder " + numTargets +
- " for block " + block +
- " responded my status " +
- " for seqno " + expected);
-
- // forward responses from downstream datanodes.
- for (int i = 0; i < numTargets && datanode.shouldRun; i++) {
- try {
- if (op == DataTransferProtocol.OP_STATUS_SUCCESS) {
- op = mirrorIn.readShort();
- if (op != DataTransferProtocol.OP_STATUS_SUCCESS) {
- LOG.debug("PacketResponder for block " + block +
- ": error code received from downstream " +
- " datanode[" + i + "] " + op);
- }
- }
- } catch (Throwable e) {
- op = DataTransferProtocol.OP_STATUS_ERROR;
+ // construct my ack message
+ short[] replies = null;
+ if (!didRead) { // no ack is read
+ replies = new short[2];
+ replies[0] = DataTransferProtocol.OP_STATUS_SUCCESS;
+ replies[1] = DataTransferProtocol.OP_STATUS_ERROR;
+ } else {
+ replies = new short[1+ack.getNumOfReplies()];
+ replies[0] = DataTransferProtocol.OP_STATUS_SUCCESS;
+ for (int i=0; i<ack.getNumOfReplies(); i++) {
+ replies[i+1] = ack.getReply(i);
}
- replyOut.writeShort(op);
}
+ PipelineAck replyAck = new PipelineAck(expected, replies);
+
+ // send my ack back to upstream datanode
+ replyAck.write(replyOut);
replyOut.flush();
- LOG.debug("PacketResponder " + block + " " + numTargets +
- " responded other status " + " for seqno " + expected);
-
- // If we were unable to read the seqno from downstream, then stop.
- if (expected == -2) {
- running = false;
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("PacketResponder " + numTargets +
+ " for block " + block +
+ " responded an ack: " + replyAck);
}
+
// If we forwarded an error response from a downstream datanode
// and we are acting on behalf of a client, then we quit. The
// client will drive the recovery mechanism.
- if (op == DataTransferProtocol.OP_STATUS_ERROR && receiver.clientName.length() > 0) {
+ if (!replyAck.isSuccess() && receiver.clientName.length() > 0) {
running = false;
}
} catch (IOException e) {
Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/hdfs/TestDataTransferProtocol.java?rev=1077274&r1=1077273&r2=1077274&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/hdfs/TestDataTransferProtocol.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/hdfs/TestDataTransferProtocol.java Fri Mar 4 03:58:47 2011
@@ -256,9 +256,9 @@ public class TestDataTransferProtocol ex
sendOut.writeInt(0); // zero checksum
//ok finally write a block with 0 len
recvOut.writeShort((short)DataTransferProtocol.OP_STATUS_SUCCESS);
- Text.writeString(recvOut, ""); // first bad node
- recvOut.writeLong(100); // sequencenumber
- recvOut.writeShort((short)DataTransferProtocol.OP_STATUS_SUCCESS);
+ Text.writeString(recvOut, "");
+ new DataTransferProtocol.PipelineAck(100,
+ new short[]{DataTransferProtocol.OP_STATUS_SUCCESS}).write(recvOut);
sendRecvData("Writing a zero len block blockid " + newBlockId, false);
/* Test OP_READ_BLOCK */