You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oodt.apache.org by bf...@apache.org on 2011/03/08 22:46:59 UTC
svn commit: r1079558 - in /oodt/branches/wengine-branch/wengine/src/main:
java/org/apache/oodt/cas/workflow/engine/
java/org/apache/oodt/cas/workflow/instance/
java/org/apache/oodt/cas/workflow/processor/
java/org/apache/oodt/cas/workflow/server/channe...
Author: bfoster
Date: Tue Mar 8 21:46:58 2011
New Revision: 1079558
URL: http://svn.apache.org/viewvc?rev=1079558&view=rev
Log:
- improvements to ConnectWorkflowTaskInstance
- additional methods added to WorkflowEngine
--------------------
Modified:
oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/WorkflowEngine.java
oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/WorkflowEngineClient.java
oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/WorkflowEngineLocal.java
oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/instance/WorkflowConnectTaskInstance.java
oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/processor/WorkflowProcessor.java
oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/server/channel/AbstractCommunicationChannelServer.java
oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/server/channel/CommunicationChannelClient.java
oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/server/channel/CommunicationChannelServer.java
oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/server/channel/MultiCommunicationChannelClient.java
oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/server/channel/rmi/RmiCommunicationChannelClient.java
oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/server/channel/rmi/RmiCommunicationChannelServer.java
oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/server/channel/rmi/RmiCommunicationChannelServerInterface.java
oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/server/channel/rmi/RmiCommunicationChannelServerMBean.java
oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/server/channel/xmlrpc/XmlRpcCommunicationChannelClient.java
oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/server/channel/xmlrpc/XmlRpcCommunicationChannelServer.java
oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/server/channel/xmlrpc/XmlRpcCommunicationServerInterface.java
oodt/branches/wengine-branch/wengine/src/main/resources/policy/workflows/WorkflowModelTestFile.xml
Modified: oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/WorkflowEngine.java
URL: http://svn.apache.org/viewvc/oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/WorkflowEngine.java?rev=1079558&r1=1079557&r2=1079558&view=diff
==============================================================================
--- oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/WorkflowEngine.java (original)
+++ oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/WorkflowEngine.java Tue Mar 8 21:46:58 2011
@@ -97,14 +97,20 @@ public interface WorkflowEngine {
public ProcessorInfo getProcessorInfo(String instanceId, String modelId) throws EngineException;
+ public void updateWorkflowMetadata(String instanceId, Metadata metadata) throws EngineException;
+
public void updateWorkflowMetadata(String instanceId, String modelId, Metadata metadata) throws EngineException;
public void updateInstanceMetadata(String jobId, Metadata metadata) throws EngineException;
public void updateWorkflowAndInstance(String instanceId, String modelId, WorkflowState state, Metadata metadata, String jobId, Metadata instanceMetadata) throws EngineException;
+ public void setWorkflowState(String instanceId, WorkflowState state) throws EngineException;
+
public void setWorkflowState(String instanceId, String modelId, WorkflowState state) throws EngineException;
+ public void setWorkflowPriority(String instanceId, Priority priority) throws EngineException;
+
public void setWorkflowPriority(String instanceId, String modelId, Priority priority) throws EngineException;
public Metadata getWorkflowMetadata(String instanceId) throws EngineException;
Modified: oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/WorkflowEngineClient.java
URL: http://svn.apache.org/viewvc/oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/WorkflowEngineClient.java?rev=1079558&r1=1079557&r2=1079558&view=diff
==============================================================================
--- oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/WorkflowEngineClient.java (original)
+++ oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/WorkflowEngineClient.java Tue Mar 8 21:46:58 2011
@@ -238,6 +238,14 @@ public class WorkflowEngineClient implem
}
}
+ public void setWorkflowState(String instanceId, WorkflowState state) throws EngineException {
+ try {
+ this.client.setWorkflowState(instanceId, state);
+ }catch (Exception e) {
+ throw new EngineException("Failed to update workflow state : " + e.getMessage(), e);
+ }
+ }
+
public void setWorkflowState(String instanceId,
String modelId, WorkflowState state) throws EngineException {
try {
@@ -247,6 +255,14 @@ public class WorkflowEngineClient implem
}
}
+ public void setWorkflowPriority(String instanceId, Priority priority) throws EngineException {
+ try {
+ this.client.setWorkflowPriority(instanceId, priority);
+ }catch (Exception e) {
+ throw new EngineException("Failed to update workflow priority : " + e.getMessage(), e);
+ }
+ }
+
public void setWorkflowPriority(String instanceId, String modelId, Priority priority) throws EngineException {
try {
this.client.setWorkflowPriority(instanceId, modelId, priority);
@@ -307,6 +323,14 @@ public class WorkflowEngineClient implem
}
}
+ public void updateWorkflowMetadata(String instanceId, Metadata metadata) throws EngineException {
+ try {
+ this.client.updateWorkflowMetadata(instanceId, metadata);
+ }catch (Exception e) {
+ throw new EngineException(e);
+ }
+ }
+
public void updateWorkflowMetadata(String instanceId,
String modelId, Metadata metadata) throws EngineException {
try {
Modified: oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/WorkflowEngineLocal.java
URL: http://svn.apache.org/viewvc/oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/WorkflowEngineLocal.java?rev=1079558&r1=1079557&r2=1079558&view=diff
==============================================================================
--- oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/WorkflowEngineLocal.java (original)
+++ oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/WorkflowEngineLocal.java Tue Mar 8 21:46:58 2011
@@ -259,10 +259,18 @@ public class WorkflowEngineLocal impleme
return WorkflowUtils.findProcessor(this.queueManager.getWorkflowProcessor(instanceId), modelId).getStub();
}
+ public void setWorkflowState(String instanceId, WorkflowState state) throws EngineException {
+ this.queueManager.setState(instanceId, null, state);
+ }
+
public void setWorkflowState(String instanceId, String modelId, WorkflowState state) throws EngineException {
this.queueManager.setState(instanceId, modelId, state);
}
+ public void setWorkflowPriority(String instanceId, Priority priority) throws EngineException {
+ this.queueManager.setPriority(instanceId, null, priority);
+ }
+
public void setWorkflowPriority(String instanceId, String modelId, Priority priority) throws EngineException {
this.queueManager.setPriority(instanceId, modelId, priority);
}
@@ -356,6 +364,10 @@ public class WorkflowEngineLocal impleme
}
}
+ public void updateWorkflowMetadata(String instanceId, Metadata metadata) throws EngineException {
+ this.queueManager.setMetadata(instanceId, null, metadata);
+ }
+
public void updateWorkflowMetadata(String instanceId,
String modelId, Metadata metadata) throws EngineException {
this.queueManager.setMetadata(instanceId, modelId, metadata);
Modified: oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/instance/WorkflowConnectTaskInstance.java
URL: http://svn.apache.org/viewvc/oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/instance/WorkflowConnectTaskInstance.java?rev=1079558&r1=1079557&r2=1079558&view=diff
==============================================================================
--- oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/instance/WorkflowConnectTaskInstance.java (original)
+++ oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/instance/WorkflowConnectTaskInstance.java Tue Mar 8 21:46:58 2011
@@ -1,16 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.oodt.cas.workflow.instance;
+//JDK imports
+import java.util.Arrays;
import java.util.List;
import java.util.Vector;
import java.util.logging.Level;
import java.util.logging.Logger;
+//OODT imports
import org.apache.oodt.cas.metadata.Metadata;
import org.apache.oodt.cas.workflow.engine.WorkflowEngineClient;
import org.apache.oodt.cas.workflow.metadata.ControlMetadata;
import org.apache.oodt.cas.workflow.processor.WorkflowProcessor;
import org.apache.oodt.cas.workflow.state.WorkflowState;
import org.apache.oodt.cas.workflow.state.done.FailureState;
+import org.apache.oodt.cas.workflow.state.done.StoppedState;
import org.apache.oodt.cas.workflow.state.done.SuccessState;
import org.apache.oodt.cas.workflow.state.results.ResultsBailState;
import org.apache.oodt.cas.workflow.state.results.ResultsFailureState;
@@ -18,6 +38,15 @@ import org.apache.oodt.cas.workflow.stat
import org.apache.oodt.cas.workflow.state.results.ResultsSuccessState;
import org.apache.oodt.cas.workflow.util.WorkflowUtils;
+/**
+ *
+ * @author bfoster
+ * @version $Revision$
+ *
+ * <p>
+ * Connect Workflows Spawn/Wait Task Instance
+ * </p>.
+ */
public class WorkflowConnectTaskInstance extends TaskInstance {
private static final Logger LOG = Logger.getLogger(WorkflowConnectTaskInstance.class.getName());
@@ -26,9 +55,12 @@ public class WorkflowConnectTaskInstance
public static final String N_MET_MOD_CLASS = "WorkflowConnect/NMetadataModifier/Class";
public static final String SPAWN_MODEL_ID = "WorkflowConnect/ModelId";
- public static final String SPAWNED_WORKFLOWS = WorkflowProcessor.LOCAL_KEYS_GROUP + "/WorkflowConnect/SpawnedWorkflows/InstanceIds";
- public static final String SPAWNED_BY_WORKFLOW = WorkflowProcessor.LOCAL_KEYS_GROUP + "/WorkflowConnect/SpawnedByWorkflow/InstanceId";
+ public static final String SPAWNED_WORKFLOWS = "WorkflowConnect/SpawnedWorkflows/InstanceIds";
+ public static final String SPAWNED_BY_WORKFLOW = "WorkflowConnect/SpawnedByWorkflow/InstanceId";
+ public static final String JOIN_METADATA = "WorkflowConnect/JoinMetadata";
+ public static final String JOIN_ONLY_METADATA_KEYS = "WorkflowConnect/JoinMetadata/RestrictToKeys";
+
private WorkflowEngineClient weClient;
@Override
@@ -41,6 +73,14 @@ public class WorkflowConnectTaskInstance
protected ResultsState performExecution(ControlMetadata ctrlMetadata) {
if (ctrlMetadata.getMetadata(SPAWNED_WORKFLOWS) == null) {
+ //Add spawning keys to localized workflow metadata keys
+ Vector<String> localKeys = new Vector<String>();
+ if (ctrlMetadata.getMetadata(WorkflowProcessor.LOCAL_KEYS) != null)
+ localKeys.addAll(ctrlMetadata.getAllMetadata(WorkflowProcessor.LOCAL_KEYS));
+ localKeys.addAll(Arrays.asList(SPAWNED_WORKFLOWS, SPAWNED_BY_WORKFLOW));
+ ctrlMetadata.replaceLocalMetadata(WorkflowProcessor.LOCAL_KEYS, localKeys);
+ ctrlMetadata.setAsWorkflowMetadataKey(WorkflowProcessor.LOCAL_KEYS);
+
//Get Spawn ModelId
String spawnModelId = ctrlMetadata.getMetadata(SPAWN_MODEL_ID);
if (spawnModelId == null)
@@ -72,22 +112,36 @@ public class WorkflowConnectTaskInstance
}
}
- int n = calculator.determineN(ctrlMetadata);
+ int n = -1;
+ try {
+ n = calculator.determineN(ctrlMetadata);
+ }catch (Exception e) {
+ LOG.log(Level.SEVERE, "Failed to determine N : " + e.getMessage(), e);
+ return new ResultsFailureState("Failed to determine N : " + e.getMessage());
+ }
Metadata spawnWorkflowMet = ctrlMetadata.asMetadata();
- Vector<String> spawnedInstanceId = new Vector<String>();
+ Vector<String> spawnedInstanceIds = new Vector<String>();
for (int i = 0; i < n; i++) {
Metadata curWorkflowMet = new Metadata(spawnWorkflowMet);
- if (nMetMod != null)
- nMetMod.prepare(i+1, n, curWorkflowMet);
try {
+ if (nMetMod != null)
+ nMetMod.prepare(i+1, n, curWorkflowMet);
curWorkflowMet.replaceMetadata(SPAWNED_BY_WORKFLOW, this.getInstanceId());
- spawnedInstanceId.add(this.weClient.startWorkflow(spawnModelId, curWorkflowMet));
+ spawnedInstanceIds.add(this.weClient.startWorkflow(spawnModelId, curWorkflowMet));
}catch (Exception e) {
- LOG.log(Level.SEVERE, "Failed to start workflow ModelId '" + spawnModelId + "' : " + e.getMessage(), e);
- return new ResultsFailureState("Failed to start workflow ModelId '" + spawnModelId + "' : " + e.getMessage());
+ LOG.log(Level.SEVERE, "Failed to start workflow ModelId '" + spawnModelId + "' [i = '" + i + "'] : " + e.getMessage(), e);
+ return new ResultsFailureState("Failed to start workflow ModelId '" + spawnModelId + "' [i = '" + i + "'] : " + e.getMessage());
+ }finally {
+ for (String spawenedInstanceId : spawnedInstanceIds) {
+ try {
+ this.weClient.setWorkflowState(spawenedInstanceId, new StoppedState("Spawing workflow failed to spawn sibling workflow [i = '" + i + "']"));
+ }catch (Exception e) {
+ LOG.log(Level.SEVERE, "Failed to stop workflow InstanceId = '" + spawenedInstanceId + "' : " + e.getMessage(), e);
+ }
+ }
}
}
- ctrlMetadata.replaceLocalMetadata(SPAWNED_WORKFLOWS, spawnedInstanceId);
+ ctrlMetadata.replaceLocalMetadata(SPAWNED_WORKFLOWS, spawnedInstanceIds);
ctrlMetadata.setAsWorkflowMetadataKey(SPAWNED_WORKFLOWS);
return new ResultsBailState("Waiting for " + n + " of " + n + " spawned workflows to complete");
@@ -110,7 +164,17 @@ public class WorkflowConnectTaskInstance
Metadata dynMet = new Metadata();
for (String spawnedInstanceId : spawnedInstanceIds) {
try {
- dynMet = WorkflowUtils.mergeMetadata(dynMet, this.weClient.getWorkflowMetadata(spawnedInstanceId));
+ if (Boolean.parseBoolean(ctrlMetadata.getMetadata(JOIN_METADATA))) {
+ if (ctrlMetadata.getMetadata(JOIN_ONLY_METADATA_KEYS) != null) {
+ dynMet = WorkflowUtils.mergeMetadata(dynMet, this.weClient.getWorkflowMetadata(spawnedInstanceId));
+ }else {
+ Metadata spawnedMetadata = this.weClient.getWorkflowMetadata(spawnedInstanceId);
+ Metadata joinMetadata = new Metadata();
+ for (String key : ctrlMetadata.getAllMetadata(JOIN_ONLY_METADATA_KEYS))
+ joinMetadata.replaceMetadata(key, spawnedMetadata.getAllMetadata(key));
+ dynMet = WorkflowUtils.mergeMetadata(dynMet, joinMetadata);
+ }
+ }
}catch (Exception e) {
return new ResultsFailureState("Failed to get metadata of spawned workflow [InstanceId='" + spawnedInstanceId + "']");
}
@@ -132,11 +196,13 @@ public class WorkflowConnectTaskInstance
metadata.removeMetadata(SPAWN_MODEL_ID);
metadata.removeMetadata(SPAWNED_WORKFLOWS);
metadata.removeMetadata(SPAWNED_BY_WORKFLOW);
+ metadata.removeMetadata(JOIN_METADATA);
+ metadata.removeMetadata(JOIN_ONLY_METADATA_KEYS);
}
public interface NCalculator {
- public int determineN(ControlMetadata ctrlMetadata);
+ public int determineN(ControlMetadata ctrlMetadata) throws Exception;
}
@@ -147,7 +213,7 @@ public class WorkflowConnectTaskInstance
* @param n Total number of workflows submitted
* @param metadata Metadata for current workflow being submitted
*/
- public void prepare(int i, int n, Metadata metadata);
+ public void prepare(int i, int n, Metadata metadata) throws Exception;
}
Modified: oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/processor/WorkflowProcessor.java
URL: http://svn.apache.org/viewvc/oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/processor/WorkflowProcessor.java?rev=1079558&r1=1079557&r2=1079558&view=diff
==============================================================================
--- oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/processor/WorkflowProcessor.java (original)
+++ oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/processor/WorkflowProcessor.java Tue Mar 8 21:46:58 2011
@@ -57,7 +57,7 @@ import org.apache.oodt.cas.workflow.stat
*/
public abstract class WorkflowProcessor implements WorkflowProcessorListener, Comparable<WorkflowProcessor> {
- public static final String LOCAL_KEYS_GROUP = "WorkflowProcessor/Local";
+ public static final String LOCAL_KEYS = "WorkflowProcessor/Local/Keys";
private String instanceId;
private String modelId;
@@ -240,7 +240,11 @@ public abstract class WorkflowProcessor
public synchronized Metadata getPassThroughDynamicMetadata() {
Metadata passThroughMet = new Metadata(this.dynamicMetadata);
- passThroughMet.removeMetadataGroup(LOCAL_KEYS_GROUP);
+ passThroughMet.removeMetadata(LOCAL_KEYS);
+ for (String key : this.dynamicMetadata.getAllMetadata(LOCAL_KEYS))
+ passThroughMet.removeMetadata(key);
+ for (String key : this.staticMetadata.getAllMetadata(LOCAL_KEYS))
+ passThroughMet.removeMetadata(key);
return passThroughMet;
}
Modified: oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/server/channel/AbstractCommunicationChannelServer.java
URL: http://svn.apache.org/viewvc/oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/server/channel/AbstractCommunicationChannelServer.java?rev=1079558&r1=1079557&r2=1079558&view=diff
==============================================================================
--- oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/server/channel/AbstractCommunicationChannelServer.java (original)
+++ oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/server/channel/AbstractCommunicationChannelServer.java Tue Mar 8 21:46:58 2011
@@ -266,6 +266,15 @@ public abstract class AbstractCommunicat
}
}
+ public void setWorkflowState(String instanceId, WorkflowState state) throws Exception {
+ try {
+ this.workflowEngine.setWorkflowState(instanceId, state);
+ }catch (Exception e) {
+ LOG.log(Level.SEVERE, "Failed to set state '" + state + "' for workflow [instanceid='" + instanceId + "'] in engine : " + e.getMessage(), e);
+ throw new Exception("Failed to set state '" + state + "' for workflow [instanceid='" + instanceId + "'] in engine : " + e.getMessage(), e);
+ }
+ }
+
public void setWorkflowState(String instanceId,
String modelId, WorkflowState state) throws Exception {
try {
@@ -276,6 +285,15 @@ public abstract class AbstractCommunicat
}
}
+ public void setWorkflowPriority(String instanceId, Priority priority) throws Exception {
+ try {
+ this.workflowEngine.setWorkflowPriority(instanceId, priority);
+ }catch (Exception e) {
+ LOG.log(Level.SEVERE, "Failed to set priority '" + priority + "' for workflow [instanceid='" + instanceId + "'] in engine : " + e.getMessage(), e);
+ throw new Exception("Failed to set priority '" + priority + "' for workflow [instanceid='" + instanceId + "'] in engine : " + e.getMessage(), e);
+ }
+ }
+
public void setWorkflowPriority(String instanceId, String modelId, Priority priority) throws Exception {
try {
this.workflowEngine.setWorkflowPriority(instanceId, modelId, priority);
@@ -344,6 +362,15 @@ public abstract class AbstractCommunicat
}
}
+ public void updateWorkflowMetadata(String instanceId, Metadata metadata) throws Exception {
+ try {
+ this.workflowEngine.updateWorkflowMetadata(instanceId, metadata);
+ }catch (Exception e) {
+ LOG.log(Level.SEVERE, "Failed to update workflow metadata [InstanceId='" + instanceId + "'] in engine : " + e.getMessage(), e);
+ throw new Exception("Failed to update workflow metadata [InstanceId='" + instanceId + "'] in engine : " + e.getMessage(), e);
+ }
+ }
+
public void updateWorkflowMetadata(String instanceId,
String modelId, Metadata metadata) throws Exception {
try {
Modified: oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/server/channel/CommunicationChannelClient.java
URL: http://svn.apache.org/viewvc/oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/server/channel/CommunicationChannelClient.java?rev=1079558&r1=1079557&r2=1079558&view=diff
==============================================================================
--- oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/server/channel/CommunicationChannelClient.java (original)
+++ oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/server/channel/CommunicationChannelClient.java Tue Mar 8 21:46:58 2011
@@ -91,6 +91,8 @@ public interface CommunicationChannelCli
public WorkflowInstanceRepository getInstanceRepository() throws Exception;
+ public void updateWorkflowMetadata(String instanceId, Metadata metadata) throws Exception;
+
public void updateWorkflowMetadata(String instanceId, String modelId, Metadata metadata) throws Exception;
public void updateInstanceMetadata(String jobId, Metadata metadata) throws Exception;
@@ -101,8 +103,12 @@ public interface CommunicationChannelCli
public ProcessorStub getWorkflowStub(String instanceId, String modelId) throws Exception;
+ public void setWorkflowState(String instanceId, WorkflowState state) throws Exception;
+
public void setWorkflowState(String instanceId, String modelId, WorkflowState state) throws Exception;
+ public void setWorkflowPriority(String instanceId, Priority priority) throws Exception;
+
public void setWorkflowPriority(String instanceId, String modelId, Priority priority) throws Exception;
public Metadata getWorkflowMetadata(String instanceId) throws Exception;
Modified: oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/server/channel/CommunicationChannelServer.java
URL: http://svn.apache.org/viewvc/oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/server/channel/CommunicationChannelServer.java?rev=1079558&r1=1079557&r2=1079558&view=diff
==============================================================================
--- oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/server/channel/CommunicationChannelServer.java (original)
+++ oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/server/channel/CommunicationChannelServer.java Tue Mar 8 21:46:58 2011
@@ -96,6 +96,8 @@ public interface CommunicationChannelSer
public WorkflowInstanceRepository getInstanceRepository() throws Exception;
+ public void updateWorkflowMetadata(String instanceId, Metadata metadata) throws Exception;
+
public void updateWorkflowMetadata(String instanceId, String modelId, Metadata metadata) throws Exception;
public void updateInstanceMetadata(String jobId, Metadata metadata) throws Exception;
@@ -106,8 +108,12 @@ public interface CommunicationChannelSer
public ProcessorStub getWorkflowStub(String instanceId, String modelId) throws Exception;
+ public void setWorkflowState(String instanceId, WorkflowState state) throws Exception;
+
public void setWorkflowState(String instanceId, String modelId, WorkflowState state) throws Exception;
+ public void setWorkflowPriority(String instanceId, Priority priority) throws Exception;
+
public void setWorkflowPriority(String instanceId, String modelId, Priority priority) throws Exception;
public Metadata getWorkflowMetadata(String instanceId, String modelId) throws Exception;
Modified: oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/server/channel/MultiCommunicationChannelClient.java
URL: http://svn.apache.org/viewvc/oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/server/channel/MultiCommunicationChannelClient.java?rev=1079558&r1=1079557&r2=1079558&view=diff
==============================================================================
--- oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/server/channel/MultiCommunicationChannelClient.java (original)
+++ oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/server/channel/MultiCommunicationChannelClient.java Tue Mar 8 21:46:58 2011
@@ -233,11 +233,19 @@ public class MultiCommunicationChannelCl
useClient.resumeWorkflow(instanceId);
}
+ public void setWorkflowPriority(String instanceId, Priority priority) throws Exception {
+ useClient.setWorkflowPriority(instanceId, priority);
+ }
+
public void setWorkflowPriority(String instanceId, String modelId,
Priority priority) throws Exception {
useClient.setWorkflowPriority(instanceId, modelId, priority);
}
+ public void setWorkflowState(String instanceId, WorkflowState state) throws Exception {
+ useClient.setWorkflowState(instanceId, state);
+ }
+
public void setWorkflowState(String instanceId, String modelId,
WorkflowState state) throws Exception {
useClient.setWorkflowState(instanceId, modelId, state);
@@ -277,6 +285,10 @@ public class MultiCommunicationChannelCl
useClient.updateInstanceMetadata(jobId, metadata);
}
+ public void updateWorkflowMetadata(String instanceId, Metadata metadata) throws Exception {
+ useClient.updateWorkflowMetadata(instanceId, metadata);
+ }
+
public void updateWorkflowMetadata(String instanceId, String modelId,
Metadata metadata) throws Exception {
useClient.updateWorkflowMetadata(instanceId, modelId, metadata);
Modified: oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/server/channel/rmi/RmiCommunicationChannelClient.java
URL: http://svn.apache.org/viewvc/oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/server/channel/rmi/RmiCommunicationChannelClient.java?rev=1079558&r1=1079557&r2=1079558&view=diff
==============================================================================
--- oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/server/channel/rmi/RmiCommunicationChannelClient.java (original)
+++ oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/server/channel/rmi/RmiCommunicationChannelClient.java Tue Mar 8 21:46:58 2011
@@ -151,6 +151,10 @@ public class RmiCommunicationChannelClie
return this.serializer.deserializeObject(WorkflowInstanceRepository.class, this.getRmiServer().rmi_getInstanceRepository());
}
+ public void updateWorkflowMetadata(String instanceId, Metadata metadata) throws Exception {
+ this.getRmiServer().rmi_updateWorkflowMetadata(instanceId, this.serializer.serializeObject(metadata));
+ }
+
public void updateWorkflowMetadata(String instanceId, String modelId, Metadata metadata) throws Exception {
this.getRmiServer().rmi_updateWorkflowMetadata(instanceId, modelId, this.serializer.serializeObject(metadata));
}
@@ -163,10 +167,18 @@ public class RmiCommunicationChannelClie
this.getRmiServer().rmi_updateWorkflowAndInstance(instanceId, modelId, this.serializer.serializeObject(state), this.serializer.serializeObject(metadata), jobId, this.serializer.serializeObject(instanceMetadata));
}
+ public void setWorkflowState(String instanceId, WorkflowState state) throws Exception {
+ this.getRmiServer().rmi_setWorkflowState(instanceId, this.serializer.serializeObject(state));
+ }
+
public void setWorkflowState(String instanceId, String modelId, WorkflowState state) throws Exception {
this.getRmiServer().rmi_setWorkflowState(instanceId, modelId, this.serializer.serializeObject(state));
}
+ public void setWorkflowPriority(String instanceId, Priority priority) throws Exception {
+ this.getRmiServer().rmi_setWorkflowPriority(instanceId, this.serializer.serializeObject(priority));
+ }
+
public void setWorkflowPriority(String instanceId, String modelId, Priority priority) throws Exception {
this.getRmiServer().rmi_setWorkflowPriority(instanceId, modelId, this.serializer.serializeObject(priority));
}
Modified: oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/server/channel/rmi/RmiCommunicationChannelServer.java
URL: http://svn.apache.org/viewvc/oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/server/channel/rmi/RmiCommunicationChannelServer.java?rev=1079558&r1=1079557&r2=1079558&view=diff
==============================================================================
--- oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/server/channel/rmi/RmiCommunicationChannelServer.java (original)
+++ oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/server/channel/rmi/RmiCommunicationChannelServer.java Tue Mar 8 21:46:58 2011
@@ -279,6 +279,14 @@ public class RmiCommunicationChannelServ
}
}
+ public void rmi_updateWorkflowMetadata(String instanceId, String metadata) throws RemoteException {
+ try {
+ this.updateWorkflowMetadata(instanceId, this.serializer.deserializeObject(Metadata.class, metadata));
+ }catch (Exception e) {
+ throw new RemoteException(e.getMessage(), e);
+ }
+ }
+
public void rmi_updateWorkflowMetadata(String instanceId, String modelId, String metadata) throws RemoteException {
try {
this.updateWorkflowMetadata(instanceId, modelId, this.serializer.deserializeObject(Metadata.class, metadata));
@@ -303,6 +311,14 @@ public class RmiCommunicationChannelServ
}
}
+ public void rmi_setWorkflowState(String instanceId, String state) throws RemoteException {
+ try {
+ this.setWorkflowState(instanceId, this.serializer.deserializeObject(WorkflowState.class, state));
+ }catch (Exception e) {
+ throw new RemoteException(e.getMessage(), e);
+ }
+ }
+
public void rmi_setWorkflowState(String instanceId, String modelId, String state) throws RemoteException {
try {
this.setWorkflowState(instanceId, modelId, this.serializer.deserializeObject(WorkflowState.class, state));
@@ -311,6 +327,14 @@ public class RmiCommunicationChannelServ
}
}
+ public void rmi_setWorkflowPriority(String instanceId, String priority) throws RemoteException {
+ try {
+ this.setWorkflowPriority(instanceId, this.serializer.deserializeObject(Priority.class, priority));
+ }catch (Exception e) {
+ throw new RemoteException(e.getMessage(), e);
+ }
+ }
+
public void rmi_setWorkflowPriority(String instanceId, String modelId, String priority) throws RemoteException {
try {
this.setWorkflowPriority(instanceId, modelId, this.serializer.deserializeObject(Priority.class, priority));
Modified: oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/server/channel/rmi/RmiCommunicationChannelServerInterface.java
URL: http://svn.apache.org/viewvc/oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/server/channel/rmi/RmiCommunicationChannelServerInterface.java?rev=1079558&r1=1079557&r2=1079558&view=diff
==============================================================================
--- oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/server/channel/rmi/RmiCommunicationChannelServerInterface.java (original)
+++ oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/server/channel/rmi/RmiCommunicationChannelServerInterface.java Tue Mar 8 21:46:58 2011
@@ -67,14 +67,20 @@ public interface RmiCommunicationChannel
public String rmi_getInstanceRepository() throws RemoteException;
+ public void rmi_updateWorkflowMetadata(String instanceId, String metadata) throws RemoteException;
+
public void rmi_updateWorkflowMetadata(String instanceId, String modelId, String metadata) throws RemoteException;
public void rmi_updateInstanceMetadata(String jobId, String metadata) throws RemoteException;
public void rmi_updateWorkflowAndInstance(String instanceId, String modelId, String state, String metadata, String jobId, String instanceMetadata) throws RemoteException;
+ public void rmi_setWorkflowState(String instanceId, String state) throws RemoteException;
+
public void rmi_setWorkflowState(String instanceId, String modelId, String state) throws RemoteException;
+ public void rmi_setWorkflowPriority(String instanceId, String priority) throws RemoteException;
+
public void rmi_setWorkflowPriority(String instanceId, String modelId, String priority) throws RemoteException;
public String rmi_getWorkflowMetadata(String instanceId) throws RemoteException;
Modified: oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/server/channel/rmi/RmiCommunicationChannelServerMBean.java
URL: http://svn.apache.org/viewvc/oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/server/channel/rmi/RmiCommunicationChannelServerMBean.java?rev=1079558&r1=1079557&r2=1079558&view=diff
==============================================================================
--- oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/server/channel/rmi/RmiCommunicationChannelServerMBean.java (original)
+++ oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/server/channel/rmi/RmiCommunicationChannelServerMBean.java Tue Mar 8 21:46:58 2011
@@ -90,13 +90,19 @@ public interface RmiCommunicationChannel
public ProcessorInfo getProcessorInfo(String instanceId, String modelId) throws Exception;
public WorkflowState getWorkflowState(String instanceId) throws Exception;
-
+
+ public void updateWorkflowMetadata(String instanceId, Metadata metadata) throws Exception;
+
public void updateWorkflowMetadata(String instanceId, String modelId, Metadata metadata) throws Exception;
public void updateInstanceMetadata(String jobId, Metadata metadata) throws Exception;
+ public void setWorkflowState(String instanceId, WorkflowState state) throws Exception;
+
public void setWorkflowState(String instanceId, String modelId, WorkflowState state) throws Exception;
+ public void setWorkflowPriority(String instanceId, Priority priority) throws Exception;
+
public void setWorkflowPriority(String instanceId, String modelId, Priority priority) throws Exception;
public String rmi_getWorkflowStub(String instanceId) throws RemoteException;
Modified: oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/server/channel/xmlrpc/XmlRpcCommunicationChannelClient.java
URL: http://svn.apache.org/viewvc/oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/server/channel/xmlrpc/XmlRpcCommunicationChannelClient.java?rev=1079558&r1=1079557&r2=1079558&view=diff
==============================================================================
--- oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/server/channel/xmlrpc/XmlRpcCommunicationChannelClient.java (original)
+++ oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/server/channel/xmlrpc/XmlRpcCommunicationChannelClient.java Tue Mar 8 21:46:58 2011
@@ -231,6 +231,13 @@ public class XmlRpcCommunicationChannelC
this.client.execute(XmlRpcCommunicationChannelServer.class.getSimpleName() + ".xmlrpc_resumeWorkflow", args);
}
+ public void setWorkflowState(String instanceId, WorkflowState state) throws Exception {
+ Vector<Object> args = new Vector<Object>();
+ args.add(instanceId);
+ args.add(this.serializer.serializeObject(state));
+ this.client.execute(XmlRpcCommunicationChannelServer.class.getSimpleName() + ".xmlrpc_setWorkflowState", args);
+ }
+
public void setWorkflowState(String instanceId,
String modelId, WorkflowState state) throws Exception {
Vector<Object> args = new Vector<Object>();
@@ -240,6 +247,13 @@ public class XmlRpcCommunicationChannelC
this.client.execute(XmlRpcCommunicationChannelServer.class.getSimpleName() + ".xmlrpc_setWorkflowState", args);
}
+ public void setWorkflowPriority(String instanceId, Priority priority) throws Exception {
+ Vector<Object> args = new Vector<Object>();
+ args.add(instanceId);
+ args.add(this.serializer.serializeObject(priority));
+ this.client.execute(XmlRpcCommunicationChannelServer.class.getSimpleName() + ".xmlrpc_setWorkflowPriority", args);
+ }
+
public void setWorkflowPriority(String instanceId,
String modelId, Priority priority) throws Exception {
Vector<Object> args = new Vector<Object>();
@@ -296,6 +310,13 @@ public class XmlRpcCommunicationChannelC
this.client.execute(XmlRpcCommunicationChannelServer.class.getSimpleName() + ".xmlrpc_updateInstanceMetadata", args);
}
+ public void updateWorkflowMetadata(String instanceId, Metadata metadata) throws Exception {
+ Vector<Object> args = new Vector<Object>();
+ args.add(instanceId);
+ args.add(this.serializer.serializeObject(metadata));
+ this.client.execute(XmlRpcCommunicationChannelServer.class.getSimpleName() + ".xmlrpc_updateWorkflowMetadata", args);
+ }
+
public void updateWorkflowMetadata(String instanceId,
String modelId, Metadata metadata) throws Exception {
Vector<Object> args = new Vector<Object>();
Modified: oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/server/channel/xmlrpc/XmlRpcCommunicationChannelServer.java
URL: http://svn.apache.org/viewvc/oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/server/channel/xmlrpc/XmlRpcCommunicationChannelServer.java?rev=1079558&r1=1079557&r2=1079558&view=diff
==============================================================================
--- oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/server/channel/xmlrpc/XmlRpcCommunicationChannelServer.java (original)
+++ oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/server/channel/xmlrpc/XmlRpcCommunicationChannelServer.java Tue Mar 8 21:46:58 2011
@@ -147,11 +147,21 @@ public class XmlRpcCommunicationChannelS
return this.serializer.serializeObject(Boolean.TRUE);
}
+ public String xmlrpc_setWorkflowState(String instanceId, String state) throws Exception {
+ this.setWorkflowState(instanceId, this.serializer.deserializeObject(WorkflowState.class, state));
+ return this.serializer.serializeObject(Boolean.TRUE);
+ }
+
public String xmlrpc_setWorkflowState(String instanceId, String modelId, String state) throws Exception {
this.setWorkflowState(instanceId, modelId, this.serializer.deserializeObject(WorkflowState.class, state));
return this.serializer.serializeObject(Boolean.TRUE);
}
+ public String xmlrpc_setWorkflowPriority(String instanceId, String priority) throws Exception {
+ this.setWorkflowPriority(instanceId, this.serializer.deserializeObject(Priority.class, priority));
+ return this.serializer.serializeObject(Boolean.TRUE);
+ }
+
public String xmlrpc_setWorkflowPriority(String instanceId, String modelId, String priority) throws Exception {
this.setWorkflowPriority(instanceId, modelId, this.serializer.deserializeObject(Priority.class, priority));
return this.serializer.serializeObject(Boolean.TRUE);
@@ -183,6 +193,11 @@ public class XmlRpcCommunicationChannelS
return this.serializer.serializeObject(Boolean.TRUE);
}
+ public String xmlrpc_updateWorkflowMetadata(String instanceId, String metadata) throws Exception {
+ this.updateWorkflowMetadata(instanceId, this.serializer.deserializeObject(Metadata.class, metadata));
+ return this.serializer.serializeObject(Boolean.TRUE);
+ }
+
public String xmlrpc_updateWorkflowMetadata(String instanceId, String modelId, String metadata) throws Exception {
this.updateWorkflowMetadata(instanceId, modelId, this.serializer.deserializeObject(Metadata.class, metadata));
return this.serializer.serializeObject(Boolean.TRUE);
Modified: oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/server/channel/xmlrpc/XmlRpcCommunicationServerInterface.java
URL: http://svn.apache.org/viewvc/oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/server/channel/xmlrpc/XmlRpcCommunicationServerInterface.java?rev=1079558&r1=1079557&r2=1079558&view=diff
==============================================================================
--- oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/server/channel/xmlrpc/XmlRpcCommunicationServerInterface.java (original)
+++ oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/server/channel/xmlrpc/XmlRpcCommunicationServerInterface.java Tue Mar 8 21:46:58 2011
@@ -66,8 +66,12 @@ public interface XmlRpcCommunicationServ
public String xmlrpc_resumeWorkflow(String instanceId) throws Exception;
+ public String xmlrpc_setWorkflowState(String instanceId, String state) throws Exception;
+
public String xmlrpc_setWorkflowState(String instanceId, String modelId, String state) throws Exception;
+ public String xmlrpc_setWorkflowPriority(String instanceId, String priority) throws Exception;
+
public String xmlrpc_setWorkflowPriority(String instanceId, String modelId, String priority) throws Exception;
public String xmlrpc_startWorkflow(String workflow, String metadata) throws Exception;
@@ -82,6 +86,8 @@ public interface XmlRpcCommunicationServ
public String xmlrpc_updateInstanceMetadata(String jobId, String metadata) throws Exception;
+ public String xmlrpc_updateWorkflowMetadata(String instanceId, String metadata) throws Exception;
+
public String xmlrpc_updateWorkflowMetadata(String instanceId, String modelId, String metadata) throws Exception;
public String xmlrpc_registerEvent(String event) throws Exception;
Modified: oodt/branches/wengine-branch/wengine/src/main/resources/policy/workflows/WorkflowModelTestFile.xml
URL: http://svn.apache.org/viewvc/oodt/branches/wengine-branch/wengine/src/main/resources/policy/workflows/WorkflowModelTestFile.xml?rev=1079558&r1=1079557&r2=1079558&view=diff
==============================================================================
--- oodt/branches/wengine-branch/wengine/src/main/resources/policy/workflows/WorkflowModelTestFile.xml (original)
+++ oodt/branches/wengine-branch/wengine/src/main/resources/policy/workflows/WorkflowModelTestFile.xml Tue Mar 8 21:46:58 2011
@@ -12,6 +12,8 @@
<property name="WorkflowConnect/ModelId" value="TestConnectTask"/>
<property name="WorkflowConnect/NCalculator/Class" value="org.apache.oodt.cas.workflow.instance.SimpleNCalculator"/>
<property name="WorkflowConnect/NMetadataModifier/Class" value="org.apache.oodt.cas.workflow.instance.SimpleNMetadataModification"/>
+ <property name="WorkflowConnect/JoinMetadata" value="true"/>
+ <property name="WorkflowConnect/JoinMetadata/RestrictToKeys" value="TestConnectTask_key"/>
<property name="SimpleNCalculator/N" value="3"/>
</configuration>
</task>