You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ode.apache.org by ad...@apache.org on 2006/02/28 17:05:47 UTC
svn commit: r381686 [21/40] - in /incubator/ode/scratch/bpe: ./ bpelTests/
bpelTests/probeService/ bpelTests/test1/ bpelTests/test10/
bpelTests/test12/ bpelTests/test13/ bpelTests/test14/ bpelTests/test15/
bpelTests/test16/ bpelTests/test17/ bpelTests/...
Added: incubator/ode/scratch/bpe/src/main/java/com/sybase/bpe/correlation/CorrelationService.java
URL: http://svn.apache.org/viewcvs/incubator/ode/scratch/bpe/src/main/java/com/sybase/bpe/correlation/CorrelationService.java?rev=381686&view=auto
==============================================================================
--- incubator/ode/scratch/bpe/src/main/java/com/sybase/bpe/correlation/CorrelationService.java (added)
+++ incubator/ode/scratch/bpe/src/main/java/com/sybase/bpe/correlation/CorrelationService.java Tue Feb 28 08:02:48 2006
@@ -0,0 +1,823 @@
+/*
+* Confidential property of Sybase, Inc.
+*
+* Copyright 1987 - 2006.
+*
+* Sybase, Inc. All rights reserved.
+*
+* Unpublished rights reserved under U.S. copyright laws.
+*
+* This software contains confidential and trade secret information
+* of Sybase, Inc. Use, duplication or disclosure of the software and
+* documentation by the U.S. Government is subject to restrictions
+* set forth in a license agreement between the Government and Sybase,
+* Inc. or other written agreement specifying the Government's rights
+* to use the software and any applicable FAR provisions, for example,
+* FAR 52.227-19.
+*
+* Sybase, Inc. One Sybase Drive, Dublin, CA 94568, USA
+*
+* http://www.sybase.com
+*/
+package com.sybase.bpe.correlation;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import com.sybase.bpe.bped.IInternalEventDirector;
+import com.sybase.bpe.context.IContainer;
+import com.sybase.bpe.context.IContextService;
+import com.sybase.bpe.context.resolver.ContextResolver;
+import com.sybase.bpe.context.resolver.IResolvedObject;
+import com.sybase.bpe.context.test.SerializationContext;
+import com.sybase.bpe.definition.IPMDCorrelationSet;
+import com.sybase.bpe.definition.IPMDLocator;
+import com.sybase.bpe.definition.IPMDOperation;
+import com.sybase.bpe.definition.IPMDProcess;
+import com.sybase.bpe.definition.IPMDRoot;
+import com.sybase.bpe.definition.service.DefinitionService;
+import com.sybase.bpe.engine.ProcessDefinitionKey;
+import com.sybase.bpe.engine.ProcessInstance;
+import com.sybase.bpe.engine.ProcessService;
+import com.sybase.bpe.engine.ReturnMessageLocatorHolder;
+import com.sybase.bpe.engine.StateEnum;
+import com.sybase.bpe.event.Fault;
+import com.sybase.bpe.event.IRequestMessageEvent;
+import com.sybase.bpe.event.IResponseMessage;
+import com.sybase.bpe.event.IStaticKey;
+import com.sybase.bpe.instance.IPMIProcess;
+import com.sybase.bpe.instance.service.InstanceService;
+import com.sybase.bpe.interaction.IInteraction;
+import com.sybase.bpe.interaction.IInvocation;
+import com.sybase.bpe.interaction.InteractionFactory;
+import com.sybase.bpe.lang.ResourceGetter;
+import com.sybase.bpe.scope.service.BPRuntimeException;
+import com.sybase.bpe.scope.service.ScopePath;
+import com.sybase.bpe.util.BPEProperties;
+import com.sybase.bpe.util.BPException;
+import com.sybase.bpe.event.BPELStaticKey;
+import com.sybase.bpe.interaction.Invocation;
+import com.sybase.bpe.interaction.query.JaxenXPathSingleNodeQuery;
+
+/**
+ * A CorrelationService implementation allows processes to register
+ * correlations. Underneath the CorrelationService is a repository that stores these
+ * correlations. CorrelationServices are obtained from the
+ * CorrelationServiceFactory.createCorrelationService() function.
+ * @see com.sybase.bpe.correlation.CorrelationServiceFactory
+ * @see com.sybase.bpe.correlation.StaticRegistration
+ */
+public abstract class CorrelationService {
+
+ // Class-wide java.util.logging logger named after this class; used for
+ // FINE-level tracing and for logging correlation failures.
+ private static Logger logger =
+ Logger.getLogger(CorrelationService.class.getName());
+
+
+ /**
+ * Create a registration in the repository.
+ * @param regisration the registration to store
+ * @throws BPException declared for implementations; the failure conditions
+ * are defined by the concrete subclass
+ */
+ public abstract void createRegistration(Registration regisration)
+ throws BPException;
+
+ /**
+ * Remove a registration from the repository.
+ * @param registration the registration to remove
+ * @throws BPException declared for implementations; the failure conditions
+ * are defined by the concrete subclass
+ */
+ public abstract void removeRegistration(Registration registration)
+ throws BPException;
+
+ /**
+ * Find the registrations for a static key and a set of key values.
+ * @param key the static key of the incoming event
+ * @param keyValues candidate correlation key values; this class passes the
+ * result of computeKeys, which may contain null entries
+ * @return the registrations found; this class calls size() on the result,
+ * so implementations are expected to return a non-null collection
+ * @throws BPException declared for implementations; the failure conditions
+ * are defined by the concrete subclass
+ */
+ public abstract Collection getRegistrations(
+ IStaticKey key,
+ Collection keyValues)
+ throws BPException;
+
+ /**
+ * Find a list of registrations for a process instance.
+ * @param key the process instance used as the lookup key
+ * (presumably the process that owns the registrations - TODO confirm)
+ * @return the registrations found
+ * @throws BPException declared for implementations; the failure conditions
+ * are defined by the concrete subclass
+ */
+
+ public abstract Collection getRegistrations(IPMIProcess key)
+ throws BPException;
+
+
+ /**
+ * Find a list of registrations for a static key, without restricting the
+ * lookup to particular key values.
+ * @param key the static key to look up
+ * @return the registrations found
+ * @throws BPException declared for implementations; the failure conditions
+ * are defined by the concrete subclass
+ */
+
+ public abstract Collection getRegistrations(IStaticKey key)
+ throws BPException;
+
+
+ /**
+ * Find a single registration.
+ * @param key the static key to look up
+ * @param operationId id of the operation the registration was made for
+ * (presumably matches Registration.getOperationId() - TODO confirm)
+ * @param processId id of the process that made the registration
+ * (presumably matches Registration.getProcId() - TODO confirm)
+ * @return the registration found; what is returned when nothing matches
+ * is defined by the concrete subclass
+ * @throws BPException declared for implementations; the failure conditions
+ * are defined by the concrete subclass
+ */
+ public abstract Registration getRegistration(IStaticKey key, String operationId,
+ String processId)
+ throws BPException;
+
+ /**
+ * Lock on a key using the locking service of the given event director.
+ * Within this class the key is either a root definition id or a root
+ * process instance id.
+ * @param ed event director whose locking service performs the lock
+ * @param key the key to lock on
+ * @throws BPException if obtaining the lock fails
+ */
+ public void lock(IInternalEventDirector ed, String key) throws BPException {
+ ed.getLockingService().lock(key);
+ }
+
+ /**
+ * Update the correlation service. What an update persists or flushes is
+ * defined by the concrete subclass.
+ * @throws BPException declared for implementations
+ */
+ public abstract void update() throws BPException;
+
+ /**
+ * Get the process service this correlation service works with. This class
+ * uses it to reach the instance and definition services and to look up
+ * and create process instances.
+ * @return the process service
+ * @throws BPException declared for implementations
+ */
+ public abstract ProcessService getProcessService() throws BPException;
+
+ /**
+ * Tell whether this engine is BPEL compliant. When true, sentMessage
+ * rejects an attempt to send the same event a second time with a
+ * CS_NON_BPEL exception instead of treating it as a spray.
+ * @return true if the engine is BPEL compliant
+ */
+ public abstract boolean isBPELCompliant();
+
+    /**
+     * Deliver a message event to the process behind a registration.
+     * <p>
+     * If the registered operation is not instance creating, the event is
+     * handed to the existing process instance named by the registration.
+     * Otherwise a new sub process is created off the root process instance,
+     * marked as started, given the scope path of the process that made the
+     * registration, and the event is handed to that new instance.
+     *
+     * @param registration the registration to route to
+     * @param me the message event to deliver
+     * @param ed the event director; it is passed twice to processEvent
+     * @throws BPException if a lookup, the instance creation or the event
+     *         processing fails
+     */
+    public void routeToRegistration(
+            Registration registration,
+            IRequestMessageEvent me,
+            IInternalEventDirector ed) throws BPException {
+
+        // identifiers recorded on the registration
+        String rootDefinitionId = registration.getRootDefId();
+        String definitionId = registration.getDefId();
+        String rootProcessId = registration.getRootProcId();
+        String processId = registration.getProcId();
+        String operationId = registration.getOperationId();
+
+        // resolve the operation metadata through the root definition
+        DefinitionService definitions =
+            getProcessService().getInstanceService().getDefinitionService();
+        IPMDOperation operation = definitions
+            .getRootDefinition(new ProcessDefinitionKey(rootDefinitionId))
+            .getOperation(operationId);
+
+        ProcessInstance target;
+        if ( operation.isInstanceCreating() ) {
+            // definition of the process that has to be instantiated
+            ProcessDefinitionKey rootKey = new ProcessDefinitionKey(rootDefinitionId);
+            ProcessDefinitionKey processKey = new ProcessDefinitionKey(definitionId);
+            IPMDProcess definition = definitions.getProcessDefintion(processKey,rootKey);
+
+            // the new process is created off the root process instance
+            IPMIProcess rootInstance = getProcessService().getInstanceService().getInstance(
+                rootProcessId,rootProcessId);
+            target = getProcessService().createSubProcess(rootInstance,definition);
+            target.setState(StateEnum.STARTED);
+
+            // because the process hangs off the root, it has to adopt the
+            // scope path of the process that made the registration
+            ProcessInstance registrant = getProcessService().lookupProcess( rootProcessId, processId );
+            target.setScopePath(registrant.getScopePath());
+        } else {
+            // the event belongs to a process instance that already exists
+            target = getProcessService().lookupProcess( rootProcessId, processId );
+        }
+
+        // send the message
+        target.processEvent(me,ed, ed);
+    }
+
+    /**
+     * Create a new root process instance for an instance creating operation
+     * and deliver the message event to it.
+     *
+     * @param operation the instance creating operation; its root definition
+     *        id selects the process definition to instantiate
+     * @param me the message event to deliver
+     * @param ed the event director; it is passed twice to processEvent
+     * @throws BPException if the instance creation or the event processing
+     *         fails
+     */
+    public void createInstanceAndRoute(
+            IPMDOperation operation,
+            IRequestMessageEvent me,
+            IInternalEventDirector ed) throws BPException {
+
+        // instantiate the root definition and mark the instance as started
+        ProcessDefinitionKey rootDefinitionKey =
+            new ProcessDefinitionKey(operation.getRootDefId());
+        ProcessInstance instance = getProcessService().createProcess(rootDefinitionKey);
+        instance.setState(StateEnum.STARTED);
+
+        // make sure the context tree has a partition for this process root
+        IContainer contextRoot = instance.getContextService().getRoot();
+        IContainer partition = (IContainer)contextRoot.findChild(instance.getRootKey());
+        if ( partition == null ) {
+            contextRoot.createContainer(instance.getRootKey());
+        }
+
+        // send the message
+        instance.processEvent(me,ed, ed);
+    }
+
+ /**
+ * Initialize the CorrelationService. CorrelationServiceFactory calls this
+ * right after instantiating the implementation class.
+ * @param props Initialization properties.
+ * @param ps the process service this correlation service will work with
+ * @throws BPException declared for implementations
+ */
+ public abstract void init(BPEProperties props, ProcessService ps)
+ throws BPException;
+
+    /**
+     * Correlate an event and route it to the process that should receive it.
+     * <p>
+     * Routing is attempted in this order:
+     * <ol>
+     * <li>a registration matching the static key and the correlation key
+     *     values computed from the message (only looked up when at least one
+     *     candidate operation belongs to a definition that is not
+     *     stateless);</li>
+     * <li>an instance creating operation for the static key, in which case
+     *     a new process instance is created and the event routed to it.</li>
+     * </ol>
+     * If neither applies, a CorrelationServiceException is logged and thrown;
+     * caching of events that cannot be correlated is not implemented.
+     *
+     * @param me the message to correlate
+     * @param sync passed on to mapResult, which only builds a reply message
+     *        from the return metadata when this is true
+     * @param ed the EventDirector
+     * @return the response message produced by mapResult
+     * @throws BPException if routing fails or the event cannot be correlated
+     */
+    public IResponseMessage correlateEvent(
+            IRequestMessageEvent me,
+            boolean sync,
+            IInternalEventDirector ed)
+            throws BPException {
+
+        // init the process service
+        getProcessService().init();
+
+        // set the message event on the event director
+        ed.setMessageEvent(me);
+        // this is a new message so null out the return message locator object
+        ed.setReturnMessageMetadata(null);
+
+        // get the static key
+        IStaticKey key = me.getStaticKey();
+
+        // object to collect data about sent messages
+        SentMessageData smd = new SentMessageData();
+
+        // look up non instance creating operations for this static key
+        // inflight and metadata
+        Collection defOps = getAllDefinedNonInstanceCreatingOperations(key);
+
+        // The "no such registration" check at the end of this method tests
+        // defOps for null, so a null result must not fail here first with a
+        // NullPointerException. Iterate over an empty collection instead and
+        // leave defOps itself untouched for that later check.
+        Collection knownOps = ( defOps == null ) ? new ArrayList() : defOps;
+
+        Collection keys = computeKeys(me,knownOps,false);
+
+        // sync on computed keys
+        // TODO - make sure these keys are ordered consistently so we don't get dead locks
+        // This is not needed when we are not caching events
+        //lockOnKeys(ed, me.getStaticKey().toString(),keys);
+
+        // if the operations all belong to stateless BPs
+        // we don't need to look up registrations
+        Registration reg = null;
+        if ( isStatefull(knownOps)) {
+            // find a registration to route to
+            // null if there are no registrations to be routed to
+            // this method also locks on the process instance id so execution is
+            // serialized for the rest of the transaction
+            reg = findRegistration(ed, key,keys);
+        }
+
+        // we found a registration
+        // route and return
+        if ( reg != null ) {
+            routeToRegistration(reg,me,ed);
+            sentMessage(smd,
+                ed.getReturnMessageMetadata().getRootProcessID());
+            return mapResult(smd,ed,sync);
+        }
+
+        IPMDOperation op = findInstanceCreatingOperation(key);
+
+        // we found an instance creation operation
+        // route and return
+        if ( op != null ) {
+
+            // get root definition to see if it is stateless
+            IPMDRoot root = getProcessService().getInstanceService().getDefinitionService().
+                getRootDefinition(new ProcessDefinitionKey(op.getRootDefId()));
+
+            // if we need to protect process instantiation and
+            // the process is stateful we need to lock on the root id
+            if ( root.getProtectedInstantiation() && ! root.getIsStateless() ) {
+
+                // this lock serializes multiple instance creating events
+                // racing toward the same process
+                lock(ed, op.getRootDefId());
+
+                // we have to perform this check again because the instance could have been
+                // created between the time the first check was performed and the
+                // time the root definition id was locked.
+                Registration regRecheck = findRegistration(ed, key,keys);
+
+                // we found a registration
+                // route and return
+                if ( regRecheck != null ) {
+                    routeToRegistration(regRecheck,me,ed);
+                    sentMessage(smd,
+                        ed.getReturnMessageMetadata().getRootProcessID());
+                    return mapResult(smd,ed,sync);
+                }
+
+            }
+
+            // we have an operation
+            // so create an instance and route to it
+            // note this is called whether the definition is stateless or not
+            createInstanceAndRoute(op,me,ed);
+            sentMessage(smd,
+                ed.getReturnMessageMetadata().getRootProcessID());
+            return mapResult(smd,ed,sync);
+        }
+
+        // Nothing can receive the event (op is null from here on). Without
+        // operation metadata, or with a key that does not carry the BPEL
+        // details, only the key itself can be reported; checking the key type
+        // keeps a ClassCastException from masking the correlation failure.
+        if ( defOps == null || ! ( key instanceof BPELStaticKey ) ) {
+            CorrelationServiceException cse = new CorrelationServiceException("ED_ED_NSR",new Object[] { key });
+            cse.log(logger,Level.SEVERE);
+            throw cse;
+        }
+
+        // The input event cannot be correlated. Caching is not implemented,
+        // so report the operation together with the correlation values found
+        // in the message and the correlation keys that were evaluated.
+        BPELStaticKey bpelKey = (BPELStaticKey) key;
+        Object[] obj = new Object[5];
+        obj[0] = bpelKey.getOperation();
+        obj[1] = bpelKey.getPortType();
+        obj[2] = bpelKey.getTargetNamespace();
+        obj[3] = getCorrelationValues( me,defOps );
+        obj[4] = getCorrelationKeys( defOps );
+
+        CorrelationServiceException cse = new CorrelationServiceException("ED_ED_NSR_R", obj);
+        cse.log(logger,Level.SEVERE);
+        throw cse;
+    }
+
+    /**
+     * Find the operation the definition service correlates with a static key
+     * (presumably the instance creating operation - the selection is done by
+     * DefinitionService.getCorrelations).
+     *
+     * @param key the static key of the incoming event
+     * @return the single operation found, or null if there is none
+     * @throws BPException CS_NO_SPRAY if more than one operation is found, or
+     *         whatever the definition lookup throws
+     */
+    private IPMDOperation findInstanceCreatingOperation (IStaticKey key) throws BPException {
+        Collection candidates = getProcessService()
+            .getInstanceService()
+            .getDefinitionService()
+            .getCorrelations(key);
+
+        int count = candidates.size();
+        if ( count == 0 ) {
+            return null;
+        }
+        if ( count > 1 ) {
+            // one event must not fan out to several definitions
+            throw new CorrelationServiceException("CS_NO_SPRAY",new Object[] { });
+        }
+        return (IPMDOperation)candidates.iterator().next();
+    }
+
+    /**
+     * Find the registration an event should be routed to and lock its root
+     * process instance.
+     *
+     * @param ed event director used for locking
+     * @param key the static key of the incoming event
+     * @param keys the candidate correlation key values
+     * @return the single matching registration, or null if there is none
+     * @throws BPException CS_NO_SPRAY if more than one registration matches,
+     *         or whatever the lookup or the locking throws
+     */
+    private Registration findRegistration( IInternalEventDirector ed, IStaticKey key, Collection keys ) throws BPException {
+
+        Collection matches = getRegistrations(key,keys);
+        int count = matches.size();
+
+        if ( count > 1 ) {
+            throw new CorrelationServiceException("CS_NO_SPRAY",new Object[] { });
+        }
+        if ( count == 0 ) {
+            return null;
+        }
+
+        Registration match = (Registration)matches.iterator().next();
+
+        // lock the process instance so that we get a protected registration
+        // lookup; this is necessary because an in flight crank of the engine
+        // could remove a registration
+        lockRootProcessInstance(ed, match);
+
+        // Selecting the registrations a second time after taking the lock
+        // (the instance could have been satisfied and gone away) is only
+        // needed when events are cached, which is not the case, so the
+        // registration found above is returned as is.
+        return match;
+    }
+
+    /**
+     * Ask the definition service for the non instance creating operations
+     * defined for a static key.
+     *
+     * @param key the static key of the incoming event
+     * @return whatever DefinitionService.getNonInstanceCreatingOps returns
+     * @throws BPException if the definition service cannot be reached or the
+     *         lookup fails
+     */
+    private Collection getAllDefinedNonInstanceCreatingOperations(IStaticKey key) throws BPException {
+        return getProcessService()
+            .getInstanceService()
+            .getDefinitionService()
+            .getNonInstanceCreatingOps(key);
+    }
+
+    /**
+     * Build the response for a routed event and update the process service.
+     * <p>
+     * For a synchronous call that was not sprayed, the reply is assembled
+     * from the return message metadata held by the event director: every
+     * locator that is not a correlation set locator is resolved against the
+     * process context and stored as a message part, or as a fault part when
+     * the metadata names a fault. In every other case each root process that
+     * received the event is updated and an empty response is returned.
+     *
+     * @param smd data collected about the messages that were sent
+     * @param ed the event director holding the message event and the return
+     *        message metadata
+     * @param sync true if the caller waits for a reply
+     * @return the response message
+     * @throws BPException if resolving the reply data or updating the
+     *         process service fails
+     */
+    private IResponseMessage mapResult(SentMessageData smd,
+            IInternalEventDirector ed, boolean sync ) throws BPException {
+
+        if ( ! sync || smd.spray ) {
+            // no reply to build: update every root process that got the event
+            Iterator rootIdIt = smd.rootIds.iterator();
+            while ( rootIdIt.hasNext() ) {
+                ed.getProcessService().update((String)rootIdIt.next(), ed, ed);
+            }
+            return ed.getMessageEvent().createResponseMessage();
+        }
+
+        // create the return message; no fault has occurred
+        IResponseMessage mer = ed.getMessageEvent().createResponseMessage();
+        // get the return metadata
+        ReturnMessageLocatorHolder rmlh = ed.getReturnMessageMetadata();
+
+        // Test to see if the reply has a fault
+        Fault replyFault = null;
+        if ( rmlh.getFaultName() != null ) {
+            replyFault = new Fault();
+            replyFault.setFaultString(rmlh.getFaultName());
+            BPRuntimeException bpre = new BPRuntimeException(rmlh.getFaultName(),"");
+            bpre.setNameSpace(rmlh.getFaultNS());
+            replyFault.setFaultException(bpre);
+            mer.setFault(replyFault);
+        }
+
+        Iterator it = rmlh.getLocators();
+        InstanceService is = ed.getProcessService().getInstanceService();
+        IPMIProcess ipmip =
+            is.getInstance(
+                rmlh.getRootProcessID(),
+                rmlh.getRootProcessID());
+        // create a resolver so we can get at the data
+        ContextResolver resolver =
+            new ContextResolver(
+                ipmip,
+                rmlh,
+                ed.getProcessService().getContextService(ipmip),
+                ed.getProcessService().getScopeService(ipmip));
+
+        // Debugging aid that dumps the process context to standard out. It
+        // serializes the whole context, so it only runs when FINE logging is
+        // enabled; a failure of the dump must never fail the reply.
+        if (logger.isLoggable(Level.FINE)) {
+            try {
+                SerializationContext sc = new SerializationContext( System.out );
+                sc.printComment( "Correlation Service Context:");
+                sc.serialize( resolver.getContextService() );
+            } catch (Exception e) {
+                logger.log(Level.FINE, "Unable to dump the correlation service context", e);
+            }
+        }
+
+        // we might be building the return message after the scope has
+        // ended so we need to hijack the current scope of the process
+        ScopePath saveSP = ipmip.getScopePath();
+        ipmip.setScopePath(rmlh.getScopePath());
+        try {
+            // loop over the return metadata
+            while (it.hasNext()) {
+                IPMDLocator loc = (IPMDLocator) it.next();
+                String name = loc.getName();
+                // NOTE(review): a part locator name is expected to contain a
+                // ':' (prefix:part); a name without one that is not a
+                // correlation set fails below with an index error - confirm
+                // that such names cannot occur.
+                String[] name_split = name.split(":");
+
+                // skip correlation set locators
+                Collection corrlSets = rmlh.getCorrlSets();
+                if ( corrlSets != null && corrlSets.contains(name_split[0])) {
+                    continue;
+                }
+
+                if (logger.isLoggable(Level.FINE)) {
+                    logger.fine("Adding " + loc.getName()
+                        + " to return message");
+                }
+
+                // resolve the object
+                IResolvedObject ro = resolver.resolveBPContext(loc.getName());
+
+                // turn the resolved value into an interaction; a value that
+                // is neither an interaction nor a String yields a null part
+                Object obj = ro.getValue();
+                IInteraction interaction = null;
+                if (obj instanceof IInteraction) {
+                    interaction = (IInteraction) obj;
+                } else if (obj instanceof String) {
+                    // create a new interaction (bytes use the platform charset)
+                    interaction = InteractionFactory.newInstance()
+                        .createXMLInteraction(
+                            ((String) obj).getBytes());
+                }
+
+                // set the part on the reply, or on the fault if there is one
+                if (replyFault == null) {
+                    mer.setPart(name_split[1], interaction);
+                } else {
+                    BPRuntimeException bpre = (BPRuntimeException) replyFault
+                        .getFaultException();
+                    bpre.addPartMessage(name_split[1], interaction);
+                }
+            }
+        } finally {
+            // always put back the scope, even if resolving a locator failed,
+            // so the process instance is not left with the borrowed path
+            ipmip.setScopePath(saveSP);
+        }
+
+        // update the instance service
+        if (logger.isLoggable(Level.FINE)) {
+            logger.fine(
+                "updatting instance service and removing lock for:"
+                    + ipmip.getRootKey());
+        }
+        ed.getProcessService().update(ipmip.getRootKey(), ed, ed);
+
+        return mer;
+    }
+
+    /**
+     * Describe the correlation keys of a list of operations for an error
+     * message. For every part query of every correlation set the part name
+     * is written, followed by ':' and the XPath expression when the query is
+     * a Jaxen single node query, followed by a blank.
+     *
+     * @param operations the operations (IPMDOperation) to describe
+     * @return the description; empty if no operation has a correlation
+     */
+    private String getCorrelationKeys(Collection operations)
+    {
+        StringBuffer text = new StringBuffer();
+        for (Iterator opIt = operations.iterator(); opIt.hasNext();)
+        {
+            IPMDOperation op = (IPMDOperation) opIt.next();
+            if (op.getCorrelation() == null)
+            {
+                continue;
+            }
+            for (Iterator setIt = op.getCorrelation().getCorrelationSets().iterator(); setIt.hasNext();)
+            {
+                IPMDCorrelationSet corrlset = (IPMDCorrelationSet) setIt.next();
+                for (Iterator queryIt = corrlset.getPartQueries().iterator(); queryIt.hasNext();)
+                {
+                    PartQuery pq = (PartQuery) queryIt.next();
+                    text.append(pq.getPartName());
+
+                    Object partQuery = pq.getQuery();
+                    if (partQuery instanceof Invocation)
+                    {
+                        Object xpathQuery = ((Invocation) partQuery).getQuery();
+                        if (xpathQuery instanceof JaxenXPathSingleNodeQuery)
+                        {
+                            try
+                            {
+                                text.append(":");
+                                text.append(((JaxenXPathSingleNodeQuery) xpathQuery).getXPathExpression());
+                            }
+                            catch(Exception e)
+                            {
+                                // Deliberately ignored: this text only enriches an
+                                // error that is already being built, so a failure
+                                // here must not replace that error.
+                            }
+                        }
+                    }
+                    text.append(" ");
+                }
+            }
+        }
+        return text.toString();
+    }
+
+    /**
+     * Get the correlation values found in a message, for the error message
+     * that is logged when something goes wrong. Every part query of every
+     * correlation set is evaluated against the matching message part and the
+     * results are joined, each followed by a blank.
+     *
+     * @param me the message event the values are read from
+     * @param operations the operations (IPMDOperation) whose correlation
+     *        sets are evaluated
+     * @return the joined values, or null if evaluating a query failed (the
+     *         failure is logged)
+     */
+    private String getCorrelationValues(IRequestMessageEvent me, Collection operations)
+    {
+        StringBuffer values = new StringBuffer();
+        for (Iterator opIt = operations.iterator(); opIt.hasNext();) {
+            IPMDOperation op = (IPMDOperation) opIt.next();
+            if (op.getCorrelation() == null)
+            {
+                continue;
+            }
+
+            for (Iterator setIt = op.getCorrelation().getCorrelationSets().iterator(); setIt.hasNext();) {
+                IPMDCorrelationSet corrlset = (IPMDCorrelationSet) setIt.next();
+                try
+                {
+                    for (Iterator queryIt = corrlset.getPartQueries().iterator(); queryIt.hasNext();)
+                    {
+                        PartQuery pq = (PartQuery) queryIt.next();
+
+                        // only parts present in the message contribute
+                        IInteraction part = me.getPart(pq.getPartName());
+                        if (part == null)
+                        {
+                            continue;
+                        }
+
+                        // evaluate the query against the part, or take the
+                        // part itself when there is no query
+                        IInvocation invocation = ( IInvocation )pq.getQuery();
+                        String value = ( invocation == null )
+                            ? part.toString()
+                            : part.invoke(invocation).toString();
+
+                        if (value != null) {
+                            values.append(value);
+                            values.append(" ");
+                        }
+                    }
+                }
+                catch (Exception e) {
+                    // TODO: Cory - please verify no rethrow
+                    logger.log(Level.SEVERE, ResourceGetter.getString("ED_ED_EWM"), e);
+                    return null;
+                }
+            }
+        }
+        return values.toString();
+    }
+
+    /**
+     * Compute the key values an incoming message may match, for a list of
+     * operations. Per operation the following entries are added, in order:
+     * <ul>
+     * <li>null, if the operation has no correlation; otherwise</li>
+     * <li>the full key built from all correlation sets,</li>
+     * <li>the key built from the non instantiating correlation sets only,
+     *     if it is not empty,</li>
+     * <li>null, if the operation has no non instantiating correlation set
+     *     (there could be a registration without correlations).</li>
+     * </ul>
+     * NOTE(review): makeKeyValue returns null when a query fails, and
+     * appending that to the buffer adds the text "null" to the key - confirm
+     * this is intended.
+     *
+     * @param me the message event the key values are read from
+     * @param operations the operations (IPMDOperation) to compute keys for
+     * @param dups true to keep duplicates (list), false to drop them (set)
+     * @return the computed key values
+     */
+    private Collection computeKeys(IRequestMessageEvent me, Collection operations, boolean dups) {
+        Collection keys = dups ? (Collection) new ArrayList() : (Collection) new HashSet();
+
+        for (Iterator opIt = operations.iterator(); opIt.hasNext();) {
+            IPMDOperation op = (IPMDOperation) opIt.next();
+            if (op.getCorrelation() == null) {
+                keys.add(null);
+                continue;
+            }
+
+            // the full key; for a process creating instance this should be
+            // the only key created
+            StringBuffer fullKey = new StringBuffer("");
+            for (Iterator setIt = op.getCorrelation().getCorrelationSets().iterator(); setIt.hasNext();) {
+                IPMDCorrelationSet corrlset = (IPMDCorrelationSet) setIt.next();
+                fullKey.append(makeKeyValue(me, corrlset.getPartQueries()));
+            }
+            keys.add(fullKey.toString());
+
+            // a key based on only the non instantiating correlation sets
+            StringBuffer partialKey = new StringBuffer("");
+            boolean sawNonInstantiating = false;
+            for (Iterator setIt = op.getCorrelation().getCorrelationSets().iterator(); setIt.hasNext();) {
+                IPMDCorrelationSet corrlset = (IPMDCorrelationSet) setIt.next();
+                if ( corrlset.isInstantiating() ) {
+                    continue;
+                }
+                partialKey.append(makeKeyValue(me, corrlset.getPartQueries()));
+                sawNonInstantiating = true;
+            }
+            String partial = partialKey.toString();
+            if ( partial.length() > 0 ) {
+                keys.add(partial);
+            }
+
+            // if this operation only had instantiating keys there could be a
+            // registration with a null key (no correlations)
+            if ( ! sawNonInstantiating ) {
+                keys.add(null);
+            }
+        }
+
+        return keys;
+    }
+
+    /**
+     * Generate a key value by evaluating part queries against a message.
+     * The results for all parts present in the message are concatenated
+     * without a separator.
+     *
+     * @param me the message event the values are read from
+     * @param keyMetaData the part queries (PartQuery) that make up the key
+     * @return the key value, or null if evaluating a query failed (the
+     *         failure is logged)
+     */
+    private String makeKeyValue(IRequestMessageEvent me, Collection keyMetaData) {
+
+        StringBuffer keyValue = new StringBuffer();
+
+        try {
+            for (Iterator queryIt = keyMetaData.iterator(); queryIt.hasNext();) {
+                PartQuery pq = (PartQuery) queryIt.next();
+
+                // only parts present in the message event contribute
+                IInteraction part = me.getPart(pq.getPartName());
+                if (part == null) {
+                    continue;
+                }
+
+                // evaluate the query against the part, or take the part
+                // itself when there is no query
+                IInvocation invocation = ( IInvocation )pq.getQuery();
+                String piece = ( invocation == null )
+                    ? part.toString()
+                    : part.invoke(invocation).toString();
+
+                if (piece != null) {
+                    keyValue.append(piece);
+                }
+            }
+        } catch (Exception e) {
+            // TODO: Cory - please verify no rethrow
+            logger.log(Level.SEVERE, ResourceGetter.getString("ED_ED_EWM"), e);
+            return null;
+        }
+
+        return keyValue.toString();
+    }
+
+    /**
+     * Tell whether at least one of the given operations belongs to a root
+     * definition that is not stateless.
+     *
+     * @param defOps the operations (IPMDOperation) to inspect
+     * @return true as soon as an operation with a non stateless root
+     *         definition is found; false otherwise, also for an empty list
+     * @throws BPException if a root definition cannot be looked up
+     */
+    private boolean isStatefull(Collection defOps) throws BPException {
+        DefinitionService ds = getProcessService().getInstanceService().getDefinitionService();
+        for ( Iterator iter = defOps.iterator(); iter.hasNext(); ) {
+            IPMDOperation op = (IPMDOperation) iter.next();
+            IPMDRoot root = ds.getRootDefinition(new ProcessDefinitionKey(op.getRootDefId()));
+            if ( ! root.getIsStateless() ) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Record that the event was sent to a root process. If the event had
+     * already been sent before, a BPEL compliant engine rejects the second
+     * send, while any other engine flags the event as a spray.
+     *
+     * @param smd the data collected about sent messages; updated in place
+     * @param rootid id of the root process the event was sent to
+     * @throws CorrelationServiceException CS_NON_BPEL if the event was
+     *         already sent and the engine is BPEL compliant
+     */
+    private void sentMessage(SentMessageData smd, String rootid)
+            throws CorrelationServiceException {
+
+        if ( smd.messageSent ) {
+            if ( isBPELCompliant() ) {
+                CorrelationServiceException cse = new CorrelationServiceException("CS_NON_BPEL",null);
+                cse.log(logger,Level.SEVERE);
+                throw cse;
+            }
+            // second target for the same event: this is a spray
+            smd.spray = true;
+        }
+
+        smd.rootIds.add(rootid);
+        smd.messageSent = true;
+    }
+
+// private void lockOnKeys(IInternalEventDirector ed, String prefix, Collection keys) throws BPException {
+// Iterator it = keys.iterator();
+// while ( it.hasNext() ) {
+// String key = (String)it.next();
+// lock(ed, prefix+key);
+// }
+// }
+
+// private void cacheEvent(Collection keys, IRequestMessageEvent me) throws BPException {
+// // TODO implement caching
+// CorrelationServiceException cse = new CorrelationServiceException("ED_ED_NSR",new Object[] { me.getStaticKey().toString() });
+// cse.log(logger,Level.SEVERE);
+// throw cse;
+// }
+
+ /**
+ * Lock on the root process instance id of a registration, using lock().
+ * @param ed event director whose locking service performs the lock
+ * @param reg the registration whose root process id is locked
+ * @throws BPException if obtaining the lock fails
+ */
+ private void lockRootProcessInstance(IInternalEventDirector ed, Registration reg) throws BPException {
+ lock(ed, reg.getRootProcId());
+ }
+
+    /**
+     * Bookkeeping for one correlated event: where it was sent and whether it
+     * went to more than one process. Declared static because it uses nothing
+     * of the enclosing CorrelationService instance.
+     */
+    static class SentMessageData {
+
+        // spray flag; if this event is routed to more than one process it is a spray
+        boolean spray = false;
+        // the root ids to update at the end, needed for the spray case
+        ArrayList rootIds = new ArrayList();
+        // flag to see if we sent this message anywhere
+        boolean messageSent = false;
+
+    }
+
+}
Added: incubator/ode/scratch/bpe/src/main/java/com/sybase/bpe/correlation/CorrelationServiceException.java
URL: http://svn.apache.org/viewcvs/incubator/ode/scratch/bpe/src/main/java/com/sybase/bpe/correlation/CorrelationServiceException.java?rev=381686&view=auto
==============================================================================
--- incubator/ode/scratch/bpe/src/main/java/com/sybase/bpe/correlation/CorrelationServiceException.java (added)
+++ incubator/ode/scratch/bpe/src/main/java/com/sybase/bpe/correlation/CorrelationServiceException.java Tue Feb 28 08:02:48 2006
@@ -0,0 +1,54 @@
+/*
+* Confidential property of Sybase, Inc.
+*
+* Copyright 1987 - 2006.
+*
+* Sybase, Inc. All rights reserved.
+*
+* Unpublished rights reserved under U.S. copyright laws.
+*
+* This software contains confidential and trade secret information
+* of Sybase, Inc. Use, duplication or disclosure of the software and
+* documentation by the U.S. Government is subject to restrictions
+* set forth in a license agreement between the Government and Sybase,
+* Inc. or other written agreement specifying the Government's rights
+* to use the software and any applicable FAR provisions, for example,
+* FAR 52.227-19.
+*
+* Sybase, Inc. One Sybase Drive, Dublin, CA 94568, USA
+*
+* http://www.sybase.com
+*/
+package com.sybase.bpe.correlation;
+
+import com.sybase.bpe.util.BPException;
+
+/**
+ * Exception class for the CorrelationService. Both constructors simply
+ * delegate to BPException.
+ */
+public class CorrelationServiceException extends BPException {
+
+ // serialization version of this exception class
+ static final long serialVersionUID = -6546885597497431542L;
+
+
+ /**
+ * Create an exception without a cause.
+ * @param message_id identifier of the message; callers in this package
+ * pass ids such as "CS_NO_SPRAY" or "ED_ED_NSR"
+ * @param msgParams parameters for the message, handed to BPException;
+ * some callers pass null
+ */
+ public CorrelationServiceException(String message_id, Object[] msgParams) {
+ super(message_id, msgParams);
+ }
+
+ /**
+ * Create an exception that wraps another throwable.
+ * @param message_id identifier of the message
+ * @param msgParams parameters for the message, handed to BPException
+ * @param cause the throwable that caused this exception
+ */
+ public CorrelationServiceException(
+ String message_id,
+ Object[] msgParams,
+ Throwable cause) {
+ super(message_id, msgParams, cause);
+ }
+
+}
Added: incubator/ode/scratch/bpe/src/main/java/com/sybase/bpe/correlation/CorrelationServiceFactory.java
URL: http://svn.apache.org/viewcvs/incubator/ode/scratch/bpe/src/main/java/com/sybase/bpe/correlation/CorrelationServiceFactory.java?rev=381686&view=auto
==============================================================================
--- incubator/ode/scratch/bpe/src/main/java/com/sybase/bpe/correlation/CorrelationServiceFactory.java (added)
+++ incubator/ode/scratch/bpe/src/main/java/com/sybase/bpe/correlation/CorrelationServiceFactory.java Tue Feb 28 08:02:48 2006
@@ -0,0 +1,70 @@
+/*
+* Confidential property of Sybase, Inc.
+*
+* Copyright 1987 - 2006.
+*
+* Sybase, Inc. All rights reserved.
+*
+* Unpublished rights reserved under U.S. copyright laws.
+*
+* This software contains confidential and trade secret information
+* of Sybase, Inc. Use, duplication or disclosure of the software and
+* documentation by the U.S. Government is subject to restrictions
+* set forth in a license agreement between the Government and Sybase,
+* Inc. or other written agreement specifying the Government's rights
+* to use the software and any applicable FAR provisions, for example,
+* FAR 52.227-19.
+*
+* Sybase, Inc. One Sybase Drive, Dublin, CA 94568, USA
+*
+* http://www.sybase.com
+*/
+package com.sybase.bpe.correlation;
+
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import com.sybase.bpe.engine.ProcessService;
+import com.sybase.bpe.util.BPEProperties;
+import com.sybase.bpe.util.BPException;
+
+/**
+ * Factory that creates the CorrelationService implementation named in the
+ * engine properties.
+ */
+public class CorrelationServiceFactory
+{
+
+    private static Logger logger =
+        Logger.getLogger(CorrelationServiceFactory.class.getName());
+
+    /**
+     * Load, instantiate and initialize the configured CorrelationService.
+     *
+     * @param props the engine properties; they name the implementation
+     *        class and are handed to its init method
+     * @param ps the process service handed to the init method
+     * @return the initialized correlation service
+     * @throws BPException CLASS_NOT_FOUND if the class cannot be loaded,
+     *         NATIVE_EXCEPTION if it cannot be instantiated or is not a
+     *         CorrelationService, or whatever init throws
+     */
+    public static CorrelationService createCorrelationService(BPEProperties props,
+            ProcessService ps)
+            throws BPException {
+
+        String className = props.getCorrelationServiceClass();
+        CorrelationService cs = null;
+
+        try {
+            // load the implementation
+            Class instClass = java.lang.Class.forName(className);
+            // try to instantiate the subclass
+            cs = (CorrelationService) instClass.newInstance();
+
+        } catch (ClassNotFoundException e) {
+            // keep the original exception as the cause for diagnosis
+            CorrelationServiceException bpx = new CorrelationServiceException("CLASS_NOT_FOUND",new Object[] {className},e);
+            bpx.log(logger,Level.SEVERE);
+            throw bpx;
+        } catch (InstantiationException e) {
+            CorrelationServiceException bpx = new CorrelationServiceException("NATIVE_EXCEPTION",new Object[] {"InstantiationException"},e);
+            bpx.log(logger,Level.SEVERE);
+            throw bpx;
+        } catch (IllegalAccessException e) {
+            CorrelationServiceException bpx = new CorrelationServiceException("NATIVE_EXCEPTION",new Object[] {"IllegalAccessException"},e);
+            bpx.log(logger,Level.SEVERE);
+            throw bpx;
+        } catch (ClassCastException e) {
+            // the configured class is not a CorrelationService
+            CorrelationServiceException bpx = new CorrelationServiceException("NATIVE_EXCEPTION",new Object[] {"ClassCastException"},e);
+            bpx.log(logger,Level.SEVERE);
+            throw bpx;
+        }
+
+        // initialize outside the try block so failures of init are reported
+        // as thrown and never mistaken for an instantiation problem
+        cs.init(props,ps);
+
+        return cs;
+    }
+
+}
Added: incubator/ode/scratch/bpe/src/main/java/com/sybase/bpe/correlation/PartQuery.java
URL: http://svn.apache.org/viewcvs/incubator/ode/scratch/bpe/src/main/java/com/sybase/bpe/correlation/PartQuery.java?rev=381686&view=auto
==============================================================================
--- incubator/ode/scratch/bpe/src/main/java/com/sybase/bpe/correlation/PartQuery.java (added)
+++ incubator/ode/scratch/bpe/src/main/java/com/sybase/bpe/correlation/PartQuery.java Tue Feb 28 08:02:48 2006
@@ -0,0 +1,121 @@
+/*
+* Confidential property of Sybase, Inc.
+*
+* Copyright 1987 - 2006.
+*
+* Sybase, Inc. All rights reserved.
+*
+* Unpublished rights reserved under U.S. copyright laws.
+*
+* This software contains confidential and trade secret information
+* of Sybase, Inc. Use, duplication or disclosure of the software and
+* documentation by the U.S. Government is subject to restrictions
+* set forth in a license agreement between the Government and Sybase,
+* Inc. or other written agreement specifying the Government's rights
+* to use the software and any applicable FAR provisions, for example,
+* FAR 52.227-19.
+*
+* Sybase, Inc. One Sybase Drive, Dublin, CA 94568, USA
+*
+* http://www.sybase.com
+*/
+package com.sybase.bpe.correlation;
+
+import java.io.Serializable;
+import java.util.HashMap;
+
+import com.sybase.bpe.interaction.IInvocation;
+import com.sybase.bpe.interaction.InteractionException;
+import com.sybase.bpe.interaction.InvocationFactory;
+
+/**
+ * A PartQuery is just an object that holds a part name and a query object for that part.
+ * StaticRegistrations and DynamicRegistrations build an array of these to represent
+ * metadata that is used to generate a correlation key value.
+ */
+public class PartQuery implements Serializable {
+
+    static final long serialVersionUID = -2542067136142474418L;
+
+    // Fully qualified so that the file's import list does not have to change.
+    // Static, so it is not part of the serialized form.
+    private static final java.util.logging.Logger logger =
+        java.util.logging.Logger.getLogger(PartQuery.class.getName());
+
+    private String part;
+    private Object obj;
+    private String locatorName;
+
+    /**
+     * Creates an empty PartQuery; all fields are left null until the
+     * setters are called.
+     */
+    public PartQuery(){}
+
+    /**
+     * Constructor for a PartQuery.
+     * <p>
+     * When <code>obj</code> is a String it is turned into an invocation via
+     * <code>InvocationFactory.createXPathQueryNodeValueInvocation</code> and
+     * that invocation is stored as the query. Any other object (including
+     * null) is stored as given. If the invocation cannot be created the
+     * failure is logged and the query is left null.
+     *
+     * @param part        The part name.
+     * @param obj         The query; a String or an already built query object.
+     * @param locatorName The locator name.
+     * @param nsMap       Map passed to the invocation factory together with a
+     *                    String query (presumably namespace bindings - confirm
+     *                    against InvocationFactory).
+     */
+    public PartQuery(String part, Object obj, String locatorName, HashMap nsMap) {
+        this.part = part;
+        this.locatorName = locatorName;
+
+        if ( obj instanceof String )
+        {
+            try
+            {
+                IInvocation invocation =
+                    InvocationFactory.newInstance().
+                        createXPathQueryNodeValueInvocation( ( String ) obj, nsMap);
+                this.obj = invocation;
+            } catch (InteractionException e)
+            {
+                // The constructor signature has no checked exceptions, so the
+                // failure is reported through the logger (like the rest of the
+                // package) instead of printStackTrace(); the query stays null.
+                logger.log(java.util.logging.Level.SEVERE,
+                    "Unable to create query invocation for part '" + part
+                        + "' from query '" + obj + "'", e);
+            }
+        }
+        else
+        {
+            this.obj = obj;
+        }
+    }
+
+    /**
+     * Set the part name.
+     * @param part The part name.
+     */
+    public void setPartName(String part) {
+        this.part = part;
+    }
+
+    /**
+     * Get the part name.
+     * @return The part name.
+     */
+    public String getPartName() {
+        return this.part;
+    }
+
+    /**
+     * Set the query object. Unlike the constructor, the object is stored
+     * as given, without any conversion.
+     * @param obj The query.
+     */
+    public void setQuery(Object obj) {
+        this.obj = obj;
+    }
+
+    /**
+     * Get the query object.
+     * @return The query object.
+     */
+    public Object getQuery() {
+        return this.obj;
+    }
+
+    /**
+     * Get the locator name.
+     * @return The locator name.
+     */
+    public String getLocatorName() {
+        return locatorName;
+    }
+
+    /**
+     * Set the locator name.
+     * @param string The locator name.
+     */
+    public void setLocatorName(String string) {
+        locatorName = string;
+    }
+
+
+}
Added: incubator/ode/scratch/bpe/src/main/java/com/sybase/bpe/correlation/Registration.java
URL: http://svn.apache.org/viewcvs/incubator/ode/scratch/bpe/src/main/java/com/sybase/bpe/correlation/Registration.java?rev=381686&view=auto
==============================================================================
--- incubator/ode/scratch/bpe/src/main/java/com/sybase/bpe/correlation/Registration.java (added)
+++ incubator/ode/scratch/bpe/src/main/java/com/sybase/bpe/correlation/Registration.java Tue Feb 28 08:02:48 2006
@@ -0,0 +1,210 @@
+/*
+* Confidential property of Sybase, Inc.
+*
+* Copyright 1987 - 2006.
+*
+* Sybase, Inc. All rights reserved.
+*
+* Unpublished rights reserved under U.S. copyright laws.
+*
+* This software contains confidential and trade secret information
+* of Sybase, Inc. Use, duplication or disclosure of the software and
+* documentation by the U.S. Government is subject to restrictions
+* set forth in a license agreement between the Government and Sybase,
+* Inc. or other written agreement specifying the Government's rights
+* to use the software and any applicable FAR provisions, for example,
+* FAR 52.227-19.
+*
+* Sybase, Inc. One Sybase Drive, Dublin, CA 94568, USA
+*
+* http://www.sybase.com
+*/
+/*
+ * Created on Aug 12, 2003
+ *
+ */
+package com.sybase.bpe.correlation;
+
+import java.io.Serializable;
+
+/**
+ * Serializable value object holding the seven string fields of a
+ * correlation registration: <code>staticKeyValue</code>,
+ * <code>operationId</code>, <code>rootDefId</code>, <code>defId</code>,
+ * <code>rootProcId</code>, <code>procId</code> and <code>keyValue</code>.
+ * Two registrations are equal when all seven fields are equal.
+ *
+ * @author charper
+ *
+ */
+public class Registration implements Serializable {
+
+    static final long serialVersionUID = 2061953613411330775L;
+
+    private String staticKeyValue;
+    private String rootDefId;
+    private String defId;
+    private String rootProcId;
+    private String procId;
+    private String keyValue;
+    private String operationId;
+
+    /**
+     * Creates a registration from its seven field values. Note that
+     * <code>operationId</code> is the second argument although it is the
+     * last field declared.
+     *
+     * @param staticKeyValue value for the staticKeyValue field
+     * @param operationId    value for the operationId field
+     * @param rootDefId      value for the rootDefId field
+     * @param defId          value for the defId field
+     * @param rootProcId     value for the rootProcId field
+     * @param procId         value for the procId field
+     * @param keyValue       value for the keyValue field
+     */
+    public Registration(String staticKeyValue, String operationId, String rootDefId, String defId,
+        String rootProcId, String procId, String keyValue) {
+        this.staticKeyValue = staticKeyValue;
+        this.rootDefId = rootDefId;
+        this.defId = defId;
+        this.rootProcId = rootProcId;
+        this.procId = procId;
+        this.keyValue = keyValue;
+        this.operationId = operationId;
+    }
+
+    /**
+     * @return the defId field
+     */
+    public String getDefId() {
+        return defId;
+    }
+
+    /**
+     * @return the keyValue field
+     */
+    public String getKeyValue() {
+        return keyValue;
+    }
+
+    /**
+     * @return the procId field
+     */
+    public String getProcId() {
+        return procId;
+    }
+
+    /**
+     * @return the rootDefId field
+     */
+    public String getRootDefId() {
+        return rootDefId;
+    }
+
+    /**
+     * @return the rootProcId field
+     */
+    public String getRootProcId() {
+        return rootProcId;
+    }
+
+    /**
+     * @return the staticKeyValue field
+     */
+    public String getStaticKeyValue() {
+        return staticKeyValue;
+    }
+
+    /**
+     * @param string new value of the defId field
+     */
+    public void setDefId(String string) {
+        defId = string;
+    }
+
+    /**
+     * @param string new value of the keyValue field
+     */
+    public void setKeyValue(String string) {
+        keyValue = string;
+    }
+
+    /**
+     * @param string new value of the procId field
+     */
+    public void setProcId(String string) {
+        procId = string;
+    }
+
+    /**
+     * @param string new value of the rootDefId field
+     */
+    public void setRootDefId(String string) {
+        rootDefId = string;
+    }
+
+    /**
+     * @param string new value of the rootProcId field
+     */
+    public void setRootProcId(String string) {
+        rootProcId = string;
+    }
+
+    /**
+     * @param key new value of the staticKeyValue field
+     */
+    public void setStaticKeyValue(String key) {
+        staticKeyValue = key;
+    }
+
+    /**
+     * @return the operationId field
+     */
+    public String getOperationId() {
+        return operationId;
+    }
+
+    /**
+     * @param string new value of the operationId field
+     */
+    public void setOperationId(String string) {
+        operationId = string;
+    }
+
+    /* (non-Javadoc)
+     * @see java.lang.Object#toString()
+     */
+    public String toString() {
+        return "Registration(staticKeyValue:"+staticKeyValue+
+            " rootDefId:"+rootDefId+
+            " defId:"+defId+
+            " rootProcId:"+rootProcId+
+            " procId:"+procId+
+            " keyValue:"+keyValue+
+            " operationId:"+operationId+")";
+    }
+
+    /**
+     * Compares all seven fields; a null field only matches a null field.
+     * <p>
+     * The previous implementation chained the per-field conditional
+     * expressions with <code>&amp;&amp;</code> without parenthesising them.
+     * Because <code>?:</code> binds more loosely than <code>&amp;&amp;</code>,
+     * most fields were never compared (for fully populated registrations
+     * only operationId decided the result).
+     *
+     * @see java.lang.Object#equals(java.lang.Object)
+     */
+    public boolean equals(Object obj) {
+        if ( this == obj ) {
+            return true;
+        }
+        if ( !( obj instanceof Registration ) ) {
+            return false;
+        }
+        Registration reg = (Registration)obj;
+        return same(staticKeyValue, reg.getStaticKeyValue())
+            && same(rootDefId, reg.getRootDefId())
+            && same(defId, reg.getDefId())
+            && same(rootProcId, reg.getRootProcId())
+            && same(procId, reg.getProcId())
+            && same(keyValue, reg.getKeyValue())
+            && same(operationId, reg.getOperationId());
+    }
+
+    /**
+     * Hash code over the same seven fields used by
+     * {@link #equals(Object)}, so that equal registrations can be found in
+     * hash based collections.
+     *
+     * @see java.lang.Object#hashCode()
+     */
+    public int hashCode() {
+        int result = 17;
+        result = 31 * result + hash(staticKeyValue);
+        result = 31 * result + hash(rootDefId);
+        result = 31 * result + hash(defId);
+        result = 31 * result + hash(rootProcId);
+        result = 31 * result + hash(procId);
+        result = 31 * result + hash(keyValue);
+        result = 31 * result + hash(operationId);
+        return result;
+    }
+
+    /** Null-safe string equality: two nulls are equal, null never equals non-null. */
+    private static boolean same(String a, String b) {
+        return a == null ? b == null : a.equals(b);
+    }
+
+    /** Null-safe hash: null hashes to 0. */
+    private static int hash(String s) {
+        return s == null ? 0 : s.hashCode();
+    }
+}
Added: incubator/ode/scratch/bpe/src/main/java/com/sybase/bpe/correlation/keys/CorrelationKeysUtil.java
URL: http://svn.apache.org/viewcvs/incubator/ode/scratch/bpe/src/main/java/com/sybase/bpe/correlation/keys/CorrelationKeysUtil.java?rev=381686&view=auto
==============================================================================
--- incubator/ode/scratch/bpe/src/main/java/com/sybase/bpe/correlation/keys/CorrelationKeysUtil.java (added)
+++ incubator/ode/scratch/bpe/src/main/java/com/sybase/bpe/correlation/keys/CorrelationKeysUtil.java Tue Feb 28 08:02:48 2006
@@ -0,0 +1,156 @@
+/*
+* Confidential property of Sybase, Inc.
+*
+* Copyright 1987 - 2006.
+*
+* Sybase, Inc. All rights reserved.
+*
+* Unpublished rights reserved under U.S. copyright laws.
+*
+* This software contains confidential and trade secret information
+* of Sybase, Inc. Use, duplication or disclosure of the software and
+* documentation by the U.S. Government is subject to restrictions
+* set forth in a license agreement between the Government and Sybase,
+* Inc. or other written agreement specifying the Government's rights
+* to use the software and any applicable FAR provisions, for example,
+* FAR 52.227-19.
+*
+* Sybase, Inc. One Sybase Drive, Dublin, CA 94568, USA
+*
+* http://www.sybase.com
+*/
+
+package com.sybase.bpe.correlation.keys;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import com.sybase.bpe.action.internal.ActionException;
+import com.sybase.bpe.context.resolver.ContextResolvedObject;
+import com.sybase.bpe.context.resolver.ContextResolver;
+import com.sybase.bpe.correlation.PartQuery;
+import com.sybase.bpe.definition.IPMDCorrelation;
+import com.sybase.bpe.definition.IPMDCorrelationSet;
+import com.sybase.bpe.definition.IPMDOperation;
+import com.sybase.bpe.interaction.IInteraction;
+import com.sybase.bpe.interaction.IInvocation;
+import com.sybase.bpe.interaction.InteractionException;
+import com.sybase.bpe.interaction.InteractionFactory;
+import com.sybase.bpe.util.BPException;
+
+public class CorrelationKeysUtil {
+
+    private static Logger logger =
+        Logger.getLogger(CorrelationKeysUtil.class.getName());
+
+    /**
+     * Reports whether at least one of the given operations has a
+     * correlation whose collection of correlation sets is not empty.
+     *
+     * @param operations collection of {@link IPMDOperation} objects
+     * @return <code>true</code> as soon as such an operation is found,
+     *         <code>false</code> otherwise
+     */
+    public static boolean hasCorrelationSets(Collection operations) {
+        Iterator opIter = operations.iterator();
+        while (opIter.hasNext()) {
+            IPMDOperation op = (IPMDOperation) opIter.next();
+            IPMDCorrelation correlation = op.getCorrelation();
+            if (correlation == null) {
+                continue;
+            }
+            if (!correlation.getCorrelationSets().isEmpty()) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+
+    /**
+     * Walks every instantiating correlation set of the given operations
+     * and fills its not yet set locators from the supplied message parts.
+     *
+     * @param resolver   resolves a part query's locator name to a context object
+     * @param parts      message parts keyed by part name
+     * @param operations collection of {@link IPMDOperation} objects
+     * @param pattern    when non-null, only correlation sets whose pattern
+     *                   equals this value are considered
+     * @return <code>true</code> if at least one unset locator was
+     *         encountered, whether or not a matching part was available
+     * @throws BPException if resolving a locator or extracting part data fails
+     */
+    public static boolean setCorrelationKeys(
+        ContextResolver resolver,
+        Map parts,
+        Collection operations,
+        String pattern)
+        throws BPException {
+
+        logger.fine("Trying to Initailize a correlation for set.");
+
+        boolean anyInitialized = false;
+
+        Iterator opIter = operations.iterator();
+        while (opIter.hasNext()) {
+            IPMDOperation op = (IPMDOperation) opIter.next();
+            IPMDCorrelation correlation = op.getCorrelation();
+            if (correlation == null) {
+                continue;
+            }
+
+            Iterator csIter = correlation.getCorrelationSets().iterator();
+            while (csIter.hasNext()) {
+                IPMDCorrelationSet cs = (IPMDCorrelationSet) csIter.next();
+
+                // only instantiating sets are candidates
+                if (!cs.isInstantiating()) {
+                    continue;
+                }
+                // a null pattern accepts every set
+                if (pattern != null && !cs.getPattern().equals(pattern)) {
+                    continue;
+                }
+                if (initializeSet(resolver, parts, cs)) {
+                    anyInitialized = true;
+                }
+            }
+        }
+        return anyInitialized;
+    }
+
+    /**
+     * Processes the part queries of one correlation set in order, stopping
+     * at the first locator that already has a value.
+     *
+     * @return <code>true</code> if at least one unset locator was seen
+     */
+    private static boolean initializeSet(
+        ContextResolver resolver,
+        Map parts,
+        IPMDCorrelationSet cs)
+        throws BPException {
+
+        boolean sawUnset = false;
+
+        Iterator pqIter = cs.getPartQueries().iterator();
+        while (pqIter.hasNext()) {
+            PartQuery pq = (PartQuery) pqIter.next();
+
+            ContextResolvedObject robj =
+                (ContextResolvedObject) resolver.resolveBPContext(pq.getLocatorName());
+
+            // the correlation has already been set: leave this set alone
+            if (robj.getValue() != null) {
+                break;
+            }
+
+            sawUnset = true;
+
+            // store a value only when the message has a matching part
+            Object part = parts.get(pq.getPartName());
+            if (part != null) {
+                robj.setObject(getPartData(part, pq));
+            }
+        }
+        return sawUnset;
+    }
+
+
+    /**
+     * Extracts the value of a part query from a message part.
+     *
+     * @param inputObj the part; a String (wrapped in a new XML interaction
+     *                 built from its bytes) or an {@link IInteraction}
+     * @param pq       part query whose query object is used as the invocation
+     * @return the result of invoking the query on the interaction, or the
+     *         interaction's <code>toString()</code> when the query is null
+     * @throws InteractionException if the interaction cannot be created or invoked
+     * @throws ActionException if the part is neither a String nor an IInteraction
+     */
+    private static Object getPartData(Object inputObj, PartQuery pq)
+        throws InteractionException, ActionException {
+
+        IInteraction interaction;
+
+        if (inputObj instanceof String) {
+            byte[] xml = ((String) inputObj).getBytes();
+            interaction = InteractionFactory.newInstance().createXMLInteraction(xml);
+        } else if (inputObj instanceof IInteraction) {
+            interaction = (IInteraction) inputObj;
+        } else {
+            ActionException bpx = new ActionException("UNKNOWN_DATATYPE", null);
+            bpx.log(logger,Level.SEVERE);
+            throw bpx;
+        }
+
+        IInvocation invocation = (IInvocation) pq.getQuery();
+        if (invocation == null) {
+            // no query: fall back to the string form of the whole part
+            return interaction.toString();
+        }
+        return interaction.invoke(invocation);
+    }
+
+
+}
Added: incubator/ode/scratch/bpe/src/main/java/com/sybase/bpe/correlation/managed/CorrelationServiceEjbImpl.java
URL: http://svn.apache.org/viewcvs/incubator/ode/scratch/bpe/src/main/java/com/sybase/bpe/correlation/managed/CorrelationServiceEjbImpl.java?rev=381686&view=auto
==============================================================================
--- incubator/ode/scratch/bpe/src/main/java/com/sybase/bpe/correlation/managed/CorrelationServiceEjbImpl.java (added)
+++ incubator/ode/scratch/bpe/src/main/java/com/sybase/bpe/correlation/managed/CorrelationServiceEjbImpl.java Tue Feb 28 08:02:48 2006
@@ -0,0 +1,483 @@
+/*
+* Confidential property of Sybase, Inc.
+*
+* Copyright 1987 - 2006.
+*
+* Sybase, Inc. All rights reserved.
+*
+* Unpublished rights reserved under U.S. copyright laws.
+*
+* This software contains confidential and trade secret information
+* of Sybase, Inc. Use, duplication or disclosure of the software and
+* documentation by the U.S. Government is subject to restrictions
+* set forth in a license agreement between the Government and Sybase,
+* Inc. or other written agreement specifying the Government's rights
+* to use the software and any applicable FAR provisions, for example,
+* FAR 52.227-19.
+*
+* Sybase, Inc. One Sybase Drive, Dublin, CA 94568, USA
+*
+* http://www.sybase.com
+*/
+/*
+ * Created on Aug 25, 2003
+ *
+ */
+package com.sybase.bpe.correlation.managed;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import javax.ejb.CreateException;
+import javax.ejb.EJBException;
+import javax.ejb.FinderException;
+import javax.ejb.ObjectNotFoundException;
+import javax.ejb.RemoveException;
+import javax.naming.InitialContext;
+import javax.naming.NamingException;
+
+import com.sybase.bpe.correlation.CorrelationService;
+import com.sybase.bpe.correlation.CorrelationServiceException;
+import com.sybase.bpe.correlation.Registration;
+import com.sybase.bpe.correlation.managed.RegistrationEntityLocal;
+import com.sybase.bpe.correlation.managed.RegistrationEntityLocalHome;
+import com.sybase.bpe.correlation.unmanaged.CorrelationServiceSLImpl;
+import com.sybase.bpe.engine.ProcessService;
+import com.sybase.bpe.event.IStaticKey;
+import com.sybase.bpe.instance.IPMIProcess;
+import com.sybase.bpe.util.BPEProperties;
+import com.sybase.bpe.util.BPException;
+
+/**
+ * Correlation service that stores registrations through the
+ * registration entity bean home looked up under {@link #JNDI_RELH_NAME}.
+ *
+ * @author charper
+ *
+ * <bkl>
+ * I added an in-memory caching layer to this service so that
+ * cmp registration operations could be bundled up at the
+ * end of the transaction rather than interspersed throughout.
+ * This bundling reduces the deadlock hazard.
+ * </bkl>
+ *
+ */
+public class CorrelationServiceEjbImpl extends CorrelationService
+{
+
+    private static Logger logger =
+        Logger.getLogger(CorrelationServiceEjbImpl.class.getName());
+
+    // Queued create/remove operations in call order; replayed against the
+    // entity beans by persistRegistrationChanges().
+    private LinkedList registrationOperations = new LinkedList();
+    // Registrations passed to removeRegistration() since the last clearState();
+    // used to hide them from lookups until the removal is replayed.
+    private LinkedList registrationsForRemoval = new LinkedList();
+    private ProcessService ps;
+    private boolean isBPELCompliant;
+    private RegistrationEntityLocalHome relh;
+    public static final String JNDI_RELH_NAME = "java:comp/env/registrationBean";
+    // In-memory service that receives the same create/remove calls, so that
+    // lookups can see registrations that have not been persisted yet.
+    private CorrelationServiceSLImpl transientCS;
+
+    /**
+     * Creates the service with an empty in-memory layer. The entity bean
+     * home is not looked up until {@link #init} is called.
+     */
+    public CorrelationServiceEjbImpl()
+    {
+        transientCS = new CorrelationServiceSLImpl();
+    }
+
+    /**
+     * Queues the creation of a registration and records it in the
+     * in-memory layer. Nothing is written to the entity beans until
+     * {@link #persistRegistrationChanges()} runs.
+     *
+     * @see com.sybase.bpe.correlation.CorrelationService#createRegistration(com.sybase.bpe.correlation.Registration)
+     */
+    public void createRegistration(Registration registration)
+        throws CorrelationServiceException
+    {
+        CreateRegistration cr = new CreateRegistration();
+        cr.reg = registration;
+        registrationOperations.add(cr);
+        transientCS.createRegistration(registration);
+    }
+
+    /**
+     * Creates the entity bean for a registration.
+     *
+     * @throws CorrelationServiceException wrapping a CreateException
+     */
+    private void createRegistrationEntity(Registration registration)
+        throws CorrelationServiceException {
+
+        try {
+            relh.create(
+                registration.getStaticKeyValue(),
+                registration.getRootDefId(),
+                registration.getDefId(),
+                registration.getRootProcId(),
+                registration.getProcId(),
+                registration.getKeyValue(),
+                registration.getOperationId());
+        } catch (CreateException e) {
+            throw nativeException("CreateException", e);
+        }
+
+    }
+
+    /**
+     * Finds the registrations for a static key and a collection of key
+     * values. Persisted registrations are collected for every key value
+     * (without duplicates), registrations queued for removal are filtered
+     * out, and the registrations held by the in-memory layer are appended.
+     * <p>
+     * Filtering and the in-memory merge happen once, after all key values
+     * have been processed. They used to run inside the loop, which
+     * appended the in-memory registrations once per key value.
+     *
+     * @param key       static key; its <code>toString()</code> is used for the finders
+     * @param keyValues collection of String key values; a null element
+     *                  selects every registration with the static key
+     * @return the matching registrations
+     * @throws CorrelationServiceException wrapping a FinderException
+     * @see com.sybase.bpe.correlation.CorrelationService#getRegistrations(com.sybase.bpe.event.IStaticKey, java.util.Collection)
+     */
+    public Collection getRegistrations(IStaticKey key, Collection keyValues)
+        throws CorrelationServiceException {
+
+        // A list with contains() relies on Registration.equals() only,
+        // exactly like the explicit duplicate scan it replaces.
+        Collection regs = new ArrayList();
+
+        Iterator it = keyValues.iterator();
+        while (it.hasNext()){
+            String keyValue = (String)it.next();
+            Collection beans;
+            try {
+                // if the key value is null look for all registration
+                // with the given static key, the correlation service
+                // will decide how to handle this if there are multiple
+                // registrations found
+                if ( keyValue == null ) {
+                    beans =
+                        relh.findByStaticKey(key.toString());
+                } else {
+                    beans =
+                        relh.findByStaticKeyandKeyValue(key.toString(), keyValue);
+                }
+            } catch (FinderException e) {
+                throw nativeException("FinderException", e);
+            }
+
+            Collection regObjs = makeRegistrations(key.toString(),beans);
+            for (Iterator iter = regObjs.iterator(); iter.hasNext();) {
+                Object element = iter.next();
+                if ( ! regs.contains(element) ) {
+                    regs.add(element);
+                }
+            }
+        }
+
+        regs = screenRegs(regs);
+
+        //Add the registrations which were created in the current transaction
+        //but have not been persisted yet.
+        regs.addAll(transientCS.getRegistrations(key, keyValues));
+        return regs;
+    }
+
+    /**
+     * Finds the registrations of a process: persisted ones that are not
+     * queued for removal, plus those held by the in-memory layer.
+     *
+     * @throws CorrelationServiceException wrapping a FinderException
+     * @see com.sybase.bpe.correlation.CorrelationService#getRegistrations(com.sybase.bpe.instance.service.IPMIProcess)
+     */
+    public Collection getRegistrations(IPMIProcess key)
+        throws CorrelationServiceException {
+        Collection beans;
+        try {
+            beans = relh.findByProcId(key.getKey());
+        } catch (FinderException e) {
+            throw nativeException("FinderException", e);
+        }
+        // NOTE(review): the process key is passed as the static key value of
+        // the Registration objects built here - confirm this is intended.
+        Collection regs = makeRegistrations(key.getKey(),beans);
+        regs = screenRegs(regs);
+
+        //Add the registrations which were created in the current transaction
+        //but have not been persisted yet.
+        regs.addAll( transientCS.getRegistrations(key));
+        return regs;
+    }
+
+
+    /**
+     * Finds the registrations for a static key: persisted ones that are
+     * not queued for removal, plus those held by the in-memory layer.
+     *
+     * @throws CorrelationServiceException wrapping a FinderException
+     * @see com.sybase.bpe.correlation.CorrelationService#getRegistrations(com.sybase.bpe.event.IStaticKey)
+     */
+    public Collection getRegistrations(IStaticKey key)
+        throws CorrelationServiceException {
+        Collection beans;
+        try {
+            beans = relh.findByStaticKey(key.toString());
+        } catch (FinderException e) {
+            throw nativeException("FinderException", e);
+        }
+        Collection regs = makeRegistrations(key.toString(), beans);
+        regs = screenRegs(regs);
+
+        //Add the registrations which were created in the current transaction
+        //but have not been persisted yet.
+        Collection transientRegs = transientCS.getRegistrations(key);
+        regs.addAll( transientRegs );
+        return regs;
+
+    }
+
+    // Called by the various getRegistrations methods.
+    // If the registrations have been removed but the removal
+    // has not been persisted yet, don't report the registration.
+    // Returns a new list; the argument is not modified.
+    private Collection screenRegs( Collection regs )
+    {
+        LinkedList retVal = new LinkedList();
+        Iterator iter = regs.iterator();
+        while( iter.hasNext() )
+        {
+            Registration candidate = ( Registration )( iter.next());
+            if ( !( registrationsForRemoval.contains( candidate) ) )
+            {
+                retVal.add(candidate);
+            }
+        }
+        return retVal;
+    }
+
+
+    /**
+     * Converts entity beans into Registration objects. The static key
+     * value of every Registration is the given <code>key</code>; all other
+     * fields are read from the bean.
+     */
+    private Collection makeRegistrations(String key, Collection beans){
+        ArrayList regs = new ArrayList ();
+        Iterator it = beans.iterator();
+        while ( it.hasNext() ){
+            RegistrationEntityLocal bean = (RegistrationEntityLocal)it.next();
+            regs.add(
+                new Registration(
+                    key,
+                    bean.getOperationId(),
+                    bean.getRootDefId(),
+                    bean.getDefId(),
+                    bean.getRootProcId(),
+                    bean.getProcId(),
+                    bean.getKeyValue()
+                    ));
+        }
+        return regs;
+    }
+
+    /**
+     * Does nothing in this implementation.
+     *
+     * @see com.sybase.bpe.correlation.CorrelationService#update()
+     */
+    public void update() throws BPException {
+    }
+
+    /**
+     * @return the process service passed to {@link #init}
+     * @see com.sybase.bpe.correlation.CorrelationService#getProcessService()
+     */
+    public ProcessService getProcessService() throws BPException {
+        return ps;
+    }
+
+    /**
+     * @return the flag set by {@link #init}
+     * @see com.sybase.bpe.correlation.CorrelationService#isBPELCompliant()
+     */
+    public boolean isBPELCompliant() {
+        return isBPELCompliant;
+    }
+
+    /**
+     * Stores the process service, reads the BPEL-compliant flag from the
+     * properties and looks up the registration entity home in JNDI.
+     * The flag is true only when the property equals "TRUE"; a missing
+     * (null) property is treated as false instead of failing with a
+     * NullPointerException.
+     *
+     * @throws CorrelationServiceException wrapping a NamingException
+     * @see com.sybase.bpe.correlation.CorrelationService#init(com.sybase.bpe.util.BPEProperties, com.sybase.bpe.engine.ProcessService)
+     */
+    public void init(BPEProperties props, ProcessService ps)
+        throws CorrelationServiceException {
+        this.ps = ps;
+        this.isBPELCompliant = "TRUE".equals(props.getBPELCompliant());
+
+        try {
+            InitialContext ic = new InitialContext();
+            relh = (RegistrationEntityLocalHome) ic.lookup(JNDI_RELH_NAME);
+        } catch (NamingException ne) {
+            throw nativeException("NamingException", ne);
+        }
+
+    }
+
+    /**
+     * Finds a single registration by static key, operation id and process
+     * id. The first persisted match is used unless it is queued for
+     * removal; if there is no usable persisted match the in-memory layer
+     * is consulted.
+     *
+     * @return the registration, or whatever the in-memory layer returns
+     *         when no usable persisted registration exists
+     * @throws CorrelationServiceException wrapping a FinderException
+     * @see com.sybase.bpe.correlation.CorrelationService#getRegistration(com.sybase.bpe.event.IStaticKey, java.lang.String, java.lang.String)
+     */
+    public Registration getRegistration(IStaticKey key, String operationId, String processId) throws CorrelationServiceException {
+        Collection beans;
+        try {
+            beans = relh.findByStaticKeyandOpIdandProcId(key.toString(),operationId,processId);
+        } catch (FinderException e) {
+            throw nativeException("FinderException", e);
+        }
+        Iterator it = makeRegistrations(key.toString(),beans).iterator();
+
+        Registration retVal = null;
+
+        // there should only be one of these
+        if ( it.hasNext() )
+        {
+            retVal = (Registration)it.next();
+
+            //The registration could already have been
+            //deleted but the deletion may not have been
+            //persisted yet. Consult the removal list to
+            //find out.
+            if ( registrationsForRemoval.contains(retVal))
+            {
+                retVal = null;
+            }
+        }
+
+        if ( retVal == null )
+        {
+            //There could be a registration which has not been persisted yet.
+            //Consult the transient correlation service to find out.
+            retVal = transientCS.getRegistration(key, operationId, processId);
+        }
+
+        return retVal;
+    }
+
+    /**
+     * Queues the removal of a registration, removes it from the in-memory
+     * layer and remembers it so that lookups stop reporting it. The entity
+     * bean is not touched until {@link #persistRegistrationChanges()} runs.
+     */
+    public void removeRegistration(Registration registration)
+        throws CorrelationServiceException
+    {
+        RemoveRegistration rr = new RemoveRegistration();
+        rr.reg = registration;
+        registrationOperations.add(rr);
+        transientCS.removeRegistration(registration);
+        registrationsForRemoval.add(registration);
+    }
+
+    /**
+     * Removes the entity bean of a registration, located by a primary key
+     * built from its operation id, process id, root definition id, root
+     * process id and static key value. A registration that does not exist
+     * is ignored; other finder/remove failures are logged as warnings and
+     * not rethrown.
+     *
+     * @see com.sybase.bpe.correlation.CorrelationService#removeRegistration(com.sybase.bpe.correlation.Registration)
+     */
+    protected void removeRegistrationEntity(Registration registration)
+        throws CorrelationServiceException {
+        try {
+            RegistrationEntityPK pk = new RegistrationEntityPK();
+            pk.setOperationId(registration.getOperationId());
+            pk.setProcId(registration.getProcId());
+            pk.setRootDefId(registration.getRootDefId());
+            pk.setRootProcId(registration.getRootProcId());
+            pk.setStaticKeyValue(registration.getStaticKeyValue());
+            RegistrationEntityLocal bean = relh.findByPrimaryKey(pk);
+            bean.remove();
+        } catch ( ObjectNotFoundException e ) {
+            // ignore requests to remove registrations that are not there
+        } catch (FinderException e) {
+            logger.log(Level.WARNING,"",e);
+        } catch (EJBException e) {
+            logger.log(Level.WARNING,"",e);
+        } catch (RemoveException e) {
+            logger.log(Level.WARNING,"",e);
+        }
+    }
+
+    // Delete old regs and insert new regs. This method
+    // was added so that the removal of the registrations from the
+    // database could be sequenced at the end of the transaction
+    // rather than interspersed throughout the transaction causing
+    // a deadlock hazard.
+    // The queued operations are replayed in the order they were requested.
+    // If an entity operation throws, the exception propagates and the
+    // queued state is left as it is (clearState() is not reached).
+    public void persistRegistrationChanges() throws CorrelationServiceException
+    {
+        Iterator iter = registrationOperations.iterator();
+        while( iter.hasNext() )
+        {
+            Object obj = iter.next();
+            if ( obj instanceof RemoveRegistration )
+            {
+                removeRegistrationEntity( ((RemoveRegistration) obj).reg );
+            }
+            else if ( obj instanceof CreateRegistration )
+            {
+                createRegistrationEntity( ((CreateRegistration) obj).reg );
+            }
+        }
+        clearState();
+    }
+
+    /**
+     * Discards all queued operations, the removal list and the in-memory
+     * layer (which is replaced by a fresh instance).
+     */
+    public void clearState()
+    {
+        registrationsForRemoval.clear();
+        registrationOperations.clear();
+        transientCS = new CorrelationServiceSLImpl();
+    }
+
+    /**
+     * Builds a NATIVE_EXCEPTION CorrelationServiceException for the named
+     * exception type, logs it at SEVERE level and returns it so that the
+     * caller can throw it.
+     */
+    private static CorrelationServiceException nativeException(String name, Throwable cause)
+    {
+        CorrelationServiceException bpx =
+            new CorrelationServiceException("NATIVE_EXCEPTION",new Object[] {name},cause);
+        bpx.log(logger,Level.SEVERE);
+        return bpx;
+    }
+
+    // Queued "remove this registration" operation. Static: it needs no
+    // reference to the enclosing service instance.
+    private static class RemoveRegistration
+    {
+        public Registration reg;
+    }
+
+    // Queued "create this registration" operation. Static: it needs no
+    // reference to the enclosing service instance.
+    private static class CreateRegistration
+    {
+        public Registration reg;
+    }
+}
Added: incubator/ode/scratch/bpe/src/main/java/com/sybase/bpe/correlation/managed/CorrelationServiceEjbImpl.java.keep
URL: http://svn.apache.org/viewcvs/incubator/ode/scratch/bpe/src/main/java/com/sybase/bpe/correlation/managed/CorrelationServiceEjbImpl.java.keep?rev=381686&view=auto
==============================================================================
--- incubator/ode/scratch/bpe/src/main/java/com/sybase/bpe/correlation/managed/CorrelationServiceEjbImpl.java.keep (added)
+++ incubator/ode/scratch/bpe/src/main/java/com/sybase/bpe/correlation/managed/CorrelationServiceEjbImpl.java.keep Tue Feb 28 08:02:48 2006
@@ -0,0 +1,644 @@
+/*
+ * Created on Aug 25, 2003
+ *
+ */
+package com.sybase.bpe.correlation.ejbimpl;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import javax.ejb.CreateException;
+import javax.ejb.EJBException;
+import javax.ejb.FinderException;
+import javax.ejb.ObjectNotFoundException;
+import javax.ejb.RemoveException;
+import javax.naming.InitialContext;
+import javax.naming.NamingException;
+
+import com.sybase.bpe.context.IContextService;
+import com.sybase.bpe.context.base.IDataObjectFactory;
+import com.sybase.bpe.context.ejb.EJBDataObjectFactory;
+import com.sybase.bpe.context.persistent.PersistentContextService;
+import com.sybase.bpe.correlation.CorrelationService;
+import com.sybase.bpe.correlation.CorrelationServiceException;
+import com.sybase.bpe.correlation.CorrelationServiceSLImpl;
+import com.sybase.bpe.correlation.Registration;
+import com.sybase.bpe.engine.ProcessService;
+import com.sybase.bpe.event.IStaticKey;
+import com.sybase.bpe.instance.service.IPMIProcess;
+import com.sybase.bpe.util.BPEProperties;
+import com.sybase.bpe.util.BPException;
+
+/**
+ * @author charper
+ *
+ * <bkl>
+ * I added an in-memory caching layer to this service so that
+ * cmp registration operations could be bundled up at the
+ * end of the transaction rather than interspersed throughout.
+ * This bundling reduces the deadlock hazard.
+ * </bkl>
+ * <charper>
+ * Deadlocks occur when doing a BP-to-BP invoke in the same thread/tx, so
+ * the in-memory caching layer is now on a per-thread basis. This
+ * allows the correlation service to bundle registration operations
+ * per thread/tx.
+ * </charper>
+ *
+ */
+@SuppressWarnings("unchecked")
+public class CorrelationServiceEjbImpl extends CorrelationService
+{
+
+
+ // Logger named after this class.
+ private static Logger logger =
+ Logger.getLogger(CorrelationServiceEjbImpl.class.getName());
+ 
+ //private LinkedList registrationOperations = new LinkedList();
+ //private LinkedList registrationsForRemoval = new LinkedList();
+ // Pending create/remove operations, one list per thread; filled by
+ // addRegistrationOperation and replayed by persistRegistrationChanges.
+ private static Map<Thread,LinkedList<RegistrationType>> registrationOperationsByThread =
+ Collections.synchronizedMap(new HashMap<Thread,LinkedList<RegistrationType>>());
+ // Registrations whose removal has been requested, one list per thread;
+ // consulted by screenRegs and getRegistration to hide them from lookups.
+ private static Map<Thread,LinkedList<Registration>> registrationsForRemovalByThread =
+ Collections.synchronizedMap(new HashMap<Thread,LinkedList<Registration>>());
+ // Process service handed to init and returned by getProcessService.
+ private ProcessService ps;
+ // Set to true by init when the BPELCompliant property equals "TRUE".
+ private boolean isBPELCompliant;
+ // Local home of the registration entity bean, looked up in init.
+ private RegistrationEntityLocalHome relh;
+ // JNDI name used by init to look up the registration entity local home.
+ public static final String JNDI_RELH_NAME = "java:comp/env/registrationBean";
+ // In-memory CorrelationServiceSLImpl per thread, created on demand by
+ // getCachedTransientCS.
+ private static Map<Thread,CorrelationServiceSLImpl> transientCSByThread =
+ Collections.synchronizedMap(new HashMap<Thread,CorrelationServiceSLImpl>());
+ // Per-thread counter maintained by incNestedCorrelation and
+ // decNestedCorrelation (keyed by Thread, despite the "ThreadId" name).
+ private static Map<Thread,Integer> nestedCorrelationSerivceInvokesByThreadId =
+ Collections.synchronizedMap(new HashMap<Thread,Integer>());
+
+ /**
+  * Appends an operation to the calling thread's list of pending
+  * registration operations, creating that list on first use.
+  *
+  * @param reg the operation to queue
+  */
+ private void addRegistrationOperation(RegistrationType reg) {
+     Thread current = Thread.currentThread();
+     LinkedList<RegistrationType> queue = registrationOperationsByThread.get(current);
+     if ( queue == null ) {
+         queue = new LinkedList<RegistrationType>();
+         registrationOperationsByThread.put(current, queue);
+     }
+     queue.add(reg);
+ }
+
+ /**
+  * Returns the calling thread's pending registration operations.
+  *
+  * @return the thread's own list, or a new empty list (not stored in the
+  *         map) when the thread has queued nothing
+  */
+ private Collection<RegistrationType> getRegistrationOperations() {
+     Collection<RegistrationType> queued =
+         registrationOperationsByThread.get(Thread.currentThread());
+     return ( queued != null ) ? queued : new LinkedList<RegistrationType>();
+ }
+
+ /**
+  * Records a registration as pending removal for the calling thread,
+  * creating the thread's removal list on first use.
+  *
+  * @param reg the registration whose removal has been requested
+  */
+ private void addRegistrationOperationForRemoval(Registration reg) {
+     Thread current = Thread.currentThread();
+     LinkedList<Registration> pending = registrationsForRemovalByThread.get(current);
+     if ( pending == null ) {
+         pending = new LinkedList<Registration>();
+         registrationsForRemovalByThread.put(current, pending);
+     }
+     pending.add(reg);
+ }
+
+ /**
+  * Returns the registrations pending removal for the calling thread.
+  *
+  * @return the thread's own list, or a new empty list (not stored in the
+  *         map) when the thread has requested no removals
+  */
+ private Collection<Registration> getRegistrationOperationsForRemoval() {
+     Collection<Registration> pending =
+         registrationsForRemovalByThread.get(Thread.currentThread());
+     return ( pending != null ) ? pending : new LinkedList<Registration>();
+ }
+
+ /**
+  * Reads the nesting counter stored for the calling thread.
+  *
+  * @return the stored value, or 0 when nothing is stored for this thread
+  */
+ private int getNestedCorrelationCount() {
+     Integer depth =
+         nestedCorrelationSerivceInvokesByThreadId.get(Thread.currentThread());
+     return ( depth == null ) ? 0 : depth.intValue();
+ }
+ /**
+  * Stores the nesting counter for the calling thread.
+  *
+  * @param val the value to record for the current thread
+  */
+ private void setNestedCorrelationCount(int val) {
+     // Integer.valueOf may reuse cached boxes instead of allocating a new
+     // Integer on every call.
+     nestedCorrelationSerivceInvokesByThreadId.
+         put(Thread.currentThread(), Integer.valueOf(val));
+ }
+ /**
+  * Removes the nesting counter entry of the calling thread, if any.
+  */
+ private void clearNestedCorreclationCount() {
+     Thread current = Thread.currentThread();
+     nestedCorrelationSerivceInvokesByThreadId.remove(current);
+ }
+ /**
+  * Increments the calling thread's nesting counter by one.
+  */
+ public void incNestedCorrelation() {
+     setNestedCorrelationCount(getNestedCorrelationCount() + 1);
+ }
+
+ /**
+  * Decrements the calling thread's nesting counter by one.
+  *
+  * @return the counter value after the decrement (no lower bound is
+  *         enforced)
+  */
+ public int decNestedCorrelation() {
+     int remaining = getNestedCorrelationCount() - 1;
+     setNestedCorrelationCount(remaining);
+     return remaining;
+ }
+ /**
+  * Returns the in-memory correlation service of the calling thread,
+  * creating and caching one through createCachedCS when none exists.
+  *
+  * @return the thread's CorrelationServiceSLImpl, never null
+  */
+ private CorrelationServiceSLImpl getCachedTransientCS() {
+     CorrelationServiceSLImpl cached = transientCSByThread.get(Thread.currentThread());
+     if ( cached != null ) {
+         return cached;
+     }
+     return createCachedCS();
+ }
+ /**
+  * Drops the in-memory correlation service cached for the calling thread.
+  */
+ private void clearCachedCS() {
+     Thread current = Thread.currentThread();
+     transientCSByThread.remove(current);
+ }
+
+ /**
+  * Creates a new in-memory correlation service and caches it for the
+  * calling thread, replacing any previous entry.
+  *
+  * @return the newly created service
+  */
+ private CorrelationServiceSLImpl createCachedCS() {
+     CorrelationServiceSLImpl fresh = new CorrelationServiceSLImpl();
+     Thread owner = Thread.currentThread();
+     transientCSByThread.put(owner, fresh);
+     return fresh;
+ }
+
+/*
+ private void trace( String message )
+ {
+ System.out.println("CSTrace - " + message + " iid=" + this.hashCode());
+ System.out.flush();
+ }
+
+ private void trace( String message, Object obj )
+ {
+ trace(message);
+
+ XStream xstream = new XStream();
+ String xmlString = xstream.toXML(obj);
+ System.out.println(xmlString);
+ System.out.flush();
+
+ }
+*/
+ // No-argument constructor. The body is empty; the fields ps,
+ // isBPELCompliant and relh are assigned later by init.
+ public CorrelationServiceEjbImpl()
+ {
+ //trace( "Inside CorrelationServiceEjbImpl()");
+ //transientCS = new CorrelationServiceSLImpl();
+ }
+
+ /**
+  * Queues the creation of a registration for the calling thread and
+  * mirrors it into the thread's in-memory correlation service. The entity
+  * itself is only created later, by persistRegistrationChanges.
+  *
+  * @param registration the registration to create
+  * @throws CorrelationServiceException if the in-memory service rejects it
+  * @see com.sybase.bpe.correlation.CorrelationService#createRegistration(com.sybase.bpe.correlation.Registration)
+  */
+ public void createRegistration(Registration registration)
+     throws CorrelationServiceException
+ {
+     CreateRegistration pendingCreate = new CreateRegistration();
+     pendingCreate.reg = registration;
+     addRegistrationOperation(pendingCreate);
+
+     CorrelationServiceSLImpl transientService = getCachedTransientCS();
+     transientService.createRegistration(registration);
+ }
+
+ /**
+  * Creates the registration entity through the entity bean's local home.
+  *
+  * @param registration source of the values passed to the home's create
+  * @throws CorrelationServiceException wrapping (and logging at SEVERE) a
+  *         CreateException raised by the home
+  */
+ private void createRegistrationEntity(Registration registration)
+     throws CorrelationServiceException {
+     try {
+         relh.create(
+             registration.getStaticKeyValue(),
+             registration.getRootDefId(),
+             registration.getDefId(),
+             registration.getRootProcId(),
+             registration.getProcId(),
+             registration.getKeyValue(),
+             registration.getOperationId());
+     } catch (CreateException createFailure) {
+         Object[] messageArgs = new Object[] {"CreateException"};
+         CorrelationServiceException wrapped =
+             new CorrelationServiceException("NATIVE_EXCEPTION", messageArgs, createFailure);
+         wrapped.log(logger, Level.SEVERE);
+         throw wrapped;
+     }
+ }
+
+ /**
+  * Collects the registrations matching a static key and a set of key
+  * values: persisted registrations found through the entity home (minus
+  * those pending removal on this thread) plus the registrations held by
+  * the thread's in-memory correlation service.
+  *
+  * @param key the static key; its toString() value is used for the finders
+  * @param keyValues key values (Strings); a null element selects every
+  *        registration with the given static key
+  * @return a HashSet of matching registrations; empty when keyValues is
+  *         empty
+  * @throws CorrelationServiceException wrapping (and logging at SEVERE) a
+  *         FinderException raised by the home
+  * @see com.sybase.bpe.correlation.CorrelationService#getRegistrations(com.sybase.bpe.event.IStaticKey, java.util.Collection)
+  */
+ public Collection getRegistrations(IStaticKey key, Collection keyValues)
+     throws CorrelationServiceException {
+     Collection regs = new HashSet();
+     if ( keyValues.isEmpty() ) {
+         // Nothing to look up; an empty input has always produced an empty result.
+         return regs;
+     }
+
+     String staticKey = key.toString();
+     for ( Iterator it = keyValues.iterator(); it.hasNext(); ) {
+         String keyValue = (String) it.next();
+         Collection beans;
+         try {
+             // if the key value is null look for all registration
+             // with the given static key, the correlation service
+             // will decide how to handle this if there are multiple
+             // registrations found
+             if ( keyValue == null ) {
+                 beans = relh.findByStaticKey(staticKey);
+             } else {
+                 beans = relh.findByStaticKeyandKeyValue(staticKey, keyValue);
+             }
+         } catch (FinderException e) {
+             CorrelationServiceException bpx = new CorrelationServiceException("NATIVE_EXCEPTION",new Object[] {"FinderException"},e);
+             bpx.log(logger,Level.SEVERE);
+             throw bpx;
+         }
+
+         // Add only registrations not already collected (equals-based check).
+         for ( Iterator found = makeRegistrations(staticKey, beans).iterator(); found.hasNext(); ) {
+             Object candidate = found.next();
+             boolean dup = false;
+             for ( Iterator known = regs.iterator(); known.hasNext(); ) {
+                 if ( candidate.equals(known.next()) ) {
+                     dup = true;
+                     break;
+                 }
+             }
+             if ( !dup ) {
+                 regs.add(candidate);
+             }
+         }
+     }
+
+     // Screening and the transient lookup do not depend on the individual
+     // key value (the transient service receives the complete collection),
+     // so they are done once here instead of once per key value.
+     regs = screenRegs(regs);
+
+     //Add the registrations which were created in the current transaction
+     //but have not been persisted yet.
+     regs.addAll(getCachedTransientCS().getRegistrations(key, keyValues));
+     return regs;
+ }
+
+ /**
+  * Collects the registrations of a process: persisted ones found by
+  * process id (minus those pending removal on this thread) plus the ones
+  * held by the thread's in-memory correlation service.
+  *
+  * @param key the process whose getKey() value is used for the lookup
+  * @return the matching registrations
+  * @throws CorrelationServiceException wrapping (and logging at SEVERE) a
+  *         FinderException raised by the home
+  * @see com.sybase.bpe.correlation.CorrelationService#getRegistrations(com.sybase.bpe.instance.service.IPMIProcess)
+  */
+ public Collection getRegistrations(IPMIProcess key)
+     throws CorrelationServiceException {
+     Collection persistedBeans;
+     try {
+         persistedBeans = relh.findByProcId(key.getKey());
+     } catch (FinderException finderFailure) {
+         CorrelationServiceException wrapped = new CorrelationServiceException(
+             "NATIVE_EXCEPTION", new Object[] {"FinderException"}, finderFailure);
+         wrapped.log(logger, Level.SEVERE);
+         throw wrapped;
+     }
+
+     // Hide registrations whose removal is still pending on this thread.
+     Collection visible = screenRegs(makeRegistrations(key.getKey(), persistedBeans));
+
+     // Include registrations created in the current transaction that
+     // have not been persisted yet.
+     Collection unpersisted = getCachedTransientCS().getRegistrations(key);
+     visible.addAll(unpersisted);
+     return visible;
+ }
+
+
+ /**
+  * Collects the registrations for a static key: persisted ones found by
+  * the key's toString() value (minus those pending removal on this
+  * thread) plus the ones held by the thread's in-memory correlation
+  * service.
+  *
+  * @param key the static key to look up
+  * @return the matching registrations
+  * @throws CorrelationServiceException wrapping (and logging at SEVERE) a
+  *         FinderException raised by the home
+  * @see com.sybase.bpe.correlation.CorrelationService#getRegistrations(com.sybase.bpe.event.IStaticKey)
+  */
+ public Collection getRegistrations(IStaticKey key)
+     throws CorrelationServiceException {
+     Collection persistedBeans;
+     try {
+         persistedBeans = relh.findByStaticKey(key.toString());
+     } catch (FinderException finderFailure) {
+         CorrelationServiceException wrapped = new CorrelationServiceException(
+             "NATIVE_EXCEPTION", new Object[] {"FinderException"}, finderFailure);
+         wrapped.log(logger, Level.SEVERE);
+         throw wrapped;
+     }
+
+     // Hide registrations whose removal is still pending on this thread.
+     Collection visible = screenRegs(makeRegistrations(key.toString(), persistedBeans));
+
+     // Include registrations created in the current transaction that
+     // have not been persisted yet.
+     visible.addAll( getCachedTransientCS().getRegistrations(key) );
+     return visible;
+ }
+
+ /**
+  * Called by the various getRegistrations methods. If a registration has
+  * been removed on this thread but the removal has not been persisted
+  * yet, it is not reported.
+  *
+  * @param regs candidate registrations (elements are cast to Registration)
+  * @return a new HashSet holding the candidates that are not pending
+  *         removal
+  */
+ private Collection screenRegs( Collection regs )
+ {
+     // Fetch the thread's removal list once: it does not change while we
+     // iterate, and each fetch goes through a synchronized map.
+     Collection<Registration> pendingRemovals = getRegistrationOperationsForRemoval();
+     Collection retVal = new HashSet();
+     for ( Object element : regs )
+     {
+         Registration candidate = ( Registration ) element;
+         if ( !pendingRemovals.contains( candidate ) )
+         {
+             retVal.add(candidate);
+         }
+     }
+     return retVal;
+ }
+
+
+ /**
+  * Converts registration entity beans into Registration value objects.
+  *
+  * @param key passed as the first constructor argument of every
+  *        Registration created
+  * @param beans RegistrationEntityLocal instances to convert
+  * @return an ArrayList with one Registration per bean, in iteration order
+  */
+ private Collection makeRegistrations(String key, Collection beans){
+     ArrayList converted = new ArrayList ();
+     for ( Object item : beans ) {
+         RegistrationEntityLocal entity = (RegistrationEntityLocal) item;
+         Registration registration = new Registration(
+             key,
+             entity.getOperationId(),
+             entity.getRootDefId(),
+             entity.getDefId(),
+             entity.getRootProcId(),
+             entity.getProcId(),
+             entity.getKeyValue()
+             );
+         converted.add(registration);
+     }
+     return converted;
+ }
+
+ /* (non-Javadoc)
+ * No-op in this implementation: the body contains no statements.
+ * @see com.sybase.bpe.correlation.CorrelationService#update()
+ */
+ public void update() throws BPException {
+ //trace( "Inside update()");
+ }
+
+ /* (non-Javadoc)
+ * Returns the process service stored by init (null if init has not
+ * assigned one).
+ * @see com.sybase.bpe.correlation.CorrelationService#getProcessService()
+ */
+ public ProcessService getProcessService() throws BPException {
+ //trace( "Inside getProcessService()");
+ return ps;
+ }
+
+ /* (non-Javadoc)
+ * Returns the flag that init sets to true when the BPELCompliant
+ * property equals "TRUE"; false otherwise.
+ * @see com.sybase.bpe.correlation.CorrelationService#isBPELCompliant()
+ */
+ public boolean isBPELCompliant() {
+ //trace( "Inside isBPELCompliant()");
+ return isBPELCompliant;
+ }
+
+ /**
+  * Stores the process service, reads the BPEL-compliance flag from the
+  * properties and looks up the registration entity local home in JNDI
+  * under JNDI_RELH_NAME.
+  *
+  * @param props engine properties; the flag is set when getBPELCompliant()
+  *        equals "TRUE" (an unset value is treated as not compliant)
+  * @param ps the process service later returned by getProcessService
+  * @throws CorrelationServiceException wrapping (and logging at SEVERE) a
+  *         NamingException raised by the JNDI lookup
+  * @see com.sybase.bpe.correlation.CorrelationService#init(com.sybase.bpe.util.BPEProperties, com.sybase.bpe.engine.ProcessService)
+  */
+ public void init(BPEProperties props, ProcessService ps)
+     throws CorrelationServiceException {
+     this.ps = ps;
+     // Constant-first comparison so that an unset property does not cause
+     // a NullPointerException.
+     if ("TRUE".equals(props.getBPELCompliant())) {
+         this.isBPELCompliant = true;
+     }
+
+     try {
+         InitialContext ic = new InitialContext();
+         relh = (RegistrationEntityLocalHome) ic.lookup(JNDI_RELH_NAME);
+     } catch (NamingException ne) {
+         CorrelationServiceException bpx = new CorrelationServiceException("NATIVE_EXCEPTION",new Object[] {"NamingException"},ne);
+         bpx.log(logger,Level.SEVERE);
+         throw bpx;
+     }
+ }
+
+ /**
+  * Looks up a single registration by static key, operation id and process
+  * id. A persisted registration is returned unless its removal is pending
+  * on this thread; otherwise the thread's in-memory correlation service
+  * is consulted for a registration that has not been persisted yet.
+  *
+  * @param key the static key; its toString() value is used for the finder
+  * @param operationId the operation id to match
+  * @param processId the process id to match
+  * @return the registration, or whatever the in-memory service returns
+  *         when no visible persisted registration exists
+  * @throws CorrelationServiceException wrapping (and logging at SEVERE) a
+  *         FinderException raised by the home
+  * @see com.sybase.bpe.correlation.CorrelationService#getRegistration(com.sybase.bpe.event.IStaticKey, java.lang.String, java.lang.String)
+  */
+ public Registration getRegistration(IStaticKey key, String operationId, String processId) throws CorrelationServiceException {
+     Collection persistedBeans;
+     try {
+         persistedBeans = relh.findByStaticKeyandOpIdandProcId(key.toString(),operationId,processId);
+     } catch (FinderException finderFailure) {
+         CorrelationServiceException wrapped = new CorrelationServiceException(
+             "NATIVE_EXCEPTION", new Object[] {"FinderException"}, finderFailure);
+         wrapped.log(logger, Level.SEVERE);
+         throw wrapped;
+     }
+
+     // there should only be one of these
+     Iterator persisted = makeRegistrations(key.toString(), persistedBeans).iterator();
+     if ( persisted.hasNext() )
+     {
+         Registration found = (Registration) persisted.next();
+         // The registration may already have been deleted without the
+         // deletion being persisted yet; the removal list tells us.
+         if ( !getRegistrationOperationsForRemoval().contains(found) )
+         {
+             return found;
+         }
+     }
+
+     // Nothing visible in the store: there could still be a registration
+     // which has not been persisted yet.
+     return getCachedTransientCS().getRegistration(key, operationId, processId);
+ }
+
+ /**
+  * Queues the removal of a registration for the calling thread, removes
+  * it from the thread's in-memory correlation service and records it in
+  * the thread's pending-removal list. The entity itself is only removed
+  * later, by persistRegistrationChanges.
+  *
+  * @param registration the registration to remove
+  * @throws CorrelationServiceException if the in-memory service raises it
+  */
+ public void removeRegistration(Registration registration)
+     throws CorrelationServiceException
+ {
+     RemoveRegistration pendingRemove = new RemoveRegistration();
+     pendingRemove.reg = registration;
+     addRegistrationOperation(pendingRemove);
+
+     CorrelationServiceSLImpl transientService = getCachedTransientCS();
+     transientService.removeRegistration(registration);
+
+     addRegistrationOperationForRemoval(registration);
+ }
+
+ /**
+  * Removes the registration entity identified by the registration's
+  * operation id, process id, root definition id, root process id and
+  * static key value. Removal is best effort: a missing entity is ignored
+  * and other failures are logged at WARNING without being rethrown.
+  *
+  * @param registration identifies the entity to remove
+  * @throws CorrelationServiceException declared for callers; not thrown
+  *         by the code in this method
+  */
+ protected void removeRegistrationEntity(Registration registration)
+     throws CorrelationServiceException {
+     try {
+         RegistrationEntityPK pk = new RegistrationEntityPK();
+         pk.setOperationId(registration.getOperationId());
+         pk.setProcId(registration.getProcId());
+         pk.setRootDefId(registration.getRootDefId());
+         pk.setRootProcId(registration.getRootProcId());
+         pk.setStaticKeyValue(registration.getStaticKeyValue());
+         RegistrationEntityLocal bean = relh.findByPrimaryKey(pk);
+         bean.remove();
+     } catch ( ObjectNotFoundException ignored ) {
+         // ignore requests to remove registrations that are not there
+     } catch (FinderException e) {
+         logger.log(Level.WARNING, describeRemovalFailure(registration), e);
+     } catch (EJBException e) {
+         logger.log(Level.WARNING, describeRemovalFailure(registration), e);
+     } catch (RemoveException e) {
+         logger.log(Level.WARNING, describeRemovalFailure(registration), e);
+     }
+ }
+
+ /** Builds the log message naming the registration whose removal failed. */
+ private String describeRemovalFailure(Registration registration) {
+     return "Failed to remove registration entity [staticKey="
+         + registration.getStaticKeyValue()
+         + ", operationId=" + registration.getOperationId()
+         + ", procId=" + registration.getProcId() + "]";
+ }
+
+ /**
+  * Calls removeObjects() on the EJB data object factory behind the
+  * context service obtained for the given key. Does nothing when the
+  * context service is not a PersistentContextService or its factory is
+  * not an EJBDataObjectFactory.
+  *
+  * @param key passed to the process service's getContextService
+  * @throws CorrelationServiceException wrapping any exception raised
+  *         along the way
+  */
+ private void doContextDeletes(String key) throws CorrelationServiceException {
+     try {
+         IContextService contextService = ps.getContextService(key);
+         if ( !(contextService instanceof PersistentContextService) ) {
+             return;
+         }
+         IDataObjectFactory factory =
+             ((PersistentContextService) contextService).getPersistentDataObjectFactory();
+         if ( factory instanceof EJBDataObjectFactory ) {
+             ((EJBDataObjectFactory) factory).removeObjects();
+         }
+     } catch (Exception failure) {
+         throw new CorrelationServiceException(failure.getLocalizedMessage(),new Object[]{},failure);
+     }
+ }
+
+ /**
+  * Calls clearRemoveObject() on the EJB data object factory behind the
+  * context service obtained for the given key. Does nothing when the
+  * context service is not a PersistentContextService or its factory is
+  * not an EJBDataObjectFactory. Failures are logged at SEVERE and not
+  * propagated.
+  *
+  * @param key passed to the process service's getContextService
+  */
+ private void clearContextDeletes(String key) {
+     try {
+         IContextService contextService = ps.getContextService(key);
+         if ( !(contextService instanceof PersistentContextService) ) {
+             return;
+         }
+         IDataObjectFactory factory =
+             ((PersistentContextService) contextService).getPersistentDataObjectFactory();
+         if ( factory instanceof EJBDataObjectFactory ) {
+             ((EJBDataObjectFactory) factory).clearRemoveObject();
+         }
+     } catch (Exception failure) {
+         // problem cleaning up
+         logger.log(Level.SEVERE,failure.getLocalizedMessage(),failure);
+     }
+ }
+
+ /**
+  * Delete old regs and insert new regs. This method exists so that the
+  * registration changes can be sequenced at the end of the transaction
+  * rather than interspersed throughout it, which caused a deadlock
+  * hazard.
+  *
+  * Replays the calling thread's queued operations in queue order; before
+  * each entity removal or creation, doContextDeletes is invoked for the
+  * registration's root process id. Finishes by calling clearState.
+  *
+  * @throws CorrelationServiceException if replaying an operation raises it
+  */
+ public void persistRegistrationChanges() throws CorrelationServiceException
+ {
+     for (RegistrationType op : getRegistrationOperations())
+     {
+         if ( op instanceof RemoveRegistration )
+         {
+             RemoveRegistration removal = (RemoveRegistration) op;
+             doContextDeletes(removal.reg.getRootProcId());
+             removeRegistrationEntity( removal.reg );
+         }
+         else if ( op instanceof CreateRegistration )
+         {
+             CreateRegistration creation = (CreateRegistration) op;
+             doContextDeletes(creation.reg.getRootProcId());
+             createRegistrationEntity( creation.reg );
+         }
+     }
+     // Every queued operation has been replayed; drop the thread's state.
+     clearState();
+ }
+
+ /**
+  * Discards everything held for the calling thread: calls
+  * clearContextDeletes for the root process id of every queued operation
+  * and every pending removal, empties both lists, removes the thread's
+  * entries from all per-thread maps and resets the nesting counter.
+  */
+ public void clearState()
+ {
+     Thread current = Thread.currentThread();
+
+     Collection<RegistrationType> regtypes = getRegistrationOperations();
+     for ( RegistrationType regt : regtypes ) {
+         if ( regt instanceof RemoveRegistration )
+         {
+             RemoveRegistration rr = (RemoveRegistration)(regt);
+             clearContextDeletes(rr.reg.getRootProcId());
+         }
+         if ( regt instanceof CreateRegistration )
+         {
+             CreateRegistration cr = (CreateRegistration)(regt);
+             clearContextDeletes(cr.reg.getRootProcId());
+         }
+     }
+     regtypes.clear();
+     // Also drop the map entry: clearing the list alone keeps the Thread
+     // key (and the empty list) in the static map indefinitely.
+     registrationOperationsByThread.remove(current);
+
+     Collection<Registration> regs = getRegistrationOperationsForRemoval();
+     for ( Registration reg : regs ) {
+         clearContextDeletes(reg.getRootProcId());
+     }
+     regs.clear();
+     // Same reasoning as above for the pending-removal map.
+     registrationsForRemovalByThread.remove(current);
+
+     clearCachedCS();
+     clearNestedCorreclationCount();
+ }
+
+ // Marker type for the operations queued per thread; implemented by
+ // RemoveRegistration and CreateRegistration.
+ private interface RegistrationType {}
+
+ /**
+  * Queued request to remove a registration. Declared static: instances
+  * are stored in static per-thread maps, and a non-static inner class
+  * would keep its enclosing service instance reachable through them.
+  */
+ private static class RemoveRegistration implements RegistrationType
+ {
+     /** The registration to remove. */
+     public Registration reg;
+ }
+
+ /**
+  * Queued request to create a registration. Declared static: instances
+  * are stored in static per-thread maps, and a non-static inner class
+  * would keep its enclosing service instance reachable through them.
+  */
+ private static class CreateRegistration implements RegistrationType
+ {
+     /** The registration to create. */
+     public Registration reg;
+ }
+}