You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by ad...@apache.org on 2017/04/12 16:54:49 UTC
[1/2] airavata git commit: Adding workload commons
Repository: airavata
Updated Branches:
refs/heads/feature-workload-mgmt f93d4c330 -> 9d68f0b9f
http://git-wip-us.apache.org/repos/asf/airavata/blob/9d68f0b9/modules/worker/worker-commons/src/main/java/org/apache/airavata/worker/commons/utils/WorkerConstants.java
----------------------------------------------------------------------
diff --git a/modules/worker/worker-commons/src/main/java/org/apache/airavata/worker/commons/utils/WorkerConstants.java b/modules/worker/worker-commons/src/main/java/org/apache/airavata/worker/commons/utils/WorkerConstants.java
new file mode 100644
index 0000000..5fa5fc4
--- /dev/null
+++ b/modules/worker/worker-commons/src/main/java/org/apache/airavata/worker/commons/utils/WorkerConstants.java
@@ -0,0 +1,82 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.airavata.worker.commons.utils;
+
+public class WorkerConstants { // Holder for String/int constants only; declares no methods.
+ public static final String XPATH_EXPR_GLOBAL_INFLOW_HANDLERS = "/GFac/GlobalHandlers/InHandlers/Handler";
+ public static final String XPATH_EXPR_GLOBAL_OUTFLOW_HANDLERS = "/GFac/GlobalHandlers/OutHandlers/Handler";
+ public static final String XPATH_EXPR_DAEMON_HANDLERS = "/GFac/DaemonHandlers/Handler";
+
+ public static final String XPATH_EXPR_APPLICATION_HANDLERS_START = "/GFac/Application[@name='"; // expression prefix; presumably a name and one of the *_END suffixes below get appended - TODO confirm at call sites
+ public static final String XPATH_EXPR_APPLICATION_INFLOW_HANDLERS_END = "']/InHandlers/Handler";
+ public static final String XPATH_EXPR_APPLICATION_OUTFLOW_HANDLERS_END = "']/OutHandlers/Handler";
+ public static final String XPATH_EXPR_APPLICATION_PROVIDER = "']/OutHandlers/Handler"; // NOTE(review): same value as XPATH_EXPR_APPLICATION_OUTFLOW_HANDLERS_END - possible copy-paste, confirm the intended path
+
+
+ public static final String XPATH_EXPR_PROVIDER_HANDLERS_START = "/GFac/Provider[@class='"; // expression prefixes selecting a Provider by its class, host or submission attribute
+ public static final String XPATH_EXPR_PROVIDER_ON_HOST = "/GFac/Provider[@host='";
+ public static final String XPATH_EXPR_PROVIDER_ON_SUBMISSION = "/GFac/Provider[@submission='";
+ public static final String XPATH_EXPR_PROVIDER_INFLOW_HANDLERS_END = "']/InHandlers/Handler";
+ public static final String XPATH_EXPR_PROVIDER_OUTFLOW_HANDLERS_END = "']/OutHandlers/Handler";
+
+ public static final String GFAC_CONFIG_CLASS_ATTRIBUTE = "class";
+ public static final String GFAC_CONFIG_SECURITY_ATTRIBUTE = "security";
+ public static final String GFAC_CONFIG_SUBMISSION_ATTRIBUTE = "submission";
+ public static final String GFAC_CONFIG_EXECUTION_MODE_ATTRIBUTE = "executionMode";
+ public static final String GFAC_CONFIG_APPLICATION_NAME_ATTRIBUTE = "class"; // NOTE(review): same value as GFAC_CONFIG_CLASS_ATTRIBUTE despite the "APPLICATION_NAME" name - confirm this is intended
+ public static final String NEWLINE = System.getProperty("line.separator"); // read once, when this class is initialized
+ public static final String INPUT_DATA_DIR_VAR_NAME = "input";
+ public static final String OUTPUT_DATA_DIR_VAR_NAME = "output";
+ public static final int DEFAULT_GSI_FTP_PORT = 2811;
+ public static final String _127_0_0_1 = "127.0.0.1";
+ public static final String LOCALHOST = "localhost";
+
+ public static final String MULTIPLE_INPUTS_SPLITTER = ",";
+
+ public static final String PROP_WORKFLOW_INSTANCE_ID = "workflow.instance.id";
+ public static final String PROP_WORKFLOW_NODE_ID = "workflow.node.id";
+ public static final String PROP_BROKER_URL = "broker.url";
+ public static final String PROP_TOPIC = "topic";
+ public static final String SPACE = " ";
+ public static final int COMMAND_EXECUTION_TIMEOUT = 5; // NOTE(review): the time unit is not stated here - confirm at the usage site
+ public static final String EXECUTABLE_NAME = "run.sh";
+
+ public static final String TRUSTED_CERT_LOCATION = "trusted.cert.location";
+ public static final String TRUSTED_CERTIFICATE_SYSTEM_PROPERTY = "X509_CERT_DIR";
+ public static final String MYPROXY_SERVER = "myproxy.server";
+ public static final String MYPROXY_SERVER_PORT = "myproxy.port";
+ public static final String MYPROXY_USER = "myproxy.username";
+ public static final String MYPROXY_PASS = "myproxy.password";
+ public static final String MYPROXY_LIFE = "myproxy.life";
+ /*
+ * SSH properties
+ */
+ public static final String SSH_PRIVATE_KEY = "private.ssh.key";
+ public static final String SSH_PUBLIC_KEY = "public.ssh.key";
+ public static final String SSH_PRIVATE_KEY_PASS = "ssh.keypass";
+ public static final String SSH_USER_NAME = "ssh.username";
+ public static final String SSH_PASSWORD = "ssh.password";
+ public static final String PROPERTY = "property";
+ public static final String NAME = "name";
+ public static final String VALUE = "value";
+ public static final String OUTPUT_DATA_DIR = "output.location";
+
+
+}
[2/2] airavata git commit: Adding workload commons
Posted by ad...@apache.org.
Adding workload commons
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/9d68f0b9
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/9d68f0b9
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/9d68f0b9
Branch: refs/heads/feature-workload-mgmt
Commit: 9d68f0b9f1a57f46bce250e9a676a832ad098237
Parents: f93d4c3
Author: Ajinkya Dhamnaskar <ad...@apache.org>
Authored: Wed Apr 12 12:54:44 2017 -0400
Committer: Ajinkya Dhamnaskar <ad...@apache.org>
Committed: Wed Apr 12 12:54:44 2017 -0400
----------------------------------------------------------------------
.../worker/task/envsetup/handler/sample | 1 -
modules/worker/worker-commons/pom.xml | 52 ++
.../authentication/AuthenticationInfo.java | 27 +
.../authentication/GSIAuthenticationInfo.java | 43 +
.../authentication/SSHKeyAuthentication.java | 90 +++
.../SSHPasswordAuthentication.java | 42 +
.../SSHPublicKeyAuthentication.java | 45 ++
.../SSHPublicKeyFileAuthentication.java | 45 ++
.../commons/cluster/AbstractRemoteCluster.java | 45 ++
.../worker/commons/cluster/CommandInfo.java | 34 +
.../worker/commons/cluster/CommandOutput.java | 56 ++
.../commons/cluster/JobSubmissionOutput.java | 87 ++
.../worker/commons/cluster/OutputParser.java | 68 ++
.../worker/commons/cluster/RawCommandInfo.java | 45 ++
.../worker/commons/cluster/RemoteCluster.java | 165 ++++
.../worker/commons/cluster/ServerInfo.java | 59 ++
.../worker/commons/context/ProcessContext.java | 806 +++++++++++++++++++
.../worker/commons/context/TaskContext.java | 139 ++++
.../commons/exceptions/SSHApiException.java | 34 +
.../commons/exceptions/WorkerException.java | 47 ++
.../org/apache/airavata/worker/commons/sample | 0
.../airavata/worker/commons/task/Task.java | 62 ++
.../worker/commons/task/TaskException.java | 44 +
.../worker/commons/utils/WorkerConstants.java | 82 ++
24 files changed, 2117 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/9d68f0b9/modules/worker/task-envsetup/src/main/java/org/apache/airavata/worker/task/envsetup/handler/sample
----------------------------------------------------------------------
diff --git a/modules/worker/task-envsetup/src/main/java/org/apache/airavata/worker/task/envsetup/handler/sample b/modules/worker/task-envsetup/src/main/java/org/apache/airavata/worker/task/envsetup/handler/sample
deleted file mode 100644
index f6c4fd0..0000000
--- a/modules/worker/task-envsetup/src/main/java/org/apache/airavata/worker/task/envsetup/handler/sample
+++ /dev/null
@@ -1 +0,0 @@
-delete me
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/airavata/blob/9d68f0b9/modules/worker/worker-commons/pom.xml
----------------------------------------------------------------------
diff --git a/modules/worker/worker-commons/pom.xml b/modules/worker/worker-commons/pom.xml
new file mode 100644
index 0000000..8964a2f
--- /dev/null
+++ b/modules/worker/worker-commons/pom.xml
@@ -0,0 +1,52 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+ <parent>
+ <groupId>org.apache.airavata</groupId>
+ <artifactId>airavata-worker</artifactId>
+ <version>0.17-SNAPSHOT</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+
+ <modelVersion>4.0.0</modelVersion>
+ <artifactId>airavata-worker-commons</artifactId>
+ <name>Worker Commons - Holds commons for all workers</name>
+ <description>Common classes shared by all Airavata workers.</description>
+ <url>http://airavata.apache.org/</url>
+
+ <dependencies>
+ <!-- Logging -->
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.airavata</groupId>
+ <artifactId>airavata-registry-cpi</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.airavata</groupId>
+ <artifactId>airavata-registry-core</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.jcraft</groupId>
+ <artifactId>jsch</artifactId>
+ <version>0.1.53</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.airavata</groupId>
+ <artifactId>airavata-messaging-core</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>commons-io</groupId>
+ <artifactId>commons-io</artifactId>
+ <version>2.5</version>
+ </dependency>
+ </dependencies>
+
+</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/airavata/blob/9d68f0b9/modules/worker/worker-commons/src/main/java/org/apache/airavata/worker/commons/authentication/AuthenticationInfo.java
----------------------------------------------------------------------
diff --git a/modules/worker/worker-commons/src/main/java/org/apache/airavata/worker/commons/authentication/AuthenticationInfo.java b/modules/worker/worker-commons/src/main/java/org/apache/airavata/worker/commons/authentication/AuthenticationInfo.java
new file mode 100644
index 0000000..51e94c8
--- /dev/null
+++ b/modules/worker/worker-commons/src/main/java/org/apache/airavata/worker/commons/authentication/AuthenticationInfo.java
@@ -0,0 +1,27 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.airavata.worker.commons.authentication;
+
+/**
+ * Marker interface (it declares no methods) that represents authentication data passed to the API.
+ */
+public interface AuthenticationInfo {
+
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/9d68f0b9/modules/worker/worker-commons/src/main/java/org/apache/airavata/worker/commons/authentication/GSIAuthenticationInfo.java
----------------------------------------------------------------------
diff --git a/modules/worker/worker-commons/src/main/java/org/apache/airavata/worker/commons/authentication/GSIAuthenticationInfo.java b/modules/worker/worker-commons/src/main/java/org/apache/airavata/worker/commons/authentication/GSIAuthenticationInfo.java
new file mode 100644
index 0000000..db33fb2
--- /dev/null
+++ b/modules/worker/worker-commons/src/main/java/org/apache/airavata/worker/commons/authentication/GSIAuthenticationInfo.java
@@ -0,0 +1,43 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.airavata.worker.commons.authentication;
+
+import org.ietf.jgss.GSSCredential;
+
+import java.util.Properties;
+
+/**
+ * Authentication data exposed as a {@link GSSCredential} plus a {@link Properties} bag. The underlying data
+ * could be a MyProxy user name and password, GSSCredentials, or SSH keys.
+ */
+public abstract class GSIAuthenticationInfo implements AuthenticationInfo {
+
+ public Properties properties = new Properties(); // NOTE(review): public and non-final, so it can be read or replaced without going through the accessors below
+
+ public abstract GSSCredential getCredentials() throws SecurityException; // supplied by concrete subclasses
+
+ public Properties getProperties() {
+ return properties; // hands out the stored instance itself, not a copy
+ }
+
+ public void setProperties(Properties properties) {
+ this.properties = properties; // replaces the bag wholesale; no null check is performed
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/9d68f0b9/modules/worker/worker-commons/src/main/java/org/apache/airavata/worker/commons/authentication/SSHKeyAuthentication.java
----------------------------------------------------------------------
diff --git a/modules/worker/worker-commons/src/main/java/org/apache/airavata/worker/commons/authentication/SSHKeyAuthentication.java b/modules/worker/worker-commons/src/main/java/org/apache/airavata/worker/commons/authentication/SSHKeyAuthentication.java
new file mode 100644
index 0000000..57a567e
--- /dev/null
+++ b/modules/worker/worker-commons/src/main/java/org/apache/airavata/worker/commons/authentication/SSHKeyAuthentication.java
@@ -0,0 +1,90 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.airavata.worker.commons.authentication;
+
+/**
+ * User: AmilaJ (amilaj@apache.org)
+ * Date: 10/4/13
+ * Time: 2:39 PM
+ */
+
+/**
+ * Mutable holder for SSH key authentication settings: user name, key pair bytes, passphrase, known-hosts file path and strict host key checking value.
+ */
+public class SSHKeyAuthentication implements AuthenticationInfo {
+
+ private String userName;
+ private byte[] privateKey;
+ private byte[] publicKey;
+ private String passphrase;
+ private String knownHostsFilePath;
+ private String strictHostKeyChecking; // yes or no
+
+ public SSHKeyAuthentication() {
+ } // every field stays null until its setter is called
+
+ public String getUserName() {
+ return userName;
+ }
+
+ public void setUserName(String userName) {
+ this.userName = userName;
+ }
+
+ public byte[] getPrivateKey() {
+ return privateKey; // returns the stored array itself, not a copy
+ }
+
+ public void setPrivateKey(byte[] privateKey) {
+ this.privateKey = privateKey; // keeps a reference to the caller's array
+ }
+
+ public byte[] getPublicKey() {
+ return publicKey; // returns the stored array itself, not a copy
+ }
+
+ public void setPublicKey(byte[] publicKey) {
+ this.publicKey = publicKey; // keeps a reference to the caller's array
+ }
+
+ public String getPassphrase() {
+ return passphrase;
+ }
+
+ public void setPassphrase(String passphrase) {
+ this.passphrase = passphrase;
+ }
+
+ public String getKnownHostsFilePath() {
+ return knownHostsFilePath;
+ }
+
+ public void setKnownHostsFilePath(String knownHostsFilePath) {
+ this.knownHostsFilePath = knownHostsFilePath;
+ }
+
+ public String getStrictHostKeyChecking() {
+ return strictHostKeyChecking;
+ }
+
+ public void setStrictHostKeyChecking(String strictHostKeyChecking) {
+ this.strictHostKeyChecking = strictHostKeyChecking; // stored as given; the value is not validated here
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/9d68f0b9/modules/worker/worker-commons/src/main/java/org/apache/airavata/worker/commons/authentication/SSHPasswordAuthentication.java
----------------------------------------------------------------------
diff --git a/modules/worker/worker-commons/src/main/java/org/apache/airavata/worker/commons/authentication/SSHPasswordAuthentication.java b/modules/worker/worker-commons/src/main/java/org/apache/airavata/worker/commons/authentication/SSHPasswordAuthentication.java
new file mode 100644
index 0000000..2f9d372
--- /dev/null
+++ b/modules/worker/worker-commons/src/main/java/org/apache/airavata/worker/commons/authentication/SSHPasswordAuthentication.java
@@ -0,0 +1,42 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.airavata.worker.commons.authentication;
+
+/**
+ * Immutable user name / password pair used for password authentication over vanilla SSH.
+ */
+public class SSHPasswordAuthentication implements AuthenticationInfo {
+
+ private final String userName; // assigned once in the constructor; there is no setter
+ private final String password; // assigned once in the constructor; there is no setter
+
+ public SSHPasswordAuthentication(String userName, String password) {
+ this.userName = userName; // stored as given; null is not rejected
+ this.password = password;
+ }
+
+ public String getUserName() {
+ return userName;
+ }
+
+ public String getPassword() {
+ return password;
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/9d68f0b9/modules/worker/worker-commons/src/main/java/org/apache/airavata/worker/commons/authentication/SSHPublicKeyAuthentication.java
----------------------------------------------------------------------
diff --git a/modules/worker/worker-commons/src/main/java/org/apache/airavata/worker/commons/authentication/SSHPublicKeyAuthentication.java b/modules/worker/worker-commons/src/main/java/org/apache/airavata/worker/commons/authentication/SSHPublicKeyAuthentication.java
new file mode 100644
index 0000000..8134065
--- /dev/null
+++ b/modules/worker/worker-commons/src/main/java/org/apache/airavata/worker/commons/authentication/SSHPublicKeyAuthentication.java
@@ -0,0 +1,45 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.airavata.worker.commons.authentication;
+
+/**
+ * Public key authentication for vanilla SSH.
+ * The public key and private key are returned as byte arrays. Useful when the private/public key pair is kept
+ * in secure storage such as a credential store. The API user should implement this.
+ */
+public interface SSHPublicKeyAuthentication extends AuthenticationInfo {
+
+ /**
+ * Gets the private key as a byte array.
+ * @param userName The user who is trying to SSH
+ * @param hostName The host which user wants to connect to.
+ * @return The private key as a byte array.
+ */
+ byte[] getPrivateKey(String userName, String hostName);
+
+ /**
+ * Gets the public key as a byte array.
+ * @param userName The user who is trying to SSH
+ * @param hostName The host which user wants to connect to.
+ * @return The public key as a byte array.
+ */
+ byte[] getPublicKey(String userName, String hostName);
+
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/9d68f0b9/modules/worker/worker-commons/src/main/java/org/apache/airavata/worker/commons/authentication/SSHPublicKeyFileAuthentication.java
----------------------------------------------------------------------
diff --git a/modules/worker/worker-commons/src/main/java/org/apache/airavata/worker/commons/authentication/SSHPublicKeyFileAuthentication.java b/modules/worker/worker-commons/src/main/java/org/apache/airavata/worker/commons/authentication/SSHPublicKeyFileAuthentication.java
new file mode 100644
index 0000000..1915d21
--- /dev/null
+++ b/modules/worker/worker-commons/src/main/java/org/apache/airavata/worker/commons/authentication/SSHPublicKeyFileAuthentication.java
@@ -0,0 +1,45 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.airavata.worker.commons.authentication;
+
+/**
+ * Public key authentication for vanilla SSH.
+ * The names of the files that store the public key and the private key are returned. The API user should implement this.
+ */
+public interface SSHPublicKeyFileAuthentication extends AuthenticationInfo {
+
+ /**
+ * The file which contains the public key.
+ * @param userName The user who is trying to SSH
+ * @param hostName The host which user wants to connect to.
+ * @return The name of the file which contains the public key.
+ */
+ String getPublicKeyFile(String userName, String hostName);
+
+ /**
+ * The file which contains the private key.
+ * @param userName The user who is trying to SSH
+ * @param hostName The host which user wants to connect to.
+ * @return The name of the file which contains the private key.
+ */
+ String getPrivateKeyFile(String userName, String hostName);
+
+
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/9d68f0b9/modules/worker/worker-commons/src/main/java/org/apache/airavata/worker/commons/cluster/AbstractRemoteCluster.java
----------------------------------------------------------------------
diff --git a/modules/worker/worker-commons/src/main/java/org/apache/airavata/worker/commons/cluster/AbstractRemoteCluster.java b/modules/worker/worker-commons/src/main/java/org/apache/airavata/worker/commons/cluster/AbstractRemoteCluster.java
new file mode 100644
index 0000000..abc62aa
--- /dev/null
+++ b/modules/worker/worker-commons/src/main/java/org/apache/airavata/worker/commons/cluster/AbstractRemoteCluster.java
@@ -0,0 +1,45 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.airavata.worker.commons.cluster;
+
+import org.apache.airavata.gfac.core.JobManagerConfiguration;
+import org.apache.airavata.gfac.core.authentication.AuthenticationInfo;
+
+public abstract class AbstractRemoteCluster implements RemoteCluster { // Keeps the constructor-supplied settings in protected final fields for subclasses
+
+ protected final OutputParser outputParser; // obtained from jobManagerConfiguration.getParser(); null when no configuration is supplied
+ protected final AuthenticationInfo authenticationInfo; // NOTE(review): this is the imported org.apache.airavata.gfac.core.authentication.AuthenticationInfo, not the worker.commons.authentication one - confirm intended
+ protected final ServerInfo serverInfo;
+ protected final JobManagerConfiguration jobManagerConfiguration;
+
+ public AbstractRemoteCluster(ServerInfo serverInfo,
+ JobManagerConfiguration jobManagerConfiguration,
+ AuthenticationInfo authenticationInfo) {
+
+ this.serverInfo = serverInfo; // arguments are stored as given; none of them is null-checked except as below
+ this.jobManagerConfiguration = jobManagerConfiguration;
+ this.authenticationInfo = authenticationInfo;
+ if (jobManagerConfiguration != null) { // a null configuration is tolerated and simply leaves the parser unset
+ this.outputParser = jobManagerConfiguration.getParser();
+ }else {
+ this.outputParser = null;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/9d68f0b9/modules/worker/worker-commons/src/main/java/org/apache/airavata/worker/commons/cluster/CommandInfo.java
----------------------------------------------------------------------
diff --git a/modules/worker/worker-commons/src/main/java/org/apache/airavata/worker/commons/cluster/CommandInfo.java b/modules/worker/worker-commons/src/main/java/org/apache/airavata/worker/commons/cluster/CommandInfo.java
new file mode 100644
index 0000000..559430a
--- /dev/null
+++ b/modules/worker/worker-commons/src/main/java/org/apache/airavata/worker/commons/cluster/CommandInfo.java
@@ -0,0 +1,34 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.airavata.worker.commons.cluster;
+
+/**
+ * Encapsulates a command that can be executed on a remote shell.
+ */
+public interface CommandInfo {
+
+ /**
+ * Gets the executable command as a string.
+ * @return String encoded command. Should be able to execute
+ * directly on remote shell. Should include appropriate parameters.
+ */
+ String getCommand();
+
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/9d68f0b9/modules/worker/worker-commons/src/main/java/org/apache/airavata/worker/commons/cluster/CommandOutput.java
----------------------------------------------------------------------
diff --git a/modules/worker/worker-commons/src/main/java/org/apache/airavata/worker/commons/cluster/CommandOutput.java b/modules/worker/worker-commons/src/main/java/org/apache/airavata/worker/commons/cluster/CommandOutput.java
new file mode 100644
index 0000000..791616c
--- /dev/null
+++ b/modules/worker/worker-commons/src/main/java/org/apache/airavata/worker/commons/cluster/CommandOutput.java
@@ -0,0 +1,56 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.airavata.worker.commons.cluster;
+
+
+import com.jcraft.jsch.Channel;
+
+import java.io.OutputStream;
+
+/**
+ * Output of a certain command. TODO rethink
+ */
+public interface CommandOutput {
+
+ /**
+ * Receives the channel that carries the output of the command.
+ * @param channel The channel providing the command output as a stream.
+ */
+ void onOutput(Channel channel);
+
+ /**
+ * Gets standard error as an output stream.
+ * @return Command error as a stream.
+ */
+ OutputStream getStandardError();
+
+ /**
+ * Supplies the command exit code.
+ * @param code The program exit code
+ */
+ void exitCode(int code);
+
+ /**
+ * Returns the exit code of the command execution.
+ * @return exit code
+ */
+ int getExitCode();
+
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/9d68f0b9/modules/worker/worker-commons/src/main/java/org/apache/airavata/worker/commons/cluster/JobSubmissionOutput.java
----------------------------------------------------------------------
diff --git a/modules/worker/worker-commons/src/main/java/org/apache/airavata/worker/commons/cluster/JobSubmissionOutput.java b/modules/worker/worker-commons/src/main/java/org/apache/airavata/worker/commons/cluster/JobSubmissionOutput.java
new file mode 100644
index 0000000..85cb033
--- /dev/null
+++ b/modules/worker/worker-commons/src/main/java/org/apache/airavata/worker/commons/cluster/JobSubmissionOutput.java
@@ -0,0 +1,87 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.airavata.worker.commons.cluster;
+
+/**
+ * Simple mutable value holder describing the outcome of one job submission:
+ * the command, its exit code, its standard output and standard error text,
+ * the job id, and whether (and why) the submission failed.
+ */
+public class JobSubmissionOutput {
+
+ /** Stays at {@link Integer#MIN_VALUE} until {@link #setExitCode(int)} is called. */
+ private int exitCode = Integer.MIN_VALUE;
+ private String command;
+ private String stdOut;
+ private String stdErr;
+ private String jobId;
+ private boolean isJobSubmissionFailed;
+ private String failureReason;
+
+ /** @return the command, or {@code null} if none has been set */
+ public String getCommand() {
+ return this.command;
+ }
+
+ /** @param command the command to record */
+ public void setCommand(String command) {
+ this.command = command;
+ }
+
+ /** @return the exit code, or {@link Integer#MIN_VALUE} if none has been set */
+ public int getExitCode() {
+ return this.exitCode;
+ }
+
+ /** @param exitCode the exit code to record */
+ public void setExitCode(int exitCode) {
+ this.exitCode = exitCode;
+ }
+
+ /** @return the standard output text, or {@code null} if none has been set */
+ public String getStdOut() {
+ return this.stdOut;
+ }
+
+ /** @param stdOut the standard output text to record */
+ public void setStdOut(String stdOut) {
+ this.stdOut = stdOut;
+ }
+
+ /** @return the standard error text, or {@code null} if none has been set */
+ public String getStdErr() {
+ return this.stdErr;
+ }
+
+ /** @param stdErr the standard error text to record */
+ public void setStdErr(String stdErr) {
+ this.stdErr = stdErr;
+ }
+
+ /** @return the job id, or {@code null} if none has been set */
+ public String getJobId() {
+ return this.jobId;
+ }
+
+ /** @param jobId the job id to record */
+ public void setJobId(String jobId) {
+ this.jobId = jobId;
+ }
+
+ /** @return {@code true} if the submission has been flagged as failed; {@code false} by default */
+ public boolean isJobSubmissionFailed() {
+ return this.isJobSubmissionFailed;
+ }
+
+ /** @param jobSubmissionFailed whether the submission failed */
+ public void setJobSubmissionFailed(boolean jobSubmissionFailed) {
+ this.isJobSubmissionFailed = jobSubmissionFailed;
+ }
+
+ /** @return the failure reason, or {@code null} if none has been set */
+ public String getFailureReason() {
+ return this.failureReason;
+ }
+
+ /** @param failureReason the failure reason to record */
+ public void setFailureReason(String failureReason) {
+ this.failureReason = failureReason;
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/9d68f0b9/modules/worker/worker-commons/src/main/java/org/apache/airavata/worker/commons/cluster/OutputParser.java
----------------------------------------------------------------------
diff --git a/modules/worker/worker-commons/src/main/java/org/apache/airavata/worker/commons/cluster/OutputParser.java b/modules/worker/worker-commons/src/main/java/org/apache/airavata/worker/commons/cluster/OutputParser.java
new file mode 100644
index 0000000..f6f1824
--- /dev/null
+++ b/modules/worker/worker-commons/src/main/java/org/apache/airavata/worker/commons/cluster/OutputParser.java
@@ -0,0 +1,68 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.airavata.worker.commons.cluster;
+
+import org.apache.airavata.gfac.core.GFacException;
+import org.apache.airavata.gfac.core.SSHApiException;
+import org.apache.airavata.model.status.JobStatus;
+
+import java.util.Map;
+
+public interface OutputParser {
+
+ /**
+ * Parses the raw output of a job submission command to get the job ID.
+ * @param rawOutput raw output of the job submission command
+ * @return the job ID found in the output
+ * @throws GFacException if the implementation fails to parse the output (conditions are implementation specific)
+ */
+ public String parseJobSubmission(String rawOutput)throws GFacException;
+
+
+ /**
+ * Parses the output returned by the job submission task and identifies job submission failures.
+ * @param rawOutput raw output of the job submission task
+ * @return true if the job submission has failed, false otherwise.
+ */
+ public boolean isJobSubmissionFailed(String rawOutput);
+
+
+ /**
+ * Gets the status of a single job from the raw output.
+ * @param jobID ID of the job whose status is wanted
+ * @param rawOutput raw output to parse
+ * @return the status parsed for the given job
+ * @throws GFacException if the implementation fails to parse the output (conditions are implementation specific)
+ */
+ public JobStatus parseJobStatus(String jobID, String rawOutput)throws GFacException;
+
+ /**
+ * Parses a large output and extracts the statuses of multiple jobs.
+ * @param userName user name; presumably the owner of the jobs listed in rawOutput (TODO confirm against implementations)
+ * @param statusMap map keyed by job ID; as the method returns nothing, the parsed statuses are handed back through it
+ * @param rawOutput raw output to parse
+ * @throws GFacException if the implementation fails to parse the output (conditions are implementation specific)
+ */
+ public void parseJobStatuses(String userName, Map<String, JobStatus> statusMap, String rawOutput)throws GFacException;
+
+ /**
+ * Filters the job ID of the given job name from the raw output.
+ * @param jobName name of the job to look for
+ * @param rawOutput raw output to parse
+ * @return the job ID that belongs to the given job name
+ * @throws GFacException if the implementation fails to parse the output (conditions are implementation specific)
+ */
+ public String parseJobId(String jobName, String rawOutput) throws GFacException;
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/9d68f0b9/modules/worker/worker-commons/src/main/java/org/apache/airavata/worker/commons/cluster/RawCommandInfo.java
----------------------------------------------------------------------
diff --git a/modules/worker/worker-commons/src/main/java/org/apache/airavata/worker/commons/cluster/RawCommandInfo.java b/modules/worker/worker-commons/src/main/java/org/apache/airavata/worker/commons/cluster/RawCommandInfo.java
new file mode 100644
index 0000000..6045f4e
--- /dev/null
+++ b/modules/worker/worker-commons/src/main/java/org/apache/airavata/worker/commons/cluster/RawCommandInfo.java
@@ -0,0 +1,45 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.airavata.worker.commons.cluster;
+
+/**
+ * {@link CommandInfo} backed by a plain command string. The string returned by
+ * {@link #getCommand()} is executed directly in the SSH shell, for example
+ * "/opt/torque/bin/qsub /home/ogce/test.pbs".
+ */
+public class RawCommandInfo implements CommandInfo {
+
+ private String rawCommand;
+
+ /**
+ * @param cmd the command line to keep as the raw command
+ */
+ public RawCommandInfo(String cmd) {
+ rawCommand = cmd;
+ }
+
+ /**
+ * @param rawCommand the new raw command string
+ */
+ public void setRawCommand(String rawCommand) {
+ this.rawCommand = rawCommand;
+ }
+
+ /**
+ * @return the raw command string
+ */
+ public String getRawCommand() {
+ return this.rawCommand;
+ }
+
+ /**
+ * @return the raw command string, unchanged
+ */
+ public String getCommand() {
+ return rawCommand;
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/9d68f0b9/modules/worker/worker-commons/src/main/java/org/apache/airavata/worker/commons/cluster/RemoteCluster.java
----------------------------------------------------------------------
diff --git a/modules/worker/worker-commons/src/main/java/org/apache/airavata/worker/commons/cluster/RemoteCluster.java b/modules/worker/worker-commons/src/main/java/org/apache/airavata/worker/commons/cluster/RemoteCluster.java
new file mode 100644
index 0000000..936e1ce
--- /dev/null
+++ b/modules/worker/worker-commons/src/main/java/org/apache/airavata/worker/commons/cluster/RemoteCluster.java
@@ -0,0 +1,165 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.airavata.worker.commons.cluster;
+
+import com.jcraft.jsch.Session;
+import org.apache.airavata.model.status.JobStatus;
+import org.apache.airavata.worker.commons.authentication.AuthenticationInfo;
+import org.apache.airavata.worker.commons.exceptions.WorkerException;
+import org.apache.airavata.worker.commons.exceptions.SSHApiException;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * This interface represents a RemoteCluster machine.
+ * End users of the API can implement this and come up with their own
+ * implementations, but mostly this interface is for internal usage.
+ */
+public interface RemoteCluster { // FIXME: replace SSHApiException with suitable exception.
+
+ /**
+ * This will submit a job to the cluster with a given pbs file and some parameters
+ *
+ * @param jobScriptFilePath path of the job script file
+ * @param workingDirectory working directory to which the pbs file is to be copied
+ * @return output of the submission, which carries the jobId after a successful job submission
+ * @throws WorkerException throws exception during error
+ */
+ public JobSubmissionOutput submitBatchJob(String jobScriptFilePath, String workingDirectory) throws WorkerException;
+
+ /**
+ * This will copy the localFile to remoteFile location in configured cluster
+ *
+ * @param localFile local file path of the file which needs to copy to remote location
+ * @param remoteFile remote file location, this can be a directory too
+ * @throws WorkerException throws exception during error
+ */
+ public void copyTo(String localFile, String remoteFile) throws WorkerException;
+
+ /**
+ * This will copy the remote file remoteFile to the local file localFile
+ *
+ * @param remoteFile remote file path, this has to be a full qualified path
+ * @param localFile This is the local file to copy, this can be a directory too
+ * @throws WorkerException throws exception during error
+ */
+ public void copyFrom(String remoteFile, String localFile) throws WorkerException;
+
+ /**
+ * This will copy the source remote file to the target remote file.
+ *
+ * @param sourceFile remote file path, this has to be a full qualified path
+ * @param destinationFile destination file path, this can be a directory too
+ * @param session jcraft session of the other corner of the third-party file transfer.
+ * @param inOrOut direction of the file transfer: DIRECTION.TO or DIRECTION.FROM (presumably to / from the remote cluster)
+ * @param ignoreEmptyFile presumably whether an empty source file is skipped instead of reported as an error; TODO confirm against implementations
+ * @throws WorkerException throws exception during error
+ *
+ */
+ public void scpThirdParty(String sourceFile,
+ String destinationFile,
+ Session session,
+ DIRECTION inOrOut,
+ boolean ignoreEmptyFile) throws WorkerException;
+
+ /**
+ * This will create directories in computing resources
+ *
+ * @param directoryPath the full qualified path for the directory user wants to create
+ * @throws WorkerException throws during error
+ */
+ public void makeDirectory(String directoryPath) throws WorkerException;
+
+ /**
+ * This will delete the given job from the queue
+ *
+ * @param jobID jobId of the job which user wants to delete
+ * @return return the description of the deleted job
+ * @throws WorkerException throws exception during error
+ */
+ public JobStatus cancelJob(String jobID) throws WorkerException;
+
+ /**
+ * This will get the job status of the job associated with this jobId
+ *
+ * @param jobID jobId of the job user want to get the status
+ * @return job status of the given jobID
+ * @throws WorkerException throws exception during error
+ */
+ public JobStatus getJobStatus(String jobID) throws WorkerException;
+
+ /**
+ * This will get the jobId of the job associated with this jobName
+ *
+ * @param jobName jobName of the job user want to get the jobId of
+ * @param userName user name; presumably the owner of the job (TODO confirm against implementations)
+ * @return jobId of the given jobName
+ * @throws WorkerException throws exception during error
+ */
+ public String getJobIdByJobName(String jobName, String userName) throws WorkerException;
+
+ /**
+ * This method can be used to poll the job statuses based on the given
+ * user but we should pass the jobID list otherwise we will get unwanted
+ * job statuses which submitted by different middleware outside apache
+ * airavata with the same username which we are not considering
+ *
+ * @param userName userName of the jobs which required to get the status
+ * @param jobIDs precise set of jobIDs, given as the keys of the map; as the method returns nothing,
+ * the statuses are presumably written back into this map (TODO confirm against implementations)
+ * @throws WorkerException throws exception during error
+ */
+ public void getJobStatuses(String userName, Map<String, JobStatus> jobIDs) throws WorkerException;
+
+ /**
+ * This will list directories in computing resources
+ *
+ * @param directoryPath the full qualified path for the directory user wants to list
+ * @return entries of the listed directory
+ * @throws WorkerException throws during error
+ */
+ public List<String> listDirectory(String directoryPath) throws WorkerException;
+
+ /**
+ * This method can be used to execute a custom command on the remote compute resource.
+ * @param commandInfo the command to execute
+ * @return <code>true</code> if command successfully executed, <code>false</code> otherwise.
+ * @throws WorkerException throws exception during error
+ */
+ public boolean execute(CommandInfo commandInfo) throws WorkerException;
+
+ /**
+ * This method can be used to get created ssh session
+ * to reuse the created session.
+ */
+ public Session getSession() throws WorkerException;
+
+ /**
+ * This method can be used to close the connections initialized
+ * to handle graceful shutdown of the system
+ */
+ public void disconnect() throws WorkerException;
+
+ /**
+ * This gives the server Info
+ */
+ public ServerInfo getServerInfo();
+
+ public AuthenticationInfo getAuthentication();
+ enum DIRECTION { // direction argument of scpThirdParty
+ TO,
+ FROM
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/9d68f0b9/modules/worker/worker-commons/src/main/java/org/apache/airavata/worker/commons/cluster/ServerInfo.java
----------------------------------------------------------------------
diff --git a/modules/worker/worker-commons/src/main/java/org/apache/airavata/worker/commons/cluster/ServerInfo.java b/modules/worker/worker-commons/src/main/java/org/apache/airavata/worker/commons/cluster/ServerInfo.java
new file mode 100644
index 0000000..a451d2b
--- /dev/null
+++ b/modules/worker/worker-commons/src/main/java/org/apache/airavata/worker/commons/cluster/ServerInfo.java
@@ -0,0 +1,59 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.airavata.worker.commons.cluster;
+
+/**
+ * Encapsulates server information: host, port, user name and a credential token.
+ * Instances are immutable; all values are fixed at construction time.
+ */
+public class ServerInfo {
+ /** Port used when the caller does not supply one (22, the standard SSH port). */
+ private static final int DEFAULT_PORT = 22;
+ private final String host;
+ private final String userName;
+ private final int port;
+ private final String credentialToken;
+
+ /**
+ * Creates server information that uses the default port.
+ *
+ * @param userName user name on the server
+ * @param host host name of the server
+ * @param credentialToken credential token associated with this server
+ */
+ public ServerInfo(String userName, String host, String credentialToken) {
+ this(userName, host, credentialToken, DEFAULT_PORT);
+ }
+
+ /**
+ * Creates server information with an explicit port.
+ *
+ * @param userName user name on the server
+ * @param host host name of the server
+ * @param credentialToken credential token associated with this server
+ * @param port port of the server
+ */
+ public ServerInfo(String userName, String host, String credentialToken, int port) {
+ this.host = host;
+ this.userName = userName;
+ this.port = port;
+ this.credentialToken = credentialToken;
+ }
+
+ /** @return host name of the server */
+ public String getHost() {
+ return host;
+ }
+
+ /** @return user name on the server */
+ public String getUserName() {
+ return userName;
+ }
+
+ /** @return port of the server; 22 when none was given at construction */
+ public int getPort() {
+ return port;
+ }
+
+ /** @return credential token associated with this server */
+ public String getCredentialToken() {
+ return credentialToken;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/9d68f0b9/modules/worker/worker-commons/src/main/java/org/apache/airavata/worker/commons/context/ProcessContext.java
----------------------------------------------------------------------
diff --git a/modules/worker/worker-commons/src/main/java/org/apache/airavata/worker/commons/context/ProcessContext.java b/modules/worker/worker-commons/src/main/java/org/apache/airavata/worker/commons/context/ProcessContext.java
new file mode 100644
index 0000000..53bda5b
--- /dev/null
+++ b/modules/worker/worker-commons/src/main/java/org/apache/airavata/worker/commons/context/ProcessContext.java
@@ -0,0 +1,806 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.airavata.worker.commons.context;
+
+import org.apache.airavata.common.utils.AiravataUtils;
+import org.apache.airavata.messaging.core.Publisher;
+import org.apache.airavata.model.appcatalog.appdeployment.ApplicationDeploymentDescription;
+import org.apache.airavata.model.appcatalog.appinterface.ApplicationInterfaceDescription;
+import org.apache.airavata.model.appcatalog.computeresource.ComputeResourceDescription;
+import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionProtocol;
+import org.apache.airavata.model.appcatalog.computeresource.MonitorMode;
+import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManager;
+import org.apache.airavata.model.appcatalog.gatewayprofile.ComputeResourcePreference;
+import org.apache.airavata.model.appcatalog.gatewayprofile.GatewayResourceProfile;
+import org.apache.airavata.model.appcatalog.gatewayprofile.StoragePreference;
+import org.apache.airavata.model.appcatalog.storageresource.StorageResourceDescription;
+import org.apache.airavata.model.appcatalog.userresourceprofile.UserComputeResourcePreference;
+import org.apache.airavata.model.appcatalog.userresourceprofile.UserResourceProfile;
+import org.apache.airavata.model.appcatalog.userresourceprofile.UserStoragePreference;
+import org.apache.airavata.model.data.movement.DataMovementProtocol;
+import org.apache.airavata.model.job.JobModel;
+import org.apache.airavata.model.process.ProcessModel;
+import org.apache.airavata.model.scheduling.ComputationalResourceSchedulingModel;
+import org.apache.airavata.model.status.ProcessState;
+import org.apache.airavata.model.status.ProcessStatus;
+import org.apache.airavata.model.task.TaskModel;
+import org.apache.airavata.registry.cpi.AppCatalog;
+import org.apache.airavata.registry.cpi.ExperimentCatalog;
+import org.apache.airavata.worker.commons.authentication.SSHKeyAuthentication;
+import org.apache.airavata.worker.commons.cluster.RemoteCluster;
+import org.apache.airavata.worker.commons.cluster.ServerInfo;
+import org.apache.airavata.worker.commons.exceptions.WorkerException;
+import org.apache.curator.framework.CuratorFramework;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+
+public class ProcessContext {
+
+ private static final Logger log = LoggerFactory.getLogger(ProcessContext.class);
+ // process model
+ private ExperimentCatalog experimentCatalog;
+ private AppCatalog appCatalog;
+ private CuratorFramework curatorClient;
+ private Publisher statusPublisher;
+ private final String processId;
+ private final String gatewayId;
+ private final String tokenId;
+ private ProcessModel processModel;
+ private String workingDir;
+ private String scratchLocation;
+ private String inputDir;
+ private String outputDir;
+ private String localWorkingDir;
+ private GatewayResourceProfile gatewayResourceProfile;
+ private ComputeResourcePreference gatewayComputeResourcePreference;
+ private StoragePreference gatewayStorageResourcePreference;
+ private UserResourceProfile userResourceProfile;
+ private UserComputeResourcePreference userComputeResourcePreference;
+ private UserStoragePreference userStoragePreference;
+ private ComputeResourceDescription computeResourceDescription;
+ private ApplicationDeploymentDescription applicationDeploymentDescription;
+ private ApplicationInterfaceDescription applicationInterfaceDescription;
+ private RemoteCluster jobSubmissionRemoteCluster;
+ private RemoteCluster dataMovementRemoteCluster;
+ private Map<String, String> sshProperties;
+ private String stdoutLocation;
+ private String stderrLocation;
+ private JobSubmissionProtocol jobSubmissionProtocol;
+ private DataMovementProtocol dataMovementProtocol;
+ private JobModel jobModel;
+ private StorageResourceDescription storageResource;
+ private MonitorMode monitorMode;
+ private ResourceJobManager resourceJobManager;
+ private boolean handOver;
+ private boolean cancel;
+ private ServerInfo serverInfo;
+ private List<String> taskExecutionOrder;
+ private List<TaskModel> taskList;
+ private Map<String, TaskModel> taskMap;
+ private boolean pauseTaskExecution = false; // A task can pause task execution by setting this value
+ private boolean complete = false; // all tasks executed?
+ private boolean recovery = false; // is process in recovery mode?
+ private TaskModel currentExecutingTaskModel; // task model currently being executed; needed to continue process execution again after it has been paused
+ private boolean acknowledge;
+ private SSHKeyAuthentication sshKeyAuthentication;
+ private boolean recoveryWithCancel = false;
+ private String usageReportingGatewayId;
+
+ /**
+ * Note: process context properties use a lazy loading approach. At runtime you will see some properties as null
+ * unless you have accessed them previously. Once such a property is accessed through the api, it is set to the
+ * correct value.
+ *
+ * @param processId id of the process this context belongs to
+ * @param gatewayId id of the gateway
+ * @param tokenId id of the token
+ */
+ private ProcessContext(String processId, String gatewayId, String tokenId) {
+ this.processId = processId;
+ this.gatewayId = gatewayId;
+ this.tokenId = tokenId;
+ }
+
+ public ExperimentCatalog getExperimentCatalog() {
+ return experimentCatalog;
+ }
+
+ public void setExperimentCatalog(ExperimentCatalog experimentCatalog) {
+ this.experimentCatalog = experimentCatalog;
+ }
+
+ public AppCatalog getAppCatalog() {
+ return appCatalog;
+ }
+
+ public void setAppCatalog(AppCatalog appCatalog) {
+ this.appCatalog = appCatalog;
+ }
+
+ public String getGatewayId() {
+ return gatewayId;
+ }
+
+ public String getTokenId() {
+ return tokenId;
+ }
+
+ public String getProcessId() {
+ return processId;
+ }
+
+ public CuratorFramework getCuratorClient() {
+ return curatorClient;
+ }
+
+ public void setCuratorClient(CuratorFramework curatorClient) {
+ this.curatorClient = curatorClient;
+ }
+
+ public Publisher getStatusPublisher() {
+ return statusPublisher;
+ }
+
+ public void setStatusPublisher(Publisher statusPublisher) {
+ this.statusPublisher = statusPublisher;
+ }
+
+ public ProcessModel getProcessModel() {
+ return processModel;
+ }
+
+ public void setProcessModel(ProcessModel processModel) {
+ this.processModel = processModel;
+ }
+
+ /**
+ * Returns the working directory, resolving and caching it on first access: the static working
+ * directory of the process resource schedule when one is set, otherwise the scratch location
+ * joined with the process id by a single "/".
+ *
+ * @return the working directory
+ */
+ public String getWorkingDir() {
+ if (workingDir != null) {
+ return workingDir;
+ }
+ String staticDir = processModel.getProcessResourceSchedule().getStaticWorkingDir();
+ if (staticDir != null) {
+ workingDir = staticDir;
+ } else {
+ String base = getScratchLocation();
+ // Insert a separator only when the scratch location does not already end with one.
+ String separator = base.endsWith("/") ? "" : "/";
+ workingDir = base + separator + processId;
+ }
+ return workingDir;
+ }
+
+ public String getScratchLocation() {
+ if (scratchLocation == null) {
+ if (isUseUserCRPref() &&
+ userComputeResourcePreference != null &&
+ isValid(userComputeResourcePreference.getScratchLocation())) {
+ scratchLocation = userComputeResourcePreference.getScratchLocation();
+ } else if (isValid(processModel.getProcessResourceSchedule().getOverrideScratchLocation())) {
+ scratchLocation = processModel.getProcessResourceSchedule().getOverrideScratchLocation();
+ }else {
+ scratchLocation = gatewayComputeResourcePreference.getScratchLocation();
+ }
+ }
+ return scratchLocation;
+ }
+
+ public void setWorkingDir(String workingDir) {
+ this.workingDir = workingDir;
+ }
+
+ public GatewayResourceProfile getGatewayResourceProfile() {
+ return gatewayResourceProfile;
+ }
+
+ public void setGatewayResourceProfile(GatewayResourceProfile gatewayResourceProfile) {
+ this.gatewayResourceProfile = gatewayResourceProfile;
+ }
+
+ public UserResourceProfile getUserResourceProfile() {
+ return userResourceProfile;
+ }
+
+ public void setUserResourceProfile(UserResourceProfile userResourceProfile) {
+ this.userResourceProfile = userResourceProfile;
+ }
+
+ private UserComputeResourcePreference getUserComputeResourcePreference() {
+ return userComputeResourcePreference;
+ }
+
+ public void setUserComputeResourcePreference(UserComputeResourcePreference userComputeResourcePreference) {
+ this.userComputeResourcePreference = userComputeResourcePreference;
+ }
+
+ public UserStoragePreference getUserStoragePreference() {
+ return userStoragePreference;
+ }
+
+ public void setUserStoragePreference(UserStoragePreference userStoragePreference) {
+ this.userStoragePreference = userStoragePreference;
+ }
+
+ public StoragePreference getGatewayStorageResourcePreference() {
+ return gatewayStorageResourcePreference;
+ }
+
+ public void setGatewayStorageResourcePreference(StoragePreference gatewayStorageResourcePreference) {
+ this.gatewayStorageResourcePreference = gatewayStorageResourcePreference;
+ }
+
+ public RemoteCluster getJobSubmissionRemoteCluster() {
+ return jobSubmissionRemoteCluster;
+ }
+
+ public void setJobSubmissionRemoteCluster(RemoteCluster jobSubmissoinRemoteCluster) {
+ this.jobSubmissionRemoteCluster = jobSubmissoinRemoteCluster;
+ }
+
+ public RemoteCluster getDataMovementRemoteCluster() {
+ return dataMovementRemoteCluster;
+ }
+
+ public void setDataMovementRemoteCluster(RemoteCluster dataMovementRemoteCluster) {
+ this.dataMovementRemoteCluster = dataMovementRemoteCluster;
+ }
+
+ public Map<String, String> getSshProperties() {
+ return sshProperties;
+ }
+
+ public void setSshProperties(Map<String, String> sshProperties) {
+ this.sshProperties = sshProperties;
+ }
+
+ public ComputeResourceDescription getComputeResourceDescription() {
+ return computeResourceDescription;
+ }
+
+ public void setComputeResourceDescription(ComputeResourceDescription computeResourceDescription) {
+ this.computeResourceDescription = computeResourceDescription;
+ }
+
+ public ApplicationDeploymentDescription getApplicationDeploymentDescription() {
+ return applicationDeploymentDescription;
+ }
+
+ public void setApplicationDeploymentDescription(ApplicationDeploymentDescription
+ applicationDeploymentDescription) {
+ this.applicationDeploymentDescription = applicationDeploymentDescription;
+ }
+
+ public ApplicationInterfaceDescription getApplicationInterfaceDescription() {
+ return applicationInterfaceDescription;
+ }
+
+ public void setApplicationInterfaceDescription(ApplicationInterfaceDescription applicationInterfaceDescription) {
+ this.applicationInterfaceDescription = applicationInterfaceDescription;
+ }
+
+ public String getStdoutLocation() {
+ return stdoutLocation;
+ }
+
+ public void setStdoutLocation(String stdoutLocation) {
+ this.stdoutLocation = stdoutLocation;
+ }
+
+ public String getStderrLocation() {
+ return stderrLocation;
+ }
+
+ public void setStderrLocation(String stderrLocation) {
+ this.stderrLocation = stderrLocation;
+ }
+
+ public void setOutputDir(String outputDir) {
+ this.outputDir = outputDir;
+ }
+
+ public String getOutputDir() {
+ if (outputDir == null) {
+ outputDir = getWorkingDir();
+ }
+ return outputDir;
+ }
+
+ public String getInputDir() {
+ if (inputDir == null) {
+ inputDir = getWorkingDir();
+ }
+ return inputDir;
+ }
+
+ public void setInputDir(String inputDir) {
+ this.inputDir = inputDir;
+ }
+
+ public JobSubmissionProtocol getJobSubmissionProtocol() {
+ if (jobSubmissionProtocol == null) {
+ jobSubmissionProtocol = gatewayComputeResourcePreference.getPreferredJobSubmissionProtocol();
+ }
+ return jobSubmissionProtocol;
+ }
+
+ public void setJobSubmissionProtocol(JobSubmissionProtocol jobSubmissionProtocol) {
+ this.jobSubmissionProtocol = jobSubmissionProtocol;
+ }
+
+ public DataMovementProtocol getDataMovementProtocol() {
+ if (dataMovementProtocol == null) {
+ dataMovementProtocol = gatewayComputeResourcePreference.getPreferredDataMovementProtocol();
+ }
+ return dataMovementProtocol;
+ }
+
+ public void setDataMovementProtocol(DataMovementProtocol dataMovementProtocol) {
+ this.dataMovementProtocol = dataMovementProtocol;
+ }
+
+ public String getTaskDag() {
+ return getProcessModel().getTaskDag();
+ }
+
+ public List<TaskModel> getTaskList() { // lazily caches getProcessModel().getTasks() in the taskList field
+ if (taskList == null) {
+ synchronized (TaskModel.class){ // NOTE(review): the lock is the TaskModel class object, so it is shared by every ProcessContext instance
+ if (taskList == null) { // re-check now that the lock is held
+ taskList = getProcessModel().getTasks();
+ }
+ }
+ }
+ return taskList; // NOTE(review): taskList is not declared volatile, so the unsynchronized null check above has no visibility guarantee
+ }
+
+
+ public List<String> getTaskExecutionOrder() {
+ return taskExecutionOrder;
+ }
+
+ public void setTaskExecutionOrder(List<String> taskExecutionOrder) {
+ this.taskExecutionOrder = taskExecutionOrder;
+ }
+
+ public Map<String, TaskModel> getTaskMap() {
+ if (taskMap == null) {
+ synchronized (TaskModel.class) {
+ if (taskMap == null) {
+ taskMap = new HashMap<>();
+ for (TaskModel taskModel : getTaskList()) {
+ taskMap.put(taskModel.getTaskId(), taskModel);
+ }
+ }
+ }
+ }
+ return taskMap;
+ }
+
+ public JobModel getJobModel() {
+ if (jobModel == null) {
+ jobModel = new JobModel();
+ jobModel.setProcessId(processId);
+ jobModel.setWorkingDir(getWorkingDir());
+ jobModel.setCreationTime(AiravataUtils.getCurrentTimestamp().getTime());
+ }
+ return jobModel;
+ }
+
+ public void setJobModel(JobModel jobModel) {
+ this.jobModel = jobModel;
+ }
+
+ private ComputeResourcePreference getGatewayComputeResourcePreference() {
+ return gatewayComputeResourcePreference;
+ }
+
+ public void setGatewayComputeResourcePreference(ComputeResourcePreference gatewayComputeResourcePreference) {
+ this.gatewayComputeResourcePreference = gatewayComputeResourcePreference;
+ }
+
+ public ProcessState getProcessState() {
+ if(processModel.getProcessStatuses() != null && processModel.getProcessStatuses().size() > 0)
+ return processModel.getProcessStatuses().get(0).getState();
+ else
+ return null;
+ }
+
+ public void setProcessStatus(ProcessStatus status) {
+ if (status != null) {
+ log.info("expId: {}, processId: {} :- Process status changed {} -> {}", getExperimentId(), processId,
+ getProcessState().name(), status.getState().name());
+ List<ProcessStatus> processStatuses = new ArrayList<>();
+ processStatuses.add(status);
+ processModel.setProcessStatuses(processStatuses);
+ }
+ }
+
+ public ProcessStatus getProcessStatus(){
+ if(processModel.getProcessStatuses() != null)
+ return processModel.getProcessStatuses().get(0);
+ else
+ return null;
+ }
+
+ public String getComputeResourceId() {
+ if (isUseUserCRPref() &&
+ userComputeResourcePreference != null &&
+ isValid(userComputeResourcePreference.getComputeResourceId())) {
+ return userComputeResourcePreference.getComputeResourceId();
+ } else {
+ return gatewayComputeResourcePreference.getComputeResourceId();
+ }
+ }
+
+ public String getComputeResourceCredentialToken(){
+ if (isUseUserCRPref()) {
+ if (userComputeResourcePreference != null &&
+ isValid(userComputeResourcePreference.getResourceSpecificCredentialStoreToken())) {
+ return userComputeResourcePreference.getResourceSpecificCredentialStoreToken();
+ } else {
+ return userResourceProfile.getCredentialStoreToken();
+ }
+ } else {
+ if (isValid(gatewayComputeResourcePreference.getResourceSpecificCredentialStoreToken())) {
+ return gatewayComputeResourcePreference.getResourceSpecificCredentialStoreToken();
+ } else {
+ return gatewayResourceProfile.getCredentialStoreToken();
+ }
+ }
+ }
+
+ /**
+  * Returns the credential store token for the storage resource.
+  *
+  * @return the resource-specific token of the gateway storage preference when non-blank,
+  *         otherwise the token of the gateway resource profile
+  */
+ public String getStorageResourceCredentialToken(){
+     return isValid(gatewayStorageResourcePreference.getResourceSpecificCredentialStoreToken())
+             ? gatewayStorageResourcePreference.getResourceSpecificCredentialStoreToken()
+             : gatewayResourceProfile.getCredentialStoreToken();
+ }
+
+ /**
+  * Returns the preferred job submission protocol of the gateway compute resource preference.
+  */
+ public JobSubmissionProtocol getPreferredJobSubmissionProtocol(){
+     return this.gatewayComputeResourcePreference.getPreferredJobSubmissionProtocol();
+ }
+
+ /**
+  * Returns the preferred data movement protocol of the gateway compute resource preference.
+  */
+ public DataMovementProtocol getPreferredDataMovementProtocol() {
+     return this.gatewayComputeResourcePreference.getPreferredDataMovementProtocol();
+ }
+
+ /**
+  * Stores the monitor mode for this process.
+  *
+  * @param monitorMode monitor mode to store
+  */
+ public void setMonitorMode(MonitorMode monitorMode) {
+     this.monitorMode = monitorMode;
+ }
+
+ /**
+  * Returns the monitor mode stored on this context.
+  */
+ public MonitorMode getMonitorMode() {
+     return this.monitorMode;
+ }
+
+ /**
+  * Stores the resource job manager for this process.
+  *
+  * @param resourceJobManager resource job manager to store
+  */
+ public void setResourceJobManager(ResourceJobManager resourceJobManager) {
+     this.resourceJobManager = resourceJobManager;
+ }
+
+ /**
+  * Returns the resource job manager stored on this context.
+  */
+ public ResourceJobManager getResourceJobManager() {
+     return this.resourceJobManager;
+ }
+
+ /**
+  * Returns the local working directory stored on this context.
+  */
+ public String getLocalWorkingDir() {
+     return this.localWorkingDir;
+ }
+
+ /**
+  * Stores the local working directory.
+  *
+  * @param localWorkingDir directory path to store
+  */
+ public void setLocalWorkingDir(String localWorkingDir) {
+     this.localWorkingDir = localWorkingDir;
+ }
+
+ /**
+  * Returns the experiment id recorded on the process model.
+  */
+ public String getExperimentId() {
+     return this.processModel.getExperimentId();
+ }
+
+ /**
+  * Returns the hand-over flag of this context.
+  */
+ public boolean isHandOver() {
+     return this.handOver;
+ }
+
+ /**
+  * Sets the hand-over flag of this context.
+  *
+  * @param handOver new flag value
+  */
+ public void setHandOver(boolean handOver) {
+     this.handOver = handOver;
+ }
+
+ /**
+  * Returns the cancel flag of this context.
+  */
+ public boolean isCancel() {
+     return this.cancel;
+ }
+
+ /**
+  * Sets the cancel flag of this context.
+  *
+  * @param cancel new flag value
+  */
+ public void setCancel(boolean cancel) {
+     this.cancel = cancel;
+ }
+
+ /**
+  * Tells whether the process was interrupted.
+  *
+  * @return {@code true} when either the cancel flag or the hand-over flag is set
+  */
+ public boolean isInterrupted(){
+     return cancel || handOver;
+ }
+
+ /**
+  * Returns the id of the currently executing task.
+  *
+  * @return the task id, or {@code null} when no current task model is set
+  */
+ public String getCurrentExecutingTaskId() {
+     return currentExecutingTaskModel == null ? null : currentExecutingTaskModel.getTaskId();
+ }
+
+ /**
+  * Returns the pause-task-execution flag of this context.
+  */
+ public boolean isPauseTaskExecution() {
+     return this.pauseTaskExecution;
+ }
+
+ /**
+  * Sets the pause-task-execution flag of this context.
+  *
+  * @param pauseTaskExecution new flag value
+  */
+ public void setPauseTaskExecution(boolean pauseTaskExecution) {
+     this.pauseTaskExecution = pauseTaskExecution;
+ }
+
+ /**
+  * Returns the complete flag of this context.
+  */
+ public boolean isComplete() {
+     return this.complete;
+ }
+
+ /**
+  * Sets the complete flag of this context.
+  *
+  * @param complete new flag value
+  */
+ public void setComplete(boolean complete) {
+     this.complete = complete;
+ }
+
+ /**
+  * Returns the recovery flag of this context.
+  */
+ public boolean isRecovery() {
+     return this.recovery;
+ }
+
+ /**
+  * Sets the recovery flag of this context.
+  *
+  * @param recovery new flag value
+  */
+ public void setRecovery(boolean recovery) {
+     this.recovery = recovery;
+ }
+
+ /**
+  * Returns the task model recorded as currently executing; may be {@code null} when none is set.
+  */
+ public TaskModel getCurrentExecutingTaskModel() {
+     return this.currentExecutingTaskModel;
+ }
+
+ /**
+  * Records the task model that is currently executing.
+  *
+  * @param currentExecutingTaskModel task model to record
+  */
+ public void setCurrentExecutingTaskModel(TaskModel currentExecutingTaskModel) {
+     this.currentExecutingTaskModel = currentExecutingTaskModel;
+ }
+
+ /**
+  * Returns the storage resource description stored on this context.
+  */
+ public StorageResourceDescription getStorageResource() {
+     return this.storageResource;
+ }
+
+ /**
+  * Stores the storage resource description.
+  *
+  * @param storageResource storage resource description to store
+  */
+ public void setStorageResource(StorageResourceDescription storageResource) {
+     this.storageResource = storageResource;
+ }
+
+ /**
+  * Sets the acknowledge flag of this context.
+  *
+  * @param acknowledge new flag value
+  */
+ public void setAcknowledge(boolean acknowledge) {
+     this.acknowledge = acknowledge;
+ }
+
+ /**
+  * Returns the acknowledge flag of this context.
+  */
+ public boolean isAcknowledge() {
+     return this.acknowledge;
+ }
+
+ /**
+  * Returns the recovery-with-cancel flag of this context.
+  */
+ public boolean isRecoveryWithCancel() {
+     return this.recoveryWithCancel;
+ }
+
+ /**
+  * Sets the recovery-with-cancel flag of this context.
+  *
+  * @param recoveryWithCancel new flag value
+  */
+ public void setRecoveryWithCancel(boolean recoveryWithCancel) {
+     this.recoveryWithCancel = recoveryWithCancel;
+ }
+
+ public boolean isUseUserCRPref() {
+ return getProcessModel().isUseUserCRPref();
+ }
+
+ public String getComputeResourceLoginUserName(){
+ if (isUseUserCRPref() &&
+ userComputeResourcePreference != null &&
+ isValid(userComputeResourcePreference.getLoginUserName())) {
+ return userComputeResourcePreference.getLoginUserName();
+ } else if (isValid(processModel.getProcessResourceSchedule().getOverrideLoginUserName())) {
+ return processModel.getProcessResourceSchedule().getOverrideLoginUserName();
+ } else {
+ return gatewayComputeResourcePreference.getLoginUserName();
+ }
+ }
+
+ /**
+  * Returns the login user name of the gateway storage preference.
+  */
+ public String getStorageResourceLoginUserName(){
+     return this.gatewayStorageResourcePreference.getLoginUserName();
+ }
+
+ /**
+  * Returns the file system root location of the gateway storage preference.
+  */
+ public String getStorageFileSystemRootLocation(){
+     return this.gatewayStorageResourcePreference.getFileSystemRootLocation();
+ }
+
+ /**
+  * Returns the storage resource id of the gateway storage preference.
+  */
+ public String getStorageResourceId() {
+     return this.gatewayStorageResourcePreference.getStorageResourceId();
+ }
+
+ /**
+  * Returns the resource schedule of the process model.
+  *
+  * @return the schedule, or {@code null} when no process model is available
+  */
+ private ComputationalResourceSchedulingModel getProcessCRSchedule() {
+     if (getProcessModel() == null) {
+         return null;
+     }
+     return getProcessModel().getProcessResourceSchedule();
+ }
+
+ public ServerInfo getComputeResourceServerInfo(){
+ return new ServerInfo(getComputeResourceLoginUserName(),
+ getComputeResourceDescription().getHostName(),
+ getComputeResourceCredentialToken());
+ }
+
+ public ServerInfo getStorageResourceServerInfo() {
+ return new ServerInfo(getStorageResourceLoginUserName(),
+ getStorageResource().getHostName(),
+ getStorageResourceCredentialToken());
+ }
+
+ /**
+  * Tells whether a string carries content.
+  *
+  * @param str value to inspect, may be {@code null}
+  * @return {@code true} when {@code str} is non-null and not empty after trimming
+  */
+ private boolean isValid(String str) {
+     if (str == null) {
+         return false;
+     }
+     return !str.trim().isEmpty();
+ }
+
+ /**
+  * Returns the usage reporting gateway id of the gateway compute resource preference.
+  */
+ public String getUsageReportingGatewayId() {
+     return this.gatewayComputeResourcePreference.getUsageReportingGatewayId();
+ }
+
+ /**
+  * Returns the allocation project number of the gateway compute resource preference.
+  */
+ public String getAllocationProjectNumber() {
+     return this.gatewayComputeResourcePreference.getAllocationProjectNumber();
+ }
+
+ /**
+  * Returns the reservation name if one is currently active.
+  *
+  * <p>The reservation and its time window come from the user preference when user preferences
+  * are enabled, a user preference exists and its reservation is non-blank; otherwise from the
+  * gateway preference. The reservation is reported only while the current time lies strictly
+  * between a positive start time and a later end time.
+  *
+  * @return the active reservation, or {@code null} when there is none
+  */
+ public String getReservation() {
+     String reservation;
+     long start;
+     long end;
+     boolean fromUserPreference = isUseUserCRPref()
+             && userComputeResourcePreference != null
+             && isValid(userComputeResourcePreference.getReservation());
+     if (fromUserPreference) {
+         reservation = userComputeResourcePreference.getReservation();
+         start = userComputeResourcePreference.getReservationStartTime();
+         end = userComputeResourcePreference.getReservationEndTime();
+     } else {
+         reservation = gatewayComputeResourcePreference.getReservation();
+         start = gatewayComputeResourcePreference.getReservationStartTime();
+         end = gatewayComputeResourcePreference.getReservationEndTime();
+     }
+     // Reject a missing reservation or a malformed window before looking at the clock.
+     if (reservation == null || start <= 0 || start >= end) {
+         return null;
+     }
+     long now = System.currentTimeMillis();
+     return (now > start && now < end) ? reservation : null;
+ }
+
+ public String getQualityOfService() {
+ if (isUseUserCRPref() &&
+ userComputeResourcePreference != null &&
+ isValid(userComputeResourcePreference.getQualityOfService())) {
+ return userComputeResourcePreference.getQualityOfService();
+ } else {
+ return gatewayComputeResourcePreference.getQualityOfService();
+ }
+ }
+
+
+ public String getQueueName() {
+ if (isUseUserCRPref() &&
+ userComputeResourcePreference != null &&
+ isValid(userComputeResourcePreference.getPreferredBatchQueue())) {
+ return userComputeResourcePreference.getPreferredBatchQueue();
+ } else if (isValid(processModel.getProcessResourceSchedule().getQueueName())) {
+ return processModel.getProcessResourceSchedule().getQueueName();
+ } else {
+ return gatewayComputeResourcePreference.getPreferredBatchQueue();
+ }
+ }
+
+ public static class ProcessContextBuilder{
+ private final String processId;
+ private final String gatewayId;
+ private final String tokenId;
+ private ExperimentCatalog experimentCatalog;
+ private AppCatalog appCatalog;
+ private CuratorFramework curatorClient;
+ private Publisher statusPublisher;
+ private GatewayResourceProfile gatewayResourceProfile;
+ private ComputeResourcePreference gatewayComputeResourcePreference;
+ private StoragePreference gatewayStorageResourcePreference;
+ private ProcessModel processModel;
+
+ public ProcessContextBuilder(String processId, String gatewayId, String tokenId) throws WorkerException {
+ if (notValid(processId) || notValid(gatewayId) || notValid(tokenId)) {
+ throwError("Process Id, Gateway Id and tokenId must be not null");
+ }
+ this.processId = processId;
+ this.gatewayId = gatewayId;
+ this.tokenId = tokenId;
+ }
+
+ public ProcessContextBuilder setGatewayResourceProfile(GatewayResourceProfile gatewayResourceProfile) {
+ this.gatewayResourceProfile = gatewayResourceProfile;
+ return this;
+ }
+
+ public ProcessContextBuilder setGatewayComputeResourcePreference(ComputeResourcePreference gatewayComputeResourcePreference) {
+ this.gatewayComputeResourcePreference = gatewayComputeResourcePreference;
+ return this;
+ }
+
+ public ProcessContextBuilder setGatewayStorageResourcePreference(StoragePreference gatewayStorageResourcePreference) {
+ this.gatewayStorageResourcePreference = gatewayStorageResourcePreference;
+ return this;
+ }
+
+ public ProcessContextBuilder setProcessModel(ProcessModel processModel) {
+ this.processModel = processModel;
+ return this;
+ }
+
+ public ProcessContextBuilder setExperimentCatalog(ExperimentCatalog experimentCatalog) {
+ this.experimentCatalog = experimentCatalog;
+ return this;
+ }
+
+ public ProcessContextBuilder setAppCatalog(AppCatalog appCatalog) {
+ this.appCatalog = appCatalog;
+ return this;
+ }
+
+ public ProcessContextBuilder setCuratorClient(CuratorFramework curatorClient) {
+ this.curatorClient = curatorClient;
+ return this;
+ }
+
+ public ProcessContextBuilder setStatusPublisher(Publisher statusPublisher) {
+ this.statusPublisher = statusPublisher;
+ return this;
+ }
+
+ public ProcessContext build() throws WorkerException {
+ if (notValid(gatewayResourceProfile)) {
+ throwError("Invalid GatewayResourceProfile");
+ }
+ if (notValid(gatewayComputeResourcePreference)) {
+ throwError("Invalid Gateway ComputeResourcePreference");
+ }
+ if (notValid(gatewayStorageResourcePreference)) {
+ throwError("Invalid Gateway StoragePreference");
+ }
+ if (notValid(processModel)) {
+ throwError("Invalid Process Model");
+ }
+ if (notValid(appCatalog)) {
+ throwError("Invalid AppCatalog");
+ }
+ if (notValid(experimentCatalog)) {
+ throwError("Invalid Experiment catalog");
+ }
+ if (notValid(curatorClient)) {
+ throwError("Invalid Curator Client");
+ }
+ if (notValid(statusPublisher)) {
+ throwError("Invalid Status Publisher");
+ }
+
+ ProcessContext pc = new ProcessContext(processId, gatewayId, tokenId);
+ pc.setAppCatalog(appCatalog);
+ pc.setExperimentCatalog(experimentCatalog);
+ pc.setCuratorClient(curatorClient);
+ pc.setStatusPublisher(statusPublisher);
+ pc.setProcessModel(processModel);
+ pc.setGatewayResourceProfile(gatewayResourceProfile);
+ pc.setGatewayComputeResourcePreference(gatewayComputeResourcePreference);
+ pc.setGatewayStorageResourcePreference(gatewayStorageResourcePreference);
+
+ return pc;
+ }
+
+ private boolean notValid(Object value) {
+ return value == null;
+ }
+
+ private void throwError(String msg) throws WorkerException {
+ throw new WorkerException(msg);
+ }
+
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/airavata/blob/9d68f0b9/modules/worker/worker-commons/src/main/java/org/apache/airavata/worker/commons/context/TaskContext.java
----------------------------------------------------------------------
diff --git a/modules/worker/worker-commons/src/main/java/org/apache/airavata/worker/commons/context/TaskContext.java b/modules/worker/worker-commons/src/main/java/org/apache/airavata/worker/commons/context/TaskContext.java
new file mode 100644
index 0000000..f94ebd5
--- /dev/null
+++ b/modules/worker/worker-commons/src/main/java/org/apache/airavata/worker/commons/context/TaskContext.java
@@ -0,0 +1,139 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.airavata.worker.commons.context;
+
+import org.apache.airavata.common.utils.ThriftUtils;
+import org.apache.airavata.model.application.io.InputDataObjectType;
+import org.apache.airavata.model.application.io.OutputDataObjectType;
+import org.apache.airavata.model.status.TaskState;
+import org.apache.airavata.model.status.TaskStatus;
+import org.apache.airavata.model.task.TaskModel;
+import org.apache.airavata.model.task.TaskTypes;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+ /**
+  * Per-task execution context: holds the task model, a reference to the owning
+  * {@link ProcessContext}, the process input/output attached to the task and a cancel flag.
+  */
+ public class TaskContext {
+     private static final Logger log = LoggerFactory.getLogger(TaskContext.class);
+
+     private TaskModel taskModel;
+     private ProcessContext parentProcessContext;
+     private InputDataObjectType processInput;
+     private OutputDataObjectType processOutput;
+     // Resolved on first call to getSubTaskModel() and reused afterwards.
+     private Object subTaskModel = null;
+     private boolean isCancel = false;
+
+     /** Returns the task model of this context. */
+     public TaskModel getTaskModel() {
+         return taskModel;
+     }
+
+     /** Stores the task model of this context. */
+     public void setTaskModel(TaskModel taskModel) {
+         this.taskModel = taskModel;
+     }
+
+     /** Returns the context of the process this task belongs to. */
+     public ProcessContext getParentProcessContext() {
+         return parentProcessContext;
+     }
+
+     /** Stores the context of the process this task belongs to. */
+     public void setParentProcessContext(ProcessContext parentProcessContext) {
+         this.parentProcessContext = parentProcessContext;
+     }
+
+     /** Returns the working directory of the parent process context. */
+     public String getWorkingDir() {
+         return getParentProcessContext().getWorkingDir();
+     }
+
+     /**
+      * Replaces the task status list with a single-element list holding {@code taskStatus} and
+      * logs the transition.
+      *
+      * <p>A {@code null} status is ignored, mirroring {@code ProcessContext.setProcessStatus}.
+      * A missing previous state, task type or new state is logged as {@code null} instead of
+      * failing with a {@link NullPointerException} before the status is stored.
+      *
+      * @param taskStatus new status, or {@code null} to leave the task model untouched
+      */
+     public void setTaskStatus(TaskStatus taskStatus) {
+         if (taskStatus == null) {
+             return;
+         }
+         log.info("expId: {}, processId: {}, taskId: {}, type: {} : Task status changed {} -> {}", parentProcessContext
+                 .getExperimentId(), parentProcessContext.getProcessId(), getTaskId(), nameOf(getTaskType()),
+                 nameOf(getTaskState()), nameOf(taskStatus.getState()));
+         List<TaskStatus> taskStatuses = new ArrayList<>();
+         taskStatuses.add(taskStatus);
+         taskModel.setTaskStatuses(taskStatuses);
+     }
+
+     /**
+      * Returns the first status entry of the task model.
+      *
+      * @return the first status, or {@code null} when the status list is missing or empty
+      */
+     public TaskStatus getTaskStatus() {
+         List<TaskStatus> statuses = taskModel.getTaskStatuses();
+         if (statuses == null || statuses.isEmpty()) {
+             return null;
+         }
+         return statuses.get(0);
+     }
+
+     /**
+      * Returns the state of the first status entry of the task model.
+      *
+      * @return the state, or {@code null} when the status list is missing or empty
+      */
+     public TaskState getTaskState() {
+         TaskStatus current = getTaskStatus();
+         return current == null ? null : current.getState();
+     }
+
+     /** Returns the type recorded on the task model. */
+     public TaskTypes getTaskType() {
+         return taskModel.getTaskType();
+     }
+
+     /** Returns the id recorded on the task model. */
+     public String getTaskId() {
+         return taskModel.getTaskId();
+     }
+
+     /** Returns the local working directory of the parent process context. */
+     public String getLocalWorkingDir() {
+         return getParentProcessContext().getLocalWorkingDir();
+     }
+
+     /** Returns the process input attached to this task. */
+     public InputDataObjectType getProcessInput() {
+         return processInput;
+     }
+
+     /** Attaches a process input to this task. */
+     public void setProcessInput(InputDataObjectType processInput) {
+         this.processInput = processInput;
+     }
+
+     /** Returns the process output attached to this task. */
+     public OutputDataObjectType getProcessOutput() {
+         return processOutput;
+     }
+
+     /** Attaches a process output to this task. */
+     public void setProcessOutput(OutputDataObjectType processOutput) {
+         this.processOutput = processOutput;
+     }
+
+     /** Returns the process id of the parent process context. */
+     public String getProcessId() {
+         return parentProcessContext.getProcessId();
+     }
+
+     /** Returns the experiment id of the parent process context. */
+     public String getExperimentId() {
+         return parentProcessContext.getExperimentId();
+     }
+
+     /**
+      * Returns the sub-task model of the task, resolving it through
+      * {@code ThriftUtils.getSubTaskModel} on first use and caching the result.
+      *
+      * @return the cached or newly resolved sub-task model
+      * @throws TException propagated from {@code ThriftUtils.getSubTaskModel}
+      */
+     public Object getSubTaskModel() throws TException {
+         if (subTaskModel == null) {
+             subTaskModel = ThriftUtils.getSubTaskModel(getTaskModel());
+         }
+         return subTaskModel;
+     }
+
+     /** Returns the cancel flag of this task. */
+     public boolean isCancel() {
+         return isCancel;
+     }
+
+     /** Sets the cancel flag of this task. */
+     public void setCancel(boolean cancel) {
+         isCancel = cancel;
+     }
+
+     /** Returns the constant name of {@code value}, or {@code null} when it is unset. */
+     private static String nameOf(Enum<?> value) {
+         return value == null ? null : value.name();
+     }
+ }
http://git-wip-us.apache.org/repos/asf/airavata/blob/9d68f0b9/modules/worker/worker-commons/src/main/java/org/apache/airavata/worker/commons/exceptions/SSHApiException.java
----------------------------------------------------------------------
diff --git a/modules/worker/worker-commons/src/main/java/org/apache/airavata/worker/commons/exceptions/SSHApiException.java b/modules/worker/worker-commons/src/main/java/org/apache/airavata/worker/commons/exceptions/SSHApiException.java
new file mode 100644
index 0000000..0dcdd0e
--- /dev/null
+++ b/modules/worker/worker-commons/src/main/java/org/apache/airavata/worker/commons/exceptions/SSHApiException.java
@@ -0,0 +1,34 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.airavata.worker.commons.exceptions;
+
+/**
+ * An exception class to wrap SSH command execution related errors.
+ */
+public class SSHApiException extends Exception {
+
+     // Pinned explicitly, like the other exception classes of this module, so the serialized
+     // form does not depend on a compiler-computed default.
+     private static final long serialVersionUID = 1L;
+
+     /**
+      * Creates an exception with a message and no cause.
+      *
+      * @param message description of the SSH failure
+      */
+     public SSHApiException(String message) {
+         super(message);
+     }
+
+     /**
+      * Creates an exception with a message and the underlying cause.
+      *
+      * @param message description of the SSH failure
+      * @param e underlying exception, kept as the cause
+      */
+     public SSHApiException(String message, Exception e) {
+         super(message, e);
+     }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/9d68f0b9/modules/worker/worker-commons/src/main/java/org/apache/airavata/worker/commons/exceptions/WorkerException.java
----------------------------------------------------------------------
diff --git a/modules/worker/worker-commons/src/main/java/org/apache/airavata/worker/commons/exceptions/WorkerException.java b/modules/worker/worker-commons/src/main/java/org/apache/airavata/worker/commons/exceptions/WorkerException.java
new file mode 100644
index 0000000..334ee0f
--- /dev/null
+++ b/modules/worker/worker-commons/src/main/java/org/apache/airavata/worker/commons/exceptions/WorkerException.java
@@ -0,0 +1,47 @@
+package org.apache.airavata.worker.commons.exceptions;
+
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class WorkerException extends Exception {
+ private static final Logger log = LoggerFactory.getLogger(WorkerException.class);
+
+ /**
+ *
+ */
+ private static final long serialVersionUID = 1L;
+
+ public WorkerException(String s) {
+ super(s);
+ }
+
+ public WorkerException(Exception e) {
+ super(e);
+ log.error(e.getMessage(),e);
+ }
+
+ public WorkerException(String s, Throwable throwable) {
+ super(s, throwable);
+ log.error(s,throwable);
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/9d68f0b9/modules/worker/worker-commons/src/main/java/org/apache/airavata/worker/commons/sample
----------------------------------------------------------------------
diff --git a/modules/worker/worker-commons/src/main/java/org/apache/airavata/worker/commons/sample b/modules/worker/worker-commons/src/main/java/org/apache/airavata/worker/commons/sample
new file mode 100644
index 0000000..e69de29
http://git-wip-us.apache.org/repos/asf/airavata/blob/9d68f0b9/modules/worker/worker-commons/src/main/java/org/apache/airavata/worker/commons/task/Task.java
----------------------------------------------------------------------
diff --git a/modules/worker/worker-commons/src/main/java/org/apache/airavata/worker/commons/task/Task.java b/modules/worker/worker-commons/src/main/java/org/apache/airavata/worker/commons/task/Task.java
new file mode 100644
index 0000000..b7a8b45
--- /dev/null
+++ b/modules/worker/worker-commons/src/main/java/org/apache/airavata/worker/commons/task/Task.java
@@ -0,0 +1,62 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.airavata.worker.commons.task;
+
+import org.apache.airavata.model.status.TaskStatus;
+import org.apache.airavata.model.task.TaskTypes;
+import org.apache.airavata.worker.commons.context.TaskContext;
+
+import java.util.Map;
+
+/**
+ * Contract every task implementation must fulfil.
+ */
+public interface Task {
+
+     /**
+      * Initializes the task; invoked after a new task instance has been created.
+      *
+      * @param propertyMap properties handed to the task
+      * @throws TaskException when initialization fails
+      */
+     void init(Map<String, String> propertyMap) throws TaskException;
+
+     /**
+      * Runs the task on the first pass of a task chain. It is called once per task chain and
+      * before any call to {@link #recover(TaskContext)}, which may be called more than once.
+      *
+      * @param taskContext context of the task being executed
+      * @return a completed task status on success, otherwise a failed task status
+      */
+     TaskStatus execute(TaskContext taskContext);
+
+     /**
+      * Runs the task on the recovery path. {@link #execute(TaskContext)} should have been
+      * invoked before; this method may be called zero or more times in a process chain.
+      *
+      * @param taskContext context of the task being recovered
+      * @return a completed task status on success, otherwise a failed task status
+      */
+     TaskStatus recover(TaskContext taskContext);
+
+     /**
+      * Identifies the behaviour of this task, e.g. DATA_STAGING or JOB_SUBMISSION.
+      *
+      * @return the type of this task object
+      */
+     TaskTypes getType();
+
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/9d68f0b9/modules/worker/worker-commons/src/main/java/org/apache/airavata/worker/commons/task/TaskException.java
----------------------------------------------------------------------
diff --git a/modules/worker/worker-commons/src/main/java/org/apache/airavata/worker/commons/task/TaskException.java b/modules/worker/worker-commons/src/main/java/org/apache/airavata/worker/commons/task/TaskException.java
new file mode 100644
index 0000000..d290881
--- /dev/null
+++ b/modules/worker/worker-commons/src/main/java/org/apache/airavata/worker/commons/task/TaskException.java
@@ -0,0 +1,44 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.airavata.worker.commons.task;
+
+ /**
+  * Checked exception for task failures; each constructor delegates to the matching
+  * {@link Exception} constructor.
+  */
+ public class TaskException extends Exception {
+     private static final long serialVersionUID = 8662332011259328779L;
+
+     /** Creates an exception without message or cause. */
+     public TaskException() {
+         super();
+     }
+
+     /**
+      * Creates an exception with a message.
+      *
+      * @param message failure description
+      */
+     public TaskException(String message) {
+         super(message);
+     }
+
+     /**
+      * Creates an exception with a message and a cause.
+      *
+      * @param message failure description
+      * @param cause underlying cause
+      */
+     public TaskException(String message, Throwable cause) {
+         super(message, cause);
+     }
+
+     /**
+      * Creates an exception that wraps a cause.
+      *
+      * @param cause underlying cause
+      */
+     public TaskException(Throwable cause) {
+         super(cause);
+     }
+
+     /**
+      * Creates an exception with explicit control over suppression and stack trace writing.
+      *
+      * @param message failure description
+      * @param cause underlying cause
+      * @param enableSuppression whether suppression is enabled
+      * @param writableStackTrace whether the stack trace is writable
+      */
+     protected TaskException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
+         super(message, cause, enableSuppression, writableStackTrace);
+     }
+ }