You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by ch...@apache.org on 2014/11/11 21:12:06 UTC
[02/50] [abbrv] airavata git commit: Removed legacy descriptions from
MonitorID, GSISSH provider and utils and AMQPMonitor classes
Removed legacy descriptions from the MonitorID, GSISSHProvider, GFACGSISSHUtils, and AMQPMonitor classes
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/f29dfbe7
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/f29dfbe7
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/f29dfbe7
Branch: refs/heads/master
Commit: f29dfbe709c322703225e7f14f1bb1ffd1129a02
Parents: 14bd941
Author: shamrath <sh...@gmail.com>
Authored: Fri Oct 31 12:25:31 2014 -0400
Committer: shamrath <sh...@gmail.com>
Committed: Fri Oct 31 12:25:31 2014 -0400
----------------------------------------------------------------------
.../data/impl/GwyResourceProfileImpl.java | 8 +-
.../data/util/AppCatalogThriftConversion.java | 4 +-
.../app/catalog/test/GatewayProfileTest.java | 8 +-
.../gfac/core/context/JobExecutionContext.java | 4 +
.../airavata/gfac/core/cpi/BetterGfacImpl.java | 33 +++---
.../airavata/gfac/core/monitor/MonitorID.java | 19 ++--
.../gsissh/provider/impl/GSISSHProvider.java | 64 ++++++-----
.../gfac/gsissh/util/GFACGSISSHUtils.java | 108 ++++++++++---------
.../monitor/impl/push/amqp/AMQPMonitor.java | 57 +++++-----
.../apache/airavata/job/AMQPMonitorTest.java | 64 +++++++----
10 files changed, 213 insertions(+), 156 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/f29dfbe7/modules/app-catalog/app-catalog-data/src/main/java/org/apache/aiaravata/application/catalog/data/impl/GwyResourceProfileImpl.java
----------------------------------------------------------------------
diff --git a/modules/app-catalog/app-catalog-data/src/main/java/org/apache/aiaravata/application/catalog/data/impl/GwyResourceProfileImpl.java b/modules/app-catalog/app-catalog-data/src/main/java/org/apache/aiaravata/application/catalog/data/impl/GwyResourceProfileImpl.java
index ed66bff..101b647 100644
--- a/modules/app-catalog/app-catalog-data/src/main/java/org/apache/aiaravata/application/catalog/data/impl/GwyResourceProfileImpl.java
+++ b/modules/app-catalog/app-catalog-data/src/main/java/org/apache/aiaravata/application/catalog/data/impl/GwyResourceProfileImpl.java
@@ -66,8 +66,8 @@ public class GwyResourceProfileImpl implements GwyResourceProfile {
resource.setComputeHostResource((ComputeResourceResource)computeHostResource.get(preference.getComputeResourceId()));
resource.setGatewayId(profileResource.getGatewayID());
resource.setOverrideByAiravata(preference.isOverridebyAiravata());
- resource.setPreferredJobProtocol(preference.getPreferredJobSubmissionProtocol());
- resource.setPreferedDMProtocol(preference.getPreferredDataMovementProtocol());
+ resource.setPreferredJobProtocol(preference.getPreferredJobSubmissionProtocol().toString());
+ resource.setPreferedDMProtocol(preference.getPreferredDataMovementProtocol().toString());
resource.setBatchQueue(preference.getPreferredBatchQueue());
resource.setProjectNumber(preference.getAllocationProjectNumber());
resource.setScratchLocation(preference.getScratchLocation());
@@ -100,8 +100,8 @@ public class GwyResourceProfileImpl implements GwyResourceProfile {
resource.setComputeHostResource((ComputeResourceResource)computeHostResource.get(preference.getComputeResourceId()));
resource.setGatewayId(gatewayId);
resource.setOverrideByAiravata(preference.isOverridebyAiravata());
- resource.setPreferredJobProtocol(preference.getPreferredJobSubmissionProtocol());
- resource.setPreferedDMProtocol(preference.getPreferredDataMovementProtocol());
+ resource.setPreferredJobProtocol(preference.getPreferredJobSubmissionProtocol().toString());
+ resource.setPreferedDMProtocol(preference.getPreferredDataMovementProtocol().toString());
resource.setBatchQueue(preference.getPreferredBatchQueue());
resource.setProjectNumber(preference.getAllocationProjectNumber());
resource.setScratchLocation(preference.getScratchLocation());
http://git-wip-us.apache.org/repos/asf/airavata/blob/f29dfbe7/modules/app-catalog/app-catalog-data/src/main/java/org/apache/aiaravata/application/catalog/data/util/AppCatalogThriftConversion.java
----------------------------------------------------------------------
diff --git a/modules/app-catalog/app-catalog-data/src/main/java/org/apache/aiaravata/application/catalog/data/util/AppCatalogThriftConversion.java b/modules/app-catalog/app-catalog-data/src/main/java/org/apache/aiaravata/application/catalog/data/util/AppCatalogThriftConversion.java
index 05cfa11..bc435f4 100644
--- a/modules/app-catalog/app-catalog-data/src/main/java/org/apache/aiaravata/application/catalog/data/util/AppCatalogThriftConversion.java
+++ b/modules/app-catalog/app-catalog-data/src/main/java/org/apache/aiaravata/application/catalog/data/util/AppCatalogThriftConversion.java
@@ -670,8 +670,8 @@ public class AppCatalogThriftConversion {
ComputeResourcePreference preference = new ComputeResourcePreference();
preference.setComputeResourceId(resource.getResourceId());
preference.setOverridebyAiravata(resource.getOverrideByAiravata());
- preference.setPreferredJobSubmissionProtocol(resource.getPreferredJobProtocol());
- preference.setPreferredDataMovementProtocol(resource.getPreferedDMProtocol());
+ preference.setPreferredJobSubmissionProtocol(JobSubmissionProtocol.valueOf(resource.getPreferredJobProtocol()));
+ preference.setPreferredDataMovementProtocol(DataMovementProtocol.valueOf(resource.getPreferedDMProtocol()));
preference.setPreferredBatchQueue(resource.getBatchQueue());
preference.setScratchLocation(resource.getScratchLocation());
preference.setAllocationProjectNumber(resource.getProjectNumber());
http://git-wip-us.apache.org/repos/asf/airavata/blob/f29dfbe7/modules/app-catalog/app-catalog-data/src/test/java/org/apache/airavata/app/catalog/test/GatewayProfileTest.java
----------------------------------------------------------------------
diff --git a/modules/app-catalog/app-catalog-data/src/test/java/org/apache/airavata/app/catalog/test/GatewayProfileTest.java b/modules/app-catalog/app-catalog-data/src/test/java/org/apache/airavata/app/catalog/test/GatewayProfileTest.java
index 66eb6bb..3593e11 100644
--- a/modules/app-catalog/app-catalog-data/src/test/java/org/apache/airavata/app/catalog/test/GatewayProfileTest.java
+++ b/modules/app-catalog/app-catalog-data/src/test/java/org/apache/airavata/app/catalog/test/GatewayProfileTest.java
@@ -84,8 +84,8 @@ public class GatewayProfileTest {
ComputeResourcePreference preference1 = new ComputeResourcePreference();
preference1.setComputeResourceId(hostId1);
preference1.setOverridebyAiravata(true);
- preference1.setPreferredJobSubmissionProtocol(JobSubmissionProtocol.SSH.toString());
- preference1.setPreferredDataMovementProtocol(DataMovementProtocol.SCP.toString());
+ preference1.setPreferredJobSubmissionProtocol(JobSubmissionProtocol.SSH);
+ preference1.setPreferredDataMovementProtocol(DataMovementProtocol.SCP);
preference1.setPreferredBatchQueue("queue1");
preference1.setScratchLocation("/tmp");
preference1.setAllocationProjectNumber("project1");
@@ -93,8 +93,8 @@ public class GatewayProfileTest {
ComputeResourcePreference preference2 = new ComputeResourcePreference();
preference2.setComputeResourceId(hostId2);
preference2.setOverridebyAiravata(true);
- preference2.setPreferredJobSubmissionProtocol(JobSubmissionProtocol.LOCAL.toString());
- preference2.setPreferredDataMovementProtocol(DataMovementProtocol.GridFTP.toString());
+ preference2.setPreferredJobSubmissionProtocol(JobSubmissionProtocol.LOCAL);
+ preference2.setPreferredDataMovementProtocol(DataMovementProtocol.GridFTP);
preference2.setPreferredBatchQueue("queue2");
preference2.setScratchLocation("/tmp");
preference2.setAllocationProjectNumber("project2");
http://git-wip-us.apache.org/repos/asf/airavata/blob/f29dfbe7/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/JobExecutionContext.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/JobExecutionContext.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/JobExecutionContext.java
index a95540c..891bece 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/JobExecutionContext.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/JobExecutionContext.java
@@ -430,4 +430,8 @@ public class JobExecutionContext extends AbstractContext implements Serializable
public void setPreferredJobSubmissionInterface(JobSubmissionInterface preferredJobSubmissionInterface) {
this.preferredJobSubmissionInterface = preferredJobSubmissionInterface;
}
+
+ public String getHostName() {
+ return applicationContext.getComputeResourceDescription().getHostName();
+ }
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/f29dfbe7/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
index 696b61b..e8e4c66 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
@@ -302,6 +302,20 @@ public class BetterGfacImpl implements GFac,Watcher {
jobExecutionContext.setGfac(this);
jobExecutionContext.setZk(zk);
jobExecutionContext.setCredentialStoreToken(AiravataZKUtils.getExpTokenId(zk, experimentID, taskID));
+
+ List<JobSubmissionInterface> jobSubmissionInterfaces = computeResource.getJobSubmissionInterfaces();
+ if (jobSubmissionInterfaces != null && !jobSubmissionInterfaces.isEmpty()){
+ Collections.sort(jobSubmissionInterfaces, new Comparator<JobSubmissionInterface>() {
+ @Override
+ public int compare(JobSubmissionInterface jobSubmissionInterface, JobSubmissionInterface jobSubmissionInterface2) {
+ return jobSubmissionInterface.getPriorityOrder() - jobSubmissionInterface2.getPriorityOrder();
+ }
+ });
+
+ jobExecutionContext.setHostPrioritizedJobSubmissionInterfaces(jobSubmissionInterfaces);
+ }else {
+ throw new GFacException("Compute resource should have at least one job submission interface defined...");
+ }
if (gatewayResourcePreferences != null ) {
if (gatewayResourcePreferences.getScratchLocation() == null) {
gatewayResourcePreferences.setScratchLocation("/tmp");
@@ -326,22 +340,11 @@ public class BetterGfacImpl implements GFac,Watcher {
jobExecutionContext.setStandardError(workingDir + File.separator + applicationInterface.getApplicationName().replaceAll("\\s+", "") + ".stderr");
jobExecutionContext.setPreferredJobSubmissionProtocol(gatewayResourcePreferences.getPreferredJobSubmissionProtocol());
+ if (gatewayResourcePreferences.getPreferredJobSubmissionProtocol() == null) {
+ jobExecutionContext.setPreferredJobSubmissionInterface(jobExecutionContext.getHostPrioritizedJobSubmissionInterfaces().get(0));
+ jobExecutionContext.setPreferredJobSubmissionProtocol(jobExecutionContext.getPreferredJobSubmissionInterface().getJobSubmissionProtocol());
+ }
}
-
- List<JobSubmissionInterface> jobSubmissionInterfaces = computeResource.getJobSubmissionInterfaces();
- if (jobSubmissionInterfaces != null && !jobSubmissionInterfaces.isEmpty()){
- Collections.sort(jobSubmissionInterfaces, new Comparator<JobSubmissionInterface>() {
- @Override
- public int compare(JobSubmissionInterface jobSubmissionInterface, JobSubmissionInterface jobSubmissionInterface2) {
- return jobSubmissionInterface.getPriorityOrder() - jobSubmissionInterface2.getPriorityOrder();
- }
- });
-
- jobExecutionContext.setHostPrioritizedJobSubmissionInterfaces(jobSubmissionInterfaces);
- }else {
- throw new GFacException("Compute resource should have at least one job submission interface defined...");
- }
-
return jobExecutionContext;
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/f29dfbe7/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/MonitorID.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/MonitorID.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/MonitorID.java
index 6ea1839..55da288 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/MonitorID.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/MonitorID.java
@@ -22,7 +22,6 @@ package org.apache.airavata.gfac.core.monitor;
import org.apache.airavata.common.logger.AiravataLogger;
import org.apache.airavata.common.logger.AiravataLoggerFactory;
-import org.apache.airavata.commons.gfac.type.HostDescription;
import org.apache.airavata.gfac.core.context.JobExecutionContext;
import org.apache.airavata.model.appcatalog.computeresource.ComputeResourceDescription;
import org.apache.airavata.model.workspace.experiment.JobState;
@@ -44,7 +43,7 @@ public class MonitorID {
private Timestamp lastMonitored;
- private HostDescription host;
+ private ComputeResourceDescription computeResourceDescription;
private Map<String, Object> parameters;
@@ -67,7 +66,7 @@ public class MonitorID {
public MonitorID() {
}
public MonitorID(MonitorID monitorID){
- this.host = monitorID.getHost();
+ this.computeResourceDescription = monitorID.getComputeResourceDescription();
this.jobStartedTime = new Timestamp((new Date()).getTime());
this.userName = monitorID.getUserName();
this.jobID = monitorID.getJobID();
@@ -76,8 +75,8 @@ public class MonitorID {
this.workflowNodeID = monitorID.getWorkflowNodeID();
this.jobName = monitorID.getJobName();
}
- public MonitorID(HostDescription host, String jobID, String taskID, String workflowNodeID, String experimentID, String userName,String jobName) {
- this.host = host;
+ public MonitorID(ComputeResourceDescription computeResourceDescription, String jobID, String taskID, String workflowNodeID, String experimentID, String userName,String jobName) {
+ this.computeResourceDescription = computeResourceDescription;
this.jobStartedTime = new Timestamp((new Date()).getTime());
this.userName = userName;
this.jobID = jobID;
@@ -89,7 +88,7 @@ public class MonitorID {
public MonitorID(JobExecutionContext jobExecutionContext) {
this.jobExecutionContext = jobExecutionContext;
- host = jobExecutionContext.getApplicationContext().getHostDescription();
+ this.computeResourceDescription = jobExecutionContext.getApplicationContext().getComputeResourceDescription();
userName = jobExecutionContext.getExperiment().getUserName();
taskID = jobExecutionContext.getTaskData().getTaskID();
experimentID = jobExecutionContext.getExperiment().getExperimentID();
@@ -102,12 +101,12 @@ public class MonitorID {
}
}
- public HostDescription getHost() {
- return host;
+ public ComputeResourceDescription getComputeResourceDescription() {
+ return computeResourceDescription;
}
- public void setHost(HostDescription host) {
- this.host = host;
+ public void setComputeResourceDescription(ComputeResourceDescription computeResourceDescription) {
+ this.computeResourceDescription = computeResourceDescription;
}
public Timestamp getLastMonitored() {
http://git-wip-us.apache.org/repos/asf/airavata/blob/f29dfbe7/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java
index b5a325a..92a50e4 100644
--- a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java
+++ b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java
@@ -20,6 +20,9 @@
*/
package org.apache.airavata.gfac.gsissh.provider.impl;
+import org.airavata.appcatalog.cpi.AppCatalog;
+import org.airavata.appcatalog.cpi.AppCatalogException;
+import org.apache.aiaravata.application.catalog.data.impl.AppCatalogFactory;
import org.apache.airavata.common.exception.ApplicationSettingsException;
import org.apache.airavata.gfac.ExecutionMode;
import org.apache.airavata.gfac.GFacException;
@@ -36,11 +39,16 @@ import org.apache.airavata.gfac.gsissh.util.GFACGSISSHUtils;
import org.apache.airavata.gsi.ssh.api.Cluster;
import org.apache.airavata.gsi.ssh.api.SSHApiException;
import org.apache.airavata.gsi.ssh.api.job.JobDescriptor;
+import org.apache.airavata.model.appcatalog.appdeployment.ApplicationDeploymentDescription;
+import org.apache.airavata.model.appcatalog.computeresource.ComputeResourceDescription;
+import org.apache.airavata.model.appcatalog.computeresource.MonitorMode;
+import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManager;
+import org.apache.airavata.model.appcatalog.computeresource.SSHJobSubmission;
import org.apache.airavata.model.workspace.experiment.CorrectiveAction;
import org.apache.airavata.model.workspace.experiment.ErrorCategory;
import org.apache.airavata.model.workspace.experiment.JobDetails;
import org.apache.airavata.model.workspace.experiment.JobState;
-import org.apache.airavata.schemas.gfac.GsisshHostType;
+//import org.apache.airavata.schemas.gfac.GsisshHostType;
import org.apache.airavata.schemas.gfac.HostDescriptionType;
import org.apache.airavata.schemas.gfac.HpcApplicationDeploymentType;
import org.apache.zookeeper.KeeperException;
@@ -48,6 +56,7 @@ import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.management.monitor.Monitor;
import java.util.List;
import java.util.Map;
@@ -76,14 +85,18 @@ public class GSISSHProvider extends AbstractRecoverableProvider {
log.info("Invoking GSISSH Provider Invoke ...");
StringBuffer data = new StringBuffer();
jobExecutionContext.getNotifier().publish(new StartExecutionEvent());
- HostDescriptionType host = jobExecutionContext.getApplicationContext().
- getHostDescription().getType();
- HpcApplicationDeploymentType app = (HpcApplicationDeploymentType) jobExecutionContext.getApplicationContext().
- getApplicationDeploymentDescription().getType();
+ ComputeResourceDescription computeResourceDescription = jobExecutionContext.getApplicationContext()
+ .getComputeResourceDescription();
+ ApplicationDeploymentDescription appDeployDesc = jobExecutionContext.getApplicationContext()
+ .getApplicationDeploymentDescription();
JobDetails jobDetails = new JobDetails();
Cluster cluster = null;
-
+
try {
+ AppCatalog appCatalog = AppCatalogFactory.getAppCatalog();
+ SSHJobSubmission sshJobSubmission = appCatalog.getComputeResource().getSSHJobSubmission(
+ jobExecutionContext.getPreferredJobSubmissionInterface().getJobSubmissionInterfaceId());
+
if (jobExecutionContext.getSecurityContext(GSISecurityContext.GSI_SECURITY_CONTEXT) != null) {
cluster = ((GSISecurityContext) jobExecutionContext.getSecurityContext(GSISecurityContext.GSI_SECURITY_CONTEXT)).getPbsCluster();
}
@@ -93,7 +106,7 @@ public class GSISSHProvider extends AbstractRecoverableProvider {
log.info("Successfully retrieved the Security Context");
}
// This installed path is a mandetory field, because this could change based on the computing resource
- JobDescriptor jobDescriptor = GFACGSISSHUtils.createJobDescriptor(jobExecutionContext, app, cluster);
+ JobDescriptor jobDescriptor = GFACGSISSHUtils.createJobDescriptor(jobExecutionContext, cluster);
jobDetails.setJobName(jobDescriptor.getJobName());
log.info(jobDescriptor.toXML());
@@ -113,10 +126,10 @@ public class GSISSHProvider extends AbstractRecoverableProvider {
// Now job has submitted to the resource, its up to the Provider to parse the information to daemon handler
// to perform monitoring, daemon handlers can be accessed from anywhere
- delegateToMonitorHandlers(jobExecutionContext, (GsisshHostType) host, jobDetails.getJobID());
+ delegateToMonitorHandlers(jobExecutionContext, sshJobSubmission , jobDetails.getJobID());
// we know this host is type GsiSSHHostType
} catch (Exception e) {
- String error = "Error submitting the job to host " + host.getHostAddress() + " message: " + e.getMessage();
+ String error = "Error submitting the job to host " + computeResourceDescription.getHostName() + " message: " + e.getMessage();
log.error(error);
jobDetails.setJobID("none");
GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.FAILED);
@@ -130,18 +143,18 @@ public class GSISSHProvider extends AbstractRecoverableProvider {
}
- public void delegateToMonitorHandlers(JobExecutionContext jobExecutionContext, GsisshHostType host, String jobID) throws GFacHandlerException {
+ public void delegateToMonitorHandlers(JobExecutionContext jobExecutionContext, SSHJobSubmission sshJobSubmission, String jobID) throws GFacHandlerException {
List<ThreadedHandler> daemonHandlers = BetterGfacImpl.getDaemonHandlers();
if (daemonHandlers == null) {
daemonHandlers = BetterGfacImpl.getDaemonHandlers();
}
ThreadedHandler pullMonitorHandler = null;
ThreadedHandler pushMonitorHandler = null;
- String monitorMode = host.getMonitorMode();
+ MonitorMode monitorMode = sshJobSubmission.getMonitorMode();
for (ThreadedHandler threadedHandler : daemonHandlers) {
if ("org.apache.airavata.gfac.monitor.handlers.GridPullMonitorHandler".equals(threadedHandler.getClass().getName())) {
pullMonitorHandler = threadedHandler;
- if ("".equals(monitorMode) || monitorMode == null || org.apache.airavata.common.utils.Constants.PULL.equals(monitorMode)) {
+ if (monitorMode == null || monitorMode == MonitorMode.POLL_JOB_MANAGER) {
log.info("Job is launched successfully now parsing it to monitoring in pull mode, JobID Returned: " + jobID);
pullMonitorHandler.invoke(jobExecutionContext);
} else {
@@ -150,7 +163,7 @@ public class GSISSHProvider extends AbstractRecoverableProvider {
}
} else if ("org.apache.airavata.gfac.monitor.handlers.GridPushMonitorHandler".equals(threadedHandler.getClass().getName())) {
pushMonitorHandler = threadedHandler;
- if ("".equals(monitorMode) || monitorMode == null || org.apache.airavata.common.utils.Constants.PUSH.equals(monitorMode)) {
+ if (monitorMode == null || monitorMode == MonitorMode.XSEDE_AMQP_SUBSCRIBE) {
log.info("Job is launched successfully now parsing it to monitoring in push mode, JobID Returned: " + jobID);
pushMonitorHandler.invoke(jobExecutionContext);
} else {
@@ -166,18 +179,18 @@ public class GSISSHProvider extends AbstractRecoverableProvider {
}
}
- public void removeFromMonitorHandlers(JobExecutionContext jobExecutionContext, GsisshHostType host, String jobID) throws GFacHandlerException {
+ public void removeFromMonitorHandlers(JobExecutionContext jobExecutionContext, SSHJobSubmission sshJobSubmission, String jobID) throws GFacHandlerException {
List<ThreadedHandler> daemonHandlers = BetterGfacImpl.getDaemonHandlers();
if (daemonHandlers == null) {
daemonHandlers = BetterGfacImpl.getDaemonHandlers();
}
ThreadedHandler pullMonitorHandler = null;
ThreadedHandler pushMonitorHandler = null;
- String monitorMode = host.getMonitorMode();
+ MonitorMode monitorMode = sshJobSubmission.getMonitorMode();
for (ThreadedHandler threadedHandler : daemonHandlers) {
if ("org.apache.airavata.gfac.monitor.handlers.GridPullMonitorHandler".equals(threadedHandler.getClass().getName())) {
pullMonitorHandler = threadedHandler;
- if ("".equals(monitorMode) || monitorMode == null || org.apache.airavata.common.utils.Constants.PULL.equals(monitorMode)) {
+ if (monitorMode == null || monitorMode == MonitorMode.POLL_JOB_MANAGER) {
jobExecutionContext.setProperty("cancel","true");
pullMonitorHandler.invoke(jobExecutionContext);
} else {
@@ -186,7 +199,7 @@ public class GSISSHProvider extends AbstractRecoverableProvider {
}
} else if ("org.apache.airavata.gfac.monitor.handlers.GridPushMonitorHandler".equals(threadedHandler.getClass().getName())) {
pushMonitorHandler = threadedHandler;
- if ("".equals(monitorMode) || monitorMode == null || org.apache.airavata.common.utils.Constants.PUSH.equals(monitorMode)) {
+ if ( monitorMode == null || monitorMode == MonitorMode.XSEDE_AMQP_SUBSCRIBE) {
pushMonitorHandler.invoke(jobExecutionContext);
} else {
log.error("Currently we only support Pull and Push monitoring and monitorMode should be PUSH" +
@@ -208,8 +221,6 @@ public class GSISSHProvider extends AbstractRecoverableProvider {
public void cancelJob(JobExecutionContext jobExecutionContext) throws GFacProviderException,GFacException {
//To change body of implemented methods use File | Settings | File Templates.
log.info("canceling the job status in GSISSHProvider!!!!!");
- HostDescriptionType host = jobExecutionContext.getApplicationContext().
- getHostDescription().getType();
JobDetails jobDetails = jobExecutionContext.getJobDetails();
try {
Cluster cluster = null;
@@ -236,14 +247,14 @@ public class GSISSHProvider extends AbstractRecoverableProvider {
GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.CANCELED);
// we know this host is type GsiSSHHostType
} catch (SSHApiException e) {
- String error = "Error submitting the job to host " + host.getHostAddress() + " message: " + e.getMessage();
+ String error = "Error submitting the job to host " + jobExecutionContext.getHostName() + " message: " + e.getMessage();
log.error(error);
jobDetails.setJobID("none");
GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.FAILED);
GFacUtils.saveErrorDetails(jobExecutionContext, error, CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
throw new GFacProviderException(error, e);
} catch (Exception e) {
- String error = "Error submitting the job to host " + host.getHostAddress() + " message: " + e.getMessage();
+ String error = "Error submitting the job to host " + jobExecutionContext.getHostName() + " message: " + e.getMessage();
log.error(error);
jobDetails.setJobID("none");
GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.FAILED);
@@ -255,8 +266,8 @@ public class GSISSHProvider extends AbstractRecoverableProvider {
public void recover(JobExecutionContext jobExecutionContext) throws GFacProviderException,GFacException {
// have to implement the logic to recover a gfac failure
log.info("Invoking Recovering for the Experiment: " + jobExecutionContext.getExperimentID());
- HostDescriptionType host = jobExecutionContext.getApplicationContext().
- getHostDescription().getType();
+ ComputeResourceDescription computeResourceDescription = jobExecutionContext.getApplicationContext()
+ .getComputeResourceDescription();
String jobId = "";
String jobDesc = "";
try {
@@ -306,8 +317,11 @@ public class GSISSHProvider extends AbstractRecoverableProvider {
throw new GFacHandlerException("Error while creating SSHSecurityContext", e, e.getLocalizedMessage());
}
}
- delegateToMonitorHandlers(jobExecutionContext, (GsisshHostType) host, jobId);
- } catch (GFacHandlerException e) {
+ AppCatalog appCatalog = AppCatalogFactory.getAppCatalog();
+ SSHJobSubmission sshJobSubmission = appCatalog.getComputeResource().getSSHJobSubmission(
+ jobExecutionContext.getPreferredJobSubmissionInterface().getJobSubmissionInterfaceId());
+ delegateToMonitorHandlers(jobExecutionContext, sshJobSubmission, jobId);
+ } catch (Exception e) {
throw new GFacProviderException("Error delegating already ran job to Monitoring", e);
}
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/f29dfbe7/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/util/GFACGSISSHUtils.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/util/GFACGSISSHUtils.java b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/util/GFACGSISSHUtils.java
index 4d338e3..baca65c 100644
--- a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/util/GFACGSISSHUtils.java
+++ b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/util/GFACGSISSHUtils.java
@@ -20,21 +20,19 @@
*/
package org.apache.airavata.gfac.gsissh.util;
-import java.sql.SQLException;
-import java.util.*;
-
+import org.airavata.appcatalog.cpi.AppCatalog;
+import org.apache.aiaravata.application.catalog.data.impl.AppCatalogFactory;
import org.apache.airavata.common.exception.ApplicationSettingsException;
import org.apache.airavata.common.utils.ServerSettings;
import org.apache.airavata.common.utils.StringUtil;
import org.apache.airavata.commons.gfac.type.ActualParameter;
-import org.apache.airavata.commons.gfac.type.HostDescription;
import org.apache.airavata.commons.gfac.type.MappingFactory;
-import org.apache.airavata.credential.store.credential.Credential;
import org.apache.airavata.credential.store.credential.impl.certificate.CertificateCredential;
import org.apache.airavata.credential.store.store.CredentialReader;
import org.apache.airavata.gfac.Constants;
import org.apache.airavata.gfac.GFacException;
import org.apache.airavata.gfac.RequestData;
+import org.apache.airavata.gfac.core.context.ApplicationContext;
import org.apache.airavata.gfac.core.context.JobExecutionContext;
import org.apache.airavata.gfac.core.context.MessageContext;
import org.apache.airavata.gfac.core.utils.GFacUtils;
@@ -47,22 +45,26 @@ import org.apache.airavata.gsi.ssh.api.job.JobManagerConfiguration;
import org.apache.airavata.gsi.ssh.impl.GSISSHAbstractCluster;
import org.apache.airavata.gsi.ssh.impl.PBSCluster;
import org.apache.airavata.gsi.ssh.util.CommonUtils;
+import org.apache.airavata.model.appcatalog.appdeployment.ApplicationDeploymentDescription;
+import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionInterface;
+import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionProtocol;
+import org.apache.airavata.model.appcatalog.computeresource.SSHJobSubmission;
+import org.apache.airavata.model.appcatalog.computeresource.SecurityProtocol;
import org.apache.airavata.model.workspace.experiment.ComputationalResourceScheduling;
import org.apache.airavata.model.workspace.experiment.TaskDetails;
-import org.apache.airavata.schemas.gfac.ApplicationDeploymentDescriptionType;
import org.apache.airavata.schemas.gfac.FileArrayType;
-import org.apache.airavata.schemas.gfac.GlobusHostType;
-import org.apache.airavata.schemas.gfac.GsisshHostType;
import org.apache.airavata.schemas.gfac.HpcApplicationDeploymentType;
-import org.apache.airavata.schemas.gfac.SSHHostType;
import org.apache.airavata.schemas.gfac.StringArrayType;
import org.apache.airavata.schemas.gfac.URIArrayType;
-import org.apache.airavata.schemas.gfac.UnicoreHostType;
-import org.apache.openjpa.lib.log.Log;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.validation.constraints.Max;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
public class GFACGSISSHUtils {
@@ -74,32 +76,35 @@ public class GFACGSISSHUtils {
public static int maxClusterCount = 5;
public static Map<String, List<Cluster>> clusters = new HashMap<String, List<Cluster>>();
public static void addSecurityContext(JobExecutionContext jobExecutionContext) throws GFacException, ApplicationSettingsException {
- HostDescription registeredHost = jobExecutionContext.getApplicationContext().getHostDescription();
- if (registeredHost.getType() instanceof GlobusHostType || registeredHost.getType() instanceof UnicoreHostType
- || registeredHost.getType() instanceof SSHHostType) {
- logger.error("This is a wrong method to invoke to non ssh host types,please check your gfac-config.xml");
- } else if (registeredHost.getType() instanceof GsisshHostType) {
- String credentialStoreToken = jobExecutionContext.getCredentialStoreToken(); // this is set by the framework
- RequestData requestData = new RequestData(ServerSettings.getDefaultUserGateway());
- requestData.setTokenId(credentialStoreToken);
- PBSCluster pbsCluster = null;
- GSISecurityContext context = null;
- try {
+ JobSubmissionInterface jobSubmissionInterface = jobExecutionContext.getPreferredJobSubmissionInterface();
+ JobSubmissionProtocol jobProtocol = jobSubmissionInterface.getJobSubmissionProtocol();
+ try {
+ AppCatalog appCatalog = AppCatalogFactory.getAppCatalog();
+ SSHJobSubmission sshJobSubmission = appCatalog.getComputeResource().getSSHJobSubmission(jobSubmissionInterface.getJobSubmissionInterfaceId());
+ if (jobProtocol == JobSubmissionProtocol.GLOBUS || jobProtocol == JobSubmissionProtocol.UNICORE
+ || jobProtocol == JobSubmissionProtocol.CLOUD || jobProtocol == JobSubmissionProtocol.LOCAL) {
+ logger.error("This is a wrong method to invoke to non ssh host types,please check your gfac-config.xml");
+ } else if (jobProtocol == JobSubmissionProtocol.SSH && sshJobSubmission.getSecurityProtocol() == SecurityProtocol.GSI) {
+ String credentialStoreToken = jobExecutionContext.getCredentialStoreToken(); // this is set by the framework
+ RequestData requestData = new RequestData(ServerSettings.getDefaultUserGateway());
+ requestData.setTokenId(credentialStoreToken);
+ PBSCluster pbsCluster = null;
+ GSISecurityContext context = null;
+
TokenizedMyProxyAuthInfo tokenizedMyProxyAuthInfo = new TokenizedMyProxyAuthInfo(requestData);
CredentialReader credentialReader = GFacUtils.getCredentialReader();
- if(credentialReader != null){
- CertificateCredential credential = null;
- try {
- credential = (CertificateCredential)credentialReader.getCredential(ServerSettings.getDefaultUserGateway(), credentialStoreToken);
- requestData.setMyProxyUserName(credential.getCommunityUser().getUserName());
- } catch (Exception e) {
- logger.error(e.getLocalizedMessage());
- }
+ if (credentialReader != null) {
+ CertificateCredential credential = null;
+ try {
+ credential = (CertificateCredential) credentialReader.getCredential(ServerSettings.getDefaultUserGateway(), credentialStoreToken);
+ requestData.setMyProxyUserName(credential.getCommunityUser().getUserName());
+ } catch (Exception e) {
+ logger.error(e.getLocalizedMessage());
+ }
}
- GsisshHostType gsisshHostType = (GsisshHostType) registeredHost.getType();
- String key = requestData.getMyProxyUserName() + registeredHost.getType().getHostAddress() +
- gsisshHostType.getPort();
+ String key = requestData.getMyProxyUserName() + jobExecutionContext.getHostName()+
+ sshJobSubmission.getSshPort();
boolean recreate = false;
synchronized (clusters) {
if (clusters.containsKey(key) && clusters.get(key).size() < maxClusterCount) {
@@ -112,7 +117,7 @@ public class GFACGSISSHUtils {
clusters.get(key).remove(i);
recreate = true;
}
- if(!recreate) {
+ if (!recreate) {
try {
pbsCluster.listDirectory("~/"); // its hard to trust isConnected method, so we try to connect if it works we are good,else we recreate
} catch (Exception e) {
@@ -129,13 +134,12 @@ public class GFACGSISSHUtils {
}
if (recreate) {
- ServerInfo serverInfo = new ServerInfo(requestData.getMyProxyUserName(), registeredHost.getType().getHostAddress(),
- gsisshHostType.getPort());
+ ServerInfo serverInfo = new ServerInfo(requestData.getMyProxyUserName(), jobExecutionContext.getHostName(),
+ sshJobSubmission.getSshPort());
JobManagerConfiguration jConfig = null;
- String installedParentPath = ((HpcApplicationDeploymentType)
- jobExecutionContext.getApplicationContext().getApplicationDeploymentDescription().getType()).getInstalledParentPath();
- String jobManager = ((GsisshHostType) registeredHost.getType()).getJobManager();
+ String installedParentPath = sshJobSubmission.getResourceJobManager().getJobManagerBinPath();
+ String jobManager = sshJobSubmission.getResourceJobManager().getResourceJobManagerType().toString();
if (jobManager == null) {
logger.error("No Job Manager is configured, so we are picking pbs as the default job manager");
jConfig = CommonUtils.getPBSJobManager(installedParentPath);
@@ -160,28 +164,30 @@ public class GFACGSISSHUtils {
clusters.put(key, pbsClusters);
}
}
- } catch (Exception e) {
- throw new GFacException("An error occurred while creating GSI security context", e);
+
+ jobExecutionContext.addSecurityContext(Constants.GSI_SECURITY_CONTEXT, context);
}
- jobExecutionContext.addSecurityContext(Constants.GSI_SECURITY_CONTEXT, context);
+ } catch (Exception e) {
+ throw new GFacException("An error occurred while creating GSI security context", e);
}
}
- public static JobDescriptor createJobDescriptor(JobExecutionContext jobExecutionContext,
- ApplicationDeploymentDescriptionType app, Cluster cluster) {
+ public static JobDescriptor createJobDescriptor(JobExecutionContext jobExecutionContext, Cluster cluster) {
JobDescriptor jobDescriptor = new JobDescriptor();
+ ApplicationContext applicationContext = jobExecutionContext.getApplicationContext();
+ ApplicationDeploymentDescription app = applicationContext.getApplicationDeploymentDescription();
// this is common for any application descriptor
jobDescriptor.setCallBackIp(ServerSettings.getIp());
jobDescriptor.setCallBackPort(ServerSettings.getSetting(org.apache.airavata.common.utils.Constants.GFAC_SERVER_PORT, "8950"));
- jobDescriptor.setInputDirectory(app.getInputDataDirectory());
- jobDescriptor.setOutputDirectory(app.getOutputDataDirectory());
- jobDescriptor.setExecutablePath(app.getExecutableLocation());
- jobDescriptor.setStandardOutFile(app.getStandardOutput());
- jobDescriptor.setStandardErrorFile(app.getStandardError());
+ jobDescriptor.setInputDirectory(jobExecutionContext.getInputDir());
+ jobDescriptor.setOutputDirectory(jobExecutionContext.getOutputDir());
+ jobDescriptor.setExecutablePath(app.getExecutablePath());
+ jobDescriptor.setStandardOutFile(jobExecutionContext.getStandardOutput());
+ jobDescriptor.setStandardErrorFile(jobExecutionContext.getStandardError());
Random random = new Random();
int i = random.nextInt(Integer.MAX_VALUE); // We always set the job name
jobDescriptor.setJobName("A" + String.valueOf(i+99999999));
- jobDescriptor.setWorkingDirectory(app.getStaticWorkingDirectory());
+ jobDescriptor.setWorkingDirectory(jobExecutionContext.getWorkingDir());
List<String> inputValues = new ArrayList<String>();
MessageContext input = jobExecutionContext.getInMessageContext();
http://git-wip-us.apache.org/repos/asf/airavata/blob/f29dfbe7/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/AMQPMonitor.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/AMQPMonitor.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/AMQPMonitor.java
index baab7b4..28d13f2 100644
--- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/AMQPMonitor.java
+++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/AMQPMonitor.java
@@ -30,12 +30,12 @@ import java.util.concurrent.BlockingQueue;
import org.apache.airavata.common.utils.MonitorPublisher;
import org.apache.airavata.common.utils.ServerSettings;
-import org.apache.airavata.commons.gfac.type.HostDescription;
import org.apache.airavata.gfac.core.monitor.MonitorID;
import org.apache.airavata.gfac.monitor.core.PushMonitor;
import org.apache.airavata.gfac.monitor.exception.AiravataMonitorException;
import org.apache.airavata.gfac.monitor.util.AMQPConnectionUtil;
import org.apache.airavata.gfac.monitor.util.CommonUtils;
+import org.apache.airavata.model.appcatalog.computeresource.ComputeResourceDescription;
import org.apache.airavata.model.messaging.event.JobIdentifier;
import org.apache.airavata.model.messaging.event.JobStatusChangeEvent;
import org.apache.airavata.model.workspace.experiment.JobState;
@@ -107,30 +107,37 @@ public class AMQPMonitor extends PushMonitor {
@Override
public boolean registerListener(MonitorID monitorID) throws AiravataMonitorException {
// we subscribe to read user-host based subscription
- HostDescription host = monitorID.getHost();
- String hostAddress = host.getType().getHostAddress();
- // in amqp case there are no multiple jobs per each host, because once a job is put in to the queue it
- // will be picked by the Monitor, so jobs will not stay in this queueu but jobs will stay in finishQueue
- String channelID = CommonUtils.getChannelID(monitorID);
- if(availableChannels.get(channelID) == null){
- try {
- //todo need to fix this rather getting it from a file
- Connection connection = AMQPConnectionUtil.connect(amqpHosts, connectionName, proxyPath);
- Channel channel = null;
- channel = connection.createChannel();
- availableChannels.put(channelID, channel);
- String queueName = channel.queueDeclare().getQueue();
-
- BasicConsumer consumer = new
- BasicConsumer(new JSONMessageParser(), localPublisher); // here we use local publisher
- channel.basicConsume(queueName, true, consumer);
- String filterString = CommonUtils.getRoutingKey(monitorID.getUserName(), hostAddress);
- // here we queuebind to a particular user in a particular machine
- channel.queueBind(queueName, "glue2.computing_activity", filterString);
- logger.info("Using filtering string to monitor: " + filterString);
- } catch (IOException e) {
- logger.error("Error creating the connection to finishQueue the job:" + monitorID.getUserName());
- }
+ ComputeResourceDescription computeResourceDescription = monitorID.getComputeResourceDescription();
+ if (computeResourceDescription.isSetIpAddresses() && computeResourceDescription.getIpAddresses().size() > 0) {
+ // we get first ip address for the moment
+ String hostAddress = computeResourceDescription.getIpAddresses().get(0);
+ // in amqp case there are no multiple jobs per each host, because once a job is put in to the queue it
+ // will be picked by the Monitor, so jobs will not stay in this queueu but jobs will stay in finishQueue
+ String channelID = CommonUtils.getChannelID(monitorID);
+ if (availableChannels.get(channelID) == null) {
+ try {
+ //todo need to fix this rather getting it from a file
+ Connection connection = AMQPConnectionUtil.connect(amqpHosts, connectionName, proxyPath);
+ Channel channel = null;
+ channel = connection.createChannel();
+ availableChannels.put(channelID, channel);
+ String queueName = channel.queueDeclare().getQueue();
+
+ BasicConsumer consumer = new
+ BasicConsumer(new JSONMessageParser(), localPublisher); // here we use local publisher
+ channel.basicConsume(queueName, true, consumer);
+ String filterString = CommonUtils.getRoutingKey(monitorID.getUserName(), hostAddress);
+ // here we queuebind to a particular user in a particular machine
+ channel.queueBind(queueName, "glue2.computing_activity", filterString);
+ logger.info("Using filtering string to monitor: " + filterString);
+ } catch (IOException e) {
+ logger.error("Error creating the connection to finishQueue the job:" + monitorID.getUserName());
+ }
+ }
+ } else {
+ throw new AiravataMonitorException("Couldn't register monitor for jobId :" + monitorID.getJobID() +
+ " , ComputeResourceDescription " + computeResourceDescription.getHostName() + " doesn't has an " +
+ "IpAddress with it");
}
return true;
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/f29dfbe7/modules/gfac/gfac-monitor/src/test/java/org/apache/airavata/job/AMQPMonitorTest.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/test/java/org/apache/airavata/job/AMQPMonitorTest.java b/modules/gfac/gfac-monitor/src/test/java/org/apache/airavata/job/AMQPMonitorTest.java
index 94528b9..a979890 100644
--- a/modules/gfac/gfac-monitor/src/test/java/org/apache/airavata/job/AMQPMonitorTest.java
+++ b/modules/gfac/gfac-monitor/src/test/java/org/apache/airavata/job/AMQPMonitorTest.java
@@ -20,15 +20,11 @@
*/
package org.apache.airavata.job;
-import java.io.File;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-
+import com.google.common.eventbus.EventBus;
+import com.google.common.eventbus.Subscribe;
+import org.airavata.appcatalog.cpi.AppCatalog;
+import org.apache.aiaravata.application.catalog.data.impl.AppCatalogFactory;
import org.apache.airavata.common.utils.MonitorPublisher;
-import org.apache.airavata.commons.gfac.type.HostDescription;
import org.apache.airavata.gfac.core.monitor.MonitorID;
import org.apache.airavata.gfac.monitor.impl.push.amqp.AMQPMonitor;
import org.apache.airavata.gsi.ssh.api.Cluster;
@@ -38,14 +34,29 @@ import org.apache.airavata.gsi.ssh.api.authentication.GSIAuthenticationInfo;
import org.apache.airavata.gsi.ssh.api.job.JobDescriptor;
import org.apache.airavata.gsi.ssh.impl.PBSCluster;
import org.apache.airavata.gsi.ssh.impl.authentication.MyProxyAuthenticationInfo;
+import org.apache.airavata.model.appcatalog.computeresource.ComputeResourceDescription;
+import org.apache.airavata.model.appcatalog.computeresource.DataMovementInterface;
+import org.apache.airavata.model.appcatalog.computeresource.DataMovementProtocol;
+import org.apache.airavata.model.appcatalog.computeresource.JobManagerCommand;
+import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionInterface;
+import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionProtocol;
+import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManager;
+import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManagerType;
+import org.apache.airavata.model.appcatalog.computeresource.SSHJobSubmission;
+import org.apache.airavata.model.appcatalog.computeresource.SecurityProtocol;
import org.apache.airavata.model.messaging.event.JobStatusChangeEvent;
-import org.apache.airavata.schemas.gfac.GsisshHostType;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
-import com.google.common.eventbus.EventBus;
-import com.google.common.eventbus.Subscribe;
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
public class AMQPMonitorTest {
@@ -54,12 +65,13 @@ public class AMQPMonitorTest {
private String certificateLocation;
private String pbsFilePath;
private String workingDirectory;
- private HostDescription hostDescription;
private MonitorPublisher monitorPublisher;
private BlockingQueue<MonitorID> finishQueue;
private BlockingQueue<MonitorID> pushQueue;
private Thread pushThread;
private String proxyFilePath;
+ private ComputeResourceDescription computeResourceDescription;
+
@Before
public void setUp() throws Exception {
System.setProperty("myproxy.username", "ogce");
@@ -98,14 +110,26 @@ public class AMQPMonitorTest {
} catch (Exception e) {
e.printStackTrace();
}
+ computeResourceDescription = new ComputeResourceDescription("TestComputerResoruceId", "TestHostName");
+ computeResourceDescription.setHostName("stampede-host");
+ computeResourceDescription.addToIpAddresses("login1.stampede.tacc.utexas.edu");
+ ResourceJobManager resourceJobManager = new ResourceJobManager("1234", ResourceJobManagerType.SLURM);
+ Map<JobManagerCommand, String> commandMap = new HashMap<JobManagerCommand, String>();
+ commandMap.put(JobManagerCommand.SUBMISSION, "test");
+ resourceJobManager.setJobManagerCommands(commandMap);
+ resourceJobManager.setJobManagerBinPath("/usr/bin/");
+ resourceJobManager.setPushMonitoringEndpoint("push"); // TODO - add monitor mode
+ SSHJobSubmission sshJobSubmission = new SSHJobSubmission("TestSSHJobSubmissionInterfaceId", SecurityProtocol.GSI,
+ resourceJobManager);
+
+ AppCatalog appCatalog = AppCatalogFactory.getAppCatalog();
+ String jobSubmissionID = appCatalog.getComputeResource().addSSHJobSubmission(sshJobSubmission);
+
+ JobSubmissionInterface jobSubmissionInterface = new JobSubmissionInterface(jobSubmissionID, JobSubmissionProtocol.SSH, 1);
+
+ computeResourceDescription.addToJobSubmissionInterfaces(jobSubmissionInterface);
+ computeResourceDescription.addToDataMovementInterfaces(new DataMovementInterface("4532", DataMovementProtocol.SCP, 1));
- hostDescription = new HostDescription(GsisshHostType.type);
- hostDescription.getType().setHostAddress("login1.stampede.tacc.utexas.edu");
- hostDescription.getType().setHostName("stampede-host");
- ((GsisshHostType) hostDescription.getType()).setJobManager("slurm");
- ((GsisshHostType) hostDescription.getType()).setInstalledPath("/usr/bin/");
- ((GsisshHostType) hostDescription.getType()).setPort(2222);
- ((GsisshHostType) hostDescription.getType()).setMonitorMode("push");
}
@Test
@@ -151,7 +175,7 @@ public class AMQPMonitorTest {
String jobID = pbsCluster.submitBatchJob(jobDescriptor);
System.out.println(jobID);
try {
- pushQueue.add(new MonitorID(hostDescription, jobID,null,null,null, "ogce", jobName));
+ pushQueue.add(new MonitorID(computeResourceDescription, jobID,null,null,null, "ogce", jobName));
} catch (Exception e) {
e.printStackTrace();
}