You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ode.apache.org by mr...@apache.org on 2006/12/08 18:08:50 UTC
svn commit: r484662 - in
/incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine:
BpelEngineImpl.java BpelProcess.java BpelServerImpl.java
Author: mriou
Date: Fri Dec 8 09:08:49 2006
New Revision: 484662
URL: http://svn.apache.org/viewvc?view=rev&rev=484662
Log:
Implemented a simple process definition reaping thread. The idea is to disassociate the process definition from the BpelProcess instance when it hasn't been used for a certain time. The objective is to reduce the memory footprint of the engine when a very large number of processes is deployed in ODE.
Modified:
incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java
incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java
Modified: incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java
URL: http://svn.apache.org/viewvc/incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java?view=diff&rev=484662&r1=484661&r2=484662
==============================================================================
--- incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java (original)
+++ incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java Fri Dec 8 09:08:49 2006
@@ -50,9 +50,11 @@
private static final Log __log = LogFactory.getLog(BpelEngineImpl.class);
/** RNG, for delays */
private Random _random = new Random(System.currentTimeMillis());
-
+
private static double _delayMean = 0;
-
+ // Reaping
+ private static long _processMaxAge = 60*60*1000;
+
static {
try {
String delay = System.getenv("ODE_DEBUG_TX_DELAY");
@@ -67,14 +69,29 @@
__log.info("Could not read ODE_DEBUG_TX_DELAY environment variable; assuming 0 (mean) delay");
}
}
+ // TODO Clean this up and factorize engine configuration
+ try {
+ String processMaxAge = System.getenv("ODE_DEF_MAX_AGE");
+ if (processMaxAge != null && processMaxAge.length() > 0) {
+ _processMaxAge = Long.valueOf(processMaxAge);
+ __log.info("Process definition max age adjusted. Max age = " + _processMaxAge + "ms.");
+ }
+ } catch (Throwable t) {
+ if (__log.isDebugEnabled()) {
+ __log.debug("Could not read ODE_DEF_MAX_AGE environment variable; assuming " + _processMaxAge + " delay", t);
+ } else {
+ __log.info("Could not read ODE_DEF_MAX_AGE environment variable; assuming " + _processMaxAge + " delay");
+ }
+ }
}
-
private static final Messages __msgs = MessageBundle.getMessages(Messages.class);
/** Active processes, keyed by process id. */
final HashMap<QName, BpelProcess> _activeProcesses = new HashMap<QName, BpelProcess>();
+ final HashMap<QName, Long> _processesLRU = new HashMap<QName, Long>();
+
/** Mapping from myrole endpoint name to active process. */
private final HashMap<Endpoint, BpelProcess> _serviceMap = new HashMap<Endpoint, BpelProcess>();
@@ -82,6 +99,7 @@
public BpelEngineImpl(Contexts contexts) {
_contexts = contexts;
+ new Thread(new ProcessDefReaper()).start();
}
public MyRoleMessageExchange createMessageExchange(String clientKey, QName targetService, String operation)
@@ -103,8 +121,10 @@
dao.setOperation(operation);
MyRoleMessageExchangeImpl mex = new MyRoleMessageExchangeImpl(this, dao);
- if (target != null)
+ if (target != null) {
target.initMyRoleMex(mex);
+ refreshLRU(target._pid);
+ }
return mex;
}
@@ -119,36 +139,37 @@
MessageExchangeImpl mex;
switch (mexdao.getDirection()) {
- case MessageExchangeDAO.DIR_BPEL_INVOKES_PARTNERROLE:
- if (process == null) {
- String errmsg = __msgs.msgProcessNotActive(pdao.getProcessId());
- __log.error(errmsg);
- // TODO: Perhaps we should define a checked exception for this
- // condition.
+ case MessageExchangeDAO.DIR_BPEL_INVOKES_PARTNERROLE:
+ if (process == null) {
+ String errmsg = __msgs.msgProcessNotActive(pdao.getProcessId());
+ __log.error(errmsg);
+ // TODO: Perhaps we should define a checked exception for this
+ // condition.
+ throw new BpelEngineException(errmsg);
+ } else {
+ refreshLRU(process._pid);
+ OPartnerLink plink = (OPartnerLink) process._oprocess.getChild(mexdao.getPartnerLinkModelId());
+ PortType ptype = plink.partnerRolePortType;
+ Operation op = plink.getPartnerRoleOperation(mexdao.getOperation());
+ // TODO: recover Partner's EPR
+ mex = new PartnerRoleMessageExchangeImpl(this, mexdao, ptype, op, null,
+ plink.hasMyRole() ? process.getInitialMyRoleEPR(plink) : null,
+ process.getPartnerRoleChannel(plink));
+ }
+ break;
+ case MessageExchangeDAO.DIR_PARTNER_INVOKES_MYROLE:
+ mex = new MyRoleMessageExchangeImpl(this, mexdao);
+ if (process != null) {
+ OPartnerLink plink = (OPartnerLink) process._oprocess.getChild(mexdao.getPartnerLinkModelId());
+ PortType ptype = plink.myRolePortType;
+ Operation op = plink.getMyRoleOperation(mexdao.getOperation());
+ mex.setPortOp(ptype, op);
+ }
+ break;
+ default:
+ String errmsg = "BpelEngineImpl: internal error, invalid MexDAO direction: " + mexId;
+ __log.fatal(errmsg);
throw new BpelEngineException(errmsg);
- } else {
- OPartnerLink plink = (OPartnerLink) process._oprocess.getChild(mexdao.getPartnerLinkModelId());
- PortType ptype = plink.partnerRolePortType;
- Operation op = plink.getPartnerRoleOperation(mexdao.getOperation());
- // TODO: recover Partner's EPR
- mex = new PartnerRoleMessageExchangeImpl(this, mexdao, ptype, op, null,
- plink.hasMyRole() ? process.getInitialMyRoleEPR(plink) : null,
- process.getPartnerRoleChannel(plink));
- }
- break;
- case MessageExchangeDAO.DIR_PARTNER_INVOKES_MYROLE:
- mex = new MyRoleMessageExchangeImpl(this, mexdao);
- if (process != null) {
- OPartnerLink plink = (OPartnerLink) process._oprocess.getChild(mexdao.getPartnerLinkModelId());
- PortType ptype = plink.myRolePortType;
- Operation op = plink.getMyRoleOperation(mexdao.getOperation());
- mex.setPortOp(ptype, op);
- }
- break;
- default:
- String errmsg = "BpelEngineImpl: internal error, invalid MexDAO direction: " + mexId;
- __log.fatal(errmsg);
- throw new BpelEngineException(errmsg);
}
return mex;
@@ -249,18 +270,19 @@
}
assert process != null;
+ refreshLRU(process._pid);
process.handleWorkEvent(jobDetail);
-
+
// Do a delay for debugging purposes.
if (_delayMean != 0 )
- try {
- double u = _random.nextDouble(); // Uniform
- long delay = (long)(-Math.log(u)*_delayMean); // Exponential distribution with mean _delayMean
- __log.warn("Debugging delay has been activated; delaying transaction for " + delay + "ms." );
- Thread.sleep(delay);
- } catch (InterruptedException e) {
- ; // ignore
- }
+ try {
+ double u = _random.nextDouble(); // Uniform
+ long delay = (long)(-Math.log(u)*_delayMean); // Exponential distribution with mean _delayMean
+ __log.warn("Debugging delay has been activated; delaying transaction for " + delay + "ms." );
+ Thread.sleep(delay);
+ } catch (InterruptedException e) {
+ ; // ignore
+ }
}
@@ -279,11 +301,39 @@
/**
* Get the list of globally-registered message-exchange interceptors.
- *
- * @return
+ * @return list
*/
List<MessageExchangeInterceptor> getGlobalInterceptors() {
return _contexts.globalIntereceptors;
}
+ void refreshLRU(QName pid) {
+ synchronized(_processesLRU) {
+ _processesLRU.put(pid, System.currentTimeMillis());
+ }
+ }
+
+ private class ProcessDefReaper implements Runnable {
+ public void run() {
+ try {
+ while (true) {
+ Thread.sleep(10000);
+ for (BpelProcess process : _activeProcesses.values()) {
+ Long lru;
+ synchronized(_processesLRU) {
+ lru = _processesLRU.get(process._pid);
+ }
+ if (lru != null && process._oprocess != null
+ && System.currentTimeMillis() - lru > _processMaxAge) {
+ process._oprocess = null;
+ __log.debug("Process definition reaper cleaning " + process._pid);
+ }
+ Thread.sleep(10);
+ }
+ }
+ } catch (InterruptedException e) {
+ __log.info(e);
+ }
+ }
+ }
}
Modified: incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
URL: http://svn.apache.org/viewvc/incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java?view=diff&rev=484662&r1=484661&r2=484662
==============================================================================
--- incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java (original)
+++ incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java Fri Dec 8 09:08:49 2006
@@ -79,7 +79,7 @@
DebuggerSupport _debugger;
- final OProcess _oprocess;
+ OProcess _oprocess;
final ExpressionLanguageRuntimeRegistry _expLangRuntimeRegistry;
@@ -202,6 +202,7 @@
}
void initMyRoleMex(MyRoleMessageExchangeImpl mex) {
+ reload();
PartnerLinkMyRoleImpl target = null;
for (Endpoint endpoint : _endpointToMyRoleMap.keySet()) {
if (endpoint.serviceName.equals(mex.getServiceName()))
@@ -640,6 +641,7 @@
* @see org.apache.ode.bpel.engine.BpelProcess#handleWorkEvent(java.util.Map<java.lang.String,java.lang.Object>)
*/
public void handleWorkEvent(Map<String, Object> jobData) {
+ reload();
ProcessInstanceDAO procInstance;
if (__log.isDebugEnabled()) {
@@ -790,5 +792,18 @@
public boolean isInMemory() {
return _pconf.isTransient();
+ }
+
+ private void reload() {
+ // Reload OProcess if it has been disposed
+ if (_oprocess == null) {
+ try {
+ _oprocess = BpelServerImpl.deserializeCompiledProcess(_pconf.getCBPInputStream());
+ } catch (Exception e) {
+ String errmsg = __msgs.msgProcessLoadError(_pconf.getProcessId());
+ __log.error(errmsg, e);
+ throw new BpelEngineException(errmsg, e);
+ }
+ }
}
}
Modified: incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java
URL: http://svn.apache.org/viewvc/incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java?view=diff&rev=484662&r1=484661&r2=484662
==============================================================================
--- incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java (original)
+++ incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java Fri Dec 8 09:08:49 2006
@@ -18,12 +18,6 @@
*/
package org.apache.ode.bpel.engine;
-import java.io.InputStream;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import javax.xml.namespace.QName;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.ode.bpel.dao.BpelDAOConnection;
@@ -31,15 +25,8 @@
import org.apache.ode.bpel.dao.ProcessDAO;
import org.apache.ode.bpel.evt.BpelEvent;
import org.apache.ode.bpel.explang.ConfigurationException;
-import org.apache.ode.bpel.iapi.BindingContext;
-import org.apache.ode.bpel.iapi.BpelEngine;
-import org.apache.ode.bpel.iapi.BpelEngineException;
+import org.apache.ode.bpel.iapi.*;
import org.apache.ode.bpel.iapi.BpelEventListener;
-import org.apache.ode.bpel.iapi.BpelServer;
-import org.apache.ode.bpel.iapi.EndpointReferenceContext;
-import org.apache.ode.bpel.iapi.MessageExchangeContext;
-import org.apache.ode.bpel.iapi.ProcessConf;
-import org.apache.ode.bpel.iapi.Scheduler;
import org.apache.ode.bpel.intercept.MessageExchangeInterceptor;
import org.apache.ode.bpel.o.OExpressionLanguage;
import org.apache.ode.bpel.o.OProcess;
@@ -47,6 +34,11 @@
import org.apache.ode.bpel.runtime.ExpressionLanguageRuntimeRegistry;
import org.apache.ode.utils.msg.MessageBundle;
+import javax.xml.namespace.QName;
+import java.io.InputStream;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
/**
* <p>
* The BPEL server implementation.
@@ -393,7 +385,7 @@
* input stream
* @return process information from configuration database
*/
- private OProcess deserializeCompiledProcess(InputStream is) throws Exception {
+ static OProcess deserializeCompiledProcess(InputStream is) throws Exception {
OProcess compiledProcess;
Serializer ofh = new Serializer(is);