You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ode.apache.org by ad...@apache.org on 2006/02/28 17:05:47 UTC

svn commit: r381686 [21/40] - in /incubator/ode/scratch/bpe: ./ bpelTests/ bpelTests/probeService/ bpelTests/test1/ bpelTests/test10/ bpelTests/test12/ bpelTests/test13/ bpelTests/test14/ bpelTests/test15/ bpelTests/test16/ bpelTests/test17/ bpelTests/...

Added: incubator/ode/scratch/bpe/src/main/java/com/sybase/bpe/correlation/CorrelationService.java
URL: http://svn.apache.org/viewcvs/incubator/ode/scratch/bpe/src/main/java/com/sybase/bpe/correlation/CorrelationService.java?rev=381686&view=auto
==============================================================================
--- incubator/ode/scratch/bpe/src/main/java/com/sybase/bpe/correlation/CorrelationService.java (added)
+++ incubator/ode/scratch/bpe/src/main/java/com/sybase/bpe/correlation/CorrelationService.java Tue Feb 28 08:02:48 2006
@@ -0,0 +1,823 @@
+/*
+* Confidential property of Sybase, Inc.
+*
+* Copyright 1987 - 2006.
+*
+* Sybase, Inc. All rights reserved.
+*
+* Unpublished rights reserved under U.S. copyright laws.
+*
+* This software contains confidential and trade secret information
+* of Sybase, Inc. Use, duplication or disclosure of the software and
+* documentation by the U.S. Government is subject to restrictions
+* set forth in a license agreement between the Government and Sybase,
+* Inc. or other written agreement specifying the Government's rights
+* to use the software and any applicable FAR provisions, for example,
+* FAR 52.227-19.
+*
+* Sybase, Inc. One Sybase Drive, Dublin, CA 94568, USA
+*
+* http://www.sybase.com
+*/
+package com.sybase.bpe.correlation;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import com.sybase.bpe.bped.IInternalEventDirector;
+import com.sybase.bpe.context.IContainer;
+import com.sybase.bpe.context.IContextService;
+import com.sybase.bpe.context.resolver.ContextResolver;
+import com.sybase.bpe.context.resolver.IResolvedObject;
+import com.sybase.bpe.context.test.SerializationContext;
+import com.sybase.bpe.definition.IPMDCorrelationSet;
+import com.sybase.bpe.definition.IPMDLocator;
+import com.sybase.bpe.definition.IPMDOperation;
+import com.sybase.bpe.definition.IPMDProcess;
+import com.sybase.bpe.definition.IPMDRoot;
+import com.sybase.bpe.definition.service.DefinitionService;
+import com.sybase.bpe.engine.ProcessDefinitionKey;
+import com.sybase.bpe.engine.ProcessInstance;
+import com.sybase.bpe.engine.ProcessService;
+import com.sybase.bpe.engine.ReturnMessageLocatorHolder;
+import com.sybase.bpe.engine.StateEnum;
+import com.sybase.bpe.event.Fault;
+import com.sybase.bpe.event.IRequestMessageEvent;
+import com.sybase.bpe.event.IResponseMessage;
+import com.sybase.bpe.event.IStaticKey;
+import com.sybase.bpe.instance.IPMIProcess;
+import com.sybase.bpe.instance.service.InstanceService;
+import com.sybase.bpe.interaction.IInteraction;
+import com.sybase.bpe.interaction.IInvocation;
+import com.sybase.bpe.interaction.InteractionFactory;
+import com.sybase.bpe.lang.ResourceGetter;
+import com.sybase.bpe.scope.service.BPRuntimeException;
+import com.sybase.bpe.scope.service.ScopePath;
+import com.sybase.bpe.util.BPEProperties;
+import com.sybase.bpe.util.BPException;
+import com.sybase.bpe.event.BPELStaticKey;
+import com.sybase.bpe.interaction.Invocation;
+import com.sybase.bpe.interaction.query.JaxenXPathSingleNodeQuery;
+
+/**
+ * A CorrelationService implementation allows processes to register
+ * correlations. Underneath the CorrelationService is a repository that stores these
+ * correlations. CorrelationServices are obtained from the
+ * CorrelationServiceFactory.createCorrelationService() function.
+ * @see com.sybase.bpe.correlation.CorrelationServiceFactory
+ * @see com.sybase.bpe.correlation.StaticRegistration
+ */
+public abstract class CorrelationService {
+
+	private static Logger logger =
+		Logger.getLogger(CorrelationService.class.getName());
+	
+
+	/**
+	 * Create a registration in the repository.
+	 * @param regisration the registration to store
+	 * @throws BPException declared by the contract; the failure conditions
+	 *         depend on the implementing subclass
+	 */
+	public abstract void createRegistration(Registration regisration)
+		throws BPException;
+		
+	/**
+	 * Remove a registration from the repository.
+	 * @param registration the registration to remove
+	 * @throws BPException declared by the contract; the failure conditions
+	 *         depend on the implementing subclass
+	 */
+	public abstract void removeRegistration(Registration registration)
+		throws BPException;
+
+	/**
+	 * Find the registrations for a static key and a set of correlation key values.
+	 * Inside this class it is called from findRegistration() with the static key
+	 * of the incoming message and the key values produced by computeKeys().
+	 * @param key the static key to look up
+	 * @param keyValues candidate correlation key values; computeKeys() can place
+	 *        a null entry in this collection for operations without correlations
+	 * @return a collection whose elements findRegistration() reads as Registration
+	 * @throws BPException declared by the contract; the failure conditions
+	 *         depend on the implementing subclass
+	 */
+	public abstract Collection getRegistrations(
+		IStaticKey key,
+		Collection keyValues)
+		throws BPException;
+
+	/**
+	 * Find a list of registrations for a process instance.
+	 * @param key the process instance to look up registrations for
+	 * @return the registrations found for the process instance
+	 *         (NOTE(review): presumably Registration elements - confirm in the subclass)
+	 * @throws BPException declared by the contract; the failure conditions
+	 *         depend on the implementing subclass
+	 */
+	
+	public abstract Collection getRegistrations(IPMIProcess key)
+		throws BPException;
+		
+		
+	/**
+	 * Find a list of registrations for a static key, without key values.
+	 * @param key the static key to look up registrations for
+	 * @return the registrations found for the static key
+	 *         (NOTE(review): presumably Registration elements - confirm in the subclass)
+	 * @throws BPException declared by the contract; the failure conditions
+	 *         depend on the implementing subclass
+	 */
+	
+	public abstract Collection getRegistrations(IStaticKey key)
+		throws BPException;
+	
+	
+	/**
+	 * Find a single registration.
+	 * @param key the static key of the registration
+	 * @param operationId the id of the operation the registration belongs to
+	 * @param processId the id of the process that owns the registration
+	 * @return the registration
+	 *         (NOTE(review): behavior when nothing matches is implementation specific - confirm)
+	 * @throws BPException declared by the contract; the failure conditions
+	 *         depend on the implementing subclass
+	 */
+	public abstract Registration getRegistration(IStaticKey key, String operationId, 
+		String processId)
+		throws BPException;
+
+	/**
+	 * Lock on a key by delegating to the locking service of the event director.
+	 * @param ed the event director that supplies the locking service
+	 * @param key the key to lock on
+	 * @throws BPException propagated from the locking service
+	 */
+	public void lock(IInternalEventDirector ed, String key) throws BPException {
+		ed.getLockingService().lock(key);
+	}
+
+	/**
+	 * Update the correlation service.
+	 * NOTE(review): what exactly is written back is decided by the subclass - confirm there.
+	 * @throws BPException declared by the contract
+	 */
+	public abstract void update() throws BPException;
+
+	/**
+	 * Get the process service. This class uses it to look up and create process
+	 * instances and to reach the instance and definition services.
+	 * @return the process service
+	 * @throws BPException declared by the contract
+	 */
+	public abstract  ProcessService getProcessService() throws BPException;
+	
+	/**
+	 * Tell whether this service runs in BPEL compliant mode. When it does,
+	 * sentMessage() throws a CS_NON_BPEL exception if a message that was already
+	 * sent is sent again; otherwise the repeated send is recorded as a spray.
+	 * @return true if the service is BPEL compliant
+	 */
+	public abstract boolean isBPELCompliant();
+
+	/**
+	 * Send the message to the process behind a particular registration.
+	 * If the registered operation is instance creating, a new sub process is
+	 * created off the root process instance and started; otherwise the process
+	 * instance named by the registration is looked up.
+	 * @param registration the registration to route to
+	 * @param me the message to deliver
+	 * @param ed the event director, passed twice to ProcessInstance.processEvent()
+	 * @throws BPException propagated from the definition, instance and process services
+	 */
+	public void routeToRegistration(
+		Registration registration,
+		IRequestMessageEvent me,
+		IInternalEventDirector ed)  throws BPException {
+
+		// ids carried by the registration
+		final String rootDefinitionId = registration.getRootDefId();
+		final String definitionId = registration.getDefId();
+		final String rootInstanceId = registration.getRootProcId();
+		final String instanceId = registration.getProcId();
+		final String operationId = registration.getOperationId();
+
+		// resolve the operation the registration was made for
+		final DefinitionService definitions =
+			getProcessService().getInstanceService().getDefinitionService();
+		final IPMDRoot rootDefinition =
+			definitions.getRootDefinition(new ProcessDefinitionKey(rootDefinitionId));
+		final IPMDOperation operation = rootDefinition.getOperation(operationId);
+
+		final ProcessInstance target;
+		if ( operation.isInstanceCreating() ) {
+			// we are making a new process instance: get the definition to clone
+			ProcessDefinitionKey rootKey = new ProcessDefinitionKey(rootDefinitionId);
+			ProcessDefinitionKey processKey = new ProcessDefinitionKey(definitionId);
+			IPMDProcess definition = definitions.getProcessDefintion(processKey,rootKey);
+			// the parent of the new process is the root process instance
+			IPMIProcess parent = getProcessService().getInstanceService().getInstance(
+				rootInstanceId,rootInstanceId);
+			target = getProcessService().createSubProcess(parent,definition);
+			target.setState(StateEnum.STARTED);
+			// the process is created off the root, so it has to take over the
+			// scope path of the process that made the registration
+			ProcessInstance registrant =
+				getProcessService().lookupProcess( rootInstanceId, instanceId );
+			target.setScopePath(registrant.getScopePath());
+		} else {
+			// we are sending this to an existing process instance
+			target = getProcessService().lookupProcess( rootInstanceId, instanceId );
+		}
+
+		// send the message
+		target.processEvent(me,ed, ed);
+	}
+
+	/**
+	 * Create a new root process instance for an instance creating operation
+	 * and send the message to it.
+	 * @param operation the instance creating operation; its root definition id
+	 *        selects the definition to instantiate
+	 * @param me the message to deliver
+	 * @param ed the event director, passed twice to ProcessInstance.processEvent()
+	 * @throws BPException propagated from the process and context services
+	 */
+	public void createInstanceAndRoute(
+		IPMDOperation operation,
+		IRequestMessageEvent me,
+		IInternalEventDirector ed)  throws BPException {
+
+		// instantiate the root definition and mark the instance as started
+		ProcessDefinitionKey rootDefinitionKey =
+			new ProcessDefinitionKey(operation.getRootDefId());
+		ProcessInstance instance = getProcessService().createProcess(rootDefinitionKey);
+		instance.setState(StateEnum.STARTED);
+
+		// make sure the root context partition of the process exists
+		IContainer contextRoot = instance.getContextService().getRoot();
+		IContainer partition = (IContainer)contextRoot.findChild(instance.getRootKey());
+		if ( partition == null ) {
+			contextRoot.createContainer(instance.getRootKey());
+		}
+
+		// send the message
+		instance.processEvent(me,ed, ed);
+	}
+
+	/**
+	 * Initialize the CorrelationService. CorrelationServiceFactory calls this
+	 * right after instantiating the configured implementation class.
+	 * @param props	Initialization properties.
+	 * @param ps	The process service handed in by the factory.
+	 * @throws BPException declared by the contract; the failure conditions
+	 *         depend on the implementing subclass
+	 */
+	public abstract void init(BPEProperties props, ProcessService ps)
+		throws BPException;
+
+	/**
+	 * Correlate an event.
+	 * The message is routed, in this order, to
+	 * (1) an existing registration that matches the computed correlation keys, or
+	 * (2) a new process instance if an instance creating operation is defined
+	 * for the static key of the message. If neither applies the message cannot
+	 * be correlated and a CorrelationServiceException is thrown (caching of
+	 * uncorrelated events is not implemented).
+	 * @param me the message to correlate
+	 * @param sync true if the caller expects the reply to be mapped into the
+	 *        returned response message
+	 * @param ed the EventDirector
+	 * @return the response message built by mapResult()
+	 * @throws BPException if the message cannot be correlated or routing fails
+	 */
+	public IResponseMessage correlateEvent(
+		IRequestMessageEvent me,
+		boolean sync,
+		IInternalEventDirector ed)
+		throws BPException {
+
+		// init the process service
+		getProcessService().init();
+
+		// set the message event on the event director
+		ed.setMessageEvent(me);
+		// this is a new message so null out the return message locator object
+		ed.setReturnMessageMetadata(null);
+
+		// get the static key
+		IStaticKey key = me.getStaticKey();
+
+		// object to collect data about sent messages
+		SentMessageData smd = new SentMessageData();
+
+		// look up noninstance creating operations for this static key
+		// inflight and metadata
+		Collection defOps = getAllDefinedNonInstanceCreatingOperations(key);
+		// Remember whether the definition service returned nothing at all; the
+		// collection is replaced by an empty one so that the key computation
+		// below cannot fail with a NullPointerException before the
+		// "nothing defined" error further down gets a chance to be reported.
+		boolean noDefOps = ( defOps == null );
+		if ( noDefOps ) {
+			defOps = new ArrayList();
+		}
+
+		Collection keys = computeKeys(me,defOps,false);
+
+		//sync on computed keys
+		// TODO - make sure these keys are ordered consistently so we don't get dead locks
+		// This is not needed when we are not caching events
+		//lockOnKeys(ed, me.getStaticKey().toString(),keys);
+
+		// if defOps all belong to stateless BPs
+		// we don't need to look up registrations
+		Registration reg = null;
+		if ( isStatefull(defOps)) {
+			// find a registration to route to
+			// null if there are no registrations to be routed to
+			// this method also locks on the process instance id so execution is
+			// serialized for the rest of the transaction
+			reg = findRegistration(ed, key,keys);
+		}
+
+		// we found a registration
+		// route and return
+		if ( reg != null ) {
+			return routeAndReply(reg,me,sync,ed,smd);
+		}
+
+		IPMDOperation op = findInstanceCreatingOperation(key);
+
+		// we found an instance creation operation
+		// route and return
+		if ( op != null ) {
+
+			// get root definition to see if it is stateless
+			IPMDRoot root = getProcessService().getInstanceService().getDefinitionService().
+				getRootDefinition(new ProcessDefinitionKey(op.getRootDefId()));
+
+			// if we need to protect process instantiation and
+			// the process is statefull we need to lock on the root id
+			if (   root.getProtectedInstantiation() && ! root.getIsStateless() )  {
+
+				// this lock serializes multiple instance creating events
+				// racing toward the same process
+				lock(ed, op.getRootDefId());
+
+				// we have to perform this check again because the instance could have been
+				// created between the time the first check was performed and the
+				// time the root definition id was locked.
+				Registration  regRecheck = findRegistration(ed, key,keys);
+
+				// we found a registration
+				// route and return
+				if ( regRecheck != null ) {
+					return routeAndReply(regRecheck,me,sync,ed,smd);
+				}
+			}
+
+			// we have an operation
+			// so create an instance and route to it
+			// note this is called if the definition is stateless or not
+			createInstanceAndRoute(op,me,ed);
+			sentMessage(smd,
+					ed.getReturnMessageMetadata().getRootProcessID());
+			return mapResult(smd,ed,sync);
+		}
+
+		// At this point the input event cannot be correlated: there is neither a
+		// registration nor an instance creating operation. Caching is still not
+		// implemented and it is not clear if this is even possible.
+		//cacheEvent(keys,me);
+		CorrelationServiceException cse;
+		if ( noDefOps || ! ( key instanceof BPELStaticKey ) ) {
+			// nothing is defined for this key, or the key does not carry the
+			// BPEL details needed for the detailed message
+			cse = new CorrelationServiceException("ED_ED_NSR",new Object[] { key });
+		} else {
+			BPELStaticKey bpelKey = (BPELStaticKey) key;
+			Object[] obj = new Object[5];
+			obj[0] = bpelKey.getOperation();
+			obj[1] = bpelKey.getPortType();
+			obj[2] = bpelKey.getTargetNamespace();
+			obj[3] = getCorrelationValues( me,defOps );
+			obj[4] = getCorrelationKeys( defOps );
+			cse = new CorrelationServiceException("ED_ED_NSR_R", obj);
+		}
+		cse.log(logger,Level.SEVERE);
+		throw cse;
+	}
+
+	// Route the message to an existing registration, record the delivery and build the reply.
+	private IResponseMessage routeAndReply(
+		Registration reg,
+		IRequestMessageEvent me,
+		boolean sync,
+		IInternalEventDirector ed,
+		SentMessageData smd)
+		throws BPException {
+		routeToRegistration(reg,me,ed);
+		sentMessage(smd,
+				ed.getReturnMessageMetadata().getRootProcessID());
+		return mapResult(smd,ed,sync);
+	}
+	
+	/**
+	 * Look up the single instance creating operation for a static key.
+	 * @param key the static key of the incoming message
+	 * @return the operation, or null if the definition service knows none for the key
+	 * @throws BPException CorrelationServiceException("CS_NO_SPRAY") if more than
+	 *         one operation is defined for the key
+	 */
+	private IPMDOperation findInstanceCreatingOperation (IStaticKey key) throws BPException {
+		// get a Collection of operations that are instance creating
+		DefinitionService ds = getProcessService()
+			.getInstanceService().getDefinitionService();
+		Collection ops = ds.getCorrelations(key);
+
+		// a missing collection is treated like an empty one
+		if ( ops == null || ops.isEmpty() ) {
+			return null;
+		}
+		if ( ops.size() > 1 ) {
+			throw new CorrelationServiceException("CS_NO_SPRAY",new Object[] { });
+		}
+
+		return (IPMDOperation)ops.iterator().next();
+	}
+	
+	/**
+	 * Find the single registration that matches a static key and a set of key values.
+	 * When one is found the root process instance of the registration is locked
+	 * before it is returned.
+	 * @param ed the event director used for locking
+	 * @param key the static key of the incoming message
+	 * @param keys the candidate correlation key values
+	 * @return the registration, or null if none matches
+	 * @throws BPException CorrelationServiceException("CS_NO_SPRAY") if more than
+	 *         one registration matches
+	 */
+	private Registration  findRegistration( IInternalEventDirector ed, IStaticKey key, Collection keys  ) throws BPException {
+
+		Collection matches = getRegistrations(key,keys);
+		int matchCount = matches.size();
+
+		if ( matchCount > 1 ) {
+			throw new CorrelationServiceException("CS_NO_SPRAY",new Object[] { });
+		}
+		if ( matchCount <= 0 ) {
+			return null;
+		}
+
+		Registration found = (Registration)matches.iterator().next();
+
+		// lock the process instance so that we get a protected registration lookup
+		// this is necessary because an in flight crank of the engine could remove
+		// a registration
+		lockRootProcessInstance(ed, found);
+
+		// Selecting the registrations a second time (the instance could have been
+		// satisfied and gone away) is not needed as long as events are not cached.
+
+		return found;
+	}
+	
+	/**
+	 * Ask the definition service for the non instance creating operations
+	 * defined for a static key.
+	 * @param key the static key of the incoming message
+	 * @return whatever DefinitionService.getNonInstanceCreatingOps() returns for the key
+	 * @throws BPException propagated from the services
+	 */
+	private  Collection getAllDefinedNonInstanceCreatingOperations(IStaticKey key) throws BPException {
+		return getProcessService()
+			.getInstanceService()
+			.getDefinitionService()
+			.getNonInstanceCreatingOps(key);
+	}
+
+	/**
+	 * Build the response message for a routed event and update the process
+	 * instances that received it.
+	 * For a synchronous, non-spray delivery the parts named by the return
+	 * message metadata are resolved from the process context and copied into
+	 * the response (or into the fault, if the reply carries one). In every other
+	 * case an empty response message is returned after updating each root
+	 * process recorded in smd.
+	 * @param smd data collected about the deliveries of this event
+	 * @param ed the event director
+	 * @param sync true if the caller expects a mapped reply
+	 * @return the response message
+	 * @throws BPException propagated from the services
+	 */
+	private IResponseMessage mapResult(SentMessageData smd, 
+			IInternalEventDirector ed, boolean sync ) throws BPException {
+
+		// asynchronous delivery or a spray: nothing to map, just update every
+		// root process that received the message
+		if ( ! sync || smd.spray ) {
+			Iterator rootIt = smd.rootIds.iterator();
+			while ( rootIt.hasNext() ) {
+				ed.getProcessService().update((String)rootIt.next(), ed, ed);
+			}
+			return ed.getMessageEvent().createResponseMessage();
+		}
+
+		// create the return message; no fault has occurred
+		IResponseMessage mer = ed.getMessageEvent().createResponseMessage();
+		// get the return metadata
+		ReturnMessageLocatorHolder rmlh = ed.getReturnMessageMetadata();
+
+		// Test to see if the reply has a fault
+		Fault replyFault = null;
+		if ( rmlh.getFaultName() != null ) {
+			replyFault = new Fault();
+			replyFault.setFaultString(rmlh.getFaultName());
+			BPRuntimeException bpre = new BPRuntimeException(rmlh.getFaultName(),"");
+			bpre.setNameSpace(rmlh.getFaultNS());
+			replyFault.setFaultException(bpre);
+			mer.setFault(replyFault);
+		}
+
+		Iterator it = rmlh.getLocators();
+		InstanceService is = ed.getProcessService().getInstanceService();
+		IPMIProcess ipmip =
+			is.getInstance(
+				rmlh.getRootProcessID(),
+				rmlh.getRootProcessID());
+		// create a resolver so we can get at the data
+		ContextResolver resolver =
+			new ContextResolver(
+				ipmip,
+				rmlh,
+				ed.getProcessService().getContextService(ipmip),
+				ed.getProcessService().getScopeService(ipmip));
+
+		// Debugging aid that dumps the process context. It serializes the whole
+		// context to System.out, so it only runs when the finest log level is on.
+		if (logger.isLoggable(Level.FINEST)) {
+			try {
+				SerializationContext sc = new SerializationContext( System.out );
+				sc.printComment( "Correlation Service Context:");
+				sc.serialize( resolver.getContextService() );
+			} catch (Exception e) {
+				// the dump is best effort; a failure must not affect the reply
+				logger.log(Level.FINEST, "Unable to dump the correlation service context", e);
+			}
+		}
+
+		// we might be building the return message after the scope has
+		// ended so we need to hijack the
+		// current scope of the process
+		ScopePath saveSP = ipmip.getScopePath();
+		ipmip.setScopePath(rmlh.getScopePath());
+		try {
+			// loop over the return metadata
+			while (it.hasNext()) {
+				IPMDLocator loc = (IPMDLocator) it.next();
+				String name = loc.getName();
+				// NOTE(review): locator names are expected to look like
+				// "<prefix>:<part>"; a name without ':' fails below with an
+				// ArrayIndexOutOfBoundsException - confirm this cannot happen
+				String[] name_split = name.split(":");
+
+				// if it's not a correlation set locator
+				Collection corrlSets = rmlh.getCorrlSets();
+				if ( corrlSets == null || ! corrlSets.contains(name_split[0])) {
+					if (logger.isLoggable(Level.FINE)) {
+						logger.fine("Adding " + loc.getName()
+								+ " to return message");
+					}
+
+					// resolve the object
+					IResolvedObject ro = resolver.resolveBPContext(loc.getName());
+
+					// set the part; anything that is neither an interaction nor
+					// a String leaves the interaction null
+					Object obj = ro.getValue();
+					IInteraction interaction = null;
+					if (obj instanceof IInteraction) {
+						interaction = (IInteraction) obj;
+					} else if (obj instanceof String) {
+						// create a new interaction
+						interaction = InteractionFactory.newInstance()
+								.createXMLInteraction(
+										((String) obj).getBytes());
+					}
+
+					if (replyFault == null) {
+						mer.setPart(name_split[1], interaction);
+					} else {
+						BPRuntimeException bpre = (BPRuntimeException) replyFault
+								.getFaultException();
+						bpre.addPartMessage(name_split[1], interaction);
+					}
+				}
+			}
+		} finally {
+			// put back the scope, also when resolving a part failed, so the
+			// process instance is never left on the hijacked scope path
+			ipmip.setScopePath(saveSP);
+		}
+
+		// update the instance service
+		if (logger.isLoggable(Level.FINE)) {
+			logger.fine(
+				"updatting instance service and removing lock for:"
+					+ ipmip.getRootKey());
+		}
+		ed.getProcessService().update(ipmip.getRootKey(), ed, ed);
+
+		return mer;
+	}
+	
+	/**
+	 * Describe the correlation keys of a list of operations for an error message.
+	 * For every part query of every correlation set the part name is written,
+	 * followed by ":" and the XPath expression when the query is a Jaxen single
+	 * node query, followed by a blank.
+	 * @param operations a collection of IPMDOperation; null is treated as empty
+	 * @return the description; empty if there is nothing to describe
+	 */
+	private String getCorrelationKeys(Collection operations)
+	{
+		StringBuffer rtnVal = new StringBuffer();
+		if (operations == null)
+		{
+			return rtnVal.toString();
+		}
+		for (Iterator it = operations.iterator(); it.hasNext(); )
+		{
+			IPMDOperation op = (IPMDOperation) it.next();
+			if (op.getCorrelation() == null)
+			{
+				continue;
+			}
+			Iterator it2 = op.getCorrelation().getCorrelationSets().iterator();
+			while (it2.hasNext())
+			{
+				IPMDCorrelationSet corrlset = (IPMDCorrelationSet) it2.next();
+				Iterator it3 = corrlset.getPartQueries().iterator();
+				while (it3.hasNext())
+				{
+					PartQuery pq = (PartQuery) it3.next();
+					rtnVal.append(pq.getPartName());
+					// instanceof is false for null, no separate null test needed
+					if (pq.getQuery() instanceof Invocation)
+					{
+						Invocation query = (Invocation) pq.getQuery();
+						if (query.getQuery() instanceof JaxenXPathSingleNodeQuery)
+						{
+							JaxenXPathSingleNodeQuery jsnq = (JaxenXPathSingleNodeQuery)query.getQuery();
+							try
+							{
+								rtnVal.append(":");
+								rtnVal.append(jsnq.getXPathExpression());
+							}
+							catch(Exception ignored)
+							{
+								// Just eat this error.  We are in the process of building up an error to throw.
+								// All we are trying to do is capture more information to log.
+							}
+						}
+					}
+					rtnVal.append(" ");
+				}
+			}
+		}
+		return rtnVal.toString();
+	}
+	
+	/**
+	 * Get the correlation values of a message for an error message when
+	 * something goes wrong. For every part query whose part is present in the
+	 * message the value is appended, followed by a blank.
+	 * @param me the message that could not be correlated
+	 * @param operations a collection of IPMDOperation; null is treated as empty
+	 * @return the values, or null if evaluating a query failed (the failure is logged)
+	 */
+	private String getCorrelationValues(IRequestMessageEvent me, Collection operations)
+	{
+		StringBuffer rtnVal = new StringBuffer();
+		if (operations == null)
+		{
+			return rtnVal.toString();
+		}
+		for (Iterator it = operations.iterator(); it.hasNext(); ) {
+			IPMDOperation op = (IPMDOperation) it.next();
+			if (op.getCorrelation() == null)
+			{
+				continue;
+			}
+			Iterator it2 =
+				op.getCorrelation().getCorrelationSets().iterator();
+			while (it2.hasNext()) {
+				IPMDCorrelationSet corrlset =
+					(IPMDCorrelationSet) it2.next();
+				try
+				{
+					Iterator it3 = corrlset.getPartQueries().iterator();
+					while (it3.hasNext())
+					{
+						PartQuery pq = (PartQuery) it3.next();
+
+						IInteraction part = me.getPart(pq.getPartName());
+						if (part == null)
+						{
+							continue;
+						}
+
+						String s;
+						IInvocation invocation = ( IInvocation )pq.getQuery();
+						if ( invocation != null ) {
+							// invoke the interaction; a query without a result
+							// yields no value instead of a NullPointerException
+							Object result = part.invoke(invocation);
+							s = ( result == null ) ? null : result.toString();
+						} else {
+							s = part.toString();
+						}
+
+						// append to the buffer if it is not null
+						if (s != null) {
+							rtnVal.append(s);
+							rtnVal.append(" ");
+						}
+					}
+				}
+				catch (Exception e) {
+					// TODO: Cory - please verify no rethrow
+					logger.log(Level.SEVERE, ResourceGetter.getString("ED_ED_EWM"), e);
+					return null;
+				}
+			}
+		}
+		return rtnVal.toString();
+	}
+
+	/**
+	 * Compute the list of key values to match registrations against.
+	 * Every operation contributes:
+	 * a null key if it has no correlation; otherwise the full key built from all
+	 * of its correlation sets, the key built from only the non instantiating
+	 * correlation sets (if that key is not empty), and a null key if it has no
+	 * non instantiating correlation set at all.
+	 * @param me the message the key values are taken from
+	 * @param operations a collection of IPMDOperation
+	 * @param dups true to keep duplicate keys (list), false to drop them (set)
+	 * @return the computed keys
+	 */
+	private Collection computeKeys(IRequestMessageEvent me, Collection operations, boolean dups) {
+		Collection result = dups ? (Collection) new ArrayList() : (Collection) new HashSet();
+
+		for (Iterator opIt = operations.iterator(); opIt.hasNext(); ) {
+			IPMDOperation operation = (IPMDOperation) opIt.next();
+
+			if (operation.getCorrelation() == null) {
+				result.add(null);
+				continue;
+			}
+
+			// the full key; for a process creating instance this should be the
+			// only key created
+			StringBuffer fullKey = new StringBuffer("");
+			for (Iterator setIt = operation.getCorrelation().getCorrelationSets().iterator();
+					setIt.hasNext(); ) {
+				IPMDCorrelationSet set = (IPMDCorrelationSet) setIt.next();
+				fullKey.append(makeKeyValue(me, set.getPartQueries()));
+			}
+			result.add(fullKey.toString());
+
+			// a key based on only the non instantiating correlation sets
+			StringBuffer partialKey = new StringBuffer("");
+			boolean sawNonInstantiating = false;
+			for (Iterator setIt = operation.getCorrelation().getCorrelationSets().iterator();
+					setIt.hasNext(); ) {
+				IPMDCorrelationSet set = (IPMDCorrelationSet) setIt.next();
+				if ( set.isInstantiating() ) {
+					continue;
+				}
+				partialKey.append(makeKeyValue(me, set.getPartQueries()));
+				sawNonInstantiating = true;
+			}
+			if ( partialKey.length() > 0 ) {
+				result.add(partialKey.toString());
+			}
+			// if this operation only had instantiating keys
+			// there could be a registration with a null
+			// key(no correlations)
+			if ( ! sawNonInstantiating ) {
+				result.add(null);
+			}
+		}
+
+		return result;
+	}
+
+	/**
+	 * Generate a key value by concatenating, for every part query whose part is
+	 * present in the message, the result of the query (or the part itself if
+	 * there is no query).
+	 * @param me the message the values are taken from
+	 * @param keyMetaData a collection of PartQuery
+	 * @return the key value, or null if evaluating a query failed (the failure is logged)
+	 */
+	private String makeKeyValue(IRequestMessageEvent me, Collection keyMetaData) {
+
+		// StringBuffer to build the key value
+		StringBuffer buff = new StringBuffer();
+
+		try {
+
+			// generate a key value
+			Iterator it = keyMetaData.iterator();
+			while (it.hasNext()) {
+
+				PartQuery pq = (PartQuery) it.next();
+
+				// check to see if the message event has a matching part
+				IInteraction part = me.getPart(pq.getPartName());
+				if (part == null) {
+					continue;
+				}
+
+				String s;
+				IInvocation invocation = ( IInvocation )pq.getQuery();
+				if ( invocation != null ) {
+					// invoke the interaction; test the result before calling
+					// toString() so that the null check below can take effect
+					Object result = part.invoke(invocation);
+					s = ( result == null ) ? null : result.toString();
+				} else {
+					s = part.toString();
+				}
+
+				// append to the buffer if it is not null
+				if (s != null) {
+					buff.append(s);
+				}
+			}
+
+		} catch (Exception e) {
+			// TODO: Cory - please verify no rethrow
+			logger.log(Level.SEVERE, ResourceGetter.getString("ED_ED_EWM"), e);
+			return null;
+		}
+
+		return buff.toString();
+
+	}
+	
+	/**
+	 * Tell whether at least one of the operations belongs to a root definition
+	 * that is not stateless.
+	 * @param defOps a collection of IPMDOperation; null is treated as empty
+	 * @return true if a stateful root definition was found
+	 * @throws BPException propagated from the definition service
+	 */
+	private boolean isStatefull(Collection defOps) throws BPException {
+		if ( defOps == null || defOps.isEmpty() ) {
+			return false;
+		}
+		DefinitionService ds = getProcessService().getInstanceService().getDefinitionService();
+		for ( Iterator iter = defOps.iterator(); iter.hasNext(); ) {
+			IPMDOperation op = (IPMDOperation) iter.next();
+			IPMDRoot root = ds.getRootDefinition(new ProcessDefinitionKey(op.getRootDefId()));
+			if ( ! root.getIsStateless() ) {
+				return true;
+			}
+		}
+		return false;
+	}
+
+	/**
+	 * Record that the message was sent to a root process.
+	 * If the message was already sent before, a BPEL compliant engine throws
+	 * an error; otherwise the event is flagged as a spray. The root id is
+	 * collected so the process can be updated at the end.
+	 * @param smd the data collected about the deliveries of this event
+	 * @param rootid the id of the root process the message was sent to
+	 * @throws CorrelationServiceException "CS_NON_BPEL" on a repeated send in a
+	 *         BPEL compliant engine
+	 */
+	private void sentMessage(SentMessageData smd, String rootid) 
+		throws CorrelationServiceException {
+
+		if ( smd.messageSent ) {
+			if ( isBPELCompliant() ) {
+				CorrelationServiceException notCompliant =
+					new CorrelationServiceException("CS_NON_BPEL",null);
+				notCompliant.log(logger,Level.SEVERE);
+				throw notCompliant;
+			}
+			// a second delivery of the same event is a spray
+			smd.spray = true;
+		}
+
+		smd.rootIds.add(rootid);
+		smd.messageSent = true;
+	}
+	
+//	private void lockOnKeys(IInternalEventDirector ed, String prefix, Collection keys) throws BPException {
+//		Iterator it = keys.iterator();
+//		while ( it.hasNext() ) {
+//			String key = (String)it.next();
+//			lock(ed, prefix+key);
+//		}
+//	}
+	
+//	private void cacheEvent(Collection keys, IRequestMessageEvent me) throws BPException {
+//		// TODO implement caching
+//		CorrelationServiceException cse = new CorrelationServiceException("ED_ED_NSR",new Object[] { me.getStaticKey().toString() });
+//		cse.log(logger,Level.SEVERE);
+//		throw cse;
+//	}
+	
+	/**
+	 * Lock on the id of the root process instance of a registration.
+	 * @param ed the event director that supplies the locking service
+	 * @param reg the registration whose root process id is locked
+	 * @throws BPException propagated from lock()
+	 */
+	private void lockRootProcessInstance(IInternalEventDirector ed, Registration reg) throws BPException {
+		String rootProcessId = reg.getRootProcId();
+		lock(ed, rootProcessId);
+	}
+	
+	/**
+	 * Data collected while one event is routed. The class does not use the
+	 * enclosing CorrelationService, so it is a static nested class and carries
+	 * no hidden reference to it.
+	 */
+	static class SentMessageData {
+		
+		// spray flag, if this event is routed to more than one process it is a spray
+		boolean spray = false;
+		// a list of root ids to update at the end, needed for the spray case
+		ArrayList rootIds = new ArrayList();
+		// flag to see if we sent this message anywhere
+		boolean messageSent = false;
+		
+	}
+
+}

