You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ode.apache.org by rr...@apache.org on 2010/02/16 17:08:07 UTC

svn commit: r910573 - in /ode/branches/APACHE_ODE_1.X: bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ bpel-runtime/src/main/java/org/apache/ode/bpel/engine/replayer/ bpel-schemas/src/main/xsd/ jbi/src/test/java/org/apache/ode/jbi/ jbi/src/test/...

Author: rr
Date: Tue Feb 16 16:08:06 2010
New Revision: 910573

URL: http://svn.apache.org/viewvc?rev=910573&view=rev
Log:
New features for Instance Replayer: live communication and rollbackOnFault. Test cases included. 

Added:
    ode/branches/APACHE_ODE_1.X/jbi/src/test/resources/ReplayerJbiTest/initiateLiveRequest.xml   (with props)
    ode/branches/APACHE_ODE_1.X/jbi/src/test/resources/ReplayerJbiTest/replayLiveRequest.xml   (with props)
    ode/branches/APACHE_ODE_1.X/jbi/src/test/resources/ReplayerJbiTest/replayRollbackOnFault.xml   (with props)
    ode/branches/APACHE_ODE_1.X/jbi/src/test/resources/ReplayerJbiTest/replayRollbackOnFault2.xml   (with props)
Modified:
    ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java
    ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java
    ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java
    ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerRoleMessageExchangeImpl.java
    ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/WorkEvent.java
    ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/replayer/Replayer.java
    ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/replayer/ReplayerBpelRuntimeContextImpl.java
    ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/replayer/ReplayerContext.java
    ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/replayer/ReplayerScheduler.java
    ode/branches/APACHE_ODE_1.X/bpel-schemas/src/main/xsd/pmapi.xsd
    ode/branches/APACHE_ODE_1.X/jbi/src/test/java/org/apache/ode/jbi/JbiTestBase.java
    ode/branches/APACHE_ODE_1.X/jbi/src/test/resources/ReplayerJbiTest/OnEventCorrelation.bpel
    ode/branches/APACHE_ODE_1.X/jbi/src/test/resources/ReplayerJbiTest/OnEventCorrelation2.bpel
    ode/branches/APACHE_ODE_1.X/jbi/src/test/resources/ReplayerJbiTest/deploy.xml
    ode/branches/APACHE_ODE_1.X/jbi/src/test/resources/ReplayerJbiTest/test.properties
    ode/branches/APACHE_ODE_1.X/jbi/src/test/resources/log4j.properties

Modified: ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java
URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java?rev=910573&r1=910572&r2=910573&view=diff
==============================================================================
--- ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java (original)
+++ ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java Tue Feb 16 16:08:06 2010
@@ -102,7 +102,7 @@
     private static final double PROCESS_OVERHEAD_MEMORY_FACTOR = 1.2;
 
     /** Active processes, keyed by process id. */
-    final HashMap<QName, BpelProcess> _activeProcesses = new HashMap<QName, BpelProcess>();
+    public final HashMap<QName, BpelProcess> _activeProcesses = new HashMap<QName, BpelProcess>();
 
     /** Mapping from myrole service name to active process. */
     private final HashMap<QName, List<BpelProcess>> _serviceMap = new HashMap<QName, List<BpelProcess>>();

Modified: ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java
URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java?rev=910573&r1=910572&r2=910573&view=diff
==============================================================================
--- ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java (original)
+++ ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java Tue Feb 16 16:08:06 2010
@@ -29,6 +29,7 @@
 import java.util.List;
 import java.util.Set;
 
+import javax.wsdl.Fault;
 import javax.wsdl.Operation;
 import javax.xml.namespace.QName;
 
@@ -502,16 +503,70 @@
     public void processOutstandingRequest(PartnerLinkInstance partnerLink, String opName, String bpelMexId, String odeMexId) throws FaultException {
         String mexRef = _imaManager.processOutstandingRequest(partnerLink, opName, bpelMexId, odeMexId);
         if (mexRef != null) {
-            reply(mexRef, partnerLink, opName, bpelMexId, null, _bpelProcess.getOProcess().constants.qnConflictingRequest, true);
+            reply2(partnerLink, opName, bpelMexId, null, _bpelProcess.getOProcess().constants.qnConflictingRequest, false, mexRef);
             throw new FaultException(_bpelProcess.getOProcess().constants.qnConflictingRequest);
         }
     }
 
