You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ode.apache.org by mr...@apache.org on 2008/12/03 20:51:02 UTC

svn commit: r723038 - in /ode/branches/restful: bpel-dao/src/main/java/org/apache/ode/bpel/dao/ dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/ dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/ dao-jpa/src/main/java/org/apache/ode/da...

Author: mriou
Date: Wed Dec  3 11:51:01 2008
New Revision: 723038

URL: http://svn.apache.org/viewvc?rev=723038&view=rev
Log:
More resource-oriented logic. Routing seems to work. Basic messaging activities (receive and onMessage) are resource aware.

Modified:
    ode/branches/restful/bpel-dao/src/main/java/org/apache/ode/bpel/dao/ProcessInstanceDAO.java
    ode/branches/restful/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/ProcessInstanceDaoImpl.java
    ode/branches/restful/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HProcessInstance.java
    ode/branches/restful/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HResourceRoute.java
    ode/branches/restful/dao-jpa/src/main/java/org/apache/ode/dao/jpa/ProcessInstanceDAOImpl.java
    ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java
    ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/engine/ODERESTProcess.java
    ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/memdao/ProcessInstanceDaoImpl.java
    ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v1/OdeInternalInstance.java
    ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v1/OutstandingRequestManager.java
    ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v1/RuntimeInstanceImpl.java
    ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/EH_EVENT.java
    ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/OScope.java
    ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/OdeInternalInstance.java
    ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/OutstandingRequestManager.java
    ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/REPLY.java
    ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/RuntimeInstanceImpl.java
    ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/ScopeFrame.java
    ode/branches/restful/runtimes/src/test/java/org/apache/ode/bpel/rtrep/v1/CoreBpelTest.java
    ode/branches/restful/runtimes/src/test/java/org/apache/ode/bpel/rtrep/v2/CoreBpelTest.java

Modified: ode/branches/restful/bpel-dao/src/main/java/org/apache/ode/bpel/dao/ProcessInstanceDAO.java
URL: http://svn.apache.org/viewvc/ode/branches/restful/bpel-dao/src/main/java/org/apache/ode/bpel/dao/ProcessInstanceDAO.java?rev=723038&r1=723037&r2=723038&view=diff
==============================================================================
--- ode/branches/restful/bpel-dao/src/main/java/org/apache/ode/bpel/dao/ProcessInstanceDAO.java (original)
+++ ode/branches/restful/bpel-dao/src/main/java/org/apache/ode/bpel/dao/ProcessInstanceDAO.java Wed Dec  3 11:51:01 2008
@@ -271,6 +271,8 @@
 
     public void setExecutionStateCounter(int stateCounter);
 
+    Set<String> getAllResourceRoutes();
+
     /**
      * Transport object holding the date of the first and last instance event along with the number events.
      */

Modified: ode/branches/restful/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/ProcessInstanceDaoImpl.java
URL: http://svn.apache.org/viewvc/ode/branches/restful/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/ProcessInstanceDaoImpl.java?rev=723038&r1=723037&r2=723038&view=diff
==============================================================================
--- ode/branches/restful/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/ProcessInstanceDaoImpl.java (original)
+++ ode/branches/restful/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/ProcessInstanceDaoImpl.java Wed Dec  3 11:51:01 2008
@@ -446,4 +446,13 @@
         resRoute.setInstance(_instance);
         getSession().save(resRoute);
     }
+
+    public Set<String> getAllResourceRoutes() {
+        Set<HResourceRoute> rr = _instance.getResourceRoutes();
+        HashSet<String> rs = new HashSet<String>();
+        for (HResourceRoute hResourceRoute : rr) {
+            rs.add(hResourceRoute.getUrl() + "~" + hResourceRoute.getMethod());
+        }
+        return rs;
+    }
 }

Modified: ode/branches/restful/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HProcessInstance.java
URL: http://svn.apache.org/viewvc/ode/branches/restful/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HProcessInstance.java?rev=723038&r1=723037&r2=723038&view=diff
==============================================================================
--- ode/branches/restful/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HProcessInstance.java (original)
+++ ode/branches/restful/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HProcessInstance.java Wed Dec  3 11:51:01 2008
@@ -50,6 +50,8 @@
 
     private Set<HMessageExchange> _messageExchanges = new HashSet<HMessageExchange>();
 
+    private Set<HResourceRoute> _resourceRoutes = new HashSet<HResourceRoute>();
+
     private HFaultData _fault;
 
     private HLargeData _jacobState;
