You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ode.apache.org by mr...@apache.org on 2008/10/24 02:50:25 UTC
svn commit: r707516 - in /ode/branches/restful:
bpel-api/src/main/java/org/apache/ode/bpel/iapi/
bpel-api/src/main/java/org/apache/ode/bpel/rapi/
bpel-dao/src/main/java/org/apache/ode/bpel/dao/
dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/ da...
Author: mriou
Date: Thu Oct 23 17:50:25 2008
New Revision: 707516
URL: http://svn.apache.org/viewvc?rev=707516&view=rev
Log:
A restful process can get activated and requests registration of its instantiating resources to the IL. It can then get called, route and start a new instance. The rest is still to do.
Added:
ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/engine/ODERESTProcess.java
ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/engine/RESTMessageExchangeImpl.java
Removed:
ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/engine/ODERestfulProcess.java
Modified:
ode/branches/restful/bpel-api/src/main/java/org/apache/ode/bpel/iapi/RESTMessageExchange.java
ode/branches/restful/bpel-api/src/main/java/org/apache/ode/bpel/rapi/OdeRuntime.java
ode/branches/restful/bpel-api/src/main/java/org/apache/ode/bpel/rapi/ResourceModel.java
ode/branches/restful/bpel-dao/src/main/java/org/apache/ode/bpel/dao/MessageExchangeDAO.java
ode/branches/restful/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/MessageExchangeDaoImpl.java
ode/branches/restful/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HMessageExchange.java
ode/branches/restful/dao-jpa/src/main/java/org/apache/ode/dao/jpa/MessageExchangeDAOImpl.java
ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/engine/BpelInstanceWorker.java
ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java
ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java
ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/engine/MessageExchangeImpl.java
ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/engine/ODEProcess.java
ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/engine/ODEWSProcess.java
ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/engine/UnreliableMyRoleMessageExchangeImpl.java
ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/memdao/MessageExchangeDAOImpl.java
ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/OResource.java
ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/RuntimeImpl.java
Modified: ode/branches/restful/bpel-api/src/main/java/org/apache/ode/bpel/iapi/RESTMessageExchange.java
URL: http://svn.apache.org/viewvc/ode/branches/restful/bpel-api/src/main/java/org/apache/ode/bpel/iapi/RESTMessageExchange.java?rev=707516&r1=707515&r2=707516&view=diff
==============================================================================
--- ode/branches/restful/bpel-api/src/main/java/org/apache/ode/bpel/iapi/RESTMessageExchange.java (original)
+++ ode/branches/restful/bpel-api/src/main/java/org/apache/ode/bpel/iapi/RESTMessageExchange.java Thu Oct 23 17:50:25 2008
@@ -1,5 +1,7 @@
package org.apache.ode.bpel.iapi;
+import java.util.concurrent.TimeoutException;
+
/**
* Message exchange used for a web-service based interaction between the integration layer and the
* engine. Adds resource information.
@@ -7,4 +9,8 @@
public interface RESTMessageExchange extends MessageExchange {
Resource getResource();
+
+ Status invokeBlocking() throws BpelEngineException, TimeoutException;
+
+ void setRequest(Message message);
}
Modified: ode/branches/restful/bpel-api/src/main/java/org/apache/ode/bpel/rapi/OdeRuntime.java
URL: http://svn.apache.org/viewvc/ode/branches/restful/bpel-api/src/main/java/org/apache/ode/bpel/rapi/OdeRuntime.java?rev=707516&r1=707515&r2=707516&view=diff
==============================================================================
--- ode/branches/restful/bpel-api/src/main/java/org/apache/ode/bpel/rapi/OdeRuntime.java (original)
+++ ode/branches/restful/bpel-api/src/main/java/org/apache/ode/bpel/rapi/OdeRuntime.java Thu Oct 23 17:50:25 2008
@@ -24,7 +24,7 @@
String extractMatch(Element msgData, PropertyExtractor extractor) throws FaultException;
- String extractAddress(ResourceModel resource);
+ String extractAddress(ResourceModel resource) throws FaultException;
void setExtensionRegistry(Map<String, ExtensionBundleRuntime> extensionRegistry);
Modified: ode/branches/restful/bpel-api/src/main/java/org/apache/ode/bpel/rapi/ResourceModel.java
URL: http://svn.apache.org/viewvc/ode/branches/restful/bpel-api/src/main/java/org/apache/ode/bpel/rapi/ResourceModel.java?rev=707516&r1=707515&r2=707516&view=diff
==============================================================================
--- ode/branches/restful/bpel-api/src/main/java/org/apache/ode/bpel/rapi/ResourceModel.java (original)
+++ ode/branches/restful/bpel-api/src/main/java/org/apache/ode/bpel/rapi/ResourceModel.java Thu Oct 23 17:50:25 2008
@@ -12,4 +12,6 @@
ResourceModel getReference();
String getMethod();
+
+ boolean isInstantiateResource();
}
Modified: ode/branches/restful/bpel-dao/src/main/java/org/apache/ode/bpel/dao/MessageExchangeDAO.java
URL: http://svn.apache.org/viewvc/ode/branches/restful/bpel-dao/src/main/java/org/apache/ode/bpel/dao/MessageExchangeDAO.java?rev=707516&r1=707515&r2=707516&view=diff
==============================================================================
--- ode/branches/restful/bpel-dao/src/main/java/org/apache/ode/bpel/dao/MessageExchangeDAO.java (original)
+++ ode/branches/restful/bpel-dao/src/main/java/org/apache/ode/bpel/dao/MessageExchangeDAO.java Thu Oct 23 17:50:25 2008
@@ -24,6 +24,7 @@
import javax.xml.namespace.QName;
import org.apache.ode.bpel.iapi.InvocationStyle;
+import org.apache.ode.bpel.iapi.Resource;
import org.apache.ode.bpel.iapi.MessageExchange.AckType;
import org.apache.ode.bpel.iapi.MessageExchange.FailureType;
import org.apache.ode.bpel.iapi.MessageExchange.MessageExchangePattern;
@@ -176,6 +177,10 @@
Element getEPR();
+ String getResource();
+
+ void setResource(String resourceStr);
+
MessageExchangePattern getPattern();
/**
Modified: ode/branches/restful/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/MessageExchangeDaoImpl.java
URL: http://svn.apache.org/viewvc/ode/branches/restful/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/MessageExchangeDaoImpl.java?rev=707516&r1=707515&r2=707516&view=diff
==============================================================================
--- ode/branches/restful/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/MessageExchangeDaoImpl.java (original)
+++ ode/branches/restful/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/MessageExchangeDaoImpl.java Thu Oct 23 17:50:25 2008
@@ -151,7 +151,7 @@
}
public void setOperation(String opname) {
- entering("MessageExchangeDaoImpl.setOperation");
+ entering("MessageExchangeDaoImpl.setOperation");
_hself.setOperationName(opname);
update();
}
@@ -387,4 +387,13 @@
_hself.setPipedPID(pipedPid == null ? null : pipedPid.toString());
}
+
+ public void setResource(String resource) {
+ _hself.setResource(resource);
+ }
+
+ public String getResource() {
+ return _hself.getResource();
+ }
+
}
Modified: ode/branches/restful/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HMessageExchange.java
URL: http://svn.apache.org/viewvc/ode/branches/restful/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HMessageExchange.java?rev=707516&r1=707515&r2=707516&view=diff
==============================================================================
--- ode/branches/restful/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HMessageExchange.java (original)
+++ ode/branches/restful/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HMessageExchange.java Thu Oct 23 17:50:25 2008
@@ -51,6 +51,8 @@
private HPartnerLink _partnerLink;
+ private String _resource;
+
private String _clientKey;
private HProcessInstance _instance;
@@ -420,4 +422,15 @@
public void setPipedPID(String ppid) {
_pipedPid = ppid;
}
+
+ /**
+ * @hibernate.property column="RESOURCE" length="255"
+ */
+ public String getResource() {
+ return _resource;
+ }
+
+ public void setResource(String resource) {
+ this._resource = resource;
+ }
}
Modified: ode/branches/restful/dao-jpa/src/main/java/org/apache/ode/dao/jpa/MessageExchangeDAOImpl.java
URL: http://svn.apache.org/viewvc/ode/branches/restful/dao-jpa/src/main/java/org/apache/ode/dao/jpa/MessageExchangeDAOImpl.java?rev=707516&r1=707515&r2=707516&view=diff
==============================================================================
--- ode/branches/restful/dao-jpa/src/main/java/org/apache/ode/dao/jpa/MessageExchangeDAOImpl.java (original)
+++ ode/branches/restful/dao-jpa/src/main/java/org/apache/ode/dao/jpa/MessageExchangeDAOImpl.java Thu Oct 23 17:50:25 2008
@@ -74,6 +74,8 @@
private char _direction;
@Lob @Column(name="EPR")
private String _epr;
+ @Basic @Column(name="RESOURCE", length=255)
+ private String _resource;
@Transient private
Element _eprElement;
@Basic @Column(name="FAULT")
@@ -403,4 +405,12 @@
public void setPipedPID(QName pipedPid) {
_pipedPid = pipedPid == null ? null : pipedPid.toString();
}
+
+ public String getResource() {
+ return _resource;
+ }
+
+ public void setResource(String resourceStr) {
+ _resource = resourceStr;
+ }
}
Modified: ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/engine/BpelInstanceWorker.java
URL: http://svn.apache.org/viewvc/ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/engine/BpelInstanceWorker.java?rev=707516&r1=707515&r2=707516&view=diff
==============================================================================
--- ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/engine/BpelInstanceWorker.java (original)
+++ ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/engine/BpelInstanceWorker.java Thu Oct 23 17:50:25 2008
@@ -77,7 +77,6 @@
* forwarded from {@link Callable#call()}
*/
<T> T execInCurrentThread(Callable<T> callable) throws Exception {
-
// Allow recursive invocations. This allows us to nest P2P invocations to an arbitrary depth.
if (isWorkerThread())
return doInstanceWork(callable);
@@ -103,13 +102,11 @@
throw new BpelEngineException("Thread interrupted.", ex);
}
-
try {
return doInstanceWork(callable);
} finally {
finished.release();
}
-
}
Modified: ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java
URL: http://svn.apache.org/viewvc/ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java?rev=707516&r1=707515&r2=707516&view=diff
==============================================================================
--- ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java (original)
+++ ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java Thu Oct 23 17:50:25 2008
@@ -686,7 +686,7 @@
event.setProcessId(_dao.getProcess().getProcessId());
event.setProcessName(_dao.getProcess().getType());
event.setProcessInstanceId(_dao.getInstanceId());
- _bpelProcess._debugger.onEvent(event);
+ if (_bpelProcess._debugger != null) _bpelProcess._debugger.onEvent(event);
//filter events
List<String> scopeNames = null;
Modified: ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java
URL: http://svn.apache.org/viewvc/ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java?rev=707516&r1=707515&r2=707516&view=diff
==============================================================================
--- ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java (original)
+++ ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java Thu Oct 23 17:50:25 2008
@@ -95,7 +95,7 @@
/** Mapping from myrole service name to active process. */
private final HashMap<QName, List<ODEProcess>> _wsServiceMap = new HashMap<QName, List<ODEProcess>>();
- private final HashMap<String, ODEProcess> _restServiceMap = new HashMap<String, ODEProcess>();
+ private final HashMap<String, ODERESTProcess> _restServiceMap = new HashMap<String, ODERESTProcess>();
/** Weak-reference cache of all the my-role message exchange objects. */
private final MyRoleMessageExchangeCache _myRoleMexCache = new MyRoleMessageExchangeCache();
@@ -345,8 +345,8 @@
ODEProcess process;
if (conf.isRestful()) {
- ODERestfulProcess restProcess = new ODERestfulProcess(this, conf, null);
- for (String resUrl : restProcess.getResourceUrls()) {
+ ODERESTProcess restProcess = new ODERESTProcess(this, conf, null);
+ for (String resUrl : restProcess.getInitialResourceUrls()) {
_restServiceMap.put(resUrl, restProcess);
}
process = restProcess;
@@ -374,7 +374,7 @@
process = wsProcess;
}
- process.activate(_contexts);
+ process.activate();
_registeredProcesses.put(process.getPID(), process);
if (_dehydrationPolicy == null) process.hydrate();
@@ -610,7 +610,10 @@
_mngmtLock.readLock().lock();
try {
// Do stuff
- return null;
+ ODERESTProcess target = _restServiceMap.get(resource.getUrl());
+ if (target == null) throw new BpelEngineException("NoSuchResource: " + resource.getUrl());
+ assertNoTransaction();
+ return target.createRESTMessageExchange(resource, foreignKey);
} finally {
_mngmtLock.readLock().unlock();
}
Modified: ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/engine/MessageExchangeImpl.java
URL: http://svn.apache.org/viewvc/ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/engine/MessageExchangeImpl.java?rev=707516&r1=707515&r2=707516&view=diff
==============================================================================
--- ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/engine/MessageExchangeImpl.java (original)
+++ ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/engine/MessageExchangeImpl.java Thu Oct 23 17:50:25 2008
@@ -22,7 +22,7 @@
import java.util.HashMap;
import java.util.HashSet;
import java.util.Set;
-import java.util.concurrent.Callable;
+import java.util.concurrent.*;
import javax.wsdl.Operation;
import javax.wsdl.PortType;
@@ -155,8 +155,8 @@
}
void save(MessageExchangeDAO dao) {
- dao.setPartnerLinkModelId(_oplink.getId());
- dao.setOperation(_operation.getName());
+ if (_oplink != null) dao.setPartnerLinkModelId(_oplink.getId());
+ if (_operation != null) dao.setOperation(_operation.getName());
dao.setStatus(_status);
dao.setInvocationStyle(getInvocationStyle());
dao.setFault(_fault);
@@ -405,5 +405,51 @@
void sync() {
++_syncdummy;
}
+
+ protected static class ResponseFuture implements Future<Status> {
+ private Status _status;
+
+ public boolean cancel(boolean mayInterruptIfRunning) {
+ return false;
+ }
+
+ public Status get() throws InterruptedException, ExecutionException {
+ try {
+ return get(0, TimeUnit.MILLISECONDS);
+ } catch (TimeoutException e) {
+ // If it's thrown it's definitely a bug
+ throw new RuntimeException(e);
+ }
+ }
+
+ public Status get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+ synchronized (this) {
+ if (_status != null)
+ return _status;
+
+ this.wait(TimeUnit.MILLISECONDS.convert(timeout, unit));
+
+ if (_status == null) throw new TimeoutException();
+ return _status;
+ }
+ }
+
+ public boolean isCancelled() {
+ return false;
+ }
+
+ public boolean isDone() {
+ return _status != null;
+ }
+
+ void done(Status status) {
+ synchronized (this) {
+ _status = status;
+ this.notifyAll();
+ }
+ }
+ }
+
+
}
Modified: ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/engine/ODEProcess.java
URL: http://svn.apache.org/viewvc/ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/engine/ODEProcess.java?rev=707516&r1=707515&r2=707516&view=diff
==============================================================================
--- ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/engine/ODEProcess.java (original)
+++ ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/engine/ODEProcess.java Thu Oct 23 17:50:25 2008
@@ -103,7 +103,7 @@
_invocationStyles = Collections.unmodifiableSet(istyles);
}
- abstract void activate(Contexts contexts);
+ abstract void activate();
abstract void deactivate();
abstract void hydrate();
abstract void dehydrate();
@@ -497,11 +497,34 @@
_lastUsed = System.currentTimeMillis();
}
+ protected void bounceProcessDAO() {
+ if (isInMemory()) {
+ doBounce(_inMemDao.getConnection(), _pid, _pconf.getVersion(), _processModel);
+ } else if (_contexts.isTransacted()) {
+ // If we have a transaction, we do this in the current transaction.
+ doBounce(_contexts.dao.getConnection(), _pid, _pconf.getVersion(), _processModel);
+ } else {
+ // If we do not have a transaction we need to create one.
+ try {
+ _contexts.execTransaction(new Callable<Object>() {
+ public Object call() throws Exception {
+ doBounce(_contexts.dao.getConnection(), _pid, _pconf.getVersion(), _processModel);
+ return null;
+ }
+ });
+ } catch (Exception ex) {
+ String errmsg = "DbError";
+ __log.error(errmsg, ex);
+ throw new BpelEngineException(errmsg, ex);
+ }
+ }
+ }
+
/**
* If necessary, create an object in the data store to represent the process. We'll re-use an existing object if it already
* exists and matches the GUID.
*/
- protected void bounceProcessDAO(BpelDAOConnection conn, final QName pid, final long version, final ProcessModel mprocess) {
+ protected void doBounce(BpelDAOConnection conn, final QName pid, final long version, final ProcessModel mprocess) {
__log.debug("Creating process DAO for " + pid + " (guid=" + mprocess.getGuid() + ")");
try {
boolean create = true;
Added: ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/engine/ODERESTProcess.java
URL: http://svn.apache.org/viewvc/ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/engine/ODERESTProcess.java?rev=707516&view=auto
==============================================================================
--- ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/engine/ODERESTProcess.java (added)
+++ ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/engine/ODERESTProcess.java Thu Oct 23 17:50:25 2008
@@ -0,0 +1,134 @@
+package org.apache.ode.bpel.engine;
+
+import org.apache.ode.bpel.iapi.*;
+import org.apache.ode.bpel.rapi.ResourceModel;
+import org.apache.ode.bpel.dao.MessageExchangeDAO;
+import org.apache.ode.bpel.dao.ProcessInstanceDAO;
+import org.apache.ode.bpel.common.FaultException;
+import org.apache.ode.bpel.runtime.InvalidProcessException;
+import org.apache.ode.bpel.evt.NewProcessInstanceEvent;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Callable;
+
+public class ODERESTProcess extends ODEProcess {
+
+ private ConcurrentHashMap<ResourceModel,String> _staticResources = new ConcurrentHashMap<ResourceModel,String>();
+
+ private ArrayList<Resource> _resources = new ArrayList<Resource>();
+
+ public ODERESTProcess(BpelServerImpl server, ProcessConf conf, BpelEventListener debugger) {
+ super(server, conf, debugger);
+ _processModel = conf.getProcessModel();
+ _runtime = buildRuntime(_processModel.getModelVersion());
+ _runtime.init(_pconf, _processModel);
+ }
+
+ public Collection<String> getInitialResourceUrls() {
+ if (_staticResources.size() > 0 ) return _staticResources.values();
+
+ ArrayList<String> addresses = new ArrayList<String>();
+ for (ResourceModel resourceModel : _processModel.getProvidedResources()) {
+ try {
+ String addr = _runtime.extractAddress(resourceModel);
+ addresses.add(addr);
+ _staticResources.put(resourceModel, addr);
+ } catch (FaultException e) {
+ throw new BpelEngineException(e);
+ }
+ }
+ return addresses;
+ }
+
+ void activate() {
+ bounceProcessDAO();
+
+ for (ResourceModel resourceModel : _staticResources.keySet()) {
+ Resource resource = new Resource(_staticResources.get(resourceModel),
+ "application/xml", resourceModel.getMethod());
+ _contexts.bindingContext.activateProvidedResource(resource);
+ _resources.add(resource);
+ }
+ }
+
+ void deactivate() {
+ for (ResourceModel resourceModel : _staticResources.keySet()) {
+ Resource resource = new Resource(_staticResources.get(resourceModel),
+ "application/xml", resourceModel.getMethod());
+ _contexts.bindingContext.deactivateProvidedResource(resource);
+ }
+ }
+
+ void invokeProcess(final MessageExchangeDAO mexdao) {
+ Resource msgResource = getResource(mexdao.getResource());
+ mexdao.setProcess(getProcessDAO());
+
+ if (_pconf.getState() == ProcessState.RETIRED) {
+ throw new InvalidProcessException("Process is retired.", InvalidProcessException.RETIRED_CAUSE_CODE);
+ }
+
+ ProcessInstanceDAO newInstance = getProcessDAO().createInstance(null);
+
+ // send process instance event
+ NewProcessInstanceEvent evt = new NewProcessInstanceEvent(getProcessModel().getQName(),
+ getProcessDAO().getProcessId(), newInstance.getInstanceId());
+ evt.setMexId(mexdao.getMessageExchangeId());
+ saveEvent(evt, newInstance);
+
+ mexdao.setCorrelationStatus(MyRoleMessageExchange.CorrelationStatus.CREATE_INSTANCE.toString());
+ mexdao.setInstance(newInstance);
+
+ if (isInstantiating(msgResource)) {
+ doInstanceWork(mexdao.getInstance().getInstanceId(), new Callable<Void>() {
+ public Void call() {
+ executeCreateInstance(mexdao);
+ return null;
+ }
+ });
+ } else {
+ throw new UnsupportedOperationException("not yet");
+ }
+ }
+
+ // Restful processes don't lazy load their OModel, they need it right away to access the instantiating resource
+ protected void latch(int s) { }
+ protected void releaseLatch(int s) { }
+ protected boolean isLatched(int s) { return false; }
+ protected void hydrate() { }
+ protected void dehydrate() { }
+
+ public RESTMessageExchange createRESTMessageExchange(Resource resource, String clientKey) {
+ // TODO check the resource matches a provided one
+ return new RESTMessageExchangeImpl(this, clientKey, resource);
+ }
+
+ public Resource getResource(String url, String method) {
+ for (Resource resource : _resources) {
+ if (resource.getUrl().equals(url) && resource.getMethod().equals(method)) return resource;
+ }
+ return null;
+ }
+
+ public Resource getResource(String serializedForm) {
+ int sep = serializedForm.indexOf("~");
+ String url = serializedForm.substring(0, sep);
+ String method = serializedForm.substring(sep + 1);
+
+ for (Resource resource : _resources) {
+ if (resource.getUrl().equals(url) && resource.getMethod().equals(method)) return resource;
+ }
+ return null;
+ }
+
+ protected boolean isInstantiating(Resource res) {
+ for (Map.Entry<ResourceModel, String> resourceModel : _staticResources.entrySet()) {
+ if (resourceModel.getValue().equals(res.getUrl())
+ && resourceModel.getKey().getMethod().equals(res.getMethod())
+ && resourceModel.getKey().isInstantiateResource()) return true;
+ }
+ return false;
+ }
+}
Modified: ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/engine/ODEWSProcess.java
URL: http://svn.apache.org/viewvc/ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/engine/ODEWSProcess.java?rev=707516&r1=707515&r2=707516&view=diff
==============================================================================
--- ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/engine/ODEWSProcess.java (original)
+++ ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/engine/ODEWSProcess.java Thu Oct 23 17:50:25 2008
@@ -56,8 +56,7 @@
return null;
}
- void activate(Contexts contexts) {
- _contexts = contexts;
+ void activate() {
_debugger = new DebuggerSupport(this);
__log.debug("Activating endpoints for " + _pid);
@@ -272,12 +271,15 @@
private void doHydrate() {
markused();
- try {
- _processModel = deserializeCompiledProcess(_pconf.getCBPInputStream());
- } catch (Exception e) {
- String errmsg = "Error reloading compiled process " + _pconf.getProcessId() + "; the file appears to be corrupted.";
- __log.error(errmsg);
- throw new BpelEngineException(errmsg, e);
+ _processModel = _pconf.getProcessModel();
+ if (_processModel == null) {
+ try {
+ _processModel = deserializeCompiledProcess(_pconf.getCBPInputStream());
+ } catch (Exception e) {
+ String errmsg = "Error reloading compiled process " + _pconf.getProcessId() + "; the file appears to be corrupted.";
+ __log.error(errmsg);
+ throw new BpelEngineException(errmsg, e);
+ }
}
_runtime = buildRuntime(_processModel.getModelVersion());
_runtime.init(_pconf, _processModel);
@@ -315,26 +317,7 @@
}
}
- if (isInMemory()) {
- bounceProcessDAO(_inMemDao.getConnection(), _pid, _pconf.getVersion(), _processModel);
- } else if (_contexts.isTransacted()) {
- // If we have a transaction, we do this in the current transaction.
- bounceProcessDAO(_contexts.dao.getConnection(), _pid, _pconf.getVersion(), _processModel);
- } else {
- // If we do not have a transaction we need to create one.
- try {
- _contexts.execTransaction(new Callable<Object>() {
- public Object call() throws Exception {
- bounceProcessDAO(_contexts.dao.getConnection(), _pid, _pconf.getVersion(), _processModel);
- return null;
- }
- });
- } catch (Exception ex) {
- String errmsg = "DbError";
- __log.error(errmsg, ex);
- throw new BpelEngineException(errmsg, ex);
- }
- }
+ bounceProcessDAO();
}
}
Added: ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/engine/RESTMessageExchangeImpl.java
URL: http://svn.apache.org/viewvc/ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/engine/RESTMessageExchangeImpl.java?rev=707516&view=auto
==============================================================================
--- ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/engine/RESTMessageExchangeImpl.java (added)
+++ ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/engine/RESTMessageExchangeImpl.java Thu Oct 23 17:50:25 2008
@@ -0,0 +1,134 @@
+package org.apache.ode.bpel.engine;
+
+import org.apache.ode.bpel.iapi.*;
+import org.apache.ode.bpel.dao.MessageExchangeDAO;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import javax.xml.namespace.QName;
+import java.util.concurrent.*;
+
+public class RESTMessageExchangeImpl extends MessageExchangeImpl implements RESTMessageExchange {
+
+ private static final Log __log = LogFactory.getLog(RESTMessageExchangeImpl.class);
+
+ private boolean _done = false;
+ private ResponseFuture _future;
+
+ private Resource _resource;
+
+ public RESTMessageExchangeImpl(ODEProcess process, String mexId, Resource resource) {
+ super(process, null, mexId, null, null, null);
+ _resource = resource;
+ }
+
+ public InvocationStyle getInvocationStyle() {
+ return InvocationStyle.UNRELIABLE;
+ }
+
+ public Resource getResource() {
+ return _resource;
+ }
+
+ public void setRequest(final Message request) {
+ _request = (MessageImpl) request;
+ _changes.add(Change.REQUEST);
+ }
+
+ public Status invokeBlocking() throws BpelEngineException, TimeoutException {
+ if (_done) return getStatus();
+
+ Future<Status> future = _future != null ? _future : invokeAsync();
+ try {
+ future.get(Math.max(_timeout, 1), TimeUnit.MILLISECONDS);
+ _done = true;
+ return getStatus();
+ } catch (InterruptedException e) {
+ throw new BpelEngineException(e);
+ } catch (ExecutionException e) {
+ throw new BpelEngineException(e.getCause());
+ }
+ }
+
+ public Future<Status> invokeAsync() {
+ if (_future != null) return _future;
+ if (_request == null) throw new IllegalStateException("Must call setRequest(...)!");
+
+ _future = new ResponseFuture();
+ _process.enqueueTransaction(new Callable<Void>() {
+ public Void call() throws Exception {
+ MessageExchangeDAO dao = doInvoke();
+ if (dao.getStatus() == Status.ACK) {
+ // not really an async ack, same idea.
+ onAsyncAck(dao);
+ }
+ return null;
+ }
+ });
+ return _future;
+ }
+
+ protected MessageExchangeDAO doInvoke() {
+ if (getStatus() != Status.NEW) throw new IllegalStateException("Invalid state: " + getStatus());
+ request();
+
+ MessageExchangeDAO dao = _process.createMessageExchange(getMessageExchangeId(), MessageExchangeDAO.DIR_PARTNER_INVOKES_MYROLE);
+ save(dao);
+ if (__log.isDebugEnabled()) __log.debug("invoke() EPR= " + _epr + " ==> " + _process);
+ try {
+ _process.invokeProcess(dao);
+ } finally {
+ if (dao.getStatus() == Status.ACK) {
+ _failureType = dao.getFailureType();
+ _fault = dao.getFault();
+ _explanation = dao.getFaultExplanation();
+ ack(dao.getAckType());
+ }
+ }
+ return dao;
+ }
+
+ protected void onAsyncAck(MessageExchangeDAO mexdao) {
+ final MemBackedMessageImpl response;
+ final QName fault = mexdao.getFault();
+ final FailureType failureType = mexdao.getFailureType();
+ final AckType ackType = mexdao.getAckType();
+ final String explanation = mexdao.getFaultExplanation();
+ switch (mexdao.getAckType()) {
+ case RESPONSE:
+ case FAULT:
+ response = new MemBackedMessageImpl(mexdao.getResponse().getHeader(),
+ mexdao.getResponse().getData(), mexdao.getResponse().getType(), false);
+ break;
+ default:
+ response = null;
+ }
+
+ final ResponseFuture f = _future;
+ // Lets be careful, the TX can still rollback!
+ _process.scheduleRunnable(new Runnable() {
+ public void run() {
+ _response = response;
+ _fault = fault;
+ _failureType = failureType;
+ _explanation = explanation;
+
+ ack(ackType);
+ _future.done(Status.ACK);
+
+ }
+ });
+ }
+
+ @Override
+ void save(MessageExchangeDAO dao) {
+ super.save(dao);
+ dao.setResource(_resource.getUrl() + "~" + _resource.getMethod());
+ }
+
+ @Override
+ void load(MessageExchangeDAO dao) {
+ super.load(dao);
+ _resource = ((ODERESTProcess)_process).getResource(dao.getResource());
+ }
+}
Modified: ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/engine/UnreliableMyRoleMessageExchangeImpl.java
URL: http://svn.apache.org/viewvc/ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/engine/UnreliableMyRoleMessageExchangeImpl.java?rev=707516&r1=707515&r2=707516&view=diff
==============================================================================
--- ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/engine/UnreliableMyRoleMessageExchangeImpl.java (original)
+++ ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/engine/UnreliableMyRoleMessageExchangeImpl.java Thu Oct 23 17:50:25 2008
@@ -73,50 +73,6 @@
}
}
- private static class ResponseFuture implements Future<Status> {
- private Status _status;
-
- public boolean cancel(boolean mayInterruptIfRunning) {
- return false;
- }
-
- public Status get() throws InterruptedException, ExecutionException {
- try {
- return get(0, TimeUnit.MILLISECONDS);
- } catch (TimeoutException e) {
- // If it's thrown it's definitely a bug
- throw new RuntimeException(e);
- }
- }
-
- public Status get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
- synchronized (this) {
- if (_status != null)
- return _status;
-
- this.wait(TimeUnit.MILLISECONDS.convert(timeout, unit));
-
- if (_status == null) throw new TimeoutException();
- return _status;
- }
- }
-
- public boolean isCancelled() {
- return false;
- }
-
- public boolean isDone() {
- return _status != null;
- }
-
- void done(Status status) {
- synchronized (this) {
- _status = status;
- this.notifyAll();
- }
- }
- }
-
@Override
protected void onAsyncAck(MessageExchangeDAO mexdao) {
final MemBackedMessageImpl response;
@@ -134,7 +90,6 @@
response = null;
}
- final UnreliableMyRoleMessageExchangeImpl self = this;
final ResponseFuture f = _future;
// Lets be careful, the TX can still rollback!
_process.scheduleRunnable(new Runnable() {
@@ -146,9 +101,7 @@
ack(ackType);
_future.done(Status.ACK);
-
}
-
});
}
Modified: ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/memdao/MessageExchangeDAOImpl.java
URL: http://svn.apache.org/viewvc/ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/memdao/MessageExchangeDAOImpl.java?rev=707516&r1=707515&r2=707516&view=diff
==============================================================================
--- ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/memdao/MessageExchangeDAOImpl.java (original)
+++ ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/memdao/MessageExchangeDAOImpl.java Thu Oct 23 17:50:25 2008
@@ -53,6 +53,7 @@
MessageExchangePattern pattern;
Element ePR;
String channel;
+ String resource;
QName fault;
String faultExplanation;
String correlationStatus;
@@ -146,7 +147,15 @@
}
- public void setPattern(MessageExchangePattern pattern) {
+ public String getResource() {
+ return resource;
+ }
+
+ public void setResource(String resource) {
+ this.resource = resource;
+ }
+
+ public void setPattern(MessageExchangePattern pattern) {
this.pattern = pattern;
}
Modified: ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/OResource.java
URL: http://svn.apache.org/viewvc/ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/OResource.java?rev=707516&r1=707515&r2=707516&view=diff
==============================================================================
--- ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/OResource.java (original)
+++ ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/OResource.java Thu Oct 23 17:50:25 2008
@@ -11,6 +11,7 @@
private OExpression subpath;
private OResource reference;
private String method;
+ private boolean instantiateResource;
public OResource(OProcess owner) {
super(owner);
@@ -47,4 +48,12 @@
public void setMethod(String method) {
this.method = method;
}
+
+ public boolean isInstantiateResource() {
+ return instantiateResource;
+ }
+
+ public void setInstantiateResource(boolean instantiateResource) {
+ this.instantiateResource = instantiateResource;
+ }
}
Modified: ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/RuntimeImpl.java
URL: http://svn.apache.org/viewvc/ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/RuntimeImpl.java?rev=707516&r1=707515&r2=707516&view=diff
==============================================================================
--- ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/RuntimeImpl.java (original)
+++ ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/RuntimeImpl.java Thu Oct 23 17:50:25 2008
@@ -133,12 +133,16 @@
return null;
}
- public String extractAddress(ResourceModel resourceModel) {
+ public String extractAddress(ResourceModel resourceModel) throws FaultException {
OResource resource = (OResource) resourceModel;
String rootPath = "";
if (resource.getReference() != null)
- rootPath = extractAddress(resource.getReference());
- return rootPath + "/" + resource.getSubpath();
+ rootPath = extractAddress(resource.getReference()) + "/";
+
+ BoundVariableEvaluationContext ctx = new BoundVariableEvaluationContext();
+ String subpath = ((Text)_expLangRuntimeRegistry.evaluateNode(resource.getSubpath(), ctx)).getWholeText();
+
+ return rootPath + subpath;
}
public String extractMatch(Element msgData, PropertyExtractor extractor) throws FaultException {