You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oodt.apache.org by bf...@apache.org on 2011/03/08 22:46:59 UTC

svn commit: r1079558 - in /oodt/branches/wengine-branch/wengine/src/main: java/org/apache/oodt/cas/workflow/engine/ java/org/apache/oodt/cas/workflow/instance/ java/org/apache/oodt/cas/workflow/processor/ java/org/apache/oodt/cas/workflow/server/channe...

Author: bfoster
Date: Tue Mar  8 21:46:58 2011
New Revision: 1079558

URL: http://svn.apache.org/viewvc?rev=1079558&view=rev
Log:

- improvements to ConnectWorkflowTaskInstance

- additional methods added to WorkflowEngine

--------------------

Modified:
    oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/WorkflowEngine.java
    oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/WorkflowEngineClient.java
    oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/WorkflowEngineLocal.java
    oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/instance/WorkflowConnectTaskInstance.java
    oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/processor/WorkflowProcessor.java
    oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/server/channel/AbstractCommunicationChannelServer.java
    oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/server/channel/CommunicationChannelClient.java
    oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/server/channel/CommunicationChannelServer.java
    oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/server/channel/MultiCommunicationChannelClient.java
    oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/server/channel/rmi/RmiCommunicationChannelClient.java
    oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/server/channel/rmi/RmiCommunicationChannelServer.java
    oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/server/channel/rmi/RmiCommunicationChannelServerInterface.java
    oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/server/channel/rmi/RmiCommunicationChannelServerMBean.java
    oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/server/channel/xmlrpc/XmlRpcCommunicationChannelClient.java
    oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/server/channel/xmlrpc/XmlRpcCommunicationChannelServer.java
    oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/server/channel/xmlrpc/XmlRpcCommunicationServerInterface.java
    oodt/branches/wengine-branch/wengine/src/main/resources/policy/workflows/WorkflowModelTestFile.xml

Modified: oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/WorkflowEngine.java
URL: http://svn.apache.org/viewvc/oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/WorkflowEngine.java?rev=1079558&r1=1079557&r2=1079558&view=diff
==============================================================================
--- oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/WorkflowEngine.java (original)
+++ oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/WorkflowEngine.java Tue Mar  8 21:46:58 2011
@@ -97,14 +97,20 @@ public interface WorkflowEngine {
 
     public ProcessorInfo getProcessorInfo(String instanceId, String modelId) throws EngineException;
     
+    public void updateWorkflowMetadata(String instanceId, Metadata metadata) throws EngineException;
+
     public void updateWorkflowMetadata(String instanceId, String modelId, Metadata metadata) throws EngineException;
     
     public void updateInstanceMetadata(String jobId, Metadata metadata) throws EngineException;
 
     public void updateWorkflowAndInstance(String instanceId, String modelId, WorkflowState state, Metadata metadata, String jobId, Metadata instanceMetadata) throws EngineException;
     
+    public void setWorkflowState(String instanceId, WorkflowState state) throws EngineException;
+
     public void setWorkflowState(String instanceId, String modelId, WorkflowState state) throws EngineException;
 
+    public void setWorkflowPriority(String instanceId, Priority priority) throws EngineException;
+
     public void setWorkflowPriority(String instanceId, String modelId, Priority priority) throws EngineException;
     
     public Metadata getWorkflowMetadata(String instanceId) throws EngineException;

Modified: oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/WorkflowEngineClient.java
URL: http://svn.apache.org/viewvc/oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/WorkflowEngineClient.java?rev=1079558&r1=1079557&r2=1079558&view=diff
==============================================================================
--- oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/WorkflowEngineClient.java (original)
+++ oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/WorkflowEngineClient.java Tue Mar  8 21:46:58 2011
@@ -238,6 +238,14 @@ public class WorkflowEngineClient implem
 		}
 	}
 
+	public void setWorkflowState(String instanceId, WorkflowState state) throws EngineException {
+		try {
+			this.client.setWorkflowState(instanceId, state);
+		}catch (Exception e) {
+			throw new EngineException("Failed to update workflow state : " + e.getMessage(), e);
+		}
+	}
+	
 	public void setWorkflowState(String instanceId,
 			String modelId, WorkflowState state) throws EngineException {
 		try {
@@ -247,6 +255,14 @@ public class WorkflowEngineClient implem
 		}
 	}
 
+    public void setWorkflowPriority(String instanceId, Priority priority) throws EngineException {
+		try {
+			this.client.setWorkflowPriority(instanceId, priority);
+		}catch (Exception e) {
+			throw new EngineException("Failed to update workflow priority : " + e.getMessage(), e);
+		}
+    }
+    
     public void setWorkflowPriority(String instanceId, String modelId, Priority priority) throws EngineException {
 		try {
 			this.client.setWorkflowPriority(instanceId, modelId, priority);
@@ -307,6 +323,14 @@ public class WorkflowEngineClient implem
 		}
 	}
 
