You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by la...@apache.org on 2014/09/22 16:54:49 UTC

[2/2] git commit: Adding threadpool for running output handlers- AIRAVATA-1450

Adding threadpool for running output handlers- AIRAVATA-1450


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/5996b5cc
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/5996b5cc
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/5996b5cc

Branch: refs/heads/master
Commit: 5996b5cc6619ecc7f7d1b35455f5be7d56a4bed8
Parents: e5c4d33
Author: lahiru <la...@apache.org>
Authored: Mon Sep 22 10:54:03 2014 -0400
Committer: lahiru <la...@apache.org>
Committed: Mon Sep 22 10:54:03 2014 -0400

----------------------------------------------------------------------
 .../airavata/gfac/core/cpi/BetterGfacImpl.java  |  9 +++
 .../core/monitor/GfacInternalStatusUpdator.java |  4 ++
 .../gfac/core/utils/OutHandlerWorker.java       | 60 ++++++++++++++++++++
 .../monitor/impl/pull/qstat/HPCPullMonitor.java | 25 ++------
 4 files changed, 78 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/5996b5cc/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
index fe498ab..beaa124 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
@@ -26,6 +26,8 @@ import java.net.URL;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Properties;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
 import javax.xml.parsers.ParserConfigurationException;
 import javax.xml.xpath.XPathExpressionException;