Added: incubator/ode/scratch/bpe/src/main/java/com/sybase/bpe/correlation/CorrelationServiceException.java
URL: http://svn.apache.org/viewcvs/incubator/ode/scratch/bpe/src/main/java/com/sybase/bpe/correlation/CorrelationServiceException.java?rev=381686&view=auto
==============================================================================
--- incubator/ode/scratch/bpe/src/main/java/com/sybase/bpe/correlation/CorrelationServiceException.java (added)
+++ incubator/ode/scratch/bpe/src/main/java/com/sybase/bpe/correlation/CorrelationServiceException.java Tue Feb 28 08:02:48 2006
@@ -0,0 +1,54 @@
+/*
+* Confidential property of Sybase, Inc.
+*
+* Copyright 1987 - 2006.
+*
+* Sybase, Inc. All rights reserved.
+*
+* Unpublished rights reserved under U.S. copyright laws.
+*
+* This software contains confidential and trade secret information
+* of Sybase, Inc. Use, duplication or disclosure of the software and
+* documentation by the U.S. Government is subject to restrictions
+* set forth in a license agreement between the Government and Sybase,
+* Inc. or other written agreement specifying the Government's rights
+* to use the software and any applicable FAR provisions, for example,
+* FAR 52.227-19.
+*
+* Sybase, Inc. One Sybase Drive, Dublin, CA 94568, USA
+*
+* http://www.sybase.com
+*/
+package com.sybase.bpe.correlation;
+
+import com.sybase.bpe.util.BPException;
+
+/**
+ * Exception class for the CorrelationService. It adds nothing to BPException;
+ * both constructors pass their arguments straight to the super class.
+ */
+public class CorrelationServiceException extends BPException {
+	
+	static final long serialVersionUID = -6546885597497431542L;
+
+
+	/**
+	 * Create an exception from a message id and its parameters.
+	 * @param message_id the id of the message, e.g. "CS_NO_SPRAY"
+	 *        (NOTE(review): presumably a resource bundle key resolved by BPException - confirm)
+	 * @param msgParams the parameters of the message; callers in this package
+	 *        also pass null or an empty array
+	 */
+	public CorrelationServiceException(String message_id, Object[] msgParams) {
+		super(message_id, msgParams);
+	}
+
+	/**
+	 * Create an exception from a message id, its parameters and a cause.
+	 * @param message_id the id of the message, e.g. "NATIVE_EXCEPTION"
+	 *        (NOTE(review): presumably a resource bundle key resolved by BPException - confirm)
+	 * @param msgParams the parameters of the message
+	 * @param cause the exception that caused this one
+	 */
+	public CorrelationServiceException(
+		String message_id,
+		Object[] msgParams,
+		Throwable cause) {
+		super(message_id, msgParams, cause);
+	}
+
+}

