You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by wa...@apache.org on 2016/06/04 01:23:15 UTC
[5/8] hadoop git commit: Revert "HADOOP-13168. Support Future.get with timeout in ipc async calls."
Revert "HADOOP-13168. Support Future.get with timeout in ipc async calls."
This reverts commit 42c22f7e3d6e88bf1115f617f6e803288886d1ac.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/e4450d47
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/e4450d47
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/e4450d47
Branch: refs/heads/trunk
Commit: e4450d47f19131818e1c040b6bd8d85ae8250475
Parents: b82c74b
Author: Andrew Wang <wa...@apache.org>
Authored: Fri Jun 3 18:09:16 2016 -0700
Committer: Andrew Wang <wa...@apache.org>
Committed: Fri Jun 3 18:09:16 2016 -0700
----------------------------------------------------------------------
.../main/java/org/apache/hadoop/ipc/Client.java | 119 ++++++++++--------
.../apache/hadoop/ipc/ProtobufRpcEngine.java | 62 +++++-----
.../apache/hadoop/util/concurrent/AsyncGet.java | 60 ---------
.../hadoop/util/concurrent/AsyncGetFuture.java | 73 -----------
.../org/apache/hadoop/ipc/TestAsyncIPC.java | 124 ++++++++-----------
.../hadoop/hdfs/AsyncDistributedFileSystem.java | 24 +++-
.../ClientNamenodeProtocolTranslatorPB.java | 33 +++--
7 files changed, 185 insertions(+), 310 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e4450d47/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
index d1d5b17..9be4649 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
@@ -18,10 +18,46 @@
package org.apache.hadoop.ipc;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import com.google.protobuf.CodedOutputStream;
+import static org.apache.hadoop.ipc.RpcConstants.*;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.FilterInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InterruptedIOException;
+import java.io.OutputStream;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.SocketTimeoutException;
+import java.net.UnknownHostException;
+import java.security.PrivilegedExceptionAction;
+import java.util.Arrays;
+import java.util.Hashtable;
+import java.util.Iterator;
+import java.util.Map.Entry;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.net.SocketFactory;
+import javax.security.sasl.Sasl;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
@@ -57,25 +93,14 @@ import org.apache.hadoop.util.ProtoUtil;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Time;
-import org.apache.hadoop.util.concurrent.AsyncGet;
-import org.apache.hadoop.util.concurrent.AsyncGetFuture;
import org.apache.htrace.core.Span;
import org.apache.htrace.core.Tracer;
-import javax.net.SocketFactory;
-import javax.security.sasl.Sasl;
-import java.io.*;
-import java.net.*;
-import java.security.PrivilegedExceptionAction;
-import java.util.*;
-import java.util.Map.Entry;
-import java.util.concurrent.*;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-
-import static org.apache.hadoop.ipc.RpcConstants.CONNECTION_CONTEXT_CALL_ID;
-import static org.apache.hadoop.ipc.RpcConstants.PING_CALL_ID;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.AbstractFuture;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.google.protobuf.CodedOutputStream;
/** A client for an IPC service. IPC calls take a single {@link Writable} as a
* parameter, and return a {@link Writable} as their value. A service runs on
@@ -94,8 +119,8 @@ public class Client implements AutoCloseable {
private static final ThreadLocal<Integer> callId = new ThreadLocal<Integer>();
private static final ThreadLocal<Integer> retryCount = new ThreadLocal<Integer>();
- private static final ThreadLocal<Future<?>> ASYNC_RPC_RESPONSE
- = new ThreadLocal<>();
+ private static final ThreadLocal<Future<?>>
+ RETURN_RPC_RESPONSE = new ThreadLocal<>();
private static final ThreadLocal<Boolean> asynchronousMode =
new ThreadLocal<Boolean>() {
@Override
@@ -106,8 +131,8 @@ public class Client implements AutoCloseable {
@SuppressWarnings("unchecked")
@Unstable
- public static <T> Future<T> getAsyncRpcResponse() {
- return (Future<T>) ASYNC_RPC_RESPONSE.get();
+ public static <T> Future<T> getReturnRpcResponse() {
+ return (Future<T>) RETURN_RPC_RESPONSE.get();
}
/** Set call id and retry count for the next call. */
@@ -354,11 +379,6 @@ public class Client implements AutoCloseable {
}
}
- @Override
- public String toString() {
- return getClass().getSimpleName() + id;
- }
-
/** Indicate when the call is complete and the
* value or error are available. Notifies by default. */
protected synchronized void callComplete() {
@@ -1393,32 +1413,27 @@ public class Client implements AutoCloseable {
}
if (isAsynchronousMode()) {
- final AsyncGet<Writable, IOException> asyncGet
- = new AsyncGet<Writable, IOException>() {
+ Future<Writable> returnFuture = new AbstractFuture<Writable>() {
+ private final AtomicBoolean callled = new AtomicBoolean(false);
@Override
- public Writable get(long timeout, TimeUnit unit)
- throws IOException, TimeoutException{
- boolean done = true;
- try {
- final Writable w = getRpcResponse(call, connection, timeout, unit);
- if (w == null) {
- done = false;
- throw new TimeoutException(call + " timed out "
- + timeout + " " + unit);
- }
- return w;
- } finally {
- if (done) {
+ public Writable get() throws InterruptedException, ExecutionException {
+ if (callled.compareAndSet(false, true)) {
+ try {
+ set(getRpcResponse(call, connection));
+ } catch (IOException ie) {
+ setException(ie);
+ } finally {
releaseAsyncCall();
}
}
+ return super.get();
}
};
- ASYNC_RPC_RESPONSE.set(new AsyncGetFuture<>(asyncGet));
+ RETURN_RPC_RESPONSE.set(returnFuture);
return null;
} else {
- return getRpcResponse(call, connection, -1, null);
+ return getRpcResponse(call, connection);
}
}
@@ -1454,18 +1469,12 @@ public class Client implements AutoCloseable {
return asyncCallCounter.get();
}
- /** @return the rpc response or, in case of timeout, null. */
- private Writable getRpcResponse(final Call call, final Connection connection,
- final long timeout, final TimeUnit unit) throws IOException {
+ private Writable getRpcResponse(final Call call, final Connection connection)
+ throws IOException {
synchronized (call) {
while (!call.done) {
try {
- final long waitTimeout = AsyncGet.Util.asyncGetTimeout2WaitTimeout(
- timeout, unit);
- call.wait(waitTimeout); // wait for the result
- if (waitTimeout > 0 && !call.done) {
- return null;
- }
+ call.wait(); // wait for the result
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new InterruptedIOException("Call interrupted");
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e4450d47/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
index 0f43fc6..8fcdb78 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
@@ -18,9 +18,21 @@
package org.apache.hadoop.ipc;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.protobuf.*;
-import com.google.protobuf.Descriptors.MethodDescriptor;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.net.InetSocketAddress;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.net.SocketFactory;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
@@ -40,23 +52,17 @@ import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.util.ProtoUtil;
import org.apache.hadoop.util.Time;
-import org.apache.hadoop.util.concurrent.AsyncGet;
import org.apache.htrace.core.TraceScope;
import org.apache.htrace.core.Tracer;
-import javax.net.SocketFactory;
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.lang.reflect.Method;
-import java.lang.reflect.Proxy;
-import java.net.InetSocketAddress;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.protobuf.BlockingService;
+import com.google.protobuf.CodedOutputStream;
+import com.google.protobuf.Descriptors.MethodDescriptor;
+import com.google.protobuf.GeneratedMessage;
+import com.google.protobuf.Message;
+import com.google.protobuf.ServiceException;
+import com.google.protobuf.TextFormat;
/**
* RPC Engine for for protobuf based RPCs.
@@ -64,8 +70,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
@InterfaceStability.Evolving
public class ProtobufRpcEngine implements RpcEngine {
public static final Log LOG = LogFactory.getLog(ProtobufRpcEngine.class);
- private static final ThreadLocal<AsyncGet<Message, Exception>>
- ASYNC_RETURN_MESSAGE = new ThreadLocal<>();
+ private static final ThreadLocal<Callable<?>>
+ RETURN_MESSAGE_CALLBACK = new ThreadLocal<>();
static { // Register the rpcRequest deserializer for WritableRpcEngine
org.apache.hadoop.ipc.Server.registerProtocolEngine(
@@ -75,9 +81,10 @@ public class ProtobufRpcEngine implements RpcEngine {
private static final ClientCache CLIENTS = new ClientCache();
+ @SuppressWarnings("unchecked")
@Unstable
- public static AsyncGet<Message, Exception> getAsyncReturnMessage() {
- return ASYNC_RETURN_MESSAGE.get();
+ public static <T> Callable<T> getReturnMessageCallback() {
+ return (Callable<T>) RETURN_MESSAGE_CALLBACK.get();
}
public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
@@ -256,17 +263,14 @@ public class ProtobufRpcEngine implements RpcEngine {
}
if (Client.isAsynchronousMode()) {
- final Future<RpcResponseWrapper> frrw = Client.getAsyncRpcResponse();
- final AsyncGet<Message, Exception> asyncGet
- = new AsyncGet<Message, Exception>() {
+ final Future<RpcResponseWrapper> frrw = Client.getReturnRpcResponse();
+ Callable<Message> callback = new Callable<Message>() {
@Override
- public Message get(long timeout, TimeUnit unit) throws Exception {
- final RpcResponseWrapper rrw = timeout < 0?
- frrw.get(): frrw.get(timeout, unit);
- return getReturnMessage(method, rrw);
+ public Message call() throws Exception {
+ return getReturnMessage(method, frrw.get());
}
};
- ASYNC_RETURN_MESSAGE.set(asyncGet);
+ RETURN_MESSAGE_CALLBACK.set(callback);
return null;
} else {
return getReturnMessage(method, val);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e4450d47/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/AsyncGet.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/AsyncGet.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/AsyncGet.java
deleted file mode 100644
index 5eac869..0000000
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/AsyncGet.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.util.concurrent;
-
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-/**
- * This interface defines an asynchronous {@link #get(long, TimeUnit)} method.
- *
- * When the return value is still being computed, invoking
- * {@link #get(long, TimeUnit)} will result in a {@link TimeoutException}.
- * The method should be invoked again and again
- * until the underlying computation is completed.
- *
- * @param <R> The type of the return value.
- * @param <E> The exception type that the underlying implementation may throw.
- */
-public interface AsyncGet<R, E extends Throwable> {
- /**
- * Get the result.
- *
- * @param timeout The maximum time period to wait.
- * When timeout == 0, it does not wait at all.
- * When timeout < 0, it waits indefinitely.
- * @param unit The unit of the timeout value
- * @return the result, which is possibly null.
- * @throws E an exception thrown by the underlying implementation.
- * @throws TimeoutException if it cannot return after the given time period.
- * @throws InterruptedException if the thread is interrupted.
- */
- R get(long timeout, TimeUnit unit)
- throws E, TimeoutException, InterruptedException;
-
- /** Utility */
- class Util {
- /**
- * @return {@link Object#wait(long)} timeout converted
- * from {@link #get(long, TimeUnit)} timeout.
- */
- public static long asyncGetTimeout2WaitTimeout(long timeout, TimeUnit unit){
- return timeout < 0? 0: timeout == 0? 1:unit.toMillis(timeout);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e4450d47/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/AsyncGetFuture.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/AsyncGetFuture.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/AsyncGetFuture.java
deleted file mode 100644
index d687867..0000000
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/AsyncGetFuture.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.util.concurrent;
-
-import com.google.common.util.concurrent.AbstractFuture;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-/** A {@link Future} implemented using an {@link AsyncGet} object. */
-public class AsyncGetFuture<T, E extends Throwable> extends AbstractFuture<T> {
- public static final Log LOG = LogFactory.getLog(AsyncGetFuture.class);
-
- private final AtomicBoolean called = new AtomicBoolean(false);
- private final AsyncGet<T, E> asyncGet;
-
- public AsyncGetFuture(AsyncGet<T, E> asyncGet) {
- this.asyncGet = asyncGet;
- }
-
- private void callAsyncGet(long timeout, TimeUnit unit) {
- if (!isCancelled() && called.compareAndSet(false, true)) {
- try {
- set(asyncGet.get(timeout, unit));
- } catch (TimeoutException te) {
- LOG.trace("TRACE", te);
- called.compareAndSet(true, false);
- } catch (Throwable e) {
- LOG.trace("TRACE", e);
- setException(e);
- }
- }
- }
-
- @Override
- public T get() throws InterruptedException, ExecutionException {
- callAsyncGet(-1, TimeUnit.MILLISECONDS);
- return super.get();
- }
-
- @Override
- public T get(long timeout, TimeUnit unit)
- throws InterruptedException, TimeoutException, ExecutionException {
- callAsyncGet(timeout, unit);
- return super.get(0, TimeUnit.MILLISECONDS);
- }
-
- @Override
- public boolean isDone() {
- callAsyncGet(0, TimeUnit.MILLISECONDS);
- return super.isDone();
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e4450d47/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java
index 0ad191b..8ee3a2c 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java
@@ -18,6 +18,20 @@
package org.apache.hadoop.ipc;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -34,17 +48,6 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.*;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-
public class TestAsyncIPC {
private static Configuration conf;
@@ -84,50 +87,26 @@ public class TestAsyncIPC {
try {
final long param = TestIPC.RANDOM.nextLong();
TestIPC.call(client, param, server, conf);
- returnFutures.put(i, Client.getAsyncRpcResponse());
+ Future<LongWritable> returnFuture = Client.getReturnRpcResponse();
+ returnFutures.put(i, returnFuture);
expectedValues.put(i, param);
} catch (Exception e) {
+ LOG.fatal("Caught: " + StringUtils.stringifyException(e));
failed = true;
- throw new RuntimeException(e);
}
}
}
- void assertReturnValues() throws InterruptedException, ExecutionException {
+ public void waitForReturnValues() throws InterruptedException,
+ ExecutionException {
for (int i = 0; i < count; i++) {
LongWritable value = returnFutures.get(i).get();
- Assert.assertEquals("call" + i + " failed.",
- expectedValues.get(i).longValue(), value.get());
- }
- Assert.assertFalse(failed);
- }
-
- void assertReturnValues(long timeout, TimeUnit unit)
- throws InterruptedException, ExecutionException {
- final boolean[] checked = new boolean[count];
- for(boolean done = false; !done;) {
- done = true;
- for (int i = 0; i < count; i++) {
- if (checked[i]) {
- continue;
- } else {
- done = false;
- }
-
- final LongWritable value;
- try {
- value = returnFutures.get(i).get(timeout, unit);
- } catch (TimeoutException e) {
- LOG.info("call" + i + " caught ", e);
- continue;
- }
-
- Assert.assertEquals("call" + i + " failed.",
- expectedValues.get(i).longValue(), value.get());
- checked[i] = true;
+ if (expectedValues.get(i) != value.get()) {
+ LOG.fatal(String.format("Call-%d failed!", i));
+ failed = true;
+ break;
}
}
- Assert.assertFalse(failed);
}
}
@@ -204,7 +183,8 @@ public class TestAsyncIPC {
private void doCall(final int idx, final long param) throws IOException {
TestIPC.call(client, param, server, conf);
- returnFutures.put(idx, Client.getAsyncRpcResponse());
+ Future<LongWritable> returnFuture = Client.getReturnRpcResponse();
+ returnFutures.put(idx, returnFuture);
expectedValues.put(idx, param);
}
@@ -253,7 +233,10 @@ public class TestAsyncIPC {
}
for (int i = 0; i < callerCount; i++) {
callers[i].join();
- callers[i].assertReturnValues();
+ callers[i].waitForReturnValues();
+ String msg = String.format("Expected not failed for caller-%d: %s.", i,
+ callers[i]);
+ assertFalse(msg, callers[i].failed);
}
for (int i = 0; i < clientCount; i++) {
clients[i].stop();
@@ -275,37 +258,25 @@ public class TestAsyncIPC {
try {
AsyncCaller caller = new AsyncCaller(client, addr, callCount);
caller.run();
- caller.assertReturnValues();
- caller.assertReturnValues();
- caller.assertReturnValues();
- Assert.assertEquals(asyncCallCount, client.getAsyncCallCount());
- } finally {
- client.stop();
- server.stop();
- }
- }
- @Test(timeout = 60000)
- public void testFutureGetWithTimeout() throws IOException,
- InterruptedException, ExecutionException {
-// GenericTestUtils.setLogLevel(AsyncGetFuture.LOG, Level.ALL);
- final Server server = new TestIPC.TestServer(10, true, conf);
- final InetSocketAddress addr = NetUtils.getConnectAddress(server);
- server.start();
+ caller.waitForReturnValues();
+ String msg = String.format(
+ "First time, expected not failed for caller: %s.", caller);
+ assertFalse(msg, caller.failed);
- final Client client = new Client(LongWritable.class, conf);
+ caller.waitForReturnValues();
+ assertTrue(asyncCallCount == client.getAsyncCallCount());
+ msg = String.format("Second time, expected not failed for caller: %s.",
+ caller);
+ assertFalse(msg, caller.failed);
- try {
- final AsyncCaller caller = new AsyncCaller(client, addr, 10);
- caller.run();
- caller.assertReturnValues(10, TimeUnit.MILLISECONDS);
+ assertTrue(asyncCallCount == client.getAsyncCallCount());
} finally {
client.stop();
server.stop();
}
}
-
public void internalTestAsyncCallLimit(int handlerCount, boolean handlerSleep,
int clientCount, int callerCount, int callCount) throws IOException,
InterruptedException, ExecutionException {
@@ -396,7 +367,9 @@ public class TestAsyncIPC {
server.start();
final AsyncCaller caller = new AsyncCaller(client, addr, 4);
caller.run();
- caller.assertReturnValues();
+ caller.waitForReturnValues();
+ String msg = String.format("Expected not failed for caller: %s.", caller);
+ assertFalse(msg, caller.failed);
} finally {
client.stop();
server.stop();
@@ -433,7 +406,9 @@ public class TestAsyncIPC {
server.start();
final AsyncCaller caller = new AsyncCaller(client, addr, 10);
caller.run();
- caller.assertReturnValues();
+ caller.waitForReturnValues();
+ String msg = String.format("Expected not failed for caller: %s.", caller);
+ assertFalse(msg, caller.failed);
} finally {
client.stop();
server.stop();
@@ -468,7 +443,9 @@ public class TestAsyncIPC {
server.start();
final AsyncCaller caller = new AsyncCaller(client, addr, 10);
caller.run();
- caller.assertReturnValues();
+ caller.waitForReturnValues();
+ String msg = String.format("Expected not failed for caller: %s.", caller);
+ assertFalse(msg, caller.failed);
} finally {
client.stop();
server.stop();
@@ -512,7 +489,10 @@ public class TestAsyncIPC {
}
for (int i = 0; i < callerCount; ++i) {
callers[i].join();
- callers[i].assertReturnValues();
+ callers[i].waitForReturnValues();
+ String msg = String.format("Expected not failed for caller-%d: %s.", i,
+ callers[i]);
+ assertFalse(msg, callers[i].failed);
}
} finally {
client.stop();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e4450d47/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java
index 6bfd71d..4fe0861 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java
@@ -19,16 +19,20 @@
package org.apache.hadoop.hdfs;
import java.io.IOException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB;
-import org.apache.hadoop.util.concurrent.AsyncGetFuture;
import org.apache.hadoop.ipc.Client;
+import com.google.common.util.concurrent.AbstractFuture;
+
/****************************************************************
* Implementation of the asynchronous distributed file system.
* This instance of this class is the way end-user code interacts
@@ -48,8 +52,22 @@ public class AsyncDistributedFileSystem {
}
static <T> Future<T> getReturnValue() {
- return new AsyncGetFuture<>(
- ClientNamenodeProtocolTranslatorPB.getAsyncReturnValue());
+ final Callable<T> returnValueCallback = ClientNamenodeProtocolTranslatorPB
+ .getReturnValueCallback();
+ Future<T> returnFuture = new AbstractFuture<T>() {
+ private final AtomicBoolean called = new AtomicBoolean(false);
+ public T get() throws InterruptedException, ExecutionException {
+ if (called.compareAndSet(false, true)) {
+ try {
+ set(returnValueCallback.call());
+ } catch (Exception e) {
+ setException(e);
+ }
+ }
+ return super.get();
+ }
+ };
+ return returnFuture;
}
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e4450d47/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
index 939c1ac..faa925c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
@@ -24,8 +24,7 @@ import java.util.EnumSet;
import java.util.List;
import com.google.common.collect.Lists;
-
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.Callable;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
@@ -199,7 +198,6 @@ import org.apache.hadoop.security.token.Token;
import com.google.protobuf.ByteString;
import com.google.protobuf.Message;
import com.google.protobuf.ServiceException;
-import org.apache.hadoop.util.concurrent.AsyncGet;
/**
* This class forwards NN's ClientProtocol calls as RPC calls to the NN server
@@ -211,8 +209,8 @@ import org.apache.hadoop.util.concurrent.AsyncGet;
public class ClientNamenodeProtocolTranslatorPB implements
ProtocolMetaInterface, ClientProtocol, Closeable, ProtocolTranslator {
final private ClientNamenodeProtocolPB rpcProxy;
- private static final ThreadLocal<AsyncGet<?, Exception>>
- ASYNC_RETURN_VALUE = new ThreadLocal<>();
+ private static final ThreadLocal<Callable<?>>
+ RETURN_VALUE_CALLBACK = new ThreadLocal<>();
static final GetServerDefaultsRequestProto VOID_GET_SERVER_DEFAULT_REQUEST =
GetServerDefaultsRequestProto.newBuilder().build();
@@ -248,8 +246,8 @@ public class ClientNamenodeProtocolTranslatorPB implements
@SuppressWarnings("unchecked")
@Unstable
- public static <T> AsyncGet<T, Exception> getAsyncReturnValue() {
- return (AsyncGet<T, Exception>) ASYNC_RETURN_VALUE.get();
+ public static <T> Callable<T> getReturnValueCallback() {
+ return (Callable<T>) RETURN_VALUE_CALLBACK.get();
}
@Override
@@ -371,7 +369,7 @@ public class ClientNamenodeProtocolTranslatorPB implements
try {
if (Client.isAsynchronousMode()) {
rpcProxy.setPermission(null, req);
- setAsyncReturnValue();
+ setReturnValueCallback();
} else {
rpcProxy.setPermission(null, req);
}
@@ -380,18 +378,17 @@ public class ClientNamenodeProtocolTranslatorPB implements
}
}
- private void setAsyncReturnValue() {
- final AsyncGet<Message, Exception> asyncReturnMessage
- = ProtobufRpcEngine.getAsyncReturnMessage();
- final AsyncGet<Void, Exception> asyncGet
- = new AsyncGet<Void, Exception>() {
+ private void setReturnValueCallback() {
+ final Callable<Message> returnMessageCallback = ProtobufRpcEngine
+ .getReturnMessageCallback();
+ Callable<Void> callBack = new Callable<Void>() {
@Override
- public Void get(long timeout, TimeUnit unit) throws Exception {
- asyncReturnMessage.get(timeout, unit);
+ public Void call() throws Exception {
+ returnMessageCallback.call();
return null;
}
};
- ASYNC_RETURN_VALUE.set(asyncGet);
+ RETURN_VALUE_CALLBACK.set(callBack);
}
@Override
@@ -406,7 +403,7 @@ public class ClientNamenodeProtocolTranslatorPB implements
try {
if (Client.isAsynchronousMode()) {
rpcProxy.setOwner(null, req.build());
- setAsyncReturnValue();
+ setReturnValueCallback();
} else {
rpcProxy.setOwner(null, req.build());
}
@@ -539,7 +536,7 @@ public class ClientNamenodeProtocolTranslatorPB implements
try {
if (Client.isAsynchronousMode()) {
rpcProxy.rename2(null, req);
- setAsyncReturnValue();
+ setReturnValueCallback();
} else {
rpcProxy.rename2(null, req);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org