+	public void updateWorkflowMetadata(String instanceId, Metadata metadata) throws EngineException {
+		try {
+			this.client.updateWorkflowMetadata(instanceId, metadata);
+		}catch (Exception e) {
+			throw new EngineException(e);
+		}
+	}
+	
 	public void updateWorkflowMetadata(String instanceId,
 			String modelId, Metadata metadata) throws EngineException {
 		try {

Modified: oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/WorkflowEngineLocal.java
URL: http://svn.apache.org/viewvc/oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/WorkflowEngineLocal.java?rev=1079558&r1=1079557&r2=1079558&view=diff
==============================================================================
--- oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/WorkflowEngineLocal.java (original)
+++ oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/WorkflowEngineLocal.java Tue Mar  8 21:46:58 2011
@@ -259,10 +259,18 @@ public class WorkflowEngineLocal impleme
     	return WorkflowUtils.findProcessor(this.queueManager.getWorkflowProcessor(instanceId), modelId).getStub();
     }
     
+	public void setWorkflowState(String instanceId, WorkflowState state) throws EngineException {
+		this.queueManager.setState(instanceId, null, state);
+	}
+	
 	public void setWorkflowState(String instanceId, String modelId, WorkflowState state) throws EngineException {
 		this.queueManager.setState(instanceId, modelId, state);
 	}
 	
+    public void setWorkflowPriority(String instanceId, Priority priority) throws EngineException {
+		this.queueManager.setPriority(instanceId, null, priority);
+    }
+
     public void setWorkflowPriority(String instanceId, String modelId, Priority priority) throws EngineException {
 		this.queueManager.setPriority(instanceId, modelId, priority);
     }
@@ -356,6 +364,10 @@ public class WorkflowEngineLocal impleme
 		}
 	}
 
