You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this rendering.
Posted to commits@helix.apache.org by jx...@apache.org on 2021/08/16 18:24:04 UTC

[helix] branch master updated: Add TF Available Threads Metrics (#1834)

This is an automated email from the ASF dual-hosted git repository.

jxue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new 698bee8  Add TF Available Threads Metrics (#1834)
698bee8 is described below

commit 698bee866d99cbb141bbbdfc581e2a98e2128ddf
Author: Neal Sun <ne...@gmail.com>
AuthorDate: Mon Aug 16 11:22:49 2021 -0700

    Add TF Available Threads Metrics (#1834)
    
    Add metrics about Task Framework available threads in the cluster per job type.
---
 .../stages/task/TaskSchedulingStage.java           | 10 +++-
 .../monitoring/mbeans/ClusterStatusMonitor.java    | 12 +++++
 .../apache/helix/monitoring/mbeans/JobMonitor.java | 11 ++++
 .../helix/task/AssignableInstanceManager.java      |  8 +++
 .../mbeans/TestClusterStatusMonitor.java           | 63 ++++++++++++++++++++++
 5 files changed, 102 insertions(+), 2 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/task/TaskSchedulingStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/task/TaskSchedulingStage.java
index 9746bf9..dbedf7b 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/task/TaskSchedulingStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/task/TaskSchedulingStage.java
@@ -81,13 +81,19 @@ public class TaskSchedulingStage extends AbstractBaseStage {
     // Reset current INIT/RUNNING tasks on participants for throttling
     cache.resetActiveTaskCount(currentStateOutput);
 
+    ClusterStatusMonitor clusterStatusMonitor =
+        event.getAttribute(AttributeName.clusterStatusMonitor.name());
     buildQuotaBasedWorkflowPQsAndInitDispatchers(cache,
-        (HelixManager) event.getAttribute(AttributeName.helixmanager.name()),
-        (ClusterStatusMonitor) event.getAttribute(AttributeName.clusterStatusMonitor.name()));
+        (HelixManager) event.getAttribute(AttributeName.helixmanager.name()), clusterStatusMonitor);
 
     final BestPossibleStateOutput bestPossibleStateOutput =
         compute(event, resourceMap, currentStateOutput);
     event.addAttribute(AttributeName.BEST_POSSIBLE_STATE.name(), bestPossibleStateOutput);
+
+    if (clusterStatusMonitor != null) {
+      clusterStatusMonitor.updateAvailableThreadsPerJob(cache.getAssignableInstanceManager()
+          .getGlobalCapacityMap());
+    }
   }
 
   private BestPossibleStateOutput compute(ClusterEvent event, Map<String, Resource> resourceMap,
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
index a9c1811..621710e 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
@@ -757,6 +757,18 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
   }
 
   /**
+   * For each job type, report its total available threads across all instances to the
+   * corresponding JobMonitor, which getJobMonitor creates on demand if it does not exist yet.
+   * @param threadCapacityMap job type -> available thread count; values must be non-null
+   */
+  public void updateAvailableThreadsPerJob(Map<String, Integer> threadCapacityMap) {
+    for (Map.Entry<String, Integer> entry : threadCapacityMap.entrySet()) {
+      JobMonitor jobMonitor = getJobMonitor(entry.getKey());
+      jobMonitor.updateAvailableThreadGauge(entry.getValue().longValue());
+    }
+  }
+
+  /**
    * TODO: Separate Workflow/Job Monitors from ClusterStatusMonitor because ClusterStatusMonitor is
    * getting too big.
    * Returns the appropriate JobMonitor for the given type. If it does not exist, create one and
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/JobMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/JobMonitor.java
index 241199e..2ab407a 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/JobMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/JobMonitor.java
@@ -57,6 +57,7 @@ public class JobMonitor extends DynamicMBeanProvider {
   private SimpleDynamicMetric<Long> _existingJobGauge;
   private SimpleDynamicMetric<Long> _queuedJobGauge;
   private SimpleDynamicMetric<Long> _runningJobGauge;
+  private SimpleDynamicMetric<Long> _availableThreadGauge;
   @Deprecated // To be removed (replaced by jobLatencyGauge Histogram)
   private SimpleDynamicMetric<Long> _maximumJobLatencyGauge;
   @Deprecated // To be removed (replaced by jobLatencyGauge Histogram)
@@ -81,6 +82,7 @@ public class JobMonitor extends DynamicMBeanProvider {
     _existingJobGauge = new SimpleDynamicMetric("ExistingJobGauge", 0L);
     _queuedJobGauge = new SimpleDynamicMetric("QueuedJobGauge", 0L);
     _runningJobGauge = new SimpleDynamicMetric("RunningJobGauge", 0L);
+    _availableThreadGauge = new SimpleDynamicMetric("AvailableThreadGauge", 0L);
     _maximumJobLatencyGauge = new SimpleDynamicMetric("MaximumJobLatencyGauge", 0L);
     _jobLatencyCount = new SimpleDynamicMetric("JobLatencyCount", 0L);
 
@@ -159,6 +161,14 @@ public class JobMonitor extends DynamicMBeanProvider {
   }
 
   /**
+   * Set AvailableThreadGauge to the number of threads currently available for this job type.
+   * @param availableThreads available thread count to publish through the gauge
+   */
+  public void updateAvailableThreadGauge(long availableThreads) {
+    this._availableThreadGauge.updateValue(availableThreads);
+  }
+
+  /**
    * Update SubmissionToProcessDelay to its corresponding HistogramDynamicMetric.
    * @param delay
    */
@@ -196,6 +206,7 @@ public class JobMonitor extends DynamicMBeanProvider {
     attributeList.add(_existingJobGauge);
     attributeList.add(_queuedJobGauge);
     attributeList.add(_runningJobGauge);
+    attributeList.add(_availableThreadGauge);
     attributeList.add(_maximumJobLatencyGauge);
     attributeList.add(_jobLatencyCount);
     attributeList.add(_jobLatencyGauge);
diff --git a/helix-core/src/main/java/org/apache/helix/task/AssignableInstanceManager.java b/helix-core/src/main/java/org/apache/helix/task/AssignableInstanceManager.java
index 88a12e9..d8b4820 100644
--- a/helix-core/src/main/java/org/apache/helix/task/AssignableInstanceManager.java
+++ b/helix-core/src/main/java/org/apache/helix/task/AssignableInstanceManager.java
@@ -461,6 +461,14 @@ public class AssignableInstanceManager {
   }
 
   /**
+   * Returns a read-only live view of: jobType -> available threads across all instances.
+   * @return unmodifiable view of globalThreadBasedQuotaMap; later updates to it stay visible
+   */
+  public Map<String, Integer> getGlobalCapacityMap() {
+    return Collections.unmodifiableMap(this._globalThreadBasedQuotaMap);
+  }
+
+  /**
    * Check remained global quota of certain quota type for skipping redundant computation
    * @param quotaType
    * @return
diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestClusterStatusMonitor.java b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestClusterStatusMonitor.java
index 9958b68..b083786 100644
--- a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestClusterStatusMonitor.java
+++ b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestClusterStatusMonitor.java
@@ -44,6 +44,11 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
 import org.apache.helix.TestHelper;
+import org.apache.helix.common.caches.TaskDataCache;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.task.AssignableInstanceManager;
+import org.apache.helix.task.assigner.TaskAssignResult;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.helix.controller.stages.BestPossibleStateOutput;
 import org.apache.helix.model.BuiltInStateModelDefinitions;
@@ -56,10 +61,13 @@ import org.apache.helix.model.Resource;
 import org.apache.helix.model.StateModelDefinition;
 import org.apache.helix.tools.DefaultIdealStateCalculator;
 import org.apache.helix.tools.StateModelConfigGenerator;
+import org.mockito.Mockito;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 import org.testng.collections.Sets;
 
+import static org.mockito.Mockito.when;
+
 
 public class TestClusterStatusMonitor {
   private static final MBeanServerConnection _server = ManagementFactory.getPlatformMBeanServer();
@@ -494,6 +502,61 @@ public class TestClusterStatusMonitor {
     }
   }
 
+  @Test
+  public void testRecordAvailableThreadsPerType() throws Exception {
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+    // Verifies that AvailableThreadGauge mirrors the per-job-type global capacity map.
+    ClusterStatusMonitor monitor = new ClusterStatusMonitor(clusterName);
+    monitor.active();
+    ObjectName clusterMonitorObjName = monitor.getObjectName(monitor.clusterBeanName());
+    Assert.assertTrue(_server.isRegistered(clusterMonitorObjName));
+    // Three live instances, each with a name-only InstanceConfig.
+    Map<String, InstanceConfig> instanceConfigMap = new HashMap<>();
+    Map<String, LiveInstance> liveInstanceMap = new HashMap<>();
+    for (int i = 0; i < 3; i++) {
+      String instanceName = "localhost_" + (12918 + i);
+      LiveInstance liveInstance = new LiveInstance(instanceName);
+      InstanceConfig instanceConfig = new InstanceConfig(instanceName);
+      liveInstanceMap.put(instanceName, liveInstance);
+      instanceConfigMap.put(instanceName, instanceConfig);
+    }
+    // Quota ratios type1:type2 = 30:10; the gauges below expect totals of 90 and 30.
+    ClusterConfig clusterConfig = new ClusterConfig(clusterName);
+    clusterConfig.resetTaskQuotaRatioMap();
+    clusterConfig.setTaskQuotaRatio("type1", 30);
+    clusterConfig.setTaskQuotaRatio("type2", 10);
+    // Mocked task cache that reports no job configs.
+    TaskDataCache taskDataCache = Mockito.mock(TaskDataCache.class);
+    when(taskDataCache.getJobConfigMap()).thenReturn(Collections.emptyMap());
+
+    AssignableInstanceManager assignableInstanceManager = new AssignableInstanceManager();
+    assignableInstanceManager.buildAssignableInstances(clusterConfig, taskDataCache,
+        liveInstanceMap, instanceConfigMap);
+
+    monitor.updateAvailableThreadsPerJob(assignableInstanceManager.getGlobalCapacityMap());
+    ObjectName type1ObjectName = monitor.getObjectName(monitor.getJobBeanName("type1"));
+    ObjectName type2ObjectName = monitor.getObjectName(monitor.getJobBeanName("type2"));
+    Assert.assertTrue(_server.isRegistered(type1ObjectName));
+    Assert.assertEquals(_server.getAttribute(type1ObjectName, "AvailableThreadGauge"), 90L);
+    Assert.assertTrue(_server.isRegistered(type2ObjectName));
+    Assert.assertEquals(_server.getAttribute(type2ObjectName, "AvailableThreadGauge"), 30L);
+    // Consume two type1 threads and one type2 thread, then expect the gauges to drop.
+    TaskAssignResult taskAssignResult = Mockito.mock(TaskAssignResult.class);
+    when(taskAssignResult.getQuotaType()).thenReturn("type1");
+    // An unknown instance skips the per-instance assignment but still decrements global counts
+    assignableInstanceManager.assign("UnknownInstance", taskAssignResult);
+    // Second type1 assignment
+    assignableInstanceManager.assign("UnknownInstance", taskAssignResult);
+    when(taskAssignResult.getQuotaType()).thenReturn("type2");
+    assignableInstanceManager.assign("UnknownInstance", taskAssignResult);
+
+    monitor.updateAvailableThreadsPerJob(assignableInstanceManager.getGlobalCapacityMap());
+    Assert.assertEquals(_server.getAttribute(type1ObjectName, "AvailableThreadGauge"), 88L);
+    Assert.assertEquals(_server.getAttribute(type2ObjectName, "AvailableThreadGauge"), 29L);
+  }
+
   private void verifyCapacityMetrics(ClusterStatusMonitor monitor, Map<String, Double> maxUsageMap,
       Map<String, Map<String, Integer>> instanceCapacityMap)
       throws MalformedObjectNameException, IOException, AttributeNotFoundException, MBeanException,