You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ode.apache.org by mr...@apache.org on 2009/01/09 02:11:02 UTC
svn commit: r732899 - in /ode/branches/restful:
axis2/src/main/java/org/apache/ode/axis2/
bpel-api/src/main/java/org/apache/ode/bpel/iapi/
bpel-api/src/main/java/org/apache/ode/bpel/rapi/
bpel-test/src/main/java/org/apache/ode/test/ dao-hibernate/src/m...
Author: mriou
Date: Thu Jan 8 17:11:01 2009
New Revision: 732899
URL: http://svn.apache.org/viewvc?rev=732899&view=rev
Log:
Intermediate commit. A bit more toward RESTful (non WSDLesque) invocations.
Added:
ode/branches/restful/bpel-api/src/main/java/org/apache/ode/bpel/iapi/RESTInMessageExchange.java
ode/branches/restful/bpel-api/src/main/java/org/apache/ode/bpel/iapi/RESTOutMessageExchange.java
ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/engine/RESTInMessageExchangeImpl.java
ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/engine/RESTOutMessageExchangeImpl.java
Removed:
ode/branches/restful/bpel-api/src/main/java/org/apache/ode/bpel/iapi/RESTMessageExchange.java
ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/engine/RESTMessageExchangeImpl.java
Modified:
ode/branches/restful/axis2/src/main/java/org/apache/ode/axis2/MessageExchangeContextImpl.java
ode/branches/restful/bpel-api/src/main/java/org/apache/ode/bpel/iapi/BpelServer.java
ode/branches/restful/bpel-api/src/main/java/org/apache/ode/bpel/iapi/MessageExchangeContext.java
ode/branches/restful/bpel-api/src/main/java/org/apache/ode/bpel/rapi/IOContext.java
ode/branches/restful/bpel-test/src/main/java/org/apache/ode/test/MessageExchangeContextImpl.java
ode/branches/restful/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/MessageExchangeDaoImpl.java
ode/branches/restful/dao-jpa/src/main/java/org/apache/ode/dao/jpa/ResourceRouteDAOImpl.java
ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java
ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java
ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/engine/ODEProcess.java
ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/engine/ODERESTProcess.java
ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/engine/PartnerLinkPartnerRoleImpl.java
ode/branches/restful/engine/src/test/java/org/apache/ode/bpel/runtime/MockBpelServer.java
ode/branches/restful/jbi/src/main/java/org/apache/ode/jbi/MessageExchangeContextImpl.java
ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/INVOKE.java
ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/OInvoke.java
ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/OResource.java
ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/OdeInternalInstance.java
ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/RuntimeInstanceImpl.java
ode/branches/restful/runtimes/src/test/java/org/apache/ode/bpel/rtrep/v2/CoreBpelTest.java
ode/branches/restful/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/SimpleSchedulerTest.java
Modified: ode/branches/restful/axis2/src/main/java/org/apache/ode/axis2/MessageExchangeContextImpl.java
URL: http://svn.apache.org/viewvc/ode/branches/restful/axis2/src/main/java/org/apache/ode/axis2/MessageExchangeContextImpl.java?rev=732899&r1=732898&r2=732899&view=diff
==============================================================================
--- ode/branches/restful/axis2/src/main/java/org/apache/ode/axis2/MessageExchangeContextImpl.java (original)
+++ ode/branches/restful/axis2/src/main/java/org/apache/ode/axis2/MessageExchangeContextImpl.java Thu Jan 8 17:11:01 2009
@@ -25,14 +25,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.ode.bpel.iapi.BpelEngineException;
-import org.apache.ode.bpel.iapi.ContextException;
-import org.apache.ode.bpel.iapi.EndpointReference;
-import org.apache.ode.bpel.iapi.InvocationStyle;
-import org.apache.ode.bpel.iapi.MessageExchangeContext;
-import org.apache.ode.bpel.iapi.MyRoleMessageExchange;
-import org.apache.ode.bpel.iapi.PartnerRoleChannel;
-import org.apache.ode.bpel.iapi.PartnerRoleMessageExchange;
+import org.apache.ode.bpel.iapi.*;
/**
* Implementation of the ODE {@link org.apache.ode.bpel.iapi.MessageExchangeContext}
@@ -78,7 +71,9 @@
invokePartnerUnreliable(mex);
}
-
+ public void invokeRestful(RESTOutMessageExchange mex) throws ContextException {
+ throw new UnsupportedOperationException("No support for RESTful invocations");
+ }
public void onMyRoleMessageExchangeStateChanged(MyRoleMessageExchange myRoleMessageExchange) throws BpelEngineException {
// Add code here to handle MEXs that we've "forgotten" about due to system failure etc.. mostly
Modified: ode/branches/restful/bpel-api/src/main/java/org/apache/ode/bpel/iapi/BpelServer.java
URL: http://svn.apache.org/viewvc/ode/branches/restful/bpel-api/src/main/java/org/apache/ode/bpel/iapi/BpelServer.java?rev=732899&r1=732898&r2=732899&view=diff
==============================================================================
--- ode/branches/restful/bpel-api/src/main/java/org/apache/ode/bpel/iapi/BpelServer.java (original)
+++ ode/branches/restful/bpel-api/src/main/java/org/apache/ode/bpel/iapi/BpelServer.java Thu Jan 8 17:11:01 2009
@@ -133,7 +133,7 @@
MyRoleMessageExchange createMessageExchange(InvocationStyle istyle, QName serviceId, String operation,
String foreignKey) throws BpelEngineException;
- RESTMessageExchange createMessageExchange(Resource resource, String foreignKey) throws BpelEngineException;
+ RESTInMessageExchange createMessageExchange(Resource resource, String foreignKey) throws BpelEngineException;
/**
* Retrieve a message identified by the given identifer.
Modified: ode/branches/restful/bpel-api/src/main/java/org/apache/ode/bpel/iapi/MessageExchangeContext.java
URL: http://svn.apache.org/viewvc/ode/branches/restful/bpel-api/src/main/java/org/apache/ode/bpel/iapi/MessageExchangeContext.java?rev=732899&r1=732898&r2=732899&view=diff
==============================================================================
--- ode/branches/restful/bpel-api/src/main/java/org/apache/ode/bpel/iapi/MessageExchangeContext.java (original)
+++ ode/branches/restful/bpel-api/src/main/java/org/apache/ode/bpel/iapi/MessageExchangeContext.java Thu Jan 8 17:11:01 2009
@@ -89,6 +89,8 @@
void invokePartnerTransacted(PartnerRoleMessageExchange mex) throws ContextException;
+ void invokeRestful(RESTOutMessageExchange mex) throws ContextException;
+
/**
* Cancel an async/reliable partner-role message exchange.
* @param mex message exchange that should be cancelled.
Added: ode/branches/restful/bpel-api/src/main/java/org/apache/ode/bpel/iapi/RESTInMessageExchange.java
URL: http://svn.apache.org/viewvc/ode/branches/restful/bpel-api/src/main/java/org/apache/ode/bpel/iapi/RESTInMessageExchange.java?rev=732899&view=auto
==============================================================================
--- ode/branches/restful/bpel-api/src/main/java/org/apache/ode/bpel/iapi/RESTInMessageExchange.java (added)
+++ ode/branches/restful/bpel-api/src/main/java/org/apache/ode/bpel/iapi/RESTInMessageExchange.java Thu Jan 8 17:11:01 2009
@@ -0,0 +1,16 @@
+package org.apache.ode.bpel.iapi;
+
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Message exchange used for a web-service based interaction between the integration layer and the
+ * engine. Adds resource information.
+ */
+public interface RESTInMessageExchange extends MessageExchange {
+
+ Resource getResource();
+
+ Status invokeBlocking() throws BpelEngineException, TimeoutException;
+
+ void setRequest(Message message);
+}
Added: ode/branches/restful/bpel-api/src/main/java/org/apache/ode/bpel/iapi/RESTOutMessageExchange.java
URL: http://svn.apache.org/viewvc/ode/branches/restful/bpel-api/src/main/java/org/apache/ode/bpel/iapi/RESTOutMessageExchange.java?rev=732899&view=auto
==============================================================================
--- ode/branches/restful/bpel-api/src/main/java/org/apache/ode/bpel/iapi/RESTOutMessageExchange.java (added)
+++ ode/branches/restful/bpel-api/src/main/java/org/apache/ode/bpel/iapi/RESTOutMessageExchange.java Thu Jan 8 17:11:01 2009
@@ -0,0 +1,19 @@
+package org.apache.ode.bpel.iapi;
+
+import org.w3c.dom.Element;
+
+/**
+ * Outgoing RESTful message exchange implemented by the engine and used by IL implementations
+ * that can provide RESTful interactions.
+ */
+public interface RESTOutMessageExchange {
+
+ Resource getTargetResource();
+
+ Message getRequest();
+
+ void reply(Message response) throws BpelEngineException;
+
+ void replyWithFailure(MessageExchange.FailureType type, String description, Element details) throws BpelEngineException;
+
+}
Modified: ode/branches/restful/bpel-api/src/main/java/org/apache/ode/bpel/rapi/IOContext.java
URL: http://svn.apache.org/viewvc/ode/branches/restful/bpel-api/src/main/java/org/apache/ode/bpel/rapi/IOContext.java?rev=732899&r1=732898&r2=732899&view=diff
==============================================================================
--- ode/branches/restful/bpel-api/src/main/java/org/apache/ode/bpel/rapi/IOContext.java (original)
+++ ode/branches/restful/bpel-api/src/main/java/org/apache/ode/bpel/rapi/IOContext.java Thu Jan 8 17:11:01 2009
@@ -60,6 +60,8 @@
String /* MexId */invoke(String invokeId, PartnerLink partnerLinkInstance, Operation operation, Element outboundMsg)
throws UninitializedPartnerEPR;
+ String invoke(String requestId, org.apache.ode.bpel.iapi.Resource resource, Element outgoingMessage);
+
/**
* Get partner's response to an invoke.
*
Modified: ode/branches/restful/bpel-test/src/main/java/org/apache/ode/test/MessageExchangeContextImpl.java
URL: http://svn.apache.org/viewvc/ode/branches/restful/bpel-test/src/main/java/org/apache/ode/test/MessageExchangeContextImpl.java?rev=732899&r1=732898&r2=732899&view=diff
==============================================================================
--- ode/branches/restful/bpel-test/src/main/java/org/apache/ode/test/MessageExchangeContextImpl.java (original)
+++ ode/branches/restful/bpel-test/src/main/java/org/apache/ode/test/MessageExchangeContextImpl.java Thu Jan 8 17:11:01 2009
@@ -25,15 +25,7 @@
import javax.xml.namespace.QName;
-import org.apache.ode.bpel.iapi.BpelEngineException;
-import org.apache.ode.bpel.iapi.ContextException;
-import org.apache.ode.bpel.iapi.EndpointReference;
-import org.apache.ode.bpel.iapi.InvocationStyle;
-import org.apache.ode.bpel.iapi.Message;
-import org.apache.ode.bpel.iapi.MessageExchangeContext;
-import org.apache.ode.bpel.iapi.MyRoleMessageExchange;
-import org.apache.ode.bpel.iapi.PartnerRoleChannel;
-import org.apache.ode.bpel.iapi.PartnerRoleMessageExchange;
+import org.apache.ode.bpel.iapi.*;
import org.apache.ode.utils.DOMUtils;
import org.w3c.dom.Element;
import org.xml.sax.SAXException;
@@ -150,13 +142,14 @@
public void invokePartnerReliable(PartnerRoleMessageExchange mex) throws ContextException {
// TODO Auto-generated method stub
-
}
public void invokePartnerTransacted(PartnerRoleMessageExchange mex) throws ContextException {
// TODO Auto-generated method stub
-
}
+ public void invokeRestful(RESTOutMessageExchange mex) throws ContextException {
+ throw new UnsupportedOperationException("No support for RESTful invocations");
+ }
}
Modified: ode/branches/restful/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/MessageExchangeDaoImpl.java
URL: http://svn.apache.org/viewvc/ode/branches/restful/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/MessageExchangeDaoImpl.java?rev=732899&r1=732898&r2=732899&view=diff
==============================================================================
--- ode/branches/restful/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/MessageExchangeDaoImpl.java (original)
+++ ode/branches/restful/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/MessageExchangeDaoImpl.java Thu Jan 8 17:11:01 2009
@@ -47,7 +47,6 @@
public class MessageExchangeDaoImpl extends HibernateDao implements MessageExchangeDAO {
private HMessageExchange _hself;
-
// Used when provided process and instance aren't hibernate implementations. The relation
// therefore can't be persisted. Used for in-mem DAOs so that doesn't matter much.
Modified: ode/branches/restful/dao-jpa/src/main/java/org/apache/ode/dao/jpa/ResourceRouteDAOImpl.java
URL: http://svn.apache.org/viewvc/ode/branches/restful/dao-jpa/src/main/java/org/apache/ode/dao/jpa/ResourceRouteDAOImpl.java?rev=732899&r1=732898&r2=732899&view=diff
==============================================================================
--- ode/branches/restful/dao-jpa/src/main/java/org/apache/ode/dao/jpa/ResourceRouteDAOImpl.java (original)
+++ ode/branches/restful/dao-jpa/src/main/java/org/apache/ode/dao/jpa/ResourceRouteDAOImpl.java Thu Jan 8 17:11:01 2009
@@ -4,6 +4,8 @@
import javax.persistence.*;
+@Entity
+@Table(name="ODE_RESOURCE_ROUTE")
public class ResourceRouteDAOImpl extends OpenJPADAO implements ResourceRouteDAO {
@Id @Column(name="ID")
Modified: ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java
URL: http://svn.apache.org/viewvc/ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java?rev=732899&r1=732898&r2=732899&view=diff
==============================================================================
--- ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java (original)
+++ ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java Thu Jan 8 17:11:01 2009
@@ -523,7 +523,6 @@
}
private void scheduleCorrelatorMatcher(String correlatorId, CorrelationKey key) {
-
WorkEvent we = new WorkEvent();
we.setIID(_dao.getInstanceId());
we.setProcessId(_bpelProcess.getPID());
@@ -536,7 +535,6 @@
public String invoke(String requestId, PartnerLink partnerLink, Operation operation, Element outgoingMessage)
throws UninitializedPartnerEPR {
- // TODO: think we should move the dao creation into bpelprocess --mbs
MessageExchangeDAO mexDao = _dao.getConnection().createMessageExchange(new GUID().toString(),
MessageExchangeDAO.DIR_BPEL_INVOKES_PARTNERROLE);
mexDao.setStatus(MessageExchange.Status.REQ);
@@ -584,7 +582,7 @@
__log.debug("INVOKING PARTNER: partnerLink=" + partnerLink + ", op=" +
operation.getName() + " channel=" + requestId + ")");
}
- ((ODEWSProcess)_bpelProcess).invokePartner(mexDao);
+ _bpelProcess.invokePartner(mexDao);
// In case a response/fault was available right away, which will happen for BLOCKING/TRANSACTED invocations,
// we need to inject a message on the response channel, so that the process continues.
@@ -602,6 +600,50 @@
return mexDao.getMessageExchangeId();
}
+ public String invoke(String requestId, org.apache.ode.bpel.iapi.Resource resource, Element outgoingMessage) {
+
+ MessageExchangeDAO mexDao = _dao.getConnection().createMessageExchange(new GUID().toString(),
+ MessageExchangeDAO.DIR_BPEL_INVOKES_PARTNERROLE);
+ mexDao.setStatus(MessageExchange.Status.REQ);
+ mexDao.setResource(resource.getUrl() + "~" + resource.getMethod());
+ mexDao.setProcess(_dao.getProcess());
+ mexDao.setInstance(_dao);
+ mexDao.setPattern(MessageExchangePattern.REQUEST_RESPONSE);
+ mexDao.setChannel(requestId);
+
+ MessageDAO message = mexDao.createMessage(null);
+ mexDao.setRequest(message);
+ mexDao.setTimeout(30000);
+ message.setData(outgoingMessage);
+
+ // prepare event
+ ProcessMessageExchangeEvent evt = new ProcessMessageExchangeEvent();
+ evt.setResource(resource.getUrl() + "~" + resource.getMethod());
+ evt.setAspect(ProcessMessageExchangeEvent.PARTNER_INPUT);
+ evt.setMexId(mexDao.getMessageExchangeId());
+ sendEvent(evt);
+
+ if (__log.isDebugEnabled())
+ __log.debug("INVOKING PARTNER: resource=" + resource + " channel=" + requestId + ")");
+
+ _bpelProcess.invokePartner(mexDao);
+
+ // In case a response/fault was available right away, which will happen for BLOCKING/TRANSACTED invocations,
+ // we need to inject a message on the response channel, so that the process continues.
+ switch (mexDao.getStatus()) {
+ case ACK:
+ if (mexDao.getChannel() != null) injectPartnerResponse(mexDao.getMessageExchangeId(), mexDao.getChannel());
+ break;
+ case ASYNC:
+ // we'll have to wait for the response.
+ break;
+ default:
+ throw new AssertionError("Unexpected MEX status: " + mexDao.getStatus());
+ }
+
+ return mexDao.getMessageExchangeId();
+ }
+
private void buildOutgoingMessage(MessageDAO message, Element outgoingElmt) {
if (outgoingElmt == null) return;
Modified: ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java
URL: http://svn.apache.org/viewvc/ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java?rev=732899&r1=732898&r2=732899&view=diff
==============================================================================
--- ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java (original)
+++ ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java Thu Jan 8 17:11:01 2009
@@ -603,7 +603,7 @@
}
}
- public RESTMessageExchange createMessageExchange(final Resource resource, String foreignKey) throws BpelEngineException {
+ public RESTInMessageExchange createMessageExchange(final Resource resource, String foreignKey) throws BpelEngineException {
_mngmtLock.readLock().lock();
try {
ODERESTProcess target = _restServiceMap.get(resource.getUrl());
Modified: ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/engine/ODEProcess.java
URL: http://svn.apache.org/viewvc/ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/engine/ODEProcess.java?rev=732899&r1=732898&r2=732899&view=diff
==============================================================================
--- ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/engine/ODEProcess.java (original)
+++ ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/engine/ODEProcess.java Thu Jan 8 17:11:01 2009
@@ -111,6 +111,8 @@
abstract void dehydrate();
abstract void invokeProcess(final MessageExchangeDAO mexdao);
+ abstract void invokePartner(final MessageExchangeDAO mexdao);
+
abstract MessageExchangeImpl recreateIncomingMex(MessageExchangeDAO mexdao);
protected abstract void latch(int s);
Modified: ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/engine/ODERESTProcess.java
URL: http://svn.apache.org/viewvc/ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/engine/ODERESTProcess.java?rev=732899&r1=732898&r2=732899&view=diff
==============================================================================
--- ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/engine/ODERESTProcess.java (original)
+++ ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/engine/ODERESTProcess.java Thu Jan 8 17:11:01 2009
@@ -128,14 +128,34 @@
// If we did not get an ACK during this method, then mark this MEX as needing an ASYNC wake-up
if (mexdao.getStatus() != MessageExchange.Status.ACK) mexdao.setStatus(MessageExchange.Status.ASYNC);
}
+ }
+ void invokePartner(MessageExchangeDAO mexdao) {
+ // TODO p2p short-circuiting
+ mexdao.setStatus(MessageExchange.Status.REQ);
+ try {
+ String[] resStr = mexdao.getResource().split("~");
+ RESTOutMessageExchangeImpl outMex = new RESTOutMessageExchangeImpl(this,
+ mexdao.getMessageExchangeId(), new Resource(resStr[0], "application/xml", resStr[1]));
+ outMex.request();
+
+ scheduleRunnable(new UnreliableInvoker(outMex));
+
+ // Scheduling a verification to see if the invoke has really been processed. Otherwise
+ // we put it in activity recovery mode (case of a server crash during invocation).
+ scheduleInvokeCheck(mexdao);
+ } finally {
+ if (mexdao.getStatus() != MessageExchange.Status.ACK)
+ mexdao.setStatus(MessageExchange.Status.ASYNC);
+ }
}
+
void onRestMexAck(MessageExchangeDAO mexdao, MessageExchange.Status old, String url) {
if (mexdao.getPipedMessageExchangeId() != null) /* p2p */{
p2pCall(mexdao, old);
} else /* not p2p */{
- RESTMessageExchangeImpl mymex = (RESTMessageExchangeImpl) _incomingMexCache.get(mexdao, this);
+ RESTInMessageExchangeImpl mymex = (RESTInMessageExchangeImpl) _incomingMexCache.get(mexdao, this);
mymex.getResource().setUrl(url);
if (old == MessageExchange.Status.ASYNC) {
// Updating url for instantiating mexs so that the created resource url can be returned to the caller
@@ -151,16 +171,16 @@
protected void hydrate() { }
protected void dehydrate() { }
- public RESTMessageExchange createRESTMessageExchange(Resource resource, String clientKey) {
+ public RESTInMessageExchange createRESTMessageExchange(Resource resource, String clientKey) {
// TODO check the resource matches a provided one
- RESTMessageExchangeImpl mex = new RESTMessageExchangeImpl(this, clientKey, resource);
+ RESTInMessageExchangeImpl mex = new RESTInMessageExchangeImpl(this, clientKey, resource);
_incomingMexCache.put(mex);
return mex;
}
MessageExchangeImpl recreateIncomingMex(MessageExchangeDAO mexdao) {
Resource resource = getResource(mexdao.getResource());
- return new RESTMessageExchangeImpl(this, mexdao.getMessageExchangeId(), resource);
+ return new RESTInMessageExchangeImpl(this, mexdao.getMessageExchangeId(), resource);
}
public Resource getResource(String url, String method) {
@@ -193,4 +213,59 @@
}
return false;
}
+
+ class UnreliableInvoker implements Runnable {
+
+ RESTOutMessageExchangeImpl _restMex;
+
+ public UnreliableInvoker(RESTOutMessageExchangeImpl mex) {
+ _restMex = mex;
+ }
+
+ public void run() {
+ Throwable err = null;
+ try {
+ _contexts.mexContext.invokeRestful(_restMex);
+ _restMex.setState(RESTOutMessageExchangeImpl.State.HOLD);
+ } catch (Throwable t) {
+ _restMex.setState(RESTOutMessageExchangeImpl.State.DEAD);
+ err = t;
+ }
+
+ // Handle system failure in a transaction
+ final Throwable ferr = err;
+ if (ferr != null) {
+ enqueueInstanceTransaction(_restMex.getIID(), new Runnable() {
+ public void run() {
+ MessageExchangeDAO mexdao = loadMexDao(_restMex.getMessageExchangeId());
+ MexDaoUtil.setFailed(mexdao, MessageExchange.FailureType.OTHER, ferr.toString());
+ _restMex.setState(RESTOutMessageExchangeImpl.State.DEAD);
+ }
+ } );
+ return;
+ }
+
+ // We proceed handling the response in a transaction. Note that if for some reason the following transaction
+ // fails, the unreliable invoke will be in an "unknown" state, and will require manual intervention to either
+ // retry or force fail.
+ enqueueInstanceTransaction(_restMex.getIID(), new Runnable() {
+ public void run() {
+ MessageExchangeDAO mexdao = loadMexDao(_restMex.getMessageExchangeId());
+ if (_restMex.getStatus() == MessageExchange.Status.ACK) {
+ _restMex.save(mexdao);
+ _restMex.setState(RESTOutMessageExchangeImpl.State.DEAD);
+ } else if (_restMex.getStatus() == MessageExchange.Status.REQ) {
+ MexDaoUtil.setFailed(mexdao, MessageExchange.FailureType.NO_RESPONSE, "No Response");
+ _restMex.setState(RESTOutMessageExchangeImpl.State.DEAD);
+ } else {
+ // Internal error that should not be possible.
+ __log.fatal("InternalError: Unexpected message exchange state!");
+ MexDaoUtil.setFailed(mexdao, MessageExchange.FailureType.OTHER, "Unexpected message exchange state");
+ }
+
+ executeContinueInstancePartnerRoleResponseReceived(mexdao);
+ }
+ });
+ }
+ }
}
Modified: ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/engine/PartnerLinkPartnerRoleImpl.java
URL: http://svn.apache.org/viewvc/ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/engine/PartnerLinkPartnerRoleImpl.java?rev=732899&r1=732898&r2=732899&view=diff
==============================================================================
--- ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/engine/PartnerLinkPartnerRoleImpl.java (original)
+++ ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/engine/PartnerLinkPartnerRoleImpl.java Thu Jan 8 17:11:01 2009
@@ -314,15 +314,8 @@
UnreliablePartnerRoleMessageExchangeImpl _unreliableMex;
- BpelInstanceWorker _iworker;
-
- /** Keep a copy of the last BpelRuntimeContextImpl; this is used to optimize away a DB read. */
- BpelRuntimeContextImpl _lastBRC;
-
/**
- *
- * @param blockingMex
- * the MEX we're invoking on the partner
+ * @param blockingMex the MEX we're invoking on the partner
*/
public UnreliableInvoker(UnreliablePartnerRoleMessageExchangeImpl blockingMex) {
_unreliableMex = blockingMex;
Added: ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/engine/RESTInMessageExchangeImpl.java
URL: http://svn.apache.org/viewvc/ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/engine/RESTInMessageExchangeImpl.java?rev=732899&view=auto
==============================================================================
--- ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/engine/RESTInMessageExchangeImpl.java (added)
+++ ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/engine/RESTInMessageExchangeImpl.java Thu Jan 8 17:11:01 2009
@@ -0,0 +1,143 @@
+package org.apache.ode.bpel.engine;
+
+import org.apache.ode.bpel.iapi.*;
+import org.apache.ode.bpel.dao.MessageExchangeDAO;
+import org.apache.ode.bpel.dao.MessageDAO;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import javax.xml.namespace.QName;
+import java.util.concurrent.*;
+
+public class RESTInMessageExchangeImpl extends MessageExchangeImpl implements RESTInMessageExchange {
+
+ private static final Log __log = LogFactory.getLog(RESTInMessageExchangeImpl.class);
+
+ private boolean _done = false;
+ private ResponseFuture _future;
+
+ private Resource _resource;
+
+ public RESTInMessageExchangeImpl(ODEProcess process, String mexId, Resource resource) {
+ super(process, null, mexId, null, null, null);
+ _resource = resource;
+ }
+
+ public InvocationStyle getInvocationStyle() {
+ return InvocationStyle.UNRELIABLE;
+ }
+
+ public Resource getResource() {
+ return _resource;
+ }
+
+ public void setRequest(final Message request) {
+ _request = (MessageImpl) request;
+ _changes.add(Change.REQUEST);
+ }
+
+ public Status invokeBlocking() throws BpelEngineException, TimeoutException {
+ if (_done) return getStatus();
+
+ Future<Status> future = _future != null ? _future : invokeAsync();
+ try {
+ future.get(Math.max(_timeout, 1), TimeUnit.MILLISECONDS);
+ _done = true;
+ return getStatus();
+ } catch (InterruptedException e) {
+ throw new BpelEngineException(e);
+ } catch (ExecutionException e) {
+ throw new BpelEngineException(e.getCause());
+ }
+ }
+
+ public Future<Status> invokeAsync() {
+ if (_future != null) return _future;
+
+ _future = new ResponseFuture();
+ _process.enqueueTransaction(new Callable<Void>() {
+ public Void call() throws Exception {
+ MessageExchangeDAO dao = doInvoke();
+ if (dao.getStatus() == Status.ACK) {
+ // not really an async ack, same idea.
+ onAsyncAck(dao);
+ }
+ return null;
+ }
+ });
+ return _future;
+ }
+
+ protected MessageExchangeDAO doInvoke() {
+ if (getStatus() != Status.NEW) throw new IllegalStateException("Invalid state: " + getStatus());
+ request();
+
+ MessageExchangeDAO dao = _process.createMessageExchange(getMessageExchangeId(),
+ MessageExchangeDAO.DIR_PARTNER_INVOKES_MYROLE);
+ save(dao);
+ if (__log.isDebugEnabled()) __log.debug("invoke() EPR= " + _epr + " ==> " + _process);
+ try {
+ _process.invokeProcess(dao);
+ } finally {
+ if (dao.getStatus() == Status.ACK) {
+ _failureType = dao.getFailureType();
+ _fault = dao.getFault();
+ _explanation = dao.getFaultExplanation();
+ ack(dao.getAckType());
+ }
+ }
+ return dao;
+ }
+
+ protected void onAsyncAck(MessageExchangeDAO mexdao) {
+ final MemBackedMessageImpl response;
+ final QName fault = mexdao.getFault();
+ final FailureType failureType = mexdao.getFailureType();
+ final AckType ackType = mexdao.getAckType();
+ final String explanation = mexdao.getFaultExplanation();
+ switch (mexdao.getAckType()) {
+ case RESPONSE:
+ case FAULT:
+ response = new MemBackedMessageImpl(mexdao.getResponse().getHeader(),
+ mexdao.getResponse().getData(), mexdao.getResponse().getType(), false);
+ break;
+ default:
+ response = null;
+ }
+
+ final ResponseFuture f = _future;
+ // Lets be careful, the TX can still rollback!
+ _process.scheduleRunnable(new Runnable() {
+ public void run() {
+ _response = response;
+ _fault = fault;
+ _failureType = failureType;
+ _explanation = explanation;
+
+ ack(ackType);
+ _future.done(Status.ACK);
+
+ }
+ });
+ }
+
+ @Override
+ void save(MessageExchangeDAO dao) {
+ super.save(dao);
+ dao.setResource(_resource.getUrl() + "~" + _resource.getMethod());
+
+ if (_changes.contains(Change.REQUEST)) {
+ _changes.remove(Change.REQUEST);
+ MessageDAO requestDao = dao.createMessage(_request.getType());
+ requestDao.setData(_request.getMessage());
+ requestDao.setHeader(_request.getHeader());
+ dao.setRequest(requestDao);
+ }
+ }
+
+ @Override
+ void load(MessageExchangeDAO dao) {
+ super.load(dao);
+ _resource = ((ODERESTProcess)_process).getResource(dao.getResource());
+ }
+}
Added: ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/engine/RESTOutMessageExchangeImpl.java
URL: http://svn.apache.org/viewvc/ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/engine/RESTOutMessageExchangeImpl.java?rev=732899&view=auto
==============================================================================
--- ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/engine/RESTOutMessageExchangeImpl.java (added)
+++ ode/branches/restful/engine/src/main/java/org/apache/ode/bpel/engine/RESTOutMessageExchangeImpl.java Thu Jan 8 17:11:01 2009
@@ -0,0 +1,72 @@
+package org.apache.ode.bpel.engine;
+
+import org.apache.ode.bpel.iapi.*;
+import org.apache.ode.bpel.dao.MessageExchangeDAO;
+import org.w3c.dom.Element;
+
+public class RESTOutMessageExchangeImpl extends MessageExchangeImpl implements RESTOutMessageExchange {
+
+ private Resource _resource;
+
+ protected State _state = State.INVOKE_XXX;
+
+ /** the states for a partner mex. */
+ enum State {
+ /** state when we're in one of the MexContext.invokeXXX methods. */
+ INVOKE_XXX,
+ /** hold all actions (blocks the IL) */
+ HOLD,
+ /** the MEX is dead, it should no longer be accessed by the IL */
+ DEAD
+ };
+
+ public RESTOutMessageExchangeImpl(ODEProcess process, String mexId, Resource resource) {
+ super(process, null, mexId, null, null, null);
+ _resource = resource;
+ }
+
+ public Resource getTargetResource() {
+ return _resource;
+ }
+
+ public Message getRequest() {
+ return _request;
+ }
+
+ public void reply(Message response) throws BpelEngineException {
+ _response = (MessageImpl) response;
+ _fault = null;
+ _failureType = null;
+ ack(AckType.RESPONSE);
+ }
+
+ public void replyWithFailure(FailureType type, String description, Element details) throws BpelEngineException {
+ _failureType = type;
+ _explanation = description;
+ _fault = null;
+ _response = null;
+ ack(AckType.FAILURE);
+ }
+
+ @Override
+ void load(MessageExchangeDAO dao) {
+ super.load(dao);
+ String resStr = dao.getResource();
+ int sepIdx = resStr.indexOf("~");
+ _resource = new Resource(resStr.substring(0, sepIdx), "application/xml", resStr.substring(sepIdx+1));
+ }
+
+ @Override
+ void save(MessageExchangeDAO dao) {
+ super.save(dao);
+ dao.setResource(_resource.getUrl() + "~" + _resource.getMethod());
+ }
+
+ public void setState(State s) {
+ _state = s;
+ }
+
+ public InvocationStyle getInvocationStyle() {
+ return InvocationStyle.UNRELIABLE;
+ }
+}
Modified: ode/branches/restful/engine/src/test/java/org/apache/ode/bpel/runtime/MockBpelServer.java
URL: http://svn.apache.org/viewvc/ode/branches/restful/engine/src/test/java/org/apache/ode/bpel/runtime/MockBpelServer.java?rev=732899&r1=732898&r2=732899&view=diff
==============================================================================
--- ode/branches/restful/engine/src/test/java/org/apache/ode/bpel/runtime/MockBpelServer.java (original)
+++ ode/branches/restful/engine/src/test/java/org/apache/ode/bpel/runtime/MockBpelServer.java Thu Jan 8 17:11:01 2009
@@ -208,8 +208,6 @@
}
public void cancel(PartnerRoleMessageExchange mex) throws ContextException {
- // TODO Auto-generated method stub
-
}
public Set<InvocationStyle> getSupportedInvocationStyle(PartnerRoleChannel prc, EndpointReference partnerEpr) {
@@ -217,20 +215,16 @@
}
public void invokePartnerUnreliable(PartnerRoleMessageExchange mex) throws ContextException {
- // TODO Auto-generated method stub
-
}
public void invokePartnerReliable(PartnerRoleMessageExchange mex) throws ContextException {
- // TODO Auto-generated method stub
-
}
public void invokePartnerTransacted(PartnerRoleMessageExchange mex) throws ContextException {
- // TODO Auto-generated method stub
-
}
+ public void invokeRestful(RESTOutMessageExchange mex) throws ContextException {
+ }
};
return _mexContext;
}
Modified: ode/branches/restful/jbi/src/main/java/org/apache/ode/jbi/MessageExchangeContextImpl.java
URL: http://svn.apache.org/viewvc/ode/branches/restful/jbi/src/main/java/org/apache/ode/jbi/MessageExchangeContextImpl.java?rev=732899&r1=732898&r2=732899&view=diff
==============================================================================
--- ode/branches/restful/jbi/src/main/java/org/apache/ode/jbi/MessageExchangeContextImpl.java (original)
+++ ode/branches/restful/jbi/src/main/java/org/apache/ode/jbi/MessageExchangeContextImpl.java Thu Jan 8 17:11:01 2009
@@ -25,14 +25,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.ode.bpel.iapi.BpelEngineException;
-import org.apache.ode.bpel.iapi.ContextException;
-import org.apache.ode.bpel.iapi.EndpointReference;
-import org.apache.ode.bpel.iapi.InvocationStyle;
-import org.apache.ode.bpel.iapi.MessageExchangeContext;
-import org.apache.ode.bpel.iapi.MyRoleMessageExchange;
-import org.apache.ode.bpel.iapi.PartnerRoleChannel;
-import org.apache.ode.bpel.iapi.PartnerRoleMessageExchange;
+import org.apache.ode.bpel.iapi.*;
/**
@@ -64,14 +57,15 @@
public void invokePartnerReliable(PartnerRoleMessageExchange mex) throws ContextException {
throw new ContextException("Unsupported.");
-
}
public void invokePartnerTransacted(PartnerRoleMessageExchange mex) throws ContextException {
throw new ContextException("Unsupported.");
-
}
+ public void invokeRestful(RESTOutMessageExchange mex) throws ContextException {
+ throw new UnsupportedOperationException("No support for RESTful invocations");
+ }
public void cancel(PartnerRoleMessageExchange mex) throws ContextException {
// What can we do in JBI to cancel? --- not much.
Modified: ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/INVOKE.java
URL: http://svn.apache.org/viewvc/ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/INVOKE.java?rev=732899&r1=732898&r2=732899&view=diff
==============================================================================
--- ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/INVOKE.java (original)
+++ ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/INVOKE.java Thu Jan 8 17:11:01 2009
@@ -26,18 +26,13 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.ode.bpel.common.FaultException;
-import org.apache.ode.bpel.evt.ActivityFailureEvent;
-import org.apache.ode.bpel.evt.ActivityRecoveryEvent;
import org.apache.ode.bpel.evt.VariableModificationEvent;
-import org.apache.ode.bpel.rtrep.v2.channels.ActivityRecoveryChannel;
-import org.apache.ode.bpel.rtrep.v2.channels.ActivityRecoveryChannelListener;
import org.apache.ode.bpel.rtrep.v2.channels.FaultData;
import org.apache.ode.bpel.rtrep.v2.channels.InvokeResponseChannel;
import org.apache.ode.bpel.rtrep.v2.channels.InvokeResponseChannelListener;
import org.apache.ode.bpel.rtrep.v2.channels.TerminationChannelListener;
-import org.apache.ode.bpel.rtrep.v2.channels.TimerResponseChannel;
-import org.apache.ode.bpel.rtrep.v2.channels.TimerResponseChannelListener;
import org.apache.ode.bpel.evar.ExternalVariableModuleException;
+import org.apache.ode.bpel.iapi.Resource;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
@@ -49,19 +44,10 @@
private static final Log __log = LogFactory.getLog(INVOKE.class);
private OInvoke _oinvoke;
- // Records number of invocations on the activity.
- private int _invoked;
- // Date/time of last failure.
- private Date _lastFailure;
- // Reason for last failure.
- private String _failureReason;
- // Data associated with failure.
- private Element _failureData;
public INVOKE(ActivityInfo self, ScopeFrame scopeFrame, LinkFrame linkFrame) {
super(self, scopeFrame, linkFrame);
_oinvoke = (OInvoke) _self.o;
- _invoked = 0;
}
public final void run() {
@@ -78,8 +64,32 @@
_self.parent.failure(e.toString(), null);
return;
}
- ++_invoked;
+ if (_oinvoke.isRestful()) {
+ restInvoke(outboundMsg);
+ } else {
+ wsdlInvoke(outboundMsg);
+ }
+ }
+
+ private void restInvoke(Element outboundMsg) {
+ VariableInstance outputVar = _scopeFrame.resolve(_oinvoke.outputVar);
+ InvokeResponseChannel invokeResponseChannel = newChannel(InvokeResponseChannel.class);
+ try {
+ String path = getBpelRuntime().getExpLangRuntime()
+ .evaluateAsString(_oinvoke.resource.getSubpath(), getEvaluationContext());
+ String mexId = getBpelRuntime().invoke(invokeResponseChannel.export(),
+ new Resource(path, "application/xml", _oinvoke.resource.getMethod()), outboundMsg);
+ setupListeners(mexId, invokeResponseChannel, outputVar);
+ } catch (FaultException fault) {
+ __log.error(fault);
+ FaultData faultData = createFault(fault.getQName(), _oinvoke, fault.getMessage());
+ _self.parent.completed(faultData, CompensationHandler.emptySet());
+ }
+
+ }
+
+ private void wsdlInvoke(Element outboundMsg) {
// if there is no output variable, then this is a one-way invoke
boolean isTwoWay = _oinvoke.outputVar != null;
@@ -98,115 +108,117 @@
_scopeFrame.resolve(_oinvoke.partnerLink), _oinvoke.operation,
outboundMsg, invokeResponseChannel);
- object(false, new InvokeResponseChannelListener(invokeResponseChannel) {
- private static final long serialVersionUID = 4496880438819196765L;
-
- public void onResponse() {
- // we don't have to write variable data -> this already
- // happened in the nativeAPI impl
- FaultData fault = null;
-
- Element response;
- try {
- response = getBpelRuntime().getPartnerResponse(mexId);
- } catch (Exception e) {
- __log.error(e);
- // TODO: Better error handling
- throw new RuntimeException(e);
- }
+ setupListeners(mexId, invokeResponseChannel, outputVar);
+ }
+ } catch (FaultException fault) {
+ __log.error(fault);
+ FaultData faultData = createFault(fault.getQName(), _oinvoke, fault.getMessage());
+ _self.parent.completed(faultData, CompensationHandler.emptySet());
+ }
+ }
- try {
- initializeVariable(outputVar, response);
- } catch (ExternalVariableModuleException e) {
- __log.error("Exception while initializing external variable", e);
- _self.parent.failure(e.toString(), null);
- return;
- }
-
- // Generating event
- VariableModificationEvent se = new VariableModificationEvent(outputVar.declaration.name);
- se.setNewValue(response);
- if (_oinvoke.debugInfo != null)
- se.setLineNo(_oinvoke.debugInfo.startLine);
- sendEvent(se);
-
- try {
- for (OScope.CorrelationSet anInitCorrelationsOutput : _oinvoke.initCorrelationsOutput) {
- initializeCorrelation(_scopeFrame.resolve(anInitCorrelationsOutput), outputVar);
- }
- if (_oinvoke.partnerLink.hasPartnerRole()) {
- // Trying to initialize partner epr based on a message-provided epr/session.
- if (!getBpelRuntime().isPartnerRoleEndpointInitialized(_scopeFrame
- .resolve(_oinvoke.partnerLink)) || !_oinvoke.partnerLink.initializePartnerRole) {
-
- Node fromEpr = getBpelRuntime().getSourceEPR(mexId);
- if (fromEpr != null) {
- getBpelRuntime().writeEndpointReference(
- _scopeFrame.resolve(_oinvoke.partnerLink), (Element) fromEpr);
- }
- }
-
- String partnersSessionId = getBpelRuntime().getSourceSessionId(mexId);
- if (partnersSessionId != null)
- getBpelRuntime().initializePartnersSessionId(_scopeFrame.resolve(_oinvoke.partnerLink),
- partnersSessionId);
-
+ private void setupListeners(final String mexId, final InvokeResponseChannel invokeResponseChannel, final VariableInstance outputVar) {
+ object(false, new InvokeResponseChannelListener(invokeResponseChannel) {
+ private static final long serialVersionUID = 4496880438819196765L;
+
+ public void onResponse() {
+ // we don't have to write variable data -> this already
+ // happened in the nativeAPI impl
+ FaultData fault = null;
+
+ Element response;
+ try {
+ response = getBpelRuntime().getPartnerResponse(mexId);
+ } catch (Exception e) {
+ __log.error(e);
+ // TODO: Better error handling
+ throw new RuntimeException(e);
+ }
+
+ try {
+ initializeVariable(outputVar, response);
+ } catch (ExternalVariableModuleException e) {
+ __log.error("Exception while initializing external variable", e);
+ _self.parent.failure(e.toString(), null);
+ return;
+ }
+
+ // Generating event
+ VariableModificationEvent se = new VariableModificationEvent(outputVar.declaration.name);
+ se.setNewValue(response);
+ if (_oinvoke.debugInfo != null)
+ se.setLineNo(_oinvoke.debugInfo.startLine);
+ sendEvent(se);
+
+ try {
+ for (OScope.CorrelationSet anInitCorrelationsOutput : _oinvoke.initCorrelationsOutput) {
+ initializeCorrelation(_scopeFrame.resolve(anInitCorrelationsOutput), outputVar);
+ }
+ if (_oinvoke.partnerLink != null && _oinvoke.partnerLink.hasPartnerRole()) {
+ // Trying to initialize partner epr based on a message-provided epr/session.
+ if (!getBpelRuntime().isPartnerRoleEndpointInitialized(_scopeFrame
+ .resolve(_oinvoke.partnerLink)) || !_oinvoke.partnerLink.initializePartnerRole) {
+
+ Node fromEpr = getBpelRuntime().getSourceEPR(mexId);
+ if (fromEpr != null) {
+ getBpelRuntime().writeEndpointReference(
+ _scopeFrame.resolve(_oinvoke.partnerLink), (Element) fromEpr);
}
- } catch (FaultException e) {
- fault = createFault(e.getQName(), _oinvoke);
}
- // TODO update output variable with data from non-initiate correlation sets
-
- _self.parent.completed(fault, CompensationHandler.emptySet());
- getBpelRuntime().releasePartnerMex(mexId);
- }
-
- public void onFault() {
- QName faultName = getBpelRuntime().getPartnerFault(mexId);
- Element msg = getBpelRuntime().getPartnerResponse(mexId);
- QName msgType = getBpelRuntime().getPartnerResponseType(mexId);
- FaultData fault = createFault(faultName, msg,
- _oinvoke.getOwner().messageTypes.get(msgType), _self.o);
- _self.parent.completed(fault, CompensationHandler.emptySet());
- getBpelRuntime().releasePartnerMex(mexId);
- }
-
- public void onFailure() {
- // This indicates a communication failure. We don't throw a fault,
- // because there is no fault, instead we'll re-incarnate the invoke
- // and either retry or indicate failure condition.
- // admin to resume the process.
- _self.parent.failure(getBpelRuntime().getPartnerFaultExplanation(mexId), null);
- getBpelRuntime().releasePartnerMex(mexId);
+ String partnersSessionId = getBpelRuntime().getSourceSessionId(mexId);
+ if (partnersSessionId != null)
+ getBpelRuntime().initializePartnersSessionId(
+ _scopeFrame.resolve(_oinvoke.partnerLink), partnersSessionId);
}
+ } catch (FaultException e) {
+ fault = createFault(e.getQName(), _oinvoke);
+ }
+
+ // TODO update output variable with data from non-initiate correlation sets
+
+ _self.parent.completed(fault, CompensationHandler.emptySet());
+ getBpelRuntime().releasePartnerMex(mexId);
+ }
+
+ public void onFault() {
+ QName faultName = getBpelRuntime().getPartnerFault(mexId);
+ Element msg = getBpelRuntime().getPartnerResponse(mexId);
+ QName msgType = getBpelRuntime().getPartnerResponseType(mexId);
+ FaultData fault = createFault(faultName, msg,
+ _oinvoke.getOwner().messageTypes.get(msgType), _self.o);
+ _self.parent.completed(fault, CompensationHandler.emptySet());
+ getBpelRuntime().releasePartnerMex(mexId);
+ }
+
+ public void onFailure() {
+ // This indicates a communication failure. We don't throw a fault,
+ // because there is no fault, instead we'll re-incarnate the invoke
+ // and either retry or indicate failure condition.
+ // admin to resume the process.
+ _self.parent.failure(getBpelRuntime().getPartnerFaultExplanation(mexId), null);
+ getBpelRuntime().releasePartnerMex(mexId);
+ }
- }.or(new TerminationChannelListener(_self.self) {
- private static final long serialVersionUID = 4219496341785922396L;
+ }.or(new TerminationChannelListener(_self.self) {
+ private static final long serialVersionUID = 4219496341785922396L;
- public void terminate() {
- _self.parent.completed(null, CompensationHandler.emptySet());
- object(new InvokeResponseChannelListener(invokeResponseChannel) {
- private static final long serialVersionUID = 688746737897792929L;
- public void onFailure() {
- __log.debug("Failure on invoke ignored, the invoke has already been terminated: " + _oinvoke.toString());
- }
- public void onFault() {
- __log.debug("Fault on invoke ignored, the invoke has already been terminated: " + _oinvoke.toString());
- }
- public void onResponse() {
- __log.debug("Response on invoke ignored, the invoke has already been terminated: " + _oinvoke.toString());
- }
- });
+ public void terminate() {
+ _self.parent.completed(null, CompensationHandler.emptySet());
+ object(new InvokeResponseChannelListener(invokeResponseChannel) {
+ private static final long serialVersionUID = 688746737897792929L;
+ public void onFailure() {
+ __log.debug("Failure on invoke ignored, the invoke has already been terminated: " + _oinvoke.toString());
}
- }));
-
- }
- } catch (FaultException fault) {
- __log.error(fault);
- FaultData faultData = createFault(fault.getQName(), _oinvoke, fault.getMessage());
- _self.parent.completed(faultData, CompensationHandler.emptySet());
- }
+ public void onFault() {
+ __log.debug("Fault on invoke ignored, the invoke has already been terminated: " + _oinvoke.toString());
+ }
+ public void onResponse() {
+ __log.debug("Response on invoke ignored, the invoke has already been terminated: " + _oinvoke.toString());
+ }
+ });
+ }
+ }));
}
private Element setupOutbound(OInvoke oinvoke, Collection<OScope.CorrelationSet> outboundInitiations)
@@ -226,96 +238,4 @@
} else return null;
}
- private void retryOrFailure(String reason, Element data) {
- _lastFailure = new Date();
- _failureReason = reason;
- _failureData = data;
-
- OFailureHandling failureHandling = _oinvoke.getFailureHandling();
- if (failureHandling != null && failureHandling.faultOnFailure) {
- // No attempt to retry or enter activity recovery state, simply fault.
- if (__log.isDebugEnabled())
- __log.debug("ActivityRecovery: Invoke activity " + _self.aId + " faulting on failure");
- FaultData faultData = createFault(OFailureHandling.FAILURE_FAULT_NAME, _oinvoke, reason);
- _self.parent.completed(faultData, CompensationHandler.emptySet());
- return;
- }
- // If maximum number of retries, enter activity recovery state.
- if (failureHandling == null || _invoked > failureHandling.retryFor) {
- requireRecovery();
- return;
- }
-
- if (__log.isDebugEnabled())
- __log.debug("ActivityRecovery: Retrying invoke activity " + _self.aId);
- Date future = new Date(new Date().getTime() + failureHandling.retryDelay * 1000);
- final TimerResponseChannel timerChannel = newChannel(TimerResponseChannel.class);
- getBpelRuntime().registerTimer(timerChannel, future);
-
- object(false, new TimerResponseChannelListener(timerChannel) {
- private static final long serialVersionUID = -261911108068231376L;
- public void onTimeout() {
- instance(INVOKE.this);
- }
- public void onCancel() {
- INVOKE.this.requireRecovery();
- }
- }.or(new TerminationChannelListener(_self.self) {
- private static final long serialVersionUID = -4416795170896911290L;
-
- public void terminate() {
- _self.parent.completed(null, CompensationHandler.emptySet());
- object(new TimerResponseChannelListener(timerChannel) {
- private static final long serialVersionUID = 4822348066868313717L;
- public void onTimeout() { }
- public void onCancel() { }
- });
- }
- }));
- }
-
- private void requireRecovery() {
- if (__log.isDebugEnabled())
- __log.debug("ActivityRecovery: Invoke activity " + _self.aId + " requires recovery");
- sendEvent(new ActivityFailureEvent(_failureReason));
- final ActivityRecoveryChannel recoveryChannel = newChannel(ActivityRecoveryChannel.class);
- getBpelRuntime().registerActivityForRecovery(recoveryChannel, _self.aId, _failureReason, _lastFailure, _failureData,
- new String[] { "retry", "cancel", "fault" }, _invoked - 1);
- object(false, new ActivityRecoveryChannelListener(recoveryChannel) {
- private static final long serialVersionUID = 8397883882810521685L;
- public void retry() {
- if (__log.isDebugEnabled())
- __log.debug("ActivityRecovery: Retrying invoke activity " + _self.aId + " (user initiated)");
- sendEvent(new ActivityRecoveryEvent("retry"));
- getBpelRuntime().unregisterActivityForRecovery(recoveryChannel);
- instance(INVOKE.this);
- }
- public void cancel() {
- if (__log.isDebugEnabled())
- __log.debug("ActivityRecovery: Cancelling invoke activity " + _self.aId + " (user initiated)");
- sendEvent(new ActivityRecoveryEvent("cancel"));
- getBpelRuntime().unregisterActivityForRecovery(recoveryChannel);
- _self.parent.cancelled();
- }
- public void fault(FaultData faultData) {
- if (__log.isDebugEnabled())
- __log.debug("ActivityRecovery: Faulting invoke activity " + _self.aId + " (user initiated)");
- sendEvent(new ActivityRecoveryEvent("fault"));
- getBpelRuntime().unregisterActivityForRecovery(recoveryChannel);
- if (faultData == null)
- faultData = createFault(OFailureHandling.FAILURE_FAULT_NAME, _self.o, _failureReason);
- _self.parent.completed(faultData, CompensationHandler.emptySet());
- }
- }.or(new TerminationChannelListener(_self.self) {
- private static final long serialVersionUID = 2148587381204858397L;
-
- public void terminate() {
- if (__log.isDebugEnabled())
- __log.debug("ActivityRecovery: Cancelling invoke activity " + _self.aId + " (terminated by scope)");
- getBpelRuntime().unregisterActivityForRecovery(recoveryChannel);
- _self.parent.completed(null, CompensationHandler.emptySet());
- }
- }));
- }
-
}
Modified: ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/OInvoke.java
URL: http://svn.apache.org/viewvc/ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/OInvoke.java?rev=732899&r1=732898&r2=732899&view=diff
==============================================================================
--- ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/OInvoke.java (original)
+++ ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/OInvoke.java Thu Jan 8 17:11:01 2009
@@ -33,6 +33,7 @@
public OScope.Variable inputVar;
public OScope.Variable outputVar;
public Operation operation;
+ public OResource resource;
/** Correlation sets initialized on the input message. */
public final List<OScope.CorrelationSet> initCorrelationsInput = new ArrayList<OScope.CorrelationSet>();
@@ -59,10 +60,11 @@
}
public OResource getResource() {
- return null;
+ return resource;
}
+
public boolean isRestful() {
- return false;
+ return resource != null;
}
}
Modified: ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/OResource.java
URL: http://svn.apache.org/viewvc/ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/OResource.java?rev=732899&r1=732898&r2=732899&view=diff
==============================================================================
--- ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/OResource.java (original)
+++ ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/OResource.java Thu Jan 8 17:11:01 2009
@@ -12,6 +12,7 @@
private OResource reference;
private String method;
private boolean instantiateResource;
+ private boolean inbound;
private OScope declaringScope;
public OResource(OProcess owner) {
@@ -58,6 +59,14 @@
this.instantiateResource = instantiateResource;
}
+ public boolean isInbound() {
+ return inbound;
+ }
+
+ public void setInbound(boolean inbound) {
+ this.inbound = inbound;
+ }
+
public OScope getDeclaringScope() {
return declaringScope;
}
Modified: ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/OdeInternalInstance.java
URL: http://svn.apache.org/viewvc/ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/OdeInternalInstance.java?rev=732899&r1=732898&r2=732899&view=diff
==============================================================================
--- ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/OdeInternalInstance.java (original)
+++ ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/OdeInternalInstance.java Thu Jan 8 17:11:01 2009
@@ -46,6 +46,9 @@
String invoke(String invokeId, PartnerLinkInstance instance, Operation operation, Element outboundMsg, Object object)
throws FaultException;
+ String invoke(String requestId, org.apache.ode.bpel.iapi.Resource resource, Element outgoingMessage)
+ throws FaultException;
+
Node getPartData(Element message, OMessageVarType.Part part);
Element getPartnerResponse(String mexId);
Modified: ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/RuntimeInstanceImpl.java
URL: http://svn.apache.org/viewvc/ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/RuntimeInstanceImpl.java?rev=732899&r1=732898&r2=732899&view=diff
==============================================================================
--- ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/RuntimeInstanceImpl.java (original)
+++ ode/branches/restful/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v2/RuntimeInstanceImpl.java Thu Jan 8 17:11:01 2009
@@ -613,6 +613,11 @@
}
}
+ public String invoke(String requestId, org.apache.ode.bpel.iapi.Resource resource, Element outgoingMessage)
+ throws FaultException {
+ return _brc.invoke(requestId, resource, outgoingMessage);
+ }
+
/**
* @return
*/
Modified: ode/branches/restful/runtimes/src/test/java/org/apache/ode/bpel/rtrep/v2/CoreBpelTest.java
URL: http://svn.apache.org/viewvc/ode/branches/restful/runtimes/src/test/java/org/apache/ode/bpel/rtrep/v2/CoreBpelTest.java?rev=732899&r1=732898&r2=732899&view=diff
==============================================================================
--- ode/branches/restful/runtimes/src/test/java/org/apache/ode/bpel/rtrep/v2/CoreBpelTest.java (original)
+++ ode/branches/restful/runtimes/src/test/java/org/apache/ode/bpel/rtrep/v2/CoreBpelTest.java Thu Jan 8 17:11:01 2009
@@ -40,6 +40,8 @@
import org.apache.ode.bpel.rapi.*;
import org.apache.ode.bpel.rtrep.v2.channels.*;
import org.apache.ode.bpel.extension.ExtensionOperation;
+import org.apache.ode.bpel.iapi.*;
+import org.apache.ode.bpel.iapi.Resource;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
import org.junit.Assert;
@@ -148,6 +150,10 @@
return null; //To change body of implemented methods use File | Settings | File Templates.
}
+ public String invoke(String requestId, Resource resource, Element outgoingMessage) throws FaultException {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
public void registerTimer(TimerResponseChannel timerChannel, Date future) {
//To change body of implemented methods use File | Settings | File Templates.
}
Modified: ode/branches/restful/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/SimpleSchedulerTest.java
URL: http://svn.apache.org/viewvc/ode/branches/restful/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/SimpleSchedulerTest.java?rev=732899&r1=732898&r2=732899&view=diff
==============================================================================
--- ode/branches/restful/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/SimpleSchedulerTest.java (original)
+++ ode/branches/restful/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/SimpleSchedulerTest.java Thu Jan 8 17:11:01 2009
@@ -218,4 +218,4 @@
return scheduler;
}
-}
+}
\ No newline at end of file