Posted to common-commits@hadoop.apache.org by dh...@apache.org on 2010/06/10 01:09:07 UTC

svn commit: r953183 - in /hadoop/common/branches/branch-0.20-append: CHANGES.txt src/hdfs/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java

Author: dhruba
Date: Wed Jun  9 23:09:07 2010
New Revision: 953183

URL: http://svn.apache.org/viewvc?rev=953183&view=rev
Log:
HDFS-101. DFSClient correctly detects second datanode failure in write
pipeline. (Nicolas Spiegelberg via dhruba)
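
In short: when a datanode in the write pipeline can no longer reach its mirror, it
now stays in the pipeline and reports the failure on the mirror's behalf, so the
client ejects the right node. A minimal sketch of that ack construction follows
(class name and constant values are assumed for illustration; this is not the
committed code):

    // Illustrative sketch only. Shows how a datanode that lost contact with
    // its mirror builds the ack it sends upstream: SUCCESS for itself and one
    // ERROR standing in for the downstream node, so the DFSClient removes the
    // mirror (the datanode that actually failed) from the pipeline.
    public class AckSketch {
      static final short OP_STATUS_SUCCESS = 0;   // values assumed for illustration
      static final short OP_STATUS_ERROR   = 1;

      // Build the per-datanode statuses this datanode forwards upstream.
      static short[] buildReplies(boolean mirrorError, short[] downstreamReplies) {
        if (mirrorError) {
          // Could not reach the mirror: report our own success plus one ERROR
          // covering the downstream pipeline.
          return new short[] { OP_STATUS_SUCCESS, OP_STATUS_ERROR };
        }
        // Normal case: prepend our own status to whatever the mirror reported.
        short[] replies = new short[1 + downstreamReplies.length];
        replies[0] = OP_STATUS_SUCCESS;
        System.arraycopy(downstreamReplies, 0, replies, 1, downstreamReplies.length);
        return replies;
      }

      public static void main(String[] args) {
        // First datanode cannot reach its mirror (the second datanode):
        short[] acks = buildReplies(true, new short[0]);
        System.out.println(java.util.Arrays.toString(acks)); // [0, 1]
      }
    }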


Modified:
    hadoop/common/branches/branch-0.20-append/CHANGES.txt
    hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java
    hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java

Modified: hadoop/common/branches/branch-0.20-append/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-append/CHANGES.txt?rev=953183&r1=953182&r2=953183&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-append/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.20-append/CHANGES.txt Wed Jun  9 23:09:07 2010
@@ -6,6 +6,9 @@ Release 0.20-append - Unreleased
 
     HDFS-200. Support append and sync for hadoop 0.20 branch.
 
+    HDFS-101. DFSClient correctly detects second datanode failure in write
+    pipeline. (Nicolas Spiegelberg via dhruba)
+
   IMPROVEMENTS
 
   BUG FIXES

Modified: hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java?rev=953183&r1=953182&r2=953183&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java (original)
+++ hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java Wed Jun  9 23:09:07 2010
@@ -92,6 +92,15 @@ public interface DataTransferProtocol {
     }
 
     /**
+     * Get the number of replies
+     * @return the number of replies
+     */
+    public short getNumOfReplies() {
+      return (short)replies.length;
+    }
+
+
+    /**
      * get the ith reply
      * @return the the ith reply
      */

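The new getNumOfReplies() accessor lets the upstream datanode size the ack it
forwards from the reply it actually received, rather than from the configured
number of targets. A rough stand-in for the relevant piece of PipelineAck
(method names follow the patch; everything else is assumed):

    // Minimal stand-in for the PipelineAck reply bookkeeping touched above.
    class PipelineAckSketch {
      private long seqno;
      private short[] replies;   // one status per datanode from the sender downstream

      PipelineAckSketch(long seqno, short[] replies) {
        this.seqno = seqno;
        this.replies = replies;
      }

      long getSeqno()         { return seqno; }

      /** Number of per-datanode statuses carried by this ack. */
      short getNumOfReplies() { return (short) replies.length; }

      /** Status reported for the i-th datanode. */
      short getReply(int i)   { return replies[i]; }

      /** True only if every datanode in the pipeline reported success. */
      boolean isSuccess() {
        for (short r : replies) {
          if (r != 0 /* OP_STATUS_SUCCESS, value assumed */) return false;
        }
        return true;
      }
    }
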
Modified: hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java?rev=953183&r1=953182&r2=953183&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java (original)
+++ hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java Wed Jun  9 23:09:07 2010
@@ -75,6 +75,7 @@ class BlockReceiver implements java.io.C
   DatanodeInfo srcDataNode = null;
   private Checksum partialCrc = null;
   private DataNode datanode = null;
+  volatile private boolean mirrorError;
 
   BlockReceiver(Block block, DataInputStream in, String inAddr,
                 String myAddr, boolean isRecovery, String clientName, 
@@ -173,21 +174,19 @@ class BlockReceiver implements java.io.C
 
   /**
    * While writing to mirrorOut, failure to write to mirror should not
-   * affect this datanode unless a client is writing the block.
+   * affect this datanode.
    */
   private void handleMirrorOutError(IOException ioe) throws IOException {
-    LOG.info(datanode.dnRegistration + ":Exception writing block " +
+    LOG.info(datanode.dnRegistration + ": Exception writing block " +
              block + " to mirror " + mirrorAddr + "\n" +
              StringUtils.stringifyException(ioe));
-    mirrorOut = null;
-    //
-    // If stream-copy fails, continue
-    // writing to disk for replication requests. For client
-    // writes, return error so that the client can do error
-    // recovery.
-    //
-    if (clientName.length() > 0) {
+    if (Thread.interrupted()) { // shut down if the thread is interrupted
       throw ioe;
+    } else { // encounter an error while writing to mirror
+      // continue to run even if can not write to mirror
+      // notify client of the error
+      // and wait for the client to shut down the pipeline
+      mirrorError = true;
     }
   }
   
@@ -396,8 +395,8 @@ class BlockReceiver implements java.io.C
     
     setBlockPosition(offsetInBlock);
     
-    //First write the packet to the mirror:
-    if (mirrorOut != null) {
+    // First write the packet to the mirror:
+    if (mirrorOut != null && !mirrorError) {
       try {
         mirrorOut.write(buf.array(), buf.position(), buf.remaining());
         mirrorOut.flush();
@@ -515,7 +514,8 @@ class BlockReceiver implements java.io.C
       if (clientName.length() > 0) {
         responder = new Daemon(datanode.threadGroup, 
                                new PacketResponder(this, block, mirrIn, 
-                                                   replyOut, numTargets));
+                                                   replyOut, numTargets,
+                                                   Thread.currentThread()));
         responder.start(); // start thread to processes reponses
       }
 
@@ -700,18 +700,21 @@ class BlockReceiver implements java.io.C
     DataOutputStream replyOut;  // output to upstream datanode
     private int numTargets;     // number of downstream datanodes including myself
     private BlockReceiver receiver; // The owner of this responder.
+    private Thread receiverThread; // the thread that spawns this responder
 
     public String toString() {
       return "PacketResponder " + numTargets + " for Block " + this.block;
     }
 
     PacketResponder(BlockReceiver receiver, Block b, DataInputStream in, 
-                    DataOutputStream out, int numTargets) {
+                    DataOutputStream out, int numTargets,
+                    Thread receiverThread) {
       this.receiver = receiver;
       this.block = b;
       mirrorIn = in;
       replyOut = out;
       this.numTargets = numTargets;
+      this.receiverThread = receiverThread;
     }
 
     /**
@@ -848,11 +851,10 @@ class BlockReceiver implements java.io.C
       }
 
       boolean lastPacketInBlock = false;
+      boolean isInterrupted = false;
       while (running && datanode.shouldRun && !lastPacketInBlock) {
 
         try {
-            boolean didRead = false;
-
             /**
              * Sequence number -2 is a special value that is used when
              * a DN fails to read an ack from a downstream. In this case,
@@ -861,31 +863,20 @@ class BlockReceiver implements java.io.C
              * as an UNKNOWN value.
              */
             long expected = -2;
+            long seqno = -2;
 
             PipelineAck ack = new PipelineAck();
             try { 
-              // read an ack from downstream datanode
-              ack.readFields(mirrorIn, numTargets);
-              if (LOG.isDebugEnabled()) {
-                LOG.debug("PacketResponder " + numTargets + " got " + ack);
-              }
-              long seqno = ack.getSeqno();
-              didRead = true;
-              if (seqno == PipelineAck.HEART_BEAT.getSeqno()) {
-                ack.write(replyOut); // send keepalive
-                replyOut.flush();
-                continue;
-              } else if (seqno == -2) {
-                // A downstream node must have failed to read an ack. We need
-                // to forward this on.
-                assert ! ack.isSuccess();
-              } else {
-                if (seqno < 0) {
-                  throw new IOException("Received an invalid negative sequence number. "
-                                        + "Ack = " + ack);
+              if (!mirrorError) {
+                // read an ack from downstream datanode
+                ack.readFields(mirrorIn, numTargets);
+                if (LOG.isDebugEnabled()) {
+                  LOG.debug("PacketResponder " + numTargets + 
+                      " for block " + block + " got " + ack);
                 }
-                assert seqno >= 0;
-
+                seqno = ack.getSeqno();
+              }
+              if (seqno >= 0 || mirrorError) {
                 Packet pkt = null;
                 synchronized (this) {
                   while (running && datanode.shouldRun && ackQueue.size() == 0) {
@@ -897,10 +888,13 @@ class BlockReceiver implements java.io.C
                     }
                     wait();
                   }
+                  if (!running || !datanode.shouldRun) {
+                    break;
+                  }
                   pkt = ackQueue.removeFirst();
                   expected = pkt.seqno;
                   notifyAll();
-                  if (seqno != expected) {
+                  if (seqno != expected && !mirrorError) {
                     throw new IOException("PacketResponder " + numTargets +
                                           " for block " + block +
                                           " expected seqno:" + expected +
@@ -909,27 +903,32 @@ class BlockReceiver implements java.io.C
                   lastPacketInBlock = pkt.lastPacketInBlock;
                 }
               }
-            } catch (Throwable e) {
-              if (running) {
-                LOG.info("PacketResponder " + block + " " + numTargets + 
-                         " Exception " + StringUtils.stringifyException(e));
-                running = false;
+            } catch (InterruptedException ine) {
+              isInterrupted = true;
+            } catch (IOException ioe) {
+              if (Thread.interrupted()) {
+                isInterrupted = true;
+              } else {
+                // continue to run even if can not read from mirror
+                // notify client of the error
+                // and wait for the client to shut down the pipeline
+                mirrorError = true;
+                LOG.info("PacketResponder " + block + " " + numTargets +
+                    " Exception " + StringUtils.stringifyException(ioe));
               }
             }
 
-            if (Thread.interrupted()) {
+            if (Thread.interrupted() || isInterrupted) {
               /* The receiver thread cancelled this thread. 
                * We could also check any other status updates from the 
                * receiver thread (e.g. if it is ok to write to replyOut). 
                * It is prudent to not send any more status back to the client
                * because this datanode has a problem. The upstream datanode
-               * will detect a timout on heartbeats and will declare that
-               * this datanode is bad, and rightly so.
+               * will detect that this datanode is bad, and rightly so.
                */
               LOG.info("PacketResponder " + block +  " " + numTargets +
                        " : Thread is interrupted.");
-              running = false;
-              continue;
+              break;
             }
             
             // If this is the last packet in block, then close block
@@ -954,23 +953,25 @@ class BlockReceiver implements java.io.C
               }
             }
 
-            // construct my ack message.
-            short[] replies = new short[1 + numTargets];
-            if (!didRead) { // no ack is read
-              replies[0] = DataTransferProtocol.OP_STATUS_SUCCESS;
-              // Fill all downstream nodes with ERROR - the client will
-              // eject the first node with ERROR status (our mirror)
-              for (int i = 1; i < replies.length; i++) {
-                replies[i] = DataTransferProtocol.OP_STATUS_ERROR;
-              }
+            PipelineAck replyAck;
+            if (seqno == PipelineAck.HEART_BEAT.getSeqno()) {
+              replyAck = ack;  // continue to send keep alive
             } else {
-              replies = new short[1+numTargets];
-              replies[0] = DataTransferProtocol.OP_STATUS_SUCCESS;
-              for (int i=0; i<numTargets; i++) {
-                replies[i+1] = ack.getReply(i);
+              // construct my ack message
+              short[] replies = null;
+              if (mirrorError) { // no ack is read
+                replies = new short[2];
+                replies[0] = DataTransferProtocol.OP_STATUS_SUCCESS;
+                replies[1] = DataTransferProtocol.OP_STATUS_ERROR;
+              } else {
+                replies = new short[1+ack.getNumOfReplies()];
+                replies[0] = DataTransferProtocol.OP_STATUS_SUCCESS;
+                for (int i=0; i<ack.getNumOfReplies(); i++) {
+                  replies[i+1] = ack.getReply(i);
+                }
               }
+              replyAck = new PipelineAck(expected, replies);
             }
-            PipelineAck replyAck = new PipelineAck(expected, replies);
  
             // send my ack back to upstream datanode
             replyAck.write(replyOut);
@@ -980,24 +981,15 @@ class BlockReceiver implements java.io.C
                         " for block " + block +
                         " responded an ack: " + replyAck);
             }
-
-            // If we forwarded an error response from a downstream datanode
-            // and we are acting on behalf of a client, then we quit. The 
-            // client will drive the recovery mechanism.
-            if (!replyAck.isSuccess() && receiver.clientName.length() > 0) {
-              running = false;
-            }
-        } catch (IOException e) {
+        } catch (Throwable e) {
+          LOG.warn("IOException in BlockReceiver.run(): ", e);
           if (running) {
             LOG.info("PacketResponder " + block + " " + numTargets + 
                      " Exception " + StringUtils.stringifyException(e));
             running = false;
           }
-        } catch (RuntimeException e) {
-          if (running) {
-            LOG.info("PacketResponder " + block + " " + numTargets + 
-                     " Exception " + StringUtils.stringifyException(e));
-            running = false;
+          if (!Thread.interrupted()) { // error not caused by interruption
+            receiverThread.interrupt();
           }
         }
       }
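
For completeness, the client-side view this change enables (a hedged
illustration, not the DFSClient source): the first non-SUCCESS entry in the
forwarded statuses identifies the datanode the writer should drop before
rebuilding the pipeline.

    // Hedged illustration of the client-side interpretation; names and the
    // SUCCESS value are assumed for this sketch.
    class FailedNodeSketch {
      static final short OP_STATUS_SUCCESS = 0;   // value assumed for illustration

      static int firstFailedDatanode(short[] replies) {
        for (int i = 0; i < replies.length; i++) {
          if (replies[i] != OP_STATUS_SUCCESS) {
            return i;                             // e.g. 1 == second datanode in the pipeline
          }
        }
        return -1;                                // every node acknowledged successfully
      }

      public static void main(String[] args) {
        // Ack forwarded by the first datanode after a mirror error: [SUCCESS, ERROR]
        System.out.println(firstFailedDatanode(new short[] { 0, 1 }));  // prints 1
      }
    }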