You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@taverna.apache.org by st...@apache.org on 2016/05/04 10:04:22 UTC
[2/6] incubator-taverna-common-activities git commit:
SshToolInvocation
SshToolInvocation
Project: http://git-wip-us.apache.org/repos/asf/incubator-taverna-common-activities/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-taverna-common-activities/commit/7d28decd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-taverna-common-activities/tree/7d28decd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-taverna-common-activities/diff/7d28decd
Branch: refs/heads/master
Commit: 7d28decd969c7d0bd3da6bd9825dde03b735ea7b
Parents: 3c48b20
Author: Stian Soiland-Reyes <st...@apache.org>
Authored: Wed May 4 01:06:14 2016 +0100
Committer: Stian Soiland-Reyes <st...@apache.org>
Committed: Wed May 4 01:06:14 2016 +0100
----------------------------------------------------------------------
.../ssh/ExternalToolSshInvocationMechanism.java | 2 +-
.../externaltool/ssh/SshInvocationCreator.java | 6 +-
.../ssh/SshInvocationPersister.java | 6 +-
.../externaltool/ssh/SshMechanismCreator.java | 2 +-
.../externaltool/ssh/SshToolInvocation.java | 560 +++++++++++++++++++
.../externaltool/ssh/SshUseCaseInvocation.java | 560 -------------------
6 files changed, 568 insertions(+), 568 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-taverna-common-activities/blob/7d28decd/taverna-external-tool-activity/src/main/java/org/apache/taverna/activities/externaltool/ssh/ExternalToolSshInvocationMechanism.java
----------------------------------------------------------------------
diff --git a/taverna-external-tool-activity/src/main/java/org/apache/taverna/activities/externaltool/ssh/ExternalToolSshInvocationMechanism.java b/taverna-external-tool-activity/src/main/java/org/apache/taverna/activities/externaltool/ssh/ExternalToolSshInvocationMechanism.java
index 94cc466..b92c62b 100644
--- a/taverna-external-tool-activity/src/main/java/org/apache/taverna/activities/externaltool/ssh/ExternalToolSshInvocationMechanism.java
+++ b/taverna-external-tool-activity/src/main/java/org/apache/taverna/activities/externaltool/ssh/ExternalToolSshInvocationMechanism.java
@@ -39,7 +39,7 @@ public class ExternalToolSshInvocationMechanism extends InvocationMechanism {
*/
@Override
public String getType() {
- return SshUseCaseInvocation.SSH_USE_CASE_INVOCATION_TYPE;
+ return SshToolInvocation.SSH_USE_CASE_INVOCATION_TYPE;
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-taverna-common-activities/blob/7d28decd/taverna-external-tool-activity/src/main/java/org/apache/taverna/activities/externaltool/ssh/SshInvocationCreator.java
----------------------------------------------------------------------
diff --git a/taverna-external-tool-activity/src/main/java/org/apache/taverna/activities/externaltool/ssh/SshInvocationCreator.java b/taverna-external-tool-activity/src/main/java/org/apache/taverna/activities/externaltool/ssh/SshInvocationCreator.java
index f0b1e83..8f1950e 100644
--- a/taverna-external-tool-activity/src/main/java/org/apache/taverna/activities/externaltool/ssh/SshInvocationCreator.java
+++ b/taverna-external-tool-activity/src/main/java/org/apache/taverna/activities/externaltool/ssh/SshInvocationCreator.java
@@ -54,16 +54,16 @@ public final class SshInvocationCreator implements InvocationCreator {
@Override
public boolean canHandle(String mechanismType) {
- return mechanismType.equals(SshUseCaseInvocation.SSH_USE_CASE_INVOCATION_TYPE);
+ return mechanismType.equals(SshToolInvocation.SSH_USE_CASE_INVOCATION_TYPE);
}
@Override
public ToolInvocation convert(InvocationMechanism m, UseCaseDescription description, Map<String, T2Reference> data, ReferenceService referenceService) {
ExternalToolSshInvocationMechanism mechanism = (ExternalToolSshInvocationMechanism) m;
- SshUseCaseInvocation result = null;
+ SshToolInvocation result = null;
try {
SshNode chosenNode = chooseNode(mechanism.getNodes(), data, referenceService);
- result = new SshUseCaseInvocation(description, chosenNode, new RetrieveLoginFromTaverna(new SshUrl(chosenNode).toString(), credentialManager), credentialManager);
+ result = new SshToolInvocation(description, chosenNode, new RetrieveLoginFromTaverna(new SshUrl(chosenNode).toString(), credentialManager), credentialManager);
} catch (JSchException e) {
logger.error("Null invocation", e);
} catch (SftpException e) {
http://git-wip-us.apache.org/repos/asf/incubator-taverna-common-activities/blob/7d28decd/taverna-external-tool-activity/src/main/java/org/apache/taverna/activities/externaltool/ssh/SshInvocationPersister.java
----------------------------------------------------------------------
diff --git a/taverna-external-tool-activity/src/main/java/org/apache/taverna/activities/externaltool/ssh/SshInvocationPersister.java b/taverna-external-tool-activity/src/main/java/org/apache/taverna/activities/externaltool/ssh/SshInvocationPersister.java
index bad5c2e..93bb3e9 100644
--- a/taverna-external-tool-activity/src/main/java/org/apache/taverna/activities/externaltool/ssh/SshInvocationPersister.java
+++ b/taverna-external-tool-activity/src/main/java/org/apache/taverna/activities/externaltool/ssh/SshInvocationPersister.java
@@ -62,7 +62,7 @@ public class SshInvocationPersister extends InvocationPersister {
*/
@Override
public void load(File directory) {
- SshUseCaseInvocation.load(directory);
+ SshToolInvocation.load(directory);
}
/* (non-Javadoc)
@@ -70,13 +70,13 @@ public class SshInvocationPersister extends InvocationPersister {
*/
@Override
public void persist(File directory) {
- SshUseCaseInvocation.persist(directory);
+ SshToolInvocation.persist(directory);
}
@Override
public void deleteRun(String runId) {
try {
- SshUseCaseInvocation.cleanup(runId, credentialManager);
+ SshToolInvocation.cleanup(runId, credentialManager);
} catch (InvocationException e) {
logger.error(e);
}
http://git-wip-us.apache.org/repos/asf/incubator-taverna-common-activities/blob/7d28decd/taverna-external-tool-activity/src/main/java/org/apache/taverna/activities/externaltool/ssh/SshMechanismCreator.java
----------------------------------------------------------------------
diff --git a/taverna-external-tool-activity/src/main/java/org/apache/taverna/activities/externaltool/ssh/SshMechanismCreator.java b/taverna-external-tool-activity/src/main/java/org/apache/taverna/activities/externaltool/ssh/SshMechanismCreator.java
index 097f8db..c064947 100644
--- a/taverna-external-tool-activity/src/main/java/org/apache/taverna/activities/externaltool/ssh/SshMechanismCreator.java
+++ b/taverna-external-tool-activity/src/main/java/org/apache/taverna/activities/externaltool/ssh/SshMechanismCreator.java
@@ -41,7 +41,7 @@ public class SshMechanismCreator extends MechanismCreator {
*/
@Override
public boolean canHandle(String mechanismType) {
- return mechanismType.equals(SshUseCaseInvocation.SSH_USE_CASE_INVOCATION_TYPE);
+ return mechanismType.equals(SshToolInvocation.SSH_USE_CASE_INVOCATION_TYPE);
}
http://git-wip-us.apache.org/repos/asf/incubator-taverna-common-activities/blob/7d28decd/taverna-external-tool-activity/src/main/java/org/apache/taverna/activities/externaltool/ssh/SshToolInvocation.java
----------------------------------------------------------------------
diff --git a/taverna-external-tool-activity/src/main/java/org/apache/taverna/activities/externaltool/ssh/SshToolInvocation.java b/taverna-external-tool-activity/src/main/java/org/apache/taverna/activities/externaltool/ssh/SshToolInvocation.java
new file mode 100755
index 0000000..edf9698
--- /dev/null
+++ b/taverna-external-tool-activity/src/main/java/org/apache/taverna/activities/externaltool/ssh/SshToolInvocation.java
@@ -0,0 +1,560 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.taverna.activities.externaltool.ssh;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.UnsupportedEncodingException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.Vector;
+import java.util.regex.Matcher;
+
+import org.apache.taverna.activities.externaltool.RetrieveLoginFromTaverna;
+import org.apache.taverna.activities.externaltool.desc.ScriptInput;
+import org.apache.taverna.activities.externaltool.desc.ScriptOutput;
+import org.apache.taverna.activities.externaltool.desc.UseCaseDescription;
+import org.apache.taverna.activities.externaltool.invocation.AskUserForPw;
+import org.apache.taverna.activities.externaltool.invocation.InvocationException;
+import org.apache.taverna.activities.externaltool.invocation.ToolInvocation;
+import org.apache.taverna.reference.AbstractExternalReference;
+import org.apache.taverna.reference.ErrorDocument;
+import org.apache.taverna.reference.ErrorDocumentServiceException;
+import org.apache.taverna.reference.ExternalReferenceSPI;
+import org.apache.taverna.reference.Identified;
+import org.apache.taverna.reference.ReferenceService;
+import org.apache.taverna.reference.ReferenceSet;
+import org.apache.taverna.reference.ReferencedDataNature;
+import org.apache.taverna.reference.T2Reference;
+import org.apache.taverna.security.credentialmanager.CredentialManager;
+
+import org.apache.log4j.Logger;
+
+import com.jcraft.jsch.ChannelExec;
+import com.jcraft.jsch.ChannelSftp;
+import com.jcraft.jsch.JSchException;
+import com.jcraft.jsch.Session;
+import com.jcraft.jsch.SftpException;
+import com.jcraft.jsch.ChannelSftp.LsEntry;
+
+/**
+ * The job is executed by connecting to a worker pc using ssh, i.e. not via the
+ * grid.
+ *
+ * @author Hajo Krabbenhoeft
+ */
+public class SshToolInvocation extends ToolInvocation {
+
+ private static Logger logger = Logger.getLogger(SshToolInvocation.class);
+
+ private SshUrl location = null;
+
+ private InputStream stdInputStream = null;
+
+ public static final String SSH_USE_CASE_INVOCATION_TYPE = "D0A4CDEB-DD10-4A8E-A49C-8871003083D8";
+ private String tmpname;
+ private final SshNode workerNode;
+ private final AskUserForPw askUserForPw;
+
+ private ChannelExec running;
+
+ private List<String> precedingCommands = new ArrayList<String>();
+
+ private final ByteArrayOutputStream stdout_buf = new ByteArrayOutputStream();
+ private final ByteArrayOutputStream stderr_buf = new ByteArrayOutputStream();
+
+ private static Map<String, Object> nodeLock = Collections
+ .synchronizedMap(new HashMap<String, Object>());
+
+ private static Map<String, Set<SshUrl>> runIdToTempDir = Collections
+ .synchronizedMap(new HashMap<String, Set<SshUrl>>());
+
+ private static String SSH_INVOCATION_FILE = "sshInvocations";
+
+ private final CredentialManager credentialManager;
+
+ public static String test(final SshNode workerNode,
+ final AskUserForPw askUserForPw) {
+ try {
+ Session sshSession = SshPool
+ .getSshSession(workerNode, askUserForPw);
+
+ ChannelSftp sftpTest = (ChannelSftp) sshSession.openChannel("sftp");
+ sftpTest.connect();
+ sftpTest.cd(workerNode.getDirectory());
+ sftpTest.disconnect();
+ sshSession.disconnect();
+ } catch (JSchException e) {
+ return e.toString();
+ } catch (SftpException e) {
+ return e.toString();
+ }
+ return null;
+ }
+
+ public SshToolInvocation(UseCaseDescription desc, SshNode workerNodeA,
+ AskUserForPw askUserForPwA, CredentialManager credentialManager)
+ throws JSchException, SftpException {
+ this.workerNode = workerNodeA;
+ this.credentialManager = credentialManager;
+
+ setRetrieveData(workerNodeA.isRetrieveData());
+ this.askUserForPw = askUserForPwA;
+ usecase = desc;
+
+ ChannelSftp sftp = SshPool.getSftpPutChannel(workerNode, askUserForPw);
+ synchronized (getNodeLock(workerNode)) {
+
+ logger.info("Changing remote directory to "
+ + workerNode.getDirectory());
+ sftp.cd(workerNode.getDirectory());
+ Random rnd = new Random();
+ while (true) {
+ tmpname = "usecase" + rnd.nextLong();
+ try {
+ sftp.lstat(workerNode.getDirectory() + tmpname);
+ continue;
+ } catch (Exception e) {
+ // file seems to not exist :)
+ }
+ sftp.mkdir(workerNode.getDirectory() + tmpname);
+ sftp.cd(workerNode.getDirectory() + tmpname);
+ break;
+ }
+ }
+ }
+
+ private static void recursiveDelete(ChannelSftp sftp, String path)
+ throws SftpException, JSchException {
+ Vector<?> entries = sftp.ls(path);
+ for (Object object : entries) {
+ LsEntry entry = (LsEntry) object;
+ if (entry.getFilename().equals(".")
+ || entry.getFilename().equals("..")) {
+ continue;
+ }
+ if (entry.getAttrs().isDir()) {
+ recursiveDelete(sftp, path + entry.getFilename() + "/");
+ } else {
+ sftp.rm(path + entry.getFilename());
+ }
+ }
+ sftp.rmdir(path);
+ }
+
+ private static void deleteDirectory(SshUrl directory,
+ CredentialManager credentialManager) throws InvocationException {
+ URI uri;
+ try {
+ uri = new URI(directory.toString());
+
+ ChannelSftp sftp;
+ SshNode workerNode;
+ String fullPath = uri.getPath();
+ String path = fullPath.substring(0, fullPath.lastIndexOf("/"));
+ String tempDir = fullPath.substring(fullPath.lastIndexOf("/"));
+ try {
+ workerNode = SshNodeFactory.getInstance().getSshNode(
+ uri.getHost(), uri.getPort(), path);
+
+ sftp = SshPool.getSftpPutChannel(workerNode,
+ new RetrieveLoginFromTaverna(workerNode.getUrl()
+ .toString(), credentialManager));
+ } catch (JSchException e) {
+ throw new InvocationException(e);
+ }
+ synchronized (getNodeLock(workerNode)) {
+ try {
+ sftp.cd(path);
+ recursiveDelete(sftp, path + "/" + tempDir + "/");
+ } catch (SftpException e) {
+ throw new InvocationException(e);
+ } catch (JSchException e) {
+ throw new InvocationException(e);
+ }
+ }
+ } catch (URISyntaxException e1) {
+ throw new InvocationException(e1);
+ }
+ }
+
+ public static void cleanup(String runId, CredentialManager credentialManager)
+ throws InvocationException {
+ Set<SshUrl> tempDirectories = runIdToTempDir.get(runId);
+ if (tempDirectories != null) {
+ for (SshUrl tempUrl : tempDirectories) {
+ deleteDirectory(tempUrl, credentialManager);
+ }
+ runIdToTempDir.remove(runId);
+ }
+ }
+
+ @Override
+ protected void submit_generate_job_inner() throws InvocationException {
+ tags.put("uniqueID", "" + getSubmissionID());
+ String command = usecase.getCommand();
+ for (String cur : tags.keySet()) {
+ command = command.replaceAll("\\Q%%" + cur + "%%\\E",
+ Matcher.quoteReplacement(tags.get(cur)));
+ }
+ String fullCommand = "cd " + workerNode.getDirectory() + tmpname;
+ for (String preceding : precedingCommands) {
+ fullCommand += " && " + preceding;
+ }
+ fullCommand += " && " + command;
+
+ logger.info("Full command is " + fullCommand);
+
+ try {
+ running = SshPool.openExecChannel(workerNode, askUserForPw);
+ running.setCommand(fullCommand);
+ running.setOutputStream(stdout_buf);
+ running.setErrStream(stderr_buf);
+ if (stdInputStream != null) {
+ running.setInputStream(stdInputStream);
+ }
+ running.connect();
+ } catch (JSchException e) {
+ throw new InvocationException(e);
+ }
+
+ }
+
+ @Override
+ public HashMap<String, Object> submit_wait_fetch_results(
+ ReferenceService referenceService) throws InvocationException {
+ while (!running.isClosed()) {
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ throw new InvocationException("Invocation interrupted:"
+ + e.getMessage());
+ }
+ }
+
+ int exitcode = running.getExitStatus();
+ if (!usecase.getValidReturnCodes().contains(exitcode)) {
+ try {
+ throw new InvocationException("Invalid exit code " + exitcode
+ + ":" + stderr_buf.toString("US-ASCII"));
+ } catch (UnsupportedEncodingException e) {
+ throw new InvocationException("Invalid exit code " + exitcode
+ + ":" + stderr_buf.toString());
+ }
+ }
+
+ HashMap<String, Object> results = new HashMap<String, Object>();
+
+ results.put("STDOUT", stdout_buf.toByteArray());
+ results.put("STDERR", stderr_buf.toByteArray());
+ try {
+ stdout_buf.close();
+ stderr_buf.close();
+ } catch (IOException e2) {
+ throw new InvocationException(e2);
+ }
+
+ try {
+ ChannelSftp sftp = SshPool.getSftpPutChannel(workerNode,
+ askUserForPw);
+ synchronized (getNodeLock(workerNode)) {
+ for (Map.Entry<String, ScriptOutput> cur : usecase.getOutputs()
+ .entrySet()) {
+ ScriptOutput scriptOutput = cur.getValue();
+ String fullPath = workerNode.getDirectory() + tmpname + "/"
+ + scriptOutput.getPath();
+ try {
+ if (sftp.stat(fullPath) != null) {
+ SshUrl url = new SshUrl(workerNode);
+ url.setSubDirectory(tmpname);
+ url.setFileName(scriptOutput.getPath());
+ if (scriptOutput.isBinary()) {
+ url.setDataNature(ReferencedDataNature.BINARY);
+ } else {
+ url.setDataNature(ReferencedDataNature.TEXT);
+ url.setCharset("UTF-8");
+ }
+ if (isRetrieveData()) {
+ SshReference urlRef = new SshReference(url);
+ InputStream is = urlRef.openStream(null);
+ AbstractExternalReference ref;
+ if (scriptOutput.isBinary()) {
+ ref = inlineByteArrayReferenceBuilder
+ .createReference(is, null);
+ } else {
+ ref = inlineStringReferenceBuilder
+ .createReference(is, null);
+ }
+ try {
+ is.close();
+ } catch (IOException e) {
+ throw new InvocationException(e);
+ }
+ results.put(cur.getKey(), ref);
+ } else {
+ results.put(cur.getKey(), url);
+ }
+ } else {
+ ErrorDocument ed = referenceService
+ .getErrorDocumentService().registerError(
+ "No result for " + cur.getKey(), 0,
+ getContext());
+ results.put(cur.getKey(), ed);
+ }
+ } catch (SftpException e) {
+ ErrorDocument ed = referenceService
+ .getErrorDocumentService().registerError(
+ "No result for " + cur.getKey(), 0,
+ getContext());
+ results.put(cur.getKey(), ed);
+
+ }
+ }
+ }
+ } catch (JSchException e1) {
+ throw new InvocationException(e1);
+ } catch (ErrorDocumentServiceException e) {
+ throw new InvocationException(e);
+ }
+
+ if (running != null) {
+ running.disconnect();
+ }
+ if (stdInputStream != null) {
+ try {
+ stdInputStream.close();
+ } catch (IOException e) {
+ throw new InvocationException(e);
+ }
+ }
+
+ if (isRetrieveData()) {
+ forgetRun();
+ deleteDirectory(location, credentialManager);
+
+ }
+ return results;
+ }
+
+ @Override
+ public String setOneInput(ReferenceService referenceService,
+ T2Reference t2Reference, ScriptInput input)
+ throws InvocationException {
+ String target = null;
+ String remoteName = null;
+ if (input.isFile()) {
+ remoteName = input.getTag();
+ } else if (input.isTempFile()) {
+ remoteName = "tempfile." + (nTempFiles++) + ".tmp";
+
+ }
+ if (input.isFile() || input.isTempFile()) {
+ SshReference sshRef = getAsSshReference(referenceService,
+ t2Reference, workerNode);
+ target = workerNode.getDirectory() + tmpname + "/" + remoteName;
+ logger.info("Target is " + target);
+ if (sshRef != null) {
+ if (!input.isForceCopy()) {
+ String linkCommand = workerNode.getLinkCommand();
+ if (linkCommand != null) {
+ String actualLinkCommand = getActualOsCommand(
+ linkCommand, sshRef.getFullPath(), remoteName,
+ target);
+ precedingCommands.add(actualLinkCommand);
+ return target;
+
+ }
+ }
+ String copyCommand = workerNode.getCopyCommand();
+ if (copyCommand != null) {
+ String actualCopyCommand = getActualOsCommand(copyCommand,
+ sshRef.getFullPath(), remoteName, target);
+ precedingCommands.add(actualCopyCommand);
+ return target;
+ }
+ }
+ try {
+ ChannelSftp sftp = SshPool.getSftpPutChannel(workerNode,
+ askUserForPw);
+ synchronized (getNodeLock(workerNode)) {
+ InputStream r = getAsStream(referenceService, t2Reference);
+ sftp.put(r, target);
+ r.close();
+ }
+ } catch (SftpException e) {
+ throw new InvocationException(e);
+ } catch (JSchException e) {
+ throw new InvocationException(e);
+ } catch (IOException e) {
+ throw new InvocationException(e);
+ }
+ return target;
+ } else {
+ String value = (String) referenceService.renderIdentifier(
+ t2Reference, String.class, this.getContext());
+ return value;
+
+ }
+ }
+
+ public SshReference getAsSshReference(ReferenceService referenceService,
+ T2Reference t2Reference, SshNode workerNode) {
+ Identified identified = referenceService.resolveIdentifier(t2Reference,
+ null, null);
+ if (identified instanceof ReferenceSet) {
+ for (ExternalReferenceSPI ref : ((ReferenceSet) identified)
+ .getExternalReferences()) {
+ if (ref instanceof SshReference) {
+ SshReference sshRef = (SshReference) ref;
+ if (sshRef.getHost().equals(workerNode.getHost())) {
+ return sshRef;
+ }
+ }
+ }
+ }
+ return null;
+ }
+
+ private static Object getNodeLock(final SshNode node) {
+ return getNodeLock(node.getHost());
+ }
+
+ private static synchronized Object getNodeLock(String hostName) {
+ if (!nodeLock.containsKey(hostName)) {
+ nodeLock.put(hostName, new Object());
+ }
+ return nodeLock.get(hostName);
+ }
+
+ @Override
+ public void setStdIn(ReferenceService referenceService,
+ T2Reference t2Reference) {
+ stdInputStream = new BufferedInputStream(getAsStream(referenceService,
+ t2Reference));
+ }
+
+ @Override
+ public void rememberRun(String runId) {
+ this.setRunId(runId);
+ Set<SshUrl> directories = runIdToTempDir.get(runId);
+ if (directories == null) {
+ directories = Collections.synchronizedSet(new HashSet<SshUrl>());
+ runIdToTempDir.put(runId, directories);
+ }
+ location = new SshUrl(workerNode);
+ location.setSubDirectory(tmpname);
+ directories.add(location);
+ }
+
+ private void forgetRun() {
+ Set<SshUrl> directories = runIdToTempDir.get(getRunId());
+ directories.remove(location);
+ }
+
+ public static void load(File directory) {
+ File invocationsFile = new File(directory, SSH_INVOCATION_FILE);
+ if (!invocationsFile.exists()) {
+ return;
+ }
+ BufferedReader reader = null;
+ try {
+ reader = new BufferedReader(new FileReader(invocationsFile));
+ String line = reader.readLine();
+ while (line != null) {
+ String[] parts = line.split(" ");
+ if (parts.length != 2) {
+ break;
+ }
+ String runId = parts[0];
+ String urlString = parts[1];
+ Set<SshUrl> urls = runIdToTempDir.get(runId);
+ if (urls == null) {
+ urls = new HashSet<SshUrl>();
+ runIdToTempDir.put(runId, urls);
+ }
+ URI uri = new URI(urlString);
+ String fullPath = uri.getPath();
+ String path = fullPath.substring(0, fullPath.lastIndexOf("/"));
+ String tempDir = fullPath.substring(fullPath.lastIndexOf("/"));
+ SshNode node = SshNodeFactory.getInstance().getSshNode(
+ uri.getHost(), uri.getPort(), path);
+ SshUrl newUrl = new SshUrl(node);
+ newUrl.setSubDirectory(tempDir);
+ urls.add(newUrl);
+ line = reader.readLine();
+ }
+ } catch (FileNotFoundException e) {
+ logger.error(e);
+ } catch (URISyntaxException e) {
+ logger.error(e);
+ } catch (IOException e) {
+ logger.error(e);
+ } finally {
+ if (reader != null) {
+ try {
+ reader.close();
+ } catch (IOException e) {
+ logger.error(e);
+ }
+ }
+ }
+ }
+
+ public static void persist(File directory) {
+ File invocationsFile = new File(directory, SSH_INVOCATION_FILE);
+ BufferedWriter writer = null;
+ try {
+ writer = new BufferedWriter(new FileWriter(invocationsFile));
+ for (String runId : runIdToTempDir.keySet()) {
+ for (SshUrl url : runIdToTempDir.get(runId)) {
+ writer.write(runId);
+ writer.write(" ");
+ writer.write(url.toString());
+ writer.newLine();
+ }
+ }
+ } catch (IOException e) {
+ logger.error(e);
+ } finally {
+ if (writer != null) {
+ try {
+ writer.close();
+ } catch (IOException e) {
+ logger.error(e);
+ }
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-taverna-common-activities/blob/7d28decd/taverna-external-tool-activity/src/main/java/org/apache/taverna/activities/externaltool/ssh/SshUseCaseInvocation.java
----------------------------------------------------------------------
diff --git a/taverna-external-tool-activity/src/main/java/org/apache/taverna/activities/externaltool/ssh/SshUseCaseInvocation.java b/taverna-external-tool-activity/src/main/java/org/apache/taverna/activities/externaltool/ssh/SshUseCaseInvocation.java
deleted file mode 100755
index 06300a8..0000000
--- a/taverna-external-tool-activity/src/main/java/org/apache/taverna/activities/externaltool/ssh/SshUseCaseInvocation.java
+++ /dev/null
@@ -1,560 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.taverna.activities.externaltool.ssh;
-
-import java.io.BufferedInputStream;
-import java.io.BufferedReader;
-import java.io.BufferedWriter;
-import java.io.ByteArrayOutputStream;
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.FileReader;
-import java.io.FileWriter;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.UnsupportedEncodingException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.Set;
-import java.util.Vector;
-import java.util.regex.Matcher;
-
-import org.apache.taverna.activities.externaltool.RetrieveLoginFromTaverna;
-import org.apache.taverna.activities.externaltool.desc.ScriptInput;
-import org.apache.taverna.activities.externaltool.desc.ScriptOutput;
-import org.apache.taverna.activities.externaltool.desc.UseCaseDescription;
-import org.apache.taverna.activities.externaltool.invocation.AskUserForPw;
-import org.apache.taverna.activities.externaltool.invocation.InvocationException;
-import org.apache.taverna.activities.externaltool.invocation.ToolInvocation;
-import org.apache.taverna.reference.AbstractExternalReference;
-import org.apache.taverna.reference.ErrorDocument;
-import org.apache.taverna.reference.ErrorDocumentServiceException;
-import org.apache.taverna.reference.ExternalReferenceSPI;
-import org.apache.taverna.reference.Identified;
-import org.apache.taverna.reference.ReferenceService;
-import org.apache.taverna.reference.ReferenceSet;
-import org.apache.taverna.reference.ReferencedDataNature;
-import org.apache.taverna.reference.T2Reference;
-import org.apache.taverna.security.credentialmanager.CredentialManager;
-
-import org.apache.log4j.Logger;
-
-import com.jcraft.jsch.ChannelExec;
-import com.jcraft.jsch.ChannelSftp;
-import com.jcraft.jsch.JSchException;
-import com.jcraft.jsch.Session;
-import com.jcraft.jsch.SftpException;
-import com.jcraft.jsch.ChannelSftp.LsEntry;
-
-/**
- * The job is executed by connecting to a worker pc using ssh, i.e. not via the
- * grid.
- *
- * @author Hajo Krabbenhoeft
- */
-public class SshUseCaseInvocation extends ToolInvocation {
-
- private static Logger logger = Logger.getLogger(SshUseCaseInvocation.class);
-
- private SshUrl location = null;
-
- private InputStream stdInputStream = null;
-
- public static final String SSH_USE_CASE_INVOCATION_TYPE = "D0A4CDEB-DD10-4A8E-A49C-8871003083D8";
- private String tmpname;
- private final SshNode workerNode;
- private final AskUserForPw askUserForPw;
-
- private ChannelExec running;
-
- private List<String> precedingCommands = new ArrayList<String>();
-
- private final ByteArrayOutputStream stdout_buf = new ByteArrayOutputStream();
- private final ByteArrayOutputStream stderr_buf = new ByteArrayOutputStream();
-
- private static Map<String, Object> nodeLock = Collections
- .synchronizedMap(new HashMap<String, Object>());
-
- private static Map<String, Set<SshUrl>> runIdToTempDir = Collections
- .synchronizedMap(new HashMap<String, Set<SshUrl>>());
-
- private static String SSH_INVOCATION_FILE = "sshInvocations";
-
- private final CredentialManager credentialManager;
-
- public static String test(final SshNode workerNode,
- final AskUserForPw askUserForPw) {
- try {
- Session sshSession = SshPool
- .getSshSession(workerNode, askUserForPw);
-
- ChannelSftp sftpTest = (ChannelSftp) sshSession.openChannel("sftp");
- sftpTest.connect();
- sftpTest.cd(workerNode.getDirectory());
- sftpTest.disconnect();
- sshSession.disconnect();
- } catch (JSchException e) {
- return e.toString();
- } catch (SftpException e) {
- return e.toString();
- }
- return null;
- }
-
- public SshUseCaseInvocation(UseCaseDescription desc, SshNode workerNodeA,
- AskUserForPw askUserForPwA, CredentialManager credentialManager)
- throws JSchException, SftpException {
- this.workerNode = workerNodeA;
- this.credentialManager = credentialManager;
-
- setRetrieveData(workerNodeA.isRetrieveData());
- this.askUserForPw = askUserForPwA;
- usecase = desc;
-
- ChannelSftp sftp = SshPool.getSftpPutChannel(workerNode, askUserForPw);
- synchronized (getNodeLock(workerNode)) {
-
- logger.info("Changing remote directory to "
- + workerNode.getDirectory());
- sftp.cd(workerNode.getDirectory());
- Random rnd = new Random();
- while (true) {
- tmpname = "usecase" + rnd.nextLong();
- try {
- sftp.lstat(workerNode.getDirectory() + tmpname);
- continue;
- } catch (Exception e) {
- // file seems to not exist :)
- }
- sftp.mkdir(workerNode.getDirectory() + tmpname);
- sftp.cd(workerNode.getDirectory() + tmpname);
- break;
- }
- }
- }
-
- private static void recursiveDelete(ChannelSftp sftp, String path)
- throws SftpException, JSchException {
- Vector<?> entries = sftp.ls(path);
- for (Object object : entries) {
- LsEntry entry = (LsEntry) object;
- if (entry.getFilename().equals(".")
- || entry.getFilename().equals("..")) {
- continue;
- }
- if (entry.getAttrs().isDir()) {
- recursiveDelete(sftp, path + entry.getFilename() + "/");
- } else {
- sftp.rm(path + entry.getFilename());
- }
- }
- sftp.rmdir(path);
- }
-
- private static void deleteDirectory(SshUrl directory,
- CredentialManager credentialManager) throws InvocationException {
- URI uri;
- try {
- uri = new URI(directory.toString());
-
- ChannelSftp sftp;
- SshNode workerNode;
- String fullPath = uri.getPath();
- String path = fullPath.substring(0, fullPath.lastIndexOf("/"));
- String tempDir = fullPath.substring(fullPath.lastIndexOf("/"));
- try {
- workerNode = SshNodeFactory.getInstance().getSshNode(
- uri.getHost(), uri.getPort(), path);
-
- sftp = SshPool.getSftpPutChannel(workerNode,
- new RetrieveLoginFromTaverna(workerNode.getUrl()
- .toString(), credentialManager));
- } catch (JSchException e) {
- throw new InvocationException(e);
- }
- synchronized (getNodeLock(workerNode)) {
- try {
- sftp.cd(path);
- recursiveDelete(sftp, path + "/" + tempDir + "/");
- } catch (SftpException e) {
- throw new InvocationException(e);
- } catch (JSchException e) {
- throw new InvocationException(e);
- }
- }
- } catch (URISyntaxException e1) {
- throw new InvocationException(e1);
- }
- }
-
- public static void cleanup(String runId, CredentialManager credentialManager)
- throws InvocationException {
- Set<SshUrl> tempDirectories = runIdToTempDir.get(runId);
- if (tempDirectories != null) {
- for (SshUrl tempUrl : tempDirectories) {
- deleteDirectory(tempUrl, credentialManager);
- }
- runIdToTempDir.remove(runId);
- }
- }
-
- @Override
- protected void submit_generate_job_inner() throws InvocationException {
- tags.put("uniqueID", "" + getSubmissionID());
- String command = usecase.getCommand();
- for (String cur : tags.keySet()) {
- command = command.replaceAll("\\Q%%" + cur + "%%\\E",
- Matcher.quoteReplacement(tags.get(cur)));
- }
- String fullCommand = "cd " + workerNode.getDirectory() + tmpname;
- for (String preceding : precedingCommands) {
- fullCommand += " && " + preceding;
- }
- fullCommand += " && " + command;
-
- logger.info("Full command is " + fullCommand);
-
- try {
- running = SshPool.openExecChannel(workerNode, askUserForPw);
- running.setCommand(fullCommand);
- running.setOutputStream(stdout_buf);
- running.setErrStream(stderr_buf);
- if (stdInputStream != null) {
- running.setInputStream(stdInputStream);
- }
- running.connect();
- } catch (JSchException e) {
- throw new InvocationException(e);
- }
-
- }
-
- @Override
- public HashMap<String, Object> submit_wait_fetch_results(
- ReferenceService referenceService) throws InvocationException {
- while (!running.isClosed()) {
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- throw new InvocationException("Invocation interrupted:"
- + e.getMessage());
- }
- }
-
- int exitcode = running.getExitStatus();
- if (!usecase.getValidReturnCodes().contains(exitcode)) {
- try {
- throw new InvocationException("Invalid exit code " + exitcode
- + ":" + stderr_buf.toString("US-ASCII"));
- } catch (UnsupportedEncodingException e) {
- throw new InvocationException("Invalid exit code " + exitcode
- + ":" + stderr_buf.toString());
- }
- }
-
- HashMap<String, Object> results = new HashMap<String, Object>();
-
- results.put("STDOUT", stdout_buf.toByteArray());
- results.put("STDERR", stderr_buf.toByteArray());
- try {
- stdout_buf.close();
- stderr_buf.close();
- } catch (IOException e2) {
- throw new InvocationException(e2);
- }
-
- try {
- ChannelSftp sftp = SshPool.getSftpPutChannel(workerNode,
- askUserForPw);
- synchronized (getNodeLock(workerNode)) {
- for (Map.Entry<String, ScriptOutput> cur : usecase.getOutputs()
- .entrySet()) {
- ScriptOutput scriptOutput = cur.getValue();
- String fullPath = workerNode.getDirectory() + tmpname + "/"
- + scriptOutput.getPath();
- try {
- if (sftp.stat(fullPath) != null) {
- SshUrl url = new SshUrl(workerNode);
- url.setSubDirectory(tmpname);
- url.setFileName(scriptOutput.getPath());
- if (scriptOutput.isBinary()) {
- url.setDataNature(ReferencedDataNature.BINARY);
- } else {
- url.setDataNature(ReferencedDataNature.TEXT);
- url.setCharset("UTF-8");
- }
- if (isRetrieveData()) {
- SshReference urlRef = new SshReference(url);
- InputStream is = urlRef.openStream(null);
- AbstractExternalReference ref;
- if (scriptOutput.isBinary()) {
- ref = inlineByteArrayReferenceBuilder
- .createReference(is, null);
- } else {
- ref = inlineStringReferenceBuilder
- .createReference(is, null);
- }
- try {
- is.close();
- } catch (IOException e) {
- throw new InvocationException(e);
- }
- results.put(cur.getKey(), ref);
- } else {
- results.put(cur.getKey(), url);
- }
- } else {
- ErrorDocument ed = referenceService
- .getErrorDocumentService().registerError(
- "No result for " + cur.getKey(), 0,
- getContext());
- results.put(cur.getKey(), ed);
- }
- } catch (SftpException e) {
- ErrorDocument ed = referenceService
- .getErrorDocumentService().registerError(
- "No result for " + cur.getKey(), 0,
- getContext());
- results.put(cur.getKey(), ed);
-
- }
- }
- }
- } catch (JSchException e1) {
- throw new InvocationException(e1);
- } catch (ErrorDocumentServiceException e) {
- throw new InvocationException(e);
- }
-
- if (running != null) {
- running.disconnect();
- }
- if (stdInputStream != null) {
- try {
- stdInputStream.close();
- } catch (IOException e) {
- throw new InvocationException(e);
- }
- }
-
- if (isRetrieveData()) {
- forgetRun();
- deleteDirectory(location, credentialManager);
-
- }
- return results;
- }
-
- @Override
- public String setOneInput(ReferenceService referenceService,
- T2Reference t2Reference, ScriptInput input)
- throws InvocationException {
- String target = null;
- String remoteName = null;
- if (input.isFile()) {
- remoteName = input.getTag();
- } else if (input.isTempFile()) {
- remoteName = "tempfile." + (nTempFiles++) + ".tmp";
-
- }
- if (input.isFile() || input.isTempFile()) {
- SshReference sshRef = getAsSshReference(referenceService,
- t2Reference, workerNode);
- target = workerNode.getDirectory() + tmpname + "/" + remoteName;
- logger.info("Target is " + target);
- if (sshRef != null) {
- if (!input.isForceCopy()) {
- String linkCommand = workerNode.getLinkCommand();
- if (linkCommand != null) {
- String actualLinkCommand = getActualOsCommand(
- linkCommand, sshRef.getFullPath(), remoteName,
- target);
- precedingCommands.add(actualLinkCommand);
- return target;
-
- }
- }
- String copyCommand = workerNode.getCopyCommand();
- if (copyCommand != null) {
- String actualCopyCommand = getActualOsCommand(copyCommand,
- sshRef.getFullPath(), remoteName, target);
- precedingCommands.add(actualCopyCommand);
- return target;
- }
- }
- try {
- ChannelSftp sftp = SshPool.getSftpPutChannel(workerNode,
- askUserForPw);
- synchronized (getNodeLock(workerNode)) {
- InputStream r = getAsStream(referenceService, t2Reference);
- sftp.put(r, target);
- r.close();
- }
- } catch (SftpException e) {
- throw new InvocationException(e);
- } catch (JSchException e) {
- throw new InvocationException(e);
- } catch (IOException e) {
- throw new InvocationException(e);
- }
- return target;
- } else {
- String value = (String) referenceService.renderIdentifier(
- t2Reference, String.class, this.getContext());
- return value;
-
- }
- }
-
- public SshReference getAsSshReference(ReferenceService referenceService,
- T2Reference t2Reference, SshNode workerNode) {
- Identified identified = referenceService.resolveIdentifier(t2Reference,
- null, null);
- if (identified instanceof ReferenceSet) {
- for (ExternalReferenceSPI ref : ((ReferenceSet) identified)
- .getExternalReferences()) {
- if (ref instanceof SshReference) {
- SshReference sshRef = (SshReference) ref;
- if (sshRef.getHost().equals(workerNode.getHost())) {
- return sshRef;
- }
- }
- }
- }
- return null;
- }
-
- private static Object getNodeLock(final SshNode node) {
- return getNodeLock(node.getHost());
- }
-
- private static synchronized Object getNodeLock(String hostName) {
- if (!nodeLock.containsKey(hostName)) {
- nodeLock.put(hostName, new Object());
- }
- return nodeLock.get(hostName);
- }
-
- @Override
- public void setStdIn(ReferenceService referenceService,
- T2Reference t2Reference) {
- stdInputStream = new BufferedInputStream(getAsStream(referenceService,
- t2Reference));
- }
-
- @Override
- public void rememberRun(String runId) {
- this.setRunId(runId);
- Set<SshUrl> directories = runIdToTempDir.get(runId);
- if (directories == null) {
- directories = Collections.synchronizedSet(new HashSet<SshUrl>());
- runIdToTempDir.put(runId, directories);
- }
- location = new SshUrl(workerNode);
- location.setSubDirectory(tmpname);
- directories.add(location);
- }
-
- private void forgetRun() {
- Set<SshUrl> directories = runIdToTempDir.get(getRunId());
- directories.remove(location);
- }
-
- public static void load(File directory) {
- File invocationsFile = new File(directory, SSH_INVOCATION_FILE);
- if (!invocationsFile.exists()) {
- return;
- }
- BufferedReader reader = null;
- try {
- reader = new BufferedReader(new FileReader(invocationsFile));
- String line = reader.readLine();
- while (line != null) {
- String[] parts = line.split(" ");
- if (parts.length != 2) {
- break;
- }
- String runId = parts[0];
- String urlString = parts[1];
- Set<SshUrl> urls = runIdToTempDir.get(runId);
- if (urls == null) {
- urls = new HashSet<SshUrl>();
- runIdToTempDir.put(runId, urls);
- }
- URI uri = new URI(urlString);
- String fullPath = uri.getPath();
- String path = fullPath.substring(0, fullPath.lastIndexOf("/"));
- String tempDir = fullPath.substring(fullPath.lastIndexOf("/"));
- SshNode node = SshNodeFactory.getInstance().getSshNode(
- uri.getHost(), uri.getPort(), path);
- SshUrl newUrl = new SshUrl(node);
- newUrl.setSubDirectory(tempDir);
- urls.add(newUrl);
- line = reader.readLine();
- }
- } catch (FileNotFoundException e) {
- logger.error(e);
- } catch (URISyntaxException e) {
- logger.error(e);
- } catch (IOException e) {
- logger.error(e);
- } finally {
- if (reader != null) {
- try {
- reader.close();
- } catch (IOException e) {
- logger.error(e);
- }
- }
- }
- }
-
- public static void persist(File directory) {
- File invocationsFile = new File(directory, SSH_INVOCATION_FILE);
- BufferedWriter writer = null;
- try {
- writer = new BufferedWriter(new FileWriter(invocationsFile));
- for (String runId : runIdToTempDir.keySet()) {
- for (SshUrl url : runIdToTempDir.get(runId)) {
- writer.write(runId);
- writer.write(" ");
- writer.write(url.toString());
- writer.newLine();
- }
- }
- } catch (IOException e) {
- logger.error(e);
- } finally {
- if (writer != null) {
- try {
- writer.close();
- } catch (IOException e) {
- logger.error(e);
- }
- }
- }
- }
-}