You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@taverna.apache.org by st...@apache.org on 2015/02/23 11:20:17 UTC

[10/26] incubator-taverna-server git commit: Revert "temporarily empty repository"

http://git-wip-us.apache.org/repos/asf/incubator-taverna-server/blob/0b04b1ab/server-webapp/src/main/java/org/taverna/server/master/worker/RemoteRunDelegate.java
----------------------------------------------------------------------
diff --git a/server-webapp/src/main/java/org/taverna/server/master/worker/RemoteRunDelegate.java b/server-webapp/src/main/java/org/taverna/server/master/worker/RemoteRunDelegate.java
new file mode 100644
index 0000000..22158d5
--- /dev/null
+++ b/server-webapp/src/main/java/org/taverna/server/master/worker/RemoteRunDelegate.java
@@ -0,0 +1,967 @@
+/*
+ * Copyright (C) 2010-2013 The University of Manchester
+ * 
+ * See the file "LICENSE" for license terms.
+ */
+package org.taverna.server.master.worker;
+
+import static java.lang.System.currentTimeMillis;
+import static java.util.Calendar.MINUTE;
+import static java.util.Collections.sort;
+import static java.util.Collections.unmodifiableSet;
+import static java.util.UUID.randomUUID;
+import static org.apache.commons.io.IOUtils.closeQuietly;
+import static org.apache.commons.logging.LogFactory.getLog;
+import static org.taverna.server.master.worker.RemoteRunDelegate.checkBadFilename;
+import static org.taverna.server.master.worker.RunConnection.NAME_LENGTH;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.PipedOutputStream;
+import java.rmi.MarshalledObject;
+import java.rmi.RemoteException;
+import java.security.GeneralSecurityException;
+import java.security.Principal;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Calendar;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipOutputStream;
+
+import javax.annotation.Nonnull;
+
+import org.apache.commons.logging.Log;
+import org.taverna.server.localworker.remote.IllegalStateTransitionException;
+import org.taverna.server.localworker.remote.ImplementationException;
+import org.taverna.server.localworker.remote.RemoteDirectory;
+import org.taverna.server.localworker.remote.RemoteDirectoryEntry;
+import org.taverna.server.localworker.remote.RemoteFile;
+import org.taverna.server.localworker.remote.RemoteInput;
+import org.taverna.server.localworker.remote.RemoteListener;
+import org.taverna.server.localworker.remote.RemoteSingleRun;
+import org.taverna.server.localworker.remote.RemoteStatus;
+import org.taverna.server.localworker.remote.StillWorkingOnItException;
+import org.taverna.server.master.common.Status;
+import org.taverna.server.master.common.Workflow;
+import org.taverna.server.master.exceptions.BadPropertyValueException;
+import org.taverna.server.master.exceptions.BadStateChangeException;
+import org.taverna.server.master.exceptions.FilesystemAccessException;
+import org.taverna.server.master.exceptions.NoListenerException;
+import org.taverna.server.master.exceptions.OverloadedException;
+import org.taverna.server.master.exceptions.UnknownRunException;
+import org.taverna.server.master.interfaces.Directory;
+import org.taverna.server.master.interfaces.DirectoryEntry;
+import org.taverna.server.master.interfaces.File;
+import org.taverna.server.master.interfaces.Input;
+import org.taverna.server.master.interfaces.Listener;
+import org.taverna.server.master.interfaces.SecurityContextFactory;
+import org.taverna.server.master.interfaces.TavernaRun;
+import org.taverna.server.master.interfaces.TavernaSecurityContext;
+import org.taverna.server.master.utils.UsernamePrincipal;
+
+/**
+ * Bridging shim between the WebApp world and the RMI world.
+ * 
+ * @author Donal Fellows
+ */
+@SuppressWarnings("serial")
+public class RemoteRunDelegate implements TavernaRun {
+	private transient Log log = getLog("Taverna.Server.Worker");
+	transient TavernaSecurityContext secContext;
+	Date creationInstant;
+	Workflow workflow;
+	Date expiry;
+	HashSet<String> readers;
+	HashSet<String> writers;
+	HashSet<String> destroyers;
+	transient String id;
+	transient RemoteSingleRun run;
+	transient RunDBSupport db;
+	transient FactoryBean factory;
+	boolean doneTransitionToFinished;
+	boolean generateProvenance;// FIXME expose
+	String name;
+	private static final String ELLIPSIS = "...";
+
+	public RemoteRunDelegate(Date creationInstant, Workflow workflow,
+			RemoteSingleRun rsr, int defaultLifetime, RunDBSupport db, UUID id,
+			boolean generateProvenance, FactoryBean factory) {
+		if (rsr == null)
+			throw new IllegalArgumentException("remote run must not be null");
+		this.creationInstant = creationInstant;
+		this.workflow = workflow;
+		Calendar c = Calendar.getInstance();
+		c.add(MINUTE, defaultLifetime);
+		this.expiry = c.getTime();
+		this.run = rsr;
+		this.db = db;
+		this.generateProvenance = generateProvenance;
+		this.factory = factory;
+		try {
+			this.name = "";
+			String ci = " " + creationInstant;
+			String n = workflow.getName();
+			if (n.length() > NAME_LENGTH - ci.length())
+				n = n.substring(0,
+						NAME_LENGTH - ci.length() - ELLIPSIS.length())
+						+ ELLIPSIS;
+			this.name = n + ci;
+		} catch (Exception e) {
+			// Ignore; it's just a name, not something important.
+		}
+		if (id != null)
+			this.id = id.toString();
+	}
+
+	RemoteRunDelegate() {
+	}
+
+	/**
+	 * Get the types of listener supported by this run.
+	 * 
+	 * @return A list of listener type names.
+	 * @throws RemoteException
+	 *             If anything goes wrong.
+	 */
+	public List<String> getListenerTypes() throws RemoteException {
+		return run.getListenerTypes();
+	}
+
+	@Override
+	public void addListener(Listener listener) {
+		if (listener instanceof ListenerDelegate)
+			try {
+				run.addListener(((ListenerDelegate) listener).getRemote());
+			} catch (RemoteException e) {
+				log.warn("communication problem adding listener", e);
+			} catch (ImplementationException e) {
+				log.warn("implementation problem adding listener", e);
+			}
+		else
+			log.fatal("bad listener " + listener.getClass()
+					+ "; not applicable remotely!");
+	}
+
+	@Override
+	public String getId() {
+		if (id == null)
+			id = randomUUID().toString();
+		return id;
+	}
+
+	/**
+	 * Attach a listener to a workflow run and return its local delegate.
+	 * 
+	 * @param type
+	 *            The type of listener to create.
+	 * @param config
+	 *            The configuration of the listener.
+	 * @return The local delegate of the listener.
+	 * @throws NoListenerException
+	 *             If anything goes wrong.
+	 */
+	public Listener makeListener(String type, String config)
+			throws NoListenerException {
+		try {
+			return new ListenerDelegate(run.makeListener(type, config));
+		} catch (RemoteException e) {
+			throw new NoListenerException("failed to make listener", e);
+		}
+	}
+
+	@Override
+	public void destroy() {
+		try {
+			run.destroy();
+		} catch (RemoteException | ImplementationException e) {
+			log.warn("failed to destroy run", e);
+		}
+	}
+
+	@Override
+	public Date getExpiry() {
+		return new Date(expiry.getTime());
+	}
+
+	@Override
+	public List<Listener> getListeners() {
+		List<Listener> listeners = new ArrayList<>();
+		try {
+			for (RemoteListener rl : run.getListeners())
+				listeners.add(new ListenerDelegate(rl));
+		} catch (RemoteException e) {
+			log.warn("failed to get listeners", e);
+		}
+		return listeners;
+	}
+
+	@Override
+	public TavernaSecurityContext getSecurityContext() {
+		return secContext;
+	}
+
+	@Override
+	public Status getStatus() {
+		try {
+			switch (run.getStatus()) {
+			case Initialized:
+				return Status.Initialized;
+			case Operating:
+				return Status.Operating;
+			case Stopped:
+				return Status.Stopped;
+			case Finished:
+				return Status.Finished;
+			}
+		} catch (RemoteException e) {
+			log.warn("problem getting remote status", e);
+		}
+		return Status.Finished;
+	}
+
+	@Override
+	public Workflow getWorkflow() {
+		return workflow;
+	}
+
+	@Override
+	public Directory getWorkingDirectory() throws FilesystemAccessException {
+		try {
+			return new DirectoryDelegate(run.getWorkingDirectory());
+		} catch (Throwable e) {
+			if (e.getCause() != null)
+				e = e.getCause();
+			throw new FilesystemAccessException(
+					"problem getting main working directory handle", e);
+		}
+	}
+
+	@Override
+	public void setExpiry(Date d) {
+		if (d.after(new Date()))
+			expiry = new Date(d.getTime());
+		db.flushToDisk(this);
+	}
+
+	@Override
+	public String setStatus(Status s) throws BadStateChangeException {
+		try {
+			log.info("setting status of run " + id + " to " + s);
+			switch (s) {
+			case Initialized:
+				run.setStatus(RemoteStatus.Initialized);
+				break;
+			case Operating:
+				if (run.getStatus() == RemoteStatus.Initialized) {
+					if (!factory.isAllowingRunsToStart())
+						throw new OverloadedException();
+					secContext.conveySecurity();
+				}
+				run.setGenerateProvenance(generateProvenance);
+				run.setStatus(RemoteStatus.Operating);
+				factory.getMasterEventFeed()
+						.started(
+								this,
+								"started run execution",
+								"The execution of run '" + getName()
+										+ "' has started.");
+				break;
+			case Stopped:
+				run.setStatus(RemoteStatus.Stopped);
+				break;
+			case Finished:
+				run.setStatus(RemoteStatus.Finished);
+				break;
+			}
+			return null;
+		} catch (IllegalStateTransitionException e) {
+			throw new BadStateChangeException(e.getMessage());
+		} catch (RemoteException e) {
+			throw new BadStateChangeException(e.getMessage(), e.getCause());
+		} catch (GeneralSecurityException | IOException e) {
+			throw new BadStateChangeException(e.getMessage(), e);
+		} catch (ImplementationException e) {
+			if (e.getCause() != null)
+				throw new BadStateChangeException(e.getMessage(), e.getCause());
+			throw new BadStateChangeException(e.getMessage(), e);
+		} catch (StillWorkingOnItException e) {
+			log.info("still working on setting status of run " + id + " to "
+					+ s, e);
+			return e.getMessage();
+		} catch (InterruptedException e) {
+			throw new BadStateChangeException(
+					"interrupted while waiting to insert notification into database");
+		}
+	}
+
+	static void checkBadFilename(String filename)
+			throws FilesystemAccessException {
+		if (filename.startsWith("/"))
+			throw new FilesystemAccessException("filename may not be absolute");
+		if (Arrays.asList(filename.split("/")).contains(".."))
+			throw new FilesystemAccessException(
+					"filename may not refer to parent");
+	}
+
+	@Override
+	public String getInputBaclavaFile() {
+		try {
+			return run.getInputBaclavaFile();
+		} catch (RemoteException e) {
+			log.warn("problem when fetching input baclava file", e);
+			return null;
+		}
+	}
+
+	@Override
+	public List<Input> getInputs() {
+		ArrayList<Input> inputs = new ArrayList<>();
+		try {
+			for (RemoteInput ri : run.getInputs())
+				inputs.add(new RunInput(ri));
+		} catch (RemoteException e) {
+			log.warn("problem when fetching list of workflow inputs", e);
+		}
+		return inputs;
+	}
+
+	@Override
+	public String getOutputBaclavaFile() {
+		try {
+			return run.getOutputBaclavaFile();
+		} catch (RemoteException e) {
+			log.warn("problem when fetching output baclava file", e);
+			return null;
+		}
+	}
+
+	@Override
+	public Input makeInput(String name) throws BadStateChangeException {
+		try {
+			return new RunInput(run.makeInput(name));
+		} catch (RemoteException e) {
+			throw new BadStateChangeException("failed to make input", e);
+		}
+	}
+
+	@Override
+	public void setInputBaclavaFile(String filename)
+			throws FilesystemAccessException, BadStateChangeException {
+		checkBadFilename(filename);
+		try {
+			run.setInputBaclavaFile(filename);
+		} catch (RemoteException e) {
+			throw new FilesystemAccessException(
+					"cannot set input baclava file name", e);
+		}
+	}
+
+	@Override
+	public void setOutputBaclavaFile(String filename)
+			throws FilesystemAccessException, BadStateChangeException {
+		checkBadFilename(filename);
+		try {
+			run.setOutputBaclavaFile(filename);
+		} catch (RemoteException e) {
+			throw new FilesystemAccessException(
+					"cannot set output baclava file name", e);
+		}
+	}
+
+	@Override
+	public Date getCreationTimestamp() {
+		return creationInstant == null ? null : new Date(
+				creationInstant.getTime());
+	}
+
+	@Override
+	public Date getFinishTimestamp() {
+		try {
+			return run.getFinishTimestamp();
+		} catch (RemoteException e) {
+			log.info("failed to get finish timestamp", e);
+			return null;
+		}
+	}
+
+	@Override
+	public Date getStartTimestamp() {
+		try {
+			return run.getStartTimestamp();
+		} catch (RemoteException e) {
+			log.info("failed to get finish timestamp", e);
+			return null;
+		}
+	}
+
+	/**
+	 * @param readers
+	 *            the readers to set
+	 */
+	public void setReaders(Set<String> readers) {
+		this.readers = new HashSet<>(readers);
+		db.flushToDisk(this);
+	}
+
+	/**
+	 * @return the readers
+	 */
+	public Set<String> getReaders() {
+		return readers == null ? new HashSet<String>()
+				: unmodifiableSet(readers);
+	}
+
+	/**
+	 * @param writers
+	 *            the writers to set
+	 */
+	public void setWriters(Set<String> writers) {
+		this.writers = new HashSet<>(writers);
+		db.flushToDisk(this);
+	}
+
+	/**
+	 * @return the writers
+	 */
+	public Set<String> getWriters() {
+		return writers == null ? new HashSet<String>()
+				: unmodifiableSet(writers);
+	}
+
+	/**
+	 * @param destroyers
+	 *            the destroyers to set
+	 */
+	public void setDestroyers(Set<String> destroyers) {
+		this.destroyers = new HashSet<>(destroyers);
+		db.flushToDisk(this);
+	}
+
+	/**
+	 * @return the destroyers
+	 */
+	public Set<String> getDestroyers() {
+		return destroyers == null ? new HashSet<String>()
+				: unmodifiableSet(destroyers);
+	}
+
+	private void writeObject(ObjectOutputStream out) throws IOException {
+		out.defaultWriteObject();
+		out.writeUTF(secContext.getOwner().getName());
+		out.writeObject(secContext.getFactory());
+		out.writeObject(new MarshalledObject<>(run));
+	}
+
+	@Override
+	public boolean getGenerateProvenance() {
+		return generateProvenance;
+	}
+
+	@Override
+	public void setGenerateProvenance(boolean generateProvenance) {
+		this.generateProvenance = generateProvenance;
+		db.flushToDisk(this);
+	}
+
+	@SuppressWarnings("unchecked")
+	private void readObject(ObjectInputStream in) throws IOException,
+			ClassNotFoundException {
+		in.defaultReadObject();
+		if (log == null)
+			log = getLog("Taverna.Server.LocalWorker");
+		final String creatorName = in.readUTF();
+		SecurityContextFactory factory = (SecurityContextFactory) in
+				.readObject();
+		try {
+			secContext = factory.create(this,
+					new UsernamePrincipal(creatorName));
+		} catch (RuntimeException | IOException e) {
+			throw e;
+		} catch (Exception e) {
+			throw new SecurityContextReconstructionException(e);
+		}
+		run = ((MarshalledObject<RemoteSingleRun>) in.readObject()).get();
+	}
+
+	public void setSecurityContext(TavernaSecurityContext tavernaSecurityContext) {
+		secContext = tavernaSecurityContext;
+	}
+
+	@Override
+	public String getName() {
+		return name;
+	}
+
+	@Override
+	public void setName(@Nonnull String name) {
+		if (name.length() > RunConnection.NAME_LENGTH)
+			this.name = name.substring(0, RunConnection.NAME_LENGTH);
+		else
+			this.name = name;
+		db.flushToDisk(this);
+	}
+
+	@Override
+	public void ping() throws UnknownRunException {
+		try {
+			run.ping();
+		} catch (RemoteException e) {
+			throw new UnknownRunException(e);
+		}
+	}
+}
+
+abstract class DEDelegate implements DirectoryEntry {
+	Log log = getLog("Taverna.Server.Worker");
+	private RemoteDirectoryEntry entry;
+	private String name;
+	private String full;
+	private Date cacheModTime;
+	private long cacheQueryTime = 0L;
+
+	DEDelegate(RemoteDirectoryEntry entry) {
+		this.entry = entry;
+	}
+
+	@Override
+	public void destroy() throws FilesystemAccessException {
+		try {
+			entry.destroy();
+		} catch (IOException e) {
+			throw new FilesystemAccessException(
+					"failed to delete directory entry", e);
+		}
+	}
+
+	@Override
+	public String getFullName() {
+		if (full != null)
+			return full;
+		String n = getName();
+		RemoteDirectoryEntry re = entry;
+		try {
+			while (true) {
+				RemoteDirectory parent = re.getContainingDirectory();
+				if (parent == null)
+					break;
+				n = parent.getName() + "/" + n;
+				re = parent;
+			}
+		} catch (RemoteException e) {
+			log.warn("failed to generate full name", e);
+		}
+		return (full = n);
+	}
+
+	@Override
+	public String getName() {
+		if (name == null)
+			try {
+				name = entry.getName();
+			} catch (RemoteException e) {
+				log.error("failed to get name", e);
+			}
+		return name;
+	}
+
+	@Override
+	public Date getModificationDate() {
+		if (cacheModTime == null || currentTimeMillis() - cacheQueryTime < 5000)
+			try {
+				cacheModTime = entry.getModificationDate();
+				cacheQueryTime = currentTimeMillis();
+			} catch (RemoteException e) {
+				log.error("failed to get modification time", e);
+			}
+		return cacheModTime;
+	}
+
+	@Override
+	public int compareTo(DirectoryEntry de) {
+		return getFullName().compareTo(de.getFullName());
+	}
+
+	@Override
+	public boolean equals(Object o) {
+		return o != null && o instanceof DEDelegate
+				&& getFullName().equals(((DEDelegate) o).getFullName());
+	}
+
+	@Override
+	public int hashCode() {
+		return getFullName().hashCode();
+	}
+}
+
+class DirectoryDelegate extends DEDelegate implements Directory {
+	RemoteDirectory rd;
+
+	DirectoryDelegate(RemoteDirectory dir) {
+		super(dir);
+		rd = dir;
+	}
+
+	@Override
+	public Collection<DirectoryEntry> getContents()
+			throws FilesystemAccessException {
+		ArrayList<DirectoryEntry> result = new ArrayList<>();
+		try {
+			for (RemoteDirectoryEntry rde : rd.getContents()) {
+				if (rde instanceof RemoteDirectory)
+					result.add(new DirectoryDelegate((RemoteDirectory) rde));
+				else
+					result.add(new FileDelegate((RemoteFile) rde));
+			}
+		} catch (IOException e) {
+			throw new FilesystemAccessException(
+					"failed to get directory contents", e);
+		}
+		return result;
+	}
+
+	@Override
+	public Collection<DirectoryEntry> getContentsByDate()
+			throws FilesystemAccessException {
+		ArrayList<DirectoryEntry> result = new ArrayList<>(getContents());
+		sort(result, new DateComparator());
+		return result;
+	}
+
+	static class DateComparator implements Comparator<DirectoryEntry> {
+		@Override
+		public int compare(DirectoryEntry a, DirectoryEntry b) {
+			return a.getModificationDate().compareTo(b.getModificationDate());
+		}
+	}
+
+	@Override
+	public File makeEmptyFile(Principal actor, String name)
+			throws FilesystemAccessException {
+		try {
+			return new FileDelegate(rd.makeEmptyFile(name));
+		} catch (IOException e) {
+			throw new FilesystemAccessException("failed to make empty file", e);
+		}
+	}
+
+	@Override
+	public Directory makeSubdirectory(Principal actor, String name)
+			throws FilesystemAccessException {
+		try {
+			return new DirectoryDelegate(rd.makeSubdirectory(name));
+		} catch (IOException e) {
+			throw new FilesystemAccessException("failed to make subdirectory",
+					e);
+		}
+	}
+
+	@Override
+	public ZipStream getContentsAsZip() throws FilesystemAccessException {
+		ZipStream zs = new ZipStream();
+
+		final ZipOutputStream zos;
+		try {
+			zos = new ZipOutputStream(new PipedOutputStream(zs));
+		} catch (IOException e) {
+			throw new FilesystemAccessException("problem building zip stream",
+					e);
+		}
+		Thread t = new Thread(new Runnable() {
+			@Override
+			public void run() {
+				try {
+					zipDirectory(rd, null, zos);
+				} catch (IOException e) {
+					log.warn("problem when zipping directory", e);
+				} finally {
+					closeQuietly(zos);
+				}
+			}
+		});
+		t.setDaemon(true);
+		t.start();
+		return zs;
+	}
+
+	/**
+	 * Compresses a directory tree into a ZIP.
+	 * 
+	 * @param dir
+	 *            The directory to compress.
+	 * @param base
+	 *            The base name of the directory (or <tt>null</tt> if this is
+	 *            the root directory of the ZIP).
+	 * @param zos
+	 *            Where to write the compressed data.
+	 * @throws RemoteException
+	 *             If some kind of problem happens with the remote delegates.
+	 * @throws IOException
+	 *             If we run into problems with reading or writing data.
+	 */
+	void zipDirectory(RemoteDirectory dir, String base, ZipOutputStream zos)
+			throws RemoteException, IOException {
+		for (RemoteDirectoryEntry rde : dir.getContents()) {
+			String name = rde.getName();
+			if (base != null)
+				name = base + "/" + name;
+			if (rde instanceof RemoteDirectory) {
+				RemoteDirectory rd = (RemoteDirectory) rde;
+				zipDirectory(rd, name, zos);
+			} else {
+				RemoteFile rf = (RemoteFile) rde;
+				zos.putNextEntry(new ZipEntry(name));
+				try {
+					int off = 0;
+					while (true) {
+						byte[] c = rf.getContents(off, 64 * 1024);
+						if (c == null || c.length == 0)
+							break;
+						zos.write(c);
+						off += c.length;
+					}
+				} finally {
+					zos.closeEntry();
+				}
+			}
+		}
+	}
+}
+
+class FileDelegate extends DEDelegate implements File {
+	RemoteFile rf;
+
+	FileDelegate(RemoteFile f) {
+		super(f);
+		this.rf = f;
+	}
+
+	@Override
+	public byte[] getContents(int offset, int length)
+			throws FilesystemAccessException {
+		try {
+			return rf.getContents(offset, length);
+		} catch (IOException e) {
+			throw new FilesystemAccessException("failed to read file contents",
+					e);
+		}
+	}
+
+	@Override
+	public long getSize() throws FilesystemAccessException {
+		try {
+			return rf.getSize();
+		} catch (IOException e) {
+			throw new FilesystemAccessException("failed to get file length", e);
+		}
+	}
+
+	@Override
+	public void setContents(byte[] data) throws FilesystemAccessException {
+		try {
+			rf.setContents(data);
+		} catch (IOException e) {
+			throw new FilesystemAccessException(
+					"failed to write file contents", e);
+		}
+	}
+
+	@Override
+	public void appendContents(byte[] data) throws FilesystemAccessException {
+		try {
+			rf.appendContents(data);
+		} catch (IOException e) {
+			throw new FilesystemAccessException(
+					"failed to write file contents", e);
+		}
+	}
+
+	@Override
+	public void copy(File from) throws FilesystemAccessException {
+		FileDelegate fromFile;
+		try {
+			fromFile = (FileDelegate) from;
+		} catch (ClassCastException e) {
+			throw new FilesystemAccessException("different types of File?!");
+		}
+
+		try {
+			rf.copy(fromFile.rf);
+		} catch (Exception e) {
+			throw new FilesystemAccessException("failed to copy file contents",
+					e);
+		}
+		return;
+	}
+}
+
+class ListenerDelegate implements Listener {
+	private Log log = getLog("Taverna.Server.Worker");
+	private RemoteListener r;
+	String conf;
+
+	ListenerDelegate(RemoteListener l) {
+		r = l;
+	}
+
+	RemoteListener getRemote() {
+		return r;
+	}
+
+	@Override
+	public String getConfiguration() {
+		try {
+			if (conf == null)
+				conf = r.getConfiguration();
+		} catch (RemoteException e) {
+			log.warn("failed to get configuration", e);
+		}
+		return conf;
+	}
+
+	@Override
+	public String getName() {
+		try {
+			return r.getName();
+		} catch (RemoteException e) {
+			log.warn("failed to get name", e);
+			return "UNKNOWN NAME";
+		}
+	}
+
+	@Override
+	public String getProperty(String propName) throws NoListenerException {
+		try {
+			return r.getProperty(propName);
+		} catch (RemoteException e) {
+			throw new NoListenerException("no such property: " + propName, e);
+		}
+	}
+
+	@Override
+	public String getType() {
+		try {
+			return r.getType();
+		} catch (RemoteException e) {
+			log.warn("failed to get type", e);
+			return "UNKNOWN TYPE";
+		}
+	}
+
+	@Override
+	public String[] listProperties() {
+		try {
+			return r.listProperties();
+		} catch (RemoteException e) {
+			log.warn("failed to list properties", e);
+			return new String[0];
+		}
+	}
+
+	@Override
+	public void setProperty(String propName, String value)
+			throws NoListenerException, BadPropertyValueException {
+		try {
+			r.setProperty(propName, value);
+		} catch (RemoteException e) {
+			log.warn("failed to set property", e);
+			if (e.getCause() != null
+					&& e.getCause() instanceof RuntimeException)
+				throw new NoListenerException("failed to set property",
+						e.getCause());
+			if (e.getCause() != null && e.getCause() instanceof Exception)
+				throw new BadPropertyValueException("failed to set property",
+						e.getCause());
+			throw new BadPropertyValueException("failed to set property", e);
+		}
+	}
+}
+
+class RunInput implements Input {
+	private final RemoteInput i;
+
+	RunInput(RemoteInput remote) {
+		this.i = remote;
+	}
+
+	@Override
+	public String getFile() {
+		try {
+			return i.getFile();
+		} catch (RemoteException e) {
+			return null;
+		}
+	}
+
+	@Override
+	public String getName() {
+		try {
+			return i.getName();
+		} catch (RemoteException e) {
+			return null;
+		}
+	}
+
+	@Override
+	public String getValue() {
+		try {
+			return i.getValue();
+		} catch (RemoteException e) {
+			return null;
+		}
+	}
+
+	@Override
+	public void setFile(String file) throws FilesystemAccessException,
+			BadStateChangeException {
+		checkBadFilename(file);
+		try {
+			i.setFile(file);
+		} catch (RemoteException e) {
+			throw new FilesystemAccessException("cannot set file for input", e);
+		}
+	}
+
+	@Override
+	public void setValue(String value) throws BadStateChangeException {
+		try {
+			i.setValue(value);
+		} catch (RemoteException e) {
+			throw new BadStateChangeException(e);
+		}
+	}
+
+	@Override
+	public String getDelimiter() {
+		try {
+			return i.getDelimiter();
+		} catch (RemoteException e) {
+			return null;
+		}
+	}
+
+	@Override
+	public void setDelimiter(String delimiter) throws BadStateChangeException {
+		try {
+			if (delimiter != null)
+				delimiter = delimiter.substring(0, 1);
+			i.setDelimiter(delimiter);
+		} catch (RemoteException e) {
+			throw new BadStateChangeException(e);
+		}
+	}
+}
+
+@SuppressWarnings("serial")
+class SecurityContextReconstructionException extends RuntimeException {
+	public SecurityContextReconstructionException(Throwable t) {
+		super("failed to rebuild security context", t);
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-taverna-server/blob/0b04b1ab/server-webapp/src/main/java/org/taverna/server/master/worker/RunConnection.java
----------------------------------------------------------------------
diff --git a/server-webapp/src/main/java/org/taverna/server/master/worker/RunConnection.java b/server-webapp/src/main/java/org/taverna/server/master/worker/RunConnection.java
new file mode 100644
index 0000000..cf55ea0
--- /dev/null
+++ b/server-webapp/src/main/java/org/taverna/server/master/worker/RunConnection.java
@@ -0,0 +1,239 @@
+/*
+ * Copyright (C) 2010-2013 The University of Manchester
+ * 
+ * See the file "LICENSE" for license terms.
+ */
+package org.taverna.server.master.worker;
+
+import static java.util.Arrays.asList;
+import static java.util.Collections.emptyList;
+import static org.taverna.server.master.worker.RunConnection.COUNT_QUERY;
+import static org.taverna.server.master.worker.RunConnection.NAMES_QUERY;
+import static org.taverna.server.master.worker.RunConnection.SCHEMA;
+import static org.taverna.server.master.worker.RunConnection.TABLE;
+import static org.taverna.server.master.worker.RunConnection.TIMEOUT_QUERY;
+import static org.taverna.server.master.worker.RunConnection.UNTERMINATED_QUERY;
+
+import java.io.IOException;
+import java.rmi.MarshalledObject;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.List;
+
+import javax.annotation.Nonnull;
+import javax.jdo.annotations.Column;
+import javax.jdo.annotations.Join;
+import javax.jdo.annotations.PersistenceCapable;
+import javax.jdo.annotations.Persistent;
+import javax.jdo.annotations.PrimaryKey;
+import javax.jdo.annotations.Queries;
+import javax.jdo.annotations.Query;
+
+import org.taverna.server.localworker.remote.RemoteSingleRun;
+import org.taverna.server.master.common.Credential;
+import org.taverna.server.master.common.Trust;
+import org.taverna.server.master.common.Workflow;
+import org.taverna.server.master.interfaces.SecurityContextFactory;
+import org.taverna.server.master.utils.UsernamePrincipal;
+
+/**
+ * The representation of the connections to the runs that actually participates
+ * in the persistence system.
+ * 
+ * @author Donal Fellows
+ */
+@PersistenceCapable(table = TABLE, schema = SCHEMA)
+@Queries({
+		@Query(name = "count", language = "SQL", value = COUNT_QUERY, unique = "true", resultClass = Integer.class),
+		@Query(name = "names", language = "SQL", value = NAMES_QUERY, unique = "false", resultClass = String.class),
+		@Query(name = "unterminated", language = "SQL", value = UNTERMINATED_QUERY, unique = "false", resultClass = String.class),
+		@Query(name = "timedout", language = "SQL", value = TIMEOUT_QUERY, unique = "false", resultClass = String.class) })
+public class RunConnection {
+	static final String SCHEMA = "TAVERNA";
+	static final String TABLE = "RUN_CONNECTION";
+	private static final String FULL_NAME = SCHEMA + "." + TABLE;
+	static final String COUNT_QUERY = "SELECT count(*) FROM " + FULL_NAME;
+	static final String NAMES_QUERY = "SELECT ID FROM " + FULL_NAME;
+	static final String TIMEOUT_QUERY = "SELECT ID FROM " + FULL_NAME
+			+ "   WHERE expiry < CURRENT_TIMESTAMP";
+	static final String UNTERMINATED_QUERY = "SELECT ID FROM " + FULL_NAME
+			+ "   WHERE doneTransitionToFinished = 0";
+	static final int NAME_LENGTH = 48; 
+
+	@PrimaryKey
+	@Column(length = 40)
+	private String id;
+
+	@Persistent(defaultFetchGroup = "true")
+	@Column(length = NAME_LENGTH)
+	private String name;
+
+	@Persistent(defaultFetchGroup = "true")
+	private Date creationInstant;
+
+	@Persistent(defaultFetchGroup = "true", serialized = "true")
+	@Column(jdbcType = "BLOB", sqlType = "BLOB")
+	private Workflow workflow;
+
+	@Persistent(defaultFetchGroup = "true")
+	private Date expiry;
+
+	@Persistent(defaultFetchGroup = "true")
+	@Join(table = TABLE + "_READERS", column = "ID")
+	private String[] readers;
+
+	@Persistent(defaultFetchGroup = "true")
+	@Join(table = TABLE + "_WRITERS", column = "ID")
+	private String[] writers;
+
+	@Persistent(defaultFetchGroup = "true")
+	@Join(table = TABLE + "_DESTROYERS", column = "ID")
+	private String[] destroyers;
+
+	@Persistent(defaultFetchGroup = "true", serialized = "true")
+	@Column(jdbcType = "BLOB", sqlType = "BLOB")
+	private MarshalledObject<RemoteSingleRun> run;
+
+	@Persistent(defaultFetchGroup = "true")
+	private int doneTransitionToFinished;
+
+	@Persistent(defaultFetchGroup = "true")
+	private int generateProvenance;
+
+	@Persistent(defaultFetchGroup = "true")
+	@Column(length = 128)
+	String owner;
+
+	@Persistent(defaultFetchGroup = "true")
+	@Column(length = 36)
+	private String securityToken;
+
+	@Persistent(defaultFetchGroup = "true", serialized = "true")
+	@Column(jdbcType = "BLOB", sqlType = "BLOB")
+	private SecurityContextFactory securityContextFactory;
+	@Persistent(defaultFetchGroup = "true", serialized = "true")
+	@Column(jdbcType = "BLOB", sqlType = "BLOB")
+	private Credential[] credentials;
+	@Persistent(defaultFetchGroup = "true", serialized = "true")
+	@Column(jdbcType = "BLOB", sqlType = "BLOB")
+	private Trust[] trust;
+
+	private static final String[] STRING_ARY = new String[0];
+
+	public String getId() {
+		return id;
+	}
+
+	public boolean isFinished() {
+		return doneTransitionToFinished != 0;
+	}
+
+	public void setFinished(boolean finished) {
+		doneTransitionToFinished = (finished ? 1 : 0);
+	}
+
+	public boolean isProvenanceGenerated() {
+		return generateProvenance != 0;
+	}
+
+	public void setProvenanceGenerated(boolean generate) {
+		generateProvenance = (generate ? 1 : 0);
+	}
+
+	/**
+	 * Manufacture a persistent representation of the given workflow run. Must
+	 * be called within the context of a transaction.
+	 * 
+	 * @param rrd
+	 *            The remote delegate of the workflow run.
+	 * @return The persistent object.
+	 * @throws IOException
+	 *             If serialisation fails.
+	 */
+	@Nonnull
+	public static RunConnection toDBform(@Nonnull RemoteRunDelegate rrd)
+			throws IOException {
+		RunConnection rc = new RunConnection();
+		rc.id = rrd.id;
+		rc.makeChanges(rrd);
+		return rc;
+	}
+
+	private static List<String> list(String[] ary) {
+		if (ary == null)
+			return emptyList();
+		return asList(ary);
+	}
+
+	/**
+	 * Get the remote run delegate for a particular persistent connection. Must
+	 * be called within the context of a transaction.
+	 * 
+	 * @param db
+	 *            The database facade.
+	 * @return The delegate object.
+	 * @throws Exception
+	 *             If anything goes wrong.
+	 */
+	@Nonnull
+	public RemoteRunDelegate fromDBform(@Nonnull RunDBSupport db)
+			throws Exception {
+		RemoteRunDelegate rrd = new RemoteRunDelegate();
+		rrd.id = getId();
+		rrd.creationInstant = creationInstant;
+		rrd.workflow = workflow;
+		rrd.expiry = expiry;
+		rrd.readers = new HashSet<>(list(readers));
+		rrd.writers = new HashSet<>(list(writers));
+		rrd.destroyers = new HashSet<>(list(destroyers));
+		rrd.run = run.get();
+		rrd.doneTransitionToFinished = isFinished();
+		rrd.generateProvenance = isProvenanceGenerated();
+		rrd.secContext = securityContextFactory.create(rrd,
+				new UsernamePrincipal(owner));
+		((SecurityContextDelegate)rrd.secContext).setCredentialsAndTrust(credentials,trust);
+		rrd.db = db;
+		rrd.factory = db.getFactory();
+		rrd.name = name;
+		return rrd;
+	}
+
+	/**
+	 * Flush changes from a remote run delegate to the database. Must be called
+	 * within the context of a transaction.
+	 * 
+	 * @param rrd
+	 *            The remote run delegate object that has potential changes.
+	 * @throws IOException
+	 *             If anything goes wrong in serialization.
+	 */
+	public void makeChanges(@Nonnull RemoteRunDelegate rrd) throws IOException {
+		// Properties that are set exactly once
+		if (creationInstant == null) {
+			creationInstant = rrd.getCreationTimestamp();
+			workflow = rrd.getWorkflow();
+			run = new MarshalledObject<>(rrd.run);
+			securityContextFactory = rrd.getSecurityContext().getFactory();
+			owner = rrd.getSecurityContext().getOwner().getName();
+			securityToken = ((org.taverna.server.master.worker.SecurityContextFactory) securityContextFactory)
+					.issueNewPassword();
+		}
+		// Properties that are set multiple times
+		expiry = rrd.getExpiry();
+		readers = rrd.getReaders().toArray(STRING_ARY);
+		writers = rrd.getWriters().toArray(STRING_ARY);
+		destroyers = rrd.getDestroyers().toArray(STRING_ARY);
+		credentials = rrd.getSecurityContext().getCredentials();
+		trust = rrd.getSecurityContext().getTrusted();
+		if (rrd.name.length() > NAME_LENGTH)
+			this.name = rrd.name.substring(0, NAME_LENGTH);
+		else
+			this.name = rrd.name;
+		setFinished(rrd.doneTransitionToFinished);
+		setProvenanceGenerated(rrd.generateProvenance);
+	}
+
+	public String getSecurityToken() {
+		return securityToken;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-taverna-server/blob/0b04b1ab/server-webapp/src/main/java/org/taverna/server/master/worker/RunDBSupport.java
----------------------------------------------------------------------
diff --git a/server-webapp/src/main/java/org/taverna/server/master/worker/RunDBSupport.java b/server-webapp/src/main/java/org/taverna/server/master/worker/RunDBSupport.java
new file mode 100644
index 0000000..2aa7ed1
--- /dev/null
+++ b/server-webapp/src/main/java/org/taverna/server/master/worker/RunDBSupport.java
@@ -0,0 +1,83 @@
+/*
+ * Copyright (C) 2010-2011 The University of Manchester
+ * 
+ * See the file "LICENSE" for license terms.
+ */
+package org.taverna.server.master.worker;
+
+import java.util.List;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import org.taverna.server.master.notification.NotificationEngine;
+
+/**
+ * The interface to the database of runs.
+ * 
+ * @author Donal Fellows
+ */
+public interface RunDBSupport {
+	/**
+	 * Scan each run to see if it has finished yet and issue registered
+	 * notifications if it has.
+	 */
+	void checkForFinishNow();
+
+	/**
+	 * Remove currently-expired runs from this database.
+	 */
+	void cleanNow();
+
+	/**
+	 * How many runs are stored in the database.
+	 * 
+	 * @return The current size of the run table.
+	 */
+	int countRuns();
+
+	/**
+	 * Ensure that a run gets persisted in the database. It is assumed that the
+	 * value is already in there.
+	 * 
+	 * @param run
+	 *            The run to persist.
+	 */
+	void flushToDisk(@Nonnull RemoteRunDelegate run);
+
+	/**
+	 * Select an arbitrary representative run.
+	 * 
+	 * @return The selected run.
+	 * @throws Exception
+	 *             If anything goes wrong.
+	 */
+	@Nullable
+	RemoteRunDelegate pickArbitraryRun() throws Exception;
+
+	/**
+	 * Get a list of all the run names.
+	 * 
+	 * @return The names (i.e., UUIDs) of all the runs.
+	 */
+	@Nonnull
+	List<String> listRunNames();
+
+	/**
+	 * @param notificationEngine
+	 *            A reference to the notification fabric bean.
+	 */
+	void setNotificationEngine(NotificationEngine notificationEngine);
+
+	/**
+	 * @param notifier
+	 *            A reference to the bean that creates messages about workflow
+	 *            run termination.
+	 */
+	void setNotifier(CompletionNotifier notifier);
+
+	/**
+	 * @return A reference to the actual factory for remote runs.
+	 */
+	FactoryBean getFactory();
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-taverna-server/blob/0b04b1ab/server-webapp/src/main/java/org/taverna/server/master/worker/RunDatabase.java
----------------------------------------------------------------------
diff --git a/server-webapp/src/main/java/org/taverna/server/master/worker/RunDatabase.java b/server-webapp/src/main/java/org/taverna/server/master/worker/RunDatabase.java
new file mode 100644
index 0000000..cedb4b5
--- /dev/null
+++ b/server-webapp/src/main/java/org/taverna/server/master/worker/RunDatabase.java
@@ -0,0 +1,311 @@
+/*
+ * Copyright (C) 2010-2013 The University of Manchester
+ * 
+ * See the file "LICENSE" for license terms.
+ */
+package org.taverna.server.master.worker;
+
+import static java.lang.Integer.parseInt;
+import static java.util.UUID.randomUUID;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.UUID;
+
+import javax.annotation.Nullable;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Required;
+import org.taverna.server.master.common.Status;
+import org.taverna.server.master.exceptions.UnknownRunException;
+import org.taverna.server.master.interfaces.Listener;
+import org.taverna.server.master.interfaces.Policy;
+import org.taverna.server.master.interfaces.RunStore;
+import org.taverna.server.master.interfaces.TavernaRun;
+import org.taverna.server.master.notification.NotificationEngine;
+import org.taverna.server.master.notification.NotificationEngine.Message;
+import org.taverna.server.master.utils.UsernamePrincipal;
+
+/**
+ * The main facade bean that interfaces to the database of runs.
+ * 
+ * @author Donal Fellows
+ */
+public class RunDatabase implements RunStore, RunDBSupport {
+	private Log log = LogFactory.getLog("Taverna.Server.Worker.RunDB");
+	RunDatabaseDAO dao;
+	CompletionNotifier backupNotifier;
+	Map<String, CompletionNotifier> typedNotifiers;
+	private NotificationEngine notificationEngine;
+	@Autowired
+	private FactoryBean factory;
+	private Map<String, TavernaRun> cache = new HashMap<>();
+
+	@Override
+	@Required
+	public void setNotifier(CompletionNotifier n) {
+		backupNotifier = n;
+	}
+
+	public void setTypeNotifiers(List<CompletionNotifier> notifiers) {
+		typedNotifiers = new HashMap<>();
+		for (CompletionNotifier n : notifiers)
+			typedNotifiers.put(n.getName(), n);
+	}
+
+	@Required
+	@Override
+	public void setNotificationEngine(NotificationEngine notificationEngine) {
+		this.notificationEngine = notificationEngine;
+	}
+
+	@Required
+	public void setDao(RunDatabaseDAO dao) {
+		this.dao = dao;
+	}
+
+	@Override
+	public void checkForFinishNow() {
+		/*
+		 * Get which runs are actually newly finished; this requires getting the
+		 * candidates from the database and *then* doing the expensive requests
+		 * to the back end to find out the status.
+		 */
+		Map<String, RemoteRunDelegate> notifiable = new HashMap<>();
+		for (RemoteRunDelegate p : dao.getPotentiallyNotifiable())
+			if (p.getStatus() == Status.Finished)
+				notifiable.put(p.getId(), p);
+
+		// Check if there's nothing more to do
+		if (notifiable.isEmpty())
+			return;
+
+		/*
+		 * Tell the database about the ones we've got.
+		 */
+		dao.markFinished(notifiable.keySet());
+
+		/*
+		 * Send out the notifications. The notification addresses are stored in
+		 * the back-end engine, so this is *another* thing that can take time.
+		 */
+		for (RemoteRunDelegate rrd : notifiable.values())
+			for (Listener l : rrd.getListeners())
+				if (l.getName().equals("io")) {
+					try {
+						notifyFinished(rrd.id, l, rrd);
+					} catch (Exception e) {
+						log.warn("failed to do notification of completion", e);
+					}
+					break;
+				}
+	}
+
+	@Override
+	public void cleanNow() {
+		List<String> cleaned;
+		try {
+			cleaned = dao.doClean();
+		} catch (Exception e) {
+			log.warn("failure during deletion of expired runs", e);
+			return;
+		}
+		synchronized (cache) {
+			for (String id : cleaned)
+				cache.remove(id);
+		}
+	}
+
+	@Override
+	public int countRuns() {
+		return dao.countRuns();
+	}
+
+	@Override
+	public void flushToDisk(RemoteRunDelegate run) {
+		try {
+			dao.flushToDisk(run);
+		} catch (IOException e) {
+			throw new RuntimeException(
+					"unexpected problem when persisting run record in database",
+					e);
+		}
+	}
+
+	@Override
+	public RemoteRunDelegate pickArbitraryRun() throws Exception {
+		return dao.pickArbitraryRun();
+	}
+
+	@Override
+	public List<String> listRunNames() {
+		return dao.listRunNames();
+	}
+
+	@Nullable
+	private TavernaRun get(String uuid) {
+		TavernaRun run = null;
+		synchronized (cache) {
+			run = cache.get(uuid);
+		}
+		try {
+			if (run != null)
+				run.ping();
+		} catch (UnknownRunException e) {
+			if (log.isDebugEnabled())
+				log.debug("stale mapping in cache?", e);
+			// Don't need to flush the cache; this happens when cleaning anyway
+			run = null;
+		}
+		if (run == null)
+			run = dao.get(uuid);
+		return run;
+	}
+
+	@Override
+	public TavernaRun getRun(UsernamePrincipal user, Policy p, String uuid)
+			throws UnknownRunException {
+		// Check first to see if the 'uuid' actually looks like a UUID; if
+		// not, throw it out immediately without logging an exception.
+		try {
+			UUID.fromString(uuid);
+		} catch (IllegalArgumentException e) {
+			if (log.isDebugEnabled())
+				log.debug("run ID does not look like UUID; rejecting...");
+			throw new UnknownRunException();
+		}
+		TavernaRun run = get(uuid);
+		if (run != null && (user == null || p.permitAccess(user, run)))
+			return run;
+		throw new UnknownRunException();
+	}
+
+	@Override
+	public TavernaRun getRun(String uuid) throws UnknownRunException {
+		TavernaRun run = get(uuid);
+		if (run != null)
+			return run;
+		throw new UnknownRunException();
+	}
+
+	@Override
+	public Map<String, TavernaRun> listRuns(UsernamePrincipal user, Policy p) {
+		synchronized (cache) {
+			Map<String, TavernaRun> cached = new HashMap<>();
+			for (Entry<String, TavernaRun> e : cache.entrySet()) {
+				TavernaRun r = e.getValue();
+				if (p.permitAccess(user, r))
+					cached.put(e.getKey(), r);
+			}
+			if (!cached.isEmpty())
+				return cached;
+		}
+		return dao.listRuns(user, p);
+	}
+
+	private void logLength(String message, Object obj) {
+		if (!log.isDebugEnabled())
+			return;
+		try {
+			ByteArrayOutputStream baos = new ByteArrayOutputStream();
+			try (ObjectOutputStream oos = new ObjectOutputStream(baos)) {
+				oos.writeObject(obj);
+			}
+			log.debug(message + ": " + baos.size());
+		} catch (Exception e) {
+			log.warn("oops", e);
+		}
+	}
+
+	@Override
+	public String registerRun(TavernaRun run) {
+		if (!(run instanceof RemoteRunDelegate))
+			throw new IllegalArgumentException(
+					"run must be created by localworker package");
+		RemoteRunDelegate rrd = (RemoteRunDelegate) run;
+		if (rrd.id == null)
+			rrd.id = randomUUID().toString();
+		logLength("RemoteRunDelegate serialized length", rrd);
+		try {
+			dao.persistRun(rrd);
+		} catch (IOException e) {
+			throw new RuntimeException(
+					"unexpected problem when persisting run record in database",
+					e);
+		}
+		synchronized (cache) {
+			cache.put(rrd.getId(), run);
+		}
+		return rrd.getId();
+	}
+
+	@Override
+	public void unregisterRun(String uuid) {
+		try {
+			if (dao.unpersistRun(uuid))
+				synchronized (cache) {
+					cache.remove(uuid);
+				}
+		} catch (RuntimeException e) {
+			if (log.isDebugEnabled())
+				log.debug("problem persisting the deletion of the run " + uuid,
+						e);
+		}
+	}
+
+	/**
+	 * Process the event that a run has finished.
+	 * 
+	 * @param name
+	 *            The name of the run.
+	 * @param io
+	 *            The io listener of the run (used to get information about the
+	 *            run).
+	 * @param run
+	 *            The handle to the run.
+	 * @throws Exception
+	 *             If anything goes wrong.
+	 */
+	private void notifyFinished(final String name, Listener io,
+			final RemoteRunDelegate run) throws Exception {
+		String to = io.getProperty("notificationAddress");
+		final int code;
+		try {
+			code = parseInt(io.getProperty("exitcode"));
+		} catch (NumberFormatException nfe) {
+			// Ignore; not much we can do here...
+			return;
+		}
+
+		notificationEngine.dispatchMessage(run, to, new Message() {
+			private CompletionNotifier getNotifier(String type) {
+				CompletionNotifier n = typedNotifiers.get(type);
+				if (n == null)
+					n = backupNotifier;
+				return n;
+			}
+
+			@Override
+			public String getContent(String type) {
+				return getNotifier(type).makeCompletionMessage(name, run, code);
+			}
+
+			@Override
+			public String getTitle(String type) {
+				return getNotifier(type).makeMessageSubject(name, run, code);
+			}
+		});
+	}
+
+	@Override
+	public FactoryBean getFactory() {
+		return factory;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-taverna-server/blob/0b04b1ab/server-webapp/src/main/java/org/taverna/server/master/worker/RunDatabaseDAO.java
----------------------------------------------------------------------
diff --git a/server-webapp/src/main/java/org/taverna/server/master/worker/RunDatabaseDAO.java b/server-webapp/src/main/java/org/taverna/server/master/worker/RunDatabaseDAO.java
new file mode 100644
index 0000000..51931c0
--- /dev/null
+++ b/server-webapp/src/main/java/org/taverna/server/master/worker/RunDatabaseDAO.java
@@ -0,0 +1,306 @@
+/*
+ * Copyright (C) 2010-2013 The University of Manchester
+ * 
+ * See the file "LICENSE" for license terms.
+ */
+package org.taverna.server.master.worker;
+
+import static org.taverna.server.master.worker.RunConnection.toDBform;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import javax.jdo.annotations.PersistenceAware;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.springframework.beans.factory.annotation.Required;
+import org.taverna.server.master.interfaces.Policy;
+import org.taverna.server.master.interfaces.TavernaRun;
+import org.taverna.server.master.utils.CallTimeLogger.PerfLogged;
+import org.taverna.server.master.utils.JDOSupport;
+import org.taverna.server.master.utils.UsernamePrincipal;
+
+/**
+ * This handles storing runs, interfacing with the underlying state engine as
+ * necessary.
+ * 
+ * @author Donal Fellows
+ */
+@PersistenceAware
+public class RunDatabaseDAO extends JDOSupport<RunConnection> {
+	public RunDatabaseDAO() {
+		super(RunConnection.class);
+	}
+
+	private Log log = LogFactory.getLog("Taverna.Server.Worker.RunDB");
+	private RunDatabase facade;
+
+	@Required
+	public void setFacade(RunDatabase facade) {
+		this.facade = facade;
+	}
+
+	// -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
+
+	@SuppressWarnings("unchecked")
+	private List<String> nameRuns() {
+		if (log.isDebugEnabled())
+			log.debug("fetching all run names");
+		return (List<String>) namedQuery("names").execute();
+	}
+
+	/**
+	 * @return The number of workflow runs in the database.
+	 */
+	@WithinSingleTransaction
+	public int countRuns() {
+		if (log.isDebugEnabled())
+			log.debug("counting the number of runs");
+		return (Integer) namedQuery("count").execute();
+	}
+
+	@SuppressWarnings("unchecked")
+	private List<String> expiredRuns() {
+		return (List<String>) namedQuery("timedout").execute();
+	}
+
+	@SuppressWarnings("unchecked")
+	private List<String> unterminatedRuns() {
+		return (List<String>) namedQuery("unterminated").execute();
+	}
+
+	@Nullable
+	private RunConnection pickRun(@Nonnull String name) {
+		if (log.isDebugEnabled())
+			log.debug("fetching the run called " + name);
+		try {
+			RunConnection rc = getById(name);
+			if (rc == null)
+				log.warn("no result for " + name);
+			return rc;
+		} catch (RuntimeException e) {
+			log.warn("problem in fetch", e);
+			throw e;
+		}
+	}
+
+	@Nullable
+	@WithinSingleTransaction
+	public String getSecurityToken(@Nonnull String name) {
+		RunConnection rc = getById(name);
+		if (rc == null)
+			return null;
+		return rc.getSecurityToken();
+	}
+
+	private void persist(@Nonnull RemoteRunDelegate rrd) throws IOException {
+		persist(toDBform(rrd));
+	}
+
+	@Nonnull
+	private List<RunConnection> allRuns() {
+		try {
+			List<RunConnection> rcs = new ArrayList<>();
+			List<String> names = nameRuns();
+			for (String id : names) {
+				try {
+					if (id != null)
+						rcs.add(pickRun(id));
+				} catch (RuntimeException e) {
+					continue;
+				}
+			}
+			return rcs;
+		} catch (RuntimeException e) {
+			log.warn("problem in fetch", e);
+			throw e;
+		}
+	}
+
+	// -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
+
+	/**
+	 * Obtain a workflow run handle.
+	 * 
+	 * @param name
+	 *            The identifier of the run.
+	 * @return The run handle, or <tt>null</tt> if there is no such run.
+	 */
+	@Nullable
+	@WithinSingleTransaction
+	public TavernaRun get(String name) {
+		try {
+			RunConnection rc = pickRun(name);
+			return (rc == null) ? null : rc.fromDBform(facade);
+		} catch (Exception e) {
+			return null;
+		}
+	}
+
+	/**
+	 * Get the runs that a user can read things from.
+	 * 
+	 * @param user
+	 *            Who is asking?
+	 * @param p
+	 *            The policy that determines what they can see.
+	 * @return A mapping from run IDs to run handles.
+	 */
+	@Nonnull
+	@WithinSingleTransaction
+	public Map<String, TavernaRun> listRuns(UsernamePrincipal user, Policy p) {
+		Map<String, TavernaRun> result = new HashMap<>();
+		for (String id : nameRuns())
+			try {
+				RemoteRunDelegate rrd = pickRun(id).fromDBform(facade);
+				if (p.permitAccess(user, rrd))
+					result.put(id, rrd);
+			} catch (Exception e) {
+				continue;
+			}
+		return result;
+	}
+
+	/**
+	 * @return A list of the IDs for all workflow runs.
+	 */
+	@Nonnull
+	@WithinSingleTransaction
+	public List<String> listRunNames() {
+		List<String> runNames = new ArrayList<>();
+		for (RunConnection rc : allRuns())
+			if (rc.getId() != null)
+				runNames.add(rc.getId());
+		return runNames;
+	}
+
+	/**
+	 * @return An arbitrary, representative workflow run.
+	 * @throws Exception
+	 *             If anything goes wrong.
+	 */
+	@Nullable
+	@WithinSingleTransaction
+	public RemoteRunDelegate pickArbitraryRun() throws Exception {
+		for (RunConnection rc : allRuns()) {
+			if (rc.getId() == null)
+				continue;
+			return rc.fromDBform(facade);
+		}
+		return null;
+	}
+
+	/**
+	 * Make a workflow run persistent. Must only be called once per workflow
+	 * run.
+	 * 
+	 * @param rrd
+	 *            The workflow run to persist.
+	 * @throws IOException
+	 *             If anything goes wrong with serialisation of the run.
+	 */
+	@WithinSingleTransaction
+	public void persistRun(@Nonnull RemoteRunDelegate rrd) throws IOException {
+		persist(rrd);
+	}
+
+	/**
+	 * Stop a workflow run from being persistent.
+	 * 
+	 * @param name
+	 *            The ID of the run.
+	 * @return Whether a deletion happened.
+	 */
+	@WithinSingleTransaction
+	public boolean unpersistRun(String name) {
+		RunConnection rc = pickRun(name);
+		if (rc != null)
+			delete(rc);
+		return rc != null;
+	}
+
+	/**
+	 * Ensure that the given workflow run is synchronized with the database.
+	 * 
+	 * @param run
+	 *            The run to synchronise.
+	 * @throws IOException
+	 *             If serialization of anything fails.
+	 */
+	@WithinSingleTransaction
+	public void flushToDisk(@Nonnull RemoteRunDelegate run) throws IOException {
+		getById(run.id).makeChanges(run);
+	}
+
+	/**
+	 * Remove all workflow runs that have expired.
+	 * 
+	 * @return The ids of the deleted runs.
+	 */
+	@Nonnull
+	@PerfLogged
+	@WithinSingleTransaction
+	public List<String> doClean() {
+		if (log.isDebugEnabled())
+			log.debug("deleting runs that timed out before " + new Date());
+		List<String> toDelete = expiredRuns();
+		if (log.isDebugEnabled())
+			log.debug("found " + toDelete.size() + " runs to delete");
+		for (String id : toDelete) {
+			RunConnection rc = getById(id);
+			try {
+				rc.fromDBform(facade).run.destroy();
+			} catch (Exception e) {
+				if (log.isDebugEnabled())
+					log.debug("failed to delete execution resource for " + id,
+							e);
+			}
+			delete(rc);
+		}
+		return toDelete;
+	}
+
+	/**
+	 * @return A list of workflow runs that are candidates for doing
+	 *         notification of termination.
+	 */
+	@Nonnull
+	@PerfLogged
+	@WithinSingleTransaction
+	public List<RemoteRunDelegate> getPotentiallyNotifiable() {
+		List<RemoteRunDelegate> toNotify = new ArrayList<>();
+		for (String id : unterminatedRuns())
+			try {
+				RunConnection rc = getById(id);
+				toNotify.add(rc.fromDBform(facade));
+			} catch (Exception e) {
+				log.warn("failed to fetch connection token"
+						+ "for notification of completion check", e);
+			}
+		return toNotify;
+	}
+
+	@PerfLogged
+	@WithinSingleTransaction
+	public void markFinished(@Nonnull Set<String> terminated) {
+		for (String id : terminated) {
+			RunConnection rc = getById(id);
+			if (rc == null)
+				continue;
+			try {
+				rc.fromDBform(facade).doneTransitionToFinished = true;
+				rc.setFinished(true);
+			} catch (Exception e) {
+				log.warn("failed to note termination", e);
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-taverna-server/blob/0b04b1ab/server-webapp/src/main/java/org/taverna/server/master/worker/RunFactoryConfiguration.java
----------------------------------------------------------------------
diff --git a/server-webapp/src/main/java/org/taverna/server/master/worker/RunFactoryConfiguration.java b/server-webapp/src/main/java/org/taverna/server/master/worker/RunFactoryConfiguration.java
new file mode 100644
index 0000000..29ac884
--- /dev/null
+++ b/server-webapp/src/main/java/org/taverna/server/master/worker/RunFactoryConfiguration.java
@@ -0,0 +1,395 @@
+package org.taverna.server.master.worker;
+
+import static org.springframework.jmx.support.MetricType.COUNTER;
+import static org.springframework.jmx.support.MetricType.GAUGE;
+import static org.taverna.server.master.TavernaServer.JMX_ROOT;
+
+import java.util.List;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import javax.annotation.PreDestroy;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.core.annotation.Order;
+import org.springframework.jmx.export.annotation.ManagedAttribute;
+import org.springframework.jmx.export.annotation.ManagedMetric;
+import org.springframework.jmx.export.annotation.ManagedResource;
+import org.taverna.server.master.factories.ConfigurableRunFactory;
+import org.taverna.server.master.localworker.LocalWorkerState;
+
+@ManagedResource(objectName = JMX_ROOT + "Factory", description = "The factory for runs.")
+public abstract class RunFactoryConfiguration implements ConfigurableRunFactory {
+	protected Log log = LogFactory.getLog("Taverna.Server.Worker");
+	protected LocalWorkerState state;
+	protected RunDBSupport runDB;
+	private int totalRuns = 0;
+
+	@PreDestroy
+	void closeLog() {
+		log = null;
+	}
+
+	@Autowired(required = true)
+	@Order(0)
+	void setState(LocalWorkerState state) {
+		this.state = state;
+	}
+
+	@Autowired(required = true)
+	@Order(0)
+	void setRunDB(RunDBSupport runDB) {
+		this.runDB = runDB;
+	}
+
+	/**
+	 * Drop any current references to the registry of runs, and kill off that
+	 * process.
+	 */
+	protected abstract void reinitRegistry();
+
+	/**
+	 * Drop any current references to the run factory subprocess and kill it
+	 * off.
+	 */
+	protected abstract void reinitFactory();
+
+	/** Count the number of operating runs. */
+	protected abstract int operatingCount() throws Exception;
+
+	protected final synchronized void incrementRunCount() {
+		totalRuns++;
+	}
+
+	@Override
+	@ManagedAttribute(description = "Whether it is allowed to start a run executing.", currencyTimeLimit = 30)
+	public final boolean isAllowingRunsToStart() {
+		try {
+			return state.getOperatingLimit() > getOperatingCount();
+		} catch (Exception e) {
+			log.info("failed to get operating run count", e);
+			return false;
+		}
+	}
+
+	@Override
+	@ManagedAttribute(description = "The host holding the RMI registry to communicate via.")
+	public final String getRegistryHost() {
+		return state.getRegistryHost();
+	}
+
+	@Override
+	@ManagedAttribute(description = "The host holding the RMI registry to communicate via.")
+	public final void setRegistryHost(String host) {
+		state.setRegistryHost(host);
+		reinitRegistry();
+		reinitFactory();
+	}
+
+	@Override
+	@ManagedAttribute(description = "The port number of the RMI registry. Should not normally be set.")
+	public final int getRegistryPort() {
+		return state.getRegistryPort();
+	}
+
+	@Override
+	@ManagedAttribute(description = "The port number of the RMI registry. Should not normally be set.")
+	public final void setRegistryPort(int port) {
+		state.setRegistryPort(port);
+		reinitRegistry();
+		reinitFactory();
+	}
+
+	@Nonnull
+	@Override
+	@ManagedAttribute(description = "What JAR do we use to start the RMI registry process?")
+	public final String getRmiRegistryJar() {
+		return state.getRegistryJar();
+	}
+
+	@Override
+	@ManagedAttribute(description = "What JAR do we use to start the RMI registry process?")
+	public final void setRmiRegistryJar(String rmiRegistryJar) {
+		state.setRegistryJar(rmiRegistryJar);
+		reinitRegistry();
+		reinitFactory();
+	}
+
+	@Override
+	@ManagedAttribute(description = "The maximum number of simultaneous runs supported by the server.", currencyTimeLimit = 300)
+	public final int getMaxRuns() {
+		return state.getMaxRuns();
+	}
+
+	@Override
+	@ManagedAttribute(description = "The maximum number of simultaneous runs supported by the server.", currencyTimeLimit = 300)
+	public final void setMaxRuns(int maxRuns) {
+		state.setMaxRuns(maxRuns);
+	}
+
+	/** @return How many minutes should a workflow live by default? */
+	@Override
+	@ManagedAttribute(description = "How many minutes should a workflow live by default?", currencyTimeLimit = 300)
+	public final int getDefaultLifetime() {
+		return state.getDefaultLifetime();
+	}
+
+	/**
+	 * Set how long a workflow should live by default.
+	 * 
+	 * @param defaultLifetime
+	 *            Default lifetime, in minutes.
+	 */
+	@Override
+	@ManagedAttribute(description = "How many minutes should a workflow live by default?", currencyTimeLimit = 300)
+	public final void setDefaultLifetime(int defaultLifetime) {
+		state.setDefaultLifetime(defaultLifetime);
+	}
+
+	/**
+	 * @return How many milliseconds to wait between checks to see if a worker
+	 *         process has registered.
+	 */
+	@Override
+	@ManagedAttribute(description = "How many milliseconds to wait between checks to see if a worker process has registered.", currencyTimeLimit = 300)
+	public final int getSleepTime() {
+		return state.getSleepMS();
+	}
+
+	/**
+	 * @param sleepTime
+	 *            How many milliseconds to wait between checks to see if a
+	 *            worker process has registered.
+	 */
+	@Override
+	@ManagedAttribute(description = "How many milliseconds to wait between checks to see if a worker process has registered.", currencyTimeLimit = 300)
+	public final void setSleepTime(int sleepTime) {
+		state.setSleepMS(sleepTime);
+	}
+
+	/**
+	 * @return How many seconds to wait for a worker process to register itself.
+	 */
+	@Override
+	@ManagedAttribute(description = "How many seconds to wait for a worker process to register itself.", currencyTimeLimit = 300)
+	public final int getWaitSeconds() {
+		return state.getWaitSeconds();
+	}
+
+	/**
+	 * @param seconds
+	 *            How many seconds to wait for a worker process to register
+	 *            itself.
+	 */
+	@Override
+	@ManagedAttribute(description = "How many seconds to wait for a worker process to register itself.", currencyTimeLimit = 300)
+	public final void setWaitSeconds(int seconds) {
+		state.setWaitSeconds(seconds);
+	}
+
+	/** @return The script to run to start running a workflow. */
+	@Nonnull
+	@Override
+	@ManagedAttribute(description = "The script to run to start running a workflow.", currencyTimeLimit = 300)
+	public final String getExecuteWorkflowScript() {
+		return state.getExecuteWorkflowScript();
+	}
+
+	/**
+	 * @param executeWorkflowScript
+	 *            The script to run to start running a workflow.
+	 */
+	@Override
+	@ManagedAttribute(description = "The script to run to start running a workflow.", currencyTimeLimit = 300)
+	public final void setExecuteWorkflowScript(String executeWorkflowScript) {
+		state.setExecuteWorkflowScript(executeWorkflowScript);
+		reinitFactory();
+	}
+
+	/** @return The location of the JAR implementing the server worker processes. */
+	@Nonnull
+	@Override
+	@ManagedAttribute(description = "The location of the JAR implementing the server worker processes.")
+	public final String getServerWorkerJar() {
+		return state.getServerWorkerJar();
+	}
+
+	/**
+	 * @param serverWorkerJar
+	 *            The location of the JAR implementing the server worker
+	 *            processes.
+	 */
+	@Override
+	@ManagedAttribute(description = "The location of the JAR implementing the server worker processes.")
+	public final void setServerWorkerJar(String serverWorkerJar) {
+		state.setServerWorkerJar(serverWorkerJar);
+		reinitFactory();
+	}
+
+	/** @return The list of additional arguments used to make a worker process. */
+	@Nonnull
+	@Override
+	@ManagedAttribute(description = "The list of additional arguments used to make a worker process.", currencyTimeLimit = 300)
+	public final String[] getExtraArguments() {
+		return state.getExtraArgs();
+	}
+
+	/**
+	 * @param extraArguments
+	 *            The list of additional arguments used to make a worker
+	 *            process.
+	 */
+	@Override
+	@ManagedAttribute(description = "The list of additional arguments used to make a worker process.", currencyTimeLimit = 300)
+	public final void setExtraArguments(@Nonnull String[] extraArguments) {
+		state.setExtraArgs(extraArguments);
+		reinitFactory();
+	}
+
+	/** @return Which java executable to run. */
+	@Nonnull
+	@Override
+	@ManagedAttribute(description = "Which java executable to run.", currencyTimeLimit = 300)
+	public final String getJavaBinary() {
+		return state.getJavaBinary();
+	}
+
+	/**
+	 * @param javaBinary
+	 *            Which java executable to run.
+	 */
+	@Override
+	@ManagedAttribute(description = "Which java executable to run.", currencyTimeLimit = 300)
+	public final void setJavaBinary(@Nonnull String javaBinary) {
+		state.setJavaBinary(javaBinary);
+		reinitFactory();
+	}
+
+	/**
+	 * @return A file containing a password to use when running a program as
+	 *         another user (e.g., with sudo).
+	 */
+	@Nullable
+	@Override
+	@ManagedAttribute(description = "A file containing a password to use when running a program as another user (e.g., with sudo).", currencyTimeLimit = 300)
+	public final String getPasswordFile() {
+		return state.getPasswordFile();
+	}
+
+	/**
+	 * @param passwordFile
+	 *            A file containing a password to use when running a program as
+	 *            another user (e.g., with sudo).
+	 */
+	@Override
+	@ManagedAttribute(description = "A file containing a password to use when running a program as another user (e.g., with sudo).", currencyTimeLimit = 300)
+	public final void setPasswordFile(@Nullable String passwordFile) {
+		state.setPasswordFile(passwordFile);
+		reinitFactory();
+	}
+
+	/**
+	 * @return The location of the JAR implementing the secure-fork process.
+	 */
+	@Nonnull
+	@Override
+	@ManagedAttribute(description = "The location of the JAR implementing the secure-fork process.", currencyTimeLimit = 300)
+	public final String getServerForkerJar() {
+		return state.getServerForkerJar();
+	}
+
+	/**
+	 * @param serverForkerJar
+	 *            The location of the JAR implementing the secure-fork process.
+	 */
+	@Override
+	@ManagedAttribute(description = "The location of the JAR implementing the secure-fork process.", currencyTimeLimit = 300)
+	public final void setServerForkerJar(String forkerJarFilename) {
+		state.setServerForkerJar(forkerJarFilename);
+		reinitFactory();
+	}
+
+	/**
+	 * @return How many times has a workflow run been spawned by this engine.
+	 *         Restarts reset this counter.
+	 */
+	@Override
+	@ManagedMetric(description = "How many times has a workflow run been spawned by this engine.", currencyTimeLimit = 10, metricType = COUNTER, category = "throughput")
+	public final synchronized int getTotalRuns() {
+		return totalRuns;
+	}
+
+	/**
+	 * @return How many checks were done for the worker process the last time a
+	 *         spawn was tried.
+	 */
+	@Override
+	@ManagedAttribute(description = "How many checks were done for the worker process the last time a spawn was tried.", currencyTimeLimit = 60)
+	public abstract int getLastStartupCheckCount();
+
+	@Nonnull
+	@Override
+	@ManagedAttribute(description = "The names of the current runs.", currencyTimeLimit = 5)
+	public final String[] getCurrentRunNames() {
+		List<String> names = runDB.listRunNames();
+		return names.toArray(new String[names.size()]);
+	}
+
+	@Override
+	@ManagedAttribute(description = "What the factory subprocess's main RMI interface is registered as.", currencyTimeLimit = 60)
+	public abstract String getFactoryProcessName();
+
+	/**
+	 * @return What was the exit code from the last time the factory subprocess
+	 *         was killed?
+	 */
+	@Override
+	@ManagedAttribute(description = "What was the exit code from the last time the factory subprocess was killed?")
+	public abstract Integer getLastExitCode();
+
+	/**
+	 * @return The mapping of user names to RMI factory IDs.
+	 */
+	@Override
+	@ManagedAttribute(description = "The mapping of user names to RMI factory IDs.", currencyTimeLimit = 60)
+	public abstract String[] getFactoryProcessMapping();
+
+	@Override
+	@ManagedAttribute(description = "The maximum number of simultaneous operating runs supported by the server.", currencyTimeLimit = 300)
+	public final void setOperatingLimit(int operatingLimit) {
+		state.setOperatingLimit(operatingLimit);
+	}
+
+	@Override
+	@ManagedAttribute(description = "The maximum number of simultaneous operating runs supported by the server.", currencyTimeLimit = 300)
+	public final int getOperatingLimit() {
+		return state.getOperatingLimit();
+	}
+
+	/**
+	 * @return A count of the number of runs believed to actually be in the
+	 *         {@linkplain uk.org.taverna.server.master.common.Status#Operating
+	 *         operating} state.
+	 * @throws Exception
+	 *             If anything goes wrong.
+	 */
+	@Override
+	@ManagedMetric(description = "How many workflow runs are currently actually executing.", currencyTimeLimit = 10, metricType = GAUGE, category = "throughput")
+	public final int getOperatingCount() throws Exception {
+		return operatingCount();
+	}
+
+	@Override
+	@ManagedAttribute(description="Whether to tell a workflow to generate provenance bundles by default.")
+	public final void setGenerateProvenance(boolean genProv) {
+		state.setGenerateProvenance(genProv);
+	}
+
+	@Override
+	@ManagedAttribute(description="Whether to tell a workflow to generate provenance bundles by default.")
+	public final boolean getGenerateProvenance() {
+		return state.getGenerateProvenance();
+	}
+}