You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by la...@apache.org on 2014/05/05 23:06:26 UTC

git commit: Fixing the build and more issues related to gfac

Repository: airavata
Updated Branches:
  refs/heads/master 39370c526 -> 14c1841ca


Fixing the build and more issues related to gfac


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/14c1841c
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/14c1841c
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/14c1841c

Branch: refs/heads/master
Commit: 14c1841ca20d08bc1e9da254dbce5cb8a7266b26
Parents: 39370c5
Author: lahiru <la...@apache.org>
Authored: Mon May 5 17:06:10 2014 -0400
Committer: lahiru <la...@apache.org>
Committed: Mon May 5 17:06:10 2014 -0400

----------------------------------------------------------------------
 .../client/samples/CreateLaunchExperiment.java  |  3 +-
 .../airavata/common/utils/ServerSettings.java   |  5 ++
 .../server/src/main/resources/gfac-config.xml   |  6 +-
 .../org/apache/airavata/gfac/core/cpi/GFac.java |  7 ---
 .../apache/airavata/gfac/core/cpi/GFacImpl.java | 46 +++++++++++++--
 .../gfac/core/handler/AbstractHandler.java      | 25 ++++++++-
 .../core/monitor/AiravataJobStatusUpdator.java  | 18 ------
 .../gfac/core/provider/AbstractProvider.java    | 19 ++++++-
 .../gsissh/provider/impl/GSISSHProvider.java    |  2 +-
 .../gfac/local/provider/impl/LocalProvider.java | 10 ++++
 .../gfac/services/impl/LocalProviderTest.java   |  7 +++
 .../airavata/gfac/monitor/HPCMonitorID.java     |  2 +-
 .../handlers/GridPullMonitorHandler.java        | 29 +---------
 .../gfac/monitor/impl/LocalJobMonitor.java      | 59 --------------------
 .../monitor/impl/pull/qstat/HPCPullMonitor.java |  7 +++
 .../apache/airavata/job/AMQPMonitorTest.java    |  2 +-
 .../core/impl/EmbeddedGFACJobSubmitter.java     |  8 ---
 .../orchestrator/core/job/JobSubmitter.java     |  7 ---
 18 files changed, 123 insertions(+), 139 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/14c1841c/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java b/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
