You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sh...@apache.org on 2016/11/01 16:12:29 UTC
[6/6] airavata git commit: Start aurora cloud monitoring service
Start aurora cloud monitoring service
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/6d51651c
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/6d51651c
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/6d51651c
Branch: refs/heads/develop
Commit: 6d51651ca45920277ab327e676a080103d3fd9fd
Parents: 58db743
Author: Shameera Rathnayaka <sh...@gmail.com>
Authored: Mon Oct 31 20:10:15 2016 -0400
Committer: Shameera Rathnayaka <sh...@gmail.com>
Committed: Mon Oct 31 20:10:15 2016 -0400
----------------------------------------------------------------------
.../org/apache/airavata/gfac/impl/Factory.java | 37 +++++++++++++++-----
.../airavata/gfac/impl/GFacEngineImpl.java | 2 ++
.../cpi/impl/SimpleOrchestratorImpl.java | 4 ++-
3 files changed, 33 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/6d51651c/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java
index a808ad3..8dd8699 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java
@@ -62,6 +62,7 @@ import org.apache.airavata.gfac.impl.job.UGEJobConfiguration;
import org.apache.airavata.gfac.impl.task.ArchiveTask;
import org.apache.airavata.gfac.impl.watcher.CancelRequestWatcherImpl;
import org.apache.airavata.gfac.impl.watcher.RedeliveryRequestWatcherImpl;
+import org.apache.airavata.gfac.monitor.cloud.AuroraJobMonitor;
import org.apache.airavata.gfac.monitor.email.EmailBasedMonitor;
import org.apache.airavata.messaging.core.MessageHandler;
import org.apache.airavata.messaging.core.MessagingFactory;
@@ -184,6 +185,9 @@ public abstract class Factory {
}
public static JobManagerConfiguration getJobManagerConfiguration(ResourceJobManager resourceJobManager) throws GFacException {
+ if(resourceJobManager == null)
+ return null;
+
ResourceConfig resourceConfig = Factory.getResourceConfig(resourceJobManager.getResourceJobManagerType());
OutputParser outputParser;
try {
@@ -208,9 +212,9 @@ public abstract class Factory {
case UGE:
return new UGEJobConfiguration("UGE_Groovy.template", ".pbs", resourceJobManager.getJobManagerBinPath(),
resourceJobManager.getJobManagerCommands(), outputParser);
- case FORK:
- return new ForkJobConfiguration("FORK_Groovy.template", ".sh", resourceJobManager.getJobManagerBinPath(),
- resourceJobManager.getJobManagerCommands(), outputParser);
+ case FORK:
+ return new ForkJobConfiguration("FORK_Groovy.template", ".sh", resourceJobManager.getJobManagerBinPath(),
+ resourceJobManager.getJobManagerCommands(), outputParser);
default:
return null;
}
@@ -244,17 +248,23 @@ public abstract class Factory {
jobSubmissionProtocol == JobSubmissionProtocol.LOCAL_FORK) {
remoteCluster = new LocalRemoteCluster(processContext.getServerInfo(), jobManagerConfiguration, null);
} else if (jobSubmissionProtocol == JobSubmissionProtocol.SSH ||
- jobSubmissionProtocol == JobSubmissionProtocol.SSH_FORK) {
+ jobSubmissionProtocol == JobSubmissionProtocol.SSH_FORK
+ || jobSubmissionProtocol == JobSubmissionProtocol.CLOUD) {
+
remoteCluster = new HPCRemoteCluster(processContext.getServerInfo(), jobManagerConfiguration,
processContext.getSshKeyAuthentication());
- }
+ }else {
+ throw new GFacException("No remote cluster implementation map to job submission protocol "
+ + jobSubmissionProtocol.name());
+ }
remoteClusterMap.put(key, remoteCluster);
}else {
AuthenticationInfo authentication = remoteCluster.getAuthentication();
if (authentication instanceof SSHKeyAuthentication){
SSHKeyAuthentication sshKeyAuthentication = (SSHKeyAuthentication)authentication;
if (!sshKeyAuthentication.getUserName().equals(getLoginUserName(processContext))){
- JobManagerConfiguration jobManagerConfiguration = getJobManagerConfiguration(processContext.getResourceJobManager());
+ JobManagerConfiguration jobManagerConfiguration =
+ getJobManagerConfiguration(processContext.getResourceJobManager());
if (jobSubmissionProtocol == JobSubmissionProtocol.SSH ||
jobSubmissionProtocol == JobSubmissionProtocol.SSH_FORK) {
remoteCluster = new HPCRemoteCluster(processContext.getServerInfo(), jobManagerConfiguration,
@@ -281,7 +291,10 @@ public abstract class Factory {
} else if (dataMovementProtocol == DataMovementProtocol.SCP) {
remoteCluster = new HPCRemoteCluster(processContext.getServerInfo(), jobManagerConfiguration,
processContext.getSshKeyAuthentication());
- }
+ }else {
+ throw new GFacException("No remote cluster implementation map to job data movement protocol "
+ + dataMovementProtocol.name());
+ }
remoteClusterMap.put(key, remoteCluster);
}else {
@@ -289,7 +302,8 @@ public abstract class Factory {
if (authentication instanceof SSHKeyAuthentication){
SSHKeyAuthentication sshKeyAuthentication = (SSHKeyAuthentication)authentication;
if (!sshKeyAuthentication.getUserName().equals(getLoginUserName(processContext))){
- JobManagerConfiguration jobManagerConfiguration = getJobManagerConfiguration(processContext.getResourceJobManager());
+ JobManagerConfiguration jobManagerConfiguration =
+ getJobManagerConfiguration(processContext.getResourceJobManager());
dataMovementProtocol = processContext.getDataMovementProtocol();
if (dataMovementProtocol == DataMovementProtocol.SCP) {
remoteCluster = new HPCRemoteCluster(processContext.getServerInfo(), jobManagerConfiguration,
@@ -450,8 +464,13 @@ public abstract class Factory {
case JOB_EMAIL_NOTIFICATION_MONITOR:
EmailBasedMonitor emailBasedMonitor = new EmailBasedMonitor(Factory.getResourceConfig());
jobMonitorServices.put(MonitorMode.JOB_EMAIL_NOTIFICATION_MONITOR, emailBasedMonitor);
- jobMonitor = ((JobMonitor) emailBasedMonitor);
+ jobMonitor = emailBasedMonitor;
new Thread(emailBasedMonitor).start();
+ break;
+ case CLOUD_JOB_MONITOR:
+ AuroraJobMonitor auroraJobMonitor = AuroraJobMonitor.getInstance();
+ new Thread(auroraJobMonitor).start();
+ jobMonitorServices.put(MonitorMode.CLOUD_JOB_MONITOR, auroraJobMonitor);
}
}
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/6d51651c/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
index 6f8e3ad..212b246 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
@@ -929,6 +929,8 @@ public class GFacEngineImpl implements GFacEngine {
(jsInterface.getJobSubmissionInterfaceId());
processCtx.setMonitorMode(sshJobSubmission.getMonitorMode()); // fixme - Move this to populate process
resourceJobManager = sshJobSubmission.getResourceJobManager();
+ } else if (jsInterface.getJobSubmissionProtocol() == JobSubmissionProtocol.CLOUD) {
+ return null;
} else {
throw new GFacException("Unsupported JobSubmissionProtocol - " + jsInterface.getJobSubmissionProtocol()
.name());
http://git-wip-us.apache.org/repos/asf/airavata/blob/6d51651c/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
index b97e79a..66bf5ca 100644
--- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
+++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
@@ -291,7 +291,9 @@ public class SimpleOrchestratorImpl extends AbstractOrchestrator{
// TODO - breakdown unicore all in one task to multiple tasks, then we don't need to handle UNICORE here.
taskIdList.addAll(createAndSaveSubmissionTasks(gatewayId, preferredJobSubmissionInterface, processModel, userGivenWallTime));
} else {
- taskIdList.addAll(createAndSaveEnvSetupTask(gatewayId, processModel, experimentCatalog));
+ if(resourcePreference.getPreferredJobSubmissionProtocol() != JobSubmissionProtocol.CLOUD){
+ taskIdList.addAll(createAndSaveEnvSetupTask(gatewayId, processModel, experimentCatalog));
+ }
taskIdList.addAll(createAndSaveInputDataStagingTasks(processModel, gatewayId));
if (autoSchedule) {
List<BatchQueue> definedBatchQueues = computeResource.getBatchQueues();