You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by la...@apache.org on 2014/01/10 21:53:53 UTC

svn commit: r1557251 - in /airavata/trunk/modules/orchestrator/orchestrator-core/src: main/java/org/apache/airavata/orchestrator/core/ main/java/org/apache/airavata/orchestrator/core/utils/ test/java/org/apache/airavata/orchestrator/core/

Author: lahiru
Date: Fri Jan 10 20:53:53 2014
New Revision: 1557251

URL: http://svn.apache.org/r1557251
Log:
adding new worker to handle hanged jobs.

Added:
    airavata/trunk/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/HangedJobWorker.java
    airavata/trunk/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/NewJobWorker.java
      - copied, changed from r1556863, airavata/trunk/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/JobSubmitterWorker.java
Removed:
    airavata/trunk/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/JobSubmitterWorker.java
Modified:
    airavata/trunk/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/PullBasedOrchestrator.java
    airavata/trunk/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/utils/OrchestratorUtils.java
    airavata/trunk/modules/orchestrator/orchestrator-core/src/test/java/org/apache/airavata/orchestrator/core/PullBasedOrchestratorTest.java

Added: airavata/trunk/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/HangedJobWorker.java
URL: http://svn.apache.org/viewvc/airavata/trunk/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/HangedJobWorker.java?rev=1557251&view=auto
==============================================================================
--- airavata/trunk/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/HangedJobWorker.java (added)
+++ airavata/trunk/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/HangedJobWorker.java Fri Jan 10 20:53:53 2014
@@ -0,0 +1,115 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.orchestrator.core;
+
+import org.apache.airavata.orchestrator.core.context.OrchestratorContext;
+import org.apache.airavata.orchestrator.core.exception.OrchestratorException;
+import org.apache.airavata.orchestrator.core.gfac.GFACInstance;
+import org.apache.airavata.orchestrator.core.job.JobSubmitter;
+import org.apache.airavata.registry.api.exception.RegistryException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ * Worker that periodically polls the registry for hanged jobs and resubmits them through the
+ * configured {@link JobSubmitter}. It runs until its thread is interrupted.
+ */
+public class HangedJobWorker implements Runnable {
+    private final static Logger logger = LoggerFactory.getLogger(HangedJobWorker.class);
+
+    /** Number of empty polls after which the worker backs off with one extra, longer sleep. */
+    private static final int MAX_IDLE_POLLS = 10;
+
+    private OrchestratorContext orchestratorContext;
+    private JobSubmitter jobSubmitter;
+
+    // Polling interval passed to Thread.sleep; replaced by the configured submitter interval
+    private int submitInterval = 1000;
+
+    /**
+     * Loads and instantiates the JobSubmitter implementation named in the orchestrator configuration.
+     *
+     * @param orchestratorContext context supplying the orchestrator configuration and the registry
+     * @throws OrchestratorException if the submitter class cannot be found, instantiated or accessed
+     */
+    public HangedJobWorker(OrchestratorContext orchestratorContext) throws OrchestratorException {
+        this.orchestratorContext = orchestratorContext;
+        try {
+            String submitterClass = this.orchestratorContext.getOrchestratorConfiguration().getSubmitterClass();
+            submitInterval = this.orchestratorContext.getOrchestratorConfiguration().getSubmitterInterval();
+            Class<? extends JobSubmitter> aClass = Class.forName(submitterClass.trim()).asSubclass(JobSubmitter.class);
+            jobSubmitter = aClass.newInstance();
+        } catch (ClassNotFoundException e) {
+            // Fail fast: with no submitter, run() would throw a NullPointerException on first use
+            logger.error("Error while loading Job Submitter", e);
+            throw new OrchestratorException(e);
+        } catch (InstantiationException e) {
+            logger.error("Error while loading Job Submitter", e);
+            throw new OrchestratorException(e);
+        } catch (IllegalAccessException e) {
+            logger.error("Error while loading Job Submitter", e);
+            throw new OrchestratorException(e);
+        }
+    }
+
+    /**
+     * Polls for hanged jobs once per submit interval and submits each batch to one GFAC instance.
+     * Returns when the thread is interrupted, so the worker can be stopped by interrupting it.
+     */
+    public void run() {
+        int idleCount = 0;
+        try {
+            while (true) {
+                Thread.sleep(submitInterval);
+                // The whole batch goes to a single GFAC instance; jobs are not spread one by one
+                GFACInstance gfacInstance = jobSubmitter.selectGFACInstance(orchestratorContext);
+                try {
+                    List<String> allHangedJobs = orchestratorContext.getRegistry().getAllHangedJobs();
+                    if (allHangedJobs == null || allHangedJobs.isEmpty()) {
+                        // Every MAX_IDLE_POLLS-th empty poll, sleep longer before polling again
+                        if (++idleCount == MAX_IDLE_POLLS) {
+                            Thread.sleep(submitInterval * 2);
+                            idleCount = 0;
+                        }
+                        continue;
+                    }
+                    // Submit the batch exactly once; a second submit of the same list duplicates it
+                    jobSubmitter.submitJob(gfacInstance, allHangedJobs);
+                } catch (RegistryException e) {
+                    logger.error("Error while trying to retrieve hanged jobs", e);
+                }
+            }
+        } catch (InterruptedException e) {
+            // Restore the interrupt flag and stop rather than keep looping after an interrupt
+            Thread.currentThread().interrupt();
+            logger.info("HangedJobWorker interrupted, stopping");
+        }
+    }
+
+    public OrchestratorContext getOrchestratorContext() {
+        return orchestratorContext;
+    }
+
+    public void setOrchestratorContext(OrchestratorContext orchestratorContext) {
+        this.orchestratorContext = orchestratorContext;
+    }
+}

