Posted to commits@airavata.apache.org by sh...@apache.org on 2016/11/01 00:10:21 UTC

airavata git commit: Start Aurora cloud monitoring service

Repository: airavata
Updated Branches:
  refs/heads/auroraMesosIntegration 58db74346 -> 6d51651ca


Start Aurora cloud monitoring service


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/6d51651c
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/6d51651c
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/6d51651c

Branch: refs/heads/auroraMesosIntegration
Commit: 6d51651ca45920277ab327e676a080103d3fd9fd
Parents: 58db743
Author: Shameera Rathnayaka <sh...@gmail.com>
Authored: Mon Oct 31 20:10:15 2016 -0400
Committer: Shameera Rathnayaka <sh...@gmail.com>
Committed: Mon Oct 31 20:10:15 2016 -0400

----------------------------------------------------------------------
 .../org/apache/airavata/gfac/impl/Factory.java  | 37 +++++++++++++++-----
 .../airavata/gfac/impl/GFacEngineImpl.java      |  2 ++
 .../cpi/impl/SimpleOrchestratorImpl.java        |  4 ++-
 3 files changed, 33 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/6d51651c/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java
index a808ad3..8dd8699 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java
@@ -62,6 +62,7 @@ import org.apache.airavata.gfac.impl.job.UGEJobConfiguration;
 import org.apache.airavata.gfac.impl.task.ArchiveTask;
 import org.apache.airavata.gfac.impl.watcher.CancelRequestWatcherImpl;
 import org.apache.airavata.gfac.impl.watcher.RedeliveryRequestWatcherImpl;
+import org.apache.airavata.gfac.monitor.cloud.AuroraJobMonitor;
 import org.apache.airavata.gfac.monitor.email.EmailBasedMonitor;
 import org.apache.airavata.messaging.core.MessageHandler;
 import org.apache.airavata.messaging.core.MessagingFactory;
@@ -184,6 +185,9 @@ public abstract class Factory {
 	}
 
 	public static JobManagerConfiguration getJobManagerConfiguration(ResourceJobManager resourceJobManager) throws GFacException {
+		if(resourceJobManager == null)
+			return null;
+
 		ResourceConfig resourceConfig = Factory.getResourceConfig(resourceJobManager.getResourceJobManagerType());
 		OutputParser outputParser;
 		try {
@@ -208,9 +212,9 @@ public abstract class Factory {
 			case UGE:
 				return new UGEJobConfiguration("UGE_Groovy.template", ".pbs", resourceJobManager.getJobManagerBinPath(),
 						resourceJobManager.getJobManagerCommands(), outputParser);
-            case FORK:
-                return new ForkJobConfiguration("FORK_Groovy.template", ".sh", resourceJobManager.getJobManagerBinPath(),
-                        resourceJobManager.getJobManagerCommands(), outputParser);
+			case FORK:
+				return new ForkJobConfiguration("FORK_Groovy.template", ".sh", resourceJobManager.getJobManagerBinPath(),
+						resourceJobManager.getJobManagerCommands(), outputParser);
 			default:
 				return null;
 		}
@@ -244,17 +248,23 @@ public abstract class Factory {
                     jobSubmissionProtocol == JobSubmissionProtocol.LOCAL_FORK) {
                 remoteCluster = new LocalRemoteCluster(processContext.getServerInfo(), jobManagerConfiguration, null);
             } else if (jobSubmissionProtocol == JobSubmissionProtocol.SSH ||
-                    jobSubmissionProtocol == JobSubmissionProtocol.SSH_FORK) {
+                    jobSubmissionProtocol == JobSubmissionProtocol.SSH_FORK
+					|| jobSubmissionProtocol == JobSubmissionProtocol.CLOUD) {
+
                 remoteCluster = new HPCRemoteCluster(processContext.getServerInfo(), jobManagerConfiguration,
                         processContext.getSshKeyAuthentication());
-            }
+            }else {
+				throw new GFacException("No remote cluster implementation map to job submission protocol "
+						+ jobSubmissionProtocol.name());
+			}
             remoteClusterMap.put(key, remoteCluster);
         }else {
             AuthenticationInfo authentication = remoteCluster.getAuthentication();
             if (authentication instanceof SSHKeyAuthentication){
                 SSHKeyAuthentication sshKeyAuthentication = (SSHKeyAuthentication)authentication;
                 if (!sshKeyAuthentication.getUserName().equals(getLoginUserName(processContext))){
-                    JobManagerConfiguration jobManagerConfiguration = getJobManagerConfiguration(processContext.getResourceJobManager());
+                    JobManagerConfiguration jobManagerConfiguration =
+							getJobManagerConfiguration(processContext.getResourceJobManager());
                     if (jobSubmissionProtocol == JobSubmissionProtocol.SSH ||
                             jobSubmissionProtocol == JobSubmissionProtocol.SSH_FORK) {
                         remoteCluster = new HPCRemoteCluster(processContext.getServerInfo(), jobManagerConfiguration,
@@ -281,7 +291,10 @@ public abstract class Factory {
             } else if (dataMovementProtocol == DataMovementProtocol.SCP) {
                 remoteCluster = new HPCRemoteCluster(processContext.getServerInfo(), jobManagerConfiguration,
                         processContext.getSshKeyAuthentication());
-            }
+            }else {
+				throw new GFacException("No remote cluster implementation map to job data movement protocol "
+						+ dataMovementProtocol.name());
+			}
 
             remoteClusterMap.put(key, remoteCluster);
         }else {
@@ -289,7 +302,8 @@ public abstract class Factory {
             if (authentication instanceof SSHKeyAuthentication){
                 SSHKeyAuthentication sshKeyAuthentication = (SSHKeyAuthentication)authentication;
                 if (!sshKeyAuthentication.getUserName().equals(getLoginUserName(processContext))){
-                    JobManagerConfiguration jobManagerConfiguration = getJobManagerConfiguration(processContext.getResourceJobManager());
+                    JobManagerConfiguration jobManagerConfiguration =
+							getJobManagerConfiguration(processContext.getResourceJobManager());
                     dataMovementProtocol = processContext.getDataMovementProtocol();
                     if (dataMovementProtocol == DataMovementProtocol.SCP) {
                         remoteCluster = new HPCRemoteCluster(processContext.getServerInfo(), jobManagerConfiguration,
@@ -450,8 +464,13 @@ public abstract class Factory {
 						case JOB_EMAIL_NOTIFICATION_MONITOR:
 							EmailBasedMonitor emailBasedMonitor = new EmailBasedMonitor(Factory.getResourceConfig());
 							jobMonitorServices.put(MonitorMode.JOB_EMAIL_NOTIFICATION_MONITOR, emailBasedMonitor);
-							jobMonitor = ((JobMonitor) emailBasedMonitor);
+							jobMonitor = emailBasedMonitor;
 							new Thread(emailBasedMonitor).start();
+							break;
+						case CLOUD_JOB_MONITOR:
+							AuroraJobMonitor auroraJobMonitor = AuroraJobMonitor.getInstance();
+							new Thread(auroraJobMonitor).start();
+							jobMonitorServices.put(MonitorMode.CLOUD_JOB_MONITOR, auroraJobMonitor);
 					}
 				}
 			}

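A note on the new CLOUD_JOB_MONITOR case: the Factory change above only wires AuroraJobMonitor.getInstance() into a new thread and registers it under MonitorMode.CLOUD_JOB_MONITOR; the AuroraJobMonitor class itself is not part of this diff. The sketch below is a minimal, assumed illustration of that singleton-plus-Runnable contract only. Everything beyond the getInstance()/Runnable shape (the queue, the polling loop, the class and method names) is invented for illustration and is not the actual monitor implementation.

// Minimal sketch, assuming only the getInstance()/Runnable contract used in Factory.
// The queue-based polling body is an illustration, not the actual AuroraJobMonitor code.
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class CloudJobMonitorSketch implements Runnable {

    private static final CloudJobMonitorSketch INSTANCE = new CloudJobMonitorSketch();
    private final BlockingQueue<String> jobQueue = new LinkedBlockingQueue<>();

    private CloudJobMonitorSketch() { }

    // Same accessor shape the Factory code relies on: AuroraJobMonitor.getInstance()
    public static CloudJobMonitorSketch getInstance() {
        return INSTANCE;
    }

    // Hypothetical entry point for handing a job to the monitor.
    public void monitor(String jobId) {
        jobQueue.offer(jobId);
    }

    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                String jobId = jobQueue.poll(5, TimeUnit.SECONDS);
                if (jobId != null) {
                    // Placeholder: a real monitor would query the cloud scheduler here.
                    System.out.println("Checking job status for " + jobId);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    public static void main(String[] args) {
        CloudJobMonitorSketch monitor = CloudJobMonitorSketch.getInstance();
        new Thread(monitor).start();   // mirrors the wiring added in this commit
        monitor.monitor("example-job-id");
    }
}
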
http://git-wip-us.apache.org/repos/asf/airavata/blob/6d51651c/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
index 6f8e3ad..212b246 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
@@ -929,6 +929,8 @@ public class GFacEngineImpl implements GFacEngine {
                     (jsInterface.getJobSubmissionInterfaceId());
             processCtx.setMonitorMode(sshJobSubmission.getMonitorMode()); // fixme - Move this to populate process
             resourceJobManager = sshJobSubmission.getResourceJobManager();
+        } else if (jsInterface.getJobSubmissionProtocol() == JobSubmissionProtocol.CLOUD) {
+            return null;
         } else {
             throw new GFacException("Unsupported JobSubmissionProtocol - " + jsInterface.getJobSubmissionProtocol()
                     .name());

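This hunk short-circuits CLOUD submissions with a null return instead of the unsupported-protocol exception, so downstream callers such as Factory.getJobManagerConfiguration() must tolerate a missing ResourceJobManager, which is why that method gained a null guard in the previous file. A minimal, self-contained sketch of that null-through flow follows; the types are simplified stand-ins, not the real Airavata classes.

// Sketch only: simplified stand-in types showing how the two CLOUD null paths compose.
public class CloudNullFlowSketch {

    enum JobSubmissionProtocol { SSH, SSH_FORK, CLOUD }

    static class ResourceJobManager { }
    static class JobManagerConfiguration { }

    // Mirrors the GFacEngineImpl change: CLOUD resolves to no ResourceJobManager.
    static ResourceJobManager resolveResourceJobManager(JobSubmissionProtocol protocol) {
        if (protocol == JobSubmissionProtocol.CLOUD) {
            return null;
        }
        return new ResourceJobManager();
    }

    // Mirrors the Factory change: a null ResourceJobManager yields a null configuration
    // rather than a NullPointerException.
    static JobManagerConfiguration getJobManagerConfiguration(ResourceJobManager resourceJobManager) {
        if (resourceJobManager == null) {
            return null;
        }
        return new JobManagerConfiguration();
    }

    public static void main(String[] args) {
        ResourceJobManager rjm = resolveResourceJobManager(JobSubmissionProtocol.CLOUD);
        System.out.println(getJobManagerConfiguration(rjm)); // prints "null", no exception
    }
}
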
http://git-wip-us.apache.org/repos/asf/airavata/blob/6d51651c/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
index b97e79a..66bf5ca 100644
--- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
+++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
@@ -291,7 +291,9 @@ public class SimpleOrchestratorImpl extends AbstractOrchestrator{
                 // TODO - breakdown unicore all in one task to multiple tasks, then we don't need to handle UNICORE here.
                 taskIdList.addAll(createAndSaveSubmissionTasks(gatewayId, preferredJobSubmissionInterface, processModel, userGivenWallTime));
             } else {
-                taskIdList.addAll(createAndSaveEnvSetupTask(gatewayId, processModel, experimentCatalog));
+                if(resourcePreference.getPreferredJobSubmissionProtocol() != JobSubmissionProtocol.CLOUD){
+                    taskIdList.addAll(createAndSaveEnvSetupTask(gatewayId, processModel, experimentCatalog));
+                }
                 taskIdList.addAll(createAndSaveInputDataStagingTasks(processModel, gatewayId));
                 if (autoSchedule) {
                     List<BatchQueue> definedBatchQueues = computeResource.getBatchQueues();
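
The orchestrator change above skips the environment-setup task for CLOUD submissions while still creating the input-data-staging tasks. A minimal sketch of that task-planning guard follows; the task names and the helper method are illustrative stand-ins, not orchestrator APIs.

// Sketch only: illustrates the CLOUD guard around environment-setup task creation.
import java.util.ArrayList;
import java.util.List;

public class CloudTaskPlanSketch {

    enum JobSubmissionProtocol { SSH, CLOUD }

    static List<String> planTasks(JobSubmissionProtocol preferredProtocol) {
        List<String> taskIdList = new ArrayList<>();
        if (preferredProtocol != JobSubmissionProtocol.CLOUD) {
            // Environment setup is only scheduled for non-CLOUD submissions,
            // mirroring the guard added in SimpleOrchestratorImpl.
            taskIdList.add("ENV_SETUP_TASK");
        }
        taskIdList.add("INPUT_DATA_STAGING_TASK"); // created for every protocol
        return taskIdList;
    }

    public static void main(String[] args) {
        System.out.println(planTasks(JobSubmissionProtocol.SSH));   // [ENV_SETUP_TASK, INPUT_DATA_STAGING_TASK]
        System.out.println(planTasks(JobSubmissionProtocol.CLOUD)); // [INPUT_DATA_STAGING_TASK]
    }
}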