You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ode.apache.org by mr...@apache.org on 2008/12/03 20:51:02 UTC
svn commit: r723038 - in /ode/branches/restful:
bpel-dao/src/main/java/org/apache/ode/bpel/dao/
dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/
dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/
dao-jpa/src/main/java/org/apache/ode/da...
Author: mriou
Date: Wed Dec 3 11:51:01 2008
New Revision: 723038
URL: http://svn.apache.org/viewvc?rev=723038&view=rev
Log:
More resource-oriented logic. Routing seems to work. Basic messaging activities (receive and onMessage) are resource aware.
Modified:
ode/branches/restful/bpel-dao/src/main/java/org/apache/ode/bpel/dao/ProcessInstanceDAO.java
ode/branches/restful/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/ProcessInstanceDaoImpl.java
ode/branches/restful/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HProcessInstance.java
ode/branches/restful/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HResourceRoute.java
ode/branches/restful/dao-jpa/src/main/java/org/apache/ode/dao/jpa/ProcessInstanceDAOImpl.java
ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java
ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/engine/ODERESTProcess.java
ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/memdao/ProcessInstanceDaoImpl.java
ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v1/OdeInternalInstance.java
ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v1/OutstandingRequestManager.java
ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v1/RuntimeInstanceImpl.java
ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/EH_EVENT.java
ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/OScope.java
ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/OdeInternalInstance.java
ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/OutstandingRequestManager.java
ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/REPLY.java
ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/RuntimeInstanceImpl.java
ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/ScopeFrame.java
ode/branches/restful/runtimes/src/test/java/org/apache/ode/bpel/rtrep/v1/CoreBpelTest.java
ode/branches/restful/runtimes/src/test/java/org/apache/ode/bpel/rtrep/v2/CoreBpelTest.java
Modified: ode/branches/restful/bpel-dao/src/main/java/org/apache/ode/bpel/dao/ProcessInstanceDAO.java
URL: http://svn.apache.org/viewvc/ode/branches/restful/bpel-dao/src/main/java/org/apache/ode/bpel/dao/ProcessInstanceDAO.java?rev=723038&r1=723037&r2=723038&view=diff
==============================================================================
--- ode/branches/restful/bpel-dao/src/main/java/org/apache/ode/bpel/dao/ProcessInstanceDAO.java (original)
+++ ode/branches/restful/bpel-dao/src/main/java/org/apache/ode/bpel/dao/ProcessInstanceDAO.java Wed Dec 3 11:51:01 2008
@@ -271,6 +271,8 @@
public void setExecutionStateCounter(int stateCounter);
+ Set<String> getAllResourceRoutes();
+
/**
* Transport object holding the date of the first and last instance event along with the number events.
*/
Modified: ode/branches/restful/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/ProcessInstanceDaoImpl.java
URL: http://svn.apache.org/viewvc/ode/branches/restful/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/ProcessInstanceDaoImpl.java?rev=723038&r1=723037&r2=723038&view=diff
==============================================================================
--- ode/branches/restful/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/ProcessInstanceDaoImpl.java (original)
+++ ode/branches/restful/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/ProcessInstanceDaoImpl.java Wed Dec 3 11:51:01 2008
@@ -446,4 +446,13 @@
resRoute.setInstance(_instance);
getSession().save(resRoute);
}
+
+ public Set<String> getAllResourceRoutes() {
+ Set<HResourceRoute> rr = _instance.getResourceRoutes();
+ HashSet<String> rs = new HashSet<String>();
+ for (HResourceRoute hResourceRoute : rr) {
+ rs.add(hResourceRoute.getUrl() + "~" + hResourceRoute.getMethod());
+ }
+ return rs;
+ }
}
Modified: ode/branches/restful/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HProcessInstance.java
URL: http://svn.apache.org/viewvc/ode/branches/restful/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HProcessInstance.java?rev=723038&r1=723037&r2=723038&view=diff
==============================================================================
--- ode/branches/restful/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HProcessInstance.java (original)
+++ ode/branches/restful/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HProcessInstance.java Wed Dec 3 11:51:01 2008
@@ -50,6 +50,8 @@
private Set<HMessageExchange> _messageExchanges = new HashSet<HMessageExchange>();
+ private Set<HResourceRoute> _resourceRoutes = new HashSet<HResourceRoute>();
+
private HFaultData _fault;
private HLargeData _jacobState;
@@ -154,6 +156,19 @@
}
/**
+ * @hibernate.set lazy="true" inverse="true" cascade="delete"
+ * @hibernate.collection-key column="PIID"
+ * @hibernate.collection-one-to-many class="org.apache.ode.daohib.bpel.hobj.HResourceRoute"
+ */
+ public Set<HResourceRoute> getResourceRoutes() {
+ return _resourceRoutes;
+ }
+
+ public void setResourceRoutes(Set<HResourceRoute> rroutes) {
+ _resourceRoutes = rroutes;
+ }
+
+ /**
* @hibernate.property column="PREVIOUS_STATE"
*/
public short getPreviousState() {
Modified: ode/branches/restful/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HResourceRoute.java
URL: http://svn.apache.org/viewvc/ode/branches/restful/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HResourceRoute.java?rev=723038&r1=723037&r2=723038&view=diff
==============================================================================
--- ode/branches/restful/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HResourceRoute.java (original)
+++ ode/branches/restful/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HResourceRoute.java Wed Dec 3 11:51:01 2008
@@ -60,7 +60,7 @@
}
/**
- * @hibernate.many-to-one column="INSTANCE"
+ * @hibernate.many-to-one column="PIID"
*/
public HProcessInstance getInstance() {
return _instance;
Modified: ode/branches/restful/dao-jpa/src/main/java/org/apache/ode/dao/jpa/ProcessInstanceDAOImpl.java
URL: http://svn.apache.org/viewvc/ode/branches/restful/dao-jpa/src/main/java/org/apache/ode/dao/jpa/ProcessInstanceDAOImpl.java?rev=723038&r1=723037&r2=723038&view=diff
==============================================================================
--- ode/branches/restful/dao-jpa/src/main/java/org/apache/ode/dao/jpa/ProcessInstanceDAOImpl.java (original)
+++ ode/branches/restful/dao-jpa/src/main/java/org/apache/ode/dao/jpa/ProcessInstanceDAOImpl.java Wed Dec 3 11:51:01 2008
@@ -20,16 +20,7 @@
package org.apache.ode.dao.jpa;
import org.apache.ode.bpel.common.ProcessState;
-import org.apache.ode.bpel.dao.ActivityRecoveryDAO;
-import org.apache.ode.bpel.dao.BpelDAOConnection;
-import org.apache.ode.bpel.dao.CorrelationSetDAO;
-import org.apache.ode.bpel.dao.CorrelatorDAO;
-import org.apache.ode.bpel.dao.FaultDAO;
-import org.apache.ode.bpel.dao.ProcessDAO;
-import org.apache.ode.bpel.dao.ProcessInstanceDAO;
-import org.apache.ode.bpel.dao.ScopeDAO;
-import org.apache.ode.bpel.dao.ScopeStateEnum;
-import org.apache.ode.bpel.dao.XmlDataDAO;
+import org.apache.ode.bpel.dao.*;
import org.apache.ode.bpel.evt.ProcessInstanceEvent;
import org.w3c.dom.Element;
@@ -94,6 +85,8 @@
private Collection<ScopeDAO> _scopes = new ArrayList<ScopeDAO>();
@OneToMany(targetEntity=ActivityRecoveryDAOImpl.class,mappedBy="_instance",fetch=FetchType.LAZY,cascade={CascadeType.ALL})
private Collection<ActivityRecoveryDAO> _recoveries = new ArrayList<ActivityRecoveryDAO>();
+ @OneToMany(targetEntity=ResourceRouteDAOImpl.class,mappedBy="_instance",fetch=FetchType.LAZY,cascade={CascadeType.ALL})
+ private Collection<ResourceRouteDAO> _resourceRoutes = new ArrayList<ResourceRouteDAO>();
@OneToOne(fetch=FetchType.LAZY,cascade={CascadeType.ALL}) @Column(name="FAULT_ID")
private FaultDAOImpl _fault;
@ManyToOne(fetch=FetchType.LAZY,cascade={CascadeType.PERSIST}) @Column(name="PROCESS_ID")
@@ -158,7 +151,17 @@
}
public void createResourceRoute(String url, String method, String pickResponseChannel, int selectorIdx) {
- new ResourceRouteDAOImpl(url, method, pickResponseChannel, selectorIdx, this);
+ ResourceRouteDAOImpl rr = new ResourceRouteDAOImpl(url, method, pickResponseChannel, selectorIdx, this);
+ rr.setInstance(this);
+ _resourceRoutes.add(rr);
+ }
+
+ public Set<String> getAllResourceRoutes() {
+ HashSet<String> rs = new HashSet<String>();
+ for (ResourceRouteDAO resourceRoute : _resourceRoutes) {
+ rs.add(resourceRoute.getUrl() + "~" + resourceRoute.getMethod());
+ }
+ return rs;
}
public void finishCompletion() {
Modified: ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java
URL: http://svn.apache.org/viewvc/ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java?rev=723038&r1=723037&r2=723038&view=diff
==============================================================================
--- ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java (original)
+++ ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java Wed Dec 3 11:51:01 2008
@@ -21,10 +21,7 @@
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.net.URI;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Date;
-import java.util.List;
+import java.util.*;
import javax.wsdl.Operation;
import javax.xml.namespace.QName;
@@ -148,6 +145,8 @@
_dao.setFault(faultData.getFaultName(), faultData.getExplanation(), faultData.getFaultLineNo(),
faultData.getActivityId(), faultData.getFaultMessage());
+ cleanupResourceRoutes();
+
// send event
ProcessInstanceStateChangeEvent evt = new ProcessInstanceStateChangeEvent();
evt.setOldState(_dao.getState());
@@ -164,6 +163,8 @@
ODEProcess.__log.debug("ProcessImpl " + _bpelProcess.getPID() + " completed OK.");
}
+ cleanupResourceRoutes();
+
// send event
ProcessInstanceStateChangeEvent evt = new ProcessInstanceStateChangeEvent();
evt.setOldState(_dao.getState());
@@ -319,6 +320,15 @@
// TODO schedule a matcher to see if the message arrived already
}
+ private void cleanupResourceRoutes() {
+ Set<String> routes = _dao.getAllResourceRoutes();
+ for (String route : routes) {
+ String[] resArr = route.split("~");
+ org.apache.ode.bpel.iapi.Resource res = new org.apache.ode.bpel.iapi.Resource(resArr[0], "application/xml", resArr[1]);
+ _bpelProcess._contexts.bindingContext.deactivateProvidedResource(res);
+ }
+ }
+
public CorrelationKey readCorrelation(CorrelationSet cset) {
ScopeDAO scopeDAO = _dao.getScope(cset.getScopeId());
CorrelationSetDAO cs = scopeDAO.getCorrelationSet(cset.getName());
Modified: ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/engine/ODERESTProcess.java
URL: http://svn.apache.org/viewvc/ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/engine/ODERESTProcess.java?rev=723038&r1=723037&r2=723038&view=diff
==============================================================================
--- ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/engine/ODERESTProcess.java (original)
+++ ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/engine/ODERESTProcess.java Wed Dec 3 11:51:01 2008
@@ -73,56 +73,62 @@
}
mexdao.setProcess(getProcessDAO());
- Resource instantiatingResource = getResource(mexdao.getResource());
- InvocationStyle istyle = mexdao.getInvocationStyle();
+ try {
+ Resource instantiatingResource = getResource(mexdao.getResource());
+ InvocationStyle istyle = mexdao.getInvocationStyle();
+
+ if (instantiatingResource != null) {
+ ProcessInstanceDAO newInstance = getProcessDAO().createInstance(null);
+ newInstance.setInstantiatingUrl(mexdao.getResource());
+
+ // send process instance event
+ NewProcessInstanceEvent evt = new NewProcessInstanceEvent(getProcessModel().getQName(),
+ getProcessDAO().getProcessId(), newInstance.getInstanceId());
+ evt.setMexId(mexdao.getMessageExchangeId());
+ saveEvent(evt, newInstance);
- if (instantiatingResource != null) {
- ProcessInstanceDAO newInstance = getProcessDAO().createInstance(null);
- newInstance.setInstantiatingUrl(mexdao.getResource());
-
- // send process instance event
- NewProcessInstanceEvent evt = new NewProcessInstanceEvent(getProcessModel().getQName(),
- getProcessDAO().getProcessId(), newInstance.getInstanceId());
- evt.setMexId(mexdao.getMessageExchangeId());
- saveEvent(evt, newInstance);
-
- mexdao.setCorrelationStatus(MyRoleMessageExchange.CorrelationStatus.CREATE_INSTANCE.toString());
- mexdao.setInstance(newInstance);
-
- doInstanceWork(mexdao.getInstance().getInstanceId(), new Callable<Void>() {
- public Void call() {
- executeCreateInstance(mexdao);
- return null;
- }
- });
- } else {
- // TODO avoid reloading the resource routing, it's just been loaded by the server on mex creation
- String[] urlMeth = mexdao.getResource().split("~");
- ResourceRouteDAO rr = _contexts.dao.getConnection().getResourceRoute(urlMeth[0], urlMeth[1]);
- // This really should have been caught by the server
- if (rr == null) throw new BpelEngineException("NoSuchResource: " + mexdao.getResource());
- mexdao.setInstance(rr.getInstance());
- mexdao.setChannel(rr.getPickResponseChannel() + "&" + rr.getSelectorIdx());
+ mexdao.setCorrelationStatus(MyRoleMessageExchange.CorrelationStatus.CREATE_INSTANCE.toString());
+ mexdao.setInstance(newInstance);
- if (istyle == InvocationStyle.TRANSACTED) {
doInstanceWork(mexdao.getInstance().getInstanceId(), new Callable<Void>() {
public Void call() {
- executeContinueInstanceMyRoleRequestReceived(mexdao);
+ executeCreateInstance(mexdao);
return null;
}
});
- } else /* non-transacted style */ {
- WorkEvent we = new WorkEvent();
- we.setType(WorkEvent.Type.MYROLE_INVOKE);
- we.setIID(mexdao.getInstance().getInstanceId());
- we.setMexId(mexdao.getMessageExchangeId());
- // Could be different to this pid when routing to an older version
- we.setProcessId(mexdao.getInstance().getProcess().getProcessId());
+ } else {
+ // TODO avoid reloading the resource routing, it's just been loaded by the server on mex creation
+ String[] urlMeth = mexdao.getResource().split("~");
+ ResourceRouteDAO rr = _contexts.dao.getConnection().getResourceRoute(urlMeth[0], urlMeth[1]);
+ // This really should have been caught by the server
+ if (rr == null) throw new BpelEngineException("NoSuchResource: " + mexdao.getResource());
+ mexdao.setInstance(rr.getInstance());
+ mexdao.setChannel(rr.getPickResponseChannel() + "&" + rr.getSelectorIdx());
+
+ if (istyle == InvocationStyle.TRANSACTED) {
+ doInstanceWork(mexdao.getInstance().getInstanceId(), new Callable<Void>() {
+ public Void call() {
+ executeContinueInstanceMyRoleRequestReceived(mexdao);
+ return null;
+ }
+ });
+ } else /* non-transacted style */ {
+ WorkEvent we = new WorkEvent();
+ we.setType(WorkEvent.Type.MYROLE_INVOKE);
+ we.setIID(mexdao.getInstance().getInstanceId());
+ we.setMexId(mexdao.getMessageExchangeId());
+ // Could be different to this pid when routing to an older version
+ we.setProcessId(mexdao.getInstance().getProcess().getProcessId());
- scheduleWorkEvent(we, null);
- }
+ scheduleWorkEvent(we, null);
+ }
+ }
+ } finally {
+ // If we did not get an ACK during this method, then mark this MEX as needing an ASYNC wake-up
+ if (mexdao.getStatus() != MessageExchange.Status.ACK) mexdao.setStatus(MessageExchange.Status.ASYNC);
}
+
}
void onRestMexAck(MessageExchangeDAO mexdao, MessageExchange.Status old, String url) {
Modified: ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/memdao/ProcessInstanceDaoImpl.java
URL: http://svn.apache.org/viewvc/ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/memdao/ProcessInstanceDaoImpl.java?rev=723038&r1=723037&r2=723038&view=diff
==============================================================================
--- ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/memdao/ProcessInstanceDaoImpl.java (original)
+++ ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/memdao/ProcessInstanceDaoImpl.java Wed Dec 3 11:51:01 2008
@@ -246,6 +246,14 @@
_resourceRoutes.put(url, rroute);
}
+ public Set<String> getAllResourceRoutes() {
+ HashSet<String> rs = new HashSet<String>();
+ for (ResourceRouteDAO routeDAO : _resourceRoutes.values()) {
+ rs.add(routeDAO.getUrl() + "~" + routeDAO.getMethod());
+ }
+ return rs;
+ }
+
/**
* @see org.apache.ode.bpel.dao.ProcessInstanceDAO#getScopes(java.lang.String)
*/
Modified: ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v1/OdeInternalInstance.java
URL: http://svn.apache.org/viewvc/ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v1/OdeInternalInstance.java?rev=723038&r1=723037&r2=723038&view=diff
==============================================================================
--- ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v1/OdeInternalInstance.java (original)
+++ ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v1/OdeInternalInstance.java Wed Dec 3 11:51:01 2008
@@ -71,6 +71,8 @@
void unregisterActivityForRecovery(ActivityRecoveryChannel recoveryChannel);
+ void cancelOutstandingRequests(String channelId);
+
void select(PickResponseChannel pickResponseChannel, Date timeout, boolean createInstance, Selector[] selectors)
throws FaultException;
Modified: ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v1/OutstandingRequestManager.java
URL: http://svn.apache.org/viewvc/ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v1/OutstandingRequestManager.java?rev=723038&r1=723037&r2=723038&view=diff
==============================================================================
--- ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v1/OutstandingRequestManager.java (original)
+++ ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v1/OutstandingRequestManager.java Wed Dec 3 11:51:01 2008
@@ -119,7 +119,7 @@
Entry entry = _byChannel.remove(pickResponseChannel);
if (entry != null) {
- _byRid.values().remove(entry);
+ while(_byRid.values().remove(entry));
}
}
Modified: ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v1/RuntimeInstanceImpl.java
URL: http://svn.apache.org/viewvc/ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v1/RuntimeInstanceImpl.java?rev=723038&r1=723037&r2=723038&view=diff
==============================================================================
--- ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v1/RuntimeInstanceImpl.java (original)
+++ ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v1/RuntimeInstanceImpl.java Wed Dec 3 11:51:01 2008
@@ -108,6 +108,10 @@
}
+ public void cancelOutstandingRequests(String channelId) {
+ getORM().cancel(channelId);
+ }
+
public void select(PickResponseChannel pickResponseChannel, Date timeout, boolean createInstance, Selector[] selectors)
throws FaultException {
Modified: ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/EH_EVENT.java
URL: http://svn.apache.org/viewvc/ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/EH_EVENT.java?rev=723038&r1=723037&r2=723038&view=diff
==============================================================================
--- ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/EH_EVENT.java (original)
+++ ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/EH_EVENT.java Wed Dec 3 11:51:01 2008
@@ -68,10 +68,8 @@
/** Has a termination of this handler been requested. */
private boolean _terminated;
-
private boolean _childrenTerminated;
-
EH_EVENT(ParentScopeChannel psc,TerminationChannel tc, EventHandlerControlChannel ehc, OEventHandler.OEvent o, ScopeFrame scopeFrame) {
_scopeFrame = scopeFrame;
_oevent = o;
@@ -80,9 +78,8 @@
_ehc = ehc;
}
-
public void run() {
- instance(new SELECT(_scopeFrame));
+ instance(new SELECT(_scopeFrame, 0));
}
/**
@@ -96,6 +93,7 @@
_childrenTerminated = true;
}
}
+
/**
* Template that does the actual selection interaction with the runtime system, and
* then waits on the pick response channel.
@@ -104,8 +102,11 @@
private static final long serialVersionUID = 1L;
- public SELECT(ScopeFrame scopeFrame) {
+ private int _counter;
+
+ public SELECT(ScopeFrame scopeFrame, int counter) {
_scopeFrame = scopeFrame;
+ _counter = counter;
}
/**
@@ -117,7 +118,7 @@
PickResponseChannel pickResponseChannel = newChannel(PickResponseChannel.class);
if (_oevent.isRestful()) {
getBpelRuntime().checkResourceRoute(_scopeFrame.resolve(_oevent.resource),
- _oevent.messageExchangeId, pickResponseChannel, 0);
+ _oevent.messageExchangeId+_counter, pickResponseChannel, 0);
} else {
CorrelationKey key;
PartnerLinkInstance pLinkInstance = _scopeFrame.resolve(_oevent.partnerLink);
@@ -133,17 +134,18 @@
assert key != null;
}
- selector = new Selector(0,pLinkInstance,_oevent.operation.getName(), _oevent.operation.getOutput() == null, _oevent.messageExchangeId, key);
+ selector = new Selector(0,pLinkInstance,_oevent.operation.getName(),
+ _oevent.operation.getOutput() == null, _oevent.messageExchangeId+_counter, key);
getBpelRuntime().select(pickResponseChannel, null, false, new Selector[] { selector} );
}
- instance(new WAITING(pickResponseChannel, _scopeFrame));
+ instance(new WAITING(pickResponseChannel, _scopeFrame, _counter));
} catch(FaultException e){
__log.error(e);
if (_fault == null) {
_fault = createFault(e.getQName(), _oevent);
}
terminateActive();
- instance(new WAITING(null, _scopeFrame));
+ instance(new WAITING(null, _scopeFrame, _counter));
}
}
}
@@ -154,10 +156,12 @@
private class WAITING extends BpelJacobRunnable {
private static final long serialVersionUID = 1L;
private PickResponseChannel _pickResponseChannel;
+ private int _counter;
- private WAITING(PickResponseChannel pickResponseChannel, ScopeFrame scopeFrame) {
+ private WAITING(PickResponseChannel pickResponseChannel, ScopeFrame scopeFrame, int counter) {
_pickResponseChannel = pickResponseChannel;
_scopeFrame = scopeFrame;
+ _counter = counter;
}
public void run() {
@@ -168,7 +172,6 @@
if (!_terminated) {
mlset.add(new TerminationChannelListener(_tc) {
private static final long serialVersionUID = 7666910462948788042L;
-
public void terminate() {
terminateActive();
_terminated = true;
@@ -177,13 +180,11 @@
instance(WAITING.this);
}
});
-
}
if (!_stopped) {
mlset.add(new EventHandlerControlChannelListener(_ehc) {
private static final long serialVersionUID = -1050788954724647970L;
-
public void stop() {
_stopped = true;
if (_pickResponseChannel != null)
@@ -191,18 +192,15 @@
instance(WAITING.this);
}
});
-
}
for (final ActivityInfo ai : _active) {
mlset.add(new ParentScopeChannelListener(ai.parent) {
private static final long serialVersionUID = 5341207762415360982L;
-
public void compensate(OScope scope, SynchChannel ret) {
_psc.compensate(scope, ret);
instance(WAITING.this);
}
-
public void completed(FaultData faultData, Set<CompensationHandler> compensations) {
_active.remove(ai);
_comps.addAll(compensations);
@@ -213,7 +211,6 @@
} else
instance(WAITING.this);
}
-
public void cancelled() { completed(null, CompensationHandler.emptySet()); }
public void failure(String reason, Element data) { completed(null, CompensationHandler.emptySet()); }
});
@@ -228,6 +225,14 @@
ScopeFrame ehScopeFrame = new ScopeFrame(_oevent,
getBpelRuntime().createScopeInstance(_scopeFrame.scopeInstanceId, _oevent),
_scopeFrame, _comps, _fault);
+ ehScopeFrame.eventScope = true;
+ if (_oevent.isRestful()) {
+ getBpelRuntime().associateEvent(_scopeFrame.resolve(_oevent.resource),
+ _oevent.messageExchangeId+_counter, _oevent.messageExchangeId+ehScopeFrame.scopeInstanceId);
+ } else {
+ getBpelRuntime().associateEvent(_scopeFrame.resolve(_oevent.partnerLink), _oevent.operation.getName(),
+ _oevent.messageExchangeId+_counter, _oevent.messageExchangeId+ehScopeFrame.scopeInstanceId);
+ }
if (_oevent.variable != null) {
Element msgEl = getBpelRuntime().getMyRequest(mexId);
@@ -250,7 +255,6 @@
}
}
-
try {
for (OScope.CorrelationSet cset : _oevent.initCorrelations) {
initializeCorrelation(ehScopeFrame.resolve(cset), ehScopeFrame.resolve(_oevent.variable));
@@ -279,7 +283,7 @@
_fault = createFault(e.getQName(), _oevent);
terminateActive();
}
- instance(new WAITING(null, _scopeFrame));
+ instance(new WAITING(null, _scopeFrame, _counter));
return;
}
@@ -299,18 +303,17 @@
if (_childrenTerminated) replication(child.self).terminate();
if (_terminated || _stopped || _fault != null)
- instance(new WAITING(null, _scopeFrame));
+ instance(new WAITING(null, _scopeFrame, _counter));
else
- instance(new SELECT(_scopeFrame));
+ instance(new SELECT(_scopeFrame, _counter+1));
}
-
public void onTimeout() {
- instance(new WAITING(null, _scopeFrame));
+ instance(new WAITING(null, _scopeFrame, _counter));
}
public void onCancel() {
- instance(new WAITING(null, _scopeFrame));
+ instance(new WAITING(null, _scopeFrame, _counter));
}
});
Modified: ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/OScope.java
URL: http://svn.apache.org/viewvc/ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/OScope.java?rev=723038&r1=723037&r2=723038&view=diff
==============================================================================
--- ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/OScope.java (original)
+++ ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/OScope.java Wed Dec 3 11:51:01 2008
@@ -214,7 +214,7 @@
}
public String getDescription() {
- StringBuffer buf = new StringBuffer(declaringScope.name);
+ StringBuffer buf = new StringBuffer(declaringScope.name != null ? declaringScope.name : "");
buf.append('.');
buf.append(name);
return buf.toString();
Modified: ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/OdeInternalInstance.java
URL: http://svn.apache.org/viewvc/ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/OdeInternalInstance.java?rev=723038&r1=723037&r2=723038&view=diff
==============================================================================
--- ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/OdeInternalInstance.java (original)
+++ ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/OdeInternalInstance.java Wed Dec 3 11:51:01 2008
@@ -77,6 +77,8 @@
void unregisterActivityForRecovery(ActivityRecoveryChannel recoveryChannel);
+ void cancelOutstandingRequests(String channelId);
+
void select(PickResponseChannel pickResponseChannel, Date timeout, boolean createInstance, Selector[] selectors)
throws FaultException;
@@ -124,6 +126,10 @@
void forceFlush();
+ void associateEvent(PartnerLinkInstance plinkInstance, String opName, String mexRef, String scopeIid);
+
+ void associateEvent(ResourceInstance resourceInstance, String mexRef, String scopeIid);
+
void reply(PartnerLinkInstance plink, String opName, String bpelmex, Element element, QName fault)
throws FaultException;
Modified: ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/OutstandingRequestManager.java
URL: http://svn.apache.org/viewvc/ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/OutstandingRequestManager.java?rev=723038&r1=723037&r2=723038&view=diff
==============================================================================
--- ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/OutstandingRequestManager.java (original)
+++ ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/OutstandingRequestManager.java Wed Dec 3 11:51:01 2008
@@ -140,10 +140,10 @@
Entry entry = _byChannel.remove(pickResponseChannel);
if (entry != null)
- _byRid.values().remove(entry);
+ while(_byRid.values().remove(entry));
RestEntry restEntry = _byRestChannel.remove(pickResponseChannel);
if (restEntry != null)
- _byRestRid.values().remove(restEntry);
+ while(_byRestRid.values().remove(restEntry));
}
/**
@@ -184,6 +184,20 @@
}
}
+ public void associateEvent(PartnerLinkInstance plinkInstance, String opName, String mexRef, String scopeIid) {
+ RequestIdTuple rid = new RequestIdTuple(plinkInstance, opName, mexRef);
+ Entry entry = _byRid.remove(rid);
+ rid.mexId = scopeIid;
+ _byRid.put(rid, entry);
+ }
+
+ public void associateEvent(ResourceInstance resourceInstance, String method, String mexRef, String scopeIid) {
+ RequestResTuple rid = new RequestResTuple(resourceInstance, method, mexRef);
+ RestEntry entry = _byRestRid.remove(rid);
+ rid.mexId = scopeIid;
+ _byRestRid.put(rid, entry);
+ }
+
/**
* Release the registration. This method is called when the reply activity sends a reply corresponding to the
* registration.
Modified: ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/REPLY.java
URL: http://svn.apache.org/viewvc/ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/REPLY.java?rev=723038&r1=723037&r2=723038&view=diff
==============================================================================
--- ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/REPLY.java (original)
+++ ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/REPLY.java Wed Dec 3 11:51:01 2008
@@ -55,13 +55,17 @@
for (OScope.CorrelationSet cset : oreply.initCorrelations)
initializeCorrelation(_scopeFrame.resolve(cset), _scopeFrame.resolve(oreply.variable));
+ // If this reply matches an event, we have to know which scope it fits in
+ ScopeFrame eventFrame = _scopeFrame.findEventScope();
+ String eventFrameId = eventFrame == null ? "" : eventFrame.scopeInstanceId.toString();
+
// send reply
if (oreply.resource != null)
getBpelRuntime().reply(_scopeFrame.resolve(oreply.resource),
- oreply.messageExchangeId, (Element)msg, oreply.fault);
+ oreply.messageExchangeId+eventFrameId, (Element)msg, oreply.fault);
else
getBpelRuntime().reply(_scopeFrame.resolve(oreply.partnerLink), oreply.operation.getName(),
- oreply.messageExchangeId, (Element)msg, oreply.fault);
+ oreply.messageExchangeId+eventFrameId, (Element)msg, oreply.fault);
} catch (FaultException e) {
__log.error(e);
fault = createFault(e.getQName(), oreply);
Modified: ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/RuntimeInstanceImpl.java
URL: http://svn.apache.org/viewvc/ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/RuntimeInstanceImpl.java?rev=723038&r1=723037&r2=723038&view=diff
==============================================================================
--- ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/RuntimeInstanceImpl.java (original)
+++ ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/RuntimeInstanceImpl.java Wed Dec 3 11:51:01 2008
@@ -119,6 +119,10 @@
return _brc.getInstantiatingUrl();
}
+ public void cancelOutstandingRequests(String channelId) {
+ getORM().cancel(channelId);
+ }
+
public void select(PickResponseChannel pickResponseChannel, Date timeout, boolean createInstance, Selector[] selectors)
throws FaultException {
@@ -449,6 +453,14 @@
_brc.sendEvent(evt);
}
+ public void associateEvent(PartnerLinkInstance plinkInstance, String opName, String mexRef, String scopeIid) {
+ getORM().associateEvent(plinkInstance, opName, mexRef, scopeIid);
+ }
+
+ public void associateEvent(ResourceInstance resourceInstance, String mexRef, String scopeIid) {
+ getORM().associateEvent(resourceInstance, resourceInstance.getModel().getMethod(), mexRef, scopeIid);
+ }
+
public void reply(PartnerLinkInstance plink, String opName, String bpelmex, Element element, QName fault) throws FaultException {
String mexid = getORM().release(plink, opName, bpelmex);
if (mexid == null)
Modified: ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/ScopeFrame.java
URL: http://svn.apache.org/viewvc/ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/ScopeFrame.java?rev=723038&r1=723037&r2=723038&view=diff
==============================================================================
--- ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/ScopeFrame.java (original)
+++ ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/ScopeFrame.java Wed Dec 3 11:51:01 2008
@@ -50,6 +50,8 @@
final InstanceGlobals globals;
+ boolean eventScope = false;
+
/** Constructor used to create "fault" scopes. */
ScopeFrame( OScope scopeDef,
Long scopeInstanceId,
@@ -85,6 +87,12 @@
return (parent != null) ? parent.find(scope) : null;
}
+ public ScopeFrame findEventScope() {
+ if (eventScope) return this;
+ else if (parent == null) return null;
+ else return parent.findEventScope();
+ }
+
public VariableInstance resolve(OScope.Variable variable) {
ScopeFrame scopeFrame = find(variable.declaringScope);
if (scopeFrame == null) return null;
Modified: ode/branches/restful/runtimes/src/test/java/org/apache/ode/bpel/rtrep/v1/CoreBpelTest.java
URL: http://svn.apache.org/viewvc/ode/branches/restful/runtimes/src/test/java/org/apache/ode/bpel/rtrep/v1/CoreBpelTest.java?rev=723038&r1=723037&r2=723038&view=diff
==============================================================================
--- ode/branches/restful/runtimes/src/test/java/org/apache/ode/bpel/rtrep/v1/CoreBpelTest.java (original)
+++ ode/branches/restful/runtimes/src/test/java/org/apache/ode/bpel/rtrep/v1/CoreBpelTest.java Wed Dec 3 11:51:01 2008
@@ -126,6 +126,10 @@
//To change body of implemented methods use File | Settings | File Templates.
}
+ public void cancelOutstandingRequests(String channelId) {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
public void select(PickResponseChannel pickResponseChannel, Date timeout, boolean createInstance, Selector[] selectors) throws FaultException {
//To change body of implemented methods use File | Settings | File Templates.
}
Modified: ode/branches/restful/runtimes/src/test/java/org/apache/ode/bpel/rtrep/v2/CoreBpelTest.java
URL: http://svn.apache.org/viewvc/ode/branches/restful/runtimes/src/test/java/org/apache/ode/bpel/rtrep/v2/CoreBpelTest.java?rev=723038&r1=723037&r2=723038&view=diff
==============================================================================
--- ode/branches/restful/runtimes/src/test/java/org/apache/ode/bpel/rtrep/v2/CoreBpelTest.java (original)
+++ ode/branches/restful/runtimes/src/test/java/org/apache/ode/bpel/rtrep/v2/CoreBpelTest.java Wed Dec 3 11:51:01 2008
@@ -128,6 +128,14 @@
return null; //To change body of implemented methods use File | Settings | File Templates.
}
+ public void associateEvent(PartnerLinkInstance plinkInstance, String opName, String mexRef, String scopeIid) {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void associateEvent(ResourceInstance resourceInstance, String mexRef, String scopeIid) {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
public void reply(ResourceInstance resource, String bpelmex, Element element, QName fault) throws FaultException {
//To change body of implemented methods use File | Settings | File Templates.
}
@@ -144,6 +152,10 @@
//To change body of implemented methods use File | Settings | File Templates.
}
+ public void cancelOutstandingRequests(String channelId) {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
public void select(PickResponseChannel pickResponseChannel, Date timeout, boolean createInstance, Selector[] selectors) throws FaultException {
//To change body of implemented methods use File | Settings | File Templates.
}