+	public void updateWorkflowMetadata(String instanceId, Metadata metadata) throws EngineException {
+		this.queueManager.setMetadata(instanceId, null, metadata);
+	}
+	
 	public void updateWorkflowMetadata(String instanceId,
 			String modelId, Metadata metadata) throws EngineException {
 		this.queueManager.setMetadata(instanceId, modelId, metadata);

Modified: oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/instance/WorkflowConnectTaskInstance.java
URL: http://svn.apache.org/viewvc/oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/instance/WorkflowConnectTaskInstance.java?rev=1079558&r1=1079557&r2=1079558&view=diff
==============================================================================
--- oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/instance/WorkflowConnectTaskInstance.java (original)
+++ oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/instance/WorkflowConnectTaskInstance.java Tue Mar  8 21:46:58 2011
@@ -1,16 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.oodt.cas.workflow.instance;
 
+//JDK imports
+import java.util.Arrays;
 import java.util.List;
 import java.util.Vector;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
+//OODT imports
 import org.apache.oodt.cas.metadata.Metadata;
 import org.apache.oodt.cas.workflow.engine.WorkflowEngineClient;
 import org.apache.oodt.cas.workflow.metadata.ControlMetadata;
 import org.apache.oodt.cas.workflow.processor.WorkflowProcessor;
 import org.apache.oodt.cas.workflow.state.WorkflowState;
 import org.apache.oodt.cas.workflow.state.done.FailureState;
+import org.apache.oodt.cas.workflow.state.done.StoppedState;
 import org.apache.oodt.cas.workflow.state.done.SuccessState;
 import org.apache.oodt.cas.workflow.state.results.ResultsBailState;
 import org.apache.oodt.cas.workflow.state.results.ResultsFailureState;
@@ -18,6 +38,15 @@ import org.apache.oodt.cas.workflow.stat
 import org.apache.oodt.cas.workflow.state.results.ResultsSuccessState;
 import org.apache.oodt.cas.workflow.util.WorkflowUtils;
 
+/**
+ * 
+ * @author bfoster
+ * @version $Revision$
+ * 
+ * <p>
+ * Connect Workflows Spawn/Wait Task Instance
+ * </p>.
+ */
 public class WorkflowConnectTaskInstance extends TaskInstance {
 	
 	private static final Logger LOG = Logger.getLogger(WorkflowConnectTaskInstance.class.getName());
@@ -26,9 +55,12 @@ public class WorkflowConnectTaskInstance
 	public static final String N_MET_MOD_CLASS = "WorkflowConnect/NMetadataModifier/Class"; 
 	public static final String SPAWN_MODEL_ID = "WorkflowConnect/ModelId"; 
 	
-	public static final String SPAWNED_WORKFLOWS = WorkflowProcessor.LOCAL_KEYS_GROUP + "/WorkflowConnect/SpawnedWorkflows/InstanceIds";
-	public static final String SPAWNED_BY_WORKFLOW = WorkflowProcessor.LOCAL_KEYS_GROUP + "/WorkflowConnect/SpawnedByWorkflow/InstanceId";
+	public static final String SPAWNED_WORKFLOWS = "WorkflowConnect/SpawnedWorkflows/InstanceIds";
+	public static final String SPAWNED_BY_WORKFLOW = "WorkflowConnect/SpawnedByWorkflow/InstanceId";
 
+	public static final String JOIN_METADATA = "WorkflowConnect/JoinMetadata";
+	public static final String JOIN_ONLY_METADATA_KEYS = "WorkflowConnect/JoinMetadata/RestrictToKeys";
+	
 	private WorkflowEngineClient weClient;
 	
 	@Override
@@ -41,6 +73,14 @@ public class WorkflowConnectTaskInstance
 	protected ResultsState performExecution(ControlMetadata ctrlMetadata) {
 		if (ctrlMetadata.getMetadata(SPAWNED_WORKFLOWS) == null) {
 			
+			//Add spawning keys to localized workflow metadata keys
+			Vector<String> localKeys = new Vector<String>();
+			if (ctrlMetadata.getMetadata(WorkflowProcessor.LOCAL_KEYS) != null)
+				localKeys.addAll(ctrlMetadata.getAllMetadata(WorkflowProcessor.LOCAL_KEYS));
+			localKeys.addAll(Arrays.asList(SPAWNED_WORKFLOWS, SPAWNED_BY_WORKFLOW));
+			ctrlMetadata.replaceLocalMetadata(WorkflowProcessor.LOCAL_KEYS, localKeys);
+			ctrlMetadata.setAsWorkflowMetadataKey(WorkflowProcessor.LOCAL_KEYS);
+			
 			//Get Spawn ModelId
 			String spawnModelId = ctrlMetadata.getMetadata(SPAWN_MODEL_ID);
 			if (spawnModelId == null)
@@ -72,22 +112,36 @@ public class WorkflowConnectTaskInstance
 				}
 			}
 			
-			int n = calculator.determineN(ctrlMetadata);
+			int n = -1;
+			try {
+				n = calculator.determineN(ctrlMetadata);
+			}catch (Exception e) {
+				LOG.log(Level.SEVERE, "Failed to determine N : " + e.getMessage(), e);
+				return new ResultsFailureState("Failed to determine N : " + e.getMessage());
+			}
 			Metadata spawnWorkflowMet = ctrlMetadata.asMetadata();
-			Vector<String> spawnedInstanceId = new Vector<String>();
+			Vector<String> spawnedInstanceIds = new Vector<String>();
 			for (int i = 0; i < n; i++) {
 				Metadata curWorkflowMet = new Metadata(spawnWorkflowMet);
-				if (nMetMod != null)
-					nMetMod.prepare(i+1, n, curWorkflowMet);
 				try {
+					if (nMetMod != null)
+						nMetMod.prepare(i+1, n, curWorkflowMet);
 					curWorkflowMet.replaceMetadata(SPAWNED_BY_WORKFLOW, this.getInstanceId());
-					spawnedInstanceId.add(this.weClient.startWorkflow(spawnModelId, curWorkflowMet));
+					spawnedInstanceIds.add(this.weClient.startWorkflow(spawnModelId, curWorkflowMet));
 				}catch (Exception e) {
-					LOG.log(Level.SEVERE, "Failed to start workflow ModelId '" + spawnModelId + "' : " + e.getMessage(), e);
-					return new ResultsFailureState("Failed to start workflow ModelId '" + spawnModelId + "' : " + e.getMessage());
+					LOG.log(Level.SEVERE, "Failed to start workflow ModelId '" + spawnModelId + "' [i = '" + i + "'] : " + e.getMessage(), e);
+					return new ResultsFailureState("Failed to start workflow ModelId '" + spawnModelId + "' [i = '" + i + "'] : " + e.getMessage());
+				}finally {
+					for (String spawenedInstanceId : spawnedInstanceIds) {
+						try {
+							this.weClient.setWorkflowState(spawenedInstanceId, new StoppedState("Spawing workflow failed to spawn sibling workflow [i = '" + i + "']"));
+						}catch (Exception e) {
+							LOG.log(Level.SEVERE, "Failed to stop workflow InstanceId = '" + spawenedInstanceId + "' : " + e.getMessage(), e);
+						}
+					}
 				}
 			}
-			ctrlMetadata.replaceLocalMetadata(SPAWNED_WORKFLOWS, spawnedInstanceId);
+			ctrlMetadata.replaceLocalMetadata(SPAWNED_WORKFLOWS, spawnedInstanceIds);
 			ctrlMetadata.setAsWorkflowMetadataKey(SPAWNED_WORKFLOWS);
 			
 			return new ResultsBailState("Waiting for " + n + " of " + n + " spawned workflows to complete");
@@ -110,7 +164,17 @@ public class WorkflowConnectTaskInstance
 				Metadata dynMet = new Metadata();
 				for (String spawnedInstanceId : spawnedInstanceIds) {
 					try {
-						dynMet = WorkflowUtils.mergeMetadata(dynMet, this.weClient.getWorkflowMetadata(spawnedInstanceId));
+						if (Boolean.parseBoolean(ctrlMetadata.getMetadata(JOIN_METADATA))) {
+							if (ctrlMetadata.getMetadata(JOIN_ONLY_METADATA_KEYS) != null) {
+								dynMet = WorkflowUtils.mergeMetadata(dynMet, this.weClient.getWorkflowMetadata(spawnedInstanceId));
+							}else { 
+								Metadata spawnedMetadata = this.weClient.getWorkflowMetadata(spawnedInstanceId);
+								Metadata joinMetadata = new Metadata();
+								for (String key : ctrlMetadata.getAllMetadata(JOIN_ONLY_METADATA_KEYS))
+									joinMetadata.replaceMetadata(key, spawnedMetadata.getAllMetadata(key));
+								dynMet = WorkflowUtils.mergeMetadata(dynMet, joinMetadata);									
+							}
+						}
 					}catch (Exception e) {
 						return new ResultsFailureState("Failed to get metadata of spawned workflow [InstanceId='" + spawnedInstanceId + "']");
 					}
@@ -132,11 +196,13 @@ public class WorkflowConnectTaskInstance
 		metadata.removeMetadata(SPAWN_MODEL_ID);
 		metadata.removeMetadata(SPAWNED_WORKFLOWS);
 		metadata.removeMetadata(SPAWNED_BY_WORKFLOW);
+		metadata.removeMetadata(JOIN_METADATA);
+		metadata.removeMetadata(JOIN_ONLY_METADATA_KEYS);
 	}
 	
 	public interface NCalculator {
 		
-		public int determineN(ControlMetadata ctrlMetadata);
+		public int determineN(ControlMetadata ctrlMetadata) throws Exception;
 		
 	}
 	
@@ -147,7 +213,7 @@ public class WorkflowConnectTaskInstance
 		 * @param n Total number of workflows submitted
 		 * @param metadata Metadata for current workflow being submitted
 		 */
-		public void prepare(int i, int n, Metadata metadata);
+		public void prepare(int i, int n, Metadata metadata) throws Exception;
 		
 	}
 	

Modified: oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/processor/WorkflowProcessor.java
URL: http://svn.apache.org/viewvc/oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/processor/WorkflowProcessor.java?rev=1079558&r1=1079557&r2=1079558&view=diff
==============================================================================
--- oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/processor/WorkflowProcessor.java (original)
+++ oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/processor/WorkflowProcessor.java Tue Mar  8 21:46:58 2011
@@ -57,7 +57,7 @@ import org.apache.oodt.cas.workflow.stat
  */
 public abstract class WorkflowProcessor implements WorkflowProcessorListener, Comparable<WorkflowProcessor> {
 
-	public static final String LOCAL_KEYS_GROUP = "WorkflowProcessor/Local";
+	public static final String LOCAL_KEYS = "WorkflowProcessor/Local/Keys";
 	
 	private String instanceId;
 	private String modelId;
@@ -240,7 +240,11 @@ public abstract class WorkflowProcessor 
 	
 	public synchronized Metadata getPassThroughDynamicMetadata() {
 		Metadata passThroughMet = new Metadata(this.dynamicMetadata);
-		passThroughMet.removeMetadataGroup(LOCAL_KEYS_GROUP);
+		passThroughMet.removeMetadata(LOCAL_KEYS);
+		for (String key : this.dynamicMetadata.getAllMetadata(LOCAL_KEYS))
+			passThroughMet.removeMetadata(key);
+		for (String key : this.staticMetadata.getAllMetadata(LOCAL_KEYS))
+			passThroughMet.removeMetadata(key);
 		return passThroughMet;
 	}
 	

Modified: oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/server/channel/AbstractCommunicationChannelServer.java
URL: http://svn.apache.org/viewvc/oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/server/channel/AbstractCommunicationChannelServer.java?rev=1079558&r1=1079557&r2=1079558&view=diff
==============================================================================
--- oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/server/channel/AbstractCommunicationChannelServer.java (original)
+++ oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/server/channel/AbstractCommunicationChannelServer.java Tue Mar  8 21:46:58 2011
@@ -266,6 +266,15 @@ public abstract class AbstractCommunicat
 		}
 	}
 