Added: incubator/ode/scratch/bpe/src/main/java/com/sybase/bpe/correlation/CorrelationServiceFactory.java
URL: http://svn.apache.org/viewcvs/incubator/ode/scratch/bpe/src/main/java/com/sybase/bpe/correlation/CorrelationServiceFactory.java?rev=381686&view=auto
==============================================================================
--- incubator/ode/scratch/bpe/src/main/java/com/sybase/bpe/correlation/CorrelationServiceFactory.java (added)
+++ incubator/ode/scratch/bpe/src/main/java/com/sybase/bpe/correlation/CorrelationServiceFactory.java Tue Feb 28 08:02:48 2006
@@ -0,0 +1,70 @@
+/*
+* Confidential property of Sybase, Inc.
+*
+* Copyright 1987 - 2006.
+*
+* Sybase, Inc. All rights reserved.
+*
+* Unpublished rights reserved under U.S. copyright laws.
+*
+* This software contains confidential and trade secret information
+* of Sybase, Inc. Use, duplication or disclosure of the software and
+* documentation by the U.S. Government is subject to restrictions
+* set forth in a license agreement between the Government and Sybase,
+* Inc. or other written agreement specifying the Government's rights
+* to use the software and any applicable FAR provisions, for example,
+* FAR 52.227-19.
+*
+* Sybase, Inc. One Sybase Drive, Dublin, CA 94568, USA
+*
+* http://www.sybase.com
+*/
+package com.sybase.bpe.correlation;
+
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import com.sybase.bpe.engine.ProcessService;
+import com.sybase.bpe.util.BPEProperties;
+import com.sybase.bpe.util.BPException;
+
+/**
+ * Factory that instantiates and initializes the CorrelationService
+ * implementation named in the engine properties.
+ */
+public class CorrelationServiceFactory
+{
+	
+	private static final Logger logger = 
+		Logger.getLogger(CorrelationServiceFactory.class.getName());
+	
+	/**
+	 * Load the class named by props.getCorrelationServiceClass(), create an
+	 * instance with its no-argument constructor and initialize it.
+	 * @param props the engine properties; they name the implementation class
+	 *        and are handed on to CorrelationService.init()
+	 * @param ps the process service handed on to CorrelationService.init()
+	 * @return the initialized correlation service
+	 * @throws BPException a CorrelationServiceException if the class cannot be
+	 *         found, is not a CorrelationService or cannot be instantiated;
+	 *         anything thrown by CorrelationService.init() is propagated
+	 */
+	public static CorrelationService createCorrelationService(BPEProperties props,
+		ProcessService ps) 
+		throws BPException {
+
+		String className = props.getCorrelationServiceClass();
+		CorrelationService cs = null;
+
+		try {
+			// load the implementation
+			Class instClass = java.lang.Class.forName(className);
+			// report a wrongly configured class like the other instantiation
+			// problems instead of letting a raw ClassCastException escape
+			if ( ! CorrelationService.class.isAssignableFrom(instClass) ) {
+				CorrelationServiceException bpx = new CorrelationServiceException("NATIVE_EXCEPTION",new Object[] {"ClassCastException"},new ClassCastException(className));
+				bpx.log(logger,Level.SEVERE);
+				throw bpx;
+			}
+			// try to instantiate the subclass
+			cs = (CorrelationService) instClass.newInstance();
+			cs.init(props,ps);
+
+		} catch (ClassNotFoundException e) {
+			// keep the cause so the class loading problem can be diagnosed
+			CorrelationServiceException bpx = new CorrelationServiceException("CLASS_NOT_FOUND",new Object[] {className},e);
+			bpx.log(logger,Level.SEVERE);
+			throw bpx;
+		} catch (InstantiationException e) {
+			CorrelationServiceException bpx = new CorrelationServiceException("NATIVE_EXCEPTION",new Object[] {"InstantiationException"},e);
+			bpx.log(logger,Level.SEVERE);
+			throw bpx;
+		} catch (IllegalAccessException e) {
+			CorrelationServiceException bpx = new CorrelationServiceException("NATIVE_EXCEPTION",new Object[] {"IllegalAccessException"},e);
+			bpx.log(logger,Level.SEVERE);
+			throw bpx;
+		}
+
+		return cs;
+	}
+
+}

