Posted to hdfs-commits@hadoop.apache.org by ha...@apache.org on 2009/12/08 19:37:25 UTC

svn commit: r888507 - in /hadoop/hdfs/trunk: ./ src/java/org/apache/hadoop/hdfs/ src/java/org/apache/hadoop/hdfs/protocol/ src/java/org/apache/hadoop/hdfs/server/datanode/ src/test/aop/org/apache/hadoop/hdfs/server/datanode/ src/test/hdfs/org/apache/ha...

Author: hairong
Date: Tue Dec  8 18:37:25 2009
New Revision: 888507

URL: http://svn.apache.org/viewvc?rev=888507&view=rev
Log:
HDFS-793. Data node should receive the whole packet ack message before it constructs and sends its own ack message for the packet. Contributed by Hairong Kuang.
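
In outline: before this change each pipeline node streamed an ack as a bare
seqno long followed by one Status per downstream datanode, so a node could
start forwarding before the downstream ack had fully arrived. The commit
introduces a PipelineAck Writable that is read and written as a unit. A
minimal round-trip sketch using only the classes this commit adds (the
wrapper class name is mine, and the byte buffer stands in for the pipeline
sockets):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.PipelineAck;
    import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status;

    public class PipelineAckRoundTrip {
      public static void main(String[] args) throws IOException {
        // Write an ack the way a datanode does: seqno, reply count, replies.
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        new PipelineAck(100, new Status[]{Status.SUCCESS})
            .write(new DataOutputStream(buf));

        // Read it back the way the client now does: the whole ack at once.
        PipelineAck ack = new PipelineAck();
        ack.readFields(new DataInputStream(
            new ByteArrayInputStream(buf.toByteArray())));
        System.out.println(ack);             // Replies for seqno 100 are SUCCESS
        System.out.println(ack.isSuccess()); // true
      }
    }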

Modified:
    hadoop/hdfs/trunk/CHANGES.txt
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
    hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/server/datanode/BlockReceiverAspects.aj
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java

Modified: hadoop/hdfs/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/CHANGES.txt?rev=888507&r1=888506&r2=888507&view=diff
==============================================================================
--- hadoop/hdfs/trunk/CHANGES.txt (original)
+++ hadoop/hdfs/trunk/CHANGES.txt Tue Dec  8 18:37:25 2009
@@ -584,6 +584,9 @@
     HDFS-596. Fix memory leak in hdfsFreeFileInfo() for libhdfs.
     (Zhang Bingjun via dhruba)
 
+    HDFS-793. Data node should receive the whole packet ack message before it
+    constructs and sends its own ack message for the packet. (hairong)
+
 Release 0.20.1 - 2009-09-01
 
   IMPROVEMENTS

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java?rev=888507&r1=888506&r2=888507&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java Tue Dec  8 18:37:25 2009
@@ -87,6 +87,7 @@
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
 import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.BlockConstructionStage;
+import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.PipelineAck;
 import org.apache.hadoop.hdfs.security.BlockAccessToken;
 import org.apache.hadoop.hdfs.security.InvalidAccessTokenException;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants;
@@ -2894,15 +2895,20 @@
         public void run() {
 
           this.setName("ResponseProcessor for block " + block);
+          PipelineAck ack = new PipelineAck();
 
           while (!responderClosed && clientRunning && !isLastPacketInBlock) {
             // process responses from datanodes.
             try {
-              // verify seqno from datanode
-              long seqno = blockReplyStream.readLong();
-              LOG.debug("DFSClient received ack for seqno " + seqno);
+              // read an ack from the pipeline
+              ack.readFields(blockReplyStream);
+              if (LOG.isDebugEnabled()) {
+                LOG.debug("DFSClient " + ack);
+              }
+              
+              long seqno = ack.getSeqno();
               Packet one = null;
-              if (seqno == -1) {
+              if (seqno == PipelineAck.HEART_BEAT.getSeqno()) {
                 continue;
               } else if (seqno == -2) {
                // do nothing
@@ -2919,20 +2925,9 @@
               }
 
               // processes response status from all datanodes.
-              String replies = null;
-              if (LOG.isDebugEnabled()) {
-                replies = "DFSClient Replies for seqno " + seqno + " are";
-              }
-              for (int i = 0; i < targets.length && clientRunning; i++) {
-                final DataTransferProtocol.Status reply
-                    = DataTransferProtocol.Status.read(blockReplyStream);
-                if (LOG.isDebugEnabled()) {
-                  replies += " " + reply;
-                }
+              for (int i = ack.getNumOfReplies() - 1; i >= 0 && clientRunning; i--) {
+                final DataTransferProtocol.Status reply = ack.getReply(i);
                 if (reply != SUCCESS) {
-                  if (LOG.isDebugEnabled()) {
-                    LOG.debug(replies);
-                  }
                   errorIndex = i; // first bad datanode
                   throw new IOException("Bad response " + reply +
                       " for block " + block +
@@ -2941,10 +2936,6 @@
                 }
               }
 
-              if (LOG.isDebugEnabled()) {
-                LOG.debug(replies);
-              }
-              
               if (one == null) {
                 throw new IOException("Panic: responder did not receive " +
                     "an ack for a packet: " + seqno);

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java?rev=888507&r1=888506&r2=888507&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java Tue Dec  8 18:37:25 2009
@@ -26,6 +26,7 @@
 
 import org.apache.hadoop.hdfs.security.BlockAccessToken;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
 
 /**
@@ -39,12 +40,11 @@
    * when protocol changes. It is not very obvious. 
    */
   /*
-   * Version 17:
-   *    Change the block write protocol to support pipeline recovery.
-   *    Additional fields, like recovery flags, new GS, minBytesRcvd, 
-   *    and maxBytesRcvd are included.
+   * Version 18:
+   *    Change the block packet ack protocol to include seqno,
+   *    numberOfReplies, reply0, reply1, ...
    */
-  public static final int DATA_TRANSFER_VERSION = 17;
+  public static final int DATA_TRANSFER_VERSION = 18;
 
   /** Operation */
   public enum Op {
@@ -453,4 +453,94 @@
       return t; 
     }
   }
+  
+  /** A packet ack message sent upstream along the write pipeline **/
+  public static class PipelineAck implements Writable {
+    private long seqno;
+    private Status replies[];
+    public static final PipelineAck HEART_BEAT = new PipelineAck(-1, new Status[0]);
+
+    /** default constructor **/
+    public PipelineAck() {
+    }
+    
+    /**
+     * Constructor
+     * @param seqno sequence number
+     * @param replies an array of replies
+     */
+    public PipelineAck(long seqno, Status[] replies) {
+      this.seqno = seqno;
+      this.replies = replies;
+    }
+    
+    /**
+     * Get the sequence number
+     * @return the sequence number
+     */
+    public long getSeqno() {
+      return seqno;
+    }
+    
+    /**
+     * Get the number of replies
+     * @return the number of replies
+     */
+    public short getNumOfReplies() {
+      return (short)replies.length;
+    }
+    
+    /**
+     * Get the ith reply
+     * @return the ith reply
+     */
+    public Status getReply(int i) {
+      return replies[i];
+    }
+    
+    /**
+     * Check if all replies in this ack are SUCCESS
+     * @return true if all statuses are SUCCESS
+     */
+    public boolean isSuccess() {
+      for (Status reply : replies) {
+        if (reply != Status.SUCCESS) {
+          return false;
+        }
+      }
+      return true;
+    }
+    
+    /**** Writable interface ****/
+    @Override // Writable
+    public void readFields(DataInput in) throws IOException {
+      seqno = in.readLong();
+      short numOfReplies = in.readShort();
+      replies = new Status[numOfReplies];
+      for (int i=0; i<numOfReplies; i++) {
+        replies[i] = Status.read(in);
+      }
+    }
+
+    @Override // Writable
+    public void write(DataOutput out) throws IOException {
+      out.writeLong(seqno);
+      out.writeShort((short)replies.length);
+      for(Status reply : replies) {
+        reply.write(out);
+      }
+    }
+    
+    @Override //Object
+    public String toString() {
+      StringBuilder ack = new StringBuilder("Replies for seqno ");
+      ack.append(seqno).append(" are");
+      for(Status reply : replies) {
+        ack.append(" ");
+        ack.append(reply);
+      }
+      return ack.toString();
+    }
+  }
 }
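
Note that HEART_BEAT gives heartbeats the same framing as real acks: seqno
-1 and an empty reply array, i.e. 10 bytes on the wire (an 8-byte long plus
a 2-byte short). A minimal sketch of how both ends recognize it, assuming
only the classes added above (the wrapper class name is mine):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.PipelineAck;

    public class HeartbeatCheck {
      public static void main(String[] args) throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        PipelineAck.HEART_BEAT.write(new DataOutputStream(buf)); // 10 bytes

        PipelineAck ack = new PipelineAck();
        ack.readFields(new DataInputStream(
            new ByteArrayInputStream(buf.toByteArray())));
        // DFSClient and the PacketResponder both test the seqno this way.
        System.out.println(ack.getSeqno() == PipelineAck.HEART_BEAT.getSeqno());
        System.out.println(ack.getNumOfReplies()); // 0
      }
    }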

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java?rev=888507&r1=888506&r2=888507&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java Tue Dec  8 18:37:25 2009
@@ -35,11 +35,11 @@
 import org.apache.hadoop.fs.FSInputChecker;
 import org.apache.hadoop.fs.FSOutputSummer;
 import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.BlockConstructionStage;
+import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.PipelineAck;
 import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.util.Daemon;
@@ -805,8 +805,13 @@
               // send a heartbeat if it is time.
               now = System.currentTimeMillis();
               if (now - lastHeartbeat > datanode.socketTimeout/2) {
-                replyOut.writeLong(-1); // send heartbeat
+                PipelineAck.HEART_BEAT.write(replyOut);  // send heart beat
                 replyOut.flush();
+                if (LOG.isDebugEnabled()) {
+                  LOG.debug("PacketResponder " + numTargets +
+                            " for block " + block + 
+                            " sent a heartbeat");
+                }
                 lastHeartbeat = now;
               }
             }
@@ -843,7 +848,7 @@
               lastPacket = true;
             }
 
-            ackReply(expected);
+            new PipelineAck(expected, new Status[]{SUCCESS}).write(replyOut);
             replyOut.flush();
             // remove the packet from the ack queue
             removeAckHead();
@@ -870,14 +875,6 @@
                " for block " + block + " terminating");
     }
 
-    // This method is introduced to facilitate testing. Otherwise
-    // there was a little chance to bind an AspectJ advice to such a sequence
-    // of calls
-    private void ackReply(long expected) throws IOException {
-      replyOut.writeLong(expected);
-      SUCCESS.write(replyOut);
-    }
-
     /**
      * Thread to process incoming acks.
      * @see java.lang.Runnable#run()
@@ -896,24 +893,23 @@
 
         boolean isInterrupted = false;
         try {
-            DataTransferProtocol.Status op = SUCCESS;
             boolean didRead = false;
             Packet pkt = null;
             long expected = -2;
+            PipelineAck ack = new PipelineAck();
             try { 
-              // read seqno from downstream datanode
-              long seqno = mirrorIn.readLong();
+              // read an ack from downstream datanode
+              ack.readFields(mirrorIn);
+              if (LOG.isDebugEnabled()) {
+                LOG.debug("PacketResponder " + numTargets + " got " + ack);
+              }
+              long seqno = ack.getSeqno();
               didRead = true;
-              if (seqno == -1) {
-                replyOut.writeLong(-1); // send keepalive
+              if (seqno == PipelineAck.HEART_BEAT.getSeqno()) {
+                ack.write(replyOut);
                 replyOut.flush();
-                LOG.debug("PacketResponder " + numTargets + " got -1");
                 continue;
-              } else if (seqno == -2) {
-                LOG.debug("PacketResponder " + numTargets + " got -2");
-              } else {
-                LOG.debug("PacketResponder " + numTargets + " got seqno = " + 
-                    seqno);
+              } else if (seqno >= 0) {
                 synchronized (this) {
                   while (running && datanode.shouldRun && ackQueue.size() == 0) {
                     if (LOG.isDebugEnabled()) {
@@ -931,7 +927,6 @@
                   }
                   pkt = ackQueue.getFirst();
                   expected = pkt.seqno;
-                  LOG.debug("PacketResponder " + numTargets + " seqno = " + seqno);
                   if (seqno != expected) {
                     throw new IOException("PacketResponder " + numTargets +
                                           " for block " + block +
@@ -964,10 +959,6 @@
               continue;
             }
             
-            if (!didRead) {
-              op = ERROR;
-            }
-            
             // If this is the last packet in block, then close block
             // file and finalize the block before responding success
             if (lastPacketInBlock) {
@@ -990,54 +981,42 @@
               }
             }
 
-            // send my status back to upstream datanode
-            ackReply(expected);
-
-            LOG.debug("PacketResponder " + numTargets + 
-                      " for block " + block +
-                      " responded my status " +
-                      " for seqno " + expected);
-
-            boolean success = true;
-            // forward responses from downstream datanodes.
-            for (int i = 0; i < numTargets && datanode.shouldRun; i++) {
-              try {
-                if (op == SUCCESS) {
-                  op = Status.read(mirrorIn);
-                  if (op != SUCCESS) {
-                    success = false;
-                    LOG.debug("PacketResponder for block " + block +
-                              ": error code received from downstream " +
-                              " datanode[" + i + "] " + op);
-                  }
-                }
-              } catch (Throwable e) {
-                op = ERROR;
-                success = false;
+            // construct my ack message
+            Status[] replies = null;
+            if (!didRead) { // no ack was read from downstream
+              replies = new Status[2];
+              replies[0] = SUCCESS;
+              replies[1] = ERROR;
+            } else {
+              replies = new Status[1+ack.getNumOfReplies()];
+              replies[0] = SUCCESS;
+              for (int i=0; i<ack.getNumOfReplies(); i++) {
+                replies[i+1] = ack.getReply(i);
               }
-              op.write(replyOut);
             }
-            replyOut.flush();
+            PipelineAck replyAck = new PipelineAck(expected, replies);
             
-            LOG.debug("PacketResponder " + block + " " + numTargets + 
-                      " responded other status " + " for seqno " + expected);
-
+            // send my ack back to upstream datanode
+            replyAck.write(replyOut);
+            replyOut.flush();
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("PacketResponder " + numTargets + 
+                        " for block " + block +
+                        " responded an ack: " + replyAck);
+            }
             if (pkt != null) {
               // remove the packet from the ack queue
               removeAckHead();
               // update bytes acked
-              if (success && pkt.lastByteInBlock>replicaInfo.getBytesAcked()) {
+              if (replyAck.isSuccess() && 
+                  pkt.lastByteInBlock>replicaInfo.getBytesAcked()) {
                 replicaInfo.setBytesAcked(pkt.lastByteInBlock);
               }
             }
-            // If we were unable to read the seqno from downstream, then stop.
-            if (expected == -2) {
-              running = false;
-            }
             // If we forwarded an error response from a downstream datanode
             // and we are acting on behalf of a client, then we quit. The 
             // client will drive the recovery mechanism.
-            if (op == ERROR && receiver.clientName.length() > 0) {
+            if (!replyAck.isSuccess() && receiver.clientName.length() > 0) {
               running = false;
             }
         } catch (IOException e) {
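
The heart of the datanode-side change is the "construct my ack message"
block above: the responder prepends its own SUCCESS to whatever it read
from downstream, or reports [SUCCESS, ERROR] when no downstream ack
arrived. A standalone restatement of that logic (buildReplyAck and the
wrapper class are hypothetical names, not part of this commit):

    import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.PipelineAck;
    import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status;

    class AckPrepend {
      /** Hypothetical helper restating the ack-construction block above. */
      static PipelineAck buildReplyAck(long expected, PipelineAck downstream,
                                       boolean didRead) {
        Status[] replies;
        if (!didRead) {
          // No downstream ack: own SUCCESS plus a synthetic ERROR.
          replies = new Status[]{Status.SUCCESS, Status.ERROR};
        } else {
          replies = new Status[1 + downstream.getNumOfReplies()];
          replies[0] = Status.SUCCESS; // this node's own status goes first
          for (int i = 0; i < downstream.getNumOfReplies(); i++) {
            replies[i + 1] = downstream.getReply(i);
          }
        }
        return new PipelineAck(expected, replies);
      }
    }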

Modified: hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/server/datanode/BlockReceiverAspects.aj
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/server/datanode/BlockReceiverAspects.aj?rev=888507&r1=888506&r2=888507&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/server/datanode/BlockReceiverAspects.aj (original)
+++ hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/server/datanode/BlockReceiverAspects.aj Tue Dec  8 18:37:25 2009
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdfs.server.datanode;
 
 import java.io.DataInput;
+import java.io.DataOutput;
 import java.io.IOException;
 import java.io.OutputStream;
 
@@ -31,6 +32,7 @@
 import org.apache.hadoop.hdfs.PipelinesTestUtil.PipelinesTest;
 import org.apache.hadoop.hdfs.PipelinesTestUtil.NodeBytes;
 import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status;
+import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.PipelineAck;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
 
@@ -140,7 +142,7 @@
   }
   
   pointcut preventAckSending () :
-    call (void ackReply(long)) 
+    call (void PipelineAck.write(DataOutput)) 
     && within (PacketResponder);
 
   static int ackCounter = 0;
@@ -193,7 +195,7 @@
   }
 
   pointcut pipelineAck(BlockReceiver.PacketResponder packetresponder) :
-    call (Status Status.read(DataInput))
+    call (void PipelineAck.readFields(DataInput))
       && this(packetresponder);
 
   after(BlockReceiver.PacketResponder packetresponder) throws IOException

Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java?rev=888507&r1=888506&r2=888507&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java Tue Dec  8 18:37:25 2009
@@ -19,6 +19,8 @@
 
 import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Op.READ_BLOCK;
 import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Op.WRITE_BLOCK;
+import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.PipelineAck;
+import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status;
 import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.ERROR;
 import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.SUCCESS;
 
@@ -157,9 +159,8 @@
         
     //ok finally write a block with 0 len
     SUCCESS.write(recvOut);
-    Text.writeString(recvOut, ""); // first bad node
-    recvOut.writeLong(100);        // sequencenumber
-    SUCCESS.write(recvOut);
+    Text.writeString(recvOut, "");
+    new PipelineAck(100, new Status[]{SUCCESS}).write(recvOut);
     sendRecvData(description, false);
   }
   
@@ -381,9 +382,8 @@
     // bad data chunk length
     sendOut.writeInt(-1-random.nextInt(oneMil));
     SUCCESS.write(recvOut);
-    Text.writeString(recvOut, ""); // first bad node
-    recvOut.writeLong(100);        // sequencenumber
-    ERROR.write(recvOut);
+    Text.writeString(recvOut, "");
+    new PipelineAck(100, new Status[]{ERROR}).write(recvOut);
     sendRecvData("negative DATA_CHUNK len while writing block " + newBlockId, 
                  true);
 
@@ -406,9 +406,8 @@
     sendOut.flush();
     //ok finally write a block with 0 len
     SUCCESS.write(recvOut);
-    Text.writeString(recvOut, ""); // first bad node
-    recvOut.writeLong(100);        // sequencenumber
-    SUCCESS.write(recvOut);
+    Text.writeString(recvOut, "");
+    new PipelineAck(100, new Status[]{SUCCESS}).write(recvOut);
     sendRecvData("Writing a zero len block blockid " + newBlockId, false);
     
     /* Test OP_READ_BLOCK */