You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ode.apache.org by ms...@apache.org on 2007/08/09 23:40:09 UTC
svn commit: r564394 - in /ode/branches/bart:
bpel-runtime/src/main/java/org/apache/ode/bpel/engine/
dao-jpa/src/main/java/org/apache/ode/dao/jpa/
Author: mszefler
Date: Thu Aug 9 14:40:09 2007
New Revision: 564394
URL: http://svn.apache.org/viewvc?view=rev&rev=564394
Log:
Fix bug in JPA where routes were not getting deleted.
Improve performance of matcher events by not loading the BRC (BpelRuntimeContextImpl) until it is determined that the event is valid.
Modified:
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelInstanceWorker.java
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkMyRoleImpl.java
ode/branches/bart/dao-jpa/src/main/java/org/apache/ode/dao/jpa/CorrelatorDAOImpl.java
ode/branches/bart/dao-jpa/src/main/java/org/apache/ode/dao/jpa/ProcessDAOImpl.java
Modified: ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelInstanceWorker.java
URL: http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelInstanceWorker.java?view=diff&rev=564394&r1=564393&r2=564394
==============================================================================
--- ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelInstanceWorker.java (original)
+++ ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelInstanceWorker.java Thu Aug 9 14:40:09 2007
@@ -52,7 +52,7 @@
* @param runnable
*/
synchronized void enqueue(Runnable runnable) {
- __log.debug("Enqueue work for instance IID " + _iid + ": " + runnable);
+ __log.debug("enqueue: for instance " + _process.getPID() + "#" + _iid + ": " + runnable);
_todoQueue.add(runnable);
// We may need to reschedule this thread if we've dropped out of the end of the run() method.
if (!_running) {
@@ -77,12 +77,10 @@
* forwarded from {@link Callable#call()}
*/
<T> T execInCurrentThread(Callable<T> callable) throws Exception {
- // If we are in the current thread we can just keep working. allows re-entrant calls.
-
+
if (isWorkerThread())
- return callable.call();
+ throw new BpelEngineException("InternalError: Attempt to reenter instance worker " + toString());
- __log.debug("Importing thread " + Thread.currentThread() + " for IID " + _iid);
final Semaphore ready = new Semaphore(0);
final Semaphore finished = new Semaphore(0);
enqueue(new Runnable() {
@@ -97,7 +95,6 @@
}
});
- __log.debug("Blocking main worker thread for IID " + _iid);
try {
ready.acquire();
} catch (InterruptedException ex) {
@@ -105,14 +102,13 @@
throw new BpelEngineException("Thread interrupted.", ex);
}
- __log.debug("Executing worker for IID " + _iid + " in imported thread " + Thread.currentThread());
+
_activeInstance.set(_iid);
try {
return doInstanceWork(callable);
} catch (Exception ex) {
throw ex;
} finally {
- __log.debug("Releasing worker thread for IID " + _iid + " imported thread " + Thread.currentThread());
finished.release();
_activeInstance.set(null);
}
@@ -125,7 +121,7 @@
* Implementation of the {@link Runnable} interface.
*/
public void run() {
- __log.debug("Running worker thread " + Thread.currentThread() + " for instance IID " + _iid);
+ __log.debug("Starting worker thread " + Thread.currentThread() + " for instance IID " + instanceId());
_activeInstance.set(_iid);
_workerThread = Thread.currentThread();
try {
@@ -142,7 +138,6 @@
}
next = _todoQueue.remove(0);
- __log.debug("Worker thread " + Thread.currentThread() + " for instance IID " + _iid + " found work: " + next);
}
try {
@@ -176,19 +171,19 @@
* @throws Exception
*/
private <T> T doInstanceWork(Callable<T> work) throws Exception {
- __log.debug("Doing work for instance " + _iid +" in thread " + Thread.currentThread());
+ __log.debug("Doing work for instance " + instanceId() +" in thread " + Thread.currentThread());
try {
return work.call();
} catch (Exception ex) {
- __log.error("Work for instance " + _iid + " in thread " + Thread.currentThread() + " resulted in an exception." ,ex);
+ __log.error("Work for instance " + instanceId() + " in thread " + Thread.currentThread() + " resulted in an exception." ,ex);
throw ex;
} finally {
- __log.debug("Finished work for instance " + _iid + " in thread " + Thread.currentThread());
+ __log.debug("Finished work for instance " + instanceId() + " in thread " + Thread.currentThread());
}
}
public String toString() {
- return "{BpelInstanceWorker: PID=" + _process.getPID() + " IID=" + _iid + " workerThread="+ _workerThread + "}";
+ return "{BpelInstanceWorker for " + instanceId() + "}";
}
boolean isWorkerThread() {
@@ -215,5 +210,9 @@
this.state = state;
}
}
-
+
+ private String instanceId() {
+ return _process.getPID() + "#" + _iid;
+ }
+
}
Modified: ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
URL: http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java?view=diff&rev=564394&r1=564393&r2=564394
==============================================================================
--- ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java (original)
+++ ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java Thu Aug 9 14:40:09 2007
@@ -41,8 +41,10 @@
import org.apache.ode.bpel.common.CorrelationKey;
import org.apache.ode.bpel.common.FaultException;
import org.apache.ode.bpel.dao.BpelDAOConnection;
+import org.apache.ode.bpel.dao.CorrelatorDAO;
import org.apache.ode.bpel.dao.MessageDAO;
import org.apache.ode.bpel.dao.MessageExchangeDAO;
+import org.apache.ode.bpel.dao.MessageRouteDAO;
import org.apache.ode.bpel.dao.ProcessDAO;
import org.apache.ode.bpel.dao.ProcessInstanceDAO;
import org.apache.ode.bpel.evt.ProcessInstanceEvent;
@@ -278,7 +280,7 @@
// If we did not get an ACK during this method, then mark this MEX as needing an ASYNC wake-up
if (mexdao.getStatus() != Status.ACK)
mexdao.setStatus(Status.ASYNC);
-
+
assert mexdao.getStatus() == Status.ACK || mexdao.getStatus() == Status.ASYNC;
}
@@ -329,12 +331,45 @@
private void executeContinueInstanceMatcherEvent(ProcessInstanceDAO instanceDao, String correlatorId,
CorrelationKey correlationKey) {
- BpelInstanceWorker worker = _instanceWorkerCache.get(instanceDao.getInstanceId());
- assert worker.isWorkerThread();
- BpelRuntimeContextImpl brc = new BpelRuntimeContextImpl(worker, instanceDao);
- if (brc.matcherEvent(correlatorId, correlationKey))
+ if (__log.isDebugEnabled()) {
+ __log.debug("MatcherEvent handling: correlatorId=" + correlatorId + ", ckey=" + correlationKey);
+ }
+
+ CorrelatorDAO correlator = instanceDao.getProcess().getCorrelator(correlatorId);
+
+ // Find the route first, this is a SELECT FOR UPDATE on the "selector" row,
+ // So we want to acquire the lock before we do anything else.
+ MessageRouteDAO mroute = correlator.findRoute(correlationKey);
+ if (mroute == null) {
+ // Ok, this means that a message arrived before we did, so nothing to do.
+ __log.debug("MatcherEvent handling: nothing to do, route no longer in DB");
+ return;
+ }
+
+ // Now see if there is a message that matches this selector.
+ MessageExchangeDAO mexdao = correlator.dequeueMessage(correlationKey);
+ if (mexdao != null) {
+ __log.debug("MatcherEvent handling: found matching message in DB (i.e. message arrived before <receive>)");
+
+ // We have a match, so we can get rid of the routing entries.
+ correlator.removeRoutes(mroute.getGroupId(), instanceDao);
+
+ // Found message matching one of our selectors.
+ if (__log.isDebugEnabled()) {
+ __log.debug("SELECT: " + mroute.getGroupId() + ": matched to MESSAGE " + mexdao + " on CKEY " + correlationKey);
+ }
+
+ BpelInstanceWorker worker = _instanceWorkerCache.get(instanceDao.getInstanceId());
+ assert worker.isWorkerThread();
+
+ BpelRuntimeContextImpl brc = new BpelRuntimeContextImpl(worker, instanceDao);
+ brc.injectMyRoleMessageExchange(mroute.getGroupId(), mroute.getIndex(), mexdao);
brc.execute();
+ } else {
+ __log.debug("MatcherEvent handling: nothing to do, no matching message in DB");
+
+ }
}
@@ -997,7 +1032,7 @@
}
void onMyRoleMexAck(MessageExchangeDAO mexdao, Status old) {
-
+
if (mexdao.getPipedMessageExchangeId() != null) /* p2p */{
BpelProcess caller = _server.getBpelProcess(mexdao.getPipedPID());
@@ -1032,7 +1067,7 @@
if (old == Status.ASYNC)
caller.p2pWakeup(pmex);
-
+
} else /* not p2p */{
// TODO: force a myrole mex to be created if it is not in cache.
for (WeakReference<MyRoleMessageExchangeImpl> wr : _mexStateListeners) {
@@ -1216,7 +1251,7 @@
} finally {
if (mexdao.getStatus() != Status.ACK)
mexdao.setStatus(Status.ASYNC);
-
+
}
assert mexdao.getStatus() == Status.ACK || mexdao.getStatus() == Status.ASYNC;
@@ -1250,14 +1285,12 @@
myRoleMex.setTimeout(partnerRoleMex.getTimeout());
myRoleMex.setRequest(partnerRoleMex.getRequest());
myRoleMex.setInvocationStyle(partnerRoleMex.getInvocationStyle());
-
- // Piped cross-references.
+ // Piped cross-references.
myRoleMex.setPipedMessageExchangeId(partnerRoleMex.getMessageExchangeId());
myRoleMex.setPipedPID(getPID());
partnerRoleMex.setPipedPID(target.getPID());
partnerRoleMex.setPipedMessageExchangeId(myRoleMex.getMessageExchangeId());
-
// Properties used by stateful-exchange protocol.
String mySessionId = partnerRoleMex.getPartnerLink().getMySessionId();
@@ -1295,12 +1328,12 @@
try {
iworker.execInCurrentThread(new Callable<Void>() {
-
+
public Void call() throws Exception {
executeContinueInstancePartnerRoleResponseReceived(prolemex);
return null;
}
-
+
});
} catch (Exception ex) {
throw new BpelEngineException(ex);
Modified: ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java
URL: http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java?view=diff&rev=564394&r1=564393&r2=564394
==============================================================================
--- ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java (original)
+++ ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java Thu Aug 9 14:40:09 2007
@@ -770,6 +770,8 @@
_dao.setExecutionStateCounter(newcount);
_dao.setExecutionState(bos.toByteArray());
_instanceWorker.setCachedState(newcount, _soup);
+
+ __log.debug("CACHE SAVE: #" + newcount + " for instance " + _dao.getInstanceId());
}
void injectMyRoleMessageExchange(final String responsechannel, final int idx, MessageExchangeDAO mexdao) {
@@ -1153,47 +1155,4 @@
fetchPartnerLinkDAO(pLink).setPartnerSessionId(session);
}
-
- /**
- * Attempt to match message exchanges on a correlator.
- *
- */
- boolean matcherEvent(String correlatorId, CorrelationKey ckey) {
- if (BpelProcess.__log.isDebugEnabled()) {
- __log.debug("MatcherEvent handling: correlatorId=" + correlatorId + ", ckey=" + ckey);
- }
- CorrelatorDAO correlator = _dao.getProcess().getCorrelator(correlatorId);
-
- // Find the route first, this is a SELECT FOR UPDATE on the "selector" row,
- // So we want to acquire the lock before we do anthing else.
- MessageRouteDAO mroute = correlator.findRoute(ckey);
- if (mroute == null) {
- // Ok, this means that a message arrived before we did, so nothing to do.
- __log.debug("MatcherEvent handling: nothing to do, route no longer in DB");
- return false;
- }
-
- // Now see if there is a message that matches this selector.
- MessageExchangeDAO mexdao = correlator.dequeueMessage(ckey);
- if (mexdao != null) {
- __log.debug("MatcherEvent handling: found matching message in DB (i.e. message arrived before <receive>)");
-
- // We have a match, so we can get rid of the routing entries.
- correlator.removeRoutes(mroute.getGroupId(), _dao);
-
- // Found message matching one of our selectors.
- if (BpelProcess.__log.isDebugEnabled()) {
- BpelProcess.__log.debug("SELECT: " + mroute.getGroupId() + ": matched to MESSAGE " + mexdao + " on CKEY " + ckey);
- }
-
- injectMyRoleMessageExchange(mroute.getGroupId(), mroute.getIndex(), mexdao);
- return true;
- } else {
- __log.debug("MatcherEvent handling: nothing to do, no matching message in DB");
-
- }
-
- return false;
- }
-
}
Modified: ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkMyRoleImpl.java
URL: http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkMyRoleImpl.java?view=diff&rev=564394&r1=564393&r2=564394
==============================================================================
--- ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkMyRoleImpl.java (original)
+++ ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkMyRoleImpl.java Thu Aug 9 14:40:09 2007
@@ -156,7 +156,8 @@
messageRoute = correlator.findRoute(key);
if (messageRoute != null) {
if (__log.isDebugEnabled()) {
- __log.debug("INPUTMSG: " + correlatorId + ": ckey " + key + " route is to " + messageRoute);
+ __log.debug("INPUTMSG: " + correlatorId + ": ckey " + key + " ROUTED TO (grp,index,iid) = (" + messageRoute.getGroupId() + "," + messageRoute.getIndex() + ", " + messageRoute.getTargetInstance().getInstanceId() + ")");
+
}
matchedKey = key;
break;
Modified: ode/branches/bart/dao-jpa/src/main/java/org/apache/ode/dao/jpa/CorrelatorDAOImpl.java
URL: http://svn.apache.org/viewvc/ode/branches/bart/dao-jpa/src/main/java/org/apache/ode/dao/jpa/CorrelatorDAOImpl.java?view=diff&rev=564394&r1=564393&r2=564394
==============================================================================
--- ode/branches/bart/dao-jpa/src/main/java/org/apache/ode/dao/jpa/CorrelatorDAOImpl.java (original)
+++ ode/branches/bart/dao-jpa/src/main/java/org/apache/ode/dao/jpa/CorrelatorDAOImpl.java Thu Aug 9 14:40:09 2007
@@ -57,8 +57,9 @@
private ProcessDAOImpl _process;
public CorrelatorDAOImpl(){}
- public CorrelatorDAOImpl(String correlatorKey) {
+ public CorrelatorDAOImpl(String correlatorKey, ProcessDAOImpl process) {
_correlatorKey = correlatorKey;
+ _process = process;
}
public void addRoute(String routeGroupId, ProcessInstanceDAO target, int index, CorrelationKey correlationKey) {
Modified: ode/branches/bart/dao-jpa/src/main/java/org/apache/ode/dao/jpa/ProcessDAOImpl.java
URL: http://svn.apache.org/viewvc/ode/branches/bart/dao-jpa/src/main/java/org/apache/ode/dao/jpa/ProcessDAOImpl.java?view=diff&rev=564394&r1=564393&r2=564394
==============================================================================
--- ode/branches/bart/dao-jpa/src/main/java/org/apache/ode/dao/jpa/ProcessDAOImpl.java (original)
+++ ode/branches/bart/dao-jpa/src/main/java/org/apache/ode/dao/jpa/ProcessDAOImpl.java Thu Aug 9 14:40:09 2007
@@ -48,8 +48,13 @@
@Entity
@Table(name="ODE_PROCESS")
@NamedQueries({
- @NamedQuery(name="InstanceByCKey", query="SELECT cs._scope._processInstance FROM CorrelationSetDAOImpl as cs WHERE cs._correlationKey = :ckey"),
- @NamedQuery(name="CorrelatorByKey", query="SELECT c FROM CorrelatorDAOImpl as c WHERE c._correlatorKey = :ckey")
+ @NamedQuery(name="InstanceByCKey", query="SELECT cs._scope._processInstance " +
+ "FROM CorrelationSetDAOImpl as cs " +
+ "WHERE cs._correlationKey = :ckey"),
+
+ @NamedQuery(name="CorrelatorByKey", query="SELECT c " +
+ "FROM CorrelatorDAOImpl as c" +
+ " WHERE c._correlatorKey = :ckey AND c._process = :process")
})
public class ProcessDAOImpl extends OpenJPADAO implements ProcessDAO {
@@ -80,7 +85,7 @@
}
public CorrelatorDAO addCorrelator(String correlator) {
- CorrelatorDAOImpl corr = new CorrelatorDAOImpl(correlator);
+ CorrelatorDAOImpl corr = new CorrelatorDAOImpl(correlator, this);
_correlators.add(corr);
return corr;
}
@@ -88,6 +93,7 @@
public CorrelatorDAO getCorrelator(String correlatorId) {
Query qry = getEM().createNamedQuery("CorrelatorByKey");
qry.setParameter("ckey", correlatorId);
+ qry.setParameter("process", this);
List res = qry.getResultList();
if (res.size() == 0) return null;
return (CorrelatorDAO) res.get(0);