Added: incubator/ode/scratch/bpe/src/main/java/com/sybase/bpe/correlation/PartQuery.java
URL: http://svn.apache.org/viewcvs/incubator/ode/scratch/bpe/src/main/java/com/sybase/bpe/correlation/PartQuery.java?rev=381686&view=auto
==============================================================================
--- incubator/ode/scratch/bpe/src/main/java/com/sybase/bpe/correlation/PartQuery.java (added)
+++ incubator/ode/scratch/bpe/src/main/java/com/sybase/bpe/correlation/PartQuery.java Tue Feb 28 08:02:48 2006
@@ -0,0 +1,121 @@
+/*
+* Confidential property of Sybase, Inc.
+*
+* Copyright 1987 - 2006.
+*
+* Sybase, Inc. All rights reserved.
+*
+* Unpublished rights reserved under U.S. copyright laws.
+*
+* This software contains confidential and trade secret information
+* of Sybase, Inc. Use, duplication or disclosure of the software and
+* documentation by the U.S. Government is subject to restrictions
+* set forth in a license agreement between the Government and Sybase,
+* Inc. or other written agreement specifying the Government's rights
+* to use the software and any applicable FAR provisions, for example,
+* FAR 52.227-19.
+*
+* Sybase, Inc. One Sybase Drive, Dublin, CA 94568, USA
+*
+* http://www.sybase.com
+*/
+package com.sybase.bpe.correlation;
+
+import java.io.Serializable;
+import java.util.HashMap;
+
+import com.sybase.bpe.interaction.IInvocation;
+import com.sybase.bpe.interaction.InteractionException;
+import com.sybase.bpe.interaction.InvocationFactory;
+
+/**
+ * A PartQuery is just a object that holds a part name and a query object for that part.
+ * StaticRegistrations and DynamicRegistrations build an array of these to represent 
+ * metadata that is used to generate a correlation key value.
+ */
+public class PartQuery implements Serializable {
+
+    static final long serialVersionUID = -2542067136142474418L;
+
+	// Static, so it is not part of the serialized form.
+	private static final java.util.logging.Logger logger =
+		java.util.logging.Logger.getLogger(PartQuery.class.getName());
+
+	private String part;
+	private Object obj;
+	private String locatorName;
+
+	/**
+	 * Creates an empty PartQuery; populate it through the setters.
+	 */
+	public PartQuery(){}
+
+	/**
+	 * Constructor for a PartQuery.
+	 * <p>
+	 * When <code>obj</code> is a String it is turned into an XPath node-value
+	 * invocation via the InvocationFactory, using <code>nsMap</code>; any other
+	 * object is stored as the query unchanged. If the invocation cannot be
+	 * created the failure is logged and the query is left <code>null</code>.
+	 *
+	 * @param part        the part name
+	 * @param obj         the query, either a String expression or a ready-made query object
+	 * @param locatorName the locator name
+	 * @param nsMap       map passed to the factory together with a String query
+	 *                    (presumably namespace prefixes - confirm against InvocationFactory)
+	 */
+	public PartQuery(String part, Object obj, String locatorName, HashMap nsMap) {
+		this.part = part;
+		this.locatorName = locatorName;
+
+		if ( obj instanceof String )
+		{
+			try
+			{
+				IInvocation invocation =
+					InvocationFactory.newInstance().
+					createXPathQueryNodeValueInvocation( ( String ) obj, nsMap);
+				this.obj = invocation;
+			} catch (InteractionException e)
+			{
+				// Still best effort (the query stays null), but report the
+				// failure through the logger instead of printStackTrace().
+				logger.log(java.util.logging.Level.SEVERE,
+					"Unable to create the XPath query invocation for part '"
+						+ part + "', query '" + obj + "'", e);
+			}
+		}
+		else
+		{
+			this.obj = obj;
+		}
+	}
+
+	/**
+	 * Set the part name.
+	 * @param	part 	The part name.
+	 */
+	public void setPartName(String part) {
+		this.part = part;
+	}
+
+	/**
+	 * Get the part name.
+	 * @return	The part name.
+	 */
+	public String getPartName() {
+		return this.part;
+	}
+
+	/**
+	 * Set the query object. Unlike the constructor, no conversion is applied:
+	 * the object is stored as given.
+	 * @param	obj	The query.
+	 */
+	public void setQuery(Object obj) {
+		this.obj = obj;
+	}
+
+	/**
+	 * Get the query object.
+	 * @return The query object, possibly <code>null</code>.
+	 */
+	public Object getQuery() {
+		return this.obj;
+	}
+
+	/**
+	 * Get the locator name.
+	 * @return The locator name.
+	 */
+	public String getLocatorName() {
+		return locatorName;
+	}
+
+	/**
+	 * Set the locator name.
+	 * @param string The locator name.
+	 */
+	public void setLocatorName(String string) {
+		locatorName = string;
+	}
+
+
+}

Added: incubator/ode/scratch/bpe/src/main/java/com/sybase/bpe/correlation/Registration.java
URL: http://svn.apache.org/viewcvs/incubator/ode/scratch/bpe/src/main/java/com/sybase/bpe/correlation/Registration.java?rev=381686&view=auto
==============================================================================
--- incubator/ode/scratch/bpe/src/main/java/com/sybase/bpe/correlation/Registration.java (added)
+++ incubator/ode/scratch/bpe/src/main/java/com/sybase/bpe/correlation/Registration.java Tue Feb 28 08:02:48 2006
@@ -0,0 +1,210 @@
+/*
+* Confidential property of Sybase, Inc.
+*
+* Copyright 1987 - 2006.
+*
+* Sybase, Inc. All rights reserved.
+*
+* Unpublished rights reserved under U.S. copyright laws.
+*
+* This software contains confidential and trade secret information
+* of Sybase, Inc. Use, duplication or disclosure of the software and
+* documentation by the U.S. Government is subject to restrictions
+* set forth in a license agreement between the Government and Sybase,
+* Inc. or other written agreement specifying the Government's rights
+* to use the software and any applicable FAR provisions, for example,
+* FAR 52.227-19.
+*
+* Sybase, Inc. One Sybase Drive, Dublin, CA 94568, USA
+*
+* http://www.sybase.com
+*/
+/*
+ * Created on Aug 12, 2003
+ *
+ */
+package com.sybase.bpe.correlation;
+
+import java.io.Serializable;
+
+/**
+ * Serializable value object holding the seven string identifiers of a
+ * correlation registration: static key value, operation id, root definition
+ * id, definition id, root process id, process id and key value.
+ * <p>
+ * Two registrations are equal when all seven fields are equal, with
+ * <code>null</code> equal only to <code>null</code>. The fields are mutable,
+ * so do not change an instance while it is held in a hash-based collection.
+ *
+ * @author charper
+ *
+ */
+public class Registration implements Serializable {
+
+    static final long serialVersionUID = 2061953613411330775L;
+
+	private String staticKeyValue;
+	private String rootDefId;
+	private String defId;
+	private String rootProcId;
+	private String procId;
+	private String keyValue;
+	private String operationId;
+
+	/**
+	 * Creates a registration from its seven identifiers. Note that
+	 * <code>operationId</code> is the second argument although it is listed
+	 * last among the fields.
+	 */
+	public Registration(String staticKeyValue, String operationId, String rootDefId, String defId,
+		String rootProcId, String procId, String keyValue) {
+			this.staticKeyValue = staticKeyValue;
+			this.rootDefId = rootDefId;
+			this.defId = defId;
+			this.rootProcId = rootProcId;
+			this.procId = procId;
+			this.keyValue = keyValue;
+			this.operationId = operationId;
+		}
+
+	/**
+	 * @return the definition id
+	 */
+	public String getDefId() {
+		return defId;
+	}
+
+	/**
+	 * @return the key value
+	 */
+	public String getKeyValue() {
+		return keyValue;
+	}
+
+	/**
+	 * @return the process id
+	 */
+	public String getProcId() {
+		return procId;
+	}
+
+	/**
+	 * @return the root definition id
+	 */
+	public String getRootDefId() {
+		return rootDefId;
+	}
+
+	/**
+	 * @return the root process id
+	 */
+	public String getRootProcId() {
+		return rootProcId;
+	}
+
+	/**
+	 * @return the static key value
+	 */
+	public String getStaticKeyValue() {
+		return staticKeyValue;
+	}
+
+	/**
+	 * @param string the definition id
+	 */
+	public void setDefId(String string) {
+		defId = string;
+	}
+
+	/**
+	 * @param string the key value
+	 */
+	public void setKeyValue(String string) {
+		keyValue = string;
+	}
+
+	/**
+	 * @param string the process id
+	 */
+	public void setProcId(String string) {
+		procId = string;
+	}
+
+	/**
+	 * @param string the root definition id
+	 */
+	public void setRootDefId(String string) {
+		rootDefId = string;
+	}
+
+	/**
+	 * @param string the root process id
+	 */
+	public void setRootProcId(String string) {
+		rootProcId = string;
+	}
+
+	/**
+	 * @param key the static key value
+	 */
+	public void setStaticKeyValue(String key) {
+		staticKeyValue = key;
+	}
+
+	/**
+	 * @return the operation id
+	 */
+	public String getOperationId() {
+		return operationId;
+	}
+
+	/**
+	 * @param string the operation id
+	 */
+	public void setOperationId(String string) {
+		operationId = string;
+	}
+
+	/* (non-Javadoc)
+	 * @see java.lang.Object#toString()
+	 */
+	public String toString() {
+		return "Registration(staticKeyValue:"+staticKeyValue+
+			" rootDefId:"+rootDefId+
+			" defId:"+defId+
+			" rootProcId:"+rootProcId+
+			" procId:"+procId+
+			" keyValue:"+keyValue+
+			" operationId:"+operationId+")";
+	}
+
+	/**
+	 * Compares all seven fields, treating <code>null</code> as equal only to
+	 * <code>null</code>.
+	 * <p>
+	 * The earlier version chained <code>a == null ? x : (y) &amp;&amp; b == null ? ...</code>
+	 * without parentheses; because <code>?:</code> binds more loosely than
+	 * <code>&amp;&amp;</code>, most fields were never actually compared.
+	 *
+	 * @see java.lang.Object#equals(java.lang.Object)
+	 */
+	public boolean equals(Object obj) {
+		if ( this == obj ) {
+			return true;
+		}
+		if ( !( obj instanceof Registration ) ) {
+			return false;
+		}
+		Registration reg = (Registration)obj;
+		return sameValue(staticKeyValue, reg.getStaticKeyValue())
+			&& sameValue(rootDefId, reg.getRootDefId())
+			&& sameValue(defId, reg.getDefId())
+			&& sameValue(rootProcId, reg.getRootProcId())
+			&& sameValue(procId, reg.getProcId())
+			&& sameValue(keyValue, reg.getKeyValue())
+			&& sameValue(operationId, reg.getOperationId());
+	}
+
+	/**
+	 * Hash over the same seven fields that {@link #equals(Object)} compares,
+	 * so that equal registrations always share a hash code.
+	 *
+	 * @see java.lang.Object#hashCode()
+	 */
+	public int hashCode() {
+		int result = 17;
+		result = 31 * result + hashOf(staticKeyValue);
+		result = 31 * result + hashOf(rootDefId);
+		result = 31 * result + hashOf(defId);
+		result = 31 * result + hashOf(rootProcId);
+		result = 31 * result + hashOf(procId);
+		result = 31 * result + hashOf(keyValue);
+		result = 31 * result + hashOf(operationId);
+		return result;
+	}
+
+	// Null-safe string equality: null matches only null.
+	private static boolean sameValue(String mine, String theirs) {
+		return mine == null ? theirs == null : mine.equals(theirs);
+	}
+
+	// Null-safe hash: null contributes 0.
+	private static int hashOf(String value) {
+		return value == null ? 0 : value.hashCode();
+	}
+}

