You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by ch...@apache.org on 2014/11/11 21:12:06 UTC

[02/50] [abbrv] airavata git commit: Removed legacy descriptions from MonitorID, GSISSH provider and utils and AMQPMonitor classes

Removed legacy descriptions from the MonitorID, GSISSHProvider, GFACGSISSHUtils, and AMQPMonitor classes


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/f29dfbe7
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/f29dfbe7
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/f29dfbe7

Branch: refs/heads/master
Commit: f29dfbe709c322703225e7f14f1bb1ffd1129a02
Parents: 14bd941
Author: shamrath <sh...@gmail.com>
Authored: Fri Oct 31 12:25:31 2014 -0400
Committer: shamrath <sh...@gmail.com>
Committed: Fri Oct 31 12:25:31 2014 -0400

----------------------------------------------------------------------
 .../data/impl/GwyResourceProfileImpl.java       |   8 +-
 .../data/util/AppCatalogThriftConversion.java   |   4 +-
 .../app/catalog/test/GatewayProfileTest.java    |   8 +-
 .../gfac/core/context/JobExecutionContext.java  |   4 +
 .../airavata/gfac/core/cpi/BetterGfacImpl.java  |  33 +++---
 .../airavata/gfac/core/monitor/MonitorID.java   |  19 ++--
 .../gsissh/provider/impl/GSISSHProvider.java    |  64 ++++++-----
 .../gfac/gsissh/util/GFACGSISSHUtils.java       | 108 ++++++++++---------
 .../monitor/impl/push/amqp/AMQPMonitor.java     |  57 +++++-----
 .../apache/airavata/job/AMQPMonitorTest.java    |  64 +++++++----
 10 files changed, 213 insertions(+), 156 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/f29dfbe7/modules/app-catalog/app-catalog-data/src/main/java/org/apache/aiaravata/application/catalog/data/impl/GwyResourceProfileImpl.java
