Posted to common-commits@hadoop.apache.org by ki...@apache.org on 2015/12/22 21:20:39 UTC
hadoop git commit: HDFS-7163. WebHdfsFileSystem should retry reads
according to the configured retry policy. Contributed by Eric Payne. (cherry
picked from commit 867048c3e4b20ece0039a876def129fa5eb9234f) (cherry picked
from commit 131260f0a7e8480e03b20d6e6327d8b468a9313d)
Repository: hadoop
Updated Branches:
refs/heads/branch-2.8 8fad4c780 -> 263c54402
HDFS-7163. WebHdfsFileSystem should retry reads according to the configured retry policy. Contributed by Eric Payne.
(cherry picked from commit 867048c3e4b20ece0039a876def129fa5eb9234f)
(cherry picked from commit 131260f0a7e8480e03b20d6e6327d8b468a9313d)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/263c5440
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/263c5440
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/263c5440
Branch: refs/heads/branch-2.8
Commit: 263c544021d44b6ed4ec311646948610be85d197
Parents: 8fad4c7
Author: Kihwal Lee <ki...@apache.org>
Authored: Tue Dec 22 14:20:10 2015 -0600
Committer: Kihwal Lee <ki...@apache.org>
Committed: Tue Dec 22 14:20:10 2015 -0600
----------------------------------------------------------------------
.../hadoop/hdfs/web/WebHdfsFileSystem.java | 365 ++++++++++++++++++-
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 +
.../hdfs/server/namenode/FSXAttrBaseTest.java | 2 +-
.../hdfs/server/namenode/TestAuditLogs.java | 2 +-
.../org/apache/hadoop/hdfs/web/TestWebHDFS.java | 159 ++++++++
.../hdfs/web/TestWebHdfsFileSystemContract.java | 2 +-
.../hadoop/hdfs/web/TestWebHdfsTokens.java | 6 +-
7 files changed, 525 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
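A minimal client-side sketch of what this change affects (the webhdfs URI, host,
and file path below are illustrative assumptions, not part of this commit): with
the client retry policy enabled, reads served through WebHdfsFileSystem#open now
consult the configured retry policy instead of failing on the first IOException.

    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hdfs.DFSConfigKeys;

    public class WebHdfsReadSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Same switch the new test flips; when enabled, failed reads are
        // retried according to the configured policy.
        conf.setBoolean(DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_ENABLED_KEY, true);
        // Hypothetical NameNode HTTP address and file path.
        FileSystem fs = FileSystem.get(
            URI.create("webhdfs://namenode.example.com:50070"), conf);
        try (FSDataInputStream in = fs.open(new Path("/tmp/testFile"))) {
          byte[] buf = new byte[4096];
          int n;
          while ((n = in.read(buf, 0, buf.length)) != -1) {
            System.out.println("read " + n + " bytes");
          }
        }
      }
    }
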
http://git-wip-us.apache.org/repos/asf/hadoop/blob/263c5440/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java
index b528fdb..096ba7b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java
@@ -18,7 +18,9 @@ * Licensed to the Apache Software Foundation (ASF) under one
package org.apache.hadoop.hdfs.web;
+import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
+import java.io.EOFException;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
@@ -34,8 +36,11 @@ import java.util.List;
import java.util.Map;
import java.util.StringTokenizer;
+import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.MediaType;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.io.input.BoundedInputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.CommonConfigurationKeys;
@@ -44,6 +49,7 @@ import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.DelegationTokenRenewer;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.MD5MD5CRC32FileChecksum;
@@ -554,7 +560,7 @@ public class WebHdfsFileSystem extends FileSystem
* Also implements two-step connects for other operations redirected to
* a DN such as open and checksum
*/
- private HttpURLConnection connect(URL url) throws IOException {
+ protected HttpURLConnection connect(URL url) throws IOException {
//redirect hostname and port
String redirectHost = null;
@@ -707,7 +713,7 @@ public class WebHdfsFileSystem extends FileSystem
*/
abstract class AbstractFsPathRunner<T> extends AbstractRunner<T> {
private final Path fspath;
- private final Param<?,?>[] parameters;
+ private Param<?,?>[] parameters;
AbstractFsPathRunner(final HttpOpParam.Op op, final Path fspath,
Param<?,?>... parameters) {
@@ -723,6 +729,10 @@ public class WebHdfsFileSystem extends FileSystem
this.parameters = parameters;
}
+ protected void updateURLParameters(Param<?, ?>... p) {
+ this.parameters = p;
+ }
+
@Override
protected URL getUrl() throws IOException {
if (excludeDatanodes.getValue() != null) {
@@ -1232,15 +1242,10 @@ public class WebHdfsFileSystem extends FileSystem
}
@Override
- public FSDataInputStream open(final Path f, final int buffersize
+ public FSDataInputStream open(final Path f, final int bufferSize
) throws IOException {
statistics.incrementReadOps(1);
- final HttpOpParam.Op op = GetOpParam.Op.OPEN;
- // use a runner so the open can recover from an invalid token
- FsPathConnectionRunner runner =
- new FsPathConnectionRunner(op, f, new BufferSizeParam(buffersize));
- return new FSDataInputStream(new OffsetUrlInputStream(
- new UnresolvedUrlOpener(runner), new OffsetUrlOpener(null)));
+ return new FSDataInputStream(new WebHdfsInputStream(f, bufferSize));
}
@Override
@@ -1521,4 +1526,346 @@ public class WebHdfsFileSystem extends FileSystem
InetSocketAddress[] getResolvedNNAddr() {
return nnAddrs;
}
+
+ @VisibleForTesting
+ public void setRetryPolicy(RetryPolicy rp) {
+ this.retryPolicy = rp;
+ }
+
+ /**
+ * This class is used for opening, reading, and seeking files while using the
+ * WebHdfsFileSystem. This class will invoke the retry policy when performing
+ * any of these actions.
+ */
+ @VisibleForTesting
+ public class WebHdfsInputStream extends FSInputStream {
+ private ReadRunner readRunner = null;
+
+ WebHdfsInputStream(Path path, int buffersize) throws IOException {
+ // Only create the ReadRunner once. Each read's byte array and position
+ // will be updated within the ReadRunner object before every read.
+ readRunner = new ReadRunner(path, buffersize);
+ }
+
+ @Override
+ public int read() throws IOException {
+ final byte[] b = new byte[1];
+ return (read(b, 0, 1) == -1) ? -1 : (b[0] & 0xff);
+ }
+
+ @Override
+ public int read(byte b[], int off, int len) throws IOException {
+ return readRunner.read(b, off, len);
+ }
+
+ @Override
+ public void seek(long newPos) throws IOException {
+ readRunner.seek(newPos);
+ }
+
+ @Override
+ public long getPos() throws IOException {
+ return readRunner.getPos();
+ }
+
+ protected int getBufferSize() throws IOException {
+ return readRunner.getBufferSize();
+ }
+
+ protected Path getPath() throws IOException {
+ return readRunner.getPath();
+ }
+
+ @Override
+ public boolean seekToNewSource(long targetPos) throws IOException {
+ return false;
+ }
+
+ @Override
+ public void close() throws IOException {
+ readRunner.close();
+ }
+
+ public void setFileLength(long len) {
+ readRunner.setFileLength(len);
+ }
+
+ public long getFileLength() {
+ return readRunner.getFileLength();
+ }
+
+ @VisibleForTesting
+ ReadRunner getReadRunner() {
+ return readRunner;
+ }
+
+ @VisibleForTesting
+ void setReadRunner(ReadRunner rr) {
+ this.readRunner = rr;
+ }
+ }
+
+ enum RunnerState {
+ DISCONNECTED, // Connection is closed programmatically by ReadRunner
+ OPEN, // Connection has been established by ReadRunner
+ SEEK, // Calling code has explicitly called seek()
+ CLOSED // Calling code has explicitly called close()
+ }
+
+ /**
+ * This class will allow retries to occur for both open and read operations.
+ * The first WebHdfsFileSystem#open creates a new WebHdfsInputStream object,
+ * which creates a new ReadRunner object that will be used to open a
+ * connection and read or seek into the input stream.
+ *
+ * ReadRunner is a subclass of the AbstractRunner class, which will run the
+ * ReadRunner#getUrl(), ReadRunner#connect(URL), and ReadRunner#getResponse
+ * methods within a retry loop, based on the configured retry policy.
+ * ReadRunner#connect will create a connection if one has not already been
+ * created. Otherwise, it will return the previously created connection
+ * object. This is necessary because a new connection should not be created
+ * for every read.
+ * Likewise, ReadRunner#getUrl will construct a new URL object only if the
+ * connection has not previously been established. Otherwise, it will return
+ * the previously created URL object.
+ * ReadRunner#getResponse will initialize the input stream if it has not
+ * already been initialized and read the requested data from the specified
+ * input stream.
+ */
+ @VisibleForTesting
+ protected class ReadRunner extends AbstractFsPathRunner<Integer> {
+ private InputStream in = null;
+ private HttpURLConnection cachedConnection = null;
+ private byte[] readBuffer;
+ private int readOffset;
+ private int readLength;
+ private RunnerState runnerState = RunnerState.DISCONNECTED;
+ private URL originalUrl = null;
+ private URL resolvedUrl = null;
+
+ private final Path path;
+ private final int bufferSize;
+ private long pos = 0;
+ private long fileLength = 0;
+
+ /* The following methods are WebHdfsInputStream helpers. */
+
+ ReadRunner(Path p, int bs) throws IOException {
+ super(GetOpParam.Op.OPEN, p, new BufferSizeParam(bs));
+ this.path = p;
+ this.bufferSize = bs;
+ }
+
+ int read(byte[] b, int off, int len) throws IOException {
+ if (runnerState == RunnerState.CLOSED) {
+ throw new IOException("Stream closed");
+ }
+
+ // Before the first read, pos and fileLength will be 0 and readBuffer
+ // will be null. They will be initialized once the first connection
+ // is made. Only after that does it make sense to compare pos and fileLength.
+ if (pos >= fileLength && readBuffer != null) {
+ return -1;
+ }
+
+ // If a seek is occurring, the input stream will have been closed, so it
+ // needs to be reopened. Use the URLRunner to call AbstractRunner#connect
+ // with the previously-cached resolved URL and with the 'redirected' flag
+ // set to 'true'. The resolved URL contains the URL of the previously
+ // opened DN as opposed to the NN. It is preferable to use the resolved
+ // URL when creating a connection because it does not hit the NN on every
+ // seek, nor does it open a connection to a new DN after every seek.
+ // The redirect flag is needed so that AbstractRunner#connect knows the
+ // URL is already resolved.
+ // Note that when the redirected flag is set, retries are not attempted.
+ // So, if the connection fails using URLRunner, clear out the connection
+ // and fall through to establish the connection using ReadRunner.
+ if (runnerState == RunnerState.SEEK) {
+ try {
+ final URL rurl = new URL(resolvedUrl + "&" + new OffsetParam(pos));
+ cachedConnection = new URLRunner(GetOpParam.Op.OPEN, rurl, true).run();
+ } catch (IOException ioe) {
+ closeInputStream(RunnerState.DISCONNECTED);
+ }
+ }
+
+ readBuffer = b;
+ readOffset = off;
+ readLength = len;
+
+ int count = -1;
+ count = this.run();
+ if (count >= 0) {
+ statistics.incrementBytesRead(count);
+ pos += count;
+ } else if (pos < fileLength) {
+ throw new EOFException(
+ "Premature EOF: pos=" + pos + " < filelength=" + fileLength);
+ }
+ return count;
+ }
+
+ void seek(long newPos) throws IOException {
+ if (pos != newPos) {
+ pos = newPos;
+ closeInputStream(RunnerState.SEEK);
+ }
+ }
+
+ public void close() throws IOException {
+ closeInputStream(RunnerState.CLOSED);
+ }
+
+ /* The following methods override AbstractRunner methods and are called
+ * within the retry policy context by runWithRetry.
+ */
+
+ @Override
+ protected URL getUrl() throws IOException {
+ // This method is called every time a read is executed.
+ // The check for cachedConnection == null is to ensure that a new URL is only
+ // created upon a new connection and not for every read.
+ if (cachedConnection == null) {
+ // Update URL with current offset. BufferSize doesn't change, but it
+ // still must be included when creating the new URL.
+ updateURLParameters(new BufferSizeParam(bufferSize),
+ new OffsetParam(pos));
+ originalUrl = super.getUrl();
+ }
+ return originalUrl;
+ }
+
+ /* Only make the connection if it is not already open. Don't cache the
+ * connection here. After this method is called, runWithRetry will call
+ * validateResponse, and then call the below ReadRunner#getResponse. If
+ * the code path makes it that far, then we can cache the connection.
+ */
+ @Override
+ protected HttpURLConnection connect(URL url) throws IOException {
+ HttpURLConnection conn = cachedConnection;
+ if (conn == null) {
+ try {
+ conn = super.connect(url);
+ } catch (IOException e) {
+ closeInputStream(RunnerState.DISCONNECTED);
+ throw e;
+ }
+ }
+ return conn;
+ }
+
+ /*
+ * This method is used to perform reads within the retry policy context.
+ * This code relies on runWithRetry to always call the above connect
+ * method and the validateResponse method prior to calling getResponse.
+ */
+ @Override
+ Integer getResponse(final HttpURLConnection conn)
+ throws IOException {
+ try {
+ // In the "open-then-read" use case, runWithRetry will have executed
+ // ReadRunner#connect to make the connection and then executed
+ // validateResponse to validate the response code. Only then do we want
+ // to cache the connection.
+ // In the "read-after-seek" use case, the connection is made and the
+ // response is validated by the URLRunner. ReadRunner#read then caches
+ // the connection and the ReadRunner#connect will pass on the cached
+ // connection.
+ // In either case, stream initialization is done here if necessary.
+ cachedConnection = conn;
+ if (in == null) {
+ in = initializeInputStream(conn);
+ }
+
+ int count = in.read(readBuffer, readOffset, readLength);
+ if (count < 0 && pos < fileLength) {
+ throw new EOFException(
+ "Premature EOF: pos=" + pos + " < filelength=" + fileLength);
+ }
+ return Integer.valueOf(count);
+ } catch (IOException e) {
+ String redirectHost = resolvedUrl.getAuthority();
+ if (excludeDatanodes.getValue() != null) {
+ excludeDatanodes = new ExcludeDatanodesParam(redirectHost + ","
+ + excludeDatanodes.getValue());
+ } else {
+ excludeDatanodes = new ExcludeDatanodesParam(redirectHost);
+ }
+
+ // If an exception occurs, close the input stream and null it out so
+ // that if the abstract runner decides to retry, it will reconnect.
+ closeInputStream(RunnerState.DISCONNECTED);
+ throw e;
+ }
+ }
+
+ @VisibleForTesting
+ InputStream initializeInputStream(HttpURLConnection conn)
+ throws IOException {
+ // Cache the resolved URL so that it can be used in the event of
+ // a future seek operation.
+ resolvedUrl = removeOffsetParam(conn.getURL());
+ final String cl = conn.getHeaderField(HttpHeaders.CONTENT_LENGTH);
+ InputStream inStream = conn.getInputStream();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("open file: " + conn.getURL());
+ }
+ if (cl != null) {
+ long streamLength = Long.parseLong(cl);
+ fileLength = pos + streamLength;
+ // Java has a bug with >2GB request streams. It won't bounds check
+ // the reads so the transfer blocks until the server times out
+ inStream = new BoundedInputStream(inStream, streamLength);
+ } else {
+ fileLength = getHdfsFileStatus(path).getLen();
+ }
+ // Wrapping in BufferedInputStream because it is more performant than
+ // BoundedInputStream by itself.
+ runnerState = RunnerState.OPEN;
+ return new BufferedInputStream(inStream, bufferSize);
+ }
+
+ // Close both the InputStream and the connection.
+ @VisibleForTesting
+ void closeInputStream(RunnerState rs) throws IOException {
+ if (in != null) {
+ IOUtils.close(cachedConnection);
+ in = null;
+ }
+ cachedConnection = null;
+ runnerState = rs;
+ }
+
+ /* Getters and Setters */
+
+ @VisibleForTesting
+ protected InputStream getInputStream() {
+ return in;
+ }
+
+ @VisibleForTesting
+ protected void setInputStream(InputStream inStream) {
+ in = inStream;
+ }
+
+ Path getPath() {
+ return path;
+ }
+
+ int getBufferSize() {
+ return bufferSize;
+ }
+
+ long getFileLength() {
+ return fileLength;
+ }
+
+ void setFileLength(long len) {
+ fileLength = len;
+ }
+
+ long getPos() {
+ return pos;
+ }
+ }
}
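
The ReadRunner javadoc above describes the contract with AbstractRunner#runWithRetry:
getUrl(), connect(URL), and getResponse(conn) are invoked inside a retry loop driven
by the configured RetryPolicy. Below is a simplified, self-contained sketch of such a
loop; it is not the actual AbstractRunner code, and the Attempt interface and
runWithRetry helper are illustrative stand-ins for one getUrl -> connect -> getResponse
attempt.

    import java.io.IOException;
    import java.util.concurrent.TimeUnit;
    import org.apache.hadoop.io.retry.RetryPolicies;
    import org.apache.hadoop.io.retry.RetryPolicy;
    import org.apache.hadoop.io.retry.RetryPolicy.RetryAction;
    import org.apache.hadoop.io.retry.RetryPolicy.RetryAction.RetryDecision;

    public class RetryLoopSketch {
      /** Stand-in for one getUrl -> connect -> getResponse attempt. */
      interface Attempt<T> {
        T call() throws IOException;
      }

      /** Keep invoking the attempt until it succeeds or the policy says FAIL. */
      static <T> T runWithRetry(RetryPolicy policy, Attempt<T> attempt)
          throws Exception {
        int retries = 0;
        for (;;) {
          try {
            return attempt.call();
          } catch (IOException e) {
            RetryAction action = policy.shouldRetry(e, retries++, 0, true);
            if (action.action != RetryDecision.RETRY) {
              throw e;
            }
            TimeUnit.MILLISECONDS.sleep(action.delayMillis);
          }
        }
      }

      public static void main(String[] args) throws Exception {
        RetryPolicy policy = RetryPolicies.retryUpToMaximumCountWithFixedSleep(
            3, 10, TimeUnit.MILLISECONDS);
        // A trivially succeeding attempt; a real ReadRunner attempt would build
        // the URL, connect to the DN, and read from the HTTP response here.
        Integer result = runWithRetry(policy, new Attempt<Integer>() {
          @Override
          public Integer call() throws IOException {
            return 42;
          }
        });
        System.out.println(result);
      }
    }
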
http://git-wip-us.apache.org/repos/asf/hadoop/blob/263c5440/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 57f089a..0740aa7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -1634,6 +1634,9 @@ Release 2.7.3 - UNRELEASED
IMPROVEMENTS
+ HDFS-7163. WebHdfsFileSystem should retry reads according to the configured
+ retry policy. (Eric Payne via kihwal)
+
OPTIMIZATIONS
BUG FIXES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/263c5440/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSXAttrBaseTest.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSXAttrBaseTest.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSXAttrBaseTest.java
index eb9053c..c526484 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSXAttrBaseTest.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSXAttrBaseTest.java
@@ -1234,7 +1234,7 @@ public class FSXAttrBaseTest {
throws Exception {
// Test that a file with the xattr can or can't be opened.
try {
- userFs.open(filePath);
+ userFs.open(filePath).read();
assertFalse("open succeeded but expected it to fail", expectOpenFailure);
} catch (AccessControlException e) {
assertTrue("open failed but expected it to succeed", expectOpenFailure);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/263c5440/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAuditLogs.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAuditLogs.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAuditLogs.java
index de22af7..115c0b4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAuditLogs.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAuditLogs.java
@@ -287,7 +287,7 @@ public class TestAuditLogs {
setupAuditLogs();
WebHdfsFileSystem webfs = WebHdfsTestUtil.getWebHdfsFileSystemAs(userGroupInfo, conf, WebHdfsConstants.WEBHDFS_SCHEME);
- webfs.open(file);
+ webfs.open(file).read();
verifyAuditLogsCheckPattern(true, 3, webOpenPattern);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/263c5440/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHDFS.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHDFS.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHDFS.java
index 22cddf4..4d4f652 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHDFS.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHDFS.java
@@ -20,13 +20,18 @@ package org.apache.hadoop.hdfs.web;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.EOFException;
import java.io.IOException;
+import java.io.InputStream;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.InetSocketAddress;
+import java.net.SocketException;
+import java.net.SocketTimeoutException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
@@ -42,12 +47,14 @@ import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
@@ -58,11 +65,16 @@ import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotTestHelper;
import org.apache.hadoop.hdfs.server.namenode.web.resources.NamenodeWebHdfsMethods;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
+import org.apache.hadoop.hdfs.web.WebHdfsFileSystem.WebHdfsInputStream;
import org.apache.hadoop.hdfs.web.resources.LengthParam;
import org.apache.hadoop.hdfs.web.resources.OffsetParam;
import org.apache.hadoop.hdfs.web.resources.Param;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.io.retry.RetryPolicy.RetryAction;
+import org.apache.hadoop.io.retry.RetryPolicy.RetryAction.RetryDecision;
import org.apache.hadoop.ipc.RetriableException;
import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.log4j.Level;
@@ -70,6 +82,15 @@ import org.junit.Assert;
import org.junit.Test;
import org.mockito.internal.util.reflection.Whitebox;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
/** Test WebHDFS */
public class TestWebHDFS {
static final Log LOG = LogFactory.getLog(TestWebHDFS.class);
@@ -748,4 +769,142 @@ public class TestWebHDFS {
}
});
}
+
+ @Test(timeout=90000)
+ public void testWebHdfsReadRetries() throws Exception {
+ // ((Log4JLogger)DFSClient.LOG).getLogger().setLevel(Level.ALL);
+ final Configuration conf = WebHdfsTestUtil.createConf();
+ final Path dir = new Path("/testWebHdfsReadRetries");
+
+ conf.setBoolean(DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_ENABLED_KEY, true);
+ conf.setInt(DFSConfigKeys.DFS_NAMENODE_SAFEMODE_MIN_DATANODES_KEY, 1);
+ conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 1024*512);
+ conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1);
+
+ final short numDatanodes = 1;
+ final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+ .numDataNodes(numDatanodes)
+ .build();
+ try {
+ cluster.waitActive();
+ final FileSystem fs = WebHdfsTestUtil
+ .getWebHdfsFileSystem(conf, WebHdfsConstants.WEBHDFS_SCHEME);
+
+ //create a file
+ final long length = 1L << 20;
+ final Path file1 = new Path(dir, "testFile");
+
+ DFSTestUtil.createFile(fs, file1, length, numDatanodes, 20120406L);
+
+ //get file status and check that it was written properly.
+ final FileStatus s1 = fs.getFileStatus(file1);
+ assertEquals("Write failed for file " + file1, length, s1.getLen());
+
+ // Ensure file can be read through WebHdfsInputStream
+ FSDataInputStream in = fs.open(file1);
+ assertTrue("Input stream is not an instance of class WebHdfsInputStream",
+ in.getWrappedStream() instanceof WebHdfsInputStream);
+ int count = 0;
+ for(; in.read() != -1; count++);
+ assertEquals("Read failed for file " + file1, s1.getLen(), count);
+ assertEquals("Sghould not be able to read beyond end of file",
+ in.read(), -1);
+ in.close();
+ try {
+ in.read();
+ fail("Read after close should have failed");
+ } catch(IOException ioe) { }
+
+ WebHdfsFileSystem wfs = (WebHdfsFileSystem)fs;
+ // Read should not be retried if AccessControlException is encountered.
+ String msg = "ReadRetries: Test Access Control Exception";
+ testReadRetryExceptionHelper(wfs, file1,
+ new AccessControlException(msg), msg, false, 1);
+
+ // Retry policy should be invoked if IOExceptions are thrown.
+ msg = "ReadRetries: Test SocketTimeoutException";
+ testReadRetryExceptionHelper(wfs, file1,
+ new SocketTimeoutException(msg), msg, true, 5);
+ msg = "ReadRetries: Test SocketException";
+ testReadRetryExceptionHelper(wfs, file1,
+ new SocketException(msg), msg, true, 5);
+ msg = "ReadRetries: Test EOFException";
+ testReadRetryExceptionHelper(wfs, file1,
+ new EOFException(msg), msg, true, 5);
+ msg = "ReadRetries: Test Generic IO Exception";
+ testReadRetryExceptionHelper(wfs, file1,
+ new IOException(msg), msg, true, 5);
+
+ // If InvalidToken exception occurs, WebHdfs only retries if the
+ // delegation token was replaced. Do that twice, then verify by checking
+ // the number of times it tried.
+ WebHdfsFileSystem spyfs = spy(wfs);
+ when(spyfs.replaceExpiredDelegationToken()).thenReturn(true, true, false);
+ msg = "ReadRetries: Test Invalid Token Exception";
+ testReadRetryExceptionHelper(spyfs, file1,
+ new InvalidToken(msg), msg, false, 3);
+ } finally {
+ cluster.shutdown();
+ }
+ }
+
+ public boolean attemptedRetry;
+ private void testReadRetryExceptionHelper(WebHdfsFileSystem fs, Path fn,
+ final IOException ex, String msg, boolean shouldAttemptRetry,
+ int numTimesTried)
+ throws Exception {
+ // Override WebHdfsInputStream#getInputStream so that it returns
+ // an input stream that throws the specified exception when read
+ // is called.
+ FSDataInputStream in = fs.open(fn);
+ in.read(); // Connection is made only when the first read() occurs.
+ final WebHdfsInputStream webIn =
+ (WebHdfsInputStream)(in.getWrappedStream());
+
+ final InputStream spyInputStream =
+ spy(webIn.getReadRunner().getInputStream());
+ doThrow(ex).when(spyInputStream).read((byte[])any(), anyInt(), anyInt());
+ final WebHdfsFileSystem.ReadRunner rr = spy(webIn.getReadRunner());
+ doReturn(spyInputStream)
+ .when(rr).initializeInputStream((HttpURLConnection) any());
+ rr.setInputStream(spyInputStream);
+ webIn.setReadRunner(rr);
+
+ // Override filesystem's retry policy in order to verify that
+ // WebHdfsInputStream is calling shouldRetry for the appropriate
+ // exceptions.
+ final RetryAction retryAction = new RetryAction(RetryDecision.RETRY);
+ final RetryAction failAction = new RetryAction(RetryDecision.FAIL);
+ RetryPolicy rp = new RetryPolicy() {
+ @Override
+ public RetryAction shouldRetry(Exception e, int retries, int failovers,
+ boolean isIdempotentOrAtMostOnce) throws Exception {
+ attemptedRetry = true;
+ if (retries > 3) {
+ return failAction;
+ } else {
+ return retryAction;
+ }
+ }
+ };
+ fs.setRetryPolicy(rp);
+
+ // If the retry logic is exercised, attemptedRetry will be true. Some
+ // exceptions should exercise the retry logic and others should not.
+ // Either way, the value of attemptedRetry should match shouldAttemptRetry.
+ attemptedRetry = false;
+ try {
+ webIn.read();
+ fail(msg + ": Read should have thrown exception.");
+ } catch (Exception e) {
+ assertTrue(e.getMessage().contains(msg));
+ }
+ assertEquals(msg + ": Read should " + (shouldAttemptRetry ? "" : "not ")
+ + "have called shouldRetry. ",
+ attemptedRetry, shouldAttemptRetry);
+
+ verify(rr, times(numTimesTried)).getResponse((HttpURLConnection) any());
+ webIn.close();
+ in.close();
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/263c5440/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHdfsFileSystemContract.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHdfsFileSystemContract.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHdfsFileSystemContract.java
index c44c33e..45b363a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHdfsFileSystemContract.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHdfsFileSystemContract.java
@@ -184,7 +184,7 @@ public class TestWebHdfsFileSystemContract extends FileSystemContractBaseTest {
final Path p = new Path("/test/testOpenNonExistFile");
//open it as a file, should get FileNotFoundException
try {
- fs.open(p);
+ fs.open(p).read();
fail("Expected FileNotFoundException was not thrown");
} catch(FileNotFoundException fnfe) {
WebHdfsFileSystem.LOG.info("This is expected.", fnfe);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/263c5440/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHdfsTokens.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHdfsTokens.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHdfsTokens.java
index d7a8717..44a6b37 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHdfsTokens.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHdfsTokens.java
@@ -387,7 +387,9 @@ public class TestWebHdfsTokens {
reset(fs);
// verify an expired token is replaced with a new token
- fs.open(p).close();
+ InputStream is = fs.open(p);
+ is.read();
+ is.close();
verify(fs, times(2)).getDelegationToken(); // first bad, then good
verify(fs, times(1)).replaceExpiredDelegationToken();
verify(fs, times(1)).getDelegationToken(null);
@@ -402,7 +404,7 @@ public class TestWebHdfsTokens {
// verify with open because it's a little different in how it
// opens connections
fs.cancelDelegationToken(fs.getRenewToken());
- InputStream is = fs.open(p);
+ is = fs.open(p);
is.read();
is.close();
verify(fs, times(2)).getDelegationToken(); // first bad, then good