You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by la...@apache.org on 2014/09/22 16:54:48 UTC
[1/2] git commit: reverting CreateLaunchExperiment
Repository: airavata
Updated Branches:
refs/heads/master 450d249d6 -> 5996b5cc6
reverting CreateLaunchExperiment
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/e5c4d33d
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/e5c4d33d
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/e5c4d33d
Branch: refs/heads/master
Commit: e5c4d33d140463549187f16543676c923a9102f8
Parents: 450d249
Author: lahiru <la...@apache.org>
Authored: Fri Sep 19 12:46:47 2014 -0400
Committer: lahiru <la...@apache.org>
Committed: Fri Sep 19 12:46:47 2014 -0400
----------------------------------------------------------------------
.../client/samples/CreateLaunchExperiment.java | 258 ++++++++++---------
1 file changed, 132 insertions(+), 126 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/e5c4d33d/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java b/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
index 4dcfe7f..b0f7582 100644
--- a/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
+++ b/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
@@ -34,14 +34,15 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
+import java.util.Date;
import java.util.List;
import java.util.Map;
public class CreateLaunchExperiment {
//FIXME: Read from a config file
- public static final String THRIFT_SERVER_HOST = "localhost";
- public static final int THRIFT_SERVER_PORT = 8930;
+ public static final String THRIFT_SERVER_HOST = "149.165.228.109";
+ public static final int THRIFT_SERVER_PORT = 9930;
private final static Logger logger = LoggerFactory.getLogger(CreateLaunchExperiment.class);
private static final String DEFAULT_USER = "default.registry.user";
private static final String DEFAULT_GATEWAY = "default.registry.gateway";
@@ -50,7 +51,7 @@ public class CreateLaunchExperiment {
private static String wrfAppId = "WRF_5f097c9c-7066-49ec-aed7-4e39607b3adc";
private static String amberAppId = "Amber_89906be6-5678-49a6-9d04-a0604fbdef2e";
- private static String localHost = "localhost";
+ private static String localHost = "149.165.228.109";
private static String trestlesHostName = "trestles.sdsc.xsede.org";
private static String stampedeHostName = "stampede.tacc.xsede.org";
private static String br2HostName = "bigred2.uits.iu.edu";
@@ -60,21 +61,26 @@ public class CreateLaunchExperiment {
airavataClient = AiravataClientFactory.createAiravataClient(THRIFT_SERVER_HOST, THRIFT_SERVER_PORT);
System.out.println("API version is " + airavataClient.getAPIVersion());
// registerApplications();
-//// final String expId = createExperimentForSSHHost(airavata);
-// final String expId = createEchoExperimentForTrestles(airavataClient);
-// final String expId = createEchoExperimentForStampede(airavataClient);
-// final String expId = createExperimentEchoForLocalHost(airavataClient);
-// final String expId = createExperimentWRFTrestles(airavataClient);
- final String expId = createExperimentForBR2(airavataClient);
-// final String expId = createExperimentForBR2Amber(airavataClient);
-// final String expId = createExperimentWRFStampede(airavataClient);
-// final String expId = createExperimentForStampedeAmber(airavataClient);
-// final String expId = createExperimentForTrestlesAmber(airavataClient);
-
-// System.out.println("Experiment ID : " + expId);
-// updateExperiment(airavata, expId);
- launchExperiment(airavataClient, expId);
- System.out.println(expId);
+ Date date = new Date();
+ long time = date.getTime();
+ int numberOfRequests = 1;
+ for (int i = 0; i < numberOfRequests; i++) {
+// (new Thread() {
+// @Override
+// public void run() {
+ try {
+ launchExperiment(airavataClient, createExperimentForBR2(airavataClient));
+ } catch (Exception e) {
+ logger.error("Error while connecting with server", e.getMessage());
+ e.printStackTrace();
+ }
+ }
+ long time1 = (date.getTime()-time);
+ System.out.println("Number of requests: " + numberOfRequests + " time taken Miliseconds: " + time1);
+
+// }).start();
+// }
+
} catch (Exception e) {
logger.error("Error while connecting with server", e.getMessage());
e.printStackTrace();
@@ -124,10 +130,10 @@ public class CreateLaunchExperiment {
simpleExperiment.setExperimentOutputs(exOut);
Map<String, String> computeResources = airavataClient.getAvailableAppInterfaceComputeResources(echoAppId);
- if (computeResources != null && computeResources.size() != 0){
- for (String id : computeResources.keySet()){
+ if (computeResources != null && computeResources.size() != 0) {
+ for (String id : computeResources.keySet()) {
String resourceName = computeResources.get(id);
- if (resourceName.equals(trestlesHostName)){
+ if (resourceName.equals(trestlesHostName)) {
ComputationalResourceScheduling scheduling = ExperimentModelUtil.createComputationResourceScheduling(id, 1, 1, 1, "normal", 1, 0, 1, "sds128");
UserConfigurationData userConfigurationData = new UserConfigurationData();
userConfigurationData.setAiravataAutoSchedule(false);
@@ -198,10 +204,10 @@ public class CreateLaunchExperiment {
simpleExperiment.setExperimentOutputs(exOut);
Map<String, String> computeResources = airavataClient.getAvailableAppInterfaceComputeResources(wrfAppId);
- if (computeResources != null && computeResources.size() != 0){
- for (String id : computeResources.keySet()){
+ if (computeResources != null && computeResources.size() != 0) {
+ for (String id : computeResources.keySet()) {
String resourceName = computeResources.get(id);
- if (resourceName.equals(stampedeHostName)){
+ if (resourceName.equals(stampedeHostName)) {
ComputationalResourceScheduling scheduling = ExperimentModelUtil.createComputationResourceScheduling(id, 2, 32, 1, "development", 90, 0, 1, "TG-STA110014S");
UserConfigurationData userConfigurationData = new UserConfigurationData();
userConfigurationData.setAiravataAutoSchedule(false);
@@ -272,10 +278,10 @@ public class CreateLaunchExperiment {
simpleExperiment.setExperimentOutputs(exOut);
Map<String, String> computeResources = airavataClient.getAvailableAppInterfaceComputeResources(wrfAppId);
- if (computeResources != null && computeResources.size() != 0){
- for (String id : computeResources.keySet()){
+ if (computeResources != null && computeResources.size() != 0) {
+ for (String id : computeResources.keySet()) {
String resourceName = computeResources.get(id);
- if (resourceName.equals(trestlesHostName)){
+ if (resourceName.equals(trestlesHostName)) {
ComputationalResourceScheduling scheduling = ExperimentModelUtil.createComputationResourceScheduling(id, 1, 1, 1, "normal", 1, 0, 1, "sds128");
UserConfigurationData userConfigurationData = new UserConfigurationData();
userConfigurationData.setAiravataAutoSchedule(false);
@@ -348,10 +354,10 @@ public class CreateLaunchExperiment {
simpleExperiment.setExperimentOutputs(exOut);
Map<String, String> computeResources = airavataClient.getAvailableAppInterfaceComputeResources(echoAppId);
- if (computeResources != null && computeResources.size() != 0){
- for (String id : computeResources.keySet()){
+ if (computeResources != null && computeResources.size() != 0) {
+ for (String id : computeResources.keySet()) {
String resourceName = computeResources.get(id);
- if (resourceName.equals(localHost)){
+ if (resourceName.equals(localHost)) {
ComputationalResourceScheduling scheduling = ExperimentModelUtil.createComputationResourceScheduling(id, 1, 1, 1, "normal", 1, 0, 1, "");
UserConfigurationData userConfigurationData = new UserConfigurationData();
userConfigurationData.setAiravataAutoSchedule(false);
@@ -448,10 +454,10 @@ public class CreateLaunchExperiment {
simpleExperiment.setExperimentOutputs(exOut);
Map<String, String> computeResources = airavataClient.getAvailableAppInterfaceComputeResources(echoAppId);
- if (computeResources != null && computeResources.size() != 0){
- for (String id : computeResources.keySet()){
+ if (computeResources != null && computeResources.size() != 0) {
+ for (String id : computeResources.keySet()) {
String resourceName = computeResources.get(id);
- if (resourceName.equals(stampedeHostName)){
+ if (resourceName.equals(stampedeHostName)) {
ComputationalResourceScheduling scheduling = ExperimentModelUtil.createComputationResourceScheduling(id, 1, 1, 1, "normal", 1, 0, 1, "TG-STA110014S");
UserConfigurationData userConfigurationData = new UserConfigurationData();
userConfigurationData.setAiravataAutoSchedule(false);
@@ -572,11 +578,11 @@ public class CreateLaunchExperiment {
simpleExperiment.setExperimentOutputs(exOut);
Map<String, String> computeResources = airavataClient.getAvailableAppInterfaceComputeResources(echoAppId);
- if (computeResources != null && computeResources.size() != 0){
- for (String id : computeResources.keySet()){
+ if (computeResources != null && computeResources.size() != 0) {
+ for (String id : computeResources.keySet()) {
String resourceName = computeResources.get(id);
- if (resourceName.equals(br2HostName)){
- ComputationalResourceScheduling scheduling = ExperimentModelUtil.createComputationResourceScheduling(id, 1, 1, 1, "normal", 1, 0, 1, null);
+ if (resourceName.equals(br2HostName)) {
+ ComputationalResourceScheduling scheduling = ExperimentModelUtil.createComputationResourceScheduling(id, 1, 1, 1, "normal", 1, 0, 1, null);
UserConfigurationData userConfigurationData = new UserConfigurationData();
userConfigurationData.setAiravataAutoSchedule(false);
userConfigurationData.setOverrideManualScheduledParams(false);
@@ -655,10 +661,10 @@ public class CreateLaunchExperiment {
Map<String, String> computeResources = airavataClient.getAvailableAppInterfaceComputeResources(amberAppId);
- if (computeResources != null && computeResources.size() != 0){
- for (String id : computeResources.keySet()){
+ if (computeResources != null && computeResources.size() != 0) {
+ for (String id : computeResources.keySet()) {
String resourceName = computeResources.get(id);
- if (resourceName.equals(br2HostName)){
+ if (resourceName.equals(br2HostName)) {
ComputationalResourceScheduling scheduling = ExperimentModelUtil.createComputationResourceScheduling(id, 4, 1, 1, "cpu", 20, 0, 1, null);
UserConfigurationData userConfigurationData = new UserConfigurationData();
userConfigurationData.setAiravataAutoSchedule(false);
@@ -737,10 +743,10 @@ public class CreateLaunchExperiment {
simpleExperiment.setExperimentOutputs(exOut);
Map<String, String> computeResources = airavataClient.getAvailableAppInterfaceComputeResources(amberAppId);
- if (computeResources != null && computeResources.size() != 0){
- for (String id : computeResources.keySet()){
+ if (computeResources != null && computeResources.size() != 0) {
+ for (String id : computeResources.keySet()) {
String resourceName = computeResources.get(id);
- if (resourceName.equals(stampedeHostName)){
+ if (resourceName.equals(stampedeHostName)) {
ComputationalResourceScheduling scheduling = ExperimentModelUtil.createComputationResourceScheduling(id, 4, 1, 1, "development", 20, 0, 1, null);
UserConfigurationData userConfigurationData = new UserConfigurationData();
userConfigurationData.setAiravataAutoSchedule(false);
@@ -768,86 +774,86 @@ public class CreateLaunchExperiment {
}
public static String createExperimentForTrestlesAmber(Airavata.Client client) throws TException {
- try {
- List<DataObjectType> exInputs = new ArrayList<DataObjectType>();
- DataObjectType input = new DataObjectType();
- input.setKey("Heat_Restart_File");
- input.setType(DataType.URI);
- input.setValue("/Users/lahirugunathilake/Downloads/02_Heat.rst");
- exInputs.add(input);
-
- DataObjectType input1 = new DataObjectType();
- input1.setKey("Production_Control_File");
- input1.setType(DataType.URI);
- input1.setValue("/Users/lahirugunathilake/Downloads/03_Prod.in");
- exInputs.add(input1);
-
- DataObjectType input2 = new DataObjectType();
- input2.setKey("Production_Control_File");
- input2.setType(DataType.URI);
- input2.setValue("/Users/lahirugunathilake/Downloads/prmtop");
- exInputs.add(input2);
-
- List<DataObjectType> exOut = new ArrayList<DataObjectType>();
- DataObjectType output = new DataObjectType();
- output.setKey("AMBER_Execution_Summary");
- output.setType(DataType.URI);
- output.setValue("");
- exOut.add(output);
-
- DataObjectType output1 = new DataObjectType();
- output1.setKey("AMBER_Execution_log");
- output1.setType(DataType.URI);
- output1.setValue("");
- exOut.add(output1);
- DataObjectType output2 = new DataObjectType();
- output2.setKey("AMBER_Trajectory_file");
- output2.setType(DataType.URI);
- output2.setValue("");
- exOut.add(output2);
- DataObjectType output3 = new DataObjectType();
- output3.setKey("AMBER_Restart_file");
- output3.setType(DataType.URI);
- output3.setValue("");
- exOut.add(output3);
-
- Project project = ProjectModelUtil.createProject("default", "admin", "test project");
- String projectId = client.createProject(project);
-
- Experiment simpleExperiment =
- ExperimentModelUtil.createSimpleExperiment(projectId, "admin", "sshEchoExperiment", "SimpleEchoBR", amberAppId, exInputs);
- simpleExperiment.setExperimentOutputs(exOut);
-
- Map<String, String> computeResources = airavataClient.getAvailableAppInterfaceComputeResources(amberAppId);
- if (computeResources != null && computeResources.size() != 0){
- for (String id : computeResources.keySet()){
- String resourceName = computeResources.get(id);
- if (resourceName.equals(trestlesHostName)){
- ComputationalResourceScheduling scheduling = ExperimentModelUtil.createComputationResourceScheduling(id, 4, 1, 1, "normal", 20, 0, 1, null);
- UserConfigurationData userConfigurationData = new UserConfigurationData();
- userConfigurationData.setAiravataAutoSchedule(false);
- userConfigurationData.setOverrideManualScheduledParams(false);
- userConfigurationData.setComputationalResourceScheduling(scheduling);
- simpleExperiment.setUserConfigurationData(userConfigurationData);
- return client.createExperiment(simpleExperiment);
- }
- }
- }
- } catch (AiravataSystemException e) {
- logger.error("Error occured while creating the experiment...", e.getMessage());
- throw new AiravataSystemException(e);
- } catch (InvalidRequestException e) {
- logger.error("Error occured while creating the experiment...", e.getMessage());
- throw new InvalidRequestException(e);
- } catch (AiravataClientException e) {
- logger.error("Error occured while creating the experiment...", e.getMessage());
- throw new AiravataClientException(e);
- } catch (TException e) {
- logger.error("Error occured while creating the experiment...", e.getMessage());
- throw new TException(e);
- }
+ try {
+ List<DataObjectType> exInputs = new ArrayList<DataObjectType>();
+ DataObjectType input = new DataObjectType();
+ input.setKey("Heat_Restart_File");
+ input.setType(DataType.URI);
+ input.setValue("/Users/lahirugunathilake/Downloads/02_Heat.rst");
+ exInputs.add(input);
+
+ DataObjectType input1 = new DataObjectType();
+ input1.setKey("Production_Control_File");
+ input1.setType(DataType.URI);
+ input1.setValue("/Users/lahirugunathilake/Downloads/03_Prod.in");
+ exInputs.add(input1);
+
+ DataObjectType input2 = new DataObjectType();
+ input2.setKey("Production_Control_File");
+ input2.setType(DataType.URI);
+ input2.setValue("/Users/lahirugunathilake/Downloads/prmtop");
+ exInputs.add(input2);
+
+ List<DataObjectType> exOut = new ArrayList<DataObjectType>();
+ DataObjectType output = new DataObjectType();
+ output.setKey("AMBER_Execution_Summary");
+ output.setType(DataType.URI);
+ output.setValue("");
+ exOut.add(output);
+
+ DataObjectType output1 = new DataObjectType();
+ output1.setKey("AMBER_Execution_log");
+ output1.setType(DataType.URI);
+ output1.setValue("");
+ exOut.add(output1);
+ DataObjectType output2 = new DataObjectType();
+ output2.setKey("AMBER_Trajectory_file");
+ output2.setType(DataType.URI);
+ output2.setValue("");
+ exOut.add(output2);
+ DataObjectType output3 = new DataObjectType();
+ output3.setKey("AMBER_Restart_file");
+ output3.setType(DataType.URI);
+ output3.setValue("");
+ exOut.add(output3);
+
+ Project project = ProjectModelUtil.createProject("default", "admin", "test project");
+ String projectId = client.createProject(project);
+
+ Experiment simpleExperiment =
+ ExperimentModelUtil.createSimpleExperiment(projectId, "admin", "sshEchoExperiment", "SimpleEchoBR", amberAppId, exInputs);
+ simpleExperiment.setExperimentOutputs(exOut);
+
+ Map<String, String> computeResources = airavataClient.getAvailableAppInterfaceComputeResources(amberAppId);
+ if (computeResources != null && computeResources.size() != 0) {
+ for (String id : computeResources.keySet()) {
+ String resourceName = computeResources.get(id);
+ if (resourceName.equals(trestlesHostName)) {
+ ComputationalResourceScheduling scheduling = ExperimentModelUtil.createComputationResourceScheduling(id, 4, 1, 1, "normal", 20, 0, 1, null);
+ UserConfigurationData userConfigurationData = new UserConfigurationData();
+ userConfigurationData.setAiravataAutoSchedule(false);
+ userConfigurationData.setOverrideManualScheduledParams(false);
+ userConfigurationData.setComputationalResourceScheduling(scheduling);
+ simpleExperiment.setUserConfigurationData(userConfigurationData);
+ return client.createExperiment(simpleExperiment);
+ }
+ }
+ }
+ } catch (AiravataSystemException e) {
+ logger.error("Error occured while creating the experiment...", e.getMessage());
+ throw new AiravataSystemException(e);
+ } catch (InvalidRequestException e) {
+ logger.error("Error occured while creating the experiment...", e.getMessage());
+ throw new InvalidRequestException(e);
+ } catch (AiravataClientException e) {
+ logger.error("Error occured while creating the experiment...", e.getMessage());
+ throw new AiravataClientException(e);
+ } catch (TException e) {
+ logger.error("Error occured while creating the experiment...", e.getMessage());
+ throw new TException(e);
+ }
return null;
- }
+ }
public static void launchExperiment(Airavata.Client client, String expId)
throws TException {
@@ -979,12 +985,12 @@ public class CreateLaunchExperiment {
return null;
}
- public static void getExperiment (Airavata.Client client, String expId) throws Exception{
- try{
+ public static void getExperiment(Airavata.Client client, String expId) throws Exception {
+ try {
Experiment experiment = client.getExperiment(expId);
List<ErrorDetails> errors = experiment.getErrors();
- if (errors != null && !errors.isEmpty()){
- for (ErrorDetails error : errors){
+ if (errors != null && !errors.isEmpty()) {
+ for (ErrorDetails error : errors) {
System.out.println("ERROR MESSAGE : " + error.getActualErrorMessage());
}
}
[2/2] git commit: Adding threadpool for running output handlers- AIRAVATA-1450
Posted by la...@apache.org.
Adding threadpool for running output handlers- AIRAVATA-1450
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/5996b5cc
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/5996b5cc
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/5996b5cc
Branch: refs/heads/master
Commit: 5996b5cc6619ecc7f7d1b35455f5be7d56a4bed8
Parents: e5c4d33
Author: lahiru <la...@apache.org>
Authored: Mon Sep 22 10:54:03 2014 -0400
Committer: lahiru <la...@apache.org>
Committed: Mon Sep 22 10:54:03 2014 -0400
----------------------------------------------------------------------
.../airavata/gfac/core/cpi/BetterGfacImpl.java | 9 +++
.../core/monitor/GfacInternalStatusUpdator.java | 4 ++
.../gfac/core/utils/OutHandlerWorker.java | 60 ++++++++++++++++++++
.../monitor/impl/pull/qstat/HPCPullMonitor.java | 25 ++------
4 files changed, 78 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/5996b5cc/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
index fe498ab..beaa124 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
@@ -26,6 +26,8 @@ import java.net.URL;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import javax.xml.parsers.ParserConfigurationException;
import javax.xml.xpath.XPathExpressionException;
@@ -130,6 +132,8 @@ public class BetterGfacImpl implements GFac,Watcher {
private boolean cancelled = false;
+ private static ExecutorService cachedThreadPool;
+
/**
* Constructor for GFac
*
@@ -143,6 +147,7 @@ public class BetterGfacImpl implements GFac,Watcher {
// this.airavataRegistry2 = airavataRegistry2;
monitorPublisher = publisher; // This is a EventBus common for gfac
this.zk = zooKeeper;
+ this.cachedThreadPool = Executors.newCachedThreadPool();
}
public static void startStatusUpdators(Registry registry, ZooKeeper zk, MonitorPublisher publisher) {
@@ -1176,4 +1181,8 @@ public class BetterGfacImpl implements GFac,Watcher {
this.cancelled = true;
}
}
+
+ public static ExecutorService getCachedThreadPool(){
+ return cachedThreadPool;
+ }
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/5996b5cc/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java
index a1856e6..b7479d0 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java
@@ -82,9 +82,13 @@ public class GfacInternalStatusUpdator implements AbstractActivityListener, Watc
}
switch (statusChangeRequest.getState()) {
case COMPLETED:
+ logger.info("Experiment Completed, So removing the ZK entry for the experiment" + monitorID.getExperimentID());
+ logger.info("Zookeeper experiment Path: " + experimentPath);
ZKUtil.deleteRecursive(zk, experimentPath);
break;
case FAILED:
+ logger.info("Experiment Failed, So removing the ZK entry for the experiment" + monitorID.getExperimentID());
+ logger.info("Zookeeper experiment Path: " + experimentPath);
ZKUtil.deleteRecursive(zk,experimentPath);
break;
default:
http://git-wip-us.apache.org/repos/asf/airavata/blob/5996b5cc/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/OutHandlerWorker.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/OutHandlerWorker.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/OutHandlerWorker.java
new file mode 100644
index 0000000..64c7899
--- /dev/null
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/OutHandlerWorker.java
@@ -0,0 +1,60 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.core.utils;
+
+import org.apache.airavata.common.utils.MonitorPublisher;
+import org.apache.airavata.gfac.GFacException;
+import org.apache.airavata.gfac.core.cpi.GFac;
+import org.apache.airavata.gfac.core.monitor.MonitorID;
+import org.apache.airavata.gfac.core.monitor.TaskIdentity;
+import org.apache.airavata.gfac.core.monitor.state.TaskStatusChangeRequest;
+import org.apache.airavata.model.workspace.experiment.TaskState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class OutHandlerWorker implements Runnable {
+ private final static Logger logger = LoggerFactory.getLogger(OutHandlerWorker.class);
+
+ private GFac gfac;
+
+ private MonitorID monitorID;
+
+ private MonitorPublisher monitorPublisher;
+
+ public OutHandlerWorker(GFac gfac, MonitorID monitorID,MonitorPublisher monitorPublisher) {
+ this.gfac = gfac;
+ this.monitorID = monitorID;
+ this.monitorPublisher = monitorPublisher;
+ }
+
+ @Override
+ public void run() {
+ try {
+ gfac.invokeOutFlowHandlers(monitorID.getJobExecutionContext());
+ } catch (GFacException e) {
+ monitorPublisher.publish(new TaskStatusChangeRequest(new TaskIdentity(monitorID.getExperimentID(), monitorID.getWorkflowNodeID(),
+ monitorID.getTaskID()), TaskState.FAILED));
+ //FIXME this is a case where the output retrieving fails even if the job execution was a success. Thus updating the task status
+ logger.info(e.getLocalizedMessage(), e);
+ }
+ monitorPublisher.publish(monitorID.getStatus());
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/5996b5cc/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
index 2275fb2..58b48ef 100644
--- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
+++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
@@ -25,11 +25,13 @@ import org.apache.airavata.common.utils.MonitorPublisher;
import org.apache.airavata.common.utils.ServerSettings;
import org.apache.airavata.commons.gfac.type.HostDescription;
import org.apache.airavata.gfac.GFacException;
+import org.apache.airavata.gfac.core.cpi.BetterGfacImpl;
import org.apache.airavata.gfac.core.cpi.GFac;
import org.apache.airavata.gfac.core.monitor.MonitorID;
import org.apache.airavata.gfac.core.monitor.TaskIdentity;
import org.apache.airavata.gfac.core.monitor.state.JobStatusChangeRequest;
import org.apache.airavata.gfac.core.monitor.state.TaskStatusChangeRequest;
+import org.apache.airavata.gfac.core.utils.OutHandlerWorker;
import org.apache.airavata.gfac.monitor.HostMonitorData;
import org.apache.airavata.gfac.monitor.UserMonitorData;
import org.apache.airavata.gfac.monitor.core.PullMonitor;
@@ -47,12 +49,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Timestamp;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.LinkedBlockingQueue;
@@ -212,6 +209,7 @@ public class HPCPullMonitor extends PullMonitor {
// lead to a memory leak
iterator.remove();
}
+ iterator = completedJobsFromPush.listIterator();
}
Map<String, JobState> jobStatuses = connection.getJobStatuses(monitorID);
for (MonitorID iMonitorID : monitorID) {
@@ -232,20 +230,7 @@ public class HPCPullMonitor extends PullMonitor {
completedJobs.add(iMonitorID);
// we run all the finished jobs in separate threads, because each job doesn't have to wait until
// each one finish transfering files
- final MonitorID tMonitorID = iMonitorID;
- (new Thread() {
- @Override
- public void run() {
- try {
- gfac.invokeOutFlowHandlers(tMonitorID.getJobExecutionContext());
- } catch (GFacException e) {
- publisher.publish(new TaskStatusChangeRequest(new TaskIdentity(tMonitorID.getExperimentID(), tMonitorID.getWorkflowNodeID(),
- tMonitorID.getTaskID()), TaskState.FAILED));
- //FIXME this is a case where the output retrieving fails even if the job execution was a success. Thus updating the task status
- logger.info(e.getLocalizedMessage(), e);
- }
- }
- }).start();
+ BetterGfacImpl.getCachedThreadPool().submit(new OutHandlerWorker(gfac, iMonitorID, publisher));
} else if (iMonitorID.getFailedCount() > FAILED_COUNT) {
logger.error("Tried to monitor the job with ID " + iMonitorID.getJobID() + " But failed" +iMonitorID.getFailedCount()+
" 3 times, so skip this Job from Monitor");