-    public void reply(String mexRef, final PartnerLinkInstance plinkInstnace, final String opName, final String mexId, Element msg, QName fault, boolean failure) throws FaultException {
-        if (mexRef == null) {
-            throw new FaultException(_bpelProcess.getOProcess().constants.qnMissingRequest);
+    protected void doAsyncReply(MyRoleMessageExchangeImpl m) {
+    	MessageExchangeDAO mex = m.getDAO();
+        PartnerRoleMessageExchange pmex = null;
+
+        if (mex.getPipedMessageExchangeId() != null) {
+            pmex = (PartnerRoleMessageExchange) _bpelProcess
+                    .getEngine().getMessageExchange(mex.getPipedMessageExchangeId());
         }
 
+        if (pmex != null) {
+            if (BpelProcess.__log.isDebugEnabled()) {
+                __log.debug("Replying to a p2p mex, myrole " + m + " - partnerole " + pmex);
+            }
+            try {
+                switch (m.getStatus()) {
+                    case FAILURE:
+                        // We can't seem to get the failure out of the myrole mex?
+                        pmex.replyWithFailure(MessageExchange.FailureType.OTHER, "operation failed", null);
+                        break;
+                    case FAULT:
+                    	Fault fault = pmex.getOperation().getFault(m.getFault().getLocalPart());
+                    	if (fault == null) {
+                    		__log.error("process " + _bpelProcess + " instance " + _iid + " thrown unmapped fault in p2p communication " + m.getFault() + " " + m.getFaultExplanation() + " - converted to failure");
+                            pmex.replyWithFailure(MessageExchange.FailureType.OTHER, "process thrown unmapped fault in p2p communication " + m.getFault() + " " + m.getFaultExplanation() + " - converted to failure", m.getFaultResponse().getMessage());
+                    	} else {
+	                        Message faultRes = pmex.createMessage(pmex.getOperation().getFault(m.getFault().getLocalPart())
+	                                .getMessage().getQName());
+	                        faultRes.setMessage(m.getResponse().getMessage());
+	                        pmex.replyWithFault(m.getFault(), faultRes);
+                    	}
+                        break;
+                    case RESPONSE:
+                        Message response = pmex.createMessage(pmex.getOperation().getOutput().getMessage().getQName());
+                        response.setMessage(m.getResponse().getMessage());
+                        pmex.reply(response);
+                        break;
+                    default:
+                        __log.warn("Unexpected state: " + m.getStatus());
+                        break;
+                }
+            } finally {
+                mex.release(_bpelProcess.isCleanupCategoryEnabled(m.getStatus() == MessageExchange.Status.RESPONSE, CLEANUP_CATEGORY.MESSAGES));
+            }
+        } else {
+            checkInvokeExternalPermission();
+        	_bpelProcess._engine._contexts.mexContext.onAsyncReply(m);
+            //mex.release(_bpelProcess.isCleanupCategoryEnabled(m.getStatus() == MessageExchange.Status.RESPONSE, CLEANUP_CATEGORY.MESSAGES));
+        }
+    }
+
+    public void reply(final PartnerLinkInstance plinkInstnace, final String opName, final String mexId, Element msg,
+            QName fault) throws FaultException {
+        String mexRef = _imaManager.release(plinkInstnace, opName, mexId);
+        reply2(plinkInstnace, opName, mexId, msg, fault, false, mexRef);
+    }
+
+    public void reply2(final PartnerLinkInstance plinkInstnace, final String opName, final String mexId, Element msg,
+                      QName fault, boolean failure, final String mexRef) throws FaultException {
+
         // prepare event
         ProcessMessageExchangeEvent evt = new ProcessMessageExchangeEvent();
         evt.setMexId(mexId);
@@ -539,50 +594,12 @@
             evt.setAspect(ProcessMessageExchangeEvent.PROCESS_OUTPUT);
         }
 
-        if (mex.getPipedMessageExchangeId() != null) {
-            PartnerRoleMessageExchange pmex = (PartnerRoleMessageExchange) _bpelProcess
-                    .getEngine().getMessageExchange(mex.getPipedMessageExchangeId());
-            if (BpelProcess.__log.isDebugEnabled()) {
-                __log.debug("Replying to a p2p mex, myrole " + m + " - partnerole " + pmex);
-            }
-            try {
-                if (pmex != null) {
-                    switch (m.getStatus()) {
-                        case FAILURE:
-                            // We can't seem to get the failure out of the myrole mex?
-                            pmex.replyWithFailure(MessageExchange.FailureType.OTHER, "operation failed", null);
-                            break;
-                        case FAULT:
-                            Message faultRes = pmex.createMessage(pmex.getOperation().getFault(m.getFault().getLocalPart())
-                                    .getMessage().getQName());
-                            faultRes.setMessage(m.getResponse().getMessage());
-                            pmex.replyWithFault(m.getFault(), faultRes);
-                            break;
-                        case RESPONSE:
-                            Message response = pmex.createMessage(pmex.getOperation().getOutput().getMessage().getQName());
-                            response.setMessage(m.getResponse().getMessage());
-                            pmex.reply(response);
-                            break;
-                        default:
-                            __log.warn("Unexpected state: " + m.getStatus());
-                            break;
-                    }
-                }
-            } finally {
-                mex.release(_bpelProcess.isCleanupCategoryEnabled(m.getStatus() == MessageExchange.Status.RESPONSE, CLEANUP_CATEGORY.MESSAGES));
-            }
-        } else _bpelProcess._engine._contexts.mexContext.onAsyncReply(m);
+        doAsyncReply(m);
 
         // send event
         sendEvent(evt);
     }
 
-    public void reply(final PartnerLinkInstance plinkInstnace, final String opName, final String mexId, Element msg,
-                      QName fault) throws FaultException {
-        String mexRef = _imaManager.release(plinkInstnace, opName, mexId);
-        reply(mexRef, plinkInstnace, opName, mexId, msg, fault, false);
-    }
-
     /**
      * @see BpelRuntimeContext#writeCorrelation(org.apache.ode.bpel.runtime.CorrelationSetInstance,
      *      org.apache.ode.bpel.common.CorrelationKey)
@@ -688,6 +705,8 @@
         }
     }
 
+    public void checkInvokeExternalPermission() {}
+
     /**
      * Called back when the process executes an invokation.
      * 
@@ -813,6 +832,7 @@
             // If we couldn't find the endpoint, then there is no sense
             // in asking the IL to invoke.
             if (partnerEpr != null) {
+                checkInvokeExternalPermission();
                 mexDao.setEPR(partnerEpr.toXML().getDocumentElement());
                 mex.setStatus(MessageExchange.Status.REQUEST);
                 // Assuming an unreliable protocol, we schedule a task to check if recovery mode will be needed
@@ -1145,8 +1165,7 @@
                         }
                     default:
                         mex.setFailure(FailureType.OTHER, "No response.", null);
-                        _bpelProcess._engine._contexts.mexContext.onAsyncReply(mex);
-                        mex.release(_bpelProcess.isCleanupCategoryEnabled(true, CLEANUP_CATEGORY.MESSAGES));
+                    	doAsyncReply(mex);
                 }
             }
         }
@@ -1167,7 +1186,7 @@
 
                 mex.setFault(faultData.getFaultName(), message);
                 mex.setFaultExplanation(faultData.getExplanation());
-                _bpelProcess._engine._contexts.mexContext.onAsyncReply(mex);
+                doAsyncReply(mex);
             }
         }
     }
@@ -1180,7 +1199,7 @@
                 MyRoleMessageExchangeImpl mex = new MyRoleMessageExchangeImpl(_bpelProcess, _bpelProcess._engine, mexDao);
                 _bpelProcess.initMyRoleMex(mex);
                 mex.setFailure(FailureType.OTHER, "No response.", null);
-                _bpelProcess._engine._contexts.mexContext.onAsyncReply(mex);
+                doAsyncReply(mex);
             }
         }
     }

Modified: ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java
URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java?rev=910573&r1=910572&r2=910573&view=diff
==============================================================================
--- ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java (original)
+++ ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java Tue Feb 16 16:08:06 2010
@@ -34,6 +34,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.ode.bpel.dao.MessageExchangeDAO;
+import org.apache.ode.bpel.engine.replayer.Replayer;
 import org.apache.ode.bpel.iapi.Message;
 import org.apache.ode.bpel.iapi.MessageExchange;
 import org.apache.ode.bpel.iapi.MyRoleMessageExchange;
@@ -49,9 +50,8 @@
 
 public class MyRoleMessageExchangeImpl extends MessageExchangeImpl implements MyRoleMessageExchange {
 
-
     private static final Log __log = LogFactory.getLog(MyRoleMessageExchangeImpl.class);
-    
+
     protected BpelProcess _process;
 
     protected static Map<String, ResponseCallback> _waitingCallbacks =
@@ -66,7 +66,7 @@
         return CorrelationStatus.valueOf(getDAO().getCorrelationStatus());
     }
 
-    void setCorrelationStatus(CorrelationStatus status) {
+    public void setCorrelationStatus(CorrelationStatus status) {
         getDAO().setCorrelationStatus(status.toString());
     }
 
@@ -75,8 +75,7 @@
      * 
      * @param mex
      *            message exchange
-     * @return <code>true</code> if execution should continue,
-     *         <code>false</code> otherwise
+     * @return <code>true</code> if execution should continue, <code>false</code> otherwise
      */
     private boolean processInterceptors(MyRoleMessageExchangeImpl mex, InterceptorInvoker invoker) {
         InterceptorContextImpl ictx = new InterceptorContextImpl(_engine._contexts.dao.getConnection(), 
@@ -89,8 +88,7 @@
         return true;
     }
 
-    boolean processInterceptor(MessageExchangeInterceptor i, MyRoleMessageExchangeImpl mex, InterceptorContext ictx,
-            InterceptorInvoker invoker) {
+    boolean processInterceptor(MessageExchangeInterceptor i, MyRoleMessageExchangeImpl mex, InterceptorContext ictx, InterceptorInvoker invoker) {
         __log.debug(invoker + "--> interceptor " + i);
         try {
             invoker.invoke(i, mex, ictx);
@@ -100,8 +98,7 @@
             return false;
         } catch (AbortMessageExchangeException ame) {
             __log.debug("interceptor " + i + " cause invoke on " + this + " to be aborted with FAILURE: " + ame.getMessage());
-            mex.setFailure(MessageExchange.FailureType.ABORTED, __msgs.msgInterceptorAborted(mex.getMessageExchangeId(), i
-                    .toString(), ame.getMessage()), null);
+            mex.setFailure(MessageExchange.FailureType.ABORTED, __msgs.msgInterceptorAborted(mex.getMessageExchangeId(), i.toString(), ame.getMessage()), null);
             return false;
         }
         return true;
@@ -147,10 +144,15 @@
             }
 
             setStatus(Status.ASYNC);
-            if (target.isInMemory())
-                _engine._contexts.scheduler.scheduleVolatileJob(true, we.getDetail());
-            else
-                _engine._contexts.scheduler.schedulePersistedJob(we.getDetail(), null);
+            Replayer replayer = Replayer.replayer.get();
+            if (replayer == null) {
+                if (target.isInMemory())
+                    _engine._contexts.scheduler.scheduleVolatileJob(true, we.getDetail());
+                else
+                    _engine._contexts.scheduler.schedulePersistedJob(we.getDetail(), null);
+            } else {
+                replayer.scheduler.schedulePersistedJob(we.getDetail(), null);
+            }
             return new ResponseFuture(getClientId());
         }
     }
@@ -172,8 +174,7 @@
 
     public String toString() {
         try {
-            return "{MyRoleMex#" + getMessageExchangeId() + " [Client " + getClientId() + "] calling " + getServiceName() + "."
-                    + getOperationName() + "(...)}";
+            return "{MyRoleMex#" + getMessageExchangeId() + " [Client " + getClientId() + "] calling " + getServiceName() + "." + getOperationName() + "(...)}";
         } catch (Throwable t) {
             return "{MyRoleMex#???}";
         }
@@ -190,7 +191,7 @@
         }
         _dao = null;
     }
