You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ode.apache.org by ms...@apache.org on 2007/09/19 02:33:40 UTC

svn commit: r577122 - in /ode/trunk/bpel-runtime/src: main/java/org/apache/ode/bpel/engine/BpelServerImpl.java test/java/org/apache/ode/bpel/runtime/MockBpelServer.java

Author: mszefler
Date: Tue Sep 18 17:33:39 2007
New Revision: 577122

URL: http://svn.apache.org/viewvc?rev=577122&view=rev
Log:
Tweaked (again) the waitForQuiessence logic. Not pretty, but it seems to work.

Modified:
    ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java
    ode/trunk/bpel-runtime/src/test/java/org/apache/ode/bpel/runtime/MockBpelServer.java

Modified: ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java
URL: http://svn.apache.org/viewvc/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java?rev=577122&r1=577121&r2=577122&view=diff
==============================================================================
--- ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java (original)
+++ ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java Tue Sep 18 17:33:39 2007
@@ -29,6 +29,7 @@
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
@@ -120,6 +121,11 @@
      */
     private ReadWriteLock _mngmtLock = new ReentrantReadWriteLock();
 
+    /**
+     * The last time we started a {@link ServerCallable}. Useful for keeping track of idleness.
+     */
+    private final AtomicLong _lastTimeOfServerCallable = new AtomicLong(System.currentTimeMillis());
+
     static {
         // TODO Clean this up and factorize engine configuration
         try {
@@ -145,10 +151,28 @@
     }
 
     protected void waitForQuiessence() {
-        _mngmtLock.writeLock().lock();
-        _mngmtLock.writeLock().unlock();
+        do {
+            _mngmtLock.writeLock().lock();
+            _mngmtLock.writeLock().unlock();
+            long ltime = _lastTimeOfServerCallable.get();
+            try {
+                Thread.sleep(150);
+            } catch (InterruptedException e) {
+                ;
+            }
+            _mngmtLock.writeLock().lock();
+            _mngmtLock.writeLock().unlock();
+            try {
+                Thread.sleep(150);
+            } catch (InterruptedException ie) {
+                ;
+            }
+            if (_lastTimeOfServerCallable.get() == ltime)
+                return;
+        } while (true);
+
     }
-    
+
     public void start() {
         _mngmtLock.writeLock().lock();
         try {
@@ -159,7 +183,6 @@
 
             __log.debug("BPEL SERVER starting.");
 
-
             if (_exec == null)
                 _exec = Executors.newCachedThreadPool();
 
@@ -168,13 +191,13 @@
                 __log.fatal(errmsg);
                 throw new IllegalStateException(errmsg);
             }
-            
-            if (_contexts.scheduler == null) { 
+
+            if (_contexts.scheduler == null) {
                 String errmsg = "Scheduler not specified; call setScheduler(...)!";
                 __log.fatal(errmsg);
                 throw new IllegalStateException(errmsg);
             }
-            
+
             _contexts.scheduler.start();
             _state = State.RUNNING;
             __log.info(__msgs.msgServerStarted());
@@ -433,10 +456,10 @@
                         _contexts.scheduler.jobCompleted(jobInfo.jobName);
                         Date future = new Date(System.currentTimeMillis() + (60 * 1000));
                         __log.info(__msgs.msgReschedulingJobForInactiveProcess(we.getProcessId(), jobInfo.jobName, future));
-                        _contexts.scheduler.schedulePersistedJob(we.getDetail(), future);            
+                        _contexts.scheduler.schedulePersistedJob(we.getDetail(), future);
                         return null;
                     }
-                    
+
                 });
                 return;
             }
@@ -452,7 +475,7 @@
     public void setTransactionManager(TransactionManager txm) {
         _contexts.txManager = txm;
     }
-    
+
     public void setDehydrationPolicy(DehydrationPolicy dehydrationPolicy) {
         _dehydrationPolicy = dehydrationPolicy;
     }
@@ -501,14 +524,13 @@
                 assertTransaction();
             else
                 assertNoTransaction();
-            
-            
+
             return target.createNewMyRoleMex(istyle, targetService, operation, clientKey);
         } finally {
             _mngmtLock.readLock().unlock();
         }
     }
