You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ode.apache.org by ad...@apache.org on 2006/02/28 17:35:26 UTC
svn commit: r381694 [20/38] - in /incubator/ode/scratch: bpe/ ode/
ode/bpelTests/ ode/bpelTests/probeService/ ode/bpelTests/test1/
ode/bpelTests/test10/ ode/bpelTests/test12/ ode/bpelTests/test13/
ode/bpelTests/test14/ ode/bpelTests/test15/ ode/bpelTes...
Added: incubator/ode/scratch/ode/src/main/java/org/apache/ode/correlation/CorrelationService.java
URL: http://svn.apache.org/viewcvs/incubator/ode/scratch/ode/src/main/java/org/apache/ode/correlation/CorrelationService.java?rev=381694&view=auto
==============================================================================
--- incubator/ode/scratch/ode/src/main/java/org/apache/ode/correlation/CorrelationService.java (added)
+++ incubator/ode/scratch/ode/src/main/java/org/apache/ode/correlation/CorrelationService.java Tue Feb 28 08:31:48 2006
@@ -0,0 +1,817 @@
+/*
+ * Copyright 2006 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.ode.correlation;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.ode.bped.IInternalEventDirector;
+import org.apache.ode.context.IContainer;
+import org.apache.ode.context.IContextService;
+import org.apache.ode.context.resolver.ContextResolver;
+import org.apache.ode.context.resolver.IResolvedObject;
+import org.apache.ode.context.test.SerializationContext;
+import org.apache.ode.definition.IPMDCorrelationSet;
+import org.apache.ode.definition.IPMDLocator;
+import org.apache.ode.definition.IPMDOperation;
+import org.apache.ode.definition.IPMDProcess;
+import org.apache.ode.definition.IPMDRoot;
+import org.apache.ode.definition.service.DefinitionService;
+import org.apache.ode.engine.ProcessDefinitionKey;
+import org.apache.ode.engine.ProcessInstance;
+import org.apache.ode.engine.ProcessService;
+import org.apache.ode.engine.ReturnMessageLocatorHolder;
+import org.apache.ode.engine.StateEnum;
+import org.apache.ode.event.Fault;
+import org.apache.ode.event.IRequestMessageEvent;
+import org.apache.ode.event.IResponseMessage;
+import org.apache.ode.event.IStaticKey;
+import org.apache.ode.instance.IPMIProcess;
+import org.apache.ode.instance.service.InstanceService;
+import org.apache.ode.interaction.IInteraction;
+import org.apache.ode.interaction.IInvocation;
+import org.apache.ode.interaction.InteractionFactory;
+import org.apache.ode.lang.ResourceGetter;
+import org.apache.ode.scope.service.BPRuntimeException;
+import org.apache.ode.scope.service.ScopePath;
+import org.apache.ode.util.BPEProperties;
+import org.apache.ode.util.BPException;
+import org.apache.ode.event.BPELStaticKey;
+import org.apache.ode.interaction.Invocation;
+import org.apache.ode.interaction.query.JaxenXPathSingleNodeQuery;
+
+/**
+ * A CorrelationService implementation allows processes to register
+ * correlations. Underneath the CorrelationService is a repository that stores these
+ * correlations. CorrelationServices are obtained from the
+ * CorrelationServiceFactory.createCorrelationService() function.
+ * @see org.apache.ode.correlation.CorrelationServiceFactory
+ * @see org.apache.ode.correlation.StaticRegistration
+ */
+public abstract class CorrelationService {
+
+ private static Logger logger =
+ Logger.getLogger(CorrelationService.class.getName());
+
+
+ /**
+ * Create a registration in the repository.
+ * @param regisration the registration to store
+ * @throws BPException if the registration cannot be created
+ */
+ public abstract void createRegistration(Registration regisration)
+ throws BPException;
+
+ /**
+ * Remove a registration from the repository.
+ * @param registration the registration to remove
+ * @throws BPException if the registration cannot be removed
+ */
+ public abstract void removeRegistration(Registration registration)
+ throws BPException;
+
+ /**
+ * Find the registrations for a static key and a collection of key values.
+ * NOTE(review): presumably a registration matches when its key value is one
+ * of keyValues (which may contain null) - confirm against the implementations.
+ * @param key the static key of the incoming message
+ * @param keyValues the candidate correlation key values
+ * @return a collection of Registration objects
+ * @throws BPException if the lookup fails
+ */
+ public abstract Collection getRegistrations(
+ IStaticKey key,
+ Collection keyValues)
+ throws BPException;
+
+ /**
+ * Find the registrations associated with a process instance.
+ * @param key the process instance to look registrations up for
+ * @return a collection of Registration objects
+ * @throws BPException if the lookup fails
+ */
+
+ public abstract Collection getRegistrations(IPMIProcess key)
+ throws BPException;
+
+
+ /**
+ * Find the registrations associated with a static key.
+ * @param key the static key to look registrations up for
+ * @return a collection of Registration objects
+ * @throws BPException if the lookup fails
+ */
+
+ public abstract Collection getRegistrations(IStaticKey key)
+ throws BPException;
+
+
+ /**
+ * Find a single registration.
+ * @param key the static key of the registration
+ * @param operationId the id of the operation the registration belongs to
+ * @param processId the id of the process that made the registration
+ * @return the registration; NOTE(review): the result when nothing matches
+ * is implementation defined - confirm whether null is returned
+ * @throws BPException if the lookup fails
+ */
+ public abstract Registration getRegistration(IStaticKey key, String operationId,
+ String processId)
+ throws BPException;
+
+ /**
+ * Lock on a key using the locking service of the supplied event director.
+ * @param ed the event director whose locking service performs the lock
+ * @param key the key to lock on
+ * @throws BPException declared by this method; raised if locking fails
+ */
+ public void lock(IInternalEventDirector ed, String key) throws BPException {
+ ed.getLockingService().lock(key);
+ }
+
+ /**
+ * Update the correlation service.
+ * NOTE(review): what is flushed or refreshed is up to the implementation -
+ * confirm against the concrete subclasses.
+ * @throws BPException if the update fails
+ */
+ public abstract void update() throws BPException;
+
+ /**
+ * Get the process service this correlation service uses to look up and
+ * create process instances and to reach the instance/definition services.
+ * @return the process service
+ * @throws BPException if the process service cannot be obtained
+ */
+ public abstract ProcessService getProcessService() throws BPException;
+
+ /**
+ * Tell whether this engine runs in BPEL compliant mode. When it does,
+ * recording a second delivery of the same event raises a CS_NON_BPEL
+ * CorrelationServiceException instead of marking the delivery as a spray.
+ * @return true if the engine is BPEL compliant
+ */
+ public abstract boolean isBPELCompliant();
+
+ /**
+ * Send the message to the process described by a particular registration.
+ * <p>
+ * The operation named by the registration is looked up in the root
+ * definition. If it is not instance creating, the message goes to the
+ * existing process instance that made the registration. Otherwise a new
+ * sub process is created off the root process instance, started, given the
+ * scope path of the registering process, and the message goes to it.
+ * @param registration the registration to route to
+ * @param me the message event to deliver
+ * @param ed the event director, passed on to the process instance
+ * @throws BPException if a lookup, the instance creation or the delivery fails
+ */
+ public void routeToRegistration(
+         Registration registration,
+         IRequestMessageEvent me,
+         IInternalEventDirector ed) throws BPException {
+
+     final String rootDefinitionId = registration.getRootDefId();
+     final String definitionId = registration.getDefId();
+     final String rootProcessId = registration.getRootProcId();
+     final String processId = registration.getProcId();
+     final String operationId = registration.getOperationId();
+
+     DefinitionService definitions =
+             getProcessService().getInstanceService().getDefinitionService();
+     IPMDOperation operation = definitions
+             .getRootDefinition(new ProcessDefinitionKey(rootDefinitionId))
+             .getOperation(operationId);
+
+     ProcessInstance target;
+     if ( operation.isInstanceCreating() ) {
+         // a new process instance is needed: fetch the definition to clone
+         ProcessDefinitionKey rootKey = new ProcessDefinitionKey(rootDefinitionId);
+         ProcessDefinitionKey key = new ProcessDefinitionKey(definitionId);
+         IPMDProcess definition = definitions.getProcessDefintion(key,rootKey);
+
+         // the parent is the root process instance
+         IPMIProcess parent = getProcessService().getInstanceService().getInstance(
+                 rootProcessId,rootProcessId);
+
+         target = getProcessService().createSubProcess(parent,definition);
+         target.setState(StateEnum.STARTED);
+
+         // the process is created off the root, so it has to take over the
+         // scope path of the process that made the registration
+         ProcessInstance registrant =
+                 getProcessService().lookupProcess( rootProcessId, processId );
+         target.setScopePath(registrant.getScopePath());
+     } else {
+         // the message goes to an already existing process instance
+         target = getProcessService().lookupProcess( rootProcessId, processId );
+     }
+
+     // send the message
+     target.processEvent(me,ed, ed);
+ }
+
+ /**
+ * Create a new root process instance for an instance creating operation
+ * and send the message to it.
+ * <p>
+ * The instance is put into the STARTED state and a container keyed by the
+ * instance root key is created under the context root when none exists yet.
+ * @param operation the instance creating operation; supplies the root definition id
+ * @param me the message event to deliver
+ * @param ed the event director, passed on to the process instance
+ * @throws BPException if the instance cannot be created or the delivery fails
+ */
+ public void createInstanceAndRoute(
+         IPMDOperation operation,
+         IRequestMessageEvent me,
+         IInternalEventDirector ed) throws BPException {
+
+     // instantiate the root definition
+     ProcessDefinitionKey rootDefinitionKey =
+             new ProcessDefinitionKey(operation.getRootDefId());
+     ProcessInstance instance = getProcessService().createProcess(rootDefinitionKey);
+     instance.setState(StateEnum.STARTED);
+
+     // make sure the root context partition of the new instance exists
+     IContainer contextRoot = instance.getContextService().getRoot();
+     IContainer partition = (IContainer)contextRoot.findChild(instance.getRootKey());
+     if ( partition == null ) {
+         contextRoot.createContainer(instance.getRootKey());
+     }
+
+     // send the message
+     instance.processEvent(me,ed, ed);
+ }
+
+ /**
+ * Initialize the CorrelationService. The factory calls this right after
+ * instantiating the implementation class.
+ * @param props Initialization properties.
+ * @param ps the process service handed to the implementation
+ * @throws BPException if the initialization fails
+ */
+ public abstract void init(BPEProperties props, ProcessService ps)
+ throws BPException;
+
+ /**
+ * Correlate an event: find the process the message belongs to (or create
+ * one) and deliver the message to it.
+ * <p>
+ * The lookup proceeds in this order:
+ * <ol>
+ * <li>if any non instance creating operation for the static key belongs to
+ * a stateful definition, an existing registration matching the computed
+ * key values is searched and, when found, routed to;</li>
+ * <li>otherwise the instance creating operation for the static key is
+ * looked up; for protected, stateful definitions the root definition id
+ * is locked and the registration lookup is repeated before a new
+ * instance is created and routed to;</li>
+ * <li>if neither exists the event cannot be correlated and a
+ * CorrelationServiceException is thrown.</li>
+ * </ol>
+ * @param me the message to correlate
+ * @param sync true if the caller waits for the reply of the process
+ * @param ed the EventDirector
+ * @return the response message built after the message was delivered
+ * @throws BPException if the event cannot be correlated (ED_ED_NSR when no
+ * operation is defined at all for the static key, ED_ED_NSR_R when
+ * operations exist but nothing matched) or if routing fails
+ */
+ public IResponseMessage correlateEvent(
+         IRequestMessageEvent me,
+         boolean sync,
+         IInternalEventDirector ed)
+         throws BPException {
+
+     // init the process service
+     getProcessService().init();
+
+     // set the message event on the event director
+     ed.setMessageEvent(me);
+     // this is a new message so null out the return message locator object
+     ed.setReturnMessageMetadata(null);
+
+     // get the static key
+     IStaticKey key = me.getStaticKey();
+
+     // object to collect data about sent messages
+     SentMessageData smd = new SentMessageData();
+
+     // look up noninstance creating operations for this static key
+     // inflight and metadata
+     Collection defOps = getAllDefinedNonInstanceCreatingOperations(key);
+
+     // Remember whether the definition service knew no operations at all and
+     // continue with an empty collection, so that the helpers below can
+     // iterate safely and the ED_ED_NSR error further down stays reachable.
+     final boolean noDefinedOps = (defOps == null);
+     if ( noDefinedOps ) {
+         defOps = new ArrayList();
+     }
+
+     Collection keys = computeKeys(me,defOps,false);
+
+     //sync on computed keys
+     // TODO - make sure these keys are ordered consistently so we don't get dead locks
+     // This is not needed when we are not caching events
+     //lockOnKeys(ed, me.getStaticKey().toString(),keys);
+
+     // if defOps all belong to stateless BPs
+     // we don't need to look up registrations
+     Registration reg = null;
+     if ( isStatefull(defOps)) {
+         // find a registration to route to
+         // null if there are no registrations to be routed to
+         // this method also locks on the process instance id so execution is
+         // serialized for the rest of the transaction
+         reg = findRegistration(ed, key,keys);
+     }
+
+     // we found a registration
+     // route and return
+     if ( reg != null ) {
+         return routeAndMapResult(reg, me, ed, smd, sync);
+     }
+
+     IPMDOperation op = findInstanceCreatingOperation(key);
+
+     // we found an instance creation operation
+     // route and return
+     if ( op != null ) {
+
+         // get root definition to see if it is stateless
+         IPMDRoot root = getProcessService().getInstanceService().getDefinitionService().
+                 getRootDefinition(new ProcessDefinitionKey(op.getRootDefId()));
+
+         // if we need to protect process instantiation and
+         // the process is stateful we need to lock on the root id
+         if ( root.getProtectedInstantiation() && ! root.getIsStateless() ) {
+
+             // this lock serializes multiple instance creating events
+             // racing toward the same process
+             lock(ed, op.getRootDefId());
+
+             // we have to perform this check again because the instance could have been
+             // created between the time the first check was performed and the
+             // time the root definition id was locked.
+             Registration regRecheck = findRegistration(ed, key,keys);
+
+             // we found a registration
+             // route and return
+             if ( regRecheck != null ) {
+                 return routeAndMapResult(regRecheck, me, ed, smd, sync);
+             }
+         }
+
+         // we have an operation
+         // so create an instance and route to it
+         // note this is called if the definition is stateless or not
+         createInstanceAndRoute(op,me,ed);
+         sentMessage(smd,
+                 ed.getReturnMessageMetadata().getRootProcessID());
+         return mapResult(smd,ed,sync);
+     }
+
+     // From here on there is no instance creating operation and no matching
+     // registration: the input event cannot be correlated. Caching of such
+     // events is not implemented, so an exception is raised.
+     if ( noDefinedOps ) {
+         CorrelationServiceException cse = new CorrelationServiceException("ED_ED_NSR",new Object[] { key });
+         cse.log(logger,Level.SEVERE);
+         throw cse;
+     }
+
+     BPELStaticKey bpelKey = (BPELStaticKey) key;
+     Object[] obj = new Object[5];
+     obj[0] = bpelKey.getOperation();
+     obj[1] = bpelKey.getPortType();
+     obj[2] = bpelKey.getTargetNamespace();
+     obj[3] = getCorrelationValues( me,defOps );
+     obj[4] = getCorrelationKeys( defOps );
+
+     CorrelationServiceException cse = new CorrelationServiceException("ED_ED_NSR_R", obj);
+     cse.log(logger,Level.SEVERE);
+     throw cse;
+ }
+
+ /**
+ * Route the message to a registration, record the delivery and build the result.
+ */
+ private IResponseMessage routeAndMapResult(
+         Registration registration,
+         IRequestMessageEvent me,
+         IInternalEventDirector ed,
+         SentMessageData smd,
+         boolean sync) throws BPException {
+
+     routeToRegistration(registration,me,ed);
+     sentMessage(smd,
+             ed.getReturnMessageMetadata().getRootProcessID());
+     return mapResult(smd,ed,sync);
+ }
+
+ /**
+ * Look up the single operation the definition service correlates with the
+ * static key; correlateEvent uses it as the instance creating operation.
+ * @param key the static key of the incoming message
+ * @return the operation, or null if the definition service returns none
+ * @throws BPException CS_NO_SPRAY if more than one operation is returned
+ */
+ private IPMDOperation findInstanceCreatingOperation (IStaticKey key) throws BPException {
+     Collection candidates = getProcessService()
+             .getInstanceService()
+             .getDefinitionService()
+             .getCorrelations(key);
+
+     if ( candidates.isEmpty() ) {
+         return null;
+     }
+     if ( candidates.size() > 1 ) {
+         throw new CorrelationServiceException("CS_NO_SPRAY",new Object[] { });
+     }
+     return (IPMDOperation)candidates.iterator().next();
+ }
+
+ /**
+ * Find the single registration matching the static key and key values and
+ * lock its root process instance.
+ * @param ed the event director used for locking
+ * @param key the static key of the incoming message
+ * @param keys the candidate correlation key values
+ * @return the registration, or null if nothing matched
+ * @throws BPException CS_NO_SPRAY if more than one registration matched
+ */
+ private Registration findRegistration( IInternalEventDirector ed, IStaticKey key, Collection keys ) throws BPException {
+
+     Collection matches = getRegistrations(key,keys);
+
+     if ( matches.size() > 1 ) {
+         throw new CorrelationServiceException("CS_NO_SPRAY",new Object[] { });
+     }
+     if ( matches.isEmpty() ) {
+         return null;
+     }
+
+     Registration found = (Registration)matches.iterator().next();
+
+     // lock the process instance so that we get a protected registration lookup;
+     // this is necessary because an in flight crank of the engine could remove
+     // a registration
+     lockRootProcessInstance(ed, found);
+
+     // The registrations are deliberately not selected a second time after
+     // taking the lock: that re-check is only needed when events are cached.
+     return found;
+ }
+
+ /**
+ * Ask the definition service for the non instance creating operations
+ * defined for a static key.
+ * @param key the static key of the incoming message
+ * @return whatever the definition service returns for the key
+ * @throws BPException if the services cannot be reached
+ */
+ private Collection getAllDefinedNonInstanceCreatingOperations(IStaticKey key) throws BPException {
+     return getProcessService()
+             .getInstanceService()
+             .getDefinitionService()
+             .getNonInstanceCreatingOps(key);
+ }
+
+ /**
+ * Build the response message for a delivered event and update the process
+ * instances the event was sent to.
+ * <p>
+ * For an asynchronous call, or when the event was sprayed to several
+ * processes, every recorded root process is updated and an empty response
+ * message is returned. For a synchronous, single delivery the response is
+ * filled from the return message metadata held by the event director: each
+ * locator that does not belong to a correlation set is resolved and set as
+ * a part of the response, or, when the reply carries a fault, added to the
+ * fault exception instead.
+ * @param smd the data collected about the deliveries of this event
+ * @param ed the event director holding the message event and return metadata
+ * @param sync true if the caller waits for the reply
+ * @return the response message
+ * @throws BPException if resolving the reply data or updating a process fails
+ */
+ private IResponseMessage mapResult(SentMessageData smd,
+         IInternalEventDirector ed, boolean sync ) throws BPException {
+
+     if ( ! sync || smd.spray ) {
+         // no reply to build: update every root process the event went to
+         Iterator rootIdIt = smd.rootIds.iterator();
+         while ( rootIdIt.hasNext() ) {
+             ed.getProcessService().update((String)rootIdIt.next(), ed, ed);
+         }
+         return ed.getMessageEvent().createResponseMessage();
+     }
+
+     // create the return message; no fault has occurred
+     IResponseMessage mer = ed.getMessageEvent().createResponseMessage();
+     // get the return metadata
+     ReturnMessageLocatorHolder rmlh = ed.getReturnMessageMetadata();
+
+     // Test to see if the reply has a fault
+     Fault replyFault = null;
+     if ( rmlh.getFaultName() != null ) {
+         replyFault = new Fault();
+         replyFault.setFaultString(rmlh.getFaultName());
+         BPRuntimeException bpre = new BPRuntimeException(rmlh.getFaultName(),"");
+         bpre.setNameSpace(rmlh.getFaultNS());
+         replyFault.setFaultException(bpre);
+         mer.setFault(replyFault);
+     }
+
+     Iterator it = rmlh.getLocators();
+     InstanceService is = ed.getProcessService().getInstanceService();
+     IPMIProcess ipmip =
+             is.getInstance(
+                     rmlh.getRootProcessID(),
+                     rmlh.getRootProcessID());
+     // create a resolver so we can get at the data
+     ContextResolver resolver =
+             new ContextResolver(
+                     ipmip,
+                     rmlh,
+                     ed.getProcessService().getContextService(ipmip),
+                     ed.getProcessService().getScopeService(ipmip));
+
+     // Debugging aid that dumps the process context to standard out. It is
+     // only produced when FINE logging is enabled so that normal operation
+     // does not serialize the whole context for every synchronous reply.
+     if (logger.isLoggable(Level.FINE)) {
+         try {
+             SerializationContext sc = new SerializationContext( System.out );
+             sc.printComment( "Correlation Service Context:");
+             sc.serialize( resolver.getContextService() );
+         } catch (Exception e) {
+             // the dump is best effort and must never break the reply
+             logger.log(Level.FINE, "Unable to dump the correlation service context", e);
+         }
+     }
+
+     // we might be building the return message after the scope has
+     // ended so we need to hijack the
+     // current scope of the process
+     ScopePath saveSP = ipmip.getScopePath();
+     ipmip.setScopePath(rmlh.getScopePath());
+     try {
+         // loop over the return metadata
+         while (it.hasNext()) {
+             IPMDLocator loc = (IPMDLocator) it.next();
+             String name = loc.getName();
+             // NOTE(review): the part name is taken from the text after the
+             // first ':' - this assumes every locator name contains one.
+             String[] name_split = name.split(":");
+
+             // only locators that are not correlation set locators are mapped
+             Collection corrlSets = rmlh.getCorrlSets();
+             if ( corrlSets != null && corrlSets.contains(name_split[0])) {
+                 continue;
+             }
+
+             if (logger.isLoggable(Level.FINE)) {
+                 logger.fine("Adding " + loc.getName()
+                         + " to return message");
+             }
+
+             // resolve the object
+             IResolvedObject ro = resolver.resolveBPContext(loc.getName());
+
+             // turn the resolved value into an interaction; values that are
+             // neither an interaction nor a String leave it null
+             Object obj = ro.getValue();
+             IInteraction interaction = null;
+             if (obj instanceof IInteraction) {
+                 interaction = (IInteraction) ro.getValue();
+             } else if (obj instanceof String) {
+                 // create a new interaction
+                 interaction = InteractionFactory.newInstance()
+                         .createXMLInteraction(
+                                 ((String) ro.getValue()).getBytes());
+             }
+
+             // set the part on the reply, or on the fault when there is one
+             if (replyFault == null) {
+                 mer.setPart(name_split[1], interaction);
+             } else {
+                 BPRuntimeException bpre = (BPRuntimeException) replyFault
+                         .getFaultException();
+                 bpre.addPartMessage(name_split[1], interaction);
+             }
+         }
+     } finally {
+         // always put back the scope, even when resolving a locator failed,
+         // so the process instance is not left on the hijacked scope path
+         ipmip.setScopePath(saveSP);
+     }
+
+     // update the instance service
+     if (logger.isLoggable(Level.FINE)) {
+         logger.fine(
+                 "updatting instance service and removing lock for:"
+                         + ipmip.getRootKey());
+     }
+     ed.getProcessService().update(ipmip.getRootKey(), ed, ed);
+
+     return mer;
+ }
+
+ /**
+ * Describe the correlation keys of a collection of operations for use in
+ * an error message.
+ * <p>
+ * For every part query of every correlation set the part name is written,
+ * followed by ":" and the XPath expression when the query is an Invocation
+ * wrapping a JaxenXPathSingleNodeQuery, followed by a blank.
+ * @param operations the IPMDOperation objects to describe
+ * @return the description; empty if no operation has a correlation
+ */
+ private String getCorrelationKeys(Collection operations)
+ {
+     // a buffer avoids re-copying the text on every append in the nested loops
+     StringBuffer description = new StringBuffer();
+     for (Iterator opIt = operations.iterator(); opIt.hasNext();)
+     {
+         IPMDOperation op = (IPMDOperation) opIt.next();
+         if (op.getCorrelation() == null)
+         {
+             continue;
+         }
+         Iterator setIt = op.getCorrelation().getCorrelationSets().iterator();
+         while (setIt.hasNext())
+         {
+             IPMDCorrelationSet corrlset = (IPMDCorrelationSet) setIt.next();
+             Iterator queryIt = corrlset.getPartQueries().iterator();
+             while (queryIt.hasNext())
+             {
+                 PartQuery pq = (PartQuery) queryIt.next();
+                 description.append(pq.getPartName());
+                 // instanceof is false for null, no separate null test needed
+                 if (pq.getQuery() instanceof Invocation)
+                 {
+                     Invocation query = (Invocation) pq.getQuery();
+                     if (query.getQuery() instanceof JaxenXPathSingleNodeQuery)
+                     {
+                         JaxenXPathSingleNodeQuery jsnq = (JaxenXPathSingleNodeQuery)query.getQuery();
+                         try
+                         {
+                             description.append(":");
+                             description.append(jsnq.getXPathExpression());
+                         }
+                         catch(Exception e)
+                         {
+                             // Just eat this error. We are in the process of building up an error to throw.
+                             // All we are trying to do is capture more information to log.
+                         }
+                     }
+                 }
+                 description.append(" ");
+             }
+         }
+     }
+     return description.toString();
+ }
+
+ /**
+ * Get the correlation values of a message for use in an error message
+ * when something goes wrong.
+ * <p>
+ * For every part query whose part is present in the message the result of
+ * invoking the query on the part (or the part itself when there is no
+ * query) is written as text, followed by a blank.
+ * @param me the message the values are read from
+ * @param operations the IPMDOperation objects whose correlation sets are evaluated
+ * @return the values, or null if evaluating a query failed (the failure is logged)
+ */
+ private String getCorrelationValues(IRequestMessageEvent me, Collection operations)
+ {
+     // a buffer avoids re-copying the text on every append in the nested loops
+     StringBuffer values = new StringBuffer();
+     for (Iterator opIt = operations.iterator(); opIt.hasNext();) {
+         IPMDOperation op = (IPMDOperation) opIt.next();
+         if (op.getCorrelation() == null)
+         {
+             continue;
+         }
+         Iterator setIt =
+                 op.getCorrelation().getCorrelationSets().iterator();
+         while (setIt.hasNext()) {
+             IPMDCorrelationSet corrlset =
+                     (IPMDCorrelationSet) setIt.next();
+             try
+             {
+                 Iterator queryIt = corrlset.getPartQueries().iterator();
+                 while (queryIt.hasNext())
+                 {
+                     PartQuery pq = (PartQuery) queryIt.next();
+
+                     IInteraction part = me.getPart(pq.getPartName());
+                     if (part == null)
+                     {
+                         continue;
+                     }
+
+                     String s;
+                     IInvocation invocation = ( IInvocation )pq.getQuery();
+                     if ( invocation != null ) {
+                         // invoke the interaction
+                         s = part.invoke(invocation).toString();
+                     } else {
+                         s = part.toString();
+                     }
+
+                     // append to the buffer if it is not null
+                     if (s != null) {
+                         values.append(s);
+                         values.append(" ");
+                     }
+                 }
+             }
+             catch (Exception e) {
+                 // TODO: Cory - please verify no rethrow
+                 logger.log(Level.SEVERE, ResourceGetter.getString("ED_ED_EWM"), e);
+                 return null;
+             }
+         }
+     }
+     return values.toString();
+ }
+
+ /**
+ * Compute the key values a registration has to carry to match the message,
+ * for a collection of operations.
+ * <p>
+ * An operation without correlation contributes a null key. Any other
+ * operation contributes the full key built from all of its correlation
+ * sets, the key built from its non instantiating correlation sets when
+ * that key is not empty, and a null key when it has no non instantiating
+ * correlation set at all.
+ * <p>
+ * NOTE(review): makeKeyValue returns null when a query fails and
+ * StringBuffer.append then adds the text "null" to the key - confirm this
+ * is intended.
+ * @param me the message the key values are read from
+ * @param operations the IPMDOperation objects to compute keys for
+ * @param dups true to keep duplicate keys (list), false to drop them (set)
+ * @return the computed key values; may contain null
+ */
+ private Collection computeKeys(IRequestMessageEvent me, Collection operations, boolean dups) {
+     Collection result = dups ? (Collection) new ArrayList() : (Collection) new HashSet();
+
+     for (Iterator opIt = operations.iterator(); opIt.hasNext();) {
+         IPMDOperation op = (IPMDOperation) opIt.next();
+         if (op.getCorrelation() == null) {
+             result.add(null);
+             continue;
+         }
+
+         // the full key; in a process creating instance this should be the
+         // only key created
+         StringBuffer fullKey = new StringBuffer("");
+         for (Iterator setIt = op.getCorrelation().getCorrelationSets().iterator();
+                 setIt.hasNext();) {
+             IPMDCorrelationSet corrlset = (IPMDCorrelationSet) setIt.next();
+             fullKey.append(makeKeyValue(me, corrlset.getPartQueries()));
+         }
+         result.add(fullKey.toString());
+
+         // the key based on only the non instantiating correlation sets
+         StringBuffer partialKey = new StringBuffer("");
+         boolean sawNonInstantiating = false;
+         for (Iterator setIt = op.getCorrelation().getCorrelationSets().iterator();
+                 setIt.hasNext();) {
+             IPMDCorrelationSet corrlset = (IPMDCorrelationSet) setIt.next();
+             if ( corrlset.isInstantiating() ) {
+                 continue;
+             }
+             partialKey.append(makeKeyValue(me, corrlset.getPartQueries()));
+             sawNonInstantiating = true;
+         }
+         if ( partialKey.length() > 0 ) {
+             result.add(partialKey.toString());
+         }
+
+         // if this operation only had instantiating keys there could be a
+         // registration with a null key (no correlations)
+         if ( ! sawNonInstantiating ) {
+             result.add(null);
+         }
+     }
+
+     return result;
+ }
+
+ /**
+ * Generate a key value from a message.
+ * <p>
+ * For every part query whose part is present in the message, the text of
+ * the query result (or of the part itself when there is no query) is
+ * appended to the key value.
+ * @param me the message the key value is read from
+ * @param keyMetaData the PartQuery objects describing the key
+ * @return the key value, or null if building it failed (the failure is logged)
+ */
+ private String makeKeyValue(IRequestMessageEvent me, Collection keyMetaData) {
+
+     StringBuffer keyValue = new StringBuffer();
+
+     try {
+         for (Iterator queryIt = keyMetaData.iterator(); queryIt.hasNext();) {
+             PartQuery pq = (PartQuery) queryIt.next();
+
+             // skip queries the message has no matching part for
+             IInteraction part = me.getPart(pq.getPartName());
+             if (part == null) {
+                 continue;
+             }
+
+             // invoke the query on the part, or fall back to the part itself
+             IInvocation invocation = ( IInvocation )pq.getQuery();
+             String text = ( invocation == null )
+                     ? part.toString()
+                     : part.invoke(invocation).toString();
+
+             if (text != null) {
+                 keyValue.append(text);
+             }
+         }
+     } catch (Exception e) {
+         // TODO: Cory - please verify no rethrow
+         logger.log(Level.SEVERE, ResourceGetter.getString("ED_ED_EWM"), e);
+         return null;
+     }
+
+     return keyValue.toString();
+ }
+
+ /**
+ * Tell whether at least one of the operations belongs to a root definition
+ * that is not stateless.
+ * @param defOps the IPMDOperation objects to inspect
+ * @return true as soon as one operation has a stateful root definition,
+ * false if all are stateless or the collection is empty
+ * @throws BPException if a root definition cannot be looked up
+ */
+ private boolean isStatefull(Collection defOps) throws BPException {
+     DefinitionService ds = getProcessService().getInstanceService().getDefinitionService();
+     for ( Iterator iter = defOps.iterator(); iter.hasNext(); ) {
+         IPMDOperation op = (IPMDOperation) iter.next();
+         IPMDRoot root = ds.getRootDefinition(new ProcessDefinitionKey(op.getRootDefId()));
+         if ( ! root.getIsStateless() ) {
+             return true;
+         }
+     }
+     return false;
+ }
+
+ /**
+ * Record that the message was sent to a root process.
+ * <p>
+ * If the message was already sent before, a BPEL compliant engine raises
+ * an error; otherwise the delivery is flagged as a spray. In every other
+ * case the root id is remembered for the final update.
+ * @param smd the data collected about the deliveries of this event
+ * @param rootid the id of the root process the message was sent to
+ * @throws CorrelationServiceException CS_NON_BPEL on a second delivery in
+ * a BPEL compliant engine
+ */
+ private void sentMessage(SentMessageData smd, String rootid)
+         throws CorrelationServiceException {
+
+     if ( smd.messageSent ) {
+         if ( isBPELCompliant() ) {
+             CorrelationServiceException cse = new CorrelationServiceException("CS_NON_BPEL",null);
+             cse.log(logger,Level.SEVERE);
+             throw cse;
+         }
+         smd.spray = true;
+     }
+     smd.rootIds.add(rootid);
+     smd.messageSent = true;
+ }
+
+// private void lockOnKeys(IInternalEventDirector ed, String prefix, Collection keys) throws BPException {
+// Iterator it = keys.iterator();
+// while ( it.hasNext() ) {
+// String key = (String)it.next();
+// lock(ed, prefix+key);
+// }
+// }
+
+// private void cacheEvent(Collection keys, IRequestMessageEvent me) throws BPException {
+// // TODO implement caching
+// CorrelationServiceException cse = new CorrelationServiceException("ED_ED_NSR",new Object[] { me.getStaticKey().toString() });
+// cse.log(logger,Level.SEVERE);
+// throw cse;
+// }
+
+ /**
+ * Lock on the root process id of a registration.
+ * @param ed the event director whose locking service is used
+ * @param reg the registration whose root process id is locked
+ * @throws BPException if locking fails
+ */
+ private void lockRootProcessInstance(IInternalEventDirector ed, Registration reg) throws BPException {
+ lock(ed, reg.getRootProcId());
+ }
+
+ /**
+ * Bookkeeping about where a single incoming event has been sent.
+ */
+ class SentMessageData {
+
+     /** Spray flag: set when the event is routed to more than one process. */
+     boolean spray;
+
+     /** Root ids to update at the end; needed for the spray case. */
+     ArrayList rootIds = new ArrayList();
+
+     /** Set once the message has been sent anywhere. */
+     boolean messageSent;
+
+ }
+
+}
Added: incubator/ode/scratch/ode/src/main/java/org/apache/ode/correlation/CorrelationServiceException.java
URL: http://svn.apache.org/viewcvs/incubator/ode/scratch/ode/src/main/java/org/apache/ode/correlation/CorrelationServiceException.java?rev=381694&view=auto
==============================================================================
--- incubator/ode/scratch/ode/src/main/java/org/apache/ode/correlation/CorrelationServiceException.java (added)
+++ incubator/ode/scratch/ode/src/main/java/org/apache/ode/correlation/CorrelationServiceException.java Tue Feb 28 08:31:48 2006
@@ -0,0 +1,48 @@
+/*
+ * Copyright 2006 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.ode.correlation;
+
+import org.apache.ode.util.BPException;
+
+/**
+ * Exception class for the CorrelationService. Both constructors only
+ * delegate to BPException.
+ */
+public class CorrelationServiceException extends BPException {
+
+ static final long serialVersionUID = -6546885597497431542L;
+
+
+ /**
+ * Create an exception without a cause.
+ * @param message_id the message identifier, e.g. "CS_NO_SPRAY";
+ * NOTE(review): presumably a key into the message catalog - confirm in BPException
+ * @param msgParams the parameters for the message; callers in this package
+ * pass an empty array or null when there are none
+ */
+ public CorrelationServiceException(String message_id, Object[] msgParams) {
+ super(message_id, msgParams);
+ }
+
+ /**
+ * Create an exception wrapping the exception that caused it.
+ * @param message_id the message identifier, e.g. "NATIVE_EXCEPTION"
+ * @param msgParams the parameters for the message
+ * @param cause the exception that caused this one
+ */
+ public CorrelationServiceException(
+ String message_id,
+ Object[] msgParams,
+ Throwable cause) {
+ super(message_id, msgParams, cause);
+ }
+
+}
Added: incubator/ode/scratch/ode/src/main/java/org/apache/ode/correlation/CorrelationServiceFactory.java
URL: http://svn.apache.org/viewcvs/incubator/ode/scratch/ode/src/main/java/org/apache/ode/correlation/CorrelationServiceFactory.java?rev=381694&view=auto
==============================================================================
--- incubator/ode/scratch/ode/src/main/java/org/apache/ode/correlation/CorrelationServiceFactory.java (added)
+++ incubator/ode/scratch/ode/src/main/java/org/apache/ode/correlation/CorrelationServiceFactory.java Tue Feb 28 08:31:48 2006
@@ -0,0 +1,64 @@
+/*
+ * Copyright 2006 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.ode.correlation;
+
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.ode.engine.ProcessService;
+import org.apache.ode.util.BPEProperties;
+import org.apache.ode.util.BPException;
+
+/**
+ * Factory that instantiates and initializes the CorrelationService
+ * implementation named in the engine properties.
+ */
+public class CorrelationServiceFactory
+{
+
+    private static Logger logger =
+        Logger.getLogger(CorrelationServiceFactory.class.getName());
+
+    /**
+     * Create the correlation service configured in the properties.
+     * <p>
+     * The class named by props.getCorrelationServiceClass() is loaded,
+     * instantiated through its no-argument constructor and initialized
+     * with init(props, ps).
+     * @param props the engine properties naming the implementation class
+     * @param ps the process service handed to the new correlation service
+     * @return the initialized correlation service
+     * @throws BPException CLASS_NOT_FOUND if the class cannot be loaded,
+     * NATIVE_EXCEPTION if it cannot be instantiated or accessed (the
+     * original exception is kept as the cause in all three cases), or
+     * whatever init raises
+     */
+    public static CorrelationService createCorrelationService(BPEProperties props,
+            ProcessService ps)
+            throws BPException {
+
+        // read the class name once so the error message reports exactly
+        // the name that was tried
+        final String implClassName = props.getCorrelationServiceClass();
+
+        CorrelationService cs = null;
+
+        try {
+            // load the implementation
+            Class instClass = java.lang.Class.forName(implClassName);
+            // try to instantiate the subclass
+            cs = (CorrelationService) instClass.newInstance();
+            cs.init(props,ps);
+
+        } catch (ClassNotFoundException e) {
+            // keep the cause so the class loading failure stays diagnosable
+            CorrelationServiceException bpx = new CorrelationServiceException("CLASS_NOT_FOUND",new Object[] {implClassName},e);
+            bpx.log(logger,Level.SEVERE);
+            throw bpx;
+        } catch (InstantiationException e) {
+            CorrelationServiceException bpx = new CorrelationServiceException("NATIVE_EXCEPTION",new Object[] {"InstantiationException"},e);
+            bpx.log(logger,Level.SEVERE);
+            throw bpx;
+        } catch (IllegalAccessException e) {
+            CorrelationServiceException bpx = new CorrelationServiceException("NATIVE_EXCEPTION",new Object[] {"IllegalAccessException"},e);
+            bpx.log(logger,Level.SEVERE);
+            throw bpx;
+        }
+
+        return cs;
+    }
+
+}
Added: incubator/ode/scratch/ode/src/main/java/org/apache/ode/correlation/PartQuery.java
URL: http://svn.apache.org/viewcvs/incubator/ode/scratch/ode/src/main/java/org/apache/ode/correlation/PartQuery.java?rev=381694&view=auto
==============================================================================
--- incubator/ode/scratch/ode/src/main/java/org/apache/ode/correlation/PartQuery.java (added)
+++ incubator/ode/scratch/ode/src/main/java/org/apache/ode/correlation/PartQuery.java Tue Feb 28 08:31:48 2006
@@ -0,0 +1,115 @@
+/*
+ * Copyright 2006 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.ode.correlation;
+
+import java.io.Serializable;
+import java.util.HashMap;
+
+import org.apache.ode.interaction.IInvocation;
+import org.apache.ode.interaction.InteractionException;
+import org.apache.ode.interaction.InvocationFactory;
+
+/**
+ * A PartQuery is just an object that holds a part name and a query object for that part.
+ * StaticRegistrations and DynamicRegistrations build an array of these to represent
+ * metadata that is used to generate a correlation key value.
+ */
+public class PartQuery implements Serializable {
+
+    static final long serialVersionUID = -2542067136142474418L;
+
+    // fully qualified so that the file needs no additional imports
+    private static final java.util.logging.Logger logger =
+        java.util.logging.Logger.getLogger(PartQuery.class.getName());
+
+    private String part;
+    private Object obj;
+    private String locatorName;
+
+    /**
+     * Create an empty PartQuery; all properties are null until set.
+     */
+    public PartQuery(){}
+
+    /**
+     * Constructor for a PartQuery.
+     * <p>
+     * A query given as a String is turned into an XPath node value
+     * invocation using the supplied namespace map; any other object is
+     * stored as it is. If the String cannot be turned into an invocation
+     * the failure is logged and the query is left null.
+     * @param part the part name
+     * @param obj the query, either an XPath expression String or a query object
+     * @param locatorName the locator name
+     * @param nsMap the namespace map used when obj is an XPath expression
+     */
+    public PartQuery(String part, Object obj, String locatorName, HashMap nsMap) {
+        this.part = part;
+        if ( obj instanceof String )
+        {
+            try
+            {
+                IInvocation invocation =
+                    InvocationFactory.newInstance().
+                        createXPathQueryNodeValueInvocation( ( String ) obj, nsMap);
+                this.obj = invocation;
+            } catch (InteractionException e)
+            {
+                // report through the logger instead of the raw stack trace on
+                // standard error; the query deliberately stays null
+                logger.log(java.util.logging.Level.SEVERE,
+                    "Unable to create the query invocation for part '" + part
+                    + "' from: " + obj, e);
+            }
+
+        }
+        else
+        {
+            this.obj = obj;
+        }
+
+        this.locatorName = locatorName;
+    }
+
+    /**
+     * Set the part name.
+     * @param part The part name.
+     */
+    public void setPartName(String part) {
+        this.part = part;
+    }
+
+    /**
+     * Get the part name.
+     * @return The part name.
+     */
+    public String getPartName() {
+        return this.part;
+    }
+
+    /**
+     * Set the query object. Unlike the constructor, this stores the object
+     * as it is, without converting a String into an invocation.
+     * @param obj The query.
+     */
+    public void setQuery(Object obj) {
+        this.obj = obj;
+    }
+
+    /**
+     * Get the query object.
+     * @return The query object; may be null.
+     */
+    public Object getQuery() {
+        return this.obj;
+    }
+
+    /**
+     * Get the locator name.
+     * @return The locator name.
+     */
+    public String getLocatorName() {
+        return locatorName;
+    }
+
+    /**
+     * Set the locator name.
+     * @param string The locator name.
+     */
+    public void setLocatorName(String string) {
+        locatorName = string;
+    }
+
+
+}
Added: incubator/ode/scratch/ode/src/main/java/org/apache/ode/correlation/Registration.java
URL: http://svn.apache.org/viewcvs/incubator/ode/scratch/ode/src/main/java/org/apache/ode/correlation/Registration.java?rev=381694&view=auto
==============================================================================
--- incubator/ode/scratch/ode/src/main/java/org/apache/ode/correlation/Registration.java (added)
+++ incubator/ode/scratch/ode/src/main/java/org/apache/ode/correlation/Registration.java Tue Feb 28 08:31:48 2006
@@ -0,0 +1,204 @@
+/*
+ * Copyright 2006 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+/*
+ * Created on Aug 12, 2003
+ *
+ */
+package org.apache.ode.correlation;
+
+import java.io.Serializable;
+
+/**
+ * Serializable value object bundling the seven string attributes of a
+ * correlation registration: static key value, operation id, root definition
+ * id, definition id, root process id, process id and key value.
+ * <p>
+ * Two registrations are equal when all seven attributes are equal, where
+ * <code>null</code> only matches <code>null</code>. {@link #hashCode()} is
+ * consistent with {@link #equals(Object)}. The attributes are mutable, so do
+ * not change an instance while it is stored in a hash-based collection.
+ *
+ * @author charper
+ */
+public class Registration implements Serializable {
+
+    static final long serialVersionUID = 2061953613411330775L;
+
+    private String staticKeyValue;
+    private String rootDefId;
+    private String defId;
+    private String rootProcId;
+    private String procId;
+    private String keyValue;
+    private String operationId;
+
+    /**
+     * Creates a registration from its seven attributes. Any of them may be
+     * <code>null</code>.
+     *
+     * @param staticKeyValue the static key value
+     * @param operationId    the operation id
+     * @param rootDefId      the root definition id
+     * @param defId          the definition id
+     * @param rootProcId     the root process id
+     * @param procId         the process id
+     * @param keyValue       the key value
+     */
+    public Registration(String staticKeyValue, String operationId, String rootDefId, String defId,
+            String rootProcId, String procId, String keyValue) {
+        this.staticKeyValue = staticKeyValue;
+        this.rootDefId = rootDefId;
+        this.defId = defId;
+        this.rootProcId = rootProcId;
+        this.procId = procId;
+        this.keyValue = keyValue;
+        this.operationId = operationId;
+    }
+
+    /**
+     * @return the definition id
+     */
+    public String getDefId() {
+        return defId;
+    }
+
+    /**
+     * @return the key value
+     */
+    public String getKeyValue() {
+        return keyValue;
+    }
+
+    /**
+     * @return the process id
+     */
+    public String getProcId() {
+        return procId;
+    }
+
+    /**
+     * @return the root definition id
+     */
+    public String getRootDefId() {
+        return rootDefId;
+    }
+
+    /**
+     * @return the root process id
+     */
+    public String getRootProcId() {
+        return rootProcId;
+    }
+
+    /**
+     * @return the static key value
+     */
+    public String getStaticKeyValue() {
+        return staticKeyValue;
+    }
+
+    /**
+     * @param string the definition id
+     */
+    public void setDefId(String string) {
+        defId = string;
+    }
+
+    /**
+     * @param string the key value
+     */
+    public void setKeyValue(String string) {
+        keyValue = string;
+    }
+
+    /**
+     * @param string the process id
+     */
+    public void setProcId(String string) {
+        procId = string;
+    }
+
+    /**
+     * @param string the root definition id
+     */
+    public void setRootDefId(String string) {
+        rootDefId = string;
+    }
+
+    /**
+     * @param string the root process id
+     */
+    public void setRootProcId(String string) {
+        rootProcId = string;
+    }
+
+    /**
+     * @param key the static key value
+     */
+    public void setStaticKeyValue(String key) {
+        staticKeyValue = key;
+    }
+
+    /**
+     * @return the operation id
+     */
+    public String getOperationId() {
+        return operationId;
+    }
+
+    /**
+     * @param string the operation id
+     */
+    public void setOperationId(String string) {
+        operationId = string;
+    }
+
+    /* (non-Javadoc)
+     * @see java.lang.Object#toString()
+     */
+    public String toString() {
+        return "Registration(staticKeyValue:" + staticKeyValue +
+            " rootDefId:" + rootDefId +
+            " defId:" + defId +
+            " rootProcId:" + rootProcId +
+            " procId:" + procId +
+            " keyValue:" + keyValue +
+            " operationId:" + operationId + ")";
+    }
+
+    /**
+     * Compares all seven attributes, treating <code>null</code> as equal only
+     * to <code>null</code>.
+     * <p>
+     * The previous implementation chained unparenthesized <code>?:</code> and
+     * <code>&amp;&amp;</code> operators. Because <code>&amp;&amp;</code> binds
+     * tighter than <code>?:</code>, most of the comparisons were skipped and
+     * registrations differing in several attributes could compare as equal.
+     *
+     * @param obj the object to compare with
+     * @return <code>true</code> when <code>obj</code> is a Registration with
+     *         identical attribute values
+     * @see java.lang.Object#equals(java.lang.Object)
+     */
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (!(obj instanceof Registration)) {
+            return false;
+        }
+        Registration reg = (Registration) obj;
+        return sameValue(staticKeyValue, reg.getStaticKeyValue())
+            && sameValue(rootDefId, reg.getRootDefId())
+            && sameValue(defId, reg.getDefId())
+            && sameValue(rootProcId, reg.getRootProcId())
+            && sameValue(procId, reg.getProcId())
+            && sameValue(keyValue, reg.getKeyValue())
+            && sameValue(operationId, reg.getOperationId());
+    }
+
+    /**
+     * Hash code built from the same seven attributes as
+     * {@link #equals(Object)}, so equal registrations land in the same bucket
+     * of hash-based collections.
+     *
+     * @see java.lang.Object#hashCode()
+     */
+    public int hashCode() {
+        int result = 17;
+        result = 31 * result + hashOf(staticKeyValue);
+        result = 31 * result + hashOf(rootDefId);
+        result = 31 * result + hashOf(defId);
+        result = 31 * result + hashOf(rootProcId);
+        result = 31 * result + hashOf(procId);
+        result = 31 * result + hashOf(keyValue);
+        result = 31 * result + hashOf(operationId);
+        return result;
+    }
+
+    // Null-safe string equality: null matches only null.
+    private static boolean sameValue(String mine, String theirs) {
+        return mine == null ? theirs == null : mine.equals(theirs);
+    }
+
+    // Null-safe hash contribution of a single attribute.
+    private static int hashOf(String value) {
+        return value == null ? 0 : value.hashCode();
+    }
+}
Added: incubator/ode/scratch/ode/src/main/java/org/apache/ode/correlation/keys/CorrelationKeysUtil.java
URL: http://svn.apache.org/viewcvs/incubator/ode/scratch/ode/src/main/java/org/apache/ode/correlation/keys/CorrelationKeysUtil.java?rev=381694&view=auto
==============================================================================
--- incubator/ode/scratch/ode/src/main/java/org/apache/ode/correlation/keys/CorrelationKeysUtil.java (added)
+++ incubator/ode/scratch/ode/src/main/java/org/apache/ode/correlation/keys/CorrelationKeysUtil.java Tue Feb 28 08:31:48 2006
@@ -0,0 +1,150 @@
+/*
+ * Copyright 2006 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.ode.correlation.keys;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.ode.action.internal.ActionException;
+import org.apache.ode.context.resolver.ContextResolvedObject;
+import org.apache.ode.context.resolver.ContextResolver;
+import org.apache.ode.correlation.PartQuery;
+import org.apache.ode.definition.IPMDCorrelation;
+import org.apache.ode.definition.IPMDCorrelationSet;
+import org.apache.ode.definition.IPMDOperation;
+import org.apache.ode.interaction.IInteraction;
+import org.apache.ode.interaction.IInvocation;
+import org.apache.ode.interaction.InteractionException;
+import org.apache.ode.interaction.InteractionFactory;
+import org.apache.ode.util.BPException;
+
+public class CorrelationKeysUtil {
+
+    private static Logger logger =
+        Logger.getLogger(CorrelationKeysUtil.class.getName());
+
+    /**
+     * Tells whether any of the given operations carries a correlation with at
+     * least one correlation set.
+     *
+     * @param operations collection of IPMDOperation instances
+     * @return <code>true</code> as soon as one operation has a non-empty
+     *         list of correlation sets, <code>false</code> otherwise
+     */
+    public static boolean hasCorrelationSets(Collection operations) {
+        Iterator ops = operations.iterator();
+        while (ops.hasNext()) {
+            IPMDCorrelation candidate = ((IPMDOperation) ops.next()).getCorrelation();
+            if (candidate != null && !candidate.getCorrelationSets().isEmpty()) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Walks every instantiating correlation set of the given operations and
+     * fills in the context locations named by its part queries from the
+     * supplied message parts.
+     * <p>
+     * Sets whose pattern differs from <code>pattern</code> are skipped unless
+     * <code>pattern</code> is <code>null</code>. Processing of a set stops at
+     * the first part query whose context location already holds a value.
+     *
+     * @param resolver   resolves the locator name of each part query
+     * @param parts      message parts keyed by part name
+     * @param operations collection of IPMDOperation instances
+     * @param pattern    pattern to match, or <code>null</code> to accept any
+     * @return <code>true</code> if at least one unset context location was
+     *         visited (even when no matching part was present)
+     * @throws BPException if resolution or part extraction fails
+     */
+    public static boolean setCorrelationKeys(
+            ContextResolver resolver,
+            Map parts,
+            Collection operations,
+            String pattern)
+            throws BPException {
+
+        boolean touchedUnsetLocation = false;
+
+        logger.fine("Trying to Initailize a correlation for set.");
+
+        Iterator ops = operations.iterator();
+        while (ops.hasNext()) {
+            IPMDCorrelation correlation = ((IPMDOperation) ops.next()).getCorrelation();
+            if (correlation == null) {
+                continue;
+            }
+
+            // visit every correlation set and initialize the uninitialized ones
+            Iterator sets = correlation.getCorrelationSets().iterator();
+            while (sets.hasNext()) {
+                IPMDCorrelationSet set = (IPMDCorrelationSet) sets.next();
+
+                if (!set.isInstantiating()) {
+                    continue;
+                }
+                if (pattern != null && !set.getPattern().equals(pattern)) {
+                    continue;
+                }
+
+                Iterator queries = set.getPartQueries().iterator();
+                while (queries.hasNext()) {
+                    PartQuery query = (PartQuery) queries.next();
+                    ContextResolvedObject target =
+                        (ContextResolvedObject) resolver.resolveBPContext(query.getLocatorName());
+
+                    // the correlation is already set: stop working on this set
+                    if (target.getValue() != null) {
+                        break;
+                    }
+
+                    // see whether the message event has a matching part
+                    touchedUnsetLocation = true;
+                    Object payload = parts.get(query.getPartName());
+                    if (payload != null) {
+                        target.setObject(getPartData(payload, query));
+                    }
+                }
+            }
+        }
+        return touchedUnsetLocation;
+    }
+
+    /**
+     * Extracts the value for a part query from a message part.
+     * <p>
+     * A String part is wrapped in a new XML interaction built from its bytes;
+     * an IInteraction part is used directly. If the part query holds an
+     * invocation it is run against the interaction, otherwise the
+     * interaction's string form is returned.
+     *
+     * @param inputObj the message part, a String or an IInteraction
+     * @param pq       the part query supplying the invocation
+     * @return the invocation result, or the interaction's string form
+     * @throws InteractionException if the interaction cannot be created or invoked
+     * @throws ActionException if <code>inputObj</code> is of any other type
+     */
+    private static Object getPartData(Object inputObj, PartQuery pq)
+            throws InteractionException, ActionException {
+
+        IInteraction source;
+        if (inputObj instanceof IInteraction) {
+            source = (IInteraction) inputObj;
+        } else if (inputObj instanceof String) {
+            byte[] xml = ((String) inputObj).getBytes();
+            source = InteractionFactory.newInstance().createXMLInteraction(xml);
+        } else {
+            ActionException unknownType = new ActionException("UNKNOWN_DATATYPE", null);
+            unknownType.log(logger, Level.SEVERE);
+            throw unknownType;
+        }
+
+        IInvocation invocation = (IInvocation) pq.getQuery();
+        if (invocation == null) {
+            return source.toString();
+        }
+        return source.invoke(invocation);
+    }
+
+}
Added: incubator/ode/scratch/ode/src/main/java/org/apache/ode/correlation/managed/CorrelationServiceEjbImpl.java
URL: http://svn.apache.org/viewcvs/incubator/ode/scratch/ode/src/main/java/org/apache/ode/correlation/managed/CorrelationServiceEjbImpl.java?rev=381694&view=auto
==============================================================================
--- incubator/ode/scratch/ode/src/main/java/org/apache/ode/correlation/managed/CorrelationServiceEjbImpl.java (added)
+++ incubator/ode/scratch/ode/src/main/java/org/apache/ode/correlation/managed/CorrelationServiceEjbImpl.java Tue Feb 28 08:31:48 2006
@@ -0,0 +1,477 @@
+/*
+ * Copyright 2006 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+/*
+ * Created on Aug 25, 2003
+ *
+ */
+package org.apache.ode.correlation.managed;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import javax.ejb.CreateException;
+import javax.ejb.EJBException;
+import javax.ejb.FinderException;
+import javax.ejb.ObjectNotFoundException;
+import javax.ejb.RemoveException;
+import javax.naming.InitialContext;
+import javax.naming.NamingException;
+
+import org.apache.ode.correlation.CorrelationService;
+import org.apache.ode.correlation.CorrelationServiceException;
+import org.apache.ode.correlation.Registration;
+import org.apache.ode.correlation.managed.RegistrationEntityLocal;
+import org.apache.ode.correlation.managed.RegistrationEntityLocalHome;
+import org.apache.ode.correlation.unmanaged.CorrelationServiceSLImpl;
+import org.apache.ode.engine.ProcessService;
+import org.apache.ode.event.IStaticKey;
+import org.apache.ode.instance.IPMIProcess;
+import org.apache.ode.util.BPEProperties;
+import org.apache.ode.util.BPException;
+
+/**
+ * Correlation service that persists registrations through the
+ * RegistrationEntity local home looked up under {@link #JNDI_RELH_NAME}.
+ *
+ * <bkl>
+ * I added an in-memory caching layer to this service so that
+ * cmp registration operations could be bundled up at the
+ * end of the transaction rather than interspersed throughout.
+ * This bundling reduces the deadlock hazard.
+ * </bkl>
+ *
+ * Create and remove requests are queued in call order and mirrored in a
+ * transient {@link CorrelationServiceSLImpl}. Lookups combine the persisted
+ * rows with that pending state until {@link #persistRegistrationChanges()}
+ * replays the queue against the entity beans.
+ *
+ * @author charper
+ */
+public class CorrelationServiceEjbImpl extends CorrelationService
+{
+
+    private static Logger logger =
+        Logger.getLogger(CorrelationServiceEjbImpl.class.getName());
+
+    public static final String JNDI_RELH_NAME = "java:comp/env/registrationBean";
+
+    /** Pending create/remove operations, in the order they were requested. */
+    private final LinkedList registrationOperations = new LinkedList();
+    /** Registrations whose removal has been requested but not yet persisted. */
+    private final LinkedList registrationsForRemoval = new LinkedList();
+    private ProcessService ps;
+    private boolean isBPELCompliant;
+    private RegistrationEntityLocalHome relh;
+    /** Holds registrations created since the last flush; replaced on clearState(). */
+    private CorrelationServiceSLImpl transientCS;
+
+    public CorrelationServiceEjbImpl()
+    {
+        transientCS = new CorrelationServiceSLImpl();
+    }
+
+    /**
+     * Queues the creation of a registration and makes it visible to lookups
+     * through the transient service. Nothing is written to the entity beans
+     * until {@link #persistRegistrationChanges()} runs.
+     *
+     * @see org.apache.ode.correlation.CorrelationService#createRegistration(org.apache.ode.correlation.Registration)
+     */
+    public void createRegistration(Registration registration)
+        throws CorrelationServiceException
+    {
+        registrationOperations.add(new CreateRegistration(registration));
+        transientCS.createRegistration(registration);
+    }
+
+    /**
+     * Creates the entity bean for a registration.
+     *
+     * @throws CorrelationServiceException wrapping a CreateException
+     */
+    private void createRegistrationEntity(Registration registration)
+        throws CorrelationServiceException {
+        try {
+            relh.create(
+                registration.getStaticKeyValue(),
+                registration.getRootDefId(),
+                registration.getDefId(),
+                registration.getRootProcId(),
+                registration.getProcId(),
+                registration.getKeyValue(),
+                registration.getOperationId());
+        } catch (CreateException e) {
+            throw nativeFailure("CreateException", e);
+        }
+    }
+
+    /**
+     * Finds the registrations for a static key and a set of key values.
+     * <p>
+     * A <code>null</code> key value selects every persisted registration with
+     * the given static key; the correlation service decides how to handle
+     * multiple matches. Persisted matches are de-duplicated, registrations
+     * with a pending removal are dropped, and registrations created in the
+     * current transaction but not yet persisted are appended.
+     * <p>
+     * The screening and the transient merge used to run once per key value
+     * inside the loop, which appended the pending registrations repeatedly
+     * when several key values were supplied. They now run once.
+     *
+     * @see org.apache.ode.correlation.CorrelationService#getRegistrations(org.apache.ode.event.IStaticKey, java.util.Collection)
+     */
+    public Collection getRegistrations(IStaticKey key, Collection keyValues)
+        throws CorrelationServiceException {
+
+        // As before: with no key values nothing is looked up at all.
+        if (keyValues.isEmpty()) {
+            return new HashSet();
+        }
+
+        Collection persisted = new ArrayList();
+        Iterator it = keyValues.iterator();
+        while (it.hasNext()) {
+            String keyValue = (String) it.next();
+            Collection beans;
+            try {
+                if (keyValue == null) {
+                    beans = relh.findByStaticKey(key.toString());
+                } else {
+                    beans = relh.findByStaticKeyandKeyValue(key.toString(), keyValue);
+                }
+            } catch (FinderException e) {
+                throw nativeFailure("FinderException", e);
+            }
+
+            // De-duplicate with equals(); a hash set would rely on
+            // Registration.hashCode() being consistent with equals().
+            Iterator found = makeRegistrations(key.toString(), beans).iterator();
+            while (found.hasNext()) {
+                Object element = found.next();
+                if (!persisted.contains(element)) {
+                    persisted.add(element);
+                }
+            }
+        }
+
+        Collection regs = screenRegs(persisted);
+
+        //Add the registrations which were created in the current transaction
+        //but have not been persisted yet.
+        regs.addAll(transientCS.getRegistrations(key, keyValues));
+        return regs;
+    }
+
+    /**
+     * Finds the registrations of a process: persisted rows without a pending
+     * removal, followed by registrations pending creation.
+     *
+     * @see org.apache.ode.correlation.CorrelationService#getRegistrations(org.apache.ode.instance.service.IPMIProcess)
+     */
+    public Collection getRegistrations(IPMIProcess key)
+        throws CorrelationServiceException {
+        Collection beans;
+        try {
+            beans = relh.findByProcId(key.getKey());
+        } catch (FinderException e) {
+            throw nativeFailure("FinderException", e);
+        }
+        // NOTE(review): the process key is passed as the static key value of
+        // the resulting Registration objects - confirm this is intended.
+        Collection regs = screenRegs(makeRegistrations(key.getKey(), beans));
+
+        //Add the registrations which were created in the current transaction
+        //but have not been persisted yet.
+        regs.addAll(transientCS.getRegistrations(key));
+        return regs;
+    }
+
+    /**
+     * Finds the registrations for a static key: persisted rows without a
+     * pending removal, followed by registrations pending creation.
+     *
+     * @see org.apache.ode.correlation.CorrelationService#getRegistrations(org.apache.ode.event.IStaticKey)
+     */
+    public Collection getRegistrations(IStaticKey key)
+        throws CorrelationServiceException {
+        Collection beans;
+        try {
+            beans = relh.findByStaticKey(key.toString());
+        } catch (FinderException e) {
+            throw nativeFailure("FinderException", e);
+        }
+        Collection regs = screenRegs(makeRegistrations(key.toString(), beans));
+
+        //Add the registrations which were created in the current transaction
+        //but have not been persisted yet.
+        regs.addAll(transientCS.getRegistrations(key));
+        return regs;
+    }
+
+    // Called by the various getRegistrations methods.
+    // If a registration has been removed but the removal has not been
+    // persisted yet, don't report the registration.
+    private Collection screenRegs(Collection regs)
+    {
+        LinkedList retVal = new LinkedList();
+        Iterator iter = regs.iterator();
+        while (iter.hasNext())
+        {
+            Registration candidate = (Registration) iter.next();
+            if (!registrationsForRemoval.contains(candidate))
+            {
+                retVal.add(candidate);
+            }
+        }
+        return retVal;
+    }
+
+    // Converts entity beans into Registration value objects, using the given
+    // key as the static key value of each.
+    private Collection makeRegistrations(String key, Collection beans) {
+        ArrayList regs = new ArrayList();
+        Iterator it = beans.iterator();
+        while (it.hasNext()) {
+            RegistrationEntityLocal bean = (RegistrationEntityLocal) it.next();
+            regs.add(
+                new Registration(
+                    key,
+                    bean.getOperationId(),
+                    bean.getRootDefId(),
+                    bean.getDefId(),
+                    bean.getRootProcId(),
+                    bean.getProcId(),
+                    bean.getKeyValue()));
+        }
+        return regs;
+    }
+
+    /**
+     * No-op in this implementation.
+     *
+     * @see org.apache.ode.correlation.CorrelationService#update()
+     */
+    public void update() throws BPException {
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.ode.correlation.CorrelationService#getProcessService()
+     */
+    public ProcessService getProcessService() throws BPException {
+        return ps;
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.ode.correlation.CorrelationService#isBPELCompliant()
+     */
+    public boolean isBPELCompliant() {
+        return isBPELCompliant;
+    }
+
+    /**
+     * Stores the process service, reads the BPEL-compliance flag and looks up
+     * the registration entity home in JNDI.
+     *
+     * @throws CorrelationServiceException wrapping a NamingException when the
+     *         lookup of {@link #JNDI_RELH_NAME} fails
+     * @see org.apache.ode.correlation.CorrelationService#init(org.apache.ode.util.BPEProperties, org.apache.ode.engine.ProcessService)
+     */
+    public void init(BPEProperties props, ProcessService ps)
+        throws CorrelationServiceException {
+        this.ps = ps;
+        // Constant first so that an unset property means "not compliant"
+        // instead of a NullPointerException.
+        this.isBPELCompliant = "TRUE".equals(props.getBPELCompliant());
+
+        try {
+            InitialContext ic = new InitialContext();
+            relh = (RegistrationEntityLocalHome) ic.lookup(JNDI_RELH_NAME);
+        } catch (NamingException ne) {
+            throw nativeFailure("NamingException", ne);
+        }
+    }
+
+    /**
+     * Finds the single registration for a static key, operation and process.
+     * <p>
+     * A persisted match is ignored when its removal is pending; in that case,
+     * or when nothing is persisted, the transient service is consulted for a
+     * registration that has not been persisted yet.
+     *
+     * @return the registration, or whatever the transient service returns
+     *         when no persisted registration applies
+     * @see org.apache.ode.correlation.CorrelationService#getRegistration(org.apache.ode.event.IStaticKey, java.lang.String, java.lang.String)
+     */
+    public Registration getRegistration(IStaticKey key, String operationId, String processId) throws CorrelationServiceException {
+        Collection beans;
+        try {
+            beans = relh.findByStaticKeyandOpIdandProcId(key.toString(), operationId, processId);
+        } catch (FinderException e) {
+            throw nativeFailure("FinderException", e);
+        }
+        Iterator it = makeRegistrations(key.toString(), beans).iterator();
+
+        Registration retVal = null;
+
+        // there should only be one of these
+        if (it.hasNext())
+        {
+            retVal = (Registration) it.next();
+
+            //The registration could already have been deleted but the
+            //deletion may not have been persisted yet. Consult the
+            //removal list to find out.
+            if (registrationsForRemoval.contains(retVal))
+            {
+                retVal = null;
+            }
+        }
+
+        if (retVal == null)
+        {
+            //There could be a registration which has not been persisted yet.
+            //Consult the transient correlation service to find out.
+            retVal = transientCS.getRegistration(key, operationId, processId);
+        }
+
+        return retVal;
+    }
+
+    /**
+     * Queues the removal of a registration, drops it from the transient
+     * service and records it so that lookups stop reporting it. Nothing is
+     * removed from the entity beans until
+     * {@link #persistRegistrationChanges()} runs.
+     *
+     * @see org.apache.ode.correlation.CorrelationService#removeRegistration(org.apache.ode.correlation.Registration)
+     */
+    public void removeRegistration(Registration registration)
+        throws CorrelationServiceException
+    {
+        registrationOperations.add(new RemoveRegistration(registration));
+        transientCS.removeRegistration(registration);
+        registrationsForRemoval.add(registration);
+    }
+
+    /**
+     * Removes the entity bean of a registration, located by its primary key.
+     * Requests for registrations that do not exist are ignored; other finder,
+     * EJB and remove failures are logged as warnings and not propagated.
+     */
+    protected void removeRegistrationEntity(Registration registration)
+        throws CorrelationServiceException {
+        try {
+            RegistrationEntityPK pk = new RegistrationEntityPK();
+            pk.setOperationId(registration.getOperationId());
+            pk.setProcId(registration.getProcId());
+            pk.setRootDefId(registration.getRootDefId());
+            pk.setRootProcId(registration.getRootProcId());
+            pk.setStaticKeyValue(registration.getStaticKeyValue());
+            RegistrationEntityLocal bean = relh.findByPrimaryKey(pk);
+            bean.remove();
+        } catch (ObjectNotFoundException e) {
+            // ignore requests to remove registrations that are not there
+        } catch (FinderException e) {
+            logger.log(Level.WARNING, "", e);
+        } catch (EJBException e) {
+            logger.log(Level.WARNING, "", e);
+        } catch (RemoveException e) {
+            logger.log(Level.WARNING, "", e);
+        }
+    }
+
+    // Delete old regs and insert new regs. This method
+    // was added so that the removal of the registrations from the
+    // database could be sequenced at the end of the transaction
+    // rather than interspersed throughout the transaction causing
+    // a deadlock hazard.
+    public void persistRegistrationChanges() throws CorrelationServiceException
+    {
+        // Replay in request order so a remove followed by a re-create of the
+        // same registration ends up created.
+        Iterator iter = registrationOperations.iterator();
+        while (iter.hasNext())
+        {
+            Object op = iter.next();
+            if (op instanceof RemoveRegistration)
+            {
+                removeRegistrationEntity(((RemoveRegistration) op).reg);
+            }
+            else if (op instanceof CreateRegistration)
+            {
+                createRegistrationEntity(((CreateRegistration) op).reg);
+            }
+        }
+        clearState();
+    }
+
+    /**
+     * Discards all pending operations and starts with a fresh transient
+     * service.
+     */
+    public void clearState()
+    {
+        registrationsForRemoval.clear();
+        registrationOperations.clear();
+        transientCS = new CorrelationServiceSLImpl();
+    }
+
+    // Builds the NATIVE_EXCEPTION service exception for a container failure,
+    // logs it at SEVERE and returns it for the caller to throw.
+    private static CorrelationServiceException nativeFailure(String kind, Exception cause)
+    {
+        CorrelationServiceException bpx =
+            new CorrelationServiceException("NATIVE_EXCEPTION", new Object[] {kind}, cause);
+        bpx.log(logger, Level.SEVERE);
+        return bpx;
+    }
+
+    /** Queued request to remove a registration's entity bean. */
+    private static class RemoveRegistration
+    {
+        public final Registration reg;
+
+        RemoveRegistration(Registration reg)
+        {
+            this.reg = reg;
+        }
+    }
+
+    /** Queued request to create a registration's entity bean. */
+    private static class CreateRegistration
+    {
+        public final Registration reg;
+
+        CreateRegistration(Registration reg)
+        {
+            this.reg = reg;
+        }
+    }
+}
Added: incubator/ode/scratch/ode/src/main/java/org/apache/ode/correlation/managed/RegistrationEntityBean.java
URL: http://svn.apache.org/viewcvs/incubator/ode/scratch/ode/src/main/java/org/apache/ode/correlation/managed/RegistrationEntityBean.java?rev=381694&view=auto
==============================================================================
--- incubator/ode/scratch/ode/src/main/java/org/apache/ode/correlation/managed/RegistrationEntityBean.java (added)
+++ incubator/ode/scratch/ode/src/main/java/org/apache/ode/correlation/managed/RegistrationEntityBean.java Tue Feb 28 08:31:48 2006
@@ -0,0 +1,226 @@
+/*
+ * Copyright 2006 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.ode.correlation.managed;
+import java.rmi.RemoteException;
+
+import javax.ejb.CreateException;
+import javax.ejb.EntityBean;
+import javax.ejb.EntityContext;
+
+/**
+ * This bean stores static registrations.
+ * @ejb.bean
+ * type="CMP"
+ * name="RegistrationEntity"
+ * jndi-name="BPE/RegistrationEntity"
+ * local-jndi-name="BPE/RegistrationEntity"
+ * view-type="local"
+ * cmp-version="2.x"
+ * schema="RegSchema"
+ * @ejb.persistence
+ * table-name="BPE_RegistrationEntity"
+ * @ejb.pk
+ * class="org.apache.ode.correlation.managed.RegistrationEntityPK"
+ * @ejb.home
+ * local-class="org.apache.ode.correlation.managed.RegistrationEntityLocalHome"
+ * @ejb.interface
+ * local-class="org.apache.ode.correlation.managed.RegistrationEntityLocal"
+ * @ejb.transaction type="Mandatory"
+ *@ejb.finder
+ * signature="java.util.Collection findByStaticKey(java.lang.String staticKey)"
+ * query="SELECT OBJECT(rs) FROM RegSchema rs WHERE rs.staticKeyValue = ?1"
+ *@ejb.finder
+ * signature="java.util.Collection findByStaticKeyandKeyValue(java.lang.String staticKey, java.lang.String keyValue)"
+ * query="SELECT OBJECT(rs) FROM RegSchema rs WHERE rs.staticKeyValue = ?1 AND rs.keyValue = ?2"
+ * @ejb.finder
+ * signature="java.util.Collection findByStaticKeyandOpIdandProcId(java.lang.String staticKey, java.lang.String opId, java.lang.String procId)"
+ * query="SELECT OBJECT(rs) FROM RegSchema rs WHERE rs.staticKeyValue = ?1 AND rs.operationId = ?2 AND rs.rootProcId = ?3"
+ * @ejb.finder
+ * signature="java.util.Collection findByProcId(java.lang.String procId)"
+ * query="SELECT OBJECT(rs) FROM RegSchema rs WHERE rs.rootProcId = ?1"
+ * @ejb.finder
+ * signature="java.util.Collection findByRootDefId(java.lang.String rootDefId)"
+ * query="SELECT OBJECT(rs) FROM RegSchema rs WHERE rs.rootDefId = ?1"
+ *
+ *
+*/
+
+public abstract class RegistrationEntityBean implements EntityBean {
+
+ // NOTE(review): kept for serialization stability of the bean class;
+ // confirm the target containers actually require it.
+ static final long serialVersionUID = -887111238106204156L;
+ /**
+ * Abstract accessor for the definition id, mapped to column "defId".
+ * Not part of the primary key.
+ * @return
+ * @ejb.interface-method
+ * @ejb.persistence column-name="defId"
+ */
+ public abstract String getDefId();
+
+ /**
+ * Abstract accessor for the key value, mapped to column "keyValue".
+ * Not part of the primary key.
+ * @return
+ * @ejb.interface-method
+ * @ejb.persistence column-name="keyValue"
+ */
+ public abstract String getKeyValue();
+
+ /**
+ * Abstract accessor for the static key value, mapped to column
+ * "staticKeyValue". Primary-key field.
+ * @return
+ * @ejb.interface-method
+ * @ejb.pk-field
+ * @ejb.persistence column-name="staticKeyValue"
+ */
+ public abstract String getStaticKeyValue();
+
+ /**
+ * Abstract accessor for the root process id, mapped to column
+ * "rootProcId". Primary-key field.
+ * @return
+ * @ejb.interface-method
+ * @ejb.pk-field
+ * @ejb.persistence column-name="rootProcId"
+ */
+ public abstract String getRootProcId();
+
+ /**
+ * Abstract accessor for the operation id, mapped to column
+ * "operationId". Primary-key field.
+ * @return
+ * @ejb.interface-method
+ * @ejb.pk-field
+ * @ejb.persistence column-name="operationId"
+ */
+ public abstract String getOperationId();
+
+ /**
+ * Abstract accessor for the process id, mapped to column "procId".
+ * Primary-key field.
+ * @return
+ * @ejb.interface-method
+ * @ejb.pk-field
+ * @ejb.persistence column-name="procId"
+ */
+ public abstract String getProcId();
+
+ /**
+ * Abstract accessor for the root definition id, mapped to column
+ * "rootDefId". Primary-key field.
+ * @return
+ * @ejb.interface-method
+ * @ejb.pk-field
+ * @ejb.persistence column-name="rootDefId"
+ */
+ public abstract String getRootDefId();
+
+ /**
+ * Abstract mutator for the definition id.
+ * @param string
+ * @ejb.interface-method
+ */
+ public abstract void setDefId(String string);
+
+ /**
+ * Abstract mutator for the key value.
+ * @param string
+ * @ejb.interface-method
+ */
+ public abstract void setKeyValue(String string);
+
+ /**
+ * Abstract mutator for the operation id (a primary-key field).
+ * @param string
+ * @ejb.interface-method
+ */
+ public abstract void setOperationId(String string);
+
+ /**
+ * Abstract mutator for the process id (a primary-key field).
+ * NOTE(review): unlike the other primary-key setters this one carries no
+ * interface-method tag - confirm whether that asymmetry is deliberate.
+ * @param string
+ */
+ public abstract void setProcId(String string);
+
+ /**
+ * Abstract mutator for the root definition id (a primary-key field).
+ * @param string
+ * @ejb.interface-method
+ */
+ public abstract void setRootDefId(String string);
+ /**
+ * Abstract mutator for the root process id (a primary-key field).
+ * NOTE(review): carries no interface-method tag, same as setProcId -
+ * confirm whether that is deliberate.
+ * @param string
+ */
+ public abstract void setRootProcId(String string);
+
+ /**
+ * Abstract mutator for the static key value (a primary-key field).
+ * @param string
+ * @ejb.interface-method
+ */
+ public abstract void setStaticKeyValue(String string);
+
+ /**
+ * Initializes all seven persistent fields of a new entity from the
+ * arguments, using the abstract setters.
+ * Note the parameter order differs from the Registration constructor:
+ * operationId comes last here.
+ * @ejb.create-method
+ */
+ public RegistrationEntityPK ejbCreate(
+ String staticKeyValue,
+ String rootDefId,
+ String defId,
+ String rootProcId,
+ String procId,
+ String keyValue,
+ String operationId)
+ throws CreateException {
+
+ //Called by container after setEntityContext
+ // Use the abstract methods to set parameters
+ setStaticKeyValue(staticKeyValue);
+ setRootDefId(rootDefId);
+ setDefId(defId);
+ setRootProcId(rootProcId);
+ setProcId(procId);
+ setKeyValue(keyValue);
+ setOperationId(operationId);
+ // For container-managed persistence the container derives the primary
+ // key from the pk fields set above, so no key object is built here.
+ return null;
+ }
+
+ /** Container callback; intentionally empty. */
+ public void ejbActivate() {
+ //Called by container before bean
+ //swapped into memory
+ }
+
+ /**
+ * Container callback matching ejbCreate; takes the same arguments and
+ * intentionally does nothing.
+ */
+ public void ejbPostCreate(
+ String staticKeyValue,
+ String rootDefId,
+ String defId,
+ String rootProcId,
+ String procId,
+ String keyValue,
+ String operationId) {
+ //Called by container after ejbCreate
+ }
+
+ /** Container callback; intentionally empty. */
+ public void ejbPassivate() {
+ //Called by container before
+ //bean swapped into storage
+ }
+
+ /** Container callback; intentionally empty. */
+ public void ejbRemove() throws RemoteException {
+ //Called by container before
+ //data removed from database
+ }
+
+ /** Container callback; intentionally empty. */
+ public void ejbLoad() {
+ //Called by container to
+ //refresh entity bean's state
+ }
+
+ /** Container callback; intentionally empty. */
+ public void ejbStore() {
+ //Called by container to save
+ //bean's state to database
+ }
+
+ /** Container callback; the supplied context is not retained. */
+ public void setEntityContext(EntityContext ctx) {
+ //Called by container to set bean context
+ }
+
+ /** Container callback; intentionally empty since no context is retained. */
+ public void unsetEntityContext() {
+ //Called by container to unset bean context
+ }
+
+}
Added: incubator/ode/scratch/ode/src/main/java/org/apache/ode/correlation/managed/RegistrationEntityPK.java
URL: http://svn.apache.org/viewcvs/incubator/ode/scratch/ode/src/main/java/org/apache/ode/correlation/managed/RegistrationEntityPK.java?rev=381694&view=auto
==============================================================================
--- incubator/ode/scratch/ode/src/main/java/org/apache/ode/correlation/managed/RegistrationEntityPK.java (added)
+++ incubator/ode/scratch/ode/src/main/java/org/apache/ode/correlation/managed/RegistrationEntityPK.java Tue Feb 28 08:31:48 2006
@@ -0,0 +1,230 @@
+/*
+ * Copyright 2006 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+/*
+ * Generated by XDoclet - Do not edit!
+ */
+package org.apache.ode.correlation.managed;
+
+/**
+ * Primary key for RegistrationEntity.
+ */
+public class RegistrationEntityPK
+    extends java.lang.Object
+    implements java.io.Serializable {
+
+    // No explicit serialVersionUID and no cached hashCode/toString fields:
+    // the original generator notes record that EAServer can't deal with them,
+    // so both values are recomputed on every call.
+
+    // NOTE(review): the key fields are public, presumably because the
+    // container needs direct access to composite-key fields - confirm
+    // before narrowing their visibility.
+    public java.lang.String operationId;
+    public java.lang.String procId;
+    public java.lang.String rootDefId;
+    public java.lang.String rootProcId;
+    public java.lang.String staticKeyValue;
+
+    /** Creates a key with every field {@code null}. */
+    public RegistrationEntityPK() {
+    }
+
+    /**
+     * Creates a fully populated key.
+     *
+     * @param operationId operation id part of the key (may be null)
+     * @param procId process id part of the key (may be null)
+     * @param rootDefId root definition id part of the key (may be null)
+     * @param rootProcId root process id part of the key (may be null)
+     * @param staticKeyValue static key value part of the key (may be null)
+     */
+    public RegistrationEntityPK(
+        java.lang.String operationId,
+        java.lang.String procId,
+        java.lang.String rootDefId,
+        java.lang.String rootProcId,
+        java.lang.String staticKeyValue) {
+        this.operationId = operationId;
+        this.procId = procId;
+        this.rootDefId = rootDefId;
+        this.rootProcId = rootProcId;
+        this.staticKeyValue = staticKeyValue;
+    }
+
+    /** @return the operation id part of the key, possibly null */
+    public java.lang.String getOperationId() {
+        return operationId;
+    }
+
+    /** @return the process id part of the key, possibly null */
+    public java.lang.String getProcId() {
+        return procId;
+    }
+
+    /** @return the root definition id part of the key, possibly null */
+    public java.lang.String getRootDefId() {
+        return rootDefId;
+    }
+
+    /** @return the root process id part of the key, possibly null */
+    public java.lang.String getRootProcId() {
+        return rootProcId;
+    }
+
+    /** @return the static key value part of the key, possibly null */
+    public java.lang.String getStaticKeyValue() {
+        return staticKeyValue;
+    }
+
+    /** @param operationId new operation id part of the key */
+    public void setOperationId(java.lang.String operationId) {
+        this.operationId = operationId;
+    }
+
+    /** @param procId new process id part of the key */
+    public void setProcId(java.lang.String procId) {
+        this.procId = procId;
+    }
+
+    /** @param rootDefId new root definition id part of the key */
+    public void setRootDefId(java.lang.String rootDefId) {
+        this.rootDefId = rootDefId;
+    }
+
+    /** @param rootProcId new root process id part of the key */
+    public void setRootProcId(java.lang.String rootProcId) {
+        this.rootProcId = rootProcId;
+    }
+
+    /** @param staticKeyValue new static key value part of the key */
+    public void setStaticKeyValue(java.lang.String staticKeyValue) {
+        this.staticKeyValue = staticKeyValue;
+    }
+
+    /**
+     * Sums the hash codes of the non-null key fields (null fields contribute 0),
+     * so two keys that are {@link #equals(Object) equal} always hash alike.
+     *
+     * @return the combined hash code; 0 when every field is null
+     */
+    public int hashCode() {
+        return hashOf(operationId)
+            + hashOf(procId)
+            + hashOf(rootDefId)
+            + hashOf(rootProcId)
+            + hashOf(staticKeyValue);
+    }
+
+    /**
+     * Two keys are equal when all five fields match pairwise, where two
+     * {@code null} values count as a match.
+     *
+     * The earlier implementation reset its running result to {@code true}
+     * whenever a field was null on both sides, which hid a mismatch found in a
+     * previously compared field; every field is now required to match.
+     *
+     * @param obj the object to compare with; may be null
+     * @return true only if {@code obj} is a RegistrationEntityPK with the same field values
+     */
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        // instanceof is false for null, so no separate null check is needed.
+        if (!(obj instanceof RegistrationEntityPK)) {
+            return false;
+        }
+        RegistrationEntityPK pk = (RegistrationEntityPK) obj;
+        return sameValue(operationId, pk.operationId)
+            && sameValue(procId, pk.procId)
+            && sameValue(rootDefId, pk.rootDefId)
+            && sameValue(rootProcId, pk.rootProcId)
+            && sameValue(staticKeyValue, pk.staticKeyValue);
+    }
+
+    /** @return String representation of this pk in the form of [.field1.field2.field3.]; null fields print as "null". */
+    public String toString() {
+        StringBuffer text = new StringBuffer("[.");
+        text.append(operationId).append('.');
+        text.append(procId).append('.');
+        text.append(rootDefId).append('.');
+        text.append(rootProcId).append('.');
+        text.append(staticKeyValue).append('.');
+        text.append(']');
+        return text.toString();
+    }
+
+    /** Null-safe string equality: two nulls match, a null never matches a non-null. */
+    private static boolean sameValue(String left, String right) {
+        return left == null ? right == null : left.equals(right);
+    }
+
+    /** Hash contribution of one field: 0 for null, otherwise the string's hash code. */
+    private static int hashOf(String value) {
+        return value == null ? 0 : value.hashCode();
+    }
+
+}
Added: incubator/ode/scratch/ode/src/main/java/org/apache/ode/correlation/unmanaged/CorrelationServiceSLImpl.java
URL: http://svn.apache.org/viewcvs/incubator/ode/scratch/ode/src/main/java/org/apache/ode/correlation/unmanaged/CorrelationServiceSLImpl.java?rev=381694&view=auto
==============================================================================
--- incubator/ode/scratch/ode/src/main/java/org/apache/ode/correlation/unmanaged/CorrelationServiceSLImpl.java (added)
+++ incubator/ode/scratch/ode/src/main/java/org/apache/ode/correlation/unmanaged/CorrelationServiceSLImpl.java Tue Feb 28 08:31:48 2006
@@ -0,0 +1,238 @@
+/*
+ * Copyright 2006 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+/*
+ * Created on Aug 25, 2003
+ *
+ */
+package org.apache.ode.correlation.unmanaged;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+
+import org.apache.ode.correlation.CorrelationService;
+import org.apache.ode.correlation.CorrelationServiceException;
+import org.apache.ode.correlation.Registration;
+import org.apache.ode.engine.ProcessService;
+import org.apache.ode.instance.IPMIProcess;
+import org.apache.ode.event.IStaticKey;
+import org.apache.ode.util.BPEProperties;
+import org.apache.ode.util.BPException;
+
+/**
+ * @author charper
+ *
+ */
+public class CorrelationServiceSLImpl extends CorrelationService {
+
+    private ProcessService ps;
+    private boolean isBPELCompliant;
+
+    /** static key value -> ArrayList of Registration */
+    private HashMap hashByStaticKey = new HashMap();
+    /** static key value -> HashMap (key value -> ArrayList of Registration) */
+    private HashMap hashByStaticKeyAndKeyValue = new HashMap();
+    /** static key value -> HashMap (operation id -> HashMap (root proc id -> Registration)) */
+    private HashMap hashByStaticKeyandOperationIdandProcId = new HashMap();
+    /** root proc id -> ArrayList of Registration */
+    private HashMap hashByProcId = new HashMap();
+
+    /**
+     * Indexes the registration in all four in-memory maps.
+     * The key-value index is skipped when the registration has no key value.
+     * In the operation index a later registration with the same static key,
+     * operation id and root process id replaces the earlier one.
+     *
+     * @param regisration the registration to index
+     * @throws CorrelationServiceException declared by the service contract; not thrown by this body
+     * @see org.apache.ode.correlation.CorrelationService#createRegistration(org.apache.ode.correlation.Registration)
+     */
+    public void createRegistration(Registration regisration)
+        throws CorrelationServiceException {
+
+        String key = regisration.getStaticKeyValue();
+
+        // static key index
+        listFor(hashByStaticKey, key).add(regisration);
+
+        // static key + dynamic key value index
+        String keyValue = regisration.getKeyValue();
+        if (keyValue != null) {
+            listFor(mapFor(hashByStaticKeyAndKeyValue, key), keyValue).add(regisration);
+        }
+
+        // static key + operation id + root process id index
+        String opId = regisration.getOperationId();
+        String procId = regisration.getRootProcId();
+        mapFor(mapFor(hashByStaticKeyandOperationIdandProcId, key), opId)
+            .put(procId, regisration);
+
+        // root process id index
+        listFor(hashByProcId, procId).add(regisration);
+    }
+
+    /**
+     * Looks up the registrations indexed under {@code key.getKey()}.
+     *
+     * @param key the process whose key is used for the lookup
+     * @return the registrations for that process; an empty collection (never
+     *         {@code null}) when there are none, matching
+     *         {@link #getRegistrations(IStaticKey)}
+     * @throws CorrelationServiceException declared by the service contract; not thrown by this body
+     */
+    public Collection getRegistrations(IPMIProcess key)
+        throws CorrelationServiceException {
+        ArrayList regs = (ArrayList) hashByProcId.get(key.getKey());
+        if (regs == null) {
+            return new ArrayList();
+        }
+        return regs;
+    }
+
+    /**
+     * Collects the registrations for a static key that match any of the given
+     * key values. A {@code null} key value selects every registration of the
+     * static key. Each registration appears at most once in the result, even
+     * when several key values select it.
+     *
+     * @param key static key; its {@code toString()} is the index key
+     * @param keyValues key values (Strings, possibly null entries) to match
+     * @return a new list of the matching registrations, possibly empty
+     * @throws CorrelationServiceException declared by the service contract; not thrown by this body
+     * @see org.apache.ode.correlation.CorrelationService#getRegistrations(org.apache.ode.event.IStaticKey, java.util.Collection)
+     */
+    public Collection getRegistrations(IStaticKey key, Collection keyValues)
+        throws CorrelationServiceException {
+
+        ArrayList regs = new ArrayList();
+        String staticKey = key.toString();
+        HashMap keyHash = (HashMap) hashByStaticKeyAndKeyValue.get(staticKey);
+        ArrayList keyList = (ArrayList) hashByStaticKey.get(staticKey);
+        if (keyHash == null && keyList == null) {
+            // Nothing registered for this static key.
+            return regs;
+        }
+
+        Iterator it = keyValues.iterator();
+        while (it.hasNext()) {
+            String keyValue = (String) it.next();
+            if (keyValue == null) {
+                addMissing(regs, keyList);
+            } else if (keyHash != null) {
+                addMissing(regs, (ArrayList) keyHash.get(keyValue));
+            }
+        }
+        return regs;
+    }
+
+    /**
+     * Returns every registration indexed under the static key.
+     *
+     * @param key static key; its {@code toString()} is the index key
+     * @return the internal list for that key, or a new empty list when there is none
+     * @throws CorrelationServiceException declared by the service contract; not thrown by this body
+     * @see org.apache.ode.correlation.CorrelationService#getRegistrations(org.apache.ode.event.IStaticKey)
+     */
+    public Collection getRegistrations(IStaticKey key)
+        throws CorrelationServiceException {
+        ArrayList regs = (ArrayList) hashByStaticKey.get(key.toString());
+        if (regs == null) {
+            return new ArrayList();
+        }
+        return regs;
+    }
+
+    /**
+     * No-op: this implementation keeps everything in memory and has nothing to flush.
+     *
+     * @see org.apache.ode.correlation.CorrelationService#update()
+     */
+    public void update() throws BPException {
+        // no update in stateless case
+    }
+
+    /**
+     * Stores the process service and reads the BPEL-compliance flag.
+     * The flag is true only when the property equals exactly {@code "TRUE"};
+     * a missing property is treated as false rather than failing.
+     *
+     * @param props engine properties supplying the BPEL-compliance setting
+     * @param ps process service returned later by {@link #getProcessService()}
+     * @throws CorrelationServiceException declared by the service contract; not thrown by this body
+     * @see org.apache.ode.correlation.CorrelationService#init(org.apache.ode.util.BPEProperties, org.apache.ode.engine.ProcessService)
+     */
+    public void init(BPEProperties props, ProcessService ps)
+        throws CorrelationServiceException {
+        this.ps = ps;
+        // Constant-first comparison avoids a NullPointerException when the
+        // property is absent; direct assignment keeps re-initialisation consistent.
+        this.isBPELCompliant = "TRUE".equals(props.getBPELCompliant());
+    }
+
+    /**
+     * @return the process service passed to {@code init}, or null before initialisation
+     * @see org.apache.ode.correlation.CorrelationService#getProcessService()
+     */
+    public ProcessService getProcessService() throws BPException {
+        return ps;
+    }
+
+    /**
+     * @return the BPEL-compliance flag read by {@code init}
+     * @see org.apache.ode.correlation.CorrelationService#isBPELCompliant()
+     */
+    public boolean isBPELCompliant() {
+        return isBPELCompliant;
+    }
+
+    /**
+     * Finds the single registration indexed under a static key, operation id
+     * and process id.
+     *
+     * @param key static key; its {@code toString()} is the index key
+     * @param operationId operation id of the registration
+     * @param processId process id the registration was indexed under
+     * @return the matching registration, or {@code null} when none is indexed
+     * @throws CorrelationServiceException declared by the service contract; not thrown by this body
+     */
+    public Registration getRegistration(IStaticKey key, String operationId,
+        String processId) throws CorrelationServiceException {
+        HashMap opHash =
+            (HashMap) hashByStaticKeyandOperationIdandProcId.get(key.toString());
+        if (opHash == null) {
+            return null;
+        }
+        HashMap procsHash = (HashMap) opHash.get(operationId);
+        if (procsHash == null) {
+            return null;
+        }
+        return (Registration) procsHash.get(processId);
+    }
+
+    /**
+     * Removes the registration from each of the four indexes it may be in.
+     * Indexes that hold no entry for the registration are left untouched.
+     *
+     * @param registration the registration to remove
+     * @throws CorrelationServiceException declared by the service contract; not thrown by this body
+     * @see org.apache.ode.correlation.CorrelationService#removeRegistration(org.apache.ode.correlation.Registration)
+     */
+    public void removeRegistration(Registration registration) throws CorrelationServiceException {
+        String key = registration.getStaticKeyValue();
+
+        // static key index
+        ArrayList byStaticKey = (ArrayList) hashByStaticKey.get(key);
+        if (byStaticKey != null) {
+            byStaticKey.remove(registration);
+        }
+
+        // static key + dynamic key value index; uses its own local so a missing
+        // key-value map never triggers a second removal from the static-key list
+        HashMap keyHash = (HashMap) hashByStaticKeyAndKeyValue.get(key);
+        if (keyHash != null) {
+            ArrayList byKeyValue = (ArrayList) keyHash.get(registration.getKeyValue());
+            if (byKeyValue != null) {
+                byKeyValue.remove(registration);
+            }
+        }
+
+        // static key + operation id + root process id index
+        String procId = registration.getRootProcId();
+        HashMap opHash = (HashMap) hashByStaticKeyandOperationIdandProcId.get(key);
+        if (opHash != null) {
+            HashMap procsHash = (HashMap) opHash.get(registration.getOperationId());
+            if (procsHash != null) {
+                procsHash.remove(procId);
+            }
+        }
+
+        // root process id index
+        ArrayList byProcId = (ArrayList) hashByProcId.get(procId);
+        if (byProcId != null) {
+            byProcId.remove(registration);
+        }
+    }
+
+    /** Returns the list stored under {@code key} in {@code index}, creating and storing it if absent. */
+    private static ArrayList listFor(HashMap index, Object key) {
+        ArrayList list = (ArrayList) index.get(key);
+        if (list == null) {
+            list = new ArrayList();
+            index.put(key, list);
+        }
+        return list;
+    }
+
+    /** Returns the nested map stored under {@code key} in {@code index}, creating and storing it if absent. */
+    private static HashMap mapFor(HashMap index, Object key) {
+        HashMap nested = (HashMap) index.get(key);
+        if (nested == null) {
+            nested = new HashMap();
+            index.put(key, nested);
+        }
+        return nested;
+    }
+
+    /** Appends each element of {@code source} (which may be null) that {@code target} does not already contain. */
+    private static void addMissing(ArrayList target, ArrayList source) {
+        if (source == null) {
+            return;
+        }
+        Iterator it = source.iterator();
+        while (it.hasNext()) {
+            Object candidate = it.next();
+            if (!target.contains(candidate)) {
+                target.add(candidate);
+            }
+        }
+    }
+
+}
Added: incubator/ode/scratch/ode/src/main/java/org/apache/ode/definition/IPMDAction.java
URL: http://svn.apache.org/viewcvs/incubator/ode/scratch/ode/src/main/java/org/apache/ode/definition/IPMDAction.java?rev=381694&view=auto
==============================================================================
--- incubator/ode/scratch/ode/src/main/java/org/apache/ode/definition/IPMDAction.java (added)
+++ incubator/ode/scratch/ode/src/main/java/org/apache/ode/definition/IPMDAction.java Tue Feb 28 08:31:48 2006
@@ -0,0 +1,67 @@
+/*
+ * Copyright 2006 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+/***********************************************************************
+ * Module: IPMDAction.java
+ * Author: waterman
+ * Purpose: Defines the Interface IPMDAction
+ ***********************************************************************/
+
+package org.apache.ode.definition;
+
+import java.util.Properties;
+
+import org.apache.ode.action.internal.IInternalAction;
+import org.apache.ode.definition.service.DefinitionServiceException;
+import org.apache.ode.util.BPException;
+
+/** An implementation of the interface holds the metadata needed to create an Action implementation. */
+public interface IPMDAction extends IPMDLocatorHolder {
+
+    /**
+     * A verbose, human-readable description of the action invocation logic.
+     * It lets a business process analyst annotate a business process design.
+     *
+     * @return a description of the Action implementation
+     */
+    String getDescription();
+
+    /**
+     * A short, human-readable tag for the action.
+     *
+     * @return a human-readable tag for the Action implementation
+     */
+    String getLabel();
+
+    /**
+     * Uses the metadata properties and an Action factory to instantiate an
+     * Action implementation. The metadata properties were deployed into the
+     * metadata repository through the mutator interface.
+     *
+     * @return the instantiated Action implementation
+     * @throws BPException declared by the contract for instantiation failures
+     */
+    IInternalAction getActionImpl() throws BPException;
+
+    /**
+     * The name of a class that implements the Action interface. The
+     * ActionFactory uses it to instantiate an Action implementation.
+     *
+     * @return the name of an Action class implementation
+     */
+    String getActionClass();
+
+    /**
+     * Sets the description of the action.
+     *
+     * @param description the new description
+     */
+    void setDescription(String description);
+
+    /**
+     * Returns the name/value pairs used to initialize an Action
+     * implementation. They are consumed by the ActionFactory.
+     *
+     * @return the metadata name/value pairs as a {@link Properties} object
+     */
+    Properties getMetadata();
+
+    /**
+     * @return the root this action definition is associated with
+     */
+    IPMDRoot getRoot();
+
+    /**
+     * Creates a name/value pair property in the metadata repository and
+     * associates it with the action definition.
+     *
+     * @param name the property name
+     * @param value the property value
+     * @throws DefinitionServiceException declared by the contract for repository failures
+     */
+    void addMetadata(String name, Object value) throws DefinitionServiceException;
+
+    /**
+     * Sets the metadata name/value pairs of the action definition.
+     *
+     * @param p the metadata properties
+     */
+    void setMetadata(Properties p);
+
+    /**
+     * Sets the name of the class that implements the Action interface.
+     *
+     * @param name the Action implementation class name
+     */
+    void setActionClass(String name);
+}