@@ -130,6 +132,8 @@ public class BetterGfacImpl implements GFac,Watcher {
 
     private boolean cancelled = false;
 
+    private static ExecutorService cachedThreadPool;
+
     /**
      * Constructor for GFac
      *
@@ -143,6 +147,7 @@ public class BetterGfacImpl implements GFac,Watcher {
 //        this.airavataRegistry2 = airavataRegistry2;
         monitorPublisher = publisher;     // This is a EventBus common for gfac
         this.zk = zooKeeper;
+       this.cachedThreadPool = Executors.newCachedThreadPool();
     }
 
     public static void startStatusUpdators(Registry registry, ZooKeeper zk, MonitorPublisher publisher) {
@@ -1176,4 +1181,8 @@ public class BetterGfacImpl implements GFac,Watcher {
             this.cancelled = true;
         }
     }
+
+    public static ExecutorService getCachedThreadPool(){
+        return cachedThreadPool;
+    }
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/5996b5cc/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java
index a1856e6..b7479d0 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java
@@ -82,9 +82,13 @@ public class GfacInternalStatusUpdator implements AbstractActivityListener, Watc
         }
         switch (statusChangeRequest.getState()) {
             case COMPLETED:
+                logger.info("Experiment Completed, So removing the ZK entry for the experiment" + monitorID.getExperimentID());
+                logger.info("Zookeeper experiment Path: " + experimentPath);
                 ZKUtil.deleteRecursive(zk, experimentPath);
                 break;
             case FAILED:
+                logger.info("Experiment Failed, So removing the ZK entry for the experiment" + monitorID.getExperimentID());
+                logger.info("Zookeeper experiment Path: " + experimentPath);
                 ZKUtil.deleteRecursive(zk,experimentPath);
                 break;
             default:

http://git-wip-us.apache.org/repos/asf/airavata/blob/5996b5cc/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/OutHandlerWorker.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/OutHandlerWorker.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/OutHandlerWorker.java
new file mode 100644
index 0000000..64c7899
--- /dev/null
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/OutHandlerWorker.java
@@ -0,0 +1,60 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.core.utils;
+
+import org.apache.airavata.common.utils.MonitorPublisher;
+import org.apache.airavata.gfac.GFacException;
+import org.apache.airavata.gfac.core.cpi.GFac;
+import org.apache.airavata.gfac.core.monitor.MonitorID;
+import org.apache.airavata.gfac.core.monitor.TaskIdentity;
+import org.apache.airavata.gfac.core.monitor.state.TaskStatusChangeRequest;
+import org.apache.airavata.model.workspace.experiment.TaskState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class OutHandlerWorker implements Runnable {
+    private final static Logger logger = LoggerFactory.getLogger(OutHandlerWorker.class);
+
+    private GFac gfac;
+
+    private MonitorID monitorID;
+
+    private MonitorPublisher monitorPublisher;
+
+    public OutHandlerWorker(GFac gfac, MonitorID monitorID,MonitorPublisher monitorPublisher) {
+        this.gfac = gfac;
+        this.monitorID = monitorID;
+        this.monitorPublisher = monitorPublisher;
+    }
+
+    @Override
+    public void run() {
+        try {
+            gfac.invokeOutFlowHandlers(monitorID.getJobExecutionContext());
+        } catch (GFacException e) {
+            monitorPublisher.publish(new TaskStatusChangeRequest(new TaskIdentity(monitorID.getExperimentID(), monitorID.getWorkflowNodeID(),
+                    monitorID.getTaskID()), TaskState.FAILED));
+            //FIXME this is a case where the output retrieving fails even if the job execution was a success. Thus updating the task status
+            logger.info(e.getLocalizedMessage(), e);
+        }
+        monitorPublisher.publish(monitorID.getStatus());
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/5996b5cc/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
index 2275fb2..58b48ef 100644
--- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
+++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
@@ -25,11 +25,13 @@ import org.apache.airavata.common.utils.MonitorPublisher;
 import org.apache.airavata.common.utils.ServerSettings;
 import org.apache.airavata.commons.gfac.type.HostDescription;
 import org.apache.airavata.gfac.GFacException;
+import org.apache.airavata.gfac.core.cpi.BetterGfacImpl;
 import org.apache.airavata.gfac.core.cpi.GFac;
 import org.apache.airavata.gfac.core.monitor.MonitorID;
 import org.apache.airavata.gfac.core.monitor.TaskIdentity;
 import org.apache.airavata.gfac.core.monitor.state.JobStatusChangeRequest;
 import org.apache.airavata.gfac.core.monitor.state.TaskStatusChangeRequest;
+import org.apache.airavata.gfac.core.utils.OutHandlerWorker;
 import org.apache.airavata.gfac.monitor.HostMonitorData;
 import org.apache.airavata.gfac.monitor.UserMonitorData;
 import org.apache.airavata.gfac.monitor.core.PullMonitor;
@@ -47,12 +49,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.sql.Timestamp;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -212,6 +209,7 @@ public class HPCPullMonitor extends PullMonitor {
                             // lead to a memory leak
                             iterator.remove();
                         }
+                        iterator = completedJobsFromPush.listIterator();
                     }
                     Map<String, JobState> jobStatuses = connection.getJobStatuses(monitorID);
                     for (MonitorID iMonitorID : monitorID) {
@@ -232,20 +230,7 @@ public class HPCPullMonitor extends PullMonitor {
                                 completedJobs.add(iMonitorID);
                                 // we run all the finished jobs in separate threads, because each job doesn't have to wait until
                                 // each one finish transfering files
-                                    final MonitorID tMonitorID = iMonitorID;
-                                    (new Thread() {
-                                        @Override
-                                        public void run() {
-                                            try {
-                                                gfac.invokeOutFlowHandlers(tMonitorID.getJobExecutionContext());
-                                            } catch (GFacException e) {
-                                                publisher.publish(new TaskStatusChangeRequest(new TaskIdentity(tMonitorID.getExperimentID(), tMonitorID.getWorkflowNodeID(),
-                                                        tMonitorID.getTaskID()), TaskState.FAILED));
-                                                //FIXME this is a case where the output retrieving fails even if the job execution was a success. Thus updating the task status
-                                                logger.info(e.getLocalizedMessage(), e);
-                                            }
-                                        }
-                                    }).start();
+                                BetterGfacImpl.getCachedThreadPool().submit(new OutHandlerWorker(gfac, iMonitorID, publisher));
                             } else if (iMonitorID.getFailedCount() > FAILED_COUNT) {
                                 logger.error("Tried to monitor the job with ID " + iMonitorID.getJobID() + " But failed" +iMonitorID.getFailedCount()+
                                         " 3 times, so skip this Job from Monitor");