You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ode.apache.org by ms...@apache.org on 2007/08/08 19:52:39 UTC

svn commit: r563962 - in /ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine: BpelProcess.java BpelRuntimeContextImpl.java MyRoleMessageExchangeImpl.java UnreliableMyRoleMessageExchangeImpl.java WorkEvent.java

Author: mszefler
Date: Wed Aug  8 10:52:37 2007
New Revision: 563962

URL: http://svn.apache.org/viewvc?view=rev&rev=563962
Log:
BART: tweak myrole invokes to do routing at the time of invokeXXX call.
BART: rename some methods in BRC for consistency/clarity.


Modified:
    ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
    ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java
    ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java
    ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/UnreliableMyRoleMessageExchangeImpl.java
    ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/WorkEvent.java

Modified: ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
URL: http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java?view=diff&rev=563962&r1=563961&r2=563962
==============================================================================
--- ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java (original)
+++ ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java Wed Aug  8 10:52:37 2007
@@ -38,6 +38,7 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.ode.bpel.common.CorrelationKey;
 import org.apache.ode.bpel.common.FaultException;
 import org.apache.ode.bpel.dao.BpelDAOConnection;
 import org.apache.ode.bpel.dao.MessageExchangeDAO;
@@ -188,8 +189,8 @@
 
         _hydrationLatch.latch(1);
         try {
-            // The following check is mostly for sanity purposes. MexImpls should prevent this from 
-            // happening. 
+            // The following check is mostly for sanity purposes. MexImpls should prevent this from
+            // happening.
             PartnerLinkMyRoleImpl target = getMyRoleForService(mexdao.getCallee());
             Status oldstatus = Status.valueOf(mexdao.getStatus());
             if (target == null) {
@@ -215,13 +216,13 @@
                 return;
             }
 
-            // "Acknowledge" any one-way invokes 
+            // "Acknowledge" any one-way invokes
             if (op.getOutput() == null) {
                 mexdao.setStatus(Status.ACK.toString());
                 mexdao.setAckType(AckType.ONEWAY);
                 fireMexStateEvent(mexdao, oldstatus, Status.ACK);
             }
-            
+
             mexdao.setProcess(getProcessDAO());
 
             // TODO: fix this
@@ -243,13 +244,32 @@
                 });
 
             } else if (cstatus == CorrelationStatus.MATCHED) {
-                doInstanceWork(mexdao.getInstance().getInstanceId(), new Callable<Void>() {
-                    public Void call() {
-                        executeContinueInstanceMyRoleRequestReceived(mexdao);
-                        return null;
-                    }
-                });
-
+                // This should not occur for in-memory processes, since they are technically not allowed to
+                // have any <receive>/<pick> elements that are not start activities. 
+                if (isInMemory())
+                    __log.warn("In-memory process " + _pid + " is participating in a non-createinstance exchange!");
+                
+                
+                // We don't like to do the work in the same TX that did the matching, since this creates fertile 
+                // conditions for deadlock in the correlation tables. However if invocation style is transacted, 
+                // we need to do the work right then and there.
+                
+                if (istyle == InvocationStyle.TRANSACTED)
+                    doInstanceWork(mexdao.getInstance().getInstanceId(), new Callable<Void>() {
+                        public Void call() {
+                            executeContinueInstanceMyRoleRequestReceived(mexdao);
+                            return null;
+                        }
+                    });
+                else /* non-transacted style */ {
+                    WorkEvent we = new WorkEvent();
+                    we.setType(WorkEvent.Type.MYROLE_INVOKE);
+                    we.setIID(mexdao.getInstance().getInstanceId());
+                    we.setMexId(mexdao.getMessageExchangeId());
+                    we.setProcessId(_pid);
+    
+                    scheduleWorkEvent(we, null);
+                }
             } else if (cstatus == CorrelationStatus.QUEUED) {
                 ; // do nothing
             }
@@ -280,7 +300,7 @@
         int amp = mexdao.getChannel().indexOf('&');
         String groupId = mexdao.getChannel().substring(0, amp);
         int idx = Integer.valueOf(mexdao.getChannel().substring(amp + 1));
-        instance.inputMsgMatch(groupId, idx, mexdao);
+        instance.injectMyRoleMessageExchange(groupId, idx, mexdao);
         instance.execute();
     }
 
