Posted to common-commits@hadoop.apache.org by wa...@apache.org on 2016/06/04 01:23:32 UTC

[1/8] hadoop git commit: HDFS-10224. Implement asynchronous rename for DistributedFileSystem. Contributed by Xiaobing Zhou

Repository: hadoop
Updated Branches:
  refs/heads/HDFS-9924 [created] 7b3653329


HDFS-10224. Implement asynchronous rename for DistributedFileSystem.  Contributed by Xiaobing Zhou

(cherry picked from commit fc94810d3f537e51e826fc21ade7867892b9d8dc)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/00c7e6db
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/00c7e6db
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/00c7e6db

Branch: refs/heads/HDFS-9924
Commit: 00c7e6dbca4f633fcb24b04b8e8a1103a6bfb52b
Parents: 106234d
Author: Tsz-Wo Nicholas Sze <sz...@hortonworks.com>
Authored: Tue Apr 26 17:10:13 2016 -0700
Committer: Andrew Wang <wa...@apache.org>
Committed: Fri Jun 3 18:14:38 2016 -0700

----------------------------------------------------------------------
 .../java/org/apache/hadoop/fs/FileSystem.java   |   1 -
 .../main/java/org/apache/hadoop/ipc/Client.java |  11 +-
 .../apache/hadoop/ipc/ProtobufRpcEngine.java    |  34 ++-
 .../org/apache/hadoop/ipc/TestAsyncIPC.java     |   2 +-
 .../hadoop/hdfs/AsyncDistributedFileSystem.java | 110 ++++++++
 .../hadoop/hdfs/DistributedFileSystem.java      |  21 +-
 .../ClientNamenodeProtocolTranslatorPB.java     |  45 +++-
 .../apache/hadoop/hdfs/TestAsyncDFSRename.java  | 258 +++++++++++++++++++
 8 files changed, 462 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
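
For orientation, a minimal sketch of how client code might drive the API this patch introduces; the Configuration setup, class name, and paths are illustrative, not part of the patch:

    import java.util.concurrent.Future;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Options.Rename;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hdfs.AsyncDistributedFileSystem;
    import org.apache.hadoop.hdfs.DistributedFileSystem;

    public class AsyncRenameSketch {
      public static void main(String[] args) throws Exception {
        // Assumes fs.defaultFS in the Configuration points at an HDFS cluster.
        Configuration conf = new Configuration();
        DistributedFileSystem dfs =
            (DistributedFileSystem) FileSystem.get(conf);
        AsyncDistributedFileSystem adfs =
            dfs.getAsyncDistributedFileSystem();

        // rename() returns immediately; Future#get() blocks until the
        // NameNode response has been received and deserialized.
        Future<Void> done =
            adfs.rename(new Path("/src"), new Path("/dst"), Rename.OVERWRITE);
        done.get();
      }
    }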


http://git-wip-us.apache.org/repos/asf/hadoop/blob/00c7e6db/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
index 0ecd8b7..9e13a7a 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
@@ -1252,7 +1252,6 @@ public abstract class FileSystem extends Configured implements Closeable {
   /**
    * Renames Path src to Path dst
    * <ul>
-   * <li
    * <li>Fails if src is a file and dst is a directory.
    * <li>Fails if src is a directory and dst is a file.
    * <li>Fails if the parent of dst does not exist or is a file.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/00c7e6db/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
index f206861..d59aeb89 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
@@ -119,7 +119,8 @@ public class Client implements AutoCloseable {
 
   private static final ThreadLocal<Integer> callId = new ThreadLocal<Integer>();
   private static final ThreadLocal<Integer> retryCount = new ThreadLocal<Integer>();
-  private static final ThreadLocal<Future<?>> returnValue = new ThreadLocal<>();
+  private static final ThreadLocal<Future<?>>
+      RETURN_RPC_RESPONSE = new ThreadLocal<>();
   private static final ThreadLocal<Boolean> asynchronousMode =
       new ThreadLocal<Boolean>() {
         @Override
@@ -130,8 +131,8 @@ public class Client implements AutoCloseable {
 
   @SuppressWarnings("unchecked")
   @Unstable
-  public static <T> Future<T> getReturnValue() {
-    return (Future<T>) returnValue.get();
+  public static <T> Future<T> getReturnRpcResponse() {
+    return (Future<T>) RETURN_RPC_RESPONSE.get();
   }
 
   /** Set call id and retry count for the next call. */
@@ -1396,7 +1397,7 @@ public class Client implements AutoCloseable {
         }
       };
 
-      returnValue.set(returnFuture);
+      RETURN_RPC_RESPONSE.set(returnFuture);
       return null;
     } else {
       return getRpcResponse(call, connection);
@@ -1410,7 +1411,7 @@ public class Client implements AutoCloseable {
    *          synchronous mode.
    */
   @Unstable
-  static boolean isAsynchronousMode() {
+  public static boolean isAsynchronousMode() {
     return asynchronousMode.get();
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/00c7e6db/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
index 071e2e8..8fcdb78 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
@@ -26,7 +26,9 @@ import java.lang.reflect.Method;
 import java.lang.reflect.Proxy;
 import java.net.InetSocketAddress;
 import java.util.Map;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import javax.net.SocketFactory;
@@ -35,6 +37,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.DataOutputOutputStream;
 import org.apache.hadoop.io.Writable;
@@ -67,7 +70,9 @@ import com.google.protobuf.TextFormat;
 @InterfaceStability.Evolving
 public class ProtobufRpcEngine implements RpcEngine {
   public static final Log LOG = LogFactory.getLog(ProtobufRpcEngine.class);
-  
+  private static final ThreadLocal<Callable<?>>
+      RETURN_MESSAGE_CALLBACK = new ThreadLocal<>();
+
   static { // Register the rpcRequest deserializer for WritableRpcEngine 
     org.apache.hadoop.ipc.Server.registerProtocolEngine(
         RPC.RpcKind.RPC_PROTOCOL_BUFFER, RpcRequestWrapper.class,
@@ -76,6 +81,12 @@ public class ProtobufRpcEngine implements RpcEngine {
 
   private static final ClientCache CLIENTS = new ClientCache();
 
+  @SuppressWarnings("unchecked")
+  @Unstable
+  public static <T> Callable<T> getReturnMessageCallback() {
+    return (Callable<T>) RETURN_MESSAGE_CALLBACK.get();
+  }
+
   public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
       InetSocketAddress addr, UserGroupInformation ticket, Configuration conf,
       SocketFactory factory, int rpcTimeout) throws IOException {
@@ -189,7 +200,7 @@ public class ProtobufRpcEngine implements RpcEngine {
      * the server.
      */
     @Override
-    public Object invoke(Object proxy, Method method, Object[] args)
+    public Object invoke(Object proxy, final Method method, Object[] args)
         throws ServiceException {
       long startTime = 0;
       if (LOG.isDebugEnabled()) {
@@ -251,6 +262,23 @@ public class ProtobufRpcEngine implements RpcEngine {
         LOG.debug("Call: " + method.getName() + " took " + callTime + "ms");
       }
       
+      if (Client.isAsynchronousMode()) {
+        final Future<RpcResponseWrapper> frrw = Client.getReturnRpcResponse();
+        Callable<Message> callback = new Callable<Message>() {
+          @Override
+          public Message call() throws Exception {
+            return getReturnMessage(method, frrw.get());
+          }
+        };
+        RETURN_MESSAGE_CALLBACK.set(callback);
+        return null;
+      } else {
+        return getReturnMessage(method, val);
+      }
+    }
+
+    private Message getReturnMessage(final Method method,
+        final RpcResponseWrapper rrw) throws ServiceException {
       Message prototype = null;
       try {
         prototype = getReturnProtoType(method);
@@ -260,7 +288,7 @@ public class ProtobufRpcEngine implements RpcEngine {
       Message returnMessage;
       try {
         returnMessage = prototype.newBuilderForType()
-            .mergeFrom(val.theResponseRead).build();
+            .mergeFrom(rrw.theResponseRead).build();
 
         if (LOG.isTraceEnabled()) {
           LOG.trace(Thread.currentThread().getId() + ": Response <- " +
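
The changes above hand the deferred response to the caller through a ThreadLocal rather than a return value, which only works because the RPC invocation and the pick-up happen on the same thread (this is also why Client's asynchronousMode flag is thread-local). A stripped-down sketch of that handoff pattern, with illustrative names that are not from the patch:

    import java.util.concurrent.Callable;

    class DeferredResultHolder {
      private static final ThreadLocal<Callable<?>> PENDING =
          new ThreadLocal<>();

      // Callee: instead of returning the result, park a callback that
      // will produce it, and return null through the normal path.
      static void defer(Callable<?> callback) {
        PENDING.set(callback);
      }

      // Caller: pick up the callback on the same thread, immediately
      // after the asynchronous invocation returns; clear it so a stale
      // callback cannot leak into a later call.
      @SuppressWarnings("unchecked")
      static <T> Callable<T> take() {
        Callable<T> c = (Callable<T>) PENDING.get();
        PENDING.remove();
        return c;
      }
    }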

http://git-wip-us.apache.org/repos/asf/hadoop/blob/00c7e6db/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java
index de4395e..6cf75c7 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java
@@ -84,7 +84,7 @@ public class TestAsyncIPC {
         try {
           final long param = TestIPC.RANDOM.nextLong();
           TestIPC.call(client, param, server, conf);
-          Future<LongWritable> returnFuture = Client.getReturnValue();
+          Future<LongWritable> returnFuture = Client.getReturnRpcResponse();
           returnFutures.put(i, returnFuture);
           expectedValues.put(i, param);
         } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/00c7e6db/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java
new file mode 100644
index 0000000..37899aa
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs;
+
+import java.io.IOException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.fs.Options;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB;
+import org.apache.hadoop.ipc.Client;
+
+import com.google.common.util.concurrent.AbstractFuture;
+
+/****************************************************************
+ * Implementation of the asynchronous distributed file system.
+ * An instance of this class is the way end-user code interacts
+ * with a Hadoop DistributedFileSystem in an asynchronous manner.
+ *
+ *****************************************************************/
+@Unstable
+public class AsyncDistributedFileSystem {
+
+  private final DistributedFileSystem dfs;
+
+  AsyncDistributedFileSystem(final DistributedFileSystem dfs) {
+    this.dfs = dfs;
+  }
+
+  static <T> Future<T> getReturnValue() {
+    final Callable<T> returnValueCallback = ClientNamenodeProtocolTranslatorPB
+        .getReturnValueCallback();
+    Future<T> returnFuture = new AbstractFuture<T>() {
+      public T get() throws InterruptedException, ExecutionException {
+        try {
+          set(returnValueCallback.call());
+        } catch (Exception e) {
+          setException(e);
+        }
+        return super.get();
+      }
+    };
+    return returnFuture;
+  }
+
+  /**
+   * Renames Path src to Path dst
+   * <ul>
+   * <li>Fails if src is a file and dst is a directory.
+   * <li>Fails if src is a directory and dst is a file.
+   * <li>Fails if the parent of dst does not exist or is a file.
+   * </ul>
+   * <p>
+   * If OVERWRITE option is not passed as an argument, rename fails if the dst
+   * already exists.
+   * <p>
+   * If OVERWRITE option is passed as an argument, rename overwrites the dst if
+   * it is a file or an empty directory. Rename fails if dst is a non-empty
+   * directory.
+   * <p>
+   * Note that atomicity of rename is dependent on the file system
+   * implementation. Please refer to the file system documentation for details.
+   * This default implementation is non-atomic.
+   *
+   * @param src
+   *          path to be renamed
+   * @param dst
+   *          new path after rename
+   * @throws IOException
+   *           on failure
+   * @return an instance of Future, whose #get is invoked to wait for the
+   *         asynchronous call to finish.
+   */
+  public Future<Void> rename(Path src, Path dst,
+      final Options.Rename... options) throws IOException {
+    dfs.getFsStatistics().incrementWriteOps(1);
+
+    final Path absSrc = dfs.fixRelativePart(src);
+    final Path absDst = dfs.fixRelativePart(dst);
+
+    final boolean isAsync = Client.isAsynchronousMode();
+    Client.setAsynchronousMode(true);
+    try {
+      dfs.getClient().rename(dfs.getPathName(absSrc), dfs.getPathName(absDst),
+          options);
+      return getReturnValue();
+    } finally {
+      Client.setAsynchronousMode(isAsync);
+    }
+  }
+}
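
A minimal usage sketch for the class above, assuming an already-configured DistributedFileSystem; the paths and loop bound are invented for illustration:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.Future;

    import org.apache.hadoop.fs.Options.Rename;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hdfs.AsyncDistributedFileSystem;

    class BatchRenameSketch {
      // Issue several renames back to back, then block for the results.
      static void renameAll(AsyncDistributedFileSystem adfs, int n)
          throws Exception {
        List<Future<Void>> futures = new ArrayList<>();
        for (int i = 0; i < n; i++) {
          futures.add(adfs.rename(new Path("/src" + i),
              new Path("/dst" + i), Rename.OVERWRITE));
        }
        for (Future<Void> f : futures) {
          f.get(); // a failed rename surfaces here as ExecutionException
        }
      }
    }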

http://git-wip-us.apache.org/repos/asf/hadoop/blob/00c7e6db/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
index 5e54edd..5a0697a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
@@ -31,6 +31,7 @@ import java.util.Map;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.BlockStoragePolicySpi;
@@ -204,7 +205,7 @@ public class DistributedFileSystem extends FileSystem {
    * @return path component of {file}
    * @throws IllegalArgumentException if URI does not belong to this DFS
    */
-  private String getPathName(Path file) {
+  String getPathName(Path file) {
     checkPath(file);
     String result = file.toUri().getPath();
     if (!DFSUtilClient.isValidName(result)) {
@@ -2509,4 +2510,22 @@ public class DistributedFileSystem extends FileSystem {
     }
     return ret;
   }
+
+  private final AsyncDistributedFileSystem adfs =
+      new AsyncDistributedFileSystem(this);
+
+  /** @return an {@link AsyncDistributedFileSystem} object. */
+  @Unstable
+  public AsyncDistributedFileSystem getAsyncDistributedFileSystem() {
+    return adfs;
+  }
+
+  @Override
+  protected Path fixRelativePart(Path p) {
+    return super.fixRelativePart(p);
+  }
+
+  Statistics getFsStatistics() {
+    return statistics;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/00c7e6db/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
index 513a5e3..f4074b6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
@@ -24,11 +24,14 @@ import java.util.EnumSet;
 import java.util.List;
 
 import com.google.common.collect.Lists;
+import java.util.concurrent.Callable;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.crypto.CryptoProtocolVersion;
 import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedEntries;
+import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedListEntries;
 import org.apache.hadoop.fs.CacheFlag;
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.CreateFlag;
@@ -55,6 +58,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.DirectoryListing;
 import org.apache.hadoop.hdfs.protocol.EncryptionZone;
+import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.RollingUpgradeAction;
@@ -135,7 +139,6 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.Recove
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RefreshNodesRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RemoveCacheDirectiveRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RemoveCachePoolRequestProto;
-import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UnsetStoragePolicyRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.Rename2RequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RenameRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RenameSnapshotRequestProto;
@@ -153,13 +156,15 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetPer
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetQuotaRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetReplicationRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetSafeModeRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetStoragePolicyRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetTimesRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.TruncateRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UnsetStoragePolicyRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpdateBlockForPipelineRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpdatePipelineRequestProto;
-import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetStoragePolicyRequestProto;
-import org.apache.hadoop.hdfs.protocol.proto.*;
+import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos;
 import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.CreateEncryptionZoneRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.EncryptionZoneProto;
 import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.GetEZForPathRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.ListEncryptionZonesRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.GetErasureCodingPoliciesRequestProto;
@@ -177,8 +182,9 @@ import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifie
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
 import org.apache.hadoop.io.EnumSetWritable;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
+import org.apache.hadoop.ipc.Client;
 import org.apache.hadoop.ipc.ProtobufHelper;
+import org.apache.hadoop.ipc.ProtobufRpcEngine;
 import org.apache.hadoop.ipc.ProtocolMetaInterface;
 import org.apache.hadoop.ipc.ProtocolTranslator;
 import org.apache.hadoop.ipc.RPC;
@@ -190,12 +196,9 @@ import org.apache.hadoop.security.proto.SecurityProtos.RenewDelegationTokenReque
 import org.apache.hadoop.security.token.Token;
 
 import com.google.protobuf.ByteString;
+import com.google.protobuf.Message;
 import com.google.protobuf.ServiceException;
 
-import static org.apache.hadoop.fs.BatchedRemoteIterator.BatchedListEntries;
-import static org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos
-    .EncryptionZoneProto;
-
 /**
  * This class forwards NN's ClientProtocol calls as RPC calls to the NN server
  * while translating from the parameter types used in ClientProtocol to the
@@ -206,6 +209,8 @@ import static org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos
 public class ClientNamenodeProtocolTranslatorPB implements
     ProtocolMetaInterface, ClientProtocol, Closeable, ProtocolTranslator {
   final private ClientNamenodeProtocolPB rpcProxy;
+  private static final ThreadLocal<Callable<?>>
+      RETURN_VALUE_CALLBACK = new ThreadLocal<>();
 
   static final GetServerDefaultsRequestProto VOID_GET_SERVER_DEFAULT_REQUEST =
       GetServerDefaultsRequestProto.newBuilder().build();
@@ -239,6 +244,12 @@ public class ClientNamenodeProtocolTranslatorPB implements
     rpcProxy = proxy;
   }
 
+  @SuppressWarnings("unchecked")
+  @Unstable
+  public static <T> Callable<T> getReturnValueCallback() {
+    return (Callable<T>) RETURN_VALUE_CALLBACK.get();
+  }
+
   @Override
   public void close() {
     RPC.stopProxy(rpcProxy);
@@ -475,6 +486,7 @@ public class ClientNamenodeProtocolTranslatorPB implements
     RenameRequestProto req = RenameRequestProto.newBuilder()
         .setSrc(src)
         .setDst(dst).build();
+
     try {
       return rpcProxy.rename(null, req).getResult();
     } catch (ServiceException e) {
@@ -499,7 +511,22 @@ public class ClientNamenodeProtocolTranslatorPB implements
         setDst(dst).setOverwriteDest(overwrite).
         build();
     try {
-      rpcProxy.rename2(null, req);
+      if (Client.isAsynchronousMode()) {
+        rpcProxy.rename2(null, req);
+
+        final Callable<Message> returnMessageCallback = ProtobufRpcEngine
+            .getReturnMessageCallback();
+        Callable<Void> callBack = new Callable<Void>() {
+          @Override
+          public Void call() throws Exception {
+            returnMessageCallback.call();
+            return null;
+          }
+        };
+        RETURN_VALUE_CALLBACK.set(callBack);
+      } else {
+        rpcProxy.rename2(null, req);
+      }
     } catch (ServiceException e) {
       throw ProtobufHelper.getRemoteException(e);
     }
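
The async branch above wraps the engine-level Message callback into a caller-facing Callable; rename2 returns nothing the client needs, so the wrapper yields Void. For a value-returning call, the same wrapping would extract the result from the response proto, roughly like this hypothetical sketch modeled on the synchronous rename(), which reads getResult() from its RenameResponseProto (placement illustrative, not part of the patch):

    final Callable<Message> returnMessageCallback =
        ProtobufRpcEngine.getReturnMessageCallback();
    Callable<Boolean> resultCallback = new Callable<Boolean>() {
      @Override
      public Boolean call() throws Exception {
        // Cast to the call's response proto and extract the result
        // field, as the synchronous rename() does with getResult().
        return ((RenameResponseProto) returnMessageCallback.call())
            .getResult();
      }
    };
    RETURN_VALUE_CALLBACK.set(resultCallback);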

http://git-wip-us.apache.org/repos/asf/hadoop/blob/00c7e6db/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java
new file mode 100644
index 0000000..9322e1a
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java
@@ -0,0 +1,258 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.Options.Rename;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestAsyncDFSRename {
+  final Path asyncRenameDir = new Path("/test/async_rename/");
+  public static final Log LOG = LogFactory.getLog(TestAsyncDFSRename.class);
+  final private static Configuration CONF = new HdfsConfiguration();
+
+  final private static String GROUP1_NAME = "group1";
+  final private static String GROUP2_NAME = "group2";
+  final private static String USER1_NAME = "user1";
+  private static final UserGroupInformation USER1;
+
+  private MiniDFSCluster gCluster;
+
+  static {
+    // explicitly turn on permission checking
+    CONF.setBoolean(DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, true);
+
+    // create fake mapping for the groups
+    Map<String, String[]> u2g_map = new HashMap<String, String[]>(1);
+    u2g_map.put(USER1_NAME, new String[] { GROUP1_NAME, GROUP2_NAME });
+    DFSTestUtil.updateConfWithFakeGroupMapping(CONF, u2g_map);
+
+    // Initialize the test user
+    USER1 = UserGroupInformation.createUserForTesting(USER1_NAME, new String[] {
+        GROUP1_NAME, GROUP2_NAME });
+  }
+
+  @Before
+  public void setUp() throws IOException {
+    gCluster = new MiniDFSCluster.Builder(CONF).numDataNodes(3).build();
+    gCluster.waitActive();
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    if (gCluster != null) {
+      gCluster.shutdown();
+      gCluster = null;
+    }
+  }
+
+  static int countLease(MiniDFSCluster cluster) {
+    return TestDFSRename.countLease(cluster);
+  }
+
+  void list(DistributedFileSystem dfs, String name) throws IOException {
+    FileSystem.LOG.info("\n\n" + name);
+    for (FileStatus s : dfs.listStatus(asyncRenameDir)) {
+      FileSystem.LOG.info("" + s.getPath());
+    }
+  }
+
+  static void createFile(DistributedFileSystem dfs, Path f) throws IOException {
+    DataOutputStream a_out = dfs.create(f);
+    a_out.writeBytes("something");
+    a_out.close();
+  }
+
+  /**
+   * Check that the blocks of the dst file are cleaned up after a rename
+   * with overwrite. Restart the NN to verify that the rename succeeded.
+   */
+  @Test
+  public void testAsyncRenameWithOverwrite() throws Exception {
+    final short replFactor = 2;
+    final long blockSize = 512;
+    Configuration conf = new Configuration();
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(
+        replFactor).build();
+    cluster.waitActive();
+    DistributedFileSystem dfs = cluster.getFileSystem();
+    AsyncDistributedFileSystem adfs = dfs.getAsyncDistributedFileSystem();
+
+    try {
+
+      long fileLen = blockSize * 3;
+      String src = "/foo/src";
+      String dst = "/foo/dst";
+      String src2 = "/foo/src2";
+      String dst2 = "/foo/dst2";
+      Path srcPath = new Path(src);
+      Path dstPath = new Path(dst);
+      Path srcPath2 = new Path(src2);
+      Path dstPath2 = new Path(dst2);
+
+      DFSTestUtil.createFile(dfs, srcPath, fileLen, replFactor, 1);
+      DFSTestUtil.createFile(dfs, dstPath, fileLen, replFactor, 1);
+      DFSTestUtil.createFile(dfs, srcPath2, fileLen, replFactor, 1);
+      DFSTestUtil.createFile(dfs, dstPath2, fileLen, replFactor, 1);
+
+      LocatedBlocks lbs = NameNodeAdapter.getBlockLocations(
+          cluster.getNameNode(), dst, 0, fileLen);
+      LocatedBlocks lbs2 = NameNodeAdapter.getBlockLocations(
+          cluster.getNameNode(), dst2, 0, fileLen);
+      BlockManager bm = NameNodeAdapter.getNamesystem(cluster.getNameNode())
+          .getBlockManager();
+      assertTrue(bm.getStoredBlock(lbs.getLocatedBlocks().get(0).getBlock()
+          .getLocalBlock()) != null);
+      assertTrue(bm.getStoredBlock(lbs2.getLocatedBlocks().get(0).getBlock()
+          .getLocalBlock()) != null);
+
+      Future<Void> retVal1 = adfs.rename(srcPath, dstPath, Rename.OVERWRITE);
+      Future<Void> retVal2 = adfs.rename(srcPath2, dstPath2, Rename.OVERWRITE);
+      retVal1.get();
+      retVal2.get();
+
+      assertTrue(bm.getStoredBlock(lbs.getLocatedBlocks().get(0).getBlock()
+          .getLocalBlock()) == null);
+      assertTrue(bm.getStoredBlock(lbs2.getLocatedBlocks().get(0).getBlock()
+          .getLocalBlock()) == null);
+
+      // Restart the NN and verify the rename succeeded
+      cluster.restartNameNodes();
+      assertFalse(dfs.exists(srcPath));
+      assertTrue(dfs.exists(dstPath));
+      assertFalse(dfs.exists(srcPath2));
+      assertTrue(dfs.exists(dstPath2));
+    } finally {
+      if (dfs != null) {
+        dfs.close();
+      }
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+
+  @Test
+  public void testConcurrentAsyncRenameWithOverwrite() throws Exception {
+    final short replFactor = 2;
+    final long blockSize = 512;
+    final Path renameDir = new Path(
+        "/test/concurrent_reanme_with_overwrite_dir/");
+    Configuration conf = new HdfsConfiguration();
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2)
+        .build();
+    cluster.waitActive();
+    DistributedFileSystem dfs = cluster.getFileSystem();
+    AsyncDistributedFileSystem adfs = dfs.getAsyncDistributedFileSystem();
+    int count = 1000;
+
+    try {
+      long fileLen = blockSize * 3;
+      assertTrue(dfs.mkdirs(renameDir));
+
+      Map<Integer, Future<Void>> returnFutures = new HashMap<Integer, Future<Void>>();
+
+      // concurrently invoke many renames
+      for (int i = 0; i < count; i++) {
+        Path src = new Path(renameDir, "src" + i);
+        Path dst = new Path(renameDir, "dst" + i);
+        DFSTestUtil.createFile(dfs, src, fileLen, replFactor, 1);
+        DFSTestUtil.createFile(dfs, dst, fileLen, replFactor, 1);
+        Future<Void> returnFuture = adfs.rename(src, dst, Rename.OVERWRITE);
+        returnFutures.put(i, returnFuture);
+      }
+
+      // wait for all the calls to complete
+      for (int i = 0; i < count; i++) {
+        returnFutures.get(i).get();
+      }
+
+      // Restart the NN and verify the renames succeeded
+      cluster.restartNameNodes();
+
+      // verify that each src no longer exists and each dst does
+      for (int i = 0; i < count; i++) {
+        Path src = new Path(renameDir, "src" + i);
+        Path dst = new Path(renameDir, "dst" + i);
+        assertFalse(dfs.exists(src));
+        assertTrue(dfs.exists(dst));
+      }
+    } finally {
+      dfs.delete(renameDir, true);
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+
+  @Test
+  public void testAsyncRenameWithException() throws Exception {
+    FileSystem rootFs = FileSystem.get(CONF);
+    final Path renameDir = new Path("/test/async_rename_exception/");
+    final Path src = new Path(renameDir, "src");
+    final Path dst = new Path(renameDir, "dst");
+    rootFs.mkdirs(src);
+
+    AsyncDistributedFileSystem adfs = USER1
+        .doAs(new PrivilegedExceptionAction<AsyncDistributedFileSystem>() {
+          @Override
+          public AsyncDistributedFileSystem run() throws Exception {
+            return gCluster.getFileSystem().getAsyncDistributedFileSystem();
+          }
+        });
+
+    try {
+      Future<Void> returnFuture = adfs.rename(src, dst, Rename.OVERWRITE);
+      returnFuture.get();
+    } catch (ExecutionException e) {
+      checkPermissionDenied(e, src);
+    }
+  }
+
+  private void checkPermissionDenied(final Exception e, final Path dir) {
+    assertTrue(e.getCause() instanceof ExecutionException);
+    assertTrue("Permission denied messages must carry AccessControlException",
+        e.getMessage().contains("AccessControlException"));
+    assertTrue("Permission denied messages must carry the username", e
+        .getMessage().contains(USER1_NAME));
+    assertTrue("Permission denied messages must carry the path parent", e
+        .getMessage().contains(dir.getParent().toUri().getPath()));
+  }
+}
\ No newline at end of file




[7/8] hadoop git commit: HDFS-10430. Reuse FileSystem#access in TestAsyncDFS. Contributed by Xiaobing Zhou.

Posted by wa...@apache.org.
HDFS-10430. Reuse FileSystem#access in TestAsyncDFS. Contributed by Xiaobing Zhou.

(cherry picked from commit 21890c4239b6a82fd6aab3454ce3922226efe7b5)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/a1abed35
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/a1abed35
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/a1abed35

Branch: refs/heads/HDFS-9924
Commit: a1abed352c97b092874292547d39a433f7924616
Parents: 7591d5f
Author: Chris Nauroth <cn...@apache.org>
Authored: Fri May 27 14:20:18 2016 -0700
Committer: Andrew Wang <wa...@apache.org>
Committed: Fri Jun 3 18:15:09 2016 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hdfs/TestAsyncDFS.java    | 36 +-------------------
 1 file changed, 1 insertion(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/a1abed35/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFS.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFS.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFS.java
index ddcf492..c7615a9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFS.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFS.java
@@ -34,7 +34,6 @@ import static org.junit.Assert.fail;
 
 import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
-import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -46,19 +45,16 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.Options.Rename;
 import org.apache.hadoop.fs.permission.AclEntry;
 import org.apache.hadoop.fs.permission.AclStatus;
-import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.TestDFSPermission.PermissionGenerator;
 import org.apache.hadoop.hdfs.server.namenode.AclTestHelpers;
 import org.apache.hadoop.hdfs.server.namenode.FSAclBaseTest;
 import org.apache.hadoop.ipc.AsyncCallLimitExceededException;
-import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.Time;
 import org.junit.After;
@@ -445,7 +441,7 @@ public class TestAsyncDFS {
     for (int i = 0; i < NUM_TESTS; i++) {
       assertTrue(fs.exists(dsts[i]));
       FsPermission fsPerm = new FsPermission(permissions[i]);
-      checkAccessPermissions(fs.getFileStatus(dsts[i]), fsPerm.getUserAction());
+      fs.access(dsts[i], fsPerm.getUserAction());
     }
 
     // test setOwner
@@ -474,34 +470,4 @@ public class TestAsyncDFS {
       assertTrue("group2".equals(fs.getFileStatus(dsts[i]).getGroup()));
     }
   }
-
-  static void checkAccessPermissions(FileStatus stat, FsAction mode)
-      throws IOException {
-    checkAccessPermissions(UserGroupInformation.getCurrentUser(), stat, mode);
-  }
-
-  static void checkAccessPermissions(final UserGroupInformation ugi,
-      FileStatus stat, FsAction mode) throws IOException {
-    FsPermission perm = stat.getPermission();
-    String user = ugi.getShortUserName();
-    List<String> groups = Arrays.asList(ugi.getGroupNames());
-
-    if (user.equals(stat.getOwner())) {
-      if (perm.getUserAction().implies(mode)) {
-        return;
-      }
-    } else if (groups.contains(stat.getGroup())) {
-      if (perm.getGroupAction().implies(mode)) {
-        return;
-      }
-    } else {
-      if (perm.getOtherAction().implies(mode)) {
-        return;
-      }
-    }
-    throw new AccessControlException(String.format(
-        "Permission denied: user=%s, path=\"%s\":%s:%s:%s%s", user, stat
-            .getPath(), stat.getOwner(), stat.getGroup(),
-        stat.isDirectory() ? "d" : "-", perm));
-  }
 }
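
For reference, FileSystem#access asks the NameNode to perform the permission check and signals the result by exception rather than by return value, which is what makes the replacement above a one-liner. A minimal sketch, with an invented helper name:

    import java.io.IOException;

    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.permission.FsAction;

    class AccessCheckSketch {
      // access() returns normally only when the check passes: it throws
      // AccessControlException when the caller lacks the permission and
      // FileNotFoundException when the path does not exist.
      static void requireRead(FileSystem fs, Path p) throws IOException {
        fs.access(p, FsAction.READ);
      }
    }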




[5/8] hadoop git commit: HDFS-10390. Implement asynchronous setAcl/getAclStatus for DistributedFileSystem. Contributed by Xiaobing Zhou

Posted by wa...@apache.org.
HDFS-10390. Implement asynchronous setAcl/getAclStatus for DistributedFileSystem.  Contributed by Xiaobing Zhou

(cherry picked from commit 02d4e478a398c24a5e5e8ea2b0822a5b9d4a97ae)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/e00909f4
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/e00909f4
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/e00909f4

Branch: refs/heads/HDFS-9924
Commit: e00909f4c7bf40481d415f2664d14035c512be7f
Parents: 35e21d9
Author: Tsz-Wo Nicholas Sze <sz...@hortonworks.com>
Authored: Tue May 24 14:51:27 2016 -0700
Committer: Andrew Wang <wa...@apache.org>
Committed: Fri Jun 3 18:15:02 2016 -0700

----------------------------------------------------------------------
 .../hadoop/hdfs/AsyncDistributedFileSystem.java |  59 ++++
 .../hadoop/hdfs/DistributedFileSystem.java      |   4 +
 .../ClientNamenodeProtocolTranslatorPB.java     |  30 +-
 .../org/apache/hadoop/hdfs/TestAsyncDFS.java    | 310 +++++++++++++++++++
 .../apache/hadoop/hdfs/TestAsyncDFSRename.java  |  15 +-
 .../hdfs/server/namenode/FSAclBaseTest.java     |  12 +-
 6 files changed, 412 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/e00909f4/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java
index 6bfd71d..29bac2a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java
@@ -19,12 +19,16 @@
 package org.apache.hadoop.hdfs;
 
 import java.io.IOException;
+import java.util.List;
 import java.util.concurrent.Future;
 
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.fs.Options;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.AclEntry;
+import org.apache.hadoop.fs.permission.AclStatus;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DFSOpsCountStatistics.OpType;
 import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB;
 import org.apache.hadoop.util.concurrent.AsyncGetFuture;
 import org.apache.hadoop.ipc.Client;
@@ -83,6 +87,7 @@ public class AsyncDistributedFileSystem {
   public Future<Void> rename(Path src, Path dst,
       final Options.Rename... options) throws IOException {
     dfs.getFsStatistics().incrementWriteOps(1);
+    dfs.getDFSOpsCountStatistics().incrementOpCounter(OpType.RENAME);
 
     final Path absSrc = dfs.fixRelativePart(src);
     final Path absDst = dfs.fixRelativePart(dst);
@@ -111,6 +116,7 @@ public class AsyncDistributedFileSystem {
   public Future<Void> setPermission(Path p, final FsPermission permission)
       throws IOException {
     dfs.getFsStatistics().incrementWriteOps(1);
+    dfs.getDFSOpsCountStatistics().incrementOpCounter(OpType.SET_PERMISSION);
     final Path absPath = dfs.fixRelativePart(p);
     final boolean isAsync = Client.isAsynchronousMode();
     Client.setAsynchronousMode(true);
@@ -142,6 +148,7 @@ public class AsyncDistributedFileSystem {
     }
 
     dfs.getFsStatistics().incrementWriteOps(1);
+    dfs.getDFSOpsCountStatistics().incrementOpCounter(OpType.SET_OWNER);
     final Path absPath = dfs.fixRelativePart(p);
     final boolean isAsync = Client.isAsynchronousMode();
     Client.setAsynchronousMode(true);
@@ -152,4 +159,56 @@ public class AsyncDistributedFileSystem {
       Client.setAsynchronousMode(isAsync);
     }
   }
+
+  /**
+   * Fully replaces ACL of files and directories, discarding all existing
+   * entries.
+   *
+   * @param p
+   *          Path to modify
+   * @param aclSpec
+   *          List<AclEntry> describing modifications, must include entries for
+   *          user, group, and others for compatibility with permission bits.
+   * @throws IOException
+   *           if an ACL could not be modified
+   * @return an instance of Future, whose #get is invoked to wait for the
+   *         asynchronous call to finish.
+   */
+  public Future<Void> setAcl(Path p, final List<AclEntry> aclSpec)
+      throws IOException {
+    dfs.getFsStatistics().incrementWriteOps(1);
+    dfs.getDFSOpsCountStatistics().incrementOpCounter(OpType.SET_ACL);
+    final Path absPath = dfs.fixRelativePart(p);
+    final boolean isAsync = Client.isAsynchronousMode();
+    Client.setAsynchronousMode(true);
+    try {
+      dfs.getClient().setAcl(dfs.getPathName(absPath), aclSpec);
+      return getReturnValue();
+    } finally {
+      Client.setAsynchronousMode(isAsync);
+    }
+  }
+
+  /**
+   * Gets the ACL of a file or directory.
+   *
+   * @param p
+   *          Path to get
+   * @throws IOException
+   *           if an ACL could not be read
+   * @return an instance of Future, whose #get returns the AclStatus
+   *         describing the ACL of the file or directory once the
+   *         asynchronous call finishes.
+   */
+  public Future<AclStatus> getAclStatus(Path p) throws IOException {
+    final Path absPath = dfs.fixRelativePart(p);
+    final boolean isAsync = Client.isAsynchronousMode();
+    Client.setAsynchronousMode(true);
+    try {
+      dfs.getClient().getAclStatus(dfs.getPathName(absPath));
+      return getReturnValue();
+    } finally {
+      Client.setAsynchronousMode(isAsync);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e00909f4/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
index 5a0697a..66ee42f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
@@ -2528,4 +2528,8 @@ public class DistributedFileSystem extends FileSystem {
   Statistics getFsStatistics() {
     return statistics;
   }
+
+  DFSOpsCountStatistics getDFSOpsCountStatistics() {
+    return storageStatistics;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e00909f4/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
index 939c1ac..2373da7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
@@ -72,6 +72,7 @@ import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
 import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
 import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
 import org.apache.hadoop.hdfs.protocol.proto.AclProtos.GetAclStatusRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.AclProtos.GetAclStatusResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.AclProtos.ModifyAclEntriesRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.AclProtos.RemoveAclEntriesRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.AclProtos.RemoveAclRequestProto;
@@ -163,7 +164,7 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.Trunca
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UnsetStoragePolicyRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpdateBlockForPipelineRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpdatePipelineRequestProto;
-import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos;
+import org.apache.hadoop.hdfs.protocol.proto.*;
 import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.CreateEncryptionZoneRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.EncryptionZoneProto;
 import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.GetEZForPathRequestProto;
@@ -1346,7 +1347,12 @@ public class ClientNamenodeProtocolTranslatorPB implements
         .addAllAclSpec(PBHelperClient.convertAclEntryProto(aclSpec))
         .build();
     try {
-      rpcProxy.setAcl(null, req);
+      if (Client.isAsynchronousMode()) {
+        rpcProxy.setAcl(null, req);
+        setAsyncReturnValue();
+      } else {
+        rpcProxy.setAcl(null, req);
+      }
     } catch (ServiceException e) {
       throw ProtobufHelper.getRemoteException(e);
     }
@@ -1357,7 +1363,25 @@ public class ClientNamenodeProtocolTranslatorPB implements
     GetAclStatusRequestProto req = GetAclStatusRequestProto.newBuilder()
         .setSrc(src).build();
     try {
-      return PBHelperClient.convert(rpcProxy.getAclStatus(null, req));
+      if (Client.isAsynchronousMode()) {
+        rpcProxy.getAclStatus(null, req);
+        final AsyncGet<Message, Exception> asyncReturnMessage
+            = ProtobufRpcEngine.getAsyncReturnMessage();
+        final AsyncGet<AclStatus, Exception> asyncGet =
+            new AsyncGet<AclStatus, Exception>() {
+              @Override
+              public AclStatus get(long timeout, TimeUnit unit)
+                  throws Exception {
+                return PBHelperClient
+                    .convert((GetAclStatusResponseProto) asyncReturnMessage
+                        .get(timeout, unit));
+              }
+            };
+        ASYNC_RETURN_VALUE.set(asyncGet);
+        return null;
+      } else {
+        return PBHelperClient.convert(rpcProxy.getAclStatus(null, req));
+      }
     } catch (ServiceException e) {
       throw ProtobufHelper.getRemoteException(e);
     }
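
By this point in the series the Callable-based handoff has been replaced by AsyncGet, whose get takes an explicit timeout. A minimal sketch of consuming one, with the bound chosen arbitrarily for illustration:

    import java.util.concurrent.TimeUnit;

    import org.apache.hadoop.fs.permission.AclStatus;
    import org.apache.hadoop.util.concurrent.AsyncGet;

    class AsyncGetSketch {
      static AclStatus waitForAcl(AsyncGet<AclStatus, Exception> asyncGet)
          throws Exception {
        // Unlike Future#get(), the caller can bound the wait instead of
        // blocking indefinitely.
        return asyncGet.get(30, TimeUnit.SECONDS);
      }
    }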

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e00909f4/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFS.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFS.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFS.java
new file mode 100644
index 0000000..67262dd
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFS.java
@@ -0,0 +1,310 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import static org.apache.hadoop.fs.permission.AclEntryScope.ACCESS;
+import static org.apache.hadoop.fs.permission.AclEntryScope.DEFAULT;
+import static org.apache.hadoop.fs.permission.AclEntryType.GROUP;
+import static org.apache.hadoop.fs.permission.AclEntryType.MASK;
+import static org.apache.hadoop.fs.permission.AclEntryType.OTHER;
+import static org.apache.hadoop.fs.permission.AclEntryType.USER;
+import static org.apache.hadoop.fs.permission.FsAction.ALL;
+import static org.apache.hadoop.fs.permission.FsAction.NONE;
+import static org.apache.hadoop.fs.permission.FsAction.READ_EXECUTE;
+import static org.apache.hadoop.hdfs.server.namenode.AclTestHelpers.aclEntry;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.AclEntry;
+import org.apache.hadoop.fs.permission.AclStatus;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.server.namenode.AclTestHelpers;
+import org.apache.hadoop.hdfs.server.namenode.FSAclBaseTest;
+import org.apache.hadoop.ipc.AsyncCallLimitExceededException;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Unit tests for asynchronous distributed filesystem.
+ */
+public class TestAsyncDFS {
+  public static final Log LOG = LogFactory.getLog(TestAsyncDFS.class);
+  private static final int NUM_TESTS = 1000;
+  private static final int NUM_NN_HANDLER = 10;
+  private static final int ASYNC_CALL_LIMIT = 100;
+
+  private Configuration conf;
+  private MiniDFSCluster cluster;
+  private FileSystem fs;
+
+  @Before
+  public void setup() throws IOException {
+    conf = new HdfsConfiguration();
+    // explicitly turn on ACLs
+    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY, true);
+    // set the limit of max async calls
+    conf.setInt(CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_KEY,
+        ASYNC_CALL_LIMIT);
+    // set server handlers
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_HANDLER_COUNT_KEY, NUM_NN_HANDLER);
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
+    cluster.waitActive();
+    fs = FileSystem.get(conf);
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    if (fs != null) {
+      fs.close();
+      fs = null;
+    }
+    if (cluster != null) {
+      cluster.shutdown();
+      cluster = null;
+    }
+  }
+
+  static class AclQueueEntry {
+    private final Object future;
+    private final Path path;
+    private final Boolean isSetAcl;
+
+    AclQueueEntry(final Object future, final Path path,
+        final Boolean isSetAcl) {
+      this.future = future;
+      this.path = path;
+      this.isSetAcl = isSetAcl;
+    }
+
+    public final Object getFuture() {
+      return future;
+    }
+
+    public final Path getPath() {
+      return path;
+    }
+
+    public final Boolean isSetAcl() {
+      return this.isSetAcl;
+    }
+  }
+
+  @Test(timeout=60000)
+  public void testBatchAsyncAcl() throws Exception {
+    final String basePath = "testBatchAsyncAcl";
+    final Path parent = new Path(String.format("/test/%s/", basePath));
+
+    AsyncDistributedFileSystem adfs = cluster.getFileSystem()
+        .getAsyncDistributedFileSystem();
+
+    // prepare test
+    int count = NUM_TESTS;
+    final Path[] paths = new Path[count];
+    for (int i = 0; i < count; i++) {
+      paths[i] = new Path(parent, "acl" + i);
+      FileSystem.mkdirs(fs, paths[i],
+          FsPermission.createImmutable((short) 0750));
+      assertTrue(fs.exists(paths[i]));
+      assertTrue(fs.getFileStatus(paths[i]).isDirectory());
+    }
+
+    final List<AclEntry> aclSpec = getAclSpec();
+    final AclEntry[] expectedAclSpec = getExpectedAclSpec();
+    Map<Integer, Future<Void>> setAclRetFutures =
+        new HashMap<Integer, Future<Void>>();
+    Map<Integer, Future<AclStatus>> getAclRetFutures =
+        new HashMap<Integer, Future<AclStatus>>();
+    int start = 0, end = 0;
+    try {
+      // test setAcl
+      for (int i = 0; i < count; i++) {
+        for (;;) {
+          try {
+            Future<Void> retFuture = adfs.setAcl(paths[i], aclSpec);
+            setAclRetFutures.put(i, retFuture);
+            break;
+          } catch (AsyncCallLimitExceededException e) {
+            start = end;
+            end = i;
+            waitForAclReturnValues(setAclRetFutures, start, end);
+          }
+        }
+      }
+      waitForAclReturnValues(setAclRetFutures, end, count);
+
+      // test getAclStatus
+      start = 0;
+      end = 0;
+      for (int i = 0; i < count; i++) {
+        for (;;) {
+          try {
+            Future<AclStatus> retFuture = adfs.getAclStatus(paths[i]);
+            getAclRetFutures.put(i, retFuture);
+            break;
+          } catch (AsyncCallLimitExceededException e) {
+            start = end;
+            end = i;
+            waitForAclReturnValues(getAclRetFutures, start, end, paths,
+                expectedAclSpec);
+          }
+        }
+      }
+      waitForAclReturnValues(getAclRetFutures, end, count, paths,
+          expectedAclSpec);
+    } catch (Exception e) {
+      throw e;
+    }
+  }
+
+  private void waitForAclReturnValues(
+      final Map<Integer, Future<Void>> aclRetFutures, final int start,
+      final int end) throws InterruptedException, ExecutionException {
+    for (int i = start; i < end; i++) {
+      aclRetFutures.get(i).get();
+    }
+  }
+
+  private void waitForAclReturnValues(
+      final Map<Integer, Future<AclStatus>> aclRetFutures, final int start,
+      final int end, final Path[] paths, final AclEntry[] expectedAclSpec)
+      throws InterruptedException, ExecutionException, IOException {
+    for (int i = start; i < end; i++) {
+      AclStatus aclStatus = aclRetFutures.get(i).get();
+      verifyGetAcl(aclStatus, expectedAclSpec, paths[i]);
+    }
+  }
+
+  private void verifyGetAcl(final AclStatus aclStatus,
+      final AclEntry[] expectedAclSpec, final Path path) throws IOException {
+    if (aclStatus == null) {
+      return;
+    }
+
+    // verify permission and acl
+    AclEntry[] returned = aclStatus.getEntries().toArray(new AclEntry[0]);
+    assertArrayEquals(expectedAclSpec, returned);
+    assertPermission(path, (short) 010770);
+    FSAclBaseTest.assertAclFeature(cluster, path, true);
+  }
+
+  private List<AclEntry> getAclSpec() {
+    return Lists.newArrayList(
+        aclEntry(ACCESS, USER, ALL),
+        aclEntry(ACCESS, USER, "foo", ALL),
+        aclEntry(ACCESS, GROUP, READ_EXECUTE),
+        aclEntry(ACCESS, OTHER, NONE),
+        aclEntry(DEFAULT, USER, "foo", ALL));
+  }
+
+  private AclEntry[] getExpectedAclSpec() {
+    return new AclEntry[] {
+        aclEntry(ACCESS, USER, "foo", ALL),
+        aclEntry(ACCESS, GROUP, READ_EXECUTE),
+        aclEntry(DEFAULT, USER, ALL),
+        aclEntry(DEFAULT, USER, "foo", ALL),
+        aclEntry(DEFAULT, GROUP, READ_EXECUTE),
+        aclEntry(DEFAULT, MASK, ALL),
+        aclEntry(DEFAULT, OTHER, NONE) };
+  }
+
+  private void assertPermission(final Path pathToCheck, final short perm)
+      throws IOException {
+    AclTestHelpers.assertPermission(fs, pathToCheck, perm);
+  }
+
+  @Test(timeout=60000)
+  public void testAsyncAPIWithException() throws Exception {
+    String group1 = "group1";
+    String group2 = "group2";
+    String user1 = "user1";
+    UserGroupInformation ugi1;
+
+    // create fake mapping for the groups
+    Map<String, String[]> u2gMap = new HashMap<String, String[]>(1);
+    u2gMap.put(user1, new String[] {group1, group2});
+    DFSTestUtil.updateConfWithFakeGroupMapping(conf, u2gMap);
+
+    // Create the test user with its two fake groups
+    ugi1 = UserGroupInformation.createUserForTesting(user1, new String[] {
+        group1, group2 });
+
+    final Path parent = new Path("/test/async_api_exception/");
+    final Path aclDir = new Path(parent, "aclDir");
+    fs.mkdirs(aclDir, FsPermission.createImmutable((short) 0770));
+
+    AsyncDistributedFileSystem adfs = ugi1
+        .doAs(new PrivilegedExceptionAction<AsyncDistributedFileSystem>() {
+          @Override
+          public AsyncDistributedFileSystem run() throws Exception {
+            return cluster.getFileSystem().getAsyncDistributedFileSystem();
+          }
+        });
+
+    Future<Void> retFuture;
+    // test setAcl
+    try {
+      retFuture = adfs.setAcl(aclDir,
+          Lists.newArrayList(aclEntry(ACCESS, USER, ALL)));
+      retFuture.get();
+      fail("setAcl should fail with permission denied");
+    } catch (ExecutionException e) {
+      checkPermissionDenied(e, aclDir, user1);
+    }
+
+    // test getAclStatus
+    try {
+      Future<AclStatus> aclRetFuture = adfs.getAclStatus(aclDir);
+      aclRetFuture.get();
+      fail("getAclStatus should fail with permission denied");
+    } catch (ExecutionException e) {
+      checkPermissionDenied(e, aclDir, user1);
+    }
+  }
+
+  public static void checkPermissionDenied(final Exception e, final Path dir,
+      final String user) {
+    assertTrue(e.getCause() instanceof ExecutionException);
+    assertTrue("Permission denied messages must carry AccessControlException",
+        e.getMessage().contains("AccessControlException"));
+    assertTrue("Permission denied messages must carry the username", e
+        .getMessage().contains(user));
+    assertTrue("Permission denied messages must carry the name of the path",
+        e.getMessage().contains(dir.getName()));
+  }
+}
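
The submit/drain loop above is how the test stays under the client-side
async-call cap: keep submitting until the IPC layer throws
AsyncCallLimitExceededException, then block on the window of futures issued
so far and retry the failed submission. A condensed, self-contained sketch of
that pattern follows; AsyncOp, submit and runBatch are illustrative names,
not part of the patch.

  import java.util.HashMap;
  import java.util.Map;
  import java.util.concurrent.Future;

  import org.apache.hadoop.ipc.AsyncCallLimitExceededException;

  /** Illustrative driver for the submit-until-limit/drain pattern. */
  public class AsyncBatchSketch {
    interface AsyncOp {
      // e.g. adfs.setAcl(paths[i], aclSpec) or adfs.rename(srcs[i], dsts[i])
      Future<Void> submit(int i) throws Exception;
    }

    static void runBatch(int count, AsyncOp op) throws Exception {
      Map<Integer, Future<Void>> futures = new HashMap<Integer, Future<Void>>();
      int start = 0, end = 0;
      for (int i = 0; i < count; i++) {
        for (;;) {
          try {
            futures.put(i, op.submit(i));   // queue one more async call
            break;
          } catch (AsyncCallLimitExceededException e) {
            // too many outstanding calls: drain the window [start, end)
            start = end;
            end = i;
            for (int j = start; j < end; j++) {
              futures.get(j).get();
            }
          }
        }
      }
      for (int j = end; j < count; j++) {   // drain the remainder
        futures.get(j).get();
      }
    }
  }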

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e00909f4/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java
index 7539fbd..03c8151 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java
@@ -520,7 +520,7 @@ public class TestAsyncDFSRename {
       retFuture = adfs.rename(src, dst, Rename.OVERWRITE);
       retFuture.get();
     } catch (ExecutionException e) {
-      checkPermissionDenied(e, src, user1);
+      TestAsyncDFS.checkPermissionDenied(e, src, user1);
       assertTrue("Permission denied messages must carry the path parent", e
           .getMessage().contains(src.getParent().toUri().getPath()));
     }
@@ -530,7 +530,7 @@ public class TestAsyncDFSRename {
       retFuture = adfs.setPermission(src, fsPerm);
       retFuture.get();
     } catch (ExecutionException e) {
-      checkPermissionDenied(e, src, user1);
+      TestAsyncDFS.checkPermissionDenied(e, src, user1);
       assertTrue("Permission denied messages must carry the name of the path",
           e.getMessage().contains(src.getName()));
     }
@@ -539,7 +539,7 @@ public class TestAsyncDFSRename {
       retFuture = adfs.setOwner(src, "user1", "group2");
       retFuture.get();
     } catch (ExecutionException e) {
-      checkPermissionDenied(e, src, user1);
+      TestAsyncDFS.checkPermissionDenied(e, src, user1);
       assertTrue("Permission denied messages must carry the name of the path",
           e.getMessage().contains(src.getName()));
     } finally {
@@ -551,13 +551,4 @@ public class TestAsyncDFSRename {
       }
     }
   }
-
-  private void checkPermissionDenied(final Exception e, final Path dir,
-      final String user) {
-    assertTrue(e.getCause() instanceof ExecutionException);
-    assertTrue("Permission denied messages must carry AccessControlException",
-        e.getMessage().contains("AccessControlException"));
-    assertTrue("Permission denied messages must carry the username", e
-        .getMessage().contains(user));
-  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e00909f4/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSAclBaseTest.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSAclBaseTest.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSAclBaseTest.java
index 002f7c0..216147a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSAclBaseTest.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSAclBaseTest.java
@@ -1637,17 +1637,23 @@ public abstract class FSAclBaseTest {
     assertAclFeature(path, expectAclFeature);
   }
 
+  private static void assertAclFeature(Path pathToCheck,
+      boolean expectAclFeature) throws IOException {
+    assertAclFeature(cluster, pathToCheck, expectAclFeature);
+  }
+
   /**
    * Asserts whether or not the inode for a specific path has an AclFeature.
    *
+   * @param miniCluster the cluster in which the path resides
    * @param pathToCheck Path inode to check
    * @param expectAclFeature boolean true if an AclFeature must be present,
    *   false if an AclFeature must not be present
    * @throws IOException thrown if there is an I/O error
    */
-  private static void assertAclFeature(Path pathToCheck,
-      boolean expectAclFeature) throws IOException {
-    AclFeature aclFeature = getAclFeature(pathToCheck, cluster);
+  public static void assertAclFeature(final MiniDFSCluster miniCluster,
+      Path pathToCheck, boolean expectAclFeature) throws IOException {
+    AclFeature aclFeature = getAclFeature(pathToCheck, miniCluster);
     if (expectAclFeature) {
       assertNotNull(aclFeature);
       // Intentionally capturing a reference to the entries, not using nested




[3/8] hadoop git commit: HDFS-10346. Implement asynchronous setPermission/setOwner for DistributedFileSystem. Contributed by Xiaobing Zhou

Posted by wa...@apache.org.
HDFS-10346. Implement asynchronous setPermission/setOwner for DistributedFileSystem.  Contributed by Xiaobing Zhou

(cherry picked from commit 7251bb922b20dae49c8c6854864095fb16d8cbd5)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/7bb7afe2
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/7bb7afe2
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/7bb7afe2

Branch: refs/heads/HDFS-9924
Commit: 7bb7afe2ff8900cb1b337204356a4162f1432027
Parents: 529feac
Author: Tsz-Wo Nicholas Sze <sz...@hortonworks.com>
Authored: Wed May 11 17:53:40 2016 -0700
Committer: Andrew Wang <wa...@apache.org>
Committed: Fri Jun 3 18:14:39 2016 -0700

----------------------------------------------------------------------
 .../hadoop/hdfs/AsyncDistributedFileSystem.java |  59 ++++
 .../ClientNamenodeProtocolTranslatorPB.java     |  39 ++-
 .../apache/hadoop/hdfs/TestAsyncDFSRename.java  | 267 +++++++++++++++++--
 .../apache/hadoop/hdfs/TestDFSPermission.java   |  29 +-
 4 files changed, 351 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/7bb7afe2/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java
index 356ae3f..4fe0861 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java
@@ -27,6 +27,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.fs.Options;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB;
 import org.apache.hadoop.ipc.Client;
 
@@ -37,6 +38,9 @@ import com.google.common.util.concurrent.AbstractFuture;
  * This instance of this class is the way end-user code interacts
  * with a Hadoop DistributedFileSystem in an asynchronous manner.
  *
+ * This class is unstable, so no guarantee is provided as to reliability,
+ * stability or compatibility across any level of release granularity.
+ *
  *****************************************************************/
 @Unstable
 public class AsyncDistributedFileSystem {
@@ -111,4 +115,59 @@ public class AsyncDistributedFileSystem {
       Client.setAsynchronousMode(isAsync);
     }
   }
+
+  /**
+   * Set permission of a path.
+   *
+   * @param p
+   *          the path whose permission is to be set
+   * @param permission
+   *          the permission to set on the path
+   * @return an instance of Future, whose #get is invoked to wait for the
+   *         asynchronous call to finish.
+   */
+  public Future<Void> setPermission(Path p, final FsPermission permission)
+      throws IOException {
+    dfs.getFsStatistics().incrementWriteOps(1);
+    final Path absPath = dfs.fixRelativePart(p);
+    final boolean isAsync = Client.isAsynchronousMode();
+    Client.setAsynchronousMode(true);
+    try {
+      dfs.getClient().setPermission(dfs.getPathName(absPath), permission);
+      return getReturnValue();
+    } finally {
+      Client.setAsynchronousMode(isAsync);
+    }
+  }
+
+  /**
+   * Set owner of a path (i.e. a file or a directory). The parameters username
+   * and groupname cannot both be null.
+   *
+   * @param p
+   *          The path
+   * @param username
+   *          If it is null, the original username remains unchanged.
+   * @param groupname
+   *          If it is null, the original groupname remains unchanged.
+   * @return an instance of Future, whose #get is invoked to wait for the
+   *         asynchronous call to finish.
+   */
+  public Future<Void> setOwner(Path p, String username, String groupname)
+      throws IOException {
+    if (username == null && groupname == null) {
+      throw new IOException("username == null && groupname == null");
+    }
+
+    dfs.getFsStatistics().incrementWriteOps(1);
+    final Path absPath = dfs.fixRelativePart(p);
+    final boolean isAsync = Client.isAsynchronousMode();
+    Client.setAsynchronousMode(true);
+    try {
+      dfs.getClient().setOwner(dfs.getPathName(absPath), username, groupname);
+      return getReturnValue();
+    } finally {
+      Client.setAsynchronousMode(isAsync);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7bb7afe2/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
index f4074b6..faa925c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
@@ -367,12 +367,30 @@ public class ClientNamenodeProtocolTranslatorPB implements
         .setPermission(PBHelperClient.convert(permission))
         .build();
     try {
-      rpcProxy.setPermission(null, req);
+      if (Client.isAsynchronousMode()) {
+        rpcProxy.setPermission(null, req);
+        setReturnValueCallback();
+      } else {
+        rpcProxy.setPermission(null, req);
+      }
     } catch (ServiceException e) {
       throw ProtobufHelper.getRemoteException(e);
     }
   }
 
+  private void setReturnValueCallback() {
+    final Callable<Message> returnMessageCallback = ProtobufRpcEngine
+        .getReturnMessageCallback();
+    Callable<Void> callBack = new Callable<Void>() {
+      @Override
+      public Void call() throws Exception {
+        returnMessageCallback.call();
+        return null;
+      }
+    };
+    RETURN_VALUE_CALLBACK.set(callBack);
+  }
+
   @Override
   public void setOwner(String src, String username, String groupname)
       throws IOException {
@@ -383,7 +401,12 @@ public class ClientNamenodeProtocolTranslatorPB implements
     if (groupname != null)
       req.setGroupname(groupname);
     try {
-      rpcProxy.setOwner(null, req.build());
+      if (Client.isAsynchronousMode()) {
+        rpcProxy.setOwner(null, req.build());
+        setReturnValueCallback();
+      } else {
+        rpcProxy.setOwner(null, req.build());
+      }
     } catch (ServiceException e) {
       throw ProtobufHelper.getRemoteException(e);
     }
@@ -513,17 +536,7 @@ public class ClientNamenodeProtocolTranslatorPB implements
     try {
       if (Client.isAsynchronousMode()) {
         rpcProxy.rename2(null, req);
-
-        final Callable<Message> returnMessageCallback = ProtobufRpcEngine
-            .getReturnMessageCallback();
-        Callable<Void> callBack = new Callable<Void>() {
-          @Override
-          public Void call() throws Exception {
-            returnMessageCallback.call();
-            return null;
-          }
-        };
-        RETURN_VALUE_CALLBACK.set(callBack);
+        setReturnValueCallback();
       } else {
         rpcProxy.rename2(null, req);
       }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7bb7afe2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java
index d129299..7539fbd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java
@@ -22,8 +22,11 @@ import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
+import java.util.Arrays;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.Random;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 
@@ -31,18 +34,30 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.Options.Rename;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.TestDFSPermission.PermissionGenerator;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
 import org.apache.hadoop.ipc.AsyncCallLimitExceededException;
+import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.Time;
 import org.junit.Test;
 
 public class TestAsyncDFSRename {
   public static final Log LOG = LogFactory.getLog(TestAsyncDFSRename.class);
+  private final long seed = Time.now();
+  private final Random r = new Random(seed);
+  private final PermissionGenerator permGenerator = new PermissionGenerator(r);
+  private final short replFactor = 2;
+  private final long blockSize = 512;
+  private long fileLen = blockSize * 3;
 
   /**
    * Check the blocks of dst file are cleaned after rename with overwrite
@@ -50,8 +65,6 @@ public class TestAsyncDFSRename {
    */
   @Test(timeout = 60000)
   public void testAsyncRenameWithOverwrite() throws Exception {
-    final short replFactor = 2;
-    final long blockSize = 512;
     Configuration conf = new Configuration();
     MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(
         replFactor).build();
@@ -60,8 +73,6 @@ public class TestAsyncDFSRename {
     AsyncDistributedFileSystem adfs = dfs.getAsyncDistributedFileSystem();
 
     try {
-
-      long fileLen = blockSize * 3;
       String src = "/foo/src";
       String dst = "/foo/dst";
       String src2 = "/foo/src2";
@@ -115,8 +126,6 @@ public class TestAsyncDFSRename {
 
   @Test(timeout = 60000)
   public void testCallGetReturnValueMultipleTimes() throws Exception {
-    final short replFactor = 2;
-    final long blockSize = 512;
     final Path renameDir = new Path(
         "/test/testCallGetReturnValueMultipleTimes/");
     final Configuration conf = new HdfsConfiguration();
@@ -127,7 +136,6 @@ public class TestAsyncDFSRename {
     final DistributedFileSystem dfs = cluster.getFileSystem();
     final AsyncDistributedFileSystem adfs = dfs.getAsyncDistributedFileSystem();
     final int count = 100;
-    long fileLen = blockSize * 3;
     final Map<Integer, Future<Void>> returnFutures = new HashMap<Integer, Future<Void>>();
 
     assertTrue(dfs.mkdirs(renameDir));
@@ -178,15 +186,15 @@ public class TestAsyncDFSRename {
     }
   }
 
-  @Test(timeout = 120000)
-  public void testAggressiveConcurrentAsyncRenameWithOverwrite()
+  @Test
+  public void testConservativeConcurrentAsyncRenameWithOverwrite()
       throws Exception {
     internalTestConcurrentAsyncRenameWithOverwrite(100,
         "testAggressiveConcurrentAsyncRenameWithOverwrite");
   }
 
   @Test(timeout = 60000)
-  public void testConservativeConcurrentAsyncRenameWithOverwrite()
+  public void testAggressiveConcurrentAsyncRenameWithOverwrite()
       throws Exception {
     internalTestConcurrentAsyncRenameWithOverwrite(10000,
         "testConservativeConcurrentAsyncRenameWithOverwrite");
@@ -194,8 +202,6 @@ public class TestAsyncDFSRename {
 
   private void internalTestConcurrentAsyncRenameWithOverwrite(
       final int asyncCallLimit, final String basePath) throws Exception {
-    final short replFactor = 2;
-    final long blockSize = 512;
     final Path renameDir = new Path(String.format("/test/%s/", basePath));
     Configuration conf = new HdfsConfiguration();
     conf.setInt(CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_KEY,
@@ -206,7 +212,6 @@ public class TestAsyncDFSRename {
     DistributedFileSystem dfs = cluster.getFileSystem();
     AsyncDistributedFileSystem adfs = dfs.getAsyncDistributedFileSystem();
     int count = 1000;
-    long fileLen = blockSize * 3;
     int start = 0, end = 0;
     Map<Integer, Future<Void>> returnFutures = new HashMap<Integer, Future<Void>>();
 
@@ -274,8 +279,206 @@ public class TestAsyncDFSRename {
     }
   }
 
+  @Test
+  public void testConservativeConcurrentAsyncAPI() throws Exception {
+    internalTestConcurrentAsyncAPI(100, "testConservativeConcurrentAsyncAPI");
+  }
+
+  @Test(timeout = 60000)
+  public void testAggressiveConcurrentAsyncAPI() throws Exception {
+    internalTestConcurrentAsyncAPI(10000, "testAggressiveConcurrentAsyncAPI");
+  }
+
+  private void internalTestConcurrentAsyncAPI(final int asyncCallLimit,
+      final String basePath) throws Exception {
+    Configuration conf = new HdfsConfiguration();
+    String group1 = "group1";
+    String group2 = "group2";
+    String user1 = "user1";
+    int count = 500;
+
+    // explicitly turn on permission checking
+    conf.setBoolean(DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, true);
+    // set the limit of max async calls
+    conf.setInt(CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_KEY,
+        asyncCallLimit);
+
+    // create fake mapping for the groups
+    Map<String, String[]> u2gMap = new HashMap<String, String[]>(1);
+    u2gMap.put(user1, new String[] {group1, group2});
+    DFSTestUtil.updateConfWithFakeGroupMapping(conf, u2gMap);
+
+    // start mini cluster
+    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(3).build();
+    cluster.waitActive();
+    AsyncDistributedFileSystem adfs = cluster.getFileSystem()
+        .getAsyncDistributedFileSystem();
+
+    // prepare for test
+    FileSystem rootFs = FileSystem.get(conf);
+    final Path parent = new Path(String.format("/test/%s/", basePath));
+    final Path[] srcs = new Path[count];
+    final Path[] dsts = new Path[count];
+    short[] permissions = new short[count];
+    for (int i = 0; i < count; i++) {
+      srcs[i] = new Path(parent, "src" + i);
+      dsts[i] = new Path(parent, "dst" + i);
+      DFSTestUtil.createFile(rootFs, srcs[i], fileLen, replFactor, 1);
+      DFSTestUtil.createFile(rootFs, dsts[i], fileLen, replFactor, 1);
+      assertTrue(rootFs.exists(srcs[i]));
+      assertTrue(rootFs.getFileStatus(srcs[i]).isFile());
+      assertTrue(rootFs.exists(dsts[i]));
+      assertTrue(rootFs.getFileStatus(dsts[i]).isFile());
+      permissions[i] = permGenerator.next();
+    }
+
+    Map<Integer, Future<Void>> renameRetFutures =
+        new HashMap<Integer, Future<Void>>();
+    Map<Integer, Future<Void>> permRetFutures =
+        new HashMap<Integer, Future<Void>>();
+    Map<Integer, Future<Void>> ownerRetFutures =
+        new HashMap<Integer, Future<Void>>();
+    int start = 0, end = 0;
+    // test rename
+    for (int i = 0; i < count; i++) {
+      for (;;) {
+        try {
+          Future<Void> returnFuture = adfs.rename(srcs[i], dsts[i],
+              Rename.OVERWRITE);
+          renameRetFutures.put(i, returnFuture);
+          break;
+        } catch (AsyncCallLimitExceededException e) {
+          start = end;
+          end = i;
+          waitForReturnValues(renameRetFutures, start, end);
+        }
+      }
+    }
+
+    // wait for completing the calls
+    for (int i = start; i < count; i++) {
+      renameRetFutures.get(i).get();
+    }
+
+    // Restart NN and verify the renames succeeded
+    cluster.restartNameNodes();
+
+    // verify the src should not exist, dst should
+    for (int i = 0; i < count; i++) {
+      assertFalse(rootFs.exists(srcs[i]));
+      assertTrue(rootFs.exists(dsts[i]));
+    }
+
+    // test permissions
+    try {
+      for (int i = 0; i < count; i++) {
+        for (;;) {
+          try {
+            Future<Void> retFuture = adfs.setPermission(dsts[i],
+                new FsPermission(permissions[i]));
+            permRetFutures.put(i, retFuture);
+            break;
+          } catch (AsyncCallLimitExceededException e) {
+            start = end;
+            end = i;
+            waitForReturnValues(permRetFutures, start, end);
+          }
+        }
+      }
+      // wait for completing the calls
+      for (int i = start; i < count; i++) {
+        permRetFutures.get(i).get();
+      }
+
+      // Restart NN, then check the permissions
+      cluster.restartNameNodes();
+
+      // verify the permission
+      for (int i = 0; i < count; i++) {
+        assertTrue(rootFs.exists(dsts[i]));
+        FsPermission fsPerm = new FsPermission(permissions[i]);
+        checkAccessPermissions(rootFs.getFileStatus(dsts[i]),
+            fsPerm.getUserAction());
+      }
+
+      // test setOwner
+      start = 0;
+      end = 0;
+      for (int i = 0; i < count; i++) {
+        for (;;) {
+          try {
+            Future<Void> retFuture = adfs.setOwner(dsts[i], "user1",
+                "group2");
+            ownerRetFutures.put(i, retFuture);
+            break;
+          } catch (AsyncCallLimitExceededException e) {
+            start = end;
+            end = i;
+            waitForReturnValues(ownerRetFutures, start, end);
+          }
+        }
+      }
+      // wait for completing the calls
+      for (int i = start; i < count; i++) {
+        ownerRetFutures.get(i).get();
+      }
+
+      // Restart NN, then check the owner
+      cluster.restartNameNodes();
+
+      // verify the owner
+      for (int i = 0; i < count; i++) {
+        assertTrue(rootFs.exists(dsts[i]));
+        assertTrue(
+            "user1".equals(rootFs.getFileStatus(dsts[i]).getOwner()));
+        assertTrue(
+            "group2".equals(rootFs.getFileStatus(dsts[i]).getGroup()));
+      }
+    } catch (AccessControlException ace) {
+      throw ace;
+    } finally {
+      if (rootFs != null) {
+        rootFs.close();
+      }
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+
+  static void checkAccessPermissions(FileStatus stat, FsAction mode)
+      throws IOException {
+    checkAccessPermissions(UserGroupInformation.getCurrentUser(), stat, mode);
+  }
+
+  static void checkAccessPermissions(final UserGroupInformation ugi,
+      FileStatus stat, FsAction mode) throws IOException {
+    FsPermission perm = stat.getPermission();
+    String user = ugi.getShortUserName();
+    List<String> groups = Arrays.asList(ugi.getGroupNames());
+
+    if (user.equals(stat.getOwner())) {
+      if (perm.getUserAction().implies(mode)) {
+        return;
+      }
+    } else if (groups.contains(stat.getGroup())) {
+      if (perm.getGroupAction().implies(mode)) {
+        return;
+      }
+    } else {
+      if (perm.getOtherAction().implies(mode)) {
+        return;
+      }
+    }
+    throw new AccessControlException(String.format(
+        "Permission denied: user=%s, path=\"%s\":%s:%s:%s%s", user, stat
+            .getPath(), stat.getOwner(), stat.getGroup(),
+        stat.isDirectory() ? "d" : "-", perm));
+  }
+
   @Test(timeout = 60000)
-  public void testAsyncRenameWithException() throws Exception {
+  public void testAsyncAPIWithException() throws Exception {
     Configuration conf = new HdfsConfiguration();
     String group1 = "group1";
     String group2 = "group2";
@@ -286,9 +489,9 @@ public class TestAsyncDFSRename {
     conf.setBoolean(DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, true);
 
     // create fake mapping for the groups
-    Map<String, String[]> u2g_map = new HashMap<String, String[]>(1);
-    u2g_map.put(user1, new String[] { group1, group2 });
-    DFSTestUtil.updateConfWithFakeGroupMapping(conf, u2g_map);
+    Map<String, String[]> u2gMap = new HashMap<String, String[]>(1);
+    u2gMap.put(user1, new String[] {group1, group2});
+    DFSTestUtil.updateConfWithFakeGroupMapping(conf, u2gMap);
 
     // Initiate all four users
     ugi1 = UserGroupInformation.createUserForTesting(user1, new String[] {
@@ -299,7 +502,7 @@ public class TestAsyncDFSRename {
     cluster.waitActive();
 
     FileSystem rootFs = FileSystem.get(conf);
-    final Path renameDir = new Path("/test/async_rename_exception/");
+    final Path renameDir = new Path("/test/async_api_exception/");
     final Path src = new Path(renameDir, "src");
     final Path dst = new Path(renameDir, "dst");
     rootFs.mkdirs(src);
@@ -312,11 +515,33 @@ public class TestAsyncDFSRename {
           }
         });
 
+    Future<Void> retFuture;
+    try {
+      retFuture = adfs.rename(src, dst, Rename.OVERWRITE);
+      retFuture.get();
+    } catch (ExecutionException e) {
+      checkPermissionDenied(e, src, user1);
+      assertTrue("Permission denied messages must carry the path parent", e
+          .getMessage().contains(src.getParent().toUri().getPath()));
+    }
+
+    FsPermission fsPerm = new FsPermission(permGenerator.next());
+    try {
+      retFuture = adfs.setPermission(src, fsPerm);
+      retFuture.get();
+    } catch (ExecutionException e) {
+      checkPermissionDenied(e, src, user1);
+      assertTrue("Permission denied messages must carry the name of the path",
+          e.getMessage().contains(src.getName()));
+    }
+
     try {
-      Future<Void> returnFuture = adfs.rename(src, dst, Rename.OVERWRITE);
-      returnFuture.get();
+      retFuture = adfs.setOwner(src, "user1", "group2");
+      retFuture.get();
     } catch (ExecutionException e) {
       checkPermissionDenied(e, src, user1);
+      assertTrue("Permission denied messages must carry the name of the path",
+          e.getMessage().contains(src.getName()));
     } finally {
       if (rootFs != null) {
         rootFs.close();
@@ -334,7 +559,5 @@ public class TestAsyncDFSRename {
         e.getMessage().contains("AccessControlException"));
     assertTrue("Permission denied messages must carry the username", e
         .getMessage().contains(user));
-    assertTrue("Permission denied messages must carry the path parent", e
-        .getMessage().contains(dir.getParent().toUri().getPath()));
   }
 }
\ No newline at end of file
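
checkAccessPermissions resolves access the same way the NameNode's permission
checker does: pick the owner, group, or other triple that applies to the
calling user, then ask FsAction#implies whether the requested mode is covered
by that triple. A small worked example with a hypothetical 0640 permission:

  import org.apache.hadoop.fs.permission.FsAction;
  import org.apache.hadoop.fs.permission.FsPermission;

  class ImpliesExample {
    public static void main(String[] args) {
      FsPermission perm = new FsPermission((short) 0640);  // rw-r-----
      // owner triple is rw-: READ and WRITE are implied, EXECUTE is not
      System.out.println(perm.getUserAction().implies(FsAction.READ));    // true
      System.out.println(perm.getUserAction().implies(FsAction.EXECUTE)); // false
      // group triple is r--: only READ is implied
      System.out.println(perm.getGroupAction().implies(FsAction.READ));   // true
      System.out.println(perm.getGroupAction().implies(FsAction.WRITE));  // false
    }
  }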

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7bb7afe2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSPermission.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSPermission.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSPermission.java
index aa204cd..66a0380 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSPermission.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSPermission.java
@@ -196,22 +196,35 @@ public class TestDFSPermission {
     return fs.getFileStatus(path).getPermission().toShort();
   }
 
-  /* create a file/directory with the default umask and permission */
   private void create(OpType op, Path name) throws IOException {
-    create(op, name, DEFAULT_UMASK, new FsPermission(DEFAULT_PERMISSION));
+    create(fs, conf, op, name);
+  }
+
+  /* create a file/directory with the default umask and permission */
+  static void create(final FileSystem fs, final Configuration fsConf,
+      OpType op, Path name) throws IOException {
+    create(fs, fsConf, op, name, DEFAULT_UMASK, new FsPermission(
+        DEFAULT_PERMISSION));
+  }
+
+  private void create(OpType op, Path name, short umask,
+      FsPermission permission)
+      throws IOException {
+    create(fs, conf, op, name, umask, permission);
   }
 
   /* create a file/directory with the given umask and permission */
-  private void create(OpType op, Path name, short umask, 
-      FsPermission permission) throws IOException {
+  static void create(final FileSystem fs, final Configuration fsConf,
+      OpType op, Path name, short umask, FsPermission permission)
+      throws IOException {
     // set umask in configuration, converting to padded octal
-    conf.set(FsPermission.UMASK_LABEL, String.format("%1$03o", umask));
+    fsConf.set(FsPermission.UMASK_LABEL, String.format("%1$03o", umask));
 
     // create the file/directory
     switch (op) {
     case CREATE:
       FSDataOutputStream out = fs.create(name, permission, true, 
-          conf.getInt(CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_KEY, 4096),
+          fsConf.getInt(CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_KEY, 4096),
           fs.getDefaultReplication(name), fs.getDefaultBlockSize(name), null);
       out.close();
       break;
@@ -359,7 +372,7 @@ public class TestDFSPermission {
   final static private String DIR_NAME = "dir";
   final static private String FILE_DIR_NAME = "filedir";
 
-  private enum OpType {CREATE, MKDIRS, OPEN, SET_REPLICATION,
+  enum OpType {CREATE, MKDIRS, OPEN, SET_REPLICATION,
     GET_FILEINFO, IS_DIR, EXISTS, GET_CONTENT_LENGTH, LIST, RENAME, DELETE
   };
 
@@ -615,7 +628,7 @@ public class TestDFSPermission {
   /* A random permission generator that guarantees that each permission
    * value is generated only once.
    */
-  static private class PermissionGenerator {
+  static class PermissionGenerator {
     private final Random r;
     private final short[] permissions = new short[MAX_PERMISSION + 1];
     private int numLeft = MAX_PERMISSION + 1;
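
PermissionGenerator, widened above to package visibility so TestAsyncDFSRename
can reuse it, hands out each permission value exactly once by sampling without
replacement from a pre-filled array. A minimal equivalent of the idea follows;
MAX_PERM stands in for TestDFSPermission's MAX_PERMISSION and is assumed here
to be 0777 octal.

  import java.util.Random;

  /** Sketch of a draw-without-replacement permission generator. */
  class PermissionGeneratorSketch {
    private static final int MAX_PERM = 0777;       // assumed upper bound
    private final Random r;
    private final short[] permissions = new short[MAX_PERM + 1];
    private int numLeft = MAX_PERM + 1;

    PermissionGeneratorSketch(Random r) {
      this.r = r;
      for (int i = 0; i <= MAX_PERM; i++) {
        permissions[i] = (short) i;                 // every value exactly once
      }
    }

    /** Returns a permission value that has not been returned before. */
    short next() {
      if (numLeft == 0) {
        throw new IllegalStateException("no permissions left");
      }
      int index = r.nextInt(numLeft);               // pick from the unused prefix
      numLeft--;
      short result = permissions[index];
      permissions[index] = permissions[numLeft];    // move the tail value in
      return result;
    }
  }

A related detail in the diff above: the umask is written into the
configuration as zero-padded octal, so String.format("%1$03o", 18) yields
"022".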




[6/8] hadoop git commit: HDFS-10431. Refactor and speed up TestAsyncDFSRename. Contributed by Xiaobing Zhou

Posted by wa...@apache.org.
HDFS-10431. Refactor and speed up TestAsyncDFSRename.  Contributed by Xiaobing Zhou

(cherry picked from commit f4b9bcd87c66a39f0c93983431630e9d1b6e36d3)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/7591d5fd
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/7591d5fd
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/7591d5fd

Branch: refs/heads/HDFS-9924
Commit: 7591d5fd90c70fb013d808a76be335e53da61750
Parents: e00909f
Author: Tsz-Wo Nicholas Sze <sz...@hortonworks.com>
Authored: Thu May 26 12:15:17 2016 -0700
Committer: Andrew Wang <wa...@apache.org>
Committed: Fri Jun 3 18:15:09 2016 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hdfs/TestAsyncDFS.java    | 233 +++++++-
 .../apache/hadoop/hdfs/TestAsyncDFSRename.java  | 563 ++++---------------
 2 files changed, 313 insertions(+), 483 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/7591d5fd/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFS.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFS.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFS.java
index 67262dd..ddcf492 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFS.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFS.java
@@ -29,13 +29,16 @@ import static org.apache.hadoop.fs.permission.FsAction.READ_EXECUTE;
 import static org.apache.hadoop.hdfs.server.namenode.AclTestHelpers.aclEntry;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.fail;
 
 import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Random;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 
@@ -43,15 +46,21 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.Options.Rename;
 import org.apache.hadoop.fs.permission.AclEntry;
 import org.apache.hadoop.fs.permission.AclStatus;
+import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.TestDFSPermission.PermissionGenerator;
 import org.apache.hadoop.hdfs.server.namenode.AclTestHelpers;
 import org.apache.hadoop.hdfs.server.namenode.FSAclBaseTest;
 import org.apache.hadoop.ipc.AsyncCallLimitExceededException;
+import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.Time;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -63,21 +72,28 @@ import com.google.common.collect.Lists;
  * */
 public class TestAsyncDFS {
   public static final Log LOG = LogFactory.getLog(TestAsyncDFS.class);
-  private static final int NUM_TESTS = 1000;
+  private final short replFactor = 1;
+  private final long blockSize = 512;
+  private long fileLen = blockSize * 3;
+  private final long seed = Time.now();
+  private final Random r = new Random(seed);
+  private final PermissionGenerator permGenerator = new PermissionGenerator(r);
+  private static final int NUM_TESTS = 50;
   private static final int NUM_NN_HANDLER = 10;
-  private static final int ASYNC_CALL_LIMIT = 100;
+  private static final int ASYNC_CALL_LIMIT = 1000;
 
   private Configuration conf;
   private MiniDFSCluster cluster;
   private FileSystem fs;
+  private AsyncDistributedFileSystem adfs;
 
   @Before
   public void setup() throws IOException {
     conf = new HdfsConfiguration();
     // explicitly turn on acl
     conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY, true);
-    // explicitly turn on ACL
-    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY, true);
+    // explicitly turn on permission checking
+    conf.setBoolean(DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, true);
     // set the limit of max async calls
     conf.setInt(CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_KEY,
         ASYNC_CALL_LIMIT);
@@ -86,6 +102,7 @@ public class TestAsyncDFS {
     cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
     cluster.waitActive();
     fs = FileSystem.get(conf);
+    adfs = cluster.getFileSystem().getAsyncDistributedFileSystem();
   }
 
   @After
@@ -130,13 +147,9 @@ public class TestAsyncDFS {
     final String basePath = "testBatchAsyncAcl";
     final Path parent = new Path(String.format("/test/%s/", basePath));
 
-    AsyncDistributedFileSystem adfs = cluster.getFileSystem()
-        .getAsyncDistributedFileSystem();
-
     // prepare test
-    int count = NUM_TESTS;
-    final Path[] paths = new Path[count];
-    for (int i = 0; i < count; i++) {
+    final Path[] paths = new Path[NUM_TESTS];
+    for (int i = 0; i < NUM_TESTS; i++) {
       paths[i] = new Path(parent, "acl" + i);
       FileSystem.mkdirs(fs, paths[i],
           FsPermission.createImmutable((short) 0750));
@@ -153,7 +166,7 @@ public class TestAsyncDFS {
     int start = 0, end = 0;
     try {
       // test setAcl
-      for (int i = 0; i < count; i++) {
+      for (int i = 0; i < NUM_TESTS; i++) {
         for (;;) {
           try {
             Future<Void> retFuture = adfs.setAcl(paths[i], aclSpec);
@@ -166,12 +179,12 @@ public class TestAsyncDFS {
           }
         }
       }
-      waitForAclReturnValues(setAclRetFutures, end, count);
+      waitForAclReturnValues(setAclRetFutures, end, NUM_TESTS);
 
       // test getAclStatus
       start = 0;
       end = 0;
-      for (int i = 0; i < count; i++) {
+      for (int i = 0; i < NUM_TESTS; i++) {
         for (;;) {
           try {
             Future<AclStatus> retFuture = adfs.getAclStatus(paths[i]);
@@ -185,13 +198,23 @@ public class TestAsyncDFS {
           }
         }
       }
-      waitForAclReturnValues(getAclRetFutures, end, count, paths,
+      waitForAclReturnValues(getAclRetFutures, end, NUM_TESTS, paths,
           expectedAclSpec);
     } catch (Exception e) {
       throw e;
     }
   }
 
+  static void waitForReturnValues(final Map<Integer, Future<Void>> retFutures,
+      final int start, final int end)
+      throws InterruptedException, ExecutionException {
+    LOG.info(String.format("calling waitForReturnValues [%d, %d)", start, end));
+    for (int i = start; i < end; i++) {
+      LOG.info("calling Future#get #" + i);
+      retFutures.get(i).get();
+    }
+  }
+
   private void waitForAclReturnValues(
       final Map<Integer, Future<Void>> aclRetFutures, final int start,
       final int end) throws InterruptedException, ExecutionException {
@@ -266,9 +289,12 @@ public class TestAsyncDFS {
 
     final Path parent = new Path("/test/async_api_exception/");
     final Path aclDir = new Path(parent, "aclDir");
-    fs.mkdirs(aclDir, FsPermission.createImmutable((short) 0770));
+    final Path src = new Path(parent, "src");
+    final Path dst = new Path(parent, "dst");
+    fs.mkdirs(aclDir, FsPermission.createImmutable((short) 0700));
+    fs.mkdirs(src);
 
-    AsyncDistributedFileSystem adfs = ugi1
+    AsyncDistributedFileSystem adfs1 = ugi1
         .doAs(new PrivilegedExceptionAction<AsyncDistributedFileSystem>() {
           @Override
           public AsyncDistributedFileSystem run() throws Exception {
@@ -277,9 +303,36 @@ public class TestAsyncDFS {
         });
 
     Future<Void> retFuture;
+    // test rename
+    try {
+      retFuture = adfs1.rename(src, dst, Rename.OVERWRITE);
+      retFuture.get();
+    } catch (ExecutionException e) {
+      checkPermissionDenied(e, src, user1);
+      assertTrue("Permission denied messages must carry the path parent", e
+          .getMessage().contains(src.getParent().toUri().getPath()));
+    }
+
+    // test setPermission
+    FsPermission fsPerm = new FsPermission(permGenerator.next());
+    try {
+      retFuture = adfs1.setPermission(src, fsPerm);
+      retFuture.get();
+    } catch (ExecutionException e) {
+      checkPermissionDenied(e, src, user1);
+    }
+
+    // test setOwner
+    try {
+      retFuture = adfs1.setOwner(src, "user1", "group2");
+      retFuture.get();
+    } catch (ExecutionException e) {
+      checkPermissionDenied(e, src, user1);
+    }
+
     // test setAcl
     try {
-      retFuture = adfs.setAcl(aclDir,
+      retFuture = adfs1.setAcl(aclDir,
           Lists.newArrayList(aclEntry(ACCESS, USER, ALL)));
       retFuture.get();
       fail("setAcl should fail with permission denied");
@@ -289,7 +342,7 @@ public class TestAsyncDFS {
 
     // test getAclStatus
     try {
-      Future<AclStatus> aclRetFuture = adfs.getAclStatus(aclDir);
+      Future<AclStatus> aclRetFuture = adfs1.getAclStatus(aclDir);
       aclRetFuture.get();
       fail("getAclStatus should fail with permission denied");
     } catch (ExecutionException e) {
@@ -307,4 +360,148 @@ public class TestAsyncDFS {
     assertTrue("Permission denied messages must carry the name of the path",
         e.getMessage().contains(dir.getName()));
   }
+
+
+  @Test(timeout = 120000)
+  public void testConcurrentAsyncAPI() throws Exception {
+    String group1 = "group1";
+    String group2 = "group2";
+    String user1 = "user1";
+
+    // create fake mapping for the groups
+    Map<String, String[]> u2gMap = new HashMap<String, String[]>(1);
+    u2gMap.put(user1, new String[] {group1, group2});
+    DFSTestUtil.updateConfWithFakeGroupMapping(conf, u2gMap);
+
+    // prepare for test
+    final Path parent = new Path(
+        String.format("/test/%s/", "testConcurrentAsyncAPI"));
+    final Path[] srcs = new Path[NUM_TESTS];
+    final Path[] dsts = new Path[NUM_TESTS];
+    short[] permissions = new short[NUM_TESTS];
+    for (int i = 0; i < NUM_TESTS; i++) {
+      srcs[i] = new Path(parent, "src" + i);
+      dsts[i] = new Path(parent, "dst" + i);
+      DFSTestUtil.createFile(fs, srcs[i], fileLen, replFactor, 1);
+      DFSTestUtil.createFile(fs, dsts[i], fileLen, replFactor, 1);
+      assertTrue(fs.exists(srcs[i]));
+      assertTrue(fs.getFileStatus(srcs[i]).isFile());
+      assertTrue(fs.exists(dsts[i]));
+      assertTrue(fs.getFileStatus(dsts[i]).isFile());
+      permissions[i] = permGenerator.next();
+    }
+
+    Map<Integer, Future<Void>> renameRetFutures =
+        new HashMap<Integer, Future<Void>>();
+    Map<Integer, Future<Void>> permRetFutures =
+        new HashMap<Integer, Future<Void>>();
+    Map<Integer, Future<Void>> ownerRetFutures =
+        new HashMap<Integer, Future<Void>>();
+    int start = 0, end = 0;
+    // test rename
+    for (int i = 0; i < NUM_TESTS; i++) {
+      for (;;) {
+        try {
+          Future<Void> returnFuture = adfs.rename(srcs[i], dsts[i],
+              Rename.OVERWRITE);
+          renameRetFutures.put(i, returnFuture);
+          break;
+        } catch (AsyncCallLimitExceededException e) {
+          start = end;
+          end = i;
+          waitForReturnValues(renameRetFutures, start, end);
+        }
+      }
+    }
+
+    // wait for completing the calls
+    waitForAclReturnValues(renameRetFutures, end, NUM_TESTS);
+
+    // verify the src should not exist, dst should
+    for (int i = 0; i < NUM_TESTS; i++) {
+      assertFalse(fs.exists(srcs[i]));
+      assertTrue(fs.exists(dsts[i]));
+    }
+
+    // test permissions
+    for (int i = 0; i < NUM_TESTS; i++) {
+      for (;;) {
+        try {
+          Future<Void> retFuture = adfs.setPermission(dsts[i],
+              new FsPermission(permissions[i]));
+          permRetFutures.put(i, retFuture);
+          break;
+        } catch (AsyncCallLimitExceededException e) {
+          start = end;
+          end = i;
+          waitForReturnValues(permRetFutures, start, end);
+        }
+      }
+    }
+    // wait for completing the calls
+    waitForAclReturnValues(permRetFutures, end, NUM_TESTS);
+
+    // verify the permission
+    for (int i = 0; i < NUM_TESTS; i++) {
+      assertTrue(fs.exists(dsts[i]));
+      FsPermission fsPerm = new FsPermission(permissions[i]);
+      checkAccessPermissions(fs.getFileStatus(dsts[i]), fsPerm.getUserAction());
+    }
+
+    // test setOwner
+    start = 0;
+    end = 0;
+    for (int i = 0; i < NUM_TESTS; i++) {
+      for (;;) {
+        try {
+          Future<Void> retFuture = adfs.setOwner(dsts[i], "user1", "group2");
+          ownerRetFutures.put(i, retFuture);
+          break;
+        } catch (AsyncCallLimitExceededException e) {
+          start = end;
+          end = i;
+          waitForReturnValues(ownerRetFutures, start, end);
+        }
+      }
+    }
+    // wait for completing the calls
+    waitForAclReturnValues(ownerRetFutures, end, NUM_TESTS);
+
+    // verify the owner
+    for (int i = 0; i < NUM_TESTS; i++) {
+      assertTrue(fs.exists(dsts[i]));
+      assertTrue("user1".equals(fs.getFileStatus(dsts[i]).getOwner()));
+      assertTrue("group2".equals(fs.getFileStatus(dsts[i]).getGroup()));
+    }
+  }
+
+  static void checkAccessPermissions(FileStatus stat, FsAction mode)
+      throws IOException {
+    checkAccessPermissions(UserGroupInformation.getCurrentUser(), stat, mode);
+  }
+
+  static void checkAccessPermissions(final UserGroupInformation ugi,
+      FileStatus stat, FsAction mode) throws IOException {
+    FsPermission perm = stat.getPermission();
+    String user = ugi.getShortUserName();
+    List<String> groups = Arrays.asList(ugi.getGroupNames());
+
+    if (user.equals(stat.getOwner())) {
+      if (perm.getUserAction().implies(mode)) {
+        return;
+      }
+    } else if (groups.contains(stat.getGroup())) {
+      if (perm.getGroupAction().implies(mode)) {
+        return;
+      }
+    } else {
+      if (perm.getOtherAction().implies(mode)) {
+        return;
+      }
+    }
+    throw new AccessControlException(String.format(
+        "Permission denied: user=%s, path=\"%s\":%s:%s:%s%s", user, stat
+            .getPath(), stat.getOwner(), stat.getGroup(),
+        stat.isDirectory() ? "d" : "-", perm));
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7591d5fd/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java
index 03c8151..8d3e509 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java
@@ -19,14 +19,11 @@ package org.apache.hadoop.hdfs;
 
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertEquals;
 
 import java.io.IOException;
-import java.security.PrivilegedExceptionAction;
-import java.util.Arrays;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
-import java.util.Random;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 
@@ -34,521 +31,157 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.Options.Rename;
-import org.apache.hadoop.fs.permission.FsAction;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.hdfs.TestDFSPermission.PermissionGenerator;
-import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
-import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
-import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
 import org.apache.hadoop.ipc.AsyncCallLimitExceededException;
-import org.apache.hadoop.security.AccessControlException;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.util.Time;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
 
 public class TestAsyncDFSRename {
   public static final Log LOG = LogFactory.getLog(TestAsyncDFSRename.class);
-  private final long seed = Time.now();
-  private final Random r = new Random(seed);
-  private final PermissionGenerator permGenerator = new PermissionGenerator(r);
-  private final short replFactor = 2;
+  private final short replFactor = 1;
   private final long blockSize = 512;
   private long fileLen = blockSize * 3;
-
-  /**
-   * Check the blocks of dst file are cleaned after rename with overwrite
-   * Restart NN to check the rename successfully
-   */
-  @Test(timeout = 60000)
-  public void testAsyncRenameWithOverwrite() throws Exception {
-    Configuration conf = new Configuration();
-    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(
-        replFactor).build();
+  private static final int NUM_TESTS = 50;
+  private static final int NUM_NN_HANDLER = 10;
+  private static final int ASYNC_CALL_LIMIT = 1000;
+
+  private Configuration conf;
+  private MiniDFSCluster cluster;
+  private FileSystem fs;
+  private AsyncDistributedFileSystem adfs;
+
+  @Before
+  public void setup() throws IOException {
+    conf = new HdfsConfiguration();
+    // set the limit of max async calls
+    conf.setInt(CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_KEY,
+        ASYNC_CALL_LIMIT);
+    // set server handlers
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_HANDLER_COUNT_KEY, NUM_NN_HANDLER);
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
     cluster.waitActive();
-    DistributedFileSystem dfs = cluster.getFileSystem();
-    AsyncDistributedFileSystem adfs = dfs.getAsyncDistributedFileSystem();
-
-    try {
-      String src = "/foo/src";
-      String dst = "/foo/dst";
-      String src2 = "/foo/src2";
-      String dst2 = "/foo/dst2";
-      Path srcPath = new Path(src);
-      Path dstPath = new Path(dst);
-      Path srcPath2 = new Path(src2);
-      Path dstPath2 = new Path(dst2);
-
-      DFSTestUtil.createFile(dfs, srcPath, fileLen, replFactor, 1);
-      DFSTestUtil.createFile(dfs, dstPath, fileLen, replFactor, 1);
-      DFSTestUtil.createFile(dfs, srcPath2, fileLen, replFactor, 1);
-      DFSTestUtil.createFile(dfs, dstPath2, fileLen, replFactor, 1);
-
-      LocatedBlocks lbs = NameNodeAdapter.getBlockLocations(
-          cluster.getNameNode(), dst, 0, fileLen);
-      LocatedBlocks lbs2 = NameNodeAdapter.getBlockLocations(
-          cluster.getNameNode(), dst2, 0, fileLen);
-      BlockManager bm = NameNodeAdapter.getNamesystem(cluster.getNameNode())
-          .getBlockManager();
-      assertTrue(bm.getStoredBlock(lbs.getLocatedBlocks().get(0).getBlock()
-          .getLocalBlock()) != null);
-      assertTrue(bm.getStoredBlock(lbs2.getLocatedBlocks().get(0).getBlock()
-          .getLocalBlock()) != null);
-
-      Future<Void> retVal1 = adfs.rename(srcPath, dstPath, Rename.OVERWRITE);
-      Future<Void> retVal2 = adfs.rename(srcPath2, dstPath2, Rename.OVERWRITE);
-      retVal1.get();
-      retVal2.get();
-
-      assertTrue(bm.getStoredBlock(lbs.getLocatedBlocks().get(0).getBlock()
-          .getLocalBlock()) == null);
-      assertTrue(bm.getStoredBlock(lbs2.getLocatedBlocks().get(0).getBlock()
-          .getLocalBlock()) == null);
+    fs = FileSystem.get(conf);
+    adfs = cluster.getFileSystem().getAsyncDistributedFileSystem();
+  }
 
-      // Restart NN and check the rename successfully
-      cluster.restartNameNodes();
-      assertFalse(dfs.exists(srcPath));
-      assertTrue(dfs.exists(dstPath));
-      assertFalse(dfs.exists(srcPath2));
-      assertTrue(dfs.exists(dstPath2));
-    } finally {
-      if (dfs != null) {
-        dfs.close();
-      }
-      if (cluster != null) {
-        cluster.shutdown();
-      }
+  @After
+  public void tearDown() throws IOException {
+    if (fs != null) {
+      fs.close();
+      fs = null;
+    }
+    if (cluster != null) {
+      cluster.shutdown();
+      cluster = null;
     }
   }
 
   @Test(timeout = 60000)
   public void testCallGetReturnValueMultipleTimes() throws Exception {
-    final Path renameDir = new Path(
-        "/test/testCallGetReturnValueMultipleTimes/");
-    final Configuration conf = new HdfsConfiguration();
-    conf.setInt(CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_KEY, 200);
-    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
-        .numDataNodes(2).build();
-    cluster.waitActive();
-    final DistributedFileSystem dfs = cluster.getFileSystem();
-    final AsyncDistributedFileSystem adfs = dfs.getAsyncDistributedFileSystem();
-    final int count = 100;
-    final Map<Integer, Future<Void>> returnFutures = new HashMap<Integer, Future<Void>>();
+    final Path parent = new Path("/test/testCallGetReturnValueMultipleTimes/");
+    assertTrue(fs.mkdirs(parent));
 
-    assertTrue(dfs.mkdirs(renameDir));
-
-    try {
-      // concurrently invoking many rename
-      for (int i = 0; i < count; i++) {
-        Path src = new Path(renameDir, "src" + i);
-        Path dst = new Path(renameDir, "dst" + i);
-        DFSTestUtil.createFile(dfs, src, fileLen, replFactor, 1);
-        DFSTestUtil.createFile(dfs, dst, fileLen, replFactor, 1);
-        Future<Void> returnFuture = adfs.rename(src, dst, Rename.OVERWRITE);
-        returnFutures.put(i, returnFuture);
-      }
-
-      for (int i = 0; i < 5; i++) {
-        verifyCallGetReturnValueMultipleTimes(returnFutures, count, cluster,
-            renameDir, dfs);
-      }
-    } finally {
-      if (dfs != null) {
-        dfs.close();
-      }
-      if (cluster != null) {
-        cluster.shutdown();
-      }
+    // prepare test
+    final Path[] srcs = new Path[NUM_TESTS];
+    final Path[] dsts = new Path[NUM_TESTS];
+    for (int i = 0; i < NUM_TESTS; i++) {
+      srcs[i] = new Path(parent, "src" + i);
+      dsts[i] = new Path(parent, "dst" + i);
+      DFSTestUtil.createFile(fs, srcs[i], fileLen, replFactor, 1);
+      DFSTestUtil.createFile(fs, dsts[i], fileLen, replFactor, 1);
     }
-  }
 
-  private void verifyCallGetReturnValueMultipleTimes(
-      Map<Integer, Future<Void>> returnFutures, int count,
-      MiniDFSCluster cluster, Path renameDir, DistributedFileSystem dfs)
-      throws InterruptedException, ExecutionException, IOException {
-    // wait for completing the calls
-    for (int i = 0; i < count; i++) {
-      returnFutures.get(i).get();
+    // concurrently invoking many rename
+    final Map<Integer, Future<Void>> reFutures =
+        new HashMap<Integer, Future<Void>>();
+    for (int i = 0; i < NUM_TESTS; i++) {
+      Future<Void> retFuture = adfs.rename(srcs[i], dsts[i],
+          Rename.OVERWRITE);
+      reFutures.put(i, retFuture);
     }
 
-    // Restart NN and check the rename successfully
-    cluster.restartNameNodes();
+    assertEquals(NUM_TESTS, reFutures.size());
 
-    // verify the src dir should not exist, dst should
-    for (int i = 0; i < count; i++) {
-      Path src = new Path(renameDir, "src" + i);
-      Path dst = new Path(renameDir, "dst" + i);
-      assertFalse(dfs.exists(src));
-      assertTrue(dfs.exists(dst));
+    for (int i = 0; i < 5; i++) {
+      verifyCallGetReturnValueMultipleTimes(reFutures, srcs, dsts);
     }
   }
 
-  @Test
-  public void testConservativeConcurrentAsyncRenameWithOverwrite()
-      throws Exception {
-    internalTestConcurrentAsyncRenameWithOverwrite(100,
-        "testAggressiveConcurrentAsyncRenameWithOverwrite");
-  }
-
-  @Test(timeout = 60000)
-  public void testAggressiveConcurrentAsyncRenameWithOverwrite()
-      throws Exception {
-    internalTestConcurrentAsyncRenameWithOverwrite(10000,
-        "testConservativeConcurrentAsyncRenameWithOverwrite");
-  }
-
-  private void internalTestConcurrentAsyncRenameWithOverwrite(
-      final int asyncCallLimit, final String basePath) throws Exception {
-    final Path renameDir = new Path(String.format("/test/%s/", basePath));
-    Configuration conf = new HdfsConfiguration();
-    conf.setInt(CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_KEY,
-        asyncCallLimit);
-    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2)
-        .build();
-    cluster.waitActive();
-    DistributedFileSystem dfs = cluster.getFileSystem();
-    AsyncDistributedFileSystem adfs = dfs.getAsyncDistributedFileSystem();
-    int count = 1000;
-    int start = 0, end = 0;
-    Map<Integer, Future<Void>> returnFutures = new HashMap<Integer, Future<Void>>();
-
-    assertTrue(dfs.mkdirs(renameDir));
-
-    try {
-      // concurrently invoking many rename
-      for (int i = 0; i < count; i++) {
-        Path src = new Path(renameDir, "src" + i);
-        Path dst = new Path(renameDir, "dst" + i);
-        DFSTestUtil.createFile(dfs, src, fileLen, replFactor, 1);
-        DFSTestUtil.createFile(dfs, dst, fileLen, replFactor, 1);
-        for (;;) {
-          try {
-            LOG.info("rename #" + i);
-            Future<Void> returnFuture = adfs.rename(src, dst, Rename.OVERWRITE);
-            returnFutures.put(i, returnFuture);
-            break;
-          } catch (AsyncCallLimitExceededException e) {
-            /**
-             * reached limit of async calls, fetch results of finished async
-             * calls to let follow-on calls go
-             */
-            LOG.error(e);
-            start = end;
-            end = i;
-            LOG.info(String.format("start=%d, end=%d, i=%d", start, end, i));
-            waitForReturnValues(returnFutures, start, end);
-          }
-        }
-      }
-
-      // wait for completing the calls
-      for (int i = start; i < count; i++) {
-        returnFutures.get(i).get();
-      }
-
-      // Restart NN and check the rename successfully
-      cluster.restartNameNodes();
-
-      // very the src dir should not exist, dst should
-      for (int i = 0; i < count; i++) {
-        Path src = new Path(renameDir, "src" + i);
-        Path dst = new Path(renameDir, "dst" + i);
-        assertFalse(dfs.exists(src));
-        assertTrue(dfs.exists(dst));
-      }
-    } finally {
-      if (dfs != null) {
-        dfs.close();
-      }
-      if (cluster != null) {
-        cluster.shutdown();
-      }
-    }
-  }
+  private void verifyCallGetReturnValueMultipleTimes(
+      final Map<Integer, Future<Void>> reFutures, final Path[] srcs,
+      final Path[] dsts)
+      throws InterruptedException, ExecutionException, IOException {
 
-  private void waitForReturnValues(
-      final Map<Integer, Future<Void>> returnFutures, final int start,
-      final int end) throws InterruptedException, ExecutionException {
-    LOG.info(String.format("calling waitForReturnValues [%d, %d)", start, end));
-    for (int i = start; i < end; i++) {
-      LOG.info("calling Future#get #" + i);
-      returnFutures.get(i).get();
-    }
-  }
+    // wait for completing the calls
+    waitForReturnValues(reFutures, 0, NUM_TESTS);
 
-  @Test
-  public void testConservativeConcurrentAsyncAPI() throws Exception {
-    internalTestConcurrentAsyncAPI(100, "testConservativeConcurrentAsyncAPI");
+    // verify the src dir should not exist, dst should
+    verifyRenames(srcs, dsts);
   }
 
   @Test(timeout = 60000)
-  public void testAggressiveConcurrentAsyncAPI() throws Exception {
-    internalTestConcurrentAsyncAPI(10000, "testAggressiveConcurrentAsyncAPI");
-  }
-
-  private void internalTestConcurrentAsyncAPI(final int asyncCallLimit,
-      final String basePath) throws Exception {
-    Configuration conf = new HdfsConfiguration();
-    String group1 = "group1";
-    String group2 = "group2";
-    String user1 = "user1";
-    int count = 500;
-
-    // explicitly turn on permission checking
-    conf.setBoolean(DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, true);
-    // set the limit of max async calls
-    conf.setInt(CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_KEY,
-        asyncCallLimit);
-
-    // create fake mapping for the groups
-    Map<String, String[]> u2gMap = new HashMap<String, String[]>(1);
-    u2gMap.put(user1, new String[] {group1, group2});
-    DFSTestUtil.updateConfWithFakeGroupMapping(conf, u2gMap);
-
-    // start mini cluster
-    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
-        .numDataNodes(3).build();
-    cluster.waitActive();
-    AsyncDistributedFileSystem adfs = cluster.getFileSystem()
-        .getAsyncDistributedFileSystem();
-
-    // prepare for test
-    FileSystem rootFs = FileSystem.get(conf);
-    final Path parent = new Path(String.format("/test/%s/", basePath));
-    final Path[] srcs = new Path[count];
-    final Path[] dsts = new Path[count];
-    short[] permissions = new short[count];
-    for (int i = 0; i < count; i++) {
+  public void testConcurrentAsyncRename() throws Exception {
+    final Path parent = new Path(
+        String.format("/test/%s/", "testConcurrentAsyncRename"));
+    assertTrue(fs.mkdirs(parent));
+
+    // prepare test
+    final Path[] srcs = new Path[NUM_TESTS];
+    final Path[] dsts = new Path[NUM_TESTS];
+    for (int i = 0; i < NUM_TESTS; i++) {
       srcs[i] = new Path(parent, "src" + i);
       dsts[i] = new Path(parent, "dst" + i);
-      DFSTestUtil.createFile(rootFs, srcs[i], fileLen, replFactor, 1);
-      DFSTestUtil.createFile(rootFs, dsts[i], fileLen, replFactor, 1);
-      assertTrue(rootFs.exists(srcs[i]));
-      assertTrue(rootFs.getFileStatus(srcs[i]).isFile());
-      assertTrue(rootFs.exists(dsts[i]));
-      assertTrue(rootFs.getFileStatus(dsts[i]).isFile());
-      permissions[i] = permGenerator.next();
+      DFSTestUtil.createFile(fs, srcs[i], fileLen, replFactor, 1);
+      DFSTestUtil.createFile(fs, dsts[i], fileLen, replFactor, 1);
     }
 
-    Map<Integer, Future<Void>> renameRetFutures =
-        new HashMap<Integer, Future<Void>>();
-    Map<Integer, Future<Void>> permRetFutures =
-        new HashMap<Integer, Future<Void>>();
-    Map<Integer, Future<Void>> ownerRetFutures =
-        new HashMap<Integer, Future<Void>>();
+    // concurrently invoking many rename
     int start = 0, end = 0;
-    // test rename
-    for (int i = 0; i < count; i++) {
+    Map<Integer, Future<Void>> retFutures =
+        new HashMap<Integer, Future<Void>>();
+    for (int i = 0; i < NUM_TESTS; i++) {
       for (;;) {
         try {
-          Future<Void> returnFuture = adfs.rename(srcs[i], dsts[i],
+          LOG.info("rename #" + i);
+          Future<Void> retFuture = adfs.rename(srcs[i], dsts[i],
               Rename.OVERWRITE);
-          renameRetFutures.put(i, returnFuture);
+          retFutures.put(i, retFuture);
           break;
         } catch (AsyncCallLimitExceededException e) {
+          /**
+           * reached limit of async calls, fetch results of finished async calls
+           * to let follow-on calls go
+           */
+          LOG.error(e);
           start = end;
           end = i;
-          waitForReturnValues(renameRetFutures, start, end);
+          LOG.info(String.format("start=%d, end=%d, i=%d", start, end, i));
+          waitForReturnValues(retFutures, start, end);
         }
       }
     }
 
     // wait for completing the calls
-    for (int i = start; i < count; i++) {
-      renameRetFutures.get(i).get();
-    }
-
-    // Restart NN and check the rename successfully
-    cluster.restartNameNodes();
-
-    // verify the src should not exist, dst should
-    for (int i = 0; i < count; i++) {
-      assertFalse(rootFs.exists(srcs[i]));
-      assertTrue(rootFs.exists(dsts[i]));
-    }
-
-    // test permissions
-    try {
-      for (int i = 0; i < count; i++) {
-        for (;;) {
-          try {
-            Future<Void> retFuture = adfs.setPermission(dsts[i],
-                new FsPermission(permissions[i]));
-            permRetFutures.put(i, retFuture);
-            break;
-          } catch (AsyncCallLimitExceededException e) {
-            start = end;
-            end = i;
-            waitForReturnValues(permRetFutures, start, end);
-          }
-        }
-      }
-      // wait for completing the calls
-      for (int i = start; i < count; i++) {
-        permRetFutures.get(i).get();
-      }
-
-      // Restart NN and check permission then
-      cluster.restartNameNodes();
-
-      // verify the permission
-      for (int i = 0; i < count; i++) {
-        assertTrue(rootFs.exists(dsts[i]));
-        FsPermission fsPerm = new FsPermission(permissions[i]);
-        checkAccessPermissions(rootFs.getFileStatus(dsts[i]),
-            fsPerm.getUserAction());
-      }
-
-      // test setOwner
-      start = 0;
-      end = 0;
-      for (int i = 0; i < count; i++) {
-        for (;;) {
-          try {
-            Future<Void> retFuture = adfs.setOwner(dsts[i], "user1",
-                "group2");
-            ownerRetFutures.put(i, retFuture);
-            break;
-          } catch (AsyncCallLimitExceededException e) {
-            start = end;
-            end = i;
-            waitForReturnValues(ownerRetFutures, start, end);
-          }
-        }
-      }
-      // wait for completing the calls
-      for (int i = start; i < count; i++) {
-        ownerRetFutures.get(i).get();
-      }
+    waitForReturnValues(retFutures, end, NUM_TESTS);
 
-      // Restart NN and check owner then
-      cluster.restartNameNodes();
-
-      // verify the owner
-      for (int i = 0; i < count; i++) {
-        assertTrue(rootFs.exists(dsts[i]));
-        assertTrue(
-            "user1".equals(rootFs.getFileStatus(dsts[i]).getOwner()));
-        assertTrue(
-            "group2".equals(rootFs.getFileStatus(dsts[i]).getGroup()));
-      }
-    } catch (AccessControlException ace) {
-      throw ace;
-    } finally {
-      if (rootFs != null) {
-        rootFs.close();
-      }
-      if (cluster != null) {
-        cluster.shutdown();
-      }
-    }
+    // verify the src dir should not exist, dst should
+    verifyRenames(srcs, dsts);
   }
 
-  static void checkAccessPermissions(FileStatus stat, FsAction mode)
+  private void verifyRenames(final Path[] srcs, final Path[] dsts)
       throws IOException {
-    checkAccessPermissions(UserGroupInformation.getCurrentUser(), stat, mode);
-  }
-
-  static void checkAccessPermissions(final UserGroupInformation ugi,
-      FileStatus stat, FsAction mode) throws IOException {
-    FsPermission perm = stat.getPermission();
-    String user = ugi.getShortUserName();
-    List<String> groups = Arrays.asList(ugi.getGroupNames());
-
-    if (user.equals(stat.getOwner())) {
-      if (perm.getUserAction().implies(mode)) {
-        return;
-      }
-    } else if (groups.contains(stat.getGroup())) {
-      if (perm.getGroupAction().implies(mode)) {
-        return;
-      }
-    } else {
-      if (perm.getOtherAction().implies(mode)) {
-        return;
-      }
+    for (int i = 0; i < NUM_TESTS; i++) {
+      assertFalse(fs.exists(srcs[i]));
+      assertTrue(fs.exists(dsts[i]));
     }
-    throw new AccessControlException(String.format(
-        "Permission denied: user=%s, path=\"%s\":%s:%s:%s%s", user, stat
-            .getPath(), stat.getOwner(), stat.getGroup(),
-        stat.isDirectory() ? "d" : "-", perm));
   }
 
-  @Test(timeout = 60000)
-  public void testAsyncAPIWithException() throws Exception {
-    Configuration conf = new HdfsConfiguration();
-    String group1 = "group1";
-    String group2 = "group2";
-    String user1 = "user1";
-    UserGroupInformation ugi1;
-
-    // explicitly turn on permission checking
-    conf.setBoolean(DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, true);
-
-    // create fake mapping for the groups
-    Map<String, String[]> u2gMap = new HashMap<String, String[]>(1);
-    u2gMap.put(user1, new String[] {group1, group2});
-    DFSTestUtil.updateConfWithFakeGroupMapping(conf, u2gMap);
-
-    // Initiate all four users
-    ugi1 = UserGroupInformation.createUserForTesting(user1, new String[] {
-        group1, group2 });
-
-    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
-        .numDataNodes(3).build();
-    cluster.waitActive();
-
-    FileSystem rootFs = FileSystem.get(conf);
-    final Path renameDir = new Path("/test/async_api_exception/");
-    final Path src = new Path(renameDir, "src");
-    final Path dst = new Path(renameDir, "dst");
-    rootFs.mkdirs(src);
-
-    AsyncDistributedFileSystem adfs = ugi1
-        .doAs(new PrivilegedExceptionAction<AsyncDistributedFileSystem>() {
-          @Override
-          public AsyncDistributedFileSystem run() throws Exception {
-            return cluster.getFileSystem().getAsyncDistributedFileSystem();
-          }
-        });
-
-    Future<Void> retFuture;
-    try {
-      retFuture = adfs.rename(src, dst, Rename.OVERWRITE);
-      retFuture.get();
-    } catch (ExecutionException e) {
-      TestAsyncDFS.checkPermissionDenied(e, src, user1);
-      assertTrue("Permission denied messages must carry the path parent", e
-          .getMessage().contains(src.getParent().toUri().getPath()));
-    }
-
-    FsPermission fsPerm = new FsPermission(permGenerator.next());
-    try {
-      retFuture = adfs.setPermission(src, fsPerm);
-      retFuture.get();
-    } catch (ExecutionException e) {
-      TestAsyncDFS.checkPermissionDenied(e, src, user1);
-      assertTrue("Permission denied messages must carry the name of the path",
-          e.getMessage().contains(src.getName()));
-    }
-
-    try {
-      retFuture = adfs.setOwner(src, "user1", "group2");
-      retFuture.get();
-    } catch (ExecutionException e) {
-      TestAsyncDFS.checkPermissionDenied(e, src, user1);
-      assertTrue("Permission denied messages must carry the name of the path",
-          e.getMessage().contains(src.getName()));
-    } finally {
-      if (rootFs != null) {
-        rootFs.close();
-      }
-      if (cluster != null) {
-        cluster.shutdown();
-      }
-    }
+  void waitForReturnValues(final Map<Integer, Future<Void>> retFutures,
+      final int start, final int end)
+      throws InterruptedException, ExecutionException {
+    TestAsyncDFS.waitForReturnValues(retFutures, start, end);
   }
 }
\ No newline at end of file




[2/8] hadoop git commit: HADOOP-12957. Limit the number of outstanding async calls. Contributed by Xiaobing Zhou

Posted by wa...@apache.org.
HADOOP-12957. Limit the number of outstanding async calls.  Contributed by Xiaobing Zhou

(cherry picked from commit 1b9f18623ab55507bea94888317c7d63d0f4a6f2)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/529feac7
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/529feac7
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/529feac7

Branch: refs/heads/HDFS-9924
Commit: 529feac79aea89bfc2d87ef1101e7b8734bcb2da
Parents: 00c7e6d
Author: Tsz-Wo Nicholas Sze <sz...@hortonworks.com>
Authored: Mon May 2 11:15:12 2016 -0700
Committer: Andrew Wang <wa...@apache.org>
Committed: Fri Jun 3 18:14:39 2016 -0700

----------------------------------------------------------------------
 .../hadoop/fs/CommonConfigurationKeys.java      |   3 +
 .../ipc/AsyncCallLimitExceededException.java    |  36 +++
 .../main/java/org/apache/hadoop/ipc/Client.java |  66 ++++-
 .../org/apache/hadoop/ipc/TestAsyncIPC.java     | 199 ++++++++++++++--
 .../hadoop/hdfs/AsyncDistributedFileSystem.java |  12 +-
 .../apache/hadoop/hdfs/TestAsyncDFSRename.java  | 238 +++++++++++++------
 6 files changed, 445 insertions(+), 109 deletions(-)
----------------------------------------------------------------------
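
As context for the change below: the cap on outstanding async calls is
client-side and tunable through the new key. A minimal sketch of raising it
before constructing an IPC client (the value 1000 is hypothetical; the key
and its default of 100 are introduced in the CommonConfigurationKeys hunk
that follows):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.CommonConfigurationKeys;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.ipc.Client;

    Configuration conf = new Configuration();
    // Allow up to 1000 outstanding asynchronous calls (hypothetical value);
    // the shipped default is IPC_CLIENT_ASYNC_CALLS_MAX_DEFAULT = 100.
    conf.setInt(CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_KEY, 1000);
    Client client = new Client(LongWritable.class, conf);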


http://git-wip-us.apache.org/repos/asf/hadoop/blob/529feac7/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
index 86e1b43..06614db 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
@@ -324,6 +324,9 @@ public class CommonConfigurationKeys extends CommonConfigurationKeysPublic {
   public static final long HADOOP_SECURITY_UID_NAME_CACHE_TIMEOUT_DEFAULT =
     4*60*60; // 4 hours
   
+  public static final String  IPC_CLIENT_ASYNC_CALLS_MAX_KEY =
+      "ipc.client.async.calls.max";
+  public static final int     IPC_CLIENT_ASYNC_CALLS_MAX_DEFAULT = 100;
   public static final String  IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY = "ipc.client.fallback-to-simple-auth-allowed";
   public static final boolean IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT = false;
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/529feac7/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AsyncCallLimitExceededException.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AsyncCallLimitExceededException.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AsyncCallLimitExceededException.java
new file mode 100644
index 0000000..db97b6c
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AsyncCallLimitExceededException.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ipc;
+
+import java.io.IOException;
+
+/**
+ * Signals that the limit on outstanding asynchronous calls has been
+ * exceeded. This exception makes application code using async RPC aware
+ * that the configured maximum of concurrent async calls has been reached;
+ * the application needs to retrieve results of already established async
+ * calls, releasing buffer space, before follow-on async calls can proceed.
+ */
+public class AsyncCallLimitExceededException extends IOException {
+  private static final long serialVersionUID = 1L;
+
+  public AsyncCallLimitExceededException(String message) {
+    super(message);
+  }
+}
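
The javadoc above implies a drain-and-retry calling pattern, which the
updated tests in this commit follow. A hedged sketch (assuming an existing
AsyncDistributedFileSystem handle "adfs" and prepared "srcs"/"dsts" arrays;
error handling elided):

    Map<Integer, Future<Void>> futures = new HashMap<Integer, Future<Void>>();
    for (int i = 0; i < srcs.length; i++) {
      for (;;) {
        try {
          futures.put(i, adfs.rename(srcs[i], dsts[i], Rename.OVERWRITE));
          break;
        } catch (AsyncCallLimitExceededException e) {
          // Limit reached: block on the calls issued so far to release
          // slots, then retry the rename that was rejected.
          for (Future<Void> f : futures.values()) {
            f.get();
          }
        }
      }
    }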

http://git-wip-us.apache.org/repos/asf/hadoop/blob/529feac7/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
index d59aeb89..9be4649 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
@@ -159,7 +159,9 @@ public class Client implements AutoCloseable {
 
   private final boolean fallbackAllowed;
   private final byte[] clientId;
-  
+  private final int maxAsyncCalls;
+  private final AtomicInteger asyncCallCounter = new AtomicInteger(0);
+
   /**
    * Executor on which IPC calls' parameters are sent.
    * Deferring the sending of parameters to a separate
@@ -1288,6 +1290,9 @@ public class Client implements AutoCloseable {
         CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT);
     this.clientId = ClientId.getClientId();
     this.sendParamsExecutor = clientExcecutorFactory.refAndGetInstance();
+    this.maxAsyncCalls = conf.getInt(
+        CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_KEY,
+        CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_DEFAULT);
   }
 
   /**
@@ -1354,6 +1359,20 @@ public class Client implements AutoCloseable {
       fallbackToSimpleAuth);
   }
 
+  private void checkAsyncCall() throws IOException {
+    if (isAsynchronousMode()) {
+      if (asyncCallCounter.incrementAndGet() > maxAsyncCalls) {
+        asyncCallCounter.decrementAndGet();
+        String errMsg = String.format(
+            "Exceeded limit of max asynchronous calls: %d, " +
+            "please configure %s to adjust it.",
+            maxAsyncCalls,
+            CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_KEY);
+        throw new AsyncCallLimitExceededException(errMsg);
+      }
+    }
+  }
+
   /**
    * Make a call, passing <code>rpcRequest</code>, to the IPC server defined by
    * <code>remoteId</code>, returning the rpc response.
@@ -1374,24 +1393,38 @@ public class Client implements AutoCloseable {
     final Call call = createCall(rpcKind, rpcRequest);
     final Connection connection = getConnection(remoteId, call, serviceClass,
         fallbackToSimpleAuth);
+
     try {
-      connection.sendRpcRequest(call);                 // send the rpc request
-    } catch (RejectedExecutionException e) {
-      throw new IOException("connection has been closed", e);
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-      LOG.warn("interrupted waiting to send rpc request to server", e);
-      throw new IOException(e);
+      checkAsyncCall();
+      try {
+        connection.sendRpcRequest(call);                 // send the rpc request
+      } catch (RejectedExecutionException e) {
+        throw new IOException("connection has been closed", e);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        LOG.warn("interrupted waiting to send rpc request to server", e);
+        throw new IOException(e);
+      }
+    } catch(Exception e) {
+      if (isAsynchronousMode()) {
+        releaseAsyncCall();
+      }
+      throw e;
     }
 
     if (isAsynchronousMode()) {
       Future<Writable> returnFuture = new AbstractFuture<Writable>() {
+        private final AtomicBoolean called = new AtomicBoolean(false);
         @Override
         public Writable get() throws InterruptedException, ExecutionException {
-          try {
-            set(getRpcResponse(call, connection));
-          } catch (IOException ie) {
-            setException(ie);
+          if (called.compareAndSet(false, true)) {
+            try {
+              set(getRpcResponse(call, connection));
+            } catch (IOException ie) {
+              setException(ie);
+            } finally {
+              releaseAsyncCall();
+            }
           }
           return super.get();
         }
@@ -1427,6 +1460,15 @@ public class Client implements AutoCloseable {
     asynchronousMode.set(async);
   }
 
+  private void releaseAsyncCall() {
+    asyncCallCounter.decrementAndGet();
+  }
+
+  @VisibleForTesting
+  int getAsyncCallCount() {
+    return asyncCallCounter.get();
+  }
+
   private Writable getRpcResponse(final Call call, final Connection connection)
       throws IOException {
     synchronized (call) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/529feac7/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java
index 6cf75c7..8ee3a2c 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.ipc;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
@@ -34,6 +35,7 @@ import java.util.concurrent.Future;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.ipc.RPC.RpcKind;
@@ -54,12 +56,13 @@ public class TestAsyncIPC {
   @Before
   public void setupConf() {
     conf = new Configuration();
+    conf.setInt(CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_KEY, 10000);
     Client.setPingInterval(conf, TestIPC.PING_INTERVAL);
     // set asynchronous mode for main thread
     Client.setAsynchronousMode(true);
   }
 
-  protected static class SerialCaller extends Thread {
+  static class AsyncCaller extends Thread {
     private Client client;
     private InetSocketAddress server;
     private int count;
@@ -68,11 +71,11 @@ public class TestAsyncIPC {
         new HashMap<Integer, Future<LongWritable>>();
     Map<Integer, Long> expectedValues = new HashMap<Integer, Long>();
 
-    public SerialCaller(Client client, InetSocketAddress server, int count) {
+    public AsyncCaller(Client client, InetSocketAddress server, int count) {
       this.client = client;
       this.server = server;
       this.count = count;
-      // set asynchronous mode, since SerialCaller extends Thread
+      // set asynchronous mode, since AsyncCaller extends Thread
       Client.setAsynchronousMode(true);
     }
 
@@ -107,14 +110,111 @@ public class TestAsyncIPC {
     }
   }
 
-  @Test
-  public void testSerial() throws IOException, InterruptedException,
+  static class AsyncLimitlCaller extends Thread {
+    private Client client;
+    private InetSocketAddress server;
+    private int count;
+    private boolean failed;
+    Map<Integer, Future<LongWritable>> returnFutures = new HashMap<Integer, Future<LongWritable>>();
+    Map<Integer, Long> expectedValues = new HashMap<Integer, Long>();
+    int start = 0, end = 0;
+
+    int getStart() {
+      return start;
+    }
+
+    int getEnd() {
+      return end;
+    }
+
+    int getCount() {
+      return count;
+    }
+
+    public AsyncLimitlCaller(Client client, InetSocketAddress server, int count) {
+      this(0, client, server, count);
+    }
+
+    final int callerId;
+
+    public AsyncLimitlCaller(int callerId, Client client, InetSocketAddress server,
+        int count) {
+      this.client = client;
+      this.server = server;
+      this.count = count;
+      // set asynchronous mode, since AsyncLimitlCaller extends Thread
+      Client.setAsynchronousMode(true);
+      this.callerId = callerId;
+    }
+
+    @Override
+    public void run() {
+      // in case Thread#start is called, which will spawn a new thread
+      Client.setAsynchronousMode(true);
+      for (int i = 0; i < count; i++) {
+        try {
+          final long param = TestIPC.RANDOM.nextLong();
+          runCall(i, param);
+        } catch (Exception e) {
+          LOG.fatal(String.format("Caller-%d Call-%d caught: %s", callerId, i,
+              StringUtils.stringifyException(e)));
+          failed = true;
+        }
+      }
+    }
+
+    private void runCall(final int idx, final long param)
+        throws InterruptedException, ExecutionException, IOException {
+      for (;;) {
+        try {
+          doCall(idx, param);
+          return;
+        } catch (AsyncCallLimitExceededException e) {
+          /**
+           * reached limit of async calls, fetch results of finished async calls
+           * to let follow-on calls go
+           */
+          start = end;
+          end = idx;
+          waitForReturnValues(start, end);
+        }
+      }
+    }
+
+    private void doCall(final int idx, final long param) throws IOException {
+      TestIPC.call(client, param, server, conf);
+      Future<LongWritable> returnFuture = Client.getReturnRpcResponse();
+      returnFutures.put(idx, returnFuture);
+      expectedValues.put(idx, param);
+    }
+
+    private void waitForReturnValues(final int start, final int end)
+        throws InterruptedException, ExecutionException {
+      for (int i = start; i < end; i++) {
+        LongWritable value = returnFutures.get(i).get();
+        if (expectedValues.get(i) != value.get()) {
+          LOG.fatal(String.format("Caller-%d Call-%d failed!", callerId, i));
+          failed = true;
+          break;
+        }
+      }
+    }
+  }
+
+  @Test(timeout = 60000)
+  public void testAsyncCall() throws IOException, InterruptedException,
       ExecutionException {
-    internalTestSerial(3, false, 2, 5, 100);
-    internalTestSerial(3, true, 2, 5, 10);
+    internalTestAsyncCall(3, false, 2, 5, 100);
+    internalTestAsyncCall(3, true, 2, 5, 10);
   }
 
-  public void internalTestSerial(int handlerCount, boolean handlerSleep,
+  @Test(timeout = 60000)
+  public void testAsyncCallLimit() throws IOException,
+      InterruptedException, ExecutionException {
+    internalTestAsyncCallLimit(100, false, 5, 10, 500);
+  }
+
+  public void internalTestAsyncCall(int handlerCount, boolean handlerSleep,
       int clientCount, int callerCount, int callCount) throws IOException,
       InterruptedException, ExecutionException {
     Server server = new TestIPC.TestServer(handlerCount, handlerSleep, conf);
@@ -126,9 +226,9 @@ public class TestAsyncIPC {
       clients[i] = new Client(LongWritable.class, conf);
     }
 
-    SerialCaller[] callers = new SerialCaller[callerCount];
+    AsyncCaller[] callers = new AsyncCaller[callerCount];
     for (int i = 0; i < callerCount; i++) {
-      callers[i] = new SerialCaller(clients[i % clientCount], addr, callCount);
+      callers[i] = new AsyncCaller(clients[i % clientCount], addr, callCount);
       callers[i].start();
     }
     for (int i = 0; i < callerCount; i++) {
@@ -144,6 +244,75 @@ public class TestAsyncIPC {
     server.stop();
   }
 
+  @Test(timeout = 60000)
+  public void testCallGetReturnRpcResponseMultipleTimes() throws IOException,
+      InterruptedException, ExecutionException {
+    int handlerCount = 10, callCount = 100;
+    Server server = new TestIPC.TestServer(handlerCount, false, conf);
+    InetSocketAddress addr = NetUtils.getConnectAddress(server);
+    server.start();
+    final Client client = new Client(LongWritable.class, conf);
+
+    int asyncCallCount = client.getAsyncCallCount();
+
+    try {
+      AsyncCaller caller = new AsyncCaller(client, addr, callCount);
+      caller.run();
+
+      caller.waitForReturnValues();
+      String msg = String.format(
+          "First time, expected not failed for caller: %s.", caller);
+      assertFalse(msg, caller.failed);
+
+      caller.waitForReturnValues();
+      assertTrue(asyncCallCount == client.getAsyncCallCount());
+      msg = String.format("Second time, expected not failed for caller: %s.",
+          caller);
+      assertFalse(msg, caller.failed);
+
+      assertTrue(asyncCallCount == client.getAsyncCallCount());
+    } finally {
+      client.stop();
+      server.stop();
+    }
+  }
+
+  public void internalTestAsyncCallLimit(int handlerCount, boolean handlerSleep,
+      int clientCount, int callerCount, int callCount) throws IOException,
+      InterruptedException, ExecutionException {
+    Configuration conf = new Configuration();
+    conf.setInt(CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_KEY, 100);
+    Client.setPingInterval(conf, TestIPC.PING_INTERVAL);
+
+    Server server = new TestIPC.TestServer(handlerCount, handlerSleep, conf);
+    InetSocketAddress addr = NetUtils.getConnectAddress(server);
+    server.start();
+
+    Client[] clients = new Client[clientCount];
+    for (int i = 0; i < clientCount; i++) {
+      clients[i] = new Client(LongWritable.class, conf);
+    }
+
+    AsyncLimitlCaller[] callers = new AsyncLimitlCaller[callerCount];
+    for (int i = 0; i < callerCount; i++) {
+      callers[i] = new AsyncLimitlCaller(i, clients[i % clientCount], addr,
+          callCount);
+      callers[i].start();
+    }
+    for (int i = 0; i < callerCount; i++) {
+      callers[i].join();
+      callers[i].waitForReturnValues(callers[i].getStart(),
+          callers[i].getCount());
+      String msg = String.format("Expected not failed for caller-%d: %s.", i,
+          callers[i]);
+      assertFalse(msg, callers[i].failed);
+    }
+    for (int i = 0; i < clientCount; i++) {
+      clients[i].stop();
+    }
+    server.stop();
+  }
+
   /**
    * Test if (1) the rpc server uses the call id/retry provided by the rpc
    * client, and (2) the rpc client receives the same call id/retry from the rpc
@@ -196,7 +365,7 @@ public class TestAsyncIPC {
     try {
       InetSocketAddress addr = NetUtils.getConnectAddress(server);
       server.start();
-      final SerialCaller caller = new SerialCaller(client, addr, 4);
+      final AsyncCaller caller = new AsyncCaller(client, addr, 4);
       caller.run();
       caller.waitForReturnValues();
       String msg = String.format("Expected not failed for caller: %s.", caller);
@@ -235,7 +404,7 @@ public class TestAsyncIPC {
     try {
       InetSocketAddress addr = NetUtils.getConnectAddress(server);
       server.start();
-      final SerialCaller caller = new SerialCaller(client, addr, 10);
+      final AsyncCaller caller = new AsyncCaller(client, addr, 10);
       caller.run();
       caller.waitForReturnValues();
       String msg = String.format("Expected not failed for caller: %s.", caller);
@@ -272,7 +441,7 @@ public class TestAsyncIPC {
     try {
       InetSocketAddress addr = NetUtils.getConnectAddress(server);
       server.start();
-      final SerialCaller caller = new SerialCaller(client, addr, 10);
+      final AsyncCaller caller = new AsyncCaller(client, addr, 10);
       caller.run();
       caller.waitForReturnValues();
       String msg = String.format("Expected not failed for caller: %s.", caller);
@@ -313,9 +482,9 @@ public class TestAsyncIPC {
     try {
       InetSocketAddress addr = NetUtils.getConnectAddress(server);
       server.start();
-      SerialCaller[] callers = new SerialCaller[callerCount];
+      AsyncCaller[] callers = new AsyncCaller[callerCount];
       for (int i = 0; i < callerCount; ++i) {
-        callers[i] = new SerialCaller(client, addr, perCallerCallCount);
+        callers[i] = new AsyncCaller(client, addr, perCallerCallCount);
         callers[i].start();
       }
       for (int i = 0; i < callerCount; ++i) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/529feac7/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java
index 37899aa..356ae3f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.fs.Options;
@@ -50,11 +51,14 @@ public class AsyncDistributedFileSystem {
     final Callable<T> returnValueCallback = ClientNamenodeProtocolTranslatorPB
         .getReturnValueCallback();
     Future<T> returnFuture = new AbstractFuture<T>() {
+      private final AtomicBoolean called = new AtomicBoolean(false);
       public T get() throws InterruptedException, ExecutionException {
-        try {
-          set(returnValueCallback.call());
-        } catch (Exception e) {
-          setException(e);
+        if (called.compareAndSet(false, true)) {
+          try {
+            set(returnValueCallback.call());
+          } catch (Exception e) {
+            setException(e);
+          }
         }
         return super.get();
       }
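
The AtomicBoolean guard above makes Future#get idempotent: the first get()
fetches and caches the RPC response, and later calls return the already-set
value. The new testCallGetReturnValueMultipleTimes below exercises exactly
this; conceptually:

    Future<Void> retFuture = adfs.rename(src, dst, Rename.OVERWRITE);
    retFuture.get();  // performs the blocking fetch and caches the result
    retFuture.get();  // returns the cached result without re-fetching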

http://git-wip-us.apache.org/repos/asf/hadoop/blob/529feac7/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java
index 9322e1a..d129299 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.hdfs;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
-import java.io.DataOutputStream;
 import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
 import java.util.HashMap;
@@ -31,80 +30,25 @@ import java.util.concurrent.Future;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.Options.Rename;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
+import org.apache.hadoop.ipc.AsyncCallLimitExceededException;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.junit.After;
-import org.junit.Before;
 import org.junit.Test;
 
 public class TestAsyncDFSRename {
-  final Path asyncRenameDir = new Path("/test/async_rename/");
   public static final Log LOG = LogFactory.getLog(TestAsyncDFSRename.class);
-  final private static Configuration CONF = new HdfsConfiguration();
-
-  final private static String GROUP1_NAME = "group1";
-  final private static String GROUP2_NAME = "group2";
-  final private static String USER1_NAME = "user1";
-  private static final UserGroupInformation USER1;
-
-  private MiniDFSCluster gCluster;
-
-  static {
-    // explicitly turn on permission checking
-    CONF.setBoolean(DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, true);
-
-    // create fake mapping for the groups
-    Map<String, String[]> u2g_map = new HashMap<String, String[]>(1);
-    u2g_map.put(USER1_NAME, new String[] { GROUP1_NAME, GROUP2_NAME });
-    DFSTestUtil.updateConfWithFakeGroupMapping(CONF, u2g_map);
-
-    // Initiate all four users
-    USER1 = UserGroupInformation.createUserForTesting(USER1_NAME, new String[] {
-        GROUP1_NAME, GROUP2_NAME });
-  }
-
-  @Before
-  public void setUp() throws IOException {
-    gCluster = new MiniDFSCluster.Builder(CONF).numDataNodes(3).build();
-    gCluster.waitActive();
-  }
-
-  @After
-  public void tearDown() throws IOException {
-    if (gCluster != null) {
-      gCluster.shutdown();
-      gCluster = null;
-    }
-  }
-
-  static int countLease(MiniDFSCluster cluster) {
-    return TestDFSRename.countLease(cluster);
-  }
-
-  void list(DistributedFileSystem dfs, String name) throws IOException {
-    FileSystem.LOG.info("\n\n" + name);
-    for (FileStatus s : dfs.listStatus(asyncRenameDir)) {
-      FileSystem.LOG.info("" + s.getPath());
-    }
-  }
-
-  static void createFile(DistributedFileSystem dfs, Path f) throws IOException {
-    DataOutputStream a_out = dfs.create(f);
-    a_out.writeBytes("something");
-    a_out.close();
-  }
 
   /**
    * Check the blocks of dst file are cleaned after rename with overwrite
    * Restart NN to check the rename successfully
    */
-  @Test
+  @Test(timeout = 60000)
   public void testAsyncRenameWithOverwrite() throws Exception {
     final short replFactor = 2;
     final long blockSize = 512;
@@ -169,38 +113,134 @@ public class TestAsyncDFSRename {
     }
   }
 
-  @Test
-  public void testConcurrentAsyncRenameWithOverwrite() throws Exception {
+  @Test(timeout = 60000)
+  public void testCallGetReturnValueMultipleTimes() throws Exception {
     final short replFactor = 2;
     final long blockSize = 512;
     final Path renameDir = new Path(
-        "/test/concurrent_reanme_with_overwrite_dir/");
+        "/test/testCallGetReturnValueMultipleTimes/");
+    final Configuration conf = new HdfsConfiguration();
+    conf.setInt(CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_KEY, 200);
+    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(2).build();
+    cluster.waitActive();
+    final DistributedFileSystem dfs = cluster.getFileSystem();
+    final AsyncDistributedFileSystem adfs = dfs.getAsyncDistributedFileSystem();
+    final int count = 100;
+    long fileLen = blockSize * 3;
+    final Map<Integer, Future<Void>> returnFutures = new HashMap<Integer, Future<Void>>();
+
+    assertTrue(dfs.mkdirs(renameDir));
+
+    try {
+      // concurrently invoking many rename
+      for (int i = 0; i < count; i++) {
+        Path src = new Path(renameDir, "src" + i);
+        Path dst = new Path(renameDir, "dst" + i);
+        DFSTestUtil.createFile(dfs, src, fileLen, replFactor, 1);
+        DFSTestUtil.createFile(dfs, dst, fileLen, replFactor, 1);
+        Future<Void> returnFuture = adfs.rename(src, dst, Rename.OVERWRITE);
+        returnFutures.put(i, returnFuture);
+      }
+
+      for (int i = 0; i < 5; i++) {
+        verifyCallGetReturnValueMultipleTimes(returnFutures, count, cluster,
+            renameDir, dfs);
+      }
+    } finally {
+      if (dfs != null) {
+        dfs.close();
+      }
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+
+  private void verifyCallGetReturnValueMultipleTimes(
+      Map<Integer, Future<Void>> returnFutures, int count,
+      MiniDFSCluster cluster, Path renameDir, DistributedFileSystem dfs)
+      throws InterruptedException, ExecutionException, IOException {
+    // wait for completing the calls
+    for (int i = 0; i < count; i++) {
+      returnFutures.get(i).get();
+    }
+
+    // Restart NN and check the rename successfully
+    cluster.restartNameNodes();
+
+    // verify the src dir should not exist, dst should
+    for (int i = 0; i < count; i++) {
+      Path src = new Path(renameDir, "src" + i);
+      Path dst = new Path(renameDir, "dst" + i);
+      assertFalse(dfs.exists(src));
+      assertTrue(dfs.exists(dst));
+    }
+  }
+
+  @Test(timeout = 120000)
+  public void testAggressiveConcurrentAsyncRenameWithOverwrite()
+      throws Exception {
+    internalTestConcurrentAsyncRenameWithOverwrite(100,
+        "testAggressiveConcurrentAsyncRenameWithOverwrite");
+  }
+
+  @Test(timeout = 60000)
+  public void testConservativeConcurrentAsyncRenameWithOverwrite()
+      throws Exception {
+    internalTestConcurrentAsyncRenameWithOverwrite(10000,
+        "testConservativeConcurrentAsyncRenameWithOverwrite");
+  }
+
+  private void internalTestConcurrentAsyncRenameWithOverwrite(
+      final int asyncCallLimit, final String basePath) throws Exception {
+    final short replFactor = 2;
+    final long blockSize = 512;
+    final Path renameDir = new Path(String.format("/test/%s/", basePath));
     Configuration conf = new HdfsConfiguration();
+    conf.setInt(CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_KEY,
+        asyncCallLimit);
     MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2)
         .build();
     cluster.waitActive();
     DistributedFileSystem dfs = cluster.getFileSystem();
     AsyncDistributedFileSystem adfs = dfs.getAsyncDistributedFileSystem();
     int count = 1000;
+    long fileLen = blockSize * 3;
+    int start = 0, end = 0;
+    Map<Integer, Future<Void>> returnFutures = new HashMap<Integer, Future<Void>>();
 
-    try {
-      long fileLen = blockSize * 3;
-      assertTrue(dfs.mkdirs(renameDir));
-
-      Map<Integer, Future<Void>> returnFutures = new HashMap<Integer, Future<Void>>();
+    assertTrue(dfs.mkdirs(renameDir));
 
+    try {
       // concurrently invoking many rename
       for (int i = 0; i < count; i++) {
         Path src = new Path(renameDir, "src" + i);
         Path dst = new Path(renameDir, "dst" + i);
         DFSTestUtil.createFile(dfs, src, fileLen, replFactor, 1);
         DFSTestUtil.createFile(dfs, dst, fileLen, replFactor, 1);
-        Future<Void> returnFuture = adfs.rename(src, dst, Rename.OVERWRITE);
-        returnFutures.put(i, returnFuture);
+        for (;;) {
+          try {
+            LOG.info("rename #" + i);
+            Future<Void> returnFuture = adfs.rename(src, dst, Rename.OVERWRITE);
+            returnFutures.put(i, returnFuture);
+            break;
+          } catch (AsyncCallLimitExceededException e) {
+            /**
+             * reached limit of async calls, fetch results of finished async
+             * calls to let follow-on calls go
+             */
+            LOG.error(e);
+            start = end;
+            end = i;
+            LOG.info(String.format("start=%d, end=%d, i=%d", start, end, i));
+            waitForReturnValues(returnFutures, start, end);
+          }
+        }
       }
 
       // wait for completing the calls
-      for (int i = 0; i < count; i++) {
+      for (int i = start; i < count; i++) {
         returnFutures.get(i).get();
       }
 
@@ -215,26 +255,60 @@ public class TestAsyncDFSRename {
         assertTrue(dfs.exists(dst));
       }
     } finally {
-      dfs.delete(renameDir, true);
+      if (dfs != null) {
+        dfs.close();
+      }
       if (cluster != null) {
         cluster.shutdown();
       }
     }
   }
 
-  @Test
+  private void waitForReturnValues(
+      final Map<Integer, Future<Void>> returnFutures, final int start,
+      final int end) throws InterruptedException, ExecutionException {
+    LOG.info(String.format("calling waitForReturnValues [%d, %d)", start, end));
+    for (int i = start; i < end; i++) {
+      LOG.info("calling Future#get #" + i);
+      returnFutures.get(i).get();
+    }
+  }
+
+  @Test(timeout = 60000)
   public void testAsyncRenameWithException() throws Exception {
-    FileSystem rootFs = FileSystem.get(CONF);
+    Configuration conf = new HdfsConfiguration();
+    String group1 = "group1";
+    String group2 = "group2";
+    String user1 = "user1";
+    UserGroupInformation ugi1;
+
+    // explicitly turn on permission checking
+    conf.setBoolean(DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, true);
+
+    // create fake mapping for the groups
+    Map<String, String[]> u2g_map = new HashMap<String, String[]>(1);
+    u2g_map.put(user1, new String[] { group1, group2 });
+    DFSTestUtil.updateConfWithFakeGroupMapping(conf, u2g_map);
+
+    // Initiate all four users
+    ugi1 = UserGroupInformation.createUserForTesting(user1, new String[] {
+        group1, group2 });
+
+    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(3).build();
+    cluster.waitActive();
+
+    FileSystem rootFs = FileSystem.get(conf);
     final Path renameDir = new Path("/test/async_rename_exception/");
     final Path src = new Path(renameDir, "src");
     final Path dst = new Path(renameDir, "dst");
     rootFs.mkdirs(src);
 
-    AsyncDistributedFileSystem adfs = USER1
+    AsyncDistributedFileSystem adfs = ugi1
         .doAs(new PrivilegedExceptionAction<AsyncDistributedFileSystem>() {
           @Override
           public AsyncDistributedFileSystem run() throws Exception {
-            return gCluster.getFileSystem().getAsyncDistributedFileSystem();
+            return cluster.getFileSystem().getAsyncDistributedFileSystem();
           }
         });
 
@@ -242,16 +316,24 @@ public class TestAsyncDFSRename {
       Future<Void> returnFuture = adfs.rename(src, dst, Rename.OVERWRITE);
       returnFuture.get();
     } catch (ExecutionException e) {
-      checkPermissionDenied(e, src);
+      checkPermissionDenied(e, src, user1);
+    } finally {
+      if (rootFs != null) {
+        rootFs.close();
+      }
+      if (cluster != null) {
+        cluster.shutdown();
+      }
     }
   }
 
-  private void checkPermissionDenied(final Exception e, final Path dir) {
+  private void checkPermissionDenied(final Exception e, final Path dir,
+      final String user) {
     assertTrue(e.getCause() instanceof ExecutionException);
     assertTrue("Permission denied messages must carry AccessControlException",
         e.getMessage().contains("AccessControlException"));
     assertTrue("Permission denied messages must carry the username", e
-        .getMessage().contains(USER1_NAME));
+        .getMessage().contains(user));
     assertTrue("Permission denied messages must carry the path parent", e
         .getMessage().contains(dir.getParent().toUri().getPath()));
   }




[8/8] hadoop git commit: HADOOP-13226 Support async call retry and failover.

Posted by wa...@apache.org.
HADOOP-13226 Support async call retry and failover.

(cherry picked from commit 83f2f78c118a7e52aba5104bd97b0acedc96be7b)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/7b365332
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/7b365332
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/7b365332

Branch: refs/heads/HDFS-9924
Commit: 7b365332998070426ec0474129a46ec8a21a025d
Parents: a1abed3
Author: Tsz-Wo Nicholas Sze <sz...@hortonworks.com>
Authored: Tue May 31 16:30:11 2016 -0700
Committer: Andrew Wang <wa...@apache.org>
Committed: Fri Jun 3 18:15:09 2016 -0700

----------------------------------------------------------------------
 .../dev-support/findbugsExcludeFile.xml         |   8 +-
 .../hadoop/io/retry/AsyncCallHandler.java       | 321 +++++++++++++++++++
 .../org/apache/hadoop/io/retry/CallReturn.java  |  75 +++++
 .../hadoop/io/retry/RetryInvocationHandler.java | 134 ++++++--
 .../apache/hadoop/io/retry/RetryPolicies.java   |   4 +-
 .../main/java/org/apache/hadoop/ipc/Client.java |  25 +-
 .../apache/hadoop/ipc/ProtobufRpcEngine.java    |  13 +-
 .../apache/hadoop/util/concurrent/AsyncGet.java |  17 +-
 .../org/apache/hadoop/ipc/TestAsyncIPC.java     |  10 +-
 .../hadoop/hdfs/AsyncDistributedFileSystem.java |   7 +-
 .../ClientNamenodeProtocolTranslatorPB.java     |  42 +--
 .../org/apache/hadoop/hdfs/TestAsyncDFS.java    |  43 +--
 .../apache/hadoop/hdfs/TestAsyncHDFSWithHA.java | 181 +++++++++++
 .../hdfs/server/namenode/ha/HATestUtil.java     |   9 +-
 14 files changed, 775 insertions(+), 114 deletions(-)
----------------------------------------------------------------------
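
For orientation: this commit moves the async return path from plain Futures
to AsyncGet, retrieved via AsyncCallHandler, so that retries and failover
can happen underneath. A rough sketch of the calling side (the proxy and RPC
names are placeholders, and the timed get(timeout, unit) signature is
assumed from the util.concurrent.AsyncGet changes listed above):

    // The Client must be in asynchronous mode so the call returns at once.
    Client.setAsynchronousMode(true);
    retryProxy.someRpc(args);  // placeholder call through the retry proxy
    AsyncGet<Object, Throwable> asyncGet = AsyncCallHandler.getAsyncReturn();
    // Block (with a timeout) for the real result; retry/failover decisions
    // are handled inside AsyncCallHandler.
    Object result = asyncGet.get(10, TimeUnit.SECONDS);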


http://git-wip-us.apache.org/repos/asf/hadoop/blob/7b365332/hadoop-common-project/hadoop-common/dev-support/findbugsExcludeFile.xml
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/dev-support/findbugsExcludeFile.xml b/hadoop-common-project/hadoop-common/dev-support/findbugsExcludeFile.xml
index ab8673b..a644aa5 100644
--- a/hadoop-common-project/hadoop-common/dev-support/findbugsExcludeFile.xml
+++ b/hadoop-common-project/hadoop-common/dev-support/findbugsExcludeFile.xml
@@ -345,7 +345,13 @@
        <Bug pattern="SF_SWITCH_FALLTHROUGH" />
      </Match>
 
-     <!-- Synchronization performed on util.concurrent instance. -->
+     <!-- WA_NOT_IN_LOOP is invalid in util.concurrent.AsyncGet$Util.wait. -->
+     <Match>
+       <Class name="org.apache.hadoop.util.concurrent.AsyncGet$Util" />
+       <Method name="wait" />
+       <Bug pattern="WA_NOT_IN_LOOP" />
+     </Match>
+
      <Match>
        <Class name="org.apache.hadoop.service.AbstractService" />
        <Method name="stop" />

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7b365332/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/AsyncCallHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/AsyncCallHandler.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/AsyncCallHandler.java
new file mode 100644
index 0000000..5a03b03
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/AsyncCallHandler.java
@@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.io.retry;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.ipc.Client;
+import org.apache.hadoop.util.Daemon;
+import org.apache.hadoop.util.Time;
+import org.apache.hadoop.util.concurrent.AsyncGet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Method;
+import java.util.LinkedList;
+import java.util.Queue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
+
+/** Handle async calls. */
+@InterfaceAudience.Private
+public class AsyncCallHandler {
+  static final Logger LOG = LoggerFactory.getLogger(AsyncCallHandler.class);
+
+  private static final ThreadLocal<AsyncGet<?, Exception>>
+      LOWER_LAYER_ASYNC_RETURN = new ThreadLocal<>();
+  private static final ThreadLocal<AsyncGet<Object, Throwable>>
+      ASYNC_RETURN = new ThreadLocal<>();
+
+  /** @return the async return value from {@link AsyncCallHandler}. */
+  @InterfaceStability.Unstable
+  @SuppressWarnings("unchecked")
+  public static <R, T extends Throwable> AsyncGet<R, T> getAsyncReturn() {
+    final AsyncGet<R, T> asyncGet = (AsyncGet<R, T>)ASYNC_RETURN.get();
+    if (asyncGet != null) {
+      ASYNC_RETURN.set(null);
+      return asyncGet;
+    } else {
+      return (AsyncGet<R, T>) getLowerLayerAsyncReturn();
+    }
+  }
+
+  /** For the lower rpc layers to set the async return value. */
+  @InterfaceStability.Unstable
+  public static void setLowerLayerAsyncReturn(
+      AsyncGet<?, Exception> asyncReturn) {
+    LOWER_LAYER_ASYNC_RETURN.set(asyncReturn);
+  }
+
+  private static AsyncGet<?, Exception> getLowerLayerAsyncReturn() {
+    final AsyncGet<?, Exception> asyncGet = LOWER_LAYER_ASYNC_RETURN.get();
+    Preconditions.checkNotNull(asyncGet);
+    LOWER_LAYER_ASYNC_RETURN.set(null);
+    return asyncGet;
+  }
+
+  /** A simple concurrent queue which keeps track of the empty start time. */
+  static class ConcurrentQueue<T> {
+    private final Queue<T> queue = new LinkedList<>();
+    private long emptyStartTime = Time.monotonicNow();
+
+    synchronized int size() {
+      return queue.size();
+    }
+
+    /** Is the queue empty for more than the given time in milliseconds? */
+    synchronized boolean isEmpty(long time) {
+      return queue.isEmpty() && Time.monotonicNow() - emptyStartTime > time;
+    }
+
+    synchronized void offer(T c) {
+      final boolean added = queue.offer(c);
+      Preconditions.checkState(added);
+    }
+
+    synchronized T poll() {
+      Preconditions.checkState(!queue.isEmpty());
+      final T t = queue.poll();
+      if (queue.isEmpty()) {
+        emptyStartTime = Time.monotonicNow();
+      }
+      return t;
+    }
+  }
+
+  /** A queue for handling async calls. */
+  static class AsyncCallQueue {
+    private final ConcurrentQueue<AsyncCall> queue = new ConcurrentQueue<>();
+    private final Processor processor = new Processor();
+
+    void addCall(AsyncCall call) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("add " + call);
+      }
+      queue.offer(call);
+      processor.tryStart();
+    }
+
+    void checkCalls() {
+      final int size = queue.size();
+      for (int i = 0; i < size; i++) {
+        final AsyncCall c = queue.poll();
+        if (!c.isDone()) {
+          queue.offer(c); // the call is not done yet, add it back.
+        }
+      }
+    }
+
+    /** Process the async calls in the queue. */
+    private class Processor {
+      static final long GRACE_PERIOD = 10*1000L;
+      static final long SLEEP_PERIOD = 100L;
+
+      private final AtomicReference<Thread> running = new AtomicReference<>();
+
+      boolean isRunning(Daemon d) {
+        return d == running.get();
+      }
+
+      void tryStart() {
+        final Thread current = Thread.currentThread();
+        if (running.compareAndSet(null, current)) {
+          final Daemon daemon = new Daemon() {
+            @Override
+            public void run() {
+              for (; isRunning(this);) {
+                try {
+                  Thread.sleep(SLEEP_PERIOD);
+                } catch (InterruptedException e) {
+                  kill(this);
+                  return;
+                }
+
+                checkCalls();
+                tryStop(this);
+              }
+            }
+          };
+
+          final boolean set = running.compareAndSet(current, daemon);
+          Preconditions.checkState(set);
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Starting AsyncCallQueue.Processor " + daemon);
+          }
+          daemon.start();
+        }
+      }
+
+      void tryStop(Daemon d) {
+        if (queue.isEmpty(GRACE_PERIOD)) {
+          kill(d);
+        }
+      }
+
+      void kill(Daemon d) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Killing " + d);
+        }
+        final boolean set = running.compareAndSet(d, null);
+        Preconditions.checkState(set);
+      }
+    }
+  }
+
+  static class AsyncValue<V> {
+    private V value;
+
+    synchronized V waitAsyncValue(long timeout, TimeUnit unit)
+        throws InterruptedException, TimeoutException {
+      if (value != null) {
+        return value;
+      }
+      AsyncGet.Util.wait(this, timeout, unit);
+      if (value != null) {
+        return value;
+      }
+
+      throw new TimeoutException("waitAsyncValue timed out "
+          + timeout + " " + unit);
+    }
+
+    synchronized void set(V v) {
+      Preconditions.checkNotNull(v);
+      Preconditions.checkState(value == null);
+      value = v;
+      notify();
+    }
+
+    synchronized boolean isDone() {
+      return value != null;
+    }
+  }
+
+  static class AsyncCall extends RetryInvocationHandler.Call {
+    private final AsyncCallHandler asyncCallHandler;
+
+    private final AsyncValue<CallReturn> asyncCallReturn = new AsyncValue<>();
+    private AsyncGet<?, Exception> lowerLayerAsyncGet;
+
+    AsyncCall(Method method, Object[] args, boolean isRpc, int callId,
+              RetryInvocationHandler.Counters counters,
+              RetryInvocationHandler<?> retryInvocationHandler,
+              AsyncCallHandler asyncCallHandler) {
+      super(method, args, isRpc, callId, counters, retryInvocationHandler);
+
+      this.asyncCallHandler = asyncCallHandler;
+    }
+
+    /** @return true if the call is done; otherwise, return false. */
+    boolean isDone() {
+      final CallReturn r = invokeOnce();
+      switch (r.getState()) {
+        case RETURNED:
+        case EXCEPTION:
+          asyncCallReturn.set(r); // the async call is done
+          return true;
+        case RETRY:
+          invokeOnce();
+          break;
+        case ASYNC_CALL_IN_PROGRESS:
+        case ASYNC_INVOKED:
+          // nothing to do
+          break;
+        default:
+          Preconditions.checkState(false);
+      }
+      return false;
+    }
+
+    @Override
+    CallReturn invoke() throws Throwable {
+      LOG.debug("{}.invoke {}", getClass().getSimpleName(), this);
+      if (lowerLayerAsyncGet != null) {
+        // async call was submitted early, check the lower level async call
+        final boolean isDone = lowerLayerAsyncGet.isDone();
+        LOG.trace("invoke: lowerLayerAsyncGet.isDone()? {}", isDone);
+        if (!isDone) {
+          return CallReturn.ASYNC_CALL_IN_PROGRESS;
+        }
+        try {
+          return new CallReturn(lowerLayerAsyncGet.get(0, TimeUnit.SECONDS));
+        } finally {
+          lowerLayerAsyncGet = null;
+        }
+      }
+
+      // submit a new async call
+      LOG.trace("invoke: ASYNC_INVOKED");
+      final boolean mode = Client.isAsynchronousMode();
+      try {
+        Client.setAsynchronousMode(true);
+        final Object r = invokeMethod();
+        // invokeMethod should set LOWER_LAYER_ASYNC_RETURN and return null.
+        Preconditions.checkState(r == null);
+        lowerLayerAsyncGet = getLowerLayerAsyncReturn();
+
+        if (counters.isZeros()) {
+          // first async attempt, initialize
+          LOG.trace("invoke: initAsyncCall");
+          asyncCallHandler.initAsyncCall(this, asyncCallReturn);
+        }
+        return CallReturn.ASYNC_INVOKED;
+      } finally {
+        Client.setAsynchronousMode(mode);
+      }
+    }
+  }
+
+  private final AsyncCallQueue asyncCalls = new AsyncCallQueue();
+  private volatile boolean hasSuccessfulCall = false;
+
+  AsyncCall newAsyncCall(Method method, Object[] args, boolean isRpc,
+                         int callId, RetryInvocationHandler.Counters counters,
+                         RetryInvocationHandler<?> retryInvocationHandler) {
+    return new AsyncCall(method, args, isRpc, callId, counters,
+        retryInvocationHandler, this);
+  }
+
+  boolean hasSuccessfulCall() {
+    return hasSuccessfulCall;
+  }
+
+  private void initAsyncCall(final AsyncCall asyncCall,
+                             final AsyncValue<CallReturn> asyncCallReturn) {
+    asyncCalls.addCall(asyncCall);
+
+    final AsyncGet<Object, Throwable> asyncGet
+        = new AsyncGet<Object, Throwable>() {
+      @Override
+      public Object get(long timeout, TimeUnit unit) throws Throwable {
+        final CallReturn c = asyncCallReturn.waitAsyncValue(timeout, unit);
+        final Object r = c.getReturnValue();
+        hasSuccessfulCall = true;
+        return r;
+      }
+
+      @Override
+      public boolean isDone() {
+        return asyncCallReturn.isDone();
+      }
+    };
+    ASYNC_RETURN.set(asyncGet);
+  }
+}
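
For a concrete picture of the thread-local handoff this class implements, a condensed caller-side sketch (editorial; names mirror the code above): in async mode the proxied method returns immediately, the RPC layer publishes its AsyncGet via setLowerLayerAsyncReturn, and getAsyncReturn() then hands the top-level AsyncGet to the caller exactly once.

// Sketch only: cp is a ClientProtocol proxy wrapped by RetryInvocationHandler.
static Object asyncRename(ClientProtocol cp, String src, String dst)
    throws Throwable {
  Client.setAsynchronousMode(true);
  try {
    cp.rename2(src, dst); // submitted asynchronously, returns immediately
    final AsyncGet<Object, Throwable> ret = AsyncCallHandler.getAsyncReturn();
    return ret.get(-1, TimeUnit.MILLISECONDS); // negative timeout waits
  } finally {
    Client.setAsynchronousMode(false);
  }
}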

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7b365332/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/CallReturn.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/CallReturn.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/CallReturn.java
new file mode 100644
index 0000000..943725c
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/CallReturn.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.io.retry;
+
+import com.google.common.base.Preconditions;
+
+/** The call return from a method invocation. */
+class CallReturn {
+  /** The return state. */
+  enum State {
+    /** Call is returned successfully. */
+    RETURNED,
+    /** Call throws an exception. */
+    EXCEPTION,
+    /** Call should be retried according to the {@link RetryPolicy}. */
+    RETRY,
+    /** Call, which is async, is still in progress. */
+    ASYNC_CALL_IN_PROGRESS,
+    /** Call, which is async, just has been invoked. */
+    ASYNC_INVOKED
+  }
+
+  static final CallReturn ASYNC_CALL_IN_PROGRESS = new CallReturn(
+      State.ASYNC_CALL_IN_PROGRESS);
+  static final CallReturn ASYNC_INVOKED = new CallReturn(State.ASYNC_INVOKED);
+  static final CallReturn RETRY = new CallReturn(State.RETRY);
+
+  private final Object returnValue;
+  private final Throwable thrown;
+  private final State state;
+
+  CallReturn(Object r) {
+    this(r, null, State.RETURNED);
+  }
+  CallReturn(Throwable t) {
+    this(null, t, State.EXCEPTION);
+    Preconditions.checkNotNull(t);
+  }
+  private CallReturn(State s) {
+    this(null, null, s);
+  }
+  private CallReturn(Object r, Throwable t, State s) {
+    Preconditions.checkArgument(r == null || t == null);
+    returnValue = r;
+    thrown = t;
+    state = s;
+  }
+
+  State getState() {
+    return state;
+  }
+
+  Object getReturnValue() throws Throwable {
+    if (state == State.EXCEPTION) {
+      throw thrown;
+    }
+    Preconditions.checkState(state == State.RETURNED, "state == %s", state);
+    return returnValue;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7b365332/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java
index 300d0c2..f2b2c99 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java
@@ -42,11 +42,83 @@ import java.util.Map;
 public class RetryInvocationHandler<T> implements RpcInvocationHandler {
   public static final Log LOG = LogFactory.getLog(RetryInvocationHandler.class);
 
-  private static class Counters {
+  static class Call {
+    private final Method method;
+    private final Object[] args;
+    private final boolean isRpc;
+    private final int callId;
+    final Counters counters;
+
+    private final RetryPolicy retryPolicy;
+    private final RetryInvocationHandler<?> retryInvocationHandler;
+
+    Call(Method method, Object[] args, boolean isRpc, int callId,
+         Counters counters, RetryInvocationHandler<?> retryInvocationHandler) {
+      this.method = method;
+      this.args = args;
+      this.isRpc = isRpc;
+      this.callId = callId;
+      this.counters = counters;
+
+      this.retryPolicy = retryInvocationHandler.getRetryPolicy(method);
+      this.retryInvocationHandler = retryInvocationHandler;
+    }
+
+    /** Invoke the call once without retrying. */
+    synchronized CallReturn invokeOnce() {
+      try {
+        // The number of times this invocation handler has ever been failed over
+        // before this method invocation attempt. Used to prevent concurrent
+        // failed method invocations from triggering multiple failover attempts.
+        final long failoverCount = retryInvocationHandler.getFailoverCount();
+        try {
+          return invoke();
+        } catch (Exception e) {
+          if (LOG.isTraceEnabled()) {
+            LOG.trace(this, e);
+          }
+          if (Thread.currentThread().isInterrupted()) {
+            // If interrupted, do not retry.
+            throw e;
+          }
+          retryInvocationHandler.handleException(
+              method, retryPolicy, failoverCount, counters, e);
+          return CallReturn.RETRY;
+        }
+      } catch(Throwable t) {
+        return new CallReturn(t);
+      }
+    }
+
+    CallReturn invoke() throws Throwable {
+      return new CallReturn(invokeMethod());
+    }
+
+    Object invokeMethod() throws Throwable {
+      if (isRpc) {
+        Client.setCallIdAndRetryCount(callId, counters.retries);
+      }
+      return retryInvocationHandler.invokeMethod(method, args);
+    }
+
+    @Override
+    public String toString() {
+      return getClass().getSimpleName() + "#" + callId + ": "
+          + method.getDeclaringClass().getSimpleName() + "." + method.getName()
+          + "(" + (args == null || args.length == 0? "": Arrays.toString(args))
+          +  ")";
+    }
+  }
+
+  static class Counters {
     /** Counter for retries. */
     private int retries;
     /** Counter for method invocation has been failed over. */
     private int failovers;
+
+    boolean isZeros() {
+      return retries == 0 && failovers == 0;
+    }
   }
 
   private static class ProxyDescriptor<T> {
@@ -144,11 +216,13 @@ public class RetryInvocationHandler<T> implements RpcInvocationHandler {
 
   private final ProxyDescriptor<T> proxyDescriptor;
 
-  private volatile boolean hasMadeASuccessfulCall = false;
-  
+  private volatile boolean hasSuccessfulCall = false;
+
   private final RetryPolicy defaultPolicy;
   private final Map<String,RetryPolicy> methodNameToPolicyMap;
 
+  private final AsyncCallHandler asyncCallHandler = new AsyncCallHandler();
+
   protected RetryInvocationHandler(FailoverProxyProvider<T> proxyProvider,
       RetryPolicy retryPolicy) {
     this(proxyProvider, retryPolicy, Collections.<String, RetryPolicy>emptyMap());
@@ -167,38 +241,35 @@ public class RetryInvocationHandler<T> implements RpcInvocationHandler {
     return policy != null? policy: defaultPolicy;
   }
 
+  private long getFailoverCount() {
+    return proxyDescriptor.getFailoverCount();
+  }
+
+  private Call newCall(Method method, Object[] args, boolean isRpc, int callId,
+                       Counters counters) {
+    if (Client.isAsynchronousMode()) {
+      return asyncCallHandler.newAsyncCall(method, args, isRpc, callId,
+          counters, this);
+    } else {
+      return new Call(method, args, isRpc, callId, counters, this);
+    }
+  }
+
   @Override
   public Object invoke(Object proxy, Method method, Object[] args)
       throws Throwable {
     final boolean isRpc = isRpcInvocation(proxyDescriptor.getProxy());
     final int callId = isRpc? Client.nextCallId(): RpcConstants.INVALID_CALL_ID;
-    return invoke(method, args, isRpc, callId, new Counters());
-  }
-
-  private Object invoke(final Method method, final Object[] args,
-      final boolean isRpc, final int callId, final Counters counters)
-      throws Throwable {
-    final RetryPolicy policy = getRetryPolicy(method);
+    final Counters counters = new Counters();
 
+    final Call call = newCall(method, args, isRpc, callId, counters);
     while (true) {
-      // The number of times this invocation handler has ever been failed over,
-      // before this method invocation attempt. Used to prevent concurrent
-      // failed method invocations from triggering multiple failover attempts.
-      final long failoverCount = proxyDescriptor.getFailoverCount();
-
-      if (isRpc) {
-        Client.setCallIdAndRetryCount(callId, counters.retries);
-      }
-      try {
-        final Object ret = invokeMethod(method, args);
-        hasMadeASuccessfulCall = true;
-        return ret;
-      } catch (Exception ex) {
-        if (Thread.currentThread().isInterrupted()) {
-          // If interrupted, do not retry.
-          throw ex;
-        }
-        handleException(method, policy, failoverCount, counters, ex);
+      final CallReturn c = call.invokeOnce();
+      final CallReturn.State state = c.getState();
+      if (state == CallReturn.State.ASYNC_INVOKED) {
+        return null; // return null for async calls
+      } else if (c.getState() != CallReturn.State.RETRY) {
+        return c.getReturnValue();
       }
     }
   }
@@ -239,7 +310,8 @@ public class RetryInvocationHandler<T> implements RpcInvocationHandler {
       final int failovers, final long delay, final Exception ex) {
     // log info if this has made some successful calls or
     // this is not the first failover
-    final boolean info = hasMadeASuccessfulCall || failovers != 0;
+    final boolean info = hasSuccessfulCall || failovers != 0
+        || asyncCallHandler.hasSuccessfulCall();
     if (!info && !LOG.isDebugEnabled()) {
       return;
     }
@@ -265,7 +337,9 @@ public class RetryInvocationHandler<T> implements RpcInvocationHandler {
       if (!method.isAccessible()) {
         method.setAccessible(true);
       }
-      return method.invoke(proxyDescriptor.getProxy(), args);
+      final Object r = method.invoke(proxyDescriptor.getProxy(), args);
+      hasSuccessfulCall = true;
+      return r;
     } catch (InvocationTargetException e) {
       throw e.getCause();
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7b365332/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicies.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicies.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicies.java
index 131aa8f..c0a14b7 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicies.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicies.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.io.retry;
 
+import java.io.EOFException;
 import java.io.IOException;
 import java.net.ConnectException;
 import java.net.NoRouteToHostException;
@@ -647,8 +648,9 @@ public class RetryPolicies {
         return new RetryAction(RetryAction.RetryDecision.FAIL, 0, "retries ("
             + retries + ") exceeded maximum allowed (" + maxRetries + ")");
       }
-      
+
       if (e instanceof ConnectException ||
+          e instanceof EOFException ||
           e instanceof NoRouteToHostException ||
           e instanceof UnknownHostException ||
           e instanceof StandbyException ||
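
Adding EOFException above means a connection that dies mid-call (for example, the active NameNode being shut down during failover) is now treated like the other network faults. A sketch of the resulting decision for an idempotent call, using the existing RetryPolicies factory methods:

RetryPolicy p = RetryPolicies.failoverOnNetworkException(
    RetryPolicies.TRY_ONCE_THEN_FAIL, 3);
RetryAction a = p.shouldRetry(new java.io.EOFException(), 0, 0, true);
// a.action is now FAILOVER_AND_RETRY rather than FAIL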

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7b365332/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
index d1d5b17..ed8d905 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
@@ -58,7 +58,6 @@ import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Time;
 import org.apache.hadoop.util.concurrent.AsyncGet;
-import org.apache.hadoop.util.concurrent.AsyncGetFuture;
 import org.apache.htrace.core.Span;
 import org.apache.htrace.core.Tracer;
 
@@ -94,8 +93,8 @@ public class Client implements AutoCloseable {
 
   private static final ThreadLocal<Integer> callId = new ThreadLocal<Integer>();
   private static final ThreadLocal<Integer> retryCount = new ThreadLocal<Integer>();
-  private static final ThreadLocal<Future<?>> ASYNC_RPC_RESPONSE
-      = new ThreadLocal<>();
+  private static final ThreadLocal<AsyncGet<? extends Writable, IOException>>
+      ASYNC_RPC_RESPONSE = new ThreadLocal<>();
   private static final ThreadLocal<Boolean> asynchronousMode =
       new ThreadLocal<Boolean>() {
         @Override
@@ -106,8 +105,9 @@ public class Client implements AutoCloseable {
 
   @SuppressWarnings("unchecked")
   @Unstable
-  public static <T> Future<T> getAsyncRpcResponse() {
-    return (Future<T>) ASYNC_RPC_RESPONSE.get();
+  public static <T extends Writable> AsyncGet<T, IOException>
+      getAsyncRpcResponse() {
+    return (AsyncGet<T, IOException>) ASYNC_RPC_RESPONSE.get();
   }
 
   /** Set call id and retry count for the next call. */
@@ -1413,9 +1413,16 @@ public class Client implements AutoCloseable {
             }
           }
         }
+
+        @Override
+        public boolean isDone() {
+          synchronized (call) {
+            return call.done;
+          }
+        }
       };
 
-      ASYNC_RPC_RESPONSE.set(new AsyncGetFuture<>(asyncGet));
+      ASYNC_RPC_RESPONSE.set(asyncGet);
       return null;
     } else {
       return getRpcResponse(call, connection, -1, null);
@@ -1460,10 +1467,8 @@ public class Client implements AutoCloseable {
     synchronized (call) {
       while (!call.done) {
         try {
-          final long waitTimeout = AsyncGet.Util.asyncGetTimeout2WaitTimeout(
-              timeout, unit);
-          call.wait(waitTimeout); // wait for the result
-          if (waitTimeout > 0 && !call.done) {
+          AsyncGet.Util.wait(call, timeout, unit);
+          if (timeout >= 0 && !call.done) {
             return null;
           }
         } catch (InterruptedException ie) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7b365332/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
index 0f43fc6..315ec67 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
@@ -54,7 +54,6 @@ import java.lang.reflect.Proxy;
 import java.net.InetSocketAddress;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -256,14 +255,18 @@ public class ProtobufRpcEngine implements RpcEngine {
       }
       
       if (Client.isAsynchronousMode()) {
-        final Future<RpcResponseWrapper> frrw = Client.getAsyncRpcResponse();
+        final AsyncGet<RpcResponseWrapper, IOException> arr
+            = Client.getAsyncRpcResponse();
         final AsyncGet<Message, Exception> asyncGet
             = new AsyncGet<Message, Exception>() {
           @Override
           public Message get(long timeout, TimeUnit unit) throws Exception {
-            final RpcResponseWrapper rrw = timeout < 0?
-                frrw.get(): frrw.get(timeout, unit);
-            return getReturnMessage(method, rrw);
+            return getReturnMessage(method, arr.get(timeout, unit));
+          }
+
+          @Override
+          public boolean isDone() {
+            return arr.isDone();
           }
         };
         ASYNC_RETURN_MESSAGE.set(asyncGet);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7b365332/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/AsyncGet.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/AsyncGet.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/AsyncGet.java
index 5eac869..f124890 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/AsyncGet.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/AsyncGet.java
@@ -47,14 +47,19 @@ public interface AsyncGet<R, E extends Throwable> {
   R get(long timeout, TimeUnit unit)
       throws E, TimeoutException, InterruptedException;
 
+  /** @return true if the underlying computation is done; false otherwise. */
+  boolean isDone();
+
   /** Utility */
   class Util {
-    /**
-     * @return {@link Object#wait(long)} timeout converted
-     *         from {@link #get(long, TimeUnit)} timeout.
-     */
-    public static long asyncGetTimeout2WaitTimeout(long timeout, TimeUnit unit){
-      return timeout < 0? 0: timeout == 0? 1:unit.toMillis(timeout);
+    /** Use {@link #get(long, TimeUnit)} timeout parameters to wait. */
+    public static void wait(Object obj, long timeout, TimeUnit unit)
+        throws InterruptedException {
+      if (timeout < 0) {
+        obj.wait();
+      } else if (timeout > 0) {
+        obj.wait(unit.toMillis(timeout));
+      }
     }
   }
 }
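
To make the contract concrete (editorial sketch, not in the patch): an AsyncGet is a poll-or-wait handle, and AsyncGetFuture, used by the tests below, adapts it to a standard java.util.concurrent.Future.

final AsyncGet<String, IOException> ready =
    new AsyncGet<String, IOException>() {
  @Override
  public String get(long timeout, TimeUnit unit) {
    return "done"; // value already available, so the timeout is moot
  }

  @Override
  public boolean isDone() {
    return true;
  }
};
final Future<String> f = new AsyncGetFuture<>(ready); // Future adapter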

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7b365332/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java
index 0ad191b..4450c0c 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.ipc.TestIPC.TestServer;
 import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.concurrent.AsyncGetFuture;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -50,6 +51,11 @@ public class TestAsyncIPC {
   private static Configuration conf;
   private static final Log LOG = LogFactory.getLog(TestAsyncIPC.class);
 
+  static <T extends Writable> AsyncGetFuture<T, IOException>
+      getAsyncRpcResponseFuture() {
+    return new AsyncGetFuture<>(Client.getAsyncRpcResponse());
+  }
+
   @Before
   public void setupConf() {
     conf = new Configuration();
@@ -84,7 +90,7 @@ public class TestAsyncIPC {
         try {
           final long param = TestIPC.RANDOM.nextLong();
           TestIPC.call(client, param, server, conf);
-          returnFutures.put(i, Client.getAsyncRpcResponse());
+          returnFutures.put(i, getAsyncRpcResponseFuture());
           expectedValues.put(i, param);
         } catch (Exception e) {
           failed = true;
@@ -204,7 +210,7 @@ public class TestAsyncIPC {
 
     private void doCall(final int idx, final long param) throws IOException {
       TestIPC.call(client, param, server, conf);
-      returnFutures.put(idx, Client.getAsyncRpcResponse());
+      returnFutures.put(idx, getAsyncRpcResponseFuture());
       expectedValues.put(idx, param);
     }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7b365332/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java
index 29bac2a..824336a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java
@@ -29,7 +29,7 @@ import org.apache.hadoop.fs.permission.AclEntry;
 import org.apache.hadoop.fs.permission.AclStatus;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DFSOpsCountStatistics.OpType;
-import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB;
+import org.apache.hadoop.io.retry.AsyncCallHandler;
 import org.apache.hadoop.util.concurrent.AsyncGetFuture;
 import org.apache.hadoop.ipc.Client;
 
@@ -51,9 +51,8 @@ public class AsyncDistributedFileSystem {
     this.dfs = dfs;
   }
 
-  static <T> Future<T> getReturnValue() {
-    return new AsyncGetFuture<>(
-        ClientNamenodeProtocolTranslatorPB.getAsyncReturnValue());
+  private static <T> Future<T> getReturnValue() {
+    return new AsyncGetFuture<>(AsyncCallHandler.getAsyncReturn());
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7b365332/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
index 2373da7..bcf5269 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
@@ -29,7 +29,6 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.crypto.CryptoProtocolVersion;
 import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedEntries;
 import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedListEntries;
@@ -184,6 +183,7 @@ import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifie
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
 import org.apache.hadoop.io.EnumSetWritable;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.retry.AsyncCallHandler;
 import org.apache.hadoop.ipc.Client;
 import org.apache.hadoop.ipc.ProtobufHelper;
 import org.apache.hadoop.ipc.ProtobufRpcEngine;
@@ -212,8 +212,6 @@ import org.apache.hadoop.util.concurrent.AsyncGet;
 public class ClientNamenodeProtocolTranslatorPB implements
     ProtocolMetaInterface, ClientProtocol, Closeable, ProtocolTranslator {
   final private ClientNamenodeProtocolPB rpcProxy;
-  private static final ThreadLocal<AsyncGet<?, Exception>>
-      ASYNC_RETURN_VALUE = new ThreadLocal<>();
 
   static final GetServerDefaultsRequestProto VOID_GET_SERVER_DEFAULT_REQUEST =
       GetServerDefaultsRequestProto.newBuilder().build();
@@ -247,12 +245,6 @@ public class ClientNamenodeProtocolTranslatorPB implements
     rpcProxy = proxy;
   }
 
-  @SuppressWarnings("unchecked")
-  @Unstable
-  public static <T> AsyncGet<T, Exception> getAsyncReturnValue() {
-    return (AsyncGet<T, Exception>) ASYNC_RETURN_VALUE.get();
-  }
-
   @Override
   public void close() {
     RPC.stopProxy(rpcProxy);
@@ -391,8 +383,13 @@ public class ClientNamenodeProtocolTranslatorPB implements
         asyncReturnMessage.get(timeout, unit);
         return null;
       }
+
+      @Override
+      public boolean isDone() {
+        return asyncReturnMessage.isDone();
+      }
     };
-    ASYNC_RETURN_VALUE.set(asyncGet);
+    AsyncCallHandler.setLowerLayerAsyncReturn(asyncGet);
   }
 
   @Override
@@ -1367,17 +1364,20 @@ public class ClientNamenodeProtocolTranslatorPB implements
         rpcProxy.getAclStatus(null, req);
         final AsyncGet<Message, Exception> asyncReturnMessage
             = ProtobufRpcEngine.getAsyncReturnMessage();
-        final AsyncGet<AclStatus, Exception> asyncGet =
-            new AsyncGet<AclStatus, Exception>() {
-              @Override
-              public AclStatus get(long timeout, TimeUnit unit)
-                  throws Exception {
-                return PBHelperClient
-                    .convert((GetAclStatusResponseProto) asyncReturnMessage
-                        .get(timeout, unit));
-              }
-            };
-        ASYNC_RETURN_VALUE.set(asyncGet);
+        final AsyncGet<AclStatus, Exception> asyncGet
+            = new AsyncGet<AclStatus, Exception>() {
+          @Override
+          public AclStatus get(long timeout, TimeUnit unit) throws Exception {
+            return PBHelperClient.convert((GetAclStatusResponseProto)
+                asyncReturnMessage.get(timeout, unit));
+          }
+
+          @Override
+          public boolean isDone() {
+            return asyncReturnMessage.isDone();
+          }
+        };
+        AsyncCallHandler.setLowerLayerAsyncReturn(asyncGet);
         return null;
       } else {
         return PBHelperClient.convert(rpcProxy.getAclStatus(null, req));

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7b365332/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFS.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFS.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFS.java
index c7615a9..6a60290 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFS.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFS.java
@@ -55,6 +55,7 @@ import org.apache.hadoop.hdfs.TestDFSPermission.PermissionGenerator;
 import org.apache.hadoop.hdfs.server.namenode.AclTestHelpers;
 import org.apache.hadoop.hdfs.server.namenode.FSAclBaseTest;
 import org.apache.hadoop.ipc.AsyncCallLimitExceededException;
+import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.Time;
 import org.junit.After;
@@ -70,7 +71,7 @@ public class TestAsyncDFS {
   public static final Log LOG = LogFactory.getLog(TestAsyncDFS.class);
   private final short replFactor = 1;
   private final long blockSize = 512;
-  private long fileLen = blockSize * 3;
+  private long fileLen = 0;
   private final long seed = Time.now();
   private final Random r = new Random(seed);
   private final PermissionGenerator permGenerator = new PermissionGenerator(r);
@@ -80,7 +81,7 @@ public class TestAsyncDFS {
 
   private Configuration conf;
   private MiniDFSCluster cluster;
-  private FileSystem fs;
+  private DistributedFileSystem fs;
   private AsyncDistributedFileSystem adfs;
 
   @Before
@@ -95,10 +96,10 @@ public class TestAsyncDFS {
         ASYNC_CALL_LIMIT);
     // set server handlers
     conf.setInt(DFSConfigKeys.DFS_NAMENODE_HANDLER_COUNT_KEY, NUM_NN_HANDLER);
-    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).build();
     cluster.waitActive();
-    fs = FileSystem.get(conf);
-    adfs = cluster.getFileSystem().getAsyncDistributedFileSystem();
+    fs = cluster.getFileSystem();
+    adfs = fs.getAsyncDistributedFileSystem();
   }
 
   @After
@@ -113,31 +114,6 @@ public class TestAsyncDFS {
     }
   }
 
-  static class AclQueueEntry {
-    private final Object future;
-    private final Path path;
-    private final Boolean isSetAcl;
-
-    AclQueueEntry(final Object future, final Path path,
-        final Boolean isSetAcl) {
-      this.future = future;
-      this.path = path;
-      this.isSetAcl = isSetAcl;
-    }
-
-    public final Object getFuture() {
-      return future;
-    }
-
-    public final Path getPath() {
-      return path;
-    }
-
-    public final Boolean isSetAcl() {
-      return this.isSetAcl;
-    }
-  }
-
   @Test(timeout=60000)
   public void testBatchAsyncAcl() throws Exception {
     final String basePath = "testBatchAsyncAcl";
@@ -348,7 +324,7 @@ public class TestAsyncDFS {
 
   public static void checkPermissionDenied(final Exception e, final Path dir,
       final String user) {
-    assertTrue(e.getCause() instanceof ExecutionException);
+    assertTrue(e.getCause() instanceof RemoteException);
     assertTrue("Permission denied messages must carry AccessControlException",
         e.getMessage().contains("AccessControlException"));
     assertTrue("Permission denied messages must carry the username", e
@@ -470,4 +446,9 @@ public class TestAsyncDFS {
       assertTrue("group2".equals(fs.getFileStatus(dsts[i]).getGroup()));
     }
   }
+
+  @Test
+  public void testAsyncWithoutRetry() throws Exception {
+    TestAsyncHDFSWithHA.runTestAsyncWithoutRetry(conf, cluster, fs);
+  }
 }
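
The checkPermissionDenied change above reflects how an async permission failure now surfaces to callers; an editorial sketch of the caller's view:

try {
  adfs.rename(src, dst).get();
} catch (ExecutionException e) {
  // e.getCause() is a RemoteException whose message carries the
  // AccessControlException text, the user name, and the parent path
}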

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7b365332/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncHDFSWithHA.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncHDFSWithHA.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncHDFSWithHA.java
new file mode 100644
index 0000000..9ade8ec
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncHDFSWithHA.java
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
+import org.apache.hadoop.io.retry.AsyncCallHandler;
+import org.apache.hadoop.io.retry.RetryInvocationHandler;
+import org.apache.hadoop.ipc.Client;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.util.concurrent.AsyncGetFuture;
+import org.apache.log4j.Level;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+/** Test async methods with HA setup. */
+public class TestAsyncHDFSWithHA {
+  static final Logger LOG = LoggerFactory.getLogger(TestAsyncHDFSWithHA.class);
+  static {
+    GenericTestUtils.setLogLevel(RetryInvocationHandler.LOG, Level.ALL);
+  }
+
+  private static <T> Future<T> getReturnValue() {
+    return new AsyncGetFuture<>(AsyncCallHandler.getAsyncReturn());
+  }
+
+  static void mkdirs(DistributedFileSystem dfs, String dir, Path[] srcs,
+                     Path[] dsts) throws IOException {
+    for (int i = 0; i < srcs.length; i++) {
+      srcs[i] = new Path(dir, "src" + i);
+      dsts[i] = new Path(dir, "dst" + i);
+      dfs.mkdirs(srcs[i]);
+    }
+  }
+
+  static void runTestAsyncWithoutRetry(Configuration conf,
+      MiniDFSCluster cluster, DistributedFileSystem dfs) throws Exception {
+    final int num = 5;
+
+    final String renameDir = "/testAsyncWithoutRetry/";
+    final Path[] srcs = new Path[num + 1];
+    final Path[] dsts = new Path[num + 1];
+    mkdirs(dfs, renameDir, srcs, dsts);
+
+    // create a proxy without retry.
+    final NameNodeProxiesClient.ProxyAndInfo<ClientProtocol> proxyInfo
+        = NameNodeProxies.createNonHAProxy(conf,
+        cluster.getNameNode(0).getNameNodeAddress(),
+        ClientProtocol.class, UserGroupInformation.getCurrentUser(),
+        false);
+    final ClientProtocol cp = proxyInfo.getProxy();
+
+    // submit async calls
+    Client.setAsynchronousMode(true);
+    final List<Future<Void>> results = new ArrayList<>();
+    for (int i = 0; i < num; i++) {
+      final String src = srcs[i].toString();
+      final String dst = dsts[i].toString();
+      LOG.info(i + ") rename " + src + " -> " + dst);
+      cp.rename2(src, dst);
+      results.add(getReturnValue());
+    }
+    Client.setAsynchronousMode(false);
+
+    // wait for the async calls
+    for (Future<Void> f : results) {
+      f.get();
+    }
+
+    // check results
+    for (int i = 0; i < num; i++) {
+      Assert.assertEquals(false, dfs.exists(srcs[i]));
+      Assert.assertEquals(true, dfs.exists(dsts[i]));
+    }
+  }
+
+  /** Testing HDFS async methods with HA setup. */
+  @Test(timeout = 120000)
+  public void testAsyncWithHAFailover() throws Exception {
+    final int num = 10;
+
+    final Configuration conf = new HdfsConfiguration();
+    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+        .nnTopology(MiniDFSNNTopology.simpleHATopology())
+        .numDataNodes(0).build();
+
+    try {
+      cluster.waitActive();
+      cluster.transitionToActive(0);
+
+      final DistributedFileSystem dfs = HATestUtil.configureFailoverFs(
+          cluster, conf);
+      runTestAsyncWithoutRetry(conf, cluster, dfs);
+
+      final String renameDir = "/testAsyncWithHAFailover/";
+      final Path[] srcs = new Path[num + 1];
+      final Path[] dsts = new Path[num + 1];
+      mkdirs(dfs, renameDir, srcs, dsts);
+
+      // submit async calls and trigger failover in the middle.
+      final AsyncDistributedFileSystem adfs
+          = dfs.getAsyncDistributedFileSystem();
+      final ExecutorService executor = Executors.newFixedThreadPool(num + 1);
+
+      final List<Future<Void>> results = new ArrayList<>();
+      final List<IOException> exceptions = new ArrayList<>();
+      final List<Future<?>> futures = new ArrayList<>();
+      final int half = num/2;
+      for(int i = 0; i <= num; i++) {
+        final int id = i;
+        futures.add(executor.submit(new Runnable() {
+          @Override
+          public void run() {
+            try {
+              if (id == half) {
+                // failover
+                cluster.shutdownNameNode(0);
+                cluster.transitionToActive(1);
+              } else {
+                // rename
+                results.add(adfs.rename(srcs[id], dsts[id]));
+              }
+            } catch (IOException e) {
+              exceptions.add(e);
+            }
+          }
+        }));
+      }
+
+      // wait for the tasks
+      Assert.assertEquals(num + 1, futures.size());
+      for(int i = 0; i <= num; i++) {
+        futures.get(i).get();
+      }
+      // wait for the async calls
+      Assert.assertEquals(num, results.size());
+      Assert.assertTrue(exceptions.isEmpty());
+      for(Future<Void> r : results) {
+        r.get();
+      }
+
+      // check results
+      for(int i = 0; i <= num; i++) {
+        final boolean renamed = i != half;
+        Assert.assertEquals(!renamed, dfs.exists(srcs[i]));
+        Assert.assertEquals(renamed, dfs.exists(dsts[i]));
+      }
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7b365332/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java
index 42cf3d4..169bbee 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
@@ -135,7 +136,8 @@ public abstract class HATestUtil {
   }
   
   /** Gets the filesystem instance by setting the failover configurations */
-  public static FileSystem configureFailoverFs(MiniDFSCluster cluster, Configuration conf)
+  public static DistributedFileSystem configureFailoverFs(
+      MiniDFSCluster cluster, Configuration conf)
       throws IOException, URISyntaxException {
     return configureFailoverFs(cluster, conf, 0);
   }
@@ -147,13 +149,14 @@ public abstract class HATestUtil {
    * @param nsIndex namespace index starting with zero
    * @throws IOException if an error occurs rolling the edit log
    */
-  public static FileSystem configureFailoverFs(MiniDFSCluster cluster, Configuration conf,
+  public static DistributedFileSystem configureFailoverFs(
+      MiniDFSCluster cluster, Configuration conf,
       int nsIndex) throws IOException, URISyntaxException {
     conf = new Configuration(conf);
     String logicalName = getLogicalHostname(cluster);
     setFailoverConfigurations(cluster, conf, logicalName, nsIndex);
     FileSystem fs = FileSystem.get(new URI("hdfs://" + logicalName), conf);
-    return fs;
+    return (DistributedFileSystem)fs;
   }
   
   public static void setFailoverConfigurations(MiniDFSCluster cluster,




[4/8] hadoop git commit: HADOOP-13168. Support Future.get with timeout in ipc async calls.

Posted by wa...@apache.org.
HADOOP-13168. Support Future.get with timeout in ipc async calls.

(cherry picked from commit 42c22f7e3d6e88bf1115f617f6e803288886d1ac)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/35e21d9c
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/35e21d9c
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/35e21d9c

Branch: refs/heads/HDFS-9924
Commit: 35e21d9c3c9998d5314ee38c14efd99182095a92
Parents: 7bb7afe
Author: Tsz-Wo Nicholas Sze <sz...@hortonworks.com>
Authored: Thu May 19 15:34:04 2016 -0700
Committer: Andrew Wang <wa...@apache.org>
Committed: Fri Jun 3 18:14:40 2016 -0700

----------------------------------------------------------------------
 .../main/java/org/apache/hadoop/ipc/Client.java | 119 ++++++++----------
 .../apache/hadoop/ipc/ProtobufRpcEngine.java    |  62 +++++-----
 .../apache/hadoop/util/concurrent/AsyncGet.java |  60 +++++++++
 .../hadoop/util/concurrent/AsyncGetFuture.java  |  73 +++++++++++
 .../org/apache/hadoop/ipc/TestAsyncIPC.java     | 124 +++++++++++--------
 .../hadoop/hdfs/AsyncDistributedFileSystem.java |  24 +---
 .../ClientNamenodeProtocolTranslatorPB.java     |  33 ++---
 7 files changed, 310 insertions(+), 185 deletions(-)
----------------------------------------------------------------------
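
In caller terms (a sketch, not part of the patch): the Future returned for an async call now honors get(timeout, unit), and a TimeoutException means the RPC is still in flight rather than failed.

final Future<Void> f = adfs.rename(src, dst);
try {
  f.get(5, TimeUnit.SECONDS); // bounded wait added by this change
} catch (TimeoutException e) {
  // call is still pending: wait again, or give up at the application level
}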


http://git-wip-us.apache.org/repos/asf/hadoop/blob/35e21d9c/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
index 9be4649..d1d5b17 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
@@ -18,46 +18,10 @@
 
 package org.apache.hadoop.ipc;
 
-import static org.apache.hadoop.ipc.RpcConstants.*;
-
-import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.EOFException;
-import java.io.FilterInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InterruptedIOException;
-import java.io.OutputStream;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.Socket;
-import java.net.SocketTimeoutException;
-import java.net.UnknownHostException;
-import java.security.PrivilegedExceptionAction;
-import java.util.Arrays;
-import java.util.Hashtable;
-import java.util.Iterator;
-import java.util.Map.Entry;
-import java.util.Random;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-
-import javax.net.SocketFactory;
-import javax.security.sasl.Sasl;
-
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.google.protobuf.CodedOutputStream;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -93,14 +57,25 @@ import org.apache.hadoop.util.ProtoUtil;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Time;
+import org.apache.hadoop.util.concurrent.AsyncGet;
+import org.apache.hadoop.util.concurrent.AsyncGetFuture;
 import org.apache.htrace.core.Span;
 import org.apache.htrace.core.Tracer;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.AbstractFuture;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import com.google.protobuf.CodedOutputStream;
+import javax.net.SocketFactory;
+import javax.security.sasl.Sasl;
+import java.io.*;
+import java.net.*;
+import java.security.PrivilegedExceptionAction;
+import java.util.*;
+import java.util.Map.Entry;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.apache.hadoop.ipc.RpcConstants.CONNECTION_CONTEXT_CALL_ID;
+import static org.apache.hadoop.ipc.RpcConstants.PING_CALL_ID;
 
 /** A client for an IPC service.  IPC calls take a single {@link Writable} as a
  * parameter, and return a {@link Writable} as their value.  A service runs on
@@ -119,8 +94,8 @@ public class Client implements AutoCloseable {
 
   private static final ThreadLocal<Integer> callId = new ThreadLocal<Integer>();
   private static final ThreadLocal<Integer> retryCount = new ThreadLocal<Integer>();
-  private static final ThreadLocal<Future<?>>
-      RETURN_RPC_RESPONSE = new ThreadLocal<>();
+  private static final ThreadLocal<Future<?>> ASYNC_RPC_RESPONSE
+      = new ThreadLocal<>();
   private static final ThreadLocal<Boolean> asynchronousMode =
       new ThreadLocal<Boolean>() {
         @Override
@@ -131,8 +106,8 @@ public class Client implements AutoCloseable {
 
   @SuppressWarnings("unchecked")
   @Unstable
-  public static <T> Future<T> getReturnRpcResponse() {
-    return (Future<T>) RETURN_RPC_RESPONSE.get();
+  public static <T> Future<T> getAsyncRpcResponse() {
+    return (Future<T>) ASYNC_RPC_RESPONSE.get();
   }
 
   /** Set call id and retry count for the next call. */
@@ -379,6 +354,11 @@ public class Client implements AutoCloseable {
       }
     }
 
+    @Override
+    public String toString() {
+      return getClass().getSimpleName() + id;
+    }
+
     /** Indicate when the call is complete and the
      * value or error is available.  Notifies by default.  */
     protected synchronized void callComplete() {
@@ -1413,27 +1393,32 @@ public class Client implements AutoCloseable {
     }
 
     if (isAsynchronousMode()) {
-      Future<Writable> returnFuture = new AbstractFuture<Writable>() {
-        private final AtomicBoolean callled = new AtomicBoolean(false);
+      final AsyncGet<Writable, IOException> asyncGet
+          = new AsyncGet<Writable, IOException>() {
         @Override
-        public Writable get() throws InterruptedException, ExecutionException {
-          if (callled.compareAndSet(false, true)) {
-            try {
-              set(getRpcResponse(call, connection));
-            } catch (IOException ie) {
-              setException(ie);
-            } finally {
+        public Writable get(long timeout, TimeUnit unit)
+            throws IOException, TimeoutException {
+          boolean done = true;
+          try {
+            final Writable w = getRpcResponse(call, connection, timeout, unit);
+            if (w == null) {
+              done = false;
+              throw new TimeoutException(call + " timed out "
+                  + timeout + " " + unit);
+            }
+            return w;
+          } finally {
+            if (done) {
               releaseAsyncCall();
             }
           }
-          return super.get();
         }
       };
 
-      RETURN_RPC_RESPONSE.set(returnFuture);
+      ASYNC_RPC_RESPONSE.set(new AsyncGetFuture<>(asyncGet));
       return null;
     } else {
-      return getRpcResponse(call, connection);
+      return getRpcResponse(call, connection, -1, null);
     }
   }
 
@@ -1469,12 +1454,18 @@ public class Client implements AutoCloseable {
     return asyncCallCounter.get();
   }
 
-  private Writable getRpcResponse(final Call call, final Connection connection)
-      throws IOException {
+  /** @return the rpc response or, in case of timeout, null. */
+  private Writable getRpcResponse(final Call call, final Connection connection,
+      final long timeout, final TimeUnit unit) throws IOException {
     synchronized (call) {
       while (!call.done) {
         try {
-          call.wait();                           // wait for the result
+          final long waitTimeout = AsyncGet.Util.asyncGetTimeout2WaitTimeout(
+              timeout, unit);
+          call.wait(waitTimeout); // wait for the result
+          if (waitTimeout > 0 && !call.done) {
+            return null;
+          }
         } catch (InterruptedException ie) {
           Thread.currentThread().interrupt();
           throw new InterruptedIOException("Call interrupted");

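Editor's note: the hunk above changes getRpcResponse() to return null when the timed
wait expires, and the new AsyncGet wrapper turns that null into a TimeoutException
while keeping the async-call slot held until a definitive result or error arrives.
A minimal, self-contained sketch of that contract; the names fetchResponse and
releaseSlot are illustrative stand-ins, not part of the patch:

import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class NullOnTimeoutSketch {
  // Stands in for getRpcResponse(call, connection, timeout, unit):
  // returns null if the reply has not arrived within the wait.
  static String fetchResponse(long timeout, TimeUnit unit) {
    return null; // pretend the server has not answered yet
  }

  static void releaseSlot() { /* stands in for releaseAsyncCall() */ }

  static String get(long timeout, TimeUnit unit) throws TimeoutException {
    boolean done = true;
    try {
      final String w = fetchResponse(timeout, unit);
      if (w == null) {
        done = false; // timed out: keep the slot so a later get() can retry
        throw new TimeoutException("timed out after " + timeout + " " + unit);
      }
      return w;
    } finally {
      if (done) {
        releaseSlot(); // released only once a real result or error is known
      }
    }
  }

  public static void main(String[] args) {
    try {
      get(10, TimeUnit.MILLISECONDS);
    } catch (TimeoutException e) {
      System.out.println(e.getMessage()); // slot intentionally still held
    }
  }
}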
http://git-wip-us.apache.org/repos/asf/hadoop/blob/35e21d9c/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
index 8fcdb78..0f43fc6 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
@@ -18,21 +18,9 @@
 
 package org.apache.hadoop.ipc;
 
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.lang.reflect.Method;
-import java.lang.reflect.Proxy;
-import java.net.InetSocketAddress;
-import java.util.Map;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Future;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import javax.net.SocketFactory;
-
+import com.google.common.annotations.VisibleForTesting;
+import com.google.protobuf.*;
+import com.google.protobuf.Descriptors.MethodDescriptor;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -52,17 +40,23 @@ import org.apache.hadoop.security.token.SecretManager;
 import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.util.ProtoUtil;
 import org.apache.hadoop.util.Time;
+import org.apache.hadoop.util.concurrent.AsyncGet;
 import org.apache.htrace.core.TraceScope;
 import org.apache.htrace.core.Tracer;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.protobuf.BlockingService;
-import com.google.protobuf.CodedOutputStream;
-import com.google.protobuf.Descriptors.MethodDescriptor;
-import com.google.protobuf.GeneratedMessage;
-import com.google.protobuf.Message;
-import com.google.protobuf.ServiceException;
-import com.google.protobuf.TextFormat;
+import javax.net.SocketFactory;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.net.InetSocketAddress;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * RPC Engine for protobuf based RPCs.
@@ -70,8 +64,8 @@ import com.google.protobuf.TextFormat;
 @InterfaceStability.Evolving
 public class ProtobufRpcEngine implements RpcEngine {
   public static final Log LOG = LogFactory.getLog(ProtobufRpcEngine.class);
-  private static final ThreadLocal<Callable<?>>
-      RETURN_MESSAGE_CALLBACK = new ThreadLocal<>();
+  private static final ThreadLocal<AsyncGet<Message, Exception>>
+      ASYNC_RETURN_MESSAGE = new ThreadLocal<>();
 
   static { // Register the rpcRequest deserializer for WritableRpcEngine 
     org.apache.hadoop.ipc.Server.registerProtocolEngine(
@@ -81,10 +75,9 @@ public class ProtobufRpcEngine implements RpcEngine {
 
   private static final ClientCache CLIENTS = new ClientCache();
 
-  @SuppressWarnings("unchecked")
   @Unstable
-  public static <T> Callable<T> getReturnMessageCallback() {
-    return (Callable<T>) RETURN_MESSAGE_CALLBACK.get();
+  public static AsyncGet<Message, Exception> getAsyncReturnMessage() {
+    return ASYNC_RETURN_MESSAGE.get();
   }
 
   public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
@@ -263,14 +256,17 @@ public class ProtobufRpcEngine implements RpcEngine {
       }
       
       if (Client.isAsynchronousMode()) {
-        final Future<RpcResponseWrapper> frrw = Client.getReturnRpcResponse();
-        Callable<Message> callback = new Callable<Message>() {
+        final Future<RpcResponseWrapper> frrw = Client.getAsyncRpcResponse();
+        final AsyncGet<Message, Exception> asyncGet
+            = new AsyncGet<Message, Exception>() {
           @Override
-          public Message call() throws Exception {
-            return getReturnMessage(method, frrw.get());
+          public Message get(long timeout, TimeUnit unit) throws Exception {
+            final RpcResponseWrapper rrw = timeout < 0 ?
+                frrw.get() : frrw.get(timeout, unit);
+            return getReturnMessage(method, rrw);
           }
         };
-        RETURN_MESSAGE_CALLBACK.set(callback);
+        ASYNC_RETURN_MESSAGE.set(asyncGet);
         return null;
       } else {
         return getReturnMessage(method, val);

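Editor's note: the replacement above trades a Callable for an AsyncGet so a timeout
can be threaded through the layer: a negative timeout falls back to the untimed
Future.get(), anything else uses the timed overload. A small sketch of that
dispatch under stated assumptions (TimedGet mirrors the shape of AsyncGet, and
transform() stands in for getReturnMessage()):

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class FutureDispatchSketch {
  // Mirrors the shape of AsyncGet<R, E> introduced by this patch.
  interface TimedGet<R> {
    R get(long timeout, TimeUnit unit) throws Exception;
  }

  // Stands in for getReturnMessage(method, rrw).
  static String transform(String raw) {
    return "decoded:" + raw;
  }

  static TimedGet<String> adapt(Future<String> frrw) {
    return (timeout, unit) -> {
      // Negative timeout means "wait indefinitely", as in the hunk above.
      final String raw = timeout < 0 ? frrw.get() : frrw.get(timeout, unit);
      return transform(raw);
    };
  }

  public static void main(String[] args) throws Exception {
    Future<String> ready = CompletableFuture.completedFuture("payload");
    System.out.println(adapt(ready).get(-1, TimeUnit.MILLISECONDS));
  }
}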
http://git-wip-us.apache.org/repos/asf/hadoop/blob/35e21d9c/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/AsyncGet.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/AsyncGet.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/AsyncGet.java
new file mode 100644
index 0000000..5eac869
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/AsyncGet.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.util.concurrent;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * This interface defines an asynchronous {@link #get(long, TimeUnit)} method.
+ *
+ * While the return value is still being computed, invoking
+ * {@link #get(long, TimeUnit)} results in a {@link TimeoutException}.
+ * The method should be invoked repeatedly
+ * until the underlying computation completes.
+ *
+ * @param <R> The type of the return value.
+ * @param <E> The exception type that the underlying implementation may throw.
+ */
+public interface AsyncGet<R, E extends Throwable> {
+  /**
+   * Get the result.
+   *
+   * @param timeout The maximum time period to wait.
+   *                When timeout == 0, it does not wait at all.
+   *                When timeout < 0, it waits indefinitely.
+   * @param unit The unit of the timeout value
+   * @return the result, which is possibly null.
+   * @throws E an exception thrown by the underlying implementation.
+   * @throws TimeoutException if it cannot return after the given time period.
+   * @throws InterruptedException if the thread is interrupted.
+   */
+  R get(long timeout, TimeUnit unit)
+      throws E, TimeoutException, InterruptedException;
+
+  /** Utility */
+  class Util {
+    /**
+     * @return {@link Object#wait(long)} timeout converted
+     *         from {@link #get(long, TimeUnit)} timeout.
+     */
+    public static long asyncGetTimeout2WaitTimeout(long timeout, TimeUnit unit) {
+      return timeout < 0 ? 0 : timeout == 0 ? 1 : unit.toMillis(timeout);
+    }
+    }
+  }
+}

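Editor's note: the two zero cases in asyncGetTimeout2WaitTimeout swap on purpose:
Object.wait(0) blocks forever, so the AsyncGet convention "timeout < 0 waits
indefinitely" must map to wait(0), while "timeout == 0 does not wait" maps to the
smallest positive wait of 1 ms. A spot check of the mapping:

import java.util.concurrent.TimeUnit;

public class TimeoutMappingDemo {
  // Same expression as AsyncGet.Util.asyncGetTimeout2WaitTimeout.
  static long toWaitTimeout(long timeout, TimeUnit unit) {
    return timeout < 0 ? 0 : timeout == 0 ? 1 : unit.toMillis(timeout);
  }

  public static void main(String[] args) {
    System.out.println(toWaitTimeout(-1, TimeUnit.SECONDS)); // 0    -> wait forever
    System.out.println(toWaitTimeout(0, TimeUnit.SECONDS));  // 1    -> barely wait
    System.out.println(toWaitTimeout(3, TimeUnit.SECONDS));  // 3000
  }
}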
http://git-wip-us.apache.org/repos/asf/hadoop/blob/35e21d9c/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/AsyncGetFuture.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/AsyncGetFuture.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/AsyncGetFuture.java
new file mode 100644
index 0000000..d687867
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/AsyncGetFuture.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.util.concurrent;
+
+import com.google.common.util.concurrent.AbstractFuture;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/** A {@link Future} implemented using an {@link AsyncGet} object. */
+public class AsyncGetFuture<T, E extends Throwable> extends AbstractFuture<T> {
+  public static final Log LOG = LogFactory.getLog(AsyncGetFuture.class);
+
+  private final AtomicBoolean called = new AtomicBoolean(false);
+  private final AsyncGet<T, E> asyncGet;
+
+  public AsyncGetFuture(AsyncGet<T, E> asyncGet) {
+    this.asyncGet = asyncGet;
+  }
+
+  private void callAsyncGet(long timeout, TimeUnit unit) {
+    if (!isCancelled() && called.compareAndSet(false, true)) {
+      try {
+        set(asyncGet.get(timeout, unit));
+      } catch (TimeoutException te) {
+        LOG.trace("TRACE", te);
+        called.compareAndSet(true, false);
+      } catch (Throwable e) {
+        LOG.trace("TRACE", e);
+        setException(e);
+      }
+    }
+  }
+
+  @Override
+  public T get() throws InterruptedException, ExecutionException {
+    callAsyncGet(-1, TimeUnit.MILLISECONDS);
+    return super.get();
+  }
+
+  @Override
+  public T get(long timeout, TimeUnit unit)
+      throws InterruptedException, TimeoutException, ExecutionException {
+    callAsyncGet(timeout, unit);
+    return super.get(0, TimeUnit.MILLISECONDS);
+  }
+
+  @Override
+  public boolean isDone() {
+    callAsyncGet(0, TimeUnit.MILLISECONDS);
+    return super.isDone();
+  }
+}

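Editor's note: because callAsyncGet() resets the 'called' flag on TimeoutException,
a timed get() that times out leaves the future retryable, and isDone() doubles as a
non-blocking poll (timeout 0). A hedged usage sketch, assuming 'future' came from an
API such as Client.getAsyncRpcResponse() and the imports of the test below:

// Assumes java.util.concurrent.* and org.apache.hadoop.io.LongWritable.
// The 10 ms budget per attempt is arbitrary; each TimeoutException leaves
// the future retryable rather than failed.
static LongWritable poll(Future<LongWritable> future)
    throws InterruptedException, ExecutionException {
  for (;;) {
    try {
      return future.get(10, TimeUnit.MILLISECONDS);
    } catch (TimeoutException e) {
      // not done yet: callAsyncGet() reset 'called', so the next get() retries
    }
  }
}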
http://git-wip-us.apache.org/repos/asf/hadoop/blob/35e21d9c/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java
index 8ee3a2c..0ad191b 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java
@@ -18,20 +18,6 @@
 
 package org.apache.hadoop.ipc;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -48,6 +34,17 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.*;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
 public class TestAsyncIPC {
 
   private static Configuration conf;
@@ -87,26 +84,50 @@ public class TestAsyncIPC {
         try {
           final long param = TestIPC.RANDOM.nextLong();
           TestIPC.call(client, param, server, conf);
-          Future<LongWritable> returnFuture = Client.getReturnRpcResponse();
-          returnFutures.put(i, returnFuture);
+          returnFutures.put(i, Client.getAsyncRpcResponse());
           expectedValues.put(i, param);
         } catch (Exception e) {
-          LOG.fatal("Caught: " + StringUtils.stringifyException(e));
           failed = true;
+          throw new RuntimeException(e);
         }
       }
     }
 
-    public void waitForReturnValues() throws InterruptedException,
-        ExecutionException {
+    void assertReturnValues() throws InterruptedException, ExecutionException {
       for (int i = 0; i < count; i++) {
         LongWritable value = returnFutures.get(i).get();
-        if (expectedValues.get(i) != value.get()) {
-          LOG.fatal(String.format("Call-%d failed!", i));
-          failed = true;
-          break;
+        Assert.assertEquals("call" + i + " failed.",
+            expectedValues.get(i).longValue(), value.get());
+      }
+      Assert.assertFalse(failed);
+    }
+
+    void assertReturnValues(long timeout, TimeUnit unit)
+        throws InterruptedException, ExecutionException {
+      final boolean[] checked = new boolean[count];
+      for (boolean done = false; !done;) {
+        done = true;
+        for (int i = 0; i < count; i++) {
+          if (checked[i]) {
+            continue;
+          } else {
+            done = false;
+          }
+
+          final LongWritable value;
+          try {
+            value = returnFutures.get(i).get(timeout, unit);
+          } catch (TimeoutException e) {
+            LOG.info("call" + i + " caught ", e);
+            continue;
+          }
+
+          Assert.assertEquals("call" + i + " failed.",
+              expectedValues.get(i).longValue(), value.get());
+          checked[i] = true;
         }
       }
+      Assert.assertFalse(failed);
     }
   }
 
@@ -183,8 +204,7 @@ public class TestAsyncIPC {
 
     private void doCall(final int idx, final long param) throws IOException {
       TestIPC.call(client, param, server, conf);
-      Future<LongWritable> returnFuture = Client.getReturnRpcResponse();
-      returnFutures.put(idx, returnFuture);
+      returnFutures.put(idx, Client.getAsyncRpcResponse());
       expectedValues.put(idx, param);
     }
 
@@ -233,10 +253,7 @@ public class TestAsyncIPC {
     }
     for (int i = 0; i < callerCount; i++) {
       callers[i].join();
-      callers[i].waitForReturnValues();
-      String msg = String.format("Expected not failed for caller-%d: %s.", i,
-          callers[i]);
-      assertFalse(msg, callers[i].failed);
+      callers[i].assertReturnValues();
     }
     for (int i = 0; i < clientCount; i++) {
       clients[i].stop();
@@ -258,25 +275,37 @@ public class TestAsyncIPC {
     try {
       AsyncCaller caller = new AsyncCaller(client, addr, callCount);
       caller.run();
+      caller.assertReturnValues();
+      caller.assertReturnValues();
+      caller.assertReturnValues();
+      Assert.assertEquals(asyncCallCount, client.getAsyncCallCount());
+    } finally {
+      client.stop();
+      server.stop();
+    }
+  }
 
-      caller.waitForReturnValues();
-      String msg = String.format(
-          "First time, expected not failed for caller: %s.", caller);
-      assertFalse(msg, caller.failed);
+  @Test(timeout = 60000)
+  public void testFutureGetWithTimeout() throws IOException,
+      InterruptedException, ExecutionException {
+//    GenericTestUtils.setLogLevel(AsyncGetFuture.LOG, Level.ALL);
+    final Server server = new TestIPC.TestServer(10, true, conf);
+    final InetSocketAddress addr = NetUtils.getConnectAddress(server);
+    server.start();
 
-      caller.waitForReturnValues();
-      assertTrue(asyncCallCount == client.getAsyncCallCount());
-      msg = String.format("Second time, expected not failed for caller: %s.",
-          caller);
-      assertFalse(msg, caller.failed);
+    final Client client = new Client(LongWritable.class, conf);
 
-      assertTrue(asyncCallCount == client.getAsyncCallCount());
+    try {
+      final AsyncCaller caller = new AsyncCaller(client, addr, 10);
+      caller.run();
+      caller.assertReturnValues(10, TimeUnit.MILLISECONDS);
     } finally {
       client.stop();
       server.stop();
     }
   }
 
+
   public void internalTestAsyncCallLimit(int handlerCount, boolean handlerSleep,
       int clientCount, int callerCount, int callCount) throws IOException,
       InterruptedException, ExecutionException {
@@ -367,9 +396,7 @@ public class TestAsyncIPC {
       server.start();
       final AsyncCaller caller = new AsyncCaller(client, addr, 4);
       caller.run();
-      caller.waitForReturnValues();
-      String msg = String.format("Expected not failed for caller: %s.", caller);
-      assertFalse(msg, caller.failed);
+      caller.assertReturnValues();
     } finally {
       client.stop();
       server.stop();
@@ -406,9 +433,7 @@ public class TestAsyncIPC {
       server.start();
       final AsyncCaller caller = new AsyncCaller(client, addr, 10);
       caller.run();
-      caller.waitForReturnValues();
-      String msg = String.format("Expected not failed for caller: %s.", caller);
-      assertFalse(msg, caller.failed);
+      caller.assertReturnValues();
     } finally {
       client.stop();
       server.stop();
@@ -443,9 +468,7 @@ public class TestAsyncIPC {
       server.start();
       final AsyncCaller caller = new AsyncCaller(client, addr, 10);
       caller.run();
-      caller.waitForReturnValues();
-      String msg = String.format("Expected not failed for caller: %s.", caller);
-      assertFalse(msg, caller.failed);
+      caller.assertReturnValues();
     } finally {
       client.stop();
       server.stop();
@@ -489,10 +512,7 @@ public class TestAsyncIPC {
       }
       for (int i = 0; i < callerCount; ++i) {
         callers[i].join();
-        callers[i].waitForReturnValues();
-        String msg = String.format("Expected not failed for caller-%d: %s.", i,
-            callers[i]);
-        assertFalse(msg, callers[i].failed);
+        callers[i].assertReturnValues();
       }
     } finally {
       client.stop();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/35e21d9c/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java
index 4fe0861..6bfd71d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java
@@ -19,20 +19,16 @@
 package org.apache.hadoop.hdfs;
 
 import java.io.IOException;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.fs.Options;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB;
+import org.apache.hadoop.util.concurrent.AsyncGetFuture;
 import org.apache.hadoop.ipc.Client;
 
-import com.google.common.util.concurrent.AbstractFuture;
-
 /****************************************************************
  * Implementation of the asynchronous distributed file system.
  * This instance of this class is the way end-user code interacts
@@ -52,22 +48,8 @@ public class AsyncDistributedFileSystem {
   }
 
   static <T> Future<T> getReturnValue() {
-    final Callable<T> returnValueCallback = ClientNamenodeProtocolTranslatorPB
-        .getReturnValueCallback();
-    Future<T> returnFuture = new AbstractFuture<T>() {
-      private final AtomicBoolean called = new AtomicBoolean(false);
-      public T get() throws InterruptedException, ExecutionException {
-        if (called.compareAndSet(false, true)) {
-          try {
-            set(returnValueCallback.call());
-          } catch (Exception e) {
-            setException(e);
-          }
-        }
-        return super.get();
-      }
-    };
-    return returnFuture;
+    return new AsyncGetFuture<>(
+        ClientNamenodeProtocolTranslatorPB.getAsyncReturnValue());
   }
 
   /**

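Editor's note: from the end-user side the flow is to flip the client into
asynchronous mode, issue the rename (the RPC returns immediately), then redeem the
Future. A hedged sketch; the rename signature and the accessor used to obtain the
AsyncDistributedFileSystem instance are assumptions, not shown in this hunk:

// Assumed: adfs is an AsyncDistributedFileSystem obtained from a
// DistributedFileSystem (e.g. via a getAsyncDistributedFileSystem()
// accessor), and rename returns Future<Void> with Options.Rename varargs.
Path src = new Path("/tmp/a");
Path dst = new Path("/tmp/b");
Future<Void> renameFuture = adfs.rename(src, dst, Options.Rename.OVERWRITE);
// ... do other work while the NameNode processes the rename ...
renameFuture.get(); // remote failures surface wrapped in ExecutionException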
http://git-wip-us.apache.org/repos/asf/hadoop/blob/35e21d9c/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
index faa925c..939c1ac 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
@@ -24,7 +24,8 @@ import java.util.EnumSet;
 import java.util.List;
 
 import com.google.common.collect.Lists;
-import java.util.concurrent.Callable;
+
+import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
@@ -198,6 +199,7 @@ import org.apache.hadoop.security.token.Token;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.Message;
 import com.google.protobuf.ServiceException;
+import org.apache.hadoop.util.concurrent.AsyncGet;
 
 /**
  * This class forwards NN's ClientProtocol calls as RPC calls to the NN server
@@ -209,8 +211,8 @@ import com.google.protobuf.ServiceException;
 public class ClientNamenodeProtocolTranslatorPB implements
     ProtocolMetaInterface, ClientProtocol, Closeable, ProtocolTranslator {
   final private ClientNamenodeProtocolPB rpcProxy;
-  private static final ThreadLocal<Callable<?>>
-      RETURN_VALUE_CALLBACK = new ThreadLocal<>();
+  private static final ThreadLocal<AsyncGet<?, Exception>>
+      ASYNC_RETURN_VALUE = new ThreadLocal<>();
 
   static final GetServerDefaultsRequestProto VOID_GET_SERVER_DEFAULT_REQUEST =
       GetServerDefaultsRequestProto.newBuilder().build();
@@ -246,8 +248,8 @@ public class ClientNamenodeProtocolTranslatorPB implements
 
   @SuppressWarnings("unchecked")
   @Unstable
-  public static <T> Callable<T> getReturnValueCallback() {
-    return (Callable<T>) RETURN_VALUE_CALLBACK.get();
+  public static <T> AsyncGet<T, Exception> getAsyncReturnValue() {
+    return (AsyncGet<T, Exception>) ASYNC_RETURN_VALUE.get();
   }
 
   @Override
@@ -369,7 +371,7 @@ public class ClientNamenodeProtocolTranslatorPB implements
     try {
       if (Client.isAsynchronousMode()) {
         rpcProxy.setPermission(null, req);
-        setReturnValueCallback();
+        setAsyncReturnValue();
       } else {
         rpcProxy.setPermission(null, req);
       }
@@ -378,17 +380,18 @@ public class ClientNamenodeProtocolTranslatorPB implements
     }
   }
 
-  private void setReturnValueCallback() {
-    final Callable<Message> returnMessageCallback = ProtobufRpcEngine
-        .getReturnMessageCallback();
-    Callable<Void> callBack = new Callable<Void>() {
+  private void setAsyncReturnValue() {
+    final AsyncGet<Message, Exception> asyncReturnMessage
+        = ProtobufRpcEngine.getAsyncReturnMessage();
+    final AsyncGet<Void, Exception> asyncGet
+        = new AsyncGet<Void, Exception>() {
       @Override
-      public Void call() throws Exception {
-        returnMessageCallback.call();
+      public Void get(long timeout, TimeUnit unit) throws Exception {
+        asyncReturnMessage.get(timeout, unit);
         return null;
       }
     };
-    RETURN_VALUE_CALLBACK.set(callBack);
+    ASYNC_RETURN_VALUE.set(asyncGet);
   }
 
   @Override
@@ -403,7 +406,7 @@ public class ClientNamenodeProtocolTranslatorPB implements
     try {
       if (Client.isAsynchronousMode()) {
         rpcProxy.setOwner(null, req.build());
-        setReturnValueCallback();
+        setAsyncReturnValue();
       } else {
         rpcProxy.setOwner(null, req.build());
       }
@@ -536,7 +539,7 @@ public class ClientNamenodeProtocolTranslatorPB implements
     try {
       if (Client.isAsynchronousMode()) {
         rpcProxy.rename2(null, req);
-        setReturnValueCallback();
+        setAsyncReturnValue();
       } else {
         rpcProxy.rename2(null, req);
       }


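Editor's note: stepping back, the patch threads one pattern through four layers:
Client parks a Future<Writable> in ASYNC_RPC_RESPONSE, ProtobufRpcEngine rewraps it
as AsyncGet<Message, Exception> in ASYNC_RETURN_MESSAGE, the translator narrows it
to AsyncGet<Void, Exception> in ASYNC_RETURN_VALUE, and AsyncDistributedFileSystem
finally exposes it as an AsyncGetFuture. The load-bearing trick is a same-thread
ThreadLocal handoff; a minimal generic sketch:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

public class ThreadLocalHandoffSketch {
  // Each layer in the patch parks its view of the pending result here...
  static final ThreadLocal<Future<String>> PENDING = new ThreadLocal<>();

  // ...the lower layer fills it in as a side effect of the call...
  static void fireRpc() {
    PENDING.set(CompletableFuture.completedFuture("reply"));
  }

  // ...and the caller harvests it immediately afterwards, on the same thread.
  public static void main(String[] args) throws Exception {
    fireRpc();
    Future<String> f = PENDING.get();
    System.out.println(f.get());
  }
}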
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org