You are viewing a plain-text version of this content. (The link to the canonical version was lost in this plain-text rendering.)
Posted to commits@airavata.apache.org by is...@apache.org on 2022/11/21 19:11:59 UTC

[airavata] branch develop updated: Fix monitoring issues

This is an automated email from the ASF dual-hosted git repository.

isjarana pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/airavata.git


The following commit(s) were added to refs/heads/develop by this push:
     new 3ec0e93da6 Fix monitoring issues
     new 229e943e0a Merge pull request #339 from isururanawaka/metaschedular
3ec0e93da6 is described below

commit 3ec0e93da6ae3f512ea18a203b2ae209919154cd
Author: Isuru Ranawaka <ir...@gmail.com>
AuthorDate: Mon Nov 21 14:11:08 2022 -0500

    Fix monitoring issues
---
 .../org/apache/airavata/agents/api/AgentUtils.java |   7 +-
 .../airavata/metascheduler/core/utils/Utils.java   |   8 +-
 .../resource/monitoring/job/MonitoringJob.java     | 190 +++++++++++----------
 .../monitoring/job/output/OutputParser.java        |   4 +-
 .../monitoring/job/output/OutputParserImpl.java    |  31 +++-
 .../resource/monitoring/utils/Constants.java       |   1 +
 6 files changed, 141 insertions(+), 100 deletions(-)

