You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ode.apache.org by ms...@apache.org on 2007/07/03 23:00:09 UTC
svn commit: r552978 - in /incubator/ode/branches/bart:
axis2/src/main/java/org/apache/ode/axis2/
bpel-dao/src/main/java/org/apache/ode/bpel/dao/
bpel-runtime/src/main/java/org/apache/ode/bpel/engine/
jbi/src/main/java/org/apache/ode/jbi/
Author: mszefler
Date: Tue Jul 3 14:00:06 2007
New Revision: 552978
URL: http://svn.apache.org/viewvc?view=rev&rev=552978
Log:
BART: BpelEngineImpl refactor/eleimination
Modified:
incubator/ode/branches/bart/axis2/src/main/java/org/apache/ode/axis2/MessageExchangeContextImpl.java
incubator/ode/branches/bart/bpel-dao/src/main/java/org/apache/ode/bpel/dao/MessageExchangeDAO.java
incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/AsyncMyRoleMessageExchangeImpl.java
incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BlockingMyRoleMessageExchangeImpl.java
incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java
incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java
incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/Contexts.java
incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MessageExchangeImpl.java
incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java
incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ReliableMyRoleMessageExchangeImpl.java
incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/TransactedMyRoleMessageExchangeImpl.java
incubator/ode/branches/bart/jbi/src/main/java/org/apache/ode/jbi/OdeConsumer.java
Modified: incubator/ode/branches/bart/axis2/src/main/java/org/apache/ode/axis2/MessageExchangeContextImpl.java
URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/axis2/src/main/java/org/apache/ode/axis2/MessageExchangeContextImpl.java?view=diff&rev=552978&r1=552977&r2=552978
==============================================================================
--- incubator/ode/branches/bart/axis2/src/main/java/org/apache/ode/axis2/MessageExchangeContextImpl.java (original)
+++ incubator/ode/branches/bart/axis2/src/main/java/org/apache/ode/axis2/MessageExchangeContextImpl.java Tue Jul 3 14:00:06 2007
@@ -40,7 +40,12 @@
public MessageExchangeContextImpl(ODEServer server) {
}
- public void invokePartner(PartnerRoleMessageExchange partnerRoleMessageExchange) throws ContextException {
+
+ public void invokePartnerAsynch(PartnerRoleMessageExchange mex) throws ContextException {
+ throw new UnsupportedOperationException();
+ }
+
+ public void invokePartnerBlocking(PartnerRoleMessageExchange partnerRoleMessageExchange) throws ContextException {
if (__log.isDebugEnabled())
__log.debug("Invoking a partner operation: " + partnerRoleMessageExchange.getOperationName());
@@ -48,14 +53,29 @@
if (__log.isDebugEnabled())
__log.debug("The service to invoke is the external service " + service);
service.invoke(partnerRoleMessageExchange);
+
+ }
+
+ public void invokePartnerReliable(PartnerRoleMessageExchange mex) throws ContextException {
+ throw new UnsupportedOperationException();
}
+ public void invokePartnerTransacted(PartnerRoleMessageExchange mex) throws ContextException {
+ throw new UnsupportedOperationException();
+ }
+
+
+
public void onAsyncReply(MyRoleMessageExchange myRoleMessageExchange) throws BpelEngineException {
if (__log.isDebugEnabled())
__log.debug("Processing an async reply from service " + myRoleMessageExchange.getServiceName());
// Nothing to do, no callback is necessary, the client just synchornizes itself with the
// mex reply when invoking the engine.
+ }
+
+ public void onReliableReply(MyRoleMessageExchange myRoleMex) throws BpelEngineException {
+ // We don't support this yet, so not much to do here.
}
}
Modified: incubator/ode/branches/bart/bpel-dao/src/main/java/org/apache/ode/bpel/dao/MessageExchangeDAO.java
URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-dao/src/main/java/org/apache/ode/bpel/dao/MessageExchangeDAO.java?view=diff&rev=552978&r1=552977&r2=552978
==============================================================================
--- incubator/ode/branches/bart/bpel-dao/src/main/java/org/apache/ode/bpel/dao/MessageExchangeDAO.java (original)
+++ incubator/ode/branches/bart/bpel-dao/src/main/java/org/apache/ode/bpel/dao/MessageExchangeDAO.java Tue Jul 3 14:00:06 2007
@@ -260,4 +260,4 @@
void setFailureType(String failureType);
-}
+ }
Modified: incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/AsyncMyRoleMessageExchangeImpl.java
URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/AsyncMyRoleMessageExchangeImpl.java?view=diff&rev=552978&r1=552977&r2=552978
==============================================================================
--- incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/AsyncMyRoleMessageExchangeImpl.java (original)
+++ incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/AsyncMyRoleMessageExchangeImpl.java Tue Jul 3 14:00:06 2007
@@ -23,7 +23,7 @@
ResponseFuture _future;
- public AsyncMyRoleMessageExchangeImpl(BpelEngineImpl engine, String mexId) {
+ public AsyncMyRoleMessageExchangeImpl(BpelServerImpl engine, String mexId) {
super(engine, mexId);
}
@@ -33,7 +33,7 @@
_future = new ResponseFuture();
- BpelProcess target = _engine.route(_callee, _request);
+ BpelProcess target = _server.route(_callee, _request);
if (target == null) {
if (__log.isWarnEnabled())
__log.warn(__msgs.msgUnknownEPR("" + _epr));
Modified: incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BlockingMyRoleMessageExchangeImpl.java
URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BlockingMyRoleMessageExchangeImpl.java?view=diff&rev=552978&r1=552977&r2=552978
==============================================================================
--- incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BlockingMyRoleMessageExchangeImpl.java (original)
+++ incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BlockingMyRoleMessageExchangeImpl.java Tue Jul 3 14:00:06 2007
@@ -16,7 +16,7 @@
Future<Status> _future;
boolean _done = false;
- public BlockingMyRoleMessageExchangeImpl(BpelEngineImpl engine, String mexId) {
+ public BlockingMyRoleMessageExchangeImpl(BpelServerImpl engine, String mexId) {
super(engine, mexId);
}
Modified: incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java
URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java?view=diff&rev=552978&r1=552977&r2=552978
==============================================================================
--- incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java (original)
+++ incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java Tue Jul 3 14:00:06 2007
@@ -8,7 +8,7 @@
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
- *
+
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -65,10 +65,6 @@
class BpelEngineImpl {
private static final Log __log = LogFactory.getLog(BpelEngineImpl.class);
- /** RNG, for delays */
- private Random _random = new Random(System.currentTimeMillis());
-
- private static double _delayMean = 0;
private static final Messages __msgs = MessageBundle.getMessages(Messages.class);
@@ -81,18 +77,7 @@
_contexts = contexts;
}
- OProcess getOProcess(QName processId) {
- BpelProcess process = _activeProcesses.get(processId);
-
- if (process == null)
- return null;
-
- return process.getOProcess();
- }
- public void processJob(WorkEvent we) throws BpelEngineException {
- }
- }
private boolean checkRetry(final JobInfo jobInfo, Throwable t) {
// TODO, better handling of failed jobs (put them in the DB perhaps?)
@@ -138,39 +123,6 @@
// No more retries.
return false;
- }
-
- /**
- * Block the thread for random amount of time. Used for testing for races and the like. The delay generated is exponentially
- * distributed with the mean obtained from the <code>ODE_DEBUG_TX_DELAY</code> environment variable.
- */
- private void debuggingDelay() {
- // Do a delay for debugging purposes.
- if (_delayMean != 0)
- try {
- long delay = randomExp(_delayMean);
- // distribution
- // with mean
- // _delayMean
- __log.warn("Debugging delay has been activated; delaying transaction for " + delay + "ms.");
- Thread.sleep(delay);
- } catch (InterruptedException e) {
- ; // ignore
- }
- }
-
- private long randomExp(double mean) {
- double u = _random.nextDouble(); // Uniform
- long delay = (long) (-Math.log(u) * mean); // Exponential
- return delay;
- }
-
- void fireEvent(BpelEvent event) {
- // Note that the eventListeners list is a copy-on-write array, so need
- // to mess with synchronization.
- for (org.apache.ode.bpel.iapi.BpelEventListener l : _contexts.eventListeners) {
- l.onEvent(event);
- }
}
/**
Modified: incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java?view=diff&rev=552978&r1=552977&r2=552978
==============================================================================
--- incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java (original)
+++ incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java Tue Jul 3 14:00:06 2007
@@ -19,11 +19,14 @@
package org.apache.ode.bpel.engine;
import java.io.InputStream;
+import java.sql.Date;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Random;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
@@ -44,7 +47,6 @@
import org.apache.ode.bpel.explang.ConfigurationException;
import org.apache.ode.bpel.explang.EvaluationException;
import org.apache.ode.bpel.iapi.BpelEngineException;
-import org.apache.ode.bpel.iapi.ContextException;
import org.apache.ode.bpel.iapi.Endpoint;
import org.apache.ode.bpel.iapi.EndpointReference;
import org.apache.ode.bpel.iapi.InvocationStyle;
@@ -53,8 +55,10 @@
import org.apache.ode.bpel.iapi.ProcessConf;
import org.apache.ode.bpel.iapi.Scheduler;
import org.apache.ode.bpel.iapi.MessageExchange.MessageExchangePattern;
+import org.apache.ode.bpel.iapi.MessageExchange.Status;
import org.apache.ode.bpel.intercept.InterceptorInvoker;
import org.apache.ode.bpel.intercept.MessageExchangeInterceptor;
+import org.apache.ode.bpel.memdao.BpelDAOConnectionFactoryImpl;
import org.apache.ode.bpel.o.OElementVarType;
import org.apache.ode.bpel.o.OExpressionLanguage;
import org.apache.ode.bpel.o.OMessageVarType;
@@ -68,6 +72,7 @@
import org.apache.ode.jacob.soup.ReplacementMap;
import org.apache.ode.utils.ObjectPrinter;
import org.apache.ode.utils.msg.MessageBundle;
+import org.omg.CosNaming.IstringHelper;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;
@@ -76,7 +81,7 @@
/**
* Entry point into the runtime of a BPEL process.
*
- * @author Maciej Szefler
+ * @author Maciej Szefler
* @author Matthieu Riou <mriou at apache dot org>
*/
public class BpelProcess {
@@ -108,8 +113,6 @@
/** Last time the process was used. */
private volatile long _lastUsed;
- BpelEngineImpl _engine;
-
DebuggerSupport _debugger;
ExpressionLanguageRuntimeRegistry _expLangRuntimeRegistry;
@@ -125,14 +128,33 @@
private HydrationLatch _hydrationLatch;
private Contexts _contexts;
-
+
/** Manage instance-level locks. */
private final InstanceLockManager _instanceLockManager = new InstanceLockManager();
+ private final Set<InvocationStyle> _invocationStyles;
+
+ private BpelDAOConnectionFactoryImpl _inMemDao;
+
+ private Random _random = new Random();
+
+ private BpelServerImpl _server;
+
public BpelProcess(ProcessConf conf, BpelEventListener debugger) {
_pid = conf.getProcessId();
_pconf = conf;
_hydrationLatch = new HydrationLatch();
+ _inMemDao = new BpelDAOConnectionFactoryImpl(_contexts.scheduler);
+
+ // TODO : do this on a per-partnerlink basis, support transacted styles.
+ HashSet<InvocationStyle> istyles = new HashSet<InvocationStyle>();
+ istyles.add(InvocationStyle.BLOCKING);
+ if (!conf.isTransient()) {
+ istyles.add(InvocationStyle.ASYNC);
+ istyles.add(InvocationStyle.RELIABLE);
+ }
+
+ _invocationStyles = Collections.unmodifiableSet(istyles);
}
public String toString() {
@@ -159,41 +181,37 @@
*
* @param mex
*/
- void invokeProcess(MyRoleMessageExchangeImpl mex) {
+ void invokeProcess(MessageExchangeDAO mexdao) {
_hydrationLatch.latch(1);
try {
- MessageExchangeDAO mexdao = getDAO(mex);
-
- PartnerLinkMyRoleImpl target = getMyRoleForService(mex.getServiceName());
+ PartnerLinkMyRoleImpl target = getMyRoleForService(mexdao.getCallee());
if (target == null) {
- String errmsg = __msgs.msgMyRoleRoutingFailure(mex.getMessageExchangeId());
+ String errmsg = __msgs.msgMyRoleRoutingFailure(mexdao.getMessageExchangeId());
__log.error(errmsg);
- mex.setFailure(MessageExchange.FailureType.UNKNOWN_ENDPOINT, errmsg, null);
+ mexdao.setFailureType(MessageExchange.FailureType.UNKNOWN_ENDPOINT.toString());
+ mexdao.setFaultExplanation(errmsg);
+ mexdao.setStatus(Status.FAILURE.toString());
return;
}
- getDAO(mex).setProcess(getProcessDAO());
+ mexdao.setProcess(getProcessDAO());
- if (!processInterceptors(mex, InterceptorInvoker.__onProcessInvoked)) {
- __log.debug("Aborting processing of mex " + mex + " due to interceptors.");
- return;
- }
+ // TODO: fix this
+ // if (!processInterceptors(mex, InterceptorInvoker.__onProcessInvoked)) {
+ // __log.debug("Aborting processing of mex " + mex + " due to interceptors.");
+ // return;
+ // }
markused();
- target.invokeMyRole(mex);
- markused();
+ target.invokeMyRole(mexdao);
} finally {
_hydrationLatch.release(1);
}
- // For a one way, once the engine is done, the mex can be safely released.
- if (mex.getMessageExchangePattern().equals(MessageExchange.MessageExchangePattern.REQUEST_ONLY)) {
- mex.release();
- }
- }
-
- private MessageExchangeDAO getDAO(MyRoleMessageExchangeImpl mex) {
-
+ // TODO: relocate this code // For a one way, once the engine is done, the mex can be safely released.
+ // if (mex.getMessageExchangePattern().equals(MessageExchange.MessageExchangePattern.REQUEST_ONLY)) {
+ // mex.release();
+ // }
}
private PartnerLinkMyRoleImpl getMyRoleForService(QName serviceName) {
@@ -291,59 +309,40 @@
* @return <code>true</code> if execution should continue, <code>false</code> otherwise
*/
boolean processInterceptors(MyRoleMessageExchangeImpl mex, InterceptorInvoker invoker) {
- InterceptorContextImpl ictx = new InterceptorContextImpl(_engine._contexts.dao.getConnection(), getProcessDAO(), _pconf);
-
- for (MessageExchangeInterceptor i : _mexInterceptors)
- if (!mex.processInterceptor(i, mex, ictx, invoker))
- return false;
- for (MessageExchangeInterceptor i : getEngine().getGlobalInterceptors())
- if (!mex.processInterceptor(i, mex, ictx, invoker))
- return false;
-
+ // InterceptorContextImpl ictx = new InterceptorContextImpl(_contexts.dao.getConnection(), getProcessDAO(), _pconf);
+ //
+ // for (MessageExchangeInterceptor i : _mexInterceptors)
+ // if (!mex.processInterceptor(i, mex, ictx, invoker))
+ // return false;
+ // for (MessageExchangeInterceptor i : getEngine().getGlobalInterceptors())
+ // if (!mex.processInterceptor(i, mex, ictx, invoker))
+ // return false;
+ //
return true;
}
-
/*
-
- // DONT PUT CODE HERE-need this method real tight in a try/catch block, we need to handle
- // all types of failure here, the scheduler is not going to know how to handle our errors,
- // ALSO we have to release the lock obtained above (IMPORTANT), lest the whole system come
- // to a grinding halt.
- try {
-
- ProcessInstanceDAO instance;
- if (process.isInMemory())
- instance = _contexts.inMemDao.getConnection().getInstance(we.getIID());
- else
- instance = _contexts.dao.getConnection().getInstance(we.getIID());
-
- if (instance == null) {
- __log.error(__msgs.msgScheduledJobReferencesUnknownInstance(we.getIID()));
- // nothing we can do, this instance is not in the database, it will
- // always
- // fail.
- return;
- }
- ProcessDAO processDao = instance.getProcess();
- process = _activeProcesses.get(processDao.getProcessId());
-
- process.handleWorkEvent(we.getDetail());
- debuggingDelay();
- } catch (BpelEngineException bee) {
- __log.error(__msgs.msgScheduledJobFailed(we.getDetail()), bee);
- throw new Scheduler.JobProcessorException(bee, checkRetry(jobInfo, bee));
- } catch (ContextException ce) {
- __log.error(__msgs.msgScheduledJobFailed(we.getDetail()), ce);
- throw new Scheduler.JobProcessorException(ce, checkRetry(jobInfo, ce));
- } catch (RuntimeException rte) {
- __log.error(__msgs.msgScheduledJobFailed(we.getDetail()), rte);
- throw new Scheduler.JobProcessorException(rte, checkRetry(jobInfo, rte));
- } catch (Throwable t) {
- __log.error(__msgs.msgScheduledJobFailed(we.getDetail()), t);
- throw new Scheduler.JobProcessorException(false);
-
-
+ * // DONT PUT CODE HERE-need this method real tight in a try/catch block, we need to handle // all types of failure here, the
+ * scheduler is not going to know how to handle our errors, // ALSO we have to release the lock obtained above (IMPORTANT), lest
+ * the whole system come // to a grinding halt. try {
+ *
+ * ProcessInstanceDAO instance; if (process.isInMemory()) instance =
+ * _contexts.inMemDao.getConnection().getInstance(we.getIID()); else instance =
+ * _contexts.dao.getConnection().getInstance(we.getIID());
+ *
+ * if (instance == null) { __log.error(__msgs.msgScheduledJobReferencesUnknownInstance(we.getIID())); // nothing we can do, this
+ * instance is not in the database, it will // always // fail. return; } ProcessDAO processDao = instance.getProcess(); process =
+ * _activeProcesses.get(processDao.getProcessId());
+ *
+ * process.handleWorkEvent(we.getDetail()); debuggingDelay(); } catch (BpelEngineException bee) {
+ * __log.error(__msgs.msgScheduledJobFailed(we.getDetail()), bee); throw new Scheduler.JobProcessorException(bee,
+ * checkRetry(jobInfo, bee)); } catch (ContextException ce) { __log.error(__msgs.msgScheduledJobFailed(we.getDetail()), ce);
+ * throw new Scheduler.JobProcessorException(ce, checkRetry(jobInfo, ce)); } catch (RuntimeException rte) {
+ * __log.error(__msgs.msgScheduledJobFailed(we.getDetail()), rte); throw new Scheduler.JobProcessorException(rte,
+ * checkRetry(jobInfo, rte)); } catch (Throwable t) { __log.error(__msgs.msgScheduledJobFailed(we.getDetail()), t); throw new
+ * Scheduler.JobProcessorException(false);
+ *
+ *
*/
/**
* @see org.apache.ode.bpel.engine.BpelProcess#handleWorkEvent(java.util.Map<java.lang.String,java.lang.Object>)
@@ -364,9 +363,8 @@
if (__log.isDebugEnabled()) {
__log.debug("InvokeInternal event for mexid " + we.getMexId());
}
- ReliableMyRoleMessageExchangeImpl mex = (ReliableMyRoleMessageExchangeImpl) _engine.getMessageExchange(we
- .getMexId());
- invokeProcess(mex);
+ MessageExchangeDAO mexdao = loadMexDao(we.getMexId());
+ invokeProcess(mexdao);
} else {
// Instance level events
// We lock the instance to prevent concurrent transactions and prevent unnecessary rollbacks,
@@ -390,7 +388,7 @@
__log.debug("Instance " + we.getIID() + " is busy, rescheduling job.");
// TODO: This should really be more of something like the exponential backoff algorithm in ethernet.
_contexts.scheduler.schedulePersistedJob(jobInfo.jobDetail, new Date(System.currentTimeMillis()
- + Math.min(randomExp(1000), 10000)));
+ + _random.nextInt(1000)));
return;
}
@@ -435,6 +433,11 @@
}
}
+ private MessageExchangeDAO loadMexDao(String mexId) {
+ return isInMemory() ? _inMemDao.getConnection().getMessageExchange(mexId) : _contexts.dao.getConnection()
+ .getMessageExchange(mexId);
+ }
+
private void setRoles(OProcess oprocess) {
_partnerRoles = new HashMap<OPartnerLink, PartnerLinkPartnerRoleImpl>();
_myRoles = new HashMap<OPartnerLink, PartnerLinkMyRoleImpl>();
@@ -485,8 +488,7 @@
}
ProcessDAO getProcessDAO() {
- return _pconf.isTransient() ? _engine._contexts.inMemDao.getConnection().getProcess(_pid) : getEngine()._contexts.dao
- .getConnection().getProcess(_pid);
+ return isInMemory() ? _inMemDao.getConnection().getProcess(_pid) : _contexts.dao.getConnection().getProcess(_pid);
}
static String genCorrelatorId(OPartnerLink plink, String opName) {
@@ -539,7 +541,7 @@
void deactivate() {
// Deactivate all the my-role endpoints.
for (Endpoint endpoint : _myEprs.keySet())
- _engine._contexts.bindingContext.deactivateMyRoleEndpoint(endpoint);
+ _contexts.bindingContext.deactivateMyRoleEndpoint(endpoint);
// TODO Deactivate all the partner-role channels
}
@@ -653,10 +655,10 @@
MyRoleMessageExchangeImpl createMyRoleMex(MessageExchangeDAO mexdao) {
InvocationStyle istyle = InvocationStyle.valueOf(mexdao.getInvocationStyle());
-
+
_hydrationLatch.latch(1);
try {
- MyRoleMessageExchangeImpl mex = new ReliableMyRoleMessageExchangeImpl(this, mexdao.getMessageExchangeId());
+ MyRoleMessageExchangeImpl mex = new ReliableMyRoleMessageExchangeImpl(_server, mexdao.getMessageExchangeId());
OPartnerLink plink = (OPartnerLink) _oprocess.getChild(mexdao.getPartnerLinkModelId());
PortType ptype = plink.myRolePortType;
Operation op = plink.getMyRoleOperation(mexdao.getOperation());
@@ -667,7 +669,7 @@
}
}
- PartnerRoleMessageExchangeImpl createPartnerRoleMex(MessageExchangeDAO mexdao) {
+ PartnerRoleMessageExchangeImpl createPartnerRoleMex(MessageExchangeDAO mexdao) {
InvocationStyle istyle = InvocationStyle.valueOf(mexdao.getInvocationStyle());
PartnerRoleMessageExchangeImpl mex;
_hydrationLatch.latch(1);
@@ -677,24 +679,27 @@
Operation op = plink.getPartnerRoleOperation(mexdao.getOperation());
switch (istyle) {
case BLOCKING:
- mex = new BlockingPartnerRoleMessageExchangeImpl(_engine, mexdao.getMessageExchangeId(), ptype, op, isInMemory(), null, /* EPR todo */
- plink.hasMyRole() ? getInitialMyRoleEPR(plink) : null, getPartnerRoleChannel(plink));
+ mex = new BlockingPartnerRoleMessageExchangeImpl(_server, mexdao.getMessageExchangeId(), ptype, op, isInMemory(),
+ null, /* EPR todo */
+ plink.hasMyRole() ? getInitialMyRoleEPR(plink) : null, getPartnerRoleChannel(plink));
break;
case ASYNC:
- mex = new AsyncPartnerRoleMessageExchangeImpl(_engine, mexdao.getMessageExchangeId(), ptype, op, isInMemory(),
+ mex = new AsyncPartnerRoleMessageExchangeImpl(_server, mexdao.getMessageExchangeId(), ptype, op, isInMemory(),
null, /* EPR todo */
plink.hasMyRole() ? getInitialMyRoleEPR(plink) : null, getPartnerRoleChannel(plink));
break;
case TRANSACTED:
- mex = new TransactedPartnerRoleMessageExchangeImpl(_engine, mexdao.getMessageExchangeId(), ptype, op, isInMemory(), null, /* EPR todo */
- plink.hasMyRole() ? getInitialMyRoleEPR(plink) : null, getPartnerRoleChannel(plink));
+ mex = new TransactedPartnerRoleMessageExchangeImpl(_server, mexdao.getMessageExchangeId(), ptype, op, isInMemory(),
+ null, /* EPR todo */
+ plink.hasMyRole() ? getInitialMyRoleEPR(plink) : null, getPartnerRoleChannel(plink));
break;
case RELIABLE:
- mex = new ReliablePartnerRoleMessageExchangeImpl(_engine, mexdao.getMessageExchangeId(), ptype, op, isInMemory(), null, /* EPR todo */
- plink.hasMyRole() ? getInitialMyRoleEPR(plink) : null, getPartnerRoleChannel(plink));
+ mex = new ReliablePartnerRoleMessageExchangeImpl(_server, mexdao.getMessageExchangeId(), ptype, op, isInMemory(),
+ null, /* EPR todo */
+ plink.hasMyRole() ? getInitialMyRoleEPR(plink) : null, getPartnerRoleChannel(plink));
break;
-
+
default:
throw new BpelEngineException("Unexpected InvocationStyle: " + istyle);
@@ -706,6 +711,10 @@
}
+ Set<InvocationStyle> getSupportedInvocationStyle(QName serviceId) {
+ return _invocationStyles;
+ }
+
private Map<Endpoint, PartnerLinkMyRoleImpl> getEndpointToMyRoleMap() {
_hydrationLatch.latch(1);
try {
@@ -724,10 +733,6 @@
}
}
- BpelEngineImpl getEngine() {
- return _engine;
- }
-
public boolean isInMemory() {
return _pconf.isTransient();
}
@@ -853,7 +858,7 @@
if (!_hydratedOnce) {
for (PartnerLinkPartnerRoleImpl prole : _partnerRoles.values()) {
- PartnerRoleChannel channel = _engine._contexts.bindingContext.createPartnerRoleChannel(_pid,
+ PartnerRoleChannel channel = _contexts.bindingContext.createPartnerRoleChannel(_pid,
prole._plinkDef.partnerRolePortType, prole._initialPartner);
prole._channel = channel;
_partnerChannels.put(prole._initialPartner, prole._channel);
@@ -880,16 +885,16 @@
}
if (isInMemory()) {
- bounceProcessDAO(_engine._contexts.inMemDao.getConnection(), _pid, _pconf.getVersion(), _oprocess);
- } else if (_engine._contexts.scheduler.isTransacted()) {
+ bounceProcessDAO(_inMemDao.getConnection(), _pid, _pconf.getVersion(), _oprocess);
+ } else if (_contexts.scheduler.isTransacted()) {
// If we have a transaction, we do this in the current transaction.
- bounceProcessDAO(_engine._contexts.dao.getConnection(), _pid, _pconf.getVersion(), _oprocess);
+ bounceProcessDAO(_contexts.dao.getConnection(), _pid, _pconf.getVersion(), _oprocess);
} else {
// If we do not have a transaction we need to create one.
try {
- _engine._contexts.scheduler.execIsolatedTransaction(new Callable<Object>() {
+ _contexts.scheduler.execIsolatedTransaction(new Callable<Object>() {
public Object call() throws Exception {
- bounceProcessDAO(_engine._contexts.dao.getConnection(), _pid, _pconf.getVersion(), _oprocess);
+ bounceProcessDAO(_contexts.dao.getConnection(), _pid, _pconf.getVersion(), _oprocess);
return null;
}
});
@@ -902,4 +907,13 @@
}
}
+
+ MessageExchangeDAO createMessageExchange(final char dir) {
+ if (isInMemory()) {
+ return _inMemDao.getConnection().createMessageExchange(dir);
+ } else {
+ return _contexts.dao.getConnection().createMessageExchange(dir);
+ }
+ }
+
}
Modified: incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java
URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java?view=diff&rev=552978&r1=552977&r2=552978
==============================================================================
--- incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java (original)
+++ incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java Tue Jul 3 14:00:06 2007
@@ -25,6 +25,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
+import java.util.Random;
import java.util.Set;
import java.util.WeakHashMap;
import java.util.concurrent.Callable;
@@ -93,24 +94,28 @@
/** Maximum age of a process before it is quiesced */
private static Long __processMaxAge;
+ /** RNG, for delays */
+ private Random _random = new Random(System.currentTimeMillis());
+
+ private static double _delayMean = 0;
+
/**
* Set of processes that are registered with the server. Includes hydrated and dehydrated processes. Guarded by
* _mngmtLock.writeLock().
*/
private final HashMap<QName, BpelProcess> _registeredProcesses = new HashMap<QName, BpelProcess>();
- /** Mapping from myrole endpoint name to active process. */
- private final HashMap<Endpoint, BpelProcess> _serviceMap = new HashMap<Endpoint, BpelProcess>();
+ /** Mapping from myrole service name to active process. */
+ private final HashMap<QName, BpelProcess> _serviceMap = new HashMap<QName, BpelProcess>();
private State _state = State.SHUTDOWN;
- private Contexts _contexts = new Contexts();
+ Contexts _contexts = new Contexts();
private DehydrationPolicy _dehydrationPolicy;
private Properties _configProperties;
-
BpelDatabase _db;
/**
@@ -273,9 +278,9 @@
for (Endpoint e : process.getServiceNames()) {
__log.debug("Register process: serviceId=" + e + ", process=" + process);
- _serviceMap.put(e, process);
+ _serviceMap.put(e.serviceName, process);
}
-
+
process.activate(_contexts);
_registeredProcesses.put(process.getPID(), process);
@@ -350,15 +355,12 @@
// TODO: use the message to route to the correct service if more than
// one service is listening on the same endpoint.
- BpelProcess routed = null;
- for (Endpoint endpoint : _serviceMap.keySet()) {
- if (endpoint.serviceName.equals(service))
- routed = _serviceMap.get(endpoint);
- }
- if (__log.isDebugEnabled())
- __log.debug("Routed: svcQname " + service + " --> " + routed);
- return routed;
-
+ _mngmtLock.readLock().lock();
+ try {
+ return _serviceMap.get(service);
+ } finally {
+ _mngmtLock.readLock().unlock();
+ }
}
/**
@@ -446,10 +448,6 @@
_contexts.dao = daoCF;
}
- public void setInMemDaoConnectionFactory(BpelDAOConnectionFactory daoCF) {
- _contexts.inMemDao = daoCF;
- }
-
public void setBindingContext(BindingContext bc) {
_contexts.bindingContext = bc;
}
@@ -467,13 +465,7 @@
Callable<String> createDao = new Callable<String>() {
public String call() throws Exception {
- MessageExchangeDAO dao;
- if (target.isInMemory()) {
- dao = _contexts.inMemDao.getConnection().createMessageExchange(
- MessageExchangeDAO.DIR_PARTNER_INVOKES_MYROLE);
- } else {
- dao = _contexts.dao.getConnection().createMessageExchange(MessageExchangeDAO.DIR_PARTNER_INVOKES_MYROLE);
- }
+ MessageExchangeDAO dao = target.createMessageExchange(MessageExchangeDAO.DIR_PARTNER_INVOKES_MYROLE);
dao.setInvocationStyle(istyle.toString());
dao.setCorrelationId(clientKey);
dao.setCorrelationStatus(CorrelationStatus.UKNOWN_ENDPOINT.toString());
@@ -554,7 +546,7 @@
return null;
ProcessDAO pdao = mexdao.getProcess();
- BpelProcess process = pdao == null ? null : _engine._activeProcesses.get(pdao.getProcessId());
+ BpelProcess process = pdao == null ? null : _registeredProcesses.get(pdao.getProcessId());
if (process == null) {
String errmsg = __msgs.msgProcessNotActive(pdao.getProcessId());
@@ -607,23 +599,73 @@
_mngmtLock.readLock().lock();
try {
-
+ BpelProcess process = _serviceMap.get(serviceId);
+ if (process == null)
+ throw new BpelEngineException("No such service: " + serviceId);
+ return process.getSupportedInvocationStyle(serviceId);
} finally {
_mngmtLock.readLock().unlock();
}
}
-
void registerMessageExchangeStateListener(MessageExchangeStateListener mexStateListener) {
WeakReference<MessageExchangeStateListener> ref = new WeakReference<MessageExchangeStateListener>(mexStateListener);
}
+ OProcess getOProcess(QName processId) {
+ _mngmtLock.readLock().lock();
+ try {
+ BpelProcess process = _registeredProcesses.get(processId);
+
+ if (process == null)
+ return null;
+
+ return process.getOProcess();
+
+ } finally {
+ _mngmtLock.readLock().unlock();
+ }
+ }
+
protected void assertTransaction() {
if (!_contexts.scheduler.isTransacted())
throw new BpelEngineException("Operation must be performed in a transaction!");
}
+ void fireEvent(BpelEvent event) {
+ // Note that the eventListeners list is a copy-on-write array, so need
+ // to mess with synchronization.
+ for (org.apache.ode.bpel.iapi.BpelEventListener l : _contexts.eventListeners) {
+ l.onEvent(event);
+ }
+ }
+
+ /**
+ * Block the thread for random amount of time. Used for testing for races and the like. The delay generated is exponentially
+ * distributed with the mean obtained from the <code>ODE_DEBUG_TX_DELAY</code> environment variable.
+ */
+ private void debuggingDelay() {
+ // Do a delay for debugging purposes.
+ if (_delayMean != 0)
+ try {
+ long delay = randomExp(_delayMean);
+ // distribution
+ // with mean
+ // _delayMean
+ __log.warn("Debugging delay has been activated; delaying transaction for " + delay + "ms.");
+ Thread.sleep(delay);
+ } catch (InterruptedException e) {
+ ; // ignore
+ }
+ }
+
+ private long randomExp(double mean) {
+ double u = _random.nextDouble(); // Uniform
+ long delay = (long) (-Math.log(u) * mean); // Exponential
+ return delay;
+ }
+
private class ProcessDefReaper implements Runnable {
public void run() {
__log.debug("Starting process definition reaper thread.");
@@ -661,6 +703,4 @@
}
}
-
-
}
Modified: incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/Contexts.java
URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/Contexts.java?view=diff&rev=552978&r1=552977&r2=552978
==============================================================================
--- incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/Contexts.java (original)
+++ incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/Contexts.java Tue Jul 3 14:00:06 2007
@@ -45,7 +45,6 @@
BindingContext bindingContext;
BpelDAOConnectionFactory dao;
- BpelDAOConnectionFactory inMemDao;
/** Global Message-Exchange interceptors. Must be copy-on-write!!! */
final List<MessageExchangeInterceptor >globalIntereceptors = new CopyOnWriteArrayList<MessageExchangeInterceptor>();
Modified: incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MessageExchangeImpl.java
URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MessageExchangeImpl.java?view=diff&rev=552978&r1=552977&r2=552978
==============================================================================
--- incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MessageExchangeImpl.java (original)
+++ incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MessageExchangeImpl.java Tue Jul 3 14:00:06 2007
@@ -93,7 +93,7 @@
Contexts _contexts;
- BpelEngineImpl _engine;
+ BpelServerImpl _server;
boolean _associated;
@@ -121,9 +121,9 @@
private Set<String> _propNames;
- public MessageExchangeImpl(BpelEngineImpl engine, String mexId) {
+ public MessageExchangeImpl(BpelServerImpl engine, String mexId) {
_contexts = engine._contexts;
- _engine = engine;
+ _server = engine;
_mexId = mexId;
}
Modified: incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java
URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java?view=diff&rev=552978&r1=552977&r2=552978
==============================================================================
--- incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java (original)
+++ incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java Tue Jul 3 14:00:06 2007
@@ -28,7 +28,7 @@
protected QName _callee;
- public MyRoleMessageExchangeImpl(BpelEngineImpl engine, String mexId) {
+ public MyRoleMessageExchangeImpl(BpelServerImpl engine, String mexId) {
super(engine, mexId);
}
@@ -116,8 +116,8 @@
doInTX(new InDbAction<Void>() {
public Void call(MessageExchangeDAO mexdao) {
- _engine._contexts.scheduler.schedulePersistedJob(we.getDetail(), null);
- _engine._contexts.scheduler.schedulePersistedJob(we1.getDetail(), null);
+ _server._contexts.scheduler.schedulePersistedJob(we.getDetail(), null);
+ _server._contexts.scheduler.schedulePersistedJob(we1.getDetail(), null);
return null;
}
@@ -134,9 +134,9 @@
* @return <code>true</code> if execution should continue, <code>false</code> otherwise
*/
protected boolean processInterceptors(InterceptorInvoker invoker, MessageExchangeDAO mexDao) {
- InterceptorContextImpl ictx = new InterceptorContextImpl(_engine._contexts.dao.getConnection(), mexDao.getProcess(), null);
+ InterceptorContextImpl ictx = new InterceptorContextImpl(_server._contexts.dao.getConnection(), mexDao.getProcess(), null);
- for (MessageExchangeInterceptor i : _engine.getGlobalInterceptors())
+ for (MessageExchangeInterceptor i : _server.getGlobalInterceptors())
if (!processInterceptor(i, this, ictx, invoker))
return false;
Modified: incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ReliableMyRoleMessageExchangeImpl.java
URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ReliableMyRoleMessageExchangeImpl.java?view=diff&rev=552978&r1=552977&r2=552978
==============================================================================
--- incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ReliableMyRoleMessageExchangeImpl.java (original)
+++ incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ReliableMyRoleMessageExchangeImpl.java Tue Jul 3 14:00:06 2007
@@ -57,7 +57,7 @@
public static final int TIMEOUT = 2 * 60 * 1000;
- public ReliableMyRoleMessageExchangeImpl(BpelEngineImpl engine, String mexId) {
+ public ReliableMyRoleMessageExchangeImpl(BpelServerImpl engine, String mexId) {
super(engine, mexId);
// RELIABLE means we are bound to a transaction
@@ -76,7 +76,7 @@
if (_status != Status.NEW)
throw new BpelEngineException("Invalid state: " + _status);
- final BpelProcess target = _engine.route(_callee, _request);
+ final BpelProcess target = _server.route(_callee, _request);
if (target == null) {
if (__log.isWarnEnabled())
__log.warn(__msgs.msgUnknownEPR("" + _epr));
Modified: incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/TransactedMyRoleMessageExchangeImpl.java
URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/TransactedMyRoleMessageExchangeImpl.java?view=diff&rev=552978&r1=552977&r2=552978
==============================================================================
--- incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/TransactedMyRoleMessageExchangeImpl.java (original)
+++ incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/TransactedMyRoleMessageExchangeImpl.java Tue Jul 3 14:00:06 2007
@@ -9,7 +9,7 @@
*/
public class TransactedMyRoleMessageExchangeImpl extends MyRoleMessageExchangeImpl {
- public TransactedMyRoleMessageExchangeImpl(BpelEngineImpl engine, String mexId) {
+ public TransactedMyRoleMessageExchangeImpl(BpelServerImpl engine, String mexId) {
super(engine, mexId);
}
Modified: incubator/ode/branches/bart/jbi/src/main/java/org/apache/ode/jbi/OdeConsumer.java
URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/jbi/src/main/java/org/apache/ode/jbi/OdeConsumer.java?view=diff&rev=552978&r1=552977&r2=552978
==============================================================================
--- incubator/ode/branches/bart/jbi/src/main/java/org/apache/ode/jbi/OdeConsumer.java (original)
+++ incubator/ode/branches/bart/jbi/src/main/java/org/apache/ode/jbi/OdeConsumer.java Tue Jul 3 14:00:06 2007
@@ -1,4 +1,4 @@
-/*
+at/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information