You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ode.apache.org by ms...@apache.org on 2007/09/12 02:45:55 UTC

svn commit: r574736 - in /ode/trunk/bpel-runtime/src: main/java/org/apache/ode/bpel/engine/BpelProcess.java test/java/org/apache/ode/bpel/runtime/ActivityRecoveryTest.java

Author: mszefler
Date: Tue Sep 11 17:45:54 2007
New Revision: 574736

URL: http://svn.apache.org/viewvc?rev=574736&view=rev
Log:
Add a hydration latch wrapper around recoverActivity (the latch is acquired before recovery and released in a finally block).

Modified:
    ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
    ode/trunk/bpel-runtime/src/test/java/org/apache/ode/bpel/runtime/ActivityRecoveryTest.java

Modified: ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
URL: http://svn.apache.org/viewvc/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java?rev=574736&r1=574735&r2=574736&view=diff
==============================================================================
--- ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java (original)
+++ ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java Tue Sep 11 17:45:54 2007
@@ -175,23 +175,30 @@
         return "BpelProcess[" + _pid + "]";
     }
 
-    void recoverActivity(ProcessInstanceDAO instanceDAO, final String channel, final long activityId, final String action, final FaultData fault) {
+    void recoverActivity(ProcessInstanceDAO instanceDAO, final String channel, final long activityId, final String action,
+            final FaultData fault) {
         if (__log.isDebugEnabled())
             __log.debug("Recovering activity in process " + instanceDAO.getInstanceId() + " with action " + action);
-        markused();
-        BpelInstanceWorker iworker = _instanceWorkerCache.get(instanceDAO.getInstanceId());
-        final BpelRuntimeContextImpl processInstance = new BpelRuntimeContextImpl(iworker,instanceDAO);
+
+        _hydrationLatch.latch(1);
         try {
-            iworker.execInCurrentThread(new Callable<Void> () {
+            markused();
+            BpelInstanceWorker iworker = _instanceWorkerCache.get(instanceDAO.getInstanceId());
+            final BpelRuntimeContextImpl processInstance = new BpelRuntimeContextImpl(iworker, instanceDAO);
+            try {
+                iworker.execInCurrentThread(new Callable<Void>() {
 
-                public Void call() throws Exception {
-                    processInstance.recoverActivity(channel, activityId, action, fault);
-                    return null;
-                }
-                
-            });
-        } catch (Exception e) {
-            throw new BpelEngineException(e);
+                    public Void call() throws Exception {
+                        processInstance.recoverActivity(channel, activityId, action, fault);
+                        return null;
+                    }
+
+                });
+            } catch (Exception e) {
+                throw new BpelEngineException(e);
+            }
+        } finally {
+            _hydrationLatch.release(1);
         }
     }
 
@@ -226,7 +233,8 @@
                 return;
             }
 
-            mexdao.setPattern((op.getOutput() == null) ? MessageExchangePattern.REQUEST_ONLY : MessageExchangePattern.REQUEST_RESPONSE);
+            mexdao.setPattern((op.getOutput() == null) ? MessageExchangePattern.REQUEST_ONLY
+                    : MessageExchangePattern.REQUEST_RESPONSE);
             if (!processInterceptors(mexdao, InterceptorInvoker.__onProcessInvoked)) {
                 __log.debug("Aborting processing of mex " + mexdao.getMessageExchangeId() + " due to interceptors.");
                 onMyRoleMexAck(mexdao, oldstatus);
@@ -387,7 +395,7 @@
         ProcessInstanceDAO instanceDao = mexdao.getInstance();
         if (instanceDao == null)
             throw new BpelEngineException("InternalError: No instance for partner mex " + mexdao);
-        
+
         BpelInstanceWorker worker = _instanceWorkerCache.get(mexdao.getInstance().getInstanceId());
         assert worker.isWorkerThread();
 
@@ -400,7 +408,7 @@
     void enqueueInstanceTransaction(Long instanceId, final Runnable runnable) {
         if (instanceId == null)
             throw new NullPointerException("instanceId was null!");
-        
+
         BpelInstanceWorker iworker = _instanceWorkerCache.get(instanceId);
         iworker.enqueue(_server.new TransactedRunnable(runnable));
     }
@@ -492,7 +500,7 @@
             for (MessageExchangeInterceptor interceptor : _server._contexts.globalIntereceptors)
                 invoker.invoke(interceptor, ictx);
         } catch (FailMessageExchangeException e) {
-            MexDaoUtil.setFailed(mexdao,FailureType.ABORTED, e.getMessage());
+            MexDaoUtil.setFailed(mexdao, FailureType.ABORTED, e.getMessage());
             return false;
         } catch (FaultMessageExchangeException e) {
             MexDaoUtil.setFaulted(mexdao, e.getFaultName(), e.getFaultData());
@@ -1079,14 +1087,14 @@
             if (old == Status.ASYNC) {
                 MyRoleMessageExchangeImpl mymex = _myRoleMexCache.get(mexdao);
                 mymex.onAsyncAck(mexdao);
-                
+
                 try {
                     _contexts.mexContext.onMyRoleMessageExchangeStateChanged(mymex);
                 } catch (Throwable t) {
                     __log.error("Integration layer threw an unexepcted exception.", t);
                 }
             }
-            
+
         }
 
     }
@@ -1253,11 +1261,11 @@
 
         Operation operation = oplink.getPartnerRoleOperation(mexdao.getOperation());
 
-        if (!processInterceptors(mexdao, InterceptorInvoker.__onPartnerInvoked))  {
+        if (!processInterceptors(mexdao, InterceptorInvoker.__onPartnerInvoked)) {
             __log.debug("Partner invocation intercepted.");
             return;
         }
-        
+
         mexdao.setStatus(Status.REQ);
         try {
             if (p2pProcess != null) {

Modified: ode/trunk/bpel-runtime/src/test/java/org/apache/ode/bpel/runtime/ActivityRecoveryTest.java
URL: http://svn.apache.org/viewvc/ode/trunk/bpel-runtime/src/test/java/org/apache/ode/bpel/runtime/ActivityRecoveryTest.java?rev=574736&r1=574735&r2=574736&view=diff
==============================================================================
--- ode/trunk/bpel-runtime/src/test/java/org/apache/ode/bpel/runtime/ActivityRecoveryTest.java (original)
+++ ode/trunk/bpel-runtime/src/test/java/org/apache/ode/bpel/runtime/ActivityRecoveryTest.java Tue Sep 11 17:45:54 2007
@@ -108,7 +108,7 @@
         execute("FailureToRecovery");
         recover("retry");
         recover("retry");
-        assertTrue(lastInstance().getStatus() == TInstanceStatus.COMPLETED);
+        assertEquals(TInstanceStatus.COMPLETED, lastInstance().getStatus());
         assertNoFailures();
     }
 
@@ -340,7 +340,7 @@
      */
     protected void recover(String action) {
         ArrayList<TActivityInfo> recoveries = getRecoveriesInScope(lastInstance(), null, null);
-        assertTrue(recoveries.size() == 1);
+        assertEquals(1,recoveries.size());
         TActivityInfo activity = recoveries.get(0);
         assertNotNull(activity);
         _management.recoverActivity(Long.valueOf(lastInstance().getIid()), Long.valueOf(activity.getAiid()), action);