You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by la...@apache.org on 2014/09/22 16:54:48 UTC

[1/2] git commit: reverting CreateLaunchExperiment

Repository: airavata
Updated Branches:
  refs/heads/master 450d249d6 -> 5996b5cc6


reverting CreateLaunchExperiment


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/e5c4d33d
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/e5c4d33d
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/e5c4d33d

Branch: refs/heads/master
Commit: e5c4d33d140463549187f16543676c923a9102f8
Parents: 450d249
Author: lahiru <la...@apache.org>
Authored: Fri Sep 19 12:46:47 2014 -0400
Committer: lahiru <la...@apache.org>
Committed: Fri Sep 19 12:46:47 2014 -0400

----------------------------------------------------------------------
 .../client/samples/CreateLaunchExperiment.java  | 258 ++++++++++---------
 1 file changed, 132 insertions(+), 126 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/e5c4d33d/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java b/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
index 4dcfe7f..b0f7582 100644
--- a/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
+++ b/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
@@ -34,14 +34,15 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
+import java.util.Date;
 import java.util.List;
 import java.util.Map;
 
 public class CreateLaunchExperiment {
 
     //FIXME: Read from a config file
-    public static final String THRIFT_SERVER_HOST = "localhost";
-    public static final int THRIFT_SERVER_PORT = 8930;
+    public static final String THRIFT_SERVER_HOST = "149.165.228.109";
+    public static final int THRIFT_SERVER_PORT = 9930;
     private final static Logger logger = LoggerFactory.getLogger(CreateLaunchExperiment.class);
     private static final String DEFAULT_USER = "default.registry.user";
     private static final String DEFAULT_GATEWAY = "default.registry.gateway";
@@ -50,7 +51,7 @@ public class CreateLaunchExperiment {
     private static String wrfAppId = "WRF_5f097c9c-7066-49ec-aed7-4e39607b3adc";
     private static String amberAppId = "Amber_89906be6-5678-49a6-9d04-a0604fbdef2e";
 
-    private static String localHost = "localhost";
+    private static String localHost = "149.165.228.109";
     private static String trestlesHostName = "trestles.sdsc.xsede.org";
     private static String stampedeHostName = "stampede.tacc.xsede.org";
     private static String br2HostName = "bigred2.uits.iu.edu";
@@ -60,21 +61,26 @@ public class CreateLaunchExperiment {
             airavataClient = AiravataClientFactory.createAiravataClient(THRIFT_SERVER_HOST, THRIFT_SERVER_PORT);
             System.out.println("API version is " + airavataClient.getAPIVersion());
 //            registerApplications();
-////          final String expId = createExperimentForSSHHost(airavata);
-//            final String expId = createEchoExperimentForTrestles(airavataClient);
-//            final String expId = createEchoExperimentForStampede(airavataClient);
-//            final String expId = createExperimentEchoForLocalHost(airavataClient);
-//            final String expId = createExperimentWRFTrestles(airavataClient);
-              final String expId = createExperimentForBR2(airavataClient);
-//            final String expId = createExperimentForBR2Amber(airavataClient);
-//            final String expId = createExperimentWRFStampede(airavataClient);
-//            final String expId = createExperimentForStampedeAmber(airavataClient);
-//            final String expId = createExperimentForTrestlesAmber(airavataClient);
-
-//            System.out.println("Experiment ID : " + expId);
-//            updateExperiment(airavata, expId);
-              launchExperiment(airavataClient, expId);
-              System.out.println(expId);
+            Date date = new Date();
+            long time = date.getTime();
+            int numberOfRequests = 1;
+            for (int i = 0; i < numberOfRequests; i++) {
+//                (new Thread() {
+//                    @Override
+//                    public void run() {
+                        try {
+                            launchExperiment(airavataClient, createExperimentForBR2(airavataClient));
+                        } catch (Exception e) {
+                            logger.error("Error while connecting with server", e.getMessage());
+                            e.printStackTrace();
+                        }
+                    }
+            long time1 = (date.getTime()-time);
+            System.out.println("Number of requests: " + numberOfRequests + " time taken Miliseconds: " + time1);
+
+//                }).start();
+//            }
+
         } catch (Exception e) {
             logger.error("Error while connecting with server", e.getMessage());
             e.printStackTrace();
@@ -124,10 +130,10 @@ public class CreateLaunchExperiment {
             simpleExperiment.setExperimentOutputs(exOut);
 
             Map<String, String> computeResources = airavataClient.getAvailableAppInterfaceComputeResources(echoAppId);
-            if (computeResources != null && computeResources.size() != 0){
-                for (String id : computeResources.keySet()){
+            if (computeResources != null && computeResources.size() != 0) {
+                for (String id : computeResources.keySet()) {
                     String resourceName = computeResources.get(id);
-                    if (resourceName.equals(trestlesHostName)){
+                    if (resourceName.equals(trestlesHostName)) {
                         ComputationalResourceScheduling scheduling = ExperimentModelUtil.createComputationResourceScheduling(id, 1, 1, 1, "normal", 1, 0, 1, "sds128");
                         UserConfigurationData userConfigurationData = new UserConfigurationData();
                         userConfigurationData.setAiravataAutoSchedule(false);
@@ -198,10 +204,10 @@ public class CreateLaunchExperiment {
             simpleExperiment.setExperimentOutputs(exOut);
 
             Map<String, String> computeResources = airavataClient.getAvailableAppInterfaceComputeResources(wrfAppId);
-            if (computeResources != null && computeResources.size() != 0){
-                for (String id : computeResources.keySet()){
+            if (computeResources != null && computeResources.size() != 0) {
+                for (String id : computeResources.keySet()) {
                     String resourceName = computeResources.get(id);
-                    if (resourceName.equals(stampedeHostName)){
+                    if (resourceName.equals(stampedeHostName)) {
                         ComputationalResourceScheduling scheduling = ExperimentModelUtil.createComputationResourceScheduling(id, 2, 32, 1, "development", 90, 0, 1, "TG-STA110014S");
                         UserConfigurationData userConfigurationData = new UserConfigurationData();
                         userConfigurationData.setAiravataAutoSchedule(false);
@@ -272,10 +278,10 @@ public class CreateLaunchExperiment {
             simpleExperiment.setExperimentOutputs(exOut);
 
             Map<String, String> computeResources = airavataClient.getAvailableAppInterfaceComputeResources(wrfAppId);
-            if (computeResources != null && computeResources.size() != 0){
-                for (String id : computeResources.keySet()){
+            if (computeResources != null && computeResources.size() != 0) {
+                for (String id : computeResources.keySet()) {
                     String resourceName = computeResources.get(id);
-                    if (resourceName.equals(trestlesHostName)){
+                    if (resourceName.equals(trestlesHostName)) {
                         ComputationalResourceScheduling scheduling = ExperimentModelUtil.createComputationResourceScheduling(id, 1, 1, 1, "normal", 1, 0, 1, "sds128");
                         UserConfigurationData userConfigurationData = new UserConfigurationData();
                         userConfigurationData.setAiravataAutoSchedule(false);
@@ -348,10 +354,10 @@ public class CreateLaunchExperiment {
             simpleExperiment.setExperimentOutputs(exOut);
 
             Map<String, String> computeResources = airavataClient.getAvailableAppInterfaceComputeResources(echoAppId);
-            if (computeResources != null && computeResources.size() != 0){
-                for (String id : computeResources.keySet()){
+            if (computeResources != null && computeResources.size() != 0) {
+                for (String id : computeResources.keySet()) {
                     String resourceName = computeResources.get(id);
-                    if (resourceName.equals(localHost)){
+                    if (resourceName.equals(localHost)) {
                         ComputationalResourceScheduling scheduling = ExperimentModelUtil.createComputationResourceScheduling(id, 1, 1, 1, "normal", 1, 0, 1, "");
                         UserConfigurationData userConfigurationData = new UserConfigurationData();
                         userConfigurationData.setAiravataAutoSchedule(false);
@@ -448,10 +454,10 @@ public class CreateLaunchExperiment {
             simpleExperiment.setExperimentOutputs(exOut);
 
             Map<String, String> computeResources = airavataClient.getAvailableAppInterfaceComputeResources(echoAppId);
-            if (computeResources != null && computeResources.size() != 0){
-                for (String id : computeResources.keySet()){
+            if (computeResources != null && computeResources.size() != 0) {
+                for (String id : computeResources.keySet()) {
                     String resourceName = computeResources.get(id);
-                    if (resourceName.equals(stampedeHostName)){
+                    if (resourceName.equals(stampedeHostName)) {
                         ComputationalResourceScheduling scheduling = ExperimentModelUtil.createComputationResourceScheduling(id, 1, 1, 1, "normal", 1, 0, 1, "TG-STA110014S");
                         UserConfigurationData userConfigurationData = new UserConfigurationData();
                         userConfigurationData.setAiravataAutoSchedule(false);
@@ -572,11 +578,11 @@ public class CreateLaunchExperiment {
             simpleExperiment.setExperimentOutputs(exOut);
 
             Map<String, String> computeResources = airavataClient.getAvailableAppInterfaceComputeResources(echoAppId);
-            if (computeResources != null && computeResources.size() != 0){
-                for (String id : computeResources.keySet()){
+            if (computeResources != null && computeResources.size() != 0) {
+                for (String id : computeResources.keySet()) {
                     String resourceName = computeResources.get(id);
-                    if (resourceName.equals(br2HostName)){
-                        ComputationalResourceScheduling scheduling = ExperimentModelUtil.createComputationResourceScheduling(id,  1, 1, 1, "normal", 1, 0, 1, null);
+                    if (resourceName.equals(br2HostName)) {
+                        ComputationalResourceScheduling scheduling = ExperimentModelUtil.createComputationResourceScheduling(id, 1, 1, 1, "normal", 1, 0, 1, null);
                         UserConfigurationData userConfigurationData = new UserConfigurationData();
                         userConfigurationData.setAiravataAutoSchedule(false);
                         userConfigurationData.setOverrideManualScheduledParams(false);
@@ -655,10 +661,10 @@ public class CreateLaunchExperiment {
 
 
             Map<String, String> computeResources = airavataClient.getAvailableAppInterfaceComputeResources(amberAppId);
-            if (computeResources != null && computeResources.size() != 0){
-                for (String id : computeResources.keySet()){
+            if (computeResources != null && computeResources.size() != 0) {
+                for (String id : computeResources.keySet()) {
                     String resourceName = computeResources.get(id);
-                    if (resourceName.equals(br2HostName)){
+                    if (resourceName.equals(br2HostName)) {
                         ComputationalResourceScheduling scheduling = ExperimentModelUtil.createComputationResourceScheduling(id, 4, 1, 1, "cpu", 20, 0, 1, null);
                         UserConfigurationData userConfigurationData = new UserConfigurationData();
                         userConfigurationData.setAiravataAutoSchedule(false);
@@ -737,10 +743,10 @@ public class CreateLaunchExperiment {
             simpleExperiment.setExperimentOutputs(exOut);
 
             Map<String, String> computeResources = airavataClient.getAvailableAppInterfaceComputeResources(amberAppId);
-            if (computeResources != null && computeResources.size() != 0){
-                for (String id : computeResources.keySet()){
+            if (computeResources != null && computeResources.size() != 0) {
+                for (String id : computeResources.keySet()) {
                     String resourceName = computeResources.get(id);
-                    if (resourceName.equals(stampedeHostName)){
+                    if (resourceName.equals(stampedeHostName)) {
                         ComputationalResourceScheduling scheduling = ExperimentModelUtil.createComputationResourceScheduling(id, 4, 1, 1, "development", 20, 0, 1, null);
                         UserConfigurationData userConfigurationData = new UserConfigurationData();
                         userConfigurationData.setAiravataAutoSchedule(false);
@@ -768,86 +774,86 @@ public class CreateLaunchExperiment {
     }
 
     public static String createExperimentForTrestlesAmber(Airavata.Client client) throws TException {
-           try {
-               List<DataObjectType> exInputs = new ArrayList<DataObjectType>();
-               DataObjectType input = new DataObjectType();
-               input.setKey("Heat_Restart_File");
-               input.setType(DataType.URI);
-               input.setValue("/Users/lahirugunathilake/Downloads/02_Heat.rst");
-               exInputs.add(input);
-
-               DataObjectType input1 = new DataObjectType();
-               input1.setKey("Production_Control_File");
-               input1.setType(DataType.URI);
-               input1.setValue("/Users/lahirugunathilake/Downloads/03_Prod.in");
-               exInputs.add(input1);
-
-               DataObjectType input2 = new DataObjectType();
-               input2.setKey("Production_Control_File");
-               input2.setType(DataType.URI);
-               input2.setValue("/Users/lahirugunathilake/Downloads/prmtop");
-               exInputs.add(input2);
-
-               List<DataObjectType> exOut = new ArrayList<DataObjectType>();
-               DataObjectType output = new DataObjectType();
-               output.setKey("AMBER_Execution_Summary");
-               output.setType(DataType.URI);
-               output.setValue("");
-               exOut.add(output);
-
-               DataObjectType output1 = new DataObjectType();
-               output1.setKey("AMBER_Execution_log");
-               output1.setType(DataType.URI);
-               output1.setValue("");
-               exOut.add(output1);
-               DataObjectType output2 = new DataObjectType();
-               output2.setKey("AMBER_Trajectory_file");
-               output2.setType(DataType.URI);
-               output2.setValue("");
-               exOut.add(output2);
-               DataObjectType output3 = new DataObjectType();
-               output3.setKey("AMBER_Restart_file");
-               output3.setType(DataType.URI);
-               output3.setValue("");
-               exOut.add(output3);
-
-               Project project = ProjectModelUtil.createProject("default", "admin", "test project");
-               String projectId = client.createProject(project);
-
-               Experiment simpleExperiment =
-                       ExperimentModelUtil.createSimpleExperiment(projectId, "admin", "sshEchoExperiment", "SimpleEchoBR", amberAppId, exInputs);
-               simpleExperiment.setExperimentOutputs(exOut);
-
-               Map<String, String> computeResources = airavataClient.getAvailableAppInterfaceComputeResources(amberAppId);
-               if (computeResources != null && computeResources.size() != 0){
-                   for (String id : computeResources.keySet()){
-                       String resourceName = computeResources.get(id);
-                       if (resourceName.equals(trestlesHostName)){
-                           ComputationalResourceScheduling scheduling = ExperimentModelUtil.createComputationResourceScheduling(id, 4, 1, 1, "normal", 20, 0, 1, null);
-                           UserConfigurationData userConfigurationData = new UserConfigurationData();
-                           userConfigurationData.setAiravataAutoSchedule(false);
-                           userConfigurationData.setOverrideManualScheduledParams(false);
-                           userConfigurationData.setComputationalResourceScheduling(scheduling);
-                           simpleExperiment.setUserConfigurationData(userConfigurationData);
-                           return client.createExperiment(simpleExperiment);
-                       }
-                   }
-               }
-           } catch (AiravataSystemException e) {
-               logger.error("Error occured while creating the experiment...", e.getMessage());
-               throw new AiravataSystemException(e);
-           } catch (InvalidRequestException e) {
-               logger.error("Error occured while creating the experiment...", e.getMessage());
-               throw new InvalidRequestException(e);
-           } catch (AiravataClientException e) {
-               logger.error("Error occured while creating the experiment...", e.getMessage());
-               throw new AiravataClientException(e);
-           } catch (TException e) {
-               logger.error("Error occured while creating the experiment...", e.getMessage());
-               throw new TException(e);
-           }
+        try {
+            List<DataObjectType> exInputs = new ArrayList<DataObjectType>();
+            DataObjectType input = new DataObjectType();
+            input.setKey("Heat_Restart_File");
+            input.setType(DataType.URI);
+            input.setValue("/Users/lahirugunathilake/Downloads/02_Heat.rst");
+            exInputs.add(input);
+
+            DataObjectType input1 = new DataObjectType();
+            input1.setKey("Production_Control_File");
+            input1.setType(DataType.URI);
+            input1.setValue("/Users/lahirugunathilake/Downloads/03_Prod.in");
+            exInputs.add(input1);
+
+            DataObjectType input2 = new DataObjectType();
+            input2.setKey("Production_Control_File");
+            input2.setType(DataType.URI);
+            input2.setValue("/Users/lahirugunathilake/Downloads/prmtop");
+            exInputs.add(input2);
+
+            List<DataObjectType> exOut = new ArrayList<DataObjectType>();
+            DataObjectType output = new DataObjectType();
+            output.setKey("AMBER_Execution_Summary");
+            output.setType(DataType.URI);
+            output.setValue("");
+            exOut.add(output);
+
+            DataObjectType output1 = new DataObjectType();
+            output1.setKey("AMBER_Execution_log");
+            output1.setType(DataType.URI);
+            output1.setValue("");
+            exOut.add(output1);
+            DataObjectType output2 = new DataObjectType();
+            output2.setKey("AMBER_Trajectory_file");
+            output2.setType(DataType.URI);
+            output2.setValue("");
+            exOut.add(output2);
+            DataObjectType output3 = new DataObjectType();
+            output3.setKey("AMBER_Restart_file");
+            output3.setType(DataType.URI);
+            output3.setValue("");
+            exOut.add(output3);
+
+            Project project = ProjectModelUtil.createProject("default", "admin", "test project");
+            String projectId = client.createProject(project);
+
+            Experiment simpleExperiment =
+                    ExperimentModelUtil.createSimpleExperiment(projectId, "admin", "sshEchoExperiment", "SimpleEchoBR", amberAppId, exInputs);
+            simpleExperiment.setExperimentOutputs(exOut);
+
+            Map<String, String> computeResources = airavataClient.getAvailableAppInterfaceComputeResources(amberAppId);
+            if (computeResources != null && computeResources.size() != 0) {
+                for (String id : computeResources.keySet()) {
+                    String resourceName = computeResources.get(id);
+                    if (resourceName.equals(trestlesHostName)) {
+                        ComputationalResourceScheduling scheduling = ExperimentModelUtil.createComputationResourceScheduling(id, 4, 1, 1, "normal", 20, 0, 1, null);
+                        UserConfigurationData userConfigurationData = new UserConfigurationData();
+                        userConfigurationData.setAiravataAutoSchedule(false);
+                        userConfigurationData.setOverrideManualScheduledParams(false);
+                        userConfigurationData.setComputationalResourceScheduling(scheduling);
+                        simpleExperiment.setUserConfigurationData(userConfigurationData);
+                        return client.createExperiment(simpleExperiment);
+                    }
+                }
+            }
+        } catch (AiravataSystemException e) {
+            logger.error("Error occured while creating the experiment...", e.getMessage());
+            throw new AiravataSystemException(e);
+        } catch (InvalidRequestException e) {
+            logger.error("Error occured while creating the experiment...", e.getMessage());
+            throw new InvalidRequestException(e);
+        } catch (AiravataClientException e) {
+            logger.error("Error occured while creating the experiment...", e.getMessage());
+            throw new AiravataClientException(e);
+        } catch (TException e) {
+            logger.error("Error occured while creating the experiment...", e.getMessage());
+            throw new TException(e);
+        }
         return null;
-       }
+    }
 
     public static void launchExperiment(Airavata.Client client, String expId)
             throws TException {
@@ -979,12 +985,12 @@ public class CreateLaunchExperiment {
         return null;
     }
 
-    public static void getExperiment (Airavata.Client client, String expId) throws Exception{
-        try{
+    public static void getExperiment(Airavata.Client client, String expId) throws Exception {
+        try {
             Experiment experiment = client.getExperiment(expId);
             List<ErrorDetails> errors = experiment.getErrors();
-            if (errors != null && !errors.isEmpty()){
-                for (ErrorDetails error : errors){
+            if (errors != null && !errors.isEmpty()) {
+                for (ErrorDetails error : errors) {
                     System.out.println("ERROR MESSAGE : " + error.getActualErrorMessage());
                 }
             }


[2/2] git commit: Adding thread pool for running output handlers - AIRAVATA-1450

Posted by la...@apache.org.
Adding thread pool for running output handlers - AIRAVATA-1450


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/5996b5cc
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/5996b5cc
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/5996b5cc

Branch: refs/heads/master
Commit: 5996b5cc6619ecc7f7d1b35455f5be7d56a4bed8
Parents: e5c4d33
Author: lahiru <la...@apache.org>
Authored: Mon Sep 22 10:54:03 2014 -0400
Committer: lahiru <la...@apache.org>
Committed: Mon Sep 22 10:54:03 2014 -0400

----------------------------------------------------------------------
 .../airavata/gfac/core/cpi/BetterGfacImpl.java  |  9 +++
 .../core/monitor/GfacInternalStatusUpdator.java |  4 ++
 .../gfac/core/utils/OutHandlerWorker.java       | 60 ++++++++++++++++++++
 .../monitor/impl/pull/qstat/HPCPullMonitor.java | 25 ++------
 4 files changed, 78 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/5996b5cc/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
index fe498ab..beaa124 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
@@ -26,6 +26,8 @@ import java.net.URL;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Properties;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
 import javax.xml.parsers.ParserConfigurationException;
 import javax.xml.xpath.XPathExpressionException;
@@ -130,6 +132,8 @@ public class BetterGfacImpl implements GFac,Watcher {
 
     private boolean cancelled = false;
 
+    private static ExecutorService cachedThreadPool;
+
     /**
      * Constructor for GFac
      *
@@ -143,6 +147,7 @@ public class BetterGfacImpl implements GFac,Watcher {
 //        this.airavataRegistry2 = airavataRegistry2;
         monitorPublisher = publisher;     // This is a EventBus common for gfac
         this.zk = zooKeeper;
+       this.cachedThreadPool = Executors.newCachedThreadPool();
     }
 
     public static void startStatusUpdators(Registry registry, ZooKeeper zk, MonitorPublisher publisher) {
@@ -1176,4 +1181,8 @@ public class BetterGfacImpl implements GFac,Watcher {
             this.cancelled = true;
         }
     }
+
+    public static ExecutorService getCachedThreadPool(){
+        return cachedThreadPool;
+    }
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/5996b5cc/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java
index a1856e6..b7479d0 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java
@@ -82,9 +82,13 @@ public class GfacInternalStatusUpdator implements AbstractActivityListener, Watc
         }
         switch (statusChangeRequest.getState()) {
             case COMPLETED:
+                logger.info("Experiment Completed, So removing the ZK entry for the experiment" + monitorID.getExperimentID());
+                logger.info("Zookeeper experiment Path: " + experimentPath);
                 ZKUtil.deleteRecursive(zk, experimentPath);
                 break;
             case FAILED:
+                logger.info("Experiment Failed, So removing the ZK entry for the experiment" + monitorID.getExperimentID());
+                logger.info("Zookeeper experiment Path: " + experimentPath);
                 ZKUtil.deleteRecursive(zk,experimentPath);
                 break;
             default:

http://git-wip-us.apache.org/repos/asf/airavata/blob/5996b5cc/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/OutHandlerWorker.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/OutHandlerWorker.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/OutHandlerWorker.java
new file mode 100644
index 0000000..64c7899
--- /dev/null
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/OutHandlerWorker.java
@@ -0,0 +1,60 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.core.utils;
+
+import org.apache.airavata.common.utils.MonitorPublisher;
+import org.apache.airavata.gfac.GFacException;
+import org.apache.airavata.gfac.core.cpi.GFac;
+import org.apache.airavata.gfac.core.monitor.MonitorID;
+import org.apache.airavata.gfac.core.monitor.TaskIdentity;
+import org.apache.airavata.gfac.core.monitor.state.TaskStatusChangeRequest;
+import org.apache.airavata.model.workspace.experiment.TaskState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class OutHandlerWorker implements Runnable {
+    private final static Logger logger = LoggerFactory.getLogger(OutHandlerWorker.class);
+
+    private GFac gfac;
+
+    private MonitorID monitorID;
+
+    private MonitorPublisher monitorPublisher;
+
+    public OutHandlerWorker(GFac gfac, MonitorID monitorID,MonitorPublisher monitorPublisher) {
+        this.gfac = gfac;
+        this.monitorID = monitorID;
+        this.monitorPublisher = monitorPublisher;
+    }
+
+    @Override
+    public void run() {
+        try {
+            gfac.invokeOutFlowHandlers(monitorID.getJobExecutionContext());
+        } catch (GFacException e) {
+            monitorPublisher.publish(new TaskStatusChangeRequest(new TaskIdentity(monitorID.getExperimentID(), monitorID.getWorkflowNodeID(),
+                    monitorID.getTaskID()), TaskState.FAILED));
+            //FIXME: output retrieval can fail even though the job execution itself succeeded, so the task status is updated to FAILED in this catch block.
+            logger.info(e.getLocalizedMessage(), e);
+        }
+        monitorPublisher.publish(monitorID.getStatus());
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/5996b5cc/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
index 2275fb2..58b48ef 100644
--- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
+++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
@@ -25,11 +25,13 @@ import org.apache.airavata.common.utils.MonitorPublisher;
 import org.apache.airavata.common.utils.ServerSettings;
 import org.apache.airavata.commons.gfac.type.HostDescription;
 import org.apache.airavata.gfac.GFacException;
+import org.apache.airavata.gfac.core.cpi.BetterGfacImpl;
 import org.apache.airavata.gfac.core.cpi.GFac;
 import org.apache.airavata.gfac.core.monitor.MonitorID;
 import org.apache.airavata.gfac.core.monitor.TaskIdentity;
 import org.apache.airavata.gfac.core.monitor.state.JobStatusChangeRequest;
 import org.apache.airavata.gfac.core.monitor.state.TaskStatusChangeRequest;
+import org.apache.airavata.gfac.core.utils.OutHandlerWorker;
 import org.apache.airavata.gfac.monitor.HostMonitorData;
 import org.apache.airavata.gfac.monitor.UserMonitorData;
 import org.apache.airavata.gfac.monitor.core.PullMonitor;
@@ -47,12 +49,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.sql.Timestamp;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -212,6 +209,7 @@ public class HPCPullMonitor extends PullMonitor {
                             // lead to a memory leak
                             iterator.remove();
                         }
+                        iterator = completedJobsFromPush.listIterator();
                     }
                     Map<String, JobState> jobStatuses = connection.getJobStatuses(monitorID);
                     for (MonitorID iMonitorID : monitorID) {
@@ -232,20 +230,7 @@ public class HPCPullMonitor extends PullMonitor {
                                 completedJobs.add(iMonitorID);
                                 // we run all the finished jobs in separate threads, because each job doesn't have to wait until
                                 // each one finish transfering files
-                                    final MonitorID tMonitorID = iMonitorID;
-                                    (new Thread() {
-                                        @Override
-                                        public void run() {
-                                            try {
-                                                gfac.invokeOutFlowHandlers(tMonitorID.getJobExecutionContext());
-                                            } catch (GFacException e) {
-                                                publisher.publish(new TaskStatusChangeRequest(new TaskIdentity(tMonitorID.getExperimentID(), tMonitorID.getWorkflowNodeID(),
-                                                        tMonitorID.getTaskID()), TaskState.FAILED));
-                                                //FIXME this is a case where the output retrieving fails even if the job execution was a success. Thus updating the task status
-                                                logger.info(e.getLocalizedMessage(), e);
-                                            }
-                                        }
-                                    }).start();
+                                BetterGfacImpl.getCachedThreadPool().submit(new OutHandlerWorker(gfac, iMonitorID, publisher));
                             } else if (iMonitorID.getFailedCount() > FAILED_COUNT) {
                                 logger.error("Tried to monitor the job with ID " + iMonitorID.getJobID() + " But failed" +iMonitorID.getFailedCount()+
                                         " 3 times, so skip this Job from Monitor");