You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by je...@apache.org on 2014/05/13 18:40:15 UTC
svn commit: r1594273 - in
/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs: ./
src/main/java/org/apache/hadoop/hdfs/web/
src/main/java/org/apache/hadoop/hdfs/web/resources/
src/test/java/org/apache/hadoop/hdfs/web/
Author: jeagles
Date: Tue May 13 16:40:15 2014
New Revision: 1594273
URL: http://svn.apache.org/r1594273
Log:
HDFS-6305. WebHdfs response decoding may throw RuntimeExceptions (Daryn Sharp via jeagles)
Modified:
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/HttpOpParam.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/WebHdfsTestUtil.java
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1594273&r1=1594272&r2=1594273&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Tue May 13 16:40:15 2014
@@ -461,6 +461,9 @@ Release 2.5.0 - UNRELEASED
HDFS-6367. EnumSetParam$Domain#parse fails for parameter containing more than one enum.
(Yi Liu via umamahesh)
+ HDFS-6305. WebHdfs response decoding may throw RuntimeExceptions (Daryn
+ Sharp via jeagles)
+
Release 2.4.1 - UNRELEASED
INCOMPATIBLE CHANGES
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java?rev=1594273&r1=1594272&r2=1594273&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java Tue May 13 16:40:15 2014
@@ -58,34 +58,8 @@ import org.apache.hadoop.hdfs.HAUtil;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.server.namenode.SafeModeException;
-import org.apache.hadoop.hdfs.web.resources.AccessTimeParam;
-import org.apache.hadoop.hdfs.web.resources.AclPermissionParam;
-import org.apache.hadoop.hdfs.web.resources.BlockSizeParam;
-import org.apache.hadoop.hdfs.web.resources.BufferSizeParam;
-import org.apache.hadoop.hdfs.web.resources.ConcatSourcesParam;
-import org.apache.hadoop.hdfs.web.resources.CreateParentParam;
-import org.apache.hadoop.hdfs.web.resources.DelegationParam;
-import org.apache.hadoop.hdfs.web.resources.DeleteOpParam;
-import org.apache.hadoop.hdfs.web.resources.DestinationParam;
-import org.apache.hadoop.hdfs.web.resources.DoAsParam;
-import org.apache.hadoop.hdfs.web.resources.GetOpParam;
-import org.apache.hadoop.hdfs.web.resources.GroupParam;
-import org.apache.hadoop.hdfs.web.resources.HttpOpParam;
-import org.apache.hadoop.hdfs.web.resources.LengthParam;
-import org.apache.hadoop.hdfs.web.resources.ModificationTimeParam;
-import org.apache.hadoop.hdfs.web.resources.OffsetParam;
-import org.apache.hadoop.hdfs.web.resources.OverwriteParam;
-import org.apache.hadoop.hdfs.web.resources.OwnerParam;
-import org.apache.hadoop.hdfs.web.resources.Param;
-import org.apache.hadoop.hdfs.web.resources.PermissionParam;
-import org.apache.hadoop.hdfs.web.resources.PostOpParam;
-import org.apache.hadoop.hdfs.web.resources.PutOpParam;
-import org.apache.hadoop.hdfs.web.resources.RecursiveParam;
-import org.apache.hadoop.hdfs.web.resources.RenameOptionSetParam;
-import org.apache.hadoop.hdfs.web.resources.RenewerParam;
-import org.apache.hadoop.hdfs.web.resources.ReplicationParam;
-import org.apache.hadoop.hdfs.web.resources.TokenArgumentParam;
-import org.apache.hadoop.hdfs.web.resources.UserParam;
+import org.apache.hadoop.hdfs.web.resources.*;
+import org.apache.hadoop.hdfs.web.resources.HttpOpParam.Op;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryPolicy;
@@ -426,40 +400,23 @@ public class WebHdfsFileSystem extends F
}
/**
- * Run a http operation.
- * Connect to the http server, validate response, and obtain the JSON output.
- *
- * @param op http operation
- * @param fspath file system path
- * @param parameters parameters for the operation
- * @return a JSON object, e.g. Object[], Map<?, ?>, etc.
- * @throws IOException
- */
- private Map<?, ?> run(final HttpOpParam.Op op, final Path fspath,
- final Param<?,?>... parameters) throws IOException {
- return new FsPathRunner(op, fspath, parameters).run().json;
- }
-
- /**
* This class is for initialing a HTTP connection, connecting to server,
* obtaining a response, and also handling retry on failures.
*/
- abstract class AbstractRunner {
+ abstract class AbstractRunner<T> {
abstract protected URL getUrl() throws IOException;
protected final HttpOpParam.Op op;
private final boolean redirected;
private boolean checkRetry;
- protected HttpURLConnection conn = null;
- private Map<?, ?> json = null;
protected AbstractRunner(final HttpOpParam.Op op, boolean redirected) {
this.op = op;
this.redirected = redirected;
}
- AbstractRunner run() throws IOException {
+ T run() throws IOException {
UserGroupInformation connectUgi = ugi.getRealUser();
if (connectUgi == null) {
connectUgi = ugi;
@@ -471,9 +428,9 @@ public class WebHdfsFileSystem extends F
// the entire lifecycle of the connection must be run inside the
// doAs to ensure authentication is performed correctly
return connectUgi.doAs(
- new PrivilegedExceptionAction<AbstractRunner>() {
+ new PrivilegedExceptionAction<T>() {
@Override
- public AbstractRunner run() throws IOException {
+ public T run() throws IOException {
return runWithRetry();
}
});
@@ -481,18 +438,51 @@ public class WebHdfsFileSystem extends F
throw new IOException(e);
}
}
-
- private void init() throws IOException {
- checkRetry = !redirected;
- URL url = getUrl();
- conn = (HttpURLConnection) connectionFactory.openConnection(url);
- }
-
- private void connect() throws IOException {
- connect(op.getDoOutput());
+
+ /**
+ * Two-step requests redirected to a DN
+ *
+ * Create/Append:
+ * Step 1) Submit a Http request with neither auto-redirect nor data.
+ * Step 2) Submit another Http request with the URL from the Location header with data.
+ *
+ * The reason for the two-step create/append is to prevent clients from
+ * sending out the data before the redirect. This issue is addressed by the
+ * "Expect: 100-continue" header in HTTP/1.1; see RFC 2616, Section 8.2.3.
+ * Unfortunately, there are software library bugs (e.g. Jetty 6 http server
+ * and Java 6 http client), which do not correctly implement "Expect:
+ * 100-continue". The two-step create/append is a temporary workaround for
+ * the software library bugs.
+ *
+ * Open/Checksum:
+ * Also implements two-step connects for other operations redirected to
+ * a DN, such as open and checksum.
+ */
+ private HttpURLConnection connect(URL url) throws IOException {
+ // resolve redirects for a DN operation unless already resolved
+ if (op.getRedirect() && !redirected) {
+ final HttpOpParam.Op redirectOp =
+ HttpOpParam.TemporaryRedirectOp.valueOf(op);
+ final HttpURLConnection conn = connect(redirectOp, url);
+ // application level proxy like httpfs might not issue a redirect
+ if (conn.getResponseCode() == op.getExpectedHttpResponseCode()) {
+ return conn;
+ }
+ try {
+ validateResponse(redirectOp, conn, false);
+ url = new URL(conn.getHeaderField("Location"));
+ } finally {
+ conn.disconnect();
+ }
+ }
+ return connect(op, url);
}
- private void connect(boolean doOutput) throws IOException {
+ private HttpURLConnection connect(final HttpOpParam.Op op, final URL url)
+ throws IOException {
+ final HttpURLConnection conn =
+ (HttpURLConnection)connectionFactory.openConnection(url);
+ final boolean doOutput = op.getDoOutput();
conn.setRequestMethod(op.getType().toString());
conn.setInstanceFollowRedirects(false);
switch (op.getType()) {
@@ -505,6 +495,10 @@ public class WebHdfsFileSystem extends F
// explicitly setting content-length to 0 won't do spnego!!
// opening and closing the stream will send "Content-Length: 0"
conn.getOutputStream().close();
+ } else {
+ conn.setRequestProperty("Content-Type",
+ MediaType.APPLICATION_OCTET_STREAM);
+ conn.setChunkedStreamingMode(32 << 10); //32kB-chunk
}
break;
}
@@ -514,16 +508,10 @@ public class WebHdfsFileSystem extends F
}
}
conn.connect();
+ return conn;
}
- private void disconnect() {
- if (conn != null) {
- conn.disconnect();
- conn = null;
- }
- }
-
- private AbstractRunner runWithRetry() throws IOException {
+ private T runWithRetry() throws IOException {
/**
* Do the real work.
*
@@ -541,15 +529,16 @@ public class WebHdfsFileSystem extends F
* examines the exception and swallows it if it decides to rerun the work.
*/
for(int retry = 0; ; retry++) {
+ checkRetry = !redirected;
+ final URL url = getUrl();
try {
- init();
- if (op.getDoOutput()) {
- twoStepWrite();
- } else {
- getResponse(op != GetOpParam.Op.OPEN);
+ final HttpURLConnection conn = connect(url);
+ // output streams will validate on close
+ if (!op.getDoOutput()) {
+ validateResponse(op, conn, false);
}
- return this;
- } catch(IOException ioe) {
+ return getResponse(conn);
+ } catch (IOException ioe) {
Throwable cause = ioe.getCause();
if (cause != null && cause instanceof AuthenticationException) {
throw ioe; // no retries for auth failures
@@ -591,87 +580,129 @@ public class WebHdfsFileSystem extends F
throw toIOException(ioe);
}
- /**
- * Two-step Create/Append:
- * Step 1) Submit a Http request with neither auto-redirect nor data.
- * Step 2) Submit another Http request with the URL from the Location header with data.
- *
- * The reason of having two-step create/append is for preventing clients to
- * send out the data before the redirect. This issue is addressed by the
- * "Expect: 100-continue" header in HTTP/1.1; see RFC 2616, Section 8.2.3.
- * Unfortunately, there are software library bugs (e.g. Jetty 6 http server
- * and Java 6 http client), which do not correctly implement "Expect:
- * 100-continue". The two-step create/append is a temporary workaround for
- * the software library bugs.
- */
- HttpURLConnection twoStepWrite() throws IOException {
- //Step 1) Submit a Http request with neither auto-redirect nor data.
- connect(false);
- validateResponse(HttpOpParam.TemporaryRedirectOp.valueOf(op), conn, false);
- final String redirect = conn.getHeaderField("Location");
- disconnect();
- checkRetry = false;
-
- //Step 2) Submit another Http request with the URL from the Location header with data.
- conn = (HttpURLConnection) connectionFactory.openConnection(new URL(
- redirect));
- conn.setRequestProperty("Content-Type",
- MediaType.APPLICATION_OCTET_STREAM);
- conn.setChunkedStreamingMode(32 << 10); //32kB-chunk
- connect();
- return conn;
+ abstract T getResponse(HttpURLConnection conn) throws IOException;
+ }
+
+ /**
+ * Abstract base class to handle path-based operations with params
+ */
+ abstract class AbstractFsPathRunner<T> extends AbstractRunner<T> {
+ private final Path fspath;
+ private final Param<?,?>[] parameters;
+
+ AbstractFsPathRunner(final HttpOpParam.Op op, final Path fspath,
+ Param<?,?>... parameters) {
+ super(op, false);
+ this.fspath = fspath;
+ this.parameters = parameters;
+ }
+
+ @Override
+ protected URL getUrl() throws IOException {
+ return toUrl(op, fspath, parameters);
}
+ }
- FSDataOutputStream write(final int bufferSize) throws IOException {
- return WebHdfsFileSystem.this.write(op, conn, bufferSize);
+ /**
+ * Default path-based implementation expects no json response
+ */
+ class FsPathRunner extends AbstractFsPathRunner<Void> {
+ FsPathRunner(Op op, Path fspath, Param<?,?>... parameters) {
+ super(op, fspath, parameters);
}
+
+ @Override
+ Void getResponse(HttpURLConnection conn) throws IOException {
+ return null;
+ }
+ }
- void getResponse(boolean getJsonAndDisconnect) throws IOException {
+ /**
+ * Handle path-based operations with a json response
+ */
+ abstract class FsPathResponseRunner<T> extends AbstractFsPathRunner<T> {
+ FsPathResponseRunner(final HttpOpParam.Op op, final Path fspath,
+ Param<?,?>... parameters) {
+ super(op, fspath, parameters);
+ }
+
+ @Override
+ final T getResponse(HttpURLConnection conn) throws IOException {
try {
- connect();
- final int code = conn.getResponseCode();
- if (!redirected && op.getRedirect()
- && code != op.getExpectedHttpResponseCode()) {
- final String redirect = conn.getHeaderField("Location");
- json = validateResponse(HttpOpParam.TemporaryRedirectOp.valueOf(op),
- conn, false);
- disconnect();
-
- checkRetry = false;
- conn = (HttpURLConnection) connectionFactory.openConnection(new URL(
- redirect));
- connect();
+ final Map<?,?> json = jsonParse(conn, false);
+ if (json == null) {
+ // match exception class thrown by parser
+ throw new IllegalStateException("Missing response");
}
-
- json = validateResponse(op, conn, false);
- if (json == null && getJsonAndDisconnect) {
- json = jsonParse(conn, false);
+ return decodeResponse(json);
+ } catch (IOException ioe) {
+ throw ioe;
+ } catch (Exception e) { // catch json parser errors
+ final IOException ioe =
+ new IOException("Response decoding failure: "+e.toString(), e);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(ioe);
}
+ throw ioe;
} finally {
- if (getJsonAndDisconnect) {
- disconnect();
- }
+ conn.disconnect();
}
}
+
+ abstract T decodeResponse(Map<?,?> json) throws IOException;
}
- final class FsPathRunner extends AbstractRunner {
- private final Path fspath;
- private final Param<?, ?>[] parameters;
-
- FsPathRunner(final HttpOpParam.Op op, final Path fspath, final Param<?,?>... parameters) {
- super(op, false);
- this.fspath = fspath;
- this.parameters = parameters;
+ /**
+ * Handle path-based operations with json boolean response
+ */
+ class FsPathBooleanRunner extends FsPathResponseRunner<Boolean> {
+ FsPathBooleanRunner(Op op, Path fspath, Param<?,?>... parameters) {
+ super(op, fspath, parameters);
}
-
+
@Override
- protected URL getUrl() throws IOException {
- return toUrl(op, fspath, parameters);
+ Boolean decodeResponse(Map<?,?> json) throws IOException {
+ return (Boolean)json.get("boolean");
}
}
- final class URLRunner extends AbstractRunner {
+ /**
+ * Handle create/append output streams
+ */
+ class FsPathOutputStreamRunner extends AbstractFsPathRunner<FSDataOutputStream> {
+ private final int bufferSize;
+
+ FsPathOutputStreamRunner(Op op, Path fspath, int bufferSize,
+ Param<?,?>... parameters) {
+ super(op, fspath, parameters);
+ this.bufferSize = bufferSize;
+ }
+
+ @Override
+ FSDataOutputStream getResponse(final HttpURLConnection conn)
+ throws IOException {
+ return new FSDataOutputStream(new BufferedOutputStream(
+ conn.getOutputStream(), bufferSize), statistics) {
+ @Override
+ public void close() throws IOException {
+ try {
+ super.close();
+ } finally {
+ try {
+ validateResponse(op, conn, true);
+ } finally {
+ conn.disconnect();
+ }
+ }
+ }
+ };
+ }
+ }
+
+ /**
+ * Used by open() which tracks the resolved url itself
+ */
+ final class URLRunner extends AbstractRunner<HttpURLConnection> {
private final URL url;
@Override
protected URL getUrl() {
@@ -682,6 +713,11 @@ public class WebHdfsFileSystem extends F
super(op, redirected);
this.url = url;
}
+
+ @Override
+ HttpURLConnection getResponse(HttpURLConnection conn) throws IOException {
+ return conn;
+ }
}
private FsPermission applyUMask(FsPermission permission) {
@@ -693,8 +729,12 @@ public class WebHdfsFileSystem extends F
private HdfsFileStatus getHdfsFileStatus(Path f) throws IOException {
final HttpOpParam.Op op = GetOpParam.Op.GETFILESTATUS;
- final Map<?, ?> json = run(op, f);
- final HdfsFileStatus status = JsonUtil.toFileStatus(json, true);
+ HdfsFileStatus status = new FsPathResponseRunner<HdfsFileStatus>(op, f) {
+ @Override
+ HdfsFileStatus decodeResponse(Map<?,?> json) {
+ return JsonUtil.toFileStatus(json, true);
+ }
+ }.run();
if (status == null) {
throw new FileNotFoundException("File does not exist: " + f);
}
@@ -718,8 +758,12 @@ public class WebHdfsFileSystem extends F
@Override
public AclStatus getAclStatus(Path f) throws IOException {
final HttpOpParam.Op op = GetOpParam.Op.GETACLSTATUS;
- final Map<?, ?> json = run(op, f);
- AclStatus status = JsonUtil.toAclStatus(json);
+ AclStatus status = new FsPathResponseRunner<AclStatus>(op, f) {
+ @Override
+ AclStatus decodeResponse(Map<?,?> json) {
+ return JsonUtil.toAclStatus(json);
+ }
+ }.run();
if (status == null) {
throw new FileNotFoundException("File does not exist: " + f);
}
@@ -730,9 +774,9 @@ public class WebHdfsFileSystem extends F
public boolean mkdirs(Path f, FsPermission permission) throws IOException {
statistics.incrementWriteOps(1);
final HttpOpParam.Op op = PutOpParam.Op.MKDIRS;
- final Map<?, ?> json = run(op, f,
- new PermissionParam(applyUMask(permission)));
- return (Boolean)json.get("boolean");
+ return new FsPathBooleanRunner(op, f,
+ new PermissionParam(applyUMask(permission))
+ ).run();
}
/**
@@ -743,17 +787,19 @@ public class WebHdfsFileSystem extends F
) throws IOException {
statistics.incrementWriteOps(1);
final HttpOpParam.Op op = PutOpParam.Op.CREATESYMLINK;
- run(op, f, new DestinationParam(makeQualified(destination).toUri().getPath()),
- new CreateParentParam(createParent));
+ new FsPathRunner(op, f,
+ new DestinationParam(makeQualified(destination).toUri().getPath()),
+ new CreateParentParam(createParent)
+ ).run();
}
@Override
public boolean rename(final Path src, final Path dst) throws IOException {
statistics.incrementWriteOps(1);
final HttpOpParam.Op op = PutOpParam.Op.RENAME;
- final Map<?, ?> json = run(op, src,
- new DestinationParam(makeQualified(dst).toUri().getPath()));
- return (Boolean)json.get("boolean");
+ return new FsPathBooleanRunner(op, src,
+ new DestinationParam(makeQualified(dst).toUri().getPath())
+ ).run();
}
@SuppressWarnings("deprecation")
@@ -762,8 +808,10 @@ public class WebHdfsFileSystem extends F
final Options.Rename... options) throws IOException {
statistics.incrementWriteOps(1);
final HttpOpParam.Op op = PutOpParam.Op.RENAME;
- run(op, src, new DestinationParam(makeQualified(dst).toUri().getPath()),
- new RenameOptionSetParam(options));
+ new FsPathRunner(op, src,
+ new DestinationParam(makeQualified(dst).toUri().getPath()),
+ new RenameOptionSetParam(options)
+ ).run();
}
@Override
@@ -775,7 +823,9 @@ public class WebHdfsFileSystem extends F
statistics.incrementWriteOps(1);
final HttpOpParam.Op op = PutOpParam.Op.SETOWNER;
- run(op, p, new OwnerParam(owner), new GroupParam(group));
+ new FsPathRunner(op, p,
+ new OwnerParam(owner), new GroupParam(group)
+ ).run();
}
@Override
@@ -783,7 +833,7 @@ public class WebHdfsFileSystem extends F
) throws IOException {
statistics.incrementWriteOps(1);
final HttpOpParam.Op op = PutOpParam.Op.SETPERMISSION;
- run(op, p, new PermissionParam(permission));
+ new FsPathRunner(op, p,new PermissionParam(permission)).run();
}
@Override
@@ -791,7 +841,7 @@ public class WebHdfsFileSystem extends F
throws IOException {
statistics.incrementWriteOps(1);
final HttpOpParam.Op op = PutOpParam.Op.MODIFYACLENTRIES;
- run(op, path, new AclPermissionParam(aclSpec));
+ new FsPathRunner(op, path, new AclPermissionParam(aclSpec)).run();
}
@Override
@@ -799,21 +849,21 @@ public class WebHdfsFileSystem extends F
throws IOException {
statistics.incrementWriteOps(1);
final HttpOpParam.Op op = PutOpParam.Op.REMOVEACLENTRIES;
- run(op, path, new AclPermissionParam(aclSpec));
+ new FsPathRunner(op, path, new AclPermissionParam(aclSpec)).run();
}
@Override
public void removeDefaultAcl(Path path) throws IOException {
statistics.incrementWriteOps(1);
final HttpOpParam.Op op = PutOpParam.Op.REMOVEDEFAULTACL;
- run(op, path);
+ new FsPathRunner(op, path).run();
}
@Override
public void removeAcl(Path path) throws IOException {
statistics.incrementWriteOps(1);
final HttpOpParam.Op op = PutOpParam.Op.REMOVEACL;
- run(op, path);
+ new FsPathRunner(op, path).run();
}
@Override
@@ -821,7 +871,7 @@ public class WebHdfsFileSystem extends F
throws IOException {
statistics.incrementWriteOps(1);
final HttpOpParam.Op op = PutOpParam.Op.SETACL;
- run(op, p, new AclPermissionParam(aclSpec));
+ new FsPathRunner(op, p, new AclPermissionParam(aclSpec)).run();
}
@Override
@@ -829,8 +879,9 @@ public class WebHdfsFileSystem extends F
) throws IOException {
statistics.incrementWriteOps(1);
final HttpOpParam.Op op = PutOpParam.Op.SETREPLICATION;
- final Map<?, ?> json = run(op, p, new ReplicationParam(replication));
- return (Boolean)json.get("boolean");
+ return new FsPathBooleanRunner(op, p,
+ new ReplicationParam(replication)
+ ).run();
}
@Override
@@ -838,7 +889,10 @@ public class WebHdfsFileSystem extends F
) throws IOException {
statistics.incrementWriteOps(1);
final HttpOpParam.Op op = PutOpParam.Op.SETTIMES;
- run(op, p, new ModificationTimeParam(mtime), new AccessTimeParam(atime));
+ new FsPathRunner(op, p,
+ new ModificationTimeParam(mtime),
+ new AccessTimeParam(atime)
+ ).run();
}
@Override
@@ -853,32 +907,11 @@ public class WebHdfsFileSystem extends F
DFSConfigKeys.DFS_REPLICATION_DEFAULT);
}
- FSDataOutputStream write(final HttpOpParam.Op op,
- final HttpURLConnection conn, final int bufferSize) throws IOException {
- return new FSDataOutputStream(new BufferedOutputStream(
- conn.getOutputStream(), bufferSize), statistics) {
- @Override
- public void close() throws IOException {
- try {
- super.close();
- } finally {
- try {
- validateResponse(op, conn, true);
- } finally {
- conn.disconnect();
- }
- }
- }
- };
- }
-
@Override
public void concat(final Path trg, final Path [] srcs) throws IOException {
statistics.incrementWriteOps(1);
final HttpOpParam.Op op = PostOpParam.Op.CONCAT;
-
- ConcatSourcesParam param = new ConcatSourcesParam(srcs);
- run(op, trg, param);
+ new FsPathRunner(op, trg, new ConcatSourcesParam(srcs)).run();
}
@Override
@@ -888,14 +921,13 @@ public class WebHdfsFileSystem extends F
statistics.incrementWriteOps(1);
final HttpOpParam.Op op = PutOpParam.Op.CREATE;
- return new FsPathRunner(op, f,
+ return new FsPathOutputStreamRunner(op, f, bufferSize,
new PermissionParam(applyUMask(permission)),
new OverwriteParam(overwrite),
new BufferSizeParam(bufferSize),
new ReplicationParam(replication),
- new BlockSizeParam(blockSize))
- .run()
- .write(bufferSize);
+ new BlockSizeParam(blockSize)
+ ).run();
}
@Override
@@ -904,16 +936,17 @@ public class WebHdfsFileSystem extends F
statistics.incrementWriteOps(1);
final HttpOpParam.Op op = PostOpParam.Op.APPEND;
- return new FsPathRunner(op, f, new BufferSizeParam(bufferSize))
- .run()
- .write(bufferSize);
+ return new FsPathOutputStreamRunner(op, f, bufferSize,
+ new BufferSizeParam(bufferSize)
+ ).run();
}
@Override
public boolean delete(Path f, boolean recursive) throws IOException {
final HttpOpParam.Op op = DeleteOpParam.Op.DELETE;
- final Map<?, ?> json = run(op, f, new RecursiveParam(recursive));
- return (Boolean)json.get("boolean");
+ return new FsPathBooleanRunner(op, f,
+ new RecursiveParam(recursive)
+ ).run();
}
@Override
@@ -945,7 +978,7 @@ public class WebHdfsFileSystem extends F
final boolean resolved) throws IOException {
final URL offsetUrl = offset == 0L? url
: new URL(url + "&" + new OffsetParam(offset));
- return new URLRunner(GetOpParam.Op.OPEN, offsetUrl, resolved).run().conn;
+ return new URLRunner(GetOpParam.Op.OPEN, offsetUrl, resolved).run();
}
}
@@ -1001,25 +1034,36 @@ public class WebHdfsFileSystem extends F
statistics.incrementReadOps(1);
final HttpOpParam.Op op = GetOpParam.Op.LISTSTATUS;
- final Map<?, ?> json = run(op, f);
- final Map<?, ?> rootmap = (Map<?, ?>)json.get(FileStatus.class.getSimpleName() + "es");
- final Object[] array = (Object[])rootmap.get(FileStatus.class.getSimpleName());
-
- //convert FileStatus
- final FileStatus[] statuses = new FileStatus[array.length];
- for(int i = 0; i < array.length; i++) {
- final Map<?, ?> m = (Map<?, ?>)array[i];
- statuses[i] = makeQualified(JsonUtil.toFileStatus(m, false), f);
- }
- return statuses;
+ return new FsPathResponseRunner<FileStatus[]>(op, f) {
+ @Override
+ FileStatus[] decodeResponse(Map<?,?> json) {
+ final Map<?, ?> rootmap = (Map<?, ?>)json.get(FileStatus.class.getSimpleName() + "es");
+ final Object[] array = (Object[])rootmap.get(FileStatus.class.getSimpleName());
+
+ //convert FileStatus
+ final FileStatus[] statuses = new FileStatus[array.length];
+ for (int i = 0; i < array.length; i++) {
+ final Map<?, ?> m = (Map<?, ?>)array[i];
+ statuses[i] = makeQualified(JsonUtil.toFileStatus(m, false), f);
+ }
+ return statuses;
+ }
+ }.run();
}
@Override
public Token<DelegationTokenIdentifier> getDelegationToken(
final String renewer) throws IOException {
final HttpOpParam.Op op = GetOpParam.Op.GETDELEGATIONTOKEN;
- final Map<?, ?> m = run(op, null, new RenewerParam(renewer));
- final Token<DelegationTokenIdentifier> token = JsonUtil.toDelegationToken(m);
+ Token<DelegationTokenIdentifier> token =
+ new FsPathResponseRunner<Token<DelegationTokenIdentifier>>(
+ op, null, new RenewerParam(renewer)) {
+ @Override
+ Token<DelegationTokenIdentifier> decodeResponse(Map<?,?> json)
+ throws IOException {
+ return JsonUtil.toDelegationToken(json);
+ }
+ }.run();
token.setService(tokenServiceName);
return token;
}
@@ -1041,19 +1085,22 @@ public class WebHdfsFileSystem extends F
public synchronized long renewDelegationToken(final Token<?> token
) throws IOException {
final HttpOpParam.Op op = PutOpParam.Op.RENEWDELEGATIONTOKEN;
- TokenArgumentParam dtargParam = new TokenArgumentParam(
- token.encodeToUrlString());
- final Map<?, ?> m = run(op, null, dtargParam);
- return (Long) m.get("long");
+ return new FsPathResponseRunner<Long>(op, null,
+ new TokenArgumentParam(token.encodeToUrlString())) {
+ @Override
+ Long decodeResponse(Map<?,?> json) throws IOException {
+ return (Long) json.get("long");
+ }
+ }.run();
}
@Override
public synchronized void cancelDelegationToken(final Token<?> token
) throws IOException {
final HttpOpParam.Op op = PutOpParam.Op.CANCELDELEGATIONTOKEN;
- TokenArgumentParam dtargParam = new TokenArgumentParam(
- token.encodeToUrlString());
- run(op, null, dtargParam);
+ new FsPathRunner(op, null,
+ new TokenArgumentParam(token.encodeToUrlString())
+ ).run();
}
@Override
@@ -1071,9 +1118,14 @@ public class WebHdfsFileSystem extends F
statistics.incrementReadOps(1);
final HttpOpParam.Op op = GetOpParam.Op.GET_BLOCK_LOCATIONS;
- final Map<?, ?> m = run(op, p, new OffsetParam(offset),
- new LengthParam(length));
- return DFSUtil.locatedBlocks2Locations(JsonUtil.toLocatedBlocks(m));
+ return new FsPathResponseRunner<BlockLocation[]>(op, p,
+ new OffsetParam(offset), new LengthParam(length)) {
+ @Override
+ BlockLocation[] decodeResponse(Map<?,?> json) throws IOException {
+ return DFSUtil.locatedBlocks2Locations(
+ JsonUtil.toLocatedBlocks(json));
+ }
+ }.run();
}
@Override
@@ -1081,8 +1133,12 @@ public class WebHdfsFileSystem extends F
statistics.incrementReadOps(1);
final HttpOpParam.Op op = GetOpParam.Op.GETCONTENTSUMMARY;
- final Map<?, ?> m = run(op, p);
- return JsonUtil.toContentSummary(m);
+ return new FsPathResponseRunner<ContentSummary>(op, p) {
+ @Override
+ ContentSummary decodeResponse(Map<?,?> json) {
+ return JsonUtil.toContentSummary(json);
+ }
+ }.run();
}
@Override
@@ -1091,8 +1147,12 @@ public class WebHdfsFileSystem extends F
statistics.incrementReadOps(1);
final HttpOpParam.Op op = GetOpParam.Op.GETFILECHECKSUM;
- final Map<?, ?> m = run(op, p);
- return JsonUtil.toMD5MD5CRC32FileChecksum(m);
+ return new FsPathResponseRunner<MD5MD5CRC32FileChecksum>(op, p) {
+ @Override
+ MD5MD5CRC32FileChecksum decodeResponse(Map<?,?> json) throws IOException {
+ return JsonUtil.toMD5MD5CRC32FileChecksum(json);
+ }
+ }.run();
}
/**
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/HttpOpParam.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/HttpOpParam.java?rev=1594273&r1=1594272&r2=1594273&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/HttpOpParam.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/HttpOpParam.java Tue May 13 16:40:15 2014
@@ -102,7 +102,7 @@ public abstract class HttpOpParam<E exte
@Override
public boolean getDoOutput() {
- return op.getDoOutput();
+ return false;
}
@Override
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/WebHdfsTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/WebHdfsTestUtil.java?rev=1594273&r1=1594272&r2=1594273&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/WebHdfsTestUtil.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/WebHdfsTestUtil.java Tue May 13 16:40:15 2014
@@ -94,10 +94,4 @@ public class WebHdfsTestUtil {
Assert.assertEquals(expectedResponseCode, conn.getResponseCode());
return WebHdfsFileSystem.jsonParse(conn, false);
}
-
- public static FSDataOutputStream write(final WebHdfsFileSystem webhdfs,
- final HttpOpParam.Op op, final HttpURLConnection conn,
- final int bufferSize) throws IOException {
- return webhdfs.write(op, conn, bufferSize);
- }
}