----------------------------------------------------------------------
diff --git a/modules/app-catalog/app-catalog-data/src/main/java/org/apache/aiaravata/application/catalog/data/impl/GwyResourceProfileImpl.java b/modules/app-catalog/app-catalog-data/src/main/java/org/apache/aiaravata/application/catalog/data/impl/GwyResourceProfileImpl.java
index ed66bff..101b647 100644
--- a/modules/app-catalog/app-catalog-data/src/main/java/org/apache/aiaravata/application/catalog/data/impl/GwyResourceProfileImpl.java
+++ b/modules/app-catalog/app-catalog-data/src/main/java/org/apache/aiaravata/application/catalog/data/impl/GwyResourceProfileImpl.java
@@ -66,8 +66,8 @@ public class GwyResourceProfileImpl implements GwyResourceProfile {
                     resource.setComputeHostResource((ComputeResourceResource)computeHostResource.get(preference.getComputeResourceId()));
                     resource.setGatewayId(profileResource.getGatewayID());
                     resource.setOverrideByAiravata(preference.isOverridebyAiravata());
-                    resource.setPreferredJobProtocol(preference.getPreferredJobSubmissionProtocol());
-                    resource.setPreferedDMProtocol(preference.getPreferredDataMovementProtocol());
+                    resource.setPreferredJobProtocol(preference.getPreferredJobSubmissionProtocol().toString());
+                    resource.setPreferedDMProtocol(preference.getPreferredDataMovementProtocol().toString());
                     resource.setBatchQueue(preference.getPreferredBatchQueue());
                     resource.setProjectNumber(preference.getAllocationProjectNumber());
                     resource.setScratchLocation(preference.getScratchLocation());
@@ -100,8 +100,8 @@ public class GwyResourceProfileImpl implements GwyResourceProfile {
                     resource.setComputeHostResource((ComputeResourceResource)computeHostResource.get(preference.getComputeResourceId()));
                     resource.setGatewayId(gatewayId);
                     resource.setOverrideByAiravata(preference.isOverridebyAiravata());
-                    resource.setPreferredJobProtocol(preference.getPreferredJobSubmissionProtocol());
-                    resource.setPreferedDMProtocol(preference.getPreferredDataMovementProtocol());
+                    resource.setPreferredJobProtocol(preference.getPreferredJobSubmissionProtocol().toString());
+                    resource.setPreferedDMProtocol(preference.getPreferredDataMovementProtocol().toString());
                     resource.setBatchQueue(preference.getPreferredBatchQueue());
                     resource.setProjectNumber(preference.getAllocationProjectNumber());
                     resource.setScratchLocation(preference.getScratchLocation());

http://git-wip-us.apache.org/repos/asf/airavata/blob/f29dfbe7/modules/app-catalog/app-catalog-data/src/main/java/org/apache/aiaravata/application/catalog/data/util/AppCatalogThriftConversion.java
----------------------------------------------------------------------
diff --git a/modules/app-catalog/app-catalog-data/src/main/java/org/apache/aiaravata/application/catalog/data/util/AppCatalogThriftConversion.java b/modules/app-catalog/app-catalog-data/src/main/java/org/apache/aiaravata/application/catalog/data/util/AppCatalogThriftConversion.java
index 05cfa11..bc435f4 100644
--- a/modules/app-catalog/app-catalog-data/src/main/java/org/apache/aiaravata/application/catalog/data/util/AppCatalogThriftConversion.java
+++ b/modules/app-catalog/app-catalog-data/src/main/java/org/apache/aiaravata/application/catalog/data/util/AppCatalogThriftConversion.java
@@ -670,8 +670,8 @@ public class AppCatalogThriftConversion {
         ComputeResourcePreference preference = new ComputeResourcePreference();
         preference.setComputeResourceId(resource.getResourceId());
         preference.setOverridebyAiravata(resource.getOverrideByAiravata());
-        preference.setPreferredJobSubmissionProtocol(resource.getPreferredJobProtocol());
-        preference.setPreferredDataMovementProtocol(resource.getPreferedDMProtocol());
+        preference.setPreferredJobSubmissionProtocol(JobSubmissionProtocol.valueOf(resource.getPreferredJobProtocol()));
+        preference.setPreferredDataMovementProtocol(DataMovementProtocol.valueOf(resource.getPreferedDMProtocol()));
         preference.setPreferredBatchQueue(resource.getBatchQueue());
         preference.setScratchLocation(resource.getScratchLocation());
         preference.setAllocationProjectNumber(resource.getProjectNumber());

http://git-wip-us.apache.org/repos/asf/airavata/blob/f29dfbe7/modules/app-catalog/app-catalog-data/src/test/java/org/apache/airavata/app/catalog/test/GatewayProfileTest.java
----------------------------------------------------------------------
diff --git a/modules/app-catalog/app-catalog-data/src/test/java/org/apache/airavata/app/catalog/test/GatewayProfileTest.java b/modules/app-catalog/app-catalog-data/src/test/java/org/apache/airavata/app/catalog/test/GatewayProfileTest.java
index 66eb6bb..3593e11 100644
--- a/modules/app-catalog/app-catalog-data/src/test/java/org/apache/airavata/app/catalog/test/GatewayProfileTest.java
+++ b/modules/app-catalog/app-catalog-data/src/test/java/org/apache/airavata/app/catalog/test/GatewayProfileTest.java
@@ -84,8 +84,8 @@ public class GatewayProfileTest {
         ComputeResourcePreference preference1 = new ComputeResourcePreference();
         preference1.setComputeResourceId(hostId1);
         preference1.setOverridebyAiravata(true);
-        preference1.setPreferredJobSubmissionProtocol(JobSubmissionProtocol.SSH.toString());
-        preference1.setPreferredDataMovementProtocol(DataMovementProtocol.SCP.toString());
+        preference1.setPreferredJobSubmissionProtocol(JobSubmissionProtocol.SSH);
+                preference1.setPreferredDataMovementProtocol(DataMovementProtocol.SCP);
         preference1.setPreferredBatchQueue("queue1");
         preference1.setScratchLocation("/tmp");
         preference1.setAllocationProjectNumber("project1");
@@ -93,8 +93,8 @@ public class GatewayProfileTest {
         ComputeResourcePreference preference2 = new ComputeResourcePreference();
         preference2.setComputeResourceId(hostId2);
         preference2.setOverridebyAiravata(true);
-        preference2.setPreferredJobSubmissionProtocol(JobSubmissionProtocol.LOCAL.toString());
-        preference2.setPreferredDataMovementProtocol(DataMovementProtocol.GridFTP.toString());
+        preference2.setPreferredJobSubmissionProtocol(JobSubmissionProtocol.LOCAL);
+        preference2.setPreferredDataMovementProtocol(DataMovementProtocol.GridFTP);
         preference2.setPreferredBatchQueue("queue2");
         preference2.setScratchLocation("/tmp");
         preference2.setAllocationProjectNumber("project2");

http://git-wip-us.apache.org/repos/asf/airavata/blob/f29dfbe7/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/JobExecutionContext.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/JobExecutionContext.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/JobExecutionContext.java
index a95540c..891bece 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/JobExecutionContext.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/JobExecutionContext.java
@@ -430,4 +430,8 @@ public class JobExecutionContext extends AbstractContext implements Serializable
     public void setPreferredJobSubmissionInterface(JobSubmissionInterface preferredJobSubmissionInterface) {
         this.preferredJobSubmissionInterface = preferredJobSubmissionInterface;
     }
+
+    public String getHostName() {
+        return applicationContext.getComputeResourceDescription().getHostName();
+    }
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/f29dfbe7/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
index 696b61b..e8e4c66 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
@@ -302,6 +302,20 @@ public class BetterGfacImpl implements GFac,Watcher {
         jobExecutionContext.setGfac(this);
         jobExecutionContext.setZk(zk);
         jobExecutionContext.setCredentialStoreToken(AiravataZKUtils.getExpTokenId(zk, experimentID, taskID));
+
+        List<JobSubmissionInterface> jobSubmissionInterfaces = computeResource.getJobSubmissionInterfaces();
+        if (jobSubmissionInterfaces != null && !jobSubmissionInterfaces.isEmpty()){
+            Collections.sort(jobSubmissionInterfaces, new Comparator<JobSubmissionInterface>() {
+                @Override
+                public int compare(JobSubmissionInterface jobSubmissionInterface, JobSubmissionInterface jobSubmissionInterface2) {
+                    return jobSubmissionInterface.getPriorityOrder() - jobSubmissionInterface2.getPriorityOrder();
+                }
+            });
+
+            jobExecutionContext.setHostPrioritizedJobSubmissionInterfaces(jobSubmissionInterfaces);
+        }else {
+            throw new GFacException("Compute resource should have at least one job submission interface defined...");
+        }
         if (gatewayResourcePreferences != null ) {
             if (gatewayResourcePreferences.getScratchLocation() == null) {
                 gatewayResourcePreferences.setScratchLocation("/tmp");
@@ -326,22 +340,11 @@ public class BetterGfacImpl implements GFac,Watcher {
             jobExecutionContext.setStandardError(workingDir + File.separator + applicationInterface.getApplicationName().replaceAll("\\s+", "") + ".stderr");
 
             jobExecutionContext.setPreferredJobSubmissionProtocol(gatewayResourcePreferences.getPreferredJobSubmissionProtocol());
+            if (gatewayResourcePreferences.getPreferredJobSubmissionProtocol() == null) {
+                jobExecutionContext.setPreferredJobSubmissionInterface(jobExecutionContext.getHostPrioritizedJobSubmissionInterfaces().get(0));
+                jobExecutionContext.setPreferredJobSubmissionProtocol(jobExecutionContext.getPreferredJobSubmissionInterface().getJobSubmissionProtocol());
+            }
         }
-
-        List<JobSubmissionInterface> jobSubmissionInterfaces = computeResource.getJobSubmissionInterfaces();
-        if (jobSubmissionInterfaces != null && !jobSubmissionInterfaces.isEmpty()){
-            Collections.sort(jobSubmissionInterfaces, new Comparator<JobSubmissionInterface>() {
-                @Override
-                public int compare(JobSubmissionInterface jobSubmissionInterface, JobSubmissionInterface jobSubmissionInterface2) {
-                    return jobSubmissionInterface.getPriorityOrder() - jobSubmissionInterface2.getPriorityOrder();
-                }
-            });
-
-            jobExecutionContext.setHostPrioritizedJobSubmissionInterfaces(jobSubmissionInterfaces);
-        }else {
-            throw new GFacException("Compute resource should have at least one job submission interface defined...");
-        }
-
         return jobExecutionContext;
     }
 

http://git-wip-us.apache.org/repos/asf/airavata/blob/f29dfbe7/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/MonitorID.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/MonitorID.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/MonitorID.java
index 6ea1839..55da288 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/MonitorID.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/MonitorID.java
@@ -22,7 +22,6 @@ package org.apache.airavata.gfac.core.monitor;
 
 import org.apache.airavata.common.logger.AiravataLogger;
 import org.apache.airavata.common.logger.AiravataLoggerFactory;
-import org.apache.airavata.commons.gfac.type.HostDescription;
 import org.apache.airavata.gfac.core.context.JobExecutionContext;
 import org.apache.airavata.model.appcatalog.computeresource.ComputeResourceDescription;
 import org.apache.airavata.model.workspace.experiment.JobState;
@@ -44,7 +43,7 @@ public class MonitorID {
 
     private Timestamp lastMonitored;
 
-    private HostDescription host;
+    private ComputeResourceDescription computeResourceDescription;
 
     private Map<String, Object> parameters;
 
@@ -67,7 +66,7 @@ public class MonitorID {
     public MonitorID() {
     }
     public MonitorID(MonitorID monitorID){
-        this.host = monitorID.getHost();
+        this.computeResourceDescription = monitorID.getComputeResourceDescription();
         this.jobStartedTime = new Timestamp((new Date()).getTime());
         this.userName = monitorID.getUserName();
         this.jobID = monitorID.getJobID();
@@ -76,8 +75,8 @@ public class MonitorID {
         this.workflowNodeID = monitorID.getWorkflowNodeID();
         this.jobName = monitorID.getJobName();
     }
-    public MonitorID(HostDescription host, String jobID, String taskID, String workflowNodeID, String experimentID, String userName,String jobName) {
-        this.host = host;
+    public MonitorID(ComputeResourceDescription computeResourceDescription, String jobID, String taskID, String workflowNodeID, String experimentID, String userName,String jobName) {
+        this.computeResourceDescription = computeResourceDescription;
         this.jobStartedTime = new Timestamp((new Date()).getTime());
         this.userName = userName;
         this.jobID = jobID;
@@ -89,7 +88,7 @@ public class MonitorID {
 
     public MonitorID(JobExecutionContext jobExecutionContext) {
         this.jobExecutionContext = jobExecutionContext;
-        host = jobExecutionContext.getApplicationContext().getHostDescription();
+        this.computeResourceDescription = jobExecutionContext.getApplicationContext().getComputeResourceDescription();
         userName = jobExecutionContext.getExperiment().getUserName();
         taskID = jobExecutionContext.getTaskData().getTaskID();
         experimentID = jobExecutionContext.getExperiment().getExperimentID();
@@ -102,12 +101,12 @@ public class MonitorID {
         }
     }
 
-    public HostDescription getHost() {
-        return host;
+    public ComputeResourceDescription getComputeResourceDescription() {
+        return computeResourceDescription;
     }
 
-    public void setHost(HostDescription host) {
-        this.host = host;
+    public void setComputeResourceDescription(ComputeResourceDescription computeResourceDescription) {
+        this.computeResourceDescription = computeResourceDescription;
     }
 
     public Timestamp getLastMonitored() {

http://git-wip-us.apache.org/repos/asf/airavata/blob/f29dfbe7/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java
index b5a325a..92a50e4 100644
--- a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java
+++ b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java
@@ -20,6 +20,9 @@
 */
 package org.apache.airavata.gfac.gsissh.provider.impl;
 
+import org.airavata.appcatalog.cpi.AppCatalog;
+import org.airavata.appcatalog.cpi.AppCatalogException;
+import org.apache.aiaravata.application.catalog.data.impl.AppCatalogFactory;
 import org.apache.airavata.common.exception.ApplicationSettingsException;
 import org.apache.airavata.gfac.ExecutionMode;
 import org.apache.airavata.gfac.GFacException;
@@ -36,11 +39,16 @@ import org.apache.airavata.gfac.gsissh.util.GFACGSISSHUtils;
 import org.apache.airavata.gsi.ssh.api.Cluster;
 import org.apache.airavata.gsi.ssh.api.SSHApiException;
 import org.apache.airavata.gsi.ssh.api.job.JobDescriptor;
+import org.apache.airavata.model.appcatalog.appdeployment.ApplicationDeploymentDescription;
+import org.apache.airavata.model.appcatalog.computeresource.ComputeResourceDescription;
+import org.apache.airavata.model.appcatalog.computeresource.MonitorMode;
+import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManager;
+import org.apache.airavata.model.appcatalog.computeresource.SSHJobSubmission;
 import org.apache.airavata.model.workspace.experiment.CorrectiveAction;
 import org.apache.airavata.model.workspace.experiment.ErrorCategory;
 import org.apache.airavata.model.workspace.experiment.JobDetails;
 import org.apache.airavata.model.workspace.experiment.JobState;
-import org.apache.airavata.schemas.gfac.GsisshHostType;
+//import org.apache.airavata.schemas.gfac.GsisshHostType;
 import org.apache.airavata.schemas.gfac.HostDescriptionType;
 import org.apache.airavata.schemas.gfac.HpcApplicationDeploymentType;
 import org.apache.zookeeper.KeeperException;
@@ -48,6 +56,7 @@ import org.apache.zookeeper.ZooKeeper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.management.monitor.Monitor;
 import java.util.List;
 import java.util.Map;
 
@@ -76,14 +85,18 @@ public class GSISSHProvider extends AbstractRecoverableProvider {
         log.info("Invoking GSISSH Provider Invoke ...");
         StringBuffer data = new StringBuffer();
         jobExecutionContext.getNotifier().publish(new StartExecutionEvent());
-        HostDescriptionType host = jobExecutionContext.getApplicationContext().
-                getHostDescription().getType();
-        HpcApplicationDeploymentType app = (HpcApplicationDeploymentType) jobExecutionContext.getApplicationContext().
-                getApplicationDeploymentDescription().getType();
+        ComputeResourceDescription computeResourceDescription = jobExecutionContext.getApplicationContext()
+                .getComputeResourceDescription();
+        ApplicationDeploymentDescription appDeployDesc = jobExecutionContext.getApplicationContext()
+                .getApplicationDeploymentDescription();
         JobDetails jobDetails = new JobDetails();
         Cluster cluster = null;
-        
+
         try {
+            AppCatalog appCatalog = AppCatalogFactory.getAppCatalog();
+            SSHJobSubmission sshJobSubmission = appCatalog.getComputeResource().getSSHJobSubmission(
+                    jobExecutionContext.getPreferredJobSubmissionInterface().getJobSubmissionInterfaceId());
+
             if (jobExecutionContext.getSecurityContext(GSISecurityContext.GSI_SECURITY_CONTEXT) != null) {
                 cluster = ((GSISecurityContext) jobExecutionContext.getSecurityContext(GSISecurityContext.GSI_SECURITY_CONTEXT)).getPbsCluster();
             }
@@ -93,7 +106,7 @@ public class GSISSHProvider extends AbstractRecoverableProvider {
                 log.info("Successfully retrieved the Security Context");
             }
             // This installed path is a mandetory field, because this could change based on the computing resource
-            JobDescriptor jobDescriptor = GFACGSISSHUtils.createJobDescriptor(jobExecutionContext, app, cluster);
+            JobDescriptor jobDescriptor = GFACGSISSHUtils.createJobDescriptor(jobExecutionContext, cluster);
             jobDetails.setJobName(jobDescriptor.getJobName());
 
             log.info(jobDescriptor.toXML());
@@ -113,10 +126,10 @@ public class GSISSHProvider extends AbstractRecoverableProvider {
 
             // Now job has submitted to the resource, its up to the Provider to parse the information to daemon handler
             // to perform monitoring, daemon handlers can be accessed from anywhere
-            delegateToMonitorHandlers(jobExecutionContext, (GsisshHostType) host, jobDetails.getJobID());
+            delegateToMonitorHandlers(jobExecutionContext, sshJobSubmission , jobDetails.getJobID());
             // we know this host is type GsiSSHHostType
         } catch (Exception e) {
-		    String error = "Error submitting the job to host " + host.getHostAddress() + " message: " + e.getMessage();
+		    String error = "Error submitting the job to host " + computeResourceDescription.getHostName() + " message: " + e.getMessage();
             log.error(error);
             jobDetails.setJobID("none");
             GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.FAILED);
@@ -130,18 +143,18 @@ public class GSISSHProvider extends AbstractRecoverableProvider {
           
     }
 
-    public void delegateToMonitorHandlers(JobExecutionContext jobExecutionContext, GsisshHostType host, String jobID) throws GFacHandlerException {
+    public void delegateToMonitorHandlers(JobExecutionContext jobExecutionContext, SSHJobSubmission sshJobSubmission, String jobID) throws GFacHandlerException {
         List<ThreadedHandler> daemonHandlers = BetterGfacImpl.getDaemonHandlers();
         if (daemonHandlers == null) {
             daemonHandlers = BetterGfacImpl.getDaemonHandlers();
         }
         ThreadedHandler pullMonitorHandler = null;
         ThreadedHandler pushMonitorHandler = null;
-        String monitorMode = host.getMonitorMode();
+        MonitorMode monitorMode = sshJobSubmission.getMonitorMode();
         for (ThreadedHandler threadedHandler : daemonHandlers) {
             if ("org.apache.airavata.gfac.monitor.handlers.GridPullMonitorHandler".equals(threadedHandler.getClass().getName())) {
                 pullMonitorHandler = threadedHandler;
-                if ("".equals(monitorMode) || monitorMode == null || org.apache.airavata.common.utils.Constants.PULL.equals(monitorMode)) {
+                if (monitorMode == null || monitorMode == MonitorMode.POLL_JOB_MANAGER) {
                     log.info("Job is launched successfully now parsing it to monitoring in pull mode, JobID Returned:  " + jobID);
                     pullMonitorHandler.invoke(jobExecutionContext);
                 } else {
@@ -150,7 +163,7 @@ public class GSISSHProvider extends AbstractRecoverableProvider {
                 }
             } else if ("org.apache.airavata.gfac.monitor.handlers.GridPushMonitorHandler".equals(threadedHandler.getClass().getName())) {
                 pushMonitorHandler = threadedHandler;
-                if ("".equals(monitorMode) || monitorMode == null || org.apache.airavata.common.utils.Constants.PUSH.equals(monitorMode)) {
+                if (monitorMode == null || monitorMode == MonitorMode.XSEDE_AMQP_SUBSCRIBE) {
                     log.info("Job is launched successfully now parsing it to monitoring in push mode, JobID Returned:  " + jobID);
                     pushMonitorHandler.invoke(jobExecutionContext);
                 } else {
@@ -166,18 +179,18 @@ public class GSISSHProvider extends AbstractRecoverableProvider {
         }
     }
 
-    public void removeFromMonitorHandlers(JobExecutionContext jobExecutionContext, GsisshHostType host, String jobID) throws GFacHandlerException {
+    public void removeFromMonitorHandlers(JobExecutionContext jobExecutionContext, SSHJobSubmission sshJobSubmission, String jobID) throws GFacHandlerException {
         List<ThreadedHandler> daemonHandlers = BetterGfacImpl.getDaemonHandlers();
         if (daemonHandlers == null) {
             daemonHandlers = BetterGfacImpl.getDaemonHandlers();
         }
         ThreadedHandler pullMonitorHandler = null;
         ThreadedHandler pushMonitorHandler = null;
-        String monitorMode = host.getMonitorMode();
+        MonitorMode monitorMode = sshJobSubmission.getMonitorMode();
         for (ThreadedHandler threadedHandler : daemonHandlers) {
             if ("org.apache.airavata.gfac.monitor.handlers.GridPullMonitorHandler".equals(threadedHandler.getClass().getName())) {
                 pullMonitorHandler = threadedHandler;
-                if ("".equals(monitorMode) || monitorMode == null || org.apache.airavata.common.utils.Constants.PULL.equals(monitorMode)) {
+                if (monitorMode == null || monitorMode == MonitorMode.POLL_JOB_MANAGER) {
                     jobExecutionContext.setProperty("cancel","true");
                     pullMonitorHandler.invoke(jobExecutionContext);
                 } else {
@@ -186,7 +199,7 @@ public class GSISSHProvider extends AbstractRecoverableProvider {
                 }
             } else if ("org.apache.airavata.gfac.monitor.handlers.GridPushMonitorHandler".equals(threadedHandler.getClass().getName())) {
                 pushMonitorHandler = threadedHandler;
-                if ("".equals(monitorMode) || monitorMode == null || org.apache.airavata.common.utils.Constants.PUSH.equals(monitorMode)) {
+                if ( monitorMode == null || monitorMode == MonitorMode.XSEDE_AMQP_SUBSCRIBE) {
                     pushMonitorHandler.invoke(jobExecutionContext);
                 } else {
                     log.error("Currently we only support Pull and Push monitoring and monitorMode should be PUSH" +
@@ -208,8 +221,6 @@ public class GSISSHProvider extends AbstractRecoverableProvider {
     public void cancelJob(JobExecutionContext jobExecutionContext) throws GFacProviderException,GFacException {
         //To change body of implemented methods use File | Settings | File Templates.
         log.info("canceling the job status in GSISSHProvider!!!!!");
-        HostDescriptionType host = jobExecutionContext.getApplicationContext().
-                getHostDescription().getType();
         JobDetails jobDetails = jobExecutionContext.getJobDetails();
         try {
             Cluster cluster = null;
@@ -236,14 +247,14 @@ public class GSISSHProvider extends AbstractRecoverableProvider {
             GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.CANCELED);
             // we know this host is type GsiSSHHostType
         } catch (SSHApiException e) {
-            String error = "Error submitting the job to host " + host.getHostAddress() + " message: " + e.getMessage();
+            String error = "Error submitting the job to host " + jobExecutionContext.getHostName() + " message: " + e.getMessage();
             log.error(error);
             jobDetails.setJobID("none");
             GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.FAILED);
             GFacUtils.saveErrorDetails(jobExecutionContext, error, CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
             throw new GFacProviderException(error, e);
         } catch (Exception e) {
-            String error = "Error submitting the job to host " + host.getHostAddress() + " message: " + e.getMessage();
+            String error = "Error submitting the job to host " + jobExecutionContext.getHostName() + " message: " + e.getMessage();
             log.error(error);
             jobDetails.setJobID("none");
             GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.FAILED);
@@ -255,8 +266,8 @@ public class GSISSHProvider extends AbstractRecoverableProvider {
     public void recover(JobExecutionContext jobExecutionContext) throws GFacProviderException,GFacException {
         // have to implement the logic to recover a gfac failure
         log.info("Invoking Recovering for the Experiment: " + jobExecutionContext.getExperimentID());
-        HostDescriptionType host = jobExecutionContext.getApplicationContext().
-                getHostDescription().getType();
+        ComputeResourceDescription computeResourceDescription = jobExecutionContext.getApplicationContext()
+                .getComputeResourceDescription();
         String jobId = "";
         String jobDesc = "";
         try {
@@ -306,8 +317,11 @@ public class GSISSHProvider extends AbstractRecoverableProvider {
                     throw new GFacHandlerException("Error while creating SSHSecurityContext", e, e.getLocalizedMessage());
                 }
             }
-            delegateToMonitorHandlers(jobExecutionContext, (GsisshHostType) host, jobId);
-        } catch (GFacHandlerException e) {
+            AppCatalog appCatalog = AppCatalogFactory.getAppCatalog();
+            SSHJobSubmission sshJobSubmission = appCatalog.getComputeResource().getSSHJobSubmission(
+                    jobExecutionContext.getPreferredJobSubmissionInterface().getJobSubmissionInterfaceId());
+            delegateToMonitorHandlers(jobExecutionContext, sshJobSubmission, jobId);
+        } catch (Exception e) {
             throw new GFacProviderException("Error delegating already ran job to Monitoring", e);
         }
     }

http://git-wip-us.apache.org/repos/asf/airavata/blob/f29dfbe7/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/util/GFACGSISSHUtils.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/util/GFACGSISSHUtils.java b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/util/GFACGSISSHUtils.java
index 4d338e3..baca65c 100644
--- a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/util/GFACGSISSHUtils.java
+++ b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/util/GFACGSISSHUtils.java
@@ -20,21 +20,19 @@
 */
 package org.apache.airavata.gfac.gsissh.util;
 
-import java.sql.SQLException;
-import java.util.*;
-
+import org.airavata.appcatalog.cpi.AppCatalog;
+import org.apache.aiaravata.application.catalog.data.impl.AppCatalogFactory;
 import org.apache.airavata.common.exception.ApplicationSettingsException;
 import org.apache.airavata.common.utils.ServerSettings;
 import org.apache.airavata.common.utils.StringUtil;
 import org.apache.airavata.commons.gfac.type.ActualParameter;
-import org.apache.airavata.commons.gfac.type.HostDescription;
 import org.apache.airavata.commons.gfac.type.MappingFactory;
-import org.apache.airavata.credential.store.credential.Credential;
 import org.apache.airavata.credential.store.credential.impl.certificate.CertificateCredential;
 import org.apache.airavata.credential.store.store.CredentialReader;
 import org.apache.airavata.gfac.Constants;
 import org.apache.airavata.gfac.GFacException;
 import org.apache.airavata.gfac.RequestData;
+import org.apache.airavata.gfac.core.context.ApplicationContext;
 import org.apache.airavata.gfac.core.context.JobExecutionContext;
 import org.apache.airavata.gfac.core.context.MessageContext;
 import org.apache.airavata.gfac.core.utils.GFacUtils;
@@ -47,22 +45,26 @@ import org.apache.airavata.gsi.ssh.api.job.JobManagerConfiguration;
 import org.apache.airavata.gsi.ssh.impl.GSISSHAbstractCluster;
 import org.apache.airavata.gsi.ssh.impl.PBSCluster;
 import org.apache.airavata.gsi.ssh.util.CommonUtils;
+import org.apache.airavata.model.appcatalog.appdeployment.ApplicationDeploymentDescription;
+import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionInterface;
+import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionProtocol;
+import org.apache.airavata.model.appcatalog.computeresource.SSHJobSubmission;
+import org.apache.airavata.model.appcatalog.computeresource.SecurityProtocol;
 import org.apache.airavata.model.workspace.experiment.ComputationalResourceScheduling;
 import org.apache.airavata.model.workspace.experiment.TaskDetails;
-import org.apache.airavata.schemas.gfac.ApplicationDeploymentDescriptionType;
 import org.apache.airavata.schemas.gfac.FileArrayType;
-import org.apache.airavata.schemas.gfac.GlobusHostType;
-import org.apache.airavata.schemas.gfac.GsisshHostType;
 import org.apache.airavata.schemas.gfac.HpcApplicationDeploymentType;
-import org.apache.airavata.schemas.gfac.SSHHostType;
 import org.apache.airavata.schemas.gfac.StringArrayType;
 import org.apache.airavata.schemas.gfac.URIArrayType;
-import org.apache.airavata.schemas.gfac.UnicoreHostType;
-import org.apache.openjpa.lib.log.Log;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.validation.constraints.Max;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
 
 
 public class GFACGSISSHUtils {
@@ -74,32 +76,35 @@ public class GFACGSISSHUtils {
     public static int maxClusterCount = 5;
     public static Map<String, List<Cluster>> clusters = new HashMap<String, List<Cluster>>();
     public static void addSecurityContext(JobExecutionContext jobExecutionContext) throws GFacException, ApplicationSettingsException {
-        HostDescription registeredHost = jobExecutionContext.getApplicationContext().getHostDescription();
-        if (registeredHost.getType() instanceof GlobusHostType || registeredHost.getType() instanceof UnicoreHostType
-                || registeredHost.getType() instanceof SSHHostType) {
-            logger.error("This is a wrong method to invoke to non ssh host types,please check your gfac-config.xml");
-        } else if (registeredHost.getType() instanceof GsisshHostType) {
-            String credentialStoreToken = jobExecutionContext.getCredentialStoreToken(); // this is set by the framework
-            RequestData requestData = new RequestData(ServerSettings.getDefaultUserGateway());
-            requestData.setTokenId(credentialStoreToken);
-            PBSCluster pbsCluster = null;
-            GSISecurityContext context = null;
-            try {
+        JobSubmissionInterface jobSubmissionInterface = jobExecutionContext.getPreferredJobSubmissionInterface();
+        JobSubmissionProtocol jobProtocol = jobSubmissionInterface.getJobSubmissionProtocol();
+        try {
+            AppCatalog appCatalog = AppCatalogFactory.getAppCatalog();
+            SSHJobSubmission sshJobSubmission = appCatalog.getComputeResource().getSSHJobSubmission(jobSubmissionInterface.getJobSubmissionInterfaceId());
+            if (jobProtocol == JobSubmissionProtocol.GLOBUS || jobProtocol == JobSubmissionProtocol.UNICORE
+                    || jobProtocol == JobSubmissionProtocol.CLOUD || jobProtocol == JobSubmissionProtocol.LOCAL) {
+                logger.error("This is a wrong method to invoke to non ssh host types,please check your gfac-config.xml");
+            } else if (jobProtocol == JobSubmissionProtocol.SSH && sshJobSubmission.getSecurityProtocol() == SecurityProtocol.GSI) {
+                String credentialStoreToken = jobExecutionContext.getCredentialStoreToken(); // this is set by the framework
+                RequestData requestData = new RequestData(ServerSettings.getDefaultUserGateway());
+                requestData.setTokenId(credentialStoreToken);
+                PBSCluster pbsCluster = null;
+                GSISecurityContext context = null;
+
                 TokenizedMyProxyAuthInfo tokenizedMyProxyAuthInfo = new TokenizedMyProxyAuthInfo(requestData);
                 CredentialReader credentialReader = GFacUtils.getCredentialReader();
-                if(credentialReader != null){
-                	CertificateCredential credential = null;
-					try {
-						credential = (CertificateCredential)credentialReader.getCredential(ServerSettings.getDefaultUserGateway(), credentialStoreToken);
-			      		requestData.setMyProxyUserName(credential.getCommunityUser().getUserName());
-					} catch (Exception e) {
-						logger.error(e.getLocalizedMessage());
-					}
+                if (credentialReader != null) {
+                    CertificateCredential credential = null;
+                    try {
+                        credential = (CertificateCredential) credentialReader.getCredential(ServerSettings.getDefaultUserGateway(), credentialStoreToken);
+                        requestData.setMyProxyUserName(credential.getCommunityUser().getUserName());
+                    } catch (Exception e) {
+                        logger.error(e.getLocalizedMessage());
+                    }
                 }
 
-                GsisshHostType gsisshHostType = (GsisshHostType) registeredHost.getType();
-                String key = requestData.getMyProxyUserName() + registeredHost.getType().getHostAddress() +
-                        gsisshHostType.getPort();
+                String key = requestData.getMyProxyUserName() + jobExecutionContext.getHostName()+
+                        sshJobSubmission.getSshPort();
                 boolean recreate = false;
                 synchronized (clusters) {
                     if (clusters.containsKey(key) && clusters.get(key).size() < maxClusterCount) {
@@ -112,7 +117,7 @@ public class GFACGSISSHUtils {
                             clusters.get(key).remove(i);
                             recreate = true;
                         }
-                        if(!recreate) {
+                        if (!recreate) {
                             try {
                                 pbsCluster.listDirectory("~/"); // its hard to trust isConnected method, so we try to connect if it works we are good,else we recreate
                             } catch (Exception e) {
@@ -129,13 +134,12 @@ public class GFACGSISSHUtils {
                     }
 
                     if (recreate) {
-                        ServerInfo serverInfo = new ServerInfo(requestData.getMyProxyUserName(), registeredHost.getType().getHostAddress(),
-                                gsisshHostType.getPort());
+                        ServerInfo serverInfo = new ServerInfo(requestData.getMyProxyUserName(), jobExecutionContext.getHostName(),
+                                sshJobSubmission.getSshPort());
 
                         JobManagerConfiguration jConfig = null;
-                        String installedParentPath = ((HpcApplicationDeploymentType)
-                                jobExecutionContext.getApplicationContext().getApplicationDeploymentDescription().getType()).getInstalledParentPath();
-                        String jobManager = ((GsisshHostType) registeredHost.getType()).getJobManager();
+                        String installedParentPath = sshJobSubmission.getResourceJobManager().getJobManagerBinPath();
+                        String jobManager = sshJobSubmission.getResourceJobManager().getResourceJobManagerType().toString();
                         if (jobManager == null) {
                             logger.error("No Job Manager is configured, so we are picking pbs as the default job manager");
                             jConfig = CommonUtils.getPBSJobManager(installedParentPath);
@@ -160,28 +164,30 @@ public class GFACGSISSHUtils {
                         clusters.put(key, pbsClusters);
                     }
                 }
-            } catch (Exception e) {
-                throw new GFacException("An error occurred while creating GSI security context", e);
+
+                jobExecutionContext.addSecurityContext(Constants.GSI_SECURITY_CONTEXT, context);
             }
-            jobExecutionContext.addSecurityContext(Constants.GSI_SECURITY_CONTEXT, context);
+        } catch (Exception e) {
+            throw new GFacException("An error occurred while creating GSI security context", e);
         }
     }
 
-    public static JobDescriptor createJobDescriptor(JobExecutionContext jobExecutionContext,
-                                                    ApplicationDeploymentDescriptionType app, Cluster cluster) {
+    public static JobDescriptor createJobDescriptor(JobExecutionContext jobExecutionContext, Cluster cluster) {
         JobDescriptor jobDescriptor = new JobDescriptor();
+        ApplicationContext applicationContext = jobExecutionContext.getApplicationContext();
+        ApplicationDeploymentDescription app = applicationContext.getApplicationDeploymentDescription();
         // this is common for any application descriptor
         jobDescriptor.setCallBackIp(ServerSettings.getIp());
         jobDescriptor.setCallBackPort(ServerSettings.getSetting(org.apache.airavata.common.utils.Constants.GFAC_SERVER_PORT, "8950"));
-        jobDescriptor.setInputDirectory(app.getInputDataDirectory());
-        jobDescriptor.setOutputDirectory(app.getOutputDataDirectory());
-        jobDescriptor.setExecutablePath(app.getExecutableLocation());
-        jobDescriptor.setStandardOutFile(app.getStandardOutput());
-        jobDescriptor.setStandardErrorFile(app.getStandardError());
+        jobDescriptor.setInputDirectory(jobExecutionContext.getInputDir());
+        jobDescriptor.setOutputDirectory(jobExecutionContext.getOutputDir());
+        jobDescriptor.setExecutablePath(app.getExecutablePath());
+        jobDescriptor.setStandardOutFile(jobExecutionContext.getStandardOutput());
+        jobDescriptor.setStandardErrorFile(jobExecutionContext.getStandardError());
         Random random = new Random();
         int i = random.nextInt(Integer.MAX_VALUE); // We always set the job name
         jobDescriptor.setJobName("A" + String.valueOf(i+99999999));
-        jobDescriptor.setWorkingDirectory(app.getStaticWorkingDirectory());
+        jobDescriptor.setWorkingDirectory(jobExecutionContext.getWorkingDir());
 
         List<String> inputValues = new ArrayList<String>();
         MessageContext input = jobExecutionContext.getInMessageContext();

http://git-wip-us.apache.org/repos/asf/airavata/blob/f29dfbe7/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/AMQPMonitor.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/AMQPMonitor.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/AMQPMonitor.java
index baab7b4..28d13f2 100644
--- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/AMQPMonitor.java
+++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/AMQPMonitor.java
@@ -30,12 +30,12 @@ import java.util.concurrent.BlockingQueue;
 
 import org.apache.airavata.common.utils.MonitorPublisher;
 import org.apache.airavata.common.utils.ServerSettings;
-import org.apache.airavata.commons.gfac.type.HostDescription;
 import org.apache.airavata.gfac.core.monitor.MonitorID;
 import org.apache.airavata.gfac.monitor.core.PushMonitor;
 import org.apache.airavata.gfac.monitor.exception.AiravataMonitorException;
 import org.apache.airavata.gfac.monitor.util.AMQPConnectionUtil;
 import org.apache.airavata.gfac.monitor.util.CommonUtils;
+import org.apache.airavata.model.appcatalog.computeresource.ComputeResourceDescription;
 import org.apache.airavata.model.messaging.event.JobIdentifier;
 import org.apache.airavata.model.messaging.event.JobStatusChangeEvent;
 import org.apache.airavata.model.workspace.experiment.JobState;
@@ -107,30 +107,37 @@ public class AMQPMonitor extends PushMonitor {
     @Override
     public boolean registerListener(MonitorID monitorID) throws AiravataMonitorException {
         // we subscribe to read user-host based subscription
-        HostDescription host = monitorID.getHost();
-        String hostAddress = host.getType().getHostAddress();
-        // in amqp case there are no multiple jobs per each host, because once a job is put in to the queue it
-        // will be picked by the Monitor, so jobs will not stay in this queueu but jobs will stay in finishQueue
-        String channelID = CommonUtils.getChannelID(monitorID);
-        if(availableChannels.get(channelID) == null){
-        try {
-            //todo need to fix this rather getting it from a file
-            Connection connection = AMQPConnectionUtil.connect(amqpHosts, connectionName, proxyPath);
-            Channel channel = null;
-            channel = connection.createChannel();
-            availableChannels.put(channelID, channel);
-            String queueName = channel.queueDeclare().getQueue();
-
-            BasicConsumer consumer = new
-                    BasicConsumer(new JSONMessageParser(), localPublisher);          // here we use local publisher
-            channel.basicConsume(queueName, true, consumer);
-            String filterString = CommonUtils.getRoutingKey(monitorID.getUserName(), hostAddress);
-            // here we queuebind to a particular user in a particular machine
-            channel.queueBind(queueName, "glue2.computing_activity", filterString);
-            logger.info("Using filtering string to monitor: " + filterString);
-        } catch (IOException e) {
-            logger.error("Error creating the connection to finishQueue the job:" + monitorID.getUserName());
-        }
+        ComputeResourceDescription computeResourceDescription = monitorID.getComputeResourceDescription();
+        if (computeResourceDescription.isSetIpAddresses() && computeResourceDescription.getIpAddresses().size() > 0) {
+            // we get first ip address for the moment
+            String hostAddress = computeResourceDescription.getIpAddresses().get(0);
+            // in amqp case there are no multiple jobs per each host, because once a job is put in to the queue it
+            // will be picked by the Monitor, so jobs will not stay in this queueu but jobs will stay in finishQueue
+            String channelID = CommonUtils.getChannelID(monitorID);
+            if (availableChannels.get(channelID) == null) {
+                try {
+                    //todo need to fix this rather getting it from a file
+                    Connection connection = AMQPConnectionUtil.connect(amqpHosts, connectionName, proxyPath);
+                    Channel channel = null;
+                    channel = connection.createChannel();
+                    availableChannels.put(channelID, channel);
+                    String queueName = channel.queueDeclare().getQueue();
+
+                    BasicConsumer consumer = new
+                            BasicConsumer(new JSONMessageParser(), localPublisher);          // here we use local publisher
+                    channel.basicConsume(queueName, true, consumer);
+                    String filterString = CommonUtils.getRoutingKey(monitorID.getUserName(), hostAddress);
+                    // here we queuebind to a particular user in a particular machine
+                    channel.queueBind(queueName, "glue2.computing_activity", filterString);
+                    logger.info("Using filtering string to monitor: " + filterString);
+                } catch (IOException e) {
+                    logger.error("Error creating the connection to finishQueue the job:" + monitorID.getUserName());
+                }
+            }
+        } else {
+            throw new AiravataMonitorException("Couldn't register monitor for jobId :" + monitorID.getJobID() +
+                    " , ComputeResourceDescription " + computeResourceDescription.getHostName() + " doesn't has an " +
+                    "IpAddress with it");
         }
         return true;
     }

http://git-wip-us.apache.org/repos/asf/airavata/blob/f29dfbe7/modules/gfac/gfac-monitor/src/test/java/org/apache/airavata/job/AMQPMonitorTest.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/test/java/org/apache/airavata/job/AMQPMonitorTest.java b/modules/gfac/gfac-monitor/src/test/java/org/apache/airavata/job/AMQPMonitorTest.java
index 94528b9..a979890 100644
--- a/modules/gfac/gfac-monitor/src/test/java/org/apache/airavata/job/AMQPMonitorTest.java
+++ b/modules/gfac/gfac-monitor/src/test/java/org/apache/airavata/job/AMQPMonitorTest.java
@@ -20,15 +20,11 @@
 */
 package org.apache.airavata.job;
 
-import java.io.File;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-
+import com.google.common.eventbus.EventBus;
+import com.google.common.eventbus.Subscribe;
+import org.airavata.appcatalog.cpi.AppCatalog;
+import org.apache.aiaravata.application.catalog.data.impl.AppCatalogFactory;
 import org.apache.airavata.common.utils.MonitorPublisher;
-import org.apache.airavata.commons.gfac.type.HostDescription;
 import org.apache.airavata.gfac.core.monitor.MonitorID;
 import org.apache.airavata.gfac.monitor.impl.push.amqp.AMQPMonitor;
 import org.apache.airavata.gsi.ssh.api.Cluster;
@@ -38,14 +34,29 @@ import org.apache.airavata.gsi.ssh.api.authentication.GSIAuthenticationInfo;
 import org.apache.airavata.gsi.ssh.api.job.JobDescriptor;
 import org.apache.airavata.gsi.ssh.impl.PBSCluster;
 import org.apache.airavata.gsi.ssh.impl.authentication.MyProxyAuthenticationInfo;
+import org.apache.airavata.model.appcatalog.computeresource.ComputeResourceDescription;
+import org.apache.airavata.model.appcatalog.computeresource.DataMovementInterface;
+import org.apache.airavata.model.appcatalog.computeresource.DataMovementProtocol;
+import org.apache.airavata.model.appcatalog.computeresource.JobManagerCommand;
+import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionInterface;
+import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionProtocol;
+import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManager;
+import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManagerType;
+import org.apache.airavata.model.appcatalog.computeresource.SSHJobSubmission;
+import org.apache.airavata.model.appcatalog.computeresource.SecurityProtocol;
 import org.apache.airavata.model.messaging.event.JobStatusChangeEvent;
-import org.apache.airavata.schemas.gfac.GsisshHostType;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
-import com.google.common.eventbus.EventBus;
-import com.google.common.eventbus.Subscribe;
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
 
 public class AMQPMonitorTest {
 
@@ -54,12 +65,13 @@ public class AMQPMonitorTest {
     private String certificateLocation;
     private String pbsFilePath;
     private String workingDirectory;
-    private HostDescription hostDescription;
     private MonitorPublisher monitorPublisher;
     private BlockingQueue<MonitorID> finishQueue;
     private BlockingQueue<MonitorID> pushQueue;
     private Thread pushThread;
     private String proxyFilePath;
+    private ComputeResourceDescription computeResourceDescription;
+
     @Before
     public void setUp() throws Exception {
         System.setProperty("myproxy.username", "ogce");
@@ -98,14 +110,26 @@ public class AMQPMonitorTest {
         } catch (Exception e) {
             e.printStackTrace();
         }
+        computeResourceDescription = new ComputeResourceDescription("TestComputerResoruceId", "TestHostName");
+        computeResourceDescription.setHostName("stampede-host");
+        computeResourceDescription.addToIpAddresses("login1.stampede.tacc.utexas.edu");
+        ResourceJobManager resourceJobManager = new ResourceJobManager("1234", ResourceJobManagerType.SLURM);
+        Map<JobManagerCommand, String> commandMap = new HashMap<JobManagerCommand, String>();
+        commandMap.put(JobManagerCommand.SUBMISSION, "test");
+        resourceJobManager.setJobManagerCommands(commandMap);
+        resourceJobManager.setJobManagerBinPath("/usr/bin/");
+        resourceJobManager.setPushMonitoringEndpoint("push"); // TODO - add monitor mode
+        SSHJobSubmission sshJobSubmission = new SSHJobSubmission("TestSSHJobSubmissionInterfaceId", SecurityProtocol.GSI,
+                resourceJobManager);
+
+        AppCatalog appCatalog = AppCatalogFactory.getAppCatalog();
+        String jobSubmissionID = appCatalog.getComputeResource().addSSHJobSubmission(sshJobSubmission);
+
+        JobSubmissionInterface jobSubmissionInterface = new JobSubmissionInterface(jobSubmissionID, JobSubmissionProtocol.SSH, 1);
+
+        computeResourceDescription.addToJobSubmissionInterfaces(jobSubmissionInterface);
+        computeResourceDescription.addToDataMovementInterfaces(new DataMovementInterface("4532", DataMovementProtocol.SCP, 1));
 
-        hostDescription = new HostDescription(GsisshHostType.type);
-        hostDescription.getType().setHostAddress("login1.stampede.tacc.utexas.edu");
-        hostDescription.getType().setHostName("stampede-host");
-        ((GsisshHostType) hostDescription.getType()).setJobManager("slurm");
-        ((GsisshHostType) hostDescription.getType()).setInstalledPath("/usr/bin/");
-        ((GsisshHostType) hostDescription.getType()).setPort(2222);
-        ((GsisshHostType) hostDescription.getType()).setMonitorMode("push");
     }
 
     @Test
@@ -151,7 +175,7 @@ public class AMQPMonitorTest {
         String jobID = pbsCluster.submitBatchJob(jobDescriptor);
         System.out.println(jobID);
         try {
-            pushQueue.add(new MonitorID(hostDescription, jobID,null,null,null, "ogce", jobName));
+            pushQueue.add(new MonitorID(computeResourceDescription, jobID,null,null,null, "ogce", jobName));
         } catch (Exception e) {
             e.printStackTrace();
         }