@@ -154,6 +156,19 @@
     }
 
     /**
+     * @hibernate.set lazy="true" inverse="true" cascade="delete"
+     * @hibernate.collection-key column="PIID"
+     * @hibernate.collection-one-to-many class="org.apache.ode.daohib.bpel.hobj.HResourceRoute"
+     */
+    public Set<HResourceRoute> getResourceRoutes() {
+        return _resourceRoutes;
+    }
+
+    public void setResourceRoutes(Set<HResourceRoute> rroutes) {
+        _resourceRoutes = rroutes;
+    }
+
+    /**
      * @hibernate.property column="PREVIOUS_STATE"
      */
     public short getPreviousState() {

Modified: ode/branches/restful/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HResourceRoute.java
URL: http://svn.apache.org/viewvc/ode/branches/restful/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HResourceRoute.java?rev=723038&r1=723037&r2=723038&view=diff
==============================================================================
--- ode/branches/restful/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HResourceRoute.java (original)
+++ ode/branches/restful/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HResourceRoute.java Wed Dec  3 11:51:01 2008
@@ -60,7 +60,7 @@
     }
 
     /**
-     * @hibernate.many-to-one column="INSTANCE"
+     * @hibernate.many-to-one column="PIID"
      */
     public HProcessInstance getInstance() {
         return _instance;

Modified: ode/branches/restful/dao-jpa/src/main/java/org/apache/ode/dao/jpa/ProcessInstanceDAOImpl.java
URL: http://svn.apache.org/viewvc/ode/branches/restful/dao-jpa/src/main/java/org/apache/ode/dao/jpa/ProcessInstanceDAOImpl.java?rev=723038&r1=723037&r2=723038&view=diff
==============================================================================
--- ode/branches/restful/dao-jpa/src/main/java/org/apache/ode/dao/jpa/ProcessInstanceDAOImpl.java (original)
+++ ode/branches/restful/dao-jpa/src/main/java/org/apache/ode/dao/jpa/ProcessInstanceDAOImpl.java Wed Dec  3 11:51:01 2008
@@ -20,16 +20,7 @@
 package org.apache.ode.dao.jpa;
 
 import org.apache.ode.bpel.common.ProcessState;
-import org.apache.ode.bpel.dao.ActivityRecoveryDAO;
-import org.apache.ode.bpel.dao.BpelDAOConnection;
-import org.apache.ode.bpel.dao.CorrelationSetDAO;
-import org.apache.ode.bpel.dao.CorrelatorDAO;
-import org.apache.ode.bpel.dao.FaultDAO;
-import org.apache.ode.bpel.dao.ProcessDAO;
-import org.apache.ode.bpel.dao.ProcessInstanceDAO;
-import org.apache.ode.bpel.dao.ScopeDAO;
-import org.apache.ode.bpel.dao.ScopeStateEnum;
-import org.apache.ode.bpel.dao.XmlDataDAO;
+import org.apache.ode.bpel.dao.*;
 import org.apache.ode.bpel.evt.ProcessInstanceEvent;
 import org.w3c.dom.Element;
 
@@ -94,6 +85,8 @@
 	private Collection<ScopeDAO> _scopes = new ArrayList<ScopeDAO>();
 	@OneToMany(targetEntity=ActivityRecoveryDAOImpl.class,mappedBy="_instance",fetch=FetchType.LAZY,cascade={CascadeType.ALL})
     private Collection<ActivityRecoveryDAO> _recoveries = new ArrayList<ActivityRecoveryDAO>();
+	@OneToMany(targetEntity=ResourceRouteDAOImpl.class,mappedBy="_instance",fetch=FetchType.LAZY,cascade={CascadeType.ALL})
+    private Collection<ResourceRouteDAO> _resourceRoutes = new ArrayList<ResourceRouteDAO>();
 	@OneToOne(fetch=FetchType.LAZY,cascade={CascadeType.ALL}) @Column(name="FAULT_ID")
 	private FaultDAOImpl _fault;
 	@ManyToOne(fetch=FetchType.LAZY,cascade={CascadeType.PERSIST}) @Column(name="PROCESS_ID")
@@ -158,7 +151,17 @@
     }
 
     public void createResourceRoute(String url, String method, String pickResponseChannel, int selectorIdx) {
-        new ResourceRouteDAOImpl(url, method, pickResponseChannel, selectorIdx, this);
+        ResourceRouteDAOImpl rr = new ResourceRouteDAOImpl(url, method, pickResponseChannel, selectorIdx, this);
+        rr.setInstance(this);
+        _resourceRoutes.add(rr);
+    }
+
+    public Set<String> getAllResourceRoutes() {
+        HashSet<String> rs = new HashSet<String>();
+        for (ResourceRouteDAO resourceRoute : _resourceRoutes) {
+            rs.add(resourceRoute.getUrl() + "~" + resourceRoute.getMethod());
+        }
+        return rs;
     }
 
     public void finishCompletion() {

Modified: ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java
URL: http://svn.apache.org/viewvc/ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java?rev=723038&r1=723037&r2=723038&view=diff
==============================================================================
--- ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java (original)
+++ ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java Wed Dec  3 11:51:01 2008
@@ -21,10 +21,7 @@
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.net.URI;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Date;
-import java.util.List;
+import java.util.*;
 
 import javax.wsdl.Operation;
 import javax.xml.namespace.QName;
@@ -148,6 +145,8 @@
         _dao.setFault(faultData.getFaultName(), faultData.getExplanation(), faultData.getFaultLineNo(),
                 faultData.getActivityId(), faultData.getFaultMessage());
 
+        cleanupResourceRoutes();
+
         // send event
         ProcessInstanceStateChangeEvent evt = new ProcessInstanceStateChangeEvent();
         evt.setOldState(_dao.getState());
@@ -164,6 +163,8 @@
             ODEProcess.__log.debug("ProcessImpl " + _bpelProcess.getPID() + " completed OK.");
         }
 
