You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/10/02 21:58:33 UTC
[28/50] [abbrv] flink git commit: [FLINK-4656] [rpc] Port the existing code to Flink's own future abstraction
http://git-wip-us.apache.org/repos/asf/flink/blob/88a05f5b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
index 01776ed..957453a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
@@ -18,16 +18,13 @@
package org.apache.flink.runtime.rpc;
-import akka.dispatch.ExecutionContexts;
import akka.dispatch.Futures;
-import akka.util.Timeout;
+import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
import org.apache.flink.runtime.util.DirectExecutorService;
import org.apache.flink.util.Preconditions;
-import scala.concurrent.Await;
-import scala.concurrent.ExecutionContext;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
import java.lang.annotation.Annotation;
import java.lang.reflect.InvocationHandler;
@@ -37,6 +34,7 @@ import java.util.BitSet;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -67,8 +65,8 @@ public class TestingSerialRpcService implements RpcService {
}
@Override
- public ExecutionContext getExecutionContext() {
- return ExecutionContexts.fromExecutorService(executorService);
+ public Executor getExecutor() {
+ return executorService;
}
@Override
@@ -94,7 +92,7 @@ public class TestingSerialRpcService implements RpcService {
classLoader,
new Class<?>[]{
rpcEndpoint.getSelfGatewayType(),
- MainThreadExecutor.class,
+ MainThreadExecutable.class,
StartStoppable.class,
RpcGateway.class
},
@@ -114,13 +112,13 @@ public class TestingSerialRpcService implements RpcService {
if (clazz.isAssignableFrom(gateway.getClass())) {
@SuppressWarnings("unchecked")
C typedGateway = (C) gateway;
- return Futures.successful(typedGateway);
+ return FlinkCompletableFuture.completed(typedGateway);
} else {
- return Futures.failed(
+ return FlinkCompletableFuture.completedExceptionally(
new Exception("Gateway registered under " + address + " is not of type " + clazz));
}
} else {
- return Futures.failed(new Exception("No gateway registered under that name"));
+ return FlinkCompletableFuture.completedExceptionally(new Exception("No gateway registered under that name"));
}
}
@@ -141,20 +139,20 @@ public class TestingSerialRpcService implements RpcService {
registeredConnections.clear();
}
- private static final class TestingSerialInvocationHandler<C extends RpcGateway, T extends RpcEndpoint<C>> implements InvocationHandler, RpcGateway, MainThreadExecutor, StartStoppable {
+ private static final class TestingSerialInvocationHandler<C extends RpcGateway, T extends RpcEndpoint<C>> implements InvocationHandler, RpcGateway, MainThreadExecutable, StartStoppable {
private final T rpcEndpoint;
/** default timeout for asks */
- private final Timeout timeout;
+ private final Time timeout;
private final String address;
private TestingSerialInvocationHandler(String address, T rpcEndpoint) {
- this(address, rpcEndpoint, new Timeout(new FiniteDuration(10, TimeUnit.SECONDS)));
+ this(address, rpcEndpoint, Time.seconds(10));
}
- private TestingSerialInvocationHandler(String address, T rpcEndpoint, Timeout timeout) {
+ private TestingSerialInvocationHandler(String address, T rpcEndpoint, Time timeout) {
this.rpcEndpoint = rpcEndpoint;
this.timeout = timeout;
this.address = address;
@@ -163,7 +161,7 @@ public class TestingSerialRpcService implements RpcService {
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
Class<?> declaringClass = method.getDeclaringClass();
- if (declaringClass.equals(MainThreadExecutor.class) ||
+ if (declaringClass.equals(MainThreadExecutable.class) ||
declaringClass.equals(Object.class) || declaringClass.equals(StartStoppable.class) ||
declaringClass.equals(RpcGateway.class)) {
return method.invoke(this, args);
@@ -171,7 +169,7 @@ public class TestingSerialRpcService implements RpcService {
final String methodName = method.getName();
Class<?>[] parameterTypes = method.getParameterTypes();
Annotation[][] parameterAnnotations = method.getParameterAnnotations();
- Timeout futureTimeout = extractRpcTimeout(parameterAnnotations, args, timeout);
+ Time futureTimeout = extractRpcTimeout(parameterAnnotations, args, timeout);
final Tuple2<Class<?>[], Object[]> filteredArguments = filterArguments(
parameterTypes,
@@ -201,13 +199,13 @@ public class TestingSerialRpcService implements RpcService {
private Object handleRpcInvocationSync(final String methodName,
final Class<?>[] parameterTypes,
final Object[] args,
- final Timeout futureTimeout) throws Exception {
+ final Time futureTimeout) throws Exception {
final Method rpcMethod = lookupRpcMethod(methodName, parameterTypes);
Object result = rpcMethod.invoke(rpcEndpoint, args);
if (result instanceof Future) {
Future<?> future = (Future<?>) result;
- return Await.result(future, futureTimeout.duration());
+ return future.get(futureTimeout.getSize(), futureTimeout.getUnit());
} else {
return result;
}
@@ -219,11 +217,11 @@ public class TestingSerialRpcService implements RpcService {
}
@Override
- public <V> Future<V> callAsync(Callable<V> callable, Timeout callTimeout) {
+ public <V> Future<V> callAsync(Callable<V> callable, Time callTimeout) {
try {
- return Futures.successful(callable.call());
+ return FlinkCompletableFuture.completed(callable.call());
} catch (Throwable e) {
- return Futures.failed(e);
+ return FlinkCompletableFuture.completedExceptionally(e);
}
}
@@ -281,18 +279,18 @@ public class TestingSerialRpcService implements RpcService {
* has been found
* @return Timeout extracted from the array of arguments or the default timeout
*/
- private static Timeout extractRpcTimeout(Annotation[][] parameterAnnotations, Object[] args,
- Timeout defaultTimeout) {
+ private static Time extractRpcTimeout(Annotation[][] parameterAnnotations, Object[] args,
+ Time defaultTimeout) {
if (args != null) {
Preconditions.checkArgument(parameterAnnotations.length == args.length);
for (int i = 0; i < parameterAnnotations.length; i++) {
if (isRpcTimeout(parameterAnnotations[i])) {
- if (args[i] instanceof FiniteDuration) {
- return new Timeout((FiniteDuration) args[i]);
+ if (args[i] instanceof Time) {
+ return (Time) args[i];
} else {
throw new RuntimeException("The rpc timeout parameter must be of type " +
- FiniteDuration.class.getName() + ". The type " + args[i].getClass().getName() +
+ Time.class.getName() + ". The type " + args[i].getClass().getName() +
" is not supported.");
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/88a05f5b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
index a6ceb91..5624d12 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
@@ -19,8 +19,9 @@
package org.apache.flink.runtime.rpc.akka;
import akka.actor.ActorSystem;
-import akka.util.Timeout;
+import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.runtime.rpc.RpcMethod;
@@ -30,13 +31,12 @@ import org.apache.flink.util.TestLogger;
import org.hamcrest.core.Is;
import org.junit.AfterClass;
import org.junit.Test;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ExecutionException;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
public class AkkaRpcActorTest extends TestLogger {
@@ -47,7 +47,7 @@ public class AkkaRpcActorTest extends TestLogger {
private static ActorSystem actorSystem = AkkaUtils.createDefaultActorSystem();
- private static Timeout timeout = new Timeout(10000, TimeUnit.MILLISECONDS);
+ private static Time timeout = Time.milliseconds(10000L);
private static AkkaRpcService akkaRpcService =
new AkkaRpcService(actorSystem, timeout);
@@ -69,7 +69,7 @@ public class AkkaRpcActorTest extends TestLogger {
Future<DummyRpcGateway> futureRpcGateway = akkaRpcService.connect(rpcEndpoint.getAddress(), DummyRpcGateway.class);
- DummyRpcGateway rpcGateway = Await.result(futureRpcGateway, timeout.duration());
+ DummyRpcGateway rpcGateway = futureRpcGateway.get(timeout.getSize(), timeout.getUnit());
assertEquals(rpcEndpoint.getAddress(), rpcGateway.getAddress());
}
@@ -82,11 +82,12 @@ public class AkkaRpcActorTest extends TestLogger {
Future<DummyRpcGateway> futureRpcGateway = akkaRpcService.connect("foobar", DummyRpcGateway.class);
try {
- DummyRpcGateway gateway = Await.result(futureRpcGateway, timeout.duration());
+ DummyRpcGateway gateway = futureRpcGateway.get(timeout.getSize(), timeout.getUnit());
fail("The rpc connection resolution should have failed.");
- } catch (RpcConnectionException exception) {
+ } catch (ExecutionException exception) {
// we're expecting a RpcConnectionException
+ assertTrue(exception.getCause() instanceof RpcConnectionException);
}
}
@@ -111,7 +112,7 @@ public class AkkaRpcActorTest extends TestLogger {
// now process the rpc
rpcEndpoint.start();
- Integer actualValue = Await.result(result, timeout.duration());
+ Integer actualValue = result.get(timeout.getSize(), timeout.getUnit());
assertThat("The new foobar value should have been returned.", actualValue, Is.is(expectedValue));
http://git-wip-us.apache.org/repos/asf/flink/blob/88a05f5b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
index f55069e..4e9e518 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
@@ -19,7 +19,7 @@
package org.apache.flink.runtime.rpc.akka;
import akka.actor.ActorSystem;
-import akka.util.Timeout;
+import org.apache.flink.api.common.time.Time;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.util.TestLogger;
@@ -40,7 +40,7 @@ public class AkkaRpcServiceTest extends TestLogger {
private static ActorSystem actorSystem = AkkaUtils.createDefaultActorSystem();
private static AkkaRpcService akkaRpcService =
- new AkkaRpcService(actorSystem, new Timeout(10000, TimeUnit.MILLISECONDS));
+ new AkkaRpcService(actorSystem, Time.milliseconds(10000));
@AfterClass
public static void shutdown() {
http://git-wip-us.apache.org/repos/asf/flink/blob/88a05f5b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MainThreadValidationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MainThreadValidationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MainThreadValidationTest.java
index 9ffafda..9ec1f7e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MainThreadValidationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MainThreadValidationTest.java
@@ -18,8 +18,7 @@
package org.apache.flink.runtime.rpc.akka;
-import akka.util.Timeout;
-
+import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.rpc.RpcEndpoint;
@@ -30,8 +29,6 @@ import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.util.TestLogger;
import org.junit.Test;
-import java.util.concurrent.TimeUnit;
-
import static org.junit.Assert.assertTrue;
public class MainThreadValidationTest extends TestLogger {
@@ -48,7 +45,7 @@ public class MainThreadValidationTest extends TestLogger {
// actual test
AkkaRpcService akkaRpcService = new AkkaRpcService(
AkkaUtils.createDefaultActorSystem(),
- new Timeout(10000, TimeUnit.MILLISECONDS));
+ Time.milliseconds(10000));
try {
TestEndpoint testEndpoint = new TestEndpoint(akkaRpcService);
http://git-wip-us.apache.org/repos/asf/flink/blob/88a05f5b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MessageSerializationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MessageSerializationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MessageSerializationTest.java
index 9d2ed99..0d5dc28 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MessageSerializationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MessageSerializationTest.java
@@ -19,10 +19,11 @@
package org.apache.flink.runtime.rpc.akka;
import akka.actor.ActorSystem;
-import akka.util.Timeout;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigValueFactory;
+import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.runtime.rpc.RpcMethod;
@@ -32,13 +33,9 @@ import org.hamcrest.core.Is;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
import java.io.IOException;
import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
@@ -52,7 +49,7 @@ public class MessageSerializationTest extends TestLogger {
private static AkkaRpcService akkaRpcService1;
private static AkkaRpcService akkaRpcService2;
- private static final FiniteDuration timeout = new FiniteDuration(10L, TimeUnit.SECONDS);
+ private static final Time timeout = Time.seconds(10L);
private static final int maxFrameSize = 32000;
@BeforeClass
@@ -63,8 +60,8 @@ public class MessageSerializationTest extends TestLogger {
actorSystem1 = AkkaUtils.createActorSystem(modifiedAkkaConfig);
actorSystem2 = AkkaUtils.createActorSystem(modifiedAkkaConfig);
- akkaRpcService1 = new AkkaRpcService(actorSystem1, new Timeout(timeout));
- akkaRpcService2 = new AkkaRpcService(actorSystem2, new Timeout(timeout));
+ akkaRpcService1 = new AkkaRpcService(actorSystem1, timeout);
+ akkaRpcService2 = new AkkaRpcService(actorSystem2, timeout);
}
@AfterClass
@@ -113,7 +110,7 @@ public class MessageSerializationTest extends TestLogger {
Future<TestGateway> remoteGatewayFuture = akkaRpcService2.connect(address, TestGateway.class);
- TestGateway remoteGateway = Await.result(remoteGatewayFuture, timeout);
+ TestGateway remoteGateway = remoteGatewayFuture.get(timeout.getSize(), timeout.getUnit());
remoteGateway.foobar(new Object());
@@ -134,7 +131,7 @@ public class MessageSerializationTest extends TestLogger {
Future<TestGateway> remoteGatewayFuture = akkaRpcService2.connect(address, TestGateway.class);
- TestGateway remoteGateway = Await.result(remoteGatewayFuture, timeout);
+ TestGateway remoteGateway = remoteGatewayFuture.get(timeout.getSize(), timeout.getUnit());
int expected = 42;
@@ -158,7 +155,7 @@ public class MessageSerializationTest extends TestLogger {
Future<TestGateway> remoteGatewayFuture = akkaRpcService2.connect(address, TestGateway.class);
- TestGateway remoteGateway = Await.result(remoteGatewayFuture, timeout);
+ TestGateway remoteGateway = remoteGatewayFuture.get(timeout.getSize(), timeout.getUnit());
int bufferSize = maxFrameSize + 1;
byte[] buffer = new byte[bufferSize];
http://git-wip-us.apache.org/repos/asf/flink/blob/88a05f5b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index a8d5bd7..09aab18 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.taskexecutor;
+import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.highavailability.NonHaServices;
@@ -29,8 +30,6 @@ import org.apache.flink.util.TestLogger;
import org.junit.Test;
-import scala.concurrent.duration.FiniteDuration;
-
import java.util.UUID;
import static org.junit.Assert.*;
@@ -56,7 +55,7 @@ public class TaskExecutorTest extends TestLogger {
taskManager.start();
verify(rmGateway, timeout(5000)).registerTaskExecutor(
- any(UUID.class), eq(taskManagerAddress), eq(resourceID), any(FiniteDuration.class));
+ any(UUID.class), eq(taskManagerAddress), eq(resourceID), any(Time.class));
}
finally {
rpc.stopService();
@@ -97,7 +96,7 @@ public class TaskExecutorTest extends TestLogger {
testLeaderService.notifyListener(address1, leaderId1);
verify(rmGateway1, timeout(5000)).registerTaskExecutor(
- eq(leaderId1), eq(taskManagerAddress), eq(resourceID), any(FiniteDuration.class));
+ eq(leaderId1), eq(taskManagerAddress), eq(resourceID), any(Time.class));
assertNotNull(taskManager.getResourceManagerConnection());
// cancel the leader
@@ -107,7 +106,7 @@ public class TaskExecutorTest extends TestLogger {
testLeaderService.notifyListener(address2, leaderId2);
verify(rmGateway2, timeout(5000)).registerTaskExecutor(
- eq(leaderId2), eq(taskManagerAddress), eq(resourceID), any(FiniteDuration.class));
+ eq(leaderId2), eq(taskManagerAddress), eq(resourceID), any(Time.class));
assertNotNull(taskManager.getResourceManagerConnection());
}
finally {