You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by ad...@apache.org on 2017/04/13 18:47:49 UTC

[1/6] airavata git commit: Adding common factory methods

Repository: airavata
Updated Branches:
  refs/heads/feature-workload-mgmt a974f3fb2 -> ea865f237


Adding common factory methods


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/4248419d
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/4248419d
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/4248419d

Branch: refs/heads/feature-workload-mgmt
Commit: 4248419db5f02aa3ed9e8d8fc0fc1d0c85ce953b
Parents: a974f3f
Author: Ajinkya Dhamnaskar <ad...@apache.org>
Authored: Thu Apr 13 14:34:51 2017 -0400
Committer: Ajinkya Dhamnaskar <ad...@apache.org>
Committed: Thu Apr 13 14:34:51 2017 -0400

----------------------------------------------------------------------
 .../worker/core/utils/WorkerFactory.java        | 181 ++++++++++++++++++-
 1 file changed, 180 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/4248419d/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/utils/WorkerFactory.java
----------------------------------------------------------------------
diff --git a/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/utils/WorkerFactory.java b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/utils/WorkerFactory.java
index 6dcd275..75a8062 100644
--- a/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/utils/WorkerFactory.java
+++ b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/utils/WorkerFactory.java
@@ -23,9 +23,17 @@ package org.apache.airavata.worker.core.utils;
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.RemovalListener;
-import com.jcraft.jsch.Session;
+import com.jcraft.jsch.*;
+import org.apache.airavata.common.exception.ApplicationSettingsException;
 import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.airavata.credential.store.credential.Credential;
+import org.apache.airavata.credential.store.store.CredentialReader;
+import org.apache.airavata.credential.store.store.CredentialStoreException;
 import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManagerType;
+import org.apache.airavata.credential.store.credential.impl.ssh.SSHCredential;
+import org.apache.airavata.worker.core.authentication.AuthenticationInfo;
+import org.apache.airavata.worker.core.authentication.SSHKeyAuthentication;
+import org.apache.airavata.worker.core.cluster.ServerInfo;
 import org.apache.airavata.worker.core.config.ResourceConfig;
 import org.apache.airavata.worker.core.config.WorkerYamlConfigruation;
 import org.apache.airavata.worker.core.exceptions.WorkerException;
