You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ode.apache.org by mr...@apache.org on 2008/10/29 01:21:14 UTC
svn commit: r708743 - in /ode/branches/restful:
bpel-api/src/main/java/org/apache/ode/bpel/rapi/
bpel-dao/src/main/java/org/apache/ode/bpel/dao/
dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/
dao-hibernate/src/main/java/org/apache/ode/daohib/b...
Author: mriou
Date: Tue Oct 28 17:21:13 2008
New Revision: 708743
URL: http://svn.apache.org/viewvc?rev=708743&view=rev
Log:
More on the way of matching receives waiting for REST calls.
Added:
ode/branches/restful/bpel-dao/src/main/java/org/apache/ode/bpel/dao/ResourceRouteDAO.java
ode/branches/restful/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HResourceRoute.java
ode/branches/restful/dao-jpa/src/main/java/org/apache/ode/dao/jpa/ResourceRouteDAOImpl.java
ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/memdao/ResourceRouteDAOImpl.java
Modified:
ode/branches/restful/bpel-api/src/main/java/org/apache/ode/bpel/rapi/OdeRTInstanceContext.java
ode/branches/restful/bpel-dao/src/main/java/org/apache/ode/bpel/dao/ProcessInstanceDAO.java
ode/branches/restful/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/ProcessInstanceDaoImpl.java
ode/branches/restful/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HProcessInstance.java
ode/branches/restful/dao-jpa/src/main/java/org/apache/ode/dao/jpa/ProcessInstanceDAOImpl.java
ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java
ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/engine/ODERESTProcess.java
ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/engine/ODEWSProcess.java
ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/memdao/ProcessInstanceDaoImpl.java
ode/branches/restful/jacob/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java
ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/OdeInternalInstance.java
ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/OutstandingRequestManager.java
ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/PICK.java
ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/RuntimeInstanceImpl.java
ode/branches/restful/runtimes/src/test/java/org/apache/ode/bpel/rtrep/v2/CoreBpelTest.java
Modified: ode/branches/restful/bpel-api/src/main/java/org/apache/ode/bpel/rapi/OdeRTInstanceContext.java
URL: http://svn.apache.org/viewvc/ode/branches/restful/bpel-api/src/main/java/org/apache/ode/bpel/rapi/OdeRTInstanceContext.java?rev=708743&r1=708742&r2=708743&view=diff
==============================================================================
--- ode/branches/restful/bpel-api/src/main/java/org/apache/ode/bpel/rapi/OdeRTInstanceContext.java (original)
+++ ode/branches/restful/bpel-api/src/main/java/org/apache/ode/bpel/rapi/OdeRTInstanceContext.java Tue Oct 28 17:21:13 2008
@@ -57,5 +57,6 @@
* @param optionalFaultData
*/
void noreply(String mexId, FaultInfo optionalFaultData);
-
+
+ void checkResourceRoute(String url, String method, String pickResponseChannel, int selectorIdx);
}
Modified: ode/branches/restful/bpel-dao/src/main/java/org/apache/ode/bpel/dao/ProcessInstanceDAO.java
URL: http://svn.apache.org/viewvc/ode/branches/restful/bpel-dao/src/main/java/org/apache/ode/bpel/dao/ProcessInstanceDAO.java?rev=708743&r1=708742&r2=708743&view=diff
==============================================================================
--- ode/branches/restful/bpel-dao/src/main/java/org/apache/ode/bpel/dao/ProcessInstanceDAO.java (original)
+++ ode/branches/restful/bpel-dao/src/main/java/org/apache/ode/bpel/dao/ProcessInstanceDAO.java Tue Oct 28 17:21:13 2008
@@ -164,9 +164,6 @@
/**
* Returns all the scopes belonging to this isntance.
- *
- * @param scopeName
- * @return
*/
Collection<ScopeDAO> getScopes();
@@ -178,6 +175,14 @@
CorrelatorDAO getInstantiatingCorrelator();
/**
+ * Returns the URL this instance has been instantiated under.
+ * @return
+ */
+ String getInstantiatingUrl();
+
+ void setInstantiatingUrl(String url);
+
+ /**
* Returns all variable instances matching the variable name for a specified scope.
*/
XmlDataDAO[] getVariables(String variableName, int scopeModelId);
@@ -251,8 +256,10 @@
* Create an activity recovery object for a given activity instance. Specify the reason and optional data associated with the
* failure. Date/time failure occurred, and the recovery channel and available recovery actions.
*/
- void createActivityRecovery(String channel, long activityId, String reason, Date dateTime, Element data, String[] actions,
- int retries);
+ void createActivityRecovery(String channel, long activityId, String reason, Date dateTime,
+ Element data, String[] actions, int retries);
+
+ void createResourceRoute(String url, String method, String pickResponseChannel, int selectorIdx);
/**
* Delete previously registered activity recovery.
Added: ode/branches/restful/bpel-dao/src/main/java/org/apache/ode/bpel/dao/ResourceRouteDAO.java
URL: http://svn.apache.org/viewvc/ode/branches/restful/bpel-dao/src/main/java/org/apache/ode/bpel/dao/ResourceRouteDAO.java?rev=708743&view=auto
==============================================================================
--- ode/branches/restful/bpel-dao/src/main/java/org/apache/ode/bpel/dao/ResourceRouteDAO.java (added)
+++ ode/branches/restful/bpel-dao/src/main/java/org/apache/ode/bpel/dao/ResourceRouteDAO.java Tue Oct 28 17:21:13 2008
@@ -0,0 +1,4 @@
+package org.apache.ode.bpel.dao;
+
+public interface ResourceRouteDAO {
+}
Modified: ode/branches/restful/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/ProcessInstanceDaoImpl.java
URL: http://svn.apache.org/viewvc/ode/branches/restful/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/ProcessInstanceDaoImpl.java?rev=708743&r1=708742&r2=708743&view=diff
==============================================================================
--- ode/branches/restful/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/ProcessInstanceDaoImpl.java (original)
+++ ode/branches/restful/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/ProcessInstanceDaoImpl.java Tue Oct 28 17:21:13 2008
@@ -427,7 +427,23 @@
public void setExecutionStateCounter(int stateCounter) {
_instance.setExecutionStateCounter(stateCounter);
+ }
+
+ public String getInstantiatingUrl() {
+ return _instance.getInstantiatingUrl();
+ }
+ public void setInstantiatingUrl(String url) {
+ _instance.setInstantiatingUrl(url);
}
+ public void createResourceRoute(String url, String method, String pickResponseChannel, int selectorIdx) {
+ HResourceRoute resRoute = new HResourceRoute();
+ resRoute.setUrl(url);
+ resRoute.setMethod(method);
+ resRoute.setChannelId(pickResponseChannel);
+ resRoute.setIndex(selectorIdx);
+ resRoute.setInstance(_instance);
+ getSession().save(resRoute);
+ }
}
Modified: ode/branches/restful/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HProcessInstance.java
URL: http://svn.apache.org/viewvc/ode/branches/restful/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HProcessInstance.java?rev=708743&r1=708742&r2=708743&view=diff
==============================================================================
--- ode/branches/restful/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HProcessInstance.java (original)
+++ ode/branches/restful/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HProcessInstance.java Tue Oct 28 17:21:13 2008
@@ -36,6 +36,8 @@
/** Foreign key to the instantiating {@link HCorrelator}. */
private HCorrelator _instantiatingCorrelator;
+ private String _instantiatingUrl;
+
/** Scope instances belonging to this process instnace. */
private Collection<HScope> _scopes = new HashSet<HScope>();
@@ -281,4 +283,15 @@
}
+ /**
+ * @hibernate.property column="INSTANTIATE_URL" size=255
+ * @return
+ */
+ public String getInstantiatingUrl() {
+ return _instantiatingUrl;
+ }
+
+ public void setInstantiatingUrl(String instantiatingUrl) {
+ _instantiatingUrl = instantiatingUrl;
+ }
}
Added: ode/branches/restful/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HResourceRoute.java
URL: http://svn.apache.org/viewvc/ode/branches/restful/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HResourceRoute.java?rev=708743&view=auto
==============================================================================
--- ode/branches/restful/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HResourceRoute.java (added)
+++ ode/branches/restful/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HResourceRoute.java Tue Oct 28 17:21:13 2008
@@ -0,0 +1,72 @@
+package org.apache.ode.daohib.bpel.hobj;
+
+/**
+ * @hibernate.class table="BPEL_RES_ROUTE"
+ */
+public class HResourceRoute extends HObject {
+
+ private String _url;
+ private String _method;
+ private String _channelId;
+ private int _index;
+ private HProcessInstance _instance;
+
+ public HResourceRoute() {
+ super();
+ }
+
+ /**
+ * @hibernate.property column="URL" not-null="true" size="255"
+ */
+ public String getUrl() {
+ return _url;
+ }
+
+ public void setUrl(String url) {
+ _url = url;
+ }
+
+ /**
+ * @hibernate.property column="METHOD" not-null="true" size="8"
+ */
+ public String getMethod() {
+ return _method;
+ }
+
+ public void setMethod(String method) {
+ _method = method;
+ }
+
+ /**
+ * @hibernate.property column="CHANNEL" not-null="true"
+ */
+ public String getChannelId() {
+ return _channelId;
+ }
+
+ public void setChannelId(String channelId) {
+ _channelId = channelId;
+ }
+
+ /**
+ * @hibernate.property column="INDEX" not-null="true"
+ */
+ public int getIndex() {
+ return _index;
+ }
+
+ public void setIndex(int index) {
+ _index = index;
+ }
+
+ /**
+ * @hibernate.many-to-one column="INSTANCE"
+ */
+ public HProcessInstance getInstance() {
+ return _instance;
+ }
+
+ public void setInstance(HProcessInstance instance) {
+ _instance = instance;
+ }
+}
Modified: ode/branches/restful/dao-jpa/src/main/java/org/apache/ode/dao/jpa/ProcessInstanceDAOImpl.java
URL: http://svn.apache.org/viewvc/ode/branches/restful/dao-jpa/src/main/java/org/apache/ode/dao/jpa/ProcessInstanceDAOImpl.java?rev=708743&r1=708742&r2=708743&view=diff
==============================================================================
--- ode/branches/restful/dao-jpa/src/main/java/org/apache/ode/dao/jpa/ProcessInstanceDAOImpl.java (original)
+++ ode/branches/restful/dao-jpa/src/main/java/org/apache/ode/dao/jpa/ProcessInstanceDAOImpl.java Tue Oct 28 17:21:13 2008
@@ -81,7 +81,10 @@
private long _sequence;
@Basic @Column(name="DATE_CREATED")
private Date _dateCreated = new Date();
-
+
+ @Basic @Column(name="INSTANTIATE_URL", length=255)
+ private String _instantiatingUrl;
+
@Basic @Column(name="EXEC_STATE_COUNTER")
private int _execStateCounter;
@@ -146,7 +149,19 @@
}
- public void finishCompletion() {
+ public String getInstantiatingUrl() {
+ return _instantiatingUrl;
+ }
+
+ public void setInstantiatingUrl(String url) {
+ _instantiatingUrl = url;
+ }
+
+ public void createResourceRoute(String url, String method, String pickResponseChannel, int selectorIdx) {
+ new ResourceRouteDAOImpl(url, method, pickResponseChannel, selectorIdx, this);
+ }
+
+ public void finishCompletion() {
// make sure we have completed.
assert (ProcessState.isFinished(this.getState()));
// let our process know that we've done our work.
Added: ode/branches/restful/dao-jpa/src/main/java/org/apache/ode/dao/jpa/ResourceRouteDAOImpl.java
URL: http://svn.apache.org/viewvc/ode/branches/restful/dao-jpa/src/main/java/org/apache/ode/dao/jpa/ResourceRouteDAOImpl.java?rev=708743&view=auto
==============================================================================
--- ode/branches/restful/dao-jpa/src/main/java/org/apache/ode/dao/jpa/ResourceRouteDAOImpl.java (added)
+++ ode/branches/restful/dao-jpa/src/main/java/org/apache/ode/dao/jpa/ResourceRouteDAOImpl.java Tue Oct 28 17:21:13 2008
@@ -0,0 +1,81 @@
+package org.apache.ode.dao.jpa;
+
+import org.apache.ode.bpel.dao.ResourceRouteDAO;
+
+import javax.persistence.*;
+
+public class ResourceRouteDAOImpl extends OpenJPADAO implements ResourceRouteDAO {
+
+ @Id @Column(name="ID")
+ @GeneratedValue(strategy= GenerationType.AUTO)
+ private Long _id;
+ @Basic @Column(name="URL", length=255)
+ private String _url;
+ @Basic @Column(name="METHOD", length=8)
+ private String _method;
+ @Basic @Column(name="CHANNEL")
+ private String _channelId;
+ @Basic @Column(name="ROUTE_INDEX")
+ private int _index;
+
+ @ManyToOne(fetch=FetchType.LAZY,cascade={CascadeType.PERSIST}) @Column(name="INSTANCE_ID")
+ private ProcessInstanceDAOImpl _instance;
+
+ public ResourceRouteDAOImpl() { }
+
+ public ResourceRouteDAOImpl(String url, String method, String channelId, int index, ProcessInstanceDAOImpl instance) {
+ _url = url;
+ _channelId = channelId;
+ _index = index;
+ _instance = instance;
+ _method = method;
+ }
+
+ public Long getId() {
+ return _id;
+ }
+
+ public void setId(Long id) {
+ _id = id;
+ }
+
+ public String getUrl() {
+ return _url;
+ }
+
+ public void setUrl(String url) {
+ _url = url;
+ }
+
+ public String getChannelId() {
+ return _channelId;
+ }
+
+ public void setChannelId(String channelId) {
+ _channelId = channelId;
+ }
+
+ public int getIndex() {
+ return _index;
+ }
+
+ public void setIndex(int index) {
+ _index = index;
+ }
+
+ public String getMethod() {
+ return _method;
+ }
+
+ public void setMethod(String method) {
+ _method = method;
+ }
+
+ public ProcessInstanceDAOImpl getInstance() {
+ return _instance;
+ }
+
+ public void setInstance(ProcessInstanceDAOImpl instance) {
+ _instance = instance;
+ }
+}
Modified: ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java
URL: http://svn.apache.org/viewvc/ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java?rev=708743&r1=708742&r2=708743&view=diff
==============================================================================
--- ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java (original)
+++ ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java Tue Oct 28 17:21:13 2008
@@ -272,6 +272,28 @@
}
}
+ public void checkResourceRoute(String url, String method, String pickResponseChannel, int selectorIdx) {
+ // check if this is first pick
+ if (_dao.getState() == ProcessState.STATE_NEW) {
+ // send event
+ ProcessInstanceStateChangeEvent evt = new ProcessInstanceStateChangeEvent();
+ evt.setOldState(ProcessState.STATE_NEW);
+ _dao.setState(ProcessState.STATE_READY);
+ evt.setNewState(ProcessState.STATE_READY);
+ sendEvent(evt);
+ }
+
+ if (_instantiatingMessageExchange != null && method.equals("POST") && _dao.getState() == ProcessState.STATE_READY)
+ injectMyRoleMessageExchange(pickResponseChannel, selectorIdx, _instantiatingMessageExchange);
+ else {
+ _dao.createResourceRoute(url, method, pickResponseChannel, selectorIdx);
+ Resource res = new Resource(url, "application/xml", method);
+ _bpelProcess._contexts.bindingContext.activateProvidedResource(res);
+ }
+
+ // TODO schedule a matcher to see if the message arrived already
+ }
+
public CorrelationKey readCorrelation(CorrelationSet cset) {
ScopeDAO scopeDAO = _dao.getScope(cset.getScopeId());
CorrelationSetDAO cs = scopeDAO.getCorrelationSet(cset.getName());
Modified: ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/engine/ODERESTProcess.java
URL: http://svn.apache.org/viewvc/ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/engine/ODERESTProcess.java?rev=708743&r1=708742&r2=708743&view=diff
==============================================================================
--- ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/engine/ODERESTProcess.java (original)
+++ ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/engine/ODERESTProcess.java Tue Oct 28 17:21:13 2008
@@ -71,6 +71,7 @@
}
ProcessInstanceDAO newInstance = getProcessDAO().createInstance(null);
+ newInstance.setInstantiatingUrl(mexdao.getResource());
// send process instance event
NewProcessInstanceEvent evt = new NewProcessInstanceEvent(getProcessModel().getQName(),
@@ -123,6 +124,10 @@
return null;
}
+ public Resource getInstantiatingUrl(ProcessInstanceDAO instanceDao) {
+ return getResource(instanceDao.getInstantiatingUrl());
+ }
+
protected boolean isInstantiating(Resource res) {
for (Map.Entry<ResourceModel, String> resourceModel : _staticResources.entrySet()) {
if (resourceModel.getValue().equals(res.getUrl())
Modified: ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/engine/ODEWSProcess.java
URL: http://svn.apache.org/viewvc/ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/engine/ODEWSProcess.java?rev=708743&r1=708742&r2=708743&view=diff
==============================================================================
--- ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/engine/ODEWSProcess.java (original)
+++ ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/engine/ODEWSProcess.java Tue Oct 28 17:21:13 2008
@@ -271,16 +271,8 @@
private void doHydrate() {
markused();
- _processModel = _pconf.getProcessModel();
- if (_processModel == null) {
- try {
- _processModel = deserializeCompiledProcess(_pconf.getCBPInputStream());
- } catch (Exception e) {
- String errmsg = "Error reloading compiled process " + _pconf.getProcessId() + "; the file appears to be corrupted.";
- __log.error(errmsg);
- throw new BpelEngineException(errmsg, e);
- }
- }
+ readModel();
+
_runtime = buildRuntime(_processModel.getModelVersion());
_runtime.init(_pconf, _processModel);
@@ -320,6 +312,23 @@
bounceProcessDAO();
}
+ private void readModel() {
+ try {
+ _processModel = _pconf.getProcessModel();
+ } catch (Exception e) {
+ // Swallow, was just trying
+ }
+ if (_processModel == null) {
+ try {
+ _processModel = deserializeCompiledProcess(_pconf.getCBPInputStream());
+ } catch (Exception e) {
+ String errmsg = "Error reloading compiled process " + _pconf.getProcessId() + "; the file appears to be corrupted.";
+ __log.error(errmsg);
+ throw new BpelEngineException(errmsg, e);
+ }
+ }
+ }
+
}
/**
Modified: ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/memdao/ProcessInstanceDaoImpl.java
URL: http://svn.apache.org/viewvc/ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/memdao/ProcessInstanceDaoImpl.java?rev=708743&r1=708742&r2=708743&view=diff
==============================================================================
--- ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/memdao/ProcessInstanceDaoImpl.java (original)
+++ ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/memdao/ProcessInstanceDaoImpl.java Tue Oct 28 17:21:13 2008
@@ -19,15 +19,7 @@
package org.apache.ode.bpel.memdao;
import org.apache.ode.bpel.common.ProcessState;
-import org.apache.ode.bpel.dao.ActivityRecoveryDAO;
-import org.apache.ode.bpel.dao.BpelDAOConnection;
-import org.apache.ode.bpel.dao.CorrelationSetDAO;
-import org.apache.ode.bpel.dao.CorrelatorDAO;
-import org.apache.ode.bpel.dao.FaultDAO;
-import org.apache.ode.bpel.dao.ProcessDAO;
-import org.apache.ode.bpel.dao.ProcessInstanceDAO;
-import org.apache.ode.bpel.dao.ScopeDAO;
-import org.apache.ode.bpel.dao.XmlDataDAO;
+import org.apache.ode.bpel.dao.*;
import org.apache.ode.bpel.evt.ProcessInstanceEvent;
import org.apache.ode.utils.QNameUtils;
import org.w3c.dom.Element;
@@ -51,44 +43,27 @@
private static final Collection<ScopeDAO> EMPTY_SCOPE_DAOS = Collections.emptyList();
private short _previousState;
-
private short _state;
-
private Long _instanceId;
-
private ProcessDaoImpl _processDao;
-
private Object _soup;
-
private Map<Long, ScopeDAO> _scopes = new HashMap<Long, ScopeDAO>();
-
private Map<String, List<ScopeDAO>> _scopesByName = new HashMap<String, List<ScopeDAO>>();
-
private Map<String, byte[]> _messageExchanges = new HashMap<String, byte[]>();
-
+ private Map<String,ResourceRouteDAO> _resourceRoutes = new HashMap<String,ResourceRouteDAO>();
private ScopeDAO _rootScope;
-
private FaultDAO _fault;
-
private CorrelatorDAO _instantiatingCorrelator;
-
private BpelDAOConnection _conn;
-
private int _failureCount;
-
private Date _failureDateTime;
-
private Map<String, ActivityRecoveryDAO> _activityRecoveries = new HashMap<String, ActivityRecoveryDAO>();
-
// TODO: Remove this, we should be using the main event store...
private List<ProcessInstanceEvent> _events = new ArrayList<ProcessInstanceEvent>();
-
private Date _lastActive;
-
+ private String _instantiatingUrl;
private int _seq;
-
private byte[] _execState;
-
private int _execStateCount;
ProcessInstanceDaoImpl(BpelDAOConnection conn, ProcessDaoImpl processDao, CorrelatorDAO correlator) {
@@ -257,6 +232,18 @@
return _instantiatingCorrelator;
}
+ public String getInstantiatingUrl() {
+ return _instantiatingUrl;
+ }
+
+ public void setInstantiatingUrl(String url) {
+ _instantiatingUrl = url;
+ }
+
+ public void createResourceRoute(String url, String method, String pickResponseChannel, int selectorIdx) {
+ _resourceRoutes.put(url, new ResourceRouteDAOImpl(url, method, pickResponseChannel, selectorIdx));
+ }
+
/**
* @see org.apache.ode.bpel.dao.ProcessInstanceDAO#getScopes(java.lang.String)
*/
Added: ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/memdao/ResourceRouteDAOImpl.java
URL: http://svn.apache.org/viewvc/ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/memdao/ResourceRouteDAOImpl.java?rev=708743&view=auto
==============================================================================
--- ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/memdao/ResourceRouteDAOImpl.java (added)
+++ ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/memdao/ResourceRouteDAOImpl.java Tue Oct 28 17:21:13 2008
@@ -0,0 +1,51 @@
+package org.apache.ode.bpel.memdao;
+
+import org.apache.ode.bpel.dao.ResourceRouteDAO;
+
+public class ResourceRouteDAOImpl extends DaoBaseImpl implements ResourceRouteDAO {
+ private Long _id;
+ private String url;
+ private String method;
+ private String pickResponseChannel;
+ private int selectorIdx;
+
+ public ResourceRouteDAOImpl(String url, String method, String pickResponseChannel, int selectorIdx) {
+ _id = IdGen.newProcessId();
+ this.url = url;
+ this.method = method;
+ this.pickResponseChannel = pickResponseChannel;
+ this.selectorIdx = selectorIdx;
+ }
+
+ public String getUrl() {
+ return url;
+ }
+
+ public void setUrl(String url) {
+ this.url = url;
+ }
+
+ public String getMethod() {
+ return method;
+ }
+
+ public void setMethod(String method) {
+ this.method = method;
+ }
+
+ public String getPickResponseChannel() {
+ return pickResponseChannel;
+ }
+
+ public void setPickResponseChannel(String pickResponseChannel) {
+ this.pickResponseChannel = pickResponseChannel;
+ }
+
+ public int getSelectorIdx() {
+ return selectorIdx;
+ }
+
+ public void setSelectorIdx(int selectorIdx) {
+ this.selectorIdx = selectorIdx;
+ }
+}
Modified: ode/branches/restful/jacob/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java
URL: http://svn.apache.org/viewvc/ode/branches/restful/jacob/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java?rev=708743&r1=708742&r2=708743&view=diff
==============================================================================
--- ode/branches/restful/jacob/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java (original)
+++ ode/branches/restful/jacob/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java Tue Oct 28 17:21:13 2008
@@ -278,10 +278,9 @@
}
private class JacobThreadImpl implements Runnable, JacobThread {
- private final JacobObject _methodBody;
+ private final JacobObject _methodBody;
private final Object[] _args;
-
private final Method _method;
/** Text string identifying the left side of the reduction (for debug). */
Modified: ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/OdeInternalInstance.java
URL: http://svn.apache.org/viewvc/ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/OdeInternalInstance.java?rev=708743&r1=708742&r2=708743&view=diff
==============================================================================
--- ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/OdeInternalInstance.java (original)
+++ ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/OdeInternalInstance.java Tue Oct 28 17:21:13 2008
@@ -74,6 +74,8 @@
void select(PickResponseChannel pickResponseChannel, Date timeout, boolean createInstance, Selector[] selectors)
throws FaultException;
+ void checkResourceRoute(String url, String method, String mexRef, PickResponseChannel pickResponseChannel, int selectorIdx);
+
CorrelationKey readCorrelation(CorrelationSetInstance cset);
ExpressionLanguageRuntimeRegistry getExpLangRuntime();
Modified: ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/OutstandingRequestManager.java
URL: http://svn.apache.org/viewvc/ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/OutstandingRequestManager.java?rev=708743&r1=708742&r2=708743&view=diff
==============================================================================
--- ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/OutstandingRequestManager.java (original)
+++ ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/OutstandingRequestManager.java Tue Oct 28 17:21:13 2008
@@ -49,6 +49,9 @@
private final Map<RequestIdTuple, Entry> _byRid = new HashMap<RequestIdTuple, Entry>();
private final Map<String, Entry> _byChannel = new HashMap<String, Entry>();
+ private final Map<RequestResTuple, RestEntry> _byRestRid = new HashMap<RequestResTuple, RestEntry>();
+ private final Map<String, RestEntry> _byRestChannel = new HashMap<String, RestEntry>();
+
int findConflict(Selector selectors[]) {
if (__log.isTraceEnabled()) {
__log.trace(ObjectPrinter.stringifyMethodEnter("findConflict", new Object[] { "selectors", selectors}) );
@@ -75,12 +78,9 @@
* @param selectors selectors for this receive/pick
*/
void register(String pickResponseChannel, Selector selectors[]) {
- if (__log.isTraceEnabled()) {
+ if (__log.isTraceEnabled())
__log.trace(ObjectPrinter.stringifyMethodEnter("register", new Object[] {
- "pickResponseChannel", pickResponseChannel,
- "selectors", selectors
- }) );
- }
+ "pickResponseChannel", pickResponseChannel, "selectors", selectors }) );
if (_byChannel.containsKey(pickResponseChannel)) {
String errmsg = "INTERNAL ERROR: Duplicate ENTRY for RESPONSE CHANNEL " + pickResponseChannel;
@@ -106,6 +106,27 @@
_byChannel.put(pickResponseChannel, entry);
}
+ void register(String pickResponseChannel, String url, String method, String mexRef) {
+ if (__log.isTraceEnabled())
+ __log.trace(ObjectPrinter.stringifyMethodEnter("register", new Object[] {
+ "pickResponseChannel", pickResponseChannel}) );
+
+ if (_byChannel.containsKey(pickResponseChannel)) {
+ String errmsg = "INTERNAL ERROR: Duplicate ENTRY for RESPONSE CHANNEL " + pickResponseChannel;
+ __log.fatal(errmsg);
+ throw new IllegalArgumentException(errmsg);
+ }
+
+ final RequestResTuple rid = new RequestResTuple(url, method, mexRef);
+ if (_byRid.containsKey(rid)) {
+ String errmsg = "INTERNAL ERROR: Duplicate ENTRY for RID " + rid;
+ __log.fatal(errmsg);
+ throw new IllegalStateException(errmsg);
+ }
+ _byRestRid.put(rid, new RestEntry(pickResponseChannel));
+ _byRestChannel.put(pickResponseChannel, new RestEntry(pickResponseChannel));
+ }
+
/**
* Cancel a previous registration.
* @see #register(String, Selector[])
@@ -114,13 +135,14 @@
void cancel(String pickResponseChannel) {
if (__log.isTraceEnabled())
__log.trace(ObjectPrinter.stringifyMethodEnter("cancel", new Object[] {
- "pickResponseChannel", pickResponseChannel
- }) );
+ "pickResponseChannel", pickResponseChannel }) );
Entry entry = _byChannel.remove(pickResponseChannel);
- if (entry != null) {
+ if (entry != null)
_byRid.values().remove(entry);
- }
+ RestEntry restEntry = _byRestChannel.remove(pickResponseChannel);
+ if (restEntry != null)
+ _byRestRid.values().remove(restEntry);
}
/**
@@ -138,18 +160,27 @@
Entry entry = _byChannel.get(pickResponseChannel);
if (entry == null) {
- String errmsg = "INTERNAL ERROR: No ENTRY for RESPONSE CHANNEL " + pickResponseChannel;
- __log.fatal(errmsg);
- throw new IllegalArgumentException(errmsg);
- }
-
- if (entry.mexRef != null) {
- String errmsg = "INTERNAL ERROR: Duplicate ASSOCIATION for CHANEL " + pickResponseChannel;
- __log.fatal(errmsg);
- throw new IllegalStateException(errmsg);
+ RestEntry restEntry = _byRestChannel.get(pickResponseChannel);
+ if (restEntry == null) {
+ String errmsg = "INTERNAL ERROR: No ENTRY for RESPONSE CHANNEL " + pickResponseChannel;
+ __log.fatal(errmsg);
+ throw new IllegalArgumentException(errmsg);
+ } else {
+ if (restEntry.mexRef != null) {
+ String errmsg = "INTERNAL ERROR: Duplicate ASSOCIATION for CHANEL " + pickResponseChannel;
+ __log.fatal(errmsg);
+ throw new IllegalStateException(errmsg);
+ }
+ restEntry.mexRef = mexRef;
+ }
+ } else {
+ if (entry.mexRef != null) {
+ String errmsg = "INTERNAL ERROR: Duplicate ASSOCIATION for CHANEL " + pickResponseChannel;
+ __log.fatal(errmsg);
+ throw new IllegalStateException(errmsg);
+ }
+ entry.mexRef = mexRef;
}
-
- entry.mexRef = mexRef;
}
/**
@@ -181,6 +212,24 @@
return entry.mexRef;
}
+ public String release(String url, String method, String mexId) {
+ if (__log.isTraceEnabled())
+ __log.trace(ObjectPrinter.stringifyMethodEnter("release", new Object[] {
+ "url", url, "method", method, "mexId", mexId }) );
+
+ final RequestResTuple rid = new RequestResTuple(url, method, mexId);
+ RestEntry entry = _byRestRid.get(rid);
+ if (entry == null) {
+ if (__log.isDebugEnabled()) {
+ __log.debug("==release: RID " + rid + " not found in " + _byRestRid);
+ }
+ return null;
+ }
+ while(_byRestChannel.values().remove(entry));
+ while(_byRestRid.values().remove(entry));
+ return entry.mexRef;
+ }
+
/**
* "Release" all outstanding incoming messages exchanges. Makes the object forget about
* the previous registrations
@@ -248,6 +297,36 @@
}
}
+ private class RequestResTuple implements Serializable {
+ private static final long serialVersionUID = -1059359612839777482L;
+ /** Name of the operation. */
+ String url;
+ /** Message exchange identifier. */
+ String method;
+ /** Message exchange identifier. */
+ String mexId;
+
+ /** Constructor. */
+ private RequestResTuple(String url, String method, String mexId) {
+ this.url = url;
+ this.method = method;
+ this.mexId = mexId;
+ }
+
+ public int hashCode() {
+ return this.url.hashCode() ^ this.method.hashCode() ^ this.mexId.hashCode();
+ }
+
+ public boolean equals(Object obj) {
+ RequestResTuple other = (RequestResTuple) obj;
+ return other.url.equals(url) && other.method.equals(method) && other.mexId.equals(mexId);
+ }
+
+ public String toString() {
+ return ObjectPrinter.toString(this, new Object[] {"url", url, "method", method, "mexId", mexId});
+ }
+ }
+
private class Entry implements Serializable {
private static final long serialVersionUID = -583743124656582887L;
final String pickResponseChannel;
@@ -267,4 +346,21 @@
});
}
}
+
+ private class RestEntry implements Serializable {
+ private static final long serialVersionUID = -583733124656582887L;
+ final String pickResponseChannel;
+ String mexRef;
+
+ private RestEntry(String pickResponseChannel) {
+ this.pickResponseChannel = pickResponseChannel;
+ }
+
+ public String toString() {
+ return ObjectPrinter.toString(this, new Object[] {
+ "pickResponseChannel", pickResponseChannel,
+ "mexRef", mexRef
+ });
+ }
+ }
}
Modified: ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/PICK.java
URL: http://svn.apache.org/viewvc/ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/PICK.java?rev=708743&r1=708742&r2=708743&view=diff
==============================================================================
--- ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/PICK.java (original)
+++ ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/PICK.java Tue Oct 28 17:21:13 2008
@@ -35,6 +35,7 @@
import org.apache.ode.bpel.rtrep.v2.channels.TerminationChannelListener;
import org.apache.ode.bpel.rapi.InvalidProcessException;
import org.apache.ode.utils.DOMUtils;
+import org.apache.ode.utils.GUID;
import org.apache.ode.utils.xsd.Duration;
import org.apache.ode.bpel.evar.ExternalVariableModuleException;
import org.apache.ode.bpel.iapi.BpelEngineException;
@@ -69,39 +70,7 @@
Selector[] selectors;
try {
- selectors = new Selector[_opick.onMessages.size()];
- int idx = 0;
- for (OPickReceive.OnMessage onMessage : _opick.onMessages) {
- CorrelationKey key = null; // this will be the case for the
- // createInstance activity
-
- PartnerLinkInstance pLinkInstance = _scopeFrame.resolve(onMessage.partnerLink);
- if (onMessage.matchCorrelation == null && !_opick.createInstanceFlag) {
- // Adding a route for opaque correlation. In this case,
- // correlation is on "out-of-band" session-id
- String sessionId = getBpelRuntime().fetchMySessionId(pLinkInstance);
- key = new CorrelationKey(-1, new String[] { sessionId });
- } else if (onMessage.matchCorrelation != null) {
- if (!getBpelRuntime().isCorrelationInitialized(
- _scopeFrame.resolve(onMessage.matchCorrelation))) {
- // the following should really test if this is a "join"
- // type correlation...
- if (!_opick.createInstanceFlag)
- throw new FaultException(_opick.getOwner().constants.qnCorrelationViolation,
- "Correlation not initialized.");
- } else {
-
- key = getBpelRuntime().readCorrelation(_scopeFrame.resolve(onMessage.matchCorrelation));
-
- assert key != null;
- }
- }
-
- selectors[idx] = new Selector(idx, pLinkInstance, onMessage.operation.getName(), onMessage.operation
- .getOutput() == null, onMessage.messageExchangeId, key);
- idx++;
- }
-
+ // Pick onAlarm
timeout = null;
for (OPickReceive.OnAlarm onAlarm : _opick.onAlarms) {
Date dt = onAlarm.forExpr != null ? offsetFromNow(getBpelRuntime().getExpLangRuntime()
@@ -112,7 +81,47 @@
_alarm = onAlarm;
}
}
- getBpelRuntime().select(pickResponseChannel, timeout, _opick.createInstanceFlag, selectors);
+
+ // Pick onMessages (identical to a receive)
+ selectors = new Selector[_opick.onMessages.size()];
+ int idx = 0;
+ for (OPickReceive.OnMessage onMessage : _opick.onMessages) {
+ if (onMessage.isRestful()) {
+ // TODO here we should resolved a resource url value that's been previously instantiated by a scope
+ String url = getBpelRuntime().getExpLangRuntime()
+ .evaluateAsString(onMessage.resource.getSubpath(), getEvaluationContext());
+ url = url + "/" + new GUID().toString();
+ getBpelRuntime().checkResourceRoute(url, onMessage.resource.getMethod(),
+ onMessage.messageExchangeId, pickResponseChannel, idx);
+ } else {
+ CorrelationKey key = null;
+ PartnerLinkInstance pLinkInstance = _scopeFrame.resolve(onMessage.partnerLink);
+ if (onMessage.matchCorrelation == null && !_opick.createInstanceFlag) {
+ // Adding a route for opaque correlation. In this case,
+ // correlation is on "out-of-band" session-id
+ String sessionId = getBpelRuntime().fetchMySessionId(pLinkInstance);
+ key = new CorrelationKey(-1, new String[] { sessionId });
+ } else if (onMessage.matchCorrelation != null) {
+ if (!getBpelRuntime().isCorrelationInitialized(
+ _scopeFrame.resolve(onMessage.matchCorrelation))) {
+ // the following should really test if this is a "join" type correlation...
+ if (!_opick.createInstanceFlag)
+ throw new FaultException(_opick.getOwner().constants.qnCorrelationViolation,
+ "Correlation not initialized.");
+ } else {
+ key = getBpelRuntime().readCorrelation(_scopeFrame.resolve(onMessage.matchCorrelation));
+ assert key != null;
+ }
+ }
+
+ selectors[idx] = new Selector(idx, pLinkInstance, onMessage.operation.getName(), onMessage.operation
+ .getOutput() == null, onMessage.messageExchangeId, key);
+ idx++;
+ }
+ }
+
+ if (selectors[0] != null)
+ getBpelRuntime().select(pickResponseChannel, timeout, _opick.createInstanceFlag, selectors);
} catch (FaultException e) {
__log.error(e);
FaultData fault = createFault(e.getQName(), _opick, e.getMessage());
Modified: ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/RuntimeInstanceImpl.java
URL: http://svn.apache.org/viewvc/ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/RuntimeInstanceImpl.java?rev=708743&r1=708742&r2=708743&view=diff
==============================================================================
--- ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/RuntimeInstanceImpl.java (original)
+++ ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/RuntimeInstanceImpl.java Tue Oct 28 17:21:13 2008
@@ -105,7 +105,6 @@
public void initializePartnerLinks(Long parentScopeId, Collection<OPartnerLink> partnerLinks) {
_brc.initializePartnerLinks(parentScopeId, partnerLinks);
-
}
public void select(PickResponseChannel pickResponseChannel, Date timeout, boolean createInstance, Selector[] selectors)
@@ -122,6 +121,12 @@
_brc.select(pickResponseChannelStr, timeout, selectors);
}
+ public void checkResourceRoute(String url, String method, String mexRef, PickResponseChannel pickResponseChannel, int selectorIdx) {
+ final String pickResponseChannelStr = pickResponseChannel.export();
+ getORM().register(pickResponseChannelStr, url, method, mexRef);
+ _brc.checkResourceRoute(url, method, pickResponseChannelStr, selectorIdx);
+ }
+
public CorrelationKey readCorrelation(CorrelationSetInstance cset) {
return _brc.readCorrelation(cset);
}
Modified: ode/branches/restful/runtimes/src/test/java/org/apache/ode/bpel/rtrep/v2/CoreBpelTest.java
URL: http://svn.apache.org/viewvc/ode/branches/restful/runtimes/src/test/java/org/apache/ode/bpel/rtrep/v2/CoreBpelTest.java?rev=708743&r1=708742&r2=708743&view=diff
==============================================================================
--- ode/branches/restful/runtimes/src/test/java/org/apache/ode/bpel/rtrep/v2/CoreBpelTest.java (original)
+++ ode/branches/restful/runtimes/src/test/java/org/apache/ode/bpel/rtrep/v2/CoreBpelTest.java Tue Oct 28 17:21:13 2008
@@ -112,6 +112,10 @@
return null; //To change body of implemented methods use File | Settings | File Templates.
}
+ public void checkResourceRoute(String url, String method, PickResponseChannel pickResponseChannel, int selectorIdx) {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
public void initializePartnerLinks(Long parentScopeId, Collection<OPartnerLink> partnerLinks) {
//To change body of implemented methods use File | Settings | File Templates.
}
@@ -128,6 +132,10 @@
//To change body of implemented methods use File | Settings | File Templates.
}
+ public void checkResourceRoute(String url, String method, String mexRef, PickResponseChannel pickResponseChannel, int selectorIdx) {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
public CorrelationKey readCorrelation(CorrelationSetInstance cset) {
return null; //To change body of implemented methods use File | Settings | File Templates.
}