You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by la...@apache.org on 2014/06/24 23:05:40 UTC

git commit: More fixed for BetterGFacImpl

Repository: airavata
Updated Branches:
  refs/heads/master d7c711667 -> a1bb76352


More fixed for BetterGFacImpl


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/a1bb7635
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/a1bb7635
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/a1bb7635

Branch: refs/heads/master
Commit: a1bb763521cd80092df674e90663bf36fc37d913
Parents: d7c7116
Author: lahiru <la...@apache.org>
Authored: Tue Jun 24 17:05:21 2014 -0400
Committer: lahiru <la...@apache.org>
Committed: Tue Jun 24 17:05:21 2014 -0400

----------------------------------------------------------------------
 .../airavata/gfac/server/GfacServerHandler.java |  1 +
 .../gfac/core/context/JobExecutionContext.java  | 11 +++
 .../airavata/gfac/core/cpi/BetterGfacImpl.java  |  6 +-
 .../org/apache/airavata/gfac/core/cpi/GFac.java | 14 ++++
 .../apache/airavata/gfac/core/cpi/GFacImpl.java |  6 +-
 .../core/provider/GFacRecoverableProvider.java  | 43 +++++++++++
 .../gfac/core/states/GfacExperimentState.java   | 81 ++++++++++++++++++++
 .../gfac/core/states/GfacPluginState.java       | 56 ++++++++++++++
 .../gsissh/provider/impl/GSISSHProvider.java    |  4 +
 .../handlers/GridPullMonitorHandler.java        |  8 +-
 .../monitor/impl/pull/qstat/HPCPullMonitor.java | 14 +++-
 11 files changed, 240 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/a1bb7635/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
index 60a0f21..2343a40 100644
--- a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
+++ b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
@@ -125,6 +125,7 @@ public class GfacServerHandler implements GfacService.Iface, Watcher{
         }
         try {
             publisher = new MonitorPublisher(new EventBus());
+            BetterGfacImpl.setMonitorPublisher(publisher);
             registry = RegistryFactory.getDefaultRegistry();
             setGatewayProperties();
             BetterGfacImpl.startDaemonHandlers();

http://git-wip-us.apache.org/repos/asf/airavata/blob/a1bb7635/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/JobExecutionContext.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/JobExecutionContext.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/JobExecutionContext.java
index 170c2c8..b378662 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/JobExecutionContext.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/JobExecutionContext.java
@@ -30,6 +30,7 @@ import java.util.Map;
 import org.apache.airavata.gfac.GFacConfiguration;
 import org.apache.airavata.gfac.GFacException;
 import org.apache.airavata.gfac.SecurityContext;
+import org.apache.airavata.gfac.core.cpi.GFac;
 import org.apache.airavata.gfac.core.notification.GFacNotifier;
 import org.apache.airavata.gfac.core.provider.GFacProvider;
 import org.apache.airavata.model.workspace.experiment.Experiment;
@@ -58,6 +59,8 @@ public class JobExecutionContext extends AbstractContext implements Serializable
 
     private WorkflowNodeDetails workflowNodeDetails;
 
+    GFac gfac;
+
 //    private ContextHeaderDocument.ContextHeader contextHeader;
 
     // Keep track of the current path of the message. Before hitting provider its in-path.
@@ -274,4 +277,12 @@ public class JobExecutionContext extends AbstractContext implements Serializable
     public void setWorkflowNodeDetails(WorkflowNodeDetails workflowNodeDetails) {
         this.workflowNodeDetails = workflowNodeDetails;
     }
+
+    public GFac getGfac() {
+        return gfac;
+    }
+
+    public void setGfac(GFac gfac) {
+        this.gfac = gfac;
+    }
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/a1bb7635/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
index fa7eb33..b1aa201 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
@@ -276,7 +276,7 @@ public class BetterGfacImpl implements GFac {
                 serviceDescription.getType().getOutputParametersArray())));
 
         jobExecutionContext.setProperty(Constants.PROP_TOPIC, experimentID);
-
+        jobExecutionContext.setGfac(this);
         return jobExecutionContext;
     }
 
