You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by go...@apache.org on 2017/06/28 19:39:57 UTC
airavata-sandbox git commit: Add SSH Job Workflow
Repository: airavata-sandbox
Updated Branches:
refs/heads/master 8b284d8f8 -> 2e56d5ce3
Add SSH Job Workflow
Project: http://git-wip-us.apache.org/repos/asf/airavata-sandbox/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata-sandbox/commit/2e56d5ce
Tree: http://git-wip-us.apache.org/repos/asf/airavata-sandbox/tree/2e56d5ce
Diff: http://git-wip-us.apache.org/repos/asf/airavata-sandbox/diff/2e56d5ce
Branch: refs/heads/master
Commit: 2e56d5ce3a30c8f546dc70d80879a86680e0c196
Parents: 8b284d8
Author: Gourav Shenoy <sh...@gmail.com>
Authored: Wed Jun 28 15:39:52 2017 -0400
Committer: Gourav Shenoy <sh...@gmail.com>
Committed: Wed Jun 28 15:39:52 2017 -0400
----------------------------------------------------------------------
.gitignore | 1 +
helix-playground/pom.xml | 9 +-
.../iu/helix/airavata/HelixClusterManager.java | 18 +-
.../java/edu/iu/helix/airavata/HelixUtil.java | 222 +++++++++--
.../edu/iu/helix/airavata/ParticipantNode.java | 7 +
.../java/edu/iu/helix/airavata/ZkUtils.java | 72 ++++
.../airavata/tasks/ssh/Authentication.java | 4 +-
.../airavata/tasks/ssh/SSHApiException.java | 35 ++
.../tasks/ssh/SSHCommandOutputReader.java | 90 +++++
.../tasks/ssh/SSHKeyAuthentication.java | 2 +-
.../iu/helix/airavata/tasks/ssh/SSHRunner.java | 394 +++++++++++++++++++
.../iu/helix/airavata/tasks/ssh/SSHTask.java | 38 +-
.../airavata/tasks/ssh/SSHTaskContext.java | 17 +-
.../helix/airavata/tasks/ssh/SSHUserInfo.java | 4 +-
.../iu/helix/airavata/tasks/ssh/ServerInfo.java | 4 +-
.../src/main/resources/ssh/id_rsa.pub | 1 -
16 files changed, 865 insertions(+), 53 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata-sandbox/blob/2e56d5ce/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index 4a9b5e3..d6e588b 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,4 +1,5 @@
*.class
+*id_rsa*
# Mobile Tools for Java (J2ME)
.mtj.tmp/
http://git-wip-us.apache.org/repos/asf/airavata-sandbox/blob/2e56d5ce/helix-playground/pom.xml
----------------------------------------------------------------------
diff --git a/helix-playground/pom.xml b/helix-playground/pom.xml
index 0b624c6..809c712 100644
--- a/helix-playground/pom.xml
+++ b/helix-playground/pom.xml
@@ -19,8 +19,8 @@
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
- <source>1.6</source>
- <target>1.6</target>
+ <source>1.8</source>
+ <target>1.8</target>
</configuration>
</plugin>
</plugins>
@@ -54,6 +54,11 @@
<artifactId>jsch</artifactId>
<version>0.1.53</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.curator</groupId>
+ <artifactId>curator-framework</artifactId>
+ <version>2.8.0</version>
+ </dependency>
</dependencies>
</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/airavata-sandbox/blob/2e56d5ce/helix-playground/src/main/java/edu/iu/helix/airavata/HelixClusterManager.java
----------------------------------------------------------------------
diff --git a/helix-playground/src/main/java/edu/iu/helix/airavata/HelixClusterManager.java b/helix-playground/src/main/java/edu/iu/helix/airavata/HelixClusterManager.java
index 313a674..927b377 100644
--- a/helix-playground/src/main/java/edu/iu/helix/airavata/HelixClusterManager.java
+++ b/helix-playground/src/main/java/edu/iu/helix/airavata/HelixClusterManager.java
@@ -102,10 +102,10 @@ public class HelixClusterManager {
}
public boolean submitDag(HelixUtil.DAGType dagType) {
- Workflow workflow = HelixUtil.getWorkflow(dagType);
- taskDriver.start(workflow);
- System.out.println("Started workflow for DagType: " + dagType + ", in cluster: " + clusterName);
try {
+ Workflow workflow = HelixUtil.getWorkflow(dagType);
+ taskDriver.start(workflow);
+ System.out.println("Started workflow for DagType: " + dagType + ", in cluster: " + clusterName);
taskDriver.pollForWorkflowState(workflow.getName(), TaskState.COMPLETED, TaskState.FAILED);
// while (true) {
// Thread.sleep(100);
@@ -129,8 +129,8 @@ public class HelixClusterManager {
}
public static void main(String[] args) {
- String clusterName = "HelixDemoCluster";
- String zkAddress = "localhost:2199";
+ String clusterName = HelixUtil.CLUSTER_NAME;
+ String zkAddress = HelixUtil.ZK_ADDRESS;
int numWorkers = 3;
int numPartitions = 1;
@@ -142,11 +142,11 @@ public class HelixClusterManager {
HelixClusterManager manager = new HelixClusterManager(clusterName, zkAddress, numWorkers, numPartitions);
manager.startHelixCluster();
- System.out.println("Submitting Workflow for DagType: " + HelixUtil.DAGType.TYPE_A);
- if (manager.submitDag(HelixUtil.DAGType.TYPE_A)) {
- System.out.println("Successfully completed workflow for Dag: " + HelixUtil.DAGType.TYPE_A);
+ System.out.println("Submitting Workflow for DagType: " + HelixUtil.DAGType.SSH);
+ if (manager.submitDag(HelixUtil.DAGType.SSH)) {
+ System.out.println("Successfully completed workflow for Dag: " + HelixUtil.DAGType.SSH);
} else {
- throw new Exception("Failed to run workflow for Dag: " + HelixUtil.DAGType.TYPE_A);
+ throw new Exception("Failed to run workflow for Dag: " + HelixUtil.DAGType.SSH);
}
} catch (Exception ex) {
logger.error("Something went wrong while running helix cluster manager. Reason: " + ex, ex);
http://git-wip-us.apache.org/repos/asf/airavata-sandbox/blob/2e56d5ce/helix-playground/src/main/java/edu/iu/helix/airavata/HelixUtil.java
----------------------------------------------------------------------
diff --git a/helix-playground/src/main/java/edu/iu/helix/airavata/HelixUtil.java b/helix-playground/src/main/java/edu/iu/helix/airavata/HelixUtil.java
index 2cea68f..b2b634d 100644
--- a/helix-playground/src/main/java/edu/iu/helix/airavata/HelixUtil.java
+++ b/helix-playground/src/main/java/edu/iu/helix/airavata/HelixUtil.java
@@ -4,11 +4,19 @@ import edu.iu.helix.airavata.tasks.HelixTaskA;
import edu.iu.helix.airavata.tasks.HelixTaskB;
import edu.iu.helix.airavata.tasks.HelixTaskC;
import edu.iu.helix.airavata.tasks.HelixTaskD;
+import edu.iu.helix.airavata.tasks.ssh.SSHKeyAuthentication;
+import edu.iu.helix.airavata.tasks.ssh.SSHServerInfo;
+import edu.iu.helix.airavata.tasks.ssh.SSHTask;
+import edu.iu.helix.airavata.tasks.ssh.SSHTaskContext;
+import org.apache.commons.io.IOUtils;
+import org.apache.curator.framework.CuratorFramework;
import org.apache.helix.task.*;
import org.jboss.netty.util.internal.ThreadLocalRandom;
+import java.io.*;
import java.util.ArrayList;
import java.util.List;
+import java.util.UUID;
/**
* Created by goshenoy on 6/21/17.
@@ -16,62 +24,121 @@ import java.util.List;
public class HelixUtil {
public static final String TASK_STATE_DEF = "Task";
+ public static final String ZK_ADDRESS = "localhost:2199";
+ public static final String CLUSTER_NAME = "HelixDemoCluster";
+
+ public static final String SSH_WORKFLOW = "SSH_Workflow";
+ public static final String CREATE_DIR_TASK = "Task_CreateDir";
+ public static final String COPY_PBS_TASK = "Task_CopyPBS";
+ public static final String COPY_PY_TASK = "Task_CopyPY";
+ public static final String QSUB_TASK = "Task_QSUB";
+
+
+ public static final String USERNAME = "username";
+ public static final String PRIVATE_KEY = "private_key";
+ public static final String PUBLIC_KEY = "public_key";
+ public static final String HOSTNAME = "hostname";
+ public static final String PORT = "port";
+ public static final String COMMAND = "command";
+ public static final String SRC_PATH = "src_path";
+ public static final String DEST_PATH = "dest_path";
+
public enum DAGType {
TYPE_A,
TYPE_B,
- TYPE_C
+ TYPE_C,
+ SSH
}
- private static Workflow.Builder getWorkflowBuilder(DAGType dagType) {
- // create task configs
- List<TaskConfig> taskConfig1 = new ArrayList<TaskConfig>();
- List<TaskConfig> taskConfig2 = new ArrayList<TaskConfig>();
- List<TaskConfig> taskConfig3 = new ArrayList<TaskConfig>();
- List<TaskConfig> taskConfig4 = new ArrayList<TaskConfig>();
- taskConfig1.add(new TaskConfig.Builder().setTaskId("helix_task_a").setCommand(HelixTaskA.TASK_COMMAND).build());
- taskConfig2.add(new TaskConfig.Builder().setTaskId("helix_task_b").setCommand(HelixTaskB.TASK_COMMAND).build());
- taskConfig3.add(new TaskConfig.Builder().setTaskId("helix_task_c").setCommand(HelixTaskC.TASK_COMMAND).build());
- taskConfig4.add(new TaskConfig.Builder().setTaskId("helix_task_d").setCommand(HelixTaskD.TASK_COMMAND).build());
-
- // create job configs
- JobConfig.Builder jobConfig1 = new JobConfig.Builder().addTaskConfigs(taskConfig1).setMaxAttemptsPerTask(3);
- JobConfig.Builder jobConfig2 = new JobConfig.Builder().addTaskConfigs(taskConfig2).setMaxAttemptsPerTask(3);
- JobConfig.Builder jobConfig3 = new JobConfig.Builder().addTaskConfigs(taskConfig3).setMaxAttemptsPerTask(3);
- JobConfig.Builder jobConfig4 = new JobConfig.Builder().addTaskConfigs(taskConfig4).setMaxAttemptsPerTask(3);
-
- // create workflow
- Workflow.Builder workflowBuilder = new Workflow.Builder("helix_workflow").setExpiry(0);
- workflowBuilder.addJob("helix_job_a", jobConfig1);
- workflowBuilder.addJob("helix_job_b", jobConfig2);
- workflowBuilder.addJob("helix_job_c", jobConfig3);
- workflowBuilder.addJob("helix_job_d", jobConfig4);
+ private static Workflow.Builder getWorkflowBuilder(DAGType dagType) throws Exception {
+ Workflow.Builder workflow = null;
+
+ if (dagType.equals(DAGType.SSH)) {
+ if (!setWorkflowData()) {
+ throw new Exception("Failed to create zk data for SSH workflow");
+ }
+ // create dir task - 1 task for job
+ List<TaskConfig> createDirTask = new ArrayList<TaskConfig>();
+ createDirTask.add(new TaskConfig.Builder().setTaskId(CREATE_DIR_TASK).setCommand(SSHTask.TASK_COMMAND).build());
+
+ // copy files task - 2 tasks for job
+ List<TaskConfig> copyFilesTask = new ArrayList<TaskConfig>();
+ copyFilesTask.add(new TaskConfig.Builder().setTaskId(COPY_PBS_TASK).setCommand(SSHTask.TASK_COMMAND).build());
+ copyFilesTask.add(new TaskConfig.Builder().setTaskId(COPY_PY_TASK).setCommand(SSHTask.TASK_COMMAND).build());
+
+ // qsub task - 1 task for job
+ List<TaskConfig> qsubTask = new ArrayList<TaskConfig>();
+ qsubTask.add(new TaskConfig.Builder().setTaskId(QSUB_TASK).setCommand(SSHTask.TASK_COMMAND).build());
+
+ // create-dir job config
+ JobConfig.Builder createDirJob = new JobConfig.Builder().addTaskConfigs(createDirTask).setMaxAttemptsPerTask(3);
+ // copy-files job config
+ JobConfig.Builder copyFilesJob = new JobConfig.Builder().addTaskConfigs(copyFilesTask).setMaxAttemptsPerTask(3);
+
+ // qsub job config
+ JobConfig.Builder qsubJob = new JobConfig.Builder().addTaskConfigs(qsubTask).setMaxAttemptsPerTask(1);
+
+ // create workflow
+ workflow = new Workflow.Builder(SSH_WORKFLOW).setExpiry(0);
+ workflow.addJob("createDirJob", createDirJob);
+ workflow.addJob("copyFilesJob", copyFilesJob);
+ workflow.addJob("qsubJob", qsubJob);
+ } else {
+ // create task configs
+ List<TaskConfig> taskConfig1 = new ArrayList<TaskConfig>();
+ List<TaskConfig> taskConfig2 = new ArrayList<TaskConfig>();
+ List<TaskConfig> taskConfig3 = new ArrayList<TaskConfig>();
+ List<TaskConfig> taskConfig4 = new ArrayList<TaskConfig>();
+ taskConfig1.add(new TaskConfig.Builder().setTaskId("helix_task_a").setCommand(HelixTaskA.TASK_COMMAND).build());
+ taskConfig2.add(new TaskConfig.Builder().setTaskId("helix_task_b").setCommand(HelixTaskB.TASK_COMMAND).build());
+ taskConfig3.add(new TaskConfig.Builder().setTaskId("helix_task_c").setCommand(HelixTaskC.TASK_COMMAND).build());
+ taskConfig4.add(new TaskConfig.Builder().setTaskId("helix_task_d").setCommand(HelixTaskD.TASK_COMMAND).build());
+
+ // create job configs
+ JobConfig.Builder jobConfig1 = new JobConfig.Builder().addTaskConfigs(taskConfig1).setMaxAttemptsPerTask(3);
+ JobConfig.Builder jobConfig2 = new JobConfig.Builder().addTaskConfigs(taskConfig2).setMaxAttemptsPerTask(3);
+ JobConfig.Builder jobConfig3 = new JobConfig.Builder().addTaskConfigs(taskConfig3).setMaxAttemptsPerTask(3);
+ JobConfig.Builder jobConfig4 = new JobConfig.Builder().addTaskConfigs(taskConfig4).setMaxAttemptsPerTask(3);
+
+ // create workflow
+ workflow = new Workflow.Builder("helix_workflow").setExpiry(0);
+ workflow.addJob("helix_job_a", jobConfig1);
+ workflow.addJob("helix_job_b", jobConfig2);
+ workflow.addJob("helix_job_c", jobConfig3);
+ workflow.addJob("helix_job_d", jobConfig4);
+ }
switch (dagType) {
case TYPE_A:
- workflowBuilder.addParentChildDependency("helix_job_a", "helix_job_b");
- workflowBuilder.addParentChildDependency("helix_job_b", "helix_job_c");
- workflowBuilder.addParentChildDependency("helix_job_c", "helix_job_d");
+ workflow.addParentChildDependency("helix_job_a", "helix_job_b");
+ workflow.addParentChildDependency("helix_job_b", "helix_job_c");
+ workflow.addParentChildDependency("helix_job_c", "helix_job_d");
break;
case TYPE_B:
- workflowBuilder.addParentChildDependency("helix_job_a", "helix_job_c");
- workflowBuilder.addParentChildDependency("helix_job_c", "helix_job_d");
- workflowBuilder.addParentChildDependency("helix_job_d", "helix_job_b");
+ workflow.addParentChildDependency("helix_job_a", "helix_job_c");
+ workflow.addParentChildDependency("helix_job_c", "helix_job_d");
+ workflow.addParentChildDependency("helix_job_d", "helix_job_b");
break;
case TYPE_C:
- workflowBuilder.addParentChildDependency("helix_job_a", "helix_job_d");
- workflowBuilder.addParentChildDependency("helix_job_d", "helix_job_b");
- workflowBuilder.addParentChildDependency("helix_job_b", "helix_job_c");
+ workflow.addParentChildDependency("helix_job_a", "helix_job_d");
+ workflow.addParentChildDependency("helix_job_d", "helix_job_b");
+ workflow.addParentChildDependency("helix_job_b", "helix_job_c");
+ break;
+
+ case SSH:
+ workflow.addParentChildDependency("createDirJob", "copyFilesJob");
+ workflow.addParentChildDependency("copyFilesJob", "qsubJob");
break;
}
- return workflowBuilder;
+ return workflow;
}
- public static Workflow getWorkflow(DAGType dagType) {
+ public static Workflow getWorkflow(DAGType dagType) throws Exception {
Workflow.Builder workflowBuilder = getWorkflowBuilder(dagType);
return workflowBuilder.build();
}
@@ -79,4 +146,89 @@ public class HelixUtil {
private static String generateWorkflowName() {
return "workflow_" + ThreadLocalRandom.current().nextInt(9999);
}
+
+ private static boolean setWorkflowData() {
+ try {
+ CuratorFramework curatorClient = ZkUtils.getCuratorClient();
+
+ SSHKeyAuthentication br2SshAuthentication = new SSHKeyAuthentication(
+ "snakanda",
+ IOUtils.toByteArray(HelixUtil.class.getClassLoader().getResourceAsStream("ssh/id_rsa")),
+ IOUtils.toByteArray(HelixUtil.class.getClassLoader().getResourceAsStream("ssh/id_rsa.pub")),
+ "dummy",
+ HelixUtil.class.getClassLoader().getResource("ssh/known_hosts").getPath(),
+ false
+ );
+
+ SSHServerInfo br2 = new SSHServerInfo("snakanda", "bigred2.uits.iu.edu", br2SshAuthentication,22);
+
+ SSHTaskContext createDirTC = new SSHTaskContext(SSHTaskContext.TASK_TYPE.EXECUTE_COMMAND,
+ br2SshAuthentication, null, br2, "mkdir -p airavata");
+
+ SSHTaskContext qsubTC = new SSHTaskContext(SSHTaskContext.TASK_TYPE.EXECUTE_COMMAND,
+ br2SshAuthentication, null, br2, "qsub ~/airavata/job_tf.pbs");
+
+
+ SSHTaskContext copyPbsTC = new SSHTaskContext(SSHTaskContext.TASK_TYPE.FILE_COPY,
+ br2SshAuthentication, null, br2,
+ HelixUtil.class.getClassLoader().getResource("job_tf.pbs").getPath(), "~/airavata/");
+
+ SSHTaskContext copyPyTC = new SSHTaskContext(SSHTaskContext.TASK_TYPE.FILE_COPY,
+ br2SshAuthentication, null, br2,
+ HelixUtil.class.getClassLoader().getResource("job_tf.pbs").getPath(), "~/airavata/");
+
+ ZkUtils.createZkNode(curatorClient, SSH_WORKFLOW, CREATE_DIR_TASK, getBytes(createDirTC));
+ ZkUtils.createZkNode(curatorClient, SSH_WORKFLOW, COPY_PBS_TASK, getBytes(copyPbsTC));
+ ZkUtils.createZkNode(curatorClient, SSH_WORKFLOW, COPY_PY_TASK, getBytes(copyPyTC));
+ ZkUtils.createZkNode(curatorClient, SSH_WORKFLOW, QSUB_TASK, getBytes(qsubTC));
+ } catch (Exception ex) {
+ ex.printStackTrace();
+ return false;
+ }
+ return true;
+ }
+
+ private static byte[] getBytes(Object object) throws Exception {
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ ObjectOutput out = new ObjectOutputStream(bos);
+ out.writeObject(object);
+ out.flush();
+ return bos.toByteArray();
+ }
+
+ public static Object getObject(byte[] objectBytes) throws Exception {
+ ByteArrayInputStream bis = new ByteArrayInputStream(objectBytes);
+ ObjectInput in = new ObjectInputStream(bis);
+ return in.readObject();
+ }
+
+// public static void main(String[] args) throws Exception {
+//
+// CuratorFramework curatorClient = ZkUtils.getCuratorClient();
+//
+// // set common data
+// ZkUtils.createZkNode(curatorClient, SSH_WORKFLOW, USERNAME, "snakanda");
+// ZkUtils.createZkNode(curatorClient, SSH_WORKFLOW, HOSTNAME, "bigred2.uits.iu.edu");
+// ZkUtils.createZkNode(curatorClient, SSH_WORKFLOW, PORT, "22");
+// ZkUtils.createZkNode(curatorClient, SSH_WORKFLOW, PUBLIC_KEY,
+// IOUtils.toByteArray(HelixUtil.class.getClassLoader().getResourceAsStream("ssh/id_rsa.pub")));
+// ZkUtils.createZkNode(curatorClient, SSH_WORKFLOW, PRIVATE_KEY,
+// IOUtils.toByteArray(HelixUtil.class.getClassLoader().getResourceAsStream("ssh/id_rsa")));
+//
+//
+// // set data for mkdir
+// ZkUtils.createZkNode(curatorClient, SSH_WORKFLOW + "/" + CREATE_DIR_TASK, COMMAND, "mkdir -p airavata");
+//
+// // set data for copy files
+// ZkUtils.createZkNode(curatorClient, SSH_WORKFLOW + "/" + COPY_PBS_TASK, SRC_PATH,
+// HelixUtil.class.getClassLoader().getResource("job_tf.pbs").getPath());
+// ZkUtils.createZkNode(curatorClient, SSH_WORKFLOW + "/" + COPY_PBS_TASK, DEST_PATH, "~/airavata/");
+//
+// ZkUtils.createZkNode(curatorClient, SSH_WORKFLOW + "/" + COPY_PY_TASK, SRC_PATH,
+// HelixUtil.class.getClassLoader().getResource("code_tf.py").getPath());
+// ZkUtils.createZkNode(curatorClient, SSH_WORKFLOW + "/" + COPY_PY_TASK, DEST_PATH, "~/airavata/");
+//
+// // set data for qsub
+// ZkUtils.createZkNode(curatorClient, SSH_WORKFLOW + "/" + CREATE_DIR_TASK, COMMAND, "qsub ~/airavata/job_tf.pbs");
+// }
}
http://git-wip-us.apache.org/repos/asf/airavata-sandbox/blob/2e56d5ce/helix-playground/src/main/java/edu/iu/helix/airavata/ParticipantNode.java
----------------------------------------------------------------------
diff --git a/helix-playground/src/main/java/edu/iu/helix/airavata/ParticipantNode.java b/helix-playground/src/main/java/edu/iu/helix/airavata/ParticipantNode.java
index 54466f2..9bca76e 100644
--- a/helix-playground/src/main/java/edu/iu/helix/airavata/ParticipantNode.java
+++ b/helix-playground/src/main/java/edu/iu/helix/airavata/ParticipantNode.java
@@ -4,6 +4,7 @@ import edu.iu.helix.airavata.tasks.HelixTaskA;
import edu.iu.helix.airavata.tasks.HelixTaskB;
import edu.iu.helix.airavata.tasks.HelixTaskC;
import edu.iu.helix.airavata.tasks.HelixTaskD;
+import edu.iu.helix.airavata.tasks.ssh.SSHTask;
import org.apache.helix.InstanceType;
import org.apache.helix.examples.OnlineOfflineStateModelFactory;
import org.apache.helix.manager.zk.ZKHelixAdmin;
@@ -93,6 +94,12 @@ public class ParticipantNode implements Runnable {
// register task model
Map<String, TaskFactory> taskRegistry = new HashMap<String, TaskFactory>();
+ taskRegistry.put(SSHTask.TASK_COMMAND, new TaskFactory() {
+ @Override
+ public Task createNewTask(TaskCallbackContext context) {
+ return new SSHTask(context);
+ }
+ });
taskRegistry.put(HelixTaskA.TASK_COMMAND, new TaskFactory() {
@Override
public Task createNewTask(TaskCallbackContext context) {
http://git-wip-us.apache.org/repos/asf/airavata-sandbox/blob/2e56d5ce/helix-playground/src/main/java/edu/iu/helix/airavata/ZkUtils.java
----------------------------------------------------------------------
diff --git a/helix-playground/src/main/java/edu/iu/helix/airavata/ZkUtils.java b/helix-playground/src/main/java/edu/iu/helix/airavata/ZkUtils.java
new file mode 100644
index 0000000..d07a17a
--- /dev/null
+++ b/helix-playground/src/main/java/edu/iu/helix/airavata/ZkUtils.java
@@ -0,0 +1,72 @@
+package edu.iu.helix.airavata;
+
+import org.apache.curator.RetryPolicy;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.curator.utils.ZKPaths;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Created by goshenoy on 6/28/17.
+ */
+public class ZkUtils {
+
+ private static final Logger logger = LoggerFactory.getLogger(ZkUtils.class);
+ private static CuratorFramework curatorClient;
+
+ /**
+ * Get curatorFramework instance
+ * @return
+ * @throws Exception
+ */
+ public static CuratorFramework getCuratorClient() throws Exception {
+ if (curatorClient == null) {
+ synchronized (ZkUtils.class) {
+ if (curatorClient == null) {
+ String connectionSting = HelixUtil.ZK_ADDRESS;
+ RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 5);
+ curatorClient = CuratorFrameworkFactory.newClient(connectionSting, retryPolicy);
+ curatorClient.start();
+ }
+ }
+ }
+
+ return curatorClient;
+ }
+
+ public static String getZkPath(String parentNode, String childNode) throws Exception {
+ return ZKPaths.makePath(parentNode, childNode);
+ }
+
+ public static void createZkNode(CuratorFramework curatorClient, String parentNode, String childNode, String data) throws Exception {
+ String zkPath = getZkPath(parentNode, childNode);
+ logger.debug("Creating Zk node for: " + zkPath);
+ ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(), zkPath);
+
+ setZkData(curatorClient, zkPath, data.getBytes());
+ }
+
+ public static void createZkNode(CuratorFramework curatorClient, String parentNode, String childNode, byte[] data) throws Exception {
+ String zkPath = getZkPath(parentNode, childNode);
+ logger.debug("Creating Zk node for: " + zkPath);
+ ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(), zkPath);
+
+ setZkData(curatorClient, zkPath, data);
+ }
+
+ public static void setZkData(CuratorFramework curatorClient, String zkPath, byte[] data) throws Exception {
+ curatorClient.setData().withVersion(-1).forPath(zkPath, data);
+ }
+
+ public static Object getZkData(CuratorFramework curatorClient, String parentNode, String childNode) throws Exception {
+ String zkPath = getZkPath(parentNode, childNode);
+ return getZkData(curatorClient, zkPath);
+ }
+
+ public static Object getZkData(CuratorFramework curatorClient, String zkPath) throws Exception {
+ byte[] data = curatorClient.getData().forPath(zkPath);
+ return HelixUtil.getObject(data);
+ }
+ }
http://git-wip-us.apache.org/repos/asf/airavata-sandbox/blob/2e56d5ce/helix-playground/src/main/java/edu/iu/helix/airavata/tasks/ssh/Authentication.java
----------------------------------------------------------------------
diff --git a/helix-playground/src/main/java/edu/iu/helix/airavata/tasks/ssh/Authentication.java b/helix-playground/src/main/java/edu/iu/helix/airavata/tasks/ssh/Authentication.java
index 49a8177..2d77210 100644
--- a/helix-playground/src/main/java/edu/iu/helix/airavata/tasks/ssh/Authentication.java
+++ b/helix-playground/src/main/java/edu/iu/helix/airavata/tasks/ssh/Authentication.java
@@ -23,7 +23,9 @@ package edu.iu.helix.airavata.tasks.ssh;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class Authentication {
+import java.io.Serializable;
+
+public class Authentication implements Serializable {
private final static Logger logger = LoggerFactory.getLogger(Authentication.class);
protected String userName;
http://git-wip-us.apache.org/repos/asf/airavata-sandbox/blob/2e56d5ce/helix-playground/src/main/java/edu/iu/helix/airavata/tasks/ssh/SSHApiException.java
----------------------------------------------------------------------
diff --git a/helix-playground/src/main/java/edu/iu/helix/airavata/tasks/ssh/SSHApiException.java b/helix-playground/src/main/java/edu/iu/helix/airavata/tasks/ssh/SSHApiException.java
new file mode 100644
index 0000000..b398248
--- /dev/null
+++ b/helix-playground/src/main/java/edu/iu/helix/airavata/tasks/ssh/SSHApiException.java
@@ -0,0 +1,35 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package edu.iu.helix.airavata.tasks.ssh;
+
+/**
+ * An exception class to wrap SSH command execution related errors.
+ */
+public class SSHApiException extends Exception {
+
+ public SSHApiException(String message) {
+ super(message);
+ }
+
+ public SSHApiException(String message, Exception e) {
+ super(message, e);
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata-sandbox/blob/2e56d5ce/helix-playground/src/main/java/edu/iu/helix/airavata/tasks/ssh/SSHCommandOutputReader.java
----------------------------------------------------------------------
diff --git a/helix-playground/src/main/java/edu/iu/helix/airavata/tasks/ssh/SSHCommandOutputReader.java b/helix-playground/src/main/java/edu/iu/helix/airavata/tasks/ssh/SSHCommandOutputReader.java
new file mode 100644
index 0000000..89e64c4
--- /dev/null
+++ b/helix-playground/src/main/java/edu/iu/helix/airavata/tasks/ssh/SSHCommandOutputReader.java
@@ -0,0 +1,90 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package edu.iu.helix.airavata.tasks.ssh;
+
+import com.jcraft.jsch.Channel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+public class SSHCommandOutputReader {
+
+ private static final Logger logger = LoggerFactory.getLogger(SSHCommandOutputReader.class);
+ String stdOutputString = null;
+ String stdErrorString = null;
+ ByteArrayOutputStream errorStream = new ByteArrayOutputStream();
+ private int exitCode;
+
+ public void onOutput(Channel channel) {
+ try {
+ this.setStdOutputString(getOutputStream(channel, channel.getInputStream()));
+ this.setStdErrorString(new String(errorStream.toByteArray(), "UTF-8"));
+ this.exitCode = channel.getExitStatus();
+ } catch (IOException e) {
+ logger.error(e.getMessage(), e);
+ }
+
+ }
+
+ private String getOutputStream(Channel channel, InputStream inputStream) throws IOException {
+ StringBuilder output = new StringBuilder("");
+ byte[] tmp = new byte[1024];
+ do {
+ while (inputStream.available() > 0) {
+ int i = inputStream.read(tmp, 0, 1024);
+ if (i < 0) break;
+ output.append(new String(tmp, 0, i));
+ }
+ } while (!channel.isClosed()) ;
+ return output.toString();
+ }
+
+ public void setExitCode(int exitCode) {
+ this.exitCode = exitCode;
+ }
+
+ public int getExitCode() {
+ return exitCode;
+ }
+
+ public String getStdOutputString() {
+ return stdOutputString;
+ }
+
+ public void setStdOutputString(String stdOutputString) {
+ this.stdOutputString = stdOutputString;
+ }
+
+ public String getStdErrorString() {
+ return stdErrorString;
+ }
+
+ public void setStdErrorString(String stdErrorString) {
+ this.stdErrorString = stdErrorString;
+ }
+
+ public ByteArrayOutputStream getErrorStream() {
+ return errorStream;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/airavata-sandbox/blob/2e56d5ce/helix-playground/src/main/java/edu/iu/helix/airavata/tasks/ssh/SSHKeyAuthentication.java
----------------------------------------------------------------------
diff --git a/helix-playground/src/main/java/edu/iu/helix/airavata/tasks/ssh/SSHKeyAuthentication.java b/helix-playground/src/main/java/edu/iu/helix/airavata/tasks/ssh/SSHKeyAuthentication.java
index 0ddf61e..add975c 100644
--- a/helix-playground/src/main/java/edu/iu/helix/airavata/tasks/ssh/SSHKeyAuthentication.java
+++ b/helix-playground/src/main/java/edu/iu/helix/airavata/tasks/ssh/SSHKeyAuthentication.java
@@ -21,7 +21,7 @@
package edu.iu.helix.airavata.tasks.ssh;
-public class SSHKeyAuthentication extends Authentication{
+public class SSHKeyAuthentication extends Authentication {
private byte[] privateKey;
private byte[] publicKey;
private String passphrase;
http://git-wip-us.apache.org/repos/asf/airavata-sandbox/blob/2e56d5ce/helix-playground/src/main/java/edu/iu/helix/airavata/tasks/ssh/SSHRunner.java
----------------------------------------------------------------------
diff --git a/helix-playground/src/main/java/edu/iu/helix/airavata/tasks/ssh/SSHRunner.java b/helix-playground/src/main/java/edu/iu/helix/airavata/tasks/ssh/SSHRunner.java
new file mode 100644
index 0000000..b2486ef
--- /dev/null
+++ b/helix-playground/src/main/java/edu/iu/helix/airavata/tasks/ssh/SSHRunner.java
@@ -0,0 +1,394 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package edu.iu.helix.airavata.tasks.ssh;
+
+import com.jcraft.jsch.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.*;
+import java.util.*;
+
+public class SSHRunner {
+ private final static Logger log = LoggerFactory.getLogger(SSHRunner.class);
+
+ public Session createSSHSession(SSHServerInfo serverInfo, SSHKeyAuthentication authentication) throws JSchException {
+ JSch jSch = new JSch();
+ jSch.addIdentity(UUID.randomUUID().toString(), authentication.getPrivateKey(), authentication.getPublicKey(),
+ authentication.getPassphrase().getBytes());
+ Session session = jSch.getSession(serverInfo.getUserName(), serverInfo.getHost(),
+ serverInfo.getSshPort());
+ session.setUserInfo(new SSHUserInfo(serverInfo.getUserName(), null, authentication.getPassphrase()));
+ if (authentication.getStrictHostKeyChecking().equals("yes")) {
+ jSch.setKnownHosts(authentication.getKnownHostsFilePath());
+ } else {
+ session.setConfig("StrictHostKeyChecking", "no");
+ }
+ session.connect();
+
+ return session;
+ }
+
+ public String scpTo(String routingKey, String localFile, String remoteFile, SSHServerInfo serverInfo,
+ SSHKeyAuthentication authentication) throws IOException, JSchException, SSHApiException {
+
+ Session session = createSSHSession(serverInfo, authentication);
+
+ FileInputStream fis = null;
+ String prefix = null;
+ if (new File(localFile).isDirectory()) {
+ prefix = localFile + File.separator;
+ }
+ boolean ptimestamp = true;
+
+ // exec 'scp -t rfile' remotely
+ String command = "scp " + (ptimestamp ? "-p" : "") + " -t " + remoteFile;
+ Channel channel = session.openChannel("exec");
+
+ SSHCommandOutputReader stdOutReader = new SSHCommandOutputReader();
+ ((ChannelExec) channel).setCommand(command);
+ ((ChannelExec) channel).setErrStream(stdOutReader.getErrorStream());
+
+ // get I/O streams for remote scp
+ OutputStream out = channel.getOutputStream();
+ InputStream in = channel.getInputStream();
+
+ channel.connect();
+
+ if (checkAck(in) != 0) {
+ String error = "Error Reading input Stream";
+ log.error(error);
+ throw new SSHApiException(error);
+ }
+
+ File _lfile = new File(localFile);
+
+ if (ptimestamp) {
+ command = "T" + (_lfile.lastModified() / 1000) + " 0";
+ // The access time should be sent here,
+ // but it is not accessible with JavaAPI ;-<
+ command += (" " + (_lfile.lastModified() / 1000) + " 0\n");
+ out.write(command.getBytes());
+ out.flush();
+ if (checkAck(in) != 0) {
+ String error = "Error Reading input Stream";
+ log.error(error);
+ throw new SSHApiException(error);
+ }
+ }
+
+ // send "C0644 filesize filename", where filename should not include '/'
+ long filesize = _lfile.length();
+ command = "C0644 " + filesize + " ";
+ if (localFile.lastIndexOf('/') > 0) {
+ command += localFile.substring(localFile.lastIndexOf('/') + 1);
+ } else {
+ command += localFile;
+ }
+ command += "\n";
+ out.write(command.getBytes());
+ out.flush();
+ if (checkAck(in) != 0) {
+ String error = "Error Reading input Stream";
+ log.error(error);
+ throw new SSHApiException(error);
+ }
+
+ // send a content of localFile
+ fis = new FileInputStream(localFile);
+ byte[] buf = new byte[1024];
+ while (true) {
+ int len = fis.read(buf, 0, buf.length);
+ if (len <= 0) break;
+ out.write(buf, 0, len); //out.flush();
+ }
+ fis.close();
+ fis = null;
+ // send '\0'
+ buf[0] = 0;
+ out.write(buf, 0, 1);
+ out.flush();
+ if (checkAck(in) != 0) {
+ String error = "Error Reading input Stream";
+ log.error(error);
+ throw new SSHApiException(error);
+ }
+ out.close();
+ stdOutReader.onOutput(channel);
+
+
+ channel.disconnect();
+ if (stdOutReader.getStdErrorString().contains("scp:")) {
+ throw new SSHApiException(stdOutReader.getStdErrorString());
+ }
+
+ session.disconnect();
+
+ //since remote file is always a file we just return the file
+ return remoteFile;
+ }
+
+ /**
+ * This method will copy a remote file to a local directory
+ *
+ * @param remoteFile remote file path, this has to be a full qualified path
+ * @param localFile This is the local file to copy, this can be a directory too
+ * @return returns the final local file path of the new file came from the remote resource
+ */
+ public void scpFrom(String routingKey, String remoteFile, String localFile, SSHServerInfo serverInfo,
+ SSHKeyAuthentication authentication) throws IOException,
+ JSchException, SSHApiException {
+ Session session = createSSHSession(serverInfo, authentication);
+ FileOutputStream fos = null;
+ try {
+ String prefix = null;
+ if (new File(localFile).isDirectory()) {
+ prefix = localFile + File.separator;
+ }
+
+ SSHCommandOutputReader stdOutReader = new SSHCommandOutputReader();
+
+ // exec 'scp -f remotefile' remotely
+ String command = "scp -f " + remoteFile;
+ Channel channel = session.openChannel("exec");
+ ((ChannelExec) channel).setCommand(command);
+ ((ChannelExec) channel).setErrStream(stdOutReader.getErrorStream());
+
+ // get I/O streams for remote scp
+ OutputStream out = channel.getOutputStream();
+ InputStream in = channel.getInputStream();
+
+ if (!channel.isClosed()){
+ channel.connect();
+ }
+
+ byte[] buf = new byte[1024];
+
+ // send '\0'
+ buf[0] = 0;
+ out.write(buf, 0, 1);
+ out.flush();
+
+ while (true) {
+ int c = checkAck(in);
+ if (c != 'C') {
+ break;
+ }
+
+ // read '0644 '
+ in.read(buf, 0, 5);
+
+ long filesize = 0L;
+ while (true) {
+ if (in.read(buf, 0, 1) < 0) {
+ // error
+ break;
+ }
+ if (buf[0] == ' ') break;
+ filesize = filesize * 10L + (long) (buf[0] - '0');
+ }
+
+ String file = null;
+ for (int i = 0; ; i++) {
+ in.read(buf, i, 1);
+ if (buf[i] == (byte) 0x0a) {
+ file = new String(buf, 0, i);
+ break;
+ }
+ }
+
+ //System.out.println("filesize="+filesize+", file="+file);
+
+ // send '\0'
+ buf[0] = 0;
+ out.write(buf, 0, 1);
+ out.flush();
+
+ // read a content of lfile
+ fos = new FileOutputStream(prefix == null ? localFile : prefix + file);
+ int foo;
+ while (true) {
+ if (buf.length < filesize) foo = buf.length;
+ else foo = (int) filesize;
+ foo = in.read(buf, 0, foo);
+ if (foo < 0) {
+ // error
+ break;
+ }
+ fos.write(buf, 0, foo);
+ filesize -= foo;
+ if (filesize == 0L) break;
+ }
+ fos.close();
+ fos = null;
+
+ if (checkAck(in) != 0) {
+ String error = "Error transfering the file content";
+ log.error(error);
+ throw new SSHApiException(error);
+ }
+
+ // send '\0'
+ buf[0] = 0;
+ out.write(buf, 0, 1);
+ out.flush();
+ }
+ stdOutReader.onOutput(channel);
+ if (stdOutReader.getStdErrorString().contains("scp:")) {
+ throw new SSHApiException(stdOutReader.getStdErrorString());
+ }
+
+ } catch (Exception e) {
+ log.error(e.getMessage(), e);
+ } finally {
+ try {
+ if (fos != null) fos.close();
+ session.disconnect();
+ } catch (Exception ee) {
+ }
+ }
+ }
+
+ public void makeDirectory(String routingKey, String path, SSHServerInfo serverInfo, SSHKeyAuthentication authentication)
+ throws IOException, JSchException, Exception {
+
+ Session session = createSSHSession(serverInfo, authentication);
+
+ // exec 'scp -t rfile' remotely
+ String command = "mkdir -p " + path;
+ Channel channel = session.openChannel("exec");
+ SSHCommandOutputReader stdOutReader = new SSHCommandOutputReader();
+
+ ((ChannelExec) channel).setCommand(command);
+ ((ChannelExec) channel).setErrStream(stdOutReader.getErrorStream());
+
+ try {
+ channel.connect();
+ } catch (JSchException e) {
+
+ channel.disconnect();
+ log.error("Unable to retrieve command output. Command - " + command +
+ " on server - " + session.getHost() + ":" + session.getPort() +
+ " connecting user name - "
+ + session.getUserName());
+ throw e;
+ }
+ stdOutReader.onOutput(channel);
+ if (stdOutReader.getStdErrorString().contains("mkdir:")) {
+ throw new Exception(stdOutReader.getStdErrorString());
+ }
+
+ channel.disconnect();
+ session.disconnect();
+ }
+
+ public List<String> listDirectory(String routingKey, String path, SSHServerInfo serverInfo, SSHKeyAuthentication authentication)
+ throws IOException, JSchException, Exception {
+
+ Session session = createSSHSession(serverInfo, authentication);
+
+ // exec 'scp -t rfile' remotely
+ String command = "ls " + path;
+ Channel channel = session.openChannel("exec");
+ SSHCommandOutputReader stdOutReader = new SSHCommandOutputReader();
+
+ ((ChannelExec) channel).setCommand(command);
+ ((ChannelExec) channel).setErrStream(stdOutReader.getErrorStream());
+
+ try {
+ channel.connect();
+ } catch (JSchException e) {
+
+ channel.disconnect();
+
+ throw new Exception("Unable to retrieve command output. Command - " + command +
+ " on server - " + session.getHost() + ":" + session.getPort() +
+ " connecting user name - "
+ + session.getUserName(), e);
+ }
+ if (stdOutReader.getStdErrorString().contains("ls:")) {
+ throw new Exception(stdOutReader.getStdErrorString());
+ }
+ channel.disconnect();
+ session.disconnect();
+ return Arrays.asList(stdOutReader.getStdOutputString().split("\n"));
+ }
+
+ public SSHCommandOutputReader executeCommand(String routingKey, String command, SSHServerInfo serverInfo,
+ SSHKeyAuthentication authentication) throws Exception {
+
+ Session session = createSSHSession(serverInfo, authentication);
+
+ Map<String, String> results = new HashMap<>();
+
+ Channel channel = session.openChannel("exec");
+ SSHCommandOutputReader stdOutReader = new SSHCommandOutputReader();
+
+ ((ChannelExec) channel).setCommand(command);
+ ((ChannelExec) channel).setErrStream(stdOutReader.getErrorStream());
+
+ try {
+ channel.connect();
+ } catch (JSchException e) {
+
+ channel.disconnect();
+
+ throw new Exception("Unable to retrieve command output. Command - " + command +
+ " on server - " + session.getHost() + ":" + session.getPort() +
+ " connecting user name - "
+ + session.getUserName(), e);
+ }
+ stdOutReader.onOutput(channel);
+ session.disconnect();
+
+ return stdOutReader;
+ }
+
+ public SSHCommandOutputReader executeCommand(String routingKey, String[] commands, SSHServerInfo serverInfo,
+ SSHKeyAuthentication authentication) throws Exception {
+ return executeCommand(routingKey, String.join(" && ", commands), serverInfo, authentication);
+ }
+
+ private int checkAck(InputStream in) throws IOException {
+ int b = in.read();
+ if (b == 0) return b;
+ if (b == -1) return b;
+
+ if (b == 1 || b == 2) {
+ StringBuffer sb = new StringBuffer();
+ int c;
+ do {
+ c = in.read();
+ sb.append((char) c);
+ }
+ while (c != '\n');
+ if (b == 1) { // error
+ System.out.print(sb.toString());
+ }
+ if (b == 2) { // fatal error
+ System.out.print(sb.toString());
+ }
+ log.warn(sb.toString());
+ }
+ return b;
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/airavata-sandbox/blob/2e56d5ce/helix-playground/src/main/java/edu/iu/helix/airavata/tasks/ssh/SSHTask.java
----------------------------------------------------------------------
diff --git a/helix-playground/src/main/java/edu/iu/helix/airavata/tasks/ssh/SSHTask.java b/helix-playground/src/main/java/edu/iu/helix/airavata/tasks/ssh/SSHTask.java
index 4e72ade..0ea53a7 100644
--- a/helix-playground/src/main/java/edu/iu/helix/airavata/tasks/ssh/SSHTask.java
+++ b/helix-playground/src/main/java/edu/iu/helix/airavata/tasks/ssh/SSHTask.java
@@ -20,19 +20,26 @@
*/
package edu.iu.helix.airavata.tasks.ssh;
+import edu.iu.helix.airavata.HelixUtil;
+import edu.iu.helix.airavata.ZkUtils;
import org.apache.helix.task.Task;
import org.apache.helix.task.TaskCallbackContext;
import org.apache.helix.task.TaskResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.UUID;
+
public class SSHTask implements Task {
private final static Logger logger = LoggerFactory.getLogger(SSHTask.class);
+ public static final String TASK_COMMAND = "SSH_TASK";
private TaskCallbackContext callbackContext;
+ private String taskId;
public SSHTask(TaskCallbackContext callbackContext){
this.callbackContext = callbackContext;
+ this.taskId = callbackContext.getTaskConfig().getId();
}
@Override
@@ -46,7 +53,36 @@ public class SSHTask implements Task {
// Todo Deserialize the TaskContext from data store.
// byte[] output = curator.getData().forPath(path);
// List<String> newList = (List<String>)SerializationUtils.deserialize(output);
- return null;
+
+ System.out.println("Running SSH Task for ID: " + taskId);
+ try {
+ SSHTaskContext taskContext = (SSHTaskContext) ZkUtils.getZkData(ZkUtils.getCuratorClient(), HelixUtil.SSH_WORKFLOW, taskId);
+ String routingKey = UUID.randomUUID().toString();
+ SSHRunner sshExecutor = new SSHRunner();
+
+ System.out.println("Task: " + taskId + ", is of Type: " + taskContext.getTask_type());
+ switch (taskContext.getTask_type()) {
+
+ case EXECUTE_COMMAND:
+ SSHCommandOutputReader sshOut = sshExecutor.executeCommand(routingKey, taskContext.getCommand(),
+ (SSHServerInfo) taskContext.getServerInfo(), taskContext.getSshKeyAuthentication());
+ System.out.println("SSH Command Output: " + sshOut.getStdOutputString());
+ break;
+
+ case FILE_COPY:
+ String scpOut = sshExecutor.scpTo(routingKey, taskContext.getSourceFilePath(), taskContext.getDestFilePath(),
+ (SSHServerInfo) taskContext.getServerInfo(), taskContext.getSshKeyAuthentication());
+ System.out.println("SCP Command Output: " + scpOut);
+ break;
+
+ default:
+ throw new Exception("Unknown SSH Task Type: " + taskContext.getTask_type());
+ }
+ } catch (Exception ex) {
+ System.err.println("Something went wrong for task: " + taskId + ", reason: " + ex);
+ return new TaskResult(TaskResult.Status.FAILED, "SSH command completed!");
+ }
+ return new TaskResult(TaskResult.Status.COMPLETED, "SSH command completed!");
}
@Override
http://git-wip-us.apache.org/repos/asf/airavata-sandbox/blob/2e56d5ce/helix-playground/src/main/java/edu/iu/helix/airavata/tasks/ssh/SSHTaskContext.java
----------------------------------------------------------------------
diff --git a/helix-playground/src/main/java/edu/iu/helix/airavata/tasks/ssh/SSHTaskContext.java b/helix-playground/src/main/java/edu/iu/helix/airavata/tasks/ssh/SSHTaskContext.java
index 2b2024c..e780331 100644
--- a/helix-playground/src/main/java/edu/iu/helix/airavata/tasks/ssh/SSHTaskContext.java
+++ b/helix-playground/src/main/java/edu/iu/helix/airavata/tasks/ssh/SSHTaskContext.java
@@ -23,7 +23,9 @@ package edu.iu.helix.airavata.tasks.ssh;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class SSHTaskContext {
+import java.io.Serializable;
+
+public class SSHTaskContext implements Serializable {
private final static Logger logger = LoggerFactory.getLogger(SSHTaskContext.class);
public static enum TASK_TYPE {FILE_COPY, EXECUTE_COMMAND;}
@@ -127,4 +129,17 @@ public class SSHTaskContext {
public void setCommand(String command) {
this.command = command;
}
+
+ @Override
+ public String toString() {
+ return "SSHTaskContext{" +
+ "task_type=" + task_type +
+ ", sshKeyAuthentication=" + sshKeyAuthentication +
+ ", sshUserInfo=" + sshUserInfo +
+ ", serverInfo=" + serverInfo +
+ ", sourceFilePath='" + sourceFilePath + '\'' +
+ ", destFilePath='" + destFilePath + '\'' +
+ ", command='" + command + '\'' +
+ '}';
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/airavata-sandbox/blob/2e56d5ce/helix-playground/src/main/java/edu/iu/helix/airavata/tasks/ssh/SSHUserInfo.java
----------------------------------------------------------------------
diff --git a/helix-playground/src/main/java/edu/iu/helix/airavata/tasks/ssh/SSHUserInfo.java b/helix-playground/src/main/java/edu/iu/helix/airavata/tasks/ssh/SSHUserInfo.java
index a0ac81c..df5a575 100644
--- a/helix-playground/src/main/java/edu/iu/helix/airavata/tasks/ssh/SSHUserInfo.java
+++ b/helix-playground/src/main/java/edu/iu/helix/airavata/tasks/ssh/SSHUserInfo.java
@@ -22,7 +22,9 @@ package edu.iu.helix.airavata.tasks.ssh;
import com.jcraft.jsch.UserInfo;
-public class SSHUserInfo implements UserInfo {
+import java.io.Serializable;
+
+public class SSHUserInfo implements UserInfo, Serializable {
private String userName;
private String password;
http://git-wip-us.apache.org/repos/asf/airavata-sandbox/blob/2e56d5ce/helix-playground/src/main/java/edu/iu/helix/airavata/tasks/ssh/ServerInfo.java
----------------------------------------------------------------------
diff --git a/helix-playground/src/main/java/edu/iu/helix/airavata/tasks/ssh/ServerInfo.java b/helix-playground/src/main/java/edu/iu/helix/airavata/tasks/ssh/ServerInfo.java
index f47fc5b..4917e47 100644
--- a/helix-playground/src/main/java/edu/iu/helix/airavata/tasks/ssh/ServerInfo.java
+++ b/helix-playground/src/main/java/edu/iu/helix/airavata/tasks/ssh/ServerInfo.java
@@ -20,7 +20,9 @@
*/
package edu.iu.helix.airavata.tasks.ssh;
-public class ServerInfo {
+import java.io.Serializable;
+
+public class ServerInfo implements Serializable {
public static enum ComProtocol {SSH, LOCAL}
http://git-wip-us.apache.org/repos/asf/airavata-sandbox/blob/2e56d5ce/helix-playground/src/main/resources/ssh/id_rsa.pub
----------------------------------------------------------------------
diff --git a/helix-playground/src/main/resources/ssh/id_rsa.pub b/helix-playground/src/main/resources/ssh/id_rsa.pub
index c3ac0b0..e378be0 100644
--- a/helix-playground/src/main/resources/ssh/id_rsa.pub
+++ b/helix-playground/src/main/resources/ssh/id_rsa.pub
@@ -1,2 +1 @@
-
ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQCr9VP22p0I+2W5o/klPv/OvfTihvcBQwZXKPrSLFF+OB9nVNtIfDDETIwwex7mknn3Kks1jFvEdKMrvRjOFeFInDv3N40LjohHu4v2tiawAON7MOLpz/iX5dWp0wteixlDKfGe7PAEMAk054kLSDiB3em2zBK4d9ApedA5k2JG1dmAsNK0KkbfgFPd5+iXrzgTg4XiefHQoaCSUyS7w6t8645djbYOP+b+SJtgslaf2RqeoBVvrA6YQJE1pUYjcm9yL4KwyqaPo+N/2XZ6xys5+WN8svtL3uRduENU1MQSTpdFq+GLCY4SgLMFgLJKoxHjcjPRfKyE/eYk1gQA7b/Z snakanda@149-161-141-51.dhcp-bl.indiana.edu