@@ -34,6 +42,7 @@ import org.slf4j.LoggerFactory;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -79,4 +88,174 @@ public class WorkerFactory {
     /**
      * Returns the result of looking up the given job manager type in {@code resources}.
      *
      * @param resourceJobManagerType key passed to {@code resources.get(...)}
      * @return whatever {@code resources} holds for that key
      */
     public static ResourceConfig getResourceConfig(ResourceJobManagerType resourceJobManagerType) {
         return resources.get(resourceJobManagerType);
     }
+
+    /**
+     * Builds an {@link SSHKeyAuthentication} from the credential stored under the given token.
+     *
+     * <p>The credential is fetched through {@code WorkerUtils.getCredentialReader()}. Its public key,
+     * private key and passphrase are copied into the returned object together with the supplied
+     * login user name.
+     *
+     * @param gatewayId            gateway the credential is looked up for
+     * @param loginUserName        user name to set on the returned authentication object
+     * @param credentialStoreToken token identifying the credential
+     * @return key authentication populated from the stored SSH credential
+     * @throws CredentialStoreException if the stored credential is not an {@link SSHCredential}
+     */
+    public static SSHKeyAuthentication getSshKeyAuthentication(String gatewayId,
+                                                                String loginUserName,
+                                                                String credentialStoreToken)
+            throws ApplicationSettingsException, IllegalAccessException, InstantiationException,
+            CredentialStoreException, WorkerException {
+
+        Credential storedCredential =
+                WorkerUtils.getCredentialReader().getCredential(gatewayId, credentialStoreToken);
+
+        // Anything other than an SSH credential cannot be used for key authentication.
+        if (!(storedCredential instanceof SSHCredential)) {
+            String msg = "Provided credential store token is not valid. Please provide the correct credential store token";
+            log.error(msg);
+            throw new CredentialStoreException("Invalid credential store token:" + credentialStoreToken);
+        }
+
+        SSHKeyAuthentication keyAuthentication = new SSHKeyAuthentication();
+        keyAuthentication.setUserName(loginUserName);
+        SSHCredential sshCredential = (SSHCredential) storedCredential;
+        keyAuthentication.setPublicKey(sshCredential.getPublicKey());
+        keyAuthentication.setPrivateKey(sshCredential.getPrivateKey());
+        keyAuthentication.setPassphrase(sshCredential.getPassphrase());
+        // NOTE(review): strict host key checking is hard-coded to "no". A settings-driven variant
+        // (ssh.strict.hostKey.checking / ssh.known.hosts.file, rejecting "yes" without a known
+        // hosts file) was left disabled by the author; confirm before relying on host verification.
+        keyAuthentication.setStrictHostKeyChecking("no");
+        return keyAuthentication;
+    }
+
+    /**
+     * Returns a connected SSH session for the given server, reusing the cached session when it is
+     * still usable and creating (and caching) a new one otherwise.
+     *
+     * <p>A cached session is reused only if it reports itself connected and a test exec channel can
+     * be opened on it. The method is {@code synchronized} on the class, so lookups and session
+     * creation are serialized across callers.
+     *
+     * @param authenticationInfo credentials to log in with; only {@link SSHKeyAuthentication} is
+     *                           supported
+     * @param serverInfo         user name, host and port to connect to; also used to build the
+     *                           cache key
+     * @return the cached session if it passed both checks, otherwise a newly connected session
+     * @throws IllegalArgumentException if either argument is {@code null}
+     * @throws WorkerException          if the authentication type is unsupported or JSch fails to
+     *                                  create or connect the session
+     */
+    public static synchronized Session getSSHSession(AuthenticationInfo authenticationInfo,
+                                                     ServerInfo serverInfo) throws WorkerException {
+        if (authenticationInfo == null
+                || serverInfo == null) {
+
+            throw new IllegalArgumentException("Can't create ssh session, argument should be valid (not null)");
+        }
+        SSHKeyAuthentication authentication;
+        if (authenticationInfo instanceof SSHKeyAuthentication) {
+            authentication = (SSHKeyAuthentication) authenticationInfo;
+        } else {
+            throw new WorkerException("Support ssh key authentication only");
+        }
+        String key = buildKey(serverInfo);
+        Session session = sessionCache.getIfPresent(key);
+        boolean valid = isValidSession(session);
+        // FIXME - move following info logs to debug
+        if (valid) {
+            log.info("SSH Session validation succeeded, key :" + key);
+            // A session can report "connected" and still refuse new channels, so probe it.
+            valid = testChannelCreation(session);
+            if (valid) {
+                log.info("Channel creation test succeeded, key :" + key);
+            } else {
+                log.info("Channel creation test failed, key :" + key);
+            }
+        } else {
+            log.info("Session validation failed, key :" + key);
+        }
+
+        if (!valid) {
+            if (session != null) {
+                log.info("Reinitialize a new SSH session for :" + key);
+            } else {
+                log.info("Initialize a new SSH session for :" + key);
+            }
+            try {
+
+                JSch jSch = new JSch();
+                String passphrase = authentication.getPassphrase();
+                // Keys without a passphrase carry null here; JSch accepts a null passphrase, whereas
+                // calling getBytes() on it would fail with a NullPointerException.
+                byte[] passphraseBytes = passphrase == null
+                        ? null
+                        : passphrase.getBytes(java.nio.charset.StandardCharsets.UTF_8);
+                jSch.addIdentity(UUID.randomUUID().toString(), authentication.getPrivateKey(),
+                        authentication.getPublicKey(), passphraseBytes);
+                session = jSch.getSession(serverInfo.getUserName(), serverInfo.getHost(),
+                        serverInfo.getPort());
+                session.setUserInfo(new DefaultUserInfo(serverInfo.getUserName(), null, passphrase));
+                // Constant-first comparison so an unset strict-host-key setting means "not strict"
+                // instead of raising a NullPointerException.
+                if ("yes".equals(authentication.getStrictHostKeyChecking())) {
+                    jSch.setKnownHosts(authentication.getKnownHostsFilePath());
+                } else {
+                    session.setConfig("StrictHostKeyChecking", "no");
+                }
+                session.connect(); // 0 connection timeout
+                sessionCache.put(key, session);
+            } catch (JSchException e) {
+                throw new WorkerException("JSch initialization error ", e);
+            }
+        } else {
+            // FIXME - move following info log to debug
+            log.info("Reuse SSH session for :" + key);
+        }
+        return session;
+
+    }
+
+    /**
+     * Probes a session by opening an exec channel on it and running {@code pwd}.
+     *
+     * @param session session to probe
+     * @return {@code true} if the channel could be opened and connected, {@code false} if JSch
+     *         reported an error
+     */
+    private static boolean testChannelCreation(Session session) {
+        Channel probeChannel = null;
+        boolean channelUsable;
+        try {
+            probeChannel = session.openChannel("exec");
+            StandardOutReader outputReader = new StandardOutReader();
+            ChannelExec execChannel = (ChannelExec) probeChannel;
+            execChannel.setCommand("pwd ");
+            execChannel.setErrStream(outputReader.getStandardError());
+            probeChannel.connect();
+            // Read the command output until the channel closes.
+            outputReader.onOutput(probeChannel);
+            channelUsable = true;
+        } catch (JSchException e) {
+            log.error("Test Channel creation failed.", e);
+            channelUsable = false;
+        } finally {
+            // Always release the probe channel, whether or not the test succeeded.
+            if (probeChannel != null) {
+                probeChannel.disconnect();
+            }
+        }
+        return channelUsable;
+    }
+
+    /**
+     * Tells whether a session exists and reports itself as connected.
+     *
+     * @param session session to check, may be {@code null}
+     * @return {@code false} for {@code null}, otherwise the result of {@code session.isConnected()}
+     */
+    private static boolean isValidSession(Session session) {
+        if (session == null) {
+            return false;
+        }
+        return session.isConnected();
+    }
+
+    /**
+     * Builds the session cache key for a server as
+     * {@code userName_host_port_credentialToken}.
+     *
+     * @param serverInfo server whose user name, host, port and credential token form the key
+     * @return the four values joined with underscores
+     */
+    private static String buildKey(ServerInfo serverInfo) {
+        StringBuilder cacheKey = new StringBuilder();
+        cacheKey.append(serverInfo.getUserName()).append("_");
+        cacheKey.append(serverInfo.getHost()).append("_");
+        cacheKey.append(serverInfo.getPort()).append("_");
+        cacheKey.append(serverInfo.getCredentialToken());
+        return cacheKey.toString();
+    }
+
+    /**
+     * Non-interactive {@link UserInfo} used for sessions created by this factory.
+     *
+     * <p>Every {@code prompt*} method answers {@code false} and {@code showMessage} discards its
+     * argument, so nothing is ever asked of a user. The getters return the values supplied at
+     * construction time.
+     */
+    private static class DefaultUserInfo implements UserInfo {
+
+        private final String userName;
+        private final String password;
+        private final String passphrase;
+
+        /**
+         * @param userName   login user name (kept for reference; not exposed through {@link UserInfo})
+         * @param password   password to report from {@link #getPassword()}, may be {@code null}
+         * @param passphrase key passphrase to report from {@link #getPassphrase()}, may be {@code null}
+         */
+        public DefaultUserInfo(String userName, String password, String passphrase) {
+            this.userName = userName;
+            this.password = password;
+            this.passphrase = passphrase;
+        }
+
+        /** Returns the passphrase given to the constructor (previously always {@code null}). */
+        @Override
+        public String getPassphrase() {
+            return passphrase;
+        }
+
+        /** Returns the password given to the constructor (previously always {@code null}). */
+        @Override
+        public String getPassword() {
+            return password;
+        }
+
+        /** Declines every password prompt. */
+        @Override
+        public boolean promptPassword(String s) {
+            return false;
+        }
+
+        /** Declines every passphrase prompt. */
+        @Override
+        public boolean promptPassphrase(String s) {
+            return false;
+        }
+
+        /** Answers "no" to every yes/no question. */
+        @Override
+        public boolean promptYesNo(String s) {
+            return false;
+        }
+
+        /** Ignores messages; there is no user to show them to. */
+        @Override
+        public void showMessage(String s) {
+
+        }
+    }
 }


[3/6] airavata git commit: Adding StandardOutReader to worker core

Posted by ad...@apache.org.
Adding StandardOutReader to worker core


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/99fb72d1
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/99fb72d1
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/99fb72d1

Branch: refs/heads/feature-workload-mgmt
Commit: 99fb72d1fd0ae376d531a3e46b700ec9973870fd
Parents: e1ff08b
Author: Ajinkya Dhamnaskar <ad...@apache.org>
Authored: Thu Apr 13 14:36:11 2017 -0400
Committer: Ajinkya Dhamnaskar <ad...@apache.org>
Committed: Thu Apr 13 14:36:11 2017 -0400

----------------------------------------------------------------------
 .../worker/core/utils/StandardOutReader.java    | 86 ++++++++++++++++++++
 1 file changed, 86 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/99fb72d1/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/utils/StandardOutReader.java
