You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@taverna.apache.org by st...@apache.org on 2018/01/09 23:30:37 UTC
[21/42] incubator-taverna-server git commit: package org.taverna ->
org.apache.taverna
http://git-wip-us.apache.org/repos/asf/incubator-taverna-server/blob/00397eff/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/worker/CompletionNotifier.java
----------------------------------------------------------------------
diff --git a/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/worker/CompletionNotifier.java b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/worker/CompletionNotifier.java
new file mode 100644
index 0000000..1868f94
--- /dev/null
+++ b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/worker/CompletionNotifier.java
@@ -0,0 +1,58 @@
+/*
+ */
+package org.taverna.server.master.worker;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+/**
+ * How to convert a notification about the completion of a job into a message.
+ * An implementation supplies both the subject and the plain-text body of the
+ * message, and identifies itself by name.
+ *
+ * @author Donal Fellows
+ */
+public interface CompletionNotifier {
+ /**
+ * Get the name by which this notifier is identified.
+ *
+ * @return The name of this notifier.
+ */
+ String getName();
+
+ /**
+ * Called to get the content of a message that a workflow run has finished.
+ *
+ * @param name
+ * The name of the run.
+ * @param run
+ * The run that has finished.
+ * @param code
+ * The exit code that the run finished with.
+ * @return The plain-text content of the message.
+ */
+ String makeCompletionMessage(String name, RemoteRunDelegate run, int code);
+
+ /**
+ * Called to get the subject of the message to dispatch.
+ *
+ * @param name
+ * The name of the run.
+ * @param run
+ * The run that has finished.
+ * @param code
+ * The exit code that the run finished with.
+ * @return The plain-text subject of the message.
+ */
+ String makeMessageSubject(String name, RemoteRunDelegate run, int code);
+}
http://git-wip-us.apache.org/repos/asf/incubator-taverna-server/blob/00397eff/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/worker/FactoryBean.java
----------------------------------------------------------------------
diff --git a/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/worker/FactoryBean.java b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/worker/FactoryBean.java
new file mode 100644
index 0000000..d38f0cc
--- /dev/null
+++ b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/worker/FactoryBean.java
@@ -0,0 +1,39 @@
+/*
+ */
+package org.taverna.server.master.worker;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.taverna.server.master.notification.atom.EventDAO;
+
+/**
+ * What the remote run really needs of its factory: a way to ask whether runs
+ * may currently be started, and access to the master Atom event feed.
+ *
+ * @author Donal Fellows
+ */
+public interface FactoryBean {
+ /**
+ * Test whether starting a run is currently permitted.
+ *
+ * @return Whether a run can actually be started at this time.
+ */
+ boolean isAllowingRunsToStart();
+
+ /**
+ * Get the server-wide event feed.
+ *
+ * @return a handle to the master Atom event feed (<i>not</i> the per-run
+ * feed)
+ */
+ EventDAO getMasterEventFeed();
+}
http://git-wip-us.apache.org/repos/asf/incubator-taverna-server/blob/00397eff/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/worker/PasswordIssuer.java
----------------------------------------------------------------------
diff --git a/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/worker/PasswordIssuer.java b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/worker/PasswordIssuer.java
new file mode 100644
index 0000000..649db64
--- /dev/null
+++ b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/worker/PasswordIssuer.java
@@ -0,0 +1,73 @@
+package org.taverna.server.master.worker;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.security.NoSuchAlgorithmException;
+import java.security.SecureRandom;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * A simple password issuing bean. Each password is a fixed number of symbols
+ * drawn independently from a built-in alphabet of letters, digits and
+ * punctuation, using a {@link SecureRandom} as the source of randomness.
+ *
+ * @author Donal Fellows
+ */
+public class PasswordIssuer {
+    /** Length of issued passwords until {@link #setLength(int)} is called. */
+    private static final int DEFAULT_LENGTH = 8;
+
+    /** The symbols that may appear in an issued password. */
+    private static final char[] ALPHABET = ("abcdefghijklmnopqrstuvwxyz"
+            + "ABCDEFGHIJKLMNOPQRSTUVWXYZ" + "1234567890" + "!@#$%^&*()"
+            + ",.<>/?:;-_+[]" + "{}`~").toCharArray();
+
+    private final Log log = LogFactory.getLog("Taverna.Server.Worker");
+    private final SecureRandom r;
+    private int length;
+
+    /**
+     * Create an issuer that uses the platform's default secure random number
+     * generator.
+     */
+    public PasswordIssuer() {
+        r = new SecureRandom();
+        log.info("constructing passwords with " + r.getAlgorithm());
+        applyLength(DEFAULT_LENGTH);
+    }
+
+    /**
+     * Create an issuer that uses a named secure random number generator.
+     *
+     * @param algorithm
+     *            The name of the algorithm, as understood by
+     *            {@link SecureRandom#getInstance(String)}.
+     * @throws NoSuchAlgorithmException
+     *             If no provider supports the named algorithm.
+     */
+    public PasswordIssuer(String algorithm) throws NoSuchAlgorithmException {
+        r = SecureRandom.getInstance(algorithm);
+        log.info("constructing passwords with " + r.getAlgorithm());
+        applyLength(DEFAULT_LENGTH);
+    }
+
+    /**
+     * Set how many symbols each issued password contains.
+     *
+     * @param length
+     *            The number of symbols; must be at least 1.
+     * @throws IllegalArgumentException
+     *             If the length is zero or negative, which would otherwise
+     *             make {@link #issue()} hand out empty passwords.
+     */
+    public void setLength(int length) {
+        applyLength(length);
+    }
+
+    /**
+     * Validates and records the password length. Kept private so that the
+     * constructors never call an overridable method.
+     */
+    private void applyLength(int newLength) {
+        if (newLength < 1)
+            throw new IllegalArgumentException(
+                    "password length must be positive, got " + newLength);
+        this.length = newLength;
+        log.info("issued password will be " + this.length
+                + " symbols chosen from " + ALPHABET.length);
+    }
+
+    /**
+     * Issue a password.
+     *
+     * @return The new password.
+     */
+    public String issue() {
+        StringBuilder sb = new StringBuilder(length);
+        for (int i = 0; i < length; i++)
+            sb.append(ALPHABET[r.nextInt(ALPHABET.length)]);
+        // Only the length is logged, never the password itself.
+        log.info("issued new password of length " + sb.length());
+        return sb.toString();
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-taverna-server/blob/00397eff/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/worker/PolicyImpl.java
----------------------------------------------------------------------
diff --git a/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/worker/PolicyImpl.java b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/worker/PolicyImpl.java
new file mode 100644
index 0000000..37d5760
--- /dev/null
+++ b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/worker/PolicyImpl.java
@@ -0,0 +1,171 @@
+/*
+ */
+package org.taverna.server.master.worker;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import static org.taverna.server.master.identity.WorkflowInternalAuthProvider.PREFIX;
+
+import java.net.URI;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.springframework.beans.factory.annotation.Required;
+import org.springframework.security.core.Authentication;
+import org.springframework.security.core.GrantedAuthority;
+import org.springframework.security.core.context.SecurityContextHolder;
+import org.taverna.server.master.common.Roles;
+import org.taverna.server.master.common.Workflow;
+import org.taverna.server.master.exceptions.NoCreateException;
+import org.taverna.server.master.exceptions.NoDestroyException;
+import org.taverna.server.master.exceptions.NoUpdateException;
+import org.taverna.server.master.interfaces.Policy;
+import org.taverna.server.master.interfaces.TavernaRun;
+import org.taverna.server.master.interfaces.TavernaSecurityContext;
+import org.taverna.server.master.utils.UsernamePrincipal;
+
+/**
+ * Basic policy implementation that allows any workflow to be instantiated by
+ * any user, but which does not permit users to access each other's workflow
+ * runs. It also imposes a global limit on the number of workflow runs at once.
+ * <p>
+ * Anonymous callers (a <code>null</code> user) are refused by every
+ * permission check in this class.
+ *
+ * @author Donal Fellows
+ */
+class PolicyImpl implements Policy {
+    /** Message reported whenever an update is refused. */
+    private static final String UPDATE_REFUSED = "workflow run not owned by you and you're not granted access";
+
+    Log log = LogFactory.getLog("Taverna.Server.Worker.Policy");
+    private PolicyLimits limits;
+    private RunDBSupport runDB;
+
+    /**
+     * @param limits
+     *            Where the numeric limits and the permitted workflow list are
+     *            delegated to.
+     */
+    @Required
+    public void setLimits(PolicyLimits limits) {
+        this.limits = limits;
+    }
+
+    /**
+     * @param runDB
+     *            The run database, used to count the existing runs.
+     */
+    @Required
+    public void setRunDB(RunDBSupport runDB) {
+        this.runDB = runDB;
+    }
+
+    @Override
+    public int getMaxRuns() {
+        return limits.getMaxRuns();
+    }
+
+    /**
+     * This implementation does not compute a per-user value.
+     *
+     * @return Always <code>null</code>.
+     */
+    @Override
+    public Integer getMaxRuns(UsernamePrincipal user) {
+        return null;
+    }
+
+    @Override
+    public int getOperatingLimit() {
+        return limits.getOperatingLimit();
+    }
+
+    /** The same list is reported for every user. */
+    @Override
+    public List<URI> listPermittedWorkflowURIs(UsernamePrincipal user) {
+        return limits.getPermittedWorkflowURIs();
+    }
+
+    /**
+     * Test whether the current caller is the given workflow run acting on
+     * itself: it must hold the {@link Roles#SELF} authority together with an
+     * authority made of {@code PREFIX} followed by the run's ID.
+     *
+     * @param runId
+     *            The ID of the run being accessed.
+     * @return Whether this is a self-access; <code>false</code> when there is
+     *         no authentication in the current security context.
+     */
+    private boolean isSelfAccess(String runId) {
+        Authentication auth = SecurityContextHolder.getContext()
+                .getAuthentication();
+        // No authentication information at all means this cannot be a
+        // workflow talking to itself.
+        if (auth == null)
+            return false;
+        boolean self = false;
+        String id = null;
+        for (GrantedAuthority a : auth.getAuthorities()) {
+            String aa = a.getAuthority();
+            if (aa.equals(Roles.SELF)) {
+                self = true;
+                continue;
+            }
+            if (!aa.startsWith(PREFIX))
+                continue;
+            id = aa.substring(PREFIX.length());
+        }
+        return self && runId.equals(id);
+    }
+
+    /**
+     * Null-safe test of whether the named user owns the run that the given
+     * security context belongs to.
+     */
+    private static boolean isOwner(TavernaSecurityContext context,
+            String username) {
+        return context.getOwner() != null
+                && context.getOwner().getName().equals(username);
+    }
+
+    /**
+     * Access is granted to the owner of the run, to the run itself, and to
+     * anyone listed among the run's permitted readers.
+     */
+    @Override
+    public boolean permitAccess(UsernamePrincipal user, TavernaRun run) {
+        // Anonymous callers are refused, matching the other checks here.
+        if (user == null)
+            return false;
+        String username = user.getName();
+        TavernaSecurityContext context = run.getSecurityContext();
+        if (isOwner(context, username)) {
+            if (log.isDebugEnabled())
+                log.debug("granted access by " + user.getName() + " to "
+                        + run.getId());
+            return true;
+        }
+        if (isSelfAccess(run.getId())) {
+            if (log.isDebugEnabled())
+                log.debug("access by workflow to itself: " + run.getId());
+            return true;
+        }
+        if (log.isDebugEnabled())
+            log.debug("considering access by " + user.getName() + " to "
+                    + run.getId());
+        return context.getPermittedReaders().contains(username);
+    }
+
+    /**
+     * Creation is refused for anonymous callers and when the number of
+     * existing runs has reached {@link #getMaxRuns()}.
+     */
+    @Override
+    public void permitCreate(UsernamePrincipal user, Workflow workflow)
+            throws NoCreateException {
+        if (user == null)
+            throw new NoCreateException(
+                    "anonymous workflow creation not allowed");
+        if (runDB.countRuns() >= getMaxRuns())
+            throw new NoCreateException("server load exceeded; please wait");
+    }
+
+    /**
+     * Destruction is allowed for the owner, for anyone listed among the
+     * permitted destroyers, and for any non-anonymous caller when the run has
+     * no owner recorded.
+     */
+    @Override
+    public synchronized void permitDestroy(UsernamePrincipal user, TavernaRun run)
+            throws NoDestroyException {
+        if (user == null)
+            throw new NoDestroyException();
+        String username = user.getName();
+        TavernaSecurityContext context = run.getSecurityContext();
+        if (context.getOwner() == null || isOwner(context, username))
+            return;
+        if (!context.getPermittedDestroyers().contains(username))
+            throw new NoDestroyException();
+    }
+
+    /**
+     * Updating is allowed for the owner of the run, for the run itself, and
+     * for anyone listed among the run's permitted updaters.
+     */
+    @Override
+    public void permitUpdate(UsernamePrincipal user, TavernaRun run)
+            throws NoUpdateException {
+        if (user == null)
+            throw new NoUpdateException(UPDATE_REFUSED);
+        TavernaSecurityContext context = run.getSecurityContext();
+        if (isOwner(context, user.getName()))
+            return;
+        if (isSelfAccess(run.getId())) {
+            if (log.isDebugEnabled())
+                log.debug("update access by workflow to itself: " + run.getId());
+            return;
+        }
+        if (!context.getPermittedUpdaters().contains(user.getName()))
+            throw new NoUpdateException(UPDATE_REFUSED);
+    }
+
+    /** The list is shared by all users; the user argument is not consulted. */
+    @Override
+    public void setPermittedWorkflowURIs(UsernamePrincipal user,
+            List<URI> permitted) {
+        limits.setPermittedWorkflowURIs(permitted);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-taverna-server/blob/00397eff/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/worker/PolicyLimits.java
----------------------------------------------------------------------
diff --git a/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/worker/PolicyLimits.java b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/worker/PolicyLimits.java
new file mode 100644
index 0000000..43c0aa4
--- /dev/null
+++ b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/worker/PolicyLimits.java
@@ -0,0 +1,56 @@
+/*
+ */
+package org.taverna.server.master.worker;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.net.URI;
+import java.util.List;
+
+import org.taverna.server.master.common.Status;
+
+/**
+ * The worker policy delegates certain limits to the state model of the
+ * particular worker: the two run-count limits and the list of workflows that
+ * runs may be created from.
+ *
+ * @author Donal Fellows
+ */
+public interface PolicyLimits {
+ /**
+ * Get the overall cap on the number of runs.
+ *
+ * @return the maximum number of extant workflow runs in any state
+ */
+ int getMaxRuns();
+
+ /**
+ * Get the cap on the number of runs executing at once.
+ *
+ * @return the maximum number of workflow runs in the
+ * {@linkplain Status#Operating operating} state.
+ */
+ int getOperatingLimit();
+
+ /**
+ * Get the workflows that runs may be created from.
+ *
+ * @return the list of URIs to workflows that may be used to create workflow
+ * runs. If empty or {@code null}, no restriction is present.
+ */
+ List<URI> getPermittedWorkflowURIs();
+
+ /**
+ * Set the workflows that runs may be created from.
+ *
+ * @param permitted
+ * the list of URIs to workflows that may be used to create
+ * workflow runs.
+ */
+ void setPermittedWorkflowURIs(List<URI> permitted);
+}
http://git-wip-us.apache.org/repos/asf/incubator-taverna-server/blob/00397eff/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/worker/RemoteRunDelegate.java
----------------------------------------------------------------------
diff --git a/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/worker/RemoteRunDelegate.java b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/worker/RemoteRunDelegate.java
new file mode 100644
index 0000000..fb1ac47
--- /dev/null
+++ b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/worker/RemoteRunDelegate.java
@@ -0,0 +1,980 @@
+/*
+ */
+package org.taverna.server.master.worker;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import static java.lang.System.currentTimeMillis;
+import static java.util.Calendar.MINUTE;
+import static java.util.Collections.sort;
+import static java.util.Collections.unmodifiableSet;
+import static java.util.UUID.randomUUID;
+import static org.apache.commons.io.IOUtils.closeQuietly;
+import static org.apache.commons.logging.LogFactory.getLog;
+import static org.taverna.server.master.worker.RemoteRunDelegate.checkBadFilename;
+import static org.taverna.server.master.worker.RunConnection.NAME_LENGTH;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.PipedOutputStream;
+import java.rmi.MarshalledObject;
+import java.rmi.RemoteException;
+import java.security.GeneralSecurityException;
+import java.security.Principal;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Calendar;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipOutputStream;
+
+import javax.annotation.Nonnull;
+
+import org.apache.commons.logging.Log;
+import org.taverna.server.localworker.remote.IllegalStateTransitionException;
+import org.taverna.server.localworker.remote.ImplementationException;
+import org.taverna.server.localworker.remote.RemoteDirectory;
+import org.taverna.server.localworker.remote.RemoteDirectoryEntry;
+import org.taverna.server.localworker.remote.RemoteFile;
+import org.taverna.server.localworker.remote.RemoteInput;
+import org.taverna.server.localworker.remote.RemoteListener;
+import org.taverna.server.localworker.remote.RemoteSingleRun;
+import org.taverna.server.localworker.remote.RemoteStatus;
+import org.taverna.server.localworker.remote.StillWorkingOnItException;
+import org.taverna.server.master.common.Status;
+import org.taverna.server.master.common.Workflow;
+import org.taverna.server.master.exceptions.BadPropertyValueException;
+import org.taverna.server.master.exceptions.BadStateChangeException;
+import org.taverna.server.master.exceptions.FilesystemAccessException;
+import org.taverna.server.master.exceptions.NoListenerException;
+import org.taverna.server.master.exceptions.OverloadedException;
+import org.taverna.server.master.exceptions.UnknownRunException;
+import org.taverna.server.master.interfaces.Directory;
+import org.taverna.server.master.interfaces.DirectoryEntry;
+import org.taverna.server.master.interfaces.File;
+import org.taverna.server.master.interfaces.Input;
+import org.taverna.server.master.interfaces.Listener;
+import org.taverna.server.master.interfaces.SecurityContextFactory;
+import org.taverna.server.master.interfaces.TavernaRun;
+import org.taverna.server.master.interfaces.TavernaSecurityContext;
+import org.taverna.server.master.utils.UsernamePrincipal;
+
+/**
+ * Bridging shim between the WebApp world and the RMI world.
+ *
+ * @author Donal Fellows
+ */
+@SuppressWarnings("serial")
+public class RemoteRunDelegate implements TavernaRun {
+ private transient Log log = getLog("Taverna.Server.Worker");
+ transient TavernaSecurityContext secContext;
+ Date creationInstant;
+ Workflow workflow;
+ Date expiry;
+ HashSet<String> readers;
+ HashSet<String> writers;
+ HashSet<String> destroyers;
+ transient String id;
+ transient RemoteSingleRun run;
+ transient RunDBSupport db;
+ transient FactoryBean factory;
+ boolean doneTransitionToFinished;
+ boolean generateProvenance;// FIXME expose
+ String name;
+ private static final String ELLIPSIS = "...";
+
+ /**
+  * Wrap a remote run handle as a local run object.
+  *
+  * @param creationInstant
+  *            When the run was created; also appended to the generated name.
+  * @param workflow
+  *            The workflow being run; its name seeds the generated run name.
+  * @param rsr
+  *            The remote run handle; must not be <code>null</code>.
+  * @param defaultLifetime
+  *            How many minutes from now the run should expire.
+  * @param db
+  *            The run database support object.
+  * @param id
+  *            The ID for the run, or <code>null</code> to leave it unset.
+  * @param generateProvenance
+  *            Whether provenance generation is to be requested.
+  * @param factory
+  *            The factory that this run consults.
+  * @throws IllegalArgumentException
+  *             If the remote run handle is <code>null</code>.
+  */
+ public RemoteRunDelegate(Date creationInstant, Workflow workflow,
+         RemoteSingleRun rsr, int defaultLifetime, RunDBSupport db, UUID id,
+         boolean generateProvenance, FactoryBean factory) {
+     if (rsr == null)
+         throw new IllegalArgumentException("remote run must not be null");
+
+     this.creationInstant = creationInstant;
+     this.workflow = workflow;
+     this.run = rsr;
+     this.db = db;
+     this.factory = factory;
+     this.generateProvenance = generateProvenance;
+
+     // The expiry is measured from the current time.
+     Calendar deadline = Calendar.getInstance();
+     deadline.add(MINUTE, defaultLifetime);
+     this.expiry = deadline.getTime();
+
+     // The default name is "<workflow name> <creation time>", with the
+     // workflow name shortened (and marked with an ellipsis) when needed
+     // to fit within NAME_LENGTH.
+     String label = "";
+     try {
+         String suffix = " " + creationInstant;
+         String base = workflow.getName();
+         int room = NAME_LENGTH - suffix.length();
+         if (base.length() > room)
+             base = base.substring(0, room - ELLIPSIS.length()) + ELLIPSIS;
+         label = base + suffix;
+     } catch (Exception e) {
+         // Ignore; it's just a name, not something important.
+     }
+     this.name = label;
+
+     if (id != null)
+         this.id = id.toString();
+ }
+
+ RemoteRunDelegate() {
+ }
+
+ /**
+ * Get the types of listener supported by this run.
+ *
+ * @return A list of listener type names.
+ * @throws RemoteException
+ * If anything goes wrong.
+ */
+ public List<String> getListenerTypes() throws RemoteException {
+ return run.getListenerTypes();
+ }
+
+ @Override
+ public void addListener(Listener listener) {
+ if (listener instanceof ListenerDelegate)
+ try {
+ run.addListener(((ListenerDelegate) listener).getRemote());
+ } catch (RemoteException e) {
+ log.warn("communication problem adding listener", e);
+ } catch (ImplementationException e) {
+ log.warn("implementation problem adding listener", e);
+ }
+ else
+ log.fatal("bad listener " + listener.getClass()
+ + "; not applicable remotely!");
+ }
+
+ @Override
+ public String getId() {
+ if (id == null)
+ id = randomUUID().toString();
+ return id;
+ }
+
+ /**
+ * Attach a listener to a workflow run and return its local delegate.
+ *
+ * @param type
+ * The type of listener to create.
+ * @param config
+ * The configuration of the listener.
+ * @return The local delegate of the listener.
+ * @throws NoListenerException
+ * If anything goes wrong.
+ */
+ public Listener makeListener(String type, String config)
+ throws NoListenerException {
+ try {
+ return new ListenerDelegate(run.makeListener(type, config));
+ } catch (RemoteException e) {
+ throw new NoListenerException("failed to make listener", e);
+ }
+ }
+
+ @Override
+ public void destroy() {
+ try {
+ run.destroy();
+ } catch (RemoteException | ImplementationException e) {
+ log.warn("failed to destroy run", e);
+ }
+ }
+
+ @Override
+ public Date getExpiry() {
+ return new Date(expiry.getTime());
+ }
+
+ @Override
+ public List<Listener> getListeners() {
+ List<Listener> listeners = new ArrayList<>();
+ try {
+ for (RemoteListener rl : run.getListeners())
+ listeners.add(new ListenerDelegate(rl));
+ } catch (RemoteException e) {
+ log.warn("failed to get listeners", e);
+ }
+ return listeners;
+ }
+
+ @Override
+ public TavernaSecurityContext getSecurityContext() {
+ return secContext;
+ }
+
+ @Override
+ public Status getStatus() {
+ try {
+ switch (run.getStatus()) {
+ case Initialized:
+ return Status.Initialized;
+ case Operating:
+ return Status.Operating;
+ case Stopped:
+ return Status.Stopped;
+ case Finished:
+ return Status.Finished;
+ }
+ } catch (RemoteException e) {
+ log.warn("problem getting remote status", e);
+ }
+ return Status.Finished;
+ }
+
+ @Override
+ public Workflow getWorkflow() {
+ return workflow;
+ }
+
+ @Override
+ public Directory getWorkingDirectory() throws FilesystemAccessException {
+ try {
+ return new DirectoryDelegate(run.getWorkingDirectory());
+ } catch (Throwable e) {
+ if (e.getCause() != null)
+ e = e.getCause();
+ throw new FilesystemAccessException(
+ "problem getting main working directory handle", e);
+ }
+ }
+
+ @Override
+ public void setExpiry(Date d) {
+ if (d.after(new Date()))
+ expiry = new Date(d.getTime());
+ db.flushToDisk(this);
+ }
+
+ /**
+  * Ask the remote run to change state. When moving a run out of the
+  * initialized state into the operating state, the factory is first asked
+  * whether runs may start and the security settings are conveyed to the
+  * remote run; once operating, a "started" event is posted to the master
+  * event feed.
+  *
+  * @param s
+  *            The state to move to.
+  * @return <code>null</code> if the change completed, or a message
+  *         describing the situation if the remote side reported that it is
+  *         still working on the change.
+  * @throws BadStateChangeException
+  *             If the change is not permitted, fails, or the calling thread
+  *             is interrupted while it is in progress.
+  */
+ @Override
+ public String setStatus(Status s) throws BadStateChangeException {
+     try {
+         log.info("setting status of run " + id + " to " + s);
+         switch (s) {
+         case Initialized:
+             run.setStatus(RemoteStatus.Initialized);
+             break;
+         case Operating:
+             if (run.getStatus() == RemoteStatus.Initialized) {
+                 if (!factory.isAllowingRunsToStart())
+                     throw new OverloadedException();
+                 secContext.conveySecurity();
+             }
+             run.setGenerateProvenance(generateProvenance);
+             run.setStatus(RemoteStatus.Operating);
+             factory.getMasterEventFeed()
+                     .started(
+                             this,
+                             "started run execution",
+                             "The execution of run '" + getName()
+                                     + "' has started.");
+             break;
+         case Stopped:
+             run.setStatus(RemoteStatus.Stopped);
+             break;
+         case Finished:
+             run.setStatus(RemoteStatus.Finished);
+             break;
+         }
+         return null;
+     } catch (IllegalStateTransitionException e) {
+         throw new BadStateChangeException(e.getMessage());
+     } catch (RemoteException e) {
+         throw new BadStateChangeException(e.getMessage(), e.getCause());
+     } catch (GeneralSecurityException | IOException e) {
+         throw new BadStateChangeException(e.getMessage(), e);
+     } catch (ImplementationException e) {
+         if (e.getCause() != null)
+             throw new BadStateChangeException(e.getMessage(), e.getCause());
+         throw new BadStateChangeException(e.getMessage(), e);
+     } catch (StillWorkingOnItException e) {
+         log.info("still working on setting status of run " + id + " to "
+                 + s, e);
+         return e.getMessage();
+     } catch (InterruptedException e) {
+         // Restore the interrupt flag so that code further up the stack
+         // can still see that this thread was asked to stop.
+         Thread.currentThread().interrupt();
+         throw new BadStateChangeException(
+                 "interrupted while waiting to insert notification into database",
+                 e);
+     }
+ }
+
+ /**
+  * Reject filenames that could refer to something outside the directory
+  * they are resolved against.
+  *
+  * @param filename
+  *            The slash-separated filename to check.
+  * @throws FilesystemAccessException
+  *             If the name starts with a slash or any of its components is
+  *             "<code>..</code>".
+  */
+ static void checkBadFilename(String filename)
+         throws FilesystemAccessException {
+     if (filename.startsWith("/"))
+         throw new FilesystemAccessException("filename may not be absolute");
+     for (String component : filename.split("/"))
+         if ("..".equals(component))
+             throw new FilesystemAccessException(
+                     "filename may not refer to parent");
+ }
+
+ @Override
+ public String getInputBaclavaFile() {
+ try {
+ return run.getInputBaclavaFile();
+ } catch (RemoteException e) {
+ log.warn("problem when fetching input baclava file", e);
+ return null;
+ }
+ }
+
+ @Override
+ public List<Input> getInputs() {
+ ArrayList<Input> inputs = new ArrayList<>();
+ try {
+ for (RemoteInput ri : run.getInputs())
+ inputs.add(new RunInput(ri));
+ } catch (RemoteException e) {
+ log.warn("problem when fetching list of workflow inputs", e);
+ }
+ return inputs;
+ }
+
+ @Override
+ public String getOutputBaclavaFile() {
+ try {
+ return run.getOutputBaclavaFile();
+ } catch (RemoteException e) {
+ log.warn("problem when fetching output baclava file", e);
+ return null;
+ }
+ }
+
+ @Override
+ public Input makeInput(String name) throws BadStateChangeException {
+ try {
+ return new RunInput(run.makeInput(name));
+ } catch (RemoteException e) {
+ throw new BadStateChangeException("failed to make input", e);
+ }
+ }
+
+ @Override
+ public void setInputBaclavaFile(String filename)
+ throws FilesystemAccessException, BadStateChangeException {
+ checkBadFilename(filename);
+ try {
+ run.setInputBaclavaFile(filename);
+ } catch (RemoteException e) {
+ throw new FilesystemAccessException(
+ "cannot set input baclava file name", e);
+ }
+ }
+
+ @Override
+ public void setOutputBaclavaFile(String filename)
+ throws FilesystemAccessException, BadStateChangeException {
+ checkBadFilename(filename);
+ try {
+ run.setOutputBaclavaFile(filename);
+ } catch (RemoteException e) {
+ throw new FilesystemAccessException(
+ "cannot set output baclava file name", e);
+ }
+ }
+
+ @Override
+ public Date getCreationTimestamp() {
+ return creationInstant == null ? null : new Date(
+ creationInstant.getTime());
+ }
+
+ @Override
+ public Date getFinishTimestamp() {
+ try {
+ return run.getFinishTimestamp();
+ } catch (RemoteException e) {
+ log.info("failed to get finish timestamp", e);
+ return null;
+ }
+ }
+
+ /**
+  * Ask the remote run when it started.
+  *
+  * @return The timestamp reported by the remote run, or <code>null</code>
+  *         if the remote run could not be asked.
+  */
+ @Override
+ public Date getStartTimestamp() {
+     try {
+         return run.getStartTimestamp();
+     } catch (RemoteException e) {
+         // Message used to say "finish timestamp", copied from the
+         // sibling method.
+         log.info("failed to get start timestamp", e);
+         return null;
+     }
+ }
+
+ /**
+ * @param readers
+ * the readers to set
+ */
+ public void setReaders(Set<String> readers) {
+ this.readers = new HashSet<>(readers);
+ db.flushToDisk(this);
+ }
+
+ /**
+ * @return the readers
+ */
+ public Set<String> getReaders() {
+ return readers == null ? new HashSet<String>()
+ : unmodifiableSet(readers);
+ }
+
+ /**
+ * @param writers
+ * the writers to set
+ */
+ public void setWriters(Set<String> writers) {
+ this.writers = new HashSet<>(writers);
+ db.flushToDisk(this);
+ }
+
+ /**
+ * @return the writers
+ */
+ public Set<String> getWriters() {
+ return writers == null ? new HashSet<String>()
+ : unmodifiableSet(writers);
+ }
+
+ /**
+ * @param destroyers
+ * the destroyers to set
+ */
+ public void setDestroyers(Set<String> destroyers) {
+ this.destroyers = new HashSet<>(destroyers);
+ db.flushToDisk(this);
+ }
+
+ /**
+ * @return the destroyers
+ */
+ public Set<String> getDestroyers() {
+ return destroyers == null ? new HashSet<String>()
+ : unmodifiableSet(destroyers);
+ }
+
+ private void writeObject(ObjectOutputStream out) throws IOException {
+ out.defaultWriteObject();
+ out.writeUTF(secContext.getOwner().getName());
+ out.writeObject(secContext.getFactory());
+ out.writeObject(new MarshalledObject<>(run));
+ }
+
+ @Override
+ public boolean getGenerateProvenance() {
+ return generateProvenance;
+ }
+
+ @Override
+ public void setGenerateProvenance(boolean generateProvenance) {
+ this.generateProvenance = generateProvenance;
+ db.flushToDisk(this);
+ }
+
+ /**
+ * Custom deserialization, reading back what {@code writeObject} wrote after
+ * the default fields: the name of the owner, the security context factory,
+ * and the marshalled handle to the remote run. The security context is
+ * rebuilt from the factory and the owner name rather than being stored
+ * directly.
+ * <p>
+ * The transient {@code id}, {@code db} and {@code factory} fields are not
+ * restored here; presumably whoever loads the run re-attaches them (TODO
+ * confirm against the caller).
+ *
+ * @param in
+ * The stream to read from.
+ * @throws IOException
+ * If reading fails, or rebuilding the security context fails with
+ * an I/O problem.
+ * @throws ClassNotFoundException
+ * If a class of a stored object cannot be found.
+ */
+ @SuppressWarnings("unchecked")
+ private void readObject(ObjectInputStream in) throws IOException,
+ ClassNotFoundException {
+ in.defaultReadObject();
+ // The log field is transient, so it needs to be set up again here.
+ // NOTE(review): this logger category differs from the
+ // "Taverna.Server.Worker" one used by the field initializer - confirm
+ // whether that is intended.
+ if (log == null)
+ log = getLog("Taverna.Server.LocalWorker");
+ // The reads below must stay in the order that writeObject wrote them.
+ final String creatorName = in.readUTF();
+ SecurityContextFactory factory = (SecurityContextFactory) in
+ .readObject();
+ try {
+ secContext = factory.create(this,
+ new UsernamePrincipal(creatorName));
+ } catch (RuntimeException | IOException e) {
+ // These can be thrown from here as they are.
+ throw e;
+ } catch (Exception e) {
+ // Any other checked failure is wrapped.
+ throw new SecurityContextReconstructionException(e);
+ }
+ run = ((MarshalledObject<RemoteSingleRun>) in.readObject()).get();
+ }
+
+ public void setSecurityContext(TavernaSecurityContext tavernaSecurityContext) {
+ secContext = tavernaSecurityContext;
+ }
+
+ @Override
+ public String getName() {
+ return name;
+ }
+
+ @Override
+ public void setName(@Nonnull String name) {
+ if (name.length() > RunConnection.NAME_LENGTH)
+ this.name = name.substring(0, RunConnection.NAME_LENGTH);
+ else
+ this.name = name;
+ db.flushToDisk(this);
+ }
+
+ /**
+ * Ping the remote run.
+ *
+ * @throws UnknownRunException
+ * if the remote call fails with a {@link RemoteException}
+ */
+ @Override
+ public void ping() throws UnknownRunException {
+ try {
+ run.ping();
+ } catch (RemoteException e) {
+ throw new UnknownRunException(e);
+ }
+ }
+}
+
+/**
+ * Base class for the local stand-ins for remote directory entries. It forwards
+ * operations to the wrapped {@link RemoteDirectoryEntry}, remembers the
+ * entry's name and full name once they have been fetched, and reuses a fetched
+ * modification time for a short period. Ordering, equality and hashing are all
+ * based on the full name.
+ */
+abstract class DEDelegate implements DirectoryEntry {
+    /** How long, in milliseconds, a fetched modification time is reused. */
+    private static final long MOD_TIME_CACHE_MS = 5000;
+
+    Log log = getLog("Taverna.Server.Worker");
+    private RemoteDirectoryEntry entry;
+    private String name;
+    private String full;
+    private Date cacheModTime;
+    private long cacheQueryTime = 0L;
+
+    /**
+     * @param entry
+     *            the remote directory entry to delegate to
+     */
+    DEDelegate(RemoteDirectoryEntry entry) {
+        this.entry = entry;
+    }
+
+    /**
+     * Destroy the remote entry.
+     *
+     * @throws FilesystemAccessException
+     *             if the remote operation fails with an I/O problem
+     */
+    @Override
+    public void destroy() throws FilesystemAccessException {
+        try {
+            entry.destroy();
+        } catch (IOException e) {
+            throw new FilesystemAccessException(
+                    "failed to delete directory entry", e);
+        }
+    }
+
+    /**
+     * Build the slash-separated full name by walking up through the
+     * containing directories. The result is computed once and then reused.
+     * If a remote call fails part-way, the partial name built so far is
+     * logged about, kept and returned.
+     *
+     * @return the full name of this entry
+     */
+    @Override
+    public String getFullName() {
+        if (full != null)
+            return full;
+        String n = getName();
+        RemoteDirectoryEntry re = entry;
+        try {
+            while (true) {
+                RemoteDirectory parent = re.getContainingDirectory();
+                if (parent == null)
+                    break;
+                n = parent.getName() + "/" + n;
+                re = parent;
+            }
+        } catch (RemoteException e) {
+            log.warn("failed to generate full name", e);
+        }
+        return (full = n);
+    }
+
+    /**
+     * @return the name of the entry, fetched from the remote side on first
+     *         use; <tt>null</tt> if it has never been fetched successfully
+     */
+    @Override
+    public String getName() {
+        if (name == null)
+            try {
+                name = entry.getName();
+            } catch (RemoteException e) {
+                log.error("failed to get name", e);
+            }
+        return name;
+    }
+
+    /**
+     * Get the modification date, asking the remote side only when nothing
+     * has been fetched yet or the last fetched value is at least
+     * {@link #MOD_TIME_CACHE_MS} milliseconds old.
+     *
+     * @return the (possibly cached) modification date; the previous value,
+     *         which may be <tt>null</tt>, if the remote query fails
+     */
+    @Override
+    public Date getModificationDate() {
+        // Refresh only when the cache is empty or has expired. The previous
+        // test was inverted ("< 5000"): it re-queried while the value was
+        // still fresh and never refreshed once it had gone stale.
+        if (cacheModTime == null
+                || currentTimeMillis() - cacheQueryTime >= MOD_TIME_CACHE_MS)
+            try {
+                cacheModTime = entry.getModificationDate();
+                cacheQueryTime = currentTimeMillis();
+            } catch (RemoteException e) {
+                log.error("failed to get modification time", e);
+            }
+        return cacheModTime;
+    }
+
+    @Override
+    public int compareTo(DirectoryEntry de) {
+        return getFullName().compareTo(de.getFullName());
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        // instanceof is already false for null, so no separate null check.
+        return o instanceof DEDelegate
+                && getFullName().equals(((DEDelegate) o).getFullName());
+    }
+
+    @Override
+    public int hashCode() {
+        return getFullName().hashCode();
+    }
+}
+
+/**
+ * Local stand-in for a {@link RemoteDirectory}: lists and creates entries by
+ * delegating to the remote directory, wrapping the results in delegates, and
+ * can stream the whole directory tree as a ZIP.
+ */
+class DirectoryDelegate extends DEDelegate implements Directory {
+ RemoteDirectory rd;
+
+ DirectoryDelegate(RemoteDirectory dir) {
+ super(dir);
+ rd = dir;
+ }
+
+ /**
+ * @return delegates for every entry the remote directory reports, in the
+ * order the remote side returned them
+ * @throws FilesystemAccessException
+ * if the remote listing fails with an I/O problem
+ */
+ @Override
+ public Collection<DirectoryEntry> getContents()
+ throws FilesystemAccessException {
+ ArrayList<DirectoryEntry> result = new ArrayList<>();
+ try {
+ for (RemoteDirectoryEntry rde : rd.getContents()) {
+ // Anything that is not a directory is treated as a file.
+ if (rde instanceof RemoteDirectory)
+ result.add(new DirectoryDelegate((RemoteDirectory) rde));
+ else
+ result.add(new FileDelegate((RemoteFile) rde));
+ }
+ } catch (IOException e) {
+ throw new FilesystemAccessException(
+ "failed to get directory contents", e);
+ }
+ return result;
+ }
+
+ /**
+ * @return the same entries as {@link #getContents()}, sorted by ascending
+ * modification date
+ * @throws FilesystemAccessException
+ * if the remote listing fails with an I/O problem
+ */
+ @Override
+ public Collection<DirectoryEntry> getContentsByDate()
+ throws FilesystemAccessException {
+ ArrayList<DirectoryEntry> result = new ArrayList<>(getContents());
+ sort(result, new DateComparator());
+ return result;
+ }
+
+ /**
+ * Orders directory entries by their modification date.
+ */
+ static class DateComparator implements Comparator<DirectoryEntry> {
+ @Override
+ public int compare(DirectoryEntry a, DirectoryEntry b) {
+ // NOTE(review): no null handling here; DEDelegate.getModificationDate
+ // can return null when the remote query failed.
+ return a.getModificationDate().compareTo(b.getModificationDate());
+ }
+ }
+
+ /**
+ * Create an empty file in the remote directory.
+ *
+ * @param actor
+ * not used by this implementation
+ * @param name
+ * the name passed to the remote directory
+ * @return a delegate for the new file
+ * @throws FilesystemAccessException
+ * if the remote operation fails with an I/O problem
+ */
+ @Override
+ public File makeEmptyFile(Principal actor, String name)
+ throws FilesystemAccessException {
+ try {
+ return new FileDelegate(rd.makeEmptyFile(name));
+ } catch (IOException e) {
+ throw new FilesystemAccessException("failed to make empty file", e);
+ }
+ }
+
+ /**
+ * Create a subdirectory in the remote directory.
+ *
+ * @param actor
+ * not used by this implementation
+ * @param name
+ * the name passed to the remote directory
+ * @return a delegate for the new directory
+ * @throws FilesystemAccessException
+ * if the remote operation fails with an I/O problem
+ */
+ @Override
+ public Directory makeSubdirectory(Principal actor, String name)
+ throws FilesystemAccessException {
+ try {
+ return new DirectoryDelegate(rd.makeSubdirectory(name));
+ } catch (IOException e) {
+ throw new FilesystemAccessException("failed to make subdirectory",
+ e);
+ }
+ }
+
+ /**
+ * Start zipping this directory tree on a background daemon thread and
+ * return the stream that the ZIP data is piped into.
+ *
+ * @return the stream connected to the ZIP writer
+ * @throws FilesystemAccessException
+ * if the pipe for the ZIP data cannot be set up
+ */
+ @Override
+ public ZipStream getContentsAsZip() throws FilesystemAccessException {
+ ZipStream zs = new ZipStream();
+
+ final ZipOutputStream zos;
+ try {
+ zos = new ZipOutputStream(new PipedOutputStream(zs));
+ } catch (IOException e) {
+ throw new FilesystemAccessException("problem building zip stream",
+ e);
+ }
+ // The writer runs on its own thread; problems while zipping are only
+ // logged, and the ZIP stream is always closed afterwards.
+ Thread t = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ zipDirectory(rd, null, zos);
+ } catch (IOException e) {
+ log.warn("problem when zipping directory", e);
+ } finally {
+ closeQuietly(zos);
+ }
+ }
+ });
+ t.setDaemon(true);
+ t.start();
+ return zs;
+ }
+
+ /**
+ * Compresses a directory tree into a ZIP.
+ *
+ * @param dir
+ * The directory to compress.
+ * @param base
+ * The base name of the directory (or <tt>null</tt> if this is
+ * the root directory of the ZIP).
+ * @param zos
+ * Where to write the compressed data.
+ * @throws RemoteException
+ * If some kind of problem happens with the remote delegates.
+ * @throws IOException
+ * If we run into problems with reading or writing data.
+ */
+ void zipDirectory(RemoteDirectory dir, String base, ZipOutputStream zos)
+ throws RemoteException, IOException {
+ for (RemoteDirectoryEntry rde : dir.getContents()) {
+ String name = rde.getName();
+ if (base != null)
+ name = base + "/" + name;
+ if (rde instanceof RemoteDirectory) {
+ // Directories get no ZIP entry of their own, only their files
+ // do, so an empty directory does not appear in the archive.
+ RemoteDirectory rd = (RemoteDirectory) rde;
+ zipDirectory(rd, name, zos);
+ } else {
+ RemoteFile rf = (RemoteFile) rde;
+ zos.putNextEntry(new ZipEntry(name));
+ try {
+ // Copy in chunks of up to 64kB until the remote side returns
+ // no more data.
+ int off = 0;
+ while (true) {
+ byte[] c = rf.getContents(off, 64 * 1024);
+ if (c == null || c.length == 0)
+ break;
+ zos.write(c);
+ off += c.length;
+ }
+ } finally {
+ zos.closeEntry();
+ }
+ }
+ }
+ }
+}
+
+/**
+ * Local stand-in for a {@link RemoteFile}. Every operation is forwarded to the
+ * remote file, and I/O failures are reported as
+ * {@link FilesystemAccessException}s.
+ */
+class FileDelegate extends DEDelegate implements File {
+    RemoteFile rf;
+
+    /**
+     * @param f
+     *            the remote file to delegate to
+     */
+    FileDelegate(RemoteFile f) {
+        super(f);
+        this.rf = f;
+    }
+
+    /**
+     * Read part of the remote file.
+     *
+     * @param offset
+     *            where to start reading, passed to the remote file
+     * @param length
+     *            how much to read, passed to the remote file
+     * @return whatever the remote file returns for that range
+     * @throws FilesystemAccessException
+     *             if the remote read fails with an I/O problem
+     */
+    @Override
+    public byte[] getContents(int offset, int length)
+            throws FilesystemAccessException {
+        try {
+            return rf.getContents(offset, length);
+        } catch (IOException e) {
+            throw new FilesystemAccessException("failed to read file contents",
+                    e);
+        }
+    }
+
+    /**
+     * @return the size reported by the remote file
+     * @throws FilesystemAccessException
+     *             if the remote query fails with an I/O problem
+     */
+    @Override
+    public long getSize() throws FilesystemAccessException {
+        try {
+            return rf.getSize();
+        } catch (IOException e) {
+            throw new FilesystemAccessException("failed to get file length", e);
+        }
+    }
+
+    /**
+     * Set the contents of the remote file.
+     *
+     * @param data
+     *            the bytes passed to the remote file
+     * @throws FilesystemAccessException
+     *             if the remote write fails with an I/O problem
+     */
+    @Override
+    public void setContents(byte[] data) throws FilesystemAccessException {
+        try {
+            rf.setContents(data);
+        } catch (IOException e) {
+            throw new FilesystemAccessException(
+                    "failed to write file contents", e);
+        }
+    }
+
+    /**
+     * Append to the contents of the remote file.
+     *
+     * @param data
+     *            the bytes passed to the remote file
+     * @throws FilesystemAccessException
+     *             if the remote write fails with an I/O problem
+     */
+    @Override
+    public void appendContents(byte[] data) throws FilesystemAccessException {
+        try {
+            rf.appendContents(data);
+        } catch (IOException e) {
+            throw new FilesystemAccessException(
+                    "failed to write file contents", e);
+        }
+    }
+
+    /**
+     * Copy the contents of another file delegate's remote file into this
+     * one.
+     *
+     * @param from
+     *            the file to copy from; must be a {@link FileDelegate}
+     * @throws FilesystemAccessException
+     *             if <tt>from</tt> is some other kind of file, or if the
+     *             copy itself fails for any reason
+     */
+    @Override
+    public void copy(File from) throws FilesystemAccessException {
+        // Check the type up front instead of provoking and catching a
+        // ClassCastException. A null argument still falls through to the
+        // copy attempt below and is reported as a failed copy, as before.
+        if (from != null && !(from instanceof FileDelegate))
+            throw new FilesystemAccessException("different types of File?!");
+        FileDelegate fromFile = (FileDelegate) from;
+
+        try {
+            rf.copy(fromFile.rf);
+        } catch (Exception e) {
+            throw new FilesystemAccessException("failed to copy file contents",
+                    e);
+        }
+    }
+}
+
+class ListenerDelegate implements Listener {
+ private Log log = getLog("Taverna.Server.Worker");
+ private RemoteListener r;
+ String conf;
+
+ ListenerDelegate(RemoteListener l) {
+ r = l;
+ }
+
+ RemoteListener getRemote() {
+ return r;
+ }
+
+ @Override
+ public String getConfiguration() {
+ try {
+ if (conf == null)
+ conf = r.getConfiguration();
+ } catch (RemoteException e) {
+ log.warn("failed to get configuration", e);
+ }
+ return conf;
+ }
+
+ @Override
+ public String getName() {
+ try {
+ return r.getName();
+ } catch (RemoteException e) {
+ log.warn("failed to get name", e);
+ return "UNKNOWN NAME";
+ }
+ }
+
+ @Override
+ public String getProperty(String propName) throws NoListenerException {
+ try {
+ return r.getProperty(propName);
+ } catch (RemoteException e) {
+ throw new NoListenerException("no such property: " + propName, e);
+ }
+ }
+
+ @Override
+ public String getType() {
+ try {
+ return r.getType();
+ } catch (RemoteException e) {
+ log.warn("failed to get type", e);
+ return "UNKNOWN TYPE";
+ }
+ }
+
+ @Override
+ public String[] listProperties() {
+ try {
+ return r.listProperties();
+ } catch (RemoteException e) {
+ log.warn("failed to list properties", e);
+ return new String[0];
+ }
+ }
+
+ @Override
+ public void setProperty(String propName, String value)
+ throws NoListenerException, BadPropertyValueException {
+ try {
+ r.setProperty(propName, value);
+ } catch (RemoteException e) {
+ log.warn("failed to set property", e);
+ if (e.getCause() != null
+ && e.getCause() instanceof RuntimeException)
+ throw new NoListenerException("failed to set property",
+ e.getCause());
+ if (e.getCause() != null && e.getCause() instanceof Exception)
+ throw new BadPropertyValueException("failed to set property",
+ e.getCause());
+ throw new BadPropertyValueException("failed to set property", e);
+ }
+ }
+}
+
+/**
+ * Local stand-in for a {@link RemoteInput}. The getters report a remote
+ * failure by returning <tt>null</tt>; the setters translate it into the
+ * checked exceptions of the {@link Input} interface.
+ */
+class RunInput implements Input {
+    private final RemoteInput i;
+
+    /**
+     * @param remote
+     *            the remote input to delegate to
+     */
+    RunInput(RemoteInput remote) {
+        this.i = remote;
+    }
+
+    /**
+     * @return the file reported by the remote input, or <tt>null</tt> if the
+     *         remote call fails
+     */
+    @Override
+    public String getFile() {
+        try {
+            return i.getFile();
+        } catch (RemoteException e) {
+            return null;
+        }
+    }
+
+    /**
+     * @return the name reported by the remote input, or <tt>null</tt> if the
+     *         remote call fails
+     */
+    @Override
+    public String getName() {
+        try {
+            return i.getName();
+        } catch (RemoteException e) {
+            return null;
+        }
+    }
+
+    /**
+     * @return the value reported by the remote input, or <tt>null</tt> if
+     *         the remote call fails
+     */
+    @Override
+    public String getValue() {
+        try {
+            return i.getValue();
+        } catch (RemoteException e) {
+            return null;
+        }
+    }
+
+    /**
+     * Set the file of the remote input, after the filename has passed
+     * {@code checkBadFilename}.
+     *
+     * @param file
+     *            the filename to set
+     * @throws FilesystemAccessException
+     *             if the remote call fails
+     * @throws BadStateChangeException
+     *             declared by the interface
+     */
+    @Override
+    public void setFile(String file) throws FilesystemAccessException,
+            BadStateChangeException {
+        checkBadFilename(file);
+        try {
+            i.setFile(file);
+        } catch (RemoteException e) {
+            throw new FilesystemAccessException("cannot set file for input", e);
+        }
+    }
+
+    /**
+     * Set the value of the remote input.
+     *
+     * @param value
+     *            the value to set
+     * @throws BadStateChangeException
+     *             if the remote call fails
+     */
+    @Override
+    public void setValue(String value) throws BadStateChangeException {
+        try {
+            i.setValue(value);
+        } catch (RemoteException e) {
+            throw new BadStateChangeException(e);
+        }
+    }
+
+    /**
+     * @return the delimiter reported by the remote input, or <tt>null</tt>
+     *         if the remote call fails
+     */
+    @Override
+    public String getDelimiter() {
+        try {
+            return i.getDelimiter();
+        } catch (RemoteException e) {
+            return null;
+        }
+    }
+
+    /**
+     * Set the delimiter of the remote input. Only the first character of a
+     * longer string is used; <tt>null</tt> and the empty string are passed
+     * through unchanged.
+     *
+     * @param delimiter
+     *            the delimiter to set
+     * @throws BadStateChangeException
+     *             if the remote call fails
+     */
+    @Override
+    public void setDelimiter(String delimiter) throws BadStateChangeException {
+        // Truncate only when there is something to cut: substring(0, 1) on
+        // an empty string throws StringIndexOutOfBoundsException.
+        if (delimiter != null && delimiter.length() > 1)
+            delimiter = delimiter.substring(0, 1);
+        try {
+            i.setDelimiter(delimiter);
+        } catch (RemoteException e) {
+            throw new BadStateChangeException(e);
+        }
+    }
+}
+
+/**
+ * Unchecked wrapper for a failure to rebuild a security context.
+ */
+@SuppressWarnings("serial")
+class SecurityContextReconstructionException extends RuntimeException {
+ /**
+ * @param t
+ * the underlying failure, kept as the cause
+ */
+ public SecurityContextReconstructionException(Throwable t) {
+ super("failed to rebuild security context", t);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-taverna-server/blob/00397eff/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/worker/RunConnection.java
----------------------------------------------------------------------
diff --git a/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/worker/RunConnection.java b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/worker/RunConnection.java
new file mode 100644
index 0000000..0c2b1a9
--- /dev/null
+++ b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/worker/RunConnection.java
@@ -0,0 +1,252 @@
+/*
+ */
+package org.taverna.server.master.worker;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import static java.util.Arrays.asList;
+import static java.util.Collections.emptyList;
+import static org.taverna.server.master.worker.RunConnection.COUNT_QUERY;
+import static org.taverna.server.master.worker.RunConnection.NAMES_QUERY;
+import static org.taverna.server.master.worker.RunConnection.SCHEMA;
+import static org.taverna.server.master.worker.RunConnection.TABLE;
+import static org.taverna.server.master.worker.RunConnection.TIMEOUT_QUERY;
+import static org.taverna.server.master.worker.RunConnection.UNTERMINATED_QUERY;
+
+import java.io.IOException;
+import java.rmi.MarshalledObject;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.List;
+
+import javax.annotation.Nonnull;
+import javax.jdo.annotations.Column;
+import javax.jdo.annotations.Join;
+import javax.jdo.annotations.PersistenceCapable;
+import javax.jdo.annotations.Persistent;
+import javax.jdo.annotations.PrimaryKey;
+import javax.jdo.annotations.Queries;
+import javax.jdo.annotations.Query;
+
+import org.taverna.server.localworker.remote.RemoteSingleRun;
+import org.taverna.server.master.common.Credential;
+import org.taverna.server.master.common.Trust;
+import org.taverna.server.master.common.Workflow;
+import org.taverna.server.master.interfaces.SecurityContextFactory;
+import org.taverna.server.master.utils.UsernamePrincipal;
+
+/**
+ * The representation of the connections to the runs that actually participates
+ * in the persistence system.
+ *
+ * @author Donal Fellows
+ */
+@PersistenceCapable(table = TABLE, schema = SCHEMA)
+@Queries({
+ @Query(name = "count", language = "SQL", value = COUNT_QUERY, unique = "true", resultClass = Integer.class),
+ @Query(name = "names", language = "SQL", value = NAMES_QUERY, unique = "false", resultClass = String.class),
+ @Query(name = "unterminated", language = "SQL", value = UNTERMINATED_QUERY, unique = "false", resultClass = String.class),
+ @Query(name = "timedout", language = "SQL", value = TIMEOUT_QUERY, unique = "false", resultClass = String.class) })
+public class RunConnection {
+ // Names and SQL text used by the annotations above; all queries run
+ // against the fully qualified SCHEMA.TABLE name.
+ static final String SCHEMA = "TAVERNA";
+ static final String TABLE = "RUN_CONNECTION";
+ private static final String FULL_NAME = SCHEMA + "." + TABLE;
+ static final String COUNT_QUERY = "SELECT count(*) FROM " + FULL_NAME;
+ static final String NAMES_QUERY = "SELECT ID FROM " + FULL_NAME;
+ static final String TIMEOUT_QUERY = "SELECT ID FROM " + FULL_NAME
+ + " WHERE expiry < CURRENT_TIMESTAMP";
+ static final String UNTERMINATED_QUERY = "SELECT ID FROM " + FULL_NAME
+ + " WHERE doneTransitionToFinished = 0";
+ /** Maximum number of characters of a run name that are stored. */
+ static final int NAME_LENGTH = 48;
+
+ @PrimaryKey
+ @Column(length = 40)
+ private String id;
+
+ @Persistent(defaultFetchGroup = "true")
+ @Column(length = NAME_LENGTH)
+ private String name;
+
+ @Persistent(defaultFetchGroup = "true")
+ private Date creationInstant;
+
+ @Persistent(defaultFetchGroup = "true", serialized = "true")
+ @Column(jdbcType = "BLOB", sqlType = "BLOB")
+ private Workflow workflow;
+
+ @Persistent(defaultFetchGroup = "true")
+ private Date expiry;
+
+ @Persistent(defaultFetchGroup = "true")
+ @Join(table = TABLE + "_READERS", column = "ID")
+ private String[] readers;
+
+ @Persistent(defaultFetchGroup = "true")
+ @Join(table = TABLE + "_WRITERS", column = "ID")
+ private String[] writers;
+
+ @Persistent(defaultFetchGroup = "true")
+ @Join(table = TABLE + "_DESTROYERS", column = "ID")
+ private String[] destroyers;
+
+ @Persistent(defaultFetchGroup = "true", serialized = "true")
+ @Column(jdbcType = "BLOB", sqlType = "BLOB")
+ private MarshalledObject<RemoteSingleRun> run;
+
+ // Boolean flag stored as an int (0 = false, anything else = true); see
+ // isFinished/setFinished.
+ @Persistent(defaultFetchGroup = "true")
+ private int doneTransitionToFinished;
+
+ // Boolean flag stored as an int (0 = false, anything else = true); see
+ // isProvenanceGenerated/setProvenanceGenerated.
+ @Persistent(defaultFetchGroup = "true")
+ private int generateProvenance;
+
+ @Persistent(defaultFetchGroup = "true")
+ @Column(length = 128)
+ String owner;
+
+ @Persistent(defaultFetchGroup = "true")
+ @Column(length = 36)
+ private String securityToken;
+
+ @Persistent(defaultFetchGroup = "true", serialized = "true")
+ @Column(jdbcType = "BLOB", sqlType = "BLOB")
+ private SecurityContextFactory securityContextFactory;
+ @Persistent(defaultFetchGroup = "true", serialized = "true")
+ @Column(jdbcType = "BLOB", sqlType = "BLOB")
+ private Credential[] credentials;
+ @Persistent(defaultFetchGroup = "true", serialized = "true")
+ @Column(jdbcType = "BLOB", sqlType = "BLOB")
+ private Trust[] trust;
+
+ // Type witness for the Set.toArray calls in makeChanges.
+ private static final String[] STRING_ARY = new String[0];
+
+ /** @return the primary key of this record */
+ public String getId() {
+ return id;
+ }
+
+ /** @return whether the stored finished flag is non-zero */
+ public boolean isFinished() {
+ return doneTransitionToFinished != 0;
+ }
+
+ /**
+ * @param finished
+ * the new value of the finished flag (stored as 1 or 0)
+ */
+ public void setFinished(boolean finished) {
+ doneTransitionToFinished = (finished ? 1 : 0);
+ }
+
+ /** @return whether the stored provenance flag is non-zero */
+ public boolean isProvenanceGenerated() {
+ return generateProvenance != 0;
+ }
+
+ /**
+ * @param generate
+ * the new value of the provenance flag (stored as 1 or 0)
+ */
+ public void setProvenanceGenerated(boolean generate) {
+ generateProvenance = (generate ? 1 : 0);
+ }
+
+ /**
+ * Manufacture a persistent representation of the given workflow run. Must
+ * be called within the context of a transaction.
+ *
+ * @param rrd
+ * The remote delegate of the workflow run.
+ * @return The persistent object.
+ * @throws IOException
+ * If serialisation fails.
+ */
+ @Nonnull
+ public static RunConnection toDBform(@Nonnull RemoteRunDelegate rrd)
+ throws IOException {
+ RunConnection rc = new RunConnection();
+ rc.id = rrd.id;
+ rc.makeChanges(rrd);
+ return rc;
+ }
+
+ /**
+ * Null-tolerant conversion of an array to a list.
+ *
+ * @param ary
+ * the array, or <tt>null</tt>
+ * @return an empty list for <tt>null</tt>, otherwise a list view of the
+ * array
+ */
+ private static List<String> list(String[] ary) {
+ if (ary == null)
+ return emptyList();
+ return asList(ary);
+ }
+
+ /**
+ * Get the remote run delegate for a particular persistent connection. Must
+ * be called within the context of a transaction.
+ *
+ * @param db
+ * The database facade.
+ * @return The delegate object.
+ * @throws Exception
+ * If anything goes wrong.
+ */
+ @Nonnull
+ public RemoteRunDelegate fromDBform(@Nonnull RunDBSupport db)
+ throws Exception {
+ RemoteRunDelegate rrd = new RemoteRunDelegate();
+ rrd.id = getId();
+ rrd.creationInstant = creationInstant;
+ rrd.workflow = workflow;
+ rrd.expiry = expiry;
+ rrd.readers = new HashSet<>(list(readers));
+ rrd.writers = new HashSet<>(list(writers));
+ rrd.destroyers = new HashSet<>(list(destroyers));
+ rrd.run = run.get();
+ rrd.doneTransitionToFinished = isFinished();
+ rrd.generateProvenance = isProvenanceGenerated();
+ rrd.secContext = securityContextFactory.create(rrd,
+ new UsernamePrincipal(owner));
+ // The freshly created context is given the stored credentials and trust.
+ ((SecurityContextDelegate)rrd.secContext).setCredentialsAndTrust(credentials,trust);
+ rrd.db = db;
+ rrd.factory = db.getFactory();
+ rrd.name = name;
+ return rrd;
+ }
+
+ /**
+ * Flush changes from a remote run delegate to the database. Must be called
+ * within the context of a transaction.
+ *
+ * @param rrd
+ * The remote run delegate object that has potential changes.
+ * @throws IOException
+ * If anything goes wrong in serialization.
+ */
+ public void makeChanges(@Nonnull RemoteRunDelegate rrd) throws IOException {
+ // Properties that are set exactly once
+ // (a null creationInstant marks a record that has not been filled in yet)
+ if (creationInstant == null) {
+ creationInstant = rrd.getCreationTimestamp();
+ workflow = rrd.getWorkflow();
+ run = new MarshalledObject<>(rrd.run);
+ securityContextFactory = rrd.getSecurityContext().getFactory();
+ owner = rrd.getSecurityContext().getOwner().getName();
+ securityToken = ((org.taverna.server.master.worker.SecurityContextFactory) securityContextFactory)
+ .issueNewPassword();
+ }
+ // Properties that are set multiple times
+ expiry = rrd.getExpiry();
+ readers = rrd.getReaders().toArray(STRING_ARY);
+ writers = rrd.getWriters().toArray(STRING_ARY);
+ destroyers = rrd.getDestroyers().toArray(STRING_ARY);
+ credentials = rrd.getSecurityContext().getCredentials();
+ trust = rrd.getSecurityContext().getTrusted();
+ // NOTE(review): this dereferences rrd.name without a null check, while
+ // fromDBform copies the stored name across unchecked; confirm that a
+ // delegate's name can never be null here.
+ if (rrd.name.length() > NAME_LENGTH)
+ this.name = rrd.name.substring(0, NAME_LENGTH);
+ else
+ this.name = rrd.name;
+ setFinished(rrd.doneTransitionToFinished);
+ setProvenanceGenerated(rrd.generateProvenance);
+ }
+
+ /** @return the security token issued when this record was first filled in */
+ public String getSecurityToken() {
+ return securityToken;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-taverna-server/blob/00397eff/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/worker/RunDBSupport.java
----------------------------------------------------------------------
diff --git a/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/worker/RunDBSupport.java b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/worker/RunDBSupport.java
new file mode 100644
index 0000000..5fa96b8
--- /dev/null
+++ b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/worker/RunDBSupport.java
@@ -0,0 +1,96 @@
+/*
+ */
+package org.taverna.server.master.worker;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.util.List;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import org.taverna.server.master.notification.NotificationEngine;
+
+/**
+ * The interface to the database of runs.
+ *
+ * @author Donal Fellows
+ */
+public interface RunDBSupport {
+ /**
+ * Scan each run to see if it has finished yet and issue registered
+ * notifications if it has.
+ */
+ void checkForFinishNow();
+
+ /**
+ * Remove currently-expired runs from this database.
+ */
+ void cleanNow();
+
+ /**
+ * How many runs are stored in the database.
+ *
+ * @return The current size of the run table.
+ */
+ int countRuns();
+
+ /**
+ * Ensure that a run gets persisted in the database. It is assumed that the
+ * value is already in there.
+ *
+ * @param run
+ * The run to persist. Must not be <tt>null</tt>.
+ */
+ void flushToDisk(@Nonnull RemoteRunDelegate run);
+
+ /**
+ * Select an arbitrary representative run.
+ *
+ * @return The selected run; may be <tt>null</tt>.
+ * @throws Exception
+ * If anything goes wrong.
+ */
+ @Nullable
+ RemoteRunDelegate pickArbitraryRun() throws Exception;
+
+ /**
+ * Get a list of all the run names.
+ *
+ * @return The names (i.e., UUIDs) of all the runs. Never <tt>null</tt>.
+ */
+ @Nonnull
+ List<String> listRunNames();
+
+ /**
+ * @param notificationEngine
+ * A reference to the notification fabric bean.
+ */
+ void setNotificationEngine(NotificationEngine notificationEngine);
+
+ /**
+ * @param notifier
+ * A reference to the bean that creates messages about workflow
+ * run termination.
+ */
+ void setNotifier(CompletionNotifier notifier);
+
+ /**
+ * @return A reference to the actual factory for remote runs.
+ */
+ FactoryBean getFactory();
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-taverna-server/blob/00397eff/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/worker/RunDatabase.java
----------------------------------------------------------------------
diff --git a/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/worker/RunDatabase.java b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/worker/RunDatabase.java
new file mode 100644
index 0000000..65aec70
--- /dev/null
+++ b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/worker/RunDatabase.java
@@ -0,0 +1,324 @@
+/*
+ */
+package org.taverna.server.master.worker;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import static java.lang.Integer.parseInt;
+import static java.util.UUID.randomUUID;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.UUID;
+
+import javax.annotation.Nullable;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Required;
+import org.taverna.server.master.common.Status;
+import org.taverna.server.master.exceptions.UnknownRunException;
+import org.taverna.server.master.interfaces.Listener;
+import org.taverna.server.master.interfaces.Policy;
+import org.taverna.server.master.interfaces.RunStore;
+import org.taverna.server.master.interfaces.TavernaRun;
+import org.taverna.server.master.notification.NotificationEngine;
+import org.taverna.server.master.notification.NotificationEngine.Message;
+import org.taverna.server.master.utils.UsernamePrincipal;
+
+/**
+ * The main facade bean that interfaces to the database of runs.
+ *
+ * @author Donal Fellows
+ */
+public class RunDatabase implements RunStore, RunDBSupport {
+    private Log log = LogFactory.getLog("Taverna.Server.Worker.RunDB");
+    RunDatabaseDAO dao;
+    /** The notifier used when no notifier is registered for a message type. */
+    CompletionNotifier backupNotifier;
+    /**
+     * Notifiers keyed by their name. Starts out empty rather than
+     * <tt>null</tt> so that, when {@link #setTypeNotifiers(List)} is never
+     * called, message generation falls back to {@link #backupNotifier}
+     * instead of failing with a {@link NullPointerException}.
+     */
+    Map<String, CompletionNotifier> typedNotifiers = new HashMap<>();
+    private NotificationEngine notificationEngine;
+    @Autowired
+    private FactoryBean factory;
+    /** Runs registered through this object, keyed by run ID; guarded by itself. */
+    private Map<String, TavernaRun> cache = new HashMap<>();
+
+    @Override
+    @Required
+    public void setNotifier(CompletionNotifier n) {
+        backupNotifier = n;
+    }
+
+    /**
+     * Replace the table of type-specific notifiers.
+     *
+     * @param notifiers
+     *            the notifiers to register, each under its own name
+     */
+    public void setTypeNotifiers(List<CompletionNotifier> notifiers) {
+        typedNotifiers = new HashMap<>();
+        for (CompletionNotifier n : notifiers)
+            typedNotifiers.put(n.getName(), n);
+    }
+
+    @Required
+    @Override
+    public void setNotificationEngine(NotificationEngine notificationEngine) {
+        this.notificationEngine = notificationEngine;
+    }
+
+    /**
+     * @param dao
+     *            the data access object that talks to the database
+     */
+    @Required
+    public void setDao(RunDatabaseDAO dao) {
+        this.dao = dao;
+    }
+
+    @Override
+    public void checkForFinishNow() {
+        /*
+         * Get which runs are actually newly finished; this requires getting the
+         * candidates from the database and *then* doing the expensive requests
+         * to the back end to find out the status.
+         */
+        Map<String, RemoteRunDelegate> notifiable = new HashMap<>();
+        for (RemoteRunDelegate p : dao.getPotentiallyNotifiable())
+            if (p.getStatus() == Status.Finished)
+                notifiable.put(p.getId(), p);
+
+        // Check if there's nothing more to do
+        if (notifiable.isEmpty())
+            return;
+
+        /*
+         * Tell the database about the ones we've got.
+         */
+        dao.markFinished(notifiable.keySet());
+
+        /*
+         * Send out the notifications. The notification addresses are stored in
+         * the back-end engine, so this is *another* thing that can take time.
+         */
+        for (RemoteRunDelegate rrd : notifiable.values())
+            for (Listener l : rrd.getListeners())
+                if (l.getName().equals("io")) {
+                    try {
+                        notifyFinished(rrd.id, l, rrd);
+                    } catch (Exception e) {
+                        log.warn("failed to do notification of completion", e);
+                    }
+                    // Only the first "io" listener of a run is used.
+                    break;
+                }
+    }
+
+    @Override
+    public void cleanNow() {
+        List<String> cleaned;
+        try {
+            cleaned = dao.doClean();
+        } catch (Exception e) {
+            log.warn("failure during deletion of expired runs", e);
+            return;
+        }
+        synchronized (cache) {
+            for (String id : cleaned)
+                cache.remove(id);
+        }
+    }
+
+    @Override
+    public int countRuns() {
+        return dao.countRuns();
+    }
+
+    @Override
+    public void flushToDisk(RemoteRunDelegate run) {
+        try {
+            dao.flushToDisk(run);
+        } catch (IOException e) {
+            throw new RuntimeException(
+                    "unexpected problem when persisting run record in database",
+                    e);
+        }
+    }
+
+    @Override
+    public RemoteRunDelegate pickArbitraryRun() throws Exception {
+        return dao.pickArbitraryRun();
+    }
+
+    @Override
+    public List<String> listRunNames() {
+        return dao.listRunNames();
+    }
+
+    /**
+     * Look up a run, preferring the in-memory cache. A cached run that no
+     * longer answers a ping is ignored and the database is consulted
+     * instead.
+     *
+     * @param uuid
+     *            the ID of the run
+     * @return the run, or <tt>null</tt> if the database has none either
+     */
+    @Nullable
+    private TavernaRun get(String uuid) {
+        TavernaRun run = null;
+        synchronized (cache) {
+            run = cache.get(uuid);
+        }
+        try {
+            if (run != null)
+                run.ping();
+        } catch (UnknownRunException e) {
+            if (log.isDebugEnabled())
+                log.debug("stale mapping in cache?", e);
+            // Don't need to flush the cache; this happens when cleaning anyway
+            run = null;
+        }
+        if (run == null)
+            run = dao.get(uuid);
+        return run;
+    }
+
+    @Override
+    public TavernaRun getRun(UsernamePrincipal user, Policy p, String uuid)
+            throws UnknownRunException {
+        // Check first to see if the 'uuid' actually looks like a UUID; if
+        // not, throw it out immediately without logging an exception.
+        try {
+            UUID.fromString(uuid);
+        } catch (IllegalArgumentException e) {
+            if (log.isDebugEnabled())
+                log.debug("run ID does not look like UUID; rejecting...");
+            throw new UnknownRunException();
+        }
+        TavernaRun run = get(uuid);
+        // A null user bypasses the policy check.
+        if (run != null && (user == null || p.permitAccess(user, run)))
+            return run;
+        throw new UnknownRunException();
+    }
+
+    @Override
+    public TavernaRun getRun(String uuid) throws UnknownRunException {
+        TavernaRun run = get(uuid);
+        if (run != null)
+            return run;
+        throw new UnknownRunException();
+    }
+
+    /**
+     * List the runs that the user may access. If the cache holds at least
+     * one such run, only the cached runs are returned; otherwise the
+     * database is asked.
+     */
+    @Override
+    public Map<String, TavernaRun> listRuns(UsernamePrincipal user, Policy p) {
+        synchronized (cache) {
+            Map<String, TavernaRun> cached = new HashMap<>();
+            for (Entry<String, TavernaRun> e : cache.entrySet()) {
+                TavernaRun r = e.getValue();
+                if (p.permitAccess(user, r))
+                    cached.put(e.getKey(), r);
+            }
+            if (!cached.isEmpty())
+                return cached;
+        }
+        return dao.listRuns(user, p);
+    }
+
+    /**
+     * Debug aid: log the size of the Java-serialized form of an object. Does
+     * nothing unless debug logging is enabled; failures are only logged.
+     *
+     * @param message
+     *            the prefix for the log line
+     * @param obj
+     *            the object to measure
+     */
+    private void logLength(String message, Object obj) {
+        if (!log.isDebugEnabled())
+            return;
+        try {
+            ByteArrayOutputStream baos = new ByteArrayOutputStream();
+            try (ObjectOutputStream oos = new ObjectOutputStream(baos)) {
+                oos.writeObject(obj);
+            }
+            log.debug(message + ": " + baos.size());
+        } catch (Exception e) {
+            log.warn("oops", e);
+        }
+    }
+
+    /**
+     * Persist a run and add it to the cache, assigning it a random UUID as
+     * its ID if it does not have one yet.
+     *
+     * @param run
+     *            the run; must be a {@link RemoteRunDelegate}
+     * @return the ID of the run
+     * @throws IllegalArgumentException
+     *             if the run is of some other class
+     */
+    @Override
+    public String registerRun(TavernaRun run) {
+        if (!(run instanceof RemoteRunDelegate))
+            throw new IllegalArgumentException(
+                    "run must be created by localworker package");
+        RemoteRunDelegate rrd = (RemoteRunDelegate) run;
+        if (rrd.id == null)
+            rrd.id = randomUUID().toString();
+        logLength("RemoteRunDelegate serialized length", rrd);
+        try {
+            dao.persistRun(rrd);
+        } catch (IOException e) {
+            throw new RuntimeException(
+                    "unexpected problem when persisting run record in database",
+                    e);
+        }
+        synchronized (cache) {
+            cache.put(rrd.getId(), run);
+        }
+        return rrd.getId();
+    }
+
+    /**
+     * Remove a run from the database and, if the database reports success,
+     * from the cache. Runtime failures are logged at debug level only.
+     */
+    @Override
+    public void unregisterRun(String uuid) {
+        try {
+            if (dao.unpersistRun(uuid))
+                synchronized (cache) {
+                    cache.remove(uuid);
+                }
+        } catch (RuntimeException e) {
+            if (log.isDebugEnabled())
+                log.debug("problem persisting the deletion of the run " + uuid,
+                        e);
+        }
+    }
+
+    /**
+     * Process the event that a run has finished.
+     *
+     * @param name
+     *            The name of the run.
+     * @param io
+     *            The io listener of the run (used to get information about the
+     *            run).
+     * @param run
+     *            The handle to the run.
+     * @throws Exception
+     *             If anything goes wrong.
+     */
+    private void notifyFinished(final String name, Listener io,
+            final RemoteRunDelegate run) throws Exception {
+        String to = io.getProperty("notificationAddress");
+        final int code;
+        try {
+            code = parseInt(io.getProperty("exitcode"));
+        } catch (NumberFormatException nfe) {
+            // Ignore; not much we can do here...
+            return;
+        }
+
+        notificationEngine.dispatchMessage(run, to, new Message() {
+            // Pick the notifier registered for the type, else the backup.
+            private CompletionNotifier getNotifier(String type) {
+                CompletionNotifier n = typedNotifiers.get(type);
+                if (n == null)
+                    n = backupNotifier;
+                return n;
+            }
+
+            @Override
+            public String getContent(String type) {
+                return getNotifier(type).makeCompletionMessage(name, run, code);
+            }
+
+            @Override
+            public String getTitle(String type) {
+                return getNotifier(type).makeMessageSubject(name, run, code);
+            }
+        });
+    }
+
+    @Override
+    public FactoryBean getFactory() {
+        return factory;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-taverna-server/blob/00397eff/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/worker/RunDatabaseDAO.java
----------------------------------------------------------------------
diff --git a/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/worker/RunDatabaseDAO.java b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/worker/RunDatabaseDAO.java
new file mode 100644
index 0000000..1c75d22
--- /dev/null
+++ b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/worker/RunDatabaseDAO.java
@@ -0,0 +1,323 @@
+/*
+ */
+package org.taverna.server.master.worker;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import static org.taverna.server.master.worker.RunConnection.toDBform;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import javax.jdo.annotations.PersistenceAware;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.springframework.beans.factory.annotation.Required;
+import org.taverna.server.master.interfaces.Policy;
+import org.taverna.server.master.interfaces.TavernaRun;
+import org.taverna.server.master.utils.CallTimeLogger.PerfLogged;
+import org.taverna.server.master.utils.JDOSupport;
+import org.taverna.server.master.utils.UsernamePrincipal;
+
+/**
+ * This handles storing runs, interfacing with the underlying state engine as
+ * necessary.
+ *
+ * @author Donal Fellows
+ */
+@PersistenceAware
+public class RunDatabaseDAO extends JDOSupport<RunConnection> {
+ /**
+ * Create the DAO, passing the {@link RunConnection} class to the
+ * {@link JDOSupport} superclass constructor.
+ */
+ public RunDatabaseDAO() {
+ super(RunConnection.class);
+ }
+
+ /** Logger for this DAO, obtained under the name "Taverna.Server.Worker.RunDB". */
+ private Log log = LogFactory.getLog("Taverna.Server.Worker.RunDB");
+ /**
+ * Passed to <tt>fromDBform</tt> whenever a stored record is turned back
+ * into a run handle; installed through {@link #setFacade(RunDatabase)}.
+ */
+ private RunDatabase facade;
+
+ /**
+ * Install the run database facade that this DAO hands to
+ * <tt>fromDBform</tt> when rebuilding run handles.
+ *
+ * @param facade
+ * The facade to store.
+ */
+ @Required
+ public void setFacade(RunDatabase facade) {
+ this.facade = facade;
+ }
+
+ // -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
+
+ /**
+  * Execute the <tt>names</tt> named query.
+  *
+  * @return The query result, treated as a list of run names.
+  */
+ @SuppressWarnings("unchecked")
+ private List<String> names() {
+     if (log.isDebugEnabled()) {
+         log.debug("fetching all run names");
+     }
+     Object queryResult = namedQuery("names").execute();
+     return (List<String>) queryResult;
+ }
+
+ /**
+ * @return The number of workflow runs in the database.
+ */
+ @WithinSingleTransaction
+ public int countRuns() {
+ if (log.isDebugEnabled())
+ log.debug("counting the number of runs");
+ return count();
+ }
+
+ /**
+  * Execute the <tt>count</tt> named query.
+  *
+  * @return The query result, cast to an {@link Integer}.
+  */
+ private Integer count() {
+     Object queryResult = namedQuery("count").execute();
+     return (Integer) queryResult;
+ }
+
+ /**
+  * Execute the <tt>timedout</tt> named query.
+  *
+  * @return The query result, treated as a list of run identifiers.
+  */
+ @SuppressWarnings("unchecked")
+ private List<String> timedout() {
+     Object queryResult = namedQuery("timedout").execute();
+     return (List<String>) queryResult;
+ }
+
+ /**
+  * Execute the <tt>unterminated</tt> named query.
+  *
+  * @return The query result, treated as a list of run identifiers.
+  */
+ @SuppressWarnings("unchecked")
+ private List<String> unterminated() {
+     Object queryResult = namedQuery("unterminated").execute();
+     return (List<String>) queryResult;
+ }
+
+ /**
+  * Look up the stored record for a run, logging how the lookup went.
+  *
+  * @param name
+  *            The identifier of the run.
+  * @return The record, or <tt>null</tt> if the lookup produced nothing (a
+  *         warning is logged in that case).
+  * @throws RuntimeException
+  *             If the lookup fails; the failure is logged and rethrown.
+  */
+ @Nullable
+ private RunConnection pickRun(@Nonnull String name) {
+     if (log.isDebugEnabled()) {
+         log.debug("fetching the run called " + name);
+     }
+     try {
+         RunConnection found = getById(name);
+         if (found != null) {
+             return found;
+         }
+         log.warn("no result for " + name);
+         return null;
+     } catch (RuntimeException failure) {
+         log.warn("problem in fetch", failure);
+         throw failure;
+     }
+ }
+
+ /**
+  * Get the security token stored with a run.
+  *
+  * @param name
+  *            The identifier of the run.
+  * @return The token from the run's record, or <tt>null</tt> if no record
+  *         was found for that identifier.
+  */
+ @Nullable
+ @WithinSingleTransaction
+ public String getSecurityToken(@Nonnull String name) {
+     RunConnection record = getById(name);
+     return (record == null) ? null : record.getSecurityToken();
+ }
+
+ /**
+  * Convert a run to its database form and store the result.
+  *
+  * @param rrd
+  *            The run to store.
+  * @throws IOException
+  *             If the conversion to database form fails.
+  */
+ private void persist(@Nonnull RemoteRunDelegate rrd) throws IOException {
+     RunConnection record = toDBform(rrd);
+     persist(record);
+ }
+
+ /**
+  * Fetch the stored record of every run listed by {@link #names()}.
+  * <p>
+  * Entries that cannot be resolved are left out: a <tt>null</tt> name, a
+  * name for which {@link #pickRun(String)} finds no record, or a name whose
+  * lookup throws. The returned list therefore never contains
+  * <tt>null</tt>, which is what the callers that dereference each element
+  * rely on.
+  *
+  * @return The records that could be fetched; possibly empty.
+  * @throws RuntimeException
+  *             If listing the names fails; the failure is logged and
+  *             rethrown.
+  */
+ @Nonnull
+ private List<RunConnection> allRuns() {
+     try {
+         List<RunConnection> rcs = new ArrayList<>();
+         for (String id : names()) {
+             if (id == null)
+                 continue;
+             try {
+                 RunConnection rc = pickRun(id);
+                 // pickRun is @Nullable; never hand a null element on to
+                 // callers of this @Nonnull list.
+                 if (rc != null)
+                     rcs.add(rc);
+             } catch (RuntimeException e) {
+                 // pickRun has already logged the problem; skip this run.
+                 continue;
+             }
+         }
+         return rcs;
+     } catch (RuntimeException e) {
+         log.warn("problem in fetch", e);
+         throw e;
+     }
+ }
+
+ // -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
+
+ /**
+  * Look up a workflow run and rebuild its handle from the stored record.
+  *
+  * @param name
+  *            The identifier of the run.
+  * @return The run handle, or <tt>null</tt> if there is no such run or if
+  *         anything goes wrong while fetching or rebuilding it.
+  */
+ @Nullable
+ @WithinSingleTransaction
+ public TavernaRun get(String name) {
+     try {
+         RunConnection record = pickRun(name);
+         if (record != null) {
+             return record.fromDBform(facade);
+         }
+     } catch (Exception ignored) {
+         // Every failure is reported to the caller as "no such run".
+     }
+     return null;
+ }
+
+ /**
+  * Get the runs that a user can read things from.
+  * <p>
+  * Runs whose record cannot be found, or whose handle cannot be rebuilt or
+  * checked against the policy, are left out of the result rather than
+  * failing the whole listing.
+  *
+  * @param user
+  *            Who is asking?
+  * @param p
+  *            The policy that determines what they can see.
+  * @return A mapping from run IDs to run handles.
+  */
+ @Nonnull
+ @WithinSingleTransaction
+ public Map<String, TavernaRun> listRuns(UsernamePrincipal user, Policy p) {
+     Map<String, TavernaRun> result = new HashMap<>();
+     for (String id : names()) {
+         if (id == null)
+             continue;
+         try {
+             RunConnection rc = pickRun(id);
+             // pickRun is @Nullable (and logs the miss itself); check
+             // explicitly instead of relying on a caught
+             // NullPointerException.
+             if (rc == null)
+                 continue;
+             RemoteRunDelegate rrd = rc.fromDBform(facade);
+             if (p.permitAccess(user, rrd))
+                 result.put(id, rrd);
+         } catch (Exception e) {
+             // Best effort: one unreadable run must not hide the others.
+             if (log.isDebugEnabled())
+                 log.debug("skipping run " + id + " while listing runs", e);
+         }
+     }
+     return result;
+ }
+
+ /**
+ * @return A list of the IDs for all workflow runs.
+ */
+ @Nonnull
+ @WithinSingleTransaction
+ public List<String> listRunNames() {
+ List<String> runNames = new ArrayList<>();
+ for (RunConnection rc : allRuns())
+ if (rc.getId() != null)
+ runNames.add(rc.getId());
+ return runNames;
+ }
+
+ /**
+  * Rebuild the handle of the first usable run record, if there is one.
+  * <p>
+  * Missing records (<tt>null</tt> elements) and records without an
+  * identifier are passed over.
+  *
+  * @return An arbitrary, representative workflow run, or <tt>null</tt> if
+  *         no usable record exists.
+  * @throws Exception
+  *             If anything goes wrong.
+  */
+ @Nullable
+ @WithinSingleTransaction
+ public RemoteRunDelegate pickArbitraryRun() throws Exception {
+     for (RunConnection rc : allRuns()) {
+         // Guard against a run that had a name but no retrievable record.
+         if (rc == null || rc.getId() == null)
+             continue;
+         return rc.fromDBform(facade);
+     }
+     return null;
+ }
+
+ /**
+  * Store a workflow run in the database by converting it to its database
+  * form and persisting that. Must only be called once per workflow run.
+  *
+  * @param rrd
+  *            The workflow run to persist.
+  * @throws IOException
+  *             If anything goes wrong with serialisation of the run.
+  */
+ @WithinSingleTransaction
+ public void persistRun(@Nonnull RemoteRunDelegate rrd) throws IOException {
+     persist(toDBform(rrd));
+ }
+
+ /**
+  * Remove the stored record of a workflow run, if it has one.
+  *
+  * @param name
+  *            The ID of the run.
+  * @return Whether a deletion happened; <tt>false</tt> when no record was
+  *         found for the ID.
+  */
+ @WithinSingleTransaction
+ public boolean unpersistRun(String name) {
+     RunConnection record = pickRun(name);
+     if (record == null) {
+         return false;
+     }
+     delete(record);
+     return true;
+ }
+
+ /**
+ * Ensure that the given workflow run is synchronized with the database.
+ *
+ * @param run
+ * The run to synchronise.
+ * @throws IOException
+ * If serialization of anything fails.
+ */
+ @WithinSingleTransaction
+ public void flushToDisk(@Nonnull RemoteRunDelegate run) throws IOException {
+ getById(run.id).makeChanges(run);
+ }
+
+ /**
+  * Remove all workflow runs that have expired.
+  * <p>
+  * For each identifier reported by the <tt>timedout</tt> query, the
+  * execution resource behind the run is destroyed on a best-effort basis
+  * (failures are only logged at debug level) and the stored record is then
+  * deleted. Identifiers whose record can no longer be found are skipped,
+  * but are still included in the returned list.
+  *
+  * @return The ids of the deleted runs.
+  */
+ @Nonnull
+ @PerfLogged
+ @WithinSingleTransaction
+ public List<String> doClean() {
+     if (log.isDebugEnabled())
+         log.debug("deleting runs that timed out before " + new Date());
+     List<String> toDelete = timedout();
+     if (log.isDebugEnabled())
+         log.debug("found " + toDelete.size() + " runs to delete");
+     for (String id : toDelete) {
+         RunConnection rc = getById(id);
+         if (rc == null) {
+             // Nothing left to destroy or delete; do not pass null on to
+             // delete().
+             if (log.isDebugEnabled())
+                 log.debug("no stored record for timed-out run " + id);
+             continue;
+         }
+         try {
+             rc.fromDBform(facade).run.destroy();
+         } catch (Exception e) {
+             if (log.isDebugEnabled())
+                 log.debug("failed to delete execution resource for " + id,
+                         e);
+         }
+         // The record goes even if the execution resource could not be
+         // destroyed.
+         delete(rc);
+     }
+     return toDelete;
+ }
+
+ /**
+  * Rebuild the handles of the runs reported by the <tt>unterminated</tt>
+  * query. Runs whose handle cannot be rebuilt are logged at warning level
+  * and left out.
+  *
+  * @return A list of workflow runs that are candidates for doing
+  *         notification of termination.
+  */
+ @Nonnull
+ @PerfLogged
+ @WithinSingleTransaction
+ public List<RemoteRunDelegate> getPotentiallyNotifiable() {
+     List<RemoteRunDelegate> toNotify = new ArrayList<>();
+     for (String id : unterminated())
+         try {
+             RunConnection rc = getById(id);
+             toNotify.add(rc.fromDBform(facade));
+         } catch (Exception e) {
+             // The two halves of this message used to be joined without a
+             // separating space ("tokenfor").
+             log.warn("failed to fetch connection token "
+                     + "for notification of completion check", e);
+         }
+     return toNotify;
+ }
+
+ /**
+  * Record that the given runs have finished.
+  * <p>
+  * For each identifier with a stored record, the handle rebuilt from that
+  * record gets its <tt>doneTransitionToFinished</tt> flag set and the
+  * record itself is marked as finished. Identifiers without a record are
+  * ignored; failures are logged at warning level and do not stop the
+  * processing of the remaining identifiers.
+  *
+  * @param terminated
+  *            The identifiers of the runs that have terminated.
+  */
+ @PerfLogged
+ @WithinSingleTransaction
+ public void markFinished(@Nonnull Set<String> terminated) {
+     for (String runId : terminated) {
+         RunConnection record = getById(runId);
+         if (record != null) {
+             try {
+                 record.fromDBform(facade).doneTransitionToFinished = true;
+                 record.setFinished(true);
+             } catch (Exception failure) {
+                 log.warn("failed to note termination", failure);
+             }
+         }
+     }
+ }
+}