You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by go...@apache.org on 2017/04/13 17:37:05 UTC

[2/4] airavata git commit: Refactor airavata-worker-commons to airavata-worker-core

http://git-wip-us.apache.org/repos/asf/airavata/blob/56c67173/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/cluster/JobSubmissionOutput.java
----------------------------------------------------------------------
diff --git a/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/cluster/JobSubmissionOutput.java b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/cluster/JobSubmissionOutput.java
new file mode 100644
index 0000000..85cb033
--- /dev/null
+++ b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/cluster/JobSubmissionOutput.java
@@ -0,0 +1,87 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.airavata.worker.commons.cluster;
+
+/**
+ * Mutable value holder describing one job submission: exit code, standard
+ * output and error text, the command, the job id, and a failure flag with its
+ * reason. Every value is set through the corresponding setter; nothing is
+ * validated or derived.
+ */
+public class JobSubmissionOutput {
+
+	/** Stays at {@link Integer#MIN_VALUE} until {@link #setExitCode(int)} is called. */
+	private int exitCode = Integer.MIN_VALUE;
+	private String stdOut;
+	private String stdErr;
+	private String command;
+	private String jobId;
+	private boolean isJobSubmissionFailed;
+	private String failureReason;
+
+	// ---- accessors ----
+
+	/** @return the stored exit code, {@link Integer#MIN_VALUE} if none was set */
+	public int getExitCode() {
+		return this.exitCode;
+	}
+
+	/** @return the stored standard-output text, or {@code null} if none was set */
+	public String getStdOut() {
+		return this.stdOut;
+	}
+
+	/** @return the stored standard-error text, or {@code null} if none was set */
+	public String getStdErr() {
+		return this.stdErr;
+	}
+
+	/** @return the stored command, or {@code null} if none was set */
+	public String getCommand() {
+		return this.command;
+	}
+
+	/** @return the stored job id, or {@code null} if none was set */
+	public String getJobId() {
+		return this.jobId;
+	}
+
+	/** @return the failure flag; {@code false} unless set otherwise */
+	public boolean isJobSubmissionFailed() {
+		return this.isJobSubmissionFailed;
+	}
+
+	/** @return the stored failure reason, or {@code null} if none was set */
+	public String getFailureReason() {
+		return this.failureReason;
+	}
+
+	// ---- mutators ----
+
+	/** @param exitCode exit code to store */
+	public void setExitCode(int exitCode) {
+		this.exitCode = exitCode;
+	}
+
+	/** @param stdOut standard-output text to store */
+	public void setStdOut(String stdOut) {
+		this.stdOut = stdOut;
+	}
+
+	/** @param stdErr standard-error text to store */
+	public void setStdErr(String stdErr) {
+		this.stdErr = stdErr;
+	}
+
+	/** @param command command to store */
+	public void setCommand(String command) {
+		this.command = command;
+	}
+
+	/** @param jobId job id to store */
+	public void setJobId(String jobId) {
+		this.jobId = jobId;
+	}
+
+	/** @param jobSubmissionFailed new value of the failure flag */
+	public void setJobSubmissionFailed(boolean jobSubmissionFailed) {
+		this.isJobSubmissionFailed = jobSubmissionFailed;
+	}
+
+	/** @param failureReason failure reason to store */
+	public void setFailureReason(String failureReason) {
+		this.failureReason = failureReason;
+	}
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/56c67173/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/cluster/OutputParser.java
----------------------------------------------------------------------
diff --git a/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/cluster/OutputParser.java b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/cluster/OutputParser.java
new file mode 100644
index 0000000..f6f1824
--- /dev/null
+++ b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/cluster/OutputParser.java
@@ -0,0 +1,68 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.airavata.worker.commons.cluster;
+
+import org.apache.airavata.gfac.core.GFacException;
+import org.apache.airavata.gfac.core.SSHApiException;
+import org.apache.airavata.model.status.JobStatus;
+
+import java.util.Map;
+
+/**
+ * Parses the raw text produced by job submission and job status commands into
+ * job ids and {@link JobStatus} values.
+ */
+public interface OutputParser {
+
+    /**
+     * Extracts the job id from the output of a job submission.
+     *
+     * @param rawOutput raw output of the job submission
+     * @return the job id parsed from {@code rawOutput}
+     * @throws GFacException declared failure type; the exact conditions are up to the implementation
+     */
+    String parseJobSubmission(String rawOutput) throws GFacException;
+
+    /**
+     * Inspects the output returned by a job submission and reports whether it
+     * indicates a submission failure.
+     *
+     * @param rawOutput raw output of the job submission
+     * @return {@code true} if the job submission has failed, {@code false} otherwise
+     */
+    boolean isJobSubmissionFailed(String rawOutput);
+
+    /**
+     * Reads the status of a single job from the given output.
+     *
+     * @param jobID     id of the job whose status is wanted
+     * @param rawOutput raw output to search
+     * @return the status parsed for {@code jobID}
+     * @throws GFacException declared failure type; the exact conditions are up to the implementation
+     */
+    JobStatus parseJobStatus(String jobID, String rawOutput) throws GFacException;
+
+    /**
+     * Reads the statuses of several jobs from one large output.
+     *
+     * @param userName  user name associated with the jobs
+     *                  (NOTE(review): how it is used is implementation specific - confirm)
+     * @param statusMap map keyed by job id that receives the parsed statuses
+     * @param rawOutput raw output to search
+     * @throws GFacException declared failure type; the exact conditions are up to the implementation
+     */
+    void parseJobStatuses(String userName, Map<String, JobStatus> statusMap, String rawOutput) throws GFacException;
+
+    /**
+     * Finds the job id that belongs to the given job name in the output.
+     *
+     * @param jobName   name of the job to look for
+     * @param rawOutput raw output to search
+     * @return the job id found for {@code jobName}
+     * @throws GFacException declared failure type; the exact conditions are up to the implementation
+     */
+    String parseJobId(String jobName, String rawOutput) throws GFacException;
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/56c67173/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/cluster/RawCommandInfo.java
----------------------------------------------------------------------
diff --git a/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/cluster/RawCommandInfo.java b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/cluster/RawCommandInfo.java
new file mode 100644
index 0000000..6045f4e
--- /dev/null
+++ b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/cluster/RawCommandInfo.java
@@ -0,0 +1,45 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.airavata.worker.commons.cluster;
+
+/**
+ * The raw command information. String returned by getCommand is directly executed in SSH
+ * shell. E.g :- getCommand return string set for rawCommand - "/opt/torque/bin/qsub /home/ogce/test.pbs".
+ */
+/**
+ * {@link CommandInfo} that wraps one complete command string. The string
+ * returned by {@link #getCommand()} is executed directly in the SSH shell,
+ * e.g. {@code "/opt/torque/bin/qsub /home/ogce/test.pbs"}.
+ */
+public class RawCommandInfo implements CommandInfo {
+
+    /** The command text exactly as supplied by the caller. */
+    private String rawCommand;
+
+    /**
+     * @param cmd the command string to wrap; stored without modification
+     */
+    public RawCommandInfo(String cmd) {
+        rawCommand = cmd;
+    }
+
+    /**
+     * @return the wrapped command string
+     */
+    public String getCommand() {
+        return rawCommand;
+    }
+
+    /**
+     * @return the wrapped command string (same value as {@link #getCommand()})
+     */
+    public String getRawCommand() {
+        return this.rawCommand;
+    }
+
+    /**
+     * @param rawCommand replacement command string
+     */
+    public void setRawCommand(String rawCommand) {
+        this.rawCommand = rawCommand;
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/56c67173/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/cluster/RemoteCluster.java
----------------------------------------------------------------------
diff --git a/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/cluster/RemoteCluster.java b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/cluster/RemoteCluster.java
new file mode 100644
index 0000000..936e1ce
--- /dev/null
+++ b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/cluster/RemoteCluster.java
@@ -0,0 +1,165 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.airavata.worker.commons.cluster;
+
+import com.jcraft.jsch.Session;
+import org.apache.airavata.model.status.JobStatus;
+import org.apache.airavata.worker.commons.authentication.AuthenticationInfo;
+import org.apache.airavata.worker.commons.exceptions.WorkerException;
+import org.apache.airavata.worker.commons.exceptions.SSHApiException;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * This interface represents a RemoteCluster machine
+ * End users of the API can implement this and come up with their own
+ * implementations, but mostly this interface is for internal usage.
+ */
+public interface RemoteCluster { // FIXME: replace SSHApiException with suitable exception.
+
+	/**
+	 * Submits a batch job to the cluster using the given job script.
+	 *
+	 * @param jobScriptFilePath path of the job script file
+	 * @param workingDirectory  working directory to which the job script has to be copied
+	 * @return the {@link JobSubmissionOutput} of the submission
+	 * @throws WorkerException if the submission fails
+	 */
+	JobSubmissionOutput submitBatchJob(String jobScriptFilePath, String workingDirectory) throws WorkerException;
+
+	/**
+	 * Copies a local file to a location on the configured cluster.
+	 *
+	 * @param localFile  path of the local file to copy
+	 * @param remoteFile remote target location; this can be a directory too
+	 * @throws WorkerException if the copy fails
+	 */
+	void copyTo(String localFile, String remoteFile) throws WorkerException;
+
+	/**
+	 * Copies a remote file to a local location.
+	 *
+	 * @param remoteFile remote file path; this has to be a fully qualified path
+	 * @param localFile  local target of the copy; this can be a directory too
+	 * @throws WorkerException if the copy fails
+	 */
+	void copyFrom(String remoteFile, String localFile) throws WorkerException;
+
+	/**
+	 * Copies a source remote file to a target remote file (third-party transfer).
+	 *
+	 * @param sourceFile      remote source path; this has to be a fully qualified path
+	 * @param destinationFile target of the copy; this can be a directory too
+	 * @param session         jcraft session of the other corner of the third-party transfer
+	 * @param inOrOut         direction of the transfer relative to this cluster, {@link DIRECTION#TO}
+	 *                        or {@link DIRECTION#FROM} (NOTE(review): presumably TO means towards this
+	 *                        cluster and FROM away from it - confirm against the implementations)
+	 * @param ignoreEmptyFile NOTE(review): presumably controls how an empty source file is treated - confirm
+	 * @throws WorkerException if the transfer fails
+	 */
+	void scpThirdParty(String sourceFile,
+			String destinationFile,
+			Session session,
+			DIRECTION inOrOut,
+			boolean ignoreEmptyFile) throws WorkerException;
+
+	/**
+	 * Creates a directory on the computing resource.
+	 *
+	 * @param directoryPath fully qualified path of the directory to create
+	 * @throws WorkerException if the directory cannot be created
+	 */
+	void makeDirectory(String directoryPath) throws WorkerException;
+
+	/**
+	 * Deletes the given job from the queue.
+	 *
+	 * @param jobID id of the job to delete
+	 * @return the status describing the deleted job
+	 * @throws WorkerException if the cancellation fails
+	 */
+	JobStatus cancelJob(String jobID) throws WorkerException;
+
+	/**
+	 * Looks up the status of the job with the given id.
+	 *
+	 * @param jobID id of the job whose status is wanted
+	 * @return job status of the given {@code jobID}
+	 * @throws WorkerException if the status cannot be obtained
+	 */
+	JobStatus getJobStatus(String jobID) throws WorkerException;
+
+	/**
+	 * Looks up the id of the job with the given name.
+	 *
+	 * @param jobName name of the job whose id is wanted
+	 * @param userName user name associated with the job
+	 *                 (NOTE(review): presumably the submitting user - confirm)
+	 * @return job id of the given {@code jobName}
+	 * @throws WorkerException if the lookup fails
+	 */
+	String getJobIdByJobName(String jobName, String userName) throws WorkerException;
+
+	/**
+	 * Polls job statuses for the given user. The job ids of interest must be
+	 * passed in; otherwise statuses of unrelated jobs, submitted under the same
+	 * user name by other middleware outside Apache Airavata, would be included.
+	 *
+	 * @param userName user name of the jobs whose statuses are required
+	 * @param jobIDs   map keyed by the precise set of job ids of interest
+	 * @throws WorkerException if the statuses cannot be obtained
+	 */
+	void getJobStatuses(String userName, Map<String, JobStatus> jobIDs) throws WorkerException;
+
+	/**
+	 * Lists a directory on the computing resource.
+	 *
+	 * @param directoryPath fully qualified path of the directory to list
+	 * @return the listing of {@code directoryPath}
+	 * @throws WorkerException if the directory cannot be listed
+	 */
+	List<String> listDirectory(String directoryPath) throws WorkerException;
+
+	/**
+	 * Executes a custom command on the remote compute resource.
+	 *
+	 * @param commandInfo the command to execute
+	 * @return <code>true</code> if the command executed successfully, <code>false</code> otherwise
+	 * @throws WorkerException if the command cannot be executed
+	 */
+	boolean execute(CommandInfo commandInfo) throws WorkerException;
+
+	/**
+	 * Returns the created ssh session so that it can be reused.
+	 *
+	 * @throws WorkerException if the session cannot be provided
+	 */
+	Session getSession() throws WorkerException;
+
+	/**
+	 * Closes the initialized connections, to allow a graceful shutdown of the
+	 * system.
+	 *
+	 * @throws WorkerException if disconnecting fails
+	 */
+	void disconnect() throws WorkerException;
+
+	/**
+	 * @return the server information of this cluster
+	 */
+	ServerInfo getServerInfo();
+
+	/**
+	 * @return the authentication information of this cluster
+	 */
+	AuthenticationInfo getAuthentication();
+
+	/**
+	 * Transfer direction accepted by
+	 * {@link #scpThirdParty(String, String, Session, DIRECTION, boolean)}.
+	 */
+	enum DIRECTION {
+		TO,
+		FROM
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/56c67173/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/cluster/ServerInfo.java
----------------------------------------------------------------------
diff --git a/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/cluster/ServerInfo.java b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/cluster/ServerInfo.java
new file mode 100644
index 0000000..a451d2b
--- /dev/null
+++ b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/cluster/ServerInfo.java
@@ -0,0 +1,59 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.airavata.worker.commons.cluster;
+
+/**
+ * Encapsulate server information.
+ */
+public class ServerInfo {
+    /** Port applied by the three-argument constructor. */
+    private static final int DEFAULT_PORT = 22;
+
+    // No setters exist, so every field is assigned exactly once in the constructor.
+    private final String host;
+    private final String userName;
+    private final int port;
+    private final String credentialToken;
+
+    /**
+     * Creates server information that uses the default port (22).
+     *
+     * @param userName        user name to store
+     * @param host            host to store
+     * @param credentialToken credential token to store
+     */
+    public ServerInfo(String userName, String host, String credentialToken) {
+        this(userName, host, credentialToken, DEFAULT_PORT);
+    }
+
+    /**
+     * Creates server information with an explicit port. The values are stored
+     * as given; none of them is validated.
+     *
+     * @param userName        user name to store
+     * @param host            host to store
+     * @param credentialToken credential token to store
+     * @param port            port to store
+     */
+    public ServerInfo(String userName, String host, String credentialToken, int port) {
+        this.host = host;
+        this.userName = userName;
+        this.port = port;
+        this.credentialToken = credentialToken;
+    }
+
+    /** @return the host given at construction */
+    public String getHost() {
+        return host;
+    }
+
+    /** @return the user name given at construction */
+    public String getUserName() {
+        return userName;
+    }
+
+    /** @return the port given at construction, or 22 when none was given */
+    public int getPort() {
+        return port;
+    }
+
+    /** @return the credential token given at construction */
+    public String getCredentialToken() {
+        return credentialToken;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/56c67173/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/config/DataTransferTaskConfig.java
----------------------------------------------------------------------
diff --git a/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/config/DataTransferTaskConfig.java b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/config/DataTransferTaskConfig.java
new file mode 100644
index 0000000..1241435
--- /dev/null
+++ b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/config/DataTransferTaskConfig.java
@@ -0,0 +1,60 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.airavata.worker.commons.config;
+
+import org.apache.airavata.model.data.movement.DataMovementProtocol;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Configuration of one data transfer task: the transfer protocol, the task
+ * class (kept as a string) and a map of free-form string properties.
+ */
+public class DataTransferTaskConfig {
+	private DataMovementProtocol transferProtocol;
+	private String taskClass;
+	/** Properties added so far; starts out empty. */
+	private Map<String,String> properties = new HashMap<>();
+
+	/** @return the configured transfer protocol, or {@code null} if none was set */
+	public DataMovementProtocol getTransferProtocol() {
+		return this.transferProtocol;
+	}
+
+	/** @param transferProtocol transfer protocol to store */
+	public void setTransferProtocol(DataMovementProtocol transferProtocol) {
+		this.transferProtocol = transferProtocol;
+	}
+
+	/** @return the configured task class, or {@code null} if none was set */
+	public String getTaskClass() {
+		return this.taskClass;
+	}
+
+	/** @param taskClass task class to store */
+	public void setTaskClass(String taskClass) {
+		this.taskClass = taskClass;
+	}
+
+	/**
+	 * Adds one property, replacing any previous value stored under the same key.
+	 *
+	 * @param key   property key
+	 * @param value property value
+	 */
+	public void addProperty(String key, String value) {
+		this.properties.put(key, value);
+	}
+
+	/**
+	 * Adds every entry of the given map, replacing values of keys that are
+	 * already present.
+	 *
+	 * @param propMap entries to add; must not be {@code null}
+	 */
+	public void addProperties(Map<String, String> propMap) {
+		propMap.forEach((key, value) -> this.properties.put(key, value));
+	}
+
+	/** @return the internal property map itself (not a copy) */
+	public Map<String,String> getProperties(){
+		return this.properties;
+	}
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/56c67173/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/config/JobSubmitterTaskConfig.java
----------------------------------------------------------------------
diff --git a/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/config/JobSubmitterTaskConfig.java b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/config/JobSubmitterTaskConfig.java
new file mode 100644
index 0000000..5744345
--- /dev/null
+++ b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/config/JobSubmitterTaskConfig.java
@@ -0,0 +1,59 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.airavata.worker.commons.config;
+
+import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionProtocol;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Configuration of one job submitter task: the submission protocol, the task
+ * class (kept as a string) and a map of free-form string properties.
+ */
+public class JobSubmitterTaskConfig {
+	private JobSubmissionProtocol submissionProtocol;
+	private String taskClass;
+	/** Properties added so far; starts out empty. */
+	private Map<String,String> properties = new HashMap<>();
+
+	/** @return the configured submission protocol, or {@code null} if none was set */
+	public JobSubmissionProtocol getSubmissionProtocol() {
+		return this.submissionProtocol;
+	}
+
+	/** @param submissionProtocol submission protocol to store */
+	public void setSubmissionProtocol(JobSubmissionProtocol submissionProtocol) {
+		this.submissionProtocol = submissionProtocol;
+	}
+
+	/** @return the configured task class, or {@code null} if none was set */
+	public String getTaskClass() {
+		return this.taskClass;
+	}
+
+	/** @param taskClass task class to store */
+	public void setTaskClass(String taskClass) {
+		this.taskClass = taskClass;
+	}
+
+	/**
+	 * Adds one property, replacing any previous value stored under the same key.
+	 *
+	 * @param key   property key
+	 * @param value property value
+	 */
+	public void addProperty(String key, String value) {
+		this.properties.put(key, value);
+	}
+
+	/**
+	 * Adds every entry of the given map, replacing values of keys that are
+	 * already present.
+	 *
+	 * @param propMap entries to add; must not be {@code null}
+	 */
+	public void addProperties(Map<String, String> propMap) {
+		propMap.forEach((key, value) -> this.properties.put(key, value));
+	}
+
+	/** @return the internal property map itself (not a copy) */
+	public Map<String,String> getProperties(){
+		return this.properties;
+	}
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/56c67173/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/config/ResourceConfig.java
----------------------------------------------------------------------
diff --git a/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/config/ResourceConfig.java b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/config/ResourceConfig.java
new file mode 100644
index 0000000..12eed5a
--- /dev/null
+++ b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/config/ResourceConfig.java
@@ -0,0 +1,63 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.airavata.worker.commons.config;
+
+import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManagerType;
+
+import java.util.List;
+
+/**
+ * Configuration of one resource entry: its job manager type, the command
+ * output parser and email parser (both kept as strings) and a list of
+ * resource email addresses. All values are {@code null} until set.
+ */
+public class ResourceConfig {
+	private ResourceJobManagerType jobManagerType;
+	// NOTE(review): the two parser values are presumably class names - confirm against the code that reads them.
+	private String commandOutputParser;
+	private String emailParser;
+	private List<String> resourceEmailAddresses;
+
+	// ---- accessors ----
+
+	/** @return the job manager type, or {@code null} if none was set */
+	public ResourceJobManagerType getJobManagerType() {
+		return this.jobManagerType;
+	}
+
+	/** @return the command output parser, or {@code null} if none was set */
+	public String getCommandOutputParser() {
+		return this.commandOutputParser;
+	}
+
+	/** @return the email parser, or {@code null} if none was set */
+	public String getEmailParser() {
+		return this.emailParser;
+	}
+
+	/** @return the stored address list itself (not a copy), or {@code null} if none was set */
+	public List<String> getResourceEmailAddresses() {
+		return this.resourceEmailAddresses;
+	}
+
+	// ---- mutators ----
+
+	/** @param jobManagerType job manager type to store */
+	public void setJobManagerType(ResourceJobManagerType jobManagerType) {
+		this.jobManagerType = jobManagerType;
+	}
+
+	/** @param commandOutputParser command output parser to store */
+	public void setCommandOutputParser(String commandOutputParser) {
+		this.commandOutputParser = commandOutputParser;
+	}
+
+	/** @param emailParser email parser to store */
+	public void setEmailParser(String emailParser) {
+		this.emailParser = emailParser;
+	}
+
+	/** @param resourceEmailAddresses address list to store; kept by reference */
+	public void setResourceEmailAddresses(List<String> resourceEmailAddresses) {
+		this.resourceEmailAddresses = resourceEmailAddresses;
+	}
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/56c67173/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/config/WorkerYamlConfigruation.java
----------------------------------------------------------------------
diff --git a/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/config/WorkerYamlConfigruation.java b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/config/WorkerYamlConfigruation.java
new file mode 100644
index 0000000..5d2e372
--- /dev/null
+++ b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/config/WorkerYamlConfigruation.java
@@ -0,0 +1,150 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.airavata.worker.commons.config;
+
+import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionProtocol;
+import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManagerType;
+import org.apache.airavata.model.data.movement.DataMovementProtocol;
+import org.apache.airavata.worker.commons.exceptions.WorkerException;
+import org.yaml.snakeyaml.Yaml;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Loads {@code gfac-config.yaml} from the classpath and exposes its
+ * {@code jobSubmitters}, {@code fileTransferTasks} and {@code resources}
+ * sections as typed configuration objects. Sections that are absent from the
+ * file simply yield empty lists.
+ */
+public class WorkerYamlConfigruation {
+
+	private static final String CONFIG_FILE = "gfac-config.yaml";
+
+	// CONFIG, COMMON_TASKS and TYPE are not read by the parser at present.
+	private static final String CONFIG = "config";
+	private static final String JOB_SUBMITTERS = "jobSubmitters";
+	private static final String SUBMISSION_PROTOCOL = "submissionProtocol";
+	private static final String TASK_CLASS = "taskClass";
+	private static final String COMMON_TASKS = "commonTasks";
+	private static final String TYPE = "type";
+	private static final String FILE_TRANSFER_TASKS = "fileTransferTasks";
+	private static final String TRANSFER_PROTOCOL = "transferProtocol";
+	private static final String RESOURCES = "resources";
+	private static final String JOB_MANAGER_TYPE = "jobManagerType";
+	private static final String COMMAND_OUTPUT_PARSER = "commandOutputParser";
+	private static final String EMAIL_PARSER = "emailParser";
+	private static final String RESOURCE_EMAIL_ADDRESS = "resourceEmailAddresses";
+	private static final String PROPERTIES = "properties";
+
+	private List<JobSubmitterTaskConfig> jobSubmitters = new ArrayList<>();
+	private List<DataTransferTaskConfig> fileTransferTasks = new ArrayList<>();
+	private List<ResourceConfig> resources = new ArrayList<>();
+
+
+	/**
+	 * Reads and parses {@code gfac-config.yaml} from the class loader of this class.
+	 *
+	 * @throws WorkerException if the file is missing, is empty, or contains a
+	 *                         section or value the parser cannot interpret
+	 */
+	public WorkerYamlConfigruation() throws WorkerException {
+		InputStream resourceAsStream = WorkerYamlConfigruation.class.getClassLoader().
+				getResourceAsStream(CONFIG_FILE);
+		try {
+			parse(resourceAsStream);
+		} finally {
+			// The stream used to be left open; release it whether or not parsing succeeded.
+			closeQuietly(resourceAsStream);
+		}
+	}
+
+	/**
+	 * Parses the YAML document and fills the three configuration lists.
+	 *
+	 * @param resourceAsStream stream of the configuration file, {@code null} when it was not found
+	 * @throws WorkerException if the stream is {@code null}, the document is empty,
+	 *                         or an entry is malformed
+	 */
+	@SuppressWarnings("unchecked") // the root was checked to be a Map; YAML mapping keys are used as strings
+	private void parse(InputStream resourceAsStream) throws WorkerException {
+		if (resourceAsStream == null) {
+			throw new WorkerException("Configuration file{gfac-config.yaml} is not found");
+		}
+		Object load = new Yaml().load(resourceAsStream);
+		if (load == null) {
+			throw new WorkerException("Yaml configuration object null");
+		}
+		if (!(load instanceof Map)) {
+			// A root that is not a mapping carries none of the known sections; all lists stay empty.
+			return;
+		}
+		Map<String, Object> loadMap = (Map<String, Object>) load;
+
+		for (Map<String, Object> jobSub : section(loadMap, JOB_SUBMITTERS)) {
+			jobSubmitters.add(toJobSubmitter(jobSub));
+		}
+		for (Map<String, Object> fileTransConfig : section(loadMap, FILE_TRANSFER_TASKS)) {
+			fileTransferTasks.add(toDataTransferTask(fileTransConfig));
+		}
+		for (Map<String, Object> resource : section(loadMap, RESOURCES)) {
+			resources.add(toResourceConfig(resource));
+		}
+	}
+
+	/** Builds a job submitter configuration from one {@code jobSubmitters} entry. */
+	private static JobSubmitterTaskConfig toJobSubmitter(Map<String, Object> jobSub) throws WorkerException {
+		JobSubmitterTaskConfig config = new JobSubmitterTaskConfig();
+		String identifier = requiredString(jobSub, SUBMISSION_PROTOCOL, JOB_SUBMITTERS);
+		config.setSubmissionProtocol(parseEnum(JobSubmissionProtocol.class, identifier, SUBMISSION_PROTOCOL));
+		config.setTaskClass(asString(jobSub.get(TASK_CLASS)));
+		config.addProperties(readProperties(jobSub.get(PROPERTIES)));
+		return config;
+	}
+
+	/** Builds a data transfer configuration from one {@code fileTransferTasks} entry. */
+	private static DataTransferTaskConfig toDataTransferTask(Map<String, Object> fileTransConfig)
+			throws WorkerException {
+		DataTransferTaskConfig config = new DataTransferTaskConfig();
+		String identifier = requiredString(fileTransConfig, TRANSFER_PROTOCOL, FILE_TRANSFER_TASKS);
+		config.setTransferProtocol(parseEnum(DataMovementProtocol.class, identifier, TRANSFER_PROTOCOL));
+		config.setTaskClass(asString(fileTransConfig.get(TASK_CLASS)));
+		config.addProperties(readProperties(fileTransConfig.get(PROPERTIES)));
+		return config;
+	}
+
+	/** Builds a resource configuration from one {@code resources} entry. */
+	private static ResourceConfig toResourceConfig(Map<String, Object> resource) throws WorkerException {
+		ResourceConfig config = new ResourceConfig();
+		String identifier = requiredString(resource, JOB_MANAGER_TYPE, RESOURCES);
+		config.setJobManagerType(parseEnum(ResourceJobManagerType.class, identifier, JOB_MANAGER_TYPE));
+		config.setCommandOutputParser(requiredString(resource, COMMAND_OUTPUT_PARSER, RESOURCES));
+		Object emailParser = resource.get(EMAIL_PARSER);
+		if (emailParser != null) {
+			config.setEmailParser(emailParser.toString());
+		}
+		config.setResourceEmailAddresses(readEmailAddresses(resource.get(RESOURCE_EMAIL_ADDRESS)));
+		return config;
+	}
+
+	/** Returns the entries of a top-level list section; an absent section yields an empty list. */
+	@SuppressWarnings("unchecked") // each element was checked to be a Map; YAML mapping keys are used as strings
+	private static List<Map<String, Object>> section(Map<String, Object> root, String key)
+			throws WorkerException {
+		List<Map<String, Object>> entries = new ArrayList<>();
+		Object value = root.get(key);
+		if (value == null) {
+			return entries;
+		}
+		if (!(value instanceof List)) {
+			throw new WorkerException("'" + key + "' in " + CONFIG_FILE + " must be a list");
+		}
+		for (Object element : (List<?>) value) {
+			if (!(element instanceof Map)) {
+				throw new WorkerException("Every entry of '" + key + "' in " + CONFIG_FILE + " must be a mapping");
+			}
+			entries.add((Map<String, Object>) element);
+		}
+		return entries;
+	}
+
+	/** Returns the string form of a mandatory value, failing with a descriptive error when it is absent. */
+	private static String requiredString(Map<String, Object> entry, String key, String sectionName)
+			throws WorkerException {
+		Object value = entry.get(key);
+		if (value == null) {
+			throw new WorkerException("Missing '" + key + "' in a '" + sectionName + "' entry of " + CONFIG_FILE);
+		}
+		return value.toString();
+	}
+
+	/** Resolves an enum constant by name, reporting unknown names as a configuration error. */
+	private static <E extends Enum<E>> E parseEnum(Class<E> type, String name, String key)
+			throws WorkerException {
+		try {
+			return Enum.valueOf(type, name);
+		} catch (IllegalArgumentException e) {
+			// Only the message constructor of WorkerException is used in this file, so the
+			// original message is folded into the text.
+			throw new WorkerException("Invalid value '" + name + "' for '" + key + "' in " + CONFIG_FILE
+					+ ": " + e.getMessage());
+		}
+	}
+
+	/**
+	 * Reads the {@code properties} value of an entry. As before, only the first
+	 * element of the list is used. Keys and values are converted to strings so
+	 * that non-string YAML scalars (numbers, booleans) cannot end up in a
+	 * {@code Map<String, String>}.
+	 */
+	private static Map<String, String> readProperties(Object propertiesObj) {
+		Map<String, String> props = new LinkedHashMap<>();
+		if (propertiesObj instanceof List) {
+			List<?> propertiesList = (List<?>) propertiesObj;
+			if (!propertiesList.isEmpty() && propertiesList.get(0) instanceof Map) {
+				for (Map.Entry<?, ?> property : ((Map<?, ?>) propertiesList.get(0)).entrySet()) {
+					props.put(asString(property.getKey()), asString(property.getValue()));
+				}
+			}
+		}
+		return props;
+	}
+
+	/** Reads the optional address list; an absent value stays {@code null} as before. */
+	private static List<String> readEmailAddresses(Object value) throws WorkerException {
+		if (value == null) {
+			return null;
+		}
+		if (!(value instanceof List)) {
+			throw new WorkerException("'" + RESOURCE_EMAIL_ADDRESS + "' in " + CONFIG_FILE + " must be a list");
+		}
+		List<String> addresses = new ArrayList<>();
+		for (Object address : (List<?>) value) {
+			addresses.add(asString(address));
+		}
+		return addresses;
+	}
+
+	/** Null-preserving {@code toString}. */
+	private static String asString(Object value) {
+		return value == null ? null : value.toString();
+	}
+
+	/** Closes the stream if there is one, ignoring failures of the close itself. */
+	private static void closeQuietly(InputStream stream) {
+		if (stream == null) {
+			return;
+		}
+		try {
+			stream.close();
+		} catch (IOException ignored) {
+			// By now the content has been read, or parsing has already failed with a more
+			// useful error; a failure to close is not actionable.
+		}
+	}
+
+	/** @return the parsed {@code jobSubmitters} entries (the internal list, not a copy) */
+	public List<JobSubmitterTaskConfig> getJobSbumitters() {
+		return jobSubmitters;
+	}
+
+	/** @return the parsed {@code fileTransferTasks} entries (the internal list, not a copy) */
+	public List<DataTransferTaskConfig> getFileTransferTasks() {
+		return fileTransferTasks;
+	}
+
+	/** @return the parsed {@code resources} entries (the internal list, not a copy) */
+	public List<ResourceConfig> getResourceConfiguration() {
+		return resources;
+	}
+
+
+
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/56c67173/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/context/ProcessContext.java
----------------------------------------------------------------------
diff --git a/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/context/ProcessContext.java b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/context/ProcessContext.java
new file mode 100644
index 0000000..53bda5b
--- /dev/null
+++ b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/context/ProcessContext.java
@@ -0,0 +1,806 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.airavata.worker.commons.context;
+
+import org.apache.airavata.common.utils.AiravataUtils;
+import org.apache.airavata.messaging.core.Publisher;
+import org.apache.airavata.model.appcatalog.appdeployment.ApplicationDeploymentDescription;
+import org.apache.airavata.model.appcatalog.appinterface.ApplicationInterfaceDescription;
+import org.apache.airavata.model.appcatalog.computeresource.ComputeResourceDescription;
+import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionProtocol;
+import org.apache.airavata.model.appcatalog.computeresource.MonitorMode;
+import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManager;
+import org.apache.airavata.model.appcatalog.gatewayprofile.ComputeResourcePreference;
+import org.apache.airavata.model.appcatalog.gatewayprofile.GatewayResourceProfile;
+import org.apache.airavata.model.appcatalog.gatewayprofile.StoragePreference;
+import org.apache.airavata.model.appcatalog.storageresource.StorageResourceDescription;
+import org.apache.airavata.model.appcatalog.userresourceprofile.UserComputeResourcePreference;
+import org.apache.airavata.model.appcatalog.userresourceprofile.UserResourceProfile;
+import org.apache.airavata.model.appcatalog.userresourceprofile.UserStoragePreference;
+import org.apache.airavata.model.data.movement.DataMovementProtocol;
+import org.apache.airavata.model.job.JobModel;
+import org.apache.airavata.model.process.ProcessModel;
+import org.apache.airavata.model.scheduling.ComputationalResourceSchedulingModel;
+import org.apache.airavata.model.status.ProcessState;
+import org.apache.airavata.model.status.ProcessStatus;
+import org.apache.airavata.model.task.TaskModel;
+import org.apache.airavata.registry.cpi.AppCatalog;
+import org.apache.airavata.registry.cpi.ExperimentCatalog;
+import org.apache.airavata.worker.commons.authentication.SSHKeyAuthentication;
+import org.apache.airavata.worker.commons.cluster.RemoteCluster;
+import org.apache.airavata.worker.commons.cluster.ServerInfo;
+import org.apache.airavata.worker.commons.exceptions.WorkerException;
+import org.apache.curator.framework.CuratorFramework;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+
+public class ProcessContext {
+
+	private static final Logger log = LoggerFactory.getLogger(ProcessContext.class);
+	// process model
+	private ExperimentCatalog experimentCatalog;
+	private AppCatalog appCatalog;
+	private CuratorFramework curatorClient;
+	private Publisher statusPublisher;
+	private final String processId;
+	private final String gatewayId;
+	private final String tokenId;
+	private ProcessModel processModel;
+	private String workingDir;
+	private String scratchLocation;
+	private String inputDir;
+	private String outputDir;
+	private String localWorkingDir;
+	private GatewayResourceProfile gatewayResourceProfile;
+	private ComputeResourcePreference gatewayComputeResourcePreference;
+	private StoragePreference gatewayStorageResourcePreference;
+	private UserResourceProfile userResourceProfile;
+	private UserComputeResourcePreference userComputeResourcePreference;
+	private UserStoragePreference userStoragePreference;
+	private ComputeResourceDescription computeResourceDescription;
+	private ApplicationDeploymentDescription applicationDeploymentDescription;
+	private ApplicationInterfaceDescription applicationInterfaceDescription;
+	private RemoteCluster jobSubmissionRemoteCluster;
+	private RemoteCluster dataMovementRemoteCluster;
+	private Map<String, String> sshProperties;
+	private String stdoutLocation;
+	private String stderrLocation;
+	private JobSubmissionProtocol jobSubmissionProtocol;
+	private DataMovementProtocol dataMovementProtocol;
+	private JobModel jobModel;
+    private StorageResourceDescription storageResource;
+	private MonitorMode monitorMode;
+	private ResourceJobManager resourceJobManager;
+	private boolean handOver;
+	private boolean cancel;
+    private ServerInfo serverInfo;
+    private List<String> taskExecutionOrder;
+    private List<TaskModel> taskList;
+    private Map<String, TaskModel> taskMap;
+    private boolean pauseTaskExecution = false;  // Task can pause task execution by setting this value
+    private boolean complete = false; // all tasks executed?
+    private boolean recovery = false; // is process in recovery mode?
+    private TaskModel currentExecutingTaskModel; // current execution task model in case we pause process execution we need this to continue process exectuion again
+	private boolean acknowledge;
+	private SSHKeyAuthentication sshKeyAuthentication;
+	private boolean recoveryWithCancel = false;
+	private String usageReportingGatewayId;
+
+	/**
+	 * Note: process context properties use a lazy-loading approach. At runtime some properties will be null
+	 * until they have been accessed. Once a property is accessed through the API, it is set to the correct value.
+	 */
+	private ProcessContext(String processId, String gatewayId, String tokenId) {
+		this.processId = processId;
+		this.gatewayId = gatewayId;
+		this.tokenId = tokenId;
+	}
+
+	public ExperimentCatalog getExperimentCatalog() {
+		return experimentCatalog;
+	}
+
+	public void setExperimentCatalog(ExperimentCatalog experimentCatalog) {
+		this.experimentCatalog = experimentCatalog;
+	}
+
+	public AppCatalog getAppCatalog() {
+		return appCatalog;
+	}
+
+	public void setAppCatalog(AppCatalog appCatalog) {
+		this.appCatalog = appCatalog;
+	}
+
+	public String getGatewayId() {
+		return gatewayId;
+	}
+
+	public String getTokenId() {
+		return tokenId;
+	}
+
+	public String getProcessId() {
+		return processId;
+	}
+
+	public CuratorFramework getCuratorClient() {
+		return curatorClient;
+	}
+
+	public void setCuratorClient(CuratorFramework curatorClient) {
+		this.curatorClient = curatorClient;
+	}
+
+	public Publisher getStatusPublisher() {
+		return statusPublisher;
+	}
+
+	public void setStatusPublisher(Publisher statusPublisher) {
+		this.statusPublisher = statusPublisher;
+	}
+
+	public ProcessModel getProcessModel() {
+		return processModel;
+	}
+
+	public void setProcessModel(ProcessModel processModel) {
+		this.processModel = processModel;
+	}
+
+	/**
+	 * Resolves the working directory on first use and caches it.
+	 * <p>
+	 * The static working directory from the process resource schedule wins when it is set;
+	 * otherwise the directory is the scratch location followed by the process id, with exactly
+	 * one "/" inserted when the scratch location does not already end with one.
+	 *
+	 * @return the cached or newly resolved working directory
+	 */
+	public String getWorkingDir() {
+		if (workingDir != null) {
+			return workingDir;
+		}
+		String staticDir = processModel.getProcessResourceSchedule().getStaticWorkingDir();
+		if (staticDir != null) {
+			workingDir = staticDir;
+		} else {
+			String base = getScratchLocation();
+			String separator = base.endsWith("/") ? "" : "/";
+			workingDir = base + separator + processId;
+		}
+		return workingDir;
+	}
+
+	public String getScratchLocation() {
+		if (scratchLocation == null) {
+			if (isUseUserCRPref() &&
+					userComputeResourcePreference != null &&
+					isValid(userComputeResourcePreference.getScratchLocation())) {
+				scratchLocation = userComputeResourcePreference.getScratchLocation();
+			} else if (isValid(processModel.getProcessResourceSchedule().getOverrideScratchLocation())) {
+				scratchLocation = processModel.getProcessResourceSchedule().getOverrideScratchLocation();
+			}else {
+				scratchLocation = gatewayComputeResourcePreference.getScratchLocation();
+			}
+		}
+		return scratchLocation;
+	}
+
+	public void setWorkingDir(String workingDir) {
+		this.workingDir = workingDir;
+	}
+
+	public GatewayResourceProfile getGatewayResourceProfile() {
+		return gatewayResourceProfile;
+	}
+
+	public void setGatewayResourceProfile(GatewayResourceProfile gatewayResourceProfile) {
+		this.gatewayResourceProfile = gatewayResourceProfile;
+	}
+
+	public UserResourceProfile getUserResourceProfile() {
+		return userResourceProfile;
+	}
+
+	public void setUserResourceProfile(UserResourceProfile userResourceProfile) {
+		this.userResourceProfile = userResourceProfile;
+	}
+
+	private UserComputeResourcePreference getUserComputeResourcePreference() {
+		return userComputeResourcePreference;
+	}
+
+	public void setUserComputeResourcePreference(UserComputeResourcePreference userComputeResourcePreference) {
+		this.userComputeResourcePreference = userComputeResourcePreference;
+	}
+
+	public UserStoragePreference getUserStoragePreference() {
+		return userStoragePreference;
+	}
+
+	public void setUserStoragePreference(UserStoragePreference userStoragePreference) {
+		this.userStoragePreference = userStoragePreference;
+	}
+
+	public StoragePreference getGatewayStorageResourcePreference() {
+		return gatewayStorageResourcePreference;
+	}
+
+	public void setGatewayStorageResourcePreference(StoragePreference gatewayStorageResourcePreference) {
+		this.gatewayStorageResourcePreference = gatewayStorageResourcePreference;
+	}
+
+	public RemoteCluster getJobSubmissionRemoteCluster() {
+		return jobSubmissionRemoteCluster;
+	}
+
+	public void setJobSubmissionRemoteCluster(RemoteCluster jobSubmissoinRemoteCluster) {
+		this.jobSubmissionRemoteCluster = jobSubmissoinRemoteCluster;
+	}
+
+	public RemoteCluster getDataMovementRemoteCluster() {
+		return dataMovementRemoteCluster;
+	}
+
+	public void setDataMovementRemoteCluster(RemoteCluster dataMovementRemoteCluster) {
+		this.dataMovementRemoteCluster = dataMovementRemoteCluster;
+	}
+
+	public Map<String, String> getSshProperties() {
+		return sshProperties;
+	}
+
+	public void setSshProperties(Map<String, String> sshProperties) {
+		this.sshProperties = sshProperties;
+	}
+
+	public ComputeResourceDescription getComputeResourceDescription() {
+		return computeResourceDescription;
+	}
+
+	public void setComputeResourceDescription(ComputeResourceDescription computeResourceDescription) {
+		this.computeResourceDescription = computeResourceDescription;
+	}
+
+	public ApplicationDeploymentDescription getApplicationDeploymentDescription() {
+		return applicationDeploymentDescription;
+	}
+
+	public void setApplicationDeploymentDescription(ApplicationDeploymentDescription
+			                                                applicationDeploymentDescription) {
+		this.applicationDeploymentDescription = applicationDeploymentDescription;
+	}
+
+	public ApplicationInterfaceDescription getApplicationInterfaceDescription() {
+		return applicationInterfaceDescription;
+	}
+
+	public void setApplicationInterfaceDescription(ApplicationInterfaceDescription applicationInterfaceDescription) {
+		this.applicationInterfaceDescription = applicationInterfaceDescription;
+	}
+
+	public String getStdoutLocation() {
+		return stdoutLocation;
+	}
+
+	public void setStdoutLocation(String stdoutLocation) {
+		this.stdoutLocation = stdoutLocation;
+	}
+
+	public String getStderrLocation() {
+		return stderrLocation;
+	}
+
+	public void setStderrLocation(String stderrLocation) {
+		this.stderrLocation = stderrLocation;
+	}
+
+	public void setOutputDir(String outputDir) {
+		this.outputDir = outputDir;
+	}
+
+	public String getOutputDir() {
+		if (outputDir == null) {
+			outputDir = getWorkingDir();
+		}
+		return outputDir;
+	}
+
+	public String getInputDir() {
+		if (inputDir == null) {
+			inputDir = getWorkingDir();
+		}
+		return inputDir;
+	}
+
+	public void setInputDir(String inputDir) {
+		this.inputDir = inputDir;
+	}
+
+	public JobSubmissionProtocol getJobSubmissionProtocol() {
+		if (jobSubmissionProtocol == null) {
+			jobSubmissionProtocol = gatewayComputeResourcePreference.getPreferredJobSubmissionProtocol();
+		}
+		return jobSubmissionProtocol;
+	}
+
+	public void setJobSubmissionProtocol(JobSubmissionProtocol jobSubmissionProtocol) {
+		this.jobSubmissionProtocol = jobSubmissionProtocol;
+	}
+
+	public DataMovementProtocol getDataMovementProtocol() {
+		if (dataMovementProtocol == null) {
+			dataMovementProtocol = gatewayComputeResourcePreference.getPreferredDataMovementProtocol();
+		}
+		return dataMovementProtocol;
+	}
+
+	public void setDataMovementProtocol(DataMovementProtocol dataMovementProtocol) {
+		this.dataMovementProtocol = dataMovementProtocol;
+	}
+
+    public String getTaskDag() {
+        return getProcessModel().getTaskDag();
+    }
+
+    public List<TaskModel> getTaskList() {
+        if (taskList == null) {
+            synchronized (TaskModel.class){
+                if (taskList == null) {
+                    taskList = getProcessModel().getTasks();
+                }
+            }
+        }
+        return taskList;
+    }
+
+
+    public List<String> getTaskExecutionOrder() {
+        return taskExecutionOrder;
+    }
+
+    public void setTaskExecutionOrder(List<String> taskExecutionOrder) {
+        this.taskExecutionOrder = taskExecutionOrder;
+    }
+
+    public Map<String, TaskModel> getTaskMap() {
+        if (taskMap == null) {
+            synchronized (TaskModel.class) {
+                if (taskMap == null) {
+                    taskMap = new HashMap<>();
+                    for (TaskModel taskModel : getTaskList()) {
+                        taskMap.put(taskModel.getTaskId(), taskModel);
+                    }
+                }
+            }
+        }
+        return taskMap;
+    }
+
+	public JobModel getJobModel() {
+		if (jobModel == null) {
+			jobModel = new JobModel();
+			jobModel.setProcessId(processId);
+			jobModel.setWorkingDir(getWorkingDir());
+			jobModel.setCreationTime(AiravataUtils.getCurrentTimestamp().getTime());
+		}
+		return jobModel;
+	}
+
+	public void setJobModel(JobModel jobModel) {
+		this.jobModel = jobModel;
+	}
+
+	private ComputeResourcePreference getGatewayComputeResourcePreference() {
+		return gatewayComputeResourcePreference;
+	}
+
+	public void setGatewayComputeResourcePreference(ComputeResourcePreference gatewayComputeResourcePreference) {
+		this.gatewayComputeResourcePreference = gatewayComputeResourcePreference;
+	}
+
+	public ProcessState getProcessState() {
+		if(processModel.getProcessStatuses() != null && processModel.getProcessStatuses().size() > 0)
+			return processModel.getProcessStatuses().get(0).getState();
+		else
+			return null;
+	}
+
+	/**
+	 * Logs the status transition and replaces the process model's status list with a
+	 * single-element list holding {@code status}.
+	 *
+	 * @param status the new process status; the call is a no-op when this is {@code null}
+	 */
+	public void setProcessStatus(ProcessStatus status) {
+		if (status != null) {
+			// The states are handed to the logger as objects instead of calling name() on them:
+			// getProcessState() returns null while no status has been recorded, and the logger
+			// prints "null" rather than throwing a NullPointerException.
+			log.info("expId: {}, processId: {} :- Process status changed {} -> {}", getExperimentId(), processId,
+					getProcessState(), status.getState());
+			List<ProcessStatus> processStatuses = new ArrayList<>();
+			processStatuses.add(status);
+			processModel.setProcessStatuses(processStatuses);
+		}
+	}
+
+	public ProcessStatus getProcessStatus(){
+		if(processModel.getProcessStatuses() != null)
+			return processModel.getProcessStatuses().get(0);
+		else
+			return null;
+	}
+
+	public String getComputeResourceId() {
+		if (isUseUserCRPref() &&
+				userComputeResourcePreference != null &&
+				isValid(userComputeResourcePreference.getComputeResourceId())) {
+			return userComputeResourcePreference.getComputeResourceId();
+		} else {
+			return gatewayComputeResourcePreference.getComputeResourceId();
+		}
+	}
+
+	public String getComputeResourceCredentialToken(){
+		if (isUseUserCRPref()) {
+			if (userComputeResourcePreference != null &&
+					isValid(userComputeResourcePreference.getResourceSpecificCredentialStoreToken())) {
+				return userComputeResourcePreference.getResourceSpecificCredentialStoreToken();
+			} else {
+				return userResourceProfile.getCredentialStoreToken();
+			}
+		} else {
+			if (isValid(gatewayComputeResourcePreference.getResourceSpecificCredentialStoreToken())) {
+				return gatewayComputeResourcePreference.getResourceSpecificCredentialStoreToken();
+			} else {
+				return gatewayResourceProfile.getCredentialStoreToken();
+			}
+		}
+	}
+
+	public String getStorageResourceCredentialToken(){
+		if (isValid(gatewayStorageResourcePreference.getResourceSpecificCredentialStoreToken())) {
+			return gatewayStorageResourcePreference.getResourceSpecificCredentialStoreToken();
+		} else {
+			return gatewayResourceProfile.getCredentialStoreToken();
+		}
+	}
+
+	public JobSubmissionProtocol getPreferredJobSubmissionProtocol(){
+		return gatewayComputeResourcePreference.getPreferredJobSubmissionProtocol();
+	}
+
+	public DataMovementProtocol getPreferredDataMovementProtocol() {
+		return gatewayComputeResourcePreference.getPreferredDataMovementProtocol();
+	}
+
+	public void setMonitorMode(MonitorMode monitorMode) {
+		this.monitorMode = monitorMode;
+	}
+
+	public MonitorMode getMonitorMode() {
+		return monitorMode;
+	}
+
+	public void setResourceJobManager(ResourceJobManager resourceJobManager) {
+		this.resourceJobManager = resourceJobManager;
+	}
+
+	public ResourceJobManager getResourceJobManager() {
+		return resourceJobManager;
+	}
+
+	public String getLocalWorkingDir() {
+		return localWorkingDir;
+	}
+
+	public void setLocalWorkingDir(String localWorkingDir) {
+		this.localWorkingDir = localWorkingDir;
+	}
+
+	public String getExperimentId() {
+		return processModel.getExperimentId();
+	}
+
+	public boolean isHandOver() {
+		return handOver;
+	}
+
+	public void setHandOver(boolean handOver) {
+		this.handOver = handOver;
+	}
+
+	public boolean isCancel() {
+		return cancel;
+	}
+
+	public void setCancel(boolean cancel) {
+		this.cancel = cancel;
+	}
+
+	public boolean isInterrupted(){
+		return this.cancel || this.handOver;
+	}
+
+    public String getCurrentExecutingTaskId() {
+        if (currentExecutingTaskModel != null) {
+            return currentExecutingTaskModel.getTaskId();
+        }
+        return null;
+    }
+
+    public boolean isPauseTaskExecution() {
+        return pauseTaskExecution;
+    }
+
+    public void setPauseTaskExecution(boolean pauseTaskExecution) {
+        this.pauseTaskExecution = pauseTaskExecution;
+    }
+
+    public boolean isComplete() {
+        return complete;
+    }
+
+    public void setComplete(boolean complete) {
+        this.complete = complete;
+    }
+
+    public boolean isRecovery() {
+        return recovery;
+    }
+
+    public void setRecovery(boolean recovery) {
+        this.recovery = recovery;
+    }
+
+    public TaskModel getCurrentExecutingTaskModel() {
+        return currentExecutingTaskModel;
+    }
+
+    public void setCurrentExecutingTaskModel(TaskModel currentExecutingTaskModel) {
+        this.currentExecutingTaskModel = currentExecutingTaskModel;
+    }
+
+    public StorageResourceDescription getStorageResource() {
+        return storageResource;
+    }
+
+    public void setStorageResource(StorageResourceDescription storageResource) {
+        this.storageResource = storageResource;
+    }
+
+	public void setAcknowledge(boolean acknowledge) {
+		this.acknowledge = acknowledge;
+	}
+
+	public boolean isAcknowledge() {
+		return acknowledge;
+	}
+
+	public boolean isRecoveryWithCancel() {
+		return recoveryWithCancel;
+	}
+
+	public void setRecoveryWithCancel(boolean recoveryWithCancel) {
+		this.recoveryWithCancel = recoveryWithCancel;
+	}
+
+	public boolean isUseUserCRPref() {
+		return getProcessModel().isUseUserCRPref();
+	}
+
+	public String getComputeResourceLoginUserName(){
+		if (isUseUserCRPref() &&
+				userComputeResourcePreference != null &&
+				isValid(userComputeResourcePreference.getLoginUserName())) {
+			return userComputeResourcePreference.getLoginUserName();
+		} else if (isValid(processModel.getProcessResourceSchedule().getOverrideLoginUserName())) {
+			return processModel.getProcessResourceSchedule().getOverrideLoginUserName();
+		} else {
+			return gatewayComputeResourcePreference.getLoginUserName();
+		}
+	}
+
+	public String getStorageResourceLoginUserName(){
+		return gatewayStorageResourcePreference.getLoginUserName();
+	}
+
+	public String getStorageFileSystemRootLocation(){
+		return gatewayStorageResourcePreference.getFileSystemRootLocation();
+	}
+
+	public String getStorageResourceId() {
+		return gatewayStorageResourcePreference.getStorageResourceId();
+	}
+
+	private ComputationalResourceSchedulingModel getProcessCRSchedule() {
+		if (getProcessModel() != null) {
+			return getProcessModel().getProcessResourceSchedule();
+		} else {
+			return null;
+		}
+	}
+
+	public ServerInfo getComputeResourceServerInfo(){
+		return new ServerInfo(getComputeResourceLoginUserName(),
+				getComputeResourceDescription().getHostName(),
+				getComputeResourceCredentialToken());
+	}
+
+	public ServerInfo getStorageResourceServerInfo() {
+		return new ServerInfo(getStorageResourceLoginUserName(),
+				getStorageResource().getHostName(),
+				getStorageResourceCredentialToken());
+	}
+
+	private boolean isValid(String str) {
+		return str != null && !str.trim().isEmpty();
+	}
+
+	public String getUsageReportingGatewayId() {
+		return gatewayComputeResourcePreference.getUsageReportingGatewayId();
+	}
+
+	public String getAllocationProjectNumber() {
+		return gatewayComputeResourcePreference.getAllocationProjectNumber();
+	}
+
+	/**
+	 * Returns the reservation to use, but only while its time window is open.
+	 * <p>
+	 * The user compute resource preference supplies the reservation and its window when user
+	 * preferences are enabled and it carries a non-blank reservation; otherwise all three values
+	 * come from the gateway compute resource preference.
+	 *
+	 * @return the reservation when it is non-null, start &gt; 0, start &lt; end and the current
+	 *         time lies strictly between start and end; {@code null} otherwise
+	 */
+	public String getReservation() {
+		final String reservation;
+		final long start;
+		final long end;
+		if (isUseUserCRPref() &&
+				userComputeResourcePreference != null &&
+				isValid(userComputeResourcePreference.getReservation())) {
+			reservation = userComputeResourcePreference.getReservation();
+			start = userComputeResourcePreference.getReservationStartTime();
+			end = userComputeResourcePreference.getReservationEndTime();
+		} else {
+			reservation = gatewayComputeResourcePreference.getReservation();
+			start = gatewayComputeResourcePreference.getReservationStartTime();
+			end = gatewayComputeResourcePreference.getReservationEndTime();
+		}
+		if (reservation == null || start <= 0 || start >= end) {
+			return null;
+		}
+		// Same clock value as Calendar.getInstance().getTimeInMillis(), without allocating a Calendar.
+		// NOTE(review): this presumes start/end are epoch milliseconds - confirm against the data model.
+		long now = System.currentTimeMillis();
+		return (now > start && now < end) ? reservation : null;
+	}
+
+	public String getQualityOfService() {
+		if (isUseUserCRPref() &&
+				userComputeResourcePreference != null &&
+				isValid(userComputeResourcePreference.getQualityOfService())) {
+			return userComputeResourcePreference.getQualityOfService();
+		} else {
+			return gatewayComputeResourcePreference.getQualityOfService();
+		}
+	}
+
+
+	public String getQueueName() {
+		if (isUseUserCRPref() &&
+				userComputeResourcePreference != null &&
+				isValid(userComputeResourcePreference.getPreferredBatchQueue())) {
+			return userComputeResourcePreference.getPreferredBatchQueue();
+		} else if (isValid(processModel.getProcessResourceSchedule().getQueueName())) {
+			return processModel.getProcessResourceSchedule().getQueueName();
+		} else {
+			return gatewayComputeResourcePreference.getPreferredBatchQueue();
+		}
+	}
+
+	public static class ProcessContextBuilder{
+		private final String processId;
+		private final String gatewayId;
+		private final String tokenId;
+		private ExperimentCatalog experimentCatalog;
+		private AppCatalog appCatalog;
+		private CuratorFramework curatorClient;
+		private Publisher statusPublisher;
+		private GatewayResourceProfile gatewayResourceProfile;
+		private ComputeResourcePreference gatewayComputeResourcePreference;
+		private StoragePreference gatewayStorageResourcePreference;
+		private ProcessModel processModel;
+
+		public ProcessContextBuilder(String processId, String gatewayId, String tokenId) throws WorkerException {
+			if (notValid(processId) || notValid(gatewayId) || notValid(tokenId)) {
+				throwError("Process Id, Gateway Id and tokenId must be not null");
+			}
+			this.processId = processId;
+			this.gatewayId = gatewayId;
+			this.tokenId = tokenId;
+		}
+
+		public ProcessContextBuilder setGatewayResourceProfile(GatewayResourceProfile gatewayResourceProfile) {
+			this.gatewayResourceProfile = gatewayResourceProfile;
+			return this;
+		}
+
+		public ProcessContextBuilder setGatewayComputeResourcePreference(ComputeResourcePreference gatewayComputeResourcePreference) {
+			this.gatewayComputeResourcePreference = gatewayComputeResourcePreference;
+			return this;
+		}
+
+		public ProcessContextBuilder setGatewayStorageResourcePreference(StoragePreference gatewayStorageResourcePreference) {
+			this.gatewayStorageResourcePreference = gatewayStorageResourcePreference;
+            return this;
+		}
+
+		public ProcessContextBuilder setProcessModel(ProcessModel processModel) {
+			this.processModel = processModel;
+			return this;
+		}
+
+		public ProcessContextBuilder setExperimentCatalog(ExperimentCatalog experimentCatalog) {
+			this.experimentCatalog = experimentCatalog;
+			return this;
+		}
+
+		public ProcessContextBuilder setAppCatalog(AppCatalog appCatalog) {
+			this.appCatalog = appCatalog;
+			return this;
+		}
+
+		public ProcessContextBuilder setCuratorClient(CuratorFramework curatorClient) {
+			this.curatorClient = curatorClient;
+			return this;
+		}
+
+		public ProcessContextBuilder setStatusPublisher(Publisher statusPublisher) {
+			this.statusPublisher = statusPublisher;
+			return this;
+		}
+
+		public ProcessContext build() throws WorkerException {
+			if (notValid(gatewayResourceProfile)) {
+				throwError("Invalid GatewayResourceProfile");
+			}
+			if (notValid(gatewayComputeResourcePreference)) {
+				throwError("Invalid Gateway ComputeResourcePreference");
+			}
+			if (notValid(gatewayStorageResourcePreference)) {
+				throwError("Invalid Gateway StoragePreference");
+			}
+			if (notValid(processModel)) {
+				throwError("Invalid Process Model");
+			}
+			if (notValid(appCatalog)) {
+				throwError("Invalid AppCatalog");
+			}
+			if (notValid(experimentCatalog)) {
+				throwError("Invalid Experiment catalog");
+			}
+			if (notValid(curatorClient)) {
+				throwError("Invalid Curator Client");
+			}
+			if (notValid(statusPublisher)) {
+				throwError("Invalid Status Publisher");
+			}
+
+			ProcessContext pc = new ProcessContext(processId, gatewayId, tokenId);
+			pc.setAppCatalog(appCatalog);
+			pc.setExperimentCatalog(experimentCatalog);
+			pc.setCuratorClient(curatorClient);
+			pc.setStatusPublisher(statusPublisher);
+			pc.setProcessModel(processModel);
+			pc.setGatewayResourceProfile(gatewayResourceProfile);
+			pc.setGatewayComputeResourcePreference(gatewayComputeResourcePreference);
+			pc.setGatewayStorageResourcePreference(gatewayStorageResourcePreference);
+
+			return pc;
+		}
+
+		private boolean notValid(Object value) {
+			return value == null;
+		}
+
+		private void throwError(String msg) throws WorkerException {
+			throw new WorkerException(msg);
+		}
+
+	}
+}
+

http://git-wip-us.apache.org/repos/asf/airavata/blob/56c67173/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/context/TaskContext.java
----------------------------------------------------------------------
diff --git a/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/context/TaskContext.java b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/context/TaskContext.java
new file mode 100644
index 0000000..f94ebd5
--- /dev/null
+++ b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/context/TaskContext.java
@@ -0,0 +1,139 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.airavata.worker.commons.context;
+
+import org.apache.airavata.common.utils.ThriftUtils;
+import org.apache.airavata.model.application.io.InputDataObjectType;
+import org.apache.airavata.model.application.io.OutputDataObjectType;
+import org.apache.airavata.model.status.TaskState;
+import org.apache.airavata.model.status.TaskStatus;
+import org.apache.airavata.model.task.TaskModel;
+import org.apache.airavata.model.task.TaskTypes;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class TaskContext {
+	private static final Logger log = LoggerFactory.getLogger(TaskContext.class);
+
+	private TaskModel taskModel;
+	private ProcessContext parentProcessContext;
+    private InputDataObjectType processInput;
+    private OutputDataObjectType processOutput;
+    private Object subTaskModel = null;
+	private boolean isCancel = false;
+
+	public TaskModel getTaskModel() {
+		return taskModel;
+	}
+
+	public void setTaskModel(TaskModel taskModel) {
+		this.taskModel = taskModel;
+	}
+
+	public ProcessContext getParentProcessContext() {
+		return parentProcessContext;
+	}
+
+	public void setParentProcessContext(ProcessContext parentProcessContext) {
+		this.parentProcessContext = parentProcessContext;
+	}
+
+	public String getWorkingDir() {
+		return getParentProcessContext().getWorkingDir();
+	}
+
+	/**
+	 * Logs the status transition and replaces the task model's status list with a
+	 * single-element list holding {@code taskStatus}.
+	 *
+	 * @param taskStatus the new task status; must not be {@code null}, since its state is read for the log line
+	 */
+	public void setTaskStatus(TaskStatus taskStatus) {
+		// Read the previous state defensively: a task with no status yet (null or empty list)
+		// must not make its first status update fail.
+		List<TaskStatus> previousStatuses = taskModel.getTaskStatuses();
+		TaskState previousState = (previousStatuses == null || previousStatuses.isEmpty())
+				? null : previousStatuses.get(0).getState();
+		// Enum values go to the logger as objects so an unset state or type is printed as "null"
+		// instead of raising a NullPointerException from name().
+		log.info("expId: {}, processId: {}, taskId: {}, type: {} : Task status changed {} -> {}", parentProcessContext
+				.getExperimentId(), parentProcessContext.getProcessId(), getTaskId(), getTaskType(),
+				previousState, taskStatus.getState());
+		List<TaskStatus> taskStatuses = new ArrayList<>();
+		taskStatuses.add(taskStatus);
+		taskModel.setTaskStatuses(taskStatuses);
+	}
+
+	public TaskStatus getTaskStatus() {
+		if(taskModel.getTaskStatuses() != null)
+			return taskModel.getTaskStatuses().get(0);
+		else
+			return null;
+	}
+
+	public TaskState getTaskState() {
+		if(taskModel.getTaskStatuses() != null)
+			return taskModel.getTaskStatuses().get(0).getState();
+		else
+			return null;
+	}
+
+	public TaskTypes getTaskType() {
+		return taskModel.getTaskType();
+	}
+
+	public String getTaskId() {
+		return taskModel.getTaskId();
+	}
+
+	public String getLocalWorkingDir() {
+		return getParentProcessContext().getLocalWorkingDir();
+	}
+
+    public InputDataObjectType getProcessInput() {
+        return processInput;
+    }
+
+    public void setProcessInput(InputDataObjectType processInput) {
+        this.processInput = processInput;
+    }
+
+    public OutputDataObjectType getProcessOutput() {
+        return processOutput;
+    }
+
+    public void setProcessOutput(OutputDataObjectType processOutput) {
+        this.processOutput = processOutput;
+    }
+
+	public String getProcessId() {
+		return parentProcessContext.getProcessId();
+	}
+
+	public String getExperimentId() {
+		return parentProcessContext.getExperimentId();
+	}
+
+    public Object getSubTaskModel() throws TException {
+        if (subTaskModel == null) {
+            subTaskModel = ThriftUtils.getSubTaskModel(getTaskModel());
+        }
+        return subTaskModel;
+    }
+
+	public boolean isCancel() {
+		return isCancel;
+	}
+
+	public void setCancel(boolean cancel) {
+		isCancel = cancel;
+	}
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/56c67173/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/exceptions/SSHApiException.java
----------------------------------------------------------------------
diff --git a/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/exceptions/SSHApiException.java b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/exceptions/SSHApiException.java
new file mode 100644
index 0000000..0dcdd0e
--- /dev/null
+++ b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/exceptions/SSHApiException.java
@@ -0,0 +1,34 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.airavata.worker.commons.exceptions;
+
+/**
+ * Checked exception wrapping errors raised while executing SSH commands.
+ */
+public class SSHApiException extends Exception {
+    private static final long serialVersionUID = 1L; // Exception is Serializable; pin the stream version like the sibling exception classes
+
+    public SSHApiException(String message) { super(message); }
+
+    /** Keeps {@code e} as the cause so the original stack trace is preserved. */
+    public SSHApiException(String message, Exception e) {
+        super(message, e);
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/56c67173/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/exceptions/WorkerException.java
----------------------------------------------------------------------
diff --git a/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/exceptions/WorkerException.java b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/exceptions/WorkerException.java
new file mode 100644
index 0000000..334ee0f
--- /dev/null
+++ b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/exceptions/WorkerException.java
@@ -0,0 +1,47 @@
+package org.apache.airavata.worker.commons.exceptions;
+
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class WorkerException extends Exception { // checked exception whose cause-taking constructors also log the failure
+    private static final Logger log = LoggerFactory.getLogger(WorkerException.class);
+
+    /**
+     * Fixed serialization version for this Serializable exception type.
+     */
+    private static final long serialVersionUID = 1L;
+
+    public WorkerException(String s) { // message only; this constructor does not log
+        super(s);
+    }
+
+    public WorkerException(Exception e) { // wraps e as the cause
+        super(e);
+        log.error(e.getMessage(),e); // logged at construction time; NOTE(review): a null e would throw NullPointerException here
+    }
+
+    public WorkerException(String s, Throwable throwable) { // message plus cause
+        super(s, throwable);
+        log.error(s,throwable); // logged at construction time, so a caller that logs the exception again produces a duplicate entry
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/56c67173/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/task/Task.java
----------------------------------------------------------------------
diff --git a/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/task/Task.java b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/task/Task.java
new file mode 100644
index 0000000..b7a8b45
--- /dev/null
+++ b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/task/Task.java
@@ -0,0 +1,62 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.airavata.worker.commons.task;
+
+import org.apache.airavata.model.status.TaskStatus;
+import org.apache.airavata.model.task.TaskTypes;
+import org.apache.airavata.worker.commons.context.TaskContext;
+
+import java.util.Map;
+
+/**
+ * Contract that every task implements.
+ */
+public interface Task {
+
+	/**
+	 * Initializes the task; invoked once a new task instance has been created.
+	 * @param propertyMap string properties handed to the task at initialization
+	 * @throws TaskException if the task cannot be initialized
+	 */
+	void init(Map<String, String> propertyMap) throws TaskException;
+
+	/**
+	 * Runs the task the first time its task chain is executed. For a given task chain this is called only
+	 * once and before {@link #recover(TaskContext)}, which may be called more than once.
+	 * @param taskContext context of the task to execute
+	 * @return completed task status on success, otherwise a failed task status.
+	 */
+	TaskStatus execute(TaskContext taskContext);
+
+	/**
+	 * Invoked on the recovery path, after {@link #execute(TaskContext)} has already been invoked.
+	 * May be called zero or more times within a process chain.
+	 * @param taskContext context of the task to recover
+	 * @return completed task status on success, otherwise a failed task status.
+	 */
+	TaskStatus recover(TaskContext taskContext);
+
+	/**
+	 * Identifies the behaviour of this task, e.g. DATA_STAGING or JOB_SUBMISSION.
+	 * @return type of this task object
+	 */
+	TaskTypes getType();
+
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/56c67173/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/task/TaskException.java
----------------------------------------------------------------------
diff --git a/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/task/TaskException.java b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/task/TaskException.java
new file mode 100644
index 0000000..d290881
--- /dev/null
+++ b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/task/TaskException.java
@@ -0,0 +1,44 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.airavata.worker.commons.task;
+
+/** Checked exception declared by {@code Task#init}; mirrors the constructors of {@link Exception}. */
+public class TaskException extends Exception {
+	private static final long serialVersionUID = 8662332011259328779L;
+
+	public TaskException() { }
+
+	public TaskException(String message) {
+		super(message);
+	}
+
+	public TaskException(Throwable cause) {
+		super(cause);
+	}
+
+	public TaskException(String message, Throwable cause) {
+		super(message, cause);
+	}
+
+	/** For subclasses that need to control suppression and stack-trace writability. */
+	protected TaskException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
+		super(message, cause, enableSuppression, writableStackTrace);
+	}
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/56c67173/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/utils/JobManagerConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/utils/JobManagerConfiguration.java b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/utils/JobManagerConfiguration.java
new file mode 100644
index 0000000..022cb79
--- /dev/null
+++ b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/utils/JobManagerConfiguration.java
@@ -0,0 +1,34 @@
+package org.apache.airavata.worker.commons.utils;
+
+import org.apache.airavata.worker.commons.cluster.OutputParser;
+import org.apache.airavata.worker.commons.cluster.RawCommandInfo;
+
+/**
+ * Commands, script naming and output parser exposed for a job manager. Created by goshenoy on 4/12/17.
+ */
+public interface JobManagerConfiguration {
+
+    RawCommandInfo getCancelCommand(String jobID);
+
+    String getJobDescriptionTemplateName();
+
+    RawCommandInfo getMonitorCommand(String jobID);
+
+    RawCommandInfo getUserBasedMonitorCommand(String userName);
+
+    RawCommandInfo getJobIdMonitorCommand(String jobName, String userName);
+
+    String getScriptExtension();
+
+    RawCommandInfo getSubmitCommand(String workingDirectory, String pbsFilePath);
+
+    OutputParser getParser();
+
+    String getInstalledPath();
+
+    String getBaseCancelCommand();
+
+    String getBaseMonitorCommand();
+
+    String getBaseSubmitCommand();
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/56c67173/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/utils/WorkerConstants.java
----------------------------------------------------------------------
diff --git a/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/utils/WorkerConstants.java b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/utils/WorkerConstants.java
new file mode 100644
index 0000000..5fa5fc4
--- /dev/null
+++ b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/utils/WorkerConstants.java
@@ -0,0 +1,82 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.airavata.worker.commons.utils;
+
+public class WorkerConstants { // shared constants: XPath fragments, config attribute names, property keys and defaults
+	public static final String XPATH_EXPR_GLOBAL_INFLOW_HANDLERS = "/GFac/GlobalHandlers/InHandlers/Handler";
+	public static final String XPATH_EXPR_GLOBAL_OUTFLOW_HANDLERS = "/GFac/GlobalHandlers/OutHandlers/Handler";
+    public static final String XPATH_EXPR_DAEMON_HANDLERS = "/GFac/DaemonHandlers/Handler";
+
+	public static final String XPATH_EXPR_APPLICATION_HANDLERS_START = "/GFac/Application[@name='"; // prefix; the *_END constants below close the quote and bracket
+	public static final String XPATH_EXPR_APPLICATION_INFLOW_HANDLERS_END = "']/InHandlers/Handler";
+	public static final String XPATH_EXPR_APPLICATION_OUTFLOW_HANDLERS_END = "']/OutHandlers/Handler";
+    public static final String XPATH_EXPR_APPLICATION_PROVIDER = "']/OutHandlers/Handler"; // NOTE(review): identical to XPATH_EXPR_APPLICATION_OUTFLOW_HANDLERS_END - possible copy-paste, confirm
+
+
+	public static final String XPATH_EXPR_PROVIDER_HANDLERS_START = "/GFac/Provider[@class='"; // prefix; the PROVIDER *_END constants below close the quote and bracket
+    public static final String XPATH_EXPR_PROVIDER_ON_HOST = "/GFac/Provider[@host='";
+    public static final String XPATH_EXPR_PROVIDER_ON_SUBMISSION = "/GFac/Provider[@submission='";
+	public static final String XPATH_EXPR_PROVIDER_INFLOW_HANDLERS_END = "']/InHandlers/Handler";
+	public static final String XPATH_EXPR_PROVIDER_OUTFLOW_HANDLERS_END = "']/OutHandlers/Handler";
+
+	public static final String GFAC_CONFIG_CLASS_ATTRIBUTE = "class";
+	public static final String GFAC_CONFIG_SECURITY_ATTRIBUTE = "security";
+	public static final String GFAC_CONFIG_SUBMISSION_ATTRIBUTE = "submission";
+    public static final String GFAC_CONFIG_EXECUTION_MODE_ATTRIBUTE = "executionMode";
+	public static final String GFAC_CONFIG_APPLICATION_NAME_ATTRIBUTE = "class"; // NOTE(review): same value as GFAC_CONFIG_CLASS_ATTRIBUTE - confirm "class" is intended for the application name
+	public static final String NEWLINE = System.getProperty("line.separator"); // read once, when this class is initialized
+	public static final String INPUT_DATA_DIR_VAR_NAME = "input";
+	public static final String OUTPUT_DATA_DIR_VAR_NAME = "output";
+	public static final int DEFAULT_GSI_FTP_PORT = 2811;
+	public static final String _127_0_0_1 = "127.0.0.1";
+	public static final String LOCALHOST = "localhost";
+
+	public static final String MULTIPLE_INPUTS_SPLITTER = ",";
+
+	public static final String PROP_WORKFLOW_INSTANCE_ID = "workflow.instance.id";
+	public static final String PROP_WORKFLOW_NODE_ID = "workflow.node.id";
+	public static final String PROP_BROKER_URL = "broker.url";
+	public static final String PROP_TOPIC = "topic";
+	public static final String SPACE = " ";
+	public static final int COMMAND_EXECUTION_TIMEOUT = 5; // NOTE(review): time unit is not visible here - confirm against callers
+	public static final String EXECUTABLE_NAME = "run.sh";
+
+	public static final String TRUSTED_CERT_LOCATION = "trusted.cert.location";
+    public static final String TRUSTED_CERTIFICATE_SYSTEM_PROPERTY = "X509_CERT_DIR";
+    public static final String MYPROXY_SERVER = "myproxy.server";
+    public static final String MYPROXY_SERVER_PORT = "myproxy.port";
+    public static final String MYPROXY_USER = "myproxy.username";
+    public static final String MYPROXY_PASS = "myproxy.password";
+    public static final String MYPROXY_LIFE = "myproxy.life";
+    /*
+     * SSH property keys (the SSH_* constants directly below)
+     */
+    public static final String SSH_PRIVATE_KEY = "private.ssh.key";
+    public static final String SSH_PUBLIC_KEY = "public.ssh.key";
+    public static final String SSH_PRIVATE_KEY_PASS = "ssh.keypass";
+    public static final String SSH_USER_NAME = "ssh.username";
+    public static final String SSH_PASSWORD = "ssh.password";
+    public static final String PROPERTY = "property"; // from here on the names are no longer SSH-specific
+    public static final String NAME = "name";
+    public static final String VALUE = "value";
+    public static final String OUTPUT_DATA_DIR = "output.location";
+
+
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/56c67173/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/utils/WorkerFactory.java
----------------------------------------------------------------------
diff --git a/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/utils/WorkerFactory.java b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/utils/WorkerFactory.java
new file mode 100644
index 0000000..4ab403f
--- /dev/null
+++ b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/commons/utils/WorkerFactory.java
@@ -0,0 +1,82 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.airavata.worker.commons.utils;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalListener;
+import com.jcraft.jsch.Session;
+import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManagerType;
+import org.apache.airavata.worker.commons.config.ResourceConfig;
+import org.apache.airavata.worker.commons.config.WorkerYamlConfigruation;
+import org.apache.airavata.worker.commons.exceptions.WorkerException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+/** Static holder for per-job-manager resource configuration and the SSH session cache. Created by goshenoy on 4/13/17. */
+public class WorkerFactory {
+    private static final Logger log = LoggerFactory.getLogger(WorkerFactory.class);
+
+    private static boolean isWorkerConfigurationLoaded = false;
+    private static Map<ResourceJobManagerType, ResourceConfig> resources = new HashMap<>();
+    private static Cache<String,Session> sessionCache;
+
+    /** Loads the YAML resource configuration and builds the session cache; calls after a successful load do nothing. */
+    public static void loadConfiguration() throws WorkerException {
+        if (isWorkerConfigurationLoaded) {
+            return;
+        }
+        WorkerYamlConfigruation config = new WorkerYamlConfigruation();
+        try {
+            for (ResourceConfig entry : config.getResourceConfiguration()) {
+                resources.put(entry.getJobManagerType(), entry);
+            }
+        } catch (Exception e) {
+            throw new WorkerException("Worker config issue", e);
+        }
+        // Entries expire after the configured access timeout; on removal a still-connected session is disconnected.
+        RemovalListener<String, Session> onRemoval = notification -> {
+            if (notification.getValue().isConnected()) {
+                log.info("Disconnecting ssh session with key: " + notification.getKey());
+                notification.getValue().disconnect();
+            }
+            log.info("Removed ssh session with key: " + notification.getKey());
+        };
+        sessionCache = CacheBuilder.newBuilder()
+                .expireAfterAccess(ServerSettings.getSessionCacheAccessTimeout(), TimeUnit.MINUTES)
+                .removalListener(onRemoval)
+                .build();
+        isWorkerConfigurationLoaded =  true;
+    }
+
+    public static Map<ResourceJobManagerType, ResourceConfig> getResourceConfig() {
+        return resources;
+    }
+
+    public static ResourceConfig getResourceConfig(ResourceJobManagerType resourceJobManagerType) {
+        return resources.get(resourceJobManagerType);
+    }
+}