You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by la...@apache.org on 2014/01/10 21:53:53 UTC
svn commit: r1557251 - in
/airavata/trunk/modules/orchestrator/orchestrator-core/src:
main/java/org/apache/airavata/orchestrator/core/
main/java/org/apache/airavata/orchestrator/core/utils/
test/java/org/apache/airavata/orchestrator/core/
Author: lahiru
Date: Fri Jan 10 20:53:53 2014
New Revision: 1557251
URL: http://svn.apache.org/r1557251
Log:
Add a new worker to handle hung jobs.
Added:
airavata/trunk/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/HangedJobWorker.java
airavata/trunk/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/NewJobWorker.java
- copied, changed from r1556863, airavata/trunk/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/JobSubmitterWorker.java
Removed:
airavata/trunk/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/JobSubmitterWorker.java
Modified:
airavata/trunk/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/PullBasedOrchestrator.java
airavata/trunk/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/utils/OrchestratorUtils.java
airavata/trunk/modules/orchestrator/orchestrator-core/src/test/java/org/apache/airavata/orchestrator/core/PullBasedOrchestratorTest.java
Added: airavata/trunk/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/HangedJobWorker.java
URL: http://svn.apache.org/viewvc/airavata/trunk/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/HangedJobWorker.java?rev=1557251&view=auto
==============================================================================
--- airavata/trunk/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/HangedJobWorker.java (added)
+++ airavata/trunk/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/HangedJobWorker.java Fri Jan 10 20:53:53 2014
@@ -0,0 +1,155 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.orchestrator.core;
+
+import org.apache.airavata.orchestrator.core.context.OrchestratorContext;
+import org.apache.airavata.orchestrator.core.exception.OrchestratorException;
+import org.apache.airavata.orchestrator.core.gfac.GFACInstance;
+import org.apache.airavata.orchestrator.core.job.JobSubmitter;
+import org.apache.airavata.registry.api.exception.RegistryException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ * Background worker that polls the registry for hanged jobs and submits them
+ * through the {@link JobSubmitter} named in the orchestrator configuration.
+ * <p>
+ * The poll period is the configured submitter interval (milliseconds). After
+ * {@value #IDLE_THRESHOLD} consecutive polls that find no hanged jobs, the worker
+ * sleeps for two extra intervals before polling again. The worker exits its loop
+ * when its thread is interrupted.
+ */
+public class HangedJobWorker implements Runnable {
+    private final static Logger logger = LoggerFactory.getLogger(HangedJobWorker.class);
+
+    /** Consecutive empty polls after which the worker takes an extra back-off sleep. */
+    private static final int IDLE_THRESHOLD = 10;
+
+    private OrchestratorContext orchestratorContext;
+
+    private JobSubmitter jobSubmitter;
+
+    // Poll interval in milliseconds; the constructor replaces this default with the configured value
+    private int submitInterval = 1000;
+
+    /**
+     * Creates a worker bound to {@code orchestratorContext} and instantiates the job
+     * submitter class named in its configuration.
+     *
+     * @param orchestratorContext context supplying the configuration and the registry
+     * @throws OrchestratorException if the submitter class cannot be found, is not a
+     *         {@link JobSubmitter}, or cannot be instantiated
+     */
+    public HangedJobWorker(OrchestratorContext orchestratorContext) throws OrchestratorException {
+        this.orchestratorContext = orchestratorContext;
+        try {
+            String submitterClass = this.orchestratorContext.getOrchestratorConfiguration().getSubmitterClass();
+            submitInterval = this.orchestratorContext.getOrchestratorConfiguration().getSubmitterInterval();
+            Class<? extends JobSubmitter> aClass = Class.forName(submitterClass.trim()).asSubclass(JobSubmitter.class);
+            jobSubmitter = aClass.newInstance();
+        } catch (ClassNotFoundException e) {
+            // Fail fast: a null jobSubmitter would only surface later as a NullPointerException in run()
+            logger.error("Error while loading Job Submitter", e);
+            throw new OrchestratorException(e);
+        } catch (ClassCastException e) {
+            // Thrown by asSubclass() when the configured class does not implement JobSubmitter
+            logger.error("Error while loading Job Submitter", e);
+            throw new OrchestratorException(e);
+        } catch (InstantiationException e) {
+            logger.error("Error while loading Job Submitter", e);
+            throw new OrchestratorException(e);
+        } catch (IllegalAccessException e) {
+            logger.error("Error while loading Job Submitter", e);
+            throw new OrchestratorException(e);
+        }
+    }
+
+    /**
+     * Polls for hanged jobs every {@code submitInterval} milliseconds and submits each
+     * non-empty batch, as a whole, to one GFAC instance chosen by the job submitter.
+     * Registry errors are logged and the loop continues; an interrupt stops the worker.
+     */
+    @Override
+    public void run() {
+        int idleCount = 0;
+        while (!Thread.currentThread().isInterrupted()) {
+            if (!sleepInterruptibly(submitInterval)) {
+                return;
+            }
+            try {
+                List<String> allHangedJobs = orchestratorContext.getRegistry().getAllHangedJobs();
+                if (allHangedJobs == null || allHangedJobs.isEmpty()) {
+                    idleCount++;
+                    if (idleCount == IDLE_THRESHOLD) {
+                        // Nothing to do for a while: back off for two extra intervals
+                        if (!sleepInterruptibly(2L * submitInterval)) {
+                            return;
+                        }
+                        idleCount = 0;
+                    }
+                    continue;
+                }
+                idleCount = 0;
+
+                // Pick the GFAC instance only when there is work; the whole batch goes to that one instance
+                // and is submitted exactly once (submitting it again would duplicate every hanged job).
+                GFACInstance gfacInstance = jobSubmitter.selectGFACInstance(orchestratorContext);
+                jobSubmitter.submitJob(gfacInstance, allHangedJobs);
+            } catch (RegistryException e) {
+                logger.error("Error while trying to retrieve hanged jobs", e);
+            }
+        }
+    }
+
+    /**
+     * Sleeps for {@code millis} milliseconds.
+     *
+     * @param millis sleep duration in milliseconds
+     * @return {@code true} if the sleep completed; {@code false} if the thread was
+     *         interrupted, in which case the interrupt flag is restored
+     */
+    private boolean sleepInterruptibly(long millis) {
+        try {
+            Thread.sleep(millis);
+            return true;
+        } catch (InterruptedException e) {
+            // Restore the flag so run() and the owning executor can observe the interruption
+            Thread.currentThread().interrupt();
+            logger.info("HangedJobWorker interrupted, stopping");
+            return false;
+        }
+    }
+
+    /** Returns the context this worker reads the registry from and passes to the job submitter. */
+    public OrchestratorContext getOrchestratorContext() {
+        return orchestratorContext;
+    }
+
+    /**
+     * Replaces the context read by later poll iterations. The job submitter and poll interval
+     * loaded by the constructor are not reloaded.
+     */
+    public void setOrchestratorContext(OrchestratorContext orchestratorContext) {
+        this.orchestratorContext = orchestratorContext;
+    }
+}
Copied: airavata/trunk/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/NewJobWorker.java (from r1556863, airavata/trunk/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/JobSubmitterWorker.java)
URL: http://svn.apache.org/viewvc/airavata/trunk/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/NewJobWorker.java?p2=airavata/trunk/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/NewJobWorker.java&p1=airavata/trunk/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/JobSubmitterWorker.java&r1=1556863&r2=1557251&rev=1557251&view=diff
==============================================================================
--- airavata/trunk/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/JobSubmitterWorker.java (original)
+++ airavata/trunk/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/NewJobWorker.java Fri Jan 10 20:53:53 2014
@@ -24,18 +24,14 @@ import org.apache.airavata.orchestrator.
import org.apache.airavata.orchestrator.core.exception.OrchestratorException;
import org.apache.airavata.orchestrator.core.gfac.GFACInstance;
import org.apache.airavata.orchestrator.core.job.JobSubmitter;
-import org.apache.airavata.orchestrator.core.utils.OrchestratorConstants;
import org.apache.airavata.registry.api.exception.RegistryException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
-import java.net.URL;
import java.util.List;
-import java.util.Properties;
-public class JobSubmitterWorker implements Runnable {
- private final static Logger logger = LoggerFactory.getLogger(JobSubmitterWorker.class);
+public class NewJobWorker implements Runnable {
+ private final static Logger logger = LoggerFactory.getLogger(NewJobWorker.class);
private OrchestratorContext orchestratorContext;
@@ -45,7 +41,7 @@ public class JobSubmitterWorker implemen
private int submitInterval = 1000;
- public JobSubmitterWorker(OrchestratorContext orchestratorContext) throws OrchestratorException {
+ public NewJobWorker(OrchestratorContext orchestratorContext) throws OrchestratorException {
this.orchestratorContext = orchestratorContext;
try {
String submitterClass = this.orchestratorContext.getOrchestratorConfiguration().getSubmitterClass();
@@ -84,7 +80,6 @@ public class JobSubmitterWorker implemen
try {
List<String> allAcceptedJobs = orchestratorContext.getRegistry().getAllAcceptedJobs();
- List<String> allHangedJobs = orchestratorContext.getRegistry().getAllHangedJobs();
if (allAcceptedJobs.size() == 0) {
idleCount++;
@@ -100,9 +95,6 @@ public class JobSubmitterWorker implemen
continue;
}
jobSubmitter.submitJob(gfacInstance,allAcceptedJobs);
-
- /* After submitting available jobs try to schedule again and then submit*/
- jobSubmitter.submitJob(jobSubmitter.selectGFACInstance(orchestratorContext),allHangedJobs);
} catch (RegistryException e) {
logger.error("Error while trying to retrieve available ");
}
Modified: airavata/trunk/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/PullBasedOrchestrator.java
URL: http://svn.apache.org/viewvc/airavata/trunk/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/PullBasedOrchestrator.java?rev=1557251&r1=1557250&r2=1557251&view=diff
==============================================================================
--- airavata/trunk/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/PullBasedOrchestrator.java (original)
+++ airavata/trunk/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/PullBasedOrchestrator.java Fri Jan 10 20:53:53 2014
@@ -79,7 +79,9 @@ public class PullBasedOrchestrator imple
orchestratorConfiguration.setAiravataAPI(getAiravataAPI());
/* Starting submitter thread pool */
- executor = Executors.newFixedThreadPool(orchestratorConfiguration.getThreadPoolSize());
+ // we have a thread to run normal new jobs except to monitor hanged jobs
+ executor = Executors.newFixedThreadPool(orchestratorConfiguration.getThreadPoolSize() + 1);
+ this.startJobSubmitter();
} catch (RegistryException e) {
logger.error("Failed to initializing Orchestrator");
OrchestratorException orchestratorException = new OrchestratorException(e);
@@ -105,6 +107,7 @@ public class PullBasedOrchestrator imple
//todo decide whether to return an error or do what
public String createExperiment(ExperimentRequest request) throws OrchestratorException {
+ //todo use a consistent method to create the experiment ID
String experimentID = UUID.randomUUID().toString();
String username = request.getUserName();
try {
@@ -125,9 +128,11 @@ public class PullBasedOrchestrator imple
return false;
}
String experimentID = request.getExperimentID();
+ //todo use a more concrete user type in to this
String username = request.getUserName();
try {
airavataRegistry.changeStatus(experimentID, AiravataJobState.State.ACCEPTED);
+ //todo save jobRequest data in to the database
} catch (RegistryException e) {
//todo put more meaningful error message
logger.error("Failed to create experiment for the request from " + request.getUserName());
@@ -137,23 +142,27 @@ public class PullBasedOrchestrator imple
}
public void startJobSubmitter() throws OrchestratorException {
- for (int i = 0; i < orchestratorContext.getOrchestratorConfiguration().getThreadPoolSize(); i++) {
- JobSubmitterWorker jobSubmitterWorker = new JobSubmitterWorker(orchestratorContext);
- executor.execute(jobSubmitterWorker);
- }
- }
- private AiravataAPI getAiravataAPI(){
- if (airavataAPI==null) {
- try {
- String systemUserName = ServerSettings.getSystemUser();
- String gateway = ServerSettings.getSystemUserGateway();
- airavataAPI = AiravataAPIFactory.getAPI(gateway, systemUserName);
- } catch (ApplicationSettingsException e) {
- logger.error("Unable to read the properties file", e);
- } catch (AiravataAPIInvocationException e) {
- logger.error("Unable to create Airavata API", e);
- }
- }
- return airavataAPI;
+ NewJobWorker jobSubmitterWorker = new NewJobWorker(orchestratorContext);
+ executor.execute(jobSubmitterWorker);
+
+ for (int i = 0; i < orchestratorContext.getOrchestratorConfiguration().getThreadPoolSize()-1; i++) {
+ HangedJobWorker hangedJobWorker = new HangedJobWorker(orchestratorContext);
+ executor.execute(hangedJobWorker);
+ }
+ }
+
+ private AiravataAPI getAiravataAPI() {
+ if (airavataAPI == null) {
+ try {
+ String systemUserName = ServerSettings.getSystemUser();
+ String gateway = ServerSettings.getSystemUserGateway();
+ airavataAPI = AiravataAPIFactory.getAPI(gateway, systemUserName);
+ } catch (ApplicationSettingsException e) {
+ logger.error("Unable to read the properties file", e);
+ } catch (AiravataAPIInvocationException e) {
+ logger.error("Unable to create Airavata API", e);
+ }
+ }
+ return airavataAPI;
}
}
Modified: airavata/trunk/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/utils/OrchestratorUtils.java
URL: http://svn.apache.org/viewvc/airavata/trunk/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/utils/OrchestratorUtils.java?rev=1557251&r1=1557250&r2=1557251&view=diff
==============================================================================
--- airavata/trunk/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/utils/OrchestratorUtils.java (original)
+++ airavata/trunk/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/utils/OrchestratorUtils.java Fri Jan 10 20:53:53 2014
@@ -20,7 +20,7 @@
*/
package org.apache.airavata.orchestrator.core.utils;
-import org.apache.airavata.orchestrator.core.JobSubmitterWorker;
+import org.apache.airavata.orchestrator.core.NewJobWorker;
import org.apache.airavata.orchestrator.core.OrchestratorConfiguration;
import org.apache.airavata.orchestrator.core.exception.OrchestratorException;
import org.apache.airavata.registry.api.JobRequest;
@@ -39,7 +39,7 @@ public class OrchestratorUtils {
public static OrchestratorConfiguration loadOrchestratorConfiguration() throws OrchestratorException, IOException {
URL resource =
- JobSubmitterWorker.class.getClassLoader().getResource(OrchestratorConstants.ORCHESTRATOR_PROPERTIES);
+ NewJobWorker.class.getClassLoader().getResource(OrchestratorConstants.ORCHESTRATOR_PROPERTIES);
if (resource == null) {
String error = "orchestrator.properties cannot be found, Failed to initialize Orchestrator";
logger.error(error);
Modified: airavata/trunk/modules/orchestrator/orchestrator-core/src/test/java/org/apache/airavata/orchestrator/core/PullBasedOrchestratorTest.java
URL: http://svn.apache.org/viewvc/airavata/trunk/modules/orchestrator/orchestrator-core/src/test/java/org/apache/airavata/orchestrator/core/PullBasedOrchestratorTest.java?rev=1557251&r1=1557250&r2=1557251&view=diff
==============================================================================
--- airavata/trunk/modules/orchestrator/orchestrator-core/src/test/java/org/apache/airavata/orchestrator/core/PullBasedOrchestratorTest.java (original)
+++ airavata/trunk/modules/orchestrator/orchestrator-core/src/test/java/org/apache/airavata/orchestrator/core/PullBasedOrchestratorTest.java Fri Jan 10 20:53:53 2014
@@ -43,10 +43,6 @@ public class PullBasedOrchestratorTest {
@BeforeTest
public void setUp() throws Exception {
-// System.setProperty("myproxy.user", "ogce");
-// System.setProperty("myproxy.password", "");
-// System.setProperty("basedir", "/Users/lahirugunathilake/work/airavata/sandbox/gsissh");
-// System.setProperty("gsi.working.directory","/home/ogce");
orchestrator = new PullBasedOrchestrator();
orchestrator.initialize();
}