Added: incubator/ode/scratch/bpe/src/main/java/com/sybase/bpe/correlation/keys/CorrelationKeysUtil.java
URL: http://svn.apache.org/viewcvs/incubator/ode/scratch/bpe/src/main/java/com/sybase/bpe/correlation/keys/CorrelationKeysUtil.java?rev=381686&view=auto
==============================================================================
--- incubator/ode/scratch/bpe/src/main/java/com/sybase/bpe/correlation/keys/CorrelationKeysUtil.java (added)
+++ incubator/ode/scratch/bpe/src/main/java/com/sybase/bpe/correlation/keys/CorrelationKeysUtil.java Tue Feb 28 08:02:48 2006
@@ -0,0 +1,156 @@
+/*
+* Confidential property of Sybase, Inc.
+*
+* Copyright 1987 - 2006.
+*
+* Sybase, Inc. All rights reserved.
+*
+* Unpublished rights reserved under U.S. copyright laws.
+*
+* This software contains confidential and trade secret information
+* of Sybase, Inc. Use, duplication or disclosure of the software and
+* documentation by the U.S. Government is subject to restrictions
+* set forth in a license agreement between the Government and Sybase,
+* Inc. or other written agreement specifying the Government's rights
+* to use the software and any applicable FAR provisions, for example,
+* FAR 52.227-19.
+*
+* Sybase, Inc. One Sybase Drive, Dublin, CA 94568, USA
+*
+* http://www.sybase.com
+*/
+
+package com.sybase.bpe.correlation.keys;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import com.sybase.bpe.action.internal.ActionException;
+import com.sybase.bpe.context.resolver.ContextResolvedObject;
+import com.sybase.bpe.context.resolver.ContextResolver;
+import com.sybase.bpe.correlation.PartQuery;
+import com.sybase.bpe.definition.IPMDCorrelation;
+import com.sybase.bpe.definition.IPMDCorrelationSet;
+import com.sybase.bpe.definition.IPMDOperation;
+import com.sybase.bpe.interaction.IInteraction;
+import com.sybase.bpe.interaction.IInvocation;
+import com.sybase.bpe.interaction.InteractionException;
+import com.sybase.bpe.interaction.InteractionFactory;
+import com.sybase.bpe.util.BPException;
+
+public class CorrelationKeysUtil {
+
+	private static Logger logger =
+		Logger.getLogger(CorrelationKeysUtil.class.getName());
+
+	/**
+	 * Tells whether at least one of the given operations carries a
+	 * correlation with a non-empty list of correlation sets.
+	 *
+	 * @param operations collection of IPMDOperation
+	 * @return true as soon as one such operation is found, false otherwise
+	 */
+	public static boolean hasCorrelationSets(Collection operations) {
+		Iterator ops = operations.iterator();
+		while (ops.hasNext()) {
+			IPMDOperation operation = (IPMDOperation) ops.next();
+			IPMDCorrelation candidate = operation.getCorrelation();
+			if (candidate == null) {
+				continue;
+			}
+			if (!candidate.getCorrelationSets().isEmpty()) {
+				return true;
+			}
+		}
+		return false;
+	}
+
+
+	/**
+	 * Walks the instantiating correlation sets of the given operations and,
+	 * for every part query whose resolved context object has no value yet,
+	 * stores the data extracted from the matching message part.
+	 *
+	 * @param resolver   resolves a part query's locator name to its context object
+	 * @param parts      message parts keyed by part name; values are either
+	 *                   String or IInteraction
+	 * @param operations collection of IPMDOperation to inspect
+	 * @param pattern    when non-null, only correlation sets with an equal
+	 *                   pattern are considered
+	 * @return true if at least one part query was found whose context object
+	 *         had no value, even when the message held no matching part
+	 * @throws BPException if the part data cannot be extracted
+	 */
+	public static boolean setCorrelationKeys(
+		ContextResolver resolver,
+		Map parts,
+		Collection operations,
+		String pattern)
+		throws BPException {
+
+		boolean touchedUnsetKey = false;
+
+		logger.fine("Trying to Initailize a correlation for set.");
+
+		Iterator ops = operations.iterator();
+		while (ops.hasNext()) {
+			IPMDOperation operation = (IPMDOperation) ops.next();
+			IPMDCorrelation correlation = operation.getCorrelation();
+			if (correlation == null) {
+				continue;
+			}
+
+			// visit every correlation set of this operation
+			Iterator sets = correlation.getCorrelationSets().iterator();
+			while (sets.hasNext()) {
+				IPMDCorrelationSet set = (IPMDCorrelationSet) sets.next();
+
+				// only instantiating sets are candidates
+				if (!set.isInstantiating()) {
+					continue;
+				}
+				// a null pattern accepts every set
+				if (pattern != null && !set.getPattern().equals(pattern)) {
+					continue;
+				}
+
+				Iterator queries = set.getPartQueries().iterator();
+				while (queries.hasNext()) {
+					PartQuery query = (PartQuery) queries.next();
+
+					ContextResolvedObject target = (ContextResolvedObject) resolver
+							.resolveBPContext(query.getLocatorName());
+
+					// a value is already present: leave the rest of this set alone
+					if (target.getValue() != null) {
+						break;
+					}
+
+					// flagged before the part lookup, so a missing part still counts
+					touchedUnsetKey = true;
+					Object messagePart = parts.get(query.getPartName());
+					if (messagePart != null) {
+						target.setObject(getPartData(messagePart, query));
+					}
+				}
+			}
+		}
+		return touchedUnsetKey;
+	}
+
+
+	/**
+	 * Extracts the correlation data from one message part.
+	 *
+	 * @param inputObj the part: a String (wrapped in a new XML interaction)
+	 *                 or an IInteraction (used directly)
+	 * @param pq       supplies the query invocation; when it has none, the
+	 *                 interaction's string form is returned instead
+	 * @return the invocation result, or <code>interaction.toString()</code>
+	 * @throws ActionException      "UNKNOWN_DATATYPE" (logged at SEVERE) for
+	 *                              any other part type
+	 * @throws InteractionException declared for the interaction calls
+	 */
+	private static Object getPartData(Object inputObj, PartQuery pq)
+		throws InteractionException, ActionException {
+
+		IInteraction interaction;
+		if (inputObj instanceof String) {
+			// wrap the raw text in a new XML interaction
+			byte[] raw = ((String) inputObj).getBytes();
+			interaction = InteractionFactory.newInstance().createXMLInteraction(raw);
+		} else if (inputObj instanceof IInteraction) {
+			interaction = (IInteraction) inputObj;
+		} else {
+			ActionException bpx = new ActionException("UNKNOWN_DATATYPE", null);
+			bpx.log(logger,Level.SEVERE);
+			throw bpx;
+		}
+
+		// the part query carries the invocation, if any
+		IInvocation invocation = ( IInvocation ) ( pq.getQuery());
+		if ( invocation == null ) {
+			return interaction.toString();
+		}
+		return interaction.invoke(invocation);
+	}
+
+
+}

