You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by la...@apache.org on 2015/04/14 06:20:35 UTC

airavata git commit: fixing issue with threadpool

Repository: airavata
Updated Branches:
  refs/heads/master a773a2858 -> 7f89109fe


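fixing issue with threadpool: GfacServerHandler no longer blocks on submit(...).get(); workers are now handed to the fixed thread pool with execute() (also in HPCPullMonitor), and InputHandlerWorker becomes a Runnable that logs GFacException itself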


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/7f89109f
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/7f89109f
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/7f89109f

Branch: refs/heads/master
Commit: 7f89109fe22680f14901139ce4056458b2665ff2
Parents: a773a28
Author: Lahiru Gunathilake <gl...@gmail.com>
Authored: Tue Apr 14 00:20:33 2015 -0400
Committer: Lahiru Gunathilake <gl...@gmail.com>
Committed: Tue Apr 14 00:20:33 2015 -0400

----------------------------------------------------------------------
 .../client/samples/CreateLaunchExperiment.java  | 38 +++++++++-----
 .../airavata/gfac/server/GfacServerHandler.java | 55 ++++++++++++++++----
 .../gfac/core/utils/InputHandlerWorker.java     | 13 +++--
 .../monitor/impl/pull/qstat/HPCPullMonitor.java | 10 ++--
 4 files changed, 82 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/7f89109f/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java b/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
index 9ad71f4..be07975 100644
--- a/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
+++ b/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
@@ -58,10 +58,10 @@ public class CreateLaunchExperiment {
     private static final String DEFAULT_GATEWAY = "php_reference_gateway";
     private static Airavata.Client airavataClient;
 
-    private static String echoAppId = "Echo_fcac7076-e350-4dfb-a6eb-73e2d648fc60";
+    private static String echoAppId = "Echo_4efc544e-b72b-45d4-9f2d-ea1fae602d02";
     private static String mpiAppId = "HelloMPI_bfd56d58-6085-4b7f-89fc-646576830518";
     private static String wrfAppId = "WRF_7ad5da38-c08b-417c-a9ea-da9298839762";
-    private static String amberAppId = "Amber_717cba99-1085-45de-861c-952001c5243c";
+    private static String amberAppId = "Amber_1e7c89b0-4e03-4f36-8bdd-86c8bb9df8a1";
     private static String gromacsAppId = "GROMACS_05622038-9edd-4cb1-824e-0b7cb993364b";
     private static String espressoAppId = "ESPRESSO_10cc2820-5d0b-4c63-9546-8a8b595593c1";
     private static String lammpsAppId = "LAMMPS_2472685b-8acf-497e-aafe-cc66fe5f4cb6";
@@ -168,12 +168,12 @@ public class CreateLaunchExperiment {
 //                final String expId = createMPIExperimentForFSD(airavataClient);
 //               final String expId = createEchoExperimentForStampede(airavataClient);
 //                final String expId = createEchoExperimentForTrestles(airavataClient);
-//                final String expId = createExperimentEchoForLocalHost(airavataClient);
+                final String expId = createExperimentEchoForLocalHost(airavataClient);
 //                final String expId = createExperimentWRFTrestles(airavataClient);
 //                final String expId = createExperimentForBR2(airavataClient);
 //                final String expId = createExperimentForBR2Amber(airavataClient);
 //                final String expId = createExperimentWRFStampede(airavataClient);
-                final String expId = createExperimentForStampedeAmber(airavataClient);
+//                final String expId = createExperimentForStampedeAmber(airavataClient);
 //                String expId = createExperimentForTrestlesAmber(airavataClient);
 //                final String expId = createExperimentGROMACSStampede(airavataClient);
 //                final String expId = createExperimentESPRESSOStampede(airavataClient);
@@ -191,10 +191,20 @@ public class CreateLaunchExperiment {
                 launchExperiment(airavataClient, expId);
             }
 
-            Thread.sleep(10000);
-            for (String exId : experimentIds) {
-                Experiment experiment = airavataClient.getExperiment(exId);
-                System.out.println(experiment.getExperimentID() + " " + experiment.getExperimentStatus().getExperimentState().name());
+            boolean allNotFinished = true;
+            while(allNotFinished) {
+                allNotFinished = false;
+                for (String exId : experimentIds) {
+                    Experiment experiment = airavataClient.getExperiment(exId);
+                    if(!experiment.getExperimentStatus().getExperimentState().equals(ExperimentState.COMPLETED)&&
+                            !experiment.getExperimentStatus().getExperimentState().equals(ExperimentState.FAILED)
+                            &&!experiment.getExperimentStatus().getExperimentState().equals(ExperimentState.CANCELED)){
+                        allNotFinished = true;
+                    }
+                    System.out.println(experiment.getExperimentID() + " " + experiment.getExperimentStatus().getExperimentState().name());
+                }
+                System.out.println("----------------------------------------------------");
+                Thread.sleep(10000);
             }
 
 
@@ -1313,11 +1323,11 @@ public class CreateLaunchExperiment {
 //			}
             for (InputDataObjectType inputDataObjectType : exInputs) {
                 if (inputDataObjectType.getName().equalsIgnoreCase("Heat_Restart_File")) {
-                    inputDataObjectType.setValue("/Users/shameera/Projects/scigap/apps/AMBER_FILES/02_Heat.rst");
+                    inputDataObjectType.setValue("/Users/lginnali/Downloads/02_Heat.rst");
                 } else if (inputDataObjectType.getName().equalsIgnoreCase("Production_Control_File")) {
-                    inputDataObjectType.setValue("/Users/shameera/Projects/scigap/apps/AMBER_FILES/03_Prod.in");
+                    inputDataObjectType.setValue("/Users/lginnali/Downloads/03_Prod.in");
                 } else if (inputDataObjectType.getName().equalsIgnoreCase("Parameter_Topology_File")) {
-                    inputDataObjectType.setValue("/Users/shameera/Projects/scigap/apps/AMBER_FILES/prmtop");
+                    inputDataObjectType.setValue("/Users/lginnali/Downloads/prmtop");
                 }
             }
 
@@ -1378,11 +1388,11 @@ public class CreateLaunchExperiment {
 //			}
             for (InputDataObjectType inputDataObjectType : exInputs) {
                 if (inputDataObjectType.getName().equalsIgnoreCase("Heat_Restart_File")) {
-                    inputDataObjectType.setValue("/Users/shameera/Projects/scigap/apps/AMBER_FILES/02_Heat.rst");
+                    inputDataObjectType.setValue("/Users/lginnali/Downloads/02_Heat.rst");
                 } else if (inputDataObjectType.getName().equalsIgnoreCase("Production_Control_File")) {
-                    inputDataObjectType.setValue("/Users/shameera/Projects/scigap/apps/AMBER_FILES/03_Prod.in");
+                    inputDataObjectType.setValue("/Users/lginnali/Downloads/03_Prod.in");
                 } else if (inputDataObjectType.getName().equalsIgnoreCase("Parameter_Topology_File")) {
-                    inputDataObjectType.setValue("/Users/shameera/Projects/scigap/apps/AMBER_FILES/prmtop");
+                    inputDataObjectType.setValue("/Users/lginnali/Downloads/prmtop");
                 }
             }
             List<OutputDataObjectType> exOut = client.getApplicationOutputs(amberAppId);

http://git-wip-us.apache.org/repos/asf/airavata/blob/7f89109f/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
index 7d5e223..f3e1d91 100644
--- a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
+++ b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
@@ -25,6 +25,7 @@ import edu.uiuc.ncsa.security.delegation.services.Server;
 import org.airavata.appcatalog.cpi.AppCatalog;
 import org.airavata.appcatalog.cpi.AppCatalogException;
 import org.apache.aiaravata.application.catalog.data.impl.AppCatalogFactory;
+import org.apache.airavata.common.exception.AiravataException;
 import org.apache.airavata.common.exception.ApplicationSettingsException;
 import org.apache.airavata.common.logger.AiravataLogger;
 import org.apache.airavata.common.logger.AiravataLoggerFactory;
@@ -68,7 +69,7 @@ import java.util.concurrent.locks.Lock;
 public class GfacServerHandler implements GfacService.Iface, Watcher {
     private final static AiravataLogger logger = AiravataLoggerFactory.getLogger(GfacServerHandler.class);
 
-    private RabbitMQTaskLaunchConsumer rabbitMQTaskLaunchConsumer;
+    private static RabbitMQTaskLaunchConsumer rabbitMQTaskLaunchConsumer;
 
     private static int requestCount=0;
 
@@ -143,6 +144,15 @@ public class GfacServerHandler implements GfacService.Iface, Watcher {
         }
     }
 
+    public static void main(String[] args) {
+        RabbitMQTaskLaunchConsumer rabbitMQTaskLaunchConsumer = null;
+        try {
+            rabbitMQTaskLaunchConsumer = new RabbitMQTaskLaunchConsumer();
+            rabbitMQTaskLaunchConsumer.listen(new TestHandler());
+        } catch (AiravataException e) {
+            logger.error(e.getMessage(), e);
+        }
+    }
     private void storeServerConfig() throws KeeperException, InterruptedException, ApplicationSettingsException {
         Stat zkStat = zk.exists(gfacServer, false);
         if (zkStat == null) {
@@ -244,13 +254,9 @@ public class GfacServerHandler implements GfacService.Iface, Watcher {
 //            if( gfac.submitJob(experimentId, taskId, gatewayId)){
         logger.debugId(experimentId, "Submitted job to the Gfac Implementation, experiment {}, task {}, gateway " +
                 "{}", experimentId, taskId, gatewayId);
-        try {
-            GFacThreadPoolExecutor.getFixedThreadPool().submit(inputHandlerWorker).get();
-        } catch (InterruptedException e) {
-            logger.error(e.getMessage(), e);
-        } catch (ExecutionException e) {
-            logger.error(e.getMessage(), e);
-        }
+
+        GFacThreadPoolExecutor.getFixedThreadPool().execute(inputHandlerWorker);
+
         // we immediately return when we have a threadpool
         return true;
     }
@@ -309,6 +315,33 @@ public class GfacServerHandler implements GfacService.Iface, Watcher {
         }
     }
 
+    private static  class TestHandler implements MessageHandler{
+        @Override
+        public Map<String, Object> getProperties() {
+            Map<String, Object> props = new HashMap<String, Object>();
+            ArrayList<String> keys = new ArrayList<String>();
+            keys.add(ServerSettings.getLaunchQueueName());
+            keys.add(ServerSettings.getCancelQueueName());
+            props.put(MessagingConstants.RABBIT_ROUTING_KEY, keys);
+            props.put(MessagingConstants.RABBIT_QUEUE, ServerSettings.getLaunchQueueName());
+            return props;
+        }
+
+        @Override
+        public void onMessage(MessageContext message) {
+            TaskSubmitEvent event = new TaskSubmitEvent();
+            TBase messageEvent = message.getEvent();
+            byte[] bytes = new byte[0];
+            try {
+                bytes = ThriftUtils.serializeThriftObject(messageEvent);
+                ThriftUtils.createThriftFromBytes(bytes, event);
+                System.out.println(event.getExperimentId());
+            } catch (TException e) {
+                logger.error(e.getMessage(), e);
+            }
+        }
+    }
+
     private class TaskLaunchMessageHandler implements MessageHandler {
         private String experimentNode;
         private String nodeName;
@@ -329,6 +362,8 @@ public class GfacServerHandler implements GfacService.Iface, Watcher {
         }
 
         public void onMessage(MessageContext message) {
+            System.out.println(" Message Received with message id '" + message.getMessageId()
+                    + "' and with message type '" + message.getType());
             if (message.getType().equals(MessageType.LAUNCHTASK)) {
                 try {
                     TaskSubmitEvent event = new TaskSubmitEvent();
@@ -339,7 +374,7 @@ public class GfacServerHandler implements GfacService.Iface, Watcher {
 
                     try {
                         GFacUtils.createExperimentEntryForPassive(event.getExperimentId(), event.getTaskId(), zk, experimentNode, nodeName, event.getTokenId(), message.getDeliveryTag());
-                        AiravataZKUtils.getExpStatePath(event.getExperimentId(),event.getTaskId());
+                        AiravataZKUtils.getExpStatePath(event.getExperimentId(), event.getTaskId());
                         submitJob(event.getExperimentId(), event.getTaskId(), event.getGatewayId());
                     } catch (KeeperException e) {
                         logger.error(nodeName + " was interrupted.");
@@ -351,8 +386,6 @@ public class GfacServerHandler implements GfacService.Iface, Watcher {
                         logger.error(e.getMessage(), e);
                         rabbitMQTaskLaunchConsumer.sendAck(message.getDeliveryTag());
                     }
-                    System.out.println(" Message Received with message id '" + message.getMessageId()
-                            + "' and with message type '" + message.getType());
                 } catch (TException e) {
                     logger.error(e.getMessage(), e); //nobody is listening so nothing to throw
                 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/7f89109f/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/InputHandlerWorker.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/InputHandlerWorker.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/InputHandlerWorker.java
index 963db7c..ec7991a 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/InputHandlerWorker.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/InputHandlerWorker.java
@@ -20,6 +20,7 @@
 */
 package org.apache.airavata.gfac.core.utils;
 
+import org.apache.airavata.gfac.GFacException;
 import org.apache.airavata.gfac.core.context.JobExecutionContext;
 import org.apache.airavata.gfac.core.cpi.GFac;
 import org.slf4j.Logger;
@@ -27,7 +28,7 @@ import org.slf4j.LoggerFactory;
 
 import java.util.concurrent.Callable;
 
-public class InputHandlerWorker implements Callable {
+public class InputHandlerWorker implements Runnable {
     private static Logger log = LoggerFactory.getLogger(InputHandlerWorker.class);
 
     String experimentId;
@@ -45,9 +46,11 @@ public class InputHandlerWorker implements Callable {
     }
 
     @Override
-    public Object call() throws Exception {
-        boolean b = gfac.submitJob(experimentId, taskId, gatewayId);
-        log.info("InHandler and provider Gfac invocation returned: " + b);
-        return b;
+    public void run()  {
+        try {
+            gfac.submitJob(experimentId, taskId, gatewayId);
+        } catch (GFacException e) {
+            log.error(e.getMessage(), e);
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/7f89109f/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java b/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
index ab934ca..26d3385 100644
--- a/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
+++ b/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
@@ -197,7 +197,7 @@ public class HPCPullMonitor extends PullMonitor {
                                 sendNotification(iMonitorID);
                                 logger.info("To avoid timing issues we sleep sometime and try to retrieve output files");
                                 Thread.sleep(10000);
-                                GFacThreadPoolExecutor.getFixedThreadPool().submit(new OutHandlerWorker(gfac, iMonitorID, publisher));
+                                GFacThreadPoolExecutor.getFixedThreadPool().execute(new OutHandlerWorker(gfac, iMonitorID, publisher));
                                 break;
                             }
                         }
@@ -223,7 +223,9 @@ public class HPCPullMonitor extends PullMonitor {
                                             iMonitorID.getExperimentID(), iMonitorID.getTaskID(), iMonitorID.getJobName());
 
                                     sendNotification(iMonitorID);
-                                    GFacThreadPoolExecutor.getFixedThreadPool().submit(new OutHandlerWorker(gfac, iMonitorID, publisher));
+                                    logger.info("To avoid timing issues we sleep sometime and try to retrieve output files");
+                                    Thread.sleep(10000);
+                                    GFacThreadPoolExecutor.getFixedThreadPool().execute(new OutHandlerWorker(gfac, iMonitorID, publisher));
                                     break;
                                 }
                             }
@@ -248,7 +250,7 @@ public class HPCPullMonitor extends PullMonitor {
                             removeList.add(iMonitorID);
                             logger.info("PULL Notification is complete: marking the Job as ************COMPLETE************ experiment {}, task {}, job name {} .",
                                     iMonitorID.getExperimentID(), iMonitorID.getTaskID(), iMonitorID.getJobName());
-                            GFacThreadPoolExecutor.getFixedThreadPool().submit(new OutHandlerWorker(gfac, iMonitorID, publisher));
+                            GFacThreadPoolExecutor.getFixedThreadPool().execute(new OutHandlerWorker(gfac, iMonitorID, publisher));
                         }
                         iMonitorID.setStatus(jobStatuses.get(iMonitorID.getJobID() + "," + iMonitorID.getJobName()));    //IMPORTANT this is not a simple setter we have a logic
                         iMonitorID.setLastMonitored(new Timestamp((new Date()).getTime()));
@@ -286,7 +288,7 @@ public class HPCPullMonitor extends PullMonitor {
                                 sendNotification(iMonitorID);
 //                                CommonUtils.removeMonitorFromQueue(take, iMonitorID);
                                 removeList.add(iMonitorID);
-                                GFacThreadPoolExecutor.getFixedThreadPool().submit(new OutHandlerWorker(gfac, iMonitorID, publisher));
+                                GFacThreadPoolExecutor.getFixedThreadPool().execute(new OutHandlerWorker(gfac, iMonitorID, publisher));
                             } else {
                                 iMonitorID.setFailedCount(0);
                             }