-    
+
     /**
      * Return a deep clone of the given message
      * 
@@ -223,6 +224,7 @@
         public boolean cancel(boolean mayInterruptIfRunning) {
             throw new UnsupportedOperationException();
         }
+
         public Object get() throws InterruptedException, ExecutionException {
             try {
                 return get(0, TimeUnit.MILLISECONDS);
@@ -231,6 +233,7 @@
                 throw new ExecutionException(e);
             }
         }
+
         public Object get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
             ResponseCallback callback = _waitingCallbacks.get(_clientId);
             if (callback != null) {
@@ -241,9 +244,11 @@
             }
             return null;
         }
+
         public boolean isCancelled() {
             return false;
         }
+
         public boolean isDone() {
             return _done;
         }
@@ -262,6 +267,7 @@
                     __log.warn("Transaction is rolled back on sending back the response.");
                 }
             }
+
             public void beforeCompletion() {
             }
         });

Modified: ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerRoleMessageExchangeImpl.java
URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerRoleMessageExchangeImpl.java?rev=910573&r1=910572&r2=910573&view=diff
==============================================================================
--- ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerRoleMessageExchangeImpl.java (original)
+++ ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerRoleMessageExchangeImpl.java Tue Feb 16 16:08:06 2010
@@ -23,6 +23,7 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.ode.bpel.dao.MessageExchangeDAO;
 import org.apache.ode.bpel.engine.WorkEvent.Type;
+import org.apache.ode.bpel.engine.replayer.Replayer;
 import org.apache.ode.bpel.iapi.BpelEngineException;
 import org.apache.ode.bpel.iapi.EndpointReference;
 import org.apache.ode.bpel.iapi.Message;
@@ -137,10 +138,15 @@
         we.setInMem(_engine._activeProcesses.get(getDAO().getProcess().getProcessId()).isInMemory());
         we.setChannel(getDAO().getChannel());
         we.setMexId(getDAO().getMessageExchangeId());
-        if (we.isInMem())
-            _engine._contexts.scheduler.scheduleVolatileJob(true, we.getDetail());
-        else
-            _engine._contexts.scheduler.schedulePersistedJob(we.getDetail(), null);
+        Replayer replayer = Replayer.replayer.get();
+        if (replayer == null) {
+            if (we.isInMem())
+                _engine._contexts.scheduler.scheduleVolatileJob(true, we.getDetail());
+            else
+                _engine._contexts.scheduler.schedulePersistedJob(we.getDetail(), null);
+        } else {
+            replayer.scheduler.schedulePersistedJob(we.getDetail(), null);
+        }
     }
 
     /**

Modified: ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/WorkEvent.java
URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/WorkEvent.java?rev=910573&r1=910572&r2=910573&view=diff
==============================================================================
--- ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/WorkEvent.java (original)
+++ ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/WorkEvent.java Tue Feb 16 16:08:06 2010
@@ -33,29 +33,29 @@
 
     private Map<String, Object> _jobDetail;
 
-    WorkEvent(Map<String, Object> jobDetail) {
+    public WorkEvent(Map<String, Object> jobDetail) {
         _jobDetail = jobDetail;
     }
 
-    WorkEvent() {
+    public WorkEvent() {
         _jobDetail = new HashMap<String, Object>();
     }
 
-    Long getIID() {
+    public Long getIID() {
         return (Long) _jobDetail.get("iid");
     }
 
-    Type getType() {
+    public Type getType() {
         return Type.valueOf((String) _jobDetail.get("type"));
     }
 
-    void setType(Type timer) {
+    public void setType(Type timer) {
 
         _jobDetail.put("type", timer.toString());
 
     }
 
-    Map<String, Object> getDetail() {
+    public Map<String, Object> getDetail() {
         return _jobDetail;
     }
 

Modified: ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/replayer/Replayer.java
URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/replayer/Replayer.java?rev=910573&r1=910572&r2=910573&view=diff
==============================================================================
--- ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/replayer/Replayer.java (original)
+++ ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/replayer/Replayer.java Tue Feb 16 16:08:06 2010
@@ -25,15 +25,26 @@
 import java.util.Date;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
+import javax.xml.namespace.QName;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.ode.bpel.dao.BpelDAOConnection;
 import org.apache.ode.bpel.dao.MessageExchangeDAO;
+import org.apache.ode.bpel.dao.ProcessDAO;
 import org.apache.ode.bpel.dao.ProcessInstanceDAO;
 import org.apache.ode.bpel.engine.BpelEngineImpl;
+import org.apache.ode.bpel.engine.BpelProcess;
+import org.apache.ode.bpel.engine.MyRoleMessageExchangeImpl;
+import org.apache.ode.bpel.engine.PartnerLinkMyRoleImpl;
+import org.apache.ode.bpel.engine.WorkEvent;
+import org.apache.ode.bpel.engine.PartnerLinkMyRoleImpl.RoutingInfo;
+import org.apache.ode.bpel.evt.CorrelationMatchEvent;
 import org.apache.ode.bpel.iapi.BpelEngine;
+import org.apache.ode.bpel.iapi.MyRoleMessageExchange;
 import org.apache.ode.bpel.iapi.MessageExchange.Status;
 import org.apache.ode.bpel.iapi.ProcessConf.CLEANUP_CATEGORY;
 import org.apache.ode.bpel.pmapi.CommunicationType;
@@ -43,6 +54,7 @@
 import org.apache.ode.bpel.pmapi.GetCommunicationResponse;
 import org.apache.ode.bpel.pmapi.Replay;
 import org.apache.ode.bpel.pmapi.CommunicationType.Exchange;
+import org.apache.ode.bpel.runtime.PROCESS;
 import org.apache.xmlbeans.XmlCalendar;
 import org.apache.xmlbeans.XmlException;
 import org.apache.xmlbeans.XmlObject;
@@ -56,62 +68,74 @@
  */
 public class Replayer {
     private static final Log __log = LogFactory.getLog(Replayer.class);
+    public static ThreadLocal<Replayer> replayer = new ThreadLocal<Replayer>();
     public ReplayerScheduler scheduler = new ReplayerScheduler();
+    public BpelEngineImpl engine = null;
+    public List<ReplayerContext> contexts = null;
+    public BpelDAOConnection conn = null;
 
     public List<Long> replayInstances(Replay request, BpelEngine engine, BpelDAOConnection conn) throws Exception {
-        Date startDate = Calendar.getInstance().getTime();
-        List<ReplayerContext> contexts = new ArrayList<ReplayerContext>();
-        {
-            List<Long> toDelete = new ArrayList<Long>();
-            List<CommunicationType> toRestore = new ArrayList<CommunicationType>();
-
-            toDelete.addAll(request.getReplaceInstanceList());
-
-            for (Long iid : request.getUpgradeInstanceList()) {
-                toDelete.add(iid);
-                toRestore.add(CommunicationType.Factory.parse(getCommunication(iid, conn).toString()));
-            }
-            toRestore.addAll(request.getRestoreInstanceList());
-
+        try {
+            replayer.set(this);
+            this.engine = (BpelEngineImpl) engine;
+            this.conn = conn;
+            
+            Date startDate = Calendar.getInstance().getTime();
+            contexts = new ArrayList<ReplayerContext>();
             {
-                Set<CLEANUP_CATEGORY> cleanupCategory = new HashSet<CLEANUP_CATEGORY>();
-                cleanupCategory.add(CLEANUP_CATEGORY.INSTANCE);
-                cleanupCategory.add(CLEANUP_CATEGORY.MESSAGES);
-                cleanupCategory.add(CLEANUP_CATEGORY.VARIABLES);
-                cleanupCategory.add(CLEANUP_CATEGORY.CORRELATIONS);
-                cleanupCategory.add(CLEANUP_CATEGORY.EVENTS);
-
-                for (Long l : toDelete) {
-                    conn.getInstance(l).delete(cleanupCategory);
+                List<Long> toDelete = new ArrayList<Long>();
+                List<CommunicationType> toRestore = new ArrayList<CommunicationType>();
+    
+                toDelete.addAll(request.getReplaceInstanceList());
+    
+                for (Long iid : request.getUpgradeInstanceList()) {
+                    toDelete.add(iid);
+                    toRestore.add(CommunicationType.Factory.parse(getCommunication(iid, conn).toString()));
+                }
+                toRestore.addAll(request.getRestoreInstanceList());
+    
+                {
+                    Set<CLEANUP_CATEGORY> cleanupCategory = new HashSet<CLEANUP_CATEGORY>();
+                    cleanupCategory.add(CLEANUP_CATEGORY.INSTANCE);
+                    cleanupCategory.add(CLEANUP_CATEGORY.MESSAGES);
+                    cleanupCategory.add(CLEANUP_CATEGORY.VARIABLES);
+                    cleanupCategory.add(CLEANUP_CATEGORY.CORRELATIONS);
+                    cleanupCategory.add(CLEANUP_CATEGORY.EVENTS);
+    
+                    for (Long l : toDelete) {
+                        conn.getInstance(l).delete(cleanupCategory);
+                    }
+                }
+    
+                for (CommunicationType r : toRestore) {
+                    ReplayerContext context = new ReplayerContext(startDate);
+                    context.bpelEngine = (BpelEngineImpl) engine;
+                    context.init(r, scheduler);
+                    contexts.add(context);
                 }
             }
-
-            for (CommunicationType r : toRestore) {
-                ReplayerContext context = new ReplayerContext(startDate);
-                context.bpelEngine = (BpelEngineImpl) engine;
-                context.init(r, scheduler);
-                contexts.add(context);
+    
+            scheduler.startReplaying(this);
+            {
+                List<Exchange> remainingExchanges = new ArrayList<Exchange>();
+    
+                for (ReplayerContext c : contexts) {
+                    c.answers.remainingExchanges(remainingExchanges);
+                }
+                if (remainingExchanges.size() > 0) {
+                    throw new RemainingExchangesException(remainingExchanges);
+                }
             }
-        }
-
-        scheduler.startReplaying();
-        {
-            List<Exchange> remainingExchanges = new ArrayList<Exchange>();
-
+    
+            List<Long> r = new ArrayList<Long>();
             for (ReplayerContext c : contexts) {
-                c.answers.remainingExchanges(remainingExchanges);
-            }
-            if (remainingExchanges.size() > 0) {
-                throw new RemainingExchangesException(remainingExchanges);
+                r.add(c.runtimeContext.getPid());
             }
+    
+            return r;
+        } finally {
+            replayer.set(null);
         }
-
-        List<Long> r = new ArrayList<Long>();
-        for (ReplayerContext c : contexts) {
-            r.add(c.runtimeContext.getPid());
-        }
-
-        return r;
     }
 
     public GetCommunicationResponse getCommunication(GetCommunication request, BpelDAOConnection conn) throws Exception {
@@ -188,4 +212,70 @@
         }
         return result;
     }
+    
+    /** Returns the replayed context whose runtime context reports instance id iid, or null if there is none. */
+    public ReplayerContext findReplayedInstance(long iid) {
+        for (ReplayerContext candidate : contexts) {
+            if (candidate.runtimeContext.getPid() == iid) {
+                return candidate;
+            }
+        }
+        return null;
+    }
+    /**
+     * Runs a job scheduled during replay: INVOKE_INTERNAL delivers a live request to a new or an already
+     * replayed instance, INVOKE_RESPONSE delivers a live reply. Any other event type is ignored.
+     */
+    public void handleWorkEvent(Map<String, Object> jobDetail, final Date when) {
+        WorkEvent we = new WorkEvent(jobDetail);
+        __log.debug("handleWorkEvent " + jobDetail + " " + when);
+        if (we.getType() == WorkEvent.Type.INVOKE_INTERNAL) {
+            final BpelProcess p = engine._activeProcesses.get(we.getProcessId());
+            final ProcessDAO processDAO = p.getProcessDAO();
+            final MyRoleMessageExchangeImpl mex = (MyRoleMessageExchangeImpl) engine.getMessageExchange(we.getMexId());
+
+            p.invokeProcess(mex,
+                    new BpelProcess.InvokeHandler() {
+                        public boolean invoke(PartnerLinkMyRoleImpl target, RoutingInfo routing, boolean createInstance) {
+                            if (routing.messageRoute == null && createInstance) {
+                                __log.debug("creating new instance via live communication mex:" + mex);
+                                ProcessInstanceDAO newInstance = processDAO.createInstance(routing.correlator);
+                                ReplayerContext context = new ReplayerContext(null);
+                                context.bpelEngine = (BpelEngineImpl) engine;
+                                contexts.add(context);
+                                ReplayerBpelRuntimeContextImpl runtimeContext = new ReplayerBpelRuntimeContextImpl(p, newInstance, new PROCESS(p.getOProcess()), mex, context);
+                                context.runtimeContext = runtimeContext;
+                                runtimeContext.setCurrentEventDateTime(when);
+                                runtimeContext.updateMyRoleMex(mex);
+                                // first receive is matched to provided mex
+                                runtimeContext.execute();
+                                return true;
+                            } else if (routing.messageRoute != null) {
+                                long iid = routing.messageRoute.getTargetInstance().getInstanceId();
+                                ReplayerContext ctx = findReplayedInstance(iid);
+                                if (ctx == null) {
+                                    throw new IllegalStateException("Trying to hit existing instance via live communication, but there's no such instance mex:" + mex + " iid:" + iid);
+                                }
+                                __log.debug("hitting existing instance via live communication mex:" + mex + " iid:" + iid);
+
+                                ctx.runtimeContext.inputMsgMatch(routing.messageRoute.getGroupId(), routing.messageRoute.getIndex(), mex);
+                                routing.correlator.removeRoutes(routing.messageRoute.getGroupId(), ctx.runtimeContext.getDAO());
+                                mex.setCorrelationStatus(MyRoleMessageExchange.CorrelationStatus.MATCHED);
+                                mex.getDAO().setInstance(routing.messageRoute.getTargetInstance());
+                                ctx.runtimeContext.execute();
+                            }
+                            return false;
+                        }
+                    });
+        } else if (we.getType() == WorkEvent.Type.INVOKE_RESPONSE) {
+            __log.debug("reply for live communication");
+            ReplayerContext ctx = findReplayedInstance(we.getIID());
+            if (ctx == null) {
+                // Explicit check instead of a bare assert, which is skipped when the JVM runs without -ea.
+                throw new IllegalStateException("Reply for live communication targets an instance that is not being replayed iid:" + we.getIID() + " mex:" + we.getMexId());
+            }
+            ctx.runtimeContext.invocationResponse(we.getMexId(), we.getChannel());
+            ctx.runtimeContext.execute();
+        }
+    }
 }

Modified: ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/replayer/ReplayerBpelRuntimeContextImpl.java
URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/replayer/ReplayerBpelRuntimeContextImpl.java?rev=910573&r1=910572&r2=910573&view=diff
==============================================================================
--- ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/replayer/ReplayerBpelRuntimeContextImpl.java (original)
+++ ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/replayer/ReplayerBpelRuntimeContextImpl.java Tue Feb 16 16:08:06 2010
@@ -38,12 +38,15 @@
 import org.apache.ode.bpel.engine.MyRoleMessageExchangeImpl;
 import org.apache.ode.bpel.engine.PartnerLinkMyRoleImpl;
 import org.apache.ode.bpel.engine.PartnerLinkMyRoleImpl.RoutingInfo;
+import org.apache.ode.bpel.engine.replayer.ReplayerContext.AnswerResult;
 import org.apache.ode.bpel.iapi.MessageExchange.MessageExchangePattern;
 import org.apache.ode.bpel.iapi.MessageExchange.Status;
 import org.apache.ode.bpel.pmapi.CommunicationType.Exchange;
 import org.apache.ode.bpel.runtime.PROCESS;
 import org.apache.ode.bpel.runtime.PartnerLinkInstance;
 import org.apache.ode.bpel.runtime.Selector;
+import org.apache.ode.bpel.runtime.channels.ActivityRecoveryChannel;
+import org.apache.ode.bpel.runtime.channels.FaultData;
 import org.apache.ode.bpel.runtime.channels.InvokeResponseChannel;
 import org.apache.ode.bpel.runtime.channels.PickResponseChannel;
 import org.apache.ode.bpel.runtime.channels.TimerResponseChannel;
@@ -80,68 +83,95 @@
         __log.debug("cancel " + timerResponseChannel.export());
         super.cancel(timerResponseChannel);
     }
+    
+    /** Permission check for invoking external services; in this replayer context it always fails
+     *  by throwing IllegalStateException. */
+    @Override
+    public void checkInvokeExternalPermission() {
+        throw new IllegalStateException("Invoking external services is disabled during replaying");
+    }
 
     @Override
     public String invoke(int aid, PartnerLinkInstance partnerLink, Operation operation, Element outgoingMessage, InvokeResponseChannel channel) throws FaultException {
         __log.debug("invoke");
+        AnswerResult answerResult = replayerContext.answers.fetchAnswer(partnerLink.partnerLink.partnerRolePortType.getQName(), operation.getName(), outgoingMessage, getCurrentEventDateTime());
 
-        Exchange answer = replayerContext.answers.fetchAnswer(partnerLink.partnerLink.partnerRolePortType.getQName(), operation.getName(), outgoingMessage, getCurrentEventDateTime());
-
-        PartnerLinkDAO plinkDAO = fetchPartnerLinkDAO(partnerLink);
+        if (answerResult.isLive) {
+            return super.invoke(aid, partnerLink, operation, outgoingMessage, channel);
+        } else {
+            PartnerLinkDAO plinkDAO = fetchPartnerLinkDAO(partnerLink);
 
-        MessageExchangeDAO mexDao = _dao.getConnection().createMessageExchange(MessageExchangeDAO.DIR_BPEL_INVOKES_PARTNERROLE);
+            MessageExchangeDAO mexDao = _dao.getConnection().createMessageExchange(MessageExchangeDAO.DIR_BPEL_INVOKES_PARTNERROLE);
 
-        mexDao.setCreateTime(new Date(getCurrentEventDateTime().getTime() + 1));
-        mexDao.setOperation(operation.getName());
-        mexDao.setPortType(partnerLink.partnerLink.partnerRolePortType.getQName());
-        mexDao.setPartnerLinkModelId(partnerLink.partnerLink.getId());
-        mexDao.setPartnerLink(plinkDAO);
-        mexDao.setPattern((operation.getOutput() != null ? MessageExchangePattern.REQUEST_RESPONSE : MessageExchangePattern.REQUEST_ONLY).toString());
-        mexDao.setProcess(_dao.getProcess());
-        mexDao.setInstance(_dao);
-        {
-            MessageDAO request = mexDao.createMessage(new QName("replayer", "replayer"));
-            request.setData(outgoingMessage);
-            mexDao.setRequest(request);
-        }
-
-        if (mexDao.getPattern().equals(MessageExchangePattern.REQUEST_RESPONSE.toString())) {
-            if (answer.isSetFault()) {
-                MessageDAO response = mexDao.createMessage(new QName("replayer", "replayer"));
-                try {
-                    assign(response, answer.getFault());
-                } catch (Exception e) {
-                    throw new FaultException(new QName("replayer", "replayer"), e);
-                }
-                mexDao.setResponse(response);
-                mexDao.setFault(answer.getFault().getType());
-                mexDao.setFaultExplanation(answer.getFault().getExplanation());
-                mexDao.setStatus(Status.FAULT.toString());
-
-            } else if (answer.isSetOut()) {
-                MessageDAO response = mexDao.createMessage(new QName("replayer", "replayer"));
-                try {
-                    assign(response, answer.getOut());
-                } catch (Exception e) {
-                    throw new FaultException(new QName("replayer", "replayer"), e);
+            mexDao.setCreateTime(new Date(getCurrentEventDateTime().getTime() + 1));
+            mexDao.setOperation(operation.getName());
+            mexDao.setPortType(partnerLink.partnerLink.partnerRolePortType.getQName());
+            mexDao.setPartnerLinkModelId(partnerLink.partnerLink.getId());
+            mexDao.setPartnerLink(plinkDAO);
+            mexDao.setPattern((operation.getOutput() != null ? MessageExchangePattern.REQUEST_RESPONSE : MessageExchangePattern.REQUEST_ONLY).toString());
+            mexDao.setProcess(_dao.getProcess());
+            mexDao.setInstance(_dao);
+            {
+                MessageDAO request = mexDao.createMessage(new QName("replayer", "replayer"));
+                request.setData(outgoingMessage);
+                // try {
+                // assign(request, answer.getIn());
+                // } catch (Exception e) {
+                // throw new FaultException(new QName("replayer", "replayer"), e);
+                // }
+                mexDao.setRequest(request);
+            }
+            
+            Exchange answer = answerResult.e;
+    
+            if (mexDao.getPattern().equals(MessageExchangePattern.REQUEST_RESPONSE.toString())) {
+                if (answer.isSetFault()) {
+                    MessageDAO response = mexDao.createMessage(new QName("replayer", "replayer"));
+                    try {
+                        assign(response, answer.getFault());
+                    } catch (Exception e) {
+                        throw new FaultException(new QName("replayer", "replayer"), e);
+                    }
+                    mexDao.setResponse(response);
+                    mexDao.setFault(answer.getFault().getType());
+                    mexDao.setFaultExplanation(answer.getFault().getExplanation());
+                    mexDao.setStatus(Status.FAULT.toString());
+    
+                } else if (answer.isSetOut()) {
+                    MessageDAO response = mexDao.createMessage(new QName("replayer", "replayer"));
+                    try {
+                        assign(response, answer.getOut());
+                    } catch (Exception e) {
+                        throw new FaultException(new QName("replayer", "replayer"), e);
+                    }
+                    mexDao.setResponse(response);
+                    mexDao.setStatus(Status.RESPONSE.toString());
+                } else if (answer.isSetFailure()) {
+                    mexDao.setFaultExplanation(answer.getFailure().getExplanation());
+                    mexDao.setStatus(Status.FAILURE.toString());
+                } else {
+                    // We don't have output for in-out operation - resulting with
+                    // replayer error to the top
+                    throw new IllegalStateException("I don't have response for invoke " + answer);
                 }
-                mexDao.setResponse(response);
-                mexDao.setStatus(Status.RESPONSE.toString());
-            } else if (answer.isSetFailure()) {
-                mexDao.setFaultExplanation(answer.getFailure().getExplanation());
-                mexDao.setStatus(Status.FAILURE.toString());
+                
+                final String channel2 = channel.export();
+                final String mexid = mexDao.getMessageExchangeId();
+                replayerContext.scheduler.scheduleReplayerJob(new Callable() {
+                    public Object call() throws Exception {
+                        __log.debug("executing invoke response " + channel2);
+                        invocationResponse(mexid, channel2);
+                        execute();
+                        return null;
+                    }
+                }, getCurrentEventDateTime(), this);
             } else {
-                // We don't have output for in-out operation - resulting with
-                // replayer error to the top
-                throw new IllegalStateException("I don't have response for invoke " + answer);
+                // in only - continuing
+                mexDao.setStatus(Status.COMPLETED_OK.toString());
             }
-            invocationResponse(mexDao.getMessageExchangeId(), channel.export());
-        } else {
-            // in only - continuing
-            mexDao.setStatus(Status.COMPLETED_OK.toString());
+    
+            return mexDao.getMessageExchangeId();
         }
-
-        return mexDao.getMessageExchangeId();
     }
 
     public static class TimerResume extends JacobRunnable {
@@ -178,6 +208,19 @@
         }
     }
 
+    // The two overrides below delegate to the superclass and then run the replayer context's rollbackOnFault check.
+    @Override
+    public void registerActivityForRecovery(ActivityRecoveryChannel channel, long activityId, String reason, Date dateTime, Element details, String[] actions, int retries) {
+        super.registerActivityForRecovery(channel, activityId, reason, dateTime, details, actions, retries);
+        replayerContext.checkRollbackOnFault();
+    }
+    
+    @Override
+    public void completedFault(FaultData faultData) {
+        super.completedFault(faultData);
+        replayerContext.checkRollbackOnFault();
+    }
+
     @Override
     public void reply(PartnerLinkInstance plinkInstnace, String opName, String mexId, Element msg, QName fault) throws FaultException {
         String mexRef = _imaManager.release(plinkInstnace, opName, mexId);
@@ -188,12 +231,18 @@
 
         MessageExchangeDAO mex = _dao.getConnection().getMessageExchange(mexRef);
 
-        MessageDAO message = mex.createMessage(plinkInstnace.partnerLink.getMyRoleOperation(opName).getOutput().getMessage().getQName());
-        buildOutgoingMessage(message, msg);
+        String pipedId = mex.getPipedMessageExchangeId();
+        if (pipedId != null) {
+            __log.debug("instance replied for live communication:" + mexRef + " " + DOMUtils.domToString(msg));
+            super.reply2(plinkInstnace, opName, mexId, msg, fault, false, mexRef);
+        } else {
+            MessageDAO message = mex.createMessage(plinkInstnace.partnerLink.getMyRoleOperation(opName).getOutput().getMessage().getQName());
+            buildOutgoingMessage(message, msg);
 
-        __log.debug("instance replied mexRef:" + mexRef + " " + DOMUtils.domToString(msg));
-        mex.setResponse(message);
-        mex.setStatus(Status.RESPONSE.toString());
+            __log.debug("instance replied mexRef:" + mexRef + " " + DOMUtils.domToString(msg));
+            mex.setResponse(message);
+            mex.setStatus(Status.RESPONSE.toString());
+        }
     }
 
     @Override
@@ -202,6 +251,7 @@
         __log.debug("select " + pickResponseChannel + " " + ObjectPrinter.toString(selectors, selectors));
     }
 
+
     public ProcessInstanceDAO getDAO() {
         return _dao;
     }
@@ -233,7 +283,7 @@
     }
 
     public void handleIncomingRequest(final MyRoleMessageExchangeImpl mex, final Date currentEventDateTime) {
-        __log.debug("handleIncomingRequest " + mex);
+        __log.debug("handleIncomingRequest for mock communication " + mex);
 
         setCurrentEventDateTime(currentEventDateTime);
 
@@ -241,7 +291,7 @@
             public boolean invoke(PartnerLinkMyRoleImpl target, RoutingInfo routing, boolean createInstance) {
                 if (routing.messageRoute == null && createInstance) {
                     // No route but we can create a new instance
-                    throw new IllegalStateException("Mex caused creation of new instance " + mex);
+                    throw new IllegalStateException("Mock type M mex caused creation of new instance " + mex);
                 } else if (routing.messageRoute != null) {
                     if (!routing.messageRoute.getTargetInstance().getInstanceId().equals(_dao.getInstanceId())) {
                         throw new IllegalStateException("Routed target instance is not equal to replayed instance");

Modified: ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/replayer/ReplayerContext.java
URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/replayer/ReplayerContext.java?rev=910573&r1=910572&r2=910573&view=diff
==============================================================================
--- ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/replayer/ReplayerContext.java (original)
+++ ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/replayer/ReplayerContext.java Tue Feb 16 16:08:06 2010
@@ -75,6 +75,7 @@
     public ReplayerBpelRuntimeContextImpl runtimeContext;
     
     public Map<QName, ServiceConfig> servicesConfig = new HashMap<QName, ServiceConfig>();
+    public CommunicationType replayerConfig;
 
     public final Date replayStartDate;
 
@@ -100,7 +101,7 @@
             }
         }
 
-        public Exchange fetchAnswer(QName service, String operation, Element outgoingMessage, Date currentEventDateTime) {
+        public AnswerResult fetchAnswer(QName service, String operation, Element outgoingMessage, Date currentEventDateTime) {
             __log.debug("fetching answer for " + service + " " + operation);
             
             ServiceConfig cfg = getServiceConfig(service);
@@ -114,9 +115,11 @@
                 }
                 v.answerPos++;
                 __log.debug("fetched " + e);
-                return e;
+                return new AnswerResult(false, e);
             } else if (cfg.getReplayType().isSetMockQuery()) {
-                return fetchMockQuery(service, operation, outgoingMessage, cfg);
+                return new AnswerResult(false, fetchMockQuery(service, operation, outgoingMessage, cfg));
+            } else if (cfg.getReplayType().isSetLive()) {
+                return new AnswerResult(true, null);
             } else assert(false);
             return null;
         }
@@ -220,6 +223,8 @@
     public void init(final CommunicationType r, ReplayerScheduler scheduler) throws Exception {
         this.scheduler = scheduler;
         
+        replayerConfig = r;
+        
         for (ServiceConfig s : r.getServiceConfigList()) {
             servicesConfig.put(s.getService(), s);
         }
@@ -285,10 +290,6 @@
 
     }
 
-    public void run() throws Exception {
-        scheduler.startReplaying();
-    }
-
     public ReplayerContext(Date replayStartDate) {
         super();
         this.replayStartDate = replayStartDate;
@@ -303,4 +304,24 @@
             return c;
         } else return c;
     }
+    
+    /** Throws a RuntimeException when rollbackOnFault applies to this context; otherwise does nothing. */
+    public void checkRollbackOnFault() {
+        // replayerConfig is only assigned by init(); without it, fall back to the schema default (true).
+        if (replayerConfig == null || replayerConfig.getRollbackOnFault()) {
+            RuntimeException e = new RuntimeException("Process instance run into fault.");
+            __log.debug("", e);
+            throw e;
+        }
+    }
+    
+    /** Result of fetchAnswer: the exchange to replay, or isLive=true (fetchAnswer then passes a null exchange). */
+    public static class AnswerResult {
+        public final boolean isLive;
+        public final Exchange e;
+        public AnswerResult(boolean isLive, Exchange e) {
+            this.isLive = isLive;
+            this.e = e;
+        }
+    }
 }

