You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@taverna.apache.org by st...@apache.org on 2018/01/09 23:30:43 UTC
[27/42] incubator-taverna-server git commit: package org.taverna ->
org.apache.taverna
http://git-wip-us.apache.org/repos/asf/incubator-taverna-server/blob/00397eff/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/localworker/LocalWorkerState.java
----------------------------------------------------------------------
diff --git a/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/localworker/LocalWorkerState.java b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/localworker/LocalWorkerState.java
new file mode 100644
index 0000000..e32dcca
--- /dev/null
+++ b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/localworker/LocalWorkerState.java
@@ -0,0 +1,475 @@
+/*
+ */
+package org.taverna.server.master.localworker;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import static java.io.File.separator;
+import static java.lang.System.getProperty;
+import static java.rmi.registry.Registry.REGISTRY_PORT;
+import static java.util.Arrays.asList;
+import static java.util.Collections.emptyList;
+import static java.util.Collections.unmodifiableList;
+import static org.taverna.server.master.defaults.Default.EXTRA_ARGUMENTS;
+import static org.taverna.server.master.defaults.Default.PASSWORD_FILE;
+import static org.taverna.server.master.defaults.Default.REGISTRY_JAR;
+import static org.taverna.server.master.defaults.Default.RMI_PREFIX;
+import static org.taverna.server.master.defaults.Default.RUN_LIFE_MINUTES;
+import static org.taverna.server.master.defaults.Default.RUN_OPERATING_LIMIT;
+import static org.taverna.server.master.defaults.Default.SECURE_FORK_IMPLEMENTATION_JAR;
+import static org.taverna.server.master.defaults.Default.SERVER_WORKER_IMPLEMENTATION_JAR;
+import static org.taverna.server.master.defaults.Default.SUBPROCESS_START_POLL_SLEEP;
+import static org.taverna.server.master.defaults.Default.SUBPROCESS_START_WAIT;
+import static org.taverna.server.master.localworker.PersistedState.KEY;
+import static org.taverna.server.master.localworker.PersistedState.makeInstance;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.net.URI;
+import java.util.List;
+
+import javax.annotation.PostConstruct;
+import javax.jdo.annotations.PersistenceAware;
+
+import org.springframework.beans.factory.annotation.Required;
+import org.taverna.server.master.common.Status;
+import org.taverna.server.master.defaults.Default;
+import org.taverna.server.master.utils.JDOSupport;
+import org.taverna.server.master.worker.WorkerModel;
+
+/**
+ * The persistent state of a local worker factory.
+ *
+ * @author Donal Fellows
+ */
+@PersistenceAware
+public class LocalWorkerState extends JDOSupport<PersistedState> implements
+ WorkerModel {
+ public LocalWorkerState() {
+ super(PersistedState.class);
+ }
+
+ private LocalWorkerState self;
+
+ @Required
+ public void setSelf(LocalWorkerState self) {
+ this.self = self;
+ }
+
+ /** Initial lifetime of runs, in minutes. */
+ int defaultLifetime;
+ /**
+ * Maximum number of runs that may exist at once. Note that this includes
+ * runs that exist only for the purposes of file transfer (the
+ * {@link Status#Initialized}/{@link Status#Finished} states).
+ */
+ int maxRuns;
+ /**
+ * Prefix to use for RMI names.
+ */
+ String factoryProcessNamePrefix;
+ /**
+ * Full path name of the script used to start running a workflow; normally
+ * expected to be "<i>somewhere/</i><tt>executeworkflow.sh</tt>".
+ */
+ String executeWorkflowScript;
+ /** Default value for {@link #executeWorkflowScript}. */
+ private transient String defaultExecuteWorkflowScript;
+ /**
+ * Full path name of the file containing the password used to launch workers
+ * as other users. The file is normally expected to contain a single line,
+ * the password, and to be thoroughly locked down so only the user running
+ * the server (e.g., "<tt>tomcat</tt>") can read it; it will probably reside
+ * in either the user's home directory or in a system configuration
+ * directory.
+ */
+ String passwordFile;
+ /** Default value for {@link #passwordFile}. */
+ private transient String defaultPasswordFile = PASSWORD_FILE;
+ /**
+ * The extra arguments to pass to the subprocess.
+ */
+ String[] extraArgs;
+ /**
+ * How long to wait for subprocess startup, in seconds.
+ */
+ int waitSeconds;
+ /**
+ * Polling interval to use during startup, in milliseconds.
+ */
+ int sleepMS;
+ /**
+ * Full path name to the worker process's implementation JAR.
+ */
+ String serverWorkerJar;
+ private static final String DEFAULT_WORKER_JAR = LocalWorkerState.class
+ .getClassLoader().getResource(SERVER_WORKER_IMPLEMENTATION_JAR)
+ .getFile();
+ /**
+ * Full path name to the Java binary to use to run the subprocess.
+ */
+ String javaBinary;
+ private static final String DEFAULT_JAVA_BINARY = getProperty("java.home")
+ + separator + "bin" + separator + "java";
+ /**
+ * Full path name to the secure fork process's implementation JAR.
+ */
+ String serverForkerJar;
+ private static final String DEFAULT_FORKER_JAR = LocalWorkerState.class
+ .getClassLoader().getResource(SECURE_FORK_IMPLEMENTATION_JAR)
+ .getFile();
+
+ String registryHost;
+ int registryPort;
+
+ int operatingLimit;
+
+ URI[] permittedWorkflows;
+ private String registryJar;
+ private static final String DEFAULT_REGISTRY_JAR = LocalWorkerState.class
+ .getClassLoader().getResource(REGISTRY_JAR).getFile();
+
+ @Override
+ public void setDefaultLifetime(int defaultLifetime) {
+ this.defaultLifetime = defaultLifetime;
+ if (loadedState)
+ self.store();
+ }
+
+ @Override
+ public int getDefaultLifetime() {
+ return defaultLifetime < 1 ? RUN_LIFE_MINUTES : defaultLifetime;
+ }
+
+ @Override
+ public void setMaxRuns(int maxRuns) {
+ this.maxRuns = maxRuns;
+ if (loadedState)
+ self.store();
+ }
+
+ @Override
+ public int getMaxRuns() {
+ return maxRuns < 1 ? Default.RUN_COUNT_MAX : maxRuns;
+ }
+
+ @Override
+ public int getOperatingLimit() {
+ return operatingLimit < 1 ? RUN_OPERATING_LIMIT : operatingLimit;
+ }
+
+ @Override
+ public void setOperatingLimit(int operatingLimit) {
+ this.operatingLimit = operatingLimit;
+ if (loadedState)
+ self.store();
+ }
+
+ @Override
+ public void setFactoryProcessNamePrefix(String factoryProcessNamePrefix) {
+ this.factoryProcessNamePrefix = factoryProcessNamePrefix;
+ if (loadedState)
+ self.store();
+ }
+
+ @Override
+ public String getFactoryProcessNamePrefix() {
+ return factoryProcessNamePrefix == null ? RMI_PREFIX
+ : factoryProcessNamePrefix;
+ }
+
+ @Override
+ public void setExecuteWorkflowScript(String executeWorkflowScript) {
+ this.executeWorkflowScript = executeWorkflowScript;
+ if (loadedState)
+ self.store();
+ }
+
+ @Override
+ public String getExecuteWorkflowScript() {
+ return executeWorkflowScript == null ? defaultExecuteWorkflowScript
+ : executeWorkflowScript;
+ }
+
+ private static String guessWorkflowScript() {
+ File utilDir = new File(DEFAULT_WORKER_JAR).getParentFile();
+ File[] dirs = utilDir.listFiles(new FilenameFilter() {
+ @Override
+ public boolean accept(File dir, String name) {
+ // Support both taverna-commandline* (2.5) and
+ // taverna-command-line* (3.1)
+ return name.toLowerCase().startsWith("taverna-command");
+ }
+ });
+ if (dirs.length == 0) {
+ throw new IllegalStateException("Can't find taverna-command* distro in " + utilDir);
+ }
+ File script = new File(dirs[0], "executeworkflow.sh");
+ if (! script.isFile()) {
+ throw new IllegalStateException("Can't find launcher script " + script);
+ }
+ return script.toString();
+ }
+
+ /**
+ * Set what executeworkflow script to use by default. This is the value that
+ * is used if not overridden by the administration interface.
+ *
+ * @param defaultScript
+ * Full path to the script to use.
+ */
+ public void setDefaultExecuteWorkflowScript(String defaultScript) {
+ if (defaultScript.startsWith("${") || defaultScript.equals("NONE")) {
+ this.defaultExecuteWorkflowScript = guessWorkflowScript();
+ return;
+ }
+ this.defaultExecuteWorkflowScript = defaultScript;
+ }
+
+ String getDefaultExecuteWorkflowScript() {
+ return defaultExecuteWorkflowScript;
+ }
+
+ @Override
+ public void setExtraArgs(String[] extraArgs) {
+ this.extraArgs = (extraArgs == null) ? null : extraArgs.clone(); // null = unset; getExtraArgs() then returns EXTRA_ARGUMENTS
+ if (loadedState)
+ self.store();
+ }
+
+ @Override
+ public String[] getExtraArgs() {
+ return extraArgs == null ? EXTRA_ARGUMENTS : extraArgs.clone();
+ }
+
+ @Override
+ public void setWaitSeconds(int waitSeconds) {
+ this.waitSeconds = waitSeconds;
+ if (loadedState)
+ self.store();
+ }
+
+ @Override
+ public int getWaitSeconds() {
+ return waitSeconds < 1 ? SUBPROCESS_START_WAIT : waitSeconds;
+ }
+
+ @Override
+ public void setSleepMS(int sleepMS) {
+ this.sleepMS = sleepMS;
+ if (loadedState)
+ self.store();
+ }
+
+ @Override
+ public int getSleepMS() {
+ return sleepMS < 1 ? SUBPROCESS_START_POLL_SLEEP : sleepMS;
+ }
+
+ @Override
+ public void setServerWorkerJar(String serverWorkerJar) {
+ this.serverWorkerJar = serverWorkerJar;
+ if (loadedState)
+ self.store();
+ }
+
+ @Override
+ public String getServerWorkerJar() {
+ return serverWorkerJar == null ? DEFAULT_WORKER_JAR : serverWorkerJar;
+ }
+
+ @Override
+ public void setServerForkerJar(String serverForkerJar) {
+ this.serverForkerJar = serverForkerJar;
+ if (loadedState)
+ self.store();
+ }
+
+ @Override
+ public String getServerForkerJar() {
+ return serverForkerJar == null ? DEFAULT_FORKER_JAR : serverForkerJar;
+ }
+
+ @Override
+ public void setJavaBinary(String javaBinary) {
+ this.javaBinary = javaBinary;
+ if (loadedState)
+ self.store();
+ }
+
+ @Override
+ public String getJavaBinary() {
+ return javaBinary == null ? DEFAULT_JAVA_BINARY : javaBinary;
+ }
+
+ @Override
+ public void setPasswordFile(String passwordFile) {
+ this.passwordFile = passwordFile;
+ if (loadedState)
+ self.store();
+ }
+
+ @Override
+ public String getPasswordFile() {
+ return passwordFile == null ? defaultPasswordFile : passwordFile;
+ }
+
+ void setDefaultPasswordFile(String defaultPasswordFile) {
+ this.defaultPasswordFile = defaultPasswordFile;
+ }
+
+ @Override
+ public void setRegistryHost(String registryHost) {
+ this.registryHost = (registryHost == null ? "" : registryHost);
+ if (loadedState)
+ self.store();
+ }
+
+ @Override
+ public String getRegistryHost() {
+ return (registryHost == null || registryHost.isEmpty()) ? null
+ : registryHost;
+ }
+
+ @Override
+ public void setRegistryPort(int registryPort) {
+ this.registryPort = ((registryPort < 1 || registryPort > 65534) ? REGISTRY_PORT
+ : registryPort);
+ if (loadedState)
+ self.store();
+ }
+
+ @Override
+ public int getRegistryPort() {
+ return registryPort == 0 ? REGISTRY_PORT : registryPort;
+ }
+
+ @Override
+ public String getRegistryJar() {
+ return registryJar == null ? DEFAULT_REGISTRY_JAR : registryJar;
+ }
+
+ @Override
+ public void setRegistryJar(String rmiRegistryJar) {
+ this.registryJar = (rmiRegistryJar == null || rmiRegistryJar.isEmpty()) ? null
+ : rmiRegistryJar;
+ if (loadedState)
+ self.store();
+ }
+
+ @Override
+ public List<URI> getPermittedWorkflowURIs() {
+ if (permittedWorkflows == null || permittedWorkflows.length == 0)
+ return emptyList();
+ return unmodifiableList(asList(permittedWorkflows));
+ }
+
+ @Override
+ public void setPermittedWorkflowURIs(List<URI> permittedWorkflows) {
+ if (permittedWorkflows == null || permittedWorkflows.isEmpty())
+ this.permittedWorkflows = new URI[0];
+ else
+ this.permittedWorkflows = permittedWorkflows
+ .toArray(new URI[permittedWorkflows.size()]);
+ if (loadedState)
+ self.store();
+ }
+
+ public static final boolean DEFAULT_GENERATE_PROVENANCE = false;
+ private Boolean generateProvenance;
+
+ @Override
+ public boolean getGenerateProvenance() {
+ Boolean g = generateProvenance;
+ return g == null ? DEFAULT_GENERATE_PROVENANCE : (boolean) g;
+ }
+
+ @Override
+ public void setGenerateProvenance(boolean generate) {
+ this.generateProvenance = generate;
+ if (loadedState)
+ self.store();
+ }
+
+ // --------------------------------------------------------------
+
+ private boolean loadedState;
+
+ @PostConstruct
+ @WithinSingleTransaction
+ public void load() {
+ if (loadedState || !isPersistent()) // skip when already loaded, or when the inherited isPersistent() says no
+ return;
+ WorkerModel state = getById(KEY);
+ if (state == null) { // nothing has been saved under KEY yet
+ store(); // save the current field values instead; store() also sets loadedState
+ return;
+ }
+
+ defaultLifetime = state.getDefaultLifetime(); // from here on: copy every saved setting into this object's fields
+ executeWorkflowScript = state.getExecuteWorkflowScript();
+ extraArgs = state.getExtraArgs();
+ factoryProcessNamePrefix = state.getFactoryProcessNamePrefix();
+ javaBinary = state.getJavaBinary();
+ maxRuns = state.getMaxRuns();
+ serverWorkerJar = state.getServerWorkerJar();
+ serverForkerJar = state.getServerForkerJar();
+ passwordFile = state.getPasswordFile();
+ sleepMS = state.getSleepMS();
+ waitSeconds = state.getWaitSeconds();
+ registryHost = state.getRegistryHost();
+ registryPort = state.getRegistryPort();
+ operatingLimit = state.getOperatingLimit();
+ List<URI> pwu = state.getPermittedWorkflowURIs();
+ permittedWorkflows = (URI[]) pwu.toArray(new URI[pwu.size()]);
+ registryJar = state.getRegistryJar();
+ generateProvenance = state.getGenerateProvenance(); // boxed from a boolean, so never null after a load
+
+ loadedState = true; // the public setters call self.store() only once this flag is set
+ }
+
+ @WithinSingleTransaction
+ public void store() {
+ if (!isPersistent()) // same guard as load(): do nothing when the inherited isPersistent() says no
+ return;
+ WorkerModel state = getById(KEY);
+ if (state == null) // first save: no record under KEY yet
+ state = persist(makeInstance()); // makeInstance() yields a PersistedState whose ID is KEY
+
+ state.setDefaultLifetime(defaultLifetime); // raw field values are written, not the getter results with defaults applied
+ state.setExecuteWorkflowScript(executeWorkflowScript);
+ state.setExtraArgs(extraArgs);
+ state.setFactoryProcessNamePrefix(factoryProcessNamePrefix);
+ state.setJavaBinary(javaBinary);
+ state.setMaxRuns(maxRuns);
+ state.setServerWorkerJar(serverWorkerJar);
+ state.setServerForkerJar(serverForkerJar);
+ state.setPasswordFile(passwordFile);
+ state.setSleepMS(sleepMS);
+ state.setWaitSeconds(waitSeconds);
+ state.setRegistryHost(registryHost);
+ state.setRegistryPort(registryPort);
+ state.setOperatingLimit(operatingLimit);
+ if (permittedWorkflows != null) // leave the saved list untouched while none has been set here
+ state.setPermittedWorkflowURIs(asList(permittedWorkflows));
+ state.setRegistryJar(registryJar);
+ if (generateProvenance != null) // null means "never set"; the saved value is then left alone
+ state.setGenerateProvenance(generateProvenance);
+
+ loadedState = true; // enables the write-through in the public setters
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-taverna-server/blob/00397eff/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/localworker/PersistedState.java
----------------------------------------------------------------------
diff --git a/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/localworker/PersistedState.java b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/localworker/PersistedState.java
new file mode 100644
index 0000000..3ed4c51
--- /dev/null
+++ b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/localworker/PersistedState.java
@@ -0,0 +1,270 @@
+/*
+ */
+package org.taverna.server.master.localworker;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+
+import javax.jdo.annotations.Join;
+import javax.jdo.annotations.PersistenceCapable;
+import javax.jdo.annotations.Persistent;
+import javax.jdo.annotations.PrimaryKey;
+
+import org.taverna.server.master.worker.WorkerModel;
+
+/**
+ * The actual database connector for persisted local worker state.
+ *
+ * @author Donal Fellows
+ */
+/*
+ * WARNING! If you change the name of this class, update persistence.xml as
+ * well!
+ */
+@PersistenceCapable(table = PersistedState.TABLE)
+class PersistedState implements WorkerModel {
+ static final String TABLE = "LOCALWORKERSTATE__PERSISTEDSTATE";
+
+ static PersistedState makeInstance() {
+ PersistedState o = new PersistedState();
+ o.ID = KEY;
+ return o;
+ }
+
+ @PrimaryKey(column = "ID")
+ protected int ID;
+
+ static final int KEY = 32;
+
+ @Persistent
+ private int defaultLifetime;
+ @Persistent
+ private int maxRuns;
+ @Persistent
+ private String factoryProcessNamePrefix;
+ @Persistent
+ private String executeWorkflowScript;
+ @Persistent(serialized = "true")
+ private String[] extraArgs;
+ @Persistent
+ private int waitSeconds;
+ @Persistent
+ private int sleepMS;
+ @Persistent
+ private String serverWorkerJar;
+ @Persistent
+ private String serverForkerJar;
+ @Persistent
+ private String registryJar;
+ @Persistent
+ private String passwordFile;
+ @Persistent
+ private String javaBinary;
+ @Persistent
+ private int registryPort;
+ @Persistent
+ private String registryHost;
+ @Persistent
+ private int operatingLimit;
+ @Persistent(defaultFetchGroup = "true")
+ @Join(table = TABLE + "_PERMWFURI", column = "ID")
+ private String[] permittedWorkflows;
+ @Persistent
+ private int generateProvenance;
+
+ @Override
+ public void setDefaultLifetime(int defaultLifetime) {
+ this.defaultLifetime = defaultLifetime;
+ }
+
+ @Override
+ public int getDefaultLifetime() {
+ return defaultLifetime;
+ }
+
+ @Override
+ public void setMaxRuns(int maxRuns) {
+ this.maxRuns = maxRuns;
+ }
+
+ @Override
+ public int getMaxRuns() {
+ return maxRuns;
+ }
+
+ @Override
+ public void setFactoryProcessNamePrefix(String factoryProcessNamePrefix) {
+ this.factoryProcessNamePrefix = factoryProcessNamePrefix;
+ }
+
+ @Override
+ public String getFactoryProcessNamePrefix() {
+ return factoryProcessNamePrefix;
+ }
+
+ @Override
+ public void setExecuteWorkflowScript(String executeWorkflowScript) {
+ this.executeWorkflowScript = executeWorkflowScript;
+ }
+
+ @Override
+ public String getExecuteWorkflowScript() {
+ return executeWorkflowScript;
+ }
+
+ @Override
+ public void setExtraArgs(String[] extraArgs) {
+ this.extraArgs = extraArgs;
+ }
+
+ @Override
+ public String[] getExtraArgs() {
+ return extraArgs;
+ }
+
+ @Override
+ public void setWaitSeconds(int waitSeconds) {
+ this.waitSeconds = waitSeconds;
+ }
+
+ @Override
+ public int getWaitSeconds() {
+ return waitSeconds;
+ }
+
+ @Override
+ public void setSleepMS(int sleepMS) {
+ this.sleepMS = sleepMS;
+ }
+
+ @Override
+ public int getSleepMS() {
+ return sleepMS;
+ }
+
+ @Override
+ public void setServerWorkerJar(String serverWorkerJar) {
+ this.serverWorkerJar = serverWorkerJar;
+ }
+
+ @Override
+ public String getServerWorkerJar() {
+ return serverWorkerJar;
+ }
+
+ @Override
+ public void setJavaBinary(String javaBinary) {
+ this.javaBinary = javaBinary;
+ }
+
+ @Override
+ public String getJavaBinary() {
+ return javaBinary;
+ }
+
+ @Override
+ public void setRegistryPort(int registryPort) {
+ this.registryPort = registryPort;
+ }
+
+ @Override
+ public int getRegistryPort() {
+ return registryPort;
+ }
+
+ @Override
+ public void setRegistryHost(String registryHost) {
+ this.registryHost = registryHost;
+ }
+
+ @Override
+ public String getRegistryHost() {
+ return registryHost;
+ }
+
+ @Override
+ public void setServerForkerJar(String serverForkerJar) {
+ this.serverForkerJar = serverForkerJar;
+ }
+
+ @Override
+ public String getServerForkerJar() {
+ return serverForkerJar;
+ }
+
+ @Override
+ public void setPasswordFile(String passwordFile) {
+ this.passwordFile = passwordFile;
+ }
+
+ @Override
+ public String getPasswordFile() {
+ return passwordFile;
+ }
+
+ @Override
+ public void setOperatingLimit(int operatingLimit) {
+ this.operatingLimit = operatingLimit;
+ }
+
+ @Override
+ public int getOperatingLimit() {
+ return operatingLimit;
+ }
+
+ @Override
+ public List<URI> getPermittedWorkflowURIs() {
+ // Read the field once; an absent array is reported as an empty, mutable list
+ final String[] stored = this.permittedWorkflows;
+ final int count = (stored == null) ? 0 : stored.length;
+ final List<URI> result = new ArrayList<>(count);
+ for (int i = 0; i < count; i++)
+ result.add(URI.create(stored[i]));
+ return result;
+ }
+
+ @Override
+ public void setPermittedWorkflowURIs(List<URI> permittedWorkflows) {
+ // A null list is stored as an empty array, matching LocalWorkerState's setter
+ String[] pw = new String[permittedWorkflows == null ? 0 : permittedWorkflows.size()];
+ for (int i = 0; i < pw.length; i++) pw[i] = permittedWorkflows.get(i).toString();
+ this.permittedWorkflows = pw;
+ }
+
+ @Override
+ public String getRegistryJar() {
+ return registryJar;
+ }
+
+ @Override
+ public void setRegistryJar(String registryJar) {
+ this.registryJar = registryJar;
+ }
+
+ @Override
+ public boolean getGenerateProvenance() {
+ return generateProvenance > 0;
+ }
+
+ @Override
+ public void setGenerateProvenance(boolean generateProvenance) {
+ this.generateProvenance = (generateProvenance ? 1 : 0);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-taverna-server/blob/00397eff/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/localworker/StreamLogger.java
----------------------------------------------------------------------
diff --git a/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/localworker/StreamLogger.java b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/localworker/StreamLogger.java
new file mode 100644
index 0000000..e563965
--- /dev/null
+++ b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/localworker/StreamLogger.java
@@ -0,0 +1,78 @@
+package org.taverna.server.master.localworker;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import static java.lang.Thread.interrupted;
+import static org.apache.commons.logging.LogFactory.getLog;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+
+import org.apache.commons.logging.Log;
+
+abstract class StreamLogger {
+ protected final Log log;
+ private Thread t;
+ private InputStream in;
+
+ protected StreamLogger(final String name, InputStream is) {
+ log = getLog("Taverna.Server.LocalWorker." + name);
+ in = is;
+ t = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ try (BufferedReader br = new BufferedReader(
+ new InputStreamReader(in))) { // no charset given, so the platform default decoding is used
+ String line;
+ while (!interrupted() && (line = br.readLine()) != null)
+ if (!line.isEmpty()) // blank lines are dropped, so write() never sees an empty message
+ write(line);
+ } catch (IOException e) {
+ // Do nothing: stop() closes the stream, which can surface here as an IOException
+ } catch (Exception e) {
+ log.warn("failure in reading from " + name, e);
+ }
+ }
+ }, name + ".StreamLogger");
+ t.setContextClassLoader(null);
+ t.setDaemon(true); // daemon thread: it does not keep the JVM alive
+ t.start(); // NOTE(review): started before subclass constructors finish, so write() may run on a partly built object
+ }
+
+ /**
+ * Write a line read from the subprocess to the log.
+ * <p>
+ * This needs to be implemented by subclasses in order for the log to be
+ * correctly written with the class name.
+ *
+ * @param msg
+ * The message to write. Guaranteed to have no newline characters
+ * in it and to be non-empty.
+ */
+ protected abstract void write(String msg);
+
+ public void stop() {
+ log.info("trying to close down " + t.getName());
+ t.interrupt(); // lets the reader loop's interrupted() check end the loop on its next pass
+ try {
+ in.close(); // presumably also needed to break a read blocked in readLine() -- TODO confirm
+ } catch (IOException e) { // ignored on purpose: a failed close is not reported
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-taverna-server/blob/00397eff/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/localworker/package-info.java
----------------------------------------------------------------------
diff --git a/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/localworker/package-info.java b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/localworker/package-info.java
new file mode 100644
index 0000000..031ce34
--- /dev/null
+++ b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/localworker/package-info.java
@@ -0,0 +1,23 @@
+/*
+ */
+/**
+ * Implementation of a Taverna Server back-end that works by forking off
+ * workflow executors on the local system.
+ */
+package org.taverna.server.master.localworker;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
http://git-wip-us.apache.org/repos/asf/incubator-taverna-server/blob/00397eff/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/notification/EmailDispatcher.java
----------------------------------------------------------------------
diff --git a/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/notification/EmailDispatcher.java b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/notification/EmailDispatcher.java
new file mode 100644
index 0000000..2a51496
--- /dev/null
+++ b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/notification/EmailDispatcher.java
@@ -0,0 +1,126 @@
+/*
+ */
+package org.taverna.server.master.notification;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import static javax.ws.rs.core.MediaType.TEXT_PLAIN;
+
+import javax.annotation.PostConstruct;
+
+import org.springframework.beans.factory.annotation.Required;
+import org.springframework.mail.MailSender;
+import org.springframework.mail.SimpleMailMessage;
+import org.springframework.mail.javamail.JavaMailSender;
+
+/**
+ * How to send a plain text message by email to someone.
+ *
+ * @author Donal Fellows
+ */
+public class EmailDispatcher extends RateLimitedDispatcher {
+ @Override
+ public String getName() {
+ return "mailto";
+ }
+
+ /**
+ * @param from
+ * Email address that the notification is to come from.
+ */
+ @Required
+ public void setFrom(String from) {
+ this.from = valid(from, "");
+ }
+
+ /**
+ * @param host
+ * The outgoing SMTP server address.
+ */
+ @Required
+ public void setSmtpHost(String host) {
+ this.host = valid(host, "");
+ }
+
+ /**
+ * @param contentType
+ * The content type of the message to be sent. For example, "
+ * <tt>text/plain</tt>".
+ */
+ public void setMessageContentType(String contentType) {
+ this.contentType = contentType;
+ }
+
+ /**
+ * @param sender
+ * the sender to set
+ */
+ public void setSender(MailSender sender) {
+ this.sender = sender;
+ }
+
+ private String from;
+ private String host;
+ private MailSender sender;
+ @SuppressWarnings("unused")
+ private String contentType = TEXT_PLAIN;
+
+ /**
+ * Try to perform the lookup of the email service. This is called during
+ * configuration so that any failure happens at a useful, predictable time.
+ */
+ @PostConstruct
+ public void tryLookup() {
+ if (!isAvailable()) {
+ log.warn("no mail support; disabling email dispatch");
+ sender = null;
+ return;
+ }
+ try {
+ if (sender instanceof JavaMailSender)
+ ((JavaMailSender) sender).createMimeMessage();
+ } catch (Throwable t) {
+ log.warn("sender having problems constructing messages; "
+ + "disabling...", t);
+ sender = null;
+ }
+ }
+
+ @Override
+ public void dispatch(String messageSubject, String messageContent, String to)
+ throws Exception {
+ // Simple checks for acceptability, made on the trimmed address that is actually used
+ String recipient = (to == null) ? "" : to.trim(); // a null address is rejected below, not an NPE
+ if (!recipient.matches(".+@.+")) {
+ log.info("did not send email notification: improper email address \""
+ + to + "\"");
+ return;
+ }
+ SimpleMailMessage message = new SimpleMailMessage();
+ message.setFrom(from);
+ message.setTo(recipient);
+ message.setSubject(messageSubject);
+ message.setText(messageContent);
+ sender.send(message);
+ }
+
+ @Override
+ public boolean isAvailable() {
+ return (host != null && !host.isEmpty() && sender != null
+ && from != null && !from.isEmpty());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-taverna-server/blob/00397eff/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/notification/JabberDispatcher.java
----------------------------------------------------------------------
diff --git a/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/notification/JabberDispatcher.java b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/notification/JabberDispatcher.java
new file mode 100644
index 0000000..711f4e6
--- /dev/null
+++ b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/notification/JabberDispatcher.java
@@ -0,0 +1,153 @@
+/*
+ */
+
+package org.taverna.server.master.notification;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.jivesoftware.smack.Chat;
+import org.jivesoftware.smack.ConnectionConfiguration;
+import org.jivesoftware.smack.MessageListener;
+import org.jivesoftware.smack.XMPPConnection;
+import org.jivesoftware.smack.packet.Message;
+import org.taverna.server.master.interfaces.MessageDispatcher;
+import org.taverna.server.master.interfaces.TavernaRun;
+
+/**
+ * Send notifications by Jabber/XMPP.
+ * <p>
+ * The connection to the XMPP service is made once, in {@link #setup()}, and
+ * torn down in {@link #close()}. The dispatcher reports itself as available
+ * only while that connection is held.
+ *
+ * @author Donal Fellows
+ */
+public class JabberDispatcher implements MessageDispatcher {
+    @Override
+    public String getName() {
+        return "xmpp";
+    }
+
+    private Log log = LogFactory.getLog("Taverna.Server.Notification");
+    /** The live connection, or <tt>null</tt> when XMPP dispatch is disabled. */
+    private XMPPConnection conn;
+    private String resource = "TavernaServer";
+    private String host = "";
+    private String user = "";
+    private String pass = "";
+
+    /**
+     * Normalise a configuration value.
+     *
+     * @param value
+     *            The raw configured value; may be <tt>null</tt>.
+     * @return The trimmed value, or the empty string if the value is
+     *         <tt>null</tt>, blank, or starts with <tt>$</tt>.
+     */
+    private static String configured(String value) {
+        if (value == null)
+            return "";
+        String trimmed = value.trim();
+        return trimmed.startsWith("$") ? "" : trimmed;
+    }
+
+    /**
+     * @param resource
+     *            The XMPP resource to use when connecting the server. This
+     *            defaults to "<tt>TavernaServer</tt>".
+     */
+    public void setResource(String resource) {
+        this.resource = resource;
+    }
+
+    /**
+     * @param service
+     *            The XMPP service URL.
+     */
+    public void setHost(String service) {
+        this.host = configured(service);
+    }
+
+    /**
+     * @param user
+     *            The user identity to use with the XMPP service.
+     */
+    public void setUsername(String user) {
+        this.user = configured(user);
+    }
+
+    /**
+     * @param pass
+     *            The password to use with the XMPP service.
+     */
+    public void setPassword(String pass) {
+        this.pass = configured(pass);
+    }
+
+    /**
+     * Connect and log in to the XMPP service. On incomplete configuration or
+     * on any failure the dispatcher is left without a new connection; a
+     * connection that was opened but could not be logged in is disconnected
+     * again rather than abandoned.
+     */
+    @PostConstruct
+    void setup() {
+        if (host.isEmpty() || user.isEmpty() || pass.isEmpty()) {
+            log.info("disabling XMPP support; incomplete configuration");
+            conn = null;
+            return;
+        }
+
+        XMPPConnection c = null;
+        try {
+            ConnectionConfiguration cfg = new ConnectionConfiguration(host);
+            cfg.setSendPresence(false);
+            c = new XMPPConnection(cfg);
+            c.connect();
+            c.login(user, pass, resource);
+        } catch (Exception e) {
+            log.info("failed to connect to XMPP server", e);
+            // connect() may have succeeded before login() failed.
+            release(c);
+            return;
+        }
+        conn = c;
+        log.info("connected to XMPP service <" + host + "> as user <"
+                + user + ">");
+    }
+
+    /**
+     * Best-effort disconnect of a connection that is being discarded.
+     *
+     * @param c
+     *            The connection to discard; may be <tt>null</tt>.
+     */
+    private void release(XMPPConnection c) {
+        if (c == null)
+            return;
+        try {
+            if (c.isConnected())
+                c.disconnect();
+        } catch (Exception e) {
+            log.debug("problem while discarding failed XMPP connection", e);
+        }
+    }
+
+    /** Disconnect from the XMPP service, if connected. */
+    @PreDestroy
+    public void close() {
+        if (conn != null)
+            conn.disconnect();
+        conn = null;
+    }
+
+    @Override
+    public boolean isAvailable() {
+        return conn != null;
+    }
+
+    /**
+     * Send a chat message to an XMPP user.
+     *
+     * @param ignored
+     *            The originating run; not used.
+     * @param messageSubject
+     *            The subject of the message.
+     * @param messageContent
+     *            The body of the message.
+     * @param targetParameter
+     *            The XMPP identity to open the chat with.
+     * @throws Exception
+     *             If there is no connection (an
+     *             {@link IllegalStateException}) or if sending fails.
+     */
+    @Override
+    public void dispatch(TavernaRun ignored, String messageSubject,
+            String messageContent, String targetParameter) throws Exception {
+        XMPPConnection c = conn;
+        if (c == null)
+            throw new IllegalStateException(
+                    "XMPP dispatch is not available; not connected");
+        Chat chat = c.getChatManager().createChat(targetParameter,
+                new DroppingListener());
+        Message m = new Message();
+        m.addBody(null, messageContent);
+        m.setSubject(messageSubject);
+        chat.sendMessage(m);
+    }
+
+    /** Listener that ignores (but debug-logs) any message sent back to us. */
+    static class DroppingListener implements MessageListener {
+        private Log log = LogFactory
+                .getLog("Taverna.Server.Notification.Jabber");
+
+        @Override
+        public void processMessage(Chat chat, Message message) {
+            if (log.isDebugEnabled())
+                log.debug("unexpectedly received XMPP message from <"
+                        + message.getFrom() + ">; ignoring");
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-taverna-server/blob/00397eff/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/notification/NotificationEngine.java
----------------------------------------------------------------------
diff --git a/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/notification/NotificationEngine.java b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/notification/NotificationEngine.java
new file mode 100644
index 0000000..067d154
--- /dev/null
+++ b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/notification/NotificationEngine.java
@@ -0,0 +1,158 @@
+/*
+ */
+package org.taverna.server.master.notification;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.springframework.beans.factory.annotation.Required;
+import org.taverna.server.master.interfaces.MessageDispatcher;
+import org.taverna.server.master.interfaces.TavernaRun;
+
+/**
+ * A common object for handling dispatch of event-driven messages.
+ *
+ * @author Donal Fellows
+ */
+public class NotificationEngine {
+ private Log log = LogFactory.getLog("Taverna.Server.Notification");
+ private Map<String, MessageDispatcher> dispatchers;
+ private List<MessageDispatcher> universalDispatchers;
+
+ /**
+ * @param dispatchers
+ * The various dispatchers we want to install.
+ */
+ @Required
+ public void setDispatchers(List<MessageDispatcher> dispatchers) {
+ this.dispatchers = new HashMap<>();
+ for (MessageDispatcher d : dispatchers)
+ this.dispatchers.put(d.getName(), d);
+ }
+
+ /**
+ * @param dispatcherList
+ * A list of dispatch objects to always dispatch to.
+ */
+ @Required
+ public void setUniversalDispatchers(List<MessageDispatcher> dispatcherList) {
+ this.universalDispatchers = dispatcherList;
+ }
+
+ private void dispatchToChosenTarget(TavernaRun originator, String scheme,
+ String target, Message message) throws Exception {
+ try {
+ MessageDispatcher d = dispatchers.get(scheme);
+ if (d != null && d.isAvailable())
+ d.dispatch(originator, message.getTitle(scheme),
+ message.getContent(scheme), target);
+ else
+ log.warn("no such notification dispatcher for " + scheme);
+ } catch (URISyntaxException e) {
+ // See if *someone* will handle the message
+ Exception e2 = null;
+ for (MessageDispatcher d : dispatchers.values())
+ try {
+ if (d.isAvailable()) {
+ d.dispatch(originator, message.getTitle(d.getName()),
+ message.getContent(d.getName()), scheme + ":"
+ + target);
+ return;
+ }
+ } catch (Exception ex) {
+ if (log.isDebugEnabled())
+ log.debug("failed in pseudo-directed dispatch of "
+ + scheme + ":" + target, ex);
+ e2 = ex;
+ }
+ if (e2 != null)
+ throw e2;
+ }
+ }
+
+ private void dispatchUniversally(TavernaRun originator, Message message)
+ throws Exception {
+ for (MessageDispatcher d : universalDispatchers)
+ try {
+ if (d.isAvailable())
+ d.dispatch(originator, message.getTitle(d.getName()),
+ message.getContent(d.getName()), null);
+ } catch (Exception e) {
+ log.warn("problem in universal dispatcher", e);
+ }
+ }
+
+ /**
+ * Dispatch a message over the notification fabric.
+ *
+ * @param originator
+ * What workflow run was the source of this message?
+ * @param destination
+ * Where the message should get delivered to. The correct format
+ * of this is either as a URI of some form (where the scheme
+ * determines the dispatcher) or as an invalid URI in which case
+ * it is just tried against the possibilities to see if any
+ * succeeds.
+ * @param subject
+ * The subject line of the message.
+ * @param message
+ * The plain text body of the message.
+ * @throws Exception
+ * If anything goes wrong with the dispatch process.
+ */
+ public void dispatchMessage(TavernaRun originator, String destination,
+ Message message) throws Exception {
+ if (destination != null && !destination.trim().isEmpty()) {
+ try {
+ URI toURI = new URI(destination.trim());
+ dispatchToChosenTarget(originator, toURI.getScheme(),
+ toURI.getSchemeSpecificPart(), message);
+ } catch (URISyntaxException e) {
+ // Ignore
+ }
+ }
+ dispatchUniversally(originator, message);
+ }
+
+ /**
+ * @return The message dispatchers that are actually available (i.e., not
+ * disabled by configuration somewhere).
+ */
+ public List<String> listAvailableDispatchers() {
+ ArrayList<String> result = new ArrayList<>();
+ for (Map.Entry<String, MessageDispatcher> entry : dispatchers
+ .entrySet()) {
+ if (entry.getValue().isAvailable())
+ result.add(entry.getKey());
+ }
+ return result;
+ }
+
+ public interface Message {
+ String getContent(String type);
+
+ String getTitle(String type);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-taverna-server/blob/00397eff/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/notification/RateLimitedDispatcher.java
----------------------------------------------------------------------
diff --git a/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/notification/RateLimitedDispatcher.java b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/notification/RateLimitedDispatcher.java
new file mode 100644
index 0000000..a41e23b
--- /dev/null
+++ b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/notification/RateLimitedDispatcher.java
@@ -0,0 +1,102 @@
+/*
+ */
+package org.taverna.server.master.notification;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.joda.time.DateTime;
+import org.taverna.server.master.interfaces.MessageDispatcher;
+import org.taverna.server.master.interfaces.TavernaRun;
+
+/**
+ * Rate-limiting support. Some message fabrics simply should not be used to send
+ * a lot of messages.
+ *
+ * @author Donal Fellows
+ */
+public abstract class RateLimitedDispatcher implements MessageDispatcher {
+ /** Pre-configured logger. */
+ protected Log log = LogFactory.getLog("Taverna.Server.Notification");
+ private int cooldownSeconds;
+ private Map<String, DateTime> lastSend = new HashMap<>();
+
+ String valid(String value, String def) {
+ if (value == null || value.trim().isEmpty()
+ || value.trim().startsWith("${"))
+ return def;
+ else
+ return value.trim();
+ }
+
+ /**
+ * Set how long must elapse between updates to the status of any particular
+ * user. Calls before that time are just silently dropped.
+ *
+ * @param cooldownSeconds
+ * Time to elapse, in seconds.
+ */
+ public void setCooldownSeconds(int cooldownSeconds) {
+ this.cooldownSeconds = cooldownSeconds;
+ }
+
+ /**
+ * Test whether the rate limiter allows the given user to send a message.
+ *
+ * @param who
+ * Who wants to send the message?
+ * @return <tt>true</tt> iff they are permitted.
+ */
+ protected boolean isSendAllowed(String who) {
+ DateTime now = new DateTime();
+ synchronized (lastSend) {
+ DateTime last = lastSend.get(who);
+ if (last != null) {
+ if (!now.isAfter(last.plusSeconds(cooldownSeconds)))
+ return false;
+ }
+ lastSend.put(who, now);
+ }
+ return true;
+ }
+
+ @Override
+ public void dispatch(TavernaRun ignored, String messageSubject,
+ String messageContent, String target) throws Exception {
+ if (isSendAllowed(target))
+ dispatch(messageSubject, messageContent, target);
+ }
+
+ /**
+ * Dispatch a message to a recipient that doesn't care what produced it.
+ *
+ * @param messageSubject
+ * The subject of the message to send.
+ * @param messageContent
+ * The plain-text content of the message to send.
+ * @param target
+ * A description of where it is to go.
+ * @throws Exception
+ * If anything goes wrong.
+ */
+ public abstract void dispatch(String messageSubject, String messageContent,
+ String target) throws Exception;
+}
http://git-wip-us.apache.org/repos/asf/incubator-taverna-server/blob/00397eff/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/notification/SMSDispatcher.java
----------------------------------------------------------------------
diff --git a/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/notification/SMSDispatcher.java b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/notification/SMSDispatcher.java
new file mode 100644
index 0000000..559f111
--- /dev/null
+++ b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/notification/SMSDispatcher.java
@@ -0,0 +1,171 @@
+/*
+ */
+package org.taverna.server.master.notification;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import static org.taverna.server.master.defaults.Default.SMS_GATEWAY_URL;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.List;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpResponse;
+import org.apache.http.NameValuePair;
+import org.apache.http.client.entity.UrlEncodedFormEntity;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.http.message.BasicNameValuePair;
+import org.springframework.beans.factory.annotation.Required;
+
+/**
+ * Dispatch termination messages via SMS, by POSTing a form to an HTTP
+ * gateway.
+ *
+ * @author Donal Fellows
+ */
+public class SMSDispatcher extends RateLimitedDispatcher {
+    @Override
+    public String getName() {
+        return "sms";
+    }
+
+    private CloseableHttpClient client;
+    /** The gateway address, or <tt>null</tt> if it could not be parsed. */
+    private URI service;
+    private String user = "", pass = "";
+    private String usernameField = "username", passwordField = "password",
+            destinationField = "to", messageField = "text";
+
+    /**
+     * @param usernameField
+     *            The name of the field that conveys the sending username; this
+     *            is the <i>server</i>'s identity.
+     */
+    @Required
+    public void setUsernameField(String usernameField) {
+        this.usernameField = usernameField;
+    }
+
+    /**
+     * @param passwordField
+     *            The field holding the password to authenticate the server to
+     *            the SMS gateway.
+     */
+    @Required
+    public void setPasswordField(String passwordField) {
+        this.passwordField = passwordField;
+    }
+
+    /**
+     * @param destinationField
+     *            The field holding the number to send the SMS to.
+     */
+    @Required
+    public void setDestinationField(String destinationField) {
+        this.destinationField = destinationField;
+    }
+
+    /**
+     * @param messageField
+     *            The field holding the plain-text message to send.
+     */
+    @Required
+    public void setMessageField(String messageField) {
+        this.messageField = messageField;
+    }
+
+    /**
+     * @param serviceURL
+     *            The address of the SMS gateway. If not usefully set, the
+     *            built-in default gateway is used. If the address cannot be
+     *            parsed as a URI, the dispatcher becomes unavailable.
+     */
+    public void setService(String serviceURL) {
+        String s = valid(serviceURL, "");
+        if (s.isEmpty()) {
+            log.warn("did not get sms.service from servlet config; using default ("
+                    + SMS_GATEWAY_URL + ")");
+            s = SMS_GATEWAY_URL;
+        }
+        try {
+            service = new URI(s);
+        } catch (URISyntaxException e) {
+            // Make the reason for the dispatcher being unavailable visible.
+            log.warn("invalid sms.service URL; disabling SMS dispatch", e);
+            service = null;
+        }
+    }
+
+    /**
+     * @param user
+     *            The username that identifies the server to the gateway.
+     */
+    public void setUser(String user) {
+        this.user = valid(user, "");
+    }
+
+    /**
+     * @param pass
+     *            The password that authenticates the server to the gateway.
+     */
+    public void setPassword(String pass) {
+        this.pass = valid(pass, "");
+    }
+
+    @PostConstruct
+    void init() {
+        client = HttpClientBuilder.create().build();
+    }
+
+    @PreDestroy
+    void close() throws IOException {
+        try {
+            if (client != null)
+                client.close();
+        } finally {
+            client = null;
+        }
+    }
+
+    @Override
+    public boolean isAvailable() {
+        return service != null && !user.isEmpty() && !pass.isEmpty();
+    }
+
+    /**
+     * Send an SMS through the gateway. Sends from this dispatcher share a
+     * single rate limit (the key "<tt>anyone</tt>"); a call made during the
+     * cooldown is silently dropped.
+     *
+     * @param messageSubject
+     *            Not used.
+     * @param messageContent
+     *            The plain-text message to send.
+     * @param targetParameter
+     *            The phone number to send to; must consist only of the
+     *            digits 0-9.
+     * @throws Exception
+     *             If the phone number is missing or malformed, or if talking
+     *             to the gateway fails.
+     */
+    @Override
+    public void dispatch(String messageSubject, String messageContent,
+            String targetParameter) throws Exception {
+        // Sanity check: accept only strings made entirely of digits
+        if (targetParameter == null || !targetParameter.matches("[0-9]+"))
+            throw new Exception("invalid phone number");
+
+        if (!isSendAllowed("anyone"))
+            return;
+
+        // Build the message to send
+        List<NameValuePair> params = new ArrayList<>();
+        params.add(new BasicNameValuePair(usernameField, user));
+        params.add(new BasicNameValuePair(passwordField, pass));
+        params.add(new BasicNameValuePair(destinationField, targetParameter));
+        params.add(new BasicNameValuePair(messageField, messageContent));
+
+        // Send the message
+        HttpPost post = new HttpPost(service);
+        post.setEntity(new UrlEncodedFormEntity(params, "UTF-8"));
+
+        // Closing the response releases it even when there is no entity.
+        try (org.apache.http.client.methods.CloseableHttpResponse response = client
+                .execute(post)) {
+            logFirstLine(response);
+        }
+    }
+
+    /** Log the first line of the gateway's reply, if it sent a body. */
+    private void logFirstLine(HttpResponse response) throws IOException {
+        HttpEntity entity = response.getEntity();
+        if (entity == null)
+            return;
+        try (BufferedReader reader = new BufferedReader(new InputStreamReader(
+                entity.getContent()))) {
+            log.info(reader.readLine());
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-taverna-server/blob/00397eff/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/notification/TwitterDispatcher.java
----------------------------------------------------------------------
diff --git a/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/notification/TwitterDispatcher.java b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/notification/TwitterDispatcher.java
new file mode 100644
index 0000000..a0269a5
--- /dev/null
+++ b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/notification/TwitterDispatcher.java
@@ -0,0 +1,145 @@
+/*
+ */
+package org.taverna.server.master.notification;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.util.Properties;
+
+import twitter4j.Twitter;
+import twitter4j.TwitterFactory;
+import twitter4j.conf.Configuration;
+import twitter4j.conf.PropertyConfiguration;
+import twitter4j.auth.AuthorizationFactory;
+
+/**
+ * Super simple-minded twitter dispatcher. You need to tell it your consumer key
+ * and secret as part of the connection parameters, for example via a dispatcher
+ * URN of "<tt>twitter:fred:bloggs</tt>" where <tt>fred</tt> is the key and
+ * <tt>bloggs</tt> is the secret.
+ *
+ * @author Donal Fellows
+ */
+public class TwitterDispatcher extends RateLimitedDispatcher {
+    /** Longest status text that will be posted. */
+    public static final int MAX_MESSAGE_LENGTH = 140;
+    /** Final character of a status that had to be shortened. */
+    public static final char ELLIPSIS = '\u2026';
+    public static final String ACCESS_TOKEN_PROP = "oauth.accessToken";
+    public static final String ACCESS_SECRET_PROP = "oauth.accessTokenSecret";
+
+    private String token = "";
+    private String secret = "";
+
+    @Override
+    public String getName() {
+        return "twitter";
+    }
+
+    public void setAccessToken(String token) {
+        this.token = valid(token, "");
+    }
+
+    public void setAccessSecret(String secret) {
+        this.secret = valid(secret, "");
+    }
+
+    /**
+     * @return Fresh properties holding the configured access token and
+     *         access secret.
+     * @throws NotConfiguredException
+     *             If either of them has not been set.
+     */
+    private Properties getConfig() throws NotConfiguredException {
+        boolean configured = !token.isEmpty() && !secret.isEmpty();
+        if (!configured)
+            throw new NotConfiguredException();
+        Properties props = new Properties();
+        props.setProperty(ACCESS_TOKEN_PROP, token);
+        props.setProperty(ACCESS_SECRET_PROP, secret);
+        return props;
+    }
+
+    /**
+     * Build a twitter client for the given consumer credentials, combined
+     * with the configured access token and secret.
+     *
+     * @throws Exception
+     *             If credentials are missing or the client cannot be set up.
+     */
+    private Twitter getTwitter(String key, String consumerSecret)
+            throws Exception {
+        if (key.isEmpty() || consumerSecret.isEmpty())
+            throw new NoCredentialsException();
+
+        Properties props = getConfig();
+        props.setProperty("oauth.consumerKey", key);
+        props.setProperty("oauth.consumerSecret", consumerSecret);
+
+        Configuration config = new PropertyConfiguration(props);
+        Twitter client = new TwitterFactory(config)
+                .getInstance(AuthorizationFactory.getInstance(config));
+        // Verify that we can connect!
+        client.getOAuthAccessToken();
+        return client;
+    }
+
+    // TODO: Get secret from credential manager
+    /**
+     * Post a status update. The subject is ignored, and content longer than
+     * {@link #MAX_MESSAGE_LENGTH} characters is cut short and ended with an
+     * {@link #ELLIPSIS}.
+     *
+     * @param messageSubject
+     *            Not used.
+     * @param messageContent
+     *            The text to post.
+     * @param targetParameter
+     *            The consumer key and secret, as <tt>key:secret</tt>. The key
+     *            is also what the rate limit is applied to.
+     * @throws Exception
+     *             If the credentials are missing or the post fails.
+     */
+    @Override
+    public void dispatch(String messageSubject, String messageContent,
+            String targetParameter) throws Exception {
+        String[] credentials = targetParameter.split(":", 2);
+        if (credentials.length != 2)
+            throw new Exception("missing consumer key or secret");
+        String who = credentials[0];
+        if (!isSendAllowed(who))
+            return;
+        Twitter twitter = getTwitter(who, credentials[1]);
+
+        String status = messageContent;
+        if (status.length() > MAX_MESSAGE_LENGTH) {
+            String head = status.substring(0, MAX_MESSAGE_LENGTH - 1);
+            status = head + ELLIPSIS;
+        }
+        twitter.updateStatus(status);
+    }
+
+    @Override
+    public boolean isAvailable() {
+        try {
+            // Try to create the configuration and push it through as far as
+            // confirming that we can build an access object (even if it isn't
+            // bound to a user)
+            Configuration config = new PropertyConfiguration(getConfig());
+            new TwitterFactory(config).getInstance();
+        } catch (Exception e) {
+            return false;
+        }
+        return true;
+    }
+
+    /**
+     * Indicates that the dispatcher has not been configured with service
+     * credentials.
+     *
+     * @author Donal Fellows
+     */
+    @SuppressWarnings("serial")
+    public static class NotConfiguredException extends Exception {
+        NotConfiguredException() {
+            super("not configured with xAuth key and secret; dispatch not possible");
+        }
+    }
+
+    /**
+     * Indicates that the user did not supply their credentials.
+     *
+     * @author Donal Fellows
+     */
+    @SuppressWarnings("serial")
+    public static class NoCredentialsException extends Exception {
+        NoCredentialsException() {
+            super("no consumer key and secret present; dispatch not possible");
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-taverna-server/blob/00397eff/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/notification/atom/AtomFeed.java
----------------------------------------------------------------------
diff --git a/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/notification/atom/AtomFeed.java b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/notification/atom/AtomFeed.java
new file mode 100644
index 0000000..eda6d9d
--- /dev/null
+++ b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/notification/atom/AtomFeed.java
@@ -0,0 +1,147 @@
+/*
+ */
+package org.taverna.server.master.notification.atom;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import static java.lang.String.format;
+import static java.util.UUID.randomUUID;
+import static javax.ws.rs.core.UriBuilder.fromUri;
+import static org.taverna.server.master.common.Roles.USER;
+import static org.taverna.server.master.common.Uri.secure;
+
+import java.net.URI;
+import java.util.Date;
+
+import javax.annotation.security.RolesAllowed;
+import javax.servlet.ServletContext;
+import javax.ws.rs.core.UriBuilder;
+import javax.ws.rs.core.UriInfo;
+
+import org.apache.abdera.Abdera;
+import org.apache.abdera.model.Entry;
+import org.apache.abdera.model.Feed;
+import org.springframework.beans.factory.annotation.Required;
+import org.springframework.web.context.ServletContextAware;
+import org.taverna.server.master.TavernaServerSupport;
+import org.taverna.server.master.interfaces.TavernaRun;
+import org.taverna.server.master.interfaces.UriBuilderFactory;
+import org.taverna.server.master.rest.TavernaServerREST.EventFeed;
+import org.taverna.server.master.utils.InvocationCounter.CallCounted;
+
+/**
+ * Simple REST handler that serves an Atom feed of the events generated by
+ * workflow runs, and that builds URIs relative to the server's base address.
+ *
+ * @author Donal Fellows
+ */
+public class AtomFeed implements EventFeed, UriBuilderFactory,
+        ServletContextAware {
+    /**
+     * The name of a parameter that states what address we should claim that the
+     * feed's internally-generated URIs are relative to. If not set, a default
+     * will be guessed.
+     */
+    public static final String PREFERRED_URI_PARAM = "taverna.preferredUserUri";
+    private EventDAO eventSource;
+    private TavernaServerSupport support;
+    private URI baseURI;
+    private Abdera abdera;
+    private String feedLanguage = "en";
+    /** Random component of the feed identifiers issued by this instance. */
+    private String uuid = randomUUID().toString();
+
+    @Required
+    public void setEventSource(EventDAO eventSource) {
+        this.eventSource = eventSource;
+    }
+
+    @Required
+    public void setSupport(TavernaServerSupport support) {
+        this.support = support;
+    }
+
+    public void setFeedLanguage(String language) {
+        this.feedLanguage = language;
+    }
+
+    public String getFeedLanguage() {
+        return feedLanguage;
+    }
+
+    @Required
+    public void setAbdera(Abdera abdera) {
+        this.abdera = abdera;
+    }
+
+    /**
+     * Build the feed of events for the current principal. The feed's update
+     * time is that of the most recently published event, or the current time
+     * if there are no events.
+     *
+     * @param ui
+     *            Describes the request; supplies the feed's "self" link.
+     * @return The populated feed.
+     */
+    @Override
+    @CallCounted
+    @RolesAllowed(USER)
+    public Feed getFeed(UriInfo ui) {
+        Feed feed = abdera.getFactory().newFeed();
+        feed.setTitle("events relating to workflow runs").setLanguage(
+                feedLanguage);
+        // Only letters and digits of the principal's name go into the id
+        String principalName = support.getPrincipal().toString();
+        String user = principalName.replaceAll("[^A-Za-z0-9]+", "");
+        feed.setId(format("urn:taverna-server:%s:%s", uuid, user));
+
+        org.joda.time.DateTime latest = null;
+        for (Event event : eventSource.getEvents(support.getPrincipal())) {
+            if (latest == null || event.getPublished().isAfter(latest))
+                latest = event.getPublished();
+            feed.addEntry(event.getEntry(abdera, feedLanguage));
+        }
+
+        Date updated = (latest == null) ? new Date() : latest.toDate();
+        feed.setUpdated(updated);
+        feed.addLink(ui.getAbsolutePath().toASCIIString(), "self");
+        return feed;
+    }
+
+    /**
+     * @param id
+     *            The identifier of the event to fetch.
+     * @return The Atom entry for that event of the current principal.
+     */
+    @Override
+    @CallCounted
+    @RolesAllowed(USER)
+    public Entry getEvent(String id) {
+        Event event = eventSource.getEvent(support.getPrincipal(), id);
+        return event.getEntry(abdera, feedLanguage);
+    }
+
+    @Override
+    public UriBuilder getRunUriBuilder(TavernaRun run) {
+        URI runURI = getBaseUriBuilder().path("runs/{uuid}").build(
+                run.getId());
+        return secure(fromUri(runURI));
+    }
+
+    @Override
+    public UriBuilder getBaseUriBuilder() {
+        return secure(fromUri(baseURI));
+    }
+
+    @Override
+    public String resolve(String uri) {
+        return (uri == null) ? null : secure(baseURI, uri).toString();
+    }
+
+    /**
+     * Work out the base URI from the servlet context: the
+     * {@link #PREFERRED_URI_PARAM} init parameter if present, otherwise the
+     * context path followed by <tt>/rest</tt>.
+     */
+    @Override
+    public void setServletContext(ServletContext servletContext) {
+        String preferred = servletContext
+                .getInitParameter(PREFERRED_URI_PARAM);
+        String base = (preferred != null) ? preferred : servletContext
+                .getContextPath() + "/rest";
+        baseURI = URI.create(base);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-taverna-server/blob/00397eff/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/notification/atom/Event.java
----------------------------------------------------------------------
diff --git a/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/notification/atom/Event.java b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/notification/atom/Event.java
new file mode 100644
index 0000000..f1a9d62
--- /dev/null
+++ b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/notification/atom/Event.java
@@ -0,0 +1,123 @@
+/*
+ */
+package org.taverna.server.master.notification.atom;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import static java.util.UUID.randomUUID;
+
+import java.io.Serializable;
+import java.net.URI;
+import java.util.Date;
+
+import javax.jdo.annotations.Column;
+import javax.jdo.annotations.Index;
+import javax.jdo.annotations.PersistenceCapable;
+import javax.jdo.annotations.Persistent;
+import javax.jdo.annotations.Queries;
+import javax.jdo.annotations.Query;
+
+import org.apache.abdera.Abdera;
+import org.apache.abdera.model.Entry;
+import org.joda.time.DateTime;
+import org.taverna.server.master.utils.UsernamePrincipal;
+
+/**
+ * Parent class of all events that may appear on the feed for a workflow run.
+ * Instances are persisted through JDO in the <tt>ATOM.EVENTS</tt> table; the
+ * named queries declared here all select event identifiers only.
+ *
+ * @author Donal Fellows
+ */
+@SuppressWarnings("serial")
+@PersistenceCapable(schema = "ATOM", table = "EVENTS")
+@Queries({
+        @Query(name = "eventsForUser", language = "SQL", value = "SELECT id FROM ATOM.EVENTS WHERE owner = ? ORDER BY published DESC", resultClass = String.class),
+        @Query(name = "eventForUserAndId", language = "SQL", value = "SELECT id FROM ATOM.EVENTS WHERE owner = ? AND id = ?", resultClass = String.class),
+        @Query(name = "eventsFromBefore", language = "SQL", value = "SELECT id FROM ATOM.EVENTS where published < ?", resultClass = String.class) })
+public class Event implements Serializable {
+    /**
+     * Primary key: the prefix given at construction, a dot, and a random
+     * UUID. The column holds 48 characters and the UUID text takes 36 of
+     * them, so the prefix must stay short.
+     */
+    @Persistent(primaryKey = "true")
+    @Column(length = 48)
+    private String id;
+    /** Name of the user that the event belongs to. */
+    @Persistent
+    private String owner;
+    /** Moment at which the event object was created. */
+    @Persistent
+    @Index
+    private Date published;
+    /** Body text of the event. */
+    @Persistent
+    private String message;
+    /** Headline of the event. */
+    @Persistent
+    private String title;
+    /** ASCII form of the URI of the workflow run the event is about. */
+    @Persistent
+    private String link;
+
+    /** No-argument constructor, package-visible only. */
+    Event() {
+    }
+
+    /**
+     * Initialise the identity of this event and the point at which it was
+     * published, and record what it says and about which run.
+     *
+     * @param idPrefix
+     *            A prefix for the identity of this event.
+     * @param workflowLink
+     *            The URI of the workflow run that the event relates to.
+     * @param owner
+     *            Who is the owner of this event.
+     * @param title
+     *            The title to show for the event.
+     * @param message
+     *            The content of the event.
+     */
+    Event(String idPrefix, URI workflowLink, UsernamePrincipal owner,
+            String title, String message) {
+        this.id = idPrefix + "." + randomUUID();
+        this.published = new Date();
+        this.owner = owner.getName();
+        this.title = title;
+        this.message = message;
+        this.link = workflowLink.toASCIIString();
+    }
+
+    /** @return The unique identifier of this event. */
+    public final String getId() {
+        return this.id;
+    }
+
+    /** @return The name of the user who owns this event. */
+    public final String getOwner() {
+        return this.owner;
+    }
+
+    /** @return The publication instant, as a freshly built Joda-Time value. */
+    public final DateTime getPublished() {
+        return new DateTime(this.published);
+    }
+
+    /** @return The content of this event. */
+    public String getMessage() {
+        return this.message;
+    }
+
+    /** @return The title of this event. */
+    public String getTitle() {
+        return this.title;
+    }
+
+    /** @return The URI (as a string) of the related workflow run. */
+    public String getLink() {
+        return this.link;
+    }
+
+    /**
+     * Render this event as an Atom entry. The publication time doubles as
+     * the update time, and the run link is attached with relation
+     * <tt>related</tt>.
+     *
+     * @param abdera
+     *            The Abdera instance whose factory creates the entry.
+     * @param language
+     *            The language tag applied to the author, title and content.
+     * @return The newly created entry.
+     */
+    public Entry getEntry(Abdera abdera, String language) {
+        Entry atomEntry = abdera.getFactory().newEntry();
+        // The order of these calls is kept deliberately; it may determine
+        // the order of elements in the generated document.
+        atomEntry.setId(this.id);
+        atomEntry.setPublished(this.published);
+        atomEntry.addAuthor(this.owner).setLanguage(language);
+        atomEntry.setUpdated(this.published);
+        atomEntry.setTitle(this.title).setLanguage(language);
+        atomEntry.addLink(this.link, "related").setTitle("workflow run");
+        atomEntry.setContent(this.message).setLanguage(language);
+        return atomEntry;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-taverna-server/blob/00397eff/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/notification/atom/EventDAO.java
----------------------------------------------------------------------
diff --git a/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/notification/atom/EventDAO.java b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/notification/atom/EventDAO.java
new file mode 100644
index 0000000..8bec456
--- /dev/null
+++ b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/notification/atom/EventDAO.java
@@ -0,0 +1,230 @@
+/*
+ */
+package org.taverna.server.master.notification.atom;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import static java.lang.Thread.interrupted;
+import static java.lang.Thread.sleep;
+import static java.util.Arrays.asList;
+
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+
+import javax.annotation.Nonnull;
+import javax.annotation.PreDestroy;
+import javax.jdo.annotations.PersistenceAware;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.joda.time.DateTime;
+import org.springframework.beans.factory.annotation.Required;
+import org.taverna.server.master.interfaces.MessageDispatcher;
+import org.taverna.server.master.interfaces.TavernaRun;
+import org.taverna.server.master.interfaces.UriBuilderFactory;
+import org.taverna.server.master.utils.JDOSupport;
+import org.taverna.server.master.utils.UsernamePrincipal;
+
+/**
+ * The database interface that supports the event feed. Events are not
+ * written synchronously: producers place them on a small bounded queue and a
+ * background daemon thread (created in {@link #setSelf(EventDAO)}) moves them
+ * into the database in batches.
+ *
+ * @author Donal Fellows
+ */
+@PersistenceAware
+public class EventDAO extends JDOSupport<Event> implements MessageDispatcher {
+    public EventDAO() {
+        super(Event.class);
+    }
+
+    /** @return The name of this dispatcher, always <tt>atom</tt>. */
+    @Override
+    public String getName() {
+        return "atom";
+    }
+
+    private final Log log = LogFactory.getLog("Taverna.Server.Atom");
+    private UriBuilderFactory ubf;
+    private int expiryAgeDays;
+
+    /**
+     * @param expiryAgeDays
+     *            How many days an event is kept before
+     *            {@link #deleteExpiredEvents()} removes it.
+     */
+    @Required
+    public void setExpiryAgeDays(int expiryAgeDays) {
+        this.expiryAgeDays = expiryAgeDays;
+    }
+
+    /**
+     * @param ubf
+     *            The factory used to build the URI of the run that an event
+     *            links to.
+     */
+    @Required
+    public void setUriBuilderFactory(UriBuilderFactory ubf) {
+        this.ubf = ubf;
+    }
+
+    /**
+     * Get the given user's list of events.
+     *
+     * @param user
+     *            The identity of the user to get the events for.
+     * @return A copy of the list of events currently known about.
+     */
+    @Nonnull
+    @WithinSingleTransaction
+    public List<Event> getEvents(@Nonnull UsernamePrincipal user) {
+        List<String> ids = eventsForUser(user);
+        if (log.isDebugEnabled())
+            log.debug("found " + ids.size() + " events for user " + user);
+
+        List<Event> result = new ArrayList<>();
+        for (String id : ids) {
+            Event event = getById(id);
+            result.add(detach(event));
+        }
+        return result;
+    }
+
+    /** Run the <tt>eventsForUser</tt> named query declared on {@link Event}. */
+    @SuppressWarnings("unchecked")
+    private List<String> eventsForUser(UsernamePrincipal user) {
+        return (List<String>) namedQuery("eventsForUser").execute(
+                user.getName());
+    }
+
+    /**
+     * Get a particular event.
+     *
+     * @param user
+     *            The identity of the user to get the event for.
+     * @param id
+     *            The handle of the event to look up.
+     * @return A copy of the event.
+     * @throws IllegalArgumentException
+     *             If the lookup does not yield exactly one event for that
+     *             user and identifier.
+     */
+    @Nonnull
+    @WithinSingleTransaction
+    public Event getEvent(@Nonnull UsernamePrincipal user, @Nonnull String id) {
+        List<String> ids = eventsForUserAndId(user, id);
+        if (log.isDebugEnabled())
+            log.debug("found " + ids.size() + " events for user " + user
+                    + " with id = " + id);
+
+        if (ids.size() != 1)
+            throw new IllegalArgumentException("no such id");
+        return detach(getById(ids.get(0)));
+    }
+
+    /** Run the <tt>eventForUserAndId</tt> named query declared on {@link Event}. */
+    @SuppressWarnings("unchecked")
+    private List<String> eventsForUserAndId(UsernamePrincipal user, String id) {
+        return (List<String>) namedQuery("eventForUserAndId").execute(
+                user.getName(), id);
+    }
+
+    /**
+     * Delete a particular event.
+     *
+     * @param id
+     *            The identifier of the event to delete.
+     */
+    @WithinSingleTransaction
+    public void deleteEventById(@Nonnull String id) {
+        delete(getById(id));
+    }
+
+    /**
+     * Delete all events that have expired, i.e. that were published more
+     * than the configured number of days ago.
+     */
+    @WithinSingleTransaction
+    public void deleteExpiredEvents() {
+        Date death = new DateTime().plusDays(-expiryAgeDays).toDate();
+        // UGLY SQL HACK: hand the query a java.sql.Timestamp rather than a
+        // plain java.util.Date (presumably so that the raw SQL parameter is
+        // bound with the right type).
+        death = new Timestamp(death.getTime());
+
+        List<String> ids = eventsFromBefore(death);
+        if (log.isDebugEnabled() && !ids.isEmpty())
+            log.debug("found " + ids.size()
+                    + " events to be squelched (older than " + death + ")");
+
+        for (String id : ids)
+            delete(getById(id));
+    }
+
+    /** Run the <tt>eventsFromBefore</tt> named query declared on {@link Event}. */
+    @SuppressWarnings("unchecked")
+    private List<String> eventsFromBefore(Date death) {
+        return (List<String>) namedQuery("eventsFromBefore").execute(death);
+    }
+
+    /** @return Always <tt>true</tt>. */
+    @Override
+    public boolean isAvailable() {
+        return true;
+    }
+
+    /**
+     * Hand-off buffer between the threads that raise events and the daemon
+     * that stores them. It is bounded, so producers block in
+     * <tt>put()</tt> when the daemon falls 16 events behind.
+     */
+    private final BlockingQueue<Event> insertQueue = new ArrayBlockingQueue<>(
+            16);
+
+    /**
+     * Queue a <tt>finish</tt> event for the given run. The target parameter
+     * is not used by this dispatcher.
+     */
+    @Override
+    public void dispatch(TavernaRun originator, String messageSubject,
+            String messageContent, String targetParameter) throws Exception {
+        insertQueue.put(new Event("finish", ubf.getRunUriBuilder(originator)
+                .build(), originator.getSecurityContext().getOwner(),
+                messageSubject, messageContent));
+    }
+
+    /**
+     * Queue a <tt>start</tt> event for the given run.
+     *
+     * @param originator
+     *            The run that the event is about.
+     * @param messageSubject
+     *            The title of the event.
+     * @param messageContent
+     *            The content of the event.
+     * @throws InterruptedException
+     *             If interrupted while waiting for space in the queue.
+     */
+    public void started(TavernaRun originator, String messageSubject,
+            String messageContent) throws InterruptedException {
+        insertQueue.put(new Event("start", ubf.getRunUriBuilder(originator)
+                .build(), originator.getSecurityContext().getOwner(),
+                messageSubject, messageContent));
+    }
+
+    private Thread eventDaemon;
+    /**
+     * Set by {@link #stopDaemon()} and polled by the daemon thread; it must
+     * be volatile so that the write made on one thread is guaranteed to be
+     * seen by the other.
+     */
+    private volatile boolean shuttingDown = false;
+
+    /**
+     * Install the reference through which events are stored and start the
+     * daemon thread that drains the insertion queue.
+     *
+     * @param dao
+     *            The object whose {@link #storeEvents(List)} is called to
+     *            write each batch (presumably the transactional proxy for
+     *            this bean; confirm against the bean configuration).
+     */
+    @Required
+    public void setSelf(final EventDAO dao) {
+        eventDaemon = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    while (!shuttingDown && !interrupted()) {
+                        // Block for the first event, then sweep up whatever
+                        // else has accumulated alongside it.
+                        List<Event> batch = new ArrayList<>(
+                                asList(insertQueue.take()));
+                        try {
+                            transferEvents(dao, batch);
+                        } catch (RuntimeException e) {
+                            // A storage failure must not kill this thread:
+                            // nothing else empties the queue, so producers
+                            // would end up blocked in put() for good.
+                            log.warn("failed to store notification events",
+                                    e);
+                        }
+                        sleep(5000);
+                    }
+                } catch (InterruptedException e) {
+                    // Interruption is the shutdown signal. The interrupt
+                    // status is deliberately not restored, so that the final
+                    // flush below runs on an uninterrupted thread.
+                } finally {
+                    transferEvents(dao, new ArrayList<Event>());
+                }
+            }
+        }, "ATOM event daemon");
+        eventDaemon.setContextClassLoader(null);
+        eventDaemon.setDaemon(true);
+        eventDaemon.start();
+    }
+
+    /**
+     * Move everything currently queued into the given list and store the
+     * whole list through the given DAO.
+     */
+    private void transferEvents(EventDAO dao, List<Event> e) {
+        insertQueue.drainTo(e);
+        dao.storeEvents(e);
+    }
+
+    /** Ask the daemon thread to stop, waking it if it is blocked. */
+    @PreDestroy
+    void stopDaemon() {
+        shuttingDown = true;
+        if (eventDaemon != null)
+            eventDaemon.interrupt();
+    }
+
+    /**
+     * Persist a batch of events.
+     *
+     * @param events
+     *            The events to store; may be empty.
+     */
+    @WithinSingleTransaction
+    protected void storeEvents(List<Event> events) {
+        for (Event e : events)
+            persist(e);
+        log.info("stored " + events.size() + " notification events");
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-taverna-server/blob/00397eff/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/notification/atom/package-info.java
----------------------------------------------------------------------
diff --git a/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/notification/atom/package-info.java b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/notification/atom/package-info.java
new file mode 100644
index 0000000..0a1a52f
--- /dev/null
+++ b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/notification/atom/package-info.java
@@ -0,0 +1,42 @@
+/*
+ */
+/**
+ * This package contains the Atom feed implementation within Taverna Server.
+ * @author Donal Fellows
+ */
+@XmlSchema(namespace = FEED, elementFormDefault = QUALIFIED, attributeFormDefault = QUALIFIED, xmlns = {
+ @XmlNs(prefix = "xlink", namespaceURI = XLINK),
+ @XmlNs(prefix = "ts", namespaceURI = SERVER),
+ @XmlNs(prefix = "ts-rest", namespaceURI = SERVER_REST),
+ @XmlNs(prefix = "ts-soap", namespaceURI = SERVER_SOAP),
+ @XmlNs(prefix = "feed", namespaceURI = FEED),
+ @XmlNs(prefix = "admin", namespaceURI = ADMIN) })
+package org.taverna.server.master.notification.atom;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import static javax.xml.bind.annotation.XmlNsForm.QUALIFIED;
+import static org.taverna.server.master.common.Namespaces.ADMIN;
+import static org.taverna.server.master.common.Namespaces.FEED;
+import static org.taverna.server.master.common.Namespaces.SERVER;
+import static org.taverna.server.master.common.Namespaces.SERVER_REST;
+import static org.taverna.server.master.common.Namespaces.SERVER_SOAP;
+import static org.taverna.server.master.common.Namespaces.XLINK;
+
+import javax.xml.bind.annotation.XmlNs;
+import javax.xml.bind.annotation.XmlSchema;
+
http://git-wip-us.apache.org/repos/asf/incubator-taverna-server/blob/00397eff/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/notification/package-info.java
----------------------------------------------------------------------
diff --git a/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/notification/package-info.java b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/notification/package-info.java
new file mode 100644
index 0000000..43335cf
--- /dev/null
+++ b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/notification/package-info.java
@@ -0,0 +1,23 @@
+/*
+ */
+/**
+ * The notification fabric and implementations of notification dispatchers
+ * that support subscription.
+ */
+package org.taverna.server.master.notification;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
http://git-wip-us.apache.org/repos/asf/incubator-taverna-server/blob/00397eff/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/package-info.java
----------------------------------------------------------------------
diff --git a/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/package-info.java b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/package-info.java
new file mode 100644
index 0000000..d912ac8
--- /dev/null
+++ b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/package-info.java
@@ -0,0 +1,24 @@
+/*
+ */
+/**
+ * The core of the implementation of Taverna Server, including the
+ * implementations of the SOAP and REST interfaces.
+ */
+package org.taverna.server.master;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
http://git-wip-us.apache.org/repos/asf/incubator-taverna-server/blob/00397eff/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/rest/ContentTypes.java
----------------------------------------------------------------------
diff --git a/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/rest/ContentTypes.java b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/rest/ContentTypes.java
new file mode 100644
index 0000000..d9aef82
--- /dev/null
+++ b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/rest/ContentTypes.java
@@ -0,0 +1,41 @@
+/*
+ */
+package org.taverna.server.master.rest;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import static javax.ws.rs.core.MediaType.APPLICATION_ATOM_XML;
+import static javax.ws.rs.core.MediaType.APPLICATION_JSON;
+import static javax.ws.rs.core.MediaType.APPLICATION_OCTET_STREAM;
+import static javax.ws.rs.core.MediaType.APPLICATION_XML;
+import static javax.ws.rs.core.MediaType.TEXT_PLAIN;
+
+/**
+ * Miscellaneous content type constants. Fields of an interface are
+ * implicitly <tt>public static final</tt>, so no modifiers are spelled out.
+ *
+ * @author Donal Fellows
+ */
+interface ContentTypes {
+    /** A list of URIs. */
+    String URI_LIST = "text/uri-list";
+    /** A ZIP archive. */
+    String ZIP = "application/zip";
+    /** Plain text. */
+    String TEXT = TEXT_PLAIN;
+    /** An XML document. */
+    String XML = APPLICATION_XML;
+    /** A JSON document. */
+    String JSON = APPLICATION_JSON;
+    /** Arbitrary binary data. */
+    String BYTES = APPLICATION_OCTET_STREAM;
+    /** An Atom document. */
+    String ATOM = APPLICATION_ATOM_XML;
+    /** The wf4ever research-object bundle type. */
+    String ROBUNDLE = "application/vnd.wf4ever.robundle+zip";
+}
http://git-wip-us.apache.org/repos/asf/incubator-taverna-server/blob/00397eff/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/rest/DirectoryContents.java
----------------------------------------------------------------------
diff --git a/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/rest/DirectoryContents.java b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/rest/DirectoryContents.java
new file mode 100644
index 0000000..42d4b0e
--- /dev/null
+++ b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/rest/DirectoryContents.java
@@ -0,0 +1,74 @@
+/*
+ */
+package org.taverna.server.master.rest;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import static org.taverna.server.master.common.Uri.secure;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import javax.ws.rs.core.UriBuilder;
+import javax.ws.rs.core.UriInfo;
+import javax.xml.bind.annotation.XmlElementRef;
+import javax.xml.bind.annotation.XmlRootElement;
+import javax.xml.bind.annotation.XmlSeeAlso;
+import javax.xml.bind.annotation.XmlType;
+
+import org.taverna.server.master.common.DirEntryReference;
+import org.taverna.server.master.interfaces.DirectoryEntry;
+
+/**
+ * JAXB-serialised result of a RESTful operation that lists the contents of
+ * a directory.
+ *
+ * @author Donal Fellows
+ */
+@XmlRootElement
+@XmlType(name = "DirectoryContents")
+@XmlSeeAlso(MakeOrUpdateDirEntry.class)
+public class DirectoryContents {
+    /**
+     * References to the entries of the directory.
+     */
+    @XmlElementRef
+    public List<DirEntryReference> contents;
+
+    /**
+     * Make a description of a directory with nothing in it. JAXB needs this
+     * constructor.
+     */
+    public DirectoryContents() {
+        contents = new ArrayList<>();
+    }
+
+    /**
+     * Make a description holding one reference per entry of the supplied
+     * collection. Each reference is built from the request URI extended
+     * with a <tt>{filename}</tt> template segment.
+     *
+     * @param ui
+     *            The factory for URIs.
+     * @param collection
+     *            The real directory contents that we are to describe.
+     */
+    public DirectoryContents(UriInfo ui, Collection<DirectoryEntry> collection) {
+        this();
+        UriBuilder entryTemplate = secure(ui).path("{filename}");
+        for (DirectoryEntry entry : collection) {
+            DirEntryReference ref = DirEntryReference.newInstance(
+                    entryTemplate, entry);
+            contents.add(ref);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-taverna-server/blob/00397eff/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/rest/FileSegment.java
----------------------------------------------------------------------
diff --git a/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/rest/FileSegment.java b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/rest/FileSegment.java
new file mode 100644
index 0000000..6b7aaff
--- /dev/null
+++ b/taverna-server-webapp/src/main/java/org/apache/taverna/server/master/rest/FileSegment.java
@@ -0,0 +1,91 @@
+/*
+ */
+package org.taverna.server.master.rest;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import static javax.ws.rs.core.Response.ok;
+
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+
+import org.taverna.server.master.exceptions.FilesystemAccessException;
+import org.taverna.server.master.interfaces.File;
+
+/**
+ * Representation of a segment of a file to be read by JAX-RS.
+ *
+ * @author Donal Fellows
+ */
+public class FileSegment {
+ /** The file to read a segment of. */
+ public final File file;
+ /** The offset of the first byte of the segment to read. */
+ public Integer from;
+ /** The offset of the first byte after the segment to read. */
+ public Integer to;
+
+ /**
+ * Parse the HTTP Range header and determine what exact range of the file to
+ * read.
+ *
+ * @param f
+ * The file this refers to
+ * @param range
+ * The content of the Range header.
+ * @throws FilesystemAccessException
+ * If we can't determine the length of the file (shouldn't
+ * happen).
+ */
+ public FileSegment(File f, String range) throws FilesystemAccessException {
+ file = f;
+ Matcher m = Pattern.compile("^\\s*bytes=(\\d*)-(\\d*)\\s*$").matcher(
+ range);
+ if (m.matches()) {
+ if (!m.group(1).isEmpty())
+ from = Integer.valueOf(m.group(1));
+ if (!m.group(2).isEmpty())
+ to = Integer.valueOf(m.group(2)) + 1;
+ int size = (int) f.getSize();
+ if (from == null) {
+ from = size - to;
+ to = size;
+ } else if (to == null)
+ to = size;
+ else if (to > size)
+ to = size;
+ }
+ }
+
+ /**
+ * Convert to a response, as per RFC 2616.
+ *
+ * @param type
+ * The expected type of the data.
+ * @return A JAX-RS response.
+ */
+ public Response toResponse(MediaType type) {
+ if (from == null && to == null)
+ return ok(file).type(type).build();
+ if (from >= to)
+ return ok("Requested range not satisfiable").status(416).build();
+ return ok(this).status(206).type(type).build();
+ }
+}
\ No newline at end of file