You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ode.apache.org by mr...@apache.org on 2006/12/08 18:08:50 UTC

svn commit: r484662 - in /incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine: BpelEngineImpl.java BpelProcess.java BpelServerImpl.java

Author: mriou
Date: Fri Dec  8 09:08:49 2006
New Revision: 484662

URL: http://svn.apache.org/viewvc?view=rev&rev=484662
Log:
Implemented a simple process definition reaping thread. The idea is to disassociate the process definition from the BpelProcess instance when it hasn't been used for a certain time. The objective is to reduce the memory footprint of the engine when a very large number of processes are deployed in ODE.

Modified:
    incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java
    incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
    incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java

Modified: incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java
URL: http://svn.apache.org/viewvc/incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java?view=diff&rev=484662&r1=484661&r2=484662
==============================================================================
--- incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java (original)
+++ incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java Fri Dec  8 09:08:49 2006
@@ -50,9 +50,11 @@
     private static final Log __log = LogFactory.getLog(BpelEngineImpl.class);
     /** RNG, for delays */
     private Random _random = new Random(System.currentTimeMillis());
-    
+
     private static double _delayMean = 0;
-    
+    // Reaping
+    private static long _processMaxAge = 60*60*1000;
+
     static {
         try {
             String delay = System.getenv("ODE_DEBUG_TX_DELAY");
@@ -67,14 +69,29 @@
                 __log.info("Could not read ODE_DEBUG_TX_DELAY environment variable; assuming 0 (mean) delay");
             }
         }
+        // TODO Clean this up and factorize engine configuration
+        try {
+            String processMaxAge = System.getenv("ODE_DEF_MAX_AGE");
+            if (processMaxAge != null && processMaxAge.length() > 0) {
+                _processMaxAge = Long.valueOf(processMaxAge);
+                __log.info("Process definition max age adjusted. Max age = " + _processMaxAge + "ms.");
+            }
+        } catch (Throwable t) {
+            if (__log.isDebugEnabled()) {
+                __log.debug("Could not read ODE_DEF_MAX_AGE environment variable; assuming " + _processMaxAge + " delay", t);
+            } else {
+                __log.info("Could not read ODE_DEF_MAX_AGE environment variable; assuming " + _processMaxAge + " delay");
+            }
+        }
     }
 
-
     private static final Messages __msgs = MessageBundle.getMessages(Messages.class);
 
     /** Active processes, keyed by process id. */
     final HashMap<QName, BpelProcess> _activeProcesses = new HashMap<QName, BpelProcess>();
 
+    final HashMap<QName, Long> _processesLRU = new HashMap<QName, Long>();
+
     /** Mapping from myrole endpoint name to active process. */
     private final HashMap<Endpoint, BpelProcess> _serviceMap = new HashMap<Endpoint, BpelProcess>();
 
@@ -82,6 +99,7 @@
 
     public BpelEngineImpl(Contexts contexts) {
         _contexts = contexts;
+        new Thread(new ProcessDefReaper()).start();
     }
 
     public MyRoleMessageExchange createMessageExchange(String clientKey, QName targetService, String operation)
@@ -103,8 +121,10 @@
         dao.setOperation(operation);
         MyRoleMessageExchangeImpl mex = new MyRoleMessageExchangeImpl(this, dao);
 
-        if (target != null)
+        if (target != null) {
             target.initMyRoleMex(mex);
+            refreshLRU(target._pid);
+        }
 
         return mex;
     }
