You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@airavata.apache.org by Marlon Pierce <ma...@iu.edu> on 2014/02/04 17:15:18 UTC

Fwd: git commit: implementing orchestrator cpi

Hi Lahiru--

Please associate these with a Jira ticket and include the Jira ticket
number(s) in the commit message.


Thanks--


Marlon



-------- Original Message --------
Subject: 	git commit: implementing orchestrator cpi
Date: 	Tue, 4 Feb 2014 16:11:20 +0000 (UTC)
From: 	lahiru@apache.org
Reply-To: 	dev@airavata.apache.org
To: 	commits@airavata.apache.org



Updated Branches:
  refs/heads/master 5cf0a0f55 -> 4cfc32e39


implementing orchestrator cpi


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/4cfc32e3
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/4cfc32e3
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/4cfc32e3

Branch: refs/heads/master
Commit: 4cfc32e39b8954390acadcad5003309e8b4bbb0b
Parents: 5cf0a0f
Author: lginnali <la...@149-160-172-90.dhcp-bl.indiana.edu>
Authored: Tue Feb 4 10:57:35 2014 -0500
Committer: lginnali <la...@149-160-172-90.dhcp-bl.indiana.edu>
Committed: Tue Feb 4 10:57:35 2014 -0500

----------------------------------------------------------------------
 .../airavata/orchestrator/cpi/Orchestrator.java |  95 +++++++++++
 .../cpi/impl/SimpleOrchestratorImpl.java        | 163 +++++++++++++++++++
 2 files changed, 258 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/4cfc32e3/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/Orchestrator.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/Orchestrator.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/Orchestrator.java
