You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by la...@apache.org on 2014/09/22 16:54:49 UTC
[2/2] git commit: Adding threadpool for running output handlers-
AIRAVATA-1450
Adding threadpool for running output handlers- AIRAVATA-1450
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/5996b5cc
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/5996b5cc
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/5996b5cc
Branch: refs/heads/master
Commit: 5996b5cc6619ecc7f7d1b35455f5be7d56a4bed8
Parents: e5c4d33
Author: lahiru <la...@apache.org>
Authored: Mon Sep 22 10:54:03 2014 -0400
Committer: lahiru <la...@apache.org>
Committed: Mon Sep 22 10:54:03 2014 -0400
----------------------------------------------------------------------
.../airavata/gfac/core/cpi/BetterGfacImpl.java | 9 +++
.../core/monitor/GfacInternalStatusUpdator.java | 4 ++
.../gfac/core/utils/OutHandlerWorker.java | 60 ++++++++++++++++++++
.../monitor/impl/pull/qstat/HPCPullMonitor.java | 25 ++------
4 files changed, 78 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/5996b5cc/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
index fe498ab..beaa124 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
@@ -26,6 +26,8 @@ import java.net.URL;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import javax.xml.parsers.ParserConfigurationException;
import javax.xml.xpath.XPathExpressionException;
@@ -130,6 +132,8 @@ public class BetterGfacImpl implements GFac,Watcher {
private boolean cancelled = false;
+ private static ExecutorService cachedThreadPool;
+
/**
* Constructor for GFac
*
@@ -143,6 +147,7 @@ public class BetterGfacImpl implements GFac,Watcher {
// this.airavataRegistry2 = airavataRegistry2;
monitorPublisher = publisher; // This is a EventBus common for gfac
this.zk = zooKeeper;
+ this.cachedThreadPool = Executors.newCachedThreadPool();
}
public static void startStatusUpdators(Registry registry, ZooKeeper zk, MonitorPublisher publisher) {
@@ -1176,4 +1181,8 @@ public class BetterGfacImpl implements GFac,Watcher {
this.cancelled = true;
}
}
+
+ public static ExecutorService getCachedThreadPool(){
+ return cachedThreadPool;
+ }
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/5996b5cc/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java
index a1856e6..b7479d0 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java
@@ -82,9 +82,13 @@ public class GfacInternalStatusUpdator implements AbstractActivityListener, Watc
}
switch (statusChangeRequest.getState()) {
case COMPLETED:
+ logger.info("Experiment Completed, So removing the ZK entry for the experiment" + monitorID.getExperimentID());
+ logger.info("Zookeeper experiment Path: " + experimentPath);
ZKUtil.deleteRecursive(zk, experimentPath);
break;
case FAILED:
+ logger.info("Experiment Failed, So removing the ZK entry for the experiment" + monitorID.getExperimentID());
+ logger.info("Zookeeper experiment Path: " + experimentPath);
ZKUtil.deleteRecursive(zk,experimentPath);
break;
default:
http://git-wip-us.apache.org/repos/asf/airavata/blob/5996b5cc/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/OutHandlerWorker.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/OutHandlerWorker.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/OutHandlerWorker.java
new file mode 100644
index 0000000..64c7899
--- /dev/null
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/OutHandlerWorker.java
@@ -0,0 +1,60 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.core.utils;
+
+import org.apache.airavata.common.utils.MonitorPublisher;
+import org.apache.airavata.gfac.GFacException;
+import org.apache.airavata.gfac.core.cpi.GFac;
+import org.apache.airavata.gfac.core.monitor.MonitorID;
+import org.apache.airavata.gfac.core.monitor.TaskIdentity;
+import org.apache.airavata.gfac.core.monitor.state.TaskStatusChangeRequest;
+import org.apache.airavata.model.workspace.experiment.TaskState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Runnable that invokes the out-flow handlers for a single monitored job.
+ *
+ * <p>If the handlers throw a {@link GFacException}, a {@link TaskStatusChangeRequest}
+ * with {@link TaskState#FAILED} is published for the job's task. In both the success
+ * and the {@code GFacException} case the status held by the {@link MonitorID} is
+ * published afterwards.
+ *
+ * <p>Instances are handed to an executor (see the {@code submit} call in
+ * {@code HPCPullMonitor}), so nothing outside this class observes exceptions
+ * thrown from {@link #run()}; they are therefore logged here.
+ */
+public class OutHandlerWorker implements Runnable {
+    private final static Logger logger = LoggerFactory.getLogger(OutHandlerWorker.class);
+
+    /** GFac instance whose out-flow handlers are invoked. */
+    private final GFac gfac;
+
+    /** Identifies the job; supplies the job execution context, the IDs and the status to publish. */
+    private final MonitorID monitorID;
+
+    /** Publisher used for the task-failure request and the final status. */
+    private final MonitorPublisher monitorPublisher;
+
+    /**
+     * @param gfac             GFac instance used to invoke the out-flow handlers
+     * @param monitorID        monitor entry of the job whose outputs are to be handled
+     * @param monitorPublisher publisher that receives the status notifications
+     */
+    public OutHandlerWorker(GFac gfac, MonitorID monitorID,MonitorPublisher monitorPublisher) {
+        this.gfac = gfac;
+        this.monitorID = monitorID;
+        this.monitorPublisher = monitorPublisher;
+    }
+
+    /**
+     * Invokes the out-flow handlers and publishes the resulting status.
+     *
+     * <p>Unchecked exceptions are logged and rethrown; in that case the final
+     * status is not published (same as before this logging was added).
+     */
+    @Override
+    public void run() {
+        try {
+            gfac.invokeOutFlowHandlers(monitorID.getJobExecutionContext());
+        } catch (GFacException e) {
+            monitorPublisher.publish(new TaskStatusChangeRequest(new TaskIdentity(monitorID.getExperimentID(), monitorID.getWorkflowNodeID(),
+                    monitorID.getTaskID()), TaskState.FAILED));
+            //FIXME this is a case where the output retrieving fails even if the job execution was a success. Thus updating the task status
+            // Logged at ERROR: this path marks the task as FAILED, so it must not be hidden at INFO level.
+            logger.error(e.getLocalizedMessage(), e);
+        } catch (RuntimeException e) {
+            // ExecutorService.submit() stores an uncaught exception in the returned Future, which the
+            // submitting code discards; without this log the failure would disappear silently.
+            logger.error("Unexpected error while invoking out flow handlers for experiment " + monitorID.getExperimentID(), e);
+            throw e;
+        }
+        // NOTE(review): this is also published after a GFacException, i.e. after the FAILED request above;
+        // confirm that subscribers expect both events in that order.
+        monitorPublisher.publish(monitorID.getStatus());
+    }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/5996b5cc/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
index 2275fb2..58b48ef 100644
--- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
+++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
@@ -25,11 +25,13 @@ import org.apache.airavata.common.utils.MonitorPublisher;
import org.apache.airavata.common.utils.ServerSettings;
import org.apache.airavata.commons.gfac.type.HostDescription;
import org.apache.airavata.gfac.GFacException;
+import org.apache.airavata.gfac.core.cpi.BetterGfacImpl;
import org.apache.airavata.gfac.core.cpi.GFac;
import org.apache.airavata.gfac.core.monitor.MonitorID;
import org.apache.airavata.gfac.core.monitor.TaskIdentity;
import org.apache.airavata.gfac.core.monitor.state.JobStatusChangeRequest;
import org.apache.airavata.gfac.core.monitor.state.TaskStatusChangeRequest;
+import org.apache.airavata.gfac.core.utils.OutHandlerWorker;
import org.apache.airavata.gfac.monitor.HostMonitorData;
import org.apache.airavata.gfac.monitor.UserMonitorData;
import org.apache.airavata.gfac.monitor.core.PullMonitor;
@@ -47,12 +49,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Timestamp;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.LinkedBlockingQueue;
@@ -212,6 +209,7 @@ public class HPCPullMonitor extends PullMonitor {
// lead to a memory leak
iterator.remove();
}
+ iterator = completedJobsFromPush.listIterator();
}
Map<String, JobState> jobStatuses = connection.getJobStatuses(monitorID);
for (MonitorID iMonitorID : monitorID) {
@@ -232,20 +230,7 @@ public class HPCPullMonitor extends PullMonitor {
completedJobs.add(iMonitorID);
// we run all the finished jobs in separate threads, because each job doesn't have to wait until
// each one finish transfering files
- final MonitorID tMonitorID = iMonitorID;
- (new Thread() {
- @Override
- public void run() {
- try {
- gfac.invokeOutFlowHandlers(tMonitorID.getJobExecutionContext());
- } catch (GFacException e) {
- publisher.publish(new TaskStatusChangeRequest(new TaskIdentity(tMonitorID.getExperimentID(), tMonitorID.getWorkflowNodeID(),
- tMonitorID.getTaskID()), TaskState.FAILED));
- //FIXME this is a case where the output retrieving fails even if the job execution was a success. Thus updating the task status
- logger.info(e.getLocalizedMessage(), e);
- }
- }
- }).start();
+ BetterGfacImpl.getCachedThreadPool().submit(new OutHandlerWorker(gfac, iMonitorID, publisher));
} else if (iMonitorID.getFailedCount() > FAILED_COUNT) {
logger.error("Tried to monitor the job with ID " + iMonitorID.getJobID() + " But failed" +iMonitorID.getFailedCount()+
" 3 times, so skip this Job from Monitor");