----------------------------------------------------------------------
diff --git a/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/utils/StandardOutReader.java b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/utils/StandardOutReader.java
new file mode 100644
index 0000000..d5cb2e8
--- /dev/null
+++ b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/utils/StandardOutReader.java
@@ -0,0 +1,86 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.worker.core.utils;
+
+import com.jcraft.jsch.Channel;
+import org.apache.airavata.worker.core.cluster.CommandOutput;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+/**
+ * {@link CommandOutput} implementation that collects a command's standard output into a string
+ * and offers an in-memory stream into which callers can direct its standard error.
+ */
+public class StandardOutReader implements CommandOutput {
+
+    private static final Logger logger = LoggerFactory.getLogger(StandardOutReader.class);
+
+    /** Pause between polls of the channel while waiting for more output or for it to close. */
+    private static final long POLL_INTERVAL_MS = 10;
+
+    String stdOutputString = null;
+    ByteArrayOutputStream errorStream = new ByteArrayOutputStream();
+    private int exitCode;
+
+    /**
+     * Reads the channel's input stream until the channel is closed and stores everything read as
+     * the standard output string.
+     *
+     * <p>Bytes are accumulated and decoded once at the end, so a multi-byte character split across
+     * two reads is not corrupted. After the channel reports closed the stream is drained one more
+     * time so output that arrived just before the close is not lost. If the calling thread is
+     * interrupted while waiting, the interrupt flag is restored and whatever was read so far is
+     * stored. An {@link IOException} is logged and leaves the stored output unchanged.
+     *
+     * @param channel channel whose output should be captured
+     */
+    public void onOutput(Channel channel) {
+        try {
+            ByteArrayOutputStream collected = new ByteArrayOutputStream();
+            InputStream inputStream = channel.getInputStream();
+            byte[] tmp = new byte[1024];
+            while (true) {
+                while (inputStream.available() > 0) {
+                    int i = inputStream.read(tmp, 0, tmp.length);
+                    if (i < 0) {
+                        break;
+                    }
+                    collected.write(tmp, 0, i);
+                }
+                if (channel.isClosed()) {
+                    // Data may have arrived between the drain above and the close; pick it up.
+                    if (inputStream.available() > 0) {
+                        continue;
+                    }
+                    break;
+                }
+                try {
+                    // Yield instead of spinning at full speed while the command is still running.
+                    Thread.sleep(POLL_INTERVAL_MS);
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                    break;
+                }
+            }
+            // Decoded with the platform default charset, as the per-chunk decoding did before.
+            this.setStdOutputString(collected.toString());
+        } catch (IOException e) {
+            logger.error(e.getMessage(), e);
+        }
+
+    }
+
+
+    /**
+     * Records the exit code of the command.
+     *
+     * @param code exit code to remember
+     */
+    public void exitCode(int code) {
+        logger.info("Program exit code - {}", code);
+        this.exitCode = code;
+    }
+
+    /** Returns the exit code last recorded through {@link #exitCode(int)}. */
+    @Override
+    public int getExitCode() {
+        return exitCode;
+    }
+
+    /** Returns the captured standard output, or {@code null} if none has been stored yet. */
+    public String getStdOutputString() {
+        return stdOutputString;
+    }
+
+    /** Replaces the stored standard output. */
+    public void setStdOutputString(String stdOutputString) {
+        this.stdOutputString = stdOutputString;
+    }
+
+    /** Returns everything written so far to the standard error stream, as a string. */
+    public String getStdErrorString() {
+        return errorStream.toString();
+    }
+
+    /** Returns the in-memory stream that collects standard error. */
+    public OutputStream getStandardError() {
+        return errorStream;
+    }
+}


[6/6] airavata git commit: Adding Data staging task implementation and related utils

Posted by ad...@apache.org.
Adding Data staging task implementation and related utils


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/ea865f23
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/ea865f23
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/ea865f23

Branch: refs/heads/feature-workload-mgmt
Commit: ea865f23709778addf03480b10c5a9e80b2294f3
Parents: fbc0351
Author: Ajinkya Dhamnaskar <ad...@apache.org>
Authored: Thu Apr 13 14:38:31 2017 -0400
Committer: Ajinkya Dhamnaskar <ad...@apache.org>
Committed: Thu Apr 13 14:38:31 2017 -0400

----------------------------------------------------------------------
 modules/worker/task-datastaging/pom.xml         |  72 +++++
 .../task/datastaging/handler/DataStageTask.java | 125 ++++++++
 .../datastaging/handler/SCPDataStageTask.java   | 292 +++++++++++++++++++
 .../datastaging/utils/DataStagingFactory.java   |  30 ++
 .../datastaging/utils/DataStagingUtils.java     | 107 +++++++
 5 files changed, 626 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/ea865f23/modules/worker/task-datastaging/pom.xml