Added: incubator/ode/scratch/bpe/src/main/java/com/sybase/bpe/correlation/managed/CorrelationServiceEjbImpl.java
URL: http://svn.apache.org/viewcvs/incubator/ode/scratch/bpe/src/main/java/com/sybase/bpe/correlation/managed/CorrelationServiceEjbImpl.java?rev=381686&view=auto
==============================================================================
--- incubator/ode/scratch/bpe/src/main/java/com/sybase/bpe/correlation/managed/CorrelationServiceEjbImpl.java (added)
+++ incubator/ode/scratch/bpe/src/main/java/com/sybase/bpe/correlation/managed/CorrelationServiceEjbImpl.java Tue Feb 28 08:02:48 2006
@@ -0,0 +1,483 @@
+/*
+* Confidential property of Sybase, Inc.
+*
+* Copyright 1987 - 2006.
+*
+* Sybase, Inc. All rights reserved.
+*
+* Unpublished rights reserved under U.S. copyright laws.
+*
+* This software contains confidential and trade secret information
+* of Sybase, Inc. Use, duplication or disclosure of the software and
+* documentation by the U.S. Government is subject to restrictions
+* set forth in a license agreement between the Government and Sybase,
+* Inc. or other written agreement specifying the Government's rights
+* to use the software and any applicable FAR provisions, for example,
+* FAR 52.227-19.
+*
+* Sybase, Inc. One Sybase Drive, Dublin, CA 94568, USA
+*
+* http://www.sybase.com
+*/
+/*
+ * Created on Aug 25, 2003
+ *
+ */
+package com.sybase.bpe.correlation.managed;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import javax.ejb.CreateException;
+import javax.ejb.EJBException;
+import javax.ejb.FinderException;
+import javax.ejb.ObjectNotFoundException;
+import javax.ejb.RemoveException;
+import javax.naming.InitialContext;
+import javax.naming.NamingException;
+
+import com.sybase.bpe.correlation.CorrelationService;
+import com.sybase.bpe.correlation.CorrelationServiceException;
+import com.sybase.bpe.correlation.Registration;
+import com.sybase.bpe.correlation.managed.RegistrationEntityLocal;
+import com.sybase.bpe.correlation.managed.RegistrationEntityLocalHome;
+import com.sybase.bpe.correlation.unmanaged.CorrelationServiceSLImpl;
+import com.sybase.bpe.engine.ProcessService;
+import com.sybase.bpe.event.IStaticKey;
+import com.sybase.bpe.instance.IPMIProcess;
+import com.sybase.bpe.util.BPEProperties;
+import com.sybase.bpe.util.BPException;
+
+/**
+ * Correlation service that stores registrations through the
+ * RegistrationEntity local home looked up under {@link #JNDI_RELH_NAME}.
+ * <p>
+ * Create and remove requests are not sent to the entity beans immediately.
+ * They are queued in order and mirrored in a transient
+ * CorrelationServiceSLImpl until {@link #persistRegistrationChanges()}
+ * replays them. Lookups combine the persisted registrations (minus those
+ * queued for removal) with the transient ones.
+ *
+ * @author charper
+ *
+ * <bkl>
+ * I added an in-memory caching layer to this service so that
+ * cmp registration operations could be bundled up at the
+ * end of the transaction rather than interspersed throughout.
+ * This bundling reduces the deadlock hazard.
+ * </bkl>
+ *
+ */
+public class CorrelationServiceEjbImpl extends CorrelationService
+{
+
+	private static Logger logger =
+		Logger.getLogger(CorrelationServiceEjbImpl.class.getName());
+
+	// Queued CreateRegistration / RemoveRegistration requests, in call order.
+	private final LinkedList registrationOperations = new LinkedList();
+	// Registrations whose removal is queued but not yet persisted.
+	private final LinkedList registrationsForRemoval = new LinkedList();
+	private ProcessService ps;
+	private boolean isBPELCompliant;
+	private RegistrationEntityLocalHome relh;
+	public static final String JNDI_RELH_NAME = "java:comp/env/registrationBean";
+	// Holds registrations created since the last persist / clearState.
+	private CorrelationServiceSLImpl  transientCS;
+
+	public CorrelationServiceEjbImpl()
+	{
+	    transientCS = new CorrelationServiceSLImpl();
+	}
+
+	/**
+	 * Queues the creation of a registration and makes it visible to lookups
+	 * through the transient service; nothing is written to the entity beans
+	 * until {@link #persistRegistrationChanges()}.
+	 *
+	 * @see com.sybase.bpe.correlation.CorrelationService#createRegistration(com.sybase.bpe.correlation.Registration)
+	 */
+	public void createRegistration(Registration registration)
+	   throws CorrelationServiceException
+	{
+	    CreateRegistration cr = new CreateRegistration();
+	    cr.reg = registration;
+	    registrationOperations.add(cr);
+	    transientCS.createRegistration(registration);
+	}
+
+	/**
+	 * Creates the entity bean for a registration.
+	 *
+	 * @throws CorrelationServiceException wrapping a CreateException (logged at SEVERE)
+	 */
+	private void createRegistrationEntity(Registration registration)
+		throws CorrelationServiceException {
+
+		try {
+			relh.create(
+				registration.getStaticKeyValue(),
+				registration.getRootDefId(),
+				registration.getDefId(),
+				registration.getRootProcId(),
+				registration.getProcId(),
+				registration.getKeyValue(),
+				registration.getOperationId());
+		} catch (CreateException e) {
+			CorrelationServiceException bpx = new CorrelationServiceException("NATIVE_EXCEPTION",new Object[] {"CreateException"},e);
+			bpx.log(logger,Level.SEVERE);
+			throw bpx;
+		}
+
+	}
+
+	/**
+	 * Finds the registrations for a static key and a set of key values.
+	 * <p>
+	 * Persisted registrations are collected for every key value (a
+	 * <code>null</code> key value matches every registration with the static
+	 * key), de-duplicated with <code>equals</code>, and stripped of those
+	 * queued for removal. The transient registrations are then appended once.
+	 * Previously the screening and the transient append ran inside the
+	 * per-key-value loop, so transient registrations were returned once per
+	 * key value.
+	 *
+	 * @param key       the static key
+	 * @param keyValues collection of String key values, entries may be null
+	 * @return the matching registrations; empty when <code>keyValues</code> is empty
+	 * @throws CorrelationServiceException wrapping a FinderException (logged at SEVERE)
+	 * @see com.sybase.bpe.correlation.CorrelationService#getRegistrations(com.sybase.bpe.event.IStaticKey, java.util.Collection)
+	 */
+	public Collection getRegistrations(IStaticKey key, Collection keyValues)
+		throws CorrelationServiceException {
+
+		// Unchanged from before: without key values nothing is looked up,
+		// not even the transient registrations.
+		if ( keyValues.isEmpty() ) {
+			return new HashSet();
+		}
+
+		// A list plus contains() keeps the de-duplication purely
+		// equals-based, exactly like the hand-written scan it replaces.
+		Collection persisted = new ArrayList();
+		for (Iterator it = keyValues.iterator(); it.hasNext();) {
+			String keyValue = (String)it.next();
+			Collection beans;
+			try {
+				// if the key value is null look for all registration
+				// with the given static key, the correlation service
+				// will decide how to handle this if there are multiple
+				// registrations found
+				if ( keyValue == null ) {
+					beans =
+						relh.findByStaticKey(key.toString());
+				} else {
+					beans =
+						relh.findByStaticKeyandKeyValue(key.toString(), keyValue);
+				}
+			} catch (FinderException e) {
+				CorrelationServiceException bpx = new CorrelationServiceException("NATIVE_EXCEPTION",new Object[] {"FinderException"},e);
+				bpx.log(logger,Level.SEVERE);
+				throw bpx;
+			}
+
+			Collection found = makeRegistrations(key.toString(),beans);
+			for (Iterator iter = found.iterator(); iter.hasNext();) {
+				Object element = iter.next();
+				if ( ! persisted.contains(element) ) {
+					persisted.add(element);
+				}
+			}
+		}
+
+		Collection regs = screenRegs(persisted);
+
+		//Add the registrations which were created in the current transaction
+		//but have not been persisted yet.
+		regs.addAll(transientCS.getRegistrations(key, keyValues));
+		return regs;
+	}
+
+	/**
+	 * Finds the registrations of a process instance: persisted ones not
+	 * queued for removal, plus the transient ones.
+	 *
+	 * @throws CorrelationServiceException wrapping a FinderException (logged at SEVERE)
+	 * @see com.sybase.bpe.correlation.CorrelationService#getRegistrations(com.sybase.bpe.instance.service.IPMIProcess)
+	 */
+	public Collection getRegistrations(IPMIProcess key)
+		throws CorrelationServiceException {
+		Collection beans;
+		try {
+			beans = relh.findByProcId(key.getKey());
+		} catch (FinderException e) {
+			CorrelationServiceException bpx = new CorrelationServiceException("NATIVE_EXCEPTION",new Object[] {"FinderException"},e);
+			bpx.log(logger,Level.SEVERE);
+			throw bpx;
+		}
+		// NOTE(review): the process key ends up as the staticKeyValue of the
+		// returned registrations - confirm that this is intended.
+		Collection regs = makeRegistrations(key.getKey(),beans);
+		regs = screenRegs(regs);
+
+		//Add the registrations which were created in the current transaction
+		//but have not been persisted yet.
+		regs.addAll( transientCS.getRegistrations(key));
+		return regs;
+	}
+
+
+	/**
+	 * Finds the registrations for a static key: persisted ones not queued
+	 * for removal, plus the transient ones.
+	 *
+	 * @throws CorrelationServiceException wrapping a FinderException (logged at SEVERE)
+	 * @see com.sybase.bpe.correlation.CorrelationService#getRegistrations(com.sybase.bpe.event.IStaticKey)
+	 */
+	public Collection getRegistrations(IStaticKey key)
+		throws CorrelationServiceException {
+		Collection beans;
+		try {
+			beans = relh.findByStaticKey(key.toString());
+		} catch (FinderException e) {
+			CorrelationServiceException bpx = new CorrelationServiceException("NATIVE_EXCEPTION",new Object[] {"FinderException"},e);
+			bpx.log(logger,Level.SEVERE);
+			throw bpx;
+		}
+		Collection regs = makeRegistrations(key.toString(), beans);
+		regs = screenRegs(regs);
+
+		//Add the registrations which were created in the current transaction
+		//but have not been persisted yet.
+		regs.addAll( transientCS.getRegistrations(key) );
+		return regs;
+
+	}
+
+	// Called by the various getRegistrations methods.
+	// If the registrations have been removed but the removal
+	// has not been persisted yet, don't report the registration.
+	// Always returns a new, modifiable list.
+	private Collection screenRegs( Collection regs )
+	{
+	    LinkedList retVal = new LinkedList();
+	    for ( Iterator iter = regs.iterator(); iter.hasNext(); )
+	    {
+	        Registration candidate  = ( Registration )( iter.next());
+	        if ( !( registrationsForRemoval.contains( candidate) ) )
+	        {
+	            retVal.add(candidate);
+	        }
+	    }
+	    return retVal;
+	}
+
+
+	// Converts entity beans into Registration value objects. The given key
+	// becomes the staticKeyValue of every result; all other fields are read
+	// from the bean.
+	private Collection makeRegistrations(String key, Collection beans){
+		ArrayList regs = new ArrayList ();
+		for ( Iterator it = beans.iterator(); it.hasNext(); ) {
+			RegistrationEntityLocal bean = (RegistrationEntityLocal)it.next();
+			regs.add(
+				new Registration(
+					key,
+					bean.getOperationId(),
+					bean.getRootDefId(),
+					bean.getDefId(),
+					bean.getRootProcId(),
+					bean.getProcId(),
+					bean.getKeyValue()
+					));
+		}
+		return regs;
+	}
+
+	/**
+	 * Does nothing in this implementation.
+	 *
+	 * @see com.sybase.bpe.correlation.CorrelationService#update()
+	 */
+	public void update() throws BPException {
+	}
+
+	/**
+	 * @return the process service given to {@link #init(BPEProperties, ProcessService)}
+	 * @see com.sybase.bpe.correlation.CorrelationService#getProcessService()
+	 */
+	public ProcessService getProcessService() throws BPException {
+		return ps;
+	}
+
+	/**
+	 * @return true if the BPELCompliant property was "TRUE" at init time
+	 * @see com.sybase.bpe.correlation.CorrelationService#isBPELCompliant()
+	 */
+	public boolean isBPELCompliant() {
+		return isBPELCompliant;
+	}
+
+	/**
+	 * Stores the process service, reads the BPEL-compliance flag and looks
+	 * up the registration entity home under {@link #JNDI_RELH_NAME}.
+	 *
+	 * @throws CorrelationServiceException wrapping a NamingException (logged at SEVERE)
+	 * @see com.sybase.bpe.correlation.CorrelationService#init(com.sybase.bpe.util.BPEProperties, com.sybase.bpe.engine.ProcessService)
+	 */
+	public void init(BPEProperties props, ProcessService ps)
+		throws CorrelationServiceException {
+		this.ps = ps;
+		// Constant first, so an unset property means "not compliant" instead
+		// of a NullPointerException.
+		if ("TRUE".equals(props.getBPELCompliant())) {
+			this.isBPELCompliant = true;
+		}
+
+		try {
+			InitialContext ic = new InitialContext();
+			relh = (RegistrationEntityLocalHome) ic.lookup(JNDI_RELH_NAME);
+		} catch (NamingException ne) {
+			CorrelationServiceException bpx = new CorrelationServiceException("NATIVE_EXCEPTION",new Object[] {"NamingException"},ne);
+			bpx.log(logger,Level.SEVERE);
+			throw bpx;
+		}
+
+	}
+
+	/**
+	 * Finds the single registration for a static key, operation and process.
+	 * A persisted registration that is queued for removal is ignored; if no
+	 * persisted registration remains, the transient service is consulted.
+	 *
+	 * @return the registration, or whatever the transient service returns
+	 *         when no persisted one qualifies
+	 * @throws CorrelationServiceException wrapping a FinderException (logged at SEVERE)
+	 * @see com.sybase.bpe.correlation.CorrelationService#getRegistration(com.sybase.bpe.event.IStaticKey, java.lang.String, java.lang.String)
+	 */
+	public Registration getRegistration(IStaticKey key, String operationId, String processId) throws CorrelationServiceException {
+	    Collection beans;
+		try {
+			beans = relh.findByStaticKeyandOpIdandProcId(key.toString(),operationId,processId);
+		} catch (FinderException e) {
+			CorrelationServiceException bpx = new CorrelationServiceException("NATIVE_EXCEPTION",new Object[] {"FinderException"},e);
+			bpx.log(logger,Level.SEVERE);
+			throw bpx;
+		}
+		Iterator it = makeRegistrations(key.toString(),beans).iterator();
+
+		Registration retVal = null;
+
+		// there should only be one of these
+		if ( it.hasNext() )
+		{
+			retVal = (Registration)it.next();
+
+			//The registration could already have been
+			//deleted but the deletion may not have been
+			//persisted yet.  Consult the removal list to
+			//find out.
+			if ( registrationsForRemoval.contains(retVal))
+			{
+			    retVal = null;
+			}
+		}
+
+		if ( retVal == null )
+		{
+		    //There could be a registration which has not been persisted yet.
+		    //Consult the transient correlation service to find out.
+			retVal = transientCS.getRegistration(key, operationId, processId);
+		}
+
+		return retVal;
+	}
+
+	/**
+	 * Queues the removal of a registration, removes it from the transient
+	 * service and hides it from lookups until the removal is persisted.
+	 *
+	 * @see com.sybase.bpe.correlation.CorrelationService#removeRegistration(com.sybase.bpe.correlation.Registration)
+	 */
+	public void removeRegistration(Registration registration)
+	   throws CorrelationServiceException
+	{
+	    RemoveRegistration rr = new RemoveRegistration();
+	    rr.reg = registration;
+	    registrationOperations.add(rr);
+	    transientCS.removeRegistration(registration);
+	    registrationsForRemoval.add(registration);
+	}
+
+	/**
+	 * Removes the entity bean of a registration, located by primary key.
+	 * A missing bean is ignored; other finder, EJB and remove failures are
+	 * logged at WARNING and not rethrown.
+	 */
+	protected void removeRegistrationEntity(Registration registration)
+	  throws CorrelationServiceException {
+		try {
+			RegistrationEntityPK pk = new RegistrationEntityPK();
+			pk.setOperationId(registration.getOperationId());
+			pk.setProcId(registration.getProcId());
+			pk.setRootDefId(registration.getRootDefId());
+			pk.setRootProcId(registration.getRootProcId());
+			pk.setStaticKeyValue(registration.getStaticKeyValue());
+			RegistrationEntityLocal bean = relh.findByPrimaryKey(pk);
+			bean.remove();
+		} catch ( ObjectNotFoundException e ) {
+			// ignore requests to remove registrations that are not there
+		} catch (FinderException e) {
+			logger.log(Level.WARNING,"",e);
+		} catch (EJBException e) {
+			logger.log(Level.WARNING,"",e);
+		} catch (RemoveException e) {
+			logger.log(Level.WARNING,"",e);
+		}
+	}
+
+	// Delete old regs and insert new regs.  This method
+	// was added so that the removal of the registrations from the
+	// database could be sequenced at the end of the transaction
+	// rather than interspersed throughout the transaction causing
+	// a deadlock hazard.
+	/**
+	 * Replays the queued create/remove requests against the entity beans in
+	 * the order they were made, then clears the transient state. If a create
+	 * fails the exception propagates and the state is not cleared.
+	 */
+	public void persistRegistrationChanges() throws CorrelationServiceException
+	{
+	    for ( Iterator iter = registrationOperations.iterator(); iter.hasNext(); )
+	    {
+	        Object obj = iter.next();
+	        if ( obj instanceof RemoveRegistration )
+	        {
+	            removeRegistrationEntity( ((RemoveRegistration) obj).reg );
+	        }
+	        else if ( obj instanceof CreateRegistration )
+	        {
+	            createRegistrationEntity( ((CreateRegistration) obj).reg );
+	        }
+	    }
+	    clearState();
+	}
+
+	/**
+	 * Drops all queued operations and pending removals and starts over with
+	 * a fresh transient correlation service.
+	 */
+	public void clearState()
+	{
+	    registrationsForRemoval.clear();
+	    registrationOperations.clear();
+	    transientCS = new CorrelationServiceSLImpl();
+	}
+
+	// Queue entry: remove the entity for reg. Static because it needs no
+	// reference to the enclosing service.
+	private static class RemoveRegistration
+	{
+	    public Registration reg;
+	}
+
+	// Queue entry: create the entity for reg.
+	private static class CreateRegistration
+	{
+	    public Registration reg;
+	}
+}

