You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sh...@apache.org on 2016/11/01 16:12:27 UTC
[4/6] airavata git commit: Added job submission task and aurora
monitoring services
Added job submission task and aurora monitoring services
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/21b1923d
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/21b1923d
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/21b1923d
Branch: refs/heads/develop
Commit: 21b1923da2184d6df71a855caf9aea80ce33b300
Parents: 325d163
Author: Shameera Rathnayaka <sh...@gmail.com>
Authored: Mon Oct 31 19:36:17 2016 -0400
Committer: Shameera Rathnayaka <sh...@gmail.com>
Committed: Mon Oct 31 19:36:17 2016 -0400
----------------------------------------------------------------------
modules/cloud/aurora-client/pom.xml | 8 +
.../cloud/aurora/client/AuroraThriftClient.java | 2 +-
.../main/resources/aurora-scheduler.properties | 2 +-
modules/cloud/cloud-provisioning/pom.xml | 7 +
.../apache/airavata/gfac/core/GFacUtils.java | 88 ++++++-
modules/gfac/gfac-impl/pom.xml | 5 +
.../apache/airavata/gfac/impl/AuroraUtils.java | 30 +++
.../gfac/impl/task/AuroraJobSubmission.java | 146 +++++++++++
.../gfac/monitor/cloud/AuroraJobMonitor.java | 247 +++++++++++++++++++
.../core/utils/OrchestratorUtils.java | 16 ++
.../cpi/impl/SimpleOrchestratorImpl.java | 5 +-
pom.xml | 2 +-
12 files changed, 543 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/21b1923d/modules/cloud/aurora-client/pom.xml
----------------------------------------------------------------------
diff --git a/modules/cloud/aurora-client/pom.xml b/modules/cloud/aurora-client/pom.xml
index 4cac7d9..8188c49 100644
--- a/modules/cloud/aurora-client/pom.xml
+++ b/modules/cloud/aurora-client/pom.xml
@@ -70,7 +70,15 @@
<directory>src/test/resources</directory>
</testResource>
</testResources>
+
<plugins>
+ <plugin>
+ <groupId>com.mycila</groupId>
+ <artifactId>license-maven-plugin</artifactId>
+ <configuration>
+ <header>../../../apache-license-header.txt</header>
+ </configuration>
+ </plugin>
</plugins>
</build>
</project>
http://git-wip-us.apache.org/repos/asf/airavata/blob/21b1923d/modules/cloud/aurora-client/src/main/java/org/apache/airavata/cloud/aurora/client/AuroraThriftClient.java
----------------------------------------------------------------------
diff --git a/modules/cloud/aurora-client/src/main/java/org/apache/airavata/cloud/aurora/client/AuroraThriftClient.java b/modules/cloud/aurora-client/src/main/java/org/apache/airavata/cloud/aurora/client/AuroraThriftClient.java
index 3fb2468..0e0b36e 100644
--- a/modules/cloud/aurora-client/src/main/java/org/apache/airavata/cloud/aurora/client/AuroraThriftClient.java
+++ b/modules/cloud/aurora-client/src/main/java/org/apache/airavata/cloud/aurora/client/AuroraThriftClient.java
@@ -193,7 +193,7 @@ public class AuroraThriftClient {
* @return the job details
* @throws Exception the exception
*/
- public ResponseBean getJobDetails(JobKeyBean jobKeyBean) throws Exception {
+ public JobDetailsResponseBean getJobDetails(JobKeyBean jobKeyBean) throws Exception {
JobDetailsResponseBean response = null;
try {
if(jobKeyBean != null) {
http://git-wip-us.apache.org/repos/asf/airavata/blob/21b1923d/modules/cloud/aurora-client/src/main/resources/aurora-scheduler.properties
----------------------------------------------------------------------
diff --git a/modules/cloud/aurora-client/src/main/resources/aurora-scheduler.properties b/modules/cloud/aurora-client/src/main/resources/aurora-scheduler.properties
index 6cb1fe9..0e1cc95 100644
--- a/modules/cloud/aurora-client/src/main/resources/aurora-scheduler.properties
+++ b/modules/cloud/aurora-client/src/main/resources/aurora-scheduler.properties
@@ -1,6 +1,6 @@
#Aurora scheduler properties
-aurora.scheduler.host=mesos-master-1
+aurora.scheduler.host=52.15.129.208
aurora.scheduler.port=8081
http://git-wip-us.apache.org/repos/asf/airavata/blob/21b1923d/modules/cloud/cloud-provisioning/pom.xml
----------------------------------------------------------------------
diff --git a/modules/cloud/cloud-provisioning/pom.xml b/modules/cloud/cloud-provisioning/pom.xml
index c08cfb9..7981a10 100644
--- a/modules/cloud/cloud-provisioning/pom.xml
+++ b/modules/cloud/cloud-provisioning/pom.xml
@@ -76,6 +76,13 @@
</testResource>
</testResources>
<plugins>
+ <plugin>
+ <groupId>com.mycila</groupId>
+ <artifactId>license-maven-plugin</artifactId>
+ <configuration>
+ <header>../../../apache-license-header.txt</header>
+ </configuration>
+ </plugin>
</plugins>
</build>
</project>
http://git-wip-us.apache.org/repos/asf/airavata/blob/21b1923d/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java
index b69764e..66998c3 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java
@@ -21,10 +21,14 @@ package org.apache.airavata.gfac.core;
import groovy.lang.Writable;
import groovy.text.GStringTemplateEngine;
-import groovy.text.SimpleTemplateEngine;
import groovy.text.TemplateEngine;
import org.apache.airavata.common.exception.ApplicationSettingsException;
-import org.apache.airavata.common.utils.*;
+import org.apache.airavata.common.utils.AiravataUtils;
+import org.apache.airavata.common.utils.AiravataZKUtils;
+import org.apache.airavata.common.utils.ApplicationSettings;
+import org.apache.airavata.common.utils.DBUtil;
+import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.airavata.common.utils.ZkConstants;
import org.apache.airavata.credential.store.store.CredentialReader;
import org.apache.airavata.credential.store.store.impl.CredentialReaderImpl;
import org.apache.airavata.gfac.core.context.ProcessContext;
@@ -32,23 +36,54 @@ import org.apache.airavata.gfac.core.context.TaskContext;
import org.apache.airavata.messaging.core.MessageContext;
import org.apache.airavata.model.appcatalog.appdeployment.ApplicationDeploymentDescription;
import org.apache.airavata.model.appcatalog.appdeployment.CommandObject;
-import org.apache.airavata.model.appcatalog.computeresource.*;
+import org.apache.airavata.model.appcatalog.computeresource.CloudJobSubmission;
+import org.apache.airavata.model.appcatalog.computeresource.ComputeResourceDescription;
+import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionInterface;
+import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionProtocol;
+import org.apache.airavata.model.appcatalog.computeresource.LOCALSubmission;
+import org.apache.airavata.model.appcatalog.computeresource.MonitorMode;
+import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManager;
+import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManagerType;
+import org.apache.airavata.model.appcatalog.computeresource.SSHJobSubmission;
+import org.apache.airavata.model.appcatalog.computeresource.UnicoreJobSubmission;
import org.apache.airavata.model.appcatalog.gatewayprofile.ComputeResourcePreference;
import org.apache.airavata.model.application.io.DataType;
import org.apache.airavata.model.application.io.InputDataObjectType;
import org.apache.airavata.model.application.io.OutputDataObjectType;
import org.apache.airavata.model.commons.ErrorModel;
-import org.apache.airavata.model.data.replica.*;
+import org.apache.airavata.model.data.replica.DataProductModel;
+import org.apache.airavata.model.data.replica.DataProductType;
+import org.apache.airavata.model.data.replica.DataReplicaLocationModel;
+import org.apache.airavata.model.data.replica.ReplicaLocationCategory;
+import org.apache.airavata.model.data.replica.ReplicaPersistentType;
import org.apache.airavata.model.experiment.ExperimentModel;
import org.apache.airavata.model.job.JobModel;
-import org.apache.airavata.model.messaging.event.*;
+import org.apache.airavata.model.messaging.event.JobIdentifier;
+import org.apache.airavata.model.messaging.event.JobStatusChangeEvent;
+import org.apache.airavata.model.messaging.event.MessageType;
+import org.apache.airavata.model.messaging.event.ProcessIdentifier;
+import org.apache.airavata.model.messaging.event.ProcessStatusChangeEvent;
+import org.apache.airavata.model.messaging.event.TaskIdentifier;
+import org.apache.airavata.model.messaging.event.TaskStatusChangeEvent;
import org.apache.airavata.model.parallelism.ApplicationParallelismType;
import org.apache.airavata.model.process.ProcessModel;
import org.apache.airavata.model.scheduling.ComputationalResourceSchedulingModel;
-import org.apache.airavata.model.status.*;
+import org.apache.airavata.model.status.JobStatus;
+import org.apache.airavata.model.status.ProcessState;
+import org.apache.airavata.model.status.ProcessStatus;
+import org.apache.airavata.model.status.TaskState;
+import org.apache.airavata.model.status.TaskStatus;
import org.apache.airavata.model.task.JobSubmissionTaskModel;
import org.apache.airavata.registry.core.experiment.catalog.impl.RegistryFactory;
-import org.apache.airavata.registry.cpi.*;
+import org.apache.airavata.registry.cpi.AppCatalog;
+import org.apache.airavata.registry.cpi.AppCatalogException;
+import org.apache.airavata.registry.cpi.CompositeIdentifier;
+import org.apache.airavata.registry.cpi.ExpCatChildDataType;
+import org.apache.airavata.registry.cpi.ExperimentCatalog;
+import org.apache.airavata.registry.cpi.ExperimentCatalogModelType;
+import org.apache.airavata.registry.cpi.GwyResourceProfile;
+import org.apache.airavata.registry.cpi.RegistryException;
+import org.apache.airavata.registry.cpi.ReplicaCatalog;
import org.apache.airavata.registry.cpi.utils.Constants;
import org.apache.commons.io.FileUtils;
import org.apache.curator.framework.CuratorFramework;
@@ -65,15 +100,35 @@ import org.w3c.dom.Element;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;
-import javax.xml.xpath.*;
-import java.io.*;
+import javax.xml.xpath.XPath;
+import javax.xml.xpath.XPathConstants;
+import javax.xml.xpath.XPathExpression;
+import javax.xml.xpath.XPathExpressionException;
+import javax.xml.xpath.XPathFactory;
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.InputStream;
import java.net.InetAddress;
import java.net.URISyntaxException;
import java.net.URL;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.security.SecureRandom;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Calendar;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.TreeSet;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
@@ -374,6 +429,17 @@ public class GFacUtils {
}
}
+ public static CloudJobSubmission getCloudJobSubmission(String submissionId) throws RegistryException {
+ try {
+ AppCatalog appCatalog = RegistryFactory.getAppCatalog();
+ return appCatalog.getComputeResource().getCloudJobSubmission(submissionId);
+ } catch (Exception e) {
+ String errorMsg = "Error while retrieving SSH job submission with submission id : " + submissionId;
+ log.error(errorMsg, e);
+ throw new RegistryException(errorMsg, e);
+ }
+ }
+
/**
* To convert list to separated value
*
@@ -700,7 +766,7 @@ public class GFacUtils {
return null;
}
- private static int generateJobName() {
+ public static int generateJobName() {
Random random = new Random();
int i = random.nextInt(Integer.MAX_VALUE);
i = i + 99999999;
http://git-wip-us.apache.org/repos/asf/airavata/blob/21b1923d/modules/gfac/gfac-impl/pom.xml
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/pom.xml b/modules/gfac/gfac-impl/pom.xml
index 2a0a949..60e3259 100644
--- a/modules/gfac/gfac-impl/pom.xml
+++ b/modules/gfac/gfac-impl/pom.xml
@@ -122,5 +122,10 @@
<artifactId>commons-httpclient</artifactId>
<version>3.1</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.airavata</groupId>
+ <artifactId>aurora-client</artifactId>
+ <version>${project.version}</version>
+ </dependency>
</dependencies>
</project>
http://git-wip-us.apache.org/repos/asf/airavata/blob/21b1923d/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/AuroraUtils.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/AuroraUtils.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/AuroraUtils.java
new file mode 100644
index 0000000..4412694
--- /dev/null
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/AuroraUtils.java
@@ -0,0 +1,30 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.airavata.gfac.impl;
+
+public class AuroraUtils {
+
+ public static final String ENVIRONMENT = "devel";
+ public static final String ROLE = "centos";
+ public static final String AURORA_SCHEDULER_PROP_FILE = "aurora-scheduler.properties";
+ public static final String CLUSTER = "example";
+
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/21b1923d/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/AuroraJobSubmission.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/AuroraJobSubmission.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/AuroraJobSubmission.java
new file mode 100644
index 0000000..c2e12d5
--- /dev/null
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/AuroraJobSubmission.java
@@ -0,0 +1,146 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.airavata.gfac.impl.task;
+
+import org.apache.airavata.cloud.aurora.client.AuroraThriftClient;
+import org.apache.airavata.cloud.aurora.client.bean.IdentityBean;
+import org.apache.airavata.cloud.aurora.client.bean.JobConfigBean;
+import org.apache.airavata.cloud.aurora.client.bean.JobKeyBean;
+import org.apache.airavata.cloud.aurora.client.bean.ProcessBean;
+import org.apache.airavata.cloud.aurora.client.bean.ResourceBean;
+import org.apache.airavata.cloud.aurora.client.bean.ResponseBean;
+import org.apache.airavata.cloud.aurora.client.bean.TaskConfigBean;
+import org.apache.airavata.cloud.aurora.util.AuroraThriftClientUtil;
+import org.apache.airavata.cloud.aurora.util.Constants;
+import org.apache.airavata.common.utils.AiravataUtils;
+import org.apache.airavata.gfac.core.GFacException;
+import org.apache.airavata.gfac.core.GFacUtils;
+import org.apache.airavata.gfac.core.context.ProcessContext;
+import org.apache.airavata.gfac.core.context.TaskContext;
+import org.apache.airavata.gfac.core.task.JobSubmissionTask;
+import org.apache.airavata.gfac.core.task.TaskException;
+import org.apache.airavata.gfac.impl.AuroraUtils;
+import org.apache.airavata.model.commons.ErrorModel;
+import org.apache.airavata.model.job.JobModel;
+import org.apache.airavata.model.status.JobState;
+import org.apache.airavata.model.status.JobStatus;
+import org.apache.airavata.model.status.TaskState;
+import org.apache.airavata.model.status.TaskStatus;
+import org.apache.airavata.model.task.TaskTypes;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Job submission task that creates a job on an Aurora scheduler through {@link AuroraThriftClient}.
+ *
+ * <p>NOTE(review): the submitted processes are hard-coded for the AutoDock Vina example (fixed source
+ * directory and script name); only the working directory is taken from the task context.
+ * {@link #cancel}, {@link #recover} and {@link #getType} are stubs that return {@code null}.</p>
+ */
+public class AuroraJobSubmission implements JobSubmissionTask {
+
+    private static final Logger log = LoggerFactory.getLogger(AuroraJobSubmission.class);
+
+    /** Not implemented; always returns {@code null}. */
+    @Override
+    public JobStatus cancel(TaskContext taskcontext) throws TaskException {
+        return null;
+    }
+
+    /** No initialisation is performed; the property map is ignored. */
+    @Override
+    public void init(Map<String, String> propertyMap) throws TaskException {
+
+    }
+
+    /**
+     * Builds the Aurora job configuration, submits it, and records the job model and job status.
+     *
+     * @param taskContext context of the task being executed; its parent process context supplies the job model
+     * @return a status in state COMPLETED when submission and bookkeeping succeeded, FAILED otherwise
+     */
+    @Override
+    public TaskStatus execute(TaskContext taskContext) {
+        // Starts as COMPLETED and is switched to FAILED in the catch block below.
+        TaskStatus taskStatus = new TaskStatus(TaskState.COMPLETED);
+        ProcessContext processContext = taskContext.getParentProcessContext();
+        JobModel jobModel = processContext.getJobModel();
+        jobModel.setTaskId(taskContext.getTaskId());
+        // The generated value is used both as the Aurora job name and as the Airavata job id.
+        String jobIdAndName = "A" + GFacUtils.generateJobName();
+        jobModel.setJobName(jobIdAndName);
+        JobStatus jobStatus = new JobStatus();
+        jobStatus.setJobState(JobState.SUBMITTED);
+
+        try {
+            JobKeyBean jobKey = new JobKeyBean(AuroraUtils.ENVIRONMENT, AuroraUtils.ROLE, jobIdAndName);
+            IdentityBean owner = new IdentityBean(AuroraUtils.ROLE);
+            // only autodoc vina
+            String workingDir = taskContext.getWorkingDir();
+            // Every process gets its own name; previously the first two were both called "process_1".
+            ProcessBean proc1 = new ProcessBean("process_1", "mkdir -p " + workingDir, false);
+            ProcessBean proc2 = new ProcessBean("process_2", "cp -rf /home/centos/efs-mount-point/autodock-vina/* " + workingDir, false);
+            ProcessBean proc3 = new ProcessBean("process_3", "cd " + workingDir + " && ./vina_screenA.sh", false);
+            // LinkedHashSet keeps the insertion order mkdir -> cp -> run.
+            Set<ProcessBean> processes = new LinkedHashSet<>();
+            processes.add(proc1);
+            processes.add(proc2);
+            processes.add(proc3);
+
+            ResourceBean resources = new ResourceBean(1.5, 512, 512);
+
+            TaskConfigBean taskConfig = new TaskConfigBean("Airavata-Aurora-" + jobIdAndName, processes, resources);
+            JobConfigBean jobConfig = new JobConfigBean(jobKey, owner, taskConfig, AuroraUtils.CLUSTER);
+
+            // The executor config JSON is generated for logging only.
+            String executorConfigJson = AuroraThriftClientUtil.getExecutorConfigJson(jobConfig);
+            log.info("Executor Config for Job {} , {}", jobIdAndName, executorConfigJson);
+
+            AuroraThriftClient client = AuroraThriftClient.getAuroraThriftClient(Constants.AURORA_SCHEDULER_PROP_FILE);
+            ResponseBean response = client.createJob(jobConfig);
+            log.info("Response for job {}, {}", jobIdAndName, response);
+
+            jobModel.setJobId(jobIdAndName);
+            jobStatus.setReason("Successfully Submitted");
+            jobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+            jobModel.setJobStatuses(Arrays.asList(jobStatus));
+            processContext.setJobModel(jobModel);
+
+            GFacUtils.saveJobModel(processContext, jobModel);
+            GFacUtils.saveJobStatus(processContext, jobModel);
+            taskStatus.setReason("Successfully submitted job to Aurora");
+        } catch (Exception e) {
+            String msg = "Error occurred while submitting the job";
+            log.error(msg, e);
+            taskStatus.setState(TaskState.FAILED);
+            taskStatus.setReason(msg);
+            taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+            ErrorModel errorModel = new ErrorModel();
+            errorModel.setActualErrorMessage(e.getMessage());
+            errorModel.setUserFriendlyMessage(msg);
+            taskContext.getTaskModel().setTaskErrors(Arrays.asList(errorModel));
+        }
+
+        // The task status is saved and published on both the success and the failure path.
+        taskContext.setTaskStatus(taskStatus);
+        try {
+            GFacUtils.saveAndPublishTaskStatus(taskContext);
+        } catch (GFacException e) {
+            log.error("Error while saving task status", e);
+        }
+        return taskStatus;
+    }
+
+    /** Not implemented; always returns {@code null}. */
+    @Override
+    public TaskStatus recover(TaskContext taskContext) {
+        return null;
+    }
+
+    /**
+     * Not implemented; always returns {@code null}.
+     * NOTE(review): callers that switch on the task type will not recognise this task - confirm whether
+     * a concrete {@link TaskTypes} value should be returned here.
+     */
+    @Override
+    public TaskTypes getType() {
+        return null;
+    }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/21b1923d/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/cloud/AuroraJobMonitor.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/cloud/AuroraJobMonitor.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/cloud/AuroraJobMonitor.java
new file mode 100644
index 0000000..5fe9dd8
--- /dev/null
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/cloud/AuroraJobMonitor.java
@@ -0,0 +1,247 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.airavata.gfac.monitor.cloud;
+
+import org.apache.airavata.cloud.aurora.client.AuroraThriftClient;
+import org.apache.airavata.cloud.aurora.client.bean.JobDetailsResponseBean;
+import org.apache.airavata.cloud.aurora.client.bean.JobKeyBean;
+import org.apache.airavata.cloud.aurora.client.sdk.ScheduledTask;
+import org.apache.airavata.common.utils.AiravataUtils;
+import org.apache.airavata.gfac.core.GFacException;
+import org.apache.airavata.gfac.core.GFacThreadPoolExecutor;
+import org.apache.airavata.gfac.core.GFacUtils;
+import org.apache.airavata.gfac.core.context.ProcessContext;
+import org.apache.airavata.gfac.core.context.TaskContext;
+import org.apache.airavata.gfac.core.monitor.JobMonitor;
+import org.apache.airavata.gfac.impl.AuroraUtils;
+import org.apache.airavata.gfac.impl.GFacWorker;
+import org.apache.airavata.model.job.JobModel;
+import org.apache.airavata.model.status.JobState;
+import org.apache.airavata.model.status.JobStatus;
+import org.apache.airavata.model.status.ProcessState;
+import org.apache.airavata.model.status.ProcessStatus;
+import org.apache.airavata.model.status.TaskState;
+import org.apache.airavata.model.status.TaskStatus;
+import org.apache.airavata.registry.cpi.ExperimentCatalog;
+import org.apache.airavata.registry.cpi.ExperimentCatalogModelType;
+import org.apache.airavata.registry.cpi.RegistryException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public class AuroraJobMonitor implements JobMonitor, Runnable {
+ private static final Logger log = LoggerFactory.getLogger(AuroraJobMonitor.class);
+
+
+
+ private static AuroraJobMonitor auroraJobMonitor;
+ private Timer timer;
+ private Map<String,TaskContext> jobMonitoringMap;
+ private AuroraJobMonitor(){
+ jobMonitoringMap = new ConcurrentHashMap<>();
+ timer = new Timer("Aurora status poll timer");
+
+ }
+
+ public static AuroraJobMonitor getInstance(){
+ if (auroraJobMonitor == null) {
+ synchronized (AuroraJobMonitor.class){
+ if (auroraJobMonitor == null) {
+ auroraJobMonitor = new AuroraJobMonitor();
+ }
+ }
+ }
+ return auroraJobMonitor;
+ }
+ @Override
+ public void run() {
+ AuroraTimer task = null;
+ try {
+ task = new AuroraTimer();
+ timer.schedule(task, 5000, 2000);
+ } catch (Exception e) {
+ log.error("Error couldn't run Aurora status poll timer task");
+ }
+ }
+
+ @Override
+ public void monitor(String jobId, TaskContext taskContext) {
+ jobMonitoringMap.put(jobId, taskContext);
+ log.info("Added JobId : {} to Aurora Job Monitoring map", jobId);
+ taskContext.getParentProcessContext().setPauseTaskExecution(true);
+
+ }
+
+ @Override
+ public void stopMonitor(String jobId, boolean runOutFlow) {
+ jobMonitoringMap.remove(jobId);
+ }
+
+ @Override
+ public boolean isMonitoring(String jobId) {
+ return jobMonitoringMap.get(jobId) != null;
+ }
+
+ @Override
+ public void canceledJob(String jobId) {
+ throw new IllegalStateException("Method not yet implemented");
+ }
+
+ class AuroraTimer extends TimerTask {
+
+ AuroraThriftClient client;
+ public AuroraTimer() throws Exception {
+ client = AuroraThriftClient.getAuroraThriftClient(AuroraUtils.AURORA_SCHEDULER_PROP_FILE);
+
+ }
+
+
+ @Override
+
+ public void run() {
+ JobKeyBean jobKeyBean = new JobKeyBean(AuroraUtils.ENVIRONMENT, AuroraUtils.ROLE, "dummy");
+ Iterator<Map.Entry<String, TaskContext>> iterator = jobMonitoringMap.entrySet().iterator();
+ while (iterator.hasNext()) {
+ Map.Entry<String, TaskContext> currentEntry = iterator.next();
+ try {
+ jobKeyBean.setName(currentEntry.getKey());
+ JobDetailsResponseBean jobDetailsResponseBean = client.getJobDetails(jobKeyBean);
+ List<ScheduledTask> tasks = jobDetailsResponseBean.getTasks();
+ switch (tasks.get(0).getStatus()) {
+ case FINISHED:
+ iterator.remove();
+ processJob(currentEntry.getKey(), currentEntry.getValue(), JobState.COMPLETE);
+ break;
+ case FAILED:
+ iterator.remove();
+ processJob(currentEntry.getKey(), currentEntry.getValue(), JobState.FAILED);
+ break;
+ case RUNNING:
+ updateStatus(currentEntry.getKey(), currentEntry.getValue(), JobState.ACTIVE);
+ break;
+ default:
+ log.info("Job {} is in {} state", currentEntry.getKey(), tasks.get(0).getStatus().name());
+ break;
+ }
+ } catch (Exception e) {
+ log.error("Error while getting response for job : {}", currentEntry.getKey());
+
+ }
+ }
+ }
+
+ private void updateStatus(String jobKey, TaskContext taskContext, JobState jobState) {
+ ProcessContext pc = taskContext.getParentProcessContext();
+ ExperimentCatalog experimentCatalog = pc.getExperimentCatalog();
+ List<Object> objects = null;
+ try {
+ objects = experimentCatalog.get(ExperimentCatalogModelType.JOB_STATUS, taskContext.getTaskId(), jobKey);
+ } catch (RegistryException e) {
+ log.error("Error while getting job statuses for job : {} , task : {}, process : {}", jobKey,
+ taskContext.getTaskId(), pc.getProcessId());
+ }
+ List<JobState> jobStatuses = objects.stream()
+ .map(o -> ((JobStatus) o).getJobState())
+ .collect(Collectors.toList());
+ if (!jobStatuses.contains(jobState)) {
+ JobStatus jobStatus = new JobStatus(jobState);
+ jobStatus.setReason("Aurora return " + jobState.name());
+ jobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+ JobModel jobModel = pc.getJobModel();
+ jobModel.setJobStatuses(Arrays.asList(jobStatus));
+ try {
+ GFacUtils.saveJobStatus(pc, jobModel);
+ } catch (GFacException e) {
+ log.error("Error while saving job status {}, job : {}, task :{}, process:{} exp:{}",
+ jobState.name(), jobKey, taskContext.getTaskId(), pc.getProcessId(), pc.getExperimentId());
+ }
+ }
+ }
+
+ private void processJob(String jobKey, TaskContext taskContext, JobState jobState) {
+ JobStatus jobStatus = new JobStatus();
+ jobStatus.setJobState(jobState);
+ if (jobState == JobState.COMPLETE) {
+ jobStatus.setReason("Aurora Job completed");
+ } else if (jobState == JobState.FAILED) {
+ jobStatus.setReason("Aurora Job Failed");
+ }
+ ProcessContext pc = taskContext.getParentProcessContext();
+ JobModel jobModel = pc.getJobModel();
+ jobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+ jobModel.setJobStatuses(Arrays.asList(jobStatus));
+ try {
+ GFacUtils.saveJobStatus(pc, jobModel);
+ } catch (GFacException e) {
+ log.error("Error while saving job status for job : {} ", jobKey);
+ }
+
+ TaskStatus taskStatus = new TaskStatus(TaskState.COMPLETED);
+ taskStatus.setReason("Job monitoring completed with final state: " + TaskState.COMPLETED.name());
+ taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+ taskContext.setTaskStatus(taskStatus);
+ try {
+ GFacUtils.saveAndPublishTaskStatus(taskContext);
+ } catch (GFacException e) {
+ log.error("Error while saving task status for exp : {} , process : {} , task : {} , job : {}",
+ taskContext.getExperimentId(), taskContext.getProcessId(), taskContext.getTaskId(), jobKey);
+ }
+
+ if (pc.isCancel()) {
+ ProcessStatus processStatus = new ProcessStatus(ProcessState.CANCELLING);
+ processStatus.setReason("Process has been cancelled");
+ pc.setProcessStatus(processStatus);
+ try {
+ GFacUtils.saveAndPublishProcessStatus(pc);
+ } catch (GFacException e) {
+ log.error("Error while cancelling process, exp : {}, process : {}", pc.getExperimentId(), pc.getProcessId());
+ }
+ }
+
+ try {
+ GFacThreadPoolExecutor.getCachedThreadPool().execute(new GFacWorker(pc));
+ } catch (GFacException e) {
+ log.error("Error while running output tasks for exp : {} , process : {}", taskContext.getExperimentId(), pc.getProcessId());
+
+ ProcessStatus processStatus = new ProcessStatus(ProcessState.FAILED);
+ processStatus.setReason("Failed to run output tasks");
+ processStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+ pc.setProcessStatus(processStatus);
+ try {
+ GFacUtils.saveAndPublishProcessStatus(pc);
+ } catch (GFacException ex) {
+ log.error("Error while updating process status to FAILED, exp : {}, process : {}", pc.getExperimentId(), pc.getProcessId());
+ }
+ }
+ }
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/21b1923d/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/utils/OrchestratorUtils.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/utils/OrchestratorUtils.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/utils/OrchestratorUtils.java
index 74bd2db..83c9273 100644
--- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/utils/OrchestratorUtils.java
+++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/utils/OrchestratorUtils.java
@@ -262,6 +262,11 @@ public class OrchestratorUtils {
if (sshJobSubmission != null) {
return sshJobSubmission.getSecurityProtocol();
}
+ } else if (submissionProtocol == JobSubmissionProtocol.CLOUD) {
+ CloudJobSubmission cloudJobSubmission = getCloudJobSubmission(context, jobSubmissionInterface.getJobSubmissionInterfaceId());
+ if (cloudJobSubmission != null) {
+ return cloudJobSubmission.getSecurityProtocol();
+ }
}
} catch (RegistryException e) {
logger.error("Error occurred while retrieving security protocol", e);
@@ -302,6 +307,17 @@ public class OrchestratorUtils {
}
}
+ public static CloudJobSubmission getCloudJobSubmission(OrchestratorContext context, String submissionId) throws RegistryException {
+ try {
+ AppCatalog appCatalog = context.getRegistry().getAppCatalog();
+ return appCatalog.getComputeResource().getCloudJobSubmission(submissionId);
+ } catch (Exception e) {
+ String errorMsg = "Error while retrieving SSH job submission with submission id : " + submissionId;
+ logger.error(errorMsg, e);
+ throw new RegistryException(errorMsg, e);
+ }
+ }
+
public static SCPDataMovement getSCPDataMovement(OrchestratorContext context, String dataMoveId) throws RegistryException {
try {
AppCatalog appCatalog = context.getRegistry().getAppCatalog();
http://git-wip-us.apache.org/repos/asf/airavata/blob/21b1923d/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
index dec9b2c..b97e79a 100644
--- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
+++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
@@ -481,6 +481,8 @@ public class SimpleOrchestratorImpl extends AbstractOrchestrator{
monitorMode = MonitorMode.FORK;
} else if(jobSubmissionProtocol == JobSubmissionProtocol.LOCAL){
monitorMode = MonitorMode.LOCAL;
+ } else if (jobSubmissionProtocol == JobSubmissionProtocol.CLOUD) {
+ monitorMode = MonitorMode.CLOUD_JOB_MONITOR;
}else {
logger.error("expId : {}, processId : {} :- Unsupported Job submission protocol {}.",
processModel.getExperimentId(), processModel.getProcessId(), jobSubmissionProtocol.name());
@@ -507,7 +509,8 @@ public class SimpleOrchestratorImpl extends AbstractOrchestrator{
submissionTaskIds.add(taskModel.getTaskId());
// create monitor task for this Email based monitor mode job
- if (monitorMode == MonitorMode.JOB_EMAIL_NOTIFICATION_MONITOR) {
+ if (monitorMode == MonitorMode.JOB_EMAIL_NOTIFICATION_MONITOR
+ || monitorMode == MonitorMode.CLOUD_JOB_MONITOR) {
TaskModel monitorTaskModel = new TaskModel();
monitorTaskModel.setParentProcessId(processModel.getProcessId());
monitorTaskModel.setCreationTime(new Date().getTime());
http://git-wip-us.apache.org/repos/asf/airavata/blob/21b1923d/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 71fc9ba..81b258f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -626,7 +626,7 @@
<module>modules/orchestrator</module>
<module>modules/monitoring</module>
<module>modules/user-profile</module>
- <!--<module>modules/cloud</module>-->
+ <module>modules/cloud</module>
<module>modules/server</module>
<module>modules/workflow</module>
<module>modules/test-suite</module>