You are viewing a plain-text version of this content. The hyperlink to the canonical version was not preserved in this copy.
Posted to commits@helix.apache.org by ka...@apache.org on 2015/11/09 06:42:03 UTC

helix git commit: [HELIX-613] Fix thread leaks in TaskStateModel by sharing a single thread pool, used for both task execution and task timeouts, among all TaskStateModels created by the same TaskStateModelFactory.

Repository: helix
Updated Branches:
  refs/heads/helix-0.6.x b72ff29d1 -> 456ddb0c4


[HELIX-613] Fix thread leaks in TaskStateModel by sharing a single thread pool, used for both task execution and task timeouts, among all TaskStateModels created by the same TaskStateModelFactory.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/456ddb0c
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/456ddb0c
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/456ddb0c

Branch: refs/heads/helix-0.6.x
Commit: 456ddb0c4a900ee7cdf081777eff9445378df513
Parents: b72ff29
Author: Lei Xia <lx...@linkedin.com>
Authored: Sat Nov 7 22:43:04 2015 -0800
Committer: Lei Xia <lx...@linkedin.com>
Committed: Sat Nov 7 22:43:04 2015 -0800

----------------------------------------------------------------------
 .../org/apache/helix/task/TaskStateModel.java   | 55 +++++++++++---------
 .../helix/task/TaskStateModelFactory.java       | 33 ++++++++++--
 2 files changed, 60 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/456ddb0c/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java b/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java
index 9ca9ee9..30939fc 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java
@@ -20,12 +20,8 @@ package org.apache.helix.task;
  */
 
 import java.util.Map;
-import java.util.Timer;
 import java.util.TimerTask;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.*;
 
 import org.apache.helix.HelixManager;
 import org.apache.helix.NotificationContext;
@@ -39,20 +35,16 @@ import org.apache.log4j.Logger;
 public class TaskStateModel extends StateModel {
   private static final Logger LOG = Logger.getLogger(TaskStateModel.class);
   private final HelixManager _manager;
-  private final ExecutorService _taskExecutor;
+  private final ScheduledExecutorService _taskExecutor;
   private final Map<String, TaskFactory> _taskFactoryRegistry;
-  private final Timer _timer = new Timer("TaskStateModel time out daemon", true);
+  private ScheduledFuture timeout_task;
   private TaskRunner _taskRunner;
 
-  public TaskStateModel(HelixManager manager, Map<String, TaskFactory> taskFactoryRegistry) {
+  public TaskStateModel(HelixManager manager, Map<String, TaskFactory> taskFactoryRegistry,
+      ScheduledExecutorService taskExecutor) {
     _manager = manager;
     _taskFactoryRegistry = taskFactoryRegistry;
-    _taskExecutor = Executors.newFixedThreadPool(40, new ThreadFactory() {
-      @Override
-      public Thread newThread(Runnable r) {
-        return new Thread(r, "TaskStateModel-thread-pool");
-      }
-    });
+    _taskExecutor = taskExecutor;
   }
 
   public boolean isShutdown() {
@@ -65,8 +57,6 @@ public class TaskStateModel extends StateModel {
 
   public void shutdown() {
     reset();
-    _taskExecutor.shutdown();
-    _timer.cancel();
   }
 
   public boolean awaitTermination(long timeout, TimeUnit unit)
@@ -91,6 +81,8 @@ public class TaskStateModel extends StateModel {
     TaskResult r = _taskRunner.waitTillDone();
     LOG.info(String.format("Task %s completed with result %s.", msg.getPartitionName(), r));
 
+    timeout_task.cancel(false);
+
     return r.getInfo();
   }
 
@@ -108,6 +100,8 @@ public class TaskStateModel extends StateModel {
           "Partition %s received a state transition to %s but the result status code is %s.",
           msg.getPartitionName(), msg.getToState(), r.getStatus()));
     }
+
+    timeout_task.cancel(false);
   }
 
   @Transition(to = "TIMED_OUT", from = "RUNNING")
@@ -125,6 +119,8 @@ public class TaskStateModel extends StateModel {
           msg.getPartitionName(), msg.getToState(), r.getStatus()));
     }
 
+    timeout_task.cancel(false);
+
     return r.getInfo();
   }
 
@@ -143,6 +139,8 @@ public class TaskStateModel extends StateModel {
           msg.getPartitionName(), msg.getToState(), r.getStatus()));
     }
 
+    timeout_task.cancel(false);
+
     return r.getInfo();
   }
 