+	public void setWorkflowState(String instanceId, WorkflowState state) throws Exception {
+		try {
+			this.workflowEngine.setWorkflowState(instanceId, state);
+		}catch (Exception e) {
+			LOG.log(Level.SEVERE, "Failed to set state '" + state + "' for workflow [instanceid='" + instanceId  + "'] in engine : " + e.getMessage(), e);
+			throw new Exception("Failed to set state '" + state + "' for workflow [instanceid='" + instanceId  + "'] in engine : " + e.getMessage(), e);
+		}
+	}
+	
 	public void setWorkflowState(String instanceId,
 			String modelId, WorkflowState state) throws Exception {
 		try {
@@ -276,6 +285,15 @@ public abstract class AbstractCommunicat
 		}
 	}
 
+    public void setWorkflowPriority(String instanceId, Priority priority) throws Exception {
+    	try {
+    		this.workflowEngine.setWorkflowPriority(instanceId, priority);
+		}catch (Exception e) {
+			LOG.log(Level.SEVERE, "Failed to set priority '" + priority + "' for workflow [instanceid='" + instanceId  + "'] in engine : " + e.getMessage(), e);
+			throw new Exception("Failed to set priority '" + priority + "' for workflow [instanceid='" + instanceId  + "'] in engine : " + e.getMessage(), e);
+		}
+    }
+	
     public void setWorkflowPriority(String instanceId, String modelId, Priority priority) throws Exception {
     	try {
     		this.workflowEngine.setWorkflowPriority(instanceId, modelId, priority);
@@ -344,6 +362,15 @@ public abstract class AbstractCommunicat
 		}
 	}
 