Modified: ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/replayer/ReplayerScheduler.java
URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/replayer/ReplayerScheduler.java?rev=910573&r1=910572&r2=910573&view=diff
==============================================================================
--- ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/replayer/ReplayerScheduler.java (original)
+++ ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/replayer/ReplayerScheduler.java Tue Feb 16 16:08:06 2010
@@ -26,6 +26,7 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.ode.bpel.dao.BpelDAOConnection;
 import org.apache.ode.bpel.iapi.ContextException;
 import org.apache.ode.bpel.iapi.Scheduler;
 
@@ -38,6 +39,10 @@
 public class ReplayerScheduler implements Scheduler {
     private static final Log __log = LogFactory.getLog(ReplayerScheduler.class);
 
+    public Replayer replayer;
+    
+    public static ThreadLocal<TaskElement> currentTaskElement = new ThreadLocal<TaskElement>();
+    
     private PriorityQueue<TaskElement> taskQueue = new PriorityQueue<TaskElement>();
 
     private static class TaskElement implements Comparable<TaskElement> {
@@ -92,14 +97,21 @@
     public void shutdown() {
     }
 
-    public void startReplaying() throws Exception {
+    public void startReplaying(Replayer replayer) throws Exception {
+        this.replayer = replayer;
         while (!taskQueue.isEmpty()) {
             TaskElement taskElement = taskQueue.remove();
-            __log.debug("executing action at time " + taskElement.when);
-            if (taskElement.runtimeContext != null) {
-                taskElement.runtimeContext.setCurrentEventDateTime(taskElement.when);
+            
+            try {
+                currentTaskElement.set(taskElement);
+                __log.debug("executing action at time " + taskElement.when);
+                if (taskElement.runtimeContext != null) {
+                    taskElement.runtimeContext.setCurrentEventDateTime(taskElement.when);
+                }
+                taskElement.action.call();
+            } finally {
+                currentTaskElement.set(null);
             }
-            taskElement.action.call();
         }
     }
 
@@ -117,18 +129,31 @@
         return null;
     }
 
-    public String schedulePersistedJob(Map<String, Object> jobDetail, Date when) throws ContextException {
+    public String schedulePersistedJob(final Map<String, Object> jobDetail, final Date when1) throws ContextException {
+        final Date when = when1 == null ? currentTaskElement.get().when : when1;
+        __log.debug("schedulePersistedJob " + jobDetail + " " + when, new Exception());
+        scheduleReplayerJob(new Callable<Void>() {
+            public Void call() throws Exception {
+                replayer.handleWorkEvent(jobDetail, when);
+                return null;
+            }
+        }, when, null);
+        
         return null;
     }
 
+    public void setPolledRunnableProcesser(JobProcessor polledRunnableProcessor) {
+    }
+
     public String scheduleVolatileJob(boolean transacted, Map<String, Object> jobDetail, Date when) throws ContextException {
+        // TODO Auto-generated method stub
+        __log.debug("scheduleVolatileJob");
         return null;
     }
 
     public String scheduleVolatileJob(boolean transacted, Map<String, Object> jobDetail) throws ContextException {
+        // TODO Auto-generated method stub
+        __log.debug("scheduleVolatileJob");
         return null;
     }
-
-    public void setPolledRunnableProcesser(JobProcessor polledRunnableProcessor) {
-    }
 }

Modified: ode/branches/APACHE_ODE_1.X/bpel-schemas/src/main/xsd/pmapi.xsd
URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/bpel-schemas/src/main/xsd/pmapi.xsd?rev=910573&r1=910572&r2=910573&view=diff
==============================================================================
--- ode/branches/APACHE_ODE_1.X/bpel-schemas/src/main/xsd/pmapi.xsd (original)
+++ ode/branches/APACHE_ODE_1.X/bpel-schemas/src/main/xsd/pmapi.xsd Tue Feb 16 16:08:06 2010
@@ -44,6 +44,7 @@
         <xs:choice>
             <xs:element name="mock" type="anySimpleType"></xs:element>
             <xs:element name="mockQuery" type="string"></xs:element>
+            <xs:element name="live" type="anySimpleType"></xs:element>
         </xs:choice>
     </xs:complexType>
 
@@ -107,6 +108,7 @@
     <complexType name="CommunicationType">
         <sequence>
             <element name="processType" type="QName" />
+            <element name="rollbackOnFault" type="boolean" minOccurs="0" default="true"/>
 
             <element name="serviceConfig" maxOccurs="unbounded">
                 <complexType>

Modified: ode/branches/APACHE_ODE_1.X/jbi/src/test/java/org/apache/ode/jbi/JbiTestBase.java
URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/jbi/src/test/java/org/apache/ode/jbi/JbiTestBase.java?rev=910573&r1=910572&r2=910573&view=diff
==============================================================================
--- ode/branches/APACHE_ODE_1.X/jbi/src/test/java/org/apache/ode/jbi/JbiTestBase.java (original)
+++ ode/branches/APACHE_ODE_1.X/jbi/src/test/java/org/apache/ode/jbi/JbiTestBase.java Tue Feb 16 16:08:06 2010
@@ -181,7 +181,7 @@
     	            String result = inputStreamToString(connection.getInputStream());
     	            
     	            log.debug(getTestName() + " have result: " + result);
-    	            matchResponse(expectedResponse, result);
+    	            matchResponse(expectedResponse, result, true);
     	        }
             }
             {
@@ -192,11 +192,14 @@
     	            io.setOperation(QName.valueOf(testProperties.getProperty(prefix + "nmr.operation")));
     	            io.getInMessage().setContent(new StreamSource(new ByteArrayInputStream(request.getBytes())));
     	            smxClient.sendSync(io,20000);
-    	            assertEquals(ExchangeStatus.ACTIVE,io.getStatus());
-    	            assertNotNull(io.getOutMessage());
-    	            String result = new SourceTransformer().contentToString(io.getOutMessage());
-    	            matchResponse(expectedResponse, result);
-    	            smxClient.done(io);
+                    if (io.getStatus() == ExchangeStatus.ACTIVE) {
+                        assertNotNull(io.getOutMessage());
+                        String result = new SourceTransformer().contentToString(io.getOutMessage());
+                        matchResponse(expectedResponse, result, true);
+                        smxClient.done(io);
+                    } else {
+                        matchResponse(expectedResponse, "", false);
+                    }
     	        }
             }
             
