You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sm...@apache.org on 2016/10/28 14:51:06 UTC
[2/3] airavata-sandbox git commit: Added files with the design changes
http://git-wip-us.apache.org/repos/asf/airavata-sandbox/blob/1932e865/gsoc2016/Design Changes in Airavata/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/OrchestratorConfiguration.java
----------------------------------------------------------------------
diff --git a/gsoc2016/Design Changes in Airavata/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/OrchestratorConfiguration.java b/gsoc2016/Design Changes in Airavata/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/OrchestratorConfiguration.java
new file mode 100644
index 0000000..da357a4
--- /dev/null
+++ b/gsoc2016/Design Changes in Airavata/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/OrchestratorConfiguration.java
@@ -0,0 +1,122 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.orchestrator.core;
+
+import java.net.URL;
+import java.util.List;
+
+/**
+ * Holds the configuration values of the orchestrator (mostly static settings).
+ * Instances can be accessed through the orchestratorContext object.
+ */
+public class OrchestratorConfiguration {
+
+    private String newJobSubmitterClass;
+    private String hangedJobSubmitterClass;
+    private URL brokerURL;
+    private List<String> validatorClasses;
+
+    /** Submitter interval; starts at 1000 until overridden. */
+    private int submitterInterval = 1000;
+    /** Thread pool size; starts at 10 until overridden. */
+    private int threadPoolSize = 10;
+
+    private boolean startSubmitter;
+    private boolean embeddedMode;
+    private boolean enableValidation;
+
+    // ---- submitter classes -------------------------------------------------
+
+    /** @return the configured new-job submitter class, or {@code null} if unset */
+    public String getNewJobSubmitterClass() {
+        return this.newJobSubmitterClass;
+    }
+
+    /** @param newJobSubmitterClassIn value to store as the new-job submitter class */
+    public void setNewJobSubmitterClass(String newJobSubmitterClassIn) {
+        newJobSubmitterClass = newJobSubmitterClassIn;
+    }
+
+    /** @return the configured hanged-job submitter class, or {@code null} if unset */
+    public String getHangedJobSubmitterClass() {
+        return this.hangedJobSubmitterClass;
+    }
+
+    /** @param hangedJobSubmitterClassIn value to store as the hanged-job submitter class */
+    public void setHangedJobSubmitterClass(String hangedJobSubmitterClassIn) {
+        hangedJobSubmitterClass = hangedJobSubmitterClassIn;
+    }
+
+    // ---- submitter tuning --------------------------------------------------
+
+    /** @return the submitter interval (1000 unless changed) */
+    public int getSubmitterInterval() {
+        return this.submitterInterval;
+    }
+
+    /** @param submitterIntervalIn new submitter interval */
+    public void setSubmitterInterval(int submitterIntervalIn) {
+        submitterInterval = submitterIntervalIn;
+    }
+
+    /** @return the thread pool size (10 unless changed) */
+    public int getThreadPoolSize() {
+        return this.threadPoolSize;
+    }
+
+    /** @param threadPoolSizeIn new thread pool size */
+    public void setThreadPoolSize(int threadPoolSizeIn) {
+        threadPoolSize = threadPoolSizeIn;
+    }
+
+    /** @return the start-submitter flag ({@code false} unless changed) */
+    public boolean isStartSubmitter() {
+        return this.startSubmitter;
+    }
+
+    /** @param startSubmitterIn new value of the start-submitter flag */
+    public void setStartSubmitter(boolean startSubmitterIn) {
+        startSubmitter = startSubmitterIn;
+    }
+
+    // ---- deployment --------------------------------------------------------
+
+    /** @return the broker URL, or {@code null} if unset */
+    public URL getBrokerURL() {
+        return this.brokerURL;
+    }
+
+    /** @param brokerURLIn broker URL to store */
+    public void setBrokerURL(URL brokerURLIn) {
+        brokerURL = brokerURLIn;
+    }
+
+    /** @return the embedded-mode flag ({@code false} unless changed) */
+    public boolean isEmbeddedMode() {
+        return this.embeddedMode;
+    }
+
+    /** @param embeddedModeIn new value of the embedded-mode flag */
+    public void setEmbeddedMode(boolean embeddedModeIn) {
+        embeddedMode = embeddedModeIn;
+    }
+
+    // ---- validation --------------------------------------------------------
+
+    /** @return the validation flag ({@code false} unless changed) */
+    public boolean isEnableValidation() {
+        return this.enableValidation;
+    }
+
+    /** @param enableValidationIn new value of the validation flag */
+    public void setEnableValidation(boolean enableValidationIn) {
+        enableValidation = enableValidationIn;
+    }
+
+    /** @return the stored validator class names (same list instance), or {@code null} if unset */
+    public List<String> getValidatorClasses() {
+        return this.validatorClasses;
+    }
+
+    /** @param validatorClassesIn validator class names; the list is stored as-is, not copied */
+    public void setValidatorClasses(List<String> validatorClassesIn) {
+        validatorClasses = validatorClassesIn;
+    }
+}
http://git-wip-us.apache.org/repos/asf/airavata-sandbox/blob/1932e865/gsoc2016/Design Changes in Airavata/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/utils/OrchestratorConstants.java
----------------------------------------------------------------------
diff --git a/gsoc2016/Design Changes in Airavata/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/utils/OrchestratorConstants.java b/gsoc2016/Design Changes in Airavata/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/utils/OrchestratorConstants.java
new file mode 100644
index 0000000..6a0f04b
--- /dev/null
+++ b/gsoc2016/Design Changes in Airavata/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/utils/OrchestratorConstants.java
@@ -0,0 +1,77 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.orchestrator.core.utils;
+
+/**
+ * This class contains all the constants in orchestrator-core
+ *
+ */
+/*public class OrchestratorConstants {
+ public static final String AIRAVATA_PROPERTIES = "airavata-server.properties";
+ public static final int hotUpdateInterval=1000;
+ public static final String SUBMIT_INTERVAL = "submitter.interval";
+ public static final String THREAD_POOL_SIZE = "threadpool.size";
+ public static final String START_SUBMITTER = "start.submitter";
+ public static final String EMBEDDED_MODE = "embedded.mode";
+ public static final String ENABLE_VALIDATION = "enable.validation";
+ public static final String JOB_VALIDATOR = "job.validators";
+}*/
+
+
+/**
+ * Constants used by orchestrator-core.
+ *
+ * <p>This was changed from a class of {@code public static final} fields to an
+ * enum. Each constant carries either a string value or an integer value; the
+ * getter for the other kind returns that type's default ({@code null} for the
+ * string getter, {@code 0} for the integer getter).
+ */
+public enum OrchestratorConstants {
+    AIRAVATA_PROPERTIES("airavata-server.properties"),
+    // Name kept in lower camel case: constant names are part of the public API.
+    hotUpdateInterval(1000),
+    SUBMIT_INTERVAL("submitter.interval"),
+    THREAD_POOL_SIZE("threadpool.size"),
+    START_SUBMITTER("start.submitter"),
+    EMBEDDED_MODE("embedded.mode"),
+    ENABLE_VALIDATION("enable.validation"),
+    JOB_VALIDATOR("job.validators");
+
+    // Enum state is immutable: both fields are final and set by every constructor.
+    private final String stringConstant;
+    private final int integerConstant;
+
+    /**
+     * Creates a string-valued constant; its integer value is {@code 0}.
+     *
+     * @param stringConstantIn the string value of the constant
+     */
+    OrchestratorConstants(String stringConstantIn) {
+        this.stringConstant = stringConstantIn;
+        this.integerConstant = 0;
+    }
+
+    /**
+     * Creates an integer-valued constant; its string value is {@code null}.
+     *
+     * @param integerConstantIn the integer value of the constant
+     */
+    OrchestratorConstants(int integerConstantIn) {
+        this.stringConstant = null;
+        this.integerConstant = integerConstantIn;
+    }
+
+    /**
+     * @return the string value, or {@code null} for an integer-valued constant
+     */
+    public String getOrchestratorStringConstant() {
+        return stringConstant;
+    }
+
+    /**
+     * @return the integer value, or {@code 0} for a string-valued constant
+     */
+    public int getOrchestratorIntegerConstant() {
+        return integerConstant;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/airavata-sandbox/blob/1932e865/gsoc2016/Design Changes in Airavata/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/utils/OrchestratorUtils.java
----------------------------------------------------------------------
diff --git a/gsoc2016/Design Changes in Airavata/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/utils/OrchestratorUtils.java b/gsoc2016/Design Changes in Airavata/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/utils/OrchestratorUtils.java
new file mode 100644
index 0000000..3ee40e0
--- /dev/null
+++ b/gsoc2016/Design Changes in Airavata/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/utils/OrchestratorUtils.java
@@ -0,0 +1,327 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.orchestrator.core.utils;
+
+import java.io.IOException;
+import java.util.*;
+
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.airavata.model.appcatalog.appinterface.ApplicationInterfaceDescription;
+import org.apache.airavata.model.appcatalog.computeresource.*;
+import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionInterface;
+import org.apache.airavata.model.appcatalog.computeresource.UnicoreJobSubmission;
+import org.apache.airavata.model.appcatalog.gatewayprofile.ComputeResourcePreference;
+import org.apache.airavata.model.appcatalog.gatewayprofile.StoragePreference;
+import org.apache.airavata.model.data.movement.DataMovementInterface;
+import org.apache.airavata.model.data.movement.DataMovementProtocol;
+import org.apache.airavata.model.data.movement.SCPDataMovement;
+import org.apache.airavata.model.data.movement.SecurityProtocol;
+import org.apache.airavata.model.process.ProcessModel;
+import org.apache.airavata.orchestrator.core.OrchestratorConfiguration;
+import org.apache.airavata.orchestrator.core.context.OrchestratorContext;
+import org.apache.airavata.orchestrator.core.exception.OrchestratorException;
+import org.apache.airavata.registry.cpi.*;
+import org.apache.airavata.registry.cpi.ApplicationInterface;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Orchestrator-specific utilities: loading the orchestrator configuration from
+ * the server settings and resolving preferences, interfaces and submission
+ * details from the app catalog reachable through an {@link OrchestratorContext}.
+ */
+public class OrchestratorUtils {
+
+    private final static Logger logger = LoggerFactory.getLogger(OrchestratorUtils.class);
+
+    /**
+     * Orders job submission interfaces by ascending priority order.
+     * Uses {@link Integer#compare(int, int)} rather than subtraction, which can overflow.
+     */
+    private static final Comparator<JobSubmissionInterface> BY_PRIORITY_ORDER = new Comparator<JobSubmissionInterface>() {
+        @Override
+        public int compare(JobSubmissionInterface first, JobSubmissionInterface second) {
+            return Integer.compare(first.getPriorityOrder(), second.getPriorityOrder());
+        }
+    };
+
+    /**
+     * @param constant the constant to read
+     * @return the string value carried by {@code constant}
+     */
+    public static String OrchestratorStringConstant(OrchestratorConstants constant) {
+        return constant.getOrchestratorStringConstant();
+    }
+
+    /**
+     * @param constant the constant to read
+     * @return the integer value carried by {@code constant}
+     */
+    public static int OrchestratorIntegerConstant(OrchestratorConstants constant) {
+        return constant.getOrchestratorIntegerConstant();
+    }
+
+    /** Reads the server setting whose key is the string value of {@code key}. */
+    private static String setting(OrchestratorConstants key) throws ApplicationSettingsException {
+        return ServerSettings.getSetting(OrchestratorStringConstant(key));
+    }
+
+    /**
+     * Builds an {@link OrchestratorConfiguration} from the server settings:
+     * submitter interval, thread pool size, start-submitter, embedded-mode and
+     * enable-validation flags. The validator class list is only read when
+     * validation is enabled.
+     *
+     * @return the populated configuration
+     * @throws NumberFormatException if the interval or pool size setting is not an integer
+     * @throws ApplicationSettingsException propagated from {@code ServerSettings.getSetting}
+     */
+    public static OrchestratorConfiguration loadOrchestratorConfiguration() throws OrchestratorException, IOException, NumberFormatException, ApplicationSettingsException {
+        OrchestratorConfiguration orchestratorConfiguration = new OrchestratorConfiguration();
+        orchestratorConfiguration.setSubmitterInterval(Integer.parseInt(setting(OrchestratorConstants.SUBMIT_INTERVAL)));
+        orchestratorConfiguration.setThreadPoolSize(Integer.parseInt(setting(OrchestratorConstants.THREAD_POOL_SIZE)));
+        orchestratorConfiguration.setStartSubmitter(Boolean.parseBoolean(setting(OrchestratorConstants.START_SUBMITTER)));
+        orchestratorConfiguration.setEmbeddedMode(Boolean.parseBoolean(setting(OrchestratorConstants.EMBEDDED_MODE)));
+        orchestratorConfiguration.setEnableValidation(Boolean.parseBoolean(setting(OrchestratorConstants.ENABLE_VALIDATION)));
+        if (orchestratorConfiguration.isEnableValidation()) {
+            // The setting is a comma-separated list; surrounding whitespace and
+            // empty entries are dropped so "a.B, c.D" yields usable class names.
+            String[] rawNames = setting(OrchestratorConstants.JOB_VALIDATOR).split(",");
+            List<String> validatorClasses = new ArrayList<>(rawNames.length);
+            for (String rawName : rawNames) {
+                String className = rawName.trim();
+                if (!className.isEmpty()) {
+                    validatorClasses.add(className);
+                }
+            }
+            orchestratorConfiguration.setValidatorClasses(validatorClasses);
+        }
+        return orchestratorConfiguration;
+    }
+
+    /**
+     * Returns the job submission protocol preferred for the process's compute
+     * resource in the given gateway's resource profile.
+     *
+     * @throws RegistryException if the app catalog lookup fails
+     */
+    public static JobSubmissionProtocol getPreferredJobSubmissionProtocol(OrchestratorContext context, ProcessModel model, String gatewayId) throws RegistryException {
+        // Same lookup, logging and exception as getComputeResourcePreference.
+        return getComputeResourcePreference(context, model, gatewayId).getPreferredJobSubmissionProtocol();
+    }
+
+    /**
+     * Returns the application name of the application interface referenced by the process.
+     *
+     * @throws RegistryException if the app catalog lookup fails
+     */
+    public static String getApplicationInterfaceName(OrchestratorContext context, ProcessModel model) throws RegistryException {
+        try {
+            ApplicationInterface applicationInterface = context.getRegistry().getAppCatalog().getApplicationInterface();
+            ApplicationInterfaceDescription appInterface = applicationInterface.getApplicationInterface(model.getApplicationInterfaceId());
+            return appInterface.getApplicationName();
+        } catch (AppCatalogException e) {
+            throw new RegistryException("Error while retrieving application interface", e);
+        }
+    }
+
+    /**
+     * Returns the data movement protocol preferred for the process's compute
+     * resource in the given gateway's resource profile.
+     *
+     * @throws RegistryException if the app catalog lookup fails
+     */
+    public static DataMovementProtocol getPreferredDataMovementProtocol(OrchestratorContext context, ProcessModel model, String gatewayId) throws RegistryException {
+        // Same lookup, logging and exception as getComputeResourcePreference.
+        return getComputeResourcePreference(context, model, gatewayId).getPreferredDataMovementProtocol();
+    }
+
+    /**
+     * Looks up the gateway's preference entry for the process's compute resource.
+     *
+     * @throws RegistryException if the app catalog lookup fails
+     */
+    public static ComputeResourcePreference getComputeResourcePreference(OrchestratorContext context, ProcessModel processModel, String gatewayId) throws RegistryException {
+        try {
+            GwyResourceProfile gatewayProfile = context.getRegistry().getAppCatalog().getGatewayProfile();
+            String resourceHostId = processModel.getComputeResourceId();
+            return gatewayProfile.getComputeResourcePreference(gatewayId, resourceHostId);
+        } catch (AppCatalogException e) {
+            logger.error("Error occurred while initializing app catalog", e);
+            throw new RegistryException("Error occurred while initializing app catalog", e);
+        }
+    }
+
+    /**
+     * Looks up the gateway's storage preference, keyed by the process's compute resource id.
+     *
+     * @throws RegistryException if the app catalog lookup fails
+     */
+    public static StoragePreference getStoragePreference(OrchestratorContext context, ProcessModel processModel, String gatewayId) throws RegistryException {
+        try {
+            GwyResourceProfile gatewayProfile = context.getRegistry().getAppCatalog().getGatewayProfile();
+            String resourceHostId = processModel.getComputeResourceId();
+            return gatewayProfile.getStoragePreference(gatewayId, resourceHostId);
+        } catch (AppCatalogException e) {
+            logger.error("Error occurred while initializing app catalog", e);
+            throw new RegistryException("Error occurred while initializing app catalog", e);
+        }
+    }
+
+    /**
+     * Returns the login user name for the process: the override from the
+     * process's resource schedule when it is non-empty, otherwise the one from
+     * the gateway's compute resource preference. A process without a resource
+     * schedule is treated as having no override.
+     *
+     * @throws RegistryException if the app catalog lookup fails
+     */
+    public static String getLoginUserName(OrchestratorContext context, ProcessModel processModel, String gatewayId) throws RegistryException {
+        try {
+            String overrideLoginUserName = processModel.getResourceSchedule() == null
+                    ? null : processModel.getResourceSchedule().getOverrideLoginUserName();
+            if (overrideLoginUserName != null && !overrideLoginUserName.isEmpty()) {
+                return overrideLoginUserName;
+            }
+            GwyResourceProfile gatewayProfile = context.getRegistry().getAppCatalog().getGatewayProfile();
+            return gatewayProfile.getComputeResourcePreference(gatewayId, processModel.getComputeResourceId()).getLoginUserName();
+        } catch (AppCatalogException e) {
+            logger.error("Error occurred while initializing app catalog to fetch login username", e);
+            throw new RegistryException("Error occurred while initializing app catalog to fetch login username", e);
+        }
+    }
+
+    /**
+     * Returns the scratch location for the process: the override from the
+     * process's resource schedule when it is non-empty, otherwise the one from
+     * the gateway's compute resource preference. A process without a resource
+     * schedule is treated as having no override.
+     *
+     * @throws RegistryException if the app catalog lookup fails
+     */
+    public static String getScratchLocation(OrchestratorContext context, ProcessModel processModel, String gatewayId) throws RegistryException {
+        try {
+            String overrideScratchLocation = processModel.getResourceSchedule() == null
+                    ? null : processModel.getResourceSchedule().getOverrideScratchLocation();
+            if (overrideScratchLocation != null && !overrideScratchLocation.isEmpty()) {
+                return overrideScratchLocation;
+            }
+            GwyResourceProfile gatewayProfile = context.getRegistry().getAppCatalog().getGatewayProfile();
+            return gatewayProfile.getComputeResourcePreference(gatewayId, processModel.getComputeResourceId()).getScratchLocation();
+        } catch (AppCatalogException e) {
+            logger.error("Error occurred while initializing app catalog to fetch scratch location", e);
+            throw new RegistryException("Error occurred while initializing app catalog to fetch scratch location", e);
+        }
+    }
+
+    /**
+     * Picks the job submission interface to use for the process's compute resource.
+     *
+     * <p>When the gateway prefers a protocol, only interfaces of that protocol
+     * are candidates; when no protocol is preferred, every interface is a
+     * candidate. The candidate with the lowest priority order wins (ties keep
+     * catalog order). The list held by the resource description is not modified.
+     *
+     * @return the selected interface, never {@code null}
+     * @throws RegistryException if the resource defines no job submission
+     *         interface, none matches the preferred protocol, or the app
+     *         catalog lookup fails
+     */
+    public static JobSubmissionInterface getPreferredJobSubmissionInterface(OrchestratorContext context, ProcessModel processModel, String gatewayId) throws RegistryException {
+        try {
+            String resourceHostId = processModel.getComputeResourceId();
+            ComputeResourcePreference resourcePreference = getComputeResourcePreference(context, processModel, gatewayId);
+            JobSubmissionProtocol preferredJobSubmissionProtocol = resourcePreference.getPreferredJobSubmissionProtocol();
+            ComputeResourceDescription resourceDescription = context.getRegistry().getAppCatalog().getComputeResource().getComputeResource(resourceHostId);
+            List<JobSubmissionInterface> jobSubmissionInterfaces = resourceDescription.getJobSubmissionInterfaces();
+            if (jobSubmissionInterfaces == null || jobSubmissionInterfaces.isEmpty()) {
+                throw new RegistryException("Compute resource should have at least one job submission interface defined...");
+            }
+            // Collect candidates into a fresh list so sorting never touches the
+            // list being iterated (or the catalog's own list).
+            List<JobSubmissionInterface> candidates = new ArrayList<>();
+            for (JobSubmissionInterface submissionInterface : jobSubmissionInterfaces) {
+                if (preferredJobSubmissionProtocol == null
+                        || preferredJobSubmissionProtocol == submissionInterface.getJobSubmissionProtocol()) {
+                    candidates.add(submissionInterface);
+                }
+            }
+            if (candidates.isEmpty()) {
+                throw new RegistryException("Compute resource " + resourceHostId
+                        + " has no job submission interface for the preferred protocol " + preferredJobSubmissionProtocol);
+            }
+            Collections.sort(candidates, BY_PRIORITY_ORDER);
+            return candidates.get(0);
+        } catch (AppCatalogException e) {
+            throw new RegistryException("Error occurred while retrieving data from app catalog", e);
+        }
+    }
+
+    /**
+     * Returns the first data movement interface of the process's compute
+     * resource whose protocol matches the gateway's preferred data movement protocol.
+     *
+     * @return the matching interface, or {@code null} when no protocol is
+     *         preferred or no interface matches
+     * @throws RegistryException if the resource defines no data movement
+     *         interface or the app catalog lookup fails
+     */
+    public static DataMovementInterface getPrefferredDataMovementInterface(OrchestratorContext context, ProcessModel processModel, String gatewayId) throws RegistryException {
+        try {
+            String resourceHostId = processModel.getComputeResourceId();
+            ComputeResourcePreference resourcePreference = getComputeResourcePreference(context, processModel, gatewayId);
+            DataMovementProtocol preferredDataMovementProtocol = resourcePreference.getPreferredDataMovementProtocol();
+            ComputeResourceDescription resourceDescription = context.getRegistry().getAppCatalog().getComputeResource().getComputeResource(resourceHostId);
+            List<DataMovementInterface> dataMovementInterfaces = resourceDescription.getDataMovementInterfaces();
+            if (dataMovementInterfaces == null || dataMovementInterfaces.isEmpty()) {
+                throw new RegistryException("Compute resource should have at least one data movement interface defined...");
+            }
+            if (preferredDataMovementProtocol == null) {
+                return null;
+            }
+            String preferredName = preferredDataMovementProtocol.toString();
+            for (DataMovementInterface dataMovementInterface : dataMovementInterfaces) {
+                if (preferredName.equals(dataMovementInterface.getDataMovementProtocol().toString())) {
+                    return dataMovementInterface;
+                }
+            }
+            return null;
+        } catch (AppCatalogException e) {
+            throw new RegistryException("Error occurred while retrieving data from app catalog", e);
+        }
+    }
+
+    /**
+     * Returns the SSH port of the SCP data movement configured for the process.
+     *
+     * <p>Best effort: registry failures are logged, not rethrown.
+     *
+     * @return the SSH port, or {@code 0} when the preferred protocol is not
+     *         SCP, no matching interface or SCP entry exists, or a registry
+     *         error occurred
+     */
+    public static int getDataMovementPort(OrchestratorContext context, ProcessModel processModel, String gatewayId) throws RegistryException{
+        try {
+            DataMovementProtocol protocol = getPreferredDataMovementProtocol(context, processModel, gatewayId);
+            DataMovementInterface dataMovementInterface = getPrefferredDataMovementInterface(context, processModel, gatewayId);
+            // The interface lookup returns null when nothing matches the preferred protocol.
+            if (protocol == DataMovementProtocol.SCP && dataMovementInterface != null) {
+                SCPDataMovement scpDataMovement = getSCPDataMovement(context, dataMovementInterface.getDataMovementInterfaceId());
+                if (scpDataMovement != null) {
+                    return scpDataMovement.getSshPort();
+                }
+            }
+        } catch (RegistryException e) {
+            logger.error("Error occurred while retrieving data movement port", e);
+        }
+        return 0;
+    }
+
+    /**
+     * Returns the security protocol of the job submission selected for the
+     * process. Only the SSH, SSH_FORK and LOCAL submission protocols are handled.
+     *
+     * <p>Best effort: registry failures are logged, not rethrown.
+     *
+     * @return the security protocol, or {@code null} for any other protocol,
+     *         a missing submission entry, or a registry error
+     */
+    public static SecurityProtocol getSecurityProtocol(OrchestratorContext context, ProcessModel processModel, String gatewayId) throws RegistryException{
+        try {
+            JobSubmissionProtocol submissionProtocol = getPreferredJobSubmissionProtocol(context, processModel, gatewayId);
+            JobSubmissionInterface jobSubmissionInterface = getPreferredJobSubmissionInterface(context, processModel, gatewayId);
+            if (submissionProtocol == JobSubmissionProtocol.SSH || submissionProtocol == JobSubmissionProtocol.SSH_FORK) {
+                // SSH and SSH_FORK are both stored as SSH job submissions.
+                SSHJobSubmission sshJobSubmission = getSSHJobSubmission(context, jobSubmissionInterface.getJobSubmissionInterfaceId());
+                if (sshJobSubmission != null) {
+                    return sshJobSubmission.getSecurityProtocol();
+                }
+            } else if (submissionProtocol == JobSubmissionProtocol.LOCAL) {
+                LOCALSubmission localJobSubmission = getLocalJobSubmission(context, jobSubmissionInterface.getJobSubmissionInterfaceId());
+                if (localJobSubmission != null) {
+                    return localJobSubmission.getSecurityProtocol();
+                }
+            }
+        } catch (RegistryException e) {
+            logger.error("Error occurred while retrieving security protocol", e);
+        }
+        return null;
+    }
+
+    /**
+     * Fetches the local job submission with the given id from the app catalog.
+     *
+     * @throws RegistryException wrapping any failure of the lookup
+     */
+    public static LOCALSubmission getLocalJobSubmission(OrchestratorContext context, String submissionId) throws RegistryException {
+        try {
+            AppCatalog appCatalog = context.getRegistry().getAppCatalog();
+            return appCatalog.getComputeResource().getLocalJobSubmission(submissionId);
+        } catch (Exception e) {
+            String errorMsg = "Error while retrieving local job submission with submission id : " + submissionId;
+            logger.error(errorMsg, e);
+            throw new RegistryException(errorMsg, e);
+        }
+    }
+
+    /**
+     * Fetches the UNICORE job submission with the given id from the app catalog.
+     *
+     * @throws RegistryException wrapping any failure of the lookup
+     */
+    public static UnicoreJobSubmission getUnicoreJobSubmission(OrchestratorContext context, String submissionId) throws RegistryException {
+        try {
+            AppCatalog appCatalog = context.getRegistry().getAppCatalog();
+            return appCatalog.getComputeResource().getUNICOREJobSubmission(submissionId);
+        } catch (Exception e) {
+            String errorMsg = "Error while retrieving UNICORE job submission with submission id : " + submissionId;
+            logger.error(errorMsg, e);
+            throw new RegistryException(errorMsg, e);
+        }
+    }
+
+    /**
+     * Fetches the SSH job submission with the given id from the app catalog.
+     *
+     * @throws RegistryException wrapping any failure of the lookup
+     */
+    public static SSHJobSubmission getSSHJobSubmission(OrchestratorContext context, String submissionId) throws RegistryException {
+        try {
+            AppCatalog appCatalog = context.getRegistry().getAppCatalog();
+            return appCatalog.getComputeResource().getSSHJobSubmission(submissionId);
+        } catch (Exception e) {
+            String errorMsg = "Error while retrieving SSH job submission with submission id : " + submissionId;
+            logger.error(errorMsg, e);
+            throw new RegistryException(errorMsg, e);
+        }
+    }
+
+    /**
+     * Fetches the SCP data movement with the given id from the app catalog.
+     *
+     * @throws RegistryException wrapping any failure of the lookup
+     */
+    public static SCPDataMovement getSCPDataMovement(OrchestratorContext context, String dataMoveId) throws RegistryException {
+        try {
+            AppCatalog appCatalog = context.getRegistry().getAppCatalog();
+            return appCatalog.getComputeResource().getSCPDataMovement(dataMoveId);
+        } catch (Exception e) {
+            String errorMsg = "Error while retrieving SCP Data movement with submission id : " + dataMoveId;
+            logger.error(errorMsg, e);
+            throw new RegistryException(errorMsg, e);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/airavata-sandbox/blob/1932e865/gsoc2016/Design Changes in Airavata/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
----------------------------------------------------------------------
diff --git a/gsoc2016/Design Changes in Airavata/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java b/gsoc2016/Design Changes in Airavata/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
new file mode 100644
index 0000000..0319f27
--- /dev/null
+++ b/gsoc2016/Design Changes in Airavata/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
@@ -0,0 +1,625 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.orchestrator.cpi.impl;
+
+import org.apache.airavata.common.utils.AiravataUtils;
+import org.apache.airavata.common.utils.ThriftUtils;
+import org.apache.airavata.gfac.core.task.TaskException;
+import org.apache.airavata.model.appcatalog.appinterface.ApplicationInterfaceDescription;
+import org.apache.airavata.model.appcatalog.computeresource.*;
+import org.apache.airavata.model.appcatalog.gatewayprofile.ComputeResourcePreference;
+import org.apache.airavata.model.application.io.DataType;
+import org.apache.airavata.model.application.io.InputDataObjectType;
+import org.apache.airavata.model.application.io.OutputDataObjectType;
+import org.apache.airavata.model.commons.ErrorModel;
+import org.apache.airavata.model.data.movement.DataMovementProtocol;
+import org.apache.airavata.model.error.LaunchValidationException;
+import org.apache.airavata.model.error.ValidationResults;
+import org.apache.airavata.model.error.ValidatorResult;
+import org.apache.airavata.model.process.ProcessModel;
+import org.apache.airavata.model.experiment.*;
+import org.apache.airavata.model.scheduling.ComputationalResourceSchedulingModel;
+import org.apache.airavata.model.status.TaskState;
+import org.apache.airavata.model.status.TaskStatus;
+import org.apache.airavata.model.task.*;
+import org.apache.airavata.model.util.ExperimentModelUtil;
+import org.apache.airavata.orchestrator.core.exception.OrchestratorException;
+import org.apache.airavata.orchestrator.core.impl.GFACPassiveJobSubmitter;
+import org.apache.airavata.orchestrator.core.job.JobSubmitter;
+import org.apache.airavata.orchestrator.core.utils.OrchestratorUtils;
+import org.apache.airavata.orchestrator.core.validator.JobMetadataValidator;
+import org.apache.airavata.registry.core.experiment.catalog.impl.RegistryFactory;
+import org.apache.airavata.registry.cpi.*;
+import org.apache.airavata.registry.cpi.utils.Constants;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.*;
+import java.util.concurrent.ExecutorService;
+
+public class SimpleOrchestratorImpl extends AbstractOrchestrator{
+ // Logger shared by all instances of this class.
+ private final static Logger logger = LoggerFactory.getLogger(SimpleOrchestratorImpl.class);
+ // Only touched through getExecutor()/setExecutor() in this class; null until a caller supplies one.
+ private ExecutorService executor;
+
+ // Assigned a GFACPassiveJobSubmitter by the constructor and replaceable through setJobSubmitter().
+ // NOTE(review): an earlier remark said this is "null unless the thread count is 0"; the constructor in
+ // this class assigns it unconditionally, so that remark looks stale - confirm.
+ private JobSubmitter jobSubmitter = null;
+
+
+ public SimpleOrchestratorImpl() throws OrchestratorException {
+ try {
+ try {
+ // We are only going to use GFacPassiveJobSubmitter
+ jobSubmitter = new GFACPassiveJobSubmitter();
+ jobSubmitter.initialize(this.orchestratorContext);
+
+ } catch (Exception e) {
+ String error = "Error creating JobSubmitter in non threaded mode ";
+ logger.error(error);
+ throw new OrchestratorException(error, e);
+ }
+ } catch (OrchestratorException e) {
+ logger.error("Error Constructing the Orchestrator");
+ throw e;
+ }
+ }
+
+ public boolean launchProcess(ProcessModel processModel, String tokenId) throws OrchestratorException {
+ try {
+ return jobSubmitter.submit(processModel.getExperimentId(), processModel.getProcessId(), tokenId);
+ } catch (Exception e) {
+ throw new OrchestratorException("Error launching the job", e);
+ }
+ }
+
+ public ValidationResults validateExperiment(ExperimentModel experiment) throws OrchestratorException,LaunchValidationException {
+ org.apache.airavata.model.error.ValidationResults validationResults = new org.apache.airavata.model.error.ValidationResults();
+ validationResults.setValidationState(true); // initially making it to success, if atleast one failed them simply mark it failed.
+ String errorMsg = "Validation Errors : ";
+ if (this.orchestratorConfiguration.isEnableValidation()) {
+ List<String> validatorClasses = this.orchestratorContext.getOrchestratorConfiguration().getValidatorClasses();
+ for (String validator : validatorClasses) {
+ try {
+ Class<? extends JobMetadataValidator> vClass = Class.forName(validator.trim()).asSubclass(JobMetadataValidator.class);
+ JobMetadataValidator jobMetadataValidator = vClass.newInstance();
+ validationResults = jobMetadataValidator.validate(experiment, null);
+ if (validationResults.isValidationState()) {
+ logger.info("Validation of " + validator + " is SUCCESSFUL");
+ } else {
+ List<ValidatorResult> validationResultList = validationResults.getValidationResultList();
+ for (ValidatorResult result : validationResultList){
+ if (!result.isResult()){
+ String validationError = result.getErrorDetails();
+ if (validationError != null){
+ errorMsg += validationError + " ";
+ }
+ }
+ }
+ logger.error("Validation of " + validator + " for experiment Id " + experiment.getExperimentId() + " is FAILED:[error]. " + errorMsg);
+ validationResults.setValidationState(false);
+ try {
+ ErrorModel details = new ErrorModel();
+ details.setActualErrorMessage(errorMsg);
+ details.setCreationTime(Calendar.getInstance().getTimeInMillis());
+ orchestratorContext.getRegistry().getExperimentCatalog().add(ExpCatChildDataType.EXPERIMENT_ERROR, details,
+ experiment.getExperimentId());
+ } catch (RegistryException e) {
+ logger.error("Error while saving error details to registry", e);
+ }
+ break;
+ }
+ } catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) {
+ logger.error("Error loading the validation class: ", validator, e);
+ validationResults.setValidationState(false);
+ } /*catch (InstantiationException e) {
+ logger.error("Error loading the validation class: ", validator, e);
+ validationResults.setValidationState(false);
+ } catch (IllegalAccessException e) {
+ logger.error("Error loading the validation class: ", validator, e);
+ validationResults.setValidationState(false);
+ }*/
+ }
+ }
+ if(validationResults.isValidationState()){
+ return validationResults;
+ }else {
+ //atleast one validation has failed, so we throw an exception
+ LaunchValidationException launchValidationException = new LaunchValidationException();
+ launchValidationException.setValidationResult(validationResults);
+ launchValidationException.setErrorMessage("Validation failed refer the validationResults list for detail error. Validation errors : " + errorMsg);
+ throw launchValidationException;
+ }
+ }
+
+ public ValidationResults validateProcess(ExperimentModel experiment, ProcessModel processModel) throws OrchestratorException,LaunchValidationException {
+ org.apache.airavata.model.error.ValidationResults validationResults = new org.apache.airavata.model.error.ValidationResults();
+ validationResults.setValidationState(true); // initially making it to success, if atleast one failed them simply mark it failed.
+ String errorMsg = "Validation Errors : ";
+ if (this.orchestratorConfiguration.isEnableValidation()) {
+ List<String> validatorClzzez = this.orchestratorContext.getOrchestratorConfiguration().getValidatorClasses();
+ for (String validator : validatorClzzez) {
+ try {
+ Class<? extends JobMetadataValidator> vClass = Class.forName(validator.trim()).asSubclass(JobMetadataValidator.class);
+ JobMetadataValidator jobMetadataValidator = vClass.newInstance();
+ validationResults = jobMetadataValidator.validate(experiment, processModel);
+ if (validationResults.isValidationState()) {
+ logger.info("Validation of " + validator + " is SUCCESSFUL");
+ } else {
+ List<ValidatorResult> validationResultList = validationResults.getValidationResultList();
+ for (ValidatorResult result : validationResultList){
+ if (!result.isResult()){
+ String validationError = result.getErrorDetails();
+ if (validationError != null){
+ errorMsg += validationError + " ";
+ }
+ }
+ }
+ logger.error("Validation of " + validator + " for experiment Id " + experiment.getExperimentId() + " is FAILED:[error]. " + errorMsg);
+ validationResults.setValidationState(false);
+ try {
+ ErrorModel details = new ErrorModel();
+ details.setActualErrorMessage(errorMsg);
+ details.setCreationTime(Calendar.getInstance().getTimeInMillis());
+ orchestratorContext.getRegistry().getExperimentCatalog().add(ExpCatChildDataType.PROCESS_ERROR, details,
+ processModel.getProcessId());
+ } catch (RegistryException e) {
+ logger.error("Error while saving error details to registry", e);
+ }
+ break;
+ }
+ } catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) {
+ logger.error("Error loading the validation class: ", validator, e);
+ validationResults.setValidationState(false);
+ } /*catch (InstantiationException e) {
+ logger.error("Error loading the validation class: ", validator, e);
+ validationResults.setValidationState(false);
+ } catch (IllegalAccessException e) {
+ logger.error("Error loading the validation class: ", validator, e);
+ validationResults.setValidationState(false);
+ }*/
+ }
+ }
+ if(validationResults.isValidationState()){
+ return validationResults;
+ }else {
+ //atleast one validation has failed, so we throw an exception
+ LaunchValidationException launchValidationException = new LaunchValidationException();
+ launchValidationException.setValidationResult(validationResults);
+ launchValidationException.setErrorMessage("Validation failed refer the validationResults list for detail error. Validation errors : " + errorMsg);
+ throw launchValidationException;
+ }
+ }
+
+
+ /**
+ * Intended to cancel a running experiment; currently does nothing.
+ * The whole body is commented out, so none of the arguments are used and no exception is raised.
+ *
+ * @param experiment experiment to cancel (unused)
+ * @param processModel process to cancel (unused)
+ * @param tokenId token (unused)
+ * @throws OrchestratorException declared in the signature; never thrown by the current body
+ */
+ public void cancelExperiment(ExperimentModel experiment, ProcessModel processModel, String tokenId)
+ throws OrchestratorException {
+ // FIXME: cancellation is not implemented. The disabled code below refers to a "task" variable and
+ // JobDetails/JobState types that are not available in this method.
+// List<JobDetails> jobDetailsList = task.getJobDetailsList();
+// for(JobDetails jobDetails:jobDetailsList) {
+// JobState jobState = jobDetails.getJobStatus().getJobState();
+// if (jobState.getValue() > 4){
+// logger.error("Cannot cancel the job, because current job state is : " + jobState.toString() +
+// "jobId: " + jobDetails.getJobID() + " Job Name: " + jobDetails.getJobName());
+// return;
+// }
+// }
+// jobSubmitter.terminate(experiment.getExperimentID(),task.getTaskID(),tokenId);
+ }
+
+
+ /**
+ * @return the executor service held by this orchestrator; null unless one was set through setExecutor
+ */
+ public ExecutorService getExecutor() {
+ return executor;
+ }
+
+ /**
+ * Replaces the executor service held by this orchestrator. The previous one, if any, is not shut down here.
+ *
+ * @param executorIn executor service to store
+ */
+ public void setExecutor(ExecutorService executorIn) {
+ this.executor = executorIn;
+ }
+
+ /**
+ * @return the job submitter that launchProcess delegates to
+ */
+ public JobSubmitter getJobSubmitter() {
+ return jobSubmitter;
+ }
+
+ /**
+ * Replaces the job submitter that launchProcess delegates to. The new submitter is stored as given;
+ * this method does not call initialize on it.
+ *
+ * @param jobSubmitterIn job submitter to store
+ */
+ public void setJobSubmitter(JobSubmitter jobSubmitterIn) {
+ this.jobSubmitter = jobSubmitterIn;
+ }
+
+ /**
+ * Intentionally empty: this class sets up its job submitter in the constructor instead.
+ *
+ * @throws OrchestratorException declared in the signature; never thrown by the current body
+ */
+ public void initialize() throws OrchestratorException {
+
+ }
+
+ public List<ProcessModel> createProcesses (String experimentId, String gatewayId) throws OrchestratorException {
+ List<ProcessModel> processModels = new ArrayList<ProcessModel>();
+ try {
+ Registry registry = orchestratorContext.getRegistry();
+ ExperimentModel experimentModel = (ExperimentModel)registry.getExperimentCatalog().get(ExperimentCatalogModelType.EXPERIMENT, experimentId);
+ List<Object> processList = registry.getExperimentCatalog().get(ExperimentCatalogModelType.PROCESS, Constants.FieldConstants.ExperimentConstants.EXPERIMENT_ID, experimentId);
+ if (processList != null && !processList.isEmpty()) {
+ for (Object processObject : processList) {
+ ProcessModel processModel = (ProcessModel)processObject;
+ processModels.add(processModel);
+ }
+ }else {
+ ProcessModel processModel = ExperimentModelUtil.cloneProcessFromExperiment(experimentModel);
+ String processId = (String)registry.getExperimentCatalog().add(ExpCatChildDataType.PROCESS, processModel, experimentId);
+ processModel.setProcessId(processId);
+ processModels.add(processModel);
+ }
+ } catch (Exception e) {
+ throw new OrchestratorException("Error during creating process");
+ }
+ return processModels;
+ }
+
+ public String createAndSaveTasks(String gatewayId, ProcessModel processModel, boolean autoSchedule) throws OrchestratorException {
+ try {
+ ExperimentCatalog experimentCatalog = orchestratorContext.getRegistry().getExperimentCatalog();
+ AppCatalog appCatalog = orchestratorContext.getRegistry().getAppCatalog();
+ ComputationalResourceSchedulingModel resourceSchedule = processModel.getResourceSchedule();
+ String userGivenQueueName = resourceSchedule.getQueueName();
+ int userGivenWallTime = resourceSchedule.getWallTimeLimit();
+ String resourceHostId = resourceSchedule.getResourceHostId();
+ if (resourceHostId == null){
+ throw new OrchestratorException("Compute Resource Id cannot be null at this point");
+ }
+ ComputeResourceDescription computeResource = appCatalog.getComputeResource().getComputeResource(resourceHostId);
+ JobSubmissionInterface preferredJobSubmissionInterface = OrchestratorUtils.getPreferredJobSubmissionInterface(orchestratorContext, processModel, gatewayId);
+ ComputeResourcePreference resourcePreference = OrchestratorUtils.getComputeResourcePreference(orchestratorContext, processModel, gatewayId);
+ List<String> taskIdList = new ArrayList<>();
+
+ if (resourcePreference.getPreferredJobSubmissionProtocol() == JobSubmissionProtocol.UNICORE) {
+ // TODO - breakdown unicore all in one task to multiple tasks, then we don't need to handle UNICORE here.
+ taskIdList.addAll(createAndSaveSubmissionTasks(gatewayId, preferredJobSubmissionInterface, processModel, userGivenWallTime));
+ } else {
+ taskIdList.addAll(createAndSaveEnvSetupTask(gatewayId, processModel, experimentCatalog));
+ taskIdList.addAll(createAndSaveInputDataStagingTasks(processModel, gatewayId));
+ if (autoSchedule) {
+ List<BatchQueue> definedBatchQueues = computeResource.getBatchQueues();
+ for (BatchQueue batchQueue : definedBatchQueues) {
+ if (batchQueue.getQueueName().equals(userGivenQueueName)) {
+ int maxRunTime = batchQueue.getMaxRunTime();
+ if (maxRunTime < userGivenWallTime) {
+ resourceSchedule.setWallTimeLimit(maxRunTime);
+ // need to create more job submissions
+ int numOfMaxWallTimeJobs = ((int) Math.floor(userGivenWallTime / maxRunTime));
+ for (int i = 1; i <= numOfMaxWallTimeJobs; i++) {
+ taskIdList.addAll(createAndSaveSubmissionTasks(gatewayId,preferredJobSubmissionInterface, processModel, maxRunTime));
+ }
+ int leftWallTime = userGivenWallTime % maxRunTime;
+ if (leftWallTime != 0) {
+ taskIdList.addAll(createAndSaveSubmissionTasks(gatewayId,preferredJobSubmissionInterface, processModel, leftWallTime));
+ }
+ } else {
+ taskIdList.addAll(createAndSaveSubmissionTasks(gatewayId,preferredJobSubmissionInterface, processModel, userGivenWallTime));
+ }
+ }
+ }
+ } else {
+ taskIdList.addAll(createAndSaveSubmissionTasks(gatewayId,preferredJobSubmissionInterface, processModel, userGivenWallTime));
+ }
+ taskIdList.addAll(createAndSaveOutputDataStagingTasks(processModel, gatewayId));
+ }
+ // update process scheduling
+ experimentCatalog.update(ExperimentCatalogModelType.PROCESS, processModel, processModel.getProcessId());
+ return getTaskDag(taskIdList);
+ } catch (Exception e) {
+ throw new OrchestratorException("Error during creating process");
+ }
+ }
+
+    /**
+     * Joins task ids into the comma separated form used to describe the task sequence.
+     *
+     * @param taskIdList ids to join, in order
+     * @return the ids separated by single commas, or an empty string for an empty list
+     */
+    private String getTaskDag(List<String> taskIdList) {
+        Iterator<String> remaining = taskIdList.iterator();
+        if (!remaining.hasNext()) {
+            return "";
+        }
+        // Write the separator before every id except the first, so nothing has to be trimmed afterwards.
+        StringBuilder joined = new StringBuilder().append(remaining.next());
+        while (remaining.hasNext()) {
+            joined.append(",").append(remaining.next());
+        }
+        return joined.toString();
+    }
+
+    /**
+     * Creates and saves the environment setup task of a process. The task carries the security protocol
+     * and a working directory made of the scratch location followed by the process id.
+     *
+     * @param gatewayId         gateway the process runs under
+     * @param processModel      process the task belongs to
+     * @param experimentCatalog catalog the task is saved to
+     * @return a one-element list holding the id of the saved task
+     * @throws RegistryException if a registry lookup or the save fails
+     * @throws TException        if the sub task model cannot be serialized
+     */
+    private List<String> createAndSaveEnvSetupTask(String gatewayId,
+                                                   ProcessModel processModel,
+                                                   ExperimentCatalog experimentCatalog)
+            throws RegistryException, TException {
+        List<String> envTaskIds = new ArrayList<>();
+        TaskModel envSetupTask = new TaskModel();
+        envSetupTask.setTaskType(TaskTypes.ENV_SETUP);
+        // Stamp the status and last update time the same way the other task builders in this class do.
+        TaskStatus envSetupStatus = new TaskStatus(TaskState.CREATED);
+        envSetupStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+        envSetupTask.setTaskStatus(envSetupStatus);
+        envSetupTask.setCreationTime(AiravataUtils.getCurrentTimestamp().getTime());
+        envSetupTask.setLastUpdateTime(envSetupTask.getCreationTime());
+        envSetupTask.setParentProcessId(processModel.getProcessId());
+        EnvironmentSetupTaskModel envSetupSubModel = new EnvironmentSetupTaskModel();
+        envSetupSubModel.setProtocol(OrchestratorUtils.getSecurityProtocol(orchestratorContext, processModel, gatewayId));
+        // The compute resource preference was previously fetched here but never used; the lookup is dropped.
+        String scratchLocation = OrchestratorUtils.getScratchLocation(orchestratorContext,processModel, gatewayId);
+        String workingDir = scratchLocation + File.separator + processModel.getProcessId();
+        envSetupSubModel.setLocation(workingDir);
+        byte[] envSetupSub = ThriftUtils.serializeThriftObject(envSetupSubModel);
+        envSetupTask.setSubTaskModel(envSetupSub);
+        String envSetupTaskId = (String) experimentCatalog.add(ExpCatChildDataType.TASK, envSetupTask, processModel.getProcessId());
+        envSetupTask.setTaskId(envSetupTaskId);
+        envTaskIds.add(envSetupTaskId);
+        return envTaskIds;
+    }
+
+ public List<String> createAndSaveInputDataStagingTasks(ProcessModel processModel, String gatewayId) throws RegistryException {
+ List<String> dataStagingTaskIds = new ArrayList<>();
+ List<InputDataObjectType> processInputs = processModel.getProcessInputs();
+
+ sortByInputOrder(processInputs);
+ if (processInputs != null) {
+ for (InputDataObjectType processInput : processInputs) {
+ DataType type = processInput.getType();
+ switch (type) {
+ case STDERR:
+ break;
+ case STDOUT:
+ break;
+ case URI:
+ case URI_COLLECTION:
+ try {
+ TaskModel inputDataStagingTask = getInputDataStagingTask(processModel, processInput, gatewayId);
+ String taskId = (String) orchestratorContext.getRegistry().getExperimentCatalog().add(ExpCatChildDataType.TASK, inputDataStagingTask,
+ processModel.getProcessId());
+ inputDataStagingTask.setTaskId(taskId);
+ dataStagingTaskIds.add(inputDataStagingTask.getTaskId());
+ } catch (TException | AppCatalogException | TaskException e) {
+ throw new RegistryException("Error while serializing data staging sub task model");
+ }
+ break;
+ default:
+ // nothing to do
+ break;
+ }
+ }
+ }
+ return dataStagingTaskIds;
+ }
+
+ public List<String> createAndSaveOutputDataStagingTasks(ProcessModel processModel, String gatewayId) throws RegistryException {
+ List<String> dataStagingTaskIds = new ArrayList<>();
+ List<OutputDataObjectType> processOutputs = processModel.getProcessOutputs();
+ String appName = OrchestratorUtils.getApplicationInterfaceName(orchestratorContext, processModel);
+ if (processOutputs != null) {
+ for (OutputDataObjectType processOutput : processOutputs) {
+ DataType type = processOutput.getType();
+ switch (type) {
+ case STDOUT :
+ processOutput.setValue(appName + ".stdout");
+ createOutputDataSatagingTasks(processModel, gatewayId, dataStagingTaskIds, processOutput);
+ break;
+ case STDERR:
+ processOutput.setValue(appName + ".stderr");
+ createOutputDataSatagingTasks(processModel, gatewayId, dataStagingTaskIds, processOutput);
+ break;
+ case URI:
+ createOutputDataSatagingTasks(processModel, gatewayId, dataStagingTaskIds, processOutput);
+ break;
+ default:
+ // nothing to do
+ break;
+ }
+ }
+ }
+
+ try {
+ if (isArchive(processModel, gatewayId)) {
+ createArchiveDataStatgingTask(processModel, gatewayId, dataStagingTaskIds);
+ }
+ } catch (AppCatalogException e) {
+ throw new RegistryException("Error! Application interface retrieval failed");
+ }
+ return dataStagingTaskIds;
+ }
+
+ private boolean isArchive(ProcessModel processModel, String gatewayId) throws AppCatalogException {
+ AppCatalog appCatalog = RegistryFactory.getAppCatalog();
+ ApplicationInterfaceDescription appInterface = appCatalog.getApplicationInterface().getApplicationInterface(processModel.getApplicationInterfaceId());
+ return appInterface.isArchiveWorkingDirectory();
+ }
+
+    /**
+     * Creates and saves the archive data staging task of a process and appends its id to the given list.
+     * The task is built by getOutputDataStagingTask with no process output, which selects its archive form.
+     *
+     * @param processModel       process the task belongs to
+     * @param gatewayId          gateway the process runs under
+     * @param dataStagingTaskIds list the new task id is appended to
+     * @throws RegistryException if the task cannot be built or saved
+     */
+    private void createArchiveDataStatgingTask(ProcessModel processModel, String gatewayId, List<String> dataStagingTaskIds) throws RegistryException {
+        TaskModel archiveTask;
+        try {
+            archiveTask = getOutputDataStagingTask(processModel, null, gatewayId);
+        } catch (TException e) {
+            // Keep the cause, as createOutputDataSatagingTasks does for the same failure.
+            throw new RegistryException("Error! DataStaging sub task serialization failed", e);
+        }
+        String taskId = (String) orchestratorContext.getRegistry().getExperimentCatalog().add(ExpCatChildDataType.TASK, archiveTask,
+                processModel.getProcessId());
+        archiveTask.setTaskId(taskId);
+        dataStagingTaskIds.add(archiveTask.getTaskId());
+    }
+
+ private void createOutputDataSatagingTasks(ProcessModel processModel, String gatewayId, List<String> dataStagingTaskIds, OutputDataObjectType processOutput) throws RegistryException {
+ try {
+ TaskModel outputDataStagingTask = getOutputDataStagingTask(processModel, processOutput, gatewayId);
+ String taskId = (String) orchestratorContext.getRegistry().getExperimentCatalog().add(ExpCatChildDataType.TASK, outputDataStagingTask,
+ processModel.getProcessId());
+ outputDataStagingTask.setTaskId(taskId);
+ dataStagingTaskIds.add(outputDataStagingTask.getTaskId());
+ } catch (TException e) {
+ throw new RegistryException("Error while serializing data staging sub task model", e);
+ }
+ }
+
+ private List<String> createAndSaveSubmissionTasks(String gatewayId, JobSubmissionInterface jobSubmissionInterface, ProcessModel processModel, int wallTime)
+ throws TException, RegistryException, OrchestratorException {
+
+ JobSubmissionProtocol jobSubmissionProtocol = jobSubmissionInterface.getJobSubmissionProtocol();
+ MonitorMode monitorMode = null;
+ if (jobSubmissionProtocol == JobSubmissionProtocol.SSH || jobSubmissionProtocol == JobSubmissionProtocol.SSH_FORK) {
+ SSHJobSubmission sshJobSubmission = OrchestratorUtils.getSSHJobSubmission(orchestratorContext, jobSubmissionInterface.getJobSubmissionInterfaceId());
+ monitorMode = sshJobSubmission.getMonitorMode();
+ } else if (jobSubmissionProtocol == JobSubmissionProtocol.UNICORE) {
+ monitorMode = MonitorMode.FORK;
+ } else {
+ logger.error("expId : {}, processId : {} :- Unsupported Job submission protocol {}.",
+ processModel.getExperimentId(), processModel.getProcessId(), jobSubmissionProtocol.name());
+ throw new OrchestratorException("Unsupported Job Submission Protocol " + jobSubmissionProtocol.name());
+ }
+ List<String> submissionTaskIds = new ArrayList<>();
+ TaskModel taskModel = new TaskModel();
+ taskModel.setParentProcessId(processModel.getProcessId());
+ taskModel.setCreationTime(new Date().getTime());
+ taskModel.setLastUpdateTime(taskModel.getCreationTime());
+ TaskStatus taskStatus = new TaskStatus(TaskState.CREATED);
+ taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+ taskModel.setTaskStatus(taskStatus);
+ taskModel.setTaskType(TaskTypes.JOB_SUBMISSION);
+ JobSubmissionTaskModel submissionSubTask = new JobSubmissionTaskModel();
+ submissionSubTask.setMonitorMode(monitorMode);
+ submissionSubTask.setJobSubmissionProtocol(jobSubmissionProtocol);
+ submissionSubTask.setWallTime(wallTime);
+ byte[] bytes = ThriftUtils.serializeThriftObject(submissionSubTask);
+ taskModel.setSubTaskModel(bytes);
+ String taskId = (String) orchestratorContext.getRegistry().getExperimentCatalog().add(ExpCatChildDataType.TASK, taskModel,
+ processModel.getProcessId());
+ taskModel.setTaskId(taskId);
+ submissionTaskIds.add(taskModel.getTaskId());
+
+ // create monitor task for this Email based monitor mode job
+ if (monitorMode == MonitorMode.JOB_EMAIL_NOTIFICATION_MONITOR) {
+ TaskModel monitorTaskModel = new TaskModel();
+ monitorTaskModel.setParentProcessId(processModel.getProcessId());
+ monitorTaskModel.setCreationTime(new Date().getTime());
+ monitorTaskModel.setLastUpdateTime(monitorTaskModel.getCreationTime());
+ TaskStatus monitorTaskStatus = new TaskStatus(TaskState.CREATED);
+ monitorTaskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+ monitorTaskModel.setTaskStatus(monitorTaskStatus);
+ monitorTaskModel.setTaskType(TaskTypes.MONITORING);
+ MonitorTaskModel monitorSubTaskModel = new MonitorTaskModel();
+ monitorSubTaskModel.setMonitorMode(monitorMode);
+ monitorTaskModel.setSubTaskModel(ThriftUtils.serializeThriftObject(monitorSubTaskModel));
+ String mTaskId = (String) orchestratorContext.getRegistry().getExperimentCatalog().add(ExpCatChildDataType.TASK, monitorTaskModel, processModel.getProcessId());
+ monitorTaskModel.setTaskId(mTaskId);
+ submissionTaskIds.add(monitorTaskModel.getTaskId());
+ }
+
+ return submissionTaskIds;
+ }
+
+    /**
+     * Sorts the process inputs in place by ascending input order. A null list is left alone.
+     *
+     * @param processInputs inputs to sort; may be null
+     */
+    private void sortByInputOrder(List<InputDataObjectType> processInputs) {
+        if (processInputs == null) {
+            return;
+        }
+        Collections.sort(processInputs, new Comparator<InputDataObjectType>() {
+            @Override
+            public int compare(InputDataObjectType inputDT_1, InputDataObjectType inputDT_2) {
+                // Integer.compare cannot overflow, unlike subtracting the two orders.
+                return Integer.compare(inputDT_1.getInputOrder(), inputDT_2.getInputOrder());
+            }
+        });
+    }
+
+ private TaskModel getInputDataStagingTask(ProcessModel processModel, InputDataObjectType processInput, String gatewayId) throws RegistryException, TException, AppCatalogException, TaskException {
+ // create new task model for this task
+ TaskModel taskModel = new TaskModel();
+ taskModel.setParentProcessId(processModel.getProcessId());
+ taskModel.setCreationTime(AiravataUtils.getCurrentTimestamp().getTime());
+ taskModel.setLastUpdateTime(taskModel.getCreationTime());
+ TaskStatus taskStatus = new TaskStatus(TaskState.CREATED);
+ taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+ taskModel.setTaskStatus(taskStatus);
+ taskModel.setTaskType(TaskTypes.DATA_STAGING);
+ // create data staging sub task model
+ DataStagingTaskModel submodel = new DataStagingTaskModel();
+ ComputeResourcePreference computeResourcePreference = OrchestratorUtils.getComputeResourcePreference(orchestratorContext, processModel, gatewayId);
+ ComputeResourceDescription computeResource = orchestratorContext.getRegistry().getAppCatalog().getComputeResource().getComputeResource(processModel.getComputeResourceId());
+ String remoteOutputDir = OrchestratorUtils.getScratchLocation(orchestratorContext,processModel, gatewayId) + File.separator + processModel.getProcessId();
+ remoteOutputDir = remoteOutputDir.endsWith("/") ? remoteOutputDir : remoteOutputDir + "/";
+ URI destination = null;
+ try {
+ DataMovementProtocol dataMovementProtocol = OrchestratorUtils.getPreferredDataMovementProtocol(orchestratorContext, processModel, gatewayId);
+ String loginUserName = OrchestratorUtils.getLoginUserName(orchestratorContext, processModel, gatewayId);
+ destination = new URI(dataMovementProtocol.name(),
+ loginUserName,
+ computeResource.getHostName(),
+ OrchestratorUtils.getDataMovementPort(orchestratorContext, processModel, gatewayId),
+ remoteOutputDir , null, null);
+ } catch (URISyntaxException e) {
+ throw new TaskException("Error while constructing destination file URI");
+ }
+ submodel.setType(DataStageType.INPUT);
+ submodel.setSource(processInput.getValue());
+ submodel.setProcessInput(processInput);
+ submodel.setDestination(destination.toString());
+ taskModel.setSubTaskModel(ThriftUtils.serializeThriftObject(submodel));
+ return taskModel;
+ }
+
+ private TaskModel getOutputDataStagingTask(ProcessModel processModel, OutputDataObjectType processOutput, String gatewayId) throws RegistryException, TException {
+ try {
+
+ // create new task model for this task
+ TaskModel taskModel = new TaskModel();
+ taskModel.setParentProcessId(processModel.getProcessId());
+ taskModel.setCreationTime(AiravataUtils.getCurrentTimestamp().getTime());
+ taskModel.setLastUpdateTime(taskModel.getCreationTime());
+ TaskStatus taskStatus = new TaskStatus(TaskState.CREATED);
+ taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+ taskModel.setTaskStatus(taskStatus);
+ taskModel.setTaskType(TaskTypes.DATA_STAGING);
+ ComputeResourcePreference computeResourcePreference = OrchestratorUtils.getComputeResourcePreference(orchestratorContext, processModel, gatewayId);
+ ComputeResourceDescription computeResource = orchestratorContext.getRegistry().getAppCatalog().getComputeResource().getComputeResource(processModel.getComputeResourceId());
+
+ String remoteOutputDir = OrchestratorUtils.getScratchLocation(orchestratorContext,processModel, gatewayId) + File.separator + processModel.getProcessId();
+ remoteOutputDir = remoteOutputDir.endsWith("/") ? remoteOutputDir : remoteOutputDir + "/";
+ DataStagingTaskModel submodel = new DataStagingTaskModel();
+ DataMovementProtocol dataMovementProtocol = OrchestratorUtils.getPreferredDataMovementProtocol(orchestratorContext, processModel, gatewayId);
+ URI source = null;
+ try {
+ String loginUserName = OrchestratorUtils.getLoginUserName(orchestratorContext, processModel, gatewayId);
+ if (processOutput != null) {
+ submodel.setType(DataStageType.OUPUT);
+ submodel.setProcessOutput(processOutput);
+ source = new URI(dataMovementProtocol.name(),
+ loginUserName,
+ computeResource.getHostName(),
+ OrchestratorUtils.getDataMovementPort(orchestratorContext, processModel, gatewayId),
+ remoteOutputDir + processOutput.getValue(), null, null);
+ } else {
+ // archive
+ submodel.setType(DataStageType.ARCHIVE_OUTPUT);
+ source = new URI(dataMovementProtocol.name(),
+ loginUserName,
+ computeResource.getHostName(),
+ OrchestratorUtils.getDataMovementPort(orchestratorContext, processModel, gatewayId),
+ remoteOutputDir, null, null);
+ }
+ } catch (URISyntaxException e) {
+ throw new TaskException("Error while constructing source file URI");
+ }
+ // We don't know destination location at this time, data staging task will set this.
+ // because destination is required field we set dummy destination
+ submodel.setSource(source.toString());
+ // We don't know destination location at this time, data staging task will set this.
+ // because destination is required field we set dummy destination
+ submodel.setDestination("dummy://temp/file/location");
+ taskModel.setSubTaskModel(ThriftUtils.serializeThriftObject(submodel));
+ return taskModel;
+ } catch (AppCatalogException | TaskException e) {
+ throw new RegistryException("Error occurred while retrieving data movement from app catalog", e);
+ }
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/airavata-sandbox/blob/1932e865/gsoc2016/Design Changes in Airavata/orchestrator/pom.xml
----------------------------------------------------------------------
diff --git a/gsoc2016/Design Changes in Airavata/orchestrator/pom.xml b/gsoc2016/Design Changes in Airavata/orchestrator/pom.xml
new file mode 100644
index 0000000..0761b04
--- /dev/null
+++ b/gsoc2016/Design Changes in Airavata/orchestrator/pom.xml
@@ -0,0 +1,45 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information regarding copyright ownership. The ASF licenses this file to you under
+ the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may
+ obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to
+ in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
+ ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under
+ the License. -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+ <parent>
+ <groupId>org.apache.airavata</groupId>
+ <artifactId>airavata</artifactId>
+ <version>0.17-SNAPSHOT</version>
+ <relativePath>../../pom.xml</relativePath>
+ </parent>
+
+ <modelVersion>4.0.0</modelVersion>
+ <artifactId>orchestrator</artifactId>
+ <packaging>pom</packaging>
+ <name>Airavata Orchestrator</name>
+ <url>http://airavata.apache.org/</url>
+
+ <profiles>
+ <profile>
+ <id>default</id>
+ <activation>
+ <activeByDefault>true</activeByDefault>
+ </activation>
+ <modules>
+ <module>orchestrator-core</module>
+ <module>orchestrator-service</module>
+ <module>orchestrator-client</module>
+ </modules>
+ </profile>
+ </profiles>
+
+ <properties>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
+ </properties>
+
+</project>