You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@airavata.apache.org by Marlon Pierce <ma...@iu.edu> on 2014/02/04 17:15:18 UTC
Fwd: git commit: implementing orchestrator cpi
Hi Lahiru--
Please associate these commits with a Jira ticket and include the Jira ticket
number or numbers in the commit message.
Thanks--
Marlon
-------- Original Message --------
Subject: git commit: implementing orchestrator cpi
Date: Tue, 4 Feb 2014 16:11:20 +0000 (UTC)
From: lahiru@apache.org
Reply-To: dev@airavata.apache.org
To: commits@airavata.apache.org
Updated Branches:
refs/heads/master 5cf0a0f55 -> 4cfc32e39
implementing orchestrator cpi
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/4cfc32e3
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/4cfc32e3
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/4cfc32e3
Branch: refs/heads/master
Commit: 4cfc32e39b8954390acadcad5003309e8b4bbb0b
Parents: 5cf0a0f
Author: lginnali <la...@149-160-172-90.dhcp-bl.indiana.edu>
Authored: Tue Feb 4 10:57:35 2014 -0500
Committer: lginnali <la...@149-160-172-90.dhcp-bl.indiana.edu>
Committed: Tue Feb 4 10:57:35 2014 -0500
----------------------------------------------------------------------
.../airavata/orchestrator/cpi/Orchestrator.java | 95 +++++++++++
.../cpi/impl/SimpleOrchestratorImpl.java | 163 +++++++++++++++++++
2 files changed, 258 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/4cfc32e3/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/Orchestrator.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/Orchestrator.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/Orchestrator.java
new file mode 100644
index 0000000..83178a1
--- /dev/null
+++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/Orchestrator.java
@@ -0,0 +1,95 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.orchestrator.cpi;
+
+import org.apache.airavata.orchestrator.core.exception.OrchestratorException;
+import org.apache.airavata.orchestrator.core.model.ExperimentRequest;
+import org.apache.airavata.registry.api.JobRequest;
+
+/*
+ This is the interface for the orchestrator functionality that is exposed outside of the
+ module.
+*/
+public interface Orchestrator {
+
+    /**
+     * Initializes the Orchestrator. This is also invoked when the Orchestrator
+     * is restarted, so that the init tasks are performed again.
+     *
+     * @return {@code true} if initialization completed
+     * @throws OrchestratorException if the Orchestrator could not be initialized
+     */
+    boolean initialize() throws OrchestratorException;
+
+    /**
+     * Creates the database entry for the given experiment. This is the very
+     * first method to call for an experiment; the returned experiment ID gives
+     * the user full control over that experiment.
+     *
+     * @param request the experiment to create
+     * @return the ID of the newly created experiment
+     * @throws OrchestratorException if the experiment could not be created
+     */
+    String createExperiment(ExperimentRequest request) throws OrchestratorException;
+
+    /**
+     * Launches an experiment. After creating the experiment the user holds the
+     * experiment ID, and can then build a {@code JobRequest} and send the job
+     * input parameters to the Orchestrator through this method.
+     *
+     * @param request the job request carrying the job input parameters
+     * @return {@code true} if the request was accepted, {@code false} otherwise
+     * @throws OrchestratorException if the request could not be processed
+     */
+    boolean launchExperiment(JobRequest request) throws OrchestratorException;
+
+    /**
+     * Cancels an experiment.
+     * <ul>
+     * <li>If the job is already finished, an exception is thrown.</li>
+     * <li>If the job is not yet submitted, its status is simply changed to cancelled.</li>
+     * <li>If the job is running, it is killed on the resource and its status is
+     * changed to cancelled.</li>
+     * </ul>
+     *
+     * @param experimentID the ID of the experiment to cancel
+     * @return {@code true} if the experiment was cancelled
+     * @throws OrchestratorException if the job is already finished or the
+     *         cancellation fails
+     */
+    boolean terminateExperiment(String experimentID) throws OrchestratorException;
+
+    /**
+     * Starts the job submitter. It behaves like a cron job: it runs
+     * continuously, takes the available jobs and submits them to GFAC.
+     *
+     * @throws OrchestratorException if the job submitter could not be started
+     */
+    void startJobSubmitter() throws OrchestratorException;
+
+    /**
+     * Called during a graceful shutdown of the Orchestrator, so that
+     * implementations can handle the shutdown gracefully.
+     *
+     * @throws OrchestratorException if the shutdown fails
+     */
+    void shutdown() throws OrchestratorException;
+
+    /**
+     * Parses the current job data configured in the Registry and validates its
+     * status.
+     *
+     * @param jobRequest the job request to validate
+     * @return {@code true} if the job has the minimum required parameters to be
+     *         submitted, {@code false} otherwise
+     */
+    boolean validateExperiment(JobRequest jobRequest);
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/4cfc32e3/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
new file mode 100644
index 0000000..1616690
--- /dev/null
+++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
@@ -0,0 +1,163 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.orchestrator.cpi.impl;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import org.apache.airavata.common.utils.AiravataJobState;
+import org.apache.airavata.orchestrator.core.AbstractOrchestrator;
+import org.apache.airavata.orchestrator.core.HangedJobWorker;
+import org.apache.airavata.orchestrator.core.NewJobWorker;
+import org.apache.airavata.orchestrator.core.exception.OrchestratorException;
+import org.apache.airavata.orchestrator.core.job.JobSubmitter;
+import org.apache.airavata.orchestrator.core.utils.OrchestratorUtils;
+import org.apache.airavata.registry.api.JobRequest;
+import org.apache.airavata.registry.api.exception.RegistryException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SimpleOrchestratorImpl extends AbstractOrchestrator {
+ private final static Logger logger = LoggerFactory.getLogger(SimpleOrchestratorImpl.class);
+ private ExecutorService executor;
+
+
+ // this is going to be null unless the thread count is 0
+ private JobSubmitter jobSubmitter = null;
+
+ public boolean initialize() throws OrchestratorException {
+ super.initialize();
+ // we have a thread to run normal new jobs except to monitor hanged jobs
+ if (orchestratorConfiguration.getThreadPoolSize() != 0) {
+ executor = Executors.newFixedThreadPool(orchestratorConfiguration.getThreadPoolSize() + 1);
+ this.startJobSubmitter();
+ } else {
+
+ try {
+ String submitterClass = this.orchestratorContext.getOrchestratorConfiguration().getNewJobSubmitterClass();
+ Class<? extends JobSubmitter> aClass = Class.forName(submitterClass.trim()).asSubclass(JobSubmitter.class);
+ jobSubmitter = aClass.newInstance();
+ jobSubmitter.initialize(this.orchestratorContext);
+ } catch (Exception e) {
+ String error = "Error creating JobSubmitter in non threaded mode ";
+ logger.error(error);
+ throw new OrchestratorException(error, e);
+ }
+ }
+ return true;
+ }
+
+
+ public void shutdown() throws OrchestratorException {
+ executor.shutdown();
+
+ }
+
+ public boolean launchExperiment(JobRequest request) throws OrchestratorException {
+ // validate the jobRequest first
+ if (!OrchestratorUtils.validateJobRequest(request)) {
+ logger.error("Invalid Job request sent, Experiment creation failed");
+ return false;
+ }
+ String experimentID = OrchestratorUtils.getUniqueID(request);
+ // we give higher priority to userExperimentID
+ if (experimentID == null) {
+ logger.error("Invalid Experiment ID given: " + request.getUserName());
+ return false;
+ }
+ //todo use a more concrete user type in to this
+ //FIXME: (MEP) Why don't we pass the JobRequest to the registry and let it do all of this? Or just store the JobRequest as an object directly in the registry?
+ try {
+ if (request.getHostDescription() != null) {
+ if (!airavataRegistry.isHostDescriptorExists(request.getHostDescription().getType().getHostName())) {
+ airavataRegistry.addHostDescriptor(request.getHostDescription());
+ }
+ }
+ if (request.getServiceDescription() != null) {
+ if (!airavataRegistry.isServiceDescriptorExists(request.getServiceDescription().getType().getName())) {
+ airavataRegistry.addServiceDescriptor(request.getServiceDescription());
+ }
+ }
+ if (request.getApplicationDescription() != null) {
+ if (request.getServiceDescription() != null && request.getHostDescription() != null) {
+ if (!airavataRegistry.isApplicationDescriptorExists(request.getServiceDescription().getType().getName(),
+ request.getHostDescription().getType().getHostName(), request.getApplicationDescription().getType().getApplicationName().getStringValue())) {
+ airavataRegistry.addApplicationDescriptor(request.getServiceDescription(),
+ request.getHostDescription(), request.getApplicationDescription());
+ }
+ } else {
+ String error = "Providing just Application Descriptor is not sufficient to save to Registry";
+ logger.error(error);
+ throw new OrchestratorException(error);
+ }
+ }
+ airavataRegistry.changeStatus(experimentID, AiravataJobState.State.ACCEPTED);
+ if (orchestratorContext.getOrchestratorConfiguration().getThreadPoolSize() == 0) {
+ jobSubmitter.directJobSubmit(request);
+ }
+
+ //todo save jobRequest data in to the database
+ } catch (RegistryException e) {
+ //todo put more meaningful error message
+ logger.error("Failed to create experiment for the request from " + request.getUserName());
+ return false;
+ }
+ return true;
+ }
+
+ public void startJobSubmitter() throws OrchestratorException {
+ //FIXME: (MEP) Why create a new thread for jobSubmittedWorker but use the pool for HangedJobWorker?
+ //FIXME: (MEP) As discussed on the dev list, we need to improve this
+ NewJobWorker jobSubmitterWorker = new NewJobWorker(orchestratorContext);
+ executor.execute(jobSubmitterWorker);
+
+ for (int i = 0; i < orchestratorContext.getOrchestratorConfiguration().getThreadPoolSize() - 1; i++) {
+ HangedJobWorker hangedJobWorker = new HangedJobWorker(orchestratorContext);
+ executor.execute(hangedJobWorker);
+ }
+ }
+
+ public boolean terminateExperiment(String experimentID) throws OrchestratorException {
+ try {
+ AiravataJobState state = orchestratorContext.getRegistry().getState(experimentID);
+ if (state.getJobState().equals(AiravataJobState.State.RUNNING) || state.getJobState().equals(AiravataJobState.State.PENDING) ||
+ state.getJobState().equals(AiravataJobState.State.ACTIVE) || state.getJobState().equals(AiravataJobState.State.SUBMITTED)) {
+
+ //todo perform cancelling and last peform the database update
+
+ orchestratorContext.getRegistry().changeStatus(experimentID, AiravataJobState.State.CANCELLED);
+ } else if (state.getJobState().equals(AiravataJobState.State.DONE)) {
+ String error = "Job is already Finished so cannot cancel the job " + experimentID;
+ logger.error(error);
+ new OrchestratorException(error);
+ } else {
+ // do nothing but simply change the job state to cancelled because job is not yet submitted to the resource
+ orchestratorContext.getRegistry().changeStatus(experimentID, AiravataJobState.State.CANCELLED);
+ }
+
+ } catch (RegistryException e) {
+ String error = "Error reading the job state for the given Experiment ID: " + experimentID;
+ logger.error(error);
+ throw new OrchestratorException(error, e);
+ }
+ return true;
+ }
+}