You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ode.apache.org by ms...@apache.org on 2007/09/19 02:33:40 UTC
svn commit: r577122 - in /ode/trunk/bpel-runtime/src:
main/java/org/apache/ode/bpel/engine/BpelServerImpl.java
test/java/org/apache/ode/bpel/runtime/MockBpelServer.java
Author: mszefler
Date: Tue Sep 18 17:33:39 2007
New Revision: 577122
URL: http://svn.apache.org/viewvc?rev=577122&view=rev
Log:
Tweaked (again) the waitForQuiessence logic. Not pretty, but it seems to work.
Modified:
ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java
ode/trunk/bpel-runtime/src/test/java/org/apache/ode/bpel/runtime/MockBpelServer.java
Modified: ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java
URL: http://svn.apache.org/viewvc/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java?rev=577122&r1=577121&r2=577122&view=diff
==============================================================================
--- ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java (original)
+++ ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java Tue Sep 18 17:33:39 2007
@@ -29,6 +29,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -120,6 +121,11 @@
*/
private ReadWriteLock _mngmtLock = new ReentrantReadWriteLock();
+ /**
+ * The last time we started a {@link ServerCallable}. Useful for keeping track of idleness.
+ */
+ private final AtomicLong _lastTimeOfServerCallable = new AtomicLong(System.currentTimeMillis());
+
static {
// TODO Clean this up and factorize engine configuration
try {
@@ -145,10 +151,28 @@
}
protected void waitForQuiessence() {
- _mngmtLock.writeLock().lock();
- _mngmtLock.writeLock().unlock();
+ do {
+ _mngmtLock.writeLock().lock();
+ _mngmtLock.writeLock().unlock();
+ long ltime = _lastTimeOfServerCallable.get();
+ try {
+ Thread.sleep(150);
+ } catch (InterruptedException e) {
+ ;
+ }
+ _mngmtLock.writeLock().lock();
+ _mngmtLock.writeLock().unlock();
+ try {
+ Thread.sleep(150);
+ } catch (InterruptedException ie) {
+ ;
+ }
+ if (_lastTimeOfServerCallable.get() == ltime)
+ return;
+ } while (true);
+
}
-
+
public void start() {
_mngmtLock.writeLock().lock();
try {
@@ -159,7 +183,6 @@
__log.debug("BPEL SERVER starting.");
-
if (_exec == null)
_exec = Executors.newCachedThreadPool();
@@ -168,13 +191,13 @@
__log.fatal(errmsg);
throw new IllegalStateException(errmsg);
}
-
- if (_contexts.scheduler == null) {
+
+ if (_contexts.scheduler == null) {
String errmsg = "Scheduler not specified; call setScheduler(...)!";
__log.fatal(errmsg);
throw new IllegalStateException(errmsg);
}
-
+
_contexts.scheduler.start();
_state = State.RUNNING;
__log.info(__msgs.msgServerStarted());
@@ -433,10 +456,10 @@
_contexts.scheduler.jobCompleted(jobInfo.jobName);
Date future = new Date(System.currentTimeMillis() + (60 * 1000));
__log.info(__msgs.msgReschedulingJobForInactiveProcess(we.getProcessId(), jobInfo.jobName, future));
- _contexts.scheduler.schedulePersistedJob(we.getDetail(), future);
+ _contexts.scheduler.schedulePersistedJob(we.getDetail(), future);
return null;
}
-
+
});
return;
}
@@ -452,7 +475,7 @@
public void setTransactionManager(TransactionManager txm) {
_contexts.txManager = txm;
}
-
+
public void setDehydrationPolicy(DehydrationPolicy dehydrationPolicy) {
_dehydrationPolicy = dehydrationPolicy;
}
@@ -501,14 +524,13 @@
assertTransaction();
else
assertNoTransaction();
-
-
+
return target.createNewMyRoleMex(istyle, targetService, operation, clientKey);
} finally {
_mngmtLock.readLock().unlock();
}
}
-
+
public MessageExchange getMessageExchange(final String mexId) throws BpelEngineException {
_mngmtLock.readLock().lock();
@@ -552,9 +574,9 @@
};
try {
- if (inmemdao != null || _contexts.isTransacted())
+ if (inmemdao != null || _contexts.isTransacted())
return loadMex.call();
- else
+ else
return enqueueTransaction(loadMex).get();
} catch (ContextException e) {
throw new BpelEngineException(e);
@@ -589,18 +611,18 @@
MessageExchangeDAO getInMemMexDAO(String mexId) {
_mngmtLock.readLock().lock();
try {
- for (BpelProcess p : _registeredProcesses.values()) {
- MessageExchangeDAO mexDao = p.getInMemMexDAO(mexId);
- if (mexDao != null)
- return mexDao;
- }
+ for (BpelProcess p : _registeredProcesses.values()) {
+ MessageExchangeDAO mexDao = p.getInMemMexDAO(mexId);
+ if (mexDao != null)
+ return mexDao;
+ }
} finally {
_mngmtLock.readLock().unlock();
}
-
+
return null;
}
-
+
OProcess getOProcess(QName processId) {
_mngmtLock.readLock().lock();
try {
@@ -616,7 +638,6 @@
}
}
-
<T> Future<T> enqueueTransaction(final Callable<T> transaction) throws ContextException {
return _exec.submit(new ServerCallable<T>(new TransactedCallable<T>(transaction)));
}
@@ -624,9 +645,10 @@
void enqueueRunnable(final Runnable runnable) {
_exec.submit(new ServerRunnable(runnable));
}
-
+
/**
- * Schedule a {@link Runnable} object for execution after the completion of the current transaction.
+ * Schedule a {@link Runnable} object for execution after the completion of the current transaction.
+ *
* @param runnable
*/
void scheduleRunnable(final Runnable runnable) {
@@ -636,12 +658,11 @@
public void run() {
_exec.submit(new ServerRunnable(runnable));
}
-
+
});
-
+
}
-
protected void assertTransaction() {
if (!_contexts.isTransacted())
throw new BpelEngineException("Operation must be performed in a transaction!");
@@ -731,48 +752,58 @@
}
}
-
-
-
+ private void ticktock() {
+ _lastTimeOfServerCallable.set(System.currentTimeMillis());
+
+ }
+
class ServerRunnable implements Runnable {
final Runnable _work;
+
ServerRunnable(Runnable work) {
_work = work;
}
-
+
public void run() {
+ ticktock();
_mngmtLock.readLock().lock();
try {
+ ticktock();
_work.run();
+ ticktock();
} catch (Throwable ex) {
+ ticktock();
__log.fatal("Internal Error", ex);
} finally {
_mngmtLock.readLock().unlock();
}
}
-
+
}
-
-
-
- class ServerCallable<T> implements Callable<T>{
+
+ class ServerCallable<T> implements Callable<T> {
final Callable<T> _work;
+
ServerCallable(Callable<T> work) {
_work = work;
}
-
- public T call () throws Exception {
+
+ public T call() throws Exception {
+ ticktock();
_mngmtLock.readLock().lock();
try {
+ ticktock();
return _work.call();
} catch (Exception ex) {
+ ticktock();
__log.fatal("Internal Error", ex);
throw ex;
} finally {
_mngmtLock.readLock().unlock();
+ ticktock();
}
}
-
+
}
class TransactedCallable<T> implements Callable<T> {
@@ -786,7 +817,6 @@
return _contexts.execTransaction(_work);
}
}
-
class TransactedRunnable implements Runnable {
Runnable _work;
Modified: ode/trunk/bpel-runtime/src/test/java/org/apache/ode/bpel/runtime/MockBpelServer.java
URL: http://svn.apache.org/viewvc/ode/trunk/bpel-runtime/src/test/java/org/apache/ode/bpel/runtime/MockBpelServer.java?rev=577122&r1=577121&r2=577122&view=diff
==============================================================================
--- ode/trunk/bpel-runtime/src/test/java/org/apache/ode/bpel/runtime/MockBpelServer.java (original)
+++ ode/trunk/bpel-runtime/src/test/java/org/apache/ode/bpel/runtime/MockBpelServer.java Tue Sep 18 17:33:39 2007
@@ -296,12 +296,5 @@
public void waitForBlocking() {
_server.waitForQuiessence();
- try {
- Thread.sleep(100);
- } catch (InterruptedException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- _server.waitForQuiessence();
}
}