@@ -207,8 +210,12 @@
             enableProcess(getTestName(), false);
     }
     
-    protected void matchResponse(String expectedResponse, String result) {
-        assertTrue("Response doesn't match expected regex.\nExpected: " + expectedResponse + "\nReceived: " + result, Pattern.compile(expectedResponse, Pattern.DOTALL).matcher(result).matches());
+    /** Asserts that result matches the expected regex or, when the exchange did not succeed, that "FAULT" was expected. */
+    protected void matchResponse(String expectedResponse, String result, boolean succeeded) {
+        boolean ok = succeeded
+                ? Pattern.compile(expectedResponse, Pattern.DOTALL).matcher(result).matches()
+                : expectedResponse.equals("FAULT");
+        assertTrue(succeeded ? "Response doesn't match expected regex.\nExpected: " + expectedResponse + "\nReceived: " + result : "Expected success, but got fault", ok);
+    }
     
     private String inputStreamToString(InputStream is) throws IOException {

Modified: ode/branches/APACHE_ODE_1.X/jbi/src/test/resources/ReplayerJbiTest/OnEventCorrelation.bpel
URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/jbi/src/test/resources/ReplayerJbiTest/OnEventCorrelation.bpel?rev=910573&r1=910572&r2=910573&view=diff
==============================================================================
--- ode/branches/APACHE_ODE_1.X/jbi/src/test/resources/ReplayerJbiTest/OnEventCorrelation.bpel (original)
+++ ode/branches/APACHE_ODE_1.X/jbi/src/test/resources/ReplayerJbiTest/OnEventCorrelation.bpel Tue Feb 16 16:08:06 2010
@@ -22,10 +22,12 @@
   <bpws:import importType="http://schemas.xmlsoap.org/wsdl/" location="OnEventCorrelationArtifacts.wsdl" namespace="http://sample.bpel.org/bpel/sampleArtifacts"/>
   <bpws:partnerLinks>
     <bpws:partnerLink myRole="OnEventCorrelationProvider" partnerRole="OnEventCorrelationProvider" name="client" partnerLinkType="tns:OnEventCorrelation"/>
+    <bpws:partnerLink partnerRole="OnEventCorrelationProvider" name="client2" partnerLinkType="tns:OnEventCorrelation"/>
   </bpws:partnerLinks>
   <bpws:variables>
     <bpws:variable messageType="tns:OnEventCorrelationMessage" name="input"/>
     <bpws:variable messageType="tns:OnEventCorrelationMessage" name="output"/>
+    <bpws:variable messageType="tns:OnEventCorrelationMessage" name="output2"/>
     <bpws:variable type="xsd:int" name="i"/>
     <bpws:variable type="xsd:int" name="j"/>
   </bpws:variables>
@@ -41,6 +43,7 @@
           </bpws:correlations>
         </bpws:receive>
         <bpws:invoke partnerLink="client" operation="initiate" outputVariable="output" inputVariable="input"/>
+
         <bpws:assign>
           <bpws:copy>
             <bpws:from>concat('first-', $ode:currentEventDateTime, ';', $output.payload)</bpws:from>
@@ -51,7 +54,25 @@
             <bpws:to>$i</bpws:to>
           </bpws:copy>
         </bpws:assign>
+        
+        <bpws:if>
+            <bpws:condition>$input.payload2 = 'livetest'</bpws:condition>
+            <bpws:sequence>
+                <bpws:invoke partnerLink="client" operation="complete" outputVariable="output2" inputVariable="input"/>
+                <bpws:assign>
+                  <bpws:copy>
+                    <bpws:from>concat('second-', $ode:currentEventDateTime, ';', $output2.payload, ";", $output.payload)</bpws:from>
+                    <bpws:to>$output.payload</bpws:to>
+                  </bpws:copy>
+                </bpws:assign>
+            </bpws:sequence>
+        </bpws:if>
+            
         <bpws:reply operation="initiate" partnerLink="client" variable="output" messageExchange="mex1"/>
+        <bpws:if>
+          <bpws:condition>$input.payload2 = 'rollbackOnFault'</bpws:condition>
+          <bpws:throw faultName="abc"/>
+        </bpws:if>
       </bpws:sequence>
     </bpws:scope>
     <bpws:while>

Modified: ode/branches/APACHE_ODE_1.X/jbi/src/test/resources/ReplayerJbiTest/OnEventCorrelation2.bpel
URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/jbi/src/test/resources/ReplayerJbiTest/OnEventCorrelation2.bpel?rev=910573&r1=910572&r2=910573&view=diff
==============================================================================
--- ode/branches/APACHE_ODE_1.X/jbi/src/test/resources/ReplayerJbiTest/OnEventCorrelation2.bpel (original)
+++ ode/branches/APACHE_ODE_1.X/jbi/src/test/resources/ReplayerJbiTest/OnEventCorrelation2.bpel Tue Feb 16 16:08:06 2010
@@ -29,10 +29,16 @@
     <bpws:variable type="xsd:int" name="i"/>
     <bpws:variable type="xsd:int" name="j"/>
   </bpws:variables>
+  <bpws:correlationSets>
+    <bpws:correlationSet name="CorrelationSet" properties="ns:input ns:input2"/>
+  </bpws:correlationSets>
   <bpws:sequence name="main">
     <bpws:scope>
       <bpws:sequence>
         <bpws:receive createInstance="yes" operation="initiate" partnerLink="client" portType="tns:OnEventCorrelation" variable="input" messageExchange="mex1">
+                  <bpws:correlations>
+                    <bpws:correlation initiate="yes" set="CorrelationSet"/>
+                  </bpws:correlations>
         </bpws:receive>
         <bpws:assign>
           <bpws:copy>
@@ -41,7 +47,27 @@
           </bpws:copy>
         </bpws:assign>
         <bpws:reply operation="initiate" partnerLink="client" variable="output" messageExchange="mex1"/>
-      </bpws:sequence>
+
+        <bpws:if>
+            <bpws:condition>$input.payload2 = 'livetest'</bpws:condition>
+            <bpws:sequence>
+                <bpws:receive createInstance="no" operation="complete" partnerLink="client" portType="tns:OnEventCorrelation" variable="input" messageExchange="mex2">
+                  <bpws:correlations>
+                    <bpws:correlation initiate="no" set="CorrelationSet"/>
+                  </bpws:correlations>
+                </bpws:receive>
+                <bpws:assign>
+                  <bpws:copy>
+                    <bpws:from>'test9'</bpws:from>
+                    <bpws:to>$output.payload</bpws:to>
+                  </bpws:copy>
+                </bpws:assign>
+                <bpws:reply operation="complete" partnerLink="client" variable="output" messageExchange="mex2"/>
+            </bpws:sequence>
+        </bpws:if>
+
+    </bpws:sequence>
+
     </bpws:scope>
   </bpws:sequence>
 </bpws:process>

Modified: ode/branches/APACHE_ODE_1.X/jbi/src/test/resources/ReplayerJbiTest/deploy.xml
URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/jbi/src/test/resources/ReplayerJbiTest/deploy.xml?rev=910573&r1=910572&r2=910573&view=diff
==============================================================================
--- ode/branches/APACHE_ODE_1.X/jbi/src/test/resources/ReplayerJbiTest/deploy.xml (original)
+++ ode/branches/APACHE_ODE_1.X/jbi/src/test/resources/ReplayerJbiTest/deploy.xml Tue Feb 16 16:08:06 2010
@@ -29,7 +29,10 @@
             <dd:service name="bpel:OnEventCorrelationInit" port="OnEventCorrelation" />
         </dd:provide>
         <dd:invoke partnerLink="client">
-            <dd:service name="bpel:OnEventCorrelation2Fwd" port="OnEventCorrelation2" />
+            <dd:service name="bpel:OnEventCorrelation2" port="OnEventCorrelation2" />
+        </dd:invoke>
+        <dd:invoke partnerLink="client2">
+            <dd:service name="bpel:OnEventCorrelation2NonExistent" port="OnEventCorrelation2" />
         </dd:invoke>
     </dd:process>
     <dd:process

Added: ode/branches/APACHE_ODE_1.X/jbi/src/test/resources/ReplayerJbiTest/initiateLiveRequest.xml
URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/jbi/src/test/resources/ReplayerJbiTest/initiateLiveRequest.xml?rev=910573&view=auto
==============================================================================
--- ode/branches/APACHE_ODE_1.X/jbi/src/test/resources/ReplayerJbiTest/initiateLiveRequest.xml (added)
+++ ode/branches/APACHE_ODE_1.X/jbi/src/test/resources/ReplayerJbiTest/initiateLiveRequest.xml Tue Feb 16 16:08:06 2010
@@ -0,0 +1,24 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing,
+  ~ software distributed under the License is distributed on an
+  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  ~ KIND, either express or implied.  See the License for the
+  ~ specific language governing permissions and limitations
+  ~ under the License.
+  -->
+<initiate>
+   <payload>abc7</payload>
+   <payload2>livetest</payload2>
+</initiate>
+

Propchange: ode/branches/APACHE_ODE_1.X/jbi/src/test/resources/ReplayerJbiTest/initiateLiveRequest.xml
------------------------------------------------------------------------------
    svn:eol-style = native

Added: ode/branches/APACHE_ODE_1.X/jbi/src/test/resources/ReplayerJbiTest/replayLiveRequest.xml
URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/jbi/src/test/resources/ReplayerJbiTest/replayLiveRequest.xml?rev=910573&view=auto
==============================================================================
--- ode/branches/APACHE_ODE_1.X/jbi/src/test/resources/ReplayerJbiTest/replayLiveRequest.xml (added)
+++ ode/branches/APACHE_ODE_1.X/jbi/src/test/resources/ReplayerJbiTest/replayLiveRequest.xml Tue Feb 16 16:08:06 2010
@@ -0,0 +1,76 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing,
+  ~ software distributed under the License is distributed on an
+  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  ~ KIND, either express or implied.  See the License for the
+  ~ specific language governing permissions and limitations
+  ~ under the License.
+  -->
+<pmap:replay xmlns:ns="http://www.apache.org/ode/pmapi/types/2006/08/02/" xmlns:pmap="http://www.apache.org/ode/pmapi">
+    <replay>
+        <!--
+            <ns:replaceInstance>454277</ns:replaceInstance>
+            <ns:upgradeInstance>?</ns:upgradeInstance>
+-->
+        <ns:restoreInstance>
+            <ns:processType xmlns:p="http://sample.bpel.org/bpel/sample">p:OnEventCorrelation</ns:processType>
+            <ns:serviceConfig>
+                <ns:service xmlns:sam="http://sample.bpel.org/bpel/sample">sam:OnEventCorrelation</ns:service>
+                <ns:replayType>
+                    <ns:live/>
+                </ns:replayType>
+            </ns:serviceConfig>
+
+            <exchange xmlns="http://www.apache.org/ode/pmapi/types/2006/08/02/">
+                <type>M</type>
+                <createTime>2009-04-01T16:41:29.873+02:00</createTime>
+                <service xmlns:sam="http://sample.bpel.org/bpel/sample">sam:OnEventCorrelationInit</service>
+                <operation>initiate</operation>
+                <in>
+                    <initiate xmlns:sam="http://sample.bpel.org/bpel/sample" xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/" xmlns="">
+                        <payload>abc7</payload>
+                        <payload2>livetest</payload2>
+                    </initiate>
+                </in>
+                <out>
+                    <message xmlns="">
+                        <payload>test1</payload>
+                        <payload2 />
+                    </message>
+                </out>
+            </exchange>
+
+            <exchange xmlns="http://www.apache.org/ode/pmapi/types/2006/08/02/">
+                <type>M</type>
+                <createTime>2009-04-01T16:41:40.873+02:00</createTime>
+                <service xmlns:sam="http://sample.bpel.org/bpel/sample">sam:OnEventCorrelationInit</service>
+                <operation>initiate</operation>
+                <in>
+                    <initiate xmlns:sam="http://sample.bpel.org/bpel/sample" xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/" xmlns="">
+                        <payload>abc7</payload>
+                        <payload2>livetest</payload2>
+                    </initiate>
+                </in>
+                <out>
+                    <message xmlns="">
+                        <payload>test1</payload>
+                        <payload2 />
+                    </message>
+                </out>
+            </exchange>
+        </ns:restoreInstance>
+    </replay>
+    <balancerKey></balancerKey>
+</pmap:replay>
+

Propchange: ode/branches/APACHE_ODE_1.X/jbi/src/test/resources/ReplayerJbiTest/replayLiveRequest.xml
------------------------------------------------------------------------------
    svn:eol-style = native

Added: ode/branches/APACHE_ODE_1.X/jbi/src/test/resources/ReplayerJbiTest/replayRollbackOnFault.xml
URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/jbi/src/test/resources/ReplayerJbiTest/replayRollbackOnFault.xml?rev=910573&view=auto
==============================================================================
--- ode/branches/APACHE_ODE_1.X/jbi/src/test/resources/ReplayerJbiTest/replayRollbackOnFault.xml (added)
+++ ode/branches/APACHE_ODE_1.X/jbi/src/test/resources/ReplayerJbiTest/replayRollbackOnFault.xml Tue Feb 16 16:08:06 2010
@@ -0,0 +1,71 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing,
+  ~ software distributed under the License is distributed on an
+  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  ~ KIND, either express or implied.  See the License for the
+  ~ specific language governing permissions and limitations
+  ~ under the License.
+  -->
+<pmap:replay xmlns:ns="http://www.apache.org/ode/pmapi/types/2006/08/02/" xmlns:pmap="http://www.apache.org/ode/pmapi">
+	<replay>
+<!--
+            <ns:replaceInstance>454277</ns:replaceInstance>
+            <ns:upgradeInstance>?</ns:upgradeInstance>
+-->
+            <ns:restoreInstance>
+	    <ns:processType xmlns:p="http://sample.bpel.org/bpel/sample">p:OnEventCorrelation</ns:processType>
+        <rollbackOnFault xmlns="http://www.apache.org/ode/pmapi/types/2006/08/02/">true</rollbackOnFault>
+            <exchange xmlns="http://www.apache.org/ode/pmapi/types/2006/08/02/">
+               <type>M</type>
+               <createTime>2009-04-01T16:41:29.873+02:00</createTime>
+               <service xmlns:sam="http://sample.bpel.org/bpel/sample">sam:OnEventCorrelationInit</service>
+               <operation>initiate</operation>
+               <in>
+                  <initiate xmlns:sam="http://sample.bpel.org/bpel/sample" xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/" xmlns="">
+                     <payload>abc7</payload>
+                     <payload2>rollbackOnFault</payload2>
+                  </initiate>
+               </in>
+               <out>
+                  <message xmlns="">
+                     <payload>test1</payload>
+                     <payload2/>
+                  </message>
+               </out>
+            </exchange>
+
+            <exchange xmlns="http://www.apache.org/ode/pmapi/types/2006/08/02/">
+               <type>P</type>
+               <createTime>2009-04-01T16:41:32.998+02:00</createTime>
+               <service xmlns:sam="http://sample.bpel.org/bpel/sample">sam:OnEventCorrelation</service>
+               <operation>initiate</operation>
+               <in>
+                  <initiate xmlns:sam="http://sample.bpel.org/bpel/sample" xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/" xmlns="">
+                     <payload>abc7</payload>
+                     <payload2>abc8</payload2>
+                  </initiate>
+               </in>
+               <out>
+                  <message xmlns="">
+                     <payload>test5</payload>
+                     <payload2/>
+                  </message>
+               </out>
+            </exchange>
+
+            </ns:restoreInstance>
+	</replay>
+    <balancerKey></balancerKey>
+</pmap:replay>
+

Propchange: ode/branches/APACHE_ODE_1.X/jbi/src/test/resources/ReplayerJbiTest/replayRollbackOnFault.xml
------------------------------------------------------------------------------
    svn:eol-style = native

Added: ode/branches/APACHE_ODE_1.X/jbi/src/test/resources/ReplayerJbiTest/replayRollbackOnFault2.xml
URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/jbi/src/test/resources/ReplayerJbiTest/replayRollbackOnFault2.xml?rev=910573&view=auto
==============================================================================
--- ode/branches/APACHE_ODE_1.X/jbi/src/test/resources/ReplayerJbiTest/replayRollbackOnFault2.xml (added)
+++ ode/branches/APACHE_ODE_1.X/jbi/src/test/resources/ReplayerJbiTest/replayRollbackOnFault2.xml Tue Feb 16 16:08:06 2010
@@ -0,0 +1,71 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing,
+  ~ software distributed under the License is distributed on an
+  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  ~ KIND, either express or implied.  See the License for the
+  ~ specific language governing permissions and limitations
+  ~ under the License.
+  -->
+<pmap:replay xmlns:ns="http://www.apache.org/ode/pmapi/types/2006/08/02/" xmlns:pmap="http://www.apache.org/ode/pmapi">
+	<replay>
+<!--
+            <ns:replaceInstance>454277</ns:replaceInstance>
+            <ns:upgradeInstance>?</ns:upgradeInstance>
+-->
+            <ns:restoreInstance>
+	    <ns:processType xmlns:p="http://sample.bpel.org/bpel/sample">p:OnEventCorrelation</ns:processType>
+        <rollbackOnFault xmlns="http://www.apache.org/ode/pmapi/types/2006/08/02/">false</rollbackOnFault>
+            <exchange xmlns="http://www.apache.org/ode/pmapi/types/2006/08/02/">
+               <type>M</type>
+               <createTime>2009-04-01T16:41:29.873+02:00</createTime>
+               <service xmlns:sam="http://sample.bpel.org/bpel/sample">sam:OnEventCorrelationInit</service>
+               <operation>initiate</operation>
+               <in>
+                  <initiate xmlns:sam="http://sample.bpel.org/bpel/sample" xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/" xmlns="">
+                     <payload>abc7</payload>
+                     <payload2>rollbackOnFault</payload2>
+                  </initiate>
+               </in>
+               <out>
+                  <message xmlns="">
+                     <payload>test1</payload>
+                     <payload2/>
+                  </message>
+               </out>
+            </exchange>
+
+            <exchange xmlns="http://www.apache.org/ode/pmapi/types/2006/08/02/">
+               <type>P</type>
+               <createTime>2009-04-01T16:41:32.998+02:00</createTime>
+               <service xmlns:sam="http://sample.bpel.org/bpel/sample">sam:OnEventCorrelation</service>
+               <operation>initiate</operation>
+               <in>
+                  <initiate xmlns:sam="http://sample.bpel.org/bpel/sample" xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/" xmlns="">
+                     <payload>abc7</payload>
+                     <payload2>abc8</payload2>
+                  </initiate>
+               </in>
+               <out>
+                  <message xmlns="">
+                     <payload>test5</payload>
+                     <payload2/>
+                  </message>
+               </out>
+            </exchange>
+
+            </ns:restoreInstance>
+	</replay>
+    <balancerKey></balancerKey>
+</pmap:replay>
+

Propchange: ode/branches/APACHE_ODE_1.X/jbi/src/test/resources/ReplayerJbiTest/replayRollbackOnFault2.xml
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: ode/branches/APACHE_ODE_1.X/jbi/src/test/resources/ReplayerJbiTest/test.properties
URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/jbi/src/test/resources/ReplayerJbiTest/test.properties?rev=910573&r1=910572&r2=910573&view=diff
==============================================================================
--- ode/branches/APACHE_ODE_1.X/jbi/src/test/resources/ReplayerJbiTest/test.properties (original)
+++ ode/branches/APACHE_ODE_1.X/jbi/src/test/resources/ReplayerJbiTest/test.properties Tue Feb 16 16:08:06 2010
@@ -33,3 +33,22 @@
 4request=@initiateRequest.xml
 4response=.*>timer-2009-04-01T14:42:00.873Z;timer-2009-04-01T14:41:50.873Z;request-2009-04-01T14:41:40.873Z;timer-2009-04-01T14:41:39.873Z;first-2009-04-01T14:41:29.873Z;test6<.*
 
+5nmr.service={http://www.apache.org/ode/pmapi}InstanceManagementService
+5nmr.operation=replay
+5request=@replayRollbackOnFault.xml
+5response=FAULT
+
+6nmr.service={http://www.apache.org/ode/pmapi}InstanceManagementService
+6nmr.operation=replay
+6request=@replayRollbackOnFault2.xml
+6response=.*
+
+7nmr.service={http://www.apache.org/ode/pmapi}InstanceManagementService
+7nmr.operation=replay
+7request=@replayLiveRequest.xml
+7response=.*
+
+8nmr.service={http://sample.bpel.org/bpel/sample}OnEventCorrelationInit
+8nmr.operation=initiate
+8request=@initiateLiveRequest.xml
+8response=.*>timer-2009-04-01T14:42:00.873Z;timer-2009-04-01T14:41:50.873Z;request-2009-04-01T14:41:40.873Z;timer-2009-04-01T14:41:39.873Z;second-2009-04-01T14:41:29.873Z;test9;first-2009-04-01T14:41:29.873Z;test2<.*

Modified: ode/branches/APACHE_ODE_1.X/jbi/src/test/resources/log4j.properties
URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/jbi/src/test/resources/log4j.properties?rev=910573&r1=910572&r2=910573&view=diff
==============================================================================
--- ode/branches/APACHE_ODE_1.X/jbi/src/test/resources/log4j.properties (original)
+++ ode/branches/APACHE_ODE_1.X/jbi/src/test/resources/log4j.properties Tue Feb 16 16:08:06 2010
@@ -41,5 +41,12 @@
 #log4j.category.org.apache.ode.dao.jpa.MessageDAOImpl=TRACE
 log4j.category.httpclient=DEBUG
 log4j.category.httpclient.Wire=DEBUG
-#log4j.category.org.apache.ode.bpel.iapi.BpelEventListener=DEBUG
-
+log4j.category.org.apache.ode.jbi.JbiTestBase=DEBUG
+#log4j.category.org.apache.ode.bpel.runtime.SCOPE=DEBUG
+#log4j.category.org.apache.ode.bpel.runtime.EH_EVENT=DEBUG
+#log4j.category.org.apache.ode.bpel.runtime.EH_ALARM=DEBUG
+#log4j.category.org.apache.ode.daohib.bpel.CorrelatorDaoImpl=DEBUG
+#log4j.category.org.apache.ode.bpel.elang=DEBUG
+#log4j.category.org.apache.ode.bpel.compiler=DEBUG
+log4j.category.org.apache.ode.bpel.iapi.BpelEventListener=DEBUG
+log4j.category.org.apache.ode.bpel.engine=DEBUG