You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/10/02 21:58:33 UTC

[28/50] [abbrv] flink git commit: [FLINK-4656] [rpc] Port the existing code to Flink's own future abstraction

http://git-wip-us.apache.org/repos/asf/flink/blob/88a05f5b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
index 01776ed..957453a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
@@ -18,16 +18,13 @@
 
 package org.apache.flink.runtime.rpc;
 
-import akka.dispatch.ExecutionContexts;
 import akka.dispatch.Futures;
-import akka.util.Timeout;
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.runtime.util.DirectExecutorService;
 import org.apache.flink.util.Preconditions;
-import scala.concurrent.Await;
-import scala.concurrent.ExecutionContext;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
 
 import java.lang.annotation.Annotation;
 import java.lang.reflect.InvocationHandler;
@@ -37,6 +34,7 @@ import java.util.BitSet;
 import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -67,8 +65,8 @@ public class TestingSerialRpcService implements RpcService {
 	}
 
 	@Override
-	public ExecutionContext getExecutionContext() {
-		return ExecutionContexts.fromExecutorService(executorService);
+	public Executor getExecutor() {
+		return executorService;
 	}
 
 	@Override
@@ -94,7 +92,7 @@ public class TestingSerialRpcService implements RpcService {
 			classLoader,
 			new Class<?>[]{
 				rpcEndpoint.getSelfGatewayType(),
-				MainThreadExecutor.class,
+				MainThreadExecutable.class,
 				StartStoppable.class,
 				RpcGateway.class
 			},
@@ -114,13 +112,13 @@ public class TestingSerialRpcService implements RpcService {
 			if (clazz.isAssignableFrom(gateway.getClass())) {
 				@SuppressWarnings("unchecked")
 				C typedGateway = (C) gateway;
-				return Futures.successful(typedGateway);
+				return FlinkCompletableFuture.completed(typedGateway);
 			} else {
-				return Futures.failed(
+				return FlinkCompletableFuture.completedExceptionally(
 					new Exception("Gateway registered under " + address + " is not of type " + clazz));
 			}
 		} else {
-			return Futures.failed(new Exception("No gateway registered under that name"));
+			return FlinkCompletableFuture.completedExceptionally(new Exception("No gateway registered under that name"));
 		}
 	}
 
@@ -141,20 +139,20 @@ public class TestingSerialRpcService implements RpcService {
 		registeredConnections.clear();
 	}
 
-	private static final class TestingSerialInvocationHandler<C extends RpcGateway, T extends RpcEndpoint<C>> implements InvocationHandler, RpcGateway, MainThreadExecutor, StartStoppable {
+	private static final class TestingSerialInvocationHandler<C extends RpcGateway, T extends RpcEndpoint<C>> implements InvocationHandler, RpcGateway, MainThreadExecutable, StartStoppable {
 
 		private final T rpcEndpoint;
 
 		/** default timeout for asks */
-		private final Timeout timeout;
+		private final Time timeout;
 
 		private final String address;
 
 		private TestingSerialInvocationHandler(String address, T rpcEndpoint) {
-			this(address, rpcEndpoint, new Timeout(new FiniteDuration(10, TimeUnit.SECONDS)));
+			this(address, rpcEndpoint, Time.seconds(10));
 		}
 
-		private TestingSerialInvocationHandler(String address, T rpcEndpoint, Timeout timeout) {
+		private TestingSerialInvocationHandler(String address, T rpcEndpoint, Time timeout) {
 			this.rpcEndpoint = rpcEndpoint;
 			this.timeout = timeout;
 			this.address = address;
@@ -163,7 +161,7 @@ public class TestingSerialRpcService implements RpcService {
 		@Override
 		public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
 			Class<?> declaringClass = method.getDeclaringClass();
-			if (declaringClass.equals(MainThreadExecutor.class) ||
+			if (declaringClass.equals(MainThreadExecutable.class) ||
 				declaringClass.equals(Object.class) || declaringClass.equals(StartStoppable.class) ||
 				declaringClass.equals(RpcGateway.class)) {
 				return method.invoke(this, args);
@@ -171,7 +169,7 @@ public class TestingSerialRpcService implements RpcService {
 				final String methodName = method.getName();
 				Class<?>[] parameterTypes = method.getParameterTypes();
 				Annotation[][] parameterAnnotations = method.getParameterAnnotations();
-				Timeout futureTimeout = extractRpcTimeout(parameterAnnotations, args, timeout);
+				Time futureTimeout = extractRpcTimeout(parameterAnnotations, args, timeout);
 
 				final Tuple2<Class<?>[], Object[]> filteredArguments = filterArguments(
 					parameterTypes,
@@ -201,13 +199,13 @@ public class TestingSerialRpcService implements RpcService {
 		private Object handleRpcInvocationSync(final String methodName,
 			final Class<?>[] parameterTypes,
 			final Object[] args,
-			final Timeout futureTimeout) throws Exception {
+			final Time futureTimeout) throws Exception {
 			final Method rpcMethod = lookupRpcMethod(methodName, parameterTypes);
 			Object result = rpcMethod.invoke(rpcEndpoint, args);
 
 			if (result instanceof Future) {
 				Future<?> future = (Future<?>) result;
-				return Await.result(future, futureTimeout.duration());
+				return future.get(futureTimeout.getSize(), futureTimeout.getUnit());
 			} else {
 				return result;
 			}
@@ -219,11 +217,11 @@ public class TestingSerialRpcService implements RpcService {
 		}
 
 		@Override
-		public <V> Future<V> callAsync(Callable<V> callable, Timeout callTimeout) {
+		public <V> Future<V> callAsync(Callable<V> callable, Time callTimeout) {
 			try {
-				return Futures.successful(callable.call());
+				return FlinkCompletableFuture.completed(callable.call());
 			} catch (Throwable e) {
-				return Futures.failed(e);
+				return FlinkCompletableFuture.completedExceptionally(e);
 			}
 		}
 
@@ -281,18 +279,18 @@ public class TestingSerialRpcService implements RpcService {
 		 *                             has been found
 		 * @return Timeout extracted from the array of arguments or the default timeout
 		 */
-		private static Timeout extractRpcTimeout(Annotation[][] parameterAnnotations, Object[] args,
-			Timeout defaultTimeout) {
+		private static Time extractRpcTimeout(Annotation[][] parameterAnnotations, Object[] args,
+			Time defaultTimeout) {
 			if (args != null) {
 				Preconditions.checkArgument(parameterAnnotations.length == args.length);
 
 				for (int i = 0; i < parameterAnnotations.length; i++) {
 					if (isRpcTimeout(parameterAnnotations[i])) {
-						if (args[i] instanceof FiniteDuration) {
-							return new Timeout((FiniteDuration) args[i]);
+						if (args[i] instanceof Time) {
+							return (Time) args[i];
 						} else {
 							throw new RuntimeException("The rpc timeout parameter must be of type " +
-								FiniteDuration.class.getName() + ". The type " + args[i].getClass().getName() +
+								Time.class.getName() + ". The type " + args[i].getClass().getName() +
 								" is not supported.");
 						}
 					}

http://git-wip-us.apache.org/repos/asf/flink/blob/88a05f5b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
index a6ceb91..5624d12 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
@@ -19,8 +19,9 @@
 package org.apache.flink.runtime.rpc.akka;
 
 import akka.actor.ActorSystem;
-import akka.util.Timeout;
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.rpc.RpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcGateway;
 import org.apache.flink.runtime.rpc.RpcMethod;
@@ -30,13 +31,12 @@ import org.apache.flink.util.TestLogger;
 import org.hamcrest.core.Is;
 import org.junit.AfterClass;
 import org.junit.Test;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
 
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ExecutionException;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 public class AkkaRpcActorTest extends TestLogger {
@@ -47,7 +47,7 @@ public class AkkaRpcActorTest extends TestLogger {
 
 	private static ActorSystem actorSystem = AkkaUtils.createDefaultActorSystem();
 
-	private static Timeout timeout = new Timeout(10000, TimeUnit.MILLISECONDS);
+	private static Time timeout = Time.milliseconds(10000L);
 
 	private static AkkaRpcService akkaRpcService =
 		new AkkaRpcService(actorSystem, timeout);
@@ -69,7 +69,7 @@ public class AkkaRpcActorTest extends TestLogger {
 
 		Future<DummyRpcGateway> futureRpcGateway = akkaRpcService.connect(rpcEndpoint.getAddress(), DummyRpcGateway.class);
 
-		DummyRpcGateway rpcGateway = Await.result(futureRpcGateway, timeout.duration());
+		DummyRpcGateway rpcGateway = futureRpcGateway.get(timeout.getSize(), timeout.getUnit());
 
 		assertEquals(rpcEndpoint.getAddress(), rpcGateway.getAddress());
 	}
@@ -82,11 +82,12 @@ public class AkkaRpcActorTest extends TestLogger {
 		Future<DummyRpcGateway> futureRpcGateway = akkaRpcService.connect("foobar", DummyRpcGateway.class);
 
 		try {
-			DummyRpcGateway gateway = Await.result(futureRpcGateway, timeout.duration());
+			DummyRpcGateway gateway = futureRpcGateway.get(timeout.getSize(), timeout.getUnit());
 
 			fail("The rpc connection resolution should have failed.");
-		} catch (RpcConnectionException exception) {
+		} catch (ExecutionException exception) {
 			// we're expecting a RpcConnectionException
+			assertTrue(exception.getCause() instanceof RpcConnectionException);
 		}
 	}
 
@@ -111,7 +112,7 @@ public class AkkaRpcActorTest extends TestLogger {
 		// now process the rpc
 		rpcEndpoint.start();
 
-		Integer actualValue = Await.result(result, timeout.duration());
+		Integer actualValue = result.get(timeout.getSize(), timeout.getUnit());
 
 		assertThat("The new foobar value should have been returned.", actualValue, Is.is(expectedValue));
 

http://git-wip-us.apache.org/repos/asf/flink/blob/88a05f5b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
index f55069e..4e9e518 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
@@ -19,7 +19,7 @@
 package org.apache.flink.runtime.rpc.akka;
 
 import akka.actor.ActorSystem;
-import akka.util.Timeout;
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.util.TestLogger;
@@ -40,7 +40,7 @@ public class AkkaRpcServiceTest extends TestLogger {
 	private static ActorSystem actorSystem = AkkaUtils.createDefaultActorSystem();
 
 	private static AkkaRpcService akkaRpcService =
-			new AkkaRpcService(actorSystem, new Timeout(10000, TimeUnit.MILLISECONDS));
+			new AkkaRpcService(actorSystem, Time.milliseconds(10000));
 
 	@AfterClass
 	public static void shutdown() {

http://git-wip-us.apache.org/repos/asf/flink/blob/88a05f5b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MainThreadValidationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MainThreadValidationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MainThreadValidationTest.java
index 9ffafda..9ec1f7e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MainThreadValidationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MainThreadValidationTest.java
@@ -18,8 +18,7 @@
 
 package org.apache.flink.runtime.rpc.akka;
 
-import akka.util.Timeout;
-
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.akka.AkkaUtils;
 
 import org.apache.flink.runtime.rpc.RpcEndpoint;
@@ -30,8 +29,6 @@ import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.util.TestLogger;
 import org.junit.Test;
 
-import java.util.concurrent.TimeUnit;
-
 import static org.junit.Assert.assertTrue;
 
 public class MainThreadValidationTest extends TestLogger {
@@ -48,7 +45,7 @@ public class MainThreadValidationTest extends TestLogger {
 		// actual test
 		AkkaRpcService akkaRpcService = new AkkaRpcService(
 				AkkaUtils.createDefaultActorSystem(),
-				new Timeout(10000, TimeUnit.MILLISECONDS));
+				Time.milliseconds(10000));
 
 		try {
 			TestEndpoint testEndpoint = new TestEndpoint(akkaRpcService);

http://git-wip-us.apache.org/repos/asf/flink/blob/88a05f5b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MessageSerializationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MessageSerializationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MessageSerializationTest.java
index 9d2ed99..0d5dc28 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MessageSerializationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MessageSerializationTest.java
@@ -19,10 +19,11 @@
 package org.apache.flink.runtime.rpc.akka;
 
 import akka.actor.ActorSystem;
-import akka.util.Timeout;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigValueFactory;
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.rpc.RpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcGateway;
 import org.apache.flink.runtime.rpc.RpcMethod;
@@ -32,13 +33,9 @@ import org.hamcrest.core.Is;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
 
 import java.io.IOException;
 import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
 
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.fail;
@@ -52,7 +49,7 @@ public class MessageSerializationTest extends TestLogger {
 	private static AkkaRpcService akkaRpcService1;
 	private static AkkaRpcService akkaRpcService2;
 
-	private static final FiniteDuration timeout = new FiniteDuration(10L, TimeUnit.SECONDS);
+	private static final Time timeout = Time.seconds(10L);
 	private static final int maxFrameSize = 32000;
 
 	@BeforeClass
@@ -63,8 +60,8 @@ public class MessageSerializationTest extends TestLogger {
 		actorSystem1 = AkkaUtils.createActorSystem(modifiedAkkaConfig);
 		actorSystem2 = AkkaUtils.createActorSystem(modifiedAkkaConfig);
 
-		akkaRpcService1 = new AkkaRpcService(actorSystem1, new Timeout(timeout));
-		akkaRpcService2 = new AkkaRpcService(actorSystem2, new Timeout(timeout));
+		akkaRpcService1 = new AkkaRpcService(actorSystem1, timeout);
+		akkaRpcService2 = new AkkaRpcService(actorSystem2, timeout);
 	}
 
 	@AfterClass
@@ -113,7 +110,7 @@ public class MessageSerializationTest extends TestLogger {
 
 		Future<TestGateway> remoteGatewayFuture = akkaRpcService2.connect(address, TestGateway.class);
 
-		TestGateway remoteGateway = Await.result(remoteGatewayFuture, timeout);
+		TestGateway remoteGateway = remoteGatewayFuture.get(timeout.getSize(), timeout.getUnit());
 
 		remoteGateway.foobar(new Object());
 
@@ -134,7 +131,7 @@ public class MessageSerializationTest extends TestLogger {
 
 		Future<TestGateway> remoteGatewayFuture = akkaRpcService2.connect(address, TestGateway.class);
 
-		TestGateway remoteGateway = Await.result(remoteGatewayFuture, timeout);
+		TestGateway remoteGateway = remoteGatewayFuture.get(timeout.getSize(), timeout.getUnit());
 
 		int expected = 42;
 
@@ -158,7 +155,7 @@ public class MessageSerializationTest extends TestLogger {
 
 		Future<TestGateway> remoteGatewayFuture = akkaRpcService2.connect(address, TestGateway.class);
 
-		TestGateway remoteGateway = Await.result(remoteGatewayFuture, timeout);
+		TestGateway remoteGateway = remoteGatewayFuture.get(timeout.getSize(), timeout.getUnit());
 
 		int bufferSize = maxFrameSize + 1;
 		byte[] buffer = new byte[bufferSize];

http://git-wip-us.apache.org/repos/asf/flink/blob/88a05f5b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index a8d5bd7..09aab18 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.taskexecutor;
 
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.highavailability.NonHaServices;
@@ -29,8 +30,6 @@ import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
 
-import scala.concurrent.duration.FiniteDuration;
-
 import java.util.UUID;
 
 import static org.junit.Assert.*;
@@ -56,7 +55,7 @@ public class TaskExecutorTest extends TestLogger {
 			taskManager.start();
 
 			verify(rmGateway, timeout(5000)).registerTaskExecutor(
-					any(UUID.class), eq(taskManagerAddress), eq(resourceID), any(FiniteDuration.class));
+					any(UUID.class), eq(taskManagerAddress), eq(resourceID), any(Time.class));
 		}
 		finally {
 			rpc.stopService();
@@ -97,7 +96,7 @@ public class TaskExecutorTest extends TestLogger {
 			testLeaderService.notifyListener(address1, leaderId1);
 
 			verify(rmGateway1, timeout(5000)).registerTaskExecutor(
-					eq(leaderId1), eq(taskManagerAddress), eq(resourceID), any(FiniteDuration.class));
+					eq(leaderId1), eq(taskManagerAddress), eq(resourceID), any(Time.class));
 			assertNotNull(taskManager.getResourceManagerConnection());
 
 			// cancel the leader 
@@ -107,7 +106,7 @@ public class TaskExecutorTest extends TestLogger {
 			testLeaderService.notifyListener(address2, leaderId2);
 
 			verify(rmGateway2, timeout(5000)).registerTaskExecutor(
-					eq(leaderId2), eq(taskManagerAddress), eq(resourceID), any(FiniteDuration.class));
+					eq(leaderId2), eq(taskManagerAddress), eq(resourceID), any(Time.class));
 			assertNotNull(taskManager.getResourceManagerConnection());
 		}
 		finally {