@@ -627,6 +627,10 @@ public class BetterGfacImpl implements GFac {
     }
 
 
+    public static void setMonitorPublisher(MonitorPublisher monitorPublisher) {
+        BetterGfacImpl.monitorPublisher = monitorPublisher;
+    }
+
     public AiravataAPI getAiravataAPI() {
         return airavataAPI;
     }

http://git-wip-us.apache.org/repos/asf/airavata/blob/a1bb7635/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFac.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFac.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFac.java
index caf9575..291d3ce 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFac.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFac.java
@@ -39,4 +39,18 @@ public interface GFac {
      */
     public boolean submitJob(String experimentID,String taskID) throws GFacException;
 
+    /**
+     * This method can be used in a handler to invoke the out-flow handlers asynchronously
+     * @param jobExecutionContext
+     * @throws GFacException
+     */
+    public void invokeOutFlowHandlers(JobExecutionContext jobExecutionContext) throws GFacException;
+
+    /**
+     * This method can be used to handle re-run case asynchronously
+     * @param jobExecutionContext
+     * @throws GFacException
+     */
+    public void reInvokeOutFlowHandlers(JobExecutionContext jobExecutionContext) throws GFacException;
+
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/a1bb7635/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFacImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFacImpl.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFacImpl.java
index 1bc639a..ebe60b5 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFacImpl.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFacImpl.java
@@ -275,7 +275,7 @@ public class GFacImpl implements GFac {
                 serviceDescription.getType().getOutputParametersArray())));
 
         jobExecutionContext.setProperty(Constants.PROP_TOPIC, experimentID);
-
+        jobExecutionContext.setGfac(this);
         return jobExecutionContext;
     }
 
@@ -401,6 +401,10 @@ public class GFacImpl implements GFac {
         }
     }
 
