You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@taverna.apache.org by st...@apache.org on 2015/02/23 11:20:09 UTC
[02/26] incubator-taverna-server git commit: Revert "temporarily
empty repository"
http://git-wip-us.apache.org/repos/asf/incubator-taverna-server/blob/0b04b1ab/server-worker/src/main/java/org/taverna/server/localworker/impl/LocalWorker.java
----------------------------------------------------------------------
diff --git a/server-worker/src/main/java/org/taverna/server/localworker/impl/LocalWorker.java b/server-worker/src/main/java/org/taverna/server/localworker/impl/LocalWorker.java
new file mode 100644
index 0000000..adf3ea7
--- /dev/null
+++ b/server-worker/src/main/java/org/taverna/server/localworker/impl/LocalWorker.java
@@ -0,0 +1,769 @@
+/*
+ * Copyright (C) 2010-2012 The University of Manchester
+ *
+ * See the file "LICENSE" for license terms.
+ */
+package org.taverna.server.localworker.impl;
+
+import static java.lang.Runtime.getRuntime;
+import static java.lang.System.getProperty;
+import static java.lang.System.out;
+import static java.lang.management.ManagementFactory.getRuntimeMXBean;
+import static java.util.Arrays.asList;
+import static java.util.Collections.emptyList;
+import static java.util.Collections.singletonList;
+import static java.util.UUID.randomUUID;
+import static org.apache.commons.io.FileUtils.forceDelete;
+import static org.apache.commons.io.FileUtils.forceMkdir;
+import static org.apache.commons.io.FileUtils.writeByteArrayToFile;
+import static org.apache.commons.io.FileUtils.writeLines;
+import static org.taverna.server.localworker.api.Constants.HELIO_TOKEN_NAME;
+import static org.taverna.server.localworker.api.Constants.KEYSTORE_FILE;
+import static org.taverna.server.localworker.api.Constants.KEYSTORE_PASSWORD;
+import static org.taverna.server.localworker.api.Constants.SECURITY_DIR_NAME;
+import static org.taverna.server.localworker.api.Constants.SHARED_DIR_PROP;
+import static org.taverna.server.localworker.api.Constants.SUBDIR_LIST;
+import static org.taverna.server.localworker.api.Constants.SYSTEM_ENCODING;
+import static org.taverna.server.localworker.api.Constants.TRUSTSTORE_FILE;
+import static org.taverna.server.localworker.impl.utils.FilenameVerifier.getValidatedFile;
+import static org.taverna.server.localworker.remote.RemoteStatus.Finished;
+import static org.taverna.server.localworker.remote.RemoteStatus.Initialized;
+import static org.taverna.server.localworker.remote.RemoteStatus.Operating;
+import static org.taverna.server.localworker.remote.RemoteStatus.Stopped;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URL;
+import java.rmi.RemoteException;
+import java.rmi.server.UnicastRemoteObject;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.UUID;
+
+import org.taverna.server.localworker.api.Worker;
+import org.taverna.server.localworker.api.WorkerFactory;
+import org.taverna.server.localworker.remote.IllegalStateTransitionException;
+import org.taverna.server.localworker.remote.ImplementationException;
+import org.taverna.server.localworker.remote.RemoteDirectory;
+import org.taverna.server.localworker.remote.RemoteInput;
+import org.taverna.server.localworker.remote.RemoteListener;
+import org.taverna.server.localworker.remote.RemoteSecurityContext;
+import org.taverna.server.localworker.remote.RemoteSingleRun;
+import org.taverna.server.localworker.remote.RemoteStatus;
+import org.taverna.server.localworker.remote.StillWorkingOnItException;
+import org.taverna.server.localworker.server.UsageRecordReceiver;
+
+/**
+ * This class implements one side of the connection between the Taverna Server
+ * master server and this process. It delegates to a {@link Worker} instance the
+ * handling of actually running a workflow.
+ *
+ * @author Donal Fellows
+ * @see DirectoryDelegate
+ * @see FileDelegate
+ * @see WorkerCore
+ */
+@SuppressWarnings("serial")
+public class LocalWorker extends UnicastRemoteObject implements RemoteSingleRun {
+ // ----------------------- CONSTANTS -----------------------
+
+ /** The JVM's temporary directory, read from the <tt>java.io.tmpdir</tt> property. */
+ static final String SLASHTEMP;
+ /** Handle to the directory containing the security info. */
+ static final File SECURITY_DIR;
+ static {
+     SLASHTEMP = getProperty("java.io.tmpdir");
+     File userHome = new File(getProperty("user.home"));
+     // The security directory normally lives under $HOME. When $HOME is not
+     // writable (an odd deployment), fall back to a directory under the
+     // temporary directory named after the runtime MXBean name.
+     File parent = userHome.canWrite() ? userHome
+             : new File(SLASHTEMP, getRuntimeMXBean().getName());
+     SECURITY_DIR = new File(parent, SECURITY_DIR_NAME);
+ }
+
+ // ----------------------- VARIABLES -----------------------
+
+ /**
+ * Magic flag used to turn off problematic code when testing inside CI
+ * environment.
+ */
+ static boolean DO_MKDIR = true;
+
+ /** What to use to run a workflow engine. */
+ private final String executeWorkflowCommand;
+ /** What workflow to run. */
+ private final byte[] workflow;
+ /** The remote access object for the working directory. */
+ private final DirectoryDelegate baseDir;
+ /** What inputs to pass as files. */
+ final Map<String, String> inputFiles;
+ /** What inputs to pass as files (as file refs). */
+ final Map<String, File> inputRealFiles;
+ /** What inputs to pass as direct values. */
+ final Map<String, String> inputValues;
+ /** What delimiters to use. */
+ final Map<String, String> inputDelimiters;
+ /** The interface to the workflow engine subprocess. */
+ private final Worker core;
+ /** Our descriptor token (UUID). */
+ private final String masterToken;
+ /**
+ * The root working directory for a workflow run, or <tt>null</tt> if it has
+ * been deleted.
+ */
+ private File base;
+ /**
+ * When did this workflow start running, or <tt>null</tt> for
+ * "never/not yet".
+ */
+ private Date start;
+ /**
+ * When did this workflow finish running, or <tt>null</tt> for
+ * "never/not yet".
+ */
+ private Date finish;
+ /** The cached status of the workflow run. */
+ RemoteStatus status;
+ /**
+ * The name of the input Baclava document, or <tt>null</tt> to not do it
+ * that way.
+ */
+ String inputBaclava;
+ /**
+ * The name of the output Baclava document, or <tt>null</tt> to not do it
+ * that way.
+ */
+ String outputBaclava;
+ /**
+ * The file containing the input Baclava document, or <tt>null</tt> to not
+ * do it that way.
+ */
+ private File inputBaclavaFile;
+ /**
+ * The file containing the output Baclava document, or <tt>null</tt> to not
+ * do it that way.
+ */
+ private File outputBaclavaFile;
+ /**
+ * Registered shutdown hook so that we clean up when this process is killed
+ * off, or <tt>null</tt> if that is no longer necessary.
+ */
+ Thread shutdownHook;
+ /** Location for security information to be written to. */
+ File securityDirectory;
+ /**
+ * Password to use to encrypt security information.
+ */
+ char[] keystorePassword = KEYSTORE_PASSWORD;
+ /** Additional server-specified environment settings. */
+ Map<String, String> environment = new HashMap<>();
+ /** Additional server-specified java runtime settings. */
+ List<String> runtimeSettings = new ArrayList<>();
+ /** Interaction feed URL, as last given to {@link #setInteractionServiceDetails(URL, URL, URL)}. */
+ URL interactionFeedURL;
+ /** WebDAV URL, as last given to {@link #setInteractionServiceDetails(URL, URL, URL)}. */
+ URL webdavURL;
+ /** Publication URL, as last given to {@link #setInteractionServiceDetails(URL, URL, URL)}. */
+ URL publishURL;//FIXME
+ /**
+ * Whether provenance generation was requested (defaults to <tt>true</tt>);
+ * changed by {@link #setGenerateProvenance(boolean)} and handed to the
+ * worker core when the worker is created.
+ */
+ private boolean doProvenance = true;
+
+ // ----------------------- METHODS -----------------------
+
+ /**
+ * Set up the local state for one workflow run: create its working
+ * directory (and standard subdirectories), instantiate the worker core, and
+ * register a JVM shutdown hook that destroys the run when this process
+ * exits. The run starts in the <tt>Initialized</tt> state.
+ *
+ * @param executeWorkflowCommand
+ * The script used to execute workflows.
+ * @param workflow
+ * The workflow to execute.
+ * @param urReceiver
+ * The remote class to report the generated usage record(s) to.
+ * @param id
+ * The UUID to use, or <tt>null</tt> if we are to invent one.
+ * @param seedEnvironment
+ * The key/value pairs to seed the worker subprocess environment
+ * with.
+ * @param javaParams
+ * Parameters to pass to the worker subprocess java runtime
+ * itself.
+ * @param workerFactory
+ * How to make instances of the low-level worker objects.
+ * @throws RemoteException
+ * If registration of the worker fails.
+ * @throws ImplementationException
+ * If the working directory cannot be created or the worker
+ * core cannot be instantiated.
+ */
+ protected LocalWorker(String executeWorkflowCommand, byte[] workflow,
+ UsageRecordReceiver urReceiver, UUID id,
+ Map<String, String> seedEnvironment, List<String> javaParams,
+ WorkerFactory workerFactory) throws RemoteException,
+ ImplementationException {
+ super();
+ if (id == null)
+ id = randomUUID();
+ masterToken = id.toString();
+ this.workflow = workflow;
+ this.executeWorkflowCommand = executeWorkflowCommand;
+ // The run directory is named after the token and lives in the shared
+ // directory if that property is set, else in the temporary directory.
+ String sharedDir = getProperty(SHARED_DIR_PROP, SLASHTEMP);
+ base = new File(sharedDir, masterToken);
+ out.println("about to create " + base);
+ try {
+ forceMkdir(base);
+ // NOTE(review): the result of mkdir() is not checked, so a failure
+ // to create a subdirectory goes unreported here.
+ for (String subdir : SUBDIR_LIST) {
+ new File(base, subdir).mkdir();
+ }
+ } catch (IOException e) {
+ throw new ImplementationException(
+ "problem creating run working directory", e);
+ }
+ baseDir = new DirectoryDelegate(base, null);
+ inputFiles = new HashMap<>();
+ inputRealFiles = new HashMap<>();
+ inputValues = new HashMap<>();
+ inputDelimiters = new HashMap<>();
+ // Copy the caller's settings rather than aliasing their collections.
+ environment.putAll(seedEnvironment);
+ runtimeSettings.addAll(javaParams);
+ // NOTE(review): if the factory fails, the working directory created
+ // above is left behind; nothing in this constructor removes it.
+ try {
+ core = workerFactory.makeInstance();
+ } catch (Exception e) {
+ out.println("problem when creating core worker implementation");
+ e.printStackTrace(out);
+ throw new ImplementationException(
+ "problem when creating core worker implementation", e);
+ }
+ core.setURReceiver(urReceiver);
+ Thread t = new Thread(new Runnable() {
+ /**
+ * Kill off the worker launched by the core.
+ */
+ @Override
+ public void run() {
+ try {
+ // Clear the field first so that destroy() does not try to
+ // deregister this hook while it is running.
+ shutdownHook = null;
+ destroy();
+ } catch (ImplementationException e) {
+ // Absolutely nothing we can do here
+ }
+ }
+ });
+ getRuntime().addShutdownHook(t);
+ shutdownHook = t;
+ status = Initialized;
+ }
+
+ /**
+  * Tear down this run: kill the workflow subprocess (if one is running),
+  * deregister the shutdown hook, delete the working directory and the
+  * security directory, and release the worker core's local resources.
+  * <p>
+  * Every cleanup step is attempted even if an earlier one fails, so that a
+  * failure to delete the working directory cannot leave the security
+  * directory (which may hold keystores) behind. The first failure is
+  * reported once all steps have run; later failures are attached to it as
+  * suppressed exceptions.
+  *
+  * @throws ImplementationException
+  *             If any of the cleanup steps failed.
+  */
+ @Override
+ public void destroy() throws ImplementationException {
+     killWorkflowSubprocess();
+     ImplementationException failure = null;
+     try {
+         removeFromShutdownHooks();
+     } catch (ImplementationException e) {
+         failure = e;
+     }
+     try {
+         deleteWorkingDirectory();
+     } catch (ImplementationException e) {
+         if (failure == null)
+             failure = e;
+         else
+             failure.addSuppressed(e);
+     }
+     try {
+         deleteSecurityManagerDirectory();
+     } catch (ImplementationException e) {
+         if (failure == null)
+             failure = e;
+         else
+             failure.addSuppressed(e);
+     }
+     try {
+         core.deleteLocalResources();
+     } catch (RuntimeException e) {
+         // Only mask this failure if there is an earlier one to report.
+         if (failure == null)
+             throw e;
+         failure.addSuppressed(e);
+     }
+     if (failure != null)
+         throw failure;
+ }
+
+ /**
+  * Ask the worker core to kill the workflow subprocess, unless the cached
+  * status says the run never started or has already finished. Records the
+  * finish time if none was recorded before. Problems are logged to standard
+  * output and otherwise ignored.
+  */
+ private void killWorkflowSubprocess() {
+     if (status == Finished || status == Initialized)
+         return;
+     try {
+         core.killWorker();
+         if (finish == null)
+             finish = new Date();
+     } catch (Exception problem) {
+         out.println("problem when killing worker");
+         problem.printStackTrace(out);
+     }
+ }
+
+ private void removeFromShutdownHooks() throws ImplementationException {
+ try {
+ if (shutdownHook != null)
+ getRuntime().removeShutdownHook(shutdownHook);
+ } catch (RuntimeException e) {
+ throw new ImplementationException("problem removing shutdownHook",
+ e);
+ } finally {
+ shutdownHook = null;
+ }
+ }
+
+ private void deleteWorkingDirectory() throws ImplementationException {
+ try {
+ if (base != null)
+ forceDelete(base);
+ } catch (IOException e) {
+ out.println("problem deleting working directory");
+ e.printStackTrace(out);
+ throw new ImplementationException(
+ "problem deleting working directory", e);
+ } finally {
+ base = null;
+ }
+ }
+
+ private void deleteSecurityManagerDirectory()
+ throws ImplementationException {
+ try {
+ if (securityDirectory != null)
+ forceDelete(securityDirectory);
+ } catch (IOException e) {
+ out.println("problem deleting security directory");
+ e.printStackTrace(out);
+ throw new ImplementationException(
+ "problem deleting security directory", e);
+ } finally {
+ securityDirectory = null;
+ }
+ }
+
+ /**
+ * Adding listeners is not supported by this implementation.
+ *
+ * @param listener
+ * Ignored.
+ * @throws ImplementationException
+ * Always.
+ */
+ @Override
+ public void addListener(RemoteListener listener) throws RemoteException,
+ ImplementationException {
+ throw new ImplementationException("not implemented");
+ }
+
+ /**
+ * @return The name of the input Baclava document as it was given to
+ * {@link #setInputBaclavaFile(String)}, or <tt>null</tt> if inputs
+ * are not supplied that way.
+ */
+ @Override
+ public String getInputBaclavaFile() {
+ return inputBaclava;
+ }
+
+ /**
+  * List the inputs registered so far. A fresh {@link InputDelegate} is
+  * created for every known input name on each call.
+  *
+  * @return One remote input handle per registered input name.
+  * @throws RemoteException
+  *             If a delegate cannot be created.
+  */
+ @Override
+ public List<RemoteInput> getInputs() throws RemoteException {
+     List<RemoteInput> handles = new ArrayList<>(inputFiles.size());
+     for (String inputName : inputFiles.keySet()) {
+         handles.add(new InputDelegate(inputName));
+     }
+     return handles;
+ }
+
+ /**
+ * @return An empty list: no listener types can be manufactured (see
+ * {@link #makeListener(String, String)}, which always fails).
+ */
+ @Override
+ public List<String> getListenerTypes() {
+ return emptyList();
+ }
+
+ /**
+ * @return A single-element list holding the worker core's default
+ * listener.
+ */
+ @Override
+ public List<RemoteListener> getListeners() {
+ return singletonList(core.getDefaultListener());
+ }
+
+ /**
+ * @return The name of the output Baclava document as it was given to
+ * {@link #setOutputBaclavaFile(String)}, or <tt>null</tt> if
+ * outputs are not written that way.
+ */
+ @Override
+ public String getOutputBaclavaFile() {
+ return outputBaclava;
+ }
+
+ class SecurityDelegate extends UnicastRemoteObject implements
+ RemoteSecurityContext {
+ private void setPrivatePerms(File dir) {
+ if (!dir.setReadable(false, false) || !dir.setReadable(true, true)
+ || !dir.setExecutable(false, false)
+ || !dir.setExecutable(true, true)
+ || !dir.setWritable(false, false)
+ || !dir.setWritable(true, true)) {
+ out.println("warning: "
+ + "failed to set permissions on security context directory");
+ }
+ }
+
+ protected SecurityDelegate(String token) throws IOException {
+ super();
+ if (DO_MKDIR) {
+ securityDirectory = new File(SECURITY_DIR, token);
+ forceMkdir(securityDirectory);
+ setPrivatePerms(securityDirectory);
+ }
+ }
+
+ /**
+ * Write some data to a given file in the context directory.
+ *
+ * @param name
+ * The name of the file to write.
+ * @param data
+ * The bytes to put in the file.
+ * @throws RemoteException
+ * If anything goes wrong.
+ * @throws ImplementationException
+ */
+ protected void write(String name, byte[] data) throws RemoteException,
+ ImplementationException {
+ try {
+ File f = new File(securityDirectory, name);
+ writeByteArrayToFile(f, data);
+ } catch (IOException e) {
+ throw new ImplementationException("problem writing " + name, e);
+ }
+ }
+
+ /**
+ * Write some data to a given file in the context directory.
+ *
+ * @param name
+ * The name of the file to write.
+ * @param data
+ * The lines to put in the file. The
+ * {@linkplain LocalWorker#SYSTEM_ENCODING system encoding}
+ * will be used to do the writing.
+ * @throws RemoteException
+ * If anything goes wrong.
+ * @throws ImplementationException
+ */
+ protected void write(String name, Collection<String> data)
+ throws RemoteException, ImplementationException {
+ try {
+ File f = new File(securityDirectory, name);
+ writeLines(f, SYSTEM_ENCODING, data);
+ } catch (IOException e) {
+ throw new ImplementationException("problem writing " + name, e);
+ }
+ }
+
+ /**
+ * Write some data to a given file in the context directory.
+ *
+ * @param name
+ * The name of the file to write.
+ * @param data
+ * The line to put in the file. The
+ * {@linkplain LocalWorker#SYSTEM_ENCODING system encoding}
+ * will be used to do the writing.
+ * @throws RemoteException
+ * If anything goes wrong.
+ * @throws ImplementationException
+ */
+ protected void write(String name, char[] data) throws RemoteException,
+ ImplementationException {
+ try {
+ File f = new File(securityDirectory, name);
+ writeLines(f, SYSTEM_ENCODING, asList(new String(data)));
+ } catch (IOException e) {
+ throw new ImplementationException("problem writing " + name, e);
+ }
+ }
+
+ @Override
+ public void setKeystore(byte[] keystore) throws RemoteException,
+ ImplementationException {
+ if (status != Initialized)
+ throw new RemoteException("not initializing");
+ if (keystore == null)
+ throw new IllegalArgumentException("keystore may not be null");
+ write(KEYSTORE_FILE, keystore);
+ }
+
+ @Override
+ public void setPassword(char[] password) throws RemoteException {
+ if (status != Initialized)
+ throw new RemoteException("not initializing");
+ if (password == null)
+ throw new IllegalArgumentException("password may not be null");
+ keystorePassword = password.clone();
+ }
+
+ @Override
+ public void setTruststore(byte[] truststore) throws RemoteException,
+ ImplementationException {
+ if (status != Initialized)
+ throw new RemoteException("not initializing");
+ if (truststore == null)
+ throw new IllegalArgumentException("truststore may not be null");
+ write(TRUSTSTORE_FILE, truststore);
+ }
+
+ @Override
+ public void setUriToAliasMap(Map<URI, String> uriToAliasMap)
+ throws RemoteException {
+ if (status != Initialized)
+ throw new RemoteException("not initializing");
+ if (uriToAliasMap == null)
+ return;
+ ArrayList<String> lines = new ArrayList<>();
+ for (Entry<URI, String> site : uriToAliasMap.entrySet())
+ lines.add(site.getKey().toASCIIString() + " " + site.getValue());
+ // write(URI_ALIAS_MAP, lines);
+ }
+
+ @Override
+ public void setHelioToken(String helioToken) throws RemoteException {
+ if (status != Initialized)
+ throw new RemoteException("not initializing");
+ out.println("registering HELIO CIS token for export");
+ environment.put(HELIO_TOKEN_NAME, helioToken);
+ }
+ }
+
+ @Override
+ public RemoteSecurityContext getSecurityContext() throws RemoteException,
+ ImplementationException {
+ try {
+ return new SecurityDelegate(masterToken);
+ } catch (RemoteException e) {
+ if (e.getCause() != null)
+ throw new ImplementationException(
+ "problem initializing security context", e.getCause());
+ throw e;
+ } catch (IOException e) {
+ throw new ImplementationException(
+ "problem initializing security context", e);
+ }
+ }
+
+ /**
+  * Report the run's status. Only the <tt>Operating</tt> state can change
+  * without a request, so only then is the worker core consulted; the
+  * finish time is recorded the first time the core reports
+  * <tt>Finished</tt>.
+  *
+  * @return The (refreshed) cached status.
+  */
+ @Override
+ public RemoteStatus getStatus() {
+     if (status != Operating)
+         return status;
+     status = core.getWorkerStatus();
+     if (status == Finished && finish == null)
+         finish = new Date();
+     return status;
+ }
+
+ /**
+ * @return The remote access object for the run's working directory,
+ * created when this run was constructed.
+ */
+ @Override
+ public RemoteDirectory getWorkingDirectory() {
+ return baseDir;
+ }
+
+ /**
+  * Resolve a slash-separated name against the run's working directory,
+  * delegating the actual validation to the filename verifier.
+  *
+  * @param filename
+  *            The name to validate; must not be <tt>null</tt>.
+  * @return The validated file.
+  * @throws IllegalArgumentException
+  *             If the name is <tt>null</tt> or fails validation.
+  */
+ File validateFilename(String filename) throws RemoteException {
+     if (filename == null)
+         throw new IllegalArgumentException("filename must be non-null");
+     String[] pathParts = filename.split("/");
+     try {
+         return getValidatedFile(base, pathParts);
+     } catch (IOException problem) {
+         throw new IllegalArgumentException("failed to validate filename",
+                 problem);
+     }
+ }
+
+ /**
+  * Remote handle for a single named workflow input. The state itself is
+  * held in the enclosing run's input maps; this object only knows the name.
+  * An input is supplied either as a file or as a direct value, never both,
+  * and setting either one cancels any input Baclava document.
+  */
+ class InputDelegate extends UnicastRemoteObject implements RemoteInput {
+     /** The name of the input this handle refers to. */
+     private final String name;
+
+     /**
+      * Make a handle for the named input, registering the name if it is
+      * not yet known.
+      *
+      * @param name
+      *            The name of the input.
+      * @throws RemoteException
+      *             If the object cannot be exported.
+      * @throws IllegalStateException
+      *             If the name is new and the run is no longer
+      *             initializing.
+      */
+     InputDelegate(String name) throws RemoteException {
+         super();
+         this.name = name;
+         if (!inputFiles.containsKey(name)) {
+             if (status != Initialized)
+                 throw new IllegalStateException("not initializing");
+             inputFiles.put(name, null);
+             inputRealFiles.put(name, null);
+             inputValues.put(name, null);
+             inputDelimiters.put(name, null);
+         }
+     }
+
+     /** @return The file name supplying this input, or <tt>null</tt>. */
+     @Override
+     public String getFile() {
+         return inputFiles.get(name);
+     }
+
+     /** @return The name of this input. */
+     @Override
+     public String getName() {
+         return name;
+     }
+
+     /** @return The direct value of this input, or <tt>null</tt>. */
+     @Override
+     public String getValue() {
+         return inputValues.get(name);
+     }
+
+     /** @return The list delimiter for this input, or <tt>null</tt>. */
+     @Override
+     public String getDelimiter() throws RemoteException {
+         return inputDelimiters.get(name);
+     }
+
+     /**
+      * Supply this input from a file in the working directory. Clears any
+      * direct value and any input Baclava document.
+      *
+      * @param file
+      *            The slash-separated file name, which is validated.
+      * @throws IllegalStateException
+      *             If the run is no longer initializing.
+      */
+     @Override
+     public void setFile(String file) throws RemoteException {
+         if (status != Initialized)
+             throw new IllegalStateException("not initializing");
+         inputRealFiles.put(name, validateFilename(file));
+         inputValues.put(name, null);
+         inputFiles.put(name, file);
+         inputBaclava = null;
+     }
+
+     /**
+      * Supply this input as a direct value. Clears any file setting and
+      * any input Baclava document.
+      *
+      * @param value
+      *            The value of the input.
+      * @throws IllegalStateException
+      *             If the run is no longer initializing.
+      */
+     @Override
+     public void setValue(String value) throws RemoteException {
+         if (status != Initialized)
+             throw new IllegalStateException("not initializing");
+         inputValues.put(name, value);
+         inputFiles.put(name, null);
+         inputRealFiles.put(name, null);
+         inputBaclava = null;
+     }
+
+     /**
+      * Set the character used to split this input into a list.
+      *
+      * @param delimiter
+      *            <tt>null</tt> for no splitting, otherwise exactly one
+      *            non-NUL ASCII character.
+      * @throws IllegalStateException
+      *             If the run is no longer initializing, an input Baclava
+      *             document is in use, or the delimiter is empty, longer
+      *             than one character, NUL or non-ASCII.
+      */
+     @Override
+     public void setDelimiter(String delimiter) throws RemoteException {
+         if (status != Initialized)
+             throw new IllegalStateException("not initializing");
+         if (inputBaclava != null)
+             throw new IllegalStateException("input baclava file set");
+         if (delimiter != null) {
+             // Reject "" explicitly; charAt(0) below would otherwise fail
+             // with an unhelpful StringIndexOutOfBoundsException.
+             if (delimiter.isEmpty())
+                 throw new IllegalStateException(
+                         "empty delimiter not permitted");
+             if (delimiter.length() > 1)
+                 throw new IllegalStateException(
+                         "multi-character delimiter not permitted");
+             if (delimiter.charAt(0) == 0)
+                 throw new IllegalStateException(
+                         "may not use NUL for splitting");
+             if (delimiter.charAt(0) > 127)
+                 throw new IllegalStateException(
+                         "only ASCII characters supported for splitting");
+         }
+         inputDelimiters.put(name, delimiter);
+     }
+ }
+
+ /**
+ * Get a handle for the named input, registering the name if it is new
+ * (see the {@link InputDelegate} constructor).
+ *
+ * @param name
+ * The name of the input.
+ * @return A new remote handle for that input.
+ */
+ @Override
+ public RemoteInput makeInput(String name) throws RemoteException {
+ return new InputDelegate(name);
+ }
+
+ /**
+ * Creating listeners is not supported by this implementation.
+ *
+ * @param type
+ * Ignored.
+ * @param configuration
+ * Ignored.
+ * @throws RemoteException
+ * Always.
+ */
+ @Override
+ public RemoteListener makeListener(String type, String configuration)
+ throws RemoteException {
+ throw new RemoteException("listener manufacturing unsupported");
+ }
+
+ /**
+  * Supply all inputs from a single Baclava document in the working
+  * directory. Any per-input file or value settings made earlier are
+  * cleared (the input names themselves stay registered).
+  *
+  * @param filename
+  *            The slash-separated name of the Baclava file, which is
+  *            validated before anything is changed.
+  * @throws IllegalStateException
+  *             If the run is no longer initializing.
+  */
+ @Override
+ public void setInputBaclavaFile(String filename) throws RemoteException {
+     if (status != Initialized)
+         throw new IllegalStateException("not initializing");
+     inputBaclavaFile = validateFilename(filename);
+     // Overwriting the values of existing keys is safe while iterating.
+     for (String inputName : inputFiles.keySet()) {
+         inputFiles.put(inputName, null);
+         inputRealFiles.put(inputName, null);
+         inputValues.put(inputName, null);
+     }
+     inputBaclava = filename;
+ }
+
+ @Override
+ public void setOutputBaclavaFile(String filename) throws RemoteException {
+ if (status != Initialized)
+ throw new IllegalStateException("not initializing");
+ if (filename != null)
+ outputBaclavaFile = validateFilename(filename);
+ else
+ outputBaclavaFile = null;
+ outputBaclava = filename;
+ }
+
+ /**
+ * Record whether provenance generation is wanted; the flag is handed to
+ * the worker core when the worker is created.
+ *
+ * @param prov
+ * <tt>true</tt> to request provenance generation.
+ */
+ @Override
+ public void setGenerateProvenance(boolean prov) {
+ doProvenance = prov;
+ }
+
+ /**
+ * Request a state transition for this run. Asking for the state that the
+ * cached status already holds is a no-op. Note that the decision is based
+ * on the cached status field, which is not refreshed from the worker core
+ * here.
+ * <ul>
+ * <li>To <tt>Initialized</tt>: never allowed.</li>
+ * <li>To <tt>Operating</tt>: from <tt>Initialized</tt> the worker is
+ * created; from <tt>Stopped</tt> the worker is restarted; not allowed from
+ * <tt>Finished</tt>.</li>
+ * <li>To <tt>Stopped</tt>: only from <tt>Operating</tt>, by stopping the
+ * worker.</li>
+ * <li>To <tt>Finished</tt>: from <tt>Operating</tt> or <tt>Stopped</tt>
+ * the worker is killed and the finish time recorded; from
+ * <tt>Initialized</tt> only the status is changed.</li>
+ * </ul>
+ *
+ * @param newStatus
+ * The state to move to.
+ * @throws IllegalStateTransitionException
+ * If the transition is not permitted.
+ * @throws ImplementationException
+ * If the worker core fails to carry out the transition.
+ * @throws StillWorkingOnItException
+ * If worker creation has begun but not completed; the status
+ * is left unchanged in that case.
+ */
+ @Override
+ public void setStatus(RemoteStatus newStatus)
+ throws IllegalStateTransitionException, RemoteException,
+ ImplementationException, StillWorkingOnItException {
+ if (status == newStatus)
+ return;
+
+ switch (newStatus) {
+ case Initialized:
+ throw new IllegalStateTransitionException(
+ "may not move back to start");
+ case Operating:
+ switch (status) {
+ case Initialized:
+ boolean started;
+ try {
+ started = createWorker();
+ } catch (Exception e) {
+ throw new ImplementationException(
+ "problem creating executing workflow", e);
+ }
+ // Thrown before the status assignment below, so the run stays
+ // Initialized.
+ if (!started)
+ throw new StillWorkingOnItException(
+ "workflow start in process");
+ break;
+ case Stopped:
+ try {
+ core.startWorker();
+ } catch (Exception e) {
+ throw new ImplementationException(
+ "problem continuing workflow run", e);
+ }
+ break;
+ case Finished:
+ throw new IllegalStateTransitionException("already finished");
+ default:
+ break;
+ }
+ status = Operating;
+ break;
+ case Stopped:
+ switch (status) {
+ case Initialized:
+ throw new IllegalStateTransitionException(
+ "may only stop from Operating");
+ case Operating:
+ try {
+ core.stopWorker();
+ } catch (Exception e) {
+ throw new ImplementationException(
+ "problem stopping workflow run", e);
+ }
+ break;
+ case Finished:
+ throw new IllegalStateTransitionException("already finished");
+ default:
+ break;
+ }
+ status = Stopped;
+ break;
+ case Finished:
+ switch (status) {
+ case Operating:
+ case Stopped:
+ try {
+ core.killWorker();
+ if (finish == null)
+ finish = new Date();
+ } catch (Exception e) {
+ throw new ImplementationException(
+ "problem killing workflow run", e);
+ }
+ // Falls through to default, which only breaks.
+ default:
+ break;
+ }
+ status = Finished;
+ break;
+ }
+ }
+
+ /**
+ * Record the start time and hand the whole run configuration to the
+ * worker core.
+ * <p>
+ * NOTE(review): the caller retries after a <tt>false</tt> result (the run
+ * stays Initialized), and on such a retry the password passed here will
+ * be <tt>null</tt> and the start time is overwritten; confirm that
+ * <tt>initWorker</tt> tolerates repeated calls.
+ *
+ * @return Whatever <tt>core.initWorker</tt> returns; the caller treats
+ * <tt>false</tt> as "start still in progress".
+ * @throws Exception
+ * If the worker core fails to initialise the worker.
+ */
+ private boolean createWorker() throws Exception {
+ start = new Date();
+ // Detach the password from this object before handing it over.
+ char[] pw = keystorePassword;
+ keystorePassword = null;
+ /*
+ * Do not clear the keystorePassword array here; its ownership is
+ * *transferred* to the worker core which doesn't copy it but *does*
+ * clear it after use.
+ */
+ return core.initWorker(this, executeWorkflowCommand, workflow, base,
+ inputBaclavaFile, inputRealFiles, inputValues, inputDelimiters,
+ outputBaclavaFile, securityDirectory, pw, doProvenance,
+ environment, masterToken, runtimeSettings);
+ }
+
+ /**
+  * @return A copy of the time at which the run finished, or <tt>null</tt>
+  *         if no finish time has been recorded.
+  */
+ @Override
+ public Date getFinishTimestamp() {
+     if (finish == null)
+         return null;
+     return new Date(finish.getTime());
+ }
+
+ /**
+  * @return A copy of the time at which the run was started, or
+  *         <tt>null</tt> if no start time has been recorded.
+  */
+ @Override
+ public Date getStartTimestamp() {
+     if (start == null)
+         return null;
+     return new Date(start.getTime());
+ }
+
+ /**
+ * Record the URLs of the interaction service for this run. The values are
+ * stored as given, without validation.
+ *
+ * @param feed
+ * The interaction feed URL.
+ * @param webdav
+ * The WebDAV URL.
+ * @param publish
+ * The publication URL.
+ */
+ @Override
+ public void setInteractionServiceDetails(URL feed, URL webdav, URL publish) {
+ interactionFeedURL = feed;
+ webdavURL = webdav;
+ publishURL = publish;
+ }
+
+ /**
+ * Deliberately does nothing; returning normally is the whole result.
+ * Presumably used by the caller to check that this object is still
+ * reachable.
+ */
+ @Override
+ public void ping() {
+ // Do nothing here; this *should* be empty
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-taverna-server/blob/0b04b1ab/server-worker/src/main/java/org/taverna/server/localworker/impl/TavernaRunManager.java
----------------------------------------------------------------------
diff --git a/server-worker/src/main/java/org/taverna/server/localworker/impl/TavernaRunManager.java b/server-worker/src/main/java/org/taverna/server/localworker/impl/TavernaRunManager.java
new file mode 100644
index 0000000..03ee69d
--- /dev/null
+++ b/server-worker/src/main/java/org/taverna/server/localworker/impl/TavernaRunManager.java
@@ -0,0 +1,243 @@
+/*
+ * Copyright (C) 2010-2011 The University of Manchester
+ *
+ * See the file "LICENSE" for license terms.
+ */
+package org.taverna.server.localworker.impl;
+
+import static java.lang.Runtime.getRuntime;
+import static java.lang.System.exit;
+import static java.lang.System.getProperty;
+import static java.lang.System.out;
+import static java.lang.System.setProperty;
+import static java.lang.System.setSecurityManager;
+import static java.rmi.registry.LocateRegistry.getRegistry;
+import static org.taverna.server.localworker.api.Constants.DEATH_DELAY;
+import static org.taverna.server.localworker.api.Constants.LOCALHOST;
+import static org.taverna.server.localworker.api.Constants.RMI_HOST_PROP;
+import static org.taverna.server.localworker.api.Constants.SECURITY_POLICY_FILE;
+import static org.taverna.server.localworker.api.Constants.SEC_POLICY_PROP;
+import static org.taverna.server.localworker.api.Constants.UNSECURE_PROP;
+
+import java.io.ByteArrayInputStream;
+import java.net.URI;
+import java.rmi.RMISecurityManager;
+import java.rmi.RemoteException;
+import java.rmi.registry.Registry;
+import java.rmi.server.UnicastRemoteObject;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import org.taverna.server.localworker.api.RunAccounting;
+import org.taverna.server.localworker.api.Worker;
+import org.taverna.server.localworker.api.WorkerFactory;
+import org.taverna.server.localworker.remote.RemoteRunFactory;
+import org.taverna.server.localworker.remote.RemoteSingleRun;
+import org.taverna.server.localworker.server.UsageRecordReceiver;
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+
+import uk.org.taverna.scufl2.api.io.WorkflowBundleIO;
+
+/**
+ * The registered factory for runs, this class is responsible for constructing
+ * runs that are suitable for particular users. It is also the entry point for
+ * this whole process.
+ *
+ * @author Donal Fellows
+ * @see LocalWorker
+ */
+@SuppressWarnings("serial")
+public class TavernaRunManager extends UnicastRemoteObject implements
+ RemoteRunFactory, RunAccounting, WorkerFactory {
+ String command;
+ Map<String, String> seedEnvironment = new HashMap<>();
+ List<String> javaInitParams = new ArrayList<>();
+ private WorkflowBundleIO io;
+ private int activeRuns = 0;
+ // Hacks!
+ public static String interactionHost;
+ public static String interactionPort;
+ public static String interactionWebdavPath;
+ public static String interactionFeedPath;
+
+ /**
+  * How to get the actual workflow document from the XML document that it is
+  * contained in. The workflow is taken to be the first <em>element</em>
+  * child of the container's root; text (e.g. indentation whitespace) and
+  * comment nodes that precede it are skipped rather than being cast to an
+  * element.
+  *
+  * @param containerDocument
+  *            The document sent from the web interface.
+  * @return The element describing the workflow, as expected by the Taverna
+  *         command line executor, or <tt>null</tt> if the root has no
+  *         element children.
+  */
+ protected Element unwrapWorkflow(Document containerDocument) {
+     for (org.w3c.dom.Node child = containerDocument.getDocumentElement()
+             .getFirstChild(); child != null; child = child
+             .getNextSibling()) {
+         if (child instanceof Element)
+             return (Element) child;
+     }
+     return null;
+ }
+
+ private static final String usage = "java -jar server.worker.jar workflowExecScript ?-Ekey=val...? ?-Jconfig? UUID";
+
+ /**
+ * An RMI-enabled factory for runs. Remembers the command used to execute
+ * workflows and creates the workflow bundle reader used to inspect
+ * submitted workflows.
+ *
+ * @param command
+ * What command to call to actually run a run.
+ * @throws RemoteException
+ * If anything goes wrong during creation of the instance.
+ */
+ public TavernaRunManager(String command) throws RemoteException {
+ this.command = command;
+ this.io = new WorkflowBundleIO();
+ }
+
+ /**
+  * Create a new run for a workflow. The workflow bytes are parsed as a
+  * workflow bundle first, so that its identifier can be logged, and the
+  * run is then built with this factory's seed environment and java
+  * parameters.
+  *
+  * @param workflow
+  *            The serialized workflow bundle.
+  * @param creator
+  *            Who is creating the run; must not be <tt>null</tt>.
+  * @param urReceiver
+  *            Where the run should report usage records.
+  * @param id
+  *            The UUID for the run, or <tt>null</tt> to invent one.
+  * @return The remote handle for the new run.
+  * @throws RemoteException
+  *             If there is no creator, or if reading the workflow or
+  *             constructing the run fails.
+  */
+ @Override
+ public RemoteSingleRun make(byte[] workflow, String creator,
+         UsageRecordReceiver urReceiver, UUID id) throws RemoteException {
+     if (creator == null)
+         throw new RemoteException("no creator");
+     try {
+         ByteArrayInputStream bundleBytes = new ByteArrayInputStream(
+                 workflow);
+         URI workflowId = io.readBundle(bundleBytes, null)
+                 .getMainWorkflow().getWorkflowIdentifier();
+         out.println("Creating run from workflow <" + workflowId
+                 + "> for <" + creator + ">");
+         return new LocalWorker(command, workflow, urReceiver, id,
+                 seedEnvironment, javaInitParams, this);
+     } catch (RemoteException passThrough) {
+         throw passThrough;
+     } catch (Exception problem) {
+         throw new RemoteException("bad instance construction", problem);
+     }
+ }
+
+ private static boolean shuttingDown;
+ private static String factoryName;
+ private static Registry registry;
+
+ /**
+  * Remove this process's factory from the RMI registry. Only the first
+  * call does anything; problems while unbinding are printed and otherwise
+  * ignored.
+  */
+ static synchronized void unregisterFactory() {
+     if (shuttingDown)
+         return;
+     shuttingDown = true;
+     if (factoryName == null || registry == null)
+         return;
+     try {
+         registry.unbind(factoryName);
+     } catch (Exception problem) {
+         problem.printStackTrace(out);
+     }
+ }
+
+ /**
+ * Shut this factory process down: unregister from the RMI registry
+ * straight away, then exit the JVM from a background thread after a
+ * delay (see {@link DelayedDeath}), so that this call itself can return.
+ */
+ @Override
+ public void shutdown() {
+ unregisterFactory();
+ new Thread(new DelayedDeath()).start();
+ }
+
+ /**
+  * Task that waits for <tt>DEATH_DELAY</tt> and then terminates the JVM
+  * with exit code 0. The exit happens even if the wait is cut short.
+  */
+ static class DelayedDeath implements Runnable {
+     @Override
+     public void run() {
+         try {
+             Thread.sleep(DEATH_DELAY);
+         } catch (InterruptedException ignored) {
+             // Woken early: fall through and exit right away.
+         } finally {
+             exit(0);
+         }
+     }
+ }
+
+ /**
+  * Process one optional command line argument:
+  * <ul>
+  * <li><tt>-Ekey=val</tt> adds an environment definition;</li>
+  * <li><tt>-Dkey=val</tt> is passed on whole as a java parameter;</li>
+  * <li><tt>-Jconfig</tt> passes on <tt>config</tt> as a java parameter.</li>
+  * </ul>
+  *
+  * @param arg
+  *            The argument to process.
+  * @throws IllegalArgumentException
+  *             If the argument has none of the supported forms.
+  */
+ private void addArgument(String arg) {
+     if (arg.startsWith("-J")) {
+         addJavaParameter(arg.substring(2));
+         return;
+     }
+     if (arg.startsWith("-D") && arg.indexOf('=') > 0) {
+         addJavaParameter(arg);
+         return;
+     }
+     if (arg.startsWith("-E")) {
+         String definition = arg.substring(2);
+         int eq = definition.indexOf('=');
+         // The key must be non-empty, hence "> 0" rather than ">= 0".
+         if (eq > 0) {
+             addEnvironmentDefinition(definition.substring(0, eq),
+                     definition.substring(eq + 1));
+             return;
+         }
+     }
+     throw new IllegalArgumentException("argument \"" + arg
+             + "\" must start with -D, -E or -J; "
+             + "-D and -E must contain a \"=\"");
+ }
+
+ /**
+  * Entry point: build the run factory from the command line, bind it in
+  * the local RMI registry under the name given as the last argument, and
+  * arrange for it to be unbound when the JVM shuts down.
+  *
+  * @param args
+  *            The arguments from the command line invocation: the workflow
+  *            execution script, any number of <tt>-E</tt>/<tt>-D</tt>/
+  *            <tt>-J</tt> options, and finally the factory name.
+  * @throws Exception
+  *             If we can't connect to the RMI registry, or if we can't read
+  *             the workflow, or if we can't build the worker instance, or
+  *             register it. Also if the arguments are wrong, or if the
+  *             security policy file cannot be found.
+  */
+ public static void main(String[] args) throws Exception {
+     if (args.length < 2)
+         throw new Exception("wrong # args: must be \"" + usage + "\"");
+     if (!getProperty(UNSECURE_PROP, "no").equals("yes")) {
+         java.net.URL policy = LocalWorker.class.getClassLoader()
+                 .getResource(SECURITY_POLICY_FILE);
+         // Fail with a clear message instead of a bare
+         // NullPointerException when the policy is not packaged.
+         if (policy == null)
+             throw new IllegalStateException("security policy file \""
+                     + SECURITY_POLICY_FILE + "\" not found on classpath");
+         setProperty(SEC_POLICY_PROP, policy.toExternalForm());
+         setProperty(RMI_HOST_PROP, LOCALHOST);
+     }
+     setSecurityManager(new RMISecurityManager());
+     factoryName = args[args.length - 1];
+     TavernaRunManager man = new TavernaRunManager(args[0]);
+     // Everything between the script and the factory name is an option.
+     for (int i = 1; i < args.length - 1; i++)
+         man.addArgument(args[i]);
+     registry = getRegistry(LOCALHOST);
+
+     registry.bind(factoryName, man);
+     getRuntime().addShutdownHook(new Thread() {
+         @Override
+         public void run() {
+             unregisterFactory();
+         }
+     });
+     out.println("registered RemoteRunFactory with ID " + factoryName);
+ }
+
+ /**
+ * Remember a parameter to pass to the java runtime of every run created
+ * by this factory.
+ *
+ * @param string
+ * The parameter, stored as given.
+ */
+ private void addJavaParameter(String string) {
+ this.javaInitParams.add(string);
+ }
+
+ /**
+ * Remember an environment setting to seed every run created by this
+ * factory with; a later definition of the same key replaces an earlier
+ * one.
+ *
+ * @param key
+ * The name of the setting.
+ * @param value
+ * The value of the setting.
+ */
+ private void addEnvironmentDefinition(String key, String value) {
+ this.seedEnvironment.put(key, value);
+ }
+
+ /**
+ * Record where the interaction service lives. The values are stored in
+ * static fields, so they are shared by the whole process rather than
+ * belonging to this instance.
+ *
+ * @param host
+ * The interaction service host.
+ * @param port
+ * The interaction service port.
+ * @param webdavPath
+ * The WebDAV path.
+ * @param feedPath
+ * The feed path.
+ * @throws IllegalArgumentException
+ * If any argument is <tt>null</tt>.
+ */
+ @Override
+ public void setInteractionServiceDetails(String host, String port,
+ String webdavPath, String feedPath) throws RemoteException {
+ if (host == null || port == null || webdavPath == null
+ || feedPath == null)
+ throw new IllegalArgumentException("all params must be non-null");
+ interactionHost = host;
+ interactionPort = port;
+ interactionWebdavPath = webdavPath;
+ interactionFeedPath = feedPath;
+ }
+
+ /**
+  * @return The number of runs currently counted as active, never less than
+  *         zero even if more runs have been reported ceased than started.
+  */
+ @Override
+ public synchronized int countOperatingRuns() {
+     return Math.max(activeRuns, 0);
+ }
+
+ /** Note that a run has started, by incrementing the active run counter. */
+ @Override
+ public synchronized void runStarted() {
+ activeRuns++;
+ }
+
+ /**
+ * Note that a run has ceased, by decrementing the active run counter. The
+ * counter is not clamped here; {@link #countOperatingRuns()} hides
+ * negative values.
+ */
+ @Override
+ public synchronized void runCeased() {
+ activeRuns--;
+ }
+
+ /**
+ * Make a new low-level worker, passing it this manager.
+ *
+ * @return A new {@link WorkerCore}.
+ * @throws Exception
+ * If the worker core cannot be constructed.
+ */
+ @Override
+ public Worker makeInstance() throws Exception {
+ return new WorkerCore(this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-taverna-server/blob/0b04b1ab/server-worker/src/main/java/org/taverna/server/localworker/impl/WorkerCore.java
----------------------------------------------------------------------
diff --git a/server-worker/src/main/java/org/taverna/server/localworker/impl/WorkerCore.java b/server-worker/src/main/java/org/taverna/server/localworker/impl/WorkerCore.java
new file mode 100644
index 0000000..1a6cff8
--- /dev/null
+++ b/server-worker/src/main/java/org/taverna/server/localworker/impl/WorkerCore.java
@@ -0,0 +1,918 @@
+/*
+ * Copyright (C) 2010-2012 The University of Manchester
+ *
+ * See the file "LICENSE" for license terms.
+ */
+package org.taverna.server.localworker.impl;
+
+import static java.io.File.createTempFile;
+import static java.io.File.pathSeparator;
+import static java.lang.Boolean.parseBoolean;
+import static java.lang.Double.parseDouble;
+import static java.lang.Integer.parseInt;
+import static java.lang.Long.parseLong;
+import static java.lang.Runtime.getRuntime;
+import static java.lang.System.out;
+import static java.net.InetAddress.getLocalHost;
+import static org.apache.commons.io.FileUtils.forceDelete;
+import static org.apache.commons.io.FileUtils.sizeOfDirectory;
+import static org.apache.commons.io.FileUtils.write;
+import static org.apache.commons.io.IOUtils.copy;
+import static org.taverna.server.localworker.api.Constants.CREDENTIAL_MANAGER_DIRECTORY;
+import static org.taverna.server.localworker.api.Constants.CREDENTIAL_MANAGER_PASSWORD;
+import static org.taverna.server.localworker.api.Constants.DEATH_TIME;
+import static org.taverna.server.localworker.api.Constants.DEFAULT_LISTENER_NAME;
+import static org.taverna.server.localworker.api.Constants.KEYSTORE_PASSWORD;
+import static org.taverna.server.localworker.api.Constants.START_WAIT_TIME;
+import static org.taverna.server.localworker.api.Constants.SYSTEM_ENCODING;
+import static org.taverna.server.localworker.api.Constants.TIME;
+import static org.taverna.server.localworker.impl.Status.Aborted;
+import static org.taverna.server.localworker.impl.Status.Completed;
+import static org.taverna.server.localworker.impl.Status.Failed;
+import static org.taverna.server.localworker.impl.Status.Held;
+import static org.taverna.server.localworker.impl.Status.Started;
+import static org.taverna.server.localworker.impl.TavernaRunManager.interactionFeedPath;
+import static org.taverna.server.localworker.impl.TavernaRunManager.interactionHost;
+import static org.taverna.server.localworker.impl.TavernaRunManager.interactionPort;
+import static org.taverna.server.localworker.impl.TavernaRunManager.interactionWebdavPath;
+import static org.taverna.server.localworker.impl.WorkerCore.pmap;
+import static org.taverna.server.localworker.remote.RemoteStatus.Finished;
+import static org.taverna.server.localworker.remote.RemoteStatus.Initialized;
+import static org.taverna.server.localworker.remote.RemoteStatus.Operating;
+import static org.taverna.server.localworker.remote.RemoteStatus.Stopped;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.io.UnsupportedEncodingException;
+import java.io.Writer;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.rmi.RemoteException;
+import java.rmi.server.UnicastRemoteObject;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import javax.xml.datatype.DatatypeConfigurationException;
+import javax.xml.ws.Holder;
+
+import org.ogf.usage.JobUsageRecord;
+import org.taverna.server.localworker.api.RunAccounting;
+import org.taverna.server.localworker.api.Worker;
+import org.taverna.server.localworker.impl.utils.TimingOutTask;
+import org.taverna.server.localworker.remote.ImplementationException;
+import org.taverna.server.localworker.remote.RemoteListener;
+import org.taverna.server.localworker.remote.RemoteStatus;
+import org.taverna.server.localworker.server.UsageRecordReceiver;
+
+/**
+ * The core class that connects to a Taverna command-line workflow execution
+ * engine. This implementation always registers a single listener, &ldquo;
+ * <tt>io</tt> &rdquo;, with two properties representing the stdout and stderr of
+ * the run and one representing the exit code. The listener is
+ * remote-accessible. It does not support attaching any other listeners.
+ *
+ * @author Donal Fellows
+ */
+@SuppressWarnings("serial")
+public class WorkerCore extends UnicastRemoteObject implements Worker,
+ RemoteListener {
+ @Nonnull
+ static final Map<String, Property> pmap = new HashMap<>();
+ /**
+ * Regular expression to extract the detailed timing information from the
+ * output of /usr/bin/time
+ */
+ @Nonnull
+ private static final Pattern TimeRE;
+ static {
+ final String TIMERE = "([0-9.:]+)";
+ final String TERMS = "(real|user|system|sys|elapsed)";
+ TimeRE = Pattern.compile(TIMERE + " *" + TERMS + "[ \t]*" + TIMERE
+ + " *" + TERMS + "[ \t]*" + TIMERE + " *" + TERMS);
+ }
+
+ /**
+ * Environment variables to remove before any fork (because they're large or
+ * potentially leaky).
+ */
+ // TODO Conduct a proper survey of what to remove
+ @Nonnull
+ private static final String[] ENVIRONMENT_TO_REMOVE = { "SUDO_COMMAND",
+ "SUDO_USER", "SUDO_GID", "SUDO_UID", "DISPLAY", "LS_COLORS",
+ "XFILESEARCHPATH", "SSH_AGENT_PID", "SSH_AUTH_SOCK" };
+
+ /** The workflow process; stays <tt>null</tt> until the subprocess has been started. */
+ @Nullable
+ Process subprocess;
+ /** Buffer that receives everything copied from the subprocess's standard output. */
+ @Nonnull
+ final StringWriter stdout;
+ /** Buffer that receives everything copied from the subprocess's standard error. */
+ @Nonnull
+ final StringWriter stderr;
+ /** Exit code of the subprocess, recorded by {@code setExitCode}; <tt>null</tt> until then. */
+ @Nullable
+ Integer exitCode;
+ /** Set once the run has finished; also writable through the listener property interface. */
+ boolean readyToSendEmail;
+ /** Notification address, supplied through the listener property interface. */
+ @Nullable
+ String emailAddress;
+ /** Timestamp taken immediately after the subprocess was launched. */
+ @Nullable
+ Date start;
+ /** Told when this run starts and when it ceases. */
+ @Nonnull
+ final RunAccounting accounting;
+ /** Holds the process ID parsed from the subprocess's first line of output, if any. */
+ @Nonnull
+ final Holder<Integer> pid;
+
+ /** Whether the subprocess is known to have terminated (or has been killed). */
+ private boolean finished;
+ /** The usage record assembled by {@code buildUR} once the run is over. */
+ @Nullable
+ private JobUsageRecord ur;
+ /** The working directory handed to the process builder. */
+ @Nullable
+ private File wd;
+ /** Destination for the marshalled usage record, if one has been registered. */
+ @Nullable
+ private UsageRecordReceiver urreceiver;
+ /** File removed by {@code deleteLocalResources} when it is set. */
+ @Nullable
+ private File workflowFile;
+ /** Whether the subprocess has been sent STOP without a following CONT. */
+ private boolean stopped;
+
+ /**
+  * Creates a worker that has not yet launched any subprocess.
+  *
+  * @param accounting
+  *            Object that keeps count of how many runs are executing.
+  * @throws RemoteException
+  *             If exporting this object for remote access fails.
+  */
+ public WorkerCore(@Nonnull RunAccounting accounting) throws RemoteException {
+     super();
+     this.accounting = accounting;
+     this.pid = new Holder<>();
+     this.stdout = new StringWriter();
+     this.stderr = new StringWriter();
+ }
+
+ /**
+  * @return The subprocess's process ID, or -1 if none has been recorded
+  *         yet.
+  */
+ private int getPID() {
+     synchronized (pid) {
+         Integer known = pid.value;
+         return (known == null) ? -1 : known.intValue();
+     }
+ }
+
+ /**
+ * Fire up the workflow. This causes a transition into the operating state.
+ *
+ * @param executeWorkflowCommand
+ * The command to run to execute the workflow.
+ * @param workflow
+ * The workflow document to execute.
+ * @param workingDir
+ * What directory to use as the working directory.
+ * @param inputBaclava
+ * The baclava file to use for inputs, or <tt>null</tt> to use
+ * the other <b>input*</b> arguments' values.
+ * @param inputFiles
+ * A mapping of input names to files that supply them. Note that
+ * we assume that nothing mapped here will be mapped in
+ * <b>inputValues</b>.
+ * @param inputValues
+ * A mapping of input names to values to supply to them. Note
+ * that we assume that nothing mapped here will be mapped in
+ * <b>inputFiles</b>.
+ * @param outputBaclava
+ * What baclava file to write the output from the workflow into,
+ * or <tt>null</tt> to have it written into the <tt>out</tt>
+ * subdirectory.
+ * @param token
+ * The name of the workflow run.
+ * @return <tt>true</tt> if the worker started, or <tt>false</tt> if a
+ * timeout occurred.
+ * @throws IOException
+ * If any of quite a large number of things goes wrong.
+ */
+ @Override
+ public boolean initWorker(
+ @Nonnull final LocalWorker local,
+ @Nonnull final String executeWorkflowCommand,
+ @Nonnull final byte[] workflow,
+ @Nonnull final File workingDir,
+ @Nullable final File inputBaclava,
+ @Nonnull final Map<String, File> inputFiles,
+ @Nonnull final Map<String, String> inputValues,
+ @Nonnull final Map<String, String> inputDelimiters,
+ @Nullable final File outputBaclava,
+ @Nonnull final File securityDir,
+ @Nullable final char[] password,
+ final boolean generateProvenance,
+ @Nonnull final Map<String, String> environment,
+ @Nullable final String token,
+ @Nonnull final List<String> runtime) throws IOException {
+ try {
+ new TimingOutTask() {
+ @Override
+ public void doIt() throws IOException {
+ startExecutorSubprocess(
+ createProcessBuilder(local, executeWorkflowCommand,
+ workflow, workingDir, inputBaclava,
+ inputFiles, inputValues, inputDelimiters,
+ outputBaclava, securityDir, password,
+ generateProvenance, environment, token,
+ runtime), password);
+ }
+ }.doOrTimeOut(START_WAIT_TIME);
+ } catch (IOException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ return subprocess != null;
+ }
+
+ /**
+  * Launches the configured subprocess, notes its start time, tells the
+  * accounting object that a run has started, and attaches the helper
+  * threads that drain its output streams (and feed it the password, when
+  * there is one).
+  *
+  * @param pb
+  *            The fully configured process builder.
+  * @param password
+  *            The password to write to the subprocess's stdin, or
+  *            <tt>null</tt> to write nothing.
+  * @throws IOException
+  *             If the process cannot be created.
+  */
+ private void startExecutorSubprocess(@Nonnull ProcessBuilder pb,
+         @Nullable char[] password) throws IOException {
+     // Say what is about to be launched
+     StringBuilder announcement = new StringBuilder("starting ");
+     announcement.append(pb.command()).append(" in directory ")
+             .append(pb.directory()).append(" with environment ")
+             .append(pb.environment());
+     out.println(announcement);
+
+     Process launched = pb.start();
+     subprocess = launched;
+     if (launched == null)
+         throw new IOException("unknown failure creating process");
+     start = new Date();
+     accounting.runStarted();
+
+     // Only the stdout copier is given the holder for the process ID
+     new AsyncCopy(launched.getInputStream(), stdout, pid);
+     new AsyncCopy(launched.getErrorStream(), stderr);
+     if (password != null)
+         new PasswordWriterThread(launched, password);
+ }
+
+ /**
+ * Assemble the process builder. Does not launch the subprocess.
+ *
+ * @param local
+ * The local worker container.
+ * @param executeWorkflowCommand
+ * The reference to the workflow engine implementation.
+ * @param workflow
+ * The workflow to execute.
+ * @param workingDir
+ * The working directory to use.
+ * @param inputBaclava
+ * What file to read a baclava document from (or <tt>null</tt>)
+ * @param inputFiles
+ * The mapping from inputs to files.
+ * @param inputValues
+ * The mapping from inputs to literal values.
+ * @param outputBaclava
+ * What file to write a baclava document to (or <tt>null</tt>)
+ * @param securityDir
+ * The credential manager directory.
+ * @param password
+ * The password for the credential manager.
+ * @param environment
+ * The seed environment
+ * @param token
+ * The run identifier that the server wants to use.
+ * @param runtime
+ * Any runtime parameters to Java.
+ * @return The configured process builder.
+ * @throws IOException
+ * If file handling fails
+ * @throws UnsupportedEncodingException
+ * If we can't encode any text (unlikely)
+ * @throws FileNotFoundException
+ * If we can't write the workflow out (unlikely)
+ */
+ @Nonnull
+ ProcessBuilder createProcessBuilder(@Nonnull LocalWorker local,
+ @Nonnull String executeWorkflowCommand, @Nonnull byte[] workflow,
+ @Nonnull File workingDir, @Nullable File inputBaclava,
+ @Nonnull Map<String, File> inputFiles,
+ @Nonnull Map<String, String> inputValues,
+ @Nonnull Map<String, String> inputDelimiters,
+ @Nullable File outputBaclava, @Nonnull File securityDir,
+ @Nonnull char[] password, boolean generateProvenance,
+ @Nonnull Map<String, String> environment, @Nonnull String token,
+ @Nonnull List<String> runtime) throws IOException,
+ UnsupportedEncodingException, FileNotFoundException {
+ ProcessBuilder pb = new ProcessBuilder();
+ pb.command().add(TIME);
+ /*
+ * WARNING! HERE THERE BE DRAGONS! BE CAREFUL HERE!
+ *
+ * Work around _Maven_ bug with permissions in zip files! The executable
+ * bit is stripped by Maven's handling of file permissions, and there's
+ * no practical way to work around it without massively increasing the
+ * pain in other ways. Only want this on Unix - Windows isn't affected
+ * by this - so we use the file separator as a proxy for whether this is
+ * a true POSIX system. Ugly! Ugly ugly ugly...
+ *
+ * http://jira.codehaus.org/browse/MASSEMBLY-337 is relevant, but not
+ * the whole story as we don't want to use a non-standard packaging
+ * method as there's a real chance of it going wrong in an unexpected
+ * way then. Other parts of the story are that the executable bit isn't
+ * preserved when unpacking with the dependency plugin, and there's no
+ * way to be sure that the servlet container will preserve the bit
+ * either (as that's probably using a Java-based ZIP engine).
+ */
+ if (File.separatorChar == '/')
+ pb.command().add("/bin/sh");
+ pb.command().add(executeWorkflowCommand);
+ if (runtime != null)
+ pb.command().addAll(runtime);
+
+ // Enable verbose logging
+ pb.command().add("-logfile");
+ pb.command().add(
+ new File(new File(workingDir, "logs"), "detail.log")
+ .getAbsolutePath());
+
+ if (securityDir != null) {
+ pb.command().add(CREDENTIAL_MANAGER_DIRECTORY);
+ pb.command().add(securityDir.getAbsolutePath());
+ out.println("security dir location: " + securityDir);
+ }
+ if (password != null) {
+ pb.command().add(CREDENTIAL_MANAGER_PASSWORD);
+ out.println("password of length " + password.length
+ + " will be written to subprocess stdin");
+ }
+
+ // Add arguments denoting inputs
+ if (inputBaclava != null) {
+ pb.command().add("-inputdoc");
+ pb.command().add(inputBaclava.getAbsolutePath());
+ if (!inputBaclava.exists())
+ throw new IOException("input baclava file doesn't exist");
+ } else {
+ for (Entry<String, File> port : inputFiles.entrySet()) {
+ if (port.getValue() == null)
+ continue;
+ pb.command().add("-inputfile");
+ pb.command().add(port.getKey());
+ pb.command().add(port.getValue().getAbsolutePath());
+ if (!port.getValue().exists())
+ throw new IOException("input file for port \"" + port
+ + "\" doesn't exist");
+ }
+ for (Entry<String, String> port : inputValues.entrySet()) {
+ if (port.getValue() == null)
+ continue;
+ pb.command().add("-inputfile");
+ pb.command().add(port.getKey());
+ File f = createTempFile(".tav_in_", null, workingDir);
+ pb.command().add(f.getAbsolutePath());
+ write(f, port.getValue(), "UTF-8");
+ }
+ for (Entry<String, String> delim : inputDelimiters.entrySet()) {
+ if (delim.getValue() == null)
+ continue;
+ pb.command().add("-inputdelimiter");
+ pb.command().add(delim.getKey());
+ pb.command().add(delim.getValue());
+ }
+ }
+
+ // Add arguments denoting outputs
+ if (outputBaclava != null) {
+ pb.command().add("-outputdoc");
+ pb.command().add(outputBaclava.getAbsolutePath());
+ if (!outputBaclava.getParentFile().exists())
+ throw new IOException(
+ "parent directory of output baclava file does not exist");
+ if (outputBaclava.exists())
+ throw new IOException("output baclava file exists");
+ // Provenance cannot be supported when using baclava output
+ } else {
+ File out = new File(workingDir, "out");
+ if (!out.mkdir())
+ throw new IOException("failed to make output directory \"out\"");
+ // Taverna needs the dir to *not* exist now
+ forceDelete(out);
+ pb.command().add("-outputdir");
+ pb.command().add(out.getAbsolutePath());
+ // Enable provenance generation
+ if (generateProvenance) {
+ pb.command().add("-embedded");
+ pb.command().add("-provenance");
+ pb.command().add("-provbundle");
+ pb.command().add("out.bundle.zip");
+ }
+ }
+
+ // Add an argument holding the workflow
+ File tmp = createTempFile(".wf_", ".scufl2", workingDir);
+ try (OutputStream os = new FileOutputStream(tmp)) {
+ os.write(workflow);
+ }
+ pb.command().add(workflowFile.getAbsolutePath());
+
+ // Indicate what working directory to use
+ pb.directory(workingDir);
+ wd = workingDir;
+
+ Map<String, String> env = pb.environment();
+ for (String name : ENVIRONMENT_TO_REMOVE)
+ env.remove(name);
+
+ // Merge any options we have had imposed on us from outside
+ env.putAll(environment);
+
+ // Patch the environment to deal with TAVUTILS-17
+ assert env.get("PATH") != null;
+ env.put("PATH", new File(System.getProperty("java.home"), "bin")
+ + pathSeparator + env.get("PATH"));
+ // Patch the environment to deal with TAVSERV-189
+ env.put("TAVERNA_APPHOME", workingDir.getCanonicalPath());
+ // Patch the environment to deal with TAVSERV-224
+ env.put("TAVERNA_RUN_ID", token);
+ if (interactionHost != null || local.interactionFeedURL != null
+ || local.webdavURL != null) {
+ env.put("INTERACTION_HOST", makeInterHost(local.interactionFeedURL));
+ env.put("INTERACTION_PORT", makeInterPort(local.interactionFeedURL));
+ env.put("INTERACTION_FEED", makeInterPath(local.interactionFeedURL));
+ env.put("INTERACTION_WEBDAV",
+ local.webdavURL != null ? local.webdavURL.getPath()
+ : interactionWebdavPath);
+ String pub = makeInterPublish(local.publishURL);
+ if (pub != null && !pub.isEmpty())
+ env.put("INTERACTION_PUBLISH", pub);
+ }
+ return pb;
+ }
+
+ /**
+  * @param url
+  *            The interaction feed URL, or <tt>null</tt>.
+  * @return The scheme and host of the URL, or the statically configured
+  *         interaction host when no URL was given.
+  */
+ @Nullable
+ private static String makeInterHost(@Nullable URL url) {
+     return (url == null) ? interactionHost : url.getProtocol() + "://"
+             + url.getHost();
+ }
+
+ /**
+  * @param url
+  *            The interaction feed URL, or <tt>null</tt>.
+  * @return The URL's port (its protocol's default port if it names none),
+  *         or the statically configured interaction port when no URL was
+  *         given.
+  */
+ @Nullable
+ private static String makeInterPort(@Nullable URL url) {
+     if (url == null)
+         return interactionPort;
+     int explicit = url.getPort();
+     return Integer.toString(explicit != -1 ? explicit : url
+             .getDefaultPort());
+ }
+
+ @Nullable
+ private static String makeInterPublish(@Nullable URL url)
+ throws IOException {
+ if (url == null)
+ return null;
+ try {
+ URI uri = url.toURI();
+ int port = uri.getPort();
+ if (port == -1)
+ return uri.getScheme() + "://" + uri.getHost();
+ else
+ return uri.getScheme() + "://" + uri.getHost() + ":" + port;
+ } catch (URISyntaxException e) {
+ throw new IOException("problem constructing publication url", e);
+ }
+ }
+
+ /**
+  * @param url
+  *            The interaction feed URL, or <tt>null</tt>.
+  * @return The path part of the URL, or the statically configured feed
+  *         path when no URL was given.
+  */
+ @Nullable
+ private static String makeInterPath(@Nullable URL url) {
+     return (url != null) ? url.getPath() : interactionFeedPath;
+ }
+
+ /**
+  * Kills off the subprocess if it exists and is alive. Three strategies
+  * are tried in turn, each given at most {@code DEATH_TIME} to produce an
+  * exit code: checking whether the process has already exited, sending it
+  * the signal used by {@code killNicely()}, and sending it the signal used
+  * by {@code killHard()}. The first strategy to obtain an exit code does
+  * the run accounting and builds the usage record.
+  * <p>
+  * If no strategy yields an exit code, the worker is still marked as
+  * finished but no exit code is recorded.
+  */
+ @Override
+ public void killWorker() {
+     if (finished || subprocess == null)
+         return;
+     final Holder<Integer> code = new Holder<>();
+     TimingOutTask[] strategies = { new TimingOutTask() {
+         /** Check if the workflow terminated of its own accord */
+         @Override
+         public void doIt() throws IOException {
+             // Throws if the process is still running; that is the
+             // cue to move on to the next strategy
+             int exit = subprocess.exitValue();
+             code.value = exit;
+             accounting.runCeased();
+             buildUR(exit == 0 ? Completed : Failed, exit);
+         }
+     }, new TimingOutTask() {
+         /** Tell the workflow to stop */
+         @Override
+         public void doIt() throws IOException {
+             Integer exit = killNicely();
+             if (exit == null)
+                 // Nothing was stopped, so there is nothing to account
+                 return;
+             code.value = exit;
+             accounting.runCeased();
+             buildUR(exit == 0 ? Completed : Aborted, exit);
+         }
+     }, new TimingOutTask() {
+         /** Kill the workflow, kill it with fire */
+         @Override
+         public void doIt() throws IOException {
+             Integer exit = killHard();
+             if (exit == null)
+                 return;
+             code.value = exit;
+             accounting.runCeased();
+             buildUR(exit == 0 ? Completed : Aborted, exit);
+         }
+     } };
+     for (TimingOutTask strategy : strategies) {
+         try {
+             strategy.doOrTimeOut(DEATH_TIME);
+         } catch (Exception ignored) {
+             // A failed strategy just means the next one gets its turn
+         }
+         if (code.value != null)
+             break;
+     }
+     finished = true;
+     Integer result = code.value;
+     if (result != null)
+         setExitCode(result);
+     else
+         out.println("workflow termination could not be confirmed; no exit code available");
+     readyToSendEmail = true;
+ }
+
+ /**
+ * Integrated spot to handle writing/logging of the exit code.
+ *
+ * @param code
+ * The exit code.
+ */
+ private void setExitCode(int code) {
+ exitCode = code;
+ if (code > 256 - 8) {
+ out.println("workflow aborted, Raven issue = " + (code - 256));
+ } else if (code > 128) {
+ out.println("workflow aborted, signal=" + (code - 128));
+ } else {
+ out.println("workflow exited, code=" + code);
+ }
+ }
+
+ /**
+  * Creates an empty usage record named after the working directory, or
+  * after <tt>"unknown"</tt> when there is no working directory or the
+  * directory-based construction fails with a runtime exception.
+  *
+  * @return The new usage record.
+  * @throws DatatypeConfigurationException
+  *             If the record cannot be constructed.
+  */
+ @Nonnull
+ private JobUsageRecord newUR() throws DatatypeConfigurationException {
+     if (wd != null) {
+         try {
+             return new JobUsageRecord(wd.getName());
+         } catch (RuntimeException e) {
+             // Fall back to the anonymous record below
+         }
+     }
+     return new JobUsageRecord("unknown");
+ }
+
+ /**
+ * Fills in the accounting information from the exit code and stderr.
+ *
+ * @param exitCode
+ * The exit code from the program.
+ */
+ private void buildUR(@Nonnull Status status, int exitCode) {
+ try {
+ Date now = new Date();
+ long user = -1, sys = -1, real = -1;
+ Matcher m = TimeRE.matcher(stderr.toString());
+ ur = newUR();
+ while (m.find())
+ for (int i = 1; i < 6; i += 2)
+ if (m.group(i + 1).equals("user"))
+ user = parseDuration(m.group(i));
+ else if (m.group(i + 1).equals("sys")
+ || m.group(i + 1).equals("system"))
+ sys = parseDuration(m.group(i));
+ else if (m.group(i + 1).equals("real")
+ || m.group(i + 1).equals("elapsed"))
+ real = parseDuration(m.group(i));
+ if (user != -1)
+ ur.addCpuDuration(user).setUsageType("user");
+ if (sys != -1)
+ ur.addCpuDuration(sys).setUsageType("system");
+ ur.addUser(System.getProperty("user.name"), null);
+ ur.addStartAndEnd(start, now);
+ if (real != -1)
+ ur.addWallDuration(real);
+ else
+ ur.addWallDuration(now.getTime() - start.getTime());
+ ur.setStatus(status.toString());
+ ur.addHost(getLocalHost().getHostName());
+ ur.addResource("exitcode", Integer.toString(exitCode));
+ ur.addDisk(sizeOfDirectory(wd)).setStorageUnit("B");
+ if (urreceiver != null)
+ urreceiver.acceptUsageRecord(ur.marshal());
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ /**
+  * Converts a duration to milliseconds. Accepts either a plain number of
+  * seconds, or colon-separated fields (such as MM:SS.mm or HH:MM:SS.mm)
+  * in which only the last field may be fractional.
+  *
+  * @param durationString
+  *            The text to convert.
+  * @return The duration in milliseconds.
+  */
+ private long parseDuration(@Nonnull String durationString) {
+     try {
+         return (long) (parseDouble(durationString) * 1000);
+     } catch (NumberFormatException notPlainSeconds) {
+         // Not a double; maybe MM:SS.mm or HH:MM:SS.mm
+     }
+     long total = 0;
+     String[] fields = durationString.split(":");
+     for (int i = 0; i < fields.length; i++) {
+         long whole;
+         try {
+             whole = parseLong(fields[i]);
+         } catch (NumberFormatException fractional) {
+             // Assume that only one thing is fractional, and that it is last
+             return 60000 * total + (long) (parseDouble(fields[i]) * 1000);
+         }
+         total = 60 * total + whole;
+     }
+     return total * 1000;
+ }
+
+ /**
+  * Sends a signal to the subprocess by running the external
+  * <tt>kill</tt> command against its recorded process ID.
+  *
+  * @param signal
+  *            The name of the signal, without any leading dash.
+  * @throws Exception
+  *             If no process ID is known, or <tt>kill</tt> cannot be run
+  *             or exits with a non-zero status.
+  */
+ private void signal(@Nonnull String signal) throws Exception {
+     int target = getPID();
+     if (target > 0) {
+         Process killer = getRuntime().exec(
+                 "kill -" + signal + " " + target);
+         if (killer.waitFor() == 0)
+             return;
+     }
+     throw new Exception("failed to send signal " + signal + " to process "
+             + target);
+ }
+
+ /**
+  * Sends TERM to the subprocess and waits for it to exit.
+  *
+  * @return The exit code, or <tt>null</tt> if signalling or waiting
+  *         failed.
+  */
+ @Nullable
+ private Integer killNicely() {
+     Integer exit = null;
+     try {
+         signal("TERM");
+         exit = subprocess.waitFor();
+     } catch (Exception e) {
+         // Leave the result as null to report the failure
+     }
+     return exit;
+ }
+
+ /**
+  * Sends QUIT to the subprocess and waits for it to exit.
+  * <p>
+  * NOTE(review): callers treat this as the forceful fallback, yet QUIT can
+  * be caught by the target process - confirm that it is the intended
+  * signal.
+  *
+  * @return The exit code, or <tt>null</tt> if signalling or waiting
+  *         failed.
+  */
+ @Nullable
+ private Integer killHard() {
+     Integer exit = null;
+     try {
+         signal("QUIT");
+         exit = subprocess.waitFor();
+     } catch (Exception e) {
+         // Leave the result as null to report the failure
+     }
+     return exit;
+ }
+
+ /**
+  * Move the worker out of the stopped state and back to operating, by
+  * sending CONT to the subprocess. The stopped flag is only cleared when
+  * the signal was delivered.
+  *
+  * @throws Exception
+  *             if the signal could not be sent.
+  */
+ @Override
+ public void startWorker() throws Exception {
+     final String resume = "CONT";
+     signal(resume);
+     stopped = false;
+ }
+
+ /**
+  * Move the worker into the stopped state from the operating state, by
+  * sending STOP to the subprocess. The stopped flag is only set when the
+  * signal was delivered.
+  *
+  * @throws Exception
+  *             if the signal could not be sent.
+  */
+ @Override
+ public void stopWorker() throws Exception {
+     final String suspend = "STOP";
+     signal(suspend);
+     stopped = true;
+ }
+
+ /**
+  * Works out the state of the run. The first call that notices the
+  * subprocess has exited also records the exit code, updates the run
+  * accounting and builds the usage record, so this can be an expensive
+  * operation.
+  *
+  * @return The status of the workflow run.
+  */
+ @Override
+ public RemoteStatus getWorkerStatus() {
+     if (subprocess == null)
+         return Initialized;
+     if (finished)
+         return Finished;
+     final int code;
+     try {
+         code = subprocess.exitValue();
+     } catch (IllegalThreadStateException stillRunning) {
+         return stopped ? Stopped : Operating;
+     }
+     // The process has just been seen to be dead; do the bookkeeping once
+     setExitCode(code);
+     finished = true;
+     readyToSendEmail = true;
+     accounting.runCeased();
+     buildUR(code == 0 ? Completed : Failed, code);
+     return Finished;
+ }
+
+ /** @return The listener's configuration, which is always the empty string. */
+ @Override
+ public String getConfiguration() {
+     final String noConfiguration = "";
+     return noConfiguration;
+ }
+
+ /** @return The listener's name, which is the default listener name. */
+ @Override
+ public String getName() {
+     final String name = DEFAULT_LISTENER_NAME;
+     return name;
+ }
+
+ /**
+  * Reads one of the listener's properties.
+  *
+  * @param propName
+  *            The name of the property to read.
+  * @return The property's value. The exit code is reported as the empty
+  *         string until it is known, and the usage record is reported as
+  *         the empty string if it cannot be produced.
+  * @throws RemoteException
+  *             If the name is not that of a known property.
+  */
+ @Override
+ public String getProperty(String propName) throws RemoteException {
+     Property prop = Property.is(propName);
+     // An unknown name maps to null, and switching on null would throw
+     // NullPointerException instead of the intended error
+     if (prop == null)
+         throw new RemoteException("unknown property");
+     switch (prop) {
+     case STDOUT:
+         return stdout.toString();
+     case STDERR:
+         return stderr.toString();
+     case EXIT_CODE:
+         return (exitCode == null) ? "" : exitCode.toString();
+     case EMAIL:
+         return emailAddress;
+     case READY_TO_NOTIFY:
+         return Boolean.toString(readyToSendEmail);
+     case USAGE:
+         try {
+             JobUsageRecord toReturn;
+             if (subprocess == null) {
+                 // Nothing has been launched yet
+                 toReturn = newUR();
+                 toReturn.setStatus(Held.toString());
+             } else if (ur == null) {
+                 // Launched, but no final record has been built so far
+                 toReturn = newUR();
+                 toReturn.setStatus(Started.toString());
+                 toReturn.addStartAndEnd(start, new Date());
+                 toReturn.addUser(System.getProperty("user.name"), null);
+             } else {
+                 toReturn = ur;
+             }
+             /*
+              * Note that this record is not to be pushed to the server. That
+              * is done elsewhere (when a proper record is produced)
+              */
+             return toReturn.marshal();
+         } catch (Exception e) {
+             e.printStackTrace();
+             return "";
+         }
+     default:
+         throw new RemoteException("unknown property");
+     }
+ }
+
+ /** @return The listener's type, which is the default listener name. */
+ @Override
+ public String getType() {
+     final String type = DEFAULT_LISTENER_NAME;
+     return type;
+ }
+
+ @Override
+ public String[] listProperties() {
+ return Property.names();
+ }
+
+ /**
+  * Updates one of the listener's writable properties. Only the
+  * notification address and the ready-to-notify flag may be written.
+  *
+  * @param propName
+  *            The name of the property to write.
+  * @param value
+  *            The new value.
+  * @throws RemoteException
+  *             If the property is read-only or the name is not that of a
+  *             known property.
+  */
+ @Override
+ public void setProperty(String propName, String value)
+         throws RemoteException {
+     Property prop = Property.is(propName);
+     // An unknown name maps to null, and switching on null would throw
+     // NullPointerException instead of the intended error
+     if (prop == null)
+         throw new RemoteException("unknown property");
+     switch (prop) {
+     case EMAIL:
+         emailAddress = value;
+         return;
+     case READY_TO_NOTIFY:
+         readyToSendEmail = parseBoolean(value);
+         return;
+     case STDOUT:
+     case STDERR:
+     case EXIT_CODE:
+     case USAGE:
+         throw new RemoteException("property is read only");
+     default:
+         throw new RemoteException("unknown property");
+     }
+ }
+
+ /** @return The default listener, which is this very object. */
+ @Override
+ public RemoteListener getDefaultListener() {
+     final RemoteListener self = this;
+     return self;
+ }
+
+ /**
+  * Registers where the usage record is to be delivered once it has been
+  * built.
+  *
+  * @param receiver
+  *            The object to deliver the record to.
+  */
+ @Override
+ public void setURReceiver(@Nonnull UsageRecordReceiver receiver) {
+     this.urreceiver = receiver;
+ }
+
+ /**
+  * Removes the workflow file, provided one was recorded and the directory
+  * containing it still exists.
+  *
+  * @throws ImplementationException
+  *             If the deletion fails.
+  */
+ @Override
+ public void deleteLocalResources() throws ImplementationException {
+     final File doomed = workflowFile;
+     if (doomed == null)
+         return;
+     try {
+         if (doomed.getParentFile().exists())
+             forceDelete(doomed);
+     } catch (IOException e) {
+         throw new ImplementationException("problem deleting workflow file",
+                 e);
+     }
+ }
+}
+
+/**
+ * An engine for asynchronously copying from an {@link InputStream} to a
+ * {@link Writer}. The copying happens on a daemon thread that is started by
+ * the constructor.
+ * <p>
+ * When a process ID holder is supplied, the first line of the input is
+ * inspected: if it has the form <tt>pid:<i>digits</i></tt>, the number is
+ * stored in the holder and the line is not copied; any other first line is
+ * copied like the rest of the input.
+ *
+ * @author Donal Fellows
+ */
+class AsyncCopy extends Thread {
+    @Nonnull
+    private final BufferedReader from;
+    @Nonnull
+    private final Writer to;
+    @Nullable
+    private final Holder<Integer> pidHolder;
+
+    /**
+     * Start copying, without looking for a process ID.
+     *
+     * @param from
+     *            Where to read from.
+     * @param to
+     *            Where to write to.
+     * @throws UnsupportedEncodingException
+     *             If the system encoding is not supported.
+     */
+    AsyncCopy(@Nonnull InputStream from, @Nonnull Writer to)
+            throws UnsupportedEncodingException {
+        this(from, to, null);
+    }
+
+    /**
+     * Start copying, optionally capturing a process ID from the first line.
+     *
+     * @param from
+     *            Where to read from.
+     * @param to
+     *            Where to write to.
+     * @param pid
+     *            Where to store the process ID, or <tt>null</tt> to treat
+     *            the first line like any other.
+     * @throws UnsupportedEncodingException
+     *             If the system encoding is not supported.
+     */
+    AsyncCopy(@Nonnull InputStream from, @Nonnull Writer to,
+            @Nullable Holder<Integer> pid) throws UnsupportedEncodingException {
+        this.from = new BufferedReader(new InputStreamReader(from,
+                SYSTEM_ENCODING));
+        this.to = to;
+        this.pidHolder = pid;
+        setDaemon(true);
+        start();
+    }
+
+    @Override
+    public void run() {
+        try {
+            if (pidHolder != null) {
+                String line = from.readLine();
+                if (line == null)
+                    // The stream ended before producing anything at all
+                    return;
+                if (!recordPid(line))
+                    to.write(line + System.getProperty("line.separator"));
+            }
+            copy(from, to);
+        } catch (IOException e) {
+            // Deliberately ignored: the copy just stops where the stream
+            // failed, and what was copied so far stays available
+        }
+    }
+
+    /**
+     * Stores the process ID announced by a line, if the line announces one.
+     *
+     * @param line
+     *            The line to inspect.
+     * @return Whether the line was consumed as a process ID announcement.
+     */
+    private boolean recordPid(@Nonnull String line) {
+        if (!line.matches("^pid:\\d+$"))
+            return false;
+        final int value;
+        try {
+            value = parseInt(line.substring(4));
+        } catch (NumberFormatException tooLarge) {
+            // Digits that don't fit an int; treat it as ordinary output
+            return false;
+        }
+        synchronized (pidHolder) {
+            pidHolder.value = value;
+        }
+        return true;
+    }
+}
+
+/**
+ * A helper for asynchronously writing a password to a subprocess's stdin.
+ * The write happens on a daemon thread started by the constructor; once it
+ * has been attempted the password array is overwritten, unless it is the
+ * shared default password array.
+ *
+ * @author Donal Fellows
+ */
+class PasswordWriterThread extends Thread {
+    private final OutputStream sink;
+    private final char[] secret;
+
+    /**
+     * @param to
+     *            The process whose stdin receives the password.
+     * @param chars
+     *            The password; wiped after use unless it is the default.
+     */
+    PasswordWriterThread(@Nonnull Process to, @Nonnull char[] chars) {
+        this.sink = to.getOutputStream();
+        assert chars != null;
+        this.secret = chars;
+        setDaemon(true);
+        start();
+    }
+
+    @Override
+    public void run() {
+        try {
+            PrintWriter pw = new PrintWriter(new OutputStreamWriter(sink,
+                    SYSTEM_ENCODING));
+            try {
+                pw.println(secret);
+            } finally {
+                pw.close();
+            }
+        } catch (UnsupportedEncodingException e) {
+            // Not much we can do here
+            e.printStackTrace();
+        } finally {
+            /*
+             * We don't trust GC to clear password from memory. We also take
+             * care not to clear the default password!
+             */
+            if (secret != KEYSTORE_PASSWORD)
+                Arrays.fill(secret, '\0');
+        }
+    }
+}
+
+/**
+ * The properties of the listener, each with the name by which it is known
+ * to clients. Every constant registers itself under that name in the
+ * shared name-to-property map as it is constructed.
+ */
+enum Property {
+    STDOUT("stdout"),
+    STDERR("stderr"),
+    EXIT_CODE("exitcode"),
+    READY_TO_NOTIFY("readyToNotify"),
+    EMAIL("notificationAddress"),
+    USAGE("usageRecord");
+
+    private final String label;
+
+    private Property(String label) {
+        this.label = label;
+        pmap.put(label, this);
+    }
+
+    /** @return The client-visible name of the property. */
+    @Override
+    public String toString() {
+        return label;
+    }
+
+    /**
+     * Looks up a property by its client-visible name.
+     *
+     * @param s
+     *            The name to look up.
+     * @return The property, or <tt>null</tt> if the name is not registered.
+     */
+    public static Property is(@Nonnull String s) {
+        return pmap.get(s);
+    }
+
+    /** @return The client-visible names of all registered properties. */
+    @Nonnull
+    public static String[] names() {
+        return pmap.keySet().toArray(new String[0]);
+    }
+}
+
+/** The status values that may be written into a usage record. */
+enum Status {
+    Aborted,
+    Completed,
+    Failed,
+    Held,
+    Queued,
+    Started,
+    Suspended
+}
http://git-wip-us.apache.org/repos/asf/incubator-taverna-server/blob/0b04b1ab/server-worker/src/main/java/org/taverna/server/localworker/impl/utils/FilenameVerifier.java
----------------------------------------------------------------------
diff --git a/server-worker/src/main/java/org/taverna/server/localworker/impl/utils/FilenameVerifier.java b/server-worker/src/main/java/org/taverna/server/localworker/impl/utils/FilenameVerifier.java
new file mode 100644
index 0000000..fbc3a72
--- /dev/null
+++ b/server-worker/src/main/java/org/taverna/server/localworker/impl/utils/FilenameVerifier.java
@@ -0,0 +1,156 @@
+/*
+ * Copyright (C) 2010-2011 The University of Manchester
+ *
+ * See the file "LICENSE" for license terms.
+ */
+package org.taverna.server.localworker.impl.utils;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Utility class that handles filename validation on different target platforms.
+ *
+ * @author Donal Fellows.
+ */
+public abstract class FilenameVerifier {
+ private FilenameVerifier(){}
+
+ static final boolean IS_WINDOWS = System.getProperty("os.name").toLowerCase().contains("win");
+
+ @SuppressWarnings("serial")
+ private static final Set<String> ILLEGAL_NAMES = new HashSet<String>(){{
+ add("");
+ add("..");
+ add(".");
+ if (IS_WINDOWS) {
+ add("con");
+ add("prn");
+ add("nul");
+ add("aux");
+ for (int i = 1; i <= 9; i++) {
+ add("com" + i);
+ add("lpt" + i);
+ }
+ }
+ }};
+ @SuppressWarnings("serial")
+ private static final Set<Character> ILLEGAL_CHARS = new HashSet<Character>(){{
+ add('/');
+ for (char i=0 ; i<32 ; i++)
+ add(i);
+ if (IS_WINDOWS) {
+ add('\\');
+ add('>');
+ add('<');
+ add(':');
+ add('"');
+ add('|');
+ add('?');
+ add('*');
+ } else {
+ add(' '); // whitespace; too much trouble from these
+ add('\t');
+ add('\r');
+ add('\n');
+ }
+ }};
+ @SuppressWarnings("serial")
+ private static final Set<String> ILLEGAL_PREFIXES = new HashSet<String>(){{
+ if (IS_WINDOWS) {
+ add("con.");
+ add("prn.");
+ add("nul.");
+ add("aux.");
+ for (int i = 1; i <= 9; i++) {
+ add("com" + i + ".");
+ add("lpt" + i + ".");
+ }
+ }
+ }};
+ @SuppressWarnings("serial")
+ private static final Set<String> ILLEGAL_SUFFIXES = new HashSet<String>(){{
+ if (IS_WINDOWS) {
+ add(" ");
+ add(".");
+ }
+ }};
+
+ /**
+ * Construct a file handle, applying platform-specific filename validation
+ * rules in the process.
+ *
+ * @param dir
+ * The directory acting as a root, which is assumed to be
+ * correctly named. May be <tt>null</tt>.
+ * @param names
+ * The names of filename fragments to apply the checks to. Must
+ * have at least one value.
+ * @return The file handle. Never <tt>null</tt>.
+ * @throws IOException
+ * If validation fails.
+ */
+ public static File getValidatedFile(File dir, String... names)
+ throws IOException {
+ if (names.length == 0)
+ throw new IOException("empty filename");
+ File f = dir;
+ for (String name : names) {
+ String low = name.toLowerCase();
+ if (ILLEGAL_NAMES.contains(low))
+ throw new IOException("illegal filename");
+ for (char c : ILLEGAL_CHARS)
+ if (low.indexOf(c) >= 0)
+ throw new IOException("illegal filename");
+ for (String s : ILLEGAL_PREFIXES)
+ if (low.startsWith(s))
+ throw new IOException("illegal filename");
+ for (String s : ILLEGAL_SUFFIXES)
+ if (low.endsWith(s))
+ throw new IOException("illegal filename");
+ f = new File(f, name);
+ }
+ assert f != null;
+ return f;
+ }
+
+ /**
+ * Create a file handle where the underlying file must exist.
+ *
+ * @param dir
+ * The directory that will contain the file.
+ * @param name
+ * The name of the file; will be validated.
+ * @return The handle.
+ * @throws IOException
+ * If validation fails or the file doesn't exist.
+ */
+ public static File getValidatedExistingFile(File dir, String name)
+ throws IOException {
+ File f = getValidatedFile(dir, name);
+ if (!f.exists())
+ throw new IOException("doesn't exist");
+ return f;
+ }
+
+ /**
+ * Create a file handle where the underlying file must <i>not</i> exist.
+ *
+ * @param dir
+ * The directory that will contain the file.
+ * @param name
+ * The name of the file; will be validated.
+ * @return The handle. The file will not be created by this method.
+ * @throws IOException
+ * If validation fails or the file does exist.
+ */
+ public static File getValidatedNewFile(File dir, String name)
+ throws IOException {
+ File f = getValidatedFile(dir, name);
+ if (f.exists())
+ throw new IOException("already exists");
+ return f;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-taverna-server/blob/0b04b1ab/server-worker/src/main/java/org/taverna/server/localworker/impl/utils/TimingOutTask.java
----------------------------------------------------------------------
diff --git a/server-worker/src/main/java/org/taverna/server/localworker/impl/utils/TimingOutTask.java b/server-worker/src/main/java/org/taverna/server/localworker/impl/utils/TimingOutTask.java
new file mode 100644
index 0000000..3dd3ac1
--- /dev/null
+++ b/server-worker/src/main/java/org/taverna/server/localworker/impl/utils/TimingOutTask.java
@@ -0,0 +1,40 @@
+package org.taverna.server.localworker.impl.utils;
+
+import javax.annotation.Nullable;
+
+/**
+ * A class that handles running a task that can take some time.
+ *
+ * @author Donal Fellows
+ *
+ */
+public abstract class TimingOutTask extends Thread {
+ public abstract void doIt() throws Exception;
+
+ @Nullable
+ private Exception ioe;
+
+ @Override
+ public final void run() {
+ try {
+ doIt();
+ } catch (Exception ioe) {
+ this.ioe = ioe;
+ }
+ }
+
+ public TimingOutTask() {
+ this.setDaemon(true);
+ }
+
+ public void doOrTimeOut(long timeout) throws Exception {
+ start();
+ try {
+ join(timeout);
+ } catch (InterruptedException e) {
+ interrupt();
+ }
+ if (ioe != null)
+ throw ioe;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-taverna-server/blob/0b04b1ab/server-worker/src/main/resources/security.policy
----------------------------------------------------------------------
diff --git a/server-worker/src/main/resources/security.policy b/server-worker/src/main/resources/security.policy
new file mode 100644
index 0000000..5b5c322
--- /dev/null
+++ b/server-worker/src/main/resources/security.policy
@@ -0,0 +1,11 @@
+//keystore "signers.jks";
+//grant signedBy "taverna" {
+// permission java.util.PropertyPermission "*", "read,write";
+// permission java.lang.RuntimePermission "shutdownHooks";
+// permission java.lang.RuntimePermission "exitVM";
+// permission java.io.FilePermission "<<ALL FILES>>", "read,write,execute,delete";
+// permission java.net.SocketPermission "localhost:1024-", "accept,connect,listen";
+//};
+grant {
+ permission java.security.AllPermission "*:*";
+};
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-taverna-server/blob/0b04b1ab/server-worker/src/test/java/org/taverna/server/localworker/impl/LocalWorkerTest.java
----------------------------------------------------------------------
diff --git a/server-worker/src/test/java/org/taverna/server/localworker/impl/LocalWorkerTest.java b/server-worker/src/test/java/org/taverna/server/localworker/impl/LocalWorkerTest.java
new file mode 100644
index 0000000..7bcd92e
--- /dev/null
+++ b/server-worker/src/test/java/org/taverna/server/localworker/impl/LocalWorkerTest.java
@@ -0,0 +1,551 @@
+/*
+ * Copyright (C) 2010-2012 The University of Manchester
+ *
+ * See the file "LICENSE" for license terms.
+ */
+package org.taverna.server.localworker.impl;
+
+import static java.util.UUID.randomUUID;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
+import static org.taverna.server.localworker.impl.LocalWorker.DO_MKDIR;
+
+import java.io.File;
+import java.rmi.RemoteException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.TreeMap;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.taverna.server.localworker.api.Worker;
+import org.taverna.server.localworker.api.WorkerFactory;
+import org.taverna.server.localworker.remote.IllegalStateTransitionException;
+import org.taverna.server.localworker.remote.ImplementationException;
+import org.taverna.server.localworker.remote.RemoteDirectory;
+import org.taverna.server.localworker.remote.RemoteInput;
+import org.taverna.server.localworker.remote.RemoteListener;
+import org.taverna.server.localworker.remote.RemoteStatus;
+import org.taverna.server.localworker.server.UsageRecordReceiver;
+
+ public class LocalWorkerTest { // Tests LocalWorker against a DummyWorker that logs every call it receives into `events`.
+ LocalWorker lw; // the worker under test; rebuilt by setUp() before each test
+ static List<String> events; // ordered log of the calls DummyWorker received; replaced in setUp()
+
+ public static RemoteStatus returnThisStatus = RemoteStatus.Operating; // the status DummyWorker.getWorkerStatus() reports
+
+ static class DummyWorker implements Worker { // stub Worker: does no real work, only records into `events`
+ @Override
+ public RemoteListener getDefaultListener() { // returns a listener with fixed "RL..." answers
+ return new RemoteListener() {
+ @Override
+ public String getConfiguration() {
+ return "RLCONFIG";
+ }
+
+ @Override
+ public String getName() {
+ return "RLNAME";
+ }
+
+ @Override
+ public String getProperty(String propName) {
+ return "RLPROP[" + propName + "]"; // echoes the requested property name
+ }
+
+ @Override
+ public String getType() {
+ return "RLTYPE";
+ }
+
+ @Override
+ public String[] listProperties() {
+ return new String[] { "RLP1", "RLP2" };
+ }
+
+ @Override
+ public void setProperty(String propName, String value) { // logged as four events: "setProperty[", name, value, "]"
+ events.add("setProperty[");
+ events.add(propName);
+ events.add(value);
+ events.add("]");
+ }
+ };
+ }
+
+ @Override
+ public RemoteStatus getWorkerStatus() { // logs the poll, then reports the test-controlled status
+ events.add("status=" + returnThisStatus);
+ return returnThisStatus;
+ }
+
+ @Override
+ public boolean initWorker(LocalWorker local, // logs an "init[" ... "]" group describing the arguments; always succeeds
+ String executeWorkflowCommand, byte[] workflow,
+ File workingDir, File inputBaclava,
+ Map<String, File> inputFiles, Map<String, String> inputValues,
+ Map<String, String> delimiters, File outputBaclava, File cmdir,
+ char[] cmpass, boolean doprov, Map<String, String> env,
+ String id, List<String> conf) throws Exception {
+ events.add("init[");
+ events.add(executeWorkflowCommand);
+ events.add(new String(workflow, "UTF-8"));
+ int dirLen = workingDir.getName().length(); // only the length is logged; the tests below expect "36"
+ events.add(Integer.toString(dirLen));
+ events.add(inputBaclava == null ? "<null>" : inputBaclava
+ .toString().substring(dirLen)); // NOTE(review): trims the full path by the directory *name* length; every expected list in this class has "<null>" here - confirm intent
+ Map<String, String> in = new TreeMap<>(); // sorted by key so the logged string has a stable order
+ for (Entry<String, File> name : inputFiles.entrySet())
+ in.put(name.getKey(), name.getValue() == null ? "<null>" : name
+ .getValue().getName());
+ events.add(in.toString());
+ events.add(new TreeMap<>(inputValues).toString()); // sorted copy, again for a stable order
+ events.add(outputBaclava == null ? "<null>" : outputBaclava
+ .getName());
+ // TODO: check cmdir and cmpass
+ // TODO: check doprov
+ // TODO: log env
+ // TODO: check delimiters
+ events.add("]");
+ return true;
+ }
+
+ @Override
+ public void killWorker() throws Exception {
+ events.add("kill");
+ }
+
+ @Override
+ public void startWorker() throws Exception {
+ events.add("start");
+ }
+
+ @Override
+ public void stopWorker() throws Exception {
+ events.add("stop");
+ }
+
+ @Override
+ public void setURReceiver(UsageRecordReceiver receiver) {
+ // We just ignore this
+ }
+
+ @Override
+ public void deleteLocalResources() throws ImplementationException {
+ // Nothing to do here
+ }
+ }
+
+ WorkerFactory factory = new WorkerFactory() { // hands a fresh DummyWorker to the LocalWorker under test
+ @Override
+ public Worker makeInstance() throws Exception {
+ return new DummyWorker();
+ }
+ };
+
+ @Before
+ public void setUp() throws Exception { // builds a new LocalWorker, then resets the shared event log and reported status
+ lw = new LocalWorker("XWC", "WF".getBytes("UTF-8"), null, randomUUID(),
+ new HashMap<String, String>(), new ArrayList<String>(), factory);
+ events = new ArrayList<>();
+ returnThisStatus = RemoteStatus.Operating;
+ }
+
+ @After
+ public void tearDown() throws Exception { // destroys the worker after every test (a second destroy() for the testDestroy* cases)
+ lw.destroy();
+ }
+
+ private List<String> l(String... strings) { // shorthand for building the expected event list
+ return Arrays.asList(strings);
+ }
+
+ // -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
+
+ @Test
+ public void testDestroy1() throws Exception { // destroying a never-started worker records no events
+ lw.destroy();
+ assertEquals(l(), events);
+ }
+
+ @Test
+ public void testDestroy2() throws Exception { // destroying an operating worker records init followed by kill
+ lw.setStatus(RemoteStatus.Operating);
+ lw.destroy();
+ assertEquals(
+ l("init[", "XWC", "WF", "36", "<null>", "{}", "{}", "<null>",
+ "]", "kill"), events);
+ }
+
+ @Test
+ public void testDestroy3() throws Exception { // destroying a stopped worker records init, stop, kill
+ lw.setStatus(RemoteStatus.Operating);
+ lw.setStatus(RemoteStatus.Stopped);
+ lw.destroy();
+ assertEquals(
+ l("init[", "XWC", "WF", "36", "<null>", "{}", "{}", "<null>",
+ "]", "stop", "kill"), events);
+ }
+
+ @Test
+ public void testDestroy4() throws Exception { // destroy() after Finished must not add a second kill
+ lw.setStatus(RemoteStatus.Operating);
+ lw.setStatus(RemoteStatus.Finished);
+ assertEquals(
+ l("init[", "XWC", "WF", "36", "<null>", "{}", "{}", "<null>",
+ "]", "kill"), events);
+ lw.destroy();
+ assertEquals(
+ l("init[", "XWC", "WF", "36", "<null>", "{}", "{}", "<null>",
+ "]", "kill"), events);
+ }
+
+ @Test
+ public void testAddListener() { // addListener(null) must fail with ImplementationException("not implemented")
+ Throwable t = null;
+ try {
+ lw.addListener(null);
+ } catch (Throwable caught) {
+ t = caught;
+ }
+ assertNotNull(t);
+ assertSame(ImplementationException.class, t.getClass());
+ assertNotNull(t.getMessage());
+ assertEquals("not implemented", t.getMessage());
+ }
+
+ @Test
+ public void testGetInputBaclavaFile() throws Exception { // giving an input a value clears the input Baclava file
+ assertNull(lw.getInputBaclavaFile());
+ lw.setInputBaclavaFile("IBaclava");
+ assertNotNull(lw.getInputBaclavaFile());
+ assertEquals("IBaclava", lw.getInputBaclavaFile());
+ lw.makeInput("FOO").setValue("BAR");
+ assertNull(lw.getInputBaclavaFile());
+ }
+
+ @Test
+ public void testGetInputsWithValue() throws Exception { // setting a Baclava file keeps the input but clears its value
+ assertEquals(0, lw.getInputs().size());
+
+ lw.makeInput("FOO").setValue("BAR");
+
+ assertEquals(1, lw.getInputs().size());
+ assertEquals("FOO", lw.getInputs().get(0).getName());
+ assertNull(lw.getInputs().get(0).getFile());
+ assertNotNull(lw.getInputs().get(0).getValue());
+
+ lw.setInputBaclavaFile("BLAH");
+
+ assertEquals(1, lw.getInputs().size());
+ assertNull(lw.getInputs().get(0).getFile());
+ assertNull(lw.getInputs().get(0).getValue());
+ }
+
+ @Test
+ public void testGetInputsWithFile() throws Exception { // setting a Baclava file keeps the input but clears its file
+ assertEquals(0, lw.getInputs().size());
+
+ lw.makeInput("BAR").setFile("FOO");
+
+ assertEquals(1, lw.getInputs().size());
+ assertEquals("BAR", lw.getInputs().get(0).getName());
+ assertNotNull(lw.getInputs().get(0).getFile());
+ assertNull(lw.getInputs().get(0).getValue());
+
+ lw.setInputBaclavaFile("BLAH");
+
+ assertEquals(1, lw.getInputs().size());
+ assertNull(lw.getInputs().get(0).getFile());
+ assertNull(lw.getInputs().get(0).getValue());
+ }
+
+ @Test
+ public void testGetListenerTypes() { // no listener types are offered
+ assertEquals("[]", lw.getListenerTypes().toString());
+ }
+
+ @Test
+ public void testGetListeners() throws Exception { // the single listener answers with DummyWorker's fixed values and forwards setProperty
+ assertEquals(1, lw.getListeners().size());
+ RemoteListener rl = lw.getListeners().get(0);
+ assertEquals("RLNAME", rl.getName());
+ assertEquals("RLCONFIG", rl.getConfiguration());
+ assertEquals("RLTYPE", rl.getType());
+ assertEquals("[RLP1, RLP2]", Arrays.asList(rl.listProperties())
+ .toString());
+ assertEquals("RLPROP[RLP1]", rl.getProperty("RLP1"));
+ assertEquals("RLPROP[RLP2]", rl.getProperty("RLP2"));
+ rl.setProperty("FOOBAR", "BARFOO");
+ assertEquals(l("setProperty[", "FOOBAR", "BARFOO", "]"), events);
+ }
+
+ @Test
+ public void testGetOutputBaclavaFile() throws Exception { // the output Baclava file can be set and reset to null
+ assertNull(lw.getOutputBaclavaFile());
+ lw.setOutputBaclavaFile("notnull");
+ assertEquals("notnull", lw.getOutputBaclavaFile());
+ lw.setOutputBaclavaFile(null);
+ assertNull(lw.getOutputBaclavaFile());
+ }
+
+ @Test
+ public void testGetSecurityContext() throws Exception { // runs with DO_MKDIR switched off, restoring the old value afterwards
+ boolean md = DO_MKDIR;
+ LocalWorker.DO_MKDIR = false; // HACK! Work around Hudson problem...
+ try {
+ assertNotNull(lw.getSecurityContext());
+ } finally {
+ LocalWorker.DO_MKDIR = md;
+ }
+ }
+
+ @Test
+ public void testGetStatusInitial() { // a fresh worker reports Initialized
+ assertEquals(RemoteStatus.Initialized, lw.getStatus());
+ }
+
+ @Test
+ public void testGetStatus() throws Exception { // the stub is polled only while operating; Finished sticks once seen
+ assertEquals(RemoteStatus.Initialized, lw.getStatus());
+ returnThisStatus = RemoteStatus.Operating;
+ assertEquals(RemoteStatus.Initialized, lw.getStatus()); // not started yet, so the stub's status is not consulted
+ lw.setStatus(RemoteStatus.Operating);
+ assertEquals(RemoteStatus.Operating, lw.getStatus());
+ assertEquals(RemoteStatus.Operating, lw.getStatus());
+ returnThisStatus = RemoteStatus.Finished;
+ assertEquals(RemoteStatus.Finished, lw.getStatus());
+ returnThisStatus = RemoteStatus.Stopped;
+ assertEquals(RemoteStatus.Finished, lw.getStatus()); // no further "status=" event is expected for this call
+ assertEquals(
+ l("init[", "XWC", "WF", "36", "<null>", "{}", "{}", "<null>",
+ "]", "status=Operating", "status=Operating",
+ "status=Finished"), events);
+ }
+
+ @Test
+ public void testGetWorkingDirectory() throws Exception { // the working directory handle has no parent and an empty name
+ RemoteDirectory rd = lw.getWorkingDirectory();
+ assertNotNull(rd);
+ assertNotNull(rd.getContents());
+ assertNull(rd.getContainingDirectory());
+ assertNotNull(rd.getName());
+ assertEquals(-1, rd.getName().indexOf('/'));
+ assertFalse("..".equals(rd.getName()));
+ assertEquals("", rd.getName());
+ }
+
+ @Test
+ public void testValidateFilename() throws Exception { // names that must be accepted without an exception
+ lw.validateFilename("foobar");
+ lw.validateFilename("foo/bar");
+ lw.validateFilename("foo.bar");
+ lw.validateFilename("foo..bar");
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testValidateFilenameBad0() throws Exception { // "./." is rejected
+ lw.validateFilename("./.");
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testValidateFilenameBad1() throws Exception { // "/" is rejected
+ lw.validateFilename("/");
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testValidateFilenameBad2() throws Exception { // the empty name is rejected
+ lw.validateFilename("");
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testValidateFilenameBad3() throws Exception { // null is rejected
+ lw.validateFilename(null);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testValidateFilenameBad4() throws Exception { // ".." is rejected
+ lw.validateFilename("..");
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testValidateFilenameBad5() throws Exception { // an embedded ".." path element is rejected
+ lw.validateFilename("foo/../bar");
+ }
+
+ @Test
+ public void testMakeInput() throws Exception { // file, value and Baclava file displace one another on an input
+ assertEquals(0, lw.getInputs().size());
+
+ RemoteInput ri = lw.makeInput("TEST");
+
+ assertNotNull(ri);
+ assertEquals(1, lw.getInputs().size());
+ assertNotSame(ri, lw.getInputs().get(0)); // different delegates
+ assertEquals("TEST", ri.getName());
+ assertNull(ri.getFile());
+ assertNull(ri.getValue());
+
+ lw.setInputBaclavaFile("bad");
+ ri.setFile("good");
+ assertEquals("good", ri.getFile());
+ assertNull(lw.getInputBaclavaFile());
+ ri.setValue("very good");
+ assertEquals("very good", ri.getValue());
+ assertNull(ri.getFile());
+ assertNull(lw.getInputBaclavaFile());
+
+ lw.makeInput("TEST2");
+ assertEquals(2, lw.getInputs().size());
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testMakeInputFileSanity() throws Exception { // an input file name is validated too
+ lw.makeInput("foo").setFile("/../bar");
+ }
+
+ @Test
+ public void testMakeListener() { // makeListener must fail with RemoteException("listener manufacturing unsupported")
+ Throwable t = null;
+ try {
+ lw.makeListener("?", "?");
+ } catch (Throwable caught) {
+ t = caught;
+ }
+ assertNotNull(t);
+ assertSame(RemoteException.class, t.getClass());
+ assertNotNull(t.getMessage());
+ assertEquals("listener manufacturing unsupported", t.getMessage());
+ }
+
+ @Test
+ public void testSetInputBaclavaFile1() throws Exception { // the input Baclava file name is stored and returned
+ assertNull(lw.getInputBaclavaFile());
+ lw.setInputBaclavaFile("eg");
+ assertEquals("eg", lw.getInputBaclavaFile());
+ }
+
+ @Test
+ public void testSetInputBaclavaFile2() throws Exception { // setting the Baclava file clears a value seen through an existing handle
+ RemoteInput ri = lw.makeInput("foo");
+ ri.setValue("bar");
+ assertEquals("bar", ri.getValue());
+ lw.setInputBaclavaFile("eg");
+ assertNull(ri.getValue());
+ }
+
+ @Test
+ public void testSetOutputBaclavaFile1() throws Exception { // the field and the getter agree for plain, nested and null names
+ assertNull(lw.outputBaclava);
+ lw.setOutputBaclavaFile("foobar");
+ assertEquals("foobar", lw.outputBaclava);
+ assertEquals("foobar", lw.getOutputBaclavaFile());
+ lw.setOutputBaclavaFile("foo/bar");
+ assertEquals("foo/bar", lw.outputBaclava);
+ assertEquals("foo/bar", lw.getOutputBaclavaFile());
+ lw.setOutputBaclavaFile(null);
+ assertNull(lw.outputBaclava);
+ assertNull(lw.getOutputBaclavaFile());
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testSetOutputBaclavaFile2() throws Exception { // a name with a leading "/" is rejected
+ lw.setOutputBaclavaFile("/foobar");
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testSetOutputBaclavaFile3() throws Exception { // a name containing a ".." element is rejected
+ lw.setOutputBaclavaFile("foo/../bar");
+ }
+
+ @Test
+ public void testSetStatus0() throws Exception { // repeating a status adds no events; only real transitions reach the stub
+ lw.setStatus(RemoteStatus.Initialized);
+ lw.setStatus(RemoteStatus.Initialized);
+ lw.setStatus(RemoteStatus.Operating);
+ lw.setStatus(RemoteStatus.Operating);
+ lw.setStatus(RemoteStatus.Stopped);
+ lw.setStatus(RemoteStatus.Stopped);
+ lw.setStatus(RemoteStatus.Operating);
+ lw.setStatus(RemoteStatus.Operating);
+ lw.setStatus(RemoteStatus.Finished);
+ lw.setStatus(RemoteStatus.Finished);
+ assertEquals(
+ l("init[", "XWC", "WF", "36", "<null>", "{}", "{}", "<null>",
+ "]", "stop", "start", "kill"), events);
+ }
+
+ @Test
+ public void testSetStatus1() throws Exception { // Operating -> Stopped -> Finished records init, stop, kill
+ lw.setStatus(RemoteStatus.Operating);
+ lw.setStatus(RemoteStatus.Stopped);
+ lw.setStatus(RemoteStatus.Finished);
+ assertEquals(
+ l("init[", "XWC", "WF", "36", "<null>", "{}", "{}", "<null>",
+ "]", "stop", "kill"), events);
+ }
+
+ @Test
+ public void testSetStatus2() throws Exception { // finishing a never-started worker records no events
+ lw.setStatus(RemoteStatus.Initialized);
+ lw.setStatus(RemoteStatus.Finished);
+ assertEquals(l(), events);
+ }
+
+ @Test(expected = IllegalStateTransitionException.class)
+ public void testSetStatus3() throws Exception { // Finished -> Initialized is illegal
+ lw.setStatus(RemoteStatus.Initialized);
+ lw.setStatus(RemoteStatus.Finished);
+ lw.setStatus(RemoteStatus.Initialized);
+ }
+
+ @Test(expected = IllegalStateTransitionException.class)
+ public void testSetStatus4() throws Exception { // Operating -> Initialized is illegal
+ lw.setStatus(RemoteStatus.Initialized);
+ lw.setStatus(RemoteStatus.Operating);
+ lw.setStatus(RemoteStatus.Initialized);
+ }
+
+ @Test(expected = IllegalStateTransitionException.class)
+ public void testSetStatus5() throws Exception { // Initialized -> Stopped is illegal
+ lw.setStatus(RemoteStatus.Initialized);
+ lw.setStatus(RemoteStatus.Stopped);
+ }
+
+ @Test(expected = IllegalStateTransitionException.class)
+ public void testSetStatus6() throws Exception { // Finished -> Stopped is illegal
+ lw.setStatus(RemoteStatus.Finished);
+ lw.setStatus(RemoteStatus.Stopped);
+ }
+
+ @Test(expected = IllegalStateTransitionException.class)
+ public void testSetStatus7() throws Exception { // Stopped -> Initialized is illegal
+ lw.setStatus(RemoteStatus.Operating);
+ lw.setStatus(RemoteStatus.Stopped);
+ lw.setStatus(RemoteStatus.Initialized);
+ }
+
+ @Test
+ public void testLifecycle() throws Exception { // a full run: inputs and the last output name chosen reach initWorker, then kill
+ lw.makeInput("foo").setFile("foofile");
+ lw.makeInput("bar").setValue("barvalue");
+ lw.setOutputBaclavaFile("spong");
+ lw.setOutputBaclavaFile("boo"); // replaces "spong"; only "boo" is expected in the init group
+ lw.setStatus(RemoteStatus.Operating);
+ lw.setStatus(RemoteStatus.Finished);
+ // Assumes order of map, so fragile but works...
+ assertEquals(
+ l("init[", "XWC", "WF", "36", "<null>",
+ "{bar=<null>, foo=foofile}",
+ "{bar=barvalue, foo=null}", "boo", "]", "kill"), events);
+ }
+}