Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 04:58:47 UTC

svn commit: r1077274 - in /hadoop/common/branches/branch-0.20-security-patches: ./ src/hdfs/org/apache/hadoop/hdfs/ src/hdfs/org/apache/hadoop/hdfs/protocol/ src/hdfs/org/apache/hadoop/hdfs/server/datanode/ src/test/org/apache/hadoop/hdfs/

Author: omalley
Date: Fri Mar  4 03:58:47 2011
New Revision: 1077274

URL: http://svn.apache.org/viewvc?rev=1077274&view=rev
Log:
commit 37703856e56e0acdbb846a335b31eafa58d5f2bd
Author: Hairong Kuang <ha...@ucdev21.inktomisearch.com>
Date:   Wed Mar 3 23:41:07 2010 +0000

    HDFS-793 from http://issues.apache.org/jira/secure/attachment/12437808/separateSendRcvAck-0.20-yahoo.patch

Modified:
    hadoop/common/branches/branch-0.20-security-patches/CHANGES.txt
    hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java
    hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java
    hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/hdfs/TestDataTransferProtocol.java

Modified: hadoop/common/branches/branch-0.20-security-patches/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/CHANGES.txt?rev=1077274&r1=1077273&r2=1077274&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.20-security-patches/CHANGES.txt Fri Mar  4 03:58:47 2011
@@ -13,6 +13,10 @@ Release 0.20.2 - Unreleased
     HDFS-927. DFSInputStream retries too many times for new block locations
     (Todd Lipcon via Stack)
 
+    HDFS-793. DataNode should first receive the whole packet ack message
+    before it constructs and sends its own ack message for the packet.
+    (hairong)
+
 Release 0.20.1 - 2009-09-01
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java?rev=1077274&r1=1077273&r2=1077274&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java Fri Mar  4 03:58:47 2011
@@ -29,6 +29,7 @@ import org.apache.hadoop.net.NodeBase;
 import org.apache.hadoop.conf.*;
 import org.apache.hadoop.hdfs.DistributedFileSystem.DiskStatus;
 import org.apache.hadoop.hdfs.protocol.*;
+import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.PipelineAck;
 import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
@@ -2535,14 +2536,18 @@ public class DFSClient implements FSCons
       public void run() {
 
         this.setName("ResponseProcessor for block " + block);
+        PipelineAck ack = new PipelineAck();
   
         while (!closed && clientRunning && !lastPacketInBlock) {
           // process responses from datanodes.
           try {
-            // verify seqno from datanode
-            long seqno = blockReplyStream.readLong();
-            LOG.debug("DFSClient received ack for seqno " + seqno);
-            if (seqno == -1) {
+            // read an ack from the pipeline
+            ack.readFields(blockReplyStream);
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("DFSClient " + ack);
+            }
+            long seqno = ack.getSeqno();
+            if (seqno == PipelineAck.HEART_BEAT.getSeqno()) {
               continue;
             } else if (seqno == -2) {
              // do nothing
@@ -2560,8 +2565,8 @@ public class DFSClient implements FSCons
             }
 
             // processes response status from all datanodes.
-            for (int i = 0; i < targets.length && clientRunning; i++) {
-              short reply = blockReplyStream.readShort();
+            for (int i = ack.getNumOfReplies()-1; i >= 0 && clientRunning; i--) {
+              short reply = ack.getReply(i);
               if (reply != DataTransferProtocol.OP_STATUS_SUCCESS) {
                 errorIndex = i; // first bad datanode
                 throw new IOException("Bad response " + reply +

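On the client side, the loop above replaces two raw reads (a long seqno, then one short per target) with a single PipelineAck.readFields() call followed by a scan of the per-datanode replies. A small self-contained usage sketch; the simulated pipeline and the driver class are hypothetical, but PipelineAck and the OP_STATUS_* constants are the ones this patch adds:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
    import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.PipelineAck;

    public class ClientAckSketch {
      public static void main(String[] args) throws IOException {
        // Simulate the wire bytes of an ack for seqno 100 from a
        // three-datanode pipeline whose second datanode failed.
        ByteArrayOutputStream wire = new ByteArrayOutputStream();
        new PipelineAck(100, new short[] {
            DataTransferProtocol.OP_STATUS_SUCCESS,
            DataTransferProtocol.OP_STATUS_ERROR,
            DataTransferProtocol.OP_STATUS_SUCCESS
        }).write(new DataOutputStream(wire));

        // The client reads the whole ack in one call ...
        PipelineAck ack = new PipelineAck();
        ack.readFields(new DataInputStream(
            new ByteArrayInputStream(wire.toByteArray())));

        // ... then scans the replies, as the rewritten loop above does.
        for (int i = ack.getNumOfReplies() - 1; i >= 0; i--) {
          if (ack.getReply(i) != DataTransferProtocol.OP_STATUS_SUCCESS) {
            System.out.println("bad datanode at index " + i); // prints 1
          }
        }
      }
    }
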
Modified: hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java?rev=1077274&r1=1077273&r2=1077274&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java Fri Mar  4 03:58:47 2011
@@ -17,6 +17,11 @@
  */
 package org.apache.hadoop.hdfs.protocol;
 
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
 
 /**
  * 
@@ -31,12 +36,11 @@ public interface DataTransferProtocol {
    * when protocol changes. It is not very obvious. 
    */
   /*
-   * Version 16:
-   *    Datanode now needs to send back a status code together 
-   *    with firstBadLink during pipeline setup for dfs write
-   *    (only for DFSClients, not for other datanodes).
+   * Version 17:
+   *    Change the block packet ack protocol to include seqno,
+   *    numberOfReplies, reply0, reply1, ...
    */
-  public static final int DATA_TRANSFER_VERSION = 16;
+  public static final int DATA_TRANSFER_VERSION = 17;
 
   // Processed at datanode stream-handler
   public static final byte OP_WRITE_BLOCK = (byte) 80;
@@ -57,6 +61,97 @@ public interface DataTransferProtocol {
   public static final int OP_STATUS_ERROR_ACCESS_TOKEN = 5;
   public static final int OP_STATUS_CHECKSUM_OK = 6;
 
-
-
+  /** An ack sent back through a write pipeline: the packet's sequence
+   * number followed by one reply per datanode. */
+  public static class PipelineAck implements Writable {
+    private long seqno;
+    private short replies[];
+    final public static PipelineAck HEART_BEAT = new PipelineAck(-1, new short[0]);
+
+    /** default constructor **/
+    public PipelineAck() {
+    }
+
+    /**
+     * Constructor
+     * @param seqno sequence number
+     * @param replies an array of replies
+     */
+    public PipelineAck(long seqno, short[] replies) {
+      this.seqno = seqno;
+      this.replies = replies;
+    }
+
+    /**
+     * Get the sequence number
+     * @return the sequence number
+     */
+    public long getSeqno() {
+      return seqno;
+    }
+
+    /**
+     * Get the number of replies
+     * @return the number of replies
+     */
+    public short getNumOfReplies() {
+      return (short)replies.length;
+    }
+
+    /**
+     * Get the i-th reply
+     * @return the i-th reply
+     */
+    public short getReply(int i) {
+      return replies[i];
+    }
+
+    /**
+     * Check whether every datanode in the pipeline reported success
+     * @return true if all replies are OP_STATUS_SUCCESS
+     */
+    public boolean isSuccess() {
+      for (short reply : replies) {
+        if (reply != OP_STATUS_SUCCESS) {
+          return false;
+        }
+      }
+      return true;
+    }
+
+    /**** Writable interface ****/
+    @Override // Writable
+    public void readFields(DataInput in) throws IOException {
+      seqno = in.readLong();
+      short numOfReplies = in.readShort();
+      replies = new short[numOfReplies];
+      for (int i=0; i<numOfReplies; i++) {
+        replies[i] = in.readShort();
+      }
+    }
+
+    @Override // Writable
+    public void write(DataOutput out) throws IOException {
+      //WritableUtils.writeVLong(out, seqno);
+      out.writeLong(seqno);
+      out.writeShort((short)replies.length);
+      for(short reply : replies) {
+        out.writeShort(reply);
+      }
+    }
+
+    @Override //Object
+    public String toString() {
+      StringBuilder ack = new StringBuilder("Replies for seqno ");
+      ack.append( seqno ).append( " are" );
+      for(short reply : replies) {
+        ack.append(" ");
+        if (reply == OP_STATUS_SUCCESS) {
+          ack.append("SUCCESS");
+        } else {
+          ack.append("FAILED");
+        }
+      }
+      return ack.toString();
+    }
+  }
 }

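Two properties of the PipelineAck added above are worth noting: a heartbeat is just an ack with the sentinel seqno -1 and zero replies, so it serializes to a fixed ten bytes (an 8-byte long plus a 2-byte count), and receivers recognize it purely by its seqno. A round-trip sketch; HeartbeatSketch is a hypothetical driver class:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.PipelineAck;

    public class HeartbeatSketch {
      public static void main(String[] args) throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        PipelineAck.HEART_BEAT.write(new DataOutputStream(buf));
        System.out.println(buf.size()); // 10: 8-byte seqno + 2-byte reply count

        PipelineAck ack = new PipelineAck();
        ack.readFields(new DataInputStream(
            new ByteArrayInputStream(buf.toByteArray())));
        // A receiver identifies a heartbeat solely by the sentinel seqno.
        System.out.println(
            ack.getSeqno() == PipelineAck.HEART_BEAT.getSeqno()); // true
      }
    }
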
Modified: hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java?rev=1077274&r1=1077273&r2=1077274&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java Fri Mar  4 03:58:47 2011
@@ -36,6 +36,7 @@ import org.apache.hadoop.hdfs.protocol.D
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.PipelineAck;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.DataChecksum;
@@ -777,8 +778,13 @@ class BlockReceiver implements java.io.C
               // send a heartbeat if it is time.
               now = System.currentTimeMillis();
               if (now - lastHeartbeat > datanode.socketTimeout/2) {
-                replyOut.writeLong(-1); // send heartbeat
+                PipelineAck.HEART_BEAT.write(replyOut);  // send heartbeat
                 replyOut.flush();
+                if (LOG.isDebugEnabled()) {
+                  LOG.debug("PacketResponder " + numTargets +
+                            " for block " + block + 
+                            " sent a heartbeat");
+                }
                 lastHeartbeat = now;
               }
             }
@@ -820,8 +826,8 @@ class BlockReceiver implements java.io.C
               lastPacket = true;
             }
 
-            replyOut.writeLong(expected);
-            replyOut.writeShort(DataTransferProtocol.OP_STATUS_SUCCESS);
+            new PipelineAck(expected, new short[]{
+                DataTransferProtocol.OP_STATUS_SUCCESS}).write(replyOut);
             replyOut.flush();
         } catch (Exception e) {
           LOG.warn("IOException in BlockReceiver.lastNodeRun: ", e);
@@ -859,23 +865,21 @@ class BlockReceiver implements java.io.C
       while (running && datanode.shouldRun && !lastPacketInBlock) {
 
         try {
-            short op = DataTransferProtocol.OP_STATUS_SUCCESS;
             boolean didRead = false;
             long expected = -2;
+            PipelineAck ack = new PipelineAck();
             try { 
-              // read seqno from downstream datanode
-              long seqno = mirrorIn.readLong();
+              // read an ack from downstream datanode
+              ack.readFields(mirrorIn);
+              if (LOG.isDebugEnabled()) {
+                LOG.debug("PacketResponder " + numTargets + " got " + ack);
+              }
+              long seqno = ack.getSeqno();
               didRead = true;
-              if (seqno == -1) {
-                replyOut.writeLong(-1); // send keepalive
+              if (seqno == PipelineAck.HEART_BEAT.getSeqno()) {
+                ack.write(replyOut); // send keepalive
                 replyOut.flush();
-                LOG.debug("PacketResponder " + numTargets + " got -1");
-                continue;
-              } else if (seqno == -2) {
-                LOG.debug("PacketResponder " + numTargets + " got -2");
-              } else {
-                LOG.debug("PacketResponder " + numTargets + " got seqno = " + 
-                    seqno);
+              } else if (seqno >= 0) {
                 Packet pkt = null;
                 synchronized (this) {
                   while (running && datanode.shouldRun && ackQueue.size() == 0) {
@@ -890,7 +894,6 @@ class BlockReceiver implements java.io.C
                   pkt = ackQueue.removeFirst();
                   expected = pkt.seqno;
                   notifyAll();
-                  LOG.debug("PacketResponder " + numTargets + " seqno = " + seqno);
                   if (seqno != expected) {
                     throw new IOException("PacketResponder " + numTargets +
                                           " for block " + block +
@@ -923,10 +926,6 @@ class BlockReceiver implements java.io.C
               continue;
             }
             
-            if (!didRead) {
-              op = DataTransferProtocol.OP_STATUS_ERROR;
-            }
-            
             // If this is the last packet in block, then close block
             // file and finalize the block before responding success
             if (lastPacketInBlock && !receiver.finalized) {
@@ -951,43 +950,34 @@ class BlockReceiver implements java.io.C
               }
             }
 
-            // send my status back to upstream datanode
-            replyOut.writeLong(expected); // send seqno upstream
-            replyOut.writeShort(DataTransferProtocol.OP_STATUS_SUCCESS);
-
-            LOG.debug("PacketResponder " + numTargets + 
-                      " for block " + block +
-                      " responded my status " +
-                      " for seqno " + expected);
-
-            // forward responses from downstream datanodes.
-            for (int i = 0; i < numTargets && datanode.shouldRun; i++) {
-              try {
-                if (op == DataTransferProtocol.OP_STATUS_SUCCESS) {
-                  op = mirrorIn.readShort();
-                  if (op != DataTransferProtocol.OP_STATUS_SUCCESS) {
-                    LOG.debug("PacketResponder for block " + block +
-                              ": error code received from downstream " +
-                              " datanode[" + i + "] " + op);
-                  }
-                }
-              } catch (Throwable e) {
-                op = DataTransferProtocol.OP_STATUS_ERROR;
+            // construct my ack message
+            short[] replies = null;
+            if (!didRead) { // no ack is read
+              replies = new short[2];
+              replies[0] = DataTransferProtocol.OP_STATUS_SUCCESS;
+              replies[1] = DataTransferProtocol.OP_STATUS_ERROR;
+            } else {
+              replies = new short[1+ack.getNumOfReplies()];
+              replies[0] = DataTransferProtocol.OP_STATUS_SUCCESS;
+              for (int i=0; i<ack.getNumOfReplies(); i++) {
+                replies[i+1] = ack.getReply(i);
               }
-              replyOut.writeShort(op);
             }
+            PipelineAck replyAck = new PipelineAck(expected, replies);
+ 
+            // send my ack back to upstream datanode
+            replyAck.write(replyOut);
             replyOut.flush();
-            LOG.debug("PacketResponder " + block + " " + numTargets + 
-                      " responded other status " + " for seqno " + expected);
-
-            // If we were unable to read the seqno from downstream, then stop.
-            if (expected == -2) {
-              running = false;
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("PacketResponder " + numTargets +
+                        " for block " + block +
+                        " responded an ack: " + replyAck);
             }
+
             // If we forwarded an error response from a downstream datanode
             // and we are acting on behalf of a client, then we quit. The 
             // client will drive the recovery mechanism.
-            if (op == DataTransferProtocol.OP_STATUS_ERROR && receiver.clientName.length() > 0) {
+            if (!replyAck.isSuccess() && receiver.clientName.length() > 0) {
               running = false;
             }
         } catch (IOException e) {

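One subtlety in the hunk above: when no downstream ack could be read (didRead is false), the responder still reports its own SUCCESS and appends a single ERROR standing in for the unreachable remainder of the pipeline, so a downstream failure still surfaces to the client. The rule, restated as a standalone helper; buildReplies and AckBuilderSketch are hypothetical names, not part of the patch:

    import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
    import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.PipelineAck;

    class AckBuilderSketch {
      static short[] buildReplies(boolean didRead, PipelineAck downstreamAck) {
        if (!didRead) {
          // No downstream ack arrived: own SUCCESS plus one ERROR for the
          // rest of the pipeline.
          return new short[] { DataTransferProtocol.OP_STATUS_SUCCESS,
                               DataTransferProtocol.OP_STATUS_ERROR };
        }
        // Prepend this node's status, then copy the downstream replies
        // in pipeline order.
        short[] replies = new short[1 + downstreamAck.getNumOfReplies()];
        replies[0] = DataTransferProtocol.OP_STATUS_SUCCESS;
        for (int i = 0; i < downstreamAck.getNumOfReplies(); i++) {
          replies[i + 1] = downstreamAck.getReply(i);
        }
        return replies;
      }
    }
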
Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/hdfs/TestDataTransferProtocol.java?rev=1077274&r1=1077273&r2=1077274&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/hdfs/TestDataTransferProtocol.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/hdfs/TestDataTransferProtocol.java Fri Mar  4 03:58:47 2011
@@ -256,9 +256,9 @@ public class TestDataTransferProtocol ex
     sendOut.writeInt(0);           // zero checksum
     //ok finally write a block with 0 len
     recvOut.writeShort((short)DataTransferProtocol.OP_STATUS_SUCCESS);
-    Text.writeString(recvOut, ""); // first bad node
-    recvOut.writeLong(100);        // sequencenumber
-    recvOut.writeShort((short)DataTransferProtocol.OP_STATUS_SUCCESS);
+    Text.writeString(recvOut, "");
+    new DataTransferProtocol.PipelineAck(100, 
+        new short[]{DataTransferProtocol.OP_STATUS_SUCCESS}).write(recvOut);
     sendRecvData("Writing a zero len block blockid " + newBlockId, false);
     
     /* Test OP_READ_BLOCK */