@@ -119,36 +139,37 @@
 
         MessageExchangeImpl mex;
         switch (mexdao.getDirection()) {
-        case MessageExchangeDAO.DIR_BPEL_INVOKES_PARTNERROLE:
-            if (process == null) {
-                String errmsg = __msgs.msgProcessNotActive(pdao.getProcessId());
-                __log.error(errmsg);
-                // TODO: Perhaps we should define a checked exception for this
-                // condition.
+            case MessageExchangeDAO.DIR_BPEL_INVOKES_PARTNERROLE:
+                if (process == null) {
+                    String errmsg = __msgs.msgProcessNotActive(pdao.getProcessId());
+                    __log.error(errmsg);
+                    // TODO: Perhaps we should define a checked exception for this
+                    // condition.
+                    throw new BpelEngineException(errmsg);
+                } else {
+                    refreshLRU(process._pid);
+                    OPartnerLink plink = (OPartnerLink) process._oprocess.getChild(mexdao.getPartnerLinkModelId());
+                    PortType ptype = plink.partnerRolePortType;
+                    Operation op = plink.getPartnerRoleOperation(mexdao.getOperation());
+                    // TODO: recover Partner's EPR
+                    mex = new PartnerRoleMessageExchangeImpl(this, mexdao, ptype, op, null,
+                            plink.hasMyRole() ? process.getInitialMyRoleEPR(plink) : null,
+                            process.getPartnerRoleChannel(plink));
+                }
+                break;
+            case MessageExchangeDAO.DIR_PARTNER_INVOKES_MYROLE:
+                mex = new MyRoleMessageExchangeImpl(this, mexdao);
+                if (process != null) {
+                    OPartnerLink plink = (OPartnerLink) process._oprocess.getChild(mexdao.getPartnerLinkModelId());
+                    PortType ptype = plink.myRolePortType;
+                    Operation op = plink.getMyRoleOperation(mexdao.getOperation());
+                    mex.setPortOp(ptype, op);
+                }
+                break;
+            default:
+                String errmsg = "BpelEngineImpl: internal error, invalid MexDAO direction: " + mexId;
+                __log.fatal(errmsg);
                 throw new BpelEngineException(errmsg);
-            } else {
-                OPartnerLink plink = (OPartnerLink) process._oprocess.getChild(mexdao.getPartnerLinkModelId());
-                PortType ptype = plink.partnerRolePortType;
-                Operation op = plink.getPartnerRoleOperation(mexdao.getOperation());
-                // TODO: recover Partner's EPR
-                mex = new PartnerRoleMessageExchangeImpl(this, mexdao, ptype, op, null,
-                        plink.hasMyRole() ? process.getInitialMyRoleEPR(plink) : null,
-                        process.getPartnerRoleChannel(plink));
-            }
-            break;
-        case MessageExchangeDAO.DIR_PARTNER_INVOKES_MYROLE:
-            mex = new MyRoleMessageExchangeImpl(this, mexdao);
-            if (process != null) {
-                OPartnerLink plink = (OPartnerLink) process._oprocess.getChild(mexdao.getPartnerLinkModelId());
-                PortType ptype = plink.myRolePortType;
-                Operation op = plink.getMyRoleOperation(mexdao.getOperation());
-                mex.setPortOp(ptype, op);
-            }
-            break;
-        default:
-            String errmsg = "BpelEngineImpl: internal error, invalid MexDAO direction: " + mexId;
-            __log.fatal(errmsg);
-            throw new BpelEngineException(errmsg);
         }
 
         return mex;
@@ -249,18 +270,19 @@
         }
 
         assert process != null;
+        refreshLRU(process._pid);
         process.handleWorkEvent(jobDetail);
-        
+
         // Do a delay for debugging purposes.
         if (_delayMean != 0 )
-        try {
-            double u = _random.nextDouble();  // Uniform
-            long delay  = (long)(-Math.log(u)*_delayMean); // Exponential distribution with mean _delayMean
-            __log.warn("Debugging delay has been activated; delaying transaction for " + delay + "ms." );
-            Thread.sleep(delay);
-        } catch (InterruptedException e) {
-            ; // ignore
-        } 
+            try {
+                double u = _random.nextDouble();  // Uniform
+                long delay  = (long)(-Math.log(u)*_delayMean); // Exponential distribution with mean _delayMean
+                __log.warn("Debugging delay has been activated; delaying transaction for " + delay + "ms." );
+                Thread.sleep(delay);
+            } catch (InterruptedException e) {
+                ; // ignore
+            }
 
     }
 
@@ -279,11 +301,39 @@
 
     /**
      * Get the list of globally-registered message-exchange interceptors.
-     * 
-     * @return
+     * @return list
      */
     List<MessageExchangeInterceptor> getGlobalInterceptors() {
         return _contexts.globalIntereceptors;
     }
 
+    void refreshLRU(QName pid) {
+        synchronized(_processesLRU) {
+            _processesLRU.put(pid, System.currentTimeMillis());
+        }
+    }
+
+    private class ProcessDefReaper implements Runnable {
+        public void run() {
+            try {
+                while (true) {
+                    Thread.sleep(10000);
+                    for (BpelProcess process : _activeProcesses.values()) {
+                        Long lru;
+                        synchronized(_processesLRU) {
+                            lru = _processesLRU.get(process._pid);
+                        }
+                        if (lru != null && process._oprocess != null
+                                && System.currentTimeMillis() - lru > _processMaxAge) {
+                            process._oprocess = null;
+                            __log.debug("Process definition reaper cleaning " + process._pid);
+                        }
+                        Thread.sleep(10);
+                    }
+                }
+            } catch (InterruptedException e) {
+                __log.info(e);
+            }
+        }
+    }
 }

