You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by je...@apache.org on 2014/05/13 18:40:15 UTC

svn commit: r1594273 - in /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/org/apache/hadoop/hdfs/web/ src/main/java/org/apache/hadoop/hdfs/web/resources/ src/test/java/org/apache/hadoop/hdfs/web/

Author: jeagles
Date: Tue May 13 16:40:15 2014
New Revision: 1594273

URL: http://svn.apache.org/r1594273
Log:
HDFS-6305. WebHdfs response decoding may throw RuntimeExceptions (Daryn Sharp via jeagles)

Modified:
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/HttpOpParam.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/WebHdfsTestUtil.java

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1594273&r1=1594272&r2=1594273&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Tue May 13 16:40:15 2014
@@ -461,6 +461,9 @@ Release 2.5.0 - UNRELEASED
     HDFS-6367. EnumSetParam$Domain#parse fails for parameter containing more than one enum.
     (Yi Liu via umamahesh)
 
+    HDFS-6305. WebHdfs response decoding may throw RuntimeExceptions (Daryn
+    Sharp via jeagles)
+
 Release 2.4.1 - UNRELEASED
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java?rev=1594273&r1=1594272&r2=1594273&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java Tue May 13 16:40:15 2014
@@ -58,34 +58,8 @@ import org.apache.hadoop.hdfs.HAUtil;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.hdfs.server.namenode.SafeModeException;
-import org.apache.hadoop.hdfs.web.resources.AccessTimeParam;
-import org.apache.hadoop.hdfs.web.resources.AclPermissionParam;
-import org.apache.hadoop.hdfs.web.resources.BlockSizeParam;
-import org.apache.hadoop.hdfs.web.resources.BufferSizeParam;
-import org.apache.hadoop.hdfs.web.resources.ConcatSourcesParam;
-import org.apache.hadoop.hdfs.web.resources.CreateParentParam;
-import org.apache.hadoop.hdfs.web.resources.DelegationParam;
-import org.apache.hadoop.hdfs.web.resources.DeleteOpParam;
-import org.apache.hadoop.hdfs.web.resources.DestinationParam;
-import org.apache.hadoop.hdfs.web.resources.DoAsParam;
-import org.apache.hadoop.hdfs.web.resources.GetOpParam;
-import org.apache.hadoop.hdfs.web.resources.GroupParam;
-import org.apache.hadoop.hdfs.web.resources.HttpOpParam;
-import org.apache.hadoop.hdfs.web.resources.LengthParam;
-import org.apache.hadoop.hdfs.web.resources.ModificationTimeParam;
-import org.apache.hadoop.hdfs.web.resources.OffsetParam;
-import org.apache.hadoop.hdfs.web.resources.OverwriteParam;
-import org.apache.hadoop.hdfs.web.resources.OwnerParam;
-import org.apache.hadoop.hdfs.web.resources.Param;
-import org.apache.hadoop.hdfs.web.resources.PermissionParam;
-import org.apache.hadoop.hdfs.web.resources.PostOpParam;
-import org.apache.hadoop.hdfs.web.resources.PutOpParam;
-import org.apache.hadoop.hdfs.web.resources.RecursiveParam;
-import org.apache.hadoop.hdfs.web.resources.RenameOptionSetParam;
-import org.apache.hadoop.hdfs.web.resources.RenewerParam;
-import org.apache.hadoop.hdfs.web.resources.ReplicationParam;
-import org.apache.hadoop.hdfs.web.resources.TokenArgumentParam;
-import org.apache.hadoop.hdfs.web.resources.UserParam;
+import org.apache.hadoop.hdfs.web.resources.*;
+import org.apache.hadoop.hdfs.web.resources.HttpOpParam.Op;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.retry.RetryPolicies;
 import org.apache.hadoop.io.retry.RetryPolicy;
@@ -426,40 +400,23 @@ public class WebHdfsFileSystem extends F
   }
 
   /**
-   * Run a http operation.
-   * Connect to the http server, validate response, and obtain the JSON output.
-   * 
-   * @param op http operation
-   * @param fspath file system path
-   * @param parameters parameters for the operation
-   * @return a JSON object, e.g. Object[], Map<?, ?>, etc.
-   * @throws IOException
-   */
-  private Map<?, ?> run(final HttpOpParam.Op op, final Path fspath,
-      final Param<?,?>... parameters) throws IOException {
-    return new FsPathRunner(op, fspath, parameters).run().json;
-  }
-
-  /**
    * This class is for initialing a HTTP connection, connecting to server,
    * obtaining a response, and also handling retry on failures.
    */
