You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by cm...@apache.org on 2014/08/20 01:50:25 UTC

svn commit: r1619012 [23/35] - in /hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project: hadoop-hdfs-httpfs/ hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/client/ hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/ hadoop...

Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java Tue Aug 19 23:49:39 2014
@@ -30,6 +30,7 @@ import java.net.URI;
 import java.net.URL;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
+import java.util.EnumSet;
 import java.util.List;
 import java.util.Map;
 import java.util.StringTokenizer;
@@ -49,8 +50,11 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.MD5MD5CRC32FileChecksum;
 import org.apache.hadoop.fs.Options;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.XAttrCodec;
+import org.apache.hadoop.fs.XAttrSetFlag;
 import org.apache.hadoop.fs.permission.AclEntry;
 import org.apache.hadoop.fs.permission.AclStatus;
+import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
@@ -58,49 +62,28 @@ import org.apache.hadoop.hdfs.HAUtil;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.hdfs.server.namenode.SafeModeException;
-import org.apache.hadoop.hdfs.web.resources.AccessTimeParam;
-import org.apache.hadoop.hdfs.web.resources.AclPermissionParam;
-import org.apache.hadoop.hdfs.web.resources.BlockSizeParam;
-import org.apache.hadoop.hdfs.web.resources.BufferSizeParam;
-import org.apache.hadoop.hdfs.web.resources.ConcatSourcesParam;
-import org.apache.hadoop.hdfs.web.resources.CreateParentParam;
-import org.apache.hadoop.hdfs.web.resources.DelegationParam;
-import org.apache.hadoop.hdfs.web.resources.DeleteOpParam;
-import org.apache.hadoop.hdfs.web.resources.DestinationParam;
-import org.apache.hadoop.hdfs.web.resources.DoAsParam;
-import org.apache.hadoop.hdfs.web.resources.GetOpParam;
-import org.apache.hadoop.hdfs.web.resources.GroupParam;
-import org.apache.hadoop.hdfs.web.resources.HttpOpParam;
-import org.apache.hadoop.hdfs.web.resources.LengthParam;
-import org.apache.hadoop.hdfs.web.resources.ModificationTimeParam;
-import org.apache.hadoop.hdfs.web.resources.OffsetParam;
-import org.apache.hadoop.hdfs.web.resources.OverwriteParam;
-import org.apache.hadoop.hdfs.web.resources.OwnerParam;
-import org.apache.hadoop.hdfs.web.resources.Param;
-import org.apache.hadoop.hdfs.web.resources.PermissionParam;
-import org.apache.hadoop.hdfs.web.resources.PostOpParam;
-import org.apache.hadoop.hdfs.web.resources.PutOpParam;
-import org.apache.hadoop.hdfs.web.resources.RecursiveParam;
-import org.apache.hadoop.hdfs.web.resources.RenameOptionSetParam;
-import org.apache.hadoop.hdfs.web.resources.RenewerParam;
-import org.apache.hadoop.hdfs.web.resources.ReplicationParam;
-import org.apache.hadoop.hdfs.web.resources.TokenArgumentParam;
-import org.apache.hadoop.hdfs.web.resources.UserParam;
+import org.apache.hadoop.hdfs.web.resources.*;
+import org.apache.hadoop.hdfs.web.resources.HttpOpParam.Op;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.retry.RetryPolicies;
 import org.apache.hadoop.io.retry.RetryPolicy;
 import org.apache.hadoop.io.retry.RetryUtils;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.authentication.client.AuthenticationException;
+import org.apache.hadoop.security.token.SecretManager.InvalidToken;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.security.token.TokenSelector;
+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSelector;
 import org.apache.hadoop.util.Progressable;
 import org.mortbay.util.ajax.JSON;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Charsets;
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 
 /** A FileSystem for HDFS over the web. */
@@ -119,7 +102,7 @@ public class WebHdfsFileSystem extends F
 
   /** Delegation token kind */
   public static final Text TOKEN_KIND = new Text("WEBHDFS delegation");
-  protected TokenAspect<? extends WebHdfsFileSystem> tokenAspect;
+  private boolean canRefreshDelegationToken;
 
   private UserGroupInformation ugi;
   private URI uri;
@@ -148,13 +131,8 @@ public class WebHdfsFileSystem extends F
     return "http";
   }
 
-  /**
-   * Initialize tokenAspect. This function is intended to
-   * be overridden by SWebHdfsFileSystem.
-   */
-  protected synchronized void initializeTokenAspect() {
-    tokenAspect = new TokenAspect<WebHdfsFileSystem>(this, tokenServiceName,
-        TOKEN_KIND);
+  protected Text getTokenKind() {
+    return TOKEN_KIND;
   }
 
   @Override
@@ -175,12 +153,14 @@ public class WebHdfsFileSystem extends F
     this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());
     this.nnAddrs = resolveNNAddr();
 
-    boolean isHA = HAUtil.isLogicalUri(conf, this.uri);
-    // In non-HA case, the code needs to call getCanonicalUri() in order to
-    // handle the case where no port is specified in the URI
-    this.tokenServiceName = isHA ? HAUtil.buildTokenServiceForLogicalUri(uri)
+    boolean isHA = HAUtil.isClientFailoverConfigured(conf, this.uri);
+    boolean isLogicalUri = isHA && HAUtil.isLogicalUri(conf, this.uri);
+    // In non-HA or non-logical URI case, the code needs to call
+    // getCanonicalUri() in order to handle the case where no port is
+    // specified in the URI
+    this.tokenServiceName = isLogicalUri ?
+        HAUtil.buildTokenServiceForLogicalUri(uri, getScheme())
         : SecurityUtil.buildTokenService(getCanonicalUri());
-    initializeTokenAspect();
 
     if (!isHA) {
       this.retryPolicy =
@@ -213,10 +193,8 @@ public class WebHdfsFileSystem extends F
     }
 
     this.workingDir = getHomeDirectory();
-
-    if (UserGroupInformation.isSecurityEnabled()) {
-      tokenAspect.initDelegationToken(ugi);
-    }
+    this.canRefreshDelegationToken = UserGroupInformation.isSecurityEnabled();
+    this.delegationToken = null;
   }
 
   @Override
@@ -231,11 +209,46 @@ public class WebHdfsFileSystem extends F
     return b;
   }
 
+  TokenSelector<DelegationTokenIdentifier> tokenSelector =
+      new AbstractDelegationTokenSelector<DelegationTokenIdentifier>(getTokenKind()){};
+
+  // the first getAuthParams() for a non-token op will either get the
+  // internal token from the ugi or lazy fetch one
   protected synchronized Token<?> getDelegationToken() throws IOException {
-    tokenAspect.ensureTokenInitialized();
+    if (canRefreshDelegationToken && delegationToken == null) {
+      Token<?> token = tokenSelector.selectToken(
+          new Text(getCanonicalServiceName()), ugi.getTokens());
+      // ugi tokens are usually indicative of a task which can't
+      // refetch tokens.  even if ugi has credentials, don't attempt
+      // to get another token to match hdfs/rpc behavior
+      if (token != null) {
+        LOG.debug("Using UGI token: " + token);
+        canRefreshDelegationToken = false; 
+      } else {
+        token = getDelegationToken(null);
+        if (token != null) {
+          LOG.debug("Fetched new token: " + token);
+        } else { // security is disabled
+          canRefreshDelegationToken = false;
+        }
+      }
+      setDelegationToken(token);
+    }
     return delegationToken;
   }
 
+  @VisibleForTesting
+  synchronized boolean replaceExpiredDelegationToken() throws IOException {
+    boolean replaced = false;
+    if (canRefreshDelegationToken) {
+      Token<?> token = getDelegationToken(null);
+      LOG.debug("Replaced expired token: " + token);
+      setDelegationToken(token);
+      replaced = (token != null);
+    }
+    return replaced;
+  }
+
   @Override
   protected int getDefaultPort() {
     return DFSConfigKeys.DFS_NAMENODE_HTTP_PORT_DEFAULT;
@@ -306,8 +319,8 @@ public class WebHdfsFileSystem extends F
     final int code = conn.getResponseCode();
     // server is demanding an authentication we don't support
     if (code == HttpURLConnection.HTTP_UNAUTHORIZED) {
-      throw new IOException(
-          new AuthenticationException(conn.getResponseMessage()));
+      // match hdfs/rpc exception
+      throw new AccessControlException(conn.getResponseMessage());
     }
     if (code != op.getExpectedHttpResponseCode()) {
       final Map<?, ?> m;
@@ -327,7 +340,15 @@ public class WebHdfsFileSystem extends F
         return m;
       }
 
-      final RemoteException re = JsonUtil.toRemoteException(m);
+      IOException re = JsonUtil.toRemoteException(m);
+      // extract UGI-related exceptions and unwrap InvalidToken
+      // the NN mangles these exceptions but the DN does not and may need
+      // to re-fetch a token if either report the token is expired
+      if (re.getMessage().startsWith("Failed to obtain user group information:")) {
+        String[] parts = re.getMessage().split(":\\s+", 3);
+        re = new RemoteException(parts[1], parts[2]);
+        re = ((RemoteException)re).unwrapRemoteException(InvalidToken.class);
+      }
       throw unwrapException? toIOException(re): re;
     }
     return null;