@@ -295,6 +315,46 @@
         brc.execute();
     }
 
+
+    void executeContinueInstanceResume(ProcessInstanceDAO instanceDao) {
+        BpelInstanceWorker worker = _instanceWorkerCache.get(instanceDao.getInstanceId());
+        assert worker.isWorkerThread();
+        
+        BpelRuntimeContextImpl brc = new BpelRuntimeContextImpl(worker, instanceDao);
+        brc.execute();
+        
+    }
+
+    void executeContinueInstanceTimerReceived(ProcessInstanceDAO instanceDao, String timerChannel) {
+        BpelInstanceWorker worker = _instanceWorkerCache.get(instanceDao.getInstanceId());
+        assert worker.isWorkerThread();
+        
+        BpelRuntimeContextImpl brc = new BpelRuntimeContextImpl(worker, instanceDao);
+        if (brc.injectTimerEvent(timerChannel))
+            brc.execute();
+        
+    }
+
+    private void executeContinueInstanceMatcherEvent(ProcessInstanceDAO instanceDao, String correlatorId, CorrelationKey correlationKey) {
+        BpelInstanceWorker worker = _instanceWorkerCache.get(instanceDao.getInstanceId());
+        assert worker.isWorkerThread();
+        
+        BpelRuntimeContextImpl brc = new BpelRuntimeContextImpl(worker, instanceDao);       
+        if(brc.matcherEvent(correlatorId, correlationKey))
+            brc.execute();
+        
+    }
+
+    private void executeContinueInstancePartnerResponse(MessageExchangeDAO mexDao, String channel) {
+        BpelInstanceWorker worker = _instanceWorkerCache.get(mexDao.getInstance().getInstanceId());
+        assert worker.isWorkerThread();
+        
+        BpelRuntimeContextImpl brc = new BpelRuntimeContextImpl(worker, mexDao.getInstance());      
+        brc.injectPartnerResponse(mexDao.getMessageExchangeId(), channel);
+        brc.execute();
+        
+    }
+
     private void enqueueInstanceWork(Long instanceId, Runnable runnable) {
         BpelInstanceWorker iworker = _instanceWorkerCache.get(instanceId);
         iworker.enqueue(runnable);
@@ -428,31 +488,13 @@
             __log.debug(ObjectPrinter.stringifyMethodEnter("handleWorkEvent", new Object[] { "jobInfo", jobInfo }));
         }
 
-        // Process-level events
-        if (we.getType().equals(WorkEvent.Type.MYROLE_INVOKE)) {
-            // second stage of my-role invoke for BLOCKING/ASYNC/RELIABLE invocation style.
-            if (__log.isDebugEnabled()) {
-                __log.debug("InvokeInternal event for mexid " + we.getMexId());
+        enqueueInstanceTransaction(we.getIID(), new Runnable() {
+            public void run() {
+                _contexts.scheduler.jobCompleted(jobInfo.jobName);
+                execInstanceEvent(we);
             }
 
-            enqueueTransaction(new Callable<Void>() {
-                public Void call() {
-                    _contexts.scheduler.jobCompleted(jobInfo.jobName);
-                    MessageExchangeDAO mexdao = loadMexDao(we.getMexId());
-                    invokeProcess(mexdao);
-                    return null;
-                }
-            });
-
-        } else /* instance-level events */{
-            enqueueInstanceTransaction(we.getIID(), new Runnable() {
-                public void run() {
-                    _contexts.scheduler.jobCompleted(jobInfo.jobName);
-                    execInstanceEvent(we);
-                }
-
-            });
-        }
+        });
 
     }
 
@@ -472,6 +514,8 @@
         assert worker.isWorkerThread();
 
         ProcessInstanceDAO instanceDAO = getProcessDAO().getInstance(we.getIID());
