You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/10/14 13:45:53 UTC
[11/50] [abbrv] flink git commit: [FLINK-4656] [rpc] Port the
existing code to Flink's own future abstraction
[FLINK-4656] [rpc] Port the existing code to Flink's own future abstraction
This closes #2530.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/507e86cf
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/507e86cf
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/507e86cf
Branch: refs/heads/flip-6
Commit: 507e86cfd7c22a18d0d839d42c1f3e2a72c1ff8a
Parents: fbd3867
Author: Till Rohrmann <tr...@apache.org>
Authored: Wed Sep 21 17:26:21 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Oct 14 15:14:40 2016 +0200
----------------------------------------------------------------------
.../runtime/concurrent/impl/FlinkFuture.java | 4 ++
.../flink/runtime/jobmaster/JobMaster.java | 2 +-
.../runtime/jobmaster/JobMasterGateway.java | 2 +-
.../registration/RetryingRegistration.java | 65 ++++++++---------
.../resourcemanager/ResourceManager.java | 13 ++--
.../resourcemanager/ResourceManagerGateway.java | 9 ++-
.../slotmanager/SlotManager.java | 9 ++-
.../flink/runtime/rpc/MainThreadExecutable.java | 64 +++++++++++++++++
.../flink/runtime/rpc/MainThreadExecutor.java | 64 -----------------
.../apache/flink/runtime/rpc/RpcEndpoint.java | 60 ++++++----------
.../apache/flink/runtime/rpc/RpcService.java | 17 +++--
.../runtime/rpc/akka/AkkaInvocationHandler.java | 42 +++++------
.../flink/runtime/rpc/akka/AkkaRpcActor.java | 21 +++++-
.../flink/runtime/rpc/akka/AkkaRpcService.java | 28 ++++----
.../runtime/taskexecutor/TaskExecutor.java | 12 ++--
.../taskexecutor/TaskExecutorGateway.java | 6 +-
...TaskExecutorToResourceManagerConnection.java | 34 +++++----
.../registration/RetryingRegistrationTest.java | 75 ++++++++++----------
.../registration/TestRegistrationGateway.java | 6 +-
.../resourcemanager/ResourceManagerHATest.java | 4 +-
.../slotmanager/SlotProtocolTest.java | 14 ++--
.../flink/runtime/rpc/AsyncCallsTest.java | 13 ++--
.../flink/runtime/rpc/RpcCompletenessTest.java | 9 +--
.../flink/runtime/rpc/TestingGatewayBase.java | 18 ++---
.../flink/runtime/rpc/TestingRpcService.java | 20 +++---
.../runtime/rpc/TestingSerialRpcService.java | 54 +++++++-------
.../runtime/rpc/akka/AkkaRpcActorTest.java | 19 ++---
.../runtime/rpc/akka/AkkaRpcServiceTest.java | 4 +-
.../rpc/akka/MainThreadValidationTest.java | 7 +-
.../rpc/akka/MessageSerializationTest.java | 19 +++--
.../runtime/taskexecutor/TaskExecutorTest.java | 9 ++-
31 files changed, 355 insertions(+), 368 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/507e86cf/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkFuture.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkFuture.java b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkFuture.java
index 277f4fa..004738b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkFuture.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkFuture.java
@@ -60,6 +60,10 @@ public class FlinkFuture<T> implements Future<T> {
this.scalaFuture = Preconditions.checkNotNull(scalaFuture);
}
+ public scala.concurrent.Future<T> getScalaFuture() {
+ return scalaFuture;
+ }
+
//-----------------------------------------------------------------------------------
// Future's methods
//-----------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/507e86cf/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 0a6a7ef..1537396 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -36,7 +36,7 @@ import java.util.UUID;
/**
* JobMaster implementation. The job master is responsible for the execution of a single
- * {@link org.apache.flink.runtime.jobgraph.JobGraph}.
+ * {@link JobGraph}.
* <p>
* It offers the following methods as part of its rpc interface to interact with the JobMaster
* remotely:
http://git-wip-us.apache.org/repos/asf/flink/blob/507e86cf/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
index a53e383..86bf17c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
@@ -18,10 +18,10 @@
package org.apache.flink.runtime.jobmaster;
+import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
-import scala.concurrent.Future;
/**
* {@link JobMaster} rpc gateway interface
http://git-wip-us.apache.org/repos/asf/flink/blob/507e86cf/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RetryingRegistration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RetryingRegistration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RetryingRegistration.java
index ea49e42..32dd978 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RetryingRegistration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RetryingRegistration.java
@@ -18,19 +18,17 @@
package org.apache.flink.runtime.registration;
-import akka.dispatch.OnFailure;
-import akka.dispatch.OnSuccess;
-
import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.concurrent.AcceptFunction;
+import org.apache.flink.runtime.concurrent.ApplyFunction;
+import org.apache.flink.runtime.concurrent.CompletableFuture;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.runtime.rpc.RpcService;
import org.slf4j.Logger;
-import scala.concurrent.Future;
-import scala.concurrent.Promise;
-import scala.concurrent.impl.Promise.DefaultPromise;
-
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -86,7 +84,7 @@ public abstract class RetryingRegistration<Gateway extends RpcGateway, Success e
private final UUID leaderId;
- private final Promise<Tuple2<Gateway, Success>> completionPromise;
+ private final CompletableFuture<Tuple2<Gateway, Success>> completionFuture;
private final long initialRegistrationTimeout;
@@ -140,7 +138,7 @@ public abstract class RetryingRegistration<Gateway extends RpcGateway, Success e
this.delayOnError = delayOnError;
this.delayOnRefusedRegistration = delayOnRefusedRegistration;
- this.completionPromise = new DefaultPromise<>();
+ this.completionFuture = new FlinkCompletableFuture<>();
}
// ------------------------------------------------------------------------
@@ -148,7 +146,7 @@ public abstract class RetryingRegistration<Gateway extends RpcGateway, Success e
// ------------------------------------------------------------------------
public Future<Tuple2<Gateway, Success>> getFuture() {
- return completionPromise.future();
+ return completionFuture;
}
/**
@@ -184,28 +182,30 @@ public abstract class RetryingRegistration<Gateway extends RpcGateway, Success e
Future<Gateway> resourceManagerFuture = rpcService.connect(targetAddress, targetType);
// upon success, start the registration attempts
- resourceManagerFuture.onSuccess(new OnSuccess<Gateway>() {
+ resourceManagerFuture.thenAcceptAsync(new AcceptFunction<Gateway>() {
@Override
- public void onSuccess(Gateway result) {
+ public void accept(Gateway result) {
log.info("Resolved {} address, beginning registration", targetName);
register(result, 1, initialRegistrationTimeout);
}
- }, rpcService.getExecutionContext());
-
+ }, rpcService.getExecutor());
+
// upon failure, retry, unless this is cancelled
- resourceManagerFuture.onFailure(new OnFailure() {
+ resourceManagerFuture.exceptionallyAsync(new ApplyFunction<Throwable, Void>() {
@Override
- public void onFailure(Throwable failure) {
+ public Void apply(Throwable failure) {
if (!isCanceled()) {
log.warn("Could not resolve {} address {}, retrying...", targetName, targetAddress, failure);
startRegistration();
}
+
+ return null;
}
- }, rpcService.getExecutionContext());
+ }, rpcService.getExecutor());
}
catch (Throwable t) {
cancel();
- completionPromise.tryFailure(t);
+ completionFuture.completeExceptionally(t);
}
}
@@ -225,15 +225,14 @@ public abstract class RetryingRegistration<Gateway extends RpcGateway, Success e
Future<RegistrationResponse> registrationFuture = invokeRegistration(gateway, leaderId, timeoutMillis);
// if the registration was successful, let the TaskExecutor know
- registrationFuture.onSuccess(new OnSuccess<RegistrationResponse>() {
-
+ registrationFuture.thenAcceptAsync(new AcceptFunction<RegistrationResponse>() {
@Override
- public void onSuccess(RegistrationResponse result) throws Throwable {
+ public void accept(RegistrationResponse result) {
if (!isCanceled()) {
if (result instanceof RegistrationResponse.Success) {
// registration successful!
Success success = (Success) result;
- completionPromise.success(new Tuple2<>(gateway, success));
+ completionFuture.complete(Tuple2.of(gateway, success));
}
else {
// registration refused or unknown
@@ -241,7 +240,7 @@ public abstract class RetryingRegistration<Gateway extends RpcGateway, Success e
RegistrationResponse.Decline decline = (RegistrationResponse.Decline) result;
log.info("Registration at {} was declined: {}", targetName, decline.getReason());
} else {
- log.error("Received unknown response to registration attempt: " + result);
+ log.error("Received unknown response to registration attempt: {}", result);
}
log.info("Pausing and re-attempting registration in {} ms", delayOnRefusedRegistration);
@@ -249,12 +248,12 @@ public abstract class RetryingRegistration<Gateway extends RpcGateway, Success e
}
}
}
- }, rpcService.getExecutionContext());
+ }, rpcService.getExecutor());
// upon failure, retry
- registrationFuture.onFailure(new OnFailure() {
+ registrationFuture.exceptionallyAsync(new ApplyFunction<Throwable, Void>() {
@Override
- public void onFailure(Throwable failure) {
+ public Void apply(Throwable failure) {
if (!isCanceled()) {
if (failure instanceof TimeoutException) {
// we simply have not received a response in time. maybe the timeout was
@@ -262,26 +261,28 @@ public abstract class RetryingRegistration<Gateway extends RpcGateway, Success e
// currently down.
if (log.isDebugEnabled()) {
log.debug("Registration at {} ({}) attempt {} timed out after {} ms",
- targetName, targetAddress, attempt, timeoutMillis);
+ targetName, targetAddress, attempt, timeoutMillis);
}
-
+
long newTimeoutMillis = Math.min(2 * timeoutMillis, maxRegistrationTimeout);
register(gateway, attempt + 1, newTimeoutMillis);
}
else {
// a serious failure occurred. we still should not give up, but keep trying
- log.error("Registration at " + targetName + " failed due to an error", failure);
+ log.error("Registration at {} failed due to an error", targetName, failure);
log.info("Pausing and re-attempting registration in {} ms", delayOnError);
-
+
registerLater(gateway, 1, initialRegistrationTimeout, delayOnError);
}
}
+
+ return null;
}
- }, rpcService.getExecutionContext());
+ }, rpcService.getExecutor());
}
catch (Throwable t) {
cancel();
- completionPromise.tryFailure(t);
+ completionFuture.completeExceptionally(t);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/507e86cf/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index d9a7134..5370710 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -18,12 +18,11 @@
package org.apache.flink.runtime.resourcemanager;
-import akka.dispatch.Futures;
-import akka.dispatch.Mapper;
-
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.ApplyFunction;
+import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.leaderelection.LeaderContender;
@@ -38,7 +37,6 @@ import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import scala.concurrent.Future;
import java.util.HashMap;
import java.util.Map;
@@ -126,10 +124,9 @@ public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> impleme
getRpcService().connect(jobMasterRegistration.getAddress(), JobMasterGateway.class);
final JobID jobID = jobMasterRegistration.getJobID();
- return jobMasterFuture.map(new Mapper<JobMasterGateway, RegistrationResponse>() {
+ return jobMasterFuture.thenApplyAsync(new ApplyFunction<JobMasterGateway, RegistrationResponse>() {
@Override
- public RegistrationResponse apply(final JobMasterGateway jobMasterGateway) {
-
+ public RegistrationResponse apply(JobMasterGateway jobMasterGateway) {
final JobMasterGateway existingGateway = jobMasterGateways.put(jobID, jobMasterGateway);
if (existingGateway != null) {
LOG.info("Replacing existing gateway {} for JobID {} with {}.",
@@ -137,7 +134,7 @@ public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> impleme
}
return new RegistrationResponse(true);
}
- }, getMainThreadExecutionContext());
+ }, getMainThreadExecutor());
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/507e86cf/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
index c8e3488..5c8786c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
@@ -18,14 +18,13 @@
package org.apache.flink.runtime.resourcemanager;
+import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.runtime.rpc.RpcTimeout;
import org.apache.flink.runtime.jobmaster.JobMaster;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
-
import java.util.UUID;
/**
@@ -42,7 +41,7 @@ public interface ResourceManagerGateway extends RpcGateway {
*/
Future<RegistrationResponse> registerJobMaster(
JobMasterRegistration jobMasterRegistration,
- @RpcTimeout FiniteDuration timeout);
+ @RpcTimeout Time timeout);
/**
* Register a {@link JobMaster} at the resource manager.
@@ -73,5 +72,5 @@ public interface ResourceManagerGateway extends RpcGateway {
UUID resourceManagerLeaderId,
String taskExecutorAddress,
ResourceID resourceID,
- @RpcTimeout FiniteDuration timeout);
+ @RpcTimeout Time timeout);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/507e86cf/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
index 96fde7d..97176b2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
@@ -19,11 +19,13 @@
package org.apache.flink.runtime.resourcemanager.slotmanager;
import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.clusterframework.types.ResourceSlot;
import org.apache.flink.runtime.clusterframework.types.SlotID;
+import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
import org.apache.flink.runtime.resourcemanager.SlotRequest;
import org.apache.flink.runtime.resourcemanager.SlotRequestRegistered;
@@ -33,14 +35,11 @@ import org.apache.flink.runtime.taskexecutor.SlotStatus;
import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.UUID;
-import java.util.concurrent.TimeUnit;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -79,7 +78,7 @@ public abstract class SlotManager implements LeaderRetrievalListener {
/** All allocations, we can lookup allocations either by SlotID or AllocationID */
private final AllocationMap allocationMap;
- private final FiniteDuration timeout;
+ private final Time timeout;
/** The current leader id set by the ResourceManager */
private UUID leaderID;
@@ -90,7 +89,7 @@ public abstract class SlotManager implements LeaderRetrievalListener {
this.freeSlots = new HashMap<>(16);
this.allocationMap = new AllocationMap();
this.taskManagerGateways = new HashMap<>();
- this.timeout = new FiniteDuration(10, TimeUnit.SECONDS);
+ this.timeout = Time.seconds(10);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/507e86cf/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/MainThreadExecutable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/MainThreadExecutable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/MainThreadExecutable.java
new file mode 100644
index 0000000..ec1c984
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/MainThreadExecutable.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.Future;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Interface to execute {@link Runnable} and {@link Callable} in the main thread of the underlying
+ * RPC endpoint.
+ *
+ * <p>This interface is intended to be implemented by the self gateway in a {@link RpcEndpoint}
+ * implementation which allows to dispatch local procedures to the main thread of the underlying
+ * RPC endpoint.
+ */
+public interface MainThreadExecutable {
+
+ /**
+ * Execute the runnable in the main thread of the underlying RPC endpoint.
+ *
+ * @param runnable Runnable to be executed
+ */
+ void runAsync(Runnable runnable);
+
+ /**
+ * Execute the callable in the main thread of the underlying RPC endpoint and return a future for
+ * the callable result. If the future is not completed within the given timeout, the returned
+ * future will throw a {@link TimeoutException}.
+ *
+ * @param callable Callable to be executed
+ * @param callTimeout Timeout for the future to complete
+ * @param <V> Return value of the callable
+ * @return Future of the callable result
+ */
+ <V> Future<V> callAsync(Callable<V> callable, Time callTimeout);
+
+ /**
+ * Execute the runnable in the main thread of the underlying RPC endpoint, with
+ * a delay of the given number of milliseconds.
+ *
+ * @param runnable Runnable to be executed
+ * @param delay The delay, in milliseconds, after which the runnable will be executed
+ */
+ void scheduleRunAsync(Runnable runnable, long delay);
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/507e86cf/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/MainThreadExecutor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/MainThreadExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/MainThreadExecutor.java
deleted file mode 100644
index 5e4fead..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/MainThreadExecutor.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rpc;
-
-import akka.util.Timeout;
-import scala.concurrent.Future;
-
-import java.util.concurrent.Callable;
-import java.util.concurrent.TimeoutException;
-
-/**
- * Interface to execute {@link Runnable} and {@link Callable} in the main thread of the underlying
- * RPC endpoint.
- *
- * <p>This interface is intended to be implemented by the self gateway in a {@link RpcEndpoint}
- * implementation which allows to dispatch local procedures to the main thread of the underlying
- * RPC endpoint.
- */
-public interface MainThreadExecutor {
-
- /**
- * Execute the runnable in the main thread of the underlying RPC endpoint.
- *
- * @param runnable Runnable to be executed
- */
- void runAsync(Runnable runnable);
-
- /**
- * Execute the callable in the main thread of the underlying RPC endpoint and return a future for
- * the callable result. If the future is not completed within the given timeout, the returned
- * future will throw a {@link TimeoutException}.
- *
- * @param callable Callable to be executed
- * @param callTimeout Timeout for the future to complete
- * @param <V> Return value of the callable
- * @return Future of the callable result
- */
- <V> Future<V> callAsync(Callable<V> callable, Timeout callTimeout);
-
- /**
- * Execute the runnable in the main thread of the underlying RPC endpoint, with
- * a delay of the given number of milliseconds.
- *
- * @param runnable Runnable to be executed
- * @param delay The delay, in milliseconds, after which the runnable will be executed
- */
- void scheduleRunAsync(Runnable runnable, long delay);
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/507e86cf/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
index e9e2b2c..4e5e49a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
@@ -18,16 +18,15 @@
package org.apache.flink.runtime.rpc;
-import akka.util.Timeout;
-
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.util.Preconditions;
import org.apache.flink.util.ReflectionUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import scala.concurrent.ExecutionContext;
-import scala.concurrent.Future;
-
import java.util.concurrent.Callable;
+import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
@@ -49,8 +48,8 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
* thread, we don't have to reason about concurrent accesses, in the same way in the Actor Model
* of Erlang or Akka.
*
- * <p>The RPC endpoint provides provides {@link #runAsync(Runnable)}, {@link #callAsync(Callable, Timeout)}
- * and the {@link #getMainThreadExecutionContext()} to execute code in the RPC endoint's main thread.
+ * <p>The RPC endpoint provides provides {@link #runAsync(Runnable)}, {@link #callAsync(Callable, Time)}
+ * and the {@link #getMainThreadExecutor()} to execute code in the RPC endoint's main thread.
*
* @param <C> The RPC gateway counterpart for the implementing RPC endpoint
*/
@@ -69,9 +68,9 @@ public abstract class RpcEndpoint<C extends RpcGateway> {
/** Self gateway which can be used to schedule asynchronous calls on yourself */
private final C self;
- /** The main thread execution context to be used to execute future callbacks in the main thread
+ /** The main thread executor to be used to execute future callbacks in the main thread
* of the executing rpc server. */
- private final ExecutionContext mainThreadExecutionContext;
+ private final Executor mainThreadExecutor;
/** A reference to the endpoint's main thread, if the current method is called by the main thread */
final AtomicReference<Thread> currentMainThread = new AtomicReference<>(null);
@@ -89,7 +88,7 @@ public abstract class RpcEndpoint<C extends RpcGateway> {
this.selfGatewayType = ReflectionUtil.getTemplateType1(getClass());
this.self = rpcService.startServer(this);
- this.mainThreadExecutionContext = new MainThreadExecutionContext((MainThreadExecutor) self);
+ this.mainThreadExecutor = new MainThreadExecutor((MainThreadExecutable) self);
}
/**
@@ -120,7 +119,7 @@ public abstract class RpcEndpoint<C extends RpcGateway> {
* Shuts down the underlying RPC endpoint via the RPC service.
* After this method was called, the RPC endpoint will no longer be reachable, neither remotely,
* not via its {@link #getSelf() self gateway}. It will also not accepts executions in main thread
- * any more (via {@link #callAsync(Callable, Timeout)} and {@link #runAsync(Runnable)}).
+ * any more (via {@link #callAsync(Callable, Time)} and {@link #runAsync(Runnable)}).
*
* <p>This method can be overridden to add RPC endpoint specific shut down code.
* The overridden method should always call the parent shut down method.
@@ -161,8 +160,8 @@ public abstract class RpcEndpoint<C extends RpcGateway> {
*
* @return Main thread execution context
*/
- protected ExecutionContext getMainThreadExecutionContext() {
- return mainThreadExecutionContext;
+ protected Executor getMainThreadExecutor() {
+ return mainThreadExecutor;
}
/**
@@ -185,7 +184,7 @@ public abstract class RpcEndpoint<C extends RpcGateway> {
* @param runnable Runnable to be executed in the main thread of the underlying RPC endpoint
*/
protected void runAsync(Runnable runnable) {
- ((MainThreadExecutor) self).runAsync(runnable);
+ ((MainThreadExecutable) self).runAsync(runnable);
}
/**
@@ -196,7 +195,7 @@ public abstract class RpcEndpoint<C extends RpcGateway> {
* @param delay The delay after which the runnable will be executed
*/
protected void scheduleRunAsync(Runnable runnable, long delay, TimeUnit unit) {
- ((MainThreadExecutor) self).scheduleRunAsync(runnable, unit.toMillis(delay));
+ ((MainThreadExecutable) self).scheduleRunAsync(runnable, unit.toMillis(delay));
}
/**
@@ -209,8 +208,8 @@ public abstract class RpcEndpoint<C extends RpcGateway> {
* @param <V> Return type of the callable
* @return Future for the result of the callable.
*/
- protected <V> Future<V> callAsync(Callable<V> callable, Timeout timeout) {
- return ((MainThreadExecutor) self).callAsync(callable, timeout);
+ protected <V> Future<V> callAsync(Callable<V> callable, Time timeout) {
+ return ((MainThreadExecutable) self).callAsync(callable, timeout);
}
// ------------------------------------------------------------------------
@@ -241,36 +240,19 @@ public abstract class RpcEndpoint<C extends RpcGateway> {
// ------------------------------------------------------------------------
/**
- * Execution context which executes runnables in the main thread context. A reported failure
- * will cause the underlying rpc server to shut down.
+ * Executor which executes runnables in the main thread context.
*/
- private class MainThreadExecutionContext implements ExecutionContext {
+ private class MainThreadExecutor implements Executor {
- private final MainThreadExecutor gateway;
+ private final MainThreadExecutable gateway;
- MainThreadExecutionContext(MainThreadExecutor gateway) {
- this.gateway = gateway;
+ MainThreadExecutor(MainThreadExecutable gateway) {
+ this.gateway = Preconditions.checkNotNull(gateway);
}
@Override
public void execute(Runnable runnable) {
gateway.runAsync(runnable);
}
-
- @Override
- public void reportFailure(final Throwable t) {
- gateway.runAsync(new Runnable() {
- @Override
- public void run() {
- log.error("Encountered failure in the main thread execution context.", t);
- shutDown();
- }
- });
- }
-
- @Override
- public ExecutionContext prepare() {
- return this;
- }
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/507e86cf/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
index 78c1cec..a367ff2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
@@ -18,10 +18,10 @@
package org.apache.flink.runtime.rpc;
+import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException;
-import scala.concurrent.ExecutionContext;
-import scala.concurrent.Future;
+import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
/**
@@ -68,23 +68,22 @@ public interface RpcService {
void stopService();
/**
- * Gets the execution context, provided by this RPC service. This execution
- * context can be used for example for the {@code onComplete(...)} or {@code onSuccess(...)}
- * methods of Futures.
+ * Gets the executor, provided by this RPC service. This executor can be used for example for
+ * the {@code handleAsync(...)} or {@code thenAcceptAsync(...)} methods of futures.
*
- * <p><b>IMPORTANT:</b> This execution context does not isolate the method invocations against
+ * <p><b>IMPORTANT:</b> This executor does not isolate the method invocations against
* any concurrent invocations and is therefore not suitable to run completion methods of futures
* that modify state of an {@link RpcEndpoint}. For such operations, one needs to use the
- * {@link RpcEndpoint#getMainThreadExecutionContext() MainThreadExecutionContext} of that
+ * {@link RpcEndpoint#getMainThreadExecutor() MainThreadExecutionContext} of that
* {@code RpcEndpoint}.
*
* @return The execution context provided by the RPC service
*/
- ExecutionContext getExecutionContext();
+ Executor getExecutor();
/**
* Execute the runnable in the execution context of this RPC Service, as returned by
- * {@link #getExecutionContext()}, after a scheduled delay.
+ * {@link #getExecutor()}, after a scheduled delay.
*
* @param runnable Runnable to be executed
* @param delay The delay after which the runnable will be executed
http://git-wip-us.apache.org/repos/asf/flink/blob/507e86cf/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
index bfa04f6..8f4deff 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
@@ -20,9 +20,11 @@ package org.apache.flink.runtime.rpc.akka;
import akka.actor.ActorRef;
import akka.pattern.Patterns;
-import akka.util.Timeout;
+import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.rpc.MainThreadExecutor;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
+import org.apache.flink.runtime.rpc.MainThreadExecutable;
import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.runtime.rpc.RpcTimeout;
import org.apache.flink.runtime.rpc.StartStoppable;
@@ -34,9 +36,6 @@ import org.apache.flink.runtime.rpc.akka.messages.RpcInvocation;
import org.apache.flink.runtime.rpc.akka.messages.RunAsync;
import org.apache.flink.util.Preconditions;
import org.apache.log4j.Logger;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
import java.io.IOException;
import java.lang.annotation.Annotation;
@@ -53,7 +52,7 @@ import static org.apache.flink.util.Preconditions.checkArgument;
* rpc in a {@link LocalRpcInvocation} message and then sends it to the {@link AkkaRpcActor} where it is
* executed.
*/
-class AkkaInvocationHandler implements InvocationHandler, AkkaGateway, MainThreadExecutor, StartStoppable {
+class AkkaInvocationHandler implements InvocationHandler, AkkaGateway, MainThreadExecutable, StartStoppable {
private static final Logger LOG = Logger.getLogger(AkkaInvocationHandler.class);
private final String address;
@@ -64,11 +63,11 @@ class AkkaInvocationHandler implements InvocationHandler, AkkaGateway, MainThrea
private final boolean isLocal;
// default timeout for asks
- private final Timeout timeout;
+ private final Time timeout;
private final long maximumFramesize;
- AkkaInvocationHandler(String address, ActorRef rpcEndpoint, Timeout timeout, long maximumFramesize) {
+ AkkaInvocationHandler(String address, ActorRef rpcEndpoint, Time timeout, long maximumFramesize) {
this.address = Preconditions.checkNotNull(address);
this.rpcEndpoint = Preconditions.checkNotNull(rpcEndpoint);
this.isLocal = this.rpcEndpoint.path().address().hasLocalScope();
@@ -82,7 +81,7 @@ class AkkaInvocationHandler implements InvocationHandler, AkkaGateway, MainThrea
Object result;
- if (declaringClass.equals(AkkaGateway.class) || declaringClass.equals(MainThreadExecutor.class) ||
+ if (declaringClass.equals(AkkaGateway.class) || declaringClass.equals(MainThreadExecutable.class) ||
declaringClass.equals(Object.class) || declaringClass.equals(StartStoppable.class) ||
declaringClass.equals(RpcGateway.class)) {
result = method.invoke(this, args);
@@ -90,7 +89,7 @@ class AkkaInvocationHandler implements InvocationHandler, AkkaGateway, MainThrea
String methodName = method.getName();
Class<?>[] parameterTypes = method.getParameterTypes();
Annotation[][] parameterAnnotations = method.getParameterAnnotations();
- Timeout futureTimeout = extractRpcTimeout(parameterAnnotations, args, timeout);
+ Time futureTimeout = extractRpcTimeout(parameterAnnotations, args, timeout);
Tuple2<Class<?>[], Object[]> filteredArguments = filterArguments(
parameterTypes,
@@ -130,13 +129,14 @@ class AkkaInvocationHandler implements InvocationHandler, AkkaGateway, MainThrea
result = null;
} else if (returnType.equals(Future.class)) {
// execute an asynchronous call
- result = Patterns.ask(rpcEndpoint, rpcInvocation, futureTimeout);
+ result = new FlinkFuture<>(Patterns.ask(rpcEndpoint, rpcInvocation, futureTimeout.toMilliseconds()));
} else {
// execute a synchronous call
- Future<?> futureResult = Patterns.ask(rpcEndpoint, rpcInvocation, futureTimeout);
- FiniteDuration duration = timeout.duration();
+ scala.concurrent.Future<?> scalaFuture = Patterns.ask(rpcEndpoint, rpcInvocation, futureTimeout.toMilliseconds());
- result = Await.result(futureResult, duration);
+ Future<?> futureResult = new FlinkFuture<>(scalaFuture);
+
+ return futureResult.get(futureTimeout.getSize(), futureTimeout.getUnit());
}
}
@@ -167,12 +167,12 @@ class AkkaInvocationHandler implements InvocationHandler, AkkaGateway, MainThrea
}
@Override
- public <V> Future<V> callAsync(Callable<V> callable, Timeout callTimeout) {
+ public <V> Future<V> callAsync(Callable<V> callable, Time callTimeout) {
if(isLocal) {
@SuppressWarnings("unchecked")
- Future<V> result = (Future<V>) Patterns.ask(rpcEndpoint, new CallAsync(callable), callTimeout);
+ scala.concurrent.Future<V> result = (scala.concurrent.Future<V>) Patterns.ask(rpcEndpoint, new CallAsync(callable), callTimeout.toMilliseconds());
- return result;
+ return new FlinkFuture<>(result);
} else {
throw new RuntimeException("Trying to send a Callable to a remote actor at " +
rpcEndpoint.path() + ". This is not supported.");
@@ -204,17 +204,17 @@ class AkkaInvocationHandler implements InvocationHandler, AkkaGateway, MainThrea
* has been found
* @return Timeout extracted from the array of arguments or the default timeout
*/
- private static Timeout extractRpcTimeout(Annotation[][] parameterAnnotations, Object[] args, Timeout defaultTimeout) {
+ private static Time extractRpcTimeout(Annotation[][] parameterAnnotations, Object[] args, Time defaultTimeout) {
if (args != null) {
Preconditions.checkArgument(parameterAnnotations.length == args.length);
for (int i = 0; i < parameterAnnotations.length; i++) {
if (isRpcTimeout(parameterAnnotations[i])) {
- if (args[i] instanceof FiniteDuration) {
- return new Timeout((FiniteDuration) args[i]);
+ if (args[i] instanceof Time) {
+ return (Time) args[i];
} else {
throw new RuntimeException("The rpc timeout parameter must be of type " +
- FiniteDuration.class.getName() + ". The type " + args[i].getClass().getName() +
+ Time.class.getName() + ". The type " + args[i].getClass().getName() +
" is not supported.");
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/507e86cf/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
index 2373be9..59daa46 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
@@ -21,8 +21,11 @@ package org.apache.flink.runtime.rpc.akka;
import akka.actor.ActorRef;
import akka.actor.Status;
import akka.actor.UntypedActorWithStash;
+import akka.dispatch.Futures;
import akka.japi.Procedure;
import akka.pattern.Patterns;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
import org.apache.flink.runtime.rpc.MainThreadValidatorUtil;
import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcGateway;
@@ -35,7 +38,6 @@ import org.apache.flink.runtime.rpc.akka.messages.RunAsync;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;
import java.io.IOException;
@@ -146,8 +148,23 @@ class AkkaRpcActor<C extends RpcGateway, T extends RpcEndpoint<C>> extends Untyp
Object result = rpcMethod.invoke(rpcEndpoint, rpcInvocation.getArgs());
if (result instanceof Future) {
+ final Future<?> future = (Future<?>) result;
+
// pipe result to sender
- Patterns.pipe((Future<?>) result, getContext().dispatcher()).to(getSender());
+ if (future instanceof FlinkFuture) {
+ // FlinkFutures are currently backed by Scala's futures
+ FlinkFuture<?> flinkFuture = (FlinkFuture<?>) future;
+
+ Patterns.pipe(flinkFuture.getScalaFuture(), getContext().dispatcher()).to(getSender());
+ } else {
+ // We have to unpack the Flink future and pack it into a Scala future
+ Patterns.pipe(Futures.future(new Callable<Object>() {
+ @Override
+ public Object call() throws Exception {
+ return future.get();
+ }
+ }, getContext().dispatcher()), getContext().dispatcher());
+ }
} else {
// tell the sender the result of the computation
getSender().tell(new Status.Success(result), getSelf());
http://git-wip-us.apache.org/repos/asf/flink/blob/507e86cf/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
index 060a1ef..36f1115 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
@@ -26,11 +26,13 @@ import akka.actor.Identify;
import akka.actor.PoisonPill;
import akka.actor.Props;
import akka.dispatch.Mapper;
-import akka.pattern.AskableActorSelection;
-import akka.util.Timeout;
+import akka.pattern.Patterns;
+import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.rpc.MainThreadExecutor;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
+import org.apache.flink.runtime.rpc.MainThreadExecutable;
import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcService;
@@ -39,8 +41,6 @@ import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import scala.concurrent.ExecutionContext;
-import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;
import javax.annotation.concurrent.ThreadSafe;
@@ -48,6 +48,7 @@ import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.HashSet;
import java.util.Set;
+import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import static org.apache.flink.util.Preconditions.checkArgument;
@@ -68,13 +69,13 @@ public class AkkaRpcService implements RpcService {
private final Object lock = new Object();
private final ActorSystem actorSystem;
- private final Timeout timeout;
+ private final Time timeout;
private final Set<ActorRef> actors = new HashSet<>(4);
private final long maximumFramesize;
private volatile boolean stopped;
- public AkkaRpcService(final ActorSystem actorSystem, final Timeout timeout) {
+ public AkkaRpcService(final ActorSystem actorSystem, final Time timeout) {
this.actorSystem = checkNotNull(actorSystem, "actor system");
this.timeout = checkNotNull(timeout, "timeout");
@@ -95,10 +96,9 @@ public class AkkaRpcService implements RpcService {
address, clazz.getName());
final ActorSelection actorSel = actorSystem.actorSelection(address);
- final AskableActorSelection asker = new AskableActorSelection(actorSel);
- final Future<Object> identify = asker.ask(new Identify(42), timeout);
- return identify.map(new Mapper<Object, C>(){
+ final scala.concurrent.Future<Object> identify = Patterns.ask(actorSel, new Identify(42), timeout.toMilliseconds());
+ final scala.concurrent.Future<C> resultFuture = identify.map(new Mapper<Object, C>(){
@Override
public C checkedApply(Object obj) throws Exception {
@@ -128,6 +128,8 @@ public class AkkaRpcService implements RpcService {
}
}
}, actorSystem.dispatcher());
+
+ return new FlinkFuture<>(resultFuture);
}
@Override
@@ -159,7 +161,7 @@ public class AkkaRpcService implements RpcService {
classLoader,
new Class<?>[]{
rpcEndpoint.getSelfGatewayType(),
- MainThreadExecutor.class,
+ MainThreadExecutable.class,
StartStoppable.class,
AkkaGateway.class},
akkaInvocationHandler);
@@ -209,7 +211,7 @@ public class AkkaRpcService implements RpcService {
}
@Override
- public ExecutionContext getExecutionContext() {
+ public Executor getExecutor() {
return actorSystem.dispatcher();
}
@@ -219,6 +221,6 @@ public class AkkaRpcService implements RpcService {
checkNotNull(unit, "unit");
checkArgument(delay >= 0, "delay must be zero or larger");
- actorSystem.scheduler().scheduleOnce(new FiniteDuration(delay, unit), runnable, getExecutionContext());
+ actorSystem.scheduler().scheduleOnce(new FiniteDuration(delay, unit), runnable, actorSystem.dispatcher());
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/507e86cf/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index fadae5f..d84a6a9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -19,8 +19,8 @@
package org.apache.flink.runtime.taskexecutor;
import akka.actor.ActorSystem;
-import akka.util.Timeout;
import com.typesafe.config.Config;
+import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.io.network.ConnectionManager;
import org.apache.flink.runtime.io.network.LocalConnectionManager;
@@ -35,6 +35,7 @@ import org.apache.flink.runtime.resourcemanager.SlotRequestRegistered;
import org.apache.flink.runtime.resourcemanager.SlotRequestReply;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.util.Preconditions;
+import org.jboss.netty.channel.ChannelException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -78,6 +79,7 @@ import scala.concurrent.duration.FiniteDuration;
import java.io.File;
import java.io.IOException;
+import java.net.BindException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.UUID;
@@ -198,7 +200,7 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
this,
newLeaderAddress,
newLeaderId,
- getMainThreadExecutionContext());
+ getMainThreadExecutor());
resourceManagerConnection.start();
}
}
@@ -302,9 +304,9 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
LOG.debug("Using akka configuration\n " + akkaConfig);
taskManagerSystem = AkkaUtils.createActorSystem(akkaConfig);
} catch (Throwable t) {
- if (t instanceof org.jboss.netty.channel.ChannelException) {
+ if (t instanceof ChannelException) {
Throwable cause = t.getCause();
- if (cause != null && t.getCause() instanceof java.net.BindException) {
+ if (cause != null && t.getCause() instanceof BindException) {
String address = NetUtils.hostAndPortToUrlString(taskManagerHostname, actorSystemPort);
throw new IOException("Unable to bind TaskManager actor system to address " +
address + " - " + cause.getMessage(), t);
@@ -314,7 +316,7 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
}
// start akka rpc service based on actor system
- final Timeout timeout = new Timeout(AkkaUtils.getTimeout(configuration).toMillis(), TimeUnit.MILLISECONDS);
+ final Time timeout = Time.milliseconds(AkkaUtils.getTimeout(configuration).toMillis());
final AkkaRpcService akkaRpcService = new AkkaRpcService(taskManagerSystem, timeout);
// start high availability service to implement getResourceManagerLeaderRetriever method only
http://git-wip-us.apache.org/repos/asf/flink/blob/507e86cf/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
index 65323a8..0962802 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
@@ -18,12 +18,12 @@
package org.apache.flink.runtime.taskexecutor;
+import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.resourcemanager.SlotRequestReply;
import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.runtime.rpc.RpcTimeout;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
import java.util.UUID;
@@ -48,5 +48,5 @@ public interface TaskExecutorGateway extends RpcGateway {
Future<SlotRequestReply> requestSlot(
AllocationID allocationID,
UUID resourceManagerLeaderID,
- @RpcTimeout FiniteDuration timeout);
+ @RpcTimeout Time timeout);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/507e86cf/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java
index 28062b6..647359d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java
@@ -18,11 +18,12 @@
package org.apache.flink.runtime.taskexecutor;
-import akka.dispatch.OnFailure;
-import akka.dispatch.OnSuccess;
-
+import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.AcceptFunction;
+import org.apache.flink.runtime.concurrent.ApplyFunction;
+import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.registration.RegistrationResponse;
@@ -31,12 +32,8 @@ import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.slf4j.Logger;
-import scala.concurrent.ExecutionContext;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
-
import java.util.UUID;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.Executor;
import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.Preconditions.checkState;
@@ -57,7 +54,7 @@ public class TaskExecutorToResourceManagerConnection {
private final String resourceManagerAddress;
/** Execution context to be used to execute the on complete action of the ResourceManagerRegistration */
- private final ExecutionContext executionContext;
+ private final Executor executor;
private TaskExecutorToResourceManagerConnection.ResourceManagerRegistration pendingRegistration;
@@ -74,13 +71,13 @@ public class TaskExecutorToResourceManagerConnection {
TaskExecutor taskExecutor,
String resourceManagerAddress,
UUID resourceManagerLeaderId,
- ExecutionContext executionContext) {
+ Executor executor) {
this.log = checkNotNull(log);
this.taskExecutor = checkNotNull(taskExecutor);
this.resourceManagerAddress = checkNotNull(resourceManagerAddress);
this.resourceManagerLeaderId = checkNotNull(resourceManagerLeaderId);
- this.executionContext = checkNotNull(executionContext);
+ this.executor = checkNotNull(executor);
}
// ------------------------------------------------------------------------
@@ -100,21 +97,22 @@ public class TaskExecutorToResourceManagerConnection {
Future<Tuple2<ResourceManagerGateway, TaskExecutorRegistrationSuccess>> future = pendingRegistration.getFuture();
- future.onSuccess(new OnSuccess<Tuple2<ResourceManagerGateway, TaskExecutorRegistrationSuccess>>() {
+ future.thenAcceptAsync(new AcceptFunction<Tuple2<ResourceManagerGateway, TaskExecutorRegistrationSuccess>>() {
@Override
- public void onSuccess(Tuple2<ResourceManagerGateway, TaskExecutorRegistrationSuccess> result) {
+ public void accept(Tuple2<ResourceManagerGateway, TaskExecutorRegistrationSuccess> result) {
registrationId = result.f1.getRegistrationId();
registeredResourceManager = result.f0;
}
- }, executionContext);
+ }, executor);
// this future should only ever fail if there is a bug, not if the registration is declined
- future.onFailure(new OnFailure() {
+ future.exceptionallyAsync(new ApplyFunction<Throwable, Void>() {
@Override
- public void onFailure(Throwable failure) {
+ public Void apply(Throwable failure) {
taskExecutor.onFatalErrorAsync(failure);
+ return null;
}
- }, executionContext);
+ }, executor);
}
public void close() {
@@ -197,7 +195,7 @@ public class TaskExecutorToResourceManagerConnection {
protected Future<RegistrationResponse> invokeRegistration(
ResourceManagerGateway resourceManager, UUID leaderId, long timeoutMillis) throws Exception {
- FiniteDuration timeout = new FiniteDuration(timeoutMillis, TimeUnit.MILLISECONDS);
+ Time timeout = Time.milliseconds(timeoutMillis);
return resourceManager.registerTaskExecutor(leaderId, taskExecutorAddress, resourceID, timeout);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/507e86cf/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RetryingRegistrationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RetryingRegistrationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RetryingRegistrationTest.java
index 80fa19c..e56a9ec 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RetryingRegistrationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RetryingRegistrationTest.java
@@ -18,9 +18,9 @@
package org.apache.flink.runtime.registration;
-import akka.dispatch.Futures;
-
import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.TestingRpcService;
import org.apache.flink.util.TestLogger;
@@ -29,18 +29,13 @@ import org.junit.Test;
import org.slf4j.LoggerFactory;
-import scala.concurrent.Await;
-import scala.concurrent.ExecutionContext$;
-import scala.concurrent.Future;
-import scala.concurrent.Promise;
-import scala.concurrent.duration.FiniteDuration;
-
import java.util.UUID;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-import static java.util.concurrent.TimeUnit.SECONDS;
import static org.junit.Assert.*;
import static org.mockito.Mockito.*;
@@ -71,8 +66,8 @@ public class RetryingRegistrationTest extends TestLogger {
// multiple accesses return the same future
assertEquals(future, registration.getFuture());
- Tuple2<TestRegistrationGateway, TestRegistrationSuccess> success =
- Await.result(future, new FiniteDuration(10, SECONDS));
+ Tuple2<TestRegistrationGateway, TestRegistrationSuccess> success =
+ future.get(10L, TimeUnit.SECONDS);
// validate correct invocation and result
assertEquals(testId, success.f1.getCorrelationId());
@@ -83,7 +78,7 @@ public class RetryingRegistrationTest extends TestLogger {
rpc.stopService();
}
}
-
+
@Test
public void testPropagateFailures() throws Exception {
final String testExceptionMessage = "testExceptionMessage";
@@ -96,9 +91,15 @@ public class RetryingRegistrationTest extends TestLogger {
registration.startRegistration();
Future<?> future = registration.getFuture();
- assertTrue(future.failed().isCompleted());
+ assertTrue(future.isDone());
- assertEquals(testExceptionMessage, future.failed().value().get().get().getMessage());
+ try {
+ future.get();
+
+ fail("We expected an ExecutionException.");
+ } catch (ExecutionException e) {
+ assertEquals(testExceptionMessage, e.getCause().getMessage());
+ }
}
@Test
@@ -113,16 +114,16 @@ public class RetryingRegistrationTest extends TestLogger {
// RPC service that fails upon the first connection, but succeeds on the second
RpcService rpc = mock(RpcService.class);
when(rpc.connect(anyString(), any(Class.class))).thenReturn(
- Futures.failed(new Exception("test connect failure")), // first connection attempt fails
- Futures.successful(testGateway) // second connection attempt succeeds
+ FlinkCompletableFuture.completedExceptionally(new Exception("test connect failure")), // first connection attempt fails
+ FlinkCompletableFuture.completed(testGateway) // second connection attempt succeeds
);
- when(rpc.getExecutionContext()).thenReturn(ExecutionContext$.MODULE$.fromExecutor(executor));
+ when(rpc.getExecutor()).thenReturn(executor);
TestRetryingRegistration registration = new TestRetryingRegistration(rpc, "foobar address", leaderId);
registration.startRegistration();
Tuple2<TestRegistrationGateway, TestRegistrationSuccess> success =
- Await.result(registration.getFuture(), new FiniteDuration(10, SECONDS));
+ registration.getFuture().get(10L, TimeUnit.SECONDS);
// validate correct invocation and result
assertEquals(testId, success.f1.getCorrelationId());
@@ -151,23 +152,23 @@ public class RetryingRegistrationTest extends TestLogger {
try {
rpc.registerGateway(testEndpointAddress, testGateway);
-
+
TestRetryingRegistration registration = new TestRetryingRegistration(rpc, testEndpointAddress, leaderId);
-
+
long started = System.nanoTime();
registration.startRegistration();
-
+
Future<Tuple2<TestRegistrationGateway, TestRegistrationSuccess>> future = registration.getFuture();
Tuple2<TestRegistrationGateway, TestRegistrationSuccess> success =
- Await.result(future, new FiniteDuration(10, SECONDS));
-
+ future.get(10L, TimeUnit.SECONDS);
+
long finished = System.nanoTime();
long elapsedMillis = (finished - started) / 1000000;
-
+
// validate correct invocation and result
assertEquals(testId, success.f1.getCorrelationId());
assertEquals(leaderId, testGateway.getInvocations().take().leaderId());
-
+
// validate that some retry-delay / back-off behavior happened
assertTrue("retries did not properly back off", elapsedMillis >= 3 * TestRetryingRegistration.INITIAL_TIMEOUT);
}
@@ -199,10 +200,10 @@ public class RetryingRegistrationTest extends TestLogger {
long started = System.nanoTime();
registration.startRegistration();
-
+
Future<Tuple2<TestRegistrationGateway, TestRegistrationSuccess>> future = registration.getFuture();
Tuple2<TestRegistrationGateway, TestRegistrationSuccess> success =
- Await.result(future, new FiniteDuration(10, SECONDS));
+ future.get(10L, TimeUnit.SECONDS);
long finished = System.nanoTime();
long elapsedMillis = (finished - started) / 1000000;
@@ -212,7 +213,7 @@ public class RetryingRegistrationTest extends TestLogger {
assertEquals(leaderId, testGateway.getInvocations().take().leaderId());
// validate that some retry-delay / back-off behavior happened
- assertTrue("retries did not properly back off", elapsedMillis >=
+ assertTrue("retries did not properly back off", elapsedMillis >=
2 * TestRetryingRegistration.INITIAL_TIMEOUT + TestRetryingRegistration.DELAY_ON_DECLINE);
}
finally {
@@ -220,7 +221,7 @@ public class RetryingRegistrationTest extends TestLogger {
rpc.stopService();
}
}
-
+
@Test
@SuppressWarnings("unchecked")
public void testRetryOnError() throws Exception {
@@ -235,9 +236,9 @@ public class RetryingRegistrationTest extends TestLogger {
TestRegistrationGateway testGateway = mock(TestRegistrationGateway.class);
when(testGateway.registrationCall(any(UUID.class), anyLong())).thenReturn(
- Futures.<RegistrationResponse>failed(new Exception("test exception")),
- Futures.<RegistrationResponse>successful(new TestRegistrationSuccess(testId)));
-
+ FlinkCompletableFuture.<RegistrationResponse>completedExceptionally(new Exception("test exception")),
+ FlinkCompletableFuture.<RegistrationResponse>completed(new TestRegistrationSuccess(testId)));
+
rpc.registerGateway(testEndpointAddress, testGateway);
TestRetryingRegistration registration = new TestRetryingRegistration(rpc, testEndpointAddress, leaderId);
@@ -247,11 +248,11 @@ public class RetryingRegistrationTest extends TestLogger {
Future<Tuple2<TestRegistrationGateway, TestRegistrationSuccess>> future = registration.getFuture();
Tuple2<TestRegistrationGateway, TestRegistrationSuccess> success =
- Await.result(future, new FiniteDuration(10, SECONDS));
+ future.get(10, TimeUnit.SECONDS);
long finished = System.nanoTime();
long elapsedMillis = (finished - started) / 1000000;
-
+
assertEquals(testId, success.f1.getCorrelationId());
// validate that some retry-delay / back-off behavior happened
@@ -271,10 +272,10 @@ public class RetryingRegistrationTest extends TestLogger {
TestingRpcService rpc = new TestingRpcService();
try {
- Promise<RegistrationResponse> result = Futures.promise();
+ FlinkCompletableFuture<RegistrationResponse> result = new FlinkCompletableFuture<>();
TestRegistrationGateway testGateway = mock(TestRegistrationGateway.class);
- when(testGateway.registrationCall(any(UUID.class), anyLong())).thenReturn(result.future());
+ when(testGateway.registrationCall(any(UUID.class), anyLong())).thenReturn(result);
rpc.registerGateway(testEndpointAddress, testGateway);
@@ -283,7 +284,7 @@ public class RetryingRegistrationTest extends TestLogger {
// cancel and fail the current registration attempt
registration.cancel();
- result.failure(new TimeoutException());
+ result.completeExceptionally(new TimeoutException());
// there should not be a second registration attempt
verify(testGateway, atMost(1)).registrationCall(any(UUID.class), anyLong());
http://git-wip-us.apache.org/repos/asf/flink/blob/507e86cf/flink-runtime/src/test/java/org/apache/flink/runtime/registration/TestRegistrationGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/registration/TestRegistrationGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/registration/TestRegistrationGateway.java
index 431fbe8..2843aeb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/registration/TestRegistrationGateway.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/registration/TestRegistrationGateway.java
@@ -20,11 +20,11 @@ package org.apache.flink.runtime.registration;
import akka.dispatch.Futures;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
import org.apache.flink.runtime.rpc.TestingGatewayBase;
import org.apache.flink.util.Preconditions;
-import scala.concurrent.Future;
-
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
@@ -56,7 +56,7 @@ public class TestRegistrationGateway extends TestingGatewayBase {
}
// return a completed future (for a proper value), or one that never completes and will time out (for null)
- return response != null ? Futures.successful(response) : this.<RegistrationResponse>futureWithTimeout(timeout);
+ return response != null ? FlinkCompletableFuture.completed(response) : this.<RegistrationResponse>futureWithTimeout(timeout);
}
public BlockingQueue<RegistrationCall> getInvocations() {
http://git-wip-us.apache.org/repos/asf/flink/blob/507e86cf/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
index 8183c0a..64a1191 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.resourcemanager;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
-import org.apache.flink.runtime.rpc.MainThreadExecutor;
+import org.apache.flink.runtime.rpc.MainThreadExecutable;
import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.runtime.rpc.RpcService;
@@ -68,7 +68,7 @@ public class ResourceManagerHATest {
Assert.assertNull(resourceManager.getLeaderSessionID());
}
- private static abstract class TestingResourceManagerGatewayProxy implements MainThreadExecutor, StartStoppable, RpcGateway {
+ private static abstract class TestingResourceManagerGatewayProxy implements MainThreadExecutable, StartStoppable, RpcGateway {
@Override
public void runAsync(Runnable runnable) {
runnable.run();
http://git-wip-us.apache.org/repos/asf/flink/blob/507e86cf/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
index 85d2880..1f9e7e8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
@@ -18,10 +18,12 @@
package org.apache.flink.runtime.resourcemanager.slotmanager;
import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.clusterframework.types.SlotID;
+import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.highavailability.NonHaServices;
import org.apache.flink.runtime.jobmaster.JobMasterGateway;
import org.apache.flink.runtime.resourcemanager.JobMasterRegistration;
@@ -40,10 +42,6 @@ import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.Mockito;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.Duration;
-import scala.concurrent.duration.FiniteDuration;
import java.util.Collections;
import java.util.UUID;
@@ -99,7 +97,7 @@ public class SlotProtocolTest extends TestLogger {
Future<RegistrationResponse> registrationFuture =
resourceManager.registerJobMaster(new JobMasterRegistration(jmAddress, jobID));
try {
- Await.ready(registrationFuture, Duration.create(5, TimeUnit.SECONDS));
+ registrationFuture.get(5, TimeUnit.SECONDS);
} catch (Exception e) {
Assert.fail("JobManager registration Future didn't become ready.");
}
@@ -141,7 +139,7 @@ public class SlotProtocolTest extends TestLogger {
slotManager.updateSlotStatus(slotReport);
// 4) Slot becomes available and TaskExecutor gets a SlotRequest
- verify(taskExecutorGateway, timeout(5000)).requestSlot(eq(allocationID), any(UUID.class), any(FiniteDuration.class));
+ verify(taskExecutorGateway, timeout(5000)).requestSlot(eq(allocationID), any(UUID.class), any(Time.class));
}
/**
@@ -171,7 +169,7 @@ public class SlotProtocolTest extends TestLogger {
Future<RegistrationResponse> registrationFuture =
resourceManager.registerJobMaster(new JobMasterRegistration(jmAddress, jobID));
try {
- Await.ready(registrationFuture, Duration.create(5, TimeUnit.SECONDS));
+ registrationFuture.get(5, TimeUnit.SECONDS);
} catch (Exception e) {
Assert.fail("JobManager registration Future didn't become ready.");
}
@@ -207,7 +205,7 @@ public class SlotProtocolTest extends TestLogger {
// 4) a SlotRequest is routed to the TaskExecutor
- verify(taskExecutorGateway, timeout(5000)).requestSlot(eq(allocationID), any(UUID.class), any(FiniteDuration.class));
+ verify(taskExecutorGateway, timeout(5000)).requestSlot(eq(allocationID), any(UUID.class), any(Time.class));
}
http://git-wip-us.apache.org/repos/asf/flink/blob/507e86cf/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java
index 1791056..7c6b0ee 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java
@@ -21,18 +21,16 @@ package org.apache.flink.runtime.rpc;
import akka.actor.ActorSystem;
import akka.util.Timeout;
+import org.apache.flink.api.common.time.Time;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
import org.apache.flink.util.TestLogger;
import org.junit.AfterClass;
import org.junit.Test;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
-
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -49,7 +47,7 @@ public class AsyncCallsTest extends TestLogger {
private static ActorSystem actorSystem = AkkaUtils.createDefaultActorSystem();
private static AkkaRpcService akkaRpcService =
- new AkkaRpcService(actorSystem, new Timeout(10000, TimeUnit.MILLISECONDS));
+ new AkkaRpcService(actorSystem, Time.milliseconds(10000L));
@AfterClass
public static void shutdown() {
@@ -104,8 +102,9 @@ public class AsyncCallsTest extends TestLogger {
}
return "test";
}
- }, new Timeout(30, TimeUnit.SECONDS));
- String str = Await.result(result, new FiniteDuration(30, TimeUnit.SECONDS));
+ }, Time.seconds(30L));
+
+ String str = result.get(30, TimeUnit.SECONDS);
assertEquals("test", str);
// validate that no concurrent access happened
http://git-wip-us.apache.org/repos/asf/flink/blob/507e86cf/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java
index b431eb9..ee3f784 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java
@@ -18,14 +18,14 @@
package org.apache.flink.runtime.rpc;
+import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.util.ReflectionUtil;
import org.apache.flink.util.TestLogger;
import org.junit.Test;
import org.reflections.Reflections;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
import java.lang.annotation.Annotation;
import java.lang.reflect.Method;
@@ -43,6 +43,7 @@ import static org.junit.Assert.fail;
public class RpcCompletenessTest extends TestLogger {
private static final Class<?> futureClass = Future.class;
+ private static final Class<?> timeoutClass = Time.class;
@Test
@SuppressWarnings({"rawtypes", "unchecked"})
@@ -147,8 +148,8 @@ public class RpcCompletenessTest extends TestLogger {
for (int i = 0; i < parameterAnnotations.length; i++) {
if (RpcCompletenessTest.isRpcTimeout(parameterAnnotations[i])) {
assertTrue(
- "The rpc timeout has to be of type " + FiniteDuration.class.getName() + ".",
- parameterTypes[i].equals(FiniteDuration.class));
+ "The rpc timeout has to be of type " + timeoutClass.getName() + ".",
+ parameterTypes[i].equals(timeoutClass));
rpcTimeoutParameters++;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/507e86cf/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingGatewayBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingGatewayBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingGatewayBase.java
index 8133a87..caf5e81 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingGatewayBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingGatewayBase.java
@@ -18,9 +18,9 @@
package org.apache.flink.runtime.rpc;
-import akka.dispatch.Futures;
-import scala.concurrent.Future;
-import scala.concurrent.Promise;
+import org.apache.flink.runtime.concurrent.CompletableFuture;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@@ -73,25 +73,25 @@ public abstract class TestingGatewayBase implements RpcGateway {
// ------------------------------------------------------------------------
public <T> Future<T> futureWithTimeout(long timeoutMillis) {
- Promise<T> promise = Futures.<T>promise();
- executor.schedule(new FutureTimeout(promise), timeoutMillis, TimeUnit.MILLISECONDS);
- return promise.future();
+ FlinkCompletableFuture<T> future = new FlinkCompletableFuture<>();
+ executor.schedule(new FutureTimeout(future), timeoutMillis, TimeUnit.MILLISECONDS);
+ return future;
}
// ------------------------------------------------------------------------
private static final class FutureTimeout implements Runnable {
- private final Promise<?> promise;
+ private final CompletableFuture<?> promise;
- private FutureTimeout(Promise<?> promise) {
+ private FutureTimeout(CompletableFuture<?> promise) {
this.promise = promise;
}
@Override
public void run() {
try {
- promise.failure(new TimeoutException());
+ promise.completeExceptionally(new TimeoutException());
} catch (Throwable t) {
System.err.println("CAUGHT AN ERROR IN THE TEST: " + t.getMessage());
t.printStackTrace();
http://git-wip-us.apache.org/repos/asf/flink/blob/507e86cf/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java
index 2212680..f164056 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java
@@ -18,18 +18,14 @@
package org.apache.flink.runtime.rpc;
-import akka.dispatch.Futures;
-import akka.util.Timeout;
-
+import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
-
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -69,7 +65,7 @@ public class TestingRpcService extends AkkaRpcService {
* Creates a new {@code TestingRpcService}, using the given configuration.
*/
public TestingRpcService(Configuration configuration) {
- super(AkkaUtils.createLocalActorSystem(configuration), new Timeout(new FiniteDuration(10, TimeUnit.SECONDS)));
+ super(AkkaUtils.createLocalActorSystem(configuration), Time.seconds(10));
this.registeredConnections = new ConcurrentHashMap<>();
}
@@ -103,13 +99,13 @@ public class TestingRpcService extends AkkaRpcService {
if (clazz.isAssignableFrom(gateway.getClass())) {
@SuppressWarnings("unchecked")
C typedGateway = (C) gateway;
- return Futures.successful(typedGateway);
+ return FlinkCompletableFuture.completed(typedGateway);
} else {
- return Futures.failed(
- new Exception("Gateway registered under " + address + " is not of type " + clazz));
+ return FlinkCompletableFuture.completedExceptionally(
+ new Exception("Gateway registered under " + address + " is not of type " + clazz));
}
} else {
- return Futures.failed(new Exception("No gateway registered under that name"));
+ return FlinkCompletableFuture.completedExceptionally(new Exception("No gateway registered under that name"));
}
}