You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@taverna.apache.org by st...@apache.org on 2018/01/09 23:30:37 UTC

[21/42] incubator-taverna-server git commit: package org.taverna -> org.apache.taverna

http://git-wip-us.apache.org/repos/asf/incubator-taverna-server/blob/00397eff/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/worker/CompletionNotifier.java
----------------------------------------------------------------------
diff --git a/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/worker/CompletionNotifier.java b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/worker/CompletionNotifier.java
new file mode 100644
index 0000000..1868f94
--- /dev/null
+++ b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/worker/CompletionNotifier.java
@@ -0,0 +1,58 @@
+/*
+ */
+package org.taverna.server.master.worker;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+/**
+ * How to convert a notification about the completion of a job into a message.
+ * 
+ * @author Donal Fellows
+ */
+public interface CompletionNotifier {
+	/**
+	 * @return The name of this notifier.
+	 */
+	String getName();
+
+	/**
+	 * Called to get the content of a message that a workflow run has finished.
+	 * 
+	 * @param name
+	 *            The name of the run.
+	 * @param run
+	 *            What run are we talking about.
+	 * @param code
+	 *            What the exit code was.
+	 * @return The plain-text content of the message.
+	 */
+	String makeCompletionMessage(String name, RemoteRunDelegate run, int code);
+
+	/**
+	 * Called to get the subject of the message to dispatch.
+	 * 
+	 * @param name
+	 *            The name of the run.
+	 * @param run
+	 *            What run are we talking about.
+	 * @param code
+	 *            What the exit code was.
+	 * @return The plain-text subject of the message.
+	 */
+	String makeMessageSubject(String name, RemoteRunDelegate run, int code);
+}

http://git-wip-us.apache.org/repos/asf/incubator-taverna-server/blob/00397eff/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/worker/FactoryBean.java
----------------------------------------------------------------------
diff --git a/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/worker/FactoryBean.java b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/worker/FactoryBean.java
new file mode 100644
index 0000000..d38f0cc
--- /dev/null
+++ b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/worker/FactoryBean.java
@@ -0,0 +1,39 @@
+/*
+ */
+package org.taverna.server.master.worker;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.taverna.server.master.notification.atom.EventDAO;
+
+/**
+ * What the remote run really needs of its factory.
+ * 
+ * @author Donal Fellows
+ */
+public interface FactoryBean {
+	/**
+	 * @return Whether a run can actually be started at this time.
+	 */
+	boolean isAllowingRunsToStart();
+
+	/**
+	 * @return a handle to the master Atom event feed (<i>not</i> the per-run
+	 *         feed)
+	 */
+	EventDAO getMasterEventFeed();
+}

