You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by wa...@apache.org on 2016/06/04 01:22:53 UTC
[2/8] hadoop git commit: Revert "HADOOP-13226 Support async call retry and failover."
Revert "HADOOP-13226 Support async call retry and failover."
This reverts commit a8941d7790b2209ac779c9372298b833ededd132.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/886e2396
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/886e2396
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/886e2396
Branch: refs/heads/branch-2.8
Commit: 886e2396062ef076b3ea4aa93395cc90287efd4f
Parents: 6f69113
Author: Andrew Wang <wa...@apache.org>
Authored: Fri Jun 3 18:12:30 2016 -0700
Committer: Andrew Wang <wa...@apache.org>
Committed: Fri Jun 3 18:12:30 2016 -0700
----------------------------------------------------------------------
.../dev-support/findbugsExcludeFile.xml | 8 +-
.../hadoop/io/retry/AsyncCallHandler.java | 321 -------------------
.../org/apache/hadoop/io/retry/CallReturn.java | 75 -----
.../hadoop/io/retry/RetryInvocationHandler.java | 134 ++------
.../apache/hadoop/io/retry/RetryPolicies.java | 4 +-
.../main/java/org/apache/hadoop/ipc/Client.java | 25 +-
.../apache/hadoop/ipc/ProtobufRpcEngine.java | 13 +-
.../apache/hadoop/util/concurrent/AsyncGet.java | 17 +-
.../org/apache/hadoop/ipc/TestAsyncIPC.java | 11 +-
.../hadoop/hdfs/AsyncDistributedFileSystem.java | 9 +-
.../ClientNamenodeProtocolTranslatorPB.java | 42 +--
.../org/apache/hadoop/hdfs/TestAsyncDFS.java | 43 ++-
.../apache/hadoop/hdfs/TestAsyncHDFSWithHA.java | 182 -----------
.../hdfs/server/namenode/ha/HATestUtil.java | 9 +-
14 files changed, 116 insertions(+), 777 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/886e2396/hadoop-common-project/hadoop-common/dev-support/findbugsExcludeFile.xml
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/dev-support/findbugsExcludeFile.xml b/hadoop-common-project/hadoop-common/dev-support/findbugsExcludeFile.xml
index 4bf1762..4a8cbaf 100644
--- a/hadoop-common-project/hadoop-common/dev-support/findbugsExcludeFile.xml
+++ b/hadoop-common-project/hadoop-common/dev-support/findbugsExcludeFile.xml
@@ -363,13 +363,7 @@
<Bug pattern="SF_SWITCH_FALLTHROUGH" />
</Match>
- <!-- WA_NOT_IN_LOOP is invalid in util.concurrent.AsyncGet$Util.wait. -->
- <Match>
- <Class name="org.apache.hadoop.util.concurrent.AsyncGet$Util" />
- <Method name="wait" />
- <Bug pattern="WA_NOT_IN_LOOP" />
- </Match>
-
+ <!-- Synchronization performed on util.concurrent instance. -->
<Match>
<Class name="org.apache.hadoop.service.AbstractService" />
<Method name="stop" />
http://git-wip-us.apache.org/repos/asf/hadoop/blob/886e2396/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/AsyncCallHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/AsyncCallHandler.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/AsyncCallHandler.java
deleted file mode 100644
index 5a03b03..0000000
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/AsyncCallHandler.java
+++ /dev/null
@@ -1,321 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.io.retry;
-
-import com.google.common.base.Preconditions;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.ipc.Client;
-import org.apache.hadoop.util.Daemon;
-import org.apache.hadoop.util.Time;
-import org.apache.hadoop.util.concurrent.AsyncGet;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.lang.reflect.Method;
-import java.util.LinkedList;
-import java.util.Queue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicReference;
-
-/** Handle async calls. */
-@InterfaceAudience.Private
-public class AsyncCallHandler {
- static final Logger LOG = LoggerFactory.getLogger(AsyncCallHandler.class);
-
- private static final ThreadLocal<AsyncGet<?, Exception>>
- LOWER_LAYER_ASYNC_RETURN = new ThreadLocal<>();
- private static final ThreadLocal<AsyncGet<Object, Throwable>>
- ASYNC_RETURN = new ThreadLocal<>();
-
- /** @return the async return value from {@link AsyncCallHandler}. */
- @InterfaceStability.Unstable
- @SuppressWarnings("unchecked")
- public static <R, T extends Throwable> AsyncGet<R, T> getAsyncReturn() {
- final AsyncGet<R, T> asyncGet = (AsyncGet<R, T>)ASYNC_RETURN.get();
- if (asyncGet != null) {
- ASYNC_RETURN.set(null);
- return asyncGet;
- } else {
- return (AsyncGet<R, T>) getLowerLayerAsyncReturn();
- }
- }
-
- /** For the lower rpc layers to set the async return value. */
- @InterfaceStability.Unstable
- public static void setLowerLayerAsyncReturn(
- AsyncGet<?, Exception> asyncReturn) {
- LOWER_LAYER_ASYNC_RETURN.set(asyncReturn);
- }
-
- private static AsyncGet<?, Exception> getLowerLayerAsyncReturn() {
- final AsyncGet<?, Exception> asyncGet = LOWER_LAYER_ASYNC_RETURN.get();
- Preconditions.checkNotNull(asyncGet);
- LOWER_LAYER_ASYNC_RETURN.set(null);
- return asyncGet;
- }
-
- /** A simple concurrent queue which keeping track the empty start time. */
- static class ConcurrentQueue<T> {
- private final Queue<T> queue = new LinkedList<>();
- private long emptyStartTime = Time.monotonicNow();
-
- synchronized int size() {
- return queue.size();
- }
-
- /** Is the queue empty for more than the given time in millisecond? */
- synchronized boolean isEmpty(long time) {
- return queue.isEmpty() && Time.monotonicNow() - emptyStartTime > time;
- }
-
- synchronized void offer(T c) {
- final boolean added = queue.offer(c);
- Preconditions.checkState(added);
- }
-
- synchronized T poll() {
- Preconditions.checkState(!queue.isEmpty());
- final T t = queue.poll();
- if (queue.isEmpty()) {
- emptyStartTime = Time.monotonicNow();
- }
- return t;
- }
- }
-
- /** A queue for handling async calls. */
- static class AsyncCallQueue {
- private final ConcurrentQueue<AsyncCall> queue = new ConcurrentQueue<>();
- private final Processor processor = new Processor();
-
- void addCall(AsyncCall call) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("add " + call);
- }
- queue.offer(call);
- processor.tryStart();
- }
-
- void checkCalls() {
- final int size = queue.size();
- for (int i = 0; i < size; i++) {
- final AsyncCall c = queue.poll();
- if (!c.isDone()) {
- queue.offer(c); // the call is not done yet, add it back.
- }
- }
- }
-
- /** Process the async calls in the queue. */
- private class Processor {
- static final long GRACE_PERIOD = 10*1000L;
- static final long SLEEP_PERIOD = 100L;
-
- private final AtomicReference<Thread> running = new AtomicReference<>();
-
- boolean isRunning(Daemon d) {
- return d == running.get();
- }
-
- void tryStart() {
- final Thread current = Thread.currentThread();
- if (running.compareAndSet(null, current)) {
- final Daemon daemon = new Daemon() {
- @Override
- public void run() {
- for (; isRunning(this);) {
- try {
- Thread.sleep(SLEEP_PERIOD);
- } catch (InterruptedException e) {
- kill(this);
- return;
- }
-
- checkCalls();
- tryStop(this);
- }
- }
- };
-
- final boolean set = running.compareAndSet(current, daemon);
- Preconditions.checkState(set);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Starting AsyncCallQueue.Processor " + daemon);
- }
- daemon.start();
- }
- }
-
- void tryStop(Daemon d) {
- if (queue.isEmpty(GRACE_PERIOD)) {
- kill(d);
- }
- }
-
- void kill(Daemon d) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Killing " + d);
- }
- final boolean set = running.compareAndSet(d, null);
- Preconditions.checkState(set);
- }
- }
- }
-
- static class AsyncValue<V> {
- private V value;
-
- synchronized V waitAsyncValue(long timeout, TimeUnit unit)
- throws InterruptedException, TimeoutException {
- if (value != null) {
- return value;
- }
- AsyncGet.Util.wait(this, timeout, unit);
- if (value != null) {
- return value;
- }
-
- throw new TimeoutException("waitCallReturn timed out "
- + timeout + " " + unit);
- }
-
- synchronized void set(V v) {
- Preconditions.checkNotNull(v);
- Preconditions.checkState(value == null);
- value = v;
- notify();
- }
-
- synchronized boolean isDone() {
- return value != null;
- }
- }
-
- static class AsyncCall extends RetryInvocationHandler.Call {
- private final AsyncCallHandler asyncCallHandler;
-
- private final AsyncValue<CallReturn> asyncCallReturn = new AsyncValue<>();
- private AsyncGet<?, Exception> lowerLayerAsyncGet;
-
- AsyncCall(Method method, Object[] args, boolean isRpc, int callId,
- RetryInvocationHandler.Counters counters,
- RetryInvocationHandler<?> retryInvocationHandler,
- AsyncCallHandler asyncCallHandler) {
- super(method, args, isRpc, callId, counters, retryInvocationHandler);
-
- this.asyncCallHandler = asyncCallHandler;
- }
-
- /** @return true if the call is done; otherwise, return false. */
- boolean isDone() {
- final CallReturn r = invokeOnce();
- switch (r.getState()) {
- case RETURNED:
- case EXCEPTION:
- asyncCallReturn.set(r); // the async call is done
- return true;
- case RETRY:
- invokeOnce();
- break;
- case ASYNC_CALL_IN_PROGRESS:
- case ASYNC_INVOKED:
- // nothing to do
- break;
- default:
- Preconditions.checkState(false);
- }
- return false;
- }
-
- @Override
- CallReturn invoke() throws Throwable {
- LOG.debug("{}.invoke {}", getClass().getSimpleName(), this);
- if (lowerLayerAsyncGet != null) {
- // async call was submitted early, check the lower level async call
- final boolean isDone = lowerLayerAsyncGet.isDone();
- LOG.trace("invoke: lowerLayerAsyncGet.isDone()? {}", isDone);
- if (!isDone) {
- return CallReturn.ASYNC_CALL_IN_PROGRESS;
- }
- try {
- return new CallReturn(lowerLayerAsyncGet.get(0, TimeUnit.SECONDS));
- } finally {
- lowerLayerAsyncGet = null;
- }
- }
-
- // submit a new async call
- LOG.trace("invoke: ASYNC_INVOKED");
- final boolean mode = Client.isAsynchronousMode();
- try {
- Client.setAsynchronousMode(true);
- final Object r = invokeMethod();
- // invokeMethod should set LOWER_LAYER_ASYNC_RETURN and return null.
- Preconditions.checkState(r == null);
- lowerLayerAsyncGet = getLowerLayerAsyncReturn();
-
- if (counters.isZeros()) {
- // first async attempt, initialize
- LOG.trace("invoke: initAsyncCall");
- asyncCallHandler.initAsyncCall(this, asyncCallReturn);
- }
- return CallReturn.ASYNC_INVOKED;
- } finally {
- Client.setAsynchronousMode(mode);
- }
- }
- }
-
- private final AsyncCallQueue asyncCalls = new AsyncCallQueue();
- private volatile boolean hasSuccessfulCall = false;
-
- AsyncCall newAsyncCall(Method method, Object[] args, boolean isRpc,
- int callId, RetryInvocationHandler.Counters counters,
- RetryInvocationHandler<?> retryInvocationHandler) {
- return new AsyncCall(method, args, isRpc, callId, counters,
- retryInvocationHandler, this);
- }
-
- boolean hasSuccessfulCall() {
- return hasSuccessfulCall;
- }
-
- private void initAsyncCall(final AsyncCall asyncCall,
- final AsyncValue<CallReturn> asyncCallReturn) {
- asyncCalls.addCall(asyncCall);
-
- final AsyncGet<Object, Throwable> asyncGet
- = new AsyncGet<Object, Throwable>() {
- @Override
- public Object get(long timeout, TimeUnit unit) throws Throwable {
- final CallReturn c = asyncCallReturn.waitAsyncValue(timeout, unit);
- final Object r = c.getReturnValue();
- hasSuccessfulCall = true;
- return r;
- }
-
- @Override
- public boolean isDone() {
- return asyncCallReturn.isDone();
- }
- };
- ASYNC_RETURN.set(asyncGet);
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/886e2396/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/CallReturn.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/CallReturn.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/CallReturn.java
deleted file mode 100644
index 943725c..0000000
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/CallReturn.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.io.retry;
-
-import com.google.common.base.Preconditions;
-
-/** The call return from a method invocation. */
-class CallReturn {
- /** The return state. */
- enum State {
- /** Call is returned successfully. */
- RETURNED,
- /** Call throws an exception. */
- EXCEPTION,
- /** Call should be retried according to the {@link RetryPolicy}. */
- RETRY,
- /** Call, which is async, is still in progress. */
- ASYNC_CALL_IN_PROGRESS,
- /** Call, which is async, just has been invoked. */
- ASYNC_INVOKED
- }
-
- static final CallReturn ASYNC_CALL_IN_PROGRESS = new CallReturn(
- State.ASYNC_CALL_IN_PROGRESS);
- static final CallReturn ASYNC_INVOKED = new CallReturn(State.ASYNC_INVOKED);
- static final CallReturn RETRY = new CallReturn(State.RETRY);
-
- private final Object returnValue;
- private final Throwable thrown;
- private final State state;
-
- CallReturn(Object r) {
- this(r, null, State.RETURNED);
- }
- CallReturn(Throwable t) {
- this(null, t, State.EXCEPTION);
- Preconditions.checkNotNull(t);
- }
- private CallReturn(State s) {
- this(null, null, s);
- }
- private CallReturn(Object r, Throwable t, State s) {
- Preconditions.checkArgument(r == null || t == null);
- returnValue = r;
- thrown = t;
- state = s;
- }
-
- State getState() {
- return state;
- }
-
- Object getReturnValue() throws Throwable {
- if (state == State.EXCEPTION) {
- throw thrown;
- }
- Preconditions.checkState(state == State.RETURNED, "state == %s", state);
- return returnValue;
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/886e2396/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java
index f2b2c99..300d0c2 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java
@@ -42,83 +42,11 @@ import java.util.Map;
public class RetryInvocationHandler<T> implements RpcInvocationHandler {
public static final Log LOG = LogFactory.getLog(RetryInvocationHandler.class);
- static class Call {
- private final Method method;
- private final Object[] args;
- private final boolean isRpc;
- private final int callId;
- final Counters counters;
-
- private final RetryPolicy retryPolicy;
- private final RetryInvocationHandler<?> retryInvocationHandler;
-
- Call(Method method, Object[] args, boolean isRpc, int callId,
- Counters counters, RetryInvocationHandler<?> retryInvocationHandler) {
- this.method = method;
- this.args = args;
- this.isRpc = isRpc;
- this.callId = callId;
- this.counters = counters;
-
- this.retryPolicy = retryInvocationHandler.getRetryPolicy(method);
- this.retryInvocationHandler = retryInvocationHandler;
- }
-
- /** Invoke the call once without retrying. */
- synchronized CallReturn invokeOnce() {
- try {
- // The number of times this invocation handler has ever been failed over
- // before this method invocation attempt. Used to prevent concurrent
- // failed method invocations from triggering multiple failover attempts.
- final long failoverCount = retryInvocationHandler.getFailoverCount();
- try {
- return invoke();
- } catch (Exception e) {
- if (LOG.isTraceEnabled()) {
- LOG.trace(this, e);
- }
- if (Thread.currentThread().isInterrupted()) {
- // If interrupted, do not retry.
- throw e;
- }
- retryInvocationHandler.handleException(
- method, retryPolicy, failoverCount, counters, e);
- return CallReturn.RETRY;
- }
- } catch(Throwable t) {
- return new CallReturn(t);
- }
- }
-
- CallReturn invoke() throws Throwable {
- return new CallReturn(invokeMethod());
- }
-
- Object invokeMethod() throws Throwable {
- if (isRpc) {
- Client.setCallIdAndRetryCount(callId, counters.retries);
- }
- return retryInvocationHandler.invokeMethod(method, args);
- }
-
- @Override
- public String toString() {
- return getClass().getSimpleName() + "#" + callId + ": "
- + method.getDeclaringClass().getSimpleName() + "." + method.getName()
- + "(" + (args == null || args.length == 0? "": Arrays.toString(args))
- + ")";
- }
- }
-
- static class Counters {
+ private static class Counters {
/** Counter for retries. */
private int retries;
/** Counter for method invocation has been failed over. */
private int failovers;
-
- boolean isZeros() {
- return retries == 0 && failovers == 0;
- }
}
private static class ProxyDescriptor<T> {
@@ -216,13 +144,11 @@ public class RetryInvocationHandler<T> implements RpcInvocationHandler {
private final ProxyDescriptor<T> proxyDescriptor;
- private volatile boolean hasSuccessfulCall = false;
-
+ private volatile boolean hasMadeASuccessfulCall = false;
+
private final RetryPolicy defaultPolicy;
private final Map<String,RetryPolicy> methodNameToPolicyMap;
- private final AsyncCallHandler asyncCallHandler = new AsyncCallHandler();
-
protected RetryInvocationHandler(FailoverProxyProvider<T> proxyProvider,
RetryPolicy retryPolicy) {
this(proxyProvider, retryPolicy, Collections.<String, RetryPolicy>emptyMap());
@@ -241,35 +167,38 @@ public class RetryInvocationHandler<T> implements RpcInvocationHandler {
return policy != null? policy: defaultPolicy;
}
- private long getFailoverCount() {
- return proxyDescriptor.getFailoverCount();
- }
-
- private Call newCall(Method method, Object[] args, boolean isRpc, int callId,
- Counters counters) {
- if (Client.isAsynchronousMode()) {
- return asyncCallHandler.newAsyncCall(method, args, isRpc, callId,
- counters, this);
- } else {
- return new Call(method, args, isRpc, callId, counters, this);
- }
- }
-
@Override
public Object invoke(Object proxy, Method method, Object[] args)
throws Throwable {
final boolean isRpc = isRpcInvocation(proxyDescriptor.getProxy());
final int callId = isRpc? Client.nextCallId(): RpcConstants.INVALID_CALL_ID;
- final Counters counters = new Counters();
+ return invoke(method, args, isRpc, callId, new Counters());
+ }
+
+ private Object invoke(final Method method, final Object[] args,
+ final boolean isRpc, final int callId, final Counters counters)
+ throws Throwable {
+ final RetryPolicy policy = getRetryPolicy(method);
- final Call call = newCall(method, args, isRpc, callId, counters);
while (true) {
- final CallReturn c = call.invokeOnce();
- final CallReturn.State state = c.getState();
- if (state == CallReturn.State.ASYNC_INVOKED) {
- return null; // return null for async calls
- } else if (c.getState() != CallReturn.State.RETRY) {
- return c.getReturnValue();
+ // The number of times this invocation handler has ever been failed over,
+ // before this method invocation attempt. Used to prevent concurrent
+ // failed method invocations from triggering multiple failover attempts.
+ final long failoverCount = proxyDescriptor.getFailoverCount();
+
+ if (isRpc) {
+ Client.setCallIdAndRetryCount(callId, counters.retries);
+ }
+ try {
+ final Object ret = invokeMethod(method, args);
+ hasMadeASuccessfulCall = true;
+ return ret;
+ } catch (Exception ex) {
+ if (Thread.currentThread().isInterrupted()) {
+ // If interrupted, do not retry.
+ throw ex;
+ }
+ handleException(method, policy, failoverCount, counters, ex);
}
}
}
@@ -310,8 +239,7 @@ public class RetryInvocationHandler<T> implements RpcInvocationHandler {
final int failovers, final long delay, final Exception ex) {
// log info if this has made some successful calls or
// this is not the first failover
- final boolean info = hasSuccessfulCall || failovers != 0
- || asyncCallHandler.hasSuccessfulCall();
+ final boolean info = hasMadeASuccessfulCall || failovers != 0;
if (!info && !LOG.isDebugEnabled()) {
return;
}
@@ -337,9 +265,7 @@ public class RetryInvocationHandler<T> implements RpcInvocationHandler {
if (!method.isAccessible()) {
method.setAccessible(true);
}
- final Object r = method.invoke(proxyDescriptor.getProxy(), args);
- hasSuccessfulCall = true;
- return r;
+ return method.invoke(proxyDescriptor.getProxy(), args);
} catch (InvocationTargetException e) {
throw e.getCause();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/886e2396/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicies.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicies.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicies.java
index c0a14b7..131aa8f 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicies.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicies.java
@@ -17,7 +17,6 @@
*/
package org.apache.hadoop.io.retry;
-import java.io.EOFException;
import java.io.IOException;
import java.net.ConnectException;
import java.net.NoRouteToHostException;
@@ -648,9 +647,8 @@ public class RetryPolicies {
return new RetryAction(RetryAction.RetryDecision.FAIL, 0, "retries ("
+ retries + ") exceeded maximum allowed (" + maxRetries + ")");
}
-
+
if (e instanceof ConnectException ||
- e instanceof EOFException ||
e instanceof NoRouteToHostException ||
e instanceof UnknownHostException ||
e instanceof StandbyException ||
http://git-wip-us.apache.org/repos/asf/hadoop/blob/886e2396/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
index 2820c93..23b14e1 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
@@ -58,6 +58,7 @@ import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.concurrent.AsyncGet;
+import org.apache.hadoop.util.concurrent.AsyncGetFuture;
import org.apache.htrace.core.Span;
import org.apache.htrace.core.Tracer;
@@ -93,8 +94,8 @@ public class Client implements AutoCloseable {
private static final ThreadLocal<Integer> callId = new ThreadLocal<Integer>();
private static final ThreadLocal<Integer> retryCount = new ThreadLocal<Integer>();
- private static final ThreadLocal<AsyncGet<? extends Writable, IOException>>
- ASYNC_RPC_RESPONSE = new ThreadLocal<>();
+ private static final ThreadLocal<Future<?>> ASYNC_RPC_RESPONSE
+ = new ThreadLocal<>();
private static final ThreadLocal<Boolean> asynchronousMode =
new ThreadLocal<Boolean>() {
@Override
@@ -105,9 +106,8 @@ public class Client implements AutoCloseable {
@SuppressWarnings("unchecked")
@Unstable
- public static <T extends Writable> AsyncGet<T, IOException>
- getAsyncRpcResponse() {
- return (AsyncGet<T, IOException>) ASYNC_RPC_RESPONSE.get();
+ public static <T> Future<T> getAsyncRpcResponse() {
+ return (Future<T>) ASYNC_RPC_RESPONSE.get();
}
/** Set call id and retry count for the next call. */
@@ -1414,16 +1414,9 @@ public class Client implements AutoCloseable {
}
}
}
-
- @Override
- public boolean isDone() {
- synchronized (call) {
- return call.done;
- }
- }
};
- ASYNC_RPC_RESPONSE.set(asyncGet);
+ ASYNC_RPC_RESPONSE.set(new AsyncGetFuture<>(asyncGet));
return null;
} else {
return getRpcResponse(call, connection, -1, null);
@@ -1468,8 +1461,10 @@ public class Client implements AutoCloseable {
synchronized (call) {
while (!call.done) {
try {
- AsyncGet.Util.wait(call, timeout, unit);
- if (timeout >= 0 && !call.done) {
+ final long waitTimeout = AsyncGet.Util.asyncGetTimeout2WaitTimeout(
+ timeout, unit);
+ call.wait(waitTimeout); // wait for the result
+ if (waitTimeout > 0 && !call.done) {
return null;
}
} catch (InterruptedException ie) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/886e2396/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
index ea629b1..4641a67 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
@@ -54,6 +54,7 @@ import java.lang.reflect.Proxy;
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -255,18 +256,14 @@ public class ProtobufRpcEngine implements RpcEngine {
}
if (Client.isAsynchronousMode()) {
- final AsyncGet<RpcResponseWrapper, IOException> arr
- = Client.getAsyncRpcResponse();
+ final Future<RpcResponseWrapper> frrw = Client.getAsyncRpcResponse();
final AsyncGet<Message, Exception> asyncGet
= new AsyncGet<Message, Exception>() {
@Override
public Message get(long timeout, TimeUnit unit) throws Exception {
- return getReturnMessage(method, arr.get(timeout, unit));
- }
-
- @Override
- public boolean isDone() {
- return arr.isDone();
+ final RpcResponseWrapper rrw = timeout < 0?
+ frrw.get(): frrw.get(timeout, unit);
+ return getReturnMessage(method, rrw);
}
};
ASYNC_RETURN_MESSAGE.set(asyncGet);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/886e2396/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/AsyncGet.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/AsyncGet.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/AsyncGet.java
index f124890..5eac869 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/AsyncGet.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/AsyncGet.java
@@ -47,19 +47,14 @@ public interface AsyncGet<R, E extends Throwable> {
R get(long timeout, TimeUnit unit)
throws E, TimeoutException, InterruptedException;
- /** @return true if the underlying computation is done; false, otherwise. */
- boolean isDone();
-
/** Utility */
class Util {
- /** Use {@link #get(long, TimeUnit)} timeout parameters to wait. */
- public static void wait(Object obj, long timeout, TimeUnit unit)
- throws InterruptedException {
- if (timeout < 0) {
- obj.wait();
- } else if (timeout > 0) {
- obj.wait(unit.toMillis(timeout));
- }
+ /**
+ * @return {@link Object#wait(long)} timeout converted
+ * from {@link #get(long, TimeUnit)} timeout.
+ */
+ public static long asyncGetTimeout2WaitTimeout(long timeout, TimeUnit unit){
+ return timeout < 0? 0: timeout == 0? 1:unit.toMillis(timeout);
}
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/886e2396/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java
index ef27e12..7623975 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java
@@ -30,7 +30,6 @@ import org.apache.hadoop.ipc.TestIPC.TestServer;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.util.concurrent.AsyncGetFuture;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -51,12 +50,6 @@ public class TestAsyncIPC {
private static Configuration conf;
private static final Log LOG = LogFactory.getLog(TestAsyncIPC.class);
- static <T extends Writable> AsyncGetFuture<T, IOException>
- getAsyncRpcResponseFuture() {
- return (AsyncGetFuture<T, IOException>) new AsyncGetFuture<>(
- Client.getAsyncRpcResponse());
- }
-
@Before
public void setupConf() {
conf = new Configuration();
@@ -91,7 +84,7 @@ public class TestAsyncIPC {
try {
final long param = TestIPC.RANDOM.nextLong();
TestIPC.call(client, param, server, conf);
- Future<LongWritable> returnFuture = getAsyncRpcResponseFuture();
+ Future<LongWritable> returnFuture = Client.getAsyncRpcResponse();
returnFutures.put(i, returnFuture);
expectedValues.put(i, param);
} catch (Exception e) {
@@ -212,7 +205,7 @@ public class TestAsyncIPC {
private void doCall(final int idx, final long param) throws IOException {
TestIPC.call(client, param, server, conf);
- Future<LongWritable> returnFuture = getAsyncRpcResponseFuture();
+ Future<LongWritable> returnFuture = Client.getAsyncRpcResponse();
returnFutures.put(idx, returnFuture);
expectedValues.put(idx, param);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/886e2396/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java
index 472b1d4..b507fa5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java
@@ -29,7 +29,8 @@ import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.AclStatus;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSOpsCountStatistics.OpType;
-import org.apache.hadoop.io.retry.AsyncCallHandler;
+import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB;
+import org.apache.hadoop.util.concurrent.AsyncGet;
import org.apache.hadoop.util.concurrent.AsyncGetFuture;
import org.apache.hadoop.ipc.Client;
@@ -51,8 +52,10 @@ public class AsyncDistributedFileSystem {
this.dfs = dfs;
}
- private static <T> Future<T> getReturnValue() {
- return (Future<T>)new AsyncGetFuture<>(AsyncCallHandler.getAsyncReturn());
+ static <T> Future<T> getReturnValue() {
+ final AsyncGet<T, Exception> asyncGet
+ = ClientNamenodeProtocolTranslatorPB.getAsyncReturnValue();
+ return new AsyncGetFuture<>(asyncGet);
}
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/886e2396/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
index dd87b0b..b9dcee5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
@@ -29,6 +29,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.crypto.CryptoProtocolVersion;
import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedEntries;
import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedListEntries;
@@ -175,7 +176,6 @@ import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifie
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
import org.apache.hadoop.io.EnumSetWritable;
import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.retry.AsyncCallHandler;
import org.apache.hadoop.ipc.Client;
import org.apache.hadoop.ipc.ProtobufHelper;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
@@ -204,6 +204,8 @@ import org.apache.hadoop.util.concurrent.AsyncGet;
public class ClientNamenodeProtocolTranslatorPB implements
ProtocolMetaInterface, ClientProtocol, Closeable, ProtocolTranslator {
final private ClientNamenodeProtocolPB rpcProxy;
+ private static final ThreadLocal<AsyncGet<?, Exception>>
+ ASYNC_RETURN_VALUE = new ThreadLocal<>();
static final GetServerDefaultsRequestProto VOID_GET_SERVER_DEFAULT_REQUEST =
GetServerDefaultsRequestProto.newBuilder().build();
@@ -236,6 +238,12 @@ public class ClientNamenodeProtocolTranslatorPB implements
rpcProxy = proxy;
}
+ @SuppressWarnings("unchecked")
+ @Unstable
+ public static <T> AsyncGet<T, Exception> getAsyncReturnValue() {
+ return (AsyncGet<T, Exception>) ASYNC_RETURN_VALUE.get();
+ }
+
@Override
public void close() {
RPC.stopProxy(rpcProxy);
@@ -374,13 +382,8 @@ public class ClientNamenodeProtocolTranslatorPB implements
asyncReturnMessage.get(timeout, unit);
return null;
}
-
- @Override
- public boolean isDone() {
- return asyncReturnMessage.isDone();
- }
};
- AsyncCallHandler.setLowerLayerAsyncReturn(asyncGet);
+ ASYNC_RETURN_VALUE.set(asyncGet);
}
@Override
@@ -1352,20 +1355,17 @@ public class ClientNamenodeProtocolTranslatorPB implements
rpcProxy.getAclStatus(null, req);
final AsyncGet<Message, Exception> asyncReturnMessage
= ProtobufRpcEngine.getAsyncReturnMessage();
- final AsyncGet<AclStatus, Exception> asyncGet
- = new AsyncGet<AclStatus, Exception>() {
- @Override
- public AclStatus get(long timeout, TimeUnit unit) throws Exception {
- return PBHelperClient.convert((GetAclStatusResponseProto)
- asyncReturnMessage.get(timeout, unit));
- }
-
- @Override
- public boolean isDone() {
- return asyncReturnMessage.isDone();
- }
- };
- AsyncCallHandler.setLowerLayerAsyncReturn(asyncGet);
+ final AsyncGet<AclStatus, Exception> asyncGet =
+ new AsyncGet<AclStatus, Exception>() {
+ @Override
+ public AclStatus get(long timeout, TimeUnit unit)
+ throws Exception {
+ return PBHelperClient
+ .convert((GetAclStatusResponseProto) asyncReturnMessage
+ .get(timeout, unit));
+ }
+ };
+ ASYNC_RETURN_VALUE.set(asyncGet);
return null;
} else {
return PBHelperClient.convert(rpcProxy.getAclStatus(null, req));
http://git-wip-us.apache.org/repos/asf/hadoop/blob/886e2396/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFS.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFS.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFS.java
index 6a60290..c7615a9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFS.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFS.java
@@ -55,7 +55,6 @@ import org.apache.hadoop.hdfs.TestDFSPermission.PermissionGenerator;
import org.apache.hadoop.hdfs.server.namenode.AclTestHelpers;
import org.apache.hadoop.hdfs.server.namenode.FSAclBaseTest;
import org.apache.hadoop.ipc.AsyncCallLimitExceededException;
-import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Time;
import org.junit.After;
@@ -71,7 +70,7 @@ public class TestAsyncDFS {
public static final Log LOG = LogFactory.getLog(TestAsyncDFS.class);
private final short replFactor = 1;
private final long blockSize = 512;
- private long fileLen = 0;
+ private long fileLen = blockSize * 3;
private final long seed = Time.now();
private final Random r = new Random(seed);
private final PermissionGenerator permGenerator = new PermissionGenerator(r);
@@ -81,7 +80,7 @@ public class TestAsyncDFS {
private Configuration conf;
private MiniDFSCluster cluster;
- private DistributedFileSystem fs;
+ private FileSystem fs;
private AsyncDistributedFileSystem adfs;
@Before
@@ -96,10 +95,10 @@ public class TestAsyncDFS {
ASYNC_CALL_LIMIT);
// set server handlers
conf.setInt(DFSConfigKeys.DFS_NAMENODE_HANDLER_COUNT_KEY, NUM_NN_HANDLER);
- cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).build();
+ cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
cluster.waitActive();
- fs = cluster.getFileSystem();
- adfs = fs.getAsyncDistributedFileSystem();
+ fs = FileSystem.get(conf);
+ adfs = cluster.getFileSystem().getAsyncDistributedFileSystem();
}
@After
@@ -114,6 +113,31 @@ public class TestAsyncDFS {
}
}
+ static class AclQueueEntry {
+ private final Object future;
+ private final Path path;
+ private final Boolean isSetAcl;
+
+ AclQueueEntry(final Object future, final Path path,
+ final Boolean isSetAcl) {
+ this.future = future;
+ this.path = path;
+ this.isSetAcl = isSetAcl;
+ }
+
+ public final Object getFuture() {
+ return future;
+ }
+
+ public final Path getPath() {
+ return path;
+ }
+
+ public final Boolean isSetAcl() {
+ return this.isSetAcl;
+ }
+ }
+
@Test(timeout=60000)
public void testBatchAsyncAcl() throws Exception {
final String basePath = "testBatchAsyncAcl";
@@ -324,7 +348,7 @@ public class TestAsyncDFS {
public static void checkPermissionDenied(final Exception e, final Path dir,
final String user) {
- assertTrue(e.getCause() instanceof RemoteException);
+ assertTrue(e.getCause() instanceof ExecutionException);
assertTrue("Permission denied messages must carry AccessControlException",
e.getMessage().contains("AccessControlException"));
assertTrue("Permission denied messages must carry the username", e
@@ -446,9 +470,4 @@ public class TestAsyncDFS {
assertTrue("group2".equals(fs.getFileStatus(dsts[i]).getGroup()));
}
}
-
- @Test
- public void testAsyncWithoutRetry() throws Exception {
- TestAsyncHDFSWithHA.runTestAsyncWithoutRetry(conf, cluster, fs);
- }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/886e2396/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncHDFSWithHA.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncHDFSWithHA.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncHDFSWithHA.java
deleted file mode 100644
index 70ca03d..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncHDFSWithHA.java
+++ /dev/null
@@ -1,182 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.protocol.ClientProtocol;
-import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
-import org.apache.hadoop.io.retry.AsyncCallHandler;
-import org.apache.hadoop.io.retry.RetryInvocationHandler;
-import org.apache.hadoop.ipc.Client;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.test.GenericTestUtils;
-import org.apache.hadoop.util.concurrent.AsyncGetFuture;
-import org.apache.log4j.Level;
-import org.junit.Assert;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-
-/** Test async methods with HA setup. */
-public class TestAsyncHDFSWithHA {
- static final Logger LOG = LoggerFactory.getLogger(TestAsyncHDFSWithHA.class);
- static {
- GenericTestUtils.setLogLevel(RetryInvocationHandler.LOG, Level.ALL);
- }
-
- private static <T> Future<T> getReturnValue() {
- return (Future<T>)new AsyncGetFuture<>(AsyncCallHandler.getAsyncReturn());
- }
-
- static void mkdirs(DistributedFileSystem dfs, String dir, Path[] srcs,
- Path[] dsts) throws IOException {
- for (int i = 0; i < srcs.length; i++) {
- srcs[i] = new Path(dir, "src" + i);
- dsts[i] = new Path(dir, "dst" + i);
- dfs.mkdirs(srcs[i]);
- }
- }
-
- static void runTestAsyncWithoutRetry(Configuration conf,
- MiniDFSCluster cluster, DistributedFileSystem dfs) throws Exception {
- final int num = 5;
-
- final String renameDir = "/testAsyncWithoutRetry/";
- final Path[] srcs = new Path[num + 1];
- final Path[] dsts = new Path[num + 1];
- mkdirs(dfs, renameDir, srcs, dsts);
-
- // create a proxy without retry.
- final NameNodeProxiesClient.ProxyAndInfo<ClientProtocol> proxyInfo
- = NameNodeProxies.createNonHAProxy(conf,
- cluster.getNameNode(0).getNameNodeAddress(),
- ClientProtocol.class, UserGroupInformation.getCurrentUser(),
- false);
- final ClientProtocol cp = proxyInfo.getProxy();
-
- // submit async calls
- Client.setAsynchronousMode(true);
- final List<Future<Void>> results = new ArrayList<>();
- for (int i = 0; i < num; i++) {
- final String src = srcs[i].toString();
- final String dst = dsts[i].toString();
- LOG.info(i + ") rename " + src + " -> " + dst);
- cp.rename2(src, dst);
- final Future<Void> returnValue = getReturnValue();
- results.add(returnValue);
- }
- Client.setAsynchronousMode(false);
-
- // wait for the async calls
- for (Future<Void> f : results) {
- f.get();
- }
-
- //check results
- for (int i = 0; i < num; i++) {
- Assert.assertEquals(false, dfs.exists(srcs[i]));
- Assert.assertEquals(true, dfs.exists(dsts[i]));
- }
- }
-
- /** Testing HDFS async methods with HA setup. */
- @Test(timeout = 120000)
- public void testAsyncWithHAFailover() throws Exception {
- final int num = 10;
-
- final Configuration conf = new HdfsConfiguration();
- final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
- .nnTopology(MiniDFSNNTopology.simpleHATopology())
- .numDataNodes(0).build();
-
- try {
- cluster.waitActive();
- cluster.transitionToActive(0);
-
- final DistributedFileSystem dfs = HATestUtil.configureFailoverFs(
- cluster, conf);
- runTestAsyncWithoutRetry(conf, cluster, dfs);
-
- final String renameDir = "/testAsyncWithHAFailover/";
- final Path[] srcs = new Path[num + 1];
- final Path[] dsts = new Path[num + 1];
- mkdirs(dfs, renameDir, srcs, dsts);
-
- // submit async calls and trigger failover in the middle.
- final AsyncDistributedFileSystem adfs
- = dfs.getAsyncDistributedFileSystem();
- final ExecutorService executor = Executors.newFixedThreadPool(num + 1);
-
- final List<Future<Void>> results = new ArrayList<>();
- final List<IOException> exceptions = new ArrayList<>();
- final List<Future<?>> futures = new ArrayList<>();
- final int half = num/2;
- for(int i = 0; i <= num; i++) {
- final int id = i;
- futures.add(executor.submit(new Runnable() {
- @Override
- public void run() {
- try {
- if (id == half) {
- // failover
- cluster.shutdownNameNode(0);
- cluster.transitionToActive(1);
- } else {
- // rename
- results.add(adfs.rename(srcs[id], dsts[id]));
- }
- } catch (IOException e) {
- exceptions.add(e);
- }
- }
- }));
- }
-
- // wait for the tasks
- Assert.assertEquals(num + 1, futures.size());
- for(int i = 0; i <= num; i++) {
- futures.get(i).get();
- }
- // wait for the async calls
- Assert.assertEquals(num, results.size());
- Assert.assertTrue(exceptions.isEmpty());
- for(Future<Void> r : results) {
- r.get();
- }
-
- // check results
- for(int i = 0; i <= num; i++) {
- final boolean renamed = i != half;
- Assert.assertEquals(!renamed, dfs.exists(srcs[i]));
- Assert.assertEquals(renamed, dfs.exists(dsts[i]));
- }
- } finally {
- if (cluster != null) {
- cluster.shutdown();
- }
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/886e2396/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java
index 05e8412..c7c4a77 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java
@@ -33,7 +33,6 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
-import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
@@ -132,8 +131,7 @@ public abstract class HATestUtil {
}
/** Gets the filesystem instance by setting the failover configurations */
- public static DistributedFileSystem configureFailoverFs(
- MiniDFSCluster cluster, Configuration conf)
+ public static FileSystem configureFailoverFs(MiniDFSCluster cluster, Configuration conf)
throws IOException, URISyntaxException {
return configureFailoverFs(cluster, conf, 0);
}
@@ -145,14 +143,13 @@ public abstract class HATestUtil {
* @param nsIndex namespace index starting with zero
* @throws IOException if an error occurs rolling the edit log
*/
- public static DistributedFileSystem configureFailoverFs(
- MiniDFSCluster cluster, Configuration conf,
+ public static FileSystem configureFailoverFs(MiniDFSCluster cluster, Configuration conf,
int nsIndex) throws IOException, URISyntaxException {
conf = new Configuration(conf);
String logicalName = getLogicalHostname(cluster);
setFailoverConfigurations(cluster, conf, logicalName, nsIndex);
FileSystem fs = FileSystem.get(new URI("hdfs://" + logicalName), conf);
- return (DistributedFileSystem)fs;
+ return fs;
}
public static void setFailoverConfigurations(MiniDFSCluster cluster,
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org