+        MessageExchangeDAO mexDao = we.getMexId() == null ? null : loadMexDao(we.getMexId());
+        
         if (instanceDAO == null) {
             if (__log.isDebugEnabled()) {
                 __log.debug("handleWorkEvent: no ProcessInstance found with iid " + we.getIID() + "; ignoring.");
@@ -479,32 +523,26 @@
             return;
         }
 
-        BpelRuntimeContextImpl brc = new BpelRuntimeContextImpl(worker, instanceDAO);
+        if (__log.isDebugEnabled()) {
+            __log.debug("handleWorkEvent: " + we.getType() + " event for process instance " + we.getIID());
+        }        
+
         switch (we.getType()) {
+        case MYROLE_INVOKE:
+            executeContinueInstanceMyRoleRequestReceived(mexDao);
+            break;
         case TIMER:
-            if (__log.isDebugEnabled()) {
-                __log.debug("handleWorkEvent: TimerWork event for process instance " + brc);
-            }
-            brc.timerEvent(we.getChannel());
+            executeContinueInstanceTimerReceived(instanceDAO, we.getChannel());
             break;
         case RESUME:
-            if (__log.isDebugEnabled()) {
-                __log.debug("handleWorkEvent: ResumeWork event for iid " + we.getIID());
-            }
-            brc.execute();
+            executeContinueInstanceResume(instanceDAO);
             break;
         case PARTNER_RESPONSE:
-            if (__log.isDebugEnabled()) {
-                __log.debug("InvokeResponse event for iid " + we.getIID());
-            }
-            brc.injectPartnerResponse(we.getMexId(), we.getChannel());
-            brc.execute();
+            executeContinueInstancePartnerResponse(mexDao, we.getChannel());
             break;
         case MATCHER:
-            if (__log.isDebugEnabled()) {
-                __log.debug("Matcher event for iid " + we.getIID());
-            }
-            brc.matcherEvent(we.getCorrelatorId(), we.getCorrelationKey());
+            executeContinueInstanceMatcherEvent(instanceDAO, we.getCorrelatorId(), we.getCorrelationKey());
+            break;
         }
     }
 
@@ -1139,9 +1177,9 @@
     }
 
     public void scheduleWorkEvent(WorkEvent we, Date timeToFire) {
-//        if (isInMemory())
-//            throw new InvalidProcessException("In-mem process execution resulted in event scheduling.");
-        
+        // if (isInMemory())
+        // throw new InvalidProcessException("In-mem process execution resulted in event scheduling.");
+
         _contexts.scheduler.schedulePersistedJob(we.getDetail(), timeToFire);
     }
 

