You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ki...@apache.org on 2015/12/22 21:20:39 UTC

hadoop git commit: HDFS-7163. WebHdfsFileSystem should retry reads according to the configured retry policy. Contributed by Eric Payne. (cherry picked from commit 867048c3e4b20ece0039a876def129fa5eb9234f) (cherry picked from commit 131260f0a7e8480e03b20d6e6327d8b468a9313d)

Repository: hadoop
Updated Branches:
  refs/heads/branch-2.8 8fad4c780 -> 263c54402


HDFS-7163. WebHdfsFileSystem should retry reads according to the configured retry policy. Contributed by Eric Payne.
(cherry picked from commit 867048c3e4b20ece0039a876def129fa5eb9234f)
(cherry picked from commit 131260f0a7e8480e03b20d6e6327d8b468a9313d)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/263c5440
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/263c5440
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/263c5440

Branch: refs/heads/branch-2.8
Commit: 263c544021d44b6ed4ec311646948610be85d197
Parents: 8fad4c7
Author: Kihwal Lee <ki...@apache.org>
Authored: Tue Dec 22 14:20:10 2015 -0600
Committer: Kihwal Lee <ki...@apache.org>
Committed: Tue Dec 22 14:20:10 2015 -0600

----------------------------------------------------------------------
 .../hadoop/hdfs/web/WebHdfsFileSystem.java      | 365 ++++++++++++++++++-
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |   3 +
 .../hdfs/server/namenode/FSXAttrBaseTest.java   |   2 +-
 .../hdfs/server/namenode/TestAuditLogs.java     |   2 +-
 .../org/apache/hadoop/hdfs/web/TestWebHDFS.java | 159 ++++++++
 .../hdfs/web/TestWebHdfsFileSystemContract.java |   2 +-
 .../hadoop/hdfs/web/TestWebHdfsTokens.java      |   6 +-
 7 files changed, 525 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/263c5440/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java
index b528fdb..096ba7b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java
@@ -18,7 +18,9 @@ res * Licensed to the Apache Software Foundation (ASF) under one
 
 package org.apache.hadoop.hdfs.web;
 
+import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
+import java.io.EOFException;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
@@ -34,8 +36,11 @@ import java.util.List;
 import java.util.Map;
 import java.util.StringTokenizer;
 
+import javax.ws.rs.core.HttpHeaders;
 import javax.ws.rs.core.MediaType;
 
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.io.input.BoundedInputStream;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
@@ -44,6 +49,7 @@ import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.DelegationTokenRenewer;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FSInputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.MD5MD5CRC32FileChecksum;
@@ -554,7 +560,7 @@ public class WebHdfsFileSystem extends FileSystem
      * Also implements two-step connects for other operations redirected to
      * a DN such as open and checksum
      */
