You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ra...@apache.org on 2008/05/19 20:35:27 UTC

svn commit: r657903 - in /hadoop/core/trunk: CHANGES.txt src/java/org/apache/hadoop/dfs/DFSClient.java src/java/org/apache/hadoop/dfs/DataNode.java

Author: rangadi
Date: Mon May 19 11:35:27 2008
New Revision: 657903

URL: http://svn.apache.org/viewvc?rev=657903&view=rev
Log:
HADOOP-3339. Some of the failures on 3rd datanode in DFS write pipeline
are not detected properly. This could lead to hard failure of client's
write operation. (rangadi)

Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
    hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataNode.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=657903&r1=657902&r2=657903&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Mon May 19 11:35:27 2008
@@ -278,6 +278,10 @@
     HADOOP-3396. TestDatanodeBlockScanner occationally fails. 
     (Lohit Vijayarenu via rangadi)
 
+    HADOOP-3339. Some of the failures on 3rd datanode in DFS write pipeline 
+    are not detected properly. This could lead to hard failure of client's
+    write operation. (rangadi)
+
 Release 0.17.0 - 2008-05-18
 
   INCOMPATIBLE CHANGES

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java?rev=657903&r1=657902&r2=657903&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java Mon May 19 11:35:27 2008
@@ -72,6 +72,7 @@
   private short defaultReplication;
   private SocketFactory socketFactory;
   private int socketTimeout;
+  private int datanodeWriteTimeout;
   final int writePacketSize;
   private FileSystem.Statistics stats;
     
@@ -145,6 +146,8 @@
     this.stats = stats;
     this.socketTimeout = conf.getInt("dfs.socket.timeout", 
                                      FSConstants.READ_TIMEOUT);
+    this.datanodeWriteTimeout = conf.getInt("dfs.datanode.socket.write.timeout",
+                                            FSConstants.WRITE_TIMEOUT);
     this.socketFactory = NetUtils.getSocketFactory(conf, ClientProtocol.class);
     // dfs.write.packet.size is an internal config variable
     this.writePacketSize = conf.getInt("dfs.write.packet.size", 64*1024);
@@ -2279,7 +2282,7 @@
         s.setSendBufferSize(DEFAULT_DATA_SOCKET_SIZE);
         LOG.debug("Send buf size " + s.getSendBufferSize());
         long writeTimeout = WRITE_TIMEOUT_EXTENSION * nodes.length +
-                            WRITE_TIMEOUT;
+                            datanodeWriteTimeout;
 
         //
         // Xmit header info to datanode

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataNode.java?rev=657903&r1=657902&r2=657903&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataNode.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataNode.java Mon May 19 11:35:27 2008
@@ -2137,12 +2137,23 @@
                 LOG.info("PacketResponder " + block + " " + numTargets + 
                          " Exception " + StringUtils.stringifyException(e));
                 running = false;
-                if (!didRead) {
-                  op = OP_STATUS_ERROR;
-                }
               }
             }
 
+            if (Thread.interrupted()) {
+              /* The receiver thread cancelled this thread. 
+               * We could also check any other status updates from the 
+               * receiver thread (e.g. if it is ok to write to replyOut). 
+               */
+              LOG.info("PacketResponder " + block +  " " + numTargets +
+                       " : Thread is interrupted.");
+              running = false;
+            }
+            
+            if (!didRead) {
+              op = OP_STATUS_ERROR;
+            }
+            
             // If this is the last packet in block, then close block
             // file and finalize the block before responding success
             if (lastPacketInBlock && !receiver.finalized) {