
svn commit: r694755 - in /hadoop/core/trunk: CHANGES.txt src/hdfs/org/apache/hadoop/hdfs/DFSClient.java src/test/org/apache/hadoop/hdfs/TestDFSClientRetries.java

Author: rangadi
Date: Fri Sep 12 10:56:15 2008
New Revision: 694755

URL: http://svn.apache.org/viewvc?rev=694755&view=rev
Log:
HADOOP-3831. Very slow reading clients sometimes failed while reading. (rangadi)
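
The fix also makes the client's block-acquire retry limit configurable via the
new key dfs.client.max.block.acquire.failures (see the DFSClient.java hunk
below). Here is a minimal sketch, not part of this commit, of how an
application that reads very slowly might raise that limit; the class name and
the path /some/large/file are illustrative only:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class SlowReaderExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Allow more failed attempts to locate a datanode for a block before the
    // client gives up; the default is DFSClient.MAX_BLOCK_ACQUIRE_FAILURES.
    conf.setInt("dfs.client.max.block.acquire.failures", 5);
    FileSystem fs = FileSystem.get(conf);
    FSDataInputStream in = fs.open(new Path("/some/large/file"));
    // ... read at the application's own (possibly very slow) pace ...
    in.close();
    fs.close();
  }
}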

Added:
    hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/TestDFSClientRetries.java
Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=694755&r1=694754&r2=694755&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Fri Sep 12 10:56:15 2008
@@ -537,6 +537,9 @@
     HADOOP-4112. Handles cleanupTask in JobHistory 
     (Amareshwari Sriramadasu via ddas)
 
+    HADOOP-3831. Very slow reading clients sometimes failed while reading.
+    (rangadi)
+
 Release 0.18.1 - Unreleased
 
   IMPROVEMENTS

Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java?rev=694755&r1=694754&r2=694755&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java Fri Sep 12 10:56:15 2008
@@ -80,6 +80,7 @@
   private int datanodeWriteTimeout;
   final int writePacketSize;
   private FileSystem.Statistics stats;
+  private int maxBlockAcquireFailures;
     
  
   public static ClientProtocol createNamenode(Configuration conf) throws IOException {
@@ -162,6 +163,9 @@
     this.socketFactory = NetUtils.getSocketFactory(conf, ClientProtocol.class);
     // dfs.write.packet.size is an internal config variable
     this.writePacketSize = conf.getInt("dfs.write.packet.size", 64*1024);
+    this.maxBlockAcquireFailures = 
+                          conf.getInt("dfs.client.max.block.acquire.failures",
+                                      MAX_BLOCK_ACQUIRE_FAILURES);
     
     try {
       this.ugi = UnixUserGroupInformation.login(conf, true);
@@ -1473,6 +1477,14 @@
     private synchronized int readBuffer(byte buf[], int off, int len) 
                                                     throws IOException {
       IOException ioe;
+      
+      /* We retry the current node only once, so this is set to true only
+       * here. The intent is to handle one common case of an error that is
+       * not a failure of the datanode or client: the DataNode closing the
+       * connection because the client has been idle. If there are other such
+       * "non-errors", a datanode might be retried by setting this to true again.
+       */
+      boolean retryCurrentNode = true;
  
       while (true) {
         // retry as many times as seekToNewSource allows.
@@ -1483,16 +1495,30 @@
                    currentNode.getName() + " at " + ce.getPos());          
           reportChecksumFailure(src, currentBlock, currentNode);
           ioe = ce;
+          retryCurrentNode = false;
         } catch ( IOException e ) {
-          LOG.warn("Exception while reading from " + currentBlock +
-                   " of " + src + " from " + currentNode + ": " +
-                   StringUtils.stringifyException(e));
+          if (!retryCurrentNode) {
+            LOG.warn("Exception while reading from " + currentBlock +
+                     " of " + src + " from " + currentNode + ": " +
+                     StringUtils.stringifyException(e));
+          }
           ioe = e;
         }
-        addToDeadNodes(currentNode);
-        if (!seekToNewSource(pos)) {
-            throw ioe;
+        boolean sourceFound = false;
+        if (retryCurrentNode) {
+          /* possibly retry the same node so that transient errors don't
+           * result in application-level failures (e.g. the DataNode could have
+           * closed the connection because the client was idle for too long).
+           */
+          sourceFound = seekToBlockSource(pos);
+        } else {
+          addToDeadNodes(currentNode);
+          sourceFound = seekToNewSource(pos);
+        }
+        if (!sourceFound) {
+          throw ioe;
         }
+        retryCurrentNode = false;
       }
     }
 
@@ -1554,7 +1580,7 @@
           return new DNAddrPair(chosenNode, targetAddr);
         } catch (IOException ie) {
           String blockInfo = block.getBlock() + " file=" + src;
-          if (failures >= MAX_BLOCK_ACQUIRE_FAILURES) {
+          if (failures >= maxBlockAcquireFailures) {
             throw new IOException("Could not obtain block: " + blockInfo);
           }
           
@@ -1727,6 +1753,16 @@
     }
 
     /**
+     * Same as {@link #seekToNewSource(long)} except that it does not exclude
+     * the current datanode and might connect to the same node.
+     */
+    private synchronized boolean seekToBlockSource(long targetPos)
+                                                   throws IOException {
+      currentNode = blockSeekTo(targetPos);
+      return true;
+    }
+    
+    /**
      * Seek to given position on a node other than the current node.  If
      * a node other than the current node is found, then returns true. 
      * If another node could not be found, then returns false.
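
To summarize the readBuffer() change above: the current datanode now gets
exactly one retry before it is marked dead, so a connection that the datanode
closed merely because the client was idle no longer surfaces as an
application-level read failure. The following self-contained sketch is
illustrative only; readOnce(), reconnect() and switchSource() are hypothetical
stand-ins for the read attempt, seekToBlockSource() and seekToNewSource(), and
the ChecksumException branch (which always switches nodes and reports the bad
replica) is omitted:

import java.io.IOException;

/** Sketch of the retry policy, not the actual DFSClient code. */
abstract class RetryOnceSketch {
  abstract int readOnce() throws IOException;          // one read from the current source
  abstract boolean reconnect() throws IOException;     // reconnect to the same source
  abstract boolean switchSource() throws IOException;  // move to a different source

  int readWithRetries() throws IOException {
    boolean retryCurrentNode = true;  // the current source gets exactly one retry
    while (true) {
      IOException ioe;
      try {
        return readOnce();
      } catch (IOException e) {
        ioe = e;                      // remember the error in case we run out of sources
      }
      boolean sourceFound = retryCurrentNode ? reconnect() : switchSource();
      if (!sourceFound) {
        throw ioe;                    // nothing left to try: surface the last error
      }
      retryCurrentNode = false;       // any further failure moves to a new source
    }
  }
}

In the real code, the switch path also adds the failed node to deadNodes
before calling seekToNewSource().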

Added: hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/TestDFSClientRetries.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/TestDFSClientRetries.java?rev=694755&view=auto
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/TestDFSClientRetries.java (added)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/TestDFSClientRetries.java Fri Sep 12 10:56:15 2008
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+
+import junit.framework.TestCase;
+
+
+/**
+ * These tests make sure that DFSClient retries fetching data from DFS
+ * properly in case of errors.
+ */
+public class TestDFSClientRetries extends TestCase {
+  
+  // writes 'len' bytes of data to out.
+  private static void writeData(OutputStream out, int len) throws IOException {
+    byte [] buf = new byte[4096*16];
+    while(len > 0) {
+      int toWrite = Math.min(len, buf.length);
+      out.write(buf, 0, toWrite);
+      len -= toWrite;
+    }
+  }
+  
+  /**
+   * This makes sure that when the DN closes the client's socket after the
+   * client had successfully connected earlier, the data can still be fetched.
+   */
+  public void testWriteTimeoutAtDataNode() throws IOException,
+                                                  InterruptedException { 
+    Configuration conf = new Configuration();
+    
+    final int writeTimeout = 100; //milliseconds.
+    // set a very short write timeout for the datanode, so that the test runs fast.
+    conf.setInt("dfs.datanode.socket.write.timeout", writeTimeout); 
+    // set a smaller block size
+    final int blockSize = 10*1024*1024;
+    conf.setInt("dfs.block.size", blockSize);
+    conf.setInt("dfs.client.max.block.acquire.failures", 1);
+    // set a small buffer size
+    final int bufferSize = 4096;
+    conf.setInt("io.file.buffer.size", bufferSize);
+
+    MiniDFSCluster cluster = new MiniDFSCluster(conf, 3, true, null);
+    
+    try {
+      cluster.waitActive();
+      FileSystem fs = cluster.getFileSystem();
+    
+      Path filePath = new Path("/testWriteTimeoutAtDataNode");
+      OutputStream out = fs.create(filePath, true, bufferSize);
+    
+      // write a 2 block file.
+      writeData(out, 2*blockSize);
+      out.close();
+      
+      byte[] buf = new byte[1024*1024]; // enough to empty TCP buffers.
+      
+      InputStream in = fs.open(filePath, bufferSize);
+      
+      //first read a few bytes
+      IOUtils.readFully(in, buf, 0, bufferSize/2);
+      // now read a few more chunks of data, sleeping in between:
+      for(int i=0; i<10; i++) {
+        Thread.sleep(2*writeTimeout); // force write timeout at the datanode.
+        // read enough to empty out socket buffers.
+        IOUtils.readFully(in, buf, 0, buf.length); 
+      }
+      // successfully read with write timeout on datanodes.
+      in.close();
+    } finally {
+      cluster.shutdown();
+    }
+  }
+  
+  // more tests related to different failure cases can be added here.
+}
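
As the closing comment notes, other failure cases can be added to this class.
One possible follow-up, sketched below and not part of this commit, would check
that the client still gives up once dfs.client.max.block.acquire.failures is
exhausted; the test name is hypothetical and it assumes
MiniDFSCluster#stopDataNode(int):

  /**
   * Sketch only: reads should fail with "Could not obtain block" once the
   * configured number of block-acquire failures has been reached.
   */
  public void testReadFailsWhenAllReplicasAreDown() throws IOException {
    Configuration conf = new Configuration();
    conf.setInt("dfs.client.max.block.acquire.failures", 1);
    conf.setInt("dfs.replication", 1);
    MiniDFSCluster cluster = new MiniDFSCluster(conf, 1, true, null);
    try {
      cluster.waitActive();
      FileSystem fs = cluster.getFileSystem();
      Path filePath = new Path("/testReadFailsWhenAllReplicasAreDown");
      OutputStream out = fs.create(filePath, true, 4096);
      writeData(out, 1024);
      out.close();

      // take down the only datanode, so every block-acquire attempt fails.
      cluster.stopDataNode(0);

      InputStream in = fs.open(filePath, 4096);
      try {
        IOUtils.readFully(in, new byte[1024], 0, 1024);
        fail("read should not succeed with no live replicas");
      } catch (IOException expected) {
        // expected: the client gives up after maxBlockAcquireFailures attempts.
      } finally {
        in.close();
      }
    } finally {
      cluster.shutdown();
    }
  }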