@@ -362,8 +383,6 @@ public class WebHdfsFileSystem extends F
    */
   private synchronized void resetStateToFailOver() {
     currentNNAddrIndex = (currentNNAddrIndex + 1) % nnAddrs.length;
-    delegationToken = null;
-    tokenAspect.reset();
   }
 
   /**
@@ -389,7 +408,7 @@ public class WebHdfsFileSystem extends F
     // Skip adding delegation token for token operations because these
     // operations require authentication.
     Token<?> token = null;
-    if (UserGroupInformation.isSecurityEnabled() && !op.getRequireAuth()) {
+    if (!op.getRequireAuth()) {
       token = getDelegationToken();
     }
     if (token != null) {
@@ -422,40 +441,24 @@ public class WebHdfsFileSystem extends F
   }
 
   /**
-   * Run a http operation.
-   * Connect to the http server, validate response, and obtain the JSON output.
-   * 
-   * @param op http operation
-   * @param fspath file system path
-   * @param parameters parameters for the operation
-   * @return a JSON object, e.g. Object[], Map<?, ?>, etc.
-   * @throws IOException
-   */
-  private Map<?, ?> run(final HttpOpParam.Op op, final Path fspath,
-      final Param<?,?>... parameters) throws IOException {
-    return new FsPathRunner(op, fspath, parameters).run().json;
-  }
-
-  /**
    * This class is for initialing a HTTP connection, connecting to server,
    * obtaining a response, and also handling retry on failures.
    */