+    public void reInvokeOutFlowHandlers(JobExecutionContext jobExecutionContext) throws GFacException {
+        this.invokeOutFlowHandlers(jobExecutionContext);
+    }
+
     public void invokeOutFlowHandlers(JobExecutionContext jobExecutionContext) throws GFacException {
         GFacConfiguration gFacConfiguration = jobExecutionContext.getGFacConfiguration();
         List<GFacHandlerConfig> handlers = null;

http://git-wip-us.apache.org/repos/asf/airavata/blob/a1bb7635/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/provider/GFacRecoverableProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/provider/GFacRecoverableProvider.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/provider/GFacRecoverableProvider.java
new file mode 100644
index 0000000..6f98cf6
--- /dev/null
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/provider/GFacRecoverableProvider.java
@@ -0,0 +1,43 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.core.provider;
+
+import org.apache.airavata.gfac.core.context.JobExecutionContext;
+
+/**
+ * This provider type can be used to implement stateful operations, and
+ * we recommend using the ZK client to store and retrieve the states
+ * of the handler implementation. At the framework level, ZK is used
+ * to decide whether a handler ran successfully, so the execution details
+ * of each handler can be found under the following ZK path:
+ * /gfac-experiment/<gfac-node-name>/experimentId+taskId/full-qualified-handlername/state
+ * ex: /gfac-experiment/gfac-node0/echoExperiment_2c6c11b8-dea0-4ec8-9832-f3e69fe2e6bb+IDontNeedaNode_682faa66-6218-4897-9271-656bfb8b2bd1/org.apache.airavata.gfac.handlers.Test/state
+ */
+public interface GFacRecoverableProvider extends GFacProvider {
+    /**
+     * Implement this method to provide the recovery part of the stateful handler.
+     * If you do not want to recover a handler that has already run, you can simply implement
+     * GfacAbstract Handler or GFacHandler, or leave this recover method empty.
+     * NOTE(review): the text says "handler" although this interface extends GFacProvider; confirm.
+     * @param jobExecutionContext the job execution context handed to the implementation
+     */
+    public void recover(JobExecutionContext jobExecutionContext);
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/a1bb7635/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/states/GfacExperimentState.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/states/GfacExperimentState.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/states/GfacExperimentState.java
new file mode 100644
index 0000000..7bf7d3b
--- /dev/null
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/states/GfacExperimentState.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Autogenerated by Thrift Compiler (0.9.1)
+ *
+ * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+ *  @generated
+ */
+package org.apache.airavata.gfac.core.states;
+
+
+/**
+ * Enumerates GFac experiment states, each carrying the integer value returned by
+ * {@link #getValue()} and accepted by {@link #findByValue(int)}.
+ */
+public enum GfacExperimentState {
+    LAUNCHED(0),
+    ACCEPTED(1),
+    INHANDLERSINVOKING(2),
+    INHANDLERSINVOKED(3),
+    PROVIDERINVOKING(4),
+    PROVIDERINVOKED(5),
+    OUTHANDLERSINVOKING(6),
+    OUTHANDLERSINVOKED(7),
+    COMPLETED(8),
+    FAILED(9),
+    UNKNOWN(10);
+
+    private final int value;
+
+    private GfacExperimentState(int value) {
+        this.value = value;
+    }
+
+    /**
+     * Get the integer value of this enum value, as defined in the Thrift IDL.
+     */
+    public int getValue() {
+        return value;
+    }
+
+    /**
+     * Find the enum constant by its integer value, as defined in the Thrift IDL.
+     *
+     * <p>Iterating over {@code values()} keeps this lookup in step with the declared
+     * constants, so {@code findByValue(s.getValue()) == s} holds for every state,
+     * including LAUNCHED (0) and ACCEPTED (1).
+     *
+     * @param value integer value to look up
+     * @return the state declared with that value, or null if the value is not found.
+     */
+    public static GfacExperimentState findByValue(int value) {
+        for (GfacExperimentState state : values()) {
+            if (state.value == value) {
+                return state;
+            }
+        }
+        return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/a1bb7635/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/states/GfacPluginState.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/states/GfacPluginState.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/states/GfacPluginState.java
new file mode 100644
index 0000000..53df1d4
--- /dev/null
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/states/GfacPluginState.java
@@ -0,0 +1,56 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.core.states;
+
+public enum GfacPluginState {
+    INVOKING(0),
+
+    INVOKED(1);
+
+    private final int value;
+
+    private GfacPluginState(int code) {
+        value = code;
+    }
+
+    /**
+     * Returns the integer value associated with this plugin state.
+     */
+    public int getValue() {
+        return this.value;
+    }
+
+    /**
+     * Looks up the plugin state that carries the given integer value.
+     *
+     * @param value integer value to look up
+     * @return the matching state, or null if the value is not found.
+     */
+    public static GfacPluginState findByValue(int value) {
+        // Each constant is declared with a distinct value, so at most one can match.
+        for (GfacPluginState candidate : values()) {
+            if (candidate.value == value) {
+                return candidate;
+            }
+        }
+        return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/a1bb7635/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java
index c9b3aa1..49f9235 100644
--- a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java
+++ b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java
@@ -23,6 +23,7 @@ package org.apache.airavata.gfac.gsissh.provider.impl;
 import org.apache.airavata.gfac.ExecutionMode;
 import org.apache.airavata.gfac.GFacException;
 import org.apache.airavata.gfac.core.context.JobExecutionContext;
+import org.apache.airavata.gfac.core.cpi.BetterGfacImpl;
 import org.apache.airavata.gfac.core.cpi.GFacImpl;
 import org.apache.airavata.gfac.core.handler.ThreadedHandler;
 import org.apache.airavata.gfac.core.notification.events.StartExecutionEvent;
@@ -98,6 +99,9 @@ public class GSISSHProvider extends AbstractProvider {
             // Now job has submitted to the resource, its up to the Provider to parse the information to daemon handler
             // to perform monitoring, daemon handlers can be accessed from anywhere
             List<ThreadedHandler> daemonHandlers = GFacImpl.getDaemonHandlers();
+            if(daemonHandlers == null){
+                daemonHandlers = BetterGfacImpl.getDaemonHandlers();
+            }
             ThreadedHandler pullMonitorHandler = null;
             ThreadedHandler pushMonitorHandler = null;
             String monitorMode = ((GsisshHostType) host).getMonitorMode();

http://git-wip-us.apache.org/repos/asf/airavata/blob/a1bb7635/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java
index bb7d639..d6cdcaf 100644
--- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java
+++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java
@@ -23,6 +23,7 @@ package org.apache.airavata.gfac.monitor.handlers;
 import org.apache.airavata.common.exception.ApplicationSettingsException;
 import org.apache.airavata.common.utils.ServerSettings;
 import org.apache.airavata.gfac.core.context.JobExecutionContext;
+import org.apache.airavata.gfac.core.cpi.BetterGfacImpl;
 import org.apache.airavata.gfac.core.cpi.GFacImpl;
 import org.apache.airavata.gfac.core.handler.GFacHandlerException;
 import org.apache.airavata.gfac.core.handler.ThreadedHandler;
@@ -60,7 +61,11 @@ public class GridPullMonitorHandler extends ThreadedHandler {
             String myProxyServer = ServerSettings.getSetting("myproxy.server");
             setAuthenticationInfo(new MyProxyAuthenticationInfo(myProxyUser, myProxyPass, myProxyServer,
                     7512, 17280000, certPath));
-            hpcPullMonitor = new HPCPullMonitor(GFacImpl.getMonitorPublisher());
+            if(GFacImpl.getMonitorPublisher() != null){
+                hpcPullMonitor = new HPCPullMonitor(GFacImpl.getMonitorPublisher());    // todo remove the GFacImpl
+            }else {
+                hpcPullMonitor = new HPCPullMonitor(BetterGfacImpl.getMonitorPublisher());
+            }
         } catch (ApplicationSettingsException e) {
             e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
         }
@@ -72,6 +77,7 @@ public class GridPullMonitorHandler extends ThreadedHandler {
 
     public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
         super.invoke(jobExecutionContext);
+        hpcPullMonitor.setGfac(jobExecutionContext.getGfac());
         MonitorID monitorID = new HPCMonitorID(getAuthenticationInfo(), jobExecutionContext);
         try {
             CommonUtils.addMonitortoQueue(hpcPullMonitor.getQueue(), monitorID);

http://git-wip-us.apache.org/repos/asf/airavata/blob/a1bb7635/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
index 03f1940..a126243 100644
--- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
+++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
@@ -25,6 +25,7 @@ import com.google.common.eventbus.EventBus;
 import org.apache.airavata.common.utils.ServerSettings;
 import org.apache.airavata.commons.gfac.type.HostDescription;
 import org.apache.airavata.gfac.GFacException;
+import org.apache.airavata.gfac.core.cpi.GFac;
 import org.apache.airavata.gfac.core.monitor.MonitorID;
 import org.apache.airavata.gfac.core.monitor.state.JobStatusChangeRequest;
 import org.apache.airavata.gfac.core.notification.MonitorPublisher;
@@ -61,6 +62,9 @@ public class HPCPullMonitor extends PullMonitor {
 
     private MonitorPublisher publisher;
 
+
+    private GFac gfac;
+
     public HPCPullMonitor(){
         connections = new HashMap<String, ResourceConnection>();
         this.queue = new LinkedBlockingDeque<UserMonitorData>();
@@ -160,7 +164,7 @@ public class HPCPullMonitor extends PullMonitor {
                         if (jobStatus.getState().equals(JobState.COMPLETE)) {
                             completedJobs.add(iMonitorID);
                             try {
-								CommonUtils.invokeOutFlowHandlers(iMonitorID.getJobExecutionContext());
+								gfac.invokeOutFlowHandlers(iMonitorID.getJobExecutionContext());
 							} catch (GFacException e) {
 								logger.info(e.getLocalizedMessage(),e);
 							}
@@ -295,4 +299,12 @@ public class HPCPullMonitor extends PullMonitor {
     public void setStartPulling(boolean startPulling) {
         this.startPulling = startPulling;
     }
+
+    public GFac getGfac() {
+        return gfac;
+    }
+
+    public void setGfac(GFac gfac) {
+        this.gfac = gfac;
+    }
 }