You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2021/08/16 18:24:04 UTC
[helix] branch master updated: Add TF Available Threads Metrics
(#1834)
This is an automated email from the ASF dual-hosted git repository.
jxue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/master by this push:
new 698bee8 Add TF Available Threads Metrics (#1834)
698bee8 is described below
commit 698bee866d99cbb141bbbdfc581e2a98e2128ddf
Author: Neal Sun <ne...@gmail.com>
AuthorDate: Mon Aug 16 11:22:49 2021 -0700
Add TF Available Threads Metrics (#1834)
Add metrics about Task Framework available threads in the cluster per job type.
---
.../stages/task/TaskSchedulingStage.java | 10 +++-
.../monitoring/mbeans/ClusterStatusMonitor.java | 12 +++++
.../apache/helix/monitoring/mbeans/JobMonitor.java | 11 ++++
.../helix/task/AssignableInstanceManager.java | 8 +++
.../mbeans/TestClusterStatusMonitor.java | 63 ++++++++++++++++++++++
5 files changed, 102 insertions(+), 2 deletions(-)
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/task/TaskSchedulingStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/task/TaskSchedulingStage.java
index 9746bf9..dbedf7b 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/task/TaskSchedulingStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/task/TaskSchedulingStage.java
@@ -81,13 +81,19 @@ public class TaskSchedulingStage extends AbstractBaseStage {
// Reset current INIT/RUNNING tasks on participants for throttling
cache.resetActiveTaskCount(currentStateOutput);
+ ClusterStatusMonitor clusterStatusMonitor =
+ event.getAttribute(AttributeName.clusterStatusMonitor.name());
buildQuotaBasedWorkflowPQsAndInitDispatchers(cache,
- (HelixManager) event.getAttribute(AttributeName.helixmanager.name()),
- (ClusterStatusMonitor) event.getAttribute(AttributeName.clusterStatusMonitor.name()));
+ (HelixManager) event.getAttribute(AttributeName.helixmanager.name()), clusterStatusMonitor);
final BestPossibleStateOutput bestPossibleStateOutput =
compute(event, resourceMap, currentStateOutput);
event.addAttribute(AttributeName.BEST_POSSIBLE_STATE.name(), bestPossibleStateOutput);
+
+ if (clusterStatusMonitor != null) {
+ clusterStatusMonitor.updateAvailableThreadsPerJob(cache.getAssignableInstanceManager()
+ .getGlobalCapacityMap());
+ }
}
private BestPossibleStateOutput compute(ClusterEvent event, Map<String, Resource> resourceMap,
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
index a9c1811..621710e 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
@@ -757,6 +757,18 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
}
/**
+ * For each job type, report its total number of available threads across all instances to the
+ * corresponding JobMonitor; a JobMonitor is created if one does not exist yet for the type.
+ * @param threadCapacityMap job type -> total available threads for that type across all instances
+ */
+ public void updateAvailableThreadsPerJob(Map<String, Integer> threadCapacityMap) {
+ // Iterate entries rather than keySet() + get() to avoid a second lookup per job type
+ for (Map.Entry<String, Integer> entry : threadCapacityMap.entrySet()) {
+ getJobMonitor(entry.getKey()).updateAvailableThreadGauge(entry.getValue().longValue());
+ }
+ }
+
+ /**
* TODO: Separate Workflow/Job Monitors from ClusterStatusMonitor because ClusterStatusMonitor is
* getting too big.
* Returns the appropriate JobMonitor for the given type. If it does not exist, create one and
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/JobMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/JobMonitor.java
index 241199e..2ab407a 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/JobMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/JobMonitor.java
@@ -57,6 +57,7 @@ public class JobMonitor extends DynamicMBeanProvider {
private SimpleDynamicMetric<Long> _existingJobGauge;
private SimpleDynamicMetric<Long> _queuedJobGauge;
private SimpleDynamicMetric<Long> _runningJobGauge;
+ private SimpleDynamicMetric<Long> _availableThreadGauge;
@Deprecated // To be removed (replaced by jobLatencyGauge Histogram)
private SimpleDynamicMetric<Long> _maximumJobLatencyGauge;
@Deprecated // To be removed (replaced by jobLatencyGauge Histogram)
@@ -81,6 +82,7 @@ public class JobMonitor extends DynamicMBeanProvider {
_existingJobGauge = new SimpleDynamicMetric("ExistingJobGauge", 0L);
_queuedJobGauge = new SimpleDynamicMetric("QueuedJobGauge", 0L);
_runningJobGauge = new SimpleDynamicMetric("RunningJobGauge", 0L);
+ _availableThreadGauge = new SimpleDynamicMetric("AvailableThreadGauge", 0L);
_maximumJobLatencyGauge = new SimpleDynamicMetric("MaximumJobLatencyGauge", 0L);
_jobLatencyCount = new SimpleDynamicMetric("JobLatencyCount", 0L);
@@ -159,6 +161,14 @@ public class JobMonitor extends DynamicMBeanProvider {
}
/**
+ * Set the AvailableThreadGauge to the given number of threads available for this job type.
+ * @param availableThreads thread count to report; overwrites the previously reported value
+ */
+ public void updateAvailableThreadGauge(long availableThreads) {
+ this._availableThreadGauge.updateValue(availableThreads);
+ }
+
+ /**
* Update SubmissionToProcessDelay to its corresponding HistogramDynamicMetric.
* @param delay
*/
@@ -196,6 +206,7 @@ public class JobMonitor extends DynamicMBeanProvider {
attributeList.add(_existingJobGauge);
attributeList.add(_queuedJobGauge);
attributeList.add(_runningJobGauge);
+ attributeList.add(_availableThreadGauge);
attributeList.add(_maximumJobLatencyGauge);
attributeList.add(_jobLatencyCount);
attributeList.add(_jobLatencyGauge);
diff --git a/helix-core/src/main/java/org/apache/helix/task/AssignableInstanceManager.java b/helix-core/src/main/java/org/apache/helix/task/AssignableInstanceManager.java
index 88a12e9..d8b4820 100644
--- a/helix-core/src/main/java/org/apache/helix/task/AssignableInstanceManager.java
+++ b/helix-core/src/main/java/org/apache/helix/task/AssignableInstanceManager.java
@@ -461,6 +461,14 @@ public class AssignableInstanceManager {
}
/**
+ * Returns a read-only view of: jobType -> available threads summed over all instances.
+ * @return unmodifiable view backed by the global thread-based quota map (not a snapshot)
+ */
+ public Map<String, Integer> getGlobalCapacityMap() {
+ return Collections.unmodifiableMap(this._globalThreadBasedQuotaMap);
+ }
+
+ /**
* Check remained global quota of certain quota type for skipping redundant computation
* @param quotaType
* @return
diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestClusterStatusMonitor.java b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestClusterStatusMonitor.java
index 9958b68..b083786 100644
--- a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestClusterStatusMonitor.java
+++ b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestClusterStatusMonitor.java
@@ -44,6 +44,11 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import org.apache.helix.TestHelper;
+import org.apache.helix.common.caches.TaskDataCache;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.task.AssignableInstanceManager;
+import org.apache.helix.task.assigner.TaskAssignResult;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.helix.controller.stages.BestPossibleStateOutput;
import org.apache.helix.model.BuiltInStateModelDefinitions;
@@ -56,10 +61,13 @@ import org.apache.helix.model.Resource;
import org.apache.helix.model.StateModelDefinition;
import org.apache.helix.tools.DefaultIdealStateCalculator;
import org.apache.helix.tools.StateModelConfigGenerator;
+import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.Test;
import org.testng.collections.Sets;
+import static org.mockito.Mockito.when;
+
public class TestClusterStatusMonitor {
private static final MBeanServerConnection _server = ManagementFactory.getPlatformMBeanServer();
@@ -494,6 +502,61 @@ public class TestClusterStatusMonitor {
}
}
+ @Test
+ public void testRecordAvailableThreadsPerType() throws Exception {
+ String className = TestHelper.getTestClassName();
+ String methodName = TestHelper.getTestMethodName();
+ String clusterName = className + "_" + methodName;
+
+ ClusterStatusMonitor monitor = new ClusterStatusMonitor(clusterName);
+ monitor.active();
+ ObjectName clusterMonitorObjName = monitor.getObjectName(monitor.clusterBeanName());
+ Assert.assertTrue(_server.isRegistered(clusterMonitorObjName));
+
+ Map<String, InstanceConfig> instanceConfigMap = new HashMap<>();
+ Map<String, LiveInstance> liveInstanceMap = new HashMap<>();
+ for (int i = 0; i < 3; i++) {
+ String instanceName = "localhost_" + (12918 + i);
+ LiveInstance liveInstance = new LiveInstance(instanceName);
+ InstanceConfig instanceConfig = new InstanceConfig(instanceName);
+ liveInstanceMap.put(instanceName, liveInstance);
+ instanceConfigMap.put(instanceName, instanceConfig);
+ }
+
+ ClusterConfig clusterConfig = new ClusterConfig(clusterName);
+ clusterConfig.resetTaskQuotaRatioMap();
+ clusterConfig.setTaskQuotaRatio("type1", 30);
+ clusterConfig.setTaskQuotaRatio("type2", 10);
+
+ TaskDataCache taskDataCache = Mockito.mock(TaskDataCache.class);
+ when(taskDataCache.getJobConfigMap()).thenReturn(Collections.emptyMap());
+
+ AssignableInstanceManager assignableInstanceManager = new AssignableInstanceManager();
+ assignableInstanceManager.buildAssignableInstances(clusterConfig, taskDataCache,
+ liveInstanceMap, instanceConfigMap);
+
+ monitor.updateAvailableThreadsPerJob(assignableInstanceManager.getGlobalCapacityMap());
+ ObjectName type1ObjectName = monitor.getObjectName(monitor.getJobBeanName("type1"));
+ ObjectName type2ObjectName = monitor.getObjectName(monitor.getJobBeanName("type2"));
+ Assert.assertTrue(_server.isRegistered(type1ObjectName));
+ Assert.assertEquals(_server.getAttribute(type1ObjectName, "AvailableThreadGauge"), 90L);
+ Assert.assertTrue(_server.isRegistered(type2ObjectName));
+ Assert.assertEquals(_server.getAttribute(type2ObjectName, "AvailableThreadGauge"), 30L);
+
+ TaskAssignResult taskAssignResult = Mockito.mock(TaskAssignResult.class);
+ when(taskAssignResult.getQuotaType()).thenReturn("type1");
+ // A non-existing instance bypasses the actual assignment but still decreases thread counts
+ assignableInstanceManager.assign("UnknownInstance", taskAssignResult);
+ // Assign type1 a second time: type1 should drop by 2 (90 -> 88), type2 by 1 (30 -> 29)
+ assignableInstanceManager.assign("UnknownInstance", taskAssignResult);
+ when(taskAssignResult.getQuotaType()).thenReturn("type2");
+ assignableInstanceManager.assign("UnknownInstance", taskAssignResult);
+ monitor.updateAvailableThreadsPerJob(assignableInstanceManager.getGlobalCapacityMap());
+ Assert.assertEquals(_server.getAttribute(type1ObjectName, "AvailableThreadGauge"), 88L);
+ Assert.assertEquals(_server.getAttribute(type2ObjectName, "AvailableThreadGauge"), 29L);
+ monitor.reset(); // Unregister this test's MBeans so they don't linger in the platform server
+ }
+
private void verifyCapacityMetrics(ClusterStatusMonitor monitor, Map<String, Double> maxUsageMap,
Map<String, Map<String, Integer>> instanceCapacityMap)
throws MalformedObjectNameException, IOException, AttributeNotFoundException, MBeanException,