You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sm...@apache.org on 2016/10/28 14:51:06 UTC

[2/3] airavata-sandbox git commit: Added files with the design changes

http://git-wip-us.apache.org/repos/asf/airavata-sandbox/blob/1932e865/gsoc2016/Design Changes in Airavata/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/OrchestratorConfiguration.java
----------------------------------------------------------------------
diff --git a/gsoc2016/Design Changes in Airavata/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/OrchestratorConfiguration.java b/gsoc2016/Design Changes in Airavata/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/OrchestratorConfiguration.java
new file mode 100644
index 0000000..da357a4
--- /dev/null
+++ b/gsoc2016/Design Changes in Airavata/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/OrchestratorConfiguration.java	
@@ -0,0 +1,122 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.orchestrator.core;
+
+import java.net.URL;
+import java.util.List;
+
+/**
+ * This keeps configuration of orchestrator, mostly this keep static
+ * configuration, this can be accessed through orchestratorContext object
+ */
+public class OrchestratorConfiguration {
+
+    private String newJobSubmitterClass;
+
+    private String hangedJobSubmitterClass;
+
+    private int submitterInterval = 1000;
+
+    private int threadPoolSize = 10;
+
+    private boolean startSubmitter = false;
+
+    private URL brokerURL;
+
+    private boolean embeddedMode;
+
+    private List<String> validatorClasses;
+
+    private boolean enableValidation;
+
+
+    public List<String> getValidatorClasses() {
+        return validatorClasses;
+    }
+
+    public void setValidatorClasses(List<String> validatorClassesIn) {
+        this.validatorClasses = validatorClassesIn;
+    }
+
+    public boolean isEmbeddedMode() {
+        return embeddedMode;
+    }
+
+    public void setEmbeddedMode(boolean embeddedModeIn) {
+        this.embeddedMode = embeddedModeIn;
+    }
+
+    public URL getBrokerURL() {
+        return brokerURL;
+    }
+
+    public void setBrokerURL(URL brokerURLIn) {
+        this.brokerURL = brokerURLIn;
+    }
+
+    public String getNewJobSubmitterClass() {
+        return newJobSubmitterClass;
+    }
+
+    public int getSubmitterInterval() {
+        return submitterInterval;
+    }
+
+    public int getThreadPoolSize() {
+        return threadPoolSize;
+    }
+
+    public void setNewJobSubmitterClass(String newJobSubmitterClassIn) {
+        this.newJobSubmitterClass = newJobSubmitterClassIn;
+    }
+
+    public void setSubmitterInterval(int submitterIntervalIn) {
+        this.submitterInterval = submitterIntervalIn;
+    }
+
+    public void setThreadPoolSize(int threadPoolSizeIn) {
+        this.threadPoolSize = threadPoolSizeIn;
+    }
+
+    public boolean isStartSubmitter() {
+        return startSubmitter;
+    }
+
+    public void setStartSubmitter(boolean startSubmitterIn) {
+        this.startSubmitter = startSubmitterIn;
+    }
+
+    public String getHangedJobSubmitterClass() {
+        return hangedJobSubmitterClass;
+    }
+
+    public void setHangedJobSubmitterClass(String hangedJobSubmitterClassIn) {
+        this.hangedJobSubmitterClass = hangedJobSubmitterClassIn;
+    }
+
+    public boolean isEnableValidation() {
+        return enableValidation;
+    }
+
+    public void setEnableValidation(boolean enableValidationIn) {
+        this.enableValidation = enableValidationIn;
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata-sandbox/blob/1932e865/gsoc2016/Design Changes in Airavata/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/utils/OrchestratorConstants.java
----------------------------------------------------------------------
diff --git a/gsoc2016/Design Changes in Airavata/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/utils/OrchestratorConstants.java b/gsoc2016/Design Changes in Airavata/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/utils/OrchestratorConstants.java
new file mode 100644
index 0000000..6a0f04b
--- /dev/null
+++ b/gsoc2016/Design Changes in Airavata/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/utils/OrchestratorConstants.java	
@@ -0,0 +1,77 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.orchestrator.core.utils;
+
+/**
+ * This class contains all the constants in orchestrator-core
+ *
+ */
+/*public class OrchestratorConstants {
+    public static final String AIRAVATA_PROPERTIES = "airavata-server.properties";
+    public static final int hotUpdateInterval=1000;
+    public static final String SUBMIT_INTERVAL = "submitter.interval";
+    public static final String THREAD_POOL_SIZE = "threadpool.size";
+    public static final String START_SUBMITTER = "start.submitter";
+    public static final String EMBEDDED_MODE = "embedded.mode";
+    public static final String ENABLE_VALIDATION = "enable.validation";
+    public static final String JOB_VALIDATOR = "job.validators";
+}*/
+
+
+/**
+ * This enum contains all the constants in orchestrator-core
+   enum is the way about dealing with constants as its very powerful.
+   Hence, a design change has been made to change the class to enum.
+ *
+ */
+public enum OrchestratorConstants {
+    AIRAVATA_PROPERTIES("airavata-server.properties"),
+    hotUpdateInterval(1000),
+    SUBMIT_INTERVAL("submitter.interval"),
+    THREAD_POOL_SIZE("threadpool.size"),
+    START_SUBMITTER("start.submitter"),
+    EMBEDDED_MODE("embedded.mode"),
+    ENABLE_VALIDATION("enable.validation"),
+    JOB_VALIDATOR("job.validators");
+
+
+    private String stringConstant;
+    private int integerConstant;
+
+    OrchestratorConstants(String stringConstantIn)
+    {
+      stringConstant = stringConstantIn;
+    }
+    OrchestratorConstants(int integerConstantIn)
+    {
+      integerConstant = integerConstantIn;
+    }
+
+    public String getOrchestratorStringConstant()
+    {
+      return stringConstant;
+    }
+    public int getOrchestratorIntegerConstant()
+    {
+      return integerConstant;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/airavata-sandbox/blob/1932e865/gsoc2016/Design Changes in Airavata/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/utils/OrchestratorUtils.java
----------------------------------------------------------------------
diff --git a/gsoc2016/Design Changes in Airavata/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/utils/OrchestratorUtils.java b/gsoc2016/Design Changes in Airavata/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/utils/OrchestratorUtils.java
new file mode 100644
index 0000000..3ee40e0
--- /dev/null
+++ b/gsoc2016/Design Changes in Airavata/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/utils/OrchestratorUtils.java	
@@ -0,0 +1,327 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.orchestrator.core.utils;
+
+import java.io.IOException;
+import java.util.*;
+
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.airavata.model.appcatalog.appinterface.ApplicationInterfaceDescription;
+import org.apache.airavata.model.appcatalog.computeresource.*;
+import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionInterface;
+import org.apache.airavata.model.appcatalog.computeresource.UnicoreJobSubmission;
+import org.apache.airavata.model.appcatalog.gatewayprofile.ComputeResourcePreference;
+import org.apache.airavata.model.appcatalog.gatewayprofile.StoragePreference;
+import org.apache.airavata.model.data.movement.DataMovementInterface;
+import org.apache.airavata.model.data.movement.DataMovementProtocol;
+import org.apache.airavata.model.data.movement.SCPDataMovement;
+import org.apache.airavata.model.data.movement.SecurityProtocol;
+import org.apache.airavata.model.process.ProcessModel;
+import org.apache.airavata.orchestrator.core.OrchestratorConfiguration;
+import org.apache.airavata.orchestrator.core.context.OrchestratorContext;
+import org.apache.airavata.orchestrator.core.exception.OrchestratorException;
+import org.apache.airavata.registry.cpi.*;
+import org.apache.airavata.registry.cpi.ApplicationInterface;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This contains orchestrator specific utilities
+ */
+public class OrchestratorUtils {
+
+
+    private final static Logger logger = LoggerFactory.getLogger(OrchestratorUtils.class);
+
+    public static String OrchestratorStringConstant(OrchestratorConstants constant)
+    {
+      return constant.getOrchestratorStringConstant();
+    }
+
+    public static int OrchestratorIntegerConstant(OrchestratorConstants constant)
+    {
+      return constant.getOrchestratorIntegerConstant();
+    }
+
+    public static OrchestratorConfiguration loadOrchestratorConfiguration() throws OrchestratorException, IOException, NumberFormatException, ApplicationSettingsException {
+        OrchestratorConfiguration orchestratorConfiguration = new OrchestratorConfiguration();
+        orchestratorConfiguration.setSubmitterInterval(Integer.parseInt((String) ServerSettings.getSetting(OrchestratorStringConstant(OrchestratorConstants.SUBMIT_INTERVAL))));
+        orchestratorConfiguration.setThreadPoolSize(Integer.parseInt((String) ServerSettings.getSetting(OrchestratorStringConstant(OrchestratorConstants.THREAD_POOL_SIZE))));
+        orchestratorConfiguration.setStartSubmitter(Boolean.valueOf(ServerSettings.getSetting(OrchestratorStringConstant(OrchestratorConstants.START_SUBMITTER))));
+        orchestratorConfiguration.setEmbeddedMode(Boolean.valueOf(ServerSettings.getSetting(OrchestratorStringConstant(OrchestratorConstants.EMBEDDED_MODE))));
+        orchestratorConfiguration.setEnableValidation(Boolean.valueOf(ServerSettings.getSetting(OrchestratorStringConstant(OrchestratorConstants.ENABLE_VALIDATION))));
+        if (orchestratorConfiguration.isEnableValidation()) {
+            orchestratorConfiguration.setValidatorClasses(Arrays.asList(ServerSettings.getSetting(OrchestratorStringConstant(OrchestratorConstants.JOB_VALIDATOR)).split(",")));
+        }
+        return orchestratorConfiguration;
+    }
+
+    public static JobSubmissionProtocol getPreferredJobSubmissionProtocol(OrchestratorContext context, ProcessModel model, String gatewayId) throws RegistryException {
+        try {
+            GwyResourceProfile gatewayProfile = context.getRegistry().getAppCatalog().getGatewayProfile();
+            String resourceHostId = model.getComputeResourceId();
+            ComputeResourcePreference preference = gatewayProfile.getComputeResourcePreference(gatewayId
+                    , resourceHostId);
+            return preference.getPreferredJobSubmissionProtocol();
+        } catch (AppCatalogException e) {
+            logger.error("Error occurred while initializing app catalog", e);
+            throw new RegistryException("Error occurred while initializing app catalog", e);
+        }
+    }
+
+    public static String getApplicationInterfaceName(OrchestratorContext context, ProcessModel model) throws RegistryException {
+        try {
+            ApplicationInterface applicationInterface = context.getRegistry().getAppCatalog().getApplicationInterface();
+            ApplicationInterfaceDescription appInterface = applicationInterface.getApplicationInterface(model.getApplicationInterfaceId());
+            return appInterface.getApplicationName();
+        } catch (AppCatalogException e) {
+            throw new RegistryException("Error while retrieving application interface", e);
+        }
+    }
+
+    public static DataMovementProtocol getPreferredDataMovementProtocol(OrchestratorContext context, ProcessModel model, String gatewayId) throws RegistryException {
+        try {
+            GwyResourceProfile gatewayProfile = context.getRegistry().getAppCatalog().getGatewayProfile();
+            String resourceHostId = model.getComputeResourceId();
+            ComputeResourcePreference preference = gatewayProfile.getComputeResourcePreference(gatewayId
+                    , resourceHostId);
+            return preference.getPreferredDataMovementProtocol();
+        } catch (AppCatalogException e) {
+            logger.error("Error occurred while initializing app catalog", e);
+            throw new RegistryException("Error occurred while initializing app catalog", e);
+        }
+    }
+
+    public static ComputeResourcePreference getComputeResourcePreference(OrchestratorContext context, ProcessModel processModel, String gatewayId) throws RegistryException {
+        try {
+            GwyResourceProfile gatewayProfile = context.getRegistry().getAppCatalog().getGatewayProfile();
+            String resourceHostId = processModel.getComputeResourceId();
+            return gatewayProfile.getComputeResourcePreference(gatewayId, resourceHostId);
+        } catch (AppCatalogException e) {
+            logger.error("Error occurred while initializing app catalog", e);
+            throw new RegistryException("Error occurred while initializing app catalog", e);
+        }
+    }
+
+    public static StoragePreference getStoragePreference(OrchestratorContext context, ProcessModel processModel, String gatewayId) throws RegistryException {
+        try {
+            GwyResourceProfile gatewayProfile = context.getRegistry().getAppCatalog().getGatewayProfile();
+            String resourceHostId = processModel.getComputeResourceId();
+            return gatewayProfile.getStoragePreference(gatewayId, resourceHostId);
+        } catch (AppCatalogException e) {
+            logger.error("Error occurred while initializing app catalog", e);
+            throw new RegistryException("Error occurred while initializing app catalog", e);
+        }
+    }
+
+    public static String getLoginUserName(OrchestratorContext context, ProcessModel processModel, String gatewayId) throws RegistryException {
+        try {
+            String loginUserName = null;
+            String overrideLoginUserName = processModel.getResourceSchedule().getOverrideLoginUserName();
+            if (overrideLoginUserName != null && !overrideLoginUserName.equals("")) {
+                loginUserName = overrideLoginUserName;
+            } else {
+                GwyResourceProfile gatewayProfile = context.getRegistry().getAppCatalog().getGatewayProfile();
+                loginUserName = gatewayProfile.getComputeResourcePreference(gatewayId, processModel.getComputeResourceId()).getLoginUserName();
+            }
+            return loginUserName;
+        } catch (AppCatalogException e) {
+            logger.error("Error occurred while initializing app catalog to fetch login username", e);
+            throw new RegistryException("Error occurred while initializing app catalog to fetch login username", e);
+        }
+    }
+
+    public static String getScratchLocation(OrchestratorContext context, ProcessModel processModel, String gatewayId) throws RegistryException {
+        try {
+            String scratchLocation = null;
+            String overrideScratchLocation = processModel.getResourceSchedule().getOverrideScratchLocation();
+            if (overrideScratchLocation != null && !overrideScratchLocation.equals("")) {
+                scratchLocation = overrideScratchLocation;
+            } else {
+                GwyResourceProfile gatewayProfile = context.getRegistry().getAppCatalog().getGatewayProfile();
+                scratchLocation = gatewayProfile.getComputeResourcePreference(gatewayId, processModel.getComputeResourceId()).getScratchLocation();
+            }
+            return scratchLocation;
+        } catch (AppCatalogException e) {
+            logger.error("Error occurred while initializing app catalog to fetch scratch location", e);
+            throw new RegistryException("Error occurred while initializing app catalog to fetch scratch location", e);
+        }
+    }
+
+    public static JobSubmissionInterface getPreferredJobSubmissionInterface(OrchestratorContext context, ProcessModel processModel, String gatewayId) throws RegistryException {
+        try {
+            String resourceHostId = processModel.getComputeResourceId();
+            ComputeResourcePreference resourcePreference = getComputeResourcePreference(context, processModel, gatewayId);
+            JobSubmissionProtocol preferredJobSubmissionProtocol = resourcePreference.getPreferredJobSubmissionProtocol();
+            ComputeResourceDescription resourceDescription = context.getRegistry().getAppCatalog().getComputeResource().getComputeResource(resourceHostId);
+            List<JobSubmissionInterface> jobSubmissionInterfaces = resourceDescription.getJobSubmissionInterfaces();
+            Map<JobSubmissionProtocol, List<JobSubmissionInterface>> orderedInterfaces = new HashMap<>();
+            List<JobSubmissionInterface> interfaces = new ArrayList<>();
+            if (jobSubmissionInterfaces != null && !jobSubmissionInterfaces.isEmpty()) {
+                for (JobSubmissionInterface submissionInterface : jobSubmissionInterfaces){
+
+                    if (preferredJobSubmissionProtocol != null){
+                        if (preferredJobSubmissionProtocol.toString().equals(submissionInterface.getJobSubmissionProtocol().toString())){
+                            if (orderedInterfaces.containsKey(submissionInterface.getJobSubmissionProtocol())){
+                                List<JobSubmissionInterface> interfaceList = orderedInterfaces.get(submissionInterface.getJobSubmissionProtocol());
+                                interfaceList.add(submissionInterface);
+                            }else {
+                                interfaces.add(submissionInterface);
+                                orderedInterfaces.put(submissionInterface.getJobSubmissionProtocol(), interfaces);
+                            }
+                        }
+                    }else {
+                        Collections.sort(jobSubmissionInterfaces, new Comparator<JobSubmissionInterface>() {
+                            @Override
+                            public int compare(JobSubmissionInterface jobSubmissionInterface, JobSubmissionInterface jobSubmissionInterface2) {
+                                return jobSubmissionInterface.getPriorityOrder() - jobSubmissionInterface2.getPriorityOrder();
+                            }
+                        });
+                    }
+                }
+                interfaces = orderedInterfaces.get(preferredJobSubmissionProtocol);
+                Collections.sort(interfaces, new Comparator<JobSubmissionInterface>() {
+                    @Override
+                    public int compare(JobSubmissionInterface jobSubmissionInterface, JobSubmissionInterface jobSubmissionInterface2) {
+                        return jobSubmissionInterface.getPriorityOrder() - jobSubmissionInterface2.getPriorityOrder();
+                    }
+                });
+            } else {
+                throw new RegistryException("Compute resource should have at least one job submission interface defined...");
+            }
+            return interfaces.get(0);
+        } catch (AppCatalogException e) {
+            throw new RegistryException("Error occurred while retrieving data from app catalog", e);
+        }
+    }
+
+    public static DataMovementInterface getPrefferredDataMovementInterface(OrchestratorContext context, ProcessModel processModel, String gatewayId) throws RegistryException {
+        try {
+            String resourceHostId = processModel.getComputeResourceId();
+            ComputeResourcePreference resourcePreference = getComputeResourcePreference(context, processModel, gatewayId);
+            DataMovementProtocol preferredDataMovementProtocol = resourcePreference.getPreferredDataMovementProtocol();
+            ComputeResourceDescription resourceDescription = context.getRegistry().getAppCatalog().getComputeResource().getComputeResource(resourceHostId);
+            List<DataMovementInterface> dataMovementInterfaces = resourceDescription.getDataMovementInterfaces();
+            if (dataMovementInterfaces != null && !dataMovementInterfaces.isEmpty()) {
+                for (DataMovementInterface dataMovementInterface : dataMovementInterfaces){
+                    if (preferredDataMovementProtocol != null){
+                        if (preferredDataMovementProtocol.toString().equals(dataMovementInterface.getDataMovementProtocol().toString())){
+                            return dataMovementInterface;
+                        }
+                    }
+                }
+            } else {
+                throw new RegistryException("Compute resource should have at least one data movement interface defined...");
+            }
+        } catch (AppCatalogException e) {
+            throw new RegistryException("Error occurred while retrieving data from app catalog", e);
+        }
+        return null;
+    }
+
+    public static int getDataMovementPort(OrchestratorContext context, ProcessModel processModel, String gatewayId) throws RegistryException{
+        try {
+            DataMovementProtocol protocol = getPreferredDataMovementProtocol(context, processModel, gatewayId);
+            DataMovementInterface dataMovementInterface = getPrefferredDataMovementInterface(context, processModel, gatewayId);
+            if (protocol == DataMovementProtocol.SCP ) {
+                SCPDataMovement scpDataMovement = getSCPDataMovement(context, dataMovementInterface.getDataMovementInterfaceId());
+                if (scpDataMovement != null) {
+                    return scpDataMovement.getSshPort();
+                }
+            }
+        } catch (RegistryException e) {
+            logger.error("Error occurred while retrieving security protocol", e);
+        }
+        return 0;
+    }
+
+
+    public static SecurityProtocol getSecurityProtocol(OrchestratorContext context, ProcessModel processModel, String gatewayId) throws RegistryException{
+        try {
+            JobSubmissionProtocol submissionProtocol = getPreferredJobSubmissionProtocol(context, processModel, gatewayId);
+            JobSubmissionInterface jobSubmissionInterface = getPreferredJobSubmissionInterface(context, processModel, gatewayId);
+            if (submissionProtocol == JobSubmissionProtocol.SSH ) {
+                SSHJobSubmission sshJobSubmission = getSSHJobSubmission(context, jobSubmissionInterface.getJobSubmissionInterfaceId());
+                if (sshJobSubmission != null) {
+                    return sshJobSubmission.getSecurityProtocol();
+                }
+            } else if (submissionProtocol == JobSubmissionProtocol.LOCAL) {
+                LOCALSubmission localJobSubmission = getLocalJobSubmission(context, jobSubmissionInterface.getJobSubmissionInterfaceId());
+                if (localJobSubmission != null) {
+                    return localJobSubmission.getSecurityProtocol();
+                }
+            } else if (submissionProtocol == JobSubmissionProtocol.SSH_FORK){
+                SSHJobSubmission sshJobSubmission = getSSHJobSubmission(context, jobSubmissionInterface.getJobSubmissionInterfaceId());
+                if (sshJobSubmission != null) {
+                    return sshJobSubmission.getSecurityProtocol();
+                }
+            }
+        } catch (RegistryException e) {
+            logger.error("Error occurred while retrieving security protocol", e);
+        }
+        return null;
+    }
+
+    public static LOCALSubmission getLocalJobSubmission(OrchestratorContext context, String submissionId) throws RegistryException {
+        try {
+            AppCatalog appCatalog = context.getRegistry().getAppCatalog();
+            return appCatalog.getComputeResource().getLocalJobSubmission(submissionId);
+        } catch (Exception e) {
+            String errorMsg = "Error while retrieving local job submission with submission id : " + submissionId;
+            logger.error(errorMsg, e);
+            throw new RegistryException(errorMsg, e);
+        }
+    }
+
+    public static UnicoreJobSubmission getUnicoreJobSubmission(OrchestratorContext context, String submissionId) throws RegistryException {
+        try {
+            AppCatalog appCatalog = context.getRegistry().getAppCatalog();
+            return appCatalog.getComputeResource().getUNICOREJobSubmission(submissionId);
+        } catch (Exception e) {
+            String errorMsg = "Error while retrieving UNICORE job submission with submission id : " + submissionId;
+            logger.error(errorMsg, e);
+            throw new RegistryException(errorMsg, e);
+        }
+    }
+
+    public static SSHJobSubmission getSSHJobSubmission(OrchestratorContext context, String submissionId) throws RegistryException {
+        try {
+            AppCatalog appCatalog = context.getRegistry().getAppCatalog();
+            return appCatalog.getComputeResource().getSSHJobSubmission(submissionId);
+        } catch (Exception e) {
+            String errorMsg = "Error while retrieving SSH job submission with submission id : " + submissionId;
+            logger.error(errorMsg, e);
+            throw new RegistryException(errorMsg, e);
+        }
+    }
+
+    public static SCPDataMovement getSCPDataMovement(OrchestratorContext context, String dataMoveId) throws RegistryException {
+        try {
+            AppCatalog appCatalog = context.getRegistry().getAppCatalog();
+            return appCatalog.getComputeResource().getSCPDataMovement(dataMoveId);
+        } catch (Exception e) {
+            String errorMsg = "Error while retrieving SCP Data movement with submission id : " + dataMoveId;
+            logger.error(errorMsg, e);
+            throw new RegistryException(errorMsg, e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata-sandbox/blob/1932e865/gsoc2016/Design Changes in Airavata/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
----------------------------------------------------------------------
diff --git a/gsoc2016/Design Changes in Airavata/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java b/gsoc2016/Design Changes in Airavata/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
new file mode 100644
index 0000000..0319f27
--- /dev/null
+++ b/gsoc2016/Design Changes in Airavata/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java	
@@ -0,0 +1,625 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.orchestrator.cpi.impl;
+
+import org.apache.airavata.common.utils.AiravataUtils;
+import org.apache.airavata.common.utils.ThriftUtils;
+import org.apache.airavata.gfac.core.task.TaskException;
+import org.apache.airavata.model.appcatalog.appinterface.ApplicationInterfaceDescription;
+import org.apache.airavata.model.appcatalog.computeresource.*;
+import org.apache.airavata.model.appcatalog.gatewayprofile.ComputeResourcePreference;
+import org.apache.airavata.model.application.io.DataType;
+import org.apache.airavata.model.application.io.InputDataObjectType;
+import org.apache.airavata.model.application.io.OutputDataObjectType;
+import org.apache.airavata.model.commons.ErrorModel;
+import org.apache.airavata.model.data.movement.DataMovementProtocol;
+import org.apache.airavata.model.error.LaunchValidationException;
+import org.apache.airavata.model.error.ValidationResults;
+import org.apache.airavata.model.error.ValidatorResult;
+import org.apache.airavata.model.process.ProcessModel;
+import org.apache.airavata.model.experiment.*;
+import org.apache.airavata.model.scheduling.ComputationalResourceSchedulingModel;
+import org.apache.airavata.model.status.TaskState;
+import org.apache.airavata.model.status.TaskStatus;
+import org.apache.airavata.model.task.*;
+import org.apache.airavata.model.util.ExperimentModelUtil;
+import org.apache.airavata.orchestrator.core.exception.OrchestratorException;
+import org.apache.airavata.orchestrator.core.impl.GFACPassiveJobSubmitter;
+import org.apache.airavata.orchestrator.core.job.JobSubmitter;
+import org.apache.airavata.orchestrator.core.utils.OrchestratorUtils;
+import org.apache.airavata.orchestrator.core.validator.JobMetadataValidator;
+import org.apache.airavata.registry.core.experiment.catalog.impl.RegistryFactory;
+import org.apache.airavata.registry.cpi.*;
+import org.apache.airavata.registry.cpi.utils.Constants;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.*;
+import java.util.concurrent.ExecutorService;
+
+public class SimpleOrchestratorImpl extends AbstractOrchestrator{
+    private final static Logger logger = LoggerFactory.getLogger(SimpleOrchestratorImpl.class);
+    private ExecutorService executor;
+
+    // this is going to be null unless the thread count is 0
+    private JobSubmitter jobSubmitter = null;
+
+
+    public SimpleOrchestratorImpl() throws OrchestratorException {
+        try {
+            try {
+                // We are only going to use GFacPassiveJobSubmitter
+                jobSubmitter = new GFACPassiveJobSubmitter();
+                jobSubmitter.initialize(this.orchestratorContext);
+
+            } catch (Exception e) {
+                String error = "Error creating JobSubmitter in non threaded mode ";
+                logger.error(error);
+                throw new OrchestratorException(error, e);
+            }
+        } catch (OrchestratorException e) {
+            logger.error("Error Constructing the Orchestrator");
+            throw e;
+        }
+    }
+
+    public boolean launchProcess(ProcessModel processModel, String tokenId) throws OrchestratorException {
+        try {
+	        return jobSubmitter.submit(processModel.getExperimentId(), processModel.getProcessId(), tokenId);
+        } catch (Exception e) {
+            throw new OrchestratorException("Error launching the job", e);
+        }
+    }
+
+    public ValidationResults validateExperiment(ExperimentModel experiment) throws OrchestratorException,LaunchValidationException {
+        org.apache.airavata.model.error.ValidationResults validationResults = new org.apache.airavata.model.error.ValidationResults();
+        validationResults.setValidationState(true); // initially making it to success, if atleast one failed them simply mark it failed.
+        String errorMsg = "Validation Errors : ";
+        if (this.orchestratorConfiguration.isEnableValidation()) {
+            List<String> validatorClasses = this.orchestratorContext.getOrchestratorConfiguration().getValidatorClasses();
+            for (String validator : validatorClasses) {
+                try {
+                    Class<? extends JobMetadataValidator> vClass = Class.forName(validator.trim()).asSubclass(JobMetadataValidator.class);
+                    JobMetadataValidator jobMetadataValidator = vClass.newInstance();
+                    validationResults = jobMetadataValidator.validate(experiment, null);
+                    if (validationResults.isValidationState()) {
+                        logger.info("Validation of " + validator + " is SUCCESSFUL");
+                    } else {
+                        List<ValidatorResult> validationResultList = validationResults.getValidationResultList();
+                        for (ValidatorResult result : validationResultList){
+                            if (!result.isResult()){
+                                String validationError = result.getErrorDetails();
+                                if (validationError != null){
+                                    errorMsg += validationError + " ";
+                                }
+                            }
+                        }
+                        logger.error("Validation of " + validator + " for experiment Id " + experiment.getExperimentId() + " is FAILED:[error]. " + errorMsg);
+                        validationResults.setValidationState(false);
+                        try {
+                            ErrorModel details = new ErrorModel();
+                            details.setActualErrorMessage(errorMsg);
+                            details.setCreationTime(Calendar.getInstance().getTimeInMillis());
+                            orchestratorContext.getRegistry().getExperimentCatalog().add(ExpCatChildDataType.EXPERIMENT_ERROR, details,
+                                    experiment.getExperimentId());
+                        } catch (RegistryException e) {
+                            logger.error("Error while saving error details to registry", e);
+                        }
+                        break;
+                    }
+                } catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) {
+                    logger.error("Error loading the validation class: ", validator, e);
+                    validationResults.setValidationState(false);
+                } /*catch (InstantiationException e) {
+                    logger.error("Error loading the validation class: ", validator, e);
+                    validationResults.setValidationState(false);
+                } catch (IllegalAccessException e) {
+                    logger.error("Error loading the validation class: ", validator, e);
+                    validationResults.setValidationState(false);
+                }*/
+            }
+        }
+        if(validationResults.isValidationState()){
+            return validationResults;
+        }else {
+            //atleast one validation has failed, so we throw an exception
+            LaunchValidationException launchValidationException = new LaunchValidationException();
+            launchValidationException.setValidationResult(validationResults);
+            launchValidationException.setErrorMessage("Validation failed refer the validationResults list for detail error. Validation errors : " + errorMsg);
+            throw launchValidationException;
+        }
+    }
+
+    public ValidationResults validateProcess(ExperimentModel experiment, ProcessModel processModel) throws OrchestratorException,LaunchValidationException {
+        org.apache.airavata.model.error.ValidationResults validationResults = new org.apache.airavata.model.error.ValidationResults();
+        validationResults.setValidationState(true); // initially making it to success, if atleast one failed them simply mark it failed.
+        String errorMsg = "Validation Errors : ";
+        if (this.orchestratorConfiguration.isEnableValidation()) {
+            List<String> validatorClzzez = this.orchestratorContext.getOrchestratorConfiguration().getValidatorClasses();
+            for (String validator : validatorClzzez) {
+                try {
+                    Class<? extends JobMetadataValidator> vClass = Class.forName(validator.trim()).asSubclass(JobMetadataValidator.class);
+                    JobMetadataValidator jobMetadataValidator = vClass.newInstance();
+                    validationResults = jobMetadataValidator.validate(experiment, processModel);
+                    if (validationResults.isValidationState()) {
+                        logger.info("Validation of " + validator + " is SUCCESSFUL");
+                    } else {
+                        List<ValidatorResult> validationResultList = validationResults.getValidationResultList();
+                        for (ValidatorResult result : validationResultList){
+                            if (!result.isResult()){
+                                String validationError = result.getErrorDetails();
+                                if (validationError != null){
+                                    errorMsg += validationError + " ";
+                                }
+                            }
+                        }
+                        logger.error("Validation of " + validator + " for experiment Id " + experiment.getExperimentId() + " is FAILED:[error]. " + errorMsg);
+                        validationResults.setValidationState(false);
+                        try {
+                            ErrorModel details = new ErrorModel();
+                            details.setActualErrorMessage(errorMsg);
+                            details.setCreationTime(Calendar.getInstance().getTimeInMillis());
+                            orchestratorContext.getRegistry().getExperimentCatalog().add(ExpCatChildDataType.PROCESS_ERROR, details,
+                                    processModel.getProcessId());
+                        } catch (RegistryException e) {
+                            logger.error("Error while saving error details to registry", e);
+                        }
+                        break;
+                    }
+                } catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) {
+                    logger.error("Error loading the validation class: ", validator, e);
+                    validationResults.setValidationState(false);
+                } /*catch (InstantiationException e) {
+                    logger.error("Error loading the validation class: ", validator, e);
+                    validationResults.setValidationState(false);
+                } catch (IllegalAccessException e) {
+                    logger.error("Error loading the validation class: ", validator, e);
+                    validationResults.setValidationState(false);
+                }*/
+            }
+        }
+        if(validationResults.isValidationState()){
+            return validationResults;
+        }else {
+            //atleast one validation has failed, so we throw an exception
+            LaunchValidationException launchValidationException = new LaunchValidationException();
+            launchValidationException.setValidationResult(validationResults);
+            launchValidationException.setErrorMessage("Validation failed refer the validationResults list for detail error. Validation errors : " + errorMsg);
+            throw launchValidationException;
+        }
+    }
+
+
+    public void cancelExperiment(ExperimentModel experiment, ProcessModel processModel, String tokenId)
+            throws OrchestratorException {
+        // FIXME
+//        List<JobDetails> jobDetailsList = task.getJobDetailsList();
+//        for(JobDetails jobDetails:jobDetailsList) {
+//            JobState jobState = jobDetails.getJobStatus().getJobState();
+//            if (jobState.getValue() > 4){
+//                logger.error("Cannot cancel the job, because current job state is : " + jobState.toString() +
+//                "jobId: " + jobDetails.getJobID() + " Job Name: " + jobDetails.getJobName());
+//                return;
+//            }
+//        }
+//        jobSubmitter.terminate(experiment.getExperimentID(),task.getTaskID(),tokenId);
+    }
+
+
+    public ExecutorService getExecutor() {
+        return executor;
+    }
+
+    public void setExecutor(ExecutorService executorIn) {
+        this.executor = executorIn;
+    }
+
+    public JobSubmitter getJobSubmitter() {
+        return jobSubmitter;
+    }
+
+    public void setJobSubmitter(JobSubmitter jobSubmitterIn) {
+        this.jobSubmitter = jobSubmitterIn;
+    }
+
+    public void initialize() throws OrchestratorException {
+
+    }
+
+    public List<ProcessModel> createProcesses (String experimentId, String gatewayId) throws OrchestratorException {
+        List<ProcessModel> processModels = new ArrayList<ProcessModel>();
+        try {
+            Registry registry = orchestratorContext.getRegistry();
+            ExperimentModel experimentModel = (ExperimentModel)registry.getExperimentCatalog().get(ExperimentCatalogModelType.EXPERIMENT, experimentId);
+            List<Object> processList = registry.getExperimentCatalog().get(ExperimentCatalogModelType.PROCESS, Constants.FieldConstants.ExperimentConstants.EXPERIMENT_ID, experimentId);
+            if (processList != null && !processList.isEmpty()) {
+                for (Object processObject : processList) {
+                    ProcessModel processModel = (ProcessModel)processObject;
+                    processModels.add(processModel);
+                }
+            }else {
+                ProcessModel processModel = ExperimentModelUtil.cloneProcessFromExperiment(experimentModel);
+                String processId = (String)registry.getExperimentCatalog().add(ExpCatChildDataType.PROCESS, processModel, experimentId);
+                processModel.setProcessId(processId);
+                processModels.add(processModel);
+            }
+        } catch (Exception e) {
+            throw new OrchestratorException("Error during creating process");
+        }
+        return processModels;
+    }
+
+    public String createAndSaveTasks(String gatewayId, ProcessModel processModel, boolean autoSchedule) throws OrchestratorException {
+        try {
+            ExperimentCatalog experimentCatalog = orchestratorContext.getRegistry().getExperimentCatalog();
+            AppCatalog appCatalog = orchestratorContext.getRegistry().getAppCatalog();
+            ComputationalResourceSchedulingModel resourceSchedule = processModel.getResourceSchedule();
+            String userGivenQueueName = resourceSchedule.getQueueName();
+            int userGivenWallTime = resourceSchedule.getWallTimeLimit();
+            String resourceHostId = resourceSchedule.getResourceHostId();
+            if (resourceHostId == null){
+                throw new OrchestratorException("Compute Resource Id cannot be null at this point");
+            }
+            ComputeResourceDescription computeResource = appCatalog.getComputeResource().getComputeResource(resourceHostId);
+            JobSubmissionInterface preferredJobSubmissionInterface = OrchestratorUtils.getPreferredJobSubmissionInterface(orchestratorContext, processModel, gatewayId);
+            ComputeResourcePreference resourcePreference = OrchestratorUtils.getComputeResourcePreference(orchestratorContext, processModel, gatewayId);
+            List<String> taskIdList = new ArrayList<>();
+
+            if (resourcePreference.getPreferredJobSubmissionProtocol() == JobSubmissionProtocol.UNICORE) {
+                // TODO - breakdown unicore all in one task to multiple tasks, then we don't need to handle UNICORE here.
+                taskIdList.addAll(createAndSaveSubmissionTasks(gatewayId, preferredJobSubmissionInterface, processModel, userGivenWallTime));
+            } else {
+                taskIdList.addAll(createAndSaveEnvSetupTask(gatewayId, processModel, experimentCatalog));
+                taskIdList.addAll(createAndSaveInputDataStagingTasks(processModel, gatewayId));
+                if (autoSchedule) {
+                    List<BatchQueue> definedBatchQueues = computeResource.getBatchQueues();
+                    for (BatchQueue batchQueue : definedBatchQueues) {
+                        if (batchQueue.getQueueName().equals(userGivenQueueName)) {
+                            int maxRunTime = batchQueue.getMaxRunTime();
+                            if (maxRunTime < userGivenWallTime) {
+                                resourceSchedule.setWallTimeLimit(maxRunTime);
+                                // need to create more job submissions
+                                int numOfMaxWallTimeJobs = ((int) Math.floor(userGivenWallTime / maxRunTime));
+                                for (int i = 1; i <= numOfMaxWallTimeJobs; i++) {
+                                    taskIdList.addAll(createAndSaveSubmissionTasks(gatewayId,preferredJobSubmissionInterface, processModel, maxRunTime));
+                                }
+                                int leftWallTime = userGivenWallTime % maxRunTime;
+                                if (leftWallTime != 0) {
+                                    taskIdList.addAll(createAndSaveSubmissionTasks(gatewayId,preferredJobSubmissionInterface, processModel, leftWallTime));
+                                }
+                            } else {
+                                taskIdList.addAll(createAndSaveSubmissionTasks(gatewayId,preferredJobSubmissionInterface, processModel, userGivenWallTime));
+                            }
+                        }
+                    }
+                } else {
+                    taskIdList.addAll(createAndSaveSubmissionTasks(gatewayId,preferredJobSubmissionInterface, processModel, userGivenWallTime));
+                }
+                taskIdList.addAll(createAndSaveOutputDataStagingTasks(processModel, gatewayId));
+            }
+            // update process scheduling
+            experimentCatalog.update(ExperimentCatalogModelType.PROCESS, processModel, processModel.getProcessId());
+            return getTaskDag(taskIdList);
+        } catch (Exception e) {
+            throw new OrchestratorException("Error during creating process");
+        }
+    }
+
+    private String getTaskDag(List<String> taskIdList) {
+        if (taskIdList.isEmpty()) {
+            return "";
+        }
+        StringBuilder sb = new StringBuilder();
+        for (String s : taskIdList) {
+            sb.append(s).append(","); // comma separated values
+        }
+        String dag = sb.toString();
+        return dag.substring(0, dag.length() - 1); // remove last comma
+    }
+
+    private List<String> createAndSaveEnvSetupTask(String gatewayId,
+                                                   ProcessModel processModel,
+                                                   ExperimentCatalog experimentCatalog)
+            throws RegistryException, TException {
+        List<String> envTaskIds = new ArrayList<>();
+        TaskModel envSetupTask = new TaskModel();
+        envSetupTask.setTaskType(TaskTypes.ENV_SETUP);
+        envSetupTask.setTaskStatus(new TaskStatus(TaskState.CREATED));
+        envSetupTask.setCreationTime(AiravataUtils.getCurrentTimestamp().getTime());
+        envSetupTask.setParentProcessId(processModel.getProcessId());
+        EnvironmentSetupTaskModel envSetupSubModel = new EnvironmentSetupTaskModel();
+        envSetupSubModel.setProtocol(OrchestratorUtils.getSecurityProtocol(orchestratorContext, processModel, gatewayId));
+        ComputeResourcePreference computeResourcePreference = OrchestratorUtils.getComputeResourcePreference(orchestratorContext, processModel, gatewayId);
+        String scratchLocation = OrchestratorUtils.getScratchLocation(orchestratorContext,processModel, gatewayId);
+        String workingDir = scratchLocation + File.separator + processModel.getProcessId();
+        envSetupSubModel.setLocation(workingDir);
+        byte[] envSetupSub = ThriftUtils.serializeThriftObject(envSetupSubModel);
+        envSetupTask.setSubTaskModel(envSetupSub);
+        String envSetupTaskId = (String) experimentCatalog.add(ExpCatChildDataType.TASK, envSetupTask, processModel.getProcessId());
+        envSetupTask.setTaskId(envSetupTaskId);
+        envTaskIds.add(envSetupTaskId);
+        return envTaskIds;
+    }
+
+    public List<String> createAndSaveInputDataStagingTasks(ProcessModel processModel, String gatewayId) throws RegistryException {
+        List<String> dataStagingTaskIds = new ArrayList<>();
+        List<InputDataObjectType> processInputs = processModel.getProcessInputs();
+
+        sortByInputOrder(processInputs);
+        if (processInputs != null) {
+            for (InputDataObjectType processInput : processInputs) {
+                DataType type = processInput.getType();
+                switch (type) {
+                    case STDERR:
+                        break;
+                    case STDOUT:
+                        break;
+                    case URI:
+                    case URI_COLLECTION:
+                        try {
+                            TaskModel inputDataStagingTask = getInputDataStagingTask(processModel, processInput, gatewayId);
+                            String taskId = (String) orchestratorContext.getRegistry().getExperimentCatalog().add(ExpCatChildDataType.TASK, inputDataStagingTask,
+                                    processModel.getProcessId());
+                            inputDataStagingTask.setTaskId(taskId);
+                            dataStagingTaskIds.add(inputDataStagingTask.getTaskId());
+                        } catch (TException | AppCatalogException | TaskException e) {
+                            throw new RegistryException("Error while serializing data staging sub task model");
+                        }
+                        break;
+                    default:
+                        // nothing to do
+                        break;
+                }
+            }
+        }
+        return dataStagingTaskIds;
+    }
+
+    public List<String> createAndSaveOutputDataStagingTasks(ProcessModel processModel, String gatewayId) throws RegistryException {
+        List<String> dataStagingTaskIds = new ArrayList<>();
+        List<OutputDataObjectType> processOutputs = processModel.getProcessOutputs();
+        String appName = OrchestratorUtils.getApplicationInterfaceName(orchestratorContext, processModel);
+        if (processOutputs != null) {
+            for (OutputDataObjectType processOutput : processOutputs) {
+                DataType type = processOutput.getType();
+                switch (type) {
+                    case STDOUT :
+                        processOutput.setValue(appName + ".stdout");
+                        createOutputDataSatagingTasks(processModel, gatewayId, dataStagingTaskIds, processOutput);
+                        break;
+                    case STDERR:
+                        processOutput.setValue(appName + ".stderr");
+                        createOutputDataSatagingTasks(processModel, gatewayId, dataStagingTaskIds, processOutput);
+                        break;
+                    case URI:
+                        createOutputDataSatagingTasks(processModel, gatewayId, dataStagingTaskIds, processOutput);
+                        break;
+                    default:
+                        // nothing to do
+                        break;
+                }
+            }
+        }
+
+        try {
+            if (isArchive(processModel, gatewayId)) {
+                createArchiveDataStatgingTask(processModel, gatewayId, dataStagingTaskIds);
+            }
+        } catch (AppCatalogException e) {
+            throw new RegistryException("Error! Application interface retrieval failed");
+        }
+        return dataStagingTaskIds;
+    }
+
+    private boolean isArchive(ProcessModel processModel, String gatewayId) throws AppCatalogException {
+        AppCatalog appCatalog = RegistryFactory.getAppCatalog();
+        ApplicationInterfaceDescription appInterface = appCatalog.getApplicationInterface().getApplicationInterface(processModel.getApplicationInterfaceId());
+        return appInterface.isArchiveWorkingDirectory();
+    }
+
+    private void createArchiveDataStatgingTask(ProcessModel processModel, String gatewayId, List<String> dataStagingTaskIds) throws RegistryException {
+        TaskModel archiveTask = null;
+        try {
+            archiveTask = getOutputDataStagingTask(processModel, null, gatewayId);
+        } catch (TException e) {
+            throw new RegistryException("Error! DataStaging sub task serialization failed");
+        }
+        String taskId = (String) orchestratorContext.getRegistry().getExperimentCatalog().add(ExpCatChildDataType.TASK, archiveTask,
+                processModel.getProcessId());
+        archiveTask.setTaskId(taskId);
+        dataStagingTaskIds.add(archiveTask.getTaskId());
+
+    }
+
+    private void createOutputDataSatagingTasks(ProcessModel processModel, String gatewayId, List<String> dataStagingTaskIds, OutputDataObjectType processOutput) throws RegistryException {
+        try {
+            TaskModel outputDataStagingTask = getOutputDataStagingTask(processModel, processOutput, gatewayId);
+            String taskId = (String) orchestratorContext.getRegistry().getExperimentCatalog().add(ExpCatChildDataType.TASK, outputDataStagingTask,
+                    processModel.getProcessId());
+            outputDataStagingTask.setTaskId(taskId);
+            dataStagingTaskIds.add(outputDataStagingTask.getTaskId());
+        } catch (TException e) {
+            throw new RegistryException("Error while serializing data staging sub task model", e);
+        }
+    }
+
+    private List<String> createAndSaveSubmissionTasks(String gatewayId, JobSubmissionInterface jobSubmissionInterface, ProcessModel processModel, int wallTime)
+            throws TException, RegistryException, OrchestratorException {
+
+        JobSubmissionProtocol jobSubmissionProtocol = jobSubmissionInterface.getJobSubmissionProtocol();
+        MonitorMode monitorMode = null;
+        if (jobSubmissionProtocol == JobSubmissionProtocol.SSH || jobSubmissionProtocol == JobSubmissionProtocol.SSH_FORK) {
+            SSHJobSubmission sshJobSubmission = OrchestratorUtils.getSSHJobSubmission(orchestratorContext, jobSubmissionInterface.getJobSubmissionInterfaceId());
+            monitorMode = sshJobSubmission.getMonitorMode();
+        } else if (jobSubmissionProtocol == JobSubmissionProtocol.UNICORE) {
+            monitorMode = MonitorMode.FORK;
+        } else {
+            logger.error("expId : {}, processId : {} :- Unsupported Job submission protocol {}.",
+                    processModel.getExperimentId(), processModel.getProcessId(), jobSubmissionProtocol.name());
+            throw new OrchestratorException("Unsupported Job Submission Protocol " + jobSubmissionProtocol.name());
+        }
+        List<String> submissionTaskIds = new ArrayList<>();
+        TaskModel taskModel = new TaskModel();
+        taskModel.setParentProcessId(processModel.getProcessId());
+        taskModel.setCreationTime(new Date().getTime());
+        taskModel.setLastUpdateTime(taskModel.getCreationTime());
+        TaskStatus taskStatus = new TaskStatus(TaskState.CREATED);
+        taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+        taskModel.setTaskStatus(taskStatus);
+        taskModel.setTaskType(TaskTypes.JOB_SUBMISSION);
+        JobSubmissionTaskModel submissionSubTask = new JobSubmissionTaskModel();
+        submissionSubTask.setMonitorMode(monitorMode);
+        submissionSubTask.setJobSubmissionProtocol(jobSubmissionProtocol);
+        submissionSubTask.setWallTime(wallTime);
+        byte[] bytes = ThriftUtils.serializeThriftObject(submissionSubTask);
+        taskModel.setSubTaskModel(bytes);
+        String taskId = (String) orchestratorContext.getRegistry().getExperimentCatalog().add(ExpCatChildDataType.TASK, taskModel,
+                processModel.getProcessId());
+        taskModel.setTaskId(taskId);
+        submissionTaskIds.add(taskModel.getTaskId());
+
+        // create monitor task for this Email based monitor mode job
+        if (monitorMode == MonitorMode.JOB_EMAIL_NOTIFICATION_MONITOR) {
+            TaskModel monitorTaskModel = new TaskModel();
+            monitorTaskModel.setParentProcessId(processModel.getProcessId());
+            monitorTaskModel.setCreationTime(new Date().getTime());
+            monitorTaskModel.setLastUpdateTime(monitorTaskModel.getCreationTime());
+            TaskStatus monitorTaskStatus = new TaskStatus(TaskState.CREATED);
+            monitorTaskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+            monitorTaskModel.setTaskStatus(monitorTaskStatus);
+            monitorTaskModel.setTaskType(TaskTypes.MONITORING);
+            MonitorTaskModel monitorSubTaskModel = new MonitorTaskModel();
+            monitorSubTaskModel.setMonitorMode(monitorMode);
+            monitorTaskModel.setSubTaskModel(ThriftUtils.serializeThriftObject(monitorSubTaskModel));
+            String mTaskId = (String) orchestratorContext.getRegistry().getExperimentCatalog().add(ExpCatChildDataType.TASK, monitorTaskModel, processModel.getProcessId());
+            monitorTaskModel.setTaskId(mTaskId);
+            submissionTaskIds.add(monitorTaskModel.getTaskId());
+        }
+
+        return submissionTaskIds;
+    }
+
+    private void sortByInputOrder(List<InputDataObjectType> processInputs) {
+        Collections.sort(processInputs, new Comparator<InputDataObjectType>() {
+            @Override
+            public int compare(InputDataObjectType inputDT_1, InputDataObjectType inputDT_2) {
+                return inputDT_1.getInputOrder() - inputDT_2.getInputOrder();
+            }
+        });
+    }
+
+    private TaskModel getInputDataStagingTask(ProcessModel processModel, InputDataObjectType processInput, String gatewayId) throws RegistryException, TException, AppCatalogException, TaskException {
+        // create new task model for this task
+        TaskModel taskModel = new TaskModel();
+        taskModel.setParentProcessId(processModel.getProcessId());
+        taskModel.setCreationTime(AiravataUtils.getCurrentTimestamp().getTime());
+        taskModel.setLastUpdateTime(taskModel.getCreationTime());
+        TaskStatus taskStatus = new TaskStatus(TaskState.CREATED);
+        taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+        taskModel.setTaskStatus(taskStatus);
+        taskModel.setTaskType(TaskTypes.DATA_STAGING);
+        // create data staging sub task model
+        DataStagingTaskModel submodel = new DataStagingTaskModel();
+        ComputeResourcePreference computeResourcePreference = OrchestratorUtils.getComputeResourcePreference(orchestratorContext, processModel, gatewayId);
+        ComputeResourceDescription computeResource = orchestratorContext.getRegistry().getAppCatalog().getComputeResource().getComputeResource(processModel.getComputeResourceId());
+        String remoteOutputDir = OrchestratorUtils.getScratchLocation(orchestratorContext,processModel, gatewayId) + File.separator + processModel.getProcessId();
+        remoteOutputDir = remoteOutputDir.endsWith("/") ? remoteOutputDir : remoteOutputDir + "/";
+        URI destination = null;
+        try {
+            DataMovementProtocol dataMovementProtocol = OrchestratorUtils.getPreferredDataMovementProtocol(orchestratorContext, processModel, gatewayId);
+            String loginUserName = OrchestratorUtils.getLoginUserName(orchestratorContext, processModel, gatewayId);
+            destination = new URI(dataMovementProtocol.name(),
+                    loginUserName,
+                    computeResource.getHostName(),
+                    OrchestratorUtils.getDataMovementPort(orchestratorContext, processModel, gatewayId),
+                    remoteOutputDir , null, null);
+        } catch (URISyntaxException e) {
+            throw new TaskException("Error while constructing destination file URI");
+        }
+        submodel.setType(DataStageType.INPUT);
+        submodel.setSource(processInput.getValue());
+        submodel.setProcessInput(processInput);
+        submodel.setDestination(destination.toString());
+        taskModel.setSubTaskModel(ThriftUtils.serializeThriftObject(submodel));
+        return taskModel;
+    }
+
+    private TaskModel getOutputDataStagingTask(ProcessModel processModel, OutputDataObjectType processOutput, String gatewayId) throws RegistryException, TException {
+        try {
+
+            // create new task model for this task
+            TaskModel taskModel = new TaskModel();
+            taskModel.setParentProcessId(processModel.getProcessId());
+            taskModel.setCreationTime(AiravataUtils.getCurrentTimestamp().getTime());
+            taskModel.setLastUpdateTime(taskModel.getCreationTime());
+            TaskStatus taskStatus = new TaskStatus(TaskState.CREATED);
+            taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+            taskModel.setTaskStatus(taskStatus);
+            taskModel.setTaskType(TaskTypes.DATA_STAGING);
+            ComputeResourcePreference computeResourcePreference = OrchestratorUtils.getComputeResourcePreference(orchestratorContext, processModel, gatewayId);
+            ComputeResourceDescription computeResource = orchestratorContext.getRegistry().getAppCatalog().getComputeResource().getComputeResource(processModel.getComputeResourceId());
+
+            String remoteOutputDir = OrchestratorUtils.getScratchLocation(orchestratorContext,processModel, gatewayId) + File.separator + processModel.getProcessId();
+            remoteOutputDir = remoteOutputDir.endsWith("/") ? remoteOutputDir : remoteOutputDir + "/";
+            DataStagingTaskModel submodel = new DataStagingTaskModel();
+            DataMovementProtocol dataMovementProtocol = OrchestratorUtils.getPreferredDataMovementProtocol(orchestratorContext, processModel, gatewayId);
+            URI source = null;
+            try {
+                String loginUserName = OrchestratorUtils.getLoginUserName(orchestratorContext, processModel, gatewayId);
+                if (processOutput != null) {
+                    submodel.setType(DataStageType.OUPUT);
+                    submodel.setProcessOutput(processOutput);
+                    source = new URI(dataMovementProtocol.name(),
+                            loginUserName,
+                            computeResource.getHostName(),
+                            OrchestratorUtils.getDataMovementPort(orchestratorContext, processModel, gatewayId),
+                            remoteOutputDir + processOutput.getValue(), null, null);
+                } else {
+                    // archive
+                    submodel.setType(DataStageType.ARCHIVE_OUTPUT);
+                    source = new URI(dataMovementProtocol.name(),
+                            loginUserName,
+                            computeResource.getHostName(),
+                            OrchestratorUtils.getDataMovementPort(orchestratorContext, processModel, gatewayId),
+                            remoteOutputDir, null, null);
+                }
+            } catch (URISyntaxException e) {
+                throw new TaskException("Error while constructing source file URI");
+            }
+            // We don't know destination location at this time, data staging task will set this.
+            // because destination is required field we set dummy destination
+            submodel.setSource(source.toString());
+            // We don't know destination location at this time, data staging task will set this.
+            // because destination is required field we set dummy destination
+            submodel.setDestination("dummy://temp/file/location");
+            taskModel.setSubTaskModel(ThriftUtils.serializeThriftObject(submodel));
+            return taskModel;
+        } catch (AppCatalogException | TaskException e) {
+           throw new RegistryException("Error occurred while retrieving data movement from app catalog", e);
+        }
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/airavata-sandbox/blob/1932e865/gsoc2016/Design Changes in Airavata/orchestrator/pom.xml
----------------------------------------------------------------------
diff --git a/gsoc2016/Design Changes in Airavata/orchestrator/pom.xml b/gsoc2016/Design Changes in Airavata/orchestrator/pom.xml
new file mode 100644
index 0000000..0761b04
--- /dev/null
+++ b/gsoc2016/Design Changes in Airavata/orchestrator/pom.xml	
@@ -0,0 +1,45 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file 
+    distributed with this work for additional information regarding copyright ownership. The ASF licenses this file to you under 
+    the Apache License, Version 2.0 (the� "License"); you may not use this file except in compliance with the License. You may 
+    obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to 
+    in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF 
+    ANY ~ KIND, either express or implied. See the License for the specific language governing permissions and limitations under 
+    the License. -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+    <parent>
+        <groupId>org.apache.airavata</groupId>
+        <artifactId>airavata</artifactId>
+        <version>0.17-SNAPSHOT</version>
+        <relativePath>../../pom.xml</relativePath>
+    </parent>
+
+    <modelVersion>4.0.0</modelVersion>
+    <artifactId>orchestrator</artifactId>
+    <packaging>pom</packaging>
+    <name>Airavata Orchestrator</name>
+    <url>http://airavata.apache.org/</url>
+
+    <profiles>
+        <profile>
+            <id>default</id>
+            <activation>
+                <activeByDefault>true</activeByDefault>
+            </activation>
+            <modules>
+                <module>orchestrator-core</module>
+                <module>orchestrator-service</module>
+                <module>orchestrator-client</module>
+            </modules>
+        </profile>
+    </profiles>
+
+    <properties>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
+    </properties>
+
+</project>