-    private HttpURLConnection connect(URL url) throws IOException {
+    protected HttpURLConnection connect(URL url) throws IOException {
       //redirect hostname and port
       String redirectHost = null;
 
@@ -707,7 +713,7 @@ public class WebHdfsFileSystem extends FileSystem
    */
   abstract class AbstractFsPathRunner<T> extends AbstractRunner<T> {
     private final Path fspath;
-    private final Param<?,?>[] parameters;
+    private Param<?,?>[] parameters;
 
     AbstractFsPathRunner(final HttpOpParam.Op op, final Path fspath,
         Param<?,?>... parameters) {
@@ -723,6 +729,10 @@ public class WebHdfsFileSystem extends FileSystem
       this.parameters = parameters;
     }
 
+    protected void updateURLParameters(Param<?, ?>... p) {
+      this.parameters = p;
+    }
+
     @Override
     protected URL getUrl() throws IOException {
       if (excludeDatanodes.getValue() != null) {
@@ -1232,15 +1242,10 @@ public class WebHdfsFileSystem extends FileSystem
   }
 
   @Override
-  public FSDataInputStream open(final Path f, final int buffersize
+  public FSDataInputStream open(final Path f, final int bufferSize
   ) throws IOException {
     statistics.incrementReadOps(1);
-    final HttpOpParam.Op op = GetOpParam.Op.OPEN;
-    // use a runner so the open can recover from an invalid token
-    FsPathConnectionRunner runner =
-        new FsPathConnectionRunner(op, f, new BufferSizeParam(buffersize));
-    return new FSDataInputStream(new OffsetUrlInputStream(
-        new UnresolvedUrlOpener(runner), new OffsetUrlOpener(null)));
+    return new FSDataInputStream(new WebHdfsInputStream(f, bufferSize));
   }
 
   @Override
@@ -1521,4 +1526,346 @@ public class WebHdfsFileSystem extends FileSystem
   InetSocketAddress[] getResolvedNNAddr() {
     return nnAddrs;
   }
+
+  @VisibleForTesting
+  public void setRetryPolicy(RetryPolicy rp) {
+    this.retryPolicy = rp;
+  }
+
+  /**
+   * This class is used for opening, reading, and seeking files while using the
+   * WebHdfsFileSystem. This class will invoke the retry policy when performing
+   * any of these actions.
+   */
+  @VisibleForTesting
+  public class WebHdfsInputStream extends FSInputStream {
+    private ReadRunner readRunner = null;
+
+    WebHdfsInputStream(Path path, int buffersize) throws IOException {
+      // Only create the ReadRunner once. Each read's byte array and position
+      // will be updated within the ReadRunner object before every read.
+      readRunner = new ReadRunner(path, buffersize);
+    }
+
+    @Override
+    public int read() throws IOException {
+      final byte[] b = new byte[1];
+      return (read(b, 0, 1) == -1) ? -1 : (b[0] & 0xff);
+    }
+
+    @Override
+    public int read(byte b[], int off, int len) throws IOException {
+      return readRunner.read(b, off, len);
+    }
+
+    @Override
+    public void seek(long newPos) throws IOException {
+      readRunner.seek(newPos);
+    }
+
+    @Override
+    public long getPos() throws IOException {
+      return readRunner.getPos();
+    }
+
+    protected int getBufferSize() throws IOException {
+      return readRunner.getBufferSize();
+    }
+
+    protected Path getPath() throws IOException {
+      return readRunner.getPath();
+    }
+
+    @Override
+    public boolean seekToNewSource(long targetPos) throws IOException {
+      return false;
+    }
+
+    @Override
+    public void close() throws IOException {
+      readRunner.close();
+    }
+
+    public void setFileLength(long len) {
+      readRunner.setFileLength(len);
+    }
+
+    public long getFileLength() {
+      return readRunner.getFileLength();
+    }
+
+    @VisibleForTesting
+    ReadRunner getReadRunner() {
+      return readRunner;
+    }
+
+    @VisibleForTesting
+    void setReadRunner(ReadRunner rr) {
+      this.readRunner = rr;
+    }
+  }
+
+  enum RunnerState {
+    DISCONNECTED, // Connection is closed programmatically by ReadRunner
+    OPEN,         // Connection has been established by ReadRunner
+    SEEK,         // Calling code has explicitly called seek()
+    CLOSED        // Calling code has explicitly called close()
+    }
+
+  /**
+   * This class will allow retries to occur for both open and read operations.
+   * The first WebHdfsFileSystem#open creates a new WebHdfsInputStream object,
+   * which creates a new ReadRunner object that will be used to open a
+   * connection and read or seek into the input stream.
+   *
+   * ReadRunner is a subclass of the AbstractRunner class, which will run the
+   * ReadRunner#getUrl(), ReadRunner#connect(URL), and ReadRunner#getResponse
+   * methods within a retry loop, based on the configured retry policy.
+   * ReadRunner#connect will create a connection if one has not already been
+   * created. Otherwise, it will return the previously created connection
+   * object. This is necessary because a new connection should not be created
+   * for every read.
+   * Likewise, ReadRunner#getUrl will construct a new URL object only if the
+   * connection has not previously been established. Otherwise, it will return
+   * the previously created URL object.
+   * ReadRunner#getResponse will initialize the input stream if it has not
+   * already been initialized and read the requested data from the specified
+   * input stream.
+   */
+  @VisibleForTesting
+  protected class ReadRunner extends AbstractFsPathRunner<Integer> {
+    private InputStream in = null;
+    private HttpURLConnection cachedConnection = null;
+    private byte[] readBuffer;
+    private int readOffset;
+    private int readLength;
+    private RunnerState runnerState = RunnerState.DISCONNECTED;
+    private URL originalUrl = null;
+    private URL resolvedUrl = null;
+
+    private final Path path;
+    private final int bufferSize;
+    private long pos = 0;
+    private long fileLength = 0;
+
+    /* The following methods are WebHdfsInputStream helpers. */
+
+    ReadRunner(Path p, int bs) throws IOException {
+      super(GetOpParam.Op.OPEN, p, new BufferSizeParam(bs));
+      this.path = p;
+      this.bufferSize = bs;
+    }
+
+    int read(byte[] b, int off, int len) throws IOException {
+      if (runnerState == RunnerState.CLOSED) {
+        throw new IOException("Stream closed");
+      }
+
+      // Before the first read, pos and fileLength will be 0 and readBuffer
+      // will be null. They will be initialized once the first connection
+      // is made. Only then does it make sense to compare pos and fileLength.
+      if (pos >= fileLength && readBuffer != null) {
+        return -1;
+      }
+
+      // If a seek is occurring, the input stream will have been closed, so it
+      // needs to be reopened. Use the URLRunner to call AbstractRunner#connect
+      // with the previously-cached resolved URL and with the 'redirected' flag
+      // set to 'true'. The resolved URL contains the URL of the previously
+      // opened DN as opposed to the NN. It is preferable to use the resolved
+      // URL when creating a connection because it does not hit the NN on every
+      // seek, nor does it open a connection to a new DN after every seek.
+      // The redirect flag is needed so that AbstractRunner#connect knows the
+      // URL is already resolved.
+      // Note that when the redirected flag is set, retries are not attempted.
+      // So, if the connection fails using URLRunner, clear out the connection
+      // and fall through to establish the connection using ReadRunner.
+      if (runnerState == RunnerState.SEEK) {
+        try {
+          final URL rurl = new URL(resolvedUrl + "&" + new OffsetParam(pos));
+          cachedConnection = new URLRunner(GetOpParam.Op.OPEN, rurl, true).run();
+        } catch (IOException ioe) {
+          closeInputStream(RunnerState.DISCONNECTED);
+        }
+      }
+
+      readBuffer = b;
+      readOffset = off;
+      readLength = len;
+
+      int count = -1;
+      count = this.run();
+      if (count >= 0) {
+        statistics.incrementBytesRead(count);
+        pos += count;
+      } else if (pos < fileLength) {
+        throw new EOFException(
+                  "Premature EOF: pos=" + pos + " < filelength=" + fileLength);
+      }
+      return count;
+    }
+
+    void seek(long newPos) throws IOException {
+      if (pos != newPos) {
+        pos = newPos;
+        closeInputStream(RunnerState.SEEK);
+      }
+    }
+
+    public void close() throws IOException {
+      closeInputStream(RunnerState.CLOSED);
+    }
+
+    /* The following methods are overriding AbstractRunner methods,
+     * to be called within the retry policy context by runWithRetry.
+     */
+
+    @Override
+    protected URL getUrl() throws IOException {
+      // This method is called every time a read is executed.
+      // The check for cachedConnection == null is to ensure that a new URL is
+      // only created upon a new connection and not for every read.
+      if (cachedConnection == null) {
+        // Update URL with current offset. BufferSize doesn't change, but it
+        // still must be included when creating the new URL.
+        updateURLParameters(new BufferSizeParam(bufferSize),
+            new OffsetParam(pos));
+        originalUrl = super.getUrl();
+      }
+      return originalUrl;
+    }
+
+    /* Only make the connection if it is not already open. Don't cache the
+     * connection here. After this method is called, runWithRetry will call
+     * validateResponse, and then call the below ReadRunner#getResponse. If
+     * the code path makes it that far, then we can cache the connection.
+     */
+    @Override
+    protected HttpURLConnection connect(URL url) throws IOException {
+      HttpURLConnection conn = cachedConnection;
+      if (conn == null) {
+        try {
+          conn = super.connect(url);
+        } catch (IOException e) {
+          closeInputStream(RunnerState.DISCONNECTED);
+          throw e;
+        }
+      }
+      return conn;
+    }
+
+    /*
+     * This method is used to perform reads within the retry policy context.
+     * This code is relying on runWithRetry to always call the above connect
+     * method and the validateResponse method prior to calling getResponse.
+     */
+    @Override
+    Integer getResponse(final HttpURLConnection conn)
+        throws IOException {
+      try {
+        // In the "open-then-read" use case, runWithRetry will have executed
+        // ReadRunner#connect to make the connection and then executed
+        // validateResponse to validate the response code. Only then do we want
+        // to cache the connection.
+        // In the "read-after-seek" use case, the connection is made and the
+        // response is validated by the URLRunner. ReadRunner#read then caches
+        // the connection and the ReadRunner#connect will pass on the cached
+        // connection
+        // In either case, stream initialization is done here if necessary.
+        cachedConnection = conn;
+        if (in == null) {
+          in = initializeInputStream(conn);
+        }
+
+        int count = in.read(readBuffer, readOffset, readLength);
+        if (count < 0 && pos < fileLength) {
+          throw new EOFException(
+                  "Premature EOF: pos=" + pos + " < filelength=" + fileLength);
+        }
+        return Integer.valueOf(count);
+      } catch (IOException e) {
+        String redirectHost = resolvedUrl.getAuthority();
+        if (excludeDatanodes.getValue() != null) {
+          excludeDatanodes = new ExcludeDatanodesParam(redirectHost + ","
+              + excludeDatanodes.getValue());
+        } else {
+          excludeDatanodes = new ExcludeDatanodesParam(redirectHost);
+        }
+
+        // If an exception occurs, close the input stream and null it out so
+        // that if the abstract runner decides to retry, it will reconnect.
+        closeInputStream(RunnerState.DISCONNECTED);
+        throw e;
+      }
+    }
+
+    @VisibleForTesting
+    InputStream initializeInputStream(HttpURLConnection conn)
+        throws IOException {
+      // Cache the resolved URL so that it can be used in the event of
+      // a future seek operation.
+      resolvedUrl = removeOffsetParam(conn.getURL());
+      final String cl = conn.getHeaderField(HttpHeaders.CONTENT_LENGTH);
+      InputStream inStream = conn.getInputStream();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("open file: " + conn.getURL());
+      }
+      if (cl != null) {
+        long streamLength = Long.parseLong(cl);
+        fileLength = pos + streamLength;
+        // Java has a bug with >2GB request streams.  It won't bounds check
+        // the reads so the transfer blocks until the server times out
+        inStream = new BoundedInputStream(inStream, streamLength);
+      } else {
+        fileLength = getHdfsFileStatus(path).getLen();
+      }
+      // Wrapping in BufferedInputStream because it is more performant than
+      // BoundedInputStream by itself.
+      runnerState = RunnerState.OPEN;
+      return new BufferedInputStream(inStream, bufferSize);
+    }
+
+    // Close both the InputStream and the connection.
+    @VisibleForTesting
+    void closeInputStream(RunnerState rs) throws IOException {
+      if (in != null) {
+        IOUtils.close(cachedConnection);
+        in = null;
+      }
+      cachedConnection = null;
+      runnerState = rs;
+    }
+
+    /* Getters and Setters */
+
+    @VisibleForTesting
+    protected InputStream getInputStream() {
+      return in;
+    }
+
+    @VisibleForTesting
+    protected void setInputStream(InputStream inStream) {
+      in = inStream;
+    }
+
+    Path getPath() {
+      return path;
+    }
+
+    int getBufferSize() {
+      return bufferSize;
+    }
+
+    long getFileLength() {
+      return fileLength;
+    }
+
+    void setFileLength(long len) {
+      fileLength = len;
+    }
+
+    long getPos() {
+      return pos;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/263c5440/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 57f089a..0740aa7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -1634,6 +1634,9 @@ Release 2.7.3 - UNRELEASED
 
   IMPROVEMENTS
 
+    HDFS-7163. WebHdfsFileSystem should retry reads according to the configured
+    retry policy. (Eric Payne via kihwal)
+
   OPTIMIZATIONS
 
   BUG FIXES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/263c5440/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSXAttrBaseTest.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSXAttrBaseTest.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSXAttrBaseTest.java
index eb9053c..c526484 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSXAttrBaseTest.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSXAttrBaseTest.java
@@ -1234,7 +1234,7 @@ public class FSXAttrBaseTest {
       throws Exception {
     // Test that a file with the xattr can or can't be opened.
     try {
-      userFs.open(filePath);
+      userFs.open(filePath).read();
       assertFalse("open succeeded but expected it to fail", expectOpenFailure);
     } catch (AccessControlException e) {
       assertTrue("open failed but expected it to succeed", expectOpenFailure);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/263c5440/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAuditLogs.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAuditLogs.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAuditLogs.java
index de22af7..115c0b4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAuditLogs.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAuditLogs.java
@@ -287,7 +287,7 @@ public class TestAuditLogs {
     setupAuditLogs();
 
     WebHdfsFileSystem webfs = WebHdfsTestUtil.getWebHdfsFileSystemAs(userGroupInfo, conf, WebHdfsConstants.WEBHDFS_SCHEME);
-    webfs.open(file);
+    webfs.open(file).read();
 
     verifyAuditLogsCheckPattern(true, 3, webOpenPattern);
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/263c5440/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHDFS.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHDFS.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHDFS.java
index 22cddf4..4d4f652 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHDFS.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHDFS.java
@@ -20,13 +20,18 @@ package org.apache.hadoop.hdfs.web;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import java.io.EOFException;
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.OutputStream;
 import java.net.HttpURLConnection;
 import java.net.InetSocketAddress;
+import java.net.SocketException;
+import java.net.SocketTimeoutException;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.net.URL;
@@ -42,12 +47,14 @@ import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
@@ -58,11 +65,16 @@ import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotTestHelper;
 import org.apache.hadoop.hdfs.server.namenode.web.resources.NamenodeWebHdfsMethods;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
+import org.apache.hadoop.hdfs.web.WebHdfsFileSystem.WebHdfsInputStream;
 import org.apache.hadoop.hdfs.web.resources.LengthParam;
 import org.apache.hadoop.hdfs.web.resources.OffsetParam;
 import org.apache.hadoop.hdfs.web.resources.Param;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.io.retry.RetryPolicy.RetryAction;
+import org.apache.hadoop.io.retry.RetryPolicy.RetryAction.RetryDecision;
 import org.apache.hadoop.ipc.RetriableException;
 import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.token.SecretManager.InvalidToken;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.log4j.Level;
@@ -70,6 +82,15 @@ import org.junit.Assert;
 import org.junit.Test;
 import org.mockito.internal.util.reflection.Whitebox;
 
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
 /** Test WebHDFS */
 public class TestWebHDFS {
   static final Log LOG = LogFactory.getLog(TestWebHDFS.class);
@@ -748,4 +769,142 @@ public class TestWebHDFS {
       }
     });
   }
+
+  @Test(timeout=90000)
+  public void testWebHdfsReadRetries() throws Exception {
+    // ((Log4JLogger)DFSClient.LOG).getLogger().setLevel(Level.ALL);
+    final Configuration conf = WebHdfsTestUtil.createConf();
+    final Path dir = new Path("/testWebHdfsReadRetries");
+
+    conf.setBoolean(DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_ENABLED_KEY, true);
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_SAFEMODE_MIN_DATANODES_KEY, 1);
+    conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 1024*512);
+    conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1);
+
+    final short numDatanodes = 1;
+    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(numDatanodes)
+        .build();
+    try {
+      cluster.waitActive();
+      final FileSystem fs = WebHdfsTestUtil
+                .getWebHdfsFileSystem(conf, WebHdfsConstants.WEBHDFS_SCHEME);
+
+      //create a file
+      final long length = 1L << 20;
+      final Path file1 = new Path(dir, "testFile");
+
+      DFSTestUtil.createFile(fs, file1, length, numDatanodes, 20120406L);
+
+      //get file status and check that it was written properly.
+      final FileStatus s1 = fs.getFileStatus(file1);
+      assertEquals("Write failed for file " + file1, length, s1.getLen());
+
+      // Ensure file can be read through WebHdfsInputStream
+      FSDataInputStream in = fs.open(file1);
+      assertTrue("Input stream is not an instance of class WebHdfsInputStream",
+          in.getWrappedStream() instanceof WebHdfsInputStream);
+      int count = 0;
+      for(; in.read() != -1; count++);
+      assertEquals("Read failed for file " + file1, s1.getLen(), count);
+      assertEquals("Sghould not be able to read beyond end of file",
+          in.read(), -1);
+      in.close();
+      try {
+        in.read();
+        fail("Read after close should have failed");
+      } catch(IOException ioe) { }
+
+      WebHdfsFileSystem wfs = (WebHdfsFileSystem)fs;
+      // Read should not be retried if AccessControlException is encountered.
+      String msg = "ReadRetries: Test Access Control Exception";
+      testReadRetryExceptionHelper(wfs, file1,
+                          new AccessControlException(msg), msg, false, 1);
+
+      // Retry policy should be invoked if IOExceptions are thrown.
+      msg = "ReadRetries: Test SocketTimeoutException";
+      testReadRetryExceptionHelper(wfs, file1,
+                          new SocketTimeoutException(msg), msg, true, 5);
+      msg = "ReadRetries: Test SocketException";
+      testReadRetryExceptionHelper(wfs, file1,
+                          new SocketException(msg), msg, true, 5);
+      msg = "ReadRetries: Test EOFException";
+      testReadRetryExceptionHelper(wfs, file1,
+                          new EOFException(msg), msg, true, 5);
+      msg = "ReadRetries: Test Generic IO Exception";
+      testReadRetryExceptionHelper(wfs, file1,
+                          new IOException(msg), msg, true, 5);
+
+      // If InvalidToken exception occurs, WebHdfs only retries if the
+      // delegation token was replaced. Do that twice, then verify by checking
+      // the number of times it tried.
+      WebHdfsFileSystem spyfs = spy(wfs);
+      when(spyfs.replaceExpiredDelegationToken()).thenReturn(true, true, false);
+      msg = "ReadRetries: Test Invalid Token Exception";
+      testReadRetryExceptionHelper(spyfs, file1,
+                          new InvalidToken(msg), msg, false, 3);
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
+  public boolean attemptedRetry;
+  private void testReadRetryExceptionHelper(WebHdfsFileSystem fs, Path fn,
+      final IOException ex, String msg, boolean shouldAttemptRetry,
+      int numTimesTried)
+      throws Exception {
+    // Override WebHdfsInputStream#getInputStream so that it returns
+    // an input stream that throws the specified exception when read
+    // is called.
+    FSDataInputStream in = fs.open(fn);
+    in.read(); // Connection is made only when the first read() occurs.
+    final WebHdfsInputStream webIn =
+        (WebHdfsInputStream)(in.getWrappedStream());
+
+    final InputStream spyInputStream =
+        spy(webIn.getReadRunner().getInputStream());
+    doThrow(ex).when(spyInputStream).read((byte[])any(), anyInt(), anyInt());
+    final WebHdfsFileSystem.ReadRunner rr = spy(webIn.getReadRunner());
+    doReturn(spyInputStream)
+        .when(rr).initializeInputStream((HttpURLConnection) any());
+    rr.setInputStream(spyInputStream);
+    webIn.setReadRunner(rr);
+
+    // Override filesystem's retry policy in order to verify that
+    // WebHdfsInputStream is calling shouldRetry for the appropriate
+    // exceptions.
+    final RetryAction retryAction = new RetryAction(RetryDecision.RETRY);
+    final RetryAction failAction = new RetryAction(RetryDecision.FAIL);
+    RetryPolicy rp = new RetryPolicy() {
+      @Override
+      public RetryAction shouldRetry(Exception e, int retries, int failovers,
+          boolean isIdempotentOrAtMostOnce) throws Exception {
+        attemptedRetry = true;
+       if (retries > 3) {
+          return failAction;
+        } else {
+          return retryAction;
+        }
+      }
+    };
+    fs.setRetryPolicy(rp);
+
+    // If the retry logic is exercised, attemptedRetry will be true. Some
+    // exceptions should exercise the retry logic and others should not.
+    // Either way, the value of attemptedRetry should match shouldAttemptRetry.
+    attemptedRetry = false;
+    try {
+      webIn.read();
+      fail(msg + ": Read should have thrown exception.");
+    } catch (Exception e) {
+      assertTrue(e.getMessage().contains(msg));
+    }
+    assertEquals(msg + ": Read should " + (shouldAttemptRetry ? "" : "not ")
+                + "have called shouldRetry. ",
+        attemptedRetry, shouldAttemptRetry);
+
+    verify(rr, times(numTimesTried)).getResponse((HttpURLConnection) any());
+    webIn.close();
+    in.close();
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/263c5440/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHdfsFileSystemContract.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHdfsFileSystemContract.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHdfsFileSystemContract.java
index c44c33e..45b363a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHdfsFileSystemContract.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHdfsFileSystemContract.java
@@ -184,7 +184,7 @@ public class TestWebHdfsFileSystemContract extends FileSystemContractBaseTest {
     final Path p = new Path("/test/testOpenNonExistFile");
     //open it as a file, should get FileNotFoundException 
     try {
-      fs.open(p);
+      fs.open(p).read();
       fail("Expected FileNotFoundException was not thrown");
     } catch(FileNotFoundException fnfe) {
       WebHdfsFileSystem.LOG.info("This is expected.", fnfe);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/263c5440/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHdfsTokens.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHdfsTokens.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHdfsTokens.java
index d7a8717..44a6b37 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHdfsTokens.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHdfsTokens.java
@@ -387,7 +387,9 @@ public class TestWebHdfsTokens {
     reset(fs);
 
     // verify an expired token is replaced with a new token
-    fs.open(p).close();
+    InputStream is = fs.open(p);
+    is.read();
+    is.close();
     verify(fs, times(2)).getDelegationToken(); // first bad, then good
     verify(fs, times(1)).replaceExpiredDelegationToken();
     verify(fs, times(1)).getDelegationToken(null);
@@ -402,7 +404,7 @@ public class TestWebHdfsTokens {
     // verify with open because it's a little different in how it
     // opens connections
     fs.cancelDelegationToken(fs.getRenewToken());
-    InputStream is = fs.open(p);
+    is = fs.open(p);
     is.read();
     is.close();
     verify(fs, times(2)).getDelegationToken(); // first bad, then good