You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by ad...@apache.org on 2017/04/13 17:49:49 UTC
[3/4] airavata git commit: Changing package name to core from commons
http://git-wip-us.apache.org/repos/asf/airavata/blob/c99f18ab/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/context/ProcessContext.java
----------------------------------------------------------------------
diff --git a/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/context/ProcessContext.java b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/context/ProcessContext.java
deleted file mode 100644
index 53bda5b..0000000
--- a/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/context/ProcessContext.java
+++ /dev/null
@@ -1,806 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.airavata.worker.commons.context;
-
-import org.apache.airavata.common.utils.AiravataUtils;
-import org.apache.airavata.messaging.core.Publisher;
-import org.apache.airavata.model.appcatalog.appdeployment.ApplicationDeploymentDescription;
-import org.apache.airavata.model.appcatalog.appinterface.ApplicationInterfaceDescription;
-import org.apache.airavata.model.appcatalog.computeresource.ComputeResourceDescription;
-import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionProtocol;
-import org.apache.airavata.model.appcatalog.computeresource.MonitorMode;
-import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManager;
-import org.apache.airavata.model.appcatalog.gatewayprofile.ComputeResourcePreference;
-import org.apache.airavata.model.appcatalog.gatewayprofile.GatewayResourceProfile;
-import org.apache.airavata.model.appcatalog.gatewayprofile.StoragePreference;
-import org.apache.airavata.model.appcatalog.storageresource.StorageResourceDescription;
-import org.apache.airavata.model.appcatalog.userresourceprofile.UserComputeResourcePreference;
-import org.apache.airavata.model.appcatalog.userresourceprofile.UserResourceProfile;
-import org.apache.airavata.model.appcatalog.userresourceprofile.UserStoragePreference;
-import org.apache.airavata.model.data.movement.DataMovementProtocol;
-import org.apache.airavata.model.job.JobModel;
-import org.apache.airavata.model.process.ProcessModel;
-import org.apache.airavata.model.scheduling.ComputationalResourceSchedulingModel;
-import org.apache.airavata.model.status.ProcessState;
-import org.apache.airavata.model.status.ProcessStatus;
-import org.apache.airavata.model.task.TaskModel;
-import org.apache.airavata.registry.cpi.AppCatalog;
-import org.apache.airavata.registry.cpi.ExperimentCatalog;
-import org.apache.airavata.worker.commons.authentication.SSHKeyAuthentication;
-import org.apache.airavata.worker.commons.cluster.RemoteCluster;
-import org.apache.airavata.worker.commons.cluster.ServerInfo;
-import org.apache.airavata.worker.commons.exceptions.WorkerException;
-import org.apache.curator.framework.CuratorFramework;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.*;
-
-public class ProcessContext {
-
- private static final Logger log = LoggerFactory.getLogger(ProcessContext.class);
- // process model
- private ExperimentCatalog experimentCatalog;
- private AppCatalog appCatalog;
- private CuratorFramework curatorClient;
- private Publisher statusPublisher;
- private final String processId;
- private final String gatewayId;
- private final String tokenId;
- private ProcessModel processModel;
- private String workingDir;
- private String scratchLocation;
- private String inputDir;
- private String outputDir;
- private String localWorkingDir;
- private GatewayResourceProfile gatewayResourceProfile;
- private ComputeResourcePreference gatewayComputeResourcePreference;
- private StoragePreference gatewayStorageResourcePreference;
- private UserResourceProfile userResourceProfile;
- private UserComputeResourcePreference userComputeResourcePreference;
- private UserStoragePreference userStoragePreference;
- private ComputeResourceDescription computeResourceDescription;
- private ApplicationDeploymentDescription applicationDeploymentDescription;
- private ApplicationInterfaceDescription applicationInterfaceDescription;
- private RemoteCluster jobSubmissionRemoteCluster;
- private RemoteCluster dataMovementRemoteCluster;
- private Map<String, String> sshProperties;
- private String stdoutLocation;
- private String stderrLocation;
- private JobSubmissionProtocol jobSubmissionProtocol;
- private DataMovementProtocol dataMovementProtocol;
- private JobModel jobModel;
- private StorageResourceDescription storageResource;
- private MonitorMode monitorMode;
- private ResourceJobManager resourceJobManager;
- private boolean handOver;
- private boolean cancel;
- private ServerInfo serverInfo;
- private List<String> taskExecutionOrder;
- private List<TaskModel> taskList;
- private Map<String, TaskModel> taskMap;
- private boolean pauseTaskExecution = false; // Task can pause task execution by setting this value
- private boolean complete = false; // all tasks executed?
- private boolean recovery = false; // is process in recovery mode?
- private TaskModel currentExecutingTaskModel; // current execution task model in case we pause process execution we need this to continue process exectuion again
- private boolean acknowledge;
- private SSHKeyAuthentication sshKeyAuthentication;
- private boolean recoveryWithCancel = false;
- private String usageReportingGatewayId;
-
- /**
- * Note: process context property use lazy loading approach. In runtime you will see some properties as null
- * unless you have access it previously. Once that property access using the api,it will be set to correct value.
- */
- private ProcessContext(String processId, String gatewayId, String tokenId) {
- this.processId = processId;
- this.gatewayId = gatewayId;
- this.tokenId = tokenId;
- }
-
- public ExperimentCatalog getExperimentCatalog() {
- return experimentCatalog;
- }
-
- public void setExperimentCatalog(ExperimentCatalog experimentCatalog) {
- this.experimentCatalog = experimentCatalog;
- }
-
- public AppCatalog getAppCatalog() {
- return appCatalog;
- }
-
- public void setAppCatalog(AppCatalog appCatalog) {
- this.appCatalog = appCatalog;
- }
-
- public String getGatewayId() {
- return gatewayId;
- }
-
- public String getTokenId() {
- return tokenId;
- }
-
- public String getProcessId() {
- return processId;
- }
-
- public CuratorFramework getCuratorClient() {
- return curatorClient;
- }
-
- public void setCuratorClient(CuratorFramework curatorClient) {
- this.curatorClient = curatorClient;
- }
-
- public Publisher getStatusPublisher() {
- return statusPublisher;
- }
-
- public void setStatusPublisher(Publisher statusPublisher) {
- this.statusPublisher = statusPublisher;
- }
-
- public ProcessModel getProcessModel() {
- return processModel;
- }
-
- public void setProcessModel(ProcessModel processModel) {
- this.processModel = processModel;
- }
-
- public String getWorkingDir() {
- if (workingDir == null) {
- if (processModel.getProcessResourceSchedule().getStaticWorkingDir() != null){
- workingDir = processModel.getProcessResourceSchedule().getStaticWorkingDir();
- }else {
- String scratchLocation = getScratchLocation();
- workingDir = (scratchLocation.endsWith("/") ? scratchLocation + processId : scratchLocation + "/" +
- processId);
- }
- }
- return workingDir;
- }
-
- public String getScratchLocation() {
- if (scratchLocation == null) {
- if (isUseUserCRPref() &&
- userComputeResourcePreference != null &&
- isValid(userComputeResourcePreference.getScratchLocation())) {
- scratchLocation = userComputeResourcePreference.getScratchLocation();
- } else if (isValid(processModel.getProcessResourceSchedule().getOverrideScratchLocation())) {
- scratchLocation = processModel.getProcessResourceSchedule().getOverrideScratchLocation();
- }else {
- scratchLocation = gatewayComputeResourcePreference.getScratchLocation();
- }
- }
- return scratchLocation;
- }
-
- public void setWorkingDir(String workingDir) {
- this.workingDir = workingDir;
- }
-
- public GatewayResourceProfile getGatewayResourceProfile() {
- return gatewayResourceProfile;
- }
-
- public void setGatewayResourceProfile(GatewayResourceProfile gatewayResourceProfile) {
- this.gatewayResourceProfile = gatewayResourceProfile;
- }
-
- public UserResourceProfile getUserResourceProfile() {
- return userResourceProfile;
- }
-
- public void setUserResourceProfile(UserResourceProfile userResourceProfile) {
- this.userResourceProfile = userResourceProfile;
- }
-
- private UserComputeResourcePreference getUserComputeResourcePreference() {
- return userComputeResourcePreference;
- }
-
- public void setUserComputeResourcePreference(UserComputeResourcePreference userComputeResourcePreference) {
- this.userComputeResourcePreference = userComputeResourcePreference;
- }
-
- public UserStoragePreference getUserStoragePreference() {
- return userStoragePreference;
- }
-
- public void setUserStoragePreference(UserStoragePreference userStoragePreference) {
- this.userStoragePreference = userStoragePreference;
- }
-
- public StoragePreference getGatewayStorageResourcePreference() {
- return gatewayStorageResourcePreference;
- }
-
- public void setGatewayStorageResourcePreference(StoragePreference gatewayStorageResourcePreference) {
- this.gatewayStorageResourcePreference = gatewayStorageResourcePreference;
- }
-
- public RemoteCluster getJobSubmissionRemoteCluster() {
- return jobSubmissionRemoteCluster;
- }
-
- public void setJobSubmissionRemoteCluster(RemoteCluster jobSubmissoinRemoteCluster) {
- this.jobSubmissionRemoteCluster = jobSubmissoinRemoteCluster;
- }
-
- public RemoteCluster getDataMovementRemoteCluster() {
- return dataMovementRemoteCluster;
- }
-
- public void setDataMovementRemoteCluster(RemoteCluster dataMovementRemoteCluster) {
- this.dataMovementRemoteCluster = dataMovementRemoteCluster;
- }
-
- public Map<String, String> getSshProperties() {
- return sshProperties;
- }
-
- public void setSshProperties(Map<String, String> sshProperties) {
- this.sshProperties = sshProperties;
- }
-
- public ComputeResourceDescription getComputeResourceDescription() {
- return computeResourceDescription;
- }
-
- public void setComputeResourceDescription(ComputeResourceDescription computeResourceDescription) {
- this.computeResourceDescription = computeResourceDescription;
- }
-
- public ApplicationDeploymentDescription getApplicationDeploymentDescription() {
- return applicationDeploymentDescription;
- }
-
- public void setApplicationDeploymentDescription(ApplicationDeploymentDescription
- applicationDeploymentDescription) {
- this.applicationDeploymentDescription = applicationDeploymentDescription;
- }
-
- public ApplicationInterfaceDescription getApplicationInterfaceDescription() {
- return applicationInterfaceDescription;
- }
-
- public void setApplicationInterfaceDescription(ApplicationInterfaceDescription applicationInterfaceDescription) {
- this.applicationInterfaceDescription = applicationInterfaceDescription;
- }
-
- public String getStdoutLocation() {
- return stdoutLocation;
- }
-
- public void setStdoutLocation(String stdoutLocation) {
- this.stdoutLocation = stdoutLocation;
- }
-
- public String getStderrLocation() {
- return stderrLocation;
- }
-
- public void setStderrLocation(String stderrLocation) {
- this.stderrLocation = stderrLocation;
- }
-
- public void setOutputDir(String outputDir) {
- this.outputDir = outputDir;
- }
-
- public String getOutputDir() {
- if (outputDir == null) {
- outputDir = getWorkingDir();
- }
- return outputDir;
- }
-
- public String getInputDir() {
- if (inputDir == null) {
- inputDir = getWorkingDir();
- }
- return inputDir;
- }
-
- public void setInputDir(String inputDir) {
- this.inputDir = inputDir;
- }
-
- public JobSubmissionProtocol getJobSubmissionProtocol() {
- if (jobSubmissionProtocol == null) {
- jobSubmissionProtocol = gatewayComputeResourcePreference.getPreferredJobSubmissionProtocol();
- }
- return jobSubmissionProtocol;
- }
-
- public void setJobSubmissionProtocol(JobSubmissionProtocol jobSubmissionProtocol) {
- this.jobSubmissionProtocol = jobSubmissionProtocol;
- }
-
- public DataMovementProtocol getDataMovementProtocol() {
- if (dataMovementProtocol == null) {
- dataMovementProtocol = gatewayComputeResourcePreference.getPreferredDataMovementProtocol();
- }
- return dataMovementProtocol;
- }
-
- public void setDataMovementProtocol(DataMovementProtocol dataMovementProtocol) {
- this.dataMovementProtocol = dataMovementProtocol;
- }
-
- public String getTaskDag() {
- return getProcessModel().getTaskDag();
- }
-
- public List<TaskModel> getTaskList() {
- if (taskList == null) {
- synchronized (TaskModel.class){
- if (taskList == null) {
- taskList = getProcessModel().getTasks();
- }
- }
- }
- return taskList;
- }
-
-
- public List<String> getTaskExecutionOrder() {
- return taskExecutionOrder;
- }
-
- public void setTaskExecutionOrder(List<String> taskExecutionOrder) {
- this.taskExecutionOrder = taskExecutionOrder;
- }
-
- public Map<String, TaskModel> getTaskMap() {
- if (taskMap == null) {
- synchronized (TaskModel.class) {
- if (taskMap == null) {
- taskMap = new HashMap<>();
- for (TaskModel taskModel : getTaskList()) {
- taskMap.put(taskModel.getTaskId(), taskModel);
- }
- }
- }
- }
- return taskMap;
- }
-
- public JobModel getJobModel() {
- if (jobModel == null) {
- jobModel = new JobModel();
- jobModel.setProcessId(processId);
- jobModel.setWorkingDir(getWorkingDir());
- jobModel.setCreationTime(AiravataUtils.getCurrentTimestamp().getTime());
- }
- return jobModel;
- }
-
- public void setJobModel(JobModel jobModel) {
- this.jobModel = jobModel;
- }
-
- private ComputeResourcePreference getGatewayComputeResourcePreference() {
- return gatewayComputeResourcePreference;
- }
-
- public void setGatewayComputeResourcePreference(ComputeResourcePreference gatewayComputeResourcePreference) {
- this.gatewayComputeResourcePreference = gatewayComputeResourcePreference;
- }
-
- public ProcessState getProcessState() {
- if(processModel.getProcessStatuses() != null && processModel.getProcessStatuses().size() > 0)
- return processModel.getProcessStatuses().get(0).getState();
- else
- return null;
- }
-
- public void setProcessStatus(ProcessStatus status) {
- if (status != null) {
- log.info("expId: {}, processId: {} :- Process status changed {} -> {}", getExperimentId(), processId,
- getProcessState().name(), status.getState().name());
- List<ProcessStatus> processStatuses = new ArrayList<>();
- processStatuses.add(status);
- processModel.setProcessStatuses(processStatuses);
- }
- }
-
- public ProcessStatus getProcessStatus(){
- if(processModel.getProcessStatuses() != null)
- return processModel.getProcessStatuses().get(0);
- else
- return null;
- }
-
- public String getComputeResourceId() {
- if (isUseUserCRPref() &&
- userComputeResourcePreference != null &&
- isValid(userComputeResourcePreference.getComputeResourceId())) {
- return userComputeResourcePreference.getComputeResourceId();
- } else {
- return gatewayComputeResourcePreference.getComputeResourceId();
- }
- }
-
- public String getComputeResourceCredentialToken(){
- if (isUseUserCRPref()) {
- if (userComputeResourcePreference != null &&
- isValid(userComputeResourcePreference.getResourceSpecificCredentialStoreToken())) {
- return userComputeResourcePreference.getResourceSpecificCredentialStoreToken();
- } else {
- return userResourceProfile.getCredentialStoreToken();
- }
- } else {
- if (isValid(gatewayComputeResourcePreference.getResourceSpecificCredentialStoreToken())) {
- return gatewayComputeResourcePreference.getResourceSpecificCredentialStoreToken();
- } else {
- return gatewayResourceProfile.getCredentialStoreToken();
- }
- }
- }
-
- public String getStorageResourceCredentialToken(){
- if (isValid(gatewayStorageResourcePreference.getResourceSpecificCredentialStoreToken())) {
- return gatewayStorageResourcePreference.getResourceSpecificCredentialStoreToken();
- } else {
- return gatewayResourceProfile.getCredentialStoreToken();
- }
- }
-
- public JobSubmissionProtocol getPreferredJobSubmissionProtocol(){
- return gatewayComputeResourcePreference.getPreferredJobSubmissionProtocol();
- }
-
- public DataMovementProtocol getPreferredDataMovementProtocol() {
- return gatewayComputeResourcePreference.getPreferredDataMovementProtocol();
- }
-
- public void setMonitorMode(MonitorMode monitorMode) {
- this.monitorMode = monitorMode;
- }
-
- public MonitorMode getMonitorMode() {
- return monitorMode;
- }
-
- public void setResourceJobManager(ResourceJobManager resourceJobManager) {
- this.resourceJobManager = resourceJobManager;
- }
-
- public ResourceJobManager getResourceJobManager() {
- return resourceJobManager;
- }
-
- public String getLocalWorkingDir() {
- return localWorkingDir;
- }
-
- public void setLocalWorkingDir(String localWorkingDir) {
- this.localWorkingDir = localWorkingDir;
- }
-
- public String getExperimentId() {
- return processModel.getExperimentId();
- }
-
- public boolean isHandOver() {
- return handOver;
- }
-
- public void setHandOver(boolean handOver) {
- this.handOver = handOver;
- }
-
- public boolean isCancel() {
- return cancel;
- }
-
- public void setCancel(boolean cancel) {
- this.cancel = cancel;
- }
-
- public boolean isInterrupted(){
- return this.cancel || this.handOver;
- }
-
- public String getCurrentExecutingTaskId() {
- if (currentExecutingTaskModel != null) {
- return currentExecutingTaskModel.getTaskId();
- }
- return null;
- }
-
- public boolean isPauseTaskExecution() {
- return pauseTaskExecution;
- }
-
- public void setPauseTaskExecution(boolean pauseTaskExecution) {
- this.pauseTaskExecution = pauseTaskExecution;
- }
-
- public boolean isComplete() {
- return complete;
- }
-
- public void setComplete(boolean complete) {
- this.complete = complete;
- }
-
- public boolean isRecovery() {
- return recovery;
- }
-
- public void setRecovery(boolean recovery) {
- this.recovery = recovery;
- }
-
- public TaskModel getCurrentExecutingTaskModel() {
- return currentExecutingTaskModel;
- }
-
- public void setCurrentExecutingTaskModel(TaskModel currentExecutingTaskModel) {
- this.currentExecutingTaskModel = currentExecutingTaskModel;
- }
-
- public StorageResourceDescription getStorageResource() {
- return storageResource;
- }
-
- public void setStorageResource(StorageResourceDescription storageResource) {
- this.storageResource = storageResource;
- }
-
- public void setAcknowledge(boolean acknowledge) {
- this.acknowledge = acknowledge;
- }
-
- public boolean isAcknowledge() {
- return acknowledge;
- }
-
- public boolean isRecoveryWithCancel() {
- return recoveryWithCancel;
- }
-
- public void setRecoveryWithCancel(boolean recoveryWithCancel) {
- this.recoveryWithCancel = recoveryWithCancel;
- }
-
- public boolean isUseUserCRPref() {
- return getProcessModel().isUseUserCRPref();
- }
-
- public String getComputeResourceLoginUserName(){
- if (isUseUserCRPref() &&
- userComputeResourcePreference != null &&
- isValid(userComputeResourcePreference.getLoginUserName())) {
- return userComputeResourcePreference.getLoginUserName();
- } else if (isValid(processModel.getProcessResourceSchedule().getOverrideLoginUserName())) {
- return processModel.getProcessResourceSchedule().getOverrideLoginUserName();
- } else {
- return gatewayComputeResourcePreference.getLoginUserName();
- }
- }
-
- public String getStorageResourceLoginUserName(){
- return gatewayStorageResourcePreference.getLoginUserName();
- }
-
- public String getStorageFileSystemRootLocation(){
- return gatewayStorageResourcePreference.getFileSystemRootLocation();
- }
-
- public String getStorageResourceId() {
- return gatewayStorageResourcePreference.getStorageResourceId();
- }
-
- private ComputationalResourceSchedulingModel getProcessCRSchedule() {
- if (getProcessModel() != null) {
- return getProcessModel().getProcessResourceSchedule();
- } else {
- return null;
- }
- }
-
- public ServerInfo getComputeResourceServerInfo(){
- return new ServerInfo(getComputeResourceLoginUserName(),
- getComputeResourceDescription().getHostName(),
- getComputeResourceCredentialToken());
- }
-
- public ServerInfo getStorageResourceServerInfo() {
- return new ServerInfo(getStorageResourceLoginUserName(),
- getStorageResource().getHostName(),
- getStorageResourceCredentialToken());
- }
-
- private boolean isValid(String str) {
- return str != null && !str.trim().isEmpty();
- }
-
- public String getUsageReportingGatewayId() {
- return gatewayComputeResourcePreference.getUsageReportingGatewayId();
- }
-
- public String getAllocationProjectNumber() {
- return gatewayComputeResourcePreference.getAllocationProjectNumber();
- }
-
- public String getReservation() {
- long start = 0, end = 0;
- String reservation = null;
- if (isUseUserCRPref() &&
- userComputeResourcePreference != null &&
- isValid(userComputeResourcePreference.getReservation())) {
- reservation = userComputeResourcePreference.getReservation();
- start = userComputeResourcePreference.getReservationStartTime();
- end = userComputeResourcePreference.getReservationEndTime();
- } else {
- reservation = gatewayComputeResourcePreference.getReservation();
- start = gatewayComputeResourcePreference.getReservationStartTime();
- end = gatewayComputeResourcePreference.getReservationEndTime();
- }
- if (reservation != null && start > 0 && start < end) {
- long now = Calendar.getInstance().getTimeInMillis();
- if (now > start && now < end) {
- return reservation;
- }
- }
- return null;
- }
-
- public String getQualityOfService() {
- if (isUseUserCRPref() &&
- userComputeResourcePreference != null &&
- isValid(userComputeResourcePreference.getQualityOfService())) {
- return userComputeResourcePreference.getQualityOfService();
- } else {
- return gatewayComputeResourcePreference.getQualityOfService();
- }
- }
-
-
- public String getQueueName() {
- if (isUseUserCRPref() &&
- userComputeResourcePreference != null &&
- isValid(userComputeResourcePreference.getPreferredBatchQueue())) {
- return userComputeResourcePreference.getPreferredBatchQueue();
- } else if (isValid(processModel.getProcessResourceSchedule().getQueueName())) {
- return processModel.getProcessResourceSchedule().getQueueName();
- } else {
- return gatewayComputeResourcePreference.getPreferredBatchQueue();
- }
- }
-
- public static class ProcessContextBuilder{
- private final String processId;
- private final String gatewayId;
- private final String tokenId;
- private ExperimentCatalog experimentCatalog;
- private AppCatalog appCatalog;
- private CuratorFramework curatorClient;
- private Publisher statusPublisher;
- private GatewayResourceProfile gatewayResourceProfile;
- private ComputeResourcePreference gatewayComputeResourcePreference;
- private StoragePreference gatewayStorageResourcePreference;
- private ProcessModel processModel;
-
- public ProcessContextBuilder(String processId, String gatewayId, String tokenId) throws WorkerException {
- if (notValid(processId) || notValid(gatewayId) || notValid(tokenId)) {
- throwError("Process Id, Gateway Id and tokenId must be not null");
- }
- this.processId = processId;
- this.gatewayId = gatewayId;
- this.tokenId = tokenId;
- }
-
- public ProcessContextBuilder setGatewayResourceProfile(GatewayResourceProfile gatewayResourceProfile) {
- this.gatewayResourceProfile = gatewayResourceProfile;
- return this;
- }
-
- public ProcessContextBuilder setGatewayComputeResourcePreference(ComputeResourcePreference gatewayComputeResourcePreference) {
- this.gatewayComputeResourcePreference = gatewayComputeResourcePreference;
- return this;
- }
-
- public ProcessContextBuilder setGatewayStorageResourcePreference(StoragePreference gatewayStorageResourcePreference) {
- this.gatewayStorageResourcePreference = gatewayStorageResourcePreference;
- return this;
- }
-
- public ProcessContextBuilder setProcessModel(ProcessModel processModel) {
- this.processModel = processModel;
- return this;
- }
-
- public ProcessContextBuilder setExperimentCatalog(ExperimentCatalog experimentCatalog) {
- this.experimentCatalog = experimentCatalog;
- return this;
- }
-
- public ProcessContextBuilder setAppCatalog(AppCatalog appCatalog) {
- this.appCatalog = appCatalog;
- return this;
- }
-
- public ProcessContextBuilder setCuratorClient(CuratorFramework curatorClient) {
- this.curatorClient = curatorClient;
- return this;
- }
-
- public ProcessContextBuilder setStatusPublisher(Publisher statusPublisher) {
- this.statusPublisher = statusPublisher;
- return this;
- }
-
- public ProcessContext build() throws WorkerException {
- if (notValid(gatewayResourceProfile)) {
- throwError("Invalid GatewayResourceProfile");
- }
- if (notValid(gatewayComputeResourcePreference)) {
- throwError("Invalid Gateway ComputeResourcePreference");
- }
- if (notValid(gatewayStorageResourcePreference)) {
- throwError("Invalid Gateway StoragePreference");
- }
- if (notValid(processModel)) {
- throwError("Invalid Process Model");
- }
- if (notValid(appCatalog)) {
- throwError("Invalid AppCatalog");
- }
- if (notValid(experimentCatalog)) {
- throwError("Invalid Experiment catalog");
- }
- if (notValid(curatorClient)) {
- throwError("Invalid Curator Client");
- }
- if (notValid(statusPublisher)) {
- throwError("Invalid Status Publisher");
- }
-
- ProcessContext pc = new ProcessContext(processId, gatewayId, tokenId);
- pc.setAppCatalog(appCatalog);
- pc.setExperimentCatalog(experimentCatalog);
- pc.setCuratorClient(curatorClient);
- pc.setStatusPublisher(statusPublisher);
- pc.setProcessModel(processModel);
- pc.setGatewayResourceProfile(gatewayResourceProfile);
- pc.setGatewayComputeResourcePreference(gatewayComputeResourcePreference);
- pc.setGatewayStorageResourcePreference(gatewayStorageResourcePreference);
-
- return pc;
- }
-
- private boolean notValid(Object value) {
- return value == null;
- }
-
- private void throwError(String msg) throws WorkerException {
- throw new WorkerException(msg);
- }
-
- }
-}
-
http://git-wip-us.apache.org/repos/asf/airavata/blob/c99f18ab/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/context/TaskContext.java
----------------------------------------------------------------------
diff --git a/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/context/TaskContext.java b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/context/TaskContext.java
deleted file mode 100644
index f94ebd5..0000000
--- a/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/context/TaskContext.java
+++ /dev/null
@@ -1,139 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.airavata.worker.commons.context;
-
-import org.apache.airavata.common.utils.ThriftUtils;
-import org.apache.airavata.model.application.io.InputDataObjectType;
-import org.apache.airavata.model.application.io.OutputDataObjectType;
-import org.apache.airavata.model.status.TaskState;
-import org.apache.airavata.model.status.TaskStatus;
-import org.apache.airavata.model.task.TaskModel;
-import org.apache.airavata.model.task.TaskTypes;
-import org.apache.thrift.TException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.List;
-
-public class TaskContext {
- private static final Logger log = LoggerFactory.getLogger(TaskContext.class);
-
- private TaskModel taskModel;
- private ProcessContext parentProcessContext;
- private InputDataObjectType processInput;
- private OutputDataObjectType processOutput;
- private Object subTaskModel = null;
- private boolean isCancel = false;
-
- public TaskModel getTaskModel() {
- return taskModel;
- }
-
- public void setTaskModel(TaskModel taskModel) {
- this.taskModel = taskModel;
- }
-
- public ProcessContext getParentProcessContext() {
- return parentProcessContext;
- }
-
- public void setParentProcessContext(ProcessContext parentProcessContext) {
- this.parentProcessContext = parentProcessContext;
- }
-
- public String getWorkingDir() {
- return getParentProcessContext().getWorkingDir();
- }
-
- public void setTaskStatus(TaskStatus taskStatus) {
- log.info("expId: {}, processId: {}, taskId: {}, type: {} : Task status changed {} -> {}", parentProcessContext
- .getExperimentId(), parentProcessContext.getProcessId(), getTaskId(), getTaskType().name(),
- getTaskState().name(), taskStatus .getState().name());
- List<TaskStatus> taskStatuses = new ArrayList<>();
- taskStatuses.add(taskStatus);
- taskModel.setTaskStatuses(taskStatuses);
- }
-
- public TaskStatus getTaskStatus() {
- if(taskModel.getTaskStatuses() != null)
- return taskModel.getTaskStatuses().get(0);
- else
- return null;
- }
-
- public TaskState getTaskState() {
- if(taskModel.getTaskStatuses() != null)
- return taskModel.getTaskStatuses().get(0).getState();
- else
- return null;
- }
-
- public TaskTypes getTaskType() {
- return taskModel.getTaskType();
- }
-
- public String getTaskId() {
- return taskModel.getTaskId();
- }
-
- public String getLocalWorkingDir() {
- return getParentProcessContext().getLocalWorkingDir();
- }
-
- public InputDataObjectType getProcessInput() {
- return processInput;
- }
-
- public void setProcessInput(InputDataObjectType processInput) {
- this.processInput = processInput;
- }
-
- public OutputDataObjectType getProcessOutput() {
- return processOutput;
- }
-
- public void setProcessOutput(OutputDataObjectType processOutput) {
- this.processOutput = processOutput;
- }
-
- public String getProcessId() {
- return parentProcessContext.getProcessId();
- }
-
- public String getExperimentId() {
- return parentProcessContext.getExperimentId();
- }
-
- public Object getSubTaskModel() throws TException {
- if (subTaskModel == null) {
- subTaskModel = ThriftUtils.getSubTaskModel(getTaskModel());
- }
- return subTaskModel;
- }
-
- public boolean isCancel() {
- return isCancel;
- }
-
- public void setCancel(boolean cancel) {
- isCancel = cancel;
- }
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/c99f18ab/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/exceptions/SSHApiException.java
----------------------------------------------------------------------
diff --git a/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/exceptions/SSHApiException.java b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/exceptions/SSHApiException.java
deleted file mode 100644
index 0dcdd0e..0000000
--- a/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/exceptions/SSHApiException.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.airavata.worker.commons.exceptions;
-
-/**
- * An exception class to wrap SSH command execution related errors.
- */
-public class SSHApiException extends Exception {
-
- public SSHApiException(String message) {
- super(message);
- }
-
- public SSHApiException(String message, Exception e) {
- super(message, e);
- }
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/c99f18ab/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/exceptions/WorkerException.java
----------------------------------------------------------------------
diff --git a/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/exceptions/WorkerException.java b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/exceptions/WorkerException.java
deleted file mode 100644
index 334ee0f..0000000
--- a/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/exceptions/WorkerException.java
+++ /dev/null
@@ -1,47 +0,0 @@
-package org.apache.airavata.worker.commons.exceptions;
-
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class WorkerException extends Exception {
- private static final Logger log = LoggerFactory.getLogger(WorkerException.class);
-
- /**
- *
- */
- private static final long serialVersionUID = 1L;
-
- public WorkerException(String s) {
- super(s);
- }
-
- public WorkerException(Exception e) {
- super(e);
- log.error(e.getMessage(),e);
- }
-
- public WorkerException(String s, Throwable throwable) {
- super(s, throwable);
- log.error(s,throwable);
- }
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/c99f18ab/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/task/Task.java
----------------------------------------------------------------------
diff --git a/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/task/Task.java b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/task/Task.java
deleted file mode 100644
index b7a8b45..0000000
--- a/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/task/Task.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.airavata.worker.commons.task;
-
-import org.apache.airavata.model.status.TaskStatus;
-import org.apache.airavata.model.task.TaskTypes;
-import org.apache.airavata.worker.commons.context.TaskContext;
-
-import java.util.Map;
-
-/**
- * All Tasks should inherit this interface.
- */
-public interface Task {
-
- /**
- * Task initialization method, this method will be invoked after create a new task instance.
- * @param propertyMap
- * @throws TaskException
- */
- public void init(Map<String, String> propertyMap) throws TaskException;
-
- /**
- * This method will be called at the first time of task chain execution. This method should called before recover
- * method. For a given task chain execute method only call one time. recover method may be called more than once.
- * @param taskContext
- * @return completed task status if success otherwise failed task status.
- */
- public TaskStatus execute(TaskContext taskContext);
-
- /**
- * This methond will be invoked at recover path.Before this method is invoked, execute method should be invoked.
- * This method may be called zero or few time in a process chain.
- * @param taskContext
- * @return completed task status if success otherwise failed task status.
- */
- public TaskStatus recover(TaskContext taskContext);
-
- /**
- * Task type will be used to identify the task behaviour. eg : DATA_STAGING , JOB_SUBMISSION
- * @return type of this task object
- */
- public TaskTypes getType();
-
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/c99f18ab/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/task/TaskException.java
----------------------------------------------------------------------
diff --git a/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/task/TaskException.java b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/task/TaskException.java
deleted file mode 100644
index d290881..0000000
--- a/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/task/TaskException.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.airavata.worker.commons.task;
-
-public class TaskException extends Exception {
- private static final long serialVersionUID = 8662332011259328779L;
-
- public TaskException() {
- super();
- }
-
- public TaskException(String message) {
- super(message);
- }
-
- public TaskException(String message, Throwable cause) {
- super(message, cause);
- }
-
- public TaskException(Throwable cause) {
- super(cause);
- }
-
- protected TaskException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
- super(message, cause, enableSuppression, writableStackTrace);
- }
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/c99f18ab/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/utils/JobManagerConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/utils/JobManagerConfiguration.java b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/utils/JobManagerConfiguration.java
deleted file mode 100644
index 022cb79..0000000
--- a/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/utils/JobManagerConfiguration.java
+++ /dev/null
@@ -1,34 +0,0 @@
-package org.apache.airavata.worker.commons.utils;
-
-import org.apache.airavata.worker.commons.cluster.OutputParser;
-import org.apache.airavata.worker.commons.cluster.RawCommandInfo;
-
-/**
- * Created by goshenoy on 4/12/17.
- */
-public interface JobManagerConfiguration {
-
- public RawCommandInfo getCancelCommand(String jobID);
-
- public String getJobDescriptionTemplateName();
-
- public RawCommandInfo getMonitorCommand(String jobID);
-
- public RawCommandInfo getUserBasedMonitorCommand(String userName);
-
- public RawCommandInfo getJobIdMonitorCommand(String jobName, String userName);
-
- public String getScriptExtension();
-
- public RawCommandInfo getSubmitCommand(String workingDirectory, String pbsFilePath);
-
- public OutputParser getParser();
-
- public String getInstalledPath();
-
- public String getBaseCancelCommand();
-
- public String getBaseMonitorCommand();
-
- public String getBaseSubmitCommand();
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/c99f18ab/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/utils/WorkerConstants.java
----------------------------------------------------------------------
diff --git a/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/utils/WorkerConstants.java b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/utils/WorkerConstants.java
deleted file mode 100644
index 5fa5fc4..0000000
--- a/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/utils/WorkerConstants.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.airavata.worker.commons.utils;
-
-public class WorkerConstants {
- public static final String XPATH_EXPR_GLOBAL_INFLOW_HANDLERS = "/GFac/GlobalHandlers/InHandlers/Handler";
- public static final String XPATH_EXPR_GLOBAL_OUTFLOW_HANDLERS = "/GFac/GlobalHandlers/OutHandlers/Handler";
- public static final String XPATH_EXPR_DAEMON_HANDLERS = "/GFac/DaemonHandlers/Handler";
-
- public static final String XPATH_EXPR_APPLICATION_HANDLERS_START = "/GFac/Application[@name='";
- public static final String XPATH_EXPR_APPLICATION_INFLOW_HANDLERS_END = "']/InHandlers/Handler";
- public static final String XPATH_EXPR_APPLICATION_OUTFLOW_HANDLERS_END = "']/OutHandlers/Handler";
- public static final String XPATH_EXPR_APPLICATION_PROVIDER = "']/OutHandlers/Handler";
-
-
- public static final String XPATH_EXPR_PROVIDER_HANDLERS_START = "/GFac/Provider[@class='";
- public static final String XPATH_EXPR_PROVIDER_ON_HOST = "/GFac/Provider[@host='";
- public static final String XPATH_EXPR_PROVIDER_ON_SUBMISSION = "/GFac/Provider[@submission='";
- public static final String XPATH_EXPR_PROVIDER_INFLOW_HANDLERS_END = "']/InHandlers/Handler";
- public static final String XPATH_EXPR_PROVIDER_OUTFLOW_HANDLERS_END = "']/OutHandlers/Handler";
-
- public static final String GFAC_CONFIG_CLASS_ATTRIBUTE = "class";
- public static final String GFAC_CONFIG_SECURITY_ATTRIBUTE = "security";
- public static final String GFAC_CONFIG_SUBMISSION_ATTRIBUTE = "submission";
- public static final String GFAC_CONFIG_EXECUTION_MODE_ATTRIBUTE = "executionMode";
- public static final String GFAC_CONFIG_APPLICATION_NAME_ATTRIBUTE = "class";
- public static final String NEWLINE = System.getProperty("line.separator");
- public static final String INPUT_DATA_DIR_VAR_NAME = "input";
- public static final String OUTPUT_DATA_DIR_VAR_NAME = "output";
- public static final int DEFAULT_GSI_FTP_PORT = 2811;
- public static final String _127_0_0_1 = "127.0.0.1";
- public static final String LOCALHOST = "localhost";
-
- public static final String MULTIPLE_INPUTS_SPLITTER = ",";
-
- public static final String PROP_WORKFLOW_INSTANCE_ID = "workflow.instance.id";
- public static final String PROP_WORKFLOW_NODE_ID = "workflow.node.id";
- public static final String PROP_BROKER_URL = "broker.url";
- public static final String PROP_TOPIC = "topic";
- public static final String SPACE = " ";
- public static final int COMMAND_EXECUTION_TIMEOUT = 5;
- public static final String EXECUTABLE_NAME = "run.sh";
-
- public static final String TRUSTED_CERT_LOCATION = "trusted.cert.location";
- public static final String TRUSTED_CERTIFICATE_SYSTEM_PROPERTY = "X509_CERT_DIR";
- public static final String MYPROXY_SERVER = "myproxy.server";
- public static final String MYPROXY_SERVER_PORT = "myproxy.port";
- public static final String MYPROXY_USER = "myproxy.username";
- public static final String MYPROXY_PASS = "myproxy.password";
- public static final String MYPROXY_LIFE = "myproxy.life";
- /*
- * SSH properties
- */
- public static final String SSH_PRIVATE_KEY = "private.ssh.key";
- public static final String SSH_PUBLIC_KEY = "public.ssh.key";
- public static final String SSH_PRIVATE_KEY_PASS = "ssh.keypass";
- public static final String SSH_USER_NAME = "ssh.username";
- public static final String SSH_PASSWORD = "ssh.password";
- public static final String PROPERTY = "property";
- public static final String NAME = "name";
- public static final String VALUE = "value";
- public static final String OUTPUT_DATA_DIR = "output.location";
-
-
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/c99f18ab/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/utils/WorkerFactory.java
----------------------------------------------------------------------
diff --git a/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/utils/WorkerFactory.java b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/utils/WorkerFactory.java
deleted file mode 100644
index 4ab403f..0000000
--- a/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/utils/WorkerFactory.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.airavata.worker.commons.utils;
-
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.RemovalListener;
-import com.jcraft.jsch.Session;
-import org.apache.airavata.common.utils.ServerSettings;
-import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManagerType;
-import org.apache.airavata.worker.commons.config.ResourceConfig;
-import org.apache.airavata.worker.commons.config.WorkerYamlConfigruation;
-import org.apache.airavata.worker.commons.exceptions.WorkerException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
-/**
- * Created by goshenoy on 4/13/17.
- */
-public class WorkerFactory {
- private static final Logger log = LoggerFactory.getLogger(WorkerFactory.class);
-
- private static boolean isWorkerConfigurationLoaded = false;
- private static Map<ResourceJobManagerType, ResourceConfig> resources = new HashMap<>();
- private static Cache<String,Session> sessionCache;
-
- public static void loadConfiguration() throws WorkerException {
- if (!isWorkerConfigurationLoaded) {
- WorkerYamlConfigruation config = new WorkerYamlConfigruation();
- try {
- for (ResourceConfig resourceConfig : config.getResourceConfiguration()) {
- resources.put(resourceConfig.getJobManagerType(), resourceConfig);
- }
- } catch (Exception e) {
- throw new WorkerException("Worker config issue", e);
- }
-
- sessionCache = CacheBuilder.newBuilder()
- .expireAfterAccess(ServerSettings.getSessionCacheAccessTimeout(), TimeUnit.MINUTES)
- .removalListener((RemovalListener<String, Session>) removalNotification -> {
- if (removalNotification.getValue().isConnected()) {
- log.info("Disconnecting ssh session with key: " + removalNotification.getKey());
- removalNotification.getValue().disconnect();
- }
- log.info("Removed ssh session with key: " + removalNotification.getKey());
- })
- .build();
-
- isWorkerConfigurationLoaded = true;
- }
- }
-
- public static Map<ResourceJobManagerType, ResourceConfig> getResourceConfig() {
- return resources;
- }
-
- public static ResourceConfig getResourceConfig(ResourceJobManagerType resourceJobManagerType) {
- return resources.get(resourceJobManagerType);
- }
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/c99f18ab/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/utils/WorkerUtils.java
----------------------------------------------------------------------
diff --git a/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/utils/WorkerUtils.java b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/utils/WorkerUtils.java
deleted file mode 100644
index dd244fb..0000000
--- a/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/utils/WorkerUtils.java
+++ /dev/null
@@ -1,214 +0,0 @@
-package org.apache.airavata.worker.commons.utils;
-
-import org.apache.airavata.common.utils.AiravataUtils;
-import org.apache.airavata.messaging.core.MessageContext;
-import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManagerType;
-import org.apache.airavata.model.commons.ErrorModel;
-import org.apache.airavata.model.job.JobModel;
-import org.apache.airavata.model.messaging.event.*;
-import org.apache.airavata.model.status.*;
-import org.apache.airavata.registry.cpi.*;
-import org.apache.airavata.registry.cpi.utils.Constants;
-import org.apache.airavata.worker.commons.context.ProcessContext;
-import org.apache.airavata.worker.commons.context.TaskContext;
-import org.apache.airavata.worker.commons.exceptions.WorkerException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-/**
- * Created by goshenoy on 4/12/17.
- */
-public class WorkerUtils {
-
- private static final Logger logger = LoggerFactory.getLogger(WorkerUtils.class);
-
- public static void saveExperimentError(ProcessContext processContext, ErrorModel errorModel) throws WorkerException {
- try {
- ExperimentCatalog experimentCatalog = processContext.getExperimentCatalog();
- String experimentId = processContext.getExperimentId();
- errorModel.setErrorId(AiravataUtils.getId("EXP_ERROR"));
- experimentCatalog.add(ExpCatChildDataType.EXPERIMENT_ERROR, errorModel, experimentId);
- } catch (RegistryException e) {
- String msg = "expId: " + processContext.getExperimentId() + " processId: " + processContext.getProcessId()
- + " : - Error while updating experiment errors";
- throw new WorkerException(msg, e);
- }
- }
-
- public static void saveProcessError(ProcessContext processContext, ErrorModel errorModel) throws WorkerException {
- try {
- ExperimentCatalog experimentCatalog = processContext.getExperimentCatalog();
- errorModel.setErrorId(AiravataUtils.getId("PROCESS_ERROR"));
- experimentCatalog.add(ExpCatChildDataType.PROCESS_ERROR, errorModel, processContext.getProcessId());
- } catch (RegistryException e) {
- String msg = "expId: " + processContext.getExperimentId() + " processId: " + processContext.getProcessId()
- + " : - Error while updating process errors";
- throw new WorkerException(msg, e);
- }
- }
-
- public static void saveTaskError(TaskContext taskContext, ErrorModel errorModel) throws WorkerException {
- try {
- ExperimentCatalog experimentCatalog = taskContext.getParentProcessContext().getExperimentCatalog();
- String taskId = taskContext.getTaskId();
- errorModel.setErrorId(AiravataUtils.getId("TASK_ERROR"));
- experimentCatalog.add(ExpCatChildDataType.TASK_ERROR, errorModel, taskId);
- } catch (RegistryException e) {
- String msg = "expId: " + taskContext.getParentProcessContext().getExperimentId() + " processId: " + taskContext.getParentProcessContext().getProcessId() + " taskId: " + taskContext.getTaskId()
- + " : - Error while updating task errors";
- throw new WorkerException(msg, e);
- }
- }
-
- public static void handleProcessInterrupt(ProcessContext processContext) throws WorkerException {
- if (processContext.isCancel()) {
- ProcessStatus pStatus = new ProcessStatus(ProcessState.CANCELLING);
- pStatus.setReason("Process Cancel triggered");
- pStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
- processContext.setProcessStatus(pStatus);
- saveAndPublishProcessStatus(processContext);
- // do cancel operation here
-
- pStatus.setState(ProcessState.CANCELED);
- processContext.setProcessStatus(pStatus);
- pStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
- saveAndPublishProcessStatus(processContext);
- }else if (processContext.isHandOver()) {
-
- } else {
- logger.error("expId: {}, processId: {} :- Unknown process interrupt", processContext.getExperimentId(),
- processContext.getProcessId());
- }
- }
-
- public static JobModel getJobModel(ProcessContext processContext) throws RegistryException {
- ExperimentCatalog experimentCatalog = processContext.getExperimentCatalog();
- List<Object> objects = experimentCatalog.get(ExperimentCatalogModelType.JOB,
- Constants.FieldConstants.JobConstants.PROCESS_ID, processContext.getProcessId());
- List<JobModel> jobModels = new ArrayList<>();
- JobModel jobModel = null;
- if (objects != null) {
- for (Object object : objects) {
- jobModel = ((JobModel) object);
- if (jobModel.getJobId() != null || !jobModel.equals("")) {
- return jobModel;
- }
- }
- }
- return jobModel;
- }
-
- public static List<String> parseTaskDag(String taskDag) {
- // TODO - parse taskDag and create taskId list
- String[] tasks = taskDag.split(",");
- return Arrays.asList(tasks);
- }
-
- public static void saveAndPublishTaskStatus(TaskContext taskContext) throws WorkerException {
- try {
- TaskState state = taskContext.getTaskState();
- // first we save job jobModel to the registry for sa and then save the job status.
- ProcessContext processContext = taskContext.getParentProcessContext();
- ExperimentCatalog experimentCatalog = processContext.getExperimentCatalog();
- TaskStatus status = taskContext.getTaskStatus();
- if (status.getTimeOfStateChange() == 0 || status.getTimeOfStateChange() > 0 ){
- status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
- }else {
- status.setTimeOfStateChange(status.getTimeOfStateChange());
- }
- experimentCatalog.add(ExpCatChildDataType.TASK_STATUS, status, taskContext.getTaskId());
- TaskIdentifier identifier = new TaskIdentifier(taskContext.getTaskId(),
- processContext.getProcessId(), processContext.getProcessModel().getExperimentId(),
- processContext.getGatewayId());
- TaskStatusChangeEvent taskStatusChangeEvent = new TaskStatusChangeEvent(state,
- identifier);
- MessageContext msgCtx = new MessageContext(taskStatusChangeEvent, MessageType.TASK, AiravataUtils.getId
- (MessageType.TASK.name()), taskContext.getParentProcessContext().getGatewayId());
- msgCtx.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
- processContext.getStatusPublisher().publish(msgCtx);
- } catch (Exception e) {
- throw new WorkerException("Error persisting task status"
- + e.getLocalizedMessage(), e);
- }
- }
-
- public static void saveAndPublishProcessStatus(ProcessContext processContext) throws WorkerException {
- try {
- // first we save job jobModel to the registry for sa and then save the job status.
- ExperimentCatalog experimentCatalog = processContext.getExperimentCatalog();
- ProcessStatus status = processContext.getProcessStatus();
- if (status.getTimeOfStateChange() == 0 || status.getTimeOfStateChange() > 0 ){
- status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
- }else {
- status.setTimeOfStateChange(status.getTimeOfStateChange());
- }
- experimentCatalog.add(ExpCatChildDataType.PROCESS_STATUS, status, processContext.getProcessId());
- ProcessIdentifier identifier = new ProcessIdentifier(processContext.getProcessId(),
- processContext.getProcessModel().getExperimentId(),
- processContext.getGatewayId());
- ProcessStatusChangeEvent processStatusChangeEvent = new ProcessStatusChangeEvent(status.getState(), identifier);
- MessageContext msgCtx = new MessageContext(processStatusChangeEvent, MessageType.PROCESS,
- AiravataUtils.getId(MessageType.PROCESS.name()), processContext.getGatewayId());
- msgCtx.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
- processContext.getStatusPublisher().publish(msgCtx);
- } catch (Exception e) {
- throw new WorkerException("Error persisting process status"
- + e.getLocalizedMessage(), e);
- }
- }
-
- public static void saveJobStatus(ProcessContext processContext, JobModel jobModel) throws WorkerException {
- try {
- // first we save job jobModel to the registry for sa and then save the job status.
- JobStatus jobStatus = null;
- if(jobModel.getJobStatuses() != null)
- jobStatus = jobModel.getJobStatuses().get(0);
-
- ExperimentCatalog experimentCatalog = processContext.getExperimentCatalog();
- List<JobStatus> statuses = new ArrayList<>();
- statuses.add(jobStatus);
- jobModel.setJobStatuses(statuses);
- if (jobStatus.getTimeOfStateChange() == 0 || jobStatus.getTimeOfStateChange() > 0 ){
- jobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
- }else {
- jobStatus.setTimeOfStateChange(jobStatus.getTimeOfStateChange());
- }
- CompositeIdentifier ids = new CompositeIdentifier(jobModel.getTaskId(), jobModel.getJobId());
- experimentCatalog.add(ExpCatChildDataType.JOB_STATUS, jobStatus, ids);
- JobIdentifier identifier = new JobIdentifier(jobModel.getJobId(), jobModel.getTaskId(),
- processContext.getProcessId(), processContext.getProcessModel().getExperimentId(),
- processContext.getGatewayId());
- JobStatusChangeEvent jobStatusChangeEvent = new JobStatusChangeEvent(jobStatus.getJobState(), identifier);
- MessageContext msgCtx = new MessageContext(jobStatusChangeEvent, MessageType.JOB, AiravataUtils.getId
- (MessageType.JOB.name()), processContext.getGatewayId());
- msgCtx.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
- processContext.getStatusPublisher().publish(msgCtx);
- } catch (Exception e) {
- throw new WorkerException("Error persisting job status"
- + e.getLocalizedMessage(), e);
- }
- }
-
- public static String getTemplateFileName(ResourceJobManagerType resourceJobManagerType) {
- switch (resourceJobManagerType) {
- case FORK:
- return "UGE_Groovy.template";
- case PBS:
- return "PBS_Groovy.template";
- case SLURM:
- return "SLURM_Groovy.template";
- case UGE:
- return "UGE_Groovy.template";
- case LSF:
- return "LSF_Groovy.template";
- case CLOUD:
- return "CLOUD_Groovy.template";
- default:
- return null;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/c99f18ab/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/authentication/AuthenticationInfo.java
----------------------------------------------------------------------
diff --git a/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/authentication/AuthenticationInfo.java b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/authentication/AuthenticationInfo.java
new file mode 100644
index 0000000..51e94c8
--- /dev/null
+++ b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/authentication/AuthenticationInfo.java
@@ -0,0 +1,27 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.airavata.worker.commons.authentication;
+
+/**
+ * An empty interface that represents authentication data to the API.
+ */
+public interface AuthenticationInfo {
+
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/c99f18ab/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/authentication/GSIAuthenticationInfo.java
----------------------------------------------------------------------
diff --git a/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/authentication/GSIAuthenticationInfo.java b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/authentication/GSIAuthenticationInfo.java
new file mode 100644
index 0000000..db33fb2
--- /dev/null
+++ b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/authentication/GSIAuthenticationInfo.java
@@ -0,0 +1,43 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.airavata.worker.commons.authentication;
+
+import org.ietf.jgss.GSSCredential;
+
+import java.util.Properties;
+
+/**
+ * Authentication data. Could be MyProxy user name, password, could be GSSCredentials
+ * or could be SSH keys.
+ */
+public abstract class GSIAuthenticationInfo implements AuthenticationInfo {
+
+ public Properties properties = new Properties();
+
+ public abstract GSSCredential getCredentials() throws SecurityException;
+
+ public Properties getProperties() {
+ return properties;
+ }
+
+ public void setProperties(Properties properties) {
+ this.properties = properties;
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/c99f18ab/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/authentication/SSHKeyAuthentication.java
----------------------------------------------------------------------
diff --git a/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/authentication/SSHKeyAuthentication.java b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/authentication/SSHKeyAuthentication.java
new file mode 100644
index 0000000..57a567e
--- /dev/null
+++ b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/authentication/SSHKeyAuthentication.java
@@ -0,0 +1,90 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.airavata.worker.commons.authentication;
+
+/**
+ * User: AmilaJ (amilaj@apache.org)
+ * Date: 10/4/13
+ * Time: 2:39 PM
+ */
+
+/**
+ * Abstracts out common methods for SSH key authentication.
+ */
+public class SSHKeyAuthentication implements AuthenticationInfo {
+
+ private String userName;
+ private byte[] privateKey;
+ private byte[] publicKey;
+ private String passphrase;
+ private String knownHostsFilePath;
+ private String strictHostKeyChecking; // yes or no
+
+ public SSHKeyAuthentication() {
+ }
+
+ public String getUserName() {
+ return userName;
+ }
+
+ public void setUserName(String userName) {
+ this.userName = userName;
+ }
+
+ public byte[] getPrivateKey() {
+ return privateKey;
+ }
+
+ public void setPrivateKey(byte[] privateKey) {
+ this.privateKey = privateKey;
+ }
+
+ public byte[] getPublicKey() {
+ return publicKey;
+ }
+
+ public void setPublicKey(byte[] publicKey) {
+ this.publicKey = publicKey;
+ }
+
+ public String getPassphrase() {
+ return passphrase;
+ }
+
+ public void setPassphrase(String passphrase) {
+ this.passphrase = passphrase;
+ }
+
+ public String getKnownHostsFilePath() {
+ return knownHostsFilePath;
+ }
+
+ public void setKnownHostsFilePath(String knownHostsFilePath) {
+ this.knownHostsFilePath = knownHostsFilePath;
+ }
+
+ public String getStrictHostKeyChecking() {
+ return strictHostKeyChecking;
+ }
+
+ public void setStrictHostKeyChecking(String strictHostKeyChecking) {
+ this.strictHostKeyChecking = strictHostKeyChecking;
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/c99f18ab/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/authentication/SSHPasswordAuthentication.java
----------------------------------------------------------------------
diff --git a/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/authentication/SSHPasswordAuthentication.java b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/authentication/SSHPasswordAuthentication.java
new file mode 100644
index 0000000..2f9d372
--- /dev/null
+++ b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/authentication/SSHPasswordAuthentication.java
@@ -0,0 +1,42 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.airavata.worker.commons.authentication;
+
+/**
+ * Password authentication for vanilla SSH.
+ */
+public class SSHPasswordAuthentication implements AuthenticationInfo {
+
+ private String userName;
+ private String password;
+
+ public SSHPasswordAuthentication(String userName, String password) {
+ this.userName = userName;
+ this.password = password;
+ }
+
+ public String getUserName() {
+ return userName;
+ }
+
+ public String getPassword() {
+ return password;
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/c99f18ab/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/authentication/SSHPublicKeyAuthentication.java
----------------------------------------------------------------------
diff --git a/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/authentication/SSHPublicKeyAuthentication.java b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/authentication/SSHPublicKeyAuthentication.java
new file mode 100644
index 0000000..8134065
--- /dev/null
+++ b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/authentication/SSHPublicKeyAuthentication.java
@@ -0,0 +1,45 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.airavata.worker.commons.authentication;
+
+/**
+ * Public key authentication for vanilla SSH.
+ * The public key and private key are returned as byte arrays. Useful when we store private key/public key
+ * in a secure storage such as credential store. API user should implement this.
+ */
+public interface SSHPublicKeyAuthentication extends AuthenticationInfo {
+
+ /**
+ * Gets the public key as byte array.
+ * @param userName The user who is trying to SSH
+ * @param hostName The host which user wants to connect to.
+ * @return The public key as a byte array.
+ */
+ byte[] getPrivateKey(String userName, String hostName);
+
+ /**
+ * Gets the private key as byte array.
+ * @param userName The user who is trying to SSH
+ * @param hostName The host which user wants to connect to.
+ * @return The private key as a byte array.
+ */
+ byte[] getPublicKey(String userName, String hostName);
+
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/c99f18ab/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/authentication/SSHPublicKeyFileAuthentication.java
----------------------------------------------------------------------
diff --git a/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/authentication/SSHPublicKeyFileAuthentication.java b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/authentication/SSHPublicKeyFileAuthentication.java
new file mode 100644
index 0000000..1915d21
--- /dev/null
+++ b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/authentication/SSHPublicKeyFileAuthentication.java
@@ -0,0 +1,45 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.airavata.worker.commons.authentication;
+
+/**
+ * Public key authentication for vanilla SSH.
+ * The public key and private key stored files are returned. API user should implement this.
+ */
+public interface SSHPublicKeyFileAuthentication extends AuthenticationInfo {
+
+ /**
+ * The file which contains the public key.
+ * @param userName The user who is trying to SSH
+ * @param hostName The host which user wants to connect to.
+ * @return The name of the file which contains the public key.
+ */
+ String getPublicKeyFile(String userName, String hostName);
+
+ /**
+ * The file which contains the public key.
+ * @param userName The user who is trying to SSH
+ * @param hostName The host which user wants to connect to.
+ * @return The name of the file which contains the private key.
+ */
+ String getPrivateKeyFile(String userName, String hostName);
+
+
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/c99f18ab/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/cluster/AbstractRemoteCluster.java
----------------------------------------------------------------------
diff --git a/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/cluster/AbstractRemoteCluster.java b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/cluster/AbstractRemoteCluster.java
new file mode 100644
index 0000000..11f2c03
--- /dev/null
+++ b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/cluster/AbstractRemoteCluster.java
@@ -0,0 +1,45 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.airavata.worker.commons.cluster;
+
+import org.apache.airavata.worker.commons.authentication.AuthenticationInfo;
+import org.apache.airavata.worker.commons.utils.JobManagerConfiguration;
+
+public abstract class AbstractRemoteCluster implements RemoteCluster {
+
+ protected final OutputParser outputParser;
+ protected final AuthenticationInfo authenticationInfo;
+ protected final ServerInfo serverInfo;
+ protected final JobManagerConfiguration jobManagerConfiguration;
+
+ public AbstractRemoteCluster(ServerInfo serverInfo,
+ JobManagerConfiguration jobManagerConfiguration,
+ AuthenticationInfo authenticationInfo) {
+
+ this.serverInfo = serverInfo;
+ this.jobManagerConfiguration = jobManagerConfiguration;
+ this.authenticationInfo = authenticationInfo;
+ if (jobManagerConfiguration != null) {
+ this.outputParser = jobManagerConfiguration.getParser();
+ }else {
+ this.outputParser = null;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/c99f18ab/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/cluster/CommandInfo.java
----------------------------------------------------------------------
diff --git a/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/cluster/CommandInfo.java b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/cluster/CommandInfo.java
new file mode 100644
index 0000000..559430a
--- /dev/null
+++ b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/cluster/CommandInfo.java
@@ -0,0 +1,34 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.airavata.worker.commons.cluster;
+
+/**
+ * Encapsulates information about
+ */
+public interface CommandInfo {
+
+ /**
+ * Gets the executable command as a string.
+ * @return String encoded command. Should be able to execute
+ * directly on remote shell. Should includes appropriate parameters.
+ */
+ String getCommand();
+
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/c99f18ab/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/cluster/CommandOutput.java
----------------------------------------------------------------------
diff --git a/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/cluster/CommandOutput.java b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/cluster/CommandOutput.java
new file mode 100644
index 0000000..791616c
--- /dev/null
+++ b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/cluster/CommandOutput.java
@@ -0,0 +1,56 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.airavata.worker.commons.cluster;
+
+
+import com.jcraft.jsch.Channel;
+
+import java.io.OutputStream;
+
+/**
+ * Output of a certain command. TODO rethink
+ */
+public interface CommandOutput {
+
+ /**
+ * Gets the output of the command as a stream.
+ * @param channel Command output as a stream.
+ */
+ void onOutput(Channel channel);
+
+ /**
+ * Gets standard error as a output stream.
+ * @return Command error as a stream.
+ */
+ OutputStream getStandardError();
+
+ /**
+ * The command exit code.
+ * @param code The program exit code
+ */
+ void exitCode(int code);
+
+ /**
+ * Return the exit code of the command execution.
+ * @return exit code
+ */
+ int getExitCode();
+
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/c99f18ab/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/cluster/JobSubmissionOutput.java
----------------------------------------------------------------------
diff --git a/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/cluster/JobSubmissionOutput.java b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/cluster/JobSubmissionOutput.java
new file mode 100644
index 0000000..85cb033
--- /dev/null
+++ b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/cluster/JobSubmissionOutput.java
@@ -0,0 +1,87 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.airavata.worker.commons.cluster;
+
+public class JobSubmissionOutput {
+
+ private int exitCode = Integer.MIN_VALUE;
+ private String stdOut;
+ private String stdErr;
+ private String command;
+ private String jobId;
+ private boolean isJobSubmissionFailed;
+ private String failureReason;
+
+ public int getExitCode() {
+ return exitCode;
+ }
+
+ public void setExitCode(int exitCode) {
+ this.exitCode = exitCode;
+ }
+
+ public String getStdOut() {
+ return stdOut;
+ }
+
+ public void setStdOut(String stdOut) {
+ this.stdOut = stdOut;
+ }
+
+ public String getStdErr() {
+ return stdErr;
+ }
+
+ public void setStdErr(String stdErr) {
+ this.stdErr = stdErr;
+ }
+
+ public String getCommand() {
+ return command;
+ }
+
+ public void setCommand(String command) {
+ this.command = command;
+ }
+
+ public String getJobId() {
+ return jobId;
+ }
+
+ public void setJobId(String jobId) {
+ this.jobId = jobId;
+ }
+
+ public boolean isJobSubmissionFailed() {
+ return isJobSubmissionFailed;
+ }
+
+ public void setJobSubmissionFailed(boolean jobSubmissionFailed) {
+ isJobSubmissionFailed = jobSubmissionFailed;
+ }
+
+ public String getFailureReason() {
+ return failureReason;
+ }
+
+ public void setFailureReason(String failureReason) {
+ this.failureReason = failureReason;
+ }
+}