You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by go...@apache.org on 2017/04/13 17:37:05 UTC
[2/4] airavata git commit: Refactor airavata-worker-commons to
airavata-worker-core
http://git-wip-us.apache.org/repos/asf/airavata/blob/56c67173/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/cluster/JobSubmissionOutput.java
----------------------------------------------------------------------
diff --git a/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/cluster/JobSubmissionOutput.java b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/cluster/JobSubmissionOutput.java
new file mode 100644
index 0000000..85cb033
--- /dev/null
+++ b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/cluster/JobSubmissionOutput.java
@@ -0,0 +1,87 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.airavata.worker.commons.cluster;
+
+public class JobSubmissionOutput {
+
+ private int exitCode = Integer.MIN_VALUE;
+ private String stdOut;
+ private String stdErr;
+ private String command;
+ private String jobId;
+ private boolean isJobSubmissionFailed;
+ private String failureReason;
+
+ public int getExitCode() {
+ return exitCode;
+ }
+
+ public void setExitCode(int exitCode) {
+ this.exitCode = exitCode;
+ }
+
+ public String getStdOut() {
+ return stdOut;
+ }
+
+ public void setStdOut(String stdOut) {
+ this.stdOut = stdOut;
+ }
+
+ public String getStdErr() {
+ return stdErr;
+ }
+
+ public void setStdErr(String stdErr) {
+ this.stdErr = stdErr;
+ }
+
+ public String getCommand() {
+ return command;
+ }
+
+ public void setCommand(String command) {
+ this.command = command;
+ }
+
+ public String getJobId() {
+ return jobId;
+ }
+
+ public void setJobId(String jobId) {
+ this.jobId = jobId;
+ }
+
+ public boolean isJobSubmissionFailed() {
+ return isJobSubmissionFailed;
+ }
+
+ public void setJobSubmissionFailed(boolean jobSubmissionFailed) {
+ isJobSubmissionFailed = jobSubmissionFailed;
+ }
+
+ public String getFailureReason() {
+ return failureReason;
+ }
+
+ public void setFailureReason(String failureReason) {
+ this.failureReason = failureReason;
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/56c67173/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/cluster/OutputParser.java
----------------------------------------------------------------------
diff --git a/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/cluster/OutputParser.java b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/cluster/OutputParser.java
new file mode 100644
index 0000000..f6f1824
--- /dev/null
+++ b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/cluster/OutputParser.java
@@ -0,0 +1,68 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.airavata.worker.commons.cluster;
+
+import org.apache.airavata.gfac.core.GFacException;
+import org.apache.airavata.gfac.core.SSHApiException;
+import org.apache.airavata.model.status.JobStatus;
+
+import java.util.Map;
+
+public interface OutputParser {
+
+ /**
+ * This can be used to parseSingleJob the result of a job submission to get the JobID
+ * @param rawOutput
+ * @return
+ */
+ public String parseJobSubmission(String rawOutput)throws GFacException;
+
+
+ /**
+ * Parse output return by job submission task and identify jobSubmission failures.
+ * @param rawOutput
+ * @return true if job submission has been failed, false otherwise.
+ */
+ public boolean isJobSubmissionFailed(String rawOutput);
+
+
+ /**
+ * This can be used to get the job status from the output
+ * @param jobID
+ * @param rawOutput
+ */
+ public JobStatus parseJobStatus(String jobID, String rawOutput)throws GFacException;
+
+ /**
+ * This can be used to parseSingleJob a big output and get multipleJob statuses
+ * @param statusMap list of status map will return and key will be the job ID
+ * @param rawOutput
+ */
+ public void parseJobStatuses(String userName, Map<String, JobStatus> statusMap, String rawOutput)throws GFacException;
+
+ /**
+ * filter the jobId value of given JobName from rawOutput
+ * @param jobName
+ * @param rawOutput
+ * @return
+ * @throws SSHApiException
+ */
+ public String parseJobId(String jobName, String rawOutput) throws GFacException;
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/56c67173/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/cluster/RawCommandInfo.java
----------------------------------------------------------------------
diff --git a/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/cluster/RawCommandInfo.java b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/cluster/RawCommandInfo.java
new file mode 100644
index 0000000..6045f4e
--- /dev/null
+++ b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/cluster/RawCommandInfo.java
@@ -0,0 +1,45 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.airavata.worker.commons.cluster;
+
+/**
+ * The raw command information. String returned by getCommand is directly executed in SSH
+ * shell. E.g :- getCommand return string set for rawCommand - "/opt/torque/bin/qsub /home/ogce/test.pbs".
+ */
+public class RawCommandInfo implements CommandInfo {
+
+ private String rawCommand;
+
+ public RawCommandInfo(String cmd) {
+ this.rawCommand = cmd;
+ }
+
+ public String getCommand() {
+ return this.rawCommand;
+ }
+
+ public String getRawCommand() {
+ return rawCommand;
+ }
+
+ public void setRawCommand(String rawCommand) {
+ this.rawCommand = rawCommand;
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/56c67173/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/cluster/RemoteCluster.java
----------------------------------------------------------------------
diff --git a/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/cluster/RemoteCluster.java b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/cluster/RemoteCluster.java
new file mode 100644
index 0000000..936e1ce
--- /dev/null
+++ b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/cluster/RemoteCluster.java
@@ -0,0 +1,165 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.airavata.worker.commons.cluster;
+
+import com.jcraft.jsch.Session;
+import org.apache.airavata.model.status.JobStatus;
+import org.apache.airavata.worker.commons.authentication.AuthenticationInfo;
+import org.apache.airavata.worker.commons.exceptions.WorkerException;
+import org.apache.airavata.worker.commons.exceptions.SSHApiException;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * This interface represents a RemoteCluster machine
+ * End users of the API can implement this and come up with their own
+ * implementations, but mostly this interface is for internal usage.
+ */
+public interface RemoteCluster { // FIXME: replace SSHApiException with suitable exception.
+
+ /**
+ * This will submit a job to the cluster with a given pbs file and some parameters
+ *
+ * @param jobScriptFilePath path of the job script file
+ * @param workingDirectory working directory where pbs should has to copy
+ * @return jobId after successful job submission
+ * @throws SSHApiException throws exception during error
+ */
+ public JobSubmissionOutput submitBatchJob(String jobScriptFilePath, String workingDirectory) throws WorkerException;
+
+ /**
+ * This will copy the localFile to remoteFile location in configured cluster
+ *
+ * @param localFile local file path of the file which needs to copy to remote location
+ * @param remoteFile remote file location, this can be a directory too
+ * @throws SSHApiException throws exception during error
+ */
+ public void copyTo(String localFile, String remoteFile) throws WorkerException;
+
+ /**
+ * This will copy a remote file in path rFile to local file lFile
+ *
+ * @param remoteFile remote file path, this has to be a full qualified path
+ * @param localFile This is the local file to copy, this can be a directory too
+ */
+ public void copyFrom(String remoteFile, String localFile) throws WorkerException;
+
+ /**
+ * This wil copy source remote file to target remote file.
+ *
+ * @param sourceFile remote file path, this has to be a full qualified path
+ * @param destinationFile This is the local file to copy, this can be a directory too
+ * @param session jcraft session of other coner of thirdparty file transfer.
+ * @param inOrOut direction to file transfer , to the remote cluster(DIRECTION.IN) or from the remote cluster(DIRECTION.OUT)
+ *
+ */
+ public void scpThirdParty(String sourceFile,
+ String destinationFile,
+ Session session,
+ DIRECTION inOrOut,
+ boolean ignoreEmptyFile) throws WorkerException;
+
+ /**
+ * This will create directories in computing resources
+ *
+ * @param directoryPath the full qualified path for the directory user wants to create
+ * @throws SSHApiException throws during error
+ */
+ public void makeDirectory(String directoryPath) throws WorkerException;
+
+ /**
+ * This will delete the given job from the queue
+ *
+ * @param jobID jobId of the job which user wants to delete
+ * @return return the description of the deleted job
+ * @throws SSHApiException throws exception during error
+ */
+ public JobStatus cancelJob(String jobID) throws WorkerException;
+
+ /**
+ * This will get the job status of the the job associated with this jobId
+ *
+ * @param jobID jobId of the job user want to get the status
+ * @return job status of the given jobID
+ * @throws SSHApiException throws exception during error
+ */
+ public JobStatus getJobStatus(String jobID) throws WorkerException;
+
+ /**
+ * This will get the job status of the the job associated with this jobId
+ *
+ * @param jobName jobName of the job user want to get the status
+ * @return jobId of the given jobName
+ * @throws SSHApiException throws exception during error
+ */
+ public String getJobIdByJobName(String jobName, String userName) throws WorkerException;
+
+ /**
+ * This method can be used to poll the jobstatuses based on the given
+ * user but we should pass the jobID list otherwise we will get unwanted
+ * job statuses which submitted by different middleware outside apache
+ * airavata with the same uername which we are not considering
+ *
+ * @param userName userName of the jobs which required to get the status
+ * @param jobIDs precises set of jobIDs
+ */
+ public void getJobStatuses(String userName, Map<String, JobStatus> jobIDs) throws WorkerException;
+
+ /**
+ * This will list directories in computing resources
+ *
+ * @param directoryPath the full qualified path for the directory user wants to create
+ * @throws SSHApiException throws during error
+ */
+ public List<String> listDirectory(String directoryPath) throws WorkerException;
+
+ /**
+ * This method can use to execute custom command on remote compute resource.
+ * @param commandInfo
+ * @return <code>true</code> if command successfully executed, <code>false</code> otherwise.
+ * @throws SSHApiException
+ */
+ public boolean execute(CommandInfo commandInfo) throws WorkerException;
+
+ /**
+ * This method can be used to get created ssh session
+ * to reuse the created session.
+ */
+ public Session getSession() throws WorkerException;
+
+ /**
+ * This method can be used to close the connections initialized
+ * to handle graceful shutdown of the system
+ */
+ public void disconnect() throws WorkerException;
+
+ /**
+ * This gives the server Info
+ */
+ public ServerInfo getServerInfo();
+
+ public AuthenticationInfo getAuthentication();
+ enum DIRECTION {
+ TO,
+ FROM
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/56c67173/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/cluster/ServerInfo.java
----------------------------------------------------------------------
diff --git a/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/cluster/ServerInfo.java b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/cluster/ServerInfo.java
new file mode 100644
index 0000000..a451d2b
--- /dev/null
+++ b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/cluster/ServerInfo.java
@@ -0,0 +1,59 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.airavata.worker.commons.cluster;
+
+/**
+ * Encapsulate server information.
+ */
+public class ServerInfo {
+ private static int DEFAULT_PORT = 22;
+ private String host;
+ private String userName;
+ private int port;
+ private String credentialToken;
+
+ public ServerInfo(String userName, String host, String credentialToken) {
+ this(userName, host, credentialToken, DEFAULT_PORT);
+ }
+
+ public ServerInfo(String userName, String host, String credentialToken, int port) {
+ this.host = host;
+ this.userName = userName;
+ this.port = port;
+ this.credentialToken = credentialToken;
+ }
+
+ public String getHost() {
+ return host;
+ }
+
+ public String getUserName() {
+ return userName;
+ }
+
+ public int getPort() {
+ return port;
+ }
+
+ public String getCredentialToken() {
+ return credentialToken;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/56c67173/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/config/DataTransferTaskConfig.java
----------------------------------------------------------------------
diff --git a/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/config/DataTransferTaskConfig.java b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/config/DataTransferTaskConfig.java
new file mode 100644
index 0000000..1241435
--- /dev/null
+++ b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/config/DataTransferTaskConfig.java
@@ -0,0 +1,60 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.airavata.worker.commons.config;
+
+import org.apache.airavata.model.data.movement.DataMovementProtocol;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class DataTransferTaskConfig {
+ private DataMovementProtocol transferProtocol;
+ private String taskClass;
+ private Map<String,String> properties = new HashMap<>();
+
+
+ public DataMovementProtocol getTransferProtocol() {
+ return transferProtocol;
+ }
+
+ public void setTransferProtocol(DataMovementProtocol transferProtocol) {
+ this.transferProtocol = transferProtocol;
+ }
+
+ public String getTaskClass() {
+ return taskClass;
+ }
+
+ public void setTaskClass(String taskClass) {
+ this.taskClass = taskClass;
+ }
+
+ public void addProperty(String key, String value) {
+ properties.put(key, value);
+ }
+
+ public void addProperties(Map<String, String> propMap) {
+ propMap.forEach(properties::put);
+ }
+
+ public Map<String,String> getProperties(){
+ return properties;
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/56c67173/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/config/JobSubmitterTaskConfig.java
----------------------------------------------------------------------
diff --git a/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/config/JobSubmitterTaskConfig.java b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/config/JobSubmitterTaskConfig.java
new file mode 100644
index 0000000..5744345
--- /dev/null
+++ b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/config/JobSubmitterTaskConfig.java
@@ -0,0 +1,59 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.airavata.worker.commons.config;
+
+import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionProtocol;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class JobSubmitterTaskConfig {
+ private JobSubmissionProtocol submissionProtocol;
+ private String taskClass;
+ private Map<String,String> properties = new HashMap<>();
+
+ public JobSubmissionProtocol getSubmissionProtocol() {
+ return submissionProtocol;
+ }
+
+ public void setSubmissionProtocol(JobSubmissionProtocol submissionProtocol) {
+ this.submissionProtocol = submissionProtocol;
+ }
+
+ public String getTaskClass() {
+ return taskClass;
+ }
+
+ public void setTaskClass(String taskClass) {
+ this.taskClass = taskClass;
+ }
+
+ public void addProperty(String key, String value) {
+ properties.put(key, value);
+ }
+
+ public void addProperties(Map<String, String> propMap) {
+ propMap.forEach(properties::put);
+ }
+
+ public Map<String,String> getProperties(){
+ return properties;
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/56c67173/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/config/ResourceConfig.java
----------------------------------------------------------------------
diff --git a/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/config/ResourceConfig.java b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/config/ResourceConfig.java
new file mode 100644
index 0000000..12eed5a
--- /dev/null
+++ b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/config/ResourceConfig.java
@@ -0,0 +1,63 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.airavata.worker.commons.config;
+
+import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManagerType;
+
+import java.util.List;
+
+public class ResourceConfig {
+ private ResourceJobManagerType jobManagerType;
+ private String commandOutputParser;
+ private String emailParser;
+ private List<String> resourceEmailAddresses;
+
+ public ResourceJobManagerType getJobManagerType() {
+ return jobManagerType;
+ }
+
+ public void setJobManagerType(ResourceJobManagerType jobManagerType) {
+ this.jobManagerType = jobManagerType;
+ }
+
+ public String getCommandOutputParser() {
+ return commandOutputParser;
+ }
+
+ public void setCommandOutputParser(String commandOutputParser) {
+ this.commandOutputParser = commandOutputParser;
+ }
+
+ public String getEmailParser() {
+ return emailParser;
+ }
+
+ public void setEmailParser(String emailParser) {
+ this.emailParser = emailParser;
+ }
+
+ public List<String> getResourceEmailAddresses() {
+ return resourceEmailAddresses;
+ }
+
+ public void setResourceEmailAddresses(List<String> resourceEmailAddresses) {
+ this.resourceEmailAddresses = resourceEmailAddresses;
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/56c67173/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/config/WorkerYamlConfigruation.java
----------------------------------------------------------------------
diff --git a/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/config/WorkerYamlConfigruation.java b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/config/WorkerYamlConfigruation.java
new file mode 100644
index 0000000..5d2e372
--- /dev/null
+++ b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/config/WorkerYamlConfigruation.java
@@ -0,0 +1,150 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.airavata.worker.commons.config;
+
+import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionProtocol;
+import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManagerType;
+import org.apache.airavata.model.data.movement.DataMovementProtocol;
+import org.apache.airavata.worker.commons.exceptions.WorkerException;
+import org.yaml.snakeyaml.Yaml;
+
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+public class WorkerYamlConfigruation {
+
+ private static final String CONFIG = "config";
+ private static final String JOB_SUBMITTERS = "jobSubmitters";
+ private static final String SUBMISSIO_PROTOCOL = "submissionProtocol";
+ private static final String TASK_CLASS = "taskClass";
+ private static final String COMMON_TASKS = "commonTasks";
+ private static final String TYPE = "type";
+ private static final String FILE_TRANSFER_TASKS = "fileTransferTasks";
+ private static final String TRANSFER_PROTOCOL = "transferProtocol";
+ private static final String RESOURCES = "resources";
+ private static final String JOB_MANAGER_TYPE = "jobManagerType";
+ private static final String COMMAND_OUTPUT_PARSER = "commandOutputParser";
+ private static final String EMAIL_PARSER = "emailParser";
+ private static final String RESOURCE_EMAIL_ADDRESS = "resourceEmailAddresses";
+ private static final String PROPERTIES = "properties";
+
+ private List<JobSubmitterTaskConfig> jobSubmitters = new ArrayList<>();
+ private List<DataTransferTaskConfig> fileTransferTasks = new ArrayList<>();
+ private List<ResourceConfig> resources = new ArrayList<>();
+
+
+ public WorkerYamlConfigruation() throws WorkerException {
+ InputStream resourceAsStream = WorkerYamlConfigruation.class.getClassLoader().
+ getResourceAsStream("gfac-config.yaml");
+ parse(resourceAsStream);
+ }
+
+ private void parse(InputStream resourceAsStream) throws WorkerException {
+ if (resourceAsStream == null) {
+ throw new WorkerException("Configuration file{gfac-config.yaml} is not fund");
+ }
+ Yaml yaml = new Yaml();
+ Object load = yaml.load(resourceAsStream);
+ if (load == null) {
+ throw new WorkerException("Yaml configuration object null");
+ }
+
+ if (load instanceof Map) {
+ Map<String, Object> loadMap = (Map<String, Object>) load;
+ String identifier;
+ List<Map<String,Object >> jobSubYamls = (List<Map<String, Object>>) loadMap.get(JOB_SUBMITTERS);
+ JobSubmitterTaskConfig jobSubmitterTaskConfig;
+ if (jobSubYamls != null) {
+ for (Map<String, Object> jobSub : jobSubYamls) {
+ jobSubmitterTaskConfig = new JobSubmitterTaskConfig();
+ identifier = ((String) jobSub.get(SUBMISSIO_PROTOCOL));
+ jobSubmitterTaskConfig.setSubmissionProtocol(JobSubmissionProtocol.valueOf(identifier));
+ jobSubmitterTaskConfig.setTaskClass(((String) jobSub.get(TASK_CLASS)));
+ Object propertiesObj = jobSub.get(PROPERTIES);
+ List propertiesList;
+ if (propertiesObj instanceof List) {
+ propertiesList = ((List) propertiesObj);
+ if (propertiesList.size() > 0) {
+ Map<String, String> props = (Map<String, String>) propertiesList.get(0);
+ jobSubmitterTaskConfig.addProperties(props);
+ }
+ }
+ jobSubmitters.add(jobSubmitterTaskConfig);
+ }
+ }
+
+ List<Map<String, Object>> fileTransYamls = (List<Map<String, Object>>) loadMap.get(FILE_TRANSFER_TASKS);
+ DataTransferTaskConfig dataTransferTaskConfig;
+ if (fileTransYamls != null) {
+ for (Map<String, Object> fileTransConfig : fileTransYamls) {
+ dataTransferTaskConfig = new DataTransferTaskConfig();
+ identifier = ((String) fileTransConfig.get(TRANSFER_PROTOCOL));
+ dataTransferTaskConfig.setTransferProtocol(DataMovementProtocol.valueOf(identifier));
+ dataTransferTaskConfig.setTaskClass(((String) fileTransConfig.get(TASK_CLASS)));
+ Object propertiesObj = fileTransConfig.get(PROPERTIES);
+ List propertiesList;
+ if (propertiesObj instanceof List) {
+ propertiesList = (List) propertiesObj;
+ if (propertiesList.size() > 0) {
+ Map<String, String> props = (Map<String, String>) propertiesList.get(0);
+ dataTransferTaskConfig.addProperties(props);
+ }
+ }
+ fileTransferTasks.add(dataTransferTaskConfig);
+ }
+ }
+
+ List<Map<String, Object>> resourcesYaml = (List<Map<String, Object>>) loadMap.get(RESOURCES);
+ ResourceConfig resourceConfig;
+ if (resourcesYaml != null) {
+ for (Map<String, Object> resource : resourcesYaml) {
+ resourceConfig = new ResourceConfig();
+ identifier = resource.get(JOB_MANAGER_TYPE).toString();
+ resourceConfig.setJobManagerType(ResourceJobManagerType.valueOf(identifier));
+ resourceConfig.setCommandOutputParser(resource.get(COMMAND_OUTPUT_PARSER).toString());
+ Object emailParser = resource.get(EMAIL_PARSER);
+ if (emailParser != null){
+ resourceConfig.setEmailParser(emailParser.toString());
+ }
+ List<String> emailAddressList = (List<String>) resource.get(RESOURCE_EMAIL_ADDRESS);
+ resourceConfig.setResourceEmailAddresses(emailAddressList);
+ resources.add(resourceConfig);
+ }
+ }
+ }
+ }
+
+ public List<JobSubmitterTaskConfig> getJobSbumitters() {
+ return jobSubmitters;
+ }
+
+ public List<DataTransferTaskConfig> getFileTransferTasks() {
+ return fileTransferTasks;
+ }
+
+ public List<ResourceConfig> getResourceConfiguration() {
+ return resources;
+ }
+
+
+
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/56c67173/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/context/ProcessContext.java
----------------------------------------------------------------------
diff --git a/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/context/ProcessContext.java b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/context/ProcessContext.java
new file mode 100644
index 0000000..53bda5b
--- /dev/null
+++ b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/context/ProcessContext.java
@@ -0,0 +1,806 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.airavata.worker.commons.context;
+
+import org.apache.airavata.common.utils.AiravataUtils;
+import org.apache.airavata.messaging.core.Publisher;
+import org.apache.airavata.model.appcatalog.appdeployment.ApplicationDeploymentDescription;
+import org.apache.airavata.model.appcatalog.appinterface.ApplicationInterfaceDescription;
+import org.apache.airavata.model.appcatalog.computeresource.ComputeResourceDescription;
+import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionProtocol;
+import org.apache.airavata.model.appcatalog.computeresource.MonitorMode;
+import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManager;
+import org.apache.airavata.model.appcatalog.gatewayprofile.ComputeResourcePreference;
+import org.apache.airavata.model.appcatalog.gatewayprofile.GatewayResourceProfile;
+import org.apache.airavata.model.appcatalog.gatewayprofile.StoragePreference;
+import org.apache.airavata.model.appcatalog.storageresource.StorageResourceDescription;
+import org.apache.airavata.model.appcatalog.userresourceprofile.UserComputeResourcePreference;
+import org.apache.airavata.model.appcatalog.userresourceprofile.UserResourceProfile;
+import org.apache.airavata.model.appcatalog.userresourceprofile.UserStoragePreference;
+import org.apache.airavata.model.data.movement.DataMovementProtocol;
+import org.apache.airavata.model.job.JobModel;
+import org.apache.airavata.model.process.ProcessModel;
+import org.apache.airavata.model.scheduling.ComputationalResourceSchedulingModel;
+import org.apache.airavata.model.status.ProcessState;
+import org.apache.airavata.model.status.ProcessStatus;
+import org.apache.airavata.model.task.TaskModel;
+import org.apache.airavata.registry.cpi.AppCatalog;
+import org.apache.airavata.registry.cpi.ExperimentCatalog;
+import org.apache.airavata.worker.commons.authentication.SSHKeyAuthentication;
+import org.apache.airavata.worker.commons.cluster.RemoteCluster;
+import org.apache.airavata.worker.commons.cluster.ServerInfo;
+import org.apache.airavata.worker.commons.exceptions.WorkerException;
+import org.apache.curator.framework.CuratorFramework;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+
+public class ProcessContext {
+
+ private static final Logger log = LoggerFactory.getLogger(ProcessContext.class);
+ // process model
+ private ExperimentCatalog experimentCatalog;
+ private AppCatalog appCatalog;
+ private CuratorFramework curatorClient;
+ private Publisher statusPublisher;
+ private final String processId;
+ private final String gatewayId;
+ private final String tokenId;
+ private ProcessModel processModel;
+ private String workingDir;
+ private String scratchLocation;
+ private String inputDir;
+ private String outputDir;
+ private String localWorkingDir;
+ private GatewayResourceProfile gatewayResourceProfile;
+ private ComputeResourcePreference gatewayComputeResourcePreference;
+ private StoragePreference gatewayStorageResourcePreference;
+ private UserResourceProfile userResourceProfile;
+ private UserComputeResourcePreference userComputeResourcePreference;
+ private UserStoragePreference userStoragePreference;
+ private ComputeResourceDescription computeResourceDescription;
+ private ApplicationDeploymentDescription applicationDeploymentDescription;
+ private ApplicationInterfaceDescription applicationInterfaceDescription;
+ private RemoteCluster jobSubmissionRemoteCluster;
+ private RemoteCluster dataMovementRemoteCluster;
+ private Map<String, String> sshProperties;
+ private String stdoutLocation;
+ private String stderrLocation;
+ private JobSubmissionProtocol jobSubmissionProtocol;
+ private DataMovementProtocol dataMovementProtocol;
+ private JobModel jobModel;
+ private StorageResourceDescription storageResource;
+ private MonitorMode monitorMode;
+ private ResourceJobManager resourceJobManager;
+ private boolean handOver;
+ private boolean cancel;
+ private ServerInfo serverInfo;
+ private List<String> taskExecutionOrder;
+ private List<TaskModel> taskList;
+ private Map<String, TaskModel> taskMap;
+ private boolean pauseTaskExecution = false; // Task can pause task execution by setting this value
+ private boolean complete = false; // all tasks executed?
+ private boolean recovery = false; // is process in recovery mode?
+ private TaskModel currentExecutingTaskModel; // current execution task model in case we pause process execution we need this to continue process exectuion again
+ private boolean acknowledge;
+ private SSHKeyAuthentication sshKeyAuthentication;
+ private boolean recoveryWithCancel = false;
+ private String usageReportingGatewayId;
+
+ /**
+ * Note: process context property use lazy loading approach. In runtime you will see some properties as null
+ * unless you have access it previously. Once that property access using the api,it will be set to correct value.
+ */
+ private ProcessContext(String processId, String gatewayId, String tokenId) {
+ this.processId = processId;
+ this.gatewayId = gatewayId;
+ this.tokenId = tokenId;
+ }
+
+ public ExperimentCatalog getExperimentCatalog() {
+ return experimentCatalog;
+ }
+
+ public void setExperimentCatalog(ExperimentCatalog experimentCatalog) {
+ this.experimentCatalog = experimentCatalog;
+ }
+
+ public AppCatalog getAppCatalog() {
+ return appCatalog;
+ }
+
+ public void setAppCatalog(AppCatalog appCatalog) {
+ this.appCatalog = appCatalog;
+ }
+
+ public String getGatewayId() {
+ return gatewayId;
+ }
+
+ public String getTokenId() {
+ return tokenId;
+ }
+
+ public String getProcessId() {
+ return processId;
+ }
+
+ public CuratorFramework getCuratorClient() {
+ return curatorClient;
+ }
+
+ public void setCuratorClient(CuratorFramework curatorClient) {
+ this.curatorClient = curatorClient;
+ }
+
+ public Publisher getStatusPublisher() {
+ return statusPublisher;
+ }
+
+ public void setStatusPublisher(Publisher statusPublisher) {
+ this.statusPublisher = statusPublisher;
+ }
+
+ public ProcessModel getProcessModel() {
+ return processModel;
+ }
+
+ public void setProcessModel(ProcessModel processModel) {
+ this.processModel = processModel;
+ }
+
+ public String getWorkingDir() {
+ if (workingDir == null) {
+ if (processModel.getProcessResourceSchedule().getStaticWorkingDir() != null){
+ workingDir = processModel.getProcessResourceSchedule().getStaticWorkingDir();
+ }else {
+ String scratchLocation = getScratchLocation();
+ workingDir = (scratchLocation.endsWith("/") ? scratchLocation + processId : scratchLocation + "/" +
+ processId);
+ }
+ }
+ return workingDir;
+ }
+
+ public String getScratchLocation() {
+ if (scratchLocation == null) {
+ if (isUseUserCRPref() &&
+ userComputeResourcePreference != null &&
+ isValid(userComputeResourcePreference.getScratchLocation())) {
+ scratchLocation = userComputeResourcePreference.getScratchLocation();
+ } else if (isValid(processModel.getProcessResourceSchedule().getOverrideScratchLocation())) {
+ scratchLocation = processModel.getProcessResourceSchedule().getOverrideScratchLocation();
+ }else {
+ scratchLocation = gatewayComputeResourcePreference.getScratchLocation();
+ }
+ }
+ return scratchLocation;
+ }
+
+ public void setWorkingDir(String workingDir) {
+ this.workingDir = workingDir;
+ }
+
+ public GatewayResourceProfile getGatewayResourceProfile() {
+ return gatewayResourceProfile;
+ }
+
+ public void setGatewayResourceProfile(GatewayResourceProfile gatewayResourceProfile) {
+ this.gatewayResourceProfile = gatewayResourceProfile;
+ }
+
+ public UserResourceProfile getUserResourceProfile() {
+ return userResourceProfile;
+ }
+
+ public void setUserResourceProfile(UserResourceProfile userResourceProfile) {
+ this.userResourceProfile = userResourceProfile;
+ }
+
+ private UserComputeResourcePreference getUserComputeResourcePreference() {
+ return userComputeResourcePreference;
+ }
+
+ public void setUserComputeResourcePreference(UserComputeResourcePreference userComputeResourcePreference) {
+ this.userComputeResourcePreference = userComputeResourcePreference;
+ }
+
+ public UserStoragePreference getUserStoragePreference() {
+ return userStoragePreference;
+ }
+
+ public void setUserStoragePreference(UserStoragePreference userStoragePreference) {
+ this.userStoragePreference = userStoragePreference;
+ }
+
+ public StoragePreference getGatewayStorageResourcePreference() {
+ return gatewayStorageResourcePreference;
+ }
+
+ public void setGatewayStorageResourcePreference(StoragePreference gatewayStorageResourcePreference) {
+ this.gatewayStorageResourcePreference = gatewayStorageResourcePreference;
+ }
+
+ public RemoteCluster getJobSubmissionRemoteCluster() {
+ return jobSubmissionRemoteCluster;
+ }
+
+ public void setJobSubmissionRemoteCluster(RemoteCluster jobSubmissoinRemoteCluster) {
+ this.jobSubmissionRemoteCluster = jobSubmissoinRemoteCluster;
+ }
+
+ public RemoteCluster getDataMovementRemoteCluster() {
+ return dataMovementRemoteCluster;
+ }
+
+ public void setDataMovementRemoteCluster(RemoteCluster dataMovementRemoteCluster) {
+ this.dataMovementRemoteCluster = dataMovementRemoteCluster;
+ }
+
+ public Map<String, String> getSshProperties() {
+ return sshProperties;
+ }
+
+ public void setSshProperties(Map<String, String> sshProperties) {
+ this.sshProperties = sshProperties;
+ }
+
+ public ComputeResourceDescription getComputeResourceDescription() {
+ return computeResourceDescription;
+ }
+
+ public void setComputeResourceDescription(ComputeResourceDescription computeResourceDescription) {
+ this.computeResourceDescription = computeResourceDescription;
+ }
+
+ public ApplicationDeploymentDescription getApplicationDeploymentDescription() {
+ return applicationDeploymentDescription;
+ }
+
+ public void setApplicationDeploymentDescription(ApplicationDeploymentDescription
+ applicationDeploymentDescription) {
+ this.applicationDeploymentDescription = applicationDeploymentDescription;
+ }
+
+ public ApplicationInterfaceDescription getApplicationInterfaceDescription() {
+ return applicationInterfaceDescription;
+ }
+
+ public void setApplicationInterfaceDescription(ApplicationInterfaceDescription applicationInterfaceDescription) {
+ this.applicationInterfaceDescription = applicationInterfaceDescription;
+ }
+
+ public String getStdoutLocation() {
+ return stdoutLocation;
+ }
+
+ public void setStdoutLocation(String stdoutLocation) {
+ this.stdoutLocation = stdoutLocation;
+ }
+
+ public String getStderrLocation() {
+ return stderrLocation;
+ }
+
+ public void setStderrLocation(String stderrLocation) {
+ this.stderrLocation = stderrLocation;
+ }
+
+ public void setOutputDir(String outputDir) {
+ this.outputDir = outputDir;
+ }
+
+ public String getOutputDir() {
+ if (outputDir == null) {
+ outputDir = getWorkingDir();
+ }
+ return outputDir;
+ }
+
+ public String getInputDir() {
+ if (inputDir == null) {
+ inputDir = getWorkingDir();
+ }
+ return inputDir;
+ }
+
+ public void setInputDir(String inputDir) {
+ this.inputDir = inputDir;
+ }
+
+ public JobSubmissionProtocol getJobSubmissionProtocol() {
+ if (jobSubmissionProtocol == null) {
+ jobSubmissionProtocol = gatewayComputeResourcePreference.getPreferredJobSubmissionProtocol();
+ }
+ return jobSubmissionProtocol;
+ }
+
+ public void setJobSubmissionProtocol(JobSubmissionProtocol jobSubmissionProtocol) {
+ this.jobSubmissionProtocol = jobSubmissionProtocol;
+ }
+
+ public DataMovementProtocol getDataMovementProtocol() {
+ if (dataMovementProtocol == null) {
+ dataMovementProtocol = gatewayComputeResourcePreference.getPreferredDataMovementProtocol();
+ }
+ return dataMovementProtocol;
+ }
+
+ public void setDataMovementProtocol(DataMovementProtocol dataMovementProtocol) {
+ this.dataMovementProtocol = dataMovementProtocol;
+ }
+
+ public String getTaskDag() {
+ return getProcessModel().getTaskDag();
+ }
+
+ public List<TaskModel> getTaskList() {
+ if (taskList == null) {
+ synchronized (TaskModel.class){
+ if (taskList == null) {
+ taskList = getProcessModel().getTasks();
+ }
+ }
+ }
+ return taskList;
+ }
+
+
+ public List<String> getTaskExecutionOrder() {
+ return taskExecutionOrder;
+ }
+
+ public void setTaskExecutionOrder(List<String> taskExecutionOrder) {
+ this.taskExecutionOrder = taskExecutionOrder;
+ }
+
+ public Map<String, TaskModel> getTaskMap() {
+ if (taskMap == null) {
+ synchronized (TaskModel.class) {
+ if (taskMap == null) {
+ taskMap = new HashMap<>();
+ for (TaskModel taskModel : getTaskList()) {
+ taskMap.put(taskModel.getTaskId(), taskModel);
+ }
+ }
+ }
+ }
+ return taskMap;
+ }
+
+ public JobModel getJobModel() {
+ if (jobModel == null) {
+ jobModel = new JobModel();
+ jobModel.setProcessId(processId);
+ jobModel.setWorkingDir(getWorkingDir());
+ jobModel.setCreationTime(AiravataUtils.getCurrentTimestamp().getTime());
+ }
+ return jobModel;
+ }
+
+ public void setJobModel(JobModel jobModel) {
+ this.jobModel = jobModel;
+ }
+
+ private ComputeResourcePreference getGatewayComputeResourcePreference() {
+ return gatewayComputeResourcePreference;
+ }
+
+ public void setGatewayComputeResourcePreference(ComputeResourcePreference gatewayComputeResourcePreference) {
+ this.gatewayComputeResourcePreference = gatewayComputeResourcePreference;
+ }
+
+ public ProcessState getProcessState() {
+ if(processModel.getProcessStatuses() != null && processModel.getProcessStatuses().size() > 0)
+ return processModel.getProcessStatuses().get(0).getState();
+ else
+ return null;
+ }
+
+ public void setProcessStatus(ProcessStatus status) {
+ if (status != null) {
+ log.info("expId: {}, processId: {} :- Process status changed {} -> {}", getExperimentId(), processId,
+ getProcessState().name(), status.getState().name());
+ List<ProcessStatus> processStatuses = new ArrayList<>();
+ processStatuses.add(status);
+ processModel.setProcessStatuses(processStatuses);
+ }
+ }
+
+ public ProcessStatus getProcessStatus(){
+ if(processModel.getProcessStatuses() != null)
+ return processModel.getProcessStatuses().get(0);
+ else
+ return null;
+ }
+
+ public String getComputeResourceId() {
+ if (isUseUserCRPref() &&
+ userComputeResourcePreference != null &&
+ isValid(userComputeResourcePreference.getComputeResourceId())) {
+ return userComputeResourcePreference.getComputeResourceId();
+ } else {
+ return gatewayComputeResourcePreference.getComputeResourceId();
+ }
+ }
+
+ public String getComputeResourceCredentialToken(){
+ if (isUseUserCRPref()) {
+ if (userComputeResourcePreference != null &&
+ isValid(userComputeResourcePreference.getResourceSpecificCredentialStoreToken())) {
+ return userComputeResourcePreference.getResourceSpecificCredentialStoreToken();
+ } else {
+ return userResourceProfile.getCredentialStoreToken();
+ }
+ } else {
+ if (isValid(gatewayComputeResourcePreference.getResourceSpecificCredentialStoreToken())) {
+ return gatewayComputeResourcePreference.getResourceSpecificCredentialStoreToken();
+ } else {
+ return gatewayResourceProfile.getCredentialStoreToken();
+ }
+ }
+ }
+
+ public String getStorageResourceCredentialToken(){
+ if (isValid(gatewayStorageResourcePreference.getResourceSpecificCredentialStoreToken())) {
+ return gatewayStorageResourcePreference.getResourceSpecificCredentialStoreToken();
+ } else {
+ return gatewayResourceProfile.getCredentialStoreToken();
+ }
+ }
+
+ public JobSubmissionProtocol getPreferredJobSubmissionProtocol(){
+ return gatewayComputeResourcePreference.getPreferredJobSubmissionProtocol();
+ }
+
+ public DataMovementProtocol getPreferredDataMovementProtocol() {
+ return gatewayComputeResourcePreference.getPreferredDataMovementProtocol();
+ }
+
+ public void setMonitorMode(MonitorMode monitorMode) {
+ this.monitorMode = monitorMode;
+ }
+
+ public MonitorMode getMonitorMode() {
+ return monitorMode;
+ }
+
+ public void setResourceJobManager(ResourceJobManager resourceJobManager) {
+ this.resourceJobManager = resourceJobManager;
+ }
+
+ public ResourceJobManager getResourceJobManager() {
+ return resourceJobManager;
+ }
+
+ public String getLocalWorkingDir() {
+ return localWorkingDir;
+ }
+
+ public void setLocalWorkingDir(String localWorkingDir) {
+ this.localWorkingDir = localWorkingDir;
+ }
+
+ public String getExperimentId() {
+ return processModel.getExperimentId();
+ }
+
+ public boolean isHandOver() {
+ return handOver;
+ }
+
+ public void setHandOver(boolean handOver) {
+ this.handOver = handOver;
+ }
+
+ public boolean isCancel() {
+ return cancel;
+ }
+
+ public void setCancel(boolean cancel) {
+ this.cancel = cancel;
+ }
+
+ public boolean isInterrupted(){
+ return this.cancel || this.handOver;
+ }
+
+ public String getCurrentExecutingTaskId() {
+ if (currentExecutingTaskModel != null) {
+ return currentExecutingTaskModel.getTaskId();
+ }
+ return null;
+ }
+
+ public boolean isPauseTaskExecution() {
+ return pauseTaskExecution;
+ }
+
+ public void setPauseTaskExecution(boolean pauseTaskExecution) {
+ this.pauseTaskExecution = pauseTaskExecution;
+ }
+
+ public boolean isComplete() {
+ return complete;
+ }
+
+ public void setComplete(boolean complete) {
+ this.complete = complete;
+ }
+
+ public boolean isRecovery() {
+ return recovery;
+ }
+
+ public void setRecovery(boolean recovery) {
+ this.recovery = recovery;
+ }
+
+ public TaskModel getCurrentExecutingTaskModel() {
+ return currentExecutingTaskModel;
+ }
+
+ public void setCurrentExecutingTaskModel(TaskModel currentExecutingTaskModel) {
+ this.currentExecutingTaskModel = currentExecutingTaskModel;
+ }
+
+ public StorageResourceDescription getStorageResource() {
+ return storageResource;
+ }
+
+ public void setStorageResource(StorageResourceDescription storageResource) {
+ this.storageResource = storageResource;
+ }
+
+ public void setAcknowledge(boolean acknowledge) {
+ this.acknowledge = acknowledge;
+ }
+
+ public boolean isAcknowledge() {
+ return acknowledge;
+ }
+
+ public boolean isRecoveryWithCancel() {
+ return recoveryWithCancel;
+ }
+
+ public void setRecoveryWithCancel(boolean recoveryWithCancel) {
+ this.recoveryWithCancel = recoveryWithCancel;
+ }
+
+ public boolean isUseUserCRPref() {
+ return getProcessModel().isUseUserCRPref();
+ }
+
+ public String getComputeResourceLoginUserName(){
+ if (isUseUserCRPref() &&
+ userComputeResourcePreference != null &&
+ isValid(userComputeResourcePreference.getLoginUserName())) {
+ return userComputeResourcePreference.getLoginUserName();
+ } else if (isValid(processModel.getProcessResourceSchedule().getOverrideLoginUserName())) {
+ return processModel.getProcessResourceSchedule().getOverrideLoginUserName();
+ } else {
+ return gatewayComputeResourcePreference.getLoginUserName();
+ }
+ }
+
+ public String getStorageResourceLoginUserName(){
+ return gatewayStorageResourcePreference.getLoginUserName();
+ }
+
+ public String getStorageFileSystemRootLocation(){
+ return gatewayStorageResourcePreference.getFileSystemRootLocation();
+ }
+
+ public String getStorageResourceId() {
+ return gatewayStorageResourcePreference.getStorageResourceId();
+ }
+
+ private ComputationalResourceSchedulingModel getProcessCRSchedule() {
+ if (getProcessModel() != null) {
+ return getProcessModel().getProcessResourceSchedule();
+ } else {
+ return null;
+ }
+ }
+
+ public ServerInfo getComputeResourceServerInfo(){
+ return new ServerInfo(getComputeResourceLoginUserName(),
+ getComputeResourceDescription().getHostName(),
+ getComputeResourceCredentialToken());
+ }
+
+ public ServerInfo getStorageResourceServerInfo() {
+ return new ServerInfo(getStorageResourceLoginUserName(),
+ getStorageResource().getHostName(),
+ getStorageResourceCredentialToken());
+ }
+
+ private boolean isValid(String str) {
+ return str != null && !str.trim().isEmpty();
+ }
+
+ public String getUsageReportingGatewayId() {
+ return gatewayComputeResourcePreference.getUsageReportingGatewayId();
+ }
+
+ public String getAllocationProjectNumber() {
+ return gatewayComputeResourcePreference.getAllocationProjectNumber();
+ }
+
+ public String getReservation() {
+ long start = 0, end = 0;
+ String reservation = null;
+ if (isUseUserCRPref() &&
+ userComputeResourcePreference != null &&
+ isValid(userComputeResourcePreference.getReservation())) {
+ reservation = userComputeResourcePreference.getReservation();
+ start = userComputeResourcePreference.getReservationStartTime();
+ end = userComputeResourcePreference.getReservationEndTime();
+ } else {
+ reservation = gatewayComputeResourcePreference.getReservation();
+ start = gatewayComputeResourcePreference.getReservationStartTime();
+ end = gatewayComputeResourcePreference.getReservationEndTime();
+ }
+ if (reservation != null && start > 0 && start < end) {
+ long now = Calendar.getInstance().getTimeInMillis();
+ if (now > start && now < end) {
+ return reservation;
+ }
+ }
+ return null;
+ }
+
+ public String getQualityOfService() {
+ if (isUseUserCRPref() &&
+ userComputeResourcePreference != null &&
+ isValid(userComputeResourcePreference.getQualityOfService())) {
+ return userComputeResourcePreference.getQualityOfService();
+ } else {
+ return gatewayComputeResourcePreference.getQualityOfService();
+ }
+ }
+
+
+ public String getQueueName() {
+ if (isUseUserCRPref() &&
+ userComputeResourcePreference != null &&
+ isValid(userComputeResourcePreference.getPreferredBatchQueue())) {
+ return userComputeResourcePreference.getPreferredBatchQueue();
+ } else if (isValid(processModel.getProcessResourceSchedule().getQueueName())) {
+ return processModel.getProcessResourceSchedule().getQueueName();
+ } else {
+ return gatewayComputeResourcePreference.getPreferredBatchQueue();
+ }
+ }
+
+ public static class ProcessContextBuilder{
+ private final String processId;
+ private final String gatewayId;
+ private final String tokenId;
+ private ExperimentCatalog experimentCatalog;
+ private AppCatalog appCatalog;
+ private CuratorFramework curatorClient;
+ private Publisher statusPublisher;
+ private GatewayResourceProfile gatewayResourceProfile;
+ private ComputeResourcePreference gatewayComputeResourcePreference;
+ private StoragePreference gatewayStorageResourcePreference;
+ private ProcessModel processModel;
+
+ public ProcessContextBuilder(String processId, String gatewayId, String tokenId) throws WorkerException {
+ if (notValid(processId) || notValid(gatewayId) || notValid(tokenId)) {
+ throwError("Process Id, Gateway Id and tokenId must be not null");
+ }
+ this.processId = processId;
+ this.gatewayId = gatewayId;
+ this.tokenId = tokenId;
+ }
+
+ public ProcessContextBuilder setGatewayResourceProfile(GatewayResourceProfile gatewayResourceProfile) {
+ this.gatewayResourceProfile = gatewayResourceProfile;
+ return this;
+ }
+
+ public ProcessContextBuilder setGatewayComputeResourcePreference(ComputeResourcePreference gatewayComputeResourcePreference) {
+ this.gatewayComputeResourcePreference = gatewayComputeResourcePreference;
+ return this;
+ }
+
+ public ProcessContextBuilder setGatewayStorageResourcePreference(StoragePreference gatewayStorageResourcePreference) {
+ this.gatewayStorageResourcePreference = gatewayStorageResourcePreference;
+ return this;
+ }
+
+ public ProcessContextBuilder setProcessModel(ProcessModel processModel) {
+ this.processModel = processModel;
+ return this;
+ }
+
+ public ProcessContextBuilder setExperimentCatalog(ExperimentCatalog experimentCatalog) {
+ this.experimentCatalog = experimentCatalog;
+ return this;
+ }
+
+ public ProcessContextBuilder setAppCatalog(AppCatalog appCatalog) {
+ this.appCatalog = appCatalog;
+ return this;
+ }
+
+ public ProcessContextBuilder setCuratorClient(CuratorFramework curatorClient) {
+ this.curatorClient = curatorClient;
+ return this;
+ }
+
+ public ProcessContextBuilder setStatusPublisher(Publisher statusPublisher) {
+ this.statusPublisher = statusPublisher;
+ return this;
+ }
+
+ public ProcessContext build() throws WorkerException {
+ if (notValid(gatewayResourceProfile)) {
+ throwError("Invalid GatewayResourceProfile");
+ }
+ if (notValid(gatewayComputeResourcePreference)) {
+ throwError("Invalid Gateway ComputeResourcePreference");
+ }
+ if (notValid(gatewayStorageResourcePreference)) {
+ throwError("Invalid Gateway StoragePreference");
+ }
+ if (notValid(processModel)) {
+ throwError("Invalid Process Model");
+ }
+ if (notValid(appCatalog)) {
+ throwError("Invalid AppCatalog");
+ }
+ if (notValid(experimentCatalog)) {
+ throwError("Invalid Experiment catalog");
+ }
+ if (notValid(curatorClient)) {
+ throwError("Invalid Curator Client");
+ }
+ if (notValid(statusPublisher)) {
+ throwError("Invalid Status Publisher");
+ }
+
+ ProcessContext pc = new ProcessContext(processId, gatewayId, tokenId);
+ pc.setAppCatalog(appCatalog);
+ pc.setExperimentCatalog(experimentCatalog);
+ pc.setCuratorClient(curatorClient);
+ pc.setStatusPublisher(statusPublisher);
+ pc.setProcessModel(processModel);
+ pc.setGatewayResourceProfile(gatewayResourceProfile);
+ pc.setGatewayComputeResourcePreference(gatewayComputeResourcePreference);
+ pc.setGatewayStorageResourcePreference(gatewayStorageResourcePreference);
+
+ return pc;
+ }
+
+ private boolean notValid(Object value) {
+ return value == null;
+ }
+
+ private void throwError(String msg) throws WorkerException {
+ throw new WorkerException(msg);
+ }
+
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/airavata/blob/56c67173/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/context/TaskContext.java
----------------------------------------------------------------------
diff --git a/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/context/TaskContext.java b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/context/TaskContext.java
new file mode 100644
index 0000000..f94ebd5
--- /dev/null
+++ b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/context/TaskContext.java
@@ -0,0 +1,139 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.airavata.worker.commons.context;
+
+import org.apache.airavata.common.utils.ThriftUtils;
+import org.apache.airavata.model.application.io.InputDataObjectType;
+import org.apache.airavata.model.application.io.OutputDataObjectType;
+import org.apache.airavata.model.status.TaskState;
+import org.apache.airavata.model.status.TaskStatus;
+import org.apache.airavata.model.task.TaskModel;
+import org.apache.airavata.model.task.TaskTypes;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class TaskContext {
+ private static final Logger log = LoggerFactory.getLogger(TaskContext.class);
+
+ private TaskModel taskModel;
+ private ProcessContext parentProcessContext;
+ private InputDataObjectType processInput;
+ private OutputDataObjectType processOutput;
+ private Object subTaskModel = null;
+ private boolean isCancel = false;
+
+ public TaskModel getTaskModel() {
+ return taskModel;
+ }
+
+ public void setTaskModel(TaskModel taskModel) {
+ this.taskModel = taskModel;
+ }
+
+ public ProcessContext getParentProcessContext() {
+ return parentProcessContext;
+ }
+
+ public void setParentProcessContext(ProcessContext parentProcessContext) {
+ this.parentProcessContext = parentProcessContext;
+ }
+
+ public String getWorkingDir() {
+ return getParentProcessContext().getWorkingDir();
+ }
+
+ public void setTaskStatus(TaskStatus taskStatus) {
+ log.info("expId: {}, processId: {}, taskId: {}, type: {} : Task status changed {} -> {}", parentProcessContext
+ .getExperimentId(), parentProcessContext.getProcessId(), getTaskId(), getTaskType().name(),
+ getTaskState().name(), taskStatus .getState().name());
+ List<TaskStatus> taskStatuses = new ArrayList<>();
+ taskStatuses.add(taskStatus);
+ taskModel.setTaskStatuses(taskStatuses);
+ }
+
+ public TaskStatus getTaskStatus() {
+ if(taskModel.getTaskStatuses() != null)
+ return taskModel.getTaskStatuses().get(0);
+ else
+ return null;
+ }
+
+ public TaskState getTaskState() {
+ if(taskModel.getTaskStatuses() != null)
+ return taskModel.getTaskStatuses().get(0).getState();
+ else
+ return null;
+ }
+
+ public TaskTypes getTaskType() {
+ return taskModel.getTaskType();
+ }
+
+ public String getTaskId() {
+ return taskModel.getTaskId();
+ }
+
+ public String getLocalWorkingDir() {
+ return getParentProcessContext().getLocalWorkingDir();
+ }
+
+ public InputDataObjectType getProcessInput() {
+ return processInput;
+ }
+
+ public void setProcessInput(InputDataObjectType processInput) {
+ this.processInput = processInput;
+ }
+
+ public OutputDataObjectType getProcessOutput() {
+ return processOutput;
+ }
+
+ public void setProcessOutput(OutputDataObjectType processOutput) {
+ this.processOutput = processOutput;
+ }
+
+ public String getProcessId() {
+ return parentProcessContext.getProcessId();
+ }
+
+ public String getExperimentId() {
+ return parentProcessContext.getExperimentId();
+ }
+
+ public Object getSubTaskModel() throws TException {
+ if (subTaskModel == null) {
+ subTaskModel = ThriftUtils.getSubTaskModel(getTaskModel());
+ }
+ return subTaskModel;
+ }
+
+ public boolean isCancel() {
+ return isCancel;
+ }
+
+ public void setCancel(boolean cancel) {
+ isCancel = cancel;
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/56c67173/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/exceptions/SSHApiException.java
----------------------------------------------------------------------
diff --git a/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/exceptions/SSHApiException.java b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/exceptions/SSHApiException.java
new file mode 100644
index 0000000..0dcdd0e
--- /dev/null
+++ b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/exceptions/SSHApiException.java
@@ -0,0 +1,34 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.airavata.worker.commons.exceptions;
+
+/**
+ * An exception class to wrap SSH command execution related errors.
+ */
+public class SSHApiException extends Exception {
+
+ public SSHApiException(String message) {
+ super(message);
+ }
+
+ public SSHApiException(String message, Exception e) {
+ super(message, e);
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/56c67173/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/exceptions/WorkerException.java
----------------------------------------------------------------------
diff --git a/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/exceptions/WorkerException.java b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/exceptions/WorkerException.java
new file mode 100644
index 0000000..334ee0f
--- /dev/null
+++ b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/exceptions/WorkerException.java
@@ -0,0 +1,47 @@
+package org.apache.airavata.worker.commons.exceptions;
+
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class WorkerException extends Exception {
+ private static final Logger log = LoggerFactory.getLogger(WorkerException.class);
+
+ /**
+ *
+ */
+ private static final long serialVersionUID = 1L;
+
+ public WorkerException(String s) {
+ super(s);
+ }
+
+ public WorkerException(Exception e) {
+ super(e);
+ log.error(e.getMessage(),e);
+ }
+
+ public WorkerException(String s, Throwable throwable) {
+ super(s, throwable);
+ log.error(s,throwable);
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/56c67173/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/task/Task.java
----------------------------------------------------------------------
diff --git a/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/task/Task.java b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/task/Task.java
new file mode 100644
index 0000000..b7a8b45
--- /dev/null
+++ b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/task/Task.java
@@ -0,0 +1,62 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.airavata.worker.commons.task;
+
+import org.apache.airavata.model.status.TaskStatus;
+import org.apache.airavata.model.task.TaskTypes;
+import org.apache.airavata.worker.commons.context.TaskContext;
+
+import java.util.Map;
+
+/**
+ * All Tasks should inherit this interface.
+ */
+public interface Task {
+
+ /**
+ * Task initialization method, this method will be invoked after create a new task instance.
+ * @param propertyMap
+ * @throws TaskException
+ */
+ public void init(Map<String, String> propertyMap) throws TaskException;
+
+ /**
+ * This method will be called at the first time of task chain execution. This method should called before recover
+ * method. For a given task chain execute method only call one time. recover method may be called more than once.
+ * @param taskContext
+ * @return completed task status if success otherwise failed task status.
+ */
+ public TaskStatus execute(TaskContext taskContext);
+
+ /**
+ * This methond will be invoked at recover path.Before this method is invoked, execute method should be invoked.
+ * This method may be called zero or few time in a process chain.
+ * @param taskContext
+ * @return completed task status if success otherwise failed task status.
+ */
+ public TaskStatus recover(TaskContext taskContext);
+
+ /**
+ * Task type will be used to identify the task behaviour. eg : DATA_STAGING , JOB_SUBMISSION
+ * @return type of this task object
+ */
+ public TaskTypes getType();
+
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/56c67173/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/task/TaskException.java
----------------------------------------------------------------------
diff --git a/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/task/TaskException.java b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/task/TaskException.java
new file mode 100644
index 0000000..d290881
--- /dev/null
+++ b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/task/TaskException.java
@@ -0,0 +1,44 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.airavata.worker.commons.task;
+
+public class TaskException extends Exception {
+ private static final long serialVersionUID = 8662332011259328779L;
+
+ public TaskException() {
+ super();
+ }
+
+ public TaskException(String message) {
+ super(message);
+ }
+
+ public TaskException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public TaskException(Throwable cause) {
+ super(cause);
+ }
+
+ protected TaskException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
+ super(message, cause, enableSuppression, writableStackTrace);
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/56c67173/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/utils/JobManagerConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/utils/JobManagerConfiguration.java b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/utils/JobManagerConfiguration.java
new file mode 100644
index 0000000..022cb79
--- /dev/null
+++ b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/utils/JobManagerConfiguration.java
@@ -0,0 +1,34 @@
+package org.apache.airavata.worker.commons.utils;
+
+import org.apache.airavata.worker.commons.cluster.OutputParser;
+import org.apache.airavata.worker.commons.cluster.RawCommandInfo;
+
+/**
+ * Created by goshenoy on 4/12/17.
+ */
+public interface JobManagerConfiguration {
+
+ public RawCommandInfo getCancelCommand(String jobID);
+
+ public String getJobDescriptionTemplateName();
+
+ public RawCommandInfo getMonitorCommand(String jobID);
+
+ public RawCommandInfo getUserBasedMonitorCommand(String userName);
+
+ public RawCommandInfo getJobIdMonitorCommand(String jobName, String userName);
+
+ public String getScriptExtension();
+
+ public RawCommandInfo getSubmitCommand(String workingDirectory, String pbsFilePath);
+
+ public OutputParser getParser();
+
+ public String getInstalledPath();
+
+ public String getBaseCancelCommand();
+
+ public String getBaseMonitorCommand();
+
+ public String getBaseSubmitCommand();
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/56c67173/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/utils/WorkerConstants.java
----------------------------------------------------------------------
diff --git a/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/utils/WorkerConstants.java b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/utils/WorkerConstants.java
new file mode 100644
index 0000000..5fa5fc4
--- /dev/null
+++ b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/utils/WorkerConstants.java
@@ -0,0 +1,82 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.airavata.worker.commons.utils;
+
+public class WorkerConstants {
+ public static final String XPATH_EXPR_GLOBAL_INFLOW_HANDLERS = "/GFac/GlobalHandlers/InHandlers/Handler";
+ public static final String XPATH_EXPR_GLOBAL_OUTFLOW_HANDLERS = "/GFac/GlobalHandlers/OutHandlers/Handler";
+ public static final String XPATH_EXPR_DAEMON_HANDLERS = "/GFac/DaemonHandlers/Handler";
+
+ public static final String XPATH_EXPR_APPLICATION_HANDLERS_START = "/GFac/Application[@name='";
+ public static final String XPATH_EXPR_APPLICATION_INFLOW_HANDLERS_END = "']/InHandlers/Handler";
+ public static final String XPATH_EXPR_APPLICATION_OUTFLOW_HANDLERS_END = "']/OutHandlers/Handler";
+ public static final String XPATH_EXPR_APPLICATION_PROVIDER = "']/OutHandlers/Handler";
+
+
+ public static final String XPATH_EXPR_PROVIDER_HANDLERS_START = "/GFac/Provider[@class='";
+ public static final String XPATH_EXPR_PROVIDER_ON_HOST = "/GFac/Provider[@host='";
+ public static final String XPATH_EXPR_PROVIDER_ON_SUBMISSION = "/GFac/Provider[@submission='";
+ public static final String XPATH_EXPR_PROVIDER_INFLOW_HANDLERS_END = "']/InHandlers/Handler";
+ public static final String XPATH_EXPR_PROVIDER_OUTFLOW_HANDLERS_END = "']/OutHandlers/Handler";
+
+ public static final String GFAC_CONFIG_CLASS_ATTRIBUTE = "class";
+ public static final String GFAC_CONFIG_SECURITY_ATTRIBUTE = "security";
+ public static final String GFAC_CONFIG_SUBMISSION_ATTRIBUTE = "submission";
+ public static final String GFAC_CONFIG_EXECUTION_MODE_ATTRIBUTE = "executionMode";
+ public static final String GFAC_CONFIG_APPLICATION_NAME_ATTRIBUTE = "class";
+ public static final String NEWLINE = System.getProperty("line.separator");
+ public static final String INPUT_DATA_DIR_VAR_NAME = "input";
+ public static final String OUTPUT_DATA_DIR_VAR_NAME = "output";
+ public static final int DEFAULT_GSI_FTP_PORT = 2811;
+ public static final String _127_0_0_1 = "127.0.0.1";
+ public static final String LOCALHOST = "localhost";
+
+ public static final String MULTIPLE_INPUTS_SPLITTER = ",";
+
+ public static final String PROP_WORKFLOW_INSTANCE_ID = "workflow.instance.id";
+ public static final String PROP_WORKFLOW_NODE_ID = "workflow.node.id";
+ public static final String PROP_BROKER_URL = "broker.url";
+ public static final String PROP_TOPIC = "topic";
+ public static final String SPACE = " ";
+ public static final int COMMAND_EXECUTION_TIMEOUT = 5;
+ public static final String EXECUTABLE_NAME = "run.sh";
+
+ public static final String TRUSTED_CERT_LOCATION = "trusted.cert.location";
+ public static final String TRUSTED_CERTIFICATE_SYSTEM_PROPERTY = "X509_CERT_DIR";
+ public static final String MYPROXY_SERVER = "myproxy.server";
+ public static final String MYPROXY_SERVER_PORT = "myproxy.port";
+ public static final String MYPROXY_USER = "myproxy.username";
+ public static final String MYPROXY_PASS = "myproxy.password";
+ public static final String MYPROXY_LIFE = "myproxy.life";
+ /*
+ * SSH properties
+ */
+ public static final String SSH_PRIVATE_KEY = "private.ssh.key";
+ public static final String SSH_PUBLIC_KEY = "public.ssh.key";
+ public static final String SSH_PRIVATE_KEY_PASS = "ssh.keypass";
+ public static final String SSH_USER_NAME = "ssh.username";
+ public static final String SSH_PASSWORD = "ssh.password";
+ public static final String PROPERTY = "property";
+ public static final String NAME = "name";
+ public static final String VALUE = "value";
+ public static final String OUTPUT_DATA_DIR = "output.location";
+
+
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/56c67173/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/utils/WorkerFactory.java
----------------------------------------------------------------------
diff --git a/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/utils/WorkerFactory.java b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/utils/WorkerFactory.java
new file mode 100644
index 0000000..4ab403f
--- /dev/null
+++ b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/utils/WorkerFactory.java
@@ -0,0 +1,82 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.airavata.worker.commons.utils;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalListener;
+import com.jcraft.jsch.Session;
+import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManagerType;
+import org.apache.airavata.worker.commons.config.ResourceConfig;
+import org.apache.airavata.worker.commons.config.WorkerYamlConfigruation;
+import org.apache.airavata.worker.commons.exceptions.WorkerException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Created by goshenoy on 4/13/17.
+ */
+public class WorkerFactory {
+ private static final Logger log = LoggerFactory.getLogger(WorkerFactory.class);
+
+ private static boolean isWorkerConfigurationLoaded = false;
+ private static Map<ResourceJobManagerType, ResourceConfig> resources = new HashMap<>();
+ private static Cache<String,Session> sessionCache;
+
+ public static void loadConfiguration() throws WorkerException {
+ if (!isWorkerConfigurationLoaded) {
+ WorkerYamlConfigruation config = new WorkerYamlConfigruation();
+ try {
+ for (ResourceConfig resourceConfig : config.getResourceConfiguration()) {
+ resources.put(resourceConfig.getJobManagerType(), resourceConfig);
+ }
+ } catch (Exception e) {
+ throw new WorkerException("Worker config issue", e);
+ }
+
+ sessionCache = CacheBuilder.newBuilder()
+ .expireAfterAccess(ServerSettings.getSessionCacheAccessTimeout(), TimeUnit.MINUTES)
+ .removalListener((RemovalListener<String, Session>) removalNotification -> {
+ if (removalNotification.getValue().isConnected()) {
+ log.info("Disconnecting ssh session with key: " + removalNotification.getKey());
+ removalNotification.getValue().disconnect();
+ }
+ log.info("Removed ssh session with key: " + removalNotification.getKey());
+ })
+ .build();
+
+ isWorkerConfigurationLoaded = true;
+ }
+ }
+
+ public static Map<ResourceJobManagerType, ResourceConfig> getResourceConfig() {
+ return resources;
+ }
+
+ public static ResourceConfig getResourceConfig(ResourceJobManagerType resourceJobManagerType) {
+ return resources.get(resourceJobManagerType);
+ }
+}