new file mode 100644
index 0000000..83178a1
--- /dev/null
+++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/Orchestrator.java
@@ -0,0 +1,95 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.orchestrator.cpi;
+
+import org.apache.airavata.orchestrator.core.exception.OrchestratorException;
+import org.apache.airavata.orchestrator.core.model.ExperimentRequest;
+import org.apache.airavata.registry.api.JobRequest;
+
+/**
+ * Orchestrator functionality exposed to code outside this module.
+ */
+public interface Orchestrator {
+
+    /**
+     * Initializes the Orchestrator. This is also called on restart to perform the
+     * initialization tasks.
+     *
+     * @return an implementation-defined success flag
+     * @throws OrchestratorException if initialization fails
+     */
+    boolean initialize() throws OrchestratorException;
+
+    /**
+     * Creates the database entry for a new experiment. This is the very first call for an
+     * experiment; the returned experiment ID gives the user full control over it.
+     *
+     * @param request description of the experiment to create
+     * @return the ID of the newly created experiment
+     * @throws OrchestratorException if the experiment cannot be created
+     */
+    String createExperiment(ExperimentRequest request) throws OrchestratorException;
+
+    /**
+     * Launches a previously created experiment. Once the user holds the experiment ID
+     * returned by {@link #createExperiment(ExperimentRequest)}, they build a
+     * {@link JobRequest} with the job input parameters and send it here.
+     *
+     * @param request the job request carrying the experiment's input parameters
+     * @return {@code true} if the launch request was accepted, {@code false} otherwise
+     * @throws OrchestratorException if the request cannot be processed
+     */
+    boolean launchExperiment(JobRequest request) throws OrchestratorException;
+
+    /**
+     * Cancels an experiment.
+     * <ul>
+     *   <li>If the job has already finished, an exception is thrown.</li>
+     *   <li>If the job is not yet submitted, its status is simply changed to cancelled.</li>
+     *   <li>If the job is running, it is killed on the resource and its status is changed
+     *       to cancelled.</li>
+     * </ul>
+     *
+     * @param experimentID ID of the experiment to cancel
+     * @return {@code true} if the experiment was cancelled
+     * @throws OrchestratorException if the job has already finished or cannot be cancelled
+     */
+    boolean terminateExperiment(String experimentID) throws OrchestratorException;
+
+    /**
+     * Starts the job submitter, which runs continuously (like a cron job), picking up
+     * available jobs and submitting them to GFAC.
+     *
+     * @throws OrchestratorException if the job submitter cannot be started
+     */
+    void startJobSubmitter() throws OrchestratorException;
+
+    /**
+     * Shuts the Orchestrator down gracefully. This is called during a graceful shutdown of
+     * the Orchestrator.
+     *
+     * @throws OrchestratorException if the shutdown fails
+     */
+    void shutdown() throws OrchestratorException;
+
+    /**
+     * Checks the job data configured in the Registry for the given request and validates
+     * its status, i.e. whether it has the minimum required parameters to be submitted.
+     *
+     * @param jobRequest the job request to validate
+     * @return {@code true} if the job has the minimum required parameters to be submitted,
+     *         {@code false} otherwise
+     */
+    boolean validateExperiment(JobRequest jobRequest);
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/4cfc32e3/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
new file mode 100644
index 0000000..1616690
--- /dev/null
+++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
@@ -0,0 +1,163 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.orchestrator.cpi.impl;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import org.apache.airavata.common.utils.AiravataJobState;
+import org.apache.airavata.orchestrator.core.AbstractOrchestrator;
+import org.apache.airavata.orchestrator.core.HangedJobWorker;
+import org.apache.airavata.orchestrator.core.NewJobWorker;
+import org.apache.airavata.orchestrator.core.exception.OrchestratorException;
+import org.apache.airavata.orchestrator.core.job.JobSubmitter;
+import org.apache.airavata.orchestrator.core.utils.OrchestratorUtils;
+import org.apache.airavata.registry.api.JobRequest;
+import org.apache.airavata.registry.api.exception.RegistryException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SimpleOrchestratorImpl extends AbstractOrchestrator {
+    private final static Logger logger = LoggerFactory.getLogger(SimpleOrchestratorImpl.class);
+    private ExecutorService executor;
+
+
+    // this is going to be null unless the thread count is 0
+    private JobSubmitter jobSubmitter = null;
+
+    public boolean initialize() throws OrchestratorException {
+        super.initialize();
+        // we have a thread to run normal new jobs except to monitor hanged jobs
+        if (orchestratorConfiguration.getThreadPoolSize() != 0) {
+            executor = Executors.newFixedThreadPool(orchestratorConfiguration.getThreadPoolSize() + 1);
+            this.startJobSubmitter();
+        } else {
+
+            try {
+                String submitterClass = this.orchestratorContext.getOrchestratorConfiguration().getNewJobSubmitterClass();
+                Class<? extends JobSubmitter> aClass = Class.forName(submitterClass.trim()).asSubclass(JobSubmitter.class);
+                jobSubmitter = aClass.newInstance();
+                jobSubmitter.initialize(this.orchestratorContext);
+            } catch (Exception e) {
+                String error = "Error creating JobSubmitter in non threaded mode ";
+                logger.error(error);
+                throw new OrchestratorException(error, e);
+            }
+        }
+        return true;
+    }
+
+
+    public void shutdown() throws OrchestratorException {
+        executor.shutdown();
+
+    }
+
+    public boolean launchExperiment(JobRequest request) throws OrchestratorException {
+        // validate the jobRequest first
+        if (!OrchestratorUtils.validateJobRequest(request)) {
+            logger.error("Invalid Job request sent, Experiment creation failed");
+            return false;
+        }
+        String experimentID = OrchestratorUtils.getUniqueID(request);
+        // we give higher priority to userExperimentID
+        if (experimentID == null) {
+            logger.error("Invalid Experiment ID given: " + request.getUserName());
+            return false;
+        }
+        //todo use a more concrete user type in to this
+        //FIXME: (MEP) Why don't we pass the JobRequest to the registry and let it do all of this?  Or just store the JobRequest as an object directly in the registry?
+        try {
+            if (request.getHostDescription() != null) {
+                if (!airavataRegistry.isHostDescriptorExists(request.getHostDescription().getType().getHostName())) {
+                    airavataRegistry.addHostDescriptor(request.getHostDescription());
+                }
+            }
+            if (request.getServiceDescription() != null) {
+                if (!airavataRegistry.isServiceDescriptorExists(request.getServiceDescription().getType().getName())) {
+                    airavataRegistry.addServiceDescriptor(request.getServiceDescription());
+                }
+            }
+            if (request.getApplicationDescription() != null) {
+                if (request.getServiceDescription() != null && request.getHostDescription() != null) {
+                    if (!airavataRegistry.isApplicationDescriptorExists(request.getServiceDescription().getType().getName(),
+                            request.getHostDescription().getType().getHostName(), request.getApplicationDescription().getType().getApplicationName().getStringValue())) {
+                        airavataRegistry.addApplicationDescriptor(request.getServiceDescription(),
+                                request.getHostDescription(), request.getApplicationDescription());
+                    }
+                } else {
+                    String error = "Providing just Application Descriptor is not sufficient to save to Registry";
+                    logger.error(error);
+                    throw new OrchestratorException(error);
+                }
+            }
+            airavataRegistry.changeStatus(experimentID, AiravataJobState.State.ACCEPTED);
+            if (orchestratorContext.getOrchestratorConfiguration().getThreadPoolSize() == 0) {
+                jobSubmitter.directJobSubmit(request);
+            }
+
+            //todo save jobRequest data in to the database
+        } catch (RegistryException e) {
+            //todo put more meaningful error message
+            logger.error("Failed to create experiment for the request from " + request.getUserName());
+            return false;
+        }
+        return true;
+    }
+
+    public void startJobSubmitter() throws OrchestratorException {
+        //FIXME: (MEP) Why create a new thread for jobSubmittedWorker but use the pool for HangedJobWorker?
+        //FIXME: (MEP) As discussed on the dev list, we need to improve this
+        NewJobWorker jobSubmitterWorker = new NewJobWorker(orchestratorContext);
+        executor.execute(jobSubmitterWorker);
+
+        for (int i = 0; i < orchestratorContext.getOrchestratorConfiguration().getThreadPoolSize() - 1; i++) {
+            HangedJobWorker hangedJobWorker = new HangedJobWorker(orchestratorContext);
+            executor.execute(hangedJobWorker);
+        }
+    }
+
+    public boolean terminateExperiment(String experimentID) throws OrchestratorException {
+        try {
+            AiravataJobState state = orchestratorContext.getRegistry().getState(experimentID);
+            if (state.getJobState().equals(AiravataJobState.State.RUNNING) || state.getJobState().equals(AiravataJobState.State.PENDING) ||
+                    state.getJobState().equals(AiravataJobState.State.ACTIVE) || state.getJobState().equals(AiravataJobState.State.SUBMITTED)) {
+
+                //todo perform cancelling and last peform the database update
+
+                orchestratorContext.getRegistry().changeStatus(experimentID, AiravataJobState.State.CANCELLED);
+            } else if (state.getJobState().equals(AiravataJobState.State.DONE)) {
+                String error = "Job is already Finished so cannot cancel the job " + experimentID;
+                logger.error(error);
+                new OrchestratorException(error);
+            } else {
+                // do nothing but simply change the job state to cancelled because job is not yet submitted to the resource
+                orchestratorContext.getRegistry().changeStatus(experimentID, AiravataJobState.State.CANCELLED);
+            }
+
+        } catch (RegistryException e) {
+            String error = "Error reading the job state for the given Experiment ID: " + experimentID;
+            logger.error(error);
+            throw new OrchestratorException(error, e);
+        }
+        return true;
+    }
+}