You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ji...@apache.org on 2020/07/07 18:06:04 UTC

[helix] branch master updated: Shutdown the TaskStateModelFactory threads created in the tests. (#1140)

This is an automated email from the ASF dual-hosted git repository.

jiajunwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new 3fc8708  Shutdown the TaskStateModelFactory threads created in the tests. (#1140)
3fc8708 is described below

commit 3fc870815219d910c6e357b6b4ba843b2244c776
Author: Jiajun Wang <jj...@linkedin.com>
AuthorDate: Tue Jul 7 11:05:56 2020 -0700

    Shutdown the TaskStateModelFactory threads created in the tests. (#1140)
    
    The TaskStateModelFactory initialization creates a thread pool. The application code is expected to shut down the thread pool and its threads when the participant instance is shutting down. In most cases, this means the JVM is about to shut down as well, so this operation is not strictly required. However, in the test cases, these thread pools leak thousands of threads.
    This PR adds cleanup logic to shut down the thread pools that are created for the participant instances. Note that threads still leak when the participants are created separately instead of using the general methods.
---
 .../apache/helix/task/TaskStateModelFactory.java   | 10 +++++++++
 .../helix/task/TaskSynchronizedTestBase.java       | 26 +++++++++++++++++-----
 2 files changed, 31 insertions(+), 5 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java b/helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java
index f30dd9f..6d7c80e 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java
@@ -27,6 +27,7 @@ import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.atomic.AtomicInteger;
 import javax.management.JMException;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.helix.HelixManager;
 import org.apache.helix.monitoring.mbeans.ThreadPoolExecutorMonitor;
 import org.apache.helix.participant.statemachine.StateModelFactory;
@@ -95,6 +96,15 @@ public class TaskStateModelFactory extends StateModelFactory<TaskStateModel> {
     }
   }
 
+  @VisibleForTesting
+  void shutdownNow() {
+    _taskExecutor.shutdownNow();
+    _timerTaskExecutor.shutdownNow();
+    if (_monitor != null) {
+      _monitor.unregister();
+    }
+  }
+
   public boolean isShutdown() {
     return _taskExecutor.isShutdown();
   }
diff --git a/helix-core/src/test/java/org/apache/helix/task/TaskSynchronizedTestBase.java b/helix-core/src/test/java/org/apache/helix/task/TaskSynchronizedTestBase.java
index f90dd34..c281b16 100644
--- a/helix-core/src/test/java/org/apache/helix/task/TaskSynchronizedTestBase.java
+++ b/helix-core/src/test/java/org/apache/helix/task/TaskSynchronizedTestBase.java
@@ -35,6 +35,7 @@ import org.apache.helix.integration.task.MockTask;
 import org.apache.helix.integration.task.WorkflowGenerator;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.participant.StateMachineEngine;
+import org.apache.helix.participant.statemachine.StateModelFactory;
 import org.apache.helix.tools.ClusterSetup;
 import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
 import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier;
@@ -161,6 +162,10 @@ public class TaskSynchronizedTestBase extends ZkTestBase {
   }
 
   protected void startParticipant(String zkAddr, int i) {
+    if (_participants[i] != null) {
+      stopParticipant(i);
+    }
+
     Map<String, TaskFactory> taskFactoryReg = new HashMap<>();
     taskFactoryReg.put(MockTask.TASK_COMMAND, MockTask::new);
     String instanceName = PARTICIPANT_PREFIX + "_" + (_startPort + i);
@@ -168,7 +173,7 @@ public class TaskSynchronizedTestBase extends ZkTestBase {
 
     // Register a Task state model factory.
     StateMachineEngine stateMachine = _participants[i].getStateMachineEngine();
-    stateMachine.registerStateModelFactory("Task",
+    stateMachine.registerStateModelFactory(TaskConstants.STATE_MODEL_NAME,
         new TaskStateModelFactory(_participants[i], taskFactoryReg));
     _participants[i].syncStart();
   }
@@ -181,12 +186,23 @@ public class TaskSynchronizedTestBase extends ZkTestBase {
 
   protected void stopParticipant(int i) {
     if (_participants.length <= i) {
-      throw new HelixException(
-          String.format("Can't stop participant %s, only %s participants" + "were set up.", i,
+      throw new HelixException(String
+          .format("Can't stop participant %s, only %s participants" + "were set up.", i,
               _participants.length));
     }
-    if (_participants[i] != null && _participants[i].isConnected()) {
-      _participants[i].syncStop();
+    if (_participants[i] != null) {
+      if (_participants[i].isConnected()) {
+        _participants[i].syncStop();
+      }
+      // Shutdown the state model factories to close all threads.
+      StateMachineEngine stateMachine = _participants[i].getStateMachineEngine();
+      if (stateMachine != null) {
+        StateModelFactory stateModelFactory =
+            stateMachine.getStateModelFactory(TaskConstants.STATE_MODEL_NAME);
+        if (stateModelFactory != null && stateModelFactory instanceof TaskStateModelFactory) {
+          ((TaskStateModelFactory) stateModelFactory).shutdownNow();
+        }
+      }
     }
   }