You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by la...@apache.org on 2015/04/14 06:20:35 UTC
airavata git commit: fixing issue with threadpool
Repository: airavata
Updated Branches:
refs/heads/master a773a2858 -> 7f89109fe
fixing issue with threadpool
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/7f89109f
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/7f89109f
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/7f89109f
Branch: refs/heads/master
Commit: 7f89109fe22680f14901139ce4056458b2665ff2
Parents: a773a28
Author: Lahiru Gunathilake <gl...@gmail.com>
Authored: Tue Apr 14 00:20:33 2015 -0400
Committer: Lahiru Gunathilake <gl...@gmail.com>
Committed: Tue Apr 14 00:20:33 2015 -0400
----------------------------------------------------------------------
.../client/samples/CreateLaunchExperiment.java | 38 +++++++++-----
.../airavata/gfac/server/GfacServerHandler.java | 55 ++++++++++++++++----
.../gfac/core/utils/InputHandlerWorker.java | 13 +++--
.../monitor/impl/pull/qstat/HPCPullMonitor.java | 10 ++--
4 files changed, 82 insertions(+), 34 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/7f89109f/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java b/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
index 9ad71f4..be07975 100644
--- a/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
+++ b/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
@@ -58,10 +58,10 @@ public class CreateLaunchExperiment {
private static final String DEFAULT_GATEWAY = "php_reference_gateway";
private static Airavata.Client airavataClient;
- private static String echoAppId = "Echo_fcac7076-e350-4dfb-a6eb-73e2d648fc60";
+ private static String echoAppId = "Echo_4efc544e-b72b-45d4-9f2d-ea1fae602d02";
private static String mpiAppId = "HelloMPI_bfd56d58-6085-4b7f-89fc-646576830518";
private static String wrfAppId = "WRF_7ad5da38-c08b-417c-a9ea-da9298839762";
- private static String amberAppId = "Amber_717cba99-1085-45de-861c-952001c5243c";
+ private static String amberAppId = "Amber_1e7c89b0-4e03-4f36-8bdd-86c8bb9df8a1";
private static String gromacsAppId = "GROMACS_05622038-9edd-4cb1-824e-0b7cb993364b";
private static String espressoAppId = "ESPRESSO_10cc2820-5d0b-4c63-9546-8a8b595593c1";
private static String lammpsAppId = "LAMMPS_2472685b-8acf-497e-aafe-cc66fe5f4cb6";
@@ -168,12 +168,12 @@ public class CreateLaunchExperiment {
// final String expId = createMPIExperimentForFSD(airavataClient);
// final String expId = createEchoExperimentForStampede(airavataClient);
// final String expId = createEchoExperimentForTrestles(airavataClient);
-// final String expId = createExperimentEchoForLocalHost(airavataClient);
+ final String expId = createExperimentEchoForLocalHost(airavataClient);
// final String expId = createExperimentWRFTrestles(airavataClient);
// final String expId = createExperimentForBR2(airavataClient);
// final String expId = createExperimentForBR2Amber(airavataClient);
// final String expId = createExperimentWRFStampede(airavataClient);
- final String expId = createExperimentForStampedeAmber(airavataClient);
+// final String expId = createExperimentForStampedeAmber(airavataClient);
// String expId = createExperimentForTrestlesAmber(airavataClient);
// final String expId = createExperimentGROMACSStampede(airavataClient);
// final String expId = createExperimentESPRESSOStampede(airavataClient);
@@ -191,10 +191,20 @@ public class CreateLaunchExperiment {
launchExperiment(airavataClient, expId);
}
- Thread.sleep(10000);
- for (String exId : experimentIds) {
- Experiment experiment = airavataClient.getExperiment(exId);
- System.out.println(experiment.getExperimentID() + " " + experiment.getExperimentStatus().getExperimentState().name());
+ boolean allNotFinished = true;
+ while(allNotFinished) {
+ allNotFinished = false;
+ for (String exId : experimentIds) {
+ Experiment experiment = airavataClient.getExperiment(exId);
+ if(!experiment.getExperimentStatus().getExperimentState().equals(ExperimentState.COMPLETED)&&
+ !experiment.getExperimentStatus().getExperimentState().equals(ExperimentState.FAILED)
+ &&!experiment.getExperimentStatus().getExperimentState().equals(ExperimentState.CANCELED)){
+ allNotFinished = true;
+ }
+ System.out.println(experiment.getExperimentID() + " " + experiment.getExperimentStatus().getExperimentState().name());
+ }
+ System.out.println("----------------------------------------------------");
+ Thread.sleep(10000);
}
@@ -1313,11 +1323,11 @@ public class CreateLaunchExperiment {
// }
for (InputDataObjectType inputDataObjectType : exInputs) {
if (inputDataObjectType.getName().equalsIgnoreCase("Heat_Restart_File")) {
- inputDataObjectType.setValue("/Users/shameera/Projects/scigap/apps/AMBER_FILES/02_Heat.rst");
+ inputDataObjectType.setValue("/Users/lginnali/Downloads/02_Heat.rst");
} else if (inputDataObjectType.getName().equalsIgnoreCase("Production_Control_File")) {
- inputDataObjectType.setValue("/Users/shameera/Projects/scigap/apps/AMBER_FILES/03_Prod.in");
+ inputDataObjectType.setValue("/Users/lginnali/Downloads/03_Prod.in");
} else if (inputDataObjectType.getName().equalsIgnoreCase("Parameter_Topology_File")) {
- inputDataObjectType.setValue("/Users/shameera/Projects/scigap/apps/AMBER_FILES/prmtop");
+ inputDataObjectType.setValue("/Users/lginnali/Downloads/prmtop");
}
}
@@ -1378,11 +1388,11 @@ public class CreateLaunchExperiment {
// }
for (InputDataObjectType inputDataObjectType : exInputs) {
if (inputDataObjectType.getName().equalsIgnoreCase("Heat_Restart_File")) {
- inputDataObjectType.setValue("/Users/shameera/Projects/scigap/apps/AMBER_FILES/02_Heat.rst");
+ inputDataObjectType.setValue("/Users/lginnali/Downloads/02_Heat.rst");
} else if (inputDataObjectType.getName().equalsIgnoreCase("Production_Control_File")) {
- inputDataObjectType.setValue("/Users/shameera/Projects/scigap/apps/AMBER_FILES/03_Prod.in");
+ inputDataObjectType.setValue("/Users/lginnali/Downloads/03_Prod.in");
} else if (inputDataObjectType.getName().equalsIgnoreCase("Parameter_Topology_File")) {
- inputDataObjectType.setValue("/Users/shameera/Projects/scigap/apps/AMBER_FILES/prmtop");
+ inputDataObjectType.setValue("/Users/lginnali/Downloads/prmtop");
}
}
List<OutputDataObjectType> exOut = client.getApplicationOutputs(amberAppId);
http://git-wip-us.apache.org/repos/asf/airavata/blob/7f89109f/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
index 7d5e223..f3e1d91 100644
--- a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
+++ b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
@@ -25,6 +25,7 @@ import edu.uiuc.ncsa.security.delegation.services.Server;
import org.airavata.appcatalog.cpi.AppCatalog;
import org.airavata.appcatalog.cpi.AppCatalogException;
import org.apache.aiaravata.application.catalog.data.impl.AppCatalogFactory;
+import org.apache.airavata.common.exception.AiravataException;
import org.apache.airavata.common.exception.ApplicationSettingsException;
import org.apache.airavata.common.logger.AiravataLogger;
import org.apache.airavata.common.logger.AiravataLoggerFactory;
@@ -68,7 +69,7 @@ import java.util.concurrent.locks.Lock;
public class GfacServerHandler implements GfacService.Iface, Watcher {
private final static AiravataLogger logger = AiravataLoggerFactory.getLogger(GfacServerHandler.class);
- private RabbitMQTaskLaunchConsumer rabbitMQTaskLaunchConsumer;
+ private static RabbitMQTaskLaunchConsumer rabbitMQTaskLaunchConsumer;
private static int requestCount=0;
@@ -143,6 +144,15 @@ public class GfacServerHandler implements GfacService.Iface, Watcher {
}
}
+ public static void main(String[] args) {
+ RabbitMQTaskLaunchConsumer rabbitMQTaskLaunchConsumer = null;
+ try {
+ rabbitMQTaskLaunchConsumer = new RabbitMQTaskLaunchConsumer();
+ rabbitMQTaskLaunchConsumer.listen(new TestHandler());
+ } catch (AiravataException e) {
+ logger.error(e.getMessage(), e);
+ }
+ }
private void storeServerConfig() throws KeeperException, InterruptedException, ApplicationSettingsException {
Stat zkStat = zk.exists(gfacServer, false);
if (zkStat == null) {
@@ -244,13 +254,9 @@ public class GfacServerHandler implements GfacService.Iface, Watcher {
// if( gfac.submitJob(experimentId, taskId, gatewayId)){
logger.debugId(experimentId, "Submitted job to the Gfac Implementation, experiment {}, task {}, gateway " +
"{}", experimentId, taskId, gatewayId);
- try {
- GFacThreadPoolExecutor.getFixedThreadPool().submit(inputHandlerWorker).get();
- } catch (InterruptedException e) {
- logger.error(e.getMessage(), e);
- } catch (ExecutionException e) {
- logger.error(e.getMessage(), e);
- }
+
+ GFacThreadPoolExecutor.getFixedThreadPool().execute(inputHandlerWorker);
+
// we immediately return when we have a threadpool
return true;
}
@@ -309,6 +315,33 @@ public class GfacServerHandler implements GfacService.Iface, Watcher {
}
}
+ private static class TestHandler implements MessageHandler{
+ @Override
+ public Map<String, Object> getProperties() {
+ Map<String, Object> props = new HashMap<String, Object>();
+ ArrayList<String> keys = new ArrayList<String>();
+ keys.add(ServerSettings.getLaunchQueueName());
+ keys.add(ServerSettings.getCancelQueueName());
+ props.put(MessagingConstants.RABBIT_ROUTING_KEY, keys);
+ props.put(MessagingConstants.RABBIT_QUEUE, ServerSettings.getLaunchQueueName());
+ return props;
+ }
+
+ @Override
+ public void onMessage(MessageContext message) {
+ TaskSubmitEvent event = new TaskSubmitEvent();
+ TBase messageEvent = message.getEvent();
+ byte[] bytes = new byte[0];
+ try {
+ bytes = ThriftUtils.serializeThriftObject(messageEvent);
+ ThriftUtils.createThriftFromBytes(bytes, event);
+ System.out.println(event.getExperimentId());
+ } catch (TException e) {
+ logger.error(e.getMessage(), e);
+ }
+ }
+ }
+
private class TaskLaunchMessageHandler implements MessageHandler {
private String experimentNode;
private String nodeName;
@@ -329,6 +362,8 @@ public class GfacServerHandler implements GfacService.Iface, Watcher {
}
public void onMessage(MessageContext message) {
+ System.out.println(" Message Received with message id '" + message.getMessageId()
+ + "' and with message type '" + message.getType());
if (message.getType().equals(MessageType.LAUNCHTASK)) {
try {
TaskSubmitEvent event = new TaskSubmitEvent();
@@ -339,7 +374,7 @@ public class GfacServerHandler implements GfacService.Iface, Watcher {
try {
GFacUtils.createExperimentEntryForPassive(event.getExperimentId(), event.getTaskId(), zk, experimentNode, nodeName, event.getTokenId(), message.getDeliveryTag());
- AiravataZKUtils.getExpStatePath(event.getExperimentId(),event.getTaskId());
+ AiravataZKUtils.getExpStatePath(event.getExperimentId(), event.getTaskId());
submitJob(event.getExperimentId(), event.getTaskId(), event.getGatewayId());
} catch (KeeperException e) {
logger.error(nodeName + " was interrupted.");
@@ -351,8 +386,6 @@ public class GfacServerHandler implements GfacService.Iface, Watcher {
logger.error(e.getMessage(), e);
rabbitMQTaskLaunchConsumer.sendAck(message.getDeliveryTag());
}
- System.out.println(" Message Received with message id '" + message.getMessageId()
- + "' and with message type '" + message.getType());
} catch (TException e) {
logger.error(e.getMessage(), e); //nobody is listening so nothing to throw
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/7f89109f/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/InputHandlerWorker.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/InputHandlerWorker.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/InputHandlerWorker.java
index 963db7c..ec7991a 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/InputHandlerWorker.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/InputHandlerWorker.java
@@ -20,6 +20,7 @@
*/
package org.apache.airavata.gfac.core.utils;
+import org.apache.airavata.gfac.GFacException;
import org.apache.airavata.gfac.core.context.JobExecutionContext;
import org.apache.airavata.gfac.core.cpi.GFac;
import org.slf4j.Logger;
@@ -27,7 +28,7 @@ import org.slf4j.LoggerFactory;
import java.util.concurrent.Callable;
-public class InputHandlerWorker implements Callable {
+public class InputHandlerWorker implements Runnable {
private static Logger log = LoggerFactory.getLogger(InputHandlerWorker.class);
String experimentId;
@@ -45,9 +46,11 @@ public class InputHandlerWorker implements Callable {
}
@Override
- public Object call() throws Exception {
- boolean b = gfac.submitJob(experimentId, taskId, gatewayId);
- log.info("InHandler and provider Gfac invocation returned: " + b);
- return b;
+ public void run() {
+ try {
+ gfac.submitJob(experimentId, taskId, gatewayId);
+ } catch (GFacException e) {
+ log.error(e.getMessage(), e);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/7f89109f/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java b/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
index ab934ca..26d3385 100644
--- a/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
+++ b/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
@@ -197,7 +197,7 @@ public class HPCPullMonitor extends PullMonitor {
sendNotification(iMonitorID);
logger.info("To avoid timing issues we sleep sometime and try to retrieve output files");
Thread.sleep(10000);
- GFacThreadPoolExecutor.getFixedThreadPool().submit(new OutHandlerWorker(gfac, iMonitorID, publisher));
+ GFacThreadPoolExecutor.getFixedThreadPool().execute(new OutHandlerWorker(gfac, iMonitorID, publisher));
break;
}
}
@@ -223,7 +223,9 @@ public class HPCPullMonitor extends PullMonitor {
iMonitorID.getExperimentID(), iMonitorID.getTaskID(), iMonitorID.getJobName());
sendNotification(iMonitorID);
- GFacThreadPoolExecutor.getFixedThreadPool().submit(new OutHandlerWorker(gfac, iMonitorID, publisher));
+ logger.info("To avoid timing issues we sleep sometime and try to retrieve output files");
+ Thread.sleep(10000);
+ GFacThreadPoolExecutor.getFixedThreadPool().execute(new OutHandlerWorker(gfac, iMonitorID, publisher));
break;
}
}
@@ -248,7 +250,7 @@ public class HPCPullMonitor extends PullMonitor {
removeList.add(iMonitorID);
logger.info("PULL Notification is complete: marking the Job as ************COMPLETE************ experiment {}, task {}, job name {} .",
iMonitorID.getExperimentID(), iMonitorID.getTaskID(), iMonitorID.getJobName());
- GFacThreadPoolExecutor.getFixedThreadPool().submit(new OutHandlerWorker(gfac, iMonitorID, publisher));
+ GFacThreadPoolExecutor.getFixedThreadPool().execute(new OutHandlerWorker(gfac, iMonitorID, publisher));
}
iMonitorID.setStatus(jobStatuses.get(iMonitorID.getJobID() + "," + iMonitorID.getJobName())); //IMPORTANT this is not a simple setter we have a logic
iMonitorID.setLastMonitored(new Timestamp((new Date()).getTime()));
@@ -286,7 +288,7 @@ public class HPCPullMonitor extends PullMonitor {
sendNotification(iMonitorID);
// CommonUtils.removeMonitorFromQueue(take, iMonitorID);
removeList.add(iMonitorID);
- GFacThreadPoolExecutor.getFixedThreadPool().submit(new OutHandlerWorker(gfac, iMonitorID, publisher));
+ GFacThreadPoolExecutor.getFixedThreadPool().execute(new OutHandlerWorker(gfac, iMonitorID, publisher));
} else {
iMonitorID.setFailedCount(0);
}