+	public void updateWorkflowMetadata(String instanceId, Metadata metadata) throws Exception {
+		try {
+			this.workflowEngine.updateWorkflowMetadata(instanceId, metadata);
+		}catch (Exception e) {
+			LOG.log(Level.SEVERE, "Failed to update workflow metadata [InstanceId='" + instanceId + "'] in engine : " + e.getMessage(), e);
+			throw new Exception("Failed to update workflow metadata [InstanceId='" + instanceId + "'] in engine : " + e.getMessage(), e);
+		}
+	}
+	
 	public void updateWorkflowMetadata(String instanceId,
 			String modelId, Metadata metadata) throws Exception {
 		try {

Modified: oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/server/channel/CommunicationChannelClient.java
URL: http://svn.apache.org/viewvc/oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/server/channel/CommunicationChannelClient.java?rev=1079558&r1=1079557&r2=1079558&view=diff
==============================================================================
--- oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/server/channel/CommunicationChannelClient.java (original)
+++ oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/server/channel/CommunicationChannelClient.java Tue Mar  8 21:46:58 2011
@@ -91,6 +91,8 @@ public interface CommunicationChannelCli
 
     public WorkflowInstanceRepository getInstanceRepository() throws Exception;
 
+    public void updateWorkflowMetadata(String instanceId, Metadata metadata) throws Exception;
+
     public void updateWorkflowMetadata(String instanceId, String modelId, Metadata metadata) throws Exception;
     
     public void updateInstanceMetadata(String jobId, Metadata metadata) throws Exception;
@@ -101,8 +103,12 @@ public interface CommunicationChannelCli
 
     public ProcessorStub getWorkflowStub(String instanceId, String modelId) throws Exception;
 
+    public void setWorkflowState(String instanceId, WorkflowState state) throws Exception;
+
     public void setWorkflowState(String instanceId, String modelId, WorkflowState state) throws Exception;
 
+    public void setWorkflowPriority(String instanceId, Priority priority) throws Exception;
+
     public void setWorkflowPriority(String instanceId, String modelId, Priority priority) throws Exception;
     
     public Metadata getWorkflowMetadata(String instanceId) throws Exception;

Modified: oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/server/channel/CommunicationChannelServer.java
URL: http://svn.apache.org/viewvc/oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/server/channel/CommunicationChannelServer.java?rev=1079558&r1=1079557&r2=1079558&view=diff
==============================================================================
--- oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/server/channel/CommunicationChannelServer.java (original)
+++ oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/server/channel/CommunicationChannelServer.java Tue Mar  8 21:46:58 2011
@@ -96,6 +96,8 @@ public interface CommunicationChannelSer
     
     public WorkflowInstanceRepository getInstanceRepository() throws Exception;
 
+    public void updateWorkflowMetadata(String instanceId, Metadata metadata) throws Exception;
+
     public void updateWorkflowMetadata(String instanceId, String modelId, Metadata metadata) throws Exception;
     
     public void updateInstanceMetadata(String jobId, Metadata metadata) throws Exception;
@@ -106,8 +108,12 @@ public interface CommunicationChannelSer
 
     public ProcessorStub getWorkflowStub(String instanceId, String modelId) throws Exception;
 
+    public void setWorkflowState(String instanceId, WorkflowState state) throws Exception;
+
     public void setWorkflowState(String instanceId, String modelId, WorkflowState state) throws Exception;
 
+    public void setWorkflowPriority(String instanceId, Priority priority) throws Exception;
+
     public void setWorkflowPriority(String instanceId, String modelId, Priority priority) throws Exception;
 
     public Metadata getWorkflowMetadata(String instanceId, String modelId) throws Exception;

Modified: oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/server/channel/MultiCommunicationChannelClient.java
URL: http://svn.apache.org/viewvc/oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/server/channel/MultiCommunicationChannelClient.java?rev=1079558&r1=1079557&r2=1079558&view=diff
==============================================================================
--- oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/server/channel/MultiCommunicationChannelClient.java (original)
+++ oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/server/channel/MultiCommunicationChannelClient.java Tue Mar  8 21:46:58 2011
@@ -233,11 +233,19 @@ public class MultiCommunicationChannelCl
 		useClient.resumeWorkflow(instanceId);
 	}
 
+	public void setWorkflowPriority(String instanceId, Priority priority) throws Exception {
+		useClient.setWorkflowPriority(instanceId, priority);
+	}
+	
 	public void setWorkflowPriority(String instanceId, String modelId,
 			Priority priority) throws Exception {
 		useClient.setWorkflowPriority(instanceId, modelId, priority);
 	}
 
+	public void setWorkflowState(String instanceId, WorkflowState state) throws Exception {
+		useClient.setWorkflowState(instanceId, state);
+	}
+	
 	public void setWorkflowState(String instanceId, String modelId,
 			WorkflowState state) throws Exception {
 		useClient.setWorkflowState(instanceId, modelId, state);
@@ -277,6 +285,10 @@ public class MultiCommunicationChannelCl
 		useClient.updateInstanceMetadata(jobId, metadata);
 	}
 
+	public void updateWorkflowMetadata(String instanceId, Metadata metadata) throws Exception {
+		useClient.updateWorkflowMetadata(instanceId, metadata);
+	}
+	
 	public void updateWorkflowMetadata(String instanceId, String modelId,
 			Metadata metadata) throws Exception {
 		useClient.updateWorkflowMetadata(instanceId, modelId, metadata);

Modified: oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/server/channel/rmi/RmiCommunicationChannelClient.java
URL: http://svn.apache.org/viewvc/oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/server/channel/rmi/RmiCommunicationChannelClient.java?rev=1079558&r1=1079557&r2=1079558&view=diff
==============================================================================
--- oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/server/channel/rmi/RmiCommunicationChannelClient.java (original)
+++ oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/server/channel/rmi/RmiCommunicationChannelClient.java Tue Mar  8 21:46:58 2011
@@ -151,6 +151,10 @@ public class RmiCommunicationChannelClie
 		return this.serializer.deserializeObject(WorkflowInstanceRepository.class, this.getRmiServer().rmi_getInstanceRepository());
     }
 
+    public void updateWorkflowMetadata(String instanceId, Metadata metadata) throws Exception {
+		this.getRmiServer().rmi_updateWorkflowMetadata(instanceId, this.serializer.serializeObject(metadata));
+    }
+    
     public void updateWorkflowMetadata(String instanceId, String modelId, Metadata metadata) throws Exception {
 		this.getRmiServer().rmi_updateWorkflowMetadata(instanceId, modelId, this.serializer.serializeObject(metadata));
     }
@@ -163,10 +167,18 @@ public class RmiCommunicationChannelClie
 		this.getRmiServer().rmi_updateWorkflowAndInstance(instanceId, modelId, this.serializer.serializeObject(state), this.serializer.serializeObject(metadata), jobId, this.serializer.serializeObject(instanceMetadata));
     }
     
