You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/02/24 13:49:23 UTC
[2/2] flink git commit: [FLINK-5798] [rpc] Let the RpcService provide
a ScheduledExecutorService
[FLINK-5798] [rpc] Let the RpcService provide a ScheduledExecutorService
This PR adds the getScheduledExecutorService method to the RpcService interface. So
henceforth all RpcService implementations have to provide a ScheduledExecutorService
implementation.
Currently, we only support the AkkaRpcService. The AkkaRpcService returns a
ScheduledExecutorService proxy which forwards the schedule calls to the ActorSystem's
internal scheduler.
Introduce ScheduledExecutor interface to hide service methods from the ScheduledExecutorService
This closes #3310.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ccf458dd
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ccf458dd
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ccf458dd
Branch: refs/heads/master
Commit: ccf458dd4d173b3370257177c2bbd9680baa6511
Parents: 5983069
Author: Till Rohrmann <tr...@apache.org>
Authored: Tue Feb 14 16:50:43 2017 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Fri Feb 24 14:48:52 2017 +0100
----------------------------------------------------------------------
.../runtime/concurrent/ScheduledExecutor.java | 92 ++++++++++
.../ScheduledExecutorServiceAdapter.java | 64 +++++++
.../apache/flink/runtime/rpc/RpcService.java | 15 ++
.../flink/runtime/rpc/akka/AkkaRpcService.java | 178 +++++++++++++++++++
.../runtime/rpc/TestingSerialRpcService.java | 34 ++++
.../runtime/rpc/akka/AkkaRpcServiceTest.java | 160 +++++++++++++++++
6 files changed, 543 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/ccf458dd/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/ScheduledExecutor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/ScheduledExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/ScheduledExecutor.java
new file mode 100644
index 0000000..c1b47e2
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/ScheduledExecutor.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.concurrent;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Extension for the {@link Executor} interface which is enriched by method for scheduling tasks
+ * in the future.
+ */
+public interface ScheduledExecutor extends Executor {
+
+ /**
+ * Executes the given command after the given delay.
+ *
+ * @param command the task to execute in the future
+ * @param delay the time from now to delay the execution
+ * @param unit the time unit of the delay parameter
+ * @return a ScheduledFuture representing the completion of the scheduled task
+ */
+ ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);
+
+ /**
+ * Executes the given callable after the given delay. The result of the callable is returned
+ * as a {@link ScheduledFuture}.
+ *
+ * @param callable the callable to execute
+ * @param delay the time from now to delay the execution
+ * @param unit the time unit of the delay parameter
+ * @param <V> result type of the callable
+ * @return a ScheduledFuture which holds the future value of the given callable
+ */
+ <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);
+
+ /**
+ * Executes the given command periodically. The first execution is started after the
+ * {@code initialDelay}, the second execution is started after {@code initialDelay + period},
+ * the third after {@code initialDelay + 2*period} and so on.
+ * The task is executed until either an execution fails, or the returned {@link ScheduledFuture}
+ * is cancelled.
+ *
+ * @param command the task to be executed periodically
+ * @param initialDelay the time from now until the first execution is triggered
+ * @param period the time after which the next execution is triggered
+ * @param unit the time unit of the delay and period parameter
+ * @return a ScheduledFuture representing the periodic task. This future never completes
+ * unless an execution of the given task fails or if the future is cancelled
+ */
+ ScheduledFuture<?> scheduleAtFixedRate(
+ Runnable command,
+ long initialDelay,
+ long period,
+ TimeUnit unit);
+
+ /**
+ * Executed the given command repeatedly with the given delay between the end of an execution
+ * and the start of the next execution.
+ * The task is executed repeatedly until either an exception occurs or if the returned
+ * {@link ScheduledFuture} is cancelled.
+ *
+ * @param command the task to execute repeatedly
+ * @param initialDelay the time from now until the first execution is triggered
+ * @param delay the time between the end of the current and the start of the next execution
+ * @param unit the time unit of the initial delay and the delay parameter
+ * @return a ScheduledFuture representing the repeatedly executed task. This future never
+ * completes unless th exectuion of the given task fails or if the future is cancelled
+ */
+ ScheduledFuture<?> scheduleWithFixedDelay(
+ Runnable command,
+ long initialDelay,
+ long delay,
+ TimeUnit unit);
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ccf458dd/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/ScheduledExecutorServiceAdapter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/ScheduledExecutorServiceAdapter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/ScheduledExecutorServiceAdapter.java
new file mode 100644
index 0000000..7662c35
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/ScheduledExecutorServiceAdapter.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.concurrent;
+
+import org.apache.flink.util.Preconditions;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Adapter class for a {@link ScheduledExecutorService} which shall be used as a
+ * {@link ScheduledExecutor}.
+ */
+public class ScheduledExecutorServiceAdapter implements ScheduledExecutor {
+
+ private final ScheduledExecutorService scheduledExecutorService;
+
+ public ScheduledExecutorServiceAdapter(ScheduledExecutorService scheduledExecutorService) {
+ this.scheduledExecutorService = Preconditions.checkNotNull(scheduledExecutorService);
+ }
+
+ @Override
+ public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
+ return scheduledExecutorService.schedule(command, delay, unit);
+ }
+
+ @Override
+ public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
+ return scheduledExecutorService.schedule(callable, delay, unit);
+ }
+
+ @Override
+ public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
+ return scheduledExecutorService.scheduleAtFixedRate(command, initialDelay, period, unit);
+ }
+
+ @Override
+ public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
+ return scheduledExecutorService.scheduleWithFixedDelay(command, initialDelay, delay, unit);
+ }
+
+ @Override
+ public void execute(Runnable command) {
+ scheduledExecutorService.execute(command);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ccf458dd/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
index 4b9100a..2d2019a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.rpc;
import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException;
import java.util.concurrent.Callable;
@@ -98,6 +99,20 @@ public interface RpcService {
Executor getExecutor();
/**
+ * Gets a scheduled executor from the RPC service. This executor can be used to schedule
+ * tasks to be executed in the future.
+ *
+ * <p><b>IMPORTANT:</b> This executor does not isolate the method invocations against
+ * any concurrent invocations and is therefore not suitable to run completion methods of futures
+ * that modify state of an {@link RpcEndpoint}. For such operations, one needs to use the
+ * {@link RpcEndpoint#getMainThreadExecutor() MainThreadExecutionContext} of that
+ * {@code RpcEndpoint}.
+ *
+ * @return The RPC service provided scheduled executor
+ */
+ ScheduledExecutor getScheduledExecutor();
+
+ /**
* Execute the runnable in the execution context of this RPC Service, as returned by
* {@link #getExecutor()}, after a scheduled delay.
*
http://git-wip-us.apache.org/repos/asf/flink/blob/ccf458dd/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
index 6e3fb40..6a6a85d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
@@ -23,6 +23,7 @@ import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.ActorSystem;
import akka.actor.Address;
+import akka.actor.Cancellable;
import akka.actor.Identify;
import akka.actor.PoisonPill;
import akka.actor.Props;
@@ -34,6 +35,7 @@ import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.concurrent.CompletableFuture;
import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
import org.apache.flink.runtime.rpc.MainThreadExecutable;
@@ -43,18 +45,24 @@ import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.StartStoppable;
import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException;
+import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.duration.FiniteDuration;
+import javax.annotation.Nonnull;
import javax.annotation.concurrent.ThreadSafe;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Callable;
+import java.util.concurrent.Delayed;
import java.util.concurrent.Executor;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.RunnableScheduledFuture;
+import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import static org.apache.flink.util.Preconditions.checkArgument;
@@ -81,6 +89,8 @@ public class AkkaRpcService implements RpcService {
private final String address;
+ private final ScheduledExecutor internalScheduledExecutor;
+
private volatile boolean stopped;
public AkkaRpcService(final ActorSystem actorSystem, final Time timeout) {
@@ -101,6 +111,8 @@ public class AkkaRpcService implements RpcService {
} else {
address = "";
}
+
+ internalScheduledExecutor = new InternalScheduledExecutorImpl(actorSystem);
}
@Override
@@ -259,6 +271,10 @@ public class AkkaRpcService implements RpcService {
return actorSystem.dispatcher();
}
+ public ScheduledExecutor getScheduledExecutor() {
+ return internalScheduledExecutor;
+ }
+
@Override
public void scheduleRunnable(Runnable runnable, long delay, TimeUnit unit) {
checkNotNull(runnable, "runnable");
@@ -279,4 +295,166 @@ public class AkkaRpcService implements RpcService {
return new FlinkFuture<>(scalaFuture);
}
+
+ /**
+ * Helper class to expose the internal scheduling logic via a {@link ScheduledExecutor}.
+ */
+ private static final class InternalScheduledExecutorImpl implements ScheduledExecutor {
+
+ private final ActorSystem actorSystem;
+
+ private InternalScheduledExecutorImpl(ActorSystem actorSystem) {
+ this.actorSystem = Preconditions.checkNotNull(actorSystem, "rpcService");
+ }
+
+ @Override
+ @Nonnull
+ public ScheduledFuture<?> schedule(@Nonnull Runnable command, long delay, @Nonnull TimeUnit unit) {
+ ScheduledFutureTask<Void> scheduledFutureTask = new ScheduledFutureTask<>(command, unit.toNanos(delay), 0L);
+
+ Cancellable cancellable = internalSchedule(scheduledFutureTask, delay, unit);
+
+ scheduledFutureTask.setCancellable(cancellable);
+
+ return scheduledFutureTask;
+ }
+
+ @Override
+ @Nonnull
+ public <V> ScheduledFuture<V> schedule(@Nonnull Callable<V> callable, long delay, @Nonnull TimeUnit unit) {
+ ScheduledFutureTask<V> scheduledFutureTask = new ScheduledFutureTask<>(callable, unit.toNanos(delay), 0L);
+
+ Cancellable cancellable = internalSchedule(scheduledFutureTask, delay, unit);
+
+ scheduledFutureTask.setCancellable(cancellable);
+
+ return scheduledFutureTask;
+ }
+
+ @Override
+ @Nonnull
+ public ScheduledFuture<?> scheduleAtFixedRate(@Nonnull Runnable command, long initialDelay, long period, @Nonnull TimeUnit unit) {
+ ScheduledFutureTask<Void> scheduledFutureTask = new ScheduledFutureTask<>(
+ command,
+ triggerTime(unit.toNanos(initialDelay)),
+ unit.toNanos(period));
+
+ Cancellable cancellable = actorSystem.scheduler().schedule(
+ new FiniteDuration(initialDelay, unit),
+ new FiniteDuration(period, unit),
+ scheduledFutureTask,
+ actorSystem.dispatcher());
+
+ scheduledFutureTask.setCancellable(cancellable);
+
+ return scheduledFutureTask;
+ }
+
+ @Override
+ @Nonnull
+ public ScheduledFuture<?> scheduleWithFixedDelay(@Nonnull Runnable command, long initialDelay, long delay, @Nonnull TimeUnit unit) {
+ ScheduledFutureTask<Void> scheduledFutureTask = new ScheduledFutureTask<>(
+ command,
+ triggerTime(unit.toNanos(initialDelay)),
+ unit.toNanos(-delay));
+
+ Cancellable cancellable = internalSchedule(scheduledFutureTask, initialDelay, unit);
+
+ scheduledFutureTask.setCancellable(cancellable);
+
+ return scheduledFutureTask;
+ }
+
+ @Override
+ public void execute(@Nonnull Runnable command) {
+ actorSystem.dispatcher().execute(command);
+ }
+
+ private Cancellable internalSchedule(Runnable runnable, long delay, TimeUnit unit) {
+ return actorSystem.scheduler().scheduleOnce(
+ new FiniteDuration(delay, unit),
+ runnable,
+ actorSystem.dispatcher());
+ }
+
+ private long now() {
+ return System.nanoTime();
+ }
+
+ private long triggerTime(long delay) {
+ return now() + delay;
+ }
+
+ private final class ScheduledFutureTask<V> extends FutureTask<V> implements RunnableScheduledFuture<V> {
+
+ private long time;
+
+ private final long period;
+
+ private volatile Cancellable cancellable;
+
+ ScheduledFutureTask(Callable<V> callable, long time, long period) {
+ super(callable);
+ this.time = time;
+ this.period = period;
+ }
+
+ ScheduledFutureTask(Runnable runnable, long time, long period) {
+ super(runnable, null);
+ this.time = time;
+ this.period = period;
+ }
+
+ public void setCancellable(Cancellable newCancellable) {
+ this.cancellable = newCancellable;
+ }
+
+ @Override
+ public void run() {
+ if (!isPeriodic()) {
+ super.run();
+ } else if (runAndReset()){
+ if (period > 0L) {
+ time += period;
+ } else {
+ cancellable = internalSchedule(this, -period, TimeUnit.NANOSECONDS);
+
+ // check whether we have been cancelled concurrently
+ if (isCancelled()) {
+ cancellable.cancel();
+ } else {
+ time = triggerTime(-period);
+ }
+ }
+ }
+ }
+
+ @Override
+ public boolean cancel(boolean mayInterruptIfRunning) {
+ boolean result = super.cancel(mayInterruptIfRunning);
+
+ return result && cancellable.cancel();
+ }
+
+ @Override
+ public long getDelay(@Nonnull TimeUnit unit) {
+ return unit.convert(time - now(), TimeUnit.NANOSECONDS);
+ }
+
+ @Override
+ public int compareTo(@Nonnull Delayed o) {
+ if (o == this) {
+ return 0;
+ }
+
+ long diff = getDelay(TimeUnit.NANOSECONDS) - o.getDelay(TimeUnit.NANOSECONDS);
+ return (diff < 0L) ? -1 : (diff > 0L) ? 1 : 0;
+ }
+
+ @Override
+ public boolean isPeriodic() {
+ return period != 0L;
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ccf458dd/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
index 1d30ea4..07edfef 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
@@ -22,6 +22,8 @@ import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.concurrent.CompletableFuture;
import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
+import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
import org.apache.flink.runtime.util.DirectExecutorService;
import org.apache.flink.util.Preconditions;
@@ -31,10 +33,13 @@ import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.BitSet;
+import java.util.List;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -46,13 +51,19 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
public class TestingSerialRpcService implements RpcService {
private final DirectExecutorService executorService;
+ private final ScheduledExecutorService scheduledExecutorService;
private final ConcurrentHashMap<String, RpcGateway> registeredConnections;
private final CompletableFuture<Void> terminationFuture;
+ private final ScheduledExecutor scheduledExecutorServiceAdapter;
+
public TestingSerialRpcService() {
executorService = new DirectExecutorService();
+ scheduledExecutorService = new ScheduledThreadPoolExecutor(1);
this.registeredConnections = new ConcurrentHashMap<>(16);
this.terminationFuture = new FlinkCompletableFuture<>();
+
+ this.scheduledExecutorServiceAdapter = new ScheduledExecutorServiceAdapter(scheduledExecutorService);
}
@Override
@@ -86,9 +97,32 @@ public class TestingSerialRpcService implements RpcService {
return executorService;
}
+ public ScheduledExecutor getScheduledExecutor() {
+ return scheduledExecutorServiceAdapter;
+ }
+
@Override
public void stopService() {
executorService.shutdown();
+
+ scheduledExecutorService.shutdown();
+
+ boolean terminated = false;
+
+ try {
+ terminated = scheduledExecutorService.awaitTermination(1, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+
+ if (!terminated) {
+ List<Runnable> runnables = scheduledExecutorService.shutdownNow();
+
+ for (Runnable runnable : runnables) {
+ runnable.run();
+ }
+ }
+
registeredConnections.clear();
terminationFuture.complete(null);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ccf458dd/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
index 7c8defa..eb71287 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.time.Time;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
import org.apache.flink.util.TestLogger;
@@ -30,13 +31,16 @@ import org.junit.AfterClass;
import org.junit.Test;
import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
public class AkkaRpcServiceTest extends TestLogger {
@@ -149,4 +153,160 @@ public class AkkaRpcServiceTest extends TestLogger {
terminationFuture.get();
}
+
+ /**
+ * Tests a simple scheduled runnable being executed by the RPC services scheduled executor
+ * service.
+ */
+ @Test(timeout = 1000)
+ public void testScheduledExecutorServiceSimpleSchedule() throws ExecutionException, InterruptedException {
+ ScheduledExecutor scheduledExecutor = akkaRpcService.getScheduledExecutor();
+
+ final OneShotLatch latch = new OneShotLatch();
+
+ ScheduledFuture<?> future = scheduledExecutor.schedule(
+ new Runnable() {
+ @Override
+ public void run() {
+ latch.trigger();
+ }
+ },
+ 10L,
+ TimeUnit.MILLISECONDS);
+
+ future.get();
+
+ // once the future is completed, then the latch should have been triggered
+ assertTrue(latch.isTriggered());
+ }
+
+ /**
+ * Tests that the RPC service's scheduled executor service can execute runnables at a fixed
+ * rate.
+ */
+ @Test(timeout = 1000)
+ public void testScheduledExecutorServicePeriodicSchedule() throws ExecutionException, InterruptedException {
+ ScheduledExecutor scheduledExecutor = akkaRpcService.getScheduledExecutor();
+
+ final int tries = 4;
+ final long delay = 10L;
+ final CountDownLatch countDownLatch = new CountDownLatch(tries);
+
+ long currentTime = System.nanoTime();
+
+ ScheduledFuture<?> future = scheduledExecutor.scheduleAtFixedRate(
+ new Runnable() {
+ @Override
+ public void run() {
+ countDownLatch.countDown();
+ }
+ },
+ delay,
+ delay,
+ TimeUnit.MILLISECONDS);
+
+ assertTrue(!future.isDone());
+
+ countDownLatch.await();
+
+ // the future should not complete since we have a periodic task
+ assertTrue(!future.isDone());
+
+ long finalTime = System.nanoTime() - currentTime;
+
+ // the processing should have taken at least delay times the number of count downs.
+ assertTrue(finalTime >= tries * delay);
+
+ future.cancel(true);
+ }
+
+ /**
+ * Tests that the RPC service's scheduled executor service can execute runnable with a fixed
+ * delay.
+ */
+ @Test(timeout = 1000)
+ public void testScheduledExecutorServiceWithFixedDelaySchedule() throws ExecutionException, InterruptedException {
+ ScheduledExecutor scheduledExecutor = akkaRpcService.getScheduledExecutor();
+
+ final int tries = 4;
+ final long delay = 10L;
+ final CountDownLatch countDownLatch = new CountDownLatch(tries);
+
+ long currentTime = System.nanoTime();
+
+ ScheduledFuture<?> future = scheduledExecutor.scheduleWithFixedDelay(
+ new Runnable() {
+ @Override
+ public void run() {
+ countDownLatch.countDown();
+ }
+ },
+ delay,
+ delay,
+ TimeUnit.MILLISECONDS);
+
+ assertTrue(!future.isDone());
+
+ countDownLatch.await();
+
+ // the future should not complete since we have a periodic task
+ assertTrue(!future.isDone());
+
+ long finalTime = System.nanoTime() - currentTime;
+
+ // the processing should have taken at least delay times the number of count downs.
+ assertTrue(finalTime >= tries * delay);
+
+ future.cancel(true);
+ }
+
+ /**
+ * Tests that canceling the returned future will stop the execution of the scheduled runnable.
+ */
+ @Test
+ public void testScheduledExecutorServiceCancelWithFixedDelay() throws InterruptedException {
+ ScheduledExecutor scheduledExecutor = akkaRpcService.getScheduledExecutor();
+
+ long delay = 10L;
+
+ final OneShotLatch futureTask = new OneShotLatch();
+ final OneShotLatch latch = new OneShotLatch();
+ final OneShotLatch shouldNotBeTriggeredLatch = new OneShotLatch();
+
+ ScheduledFuture<?> future = scheduledExecutor.scheduleWithFixedDelay(
+ new Runnable() {
+ @Override
+ public void run() {
+ try {
+ if (!futureTask.isTriggered()) {
+ // first run
+ futureTask.trigger();
+ latch.await();
+ } else {
+ shouldNotBeTriggeredLatch.trigger();
+ }
+ } catch (InterruptedException e) {
+ // ignore
+ }
+ }
+ },
+ delay,
+ delay,
+ TimeUnit.MILLISECONDS);
+
+ // wait until we're in the runnable
+ futureTask.await();
+
+ // cancel the scheduled future
+ future.cancel(false);
+
+ latch.trigger();
+
+ try {
+ shouldNotBeTriggeredLatch.await(5 * delay, TimeUnit.MILLISECONDS);
+ fail("The shouldNotBeTriggeredLatch should never be triggered.");
+ } catch (TimeoutException e) {
+ // expected
+ }
+ }
}