-    
+
     public MessageExchange getMessageExchange(final String mexId) throws BpelEngineException {
 
         _mngmtLock.readLock().lock();
@@ -552,9 +574,9 @@
             };
 
             try {
-                if (inmemdao != null || _contexts.isTransacted()) 
+                if (inmemdao != null || _contexts.isTransacted())
                     return loadMex.call();
-                else 
+                else
                     return enqueueTransaction(loadMex).get();
             } catch (ContextException e) {
                 throw new BpelEngineException(e);
@@ -589,18 +611,18 @@
     MessageExchangeDAO getInMemMexDAO(String mexId) {
         _mngmtLock.readLock().lock();
         try {
-          for (BpelProcess p : _registeredProcesses.values()) {
-              MessageExchangeDAO mexDao = p.getInMemMexDAO(mexId);
-              if (mexDao != null)
-                  return mexDao;
-          }
+            for (BpelProcess p : _registeredProcesses.values()) {
+                MessageExchangeDAO mexDao = p.getInMemMexDAO(mexId);
+                if (mexDao != null)
+                    return mexDao;
+            }
         } finally {
             _mngmtLock.readLock().unlock();
         }
-        
+
         return null;
     }
-    
+
     OProcess getOProcess(QName processId) {
         _mngmtLock.readLock().lock();
         try {
@@ -616,7 +638,6 @@
         }
     }
 
-
     <T> Future<T> enqueueTransaction(final Callable<T> transaction) throws ContextException {
         return _exec.submit(new ServerCallable<T>(new TransactedCallable<T>(transaction)));
     }
@@ -624,9 +645,10 @@
     void enqueueRunnable(final Runnable runnable) {
         _exec.submit(new ServerRunnable(runnable));
     }
-    
+
     /**
-     * Schedule a {@link Runnable} object for execution after the completion of the current transaction. 
+     * Schedule a {@link Runnable} object for execution after the completion of the current transaction.
+     * 
      * @param runnable
      */
     void scheduleRunnable(final Runnable runnable) {
@@ -636,12 +658,11 @@
             public void run() {
                 _exec.submit(new ServerRunnable(runnable));
             }
-            
+
         });
-        
+
     }
 
-    
     protected void assertTransaction() {
         if (!_contexts.isTransacted())
             throw new BpelEngineException("Operation must be performed in a transaction!");
@@ -731,48 +752,58 @@
         }
     }
 
-    
-   
-    
+    private void ticktock() {
+        _lastTimeOfServerCallable.set(System.currentTimeMillis());
+
+    }
+
     class ServerRunnable implements Runnable {
         final Runnable _work;
+
         ServerRunnable(Runnable work) {
             _work = work;
         }
-        
+
         public void run() {
+            ticktock();
             _mngmtLock.readLock().lock();
             try {
+                ticktock();
                 _work.run();
+                ticktock();
             } catch (Throwable ex) {
+                ticktock();
                 __log.fatal("Internal Error", ex);
             } finally {
                 _mngmtLock.readLock().unlock();
             }
         }
-        
+
     }
-    
-   
-    
-    class ServerCallable<T> implements Callable<T>{
+
+    class ServerCallable<T> implements Callable<T> {
         final Callable<T> _work;
+
         ServerCallable(Callable<T> work) {
             _work = work;
         }
-        
-        public T call () throws Exception {
+
+        public T call() throws Exception {
+            ticktock();
             _mngmtLock.readLock().lock();
             try {
+                ticktock();
                 return _work.call();
             } catch (Exception ex) {
+                ticktock();
                 __log.fatal("Internal Error", ex);
                 throw ex;
             } finally {
                 _mngmtLock.readLock().unlock();
+                ticktock();
             }
         }
-        
+
     }
 
     class TransactedCallable<T> implements Callable<T> {
@@ -786,7 +817,6 @@
             return _contexts.execTransaction(_work);
         }
     }
-
 
     class TransactedRunnable implements Runnable {
         Runnable _work;

Modified: ode/trunk/bpel-runtime/src/test/java/org/apache/ode/bpel/runtime/MockBpelServer.java
URL: http://svn.apache.org/viewvc/ode/trunk/bpel-runtime/src/test/java/org/apache/ode/bpel/runtime/MockBpelServer.java?rev=577122&r1=577121&r2=577122&view=diff
==============================================================================
--- ode/trunk/bpel-runtime/src/test/java/org/apache/ode/bpel/runtime/MockBpelServer.java (original)
+++ ode/trunk/bpel-runtime/src/test/java/org/apache/ode/bpel/runtime/MockBpelServer.java Tue Sep 18 17:33:39 2007
@@ -296,12 +296,5 @@
 
     public void waitForBlocking() {
         _server.waitForQuiessence();
-        try {
-            Thread.sleep(100);
-        } catch (InterruptedException e) {
-            // TODO Auto-generated catch block
-            e.printStackTrace();
-        }
-        _server.waitForQuiessence();
     }
 }