@@ -153,7 +151,7 @@ public class TaskStateModel extends StateModel {
 
   @Transition(to = "DROPPED", from = "INIT")
   public void onBecomeDroppedFromInit(Message msg, NotificationContext context) {
-    _taskRunner = null;
+    reset();
   }
 
   @Transition(to = "DROPPED", from = "RUNNING")
@@ -168,34 +166,35 @@ public class TaskStateModel extends StateModel {
     TaskResult r = _taskRunner.waitTillDone();
     LOG.info(String.format("Task partition %s returned result %s.", msg.getPartitionName(), r));
     _taskRunner = null;
+    timeout_task.cancel(false);
   }
 
   @Transition(to = "DROPPED", from = "COMPLETED")
   public void onBecomeDroppedFromCompleted(Message msg, NotificationContext context) {
-    _taskRunner = null;
+    reset();
   }
 
   @Transition(to = "DROPPED", from = "STOPPED")
   public void onBecomeDroppedFromStopped(Message msg, NotificationContext context) {
-    _taskRunner = null;
+    reset();
   }
 
   @Transition(to = "DROPPED", from = "TIMED_OUT")
   public void onBecomeDroppedFromTimedOut(Message msg, NotificationContext context) {
-    _taskRunner = null;
+    reset();
   }
 
   @Transition(to = "DROPPED", from = "TASK_ERROR")
   public void onBecomeDroppedFromTaskError(Message msg, NotificationContext context) {
-    _taskRunner = null;
+    reset();
   }
 
   @Transition(to = "INIT", from = "RUNNING")
   public void onBecomeInitFromRunning(Message msg, NotificationContext context) {
     String taskPartition = msg.getPartitionName();
     if (_taskRunner == null) {
-      throw new IllegalStateException(String.format(
-          "Invalid state transition. There is no running task for partition %s.", taskPartition));
+      throw new IllegalStateException(String
+          .format("Invalid state transition. There is no running task for partition %s.", taskPartition));
     }
 
     _taskRunner.cancel();
@@ -228,6 +227,11 @@ public class TaskStateModel extends StateModel {
   public void reset() {
     if (_taskRunner != null) {
       _taskRunner.cancel();
+      _taskRunner = null;
+    }
+    if (timeout_task != null) {
+      timeout_task.cancel(false);
+      timeout_task = null;
     }
   }
 
@@ -276,13 +280,14 @@ public class TaskStateModel extends StateModel {
     _taskRunner.waitTillStarted();
 
     // Set up a timer to cancel the task when its time out expires.
-    _timer.schedule(new TimerTask() {
+
+    timeout_task = _taskExecutor.schedule(new TimerTask() {
       @Override
       public void run() {
         if (_taskRunner != null) {
           _taskRunner.timeout();
         }
       }
-    }, cfg.getTimeoutPerTask());
+    }, cfg.getTimeoutPerTask(), TimeUnit.MILLISECONDS);
   }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/456ddb0c/helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java b/helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java
index b8e91f5..522c9e5 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java
@@ -20,6 +20,9 @@ package org.apache.helix.task;
  */
 
 import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
 
 import org.apache.helix.HelixManager;
 import org.apache.helix.participant.statemachine.StateModelFactory;
@@ -30,14 +33,38 @@ import org.apache.helix.participant.statemachine.StateModelFactory;
 public class TaskStateModelFactory extends StateModelFactory<TaskStateModel> {
   private final HelixManager _manager;
   private final Map<String, TaskFactory> _taskFactoryRegistry;
+  private final ScheduledExecutorService _taskExecutor;
+  private final static int TASK_THREADPOOL_SIZE = 40;
 
   public TaskStateModelFactory(HelixManager manager, Map<String, TaskFactory> taskFactoryRegistry) {
+    this(manager, taskFactoryRegistry,
+        Executors.newScheduledThreadPool(TASK_THREADPOOL_SIZE, new ThreadFactory() {
+          @Override public Thread newThread(Runnable r) {
+            return new Thread(r, "TaskStateModel-thread-pool");
+          }
+        }));
+  }
+
+  public TaskStateModelFactory(HelixManager manager, Map<String, TaskFactory> taskFactoryRegistry,
+      ScheduledExecutorService taskExecutor) {
     _manager = manager;
     _taskFactoryRegistry = taskFactoryRegistry;
+    _taskExecutor = taskExecutor;
+  }
+
+  @Override public TaskStateModel createNewStateModel(String resourceName, String partitionKey) {
+    return new TaskStateModel(_manager, _taskFactoryRegistry, _taskExecutor);
+  }
+
+  public void shutdown() {
+    _taskExecutor.shutdown();
+  }
+
+  public boolean isShutdown() {
+    return _taskExecutor.isShutdown();
   }
 
-  @Override
-  public TaskStateModel createNewStateModel(String resourceName, String partitionKey) {
-    return new TaskStateModel(_manager, _taskFactoryRegistry);
+  public boolean isTerminated() {
+    return _taskExecutor.isTerminated();
   }
 }