You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/12/23 20:22:59 UTC
[40/52] [abbrv] flink git commit: [FLINK-5093] Add proper shutdown of
scheduled executor service in TimerService
[FLINK-5093] Add proper shutdown of scheduled executor service in TimerService
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4d3a3eeb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4d3a3eeb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4d3a3eeb
Branch: refs/heads/master
Commit: 4d3a3eeb7b2cfc2ead2a388f745a05184c7878ce
Parents: eefcbbd
Author: Till Rohrmann <tr...@apache.org>
Authored: Mon Nov 28 15:25:57 2016 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Dec 23 20:54:26 2016 +0100
----------------------------------------------------------------------
.../taskexecutor/TaskManagerServices.java | 4 ++-
.../TaskManagerServicesConfiguration.java | 20 ++++++++++++--
.../runtime/taskexecutor/slot/TimerService.java | 29 +++++++++++++++++---
.../taskexecutor/TaskExecutorITCase.java | 2 +-
.../taskexecutor/slot/TimerServiceTest.java | 2 +-
5 files changed, 47 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/4d3a3eeb/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
index b57fafe..ae5a383 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
@@ -204,7 +204,9 @@ public class TaskManagerServices {
resourceProfiles.add(new ResourceProfile(1.0, 42L));
}
- final TimerService<AllocationID> timerService = new TimerService<>(new ScheduledThreadPoolExecutor(1));
+ final TimerService<AllocationID> timerService = new TimerService<>(
+ new ScheduledThreadPoolExecutor(1),
+ taskManagerServicesConfiguration.getTimerServiceShutdownTimeout());
final TaskSlotTable taskSlotTable = new TaskSlotTable(resourceProfiles, timerService);
http://git-wip-us.apache.org/repos/asf/flink/blob/4d3a3eeb/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
index 3190a93..2c76372 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
@@ -26,6 +26,7 @@ import org.apache.flink.core.memory.HeapMemorySegment;
import org.apache.flink.core.memory.HybridMemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.core.memory.MemoryType;
+import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.netty.NettyConfig;
import org.apache.flink.runtime.memory.MemoryManager;
@@ -37,6 +38,7 @@ import java.io.File;
import java.net.InetAddress;
import java.net.InetSocketAddress;
+import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
@@ -63,6 +65,8 @@ public class TaskManagerServicesConfiguration {
private final MetricRegistryConfiguration metricRegistryConfiguration;
+ private final long timerServiceShutdownTimeout;
+
public TaskManagerServicesConfiguration(
InetAddress taskManagerAddress,
String[] tmpDirPaths,
@@ -72,7 +76,8 @@ public class TaskManagerServicesConfiguration {
long configuredMemory,
boolean preAllocateMemory,
float memoryFraction,
- MetricRegistryConfiguration metricRegistryConfiguration) {
+ MetricRegistryConfiguration metricRegistryConfiguration,
+ long timerServiceShutdownTimeout) {
this.taskManagerAddress = checkNotNull(taskManagerAddress);
this.tmpDirPaths = checkNotNull(tmpDirPaths);
@@ -85,6 +90,10 @@ public class TaskManagerServicesConfiguration {
this.memoryFraction = memoryFraction;
this.metricRegistryConfiguration = checkNotNull(metricRegistryConfiguration);
+
+ checkArgument(timerServiceShutdownTimeout >= 0L, "The timer " +
+ "service shutdown timeout must be greater or equal to 0.");
+ this.timerServiceShutdownTimeout = timerServiceShutdownTimeout;
}
// --------------------------------------------------------------------------------------------
@@ -128,6 +137,10 @@ public class TaskManagerServicesConfiguration {
return metricRegistryConfiguration;
}
+ public long getTimerServiceShutdownTimeout() {
+ return timerServiceShutdownTimeout;
+ }
+
// --------------------------------------------------------------------------------------------
// Parsing of Flink configuration
// --------------------------------------------------------------------------------------------
@@ -188,7 +201,7 @@ public class TaskManagerServicesConfiguration {
final MetricRegistryConfiguration metricRegistryConfiguration = MetricRegistryConfiguration.fromConfiguration(configuration);
-
+ long timerServiceShutdownTimeout = AkkaUtils.getTimeout(configuration).toMillis();
return new TaskManagerServicesConfiguration(
remoteAddress,
@@ -199,7 +212,8 @@ public class TaskManagerServicesConfiguration {
configuredMemory,
preAllocateMemory,
memoryFraction,
- metricRegistryConfiguration);
+ metricRegistryConfiguration,
+ timerServiceShutdownTimeout);
}
// --------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/4d3a3eeb/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TimerService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TimerService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TimerService.java
index 14c9ab1..8ec9a2e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TimerService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TimerService.java
@@ -19,6 +19,8 @@
package org.apache.flink.runtime.taskexecutor.slot;
import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.Map;
@@ -35,18 +37,28 @@ import java.util.concurrent.TimeUnit;
*/
public class TimerService<K> {
+ private static final Logger LOG = LoggerFactory.getLogger(TimerService.class);
+
/** Executor service for the scheduled timeouts */
private final ScheduledExecutorService scheduledExecutorService;
+ /** Timeout for the shutdown of the service. */
+ private final long shutdownTimeout;
+
/** Map of currently active timeouts */
private final Map<K, Timeout<K>> timeouts;
/** Listener which is notified about occurring timeouts */
private TimeoutListener<K> timeoutListener;
- public TimerService(final ScheduledExecutorService scheduledExecutorService) {
+ public TimerService(
+ final ScheduledExecutorService scheduledExecutorService,
+ final long shutdownTimeout) {
this.scheduledExecutorService = Preconditions.checkNotNull(scheduledExecutorService);
+ Preconditions.checkArgument(shutdownTimeout >= 0L, "The shut down timeout must be larger than or equal than 0.");
+ this.shutdownTimeout = shutdownTimeout;
+
this.timeouts = new HashMap<>(16);
this.timeoutListener = null;
}
@@ -65,6 +77,17 @@ public class TimerService<K> {
timeoutListener = null;
scheduledExecutorService.shutdown();
+
+ try {
+ if(!scheduledExecutorService.awaitTermination(shutdownTimeout, TimeUnit.MILLISECONDS)) {
+ LOG.debug("The scheduled executor service did not properly terminate. Shutting " +
+ "it down now.");
+ scheduledExecutorService.shutdownNow();
+ }
+ } catch (InterruptedException e) {
+ LOG.debug("Could not properly await the termination of the scheduled executor service.", e);
+ scheduledExecutorService.shutdownNow();
+ }
}
/**
@@ -103,9 +126,7 @@ public class TimerService<K> {
*/
protected void unregisterAllTimeouts() {
for (Timeout<K> timeout : timeouts.values()) {
- if (timeout != null) {
- timeout.cancel();
- }
+ timeout.cancel();
}
timeouts.clear();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/4d3a3eeb/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
index 050db44..36fd65b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
@@ -106,7 +106,7 @@ public class TaskExecutorITCase {
final TaskManagerMetricGroup taskManagerMetricGroup = mock(TaskManagerMetricGroup.class);
final BroadcastVariableManager broadcastVariableManager = mock(BroadcastVariableManager.class);
final FileCache fileCache = mock(FileCache.class);
- final TaskSlotTable taskSlotTable = new TaskSlotTable(Arrays.asList(resourceProfile), new TimerService<AllocationID>(scheduledExecutorService));
+ final TaskSlotTable taskSlotTable = new TaskSlotTable(Arrays.asList(resourceProfile), new TimerService<AllocationID>(scheduledExecutorService, 100L));
final JobManagerTable jobManagerTable = new JobManagerTable();
final JobLeaderService jobLeaderService = new JobLeaderService(taskManagerLocation);
http://git-wip-us.apache.org/repos/asf/flink/blob/4d3a3eeb/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TimerServiceTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TimerServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TimerServiceTest.java
index 9dd5f39..cad3624 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TimerServiceTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TimerServiceTest.java
@@ -48,7 +48,7 @@ public class TimerServiceTest {
ScheduledFuture scheduledFuture = mock(ScheduledFuture.class);
when(scheduledExecutorService.schedule(any(Runnable.class), anyLong(), any(TimeUnit.class)))
.thenReturn(scheduledFuture);
- TimerService<AllocationID> timerService = new TimerService<>(scheduledExecutorService);
+ TimerService<AllocationID> timerService = new TimerService<>(scheduledExecutorService, 100L);
TimeoutListener<AllocationID> listener = mock(TimeoutListener.class);
timerService.start(listener);