+    public void setWorkflowState(String instanceId, WorkflowState state) throws Exception {
+		this.getRmiServer().rmi_setWorkflowState(instanceId, this.serializer.serializeObject(state));
+    }
+    
     public void setWorkflowState(String instanceId, String modelId, WorkflowState state) throws Exception {
 		this.getRmiServer().rmi_setWorkflowState(instanceId, modelId, this.serializer.serializeObject(state));
     }
 
+    public void setWorkflowPriority(String instanceId, Priority priority) throws Exception {
+		this.getRmiServer().rmi_setWorkflowPriority(instanceId, this.serializer.serializeObject(priority));
+    }
+    
     public void setWorkflowPriority(String instanceId, String modelId, Priority priority) throws Exception {
 		this.getRmiServer().rmi_setWorkflowPriority(instanceId, modelId, this.serializer.serializeObject(priority));
     }

Modified: oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/server/channel/rmi/RmiCommunicationChannelServer.java
URL: http://svn.apache.org/viewvc/oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/server/channel/rmi/RmiCommunicationChannelServer.java?rev=1079558&r1=1079557&r2=1079558&view=diff
==============================================================================
--- oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/server/channel/rmi/RmiCommunicationChannelServer.java (original)
+++ oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/server/channel/rmi/RmiCommunicationChannelServer.java Tue Mar  8 21:46:58 2011
@@ -279,6 +279,14 @@ public class RmiCommunicationChannelServ
 		}
     }
 
+    public void rmi_updateWorkflowMetadata(String instanceId, String metadata) throws RemoteException {
+		try {
+			this.updateWorkflowMetadata(instanceId, this.serializer.deserializeObject(Metadata.class, metadata));
+		}catch (Exception e) {
+			throw new RemoteException(e.getMessage(), e);
+		}
+    }
+    
     public void rmi_updateWorkflowMetadata(String instanceId, String modelId, String metadata) throws RemoteException {
 		try {
 			this.updateWorkflowMetadata(instanceId, modelId, this.serializer.deserializeObject(Metadata.class, metadata));
@@ -303,6 +311,14 @@ public class RmiCommunicationChannelServ
 		}
     }
     
+    public void rmi_setWorkflowState(String instanceId, String state) throws RemoteException {
+		try {
+			this.setWorkflowState(instanceId, this.serializer.deserializeObject(WorkflowState.class, state));
+		}catch (Exception e) {
+			throw new RemoteException(e.getMessage(), e);
+		}
+    }
+    
     public void rmi_setWorkflowState(String instanceId, String modelId, String state) throws RemoteException {
 		try {
 			this.setWorkflowState(instanceId, modelId, this.serializer.deserializeObject(WorkflowState.class, state));
@@ -311,6 +327,14 @@ public class RmiCommunicationChannelServ
 		}
     }
 