Copied: airavata/trunk/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/NewJobWorker.java (from r1556863, airavata/trunk/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/JobSubmitterWorker.java)
URL: http://svn.apache.org/viewvc/airavata/trunk/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/NewJobWorker.java?p2=airavata/trunk/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/NewJobWorker.java&p1=airavata/trunk/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/JobSubmitterWorker.java&r1=1556863&r2=1557251&rev=1557251&view=diff
==============================================================================
--- airavata/trunk/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/JobSubmitterWorker.java (original)
+++ airavata/trunk/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/NewJobWorker.java Fri Jan 10 20:53:53 2014
@@ -24,18 +24,14 @@ import org.apache.airavata.orchestrator.
 import org.apache.airavata.orchestrator.core.exception.OrchestratorException;
 import org.apache.airavata.orchestrator.core.gfac.GFACInstance;
 import org.apache.airavata.orchestrator.core.job.JobSubmitter;
-import org.apache.airavata.orchestrator.core.utils.OrchestratorConstants;
 import org.apache.airavata.registry.api.exception.RegistryException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.net.URL;
 import java.util.List;
-import java.util.Properties;
 
-public class JobSubmitterWorker implements Runnable {
-    private final static Logger logger = LoggerFactory.getLogger(JobSubmitterWorker.class);
+public class NewJobWorker implements Runnable {
+    private final static Logger logger = LoggerFactory.getLogger(NewJobWorker.class);
 
     private OrchestratorContext orchestratorContext;
 
@@ -45,7 +41,7 @@ public class JobSubmitterWorker implemen
     private int submitInterval = 1000;
 
 
-    public JobSubmitterWorker(OrchestratorContext orchestratorContext) throws OrchestratorException {
+    public NewJobWorker(OrchestratorContext orchestratorContext) throws OrchestratorException {
         this.orchestratorContext = orchestratorContext;
         try {
             String submitterClass = this.orchestratorContext.getOrchestratorConfiguration().getSubmitterClass();
@@ -84,7 +80,6 @@ public class JobSubmitterWorker implemen
 
             try {
                 List<String> allAcceptedJobs = orchestratorContext.getRegistry().getAllAcceptedJobs();
-                List<String> allHangedJobs = orchestratorContext.getRegistry().getAllHangedJobs();
                 if (allAcceptedJobs.size() == 0) {
                     idleCount++;
 
@@ -100,9 +95,6 @@ public class JobSubmitterWorker implemen
                     continue;
                 }
                 jobSubmitter.submitJob(gfacInstance,allAcceptedJobs);
-
-                /* After submitting available jobs try to schedule again and then submit*/
-                jobSubmitter.submitJob(jobSubmitter.selectGFACInstance(orchestratorContext),allHangedJobs);
             } catch (RegistryException e) {
                 logger.error("Error while trying to retrieve available ");
             }

Modified: airavata/trunk/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/PullBasedOrchestrator.java
URL: http://svn.apache.org/viewvc/airavata/trunk/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/PullBasedOrchestrator.java?rev=1557251&r1=1557250&r2=1557251&view=diff
==============================================================================
--- airavata/trunk/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/PullBasedOrchestrator.java (original)
+++ airavata/trunk/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/PullBasedOrchestrator.java Fri Jan 10 20:53:53 2014
@@ -79,7 +79,9 @@ public class PullBasedOrchestrator imple
             orchestratorConfiguration.setAiravataAPI(getAiravataAPI());
             /* Starting submitter thread pool */
 
-            executor = Executors.newFixedThreadPool(orchestratorConfiguration.getThreadPoolSize());
+            // size the pool with one extra thread: one worker handles new jobs, the rest monitor hanged jobs
+            executor = Executors.newFixedThreadPool(orchestratorConfiguration.getThreadPoolSize() + 1);
+            this.startJobSubmitter();
         } catch (RegistryException e) {
             logger.error("Failed to initializing Orchestrator");
             OrchestratorException orchestratorException = new OrchestratorException(e);
@@ -105,6 +107,7 @@ public class PullBasedOrchestrator imple
     //todo decide whether to return an error or do what
 
     public String createExperiment(ExperimentRequest request) throws OrchestratorException {
+        //todo use a consistent method to create the experiment ID
         String experimentID = UUID.randomUUID().toString();
         String username = request.getUserName();
         try {
@@ -125,9 +128,11 @@ public class PullBasedOrchestrator imple
             return false;
         }
         String experimentID = request.getExperimentID();
+        //todo use a more concrete user type here
         String username = request.getUserName();
         try {
             airavataRegistry.changeStatus(experimentID, AiravataJobState.State.ACCEPTED);
+            //todo save jobRequest data into the database
         } catch (RegistryException e) {
             //todo put more meaningful error message
             logger.error("Failed to create experiment for the request from " + request.getUserName());
@@ -137,23 +142,27 @@ public class PullBasedOrchestrator imple
     }
 
     public void startJobSubmitter() throws OrchestratorException {
-        for (int i = 0; i < orchestratorContext.getOrchestratorConfiguration().getThreadPoolSize(); i++) {
-            JobSubmitterWorker jobSubmitterWorker = new JobSubmitterWorker(orchestratorContext);
-            executor.execute(jobSubmitterWorker);
-        }
-    }
-    private AiravataAPI getAiravataAPI(){
-        if (airavataAPI==null) {
-			try {
-				String systemUserName = ServerSettings.getSystemUser();
-				String gateway = ServerSettings.getSystemUserGateway();
-				airavataAPI = AiravataAPIFactory.getAPI(gateway, systemUserName);
-			} catch (ApplicationSettingsException e) {
-				logger.error("Unable to read the properties file", e);
-			} catch (AiravataAPIInvocationException e) {
-				logger.error("Unable to create Airavata API", e);
-			}
-		}
-		return airavataAPI;
+        NewJobWorker jobSubmitterWorker = new NewJobWorker(orchestratorContext);
+        executor.execute(jobSubmitterWorker);
+
+        for (int i = 0; i < orchestratorContext.getOrchestratorConfiguration().getThreadPoolSize()-1; i++) {
+            HangedJobWorker hangedJobWorker = new HangedJobWorker(orchestratorContext);
+            executor.execute(hangedJobWorker);
+        }
+    }
+
+    private AiravataAPI getAiravataAPI() {
+        if (airavataAPI == null) {
+            try {
+                String systemUserName = ServerSettings.getSystemUser();
+                String gateway = ServerSettings.getSystemUserGateway();
+                airavataAPI = AiravataAPIFactory.getAPI(gateway, systemUserName);
+            } catch (ApplicationSettingsException e) {
+                logger.error("Unable to read the properties file", e);
+            } catch (AiravataAPIInvocationException e) {
+                logger.error("Unable to create Airavata API", e);
+            }
+        }
+        return airavataAPI;
     }
 }

Modified: airavata/trunk/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/utils/OrchestratorUtils.java
URL: http://svn.apache.org/viewvc/airavata/trunk/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/utils/OrchestratorUtils.java?rev=1557251&r1=1557250&r2=1557251&view=diff
==============================================================================
--- airavata/trunk/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/utils/OrchestratorUtils.java (original)
+++ airavata/trunk/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/utils/OrchestratorUtils.java Fri Jan 10 20:53:53 2014
@@ -20,7 +20,7 @@
 */
 package org.apache.airavata.orchestrator.core.utils;
 
-import org.apache.airavata.orchestrator.core.JobSubmitterWorker;
+import org.apache.airavata.orchestrator.core.NewJobWorker;
 import org.apache.airavata.orchestrator.core.OrchestratorConfiguration;
 import org.apache.airavata.orchestrator.core.exception.OrchestratorException;
 import org.apache.airavata.registry.api.JobRequest;
@@ -39,7 +39,7 @@ public class OrchestratorUtils {
 
     public static OrchestratorConfiguration loadOrchestratorConfiguration() throws OrchestratorException, IOException {
         URL resource =
-                JobSubmitterWorker.class.getClassLoader().getResource(OrchestratorConstants.ORCHESTRATOR_PROPERTIES);
+                NewJobWorker.class.getClassLoader().getResource(OrchestratorConstants.ORCHESTRATOR_PROPERTIES);
         if (resource == null) {
             String error = "orchestrator.properties cannot be found, Failed to initialize Orchestrator";
             logger.error(error);

Modified: airavata/trunk/modules/orchestrator/orchestrator-core/src/test/java/org/apache/airavata/orchestrator/core/PullBasedOrchestratorTest.java
URL: http://svn.apache.org/viewvc/airavata/trunk/modules/orchestrator/orchestrator-core/src/test/java/org/apache/airavata/orchestrator/core/PullBasedOrchestratorTest.java?rev=1557251&r1=1557250&r2=1557251&view=diff
==============================================================================
--- airavata/trunk/modules/orchestrator/orchestrator-core/src/test/java/org/apache/airavata/orchestrator/core/PullBasedOrchestratorTest.java (original)
+++ airavata/trunk/modules/orchestrator/orchestrator-core/src/test/java/org/apache/airavata/orchestrator/core/PullBasedOrchestratorTest.java Fri Jan 10 20:53:53 2014
@@ -43,10 +43,6 @@ public class PullBasedOrchestratorTest {
 
     @BeforeTest
     public void setUp() throws Exception {
-//        System.setProperty("myproxy.user", "ogce");
-//        System.setProperty("myproxy.password", "");
-//        System.setProperty("basedir", "/Users/lahirugunathilake/work/airavata/sandbox/gsissh");
-//        System.setProperty("gsi.working.directory","/home/ogce");
         orchestrator = new PullBasedOrchestrator();
         orchestrator.initialize();
     }