Posted to common-commits@hadoop.apache.org by wa...@apache.org on 2016/06/04 01:23:00 UTC

[1/8] hadoop git commit: Revert "HADOOP-13226 Support async call retry and failover."

Repository: hadoop
Updated Branches:
  refs/heads/branch-2 d5609e349 -> c11c2ee64


Revert "HADOOP-13226 Support async call retry and failover."

This reverts commit f7189d6a1c08075920bb6f8efd41a8730a4eb7c7.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/5bfadb8e
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/5bfadb8e
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/5bfadb8e

Branch: refs/heads/branch-2
Commit: 5bfadb8e6bd9af791890da84707f4ebee0f664d8
Parents: d5609e3
Author: Andrew Wang <wa...@apache.org>
Authored: Fri Jun 3 18:10:47 2016 -0700
Committer: Andrew Wang <wa...@apache.org>
Committed: Fri Jun 3 18:10:47 2016 -0700

----------------------------------------------------------------------
 .../dev-support/findbugsExcludeFile.xml         |   8 +-
 .../hadoop/io/retry/AsyncCallHandler.java       | 321 -------------------
 .../org/apache/hadoop/io/retry/CallReturn.java  |  75 -----
 .../hadoop/io/retry/RetryInvocationHandler.java | 134 ++------
 .../apache/hadoop/io/retry/RetryPolicies.java   |   4 +-
 .../main/java/org/apache/hadoop/ipc/Client.java |  25 +-
 .../apache/hadoop/ipc/ProtobufRpcEngine.java    |  13 +-
 .../apache/hadoop/util/concurrent/AsyncGet.java |  17 +-
 .../org/apache/hadoop/ipc/TestAsyncIPC.java     |  11 +-
 .../hadoop/hdfs/AsyncDistributedFileSystem.java |   9 +-
 .../ClientNamenodeProtocolTranslatorPB.java     |  42 +--
 .../org/apache/hadoop/hdfs/TestAsyncDFS.java    |  43 ++-
 .../apache/hadoop/hdfs/TestAsyncHDFSWithHA.java | 182 -----------
 .../hdfs/server/namenode/ha/HATestUtil.java     |   9 +-
 14 files changed, 116 insertions(+), 777 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/5bfadb8e/hadoop-common-project/hadoop-common/dev-support/findbugsExcludeFile.xml
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/dev-support/findbugsExcludeFile.xml b/hadoop-common-project/hadoop-common/dev-support/findbugsExcludeFile.xml
index 4bf1762..4a8cbaf 100644
--- a/hadoop-common-project/hadoop-common/dev-support/findbugsExcludeFile.xml
+++ b/hadoop-common-project/hadoop-common/dev-support/findbugsExcludeFile.xml
@@ -363,13 +363,7 @@
        <Bug pattern="SF_SWITCH_FALLTHROUGH" />
      </Match>
 
-     <!-- WA_NOT_IN_LOOP is invalid in util.concurrent.AsyncGet$Util.wait. -->
-     <Match>
-       <Class name="org.apache.hadoop.util.concurrent.AsyncGet$Util" />
-       <Method name="wait" />
-       <Bug pattern="WA_NOT_IN_LOOP" />
-     </Match>
-
+     <!-- Synchronization performed on util.concurrent instance. -->
      <Match>
        <Class name="org.apache.hadoop.service.AbstractService" />
        <Method name="stop" />

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5bfadb8e/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/AsyncCallHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/AsyncCallHandler.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/AsyncCallHandler.java
deleted file mode 100644
index 5a03b03..0000000
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/AsyncCallHandler.java
+++ /dev/null
@@ -1,321 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.io.retry;
-
-import com.google.common.base.Preconditions;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.ipc.Client;
-import org.apache.hadoop.util.Daemon;
-import org.apache.hadoop.util.Time;
-import org.apache.hadoop.util.concurrent.AsyncGet;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.lang.reflect.Method;
-import java.util.LinkedList;
-import java.util.Queue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicReference;
-
-/** Handle async calls. */
-@InterfaceAudience.Private
-public class AsyncCallHandler {
-  static final Logger LOG = LoggerFactory.getLogger(AsyncCallHandler.class);
-
-  private static final ThreadLocal<AsyncGet<?, Exception>>
-      LOWER_LAYER_ASYNC_RETURN = new ThreadLocal<>();
-  private static final ThreadLocal<AsyncGet<Object, Throwable>>
-      ASYNC_RETURN = new ThreadLocal<>();
-
-  /** @return the async return value from {@link AsyncCallHandler}. */
-  @InterfaceStability.Unstable
-  @SuppressWarnings("unchecked")
-  public static <R, T extends  Throwable> AsyncGet<R, T> getAsyncReturn() {
-    final AsyncGet<R, T> asyncGet = (AsyncGet<R, T>)ASYNC_RETURN.get();
-    if (asyncGet != null) {
-      ASYNC_RETURN.set(null);
-      return asyncGet;
-    } else {
-      return (AsyncGet<R, T>) getLowerLayerAsyncReturn();
-    }
-  }
-
-  /** For the lower rpc layers to set the async return value. */
-  @InterfaceStability.Unstable
-  public static void setLowerLayerAsyncReturn(
-      AsyncGet<?, Exception> asyncReturn) {
-    LOWER_LAYER_ASYNC_RETURN.set(asyncReturn);
-  }
-
-  private static AsyncGet<?, Exception> getLowerLayerAsyncReturn() {
-    final AsyncGet<?, Exception> asyncGet = LOWER_LAYER_ASYNC_RETURN.get();
-    Preconditions.checkNotNull(asyncGet);
-    LOWER_LAYER_ASYNC_RETURN.set(null);
-    return asyncGet;
-  }
-
-  /** A simple concurrent queue which keeps track of the empty start time. */
-  static class ConcurrentQueue<T> {
-    private final Queue<T> queue = new LinkedList<>();
-    private long emptyStartTime = Time.monotonicNow();
-
-    synchronized int size() {
-      return queue.size();
-    }
-
-    /** Is the queue empty for more than the given time in milliseconds? */
-    synchronized boolean isEmpty(long time) {
-      return queue.isEmpty() && Time.monotonicNow() - emptyStartTime > time;
-    }
-
-    synchronized void offer(T c) {
-      final boolean added = queue.offer(c);
-      Preconditions.checkState(added);
-    }
-
-    synchronized T poll() {
-      Preconditions.checkState(!queue.isEmpty());
-      final T t = queue.poll();
-      if (queue.isEmpty()) {
-        emptyStartTime = Time.monotonicNow();
-      }
-      return t;
-    }
-  }
-
-  /** A queue for handling async calls. */
-  static class AsyncCallQueue {
-    private final ConcurrentQueue<AsyncCall> queue = new ConcurrentQueue<>();
-    private final Processor processor = new Processor();
-
-    void addCall(AsyncCall call) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("add " + call);
-      }
-      queue.offer(call);
-      processor.tryStart();
-    }
-
-    void checkCalls() {
-      final int size = queue.size();
-      for (int i = 0; i < size; i++) {
-        final AsyncCall c = queue.poll();
-        if (!c.isDone()) {
-          queue.offer(c); // the call is not done yet, add it back.
-        }
-      }
-    }
-
-    /** Process the async calls in the queue. */
-    private class Processor {
-      static final long GRACE_PERIOD = 10*1000L;
-      static final long SLEEP_PERIOD = 100L;
-
-      private final AtomicReference<Thread> running = new AtomicReference<>();
-
-      boolean isRunning(Daemon d) {
-        return d == running.get();
-      }
-
-      void tryStart() {
-        final Thread current = Thread.currentThread();
-        if (running.compareAndSet(null, current)) {
-          final Daemon daemon = new Daemon() {
-            @Override
-            public void run() {
-              for (; isRunning(this);) {
-                try {
-                  Thread.sleep(SLEEP_PERIOD);
-                } catch (InterruptedException e) {
-                  kill(this);
-                  return;
-                }
-
-                checkCalls();
-                tryStop(this);
-              }
-            }
-          };
-
-          final boolean set = running.compareAndSet(current, daemon);
-          Preconditions.checkState(set);
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Starting AsyncCallQueue.Processor " + daemon);
-          }
-          daemon.start();
-        }
-      }
-
-      void tryStop(Daemon d) {
-        if (queue.isEmpty(GRACE_PERIOD)) {
-          kill(d);
-        }
-      }
-
-      void kill(Daemon d) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Killing " + d);
-        }
-        final boolean set = running.compareAndSet(d, null);
-        Preconditions.checkState(set);
-      }
-    }
-  }
-
-  static class AsyncValue<V> {
-    private V value;
-
-    synchronized V waitAsyncValue(long timeout, TimeUnit unit)
-        throws InterruptedException, TimeoutException {
-      if (value != null) {
-        return value;
-      }
-      AsyncGet.Util.wait(this, timeout, unit);
-      if (value != null) {
-        return value;
-      }
-
-      throw new TimeoutException("waitCallReturn timed out "
-          + timeout + " " + unit);
-    }
-
-    synchronized void set(V v) {
-      Preconditions.checkNotNull(v);
-      Preconditions.checkState(value == null);
-      value = v;
-      notify();
-    }
-
-    synchronized boolean isDone() {
-      return value != null;
-    }
-  }
-
-  static class AsyncCall extends RetryInvocationHandler.Call {
-    private final AsyncCallHandler asyncCallHandler;
-
-    private final AsyncValue<CallReturn> asyncCallReturn = new AsyncValue<>();
-    private AsyncGet<?, Exception> lowerLayerAsyncGet;
-
-    AsyncCall(Method method, Object[] args, boolean isRpc, int callId,
-              RetryInvocationHandler.Counters counters,
-              RetryInvocationHandler<?> retryInvocationHandler,
-              AsyncCallHandler asyncCallHandler) {
-      super(method, args, isRpc, callId, counters, retryInvocationHandler);
-
-      this.asyncCallHandler = asyncCallHandler;
-    }
-
-    /** @return true if the call is done; otherwise, return false. */
-    boolean isDone() {
-      final CallReturn r = invokeOnce();
-      switch (r.getState()) {
-        case RETURNED:
-        case EXCEPTION:
-          asyncCallReturn.set(r); // the async call is done
-          return true;
-        case RETRY:
-          invokeOnce();
-          break;
-        case ASYNC_CALL_IN_PROGRESS:
-        case ASYNC_INVOKED:
-          // nothing to do
-          break;
-        default:
-          Preconditions.checkState(false);
-      }
-      return false;
-    }
-
-    @Override
-    CallReturn invoke() throws Throwable {
-      LOG.debug("{}.invoke {}", getClass().getSimpleName(), this);
-      if (lowerLayerAsyncGet != null) {
-        // async call was submitted early, check the lower level async call
-        final boolean isDone = lowerLayerAsyncGet.isDone();
-        LOG.trace("invoke: lowerLayerAsyncGet.isDone()? {}", isDone);
-        if (!isDone) {
-          return CallReturn.ASYNC_CALL_IN_PROGRESS;
-        }
-        try {
-          return new CallReturn(lowerLayerAsyncGet.get(0, TimeUnit.SECONDS));
-        } finally {
-          lowerLayerAsyncGet = null;
-        }
-      }
-
-      // submit a new async call
-      LOG.trace("invoke: ASYNC_INVOKED");
-      final boolean mode = Client.isAsynchronousMode();
-      try {
-        Client.setAsynchronousMode(true);
-        final Object r = invokeMethod();
-        // invokeMethod should set LOWER_LAYER_ASYNC_RETURN and return null.
-        Preconditions.checkState(r == null);
-        lowerLayerAsyncGet = getLowerLayerAsyncReturn();
-
-        if (counters.isZeros()) {
-          // first async attempt, initialize
-          LOG.trace("invoke: initAsyncCall");
-          asyncCallHandler.initAsyncCall(this, asyncCallReturn);
-        }
-        return CallReturn.ASYNC_INVOKED;
-      } finally {
-        Client.setAsynchronousMode(mode);
-      }
-    }
-  }
-
-  private final AsyncCallQueue asyncCalls = new AsyncCallQueue();
-  private volatile boolean hasSuccessfulCall = false;
-
-  AsyncCall newAsyncCall(Method method, Object[] args, boolean isRpc,
-                         int callId, RetryInvocationHandler.Counters counters,
-                         RetryInvocationHandler<?> retryInvocationHandler) {
-    return new AsyncCall(method, args, isRpc, callId, counters,
-        retryInvocationHandler, this);
-  }
-
-  boolean hasSuccessfulCall() {
-    return hasSuccessfulCall;
-  }
-
-  private void initAsyncCall(final AsyncCall asyncCall,
-                             final AsyncValue<CallReturn> asyncCallReturn) {
-    asyncCalls.addCall(asyncCall);
-
-    final AsyncGet<Object, Throwable> asyncGet
-        = new AsyncGet<Object, Throwable>() {
-      @Override
-      public Object get(long timeout, TimeUnit unit) throws Throwable {
-        final CallReturn c = asyncCallReturn.waitAsyncValue(timeout, unit);
-        final Object r = c.getReturnValue();
-        hasSuccessfulCall = true;
-        return r;
-      }
-
-      @Override
-      public boolean isDone() {
-        return asyncCallReturn.isDone();
-      }
-    };
-    ASYNC_RETURN.set(asyncGet);
-  }
-}
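
The class removed above hands async results between RPC layers through per-thread slots: the lower layer publishes an AsyncGet handle in a ThreadLocal, and the caller drains the slot right after the (non-blocking) method invocation returns. A minimal standalone sketch of that hand-off, using a stand-in interface rather than Hadoop's AsyncGet:

import java.util.concurrent.TimeUnit;

/** Stand-in for Hadoop's AsyncGet; same shape, illustrative only. */
interface ResultHandle<R> {
  R get(long timeout, TimeUnit unit) throws Exception;
}

final class ThreadLocalHandoff {
  private static final ThreadLocal<ResultHandle<?>> RETURN = new ThreadLocal<>();

  /** Lower layer: perform the call, then publish its handle for this thread. */
  static void lowerLayerInvoke(final long answer) {
    RETURN.set(new ResultHandle<Long>() {
      @Override
      public Long get(long timeout, TimeUnit unit) {
        return answer; // already complete in this toy example
      }
    });
  }

  /** Caller: take the handle and clear the slot so it cannot be read twice. */
  @SuppressWarnings("unchecked")
  static <R> ResultHandle<R> takeReturn() {
    final ResultHandle<R> handle = (ResultHandle<R>) RETURN.get();
    RETURN.set(null); // mirror AsyncCallHandler's read-once discipline
    return handle;
  }

  public static void main(String[] args) throws Exception {
    lowerLayerInvoke(42L);
    System.out.println(ThreadLocalHandoff.<Long>takeReturn()
        .get(1, TimeUnit.SECONDS)); // prints 42
  }
}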

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5bfadb8e/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/CallReturn.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/CallReturn.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/CallReturn.java
deleted file mode 100644
index 943725c..0000000
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/CallReturn.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.io.retry;
-
-import com.google.common.base.Preconditions;
-
-/** The call return from a method invocation. */
-class CallReturn {
-  /** The return state. */
-  enum State {
-    /** Call is returned successfully. */
-    RETURNED,
-    /** Call throws an exception. */
-    EXCEPTION,
-    /** Call should be retried according to the {@link RetryPolicy}. */
-    RETRY,
-    /** Call, which is async, is still in progress. */
-    ASYNC_CALL_IN_PROGRESS,
-    /** Call, which is async, has just been invoked. */
-    ASYNC_INVOKED
-  }
-
-  static final CallReturn ASYNC_CALL_IN_PROGRESS = new CallReturn(
-      State.ASYNC_CALL_IN_PROGRESS);
-  static final CallReturn ASYNC_INVOKED = new CallReturn(State.ASYNC_INVOKED);
-  static final CallReturn RETRY = new CallReturn(State.RETRY);
-
-  private final Object returnValue;
-  private final Throwable thrown;
-  private final State state;
-
-  CallReturn(Object r) {
-    this(r, null, State.RETURNED);
-  }
-  CallReturn(Throwable t) {
-    this(null, t, State.EXCEPTION);
-    Preconditions.checkNotNull(t);
-  }
-  private CallReturn(State s) {
-    this(null, null, s);
-  }
-  private CallReturn(Object r, Throwable t, State s) {
-    Preconditions.checkArgument(r == null || t == null);
-    returnValue = r;
-    thrown = t;
-    state = s;
-  }
-
-  State getState() {
-    return state;
-  }
-
-  Object getReturnValue() throws Throwable {
-    if (state == State.EXCEPTION) {
-      throw thrown;
-    }
-    Preconditions.checkState(state == State.RETURNED, "state == %s", state);
-    return returnValue;
-  }
-}
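
CallReturn, deleted above, is a small tagged union: at most one of returnValue and thrown is set, and getReturnValue() either yields the value (state RETURNED) or rethrows (state EXCEPTION); the other states carry no payload. An illustrative fragment of that contract, which would have to live in the same package since the class is package-private:

// Illustrative only, based on the constructors shown above.
try {
  CallReturn ok = new CallReturn("result");               // State.RETURNED
  System.out.println(ok.getReturnValue());                // prints "result"

  CallReturn failed = new CallReturn(new java.io.IOException("boom"));
  failed.getReturnValue();                                // rethrows the IOException
} catch (Throwable t) {
  System.out.println(t.getMessage());                     // prints "boom"
}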

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5bfadb8e/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java
index f2b2c99..300d0c2 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java
@@ -42,83 +42,11 @@ import java.util.Map;
 public class RetryInvocationHandler<T> implements RpcInvocationHandler {
   public static final Log LOG = LogFactory.getLog(RetryInvocationHandler.class);
 
-  static class Call {
-    private final Method method;
-    private final Object[] args;
-    private final boolean isRpc;
-    private final int callId;
-    final Counters counters;
-
-    private final RetryPolicy retryPolicy;
-    private final RetryInvocationHandler<?> retryInvocationHandler;
-
-    Call(Method method, Object[] args, boolean isRpc, int callId,
-         Counters counters, RetryInvocationHandler<?> retryInvocationHandler) {
-      this.method = method;
-      this.args = args;
-      this.isRpc = isRpc;
-      this.callId = callId;
-      this.counters = counters;
-
-      this.retryPolicy = retryInvocationHandler.getRetryPolicy(method);
-      this.retryInvocationHandler = retryInvocationHandler;
-    }
-
-    /** Invoke the call once without retrying. */
-    synchronized CallReturn invokeOnce() {
-      try {
-        // The number of times this invocation handler has ever been failed over
-        // before this method invocation attempt. Used to prevent concurrent
-        // failed method invocations from triggering multiple failover attempts.
-        final long failoverCount = retryInvocationHandler.getFailoverCount();
-        try {
-          return invoke();
-        } catch (Exception e) {
-          if (LOG.isTraceEnabled()) {
-            LOG.trace(this, e);
-          }
-          if (Thread.currentThread().isInterrupted()) {
-            // If interrupted, do not retry.
-            throw e;
-          }
-          retryInvocationHandler.handleException(
-              method, retryPolicy, failoverCount, counters, e);
-          return CallReturn.RETRY;
-        }
-      } catch(Throwable t) {
-        return new CallReturn(t);
-      }
-    }
-
-    CallReturn invoke() throws Throwable {
-      return new CallReturn(invokeMethod());
-    }
-
-    Object invokeMethod() throws Throwable {
-      if (isRpc) {
-        Client.setCallIdAndRetryCount(callId, counters.retries);
-      }
-      return retryInvocationHandler.invokeMethod(method, args);
-    }
-
-    @Override
-    public String toString() {
-      return getClass().getSimpleName() + "#" + callId + ": "
-          + method.getDeclaringClass().getSimpleName() + "." + method.getName()
-          + "(" + (args == null || args.length == 0? "": Arrays.toString(args))
-          +  ")";
-    }
-  }
-
-  static class Counters {
+  private static class Counters {
     /** Counter for retries. */
     private int retries;
     /** Counter for how many times the method invocation has failed over. */
     private int failovers;
-
-    boolean isZeros() {
-      return retries == 0 && failovers == 0;
-    }
   }
 
   private static class ProxyDescriptor<T> {
@@ -216,13 +144,11 @@ public class RetryInvocationHandler<T> implements RpcInvocationHandler {
 
   private final ProxyDescriptor<T> proxyDescriptor;
 
-  private volatile boolean hasSuccessfulCall = false;
-
+  private volatile boolean hasMadeASuccessfulCall = false;
+  
   private final RetryPolicy defaultPolicy;
   private final Map<String,RetryPolicy> methodNameToPolicyMap;
 
-  private final AsyncCallHandler asyncCallHandler = new AsyncCallHandler();
-
   protected RetryInvocationHandler(FailoverProxyProvider<T> proxyProvider,
       RetryPolicy retryPolicy) {
     this(proxyProvider, retryPolicy, Collections.<String, RetryPolicy>emptyMap());
@@ -241,35 +167,38 @@ public class RetryInvocationHandler<T> implements RpcInvocationHandler {
     return policy != null? policy: defaultPolicy;
   }
 
-  private long getFailoverCount() {
-    return proxyDescriptor.getFailoverCount();
-  }
-
-  private Call newCall(Method method, Object[] args, boolean isRpc, int callId,
-                       Counters counters) {
-    if (Client.isAsynchronousMode()) {
-      return asyncCallHandler.newAsyncCall(method, args, isRpc, callId,
-          counters, this);
-    } else {
-      return new Call(method, args, isRpc, callId, counters, this);
-    }
-  }
-
   @Override
   public Object invoke(Object proxy, Method method, Object[] args)
       throws Throwable {
     final boolean isRpc = isRpcInvocation(proxyDescriptor.getProxy());
     final int callId = isRpc? Client.nextCallId(): RpcConstants.INVALID_CALL_ID;
-    final Counters counters = new Counters();
+    return invoke(method, args, isRpc, callId, new Counters());
+  }
+
+  private Object invoke(final Method method, final Object[] args,
+      final boolean isRpc, final int callId, final Counters counters)
+      throws Throwable {
+    final RetryPolicy policy = getRetryPolicy(method);
 
-    final Call call = newCall(method, args, isRpc, callId, counters);
     while (true) {
-      final CallReturn c = call.invokeOnce();
-      final CallReturn.State state = c.getState();
-      if (state == CallReturn.State.ASYNC_INVOKED) {
-        return null; // return null for async calls
-      } else if (c.getState() != CallReturn.State.RETRY) {
-        return c.getReturnValue();
+      // The number of times this invocation handler has ever been failed over,
+      // before this method invocation attempt. Used to prevent concurrent
+      // failed method invocations from triggering multiple failover attempts.
+      final long failoverCount = proxyDescriptor.getFailoverCount();
+
+      if (isRpc) {
+        Client.setCallIdAndRetryCount(callId, counters.retries);
+      }
+      try {
+        final Object ret = invokeMethod(method, args);
+        hasMadeASuccessfulCall = true;
+        return ret;
+      } catch (Exception ex) {
+        if (Thread.currentThread().isInterrupted()) {
+          // If interrupted, do not retry.
+          throw ex;
+        }
+        handleException(method, policy, failoverCount, counters, ex);
       }
     }
   }
@@ -310,8 +239,7 @@ public class RetryInvocationHandler<T> implements RpcInvocationHandler {
       final int failovers, final long delay, final Exception ex) {
     // log info if this has made some successful calls or
     // this is not the first failover
-    final boolean info = hasSuccessfulCall || failovers != 0
-        || asyncCallHandler.hasSuccessfulCall();
+    final boolean info = hasMadeASuccessfulCall || failovers != 0;
     if (!info && !LOG.isDebugEnabled()) {
       return;
     }
@@ -337,9 +265,7 @@ public class RetryInvocationHandler<T> implements RpcInvocationHandler {
       if (!method.isAccessible()) {
         method.setAccessible(true);
       }
-      final Object r = method.invoke(proxyDescriptor.getProxy(), args);
-      hasSuccessfulCall = true;
-      return r;
+      return method.invoke(proxyDescriptor.getProxy(), args);
     } catch (InvocationTargetException e) {
       throw e.getCause();
     }
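
This hunk folds the Call/CallReturn state machine back into a single blocking loop: snapshot the proxy's failover count before each attempt, never retry once the thread is interrupted, and leave the retry/failover/fail decision to handleException. A minimal standalone sketch of that control flow (failovers and handleException here are stand-ins, not the handler's real members):

import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicLong;

final class RetryLoopSketch {
  private final AtomicLong failovers = new AtomicLong(); // stand-in for ProxyDescriptor

  Object invokeWithRetry(Callable<Object> attempt) throws Exception {
    while (true) {
      // Snapshot before the attempt so concurrent failed invocations
      // cannot each trigger their own failover.
      final long failoverSnapshot = failovers.get();
      try {
        return attempt.call(); // success exits the loop
      } catch (Exception e) {
        if (Thread.currentThread().isInterrupted()) {
          throw e; // interrupted: surface the failure instead of retrying
        }
        handleException(failoverSnapshot, e); // may sleep, fail over, or rethrow
      }
    }
  }

  private void handleException(long failoverSnapshot, Exception e)
      throws Exception {
    // Stand-in: a real policy would consult the RetryPolicy and fail over
    // only if failovers.get() still equals failoverSnapshot.
    throw e;
  }
}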

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5bfadb8e/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicies.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicies.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicies.java
index c0a14b7..131aa8f 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicies.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicies.java
@@ -17,7 +17,6 @@
  */
 package org.apache.hadoop.io.retry;
 
-import java.io.EOFException;
 import java.io.IOException;
 import java.net.ConnectException;
 import java.net.NoRouteToHostException;
@@ -648,9 +647,8 @@ public class RetryPolicies {
         return new RetryAction(RetryAction.RetryDecision.FAIL, 0, "retries ("
             + retries + ") exceeded maximum allowed (" + maxRetries + ")");
       }
-
+      
       if (e instanceof ConnectException ||
-          e instanceof EOFException ||
           e instanceof NoRouteToHostException ||
           e instanceof UnknownHostException ||
           e instanceof StandbyException ||

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5bfadb8e/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
index 6d8c967..892df89 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
@@ -58,6 +58,7 @@ import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Time;
 import org.apache.hadoop.util.concurrent.AsyncGet;
+import org.apache.hadoop.util.concurrent.AsyncGetFuture;
 import org.apache.htrace.core.Span;
 import org.apache.htrace.core.Tracer;
 
@@ -93,8 +94,8 @@ public class Client implements AutoCloseable {
 
   private static final ThreadLocal<Integer> callId = new ThreadLocal<Integer>();
   private static final ThreadLocal<Integer> retryCount = new ThreadLocal<Integer>();
-  private static final ThreadLocal<AsyncGet<? extends Writable, IOException>>
-      ASYNC_RPC_RESPONSE = new ThreadLocal<>();
+  private static final ThreadLocal<Future<?>> ASYNC_RPC_RESPONSE
+      = new ThreadLocal<>();
   private static final ThreadLocal<Boolean> asynchronousMode =
       new ThreadLocal<Boolean>() {
         @Override
@@ -105,9 +106,8 @@ public class Client implements AutoCloseable {
 
   @SuppressWarnings("unchecked")
   @Unstable
-  public static <T extends Writable> AsyncGet<T, IOException>
-      getAsyncRpcResponse() {
-    return (AsyncGet<T, IOException>) ASYNC_RPC_RESPONSE.get();
+  public static <T> Future<T> getAsyncRpcResponse() {
+    return (Future<T>) ASYNC_RPC_RESPONSE.get();
   }
 
   /** Set call id and retry count for the next call. */
@@ -1415,16 +1415,9 @@ public class Client implements AutoCloseable {
             }
           }
         }
-
-        @Override
-        public boolean isDone() {
-          synchronized (call) {
-            return call.done;
-          }
-        }
       };
 
-      ASYNC_RPC_RESPONSE.set(asyncGet);
+      ASYNC_RPC_RESPONSE.set(new AsyncGetFuture<>(asyncGet));
       return null;
     } else {
       return getRpcResponse(call, connection, -1, null);
@@ -1469,8 +1462,10 @@ public class Client implements AutoCloseable {
     synchronized (call) {
       while (!call.done) {
         try {
-          AsyncGet.Util.wait(call, timeout, unit);
-          if (timeout >= 0 && !call.done) {
+          final long waitTimeout = AsyncGet.Util.asyncGetTimeout2WaitTimeout(
+              timeout, unit);
+          call.wait(waitTimeout); // wait for the result
+          if (waitTimeout > 0 && !call.done) {
             return null;
           }
         } catch (InterruptedException ie) {
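
With this change the client stores a ready-made java.util.concurrent.Future rather than the raw AsyncGet handle. The adapter's job is only to translate get() calls and wrap checked failures in ExecutionException; a hedged sketch of that general shape (not Hadoop's actual AsyncGetFuture source):

import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Stand-in for the wrapped handle; mirrors AsyncGet's get(timeout, unit). */
interface Handle<R> {
  R get(long timeout, TimeUnit unit) throws Exception;
}

final class HandleFuture<R> implements Future<R> {
  private final Handle<R> handle;
  HandleFuture(Handle<R> handle) { this.handle = handle; }

  @Override
  public R get() throws InterruptedException, ExecutionException {
    try {
      return handle.get(-1, TimeUnit.MILLISECONDS); // negative = wait forever
    } catch (InterruptedException e) {
      throw e;
    } catch (Exception e) {
      throw new ExecutionException(e); // checked failures surface Future-style
    }
  }

  @Override
  public R get(long timeout, TimeUnit unit)
      throws InterruptedException, ExecutionException, TimeoutException {
    try {
      return handle.get(timeout, unit);
    } catch (InterruptedException e) {
      throw e;
    } catch (TimeoutException e) {
      throw e;
    } catch (Exception e) {
      throw new ExecutionException(e);
    }
  }

  @Override public boolean cancel(boolean mayInterrupt) { return false; } // not cancellable here
  @Override public boolean isCancelled() { return false; }
  @Override public boolean isDone() { return false; } // sketch: the handle exposes no isDone()
}

The negative-timeout convention in get() matches the waitForRpcResponse change above, where a negative timeout means wait indefinitely.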

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5bfadb8e/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
index ea629b1..4641a67 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
@@ -54,6 +54,7 @@ import java.lang.reflect.Proxy;
 import java.net.InetSocketAddress;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -255,18 +256,14 @@ public class ProtobufRpcEngine implements RpcEngine {
       }
       
       if (Client.isAsynchronousMode()) {
-        final AsyncGet<RpcResponseWrapper, IOException> arr
-            = Client.getAsyncRpcResponse();
+        final Future<RpcResponseWrapper> frrw = Client.getAsyncRpcResponse();
         final AsyncGet<Message, Exception> asyncGet
             = new AsyncGet<Message, Exception>() {
           @Override
           public Message get(long timeout, TimeUnit unit) throws Exception {
-            return getReturnMessage(method, arr.get(timeout, unit));
-          }
-
-          @Override
-          public boolean isDone() {
-            return arr.isDone();
+            final RpcResponseWrapper rrw = timeout < 0?
+                frrw.get(): frrw.get(timeout, unit);
+            return getReturnMessage(method, rrw);
           }
         };
         ASYNC_RETURN_MESSAGE.set(asyncGet);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5bfadb8e/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/AsyncGet.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/AsyncGet.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/AsyncGet.java
index f124890..5eac869 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/AsyncGet.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/AsyncGet.java
@@ -47,19 +47,14 @@ public interface AsyncGet<R, E extends Throwable> {
   R get(long timeout, TimeUnit unit)
       throws E, TimeoutException, InterruptedException;
 
-  /** @return true if the underlying computation is done; false, otherwise. */
-  boolean isDone();
-
   /** Utility */
   class Util {
-    /** Use {@link #get(long, TimeUnit)} timeout parameters to wait. */
-    public static void wait(Object obj, long timeout, TimeUnit unit)
-        throws InterruptedException {
-      if (timeout < 0) {
-        obj.wait();
-      } else if (timeout > 0) {
-        obj.wait(unit.toMillis(timeout));
-      }
+    /**
+     * @return {@link Object#wait(long)} timeout converted
+     *         from {@link #get(long, TimeUnit)} timeout.
+     */
+    public static long asyncGetTimeout2WaitTimeout(long timeout, TimeUnit unit){
+      return timeout < 0? 0: timeout == 0? 1:unit.toMillis(timeout);
     }
   }
 }
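
The replacement helper maps AsyncGet's timeout convention onto Object.wait(long): a negative timeout means wait indefinitely, which wait() encodes as 0; a zero timeout must not collide with that "forever" encoding, so it becomes the minimum 1 ms; anything else simply converts to milliseconds. A small demo of the three cases:

import java.util.concurrent.TimeUnit;

public final class WaitTimeoutDemo {
  // Same body as the restored Util method above.
  static long asyncGetTimeout2WaitTimeout(long timeout, TimeUnit unit) {
    return timeout < 0 ? 0 : timeout == 0 ? 1 : unit.toMillis(timeout);
  }

  public static void main(String[] args) {
    System.out.println(asyncGetTimeout2WaitTimeout(-1, TimeUnit.SECONDS)); // 0    -> wait() forever
    System.out.println(asyncGetTimeout2WaitTimeout(0, TimeUnit.SECONDS));  // 1    -> minimal wait
    System.out.println(asyncGetTimeout2WaitTimeout(3, TimeUnit.SECONDS));  // 3000 -> bounded wait
  }
}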

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5bfadb8e/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java
index ef27e12..7623975 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java
@@ -30,7 +30,6 @@ import org.apache.hadoop.ipc.TestIPC.TestServer;
 import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.util.concurrent.AsyncGetFuture;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -51,12 +50,6 @@ public class TestAsyncIPC {
   private static Configuration conf;
   private static final Log LOG = LogFactory.getLog(TestAsyncIPC.class);
 
-  static <T extends Writable> AsyncGetFuture<T, IOException>
-      getAsyncRpcResponseFuture() {
-    return (AsyncGetFuture<T, IOException>) new AsyncGetFuture<>(
-        Client.getAsyncRpcResponse());
-  }
-
   @Before
   public void setupConf() {
     conf = new Configuration();
@@ -91,7 +84,7 @@ public class TestAsyncIPC {
         try {
           final long param = TestIPC.RANDOM.nextLong();
           TestIPC.call(client, param, server, conf);
-          Future<LongWritable> returnFuture = getAsyncRpcResponseFuture();
+          Future<LongWritable> returnFuture = Client.getAsyncRpcResponse();
           returnFutures.put(i, returnFuture);
           expectedValues.put(i, param);
         } catch (Exception e) {
@@ -212,7 +205,7 @@ public class TestAsyncIPC {
 
     private void doCall(final int idx, final long param) throws IOException {
       TestIPC.call(client, param, server, conf);
-      Future<LongWritable> returnFuture = getAsyncRpcResponseFuture();
+      Future<LongWritable> returnFuture = Client.getAsyncRpcResponse();
       returnFutures.put(idx, returnFuture);
       expectedValues.put(idx, param);
     }
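
After this change the tests take the Future straight from Client.getAsyncRpcResponse(). The calling pattern, pieced together from the surrounding test code (client, server, conf, and TestIPC.call are the test's own fixtures):

// Sketch of the async calling pattern; the fixtures come from TestAsyncIPC.
Client.setAsynchronousMode(true);
try {
  final long param = TestIPC.RANDOM.nextLong();
  TestIPC.call(client, param, server, conf);                  // returns immediately
  Future<LongWritable> future = Client.getAsyncRpcResponse(); // per-thread handle
  // ... later, block for the result where convenient, not at call time:
  LongWritable value = future.get(10, TimeUnit.SECONDS);
} finally {
  Client.setAsynchronousMode(false);                          // restore the thread's mode
}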

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5bfadb8e/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java
index 472b1d4..b507fa5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java
@@ -29,7 +29,8 @@ import org.apache.hadoop.fs.permission.AclEntry;
 import org.apache.hadoop.fs.permission.AclStatus;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DFSOpsCountStatistics.OpType;
-import org.apache.hadoop.io.retry.AsyncCallHandler;
+import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB;
+import org.apache.hadoop.util.concurrent.AsyncGet;
 import org.apache.hadoop.util.concurrent.AsyncGetFuture;
 import org.apache.hadoop.ipc.Client;
 
@@ -51,8 +52,10 @@ public class AsyncDistributedFileSystem {
     this.dfs = dfs;
   }
 
-  private static <T> Future<T> getReturnValue() {
-    return (Future<T>)new AsyncGetFuture<>(AsyncCallHandler.getAsyncReturn());
+  static <T> Future<T> getReturnValue() {
+    final AsyncGet<T, Exception> asyncGet
+        = ClientNamenodeProtocolTranslatorPB.getAsyncReturnValue();
+    return new AsyncGetFuture<>(asyncGet);
   }
 
   /**
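
From the caller's side nothing changes here: AsyncDistributedFileSystem still hands back standard Futures, now built from the translator's thread-local handle. Client-side usage, mirroring the HDFS tests further down in this mail (dfs, src, and dst are assumed to exist):

// Hedged usage sketch of the async HDFS client API.
AsyncDistributedFileSystem adfs = dfs.getAsyncDistributedFileSystem();
Future<Void> rename = adfs.rename(src, dst); // submits the RPC, returns at once
rename.get();                                // rendezvous with the outcome later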

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5bfadb8e/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
index c582991..3f10c98 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
@@ -29,6 +29,7 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.crypto.CryptoProtocolVersion;
 import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedEntries;
 import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedListEntries;
@@ -176,7 +177,6 @@ import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifie
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
 import org.apache.hadoop.io.EnumSetWritable;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.retry.AsyncCallHandler;
 import org.apache.hadoop.ipc.Client;
 import org.apache.hadoop.ipc.ProtobufHelper;
 import org.apache.hadoop.ipc.ProtobufRpcEngine;
@@ -205,6 +205,8 @@ import org.apache.hadoop.util.concurrent.AsyncGet;
 public class ClientNamenodeProtocolTranslatorPB implements
     ProtocolMetaInterface, ClientProtocol, Closeable, ProtocolTranslator {
   final private ClientNamenodeProtocolPB rpcProxy;
+  private static final ThreadLocal<AsyncGet<?, Exception>>
+      ASYNC_RETURN_VALUE = new ThreadLocal<>();
 
   static final GetServerDefaultsRequestProto VOID_GET_SERVER_DEFAULT_REQUEST =
       GetServerDefaultsRequestProto.newBuilder().build();
@@ -237,6 +239,12 @@ public class ClientNamenodeProtocolTranslatorPB implements
     rpcProxy = proxy;
   }
 
+  @SuppressWarnings("unchecked")
+  @Unstable
+  public static <T> AsyncGet<T, Exception> getAsyncReturnValue() {
+    return (AsyncGet<T, Exception>) ASYNC_RETURN_VALUE.get();
+  }
+
   @Override
   public void close() {
     RPC.stopProxy(rpcProxy);
@@ -375,13 +383,8 @@ public class ClientNamenodeProtocolTranslatorPB implements
         asyncReturnMessage.get(timeout, unit);
         return null;
       }
-
-      @Override
-      public boolean isDone() {
-        return asyncReturnMessage.isDone();
-      }
     };
-    AsyncCallHandler.setLowerLayerAsyncReturn(asyncGet);
+    ASYNC_RETURN_VALUE.set(asyncGet);
   }
 
   @Override
@@ -1353,20 +1356,17 @@ public class ClientNamenodeProtocolTranslatorPB implements
         rpcProxy.getAclStatus(null, req);
         final AsyncGet<Message, Exception> asyncReturnMessage
             = ProtobufRpcEngine.getAsyncReturnMessage();
-        final AsyncGet<AclStatus, Exception> asyncGet
-            = new AsyncGet<AclStatus, Exception>() {
-          @Override
-          public AclStatus get(long timeout, TimeUnit unit) throws Exception {
-            return PBHelperClient.convert((GetAclStatusResponseProto)
-                asyncReturnMessage.get(timeout, unit));
-          }
-
-          @Override
-          public boolean isDone() {
-            return asyncReturnMessage.isDone();
-          }
-        };
-        AsyncCallHandler.setLowerLayerAsyncReturn(asyncGet);
+        final AsyncGet<AclStatus, Exception> asyncGet =
+            new AsyncGet<AclStatus, Exception>() {
+              @Override
+              public AclStatus get(long timeout, TimeUnit unit)
+                  throws Exception {
+                return PBHelperClient
+                    .convert((GetAclStatusResponseProto) asyncReturnMessage
+                        .get(timeout, unit));
+              }
+            };
+        ASYNC_RETURN_VALUE.set(asyncGet);
         return null;
       } else {
         return PBHelperClient.convert(rpcProxy.getAclStatus(null, req));
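
The getAclStatus path above wraps the protobuf-level AsyncGet<Message> in an AsyncGet<AclStatus> whose get() converts the response. That wrap-and-convert step is a generic "map" over a pending result; a hedged sketch with a stand-in interface (Hadoop's AsyncGet is analogous but not identical):

import java.util.concurrent.TimeUnit;
import java.util.function.Function;

interface Pending<R> {
  R get(long timeout, TimeUnit unit) throws Exception;
}

final class PendingResults {
  /** Adapt a pending A into a pending B, converting at retrieval time. */
  static <A, B> Pending<B> map(final Pending<A> inner,
      final Function<A, B> convert) {
    return new Pending<B>() {
      @Override
      public B get(long timeout, TimeUnit unit) throws Exception {
        return convert.apply(inner.get(timeout, unit));
      }
    };
  }
}

The anonymous class in the hunk is exactly this shape, with PBHelperClient.convert as the conversion function.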

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5bfadb8e/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFS.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFS.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFS.java
index 6a60290..c7615a9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFS.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFS.java
@@ -55,7 +55,6 @@ import org.apache.hadoop.hdfs.TestDFSPermission.PermissionGenerator;
 import org.apache.hadoop.hdfs.server.namenode.AclTestHelpers;
 import org.apache.hadoop.hdfs.server.namenode.FSAclBaseTest;
 import org.apache.hadoop.ipc.AsyncCallLimitExceededException;
-import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.Time;
 import org.junit.After;
@@ -71,7 +70,7 @@ public class TestAsyncDFS {
   public static final Log LOG = LogFactory.getLog(TestAsyncDFS.class);
   private final short replFactor = 1;
   private final long blockSize = 512;
-  private long fileLen = 0;
+  private long fileLen = blockSize * 3;
   private final long seed = Time.now();
   private final Random r = new Random(seed);
   private final PermissionGenerator permGenerator = new PermissionGenerator(r);
@@ -81,7 +80,7 @@ public class TestAsyncDFS {
 
   private Configuration conf;
   private MiniDFSCluster cluster;
-  private DistributedFileSystem fs;
+  private FileSystem fs;
   private AsyncDistributedFileSystem adfs;
 
   @Before
@@ -96,10 +95,10 @@ public class TestAsyncDFS {
         ASYNC_CALL_LIMIT);
     // set server handlers
     conf.setInt(DFSConfigKeys.DFS_NAMENODE_HANDLER_COUNT_KEY, NUM_NN_HANDLER);
-    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).build();
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
     cluster.waitActive();
-    fs = cluster.getFileSystem();
-    adfs = fs.getAsyncDistributedFileSystem();
+    fs = FileSystem.get(conf);
+    adfs = cluster.getFileSystem().getAsyncDistributedFileSystem();
   }
 
   @After
@@ -114,6 +113,31 @@ public class TestAsyncDFS {
     }
   }
 
+  static class AclQueueEntry {
+    private final Object future;
+    private final Path path;
+    private final Boolean isSetAcl;
+
+    AclQueueEntry(final Object future, final Path path,
+        final Boolean isSetAcl) {
+      this.future = future;
+      this.path = path;
+      this.isSetAcl = isSetAcl;
+    }
+
+    public final Object getFuture() {
+      return future;
+    }
+
+    public final Path getPath() {
+      return path;
+    }
+
+    public final Boolean isSetAcl() {
+      return this.isSetAcl;
+    }
+  }
+
   @Test(timeout=60000)
   public void testBatchAsyncAcl() throws Exception {
     final String basePath = "testBatchAsyncAcl";
@@ -324,7 +348,7 @@ public class TestAsyncDFS {
 
   public static void checkPermissionDenied(final Exception e, final Path dir,
       final String user) {
-    assertTrue(e.getCause() instanceof RemoteException);
+    assertTrue(e.getCause() instanceof ExecutionException);
     assertTrue("Permission denied messages must carry AccessControlException",
         e.getMessage().contains("AccessControlException"));
     assertTrue("Permission denied messages must carry the username", e
@@ -446,9 +470,4 @@ public class TestAsyncDFS {
       assertTrue("group2".equals(fs.getFileStatus(dsts[i]).getGroup()));
     }
   }
-
-  @Test
-  public void testAsyncWithoutRetry() throws Exception {
-    TestAsyncHDFSWithHA.runTestAsyncWithoutRetry(conf, cluster, fs);
-  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5bfadb8e/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncHDFSWithHA.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncHDFSWithHA.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncHDFSWithHA.java
deleted file mode 100644
index 70ca03d..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncHDFSWithHA.java
+++ /dev/null
@@ -1,182 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.protocol.ClientProtocol;
-import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
-import org.apache.hadoop.io.retry.AsyncCallHandler;
-import org.apache.hadoop.io.retry.RetryInvocationHandler;
-import org.apache.hadoop.ipc.Client;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.test.GenericTestUtils;
-import org.apache.hadoop.util.concurrent.AsyncGetFuture;
-import org.apache.log4j.Level;
-import org.junit.Assert;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-
-/** Test async methods with HA setup. */
-public class TestAsyncHDFSWithHA {
-  static final Logger LOG = LoggerFactory.getLogger(TestAsyncHDFSWithHA.class);
-  static {
-    GenericTestUtils.setLogLevel(RetryInvocationHandler.LOG, Level.ALL);
-  }
-
-  private static <T> Future<T> getReturnValue() {
-    return (Future<T>)new AsyncGetFuture<>(AsyncCallHandler.getAsyncReturn());
-  }
-
-  static void mkdirs(DistributedFileSystem dfs, String dir, Path[] srcs,
-                     Path[] dsts) throws IOException {
-    for (int i = 0; i < srcs.length; i++) {
-      srcs[i] = new Path(dir, "src" + i);
-      dsts[i] = new Path(dir, "dst" + i);
-      dfs.mkdirs(srcs[i]);
-    }
-  }
-
-  static void runTestAsyncWithoutRetry(Configuration conf,
-      MiniDFSCluster cluster, DistributedFileSystem dfs) throws Exception {
-    final int num = 5;
-
-    final String renameDir = "/testAsyncWithoutRetry/";
-    final Path[] srcs = new Path[num + 1];
-    final Path[] dsts = new Path[num + 1];
-    mkdirs(dfs, renameDir, srcs, dsts);
-
-    // create a proxy without retry.
-    final NameNodeProxiesClient.ProxyAndInfo<ClientProtocol> proxyInfo
-        = NameNodeProxies.createNonHAProxy(conf,
-        cluster.getNameNode(0).getNameNodeAddress(),
-        ClientProtocol.class, UserGroupInformation.getCurrentUser(),
-        false);
-    final ClientProtocol cp = proxyInfo.getProxy();
-
-    // submit async calls
-    Client.setAsynchronousMode(true);
-    final List<Future<Void>> results = new ArrayList<>();
-    for (int i = 0; i < num; i++) {
-      final String src = srcs[i].toString();
-      final String dst = dsts[i].toString();
-      LOG.info(i + ") rename " + src + " -> " + dst);
-      cp.rename2(src, dst);
-      final Future<Void> returnValue = getReturnValue();
-      results.add(returnValue);
-    }
-    Client.setAsynchronousMode(false);
-
-    // wait for the async calls
-    for (Future<Void> f : results) {
-      f.get();
-    }
-
-    //check results
-    for (int i = 0; i < num; i++) {
-      Assert.assertEquals(false, dfs.exists(srcs[i]));
-      Assert.assertEquals(true, dfs.exists(dsts[i]));
-    }
-  }
-
-  /** Testing HDFS async methods with HA setup. */
-  @Test(timeout = 120000)
-  public void testAsyncWithHAFailover() throws Exception {
-    final int num = 10;
-
-    final Configuration conf = new HdfsConfiguration();
-    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
-        .nnTopology(MiniDFSNNTopology.simpleHATopology())
-        .numDataNodes(0).build();
-
-    try {
-      cluster.waitActive();
-      cluster.transitionToActive(0);
-
-      final DistributedFileSystem dfs = HATestUtil.configureFailoverFs(
-          cluster, conf);
-      runTestAsyncWithoutRetry(conf, cluster, dfs);
-
-      final String renameDir = "/testAsyncWithHAFailover/";
-      final Path[] srcs = new Path[num + 1];
-      final Path[] dsts = new Path[num + 1];
-      mkdirs(dfs, renameDir, srcs, dsts);
-
-      // submit async calls and trigger failover in the middle.
-      final AsyncDistributedFileSystem adfs
-          = dfs.getAsyncDistributedFileSystem();
-      final ExecutorService executor = Executors.newFixedThreadPool(num + 1);
-
-      final List<Future<Void>> results = new ArrayList<>();
-      final List<IOException> exceptions = new ArrayList<>();
-      final List<Future<?>> futures = new ArrayList<>();
-      final int half = num/2;
-      for(int i = 0; i <= num; i++) {
-        final int id = i;
-        futures.add(executor.submit(new Runnable() {
-          @Override
-          public void run() {
-            try {
-              if (id == half) {
-                // failover
-                cluster.shutdownNameNode(0);
-                cluster.transitionToActive(1);
-              } else {
-                // rename
-                results.add(adfs.rename(srcs[id], dsts[id]));
-              }
-            } catch (IOException e) {
-              exceptions.add(e);
-            }
-          }
-        }));
-      }
-
-      // wait for the tasks
-      Assert.assertEquals(num + 1, futures.size());
-      for(int i = 0; i <= num; i++) {
-        futures.get(i).get();
-      }
-      // wait for the async calls
-      Assert.assertEquals(num, results.size());
-      Assert.assertTrue(exceptions.isEmpty());
-      for(Future<Void> r : results) {
-        r.get();
-      }
-
-      // check results
-      for(int i = 0; i <= num; i++) {
-        final boolean renamed = i != half;
-        Assert.assertEquals(!renamed, dfs.exists(srcs[i]));
-        Assert.assertEquals(renamed, dfs.exists(dsts[i]));
-      }
-    } finally {
-      if (cluster != null) {
-        cluster.shutdown();
-      }
-    }
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5bfadb8e/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java
index f504f36..356b223 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java
@@ -33,7 +33,6 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
-import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
@@ -132,8 +131,7 @@ public abstract class HATestUtil {
   }
   
   /** Gets the filesystem instance by setting the failover configurations */
-  public static DistributedFileSystem configureFailoverFs(
-      MiniDFSCluster cluster, Configuration conf)
+  public static FileSystem configureFailoverFs(MiniDFSCluster cluster, Configuration conf)
       throws IOException, URISyntaxException {
     return configureFailoverFs(cluster, conf, 0);
   }
@@ -145,14 +143,13 @@ public abstract class HATestUtil {
    * @param nsIndex namespace index starting with zero
    * @throws IOException if an error occurs rolling the edit log
    */
-  public static DistributedFileSystem configureFailoverFs(
-      MiniDFSCluster cluster, Configuration conf,
+  public static FileSystem configureFailoverFs(MiniDFSCluster cluster, Configuration conf,
       int nsIndex) throws IOException, URISyntaxException {
     conf = new Configuration(conf);
     String logicalName = getLogicalHostname(cluster);
     setFailoverConfigurations(cluster, conf, logicalName, nsIndex);
     FileSystem fs = FileSystem.get(new URI("hdfs://" + logicalName), conf);
-    return (DistributedFileSystem)fs;
+    return fs;
   }
   
   public static void setFailoverConfigurations(MiniDFSCluster cluster,



[3/8] hadoop git commit: Revert "HDFS-10430. Reuse FileSystem#access in TestAsyncDFS. Contributed by Xiaobing Zhou."

Posted by wa...@apache.org.
Revert "HDFS-10430. Reuse FileSystem#access in TestAsyncDFS. Contributed by Xiaobing Zhou."

This reverts commit 193f822b9a6e95575e90e2703e30abde70c6ba6c.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/6f8f40b2
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/6f8f40b2
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/6f8f40b2

Branch: refs/heads/branch-2
Commit: 6f8f40b2a5c01fc10eb7da1e8e9c3fc7c7b8f957
Parents: 5bfadb8
Author: Andrew Wang <wa...@apache.org>
Authored: Fri Jun 3 18:10:48 2016 -0700
Committer: Andrew Wang <wa...@apache.org>
Committed: Fri Jun 3 18:10:48 2016 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hdfs/TestAsyncDFS.java    | 36 +++++++++++++++++++-
 1 file changed, 35 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f8f40b2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFS.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFS.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFS.java
index c7615a9..ddcf492 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFS.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFS.java
@@ -34,6 +34,7 @@ import static org.junit.Assert.fail;
 
 import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -45,16 +46,19 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.Options.Rename;
 import org.apache.hadoop.fs.permission.AclEntry;
 import org.apache.hadoop.fs.permission.AclStatus;
+import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.TestDFSPermission.PermissionGenerator;
 import org.apache.hadoop.hdfs.server.namenode.AclTestHelpers;
 import org.apache.hadoop.hdfs.server.namenode.FSAclBaseTest;
 import org.apache.hadoop.ipc.AsyncCallLimitExceededException;
+import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.Time;
 import org.junit.After;
@@ -441,7 +445,7 @@ public class TestAsyncDFS {
     for (int i = 0; i < NUM_TESTS; i++) {
       assertTrue(fs.exists(dsts[i]));
       FsPermission fsPerm = new FsPermission(permissions[i]);
-      fs.access(dsts[i], fsPerm.getUserAction());
+      checkAccessPermissions(fs.getFileStatus(dsts[i]), fsPerm.getUserAction());
     }
 
     // test setOwner
@@ -470,4 +474,34 @@ public class TestAsyncDFS {
       assertTrue("group2".equals(fs.getFileStatus(dsts[i]).getGroup()));
     }
   }
+
+  static void checkAccessPermissions(FileStatus stat, FsAction mode)
+      throws IOException {
+    checkAccessPermissions(UserGroupInformation.getCurrentUser(), stat, mode);
+  }
+
+  static void checkAccessPermissions(final UserGroupInformation ugi,
+      FileStatus stat, FsAction mode) throws IOException {
+    FsPermission perm = stat.getPermission();
+    String user = ugi.getShortUserName();
+    List<String> groups = Arrays.asList(ugi.getGroupNames());
+
+    if (user.equals(stat.getOwner())) {
+      if (perm.getUserAction().implies(mode)) {
+        return;
+      }
+    } else if (groups.contains(stat.getGroup())) {
+      if (perm.getGroupAction().implies(mode)) {
+        return;
+      }
+    } else {
+      if (perm.getOtherAction().implies(mode)) {
+        return;
+      }
+    }
+    throw new AccessControlException(String.format(
+        "Permission denied: user=%s, path=\"%s\":%s:%s:%s%s", user, stat
+            .getPath(), stat.getOwner(), stat.getGroup(),
+        stat.isDirectory() ? "d" : "-", perm));
+  }
 }
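
For context, the change being reverted here had replaced the client-side
helper above with FileSystem#access, which delegates the permission check
to the server and throws AccessControlException on denial. A minimal
sketch of that usage (class and method names illustrative):

import java.io.IOException;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsAction;

public class AccessCheckExample {
  // Sketch: FileSystem#access evaluates the permission server-side;
  // the restored checkAccessPermissions helper above re-implements the
  // same owner/group/other logic on the client instead.
  static void requireAccess(FileSystem fs, Path p, FsAction mode)
      throws IOException {
    fs.access(p, mode);  // throws AccessControlException if denied
  }
}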




[5/8] hadoop git commit: Revert "HDFS-10346. Implement asynchronous setPermission/setOwner for DistributedFileSystem. Contributed by Xiaobing Zhou"

Posted by wa...@apache.org.
Revert "HDFS-10346. Implement asynchronous setPermission/setOwner for DistributedFileSystem.  Contributed by  Xiaobing Zhou"

This reverts commit 69768bf9e057cb0b433e398b328c40fe8f1586c8.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/2529cabf
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/2529cabf
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/2529cabf

Branch: refs/heads/branch-2
Commit: 2529cabf12e2c37740723fe7f52cc7bd94d92f0d
Parents: 071aeab
Author: Andrew Wang <wa...@apache.org>
Authored: Fri Jun 3 18:10:49 2016 -0700
Committer: Andrew Wang <wa...@apache.org>
Committed: Fri Jun 3 18:10:49 2016 -0700

----------------------------------------------------------------------
 .../hadoop/hdfs/AsyncDistributedFileSystem.java |  59 ----
 .../ClientNamenodeProtocolTranslatorPB.java     |  39 +--
 .../apache/hadoop/hdfs/TestAsyncDFSRename.java  | 267 ++-----------------
 .../apache/hadoop/hdfs/TestDFSPermission.java   |  29 +-
 4 files changed, 43 insertions(+), 351 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/2529cabf/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java
index 4fe0861..356ae3f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java
@@ -27,7 +27,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.fs.Options;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB;
 import org.apache.hadoop.ipc.Client;
 
@@ -38,9 +37,6 @@ import com.google.common.util.concurrent.AbstractFuture;
  * This instance of this class is the way end-user code interacts
  * with a Hadoop DistributedFileSystem in an asynchronous manner.
  *
- * This class is unstable, so no guarantee is provided as to reliability,
- * stability or compatibility across any level of release granularity.
- *
  *****************************************************************/
 @Unstable
 public class AsyncDistributedFileSystem {
@@ -115,59 +111,4 @@ public class AsyncDistributedFileSystem {
       Client.setAsynchronousMode(isAsync);
     }
   }
-
-  /**
-   * Set permission of a path.
-   *
-   * @param p
-   *          the path the permission is set to
-   * @param permission
-   *          the permission that is set to a path.
-   * @return an instance of Future, #get of which is invoked to wait for
-   *         asynchronous call being finished.
-   */
-  public Future<Void> setPermission(Path p, final FsPermission permission)
-      throws IOException {
-    dfs.getFsStatistics().incrementWriteOps(1);
-    final Path absPath = dfs.fixRelativePart(p);
-    final boolean isAsync = Client.isAsynchronousMode();
-    Client.setAsynchronousMode(true);
-    try {
-      dfs.getClient().setPermission(dfs.getPathName(absPath), permission);
-      return getReturnValue();
-    } finally {
-      Client.setAsynchronousMode(isAsync);
-    }
-  }
-
-  /**
-   * Set owner of a path (i.e. a file or a directory). The parameters username
-   * and groupname cannot both be null.
-   *
-   * @param p
-   *          The path
-   * @param username
-   *          If it is null, the original username remains unchanged.
-   * @param groupname
-   *          If it is null, the original groupname remains unchanged.
-   * @return an instance of Future, #get of which is invoked to wait for
-   *         asynchronous call being finished.
-   */
-  public Future<Void> setOwner(Path p, String username, String groupname)
-      throws IOException {
-    if (username == null && groupname == null) {
-      throw new IOException("username == null && groupname == null");
-    }
-
-    dfs.getFsStatistics().incrementWriteOps(1);
-    final Path absPath = dfs.fixRelativePart(p);
-    final boolean isAsync = Client.isAsynchronousMode();
-    Client.setAsynchronousMode(true);
-    try {
-      dfs.getClient().setOwner(dfs.getPathName(absPath), username, groupname);
-      return getReturnValue();
-    } finally {
-      Client.setAsynchronousMode(isAsync);
-    }
-  }
 }
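
Before this revert, caller code could drive the two methods deleted above
roughly as follows; a sketch against the pre-revert API, with made-up path
and principal names:

import java.util.concurrent.Future;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.AsyncDistributedFileSystem;
import org.apache.hadoop.hdfs.DistributedFileSystem;

public class AsyncPermissionExample {
  // Sketch: each call returns immediately with a Future; Future#get()
  // blocks until the corresponding NameNode RPC completes.
  static void chmodChownAsync(DistributedFileSystem dfs) throws Exception {
    AsyncDistributedFileSystem adfs = dfs.getAsyncDistributedFileSystem();
    Path p = new Path("/tmp/example");  // illustrative path
    Future<Void> perm = adfs.setPermission(p, new FsPermission((short) 0755));
    Future<Void> owner = adfs.setOwner(p, "user1", "group1");
    perm.get();   // wait for setPermission to finish
    owner.get();  // wait for setOwner to finish
  }
}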

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2529cabf/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
index 28ac78d..9a52d93 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
@@ -359,30 +359,12 @@ public class ClientNamenodeProtocolTranslatorPB implements
         .setPermission(PBHelperClient.convert(permission))
         .build();
     try {
-      if (Client.isAsynchronousMode()) {
-        rpcProxy.setPermission(null, req);
-        setReturnValueCallback();
-      } else {
-        rpcProxy.setPermission(null, req);
-      }
+      rpcProxy.setPermission(null, req);
     } catch (ServiceException e) {
       throw ProtobufHelper.getRemoteException(e);
     }
   }
 
-  private void setReturnValueCallback() {
-    final Callable<Message> returnMessageCallback = ProtobufRpcEngine
-        .getReturnMessageCallback();
-    Callable<Void> callBack = new Callable<Void>() {
-      @Override
-      public Void call() throws Exception {
-        returnMessageCallback.call();
-        return null;
-      }
-    };
-    RETURN_VALUE_CALLBACK.set(callBack);
-  }
-
   @Override
   public void setOwner(String src, String username, String groupname)
       throws IOException {
@@ -393,12 +375,7 @@ public class ClientNamenodeProtocolTranslatorPB implements
     if (groupname != null)
       req.setGroupname(groupname);
     try {
-      if (Client.isAsynchronousMode()) {
-        rpcProxy.setOwner(null, req.build());
-        setReturnValueCallback();
-      } else {
-        rpcProxy.setOwner(null, req.build());
-      }
+      rpcProxy.setOwner(null, req.build());
     } catch (ServiceException e) {
       throw ProtobufHelper.getRemoteException(e);
     }
@@ -527,7 +504,17 @@ public class ClientNamenodeProtocolTranslatorPB implements
     try {
       if (Client.isAsynchronousMode()) {
         rpcProxy.rename2(null, req);
-        setReturnValueCallback();
+
+        final Callable<Message> returnMessageCallback = ProtobufRpcEngine
+            .getReturnMessageCallback();
+        Callable<Void> callBack = new Callable<Void>() {
+          @Override
+          public Void call() throws Exception {
+            returnMessageCallback.call();
+            return null;
+          }
+        };
+        RETURN_VALUE_CALLBACK.set(callBack);
       } else {
         rpcProxy.rename2(null, req);
       }
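
The hunk above inlines the callback plumbing back into rename2: after the
asynchronous proxy call, a Callable for the raw RPC response is wrapped
into a Callable<Void> and parked in a thread-local for the caller to
collect. The wrapping step in isolation, as a simplified generic sketch:

import java.util.concurrent.Callable;

public class CallbackWrapExample {
  // Sketch of the pattern in the hunk above: adapt a Callable for the
  // raw response into a Callable<Void> that the thread-local can hand
  // back to the asynchronous caller.
  static <T> Callable<Void> discardResult(final Callable<T> responseCallback) {
    return new Callable<Void>() {
      @Override
      public Void call() throws Exception {
        responseCallback.call();  // blocks until the response arrives
        return null;              // rename2 carries no return payload
      }
    };
  }
}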

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2529cabf/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java
index 7539fbd..d129299 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java
@@ -22,11 +22,8 @@ import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
-import java.util.Arrays;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
-import java.util.Random;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 
@@ -34,30 +31,18 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.Options.Rename;
-import org.apache.hadoop.fs.permission.FsAction;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.hdfs.TestDFSPermission.PermissionGenerator;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
 import org.apache.hadoop.ipc.AsyncCallLimitExceededException;
-import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.util.Time;
 import org.junit.Test;
 
 public class TestAsyncDFSRename {
   public static final Log LOG = LogFactory.getLog(TestAsyncDFSRename.class);
-  private final long seed = Time.now();
-  private final Random r = new Random(seed);
-  private final PermissionGenerator permGenerator = new PermissionGenerator(r);
-  private final short replFactor = 2;
-  private final long blockSize = 512;
-  private long fileLen = blockSize * 3;
 
   /**
    * Check the blocks of dst file are cleaned after rename with overwrite
@@ -65,6 +50,8 @@ public class TestAsyncDFSRename {
    */
   @Test(timeout = 60000)
   public void testAsyncRenameWithOverwrite() throws Exception {
+    final short replFactor = 2;
+    final long blockSize = 512;
     Configuration conf = new Configuration();
     MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(
         replFactor).build();
@@ -73,6 +60,8 @@ public class TestAsyncDFSRename {
     AsyncDistributedFileSystem adfs = dfs.getAsyncDistributedFileSystem();
 
     try {
+
+      long fileLen = blockSize * 3;
       String src = "/foo/src";
       String dst = "/foo/dst";
       String src2 = "/foo/src2";
@@ -126,6 +115,8 @@ public class TestAsyncDFSRename {
 
   @Test(timeout = 60000)
   public void testCallGetReturnValueMultipleTimes() throws Exception {
+    final short replFactor = 2;
+    final long blockSize = 512;
     final Path renameDir = new Path(
         "/test/testCallGetReturnValueMultipleTimes/");
     final Configuration conf = new HdfsConfiguration();
@@ -136,6 +127,7 @@ public class TestAsyncDFSRename {
     final DistributedFileSystem dfs = cluster.getFileSystem();
     final AsyncDistributedFileSystem adfs = dfs.getAsyncDistributedFileSystem();
     final int count = 100;
+    long fileLen = blockSize * 3;
     final Map<Integer, Future<Void>> returnFutures = new HashMap<Integer, Future<Void>>();
 
     assertTrue(dfs.mkdirs(renameDir));
@@ -186,15 +178,15 @@ public class TestAsyncDFSRename {
     }
   }
 
-  @Test
-  public void testConservativeConcurrentAsyncRenameWithOverwrite()
+  @Test(timeout = 120000)
+  public void testAggressiveConcurrentAsyncRenameWithOverwrite()
       throws Exception {
     internalTestConcurrentAsyncRenameWithOverwrite(100,
         "testAggressiveConcurrentAsyncRenameWithOverwrite");
   }
 
   @Test(timeout = 60000)
-  public void testAggressiveConcurrentAsyncRenameWithOverwrite()
+  public void testConservativeConcurrentAsyncRenameWithOverwrite()
       throws Exception {
     internalTestConcurrentAsyncRenameWithOverwrite(10000,
         "testConservativeConcurrentAsyncRenameWithOverwrite");
@@ -202,6 +194,8 @@ public class TestAsyncDFSRename {
 
   private void internalTestConcurrentAsyncRenameWithOverwrite(
       final int asyncCallLimit, final String basePath) throws Exception {
+    final short replFactor = 2;
+    final long blockSize = 512;
     final Path renameDir = new Path(String.format("/test/%s/", basePath));
     Configuration conf = new HdfsConfiguration();
     conf.setInt(CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_KEY,
@@ -212,6 +206,7 @@ public class TestAsyncDFSRename {
     DistributedFileSystem dfs = cluster.getFileSystem();
     AsyncDistributedFileSystem adfs = dfs.getAsyncDistributedFileSystem();
     int count = 1000;
+    long fileLen = blockSize * 3;
     int start = 0, end = 0;
     Map<Integer, Future<Void>> returnFutures = new HashMap<Integer, Future<Void>>();
 
@@ -279,206 +274,8 @@ public class TestAsyncDFSRename {
     }
   }
 
-  @Test
-  public void testConservativeConcurrentAsyncAPI() throws Exception {
-    internalTestConcurrentAsyncAPI(100, "testConservativeConcurrentAsyncAPI");
-  }
-
-  @Test(timeout = 60000)
-  public void testAggressiveConcurrentAsyncAPI() throws Exception {
-    internalTestConcurrentAsyncAPI(10000, "testAggressiveConcurrentAsyncAPI");
-  }
-
-  private void internalTestConcurrentAsyncAPI(final int asyncCallLimit,
-      final String basePath) throws Exception {
-    Configuration conf = new HdfsConfiguration();
-    String group1 = "group1";
-    String group2 = "group2";
-    String user1 = "user1";
-    int count = 500;
-
-    // explicitly turn on permission checking
-    conf.setBoolean(DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, true);
-    // set the limit of max async calls
-    conf.setInt(CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_KEY,
-        asyncCallLimit);
-
-    // create fake mapping for the groups
-    Map<String, String[]> u2gMap = new HashMap<String, String[]>(1);
-    u2gMap.put(user1, new String[] {group1, group2});
-    DFSTestUtil.updateConfWithFakeGroupMapping(conf, u2gMap);
-
-    // start mini cluster
-    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
-        .numDataNodes(3).build();
-    cluster.waitActive();
-    AsyncDistributedFileSystem adfs = cluster.getFileSystem()
-        .getAsyncDistributedFileSystem();
-
-    // prepare for test
-    FileSystem rootFs = FileSystem.get(conf);
-    final Path parent = new Path(String.format("/test/%s/", basePath));
-    final Path[] srcs = new Path[count];
-    final Path[] dsts = new Path[count];
-    short[] permissions = new short[count];
-    for (int i = 0; i < count; i++) {
-      srcs[i] = new Path(parent, "src" + i);
-      dsts[i] = new Path(parent, "dst" + i);
-      DFSTestUtil.createFile(rootFs, srcs[i], fileLen, replFactor, 1);
-      DFSTestUtil.createFile(rootFs, dsts[i], fileLen, replFactor, 1);
-      assertTrue(rootFs.exists(srcs[i]));
-      assertTrue(rootFs.getFileStatus(srcs[i]).isFile());
-      assertTrue(rootFs.exists(dsts[i]));
-      assertTrue(rootFs.getFileStatus(dsts[i]).isFile());
-      permissions[i] = permGenerator.next();
-    }
-
-    Map<Integer, Future<Void>> renameRetFutures =
-        new HashMap<Integer, Future<Void>>();
-    Map<Integer, Future<Void>> permRetFutures =
-        new HashMap<Integer, Future<Void>>();
-    Map<Integer, Future<Void>> ownerRetFutures =
-        new HashMap<Integer, Future<Void>>();
-    int start = 0, end = 0;
-    // test rename
-    for (int i = 0; i < count; i++) {
-      for (;;) {
-        try {
-          Future<Void> returnFuture = adfs.rename(srcs[i], dsts[i],
-              Rename.OVERWRITE);
-          renameRetFutures.put(i, returnFuture);
-          break;
-        } catch (AsyncCallLimitExceededException e) {
-          start = end;
-          end = i;
-          waitForReturnValues(renameRetFutures, start, end);
-        }
-      }
-    }
-
-    // wait for completing the calls
-    for (int i = start; i < count; i++) {
-      renameRetFutures.get(i).get();
-    }
-
-    // Restart NN and check the rename successfully
-    cluster.restartNameNodes();
-
-    // verify the src should not exist, dst should
-    for (int i = 0; i < count; i++) {
-      assertFalse(rootFs.exists(srcs[i]));
-      assertTrue(rootFs.exists(dsts[i]));
-    }
-
-    // test permissions
-    try {
-      for (int i = 0; i < count; i++) {
-        for (;;) {
-          try {
-            Future<Void> retFuture = adfs.setPermission(dsts[i],
-                new FsPermission(permissions[i]));
-            permRetFutures.put(i, retFuture);
-            break;
-          } catch (AsyncCallLimitExceededException e) {
-            start = end;
-            end = i;
-            waitForReturnValues(permRetFutures, start, end);
-          }
-        }
-      }
-      // wait for completing the calls
-      for (int i = start; i < count; i++) {
-        permRetFutures.get(i).get();
-      }
-
-      // Restart NN and check permission then
-      cluster.restartNameNodes();
-
-      // verify the permission
-      for (int i = 0; i < count; i++) {
-        assertTrue(rootFs.exists(dsts[i]));
-        FsPermission fsPerm = new FsPermission(permissions[i]);
-        checkAccessPermissions(rootFs.getFileStatus(dsts[i]),
-            fsPerm.getUserAction());
-      }
-
-      // test setOwner
-      start = 0;
-      end = 0;
-      for (int i = 0; i < count; i++) {
-        for (;;) {
-          try {
-            Future<Void> retFuture = adfs.setOwner(dsts[i], "user1",
-                "group2");
-            ownerRetFutures.put(i, retFuture);
-            break;
-          } catch (AsyncCallLimitExceededException e) {
-            start = end;
-            end = i;
-            waitForReturnValues(ownerRetFutures, start, end);
-          }
-        }
-      }
-      // wait for completing the calls
-      for (int i = start; i < count; i++) {
-        ownerRetFutures.get(i).get();
-      }
-
-      // Restart NN and check owner then
-      cluster.restartNameNodes();
-
-      // verify the owner
-      for (int i = 0; i < count; i++) {
-        assertTrue(rootFs.exists(dsts[i]));
-        assertTrue(
-            "user1".equals(rootFs.getFileStatus(dsts[i]).getOwner()));
-        assertTrue(
-            "group2".equals(rootFs.getFileStatus(dsts[i]).getGroup()));
-      }
-    } catch (AccessControlException ace) {
-      throw ace;
-    } finally {
-      if (rootFs != null) {
-        rootFs.close();
-      }
-      if (cluster != null) {
-        cluster.shutdown();
-      }
-    }
-  }
-
-  static void checkAccessPermissions(FileStatus stat, FsAction mode)
-      throws IOException {
-    checkAccessPermissions(UserGroupInformation.getCurrentUser(), stat, mode);
-  }
-
-  static void checkAccessPermissions(final UserGroupInformation ugi,
-      FileStatus stat, FsAction mode) throws IOException {
-    FsPermission perm = stat.getPermission();
-    String user = ugi.getShortUserName();
-    List<String> groups = Arrays.asList(ugi.getGroupNames());
-
-    if (user.equals(stat.getOwner())) {
-      if (perm.getUserAction().implies(mode)) {
-        return;
-      }
-    } else if (groups.contains(stat.getGroup())) {
-      if (perm.getGroupAction().implies(mode)) {
-        return;
-      }
-    } else {
-      if (perm.getOtherAction().implies(mode)) {
-        return;
-      }
-    }
-    throw new AccessControlException(String.format(
-        "Permission denied: user=%s, path=\"%s\":%s:%s:%s%s", user, stat
-            .getPath(), stat.getOwner(), stat.getGroup(),
-        stat.isDirectory() ? "d" : "-", perm));
-  }
-
   @Test(timeout = 60000)
-  public void testAsyncAPIWithException() throws Exception {
+  public void testAsyncRenameWithException() throws Exception {
     Configuration conf = new HdfsConfiguration();
     String group1 = "group1";
     String group2 = "group2";
@@ -489,9 +286,9 @@ public class TestAsyncDFSRename {
     conf.setBoolean(DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, true);
 
     // create fake mapping for the groups
-    Map<String, String[]> u2gMap = new HashMap<String, String[]>(1);
-    u2gMap.put(user1, new String[] {group1, group2});
-    DFSTestUtil.updateConfWithFakeGroupMapping(conf, u2gMap);
+    Map<String, String[]> u2g_map = new HashMap<String, String[]>(1);
+    u2g_map.put(user1, new String[] { group1, group2 });
+    DFSTestUtil.updateConfWithFakeGroupMapping(conf, u2g_map);
 
     // Initiate all four users
     ugi1 = UserGroupInformation.createUserForTesting(user1, new String[] {
@@ -502,7 +299,7 @@ public class TestAsyncDFSRename {
     cluster.waitActive();
 
     FileSystem rootFs = FileSystem.get(conf);
-    final Path renameDir = new Path("/test/async_api_exception/");
+    final Path renameDir = new Path("/test/async_rename_exception/");
     final Path src = new Path(renameDir, "src");
     final Path dst = new Path(renameDir, "dst");
     rootFs.mkdirs(src);
@@ -515,33 +312,11 @@ public class TestAsyncDFSRename {
           }
         });
 
-    Future<Void> retFuture;
-    try {
-      retFuture = adfs.rename(src, dst, Rename.OVERWRITE);
-      retFuture.get();
-    } catch (ExecutionException e) {
-      checkPermissionDenied(e, src, user1);
-      assertTrue("Permission denied messages must carry the path parent", e
-          .getMessage().contains(src.getParent().toUri().getPath()));
-    }
-
-    FsPermission fsPerm = new FsPermission(permGenerator.next());
-    try {
-      retFuture = adfs.setPermission(src, fsPerm);
-      retFuture.get();
-    } catch (ExecutionException e) {
-      checkPermissionDenied(e, src, user1);
-      assertTrue("Permission denied messages must carry the name of the path",
-          e.getMessage().contains(src.getName()));
-    }
-
     try {
-      retFuture = adfs.setOwner(src, "user1", "group2");
-      retFuture.get();
+      Future<Void> returnFuture = adfs.rename(src, dst, Rename.OVERWRITE);
+      returnFuture.get();
     } catch (ExecutionException e) {
       checkPermissionDenied(e, src, user1);
-      assertTrue("Permission denied messages must carry the name of the path",
-          e.getMessage().contains(src.getName()));
     } finally {
       if (rootFs != null) {
         rootFs.close();
@@ -559,5 +334,7 @@ public class TestAsyncDFSRename {
         e.getMessage().contains("AccessControlException"));
     assertTrue("Permission denied messages must carry the username", e
         .getMessage().contains(user));
+    assertTrue("Permission denied messages must carry the path parent", e
+        .getMessage().contains(dir.getParent().toUri().getPath()));
   }
 }
\ No newline at end of file
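
The deleted tests above all follow one backpressure pattern against the
pre-revert API: submit calls in a loop and, when
AsyncCallLimitExceededException signals that the outstanding-call cap is
reached, drain the futures issued so far before retrying. A cleaned-up
sketch of that pattern (the AsyncOp interface is invented here purely for
illustration):

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Future;

import org.apache.hadoop.ipc.AsyncCallLimitExceededException;

public class AsyncBackpressureExample {
  interface AsyncOp {
    Future<Void> submit(int i) throws Exception;
  }

  // Sketch: resubmit the failed call after waiting on the futures that
  // are already in flight.
  static void submitAll(int count, AsyncOp op) throws Exception {
    Map<Integer, Future<Void>> futures = new HashMap<Integer, Future<Void>>();
    int drained = 0;
    for (int i = 0; i < count; i++) {
      for (;;) {
        try {
          futures.put(i, op.submit(i));
          break;
        } catch (AsyncCallLimitExceededException e) {
          for (int j = drained; j < i; j++) {  // drain completed calls
            futures.get(j).get();
          }
          drained = i;
        }
      }
    }
    for (int j = drained; j < count; j++) {    // wait for the remainder
      futures.get(j).get();
    }
  }
}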

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2529cabf/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSPermission.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSPermission.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSPermission.java
index 66a0380..aa204cd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSPermission.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSPermission.java
@@ -196,35 +196,22 @@ public class TestDFSPermission {
     return fs.getFileStatus(path).getPermission().toShort();
   }
 
-  private void create(OpType op, Path name) throws IOException {
-    create(fs, conf, op, name);
-  }
-
   /* create a file/directory with the default umask and permission */
-  static void create(final FileSystem fs, final Configuration fsConf,
-      OpType op, Path name) throws IOException {
-    create(fs, fsConf, op, name, DEFAULT_UMASK, new FsPermission(
-        DEFAULT_PERMISSION));
-  }
-
-  private void create(OpType op, Path name, short umask,
-      FsPermission permission)
-      throws IOException {
-    create(fs, conf, op, name, umask, permission);
+  private void create(OpType op, Path name) throws IOException {
+    create(op, name, DEFAULT_UMASK, new FsPermission(DEFAULT_PERMISSION));
   }
 
   /* create a file/directory with the given umask and permission */
-  static void create(final FileSystem fs, final Configuration fsConf,
-      OpType op, Path name, short umask, FsPermission permission)
-      throws IOException {
+  private void create(OpType op, Path name, short umask, 
+      FsPermission permission) throws IOException {
     // set umask in configuration, converting to padded octal
-    fsConf.set(FsPermission.UMASK_LABEL, String.format("%1$03o", umask));
+    conf.set(FsPermission.UMASK_LABEL, String.format("%1$03o", umask));
 
     // create the file/directory
     switch (op) {
     case CREATE:
       FSDataOutputStream out = fs.create(name, permission, true, 
-          fsConf.getInt(CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_KEY, 4096),
+          conf.getInt(CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_KEY, 4096),
           fs.getDefaultReplication(name), fs.getDefaultBlockSize(name), null);
       out.close();
       break;
@@ -372,7 +359,7 @@ public class TestDFSPermission {
   final static private String DIR_NAME = "dir";
   final static private String FILE_DIR_NAME = "filedir";
 
-  enum OpType {CREATE, MKDIRS, OPEN, SET_REPLICATION,
+  private enum OpType {CREATE, MKDIRS, OPEN, SET_REPLICATION,
     GET_FILEINFO, IS_DIR, EXISTS, GET_CONTENT_LENGTH, LIST, RENAME, DELETE
   };
 
@@ -628,7 +615,7 @@ public class TestDFSPermission {
   /* A random permission generator that guarantees that each permission
    * value is generated only once.
    */
-  static class PermissionGenerator {
+  static private class PermissionGenerator {
     private final Random r;
     private final short[] permissions = new short[MAX_PERMISSION + 1];
     private int numLeft = MAX_PERMISSION + 1;




[4/8] hadoop git commit: Revert "HDFS-10390. Implement asynchronous setAcl/getAclStatus for DistributedFileSystem. Contributed by Xiaobing Zhou"

Posted by wa...@apache.org.
Revert "HDFS-10390. Implement asynchronous setAcl/getAclStatus for DistributedFileSystem.  Contributed by Xiaobing Zhou"

This reverts commit 56745a0cf4cd17a348225bc28a16e9280c9db4a5.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/87ea0784
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/87ea0784
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/87ea0784

Branch: refs/heads/branch-2
Commit: 87ea0784eb5f2f39f89f951c119c0fe7c5d6786a
Parents: 8e2245d
Author: Andrew Wang <wa...@apache.org>
Authored: Fri Jun 3 18:10:48 2016 -0700
Committer: Andrew Wang <wa...@apache.org>
Committed: Fri Jun 3 18:10:48 2016 -0700

----------------------------------------------------------------------
 .../hadoop/hdfs/AsyncDistributedFileSystem.java |  59 ----
 .../hadoop/hdfs/DistributedFileSystem.java      |   3 -
 .../ClientNamenodeProtocolTranslatorPB.java     |  30 +-
 .../org/apache/hadoop/hdfs/TestAsyncDFS.java    | 310 -------------------
 .../apache/hadoop/hdfs/TestAsyncDFSRename.java  |  15 +-
 .../hdfs/server/namenode/FSAclBaseTest.java     |  12 +-
 6 files changed, 18 insertions(+), 411 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/87ea0784/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java
index b507fa5..1f60df2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java
@@ -19,16 +19,12 @@
 package org.apache.hadoop.hdfs;
 
 import java.io.IOException;
-import java.util.List;
 import java.util.concurrent.Future;
 
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.fs.Options;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.AclEntry;
-import org.apache.hadoop.fs.permission.AclStatus;
 import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.hdfs.DFSOpsCountStatistics.OpType;
 import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB;
 import org.apache.hadoop.util.concurrent.AsyncGet;
 import org.apache.hadoop.util.concurrent.AsyncGetFuture;
@@ -89,7 +85,6 @@ public class AsyncDistributedFileSystem {
   public Future<Void> rename(Path src, Path dst,
       final Options.Rename... options) throws IOException {
     dfs.getFsStatistics().incrementWriteOps(1);
-    dfs.getDFSOpsCountStatistics().incrementOpCounter(OpType.RENAME);
 
     final Path absSrc = dfs.fixRelativePart(src);
     final Path absDst = dfs.fixRelativePart(dst);
@@ -118,7 +113,6 @@ public class AsyncDistributedFileSystem {
   public Future<Void> setPermission(Path p, final FsPermission permission)
       throws IOException {
     dfs.getFsStatistics().incrementWriteOps(1);
-    dfs.getDFSOpsCountStatistics().incrementOpCounter(OpType.SET_PERMISSION);
     final Path absPath = dfs.fixRelativePart(p);
     final boolean isAsync = Client.isAsynchronousMode();
     Client.setAsynchronousMode(true);
@@ -150,7 +144,6 @@ public class AsyncDistributedFileSystem {
     }
 
     dfs.getFsStatistics().incrementWriteOps(1);
-    dfs.getDFSOpsCountStatistics().incrementOpCounter(OpType.SET_OWNER);
     final Path absPath = dfs.fixRelativePart(p);
     final boolean isAsync = Client.isAsynchronousMode();
     Client.setAsynchronousMode(true);
@@ -161,56 +154,4 @@ public class AsyncDistributedFileSystem {
       Client.setAsynchronousMode(isAsync);
     }
   }
-
-  /**
-   * Fully replaces ACL of files and directories, discarding all existing
-   * entries.
-   *
-   * @param p
-   *          Path to modify
-   * @param aclSpec
-   *          List<AclEntry> describing modifications, must include entries for
-   *          user, group, and others for compatibility with permission bits.
-   * @throws IOException
-   *           if an ACL could not be modified
-   * @return an instance of Future, #get of which is invoked to wait for
-   *         asynchronous call being finished.
-   */
-  public Future<Void> setAcl(Path p, final List<AclEntry> aclSpec)
-      throws IOException {
-    dfs.getFsStatistics().incrementWriteOps(1);
-    dfs.getDFSOpsCountStatistics().incrementOpCounter(OpType.SET_ACL);
-    final Path absPath = dfs.fixRelativePart(p);
-    final boolean isAsync = Client.isAsynchronousMode();
-    Client.setAsynchronousMode(true);
-    try {
-      dfs.getClient().setAcl(dfs.getPathName(absPath), aclSpec);
-      return getReturnValue();
-    } finally {
-      Client.setAsynchronousMode(isAsync);
-    }
-  }
-
-  /**
-   * Gets the ACL of a file or directory.
-   *
-   * @param p
-   *          Path to get
-   * @return AclStatus describing the ACL of the file or directory
-   * @throws IOException
-   *           if an ACL could not be read
-   * @return an instance of Future, #get of which is invoked to wait for
-   *         asynchronous call being finished.
-   */
-  public Future<AclStatus> getAclStatus(Path p) throws IOException {
-    final Path absPath = dfs.fixRelativePart(p);
-    final boolean isAsync = Client.isAsynchronousMode();
-    Client.setAsynchronousMode(true);
-    try {
-      dfs.getClient().getAclStatus(dfs.getPathName(absPath));
-      return getReturnValue();
-    } finally {
-      Client.setAsynchronousMode(isAsync);
-    }
-  }
 }
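
Caller-side usage of the two ACL methods deleted above, as a sketch
against the pre-revert API (the helper name is illustrative):

import java.util.List;
import java.util.concurrent.Future;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.AclStatus;
import org.apache.hadoop.hdfs.AsyncDistributedFileSystem;

public class AsyncAclExample {
  // Sketch: setAcl and getAclStatus return Futures whose get() blocks
  // until the NameNode RPC completes.
  static AclStatus setThenRead(AsyncDistributedFileSystem adfs, Path p,
      List<AclEntry> aclSpec) throws Exception {
    Future<Void> set = adfs.setAcl(p, aclSpec);
    set.get();                                  // wait for setAcl
    Future<AclStatus> status = adfs.getAclStatus(p);
    return status.get();                        // wait for the ACL payload
  }
}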

http://git-wip-us.apache.org/repos/asf/hadoop/blob/87ea0784/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
index eebacb5..7fc767f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
@@ -2525,7 +2525,4 @@ public class DistributedFileSystem extends FileSystem {
     return statistics;
   }
 
-  DFSOpsCountStatistics getDFSOpsCountStatistics() {
-    return storageStatistics;
-  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/87ea0784/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
index 3f10c98..796aa29 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
@@ -71,7 +71,6 @@ import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
 import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
 import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
 import org.apache.hadoop.hdfs.protocol.proto.AclProtos.GetAclStatusRequestProto;
-import org.apache.hadoop.hdfs.protocol.proto.AclProtos.GetAclStatusResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.AclProtos.ModifyAclEntriesRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.AclProtos.RemoveAclEntriesRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.AclProtos.RemoveAclRequestProto;
@@ -163,7 +162,7 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.Trunca
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UnsetStoragePolicyRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpdateBlockForPipelineRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpdatePipelineRequestProto;
-import org.apache.hadoop.hdfs.protocol.proto.*;
+import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos;
 import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.CreateEncryptionZoneRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.EncryptionZoneProto;
 import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.GetEZForPathRequestProto;
@@ -1336,12 +1335,7 @@ public class ClientNamenodeProtocolTranslatorPB implements
         .addAllAclSpec(PBHelperClient.convertAclEntryProto(aclSpec))
         .build();
     try {
-      if (Client.isAsynchronousMode()) {
-        rpcProxy.setAcl(null, req);
-        setAsyncReturnValue();
-      } else {
-        rpcProxy.setAcl(null, req);
-      }
+      rpcProxy.setAcl(null, req);
     } catch (ServiceException e) {
       throw ProtobufHelper.getRemoteException(e);
     }
@@ -1352,25 +1346,7 @@ public class ClientNamenodeProtocolTranslatorPB implements
     GetAclStatusRequestProto req = GetAclStatusRequestProto.newBuilder()
         .setSrc(src).build();
     try {
-      if (Client.isAsynchronousMode()) {
-        rpcProxy.getAclStatus(null, req);
-        final AsyncGet<Message, Exception> asyncReturnMessage
-            = ProtobufRpcEngine.getAsyncReturnMessage();
-        final AsyncGet<AclStatus, Exception> asyncGet =
-            new AsyncGet<AclStatus, Exception>() {
-              @Override
-              public AclStatus get(long timeout, TimeUnit unit)
-                  throws Exception {
-                return PBHelperClient
-                    .convert((GetAclStatusResponseProto) asyncReturnMessage
-                        .get(timeout, unit));
-              }
-            };
-        ASYNC_RETURN_VALUE.set(asyncGet);
-        return null;
-      } else {
-        return PBHelperClient.convert(rpcProxy.getAclStatus(null, req));
-      }
+      return PBHelperClient.convert(rpcProxy.getAclStatus(null, req));
     } catch (ServiceException e) {
       throw ProtobufHelper.getRemoteException(e);
     }
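
The getAclStatus hunk above removes the async-return pattern: the raw
AsyncGet<Message, Exception> from the RPC engine is wrapped so that its
payload is converted to an AclStatus at retrieval time. The same adapter
idea as a self-contained sketch; the local AsyncGet and Converter
interfaces below are simplified stand-ins, included only so the sketch
compiles on its own:

import java.util.concurrent.TimeUnit;

public class AsyncGetAdapterExample {
  // Simplified stand-in for org.apache.hadoop.util.concurrent.AsyncGet.
  interface AsyncGet<R, E extends Throwable> {
    R get(long timeout, TimeUnit unit) throws E;
  }

  interface Converter<A, B> {
    B convert(A a);
  }

  // Sketch: wrap the raw-response AsyncGet and convert its payload on
  // the way out, mirroring the Message -> AclStatus conversion above.
  static <A, B> AsyncGet<B, Exception> map(final AsyncGet<A, Exception> inner,
      final Converter<A, B> fn) {
    return new AsyncGet<B, Exception>() {
      @Override
      public B get(long timeout, TimeUnit unit) throws Exception {
        return fn.convert(inner.get(timeout, unit));
      }
    };
  }
}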

http://git-wip-us.apache.org/repos/asf/hadoop/blob/87ea0784/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFS.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFS.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFS.java
deleted file mode 100644
index 67262dd..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFS.java
+++ /dev/null
@@ -1,310 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs;
-
-import static org.apache.hadoop.fs.permission.AclEntryScope.ACCESS;
-import static org.apache.hadoop.fs.permission.AclEntryScope.DEFAULT;
-import static org.apache.hadoop.fs.permission.AclEntryType.GROUP;
-import static org.apache.hadoop.fs.permission.AclEntryType.MASK;
-import static org.apache.hadoop.fs.permission.AclEntryType.OTHER;
-import static org.apache.hadoop.fs.permission.AclEntryType.USER;
-import static org.apache.hadoop.fs.permission.FsAction.ALL;
-import static org.apache.hadoop.fs.permission.FsAction.NONE;
-import static org.apache.hadoop.fs.permission.FsAction.READ_EXECUTE;
-import static org.apache.hadoop.hdfs.server.namenode.AclTestHelpers.aclEntry;
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.io.IOException;
-import java.security.PrivilegedExceptionAction;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CommonConfigurationKeys;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.AclEntry;
-import org.apache.hadoop.fs.permission.AclStatus;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.hdfs.server.namenode.AclTestHelpers;
-import org.apache.hadoop.hdfs.server.namenode.FSAclBaseTest;
-import org.apache.hadoop.ipc.AsyncCallLimitExceededException;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import com.google.common.collect.Lists;
-
-/**
- * Unit tests for asynchronous distributed filesystem.
- * */
-public class TestAsyncDFS {
-  public static final Log LOG = LogFactory.getLog(TestAsyncDFS.class);
-  private static final int NUM_TESTS = 1000;
-  private static final int NUM_NN_HANDLER = 10;
-  private static final int ASYNC_CALL_LIMIT = 100;
-
-  private Configuration conf;
-  private MiniDFSCluster cluster;
-  private FileSystem fs;
-
-  @Before
-  public void setup() throws IOException {
-    conf = new HdfsConfiguration();
-    // explicitly turn on acl
-    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY, true);
-    // explicitly turn on ACL
-    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY, true);
-    // set the limit of max async calls
-    conf.setInt(CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_KEY,
-        ASYNC_CALL_LIMIT);
-    // set server handlers
-    conf.setInt(DFSConfigKeys.DFS_NAMENODE_HANDLER_COUNT_KEY, NUM_NN_HANDLER);
-    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
-    cluster.waitActive();
-    fs = FileSystem.get(conf);
-  }
-
-  @After
-  public void tearDown() throws IOException {
-    if (fs != null) {
-      fs.close();
-      fs = null;
-    }
-    if (cluster != null) {
-      cluster.shutdown();
-      cluster = null;
-    }
-  }
-
-  static class AclQueueEntry {
-    private final Object future;
-    private final Path path;
-    private final Boolean isSetAcl;
-
-    AclQueueEntry(final Object future, final Path path,
-        final Boolean isSetAcl) {
-      this.future = future;
-      this.path = path;
-      this.isSetAcl = isSetAcl;
-    }
-
-    public final Object getFuture() {
-      return future;
-    }
-
-    public final Path getPath() {
-      return path;
-    }
-
-    public final Boolean isSetAcl() {
-      return this.isSetAcl;
-    }
-  }
-
-  @Test(timeout=60000)
-  public void testBatchAsyncAcl() throws Exception {
-    final String basePath = "testBatchAsyncAcl";
-    final Path parent = new Path(String.format("/test/%s/", basePath));
-
-    AsyncDistributedFileSystem adfs = cluster.getFileSystem()
-        .getAsyncDistributedFileSystem();
-
-    // prepare test
-    int count = NUM_TESTS;
-    final Path[] paths = new Path[count];
-    for (int i = 0; i < count; i++) {
-      paths[i] = new Path(parent, "acl" + i);
-      FileSystem.mkdirs(fs, paths[i],
-          FsPermission.createImmutable((short) 0750));
-      assertTrue(fs.exists(paths[i]));
-      assertTrue(fs.getFileStatus(paths[i]).isDirectory());
-    }
-
-    final List<AclEntry> aclSpec = getAclSpec();
-    final AclEntry[] expectedAclSpec = getExpectedAclSpec();
-    Map<Integer, Future<Void>> setAclRetFutures =
-        new HashMap<Integer, Future<Void>>();
-    Map<Integer, Future<AclStatus>> getAclRetFutures =
-        new HashMap<Integer, Future<AclStatus>>();
-    int start = 0, end = 0;
-    try {
-      // test setAcl
-      for (int i = 0; i < count; i++) {
-        for (;;) {
-          try {
-            Future<Void> retFuture = adfs.setAcl(paths[i], aclSpec);
-            setAclRetFutures.put(i, retFuture);
-            break;
-          } catch (AsyncCallLimitExceededException e) {
-            start = end;
-            end = i;
-            waitForAclReturnValues(setAclRetFutures, start, end);
-          }
-        }
-      }
-      waitForAclReturnValues(setAclRetFutures, end, count);
-
-      // test getAclStatus
-      start = 0;
-      end = 0;
-      for (int i = 0; i < count; i++) {
-        for (;;) {
-          try {
-            Future<AclStatus> retFuture = adfs.getAclStatus(paths[i]);
-            getAclRetFutures.put(i, retFuture);
-            break;
-          } catch (AsyncCallLimitExceededException e) {
-            start = end;
-            end = i;
-            waitForAclReturnValues(getAclRetFutures, start, end, paths,
-                expectedAclSpec);
-          }
-        }
-      }
-      waitForAclReturnValues(getAclRetFutures, end, count, paths,
-          expectedAclSpec);
-    } catch (Exception e) {
-      throw e;
-    }
-  }
-
-  private void waitForAclReturnValues(
-      final Map<Integer, Future<Void>> aclRetFutures, final int start,
-      final int end) throws InterruptedException, ExecutionException {
-    for (int i = start; i < end; i++) {
-      aclRetFutures.get(i).get();
-    }
-  }
-
-  private void waitForAclReturnValues(
-      final Map<Integer, Future<AclStatus>> aclRetFutures, final int start,
-      final int end, final Path[] paths, final AclEntry[] expectedAclSpec)
-      throws InterruptedException, ExecutionException, IOException {
-    for (int i = start; i < end; i++) {
-      AclStatus aclStatus = aclRetFutures.get(i).get();
-      verifyGetAcl(aclStatus, expectedAclSpec, paths[i]);
-    }
-  }
-
-  private void verifyGetAcl(final AclStatus aclStatus,
-      final AclEntry[] expectedAclSpec, final Path path) throws IOException {
-    if (aclStatus == null) {
-      return;
-    }
-
-    // verify permission and acl
-    AclEntry[] returned = aclStatus.getEntries().toArray(new AclEntry[0]);
-    assertArrayEquals(expectedAclSpec, returned);
-    assertPermission(path, (short) 010770);
-    FSAclBaseTest.assertAclFeature(cluster, path, true);
-  }
-
-  private List<AclEntry> getAclSpec() {
-    return Lists.newArrayList(
-        aclEntry(ACCESS, USER, ALL),
-        aclEntry(ACCESS, USER, "foo", ALL),
-        aclEntry(ACCESS, GROUP, READ_EXECUTE),
-        aclEntry(ACCESS, OTHER, NONE),
-        aclEntry(DEFAULT, USER, "foo", ALL));
-  }
-
-  private AclEntry[] getExpectedAclSpec() {
-    return new AclEntry[] {
-        aclEntry(ACCESS, USER, "foo", ALL),
-        aclEntry(ACCESS, GROUP, READ_EXECUTE),
-        aclEntry(DEFAULT, USER, ALL),
-        aclEntry(DEFAULT, USER, "foo", ALL),
-        aclEntry(DEFAULT, GROUP, READ_EXECUTE),
-        aclEntry(DEFAULT, MASK, ALL),
-        aclEntry(DEFAULT, OTHER, NONE) };
-  }
-
-  private void assertPermission(final Path pathToCheck, final short perm)
-      throws IOException {
-    AclTestHelpers.assertPermission(fs, pathToCheck, perm);
-  }
-
-  @Test(timeout=60000)
-  public void testAsyncAPIWithException() throws Exception {
-    String group1 = "group1";
-    String group2 = "group2";
-    String user1 = "user1";
-    UserGroupInformation ugi1;
-
-    // create fake mapping for the groups
-    Map<String, String[]> u2gMap = new HashMap<String, String[]>(1);
-    u2gMap.put(user1, new String[] {group1, group2});
-    DFSTestUtil.updateConfWithFakeGroupMapping(conf, u2gMap);
-
-    // Initiate all four users
-    ugi1 = UserGroupInformation.createUserForTesting(user1, new String[] {
-        group1, group2 });
-
-    final Path parent = new Path("/test/async_api_exception/");
-    final Path aclDir = new Path(parent, "aclDir");
-    fs.mkdirs(aclDir, FsPermission.createImmutable((short) 0770));
-
-    AsyncDistributedFileSystem adfs = ugi1
-        .doAs(new PrivilegedExceptionAction<AsyncDistributedFileSystem>() {
-          @Override
-          public AsyncDistributedFileSystem run() throws Exception {
-            return cluster.getFileSystem().getAsyncDistributedFileSystem();
-          }
-        });
-
-    Future<Void> retFuture;
-    // test setAcl
-    try {
-      retFuture = adfs.setAcl(aclDir,
-          Lists.newArrayList(aclEntry(ACCESS, USER, ALL)));
-      retFuture.get();
-      fail("setAcl should fail with permission denied");
-    } catch (ExecutionException e) {
-      checkPermissionDenied(e, aclDir, user1);
-    }
-
-    // test getAclStatus
-    try {
-      Future<AclStatus> aclRetFuture = adfs.getAclStatus(aclDir);
-      aclRetFuture.get();
-      fail("getAclStatus should fail with permission denied");
-    } catch (ExecutionException e) {
-      checkPermissionDenied(e, aclDir, user1);
-    }
-  }
-
-  public static void checkPermissionDenied(final Exception e, final Path dir,
-      final String user) {
-    assertTrue(e.getCause() instanceof ExecutionException);
-    assertTrue("Permission denied messages must carry AccessControlException",
-        e.getMessage().contains("AccessControlException"));
-    assertTrue("Permission denied messages must carry the username", e
-        .getMessage().contains(user));
-    assertTrue("Permission denied messages must carry the name of the path",
-        e.getMessage().contains(dir.getName()));
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/87ea0784/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java
index 03c8151..7539fbd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java
@@ -520,7 +520,7 @@ public class TestAsyncDFSRename {
       retFuture = adfs.rename(src, dst, Rename.OVERWRITE);
       retFuture.get();
     } catch (ExecutionException e) {
-      TestAsyncDFS.checkPermissionDenied(e, src, user1);
+      checkPermissionDenied(e, src, user1);
       assertTrue("Permission denied messages must carry the path parent", e
           .getMessage().contains(src.getParent().toUri().getPath()));
     }
@@ -530,7 +530,7 @@ public class TestAsyncDFSRename {
       retFuture = adfs.setPermission(src, fsPerm);
       retFuture.get();
     } catch (ExecutionException e) {
-      TestAsyncDFS.checkPermissionDenied(e, src, user1);
+      checkPermissionDenied(e, src, user1);
       assertTrue("Permission denied messages must carry the name of the path",
           e.getMessage().contains(src.getName()));
     }
@@ -539,7 +539,7 @@ public class TestAsyncDFSRename {
       retFuture = adfs.setOwner(src, "user1", "group2");
       retFuture.get();
     } catch (ExecutionException e) {
-      TestAsyncDFS.checkPermissionDenied(e, src, user1);
+      checkPermissionDenied(e, src, user1);
       assertTrue("Permission denied messages must carry the name of the path",
           e.getMessage().contains(src.getName()));
     } finally {
@@ -551,4 +551,13 @@ public class TestAsyncDFSRename {
       }
     }
   }
+
+  private void checkPermissionDenied(final Exception e, final Path dir,
+      final String user) {
+    assertTrue(e.getCause() instanceof ExecutionException);
+    assertTrue("Permission denied messages must carry AccessControlException",
+        e.getMessage().contains("AccessControlException"));
+    assertTrue("Permission denied messages must carry the username", e
+        .getMessage().contains(user));
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/87ea0784/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSAclBaseTest.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSAclBaseTest.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSAclBaseTest.java
index 52e638e..f481bc1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSAclBaseTest.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSAclBaseTest.java
@@ -1637,23 +1637,17 @@ public abstract class FSAclBaseTest {
     assertAclFeature(path, expectAclFeature);
   }
 
-  private static void assertAclFeature(Path pathToCheck,
-      boolean expectAclFeature) throws IOException {
-    assertAclFeature(cluster, pathToCheck, expectAclFeature);
-  }
-
   /**
    * Asserts whether or not the inode for a specific path has an AclFeature.
    *
-   * @param miniCluster the cluster into which the path resides
    * @param pathToCheck Path inode to check
    * @param expectAclFeature boolean true if an AclFeature must be present,
    *   false if an AclFeature must not be present
    * @throws IOException thrown if there is an I/O error
    */
-  public static void assertAclFeature(final MiniDFSCluster miniCluster,
-      Path pathToCheck, boolean expectAclFeature) throws IOException {
-    AclFeature aclFeature = getAclFeature(pathToCheck, miniCluster);
+  private static void assertAclFeature(Path pathToCheck,
+      boolean expectAclFeature) throws IOException {
+    AclFeature aclFeature = getAclFeature(pathToCheck, cluster);
     if (expectAclFeature) {
       assertNotNull(aclFeature);
       // Intentionally capturing a reference to the entries, not using nested




[7/8] hadoop git commit: Revert "HADOOP-12957. Limit the number of outstanding async calls. Contributed by Xiaobing Zhou"

Posted by wa...@apache.org.
Revert "HADOOP-12957. Limit the number of outstanding async calls.  Contributed by Xiaobing Zhou"

This reverts commit 7e0824faa9970de608fabd8101bdb2652db4362b.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/7d4c16ba
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/7d4c16ba
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/7d4c16ba

Branch: refs/heads/branch-2
Commit: 7d4c16bae9b69a722807b3ca200f97bb8cc95f61
Parents: 2529cab
Author: Andrew Wang <wa...@apache.org>
Authored: Fri Jun 3 18:10:49 2016 -0700
Committer: Andrew Wang <wa...@apache.org>
Committed: Fri Jun 3 18:10:49 2016 -0700

----------------------------------------------------------------------
 .../hadoop/fs/CommonConfigurationKeys.java      |   3 -
 .../ipc/AsyncCallLimitExceededException.java    |  36 ---
 .../main/java/org/apache/hadoop/ipc/Client.java |  66 +----
 .../org/apache/hadoop/ipc/TestAsyncIPC.java     | 199 ++--------------
 .../hadoop/hdfs/AsyncDistributedFileSystem.java |  12 +-
 .../apache/hadoop/hdfs/TestAsyncDFSRename.java  | 238 ++++++-------------
 6 files changed, 109 insertions(+), 445 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/7d4c16ba/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
index 7f510bd..e706104 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
@@ -296,9 +296,6 @@ public class CommonConfigurationKeys extends CommonConfigurationKeysPublic {
   public static final long HADOOP_SECURITY_UID_NAME_CACHE_TIMEOUT_DEFAULT =
     4*60*60; // 4 hours
   
-  public static final String  IPC_CLIENT_ASYNC_CALLS_MAX_KEY =
-      "ipc.client.async.calls.max";
-  public static final int     IPC_CLIENT_ASYNC_CALLS_MAX_DEFAULT = 100;
   public static final String  IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY = "ipc.client.fallback-to-simple-auth-allowed";
   public static final boolean IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT = false;
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7d4c16ba/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AsyncCallLimitExceededException.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AsyncCallLimitExceededException.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AsyncCallLimitExceededException.java
deleted file mode 100644
index db97b6c..0000000
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AsyncCallLimitExceededException.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.ipc;
-
-import java.io.IOException;
-
-/**
- * Signals that the limit on outstanding asynchronous calls has been reached.
- * Application code using async RPC should catch this exception and retrieve
- * the results of already-issued calls before submitting follow-on calls, so
- * that the response buffer does not overflow.
- */
-public class AsyncCallLimitExceededException extends IOException {
-  private static final long serialVersionUID = 1L;
-
-  public AsyncCallLimitExceededException(String message) {
-    super(message);
-  }
-}
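
For context, the caller-side pattern this exception supported looks roughly
like the following. This is a minimal sketch with hypothetical names
(LimitAwareCaller, AsyncOp); the TestAsyncIPC and TestAsyncDFSRename hunks
further down use the same drain-then-retry idea.

    import java.io.IOException;
    import java.util.Queue;
    import java.util.concurrent.Future;

    class LimitAwareCaller {
      interface AsyncOp { Future<Long> issue() throws IOException; }

      static void submit(AsyncOp op, Queue<Future<Long>> pending) throws Exception {
        for (;;) {
          try {
            pending.add(op.issue());            // may throw the limit exception
            return;
          } catch (IOException limitReached) {  // catch AsyncCallLimitExceededException in real code
            while (!pending.isEmpty()) {
              pending.poll().get();             // retrieving results frees call slots
            }
          }
        }
      }
    }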

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7d4c16ba/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
index afad066..4ff1ff0 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
@@ -159,11 +159,9 @@ public class Client implements AutoCloseable {
 
   private final boolean fallbackAllowed;
   private final byte[] clientId;
-  private final int maxAsyncCalls;
-  private final AtomicInteger asyncCallCounter = new AtomicInteger(0);
   
   final static int CONNECTION_CONTEXT_CALL_ID = -3;
-
+  
   /**
    * Executor on which IPC calls' parameters are sent.
    * Deferring the sending of parameters to a separate
@@ -1292,9 +1290,6 @@ public class Client implements AutoCloseable {
         CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT);
     this.clientId = ClientId.getClientId();
     this.sendParamsExecutor = clientExcecutorFactory.refAndGetInstance();
-    this.maxAsyncCalls = conf.getInt(
-        CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_KEY,
-        CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_DEFAULT);
   }
 
   /**
@@ -1361,20 +1356,6 @@ public class Client implements AutoCloseable {
       fallbackToSimpleAuth);
   }
 
-  private void checkAsyncCall() throws IOException {
-    if (isAsynchronousMode()) {
-      if (asyncCallCounter.incrementAndGet() > maxAsyncCalls) {
-        asyncCallCounter.decrementAndGet();
-        String errMsg = String.format(
-            "Exceeded limit of max asynchronous calls: %d, " +
-            "please configure %s to adjust it.",
-            maxAsyncCalls,
-            CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_KEY);
-        throw new AsyncCallLimitExceededException(errMsg);
-      }
-    }
-  }
-
   /**
    * Make a call, passing <code>rpcRequest</code>, to the IPC server defined by
    * <code>remoteId</code>, returning the rpc response.
@@ -1395,38 +1376,24 @@ public class Client implements AutoCloseable {
     final Call call = createCall(rpcKind, rpcRequest);
     final Connection connection = getConnection(remoteId, call, serviceClass,
         fallbackToSimpleAuth);
-
     try {
-      checkAsyncCall();
-      try {
-        connection.sendRpcRequest(call);                 // send the rpc request
-      } catch (RejectedExecutionException e) {
-        throw new IOException("connection has been closed", e);
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-        LOG.warn("interrupted waiting to send rpc request to server", e);
-        throw new IOException(e);
-      }
-    } catch(Exception e) {
-      if (isAsynchronousMode()) {
-        releaseAsyncCall();
-      }
-      throw e;
+      connection.sendRpcRequest(call);                 // send the rpc request
+    } catch (RejectedExecutionException e) {
+      throw new IOException("connection has been closed", e);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      LOG.warn("interrupted waiting to send rpc request to server", e);
+      throw new IOException(e);
     }
 
     if (isAsynchronousMode()) {
       Future<Writable> returnFuture = new AbstractFuture<Writable>() {
-        private final AtomicBoolean callled = new AtomicBoolean(false);
         @Override
         public Writable get() throws InterruptedException, ExecutionException {
-          if (callled.compareAndSet(false, true)) {
-            try {
-              set(getRpcResponse(call, connection));
-            } catch (IOException ie) {
-              setException(ie);
-            } finally {
-              releaseAsyncCall();
-            }
+          try {
+            set(getRpcResponse(call, connection));
+          } catch (IOException ie) {
+            setException(ie);
           }
           return super.get();
         }
@@ -1462,15 +1429,6 @@ public class Client implements AutoCloseable {
     asynchronousMode.set(async);
   }
 
-  private void releaseAsyncCall() {
-    asyncCallCounter.decrementAndGet();
-  }
-
-  @VisibleForTesting
-  int getAsyncCallCount() {
-    return asyncCallCounter.get();
-  }
-
   private Writable getRpcResponse(final Call call, final Connection connection)
       throws IOException {
     synchronized (call) {
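
The admission-control logic removed from Client.java above boils down to an
optimistic counter. A standalone sketch, assuming the limit is fixed at
construction (AsyncCallLimiter is an illustrative name; in the real code this
logic lived inline in checkAsyncCall/releaseAsyncCall):

    import java.io.IOException;
    import java.util.concurrent.atomic.AtomicInteger;

    class AsyncCallLimiter {
      private final int maxAsyncCalls;
      private final AtomicInteger outstanding = new AtomicInteger(0);

      AsyncCallLimiter(int maxAsyncCalls) {
        this.maxAsyncCalls = maxAsyncCalls;
      }

      void acquire() throws IOException {
        if (outstanding.incrementAndGet() > maxAsyncCalls) {
          outstanding.decrementAndGet();  // undo the optimistic increment
          throw new IOException(
              "Exceeded limit of max asynchronous calls: " + maxAsyncCalls);
        }
      }

      void release() {  // invoked exactly once per completed async call
        outstanding.decrementAndGet();
      }
    }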

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7d4c16ba/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java
index 8ee3a2c..6cf75c7 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.ipc;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
@@ -35,7 +34,6 @@ import java.util.concurrent.Future;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.ipc.RPC.RpcKind;
@@ -56,13 +54,12 @@ public class TestAsyncIPC {
   @Before
   public void setupConf() {
     conf = new Configuration();
-    conf.setInt(CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_KEY, 10000);
     Client.setPingInterval(conf, TestIPC.PING_INTERVAL);
     // set asynchronous mode for main thread
     Client.setAsynchronousMode(true);
   }
 
-  static class AsyncCaller extends Thread {
+  protected static class SerialCaller extends Thread {
     private Client client;
     private InetSocketAddress server;
     private int count;
@@ -71,11 +68,11 @@ public class TestAsyncIPC {
         new HashMap<Integer, Future<LongWritable>>();
     Map<Integer, Long> expectedValues = new HashMap<Integer, Long>();
 
-    public AsyncCaller(Client client, InetSocketAddress server, int count) {
+    public SerialCaller(Client client, InetSocketAddress server, int count) {
       this.client = client;
       this.server = server;
       this.count = count;
-      // set asynchronous mode, since AsyncCaller extends Thread
+      // set asynchronous mode, since SerialCaller extends Thread
       Client.setAsynchronousMode(true);
     }
 
@@ -110,111 +107,14 @@ public class TestAsyncIPC {
     }
   }
 
-  static class AsyncLimitlCaller extends Thread {
-    private Client client;
-    private InetSocketAddress server;
-    private int count;
-    private boolean failed;
-    Map<Integer, Future<LongWritable>> returnFutures = new HashMap<Integer, Future<LongWritable>>();
-    Map<Integer, Long> expectedValues = new HashMap<Integer, Long>();
-    int start = 0, end = 0;
-
-    int getStart() {
-      return start;
-    }
-
-    int getEnd() {
-      return end;
-    }
-
-    int getCount() {
-      return count;
-    }
-
-    public AsyncLimitlCaller(Client client, InetSocketAddress server, int count) {
-      this(0, client, server, count);
-    }
-
-    final int callerId;
-
-    public AsyncLimitlCaller(int callerId, Client client, InetSocketAddress server,
-        int count) {
-      this.client = client;
-      this.server = server;
-      this.count = count;
-      // set asynchronous mode, since AsyncLimitlCaller extends Thread
-      Client.setAsynchronousMode(true);
-      this.callerId = callerId;
-    }
-
-    @Override
-    public void run() {
-      // in case Thread#start is called, which will spawn a new thread
-      Client.setAsynchronousMode(true);
-      for (int i = 0; i < count; i++) {
-        try {
-          final long param = TestIPC.RANDOM.nextLong();
-          runCall(i, param);
-        } catch (Exception e) {
-          LOG.fatal(String.format("Caller-%d Call-%d caught: %s", callerId, i,
-              StringUtils.stringifyException(e)));
-          failed = true;
-        }
-      }
-    }
-
-    private void runCall(final int idx, final long param)
-        throws InterruptedException, ExecutionException, IOException {
-      for (;;) {
-        try {
-          doCall(idx, param);
-          return;
-        } catch (AsyncCallLimitExceededException e) {
-          /**
-           * reached the limit of async calls; fetch results of finished calls
-           * so that follow-on calls can proceed
-           */
-          start = end;
-          end = idx;
-          waitForReturnValues(start, end);
-        }
-      }
-    }
-
-    private void doCall(final int idx, final long param) throws IOException {
-      TestIPC.call(client, param, server, conf);
-      Future<LongWritable> returnFuture = Client.getReturnRpcResponse();
-      returnFutures.put(idx, returnFuture);
-      expectedValues.put(idx, param);
-    }
-
-    private void waitForReturnValues(final int start, final int end)
-        throws InterruptedException, ExecutionException {
-      for (int i = start; i < end; i++) {
-        LongWritable value = returnFutures.get(i).get();
-        if (expectedValues.get(i) != value.get()) {
-          LOG.fatal(String.format("Caller-%d Call-%d failed!", callerId, i));
-          failed = true;
-          break;
-        }
-      }
-    }
-  }
-
-  @Test(timeout = 60000)
-  public void testAsyncCall() throws IOException, InterruptedException,
+  @Test
+  public void testSerial() throws IOException, InterruptedException,
       ExecutionException {
-    internalTestAsyncCall(3, false, 2, 5, 100);
-    internalTestAsyncCall(3, true, 2, 5, 10);
+    internalTestSerial(3, false, 2, 5, 100);
+    internalTestSerial(3, true, 2, 5, 10);
   }
 
-  @Test(timeout = 60000)
-  public void testAsyncCallLimit() throws IOException,
-      InterruptedException, ExecutionException {
-    internalTestAsyncCallLimit(100, false, 5, 10, 500);
-  }
-
-  public void internalTestAsyncCall(int handlerCount, boolean handlerSleep,
+  public void internalTestSerial(int handlerCount, boolean handlerSleep,
       int clientCount, int callerCount, int callCount) throws IOException,
       InterruptedException, ExecutionException {
     Server server = new TestIPC.TestServer(handlerCount, handlerSleep, conf);
@@ -226,9 +126,9 @@ public class TestAsyncIPC {
       clients[i] = new Client(LongWritable.class, conf);
     }
 
-    AsyncCaller[] callers = new AsyncCaller[callerCount];
+    SerialCaller[] callers = new SerialCaller[callerCount];
     for (int i = 0; i < callerCount; i++) {
-      callers[i] = new AsyncCaller(clients[i % clientCount], addr, callCount);
+      callers[i] = new SerialCaller(clients[i % clientCount], addr, callCount);
       callers[i].start();
     }
     for (int i = 0; i < callerCount; i++) {
@@ -244,75 +144,6 @@ public class TestAsyncIPC {
     server.stop();
   }
 
-  @Test(timeout = 60000)
-  public void testCallGetReturnRpcResponseMultipleTimes() throws IOException,
-      InterruptedException, ExecutionException {
-    int handlerCount = 10, callCount = 100;
-    Server server = new TestIPC.TestServer(handlerCount, false, conf);
-    InetSocketAddress addr = NetUtils.getConnectAddress(server);
-    server.start();
-    final Client client = new Client(LongWritable.class, conf);
-
-    int asyncCallCount = client.getAsyncCallCount();
-
-    try {
-      AsyncCaller caller = new AsyncCaller(client, addr, callCount);
-      caller.run();
-
-      caller.waitForReturnValues();
-      String msg = String.format(
-          "First time, expected not failed for caller: %s.", caller);
-      assertFalse(msg, caller.failed);
-
-      caller.waitForReturnValues();
-      assertTrue(asyncCallCount == client.getAsyncCallCount());
-      msg = String.format("Second time, expected not failed for caller: %s.",
-          caller);
-      assertFalse(msg, caller.failed);
-
-      assertTrue(asyncCallCount == client.getAsyncCallCount());
-    } finally {
-      client.stop();
-      server.stop();
-    }
-  }
-
-  public void internalTestAsyncCallLimit(int handlerCount, boolean handlerSleep,
-      int clientCount, int callerCount, int callCount) throws IOException,
-      InterruptedException, ExecutionException {
-    Configuration conf = new Configuration();
-    conf.setInt(CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_KEY, 100);
-    Client.setPingInterval(conf, TestIPC.PING_INTERVAL);
-
-    Server server = new TestIPC.TestServer(handlerCount, handlerSleep, conf);
-    InetSocketAddress addr = NetUtils.getConnectAddress(server);
-    server.start();
-
-    Client[] clients = new Client[clientCount];
-    for (int i = 0; i < clientCount; i++) {
-      clients[i] = new Client(LongWritable.class, conf);
-    }
-
-    AsyncLimitlCaller[] callers = new AsyncLimitlCaller[callerCount];
-    for (int i = 0; i < callerCount; i++) {
-      callers[i] = new AsyncLimitlCaller(i, clients[i % clientCount], addr,
-          callCount);
-      callers[i].start();
-    }
-    for (int i = 0; i < callerCount; i++) {
-      callers[i].join();
-      callers[i].waitForReturnValues(callers[i].getStart(),
-          callers[i].getCount());
-      String msg = String.format("Expected not failed for caller-%d: %s.", i,
-          callers[i]);
-      assertFalse(msg, callers[i].failed);
-    }
-    for (int i = 0; i < clientCount; i++) {
-      clients[i].stop();
-    }
-    server.stop();
-  }
-
   /**
    * Test if (1) the rpc server uses the call id/retry provided by the rpc
    * client, and (2) the rpc client receives the same call id/retry from the rpc
@@ -365,7 +196,7 @@ public class TestAsyncIPC {
     try {
       InetSocketAddress addr = NetUtils.getConnectAddress(server);
       server.start();
-      final AsyncCaller caller = new AsyncCaller(client, addr, 4);
+      final SerialCaller caller = new SerialCaller(client, addr, 4);
       caller.run();
       caller.waitForReturnValues();
       String msg = String.format("Expected not failed for caller: %s.", caller);
@@ -404,7 +235,7 @@ public class TestAsyncIPC {
     try {
       InetSocketAddress addr = NetUtils.getConnectAddress(server);
       server.start();
-      final AsyncCaller caller = new AsyncCaller(client, addr, 10);
+      final SerialCaller caller = new SerialCaller(client, addr, 10);
       caller.run();
       caller.waitForReturnValues();
       String msg = String.format("Expected not failed for caller: %s.", caller);
@@ -441,7 +272,7 @@ public class TestAsyncIPC {
     try {
       InetSocketAddress addr = NetUtils.getConnectAddress(server);
       server.start();
-      final AsyncCaller caller = new AsyncCaller(client, addr, 10);
+      final SerialCaller caller = new SerialCaller(client, addr, 10);
       caller.run();
       caller.waitForReturnValues();
       String msg = String.format("Expected not failed for caller: %s.", caller);
@@ -482,9 +313,9 @@ public class TestAsyncIPC {
     try {
       InetSocketAddress addr = NetUtils.getConnectAddress(server);
       server.start();
-      AsyncCaller[] callers = new AsyncCaller[callerCount];
+      SerialCaller[] callers = new SerialCaller[callerCount];
       for (int i = 0; i < callerCount; ++i) {
-        callers[i] = new AsyncCaller(client, addr, perCallerCallCount);
+        callers[i] = new SerialCaller(client, addr, perCallerCallCount);
         callers[i].start();
       }
       for (int i = 0; i < callerCount; ++i) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7d4c16ba/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java
index 356ae3f..37899aa 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java
@@ -22,7 +22,6 @@ import java.io.IOException;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.fs.Options;
@@ -51,14 +50,11 @@ public class AsyncDistributedFileSystem {
     final Callable<T> returnValueCallback = ClientNamenodeProtocolTranslatorPB
         .getReturnValueCallback();
     Future<T> returnFuture = new AbstractFuture<T>() {
-      private final AtomicBoolean called = new AtomicBoolean(false);
       public T get() throws InterruptedException, ExecutionException {
-        if (called.compareAndSet(false, true)) {
-          try {
-            set(returnValueCallback.call());
-          } catch (Exception e) {
-            setException(e);
-          }
+        try {
+          set(returnValueCallback.call());
+        } catch (Exception e) {
+          setException(e);
         }
         return super.get();
       }
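
The AtomicBoolean guard deleted above is what made Future#get idempotent: the
first caller resolves the future, later callers simply read the settled
value. A sketch of the same idea, assuming Guava's AbstractFuture is on the
classpath (OnceFuture is an illustrative name):

    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.atomic.AtomicBoolean;

    import com.google.common.util.concurrent.AbstractFuture;

    class OnceFuture<T> extends AbstractFuture<T> {
      private final Callable<T> callback;
      private final AtomicBoolean called = new AtomicBoolean(false);

      OnceFuture(Callable<T> callback) {
        this.callback = callback;
      }

      @Override
      public T get() throws InterruptedException, ExecutionException {
        if (called.compareAndSet(false, true)) {  // only the first caller resolves
          try {
            set(callback.call());
          } catch (Exception e) {
            setException(e);
          }
        }
        return super.get();  // subsequent callers just read the result
      }
    }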

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7d4c16ba/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java
index d129299..9322e1a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
+import java.io.DataOutputStream;
 import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
 import java.util.HashMap;
@@ -30,25 +31,80 @@ import java.util.concurrent.Future;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.Options.Rename;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
-import org.apache.hadoop.ipc.AsyncCallLimitExceededException;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
 
 public class TestAsyncDFSRename {
+  final Path asyncRenameDir = new Path("/test/async_rename/");
   public static final Log LOG = LogFactory.getLog(TestAsyncDFSRename.class);
+  final private static Configuration CONF = new HdfsConfiguration();
+
+  final private static String GROUP1_NAME = "group1";
+  final private static String GROUP2_NAME = "group2";
+  final private static String USER1_NAME = "user1";
+  private static final UserGroupInformation USER1;
+
+  private MiniDFSCluster gCluster;
+
+  static {
+    // explicitly turn on permission checking
+    CONF.setBoolean(DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, true);
+
+    // create fake mapping for the groups
+    Map<String, String[]> u2g_map = new HashMap<String, String[]>(1);
+    u2g_map.put(USER1_NAME, new String[] { GROUP1_NAME, GROUP2_NAME });
+    DFSTestUtil.updateConfWithFakeGroupMapping(CONF, u2g_map);
+
+    // Initialize the test user
+    USER1 = UserGroupInformation.createUserForTesting(USER1_NAME, new String[] {
+        GROUP1_NAME, GROUP2_NAME });
+  }
+
+  @Before
+  public void setUp() throws IOException {
+    gCluster = new MiniDFSCluster.Builder(CONF).numDataNodes(3).build();
+    gCluster.waitActive();
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    if (gCluster != null) {
+      gCluster.shutdown();
+      gCluster = null;
+    }
+  }
+
+  static int countLease(MiniDFSCluster cluster) {
+    return TestDFSRename.countLease(cluster);
+  }
+
+  void list(DistributedFileSystem dfs, String name) throws IOException {
+    FileSystem.LOG.info("\n\n" + name);
+    for (FileStatus s : dfs.listStatus(asyncRenameDir)) {
+      FileSystem.LOG.info("" + s.getPath());
+    }
+  }
+
+  static void createFile(DistributedFileSystem dfs, Path f) throws IOException {
+    DataOutputStream a_out = dfs.create(f);
+    a_out.writeBytes("something");
+    a_out.close();
+  }
 
   /**
    * Check the blocks of dst file are cleaned after rename with overwrite
    * Restart NN to check the rename successfully
    */
-  @Test(timeout = 60000)
+  @Test
   public void testAsyncRenameWithOverwrite() throws Exception {
     final short replFactor = 2;
     final long blockSize = 512;
@@ -113,134 +169,38 @@ public class TestAsyncDFSRename {
     }
   }
 
-  @Test(timeout = 60000)
-  public void testCallGetReturnValueMultipleTimes() throws Exception {
+  @Test
+  public void testConcurrentAsyncRenameWithOverwrite() throws Exception {
     final short replFactor = 2;
     final long blockSize = 512;
     final Path renameDir = new Path(
-        "/test/testCallGetReturnValueMultipleTimes/");
-    final Configuration conf = new HdfsConfiguration();
-    conf.setInt(CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_KEY, 200);
-    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
-        .numDataNodes(2).build();
-    cluster.waitActive();
-    final DistributedFileSystem dfs = cluster.getFileSystem();
-    final AsyncDistributedFileSystem adfs = dfs.getAsyncDistributedFileSystem();
-    final int count = 100;
-    long fileLen = blockSize * 3;
-    final Map<Integer, Future<Void>> returnFutures = new HashMap<Integer, Future<Void>>();
-
-    assertTrue(dfs.mkdirs(renameDir));
-
-    try {
-      // concurrently invoking many rename
-      for (int i = 0; i < count; i++) {
-        Path src = new Path(renameDir, "src" + i);
-        Path dst = new Path(renameDir, "dst" + i);
-        DFSTestUtil.createFile(dfs, src, fileLen, replFactor, 1);
-        DFSTestUtil.createFile(dfs, dst, fileLen, replFactor, 1);
-        Future<Void> returnFuture = adfs.rename(src, dst, Rename.OVERWRITE);
-        returnFutures.put(i, returnFuture);
-      }
-
-      for (int i = 0; i < 5; i++) {
-        verifyCallGetReturnValueMultipleTimes(returnFutures, count, cluster,
-            renameDir, dfs);
-      }
-    } finally {
-      if (dfs != null) {
-        dfs.close();
-      }
-      if (cluster != null) {
-        cluster.shutdown();
-      }
-    }
-  }
-
-  private void verifyCallGetReturnValueMultipleTimes(
-      Map<Integer, Future<Void>> returnFutures, int count,
-      MiniDFSCluster cluster, Path renameDir, DistributedFileSystem dfs)
-      throws InterruptedException, ExecutionException, IOException {
-    // wait for completing the calls
-    for (int i = 0; i < count; i++) {
-      returnFutures.get(i).get();
-    }
-
-    // Restart NN and check the rename successfully
-    cluster.restartNameNodes();
-
-    // verify the src files no longer exist and the dst files do
-    for (int i = 0; i < count; i++) {
-      Path src = new Path(renameDir, "src" + i);
-      Path dst = new Path(renameDir, "dst" + i);
-      assertFalse(dfs.exists(src));
-      assertTrue(dfs.exists(dst));
-    }
-  }
-
-  @Test(timeout = 120000)
-  public void testAggressiveConcurrentAsyncRenameWithOverwrite()
-      throws Exception {
-    internalTestConcurrentAsyncRenameWithOverwrite(100,
-        "testAggressiveConcurrentAsyncRenameWithOverwrite");
-  }
-
-  @Test(timeout = 60000)
-  public void testConservativeConcurrentAsyncRenameWithOverwrite()
-      throws Exception {
-    internalTestConcurrentAsyncRenameWithOverwrite(10000,
-        "testConservativeConcurrentAsyncRenameWithOverwrite");
-  }
-
-  private void internalTestConcurrentAsyncRenameWithOverwrite(
-      final int asyncCallLimit, final String basePath) throws Exception {
-    final short replFactor = 2;
-    final long blockSize = 512;
-    final Path renameDir = new Path(String.format("/test/%s/", basePath));
+        "/test/concurrent_rename_with_overwrite_dir/");
     Configuration conf = new HdfsConfiguration();
-    conf.setInt(CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_KEY,
-        asyncCallLimit);
     MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2)
         .build();
     cluster.waitActive();
     DistributedFileSystem dfs = cluster.getFileSystem();
     AsyncDistributedFileSystem adfs = dfs.getAsyncDistributedFileSystem();
     int count = 1000;
-    long fileLen = blockSize * 3;
-    int start = 0, end = 0;
-    Map<Integer, Future<Void>> returnFutures = new HashMap<Integer, Future<Void>>();
-
-    assertTrue(dfs.mkdirs(renameDir));
 
     try {
+      long fileLen = blockSize * 3;
+      assertTrue(dfs.mkdirs(renameDir));
+
+      Map<Integer, Future<Void>> returnFutures = new HashMap<Integer, Future<Void>>();
+
       // concurrently invoking many rename
       for (int i = 0; i < count; i++) {
         Path src = new Path(renameDir, "src" + i);
         Path dst = new Path(renameDir, "dst" + i);
         DFSTestUtil.createFile(dfs, src, fileLen, replFactor, 1);
         DFSTestUtil.createFile(dfs, dst, fileLen, replFactor, 1);
-        for (;;) {
-          try {
-            LOG.info("rename #" + i);
-            Future<Void> returnFuture = adfs.rename(src, dst, Rename.OVERWRITE);
-            returnFutures.put(i, returnFuture);
-            break;
-          } catch (AsyncCallLimitExceededException e) {
-            /**
-             * reached the limit of async calls; fetch results of finished
-             * calls so that follow-on calls can proceed
-             */
-            LOG.error(e);
-            start = end;
-            end = i;
-            LOG.info(String.format("start=%d, end=%d, i=%d", start, end, i));
-            waitForReturnValues(returnFutures, start, end);
-          }
-        }
+        Future<Void> returnFuture = adfs.rename(src, dst, Rename.OVERWRITE);
+        returnFutures.put(i, returnFuture);
       }
 
       // wait for completing the calls
-      for (int i = start; i < count; i++) {
+      for (int i = 0; i < count; i++) {
         returnFutures.get(i).get();
       }
 
@@ -255,60 +215,26 @@ public class TestAsyncDFSRename {
         assertTrue(dfs.exists(dst));
       }
     } finally {
-      if (dfs != null) {
-        dfs.close();
-      }
+      dfs.delete(renameDir, true);
       if (cluster != null) {
         cluster.shutdown();
       }
     }
   }
 
-  private void waitForReturnValues(
-      final Map<Integer, Future<Void>> returnFutures, final int start,
-      final int end) throws InterruptedException, ExecutionException {
-    LOG.info(String.format("calling waitForReturnValues [%d, %d)", start, end));
-    for (int i = start; i < end; i++) {
-      LOG.info("calling Future#get #" + i);
-      returnFutures.get(i).get();
-    }
-  }
-
-  @Test(timeout = 60000)
+  @Test
   public void testAsyncRenameWithException() throws Exception {
-    Configuration conf = new HdfsConfiguration();
-    String group1 = "group1";
-    String group2 = "group2";
-    String user1 = "user1";
-    UserGroupInformation ugi1;
-
-    // explicitly turn on permission checking
-    conf.setBoolean(DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, true);
-
-    // create fake mapping for the groups
-    Map<String, String[]> u2g_map = new HashMap<String, String[]>(1);
-    u2g_map.put(user1, new String[] { group1, group2 });
-    DFSTestUtil.updateConfWithFakeGroupMapping(conf, u2g_map);
-
-    // Initialize the test user
-    ugi1 = UserGroupInformation.createUserForTesting(user1, new String[] {
-        group1, group2 });
-
-    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
-        .numDataNodes(3).build();
-    cluster.waitActive();
-
-    FileSystem rootFs = FileSystem.get(conf);
+    FileSystem rootFs = FileSystem.get(CONF);
     final Path renameDir = new Path("/test/async_rename_exception/");
     final Path src = new Path(renameDir, "src");
     final Path dst = new Path(renameDir, "dst");
     rootFs.mkdirs(src);
 
-    AsyncDistributedFileSystem adfs = ugi1
+    AsyncDistributedFileSystem adfs = USER1
         .doAs(new PrivilegedExceptionAction<AsyncDistributedFileSystem>() {
           @Override
           public AsyncDistributedFileSystem run() throws Exception {
-            return cluster.getFileSystem().getAsyncDistributedFileSystem();
+            return gCluster.getFileSystem().getAsyncDistributedFileSystem();
           }
         });
 
@@ -316,24 +242,16 @@ public class TestAsyncDFSRename {
       Future<Void> returnFuture = adfs.rename(src, dst, Rename.OVERWRITE);
       returnFuture.get();
     } catch (ExecutionException e) {
-      checkPermissionDenied(e, src, user1);
-    } finally {
-      if (rootFs != null) {
-        rootFs.close();
-      }
-      if (cluster != null) {
-        cluster.shutdown();
-      }
+      checkPermissionDenied(e, src);
     }
   }
 
-  private void checkPermissionDenied(final Exception e, final Path dir,
-      final String user) {
+  private void checkPermissionDenied(final Exception e, final Path dir) {
     assertTrue(e.getCause() instanceof ExecutionException);
     assertTrue("Permission denied messages must carry AccessControlException",
         e.getMessage().contains("AccessControlException"));
     assertTrue("Permission denied messages must carry the username", e
-        .getMessage().contains(user));
+        .getMessage().contains(USER1_NAME));
     assertTrue("Permission denied messages must carry the path parent", e
         .getMessage().contains(dir.getParent().toUri().getPath()));
   }




[8/8] hadoop git commit: Revert "HDFS-10224. Implement asynchronous rename for DistributedFileSystem. Contributed by Xiaobing Zhou"

Posted by wa...@apache.org.
Revert "HDFS-10224. Implement asynchronous rename for DistributedFileSystem.  Contributed by Xiaobing Zhou"

This reverts commit d316f44059c4417ed30064470a590991dd160857.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/c11c2ee6
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/c11c2ee6
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/c11c2ee6

Branch: refs/heads/branch-2
Commit: c11c2ee649a186d211cea0b85cb33dab0b3f96db
Parents: 7d4c16b
Author: Andrew Wang <wa...@apache.org>
Authored: Fri Jun 3 18:11:12 2016 -0700
Committer: Andrew Wang <wa...@apache.org>
Committed: Fri Jun 3 18:11:12 2016 -0700

----------------------------------------------------------------------
 .../java/org/apache/hadoop/fs/FileSystem.java   |   1 +
 .../main/java/org/apache/hadoop/ipc/Client.java |  11 +-
 .../apache/hadoop/ipc/ProtobufRpcEngine.java    |  34 +--
 .../org/apache/hadoop/ipc/TestAsyncIPC.java     |   2 +-
 .../hadoop/hdfs/AsyncDistributedFileSystem.java | 110 --------
 .../hadoop/hdfs/DistributedFileSystem.java      |  22 +-
 .../ClientNamenodeProtocolTranslatorPB.java     |  41 +--
 .../apache/hadoop/hdfs/TestAsyncDFSRename.java  | 258 -------------------
 8 files changed, 18 insertions(+), 461 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/c11c2ee6/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
index 740072d..ab3eac0 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
@@ -1252,6 +1252,7 @@ public abstract class FileSystem extends Configured implements Closeable {
   /**
    * Renames Path src to Path dst
    * <ul>
+   * <li>
    * <li>Fails if src is a file and dst is a directory.
    * <li>Fails if src is a directory and dst is a file.
    * <li>Fails if the parent of dst does not exist or is a file.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c11c2ee6/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
index 4ff1ff0..0ec7acc 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
@@ -119,8 +119,7 @@ public class Client implements AutoCloseable {
 
   private static final ThreadLocal<Integer> callId = new ThreadLocal<Integer>();
   private static final ThreadLocal<Integer> retryCount = new ThreadLocal<Integer>();
-  private static final ThreadLocal<Future<?>>
-      RETURN_RPC_RESPONSE = new ThreadLocal<>();
+  private static final ThreadLocal<Future<?>> returnValue = new ThreadLocal<>();
   private static final ThreadLocal<Boolean> asynchronousMode =
       new ThreadLocal<Boolean>() {
         @Override
@@ -131,8 +130,8 @@ public class Client implements AutoCloseable {
 
   @SuppressWarnings("unchecked")
   @Unstable
-  public static <T> Future<T> getReturnRpcResponse() {
-    return (Future<T>) RETURN_RPC_RESPONSE.get();
+  public static <T> Future<T> getReturnValue() {
+    return (Future<T>) returnValue.get();
   }
 
   /** Set call id and retry count for the next call. */
@@ -1399,7 +1398,7 @@ public class Client implements AutoCloseable {
         }
       };
 
-      RETURN_RPC_RESPONSE.set(returnFuture);
+      returnValue.set(returnFuture);
       return null;
     } else {
       return getRpcResponse(call, connection);
@@ -1413,7 +1412,7 @@ public class Client implements AutoCloseable {
    *          synchronous mode.
    */
   @Unstable
-  public static boolean isAsynchronousMode() {
+  static boolean isAsynchronousMode() {
     return asynchronousMode.get();
   }
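
The renamed ThreadLocal above implements a hand-off: in asynchronous mode the
call returns null through the normal path and parks its Future in a
ThreadLocal for the caller to collect. A rough standalone sketch with
illustrative names, where Java 8's CompletableFuture stands in for the real
RPC response future:

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.Future;

    class ThreadLocalHandoff {
      private static final ThreadLocal<Future<?>> RETURN_VALUE = new ThreadLocal<>();

      @SuppressWarnings("unchecked")
      static <T> Future<T> getReturnValue() {
        return (Future<T>) RETURN_VALUE.get();
      }

      static void call(long param) {
        CompletableFuture<Long> f = new CompletableFuture<>();
        f.complete(param);     // placeholder for completing from the response thread
        RETURN_VALUE.set(f);   // the caller picks this up via getReturnValue()
      }
    }

A caller in asynchronous mode then invokes the RPC, immediately collects the
parked future, and defers the blocking get() until the result is needed.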
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c11c2ee6/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
index 350e041..88e2e2e 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
@@ -26,9 +26,7 @@ import java.lang.reflect.Method;
 import java.lang.reflect.Proxy;
 import java.net.InetSocketAddress;
 import java.util.Map;
-import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import javax.net.SocketFactory;
@@ -37,7 +35,6 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.DataOutputOutputStream;
 import org.apache.hadoop.io.Writable;
@@ -70,9 +67,7 @@ import com.google.protobuf.TextFormat;
 @InterfaceStability.Evolving
 public class ProtobufRpcEngine implements RpcEngine {
   public static final Log LOG = LogFactory.getLog(ProtobufRpcEngine.class);
-  private static final ThreadLocal<Callable<?>>
-      RETURN_MESSAGE_CALLBACK = new ThreadLocal<>();
-
+  
   static { // Register the rpcRequest deserializer for WritableRpcEngine 
     org.apache.hadoop.ipc.Server.registerProtocolEngine(
         RPC.RpcKind.RPC_PROTOCOL_BUFFER, RpcRequestWrapper.class,
@@ -81,12 +76,6 @@ public class ProtobufRpcEngine implements RpcEngine {
 
   private static final ClientCache CLIENTS = new ClientCache();
 
-  @SuppressWarnings("unchecked")
-  @Unstable
-  public static <T> Callable<T> getReturnMessageCallback() {
-    return (Callable<T>) RETURN_MESSAGE_CALLBACK.get();
-  }
-
   public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
       InetSocketAddress addr, UserGroupInformation ticket, Configuration conf,
       SocketFactory factory, int rpcTimeout) throws IOException {
@@ -200,7 +189,7 @@ public class ProtobufRpcEngine implements RpcEngine {
      * the server.
      */
     @Override
-    public Object invoke(Object proxy, final Method method, Object[] args)
+    public Object invoke(Object proxy, Method method, Object[] args)
         throws ServiceException {
       long startTime = 0;
       if (LOG.isDebugEnabled()) {
@@ -262,23 +251,6 @@ public class ProtobufRpcEngine implements RpcEngine {
         LOG.debug("Call: " + method.getName() + " took " + callTime + "ms");
       }
       
-      if (Client.isAsynchronousMode()) {
-        final Future<RpcResponseWrapper> frrw = Client.getReturnRpcResponse();
-        Callable<Message> callback = new Callable<Message>() {
-          @Override
-          public Message call() throws Exception {
-            return getReturnMessage(method, frrw.get());
-          }
-        };
-        RETURN_MESSAGE_CALLBACK.set(callback);
-        return null;
-      } else {
-        return getReturnMessage(method, val);
-      }
-    }
-
-    private Message getReturnMessage(final Method method,
-        final RpcResponseWrapper rrw) throws ServiceException {
       Message prototype = null;
       try {
         prototype = getReturnProtoType(method);
@@ -288,7 +260,7 @@ public class ProtobufRpcEngine implements RpcEngine {
       Message returnMessage;
       try {
         returnMessage = prototype.newBuilderForType()
-            .mergeFrom(rrw.theResponseRead).build();
+            .mergeFrom(val.theResponseRead).build();
 
         if (LOG.isTraceEnabled()) {
           LOG.trace(Thread.currentThread().getId() + ": Response <- " +
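
The block removed above layers one more deferral on top of the Client future:
the engine wraps the raw response Future in a Callable that performs the
protobuf deserialization only when invoked. The shape of that wrapping,
sketched with plain types (decodeLater is an illustrative name):

    import java.nio.charset.StandardCharsets;
    import java.util.concurrent.Callable;
    import java.util.concurrent.Future;

    class ResponseChaining {
      static Callable<String> decodeLater(final Future<byte[]> raw) {
        return new Callable<String>() {
          @Override
          public String call() throws Exception {
            // deserialization happens only when the callable is invoked
            return new String(raw.get(), StandardCharsets.UTF_8);
          }
        };
      }
    }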

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c11c2ee6/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java
index 6cf75c7..de4395e 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java
@@ -84,7 +84,7 @@ public class TestAsyncIPC {
         try {
           final long param = TestIPC.RANDOM.nextLong();
           TestIPC.call(client, param, server, conf);
-          Future<LongWritable> returnFuture = Client.getReturnRpcResponse();
+          Future<LongWritable> returnFuture = Client.getReturnValue();
           returnFutures.put(i, returnFuture);
           expectedValues.put(i, param);
         } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c11c2ee6/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java
deleted file mode 100644
index 37899aa..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hdfs;
-
-import java.io.IOException;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-
-import org.apache.hadoop.classification.InterfaceStability.Unstable;
-import org.apache.hadoop.fs.Options;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB;
-import org.apache.hadoop.ipc.Client;
-
-import com.google.common.util.concurrent.AbstractFuture;
-
-/****************************************************************
- * Implementation of the asynchronous distributed file system.
- * An instance of this class is the way end-user code interacts
- * with a Hadoop DistributedFileSystem in an asynchronous manner.
- *
- *****************************************************************/
-@Unstable
-public class AsyncDistributedFileSystem {
-
-  private final DistributedFileSystem dfs;
-
-  AsyncDistributedFileSystem(final DistributedFileSystem dfs) {
-    this.dfs = dfs;
-  }
-
-  static <T> Future<T> getReturnValue() {
-    final Callable<T> returnValueCallback = ClientNamenodeProtocolTranslatorPB
-        .getReturnValueCallback();
-    Future<T> returnFuture = new AbstractFuture<T>() {
-      public T get() throws InterruptedException, ExecutionException {
-        try {
-          set(returnValueCallback.call());
-        } catch (Exception e) {
-          setException(e);
-        }
-        return super.get();
-      }
-    };
-    return returnFuture;
-  }
-
-  /**
-   * Renames Path src to Path dst
-   * <ul>
-   * <li>Fails if src is a file and dst is a directory.
-   * <li>Fails if src is a directory and dst is a file.
-   * <li>Fails if the parent of dst does not exist or is a file.
-   * </ul>
-   * <p>
-   * If OVERWRITE option is not passed as an argument, rename fails if the dst
-   * already exists.
-   * <p>
-   * If OVERWRITE option is passed as an argument, rename overwrites the dst if
-   * it is a file or an empty directory. Rename fails if dst is a non-empty
-   * directory.
-   * <p>
-   * Note that atomicity of rename is dependent on the file system
-   * implementation. Please refer to the file system documentation for details.
-   * This default implementation is non atomic.
-   *
-   * @param src
-   *          path to be renamed
-   * @param dst
-   *          new path after rename
-   * @throws IOException
-   *           on failure
-   * @return an instance of Future, #get of which is invoked to wait for
-   *         asynchronous call being finished.
-   */
-  public Future<Void> rename(Path src, Path dst,
-      final Options.Rename... options) throws IOException {
-    dfs.getFsStatistics().incrementWriteOps(1);
-
-    final Path absSrc = dfs.fixRelativePart(src);
-    final Path absDst = dfs.fixRelativePart(dst);
-
-    final boolean isAsync = Client.isAsynchronousMode();
-    Client.setAsynchronousMode(true);
-    try {
-      dfs.getClient().rename(dfs.getPathName(absSrc), dfs.getPathName(absDst),
-          options);
-      return getReturnValue();
-    } finally {
-      Client.setAsynchronousMode(isAsync);
-    }
-  }
-}
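
Before this revert, end-user code drove the API above roughly as follows; a
sketch against the pre-revert branch-2 API shown in this diff (the paths are
placeholders):

    import java.util.concurrent.Future;

    import org.apache.hadoop.fs.Options;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hdfs.AsyncDistributedFileSystem;
    import org.apache.hadoop.hdfs.DistributedFileSystem;

    class AsyncRenameUsage {
      static void renameAsync(DistributedFileSystem dfs) throws Exception {
        AsyncDistributedFileSystem adfs = dfs.getAsyncDistributedFileSystem();
        Future<Void> result =
            adfs.rename(new Path("/src"), new Path("/dst"), Options.Rename.OVERWRITE);
        // the rename was submitted without blocking; block only for the outcome
        result.get();
      }
    }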

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c11c2ee6/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
index 7fc767f..77eb2ca 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
@@ -31,7 +31,6 @@ import java.util.Map;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.BlockStorageLocation;
@@ -206,7 +205,7 @@ public class DistributedFileSystem extends FileSystem {
    * @return path component of {file}
    * @throws IllegalArgumentException if URI does not belong to this DFS
    */
-  String getPathName(Path file) {
+  private String getPathName(Path file) {
     checkPath(file);
     String result = file.toUri().getPath();
     if (!DFSUtilClient.isValidName(result)) {
@@ -2506,23 +2505,4 @@ public class DistributedFileSystem extends FileSystem {
     }
     return ret;
   }
-
-  private final AsyncDistributedFileSystem adfs =
-      new AsyncDistributedFileSystem(this);
-
-  /** @return an {@link AsyncDistributedFileSystem} object. */
-  @Unstable
-  public AsyncDistributedFileSystem getAsyncDistributedFileSystem() {
-    return adfs;
-  }
-
-  @Override
-  protected Path fixRelativePart(Path p) {
-    return super.fixRelativePart(p);
-  }
-
-  Statistics getFsStatistics() {
-    return statistics;
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c11c2ee6/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
index 9a52d93..f4e538c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
@@ -24,14 +24,11 @@ import java.util.EnumSet;
 import java.util.List;
 
 import com.google.common.collect.Lists;
-import java.util.concurrent.Callable;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.crypto.CryptoProtocolVersion;
 import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedEntries;
-import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedListEntries;
 import org.apache.hadoop.fs.CacheFlag;
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.CreateFlag;
@@ -138,6 +135,7 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.Recove
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RefreshNodesRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RemoveCacheDirectiveRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RemoveCachePoolRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UnsetStoragePolicyRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.Rename2RequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RenameRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RenameSnapshotRequestProto;
@@ -155,15 +153,13 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetPer
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetQuotaRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetReplicationRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetSafeModeRequestProto;
-import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetStoragePolicyRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetTimesRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.TruncateRequestProto;
-import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UnsetStoragePolicyRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpdateBlockForPipelineRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpdatePipelineRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetStoragePolicyRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos;
 import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.CreateEncryptionZoneRequestProto;
-import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.EncryptionZoneProto;
 import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.GetEZForPathRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.ListEncryptionZonesRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.GetXAttrsRequestProto;
@@ -175,9 +171,7 @@ import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifie
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
 import org.apache.hadoop.io.EnumSetWritable;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.ipc.Client;
 import org.apache.hadoop.ipc.ProtobufHelper;
-import org.apache.hadoop.ipc.ProtobufRpcEngine;
 import org.apache.hadoop.ipc.ProtocolMetaInterface;
 import org.apache.hadoop.ipc.ProtocolTranslator;
 import org.apache.hadoop.ipc.RPC;
@@ -189,9 +183,12 @@ import org.apache.hadoop.security.proto.SecurityProtos.RenewDelegationTokenReque
 import org.apache.hadoop.security.token.Token;
 
 import com.google.protobuf.ByteString;
-import com.google.protobuf.Message;
 import com.google.protobuf.ServiceException;
 
+import static org.apache.hadoop.fs.BatchedRemoteIterator.BatchedListEntries;
+import static org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos
+    .EncryptionZoneProto;
+
 /**
  * This class forwards NN's ClientProtocol calls as RPC calls to the NN server
  * while translating from the parameter types used in ClientProtocol to the
@@ -202,8 +199,6 @@ import com.google.protobuf.ServiceException;
 public class ClientNamenodeProtocolTranslatorPB implements
     ProtocolMetaInterface, ClientProtocol, Closeable, ProtocolTranslator {
   final private ClientNamenodeProtocolPB rpcProxy;
-  private static final ThreadLocal<Callable<?>>
-      RETURN_VALUE_CALLBACK = new ThreadLocal<>();
 
   static final GetServerDefaultsRequestProto VOID_GET_SERVER_DEFAULT_REQUEST =
       GetServerDefaultsRequestProto.newBuilder().build();
@@ -236,12 +231,6 @@ public class ClientNamenodeProtocolTranslatorPB implements
     rpcProxy = proxy;
   }
 
-  @SuppressWarnings("unchecked")
-  @Unstable
-  public static <T> Callable<T> getReturnValueCallback() {
-    return (Callable<T>) RETURN_VALUE_CALLBACK.get();
-  }
-
   @Override
   public void close() {
     RPC.stopProxy(rpcProxy);
@@ -477,7 +466,6 @@ public class ClientNamenodeProtocolTranslatorPB implements
     RenameRequestProto req = RenameRequestProto.newBuilder()
         .setSrc(src)
         .setDst(dst).build();
-
     try {
       return rpcProxy.rename(null, req).getResult();
     } catch (ServiceException e) {
@@ -502,22 +490,7 @@ public class ClientNamenodeProtocolTranslatorPB implements
         setDst(dst).setOverwriteDest(overwrite).
         build();
     try {
-      if (Client.isAsynchronousMode()) {
-        rpcProxy.rename2(null, req);
-
-        final Callable<Message> returnMessageCallback = ProtobufRpcEngine
-            .getReturnMessageCallback();
-        Callable<Void> callBack = new Callable<Void>() {
-          @Override
-          public Void call() throws Exception {
-            returnMessageCallback.call();
-            return null;
-          }
-        };
-        RETURN_VALUE_CALLBACK.set(callBack);
-      } else {
-        rpcProxy.rename2(null, req);
-      }
+      rpcProxy.rename2(null, req);
     } catch (ServiceException e) {
       throw ProtobufHelper.getRemoteException(e);
     }
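
The hunk above strips out the thread-local hand-off that let an async caller collect the rename2 return value after the proxy call returned. A minimal sketch of that pattern, using simplified, hypothetical names rather than the exact Hadoop types:

    import java.util.concurrent.Callable;

    public class CallbackHandoff {
      // Parked per calling thread so the async caller can pick the result up later.
      private static final ThreadLocal<Callable<?>> RETURN_VALUE_CALLBACK =
          new ThreadLocal<>();

      // Callee side: fire the RPC without blocking, then leave a callback behind.
      static void invokeAsync(Runnable rpc) {
        rpc.run(); // in async mode this only enqueues the request
        RETURN_VALUE_CALLBACK.set(new Callable<Void>() {
          @Override
          public Void call() throws Exception {
            // the real code unwraps the RPC response message here
            return null;
          }
        });
      }

      // Caller side: retrieve whatever callback the last call on this thread parked.
      @SuppressWarnings("unchecked")
      static <T> Callable<T> getReturnValueCallback() {
        return (Callable<T>) RETURN_VALUE_CALLBACK.get();
      }
    }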

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c11c2ee6/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java
deleted file mode 100644
index 9322e1a..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java
+++ /dev/null
@@ -1,258 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs;
-
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.security.PrivilegedExceptionAction;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.Options.Rename;
-import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
-import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
-import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-public class TestAsyncDFSRename {
-  final Path asyncRenameDir = new Path("/test/async_rename/");
-  public static final Log LOG = LogFactory.getLog(TestAsyncDFSRename.class);
-  final private static Configuration CONF = new HdfsConfiguration();
-
-  final private static String GROUP1_NAME = "group1";
-  final private static String GROUP2_NAME = "group2";
-  final private static String USER1_NAME = "user1";
-  private static final UserGroupInformation USER1;
-
-  private MiniDFSCluster gCluster;
-
-  static {
-    // explicitly turn on permission checking
-    CONF.setBoolean(DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, true);
-
-    // create fake mapping for the groups
-    Map<String, String[]> u2g_map = new HashMap<String, String[]>(1);
-    u2g_map.put(USER1_NAME, new String[] { GROUP1_NAME, GROUP2_NAME });
-    DFSTestUtil.updateConfWithFakeGroupMapping(CONF, u2g_map);
-
-    // Initiate all four users
-    USER1 = UserGroupInformation.createUserForTesting(USER1_NAME, new String[] {
-        GROUP1_NAME, GROUP2_NAME });
-  }
-
-  @Before
-  public void setUp() throws IOException {
-    gCluster = new MiniDFSCluster.Builder(CONF).numDataNodes(3).build();
-    gCluster.waitActive();
-  }
-
-  @After
-  public void tearDown() throws IOException {
-    if (gCluster != null) {
-      gCluster.shutdown();
-      gCluster = null;
-    }
-  }
-
-  static int countLease(MiniDFSCluster cluster) {
-    return TestDFSRename.countLease(cluster);
-  }
-
-  void list(DistributedFileSystem dfs, String name) throws IOException {
-    FileSystem.LOG.info("\n\n" + name);
-    for (FileStatus s : dfs.listStatus(asyncRenameDir)) {
-      FileSystem.LOG.info("" + s.getPath());
-    }
-  }
-
-  static void createFile(DistributedFileSystem dfs, Path f) throws IOException {
-    DataOutputStream a_out = dfs.create(f);
-    a_out.writeBytes("something");
-    a_out.close();
-  }
-
-  /**
-   * Check the blocks of dst file are cleaned after rename with overwrite
-   * Restart NN to check the rename successfully
-   */
-  @Test
-  public void testAsyncRenameWithOverwrite() throws Exception {
-    final short replFactor = 2;
-    final long blockSize = 512;
-    Configuration conf = new Configuration();
-    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(
-        replFactor).build();
-    cluster.waitActive();
-    DistributedFileSystem dfs = cluster.getFileSystem();
-    AsyncDistributedFileSystem adfs = dfs.getAsyncDistributedFileSystem();
-
-    try {
-
-      long fileLen = blockSize * 3;
-      String src = "/foo/src";
-      String dst = "/foo/dst";
-      String src2 = "/foo/src2";
-      String dst2 = "/foo/dst2";
-      Path srcPath = new Path(src);
-      Path dstPath = new Path(dst);
-      Path srcPath2 = new Path(src2);
-      Path dstPath2 = new Path(dst2);
-
-      DFSTestUtil.createFile(dfs, srcPath, fileLen, replFactor, 1);
-      DFSTestUtil.createFile(dfs, dstPath, fileLen, replFactor, 1);
-      DFSTestUtil.createFile(dfs, srcPath2, fileLen, replFactor, 1);
-      DFSTestUtil.createFile(dfs, dstPath2, fileLen, replFactor, 1);
-
-      LocatedBlocks lbs = NameNodeAdapter.getBlockLocations(
-          cluster.getNameNode(), dst, 0, fileLen);
-      LocatedBlocks lbs2 = NameNodeAdapter.getBlockLocations(
-          cluster.getNameNode(), dst2, 0, fileLen);
-      BlockManager bm = NameNodeAdapter.getNamesystem(cluster.getNameNode())
-          .getBlockManager();
-      assertTrue(bm.getStoredBlock(lbs.getLocatedBlocks().get(0).getBlock()
-          .getLocalBlock()) != null);
-      assertTrue(bm.getStoredBlock(lbs2.getLocatedBlocks().get(0).getBlock()
-          .getLocalBlock()) != null);
-
-      Future<Void> retVal1 = adfs.rename(srcPath, dstPath, Rename.OVERWRITE);
-      Future<Void> retVal2 = adfs.rename(srcPath2, dstPath2, Rename.OVERWRITE);
-      retVal1.get();
-      retVal2.get();
-
-      assertTrue(bm.getStoredBlock(lbs.getLocatedBlocks().get(0).getBlock()
-          .getLocalBlock()) == null);
-      assertTrue(bm.getStoredBlock(lbs2.getLocatedBlocks().get(0).getBlock()
-          .getLocalBlock()) == null);
-
-      // Restart NN and check the rename successfully
-      cluster.restartNameNodes();
-      assertFalse(dfs.exists(srcPath));
-      assertTrue(dfs.exists(dstPath));
-      assertFalse(dfs.exists(srcPath2));
-      assertTrue(dfs.exists(dstPath2));
-    } finally {
-      if (dfs != null) {
-        dfs.close();
-      }
-      if (cluster != null) {
-        cluster.shutdown();
-      }
-    }
-  }
-
-  @Test
-  public void testConcurrentAsyncRenameWithOverwrite() throws Exception {
-    final short replFactor = 2;
-    final long blockSize = 512;
-    final Path renameDir = new Path(
-        "/test/concurrent_reanme_with_overwrite_dir/");
-    Configuration conf = new HdfsConfiguration();
-    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2)
-        .build();
-    cluster.waitActive();
-    DistributedFileSystem dfs = cluster.getFileSystem();
-    AsyncDistributedFileSystem adfs = dfs.getAsyncDistributedFileSystem();
-    int count = 1000;
-
-    try {
-      long fileLen = blockSize * 3;
-      assertTrue(dfs.mkdirs(renameDir));
-
-      Map<Integer, Future<Void>> returnFutures = new HashMap<Integer, Future<Void>>();
-
-      // concurrently invoking many rename
-      for (int i = 0; i < count; i++) {
-        Path src = new Path(renameDir, "src" + i);
-        Path dst = new Path(renameDir, "dst" + i);
-        DFSTestUtil.createFile(dfs, src, fileLen, replFactor, 1);
-        DFSTestUtil.createFile(dfs, dst, fileLen, replFactor, 1);
-        Future<Void> returnFuture = adfs.rename(src, dst, Rename.OVERWRITE);
-        returnFutures.put(i, returnFuture);
-      }
-
-      // wait for completing the calls
-      for (int i = 0; i < count; i++) {
-        returnFutures.get(i).get();
-      }
-
-      // Restart NN and check the rename successfully
-      cluster.restartNameNodes();
-
-      // very the src dir should not exist, dst should
-      for (int i = 0; i < count; i++) {
-        Path src = new Path(renameDir, "src" + i);
-        Path dst = new Path(renameDir, "dst" + i);
-        assertFalse(dfs.exists(src));
-        assertTrue(dfs.exists(dst));
-      }
-    } finally {
-      dfs.delete(renameDir, true);
-      if (cluster != null) {
-        cluster.shutdown();
-      }
-    }
-  }
-
-  @Test
-  public void testAsyncRenameWithException() throws Exception {
-    FileSystem rootFs = FileSystem.get(CONF);
-    final Path renameDir = new Path("/test/async_rename_exception/");
-    final Path src = new Path(renameDir, "src");
-    final Path dst = new Path(renameDir, "dst");
-    rootFs.mkdirs(src);
-
-    AsyncDistributedFileSystem adfs = USER1
-        .doAs(new PrivilegedExceptionAction<AsyncDistributedFileSystem>() {
-          @Override
-          public AsyncDistributedFileSystem run() throws Exception {
-            return gCluster.getFileSystem().getAsyncDistributedFileSystem();
-          }
-        });
-
-    try {
-      Future<Void> returnFuture = adfs.rename(src, dst, Rename.OVERWRITE);
-      returnFuture.get();
-    } catch (ExecutionException e) {
-      checkPermissionDenied(e, src);
-    }
-  }
-
-  private void checkPermissionDenied(final Exception e, final Path dir) {
-    assertTrue(e.getCause() instanceof ExecutionException);
-    assertTrue("Permission denied messages must carry AccessControlException",
-        e.getMessage().contains("AccessControlException"));
-    assertTrue("Permission denied messages must carry the username", e
-        .getMessage().contains(USER1_NAME));
-    assertTrue("Permission denied messages must carry the path parent", e
-        .getMessage().contains(dir.getParent().toUri().getPath()));
-  }
-}
\ No newline at end of file
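
The deleted test above also documents the basic calling convention of AsyncDistributedFileSystem: each call returns immediately with a Future, and get() surfaces the outcome (or the wrapped failure). Reduced to a sketch:

    import java.util.concurrent.Future;

    import org.apache.hadoop.fs.Options.Rename;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hdfs.AsyncDistributedFileSystem;

    public class AsyncRenameUsage {
      // Submit the rename without blocking, then join on the Future.
      static void renameAndWait(AsyncDistributedFileSystem adfs,
          Path src, Path dst) throws Exception {
        Future<Void> result = adfs.rename(src, dst, Rename.OVERWRITE);
        // ... other work can proceed here ...
        result.get(); // throws ExecutionException if the rename failed
      }
    }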


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


[2/8] hadoop git commit: Revert "HDFS-10431 Refactor and speedup TestAsyncDFSRename. Contributed by Xiaobing Zhou"

Posted by wa...@apache.org.
Revert "HDFS-10431 Refactor and speedup TestAsyncDFSRename.  Contributed by Xiaobing Zhou"

This reverts commit 72773f8ea896b125882025666084802873c59963.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/8e2245d0
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/8e2245d0
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/8e2245d0

Branch: refs/heads/branch-2
Commit: 8e2245d0a4d1ee7630c65162630b9627a5b585ca
Parents: 6f8f40b
Author: Andrew Wang <wa...@apache.org>
Authored: Fri Jun 3 18:10:48 2016 -0700
Committer: Andrew Wang <wa...@apache.org>
Committed: Fri Jun 3 18:10:48 2016 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hdfs/TestAsyncDFS.java    | 233 +-------
 .../apache/hadoop/hdfs/TestAsyncDFSRename.java  | 563 +++++++++++++++----
 2 files changed, 483 insertions(+), 313 deletions(-)
----------------------------------------------------------------------
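
The restored tests below lean on one idiom throughout: submit async calls until the client-side limit trips, then drain finished futures so follow-on calls can be admitted. A distilled, self-contained version of that loop (helper structure simplified from the test code):

    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.Future;

    import org.apache.hadoop.fs.Options.Rename;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hdfs.AsyncDistributedFileSystem;
    import org.apache.hadoop.ipc.AsyncCallLimitExceededException;

    public class AsyncRenameBackpressure {
      static Map<Integer, Future<Void>> renameAll(AsyncDistributedFileSystem adfs,
          Path[] srcs, Path[] dsts) throws Exception {
        Map<Integer, Future<Void>> futures = new HashMap<Integer, Future<Void>>();
        int start = 0, end = 0;
        for (int i = 0; i < srcs.length; i++) {
          for (;;) {
            try {
              futures.put(i, adfs.rename(srcs[i], dsts[i], Rename.OVERWRITE));
              break; // call admitted
            } catch (AsyncCallLimitExceededException e) {
              // hit the configured limit (IPC_CLIENT_ASYNC_CALLS_MAX_KEY):
              // drain finished futures in [start, end) so follow-on calls can go
              start = end;
              end = i;
              for (int j = start; j < end; j++) {
                futures.get(j).get();
              }
            }
          }
        }
        return futures; // futures from index 'start' onward are not yet drained
      }
    }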


http://git-wip-us.apache.org/repos/asf/hadoop/blob/8e2245d0/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFS.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFS.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFS.java
index ddcf492..67262dd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFS.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFS.java
@@ -29,16 +29,13 @@ import static org.apache.hadoop.fs.permission.FsAction.READ_EXECUTE;
 import static org.apache.hadoop.hdfs.server.namenode.AclTestHelpers.aclEntry;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.fail;
 
 import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
-import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Random;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 
@@ -46,21 +43,15 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.Options.Rename;
 import org.apache.hadoop.fs.permission.AclEntry;
 import org.apache.hadoop.fs.permission.AclStatus;
-import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.hdfs.TestDFSPermission.PermissionGenerator;
 import org.apache.hadoop.hdfs.server.namenode.AclTestHelpers;
 import org.apache.hadoop.hdfs.server.namenode.FSAclBaseTest;
 import org.apache.hadoop.ipc.AsyncCallLimitExceededException;
-import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.util.Time;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -72,28 +63,21 @@ import com.google.common.collect.Lists;
  * */
 public class TestAsyncDFS {
   public static final Log LOG = LogFactory.getLog(TestAsyncDFS.class);
-  private final short replFactor = 1;
-  private final long blockSize = 512;
-  private long fileLen = blockSize * 3;
-  private final long seed = Time.now();
-  private final Random r = new Random(seed);
-  private final PermissionGenerator permGenerator = new PermissionGenerator(r);
-  private static final int NUM_TESTS = 50;
+  private static final int NUM_TESTS = 1000;
   private static final int NUM_NN_HANDLER = 10;
-  private static final int ASYNC_CALL_LIMIT = 1000;
+  private static final int ASYNC_CALL_LIMIT = 100;
 
   private Configuration conf;
   private MiniDFSCluster cluster;
   private FileSystem fs;
-  private AsyncDistributedFileSystem adfs;
 
   @Before
   public void setup() throws IOException {
     conf = new HdfsConfiguration();
     // explicitly turn on acl
     conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY, true);
-    // explicitly turn on permission checking
-    conf.setBoolean(DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, true);
+    // explicitly turn on ACL
+    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY, true);
     // set the limit of max async calls
     conf.setInt(CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_KEY,
         ASYNC_CALL_LIMIT);
@@ -102,7 +86,6 @@ public class TestAsyncDFS {
     cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
     cluster.waitActive();
     fs = FileSystem.get(conf);
-    adfs = cluster.getFileSystem().getAsyncDistributedFileSystem();
   }
 
   @After
@@ -147,9 +130,13 @@ public class TestAsyncDFS {
     final String basePath = "testBatchAsyncAcl";
     final Path parent = new Path(String.format("/test/%s/", basePath));
 
+    AsyncDistributedFileSystem adfs = cluster.getFileSystem()
+        .getAsyncDistributedFileSystem();
+
     // prepare test
-    final Path[] paths = new Path[NUM_TESTS];
-    for (int i = 0; i < NUM_TESTS; i++) {
+    int count = NUM_TESTS;
+    final Path[] paths = new Path[count];
+    for (int i = 0; i < count; i++) {
       paths[i] = new Path(parent, "acl" + i);
       FileSystem.mkdirs(fs, paths[i],
           FsPermission.createImmutable((short) 0750));
@@ -166,7 +153,7 @@ public class TestAsyncDFS {
     int start = 0, end = 0;
     try {
       // test setAcl
-      for (int i = 0; i < NUM_TESTS; i++) {
+      for (int i = 0; i < count; i++) {
         for (;;) {
           try {
             Future<Void> retFuture = adfs.setAcl(paths[i], aclSpec);
@@ -179,12 +166,12 @@ public class TestAsyncDFS {
           }
         }
       }
-      waitForAclReturnValues(setAclRetFutures, end, NUM_TESTS);
+      waitForAclReturnValues(setAclRetFutures, end, count);
 
       // test getAclStatus
       start = 0;
       end = 0;
-      for (int i = 0; i < NUM_TESTS; i++) {
+      for (int i = 0; i < count; i++) {
         for (;;) {
           try {
             Future<AclStatus> retFuture = adfs.getAclStatus(paths[i]);
@@ -198,23 +185,13 @@ public class TestAsyncDFS {
           }
         }
       }
-      waitForAclReturnValues(getAclRetFutures, end, NUM_TESTS, paths,
+      waitForAclReturnValues(getAclRetFutures, end, count, paths,
           expectedAclSpec);
     } catch (Exception e) {
       throw e;
     }
   }
 
-  static void waitForReturnValues(final Map<Integer, Future<Void>> retFutures,
-      final int start, final int end)
-      throws InterruptedException, ExecutionException {
-    LOG.info(String.format("calling waitForReturnValues [%d, %d)", start, end));
-    for (int i = start; i < end; i++) {
-      LOG.info("calling Future#get #" + i);
-      retFutures.get(i).get();
-    }
-  }
-
   private void waitForAclReturnValues(
       final Map<Integer, Future<Void>> aclRetFutures, final int start,
       final int end) throws InterruptedException, ExecutionException {
@@ -289,12 +266,9 @@ public class TestAsyncDFS {
 
     final Path parent = new Path("/test/async_api_exception/");
     final Path aclDir = new Path(parent, "aclDir");
-    final Path src = new Path(parent, "src");
-    final Path dst = new Path(parent, "dst");
-    fs.mkdirs(aclDir, FsPermission.createImmutable((short) 0700));
-    fs.mkdirs(src);
+    fs.mkdirs(aclDir, FsPermission.createImmutable((short) 0770));
 
-    AsyncDistributedFileSystem adfs1 = ugi1
+    AsyncDistributedFileSystem adfs = ugi1
         .doAs(new PrivilegedExceptionAction<AsyncDistributedFileSystem>() {
           @Override
           public AsyncDistributedFileSystem run() throws Exception {
@@ -303,36 +277,9 @@ public class TestAsyncDFS {
         });
 
     Future<Void> retFuture;
-    // test rename
-    try {
-      retFuture = adfs1.rename(src, dst, Rename.OVERWRITE);
-      retFuture.get();
-    } catch (ExecutionException e) {
-      checkPermissionDenied(e, src, user1);
-      assertTrue("Permission denied messages must carry the path parent", e
-          .getMessage().contains(src.getParent().toUri().getPath()));
-    }
-
-    // test setPermission
-    FsPermission fsPerm = new FsPermission(permGenerator.next());
-    try {
-      retFuture = adfs1.setPermission(src, fsPerm);
-      retFuture.get();
-    } catch (ExecutionException e) {
-      checkPermissionDenied(e, src, user1);
-    }
-
-    // test setOwner
-    try {
-      retFuture = adfs1.setOwner(src, "user1", "group2");
-      retFuture.get();
-    } catch (ExecutionException e) {
-      checkPermissionDenied(e, src, user1);
-    }
-
     // test setAcl
     try {
-      retFuture = adfs1.setAcl(aclDir,
+      retFuture = adfs.setAcl(aclDir,
           Lists.newArrayList(aclEntry(ACCESS, USER, ALL)));
       retFuture.get();
       fail("setAcl should fail with permission denied");
@@ -342,7 +289,7 @@ public class TestAsyncDFS {
 
     // test getAclStatus
     try {
-      Future<AclStatus> aclRetFuture = adfs1.getAclStatus(aclDir);
+      Future<AclStatus> aclRetFuture = adfs.getAclStatus(aclDir);
       aclRetFuture.get();
       fail("getAclStatus should fail with permission denied");
     } catch (ExecutionException e) {
@@ -360,148 +307,4 @@ public class TestAsyncDFS {
     assertTrue("Permission denied messages must carry the name of the path",
         e.getMessage().contains(dir.getName()));
   }
-
-
-  @Test(timeout = 120000)
-  public void testConcurrentAsyncAPI() throws Exception {
-    String group1 = "group1";
-    String group2 = "group2";
-    String user1 = "user1";
-
-    // create fake mapping for the groups
-    Map<String, String[]> u2gMap = new HashMap<String, String[]>(1);
-    u2gMap.put(user1, new String[] {group1, group2});
-    DFSTestUtil.updateConfWithFakeGroupMapping(conf, u2gMap);
-
-    // prepare for test
-    final Path parent = new Path(
-        String.format("/test/%s/", "testConcurrentAsyncAPI"));
-    final Path[] srcs = new Path[NUM_TESTS];
-    final Path[] dsts = new Path[NUM_TESTS];
-    short[] permissions = new short[NUM_TESTS];
-    for (int i = 0; i < NUM_TESTS; i++) {
-      srcs[i] = new Path(parent, "src" + i);
-      dsts[i] = new Path(parent, "dst" + i);
-      DFSTestUtil.createFile(fs, srcs[i], fileLen, replFactor, 1);
-      DFSTestUtil.createFile(fs, dsts[i], fileLen, replFactor, 1);
-      assertTrue(fs.exists(srcs[i]));
-      assertTrue(fs.getFileStatus(srcs[i]).isFile());
-      assertTrue(fs.exists(dsts[i]));
-      assertTrue(fs.getFileStatus(dsts[i]).isFile());
-      permissions[i] = permGenerator.next();
-    }
-
-    Map<Integer, Future<Void>> renameRetFutures =
-        new HashMap<Integer, Future<Void>>();
-    Map<Integer, Future<Void>> permRetFutures =
-        new HashMap<Integer, Future<Void>>();
-    Map<Integer, Future<Void>> ownerRetFutures =
-        new HashMap<Integer, Future<Void>>();
-    int start = 0, end = 0;
-    // test rename
-    for (int i = 0; i < NUM_TESTS; i++) {
-      for (;;) {
-        try {
-          Future<Void> returnFuture = adfs.rename(srcs[i], dsts[i],
-              Rename.OVERWRITE);
-          renameRetFutures.put(i, returnFuture);
-          break;
-        } catch (AsyncCallLimitExceededException e) {
-          start = end;
-          end = i;
-          waitForReturnValues(renameRetFutures, start, end);
-        }
-      }
-    }
-
-    // wait for completing the calls
-    waitForAclReturnValues(renameRetFutures, end, NUM_TESTS);
-
-    // verify the src should not exist, dst should
-    for (int i = 0; i < NUM_TESTS; i++) {
-      assertFalse(fs.exists(srcs[i]));
-      assertTrue(fs.exists(dsts[i]));
-    }
-
-    // test permissions
-    for (int i = 0; i < NUM_TESTS; i++) {
-      for (;;) {
-        try {
-          Future<Void> retFuture = adfs.setPermission(dsts[i],
-              new FsPermission(permissions[i]));
-          permRetFutures.put(i, retFuture);
-          break;
-        } catch (AsyncCallLimitExceededException e) {
-          start = end;
-          end = i;
-          waitForReturnValues(permRetFutures, start, end);
-        }
-      }
-    }
-    // wait for completing the calls
-    waitForAclReturnValues(permRetFutures, end, NUM_TESTS);
-
-    // verify the permission
-    for (int i = 0; i < NUM_TESTS; i++) {
-      assertTrue(fs.exists(dsts[i]));
-      FsPermission fsPerm = new FsPermission(permissions[i]);
-      checkAccessPermissions(fs.getFileStatus(dsts[i]), fsPerm.getUserAction());
-    }
-
-    // test setOwner
-    start = 0;
-    end = 0;
-    for (int i = 0; i < NUM_TESTS; i++) {
-      for (;;) {
-        try {
-          Future<Void> retFuture = adfs.setOwner(dsts[i], "user1", "group2");
-          ownerRetFutures.put(i, retFuture);
-          break;
-        } catch (AsyncCallLimitExceededException e) {
-          start = end;
-          end = i;
-          waitForReturnValues(ownerRetFutures, start, end);
-        }
-      }
-    }
-    // wait for completing the calls
-    waitForAclReturnValues(ownerRetFutures, end, NUM_TESTS);
-
-    // verify the owner
-    for (int i = 0; i < NUM_TESTS; i++) {
-      assertTrue(fs.exists(dsts[i]));
-      assertTrue("user1".equals(fs.getFileStatus(dsts[i]).getOwner()));
-      assertTrue("group2".equals(fs.getFileStatus(dsts[i]).getGroup()));
-    }
-  }
-
-  static void checkAccessPermissions(FileStatus stat, FsAction mode)
-      throws IOException {
-    checkAccessPermissions(UserGroupInformation.getCurrentUser(), stat, mode);
-  }
-
-  static void checkAccessPermissions(final UserGroupInformation ugi,
-      FileStatus stat, FsAction mode) throws IOException {
-    FsPermission perm = stat.getPermission();
-    String user = ugi.getShortUserName();
-    List<String> groups = Arrays.asList(ugi.getGroupNames());
-
-    if (user.equals(stat.getOwner())) {
-      if (perm.getUserAction().implies(mode)) {
-        return;
-      }
-    } else if (groups.contains(stat.getGroup())) {
-      if (perm.getGroupAction().implies(mode)) {
-        return;
-      }
-    } else {
-      if (perm.getOtherAction().implies(mode)) {
-        return;
-      }
-    }
-    throw new AccessControlException(String.format(
-        "Permission denied: user=%s, path=\"%s\":%s:%s:%s%s", user, stat
-            .getPath(), stat.getOwner(), stat.getGroup(),
-        stat.isDirectory() ? "d" : "-", perm));
-  }
 }
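
The checkAccessPermissions helper removed above (it reappears in TestAsyncDFSRename in the next hunk) reduces to FsAction.implies() checks against whichever permission triad matches the caller. A tiny standalone illustration of those semantics, not taken from the patch:

    import org.apache.hadoop.fs.permission.FsAction;
    import org.apache.hadoop.fs.permission.FsPermission;

    public class ImpliesDemo {
      public static void main(String[] args) {
        // 0750: owner rwx, group r-x, other ---
        FsPermission perm = new FsPermission((short) 0750);
        System.out.println(perm.getUserAction().implies(FsAction.READ_WRITE));  // true
        System.out.println(perm.getGroupAction().implies(FsAction.WRITE));      // false
        System.out.println(perm.getOtherAction().implies(FsAction.READ));       // false
      }
    }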

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8e2245d0/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java
index 8d3e509..03c8151 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java
@@ -19,11 +19,14 @@ package org.apache.hadoop.hdfs;
 
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.assertEquals;
 
 import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.Arrays;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.Random;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 
@@ -31,157 +34,521 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.Options.Rename;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.TestDFSPermission.PermissionGenerator;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
 import org.apache.hadoop.ipc.AsyncCallLimitExceededException;
-import org.junit.After;
-import org.junit.Before;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.Time;
 import org.junit.Test;
 
 public class TestAsyncDFSRename {
   public static final Log LOG = LogFactory.getLog(TestAsyncDFSRename.class);
-  private final short replFactor = 1;
+  private final long seed = Time.now();
+  private final Random r = new Random(seed);
+  private final PermissionGenerator permGenerator = new PermissionGenerator(r);
+  private final short replFactor = 2;
   private final long blockSize = 512;
   private long fileLen = blockSize * 3;
-  private static final int NUM_TESTS = 50;
-  private static final int NUM_NN_HANDLER = 10;
-  private static final int ASYNC_CALL_LIMIT = 1000;
-
-  private Configuration conf;
-  private MiniDFSCluster cluster;
-  private FileSystem fs;
-  private AsyncDistributedFileSystem adfs;
-
-  @Before
-  public void setup() throws IOException {
-    conf = new HdfsConfiguration();
-    // set the limit of max async calls
-    conf.setInt(CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_KEY,
-        ASYNC_CALL_LIMIT);
-    // set server handlers
-    conf.setInt(DFSConfigKeys.DFS_NAMENODE_HANDLER_COUNT_KEY, NUM_NN_HANDLER);
-    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+
+  /**
+   * Check that the blocks of the dst files are cleaned up after rename with
+   * overwrite, then restart the NN to verify the renames succeeded.
+   */
+  @Test(timeout = 60000)
+  public void testAsyncRenameWithOverwrite() throws Exception {
+    Configuration conf = new Configuration();
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(
+        replFactor).build();
     cluster.waitActive();
-    fs = FileSystem.get(conf);
-    adfs = cluster.getFileSystem().getAsyncDistributedFileSystem();
-  }
+    DistributedFileSystem dfs = cluster.getFileSystem();
+    AsyncDistributedFileSystem adfs = dfs.getAsyncDistributedFileSystem();
 
-  @After
-  public void tearDown() throws IOException {
-    if (fs != null) {
-      fs.close();
-      fs = null;
-    }
-    if (cluster != null) {
-      cluster.shutdown();
-      cluster = null;
+    try {
+      String src = "/foo/src";
+      String dst = "/foo/dst";
+      String src2 = "/foo/src2";
+      String dst2 = "/foo/dst2";
+      Path srcPath = new Path(src);
+      Path dstPath = new Path(dst);
+      Path srcPath2 = new Path(src2);
+      Path dstPath2 = new Path(dst2);
+
+      DFSTestUtil.createFile(dfs, srcPath, fileLen, replFactor, 1);
+      DFSTestUtil.createFile(dfs, dstPath, fileLen, replFactor, 1);
+      DFSTestUtil.createFile(dfs, srcPath2, fileLen, replFactor, 1);
+      DFSTestUtil.createFile(dfs, dstPath2, fileLen, replFactor, 1);
+
+      LocatedBlocks lbs = NameNodeAdapter.getBlockLocations(
+          cluster.getNameNode(), dst, 0, fileLen);
+      LocatedBlocks lbs2 = NameNodeAdapter.getBlockLocations(
+          cluster.getNameNode(), dst2, 0, fileLen);
+      BlockManager bm = NameNodeAdapter.getNamesystem(cluster.getNameNode())
+          .getBlockManager();
+      assertTrue(bm.getStoredBlock(lbs.getLocatedBlocks().get(0).getBlock()
+          .getLocalBlock()) != null);
+      assertTrue(bm.getStoredBlock(lbs2.getLocatedBlocks().get(0).getBlock()
+          .getLocalBlock()) != null);
+
+      Future<Void> retVal1 = adfs.rename(srcPath, dstPath, Rename.OVERWRITE);
+      Future<Void> retVal2 = adfs.rename(srcPath2, dstPath2, Rename.OVERWRITE);
+      retVal1.get();
+      retVal2.get();
+
+      assertTrue(bm.getStoredBlock(lbs.getLocatedBlocks().get(0).getBlock()
+          .getLocalBlock()) == null);
+      assertTrue(bm.getStoredBlock(lbs2.getLocatedBlocks().get(0).getBlock()
+          .getLocalBlock()) == null);
+
+      // Restart the NN and verify the renames succeeded
+      cluster.restartNameNodes();
+      assertFalse(dfs.exists(srcPath));
+      assertTrue(dfs.exists(dstPath));
+      assertFalse(dfs.exists(srcPath2));
+      assertTrue(dfs.exists(dstPath2));
+    } finally {
+      if (dfs != null) {
+        dfs.close();
+      }
+      if (cluster != null) {
+        cluster.shutdown();
+      }
     }
   }
 
   @Test(timeout = 60000)
   public void testCallGetReturnValueMultipleTimes() throws Exception {
-    final Path parent = new Path("/test/testCallGetReturnValueMultipleTimes/");
-    assertTrue(fs.mkdirs(parent));
+    final Path renameDir = new Path(
+        "/test/testCallGetReturnValueMultipleTimes/");
+    final Configuration conf = new HdfsConfiguration();
+    conf.setInt(CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_KEY, 200);
+    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(2).build();
+    cluster.waitActive();
+    final DistributedFileSystem dfs = cluster.getFileSystem();
+    final AsyncDistributedFileSystem adfs = dfs.getAsyncDistributedFileSystem();
+    final int count = 100;
+    final Map<Integer, Future<Void>> returnFutures = new HashMap<Integer, Future<Void>>();
 
-    // prepare test
-    final Path[] srcs = new Path[NUM_TESTS];
-    final Path[] dsts = new Path[NUM_TESTS];
-    for (int i = 0; i < NUM_TESTS; i++) {
-      srcs[i] = new Path(parent, "src" + i);
-      dsts[i] = new Path(parent, "dst" + i);
-      DFSTestUtil.createFile(fs, srcs[i], fileLen, replFactor, 1);
-      DFSTestUtil.createFile(fs, dsts[i], fileLen, replFactor, 1);
+    assertTrue(dfs.mkdirs(renameDir));
+
+    try {
+      // concurrently invoke many renames
+      for (int i = 0; i < count; i++) {
+        Path src = new Path(renameDir, "src" + i);
+        Path dst = new Path(renameDir, "dst" + i);
+        DFSTestUtil.createFile(dfs, src, fileLen, replFactor, 1);
+        DFSTestUtil.createFile(dfs, dst, fileLen, replFactor, 1);
+        Future<Void> returnFuture = adfs.rename(src, dst, Rename.OVERWRITE);
+        returnFutures.put(i, returnFuture);
+      }
+
+      for (int i = 0; i < 5; i++) {
+        verifyCallGetReturnValueMultipleTimes(returnFutures, count, cluster,
+            renameDir, dfs);
+      }
+    } finally {
+      if (dfs != null) {
+        dfs.close();
+      }
+      if (cluster != null) {
+        cluster.shutdown();
+      }
     }
+  }
 
-    // concurrently invoking many rename
-    final Map<Integer, Future<Void>> reFutures =
-        new HashMap<Integer, Future<Void>>();
-    for (int i = 0; i < NUM_TESTS; i++) {
-      Future<Void> retFuture = adfs.rename(srcs[i], dsts[i],
-          Rename.OVERWRITE);
-      reFutures.put(i, retFuture);
+  private void verifyCallGetReturnValueMultipleTimes(
+      Map<Integer, Future<Void>> returnFutures, int count,
+      MiniDFSCluster cluster, Path renameDir, DistributedFileSystem dfs)
+      throws InterruptedException, ExecutionException, IOException {
+    // wait for the calls to complete
+    for (int i = 0; i < count; i++) {
+      returnFutures.get(i).get();
     }
 
-    assertEquals(NUM_TESTS, reFutures.size());
+    // Restart the NN and verify the renames succeeded
+    cluster.restartNameNodes();
 
-    for (int i = 0; i < 5; i++) {
-      verifyCallGetReturnValueMultipleTimes(reFutures, srcs, dsts);
+    // verify the src dirs no longer exist and the dsts do
+    for (int i = 0; i < count; i++) {
+      Path src = new Path(renameDir, "src" + i);
+      Path dst = new Path(renameDir, "dst" + i);
+      assertFalse(dfs.exists(src));
+      assertTrue(dfs.exists(dst));
     }
   }
 
-  private void verifyCallGetReturnValueMultipleTimes(
-      final Map<Integer, Future<Void>> reFutures, final Path[] srcs,
-      final Path[] dsts)
-      throws InterruptedException, ExecutionException, IOException {
+  @Test
+  public void testConservativeConcurrentAsyncRenameWithOverwrite()
+      throws Exception {
+    internalTestConcurrentAsyncRenameWithOverwrite(100,
+        "testAggressiveConcurrentAsyncRenameWithOverwrite");
+  }
 
-    // wait for completing the calls
-    waitForReturnValues(reFutures, 0, NUM_TESTS);
+  @Test(timeout = 60000)
+  public void testAggressiveConcurrentAsyncRenameWithOverwrite()
+      throws Exception {
+    internalTestConcurrentAsyncRenameWithOverwrite(10000,
+        "testConservativeConcurrentAsyncRenameWithOverwrite");
+  }
+
+  private void internalTestConcurrentAsyncRenameWithOverwrite(
+      final int asyncCallLimit, final String basePath) throws Exception {
+    final Path renameDir = new Path(String.format("/test/%s/", basePath));
+    Configuration conf = new HdfsConfiguration();
+    conf.setInt(CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_KEY,
+        asyncCallLimit);
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2)
+        .build();
+    cluster.waitActive();
+    DistributedFileSystem dfs = cluster.getFileSystem();
+    AsyncDistributedFileSystem adfs = dfs.getAsyncDistributedFileSystem();
+    int count = 1000;
+    int start = 0, end = 0;
+    Map<Integer, Future<Void>> returnFutures = new HashMap<Integer, Future<Void>>();
+
+    assertTrue(dfs.mkdirs(renameDir));
+
+    try {
+      // concurrently invoke many renames
+      for (int i = 0; i < count; i++) {
+        Path src = new Path(renameDir, "src" + i);
+        Path dst = new Path(renameDir, "dst" + i);
+        DFSTestUtil.createFile(dfs, src, fileLen, replFactor, 1);
+        DFSTestUtil.createFile(dfs, dst, fileLen, replFactor, 1);
+        for (;;) {
+          try {
+            LOG.info("rename #" + i);
+            Future<Void> returnFuture = adfs.rename(src, dst, Rename.OVERWRITE);
+            returnFutures.put(i, returnFuture);
+            break;
+          } catch (AsyncCallLimitExceededException e) {
+            /**
+             * reached limit of async calls, fetch results of finished async
+             * calls to let follow-on calls go
+             */
+            LOG.error(e);
+            start = end;
+            end = i;
+            LOG.info(String.format("start=%d, end=%d, i=%d", start, end, i));
+            waitForReturnValues(returnFutures, start, end);
+          }
+        }
+      }
 
-    // verify the src dir should not exist, dst should
-    verifyRenames(srcs, dsts);
+      // wait for the calls to complete
+      for (int i = start; i < count; i++) {
+        returnFutures.get(i).get();
+      }
+
+      // Restart the NN and verify the renames succeeded
+      cluster.restartNameNodes();
+
+      // verify the src dirs no longer exist and the dsts do
+      for (int i = 0; i < count; i++) {
+        Path src = new Path(renameDir, "src" + i);
+        Path dst = new Path(renameDir, "dst" + i);
+        assertFalse(dfs.exists(src));
+        assertTrue(dfs.exists(dst));
+      }
+    } finally {
+      if (dfs != null) {
+        dfs.close();
+      }
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+
+  private void waitForReturnValues(
+      final Map<Integer, Future<Void>> returnFutures, final int start,
+      final int end) throws InterruptedException, ExecutionException {
+    LOG.info(String.format("calling waitForReturnValues [%d, %d)", start, end));
+    for (int i = start; i < end; i++) {
+      LOG.info("calling Future#get #" + i);
+      returnFutures.get(i).get();
+    }
+  }
+
+  @Test
+  public void testConservativeConcurrentAsyncAPI() throws Exception {
+    internalTestConcurrentAsyncAPI(100, "testConservativeConcurrentAsyncAPI");
   }
 
   @Test(timeout = 60000)
-  public void testConcurrentAsyncRename() throws Exception {
-    final Path parent = new Path(
-        String.format("/test/%s/", "testConcurrentAsyncRename"));
-    assertTrue(fs.mkdirs(parent));
-
-    // prepare test
-    final Path[] srcs = new Path[NUM_TESTS];
-    final Path[] dsts = new Path[NUM_TESTS];
-    for (int i = 0; i < NUM_TESTS; i++) {
+  public void testAggressiveConcurrentAsyncAPI() throws Exception {
+    internalTestConcurrentAsyncAPI(10000, "testAggressiveConcurrentAsyncAPI");
+  }
+
+  private void internalTestConcurrentAsyncAPI(final int asyncCallLimit,
+      final String basePath) throws Exception {
+    Configuration conf = new HdfsConfiguration();
+    String group1 = "group1";
+    String group2 = "group2";
+    String user1 = "user1";
+    int count = 500;
+
+    // explicitly turn on permission checking
+    conf.setBoolean(DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, true);
+    // set the limit of max async calls
+    conf.setInt(CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_KEY,
+        asyncCallLimit);
+
+    // create fake mapping for the groups
+    Map<String, String[]> u2gMap = new HashMap<String, String[]>(1);
+    u2gMap.put(user1, new String[] {group1, group2});
+    DFSTestUtil.updateConfWithFakeGroupMapping(conf, u2gMap);
+
+    // start mini cluster
+    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(3).build();
+    cluster.waitActive();
+    AsyncDistributedFileSystem adfs = cluster.getFileSystem()
+        .getAsyncDistributedFileSystem();
+
+    // prepare for test
+    FileSystem rootFs = FileSystem.get(conf);
+    final Path parent = new Path(String.format("/test/%s/", basePath));
+    final Path[] srcs = new Path[count];
+    final Path[] dsts = new Path[count];
+    short[] permissions = new short[count];
+    for (int i = 0; i < count; i++) {
       srcs[i] = new Path(parent, "src" + i);
       dsts[i] = new Path(parent, "dst" + i);
-      DFSTestUtil.createFile(fs, srcs[i], fileLen, replFactor, 1);
-      DFSTestUtil.createFile(fs, dsts[i], fileLen, replFactor, 1);
+      DFSTestUtil.createFile(rootFs, srcs[i], fileLen, replFactor, 1);
+      DFSTestUtil.createFile(rootFs, dsts[i], fileLen, replFactor, 1);
+      assertTrue(rootFs.exists(srcs[i]));
+      assertTrue(rootFs.getFileStatus(srcs[i]).isFile());
+      assertTrue(rootFs.exists(dsts[i]));
+      assertTrue(rootFs.getFileStatus(dsts[i]).isFile());
+      permissions[i] = permGenerator.next();
     }
 
-    // concurrently invoking many rename
-    int start = 0, end = 0;
-    Map<Integer, Future<Void>> retFutures =
+    Map<Integer, Future<Void>> renameRetFutures =
+        new HashMap<Integer, Future<Void>>();
+    Map<Integer, Future<Void>> permRetFutures =
+        new HashMap<Integer, Future<Void>>();
+    Map<Integer, Future<Void>> ownerRetFutures =
         new HashMap<Integer, Future<Void>>();
-    for (int i = 0; i < NUM_TESTS; i++) {
+    int start = 0, end = 0;
+    // test rename
+    for (int i = 0; i < count; i++) {
       for (;;) {
         try {
-          LOG.info("rename #" + i);
-          Future<Void> retFuture = adfs.rename(srcs[i], dsts[i],
+          Future<Void> returnFuture = adfs.rename(srcs[i], dsts[i],
               Rename.OVERWRITE);
-          retFutures.put(i, retFuture);
+          renameRetFutures.put(i, returnFuture);
           break;
         } catch (AsyncCallLimitExceededException e) {
-          /**
-           * reached limit of async calls, fetch results of finished async calls
-           * to let follow-on calls go
-           */
-          LOG.error(e);
           start = end;
           end = i;
-          LOG.info(String.format("start=%d, end=%d, i=%d", start, end, i));
-          waitForReturnValues(retFutures, start, end);
+          waitForReturnValues(renameRetFutures, start, end);
         }
       }
     }
 
     // wait for completing the calls
-    waitForReturnValues(retFutures, end, NUM_TESTS);
+    for (int i = start; i < count; i++) {
+      renameRetFutures.get(i).get();
+    }
+
+    // Restart the NN and verify the renames succeeded
+    cluster.restartNameNodes();
+
+    // verify the srcs no longer exist and the dsts do
+    for (int i = 0; i < count; i++) {
+      assertFalse(rootFs.exists(srcs[i]));
+      assertTrue(rootFs.exists(dsts[i]));
+    }
+
+    // test permissions
+    try {
+      for (int i = 0; i < count; i++) {
+        for (;;) {
+          try {
+            Future<Void> retFuture = adfs.setPermission(dsts[i],
+                new FsPermission(permissions[i]));
+            permRetFutures.put(i, retFuture);
+            break;
+          } catch (AsyncCallLimitExceededException e) {
+            start = end;
+            end = i;
+            waitForReturnValues(permRetFutures, start, end);
+          }
+        }
+      }
+      // wait for the calls to complete
+      for (int i = start; i < count; i++) {
+        permRetFutures.get(i).get();
+      }
+
+      // Restart the NN, then verify the permissions
+      cluster.restartNameNodes();
+
+      // verify the permission
+      for (int i = 0; i < count; i++) {
+        assertTrue(rootFs.exists(dsts[i]));
+        FsPermission fsPerm = new FsPermission(permissions[i]);
+        checkAccessPermissions(rootFs.getFileStatus(dsts[i]),
+            fsPerm.getUserAction());
+      }
+
+      // test setOwner
+      start = 0;
+      end = 0;
+      for (int i = 0; i < count; i++) {
+        for (;;) {
+          try {
+            Future<Void> retFuture = adfs.setOwner(dsts[i], "user1",
+                "group2");
+            ownerRetFutures.put(i, retFuture);
+            break;
+          } catch (AsyncCallLimitExceededException e) {
+            start = end;
+            end = i;
+            waitForReturnValues(ownerRetFutures, start, end);
+          }
+        }
+      }
+      // wait for the calls to complete
+      for (int i = start; i < count; i++) {
+        ownerRetFutures.get(i).get();
+      }
 
-    // verify the src dir should not exist, dst should
-    verifyRenames(srcs, dsts);
+      // Restart the NN, then verify the owners
+      cluster.restartNameNodes();
+
+      // verify the owner
+      for (int i = 0; i < count; i++) {
+        assertTrue(rootFs.exists(dsts[i]));
+        assertTrue(
+            "user1".equals(rootFs.getFileStatus(dsts[i]).getOwner()));
+        assertTrue(
+            "group2".equals(rootFs.getFileStatus(dsts[i]).getGroup()));
+      }
+    } catch (AccessControlException ace) {
+      throw ace;
+    } finally {
+      if (rootFs != null) {
+        rootFs.close();
+      }
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
   }
 
-  private void verifyRenames(final Path[] srcs, final Path[] dsts)
+  static void checkAccessPermissions(FileStatus stat, FsAction mode)
       throws IOException {
-    for (int i = 0; i < NUM_TESTS; i++) {
-      assertFalse(fs.exists(srcs[i]));
-      assertTrue(fs.exists(dsts[i]));
+    checkAccessPermissions(UserGroupInformation.getCurrentUser(), stat, mode);
+  }
+
+  static void checkAccessPermissions(final UserGroupInformation ugi,
+      FileStatus stat, FsAction mode) throws IOException {
+    FsPermission perm = stat.getPermission();
+    String user = ugi.getShortUserName();
+    List<String> groups = Arrays.asList(ugi.getGroupNames());
+
+    if (user.equals(stat.getOwner())) {
+      if (perm.getUserAction().implies(mode)) {
+        return;
+      }
+    } else if (groups.contains(stat.getGroup())) {
+      if (perm.getGroupAction().implies(mode)) {
+        return;
+      }
+    } else {
+      if (perm.getOtherAction().implies(mode)) {
+        return;
+      }
     }
+    throw new AccessControlException(String.format(
+        "Permission denied: user=%s, path=\"%s\":%s:%s:%s%s", user, stat
+            .getPath(), stat.getOwner(), stat.getGroup(),
+        stat.isDirectory() ? "d" : "-", perm));
   }
 
-  void waitForReturnValues(final Map<Integer, Future<Void>> retFutures,
-      final int start, final int end)
-      throws InterruptedException, ExecutionException {
-    TestAsyncDFS.waitForReturnValues(retFutures, start, end);
+  @Test(timeout = 60000)
+  public void testAsyncAPIWithException() throws Exception {
+    Configuration conf = new HdfsConfiguration();
+    String group1 = "group1";
+    String group2 = "group2";
+    String user1 = "user1";
+    UserGroupInformation ugi1;
+
+    // explicitly turn on permission checking
+    conf.setBoolean(DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, true);
+
+    // create fake mapping for the groups
+    Map<String, String[]> u2gMap = new HashMap<String, String[]>(1);
+    u2gMap.put(user1, new String[] {group1, group2});
+    DFSTestUtil.updateConfWithFakeGroupMapping(conf, u2gMap);
+
+    // Initialize the test user
+    ugi1 = UserGroupInformation.createUserForTesting(user1, new String[] {
+        group1, group2 });
+
+    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(3).build();
+    cluster.waitActive();
+
+    FileSystem rootFs = FileSystem.get(conf);
+    final Path renameDir = new Path("/test/async_api_exception/");
+    final Path src = new Path(renameDir, "src");
+    final Path dst = new Path(renameDir, "dst");
+    rootFs.mkdirs(src);
+
+    AsyncDistributedFileSystem adfs = ugi1
+        .doAs(new PrivilegedExceptionAction<AsyncDistributedFileSystem>() {
+          @Override
+          public AsyncDistributedFileSystem run() throws Exception {
+            return cluster.getFileSystem().getAsyncDistributedFileSystem();
+          }
+        });
+
+    Future<Void> retFuture;
+    try {
+      retFuture = adfs.rename(src, dst, Rename.OVERWRITE);
+      retFuture.get();
+    } catch (ExecutionException e) {
+      TestAsyncDFS.checkPermissionDenied(e, src, user1);
+      assertTrue("Permission denied messages must carry the path parent", e
+          .getMessage().contains(src.getParent().toUri().getPath()));
+    }
+
+    FsPermission fsPerm = new FsPermission(permGenerator.next());
+    try {
+      retFuture = adfs.setPermission(src, fsPerm);
+      retFuture.get();
+    } catch (ExecutionException e) {
+      TestAsyncDFS.checkPermissionDenied(e, src, user1);
+      assertTrue("Permission denied messages must carry the name of the path",
+          e.getMessage().contains(src.getName()));
+    }
+
+    try {
+      retFuture = adfs.setOwner(src, "user1", "group2");
+      retFuture.get();
+    } catch (ExecutionException e) {
+      TestAsyncDFS.checkPermissionDenied(e, src, user1);
+      assertTrue("Permission denied messages must carry the name of the path",
+          e.getMessage().contains(src.getName()));
+    } finally {
+      if (rootFs != null) {
+        rootFs.close();
+      }
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
   }
 }
\ No newline at end of file
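
One idiom worth noting from the restored testAsyncAPIWithException above: server-side failures surface through the Future wrapped in ExecutionException, so callers unwrap the cause before acting on it. A sketch:

    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.Future;

    public class UnwrapAsyncFailure {
      // Async HDFS calls report server-side failures through the Future,
      // wrapped in ExecutionException; unwrap before acting on the cause.
      static void await(Future<Void> f) throws Exception {
        try {
          f.get();
        } catch (ExecutionException e) {
          Throwable cause = e.getCause(); // e.g. an AccessControlException
          throw (cause instanceof Exception) ? (Exception) cause : e;
        }
      }
    }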


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


[6/8] hadoop git commit: Revert "HADOOP-13168. Support Future.get with timeout in ipc async calls."

Posted by wa...@apache.org.
Revert "HADOOP-13168. Support Future.get with timeout in ipc async calls."

This reverts commit 658072d62e856c65632f1a66b10808de625649f1.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/071aeab5
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/071aeab5
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/071aeab5

Branch: refs/heads/branch-2
Commit: 071aeab5853087314efd6252b9b7e4b64979a0fd
Parents: 87ea078
Author: Andrew Wang <wa...@apache.org>
Authored: Fri Jun 3 18:10:49 2016 -0700
Committer: Andrew Wang <wa...@apache.org>
Committed: Fri Jun 3 18:10:49 2016 -0700

----------------------------------------------------------------------
 .../main/java/org/apache/hadoop/ipc/Client.java | 119 +++++++++---------
 .../apache/hadoop/ipc/ProtobufRpcEngine.java    |  62 +++++-----
 .../apache/hadoop/util/concurrent/AsyncGet.java |  60 ---------
 .../hadoop/util/concurrent/AsyncGetFuture.java  |  73 -----------
 .../org/apache/hadoop/ipc/TestAsyncIPC.java     | 122 ++++++++-----------
 .../hadoop/hdfs/AsyncDistributedFileSystem.java |  26 +++-
 .../ClientNamenodeProtocolTranslatorPB.java     |  33 +++--
 7 files changed, 183 insertions(+), 312 deletions(-)
----------------------------------------------------------------------
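
For context, HADOOP-13168 let an async IPC caller put a bound on its wait for the response future (the thread-local handle the diff below renames from ASYNC_RPC_RESPONSE back to RETURN_RPC_RESPONSE). A caller-side sketch of the pattern this revert removes, with an assumed 10-second deadline:

    import java.util.concurrent.Future;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;

    public class BoundedAsyncWait {
      // 'response' stands in for the future an async RPC leaves behind;
      // before this revert it honored get(timeout, unit).
      static <T> T await(Future<T> response) throws Exception {
        try {
          return response.get(10, TimeUnit.SECONDS); // bounded wait
        } catch (TimeoutException te) {
          // caller decides: retry, cancel, or fall back to a blocking get()
          throw te;
        }
      }
    }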


http://git-wip-us.apache.org/repos/asf/hadoop/blob/071aeab5/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
index 892df89..afad066 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
@@ -18,10 +18,46 @@
 
 package org.apache.hadoop.ipc;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import com.google.protobuf.CodedOutputStream;
+import static org.apache.hadoop.ipc.RpcConstants.*;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.FilterInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InterruptedIOException;
+import java.io.OutputStream;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.SocketTimeoutException;
+import java.net.UnknownHostException;
+import java.security.PrivilegedExceptionAction;
+import java.util.Arrays;
+import java.util.Hashtable;
+import java.util.Iterator;
+import java.util.Map.Entry;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.net.SocketFactory;
+import javax.security.sasl.Sasl;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -57,25 +93,14 @@ import org.apache.hadoop.util.ProtoUtil;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Time;
-import org.apache.hadoop.util.concurrent.AsyncGet;
-import org.apache.hadoop.util.concurrent.AsyncGetFuture;
 import org.apache.htrace.core.Span;
 import org.apache.htrace.core.Tracer;
 
-import javax.net.SocketFactory;
-import javax.security.sasl.Sasl;
-import java.io.*;
-import java.net.*;
-import java.security.PrivilegedExceptionAction;
-import java.util.*;
-import java.util.Map.Entry;
-import java.util.concurrent.*;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-
-import static org.apache.hadoop.ipc.RpcConstants.CONNECTION_CONTEXT_CALL_ID;
-import static org.apache.hadoop.ipc.RpcConstants.PING_CALL_ID;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.AbstractFuture;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.google.protobuf.CodedOutputStream;
 
 /** A client for an IPC service.  IPC calls take a single {@link Writable} as a
  * parameter, and return a {@link Writable} as their value.  A service runs on
@@ -94,8 +119,8 @@ public class Client implements AutoCloseable {
 
   private static final ThreadLocal<Integer> callId = new ThreadLocal<Integer>();
   private static final ThreadLocal<Integer> retryCount = new ThreadLocal<Integer>();
-  private static final ThreadLocal<Future<?>> ASYNC_RPC_RESPONSE
-      = new ThreadLocal<>();
+  private static final ThreadLocal<Future<?>>
+      RETURN_RPC_RESPONSE = new ThreadLocal<>();
   private static final ThreadLocal<Boolean> asynchronousMode =
       new ThreadLocal<Boolean>() {
         @Override
@@ -106,8 +131,8 @@ public class Client implements AutoCloseable {
 
   @SuppressWarnings("unchecked")
   @Unstable
-  public static <T> Future<T> getAsyncRpcResponse() {
-    return (Future<T>) ASYNC_RPC_RESPONSE.get();
+  public static <T> Future<T> getReturnRpcResponse() {
+    return (Future<T>) RETURN_RPC_RESPONSE.get();
   }
 
   /** Set call id and retry count for the next call. */
@@ -356,11 +381,6 @@ public class Client implements AutoCloseable {
       }
     }
 
-    @Override
-    public String toString() {
-      return getClass().getSimpleName() + id;
-    }
-
     /** Indicate when the call is complete and the
      * value or error are available.  Notifies by default.  */
     protected synchronized void callComplete() {
@@ -1395,32 +1415,27 @@ public class Client implements AutoCloseable {
     }
 
     if (isAsynchronousMode()) {
-      final AsyncGet<Writable, IOException> asyncGet
-          = new AsyncGet<Writable, IOException>() {
+      Future<Writable> returnFuture = new AbstractFuture<Writable>() {
+        private final AtomicBoolean called = new AtomicBoolean(false);
         @Override
-        public Writable get(long timeout, TimeUnit unit)
-            throws IOException, TimeoutException{
-          boolean done = true;
-          try {
-            final Writable w = getRpcResponse(call, connection, timeout, unit);
-            if (w == null) {
-              done = false;
-              throw new TimeoutException(call + " timed out "
-                  + timeout + " " + unit);
-            }
-            return w;
-          } finally {
-            if (done) {
+        public Writable get() throws InterruptedException, ExecutionException {
+          if (called.compareAndSet(false, true)) {
+            try {
+              set(getRpcResponse(call, connection));
+            } catch (IOException ie) {
+              setException(ie);
+            } finally {
               releaseAsyncCall();
             }
           }
+          return super.get();
         }
       };
 
-      ASYNC_RPC_RESPONSE.set(new AsyncGetFuture<>(asyncGet));
+      RETURN_RPC_RESPONSE.set(returnFuture);
       return null;
     } else {
-      return getRpcResponse(call, connection, -1, null);
+      return getRpcResponse(call, connection);
     }
   }
 
@@ -1456,18 +1471,12 @@ public class Client implements AutoCloseable {
     return asyncCallCounter.get();
   }
 
-  /** @return the rpc response or, in case of timeout, null. */
-  private Writable getRpcResponse(final Call call, final Connection connection,
-      final long timeout, final TimeUnit unit) throws IOException {
+  private Writable getRpcResponse(final Call call, final Connection connection)
+      throws IOException {
     synchronized (call) {
       while (!call.done) {
         try {
-          final long waitTimeout = AsyncGet.Util.asyncGetTimeout2WaitTimeout(
-              timeout, unit);
-          call.wait(waitTimeout); // wait for the result
-          if (waitTimeout > 0 && !call.done) {
-            return null;
-          }
+          call.wait();                           // wait for the result
         } catch (InterruptedException ie) {
           Thread.currentThread().interrupt();
           throw new InterruptedIOException("Call interrupted");

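As context for the Client.java hunks above: the revert replaces the AsyncGet-based response with the older pattern in which an asynchronous call returns null and the caller pulls a Future out of a thread-local afterwards. Below is a minimal, self-contained sketch of that hand-off using only JDK types; all names are illustrative stand-ins, not Hadoop APIs.

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ThreadLocalFutureSketch {
  private static final ThreadLocal<Future<?>> RETURN_RPC_RESPONSE =
      new ThreadLocal<>();
  private static final ExecutorService POOL = Executors.newCachedThreadPool();

  // Stand-in for an RPC invocation in asynchronous mode: start the work,
  // stash the Future in the thread-local, and return nothing to the caller.
  static void asyncCall(final long param) {
    RETURN_RPC_RESPONSE.set(POOL.submit(new Callable<Long>() {
      @Override
      public Long call() {
        return param * 2;             // pretend this is the server's reply
      }
    }));
  }

  @SuppressWarnings("unchecked")
  static <T> Future<T> getReturnRpcResponse() {
    return (Future<T>) RETURN_RPC_RESPONSE.get();
  }

  public static void main(String[] args) throws Exception {
    asyncCall(21);
    Future<Long> f = getReturnRpcResponse();
    System.out.println(f.get());      // blocks for the reply: prints 42
    POOL.shutdown();
  }
}
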
http://git-wip-us.apache.org/repos/asf/hadoop/blob/071aeab5/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
index 4641a67..350e041 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
@@ -18,9 +18,21 @@
 
 package org.apache.hadoop.ipc;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.protobuf.*;
-import com.google.protobuf.Descriptors.MethodDescriptor;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.net.InetSocketAddress;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.net.SocketFactory;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -40,23 +52,17 @@ import org.apache.hadoop.security.token.SecretManager;
 import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.util.ProtoUtil;
 import org.apache.hadoop.util.Time;
-import org.apache.hadoop.util.concurrent.AsyncGet;
 import org.apache.htrace.core.TraceScope;
 import org.apache.htrace.core.Tracer;
 
-import javax.net.SocketFactory;
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.lang.reflect.Method;
-import java.lang.reflect.Proxy;
-import java.net.InetSocketAddress;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.protobuf.BlockingService;
+import com.google.protobuf.CodedOutputStream;
+import com.google.protobuf.Descriptors.MethodDescriptor;
+import com.google.protobuf.GeneratedMessage;
+import com.google.protobuf.Message;
+import com.google.protobuf.ServiceException;
+import com.google.protobuf.TextFormat;
 
 /**
 * RPC Engine for protobuf based RPCs.
@@ -64,8 +70,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
 @InterfaceStability.Evolving
 public class ProtobufRpcEngine implements RpcEngine {
   public static final Log LOG = LogFactory.getLog(ProtobufRpcEngine.class);
-  private static final ThreadLocal<AsyncGet<Message, Exception>>
-      ASYNC_RETURN_MESSAGE = new ThreadLocal<>();
+  private static final ThreadLocal<Callable<?>>
+      RETURN_MESSAGE_CALLBACK = new ThreadLocal<>();
 
   static { // Register the rpcRequest deserializer for WritableRpcEngine 
     org.apache.hadoop.ipc.Server.registerProtocolEngine(
@@ -75,9 +81,10 @@ public class ProtobufRpcEngine implements RpcEngine {
 
   private static final ClientCache CLIENTS = new ClientCache();
 
+  @SuppressWarnings("unchecked")
   @Unstable
-  public static AsyncGet<Message, Exception> getAsyncReturnMessage() {
-    return ASYNC_RETURN_MESSAGE.get();
+  public static <T> Callable<T> getReturnMessageCallback() {
+    return (Callable<T>) RETURN_MESSAGE_CALLBACK.get();
   }
 
   public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
@@ -256,17 +263,14 @@ public class ProtobufRpcEngine implements RpcEngine {
       }
       
       if (Client.isAsynchronousMode()) {
-        final Future<RpcResponseWrapper> frrw = Client.getAsyncRpcResponse();
-        final AsyncGet<Message, Exception> asyncGet
-            = new AsyncGet<Message, Exception>() {
+        final Future<RpcResponseWrapper> frrw = Client.getReturnRpcResponse();
+        Callable<Message> callback = new Callable<Message>() {
           @Override
-          public Message get(long timeout, TimeUnit unit) throws Exception {
-            final RpcResponseWrapper rrw = timeout < 0?
-                frrw.get(): frrw.get(timeout, unit);
-            return getReturnMessage(method, rrw);
+          public Message call() throws Exception {
+            return getReturnMessage(method, frrw.get());
           }
         };
-        ASYNC_RETURN_MESSAGE.set(asyncGet);
+        RETURN_MESSAGE_CALLBACK.set(callback);
         return null;
       } else {
         return getReturnMessage(method, val);

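The ProtobufRpcEngine change above restores the Callable-chaining style: the engine wraps the client's Future in a Callable that blocks on get() and converts the raw response into the typed return message. A rough, self-contained sketch of the two stages follows, with stand-in types instead of RpcResponseWrapper and Message; nothing here is Hadoop code.

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class CallableChainSketch {
  public static void main(String[] args) throws Exception {
    ExecutorService pool = Executors.newSingleThreadExecutor();
    // Stage 1: the "client" layer yields a Future of the raw response.
    final Future<byte[]> raw = pool.submit(new Callable<byte[]>() {
      @Override
      public byte[] call() {
        return "pong".getBytes();     // stand-in for the wire payload
      }
    });
    // Stage 2: the "engine" layer exposes a Callable that decodes it.
    Callable<String> message = new Callable<String>() {
      @Override
      public String call() throws Exception {
        return new String(raw.get()); // blocks, then converts the response
      }
    };
    System.out.println(message.call()); // prints "pong"
    pool.shutdown();
  }
}
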
http://git-wip-us.apache.org/repos/asf/hadoop/blob/071aeab5/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/AsyncGet.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/AsyncGet.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/AsyncGet.java
deleted file mode 100644
index 5eac869..0000000
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/AsyncGet.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.util.concurrent;
-
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-/**
- * This interface defines an asynchronous {@link #get(long, TimeUnit)} method.
- *
- * When the return value is still being computed, invoking
- * {@link #get(long, TimeUnit)} will result in a {@link TimeoutException}.
- * The method should be invoked again and again
- * until the underlying computation is completed.
- *
- * @param <R> The type of the return value.
- * @param <E> The exception type that the underlying implementation may throw.
- */
-public interface AsyncGet<R, E extends Throwable> {
-  /**
-   * Get the result.
-   *
-   * @param timeout The maximum time period to wait.
-   *                When timeout == 0, it does not wait at all.
-   *                When timeout < 0, it waits indefinitely.
-   * @param unit The unit of the timeout value
-   * @return the result, which is possibly null.
-   * @throws E an exception thrown by the underlying implementation.
-   * @throws TimeoutException if it cannot return after the given time period.
-   * @throws InterruptedException if the thread is interrupted.
-   */
-  R get(long timeout, TimeUnit unit)
-      throws E, TimeoutException, InterruptedException;
-
-  /** Utility */
-  class Util {
-    /**
-     * @return {@link Object#wait(long)} timeout converted
-     *         from {@link #get(long, TimeUnit)} timeout.
-     */
-    public static long asyncGetTimeout2WaitTimeout(long timeout, TimeUnit unit){
-      return timeout < 0? 0: timeout == 0? 1:unit.toMillis(timeout);
-    }
-  }
-}

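To make the deleted contract concrete: AsyncGet.get(timeout, unit) throws TimeoutException while the value is still pending, and the caller polls until the computation completes. A self-contained sketch follows, reusing the interface shape from the file above; the implementation is simplified and ignores the negative/zero timeout conventions spelled out in the deleted javadoc.

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class AsyncGetPollingSketch {
  interface AsyncGet<R, E extends Throwable> {
    R get(long timeout, TimeUnit unit)
        throws E, TimeoutException, InterruptedException;
  }

  public static void main(String[] args) throws Exception {
    final CountDownLatch done = new CountDownLatch(1);
    final String[] result = new String[1];

    AsyncGet<String, RuntimeException> asyncGet =
        new AsyncGet<String, RuntimeException>() {
          @Override
          public String get(long timeout, TimeUnit unit)
              throws TimeoutException, InterruptedException {
            if (!done.await(timeout, unit)) {
              throw new TimeoutException("still computing");
            }
            return result[0];
          }
        };

    new Thread(new Runnable() {        // simulate a slow computation
      @Override
      public void run() {
        try {
          Thread.sleep(50);
        } catch (InterruptedException ignored) {
        }
        result[0] = "ready";
        done.countDown();
      }
    }).start();

    for (;;) {                         // poll until the value is available
      try {
        System.out.println(asyncGet.get(10, TimeUnit.MILLISECONDS));
        return;
      } catch (TimeoutException e) {
        // not done yet; invoke get() again, as the contract prescribes
      }
    }
  }
}
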
http://git-wip-us.apache.org/repos/asf/hadoop/blob/071aeab5/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/AsyncGetFuture.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/AsyncGetFuture.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/AsyncGetFuture.java
deleted file mode 100644
index d687867..0000000
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/AsyncGetFuture.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.util.concurrent;
-
-import com.google.common.util.concurrent.AbstractFuture;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-/** A {@link Future} implemented using an {@link AsyncGet} object. */
-public class AsyncGetFuture<T, E extends Throwable> extends AbstractFuture<T> {
-  public static final Log LOG = LogFactory.getLog(AsyncGetFuture.class);
-
-  private final AtomicBoolean called = new AtomicBoolean(false);
-  private final AsyncGet<T, E> asyncGet;
-
-  public AsyncGetFuture(AsyncGet<T, E> asyncGet) {
-    this.asyncGet = asyncGet;
-  }
-
-  private void callAsyncGet(long timeout, TimeUnit unit) {
-    if (!isCancelled() && called.compareAndSet(false, true)) {
-      try {
-        set(asyncGet.get(timeout, unit));
-      } catch (TimeoutException te) {
-        LOG.trace("TRACE", te);
-        called.compareAndSet(true, false);
-      } catch (Throwable e) {
-        LOG.trace("TRACE", e);
-        setException(e);
-      }
-    }
-  }
-
-  @Override
-  public T get() throws InterruptedException, ExecutionException {
-    callAsyncGet(-1, TimeUnit.MILLISECONDS);
-    return super.get();
-  }
-
-  @Override
-  public T get(long timeout, TimeUnit unit)
-      throws InterruptedException, TimeoutException, ExecutionException {
-    callAsyncGet(timeout, unit);
-    return super.get(0, TimeUnit.MILLISECONDS);
-  }
-
-  @Override
-  public boolean isDone() {
-    callAsyncGet(0, TimeUnit.MILLISECONDS);
-    return super.isDone();
-  }
-}

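One detail of the deleted AsyncGetFuture worth noting: on TimeoutException, callAsyncGet() flips `called` back to false, so a timed-out get(timeout, unit) or isDone() leaves the future pollable again, while a real result (set) or error (setException) latches it permanently and cancellation is checked up front via isCancelled(). That retry-on-timeout behavior disappears with the class in this revert.
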
http://git-wip-us.apache.org/repos/asf/hadoop/blob/071aeab5/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java
index 7623975..8ee3a2c 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java
@@ -18,6 +18,20 @@
 
 package org.apache.hadoop.ipc;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -34,17 +48,6 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.*;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-
 public class TestAsyncIPC {
 
   private static Configuration conf;
@@ -84,51 +87,26 @@ public class TestAsyncIPC {
         try {
           final long param = TestIPC.RANDOM.nextLong();
           TestIPC.call(client, param, server, conf);
-          Future<LongWritable> returnFuture = Client.getAsyncRpcResponse();
+          Future<LongWritable> returnFuture = Client.getReturnRpcResponse();
           returnFutures.put(i, returnFuture);
           expectedValues.put(i, param);
         } catch (Exception e) {
+          LOG.fatal("Caught: " + StringUtils.stringifyException(e));
           failed = true;
-          throw new RuntimeException(e);
         }
       }
     }
 
-    void assertReturnValues() throws InterruptedException, ExecutionException {
+    public void waitForReturnValues() throws InterruptedException,
+        ExecutionException {
       for (int i = 0; i < count; i++) {
         LongWritable value = returnFutures.get(i).get();
-        Assert.assertEquals("call" + i + " failed.",
-            expectedValues.get(i).longValue(), value.get());
-      }
-      Assert.assertFalse(failed);
-    }
-
-    void assertReturnValues(long timeout, TimeUnit unit)
-        throws InterruptedException, ExecutionException {
-      final boolean[] checked = new boolean[count];
-      for(boolean done = false; !done;) {
-        done = true;
-        for (int i = 0; i < count; i++) {
-          if (checked[i]) {
-            continue;
-          } else {
-            done = false;
-          }
-
-          final LongWritable value;
-          try {
-            value = returnFutures.get(i).get(timeout, unit);
-          } catch (TimeoutException e) {
-            LOG.info("call" + i + " caught ", e);
-            continue;
-          }
-
-          Assert.assertEquals("call" + i + " failed.",
-              expectedValues.get(i).longValue(), value.get());
-          checked[i] = true;
+        if (expectedValues.get(i) != value.get()) {
+          LOG.fatal(String.format("Call-%d failed!", i));
+          failed = true;
+          break;
         }
       }
-      Assert.assertFalse(failed);
     }
   }
 
@@ -205,7 +183,7 @@ public class TestAsyncIPC {
 
     private void doCall(final int idx, final long param) throws IOException {
       TestIPC.call(client, param, server, conf);
-      Future<LongWritable> returnFuture = Client.getAsyncRpcResponse();
+      Future<LongWritable> returnFuture = Client.getReturnRpcResponse();
       returnFutures.put(idx, returnFuture);
       expectedValues.put(idx, param);
     }
@@ -255,7 +233,10 @@ public class TestAsyncIPC {
     }
     for (int i = 0; i < callerCount; i++) {
       callers[i].join();
-      callers[i].assertReturnValues();
+      callers[i].waitForReturnValues();
+      String msg = String.format("Expected not failed for caller-%d: %s.", i,
+          callers[i]);
+      assertFalse(msg, callers[i].failed);
     }
     for (int i = 0; i < clientCount; i++) {
       clients[i].stop();
@@ -277,37 +258,25 @@ public class TestAsyncIPC {
     try {
       AsyncCaller caller = new AsyncCaller(client, addr, callCount);
       caller.run();
-      caller.assertReturnValues();
-      caller.assertReturnValues();
-      caller.assertReturnValues();
-      Assert.assertEquals(asyncCallCount, client.getAsyncCallCount());
-    } finally {
-      client.stop();
-      server.stop();
-    }
-  }
 
-  @Test(timeout = 60000)
-  public void testFutureGetWithTimeout() throws IOException,
-      InterruptedException, ExecutionException {
-//    GenericTestUtils.setLogLevel(AsyncGetFuture.LOG, Level.ALL);
-    final Server server = new TestIPC.TestServer(10, true, conf);
-    final InetSocketAddress addr = NetUtils.getConnectAddress(server);
-    server.start();
+      caller.waitForReturnValues();
+      String msg = String.format(
+          "First time, expected not failed for caller: %s.", caller);
+      assertFalse(msg, caller.failed);
 
-    final Client client = new Client(LongWritable.class, conf);
+      caller.waitForReturnValues();
+      assertTrue(asyncCallCount == client.getAsyncCallCount());
+      msg = String.format("Second time, expected not failed for caller: %s.",
+          caller);
+      assertFalse(msg, caller.failed);
 
-    try {
-      final AsyncCaller caller = new AsyncCaller(client, addr, 10);
-      caller.run();
-      caller.assertReturnValues(10, TimeUnit.MILLISECONDS);
+      assertTrue(asyncCallCount == client.getAsyncCallCount());
     } finally {
       client.stop();
       server.stop();
     }
   }
 
-
   public void internalTestAsyncCallLimit(int handlerCount, boolean handlerSleep,
       int clientCount, int callerCount, int callCount) throws IOException,
       InterruptedException, ExecutionException {
@@ -398,7 +367,9 @@ public class TestAsyncIPC {
       server.start();
       final AsyncCaller caller = new AsyncCaller(client, addr, 4);
       caller.run();
-      caller.assertReturnValues();
+      caller.waitForReturnValues();
+      String msg = String.format("Expected not failed for caller: %s.", caller);
+      assertFalse(msg, caller.failed);
     } finally {
       client.stop();
       server.stop();
@@ -435,7 +406,9 @@ public class TestAsyncIPC {
       server.start();
       final AsyncCaller caller = new AsyncCaller(client, addr, 10);
       caller.run();
-      caller.assertReturnValues();
+      caller.waitForReturnValues();
+      String msg = String.format("Expected not failed for caller: %s.", caller);
+      assertFalse(msg, caller.failed);
     } finally {
       client.stop();
       server.stop();
@@ -470,7 +443,9 @@ public class TestAsyncIPC {
       server.start();
       final AsyncCaller caller = new AsyncCaller(client, addr, 10);
       caller.run();
-      caller.assertReturnValues();
+      caller.waitForReturnValues();
+      String msg = String.format("Expected not failed for caller: %s.", caller);
+      assertFalse(msg, caller.failed);
     } finally {
       client.stop();
       server.stop();
@@ -514,7 +489,10 @@ public class TestAsyncIPC {
       }
       for (int i = 0; i < callerCount; ++i) {
         callers[i].join();
-        callers[i].assertReturnValues();
+        callers[i].waitForReturnValues();
+        String msg = String.format("Expected not failed for caller-%d: %s.", i,
+            callers[i]);
+        assertFalse(msg, callers[i].failed);
       }
     } finally {
       client.stop();

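The restored waitForReturnValues() simply blocks on each stored Future in submission order and flags a mismatch, in place of the timeout-polling assertReturnValues(long, TimeUnit) removed above. A self-contained approximation of that flow, with illustrative names and no Hadoop test server involved:

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class WaitForReturnValuesSketch {
  public static void main(String[] args) throws Exception {
    ExecutorService pool = Executors.newFixedThreadPool(4);
    Map<Integer, Future<Long>> returnFutures = new HashMap<>();
    Map<Integer, Long> expectedValues = new HashMap<>();
    for (int i = 0; i < 4; i++) {
      final long param = i * 100L;
      expectedValues.put(i, param);
      returnFutures.put(i, pool.submit(new Callable<Long>() {
        @Override
        public Long call() {
          return param;                // stand-in for the echoed RPC value
        }
      }));
    }
    boolean failed = false;
    for (int i = 0; i < 4; i++) {
      long value = returnFutures.get(i).get();  // blocks per call
      if (expectedValues.get(i) != value) {
        failed = true;
        break;
      }
    }
    System.out.println(failed ? "some call failed" : "all calls matched");
    pool.shutdown();
  }
}
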
http://git-wip-us.apache.org/repos/asf/hadoop/blob/071aeab5/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java
index 1f60df2..4fe0861 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java
@@ -19,17 +19,20 @@
 package org.apache.hadoop.hdfs;
 
 import java.io.IOException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.fs.Options;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB;
-import org.apache.hadoop.util.concurrent.AsyncGet;
-import org.apache.hadoop.util.concurrent.AsyncGetFuture;
 import org.apache.hadoop.ipc.Client;
 
+import com.google.common.util.concurrent.AbstractFuture;
+
 /****************************************************************
  * Implementation of the asynchronous distributed file system.
  * This instance of this class is the way end-user code interacts
@@ -49,9 +52,22 @@ public class AsyncDistributedFileSystem {
   }
 
   static <T> Future<T> getReturnValue() {
-    final AsyncGet<T, Exception> asyncGet
-        = ClientNamenodeProtocolTranslatorPB.getAsyncReturnValue();
-    return new AsyncGetFuture<>(asyncGet);
+    final Callable<T> returnValueCallback = ClientNamenodeProtocolTranslatorPB
+        .getReturnValueCallback();
+    Future<T> returnFuture = new AbstractFuture<T>() {
+      private final AtomicBoolean called = new AtomicBoolean(false);
+      public T get() throws InterruptedException, ExecutionException {
+        if (called.compareAndSet(false, true)) {
+          try {
+            set(returnValueCallback.call());
+          } catch (Exception e) {
+            setException(e);
+          }
+        }
+        return super.get();
+      }
+    };
+    return returnFuture;
   }
 
   /**

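The restored getReturnValue() builds a Future whose first get() runs the thread-local Callable inline, at most once. A rough equivalent using only FutureTask is sketched below in place of the Guava AbstractFuture version above; FutureTask.run() is a no-op once the task has completed, so repeated get() calls are safe. Names are illustrative.

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;

public class LazyCallableFuture<T> extends FutureTask<T> {
  public LazyCallableFuture(Callable<T> callback) {
    super(callback);
  }

  @Override
  public T get() throws InterruptedException, ExecutionException {
    run();                             // no-op if the callable already ran
    return super.get();
  }

  public static void main(String[] args) throws Exception {
    Future<String> f = new LazyCallableFuture<>(new Callable<String>() {
      @Override
      public String call() {
        return "computed on first get()";
      }
    });
    System.out.println(f.get());
    System.out.println(f.get());       // same value; callable is not re-run
  }
}
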
http://git-wip-us.apache.org/repos/asf/hadoop/blob/071aeab5/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
index 796aa29..28ac78d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
@@ -24,8 +24,7 @@ import java.util.EnumSet;
 import java.util.List;
 
 import com.google.common.collect.Lists;
-
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.Callable;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
@@ -192,7 +191,6 @@ import org.apache.hadoop.security.token.Token;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.Message;
 import com.google.protobuf.ServiceException;
-import org.apache.hadoop.util.concurrent.AsyncGet;
 
 /**
  * This class forwards NN's ClientProtocol calls as RPC calls to the NN server
@@ -204,8 +202,8 @@ import org.apache.hadoop.util.concurrent.AsyncGet;
 public class ClientNamenodeProtocolTranslatorPB implements
     ProtocolMetaInterface, ClientProtocol, Closeable, ProtocolTranslator {
   final private ClientNamenodeProtocolPB rpcProxy;
-  private static final ThreadLocal<AsyncGet<?, Exception>>
-      ASYNC_RETURN_VALUE = new ThreadLocal<>();
+  private static final ThreadLocal<Callable<?>>
+      RETURN_VALUE_CALLBACK = new ThreadLocal<>();
 
   static final GetServerDefaultsRequestProto VOID_GET_SERVER_DEFAULT_REQUEST =
       GetServerDefaultsRequestProto.newBuilder().build();
@@ -240,8 +238,8 @@ public class ClientNamenodeProtocolTranslatorPB implements
 
   @SuppressWarnings("unchecked")
   @Unstable
-  public static <T> AsyncGet<T, Exception> getAsyncReturnValue() {
-    return (AsyncGet<T, Exception>) ASYNC_RETURN_VALUE.get();
+  public static <T> Callable<T> getReturnValueCallback() {
+    return (Callable<T>) RETURN_VALUE_CALLBACK.get();
   }
 
   @Override
@@ -363,7 +361,7 @@ public class ClientNamenodeProtocolTranslatorPB implements
     try {
       if (Client.isAsynchronousMode()) {
         rpcProxy.setPermission(null, req);
-        setAsyncReturnValue();
+        setReturnValueCallback();
       } else {
         rpcProxy.setPermission(null, req);
       }
@@ -372,18 +370,17 @@ public class ClientNamenodeProtocolTranslatorPB implements
     }
   }
 
-  private void setAsyncReturnValue() {
-    final AsyncGet<Message, Exception> asyncReturnMessage
-        = ProtobufRpcEngine.getAsyncReturnMessage();
-    final AsyncGet<Void, Exception> asyncGet
-        = new AsyncGet<Void, Exception>() {
+  private void setReturnValueCallback() {
+    final Callable<Message> returnMessageCallback = ProtobufRpcEngine
+        .getReturnMessageCallback();
+    Callable<Void> callBack = new Callable<Void>() {
       @Override
-      public Void get(long timeout, TimeUnit unit) throws Exception {
-        asyncReturnMessage.get(timeout, unit);
+      public Void call() throws Exception {
+        returnMessageCallback.call();
         return null;
       }
     };
-    ASYNC_RETURN_VALUE.set(asyncGet);
+    RETURN_VALUE_CALLBACK.set(callBack);
   }
 
   @Override
@@ -398,7 +395,7 @@ public class ClientNamenodeProtocolTranslatorPB implements
     try {
       if (Client.isAsynchronousMode()) {
         rpcProxy.setOwner(null, req.build());
-        setAsyncReturnValue();
+        setReturnValueCallback();
       } else {
         rpcProxy.setOwner(null, req.build());
       }
@@ -530,7 +527,7 @@ public class ClientNamenodeProtocolTranslatorPB implements
     try {
       if (Client.isAsynchronousMode()) {
         rpcProxy.rename2(null, req);
-        setAsyncReturnValue();
+        setReturnValueCallback();
       } else {
         rpcProxy.rename2(null, req);
       }

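The restored setReturnValueCallback() is just a Callable-to-Callable adapter: it wraps the engine's typed message callback in a Callable<Void> for protocol methods such as setPermission that return nothing. A trivial stand-alone sketch with illustrative names:

import java.util.concurrent.Callable;

public class VoidCallableAdapterSketch {
  public static void main(String[] args) throws Exception {
    final Callable<String> returnMessageCallback = new Callable<String>() {
      @Override
      public String call() {
        return "setPermission response"; // stand-in for the decoded message
      }
    };
    Callable<Void> callback = new Callable<Void>() {
      @Override
      public Void call() throws Exception {
        returnMessageCallback.call();    // wait for and decode the response,
        return null;                     // but surface no value to the caller
      }
    };
    callback.call();
    System.out.println("void RPC completed");
  }
}
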