http://git-wip-us.apache.org/repos/asf/incubator-taverna-server/blob/00397eff/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/worker/PasswordIssuer.java
----------------------------------------------------------------------
diff --git a/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/worker/PasswordIssuer.java b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/worker/PasswordIssuer.java
new file mode 100644
index 0000000..649db64
--- /dev/null
+++ b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/worker/PasswordIssuer.java
@@ -0,0 +1,73 @@
+package org.taverna.server.master.worker;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.security.NoSuchAlgorithmException;
+import java.security.SecureRandom;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * A simple password issuing bean.
+ * 
+ * @author Donal Fellows
+ */
+public class PasswordIssuer {
+	private static final char[] ALPHABET = { 'a', 'b', 'c', 'd', 'e', 'f', 'g',
+			'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't',
+			'u', 'v', 'w', 'x', 'y', 'z', 'A', 'B', 'C', 'D', 'E', 'F', 'G',
+			'H', 'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P', 'Q', 'R', 'S', 'T',
+			'U', 'V', 'W', 'X', 'Y', 'Z', '1', '2', '3', '4', '5', '6', '7',
+			'8', '9', '0', '!', '@', '#', '$', '%', '^', '&', '*', '(', ')',
+			',', '.', '<', '>', '/', '?', ':', ';', '-', '_', '+', '[', ']',
+			'{', '}', '`', '~' };
+	private Log log = LogFactory.getLog("Taverna.Server.Worker");
+	private SecureRandom r;
+	private int length;
+
+	public PasswordIssuer() {
+		r = new SecureRandom();
+		log.info("constructing passwords with " + r.getAlgorithm());
+		setLength(8);
+	}
+
+	public PasswordIssuer(String algorithm) throws NoSuchAlgorithmException {
+		r = SecureRandom.getInstance(algorithm);
+		log.info("constructing passwords with " + r.getAlgorithm());
+		setLength(8);
+	}
+
+	public void setLength(int length) {
+		this.length = length;
+		log.info("issued password will be " + this.length
+				+ " symbols chosen from " + ALPHABET.length);
+	}
+
+	/**
+	 * Issue a password.
+	 * 
+	 * @return The new password.
+	 */
+	public String issue() {
+		StringBuilder sb = new StringBuilder();
+		for (int i = 0; i < length; i++)
+			sb.append(ALPHABET[r.nextInt(ALPHABET.length)]);
+		log.info("issued new password of length " + sb.length());
+		return sb.toString();
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-taverna-server/blob/00397eff/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/worker/PolicyImpl.java
----------------------------------------------------------------------
diff --git a/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/worker/PolicyImpl.java b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/worker/PolicyImpl.java
new file mode 100644
index 0000000..37d5760
--- /dev/null
+++ b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/worker/PolicyImpl.java
@@ -0,0 +1,171 @@
+/*
+ */
+package org.taverna.server.master.worker;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import static org.taverna.server.master.identity.WorkflowInternalAuthProvider.PREFIX;
+
+import java.net.URI;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.springframework.beans.factory.annotation.Required;
+import org.springframework.security.core.Authentication;
+import org.springframework.security.core.GrantedAuthority;
+import org.springframework.security.core.context.SecurityContextHolder;
+import org.taverna.server.master.common.Roles;
+import org.taverna.server.master.common.Workflow;
+import org.taverna.server.master.exceptions.NoCreateException;
+import org.taverna.server.master.exceptions.NoDestroyException;
+import org.taverna.server.master.exceptions.NoUpdateException;
+import org.taverna.server.master.interfaces.Policy;
+import org.taverna.server.master.interfaces.TavernaRun;
+import org.taverna.server.master.interfaces.TavernaSecurityContext;
+import org.taverna.server.master.utils.UsernamePrincipal;
+
+/**
+ * Basic policy implementation that allows any workflow to be instantiated by
+ * any user, but which does not permit users to access each others workflow
+ * runs. It also imposes a global limit on the number of workflow runs at once.
+ * 
+ * @author Donal Fellows
+ */
+class PolicyImpl implements Policy {
+	Log log = LogFactory.getLog("Taverna.Server.Worker.Policy");
+	private PolicyLimits limits;
+	private RunDBSupport runDB;
+
+	@Required
+	public void setLimits(PolicyLimits limits) {
+		this.limits = limits;
+	}
+
+	@Required
+	public void setRunDB(RunDBSupport runDB) {
+		this.runDB = runDB;
+	}
+
+	@Override
+	public int getMaxRuns() {
+		return limits.getMaxRuns();
+	}
+
+	@Override
+	public Integer getMaxRuns(UsernamePrincipal user) {
+		return null;
+	}
+
+	@Override
+	public int getOperatingLimit() {
+		return limits.getOperatingLimit();
+	}
+
+	@Override
+	public List<URI> listPermittedWorkflowURIs(UsernamePrincipal user) {
+		return limits.getPermittedWorkflowURIs();
+	}
+
+	private boolean isSelfAccess(String runId) {
+		Authentication auth = SecurityContextHolder.getContext()
+				.getAuthentication();
+		boolean self = false;
+		String id = null;
+		for (GrantedAuthority a : auth.getAuthorities()) {
+			String aa = a.getAuthority();
+			if (aa.equals(Roles.SELF)) {
+				self = true;
+				continue;
+			}
+			if (!aa.startsWith(PREFIX))
+				continue;
+			id = aa.substring(PREFIX.length());
+		}
+		return self && runId.equals(id);
+	}
+
+	@Override
+	public boolean permitAccess(UsernamePrincipal user, TavernaRun run) {
+		String username = user.getName();
+		TavernaSecurityContext context = run.getSecurityContext();
+		if (context.getOwner().getName().equals(username)) {
+			if (log.isDebugEnabled())
+				log.debug("granted access by " + user.getName() + " to "
+						+ run.getId());
+			return true;
+		}
+		if (isSelfAccess(run.getId())) {
+			if (log.isDebugEnabled())
+				log.debug("access by workflow to itself: " + run.getId());
+			return true;
+		}
+		if (log.isDebugEnabled())
+			log.debug("considering access by " + user.getName() + " to "
+					+ run.getId());
+		return context.getPermittedReaders().contains(username);
+	}
+
+	@Override
+	public void permitCreate(UsernamePrincipal user, Workflow workflow)
+			throws NoCreateException {
+		if (user == null)
+			throw new NoCreateException(
+					"anonymous workflow creation not allowed");
+		if (runDB.countRuns() >= getMaxRuns())
+			throw new NoCreateException("server load exceeded; please wait");
+	}
+
+	@Override
+	public synchronized void permitDestroy(UsernamePrincipal user, TavernaRun run)
+			throws NoDestroyException {
+		if (user == null)
+			throw new NoDestroyException();
+		String username = user.getName();
+		TavernaSecurityContext context = run.getSecurityContext();
+		if (context.getOwner() == null
+				|| context.getOwner().getName().equals(username))
+			return;
+		if (!context.getPermittedDestroyers().contains(username))
+			throw new NoDestroyException();
+	}
+
+	@Override
+	public void permitUpdate(UsernamePrincipal user, TavernaRun run)
+			throws NoUpdateException {
+		if (user == null)
+			throw new NoUpdateException(
+					"workflow run not owned by you and you're not granted access");
+		TavernaSecurityContext context = run.getSecurityContext();
+		if (context.getOwner().getName().equals(user.getName()))
+			return;
+		if (isSelfAccess(run.getId())) {
+			if (log.isDebugEnabled())
+				log.debug("update access by workflow to itself: " + run.getId());
+			return;
+		}
+		if (!context.getPermittedUpdaters().contains(user.getName()))
+			throw new NoUpdateException(
+					"workflow run not owned by you and you're not granted access");
+	}
+
+	@Override
+	public void setPermittedWorkflowURIs(UsernamePrincipal user,
+			List<URI> permitted) {
+		limits.setPermittedWorkflowURIs(permitted);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-taverna-server/blob/00397eff/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/worker/PolicyLimits.java
----------------------------------------------------------------------
diff --git a/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/worker/PolicyLimits.java b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/worker/PolicyLimits.java
new file mode 100644
index 0000000..43c0aa4
--- /dev/null
+++ b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/worker/PolicyLimits.java
@@ -0,0 +1,56 @@
+/*
+ */
+package org.taverna.server.master.worker;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.net.URI;
+import java.util.List;
+
+import org.taverna.server.master.common.Status;
+
+/**
+ * The worker policy delegates certain limits to the state model of the
+ * particular worker.
+ * 
+ * @author Donal Fellows
+ */
+public interface PolicyLimits {
+	/**
+	 * @return the maximum number of extant workflow runs in any state
+	 */
+	int getMaxRuns();
+
+	/**
+	 * @return the maximum number of workflow runs in the
+	 *         {@linkplain Status#Operating operating} state.
+	 */
+	int getOperatingLimit();
+
+	/**
+	 * @return the list of URIs to workflows that may be used to create workflow
+	 *         runs. If empty or <tt>null</tt>, no restriction is present.
+	 */
+	List<URI> getPermittedWorkflowURIs();
+
+	/**
+	 * @param permitted
+	 *            the list of URIs to workflows that may be used to create
+	 *            workflow runs.
+	 */
+	void setPermittedWorkflowURIs(List<URI> permitted);
+}

http://git-wip-us.apache.org/repos/asf/incubator-taverna-server/blob/00397eff/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/worker/RemoteRunDelegate.java
----------------------------------------------------------------------
diff --git a/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/worker/RemoteRunDelegate.java b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/worker/RemoteRunDelegate.java
new file mode 100644
index 0000000..fb1ac47
--- /dev/null
+++ b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/worker/RemoteRunDelegate.java
@@ -0,0 +1,980 @@
+/*
+ */
+package org.taverna.server.master.worker;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import static java.lang.System.currentTimeMillis;
+import static java.util.Calendar.MINUTE;
+import static java.util.Collections.sort;
+import static java.util.Collections.unmodifiableSet;
+import static java.util.UUID.randomUUID;
+import static org.apache.commons.io.IOUtils.closeQuietly;
+import static org.apache.commons.logging.LogFactory.getLog;
+import static org.taverna.server.master.worker.RemoteRunDelegate.checkBadFilename;
+import static org.taverna.server.master.worker.RunConnection.NAME_LENGTH;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.PipedOutputStream;
+import java.rmi.MarshalledObject;
+import java.rmi.RemoteException;
+import java.security.GeneralSecurityException;
+import java.security.Principal;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Calendar;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipOutputStream;
+
+import javax.annotation.Nonnull;
+
+import org.apache.commons.logging.Log;
+import org.taverna.server.localworker.remote.IllegalStateTransitionException;
+import org.taverna.server.localworker.remote.ImplementationException;
+import org.taverna.server.localworker.remote.RemoteDirectory;
+import org.taverna.server.localworker.remote.RemoteDirectoryEntry;
+import org.taverna.server.localworker.remote.RemoteFile;
+import org.taverna.server.localworker.remote.RemoteInput;
+import org.taverna.server.localworker.remote.RemoteListener;
+import org.taverna.server.localworker.remote.RemoteSingleRun;
+import org.taverna.server.localworker.remote.RemoteStatus;
+import org.taverna.server.localworker.remote.StillWorkingOnItException;
+import org.taverna.server.master.common.Status;
+import org.taverna.server.master.common.Workflow;
+import org.taverna.server.master.exceptions.BadPropertyValueException;
+import org.taverna.server.master.exceptions.BadStateChangeException;
+import org.taverna.server.master.exceptions.FilesystemAccessException;
+import org.taverna.server.master.exceptions.NoListenerException;
+import org.taverna.server.master.exceptions.OverloadedException;
+import org.taverna.server.master.exceptions.UnknownRunException;
+import org.taverna.server.master.interfaces.Directory;
+import org.taverna.server.master.interfaces.DirectoryEntry;
+import org.taverna.server.master.interfaces.File;
+import org.taverna.server.master.interfaces.Input;
+import org.taverna.server.master.interfaces.Listener;
+import org.taverna.server.master.interfaces.SecurityContextFactory;
+import org.taverna.server.master.interfaces.TavernaRun;
+import org.taverna.server.master.interfaces.TavernaSecurityContext;
+import org.taverna.server.master.utils.UsernamePrincipal;
+
+/**
+ * Bridging shim between the WebApp world and the RMI world.
+ * 
+ * @author Donal Fellows
+ */
+@SuppressWarnings("serial")
+public class RemoteRunDelegate implements TavernaRun {
+	private transient Log log = getLog("Taverna.Server.Worker");
+	transient TavernaSecurityContext secContext;
+	Date creationInstant;
+	Workflow workflow;
+	Date expiry;
+	HashSet<String> readers;
+	HashSet<String> writers;
+	HashSet<String> destroyers;
+	transient String id;
+	transient RemoteSingleRun run;
+	transient RunDBSupport db;
+	transient FactoryBean factory;
+	boolean doneTransitionToFinished;
+	boolean generateProvenance;// FIXME expose
+	String name;
+	private static final String ELLIPSIS = "...";
+
+	public RemoteRunDelegate(Date creationInstant, Workflow workflow,
+			RemoteSingleRun rsr, int defaultLifetime, RunDBSupport db, UUID id,
+			boolean generateProvenance, FactoryBean factory) {
+		if (rsr == null)
+			throw new IllegalArgumentException("remote run must not be null");
+		this.creationInstant = creationInstant;
+		this.workflow = workflow;
+		Calendar c = Calendar.getInstance();
+		c.add(MINUTE, defaultLifetime);
+		this.expiry = c.getTime();
+		this.run = rsr;
+		this.db = db;
+		this.generateProvenance = generateProvenance;
+		this.factory = factory;
+		try {
+			this.name = "";
+			String ci = " " + creationInstant;
+			String n = workflow.getName();
+			if (n.length() > NAME_LENGTH - ci.length())
+				n = n.substring(0,
+						NAME_LENGTH - ci.length() - ELLIPSIS.length())
+						+ ELLIPSIS;
+			this.name = n + ci;
+		} catch (Exception e) {
+			// Ignore; it's just a name, not something important.
+		}
+		if (id != null)
+			this.id = id.toString();
+	}
+
+	RemoteRunDelegate() {
+	}
+
+	/**
+	 * Get the types of listener supported by this run.
+	 * 
+	 * @return A list of listener type names.
+	 * @throws RemoteException
+	 *             If anything goes wrong.
+	 */
+	public List<String> getListenerTypes() throws RemoteException {
+		return run.getListenerTypes();
+	}
+
+	@Override
+	public void addListener(Listener listener) {
+		if (listener instanceof ListenerDelegate)
+			try {
+				run.addListener(((ListenerDelegate) listener).getRemote());
+			} catch (RemoteException e) {
+				log.warn("communication problem adding listener", e);
+			} catch (ImplementationException e) {
+				log.warn("implementation problem adding listener", e);
+			}
+		else
+			log.fatal("bad listener " + listener.getClass()
+					+ "; not applicable remotely!");
+	}
+
+	@Override
+	public String getId() {
+		if (id == null)
+			id = randomUUID().toString();
+		return id;
+	}
+
+	/**
+	 * Attach a listener to a workflow run and return its local delegate.
+	 * 
+	 * @param type
+	 *            The type of listener to create.
+	 * @param config
+	 *            The configuration of the listener.
+	 * @return The local delegate of the listener.
+	 * @throws NoListenerException
+	 *             If anything goes wrong.
+	 */
+	public Listener makeListener(String type, String config)
+			throws NoListenerException {
+		try {
+			return new ListenerDelegate(run.makeListener(type, config));
+		} catch (RemoteException e) {
+			throw new NoListenerException("failed to make listener", e);
+		}
+	}
+
+	@Override
+	public void destroy() {
+		try {
+			run.destroy();
+		} catch (RemoteException | ImplementationException e) {
+			log.warn("failed to destroy run", e);
+		}
+	}
+
+	@Override
+	public Date getExpiry() {
+		return new Date(expiry.getTime());
+	}
+
+	@Override
+	public List<Listener> getListeners() {
+		List<Listener> listeners = new ArrayList<>();
+		try {
+			for (RemoteListener rl : run.getListeners())
+				listeners.add(new ListenerDelegate(rl));
+		} catch (RemoteException e) {
+			log.warn("failed to get listeners", e);
+		}
+		return listeners;
+	}
+
+	@Override
+	public TavernaSecurityContext getSecurityContext() {
+		return secContext;
+	}
+
+	@Override
+	public Status getStatus() {
+		try {
+			switch (run.getStatus()) {
+			case Initialized:
+				return Status.Initialized;
+			case Operating:
+				return Status.Operating;
+			case Stopped:
+				return Status.Stopped;
+			case Finished:
+				return Status.Finished;
+			}
+		} catch (RemoteException e) {
+			log.warn("problem getting remote status", e);
+		}
+		return Status.Finished;
+	}
+
+	@Override
+	public Workflow getWorkflow() {
+		return workflow;
+	}
+
+	@Override
+	public Directory getWorkingDirectory() throws FilesystemAccessException {
+		try {
+			return new DirectoryDelegate(run.getWorkingDirectory());
+		} catch (Throwable e) {
+			if (e.getCause() != null)
+				e = e.getCause();
+			throw new FilesystemAccessException(
+					"problem getting main working directory handle", e);
+		}
+	}
+
+	@Override
+	public void setExpiry(Date d) {
+		if (d.after(new Date()))
+			expiry = new Date(d.getTime());
+		db.flushToDisk(this);
+	}
+
+	@Override
+	public String setStatus(Status s) throws BadStateChangeException {
+		try {
+			log.info("setting status of run " + id + " to " + s);
+			switch (s) {
+			case Initialized:
+				run.setStatus(RemoteStatus.Initialized);
+				break;
+			case Operating:
+				if (run.getStatus() == RemoteStatus.Initialized) {
+					if (!factory.isAllowingRunsToStart())
+						throw new OverloadedException();
+					secContext.conveySecurity();
+				}
+				run.setGenerateProvenance(generateProvenance);
+				run.setStatus(RemoteStatus.Operating);
+				factory.getMasterEventFeed()
+						.started(
+								this,
+								"started run execution",
+								"The execution of run '" + getName()
+										+ "' has started.");
+				break;
+			case Stopped:
+				run.setStatus(RemoteStatus.Stopped);
+				break;
+			case Finished:
+				run.setStatus(RemoteStatus.Finished);
+				break;
+			}
+			return null;
+		} catch (IllegalStateTransitionException e) {
+			throw new BadStateChangeException(e.getMessage());
+		} catch (RemoteException e) {
+			throw new BadStateChangeException(e.getMessage(), e.getCause());
+		} catch (GeneralSecurityException | IOException e) {
+			throw new BadStateChangeException(e.getMessage(), e);
+		} catch (ImplementationException e) {
+			if (e.getCause() != null)
+				throw new BadStateChangeException(e.getMessage(), e.getCause());
+			throw new BadStateChangeException(e.getMessage(), e);
+		} catch (StillWorkingOnItException e) {
+			log.info("still working on setting status of run " + id + " to "
+					+ s, e);
+			return e.getMessage();
+		} catch (InterruptedException e) {
+			throw new BadStateChangeException(
+					"interrupted while waiting to insert notification into database");
+		}
+	}
+
+	static void checkBadFilename(String filename)
+			throws FilesystemAccessException {
+		if (filename.startsWith("/"))
+			throw new FilesystemAccessException("filename may not be absolute");
+		if (Arrays.asList(filename.split("/")).contains(".."))
+			throw new FilesystemAccessException(
+					"filename may not refer to parent");
+	}
+
+	@Override
+	public String getInputBaclavaFile() {
+		try {
+			return run.getInputBaclavaFile();
+		} catch (RemoteException e) {
+			log.warn("problem when fetching input baclava file", e);
+			return null;
+		}
+	}
+
+	@Override
+	public List<Input> getInputs() {
+		ArrayList<Input> inputs = new ArrayList<>();
+		try {
+			for (RemoteInput ri : run.getInputs())
+				inputs.add(new RunInput(ri));
+		} catch (RemoteException e) {
+			log.warn("problem when fetching list of workflow inputs", e);
+		}
+		return inputs;
+	}
+
+	@Override
+	public String getOutputBaclavaFile() {
+		try {
+			return run.getOutputBaclavaFile();
+		} catch (RemoteException e) {
+			log.warn("problem when fetching output baclava file", e);
+			return null;
+		}
+	}
+
+	@Override
+	public Input makeInput(String name) throws BadStateChangeException {
+		try {
+			return new RunInput(run.makeInput(name));
+		} catch (RemoteException e) {
+			throw new BadStateChangeException("failed to make input", e);
+		}
+	}
+
+	@Override
+	public void setInputBaclavaFile(String filename)
+			throws FilesystemAccessException, BadStateChangeException {
+		checkBadFilename(filename);
+		try {
+			run.setInputBaclavaFile(filename);
+		} catch (RemoteException e) {
+			throw new FilesystemAccessException(
+					"cannot set input baclava file name", e);
+		}
+	}
+
+	@Override
+	public void setOutputBaclavaFile(String filename)
+			throws FilesystemAccessException, BadStateChangeException {
+		checkBadFilename(filename);
+		try {
+			run.setOutputBaclavaFile(filename);
+		} catch (RemoteException e) {
+			throw new FilesystemAccessException(
+					"cannot set output baclava file name", e);
+		}
+	}
+
+	@Override
+	public Date getCreationTimestamp() {
+		return creationInstant == null ? null : new Date(
+				creationInstant.getTime());
+	}
+
+	@Override
+	public Date getFinishTimestamp() {
+		try {
+			return run.getFinishTimestamp();
+		} catch (RemoteException e) {
+			log.info("failed to get finish timestamp", e);
+			return null;
+		}
+	}
+
+	@Override
+	public Date getStartTimestamp() {
+		try {
+			return run.getStartTimestamp();
+		} catch (RemoteException e) {
+			log.info("failed to get finish timestamp", e);
+			return null;
+		}
+	}
+
+	/**
+	 * @param readers
+	 *            the readers to set
+	 */
+	public void setReaders(Set<String> readers) {
+		this.readers = new HashSet<>(readers);
+		db.flushToDisk(this);
+	}
+
+	/**
+	 * @return the readers
+	 */
+	public Set<String> getReaders() {
+		return readers == null ? new HashSet<String>()
+				: unmodifiableSet(readers);
+	}
+
+	/**
+	 * @param writers
+	 *            the writers to set
+	 */
+	public void setWriters(Set<String> writers) {
+		this.writers = new HashSet<>(writers);
+		db.flushToDisk(this);
+	}
+
+	/**
+	 * @return the writers
+	 */
+	public Set<String> getWriters() {
+		return writers == null ? new HashSet<String>()
+				: unmodifiableSet(writers);
+	}
+
+	/**
+	 * @param destroyers
+	 *            the destroyers to set
+	 */
+	public void setDestroyers(Set<String> destroyers) {
+		this.destroyers = new HashSet<>(destroyers);
+		db.flushToDisk(this);
+	}
+
+	/**
+	 * @return the destroyers
+	 */
+	public Set<String> getDestroyers() {
+		return destroyers == null ? new HashSet<String>()
+				: unmodifiableSet(destroyers);
+	}
+
+	private void writeObject(ObjectOutputStream out) throws IOException {
+		out.defaultWriteObject();
+		out.writeUTF(secContext.getOwner().getName());
+		out.writeObject(secContext.getFactory());
+		out.writeObject(new MarshalledObject<>(run));
+	}
+
+	@Override
+	public boolean getGenerateProvenance() {
+		return generateProvenance;
+	}
+
+	@Override
+	public void setGenerateProvenance(boolean generateProvenance) {
+		this.generateProvenance = generateProvenance;
+		db.flushToDisk(this);
+	}
+
+	@SuppressWarnings("unchecked")
+	private void readObject(ObjectInputStream in) throws IOException,
+			ClassNotFoundException {
+		in.defaultReadObject();
+		if (log == null)
+			log = getLog("Taverna.Server.LocalWorker");
+		final String creatorName = in.readUTF();
+		SecurityContextFactory factory = (SecurityContextFactory) in
+				.readObject();
+		try {
+			secContext = factory.create(this,
+					new UsernamePrincipal(creatorName));
+		} catch (RuntimeException | IOException e) {
+			throw e;
+		} catch (Exception e) {
+			throw new SecurityContextReconstructionException(e);
+		}
+		run = ((MarshalledObject<RemoteSingleRun>) in.readObject()).get();
+	}
+
+	public void setSecurityContext(TavernaSecurityContext tavernaSecurityContext) {
+		secContext = tavernaSecurityContext;
+	}
+
+	@Override
+	public String getName() {
+		return name;
+	}
+
+	@Override
+	public void setName(@Nonnull String name) {
+		if (name.length() > RunConnection.NAME_LENGTH)
+			this.name = name.substring(0, RunConnection.NAME_LENGTH);
+		else
+			this.name = name;
+		db.flushToDisk(this);
+	}
+
+	@Override
+	public void ping() throws UnknownRunException {
+		try {
+			run.ping();
+		} catch (RemoteException e) {
+			throw new UnknownRunException(e);
+		}
+	}
+}
+
+abstract class DEDelegate implements DirectoryEntry {
+	Log log = getLog("Taverna.Server.Worker");
+	private RemoteDirectoryEntry entry;
+	private String name;
+	private String full;
+	private Date cacheModTime;
+	private long cacheQueryTime = 0L;
+
+	DEDelegate(RemoteDirectoryEntry entry) {
+		this.entry = entry;
+	}
+
+	@Override
+	public void destroy() throws FilesystemAccessException {
+		try {
+			entry.destroy();
+		} catch (IOException e) {
+			throw new FilesystemAccessException(
+					"failed to delete directory entry", e);
+		}
+	}
+
+	@Override
+	public String getFullName() {
+		if (full != null)
+			return full;
+		String n = getName();
+		RemoteDirectoryEntry re = entry;
+		try {
+			while (true) {
+				RemoteDirectory parent = re.getContainingDirectory();
+				if (parent == null)
+					break;
+				n = parent.getName() + "/" + n;
+				re = parent;
+			}
+		} catch (RemoteException e) {
+			log.warn("failed to generate full name", e);
+		}
+		return (full = n);
+	}
+
+	@Override
+	public String getName() {
+		if (name == null)
+			try {
+				name = entry.getName();
+			} catch (RemoteException e) {
+				log.error("failed to get name", e);
+			}
+		return name;
+	}
+
+	@Override
+	public Date getModificationDate() {
+		if (cacheModTime == null || currentTimeMillis() - cacheQueryTime < 5000)
+			try {
+				cacheModTime = entry.getModificationDate();
+				cacheQueryTime = currentTimeMillis();
+			} catch (RemoteException e) {
+				log.error("failed to get modification time", e);
+			}
+		return cacheModTime;
+	}
+
+	@Override
+	public int compareTo(DirectoryEntry de) {
+		return getFullName().compareTo(de.getFullName());
+	}
+
+	@Override
+	public boolean equals(Object o) {
+		return o != null && o instanceof DEDelegate
+				&& getFullName().equals(((DEDelegate) o).getFullName());
+	}
+
+	@Override
+	public int hashCode() {
+		return getFullName().hashCode();
+	}
+}
+
+class DirectoryDelegate extends DEDelegate implements Directory {
+	RemoteDirectory rd;
+
+	DirectoryDelegate(RemoteDirectory dir) {
+		super(dir);
+		rd = dir;
+	}
+
+	@Override
+	public Collection<DirectoryEntry> getContents()
+			throws FilesystemAccessException {
+		ArrayList<DirectoryEntry> result = new ArrayList<>();
+		try {
+			for (RemoteDirectoryEntry rde : rd.getContents()) {
+				if (rde instanceof RemoteDirectory)
+					result.add(new DirectoryDelegate((RemoteDirectory) rde));
+				else
+					result.add(new FileDelegate((RemoteFile) rde));
+			}
+		} catch (IOException e) {
+			throw new FilesystemAccessException(
+					"failed to get directory contents", e);
+		}
+		return result;
+	}
+
+	@Override
+	public Collection<DirectoryEntry> getContentsByDate()
+			throws FilesystemAccessException {
+		ArrayList<DirectoryEntry> result = new ArrayList<>(getContents());
+		sort(result, new DateComparator());
+		return result;
+	}
+
+	static class DateComparator implements Comparator<DirectoryEntry> {
+		@Override
+		public int compare(DirectoryEntry a, DirectoryEntry b) {
+			return a.getModificationDate().compareTo(b.getModificationDate());
+		}
+	}
+
+	@Override
+	public File makeEmptyFile(Principal actor, String name)
+			throws FilesystemAccessException {
+		try {
+			return new FileDelegate(rd.makeEmptyFile(name));
+		} catch (IOException e) {
+			throw new FilesystemAccessException("failed to make empty file", e);
+		}
+	}
+
+	@Override
+	public Directory makeSubdirectory(Principal actor, String name)
+			throws FilesystemAccessException {
+		try {
+			return new DirectoryDelegate(rd.makeSubdirectory(name));
+		} catch (IOException e) {
+			throw new FilesystemAccessException("failed to make subdirectory",
+					e);
+		}
+	}
+
+	@Override
+	public ZipStream getContentsAsZip() throws FilesystemAccessException {
+		ZipStream zs = new ZipStream();
+
+		final ZipOutputStream zos;
+		try {
+			zos = new ZipOutputStream(new PipedOutputStream(zs));
+		} catch (IOException e) {
+			throw new FilesystemAccessException("problem building zip stream",
+					e);
+		}
+		Thread t = new Thread(new Runnable() {
+			@Override
+			public void run() {
+				try {
+					zipDirectory(rd, null, zos);
+				} catch (IOException e) {
+					log.warn("problem when zipping directory", e);
+				} finally {
+					closeQuietly(zos);
+				}
+			}
+		});
+		t.setDaemon(true);
+		t.start();
+		return zs;
+	}
+
+	/**
+	 * Compresses a directory tree into a ZIP.
+	 * 
+	 * @param dir
+	 *            The directory to compress.
+	 * @param base
+	 *            The base name of the directory (or <tt>null</tt> if this is
+	 *            the root directory of the ZIP).
+	 * @param zos
+	 *            Where to write the compressed data.
+	 * @throws RemoteException
+	 *             If some kind of problem happens with the remote delegates.
+	 * @throws IOException
+	 *             If we run into problems with reading or writing data.
+	 */
+	void zipDirectory(RemoteDirectory dir, String base, ZipOutputStream zos)
+			throws RemoteException, IOException {
+		for (RemoteDirectoryEntry rde : dir.getContents()) {
+			String name = rde.getName();
+			if (base != null)
+				name = base + "/" + name;
+			if (rde instanceof RemoteDirectory) {
+				RemoteDirectory rd = (RemoteDirectory) rde;
+				zipDirectory(rd, name, zos);
+			} else {
+				RemoteFile rf = (RemoteFile) rde;
+				zos.putNextEntry(new ZipEntry(name));
+				try {
+					int off = 0;
+					while (true) {
+						byte[] c = rf.getContents(off, 64 * 1024);
+						if (c == null || c.length == 0)
+							break;
+						zos.write(c);
+						off += c.length;
+					}
+				} finally {
+					zos.closeEntry();
+				}
+			}
+		}
+	}
+}
+
+class FileDelegate extends DEDelegate implements File {
+	RemoteFile rf;
+
+	FileDelegate(RemoteFile f) {
+		super(f);
+		this.rf = f;
+	}
+
+	@Override
+	public byte[] getContents(int offset, int length)
+			throws FilesystemAccessException {
+		try {
+			return rf.getContents(offset, length);
+		} catch (IOException e) {
+			throw new FilesystemAccessException("failed to read file contents",
+					e);
+		}
+	}
+
+	@Override
+	public long getSize() throws FilesystemAccessException {
+		try {
+			return rf.getSize();
+		} catch (IOException e) {
+			throw new FilesystemAccessException("failed to get file length", e);
+		}
+	}
+
+	@Override
+	public void setContents(byte[] data) throws FilesystemAccessException {
+		try {
+			rf.setContents(data);
+		} catch (IOException e) {
+			throw new FilesystemAccessException(
+					"failed to write file contents", e);
+		}
+	}
+
+	@Override
+	public void appendContents(byte[] data) throws FilesystemAccessException {
+		try {
+			rf.appendContents(data);
+		} catch (IOException e) {
+			throw new FilesystemAccessException(
+					"failed to write file contents", e);
+		}
+	}
+
+	@Override
+	public void copy(File from) throws FilesystemAccessException {
+		FileDelegate fromFile;
+		try {
+			fromFile = (FileDelegate) from;
+		} catch (ClassCastException e) {
+			throw new FilesystemAccessException("different types of File?!");
+		}
+
+		try {
+			rf.copy(fromFile.rf);
+		} catch (Exception e) {
+			throw new FilesystemAccessException("failed to copy file contents",
+					e);
+		}
+		return;
+	}
+}
+
+class ListenerDelegate implements Listener {
+	private Log log = getLog("Taverna.Server.Worker");
+	private RemoteListener r;
+	String conf;
+
+	ListenerDelegate(RemoteListener l) {
+		r = l;
+	}
+
+	RemoteListener getRemote() {
+		return r;
+	}
+
+	@Override
+	public String getConfiguration() {
+		try {
+			if (conf == null)
+				conf = r.getConfiguration();
+		} catch (RemoteException e) {
+			log.warn("failed to get configuration", e);
+		}
+		return conf;
+	}
+
+	@Override
+	public String getName() {
+		try {
+			return r.getName();
+		} catch (RemoteException e) {
+			log.warn("failed to get name", e);
+			return "UNKNOWN NAME";
+		}
+	}
+
+	@Override
+	public String getProperty(String propName) throws NoListenerException {
+		try {
+			return r.getProperty(propName);
+		} catch (RemoteException e) {
+			throw new NoListenerException("no such property: " + propName, e);
+		}
+	}
+
+	@Override
+	public String getType() {
+		try {
+			return r.getType();
+		} catch (RemoteException e) {
+			log.warn("failed to get type", e);
+			return "UNKNOWN TYPE";
+		}
+	}
+
+	@Override
+	public String[] listProperties() {
+		try {
+			return r.listProperties();
+		} catch (RemoteException e) {
+			log.warn("failed to list properties", e);
+			return new String[0];
+		}
+	}
+
+	@Override
+	public void setProperty(String propName, String value)
+			throws NoListenerException, BadPropertyValueException {
+		try {
+			r.setProperty(propName, value);
+		} catch (RemoteException e) {
+			log.warn("failed to set property", e);
+			if (e.getCause() != null
+					&& e.getCause() instanceof RuntimeException)
+				throw new NoListenerException("failed to set property",
+						e.getCause());
+			if (e.getCause() != null && e.getCause() instanceof Exception)
+				throw new BadPropertyValueException("failed to set property",
+						e.getCause());
+			throw new BadPropertyValueException("failed to set property", e);
+		}
+	}
+}
+
+class RunInput implements Input {
+	private final RemoteInput i;
+
+	RunInput(RemoteInput remote) {
+		this.i = remote;
+	}
+
+	@Override
+	public String getFile() {
+		try {
+			return i.getFile();
+		} catch (RemoteException e) {
+			return null;
+		}
+	}
+
+	@Override
+	public String getName() {
+		try {
+			return i.getName();
+		} catch (RemoteException e) {
+			return null;
+		}
+	}
+
+	@Override
+	public String getValue() {
+		try {
+			return i.getValue();
+		} catch (RemoteException e) {
+			return null;
+		}
+	}
+
+	@Override
+	public void setFile(String file) throws FilesystemAccessException,
+			BadStateChangeException {
+		checkBadFilename(file);
+		try {
+			i.setFile(file);
+		} catch (RemoteException e) {
+			throw new FilesystemAccessException("cannot set file for input", e);
+		}
+	}
+
+	@Override
+	public void setValue(String value) throws BadStateChangeException {
+		try {
+			i.setValue(value);
+		} catch (RemoteException e) {
+			throw new BadStateChangeException(e);
+		}
+	}
+
+	@Override
+	public String getDelimiter() {
+		try {
+			return i.getDelimiter();
+		} catch (RemoteException e) {
+			return null;
+		}
+	}
+
+	@Override
+	public void setDelimiter(String delimiter) throws BadStateChangeException {
+		try {
+			if (delimiter != null)
+				delimiter = delimiter.substring(0, 1);
+			i.setDelimiter(delimiter);
+		} catch (RemoteException e) {
+			throw new BadStateChangeException(e);
+		}
+	}
+}
+
+@SuppressWarnings("serial")
+class SecurityContextReconstructionException extends RuntimeException {
+	public SecurityContextReconstructionException(Throwable t) {
+		super("failed to rebuild security context", t);
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-taverna-server/blob/00397eff/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/worker/RunConnection.java
----------------------------------------------------------------------
diff --git a/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/worker/RunConnection.java b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/worker/RunConnection.java
new file mode 100644
index 0000000..0c2b1a9
--- /dev/null
+++ b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/worker/RunConnection.java
@@ -0,0 +1,252 @@
+/*
+ */
+package org.taverna.server.master.worker;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import static java.util.Arrays.asList;
+import static java.util.Collections.emptyList;
+import static org.taverna.server.master.worker.RunConnection.COUNT_QUERY;
+import static org.taverna.server.master.worker.RunConnection.NAMES_QUERY;
+import static org.taverna.server.master.worker.RunConnection.SCHEMA;
+import static org.taverna.server.master.worker.RunConnection.TABLE;
+import static org.taverna.server.master.worker.RunConnection.TIMEOUT_QUERY;
+import static org.taverna.server.master.worker.RunConnection.UNTERMINATED_QUERY;
+
+import java.io.IOException;
+import java.rmi.MarshalledObject;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.List;
+
+import javax.annotation.Nonnull;
+import javax.jdo.annotations.Column;
+import javax.jdo.annotations.Join;
+import javax.jdo.annotations.PersistenceCapable;
+import javax.jdo.annotations.Persistent;
+import javax.jdo.annotations.PrimaryKey;
+import javax.jdo.annotations.Queries;
+import javax.jdo.annotations.Query;
+
+import org.taverna.server.localworker.remote.RemoteSingleRun;
+import org.taverna.server.master.common.Credential;
+import org.taverna.server.master.common.Trust;
+import org.taverna.server.master.common.Workflow;
+import org.taverna.server.master.interfaces.SecurityContextFactory;
+import org.taverna.server.master.utils.UsernamePrincipal;
+
+/**
+ * The representation of the connections to the runs that actually participates
+ * in the persistence system.
+ * 
+ * @author Donal Fellows
+ */
+@PersistenceCapable(table = TABLE, schema = SCHEMA)
+@Queries({
+		@Query(name = "count", language = "SQL", value = COUNT_QUERY, unique = "true", resultClass = Integer.class),
+		@Query(name = "names", language = "SQL", value = NAMES_QUERY, unique = "false", resultClass = String.class),
+		@Query(name = "unterminated", language = "SQL", value = UNTERMINATED_QUERY, unique = "false", resultClass = String.class),
+		@Query(name = "timedout", language = "SQL", value = TIMEOUT_QUERY, unique = "false", resultClass = String.class) })
+public class RunConnection {
+	static final String SCHEMA = "TAVERNA";
+	static final String TABLE = "RUN_CONNECTION";
+	private static final String FULL_NAME = SCHEMA + "." + TABLE;
+	static final String COUNT_QUERY = "SELECT count(*) FROM " + FULL_NAME;
+	static final String NAMES_QUERY = "SELECT ID FROM " + FULL_NAME;
+	static final String TIMEOUT_QUERY = "SELECT ID FROM " + FULL_NAME
+			+ "   WHERE expiry < CURRENT_TIMESTAMP";
+	static final String UNTERMINATED_QUERY = "SELECT ID FROM " + FULL_NAME
+			+ "   WHERE doneTransitionToFinished = 0";
+	static final int NAME_LENGTH = 48; 
+
+	@PrimaryKey
+	@Column(length = 40)
+	private String id;
+
+	@Persistent(defaultFetchGroup = "true")
+	@Column(length = NAME_LENGTH)
+	private String name;
+
+	@Persistent(defaultFetchGroup = "true")
+	private Date creationInstant;
+
+	@Persistent(defaultFetchGroup = "true", serialized = "true")
+	@Column(jdbcType = "BLOB", sqlType = "BLOB")
+	private Workflow workflow;
+
+	@Persistent(defaultFetchGroup = "true")
+	private Date expiry;
+
+	@Persistent(defaultFetchGroup = "true")
+	@Join(table = TABLE + "_READERS", column = "ID")
+	private String[] readers;
+
+	@Persistent(defaultFetchGroup = "true")
+	@Join(table = TABLE + "_WRITERS", column = "ID")
+	private String[] writers;
+
+	@Persistent(defaultFetchGroup = "true")
+	@Join(table = TABLE + "_DESTROYERS", column = "ID")
+	private String[] destroyers;
+
+	@Persistent(defaultFetchGroup = "true", serialized = "true")
+	@Column(jdbcType = "BLOB", sqlType = "BLOB")
+	private MarshalledObject<RemoteSingleRun> run;
+
+	@Persistent(defaultFetchGroup = "true")
+	private int doneTransitionToFinished;
+
+	@Persistent(defaultFetchGroup = "true")
+	private int generateProvenance;
+
+	@Persistent(defaultFetchGroup = "true")
+	@Column(length = 128)
+	String owner;
+
+	@Persistent(defaultFetchGroup = "true")
+	@Column(length = 36)
+	private String securityToken;
+
+	@Persistent(defaultFetchGroup = "true", serialized = "true")
+	@Column(jdbcType = "BLOB", sqlType = "BLOB")
+	private SecurityContextFactory securityContextFactory;
+	@Persistent(defaultFetchGroup = "true", serialized = "true")
+	@Column(jdbcType = "BLOB", sqlType = "BLOB")
+	private Credential[] credentials;
+	@Persistent(defaultFetchGroup = "true", serialized = "true")
+	@Column(jdbcType = "BLOB", sqlType = "BLOB")
+	private Trust[] trust;
+
+	private static final String[] STRING_ARY = new String[0];
+
+	public String getId() {
+		return id;
+	}
+
+	public boolean isFinished() {
+		return doneTransitionToFinished != 0;
+	}
+
+	public void setFinished(boolean finished) {
+		doneTransitionToFinished = (finished ? 1 : 0);
+	}
+
+	public boolean isProvenanceGenerated() {
+		return generateProvenance != 0;
+	}
+
+	public void setProvenanceGenerated(boolean generate) {
+		generateProvenance = (generate ? 1 : 0);
+	}
+
+	/**
+	 * Manufacture a persistent representation of the given workflow run. Must
+	 * be called within the context of a transaction.
+	 * 
+	 * @param rrd
+	 *            The remote delegate of the workflow run.
+	 * @return The persistent object.
+	 * @throws IOException
+	 *             If serialisation fails.
+	 */
+	@Nonnull
+	public static RunConnection toDBform(@Nonnull RemoteRunDelegate rrd)
+			throws IOException {
+		RunConnection rc = new RunConnection();
+		rc.id = rrd.id;
+		rc.makeChanges(rrd);
+		return rc;
+	}
+
+	private static List<String> list(String[] ary) {
+		if (ary == null)
+			return emptyList();
+		return asList(ary);
+	}
+
+	/**
+	 * Get the remote run delegate for a particular persistent connection. Must
+	 * be called within the context of a transaction.
+	 * 
+	 * @param db
+	 *            The database facade.
+	 * @return The delegate object.
+	 * @throws Exception
+	 *             If anything goes wrong.
+	 */
+	@Nonnull
+	public RemoteRunDelegate fromDBform(@Nonnull RunDBSupport db)
+			throws Exception {
+		RemoteRunDelegate rrd = new RemoteRunDelegate();
+		rrd.id = getId();
+		rrd.creationInstant = creationInstant;
+		rrd.workflow = workflow;
+		rrd.expiry = expiry;
+		rrd.readers = new HashSet<>(list(readers));
+		rrd.writers = new HashSet<>(list(writers));
+		rrd.destroyers = new HashSet<>(list(destroyers));
+		rrd.run = run.get();
+		rrd.doneTransitionToFinished = isFinished();
+		rrd.generateProvenance = isProvenanceGenerated();
+		rrd.secContext = securityContextFactory.create(rrd,
+				new UsernamePrincipal(owner));
+		((SecurityContextDelegate)rrd.secContext).setCredentialsAndTrust(credentials,trust);
+		rrd.db = db;
+		rrd.factory = db.getFactory();
+		rrd.name = name;
+		return rrd;
+	}
+
+	/**
+	 * Flush changes from a remote run delegate to the database. Must be called
+	 * within the context of a transaction.
+	 * 
+	 * @param rrd
+	 *            The remote run delegate object that has potential changes.
+	 * @throws IOException
+	 *             If anything goes wrong in serialization.
+	 */
+	public void makeChanges(@Nonnull RemoteRunDelegate rrd) throws IOException {
+		// Properties that are set exactly once
+		if (creationInstant == null) {
+			creationInstant = rrd.getCreationTimestamp();
+			workflow = rrd.getWorkflow();
+			run = new MarshalledObject<>(rrd.run);
+			securityContextFactory = rrd.getSecurityContext().getFactory();
+			owner = rrd.getSecurityContext().getOwner().getName();
+			securityToken = ((org.taverna.server.master.worker.SecurityContextFactory) securityContextFactory)
+					.issueNewPassword();
+		}
+		// Properties that are set multiple times
+		expiry = rrd.getExpiry();
+		readers = rrd.getReaders().toArray(STRING_ARY);
+		writers = rrd.getWriters().toArray(STRING_ARY);
+		destroyers = rrd.getDestroyers().toArray(STRING_ARY);
+		credentials = rrd.getSecurityContext().getCredentials();
+		trust = rrd.getSecurityContext().getTrusted();
+		if (rrd.name.length() > NAME_LENGTH)
+			this.name = rrd.name.substring(0, NAME_LENGTH);
+		else
+			this.name = rrd.name;
+		setFinished(rrd.doneTransitionToFinished);
+		setProvenanceGenerated(rrd.generateProvenance);
+	}
+
+	public String getSecurityToken() {
+		return securityToken;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-taverna-server/blob/00397eff/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/worker/RunDBSupport.java
----------------------------------------------------------------------
diff --git a/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/worker/RunDBSupport.java b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/worker/RunDBSupport.java
new file mode 100644
index 0000000..5fa96b8
--- /dev/null
+++ b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/worker/RunDBSupport.java
@@ -0,0 +1,96 @@
+/*
+ */
+package org.taverna.server.master.worker;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.util.List;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import org.taverna.server.master.notification.NotificationEngine;
+
+/**
+ * The interface to the database of runs.
+ * 
+ * @author Donal Fellows
+ */
+public interface RunDBSupport {
+	/**
+	 * Scan each run to see if it has finished yet and issue registered
+	 * notifications if it has.
+	 */
+	void checkForFinishNow();
+
+	/**
+	 * Remove currently-expired runs from this database.
+	 */
+	void cleanNow();
+
+	/**
+	 * How many runs are stored in the database.
+	 * 
+	 * @return The current size of the run table.
+	 */
+	int countRuns();
+
+	/**
+	 * Ensure that a run gets persisted in the database. It is assumed that the
+	 * value is already in there.
+	 * 
+	 * @param run
+	 *            The run to persist.
+	 */
+	void flushToDisk(@Nonnull RemoteRunDelegate run);
+
+	/**
+	 * Select an arbitrary representative run.
+	 * 
+	 * @return The selected run.
+	 * @throws Exception
+	 *             If anything goes wrong.
+	 */
+	@Nullable
+	RemoteRunDelegate pickArbitraryRun() throws Exception;
+
+	/**
+	 * Get a list of all the run names.
+	 * 
+	 * @return The names (i.e., UUIDs) of all the runs.
+	 */
+	@Nonnull
+	List<String> listRunNames();
+
+	/**
+	 * @param notificationEngine
+	 *            A reference to the notification fabric bean.
+	 */
+	void setNotificationEngine(NotificationEngine notificationEngine);
+
+	/**
+	 * @param notifier
+	 *            A reference to the bean that creates messages about workflow
+	 *            run termination.
+	 */
+	void setNotifier(CompletionNotifier notifier);
+
+	/**
+	 * @return A reference to the actual factory for remote runs.
+	 */
+	FactoryBean getFactory();
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-taverna-server/blob/00397eff/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/worker/RunDatabase.java
----------------------------------------------------------------------
diff --git a/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/worker/RunDatabase.java b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/worker/RunDatabase.java
new file mode 100644
index 0000000..65aec70
--- /dev/null
+++ b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/worker/RunDatabase.java
@@ -0,0 +1,324 @@
+/*
+ */
+package org.taverna.server.master.worker;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import static java.lang.Integer.parseInt;
+import static java.util.UUID.randomUUID;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.UUID;
+
+import javax.annotation.Nullable;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Required;
+import org.taverna.server.master.common.Status;
+import org.taverna.server.master.exceptions.UnknownRunException;
+import org.taverna.server.master.interfaces.Listener;
+import org.taverna.server.master.interfaces.Policy;
+import org.taverna.server.master.interfaces.RunStore;
+import org.taverna.server.master.interfaces.TavernaRun;
+import org.taverna.server.master.notification.NotificationEngine;
+import org.taverna.server.master.notification.NotificationEngine.Message;
+import org.taverna.server.master.utils.UsernamePrincipal;
+
+/**
+ * The main facade bean that interfaces to the database of runs.
+ * 
+ * @author Donal Fellows
+ */
+public class RunDatabase implements RunStore, RunDBSupport {
+	private Log log = LogFactory.getLog("Taverna.Server.Worker.RunDB");
+	RunDatabaseDAO dao;
+	CompletionNotifier backupNotifier;
+	Map<String, CompletionNotifier> typedNotifiers;
+	private NotificationEngine notificationEngine;
+	@Autowired
+	private FactoryBean factory;
+	private Map<String, TavernaRun> cache = new HashMap<>();
+
+	@Override
+	@Required
+	public void setNotifier(CompletionNotifier n) {
+		backupNotifier = n;
+	}
+
+	public void setTypeNotifiers(List<CompletionNotifier> notifiers) {
+		typedNotifiers = new HashMap<>();
+		for (CompletionNotifier n : notifiers)
+			typedNotifiers.put(n.getName(), n);
+	}
+
+	@Required
+	@Override
+	public void setNotificationEngine(NotificationEngine notificationEngine) {
+		this.notificationEngine = notificationEngine;
+	}
+
+	@Required
+	public void setDao(RunDatabaseDAO dao) {
+		this.dao = dao;
+	}
+
+	@Override
+	public void checkForFinishNow() {
+		/*
+		 * Get which runs are actually newly finished; this requires getting the
+		 * candidates from the database and *then* doing the expensive requests
+		 * to the back end to find out the status.
+		 */
+		Map<String, RemoteRunDelegate> notifiable = new HashMap<>();
+		for (RemoteRunDelegate p : dao.getPotentiallyNotifiable())
+			if (p.getStatus() == Status.Finished)
+				notifiable.put(p.getId(), p);
+
+		// Check if there's nothing more to do
+		if (notifiable.isEmpty())
+			return;
+
+		/*
+		 * Tell the database about the ones we've got.
+		 */
+		dao.markFinished(notifiable.keySet());
+
+		/*
+		 * Send out the notifications. The notification addresses are stored in
+		 * the back-end engine, so this is *another* thing that can take time.
+		 */
+		for (RemoteRunDelegate rrd : notifiable.values())
+			for (Listener l : rrd.getListeners())
+				if (l.getName().equals("io")) {
+					try {
+						notifyFinished(rrd.id, l, rrd);
+					} catch (Exception e) {
+						log.warn("failed to do notification of completion", e);
+					}
+					break;
+				}
+	}
+
+	@Override
+	public void cleanNow() {
+		List<String> cleaned;
+		try {
+			cleaned = dao.doClean();
+		} catch (Exception e) {
+			log.warn("failure during deletion of expired runs", e);
+			return;
+		}
+		synchronized (cache) {
+			for (String id : cleaned)
+				cache.remove(id);
+		}
+	}
+
+	@Override
+	public int countRuns() {
+		return dao.countRuns();
+	}
+
+	@Override
+	public void flushToDisk(RemoteRunDelegate run) {
+		try {
+			dao.flushToDisk(run);
+		} catch (IOException e) {
+			throw new RuntimeException(
+					"unexpected problem when persisting run record in database",
+					e);
+		}
+	}
+
+	@Override
+	public RemoteRunDelegate pickArbitraryRun() throws Exception {
+		return dao.pickArbitraryRun();
+	}
+
+	@Override
+	public List<String> listRunNames() {
+		return dao.listRunNames();
+	}
+
+	@Nullable
+	private TavernaRun get(String uuid) {
+		TavernaRun run = null;
+		synchronized (cache) {
+			run = cache.get(uuid);
+		}
+		try {
+			if (run != null)
+				run.ping();
+		} catch (UnknownRunException e) {
+			if (log.isDebugEnabled())
+				log.debug("stale mapping in cache?", e);
+			// Don't need to flush the cache; this happens when cleaning anyway
+			run = null;
+		}
+		if (run == null)
+			run = dao.get(uuid);
+		return run;
+	}
+
+	@Override
+	public TavernaRun getRun(UsernamePrincipal user, Policy p, String uuid)
+			throws UnknownRunException {
+		// Check first to see if the 'uuid' actually looks like a UUID; if
+		// not, throw it out immediately without logging an exception.
+		try {
+			UUID.fromString(uuid);
+		} catch (IllegalArgumentException e) {
+			if (log.isDebugEnabled())
+				log.debug("run ID does not look like UUID; rejecting...");
+			throw new UnknownRunException();
+		}
+		TavernaRun run = get(uuid);
+		if (run != null && (user == null || p.permitAccess(user, run)))
+			return run;
+		throw new UnknownRunException();
+	}
+
+	@Override
+	public TavernaRun getRun(String uuid) throws UnknownRunException {
+		TavernaRun run = get(uuid);
+		if (run != null)
+			return run;
+		throw new UnknownRunException();
+	}
+
+	@Override
+	public Map<String, TavernaRun> listRuns(UsernamePrincipal user, Policy p) {
+		synchronized (cache) {
+			Map<String, TavernaRun> cached = new HashMap<>();
+			for (Entry<String, TavernaRun> e : cache.entrySet()) {
+				TavernaRun r = e.getValue();
+				if (p.permitAccess(user, r))
+					cached.put(e.getKey(), r);
+			}
+			if (!cached.isEmpty())
+				return cached;
+		}
+		return dao.listRuns(user, p);
+	}
+
+	private void logLength(String message, Object obj) {
+		if (!log.isDebugEnabled())
+			return;
+		try {
+			ByteArrayOutputStream baos = new ByteArrayOutputStream();
+			try (ObjectOutputStream oos = new ObjectOutputStream(baos)) {
+				oos.writeObject(obj);
+			}
+			log.debug(message + ": " + baos.size());
+		} catch (Exception e) {
+			log.warn("oops", e);
+		}
+	}
+
+	@Override
+	public String registerRun(TavernaRun run) {
+		if (!(run instanceof RemoteRunDelegate))
+			throw new IllegalArgumentException(
+					"run must be created by localworker package");
+		RemoteRunDelegate rrd = (RemoteRunDelegate) run;
+		if (rrd.id == null)
+			rrd.id = randomUUID().toString();
+		logLength("RemoteRunDelegate serialized length", rrd);
+		try {
+			dao.persistRun(rrd);
+		} catch (IOException e) {
+			throw new RuntimeException(
+					"unexpected problem when persisting run record in database",
+					e);
+		}
+		synchronized (cache) {
+			cache.put(rrd.getId(), run);
+		}
+		return rrd.getId();
+	}
+
+	@Override
+	public void unregisterRun(String uuid) {
+		try {
+			if (dao.unpersistRun(uuid))
+				synchronized (cache) {
+					cache.remove(uuid);
+				}
+		} catch (RuntimeException e) {
+			if (log.isDebugEnabled())
+				log.debug("problem persisting the deletion of the run " + uuid,
+						e);
+		}
+	}
+
+	/**
+	 * Process the event that a run has finished.
+	 * 
+	 * @param name
+	 *            The name of the run.
+	 * @param io
+	 *            The io listener of the run (used to get information about the
+	 *            run).
+	 * @param run
+	 *            The handle to the run.
+	 * @throws Exception
+	 *             If anything goes wrong.
+	 */
+	private void notifyFinished(final String name, Listener io,
+			final RemoteRunDelegate run) throws Exception {
+		String to = io.getProperty("notificationAddress");
+		final int code;
+		try {
+			code = parseInt(io.getProperty("exitcode"));
+		} catch (NumberFormatException nfe) {
+			// Ignore; not much we can do here...
+			return;
+		}
+
+		notificationEngine.dispatchMessage(run, to, new Message() {
+			private CompletionNotifier getNotifier(String type) {
+				CompletionNotifier n = typedNotifiers.get(type);
+				if (n == null)
+					n = backupNotifier;
+				return n;
+			}
+
+			@Override
+			public String getContent(String type) {
+				return getNotifier(type).makeCompletionMessage(name, run, code);
+			}
+
+			@Override
+			public String getTitle(String type) {
+				return getNotifier(type).makeMessageSubject(name, run, code);
+			}
+		});
+	}
+
+	@Override
+	public FactoryBean getFactory() {
+		return factory;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-taverna-server/blob/00397eff/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/worker/RunDatabaseDAO.java
----------------------------------------------------------------------
diff --git a/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/worker/RunDatabaseDAO.java b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/worker/RunDatabaseDAO.java
new file mode 100644
index 0000000..1c75d22
--- /dev/null
+++ b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/worker/RunDatabaseDAO.java
@@ -0,0 +1,323 @@
+/*
+ */
+package org.taverna.server.master.worker;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import static org.taverna.server.master.worker.RunConnection.toDBform;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import javax.jdo.annotations.PersistenceAware;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.springframework.beans.factory.annotation.Required;
+import org.taverna.server.master.interfaces.Policy;
+import org.taverna.server.master.interfaces.TavernaRun;
+import org.taverna.server.master.utils.CallTimeLogger.PerfLogged;
+import org.taverna.server.master.utils.JDOSupport;
+import org.taverna.server.master.utils.UsernamePrincipal;
+
+/**
+ * This handles storing runs, interfacing with the underlying state engine as
+ * necessary.
+ * 
+ * @author Donal Fellows
+ */
+@PersistenceAware
+public class RunDatabaseDAO extends JDOSupport<RunConnection> {
+	public RunDatabaseDAO() {
+		super(RunConnection.class);
+	}
+
+	private Log log = LogFactory.getLog("Taverna.Server.Worker.RunDB");
+	private RunDatabase facade;
+
+	@Required
+	public void setFacade(RunDatabase facade) {
+		this.facade = facade;
+	}
+
+	// -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
+
+	@SuppressWarnings("unchecked")
+	private List<String> names() {
+		if (log.isDebugEnabled())
+			log.debug("fetching all run names");
+		return (List<String>) namedQuery("names").execute();
+	}
+
+	/**
+	 * @return The number of workflow runs in the database.
+	 */
+	@WithinSingleTransaction
+	public int countRuns() {
+		if (log.isDebugEnabled())
+			log.debug("counting the number of runs");
+		return count();
+	}
+
+	private Integer count() {
+		return (Integer) namedQuery("count").execute();
+	}
+
+	@SuppressWarnings("unchecked")
+	private List<String> timedout() {
+		return (List<String>) namedQuery("timedout").execute();
+	}
+
+	@SuppressWarnings("unchecked")
+	private List<String> unterminated() {
+		return (List<String>) namedQuery("unterminated").execute();
+	}
+
+	@Nullable
+	private RunConnection pickRun(@Nonnull String name) {
+		if (log.isDebugEnabled())
+			log.debug("fetching the run called " + name);
+		try {
+			RunConnection rc = getById(name);
+			if (rc == null)
+				log.warn("no result for " + name);
+			return rc;
+		} catch (RuntimeException e) {
+			log.warn("problem in fetch", e);
+			throw e;
+		}
+	}
+
+	@Nullable
+	@WithinSingleTransaction
+	public String getSecurityToken(@Nonnull String name) {
+		RunConnection rc = getById(name);
+		if (rc == null)
+			return null;
+		return rc.getSecurityToken();
+	}
+
+	private void persist(@Nonnull RemoteRunDelegate rrd) throws IOException {
+		persist(toDBform(rrd));
+	}
+
+	@Nonnull
+	private List<RunConnection> allRuns() {
+		try {
+			List<RunConnection> rcs = new ArrayList<>();
+			List<String> names = names();
+			for (String id : names) {
+				try {
+					if (id != null)
+						rcs.add(pickRun(id));
+				} catch (RuntimeException e) {
+					continue;
+				}
+			}
+			return rcs;
+		} catch (RuntimeException e) {
+			log.warn("problem in fetch", e);
+			throw e;
+		}
+	}
+
+	// -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
+
+	/**
+	 * Obtain a workflow run handle.
+	 * 
+	 * @param name
+	 *            The identifier of the run.
+	 * @return The run handle, or <tt>null</tt> if there is no such run.
+	 */
+	@Nullable
+	@WithinSingleTransaction
+	public TavernaRun get(String name) {
+		try {
+			RunConnection rc = pickRun(name);
+			return (rc == null) ? null : rc.fromDBform(facade);
+		} catch (Exception e) {
+			return null;
+		}
+	}
+
+	/**
+	 * Get the runs that a user can read things from.
+	 * 
+	 * @param user
+	 *            Who is asking?
+	 * @param p
+	 *            The policy that determines what they can see.
+	 * @return A mapping from run IDs to run handles.
+	 */
+	@Nonnull
+	@WithinSingleTransaction
+	public Map<String, TavernaRun> listRuns(UsernamePrincipal user, Policy p) {
+		Map<String, TavernaRun> result = new HashMap<>();
+		for (String id : names())
+			try {
+				RemoteRunDelegate rrd = pickRun(id).fromDBform(facade);
+				if (p.permitAccess(user, rrd))
+					result.put(id, rrd);
+			} catch (Exception e) {
+				continue;
+			}
+		return result;
+	}
+
+	/**
+	 * @return A list of the IDs for all workflow runs.
+	 */
+	@Nonnull
+	@WithinSingleTransaction
+	public List<String> listRunNames() {
+		List<String> runNames = new ArrayList<>();
+		for (RunConnection rc : allRuns())
+			if (rc.getId() != null)
+				runNames.add(rc.getId());
+		return runNames;
+	}
+
+	/**
+	 * @return An arbitrary, representative workflow run.
+	 * @throws Exception
+	 *             If anything goes wrong.
+	 */
+	@Nullable
+	@WithinSingleTransaction
+	public RemoteRunDelegate pickArbitraryRun() throws Exception {
+		for (RunConnection rc : allRuns()) {
+			if (rc.getId() == null)
+				continue;
+			return rc.fromDBform(facade);
+		}
+		return null;
+	}
+
+	/**
+	 * Make a workflow run persistent. Must only be called once per workflow
+	 * run.
+	 * 
+	 * @param rrd
+	 *            The workflow run to persist.
+	 * @throws IOException
+	 *             If anything goes wrong with serialisation of the run.
+	 */
+	@WithinSingleTransaction
+	public void persistRun(@Nonnull RemoteRunDelegate rrd) throws IOException {
+		persist(rrd);
+	}
+
+	/**
+	 * Stop a workflow run from being persistent.
+	 * 
+	 * @param name
+	 *            The ID of the run.
+	 * @return Whether a deletion happened.
+	 */
+	@WithinSingleTransaction
+	public boolean unpersistRun(String name) {
+		RunConnection rc = pickRun(name);
+		if (rc != null)
+			delete(rc);
+		return rc != null;
+	}
+
+	/**
+	 * Ensure that the given workflow run is synchronized with the database.
+	 * 
+	 * @param run
+	 *            The run to synchronise.
+	 * @throws IOException
+	 *             If serialization of anything fails.
+	 */
+	@WithinSingleTransaction
+	public void flushToDisk(@Nonnull RemoteRunDelegate run) throws IOException {
+		getById(run.id).makeChanges(run);
+	}
+
+	/**
+	 * Remove all workflow runs that have expired.
+	 * 
+	 * @return The ids of the deleted runs.
+	 */
+	@Nonnull
+	@PerfLogged
+	@WithinSingleTransaction
+	public List<String> doClean() {
+		if (log.isDebugEnabled())
+			log.debug("deleting runs that timed out before " + new Date());
+		List<String> toDelete = timedout();
+		if (log.isDebugEnabled())
+			log.debug("found " + toDelete.size() + " runs to delete");
+		for (String id : toDelete) {
+			RunConnection rc = getById(id);
+			try {
+				rc.fromDBform(facade).run.destroy();
+			} catch (Exception e) {
+				if (log.isDebugEnabled())
+					log.debug("failed to delete execution resource for " + id,
+							e);
+			}
+			delete(rc);
+		}
+		return toDelete;
+	}
+
+	/**
+	 * @return A list of workflow runs that are candidates for doing
+	 *         notification of termination.
+	 */
+	@Nonnull
+	@PerfLogged
+	@WithinSingleTransaction
+	public List<RemoteRunDelegate> getPotentiallyNotifiable() {
+		List<RemoteRunDelegate> toNotify = new ArrayList<>();
+		for (String id : unterminated())
+			try {
+				RunConnection rc = getById(id);
+				toNotify.add(rc.fromDBform(facade));
+			} catch (Exception e) {
+				log.warn("failed to fetch connection token"
+						+ "for notification of completion check", e);
+			}
+		return toNotify;
+	}
+
+	@PerfLogged
+	@WithinSingleTransaction
+	public void markFinished(@Nonnull Set<String> terminated) {
+		for (String id : terminated) {
+			RunConnection rc = getById(id);
+			if (rc == null)
+				continue;
+			try {
+				rc.fromDBform(facade).doneTransitionToFinished = true;
+				rc.setFinished(true);
+			} catch (Exception e) {
+				log.warn("failed to note termination", e);
+			}
+		}
+	}
+}