You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink is not preserved in this plain-text copy).
Posted to commits@helix.apache.org by ka...@apache.org on 2015/11/09 06:42:03 UTC
helix git commit: [HELIX-613] Fix thread leaks in
TaskStateModel by sharing one thread pool for all tasks and timeout tasks
of the TaskStateModels created by the same TaskStateModelFactory.
Repository: helix
Updated Branches:
refs/heads/helix-0.6.x b72ff29d1 -> 456ddb0c4
[HELIX-613] Fix thread leaks in TaskStateModel by sharing one thread pool for all tasks and timeout tasks of the TaskStateModels created by the same TaskStateModelFactory.
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/456ddb0c
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/456ddb0c
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/456ddb0c
Branch: refs/heads/helix-0.6.x
Commit: 456ddb0c4a900ee7cdf081777eff9445378df513
Parents: b72ff29
Author: Lei Xia <lx...@linkedin.com>
Authored: Sat Nov 7 22:43:04 2015 -0800
Committer: Lei Xia <lx...@linkedin.com>
Committed: Sat Nov 7 22:43:04 2015 -0800
----------------------------------------------------------------------
.../org/apache/helix/task/TaskStateModel.java | 55 +++++++++++---------
.../helix/task/TaskStateModelFactory.java | 33 ++++++++++--
2 files changed, 60 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/456ddb0c/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java b/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java
index 9ca9ee9..30939fc 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java
@@ -20,12 +20,8 @@ package org.apache.helix.task;
*/
import java.util.Map;
-import java.util.Timer;
import java.util.TimerTask;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.*;
import org.apache.helix.HelixManager;
import org.apache.helix.NotificationContext;
@@ -39,20 +35,16 @@ import org.apache.log4j.Logger;
public class TaskStateModel extends StateModel {
private static final Logger LOG = Logger.getLogger(TaskStateModel.class);
private final HelixManager _manager;
- private final ExecutorService _taskExecutor;
+ private final ScheduledExecutorService _taskExecutor;
private final Map<String, TaskFactory> _taskFactoryRegistry;
- private final Timer _timer = new Timer("TaskStateModel time out daemon", true);
+ private ScheduledFuture timeout_task;
private TaskRunner _taskRunner;
- public TaskStateModel(HelixManager manager, Map<String, TaskFactory> taskFactoryRegistry) {
+ public TaskStateModel(HelixManager manager, Map<String, TaskFactory> taskFactoryRegistry,
+ ScheduledExecutorService taskExecutor) {
_manager = manager;
_taskFactoryRegistry = taskFactoryRegistry;
- _taskExecutor = Executors.newFixedThreadPool(40, new ThreadFactory() {
- @Override
- public Thread newThread(Runnable r) {
- return new Thread(r, "TaskStateModel-thread-pool");
- }
- });
+ _taskExecutor = taskExecutor;
}
public boolean isShutdown() {
@@ -65,8 +57,6 @@ public class TaskStateModel extends StateModel {
public void shutdown() {
reset();
- _taskExecutor.shutdown();
- _timer.cancel();
}
public boolean awaitTermination(long timeout, TimeUnit unit)
@@ -91,6 +81,8 @@ public class TaskStateModel extends StateModel {
TaskResult r = _taskRunner.waitTillDone();
LOG.info(String.format("Task %s completed with result %s.", msg.getPartitionName(), r));
+ timeout_task.cancel(false);
+
return r.getInfo();
}
@@ -108,6 +100,8 @@ public class TaskStateModel extends StateModel {
"Partition %s received a state transition to %s but the result status code is %s.",
msg.getPartitionName(), msg.getToState(), r.getStatus()));
}
+
+ timeout_task.cancel(false);
}
@Transition(to = "TIMED_OUT", from = "RUNNING")
@@ -125,6 +119,8 @@ public class TaskStateModel extends StateModel {
msg.getPartitionName(), msg.getToState(), r.getStatus()));
}
+ timeout_task.cancel(false);
+
return r.getInfo();
}
@@ -143,6 +139,8 @@ public class TaskStateModel extends StateModel {
msg.getPartitionName(), msg.getToState(), r.getStatus()));
}
+ timeout_task.cancel(false);
+
return r.getInfo();
}
@@ -153,7 +151,7 @@ public class TaskStateModel extends StateModel {
@Transition(to = "DROPPED", from = "INIT")
public void onBecomeDroppedFromInit(Message msg, NotificationContext context) {
- _taskRunner = null;
+ reset();
}
@Transition(to = "DROPPED", from = "RUNNING")
@@ -168,34 +166,35 @@ public class TaskStateModel extends StateModel {
TaskResult r = _taskRunner.waitTillDone();
LOG.info(String.format("Task partition %s returned result %s.", msg.getPartitionName(), r));
_taskRunner = null;
+ timeout_task.cancel(false);
}
@Transition(to = "DROPPED", from = "COMPLETED")
public void onBecomeDroppedFromCompleted(Message msg, NotificationContext context) {
- _taskRunner = null;
+ reset();
}
@Transition(to = "DROPPED", from = "STOPPED")
public void onBecomeDroppedFromStopped(Message msg, NotificationContext context) {
- _taskRunner = null;
+ reset();
}
@Transition(to = "DROPPED", from = "TIMED_OUT")
public void onBecomeDroppedFromTimedOut(Message msg, NotificationContext context) {
- _taskRunner = null;
+ reset();
}
@Transition(to = "DROPPED", from = "TASK_ERROR")
public void onBecomeDroppedFromTaskError(Message msg, NotificationContext context) {
- _taskRunner = null;
+ reset();
}
@Transition(to = "INIT", from = "RUNNING")
public void onBecomeInitFromRunning(Message msg, NotificationContext context) {
String taskPartition = msg.getPartitionName();
if (_taskRunner == null) {
- throw new IllegalStateException(String.format(
- "Invalid state transition. There is no running task for partition %s.", taskPartition));
+ throw new IllegalStateException(String
+ .format("Invalid state transition. There is no running task for partition %s.", taskPartition));
}
_taskRunner.cancel();
@@ -228,6 +227,11 @@ public class TaskStateModel extends StateModel {
public void reset() {
if (_taskRunner != null) {
_taskRunner.cancel();
+ _taskRunner = null;
+ }
+ if (timeout_task != null) {
+ timeout_task.cancel(false);
+ timeout_task = null;
}
}
@@ -276,13 +280,14 @@ public class TaskStateModel extends StateModel {
_taskRunner.waitTillStarted();
// Set up a timer to cancel the task when its time out expires.
- _timer.schedule(new TimerTask() {
+
+ timeout_task = _taskExecutor.schedule(new TimerTask() {
@Override
public void run() {
if (_taskRunner != null) {
_taskRunner.timeout();
}
}
- }, cfg.getTimeoutPerTask());
+ }, cfg.getTimeoutPerTask(), TimeUnit.MILLISECONDS);
}
}
http://git-wip-us.apache.org/repos/asf/helix/blob/456ddb0c/helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java b/helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java
index b8e91f5..522c9e5 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java
@@ -20,6 +20,9 @@ package org.apache.helix.task;
*/
import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
import org.apache.helix.HelixManager;
import org.apache.helix.participant.statemachine.StateModelFactory;
@@ -30,14 +33,38 @@ import org.apache.helix.participant.statemachine.StateModelFactory;
public class TaskStateModelFactory extends StateModelFactory<TaskStateModel> {
private final HelixManager _manager;
private final Map<String, TaskFactory> _taskFactoryRegistry;
+ private final ScheduledExecutorService _taskExecutor;
+ private final static int TASK_THREADPOOL_SIZE = 40;
public TaskStateModelFactory(HelixManager manager, Map<String, TaskFactory> taskFactoryRegistry) {
+ this(manager, taskFactoryRegistry,
+ Executors.newScheduledThreadPool(TASK_THREADPOOL_SIZE, new ThreadFactory() {
+ @Override public Thread newThread(Runnable r) {
+ return new Thread(r, "TaskStateModel-thread-pool");
+ }
+ }));
+ }
+
+ public TaskStateModelFactory(HelixManager manager, Map<String, TaskFactory> taskFactoryRegistry,
+ ScheduledExecutorService taskExecutor) {
_manager = manager;
_taskFactoryRegistry = taskFactoryRegistry;
+ _taskExecutor = taskExecutor;
+ }
+
+ @Override public TaskStateModel createNewStateModel(String resourceName, String partitionKey) {
+ return new TaskStateModel(_manager, _taskFactoryRegistry, _taskExecutor);
+ }
+
+ public void shutdown() {
+ _taskExecutor.shutdown();
+ }
+
+ public boolean isShutdown() {
+ return _taskExecutor.isShutdown();
}
- @Override
- public TaskStateModel createNewStateModel(String resourceName, String partitionKey) {
- return new TaskStateModel(_manager, _taskFactoryRegistry);
+ public boolean isTerminated() {
+ return _taskExecutor.isTerminated();
}
}