index 89bab76..f207720 100644
--- a/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
+++ b/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
@@ -64,8 +64,9 @@ public class CreateLaunchExperiment {
 //            addDescriptors();
 //            final String expId = createExperimentForSSHHost(airavata);
 //            final String expId = createExperimentForSSHHost(airavata);
-            final String expId = createExperimentForTrestles(airavata);
+//            final String expId = createExperimentForTrestles(airavata);
 //            final String expId = createExperimentForStampede(airavata);
+            final String expId = createExperimentForLocalHost(airavata);
             System.out.println("Experiment ID : " + expId);
             String clonedExpId = cloneExperiment(airavata, expId);
             System.out.println("Cloned Experiment ID : " + clonedExpId);

http://git-wip-us.apache.org/repos/asf/airavata/blob/14c1841c/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/ServerSettings.java
----------------------------------------------------------------------
diff --git a/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/ServerSettings.java b/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/ServerSettings.java
index 1380955..71e1437 100644
--- a/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/ServerSettings.java
+++ b/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/ServerSettings.java
@@ -65,6 +65,8 @@ public class ServerSettings extends ApplicationSettings{
     private static final String MY_PROXY_USER = "myproxy.user";
     private static final String MY_PROXY_PASSWORD = "myproxy.password";
     private static final String MY_PROXY_LIFETIME = "myproxy.life";
+    private static final String ACTIVITY_LISTENERS = "activity.listeners";
+
     private static boolean stopAllThreads = false;
 
     public static String getDefaultUser() throws ApplicationSettingsException{
@@ -213,4 +215,7 @@ public class ServerSettings extends ApplicationSettings{
     public static int getMyProxyLifetime() throws ApplicationSettingsException {
         return Integer.parseInt(getSetting(MY_PROXY_LIFETIME));
     }
+    public static String[] getActivityListeners() throws ApplicationSettingsException {
+        return getSetting(ACTIVITY_LISTENERS).split(",");
+    }
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/14c1841c/modules/configuration/server/src/main/resources/gfac-config.xml
----------------------------------------------------------------------
diff --git a/modules/configuration/server/src/main/resources/gfac-config.xml b/modules/configuration/server/src/main/resources/gfac-config.xml
index 29ef0f8..6d81a59 100644
--- a/modules/configuration/server/src/main/resources/gfac-config.xml
+++ b/modules/configuration/server/src/main/resources/gfac-config.xml
@@ -12,13 +12,11 @@
 
 <GFac>
     <DaemonHandlers>
-        <Handler class="org.apache.airavata.gfac.monitor.handlers.GridPullMonitorHandler">
-                <property name="listeners" value="org.apache.airavata.gfac.core.monitor.AiravataJobStatusUpdator"/>
-        </Handler>
+        <Handler class="org.apache.airavata.gfac.monitor.handlers.GridPullMonitorHandler"/>
     </DaemonHandlers>
     <GlobalHandlers>
         <InHandlers>
-            <Handler class="org.apache.airavata.gfac.core.handlerAppDescriptorCheckHandler">
+            <Handler class="org.apache.airavata.gfac.core.handler.AppDescriptorCheckHandler">
                     <property name="name" value="value"/>
             </Handler>
         </InHandlers>

http://git-wip-us.apache.org/repos/asf/airavata/blob/14c1841c/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFac.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFac.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFac.java
index c0f7984..60e499f 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFac.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFac.java
@@ -39,11 +39,4 @@ public interface GFac {
      */
     public JobExecutionContext submitJob(String experimentID,String taskID) throws GFacException;
 
-    /**
-     *  This method has to be invoked after submitting the job and have to make sure job is properly finished
-     * @param jobExecutionContext
-     * @throws GFacException
-     */
-    public void invokeOutFlowHandlers(JobExecutionContext jobExecutionContext) throws GFacException;
-
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/14c1841c/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFacImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFacImpl.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFacImpl.java
index 15b0d72..1d5c6f9 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFacImpl.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFacImpl.java
@@ -28,7 +28,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 
+import com.google.common.eventbus.EventBus;
 import org.apache.airavata.client.api.AiravataAPI;
+import org.apache.airavata.common.exception.ApplicationSettingsException;
 import org.apache.airavata.common.utils.ServerSettings;
 import org.apache.airavata.commons.gfac.type.ApplicationDescription;
 import org.apache.airavata.commons.gfac.type.HostDescription;
@@ -40,6 +42,8 @@ import org.apache.airavata.gfac.Scheduler;
 import org.apache.airavata.gfac.core.context.ApplicationContext;
 import org.apache.airavata.gfac.core.context.JobExecutionContext;
 import org.apache.airavata.gfac.core.context.MessageContext;
+import org.apache.airavata.gfac.core.monitor.AbstractActivityListener;
+import org.apache.airavata.gfac.core.notification.MonitorPublisher;
 import org.apache.airavata.gfac.core.notification.events.ExecutionFailEvent;
 import org.apache.airavata.gfac.core.notification.listeners.LoggingListener;
 import org.apache.airavata.gfac.core.notification.listeners.WorkflowTrackingListener;
@@ -81,6 +85,11 @@ public class GFacImpl implements GFac {
     private static List<ThreadedHandler> daemonHandlers;
 
     private File gfacConfigFile;
+
+    private List<AbstractActivityListener> activityListeners;
+
+    private static MonitorPublisher monitorPublisher;
+
     /**
      * Constructor for GFac
      *
@@ -93,9 +102,32 @@ public class GFacImpl implements GFac {
         this.airavataAPI = airavataAPI;
         this.airavataRegistry2 = airavataRegistry2;
         daemonHandlers = new ArrayList<ThreadedHandler>();
+        activityListeners = new ArrayList<AbstractActivityListener>();
+        monitorPublisher = new MonitorPublisher(new EventBus());     // This is an EventBus shared across all of gfac
+        startStatusUpdators();
         startDaemonHandlers();
     }
 
+    private void startStatusUpdators() {
+        try {
+            String[] listenerClassList = ServerSettings.getActivityListeners();
+            for (String listenerClass : listenerClassList) {
+                Class<? extends AbstractActivityListener> aClass = Class.forName(listenerClass).asSubclass(AbstractActivityListener.class);
+                AbstractActivityListener abstractActivityListener = aClass.newInstance();
+                activityListeners.add(abstractActivityListener);
+                abstractActivityListener.setup(getMonitorPublisher(),registry);
+                getMonitorPublisher().registerListener(abstractActivityListener);
+            }
+        }catch (ClassNotFoundException e) {
+            log.error("Error loading the listener classes configured in airavata-server.properties",e);
+        } catch (InstantiationException e) {
+            log.error("Error loading the listener classes configured in airavata-server.properties",e);
+        } catch (IllegalAccessException e) {
+            log.error("Error loading the listener classes configured in airavata-server.properties",e);
+        } catch (ApplicationSettingsException e){
+            log.error("Error loading the listener classes configured in airavata-server.properties",e);
+        }
+    }
     private void startDaemonHandlers()  {
         List<GFacHandlerConfig> daemonHandlerConfig = null;
         URL resource = GFacImpl.class.getClassLoader().getResource(org.apache.airavata.common.utils.Constants.GFAC_CONFIG_XML);
@@ -205,15 +237,15 @@ public class GFacImpl implements GFac {
 
         // start constructing jobexecutioncontext
         jobExecutionContext = new JobExecutionContext(gFacConfiguration, serviceName);
+
+        // setting experiment/task/workflownode related information
         Experiment experiment = (Experiment) registry.get(DataType.EXPERIMENT, experimentID);
         jobExecutionContext.setExperiment(experiment);
         jobExecutionContext.setExperimentID(experimentID);
-
+        jobExecutionContext.setWorkflowNodeDetails(experiment.getWorkflowNodeDetailsList().get(0));
         jobExecutionContext.setTaskData(taskData);
 
-
-
-
+        // setting the registry
         jobExecutionContext.setRegistry(registry);
 
         ApplicationContext applicationContext = new ApplicationContext();
@@ -399,5 +431,11 @@ public class GFacImpl implements GFac {
         return gfacConfigFile;
     }
 
+    public static MonitorPublisher getMonitorPublisher() {
+        return monitorPublisher;
+    }
 
+    public Registry getRegistry() {
+        return registry;
+    }
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/14c1841c/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/handler/AbstractHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/handler/AbstractHandler.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/handler/AbstractHandler.java
index dcbc79d..0049c3c 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/handler/AbstractHandler.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/handler/AbstractHandler.java
@@ -21,17 +21,40 @@
 package org.apache.airavata.gfac.core.handler;
 
 import org.apache.airavata.gfac.core.context.JobExecutionContext;
+import org.apache.airavata.gfac.core.cpi.GFacImpl;
+import org.apache.airavata.gfac.core.notification.MonitorPublisher;
 import org.apache.airavata.persistance.registry.jpa.impl.RegistryFactory;
 import org.apache.airavata.registry.cpi.Registry;
 
 public abstract class AbstractHandler implements GFacHandler {
 	protected Registry registry = null;
 
-	public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
+    protected MonitorPublisher publisher = null;
+
+    protected AbstractHandler() {
+        publisher = GFacImpl.getMonitorPublisher();   // Non-null once GFacImpl has been constructed, since its constructor initializes the publisher
+    }
+
+    public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
 		registry = jobExecutionContext.getRegistry();
         if(registry == null){
             registry = RegistryFactory.getDefaultRegistry();
         }
 	}
 
+    public MonitorPublisher getPublisher() {
+        return publisher;
+    }
+
+    public void setPublisher(MonitorPublisher publisher) {
+        this.publisher = publisher;
+    }
+
+    public Registry getRegistry() {
+        return registry;
+    }
+
+    public void setRegistry(Registry registry) {
+        this.registry = registry;
+    }
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/14c1841c/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataJobStatusUpdator.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataJobStatusUpdator.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataJobStatusUpdator.java
index aaf7084..254d7fc 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataJobStatusUpdator.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataJobStatusUpdator.java
@@ -43,8 +43,6 @@ public class AiravataJobStatusUpdator implements AbstractActivityListener {
 
     private MonitorPublisher monitorPublisher;
 
-    private BlockingQueue<MonitorID> jobsToMonitor;
-
 
     public Registry getAiravataRegistry() {
         return airavataRegistry;
@@ -54,13 +52,6 @@ public class AiravataJobStatusUpdator implements AbstractActivityListener {
         this.airavataRegistry = airavataRegistry;
     }
 
-    public BlockingQueue<MonitorID> getJobsToMonitor() {
-        return jobsToMonitor;
-    }
-
-    public void setJobsToMonitor(BlockingQueue<MonitorID> jobsToMonitor) {
-        this.jobsToMonitor = jobsToMonitor;
-    }
 
     @Subscribe
     public void updateRegistry(JobStatusChangeRequest jobStatus) {
@@ -77,13 +68,6 @@ public class AiravataJobStatusUpdator implements AbstractActivityListener {
                 logger.error("Error persisting data" + e.getLocalizedMessage(), e);
             }
             logger.info("Job ID:" + jobStatus.getIdentity().getJobId() + " is "+state.toString());
-            switch (state) {
-                case COMPLETE: case UNKNOWN: case CANCELED:case FAILED:case SUSPENDED:
-                    jobsToMonitor.remove(jobStatus.getMonitorID());
-                    break;
-			default:
-				break;
-            }
         }
     }
 
@@ -135,8 +119,6 @@ public class AiravataJobStatusUpdator implements AbstractActivityListener {
 		for (Object configuration : configurations) {
 			if (configuration instanceof Registry){
 				this.airavataRegistry=(Registry)configuration;
-			} else if (configuration instanceof BlockingQueue<?>){
-				this.jobsToMonitor=(BlockingQueue<MonitorID>) configuration;
 			} else if (configuration instanceof MonitorPublisher){
 				this.monitorPublisher=(MonitorPublisher) configuration;
 			} 

http://git-wip-us.apache.org/repos/asf/airavata/blob/14c1841c/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/provider/AbstractProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/provider/AbstractProvider.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/provider/AbstractProvider.java
index 4d28d72..5b65185 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/provider/AbstractProvider.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/provider/AbstractProvider.java
@@ -21,8 +21,11 @@
 
 package org.apache.airavata.gfac.core.provider;
 
+import com.google.common.eventbus.EventBus;
 import org.apache.airavata.gfac.GFacException;
 import org.apache.airavata.gfac.core.context.JobExecutionContext;
+import org.apache.airavata.gfac.core.cpi.GFacImpl;
+import org.apache.airavata.gfac.core.notification.MonitorPublisher;
 import org.apache.airavata.model.workspace.experiment.JobDetails;
 import org.apache.airavata.model.workspace.experiment.JobStatus;
 import org.apache.airavata.persistance.registry.jpa.impl.RegistryFactory;
@@ -38,11 +41,25 @@ public abstract class AbstractProvider implements GFacProvider{
 	protected JobStatus status;   //todo we need to remove this and add methods to fill Job details, this is not a property of a provider
 	protected JobExecutionContext jobExecutionContext;
 
-	public void initialize(JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException {
+    protected MonitorPublisher monitorPublisher;
+
+    protected AbstractProvider() {
+        this.monitorPublisher = GFacImpl.getMonitorPublisher();
+    }
+
+    public void initialize(JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException {
         log.debug("Initializing " + this.getClass().getName());
 		registry = RegistryFactory.getDefaultRegistry();
 		details = new JobDetails();
 		status = new JobStatus();
 		this.jobExecutionContext=jobExecutionContext;
 	}
+
+    public MonitorPublisher getMonitorPublisher() {
+        return monitorPublisher;
+    }
+
+    public void setMonitorPublisher(MonitorPublisher monitorPublisher) {
+        this.monitorPublisher = monitorPublisher;
+    }
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/14c1841c/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java
index 9e66b71..880cfbd 100644
--- a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java
+++ b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java
@@ -103,7 +103,7 @@ public class GSISSHProvider extends AbstractProvider {
                     pullMonitorHandler = threadedHandler;
                 }
             }
-            // we know this hos is type GsiSSHHostType
+            // we know this host is type GsiSSHHostType
             String monitorMode = ((GsisshHostType) host).getMonitorMode();
             if("".equals(monitorMode) || monitorMode == null || org.apache.airavata.common.utils.Constants.PULL.equals(monitorMode)){
                 log.info("Job is launched successfully now parsing it to monitoring in pull mode, JobID Returned:  " + jobID);

http://git-wip-us.apache.org/repos/asf/airavata/blob/14c1841c/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProvider.java b/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProvider.java
index 5494629..8e78f5a 100644
--- a/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProvider.java
+++ b/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProvider.java
@@ -29,6 +29,9 @@ import java.util.Map;
 import org.apache.airavata.gfac.Constants;
 import org.apache.airavata.gfac.GFacException;
 import org.apache.airavata.gfac.core.context.JobExecutionContext;
+import org.apache.airavata.gfac.core.monitor.JobIdentity;
+import org.apache.airavata.gfac.core.monitor.MonitorID;
+import org.apache.airavata.gfac.core.monitor.state.JobStatusChangeRequest;
 import org.apache.airavata.gfac.core.notification.events.StartExecutionEvent;
 import org.apache.airavata.gfac.core.provider.AbstractProvider;
 import org.apache.airavata.gfac.core.provider.GFacProviderException;
@@ -163,6 +166,13 @@ public class LocalProvider extends AbstractProvider {
                     .append(" tempDirectory = ").append(app.getScratchWorkingDirectory()).append(" With the status ")
                     .append(String.valueOf(returnValue));
             log.info(buf.toString());
+            MonitorID monitorID = new MonitorID(jobExecutionContext.getApplicationContext().getHostDescription(),jobId,
+                    jobExecutionContext.getTaskData().getTaskID(),
+                    jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),jobExecutionContext.getExperimentID(),
+                    jobExecutionContext.getExperiment().getUserName());
+            JobStatusChangeRequest jobStatusChangeRequest = new JobStatusChangeRequest(monitorID);
+            jobStatusChangeRequest.setState(JobState.COMPLETE);
+            this.getMonitorPublisher().publish(jobStatusChangeRequest);
         } catch (IOException io) {
             throw new GFacProviderException(io.getMessage(), io);
         } catch (InterruptedException e) {

http://git-wip-us.apache.org/repos/asf/airavata/blob/14c1841c/modules/gfac/gfac-local/src/test/java/org/apache/airavata/core/gfac/services/impl/LocalProviderTest.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-local/src/test/java/org/apache/airavata/core/gfac/services/impl/LocalProviderTest.java b/modules/gfac/gfac-local/src/test/java/org/apache/airavata/core/gfac/services/impl/LocalProviderTest.java
index df628e4..a6f689f 100644
--- a/modules/gfac/gfac-local/src/test/java/org/apache/airavata/core/gfac/services/impl/LocalProviderTest.java
+++ b/modules/gfac/gfac-local/src/test/java/org/apache/airavata/core/gfac/services/impl/LocalProviderTest.java
@@ -20,16 +20,20 @@
 */
 package org.apache.airavata.core.gfac.services.impl;
 
+import com.google.common.eventbus.EventBus;
 import org.apache.airavata.commons.gfac.type.*;
 import org.apache.airavata.gfac.GFacConfiguration;
 import org.apache.airavata.gfac.GFacException;
 import org.apache.airavata.gfac.core.context.ApplicationContext;
 import org.apache.airavata.gfac.core.context.JobExecutionContext;
 import org.apache.airavata.gfac.core.context.MessageContext;
+import org.apache.airavata.gfac.core.notification.MonitorPublisher;
 import org.apache.airavata.gfac.core.provider.GFacProviderException;
 import org.apache.airavata.gfac.local.handler.LocalDirectorySetupHandler;
 import org.apache.airavata.gfac.local.provider.impl.LocalProvider;
+import org.apache.airavata.model.workspace.experiment.Experiment;
 import org.apache.airavata.model.workspace.experiment.TaskDetails;
+import org.apache.airavata.model.workspace.experiment.WorkflowNodeDetails;
 import org.apache.airavata.persistance.registry.jpa.impl.LoggingRegistryImpl;
 import org.apache.airavata.schemas.gfac.ApplicationDeploymentDescriptionType;
 import org.apache.airavata.schemas.gfac.InputParameterType;
@@ -141,8 +145,10 @@ public class LocalProviderTest {
         jobExecutionContext.setOutMessageContext(outMessage);
 
         jobExecutionContext.setExperimentID("test123");
+        jobExecutionContext.setExperiment(new Experiment("test123","project1","admin","testExp"));
         jobExecutionContext.setTaskData(new TaskDetails(jobExecutionContext.getExperimentID()));
         jobExecutionContext.setRegistry(new LoggingRegistryImpl());
+        jobExecutionContext.setWorkflowNodeDetails(new WorkflowNodeDetails(jobExecutionContext.getExperimentID(),"none"));
 
 
     }
@@ -165,6 +171,7 @@ public class LocalProviderTest {
         LocalDirectorySetupHandler localDirectorySetupHandler = new LocalDirectorySetupHandler();
         localDirectorySetupHandler.invoke(jobExecutionContext);
         LocalProvider localProvider = new LocalProvider();
+        localProvider.setMonitorPublisher(new MonitorPublisher(new EventBus()));
         localProvider.initialize(jobExecutionContext);
         localProvider.execute(jobExecutionContext);
         localProvider.dispose(jobExecutionContext);

http://git-wip-us.apache.org/repos/asf/airavata/blob/14c1841c/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/HPCMonitorID.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/HPCMonitorID.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/HPCMonitorID.java
index da4ff88..942f6ae 100644
--- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/HPCMonitorID.java
+++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/HPCMonitorID.java
@@ -80,7 +80,7 @@ public class HPCMonitorID extends MonitorID {
         jobID = jobExecutionContext.getJobDetails().getJobID();
         taskID = jobExecutionContext.getTaskData().getTaskID();
         experimentID = jobExecutionContext.getExperiment().getExperimentID();
-        workflowNodeID =  jobExecutionContext.getExperiment().getWorkflowNodeDetailsList().get(0).getNodeInstanceId();// at this point we only have one node todo: fix this
+        workflowNodeID =  jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId();// at this point we only have one node todo: fix this
 
     }
 

http://git-wip-us.apache.org/repos/asf/airavata/blob/14c1841c/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java
index a87b8a5..a9d2e73 100644
--- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java
+++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java
@@ -23,6 +23,7 @@ package org.apache.airavata.gfac.monitor.handlers;
 import org.apache.airavata.common.exception.ApplicationSettingsException;
 import org.apache.airavata.common.utils.ServerSettings;
 import org.apache.airavata.gfac.core.context.JobExecutionContext;
+import org.apache.airavata.gfac.core.cpi.GFacImpl;
 import org.apache.airavata.gfac.core.handler.GFacHandlerException;
 import org.apache.airavata.gfac.core.handler.ThreadedHandler;
 import org.apache.airavata.gfac.core.monitor.AbstractActivityListener;
@@ -53,11 +54,7 @@ public class GridPullMonitorHandler extends ThreadedHandler {
 
     private AuthenticationInfo authenticationInfo;
 
-    private List<AbstractActivityListener> activityListeners;
-
-    boolean registrySet = false;
     public void initProperties(Map<String, String> properties) throws GFacHandlerException {
-        activityListeners = new ArrayList<AbstractActivityListener>();
         String myProxyUser = null;
         try {
             myProxyUser = ServerSettings.getSetting("myproxy.username");
@@ -66,24 +63,9 @@ public class GridPullMonitorHandler extends ThreadedHandler {
             String myProxyServer = ServerSettings.getSetting("myproxy.server");
             setAuthenticationInfo(new MyProxyAuthenticationInfo(myProxyUser, myProxyPass, myProxyServer,
                     7512, 17280000, certPath));
-            hpcPullMonitor = new HPCPullMonitor();
-            String listeners = properties.get("listeners");
-            String[] split = listeners.split(",");
-            for (String listenerClass : split) {
-                Class<? extends AbstractActivityListener> aClass = Class.forName(listenerClass).asSubclass(AbstractActivityListener.class);
-                AbstractActivityListener abstractActivityListener = aClass.newInstance();
-                activityListeners.add(abstractActivityListener);
-                abstractActivityListener.setup(hpcPullMonitor.getQueue(), hpcPullMonitor.getPublisher());
-                hpcPullMonitor.getPublisher().registerListener(abstractActivityListener);
-            }
+            hpcPullMonitor = new HPCPullMonitor(GFacImpl.getMonitorPublisher());
         } catch (ApplicationSettingsException e) {
             e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
-        } catch (ClassNotFoundException e) {
-            logger.error("Error loading the listener classes configured in gfac-config.xml");
-        } catch (InstantiationException e) {
-            logger.error("Error loading the listener classes configured in gfac-config.xml");
-        } catch (IllegalAccessException e) {
-            logger.error("Error loading the listener classes configured in gfac-config.xml");
         }
     }
 
@@ -92,13 +74,8 @@ public class GridPullMonitorHandler extends ThreadedHandler {
     }
 
     public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
-        if(!registrySet){
-            for(AbstractActivityListener listener:activityListeners){
-                listener.setup(jobExecutionContext.getRegistry());
-            }
-        }
         super.invoke(jobExecutionContext);
-        MonitorID monitorID = new HPCMonitorID(authenticationInfo, jobExecutionContext);
+        MonitorID monitorID = new HPCMonitorID(getAuthenticationInfo(), jobExecutionContext);
         try {
             CommonUtils.addMonitortoQueue(hpcPullMonitor.getQueue(), monitorID);
         } catch (AiravataMonitorException e) {

http://git-wip-us.apache.org/repos/asf/airavata/blob/14c1841c/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/LocalJobMonitor.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/LocalJobMonitor.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/LocalJobMonitor.java
deleted file mode 100644
index 3c87d7d..0000000
--- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/LocalJobMonitor.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.gfac.monitor.impl;
-
-import org.apache.airavata.common.utils.ServerSettings;
-import org.apache.airavata.gfac.core.monitor.MonitorID;
-import org.apache.airavata.gfac.core.monitor.state.JobStatusChangeRequest;
-import org.apache.airavata.gfac.core.monitor.JobIdentity;
-import org.apache.airavata.gfac.monitor.core.AiravataAbstractMonitor;
-import org.apache.airavata.model.workspace.experiment.JobState;
-
-import java.util.concurrent.BlockingQueue;
-
-/**
- * This monitor can be used to monitor a job which runs locally,
- * Since its a local job job doesn't have states, once it get executed
- * then the job starts running
- */
-public class LocalJobMonitor extends AiravataAbstractMonitor {
-    // Though we have a qeuue here, it not going to be used in local jobs
-    BlockingQueue<MonitorID> jobQueue;
-
-    public void run() {
-        do {
-            try {
-                MonitorID take = jobQueue.take();
-                getPublisher().publish(new JobStatusChangeRequest(take, new JobIdentity(take.getExperimentID(), take.getWorkflowNodeID(), take.getTaskID(), take.getJobID()), JobState.COMPLETE));
-            } catch (Exception e) {
-                e.printStackTrace();
-            }
-        } while (!ServerSettings.isStopAllThreads());
-    }
-
-    public BlockingQueue<MonitorID> getJobQueue() {
-        return jobQueue;
-    }
-
-    public void setJobQueue(BlockingQueue<MonitorID> jobQueue) {
-        this.jobQueue = jobQueue;
-    }
-}

http://git-wip-us.apache.org/repos/asf/airavata/blob/14c1841c/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
index 38453e8..38b631c 100644
--- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
+++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
@@ -63,6 +63,13 @@ public class HPCPullMonitor extends PullMonitor {
         this.queue = new LinkedBlockingDeque<UserMonitorData>();
         publisher = new MonitorPublisher(new EventBus());
     }
+
+     public HPCPullMonitor(MonitorPublisher monitorPublisher){
+        connections = new HashMap<String, ResourceConnection>();
+        this.queue = new LinkedBlockingDeque<UserMonitorData>();
+        publisher = monitorPublisher;
+    }
+
     public HPCPullMonitor(BlockingQueue<UserMonitorData> queue, MonitorPublisher publisher) {
         this.queue = queue;
         this.publisher = publisher;

http://git-wip-us.apache.org/repos/asf/airavata/blob/14c1841c/modules/gfac/gfac-monitor/src/test/java/org/apache/airavata/job/AMQPMonitorTest.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/test/java/org/apache/airavata/job/AMQPMonitorTest.java b/modules/gfac/gfac-monitor/src/test/java/org/apache/airavata/job/AMQPMonitorTest.java
index 2c34df9..9c3d08b 100644
--- a/modules/gfac/gfac-monitor/src/test/java/org/apache/airavata/job/AMQPMonitorTest.java
+++ b/modules/gfac/gfac-monitor/src/test/java/org/apache/airavata/job/AMQPMonitorTest.java
@@ -62,7 +62,7 @@ public class AMQPMonitorTest {
     @Before
     public void setUp() throws Exception {
         System.setProperty("myproxy.username", "ogce");
-        System.setProperty("myproxy.password", "OpenGwy14");
+        System.setProperty("myproxy.password", "");
         System.setProperty("basedir", "/Users/lahirugunathilake/work/airavata/sandbox/gsissh");
         System.setProperty("gsi.working.directory", "/home1/01437/ogce");
         System.setProperty("trusted.cert.location", "/Users/lahirugunathilake/Downloads/certificates");

http://git-wip-us.apache.org/repos/asf/airavata/blob/14c1841c/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/EmbeddedGFACJobSubmitter.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/EmbeddedGFACJobSubmitter.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/EmbeddedGFACJobSubmitter.java
index cbc564d..90b07a2 100644
--- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/EmbeddedGFACJobSubmitter.java
+++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/EmbeddedGFACJobSubmitter.java
@@ -82,12 +82,4 @@ public class EmbeddedGFACJobSubmitter implements JobSubmitter {
     public void setOrchestratorContext(OrchestratorContext orchestratorContext) {
         this.orchestratorContext = orchestratorContext;
     }
-
-    public void runAfterJobTask(JobExecutionContext jobExecutionContext) throws OrchestratorException {
-        try {
-            gfac.invokeOutFlowHandlers(jobExecutionContext);
-        } catch (GFacException e) {
-            throw new OrchestratorException(e);
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/14c1841c/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/job/JobSubmitter.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/job/JobSubmitter.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/job/JobSubmitter.java
index b1787a3..1fd5269 100644
--- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/job/JobSubmitter.java
+++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/job/JobSubmitter.java
@@ -52,11 +52,4 @@ public interface JobSubmitter {
      * @return JobExecutionContext return the jobExecutionContext from GFac
      */
     JobExecutionContext submit(String experimentID, String taskID) throws OrchestratorException;
-
-    /**
-     * This can be use to handle any after Jobsubmission task
-     * @param jobExecutionContext
-     * @throws OrchestratorException
-     */
-    void runAfterJobTask(JobExecutionContext jobExecutionContext) throws OrchestratorException;
 }