-  abstract class AbstractRunner {
+  abstract class AbstractRunner<T> {
     abstract protected URL getUrl() throws IOException;
 
     protected final HttpOpParam.Op op;
     private final boolean redirected;
+    protected ExcludeDatanodesParam excludeDatanodes = new ExcludeDatanodesParam("");
 
     private boolean checkRetry;
-    protected HttpURLConnection conn = null;
-    private Map<?, ?> json = null;
 
     protected AbstractRunner(final HttpOpParam.Op op, boolean redirected) {
       this.op = op;
       this.redirected = redirected;
     }
 
-    AbstractRunner run() throws IOException {
+    T run() throws IOException {
       UserGroupInformation connectUgi = ugi.getRealUser();
       if (connectUgi == null) {
         connectUgi = ugi;
@@ -467,9 +470,9 @@ public class WebHdfsFileSystem extends F
         // the entire lifecycle of the connection must be run inside the
         // doAs to ensure authentication is performed correctly
         return connectUgi.doAs(
-            new PrivilegedExceptionAction<AbstractRunner>() {
+            new PrivilegedExceptionAction<T>() {
               @Override
-              public AbstractRunner run() throws IOException {
+              public T run() throws IOException {
                 return runWithRetry();
               }
             });
@@ -477,32 +480,97 @@ public class WebHdfsFileSystem extends F
         throw new IOException(e);
       }
     }
-    
-    private void init() throws IOException {
-      checkRetry = !redirected;
-      URL url = getUrl();
-      conn = (HttpURLConnection) connectionFactory.openConnection(url);
-    }
-    
-    private void connect() throws IOException {
-      connect(op.getDoOutput());
+
+    /**
+     * Two-step requests redirected to a DN
+     * 
+     * Create/Append:
+     * Step 1) Submit a Http request with neither auto-redirect nor data. 
+     * Step 2) Submit another Http request with the URL from the Location header with data.
+     * 
+     * The reason of having two-step create/append is for preventing clients to
+     * send out the data before the redirect. This issue is addressed by the
+     * "Expect: 100-continue" header in HTTP/1.1; see RFC 2616, Section 8.2.3.
+     * Unfortunately, there are software library bugs (e.g. Jetty 6 http server
+     * and Java 6 http client), which do not correctly implement "Expect:
+     * 100-continue". The two-step create/append is a temporary workaround for
+     * the software library bugs.
+     * 
+     * Open/Checksum
+     * Also implements two-step connects for other operations redirected to
+     * a DN such as open and checksum
+     */
+    private HttpURLConnection connect(URL url) throws IOException {
+      //redirect hostname and port
+      String redirectHost = null;
+
+      
+      // resolve redirects for a DN operation unless already resolved
+      if (op.getRedirect() && !redirected) {
+        final HttpOpParam.Op redirectOp =
+            HttpOpParam.TemporaryRedirectOp.valueOf(op);
+        final HttpURLConnection conn = connect(redirectOp, url);
+        // application level proxy like httpfs might not issue a redirect
+        if (conn.getResponseCode() == op.getExpectedHttpResponseCode()) {
+          return conn;
+        }
+        try {
+          validateResponse(redirectOp, conn, false);
+          url = new URL(conn.getHeaderField("Location"));
+          redirectHost = url.getHost() + ":" + url.getPort();
+        } finally {
+          conn.disconnect();
+        }
+      }
+      try {
+        return connect(op, url);
+      } catch (IOException ioe) {
+        if (redirectHost != null) {
+          if (excludeDatanodes.getValue() != null) {
+            excludeDatanodes = new ExcludeDatanodesParam(redirectHost + ","
+                + excludeDatanodes.getValue());
+          } else {
+            excludeDatanodes = new ExcludeDatanodesParam(redirectHost);
+          }
+        }
+        throw ioe;
+      }      
     }
 
-    private void connect(boolean doOutput) throws IOException {
+    private HttpURLConnection connect(final HttpOpParam.Op op, final URL url)
+        throws IOException {
+      final HttpURLConnection conn =
+          (HttpURLConnection)connectionFactory.openConnection(url);
+      final boolean doOutput = op.getDoOutput();
       conn.setRequestMethod(op.getType().toString());
-      conn.setDoOutput(doOutput);
       conn.setInstanceFollowRedirects(false);
-      conn.connect();
-    }
-
-    private void disconnect() {
-      if (conn != null) {
-        conn.disconnect();
-        conn = null;
+      switch (op.getType()) {
+        // if not sending a message body for a POST or PUT operation, need
+        // to ensure the server/proxy knows this 
+        case POST:
+        case PUT: {
+          conn.setDoOutput(true);
+          if (!doOutput) {
+            // explicitly setting content-length to 0 won't do spnego!!
+            // opening and closing the stream will send "Content-Length: 0"
+            conn.getOutputStream().close();
+          } else {
+            conn.setRequestProperty("Content-Type",
+                MediaType.APPLICATION_OCTET_STREAM);
+            conn.setChunkedStreamingMode(32 << 10); //32kB-chunk
+          }
+          break;
+        }
+        default: {
+          conn.setDoOutput(doOutput);
+          break;
+        }
       }
+      conn.connect();
+      return conn;
     }
 
-    private AbstractRunner runWithRetry() throws IOException {
+    private T runWithRetry() throws IOException {
       /**
        * Do the real work.
        *
@@ -520,19 +588,26 @@ public class WebHdfsFileSystem extends F
        * examines the exception and swallows it if it decides to rerun the work.
        */
       for(int retry = 0; ; retry++) {
+        checkRetry = !redirected;
+        final URL url = getUrl();
         try {
-          init();
-          if (op.getDoOutput()) {
-            twoStepWrite();
-          } else {
-            getResponse(op != GetOpParam.Op.OPEN);
+          final HttpURLConnection conn = connect(url);
+          // output streams will validate on close
+          if (!op.getDoOutput()) {
+            validateResponse(op, conn, false);
           }
-          return this;
-        } catch(IOException ioe) {
-          Throwable cause = ioe.getCause();
-          if (cause != null && cause instanceof AuthenticationException) {
-            throw ioe; // no retries for auth failures
+          return getResponse(conn);
+        } catch (AccessControlException ace) {
+          // no retries for auth failures
+          throw ace;
+        } catch (InvalidToken it) {
+          // try to replace the expired token with a new one.  the attempt
+          // to acquire a new token must be outside this operation's retry
+          // so if it fails after its own retries, this operation fails too.
+          if (op.getRequireAuth() || !replaceExpiredDelegationToken()) {
+            throw it;
           }
+        } catch (IOException ioe) {
           shouldRetry(ioe, retry);
         }
       }
@@ -570,87 +645,159 @@ public class WebHdfsFileSystem extends F
       throw toIOException(ioe);
     }
 
-    /**
-     * Two-step Create/Append:
-     * Step 1) Submit a Http request with neither auto-redirect nor data. 
-     * Step 2) Submit another Http request with the URL from the Location header with data.
-     * 
-     * The reason of having two-step create/append is for preventing clients to
-     * send out the data before the redirect. This issue is addressed by the
-     * "Expect: 100-continue" header in HTTP/1.1; see RFC 2616, Section 8.2.3.
-     * Unfortunately, there are software library bugs (e.g. Jetty 6 http server
-     * and Java 6 http client), which do not correctly implement "Expect:
-     * 100-continue". The two-step create/append is a temporary workaround for
-     * the software library bugs.
-     */
-    HttpURLConnection twoStepWrite() throws IOException {
-      //Step 1) Submit a Http request with neither auto-redirect nor data. 
-      connect(false);
-      validateResponse(HttpOpParam.TemporaryRedirectOp.valueOf(op), conn, false);
-      final String redirect = conn.getHeaderField("Location");
-      disconnect();
-      checkRetry = false;
-      
-      //Step 2) Submit another Http request with the URL from the Location header with data.
-      conn = (HttpURLConnection) connectionFactory.openConnection(new URL(
-          redirect));
-      conn.setRequestProperty("Content-Type",
-          MediaType.APPLICATION_OCTET_STREAM);
-      conn.setChunkedStreamingMode(32 << 10); //32kB-chunk
-      connect();
-      return conn;
+    abstract T getResponse(HttpURLConnection conn) throws IOException;
+  }
+
+  /**
+   * Abstract base class to handle path-based operations with params
+   */
+  abstract class AbstractFsPathRunner<T> extends AbstractRunner<T> {
+    private final Path fspath;
+    private final Param<?,?>[] parameters;
+    
+    AbstractFsPathRunner(final HttpOpParam.Op op, final Path fspath,
+        Param<?,?>... parameters) {
+      super(op, false);
+      this.fspath = fspath;
+      this.parameters = parameters;
+    }
+    
+    AbstractFsPathRunner(final HttpOpParam.Op op, Param<?,?>[] parameters,
+        final Path fspath) {
+      super(op, false);
+      this.fspath = fspath;
+      this.parameters = parameters;
+    }
+    
+    @Override
+    protected URL getUrl() throws IOException {
+      if (excludeDatanodes.getValue() != null) {
+        Param<?, ?>[] tmpParam = new Param<?, ?>[parameters.length + 1];
+        System.arraycopy(parameters, 0, tmpParam, 0, parameters.length);
+        tmpParam[parameters.length] = excludeDatanodes;
+        return toUrl(op, fspath, tmpParam);
+      } else {
+        return toUrl(op, fspath, parameters);
+      }
     }
+  }
 
-    FSDataOutputStream write(final int bufferSize) throws IOException {
-      return WebHdfsFileSystem.this.write(op, conn, bufferSize);
+  /**
+   * Default path-based implementation expects no json response
+   */
+  class FsPathRunner extends AbstractFsPathRunner<Void> {
+    FsPathRunner(Op op, Path fspath, Param<?,?>... parameters) {
+      super(op, fspath, parameters);
+    }
+    
+    @Override
+    Void getResponse(HttpURLConnection conn) throws IOException {
+      return null;
     }
+  }
 
-    void getResponse(boolean getJsonAndDisconnect) throws IOException {
+  /**
+   * Handle path-based operations with a json response
+   */
+  abstract class FsPathResponseRunner<T> extends AbstractFsPathRunner<T> {
+    FsPathResponseRunner(final HttpOpParam.Op op, final Path fspath,
+        Param<?,?>... parameters) {
+      super(op, fspath, parameters);
+    }
+    
+    FsPathResponseRunner(final HttpOpParam.Op op, Param<?,?>[] parameters,
+        final Path fspath) {
+      super(op, parameters, fspath);
+    }
+    
+    @Override
+    final T getResponse(HttpURLConnection conn) throws IOException {
       try {
-        connect();
-        final int code = conn.getResponseCode();
-        if (!redirected && op.getRedirect()
-            && code != op.getExpectedHttpResponseCode()) {
-          final String redirect = conn.getHeaderField("Location");
-          json = validateResponse(HttpOpParam.TemporaryRedirectOp.valueOf(op),
-              conn, false);
-          disconnect();
-  
-          checkRetry = false;
-          conn = (HttpURLConnection) connectionFactory.openConnection(new URL(
-              redirect));
-          connect();
+        final Map<?,?> json = jsonParse(conn, false);
+        if (json == null) {
+          // match exception class thrown by parser
+          throw new IllegalStateException("Missing response");
         }
-
-        json = validateResponse(op, conn, false);
-        if (json == null && getJsonAndDisconnect) {
-          json = jsonParse(conn, false);
+        return decodeResponse(json);
+      } catch (IOException ioe) {
+        throw ioe;
+      } catch (Exception e) { // catch json parser errors
+        final IOException ioe =
+            new IOException("Response decoding failure: "+e.toString(), e);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(ioe);
         }
+        throw ioe;
       } finally {
-        if (getJsonAndDisconnect) {
-          disconnect();
-        }
+        conn.disconnect();
       }
     }
+    
+    abstract T decodeResponse(Map<?,?> json) throws IOException;
   }
 
-  final class FsPathRunner extends AbstractRunner {
-    private final Path fspath;
-    private final Param<?, ?>[] parameters;
-
-    FsPathRunner(final HttpOpParam.Op op, final Path fspath, final Param<?,?>... parameters) {
-      super(op, false);
-      this.fspath = fspath;
-      this.parameters = parameters;
+  /**
+   * Handle path-based operations with json boolean response
+   */
+  class FsPathBooleanRunner extends FsPathResponseRunner<Boolean> {
+    FsPathBooleanRunner(Op op, Path fspath, Param<?,?>... parameters) {
+      super(op, fspath, parameters);
+    }
+    
+    @Override
+    Boolean decodeResponse(Map<?,?> json) throws IOException {
+      return (Boolean)json.get("boolean");
     }
+  }
 
+  /**
+   * Handle create/append output streams
+   */
+  class FsPathOutputStreamRunner extends AbstractFsPathRunner<FSDataOutputStream> {
+    private final int bufferSize;
+    
+    FsPathOutputStreamRunner(Op op, Path fspath, int bufferSize,
+        Param<?,?>... parameters) {
+      super(op, fspath, parameters);
+      this.bufferSize = bufferSize;
+    }
+    
     @Override
-    protected URL getUrl() throws IOException {
-      return toUrl(op, fspath, parameters);
+    FSDataOutputStream getResponse(final HttpURLConnection conn)
+        throws IOException {
+      return new FSDataOutputStream(new BufferedOutputStream(
+          conn.getOutputStream(), bufferSize), statistics) {
+        @Override
+        public void close() throws IOException {
+          try {
+            super.close();
+          } finally {
+            try {
+              validateResponse(op, conn, true);
+            } finally {
+              conn.disconnect();
+            }
+          }
+        }
+      };
     }
   }
 
-  final class URLRunner extends AbstractRunner {
+  class FsPathConnectionRunner extends AbstractFsPathRunner<HttpURLConnection> {
+    FsPathConnectionRunner(Op op, Path fspath, Param<?,?>... parameters) {
+      super(op, fspath, parameters);
+    }
+    @Override
+    HttpURLConnection getResponse(final HttpURLConnection conn)
+        throws IOException {
+      return conn;
+    }
+  }
+  
+  /**
+   * Used by open() which tracks the resolved url itself
+   */
+  final class URLRunner extends AbstractRunner<HttpURLConnection> {
     private final URL url;
     @Override
     protected URL getUrl() {
@@ -661,6 +808,11 @@ public class WebHdfsFileSystem extends F
       super(op, redirected);
       this.url = url;
     }
+
+    @Override
+    HttpURLConnection getResponse(HttpURLConnection conn) throws IOException {
+      return conn;
+    }
   }
 
   private FsPermission applyUMask(FsPermission permission) {
@@ -672,8 +824,12 @@ public class WebHdfsFileSystem extends F
 
   private HdfsFileStatus getHdfsFileStatus(Path f) throws IOException {
     final HttpOpParam.Op op = GetOpParam.Op.GETFILESTATUS;
-    final Map<?, ?> json = run(op, f);
-    final HdfsFileStatus status = JsonUtil.toFileStatus(json, true);
+    HdfsFileStatus status = new FsPathResponseRunner<HdfsFileStatus>(op, f) {
+      @Override
+      HdfsFileStatus decodeResponse(Map<?,?> json) {
+        return JsonUtil.toFileStatus(json, true);
+      }
+    }.run();
     if (status == null) {
       throw new FileNotFoundException("File does not exist: " + f);
     }
@@ -697,8 +853,12 @@ public class WebHdfsFileSystem extends F
   @Override
   public AclStatus getAclStatus(Path f) throws IOException {
     final HttpOpParam.Op op = GetOpParam.Op.GETACLSTATUS;
-    final Map<?, ?> json = run(op, f);
-    AclStatus status = JsonUtil.toAclStatus(json);
+    AclStatus status = new FsPathResponseRunner<AclStatus>(op, f) {
+      @Override
+      AclStatus decodeResponse(Map<?,?> json) {
+        return JsonUtil.toAclStatus(json);
+      }
+    }.run();
     if (status == null) {
       throw new FileNotFoundException("File does not exist: " + f);
     }
@@ -709,9 +869,9 @@ public class WebHdfsFileSystem extends F
   public boolean mkdirs(Path f, FsPermission permission) throws IOException {
     statistics.incrementWriteOps(1);
     final HttpOpParam.Op op = PutOpParam.Op.MKDIRS;
-    final Map<?, ?> json = run(op, f,
-        new PermissionParam(applyUMask(permission)));
-    return (Boolean)json.get("boolean");
+    return new FsPathBooleanRunner(op, f,
+        new PermissionParam(applyUMask(permission))
+    ).run();
   }
 
   /**
@@ -722,17 +882,19 @@ public class WebHdfsFileSystem extends F
       ) throws IOException {
     statistics.incrementWriteOps(1);
     final HttpOpParam.Op op = PutOpParam.Op.CREATESYMLINK;
-    run(op, f, new DestinationParam(makeQualified(destination).toUri().getPath()),
-        new CreateParentParam(createParent));
+    new FsPathRunner(op, f,
+        new DestinationParam(makeQualified(destination).toUri().getPath()),
+        new CreateParentParam(createParent)
+    ).run();
   }
 
   @Override
   public boolean rename(final Path src, final Path dst) throws IOException {
     statistics.incrementWriteOps(1);
     final HttpOpParam.Op op = PutOpParam.Op.RENAME;
-    final Map<?, ?> json = run(op, src,
-        new DestinationParam(makeQualified(dst).toUri().getPath()));
-    return (Boolean)json.get("boolean");
+    return new FsPathBooleanRunner(op, src,
+        new DestinationParam(makeQualified(dst).toUri().getPath())
+    ).run();
   }
 
   @SuppressWarnings("deprecation")
@@ -741,8 +903,87 @@ public class WebHdfsFileSystem extends F
       final Options.Rename... options) throws IOException {
     statistics.incrementWriteOps(1);
     final HttpOpParam.Op op = PutOpParam.Op.RENAME;
-    run(op, src, new DestinationParam(makeQualified(dst).toUri().getPath()),
-        new RenameOptionSetParam(options));
+    new FsPathRunner(op, src,
+        new DestinationParam(makeQualified(dst).toUri().getPath()),
+        new RenameOptionSetParam(options)
+    ).run();
+  }
+  
+  @Override
+  public void setXAttr(Path p, String name, byte[] value, 
+      EnumSet<XAttrSetFlag> flag) throws IOException {
+    statistics.incrementWriteOps(1);
+    final HttpOpParam.Op op = PutOpParam.Op.SETXATTR;
+    if (value != null) {
+      new FsPathRunner(op, p, new XAttrNameParam(name), new XAttrValueParam(
+          XAttrCodec.encodeValue(value, XAttrCodec.HEX)), 
+          new XAttrSetFlagParam(flag)).run();
+    } else {
+      new FsPathRunner(op, p, new XAttrNameParam(name), 
+          new XAttrSetFlagParam(flag)).run();
+    }
+  }
+  
+  @Override
+  public byte[] getXAttr(Path p, final String name) throws IOException {
+    final HttpOpParam.Op op = GetOpParam.Op.GETXATTRS;
+    return new FsPathResponseRunner<byte[]>(op, p, new XAttrNameParam(name), 
+        new XAttrEncodingParam(XAttrCodec.HEX)) {
+      @Override
+      byte[] decodeResponse(Map<?, ?> json) throws IOException {
+        return JsonUtil.getXAttr(json, name);
+      }
+    }.run();
+  }
+  
+  @Override
+  public Map<String, byte[]> getXAttrs(Path p) throws IOException {
+    final HttpOpParam.Op op = GetOpParam.Op.GETXATTRS;
+    return new FsPathResponseRunner<Map<String, byte[]>>(op, p, 
+        new XAttrEncodingParam(XAttrCodec.HEX)) {
+      @Override
+      Map<String, byte[]> decodeResponse(Map<?, ?> json) throws IOException {
+        return JsonUtil.toXAttrs(json);
+      }
+    }.run();
+  }
+  
+  @Override
+  public Map<String, byte[]> getXAttrs(Path p, final List<String> names) 
+      throws IOException {
+    Preconditions.checkArgument(names != null && !names.isEmpty(), 
+        "XAttr names cannot be null or empty.");
+    Param<?,?>[] parameters = new Param<?,?>[names.size() + 1];
+    for (int i = 0; i < parameters.length - 1; i++) {
+      parameters[i] = new XAttrNameParam(names.get(i));
+    }
+    parameters[parameters.length - 1] = new XAttrEncodingParam(XAttrCodec.HEX);
+    
+    final HttpOpParam.Op op = GetOpParam.Op.GETXATTRS;
+    return new FsPathResponseRunner<Map<String, byte[]>>(op, parameters, p) {
+      @Override
+      Map<String, byte[]> decodeResponse(Map<?, ?> json) throws IOException {
+        return JsonUtil.toXAttrs(json);
+      }
+    }.run();
+  }
+  
+  @Override
+  public List<String> listXAttrs(Path p) throws IOException {
+    final HttpOpParam.Op op = GetOpParam.Op.LISTXATTRS;
+    return new FsPathResponseRunner<List<String>>(op, p) {
+      @Override
+      List<String> decodeResponse(Map<?, ?> json) throws IOException {
+        return JsonUtil.toXAttrNames(json);
+      }
+    }.run();
+  }
+
+  @Override
+  public void removeXAttr(Path p, String name) throws IOException {
+    statistics.incrementWriteOps(1);
+    final HttpOpParam.Op op = PutOpParam.Op.REMOVEXATTR;
+    new FsPathRunner(op, p, new XAttrNameParam(name)).run();
   }
 
   @Override
@@ -754,7 +995,9 @@ public class WebHdfsFileSystem extends F
 
     statistics.incrementWriteOps(1);
     final HttpOpParam.Op op = PutOpParam.Op.SETOWNER;
-    run(op, p, new OwnerParam(owner), new GroupParam(group));
+    new FsPathRunner(op, p,
+        new OwnerParam(owner), new GroupParam(group)
+    ).run();
   }
 
   @Override
@@ -762,7 +1005,7 @@ public class WebHdfsFileSystem extends F
       ) throws IOException {
     statistics.incrementWriteOps(1);
     final HttpOpParam.Op op = PutOpParam.Op.SETPERMISSION;
-    run(op, p, new PermissionParam(permission));
+    new FsPathRunner(op, p,new PermissionParam(permission)).run();
   }
 
   @Override
@@ -770,7 +1013,7 @@ public class WebHdfsFileSystem extends F
       throws IOException {
     statistics.incrementWriteOps(1);
     final HttpOpParam.Op op = PutOpParam.Op.MODIFYACLENTRIES;
-    run(op, path, new AclPermissionParam(aclSpec));
+    new FsPathRunner(op, path, new AclPermissionParam(aclSpec)).run();
   }
 
   @Override
@@ -778,21 +1021,21 @@ public class WebHdfsFileSystem extends F
       throws IOException {
     statistics.incrementWriteOps(1);
     final HttpOpParam.Op op = PutOpParam.Op.REMOVEACLENTRIES;
-    run(op, path, new AclPermissionParam(aclSpec));
+    new FsPathRunner(op, path, new AclPermissionParam(aclSpec)).run();
   }
 
   @Override
   public void removeDefaultAcl(Path path) throws IOException {
     statistics.incrementWriteOps(1);
     final HttpOpParam.Op op = PutOpParam.Op.REMOVEDEFAULTACL;
-    run(op, path);
+    new FsPathRunner(op, path).run();
   }
 
   @Override
   public void removeAcl(Path path) throws IOException {
     statistics.incrementWriteOps(1);
     final HttpOpParam.Op op = PutOpParam.Op.REMOVEACL;
-    run(op, path);
+    new FsPathRunner(op, path).run();
   }
 
   @Override
@@ -800,7 +1043,39 @@ public class WebHdfsFileSystem extends F
       throws IOException {
     statistics.incrementWriteOps(1);
     final HttpOpParam.Op op = PutOpParam.Op.SETACL;
-    run(op, p, new AclPermissionParam(aclSpec));
+    new FsPathRunner(op, p, new AclPermissionParam(aclSpec)).run();
+  }
+
+  @Override
+  public Path createSnapshot(final Path path, final String snapshotName) 
+      throws IOException {
+    statistics.incrementWriteOps(1);
+    final HttpOpParam.Op op = PutOpParam.Op.CREATESNAPSHOT;
+    Path spath = new FsPathResponseRunner<Path>(op, path,
+        new SnapshotNameParam(snapshotName)) {
+      @Override
+      Path decodeResponse(Map<?,?> json) {
+        return new Path((String) json.get(Path.class.getSimpleName()));
+      }
+    }.run();
+    return spath;
+  }
+
+  @Override
+  public void deleteSnapshot(final Path path, final String snapshotName)
+      throws IOException {
+    statistics.incrementWriteOps(1);
+    final HttpOpParam.Op op = DeleteOpParam.Op.DELETESNAPSHOT;
+    new FsPathRunner(op, path, new SnapshotNameParam(snapshotName)).run();
+  }
+
+  @Override
+  public void renameSnapshot(final Path path, final String snapshotOldName,
+      final String snapshotNewName) throws IOException {
+    statistics.incrementWriteOps(1);
+    final HttpOpParam.Op op = PutOpParam.Op.RENAMESNAPSHOT;
+    new FsPathRunner(op, path, new OldSnapshotNameParam(snapshotOldName),
+        new SnapshotNameParam(snapshotNewName)).run();
   }
 
   @Override
@@ -808,8 +1083,9 @@ public class WebHdfsFileSystem extends F
      ) throws IOException {
     statistics.incrementWriteOps(1);
     final HttpOpParam.Op op = PutOpParam.Op.SETREPLICATION;
-    final Map<?, ?> json = run(op, p, new ReplicationParam(replication));
-    return (Boolean)json.get("boolean");
+    return new FsPathBooleanRunner(op, p,
+        new ReplicationParam(replication)
+    ).run();
   }
 
   @Override
@@ -817,7 +1093,10 @@ public class WebHdfsFileSystem extends F
       ) throws IOException {
     statistics.incrementWriteOps(1);
     final HttpOpParam.Op op = PutOpParam.Op.SETTIMES;
-    run(op, p, new ModificationTimeParam(mtime), new AccessTimeParam(atime));
+    new FsPathRunner(op, p,
+        new ModificationTimeParam(mtime),
+        new AccessTimeParam(atime)
+    ).run();
   }
 
   @Override
@@ -832,32 +1111,11 @@ public class WebHdfsFileSystem extends F
         DFSConfigKeys.DFS_REPLICATION_DEFAULT);
   }
 
-  FSDataOutputStream write(final HttpOpParam.Op op,
-      final HttpURLConnection conn, final int bufferSize) throws IOException {
-    return new FSDataOutputStream(new BufferedOutputStream(
-        conn.getOutputStream(), bufferSize), statistics) {
-      @Override
-      public void close() throws IOException {
-        try {
-          super.close();
-        } finally {
-          try {
-            validateResponse(op, conn, true);
-          } finally {
-            conn.disconnect();
-          }
-        }
-      }
-    };
-  }
-
   @Override
   public void concat(final Path trg, final Path [] srcs) throws IOException {
     statistics.incrementWriteOps(1);
     final HttpOpParam.Op op = PostOpParam.Op.CONCAT;
-
-    ConcatSourcesParam param = new ConcatSourcesParam(srcs);
-    run(op, trg, param);
+    new FsPathRunner(op, trg, new ConcatSourcesParam(srcs)).run();
   }
 
   @Override
@@ -867,14 +1125,13 @@ public class WebHdfsFileSystem extends F
     statistics.incrementWriteOps(1);
 
     final HttpOpParam.Op op = PutOpParam.Op.CREATE;
-    return new FsPathRunner(op, f,
+    return new FsPathOutputStreamRunner(op, f, bufferSize,
         new PermissionParam(applyUMask(permission)),
         new OverwriteParam(overwrite),
         new BufferSizeParam(bufferSize),
         new ReplicationParam(replication),
-        new BlockSizeParam(blockSize))
-      .run()
-      .write(bufferSize);
+        new BlockSizeParam(blockSize)
+    ).run();
   }
 
   @Override
@@ -883,16 +1140,17 @@ public class WebHdfsFileSystem extends F
     statistics.incrementWriteOps(1);
 
     final HttpOpParam.Op op = PostOpParam.Op.APPEND;
-    return new FsPathRunner(op, f, new BufferSizeParam(bufferSize))
-      .run()
-      .write(bufferSize);
+    return new FsPathOutputStreamRunner(op, f, bufferSize,
+        new BufferSizeParam(bufferSize)
+    ).run();
   }
 
   @Override
   public boolean delete(Path f, boolean recursive) throws IOException {
     final HttpOpParam.Op op = DeleteOpParam.Op.DELETE;
-    final Map<?, ?> json = run(op, f, new RecursiveParam(recursive));
-    return (Boolean)json.get("boolean");
+    return new FsPathBooleanRunner(op, f,
+        new RecursiveParam(recursive)
+    ).run();
   }
 
   @Override
@@ -900,16 +1158,41 @@ public class WebHdfsFileSystem extends F
       ) throws IOException {
     statistics.incrementReadOps(1);
     final HttpOpParam.Op op = GetOpParam.Op.OPEN;
-    final URL url = toUrl(op, f, new BufferSizeParam(buffersize));
+    // use a runner so the open can recover from an invalid token
+    FsPathConnectionRunner runner =
+        new FsPathConnectionRunner(op, f, new BufferSizeParam(buffersize));
     return new FSDataInputStream(new OffsetUrlInputStream(
-        new OffsetUrlOpener(url), new OffsetUrlOpener(null)));
+        new UnresolvedUrlOpener(runner), new OffsetUrlOpener(null)));
   }
 
   @Override
-  public void close() throws IOException {
-    super.close();
-    synchronized (this) {
-      tokenAspect.removeRenewAction();
+  public synchronized void close() throws IOException {
+    try {
+      if (canRefreshDelegationToken && delegationToken != null) {
+        cancelDelegationToken(delegationToken);
+      }
+    } catch (IOException ioe) {
+      LOG.debug("Token cancel failed: "+ioe);
+    } finally {
+      super.close();
+    }
+  }
+
+  // use FsPathConnectionRunner to ensure retries for InvalidTokens
+  class UnresolvedUrlOpener extends ByteRangeInputStream.URLOpener {
+    private final FsPathConnectionRunner runner;
+    UnresolvedUrlOpener(FsPathConnectionRunner runner) {
+      super(null);
+      this.runner = runner;
+    }
+
+    @Override
+    protected HttpURLConnection connect(long offset, boolean resolved)
+        throws IOException {
+      assert offset == 0;
+      HttpURLConnection conn = runner.run();
+      setURL(conn.getURL());
+      return conn;
     }
   }
 
@@ -924,7 +1207,7 @@ public class WebHdfsFileSystem extends F
         final boolean resolved) throws IOException {
       final URL offsetUrl = offset == 0L? url
           : new URL(url + "&" + new OffsetParam(offset));
-      return new URLRunner(GetOpParam.Op.OPEN, offsetUrl, resolved).run().conn;
+      return new URLRunner(GetOpParam.Op.OPEN, offsetUrl, resolved).run();
     }  
   }
 
@@ -962,7 +1245,7 @@ public class WebHdfsFileSystem extends F
   }
 
   static class OffsetUrlInputStream extends ByteRangeInputStream {
-    OffsetUrlInputStream(OffsetUrlOpener o, OffsetUrlOpener r)
+    OffsetUrlInputStream(UnresolvedUrlOpener o, OffsetUrlOpener r)
         throws IOException {
       super(o, r);
     }
@@ -980,25 +1263,36 @@ public class WebHdfsFileSystem extends F
     statistics.incrementReadOps(1);
 
     final HttpOpParam.Op op = GetOpParam.Op.LISTSTATUS;
-    final Map<?, ?> json  = run(op, f);
-    final Map<?, ?> rootmap = (Map<?, ?>)json.get(FileStatus.class.getSimpleName() + "es");
-    final Object[] array = (Object[])rootmap.get(FileStatus.class.getSimpleName());
-
-    //convert FileStatus
-    final FileStatus[] statuses = new FileStatus[array.length];
-    for(int i = 0; i < array.length; i++) {
-      final Map<?, ?> m = (Map<?, ?>)array[i];
-      statuses[i] = makeQualified(JsonUtil.toFileStatus(m, false), f);
-    }
-    return statuses;
+    return new FsPathResponseRunner<FileStatus[]>(op, f) {
+      @Override
+      FileStatus[] decodeResponse(Map<?,?> json) {
+        final Map<?, ?> rootmap = (Map<?, ?>)json.get(FileStatus.class.getSimpleName() + "es");
+        final Object[] array = (Object[])rootmap.get(FileStatus.class.getSimpleName());
+
+        //convert FileStatus
+        final FileStatus[] statuses = new FileStatus[array.length];
+        for (int i = 0; i < array.length; i++) {
+          final Map<?, ?> m = (Map<?, ?>)array[i];
+          statuses[i] = makeQualified(JsonUtil.toFileStatus(m, false), f);
+        }
+        return statuses;
+      }
+    }.run();
   }
 
   @Override
   public Token<DelegationTokenIdentifier> getDelegationToken(
       final String renewer) throws IOException {
     final HttpOpParam.Op op = GetOpParam.Op.GETDELEGATIONTOKEN;
-    final Map<?, ?> m = run(op, null, new RenewerParam(renewer));
-    final Token<DelegationTokenIdentifier> token = JsonUtil.toDelegationToken(m);
+    Token<DelegationTokenIdentifier> token =
+        new FsPathResponseRunner<Token<DelegationTokenIdentifier>>(
+            op, null, new RenewerParam(renewer)) {
+      @Override
+      Token<DelegationTokenIdentifier> decodeResponse(Map<?,?> json)
+          throws IOException {
+        return JsonUtil.toDelegationToken(json);
+      }
+    }.run();
     token.setService(tokenServiceName);
     return token;
   }
@@ -1020,19 +1314,22 @@ public class WebHdfsFileSystem extends F
   public synchronized long renewDelegationToken(final Token<?> token
       ) throws IOException {
     final HttpOpParam.Op op = PutOpParam.Op.RENEWDELEGATIONTOKEN;
-    TokenArgumentParam dtargParam = new TokenArgumentParam(
-        token.encodeToUrlString());
-    final Map<?, ?> m = run(op, null, dtargParam);
-    return (Long) m.get("long");
+    return new FsPathResponseRunner<Long>(op, null,
+        new TokenArgumentParam(token.encodeToUrlString())) {
+      @Override
+      Long decodeResponse(Map<?,?> json) throws IOException {
+        return (Long) json.get("long");
+      }
+    }.run();
   }
 
   @Override
   public synchronized void cancelDelegationToken(final Token<?> token
       ) throws IOException {
     final HttpOpParam.Op op = PutOpParam.Op.CANCELDELEGATIONTOKEN;
-    TokenArgumentParam dtargParam = new TokenArgumentParam(
-        token.encodeToUrlString());
-    run(op, null, dtargParam);
+    new FsPathRunner(op, null,
+        new TokenArgumentParam(token.encodeToUrlString())
+    ).run();
   }
   
   @Override
@@ -1050,9 +1347,20 @@ public class WebHdfsFileSystem extends F
     statistics.incrementReadOps(1);
 
     final HttpOpParam.Op op = GetOpParam.Op.GET_BLOCK_LOCATIONS;
-    final Map<?, ?> m = run(op, p, new OffsetParam(offset),
-        new LengthParam(length));
-    return DFSUtil.locatedBlocks2Locations(JsonUtil.toLocatedBlocks(m));
+    return new FsPathResponseRunner<BlockLocation[]>(op, p,
+        new OffsetParam(offset), new LengthParam(length)) {
+      @Override
+      BlockLocation[] decodeResponse(Map<?,?> json) throws IOException {
+        return DFSUtil.locatedBlocks2Locations(
+            JsonUtil.toLocatedBlocks(json));
+      }
+    }.run();
+  }
+
+  @Override
+  public void access(final Path path, final FsAction mode) throws IOException {
+    final HttpOpParam.Op op = GetOpParam.Op.CHECKACCESS;
+    new FsPathRunner(op, path, new FsActionParam(mode)).run();
   }
 
   @Override
@@ -1060,8 +1368,12 @@ public class WebHdfsFileSystem extends F
     statistics.incrementReadOps(1);
 
     final HttpOpParam.Op op = GetOpParam.Op.GETCONTENTSUMMARY;
-    final Map<?, ?> m = run(op, p);
-    return JsonUtil.toContentSummary(m);
+    return new FsPathResponseRunner<ContentSummary>(op, p) {
+      @Override
+      ContentSummary decodeResponse(Map<?,?> json) {
+        return JsonUtil.toContentSummary(json);        
+      }
+    }.run();
   }
 
   @Override
@@ -1070,15 +1382,19 @@ public class WebHdfsFileSystem extends F
     statistics.incrementReadOps(1);
   
     final HttpOpParam.Op op = GetOpParam.Op.GETFILECHECKSUM;
-    final Map<?, ?> m = run(op, p);
-    return JsonUtil.toMD5MD5CRC32FileChecksum(m);
+    return new FsPathResponseRunner<MD5MD5CRC32FileChecksum>(op, p) {
+      @Override
+      MD5MD5CRC32FileChecksum decodeResponse(Map<?,?> json) throws IOException {
+        return JsonUtil.toMD5MD5CRC32FileChecksum(json);
+      }
+    }.run();
   }
 
   /**
    * Resolve an HDFS URL into real INetSocketAddress. It works like a DNS
    * resolver when the URL points to an non-HA cluster. When the URL points to
-   * an HA cluster, the resolver further resolves the logical name (i.e., the
-   * authority in the URL) into real namenode addresses.
+   * an HA cluster with its logical name, the resolver further resolves the
+   * logical name(i.e., the authority in the URL) into real namenode addresses.
    */
   private InetSocketAddress[] resolveNNAddr() throws IOException {
     Configuration conf = getConf();
@@ -1095,10 +1411,10 @@ public class WebHdfsFileSystem extends F
       Map<String, Map<String, InetSocketAddress>> addresses = DFSUtil
           .getHaNnWebHdfsAddresses(conf, scheme);
 
-      for (Map<String, InetSocketAddress> addrs : addresses.values()) {
-        for (InetSocketAddress addr : addrs.values()) {
-          ret.add(addr);
-        }
+      // Extract the entry corresponding to the logical name.
+      Map<String, InetSocketAddress> addrs = addresses.get(uri.getHost());
+      for (InetSocketAddress addr : addrs.values()) {
+        ret.add(addr);
       }
     }
 
@@ -1111,4 +1427,9 @@ public class WebHdfsFileSystem extends F
     return tokenServiceName == null ? super.getCanonicalServiceName()
         : tokenServiceName.toString();
   }
+
+  @VisibleForTesting
+  InetSocketAddress[] getResolvedNNAddr() {
+    return nnAddrs;
+  }
 }

Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/AclPermissionParam.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/AclPermissionParam.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/AclPermissionParam.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/AclPermissionParam.java Tue Aug 19 23:49:39 2014
@@ -60,10 +60,7 @@ public class AclPermissionParam extends 
   }
 
   /**
-   * Parse the list of AclEntry and returns aclspec.
-   * 
-   * @param List <AclEntry>
-   * @return String
+   * @return parse {@code aclEntry} and return aclspec
    */
   private static String parseAclSpec(List<AclEntry> aclEntry) {
     return StringUtils.join(aclEntry, ",");

Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/DeleteOpParam.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/DeleteOpParam.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/DeleteOpParam.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/DeleteOpParam.java Tue Aug 19 23:49:39 2014
@@ -24,6 +24,7 @@ public class DeleteOpParam extends HttpO
   /** Delete operations. */
   public static enum Op implements HttpOpParam.Op {
     DELETE(HttpURLConnection.HTTP_OK),
+    DELETESNAPSHOT(HttpURLConnection.HTTP_OK),
 
     NULL(HttpURLConnection.HTTP_NOT_IMPLEMENTED);
 

Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/EnumSetParam.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/EnumSetParam.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/EnumSetParam.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/EnumSetParam.java Tue Aug 19 23:49:39 2014
@@ -79,8 +79,8 @@ abstract class EnumSetParam<E extends En
       final EnumSet<E> set = EnumSet.noneOf(enumClass);
       if (!str.isEmpty()) {
         for(int i, j = 0; j >= 0; ) {
-          i = j;
-          j = str.indexOf(',', i+1);
+          i = j > 0 ? j + 1 : 0;
+          j = str.indexOf(',', i);
           final String sub = j >= 0? str.substring(i, j): str.substring(i);
           set.add(Enum.valueOf(enumClass, sub.trim().toUpperCase()));
         }

Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/ExceptionHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/ExceptionHandler.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/ExceptionHandler.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/ExceptionHandler.java Tue Aug 19 23:49:39 2014
@@ -31,8 +31,11 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hdfs.web.JsonUtil;
 import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.ipc.StandbyException;
 import org.apache.hadoop.security.authorize.AuthorizationException;
+import org.apache.hadoop.security.token.SecretManager.InvalidToken;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.sun.jersey.api.ParamException;
 import com.sun.jersey.api.container.ContainerException;
 
@@ -42,9 +45,22 @@ public class ExceptionHandler implements
   public static final Log LOG = LogFactory.getLog(ExceptionHandler.class);
 
   private static Exception toCause(Exception e) {
-    final Throwable t = e.getCause();
-    if (t != null && t instanceof Exception) {
-      e = (Exception)e.getCause();
+    final Throwable t = e.getCause();    
+    if (e instanceof SecurityException) {
+      // For the issue reported in HDFS-6475, if SecurityException's cause
+      // is InvalidToken, and the InvalidToken's cause is StandbyException,
+      // return StandbyException; Otherwise, leave the exception as is,
+      // since they are handled elsewhere. See HDFS-6588.
+      if (t != null && t instanceof InvalidToken) {
+        final Throwable t1 = t.getCause();
+        if (t1 != null && t1 instanceof StandbyException) {
+          e = (StandbyException)t1;
+        }
+      }
+    } else {
+      if (t != null && t instanceof Exception) {
+        e = (Exception)t;
+      }
     }
     return e;
   }
@@ -74,6 +90,10 @@ public class ExceptionHandler implements
       e = ((RemoteException)e).unwrapRemoteException();
     }
 
+    if (e instanceof SecurityException) {
+      e = toCause(e);
+    }
+    
     //Map response status
     final Response.Status s;
     if (e instanceof SecurityException) {
@@ -96,4 +116,9 @@ public class ExceptionHandler implements
     final String js = JsonUtil.toJsonString(e);
     return Response.status(s).type(MediaType.APPLICATION_JSON).entity(js).build();
   }
+  
+  @VisibleForTesting
+  public void initResponse(HttpServletResponse response) {
+    this.response = response;
+  }
 }

Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/GetOpParam.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/GetOpParam.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/GetOpParam.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/GetOpParam.java Tue Aug 19 23:49:39 2014
@@ -36,8 +36,12 @@ public class GetOpParam extends HttpOpPa
     /** GET_BLOCK_LOCATIONS is a private unstable op. */
     GET_BLOCK_LOCATIONS(false, HttpURLConnection.HTTP_OK),
     GETACLSTATUS(false, HttpURLConnection.HTTP_OK),
+    GETXATTRS(false, HttpURLConnection.HTTP_OK),
+    LISTXATTRS(false, HttpURLConnection.HTTP_OK),
 
-    NULL(false, HttpURLConnection.HTTP_NOT_IMPLEMENTED);
+    NULL(false, HttpURLConnection.HTTP_NOT_IMPLEMENTED),
+
+    CHECKACCESS(false, HttpURLConnection.HTTP_OK);
 
     final boolean redirect;
     final int expectedHttpResponseCode;

Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/HttpOpParam.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/HttpOpParam.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/HttpOpParam.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/HttpOpParam.java Tue Aug 19 23:49:39 2014
@@ -71,8 +71,8 @@ public abstract class HttpOpParam<E exte
         GetOpParam.Op.GETFILECHECKSUM);
     
     static final List<TemporaryRedirectOp> values
-        = Collections.unmodifiableList(Arrays.asList(
-            new TemporaryRedirectOp[]{CREATE, APPEND, OPEN, GETFILECHECKSUM}));
+        = Collections.unmodifiableList(Arrays.asList(CREATE, APPEND, OPEN,
+                                       GETFILECHECKSUM));
 
     /** Get an object for the given op. */
     public static TemporaryRedirectOp valueOf(final Op op) {
@@ -102,7 +102,7 @@ public abstract class HttpOpParam<E exte
 
     @Override
     public boolean getDoOutput() {
-      return op.getDoOutput();
+      return false;
     }
 
     @Override

Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/PutOpParam.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/PutOpParam.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/PutOpParam.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/PutOpParam.java Tue Aug 19 23:49:39 2014
@@ -43,6 +43,12 @@ public class PutOpParam extends HttpOpPa
     REMOVEACL(false, HttpURLConnection.HTTP_OK),
     SETACL(false, HttpURLConnection.HTTP_OK),
     
+    SETXATTR(false, HttpURLConnection.HTTP_OK), 
+    REMOVEXATTR(false, HttpURLConnection.HTTP_OK),
+
+    CREATESNAPSHOT(false, HttpURLConnection.HTTP_OK),
+    RENAMESNAPSHOT(false, HttpURLConnection.HTTP_OK),
+    
     NULL(false, HttpURLConnection.HTTP_NOT_IMPLEMENTED);
 
     final boolean doOutputAndRedirect;

Propchange: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/native/
------------------------------------------------------------------------------
  Merged /hadoop/common/branches/HDFS-2006/hadoop-hdfs-project/hadoop-hdfs/src/main/native:r1588992-1596568
  Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native:r1582150-1619000

Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/CMakeLists.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/CMakeLists.txt?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/CMakeLists.txt (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/CMakeLists.txt Tue Aug 19 23:49:39 2014
@@ -37,6 +37,10 @@ ELSE (${CMAKE_SYSTEM_NAME} MATCHES "Linu
 ENDIF (${CMAKE_SYSTEM_NAME} MATCHES "Linux")
 
 IF(FUSE_FOUND)
+    add_library(posix_util
+        ../util/posix_util.c
+    )
+
     add_executable(fuse_dfs
         fuse_dfs.c
         fuse_options.c 

Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_trash.c
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_trash.c?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_trash.c (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_trash.c Tue Aug 19 23:49:39 2014
@@ -16,111 +16,228 @@
  * limitations under the License.
  */
 
-
 #include <hdfs.h>
+#include <inttypes.h>
+#include <stdarg.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
 #include <strings.h>
 
+#include "fuse_context_handle.h"
 #include "fuse_dfs.h"
 #include "fuse_trash.h"
-#include "fuse_context_handle.h"
-
-
-const char *const TrashPrefixDir = "/user/root/.Trash";
-const char *const TrashDir = "/user/root/.Trash/Current";
+#include "fuse_users.h"
 
 #define TRASH_RENAME_TRIES  100
+#define ALREADY_IN_TRASH_ERR 9000
 
-//
-// NOTE: this function is a c implementation of org.apache.hadoop.fs.Trash.moveToTrash(Path path).
-//
-
-int move_to_trash(const char *item, hdfsFS userFS) {
-
-  // retrieve dfs specific data
-  dfs_context *dfs = (dfs_context*)fuse_get_context()->private_data;
-
-  // check params and the context var
-  assert(item);
-  assert(dfs);
-  assert('/' == *item);
-  assert(rindex(item,'/') >= 0);
-
-
-  char fname[4096]; // or last element of the directory path
-  char parent_dir[4096]; // the directory the fname resides in
-
-  if (strlen(item) > sizeof(fname) - strlen(TrashDir)) {
-    ERROR("Buffer too small to accomodate path of len %d", (int)strlen(item));
-    return -EIO;
+/**
+ * Split a path into a parent directory and a base path component.
+ *
+ * @param abs_path    The absolute path.
+ * @param pcomp       (out param) Will be set to the last path component.
+ *                        Malloced.
+ * @param parent_dir  (out param) Will be set to the parent directory.
+ *                        Malloced.
+ *
+ * @return            0 on success.
+ *                    On success, both *pcomp and *parent_dir will contain
+ *                    malloc'ed strings.
+ *                    EINVAL if the path wasn't absolute.
+ *                    EINVAL if there is no parent directory (i.e. abs_path=/)
+ *                    ENOMEM if we ran out of memory.
+ */
+static int get_parent_dir(const char *abs_path, char **pcomp,
+                          char **parent_dir)
+{
+  int ret;
+  char *pdir = NULL, *pc = NULL, *last_slash;
+
+  pdir = strdup(abs_path);
+  if (!pdir) {
+    ret = ENOMEM;
+    goto done;
+  }
+  last_slash = rindex(pdir, '/');
+  if (!last_slash) {
+    ERROR("get_parent_dir(%s): expected absolute path.\n", abs_path);
+    ret = EINVAL;
+    goto done;
+  }
+  if (last_slash[1] == '\0') {
+    *last_slash = '\0';
+    last_slash = rindex(pdir, '/');
+    if (!last_slash) {
+      ERROR("get_parent_dir(%s): there is no parent dir.\n", abs_path);
+      ret = EINVAL;
+      goto done;
+    }
   }
-
-  // separate the file name and the parent directory of the item to be deleted
-  {
-    int length_of_parent_dir = rindex(item, '/') - item ;
-    int length_of_fname = strlen(item) - length_of_parent_dir - 1; // the '/'
-
-    // note - the below strncpys should be safe from overflow because of the check on item's string length above.
-    strncpy(parent_dir, item, length_of_parent_dir);
-    parent_dir[length_of_parent_dir ] = 0;
-    strncpy(fname, item + length_of_parent_dir + 1, strlen(item));
-    fname[length_of_fname + 1] = 0;
+  pc = strdup(last_slash + 1);
+  if (!pc) {
+    ret = ENOMEM;
+    goto done;
+  }
+  *last_slash = '\0';
+  ret = 0;
+done:
+  if (ret) {
+    free(pdir);
+    free(pc);
+    return ret;
   }
+  *pcomp = pc;
+  *parent_dir = pdir;
+  return 0;
+}
 
-  // create the target trash directory
-  char trash_dir[4096];
-  if (snprintf(trash_dir, sizeof(trash_dir), "%s%s", TrashDir, parent_dir) 
-      >= sizeof trash_dir) {
-    ERROR("Move to trash error target not big enough for %s", item);
-    return -EIO;
+/**
+ * Get the base path to the trash.  This will depend on the user ID.
+ * For example, a user whose ID maps to 'foo' will get back the path
+ * "/user/foo/.Trash/Current".
+ *
+ * @param trash_base       (out param) the base path to the trash.
+ *                             Malloced.
+ *
+ * @return                 0 on success; error code otherwise.
+ */
+static int get_trash_base(char **trash_base)
+{
+  const char * const PREFIX = "/user/";
+  const char * const SUFFIX = "/.Trash/Current";
+  char *user_name = NULL, *base = NULL;
+  uid_t uid = fuse_get_context()->uid;
+  int ret;
+
+  user_name = getUsername(uid);
+  if (!user_name) {
+    ERROR("get_trash_base(): failed to get username for uid %"PRId64"\n",
+          (uint64_t)uid);
+    ret = EIO;
+    goto done;
+  }
+  if (asprintf(&base, "%s%s%s", PREFIX, user_name, SUFFIX) < 0) {
+    base = NULL;
+    ret = ENOMEM;
+    goto done;
+  }
+  ret = 0;
+done:
+  free(user_name);
+  if (ret) {
+    free(base);
+    return ret;
   }
+  *trash_base = base;
+  return 0;
+}
 
+//
+// NOTE: this function is a c implementation of org.apache.hadoop.fs.Trash.moveToTrash(Path path).
+//
+int move_to_trash(const char *abs_path, hdfsFS userFS)
+{
+  int ret;
+  char *pcomp = NULL, *parent_dir = NULL, *trash_base = NULL;
+  char *target_dir = NULL, *target = NULL;
+
+  ret = get_parent_dir(abs_path, &pcomp, &parent_dir);
+  if (ret) {
+    goto done;
+  }
+  ret = get_trash_base(&trash_base);
+  if (ret) {
+    goto done;
+  }
+  if (!strncmp(trash_base, abs_path, strlen(trash_base))) {
+    INFO("move_to_trash(%s): file is already in the trash; deleting.",
+         abs_path);
+    ret = ALREADY_IN_TRASH_ERR;
+    goto done;
+  }
+  fprintf(stderr, "trash_base='%s'\n", trash_base);
+  if (asprintf(&target_dir, "%s%s", trash_base, parent_dir) < 0) {
+    ret = ENOMEM;
+    target_dir = NULL;
+    goto done;
+  }
+  if (asprintf(&target, "%s/%s", target_dir, pcomp) < 0) {
+    ret = ENOMEM;
+    target = NULL;
+    goto done;
+  }
   // create the target trash directory in trash (if needed)
-  if ( hdfsExists(userFS, trash_dir)) {
+  if (hdfsExists(userFS, target_dir) != 0) {
     // make the directory to put it in in the Trash - NOTE
     // hdfsCreateDirectory also creates parents, so Current will be created if it does not exist.
-    if (hdfsCreateDirectory(userFS, trash_dir)) {
-      return -EIO;
+    if (hdfsCreateDirectory(userFS, target_dir)) {
+      ret = errno;
+      ERROR("move_to_trash(%s) error: hdfsCreateDirectory(%s) failed with error %d",
+            abs_path, target_dir, ret);
+      goto done;
     }
-  }
-
-  //
-  // if the target path in Trash already exists, then append with
-  // a number. Start from 1.
-  //
-  char target[4096];
-  int j ;
-  if ( snprintf(target, sizeof target,"%s/%s",trash_dir, fname) >= sizeof target) {
-    ERROR("Move to trash error target not big enough for %s", item);
-    return -EIO;
-  }
-
-  // NOTE: this loop differs from the java version by capping the #of tries
-  for (j = 1; ! hdfsExists(userFS, target) && j < TRASH_RENAME_TRIES ; j++) {
-    if (snprintf(target, sizeof target,"%s/%s.%d",trash_dir, fname, j) >= sizeof target) {
-      ERROR("Move to trash error target not big enough for %s", item);
-      return -EIO;
+  } else if (hdfsExists(userFS, target) == 0) {
+    // If there is already a file in the trash with this path, append a number.
+    int idx;
+    for (idx = 1; idx < TRASH_RENAME_TRIES; idx++) {
+      free(target);
+      if (asprintf(&target, "%s%s.%d", target_dir, pcomp, idx) < 0) {
+        target = NULL;
+        ret = ENOMEM;
+        goto done;
+      }
+      if (hdfsExists(userFS, target) != 0) {
+        break;
+      }
+    }
+    if (idx == TRASH_RENAME_TRIES) {
+      ERROR("move_to_trash(%s) error: there are already %d files in the trash "
+            "with this name.\n", abs_path, TRASH_RENAME_TRIES);
+      ret = EINVAL;
+      goto done;
     }
   }
-  if (hdfsRename(userFS, item, target)) {
-    ERROR("Trying to rename %s to %s", item, target);
-    return -EIO;
-  }
-  return 0;
-} 
-
-
-int hdfsDeleteWithTrash(hdfsFS userFS, const char *path, int useTrash) {
+  if (hdfsRename(userFS, abs_path, target)) {
+    ret = errno;
+    ERROR("move_to_trash(%s): failed to rename the file to %s: error %d",
+          abs_path, target, ret);
+    goto done;
+  }
+
+  ret = 0;
+done:
+  if ((ret != 0) && (ret != ALREADY_IN_TRASH_ERR)) {
+    ERROR("move_to_trash(%s) failed with error %d", abs_path, ret);
+  }
+  free(pcomp);
+  free(parent_dir);
+  free(trash_base);
+  free(target_dir);
+  free(target);
+  return ret;
+}
 
-  // move the file to the trash if this is enabled and its not actually in the trash.
-  if (useTrash && strncmp(path, TrashPrefixDir, strlen(TrashPrefixDir)) != 0) {
-    int ret= move_to_trash(path, userFS);
-    return ret;
+int hdfsDeleteWithTrash(hdfsFS userFS, const char *path, int useTrash)
+{
+  int tried_to_move_to_trash = 0;
+  if (useTrash) {
+    tried_to_move_to_trash = 1;
+    if (move_to_trash(path, userFS) == 0) {
+      return 0;
+    }
   }
-
   if (hdfsDelete(userFS, path, 1)) {
-    ERROR("Trying to delete the file %s", path);
-    return -EIO;
+    int err = errno;
+    if (err < 0) {
+      err = -err;
+    }
+    ERROR("hdfsDeleteWithTrash(%s): hdfsDelete failed: error %d.",
+          path, err);
+    return -err;
+  }
+  if (tried_to_move_to_trash) {
+    ERROR("hdfsDeleteWithTrash(%s): deleted the file instead.\n", path);
   }
-
   return 0;
 }

Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/exception.c
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/exception.c?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/exception.c (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/exception.c Tue Aug 19 23:49:39 2014
@@ -19,8 +19,8 @@
 #include "exception.h"
 #include "hdfs.h"
 #include "jni_helper.h"
+#include "platform.h"
 
-#include <inttypes.h>
 #include <stdio.h>
 #include <stdlib.h>
 #include <string.h>
@@ -35,54 +35,54 @@ struct ExceptionInfo {
 
 static const struct ExceptionInfo gExceptionInfo[] = {
     {
-        .name = "java.io.FileNotFoundException",
-        .noPrintFlag = NOPRINT_EXC_FILE_NOT_FOUND,
-        .excErrno = ENOENT,
+        "java.io.FileNotFoundException",
+        NOPRINT_EXC_FILE_NOT_FOUND,
+        ENOENT,
     },
     {
-        .name = "org.apache.hadoop.security.AccessControlException",
-        .noPrintFlag = NOPRINT_EXC_ACCESS_CONTROL,
-        .excErrno = EACCES,
+        "org.apache.hadoop.security.AccessControlException",
+        NOPRINT_EXC_ACCESS_CONTROL,
+        EACCES,
     },
     {
-        .name = "org.apache.hadoop.fs.UnresolvedLinkException",
-        .noPrintFlag = NOPRINT_EXC_UNRESOLVED_LINK,
-        .excErrno = ENOLINK,
+        "org.apache.hadoop.fs.UnresolvedLinkException",
+        NOPRINT_EXC_UNRESOLVED_LINK,
+        ENOLINK,
     },
     {
-        .name = "org.apache.hadoop.fs.ParentNotDirectoryException",
-        .noPrintFlag = NOPRINT_EXC_PARENT_NOT_DIRECTORY,
-        .excErrno = ENOTDIR,
+        "org.apache.hadoop.fs.ParentNotDirectoryException",
+        NOPRINT_EXC_PARENT_NOT_DIRECTORY,
+        ENOTDIR,
     },
     {
-        .name = "java.lang.IllegalArgumentException",
-        .noPrintFlag = NOPRINT_EXC_ILLEGAL_ARGUMENT,
-        .excErrno = EINVAL,
+        "java.lang.IllegalArgumentException",
+        NOPRINT_EXC_ILLEGAL_ARGUMENT,
+        EINVAL,
     },
     {
-        .name = "java.lang.OutOfMemoryError",
-        .noPrintFlag = 0,
-        .excErrno = ENOMEM,
+        "java.lang.OutOfMemoryError",
+        0,
+        ENOMEM,
     },
     {
-        .name = "org.apache.hadoop.hdfs.server.namenode.SafeModeException",
-        .noPrintFlag = 0,
-        .excErrno = EROFS,
+        "org.apache.hadoop.hdfs.server.namenode.SafeModeException",
+        0,
+        EROFS,
     },
     {
-        .name = "org.apache.hadoop.fs.FileAlreadyExistsException",
-        .noPrintFlag = 0,
-        .excErrno = EEXIST,
+        "org.apache.hadoop.fs.FileAlreadyExistsException",
+        0,
+        EEXIST,
     },
     {
-        .name = "org.apache.hadoop.hdfs.protocol.QuotaExceededException",
-        .noPrintFlag = 0,
-        .excErrno = EDQUOT,
+        "org.apache.hadoop.hdfs.protocol.QuotaExceededException",
+        0,
+        EDQUOT,
     },
     {
-        .name = "org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException",
-        .noPrintFlag = 0,
-        .excErrno = ESTALE,
+        "org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException",
+        0,
+        ESTALE,
     },
 };
 
@@ -113,6 +113,7 @@ int printExceptionAndFreeV(JNIEnv *env, 
     jstring jStr = NULL;
     jvalue jVal;
     jthrowable jthr;
+    const char *stackTrace;
 
     jthr = classNameOfObject(exc, env, &className);
     if (jthr) {
@@ -148,7 +149,7 @@ int printExceptionAndFreeV(JNIEnv *env, 
             destroyLocalReference(env, jthr);
         } else {
             jStr = jVal.l;
-            const char *stackTrace = (*env)->GetStringUTFChars(env, jStr, NULL);
+            stackTrace = (*env)->GetStringUTFChars(env, jStr, NULL);
             if (!stackTrace) {
                 fprintf(stderr, "(unable to get stack trace for %s exception: "
                         "GetStringUTFChars error.)\n", className);

Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/exception.h
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/exception.h?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/exception.h (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/exception.h Tue Aug 19 23:49:39 2014
@@ -34,13 +34,14 @@
  * usually not what you want.)
  */
 
+#include "platform.h"
+
 #include <jni.h>
 #include <stdio.h>
 
 #include <stdlib.h>
 #include <stdarg.h>
 #include <search.h>
-#include <pthread.h>
 #include <errno.h>
 
 /**
@@ -109,7 +110,7 @@ int printExceptionAndFreeV(JNIEnv *env, 
  *                        object.
  */
 int printExceptionAndFree(JNIEnv *env, jthrowable exc, int noPrintFlags,
-        const char *fmt, ...) __attribute__((format(printf, 4, 5)));  
+        const char *fmt, ...) TYPE_CHECKED_PRINTF_FORMAT(4, 5);
 
 /**
  * Print out information about the pending exception and free it.
@@ -124,7 +125,7 @@ int printExceptionAndFree(JNIEnv *env, j
  *                        object.
  */
 int printPendingExceptionAndFree(JNIEnv *env, int noPrintFlags,
-        const char *fmt, ...) __attribute__((format(printf, 3, 4)));  
+        const char *fmt, ...) TYPE_CHECKED_PRINTF_FORMAT(3, 4);
 
 /**
  * Get a local reference to the pending exception and clear it.
@@ -150,6 +151,7 @@ jthrowable getPendingExceptionAndClear(J
  * @return                A local reference to a RuntimeError
  */
 jthrowable newRuntimeError(JNIEnv *env, const char *fmt, ...)
-        __attribute__((format(printf, 2, 3)));
+        TYPE_CHECKED_PRINTF_FORMAT(2, 3);
 
+#undef TYPE_CHECKED_PRINTF_FORMAT
 #endif

Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/expect.c
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/expect.c?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/expect.c (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/expect.c Tue Aug 19 23:49:39 2014
@@ -49,18 +49,18 @@ int expectFileStats(hdfsFile file,
             stats->totalShortCircuitBytesRead,
             stats->totalZeroCopyBytesRead);
     if (expectedTotalBytesRead != UINT64_MAX) {
-        EXPECT_INT64_EQ(expectedTotalBytesRead, stats->totalBytesRead);
+        EXPECT_UINT64_EQ(expectedTotalBytesRead, stats->totalBytesRead);
     }
     if (expectedTotalLocalBytesRead != UINT64_MAX) {
-        EXPECT_INT64_EQ(expectedTotalLocalBytesRead,
+        EXPECT_UINT64_EQ(expectedTotalLocalBytesRead,
                       stats->totalLocalBytesRead);
     }
     if (expectedTotalShortCircuitBytesRead != UINT64_MAX) {
-        EXPECT_INT64_EQ(expectedTotalShortCircuitBytesRead,
+        EXPECT_UINT64_EQ(expectedTotalShortCircuitBytesRead,
                       stats->totalShortCircuitBytesRead);
     }
     if (expectedTotalZeroCopyBytesRead != UINT64_MAX) {
-        EXPECT_INT64_EQ(expectedTotalZeroCopyBytesRead,
+        EXPECT_UINT64_EQ(expectedTotalZeroCopyBytesRead,
                       stats->totalZeroCopyBytesRead);
     }
     hdfsFileFreeReadStatistics(stats);

Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/expect.h
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/expect.h?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/expect.h (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/expect.h Tue Aug 19 23:49:39 2014
@@ -126,6 +126,18 @@ struct hdfsFile_internal;
         } \
     } while (0);
 
+#define EXPECT_UINT64_EQ(x, y) \
+    do { \
+        uint64_t __my_ret__ = y; \
+        int __my_errno__ = errno; \
+        if (__my_ret__ != (x)) { \
+            fprintf(stderr, "TEST_ERROR: failed on %s:%d with return " \
+              "value %"PRIu64" (errno: %d): expected %"PRIu64"\n", \
+               __FILE__, __LINE__, __my_ret__, __my_errno__, (x)); \
+            return -1; \
+        } \
+    } while (0);
+
 #define RETRY_ON_EINTR_GET_ERRNO(ret, expr) do { \
     ret = expr; \
     if (!ret) \