-  abstract class AbstractRunner {
+  abstract class AbstractRunner<T> {
     abstract protected URL getUrl() throws IOException;
 
     protected final HttpOpParam.Op op;
     private final boolean redirected;
 
     private boolean checkRetry;
-    protected HttpURLConnection conn = null;
-    private Map<?, ?> json = null;
 
     protected AbstractRunner(final HttpOpParam.Op op, boolean redirected) {
       this.op = op;
       this.redirected = redirected;
     }
 
-    AbstractRunner run() throws IOException {
+    T run() throws IOException {
       UserGroupInformation connectUgi = ugi.getRealUser();
       if (connectUgi == null) {
         connectUgi = ugi;
@@ -471,9 +428,9 @@ public class WebHdfsFileSystem extends F
         // the entire lifecycle of the connection must be run inside the
         // doAs to ensure authentication is performed correctly
         return connectUgi.doAs(
-            new PrivilegedExceptionAction<AbstractRunner>() {
+            new PrivilegedExceptionAction<T>() {
               @Override
-              public AbstractRunner run() throws IOException {
+              public T run() throws IOException {
                 return runWithRetry();
               }
             });
@@ -481,18 +438,51 @@ public class WebHdfsFileSystem extends F
         throw new IOException(e);
       }
     }
-    
-    private void init() throws IOException {
-      checkRetry = !redirected;
-      URL url = getUrl();
-      conn = (HttpURLConnection) connectionFactory.openConnection(url);
-    }
-    
-    private void connect() throws IOException {
-      connect(op.getDoOutput());
+
+    /**
+     * Two-step requests redirected to a DN
+     * 
+     * Create/Append:
+     * Step 1) Submit a Http request with neither auto-redirect nor data. 
+     * Step 2) Submit another Http request with the URL from the Location header with data.
+     * 
+     * The reason for having two-step create/append is to prevent clients from
+     * sending out the data before the redirect. This issue is addressed by the
+     * "Expect: 100-continue" header in HTTP/1.1; see RFC 2616, Section 8.2.3.
+     * Unfortunately, there are software library bugs (e.g. Jetty 6 http server
+     * and Java 6 http client), which do not correctly implement "Expect:
+     * 100-continue". The two-step create/append is a temporary workaround for
+     * the software library bugs.
+     * 
+     * Open/Checksum:
+     * Also implements two-step connects for other operations redirected to
+     * a DN, such as open and checksum.
+     */
+    private HttpURLConnection connect(URL url) throws IOException {
+      // resolve redirects for a DN operation unless already resolved
+      if (op.getRedirect() && !redirected) {
+        final HttpOpParam.Op redirectOp =
+            HttpOpParam.TemporaryRedirectOp.valueOf(op);
+        final HttpURLConnection conn = connect(redirectOp, url);
+        // application level proxy like httpfs might not issue a redirect
+        if (conn.getResponseCode() == op.getExpectedHttpResponseCode()) {
+          return conn;
+        }
+        try {
+          validateResponse(redirectOp, conn, false);
+          url = new URL(conn.getHeaderField("Location"));
+        } finally {
+          conn.disconnect();
+        }
+      }
+      return connect(op, url);
     }
 
-    private void connect(boolean doOutput) throws IOException {
+    private HttpURLConnection connect(final HttpOpParam.Op op, final URL url)
+        throws IOException {
+      final HttpURLConnection conn =
+          (HttpURLConnection)connectionFactory.openConnection(url);
+      final boolean doOutput = op.getDoOutput();
       conn.setRequestMethod(op.getType().toString());
       conn.setInstanceFollowRedirects(false);
       switch (op.getType()) {
@@ -505,6 +495,10 @@ public class WebHdfsFileSystem extends F
             // explicitly setting content-length to 0 won't do spnego!!
             // opening and closing the stream will send "Content-Length: 0"
             conn.getOutputStream().close();
+          } else {
+            conn.setRequestProperty("Content-Type",
+                MediaType.APPLICATION_OCTET_STREAM);
+            conn.setChunkedStreamingMode(32 << 10); //32kB-chunk
           }
           break;
         }
@@ -514,16 +508,10 @@ public class WebHdfsFileSystem extends F
         }
       }
       conn.connect();
+      return conn;
     }
 
-    private void disconnect() {
-      if (conn != null) {
-        conn.disconnect();
-        conn = null;
-      }
-    }
-
-    private AbstractRunner runWithRetry() throws IOException {
+    private T runWithRetry() throws IOException {
       /**
        * Do the real work.
        *
@@ -541,15 +529,16 @@ public class WebHdfsFileSystem extends F
        * examines the exception and swallows it if it decides to rerun the work.
        */
       for(int retry = 0; ; retry++) {
+        checkRetry = !redirected;
+        final URL url = getUrl();
         try {
-          init();
-          if (op.getDoOutput()) {
-            twoStepWrite();
-          } else {
-            getResponse(op != GetOpParam.Op.OPEN);
+          final HttpURLConnection conn = connect(url);
+          // output streams will validate on close
+          if (!op.getDoOutput()) {
+            validateResponse(op, conn, false);
           }
-          return this;
-        } catch(IOException ioe) {
+          return getResponse(conn);
+        } catch (IOException ioe) {
           Throwable cause = ioe.getCause();
           if (cause != null && cause instanceof AuthenticationException) {
             throw ioe; // no retries for auth failures
@@ -591,87 +580,129 @@ public class WebHdfsFileSystem extends F
       throw toIOException(ioe);
     }
 
-    /**
-     * Two-step Create/Append:
-     * Step 1) Submit a Http request with neither auto-redirect nor data. 
-     * Step 2) Submit another Http request with the URL from the Location header with data.
-     * 
-     * The reason of having two-step create/append is for preventing clients to
-     * send out the data before the redirect. This issue is addressed by the
-     * "Expect: 100-continue" header in HTTP/1.1; see RFC 2616, Section 8.2.3.
-     * Unfortunately, there are software library bugs (e.g. Jetty 6 http server
-     * and Java 6 http client), which do not correctly implement "Expect:
-     * 100-continue". The two-step create/append is a temporary workaround for
-     * the software library bugs.
-     */
-    HttpURLConnection twoStepWrite() throws IOException {
-      //Step 1) Submit a Http request with neither auto-redirect nor data. 
-      connect(false);
-      validateResponse(HttpOpParam.TemporaryRedirectOp.valueOf(op), conn, false);
-      final String redirect = conn.getHeaderField("Location");
-      disconnect();
-      checkRetry = false;
-      
-      //Step 2) Submit another Http request with the URL from the Location header with data.
-      conn = (HttpURLConnection) connectionFactory.openConnection(new URL(
-          redirect));
-      conn.setRequestProperty("Content-Type",
-          MediaType.APPLICATION_OCTET_STREAM);
-      conn.setChunkedStreamingMode(32 << 10); //32kB-chunk
-      connect();
-      return conn;
+    abstract T getResponse(HttpURLConnection conn) throws IOException;
+  }
+
+  /**
+   * Abstract base class to handle path-based operations with params
+   */
+  abstract class AbstractFsPathRunner<T> extends AbstractRunner<T> {
+    private final Path fspath;
+    private final Param<?,?>[] parameters;
+    
+    AbstractFsPathRunner(final HttpOpParam.Op op, final Path fspath,
+        Param<?,?>... parameters) {
+      super(op, false);
+      this.fspath = fspath;
+      this.parameters = parameters;
+    }
+    
+    @Override
+    protected URL getUrl() throws IOException {
+      return toUrl(op, fspath, parameters);
     }
+  }
 
-    FSDataOutputStream write(final int bufferSize) throws IOException {
-      return WebHdfsFileSystem.this.write(op, conn, bufferSize);
+  /**
+   * Default path-based implementation expects no json response
+   */
+  class FsPathRunner extends AbstractFsPathRunner<Void> {
+    FsPathRunner(Op op, Path fspath, Param<?,?>... parameters) {
+      super(op, fspath, parameters);
     }
+    
+    @Override
+    Void getResponse(HttpURLConnection conn) throws IOException {
+      return null;
+    }
+  }
 
-    void getResponse(boolean getJsonAndDisconnect) throws IOException {
+  /**
+   * Handle path-based operations with a json response
+   */
+  abstract class FsPathResponseRunner<T> extends AbstractFsPathRunner<T> {
+    FsPathResponseRunner(final HttpOpParam.Op op, final Path fspath,
+        Param<?,?>... parameters) {
+      super(op, fspath, parameters);
+    }
+    
+    @Override
+    final T getResponse(HttpURLConnection conn) throws IOException {
       try {
-        connect();
-        final int code = conn.getResponseCode();
-        if (!redirected && op.getRedirect()
-            && code != op.getExpectedHttpResponseCode()) {
-          final String redirect = conn.getHeaderField("Location");
-          json = validateResponse(HttpOpParam.TemporaryRedirectOp.valueOf(op),
-              conn, false);
-          disconnect();
-  
-          checkRetry = false;
-          conn = (HttpURLConnection) connectionFactory.openConnection(new URL(
-              redirect));
-          connect();
+        final Map<?,?> json = jsonParse(conn, false);
+        if (json == null) {
+          // match exception class thrown by parser
+          throw new IllegalStateException("Missing response");
         }
-
-        json = validateResponse(op, conn, false);
-        if (json == null && getJsonAndDisconnect) {
-          json = jsonParse(conn, false);
+        return decodeResponse(json);
+      } catch (IOException ioe) {
+        throw ioe;
+      } catch (Exception e) { // catch json parser errors
+        final IOException ioe =
+            new IOException("Response decoding failure: "+e.toString(), e);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(ioe);
         }
+        throw ioe;
       } finally {
-        if (getJsonAndDisconnect) {
-          disconnect();
-        }
+        conn.disconnect();
       }
     }
+    
+    abstract T decodeResponse(Map<?,?> json) throws IOException;
   }
 
-  final class FsPathRunner extends AbstractRunner {
-    private final Path fspath;
-    private final Param<?, ?>[] parameters;
-
-    FsPathRunner(final HttpOpParam.Op op, final Path fspath, final Param<?,?>... parameters) {
-      super(op, false);
-      this.fspath = fspath;
-      this.parameters = parameters;
+  /**
+   * Handle path-based operations with json boolean response
+   */
+  class FsPathBooleanRunner extends FsPathResponseRunner<Boolean> {
+    FsPathBooleanRunner(Op op, Path fspath, Param<?,?>... parameters) {
+      super(op, fspath, parameters);
     }
-
+    
     @Override
-    protected URL getUrl() throws IOException {
-      return toUrl(op, fspath, parameters);
+    Boolean decodeResponse(Map<?,?> json) throws IOException {
+      return (Boolean)json.get("boolean");
     }
   }
 
-  final class URLRunner extends AbstractRunner {
+  /**
+   * Handle create/append output streams
+   */
+  class FsPathOutputStreamRunner extends AbstractFsPathRunner<FSDataOutputStream> {
+    private final int bufferSize;
+    
+    FsPathOutputStreamRunner(Op op, Path fspath, int bufferSize,
+        Param<?,?>... parameters) {
+      super(op, fspath, parameters);
+      this.bufferSize = bufferSize;
+    }
+    
+    @Override
+    FSDataOutputStream getResponse(final HttpURLConnection conn)
+        throws IOException {
+      return new FSDataOutputStream(new BufferedOutputStream(
+          conn.getOutputStream(), bufferSize), statistics) {
+        @Override
+        public void close() throws IOException {
+          try {
+            super.close();
+          } finally {
+            try {
+              validateResponse(op, conn, true);
+            } finally {
+              conn.disconnect();
+            }
+          }
+        }
+      };
+    }
+  }
+  
+  /**
+   * Used by open() which tracks the resolved url itself
+   */
+  final class URLRunner extends AbstractRunner<HttpURLConnection> {
     private final URL url;
     @Override
     protected URL getUrl() {
@@ -682,6 +713,11 @@ public class WebHdfsFileSystem extends F
       super(op, redirected);
       this.url = url;
     }
+
+    @Override
+    HttpURLConnection getResponse(HttpURLConnection conn) throws IOException {
+      return conn;
+    }
   }
 
   private FsPermission applyUMask(FsPermission permission) {
@@ -693,8 +729,12 @@ public class WebHdfsFileSystem extends F
 
   private HdfsFileStatus getHdfsFileStatus(Path f) throws IOException {
     final HttpOpParam.Op op = GetOpParam.Op.GETFILESTATUS;
-    final Map<?, ?> json = run(op, f);
-    final HdfsFileStatus status = JsonUtil.toFileStatus(json, true);
+    HdfsFileStatus status = new FsPathResponseRunner<HdfsFileStatus>(op, f) {
+      @Override
+      HdfsFileStatus decodeResponse(Map<?,?> json) {
+        return JsonUtil.toFileStatus(json, true);
+      }
+    }.run();
     if (status == null) {
       throw new FileNotFoundException("File does not exist: " + f);
     }
@@ -718,8 +758,12 @@ public class WebHdfsFileSystem extends F
   @Override
   public AclStatus getAclStatus(Path f) throws IOException {
     final HttpOpParam.Op op = GetOpParam.Op.GETACLSTATUS;
-    final Map<?, ?> json = run(op, f);
-    AclStatus status = JsonUtil.toAclStatus(json);
+    AclStatus status = new FsPathResponseRunner<AclStatus>(op, f) {
+      @Override
+      AclStatus decodeResponse(Map<?,?> json) {
+        return JsonUtil.toAclStatus(json);
+      }
+    }.run();
     if (status == null) {
       throw new FileNotFoundException("File does not exist: " + f);
     }
@@ -730,9 +774,9 @@ public class WebHdfsFileSystem extends F
   public boolean mkdirs(Path f, FsPermission permission) throws IOException {
     statistics.incrementWriteOps(1);
     final HttpOpParam.Op op = PutOpParam.Op.MKDIRS;
-    final Map<?, ?> json = run(op, f,
-        new PermissionParam(applyUMask(permission)));
-    return (Boolean)json.get("boolean");
+    return new FsPathBooleanRunner(op, f,
+        new PermissionParam(applyUMask(permission))
+    ).run();
   }
 
   /**
@@ -743,17 +787,19 @@ public class WebHdfsFileSystem extends F
       ) throws IOException {
     statistics.incrementWriteOps(1);
     final HttpOpParam.Op op = PutOpParam.Op.CREATESYMLINK;
-    run(op, f, new DestinationParam(makeQualified(destination).toUri().getPath()),
-        new CreateParentParam(createParent));
+    new FsPathRunner(op, f,
+        new DestinationParam(makeQualified(destination).toUri().getPath()),
+        new CreateParentParam(createParent)
+    ).run();
   }
 
   @Override
   public boolean rename(final Path src, final Path dst) throws IOException {
     statistics.incrementWriteOps(1);
     final HttpOpParam.Op op = PutOpParam.Op.RENAME;
-    final Map<?, ?> json = run(op, src,
-        new DestinationParam(makeQualified(dst).toUri().getPath()));
-    return (Boolean)json.get("boolean");
+    return new FsPathBooleanRunner(op, src,
+        new DestinationParam(makeQualified(dst).toUri().getPath())
+    ).run();
   }
 
   @SuppressWarnings("deprecation")
@@ -762,8 +808,10 @@ public class WebHdfsFileSystem extends F
       final Options.Rename... options) throws IOException {
     statistics.incrementWriteOps(1);
     final HttpOpParam.Op op = PutOpParam.Op.RENAME;
-    run(op, src, new DestinationParam(makeQualified(dst).toUri().getPath()),
-        new RenameOptionSetParam(options));
+    new FsPathRunner(op, src,
+        new DestinationParam(makeQualified(dst).toUri().getPath()),
+        new RenameOptionSetParam(options)
+    ).run();
   }
 
   @Override
@@ -775,7 +823,9 @@ public class WebHdfsFileSystem extends F
 
     statistics.incrementWriteOps(1);
     final HttpOpParam.Op op = PutOpParam.Op.SETOWNER;
-    run(op, p, new OwnerParam(owner), new GroupParam(group));
+    new FsPathRunner(op, p,
+        new OwnerParam(owner), new GroupParam(group)
+    ).run();
   }
 
   @Override
@@ -783,7 +833,7 @@ public class WebHdfsFileSystem extends F
       ) throws IOException {
     statistics.incrementWriteOps(1);
     final HttpOpParam.Op op = PutOpParam.Op.SETPERMISSION;
-    run(op, p, new PermissionParam(permission));
+    new FsPathRunner(op, p,new PermissionParam(permission)).run();
   }
 
   @Override
@@ -791,7 +841,7 @@ public class WebHdfsFileSystem extends F
       throws IOException {
     statistics.incrementWriteOps(1);
     final HttpOpParam.Op op = PutOpParam.Op.MODIFYACLENTRIES;
-    run(op, path, new AclPermissionParam(aclSpec));
+    new FsPathRunner(op, path, new AclPermissionParam(aclSpec)).run();
   }
 
   @Override
@@ -799,21 +849,21 @@ public class WebHdfsFileSystem extends F
       throws IOException {
     statistics.incrementWriteOps(1);
     final HttpOpParam.Op op = PutOpParam.Op.REMOVEACLENTRIES;
-    run(op, path, new AclPermissionParam(aclSpec));
+    new FsPathRunner(op, path, new AclPermissionParam(aclSpec)).run();
   }
 
   @Override
   public void removeDefaultAcl(Path path) throws IOException {
     statistics.incrementWriteOps(1);
     final HttpOpParam.Op op = PutOpParam.Op.REMOVEDEFAULTACL;
-    run(op, path);
+    new FsPathRunner(op, path).run();
   }
 
   @Override
   public void removeAcl(Path path) throws IOException {
     statistics.incrementWriteOps(1);
     final HttpOpParam.Op op = PutOpParam.Op.REMOVEACL;
-    run(op, path);
+    new FsPathRunner(op, path).run();
   }
 
   @Override
@@ -821,7 +871,7 @@ public class WebHdfsFileSystem extends F
       throws IOException {
     statistics.incrementWriteOps(1);
     final HttpOpParam.Op op = PutOpParam.Op.SETACL;
-    run(op, p, new AclPermissionParam(aclSpec));
+    new FsPathRunner(op, p, new AclPermissionParam(aclSpec)).run();
   }
 
   @Override
@@ -829,8 +879,9 @@ public class WebHdfsFileSystem extends F
      ) throws IOException {
     statistics.incrementWriteOps(1);
     final HttpOpParam.Op op = PutOpParam.Op.SETREPLICATION;
-    final Map<?, ?> json = run(op, p, new ReplicationParam(replication));
-    return (Boolean)json.get("boolean");
+    return new FsPathBooleanRunner(op, p,
+        new ReplicationParam(replication)
+    ).run();
   }
 
   @Override
@@ -838,7 +889,10 @@ public class WebHdfsFileSystem extends F
       ) throws IOException {
     statistics.incrementWriteOps(1);
     final HttpOpParam.Op op = PutOpParam.Op.SETTIMES;
-    run(op, p, new ModificationTimeParam(mtime), new AccessTimeParam(atime));
+    new FsPathRunner(op, p,
+        new ModificationTimeParam(mtime),
+        new AccessTimeParam(atime)
+    ).run();
   }
 
   @Override
@@ -853,32 +907,11 @@ public class WebHdfsFileSystem extends F
         DFSConfigKeys.DFS_REPLICATION_DEFAULT);
   }
 
-  FSDataOutputStream write(final HttpOpParam.Op op,
-      final HttpURLConnection conn, final int bufferSize) throws IOException {
-    return new FSDataOutputStream(new BufferedOutputStream(
-        conn.getOutputStream(), bufferSize), statistics) {
-      @Override
-      public void close() throws IOException {
-        try {
-          super.close();
-        } finally {
-          try {
-            validateResponse(op, conn, true);
-          } finally {
-            conn.disconnect();
-          }
-        }
-      }
-    };
-  }
-
   @Override
   public void concat(final Path trg, final Path [] srcs) throws IOException {
     statistics.incrementWriteOps(1);
     final HttpOpParam.Op op = PostOpParam.Op.CONCAT;
-
-    ConcatSourcesParam param = new ConcatSourcesParam(srcs);
-    run(op, trg, param);
+    new FsPathRunner(op, trg, new ConcatSourcesParam(srcs)).run();
   }
 
   @Override
@@ -888,14 +921,13 @@ public class WebHdfsFileSystem extends F
     statistics.incrementWriteOps(1);
 
     final HttpOpParam.Op op = PutOpParam.Op.CREATE;
-    return new FsPathRunner(op, f,
+    return new FsPathOutputStreamRunner(op, f, bufferSize,
         new PermissionParam(applyUMask(permission)),
         new OverwriteParam(overwrite),
         new BufferSizeParam(bufferSize),
         new ReplicationParam(replication),
-        new BlockSizeParam(blockSize))
-      .run()
-      .write(bufferSize);
+        new BlockSizeParam(blockSize)
+    ).run();
   }
 
   @Override
@@ -904,16 +936,17 @@ public class WebHdfsFileSystem extends F
     statistics.incrementWriteOps(1);
 
     final HttpOpParam.Op op = PostOpParam.Op.APPEND;
-    return new FsPathRunner(op, f, new BufferSizeParam(bufferSize))
-      .run()
-      .write(bufferSize);
+    return new FsPathOutputStreamRunner(op, f, bufferSize,
+        new BufferSizeParam(bufferSize)
+    ).run();
   }
 
   @Override
   public boolean delete(Path f, boolean recursive) throws IOException {
     final HttpOpParam.Op op = DeleteOpParam.Op.DELETE;
-    final Map<?, ?> json = run(op, f, new RecursiveParam(recursive));
-    return (Boolean)json.get("boolean");
+    return new FsPathBooleanRunner(op, f,
+        new RecursiveParam(recursive)
+    ).run();
   }
 
   @Override
@@ -945,7 +978,7 @@ public class WebHdfsFileSystem extends F
         final boolean resolved) throws IOException {
       final URL offsetUrl = offset == 0L? url
           : new URL(url + "&" + new OffsetParam(offset));
-      return new URLRunner(GetOpParam.Op.OPEN, offsetUrl, resolved).run().conn;
+      return new URLRunner(GetOpParam.Op.OPEN, offsetUrl, resolved).run();
     }  
   }
 
@@ -1001,25 +1034,36 @@ public class WebHdfsFileSystem extends F
     statistics.incrementReadOps(1);
 
     final HttpOpParam.Op op = GetOpParam.Op.LISTSTATUS;
-    final Map<?, ?> json  = run(op, f);
-    final Map<?, ?> rootmap = (Map<?, ?>)json.get(FileStatus.class.getSimpleName() + "es");
-    final Object[] array = (Object[])rootmap.get(FileStatus.class.getSimpleName());
-
-    //convert FileStatus
-    final FileStatus[] statuses = new FileStatus[array.length];
-    for(int i = 0; i < array.length; i++) {
-      final Map<?, ?> m = (Map<?, ?>)array[i];
-      statuses[i] = makeQualified(JsonUtil.toFileStatus(m, false), f);
-    }
-    return statuses;
+    return new FsPathResponseRunner<FileStatus[]>(op, f) {
+      @Override
+      FileStatus[] decodeResponse(Map<?,?> json) {
+        final Map<?, ?> rootmap = (Map<?, ?>)json.get(FileStatus.class.getSimpleName() + "es");
+        final Object[] array = (Object[])rootmap.get(FileStatus.class.getSimpleName());
+
+        //convert FileStatus
+        final FileStatus[] statuses = new FileStatus[array.length];
+        for (int i = 0; i < array.length; i++) {
+          final Map<?, ?> m = (Map<?, ?>)array[i];
+          statuses[i] = makeQualified(JsonUtil.toFileStatus(m, false), f);
+        }
+        return statuses;
+      }
+    }.run();
   }
 
   @Override
   public Token<DelegationTokenIdentifier> getDelegationToken(
       final String renewer) throws IOException {
     final HttpOpParam.Op op = GetOpParam.Op.GETDELEGATIONTOKEN;
-    final Map<?, ?> m = run(op, null, new RenewerParam(renewer));
-    final Token<DelegationTokenIdentifier> token = JsonUtil.toDelegationToken(m);
+    Token<DelegationTokenIdentifier> token =
+        new FsPathResponseRunner<Token<DelegationTokenIdentifier>>(
+            op, null, new RenewerParam(renewer)) {
+      @Override
+      Token<DelegationTokenIdentifier> decodeResponse(Map<?,?> json)
+          throws IOException {
+        return JsonUtil.toDelegationToken(json);
+      }
+    }.run();
     token.setService(tokenServiceName);
     return token;
   }
@@ -1041,19 +1085,22 @@ public class WebHdfsFileSystem extends F
   public synchronized long renewDelegationToken(final Token<?> token
       ) throws IOException {
     final HttpOpParam.Op op = PutOpParam.Op.RENEWDELEGATIONTOKEN;
-    TokenArgumentParam dtargParam = new TokenArgumentParam(
-        token.encodeToUrlString());
-    final Map<?, ?> m = run(op, null, dtargParam);
-    return (Long) m.get("long");
+    return new FsPathResponseRunner<Long>(op, null,
+        new TokenArgumentParam(token.encodeToUrlString())) {
+      @Override
+      Long decodeResponse(Map<?,?> json) throws IOException {
+        return (Long) json.get("long");
+      }
+    }.run();
   }
 
   @Override
   public synchronized void cancelDelegationToken(final Token<?> token
       ) throws IOException {
     final HttpOpParam.Op op = PutOpParam.Op.CANCELDELEGATIONTOKEN;
-    TokenArgumentParam dtargParam = new TokenArgumentParam(
-        token.encodeToUrlString());
-    run(op, null, dtargParam);
+    new FsPathRunner(op, null,
+        new TokenArgumentParam(token.encodeToUrlString())
+    ).run();
   }
   
   @Override
@@ -1071,9 +1118,14 @@ public class WebHdfsFileSystem extends F
     statistics.incrementReadOps(1);
 
     final HttpOpParam.Op op = GetOpParam.Op.GET_BLOCK_LOCATIONS;
-    final Map<?, ?> m = run(op, p, new OffsetParam(offset),
-        new LengthParam(length));
-    return DFSUtil.locatedBlocks2Locations(JsonUtil.toLocatedBlocks(m));
+    return new FsPathResponseRunner<BlockLocation[]>(op, p,
+        new OffsetParam(offset), new LengthParam(length)) {
+      @Override
+      BlockLocation[] decodeResponse(Map<?,?> json) throws IOException {
+        return DFSUtil.locatedBlocks2Locations(
+            JsonUtil.toLocatedBlocks(json));
+      }
+    }.run();
   }
 
   @Override
@@ -1081,8 +1133,12 @@ public class WebHdfsFileSystem extends F
     statistics.incrementReadOps(1);
 
     final HttpOpParam.Op op = GetOpParam.Op.GETCONTENTSUMMARY;
-    final Map<?, ?> m = run(op, p);
-    return JsonUtil.toContentSummary(m);
+    return new FsPathResponseRunner<ContentSummary>(op, p) {
+      @Override
+      ContentSummary decodeResponse(Map<?,?> json) {
+        return JsonUtil.toContentSummary(json);        
+      }
+    }.run();
   }
 
   @Override
@@ -1091,8 +1147,12 @@ public class WebHdfsFileSystem extends F
     statistics.incrementReadOps(1);
   
     final HttpOpParam.Op op = GetOpParam.Op.GETFILECHECKSUM;
-    final Map<?, ?> m = run(op, p);
-    return JsonUtil.toMD5MD5CRC32FileChecksum(m);
+    return new FsPathResponseRunner<MD5MD5CRC32FileChecksum>(op, p) {
+      @Override
+      MD5MD5CRC32FileChecksum decodeResponse(Map<?,?> json) throws IOException {
+        return JsonUtil.toMD5MD5CRC32FileChecksum(json);
+      }
+    }.run();
   }
 
   /**

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/HttpOpParam.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/HttpOpParam.java?rev=1594273&r1=1594272&r2=1594273&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/HttpOpParam.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/HttpOpParam.java Tue May 13 16:40:15 2014
@@ -102,7 +102,7 @@ public abstract class HttpOpParam<E exte
 
     @Override
     public boolean getDoOutput() {
-      return op.getDoOutput();
+      return false;
     }
 
     @Override

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/WebHdfsTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/WebHdfsTestUtil.java?rev=1594273&r1=1594272&r2=1594273&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/WebHdfsTestUtil.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/WebHdfsTestUtil.java Tue May 13 16:40:15 2014
@@ -94,10 +94,4 @@ public class WebHdfsTestUtil {
     Assert.assertEquals(expectedResponseCode, conn.getResponseCode());
     return WebHdfsFileSystem.jsonParse(conn, false);
   }
-
-  public static FSDataOutputStream write(final WebHdfsFileSystem webhdfs,
-      final HttpOpParam.Op op, final HttpURLConnection conn,
-      final int bufferSize) throws IOException {
-    return webhdfs.write(op, conn, bufferSize);
-  }
 }