You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by ad...@apache.org on 2017/04/13 18:47:49 UTC
[1/6] airavata git commit: Adding common factory methods
Repository: airavata
Updated Branches:
refs/heads/feature-workload-mgmt a974f3fb2 -> ea865f237
Adding common factory methods
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/4248419d
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/4248419d
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/4248419d
Branch: refs/heads/feature-workload-mgmt
Commit: 4248419db5f02aa3ed9e8d8fc0fc1d0c85ce953b
Parents: a974f3f
Author: Ajinkya Dhamnaskar <ad...@apache.org>
Authored: Thu Apr 13 14:34:51 2017 -0400
Committer: Ajinkya Dhamnaskar <ad...@apache.org>
Committed: Thu Apr 13 14:34:51 2017 -0400
----------------------------------------------------------------------
.../worker/core/utils/WorkerFactory.java | 181 ++++++++++++++++++-
1 file changed, 180 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/4248419d/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/utils/WorkerFactory.java
----------------------------------------------------------------------
diff --git a/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/utils/WorkerFactory.java b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/utils/WorkerFactory.java
index 6dcd275..75a8062 100644
--- a/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/utils/WorkerFactory.java
+++ b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/utils/WorkerFactory.java
@@ -23,9 +23,17 @@ package org.apache.airavata.worker.core.utils;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalListener;
-import com.jcraft.jsch.Session;
+import com.jcraft.jsch.*;
+import org.apache.airavata.common.exception.ApplicationSettingsException;
import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.airavata.credential.store.credential.Credential;
+import org.apache.airavata.credential.store.store.CredentialReader;
+import org.apache.airavata.credential.store.store.CredentialStoreException;
import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManagerType;
+import org.apache.airavata.credential.store.credential.impl.ssh.SSHCredential;
+import org.apache.airavata.worker.core.authentication.AuthenticationInfo;
+import org.apache.airavata.worker.core.authentication.SSHKeyAuthentication;
+import org.apache.airavata.worker.core.cluster.ServerInfo;
import org.apache.airavata.worker.core.config.ResourceConfig;
import org.apache.airavata.worker.core.config.WorkerYamlConfigruation;
import org.apache.airavata.worker.core.exceptions.WorkerException;
@@ -34,6 +42,7 @@ import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.Map;
+import java.util.UUID;
import java.util.concurrent.TimeUnit;
/**
@@ -79,4 +88,174 @@ public class WorkerFactory {
public static ResourceConfig getResourceConfig(ResourceJobManagerType resourceJobManagerType) {
return resources.get(resourceJobManagerType);
}
+
+ public static SSHKeyAuthentication getSshKeyAuthentication(String gatewayId,
+ String loginUserName,
+ String credentialStoreToken)
+ throws ApplicationSettingsException, IllegalAccessException, InstantiationException,
+ CredentialStoreException, WorkerException {
+
+ SSHKeyAuthentication sshKA;CredentialReader credentialReader = WorkerUtils.getCredentialReader();
+ Credential credential = credentialReader.getCredential(gatewayId, credentialStoreToken);
+ if (credential instanceof SSHCredential) {
+ sshKA = new SSHKeyAuthentication();
+ sshKA.setUserName(loginUserName);
+ SSHCredential sshCredential = (SSHCredential) credential;
+ sshKA.setPublicKey(sshCredential.getPublicKey());
+ sshKA.setPrivateKey(sshCredential.getPrivateKey());
+ sshKA.setPassphrase(sshCredential.getPassphrase());
+ sshKA.setStrictHostKeyChecking("no");
+/* sshKA.setStrictHostKeyChecking(ServerSettings.getSetting("ssh.strict.hostKey.checking", "no"));
+ sshKA.setKnownHostsFilePath(ServerSettings.getSetting("ssh.known.hosts.file", null));
+ if (sshKA.getStrictHostKeyChecking().equals("yes") && sshKA.getKnownHostsFilePath() == null) {
+ throw new ApplicationSettingsException("If ssh strict hostkey checking property is set to yes, you must " +
+ "provide known host file path");
+ }*/
+ return sshKA;
+ } else {
+ String msg = "Provided credential store token is not valid. Please provide the correct credential store token";
+ log.error(msg);
+ throw new CredentialStoreException("Invalid credential store token:" + credentialStoreToken);
+ }
+ }
+
+ public static synchronized Session getSSHSession(AuthenticationInfo authenticationInfo,
+ ServerInfo serverInfo) throws WorkerException {
+ if (authenticationInfo == null
+ || serverInfo == null) {
+
+ throw new IllegalArgumentException("Can't create ssh session, argument should be valid (not null)");
+ }
+ SSHKeyAuthentication authentication;
+ if (authenticationInfo instanceof SSHKeyAuthentication) {
+ authentication = (SSHKeyAuthentication) authenticationInfo;
+ } else {
+ throw new WorkerException("Support ssh key authentication only");
+ }
+ String key = buildKey(serverInfo);
+ Session session = sessionCache.getIfPresent(key);
+ boolean valid = isValidSession(session);
+ // FIXME - move following info logs to debug
+ if (valid) {
+ log.info("SSH Session validation succeeded, key :" + key);
+ valid = testChannelCreation(session);
+ if (valid) {
+ log.info("Channel creation test succeeded, key :" + key);
+ } else {
+ log.info("Channel creation test failed, key :" + key);
+ }
+ } else {
+ log.info("Session validation failed, key :" + key);
+ }
+
+ if (!valid) {
+ if (session != null) {
+ log.info("Reinitialize a new SSH session for :" + key);
+ } else {
+ log.info("Initialize a new SSH session for :" + key);
+ }
+ try {
+
+ JSch jSch = new JSch();
+ jSch.addIdentity(UUID.randomUUID().toString(), authentication.getPrivateKey(), authentication.getPublicKey(),
+ authentication.getPassphrase().getBytes());
+ session = jSch.getSession(serverInfo.getUserName(), serverInfo.getHost(),
+ serverInfo.getPort());
+ session.setUserInfo(new DefaultUserInfo(serverInfo.getUserName(), null, authentication.getPassphrase()));
+ if (authentication.getStrictHostKeyChecking().equals("yes")) {
+ jSch.setKnownHosts(authentication.getKnownHostsFilePath());
+ } else {
+ session.setConfig("StrictHostKeyChecking", "no");
+ }
+ session.connect(); // 0 connection timeout
+ sessionCache.put(key, session);
+ } catch (JSchException e) {
+ throw new WorkerException("JSch initialization error ", e);
+ }
+ } else {
+ // FIXME - move following info log to debug
+ log.info("Reuse SSH session for :" + key);
+ }
+ return session;
+
+ }
+
+ private static boolean testChannelCreation(Session session) {
+
+ String command = "pwd ";
+ Channel channel = null;
+ try {
+ channel = session.openChannel("exec");
+ StandardOutReader stdOutReader = new StandardOutReader();
+ ((ChannelExec) channel).setCommand(command);
+ ((ChannelExec) channel).setErrStream(stdOutReader.getStandardError());
+ channel.connect();
+ stdOutReader.onOutput(channel);
+ } catch (JSchException e) {
+ log.error("Test Channel creation failed.", e);
+ return false;
+ } finally {
+ if (channel != null) {
+ channel.disconnect();
+ }
+ }
+ return true;
+ }
+
+ private static boolean isValidSession(Session session) {
+ return session != null && session.isConnected();
+ }
+
+ private static String buildKey(ServerInfo serverInfo) {
+ return serverInfo.getUserName() +
+ "_" +
+ serverInfo.getHost() +
+ "_" +
+ serverInfo.getPort() +
+ "_" +
+ serverInfo.getCredentialToken();
+ }
+
+ private static class DefaultUserInfo implements UserInfo {
+
+ private String userName;
+ private String password;
+ private String passphrase;
+
+ public DefaultUserInfo(String userName, String password, String passphrase) {
+ this.userName = userName;
+ this.password = password;
+ this.passphrase = passphrase;
+ }
+
+ @Override
+ public String getPassphrase() {
+ return null;
+ }
+
+ @Override
+ public String getPassword() {
+ return null;
+ }
+
+ @Override
+ public boolean promptPassword(String s) {
+ return false;
+ }
+
+ @Override
+ public boolean promptPassphrase(String s) {
+ return false;
+ }
+
+ @Override
+ public boolean promptYesNo(String s) {
+ return false;
+ }
+
+ @Override
+ public void showMessage(String s) {
+
+ }
+ }
}
[3/6] airavata git commit: Adding StandardOutReader to worker core
Posted by ad...@apache.org.
Adding StandardOutReader to worker core
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/99fb72d1
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/99fb72d1
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/99fb72d1
Branch: refs/heads/feature-workload-mgmt
Commit: 99fb72d1fd0ae376d531a3e46b700ec9973870fd
Parents: e1ff08b
Author: Ajinkya Dhamnaskar <ad...@apache.org>
Authored: Thu Apr 13 14:36:11 2017 -0400
Committer: Ajinkya Dhamnaskar <ad...@apache.org>
Committed: Thu Apr 13 14:36:11 2017 -0400
----------------------------------------------------------------------
.../worker/core/utils/StandardOutReader.java | 86 ++++++++++++++++++++
1 file changed, 86 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/99fb72d1/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/utils/StandardOutReader.java
----------------------------------------------------------------------
diff --git a/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/utils/StandardOutReader.java b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/utils/StandardOutReader.java
new file mode 100644
index 0000000..d5cb2e8
--- /dev/null
+++ b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/utils/StandardOutReader.java
@@ -0,0 +1,86 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.worker.core.utils;
+
+import com.jcraft.jsch.Channel;
+import org.apache.airavata.worker.core.cluster.CommandOutput;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+public class StandardOutReader implements CommandOutput {
+
+ private static final Logger logger = LoggerFactory.getLogger(StandardOutReader.class);
+ String stdOutputString = null;
+ ByteArrayOutputStream errorStream = new ByteArrayOutputStream();
+ private int exitCode;
+
+ public void onOutput(Channel channel) {
+ try {
+ StringBuffer pbsOutput = new StringBuffer("");
+ InputStream inputStream = channel.getInputStream();
+ byte[] tmp = new byte[1024];
+ do {
+ while (inputStream.available() > 0) {
+ int i = inputStream.read(tmp, 0, 1024);
+ if (i < 0) break;
+ pbsOutput.append(new String(tmp, 0, i));
+ }
+ } while (!channel.isClosed()) ;
+ String output = pbsOutput.toString();
+ this.setStdOutputString(output);
+ } catch (IOException e) {
+ logger.error(e.getMessage(), e);
+ }
+
+ }
+
+
+ public void exitCode(int code) {
+ System.out.println("Program exit code - " + code);
+ this.exitCode = code;
+ }
+
+ @Override
+ public int getExitCode() {
+ return exitCode;
+ }
+
+ public String getStdOutputString() {
+ return stdOutputString;
+ }
+
+ public void setStdOutputString(String stdOutputString) {
+ this.stdOutputString = stdOutputString;
+ }
+
+ public String getStdErrorString() {
+ return errorStream.toString();
+ }
+
+ public OutputStream getStandardError() {
+ return errorStream;
+ }
+}
[6/6] airavata git commit: Adding Data staging task implementation
and related utils
Posted by ad...@apache.org.
Adding Data staging task implementation and related utils
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/ea865f23
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/ea865f23
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/ea865f23
Branch: refs/heads/feature-workload-mgmt
Commit: ea865f23709778addf03480b10c5a9e80b2294f3
Parents: fbc0351
Author: Ajinkya Dhamnaskar <ad...@apache.org>
Authored: Thu Apr 13 14:38:31 2017 -0400
Committer: Ajinkya Dhamnaskar <ad...@apache.org>
Committed: Thu Apr 13 14:38:31 2017 -0400
----------------------------------------------------------------------
modules/worker/task-datastaging/pom.xml | 72 +++++
.../task/datastaging/handler/DataStageTask.java | 125 ++++++++
.../datastaging/handler/SCPDataStageTask.java | 292 +++++++++++++++++++
.../datastaging/utils/DataStagingFactory.java | 30 ++
.../datastaging/utils/DataStagingUtils.java | 107 +++++++
5 files changed, 626 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/ea865f23/modules/worker/task-datastaging/pom.xml
----------------------------------------------------------------------
diff --git a/modules/worker/task-datastaging/pom.xml b/modules/worker/task-datastaging/pom.xml
new file mode 100644
index 0000000..c046f45
--- /dev/null
+++ b/modules/worker/task-datastaging/pom.xml
@@ -0,0 +1,72 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <groupId>org.apache.airavata</groupId>
+ <artifactId>airavata-worker</artifactId>
+ <version>0.17-SNAPSHOT</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+
+ <modelVersion>4.0.0</modelVersion>
+ <artifactId>airavata-data-staging</artifactId>
+ <name>Airavata Task - Data Staging</name>
+ <description>I/O Data staging task implementation.</description>
+ <url>http://airavata.apache.org/</url>
+
+ <properties>
+ <jcraft.version>0.1.53</jcraft.version>
+ <net.schmizz.version>0.6.1</net.schmizz.version>
+ </properties>
+
+ <dependencies>
+ <!-- Logging -->
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.jcraft</groupId>
+ <artifactId>jsch</artifactId>
+ <version>${jcraft.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>net.schmizz</groupId>
+ <artifactId>sshj</artifactId>
+ <version>${net.schmizz.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.airavata</groupId>
+ <artifactId>airavata-data-models</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.airavata</groupId>
+ <artifactId>airavata-worker-core</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>ch.ethz.ganymed</groupId>
+ <artifactId>ganymed-ssh2</artifactId>
+ <version>262</version>
+ </dependency>
+ <dependency>
+ <groupId>org.codehaus.groovy</groupId>
+ <artifactId>groovy</artifactId>
+ <version>${groovy.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.codehaus.groovy</groupId>
+ <artifactId>groovy-templates</artifactId>
+ <version>${groovy.version}</version>
+ </dependency>
+ <!-- Credential Store -->
+ <dependency>
+ <groupId>org.apache.airavata</groupId>
+ <artifactId>airavata-credential-store</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ </dependencies>
+
+</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/airavata/blob/ea865f23/modules/worker/task-datastaging/src/main/java/org/apache/airavata/worker/task/datastaging/handler/DataStageTask.java
----------------------------------------------------------------------
diff --git a/modules/worker/task-datastaging/src/main/java/org/apache/airavata/worker/task/datastaging/handler/DataStageTask.java b/modules/worker/task-datastaging/src/main/java/org/apache/airavata/worker/task/datastaging/handler/DataStageTask.java
new file mode 100644
index 0000000..aab6a735
--- /dev/null
+++ b/modules/worker/task-datastaging/src/main/java/org/apache/airavata/worker/task/datastaging/handler/DataStageTask.java
@@ -0,0 +1,125 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.airavata.worker.task.datastaging.handler;
+
+import org.apache.airavata.model.commons.ErrorModel;
+import org.apache.airavata.model.status.ProcessState;
+import org.apache.airavata.model.status.TaskState;
+import org.apache.airavata.model.status.TaskStatus;
+import org.apache.airavata.model.task.DataStagingTaskModel;
+import org.apache.airavata.model.task.TaskTypes;
+import org.apache.airavata.worker.core.context.TaskContext;
+import org.apache.airavata.worker.core.exceptions.WorkerException;
+import org.apache.airavata.worker.core.task.Task;
+import org.apache.airavata.worker.core.task.TaskException;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Arrays;
+import java.util.Map;
+
+public class DataStageTask implements Task {
+ private static final Logger log = LoggerFactory.getLogger(DataStageTask.class);
+
+ @Override
+ public void init(Map<String, String> propertyMap) throws TaskException {
+
+ }
+
+ @Override
+ public TaskStatus execute(TaskContext taskContext) {
+ TaskStatus status = new TaskStatus(TaskState.COMPLETED);
+ if (taskContext.getTaskModel().getTaskType() != TaskTypes.DATA_STAGING) {
+ status.setState(TaskState.FAILED);
+ status.setReason("Invalid task call, expected " + TaskTypes.DATA_STAGING.toString() + " but found "
+ + taskContext.getTaskModel().getTaskType().toString());
+ } else {
+ try {
+ DataStagingTaskModel subTaskModel = ((DataStagingTaskModel) taskContext.getSubTaskModel());
+ URI sourceURI = new URI(subTaskModel.getSource());
+ URI destinationURI = new URI(subTaskModel.getDestination());
+
+ ProcessState processState = taskContext.getParentProcessContext().getProcessState();
+ if (processState == ProcessState.INPUT_DATA_STAGING) {
+ /**
+ * copy local file to compute resource.
+ */
+ taskContext.getParentProcessContext().getDataMovementRemoteCluster().copyTo(sourceURI.getPath(), destinationURI
+ .getPath());
+ } else if (processState == ProcessState.OUTPUT_DATA_STAGING) {
+ /**
+ * copy remote file from compute resource.
+ */
+ taskContext.getParentProcessContext().getDataMovementRemoteCluster().copyFrom(sourceURI.getPath(), destinationURI
+ .getPath());
+ }
+ status.setReason("Successfully staged data");
+ } catch (WorkerException e) {
+ String msg = "Scp attempt failed";
+ log.error(msg, e);
+ status.setState(TaskState.FAILED);
+ status.setReason(msg);
+ ErrorModel errorModel = new ErrorModel();
+ errorModel.setActualErrorMessage(e.getMessage());
+ errorModel.setUserFriendlyMessage(msg);
+ taskContext.getTaskModel().setTaskErrors(Arrays.asList(errorModel));
+ } catch (TException e) {
+ String msg = "Invalid task invocation";
+ log.error(msg, e);
+ status.setState(TaskState.FAILED);
+ status.setReason(msg);
+ ErrorModel errorModel = new ErrorModel();
+ errorModel.setActualErrorMessage(e.getMessage());
+ errorModel.setUserFriendlyMessage(msg);
+ taskContext.getTaskModel().setTaskErrors(Arrays.asList(errorModel));
+ } catch (URISyntaxException e) {
+ String msg = "source or destination is not a valid URI";
+ log.error(msg, e);
+ status.setState(TaskState.FAILED);
+ status.setReason(msg);
+ ErrorModel errorModel = new ErrorModel();
+ errorModel.setActualErrorMessage(e.getMessage());
+ errorModel.setUserFriendlyMessage(msg);
+ taskContext.getTaskModel().setTaskErrors(Arrays.asList(errorModel));
+ }
+ }
+ return status;
+ }
+
+ @Override
+ public TaskStatus recover(TaskContext taskContext) {
+ TaskState state = taskContext.getTaskStatus().getState();
+ if (state == TaskState.EXECUTING || state == TaskState.CREATED) {
+ return execute(taskContext);
+ } else {
+ // files already transferred or failed
+ return taskContext.getTaskStatus();
+ }
+ }
+
+ @Override
+ public TaskTypes getType() {
+ return TaskTypes.DATA_STAGING;
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/ea865f23/modules/worker/task-datastaging/src/main/java/org/apache/airavata/worker/task/datastaging/handler/SCPDataStageTask.java
----------------------------------------------------------------------
diff --git a/modules/worker/task-datastaging/src/main/java/org/apache/airavata/worker/task/datastaging/handler/SCPDataStageTask.java b/modules/worker/task-datastaging/src/main/java/org/apache/airavata/worker/task/datastaging/handler/SCPDataStageTask.java
new file mode 100644
index 0000000..bd13b1e
--- /dev/null
+++ b/modules/worker/task-datastaging/src/main/java/org/apache/airavata/worker/task/datastaging/handler/SCPDataStageTask.java
@@ -0,0 +1,292 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.airavata.worker.task.datastaging.handler;
+
+import com.jcraft.jsch.JSchException;
+import com.jcraft.jsch.Session;
+import org.apache.airavata.common.exception.AiravataException;
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.credential.store.store.CredentialStoreException;
+import org.apache.airavata.model.appcatalog.storageresource.StorageResourceDescription;
+import org.apache.airavata.model.application.io.InputDataObjectType;
+import org.apache.airavata.model.application.io.OutputDataObjectType;
+import org.apache.airavata.model.commons.ErrorModel;
+import org.apache.airavata.model.status.ProcessState;
+import org.apache.airavata.model.status.TaskState;
+import org.apache.airavata.model.status.TaskStatus;
+import org.apache.airavata.model.task.DataStagingTaskModel;
+import org.apache.airavata.model.task.TaskTypes;
+import org.apache.airavata.worker.core.authentication.AuthenticationInfo;
+import org.apache.airavata.worker.core.cluster.CommandInfo;
+import org.apache.airavata.worker.core.cluster.RawCommandInfo;
+import org.apache.airavata.worker.core.cluster.RemoteCluster;
+import org.apache.airavata.worker.core.cluster.ServerInfo;
+import org.apache.airavata.worker.core.context.ProcessContext;
+import org.apache.airavata.worker.core.context.TaskContext;
+import org.apache.airavata.worker.core.exceptions.WorkerException;
+import org.apache.airavata.worker.core.task.Task;
+import org.apache.airavata.worker.core.task.TaskException;
+import org.apache.airavata.worker.core.utils.WorkerFactory;
+import org.apache.airavata.worker.task.datastaging.utils.DataStagingFactory;
+import org.apache.airavata.worker.task.datastaging.utils.DataStagingUtils;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Arrays;
+import java.util.Map;
+
+/**
+ * This class is used for both input file staging and output file staging; if you change any part of the logic
+ * in this class, please make sure it still works for both the input and output cases.
+ */
+public class SCPDataStageTask implements Task {
+ private static final Logger log = LoggerFactory.getLogger(SCPDataStageTask.class);
+ private static final int DEFAULT_SSH_PORT = 22;
+ private String hostName;
+ private String inputPath;
+
    @Override
    public void init(Map<String, String> propertyMap) throws TaskException {
        // Intentionally a no-op: SCPDataStageTask needs no init-time configuration.
    }
+
+ @Override
+ public TaskStatus execute(TaskContext taskContext) {
+ TaskStatus status = new TaskStatus(TaskState.EXECUTING);
+ AuthenticationInfo authenticationInfo = null;
+ DataStagingTaskModel subTaskModel = null;
+ String localDataDir = null;
+
+ ProcessContext processContext = taskContext.getParentProcessContext();
+ ProcessState processState = processContext.getProcessState();
+ try {
+ subTaskModel = ((DataStagingTaskModel) taskContext.getSubTaskModel());
+ if (processState == ProcessState.OUTPUT_DATA_STAGING) {
+ OutputDataObjectType processOutput = taskContext.getProcessOutput();
+ if (processOutput != null && processOutput.getValue() == null) {
+ log.error("expId: {}, processId:{}, taskId: {}:- Couldn't stage file {} , file name shouldn't be null",
+ taskContext.getExperimentId(), taskContext.getProcessId(), taskContext.getTaskId(),
+ processOutput.getName());
+ status = new TaskStatus(TaskState.FAILED);
+ if (processOutput.isIsRequired()) {
+ status.setReason("File name is null, but this output's isRequired bit is not set");
+ } else {
+ status.setReason("File name is null");
+ }
+ return status;
+ }
+ } else if (processState == ProcessState.INPUT_DATA_STAGING) {
+ InputDataObjectType processInput = taskContext.getProcessInput();
+ if (processInput != null && processInput.getValue() == null) {
+ log.error("expId: {}, processId:{}, taskId: {}:- Couldn't stage file {} , file name shouldn't be null",
+ taskContext.getExperimentId(), taskContext.getProcessId(), taskContext.getTaskId(),
+ processInput.getName());
+ status = new TaskStatus(TaskState.FAILED);
+ if (processInput.isIsRequired()) {
+ status.setReason("File name is null, but this input's isRequired bit is not set");
+ } else {
+ status.setReason("File name is null");
+ }
+ return status;
+ }
+ } else {
+ status.setState(TaskState.FAILED);
+ status.setReason("Invalid task invocation, Support " + ProcessState.INPUT_DATA_STAGING.name() + " and " +
+ "" + ProcessState.OUTPUT_DATA_STAGING.name() + " process phases. found " + processState.name());
+ return status;
+ }
+
+ StorageResourceDescription storageResource = processContext.getStorageResource();
+// StoragePreference storagePreference = taskContext.getParentProcessContext().getStoragePreference();
+
+ if (storageResource != null) {
+ hostName = storageResource.getHostName();
+ } else {
+ throw new WorkerException("Storage Resource is null");
+ }
+ inputPath = processContext.getStorageFileSystemRootLocation();
+ inputPath = (inputPath.endsWith(File.separator) ? inputPath : inputPath + File.separator);
+
+ // use rsync instead of scp if source and destination host and user name is same.
+ URI sourceURI = new URI(subTaskModel.getSource());
+ String fileName = sourceURI.getPath().substring(sourceURI.getPath().lastIndexOf(File.separator) + 1,
+ sourceURI.getPath().length());
+ URI destinationURI = null;
+ if (subTaskModel.getDestination().startsWith("dummy")) {
+ destinationURI = DataStagingUtils.getDestinationURI(taskContext, hostName, inputPath, fileName);
+ subTaskModel.setDestination(destinationURI.toString());
+ } else {
+ destinationURI = new URI(subTaskModel.getDestination());
+ }
+
+ if (sourceURI.getHost().equalsIgnoreCase(destinationURI.getHost())
+ && sourceURI.getUserInfo().equalsIgnoreCase(destinationURI.getUserInfo())) {
+ localDataCopy(taskContext, sourceURI, destinationURI);
+ status.setState(TaskState.COMPLETED);
+ status.setReason("Locally copied file using 'cp' command ");
+ return status;
+ }
+
+ authenticationInfo = DataStagingFactory.getComputerResourceSSHKeyAuthentication(processContext);
+ status = new TaskStatus(TaskState.COMPLETED);
+
+ ServerInfo serverInfo = processContext.getComputeResourceServerInfo();
+ Session sshSession = WorkerFactory.getSSHSession(authenticationInfo, serverInfo);
+ if (processState == ProcessState.INPUT_DATA_STAGING) {
+ inputDataStaging(taskContext, sshSession, sourceURI, destinationURI);
+ status.setReason("Successfully staged input data");
+ } else if (processState == ProcessState.OUTPUT_DATA_STAGING) {
+ makeDir(taskContext, destinationURI);
+ // TODO - save updated subtask model with new destination
+ outputDataStaging(taskContext, sshSession, sourceURI, destinationURI);
+ status.setReason("Successfully staged output data");
+ }
+ } catch (TException e) {
+ String msg = "Couldn't create subTask model thrift model";
+ log.error(msg, e);
+ status.setState(TaskState.FAILED);
+ status.setReason(msg);
+ ErrorModel errorModel = new ErrorModel();
+ errorModel.setActualErrorMessage(e.getMessage());
+ errorModel.setUserFriendlyMessage(msg);
+ taskContext.getTaskModel().setTaskErrors(Arrays.asList(errorModel));
+ return status;
+ } catch (ApplicationSettingsException | FileNotFoundException e) {
+ String msg = "Failed while reading credentials";
+ log.error(msg, e);
+ status.setState(TaskState.FAILED);
+ status.setReason(msg);
+ ErrorModel errorModel = new ErrorModel();
+ errorModel.setActualErrorMessage(e.getMessage());
+ errorModel.setUserFriendlyMessage(msg);
+ taskContext.getTaskModel().setTaskErrors(Arrays.asList(errorModel));
+ } catch (URISyntaxException e) {
+ String msg = "Source or destination uri is not correct source : " + subTaskModel.getSource() + ", " +
+ "destination : " + subTaskModel.getDestination();
+ log.error(msg, e);
+ status.setState(TaskState.FAILED);
+ status.setReason(msg);
+ ErrorModel errorModel = new ErrorModel();
+ errorModel.setActualErrorMessage(e.getMessage());
+ errorModel.setUserFriendlyMessage(msg);
+ taskContext.getTaskModel().setTaskErrors(Arrays.asList(errorModel));
+ } catch (CredentialStoreException e) {
+ String msg = "Storage authentication issue, could be invalid credential token";
+ log.error(msg, e);
+ status.setState(TaskState.FAILED);
+ status.setReason(msg);
+ ErrorModel errorModel = new ErrorModel();
+ errorModel.setActualErrorMessage(e.getMessage());
+ errorModel.setUserFriendlyMessage(msg);
+ taskContext.getTaskModel().setTaskErrors(Arrays.asList(errorModel));
+ } catch (AiravataException e) {
+ String msg = "Error while creating ssh session with client";
+ log.error(msg, e);
+ status.setState(TaskState.FAILED);
+ status.setReason(msg);
+ ErrorModel errorModel = new ErrorModel();
+ errorModel.setActualErrorMessage(e.getMessage());
+ errorModel.setUserFriendlyMessage(msg);
+ taskContext.getTaskModel().setTaskErrors(Arrays.asList(errorModel));
+ } catch (JSchException | IOException e) {
+ String msg = "Failed to do scp with client";
+ log.error(msg, e);
+ status.setState(TaskState.FAILED);
+ status.setReason(msg);
+ ErrorModel errorModel = new ErrorModel();
+ errorModel.setActualErrorMessage(e.getMessage());
+ errorModel.setUserFriendlyMessage(msg);
+ taskContext.getTaskModel().setTaskErrors(Arrays.asList(errorModel));
+ } catch (WorkerException e) {
+ String msg = "Data staging failed";
+ log.error(msg, e);
+ status.setState(TaskState.FAILED);
+ status.setReason(msg);
+ ErrorModel errorModel = new ErrorModel();
+ errorModel.setActualErrorMessage(e.getMessage());
+ errorModel.setUserFriendlyMessage(msg);
+ taskContext.getTaskModel().setTaskErrors(Arrays.asList(errorModel));
+ }
+ return status;
+ }
+
+ /**
+ * Copies data between two local paths on the data-movement cluster using
+ * rsync (-c: checksum comparison, -r: recursive).
+ */
+ private void localDataCopy(TaskContext taskContext, URI sourceURI, URI destinationURI) throws WorkerException {
+ String rsyncCommand = "rsync -cr " + sourceURI.getPath() + " " + destinationURI.getPath();
+ CommandInfo commandInfo = new RawCommandInfo(rsyncCommand);
+ taskContext.getParentProcessContext().getDataMovementRemoteCluster().execute(commandInfo);
+ }
+
+ /**
+ * Stages an input file onto the compute resource with a third-party scp
+ * transfer over the supplied ssh session.
+ *
+ * NOTE(review): the transfer is issued with RemoteCluster.DIRECTION.FROM even
+ * though the intent (per the comment below) is a copy *to* the compute
+ * resource — confirm the DIRECTION semantics of scpThirdParty match this usage.
+ */
+ private void inputDataStaging(TaskContext taskContext, Session sshSession, URI sourceURI, URI
+ destinationURI) throws WorkerException, IOException, JSchException {
+ /**
+ * scp third party file transfer 'to' compute resource.
+ */
+ taskContext.getParentProcessContext().getDataMovementRemoteCluster().scpThirdParty(sourceURI.getPath(),
+ destinationURI.getPath(), sshSession, RemoteCluster.DIRECTION.FROM, false);
+ }
+
+ /**
+ * Stages an output file off the compute resource with a third-party scp
+ * transfer, then records the staged location against both the experiment
+ * outputs and the process outputs in the catalog.
+ *
+ * NOTE(review): the transfer is issued with RemoteCluster.DIRECTION.TO even
+ * though the intent (per the comment below) is a copy *from* the compute
+ * resource — confirm the DIRECTION semantics of scpThirdParty match this usage.
+ */
+ private void outputDataStaging(TaskContext taskContext, Session sshSession, URI sourceURI, URI destinationURI)
+ throws AiravataException, IOException, JSchException, WorkerException {
+
+ /**
+ * scp third party file transfer 'from' compute resource.
+ */
+ taskContext.getParentProcessContext().getDataMovementRemoteCluster().scpThirdParty(sourceURI.getPath(),
+ destinationURI.getPath(), sshSession, RemoteCluster.DIRECTION.TO, true);
+ // Persist the new output location in both experiment and process models.
+ DataStagingUtils.saveExperimentOutput(taskContext.getParentProcessContext(), taskContext.getProcessOutput().getName(), destinationURI.toString());
+ DataStagingUtils.saveProcessOutput(taskContext.getParentProcessContext(), taskContext.getProcessOutput().getName(), destinationURI.toString());
+
+ }
+
+ /**
+ * Creates the parent directory of the given path on the data-movement
+ * cluster. No-op when the path has no parent component beyond the root.
+ */
+ private void makeDir(TaskContext taskContext, URI pathURI) throws WorkerException {
+ String path = pathURI.getPath();
+ int lastSlash = path.lastIndexOf('/');
+ if (lastSlash >= 1) {
+ String parentDir = path.substring(0, lastSlash);
+ taskContext.getParentProcessContext().getDataMovementRemoteCluster().makeDirectory(parentDir);
+ }
+ }
+
+ @Override
+ public TaskStatus recover(TaskContext taskContext) {
+ TaskState state = taskContext.getTaskStatus().getState();
+ if (state == TaskState.EXECUTING || state == TaskState.CREATED) {
+ return execute(taskContext);
+ } else {
+ // files already transferred or failed
+ return taskContext.getTaskStatus();
+ }
+ }
+
+ /** Identifies this handler as the data-staging task implementation. */
+ @Override
+ public TaskTypes getType() {
+ return TaskTypes.DATA_STAGING;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/ea865f23/modules/worker/task-datastaging/src/main/java/org/apache/airavata/worker/task/datastaging/utils/DataStagingFactory.java
----------------------------------------------------------------------
diff --git a/modules/worker/task-datastaging/src/main/java/org/apache/airavata/worker/task/datastaging/utils/DataStagingFactory.java b/modules/worker/task-datastaging/src/main/java/org/apache/airavata/worker/task/datastaging/utils/DataStagingFactory.java
new file mode 100644
index 0000000..51996b8
--- /dev/null
+++ b/modules/worker/task-datastaging/src/main/java/org/apache/airavata/worker/task/datastaging/utils/DataStagingFactory.java
@@ -0,0 +1,30 @@
+package org.apache.airavata.worker.task.datastaging.utils;
+
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.credential.store.store.CredentialStoreException;
+import org.apache.airavata.worker.core.authentication.SSHKeyAuthentication;
+import org.apache.airavata.worker.core.context.ProcessContext;
+import org.apache.airavata.worker.core.exceptions.WorkerException;
+import org.apache.airavata.worker.core.utils.WorkerFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Created by Ajinkya on 4/13/17.
+ */
+/**
+ * Factory helpers for building authentication objects used by data-staging
+ * tasks.
+ */
+public class DataStagingFactory {
+
+ private static final Logger log = LoggerFactory.getLogger(DataStagingFactory.class);
+
+ /**
+ * Builds an SSH key authentication object for the compute resource that the
+ * given process context refers to.
+ *
+ * @throws WorkerException if the authentication object cannot be built
+ * @throws CredentialStoreException if the credential store lookup fails
+ */
+ public static SSHKeyAuthentication getComputerResourceSSHKeyAuthentication(ProcessContext processContext)
+ throws WorkerException, CredentialStoreException {
+ try {
+ String gatewayId = processContext.getGatewayId();
+ String loginUserName = processContext.getComputeResourceLoginUserName();
+ String credentialToken = processContext.getComputeResourceCredentialToken();
+ return WorkerFactory.getSshKeyAuthentication(gatewayId, loginUserName, credentialToken);
+ } catch (ApplicationSettingsException | IllegalAccessException | InstantiationException e) {
+ throw new WorkerException("Couldn't build ssh authentication object", e);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/ea865f23/modules/worker/task-datastaging/src/main/java/org/apache/airavata/worker/task/datastaging/utils/DataStagingUtils.java
----------------------------------------------------------------------
diff --git a/modules/worker/task-datastaging/src/main/java/org/apache/airavata/worker/task/datastaging/utils/DataStagingUtils.java b/modules/worker/task-datastaging/src/main/java/org/apache/airavata/worker/task/datastaging/utils/DataStagingUtils.java
new file mode 100644
index 0000000..d3d7522
--- /dev/null
+++ b/modules/worker/task-datastaging/src/main/java/org/apache/airavata/worker/task/datastaging/utils/DataStagingUtils.java
@@ -0,0 +1,107 @@
+package org.apache.airavata.worker.task.datastaging.utils;
+
+import org.apache.airavata.model.application.io.OutputDataObjectType;
+import org.apache.airavata.model.data.replica.*;
+import org.apache.airavata.model.experiment.ExperimentModel;
+import org.apache.airavata.model.process.ProcessModel;
+import org.apache.airavata.registry.core.experiment.catalog.impl.RegistryFactory;
+import org.apache.airavata.registry.cpi.ExperimentCatalog;
+import org.apache.airavata.registry.cpi.ExperimentCatalogModelType;
+import org.apache.airavata.registry.cpi.RegistryException;
+import org.apache.airavata.registry.cpi.ReplicaCatalog;
+import org.apache.airavata.worker.core.context.ProcessContext;
+import org.apache.airavata.worker.core.context.TaskContext;
+import org.apache.airavata.worker.core.exceptions.WorkerException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.List;
+
+/**
+ * Created by Ajinkya on 4/13/17.
+ */
+/**
+ * Helper routines shared by data-staging task implementations: destination URI
+ * construction and persisting staged output locations to the experiment /
+ * replica catalogs.
+ */
+public class DataStagingUtils {
+
+ private static final Logger log = LoggerFactory.getLogger(DataStagingUtils.class);
+
+ /**
+ * Builds the destination URI for a staged file. When the process model
+ * carries an explicit experiment data directory it is used (absolute paths
+ * as-is, relative paths resolved against inputPath); otherwise the file is
+ * placed under inputPath/&lt;processId&gt;/.
+ */
+ public static URI getDestinationURI(TaskContext taskContext, String hostName, String inputPath, String fileName) throws URISyntaxException {
+ String experimentDataDir = taskContext.getParentProcessContext().getProcessModel().getExperimentDataDir();
+ String filePath;
+ if(experimentDataDir != null && !experimentDataDir.isEmpty()) {
+ if(!experimentDataDir.endsWith(File.separator)){
+ experimentDataDir += File.separator;
+ }
+ if (experimentDataDir.startsWith(File.separator)) {
+ // Absolute data dir: use it directly.
+ filePath = experimentDataDir + fileName;
+ } else {
+ // Relative data dir: resolve against the storage input path.
+ filePath = inputPath + experimentDataDir + fileName;
+ }
+ } else {
+ filePath = inputPath + taskContext.getParentProcessContext().getProcessId() + File.separator + fileName;
+ }
+ //FIXME: ssh port 22 is hard-coded; it should come from the storage resource configuration
+ return new URI("file", taskContext.getParentProcessContext().getStorageResourceLoginUserName(), hostName, 22, filePath, null, null);
+
+ }
+
+ /**
+ * Registers the staged output value as a replica in the replica catalog and
+ * points the matching experiment output at the resulting product URI, then
+ * persists the updated experiment model.
+ *
+ * @throws WorkerException if the catalog read/update fails
+ */
+ public static void saveExperimentOutput(ProcessContext processContext, String outputName, String outputVal) throws WorkerException {
+ try {
+ ExperimentCatalog experimentCatalog = processContext.getExperimentCatalog();
+ String experimentId = processContext.getExperimentId();
+ ExperimentModel experiment = (ExperimentModel)experimentCatalog.get(ExperimentCatalogModelType.EXPERIMENT, experimentId);
+ List<OutputDataObjectType> experimentOutputs = experiment.getExperimentOutputs();
+ if (experimentOutputs != null && !experimentOutputs.isEmpty()){
+ for (OutputDataObjectType expOutput : experimentOutputs){
+ if (expOutput.getName().equals(outputName)){
+ DataProductModel dataProductModel = new DataProductModel();
+ dataProductModel.setGatewayId(processContext.getGatewayId());
+ dataProductModel.setOwnerName(processContext.getProcessModel().getUserName());
+ dataProductModel.setProductName(outputName);
+ dataProductModel.setDataProductType(DataProductType.FILE);
+
+ DataReplicaLocationModel replicaLocationModel = new DataReplicaLocationModel();
+ replicaLocationModel.setStorageResourceId(processContext.getStorageResource().getStorageResourceId());
+ replicaLocationModel.setReplicaName(outputName + " gateway data store copy");
+ replicaLocationModel.setReplicaLocationCategory(ReplicaLocationCategory.GATEWAY_DATA_STORE);
+ replicaLocationModel.setReplicaPersistentType(ReplicaPersistentType.TRANSIENT);
+ replicaLocationModel.setFilePath(outputVal);
+ dataProductModel.addToReplicaLocations(replicaLocationModel);
+
+ ReplicaCatalog replicaCatalog = RegistryFactory.getReplicaCatalog();
+ String productUri = replicaCatalog.registerDataProduct(dataProductModel);
+ // The experiment output records the product URI, not the raw path.
+ expOutput.setValue(productUri);
+ }
+ }
+ }
+ experimentCatalog.update(ExperimentCatalogModelType.EXPERIMENT, experiment, experimentId);
+ } catch (RegistryException e) {
+ String msg = "expId: " + processContext.getExperimentId() + " processId: " + processContext.getProcessId()
+ + " : - Error while updating experiment outputs";
+ throw new WorkerException(msg, e);
+ }
+ }
+
+ /**
+ * Writes the staged output value into the matching process output and
+ * persists the updated process model.
+ *
+ * @throws WorkerException if the catalog read/update fails
+ */
+ public static void saveProcessOutput(ProcessContext processContext, String outputName, String outputVal) throws WorkerException {
+ try {
+ ExperimentCatalog experimentCatalog = processContext.getExperimentCatalog();
+ String processId = processContext.getProcessId();
+ List<OutputDataObjectType> processOutputs = (List<OutputDataObjectType> )experimentCatalog.get(ExperimentCatalogModelType.PROCESS_OUTPUT, processId);
+ if (processOutputs != null && !processOutputs.isEmpty()){
+ for (OutputDataObjectType processOutput : processOutputs){
+ if (processOutput.getName().equals(outputName)){
+ processOutput.setValue(outputVal);
+ }
+ }
+ }
+ ProcessModel processModel = processContext.getProcessModel();
+ processModel.setProcessOutputs(processOutputs);
+ experimentCatalog.update(ExperimentCatalogModelType.PROCESS, processModel, processId);
+ } catch (RegistryException e) {
+ // Fixed: this message previously said "experiment outputs" (copy-paste
+ // from saveExperimentOutput), mislabeling process-output failures.
+ String msg = "expId: " + processContext.getExperimentId() + " processId: " + processContext.getProcessId()
+ + " : - Error while updating process outputs";
+ throw new WorkerException(msg, e);
+ }
+ }
+}
[5/6] airavata git commit: Adding environment setup task
implementation
Posted by ad...@apache.org.
Adding environment setup task implementation
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/fbc0351e
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/fbc0351e
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/fbc0351e
Branch: refs/heads/feature-workload-mgmt
Commit: fbc0351eb06cc369692cb45fcd90e0227ff851e4
Parents: a20405b
Author: Ajinkya Dhamnaskar <ad...@apache.org>
Authored: Thu Apr 13 14:37:44 2017 -0400
Committer: Ajinkya Dhamnaskar <ad...@apache.org>
Committed: Thu Apr 13 14:37:44 2017 -0400
----------------------------------------------------------------------
.../envsetup/handler/EnvironmentSetupTask.java | 75 ++++++++++++++++++++
1 file changed, 75 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/fbc0351e/modules/worker/task-envsetup/src/main/java/org/apache/airavata/worker/task/envsetup/handler/EnvironmentSetupTask.java
----------------------------------------------------------------------
diff --git a/modules/worker/task-envsetup/src/main/java/org/apache/airavata/worker/task/envsetup/handler/EnvironmentSetupTask.java b/modules/worker/task-envsetup/src/main/java/org/apache/airavata/worker/task/envsetup/handler/EnvironmentSetupTask.java
new file mode 100644
index 0000000..390b5e2
--- /dev/null
+++ b/modules/worker/task-envsetup/src/main/java/org/apache/airavata/worker/task/envsetup/handler/EnvironmentSetupTask.java
@@ -0,0 +1,75 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.airavata.worker.task.envsetup.handler;
+
+import org.apache.airavata.model.commons.ErrorModel;
+import org.apache.airavata.model.status.TaskState;
+import org.apache.airavata.model.status.TaskStatus;
+import org.apache.airavata.model.task.TaskTypes;
+import org.apache.airavata.worker.core.cluster.RemoteCluster;
+import org.apache.airavata.worker.core.context.TaskContext;
+import org.apache.airavata.worker.core.exceptions.WorkerException;
+import org.apache.airavata.worker.core.task.Task;
+import org.apache.airavata.worker.core.task.TaskException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.Map;
+
+/**
+ * Task that prepares the remote execution environment by creating the
+ * process working directory on the job-submission cluster.
+ */
+public class EnvironmentSetupTask implements Task {
+
+ private static final Logger log = LoggerFactory.getLogger(EnvironmentSetupTask.class);
+
+ /** No configuration is required for environment setup. */
+ @Override
+ public void init(Map<String, String> propertyMap) throws TaskException {
+
+ }
+
+ /**
+ * Creates the working directory on the job-submission cluster. Returns a
+ * COMPLETED status on success; on failure returns FAILED and attaches an
+ * ErrorModel to the task model.
+ */
+ @Override
+ public TaskStatus execute(TaskContext taskContext) {
+ TaskStatus taskStatus = new TaskStatus(TaskState.COMPLETED);
+ try {
+ RemoteCluster jobSubmissionCluster = taskContext.getParentProcessContext().getJobSubmissionRemoteCluster();
+ String workingDir = taskContext.getParentProcessContext().getWorkingDir();
+ jobSubmissionCluster.makeDirectory(workingDir);
+ taskStatus.setReason("Successfully created environment");
+ } catch (WorkerException e) {
+ String msg = "Error while environment setup";
+ log.error(msg, e);
+ taskStatus.setState(TaskState.FAILED);
+ taskStatus.setReason(msg);
+ ErrorModel errorModel = new ErrorModel();
+ errorModel.setActualErrorMessage(e.getMessage());
+ errorModel.setUserFriendlyMessage(msg);
+ taskContext.getTaskModel().setTaskErrors(Arrays.asList(errorModel));
+ }
+ return taskStatus;
+ }
+
+ /** Directory creation is idempotent, so recovery simply re-executes the task. */
+ @Override
+ public TaskStatus recover(TaskContext taskContext) {
+ return execute(taskContext);
+ }
+
+ @Override
+ public TaskTypes getType() {
+ return TaskTypes.ENV_SETUP;
+ }
+}
[2/6] airavata git commit: Adding common utils methods to worker core
Posted by ad...@apache.org.
Adding common utils methods to worker core
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/e1ff08b6
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/e1ff08b6
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/e1ff08b6
Branch: refs/heads/feature-workload-mgmt
Commit: e1ff08b6787a964e6c92002e562e555755367b82
Parents: 4248419
Author: Ajinkya Dhamnaskar <ad...@apache.org>
Authored: Thu Apr 13 14:35:28 2017 -0400
Committer: Ajinkya Dhamnaskar <ad...@apache.org>
Committed: Thu Apr 13 14:35:28 2017 -0400
----------------------------------------------------------------------
.../airavata/worker/core/utils/WorkerUtils.java | 21 ++++++++++++++++++++
1 file changed, 21 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/e1ff08b6/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/utils/WorkerUtils.java
----------------------------------------------------------------------
diff --git a/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/utils/WorkerUtils.java b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/utils/WorkerUtils.java
index ef11c20..a53d736 100644
--- a/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/utils/WorkerUtils.java
+++ b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/utils/WorkerUtils.java
@@ -1,6 +1,11 @@
package org.apache.airavata.worker.core.utils;
+import org.apache.airavata.common.exception.ApplicationSettingsException;
import org.apache.airavata.common.utils.AiravataUtils;
+import org.apache.airavata.common.utils.DBUtil;
+import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.airavata.credential.store.store.CredentialReader;
+import org.apache.airavata.credential.store.store.impl.CredentialReaderImpl;
import org.apache.airavata.messaging.core.MessageContext;
import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManagerType;
import org.apache.airavata.model.commons.ErrorModel;
@@ -211,4 +216,20 @@ public class WorkerUtils {
return null;
}
}
+
+ /**
+ * Builds a CredentialReader backed by the credential-store database
+ * configured in the server settings.
+ *
+ * NOTE(review): when the configured JDBC driver class is missing this
+ * method logs and returns null rather than throwing — callers must
+ * null-check the result; confirm that contract is intended.
+ */
+ public static CredentialReader getCredentialReader()
+ throws ApplicationSettingsException, IllegalAccessException,
+ InstantiationException {
+ try {
+ String jdbcUrl = ServerSettings.getCredentialStoreDBURL();
+ String jdbcUsr = ServerSettings.getCredentialStoreDBUser();
+ String jdbcPass = ServerSettings.getCredentialStoreDBPassword();
+ String driver = ServerSettings.getCredentialStoreDBDriver();
+ return new CredentialReaderImpl(new DBUtil(jdbcUrl, jdbcUsr, jdbcPass,
+ driver));
+ } catch (ClassNotFoundException e) {
+ // Deliberate best-effort: missing JDBC driver yields null, not an exception.
+ logger.error("Not able to find driver: " + e.getLocalizedMessage());
+ return null;
+ }
+ }
}
[4/6] airavata git commit: Updating worker core pom
Posted by ad...@apache.org.
Updating worker core pom
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/a20405be
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/a20405be
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/a20405be
Branch: refs/heads/feature-workload-mgmt
Commit: a20405be05985dd6bdbe58d47d6b4564ae73893d
Parents: 99fb72d
Author: Ajinkya Dhamnaskar <ad...@apache.org>
Authored: Thu Apr 13 14:36:45 2017 -0400
Committer: Ajinkya Dhamnaskar <ad...@apache.org>
Committed: Thu Apr 13 14:36:45 2017 -0400
----------------------------------------------------------------------
modules/worker/worker-core/pom.xml | 5 +++++
1 file changed, 5 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/a20405be/modules/worker/worker-core/pom.xml
----------------------------------------------------------------------
diff --git a/modules/worker/worker-core/pom.xml b/modules/worker/worker-core/pom.xml
index 9ac27d4..8f00821 100644
--- a/modules/worker/worker-core/pom.xml
+++ b/modules/worker/worker-core/pom.xml
@@ -33,6 +33,11 @@
<version>${project.version}</version>
</dependency>
<dependency>
+ <groupId>org.apache.airavata</groupId>
+ <artifactId>airavata-credential-store</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
<groupId>com.jcraft</groupId>
<artifactId>jsch</artifactId>
<version>0.1.53</version>