Posted to hdfs-commits@hadoop.apache.org by sz...@apache.org on 2012/08/01 03:41:24 UTC
svn commit: r1367841 - in
/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs: ./
src/main/java/org/apache/hadoop/hdfs/
src/main/java/org/apache/hadoop/hdfs/web/
src/main/java/org/apache/hadoop/hdfs/web/resources/
src/test/java/org/apache/hadoop/hdfs/...
Author: szetszwo
Date: Wed Aug 1 01:41:23 2012
New Revision: 1367841
URL: http://svn.apache.org/viewvc?rev=1367841&view=rev
Log:
HDFS-3667. Add retry support to WebHdfsFileSystem.
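In outline, the change routes every WebHdfsFileSystem HTTP operation through a new Runner class, which consults the DFS client retry policy whenever a request fails with an IOException. A minimal, self-contained sketch of that loop, condensed from Runner.run()/shouldRetry() in the diff below (the HttpOp interface and runWithRetry name are illustrative, not part of the patch):

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.NameNodeProxies;
import org.apache.hadoop.io.retry.RetryPolicy;

public class RetryLoopSketch {
  /** Stand-in for Runner.init()/connect()/getResponse(). */
  interface HttpOp { void run() throws IOException; }

  static void runWithRetry(Configuration conf, HttpOp op)
      throws IOException, InterruptedException {
    // Made public by this patch; previously getDefaultRpcRetryPolicy.
    final RetryPolicy policy = NameNodeProxies.getDefaultRetryPolicy(conf);
    for(int retry = 0; ; retry++) {
      try {
        op.run();
        return;
      } catch(IOException ioe) {
        final RetryPolicy.RetryAction a;
        try {
          a = policy.shouldRetry(ioe, retry, 0, true);
        } catch(Exception e) {
          throw ioe;  // the policy itself failed; give up
        }
        if (a.action != RetryPolicy.RetryAction.RetryDecision.RETRY) {
          throw ioe;  // policy says fail
        }
        Thread.sleep(a.delayMillis);  // back off, then retry
      }
    }
  }
}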
Modified:
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ByteRangeInputStream.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HftpFileSystem.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/DeleteOpParam.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/GetOpParam.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/HttpOpParam.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/PostOpParam.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/PutOpParam.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/TestDelegationTokenForProxyUser.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestOffsetUrlInputStream.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHDFS.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/WebHdfsTestUtil.java
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1367841&r1=1367840&r2=1367841&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Wed Aug 1 01:41:23 2012
@@ -360,6 +360,8 @@ Branch-2 ( Unreleased changes )
HDFS-3650. Use MutableQuantiles to provide latency histograms for various
operations. (Andrew Wang via atm)
+ HDFS-3667. Add retry support to WebHdfsFileSystem. (szetszwo)
+
OPTIMIZATIONS
HDFS-2982. Startup performance suffers when there are many edit log
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ByteRangeInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ByteRangeInputStream.java?rev=1367841&r1=1367840&r2=1367841&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ByteRangeInputStream.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ByteRangeInputStream.java Wed Aug 1 01:41:23 2012
@@ -57,9 +57,9 @@ public abstract class ByteRangeInputStre
return url;
}
- protected abstract HttpURLConnection openConnection() throws IOException;
-
- protected abstract HttpURLConnection openConnection(final long offset) throws IOException;
+ /** Connect to server with a data offset. */
+ protected abstract HttpURLConnection connect(final long offset,
+ final boolean resolved) throws IOException;
}
enum StreamStatus {
@@ -85,9 +85,6 @@ public abstract class ByteRangeInputStre
this.resolvedURL = r;
}
- protected abstract void checkResponseCode(final HttpURLConnection connection
- ) throws IOException;
-
protected abstract URL getResolvedUrl(final HttpURLConnection connection
) throws IOException;
@@ -113,13 +110,10 @@ public abstract class ByteRangeInputStre
protected InputStream openInputStream() throws IOException {
// Use the original url if no resolved url exists, eg. if
// it's the first time a request is made.
- final URLOpener opener =
- (resolvedURL.getURL() == null) ? originalURL : resolvedURL;
-
- final HttpURLConnection connection = opener.openConnection(startPos);
- connection.connect();
- checkResponseCode(connection);
+ final boolean resolved = resolvedURL.getURL() != null;
+ final URLOpener opener = resolved? resolvedURL: originalURL;
+ final HttpURLConnection connection = opener.connect(startPos, resolved);
final String cl = connection.getHeaderField(StreamFile.CONTENT_LENGTH);
if (cl == null) {
throw new IOException(StreamFile.CONTENT_LENGTH+" header is missing");
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HftpFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HftpFileSystem.java?rev=1367841&r1=1367840&r2=1367841&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HftpFileSystem.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HftpFileSystem.java Wed Aug 1 01:41:23 2012
@@ -342,19 +342,28 @@ public class HftpFileSystem extends File
super(url);
}
- @Override
protected HttpURLConnection openConnection() throws IOException {
return (HttpURLConnection)URLUtils.openConnection(url);
}
/** Use HTTP Range header for specifying offset. */
@Override
- protected HttpURLConnection openConnection(final long offset) throws IOException {
+ protected HttpURLConnection connect(final long offset,
+ final boolean resolved) throws IOException {
final HttpURLConnection conn = openConnection();
conn.setRequestMethod("GET");
if (offset != 0L) {
conn.setRequestProperty("Range", "bytes=" + offset + "-");
}
+ conn.connect();
+
+ //Expects HTTP_OK or HTTP_PARTIAL response codes.
+ final int code = conn.getResponseCode();
+ if (offset != 0L && code != HttpURLConnection.HTTP_PARTIAL) {
+ throw new IOException("HTTP_PARTIAL expected, received " + code);
+ } else if (offset == 0L && code != HttpURLConnection.HTTP_OK) {
+ throw new IOException("HTTP_OK expected, received " + code);
+ }
return conn;
}
}
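For reference, the contract the merged connect() enforces is plain HTTP range semantics: a request with a Range header must be answered with 206 (HTTP_PARTIAL), and a request from offset 0 with 200 (HTTP_OK). A standalone hedged sketch of the same check outside HftpFileSystem (the class name and URL are placeholders):

import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URL;

public class RangeReadSketch {
  public static void main(String[] args) throws IOException {
    // Placeholder URL; any server honoring Range requests will do.
    final URL url = new URL("http://example.com/data");
    final long offset = 1024;
    final HttpURLConnection conn = (HttpURLConnection)url.openConnection();
    conn.setRequestMethod("GET");
    if (offset != 0L) {
      conn.setRequestProperty("Range", "bytes=" + offset + "-");
    }
    conn.connect();
    // Same check as connect() above: 206 for ranged reads, 200 for offset 0.
    final int code = conn.getResponseCode();
    final int expected = offset != 0L?
        HttpURLConnection.HTTP_PARTIAL : HttpURLConnection.HTTP_OK;
    if (code != expected) {
      throw new IOException("Expected " + expected + ", received " + code);
    }
    final InputStream in = conn.getInputStream();  // bytes from offset onward
    in.close();
  }
}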
@@ -368,22 +377,6 @@ public class HftpFileSystem extends File
this(new RangeHeaderUrlOpener(url), new RangeHeaderUrlOpener(null));
}
- /** Expects HTTP_OK and HTTP_PARTIAL response codes. */
- @Override
- protected void checkResponseCode(final HttpURLConnection connection
- ) throws IOException {
- final int code = connection.getResponseCode();
- if (startPos != 0 && code != HttpURLConnection.HTTP_PARTIAL) {
- // We asked for a byte range but did not receive a partial content
- // response...
- throw new IOException("HTTP_PARTIAL expected, received " + code);
- } else if (startPos == 0 && code != HttpURLConnection.HTTP_OK) {
- // We asked for all bytes from the beginning but didn't receive a 200
- // response (none of the other 2xx codes are valid here)
- throw new IOException("HTTP_OK expected, received " + code);
- }
- }
-
@Override
protected URL getResolvedUrl(final HttpURLConnection connection) {
return connection.getURL();
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java?rev=1367841&r1=1367840&r2=1367841&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java Wed Aug 1 01:41:23 2012
@@ -259,7 +259,7 @@ public class NameNodeProxies {
*
* Note that dfs.client.retry.max < 0 is not allowed.
*/
- private static RetryPolicy getDefaultRpcRetryPolicy(Configuration conf) {
+ public static RetryPolicy getDefaultRetryPolicy(Configuration conf) {
final RetryPolicy multipleLinearRandomRetry = getMultipleLinearRandomRetry(conf);
if (LOG.isDebugEnabled()) {
LOG.debug("multipleLinearRandomRetry = " + multipleLinearRandomRetry);
@@ -300,6 +300,13 @@ public class NameNodeProxies {
+ p.getClass().getSimpleName() + ", exception=" + e);
return p.shouldRetry(e, retries, failovers, isMethodIdempotent);
}
+
+ @Override
+ public String toString() {
+ return "RetryPolicy[" + multipleLinearRandomRetry + ", "
+ + RetryPolicies.TRY_ONCE_THEN_FAIL.getClass().getSimpleName()
+ + "]";
+ }
};
}
}
@@ -335,7 +342,7 @@ public class NameNodeProxies {
boolean withRetries) throws IOException {
RPC.setProtocolEngine(conf, ClientNamenodeProtocolPB.class, ProtobufRpcEngine.class);
- final RetryPolicy defaultPolicy = getDefaultRpcRetryPolicy(conf);
+ final RetryPolicy defaultPolicy = getDefaultRetryPolicy(conf);
final long version = RPC.getProtocolVersion(ClientNamenodeProtocolPB.class);
ClientNamenodeProtocolPB proxy = RPC.getProtocolProxy(
ClientNamenodeProtocolPB.class, version, address, ugi, conf,
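Since getDefaultRetryPolicy is now public, WebHDFS clients pick up the same configuration as the RPC path. A hedged sketch of enabling it; the spec format, pairs of sleep-time-in-ms and retry count, follows the MultipleLinearRandomRetry convention, the key constants are assumed from DFSConfigKeys, and the values are illustrative:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.NameNodeProxies;
import org.apache.hadoop.io.retry.RetryPolicy;

public class RetryPolicyConfigSketch {
  public static void main(String[] args) {
    final Configuration conf = new Configuration();
    // Turn the client retry policy on (same key the new tests use).
    conf.setBoolean(DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_ENABLED_KEY, true);
    // "10000,6,60000,10": ~10s sleep for 6 retries, then ~60s for 10 more.
    conf.set(DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_SPEC_KEY, "10000,6,60000,10");
    final RetryPolicy policy = NameNodeProxies.getDefaultRetryPolicy(conf);
    System.out.println(policy);  // uses the toString() added above
  }
}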
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java?rev=1367841&r1=1367840&r2=1367841&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java Wed Aug 1 01:41:23 2012
@@ -55,6 +55,7 @@ import org.apache.hadoop.fs.permission.F
import org.apache.hadoop.hdfs.ByteRangeInputStream;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.NameNodeProxies;
import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
@@ -88,6 +89,7 @@ import org.apache.hadoop.hdfs.web.resour
import org.apache.hadoop.hdfs.web.resources.TokenArgumentParam;
import org.apache.hadoop.hdfs.web.resources.UserParam;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.AccessControlException;
@@ -147,6 +149,7 @@ public class WebHdfsFileSystem extends F
private URI uri;
private Token<?> delegationToken;
private final AuthenticatedURL.Token authToken = new AuthenticatedURL.Token();
+ private RetryPolicy retryPolicy = null;
private Path workingDir;
{
@@ -179,6 +182,7 @@ public class WebHdfsFileSystem extends F
throw new IllegalArgumentException(e);
}
this.nnAddr = NetUtils.createSocketAddr(uri.getAuthority(), getDefaultPort());
+ this.retryPolicy = NameNodeProxies.getDefaultRetryPolicy(conf);
this.workingDir = getHomeDirectory();
if (UserGroupInformation.isSecurityEnabled()) {
@@ -276,39 +280,64 @@ public class WebHdfsFileSystem extends F
}
private static Map<?, ?> validateResponse(final HttpOpParam.Op op,
- final HttpURLConnection conn) throws IOException {
+ final HttpURLConnection conn, boolean unwrapException) throws IOException {
final int code = conn.getResponseCode();
if (code != op.getExpectedHttpResponseCode()) {
final Map<?, ?> m;
try {
m = jsonParse(conn, true);
- } catch(IOException e) {
+ } catch(Exception e) {
throw new IOException("Unexpected HTTP response: code=" + code + " != "
+ op.getExpectedHttpResponseCode() + ", " + op.toQueryString()
+ ", message=" + conn.getResponseMessage(), e);
}
- if (m.get(RemoteException.class.getSimpleName()) == null) {
+ if (m == null) {
+ throw new IOException("Unexpected HTTP response: code=" + code + " != "
+ + op.getExpectedHttpResponseCode() + ", " + op.toQueryString()
+ + ", message=" + conn.getResponseMessage());
+ } else if (m.get(RemoteException.class.getSimpleName()) == null) {
return m;
}
final RemoteException re = JsonUtil.toRemoteException(m);
- throw re.unwrapRemoteException(AccessControlException.class,
- InvalidToken.class,
- AuthenticationException.class,
- AuthorizationException.class,
- FileAlreadyExistsException.class,
- FileNotFoundException.class,
- ParentNotDirectoryException.class,
- UnresolvedPathException.class,
- SafeModeException.class,
- DSQuotaExceededException.class,
- NSQuotaExceededException.class);
+ throw unwrapException? toIOException(re): re;
}
return null;
}
/**
+ * Convert an exception to an IOException.
+ *
+ * For a non-IOException, wrap it with IOException.
+ * For a RemoteException, unwrap it.
+ * For an IOException which is not a RemoteException, return it.
+ */
+ private static IOException toIOException(Exception e) {
+ if (!(e instanceof IOException)) {
+ return new IOException(e);
+ }
+
+ final IOException ioe = (IOException)e;
+ if (!(ioe instanceof RemoteException)) {
+ return ioe;
+ }
+
+ final RemoteException re = (RemoteException)ioe;
+ return re.unwrapRemoteException(AccessControlException.class,
+ InvalidToken.class,
+ AuthenticationException.class,
+ AuthorizationException.class,
+ FileAlreadyExistsException.class,
+ FileNotFoundException.class,
+ ParentNotDirectoryException.class,
+ UnresolvedPathException.class,
+ SafeModeException.class,
+ DSQuotaExceededException.class,
+ NSQuotaExceededException.class);
+ }
+
+ /**
* Return a URL pointing to given path on the namenode.
*
* @param path to obtain the URL for
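To make the unwrapping rule in toIOException() concrete: a RemoteException whose wrapped class name matches one of the lookup types comes back as that concrete IOException subclass, which is what lets callers catch, say, FileNotFoundException directly. A small hedged demo (the class name and message are illustrative):

import java.io.FileNotFoundException;
import java.io.IOException;
import org.apache.hadoop.ipc.RemoteException;

public class UnwrapDemo {
  public static void main(String[] args) {
    final RemoteException re = new RemoteException(
        FileNotFoundException.class.getName(), "File does not exist: /foo");
    // Matches the lookup list, so the concrete type is reconstructed.
    final IOException ioe =
        re.unwrapRemoteException(FileNotFoundException.class);
    System.out.println(ioe.getClass());  // class java.io.FileNotFoundException
  }
}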
@@ -362,70 +391,15 @@ public class WebHdfsFileSystem extends F
}
private HttpURLConnection getHttpUrlConnection(URL url)
- throws IOException {
+ throws IOException, AuthenticationException {
final HttpURLConnection conn;
- try {
- if (ugi.hasKerberosCredentials()) {
- conn = new AuthenticatedURL(AUTH).openConnection(url, authToken);
- } else {
- conn = (HttpURLConnection)url.openConnection();
- }
- } catch (AuthenticationException e) {
- throw new IOException("Authentication failed, url=" + url, e);
+ if (ugi.hasKerberosCredentials()) {
+ conn = new AuthenticatedURL(AUTH).openConnection(url, authToken);
+ } else {
+ conn = (HttpURLConnection)url.openConnection();
}
return conn;
}
-
- private HttpURLConnection httpConnect(final HttpOpParam.Op op, final Path fspath,
- final Param<?,?>... parameters) throws IOException {
- final URL url = toUrl(op, fspath, parameters);
-
- //connect and get response
- HttpURLConnection conn = getHttpUrlConnection(url);
- try {
- conn.setRequestMethod(op.getType().toString());
- if (op.getDoOutput()) {
- conn = twoStepWrite(conn, op);
- conn.setRequestProperty("Content-Type", "application/octet-stream");
- }
- conn.setDoOutput(op.getDoOutput());
- conn.connect();
- return conn;
- } catch (IOException e) {
- conn.disconnect();
- throw e;
- }
- }
-
- /**
- * Two-step Create/Append:
- * Step 1) Submit a Http request with neither auto-redirect nor data.
- * Step 2) Submit another Http request with the URL from the Location header with data.
- *
- * The reason of having two-step create/append is for preventing clients to
- * send out the data before the redirect. This issue is addressed by the
- * "Expect: 100-continue" header in HTTP/1.1; see RFC 2616, Section 8.2.3.
- * Unfortunately, there are software library bugs (e.g. Jetty 6 http server
- * and Java 6 http client), which do not correctly implement "Expect:
- * 100-continue". The two-step create/append is a temporary workaround for
- * the software library bugs.
- */
- static HttpURLConnection twoStepWrite(HttpURLConnection conn,
- final HttpOpParam.Op op) throws IOException {
- //Step 1) Submit a Http request with neither auto-redirect nor data.
- conn.setInstanceFollowRedirects(false);
- conn.setDoOutput(false);
- conn.connect();
- validateResponse(HttpOpParam.TemporaryRedirectOp.valueOf(op), conn);
- final String redirect = conn.getHeaderField("Location");
- conn.disconnect();
-
- //Step 2) Submit another Http request with the URL from the Location header with data.
- conn = (HttpURLConnection)new URL(redirect).openConnection();
- conn.setRequestMethod(op.getType().toString());
- conn.setChunkedStreamingMode(32 << 10); //32kB-chunk
- return conn;
- }
/**
* Run a http operation.
@@ -439,12 +413,161 @@ public class WebHdfsFileSystem extends F
*/
private Map<?, ?> run(final HttpOpParam.Op op, final Path fspath,
final Param<?,?>... parameters) throws IOException {
- final HttpURLConnection conn = httpConnect(op, fspath, parameters);
- try {
- final Map<?, ?> m = validateResponse(op, conn);
- return m != null? m: jsonParse(conn, false);
- } finally {
- conn.disconnect();
+ return new Runner(op, fspath, parameters).run().json;
+ }
+
+ /**
+ * This class is for initializing an HTTP connection, connecting to the
+ * server, obtaining a response, and handling retries on failure.
+ */
+ class Runner {
+ private final HttpOpParam.Op op;
+ private final URL url;
+ private final boolean redirected;
+
+ private boolean checkRetry;
+ private HttpURLConnection conn = null;
+ private Map<?, ?> json = null;
+
+ Runner(final HttpOpParam.Op op, final URL url, final boolean redirected) {
+ this.op = op;
+ this.url = url;
+ this.redirected = redirected;
+ }
+
+ Runner(final HttpOpParam.Op op, final Path fspath,
+ final Param<?,?>... parameters) throws IOException {
+ this(op, toUrl(op, fspath, parameters), false);
+ }
+
+ Runner(final HttpOpParam.Op op, final HttpURLConnection conn) {
+ this(op, null, false);
+ this.conn = conn;
+ }
+
+ private void init() throws IOException {
+ checkRetry = !redirected;
+ try {
+ conn = getHttpUrlConnection(url);
+ } catch(AuthenticationException ae) {
+ checkRetry = false;
+ throw new IOException("Authentication failed, url=" + url, ae);
+ }
+ }
+
+ private void connect() throws IOException {
+ connect(op.getDoOutput());
+ }
+
+ private void connect(boolean doOutput) throws IOException {
+ conn.setRequestMethod(op.getType().toString());
+ conn.setDoOutput(doOutput);
+ conn.setInstanceFollowRedirects(false);
+ conn.connect();
+ }
+
+ private void disconnect() {
+ if (conn != null) {
+ conn.disconnect();
+ conn = null;
+ }
+ }
+
+ Runner run() throws IOException {
+ for(int retry = 0; ; retry++) {
+ try {
+ init();
+ if (op.getDoOutput()) {
+ twoStepWrite();
+ } else {
+ getResponse(op != GetOpParam.Op.OPEN);
+ }
+ return this;
+ } catch(IOException ioe) {
+ shouldRetry(ioe, retry);
+ }
+ }
+ }
+
+ private void shouldRetry(final IOException ioe, final int retry
+ ) throws IOException {
+ if (checkRetry) {
+ try {
+ final RetryPolicy.RetryAction a = retryPolicy.shouldRetry(
+ ioe, retry, 0, true);
+ if (a.action == RetryPolicy.RetryAction.RetryDecision.RETRY) {
+ LOG.info("Retrying connect to namenode: " + nnAddr
+ + ". Already tried " + retry + " time(s); retry policy is "
+ + retryPolicy + ", delay " + a.delayMillis + "ms.");
+ Thread.sleep(a.delayMillis);
+ return;
+ }
+ } catch(Exception e) {
+ LOG.warn("Original exception is ", ioe);
+ throw toIOException(e);
+ }
+ }
+ throw toIOException(ioe);
+ }
+
+ /**
+ * Two-step Create/Append:
+ * Step 1) Submit a Http request with neither auto-redirect nor data.
+ * Step 2) Submit another Http request with the URL from the Location header with data.
+ *
+ * The reason for the two-step create/append is to prevent clients from
+ * sending out the data before the redirect. This issue is addressed by the
+ * "Expect: 100-continue" header in HTTP/1.1; see RFC 2616, Section 8.2.3.
+ * Unfortunately, there are software library bugs (e.g. Jetty 6 http server
+ * and Java 6 http client), which do not correctly implement "Expect:
+ * 100-continue". The two-step create/append is a temporary workaround for
+ * the software library bugs.
+ */
+ HttpURLConnection twoStepWrite() throws IOException {
+ //Step 1) Submit a Http request with neither auto-redirect nor data.
+ connect(false);
+ validateResponse(HttpOpParam.TemporaryRedirectOp.valueOf(op), conn, false);
+ final String redirect = conn.getHeaderField("Location");
+ disconnect();
+ checkRetry = false;
+
+ //Step 2) Submit another Http request with the URL from the Location header with data.
+ conn = (HttpURLConnection)new URL(redirect).openConnection();
+ conn.setRequestProperty("Content-Type", MediaType.APPLICATION_OCTET_STREAM);
+ conn.setChunkedStreamingMode(32 << 10); //32kB-chunk
+ connect();
+ return conn;
+ }
+
+ FSDataOutputStream write(final int bufferSize) throws IOException {
+ return WebHdfsFileSystem.this.write(op, conn, bufferSize);
+ }
+
+ void getResponse(boolean getJsonAndDisconnect) throws IOException {
+ try {
+ connect();
+ final int code = conn.getResponseCode();
+ if (!redirected && op.getRedirect()
+ && code != op.getExpectedHttpResponseCode()) {
+ final String redirect = conn.getHeaderField("Location");
+ json = validateResponse(HttpOpParam.TemporaryRedirectOp.valueOf(op),
+ conn, false);
+ disconnect();
+
+ checkRetry = false;
+ conn = (HttpURLConnection)new URL(redirect).openConnection();
+ connect();
+ }
+
+ json = validateResponse(op, conn, false);
+ if (json == null && getJsonAndDisconnect) {
+ json = jsonParse(conn, false);
+ }
+ } finally {
+ if (getJsonAndDisconnect) {
+ disconnect();
+ }
+ }
}
}
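For orientation, this is the shape in which the rest of the patch drives Runner, as the rewritten create() and append() below show. This fragment would live inside WebHdfsFileSystem; the op, path, and parameters are illustrative:

// JSON ops: run() retries on IOException, then exposes the parsed body.
final Map<?, ?> json = new Runner(GetOpParam.Op.GETFILESTATUS, fspath)
    .run().json;

// Output ops: run() performs the two-step redirect, and write() turns the
// redirected connection into the caller's output stream.
final FSDataOutputStream out =
    new Runner(PutOpParam.Op.CREATE, fspath, new OverwriteParam(true))
        .run()
        .write(bufferSize);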
@@ -578,7 +701,7 @@ public class WebHdfsFileSystem extends F
super.close();
} finally {
try {
- validateResponse(op, conn);
+ validateResponse(op, conn, true);
} finally {
conn.disconnect();
}
@@ -594,13 +717,14 @@ public class WebHdfsFileSystem extends F
statistics.incrementWriteOps(1);
final HttpOpParam.Op op = PutOpParam.Op.CREATE;
- final HttpURLConnection conn = httpConnect(op, f,
+ return new Runner(op, f,
new PermissionParam(applyUMask(permission)),
new OverwriteParam(overwrite),
new BufferSizeParam(bufferSize),
new ReplicationParam(replication),
- new BlockSizeParam(blockSize));
- return write(op, conn, bufferSize);
+ new BlockSizeParam(blockSize))
+ .run()
+ .write(bufferSize);
}
@Override
@@ -609,9 +733,9 @@ public class WebHdfsFileSystem extends F
statistics.incrementWriteOps(1);
final HttpOpParam.Op op = PostOpParam.Op.APPEND;
- final HttpURLConnection conn = httpConnect(op, f,
- new BufferSizeParam(bufferSize));
- return write(op, conn, bufferSize);
+ return new Runner(op, f, new BufferSizeParam(bufferSize))
+ .run()
+ .write(bufferSize);
}
@SuppressWarnings("deprecation")
@@ -638,26 +762,17 @@ public class WebHdfsFileSystem extends F
}
class OffsetUrlOpener extends ByteRangeInputStream.URLOpener {
- /** The url with offset parameter */
- private URL offsetUrl;
-
OffsetUrlOpener(final URL url) {
super(url);
}
- /** Open connection with offset url. */
- @Override
- protected HttpURLConnection openConnection() throws IOException {
- return getHttpUrlConnection(offsetUrl);
- }
-
- /** Setup offset url before open connection. */
+ /** Set up the offset url and connect. */
@Override
- protected HttpURLConnection openConnection(final long offset) throws IOException {
- offsetUrl = offset == 0L? url: new URL(url + "&" + new OffsetParam(offset));
- final HttpURLConnection conn = openConnection();
- conn.setRequestMethod("GET");
- return conn;
+ protected HttpURLConnection connect(final long offset,
+ final boolean resolved) throws IOException {
+ final URL offsetUrl = offset == 0L? url
+ : new URL(url + "&" + new OffsetParam(offset));
+ return new Runner(GetOpParam.Op.OPEN, offsetUrl, resolved).run().conn;
}
}
@@ -698,12 +813,6 @@ public class WebHdfsFileSystem extends F
OffsetUrlInputStream(OffsetUrlOpener o, OffsetUrlOpener r) {
super(o, r);
}
-
- @Override
- protected void checkResponseCode(final HttpURLConnection connection
- ) throws IOException {
- validateResponse(GetOpParam.Op.OPEN, connection);
- }
/** Remove offset parameter before returning the resolved url. */
@Override
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/DeleteOpParam.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/DeleteOpParam.java?rev=1367841&r1=1367840&r2=1367841&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/DeleteOpParam.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/DeleteOpParam.java Wed Aug 1 01:41:23 2012
@@ -44,6 +44,11 @@ public class DeleteOpParam extends HttpO
}
@Override
+ public boolean getRedirect() {
+ return false;
+ }
+
+ @Override
public int getExpectedHttpResponseCode() {
return expectedHttpResponseCode;
}
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/GetOpParam.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/GetOpParam.java?rev=1367841&r1=1367840&r2=1367841&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/GetOpParam.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/GetOpParam.java Wed Aug 1 01:41:23 2012
@@ -23,25 +23,27 @@ import java.net.HttpURLConnection;
public class GetOpParam extends HttpOpParam<GetOpParam.Op> {
/** Get operations. */
public static enum Op implements HttpOpParam.Op {
- OPEN(HttpURLConnection.HTTP_OK),
+ OPEN(true, HttpURLConnection.HTTP_OK),
- GETFILESTATUS(HttpURLConnection.HTTP_OK),
- LISTSTATUS(HttpURLConnection.HTTP_OK),
- GETCONTENTSUMMARY(HttpURLConnection.HTTP_OK),
- GETFILECHECKSUM(HttpURLConnection.HTTP_OK),
-
- GETHOMEDIRECTORY(HttpURLConnection.HTTP_OK),
- GETDELEGATIONTOKEN(HttpURLConnection.HTTP_OK),
- GETDELEGATIONTOKENS(HttpURLConnection.HTTP_OK),
+ GETFILESTATUS(false, HttpURLConnection.HTTP_OK),
+ LISTSTATUS(false, HttpURLConnection.HTTP_OK),
+ GETCONTENTSUMMARY(false, HttpURLConnection.HTTP_OK),
+ GETFILECHECKSUM(true, HttpURLConnection.HTTP_OK),
+
+ GETHOMEDIRECTORY(false, HttpURLConnection.HTTP_OK),
+ GETDELEGATIONTOKEN(false, HttpURLConnection.HTTP_OK),
+ GETDELEGATIONTOKENS(false, HttpURLConnection.HTTP_OK),
/** GET_BLOCK_LOCATIONS is a private unstable op. */
- GET_BLOCK_LOCATIONS(HttpURLConnection.HTTP_OK),
+ GET_BLOCK_LOCATIONS(false, HttpURLConnection.HTTP_OK),
- NULL(HttpURLConnection.HTTP_NOT_IMPLEMENTED);
+ NULL(false, HttpURLConnection.HTTP_NOT_IMPLEMENTED);
+ final boolean redirect;
final int expectedHttpResponseCode;
- Op(final int expectedHttpResponseCode) {
+ Op(final boolean redirect, final int expectedHttpResponseCode) {
+ this.redirect = redirect;
this.expectedHttpResponseCode = expectedHttpResponseCode;
}
@@ -56,6 +58,11 @@ public class GetOpParam extends HttpOpPa
}
@Override
+ public boolean getRedirect() {
+ return redirect;
+ }
+
+ @Override
public int getExpectedHttpResponseCode() {
return expectedHttpResponseCode;
}
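The effect of the new per-op flag, in brief: only the two GET operations that stream data from a datanode report getRedirect() == true, so only they take the redirect branch in Runner.getResponse(). A tiny hedged check, with expected values read straight off the enum constructors above:

import org.apache.hadoop.hdfs.web.resources.GetOpParam;

public class RedirectFlagSketch {
  public static void main(String[] args) {
    System.out.println(GetOpParam.Op.OPEN.getRedirect());            // true
    System.out.println(GetOpParam.Op.GETFILECHECKSUM.getRedirect()); // true
    System.out.println(GetOpParam.Op.GETFILESTATUS.getRedirect());   // false
  }
}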
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/HttpOpParam.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/HttpOpParam.java?rev=1367841&r1=1367840&r2=1367841&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/HttpOpParam.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/HttpOpParam.java Wed Aug 1 01:41:23 2012
@@ -17,6 +17,10 @@
*/
package org.apache.hadoop.hdfs.web.resources;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
import javax.ws.rs.core.Response;
@@ -42,6 +46,9 @@ public abstract class HttpOpParam<E exte
/** @return true if the operation will do output. */
public boolean getDoOutput();
+ /** @return true if the operation will be redirected. */
+ public boolean getRedirect();
+
/** @return true the expected http response code. */
public int getExpectedHttpResponseCode();
@@ -51,15 +58,25 @@ public abstract class HttpOpParam<E exte
/** Expects HTTP response 307 "Temporary Redirect". */
public static class TemporaryRedirectOp implements Op {
- static final TemporaryRedirectOp CREATE = new TemporaryRedirectOp(PutOpParam.Op.CREATE);
- static final TemporaryRedirectOp APPEND = new TemporaryRedirectOp(PostOpParam.Op.APPEND);
+ static final TemporaryRedirectOp CREATE = new TemporaryRedirectOp(
+ PutOpParam.Op.CREATE);
+ static final TemporaryRedirectOp APPEND = new TemporaryRedirectOp(
+ PostOpParam.Op.APPEND);
+ static final TemporaryRedirectOp OPEN = new TemporaryRedirectOp(
+ GetOpParam.Op.OPEN);
+ static final TemporaryRedirectOp GETFILECHECKSUM = new TemporaryRedirectOp(
+ GetOpParam.Op.GETFILECHECKSUM);
+ static final List<TemporaryRedirectOp> values
+ = Collections.unmodifiableList(Arrays.asList(
+ new TemporaryRedirectOp[]{CREATE, APPEND, OPEN, GETFILECHECKSUM}));
+
/** Get an object for the given op. */
public static TemporaryRedirectOp valueOf(final Op op) {
- if (op == CREATE.op) {
- return CREATE;
- } else if (op == APPEND.op) {
- return APPEND;
+ for(TemporaryRedirectOp t : values) {
+ if (op == t.op) {
+ return t;
+ }
}
throw new IllegalArgumentException(op + " not found.");
}
@@ -80,6 +97,11 @@ public abstract class HttpOpParam<E exte
return op.getDoOutput();
}
+ @Override
+ public boolean getRedirect() {
+ return false;
+ }
+
/** Override the original expected response with "Temporary Redirect". */
@Override
public int getExpectedHttpResponseCode() {
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/PostOpParam.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/PostOpParam.java?rev=1367841&r1=1367840&r2=1367841&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/PostOpParam.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/PostOpParam.java Wed Aug 1 01:41:23 2012
@@ -44,6 +44,11 @@ public class PostOpParam extends HttpOpP
}
@Override
+ public boolean getRedirect() {
+ return true;
+ }
+
+ @Override
public int getExpectedHttpResponseCode() {
return expectedHttpResponseCode;
}
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/PutOpParam.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/PutOpParam.java?rev=1367841&r1=1367840&r2=1367841&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/PutOpParam.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/PutOpParam.java Wed Aug 1 01:41:23 2012
@@ -39,11 +39,11 @@ public class PutOpParam extends HttpOpPa
NULL(false, HttpURLConnection.HTTP_NOT_IMPLEMENTED);
- final boolean doOutput;
+ final boolean doOutputAndRedirect;
final int expectedHttpResponseCode;
- Op(final boolean doOutput, final int expectedHttpResponseCode) {
- this.doOutput = doOutput;
+ Op(final boolean doOutputAndRedirect, final int expectedHttpResponseCode) {
+ this.doOutputAndRedirect = doOutputAndRedirect;
this.expectedHttpResponseCode = expectedHttpResponseCode;
}
@@ -54,7 +54,12 @@ public class PutOpParam extends HttpOpPa
@Override
public boolean getDoOutput() {
- return doOutput;
+ return doOutputAndRedirect;
+ }
+
+ @Override
+ public boolean getRedirect() {
+ return doOutputAndRedirect;
}
@Override
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java?rev=1367841&r1=1367840&r2=1367841&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java Wed Aug 1 01:41:23 2012
@@ -47,6 +47,7 @@ import org.apache.commons.logging.LogFac
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileChecksum;
import org.apache.hadoop.fs.FileStatus;
@@ -66,6 +67,7 @@ import org.apache.hadoop.hdfs.protocol.L
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
+import org.apache.hadoop.hdfs.web.WebHdfsTestUtil;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Writable;
@@ -74,6 +76,7 @@ import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.Time;
import org.apache.log4j.Level;
@@ -825,13 +828,17 @@ public class TestDFSClientRetries {
/** Test client retry with namenode restarting. */
@Test
public void testNamenodeRestart() throws Exception {
+ namenodeRestartTest(new Configuration(), false);
+ }
+
+ public static void namenodeRestartTest(final Configuration conf,
+ final boolean isWebHDFS) throws Exception {
((Log4JLogger)DFSClient.LOG).getLogger().setLevel(Level.ALL);
final List<Exception> exceptions = new ArrayList<Exception>();
final Path dir = new Path("/testNamenodeRestart");
- final Configuration conf = new Configuration();
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_ENABLED_KEY, true);
final short numDatanodes = 3;
@@ -841,16 +848,18 @@ public class TestDFSClientRetries {
try {
cluster.waitActive();
final DistributedFileSystem dfs = cluster.getFileSystem();
+ final FileSystem fs = isWebHDFS?
+ WebHdfsTestUtil.getWebHdfsFileSystem(conf): dfs;
final URI uri = dfs.getUri();
assertTrue(HdfsUtils.isHealthy(uri));
//create a file
final long length = 1L << 20;
final Path file1 = new Path(dir, "foo");
- DFSTestUtil.createFile(dfs, file1, length, numDatanodes, 20120406L);
+ DFSTestUtil.createFile(fs, file1, length, numDatanodes, 20120406L);
//get file status
- final FileStatus s1 = dfs.getFileStatus(file1);
+ final FileStatus s1 = fs.getFileStatus(file1);
assertEquals(length, s1.getLen());
//shutdown namenode
@@ -858,6 +867,25 @@ public class TestDFSClientRetries {
cluster.shutdownNameNode(0);
assertFalse(HdfsUtils.isHealthy(uri));
+ //namenode is down, read the file in a thread
+ final Thread reader = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ //it should retry till namenode is up.
+ final FileSystem fs = createFsWithDifferentUsername(conf, isWebHDFS);
+ final FSDataInputStream in = fs.open(file1);
+ int count = 0;
+ for(; in.read() != -1; count++);
+ in.close();
+ assertEquals(s1.getLen(), count);
+ } catch (Exception e) {
+ exceptions.add(e);
+ }
+ }
+ });
+ reader.start();
+
//namenode is down, create another file in a thread
final Path file3 = new Path(dir, "file");
final Thread thread = new Thread(new Runnable() {
@@ -865,7 +893,7 @@ public class TestDFSClientRetries {
public void run() {
try {
//it should retry till namenode is up.
- final FileSystem fs = AppendTestUtil.createHdfsWithDifferentUsername(conf);
+ final FileSystem fs = createFsWithDifferentUsername(conf, isWebHDFS);
DFSTestUtil.createFile(fs, file3, length, numDatanodes, 20120406L);
} catch (Exception e) {
exceptions.add(e);
@@ -892,12 +920,15 @@ public class TestDFSClientRetries {
}).start();
//namenode is down, it should retry until namenode is up again.
- final FileStatus s2 = dfs.getFileStatus(file1);
+ final FileStatus s2 = fs.getFileStatus(file1);
assertEquals(s1, s2);
//check file1 and file3
thread.join();
- assertEquals(dfs.getFileChecksum(file1), dfs.getFileChecksum(file3));
+ assertEquals(s1.getLen(), fs.getFileStatus(file3).getLen());
+ assertEquals(fs.getFileChecksum(file1), fs.getFileChecksum(file3));
+
+ reader.join();
//enter safe mode
assertTrue(HdfsUtils.isHealthy(uri));
@@ -922,8 +953,8 @@ public class TestDFSClientRetries {
//namenode is in safe mode, create should retry until it leaves safe mode.
final Path file2 = new Path(dir, "bar");
- DFSTestUtil.createFile(dfs, file2, length, numDatanodes, 20120406L);
- assertEquals(dfs.getFileChecksum(file1), dfs.getFileChecksum(file2));
+ DFSTestUtil.createFile(fs, file2, length, numDatanodes, 20120406L);
+ assertEquals(fs.getFileChecksum(file1), fs.getFileChecksum(file2));
assertTrue(HdfsUtils.isHealthy(uri));
@@ -931,7 +962,7 @@ public class TestDFSClientRetries {
final Path nonExisting = new Path(dir, "nonExisting");
LOG.info("setPermission: " + nonExisting);
try {
- dfs.setPermission(nonExisting, new FsPermission((short)0));
+ fs.setPermission(nonExisting, new FsPermission((short)0));
fail();
} catch(FileNotFoundException fnfe) {
LOG.info("GOOD!", fnfe);
@@ -949,6 +980,18 @@ public class TestDFSClientRetries {
}
}
+ private static FileSystem createFsWithDifferentUsername(
+ final Configuration conf, final boolean isWebHDFS
+ ) throws IOException, InterruptedException {
+ final String username = UserGroupInformation.getCurrentUser(
+ ).getShortUserName() + "_XXX";
+ final UserGroupInformation ugi = UserGroupInformation.createUserForTesting(
+ username, new String[]{"supergroup"});
+
+ return isWebHDFS? WebHdfsTestUtil.getWebHdfsFileSystemAs(ugi, conf)
+ : DFSTestUtil.getFileSystemAs(ugi, conf);
+ }
+
@Test
public void testMultipleLinearRandomRetry() {
parseMultipleLinearRandomRetry(null, "");
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/TestDelegationTokenForProxyUser.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/TestDelegationTokenForProxyUser.java?rev=1367841&r1=1367840&r2=1367841&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/TestDelegationTokenForProxyUser.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/TestDelegationTokenForProxyUser.java Wed Aug 1 01:41:23 2012
@@ -44,7 +44,6 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
@@ -140,9 +139,7 @@ public class TestDelegationTokenForProxy
.doAs(new PrivilegedExceptionAction<Token<DelegationTokenIdentifier>>() {
@Override
public Token<DelegationTokenIdentifier> run() throws IOException {
- DistributedFileSystem dfs = (DistributedFileSystem) cluster
- .getFileSystem();
- return dfs.getDelegationToken("RenewerUser");
+ return cluster.getFileSystem().getDelegationToken("RenewerUser");
}
});
DelegationTokenIdentifier identifier = new DelegationTokenIdentifier();
@@ -206,7 +203,7 @@ public class TestDelegationTokenForProxy
final PutOpParam.Op op = PutOpParam.Op.CREATE;
final URL url = WebHdfsTestUtil.toUrl(webhdfs, op, f, new DoAsParam(PROXY_USER));
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
- conn = WebHdfsTestUtil.twoStepWrite(conn, op);
+ conn = WebHdfsTestUtil.twoStepWrite(webhdfs, op, conn);
final FSDataOutputStream out = WebHdfsTestUtil.write(webhdfs, op, conn, 4096);
out.write("Hello, webhdfs user!".getBytes());
out.close();
@@ -221,7 +218,7 @@ public class TestDelegationTokenForProxy
final PostOpParam.Op op = PostOpParam.Op.APPEND;
final URL url = WebHdfsTestUtil.toUrl(webhdfs, op, f, new DoAsParam(PROXY_USER));
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
- conn = WebHdfsTestUtil.twoStepWrite(conn, op);
+ conn = WebHdfsTestUtil.twoStepWrite(webhdfs, op, conn);
final FSDataOutputStream out = WebHdfsTestUtil.write(webhdfs, op, conn, 4096);
out.write("\nHello again!".getBytes());
out.close();
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestOffsetUrlInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestOffsetUrlInputStream.java?rev=1367841&r1=1367840&r2=1367841&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestOffsetUrlInputStream.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestOffsetUrlInputStream.java Wed Aug 1 01:41:23 2012
@@ -18,22 +18,10 @@
package org.apache.hadoop.hdfs.web;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.fail;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
import java.io.IOException;
-import java.net.URI;
import java.net.URL;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.hdfs.TestByteRangeInputStream.MockHttpURLConnection;
-import org.apache.hadoop.hdfs.web.WebHdfsFileSystem.OffsetUrlInputStream;
-import org.apache.hadoop.hdfs.web.WebHdfsFileSystem.OffsetUrlOpener;
import org.junit.Test;
public class TestOffsetUrlInputStream {
@@ -73,65 +61,4 @@ public class TestOffsetUrlInputStream {
WebHdfsFileSystem.removeOffsetParam(new URL(s)).toString());
}
}
-
- @Test
- public void testByteRange() throws Exception {
- final Configuration conf = new Configuration();
- final String uri = WebHdfsFileSystem.SCHEME + "://localhost:50070/";
- final WebHdfsFileSystem webhdfs = (WebHdfsFileSystem)FileSystem.get(new URI(uri), conf);
-
- OffsetUrlOpener ospy = spy(webhdfs.new OffsetUrlOpener(new URL("http://test/")));
- doReturn(new MockHttpURLConnection(ospy.getURL())).when(ospy)
- .openConnection();
- OffsetUrlOpener rspy = spy(webhdfs.new OffsetUrlOpener((URL) null));
- doReturn(new MockHttpURLConnection(rspy.getURL())).when(rspy)
- .openConnection();
- final OffsetUrlInputStream is = new OffsetUrlInputStream(ospy, rspy);
-
- assertEquals("getPos wrong", 0, is.getPos());
-
- is.read();
-
- assertNull("Initial call made incorrectly (Range Check)", ospy
- .openConnection().getRequestProperty("Range"));
-
- assertEquals("getPos should be 1 after reading one byte", 1, is.getPos());
-
- is.read();
-
- assertEquals("getPos should be 2 after reading two bytes", 2, is.getPos());
-
- // No additional connections should have been made (no seek)
-
- rspy.setURL(new URL("http://resolvedurl/"));
-
- is.seek(100);
- is.read();
-
- assertEquals("getPos should be 101 after reading one byte", 101,
- is.getPos());
-
- verify(rspy, times(1)).openConnection();
-
- is.seek(101);
- is.read();
-
- verify(rspy, times(1)).openConnection();
-
- // Seek to 101 should not result in another request"
-
- is.seek(2500);
- is.read();
-
- ((MockHttpURLConnection) rspy.openConnection()).setResponseCode(206);
- is.seek(0);
-
- try {
- is.read();
- fail("Exception should be thrown when 206 response is given "
- + "but 200 is expected");
- } catch (IOException e) {
- WebHdfsFileSystem.LOG.info(e.toString());
- }
- }
}
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHDFS.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHDFS.java?rev=1367841&r1=1367840&r2=1367841&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHDFS.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHDFS.java Wed Aug 1 01:41:23 2012
@@ -23,12 +23,16 @@ import java.util.Random;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.TestDFSClientRetries;
+import org.apache.hadoop.hdfs.server.namenode.web.resources.NamenodeWebHdfsMethods;
+import org.apache.log4j.Level;
import org.junit.Assert;
import org.junit.Test;
@@ -196,4 +200,12 @@ public class TestWebHDFS {
in.close();
t.end(checked);
}
+
+ /** Test client retry with namenode restarting. */
+ @Test
+ public void testNamenodeRestart() throws Exception {
+ ((Log4JLogger)NamenodeWebHdfsMethods.LOG).getLogger().setLevel(Level.ALL);
+ final Configuration conf = WebHdfsTestUtil.createConf();
+ TestDFSClientRetries.namenodeRestartTest(conf, true);
+ }
}
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/WebHdfsTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/WebHdfsTestUtil.java?rev=1367841&r1=1367840&r2=1367841&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/WebHdfsTestUtil.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/WebHdfsTestUtil.java Wed Aug 1 01:41:23 2012
@@ -79,13 +79,9 @@ public class WebHdfsTestUtil {
return WebHdfsFileSystem.jsonParse(conn, false);
}
- public static HttpURLConnection twoStepWrite(HttpURLConnection conn,
- final HttpOpParam.Op op) throws IOException {
- conn.setRequestMethod(op.getType().toString());
- conn = WebHdfsFileSystem.twoStepWrite(conn, op);
- conn.setDoOutput(true);
- conn.connect();
- return conn;
+ public static HttpURLConnection twoStepWrite(final WebHdfsFileSystem webhdfs,
+ final HttpOpParam.Op op, HttpURLConnection conn) throws IOException {
+ return webhdfs.new Runner(op, conn).twoStepWrite();
}
public static FSDataOutputStream write(final WebHdfsFileSystem webhdfs,