You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ra...@apache.org on 2008/03/21 18:53:10 UTC

svn commit: r639734 - in /hadoop/core/trunk: CHANGES.txt src/java/org/apache/hadoop/dfs/DataNode.java

Author: rangadi
Date: Fri Mar 21 10:53:01 2008
New Revision: 639734

URL: http://svn.apache.org/viewvc?rev=639734&view=rev
Log:
HADOOP-3007. Tolerate mirror failures while DataNode is replicating blocks as it used to before. (rangadi)

Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataNode.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=639734&r1=639733&r2=639734&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Fri Mar 21 10:53:01 2008
@@ -355,6 +355,8 @@
     getCurrentOutputPath and getFinalOutputPath.
     (Amareshwari Sriramadasu via ddas)
 
+    HADOOP-3007. Tolerate mirror failures while DataNode is replicating
+    blocks as it used to before. (rangadi)
 
 Release 0.16.1 - 2008-03-13
 

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataNode.java?rev=639734&r1=639733&r2=639734&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataNode.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataNode.java Fri Mar 21 10:53:01 2008
@@ -1167,7 +1167,14 @@
             mirrorIn = null;
             IOUtils.closeSocket(mirrorSock);
             mirrorSock = null;
-            throw e;
+            if (client.length() > 0) {
+              throw e;
+            } else {
+              LOG.info(dnRegistration + ":Exception transfering block " +
+                       block + " to mirror " + mirrorNode +
+                       ". continuing without the mirror.\n" +
+                       StringUtils.stringifyException(e));
+            }
           }
         }
 
@@ -2220,6 +2227,26 @@
       }
     }
 
+    /**
+     * While writing to mirrorOut, failure to write to mirror should not
+     * affect this datanode unless a client is writing the block.
+     */
+    private void handleMirrorOutError(IOException ioe) throws IOException {
+      LOG.info(dnRegistration + ":Exception writing block " +
+               block + " to mirror " + mirrorAddr + "\n" +
+               StringUtils.stringifyException(ioe));
+      mirrorOut = null;
+      //
+      // If stream-copy fails, continue
+      // writing to disk for replication requests. For client
+      // writes, return error so that the client can do error
+      // recovery.
+      //
+      if (clientName.length() > 0) {
+        throw ioe;
+      }
+    }
+    
     /* receive a chunk: write it to disk & mirror it to another stream */
     private void receiveChunk( int len, byte[] checksumBuf, int checksumOff ) 
                               throws IOException {
@@ -2253,19 +2280,7 @@
           mirrorOut.write(checksumBuf, checksumOff, checksumSize);
           mirrorOut.write(buf, 0, len);
         } catch (IOException ioe) {
-          LOG.info(dnRegistration + ":Exception writing block " +
-                   block + " to mirror " + mirrorAddr + "\n" +
-                   StringUtils.stringifyException(ioe));
-          mirrorOut = null;
-          //
-          // If stream-copy fails, continue
-          // writing to disk for replication requests. For client
-          // writes, return error so that the client can do error
-          // recovery.
-          //
-          if (clientName.length() > 0) {
-            throw ioe;
-          }
+          handleMirrorOutError(ioe);
         }
       }
 
@@ -2339,26 +2354,19 @@
           mirrorOut.writeLong(seqno);
           mirrorOut.writeBoolean(lastPacketInBlock);
         } catch (IOException e) {
-          LOG.info("Exception writing to mirror " + mirrorAddr + "\n"
-              + StringUtils.stringifyException(e));
-          mirrorOut = null;
-
-          // If stream-copy fails, continue
-          // writing to disk for replication requests. For client
-          // writes, return error so that the client can do error
-          // recovery.
-          //
-          if (clientName.length() > 0) {
-            throw e;
-          }
+          handleMirrorOutError(e);
         }
       }
 
       if (len == 0) {
         LOG.info("Receiving empty packet for block " + block);
         if (mirrorOut != null) {
-          mirrorOut.writeInt(len);
-          mirrorOut.flush();
+          try {
+            mirrorOut.writeInt(len);
+            mirrorOut.flush();
+          } catch (IOException e) {
+            handleMirrorOutError(e);
+          }
         }
       }
 
@@ -2397,7 +2405,11 @@
         
         if (curPacketSize == packetSize) {
           if (mirrorOut != null) {
-            mirrorOut.flush();
+            try {
+              mirrorOut.flush();
+            } catch (IOException e) {
+              handleMirrorOutError(e);
+            }
           }
           break;
         }
@@ -2421,15 +2433,15 @@
    
 
     public void receiveBlock(
-        DataOutputStream mirrorOut, // output to next datanode
-        DataInputStream mirrorIn,   // input from next datanode
+        DataOutputStream mirrOut, // output to next datanode
+        DataInputStream mirrIn,   // input from next datanode
         DataOutputStream replyOut,  // output to previous datanode
-        String mirrorAddr, Throttler throttler,
+        String mirrAddr, Throttler throttlerArg,
         int numTargets) throws IOException {
 
-        this.mirrorOut = mirrorOut;
-        this.mirrorAddr = mirrorAddr;
-        this.throttler = throttler;
+        mirrorOut = mirrOut;
+        mirrorAddr = mirrAddr;
+        throttler = throttlerArg;
 
       try {
         // write data chunk header
@@ -2439,7 +2451,7 @@
         }
         if (clientName.length() > 0) {
           responder = new Daemon(threadGroup, 
-                                 new PacketResponder(this, block, mirrorIn, 
+                                 new PacketResponder(this, block, mirrIn, 
                                                      replyOut, numTargets,
                                                      clientName));
           responder.start(); // start thread to processes reponses
@@ -2456,8 +2468,12 @@
 
         // flush the mirror out
         if (mirrorOut != null) {
-          mirrorOut.writeInt(0); // mark the end of the block
-          mirrorOut.flush();
+          try {
+            mirrorOut.writeInt(0); // mark the end of the block
+            mirrorOut.flush();
+          } catch (IOException e) {
+            handleMirrorOutError(e);
+          }
         }
 
         // wait for all outstanding packet responses. And then