You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sh...@apache.org on 2016/11/01 16:12:27 UTC

[4/6] airavata git commit: Added job submission task and aurora monitoring services

Added job submission task and aurora monitoring services


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/21b1923d
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/21b1923d
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/21b1923d

Branch: refs/heads/develop
Commit: 21b1923da2184d6df71a855caf9aea80ce33b300
Parents: 325d163
Author: Shameera Rathnayaka <sh...@gmail.com>
Authored: Mon Oct 31 19:36:17 2016 -0400
Committer: Shameera Rathnayaka <sh...@gmail.com>
Committed: Mon Oct 31 19:36:17 2016 -0400

----------------------------------------------------------------------
 modules/cloud/aurora-client/pom.xml             |   8 +
 .../cloud/aurora/client/AuroraThriftClient.java |   2 +-
 .../main/resources/aurora-scheduler.properties  |   2 +-
 modules/cloud/cloud-provisioning/pom.xml        |   7 +
 .../apache/airavata/gfac/core/GFacUtils.java    |  88 ++++++-
 modules/gfac/gfac-impl/pom.xml                  |   5 +
 .../apache/airavata/gfac/impl/AuroraUtils.java  |  30 +++
 .../gfac/impl/task/AuroraJobSubmission.java     | 146 +++++++++++
 .../gfac/monitor/cloud/AuroraJobMonitor.java    | 247 +++++++++++++++++++
 .../core/utils/OrchestratorUtils.java           |  16 ++
 .../cpi/impl/SimpleOrchestratorImpl.java        |   5 +-
 pom.xml                                         |   2 +-
 12 files changed, 543 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/21b1923d/modules/cloud/aurora-client/pom.xml
----------------------------------------------------------------------
diff --git a/modules/cloud/aurora-client/pom.xml b/modules/cloud/aurora-client/pom.xml
index 4cac7d9..8188c49 100644
--- a/modules/cloud/aurora-client/pom.xml
+++ b/modules/cloud/aurora-client/pom.xml
@@ -70,7 +70,15 @@
 				<directory>src/test/resources</directory>
 			</testResource>
 		</testResources>
+
 		<plugins>
+			<plugin>
+				<groupId>com.mycila</groupId>
+				<artifactId>license-maven-plugin</artifactId>
+				<configuration>
+					<header>../../../apache-license-header.txt</header>
+				</configuration>
+			</plugin>
 		</plugins>
 	</build>
 </project>