----------------------------------------------------------------------
diff --git a/modules/worker/task-datastaging/pom.xml b/modules/worker/task-datastaging/pom.xml
new file mode 100644
index 0000000..c046f45
--- /dev/null
+++ b/modules/worker/task-datastaging/pom.xml
@@ -0,0 +1,72 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <groupId>org.apache.airavata</groupId>
+        <artifactId>airavata-worker</artifactId>
+        <version>0.17-SNAPSHOT</version>
+        <relativePath>../pom.xml</relativePath>
+    </parent>
+
+    <modelVersion>4.0.0</modelVersion>
+    <artifactId>airavata-data-staging</artifactId>
+    <name>Airavata Task - Data Staging</name>
+    <description>I/O Data staging task implementation.</description>
+    <url>http://airavata.apache.org/</url>
+
+    <properties>
+        <jcraft.version>0.1.53</jcraft.version>
+        <net.schmizz.version>0.6.1</net.schmizz.version>
+    </properties>
+
+    <dependencies>
+        <!-- Logging -->
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.jcraft</groupId>
+            <artifactId>jsch</artifactId>
+            <version>${jcraft.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>net.schmizz</groupId>
+            <artifactId>sshj</artifactId>
+            <version>${net.schmizz.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.airavata</groupId>
+            <artifactId>airavata-data-models</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.airavata</groupId>
+            <artifactId>airavata-worker-core</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>ch.ethz.ganymed</groupId>
+            <artifactId>ganymed-ssh2</artifactId>
+            <version>262</version>
+        </dependency>
+        <dependency>
+            <groupId>org.codehaus.groovy</groupId>
+            <artifactId>groovy</artifactId>
+            <version>${groovy.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.codehaus.groovy</groupId>
+            <artifactId>groovy-templates</artifactId>
+            <version>${groovy.version}</version>
+        </dependency>
+        <!-- Credential Store -->
+        <dependency>
+            <groupId>org.apache.airavata</groupId>
+            <artifactId>airavata-credential-store</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+    </dependencies>
+
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/airavata/blob/ea865f23/modules/worker/task-datastaging/src/main/java/org/apache/airavata/worker/task/datastaging/handler/DataStageTask.java
----------------------------------------------------------------------
diff --git a/modules/worker/task-datastaging/src/main/java/org/apache/airavata/worker/task/datastaging/handler/DataStageTask.java b/modules/worker/task-datastaging/src/main/java/org/apache/airavata/worker/task/datastaging/handler/DataStageTask.java
new file mode 100644
index 0000000..aab6a735
--- /dev/null
+++ b/modules/worker/task-datastaging/src/main/java/org/apache/airavata/worker/task/datastaging/handler/DataStageTask.java
@@ -0,0 +1,125 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.airavata.worker.task.datastaging.handler;
+
+import org.apache.airavata.model.commons.ErrorModel;
+import org.apache.airavata.model.status.ProcessState;
+import org.apache.airavata.model.status.TaskState;
+import org.apache.airavata.model.status.TaskStatus;
+import org.apache.airavata.model.task.DataStagingTaskModel;
+import org.apache.airavata.model.task.TaskTypes;
+import org.apache.airavata.worker.core.context.TaskContext;
+import org.apache.airavata.worker.core.exceptions.WorkerException;
+import org.apache.airavata.worker.core.task.Task;
+import org.apache.airavata.worker.core.task.TaskException;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Arrays;
+import java.util.Map;
+
+/**
+ * Task that stages data between the source and destination named in a
+ * {@link DataStagingTaskModel}, using the data movement remote cluster of the parent process.
+ */
+public class DataStageTask implements Task {
+    private static final Logger log = LoggerFactory.getLogger(DataStageTask.class);
+
+    /** No initialization is required; the property map is ignored. */
+    @Override
+    public void init(Map<String, String> propertyMap) throws TaskException {
+
+    }
+
+    /**
+     * Copies the file described by the task's data staging model.
+     *
+     * <p>In {@code INPUT_DATA_STAGING} the source path is copied to the remote cluster; in
+     * {@code OUTPUT_DATA_STAGING} it is copied from the remote cluster. Only the path component of
+     * the source and destination URIs is used.
+     *
+     * @param taskContext context carrying the task model, sub task model and parent process context
+     * @return a status that is {@code COMPLETED} unless the task type is wrong or the transfer
+     *         raised an exception, in which case it is {@code FAILED} with a reason set
+     */
+    @Override
+    public TaskStatus execute(TaskContext taskContext) {
+        TaskStatus status = new TaskStatus(TaskState.COMPLETED);
+        if (taskContext.getTaskModel().getTaskType() != TaskTypes.DATA_STAGING) {
+            status.setState(TaskState.FAILED);
+            // Plain concatenation (no explicit toString()) so a missing task type is reported as
+            // "null" instead of raising a NullPointerException while building the message.
+            status.setReason("Invalid task call, expected " + TaskTypes.DATA_STAGING + " but found "
+                    + taskContext.getTaskModel().getTaskType());
+        } else {
+            try {
+                DataStagingTaskModel subTaskModel = ((DataStagingTaskModel) taskContext.getSubTaskModel());
+                URI sourceURI = new URI(subTaskModel.getSource());
+                URI destinationURI = new URI(subTaskModel.getDestination());
+
+                ProcessState processState = taskContext.getParentProcessContext().getProcessState();
+                if (processState == ProcessState.INPUT_DATA_STAGING) {
+                    // copy local file to compute resource.
+                    taskContext.getParentProcessContext().getDataMovementRemoteCluster()
+                            .copyTo(sourceURI.getPath(), destinationURI.getPath());
+                } else if (processState == ProcessState.OUTPUT_DATA_STAGING) {
+                    // copy remote file from compute resource.
+                    taskContext.getParentProcessContext().getDataMovementRemoteCluster()
+                            .copyFrom(sourceURI.getPath(), destinationURI.getPath());
+                } else {
+                    // NOTE(review): no transfer happens for any other state, yet the task is still
+                    // reported as completed below; confirm that this is intended.
+                    log.warn("No data was staged, unexpected process state: " + processState);
+                }
+                status.setReason("Successfully staged data");
+            } catch (WorkerException e) {
+                failTask(taskContext, status, "Scp attempt failed", e);
+            } catch (TException e) {
+                failTask(taskContext, status, "Invalid task invocation", e);
+            } catch (URISyntaxException e) {
+                failTask(taskContext, status, "source or destination is not a valid URI", e);
+            }
+        }
+        return status;
+    }
+
+    /**
+     * Re-runs the task if it was created or still executing; otherwise returns its existing status.
+     *
+     * @param taskContext context of the task being recovered
+     * @return the result of {@link #execute(TaskContext)} or the task's current status
+     */
+    @Override
+    public TaskStatus recover(TaskContext taskContext) {
+        TaskState state = taskContext.getTaskStatus().getState();
+        if (state == TaskState.EXECUTING || state == TaskState.CREATED) {
+            return execute(taskContext);
+        } else {
+            // files already transferred or failed
+            return taskContext.getTaskStatus();
+        }
+    }
+
+    /** Identifies this task as a data staging task. */
+    @Override
+    public TaskTypes getType() {
+        return TaskTypes.DATA_STAGING;
+    }
+
+    /**
+     * Logs the failure, marks the status as {@code FAILED} with the given reason and records a
+     * single error model (actual message from the exception, user-friendly message from
+     * {@code msg}) on the task model.
+     */
+    private static void failTask(TaskContext taskContext, TaskStatus status, String msg, Exception e) {
+        log.error(msg, e);
+        status.setState(TaskState.FAILED);
+        status.setReason(msg);
+        ErrorModel errorModel = new ErrorModel();
+        errorModel.setActualErrorMessage(e.getMessage());
+        errorModel.setUserFriendlyMessage(msg);
+        taskContext.getTaskModel().setTaskErrors(Arrays.asList(errorModel));
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/ea865f23/modules/worker/task-datastaging/src/main/java/org/apache/airavata/worker/task/datastaging/handler/SCPDataStageTask.java
----------------------------------------------------------------------
diff --git a/modules/worker/task-datastaging/src/main/java/org/apache/airavata/worker/task/datastaging/handler/SCPDataStageTask.java b/modules/worker/task-datastaging/src/main/java/org/apache/airavata/worker/task/datastaging/handler/SCPDataStageTask.java
new file mode 100644
index 0000000..bd13b1e
--- /dev/null
+++ b/modules/worker/task-datastaging/src/main/java/org/apache/airavata/worker/task/datastaging/handler/SCPDataStageTask.java
@@ -0,0 +1,292 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.airavata.worker.task.datastaging.handler;
+
+import com.jcraft.jsch.JSchException;
+import com.jcraft.jsch.Session;
+import org.apache.airavata.common.exception.AiravataException;
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.credential.store.store.CredentialStoreException;
+import org.apache.airavata.model.appcatalog.storageresource.StorageResourceDescription;
+import org.apache.airavata.model.application.io.InputDataObjectType;
+import org.apache.airavata.model.application.io.OutputDataObjectType;
+import org.apache.airavata.model.commons.ErrorModel;
+import org.apache.airavata.model.status.ProcessState;
+import org.apache.airavata.model.status.TaskState;
+import org.apache.airavata.model.status.TaskStatus;
+import org.apache.airavata.model.task.DataStagingTaskModel;
+import org.apache.airavata.model.task.TaskTypes;
+import org.apache.airavata.worker.core.authentication.AuthenticationInfo;
+import org.apache.airavata.worker.core.cluster.CommandInfo;
+import org.apache.airavata.worker.core.cluster.RawCommandInfo;
+import org.apache.airavata.worker.core.cluster.RemoteCluster;
+import org.apache.airavata.worker.core.cluster.ServerInfo;
+import org.apache.airavata.worker.core.context.ProcessContext;
+import org.apache.airavata.worker.core.context.TaskContext;
+import org.apache.airavata.worker.core.exceptions.WorkerException;
+import org.apache.airavata.worker.core.task.Task;
+import org.apache.airavata.worker.core.task.TaskException;
+import org.apache.airavata.worker.core.utils.WorkerFactory;
+import org.apache.airavata.worker.task.datastaging.utils.DataStagingFactory;
+import org.apache.airavata.worker.task.datastaging.utils.DataStagingUtils;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Arrays;
+import java.util.Map;
+
+/**
+ * This task is used for both input file staging and output file staging; if you change any part of the logic
+ * in this class, please make sure the change works for both the input and the output case.
+ */
+public class SCPDataStageTask implements Task {
+    private static final Logger log = LoggerFactory.getLogger(SCPDataStageTask.class);
+    private static final int DEFAULT_SSH_PORT = 22;
+    private String hostName;
+    private String inputPath;
+
+    @Override
+    public void init(Map<String, String> propertyMap) throws TaskException {
+        // nothing to initialise: propertyMap is not read by this task
+    }
+
+    /**
+     * Stages one input or output file, chosen by the parent process state; failures are reported via the status.
+     */
+    @Override
+    public TaskStatus execute(TaskContext taskContext) {
+        TaskStatus status = new TaskStatus(TaskState.EXECUTING);
+        AuthenticationInfo authenticationInfo = null;
+        DataStagingTaskModel subTaskModel = null;
+
+        ProcessContext processContext = taskContext.getParentProcessContext();
+        ProcessState processState = processContext.getProcessState();
+        try {
+            subTaskModel = ((DataStagingTaskModel) taskContext.getSubTaskModel());
+            if (processState == ProcessState.OUTPUT_DATA_STAGING) {
+                OutputDataObjectType processOutput = taskContext.getProcessOutput();
+                if (processOutput != null && processOutput.getValue() == null) {
+                    log.error("expId: {}, processId:{}, taskId: {}:- Couldn't stage file {} , file name shouldn't be null",
+                            taskContext.getExperimentId(), taskContext.getProcessId(), taskContext.getTaskId(),
+                            processOutput.getName());
+                    status = new TaskStatus(TaskState.FAILED);
+                    if (processOutput.isIsRequired()) {
+                        status.setReason("File name is null, but this output's isRequired bit is set");
+                    } else {
+                        status.setReason("File name is null");
+                    }
+                    return status;
+                }
+            } else if (processState == ProcessState.INPUT_DATA_STAGING) {
+                InputDataObjectType processInput = taskContext.getProcessInput();
+                if (processInput != null && processInput.getValue() == null) {
+                    log.error("expId: {}, processId:{}, taskId: {}:- Couldn't stage file {} , file name shouldn't be null",
+                            taskContext.getExperimentId(), taskContext.getProcessId(), taskContext.getTaskId(),
+                            processInput.getName());
+                    status = new TaskStatus(TaskState.FAILED);
+                    if (processInput.isIsRequired()) {
+                        status.setReason("File name is null, but this input's isRequired bit is set");
+                    } else {
+                        status.setReason("File name is null");
+                    }
+                    return status;
+                }
+            } else {
+                status.setState(TaskState.FAILED);
+                status.setReason("Invalid task invocation, Support " + ProcessState.INPUT_DATA_STAGING.name() + " and " +
+                        "" + ProcessState.OUTPUT_DATA_STAGING.name() + " process phases. found " + processState.name());
+                return status;
+            }
+
+            StorageResourceDescription storageResource = processContext.getStorageResource();
+
+            if (storageResource != null) {
+                hostName = storageResource.getHostName();
+            } else {
+                throw new WorkerException("Storage Resource is null");
+            }
+            inputPath = processContext.getStorageFileSystemRootLocation();
+            inputPath = (inputPath.endsWith(File.separator) ? inputPath : inputPath + File.separator);
+
+            // URI paths are always '/'-separated, independent of the local File.separator
+            URI sourceURI = new URI(subTaskModel.getSource());
+            String fileName = sourceURI.getPath().substring(sourceURI.getPath().lastIndexOf('/') + 1,
+                    sourceURI.getPath().length());
+            URI destinationURI = null;
+            if (subTaskModel.getDestination().startsWith("dummy")) {
+                destinationURI = DataStagingUtils.getDestinationURI(taskContext, hostName, inputPath, fileName);
+                subTaskModel.setDestination(destinationURI.toString());
+            } else {
+                destinationURI = new URI(subTaskModel.getDestination());
+            }
+            // use rsync instead of scp if source and destination host and user name is same (null host/user never match)
+            if (sourceURI.getHost() != null && sourceURI.getHost().equalsIgnoreCase(destinationURI.getHost())
+                    && sourceURI.getUserInfo() != null && sourceURI.getUserInfo().equalsIgnoreCase(destinationURI.getUserInfo())) {
+                localDataCopy(taskContext, sourceURI, destinationURI);
+                status.setState(TaskState.COMPLETED);
+                status.setReason("Locally copied file using 'rsync' command ");
+                return status;
+            }
+
+            authenticationInfo = DataStagingFactory.getComputerResourceSSHKeyAuthentication(processContext);
+            status = new TaskStatus(TaskState.COMPLETED);
+
+            ServerInfo serverInfo = processContext.getComputeResourceServerInfo();
+            Session sshSession = WorkerFactory.getSSHSession(authenticationInfo, serverInfo);
+            if (processState == ProcessState.INPUT_DATA_STAGING) {
+                inputDataStaging(taskContext, sshSession, sourceURI, destinationURI);
+                status.setReason("Successfully staged input data");
+            } else if (processState == ProcessState.OUTPUT_DATA_STAGING) {
+                makeDir(taskContext, destinationURI);
+                // TODO - save updated subtask model with new destination
+                outputDataStaging(taskContext, sshSession, sourceURI, destinationURI);
+                status.setReason("Successfully staged output data");
+            }
+        } catch (TException e) {
+            String msg = "Couldn't create subTask model thrift model";
+            log.error(msg, e);
+            status.setState(TaskState.FAILED);
+            status.setReason(msg);
+            ErrorModel errorModel = new ErrorModel();
+            errorModel.setActualErrorMessage(e.getMessage());
+            errorModel.setUserFriendlyMessage(msg);
+            taskContext.getTaskModel().setTaskErrors(Arrays.asList(errorModel));
+        } catch (ApplicationSettingsException | FileNotFoundException e) {
+            String msg = "Failed while reading credentials";
+            log.error(msg, e);
+            status.setState(TaskState.FAILED);
+            status.setReason(msg);
+            ErrorModel errorModel = new ErrorModel();
+            errorModel.setActualErrorMessage(e.getMessage());
+            errorModel.setUserFriendlyMessage(msg);
+            taskContext.getTaskModel().setTaskErrors(Arrays.asList(errorModel));
+        } catch (URISyntaxException e) {
+            String msg = "Source or destination uri is not correct source : " + subTaskModel.getSource() + ", " +
+                    "destination : " + subTaskModel.getDestination();
+            log.error(msg, e);
+            status.setState(TaskState.FAILED);
+            status.setReason(msg);
+            ErrorModel errorModel = new ErrorModel();
+            errorModel.setActualErrorMessage(e.getMessage());
+            errorModel.setUserFriendlyMessage(msg);
+            taskContext.getTaskModel().setTaskErrors(Arrays.asList(errorModel));
+        } catch (CredentialStoreException e) {
+            String msg = "Storage authentication issue, could be invalid credential token";
+            log.error(msg, e);
+            status.setState(TaskState.FAILED);
+            status.setReason(msg);
+            ErrorModel errorModel = new ErrorModel();
+            errorModel.setActualErrorMessage(e.getMessage());
+            errorModel.setUserFriendlyMessage(msg);
+            taskContext.getTaskModel().setTaskErrors(Arrays.asList(errorModel));
+        } catch (AiravataException e) {
+            String msg = "Error while creating ssh session with client";
+            log.error(msg, e);
+            status.setState(TaskState.FAILED);
+            status.setReason(msg);
+            ErrorModel errorModel = new ErrorModel();
+            errorModel.setActualErrorMessage(e.getMessage());
+            errorModel.setUserFriendlyMessage(msg);
+            taskContext.getTaskModel().setTaskErrors(Arrays.asList(errorModel));
+        } catch (JSchException | IOException e) {
+            String msg = "Failed to do scp with client";
+            log.error(msg, e);
+            status.setState(TaskState.FAILED);
+            status.setReason(msg);
+            ErrorModel errorModel = new ErrorModel();
+            errorModel.setActualErrorMessage(e.getMessage());
+            errorModel.setUserFriendlyMessage(msg);
+            taskContext.getTaskModel().setTaskErrors(Arrays.asList(errorModel));
+        } catch (WorkerException e) {
+            String msg = "Data staging failed";
+            log.error(msg, e);
+            status.setState(TaskState.FAILED);
+            status.setReason(msg);
+            ErrorModel errorModel = new ErrorModel();
+            errorModel.setActualErrorMessage(e.getMessage());
+            errorModel.setUserFriendlyMessage(msg);
+            taskContext.getTaskModel().setTaskErrors(Arrays.asList(errorModel));
+        }
+        return status;
+    }
+
+    /** Copies the source path to the destination path with {@code rsync -cr} on the data movement cluster. */
+    private void localDataCopy(TaskContext taskContext, URI sourceURI, URI destinationURI) throws WorkerException {
+        StringBuilder sb = new StringBuilder("rsync -cr ");
+        sb.append(sourceURI.getPath()).append(" ").append(destinationURI.getPath());
+        CommandInfo commandInfo = new RawCommandInfo(sb.toString());
+        taskContext.getParentProcessContext().getDataMovementRemoteCluster().execute(commandInfo);
+    }
+
+    private void inputDataStaging(TaskContext taskContext, Session sshSession, URI sourceURI, URI
+            destinationURI) throws WorkerException, IOException, JSchException {
+        /**
+         * scp third party file transfer 'to' compute resource.
+         */
+        taskContext.getParentProcessContext().getDataMovementRemoteCluster().scpThirdParty(sourceURI.getPath(),
+                destinationURI.getPath(), sshSession, RemoteCluster.DIRECTION.FROM, false);
+    }
+
+    private void outputDataStaging(TaskContext taskContext, Session sshSession, URI sourceURI, URI destinationURI)
+            throws AiravataException, IOException, JSchException, WorkerException {
+        /**
+         * scp third party file transfer 'from' compute resource.
+         */
+        taskContext.getParentProcessContext().getDataMovementRemoteCluster().scpThirdParty(sourceURI.getPath(),
+                destinationURI.getPath(), sshSession, RemoteCluster.DIRECTION.TO, true);
+        // update output locations
+        DataStagingUtils.saveExperimentOutput(taskContext.getParentProcessContext(), taskContext.getProcessOutput().getName(), destinationURI.toString());
+        DataStagingUtils.saveProcessOutput(taskContext.getParentProcessContext(), taskContext.getProcessOutput().getName(), destinationURI.toString());
+    }
+
+    /** Creates the parent directory of the URI's path on the data movement cluster; no-op when the path has none. */
+    private void makeDir(TaskContext taskContext, URI pathURI) throws WorkerException {
+        int endIndex = pathURI.getPath().lastIndexOf('/');
+        if (endIndex < 1) {
+            return;
+        }
+        String targetPath = pathURI.getPath().substring(0, endIndex);
+        taskContext.getParentProcessContext().getDataMovementRemoteCluster().makeDirectory(targetPath);
+    }
+
+    @Override
+    public TaskStatus recover(TaskContext taskContext) {
+        TaskState state = taskContext.getTaskStatus().getState();
+        if (state == TaskState.EXECUTING || state == TaskState.CREATED) {
+            return execute(taskContext);
+        } else {
+            // files already transferred or failed
+            return taskContext.getTaskStatus();
+        }
+    }
+
+    @Override
+    public TaskTypes getType() {
+        return TaskTypes.DATA_STAGING;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/ea865f23/modules/worker/task-datastaging/src/main/java/org/apache/airavata/worker/task/datastaging/utils/DataStagingFactory.java
----------------------------------------------------------------------
diff --git a/modules/worker/task-datastaging/src/main/java/org/apache/airavata/worker/task/datastaging/utils/DataStagingFactory.java b/modules/worker/task-datastaging/src/main/java/org/apache/airavata/worker/task/datastaging/utils/DataStagingFactory.java
new file mode 100644
index 0000000..51996b8
--- /dev/null
+++ b/modules/worker/task-datastaging/src/main/java/org/apache/airavata/worker/task/datastaging/utils/DataStagingFactory.java
@@ -0,0 +1,30 @@
+package org.apache.airavata.worker.task.datastaging.utils;
+
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.credential.store.store.CredentialStoreException;
+import org.apache.airavata.worker.core.authentication.SSHKeyAuthentication;
+import org.apache.airavata.worker.core.context.ProcessContext;
+import org.apache.airavata.worker.core.exceptions.WorkerException;
+import org.apache.airavata.worker.core.utils.WorkerFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Factory helpers for the data staging tasks. Created by Ajinkya on 4/13/17.
+ */
+public class DataStagingFactory {
+
+    private static final Logger log = LoggerFactory.getLogger(DataStagingFactory.class);
+    /** Builds SSH key authentication from the process's gateway id, compute-resource login user and credential token. */
+    public static SSHKeyAuthentication getComputerResourceSSHKeyAuthentication(ProcessContext pc)
+            throws WorkerException, CredentialStoreException {
+        try {
+            return WorkerFactory.getSshKeyAuthentication(pc.getGatewayId(),
+                    pc.getComputeResourceLoginUserName(),
+                    pc.getComputeResourceCredentialToken());
+        } catch (ApplicationSettingsException | IllegalAccessException | InstantiationException e) {
+            throw new WorkerException("Couldn't build ssh authentication object", e);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/ea865f23/modules/worker/task-datastaging/src/main/java/org/apache/airavata/worker/task/datastaging/utils/DataStagingUtils.java
----------------------------------------------------------------------
diff --git a/modules/worker/task-datastaging/src/main/java/org/apache/airavata/worker/task/datastaging/utils/DataStagingUtils.java b/modules/worker/task-datastaging/src/main/java/org/apache/airavata/worker/task/datastaging/utils/DataStagingUtils.java
new file mode 100644
index 0000000..d3d7522
--- /dev/null
+++ b/modules/worker/task-datastaging/src/main/java/org/apache/airavata/worker/task/datastaging/utils/DataStagingUtils.java
@@ -0,0 +1,107 @@
+package org.apache.airavata.worker.task.datastaging.utils;
+
+import org.apache.airavata.model.application.io.OutputDataObjectType;
+import org.apache.airavata.model.data.replica.*;
+import org.apache.airavata.model.experiment.ExperimentModel;
+import org.apache.airavata.model.process.ProcessModel;
+import org.apache.airavata.registry.core.experiment.catalog.impl.RegistryFactory;
+import org.apache.airavata.registry.cpi.ExperimentCatalog;
+import org.apache.airavata.registry.cpi.ExperimentCatalogModelType;
+import org.apache.airavata.registry.cpi.RegistryException;
+import org.apache.airavata.registry.cpi.ReplicaCatalog;
+import org.apache.airavata.worker.core.context.ProcessContext;
+import org.apache.airavata.worker.core.context.TaskContext;
+import org.apache.airavata.worker.core.exceptions.WorkerException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.List;
+
+/**
+ * Static helpers for data staging tasks (destination URIs, output bookkeeping). Created by Ajinkya on 4/13/17.
+ */
+public class DataStagingUtils {
+
+    private static final Logger log = LoggerFactory.getLogger(DataStagingUtils.class);
+    /** Builds a "file" URI on hostName (port 22) under the experiment data dir, or under inputPath/processId if unset. */
+    public static URI getDestinationURI(TaskContext taskContext, String hostName, String inputPath, String fileName) throws URISyntaxException {
+        String experimentDataDir = taskContext.getParentProcessContext().getProcessModel().getExperimentDataDir();
+        String filePath;
+        if(experimentDataDir != null && !experimentDataDir.isEmpty()) {
+            if(!experimentDataDir.endsWith(File.separator)){
+                experimentDataDir += File.separator;
+            }
+            if (experimentDataDir.startsWith(File.separator)) {
+                filePath = experimentDataDir + fileName;
+            } else {
+                filePath = inputPath + experimentDataDir + fileName;
+            }
+        } else {
+            filePath = inputPath + taskContext.getParentProcessContext().getProcessId() + File.separator + fileName;
+        }
+        //FIXME
+        return new URI("file", taskContext.getParentProcessContext().getStorageResourceLoginUserName(), hostName, 22, filePath, null, null);
+    }
+    /** Registers outputVal as a data product and stores the product URI on the experiment output named outputName. */
+    public static void saveExperimentOutput(ProcessContext processContext, String outputName, String outputVal) throws WorkerException {
+        try {
+            ExperimentCatalog experimentCatalog = processContext.getExperimentCatalog();
+            String experimentId = processContext.getExperimentId();
+            ExperimentModel experiment = (ExperimentModel)experimentCatalog.get(ExperimentCatalogModelType.EXPERIMENT, experimentId);
+            List<OutputDataObjectType> experimentOutputs = experiment.getExperimentOutputs();
+            if (experimentOutputs != null && !experimentOutputs.isEmpty()){
+                for (OutputDataObjectType expOutput : experimentOutputs){
+                    if (expOutput.getName().equals(outputName)){
+                        DataProductModel dataProductModel = new DataProductModel();
+                        dataProductModel.setGatewayId(processContext.getGatewayId());
+                        dataProductModel.setOwnerName(processContext.getProcessModel().getUserName());
+                        dataProductModel.setProductName(outputName);
+                        dataProductModel.setDataProductType(DataProductType.FILE);
+                        // describe the replica of the product held at outputVal on the storage resource
+                        DataReplicaLocationModel replicaLocationModel = new DataReplicaLocationModel();
+                        replicaLocationModel.setStorageResourceId(processContext.getStorageResource().getStorageResourceId());
+                        replicaLocationModel.setReplicaName(outputName + " gateway data store copy");
+                        replicaLocationModel.setReplicaLocationCategory(ReplicaLocationCategory.GATEWAY_DATA_STORE);
+                        replicaLocationModel.setReplicaPersistentType(ReplicaPersistentType.TRANSIENT);
+                        replicaLocationModel.setFilePath(outputVal);
+                        dataProductModel.addToReplicaLocations(replicaLocationModel);
+                        // register the product and point the experiment output at the returned product URI
+                        ReplicaCatalog replicaCatalog = RegistryFactory.getReplicaCatalog();
+                        String productUri = replicaCatalog.registerDataProduct(dataProductModel);
+                        expOutput.setValue(productUri);
+                    }
+                }
+            }
+            experimentCatalog.update(ExperimentCatalogModelType.EXPERIMENT, experiment, experimentId);
+        } catch (RegistryException e) {
+            String msg = "expId: " + processContext.getExperimentId() + " processId: " + processContext.getProcessId()
+                    + " : - Error while updating experiment outputs";
+            throw new WorkerException(msg, e);
+        }
+    }
+    /** Sets outputVal on the process output named outputName and persists the updated process model. */
+    public static void saveProcessOutput(ProcessContext processContext, String outputName, String outputVal) throws WorkerException {
+        try {
+            ExperimentCatalog experimentCatalog = processContext.getExperimentCatalog();
+            String processId = processContext.getProcessId();
+            List<OutputDataObjectType>  processOutputs = (List<OutputDataObjectType> )experimentCatalog.get(ExperimentCatalogModelType.PROCESS_OUTPUT, processId);
+            if (processOutputs != null && !processOutputs.isEmpty()){
+                for (OutputDataObjectType processOutput : processOutputs){
+                    if (processOutput.getName().equals(outputName)){
+                        processOutput.setValue(outputVal);
+                    }
+                }
+            }
+            ProcessModel processModel = processContext.getProcessModel();
+            processModel.setProcessOutputs(processOutputs);
+            experimentCatalog.update(ExperimentCatalogModelType.PROCESS, processModel, processId);
+        } catch (RegistryException e) {
+            String msg = "expId: " + processContext.getExperimentId() + " processId: " + processContext.getProcessId()
+                    + " : - Error while updating process outputs";
+            throw new WorkerException(msg, e);
+        }
+    }
+}


[5/6] airavata git commit: Adding environment setup task implementation

Posted by ad...@apache.org.
Adding environment setup task implementation


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/fbc0351e
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/fbc0351e
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/fbc0351e

Branch: refs/heads/feature-workload-mgmt
Commit: fbc0351eb06cc369692cb45fcd90e0227ff851e4
Parents: a20405b
Author: Ajinkya Dhamnaskar <ad...@apache.org>
Authored: Thu Apr 13 14:37:44 2017 -0400
Committer: Ajinkya Dhamnaskar <ad...@apache.org>
Committed: Thu Apr 13 14:37:44 2017 -0400

----------------------------------------------------------------------
 .../envsetup/handler/EnvironmentSetupTask.java  | 75 ++++++++++++++++++++
 1 file changed, 75 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/fbc0351e/modules/worker/task-envsetup/src/main/java/org/apache/airavata/worker/task/envsetup/handler/EnvironmentSetupTask.java
----------------------------------------------------------------------
diff --git a/modules/worker/task-envsetup/src/main/java/org/apache/airavata/worker/task/envsetup/handler/EnvironmentSetupTask.java b/modules/worker/task-envsetup/src/main/java/org/apache/airavata/worker/task/envsetup/handler/EnvironmentSetupTask.java
new file mode 100644
index 0000000..390b5e2
--- /dev/null
+++ b/modules/worker/task-envsetup/src/main/java/org/apache/airavata/worker/task/envsetup/handler/EnvironmentSetupTask.java
@@ -0,0 +1,75 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.airavata.worker.task.envsetup.handler;
+
+import org.apache.airavata.model.commons.ErrorModel;
+import org.apache.airavata.model.status.TaskState;
+import org.apache.airavata.model.status.TaskStatus;
+import org.apache.airavata.model.task.TaskTypes;
+import org.apache.airavata.worker.core.cluster.RemoteCluster;
+import org.apache.airavata.worker.core.context.TaskContext;
+import org.apache.airavata.worker.core.exceptions.WorkerException;
+import org.apache.airavata.worker.core.task.Task;
+import org.apache.airavata.worker.core.task.TaskException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.Map;
+
+public class EnvironmentSetupTask implements Task {
+
+	private static final Logger log = LoggerFactory.getLogger(EnvironmentSetupTask.class);
+	@Override
+	public void init(Map<String, String> propertyMap) throws TaskException {
+
+	}
+
+	@Override
+	public TaskStatus execute(TaskContext taskContext) {
+		TaskStatus status = new TaskStatus(TaskState.COMPLETED);
+		try {
+			RemoteCluster remoteCluster = taskContext.getParentProcessContext().getJobSubmissionRemoteCluster();
+			remoteCluster.makeDirectory(taskContext.getParentProcessContext().getWorkingDir());
+			status.setReason("Successfully created environment");
+		} catch (WorkerException e) {
+			String msg = "Error while environment setup";
+			log.error(msg, e);
+			status.setState(TaskState.FAILED);
+			status.setReason(msg);
+			ErrorModel errorModel = new ErrorModel();
+			errorModel.setActualErrorMessage(e.getMessage());
+			errorModel.setUserFriendlyMessage(msg);
+			taskContext.getTaskModel().setTaskErrors(Arrays.asList(errorModel));
+		}
+		return status;
+	}
+
+	@Override
+	public TaskStatus recover(TaskContext taskContext) {
+		return execute(taskContext);
+	}
+
+	@Override
+	public TaskTypes getType() {
+		return TaskTypes.ENV_SETUP;
+	}
+}


[2/6] airavata git commit: Adding common utils methods to worker core

Posted by ad...@apache.org.
Adding common utils methods to worker core


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/e1ff08b6
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/e1ff08b6
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/e1ff08b6

Branch: refs/heads/feature-workload-mgmt
Commit: e1ff08b6787a964e6c92002e562e555755367b82
Parents: 4248419
Author: Ajinkya Dhamnaskar <ad...@apache.org>
Authored: Thu Apr 13 14:35:28 2017 -0400
Committer: Ajinkya Dhamnaskar <ad...@apache.org>
Committed: Thu Apr 13 14:35:28 2017 -0400

----------------------------------------------------------------------
 .../airavata/worker/core/utils/WorkerUtils.java | 21 ++++++++++++++++++++
 1 file changed, 21 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/e1ff08b6/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/utils/WorkerUtils.java
----------------------------------------------------------------------
diff --git a/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/utils/WorkerUtils.java b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/utils/WorkerUtils.java
index ef11c20..a53d736 100644
--- a/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/utils/WorkerUtils.java
+++ b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/utils/WorkerUtils.java
@@ -1,6 +1,11 @@
 package org.apache.airavata.worker.core.utils;
 
+import org.apache.airavata.common.exception.ApplicationSettingsException;
 import org.apache.airavata.common.utils.AiravataUtils;
+import org.apache.airavata.common.utils.DBUtil;
+import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.airavata.credential.store.store.CredentialReader;
+import org.apache.airavata.credential.store.store.impl.CredentialReaderImpl;
 import org.apache.airavata.messaging.core.MessageContext;
 import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManagerType;
 import org.apache.airavata.model.commons.ErrorModel;
@@ -211,4 +216,20 @@ public class WorkerUtils {
                 return null;
         }
     }
+
+    /** Builds a reader over the credential store DB from ServerSettings; returns null if the driver class is not found. */
+    public static CredentialReader getCredentialReader()
+            throws ApplicationSettingsException, IllegalAccessException, InstantiationException {
+        try {
+            String jdbcUrl = ServerSettings.getCredentialStoreDBURL();
+            String jdbcUsr = ServerSettings.getCredentialStoreDBUser();
+            String jdbcPass = ServerSettings.getCredentialStoreDBPassword();
+            String driver = ServerSettings.getCredentialStoreDBDriver();
+            return new CredentialReaderImpl(new DBUtil(jdbcUrl, jdbcUsr, jdbcPass, driver));
+        } catch (ClassNotFoundException e) {
+            // pass the exception itself so the log keeps the stack trace, not just the message
+            logger.error("Not able to find driver: " + e.getLocalizedMessage(), e);
+            return null;
+        }
+    }
 }


[4/6] airavata git commit: Updating worker core pom

Posted by ad...@apache.org.
Updating worker core pom


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/a20405be
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/a20405be
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/a20405be

Branch: refs/heads/feature-workload-mgmt
Commit: a20405be05985dd6bdbe58d47d6b4564ae73893d
Parents: 99fb72d
Author: Ajinkya Dhamnaskar <ad...@apache.org>
Authored: Thu Apr 13 14:36:45 2017 -0400
Committer: Ajinkya Dhamnaskar <ad...@apache.org>
Committed: Thu Apr 13 14:36:45 2017 -0400

----------------------------------------------------------------------
 modules/worker/worker-core/pom.xml | 5 +++++
 1 file changed, 5 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/a20405be/modules/worker/worker-core/pom.xml
----------------------------------------------------------------------
diff --git a/modules/worker/worker-core/pom.xml b/modules/worker/worker-core/pom.xml
index 9ac27d4..8f00821 100644
--- a/modules/worker/worker-core/pom.xml
+++ b/modules/worker/worker-core/pom.xml
@@ -33,6 +33,11 @@
             <version>${project.version}</version>
         </dependency>
         <dependency>
+            <groupId>org.apache.airavata</groupId>
+            <artifactId>airavata-credential-store</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
             <groupId>com.jcraft</groupId>
             <artifactId>jsch</artifactId>
             <version>0.1.53</version>