You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ode.apache.org by ms...@apache.org on 2007/08/09 23:40:09 UTC

svn commit: r564394 - in /ode/branches/bart: bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ dao-jpa/src/main/java/org/apache/ode/dao/jpa/

Author: mszefler
Date: Thu Aug  9 14:40:09 2007
New Revision: 564394

URL: http://svn.apache.org/viewvc?view=rev&rev=564394
Log:
Fix bug in jpa where routes were not getting deleted. 
Improve performance of matcher events by not loading BRC until it is determined that the event is valid.

Modified:
    ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelInstanceWorker.java
    ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
    ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java
    ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkMyRoleImpl.java
    ode/branches/bart/dao-jpa/src/main/java/org/apache/ode/dao/jpa/CorrelatorDAOImpl.java
    ode/branches/bart/dao-jpa/src/main/java/org/apache/ode/dao/jpa/ProcessDAOImpl.java

Modified: ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelInstanceWorker.java
URL: http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelInstanceWorker.java?view=diff&rev=564394&r1=564393&r2=564394
==============================================================================
--- ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelInstanceWorker.java (original)
+++ ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelInstanceWorker.java Thu Aug  9 14:40:09 2007
@@ -52,7 +52,7 @@
      * @param runnable
      */
     synchronized void enqueue(Runnable runnable) {
-        __log.debug("Enqueue work for instance IID " + _iid + ": " + runnable);
+        __log.debug("enqueue: for instance " +  _process.getPID() + "#" + _iid + ": " + runnable);
         _todoQueue.add(runnable);
         // We mayh need to reschedule this thread if we've dropped out of the end of the run() method.
         if (!_running) {
@@ -77,12 +77,10 @@
      *             forwarded from {@link Callable#call()}
      */
     <T> T execInCurrentThread(Callable<T> callable) throws Exception {
-        // If we are in the current thread we can just keep working. allows re-entrant calls.
-        
+
         if (isWorkerThread())
-            return callable.call();
+            throw new BpelEngineException("InternalError: Attempt to reenter instance worker " + toString());
         
-        __log.debug("Importing thread " + Thread.currentThread() + " for IID " + _iid);
         final Semaphore ready = new Semaphore(0);
         final Semaphore finished = new Semaphore(0);
         enqueue(new Runnable() {
@@ -97,7 +95,6 @@
             }
         });
         
-        __log.debug("Blocking main worker thread for IID " + _iid);
         try {
             ready.acquire();
         } catch (InterruptedException ex) {
@@ -105,14 +102,13 @@
             throw new BpelEngineException("Thread interrupted.", ex);
         }
 
-        __log.debug("Executing worker for IID " + _iid + " in imported thread " + Thread.currentThread());
+
         _activeInstance.set(_iid);
         try {
             return doInstanceWork(callable);
         } catch (Exception ex) {
             throw ex;
         } finally {
-            __log.debug("Releasing worker thread for IID " + _iid + " imported thread " + Thread.currentThread());
             finished.release();
             _activeInstance.set(null);
         }
@@ -125,7 +121,7 @@
      * Implementation of the {@link Runnable} interface.
      */
     public void run() {
-        __log.debug("Running worker thread " + Thread.currentThread() + " for instance IID " + _iid);
+        __log.debug("Starting worker thread " + Thread.currentThread() + " for instance IID " + instanceId());
         _activeInstance.set(_iid);
         _workerThread = Thread.currentThread();
         try {
@@ -142,7 +138,6 @@
                     }
 
                     next = _todoQueue.remove(0);
-                    __log.debug("Worker thread "  + Thread.currentThread() + " for instance IID " + _iid + " found work: " + next);
                 }
 
                 try {
@@ -176,19 +171,19 @@
      * @throws Exception
      */
     private <T> T doInstanceWork(Callable<T> work) throws Exception {
-        __log.debug("Doing work for instance " + _iid +" in thread " + Thread.currentThread());
+        __log.debug("Doing work for instance " + instanceId() +" in thread " + Thread.currentThread());
         try {
             return work.call();
         } catch (Exception ex) {
-            __log.error("Work for instance " + _iid + " in thread "  + Thread.currentThread() + " resulted in an exception." ,ex);
+            __log.error("Work for instance " + instanceId() + " in thread "  + Thread.currentThread() + " resulted in an exception." ,ex);
             throw ex;
         } finally {
-            __log.debug("Finished work for instance " + _iid + " in thread " + Thread.currentThread());
+            __log.debug("Finished work for instance " + instanceId() + " in thread " + Thread.currentThread());
         }
     }
 
     public String toString() {
-        return "{BpelInstanceWorker: PID=" + _process.getPID() + " IID=" + _iid + " workerThread="+ _workerThread + "}";
+        return "{BpelInstanceWorker for " + instanceId() + "}";
     }
 
     boolean isWorkerThread() {
@@ -215,5 +210,9 @@
             this.state = state;
         }
     }
-
+    
+    private String instanceId() {
+        return _process.getPID() + "#" + _iid;
+    }
+    
 }

Modified: ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
URL: http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java?view=diff&rev=564394&r1=564393&r2=564394
==============================================================================
--- ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java (original)
+++ ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java Thu Aug  9 14:40:09 2007
@@ -41,8 +41,10 @@
 import org.apache.ode.bpel.common.CorrelationKey;
 import org.apache.ode.bpel.common.FaultException;
 import org.apache.ode.bpel.dao.BpelDAOConnection;
+import org.apache.ode.bpel.dao.CorrelatorDAO;
 import org.apache.ode.bpel.dao.MessageDAO;
 import org.apache.ode.bpel.dao.MessageExchangeDAO;
+import org.apache.ode.bpel.dao.MessageRouteDAO;
 import org.apache.ode.bpel.dao.ProcessDAO;
 import org.apache.ode.bpel.dao.ProcessInstanceDAO;
 import org.apache.ode.bpel.evt.ProcessInstanceEvent;
@@ -278,7 +280,7 @@
             // If we did not get an ACK during this method, then mark this MEX as needing an ASYNC wake-up
             if (mexdao.getStatus() != Status.ACK)
                 mexdao.setStatus(Status.ASYNC);
-            
+
             assert mexdao.getStatus() == Status.ACK || mexdao.getStatus() == Status.ASYNC;
         }
 
@@ -329,12 +331,45 @@
 
     private void executeContinueInstanceMatcherEvent(ProcessInstanceDAO instanceDao, String correlatorId,
             CorrelationKey correlationKey) {
-        BpelInstanceWorker worker = _instanceWorkerCache.get(instanceDao.getInstanceId());
-        assert worker.isWorkerThread();
 
-        BpelRuntimeContextImpl brc = new BpelRuntimeContextImpl(worker, instanceDao);
-        if (brc.matcherEvent(correlatorId, correlationKey))
+        if (__log.isDebugEnabled()) {
+            __log.debug("MatcherEvent handling: correlatorId=" + correlatorId + ", ckey=" + correlationKey);
+        }
+
+        CorrelatorDAO correlator = instanceDao.getProcess().getCorrelator(correlatorId);
+
+        // Find the route first, this is a SELECT FOR UPDATE on the "selector" row,
+        // So we want to acquire the lock before we do anthing else.
+        MessageRouteDAO mroute = correlator.findRoute(correlationKey);
+        if (mroute == null) {
+            // Ok, this means that a message arrived before we did, so nothing to do.
+            __log.debug("MatcherEvent handling: nothing to do, route no longer in DB");
+            return;
+        }
+
+        // Now see if there is a message that matches this selector.
+        MessageExchangeDAO mexdao = correlator.dequeueMessage(correlationKey);
+        if (mexdao != null) {
+            __log.debug("MatcherEvent handling: found matching message in DB (i.e. message arrived before <receive>)");
+
+            // We have a match, so we can get rid of the routing entries.
+            correlator.removeRoutes(mroute.getGroupId(), instanceDao);
+
+            // Found message matching one of our selectors.
+            if (__log.isDebugEnabled()) {
+                __log.debug("SELECT: " + mroute.getGroupId() + ": matched to MESSAGE " + mexdao + " on CKEY " + correlationKey);
+            }
+
+            BpelInstanceWorker worker = _instanceWorkerCache.get(instanceDao.getInstanceId());
+            assert worker.isWorkerThread();
+
+            BpelRuntimeContextImpl brc = new BpelRuntimeContextImpl(worker, instanceDao);
+            brc.injectMyRoleMessageExchange(mroute.getGroupId(), mroute.getIndex(), mexdao);
             brc.execute();
+        } else {
+            __log.debug("MatcherEvent handling: nothing to do, no matching message in DB");
+
+        }
 
     }
 
@@ -997,7 +1032,7 @@
     }
 
     void onMyRoleMexAck(MessageExchangeDAO mexdao, Status old) {
-        
+
         if (mexdao.getPipedMessageExchangeId() != null) /* p2p */{
 
             BpelProcess caller = _server.getBpelProcess(mexdao.getPipedPID());
@@ -1032,7 +1067,7 @@
 
             if (old == Status.ASYNC)
                 caller.p2pWakeup(pmex);
-            
+
         } else /* not p2p */{
             // TODO: force a myrole mex to be created if it is not in cache.
             for (WeakReference<MyRoleMessageExchangeImpl> wr : _mexStateListeners) {
@@ -1216,7 +1251,7 @@
         } finally {
             if (mexdao.getStatus() != Status.ACK)
                 mexdao.setStatus(Status.ASYNC);
-            
+
         }
 
         assert mexdao.getStatus() == Status.ACK || mexdao.getStatus() == Status.ASYNC;
@@ -1250,14 +1285,12 @@
         myRoleMex.setTimeout(partnerRoleMex.getTimeout());
         myRoleMex.setRequest(partnerRoleMex.getRequest());
         myRoleMex.setInvocationStyle(partnerRoleMex.getInvocationStyle());
-        
 
-        // Piped cross-references. 
+        // Piped cross-references.
         myRoleMex.setPipedMessageExchangeId(partnerRoleMex.getMessageExchangeId());
         myRoleMex.setPipedPID(getPID());
         partnerRoleMex.setPipedPID(target.getPID());
         partnerRoleMex.setPipedMessageExchangeId(myRoleMex.getMessageExchangeId());
-        
 
         // Properties used by stateful-exchange protocol.
         String mySessionId = partnerRoleMex.getPartnerLink().getMySessionId();
@@ -1295,12 +1328,12 @@
 
         try {
             iworker.execInCurrentThread(new Callable<Void>() {
-    
+
                 public Void call() throws Exception {
                     executeContinueInstancePartnerRoleResponseReceived(prolemex);
                     return null;
                 }
-    
+
             });
         } catch (Exception ex) {
             throw new BpelEngineException(ex);

Modified: ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java
URL: http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java?view=diff&rev=564394&r1=564393&r2=564394
==============================================================================
--- ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java (original)
+++ ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java Thu Aug  9 14:40:09 2007
@@ -770,6 +770,8 @@
         _dao.setExecutionStateCounter(newcount);
         _dao.setExecutionState(bos.toByteArray());
         _instanceWorker.setCachedState(newcount, _soup);
+        
+        __log.debug("CACHE SAVE: #" + newcount + " for instance " + _dao.getInstanceId());
     }
 
     void injectMyRoleMessageExchange(final String responsechannel, final int idx, MessageExchangeDAO mexdao) {
@@ -1153,47 +1155,4 @@
         fetchPartnerLinkDAO(pLink).setPartnerSessionId(session);
 
     }
-
-    /**
-     * Attempt to match message exchanges on a correlator.
-     * 
-     */
-    boolean matcherEvent(String correlatorId, CorrelationKey ckey) {
-        if (BpelProcess.__log.isDebugEnabled()) {
-            __log.debug("MatcherEvent handling: correlatorId=" + correlatorId + ", ckey=" + ckey);
-        }
-        CorrelatorDAO correlator = _dao.getProcess().getCorrelator(correlatorId);
-
-        // Find the route first, this is a SELECT FOR UPDATE on the "selector" row,
-        // So we want to acquire the lock before we do anthing else.
-        MessageRouteDAO mroute = correlator.findRoute(ckey);
-        if (mroute == null) {
-            // Ok, this means that a message arrived before we did, so nothing to do.
-            __log.debug("MatcherEvent handling: nothing to do, route no longer in DB");
-            return false;
-        }
-
-        // Now see if there is a message that matches this selector.
-        MessageExchangeDAO mexdao = correlator.dequeueMessage(ckey);
-        if (mexdao != null) {
-            __log.debug("MatcherEvent handling: found matching message in DB (i.e. message arrived before <receive>)");
-
-            // We have a match, so we can get rid of the routing entries.
-            correlator.removeRoutes(mroute.getGroupId(), _dao);
-
-            // Found message matching one of our selectors.
-            if (BpelProcess.__log.isDebugEnabled()) {
-                BpelProcess.__log.debug("SELECT: " + mroute.getGroupId() + ": matched to MESSAGE " + mexdao + " on CKEY " + ckey);
-            }
-
-            injectMyRoleMessageExchange(mroute.getGroupId(), mroute.getIndex(), mexdao);
-            return true;
-        } else {
-            __log.debug("MatcherEvent handling: nothing to do, no matching message in DB");
-
-        }
-        
-        return false;
-    }
-
 }

Modified: ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkMyRoleImpl.java
URL: http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkMyRoleImpl.java?view=diff&rev=564394&r1=564393&r2=564394
==============================================================================
--- ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkMyRoleImpl.java (original)
+++ ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkMyRoleImpl.java Thu Aug  9 14:40:09 2007
@@ -156,7 +156,8 @@
                 messageRoute = correlator.findRoute(key);
                 if (messageRoute != null) {
                     if (__log.isDebugEnabled()) {
-                        __log.debug("INPUTMSG: " + correlatorId + ": ckey " + key + " route is to " + messageRoute);
+                        __log.debug("INPUTMSG: " + correlatorId + ": ckey " + key + " ROUTED TO (grp,index,iid) = (" + messageRoute.getGroupId() + "," + messageRoute.getIndex() + ", " + messageRoute.getTargetInstance().getInstanceId() +  ")");
+   
                     }
                     matchedKey = key;
                     break;

Modified: ode/branches/bart/dao-jpa/src/main/java/org/apache/ode/dao/jpa/CorrelatorDAOImpl.java
URL: http://svn.apache.org/viewvc/ode/branches/bart/dao-jpa/src/main/java/org/apache/ode/dao/jpa/CorrelatorDAOImpl.java?view=diff&rev=564394&r1=564393&r2=564394
==============================================================================
--- ode/branches/bart/dao-jpa/src/main/java/org/apache/ode/dao/jpa/CorrelatorDAOImpl.java (original)
+++ ode/branches/bart/dao-jpa/src/main/java/org/apache/ode/dao/jpa/CorrelatorDAOImpl.java Thu Aug  9 14:40:09 2007
@@ -57,8 +57,9 @@
     private ProcessDAOImpl _process;
 
     public CorrelatorDAOImpl(){}
-    public CorrelatorDAOImpl(String correlatorKey) {
+    public CorrelatorDAOImpl(String correlatorKey, ProcessDAOImpl process) {
         _correlatorKey = correlatorKey;
+        _process = process;
     }
 
     public void addRoute(String routeGroupId, ProcessInstanceDAO target, int index, CorrelationKey correlationKey) {

Modified: ode/branches/bart/dao-jpa/src/main/java/org/apache/ode/dao/jpa/ProcessDAOImpl.java
URL: http://svn.apache.org/viewvc/ode/branches/bart/dao-jpa/src/main/java/org/apache/ode/dao/jpa/ProcessDAOImpl.java?view=diff&rev=564394&r1=564393&r2=564394
==============================================================================
--- ode/branches/bart/dao-jpa/src/main/java/org/apache/ode/dao/jpa/ProcessDAOImpl.java (original)
+++ ode/branches/bart/dao-jpa/src/main/java/org/apache/ode/dao/jpa/ProcessDAOImpl.java Thu Aug  9 14:40:09 2007
@@ -48,8 +48,13 @@
 @Entity
 @Table(name="ODE_PROCESS")
 @NamedQueries({
-    @NamedQuery(name="InstanceByCKey", query="SELECT cs._scope._processInstance FROM CorrelationSetDAOImpl as cs WHERE cs._correlationKey = :ckey"),
-    @NamedQuery(name="CorrelatorByKey", query="SELECT c FROM CorrelatorDAOImpl as c WHERE c._correlatorKey = :ckey")
+    @NamedQuery(name="InstanceByCKey", query="SELECT cs._scope._processInstance " +
+            "FROM CorrelationSetDAOImpl as cs " +
+            "WHERE cs._correlationKey = :ckey"),
+            
+    @NamedQuery(name="CorrelatorByKey", query="SELECT c " +
+            "FROM CorrelatorDAOImpl as c" +
+            " WHERE c._correlatorKey = :ckey AND c._process = :process")
 })
 public class ProcessDAOImpl extends OpenJPADAO implements ProcessDAO {
 
@@ -80,7 +85,7 @@
     }
 	
 	public CorrelatorDAO addCorrelator(String correlator) {
-		CorrelatorDAOImpl corr = new CorrelatorDAOImpl(correlator);
+		CorrelatorDAOImpl corr = new CorrelatorDAOImpl(correlator, this);
 		_correlators.add(corr);
         return corr;
     }
@@ -88,6 +93,7 @@
     public CorrelatorDAO getCorrelator(String correlatorId) {
         Query qry = getEM().createNamedQuery("CorrelatorByKey");
         qry.setParameter("ckey", correlatorId);
+        qry.setParameter("process", this);
         List res = qry.getResultList();
         if (res.size() == 0) return null;
         return (CorrelatorDAO) res.get(0);