http://git-wip-us.apache.org/repos/asf/airavata/blob/21b1923d/modules/cloud/aurora-client/src/main/java/org/apache/airavata/cloud/aurora/client/AuroraThriftClient.java
----------------------------------------------------------------------
diff --git a/modules/cloud/aurora-client/src/main/java/org/apache/airavata/cloud/aurora/client/AuroraThriftClient.java b/modules/cloud/aurora-client/src/main/java/org/apache/airavata/cloud/aurora/client/AuroraThriftClient.java
index 3fb2468..0e0b36e 100644
--- a/modules/cloud/aurora-client/src/main/java/org/apache/airavata/cloud/aurora/client/AuroraThriftClient.java
+++ b/modules/cloud/aurora-client/src/main/java/org/apache/airavata/cloud/aurora/client/AuroraThriftClient.java
@@ -193,7 +193,7 @@ public class AuroraThriftClient {
 	 * @return the job details
 	 * @throws Exception the exception
 	 */
-	public ResponseBean getJobDetails(JobKeyBean jobKeyBean) throws Exception {
+	public JobDetailsResponseBean getJobDetails(JobKeyBean jobKeyBean) throws Exception {
 		JobDetailsResponseBean response = null;
 		try {
 			if(jobKeyBean != null) {

http://git-wip-us.apache.org/repos/asf/airavata/blob/21b1923d/modules/cloud/aurora-client/src/main/resources/aurora-scheduler.properties
----------------------------------------------------------------------
diff --git a/modules/cloud/aurora-client/src/main/resources/aurora-scheduler.properties b/modules/cloud/aurora-client/src/main/resources/aurora-scheduler.properties
index 6cb1fe9..0e1cc95 100644
--- a/modules/cloud/aurora-client/src/main/resources/aurora-scheduler.properties
+++ b/modules/cloud/aurora-client/src/main/resources/aurora-scheduler.properties
@@ -1,6 +1,6 @@
 #Aurora scheduler properties
 
-aurora.scheduler.host=mesos-master-1
+aurora.scheduler.host=52.15.129.208
 
 aurora.scheduler.port=8081
 

http://git-wip-us.apache.org/repos/asf/airavata/blob/21b1923d/modules/cloud/cloud-provisioning/pom.xml
----------------------------------------------------------------------
diff --git a/modules/cloud/cloud-provisioning/pom.xml b/modules/cloud/cloud-provisioning/pom.xml
index c08cfb9..7981a10 100644
--- a/modules/cloud/cloud-provisioning/pom.xml
+++ b/modules/cloud/cloud-provisioning/pom.xml
@@ -76,6 +76,13 @@
 			</testResource>
 		</testResources>
 		<plugins>
+			<plugin>
+				<groupId>com.mycila</groupId>
+				<artifactId>license-maven-plugin</artifactId>
+				<configuration>
+					<header>../../../apache-license-header.txt</header>
+				</configuration>
+			</plugin>
 		</plugins>
 	</build>
 </project>

http://git-wip-us.apache.org/repos/asf/airavata/blob/21b1923d/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java
index b69764e..66998c3 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java
@@ -21,10 +21,14 @@ package org.apache.airavata.gfac.core;
 
 import groovy.lang.Writable;
 import groovy.text.GStringTemplateEngine;
-import groovy.text.SimpleTemplateEngine;
 import groovy.text.TemplateEngine;
 import org.apache.airavata.common.exception.ApplicationSettingsException;
-import org.apache.airavata.common.utils.*;
+import org.apache.airavata.common.utils.AiravataUtils;
+import org.apache.airavata.common.utils.AiravataZKUtils;
+import org.apache.airavata.common.utils.ApplicationSettings;
+import org.apache.airavata.common.utils.DBUtil;
+import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.airavata.common.utils.ZkConstants;
 import org.apache.airavata.credential.store.store.CredentialReader;
 import org.apache.airavata.credential.store.store.impl.CredentialReaderImpl;
 import org.apache.airavata.gfac.core.context.ProcessContext;
@@ -32,23 +36,54 @@ import org.apache.airavata.gfac.core.context.TaskContext;
 import org.apache.airavata.messaging.core.MessageContext;
 import org.apache.airavata.model.appcatalog.appdeployment.ApplicationDeploymentDescription;
 import org.apache.airavata.model.appcatalog.appdeployment.CommandObject;
-import org.apache.airavata.model.appcatalog.computeresource.*;
+import org.apache.airavata.model.appcatalog.computeresource.CloudJobSubmission;
+import org.apache.airavata.model.appcatalog.computeresource.ComputeResourceDescription;
+import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionInterface;
+import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionProtocol;
+import org.apache.airavata.model.appcatalog.computeresource.LOCALSubmission;
+import org.apache.airavata.model.appcatalog.computeresource.MonitorMode;
+import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManager;
+import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManagerType;
+import org.apache.airavata.model.appcatalog.computeresource.SSHJobSubmission;
+import org.apache.airavata.model.appcatalog.computeresource.UnicoreJobSubmission;
 import org.apache.airavata.model.appcatalog.gatewayprofile.ComputeResourcePreference;
 import org.apache.airavata.model.application.io.DataType;
 import org.apache.airavata.model.application.io.InputDataObjectType;
 import org.apache.airavata.model.application.io.OutputDataObjectType;
 import org.apache.airavata.model.commons.ErrorModel;
-import org.apache.airavata.model.data.replica.*;
+import org.apache.airavata.model.data.replica.DataProductModel;
+import org.apache.airavata.model.data.replica.DataProductType;
+import org.apache.airavata.model.data.replica.DataReplicaLocationModel;
+import org.apache.airavata.model.data.replica.ReplicaLocationCategory;
+import org.apache.airavata.model.data.replica.ReplicaPersistentType;
 import org.apache.airavata.model.experiment.ExperimentModel;
 import org.apache.airavata.model.job.JobModel;
-import org.apache.airavata.model.messaging.event.*;
+import org.apache.airavata.model.messaging.event.JobIdentifier;
+import org.apache.airavata.model.messaging.event.JobStatusChangeEvent;
+import org.apache.airavata.model.messaging.event.MessageType;
+import org.apache.airavata.model.messaging.event.ProcessIdentifier;
+import org.apache.airavata.model.messaging.event.ProcessStatusChangeEvent;
+import org.apache.airavata.model.messaging.event.TaskIdentifier;
+import org.apache.airavata.model.messaging.event.TaskStatusChangeEvent;
 import org.apache.airavata.model.parallelism.ApplicationParallelismType;
 import org.apache.airavata.model.process.ProcessModel;
 import org.apache.airavata.model.scheduling.ComputationalResourceSchedulingModel;
-import org.apache.airavata.model.status.*;
+import org.apache.airavata.model.status.JobStatus;
+import org.apache.airavata.model.status.ProcessState;
+import org.apache.airavata.model.status.ProcessStatus;
+import org.apache.airavata.model.status.TaskState;
+import org.apache.airavata.model.status.TaskStatus;
 import org.apache.airavata.model.task.JobSubmissionTaskModel;
 import org.apache.airavata.registry.core.experiment.catalog.impl.RegistryFactory;
-import org.apache.airavata.registry.cpi.*;
+import org.apache.airavata.registry.cpi.AppCatalog;
+import org.apache.airavata.registry.cpi.AppCatalogException;
+import org.apache.airavata.registry.cpi.CompositeIdentifier;
+import org.apache.airavata.registry.cpi.ExpCatChildDataType;
+import org.apache.airavata.registry.cpi.ExperimentCatalog;
+import org.apache.airavata.registry.cpi.ExperimentCatalogModelType;
+import org.apache.airavata.registry.cpi.GwyResourceProfile;
+import org.apache.airavata.registry.cpi.RegistryException;
+import org.apache.airavata.registry.cpi.ReplicaCatalog;
 import org.apache.airavata.registry.cpi.utils.Constants;
 import org.apache.commons.io.FileUtils;
 import org.apache.curator.framework.CuratorFramework;
@@ -65,15 +100,35 @@ import org.w3c.dom.Element;
 import org.w3c.dom.Node;
 import org.w3c.dom.NodeList;
 
-import javax.xml.xpath.*;
-import java.io.*;
+import javax.xml.xpath.XPath;
+import javax.xml.xpath.XPathConstants;
+import javax.xml.xpath.XPathExpression;
+import javax.xml.xpath.XPathExpressionException;
+import javax.xml.xpath.XPathFactory;
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.InputStream;
 import java.net.InetAddress;
 import java.net.URISyntaxException;
 import java.net.URL;
 import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
 import java.security.SecureRandom;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Calendar;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.TreeSet;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
@@ -374,6 +429,17 @@ public class GFacUtils {
         }
     }
 
+    /**
+     * Looks up a cloud job submission in the app catalog obtained from {@link RegistryFactory}.
+     *
+     * @param submissionId id handed to the compute resource's {@code getCloudJobSubmission} lookup
+     * @return the value returned by the app catalog for that id
+     * @throws RegistryException wrapping any exception raised while accessing the catalog
+     */
+    public static CloudJobSubmission getCloudJobSubmission(String submissionId) throws RegistryException {
+        try {
+            AppCatalog appCatalog = RegistryFactory.getAppCatalog();
+            return appCatalog.getComputeResource().getCloudJobSubmission(submissionId);
+        } catch (Exception e) {
+            // The message names the cloud lookup explicitly so it is not confused with the SSH variant.
+            String errorMsg = "Error while retrieving Cloud job submission with submission id : " + submissionId;
+            log.error(errorMsg, e);
+            throw new RegistryException(errorMsg, e);
+        }
+    }
+
     /**
      * To convert list to separated value
      *
@@ -700,7 +766,7 @@ public class GFacUtils {
         return null;
     }
 
-    private static int generateJobName() {
+    public static int generateJobName() {
         Random random = new Random();
         int i = random.nextInt(Integer.MAX_VALUE);
         i = i + 99999999;

http://git-wip-us.apache.org/repos/asf/airavata/blob/21b1923d/modules/gfac/gfac-impl/pom.xml
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/pom.xml b/modules/gfac/gfac-impl/pom.xml
index 2a0a949..60e3259 100644
--- a/modules/gfac/gfac-impl/pom.xml
+++ b/modules/gfac/gfac-impl/pom.xml
@@ -122,5 +122,10 @@
             <artifactId>commons-httpclient</artifactId>
             <version>3.1</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.airavata</groupId>
+            <artifactId>aurora-client</artifactId>
+            <version>${project.version}</version>
+        </dependency>
     </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/airavata/blob/21b1923d/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/AuroraUtils.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/AuroraUtils.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/AuroraUtils.java
new file mode 100644
index 0000000..4412694
--- /dev/null
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/AuroraUtils.java
@@ -0,0 +1,30 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.airavata.gfac.impl;
+
+/**
+ * Constants used by the Aurora job submission task and the Aurora job monitor.
+ *
+ * <p>This class only holds constants, so it is final and cannot be instantiated.
+ */
+public final class AuroraUtils {
+
+    /** First argument used when building a {@code JobKeyBean} for an Aurora job. */
+    public static final String ENVIRONMENT = "devel";
+    /** Second argument of the {@code JobKeyBean}; also used to build the owner {@code IdentityBean}. */
+    public static final String ROLE = "centos";
+    /** Name of the properties file handed to {@code AuroraThriftClient.getAuroraThriftClient}. */
+    public static final String AURORA_SCHEDULER_PROP_FILE = "aurora-scheduler.properties";
+    /** Cluster value passed into the {@code JobConfigBean}. */
+    public static final String CLUSTER = "example";
+
+    private AuroraUtils() {
+        // Constants holder; not meant to be instantiated.
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/21b1923d/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/AuroraJobSubmission.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/AuroraJobSubmission.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/AuroraJobSubmission.java
new file mode 100644
index 0000000..c2e12d5
--- /dev/null
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/AuroraJobSubmission.java
@@ -0,0 +1,146 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.airavata.gfac.impl.task;
+
+import org.apache.airavata.cloud.aurora.client.AuroraThriftClient;
+import org.apache.airavata.cloud.aurora.client.bean.IdentityBean;
+import org.apache.airavata.cloud.aurora.client.bean.JobConfigBean;
+import org.apache.airavata.cloud.aurora.client.bean.JobKeyBean;
+import org.apache.airavata.cloud.aurora.client.bean.ProcessBean;
+import org.apache.airavata.cloud.aurora.client.bean.ResourceBean;
+import org.apache.airavata.cloud.aurora.client.bean.ResponseBean;
+import org.apache.airavata.cloud.aurora.client.bean.TaskConfigBean;
+import org.apache.airavata.cloud.aurora.util.AuroraThriftClientUtil;
+import org.apache.airavata.cloud.aurora.util.Constants;
+import org.apache.airavata.common.utils.AiravataUtils;
+import org.apache.airavata.gfac.core.GFacException;
+import org.apache.airavata.gfac.core.GFacUtils;
+import org.apache.airavata.gfac.core.context.ProcessContext;
+import org.apache.airavata.gfac.core.context.TaskContext;
+import org.apache.airavata.gfac.core.task.JobSubmissionTask;
+import org.apache.airavata.gfac.core.task.TaskException;
+import org.apache.airavata.gfac.impl.AuroraUtils;
+import org.apache.airavata.model.commons.ErrorModel;
+import org.apache.airavata.model.job.JobModel;
+import org.apache.airavata.model.status.JobState;
+import org.apache.airavata.model.status.JobStatus;
+import org.apache.airavata.model.status.TaskState;
+import org.apache.airavata.model.status.TaskStatus;
+import org.apache.airavata.model.task.TaskTypes;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * {@link JobSubmissionTask} that creates a job on an Aurora scheduler through {@link AuroraThriftClient}.
+ *
+ * <p>The Aurora task definition built in {@link #execute(TaskContext)} is hard-coded: it creates the task
+ * working directory, copies the AutoDock Vina files from a fixed path into it and runs
+ * {@code vina_screenA.sh}. {@link #cancel(TaskContext)}, {@link #recover(TaskContext)} and
+ * {@link #getType()} are not implemented yet and return {@code null}.
+ */
+public class AuroraJobSubmission implements JobSubmissionTask {
+
+    private static final Logger log = LoggerFactory.getLogger(AuroraJobSubmission.class);
+
+    /**
+     * Not implemented.
+     *
+     * @param taskcontext ignored
+     * @return always {@code null}
+     */
+    @Override
+    public JobStatus cancel(TaskContext taskcontext) throws TaskException {
+        return null;
+    }
+
+    /**
+     * No-op; this task reads no configuration properties.
+     *
+     * @param propertyMap ignored
+     */
+    @Override
+    public void init(Map<String, String> propertyMap) throws TaskException {
+
+    }
+
+    /**
+     * Builds the Aurora job configuration, submits it and records the outcome.
+     *
+     * <p>On success the job model receives the generated job id and a {@code SUBMITTED} status, both of
+     * which are saved. Any exception raised while building or submitting the job is caught: the task is
+     * marked {@code FAILED} and an {@link ErrorModel} is attached to the task model. In both cases the
+     * task status is saved and published before it is returned.
+     *
+     * @param taskContext context of the task being executed; its parent process context supplies the job model
+     * @return {@code COMPLETED} when the job was submitted, {@code FAILED} otherwise
+     */
+    @Override
+    public TaskStatus execute(TaskContext taskContext) {
+        // Start optimistic; the catch block below flips this to FAILED.
+        TaskStatus taskStatus = new TaskStatus(TaskState.COMPLETED);
+        ProcessContext processContext = taskContext.getParentProcessContext();
+        JobModel jobModel = processContext.getJobModel();
+        jobModel.setTaskId(taskContext.getTaskId());
+        // The generated value is used both as the Aurora job name and as the Airavata job id.
+        String jobIdAndName = "A" + GFacUtils.generateJobName();
+        jobModel.setJobName(jobIdAndName);
+        JobStatus jobStatus = new JobStatus();
+        jobStatus.setJobState(JobState.SUBMITTED);
+
+        try {
+            JobKeyBean jobKey = new JobKeyBean(AuroraUtils.ENVIRONMENT, AuroraUtils.ROLE, jobIdAndName);
+            IdentityBean owner = new IdentityBean(AuroraUtils.ROLE);
+            // Only the AutoDock Vina workflow is wired up for now.
+            String workingDir = taskContext.getWorkingDir();
+            // Every step gets a distinct process name so no two steps share an identifier.
+            ProcessBean proc1 = new ProcessBean("process_1", "mkdir -p " + workingDir, false);
+            ProcessBean proc2 = new ProcessBean("process_2", "cp -rf /home/centos/efs-mount-point/autodock-vina/* " + workingDir , false);
+            ProcessBean proc3 = new ProcessBean("process_3", "cd " + workingDir + " && ./vina_screenA.sh", false);
+            // LinkedHashSet keeps the steps in the order they were added.
+            Set<ProcessBean> processes = new LinkedHashSet<>();
+            processes.add(proc1);
+            processes.add(proc2);
+            processes.add(proc3);
+
+            ResourceBean resources = new ResourceBean(1.5, 512, 512);
+
+            TaskConfigBean taskConfig = new TaskConfigBean("Airavata-Aurora-" + jobIdAndName, processes, resources);
+            JobConfigBean jobConfig = new JobConfigBean(jobKey, owner, taskConfig, AuroraUtils.CLUSTER);
+
+            String executorConfigJson = AuroraThriftClientUtil.getExecutorConfigJson(jobConfig);
+            log.info("Executor Config for Job {} , {}", jobIdAndName, executorConfigJson);
+
+            AuroraThriftClient client = AuroraThriftClient.getAuroraThriftClient(Constants.AURORA_SCHEDULER_PROP_FILE);
+            ResponseBean response = client.createJob(jobConfig);
+            log.info("Response for job {}, {}", jobIdAndName, response);
+
+            jobModel.setJobId(jobIdAndName);
+            jobStatus.setReason("Successfully Submitted");
+            jobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+            jobModel.setJobStatuses(Arrays.asList(jobStatus));
+            taskContext.getParentProcessContext().setJobModel(jobModel);
+
+            GFacUtils.saveJobModel(processContext, jobModel);
+            GFacUtils.saveJobStatus(processContext, jobModel);
+            taskStatus.setReason("Successfully submitted job to Aurora");
+            // Stamp the success path as well, matching what the failure path does.
+            taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+        } catch (Exception e) {
+            String msg = "Error occurred while submitting the job";
+            log.error(msg, e);
+            taskStatus.setState(TaskState.FAILED);
+            taskStatus.setReason(msg);
+            taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+            ErrorModel errorModel = new ErrorModel();
+            errorModel.setActualErrorMessage(e.getMessage());
+            errorModel.setUserFriendlyMessage(msg);
+            taskContext.getTaskModel().setTaskErrors(Arrays.asList(errorModel));
+        }
+
+        taskContext.setTaskStatus(taskStatus);
+        try {
+            GFacUtils.saveAndPublishTaskStatus(taskContext);
+        } catch (GFacException e) {
+            log.error("Error while saving task status", e);
+        }
+        return taskStatus;
+    }
+
+    /**
+     * Not implemented.
+     *
+     * @param taskContext ignored
+     * @return always {@code null}
+     */
+    @Override
+    public TaskStatus recover(TaskContext taskContext) {
+        return null;
+    }
+
+    /**
+     * Not implemented.
+     *
+     * @return always {@code null}
+     */
+    // NOTE(review): callers that switch on the task type will see null here - confirm whether a
+    // job-submission TaskTypes constant should be returned instead.
+    @Override
+    public TaskTypes getType() {
+        return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/21b1923d/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/cloud/AuroraJobMonitor.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/cloud/AuroraJobMonitor.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/cloud/AuroraJobMonitor.java
new file mode 100644
index 0000000..5fe9dd8
--- /dev/null
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/cloud/AuroraJobMonitor.java
@@ -0,0 +1,247 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.airavata.gfac.monitor.cloud;
+
+import org.apache.airavata.cloud.aurora.client.AuroraThriftClient;
+import org.apache.airavata.cloud.aurora.client.bean.JobDetailsResponseBean;
+import org.apache.airavata.cloud.aurora.client.bean.JobKeyBean;
+import org.apache.airavata.cloud.aurora.client.sdk.ScheduledTask;
+import org.apache.airavata.common.utils.AiravataUtils;
+import org.apache.airavata.gfac.core.GFacException;
+import org.apache.airavata.gfac.core.GFacThreadPoolExecutor;
+import org.apache.airavata.gfac.core.GFacUtils;
+import org.apache.airavata.gfac.core.context.ProcessContext;
+import org.apache.airavata.gfac.core.context.TaskContext;
+import org.apache.airavata.gfac.core.monitor.JobMonitor;
+import org.apache.airavata.gfac.impl.AuroraUtils;
+import org.apache.airavata.gfac.impl.GFacWorker;
+import org.apache.airavata.model.job.JobModel;
+import org.apache.airavata.model.status.JobState;
+import org.apache.airavata.model.status.JobStatus;
+import org.apache.airavata.model.status.ProcessState;
+import org.apache.airavata.model.status.ProcessStatus;
+import org.apache.airavata.model.status.TaskState;
+import org.apache.airavata.model.status.TaskStatus;
+import org.apache.airavata.registry.cpi.ExperimentCatalog;
+import org.apache.airavata.registry.cpi.ExperimentCatalogModelType;
+import org.apache.airavata.registry.cpi.RegistryException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * Singleton {@link JobMonitor} that polls an Aurora scheduler for the state of submitted jobs.
+ *
+ * <p>Jobs are registered through {@link #monitor(String, TaskContext)} and kept in a concurrent map keyed
+ * by job id. {@link #run()} schedules an {@link AuroraTimer} that first fires after 5 seconds and then
+ * every 2 seconds. Each run asks Aurora for the details of every registered job and translates the status
+ * of the first returned task into an Airavata job state.
+ */
+public class AuroraJobMonitor implements JobMonitor, Runnable {
+    private static final Logger log = LoggerFactory.getLogger(AuroraJobMonitor.class);
+
+    // volatile so the lazily created instance is safely published by the double-checked
+    // initialisation in getInstance().
+    private static volatile AuroraJobMonitor auroraJobMonitor;
+    private final Timer timer;
+    private final Map<String, TaskContext> jobMonitoringMap;
+
+    private AuroraJobMonitor() {
+        jobMonitoringMap = new ConcurrentHashMap<>();
+        timer = new Timer("Aurora status poll timer");
+    }
+
+    /**
+     * Returns the shared monitor, creating it on first use.
+     *
+     * @return the single {@code AuroraJobMonitor} instance
+     */
+    public static AuroraJobMonitor getInstance() {
+        if (auroraJobMonitor == null) {
+            synchronized (AuroraJobMonitor.class) {
+                if (auroraJobMonitor == null) {
+                    auroraJobMonitor = new AuroraJobMonitor();
+                }
+            }
+        }
+        return auroraJobMonitor;
+    }
+
+    /**
+     * Schedules the polling task: first execution after 5000 ms, then every 2000 ms.
+     * A failure to create or schedule the task is logged and otherwise ignored.
+     */
+    @Override
+    public void run() {
+        try {
+            AuroraTimer task = new AuroraTimer();
+            timer.schedule(task, 5000, 2000);
+        } catch (Exception e) {
+            // Keep the cause; without it a misconfigured scheduler is impossible to diagnose.
+            log.error("Error couldn't run Aurora status poll timer task", e);
+        }
+    }
+
+    /**
+     * Registers a job for polling and pauses task execution on its parent process context.
+     *
+     * @param jobId       Aurora job name, used as the map key and as the job key name when polling
+     * @param taskContext context of the monitoring task for this job
+     */
+    @Override
+    public void monitor(String jobId, TaskContext taskContext) {
+        jobMonitoringMap.put(jobId, taskContext);
+        log.info("Added JobId : {} to Aurora Job Monitoring map", jobId);
+        taskContext.getParentProcessContext().setPauseTaskExecution(true);
+    }
+
+    /**
+     * Removes the job from the polling map.
+     *
+     * @param jobId      job to stop polling
+     * @param runOutFlow currently ignored
+     */
+    @Override
+    public void stopMonitor(String jobId, boolean runOutFlow) {
+        jobMonitoringMap.remove(jobId);
+    }
+
+    /**
+     * @param jobId job id to look up
+     * @return {@code true} if the job is currently registered for polling
+     */
+    @Override
+    public boolean isMonitoring(String jobId) {
+        return jobMonitoringMap.get(jobId) != null;
+    }
+
+    /**
+     * Not implemented.
+     *
+     * @throws IllegalStateException always
+     */
+    @Override
+    public void canceledJob(String jobId) {
+        throw new IllegalStateException("Method not yet implemented");
+    }
+
+    /**
+     * Timer task that performs one polling pass over all registered jobs.
+     */
+    class AuroraTimer extends TimerTask {
+
+        private final AuroraThriftClient client;
+
+        /**
+         * @throws Exception if the Aurora Thrift client cannot be obtained
+         */
+        public AuroraTimer() throws Exception {
+            client = AuroraThriftClient.getAuroraThriftClient(AuroraUtils.AURORA_SCHEDULER_PROP_FILE);
+        }
+
+        /**
+         * Queries Aurora for every registered job. {@code FINISHED} and {@code FAILED} jobs are removed from
+         * the map and handed to {@link #processJob}; {@code RUNNING} jobs get an {@code ACTIVE} status via
+         * {@link #updateStatus}; any other status is only logged. Errors for one job are logged and do not
+         * stop the pass over the remaining jobs.
+         */
+        @Override
+        public void run() {
+            // A single key bean is reused for every lookup; only its name changes per job.
+            JobKeyBean jobKeyBean = new JobKeyBean(AuroraUtils.ENVIRONMENT, AuroraUtils.ROLE, "dummy");
+            Iterator<Map.Entry<String, TaskContext>> iterator = jobMonitoringMap.entrySet().iterator();
+            while (iterator.hasNext()) {
+                Map.Entry<String, TaskContext> currentEntry = iterator.next();
+                String jobId = currentEntry.getKey();
+                try {
+                    jobKeyBean.setName(jobId);
+                    JobDetailsResponseBean jobDetailsResponseBean = client.getJobDetails(jobKeyBean);
+                    List<ScheduledTask> tasks =
+                            jobDetailsResponseBean == null ? null : jobDetailsResponseBean.getTasks();
+                    if (tasks == null || tasks.isEmpty()) {
+                        // Nothing to inspect yet; keep the job registered and retry on the next pass.
+                        log.warn("No task details returned for job : {}", jobId);
+                        continue;
+                    }
+                    switch (tasks.get(0).getStatus()) {
+                        case FINISHED:
+                            iterator.remove();
+                            processJob(jobId, currentEntry.getValue(), JobState.COMPLETE);
+                            break;
+                        case FAILED:
+                            iterator.remove();
+                            processJob(jobId, currentEntry.getValue(), JobState.FAILED);
+                            break;
+                        case RUNNING:
+                            updateStatus(jobId, currentEntry.getValue(), JobState.ACTIVE);
+                            break;
+                        default:
+                            log.info("Job {} is in {} state", jobId, tasks.get(0).getStatus().name());
+                            break;
+                    }
+                } catch (Exception e) {
+                    // Never let one job's failure escape: an uncaught exception would cancel the Timer.
+                    log.error("Error while getting response for job : {}", jobId, e);
+                }
+            }
+        }
+
+        /**
+         * Saves {@code jobState} for the job unless a status with that state is already stored.
+         *
+         * @param jobKey      job id
+         * @param taskContext task context registered for the job
+         * @param jobState    state to record
+         */
+        private void updateStatus(String jobKey, TaskContext taskContext, JobState jobState) {
+            ProcessContext pc = taskContext.getParentProcessContext();
+            ExperimentCatalog experimentCatalog = pc.getExperimentCatalog();
+            List<Object> objects;
+            try {
+                objects = experimentCatalog.get(ExperimentCatalogModelType.JOB_STATUS, taskContext.getTaskId(), jobKey);
+            } catch (RegistryException e) {
+                log.error("Error while getting job statuses for job : {} , task : {}, process : {}", jobKey,
+                        taskContext.getTaskId(), pc.getProcessId(), e);
+                // Without the stored statuses we cannot tell whether this state is new; retry next pass.
+                return;
+            }
+            boolean alreadyRecorded = objects != null && objects.stream()
+                    .map(o -> ((JobStatus) o).getJobState())
+                    .anyMatch(state -> state == jobState);
+            if (alreadyRecorded) {
+                return;
+            }
+            JobStatus jobStatus = new JobStatus(jobState);
+            jobStatus.setReason("Aurora return " + jobState.name());
+            jobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+            JobModel jobModel = pc.getJobModel();
+            jobModel.setJobStatuses(Arrays.asList(jobStatus));
+            try {
+                GFacUtils.saveJobStatus(pc, jobModel);
+            } catch (GFacException e) {
+                log.error("Error while saving job status {}, job : {}, task :{}, process:{} exp:{}",
+                        jobState.name(), jobKey, taskContext.getTaskId(), pc.getProcessId(), pc.getExperimentId(), e);
+            }
+        }
+
+        /**
+         * Handles a job that reached a final state: saves the job status, marks the monitoring task
+         * {@code COMPLETED}, publishes {@code CANCELLING} if the process was cancelled and finally hands the
+         * process context to a new {@link GFacWorker}. Every step logs its own failure and lets the
+         * following steps run.
+         *
+         * @param jobKey      job id
+         * @param taskContext task context registered for the job
+         * @param jobState    final job state ({@code COMPLETE} or {@code FAILED})
+         */
+        private void processJob(String jobKey, TaskContext taskContext, JobState jobState) {
+            JobStatus jobStatus = new JobStatus();
+            jobStatus.setJobState(jobState);
+            if (jobState == JobState.COMPLETE) {
+                jobStatus.setReason("Aurora Job completed");
+            } else if (jobState == JobState.FAILED) {
+                jobStatus.setReason("Aurora Job Failed");
+            }
+            ProcessContext pc = taskContext.getParentProcessContext();
+            JobModel jobModel = pc.getJobModel();
+            jobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+            jobModel.setJobStatuses(Arrays.asList(jobStatus));
+            try {
+                GFacUtils.saveJobStatus(pc, jobModel);
+            } catch (GFacException e) {
+                log.error("Error while saving job status for job : {} ", jobKey, e);
+            }
+
+            // The monitoring task itself is done regardless of whether the job succeeded or failed.
+            TaskStatus taskStatus = new TaskStatus(TaskState.COMPLETED);
+            taskStatus.setReason("Job monitoring completed with final state: " + TaskState.COMPLETED.name());
+            taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+            taskContext.setTaskStatus(taskStatus);
+            try {
+                GFacUtils.saveAndPublishTaskStatus(taskContext);
+            } catch (GFacException e) {
+                log.error("Error while saving task status for exp : {} , process : {} , task : {} , job : {}",
+                        taskContext.getExperimentId(), taskContext.getProcessId(), taskContext.getTaskId(), jobKey, e);
+            }
+
+            if (pc.isCancel()) {
+                ProcessStatus processStatus = new ProcessStatus(ProcessState.CANCELLING);
+                processStatus.setReason("Process has been cancelled");
+                pc.setProcessStatus(processStatus);
+                try {
+                    GFacUtils.saveAndPublishProcessStatus(pc);
+                } catch (GFacException e) {
+                    log.error("Error while cancelling process, exp : {}, process : {}", pc.getExperimentId(), pc.getProcessId(), e);
+                }
+            }
+
+            try {
+                GFacThreadPoolExecutor.getCachedThreadPool().execute(new GFacWorker(pc));
+            } catch (GFacException e) {
+                log.error("Error while running output tasks for exp : {} , process : {}", taskContext.getExperimentId(), pc.getProcessId(), e);
+
+                ProcessStatus processStatus = new ProcessStatus(ProcessState.FAILED);
+                processStatus.setReason("Failed to run output tasks");
+                processStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+                pc.setProcessStatus(processStatus);
+                try {
+                    GFacUtils.saveAndPublishProcessStatus(pc);
+                } catch (GFacException ex) {
+                    log.error("Error while updating process status to FAILED, exp : {}, process : {}", pc.getExperimentId(), pc.getProcessId(), ex);
+                }
+            }
+        }
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/21b1923d/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/utils/OrchestratorUtils.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/utils/OrchestratorUtils.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/utils/OrchestratorUtils.java
index 74bd2db..83c9273 100644
--- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/utils/OrchestratorUtils.java
+++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/utils/OrchestratorUtils.java
@@ -262,6 +262,11 @@ public class OrchestratorUtils {
                 if (sshJobSubmission != null) {
                     return sshJobSubmission.getSecurityProtocol();
                 }
+            } else if (submissionProtocol == JobSubmissionProtocol.CLOUD) {
+                CloudJobSubmission cloudJobSubmission = getCloudJobSubmission(context, jobSubmissionInterface.getJobSubmissionInterfaceId());
+                if (cloudJobSubmission != null) {
+                    return cloudJobSubmission.getSecurityProtocol();
+                }
             }
         } catch (RegistryException e) {
             logger.error("Error occurred while retrieving security protocol", e);
@@ -302,6 +307,25 @@ public class OrchestratorUtils {
         }
     }
 
+    /**
+     * Looks up the cloud job submission registered under the given id in the app catalog.
+     *
+     * @param context      orchestrator context whose registry supplies the app catalog
+     * @param submissionId id handed to the compute resource's cloud job submission lookup
+     * @return the value returned by the app catalog lookup (callers should null-check it)
+     * @throws RegistryException if the lookup fails for any reason; the original exception is kept as the cause
+     */
+    public static CloudJobSubmission getCloudJobSubmission(OrchestratorContext context, String submissionId) throws RegistryException {
+        try {
+            AppCatalog appCatalog = context.getRegistry().getAppCatalog();
+            return appCatalog.getComputeResource().getCloudJobSubmission(submissionId);
+        } catch (Exception e) {
+            String errorMsg = "Error while retrieving cloud job submission with submission id : " + submissionId;
+            logger.error(errorMsg, e);
+            throw new RegistryException(errorMsg, e);
+        }
+    }
+
     public static SCPDataMovement getSCPDataMovement(OrchestratorContext context, String dataMoveId) throws RegistryException {
         try {
             AppCatalog appCatalog = context.getRegistry().getAppCatalog();

http://git-wip-us.apache.org/repos/asf/airavata/blob/21b1923d/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
index dec9b2c..b97e79a 100644
--- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
+++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
@@ -481,6 +481,8 @@ public class SimpleOrchestratorImpl extends AbstractOrchestrator{
             monitorMode = MonitorMode.FORK;
         } else if(jobSubmissionProtocol == JobSubmissionProtocol.LOCAL){
             monitorMode = MonitorMode.LOCAL;
+        } else if (jobSubmissionProtocol == JobSubmissionProtocol.CLOUD) {
+            monitorMode = MonitorMode.CLOUD_JOB_MONITOR;
         }else {
             logger.error("expId : {}, processId : {} :- Unsupported Job submission protocol {}.",
                     processModel.getExperimentId(), processModel.getProcessId(), jobSubmissionProtocol.name());
@@ -507,7 +509,8 @@ public class SimpleOrchestratorImpl extends AbstractOrchestrator{
         submissionTaskIds.add(taskModel.getTaskId());
 
         // create monitor task for this Email based monitor mode job
-        if (monitorMode == MonitorMode.JOB_EMAIL_NOTIFICATION_MONITOR) {
+        if (monitorMode == MonitorMode.JOB_EMAIL_NOTIFICATION_MONITOR
+                || monitorMode == MonitorMode.CLOUD_JOB_MONITOR) {
             TaskModel monitorTaskModel = new TaskModel();
             monitorTaskModel.setParentProcessId(processModel.getProcessId());
             monitorTaskModel.setCreationTime(new Date().getTime());

http://git-wip-us.apache.org/repos/asf/airavata/blob/21b1923d/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 71fc9ba..81b258f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -626,7 +626,7 @@
 				<module>modules/orchestrator</module>
 				<module>modules/monitoring</module>
 				<module>modules/user-profile</module>
-				<!--<module>modules/cloud</module>-->
+				<module>modules/cloud</module>
 				<module>modules/server</module>
 				<module>modules/workflow</module>
 				<module>modules/test-suite</module>