diff --git a/modules/airavata-helix/agent-api/src/main/java/org/apache/airavata/agents/api/AgentUtils.java b/modules/airavata-helix/agent-api/src/main/java/org/apache/airavata/agents/api/AgentUtils.java
index fa6c4481b9..cb8b716f69 100644
--- a/modules/airavata-helix/agent-api/src/main/java/org/apache/airavata/agents/api/AgentUtils.java
+++ b/modules/airavata-helix/agent-api/src/main/java/org/apache/airavata/agents/api/AgentUtils.java
@@ -16,16 +16,19 @@ public class AgentUtils {
         try {
             final int serverPort = Integer.parseInt(ServerSettings.getRegistryServerPort());
             final String serverHost = ServerSettings.getRegistryServerHost();
-            return RegistryServiceClientFactory.createRegistryClient(serverHost, serverPort);
+
+            return RegistryServiceClientFactory.createRegistryClient(serverHost,
+                    serverPort);
         } catch (RegistryServiceException | ApplicationSettingsException e) {
             throw new AgentException("Unable to create registry client...", e);
         }
+
     }
 
     public static CredentialStoreService.Client getCredentialClient() throws AgentException {
         try {
             final int serverPort = Integer.parseInt(ServerSettings.getCredentialStoreServerPort());
-            final String serverHost =ServerSettings.getCredentialStoreServerHost();
+            final String serverHost = ServerSettings.getCredentialStoreServerHost();
             return CredentialStoreClientFactory.createAiravataCSClient(serverHost, serverPort);
         } catch (CredentialStoreException | ApplicationSettingsException e) {
             throw new AgentException("Unable to create credential client...", e);
diff --git a/modules/airavata-metascheduler/metascheduler-core/src/main/java/org/apache/airavata/metascheduler/core/utils/Utils.java b/modules/airavata-metascheduler/metascheduler-core/src/main/java/org/apache/airavata/metascheduler/core/utils/Utils.java
index 493aa482e6..0e9c787b86 100644
--- a/modules/airavata-metascheduler/metascheduler-core/src/main/java/org/apache/airavata/metascheduler/core/utils/Utils.java
+++ b/modules/airavata-metascheduler/metascheduler-core/src/main/java/org/apache/airavata/metascheduler/core/utils/Utils.java
@@ -25,13 +25,13 @@ public class Utils {
             return registryClientPool;
         }
         try {
-            final int serverPort = Integer.parseInt(ServerSettings.getRegistryServerPort());
-            final String serverHost = ServerSettings.getRegistryServerHost();
+//            final int serverPort = Integer.parseInt(ServerSettings.getRegistryServerPort());
+//            final String serverHost = ServerSettings.getRegistryServerHost();
             registryClientPool = new ThriftClientPool<>(
                     tProtocol -> new RegistryService.Client(tProtocol),
                     Utils.<RegistryService.Client>createGenericObjectPoolConfig(),
-                    serverHost,
-                    serverPort);
+                    "149.165.153.112",
+                    8970);
             return registryClientPool;
         } catch (Exception e) {
             throw new RuntimeException("Unable to create registry client...", e);
diff --git a/modules/computer-resource-monitoring-service/src/main/java/org/apache/airavata/compute/resource/monitoring/job/MonitoringJob.java b/modules/computer-resource-monitoring-service/src/main/java/org/apache/airavata/compute/resource/monitoring/job/MonitoringJob.java
index 235fb2263c..463594c129 100644
--- a/modules/computer-resource-monitoring-service/src/main/java/org/apache/airavata/compute/resource/monitoring/job/MonitoringJob.java
+++ b/modules/computer-resource-monitoring-service/src/main/java/org/apache/airavata/compute/resource/monitoring/job/MonitoringJob.java
@@ -9,7 +9,7 @@ import org.apache.airavata.helix.core.support.adaptor.AdaptorSupportImpl;
 import org.apache.airavata.helix.impl.task.submission.config.JobFactory;
 import org.apache.airavata.helix.task.api.support.AdaptorSupport;
 import org.apache.airavata.model.appcatalog.computeresource.*;
-import org.apache.airavata.model.appcatalog.groupresourceprofile.GroupComputeResourcePreference;
+import org.apache.airavata.model.appcatalog.groupresourceprofile.ComputeResourcePolicy;
 import org.apache.airavata.model.appcatalog.groupresourceprofile.GroupResourceProfile;
 import org.apache.airavata.model.status.QueueStatusModel;
 import org.apache.airavata.registry.api.RegistryService;
@@ -37,11 +37,10 @@ public class MonitoringJob extends ComputeResourceMonitor implements Job {
     public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
         RegistryService.Client client = null;
         try {
-            LOGGER.info("Executing ComputeResources ....... ");
+            LOGGER.debug("Executing ComputeResources Monitoring Job....... ");
 
             client = this.registryClientPool.getResource();
 
-            AdaptorSupportImpl adaptorSupport = AdaptorSupportImpl.getInstance();
 
             JobDataMap jobDataMap = jobExecutionContext.getJobDetail().getJobDataMap();
             String metaSchedulerGateway = jobDataMap.getString(Constants.METASCHEDULER_GATEWAY);
@@ -50,29 +49,12 @@ public class MonitoringJob extends ComputeResourceMonitor implements Job {
             int jobId = jobDataMap.getInt(Constants.METASCHEDULER_SCANNING_JOB_ID);
             int parallelJobs = jobDataMap.getInt(Constants.METASCHEDULER_SCANNING_JOBS);
 
+            LOGGER.debug("Main Gateway:"+metaSchedulerGateway+" Group Resource Profile: "
+                    +metaSchedulerGRP+" username: "+username+" jobId: "+jobId+" parallellJobs: "+parallelJobs);
 
-            GroupResourceProfile groupResourceProfile = getGroupResourceProfile(metaSchedulerGRP);
-            List<GroupComputeResourcePreference> computeResourcePreferenceList = groupResourceProfile.getComputePreferences();
+            executeComputeResourceMonitoring(client, metaSchedulerGateway, username, metaSchedulerGRP, parallelJobs, jobId);
 
-            int size = computeResourcePreferenceList.size();
 
-            int chunkSize = size / parallelJobs;
-
-            int startIndex = jobId * chunkSize;
-
-            int endIndex = (jobId + 1) * chunkSize;
-
-            if (jobId == parallelJobs - 1) {
-                endIndex = size;
-            }
-
-            List<GroupComputeResourcePreference> computeResourcePreferences = computeResourcePreferenceList
-                    .subList(startIndex, endIndex);
-
-            for (GroupComputeResourcePreference computeResourcePreference : computeResourcePreferences) {
-                LOGGER.info("updating GRP########### PRID:"+computeResourcePreference.getComputeResourceId());
-                updateComputeResource(client, adaptorSupport, metaSchedulerGateway, username, metaSchedulerGRP, computeResourcePreference);
-            }
         } catch (Exception ex) {
             String msg = "Error occurred while executing job" + ex.getMessage();
             LOGGER.error(msg, ex);
@@ -85,13 +67,39 @@ public class MonitoringJob extends ComputeResourceMonitor implements Job {
 
     }
 
+    /**
+     * Runs one monitoring pass over this job's share of the group resource profile's compute resource policies.
+     * The policies are split into {@code parallelJobs} contiguous chunks of {@code size / parallelJobs} entries;
+     * job {@code jobId} handles chunk {@code jobId}, and the last job also takes any remainder.
+     *
+     * @param client               registry client used to look up compute resources and register queue statuses
+     * @param metaSchedulerGateway gateway id passed to credential, login-name and adaptor lookups
+     * @param username             username passed to credential and login-name lookups
+     * @param metaSchedulerGRP     id of the group resource profile whose policies are scanned
+     * @param parallelJobs         total number of monitoring jobs sharing the work; must be positive
+     * @param jobId                zero-based index of this job; must be in {@code [0, parallelJobs)}
+     * @throws IllegalArgumentException if the partition parameters are out of range
+     * @throws Exception                propagated from registry lookups and remote command execution
+     */
+    private void executeComputeResourceMonitoring(RegistryService.Client client, String metaSchedulerGateway, String username,
+                                                  String metaSchedulerGRP, int parallelJobs, int jobId) throws Exception {
+        if (parallelJobs <= 0 || jobId < 0 || jobId >= parallelJobs) {
+            throw new IllegalArgumentException("Invalid job partition: jobId=" + jobId + ", parallelJobs=" + parallelJobs);
+        }
+        AdaptorSupportImpl adaptorSupport = AdaptorSupportImpl.getInstance();
+        GroupResourceProfile groupResourceProfile = getGroupResourceProfile(metaSchedulerGRP);
+        // The Thrift-generated list getter returns null (not an empty list) when the field is unset,
+        // so guard before slicing instead of relying on getComputeResourcePoliciesSize().
+        List<ComputeResourcePolicy> policies = groupResourceProfile.getComputeResourcePolicies();
+        if (policies == null || policies.isEmpty()) {
+            LOGGER.debug("No compute resource policies found for group resource profile " + metaSchedulerGRP);
+            return;
+        }
+        int size = policies.size();
+        int chunkSize = size / parallelJobs;
+        int startIndex = jobId * chunkSize;
+        // The last job absorbs the remainder so every policy is scanned exactly once.
+        int endIndex = (jobId == parallelJobs - 1) ? size : (jobId + 1) * chunkSize;
+        for (ComputeResourcePolicy computeResourcePolicy : policies.subList(startIndex, endIndex)) {
+            updateComputeResource(client, adaptorSupport, metaSchedulerGateway, username, computeResourcePolicy);
+        }
+    }
+
 
     private void updateComputeResource(RegistryService.Client client, AdaptorSupport adaptorSupport,
                                        String gatewayId,
                                        String username,
-                                       String groupResourceProfileId,
-                                       GroupComputeResourcePreference computeResourcePreference) throws Exception {
-        String computeResourceId = computeResourcePreference.getComputeResourceId();
+                                       ComputeResourcePolicy computeResourcePolicy) throws Exception {
+        String computeResourceId = computeResourcePolicy.getComputeResourceId();
         ComputeResourceDescription comResourceDes = client.getComputeResource(computeResourceId);
         List<JobSubmissionInterface> jobSubmissionInterfaces = comResourceDes.getJobSubmissionInterfaces();
         Collections.sort(jobSubmissionInterfaces, Comparator.comparingInt(JobSubmissionInterface::getPriorityOrder));
@@ -100,9 +108,8 @@ public class MonitoringJob extends ComputeResourceMonitor implements Job {
 
         ResourceJobManager resourceJobManager = JobFactory.getResourceJobManager(client, jobSubmissionProtocol, jobSubmissionInterface);
 
-        LOGGER.info(" type "+ resourceJobManager.getResourceJobManagerType()+" jobSubmissionProtocol "+jobSubmissionProtocol);
         //TODO: intial phase we are only supporting SLURM
-        if (resourceJobManager.getResourceJobManagerType().equals("SLURM")) {
+        if (resourceJobManager.getResourceJobManagerType().name().equals("SLURM")) {
             String baseCommand = "sinfo";
 
             if (resourceJobManager.getJobManagerCommands().containsKey(JobManagerCommand.SHOW_CLUSTER_INFO)) {
@@ -110,75 +117,84 @@ public class MonitoringJob extends ComputeResourceMonitor implements Job {
             }
 
 
+            List<String> allowedBatchQueues = computeResourcePolicy.getAllowedBatchQueues();
+            List<QueueStatusModel> queueStatusModels = new ArrayList<>();
+            for (String queue : allowedBatchQueues) {
 
-            String finalCommand = baseCommand + "-p" + computeResourcePreference.getPreferredBatchQueue();
-
-            String computeResourceToken = getComputeResourceCredentialToken(
-                    gatewayId,
-                    username,
-                    computeResourceId,
-                    false,
-                    true,
-                    groupResourceProfileId);
-
-            String loginUsername = getComputeResourceLoginUserName(gatewayId,
-                    username,
-                    computeResourceId,
-                    false,
-                    true,
-                    groupResourceProfileId,
-                    null);
-
-            AgentAdaptor adaptor = adaptorSupport.fetchAdaptor(gatewayId,
-                    computeResourceId,
-                    jobSubmissionProtocol,
-                    computeResourceToken,
-                    loginUsername);
-
-            CommandOutput commandOutput = adaptor.executeCommand(finalCommand, null);
-
-            OutputParser outputParser = new OutputParserImpl();
-            boolean queueStatus = false;
-            int runningJobs = 0;
-            int pendingJobs = 0;
-            LOGGER.info("command output"+commandOutput.getStdOut()+" error "+commandOutput.getStdError()+" exist code "+commandOutput.getExitCode());
-            if (outputParser.isComputeResourceAvailable(commandOutput)) {
-                queueStatus = true;
-
-                String runningJobCommand = "squeue";
-                String pendingJobCommand = "squeue";
-                if (resourceJobManager.getJobManagerCommands().containsKey(JobManagerCommand.SHOW_NO_OF_RUNNING_JOBS)) {
-                    runningJobCommand = resourceJobManager.getJobManagerCommands().get(JobManagerCommand.SHOW_NO_OF_RUNNING_JOBS);
-                }
+                String finalCommand = baseCommand + " -p " + queue;
 
-                if (resourceJobManager.getJobManagerCommands().containsKey(JobManagerCommand.SHOW_NO_OF_PENDING_JOBS)) {
-                    pendingJobCommand = resourceJobManager.getJobManagerCommands().get(JobManagerCommand.SHOW_NO_OF_PENDING_JOBS);
-                }
+                String computeResourceToken = getComputeResourceCredentialToken(
+                        gatewayId,
+                        username,
+                        computeResourceId,
+                        false,
+                        true,
+                        computeResourcePolicy.getGroupResourceProfileId());
 
-                String runningJobsCommand = runningJobCommand + "-h -t running -r | wc -l";
-                String pendingJobsCommand = pendingJobCommand + "-h -t pending -r | wc -l";
+                String loginUsername = getComputeResourceLoginUserName(gatewayId,
+                        username,
+                        computeResourceId,
+                        false,
+                        true,
+                        computeResourcePolicy.getGroupResourceProfileId(),
+                        null);
 
-                CommandOutput runningJobsCommandOutput = adaptor.executeCommand(runningJobsCommand, null);
+                AgentAdaptor adaptor = adaptorSupport.fetchAdaptor(gatewayId,
+                        computeResourceId,
+                        jobSubmissionProtocol,
+                        computeResourceToken,
+                        loginUsername);
 
-                CommandOutput pendingJobsCommandOutput = adaptor.executeCommand(pendingJobsCommand, null);
+                CommandOutput commandOutput = adaptor.executeCommand(finalCommand, null);
 
-                runningJobs = outputParser.getNumberofJobs(runningJobsCommandOutput);
-                pendingJobs = outputParser.getNumberofJobs(pendingJobsCommandOutput);
+                OutputParser outputParser = new OutputParserImpl();
+                boolean queueStatus = false;
+                int runningJobs = 0;
+                int pendingJobs = 0;
 
-            }
+                if (outputParser.isComputeResourceAvailable(commandOutput,Constants.JOB_SUBMISSION_PROTOCOL_SLURM)) {
+                    queueStatus = true;
 
-            QueueStatusModel queueStatusModel = new QueueStatusModel();
-            queueStatusModel.setHostName(comResourceDes.getHostName());
-            queueStatusModel.setQueueName(computeResourcePreference.getPreferredBatchQueue());
-            LOGGER.info("Storing hostname "+comResourceDes.getHostName()+" batch queue "+computeResourcePreference.getPreferredBatchQueue());
-            queueStatusModel.setQueueUp(queueStatus);
-            queueStatusModel.setRunningJobs(runningJobs);
-            queueStatusModel.setQueuedJobs(pendingJobs);
-            List<QueueStatusModel> queueStatusModels = new ArrayList<>();
-            queueStatusModels.add(queueStatusModel);
+                    String runningJobCommand = "squeue";
+                    String pendingJobCommand = "squeue";
+                    if (resourceJobManager.getJobManagerCommands().containsKey(JobManagerCommand.SHOW_NO_OF_RUNNING_JOBS)) {
+                        runningJobCommand = resourceJobManager.getJobManagerCommands().get(JobManagerCommand.SHOW_NO_OF_RUNNING_JOBS);
+                    }
 
-            client.registerQueueStatuses(queueStatusModels);
+                    if (resourceJobManager.getJobManagerCommands().containsKey(JobManagerCommand.SHOW_NO_OF_PENDING_JOBS)) {
+                        pendingJobCommand = resourceJobManager.getJobManagerCommands().get(JobManagerCommand.SHOW_NO_OF_PENDING_JOBS);
+                    }
+
+                    String runningJobsCommand = runningJobCommand + "-h -t running -r | wc -l";
+                    String pendingJobsCommand = pendingJobCommand + "-h -t pending -r | wc -l";
+
+                    CommandOutput runningJobsCommandOutput = adaptor.executeCommand(runningJobsCommand, null);
+
+                    CommandOutput pendingJobsCommandOutput = adaptor.executeCommand(pendingJobsCommand, null);
 
+                    runningJobs = outputParser.getNumberofJobs(runningJobsCommandOutput,Constants.JOB_SUBMISSION_PROTOCOL_SLURM);
+                    pendingJobs = outputParser.getNumberofJobs(pendingJobsCommandOutput,Constants.JOB_SUBMISSION_PROTOCOL_SLURM);
+
+                }
+
+                QueueStatusModel queueStatusModel = new QueueStatusModel();
+                queueStatusModel.setHostName(comResourceDes.getHostName());
+                queueStatusModel.setQueueName(queue);
+                queueStatusModel.setQueueUp(queueStatus);
+                queueStatusModel.setRunningJobs(runningJobs);
+                queueStatusModel.setQueuedJobs(pendingJobs);
+                queueStatusModels.add(queueStatusModel);
+                queueStatusModel.setTime(System.currentTimeMillis());
+            }
+            client.registerQueueStatuses(queueStatusModels);
         }
     }
+
+    /**
+     * Manual entry point that runs a single monitoring pass directly, bypassing {@link #execute}.
+     * Identifiers come from the command line instead of being hard-coded in source:
+     * {@code MonitoringJob <gatewayId> <username> <groupResourceProfileId> [<parallelJobs> <jobId>]}.
+     * When the partition arguments are omitted, the whole profile is scanned as job 0 of 1.
+     *
+     * @param args command-line arguments as described above
+     * @throws IllegalArgumentException if the argument count is wrong
+     * @throws Exception                propagated from the monitoring pass
+     */
+    public static void main(String[] args) throws Exception {
+        if (args.length != 3 && args.length != 5) {
+            throw new IllegalArgumentException(
+                    "Usage: MonitoringJob <gatewayId> <username> <groupResourceProfileId> [<parallelJobs> <jobId>]");
+        }
+        int parallelJobs = args.length == 5 ? Integer.parseInt(args[3]) : 1;
+        int jobId = args.length == 5 ? Integer.parseInt(args[4]) : 0;
+        MonitoringJob monitoringJob = new MonitoringJob();
+        // NOTE(review): this client is taken from registryClientPool and never returned to it.
+        monitoringJob.executeComputeResourceMonitoring(monitoringJob.registryClientPool.getResource(),
+                args[0], args[1], args[2], parallelJobs, jobId);
+    }
 }
+
+
diff --git a/modules/computer-resource-monitoring-service/src/main/java/org/apache/airavata/compute/resource/monitoring/job/output/OutputParser.java b/modules/computer-resource-monitoring-service/src/main/java/org/apache/airavata/compute/resource/monitoring/job/output/OutputParser.java
index a06b9b5e02..b984940886 100644
--- a/modules/computer-resource-monitoring-service/src/main/java/org/apache/airavata/compute/resource/monitoring/job/output/OutputParser.java
+++ b/modules/computer-resource-monitoring-service/src/main/java/org/apache/airavata/compute/resource/monitoring/job/output/OutputParser.java
@@ -8,9 +8,9 @@ import org.apache.airavata.agents.api.CommandOutput;
 public interface OutputParser {
 
 
-    boolean isComputeResourceAvailable(CommandOutput commandOutput);
+    boolean isComputeResourceAvailable(CommandOutput commandOutput, String type);
 
-    int getNumberofJobs(CommandOutput commandOutput);
+    int getNumberofJobs(CommandOutput commandOutput, String type);
 
 
 
diff --git a/modules/computer-resource-monitoring-service/src/main/java/org/apache/airavata/compute/resource/monitoring/job/output/OutputParserImpl.java b/modules/computer-resource-monitoring-service/src/main/java/org/apache/airavata/compute/resource/monitoring/job/output/OutputParserImpl.java
index e8a11ba274..37e51d95a0 100644
--- a/modules/computer-resource-monitoring-service/src/main/java/org/apache/airavata/compute/resource/monitoring/job/output/OutputParserImpl.java
+++ b/modules/computer-resource-monitoring-service/src/main/java/org/apache/airavata/compute/resource/monitoring/job/output/OutputParserImpl.java
@@ -1,6 +1,9 @@
 package org.apache.airavata.compute.resource.monitoring.job.output;
 
 import org.apache.airavata.agents.api.CommandOutput;
+import org.apache.airavata.compute.resource.monitoring.utils.Constants;
+
+import java.util.Scanner;
 
 /**
  * This is parser output implementation
@@ -9,17 +12,35 @@ public class OutputParserImpl implements OutputParser {
 
 
     @Override
-    public boolean isComputeResourceAvailable(CommandOutput commandOutput) {
-        if (commandOutput.getExitCode() == 0) {
-            return true;
+    public boolean isComputeResourceAvailable(CommandOutput commandOutput, String type) {
+        // Only SLURM output is understood; any other (or null) type is reported as unavailable.
+        // The expected input is `sinfo -p <queue>` output: a column-header row followed by data
+        // rows; a row counts as available when one of its whitespace-separated fields is exactly "up".
+        String stdOut = commandOutput == null ? null : commandOutput.getStdOut();
+        if (stdOut != null && Constants.JOB_SUBMISSION_PROTOCOL_SLURM.equals(type)) {
+            String[] rows = stdOut.split("\\R");
+            // rows[0] is the header, so data rows start at index 1.
+            for (int i = 1; i < rows.length; i++) {
+                // sinfo pads columns with runs of blanks, so split on any whitespace run.
+                for (String field : rows[i].trim().split("\\s+")) {
+                    if ("up".equals(field)) {
+                        return true;
+                    }
+                }
+            }
         }
         return false;
     }
 
     @Override
-    public int getNumberofJobs(CommandOutput commandOutput) {
+    public int getNumberofJobs(CommandOutput commandOutput, String type) {
+        // For SLURM the caller pipes squeue through `wc -l`, so stdout should be a single integer.
+        // A non-numeric stdout raises NumberFormatException to the caller rather than being masked as 0.
         if (commandOutput.getStdOut() != null && !commandOutput.getStdOut().isEmpty()) {
-            return Integer.parseInt(commandOutput.getStdOut());
+            // Constant-first comparison so a null type yields 0 instead of a NullPointerException.
+            if (Constants.JOB_SUBMISSION_PROTOCOL_SLURM.equals(type)) {
+                return Integer.parseInt(commandOutput.getStdOut().trim());
+            }
         }
         return 0;
     }
diff --git a/modules/computer-resource-monitoring-service/src/main/java/org/apache/airavata/compute/resource/monitoring/utils/Constants.java b/modules/computer-resource-monitoring-service/src/main/java/org/apache/airavata/compute/resource/monitoring/utils/Constants.java
index 61ae5f87f7..76d7d0afaf 100644
--- a/modules/computer-resource-monitoring-service/src/main/java/org/apache/airavata/compute/resource/monitoring/utils/Constants.java
+++ b/modules/computer-resource-monitoring-service/src/main/java/org/apache/airavata/compute/resource/monitoring/utils/Constants.java
@@ -11,4 +11,5 @@ public class Constants {
     public static final String COMPUTE_RESOURCE_SCANNER_GROUP = "compute.resource.scanner.group";
     public static final String COMPUTE_RESOURCE_SCANNER_TRIGGER = "compute.resource.scanner.trigger";
     public static  final String COMPUTE_RESOURCE_SCANNER_JOB = "compute.resource.scanner.job";
+    public static  final String JOB_SUBMISSION_PROTOCOL_SLURM = "SLURM";
 }