+    public void rmi_setWorkflowPriority(String instanceId, String priority) throws RemoteException {
+		try {
+			this.setWorkflowPriority(instanceId, this.serializer.deserializeObject(Priority.class, priority));
+		}catch (Exception e) {
+			throw new RemoteException(e.getMessage(), e);
+		}
+    }
+    
     public void rmi_setWorkflowPriority(String instanceId, String modelId, String priority) throws RemoteException {
 		try {
 			this.setWorkflowPriority(instanceId, modelId, this.serializer.deserializeObject(Priority.class, priority));

Modified: oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/server/channel/rmi/RmiCommunicationChannelServerInterface.java
URL: http://svn.apache.org/viewvc/oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/server/channel/rmi/RmiCommunicationChannelServerInterface.java?rev=1079558&r1=1079557&r2=1079558&view=diff
==============================================================================
--- oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/server/channel/rmi/RmiCommunicationChannelServerInterface.java (original)
+++ oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/server/channel/rmi/RmiCommunicationChannelServerInterface.java Tue Mar  8 21:46:58 2011
@@ -67,14 +67,20 @@ public interface RmiCommunicationChannel
 
     public String rmi_getInstanceRepository() throws RemoteException;
 
+    public void rmi_updateWorkflowMetadata(String instanceId, String metadata) throws RemoteException;
+
     public void rmi_updateWorkflowMetadata(String instanceId, String modelId, String metadata) throws RemoteException;
     
     public void rmi_updateInstanceMetadata(String jobId, String metadata) throws RemoteException;
 
     public void rmi_updateWorkflowAndInstance(String instanceId, String modelId, String state, String metadata, String jobId, String instanceMetadata) throws RemoteException;
 
+    public void rmi_setWorkflowState(String instanceId, String state) throws RemoteException;
+
     public void rmi_setWorkflowState(String instanceId, String modelId, String state) throws RemoteException;
 
+    public void rmi_setWorkflowPriority(String instanceId, String priority) throws RemoteException;
+
     public void rmi_setWorkflowPriority(String instanceId, String modelId, String priority) throws RemoteException;
 
     public String rmi_getWorkflowMetadata(String instanceId) throws RemoteException;

Modified: oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/server/channel/rmi/RmiCommunicationChannelServerMBean.java
URL: http://svn.apache.org/viewvc/oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/server/channel/rmi/RmiCommunicationChannelServerMBean.java?rev=1079558&r1=1079557&r2=1079558&view=diff
==============================================================================
--- oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/server/channel/rmi/RmiCommunicationChannelServerMBean.java (original)
+++ oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/server/channel/rmi/RmiCommunicationChannelServerMBean.java Tue Mar  8 21:46:58 2011
@@ -90,13 +90,19 @@ public interface RmiCommunicationChannel
     public ProcessorInfo getProcessorInfo(String instanceId, String modelId) throws Exception;
     
     public WorkflowState getWorkflowState(String instanceId) throws Exception;
-    
+
+    public void updateWorkflowMetadata(String instanceId, Metadata metadata) throws Exception;
+
     public void updateWorkflowMetadata(String instanceId, String modelId, Metadata metadata) throws Exception;
     
     public void updateInstanceMetadata(String jobId, Metadata metadata) throws Exception;
 
+    public void setWorkflowState(String instanceId, WorkflowState state) throws Exception;
+
     public void setWorkflowState(String instanceId, String modelId, WorkflowState state) throws Exception;
 
+    public void setWorkflowPriority(String instanceId, Priority priority) throws Exception;
+
     public void setWorkflowPriority(String instanceId, String modelId, Priority priority) throws Exception;
 
     public String rmi_getWorkflowStub(String instanceId) throws RemoteException;

Modified: oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/server/channel/xmlrpc/XmlRpcCommunicationChannelClient.java
URL: http://svn.apache.org/viewvc/oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/server/channel/xmlrpc/XmlRpcCommunicationChannelClient.java?rev=1079558&r1=1079557&r2=1079558&view=diff
==============================================================================
--- oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/server/channel/xmlrpc/XmlRpcCommunicationChannelClient.java (original)
+++ oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/server/channel/xmlrpc/XmlRpcCommunicationChannelClient.java Tue Mar  8 21:46:58 2011
@@ -231,6 +231,13 @@ public class XmlRpcCommunicationChannelC
 		this.client.execute(XmlRpcCommunicationChannelServer.class.getSimpleName() + ".xmlrpc_resumeWorkflow", args);
 	}
 
+	public void setWorkflowState(String instanceId, WorkflowState state) throws Exception {
+		Vector<Object> args = new Vector<Object>();
+		args.add(instanceId);
+		args.add(this.serializer.serializeObject(state));
+		this.client.execute(XmlRpcCommunicationChannelServer.class.getSimpleName() + ".xmlrpc_setWorkflowState", args);
+	}
+	
 	public void setWorkflowState(String instanceId,
 			String modelId, WorkflowState state) throws Exception {
 		Vector<Object> args = new Vector<Object>();
@@ -240,6 +247,13 @@ public class XmlRpcCommunicationChannelC
 		this.client.execute(XmlRpcCommunicationChannelServer.class.getSimpleName() + ".xmlrpc_setWorkflowState", args);
 	}
 	
+	public void setWorkflowPriority(String instanceId, Priority priority) throws Exception {
+		Vector<Object> args = new Vector<Object>();
+		args.add(instanceId);
+		args.add(this.serializer.serializeObject(priority));
+		this.client.execute(XmlRpcCommunicationChannelServer.class.getSimpleName() + ".xmlrpc_setWorkflowPriority", args);
+	}
+	
 	public void setWorkflowPriority(String instanceId,
 			String modelId, Priority priority) throws Exception {
 		Vector<Object> args = new Vector<Object>();
@@ -296,6 +310,13 @@ public class XmlRpcCommunicationChannelC
 		this.client.execute(XmlRpcCommunicationChannelServer.class.getSimpleName() + ".xmlrpc_updateInstanceMetadata", args);
 	}
 
+	public void updateWorkflowMetadata(String instanceId, Metadata metadata) throws Exception {
+		Vector<Object> args = new Vector<Object>();
+		args.add(instanceId);
+		args.add(this.serializer.serializeObject(metadata));
+		this.client.execute(XmlRpcCommunicationChannelServer.class.getSimpleName() + ".xmlrpc_updateWorkflowMetadata", args);
+	}
+	
 	public void updateWorkflowMetadata(String instanceId,
 			String modelId, Metadata metadata) throws Exception {
 		Vector<Object> args = new Vector<Object>();

Modified: oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/server/channel/xmlrpc/XmlRpcCommunicationChannelServer.java
URL: http://svn.apache.org/viewvc/oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/server/channel/xmlrpc/XmlRpcCommunicationChannelServer.java?rev=1079558&r1=1079557&r2=1079558&view=diff
==============================================================================
--- oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/server/channel/xmlrpc/XmlRpcCommunicationChannelServer.java (original)
+++ oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/server/channel/xmlrpc/XmlRpcCommunicationChannelServer.java Tue Mar  8 21:46:58 2011
@@ -147,11 +147,21 @@ public class XmlRpcCommunicationChannelS
 		return this.serializer.serializeObject(Boolean.TRUE);
 	}
 
+	public String xmlrpc_setWorkflowState(String instanceId, String state) throws Exception {
+		this.setWorkflowState(instanceId, this.serializer.deserializeObject(WorkflowState.class, state));
+		return this.serializer.serializeObject(Boolean.TRUE);
+	}
+	
 	public String xmlrpc_setWorkflowState(String instanceId, String modelId, String state) throws Exception {
 		this.setWorkflowState(instanceId, modelId, this.serializer.deserializeObject(WorkflowState.class, state));
 		return this.serializer.serializeObject(Boolean.TRUE);
 	}
 	
+    public String xmlrpc_setWorkflowPriority(String instanceId, String priority) throws Exception {
+    	this.setWorkflowPriority(instanceId, this.serializer.deserializeObject(Priority.class, priority));
+		return this.serializer.serializeObject(Boolean.TRUE);
+    }
+	
     public String xmlrpc_setWorkflowPriority(String instanceId, String modelId, String priority) throws Exception {
     	this.setWorkflowPriority(instanceId, modelId, this.serializer.deserializeObject(Priority.class, priority));
 		return this.serializer.serializeObject(Boolean.TRUE);
@@ -183,6 +193,11 @@ public class XmlRpcCommunicationChannelS
 		return this.serializer.serializeObject(Boolean.TRUE);
 	}
 	
+	public String xmlrpc_updateWorkflowMetadata(String instanceId, String metadata)  throws Exception {
+		this.updateWorkflowMetadata(instanceId, this.serializer.deserializeObject(Metadata.class, metadata));
+		return this.serializer.serializeObject(Boolean.TRUE);
+	}
+	
 	public String xmlrpc_updateWorkflowMetadata(String instanceId, String modelId, String metadata)  throws Exception {
 		this.updateWorkflowMetadata(instanceId, modelId, this.serializer.deserializeObject(Metadata.class, metadata));
 		return this.serializer.serializeObject(Boolean.TRUE);

Modified: oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/server/channel/xmlrpc/XmlRpcCommunicationServerInterface.java
URL: http://svn.apache.org/viewvc/oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/server/channel/xmlrpc/XmlRpcCommunicationServerInterface.java?rev=1079558&r1=1079557&r2=1079558&view=diff
==============================================================================
--- oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/server/channel/xmlrpc/XmlRpcCommunicationServerInterface.java (original)
+++ oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/server/channel/xmlrpc/XmlRpcCommunicationServerInterface.java Tue Mar  8 21:46:58 2011
@@ -66,8 +66,12 @@ public interface XmlRpcCommunicationServ
 
 	public String xmlrpc_resumeWorkflow(String instanceId) throws Exception;
 
+	public String xmlrpc_setWorkflowState(String instanceId, String state) throws Exception;
+
 	public String xmlrpc_setWorkflowState(String instanceId, String modelId, String state) throws Exception;
 
+    public String xmlrpc_setWorkflowPriority(String instanceId, String priority) throws Exception;
+
     public String xmlrpc_setWorkflowPriority(String instanceId, String modelId, String priority) throws Exception;
 	
 	public String xmlrpc_startWorkflow(String workflow, String metadata) throws Exception;
@@ -82,6 +86,8 @@ public interface XmlRpcCommunicationServ
 
 	public String xmlrpc_updateInstanceMetadata(String jobId, String metadata) throws Exception;
 	
+	public String xmlrpc_updateWorkflowMetadata(String instanceId, String metadata)  throws Exception;
+
 	public String xmlrpc_updateWorkflowMetadata(String instanceId, String modelId, String metadata)  throws Exception;
 
     public String xmlrpc_registerEvent(String event) throws Exception;

Modified: oodt/branches/wengine-branch/wengine/src/main/resources/policy/workflows/WorkflowModelTestFile.xml
URL: http://svn.apache.org/viewvc/oodt/branches/wengine-branch/wengine/src/main/resources/policy/workflows/WorkflowModelTestFile.xml?rev=1079558&r1=1079557&r2=1079558&view=diff
==============================================================================
--- oodt/branches/wengine-branch/wengine/src/main/resources/policy/workflows/WorkflowModelTestFile.xml (original)
+++ oodt/branches/wengine-branch/wengine/src/main/resources/policy/workflows/WorkflowModelTestFile.xml Tue Mar  8 21:46:58 2011
@@ -12,6 +12,8 @@
 				<property name="WorkflowConnect/ModelId" value="TestConnectTask"/>
 				<property name="WorkflowConnect/NCalculator/Class" value="org.apache.oodt.cas.workflow.instance.SimpleNCalculator"/>
 				<property name="WorkflowConnect/NMetadataModifier/Class" value="org.apache.oodt.cas.workflow.instance.SimpleNMetadataModification"/>
+				<property name="WorkflowConnect/JoinMetadata" value="true"/>
+				<property name="WorkflowConnect/JoinMetadata/RestrictToKeys" value="TestConnectTask_key"/>
 				<property name="SimpleNCalculator/N" value="3"/>
 			</configuration>
 		</task>