Modified: ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java
URL: http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java?view=diff&rev=563962&r1=563961&r2=563962
==============================================================================
--- ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java (original)
+++ ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java Wed Aug  8 10:52:37 2007
@@ -354,7 +354,7 @@
             for (int i = 0; i < correlators.size(); ++i) {
                 CorrelatorDAO ci = correlators.get(i);
                 if (ci.equals(_dao.getInstantiatingCorrelator())) {
-                    inputMsgMatch(pickResponseChannelStr, i, _instantiatingMessageExchange);
+                    injectMyRoleMessageExchange(pickResponseChannelStr, i, _instantiatingMessageExchange);
                     if (BpelProcess.__log.isDebugEnabled()) {
                         BpelProcess.__log.debug("SELECT: " + pickResponseChannel + ": FOUND match for NEW instance mexRef="
                                 + _instantiatingMessageExchange);
@@ -924,7 +924,7 @@
         _instanceWorker.setCachedState(newcount, _soup);
     }
 
-    void inputMsgMatch(final String responsechannel, final int idx, MessageExchangeDAO mexdao) {
+    void injectMyRoleMessageExchange(final String responsechannel, final int idx, MessageExchangeDAO mexdao) {
         // if we have a message match, this instance should be marked
         // active if it isn't already
         if (_dao.getState() == ProcessState.STATE_READY) {
@@ -954,7 +954,7 @@
         });
     }
 
-    void timerEvent(final String timerResponseChannel) {
+    boolean injectTimerEvent(final String timerResponseChannel) {
         // In case this is a pick event, we remove routes,
         // and cancel the outstanding requests.
         _dao.getProcess().removeRoutes(timerResponseChannel, _dao);
@@ -962,7 +962,7 @@
 
         // Ignore timer events after the process is finished.
         if (ProcessState.isFinished(_dao.getState())) {
-            return;
+            return false;
         }
 
         _vpu.inject(new JacobRunnable() {
@@ -973,7 +973,8 @@
                 responseChannel.onTimeout();
             }
         });
-        execute();
+        
+        return true;
     }
 
     public void cancel(final TimerResponseChannel timerResponseChannel) {
@@ -1308,7 +1309,7 @@
      * Attempt to match message exchanges on a correlator.
      * 
      */
-    void matcherEvent(String correlatorId, CorrelationKey ckey) {
+    boolean matcherEvent(String correlatorId, CorrelationKey ckey) {
         if (BpelProcess.__log.isDebugEnabled()) {
             __log.debug("MatcherEvent handling: correlatorId=" + correlatorId + ", ckey=" + ckey);
         }
@@ -1320,7 +1321,7 @@
         if (mroute == null) {
             // Ok, this means that a message arrived before we did, so nothing to do.
             __log.debug("MatcherEvent handling: nothing to do, route no longer in DB");
-            return;
+            return false;
         }
 
         // Now see if there is a message that matches this selector.
@@ -1336,12 +1337,14 @@
                 BpelProcess.__log.debug("SELECT: " + mroute.getGroupId() + ": matched to MESSAGE " + mexdao + " on CKEY " + ckey);
             }
 
-            inputMsgMatch(mroute.getGroupId(), mroute.getIndex(), mexdao);
-            execute();
+            injectMyRoleMessageExchange(mroute.getGroupId(), mroute.getIndex(), mexdao);
+            return true;
         } else {
             __log.debug("MatcherEvent handling: nothing to do, no matching message in DB");
 
         }
+        
+        return false;
     }
 
     private void scheduleReliableResponse(MessageExchangeDAO messageExchange) {

Modified: ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java
URL: http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java?view=diff&rev=563962&r1=563961&r2=563962
==============================================================================
--- ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java (original)
+++ ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java Wed Aug  8 10:52:37 2007
@@ -1,5 +1,6 @@
 package org.apache.ode.bpel.engine;
 
+import java.util.Date;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeoutException;
 
@@ -126,14 +127,7 @@
         we.setProcessId(_process.getPID());
         we.setMexId(_mexId);
 
-        // Schedule a timeout
-        final WorkEvent we1 = new WorkEvent();
-        we1.setType(WorkEvent.Type.MYROLE_INVOKE_TIMEOUT);
-        we1.setProcessId(_process.getPID());
-        we1.setMexId(_mexId);
-
         _contexts.scheduler.schedulePersistedJob(we.getDetail(), null);
-        _contexts.scheduler.schedulePersistedJob(we1.getDetail(), null);
 
     }
 

Modified: ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/UnreliableMyRoleMessageExchangeImpl.java
URL: http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/UnreliableMyRoleMessageExchangeImpl.java?view=diff&rev=563962&r1=563961&r2=563962
==============================================================================
--- ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/UnreliableMyRoleMessageExchangeImpl.java (original)
+++ ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/UnreliableMyRoleMessageExchangeImpl.java Wed Aug  8 10:52:37 2007
@@ -56,10 +56,7 @@
                 request();
                 MessageExchangeDAO dao = _process.createMessageExchange(getMessageExchangeId(), MessageExchangeDAO.DIR_PARTNER_INVOKES_MYROLE);
                 save(dao);
-                if (_process.isInMemory()) 
-                    _process.invokeProcess(dao);
-                else
-                    scheduleInvoke();
+                _process.invokeProcess(dao);
                 return null;
             }
             

Modified: ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/WorkEvent.java
URL: http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/WorkEvent.java?view=diff&rev=563962&r1=563961&r2=563962
==============================================================================
--- ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/WorkEvent.java (original)
+++ ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/WorkEvent.java Wed Aug  8 10:52:37 2007
@@ -59,6 +59,10 @@
         return _jobDetail;
     }
 
+    public String toString() {
+        return "WorkEvent" + _jobDetail;
+    }
+    
     public enum Type {
         TIMER, 
         
@@ -72,8 +76,7 @@
         /** Invoke a "my role" operation (i.e. implemented by the process). */
         MYROLE_INVOKE, 
         
-        /** Timer event for "my role" invocations that are taking too long. */
-        MYROLE_INVOKE_TIMEOUT, MYROLE_INVOKE_ASYNC_RESPONSE
+        MYROLE_INVOKE_ASYNC_RESPONSE
     }
 
     public String getChannel() {