Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2016/06/07 20:26:18 UTC

[26/50] [abbrv] hadoop git commit: Revert "HADOOP-13226 Support async call retry and failover."

Revert "HADOOP-13226 Support async call retry and failover."

This reverts commit 83f2f78c118a7e52aba5104bd97b0acedc96be7b.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/5360da8b
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/5360da8b
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/5360da8b

Branch: refs/heads/YARN-4757
Commit: 5360da8bd9f720384860f411bee081aef13b4bd4
Parents: 47e0321
Author: Andrew Wang <wa...@apache.org>
Authored: Fri Jun 3 18:09:09 2016 -0700
Committer: Andrew Wang <wa...@apache.org>
Committed: Fri Jun 3 18:09:09 2016 -0700

----------------------------------------------------------------------
 .../dev-support/findbugsExcludeFile.xml         |   8 +-
 .../hadoop/io/retry/AsyncCallHandler.java       | 321 -------------------
 .../org/apache/hadoop/io/retry/CallReturn.java  |  75 -----
 .../hadoop/io/retry/RetryInvocationHandler.java | 134 ++------
 .../apache/hadoop/io/retry/RetryPolicies.java   |   4 +-
 .../main/java/org/apache/hadoop/ipc/Client.java |  25 +-
 .../apache/hadoop/ipc/ProtobufRpcEngine.java    |  13 +-
 .../apache/hadoop/util/concurrent/AsyncGet.java |  17 +-
 .../org/apache/hadoop/ipc/TestAsyncIPC.java     |  10 +-
 .../hadoop/hdfs/AsyncDistributedFileSystem.java |   7 +-
 .../ClientNamenodeProtocolTranslatorPB.java     |  42 +--
 .../org/apache/hadoop/hdfs/TestAsyncDFS.java    |  43 ++-
 .../apache/hadoop/hdfs/TestAsyncHDFSWithHA.java | 181 -----------
 .../hdfs/server/namenode/ha/HATestUtil.java     |   9 +-
 14 files changed, 114 insertions(+), 775 deletions(-)
----------------------------------------------------------------------
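
For readers following the change: this revert drops the AsyncCallHandler/CallReturn retry-and-failover layer and goes back to the earlier Future-based async RPC path, in which Client publishes the pending response as an AsyncGetFuture through a thread-local. A rough caller-side sketch of that restored path, along the lines of the post-revert TestAsyncIPC code below (the client/server/conf setup and the LongWritable response type are assumptions, not shown in this summary):

    // Rough sketch only; client, param, server and conf are assumed to be set up
    // as in TestIPC/TestAsyncIPC, and the LongWritable response type is an assumption.
    Client.setAsynchronousMode(true);             // calls made from here return immediately
    TestIPC.call(client, param, server, conf);    // fire the RPC without blocking
    final Future<LongWritable> response = Client.getAsyncRpcResponse(); // thread-local Future
    Client.setAsynchronousMode(false);
    final LongWritable value = response.get();    // block here for the actual result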


http://git-wip-us.apache.org/repos/asf/hadoop/blob/5360da8b/hadoop-common-project/hadoop-common/dev-support/findbugsExcludeFile.xml
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/dev-support/findbugsExcludeFile.xml b/hadoop-common-project/hadoop-common/dev-support/findbugsExcludeFile.xml
index a644aa5..ab8673b 100644
--- a/hadoop-common-project/hadoop-common/dev-support/findbugsExcludeFile.xml
+++ b/hadoop-common-project/hadoop-common/dev-support/findbugsExcludeFile.xml
@@ -345,13 +345,7 @@
        <Bug pattern="SF_SWITCH_FALLTHROUGH" />
      </Match>
 
-     <!-- WA_NOT_IN_LOOP is invalid in util.concurrent.AsyncGet$Util.wait. -->
-     <Match>
-       <Class name="org.apache.hadoop.util.concurrent.AsyncGet$Util" />
-       <Method name="wait" />
-       <Bug pattern="WA_NOT_IN_LOOP" />
-     </Match>
-
+     <!-- Synchronization performed on util.concurrent instance. -->
      <Match>
        <Class name="org.apache.hadoop.service.AbstractService" />
        <Method name="stop" />

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5360da8b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/AsyncCallHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/AsyncCallHandler.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/AsyncCallHandler.java
deleted file mode 100644
index 5a03b03..0000000
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/AsyncCallHandler.java
+++ /dev/null
@@ -1,321 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.io.retry;
-
-import com.google.common.base.Preconditions;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.ipc.Client;
-import org.apache.hadoop.util.Daemon;
-import org.apache.hadoop.util.Time;
-import org.apache.hadoop.util.concurrent.AsyncGet;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.lang.reflect.Method;
-import java.util.LinkedList;
-import java.util.Queue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicReference;
-
-/** Handle async calls. */
-@InterfaceAudience.Private
-public class AsyncCallHandler {
-  static final Logger LOG = LoggerFactory.getLogger(AsyncCallHandler.class);
-
-  private static final ThreadLocal<AsyncGet<?, Exception>>
-      LOWER_LAYER_ASYNC_RETURN = new ThreadLocal<>();
-  private static final ThreadLocal<AsyncGet<Object, Throwable>>
-      ASYNC_RETURN = new ThreadLocal<>();
-
-  /** @return the async return value from {@link AsyncCallHandler}. */
-  @InterfaceStability.Unstable
-  @SuppressWarnings("unchecked")
-  public static <R, T extends  Throwable> AsyncGet<R, T> getAsyncReturn() {
-    final AsyncGet<R, T> asyncGet = (AsyncGet<R, T>)ASYNC_RETURN.get();
-    if (asyncGet != null) {
-      ASYNC_RETURN.set(null);
-      return asyncGet;
-    } else {
-      return (AsyncGet<R, T>) getLowerLayerAsyncReturn();
-    }
-  }
-
-  /** For the lower rpc layers to set the async return value. */
-  @InterfaceStability.Unstable
-  public static void setLowerLayerAsyncReturn(
-      AsyncGet<?, Exception> asyncReturn) {
-    LOWER_LAYER_ASYNC_RETURN.set(asyncReturn);
-  }
-
-  private static AsyncGet<?, Exception> getLowerLayerAsyncReturn() {
-    final AsyncGet<?, Exception> asyncGet = LOWER_LAYER_ASYNC_RETURN.get();
-    Preconditions.checkNotNull(asyncGet);
-    LOWER_LAYER_ASYNC_RETURN.set(null);
-    return asyncGet;
-  }
-
-  /** A simple concurrent queue which keeping track the empty start time. */
-  static class ConcurrentQueue<T> {
-    private final Queue<T> queue = new LinkedList<>();
-    private long emptyStartTime = Time.monotonicNow();
-
-    synchronized int size() {
-      return queue.size();
-    }
-
-    /** Is the queue empty for more than the given time in millisecond? */
-    synchronized boolean isEmpty(long time) {
-      return queue.isEmpty() && Time.monotonicNow() - emptyStartTime > time;
-    }
-
-    synchronized void offer(T c) {
-      final boolean added = queue.offer(c);
-      Preconditions.checkState(added);
-    }
-
-    synchronized T poll() {
-      Preconditions.checkState(!queue.isEmpty());
-      final T t = queue.poll();
-      if (queue.isEmpty()) {
-        emptyStartTime = Time.monotonicNow();
-      }
-      return t;
-    }
-  }
-
-  /** A queue for handling async calls. */
-  static class AsyncCallQueue {
-    private final ConcurrentQueue<AsyncCall> queue = new ConcurrentQueue<>();
-    private final Processor processor = new Processor();
-
-    void addCall(AsyncCall call) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("add " + call);
-      }
-      queue.offer(call);
-      processor.tryStart();
-    }
-
-    void checkCalls() {
-      final int size = queue.size();
-      for (int i = 0; i < size; i++) {
-        final AsyncCall c = queue.poll();
-        if (!c.isDone()) {
-          queue.offer(c); // the call is not done yet, add it back.
-        }
-      }
-    }
-
-    /** Process the async calls in the queue. */
-    private class Processor {
-      static final long GRACE_PERIOD = 10*1000L;
-      static final long SLEEP_PERIOD = 100L;
-
-      private final AtomicReference<Thread> running = new AtomicReference<>();
-
-      boolean isRunning(Daemon d) {
-        return d == running.get();
-      }
-
-      void tryStart() {
-        final Thread current = Thread.currentThread();
-        if (running.compareAndSet(null, current)) {
-          final Daemon daemon = new Daemon() {
-            @Override
-            public void run() {
-              for (; isRunning(this);) {
-                try {
-                  Thread.sleep(SLEEP_PERIOD);
-                } catch (InterruptedException e) {
-                  kill(this);
-                  return;
-                }
-
-                checkCalls();
-                tryStop(this);
-              }
-            }
-          };
-
-          final boolean set = running.compareAndSet(current, daemon);
-          Preconditions.checkState(set);
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Starting AsyncCallQueue.Processor " + daemon);
-          }
-          daemon.start();
-        }
-      }
-
-      void tryStop(Daemon d) {
-        if (queue.isEmpty(GRACE_PERIOD)) {
-          kill(d);
-        }
-      }
-
-      void kill(Daemon d) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Killing " + d);
-        }
-        final boolean set = running.compareAndSet(d, null);
-        Preconditions.checkState(set);
-      }
-    }
-  }
-
-  static class AsyncValue<V> {
-    private V value;
-
-    synchronized V waitAsyncValue(long timeout, TimeUnit unit)
-        throws InterruptedException, TimeoutException {
-      if (value != null) {
-        return value;
-      }
-      AsyncGet.Util.wait(this, timeout, unit);
-      if (value != null) {
-        return value;
-      }
-
-      throw new TimeoutException("waitCallReturn timed out "
-          + timeout + " " + unit);
-    }
-
-    synchronized void set(V v) {
-      Preconditions.checkNotNull(v);
-      Preconditions.checkState(value == null);
-      value = v;
-      notify();
-    }
-
-    synchronized boolean isDone() {
-      return value != null;
-    }
-  }
-
-  static class AsyncCall extends RetryInvocationHandler.Call {
-    private final AsyncCallHandler asyncCallHandler;
-
-    private final AsyncValue<CallReturn> asyncCallReturn = new AsyncValue<>();
-    private AsyncGet<?, Exception> lowerLayerAsyncGet;
-
-    AsyncCall(Method method, Object[] args, boolean isRpc, int callId,
-              RetryInvocationHandler.Counters counters,
-              RetryInvocationHandler<?> retryInvocationHandler,
-              AsyncCallHandler asyncCallHandler) {
-      super(method, args, isRpc, callId, counters, retryInvocationHandler);
-
-      this.asyncCallHandler = asyncCallHandler;
-    }
-
-    /** @return true if the call is done; otherwise, return false. */
-    boolean isDone() {
-      final CallReturn r = invokeOnce();
-      switch (r.getState()) {
-        case RETURNED:
-        case EXCEPTION:
-          asyncCallReturn.set(r); // the async call is done
-          return true;
-        case RETRY:
-          invokeOnce();
-          break;
-        case ASYNC_CALL_IN_PROGRESS:
-        case ASYNC_INVOKED:
-          // nothing to do
-          break;
-        default:
-          Preconditions.checkState(false);
-      }
-      return false;
-    }
-
-    @Override
-    CallReturn invoke() throws Throwable {
-      LOG.debug("{}.invoke {}", getClass().getSimpleName(), this);
-      if (lowerLayerAsyncGet != null) {
-        // async call was submitted early, check the lower level async call
-        final boolean isDone = lowerLayerAsyncGet.isDone();
-        LOG.trace("invoke: lowerLayerAsyncGet.isDone()? {}", isDone);
-        if (!isDone) {
-          return CallReturn.ASYNC_CALL_IN_PROGRESS;
-        }
-        try {
-          return new CallReturn(lowerLayerAsyncGet.get(0, TimeUnit.SECONDS));
-        } finally {
-          lowerLayerAsyncGet = null;
-        }
-      }
-
-      // submit a new async call
-      LOG.trace("invoke: ASYNC_INVOKED");
-      final boolean mode = Client.isAsynchronousMode();
-      try {
-        Client.setAsynchronousMode(true);
-        final Object r = invokeMethod();
-        // invokeMethod should set LOWER_LAYER_ASYNC_RETURN and return null.
-        Preconditions.checkState(r == null);
-        lowerLayerAsyncGet = getLowerLayerAsyncReturn();
-
-        if (counters.isZeros()) {
-          // first async attempt, initialize
-          LOG.trace("invoke: initAsyncCall");
-          asyncCallHandler.initAsyncCall(this, asyncCallReturn);
-        }
-        return CallReturn.ASYNC_INVOKED;
-      } finally {
-        Client.setAsynchronousMode(mode);
-      }
-    }
-  }
-
-  private final AsyncCallQueue asyncCalls = new AsyncCallQueue();
-  private volatile boolean hasSuccessfulCall = false;
-
-  AsyncCall newAsyncCall(Method method, Object[] args, boolean isRpc,
-                         int callId, RetryInvocationHandler.Counters counters,
-                         RetryInvocationHandler<?> retryInvocationHandler) {
-    return new AsyncCall(method, args, isRpc, callId, counters,
-        retryInvocationHandler, this);
-  }
-
-  boolean hasSuccessfulCall() {
-    return hasSuccessfulCall;
-  }
-
-  private void initAsyncCall(final AsyncCall asyncCall,
-                             final AsyncValue<CallReturn> asyncCallReturn) {
-    asyncCalls.addCall(asyncCall);
-
-    final AsyncGet<Object, Throwable> asyncGet
-        = new AsyncGet<Object, Throwable>() {
-      @Override
-      public Object get(long timeout, TimeUnit unit) throws Throwable {
-        final CallReturn c = asyncCallReturn.waitAsyncValue(timeout, unit);
-        final Object r = c.getReturnValue();
-        hasSuccessfulCall = true;
-        return r;
-      }
-
-      @Override
-      public boolean isDone() {
-        return asyncCallReturn.isDone();
-      }
-    };
-    ASYNC_RETURN.set(asyncGet);
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5360da8b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/CallReturn.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/CallReturn.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/CallReturn.java
deleted file mode 100644
index 943725c..0000000
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/CallReturn.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.io.retry;
-
-import com.google.common.base.Preconditions;
-
-/** The call return from a method invocation. */
-class CallReturn {
-  /** The return state. */
-  enum State {
-    /** Call is returned successfully. */
-    RETURNED,
-    /** Call throws an exception. */
-    EXCEPTION,
-    /** Call should be retried according to the {@link RetryPolicy}. */
-    RETRY,
-    /** Call, which is async, is still in progress. */
-    ASYNC_CALL_IN_PROGRESS,
-    /** Call, which is async, just has been invoked. */
-    ASYNC_INVOKED
-  }
-
-  static final CallReturn ASYNC_CALL_IN_PROGRESS = new CallReturn(
-      State.ASYNC_CALL_IN_PROGRESS);
-  static final CallReturn ASYNC_INVOKED = new CallReturn(State.ASYNC_INVOKED);
-  static final CallReturn RETRY = new CallReturn(State.RETRY);
-
-  private final Object returnValue;
-  private final Throwable thrown;
-  private final State state;
-
-  CallReturn(Object r) {
-    this(r, null, State.RETURNED);
-  }
-  CallReturn(Throwable t) {
-    this(null, t, State.EXCEPTION);
-    Preconditions.checkNotNull(t);
-  }
-  private CallReturn(State s) {
-    this(null, null, s);
-  }
-  private CallReturn(Object r, Throwable t, State s) {
-    Preconditions.checkArgument(r == null || t == null);
-    returnValue = r;
-    thrown = t;
-    state = s;
-  }
-
-  State getState() {
-    return state;
-  }
-
-  Object getReturnValue() throws Throwable {
-    if (state == State.EXCEPTION) {
-      throw thrown;
-    }
-    Preconditions.checkState(state == State.RETURNED, "state == %s", state);
-    return returnValue;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5360da8b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java
index f2b2c99..300d0c2 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java
@@ -42,83 +42,11 @@ import java.util.Map;
 public class RetryInvocationHandler<T> implements RpcInvocationHandler {
   public static final Log LOG = LogFactory.getLog(RetryInvocationHandler.class);
 
-  static class Call {
-    private final Method method;
-    private final Object[] args;
-    private final boolean isRpc;
-    private final int callId;
-    final Counters counters;
-
-    private final RetryPolicy retryPolicy;
-    private final RetryInvocationHandler<?> retryInvocationHandler;
-
-    Call(Method method, Object[] args, boolean isRpc, int callId,
-         Counters counters, RetryInvocationHandler<?> retryInvocationHandler) {
-      this.method = method;
-      this.args = args;
-      this.isRpc = isRpc;
-      this.callId = callId;
-      this.counters = counters;
-
-      this.retryPolicy = retryInvocationHandler.getRetryPolicy(method);
-      this.retryInvocationHandler = retryInvocationHandler;
-    }
-
-    /** Invoke the call once without retrying. */
-    synchronized CallReturn invokeOnce() {
-      try {
-        // The number of times this invocation handler has ever been failed over
-        // before this method invocation attempt. Used to prevent concurrent
-        // failed method invocations from triggering multiple failover attempts.
-        final long failoverCount = retryInvocationHandler.getFailoverCount();
-        try {
-          return invoke();
-        } catch (Exception e) {
-          if (LOG.isTraceEnabled()) {
-            LOG.trace(this, e);
-          }
-          if (Thread.currentThread().isInterrupted()) {
-            // If interrupted, do not retry.
-            throw e;
-          }
-          retryInvocationHandler.handleException(
-              method, retryPolicy, failoverCount, counters, e);
-          return CallReturn.RETRY;
-        }
-      } catch(Throwable t) {
-        return new CallReturn(t);
-      }
-    }
-
-    CallReturn invoke() throws Throwable {
-      return new CallReturn(invokeMethod());
-    }
-
-    Object invokeMethod() throws Throwable {
-      if (isRpc) {
-        Client.setCallIdAndRetryCount(callId, counters.retries);
-      }
-      return retryInvocationHandler.invokeMethod(method, args);
-    }
-
-    @Override
-    public String toString() {
-      return getClass().getSimpleName() + "#" + callId + ": "
-          + method.getDeclaringClass().getSimpleName() + "." + method.getName()
-          + "(" + (args == null || args.length == 0? "": Arrays.toString(args))
-          +  ")";
-    }
-  }
-
-  static class Counters {
+  private static class Counters {
     /** Counter for retries. */
     private int retries;
     /** Counter for method invocation has been failed over. */
     private int failovers;
-
-    boolean isZeros() {
-      return retries == 0 && failovers == 0;
-    }
   }
 
   private static class ProxyDescriptor<T> {
@@ -216,13 +144,11 @@ public class RetryInvocationHandler<T> implements RpcInvocationHandler {
 
   private final ProxyDescriptor<T> proxyDescriptor;
 
-  private volatile boolean hasSuccessfulCall = false;
-
+  private volatile boolean hasMadeASuccessfulCall = false;
+  
   private final RetryPolicy defaultPolicy;
   private final Map<String,RetryPolicy> methodNameToPolicyMap;
 
-  private final AsyncCallHandler asyncCallHandler = new AsyncCallHandler();
-
   protected RetryInvocationHandler(FailoverProxyProvider<T> proxyProvider,
       RetryPolicy retryPolicy) {
     this(proxyProvider, retryPolicy, Collections.<String, RetryPolicy>emptyMap());
@@ -241,35 +167,38 @@ public class RetryInvocationHandler<T> implements RpcInvocationHandler {
     return policy != null? policy: defaultPolicy;
   }
 
-  private long getFailoverCount() {
-    return proxyDescriptor.getFailoverCount();
-  }
-
-  private Call newCall(Method method, Object[] args, boolean isRpc, int callId,
-                       Counters counters) {
-    if (Client.isAsynchronousMode()) {
-      return asyncCallHandler.newAsyncCall(method, args, isRpc, callId,
-          counters, this);
-    } else {
-      return new Call(method, args, isRpc, callId, counters, this);
-    }
-  }
-
   @Override
   public Object invoke(Object proxy, Method method, Object[] args)
       throws Throwable {
     final boolean isRpc = isRpcInvocation(proxyDescriptor.getProxy());
     final int callId = isRpc? Client.nextCallId(): RpcConstants.INVALID_CALL_ID;
-    final Counters counters = new Counters();
+    return invoke(method, args, isRpc, callId, new Counters());
+  }
+
+  private Object invoke(final Method method, final Object[] args,
+      final boolean isRpc, final int callId, final Counters counters)
+      throws Throwable {
+    final RetryPolicy policy = getRetryPolicy(method);
 
-    final Call call = newCall(method, args, isRpc, callId, counters);
     while (true) {
-      final CallReturn c = call.invokeOnce();
-      final CallReturn.State state = c.getState();
-      if (state == CallReturn.State.ASYNC_INVOKED) {
-        return null; // return null for async calls
-      } else if (c.getState() != CallReturn.State.RETRY) {
-        return c.getReturnValue();
+      // The number of times this invocation handler has ever been failed over,
+      // before this method invocation attempt. Used to prevent concurrent
+      // failed method invocations from triggering multiple failover attempts.
+      final long failoverCount = proxyDescriptor.getFailoverCount();
+
+      if (isRpc) {
+        Client.setCallIdAndRetryCount(callId, counters.retries);
+      }
+      try {
+        final Object ret = invokeMethod(method, args);
+        hasMadeASuccessfulCall = true;
+        return ret;
+      } catch (Exception ex) {
+        if (Thread.currentThread().isInterrupted()) {
+          // If interrupted, do not retry.
+          throw ex;
+        }
+        handleException(method, policy, failoverCount, counters, ex);
       }
     }
   }
@@ -310,8 +239,7 @@ public class RetryInvocationHandler<T> implements RpcInvocationHandler {
       final int failovers, final long delay, final Exception ex) {
     // log info if this has made some successful calls or
     // this is not the first failover
-    final boolean info = hasSuccessfulCall || failovers != 0
-        || asyncCallHandler.hasSuccessfulCall();
+    final boolean info = hasMadeASuccessfulCall || failovers != 0;
     if (!info && !LOG.isDebugEnabled()) {
       return;
     }
@@ -337,9 +265,7 @@ public class RetryInvocationHandler<T> implements RpcInvocationHandler {
       if (!method.isAccessible()) {
         method.setAccessible(true);
       }
-      final Object r = method.invoke(proxyDescriptor.getProxy(), args);
-      hasSuccessfulCall = true;
-      return r;
+      return method.invoke(proxyDescriptor.getProxy(), args);
     } catch (InvocationTargetException e) {
       throw e.getCause();
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5360da8b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicies.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicies.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicies.java
index c0a14b7..131aa8f 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicies.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicies.java
@@ -17,7 +17,6 @@
  */
 package org.apache.hadoop.io.retry;
 
-import java.io.EOFException;
 import java.io.IOException;
 import java.net.ConnectException;
 import java.net.NoRouteToHostException;
@@ -648,9 +647,8 @@ public class RetryPolicies {
         return new RetryAction(RetryAction.RetryDecision.FAIL, 0, "retries ("
             + retries + ") exceeded maximum allowed (" + maxRetries + ")");
       }
-
+      
       if (e instanceof ConnectException ||
-          e instanceof EOFException ||
           e instanceof NoRouteToHostException ||
           e instanceof UnknownHostException ||
           e instanceof StandbyException ||

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5360da8b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
index ed8d905..d1d5b17 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
@@ -58,6 +58,7 @@ import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Time;
 import org.apache.hadoop.util.concurrent.AsyncGet;
+import org.apache.hadoop.util.concurrent.AsyncGetFuture;
 import org.apache.htrace.core.Span;
 import org.apache.htrace.core.Tracer;
 
@@ -93,8 +94,8 @@ public class Client implements AutoCloseable {
 
   private static final ThreadLocal<Integer> callId = new ThreadLocal<Integer>();
   private static final ThreadLocal<Integer> retryCount = new ThreadLocal<Integer>();
-  private static final ThreadLocal<AsyncGet<? extends Writable, IOException>>
-      ASYNC_RPC_RESPONSE = new ThreadLocal<>();
+  private static final ThreadLocal<Future<?>> ASYNC_RPC_RESPONSE
+      = new ThreadLocal<>();
   private static final ThreadLocal<Boolean> asynchronousMode =
       new ThreadLocal<Boolean>() {
         @Override
@@ -105,9 +106,8 @@ public class Client implements AutoCloseable {
 
   @SuppressWarnings("unchecked")
   @Unstable
-  public static <T extends Writable> AsyncGet<T, IOException>
-      getAsyncRpcResponse() {
-    return (AsyncGet<T, IOException>) ASYNC_RPC_RESPONSE.get();
+  public static <T> Future<T> getAsyncRpcResponse() {
+    return (Future<T>) ASYNC_RPC_RESPONSE.get();
   }
 
   /** Set call id and retry count for the next call. */
@@ -1413,16 +1413,9 @@ public class Client implements AutoCloseable {
             }
           }
         }
-
-        @Override
-        public boolean isDone() {
-          synchronized (call) {
-            return call.done;
-          }
-        }
       };
 
-      ASYNC_RPC_RESPONSE.set(asyncGet);
+      ASYNC_RPC_RESPONSE.set(new AsyncGetFuture<>(asyncGet));
       return null;
     } else {
       return getRpcResponse(call, connection, -1, null);
@@ -1467,8 +1460,10 @@ public class Client implements AutoCloseable {
     synchronized (call) {
       while (!call.done) {
         try {
-          AsyncGet.Util.wait(call, timeout, unit);
-          if (timeout >= 0 && !call.done) {
+          final long waitTimeout = AsyncGet.Util.asyncGetTimeout2WaitTimeout(
+              timeout, unit);
+          call.wait(waitTimeout); // wait for the result
+          if (waitTimeout > 0 && !call.done) {
             return null;
           }
         } catch (InterruptedException ie) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5360da8b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
index 315ec67..0f43fc6 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
@@ -54,6 +54,7 @@ import java.lang.reflect.Proxy;
 import java.net.InetSocketAddress;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -255,18 +256,14 @@ public class ProtobufRpcEngine implements RpcEngine {
       }
       
       if (Client.isAsynchronousMode()) {
-        final AsyncGet<RpcResponseWrapper, IOException> arr
-            = Client.getAsyncRpcResponse();
+        final Future<RpcResponseWrapper> frrw = Client.getAsyncRpcResponse();
         final AsyncGet<Message, Exception> asyncGet
             = new AsyncGet<Message, Exception>() {
           @Override
           public Message get(long timeout, TimeUnit unit) throws Exception {
-            return getReturnMessage(method, arr.get(timeout, unit));
-          }
-
-          @Override
-          public boolean isDone() {
-            return arr.isDone();
+            final RpcResponseWrapper rrw = timeout < 0?
+                frrw.get(): frrw.get(timeout, unit);
+            return getReturnMessage(method, rrw);
           }
         };
         ASYNC_RETURN_MESSAGE.set(asyncGet);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5360da8b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/AsyncGet.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/AsyncGet.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/AsyncGet.java
index f124890..5eac869 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/AsyncGet.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/AsyncGet.java
@@ -47,19 +47,14 @@ public interface AsyncGet<R, E extends Throwable> {
   R get(long timeout, TimeUnit unit)
       throws E, TimeoutException, InterruptedException;
 
-  /** @return true if the underlying computation is done; false, otherwise. */
-  boolean isDone();
-
   /** Utility */
   class Util {
-    /** Use {@link #get(long, TimeUnit)} timeout parameters to wait. */
-    public static void wait(Object obj, long timeout, TimeUnit unit)
-        throws InterruptedException {
-      if (timeout < 0) {
-        obj.wait();
-      } else if (timeout > 0) {
-        obj.wait(unit.toMillis(timeout));
-      }
+    /**
+     * @return {@link Object#wait(long)} timeout converted
+     *         from {@link #get(long, TimeUnit)} timeout.
+     */
+    public static long asyncGetTimeout2WaitTimeout(long timeout, TimeUnit unit){
+      return timeout < 0? 0: timeout == 0? 1:unit.toMillis(timeout);
     }
   }
 }
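
The restored Util maps an AsyncGet timeout onto an Object.wait(long) argument. Because wait(0) means "wait forever", a zero timeout is bumped to 1 ms and a negative timeout becomes 0 (wait indefinitely); the Client.getRpcResponse change above relies on exactly this. A quick illustration of the mapping (the values are examples only):

    AsyncGet.Util.asyncGetTimeout2WaitTimeout(-1, TimeUnit.SECONDS); // -> 0      wait indefinitely
    AsyncGet.Util.asyncGetTimeout2WaitTimeout( 0, TimeUnit.SECONDS); // -> 1      avoid wait(0) == forever
    AsyncGet.Util.asyncGetTimeout2WaitTimeout(10, TimeUnit.SECONDS); // -> 10000  plain millisecond conversion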

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5360da8b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java
index 4450c0c..0ad191b 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java
@@ -30,7 +30,6 @@ import org.apache.hadoop.ipc.TestIPC.TestServer;
 import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.util.concurrent.AsyncGetFuture;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -51,11 +50,6 @@ public class TestAsyncIPC {
   private static Configuration conf;
   private static final Log LOG = LogFactory.getLog(TestAsyncIPC.class);
 
-  static <T extends Writable> AsyncGetFuture<T, IOException>
-      getAsyncRpcResponseFuture() {
-    return new AsyncGetFuture<>(Client.getAsyncRpcResponse());
-  }
-
   @Before
   public void setupConf() {
     conf = new Configuration();
@@ -90,7 +84,7 @@ public class TestAsyncIPC {
         try {
           final long param = TestIPC.RANDOM.nextLong();
           TestIPC.call(client, param, server, conf);
-          returnFutures.put(i, getAsyncRpcResponseFuture());
+          returnFutures.put(i, Client.getAsyncRpcResponse());
           expectedValues.put(i, param);
         } catch (Exception e) {
           failed = true;
@@ -210,7 +204,7 @@ public class TestAsyncIPC {
 
     private void doCall(final int idx, final long param) throws IOException {
       TestIPC.call(client, param, server, conf);
-      returnFutures.put(idx, getAsyncRpcResponseFuture());
+      returnFutures.put(idx, Client.getAsyncRpcResponse());
       expectedValues.put(idx, param);
     }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5360da8b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java
index 824336a..29bac2a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java
@@ -29,7 +29,7 @@ import org.apache.hadoop.fs.permission.AclEntry;
 import org.apache.hadoop.fs.permission.AclStatus;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DFSOpsCountStatistics.OpType;
-import org.apache.hadoop.io.retry.AsyncCallHandler;
+import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB;
 import org.apache.hadoop.util.concurrent.AsyncGetFuture;
 import org.apache.hadoop.ipc.Client;
 
@@ -51,8 +51,9 @@ public class AsyncDistributedFileSystem {
     this.dfs = dfs;
   }
 
-  private static <T> Future<T> getReturnValue() {
-    return new AsyncGetFuture<>(AsyncCallHandler.getAsyncReturn());
+  static <T> Future<T> getReturnValue() {
+    return new AsyncGetFuture<>(
+        ClientNamenodeProtocolTranslatorPB.getAsyncReturnValue());
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5360da8b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
index bcf5269..2373da7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
@@ -29,6 +29,7 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.crypto.CryptoProtocolVersion;
 import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedEntries;
 import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedListEntries;
@@ -183,7 +184,6 @@ import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifie
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
 import org.apache.hadoop.io.EnumSetWritable;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.retry.AsyncCallHandler;
 import org.apache.hadoop.ipc.Client;
 import org.apache.hadoop.ipc.ProtobufHelper;
 import org.apache.hadoop.ipc.ProtobufRpcEngine;
@@ -212,6 +212,8 @@ import org.apache.hadoop.util.concurrent.AsyncGet;
 public class ClientNamenodeProtocolTranslatorPB implements
     ProtocolMetaInterface, ClientProtocol, Closeable, ProtocolTranslator {
   final private ClientNamenodeProtocolPB rpcProxy;
+  private static final ThreadLocal<AsyncGet<?, Exception>>
+      ASYNC_RETURN_VALUE = new ThreadLocal<>();
 
   static final GetServerDefaultsRequestProto VOID_GET_SERVER_DEFAULT_REQUEST =
       GetServerDefaultsRequestProto.newBuilder().build();
@@ -245,6 +247,12 @@ public class ClientNamenodeProtocolTranslatorPB implements
     rpcProxy = proxy;
   }
 
+  @SuppressWarnings("unchecked")
+  @Unstable
+  public static <T> AsyncGet<T, Exception> getAsyncReturnValue() {
+    return (AsyncGet<T, Exception>) ASYNC_RETURN_VALUE.get();
+  }
+
   @Override
   public void close() {
     RPC.stopProxy(rpcProxy);
@@ -383,13 +391,8 @@ public class ClientNamenodeProtocolTranslatorPB implements
         asyncReturnMessage.get(timeout, unit);
         return null;
       }
-
-      @Override
-      public boolean isDone() {
-        return asyncReturnMessage.isDone();
-      }
     };
-    AsyncCallHandler.setLowerLayerAsyncReturn(asyncGet);
+    ASYNC_RETURN_VALUE.set(asyncGet);
   }
 
   @Override
@@ -1364,20 +1367,17 @@ public class ClientNamenodeProtocolTranslatorPB implements
         rpcProxy.getAclStatus(null, req);
         final AsyncGet<Message, Exception> asyncReturnMessage
             = ProtobufRpcEngine.getAsyncReturnMessage();
-        final AsyncGet<AclStatus, Exception> asyncGet
-            = new AsyncGet<AclStatus, Exception>() {
-          @Override
-          public AclStatus get(long timeout, TimeUnit unit) throws Exception {
-            return PBHelperClient.convert((GetAclStatusResponseProto)
-                asyncReturnMessage.get(timeout, unit));
-          }
-
-          @Override
-          public boolean isDone() {
-            return asyncReturnMessage.isDone();
-          }
-        };
-        AsyncCallHandler.setLowerLayerAsyncReturn(asyncGet);
+        final AsyncGet<AclStatus, Exception> asyncGet =
+            new AsyncGet<AclStatus, Exception>() {
+              @Override
+              public AclStatus get(long timeout, TimeUnit unit)
+                  throws Exception {
+                return PBHelperClient
+                    .convert((GetAclStatusResponseProto) asyncReturnMessage
+                        .get(timeout, unit));
+              }
+            };
+        ASYNC_RETURN_VALUE.set(asyncGet);
         return null;
       } else {
         return PBHelperClient.convert(rpcProxy.getAclStatus(null, req));
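
Together with the AsyncDistributedFileSystem change earlier in this commit, this restores the pre-HADOOP-13226 flow: the translator stashes the per-call AsyncGet in its own thread-local, and AsyncDistributedFileSystem wraps it into an AsyncGetFuture for the caller. A caller-side sketch of that flow, modeled on the usage in the async tests (dfs and the paths are placeholders):

    // Sketch only; dfs is an existing DistributedFileSystem and the paths are illustrative.
    final AsyncDistributedFileSystem adfs = dfs.getAsyncDistributedFileSystem();
    final Future<Void> rename = adfs.rename(new Path("/src"), new Path("/dst"));
    // ... further async calls could be issued here before waiting ...
    rename.get();   // blocks until the namenode has applied the rename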

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5360da8b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFS.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFS.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFS.java
index 6a60290..c7615a9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFS.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFS.java
@@ -55,7 +55,6 @@ import org.apache.hadoop.hdfs.TestDFSPermission.PermissionGenerator;
 import org.apache.hadoop.hdfs.server.namenode.AclTestHelpers;
 import org.apache.hadoop.hdfs.server.namenode.FSAclBaseTest;
 import org.apache.hadoop.ipc.AsyncCallLimitExceededException;
-import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.Time;
 import org.junit.After;
@@ -71,7 +70,7 @@ public class TestAsyncDFS {
   public static final Log LOG = LogFactory.getLog(TestAsyncDFS.class);
   private final short replFactor = 1;
   private final long blockSize = 512;
-  private long fileLen = 0;
+  private long fileLen = blockSize * 3;
   private final long seed = Time.now();
   private final Random r = new Random(seed);
   private final PermissionGenerator permGenerator = new PermissionGenerator(r);
@@ -81,7 +80,7 @@ public class TestAsyncDFS {
 
   private Configuration conf;
   private MiniDFSCluster cluster;
-  private DistributedFileSystem fs;
+  private FileSystem fs;
   private AsyncDistributedFileSystem adfs;
 
   @Before
@@ -96,10 +95,10 @@ public class TestAsyncDFS {
         ASYNC_CALL_LIMIT);
     // set server handlers
     conf.setInt(DFSConfigKeys.DFS_NAMENODE_HANDLER_COUNT_KEY, NUM_NN_HANDLER);
-    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).build();
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
     cluster.waitActive();
-    fs = cluster.getFileSystem();
-    adfs = fs.getAsyncDistributedFileSystem();
+    fs = FileSystem.get(conf);
+    adfs = cluster.getFileSystem().getAsyncDistributedFileSystem();
   }
 
   @After
@@ -114,6 +113,31 @@ public class TestAsyncDFS {
     }
   }
 
+  static class AclQueueEntry {
+    private final Object future;
+    private final Path path;
+    private final Boolean isSetAcl;
+
+    AclQueueEntry(final Object future, final Path path,
+        final Boolean isSetAcl) {
+      this.future = future;
+      this.path = path;
+      this.isSetAcl = isSetAcl;
+    }
+
+    public final Object getFuture() {
+      return future;
+    }
+
+    public final Path getPath() {
+      return path;
+    }
+
+    public final Boolean isSetAcl() {
+      return this.isSetAcl;
+    }
+  }
+
   @Test(timeout=60000)
   public void testBatchAsyncAcl() throws Exception {
     final String basePath = "testBatchAsyncAcl";
@@ -324,7 +348,7 @@ public class TestAsyncDFS {
 
   public static void checkPermissionDenied(final Exception e, final Path dir,
       final String user) {
-    assertTrue(e.getCause() instanceof RemoteException);
+    assertTrue(e.getCause() instanceof ExecutionException);
     assertTrue("Permission denied messages must carry AccessControlException",
         e.getMessage().contains("AccessControlException"));
     assertTrue("Permission denied messages must carry the username", e
@@ -446,9 +470,4 @@ public class TestAsyncDFS {
       assertTrue("group2".equals(fs.getFileStatus(dsts[i]).getGroup()));
     }
   }
-
-  @Test
-  public void testAsyncWithoutRetry() throws Exception {
-    TestAsyncHDFSWithHA.runTestAsyncWithoutRetry(conf, cluster, fs);
-  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5360da8b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncHDFSWithHA.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncHDFSWithHA.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncHDFSWithHA.java
deleted file mode 100644
index 9ade8ec..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncHDFSWithHA.java
+++ /dev/null
@@ -1,181 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.protocol.ClientProtocol;
-import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
-import org.apache.hadoop.io.retry.AsyncCallHandler;
-import org.apache.hadoop.io.retry.RetryInvocationHandler;
-import org.apache.hadoop.ipc.Client;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.test.GenericTestUtils;
-import org.apache.hadoop.util.concurrent.AsyncGetFuture;
-import org.apache.log4j.Level;
-import org.junit.Assert;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-
-/** Test async methods with HA setup. */
-public class TestAsyncHDFSWithHA {
-  static final Logger LOG = LoggerFactory.getLogger(TestAsyncHDFSWithHA.class);
-  static {
-    GenericTestUtils.setLogLevel(RetryInvocationHandler.LOG, Level.ALL);
-  }
-
-  private static <T> Future<T> getReturnValue() {
-    return new AsyncGetFuture<>(AsyncCallHandler.getAsyncReturn());
-  }
-
-  static void mkdirs(DistributedFileSystem dfs, String dir, Path[] srcs,
-                     Path[] dsts) throws IOException {
-    for (int i = 0; i < srcs.length; i++) {
-      srcs[i] = new Path(dir, "src" + i);
-      dsts[i] = new Path(dir, "dst" + i);
-      dfs.mkdirs(srcs[i]);
-    }
-  }
-
-  static void runTestAsyncWithoutRetry(Configuration conf,
-      MiniDFSCluster cluster, DistributedFileSystem dfs) throws Exception {
-    final int num = 5;
-
-    final String renameDir = "/testAsyncWithoutRetry/";
-    final Path[] srcs = new Path[num + 1];
-    final Path[] dsts = new Path[num + 1];
-    mkdirs(dfs, renameDir, srcs, dsts);
-
-    // create a proxy without retry.
-    final NameNodeProxiesClient.ProxyAndInfo<ClientProtocol> proxyInfo
-        = NameNodeProxies.createNonHAProxy(conf,
-        cluster.getNameNode(0).getNameNodeAddress(),
-        ClientProtocol.class, UserGroupInformation.getCurrentUser(),
-        false);
-    final ClientProtocol cp = proxyInfo.getProxy();
-
-    // submit async calls
-    Client.setAsynchronousMode(true);
-    final List<Future<Void>> results = new ArrayList<>();
-    for (int i = 0; i < num; i++) {
-      final String src = srcs[i].toString();
-      final String dst = dsts[i].toString();
-      LOG.info(i + ") rename " + src + " -> " + dst);
-      cp.rename2(src, dst);
-      results.add(getReturnValue());
-    }
-    Client.setAsynchronousMode(false);
-
-    // wait for the async calls
-    for (Future<Void> f : results) {
-      f.get();
-    }
-
-    //check results
-    for (int i = 0; i < num; i++) {
-      Assert.assertEquals(false, dfs.exists(srcs[i]));
-      Assert.assertEquals(true, dfs.exists(dsts[i]));
-    }
-  }
-
-  /** Testing HDFS async methods with HA setup. */
-  @Test(timeout = 120000)
-  public void testAsyncWithHAFailover() throws Exception {
-    final int num = 10;
-
-    final Configuration conf = new HdfsConfiguration();
-    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
-        .nnTopology(MiniDFSNNTopology.simpleHATopology())
-        .numDataNodes(0).build();
-
-    try {
-      cluster.waitActive();
-      cluster.transitionToActive(0);
-
-      final DistributedFileSystem dfs = HATestUtil.configureFailoverFs(
-          cluster, conf);
-      runTestAsyncWithoutRetry(conf, cluster, dfs);
-
-      final String renameDir = "/testAsyncWithHAFailover/";
-      final Path[] srcs = new Path[num + 1];
-      final Path[] dsts = new Path[num + 1];
-      mkdirs(dfs, renameDir, srcs, dsts);
-
-      // submit async calls and trigger failover in the middle.
-      final AsyncDistributedFileSystem adfs
-          = dfs.getAsyncDistributedFileSystem();
-      final ExecutorService executor = Executors.newFixedThreadPool(num + 1);
-
-      final List<Future<Void>> results = new ArrayList<>();
-      final List<IOException> exceptions = new ArrayList<>();
-      final List<Future<?>> futures = new ArrayList<>();
-      final int half = num/2;
-      for(int i = 0; i <= num; i++) {
-        final int id = i;
-        futures.add(executor.submit(new Runnable() {
-          @Override
-          public void run() {
-            try {
-              if (id == half) {
-                // failover
-                cluster.shutdownNameNode(0);
-                cluster.transitionToActive(1);
-              } else {
-                // rename
-                results.add(adfs.rename(srcs[id], dsts[id]));
-              }
-            } catch (IOException e) {
-              exceptions.add(e);
-            }
-          }
-        }));
-      }
-
-      // wait for the tasks
-      Assert.assertEquals(num + 1, futures.size());
-      for(int i = 0; i <= num; i++) {
-        futures.get(i).get();
-      }
-      // wait for the async calls
-      Assert.assertEquals(num, results.size());
-      Assert.assertTrue(exceptions.isEmpty());
-      for(Future<Void> r : results) {
-        r.get();
-      }
-
-      // check results
-      for(int i = 0; i <= num; i++) {
-        final boolean renamed = i != half;
-        Assert.assertEquals(!renamed, dfs.exists(srcs[i]));
-        Assert.assertEquals(renamed, dfs.exists(dsts[i]));
-      }
-    } finally {
-      if (cluster != null) {
-        cluster.shutdown();
-      }
-    }
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5360da8b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java
index 169bbee..42cf3d4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java
@@ -38,7 +38,6 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
-import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
@@ -136,8 +135,7 @@ public abstract class HATestUtil {
   }
   
   /** Gets the filesystem instance by setting the failover configurations */
-  public static DistributedFileSystem configureFailoverFs(
-      MiniDFSCluster cluster, Configuration conf)
+  public static FileSystem configureFailoverFs(MiniDFSCluster cluster, Configuration conf)
       throws IOException, URISyntaxException {
     return configureFailoverFs(cluster, conf, 0);
   }
@@ -149,14 +147,13 @@ public abstract class HATestUtil {
    * @param nsIndex namespace index starting with zero
    * @throws IOException if an error occurs rolling the edit log
    */
-  public static DistributedFileSystem configureFailoverFs(
-      MiniDFSCluster cluster, Configuration conf,
+  public static FileSystem configureFailoverFs(MiniDFSCluster cluster, Configuration conf,
       int nsIndex) throws IOException, URISyntaxException {
     conf = new Configuration(conf);
     String logicalName = getLogicalHostname(cluster);
     setFailoverConfigurations(cluster, conf, logicalName, nsIndex);
     FileSystem fs = FileSystem.get(new URI("hdfs://" + logicalName), conf);
-    return (DistributedFileSystem)fs;
+    return fs;
   }
   
   public static void setFailoverConfigurations(MiniDFSCluster cluster,


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org