You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink was not preserved in this plain-text copy).
Posted to commits@airavata.apache.org by is...@apache.org on 2022/11/21 19:11:59 UTC
[airavata] branch develop updated: Fix monitoring issues
This is an automated email from the ASF dual-hosted git repository.
isjarana pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/airavata.git
The following commit(s) were added to refs/heads/develop by this push:
new 3ec0e93da6 Fix monitoring issues
new 229e943e0a Merge pull request #339 from isururanawaka/metaschedular
3ec0e93da6 is described below
commit 3ec0e93da6ae3f512ea18a203b2ae209919154cd
Author: Isuru Ranawaka <ir...@gmail.com>
AuthorDate: Mon Nov 21 14:11:08 2022 -0500
Fix monitoring issues
---
.../org/apache/airavata/agents/api/AgentUtils.java | 7 +-
.../airavata/metascheduler/core/utils/Utils.java | 8 +-
.../resource/monitoring/job/MonitoringJob.java | 190 +++++++++++----------
.../monitoring/job/output/OutputParser.java | 4 +-
.../monitoring/job/output/OutputParserImpl.java | 31 +++-
.../resource/monitoring/utils/Constants.java | 1 +
6 files changed, 141 insertions(+), 100 deletions(-)
diff --git a/modules/airavata-helix/agent-api/src/main/java/org/apache/airavata/agents/api/AgentUtils.java b/modules/airavata-helix/agent-api/src/main/java/org/apache/airavata/agents/api/AgentUtils.java
index fa6c4481b9..cb8b716f69 100644
--- a/modules/airavata-helix/agent-api/src/main/java/org/apache/airavata/agents/api/AgentUtils.java
+++ b/modules/airavata-helix/agent-api/src/main/java/org/apache/airavata/agents/api/AgentUtils.java
@@ -16,16 +16,19 @@ public class AgentUtils {
try {
final int serverPort = Integer.parseInt(ServerSettings.getRegistryServerPort());
final String serverHost = ServerSettings.getRegistryServerHost();
- return RegistryServiceClientFactory.createRegistryClient(serverHost, serverPort);
+
+ return RegistryServiceClientFactory.createRegistryClient(serverHost,
+ serverPort);
} catch (RegistryServiceException | ApplicationSettingsException e) {
throw new AgentException("Unable to create registry client...", e);
}
+
}
public static CredentialStoreService.Client getCredentialClient() throws AgentException {
try {
final int serverPort = Integer.parseInt(ServerSettings.getCredentialStoreServerPort());
- final String serverHost =ServerSettings.getCredentialStoreServerHost();
+ final String serverHost = ServerSettings.getCredentialStoreServerHost();
return CredentialStoreClientFactory.createAiravataCSClient(serverHost, serverPort);
} catch (CredentialStoreException | ApplicationSettingsException e) {
throw new AgentException("Unable to create credential client...", e);
diff --git a/modules/airavata-metascheduler/metascheduler-core/src/main/java/org/apache/airavata/metascheduler/core/utils/Utils.java b/modules/airavata-metascheduler/metascheduler-core/src/main/java/org/apache/airavata/metascheduler/core/utils/Utils.java
index 493aa482e6..0e9c787b86 100644
--- a/modules/airavata-metascheduler/metascheduler-core/src/main/java/org/apache/airavata/metascheduler/core/utils/Utils.java
+++ b/modules/airavata-metascheduler/metascheduler-core/src/main/java/org/apache/airavata/metascheduler/core/utils/Utils.java
@@ -25,13 +25,13 @@ public class Utils {
return registryClientPool;
}
try {
- final int serverPort = Integer.parseInt(ServerSettings.getRegistryServerPort());
- final String serverHost = ServerSettings.getRegistryServerHost();
+// final int serverPort = Integer.parseInt(ServerSettings.getRegistryServerPort());
+// final String serverHost = ServerSettings.getRegistryServerHost();
registryClientPool = new ThriftClientPool<>(
tProtocol -> new RegistryService.Client(tProtocol),
Utils.<RegistryService.Client>createGenericObjectPoolConfig(),
- serverHost,
- serverPort);
+ "149.165.153.112",
+ 8970);
return registryClientPool;
} catch (Exception e) {
throw new RuntimeException("Unable to create registry client...", e);
diff --git a/modules/computer-resource-monitoring-service/src/main/java/org/apache/airavata/compute/resource/monitoring/job/MonitoringJob.java b/modules/computer-resource-monitoring-service/src/main/java/org/apache/airavata/compute/resource/monitoring/job/MonitoringJob.java
index 235fb2263c..463594c129 100644
--- a/modules/computer-resource-monitoring-service/src/main/java/org/apache/airavata/compute/resource/monitoring/job/MonitoringJob.java
+++ b/modules/computer-resource-monitoring-service/src/main/java/org/apache/airavata/compute/resource/monitoring/job/MonitoringJob.java
@@ -9,7 +9,7 @@ import org.apache.airavata.helix.core.support.adaptor.AdaptorSupportImpl;
import org.apache.airavata.helix.impl.task.submission.config.JobFactory;
import org.apache.airavata.helix.task.api.support.AdaptorSupport;
import org.apache.airavata.model.appcatalog.computeresource.*;
-import org.apache.airavata.model.appcatalog.groupresourceprofile.GroupComputeResourcePreference;
+import org.apache.airavata.model.appcatalog.groupresourceprofile.ComputeResourcePolicy;
import org.apache.airavata.model.appcatalog.groupresourceprofile.GroupResourceProfile;
import org.apache.airavata.model.status.QueueStatusModel;
import org.apache.airavata.registry.api.RegistryService;
@@ -37,11 +37,10 @@ public class MonitoringJob extends ComputeResourceMonitor implements Job {
public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
RegistryService.Client client = null;
try {
- LOGGER.info("Executing ComputeResources ....... ");
+ LOGGER.debug("Executing ComputeResources Monitoring Job....... ");
client = this.registryClientPool.getResource();
- AdaptorSupportImpl adaptorSupport = AdaptorSupportImpl.getInstance();
JobDataMap jobDataMap = jobExecutionContext.getJobDetail().getJobDataMap();
String metaSchedulerGateway = jobDataMap.getString(Constants.METASCHEDULER_GATEWAY);
@@ -50,29 +49,12 @@ public class MonitoringJob extends ComputeResourceMonitor implements Job {
int jobId = jobDataMap.getInt(Constants.METASCHEDULER_SCANNING_JOB_ID);
int parallelJobs = jobDataMap.getInt(Constants.METASCHEDULER_SCANNING_JOBS);
+ LOGGER.debug("Main Gateway:"+metaSchedulerGateway+" Group Resource Profile: "
+ +metaSchedulerGRP+" username: "+username+" jobId: "+jobId+" parallellJobs: "+parallelJobs);
- GroupResourceProfile groupResourceProfile = getGroupResourceProfile(metaSchedulerGRP);
- List<GroupComputeResourcePreference> computeResourcePreferenceList = groupResourceProfile.getComputePreferences();
+ executeComputeResourceMonitoring(client, metaSchedulerGateway, username, metaSchedulerGRP, parallelJobs, jobId);
- int size = computeResourcePreferenceList.size();
- int chunkSize = size / parallelJobs;
-
- int startIndex = jobId * chunkSize;
-
- int endIndex = (jobId + 1) * chunkSize;
-
- if (jobId == parallelJobs - 1) {
- endIndex = size;
- }
-
- List<GroupComputeResourcePreference> computeResourcePreferences = computeResourcePreferenceList
- .subList(startIndex, endIndex);
-
- for (GroupComputeResourcePreference computeResourcePreference : computeResourcePreferences) {
- LOGGER.info("updating GRP########### PRID:"+computeResourcePreference.getComputeResourceId());
- updateComputeResource(client, adaptorSupport, metaSchedulerGateway, username, metaSchedulerGRP, computeResourcePreference);
- }
} catch (Exception ex) {
String msg = "Error occurred while executing job" + ex.getMessage();
LOGGER.error(msg, ex);
@@ -85,13 +67,39 @@ public class MonitoringJob extends ComputeResourceMonitor implements Job {
}
+ private void executeComputeResourceMonitoring(RegistryService.Client client, String metaSchedulerGateway, String username,
+ String metaSchedulerGRP, int parallelJobs, int jobId) throws Exception {
+ AdaptorSupportImpl adaptorSupport = AdaptorSupportImpl.getInstance();
+ GroupResourceProfile groupResourceProfile = getGroupResourceProfile(metaSchedulerGRP);
+// List<GroupComputeResourcePreference> computeResourcePreferenceList = groupResourceProfile.getComputePreferences();
+
+
+ int size = groupResourceProfile.getComputeResourcePoliciesSize();
+
+ int chunkSize = size / parallelJobs;
+
+ int startIndex = jobId * chunkSize;
+
+ int endIndex = (jobId + 1) * chunkSize;
+
+ if (jobId == parallelJobs - 1) {
+ endIndex = size;
+ }
+
+ List<ComputeResourcePolicy> computeResourcePolicyList = groupResourceProfile.getComputeResourcePolicies().
+ subList(startIndex, endIndex);
+
+ for (ComputeResourcePolicy computeResourcePolicy : computeResourcePolicyList) {
+ updateComputeResource(client, adaptorSupport, metaSchedulerGateway, username, computeResourcePolicy);
+ }
+ }
+
private void updateComputeResource(RegistryService.Client client, AdaptorSupport adaptorSupport,
String gatewayId,
String username,
- String groupResourceProfileId,
- GroupComputeResourcePreference computeResourcePreference) throws Exception {
- String computeResourceId = computeResourcePreference.getComputeResourceId();
+ ComputeResourcePolicy computeResourcePolicy) throws Exception {
+ String computeResourceId = computeResourcePolicy.getComputeResourceId();
ComputeResourceDescription comResourceDes = client.getComputeResource(computeResourceId);
List<JobSubmissionInterface> jobSubmissionInterfaces = comResourceDes.getJobSubmissionInterfaces();
Collections.sort(jobSubmissionInterfaces, Comparator.comparingInt(JobSubmissionInterface::getPriorityOrder));
@@ -100,9 +108,8 @@ public class MonitoringJob extends ComputeResourceMonitor implements Job {
ResourceJobManager resourceJobManager = JobFactory.getResourceJobManager(client, jobSubmissionProtocol, jobSubmissionInterface);
- LOGGER.info(" type "+ resourceJobManager.getResourceJobManagerType()+" jobSubmissionProtocol "+jobSubmissionProtocol);
//TODO: intial phase we are only supporting SLURM
- if (resourceJobManager.getResourceJobManagerType().equals("SLURM")) {
+ if (resourceJobManager.getResourceJobManagerType().name().equals("SLURM")) {
String baseCommand = "sinfo";
if (resourceJobManager.getJobManagerCommands().containsKey(JobManagerCommand.SHOW_CLUSTER_INFO)) {
@@ -110,75 +117,84 @@ public class MonitoringJob extends ComputeResourceMonitor implements Job {
}
+ List<String> allowedBatchQueues = computeResourcePolicy.getAllowedBatchQueues();
+ List<QueueStatusModel> queueStatusModels = new ArrayList<>();
+ for (String queue : allowedBatchQueues) {
- String finalCommand = baseCommand + "-p" + computeResourcePreference.getPreferredBatchQueue();
-
- String computeResourceToken = getComputeResourceCredentialToken(
- gatewayId,
- username,
- computeResourceId,
- false,
- true,
- groupResourceProfileId);
-
- String loginUsername = getComputeResourceLoginUserName(gatewayId,
- username,
- computeResourceId,
- false,
- true,
- groupResourceProfileId,
- null);
-
- AgentAdaptor adaptor = adaptorSupport.fetchAdaptor(gatewayId,
- computeResourceId,
- jobSubmissionProtocol,
- computeResourceToken,
- loginUsername);
-
- CommandOutput commandOutput = adaptor.executeCommand(finalCommand, null);
-
- OutputParser outputParser = new OutputParserImpl();
- boolean queueStatus = false;
- int runningJobs = 0;
- int pendingJobs = 0;
- LOGGER.info("command output"+commandOutput.getStdOut()+" error "+commandOutput.getStdError()+" exist code "+commandOutput.getExitCode());
- if (outputParser.isComputeResourceAvailable(commandOutput)) {
- queueStatus = true;
-
- String runningJobCommand = "squeue";
- String pendingJobCommand = "squeue";
- if (resourceJobManager.getJobManagerCommands().containsKey(JobManagerCommand.SHOW_NO_OF_RUNNING_JOBS)) {
- runningJobCommand = resourceJobManager.getJobManagerCommands().get(JobManagerCommand.SHOW_NO_OF_RUNNING_JOBS);
- }
+ String finalCommand = baseCommand + " -p " + queue;
- if (resourceJobManager.getJobManagerCommands().containsKey(JobManagerCommand.SHOW_NO_OF_PENDING_JOBS)) {
- pendingJobCommand = resourceJobManager.getJobManagerCommands().get(JobManagerCommand.SHOW_NO_OF_PENDING_JOBS);
- }
+ String computeResourceToken = getComputeResourceCredentialToken(
+ gatewayId,
+ username,
+ computeResourceId,
+ false,
+ true,
+ computeResourcePolicy.getGroupResourceProfileId());
- String runningJobsCommand = runningJobCommand + "-h -t running -r | wc -l";
- String pendingJobsCommand = pendingJobCommand + "-h -t pending -r | wc -l";
+ String loginUsername = getComputeResourceLoginUserName(gatewayId,
+ username,
+ computeResourceId,
+ false,
+ true,
+ computeResourcePolicy.getGroupResourceProfileId(),
+ null);
- CommandOutput runningJobsCommandOutput = adaptor.executeCommand(runningJobsCommand, null);
+ AgentAdaptor adaptor = adaptorSupport.fetchAdaptor(gatewayId,
+ computeResourceId,
+ jobSubmissionProtocol,
+ computeResourceToken,
+ loginUsername);
- CommandOutput pendingJobsCommandOutput = adaptor.executeCommand(pendingJobsCommand, null);
+ CommandOutput commandOutput = adaptor.executeCommand(finalCommand, null);
- runningJobs = outputParser.getNumberofJobs(runningJobsCommandOutput);
- pendingJobs = outputParser.getNumberofJobs(pendingJobsCommandOutput);
+ OutputParser outputParser = new OutputParserImpl();
+ boolean queueStatus = false;
+ int runningJobs = 0;
+ int pendingJobs = 0;
- }
+ if (outputParser.isComputeResourceAvailable(commandOutput,Constants.JOB_SUBMISSION_PROTOCOL_SLURM)) {
+ queueStatus = true;
- QueueStatusModel queueStatusModel = new QueueStatusModel();
- queueStatusModel.setHostName(comResourceDes.getHostName());
- queueStatusModel.setQueueName(computeResourcePreference.getPreferredBatchQueue());
- LOGGER.info("Storing hostname "+comResourceDes.getHostName()+" batch queue "+computeResourcePreference.getPreferredBatchQueue());
- queueStatusModel.setQueueUp(queueStatus);
- queueStatusModel.setRunningJobs(runningJobs);
- queueStatusModel.setQueuedJobs(pendingJobs);
- List<QueueStatusModel> queueStatusModels = new ArrayList<>();
- queueStatusModels.add(queueStatusModel);
+ String runningJobCommand = "squeue";
+ String pendingJobCommand = "squeue";
+ if (resourceJobManager.getJobManagerCommands().containsKey(JobManagerCommand.SHOW_NO_OF_RUNNING_JOBS)) {
+ runningJobCommand = resourceJobManager.getJobManagerCommands().get(JobManagerCommand.SHOW_NO_OF_RUNNING_JOBS);
+ }
- client.registerQueueStatuses(queueStatusModels);
+ if (resourceJobManager.getJobManagerCommands().containsKey(JobManagerCommand.SHOW_NO_OF_PENDING_JOBS)) {
+ pendingJobCommand = resourceJobManager.getJobManagerCommands().get(JobManagerCommand.SHOW_NO_OF_PENDING_JOBS);
+ }
+
+ String runningJobsCommand = runningJobCommand + "-h -t running -r | wc -l";
+ String pendingJobsCommand = pendingJobCommand + "-h -t pending -r | wc -l";
+
+ CommandOutput runningJobsCommandOutput = adaptor.executeCommand(runningJobsCommand, null);
+
+ CommandOutput pendingJobsCommandOutput = adaptor.executeCommand(pendingJobsCommand, null);
+ runningJobs = outputParser.getNumberofJobs(runningJobsCommandOutput,Constants.JOB_SUBMISSION_PROTOCOL_SLURM);
+ pendingJobs = outputParser.getNumberofJobs(pendingJobsCommandOutput,Constants.JOB_SUBMISSION_PROTOCOL_SLURM);
+
+ }
+
+ QueueStatusModel queueStatusModel = new QueueStatusModel();
+ queueStatusModel.setHostName(comResourceDes.getHostName());
+ queueStatusModel.setQueueName(queue);
+ queueStatusModel.setQueueUp(queueStatus);
+ queueStatusModel.setRunningJobs(runningJobs);
+ queueStatusModel.setQueuedJobs(pendingJobs);
+ queueStatusModels.add(queueStatusModel);
+ queueStatusModel.setTime(System.currentTimeMillis());
+ }
+ client.registerQueueStatuses(queueStatusModels);
}
}
+
+ public static void main(String[] args) throws Exception {
+ MonitoringJob monitoringJob = new MonitoringJob();
+ monitoringJob.executeComputeResourceMonitoring(monitoringJob.registryClientPool.getResource(), "seagrid", "metascheacc",
+ "a2076a5a-0fbf-44f4-9d47-060153bc578b", 1, 0);
+ }
}
+
+
diff --git a/modules/computer-resource-monitoring-service/src/main/java/org/apache/airavata/compute/resource/monitoring/job/output/OutputParser.java b/modules/computer-resource-monitoring-service/src/main/java/org/apache/airavata/compute/resource/monitoring/job/output/OutputParser.java
index a06b9b5e02..b984940886 100644
--- a/modules/computer-resource-monitoring-service/src/main/java/org/apache/airavata/compute/resource/monitoring/job/output/OutputParser.java
+++ b/modules/computer-resource-monitoring-service/src/main/java/org/apache/airavata/compute/resource/monitoring/job/output/OutputParser.java
@@ -8,9 +8,9 @@ import org.apache.airavata.agents.api.CommandOutput;
public interface OutputParser {
- boolean isComputeResourceAvailable(CommandOutput commandOutput);
+ boolean isComputeResourceAvailable(CommandOutput commandOutput, String type);
- int getNumberofJobs(CommandOutput commandOutput);
+ int getNumberofJobs(CommandOutput commandOutput, String type);
diff --git a/modules/computer-resource-monitoring-service/src/main/java/org/apache/airavata/compute/resource/monitoring/job/output/OutputParserImpl.java b/modules/computer-resource-monitoring-service/src/main/java/org/apache/airavata/compute/resource/monitoring/job/output/OutputParserImpl.java
index e8a11ba274..37e51d95a0 100644
--- a/modules/computer-resource-monitoring-service/src/main/java/org/apache/airavata/compute/resource/monitoring/job/output/OutputParserImpl.java
+++ b/modules/computer-resource-monitoring-service/src/main/java/org/apache/airavata/compute/resource/monitoring/job/output/OutputParserImpl.java
@@ -1,6 +1,9 @@
package org.apache.airavata.compute.resource.monitoring.job.output;
import org.apache.airavata.agents.api.CommandOutput;
+import org.apache.airavata.compute.resource.monitoring.utils.Constants;
+
+import java.util.Scanner;
/**
* This is parser output implementation
@@ -9,17 +12,35 @@ public class OutputParserImpl implements OutputParser {
@Override
- public boolean isComputeResourceAvailable(CommandOutput commandOutput) {
- if (commandOutput.getExitCode() == 0) {
- return true;
+ public boolean isComputeResourceAvailable(CommandOutput commandOutput, String type) {
+ if (commandOutput.getStdOut() != null && !commandOutput.getStdOut().isEmpty()) {
+ if (type.equals(Constants.JOB_SUBMISSION_PROTOCOL_SLURM)) {
+ Scanner scanner = new Scanner(commandOutput.getStdOut());
+ if (scanner.hasNextLine()) {
+ String firstLine = scanner.nextLine();
+ }
+ while (scanner.hasNextLine()) {
+ String line = scanner.nextLine();
+ String[] splittedString = line.split(" ");
+ for (String splitted : splittedString) {
+ if (splitted.trim().equals("up")) {
+ return true;
+ }
+ }
+ }
+
+ }
+
}
return false;
}
@Override
- public int getNumberofJobs(CommandOutput commandOutput) {
+ public int getNumberofJobs(CommandOutput commandOutput, String type) {
if (commandOutput.getStdOut() != null && !commandOutput.getStdOut().isEmpty()) {
- return Integer.parseInt(commandOutput.getStdOut());
+ if (type.equals(Constants.JOB_SUBMISSION_PROTOCOL_SLURM)) {
+ return Integer.parseInt(commandOutput.getStdOut().trim());
+ }
}
return 0;
}
diff --git a/modules/computer-resource-monitoring-service/src/main/java/org/apache/airavata/compute/resource/monitoring/utils/Constants.java b/modules/computer-resource-monitoring-service/src/main/java/org/apache/airavata/compute/resource/monitoring/utils/Constants.java
index 61ae5f87f7..76d7d0afaf 100644
--- a/modules/computer-resource-monitoring-service/src/main/java/org/apache/airavata/compute/resource/monitoring/utils/Constants.java
+++ b/modules/computer-resource-monitoring-service/src/main/java/org/apache/airavata/compute/resource/monitoring/utils/Constants.java
@@ -11,4 +11,5 @@ public class Constants {
public static final String COMPUTE_RESOURCE_SCANNER_GROUP = "compute.resource.scanner.group";
public static final String COMPUTE_RESOURCE_SCANNER_TRIGGER = "compute.resource.scanner.trigger";
public static final String COMPUTE_RESOURCE_SCANNER_JOB = "compute.resource.scanner.job";
+ public static final String JOB_SUBMISSION_PROTOCOL_SLURM = "SLURM";
}