You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@taverna.apache.org by st...@apache.org on 2018/01/09 23:30:43 UTC

[27/42] incubator-taverna-server git commit: package org.taverna -> org.apache.taverna

http://git-wip-us.apache.org/repos/asf/incubator-taverna-server/blob/00397eff/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/localworker/LocalWorkerState.java
----------------------------------------------------------------------
diff --git a/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/localworker/LocalWorkerState.java b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/localworker/LocalWorkerState.java
new file mode 100644
index 0000000..e32dcca
--- /dev/null
+++ b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/localworker/LocalWorkerState.java
@@ -0,0 +1,475 @@
+/*
+ */
+package org.taverna.server.master.localworker;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import static java.io.File.separator;
+import static java.lang.System.getProperty;
+import static java.rmi.registry.Registry.REGISTRY_PORT;
+import static java.util.Arrays.asList;
+import static java.util.Collections.emptyList;
+import static java.util.Collections.unmodifiableList;
+import static org.taverna.server.master.defaults.Default.EXTRA_ARGUMENTS;
+import static org.taverna.server.master.defaults.Default.PASSWORD_FILE;
+import static org.taverna.server.master.defaults.Default.REGISTRY_JAR;
+import static org.taverna.server.master.defaults.Default.RMI_PREFIX;
+import static org.taverna.server.master.defaults.Default.RUN_LIFE_MINUTES;
+import static org.taverna.server.master.defaults.Default.RUN_OPERATING_LIMIT;
+import static org.taverna.server.master.defaults.Default.SECURE_FORK_IMPLEMENTATION_JAR;
+import static org.taverna.server.master.defaults.Default.SERVER_WORKER_IMPLEMENTATION_JAR;
+import static org.taverna.server.master.defaults.Default.SUBPROCESS_START_POLL_SLEEP;
+import static org.taverna.server.master.defaults.Default.SUBPROCESS_START_WAIT;
+import static org.taverna.server.master.localworker.PersistedState.KEY;
+import static org.taverna.server.master.localworker.PersistedState.makeInstance;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.net.URI;
+import java.util.List;
+
+import javax.annotation.PostConstruct;
+import javax.jdo.annotations.PersistenceAware;
+
+import org.springframework.beans.factory.annotation.Required;
+import org.taverna.server.master.common.Status;
+import org.taverna.server.master.defaults.Default;
+import org.taverna.server.master.utils.JDOSupport;
+import org.taverna.server.master.worker.WorkerModel;
+
+/**
+ * The persistent state of a local worker factory.
+ * <p>
+ * Every getter falls back to a built-in default while the corresponding field
+ * is unset (<tt>null</tt> for references, a value below 1 for most numbers).
+ * Once the state has been loaded or stored at least once, every setter also
+ * writes the whole configuration back through {@link #store()}.
+ * 
+ * @author Donal Fellows
+ */
+@PersistenceAware
+public class LocalWorkerState extends JDOSupport<PersistedState> implements
+		WorkerModel {
+	public LocalWorkerState() {
+		super(PersistedState.class);
+	}
+
+	/**
+	 * The reference through which {@link #store()} is invoked by the setters.
+	 * NOTE(review): presumably the container-managed form of this bean, so
+	 * that the transaction annotation on {@link #store()} is honoured; confirm
+	 * against the bean configuration.
+	 */
+	private LocalWorkerState self;
+
+	@Required
+	public void setSelf(LocalWorkerState self) {
+		this.self = self;
+	}
+
+	/**
+	 * Locate a resource that is expected to be shipped with the server.
+	 * 
+	 * @param resourceName
+	 *            The name of the resource, as understood by this class's class
+	 *            loader.
+	 * @return The file part of the URL of the resource.
+	 * @throws IllegalStateException
+	 *             If the resource cannot be found; this replaces the anonymous
+	 *             {@link NullPointerException} that would otherwise abort
+	 *             class initialisation without saying what was missing.
+	 */
+	private static String bundledResource(String resourceName) {
+		java.net.URL location = LocalWorkerState.class.getClassLoader()
+				.getResource(resourceName);
+		if (location == null)
+			throw new IllegalStateException("Can't find bundled resource "
+					+ resourceName);
+		return location.getFile();
+	}
+
+	/**
+	 * Write the configuration back to the database, but only once the state
+	 * has been loaded or stored for the first time. Shared tail of every
+	 * setter of a persisted property.
+	 */
+	private void persistIfLoaded() {
+		if (loadedState)
+			self.store();
+	}
+
+	/** Initial lifetime of runs, in minutes. */
+	int defaultLifetime;
+	/**
+	 * Maximum number of runs to exist at once. Note that this includes when
+	 * they are just existing for the purposes of file transfer (
+	 * {@link Status#Initialized}/{@link Status#Finished} states).
+	 */
+	int maxRuns;
+	/**
+	 * Prefix to use for RMI names.
+	 */
+	String factoryProcessNamePrefix;
+	/**
+	 * Full path name of the script used to start running a workflow; normally
+	 * expected to be "<i>somewhere/</i><tt>executeWorkflow.sh</tt>".
+	 */
+	String executeWorkflowScript;
+	/** Default value for {@link #executeWorkflowScript}. */
+	private transient String defaultExecuteWorkflowScript;
+	/**
+	 * Full path name of the file containing the password used to launch workers
+	 * as other users. The file is normally expected to contain a single line,
+	 * the password, and to be thoroughly locked down so only the user running
+	 * the server (e.g., "<tt>tomcat</tt>") can read it; it will probably reside
+	 * in either the user's home directory or in a system configuration
+	 * directory.
+	 */
+	String passwordFile;
+	/** Default value for {@link #passwordFile}. */
+	private transient String defaultPasswordFile = PASSWORD_FILE;
+	/**
+	 * The extra arguments to pass to the subprocess.
+	 */
+	String[] extraArgs;
+	/**
+	 * How long to wait for subprocess startup, in seconds.
+	 */
+	int waitSeconds;
+	/**
+	 * Polling interval to use during startup, in milliseconds.
+	 */
+	int sleepMS;
+	/**
+	 * Full path name to the worker process's implementation JAR.
+	 */
+	String serverWorkerJar;
+	/** Default value for {@link #serverWorkerJar}. */
+	private static final String DEFAULT_WORKER_JAR = bundledResource(SERVER_WORKER_IMPLEMENTATION_JAR);
+	/**
+	 * Full path name to the Java binary to use to run the subprocess.
+	 */
+	String javaBinary;
+	/** Default value for {@link #javaBinary}: the JVM running this server. */
+	private static final String DEFAULT_JAVA_BINARY = getProperty("java.home")
+			+ separator + "bin" + separator + "java";
+	/**
+	 * Full path name to the secure fork process's implementation JAR.
+	 */
+	String serverForkerJar;
+	/** Default value for {@link #serverForkerJar}. */
+	private static final String DEFAULT_FORKER_JAR = bundledResource(SECURE_FORK_IMPLEMENTATION_JAR);
+
+	/** Host of the RMI registry; empty or <tt>null</tt> when unset. */
+	String registryHost;
+	/** Port of the RMI registry; 0 when unset. */
+	int registryPort;
+
+	/** The operating limit for runs; values below 1 mean "use the default". */
+	int operatingLimit;
+
+	/** The workflows that are permitted; <tt>null</tt> or empty when unset. */
+	URI[] permittedWorkflows;
+	/** Full path name to the RMI registry JAR; <tt>null</tt> when unset. */
+	private String registryJar;
+	/** Default value for {@link #registryJar}. */
+	private static final String DEFAULT_REGISTRY_JAR = bundledResource(REGISTRY_JAR);
+
+	@Override
+	public void setDefaultLifetime(int defaultLifetime) {
+		this.defaultLifetime = defaultLifetime;
+		persistIfLoaded();
+	}
+
+	@Override
+	public int getDefaultLifetime() {
+		return defaultLifetime < 1 ? RUN_LIFE_MINUTES : defaultLifetime;
+	}
+
+	@Override
+	public void setMaxRuns(int maxRuns) {
+		this.maxRuns = maxRuns;
+		persistIfLoaded();
+	}
+
+	@Override
+	public int getMaxRuns() {
+		return maxRuns < 1 ? Default.RUN_COUNT_MAX : maxRuns;
+	}
+
+	@Override
+	public int getOperatingLimit() {
+		return operatingLimit < 1 ? RUN_OPERATING_LIMIT : operatingLimit;
+	}
+
+	@Override
+	public void setOperatingLimit(int operatingLimit) {
+		this.operatingLimit = operatingLimit;
+		persistIfLoaded();
+	}
+
+	@Override
+	public void setFactoryProcessNamePrefix(String factoryProcessNamePrefix) {
+		this.factoryProcessNamePrefix = factoryProcessNamePrefix;
+		persistIfLoaded();
+	}
+
+	@Override
+	public String getFactoryProcessNamePrefix() {
+		return factoryProcessNamePrefix == null ? RMI_PREFIX
+				: factoryProcessNamePrefix;
+	}
+
+	@Override
+	public void setExecuteWorkflowScript(String executeWorkflowScript) {
+		this.executeWorkflowScript = executeWorkflowScript;
+		persistIfLoaded();
+	}
+
+	@Override
+	public String getExecuteWorkflowScript() {
+		return executeWorkflowScript == null ? defaultExecuteWorkflowScript
+				: executeWorkflowScript;
+	}
+
+	/**
+	 * Search the directory holding the default worker JAR for an unpacked
+	 * Taverna command line distribution and return the launcher script inside
+	 * the first match.
+	 * 
+	 * @return The full path of the <tt>executeworkflow.sh</tt> script.
+	 * @throws IllegalStateException
+	 *             If the directory cannot be listed, holds no
+	 *             <tt>taverna-command*</tt> entry, or the first such entry
+	 *             does not contain the script.
+	 */
+	private static String guessWorkflowScript() {
+		File utilDir = new File(DEFAULT_WORKER_JAR).getParentFile();
+		File[] dirs = null;
+		if (utilDir != null)
+			dirs = utilDir.listFiles(new FilenameFilter() {
+				@Override
+				public boolean accept(File dir, String name) {
+					// Support both taverna-commandline* (2.5) and
+					// taverna-command-line* (3.1)
+					return name.toLowerCase().startsWith("taverna-command");
+				}
+			});
+		// listFiles() answers null, not an empty array, when the path is not
+		// a directory or cannot be read; treat that like "nothing found"
+		if (dirs == null || dirs.length == 0) {
+			throw new IllegalStateException("Can't find taverna-command* distro in " + utilDir);
+		}
+		File script = new File(dirs[0], "executeworkflow.sh");
+		if (! script.isFile()) {
+			throw new IllegalStateException("Can't find launcher script " + script);
+		}
+		return script.toString();
+	}
+
+	/**
+	 * Set what executeworkflow script to use by default. This is the value that
+	 * is used if not overridden by the administration interface.
+	 * <p>
+	 * A value that starts with "<tt>${</tt>" or that is exactly
+	 * "<tt>NONE</tt>" makes the script be searched for next to the default
+	 * worker JAR instead.
+	 * 
+	 * @param defaultScript
+	 *            Full path to the script to use.
+	 * @throws IllegalStateException
+	 *             If the script has to be searched for and cannot be found.
+	 */
+	public void setDefaultExecuteWorkflowScript(String defaultScript) {
+		if (defaultScript.startsWith("${") || defaultScript.equals("NONE")) {
+			this.defaultExecuteWorkflowScript = guessWorkflowScript();
+			return;
+		}
+		this.defaultExecuteWorkflowScript = defaultScript;
+	}
+
+	String getDefaultExecuteWorkflowScript() {
+		return defaultExecuteWorkflowScript;
+	}
+
+	/**
+	 * @param extraArgs
+	 *            The extra arguments to pass to the subprocess; the array is
+	 *            copied. <tt>null</tt> resets the property to "unset", which
+	 *            is the same state that {@link #load()} can leave it in.
+	 */
+	@Override
+	public void setExtraArgs(String[] extraArgs) {
+		this.extraArgs = (extraArgs == null ? null : extraArgs.clone());
+		persistIfLoaded();
+	}
+
+	/**
+	 * @return A private copy of the extra arguments, or of the default extra
+	 *         arguments when none have been set. The default is copied too so
+	 *         that callers cannot alter the shared default array.
+	 */
+	@Override
+	public String[] getExtraArgs() {
+		String[] args = (extraArgs == null ? EXTRA_ARGUMENTS : extraArgs);
+		return args == null ? null : args.clone();
+	}
+
+	@Override
+	public void setWaitSeconds(int waitSeconds) {
+		this.waitSeconds = waitSeconds;
+		persistIfLoaded();
+	}
+
+	@Override
+	public int getWaitSeconds() {
+		return waitSeconds < 1 ? SUBPROCESS_START_WAIT : waitSeconds;
+	}
+
+	@Override
+	public void setSleepMS(int sleepMS) {
+		this.sleepMS = sleepMS;
+		persistIfLoaded();
+	}
+
+	@Override
+	public int getSleepMS() {
+		return sleepMS < 1 ? SUBPROCESS_START_POLL_SLEEP : sleepMS;
+	}
+
+	@Override
+	public void setServerWorkerJar(String serverWorkerJar) {
+		this.serverWorkerJar = serverWorkerJar;
+		persistIfLoaded();
+	}
+
+	@Override
+	public String getServerWorkerJar() {
+		return serverWorkerJar == null ? DEFAULT_WORKER_JAR : serverWorkerJar;
+	}
+
+	@Override
+	public void setServerForkerJar(String serverForkerJar) {
+		this.serverForkerJar = serverForkerJar;
+		persistIfLoaded();
+	}
+
+	@Override
+	public String getServerForkerJar() {
+		return serverForkerJar == null ? DEFAULT_FORKER_JAR : serverForkerJar;
+	}
+
+	@Override
+	public void setJavaBinary(String javaBinary) {
+		this.javaBinary = javaBinary;
+		persistIfLoaded();
+	}
+
+	@Override
+	public String getJavaBinary() {
+		return javaBinary == null ? DEFAULT_JAVA_BINARY : javaBinary;
+	}
+
+	@Override
+	public void setPasswordFile(String passwordFile) {
+		this.passwordFile = passwordFile;
+		persistIfLoaded();
+	}
+
+	@Override
+	public String getPasswordFile() {
+		return passwordFile == null ? defaultPasswordFile : passwordFile;
+	}
+
+	void setDefaultPasswordFile(String defaultPasswordFile) {
+		this.defaultPasswordFile = defaultPasswordFile;
+	}
+
+	/**
+	 * @param registryHost
+	 *            The registry host; <tt>null</tt> is stored as the empty
+	 *            string, which {@link #getRegistryHost()} reports as
+	 *            <tt>null</tt> again.
+	 */
+	@Override
+	public void setRegistryHost(String registryHost) {
+		this.registryHost = (registryHost == null ? "" : registryHost);
+		persistIfLoaded();
+	}
+
+	@Override
+	public String getRegistryHost() {
+		return (registryHost == null || registryHost.isEmpty()) ? null
+				: registryHost;
+	}
+
+	/**
+	 * @param registryPort
+	 *            The registry port; values outside 1..65534 select the
+	 *            standard RMI registry port instead.
+	 */
+	@Override
+	public void setRegistryPort(int registryPort) {
+		// NOTE(review): 65535 is rejected here although it is a legal TCP
+		// port number; confirm whether the upper bound is intentional.
+		this.registryPort = ((registryPort < 1 || registryPort > 65534) ? REGISTRY_PORT
+				: registryPort);
+		persistIfLoaded();
+	}
+
+	@Override
+	public int getRegistryPort() {
+		return registryPort == 0 ? REGISTRY_PORT : registryPort;
+	}
+
+	@Override
+	public String getRegistryJar() {
+		return registryJar == null ? DEFAULT_REGISTRY_JAR : registryJar;
+	}
+
+	/**
+	 * @param rmiRegistryJar
+	 *            Full path of the registry JAR; <tt>null</tt> or the empty
+	 *            string selects the default.
+	 */
+	@Override
+	public void setRegistryJar(String rmiRegistryJar) {
+		this.registryJar = (rmiRegistryJar == null || rmiRegistryJar.isEmpty()) ? null
+				: rmiRegistryJar;
+		persistIfLoaded();
+	}
+
+	/**
+	 * @return An unmodifiable list of the permitted workflows; empty when none
+	 *         have been set.
+	 */
+	@Override
+	public List<URI> getPermittedWorkflowURIs() {
+		if (permittedWorkflows == null || permittedWorkflows.length == 0)
+			return emptyList();
+		return unmodifiableList(asList(permittedWorkflows));
+	}
+
+	/**
+	 * @param permittedWorkflows
+	 *            The permitted workflows; <tt>null</tt> and the empty list
+	 *            both clear the setting. The list is copied.
+	 */
+	@Override
+	public void setPermittedWorkflowURIs(List<URI> permittedWorkflows) {
+		if (permittedWorkflows == null || permittedWorkflows.isEmpty())
+			this.permittedWorkflows = new URI[0];
+		else
+			this.permittedWorkflows = permittedWorkflows
+					.toArray(new URI[permittedWorkflows.size()]);
+		persistIfLoaded();
+	}
+
+	public static final boolean DEFAULT_GENERATE_PROVENANCE = false;
+	/** Whether to generate provenance; <tt>null</tt> when unset. */
+	private Boolean generateProvenance;
+
+	@Override
+	public boolean getGenerateProvenance() {
+		Boolean g = generateProvenance;
+		return g == null ? DEFAULT_GENERATE_PROVENANCE : (boolean) g;
+	}
+
+	@Override
+	public void setGenerateProvenance(boolean generate) {
+		this.generateProvenance = generate;
+		persistIfLoaded();
+	}
+
+	// --------------------------------------------------------------
+
+	/**
+	 * Set once the state has been read from, or written to, the database;
+	 * until then the setters do not write anything back.
+	 */
+	private boolean loadedState;
+
+	/**
+	 * Read the configuration from the database record with the fixed key, or
+	 * create that record from the current field values when it does not exist
+	 * yet. Does nothing if the state was already loaded or persistence is not
+	 * in use.
+	 */
+	@PostConstruct
+	@WithinSingleTransaction
+	public void load() {
+		if (loadedState || !isPersistent())
+			return;
+		WorkerModel state = getById(KEY);
+		if (state == null) {
+			store();
+			return;
+		}
+
+		defaultLifetime = state.getDefaultLifetime();
+		executeWorkflowScript = state.getExecuteWorkflowScript();
+		extraArgs = state.getExtraArgs();
+		factoryProcessNamePrefix = state.getFactoryProcessNamePrefix();
+		javaBinary = state.getJavaBinary();
+		maxRuns = state.getMaxRuns();
+		serverWorkerJar = state.getServerWorkerJar();
+		serverForkerJar = state.getServerForkerJar();
+		passwordFile = state.getPasswordFile();
+		sleepMS = state.getSleepMS();
+		waitSeconds = state.getWaitSeconds();
+		registryHost = state.getRegistryHost();
+		registryPort = state.getRegistryPort();
+		operatingLimit = state.getOperatingLimit();
+		List<URI> pwu = state.getPermittedWorkflowURIs();
+		permittedWorkflows = pwu.toArray(new URI[pwu.size()]);
+		registryJar = state.getRegistryJar();
+		generateProvenance = state.getGenerateProvenance();
+
+		loadedState = true;
+	}
+
+	/**
+	 * Copy every field into the database record with the fixed key, creating
+	 * the record first if necessary. The raw field values are written, not
+	 * the defaults that the getters would substitute. Does nothing if
+	 * persistence is not in use.
+	 */
+	@WithinSingleTransaction
+	public void store() {
+		if (!isPersistent())
+			return;
+		WorkerModel state = getById(KEY);
+		if (state == null)
+			state = persist(makeInstance());
+
+		state.setDefaultLifetime(defaultLifetime);
+		state.setExecuteWorkflowScript(executeWorkflowScript);
+		state.setExtraArgs(extraArgs);
+		state.setFactoryProcessNamePrefix(factoryProcessNamePrefix);
+		state.setJavaBinary(javaBinary);
+		state.setMaxRuns(maxRuns);
+		state.setServerWorkerJar(serverWorkerJar);
+		state.setServerForkerJar(serverForkerJar);
+		state.setPasswordFile(passwordFile);
+		state.setSleepMS(sleepMS);
+		state.setWaitSeconds(waitSeconds);
+		state.setRegistryHost(registryHost);
+		state.setRegistryPort(registryPort);
+		state.setOperatingLimit(operatingLimit);
+		// Unset values are skipped so that the record keeps what it has
+		if (permittedWorkflows != null)
+			state.setPermittedWorkflowURIs(asList(permittedWorkflows));
+		state.setRegistryJar(registryJar);
+		if (generateProvenance != null)
+			state.setGenerateProvenance(generateProvenance);
+
+		loadedState = true;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-taverna-server/blob/00397eff/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/localworker/PersistedState.java
----------------------------------------------------------------------
diff --git a/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/localworker/PersistedState.java b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/localworker/PersistedState.java
new file mode 100644
index 0000000..3ed4c51
--- /dev/null
+++ b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/localworker/PersistedState.java
@@ -0,0 +1,270 @@
+/*
+ */
+package org.taverna.server.master.localworker;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+
+import javax.jdo.annotations.Join;
+import javax.jdo.annotations.PersistenceCapable;
+import javax.jdo.annotations.Persistent;
+import javax.jdo.annotations.PrimaryKey;
+
+import org.taverna.server.master.worker.WorkerModel;
+
+/**
+ * The actual database connector for persisted local worker state.
+ * <p>
+ * This is a plain value holder: every accessor reads or writes the matching
+ * field directly, without substituting defaults or validating anything. Only
+ * the permitted-workflow list and the provenance flag are converted between
+ * their interface type and their stored type.
+ * 
+ * @author Donal Fellows
+ */
+/*
+ * WARNING! If you change the name of this class, update persistence.xml as
+ * well!
+ */
+@PersistenceCapable(table = PersistedState.TABLE)
+class PersistedState implements WorkerModel {
+	/** Name of the database table that holds the record. */
+	static final String TABLE = "LOCALWORKERSTATE__PERSISTEDSTATE";
+
+	/**
+	 * Create a new, not yet persisted, record that carries the fixed primary
+	 * key {@link #KEY}; all other fields keep their Java default values.
+	 * 
+	 * @return The new instance.
+	 */
+	static PersistedState makeInstance() {
+		PersistedState o = new PersistedState();
+		o.ID = KEY;
+		return o;
+	}
+
+	/** The primary key of the record. */
+	@PrimaryKey(column = "ID")
+	protected int ID;
+
+	/** The primary key value given to instances made by {@link #makeInstance()}. */
+	static final int KEY = 32;
+
+	@Persistent
+	private int defaultLifetime;
+	@Persistent
+	private int maxRuns;
+	@Persistent
+	private String factoryProcessNamePrefix;
+	@Persistent
+	private String executeWorkflowScript;
+	// Stored in serialized form rather than as individual elements
+	@Persistent(serialized = "true")
+	private String[] extraArgs;
+	@Persistent
+	private int waitSeconds;
+	@Persistent
+	private int sleepMS;
+	@Persistent
+	private String serverWorkerJar;
+	@Persistent
+	private String serverForkerJar;
+	@Persistent
+	private String registryJar;
+	@Persistent
+	private String passwordFile;
+	@Persistent
+	private String javaBinary;
+	@Persistent
+	private int registryPort;
+	@Persistent
+	private String registryHost;
+	@Persistent
+	private int operatingLimit;
+	// The URIs are kept in their string form, in a join table of their own
+	@Persistent(defaultFetchGroup = "true")
+	@Join(table = TABLE + "_PERMWFURI", column = "ID")
+	private String[] permittedWorkflows;
+	// A boolean kept as a number: positive means true (see the accessors)
+	@Persistent
+	private int generateProvenance;
+
+	@Override
+	public void setDefaultLifetime(int defaultLifetime) {
+		this.defaultLifetime = defaultLifetime;
+	}
+
+	@Override
+	public int getDefaultLifetime() {
+		return defaultLifetime;
+	}
+
+	@Override
+	public void setMaxRuns(int maxRuns) {
+		this.maxRuns = maxRuns;
+	}
+
+	@Override
+	public int getMaxRuns() {
+		return maxRuns;
+	}
+
+	@Override
+	public void setFactoryProcessNamePrefix(String factoryProcessNamePrefix) {
+		this.factoryProcessNamePrefix = factoryProcessNamePrefix;
+	}
+
+	@Override
+	public String getFactoryProcessNamePrefix() {
+		return factoryProcessNamePrefix;
+	}
+
+	@Override
+	public void setExecuteWorkflowScript(String executeWorkflowScript) {
+		this.executeWorkflowScript = executeWorkflowScript;
+	}
+
+	@Override
+	public String getExecuteWorkflowScript() {
+		return executeWorkflowScript;
+	}
+
+	/**
+	 * @param extraArgs
+	 *            The array to store; it is kept as given, not copied.
+	 */
+	@Override
+	public void setExtraArgs(String[] extraArgs) {
+		this.extraArgs = extraArgs;
+	}
+
+	/**
+	 * @return The stored array itself (not a copy), which may be
+	 *         <tt>null</tt>.
+	 */
+	@Override
+	public String[] getExtraArgs() {
+		return extraArgs;
+	}
+
+	@Override
+	public void setWaitSeconds(int waitSeconds) {
+		this.waitSeconds = waitSeconds;
+	}
+
+	@Override
+	public int getWaitSeconds() {
+		return waitSeconds;
+	}
+
+	@Override
+	public void setSleepMS(int sleepMS) {
+		this.sleepMS = sleepMS;
+	}
+
+	@Override
+	public int getSleepMS() {
+		return sleepMS;
+	}
+
+	@Override
+	public void setServerWorkerJar(String serverWorkerJar) {
+		this.serverWorkerJar = serverWorkerJar;
+	}
+
+	@Override
+	public String getServerWorkerJar() {
+		return serverWorkerJar;
+	}
+
+	@Override
+	public void setJavaBinary(String javaBinary) {
+		this.javaBinary = javaBinary;
+	}
+
+	@Override
+	public String getJavaBinary() {
+		return javaBinary;
+	}
+
+	@Override
+	public void setRegistryPort(int registryPort) {
+		this.registryPort = registryPort;
+	}
+
+	@Override
+	public int getRegistryPort() {
+		return registryPort;
+	}
+
+	@Override
+	public void setRegistryHost(String registryHost) {
+		this.registryHost = registryHost;
+	}
+
+	@Override
+	public String getRegistryHost() {
+		return registryHost;
+	}
+
+	@Override
+	public void setServerForkerJar(String serverForkerJar) {
+		this.serverForkerJar = serverForkerJar;
+	}
+
+	@Override
+	public String getServerForkerJar() {
+		return serverForkerJar;
+	}
+
+	@Override
+	public void setPasswordFile(String passwordFile) {
+		this.passwordFile = passwordFile;
+	}
+
+	@Override
+	public String getPasswordFile() {
+		return passwordFile;
+	}
+
+	@Override
+	public void setOperatingLimit(int operatingLimit) {
+		this.operatingLimit = operatingLimit;
+	}
+
+	@Override
+	public int getOperatingLimit() {
+		return operatingLimit;
+	}
+
+	/**
+	 * Convert the stored strings back into URIs.
+	 * 
+	 * @return A new, modifiable list with one URI per stored string; empty if
+	 *         nothing is stored. Never <tt>null</tt>.
+	 * @throws IllegalArgumentException
+	 *             If a stored string is not a valid URI (from
+	 *             {@link URI#create(String)}).
+	 */
+	@Override
+	public List<URI> getPermittedWorkflowURIs() {
+		// Work on a local copy of the reference to the stored array
+		String[] pw = this.permittedWorkflows;
+		if (pw == null)
+			return new ArrayList<>();
+		List<URI> uris = new ArrayList<>(pw.length);
+		for (String uri : pw)
+			uris.add(URI.create(uri));
+		return uris;
+	}
+
+	/**
+	 * Store the given URIs in their string form.
+	 * 
+	 * @param permittedWorkflows
+	 *            The URIs to store. Must not be <tt>null</tt> and must not
+	 *            contain <tt>null</tt>, as both are dereferenced here.
+	 */
+	@Override
+	public void setPermittedWorkflowURIs(List<URI> permittedWorkflows) {
+		String[] pw = new String[permittedWorkflows.size()];
+		for (int i = 0; i < pw.length; i++)
+			pw[i] = permittedWorkflows.get(i).toString();
+		this.permittedWorkflows = pw;
+	}
+
+	@Override
+	public String getRegistryJar() {
+		return registryJar;
+	}
+
+	@Override
+	public void setRegistryJar(String registryJar) {
+		this.registryJar = registryJar;
+	}
+
+	/**
+	 * @return <tt>true</tt> exactly when the stored number is positive.
+	 */
+	@Override
+	public boolean getGenerateProvenance() {
+		return generateProvenance > 0;
+	}
+
+	/**
+	 * @param generateProvenance
+	 *            The flag to store; kept as 1 for <tt>true</tt> and 0 for
+	 *            <tt>false</tt>.
+	 */
+	@Override
+	public void setGenerateProvenance(boolean generateProvenance) {
+		this.generateProvenance = (generateProvenance ? 1 : 0);
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-taverna-server/blob/00397eff/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/localworker/StreamLogger.java
----------------------------------------------------------------------
diff --git a/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/localworker/StreamLogger.java b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/localworker/StreamLogger.java
new file mode 100644
index 0000000..e563965
--- /dev/null
+++ b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/localworker/StreamLogger.java
@@ -0,0 +1,78 @@
+package org.taverna.server.master.localworker;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import static java.lang.Thread.interrupted;
+import static org.apache.commons.logging.LogFactory.getLog;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+
+import org.apache.commons.logging.Log;
+
+/**
+ * Copies the lines of a stream into a log, using a daemon thread that is
+ * started as soon as the instance is constructed.
+ */
+abstract class StreamLogger {
+	protected final Log log;
+	private final Thread pump;
+	private final InputStream source;
+
+	/**
+	 * Start logging the contents of a stream.
+	 * 
+	 * @param name
+	 *            Used as the suffix of the log name and as the prefix of the
+	 *            name of the reader thread.
+	 * @param is
+	 *            The stream to read lines from; it is closed when reading
+	 *            ends and by {@link #stop()}.
+	 */
+	protected StreamLogger(final String name, InputStream is) {
+		log = getLog("Taverna.Server.LocalWorker." + name);
+		source = is;
+		Runnable body = new Runnable() {
+			@Override
+			public void run() {
+				copyLines(name);
+			}
+		};
+		pump = new Thread(body, name + ".StreamLogger");
+		pump.setContextClassLoader(null);
+		pump.setDaemon(true);
+		pump.start();
+	}
+
+	/**
+	 * Body of the reader thread: hand every non-empty line to
+	 * {@link #write(String)} until the stream ends, reading fails or the
+	 * thread is interrupted.
+	 * 
+	 * @param name
+	 *            The name to mention if an unexpected failure is reported.
+	 */
+	private void copyLines(String name) {
+		try (BufferedReader lines = new BufferedReader(new InputStreamReader(
+				source))) {
+			for (;;) {
+				// The interrupt flag is tested (and cleared) before each read
+				if (interrupted())
+					break;
+				String text = lines.readLine();
+				if (text == null)
+					break;
+				if (text.isEmpty())
+					continue;
+				write(text);
+			}
+		} catch (IOException e) {
+			// Deliberately ignored; a read failure just ends the thread
+		} catch (Exception e) {
+			log.warn("failure in reading from " + name, e);
+		}
+	}
+
+	/**
+	 * Write a line read from the subprocess to the log.
+	 * <p>
+	 * This needs to be implemented by subclasses in order for the log to be
+	 * correctly written with the class name.
+	 * 
+	 * @param msg
+	 *            The message to write. Guaranteed to have no newline characters
+	 *            in it and to be non-empty.
+	 */
+	protected abstract void write(String msg);
+
+	/**
+	 * Ask the reader thread to finish: interrupt it, then close the stream it
+	 * reads from. A failure to close the stream is ignored.
+	 */
+	public void stop() {
+		log.info("trying to close down " + pump.getName());
+		pump.interrupt();
+		try {
+			source.close();
+		} catch (IOException e) {
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-taverna-server/blob/00397eff/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/localworker/package-info.java
----------------------------------------------------------------------
diff --git a/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/localworker/package-info.java b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/localworker/package-info.java
new file mode 100644
index 0000000..031ce34
--- /dev/null
+++ b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/localworker/package-info.java
@@ -0,0 +1,23 @@
+/*
+ */
+/**
+ * Implementation of a Taverna Server back-end that works by forking off
+ * workflow executors on the local system.
+ */
+package org.taverna.server.master.localworker;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */

http://git-wip-us.apache.org/repos/asf/incubator-taverna-server/blob/00397eff/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/notification/EmailDispatcher.java
----------------------------------------------------------------------
diff --git a/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/notification/EmailDispatcher.java b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/notification/EmailDispatcher.java
new file mode 100644
index 0000000..2a51496
--- /dev/null
+++ b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/notification/EmailDispatcher.java
@@ -0,0 +1,126 @@
+/*
+ */
+package org.taverna.server.master.notification;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import static javax.ws.rs.core.MediaType.TEXT_PLAIN;
+
+import javax.annotation.PostConstruct;
+
+import org.springframework.beans.factory.annotation.Required;
+import org.springframework.mail.MailSender;
+import org.springframework.mail.SimpleMailMessage;
+import org.springframework.mail.javamail.JavaMailSender;
+
+/**
+ * How to send a plain text message by email to someone.
+ * <p>
+ * The dispatcher counts as available only when a sender address, an SMTP host
+ * and a mail sender have all been configured; see {@link #isAvailable()}.
+ * 
+ * @author Donal Fellows
+ */
+public class EmailDispatcher extends RateLimitedDispatcher {
+	@Override
+	public String getName() {
+		return "mailto";
+	}
+
+	/**
+	 * @param from
+	 *            Email address that the notification is to come from.
+	 */
+	@Required
+	public void setFrom(String from) {
+		this.from = valid(from, "");
+	}
+
+	/**
+	 * @param host
+	 *            The outgoing SMTP server address.
+	 */
+	@Required
+	public void setSmtpHost(String host) {
+		this.host = valid(host, "");
+	}
+
+	/**
+	 * @param contentType
+	 *            The content type of the message to be sent. For example, "
+	 *            <tt>text/plain</tt>".
+	 */
+	public void setMessageContentType(String contentType) {
+		this.contentType = contentType;
+	}
+
+	/**
+	 * @param sender
+	 *            the sender to set
+	 */
+	public void setSender(MailSender sender) {
+		this.sender = sender;
+	}
+
+	private String from;
+	private String host;
+	private MailSender sender;
+	// Recorded by the setter but not consulted when a message is built
+	@SuppressWarnings("unused")
+	private String contentType = TEXT_PLAIN;
+
+	/**
+	 * Try to perform the lookup of the email service. This is called during
+	 * configuration so that any failure happens at a useful, predictable time.
+	 * <p>
+	 * The sender is dropped, which makes the dispatcher unavailable, when the
+	 * configuration is incomplete or when the sender fails to construct a
+	 * trial message.
+	 */
+	@PostConstruct
+	public void tryLookup() {
+		if (!isAvailable()) {
+			log.warn("no mail support; disabling email dispatch");
+			sender = null;
+			return;
+		}
+		try {
+			if (sender instanceof JavaMailSender)
+				((JavaMailSender) sender).createMimeMessage();
+		} catch (Throwable t) {
+			// Errors are caught as well as exceptions: any failure at all
+			// just switches email dispatch off
+			log.warn("sender having problems constructing messages; "
+					+ "disabling...", t);
+			sender = null;
+		}
+	}
+
+	/**
+	 * Send a plain text message by email.
+	 * 
+	 * @param messageSubject
+	 *            The subject line of the message.
+	 * @param messageContent
+	 *            The text of the message.
+	 * @param to
+	 *            The address to send to. Surrounding whitespace is ignored; if
+	 *            what remains does not look like an email address (or the
+	 *            argument is <tt>null</tt>), this is logged and nothing is
+	 *            sent.
+	 * @throws IllegalStateException
+	 *             If no mail sender is configured, i.e. the dispatcher is not
+	 *             available.
+	 * @throws Exception
+	 *             If the mail sender fails to send the message.
+	 */
+	@Override
+	public void dispatch(String messageSubject, String messageContent, String to)
+			throws Exception {
+		// Simple checks for acceptability; these are applied to the trimmed
+		// address because that is the form that is actually sent to
+		String recipient = (to == null ? "" : to.trim());
+		if (!recipient.matches(".+@.+")) {
+			log.info("did not send email notification: improper email address \""
+					+ to + "\"");
+			return;
+		}
+
+		// tryLookup() clears the sender when it disables email dispatch
+		MailSender mailer = sender;
+		if (mailer == null)
+			throw new IllegalStateException(
+					"email dispatch is not available: no mail sender");
+
+		SimpleMailMessage message = new SimpleMailMessage();
+		message.setFrom(from);
+		message.setTo(recipient);
+		message.setSubject(messageSubject);
+		message.setText(messageContent);
+		mailer.send(message);
+	}
+
+	/**
+	 * @return Whether a non-empty SMTP host, a non-empty sender address and a
+	 *         mail sender are all configured.
+	 */
+	@Override
+	public boolean isAvailable() {
+		return (host != null && !host.isEmpty() && sender != null
+				&& from != null && !from.isEmpty());
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-taverna-server/blob/00397eff/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/notification/JabberDispatcher.java
----------------------------------------------------------------------
diff --git a/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/notification/JabberDispatcher.java b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/notification/JabberDispatcher.java
new file mode 100644
index 0000000..711f4e6
--- /dev/null
+++ b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/notification/JabberDispatcher.java
@@ -0,0 +1,153 @@
+/*
+ */
+
+package org.taverna.server.master.notification;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.jivesoftware.smack.Chat;
+import org.jivesoftware.smack.ConnectionConfiguration;
+import org.jivesoftware.smack.MessageListener;
+import org.jivesoftware.smack.XMPPConnection;
+import org.jivesoftware.smack.packet.Message;
+import org.taverna.server.master.interfaces.MessageDispatcher;
+import org.taverna.server.master.interfaces.TavernaRun;
+
+/**
+ * Send notifications by Jabber/XMPP.
+ * 
+ * @author Donal Fellows
+ */
+public class JabberDispatcher implements MessageDispatcher {
+	@Override
+	public String getName() {
+		return "xmpp";
+	}
+
+	private Log log = LogFactory.getLog("Taverna.Server.Notification");
+	private XMPPConnection conn;
+	private String resource = "TavernaServer";
+	private String host = "";
+	private String user = "";
+	private String pass = "";
+
+	/**
+	 * @param resource
+	 *            The XMPP resource to use when connecting the server. This
+	 *            defaults to "<tt>TavernaServer</tt>".
+	 */
+	public void setResource(String resource) {
+		this.resource = resource;
+	}
+
+	/**
+	 * @param service
+	 *            The XMPP service URL.
+	 */
+	public void setHost(String service) {
+		if (service == null || service.trim().isEmpty()
+				|| service.trim().startsWith("$"))
+			this.host = "";
+		else
+			this.host = service.trim();
+	}
+
+	/**
+	 * @param user
+	 *            The user identity to use with the XMPP service.
+	 */
+	public void setUsername(String user) {
+		if (user == null || user.trim().isEmpty()
+				|| user.trim().startsWith("$"))
+			this.user = "";
+		else
+			this.user = user.trim();
+	}
+
+	/**
+	 * @param pass
+	 *            The password to use with the XMPP service.
+	 */
+	public void setPassword(String pass) {
+		if (pass == null || pass.trim().isEmpty()
+				|| pass.trim().startsWith("$"))
+			this.pass = "";
+		else
+			this.pass = pass.trim();
+	}
+
+	@PostConstruct
+	void setup() {
+		try {
+			if (host.isEmpty() || user.isEmpty() || pass.isEmpty()) {
+				log.info("disabling XMPP support; incomplete configuration");
+				conn = null;
+				return;
+			}
+			ConnectionConfiguration cfg = new ConnectionConfiguration(host);
+			cfg.setSendPresence(false);
+			// Keep the reference from the start so a failed login can be undone
+			conn = new XMPPConnection(cfg);
+			conn.connect();
+			conn.login(user, pass, resource);
+			log.info("connected to XMPP service <" + host + "> as user <" + user + ">");
+		} catch (Exception e) {
+			log.info("failed to connect to XMPP server", e);
+			close(); // disconnect a half-open connection rather than leak it
+		}
+	}
+
+	@PreDestroy
+	public void close() {
+		if (conn != null)
+			conn.disconnect();
+		conn = null;
+	}
+
+	@Override
+	public boolean isAvailable() {
+		return conn != null;
+	}
+
+	@Override
+	public void dispatch(TavernaRun ignored, String messageSubject,
+			String messageContent, String targetParameter) throws Exception {
+		MessageListener replyHandler = new DroppingListener();
+		Chat conversation = conn.getChatManager().createChat(targetParameter, replyHandler);
+		Message notification = new Message();
+		notification.addBody(null, messageContent);
+		notification.setSubject(messageSubject);
+		conversation.sendMessage(notification);
+	}
+
+	static class DroppingListener implements MessageListener {
+		private Log log = LogFactory
+				.getLog("Taverna.Server.Notification.Jabber");
+
+		@Override
+		public void processMessage(Chat chat, Message message) {
+			if (log.isDebugEnabled())
+				log.debug("unexpectedly received XMPP message from <"
+						+ message.getFrom() + ">; ignoring");
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-taverna-server/blob/00397eff/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/notification/NotificationEngine.java
----------------------------------------------------------------------
diff --git a/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/notification/NotificationEngine.java b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/notification/NotificationEngine.java
new file mode 100644
index 0000000..067d154
--- /dev/null
+++ b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/notification/NotificationEngine.java
@@ -0,0 +1,158 @@
+/*
+ */
+package org.taverna.server.master.notification;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.springframework.beans.factory.annotation.Required;
+import org.taverna.server.master.interfaces.MessageDispatcher;
+import org.taverna.server.master.interfaces.TavernaRun;
+
+/**
+ * A common object for handling dispatch of event-driven messages.
+ * 
+ * @author Donal Fellows
+ */
+public class NotificationEngine {
+	private Log log = LogFactory.getLog("Taverna.Server.Notification");
+	private Map<String, MessageDispatcher> dispatchers;
+	private List<MessageDispatcher> universalDispatchers;
+
+	/**
+	 * @param dispatchers
+	 *            The various dispatchers we want to install.
+	 */
+	@Required
+	public void setDispatchers(List<MessageDispatcher> dispatchers) {
+		this.dispatchers = new HashMap<>();
+		for (MessageDispatcher d : dispatchers)
+			this.dispatchers.put(d.getName(), d);
+	}
+
+	/**
+	 * @param dispatcherList
+	 *            A list of dispatch objects to always dispatch to.
+	 */
+	@Required
+	public void setUniversalDispatchers(List<MessageDispatcher> dispatcherList) {
+		this.universalDispatchers = dispatcherList;
+	}
+
+	private void dispatchToChosenTarget(TavernaRun originator, String scheme,
+			String target, Message message) throws Exception {
+		try {
+			MessageDispatcher d = dispatchers.get(scheme);
+			if (d != null && d.isAvailable())
+				d.dispatch(originator, message.getTitle(scheme),
+						message.getContent(scheme), target);
+			else
+				log.warn("no such notification dispatcher for " + scheme); // also when it exists but is unavailable
+		} catch (URISyntaxException e) {
+			// Fall back: offer "scheme:target" to each available dispatcher in turn
+			Exception e2 = null;
+			for (MessageDispatcher d : dispatchers.values())
+				try {
+					if (d.isAvailable()) {
+						d.dispatch(originator, message.getTitle(d.getName()),
+								message.getContent(d.getName()), scheme + ":"
+										+ target);
+						return; // the first successful dispatch ends the search
+					}
+				} catch (Exception ex) {
+					if (log.isDebugEnabled())
+						log.debug("failed in pseudo-directed dispatch of "
+								+ scheme + ":" + target, ex);
+					e2 = ex; // remember the most recent failure
+				}
+			if (e2 != null) // nobody succeeded and at least one attempt failed
+				throw e2;
+		}
+	}
+
+	private void dispatchUniversally(TavernaRun originator, Message message)
+			throws Exception {
+		for (MessageDispatcher d : universalDispatchers)
+			try {
+				if (d.isAvailable())
+					d.dispatch(originator, message.getTitle(d.getName()),
+							message.getContent(d.getName()), null);
+			} catch (Exception e) {
+				log.warn("problem in universal dispatcher", e);
+			}
+	}
+
+	/**
+	 * Dispatch a message to a chosen destination and to all universal dispatchers.
+	 * 
+	 * @param originator
+	 *            What workflow run was the source of this message?
+	 * @param destination
+	 *            Where the message should get delivered to, as a URI whose
+	 *            scheme selects the dispatcher. If this is <tt>null</tt>,
+	 *            blank, or cannot be parsed as a URI, the directed dispatch is
+	 *            skipped; the universal dispatchers are still used after the
+	 *            directed step.
+	 * @param message
+	 *            Supplies the title and the content of the message for each
+	 *            dispatcher that it is sent through.
+	 * @throws Exception
+	 *             If the directed dispatch fails. Failures inside individual
+	 *             universal dispatchers are only logged.
+	 */
+	public void dispatchMessage(TavernaRun originator, String destination,
+			Message message) throws Exception {
+		if (destination != null && !destination.trim().isEmpty()) {
+			try {
+				URI toURI = new URI(destination.trim());
+				dispatchToChosenTarget(originator, toURI.getScheme(),
+						toURI.getSchemeSpecificPart(), message);
+			} catch (URISyntaxException e) {
+				// Deliberately ignored: fall through to universal dispatch only
+			}
+		}
+		dispatchUniversally(originator, message);
+	}
+
+	/**
+	 * @return The message dispatchers that are actually available (i.e., not
+	 *         disabled by configuration somewhere).
+	 */
+	public List<String> listAvailableDispatchers() {
+		List<String> names = new ArrayList<>();
+		for (String name : dispatchers.keySet()) {
+			MessageDispatcher candidate = dispatchers.get(name);
+			if (candidate.isAvailable())
+				names.add(name);
+		}
+		return names;
+	}
+
+	public interface Message {
+		String getContent(String type);
+
+		String getTitle(String type);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-taverna-server/blob/00397eff/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/notification/RateLimitedDispatcher.java
----------------------------------------------------------------------
diff --git a/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/notification/RateLimitedDispatcher.java b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/notification/RateLimitedDispatcher.java
new file mode 100644
index 0000000..a41e23b
--- /dev/null
+++ b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/notification/RateLimitedDispatcher.java
@@ -0,0 +1,102 @@
+/*
+ */
+package org.taverna.server.master.notification;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.joda.time.DateTime;
+import org.taverna.server.master.interfaces.MessageDispatcher;
+import org.taverna.server.master.interfaces.TavernaRun;
+
+/**
+ * Rate-limiting support. Some message fabrics simply should not be used to send
+ * a lot of messages.
+ * 
+ * @author Donal Fellows
+ */
+public abstract class RateLimitedDispatcher implements MessageDispatcher {
+	/** Pre-configured logger. */
+	protected Log log = LogFactory.getLog("Taverna.Server.Notification");
+	private int cooldownSeconds;
+	private Map<String, DateTime> lastSend = new HashMap<>();
+
+	String valid(String value, String def) {
+		String trimmed = (value == null) ? "" : value.trim();
+		if (trimmed.isEmpty() || trimmed.startsWith("${"))
+			return def;
+		// A usable value: hand it back without surrounding whitespace
+		return trimmed;
+	}
+
+	/**
+	 * Set how long must elapse between updates to the status of any particular
+	 * user. Calls before that time are just silently dropped.
+	 * 
+	 * @param cooldownSeconds
+	 *            Time to elapse, in seconds.
+	 */
+	public void setCooldownSeconds(int cooldownSeconds) {
+		this.cooldownSeconds = cooldownSeconds;
+	}
+
+	/**
+	 * Check the cooldown for a sender, recording the send time if allowed.
+	 * 
+	 * @param who
+	 *            Who wants to send the message?
+	 * @return <tt>true</tt> iff no earlier send by them is still cooling down.
+	 */
+	protected boolean isSendAllowed(String who) {
+		DateTime attempt = new DateTime();
+		synchronized (lastSend) {
+			DateTime previous = lastSend.get(who);
+			boolean coolingDown = previous != null
+					&& !attempt.isAfter(previous.plusSeconds(cooldownSeconds));
+			if (coolingDown)
+				return false;
+			lastSend.put(who, attempt);
+			return true;
+		}
+	}
+
+	@Override
+	public void dispatch(TavernaRun ignored, String messageSubject,
+			String messageContent, String target) throws Exception {
+		if (isSendAllowed(target))
+			dispatch(messageSubject, messageContent, target);
+	}
+
+	/**
+	 * Dispatch a message to a recipient that doesn't care what produced it.
+	 * 
+	 * @param messageSubject
+	 *            The subject of the message to send.
+	 * @param messageContent
+	 *            The plain-text content of the message to send.
+	 * @param target
+	 *            A description of where it is to go.
+	 * @throws Exception
+	 *             If anything goes wrong.
+	 */
+	public abstract void dispatch(String messageSubject, String messageContent,
+			String target) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/incubator-taverna-server/blob/00397eff/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/notification/SMSDispatcher.java
----------------------------------------------------------------------
diff --git a/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/notification/SMSDispatcher.java b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/notification/SMSDispatcher.java
new file mode 100644
index 0000000..559f111
--- /dev/null
+++ b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/notification/SMSDispatcher.java
@@ -0,0 +1,171 @@
+/*
+ */
+package org.taverna.server.master.notification;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import static org.taverna.server.master.defaults.Default.SMS_GATEWAY_URL;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.List;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpResponse;
+import org.apache.http.NameValuePair;
+import org.apache.http.client.entity.UrlEncodedFormEntity;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.http.message.BasicNameValuePair;
+import org.springframework.beans.factory.annotation.Required;
+
+/**
+ * Dispatch termination messages via SMS.
+ * 
+ * @author Donal Fellows
+ */
+public class SMSDispatcher extends RateLimitedDispatcher {
+	@Override
+	public String getName() {
+		return "sms";
+	}
+
+	private CloseableHttpClient client;
+	private URI service;
+	private String user = "", pass = "";
+	private String usernameField = "username", passwordField = "password",
+			destinationField = "to", messageField = "text";
+
+	/**
+	 * @param usernameField
+	 *            The name of the field that conveys the sending username; this
+	 *            is the <i>server</i>'s identity.
+	 */
+	@Required
+	public void setUsernameField(String usernameField) {
+		this.usernameField = usernameField;
+	}
+
+	/**
+	 * @param passwordField
+	 *            The field holding the password to authenticate the server to
+	 *            the SMS gateway.
+	 */
+	@Required
+	public void setPasswordField(String passwordField) {
+		this.passwordField = passwordField;
+	}
+
+	/**
+	 * @param destinationField
+	 *            The field holding the number to send the SMS to.
+	 */
+	@Required
+	public void setDestinationField(String destinationField) {
+		this.destinationField = destinationField;
+	}
+
+	/**
+	 * @param messageField
+	 *            The field holding the plain-text message to send.
+	 */
+	@Required
+	public void setMessageField(String messageField) {
+		this.messageField = messageField;
+	}
+
+	public void setService(String serviceURL) {
+		String s = valid(serviceURL, "");
+		if (s.isEmpty()) {
+			log.warn("did not get sms.service from servlet config; using default (" + SMS_GATEWAY_URL + ")");
+			s = SMS_GATEWAY_URL;
+		}
+		try {
+			service = new URI(s);
+		} catch (URISyntaxException e) {
+			service = null; // isAvailable() reports false from now on; say why
+			log.warn("invalid SMS gateway URL <" + s + ">; SMS dispatch disabled", e);
+		}
+	}
+
+	public void setUser(String user) {
+		this.user = valid(user, "");
+	}
+
+	public void setPassword(String pass) {
+		this.pass = valid(pass, "");
+	}
+
+	@PostConstruct
+	void init() {
+		client = HttpClientBuilder.create().build();
+	}
+
+	@PreDestroy
+	void close() throws IOException {
+		try {
+			if (client != null)
+				client.close();
+		} finally {
+			client = null;
+		}
+	}
+
+	@Override
+	public boolean isAvailable() {
+		return service != null && !user.isEmpty() && !pass.isEmpty();
+	}
+
+	@Override
+	public void dispatch(String messageSubject, String messageContent,
+			String targetParameter) throws Exception {
+		// Sanity check: the target must be present and consist only of digits
+		if (targetParameter == null || !targetParameter.matches("[0-9]+"))
+			throw new Exception("invalid phone number");
+
+		if (!isSendAllowed("anyone"))
+			return;
+
+		// Build the message to send (the subject is not transmitted)
+		List<NameValuePair> params = new ArrayList<>();
+		params.add(new BasicNameValuePair(usernameField, user));
+		params.add(new BasicNameValuePair(passwordField, pass));
+		params.add(new BasicNameValuePair(destinationField, targetParameter));
+		params.add(new BasicNameValuePair(messageField, messageContent));
+
+		// Send the message
+		HttpPost post = new HttpPost(service);
+		post.setEntity(new UrlEncodedFormEntity(params, "UTF-8"));
+		HttpResponse response = client.execute(post);
+
+		// Log the first line of the gateway's response, if it sent a body
+		HttpEntity entity = response.getEntity();
+		if (entity != null)
+			try (BufferedReader e = new BufferedReader(new InputStreamReader(
+					entity.getContent()))) {
+				log.info(e.readLine());
+			}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-taverna-server/blob/00397eff/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/notification/TwitterDispatcher.java
----------------------------------------------------------------------
diff --git a/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/notification/TwitterDispatcher.java b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/notification/TwitterDispatcher.java
new file mode 100644
index 0000000..a0269a5
--- /dev/null
+++ b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/notification/TwitterDispatcher.java
@@ -0,0 +1,145 @@
+/*
+ */
+package org.taverna.server.master.notification;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.util.Properties;
+
+import twitter4j.Twitter;
+import twitter4j.TwitterFactory;
+import twitter4j.conf.Configuration;
+import twitter4j.conf.PropertyConfiguration;
+import twitter4j.auth.AuthorizationFactory;
+
+/**
+ * Super simple-minded twitter dispatcher. You need to tell it your consumer key
+ * and secret as part of the connection parameters, for example via a dispatcher
+ * URN of "<tt>twitter:fred:bloggs</tt>" where <tt>fred</tt> is the key and
+ * <tt>bloggs</tt> is the secret.
+ * 
+ * @author Donal Fellows
+ */
+public class TwitterDispatcher extends RateLimitedDispatcher {
+	@Override
+	public String getName() {
+		return "twitter";
+	}
+
+	public static final int MAX_MESSAGE_LENGTH = 140;
+	public static final char ELLIPSIS = '\u2026';
+
+	private String token = "";
+	private String secret = "";
+
+	public void setAccessToken(String token) {
+		this.token = valid(token, "");
+	}
+
+	public void setAccessSecret(String secret) {
+		this.secret = valid(secret, "");
+	}
+
+	private Properties getConfig() throws NotConfiguredException {
+		if (token.isEmpty() || secret.isEmpty())
+			throw new NotConfiguredException();
+		Properties p = new Properties();
+		p.setProperty(ACCESS_TOKEN_PROP, token);
+		p.setProperty(ACCESS_SECRET_PROP, secret);
+		return p;
+	}
+
+	public static final String ACCESS_TOKEN_PROP = "oauth.accessToken";
+	public static final String ACCESS_SECRET_PROP = "oauth.accessTokenSecret";
+
+	private Twitter getTwitter(String key, String secret) throws Exception {
+		if (key.isEmpty() || secret.isEmpty())
+			throw new NoCredentialsException();
+
+		Properties p = getConfig();
+		p.setProperty("oauth.consumerKey", key);
+		p.setProperty("oauth.consumerSecret", secret);
+
+		Configuration config = new PropertyConfiguration(p);
+		TwitterFactory factory = new TwitterFactory(config);
+		Twitter t = factory.getInstance(AuthorizationFactory
+				.getInstance(config));
+		// Verify that we can connect!
+		t.getOAuthAccessToken();
+		return t;
+	}
+
+	// TODO: Get secret from credential manager
+	@Override
+	public void dispatch(String messageSubject, String messageContent,
+			String targetParameter) throws Exception {
+		// messageSubject ignored; the target must look like "key:secret"
+		String[] target = (targetParameter == null) ? new String[0]
+				: targetParameter.split(":", 2);
+		if (target.length != 2)
+			throw new Exception("missing consumer key or secret");
+		String who = target[0];
+		if (!isSendAllowed(who))
+			return;
+		Twitter twitter = getTwitter(who, target[1]);
+		if (messageContent.length() > MAX_MESSAGE_LENGTH)
+			messageContent = messageContent
+					.substring(0, MAX_MESSAGE_LENGTH - 1) + ELLIPSIS;
+		twitter.updateStatus(messageContent);
+	}
+
+	@Override
+	public boolean isAvailable() {
+		try {
+			// Try to create the configuration and push it through as far as
+			// confirming that we can build an access object (even if it isn't
+			// bound to a user)
+			new TwitterFactory(new PropertyConfiguration(getConfig()))
+					.getInstance();
+			return true;
+		} catch (Exception e) {
+			return false;
+		}
+	}
+
+	/**
+	 * Indicates that the dispatcher has not been configured with service
+	 * credentials.
+	 * 
+	 * @author Donal Fellows
+	 */
+	@SuppressWarnings("serial")
+	public static class NotConfiguredException extends Exception {
+		NotConfiguredException() {
+			super("not configured with xAuth key and secret; "
+					+ "dispatch not possible");
+		}
+	}
+
+	/**
+	 * Indicates that the user did not supply their credentials.
+	 * 
+	 * @author Donal Fellows
+	 */
+	@SuppressWarnings("serial")
+	public static class NoCredentialsException extends Exception {
+		NoCredentialsException() {
+			super("no consumer key and secret present; "
+					+ "dispatch not possible");
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-taverna-server/blob/00397eff/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/notification/atom/AtomFeed.java
----------------------------------------------------------------------
diff --git a/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/notification/atom/AtomFeed.java b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/notification/atom/AtomFeed.java
new file mode 100644
index 0000000..eda6d9d
--- /dev/null
+++ b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/notification/atom/AtomFeed.java
@@ -0,0 +1,147 @@
+/*
+ */
+package org.taverna.server.master.notification.atom;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import static java.lang.String.format;
+import static java.util.UUID.randomUUID;
+import static javax.ws.rs.core.UriBuilder.fromUri;
+import static org.taverna.server.master.common.Roles.USER;
+import static org.taverna.server.master.common.Uri.secure;
+
+import java.net.URI;
+import java.util.Date;
+
+import javax.annotation.security.RolesAllowed;
+import javax.servlet.ServletContext;
+import javax.ws.rs.core.UriBuilder;
+import javax.ws.rs.core.UriInfo;
+
+import org.apache.abdera.Abdera;
+import org.apache.abdera.model.Entry;
+import org.apache.abdera.model.Feed;
+import org.springframework.beans.factory.annotation.Required;
+import org.springframework.web.context.ServletContextAware;
+import org.taverna.server.master.TavernaServerSupport;
+import org.taverna.server.master.interfaces.TavernaRun;
+import org.taverna.server.master.interfaces.UriBuilderFactory;
+import org.taverna.server.master.rest.TavernaServerREST.EventFeed;
+import org.taverna.server.master.utils.InvocationCounter.CallCounted;
+
+/**
+ * Simple REST handler that allows an Atom feed to be served up of events
+ * generated by workflow runs.
+ * 
+ * @author Donal Fellows
+ */
+public class AtomFeed implements EventFeed, UriBuilderFactory,
+		ServletContextAware {
+	/**
+	 * The name of a parameter that states what address we should claim that the
+	 * feed's internally-generated URIs are relative to. If not set, a default
+	 * will be guessed.
+	 */
+	public static final String PREFERRED_URI_PARAM = "taverna.preferredUserUri";
+	private EventDAO eventSource;
+	private TavernaServerSupport support;
+	private URI baseURI;
+	private Abdera abdera;
+	private String feedLanguage = "en";
+	private String uuid = randomUUID().toString();
+
+	@Required
+	public void setEventSource(EventDAO eventSource) {
+		this.eventSource = eventSource;
+	}
+
+	@Required
+	public void setSupport(TavernaServerSupport support) {
+		this.support = support;
+	}
+
+	public void setFeedLanguage(String language) {
+		this.feedLanguage = language;
+	}
+
+	public String getFeedLanguage() {
+		return feedLanguage;
+	}
+
+	@Required
+	public void setAbdera(Abdera abdera) {
+		this.abdera = abdera;
+	}
+
+	@Override
+	@CallCounted
+	@RolesAllowed(USER)
+	public Feed getFeed(UriInfo ui) {
+		Feed feed = abdera.getFactory().newFeed();
+		feed.setTitle("events relating to workflow runs").setLanguage(
+				feedLanguage);
+		String user = support.getPrincipal().toString()
+				.replaceAll("[^A-Za-z0-9]+", ""); // keep only ASCII letters and digits
+		feed.setId(format("urn:taverna-server:%s:%s", uuid, user));
+		org.joda.time.DateTime modification = null; // latest publication time seen so far
+		for (Event e : eventSource.getEvents(support.getPrincipal())) {
+			if (modification == null || e.getPublished().isAfter(modification))
+				modification = e.getPublished();
+			feed.addEntry(e.getEntry(abdera, feedLanguage));
+		}
+		if (modification == null) // no events: stamp the feed with the current time
+			feed.setUpdated(new Date());
+		else
+			feed.setUpdated(modification.toDate());
+		feed.addLink(ui.getAbsolutePath().toASCIIString(), "self");
+		return feed;
+	}
+
+	@Override
+	@CallCounted
+	@RolesAllowed(USER)
+	public Entry getEvent(String id) {
+		return eventSource.getEvent(support.getPrincipal(), id).getEntry(
+				abdera, feedLanguage);
+	}
+
+	@Override
+	public UriBuilder getRunUriBuilder(TavernaRun run) {
+		return secure(fromUri(getBaseUriBuilder().path("runs/{uuid}").build(
+				run.getId())));
+	}
+
+	@Override
+	public UriBuilder getBaseUriBuilder() {
+		return secure(fromUri(baseURI));
+	}
+
+	@Override
+	public String resolve(String uri) {
+		if (uri == null)
+			return null;
+		return secure(baseURI, uri).toString();
+	}
+
+	@Override
+	public void setServletContext(ServletContext servletContext) {
+		String preferred = servletContext.getInitParameter(PREFERRED_URI_PARAM);
+		String base = (preferred != null) ? preferred
+				: servletContext.getContextPath() + "/rest";
+		baseURI = URI.create(base);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-taverna-server/blob/00397eff/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/notification/atom/Event.java
----------------------------------------------------------------------
diff --git a/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/notification/atom/Event.java b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/notification/atom/Event.java
new file mode 100644
index 0000000..f1a9d62
--- /dev/null
+++ b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/notification/atom/Event.java
@@ -0,0 +1,123 @@
+/*
+ */
+package org.taverna.server.master.notification.atom;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import static java.util.UUID.randomUUID;
+
+import java.io.Serializable;
+import java.net.URI;
+import java.util.Date;
+
+import javax.jdo.annotations.Column;
+import javax.jdo.annotations.Index;
+import javax.jdo.annotations.PersistenceCapable;
+import javax.jdo.annotations.Persistent;
+import javax.jdo.annotations.Queries;
+import javax.jdo.annotations.Query;
+
+import org.apache.abdera.Abdera;
+import org.apache.abdera.model.Entry;
+import org.joda.time.DateTime;
+import org.taverna.server.master.utils.UsernamePrincipal;
+
+/**
+ * Parent class of all events that may appear on the feed for a workflow run.
+ * Stored via JDO in ATOM.EVENTS; the named queries below select event ids only.
+ * @author Donal Fellows
+ */
+@SuppressWarnings("serial")
+@PersistenceCapable(schema = "ATOM", table = "EVENTS")
+@Queries({
+		@Query(name = "eventsForUser", language = "SQL", value = "SELECT id FROM ATOM.EVENTS WHERE owner = ? ORDER BY published DESC", resultClass = String.class),
+		@Query(name = "eventForUserAndId", language = "SQL", value = "SELECT id FROM ATOM.EVENTS WHERE owner = ? AND id = ?", resultClass = String.class),
+		@Query(name = "eventsFromBefore", language = "SQL", value = "SELECT id FROM ATOM.EVENTS where published < ?", resultClass = String.class) })
+public class Event implements Serializable {
+	@Persistent(primaryKey = "true")
+	@Column(length = 48)
+	private String id; // "<idPrefix>.<random UUID>", assigned in the constructor
+	@Persistent
+	private String owner; // name of the owning principal
+	@Persistent
+	@Index
+	private Date published; // moment the event object was constructed
+	@Persistent
+	private String message; // becomes the content of the Atom entry
+	@Persistent
+	private String title;
+	@Persistent
+	private String link; // ASCII string form of the workflow run URI
+
+	Event() { // no-arg constructor; presumably required by the persistence layer (confirm)
+	}
+
+	/**
+	 * Create an event with a fresh identity, stamped as published right now.
+	 * 
+	 * @param idPrefix     Prefix for the event identity; a random UUID follows it.
+	 * @param workflowLink URI of the workflow run; stored in its ASCII string form.
+	 * @param owner        Who owns this event; only the principal's name is kept.
+	 * @param title        The title of the event.
+	 * @param message      The content (body) of the event.
+	 */
+	Event(String idPrefix, URI workflowLink, UsernamePrincipal owner,
+			String title, String message) {
+		id = idPrefix + "." + randomUUID().toString();
+		published = new Date();
+		this.owner = owner.getName();
+		this.title = title;
+		this.message = message;
+		this.link = workflowLink.toASCIIString();
+	}
+
+	public final String getId() {
+		return id;
+	}
+
+	public final String getOwner() {
+		return owner;
+	}
+
+	public final DateTime getPublished() {
+		return new DateTime(published); // a new Joda DateTime built from the stored Date
+	}
+
+	public String getMessage() {
+		return message;
+	}
+
+	public String getTitle() {
+		return title;
+	}
+
+	public String getLink() {
+		return link;
+	}
+
+	public Entry getEntry(Abdera abdera, String language) { // renders this event as an Atom entry
+		Entry entry = abdera.getFactory().newEntry();
+		entry.setId(id);
+		entry.setPublished(published);
+		entry.addAuthor(owner).setLanguage(language);
+		entry.setUpdated(published); // "updated" carries the same instant as "published"
+		entry.setTitle(title).setLanguage(language);
+		entry.addLink(link, "related").setTitle("workflow run");
+		entry.setContent(message).setLanguage(language);
+		return entry;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-taverna-server/blob/00397eff/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/notification/atom/EventDAO.java
----------------------------------------------------------------------
diff --git a/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/notification/atom/EventDAO.java b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/notification/atom/EventDAO.java
new file mode 100644
index 0000000..8bec456
--- /dev/null
+++ b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/notification/atom/EventDAO.java
@@ -0,0 +1,230 @@
+/*
+ */
+package org.taverna.server.master.notification.atom;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import static java.lang.Thread.interrupted;
+import static java.lang.Thread.sleep;
+import static java.util.Arrays.asList;
+
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+
+import javax.annotation.Nonnull;
+import javax.annotation.PreDestroy;
+import javax.jdo.annotations.PersistenceAware;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.joda.time.DateTime;
+import org.springframework.beans.factory.annotation.Required;
+import org.taverna.server.master.interfaces.MessageDispatcher;
+import org.taverna.server.master.interfaces.TavernaRun;
+import org.taverna.server.master.interfaces.UriBuilderFactory;
+import org.taverna.server.master.utils.JDOSupport;
+import org.taverna.server.master.utils.UsernamePrincipal;
+
+/**
+ * The database interface that supports the event feed.
+ * 
+ * @author Donal Fellows
+ */
+@PersistenceAware
+public class EventDAO extends JDOSupport<Event> implements MessageDispatcher {
+	public EventDAO() {
+		super(Event.class); // binds the JDOSupport base class to the Event type
+	}
+
+	@Override
+	public String getName() {
+		return "atom"; // presumably the key this dispatcher is selected by; confirm against MessageDispatcher
+	}
+
+	private Log log = LogFactory.getLog("Taverna.Server.Atom");
+	private UriBuilderFactory ubf;
+	private int expiryAgeDays;
+
+	@Required
+	public void setExpiryAgeDays(int expiryAgeDays) {
+		this.expiryAgeDays = expiryAgeDays; // age in days beyond which deleteExpiredEvents() removes events
+	}
+
+	@Required
+	public void setUriBuilderFactory(UriBuilderFactory ubf) {
+		this.ubf = ubf; // supplies the run URI stored as the link of each new event
+	}
+
+	/**
+	 * Fetch every event that belongs to the given user.
+	 * 
+	 * @param user
+	 *            The identity of the user whose events are wanted.
+	 * @return Detached copies of the user's events, in the order in which the
+	 *         "eventsForUser" query yields their identifiers.
+	 */
+	@Nonnull
+	@WithinSingleTransaction
+	public List<Event> getEvents(@Nonnull UsernamePrincipal user) {
+		List<String> eventIds = eventsForUser(user);
+		if (log.isDebugEnabled())
+			log.debug("found " + eventIds.size() + " events for user " + user);
+
+		List<Event> detached = new ArrayList<>();
+		// Resolve each identifier to its event and keep the detached form.
+		for (String eventId : eventIds)
+			detached.add(detach(getById(eventId)));
+		return detached;
+	}
+
+	@SuppressWarnings("unchecked")
+	private List<String> eventsForUser(UsernamePrincipal user) {
+		Object rows = namedQuery("eventsForUser").execute(user.getName());
+		return (List<String>) rows; // the query is declared with resultClass = String.class
+	}
+
+	/**
+	 * Look up a single event owned by the given user.
+	 * 
+	 * @param user
+	 *            The identity of the user the event must belong to.
+	 * @param id
+	 *            The handle of the event to look up.
+	 * @return A detached copy of the event.
+	 * @throws IllegalArgumentException
+	 *             If the lookup does not match exactly one event.
+	 */
+	@Nonnull
+	@WithinSingleTransaction
+	public Event getEvent(@Nonnull UsernamePrincipal user, @Nonnull String id) {
+		List<String> matches = eventsForUserAndId(user, id);
+		if (log.isDebugEnabled())
+			log.debug("found " + matches.size() + " events for user " + user + " with id = " + id);
+		if (matches.size() == 1)
+			return detach(getById(matches.get(0)));
+		throw new IllegalArgumentException("no such id");
+	}
+
+	@SuppressWarnings("unchecked")
+	private List<String> eventsForUserAndId(UsernamePrincipal user, String id) {
+		Object rows = namedQuery("eventForUserAndId").execute(user.getName(), id);
+		return (List<String>) rows; // the query is declared with resultClass = String.class
+	}
+
+	/**
+	 * Delete a particular event.
+	 * 
+	 * @param id
+	 *            The identifier of the event to delete.
+	 */
+	@WithinSingleTransaction
+	public void deleteEventById(@Nonnull String id) {
+		delete(getById(id));
+	}
+
+	/**
+	 * Purge every event published more than {@code expiryAgeDays} days ago.
+	 */
+	@WithinSingleTransaction
+	public void deleteExpiredEvents() {
+		// The query parameter is handed over as a java.sql.Timestamp, not a plain Date.
+		Date cutoff = new Timestamp(
+				new DateTime().plusDays(-expiryAgeDays).toDate().getTime());
+
+		List<String> expired = eventsFromBefore(cutoff);
+		if (log.isDebugEnabled() && !expired.isEmpty())
+			log.debug("found " + expired.size()
+					+ " events to be squelched (older than " + cutoff + ")");
+		for (String expiredId : expired)
+			delete(getById(expiredId));
+	}
+
+	@SuppressWarnings("unchecked")
+	private List<String> eventsFromBefore(Date death) {
+		return (List<String>) namedQuery("eventsFromBefore").execute(death); // ids with published < death
+	}
+
+	@Override
+	public boolean isAvailable() {
+		return true; // this dispatcher always reports itself as available
+	}
+
+	private BlockingQueue<Event> insertQueue = new ArrayBlockingQueue<>(16);
+
+	@Override
+	public void dispatch(TavernaRun originator, String messageSubject,
+			String messageContent, String targetParameter) throws Exception {
+		Event finished = new Event("finish", ubf.getRunUriBuilder(originator).build(),
+				originator.getSecurityContext().getOwner(), messageSubject, messageContent);
+		insertQueue.put(finished); // waits while the queue is full; targetParameter is not used here
+	}
+
+	public void started(TavernaRun originator, String messageSubject,
+			String messageContent) throws InterruptedException {
+		Event begun = new Event("start", ubf.getRunUriBuilder(originator).build(),
+				originator.getSecurityContext().getOwner(), messageSubject, messageContent);
+		insertQueue.put(begun); // waits while the queue is full; stored later by the event daemon
+	}
+
+	private Thread eventDaemon;
+	private volatile boolean shuttingDown = false; // written by stopDaemon(), polled by the daemon thread
+
+	/** Start the daemon thread that stores queued events through {@code dao}; see stopDaemon(). */
+	@Required
+	public void setSelf(final EventDAO dao) {
+		eventDaemon = new Thread(new Runnable() {
+			@Override
+			public void run() {
+				try {
+					while (!shuttingDown && !interrupted()) {
+						transferEvents(dao, new ArrayList<Event>(asList(insertQueue.take()))); // wait for one, sweep the rest
+						sleep(5000);
+					}
+				} catch (InterruptedException e) { // deliberate: an interrupt is the shutdown request
+				} finally {
+					transferEvents(dao, new ArrayList<Event>()); // final flush of anything still queued
+				}
+			}
+		}, "ATOM event daemon");
+		eventDaemon.setContextClassLoader(null);
+		eventDaemon.setDaemon(true);
+		eventDaemon.start();
+	}
+
+	private void transferEvents(EventDAO dao, List<Event> e) {
+		insertQueue.drainTo(e); // append everything currently queued to e, without waiting
+		dao.storeEvents(e); // persist the batch through the supplied DAO instance
+	}
+
+	@PreDestroy
+	void stopDaemon() {
+		shuttingDown = true; // checked by the daemon loop before each iteration
+		if (eventDaemon != null) // still null if setSelf() was never called
+			eventDaemon.interrupt(); // wakes the daemon out of take() or sleep()
+	}
+
+	@WithinSingleTransaction
+	protected void storeEvents(List<Event> events) {
+		for (Event e : events) // every event of the batch is persisted within this one call
+			persist(e);
+		log.info("stored " + events.size() + " notification events"); // logged for an empty batch too
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-taverna-server/blob/00397eff/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/notification/atom/package-info.java
----------------------------------------------------------------------
diff --git a/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/notification/atom/package-info.java b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/notification/atom/package-info.java
new file mode 100644
index 0000000..0a1a52f
--- /dev/null
+++ b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/notification/atom/package-info.java
@@ -0,0 +1,42 @@
+/*
+ */
+/**
+ * This package contains the Atom feed implementation within Taverna Server.
+ * @author Donal Fellows
+ */
+@XmlSchema(namespace = FEED, elementFormDefault = QUALIFIED, attributeFormDefault = QUALIFIED, xmlns = {
+		@XmlNs(prefix = "xlink", namespaceURI = XLINK),
+		@XmlNs(prefix = "ts", namespaceURI = SERVER),
+		@XmlNs(prefix = "ts-rest", namespaceURI = SERVER_REST),
+		@XmlNs(prefix = "ts-soap", namespaceURI = SERVER_SOAP),
+		@XmlNs(prefix = "feed", namespaceURI = FEED),
+		@XmlNs(prefix = "admin", namespaceURI = ADMIN) })
+package org.taverna.server.master.notification.atom;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import static javax.xml.bind.annotation.XmlNsForm.QUALIFIED;
+import static org.taverna.server.master.common.Namespaces.ADMIN;
+import static org.taverna.server.master.common.Namespaces.FEED;
+import static org.taverna.server.master.common.Namespaces.SERVER;
+import static org.taverna.server.master.common.Namespaces.SERVER_REST;
+import static org.taverna.server.master.common.Namespaces.SERVER_SOAP;
+import static org.taverna.server.master.common.Namespaces.XLINK;
+
+import javax.xml.bind.annotation.XmlNs;
+import javax.xml.bind.annotation.XmlSchema;
+

http://git-wip-us.apache.org/repos/asf/incubator-taverna-server/blob/00397eff/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/notification/package-info.java
----------------------------------------------------------------------
diff --git a/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/notification/package-info.java b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/notification/package-info.java
new file mode 100644
index 0000000..43335cf
--- /dev/null
+++ b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/notification/package-info.java
@@ -0,0 +1,23 @@
+/*
+ */
+/**
+ * The notification fabric and implementations of notification dispatchers
+ * that support subscription.
+ */
+package org.taverna.server.master.notification;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */

http://git-wip-us.apache.org/repos/asf/incubator-taverna-server/blob/00397eff/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/package-info.java
----------------------------------------------------------------------
diff --git a/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/package-info.java b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/package-info.java
new file mode 100644
index 0000000..d912ac8
--- /dev/null
+++ b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/package-info.java
@@ -0,0 +1,24 @@
+/*
+ */
+/**
+ * The core of the implementation of Taverna Server, including the
+ * implementations of the SOAP and REST interfaces.
+ */
+package org.taverna.server.master;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+

http://git-wip-us.apache.org/repos/asf/incubator-taverna-server/blob/00397eff/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/rest/ContentTypes.java
----------------------------------------------------------------------
diff --git a/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/rest/ContentTypes.java b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/rest/ContentTypes.java
new file mode 100644
index 0000000..d9aef82
--- /dev/null
+++ b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/rest/ContentTypes.java
@@ -0,0 +1,41 @@
+/*
+ */
+package org.taverna.server.master.rest;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import static javax.ws.rs.core.MediaType.APPLICATION_ATOM_XML;
+import static javax.ws.rs.core.MediaType.APPLICATION_JSON;
+import static javax.ws.rs.core.MediaType.APPLICATION_OCTET_STREAM;
+import static javax.ws.rs.core.MediaType.APPLICATION_XML;
+import static javax.ws.rs.core.MediaType.TEXT_PLAIN;
+
+/**
+ * Miscellaneous content type constants.
+ * 
+ * @author Donal Fellows
+ */
+interface ContentTypes { // interface fields are implicitly public, static and final
+	String URI_LIST = "text/uri-list";
+	String ZIP = "application/zip";
+	String TEXT = TEXT_PLAIN;
+	String XML = APPLICATION_XML;
+	String JSON = APPLICATION_JSON;
+	String BYTES = APPLICATION_OCTET_STREAM;
+	String ATOM = APPLICATION_ATOM_XML;
+	String ROBUNDLE = "application/vnd.wf4ever.robundle+zip";
+}

http://git-wip-us.apache.org/repos/asf/incubator-taverna-server/blob/00397eff/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/rest/DirectoryContents.java
----------------------------------------------------------------------
diff --git a/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/rest/DirectoryContents.java b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/rest/DirectoryContents.java
new file mode 100644
index 0000000..42d4b0e
--- /dev/null
+++ b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/rest/DirectoryContents.java
@@ -0,0 +1,74 @@
+/*
+ */
+package org.taverna.server.master.rest;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import static org.taverna.server.master.common.Uri.secure;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import javax.ws.rs.core.UriBuilder;
+import javax.ws.rs.core.UriInfo;
+import javax.xml.bind.annotation.XmlElementRef;
+import javax.xml.bind.annotation.XmlRootElement;
+import javax.xml.bind.annotation.XmlSeeAlso;
+import javax.xml.bind.annotation.XmlType;
+
+import org.taverna.server.master.common.DirEntryReference;
+import org.taverna.server.master.interfaces.DirectoryEntry;
+
+/**
+ * The result of a RESTful operation to list the contents of a directory. Done
+ * with JAXB.
+ * 
+ * @author Donal Fellows
+ */
+@XmlRootElement
+@XmlType(name = "DirectoryContents")
+@XmlSeeAlso(MakeOrUpdateDirEntry.class)
+public class DirectoryContents {
+	/**
+	 * References to the entries held in the directory.
+	 */
+	@XmlElementRef
+	public List<DirEntryReference> contents;
+
+	/**
+	 * Build a description with no entries. JAXB needs this constructor.
+	 */
+	public DirectoryContents() {
+		contents = new ArrayList<>();
+	}
+
+	/**
+	 * Build a description of an actual directory listing.
+	 * 
+	 * @param ui
+	 *            Where the URIs of the entries are derived from.
+	 * @param collection
+	 *            The directory entries to describe.
+	 */
+	public DirectoryContents(UriInfo ui, Collection<DirectoryEntry> collection) {
+		this();
+		UriBuilder entryUri = secure(ui).path("{filename}");
+		for (DirectoryEntry entry : collection)
+			contents.add(DirEntryReference.newInstance(entryUri, entry));
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-taverna-server/blob/00397eff/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/rest/FileSegment.java
----------------------------------------------------------------------
diff --git a/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/rest/FileSegment.java b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/rest/FileSegment.java
new file mode 100644
index 0000000..6b7aaff
--- /dev/null
+++ b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/rest/FileSegment.java
@@ -0,0 +1,91 @@
+/*
+ */
+package org.taverna.server.master.rest;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import static javax.ws.rs.core.Response.ok;
+
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+
+import org.taverna.server.master.exceptions.FilesystemAccessException;
+import org.taverna.server.master.interfaces.File;
+
+/**
+ * Representation of a segment of a file to be read by JAX-RS.
+ * 
+ * @author Donal Fellows
+ */
+public class FileSegment {
+	/** Matches a single-range "bytes=first-last" header value; compiled once. */
+	private static final Pattern RANGE = Pattern.compile("^\\s*bytes=(\\d*)-(\\d*)\\s*$");
+	/** The file to read a segment of. */
+	public final File file;
+	/** The offset of the first byte of the segment to read. */
+	public Integer from;
+	/** The offset of the first byte after the segment to read. */
+	public Integer to;
+
+	/**
+	 * Parse the HTTP Range header and determine what exact range of the file to
+	 * read. A missing or unparseable header (including "bytes=-") leaves both
+	 * offsets null, which selects the whole file.
+	 * 
+	 * @param f
+	 *            The file this refers to
+	 * @param range
+	 *            The content of the Range header; may be null.
+	 * @throws FilesystemAccessException
+	 *             If we can't determine the length of the file.
+	 */
+	public FileSegment(File f, String range) throws FilesystemAccessException {
+		file = f;
+		Matcher m = RANGE.matcher(range == null ? "" : range);
+		if (!m.matches() || (m.group(1).isEmpty() && m.group(2).isEmpty()))
+			return;
+		int size = (int) f.getSize(); // NOTE(review): narrowed to int; presumably wrong past 2 GiB
+		if (m.group(1).isEmpty()) {
+			// Suffix form "-N": exactly the final N bytes, clamped to the start of the file.
+			from = (int) Math.max(0, size - Long.parseLong(m.group(2)));
+			to = size;
+		} else {
+			from = Integer.valueOf(m.group(1));
+			// "last" is inclusive in the header; "to" is exclusive and capped at the file size.
+			to = m.group(2).isEmpty() ? size
+					: (int) (Math.min(size - 1L, Long.parseLong(m.group(2))) + 1);
+		}
+	}
+
+	/**
+	 * Convert to a response, as per RFC 2616.
+	 * 
+	 * @param type
+	 *            The expected type of the data.
+	 * @return 200 (whole file) if no range was parsed, 416 if it is empty, else 206.
+	 */
+	public Response toResponse(MediaType type) {
+		if (from == null && to == null)
+			return ok(file).type(type).build();
+		if (from >= to)
+			return ok("Requested range not satisfiable").status(416).build();
+		return ok(this).status(206).type(type).build();
+	}
+}
\ No newline at end of file