+        cleanupResourceRoutes();
+
         // send event
         ProcessInstanceStateChangeEvent evt = new ProcessInstanceStateChangeEvent();
         evt.setOldState(_dao.getState());
@@ -319,6 +320,15 @@
         // TODO schedule a matcher to see if the message arrived already
     }
 
+    private void cleanupResourceRoutes() {
+        Set<String> routes = _dao.getAllResourceRoutes();
+        for (String route : routes) {
+            String[] resArr = route.split("~");
+            org.apache.ode.bpel.iapi.Resource res = new org.apache.ode.bpel.iapi.Resource(resArr[0], "application/xml", resArr[1]);
+            _bpelProcess._contexts.bindingContext.deactivateProvidedResource(res);
+        }
+    }
+
     public CorrelationKey readCorrelation(CorrelationSet cset) {
         ScopeDAO scopeDAO = _dao.getScope(cset.getScopeId());
         CorrelationSetDAO cs = scopeDAO.getCorrelationSet(cset.getName());

Modified: ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/engine/ODERESTProcess.java
URL: http://svn.apache.org/viewvc/ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/engine/ODERESTProcess.java?rev=723038&r1=723037&r2=723038&view=diff
==============================================================================
--- ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/engine/ODERESTProcess.java (original)
+++ ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/engine/ODERESTProcess.java Wed Dec  3 11:51:01 2008
@@ -73,56 +73,62 @@
         }
         mexdao.setProcess(getProcessDAO());
 