Modified: incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
URL: http://svn.apache.org/viewvc/incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java?view=diff&rev=484662&r1=484661&r2=484662
==============================================================================
--- incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java (original)
+++ incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java Fri Dec  8 09:08:49 2006
@@ -79,7 +79,7 @@
 
     DebuggerSupport _debugger;
 
-    final OProcess _oprocess;
+    OProcess _oprocess;
 
     final ExpressionLanguageRuntimeRegistry _expLangRuntimeRegistry;
 
@@ -202,6 +202,7 @@
     }
 
     void initMyRoleMex(MyRoleMessageExchangeImpl mex) {
+        reload();
         PartnerLinkMyRoleImpl target = null;
         for (Endpoint endpoint : _endpointToMyRoleMap.keySet()) {
             if (endpoint.serviceName.equals(mex.getServiceName()))
@@ -640,6 +641,7 @@
      * @see org.apache.ode.bpel.engine.BpelProcess#handleWorkEvent(java.util.Map<java.lang.String,java.lang.Object>)
      */
     public void handleWorkEvent(Map<String, Object> jobData) {
+        reload();
         ProcessInstanceDAO procInstance;
 
         if (__log.isDebugEnabled()) {
@@ -790,5 +792,18 @@
 
     public boolean isInMemory() {
         return _pconf.isTransient();
+    }
+
+    private void reload() {
+        // Reload OProcess if it has been disposed
+        if (_oprocess == null) {
+            try {
+                _oprocess = BpelServerImpl.deserializeCompiledProcess(_pconf.getCBPInputStream());
+            } catch (Exception e) {
+                String errmsg = __msgs.msgProcessLoadError(_pconf.getProcessId());
+                __log.error(errmsg, e);
+                throw new BpelEngineException(errmsg, e);
+            }
+        }
     }
 }

Modified: incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java
URL: http://svn.apache.org/viewvc/incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java?view=diff&rev=484662&r1=484661&r2=484662
==============================================================================
--- incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java (original)
+++ incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java Fri Dec  8 09:08:49 2006
@@ -18,12 +18,6 @@
  */
 package org.apache.ode.bpel.engine;
 
-import java.io.InputStream;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import javax.xml.namespace.QName;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.ode.bpel.dao.BpelDAOConnection;
@@ -31,15 +25,8 @@
 import org.apache.ode.bpel.dao.ProcessDAO;
 import org.apache.ode.bpel.evt.BpelEvent;
 import org.apache.ode.bpel.explang.ConfigurationException;
-import org.apache.ode.bpel.iapi.BindingContext;
-import org.apache.ode.bpel.iapi.BpelEngine;
-import org.apache.ode.bpel.iapi.BpelEngineException;
+import org.apache.ode.bpel.iapi.*;
 import org.apache.ode.bpel.iapi.BpelEventListener;
-import org.apache.ode.bpel.iapi.BpelServer;
-import org.apache.ode.bpel.iapi.EndpointReferenceContext;
-import org.apache.ode.bpel.iapi.MessageExchangeContext;
-import org.apache.ode.bpel.iapi.ProcessConf;
-import org.apache.ode.bpel.iapi.Scheduler;
 import org.apache.ode.bpel.intercept.MessageExchangeInterceptor;
 import org.apache.ode.bpel.o.OExpressionLanguage;
 import org.apache.ode.bpel.o.OProcess;
@@ -47,6 +34,11 @@
 import org.apache.ode.bpel.runtime.ExpressionLanguageRuntimeRegistry;
 import org.apache.ode.utils.msg.MessageBundle;
 
+import javax.xml.namespace.QName;
+import java.io.InputStream;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
 /**
  * <p>
  * The BPEL server implementation. 
@@ -393,7 +385,7 @@
      *            input stream
      * @return process information from configuration database
      */
-    private OProcess deserializeCompiledProcess(InputStream is) throws Exception {
+    static OProcess deserializeCompiledProcess(InputStream is) throws Exception {
 
         OProcess compiledProcess;
         Serializer ofh = new Serializer(is);