Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 04:51:10 UTC

svn commit: r1077199 - in /hadoop/common/branches/branch-0.20-security-patches: ./ .eclipse.templates/ src/hdfs/org/apache/hadoop/hdfs/ src/test/org/apache/hadoop/hdfs/

Author: omalley
Date: Fri Mar  4 03:51:09 2011
New Revision: 1077199

URL: http://svn.apache.org/viewvc?rev=1077199&view=rev
Log:
commit 8a97b0fdad2a9d310179b3fd94d1d82b246fa98b
Author: Suresh Srinivas <su...@yahoo-inc.com>
Date:   Mon Feb 22 17:23:05 2010 -0800

    Promoting commit a469ff2eff573cb109c15f6a71753a3370c1ebf0 HDFS-927 DFSInputStream retries too many times for new block locations from origin/tags/release-0.20.2-rc4 into yahoo-hadoop-0.20.1xx.
    
    git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.20@908204 13f79535-47bb-0310-9956-ffa450edef68

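The substance of the change: DFSInputStream's failure count is now scoped
to a single user-facing operation. The counter is reset at the top of each
read, and within one operation the client gives up once the count exceeds
maxBlockAcquireFailures instead of retrying indefinitely against an
entirely corrupt or unavailable file. A minimal sketch of the pattern,
using hypothetical names rather than the actual DFSClient code:

    import java.io.IOException;

    // Sketch only (hypothetical class, not the actual DFSClient code):
    // failures are counted per user-facing operation and capped.
    abstract class PerOperationRetry {
      private final int maxFailures; // plays the role of maxBlockAcquireFailures
      private int failures;          // plays the role of DFSInputStream.failures

      PerOperationRetry(int maxFailures) { this.maxFailures = maxFailures; }

      // One attempt; in DFSClient a failed attempt may go back to the
      // namenode for a fresh list of block locations.
      protected abstract int attempt(byte[] buf) throws IOException;

      int read(byte[] buf) throws IOException {
        failures = 0; // the reset the patch adds at the top of each read path
        while (true) {
          try {
            return attempt(buf);
          } catch (IOException ioe) {
            if (++failures > maxFailures) {
              throw ioe; // cap exceeded within this single operation
            }
          }
        }
      }
    }
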
Modified:
    hadoop/common/branches/branch-0.20-security-patches/.eclipse.templates/.classpath
    hadoop/common/branches/branch-0.20-security-patches/CHANGES.txt
    hadoop/common/branches/branch-0.20-security-patches/ivy.xml
    hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/hdfs/TestCrcCorruption.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/hdfs/TestDFSClientRetries.java

Modified: hadoop/common/branches/branch-0.20-security-patches/.eclipse.templates/.classpath
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/.eclipse.templates/.classpath?rev=1077199&r1=1077198&r2=1077199&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/.eclipse.templates/.classpath (original)
+++ hadoop/common/branches/branch-0.20-security-patches/.eclipse.templates/.classpath Fri Mar  4 03:51:09 2011
@@ -29,6 +29,7 @@
 	<classpathentry kind="lib" path="build/ivy/lib/Hadoop/common/jets3t-0.6.1.jar"/>
 	<classpathentry kind="lib" path="build/ivy/lib/Hadoop/common/junit-3.8.1.jar"/>
 	<classpathentry kind="lib" path="build/ivy/lib/Hadoop/common/log4j-1.2.15.jar"/>
+	<classpathentry kind="lib" path="build/ivy/lib/Hadoop/common/mockito-all-1.8.0.jar"/>
 	<classpathentry kind="lib" path="build/ivy/lib/Hadoop/common/oro-2.0.8.jar"/>
   	<classpathentry kind="lib" path="build/ivy/lib/Hadoop/common/jetty-6.1.14.jar"/>
   	<classpathentry kind="lib" path="build/ivy/lib/Hadoop/common/jetty-util-6.1.14.jar"/>

Modified: hadoop/common/branches/branch-0.20-security-patches/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/CHANGES.txt?rev=1077199&r1=1077198&r2=1077199&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.20-security-patches/CHANGES.txt Fri Mar  4 03:51:09 2011
@@ -10,6 +10,9 @@ Release 0.20.2 - Unreleased
     HADOOP-6498. IPC client bug may cause rpc call hang. (Ruyue Ma and hairong
     via hairong)
 
+    HDFS-927. DFSInputStream retries too many times for new block locations
+    (Todd Lipcon via Stack)
+
 Release 0.20.1 - 2009-09-01
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/branches/branch-0.20-security-patches/ivy.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/ivy.xml?rev=1077199&r1=1077198&r2=1077199&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/ivy.xml (original)
+++ hadoop/common/branches/branch-0.20-security-patches/ivy.xml Fri Mar  4 03:51:09 2011
@@ -275,6 +275,5 @@
       rev="${aspectj.version}"
       conf="common->default">
     </dependency>
-    </dependencies>
-  
+  </dependencies>
 </ivy-module>

Modified: hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java?rev=1077199&r1=1077198&r2=1077199&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java Fri Mar  4 03:51:09 2011
@@ -186,9 +186,7 @@ public class DFSClient implements FSCons
     this.socketFactory = NetUtils.getSocketFactory(conf, ClientProtocol.class);
     // dfs.write.packet.size is an internal config variable
     this.writePacketSize = conf.getInt("dfs.write.packet.size", 64*1024);
-    this.maxBlockAcquireFailures = 
-                          conf.getInt("dfs.client.max.block.acquire.failures",
-                                      MAX_BLOCK_ACQUIRE_FAILURES);
+    this.maxBlockAcquireFailures = getMaxBlockAcquireFailures(conf);
     
     ugi = UserGroupInformation.getCurrentUser();
 
@@ -214,6 +212,11 @@ public class DFSClient implements FSCons
     }
   }
 
+  static int getMaxBlockAcquireFailures(Configuration conf) {
+    return conf.getInt("dfs.client.max.block.acquire.failures",
+                       MAX_BLOCK_ACQUIRE_FAILURES);
+  }
+
   private void checkOpen() throws IOException {
     if (!clientRunning) {
       IOException result = new IOException("Filesystem closed");
@@ -1500,6 +1503,18 @@ public class DFSClient implements FSCons
     private Block currentBlock = null;
     private long pos = 0;
     private long blockEnd = -1;
+
+    /**
+     * This variable tracks the number of failures since the start of the
+     * most recent user-facing operation. That is to say, it should be reset
+     * whenever the user makes a call on this stream, and if at any point
+     * during the retry logic the failure count exceeds a threshold,
+     * the errors will be thrown back to the operation.
+     *
+     * Specifically, this counts the number of times the client has gone
+     * back to the namenode to get a new list of block locations, and is
+     * capped at maxBlockAcquireFailures.
+     */
     private int failures = 0;
 
    /* XXX Use of ConcurrentHashMap is temp fix. Need to fix 
@@ -1831,6 +1846,8 @@ public class DFSClient implements FSCons
       if (closed) {
         throw new IOException("Stream closed");
       }
+      failures = 0;
+
       if (pos < getFileLength()) {
         int retries = 2;
         while (retries > 0) {
@@ -1986,6 +2003,7 @@ public class DFSClient implements FSCons
       if (closed) {
         throw new IOException("Stream closed");
       }
+      failures = 0;
       long filelen = getFileLength();
       if ((position < 0) || (position >= filelen)) {
         return -1;

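The refactoring above also extracts the config lookup into a package-private
helper, getMaxBlockAcquireFailures(conf), so tests can read exactly the cap
the client will use. Applications wanting a different cap can set the key
themselves; a minimal sketch, assuming defaults otherwise (the value 10 is
an arbitrary example, not a recommendation):

    import org.apache.hadoop.conf.Configuration;

    // Sketch: tuning the per-operation cap. The key name is the one read
    // by getMaxBlockAcquireFailures(); 10 is an arbitrary example value.
    public class TuneBlockAcquireCap {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setInt("dfs.client.max.block.acquire.failures", 10);
        // A DFSClient built from this conf may go back to the namenode
        // for fresh block locations up to 10 times within one read
        // operation before the error is thrown back to the caller.
      }
    }
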
Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/hdfs/TestCrcCorruption.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/hdfs/TestCrcCorruption.java?rev=1077199&r1=1077198&r2=1077199&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/hdfs/TestCrcCorruption.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/hdfs/TestCrcCorruption.java Fri Mar  4 03:51:09 2011
@@ -23,11 +23,13 @@ import java.nio.channels.FileChannel;
 import java.nio.ByteBuffer;
 import java.util.Random;
 import junit.framework.*;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.ChecksumException;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
 
 /**
  * A JUnit test for corrupted file handling.
@@ -57,17 +59,7 @@ import org.apache.hadoop.fs.Path;
  *     replica was created from the non-corrupted replica.
  */
 public class TestCrcCorruption extends TestCase {
-  
-  public TestCrcCorruption(String testName) {
-    super(testName);
-  }
-
-  protected void setUp() throws Exception {
-  }
 
-  protected void tearDown() throws Exception {
-  }
-  
   /** 
    * check if DFS can handle corrupted CRC blocks
    */
@@ -222,4 +214,57 @@ public class TestCrcCorruption extends T
     DFSTestUtil util2 = new DFSTestUtil("TestCrcCorruption", 40, 3, 400);
     thistest(conf2, util2);
   }
+
+  /**
+   * Make a single-DN cluster, corrupt a block, and make sure
+   * the read does not loop forever but eventually reports
+   * the exception to the client.
+   */
+  public void testEntirelyCorruptFileOneNode() throws Exception {
+    doTestEntirelyCorruptFile(1);
+  }
+
+  /**
+   * Same thing with multiple datanodes - historically, this has
+   * behaved differently from the above.
+   *
+   * This test usually completes in around 15 seconds - if it
+   * times out, this suggests that the client is retrying
+   * indefinitely.
+   */
+  public void testEntirelyCorruptFileThreeNodes() throws Exception {
+    doTestEntirelyCorruptFile(3);
+  }
+
+  private void doTestEntirelyCorruptFile(int numDataNodes) throws Exception {
+    long fileSize = 4096;
+    Path file = new Path("/testFile");
+
+    Configuration conf = new Configuration();
+    conf.setInt("dfs.replication", numDataNodes);
+    MiniDFSCluster cluster = new MiniDFSCluster(conf, numDataNodes, true, null);
+
+    try {
+      cluster.waitActive();
+      FileSystem fs = cluster.getFileSystem();
+
+      DFSTestUtil.createFile(fs, file, fileSize, (short)numDataNodes, 12345L /*seed*/);
+      DFSTestUtil.waitReplication(fs, file, (short)numDataNodes);
+
+      String block = DFSTestUtil.getFirstBlock(fs, file).getBlockName();
+      cluster.corruptBlockOnDataNodes(block);
+
+      try {
+        IOUtils.copyBytes(fs.open(file), new IOUtils.NullOutputStream(), conf,
+                          true);
+        fail("Didn't get exception");
+      } catch (IOException ioe) {
+        DFSClient.LOG.info("Got expected exception", ioe);
+      }
+
+    } finally {
+      cluster.shutdown();
+    }
+    
+  }
 }

Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/hdfs/TestDFSClientRetries.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/hdfs/TestDFSClientRetries.java?rev=1077199&r1=1077198&r2=1077199&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/hdfs/TestDFSClientRetries.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/hdfs/TestDFSClientRetries.java Fri Mar  4 03:51:09 2011
@@ -20,16 +20,20 @@ package org.apache.hadoop.hdfs;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.*;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DFSClient.DFSInputStream;
 import org.apache.hadoop.hdfs.protocol.*;
 import org.apache.hadoop.hdfs.protocol.FSConstants.UpgradeAction;
 import org.apache.hadoop.hdfs.server.common.*;
 import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.io.*;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.security.AccessControlException;
@@ -39,7 +43,9 @@ import org.apache.hadoop.security.token.
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
 
 import junit.framework.TestCase;
-
+import static org.mockito.Mockito.*;
+import org.mockito.stubbing.Answer;
+import org.mockito.invocation.InvocationOnMock;
 
 /**
  * These tests make sure that DFSClient retries fetching data from DFS
@@ -251,5 +257,124 @@ public class TestDFSClientRetries extend
            e.getMessage().equals(tnn.ADD_BLOCK_EXCEPTION));
     }
   }
-  
+
+  /**
+   * This tests that DFSInputStream failures are counted for a given read
+   * operation, and not over the lifetime of the stream. It is a regression
+   * test for HDFS-127.
+   */
+  public void testFailuresArePerOperation() throws Exception
+  {
+    long fileSize = 4096;
+    Path file = new Path("/testFile");
+
+    Configuration conf = new Configuration();
+    MiniDFSCluster cluster = new MiniDFSCluster(conf, 1, true, null);
+
+    int maxBlockAcquires = DFSClient.getMaxBlockAcquireFailures(conf);
+    assertTrue(maxBlockAcquires > 0);
+
+    try {
+      cluster.waitActive();
+      FileSystem fs = cluster.getFileSystem();
+      NameNode preSpyNN = cluster.getNameNode();
+      NameNode spyNN = spy(preSpyNN);
+      DFSClient client = new DFSClient(null, spyNN, conf, null);
+
+      DFSTestUtil.createFile(fs, file, fileSize, (short)1, 12345L /*seed*/);
+
+      // If the client will retry maxBlockAcquires times, then if we fail
+      // any more than that number of times, the operation should entirely
+      // fail.
+      doAnswer(new FailNTimesAnswer(preSpyNN, maxBlockAcquires + 1))
+        .when(spyNN).getBlockLocations(anyString(), anyLong(), anyLong());
+      try {
+        IOUtils.copyBytes(client.open(file.toString()), new IOUtils.NullOutputStream(), conf,
+                          true);
+        fail("Didn't get exception");
+      } catch (IOException ioe) {
+        DFSClient.LOG.info("Got expected exception", ioe);
+      }
+
+      // If we fail exactly that many times, then it should succeed.
+      doAnswer(new FailNTimesAnswer(preSpyNN, maxBlockAcquires))
+        .when(spyNN).getBlockLocations(anyString(), anyLong(), anyLong());
+      IOUtils.copyBytes(client.open(file.toString()), new IOUtils.NullOutputStream(), conf,
+                        true);
+
+      DFSClient.LOG.info("Starting test case for failure reset");
+
+      // Now the tricky case - if we fail a few times on one read, then succeed,
+      // then fail some more on another read, it shouldn't fail.
+      doAnswer(new FailNTimesAnswer(preSpyNN, maxBlockAcquires))
+        .when(spyNN).getBlockLocations(anyString(), anyLong(), anyLong());
+      DFSInputStream is = client.open(file.toString());
+      byte buf[] = new byte[10];
+      IOUtils.readFully(is, buf, 0, buf.length);
+
+      DFSClient.LOG.info("First read successful after some failures.");
+
+      // Further reads at this point will succeed since it has the good block locations.
+      // So, force the block locations on this stream to be refreshed from bad info.
+      // When reading again, it should start from a fresh failure count, since
+      // we're starting a new operation on the user level.
+      doAnswer(new FailNTimesAnswer(preSpyNN, maxBlockAcquires))
+        .when(spyNN).getBlockLocations(anyString(), anyLong(), anyLong());
+      is.openInfo();
+      // Seek to beginning forces a reopen of the BlockReader - otherwise it'll
+      // just keep reading on the existing stream and the fact that we've poisoned
+      // the block info won't do anything.
+      is.seek(0);
+      IOUtils.readFully(is, buf, 0, buf.length);
+
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
+  /**
+   * Mock Answer implementation of NN.getBlockLocations that will return
+   * a poisoned block list a certain number of times before returning
+   * a proper one.
+   */
+  private static class FailNTimesAnswer implements Answer<LocatedBlocks> {
+    private int failuresLeft;
+    private NameNode realNN;
+
+    public FailNTimesAnswer(NameNode realNN, int timesToFail) {
+      failuresLeft = timesToFail;
+      this.realNN = realNN;
+    }
+
+    public LocatedBlocks answer(InvocationOnMock invocation) throws IOException {
+      Object args[] = invocation.getArguments();
+      LocatedBlocks realAnswer = realNN.getBlockLocations(
+        (String)args[0],
+        (Long)args[1],
+        (Long)args[2]);
+
+      if (failuresLeft-- > 0) {
+        NameNode.LOG.info("FailNTimesAnswer injecting failure.");
+        return makeBadBlockList(realAnswer);
+      }
+      NameNode.LOG.info("FailNTimesAnswer no longer failing.");
+      return realAnswer;
+    }
+
+    private LocatedBlocks makeBadBlockList(LocatedBlocks goodBlockList) {
+      LocatedBlock goodLocatedBlock = goodBlockList.get(0);
+      LocatedBlock badLocatedBlock = new LocatedBlock(
+        goodLocatedBlock.getBlock(),
+        new DatanodeInfo[] {
+          new DatanodeInfo(new DatanodeID("255.255.255.255:234"))
+        },
+        goodLocatedBlock.getStartOffset(),
+        false);
+
+
+      List<LocatedBlock> badBlocks = new ArrayList<LocatedBlock>();
+      badBlocks.add(badLocatedBlock);
+      return new LocatedBlocks(goodBlockList.getFileLength(), badBlocks, false);
+    }
+  }
 }
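
For reference, the fault-injection technique in this test is plain Mockito
1.8: spy() wraps the live NameNode so unstubbed calls pass through, and
doAnswer() reroutes only getBlockLocations to an Answer that misbehaves a
fixed number of times. A stripped-down sketch of the same pattern on a toy
interface (Lookup, FailNTimes, and withInjectedFailures are hypothetical
names, not part of the patch):

    import static org.mockito.Mockito.*;

    import java.io.IOException;

    import org.mockito.invocation.InvocationOnMock;
    import org.mockito.stubbing.Answer;

    class FailNTimesDemo {
      // Toy stand-in for the protocol method being stubbed.
      interface Lookup {
        String find(String key) throws IOException;
      }

      // Throws for the first n calls, then delegates to the real object.
      // (FailNTimesAnswer above instead returns poisoned data, so the
      // client's retry logic, not just its error path, gets exercised.)
      static class FailNTimes implements Answer<String> {
        private final Lookup real;
        private int failuresLeft;

        FailNTimes(Lookup real, int n) {
          this.real = real;
          this.failuresLeft = n;
        }

        public String answer(InvocationOnMock inv) throws IOException {
          if (failuresLeft-- > 0) {
            throw new IOException("injected failure");
          }
          return real.find((String) inv.getArguments()[0]);
        }
      }

      // Unstubbed methods on a spy fall through to the real object; only
      // find() is rerouted here, just as only getBlockLocations is above.
      static Lookup withInjectedFailures(Lookup real, int n) throws IOException {
        Lookup spied = spy(real);
        doAnswer(new FailNTimes(real, n)).when(spied).find(anyString());
        return spied;
      }
    }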