-        Resource instantiatingResource = getResource(mexdao.getResource());
-        InvocationStyle istyle = mexdao.getInvocationStyle();
+        try {
+            Resource instantiatingResource = getResource(mexdao.getResource());
+            InvocationStyle istyle = mexdao.getInvocationStyle();
+
+            if (instantiatingResource != null) {
+                ProcessInstanceDAO newInstance = getProcessDAO().createInstance(null);
+                newInstance.setInstantiatingUrl(mexdao.getResource());
+
+                // send process instance event
+                NewProcessInstanceEvent evt = new NewProcessInstanceEvent(getProcessModel().getQName(),
+                        getProcessDAO().getProcessId(), newInstance.getInstanceId());
+                evt.setMexId(mexdao.getMessageExchangeId());
+                saveEvent(evt, newInstance);
 
-        if (instantiatingResource != null) {
-            ProcessInstanceDAO newInstance = getProcessDAO().createInstance(null);
-            newInstance.setInstantiatingUrl(mexdao.getResource());
-
-            // send process instance event
-            NewProcessInstanceEvent evt = new NewProcessInstanceEvent(getProcessModel().getQName(),
-                    getProcessDAO().getProcessId(), newInstance.getInstanceId());
-            evt.setMexId(mexdao.getMessageExchangeId());
-            saveEvent(evt, newInstance);
-
-            mexdao.setCorrelationStatus(MyRoleMessageExchange.CorrelationStatus.CREATE_INSTANCE.toString());
-            mexdao.setInstance(newInstance);
-
-            doInstanceWork(mexdao.getInstance().getInstanceId(), new Callable<Void>() {
-                public Void call() {
-                    executeCreateInstance(mexdao);
-                    return null;
-                }
-            });
-        } else {
-            // TODO avoid reloading the resource routing, it's just been loaded by the server on mex creation
-            String[] urlMeth = mexdao.getResource().split("~");
-            ResourceRouteDAO rr = _contexts.dao.getConnection().getResourceRoute(urlMeth[0], urlMeth[1]);
-            // This really should have been caught by the server
-            if (rr == null) throw new BpelEngineException("NoSuchResource: " + mexdao.getResource());
-            mexdao.setInstance(rr.getInstance());
-            mexdao.setChannel(rr.getPickResponseChannel() + "&" + rr.getSelectorIdx());
+                mexdao.setCorrelationStatus(MyRoleMessageExchange.CorrelationStatus.CREATE_INSTANCE.toString());
+                mexdao.setInstance(newInstance);
 
-            if (istyle == InvocationStyle.TRANSACTED) {
                 doInstanceWork(mexdao.getInstance().getInstanceId(), new Callable<Void>() {
                     public Void call() {
-                        executeContinueInstanceMyRoleRequestReceived(mexdao);
+                        executeCreateInstance(mexdao);
                         return null;
                     }
                 });
-            } else /* non-transacted style */ {
-                WorkEvent we = new WorkEvent();
-                we.setType(WorkEvent.Type.MYROLE_INVOKE);
-                we.setIID(mexdao.getInstance().getInstanceId());
-                we.setMexId(mexdao.getMessageExchangeId());
-                // Could be different to this pid when routing to an older version
-                we.setProcessId(mexdao.getInstance().getProcess().getProcessId());
+            } else {
+                // TODO avoid reloading the resource routing, it's just been loaded by the server on mex creation
+                String[] urlMeth = mexdao.getResource().split("~");
+                ResourceRouteDAO rr = _contexts.dao.getConnection().getResourceRoute(urlMeth[0], urlMeth[1]);
+                // This really should have been caught by the server
+                if (rr == null) throw new BpelEngineException("NoSuchResource: " + mexdao.getResource());
+                mexdao.setInstance(rr.getInstance());
+                mexdao.setChannel(rr.getPickResponseChannel() + "&" + rr.getSelectorIdx());
+
+                if (istyle == InvocationStyle.TRANSACTED) {
+                    doInstanceWork(mexdao.getInstance().getInstanceId(), new Callable<Void>() {
+                        public Void call() {
+                            executeContinueInstanceMyRoleRequestReceived(mexdao);
+                            return null;
+                        }
+                    });
+                } else /* non-transacted style */ {
+                    WorkEvent we = new WorkEvent();
+                    we.setType(WorkEvent.Type.MYROLE_INVOKE);
+                    we.setIID(mexdao.getInstance().getInstanceId());
+                    we.setMexId(mexdao.getMessageExchangeId());
+                    // Could be different to this pid when routing to an older version
+                    we.setProcessId(mexdao.getInstance().getProcess().getProcessId());
 
-                scheduleWorkEvent(we, null);
-            }
+                    scheduleWorkEvent(we, null);
+                }
 
+            }
+        } finally {
+            // If we did not get an ACK during this method, then mark this MEX as needing an ASYNC wake-up
+            if (mexdao.getStatus() != MessageExchange.Status.ACK) mexdao.setStatus(MessageExchange.Status.ASYNC);
         }
+
     }
 
     void onRestMexAck(MessageExchangeDAO mexdao, MessageExchange.Status old, String url) {

Modified: ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/memdao/ProcessInstanceDaoImpl.java
URL: http://svn.apache.org/viewvc/ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/memdao/ProcessInstanceDaoImpl.java?rev=723038&r1=723037&r2=723038&view=diff
==============================================================================
--- ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/memdao/ProcessInstanceDaoImpl.java (original)
+++ ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/memdao/ProcessInstanceDaoImpl.java Wed Dec  3 11:51:01 2008
@@ -246,6 +246,14 @@
         _resourceRoutes.put(url, rroute);
     }
 
+    public Set<String> getAllResourceRoutes() {
+        HashSet<String> rs = new HashSet<String>();
+        for (ResourceRouteDAO routeDAO : _resourceRoutes.values()) {
+            rs.add(routeDAO.getUrl() + "~" + routeDAO.getMethod());
+        }
+        return rs;
+    }
+
     /**
      * @see org.apache.ode.bpel.dao.ProcessInstanceDAO#getScopes(java.lang.String)
      */

Modified: ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v1/OdeInternalInstance.java
URL: http://svn.apache.org/viewvc/ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v1/OdeInternalInstance.java?rev=723038&r1=723037&r2=723038&view=diff
==============================================================================
--- ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v1/OdeInternalInstance.java (original)
+++ ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v1/OdeInternalInstance.java Wed Dec  3 11:51:01 2008
@@ -71,6 +71,8 @@
 
     void unregisterActivityForRecovery(ActivityRecoveryChannel recoveryChannel);
 
+    void cancelOutstandingRequests(String channelId);
+
     void select(PickResponseChannel pickResponseChannel, Date timeout, boolean createInstance, Selector[] selectors)
             throws FaultException;
 