Added: incubator/ode/scratch/bpe/src/main/java/com/sybase/bpe/correlation/managed/CorrelationServiceEjbImpl.java.keep
URL: http://svn.apache.org/viewcvs/incubator/ode/scratch/bpe/src/main/java/com/sybase/bpe/correlation/managed/CorrelationServiceEjbImpl.java.keep?rev=381686&view=auto
==============================================================================
--- incubator/ode/scratch/bpe/src/main/java/com/sybase/bpe/correlation/managed/CorrelationServiceEjbImpl.java.keep (added)
+++ incubator/ode/scratch/bpe/src/main/java/com/sybase/bpe/correlation/managed/CorrelationServiceEjbImpl.java.keep Tue Feb 28 08:02:48 2006
@@ -0,0 +1,644 @@
+/*
+ * Created on Aug 25, 2003
+ *
+ */
+package com.sybase.bpe.correlation.ejbimpl;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import javax.ejb.CreateException;
+import javax.ejb.EJBException;
+import javax.ejb.FinderException;
+import javax.ejb.ObjectNotFoundException;
+import javax.ejb.RemoveException;
+import javax.naming.InitialContext;
+import javax.naming.NamingException;
+
+import com.sybase.bpe.context.IContextService;
+import com.sybase.bpe.context.base.IDataObjectFactory;
+import com.sybase.bpe.context.ejb.EJBDataObjectFactory;
+import com.sybase.bpe.context.persistent.PersistentContextService;
+import com.sybase.bpe.correlation.CorrelationService;
+import com.sybase.bpe.correlation.CorrelationServiceException;
+import com.sybase.bpe.correlation.CorrelationServiceSLImpl;
+import com.sybase.bpe.correlation.Registration;
+import com.sybase.bpe.engine.ProcessService;
+import com.sybase.bpe.event.IStaticKey;
+import com.sybase.bpe.instance.service.IPMIProcess;
+import com.sybase.bpe.util.BPEProperties;
+import com.sybase.bpe.util.BPException;
+
+/**
+ * @author charper
+ * 
+ * <bkl>
+ * I added an in-memory caching layer to this service so that
+ * cmp registration operations could be bundled up at the
+ * end of the transaction rather than interspersed throughout.
+ * This bundling reduces the deadlock hazard.
+ * </bkl>
+ * <charper>
+ * Deadlocks occur when doing a BP-to-BP invoke in the same thread/tx, so
+ * the in-memory caching layer is now on a per-thread basis.  This
+ * allows the correlation service to bundle registration operations
+ * per thread/tx.
+ * </charper>
+ *
+ */
+@SuppressWarnings("unchecked")
+public class CorrelationServiceEjbImpl extends CorrelationService 
+{
+    
+	
+	// Logger used to report persistence and lookup failures of this service.
+	private static Logger logger =
+		Logger.getLogger(CorrelationServiceEjbImpl.class.getName());
+	
+	//private LinkedList registrationOperations = new LinkedList();
+	//private LinkedList registrationsForRemoval = new LinkedList();
+	// Ordered create/remove operations queued by each thread. They are
+	// replayed by persistRegistrationChanges() and emptied by clearState().
+	// The map is static, so it is shared by all instances of this class.
+	private static Map<Thread,LinkedList<RegistrationType>>  registrationOperationsByThread =
+		Collections.synchronizedMap(new HashMap<Thread,LinkedList<RegistrationType>>());
+	// Registrations each thread has asked to remove; consulted by the
+	// lookup methods so that a queued removal hides the stored registration.
+	private static Map<Thread,LinkedList<Registration>> registrationsForRemovalByThread =
+		Collections.synchronizedMap(new HashMap<Thread,LinkedList<Registration>>());
+	// Process service handed to init() and returned by getProcessService().
+	private ProcessService ps;
+	// Set by init() from the BPELCompliant property.
+	private boolean isBPELCompliant;
+	// Local home of the registration entity bean, looked up in init().
+	private RegistrationEntityLocalHome relh;
+	// JNDI name used by init() to look up the registration entity local home.
+	public static final String JNDI_RELH_NAME = "java:comp/env/registrationBean";
+	// Per-thread in-memory correlation service that mirrors the create/remove
+	// calls made by the thread before they are persisted.
+	private static Map<Thread,CorrelationServiceSLImpl> transientCSByThread = 
+		Collections.synchronizedMap(new HashMap<Thread,CorrelationServiceSLImpl>());
+	// Per-thread counter maintained by incNestedCorrelation() and
+	// decNestedCorrelation(); removed again by clearState().
+	private static Map<Thread,Integer> nestedCorrelationSerivceInvokesByThreadId = 
+		Collections.synchronizedMap(new HashMap<Thread,Integer>());
+	
+	private void addRegistrationOperation(RegistrationType reg) {
+		LinkedList<RegistrationType> regOps = 
+			registrationOperationsByThread.get(Thread.currentThread());
+		if ( regOps == null ) {
+			regOps = new LinkedList<RegistrationType>();
+			registrationOperationsByThread.put(Thread.currentThread(),regOps);
+		}
+		regOps.add(reg);
+	}
+	
+	/**
+	 * Returns the calling thread's queue of pending registration operations.
+	 *
+	 * @return the live queue when the thread has one; otherwise a new empty
+	 *         list that is not stored in the per-thread map
+	 */
+	private Collection<RegistrationType> getRegistrationOperations() {
+		LinkedList<RegistrationType> pending =
+			registrationOperationsByThread.get(Thread.currentThread());
+		return ( pending != null ) ? pending : new LinkedList<RegistrationType>();
+	}
+	
+	private void addRegistrationOperationForRemoval(Registration reg) {
+		LinkedList<Registration> regs = 
+			registrationsForRemovalByThread.get(Thread.currentThread());
+		if ( regs == null ) {
+			regs = new LinkedList<Registration>();
+			registrationsForRemovalByThread.put(Thread.currentThread(),regs);
+		}
+		regs.add(reg);
+	}
+	
+	/**
+	 * Returns the registrations the calling thread has queued for removal.
+	 *
+	 * @return the live removal list when the thread has one; otherwise a new
+	 *         empty list that is not stored in the per-thread map
+	 */
+	private Collection<Registration> getRegistrationOperationsForRemoval() {
+		LinkedList<Registration> removals =
+			registrationsForRemovalByThread.get(Thread.currentThread());
+		return ( removals != null ) ? removals : new LinkedList<Registration>();
+	}
+	
+	private int getNestedCorrelationCount() {
+		Integer count = nestedCorrelationSerivceInvokesByThreadId.
+			get(Thread.currentThread());
+		if ( count == null ) {
+			return 0;
+		} else {
+			return count.intValue();
+		}
+	}
+	/**
+	 * Stores the nesting counter for the calling thread.
+	 *
+	 * @param val the new counter value
+	 */
+	private void setNestedCorrelationCount(int val) {
+		// Integer.valueOf reuses cached boxes for small values instead of
+		// allocating a new Integer on every call.
+		nestedCorrelationSerivceInvokesByThreadId.
+			put(Thread.currentThread(),Integer.valueOf(val));
+	}
+	private void clearNestedCorreclationCount() {
+		nestedCorrelationSerivceInvokesByThreadId.
+			remove(Thread.currentThread());
+	}
+	/** Increments the calling thread's nesting counter by one. */
+	public void incNestedCorrelation() {
+		setNestedCorrelationCount(getNestedCorrelationCount() + 1);
+	}
+	
+	/**
+	 * Decrements the calling thread's nesting counter by one and stores it.
+	 *
+	 * @return the counter value after the decrement
+	 */
+	public int decNestedCorrelation() {
+		final int remaining = getNestedCorrelationCount() - 1;
+		setNestedCorrelationCount(remaining);
+		return remaining;
+	}
+	private CorrelationServiceSLImpl getCachedTransientCS() {
+		CorrelationServiceSLImpl cs =  transientCSByThread.get(Thread.currentThread());
+		if ( cs == null ) {
+			return createCachedCS();
+		} else {
+			return cs;
+		}
+		
+	}
+	private void clearCachedCS() {
+		transientCSByThread.remove(Thread.currentThread());
+	}
+	
+	private CorrelationServiceSLImpl createCachedCS() {
+		CorrelationServiceSLImpl cs = new CorrelationServiceSLImpl();
+		transientCSByThread.put(Thread.currentThread(),cs);
+		return cs;
+	}
+	
+/*
+	private void trace( String message )
+	{
+	    System.out.println("CSTrace - " + message + " iid=" + this.hashCode());
+	    System.out.flush();
+	}
+
+	private void trace( String message, Object obj )
+	{
+	    trace(message);
+	    
+	    XStream xstream = new XStream();
+	    String xmlString = xstream.toXML(obj);
+	    System.out.println(xmlString);
+	    System.out.flush();
+	    
+	}
+*/
+	/**
+	 * Creates the service without allocating any state; the transient
+	 * correlation service is created lazily per thread.
+	 */
+	public CorrelationServiceEjbImpl() {
+	}
+
+	/**
+	 * Queues the creation of a registration for the calling thread and
+	 * records it in the thread's transient correlation service. The entity
+	 * itself is created later by persistRegistrationChanges().
+	 *
+	 * @see com.sybase.bpe.correlation.CorrelationService#createRegistration(com.sybase.bpe.correlation.Registration)
+	 */
+	public void createRegistration(Registration registration)
+		throws CorrelationServiceException {
+		final CreateRegistration pendingCreate = new CreateRegistration();
+		pendingCreate.reg = registration;
+		addRegistrationOperation(pendingCreate);
+		getCachedTransientCS().createRegistration(registration);
+	}
+	
+	/**
+	 * Creates the registration entity through the local home.
+	 *
+	 * @param registration the registration whose fields populate the entity
+	 * @throws CorrelationServiceException wrapping (and logging at SEVERE)
+	 *         a CreateException raised by the local home
+	 */
+	private void createRegistrationEntity(Registration registration)
+		throws CorrelationServiceException {
+		try {
+			relh.create(
+				registration.getStaticKeyValue(),
+				registration.getRootDefId(),
+				registration.getDefId(),
+				registration.getRootProcId(),
+				registration.getProcId(),
+				registration.getKeyValue(),
+				registration.getOperationId());
+		} catch (CreateException e) {
+			CorrelationServiceException wrapped = new CorrelationServiceException(
+				"NATIVE_EXCEPTION", new Object[] {"CreateException"}, e);
+			wrapped.log(logger, Level.SEVERE);
+			throw wrapped;
+		}
+	}
+
+	/**
+	 * Finds the registrations for a static key and a set of key values.
+	 * Stored registrations are looked up once per key value, duplicates
+	 * (by equals) are dropped, registrations whose removal is queued by the
+	 * calling thread are screened out, and finally the registrations held
+	 * by the thread's transient correlation service are added.
+	 *
+	 * Screening and the transient lookup do not depend on the individual
+	 * key value (the transient lookup takes the whole collection), so they
+	 * run once after the loop instead of once per key value.
+	 *
+	 * @param key       static key to look up
+	 * @param keyValues key values (Strings); a null element means "all
+	 *                  registrations for the static key"
+	 * @return the matching registrations; empty when keyValues is empty
+	 * @throws CorrelationServiceException wrapping (and logging at SEVERE)
+	 *         a FinderException raised by the local home
+	 * @see com.sybase.bpe.correlation.CorrelationService#getRegistrations(com.sybase.bpe.event.IStaticKey, java.util.Collection)
+	 */
+	public Collection getRegistrations(IStaticKey key, Collection keyValues)
+		throws CorrelationServiceException {
+			Collection regs = new HashSet();
+			// With no key values the loop never ran before, so nothing was
+			// looked up at all; keep returning the empty set in that case.
+			if ( keyValues.isEmpty() ) {
+				return regs;
+			}
+			Iterator it = keyValues.iterator();
+			while (it.hasNext()){
+				String keyValue = (String)it.next();
+				Collection beans;
+				try {
+					// if the key value is null look for all registration
+					// with the given static key, the correlation service
+					// will decide how to handle this if there are multiple
+					// registrations found
+					if ( keyValue == null ) {
+						beans =
+							relh.findByStaticKey(key.toString());
+					} else {
+						beans =
+							relh.findByStaticKeyandKeyValue(key.toString(), keyValue);
+					}
+				} catch (FinderException e) {
+					CorrelationServiceException bpx = new CorrelationServiceException("NATIVE_EXCEPTION",new Object[] {"FinderException"},e);
+					bpx.log(logger,Level.SEVERE);
+					throw bpx;
+				}
+				
+				// Explicit equals-based duplicate check, kept from the
+				// original: it does not rely on Registration.hashCode().
+				Collection regObjs = makeRegistrations(key.toString(),beans);
+				for (Iterator iter = regObjs.iterator(); iter.hasNext();) {
+					Object element = iter.next();
+					boolean dup = false;
+					for (Iterator iterator = regs.iterator(); iterator
+							.hasNext();) {
+						Object reg = iterator.next();
+						if ( element.equals(reg)) {
+							dup = true;
+							break;
+						}
+					}
+					if ( ! dup ) {
+						regs.add(element);
+					}
+				}
+			}
+			
+			// Hide stored registrations whose removal is queued but not
+			// yet persisted.
+			regs = screenRegs(regs);
+			
+			//Add the registrations which were created in the current transaction
+			//but have not been persisted yet.
+			regs.addAll(getCachedTransientCS().getRegistrations(key, keyValues));
+			return regs;
+	}
+
+	/**
+	 * Finds the registrations of a process: stored registrations found by
+	 * process id, minus those whose removal is queued by the calling thread,
+	 * plus those held by the thread's transient correlation service.
+	 *
+	 * @throws CorrelationServiceException wrapping (and logging at SEVERE)
+	 *         a FinderException raised by the local home
+	 * @see com.sybase.bpe.correlation.CorrelationService#getRegistrations(com.sybase.bpe.instance.service.IPMIProcess)
+	 */
+	public Collection getRegistrations(IPMIProcess key)
+		throws CorrelationServiceException {
+		Collection beans;
+		try {
+			beans = relh.findByProcId(key.getKey());
+		} catch (FinderException e) {
+			CorrelationServiceException wrapped = new CorrelationServiceException(
+				"NATIVE_EXCEPTION", new Object[] {"FinderException"}, e);
+			wrapped.log(logger, Level.SEVERE);
+			throw wrapped;
+		}
+		// Stored registrations, with queued removals screened out.
+		Collection visible = screenRegs(makeRegistrations(key.getKey(), beans));
+		// Registrations created in this transaction but not persisted yet.
+		visible.addAll(getCachedTransientCS().getRegistrations(key));
+		return visible;
+	}
+	
+	
+	/**
+	 * Finds the registrations for a static key: stored registrations found
+	 * by the key, minus those whose removal is queued by the calling thread,
+	 * plus those held by the thread's transient correlation service.
+	 *
+	 * @throws CorrelationServiceException wrapping (and logging at SEVERE)
+	 *         a FinderException raised by the local home
+	 * @see com.sybase.bpe.correlation.CorrelationService#getRegistrations(com.sybase.bpe.event.IStaticKey)
+	 */
+	public Collection getRegistrations(IStaticKey key)
+		throws CorrelationServiceException {
+		Collection beans;
+		try {
+			beans = relh.findByStaticKey(key.toString());
+		} catch (FinderException e) {
+			CorrelationServiceException wrapped = new CorrelationServiceException(
+				"NATIVE_EXCEPTION", new Object[] {"FinderException"}, e);
+			wrapped.log(logger, Level.SEVERE);
+			throw wrapped;
+		}
+		// Stored registrations, with queued removals screened out.
+		Collection visible = screenRegs(makeRegistrations(key.toString(), beans));
+		// Registrations created in this transaction but not persisted yet.
+		visible.addAll(getCachedTransientCS().getRegistrations(key));
+		return visible;
+	}
+	
+	/**
+	 * Called by the various getRegistrations methods. Filters out the
+	 * registrations whose removal has been queued by the calling thread but
+	 * has not been persisted yet, so they are not reported.
+	 *
+	 * @param regs candidate registrations (Registration elements)
+	 * @return a new HashSet holding the candidates not queued for removal
+	 */
+	private Collection screenRegs( Collection regs )
+	{
+	    // The removal list is the same for every candidate: fetch it once
+	    // rather than going through the synchronized per-thread map (and
+	    // possibly allocating an empty list) on each iteration.
+	    Collection<Registration> pendingRemovals = getRegistrationOperationsForRemoval();
+	    Collection retVal = new HashSet();
+	    Iterator iter = regs.iterator();
+	    while( iter.hasNext() )
+	    {
+	        Registration candidate  = ( Registration )( iter.next());
+	        if ( !( pendingRemovals.contains( candidate) ) ) 
+	        {
+	            retVal.add(candidate);
+	        }
+	    }
+	    return retVal;
+	}
+	
+	
+	/**
+	 * Converts registration entity beans into Registration value objects.
+	 *
+	 * @param key   value passed as the first constructor argument of every
+	 *              Registration that is built
+	 * @param beans RegistrationEntityLocal beans to convert
+	 * @return an ArrayList with one Registration per bean, in iteration order
+	 */
+	private Collection makeRegistrations(String key, Collection beans){
+		ArrayList converted = new ArrayList();
+		for (Iterator cursor = beans.iterator(); cursor.hasNext();) {
+			RegistrationEntityLocal bean = (RegistrationEntityLocal) cursor.next();
+			Registration reg = new Registration(
+				key,
+				bean.getOperationId(),
+				bean.getRootDefId(),
+				bean.getDefId(),
+				bean.getRootProcId(),
+				bean.getProcId(),
+				bean.getKeyValue());
+			converted.add(reg);
+		}
+		return converted;
+	}
+
+	/* (non-Javadoc)
+	 * @see com.sybase.bpe.correlation.CorrelationService#update()
+	 */
+	public void update() throws BPException {
+	    //trace( "Inside update()");
+	}
+
+	/* (non-Javadoc)
+	 * @see com.sybase.bpe.correlation.CorrelationService#getProcessService()
+	 */
+	public ProcessService getProcessService() throws BPException {
+	    //trace( "Inside getProcessService()");
+		return ps;
+	}
+
+	/* (non-Javadoc)
+	 * @see com.sybase.bpe.correlation.CorrelationService#isBPELCompliant()
+	 */
+	public boolean isBPELCompliant() {
+	    //trace( "Inside isBPELCompliant()");
+		return isBPELCompliant;
+	}
+
+	/**
+	 * Initialises the service: stores the process service, derives the
+	 * BPEL-compliance flag from the properties and looks up the registration
+	 * entity local home under {@link #JNDI_RELH_NAME}.
+	 *
+	 * @param props engine properties; the flag is true only when
+	 *              getBPELCompliant() equals "TRUE"
+	 * @param ps    process service returned later by getProcessService()
+	 * @throws CorrelationServiceException wrapping (and logging at SEVERE)
+	 *         a NamingException raised by the JNDI lookup
+	 * @see com.sybase.bpe.correlation.CorrelationService#init(com.sybase.bpe.util.BPEProperties, com.sybase.bpe.engine.ProcessService)
+	 */
+	public void init(BPEProperties props, ProcessService ps)
+		throws CorrelationServiceException {
+		this.ps = ps;
+		// Constant-first equals tolerates an unset (null) property, and the
+		// direct assignment resets the flag when init() is called again
+		// with a non-compliant configuration.
+		this.isBPELCompliant = "TRUE".equals(props.getBPELCompliant());
+		
+		try {
+			InitialContext ic = new InitialContext();
+			relh = (RegistrationEntityLocalHome) ic.lookup(JNDI_RELH_NAME);
+		} catch (NamingException ne) {
+			CorrelationServiceException bpx = new CorrelationServiceException("NATIVE_EXCEPTION",new Object[] {"NamingException"},ne);
+			bpx.log(logger,Level.SEVERE);
+			throw bpx;
+		}
+
+	}
+	
+	/* (non-Javadoc)
+	 * @see com.sybase.bpe.correlation.CorrelationService#getRegistration(com.sybase.bpe.event.IStaticKey, java.lang.String, java.lang.String)
+	 */
+	public Registration getRegistration(IStaticKey key, String operationId, String processId) throws CorrelationServiceException {
+	    //trace( "Inside getRegistration(IStaticKey key, String operationId, String processId)",
+	    //        new Object[]{key, operationId, processId});
+	    Collection beans;
+		try {
+			beans = relh.findByStaticKeyandOpIdandProcId(key.toString(),operationId,processId);
+		} catch (FinderException e) {
+			CorrelationServiceException bpx = new CorrelationServiceException("NATIVE_EXCEPTION",new Object[] {"FinderException"},e);
+			bpx.log(logger,Level.SEVERE);
+			throw bpx;
+		}
+		Iterator it = makeRegistrations(key.toString(),beans).iterator();
+		
+		Registration retVal = null;
+		
+		// there should only be one of these
+		if ( it.hasNext() ) 
+		{
+			retVal = (Registration)it.next();
+			
+			//The registration could already have been
+			//deleted but the deletion may not have been
+			//persisted yet.  Consult the removal list to
+			//find out.
+			//trace( "Checking transient registration removal list.");
+			//if ( registrationsForRemoval.contains(retVal))
+			if ( getRegistrationOperationsForRemoval().contains(retVal))
+			{
+			    //trace("Transient deletion detected.");
+			    retVal = null;
+			}
+		}
+		
+		
+		if ( retVal == null )
+		{
+		    //There could be a registration which has not been persisted yet.
+		    //Consult the transient correlation service to find out.
+			retVal = getCachedTransientCS().getRegistration(key, operationId, processId);
+			if ( retVal != null )
+			{
+			    //trace("Transient registration found.");
+			}
+		}
+		
+		return retVal;
+	}
+	
+	/**
+	 * Queues the removal of a registration for the calling thread: appends
+	 * a remove operation, removes the registration from the thread's
+	 * transient correlation service, and records it in the removal list
+	 * consulted by the lookup methods.
+	 *
+	 * @param registration the registration to remove
+	 */
+	public void removeRegistration(Registration registration)
+		throws CorrelationServiceException {
+		final RemoveRegistration pendingRemove = new RemoveRegistration();
+		pendingRemove.reg = registration;
+		addRegistrationOperation(pendingRemove);
+		getCachedTransientCS().removeRegistration(registration);
+		addRegistrationOperationForRemoval(registration);
+	}
+
+	/**
+	 * Removes the registration entity identified by the registration's
+	 * operation id, process id, root definition id, root process id and
+	 * static key value. Removal is best-effort: a missing entity is ignored
+	 * and any other failure is logged at WARNING without being rethrown.
+	 *
+	 * @param registration the registration whose entity is removed
+	 */
+	protected void removeRegistrationEntity(Registration registration) 
+	  throws CorrelationServiceException {
+		try {
+			RegistrationEntityPK pk = new RegistrationEntityPK();
+			pk.setOperationId(registration.getOperationId());
+			pk.setProcId(registration.getProcId());
+			pk.setRootDefId(registration.getRootDefId());
+			pk.setRootProcId(registration.getRootProcId());
+			pk.setStaticKeyValue(registration.getStaticKeyValue());
+			RegistrationEntityLocal bean = relh.findByPrimaryKey(pk);
+			bean.remove();
+		} catch ( ObjectNotFoundException e ) {
+			// ignore requests to remove registrations that are not there
+		} catch (FinderException e) {
+			// Named messages (previously empty) make the log entries
+			// distinguishable.
+			logger.log(Level.WARNING,"Failed to look up registration entity for removal",e);
+		} catch (EJBException e) {
+			logger.log(Level.WARNING,"Failed to remove registration entity",e);
+		} catch (RemoveException e) {
+			logger.log(Level.WARNING,"Registration entity refused removal",e);
+		}
+	}
+	
+	/**
+	 * Asks the EJB data object factory behind the context service for the
+	 * given key to remove its objects. Does nothing when the context
+	 * service is not a PersistentContextService or its factory is not an
+	 * EJBDataObjectFactory.
+	 *
+	 * @param key key used to obtain the context service from the process service
+	 * @throws CorrelationServiceException wrapping any exception raised
+	 */
+	private void doContextDeletes(String key) throws CorrelationServiceException {
+		try {
+			IContextService cs = ps.getContextService(key);
+			if ( !(cs instanceof PersistentContextService) ) {
+				return;
+			}
+			IDataObjectFactory dof =
+				((PersistentContextService) cs).getPersistentDataObjectFactory();
+			if ( dof instanceof EJBDataObjectFactory ) {
+				((EJBDataObjectFactory) dof).removeObjects();
+			}
+		} catch (Exception e) {
+			throw new CorrelationServiceException(e.getLocalizedMessage(),new Object[]{},e);
+		}
+	}
+	
+	/**
+	 * Asks the EJB data object factory behind the context service for the
+	 * given key to clear its remove-object state. Does nothing when the
+	 * context service is not a PersistentContextService or its factory is
+	 * not an EJBDataObjectFactory. Failures are logged at SEVERE and not
+	 * propagated.
+	 *
+	 * @param key key used to obtain the context service from the process service
+	 */
+	private void clearContextDeletes(String key) {
+		try {
+			IContextService cs = ps.getContextService(key);
+			if ( !(cs instanceof PersistentContextService) ) {
+				return;
+			}
+			IDataObjectFactory dof =
+				((PersistentContextService) cs).getPersistentDataObjectFactory();
+			if ( dof instanceof EJBDataObjectFactory ) {
+				((EJBDataObjectFactory) dof).clearRemoveObject();
+			}
+		} catch (Exception e) {
+			// problem cleaning up
+			logger.log(Level.SEVERE,e.getLocalizedMessage(),e);
+		}
+	}
+	
+	/**
+	 * Replays the calling thread's queued registration operations, in the
+	 * order they were queued, against the entity beans, and then clears the
+	 * thread's state. This method exists so that registration changes hit
+	 * the database together at the end of the transaction rather than
+	 * interspersed throughout it, which was a deadlock hazard.
+	 *
+	 * @throws CorrelationServiceException if a context delete or an entity
+	 *         creation fails; in that case clearState() is not reached
+	 */
+	public void persistRegistrationChanges() throws CorrelationServiceException
+	{
+		for (RegistrationType op : getRegistrationOperations()) {
+			if ( op instanceof RemoveRegistration ) {
+				Registration target = ((RemoveRegistration) op).reg;
+				doContextDeletes(target.getRootProcId());
+				removeRegistrationEntity(target);
+			} else if ( op instanceof CreateRegistration ) {
+				Registration target = ((CreateRegistration) op).reg;
+				doContextDeletes(target.getRootProcId());
+				createRegistrationEntity(target);
+			}
+		}
+		clearState();
+	}
+	
+	/**
+	 * Discards everything this service holds for the calling thread: the
+	 * queued operations, the removal list, the transient correlation
+	 * service and the nesting counter. For every registration involved, the
+	 * matching context service is asked to clear its pending deletes.
+	 *
+	 * The per-thread lists are removed from their static maps rather than
+	 * only emptied; otherwise each thread that ever used the service would
+	 * stay strongly referenced by those maps.
+	 */
+	public void clearState()
+	{
+		Thread current = Thread.currentThread();
+		
+		// Detach the operation queue first; remove() hands back the list
+		// (or null when the thread never queued anything).
+		LinkedList<RegistrationType> regtypes =
+			registrationOperationsByThread.remove(current);
+		if ( regtypes != null ) {
+			for ( RegistrationType regt : regtypes ) {
+				if ( regt instanceof RemoveRegistration )
+				{
+					RemoveRegistration rr = 
+						(RemoveRegistration)(regt);
+					clearContextDeletes(rr.reg.getRootProcId());
+				}
+				if ( regt instanceof CreateRegistration )
+				{
+					CreateRegistration cr = 
+						(CreateRegistration)(regt);
+					clearContextDeletes(cr.reg.getRootProcId());
+				}
+			}
+			// Still emptied, as before, in case a reference is held elsewhere.
+			regtypes.clear();
+		}
+		
+		LinkedList<Registration> regs =
+			registrationsForRemovalByThread.remove(current);
+		if ( regs != null ) {
+			for ( Registration reg : regs ) {
+				clearContextDeletes(reg.getRootProcId());
+			}
+			regs.clear();
+		}
+		
+		clearCachedCS();
+		clearNestedCorreclationCount();
+	}
+	
+	private interface RegistrationType {}
+	
+	/**
+	 * Queued "remove" operation; {@code reg} is the registration whose
+	 * entity is removed when the queue is replayed.
+	 */
+	private class RemoveRegistration implements RegistrationType {
+		public Registration reg;
+	}
+	
+	/**
+	 * Queued "create" operation; {@code reg} is the registration whose
+	 * entity is created when the queue is replayed.
+	 */
+	private class CreateRegistration implements RegistrationType {
+		public Registration reg;
+	}
+}