Modified: ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v1/OutstandingRequestManager.java
URL: http://svn.apache.org/viewvc/ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v1/OutstandingRequestManager.java?rev=723038&r1=723037&r2=723038&view=diff
==============================================================================
--- ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v1/OutstandingRequestManager.java (original)
+++ ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v1/OutstandingRequestManager.java Wed Dec  3 11:51:01 2008
@@ -119,7 +119,7 @@
 
         Entry entry = _byChannel.remove(pickResponseChannel);
         if (entry != null) {
-            _byRid.values().remove(entry);
+            while(_byRid.values().remove(entry));
         }
     }
 

Modified: ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v1/RuntimeInstanceImpl.java
URL: http://svn.apache.org/viewvc/ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v1/RuntimeInstanceImpl.java?rev=723038&r1=723037&r2=723038&view=diff
==============================================================================
--- ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v1/RuntimeInstanceImpl.java (original)
+++ ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v1/RuntimeInstanceImpl.java Wed Dec  3 11:51:01 2008
@@ -108,6 +108,10 @@
 
     }
 
+    public void cancelOutstandingRequests(String channelId) {
+        getORM().cancel(channelId);
+    }
+
     public void select(PickResponseChannel pickResponseChannel, Date timeout, boolean createInstance, Selector[] selectors)
             throws FaultException {
 

Modified: ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/EH_EVENT.java
URL: http://svn.apache.org/viewvc/ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/EH_EVENT.java?rev=723038&r1=723037&r2=723038&view=diff
==============================================================================
--- ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/EH_EVENT.java (original)
+++ ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/EH_EVENT.java Wed Dec  3 11:51:01 2008
@@ -68,10 +68,8 @@
 
     /** Has a termination of this handler been requested. */
     private boolean _terminated;
-
     private boolean _childrenTerminated;
 
-
     EH_EVENT(ParentScopeChannel psc,TerminationChannel tc, EventHandlerControlChannel ehc, OEventHandler.OEvent o, ScopeFrame scopeFrame) {
         _scopeFrame = scopeFrame;
         _oevent = o;
@@ -80,9 +78,8 @@
         _ehc = ehc;
     }
 
-
     public void run() {
-        instance(new SELECT(_scopeFrame));
+        instance(new SELECT(_scopeFrame, 0));
     }
 
     /**
@@ -96,6 +93,7 @@
             _childrenTerminated = true;
         }
     }
+
     /**
      * Template that does the actual selection interaction with the runtime system, and
      * then waits on the pick response channel.
@@ -104,8 +102,11 @@
 
         private static final long serialVersionUID = 1L;
 
-        public SELECT(ScopeFrame scopeFrame) {
+        private int _counter;
+
+        public SELECT(ScopeFrame scopeFrame, int counter) {
             _scopeFrame = scopeFrame;
+            _counter = counter;
         }
 
         /**
@@ -117,7 +118,7 @@
                 PickResponseChannel pickResponseChannel = newChannel(PickResponseChannel.class);
                 if (_oevent.isRestful()) {
                     getBpelRuntime().checkResourceRoute(_scopeFrame.resolve(_oevent.resource),
-                            _oevent.messageExchangeId, pickResponseChannel, 0);
+                            _oevent.messageExchangeId+_counter, pickResponseChannel, 0);
                 } else {
                     CorrelationKey key;
                     PartnerLinkInstance pLinkInstance = _scopeFrame.resolve(_oevent.partnerLink);
@@ -133,17 +134,18 @@
                         assert key != null;
                     }
 
-                    selector =  new Selector(0,pLinkInstance,_oevent.operation.getName(), _oevent.operation.getOutput() == null, _oevent.messageExchangeId, key);
+                    selector =  new Selector(0,pLinkInstance,_oevent.operation.getName(),
+                            _oevent.operation.getOutput() == null, _oevent.messageExchangeId+_counter, key);
                     getBpelRuntime().select(pickResponseChannel, null, false, new Selector[] { selector} );
                 }
-                instance(new WAITING(pickResponseChannel, _scopeFrame));
+                instance(new WAITING(pickResponseChannel, _scopeFrame, _counter));
             } catch(FaultException e){
                 __log.error(e);
                 if (_fault == null) {
                     _fault = createFault(e.getQName(), _oevent);
                 }
                 terminateActive();
-                instance(new WAITING(null, _scopeFrame));
+                instance(new WAITING(null, _scopeFrame, _counter));
             }
         }
     }
@@ -154,10 +156,12 @@
     private class WAITING extends BpelJacobRunnable {
         private static final long serialVersionUID = 1L;
         private PickResponseChannel _pickResponseChannel;
+        private int _counter;
 
-        private WAITING(PickResponseChannel pickResponseChannel, ScopeFrame scopeFrame) {
+        private WAITING(PickResponseChannel pickResponseChannel, ScopeFrame scopeFrame, int counter) {
             _pickResponseChannel = pickResponseChannel;
             _scopeFrame = scopeFrame;
+            _counter = counter;
         }
 
         public void run() {
@@ -168,7 +172,6 @@
                 if (!_terminated) {
                     mlset.add(new TerminationChannelListener(_tc) {
                         private static final long serialVersionUID = 7666910462948788042L;
-
                         public void terminate() {
                             terminateActive();
                             _terminated = true;
@@ -177,13 +180,11 @@
                             instance(WAITING.this);
                         }
                     });
-
                 }
 
                 if (!_stopped) {
                     mlset.add(new EventHandlerControlChannelListener(_ehc) {
                         private static final long serialVersionUID = -1050788954724647970L;
-
                         public void stop() {
                             _stopped = true;
                             if (_pickResponseChannel != null)
@@ -191,18 +192,15 @@
                             instance(WAITING.this);
                         }
                     });
-
                 }
 
                 for (final ActivityInfo ai : _active) {
                     mlset.add(new ParentScopeChannelListener(ai.parent) {
                         private static final long serialVersionUID = 5341207762415360982L;
-
                         public void compensate(OScope scope, SynchChannel ret) {
                             _psc.compensate(scope, ret);
                             instance(WAITING.this);
                         }
-
                         public void completed(FaultData faultData, Set<CompensationHandler> compensations) {
                             _active.remove(ai);
                             _comps.addAll(compensations);
@@ -213,7 +211,6 @@
                             } else
                                 instance(WAITING.this);
                         }
-
                         public void cancelled() { completed(null, CompensationHandler.emptySet()); }
                         public void failure(String reason, Element data) { completed(null, CompensationHandler.emptySet()); }
                     });
@@ -228,6 +225,14 @@
                             ScopeFrame ehScopeFrame = new ScopeFrame(_oevent,
                                     getBpelRuntime().createScopeInstance(_scopeFrame.scopeInstanceId, _oevent),
                                     _scopeFrame, _comps, _fault);
+                            ehScopeFrame.eventScope = true;
+                            if (_oevent.isRestful()) {
+                                getBpelRuntime().associateEvent(_scopeFrame.resolve(_oevent.resource),
+                                        _oevent.messageExchangeId+_counter, _oevent.messageExchangeId+ehScopeFrame.scopeInstanceId);
+                            } else {
+                                getBpelRuntime().associateEvent(_scopeFrame.resolve(_oevent.partnerLink), _oevent.operation.getName(),
+                                        _oevent.messageExchangeId+_counter, _oevent.messageExchangeId+ehScopeFrame.scopeInstanceId);
+                            }
 
                             if (_oevent.variable != null) {
                                 Element msgEl = getBpelRuntime().getMyRequest(mexId);
@@ -250,7 +255,6 @@
                                 }
                             }
 
-
                             try {
                                 for (OScope.CorrelationSet cset : _oevent.initCorrelations) {
                                     initializeCorrelation(ehScopeFrame.resolve(cset), ehScopeFrame.resolve(_oevent.variable));
@@ -279,7 +283,7 @@
                                     _fault = createFault(e.getQName(), _oevent);
                                     terminateActive();
                                 }
-                                instance(new WAITING(null, _scopeFrame));
+                                instance(new WAITING(null, _scopeFrame, _counter));
                                 return;
                             }
 
@@ -299,18 +303,17 @@
                             if (_childrenTerminated) replication(child.self).terminate();
 
                             if (_terminated || _stopped || _fault != null)
-                                instance(new WAITING(null, _scopeFrame));
+                                instance(new WAITING(null, _scopeFrame, _counter));
                             else
-                                instance(new SELECT(_scopeFrame));
+                                instance(new SELECT(_scopeFrame, _counter+1));
                         }
 
-
                         public void onTimeout() {
-                            instance(new WAITING(null, _scopeFrame));
+                            instance(new WAITING(null, _scopeFrame, _counter));
                         }
 
                         public void onCancel() {
-                            instance(new WAITING(null, _scopeFrame));
+                            instance(new WAITING(null, _scopeFrame, _counter));
                         }
                     });
 

Modified: ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/OScope.java
URL: http://svn.apache.org/viewvc/ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/OScope.java?rev=723038&r1=723037&r2=723038&view=diff
==============================================================================
--- ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/OScope.java (original)
+++ ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/OScope.java Wed Dec  3 11:51:01 2008
@@ -214,7 +214,7 @@
         }
 
         public String getDescription() {
-            StringBuffer buf = new StringBuffer(declaringScope.name);
+            StringBuffer buf = new StringBuffer(declaringScope.name != null ? declaringScope.name : "");
             buf.append('.');
             buf.append(name);
             return buf.toString();

Modified: ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/OdeInternalInstance.java
URL: http://svn.apache.org/viewvc/ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/OdeInternalInstance.java?rev=723038&r1=723037&r2=723038&view=diff
==============================================================================
--- ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/OdeInternalInstance.java (original)
+++ ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/OdeInternalInstance.java Wed Dec  3 11:51:01 2008
@@ -77,6 +77,8 @@
 
     void unregisterActivityForRecovery(ActivityRecoveryChannel recoveryChannel);
 
+    void cancelOutstandingRequests(String channelId);
+
     void select(PickResponseChannel pickResponseChannel, Date timeout, boolean createInstance, Selector[] selectors)
             throws FaultException;
 
@@ -124,6 +126,10 @@
 
     void forceFlush();
 
+    void associateEvent(PartnerLinkInstance plinkInstance, String opName, String mexRef, String scopeIid);
+
+    void associateEvent(ResourceInstance resourceInstance, String mexRef, String scopeIid);
+
     void reply(PartnerLinkInstance plink, String opName, String bpelmex, Element element, QName fault)
             throws FaultException;
 

Modified: ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/OutstandingRequestManager.java
URL: http://svn.apache.org/viewvc/ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/OutstandingRequestManager.java?rev=723038&r1=723037&r2=723038&view=diff
==============================================================================
--- ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/OutstandingRequestManager.java (original)
+++ ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/OutstandingRequestManager.java Wed Dec  3 11:51:01 2008
@@ -140,10 +140,10 @@
 
         Entry entry = _byChannel.remove(pickResponseChannel);
         if (entry != null)
-            _byRid.values().remove(entry);
+            while(_byRid.values().remove(entry));
         RestEntry restEntry = _byRestChannel.remove(pickResponseChannel);
         if (restEntry != null)
-            _byRestRid.values().remove(restEntry);
+            while(_byRestRid.values().remove(restEntry));
     }
 
     /**
@@ -184,6 +184,20 @@
         }
     }
 
+    public void associateEvent(PartnerLinkInstance plinkInstance, String opName, String mexRef, String scopeIid) {
+        RequestIdTuple rid = new RequestIdTuple(plinkInstance, opName, mexRef);
+        Entry entry = _byRid.remove(rid);
+        rid.mexId = scopeIid;
+        _byRid.put(rid, entry);
+    }
+
+    public void associateEvent(ResourceInstance resourceInstance, String method, String mexRef, String scopeIid) {
+        RequestResTuple rid = new RequestResTuple(resourceInstance, method, mexRef);
+        RestEntry entry = _byRestRid.remove(rid);
+        rid.mexId = scopeIid;
+        _byRestRid.put(rid, entry);
+    }
+
     /**
      * Release the registration. This method is called when the reply activity sends a reply corresponding to the
      * registration.

Modified: ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/REPLY.java
URL: http://svn.apache.org/viewvc/ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/REPLY.java?rev=723038&r1=723037&r2=723038&view=diff
==============================================================================
--- ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/REPLY.java (original)
+++ ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/REPLY.java Wed Dec  3 11:51:01 2008
@@ -55,13 +55,17 @@
             for (OScope.CorrelationSet cset : oreply.initCorrelations)
                 initializeCorrelation(_scopeFrame.resolve(cset), _scopeFrame.resolve(oreply.variable));
 
+            // If this reply matches an event, we have to know which scope it fits in
+            ScopeFrame eventFrame = _scopeFrame.findEventScope();
+            String eventFrameId = eventFrame == null ? "" : eventFrame.scopeInstanceId.toString();
+
             // send reply
             if (oreply.resource != null)
                 getBpelRuntime().reply(_scopeFrame.resolve(oreply.resource), 
-                        oreply.messageExchangeId, (Element)msg, oreply.fault);
+                        oreply.messageExchangeId+eventFrameId, (Element)msg, oreply.fault);
             else
                 getBpelRuntime().reply(_scopeFrame.resolve(oreply.partnerLink), oreply.operation.getName(),
-                        oreply.messageExchangeId, (Element)msg, oreply.fault);
+                        oreply.messageExchangeId+eventFrameId, (Element)msg, oreply.fault);
         } catch (FaultException e) {
             __log.error(e);
             fault = createFault(e.getQName(), oreply);

Modified: ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/RuntimeInstanceImpl.java
URL: http://svn.apache.org/viewvc/ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/RuntimeInstanceImpl.java?rev=723038&r1=723037&r2=723038&view=diff
==============================================================================
--- ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/RuntimeInstanceImpl.java (original)
+++ ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/RuntimeInstanceImpl.java Wed Dec  3 11:51:01 2008
@@ -119,6 +119,10 @@
         return _brc.getInstantiatingUrl();
     }
 
+    public void cancelOutstandingRequests(String channelId) {
+        getORM().cancel(channelId);
+    }
+
     public void select(PickResponseChannel pickResponseChannel, Date timeout, boolean createInstance, Selector[] selectors)
             throws FaultException {
 
@@ -449,6 +453,14 @@
         _brc.sendEvent(evt);
     }
 
+    public void associateEvent(PartnerLinkInstance plinkInstance, String opName, String mexRef, String scopeIid) {
+        getORM().associateEvent(plinkInstance, opName, mexRef, scopeIid);
+    }
+
+    public void associateEvent(ResourceInstance resourceInstance, String mexRef, String scopeIid) {
+        getORM().associateEvent(resourceInstance, resourceInstance.getModel().getMethod(), mexRef, scopeIid);
+    }
+
     public void reply(PartnerLinkInstance plink, String opName, String bpelmex, Element element, QName fault) throws FaultException {
         String mexid = getORM().release(plink, opName, bpelmex);
         if (mexid == null)

Modified: ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/ScopeFrame.java
URL: http://svn.apache.org/viewvc/ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/ScopeFrame.java?rev=723038&r1=723037&r2=723038&view=diff
==============================================================================
--- ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/ScopeFrame.java (original)
+++ ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/ScopeFrame.java Wed Dec  3 11:51:01 2008
@@ -50,6 +50,8 @@
     
     final InstanceGlobals globals;
 
+    boolean eventScope = false;
+
     /** Constructor used to create "fault" scopes. */
     ScopeFrame( OScope scopeDef,
                 Long scopeInstanceId,
@@ -85,6 +87,12 @@
         return (parent != null) ? parent.find(scope) : null;
     }
 
+    public ScopeFrame findEventScope() {
+        if (eventScope) return this;
+        else if (parent == null) return null;
+        else return parent.findEventScope();
+    }
+
     public VariableInstance resolve(OScope.Variable variable) {
         ScopeFrame scopeFrame = find(variable.declaringScope);
         if (scopeFrame == null) return null;

Modified: ode/branches/restful/runtimes/src/test/java/org/apache/ode/bpel/rtrep/v1/CoreBpelTest.java
URL: http://svn.apache.org/viewvc/ode/branches/restful/runtimes/src/test/java/org/apache/ode/bpel/rtrep/v1/CoreBpelTest.java?rev=723038&r1=723037&r2=723038&view=diff
==============================================================================
--- ode/branches/restful/runtimes/src/test/java/org/apache/ode/bpel/rtrep/v1/CoreBpelTest.java (original)
+++ ode/branches/restful/runtimes/src/test/java/org/apache/ode/bpel/rtrep/v1/CoreBpelTest.java Wed Dec  3 11:51:01 2008
@@ -126,6 +126,10 @@
         //To change body of implemented methods use File | Settings | File Templates.
     }
 
+    public void cancelOutstandingRequests(String channelId) {
+        //To change body of implemented methods use File | Settings | File Templates.
+    }
+
     public void select(PickResponseChannel pickResponseChannel, Date timeout, boolean createInstance, Selector[] selectors) throws FaultException {
         //To change body of implemented methods use File | Settings | File Templates.
     }

Modified: ode/branches/restful/runtimes/src/test/java/org/apache/ode/bpel/rtrep/v2/CoreBpelTest.java
URL: http://svn.apache.org/viewvc/ode/branches/restful/runtimes/src/test/java/org/apache/ode/bpel/rtrep/v2/CoreBpelTest.java?rev=723038&r1=723037&r2=723038&view=diff
==============================================================================
--- ode/branches/restful/runtimes/src/test/java/org/apache/ode/bpel/rtrep/v2/CoreBpelTest.java (original)
+++ ode/branches/restful/runtimes/src/test/java/org/apache/ode/bpel/rtrep/v2/CoreBpelTest.java Wed Dec  3 11:51:01 2008
@@ -128,6 +128,14 @@
         return null;  //To change body of implemented methods use File | Settings | File Templates.
     }
 
+    public void associateEvent(PartnerLinkInstance plinkInstance, String opName, String mexRef, String scopeIid) {
+        //To change body of implemented methods use File | Settings | File Templates.
+    }
+
+    public void associateEvent(ResourceInstance resourceInstance, String mexRef, String scopeIid) {
+        //To change body of implemented methods use File | Settings | File Templates.
+    }
+
     public void reply(ResourceInstance resource, String bpelmex, Element element, QName fault) throws FaultException {
         //To change body of implemented methods use File | Settings | File Templates.
     }
@@ -144,6 +152,10 @@
         //To change body of implemented methods use File | Settings | File Templates.
     }
 
+    public void cancelOutstandingRequests(String channelId) {
+        //To change body of implemented methods use File | Settings | File Templates.
+    }
+
     public void select(PickResponseChannel pickResponseChannel, Date timeout, boolean createInstance, Selector[] selectors) throws FaultException {
         //To change body of implemented methods use File | Settings | File Templates.
     }