You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ode.apache.org by mr...@apache.org on 2007/02/06 23:17:57 UTC

svn commit: r504332 [2/2] - in /incubator/ode/trunk: axis2/src/main/java/org/apache/ode/axis2/ bpel-api/src/main/java/org/apache/ode/bpel/iapi/ bpel-compiler/src/main/java/org/apache/ode/bpel/compiler/ bpel-obj/src/main/java/org/apache/ode/bpel/o/ bpel...

Modified: incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java
URL: http://svn.apache.org/viewvc/incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java?view=diff&rev=504332&r1=504331&r2=504332
==============================================================================
--- incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java (original)
+++ incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java Tue Feb  6 14:17:56 2007
@@ -24,21 +24,19 @@
 import org.apache.ode.bpel.dao.BpelDAOConnectionFactory;
 import org.apache.ode.bpel.dao.ProcessDAO;
 import org.apache.ode.bpel.evt.BpelEvent;
-import org.apache.ode.bpel.explang.ConfigurationException;
 import org.apache.ode.bpel.iapi.*;
 import org.apache.ode.bpel.iapi.BpelEventListener;
 import org.apache.ode.bpel.iapi.Scheduler.JobInfo;
 import org.apache.ode.bpel.iapi.Scheduler.JobProcessorException;
 import org.apache.ode.bpel.iapi.Scheduler.Synchronizer;
 import org.apache.ode.bpel.intercept.MessageExchangeInterceptor;
-import org.apache.ode.bpel.o.OExpressionLanguage;
 import org.apache.ode.bpel.o.OProcess;
-import org.apache.ode.bpel.o.Serializer;
-import org.apache.ode.bpel.runtime.ExpressionLanguageRuntimeRegistry;
 import org.apache.ode.utils.msg.MessageBundle;
 
 import javax.xml.namespace.QName;
-import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
@@ -46,7 +44,7 @@
  * <p>
  * The BPEL server implementation.
  * </p>
- * 
+ *
  * <p>
  * This implementation is intended to be thread safe. The key concurrency
  * mechanism is a "management" read/write lock that synchronizes all management
@@ -55,55 +53,55 @@
  * to the lock is scoped to the method, while read access is scoped to a
  * transaction.
  * </p>
- * 
+ *
  * @author Maciej Szefler <mszefler at gmail dot com>
- * @author mriou <mriou at apache dot org>
+ * @author Matthieu Riou <mriou at apache dot org>
  */
-public class BpelServerImpl implements BpelServer, Scheduler.JobProcessor {
+public class BpelServerImpl implements BpelServer, Scheduler.JobProcessor, ProcessLifecycleCallback {
 
     private static final Log __log = LogFactory.getLog(BpelServerImpl.class);
-
     private static final Messages __msgs = MessageBundle.getMessages(Messages.class);
 
     /** Maximum age of a process before it is quiesced */
     private static Long __processMaxAge;
 
+    private final List<BpelProcess> _runningProcesses =
+            Collections.synchronizedList(new ArrayList<BpelProcess>());
+
+    private State _state = State.SHUTDOWN;
+    private Contexts _contexts = new Contexts();
+    private DehydrationPolicy _dehydrationPolicy;
+    BpelEngineImpl _engine;
+    BpelDatabase _db;
+
+    /**
+     * Management lock for synchronizing management operations and preventing
+     * processing (transactions) from occuring while management operations are
+     * in progress.
+     */
+    private ReadWriteLock _mngmtLock = new ReentrantReadWriteLock();
+
     static {
         // TODO Clean this up and factorize engine configuration
         try {
-            String processMaxAge = System.getenv("ODE_DEF_MAX_AGE");
+            String processMaxAge = System.getProperty("ode.process.maxage");
             if (processMaxAge != null && processMaxAge.length() > 0) {
                 __processMaxAge = Long.valueOf(processMaxAge);
                 __log.info("Process definition max age adjusted. Max age = " + __processMaxAge + "ms.");
             }
         } catch (Throwable t) {
             if (__log.isDebugEnabled()) {
-                __log.debug("Could not parse ODE_DEF_MAX_AGE environment variable.", t);
+                __log.debug("Could not parse ode.process.maxage environment variable.", t);
             } else {
-                __log.info("Could not parse ODE_DEF_MAX_AGE environment variable; reaping disabled.");
+                __log.info("Could not parse ode.process.maxage environment variable; reaping disabled.");
             }
         }
     }
 
-    /**
-     * Management lock for synchronizing management operations and preventing
-     * processing (transactions) from occuring while management operations are
-     * in progress.
-     */
-    private ReadWriteLock _mngmtLock = new ReentrantReadWriteLock();
-
     private enum State {
         SHUTDOWN, INIT, RUNNING
     }
 
-    private State _state = State.SHUTDOWN;
-
-    private Contexts _contexts = new Contexts();
-
-    BpelEngineImpl _engine;
-
-    BpelDatabase _db;
-
     public void start() {
         _mngmtLock.writeLock().lock();
         try {
@@ -117,6 +115,7 @@
             _contexts.scheduler.start();
             _state = State.RUNNING;
             __log.info(__msgs.msgServerStarted());
+            if (_dehydrationPolicy != null) new Thread(new ProcessDefReaper()).start();
         } finally {
             _mngmtLock.writeLock().unlock();
         }
@@ -125,7 +124,6 @@
     /**
      * Register a global listener to receive {@link BpelEvent}s froom all
      * processes.
-     * 
      * @param listener
      */
     public void registerBpelEventListener(BpelEventListener listener) {
@@ -136,7 +134,6 @@
     /**
      * Unregister a global listener from receive {@link BpelEvent}s from all
      * processes.
-     * 
      * @param listener
      */
     public void unregisterBpelEventListener(BpelEventListener listener) {
@@ -163,37 +160,6 @@
         }
     }
 
-    public void setMessageExchangeContext(MessageExchangeContext mexContext) throws BpelEngineException {
-        _contexts.mexContext = mexContext;
-    }
-
-    public void setScheduler(Scheduler scheduler) throws BpelEngineException {
-        _contexts.scheduler = scheduler;
-    }
-
-    public void setEndpointReferenceContext(EndpointReferenceContext eprContext) throws BpelEngineException {
-        _contexts.eprContext = eprContext;
-    }
-
-    /**
-     * Set the DAO connection factory. The DAO is used by the BPEL engine to
-     * persist information about active processes.
-     * 
-     * @param daoCF
-     *            {@link BpelDAOConnectionFactory} implementation.
-     */
-    public void setDaoConnectionFactory(BpelDAOConnectionFactory daoCF) throws BpelEngineException {
-        _contexts.dao = daoCF;
-    }
-
-    public void setInMemDaoConnectionFactory(BpelDAOConnectionFactory daoCF) {
-        _contexts.inMemDao = daoCF;
-    }
-
-    public void setBindingContext(BindingContext bc) {
-        _contexts.bindingContext = bc;
-    }
-
     public void init() throws BpelEngineException {
         _mngmtLock.writeLock().lock();
         try {
@@ -226,7 +192,6 @@
     }
 
     public BpelEngine getEngine() {
-
         boolean registered = false;
         _mngmtLock.readLock().lock();
         try {
@@ -237,7 +202,6 @@
                 public void beforeCompletion() {
                     _mngmtLock.readLock().unlock();
                 }
-
             });
             registered = true;
         } finally {
@@ -255,16 +219,6 @@
 
         __log.debug("register: " + conf.getProcessId());
 
-        // Load the compiled process.
-        OProcess compiledProcess;
-        try {
-            compiledProcess = deserializeCompiledProcess(conf.getCBPInputStream());
-        } catch (Exception e) {
-            String errmsg = __msgs.msgProcessLoadError(conf.getProcessId());
-            __log.error(errmsg, e);
-            throw new BpelEngineException(errmsg, e);
-        }
-
         // Ok, IO out of the way, we will mod the server state, so need to get a
         // lock.
         try {
@@ -283,24 +237,10 @@
 
             __log.debug("Registering process " + conf.getProcessId() + " with server.");
 
-            // Create an expression language registry for this process
-            ExpressionLanguageRuntimeRegistry elangRegistry = new ExpressionLanguageRuntimeRegistry();
-            for (OExpressionLanguage elang : compiledProcess.expressionLanguages) {
-                try {
-                    elangRegistry.registerRuntime(elang);
-                } catch (ConfigurationException e) {
-                    String msg = __msgs.msgExpLangRegistrationError(elang.expressionLanguageUri, elang.properties);
-                    __log.error(msg, e);
-                    throw new BpelEngineException(msg, e);
-                }
-            }
-
-            // Create the processDAO if necessary.
-            createProcessDAO(conf.getProcessId(), conf.getVersion(), compiledProcess);
-
-            BpelProcess process = new BpelProcess(conf, compiledProcess, null, elangRegistry);
+            BpelProcess process = new BpelProcess(conf, null, this);
 
             _engine.registerProcess(process);
+            _runningProcesses.add(process);
 
             __log.info(__msgs.msgProcessRegistered(conf.getProcessId()));
         } finally {
@@ -320,8 +260,11 @@
         }
 
         try {
-            if (_engine != null)
+            BpelProcess p = null;
+            if (_engine != null) {
                 _engine.unregisterProcess(pid);
+                _runningProcesses.remove(p);
+            }
 
             __log.info(__msgs.msgProcessUnregistered(pid));
 
@@ -337,76 +280,38 @@
      * If necessary, create an object in the data store to represent the
      * process. We'll re-use an existing object if it already exists and matches
      * the GUID.
-     * 
-     * @param pid
-     * @param oprocess
      */
-    private void createProcessDAO(final QName pid, final long version, final OProcess oprocess) {
+    private void bounceProcessDAO(BpelDAOConnection conn, final QName pid,
+                                  final long version, final OProcess oprocess) {
         __log.debug("Creating process DAO for " + pid + " (guid=" + oprocess.guid + ")");
         try {
-            boolean create = _db.exec(new BpelDatabase.Callable<Boolean>() {
-                public Boolean run(BpelDAOConnection conn) throws Exception {
-                    ProcessDAO old = conn.getProcess(pid);
-                    if (old == null) {
-                        // we couldnt find the process, clearly we need to
-                        // create it
-                        return true;
-                    }
-
-                    __log.debug("Found ProcessDAO for " + pid + " with GUID " + old.getGuid());
-
-                    if (oprocess.guid == null) {
-                        // No guid, old version assume its good
-                        return false;
-                    }
-
+            boolean create = true;
+            ProcessDAO old = conn.getProcess(pid);
+            if (old != null) {
+                __log.debug("Found ProcessDAO for " + pid + " with GUID " + old.getGuid());
+                if (oprocess.guid == null) {
+                    // No guid, old version assume its good
+                    create = false;
+                } else {
                     if (old.getGuid().equals(oprocess.guid)) {
                         // Guids match, no need to create
-                        return false;
+                        create = false;
+                    } else {
+                        // GUIDS dont match, delete and create new
+                        String errmsg = "ProcessDAO GUID " + old.getGuid() + " does not match "
+                                + oprocess.guid + "; replacing.";
+                        __log.debug(errmsg);
+                        old.delete();
                     }
-
-                    // GUIDS dont match, delete and create new
-                    String errmsg = "ProcessDAO GUID " + old.getGuid() + " does not match " + oprocess.guid
-                            + "; replacing.";
-                    __log.debug(errmsg);
-                    old.delete();
-
-                    return true;
-
                 }
-            });
-
-            if (create)
-                _db.exec(new BpelDatabase.Callable<Object>() {
-                    public Object run(BpelDAOConnection conn) throws Exception {
-                        ProcessDAO old = conn.getProcess(pid);
-                        if (old != null) {
-                            __log.debug("Found ProcessDAO for " + pid + " with GUID " + old.getGuid());
-
-                            if (oprocess.guid != null) {
-                                if (!old.getGuid().equals(oprocess.guid)) {
-                                    // TODO: Versioning will need to handle this
-                                    // differently.
-                                    String errmsg = "ProcessDAO GUID " + old.getGuid() + " does not match "
-                                            + oprocess.guid + "; replacing.";
-                                    __log.warn(errmsg);
-                                    old.delete();
-                                } else {
-                                    return null;
-                                }
-                            } else {
-                                // no guid, consider compatible.
-                                return null;
-                            }
-                        }
+            }
 
-                        ProcessDAO newDao = conn.createProcess(pid, oprocess.getQName(), oprocess.guid, version);
-                        for (String correlator : oprocess.getCorrelators()) {
-                            newDao.addCorrelator(correlator);
-                        }
-                        return null;
-                    }
-                });
+            if (create) {
+                ProcessDAO newDao = conn.createProcess(pid, oprocess.getQName(), oprocess.guid, version);
+                for (String correlator : oprocess.getCorrelators()) {
+                    newDao.addCorrelator(correlator);
+                }
+            }
         } catch (BpelEngineException ex) {
             throw ex;
         } catch (Exception dce) {
@@ -417,9 +322,7 @@
 
     /**
      * Register a global message exchange interceptor.
-     * 
-     * @param interceptor
-     *            message-exchange interceptor
+     * @param interceptor message-exchange interceptor
      */
     public void registerMessageExchangeInterceptor(MessageExchangeInterceptor interceptor) {
         // NOTE: do not synchronize, globalInterceptors is copy-on-write.
@@ -428,9 +331,7 @@
 
     /**
      * Unregister a global message exchange interceptor.
-     * 
-     * @param interceptor
-     *            message-exchange interceptor
+     * @param interceptor message-exchange interceptor
      */
     public void unregisterMessageExchangeInterceptor(MessageExchangeInterceptor interceptor) {
         // NOTE: do not synchronize, globalInterceptors is copy-on-write.
@@ -443,32 +344,13 @@
     private final boolean checkState(State i, State j) {
         if (_state == i)
             return true;
-
         if (_state == j)
             return false;
-
         throw new IllegalStateException("Unexpected state: " + i);
-
-    }
-
-    /**
-     * De-serialize the compiled process representation from a stream.
-     * 
-     * @param is
-     *            input stream
-     * @return process information from configuration database
-     */
-    static OProcess deserializeCompiledProcess(InputStream is) throws Exception {
-
-        OProcess compiledProcess;
-        Serializer ofh = new Serializer(is);
-        compiledProcess = ofh.readOProcess();
-        return compiledProcess;
     }
 
     /* TODO: We need to have a method of cleaning up old deployment data. */
     private boolean deleteProcessDAO(final QName pid) {
-
         try {
             // Delete it from the database.
             return _db.exec(new BpelDatabase.Callable<Boolean>() {
@@ -481,51 +363,92 @@
                     return false;
                 }
             });
-
         } catch (Exception ex) {
             String errmsg = "DbError";
             __log.error(errmsg, ex);
             throw new BpelEngineException(errmsg, ex);
         }
-
     }
 
     public void onScheduledJob(JobInfo jobInfo) throws JobProcessorException {
-        getEngine().onScheduledJob(jobInfo);        
+        getEngine().onScheduledJob(jobInfo);
+    }
+
+    public void hydrated(final BpelProcess process) {
+        // Recreating the process DAO if the definition has changed, shouldn't do anything
+        // except after a redeploy
+        bounceProcessDAO(_contexts.dao.getConnection(), process.getPID(), process._pconf.getVersion(), process.getOProcess());
+
+        _runningProcesses.add(process);
+    }
+
+    private class ProcessDefReaper implements Runnable {
+        public void run() {
+            __log.debug("Starting process definition reaper thread.");
+            long pollingTime = 10000;
+            try {
+                while (true) {
+                    Thread.sleep(pollingTime);
+                    _mngmtLock.writeLock().lock();
+                    try {
+                        __log.debug("Kicking reaper, OProcess instances: " + OProcess.instanceCount);
+                        // Copying the runnning process list to avoid synchronization
+                        // problems and a potential mess if a policy modifies the list
+                        List<BpelProcess> runningProcesses;
+                        synchronized(_runningProcesses) {
+                            runningProcesses = new ArrayList<BpelProcess>(_runningProcesses);
+                        }
+                        // And the happy winners are...
+                        List<BpelProcess> ripped = _dehydrationPolicy.markForDehydration(runningProcesses);
+                        // Bye bye
+                        for (BpelProcess process : ripped) {
+                            __log.debug("Dehydrating process " + process.getPID());
+                            process.dehydrate();
+                            _runningProcesses.remove(process);
+                        }
+                    } finally {
+                        _mngmtLock.writeLock().unlock();
+                    }
+                }
+            } catch (InterruptedException e) {
+                __log.info(e);
+            }
+        }
+    }
+
+    public void setDehydrationPolicy(DehydrationPolicy dehydrationPolicy) {
+        _dehydrationPolicy = dehydrationPolicy;
+    }
+
+    public void setMessageExchangeContext(MessageExchangeContext mexContext) throws BpelEngineException {
+        _contexts.mexContext = mexContext;
+    }
+
+    public void setScheduler(Scheduler scheduler) throws BpelEngineException {
+        _contexts.scheduler = scheduler;
+    }
+
+    public void setEndpointReferenceContext(EndpointReferenceContext eprContext) throws BpelEngineException {
+        _contexts.eprContext = eprContext;
+    }
+
+    /**
+     * Set the DAO connection factory. The DAO is used by the BPEL engine to
+     * persist information about active processes.
+     *
+     * @param daoCF
+     *            {@link BpelDAOConnectionFactory} implementation.
+     */
+    public void setDaoConnectionFactory(BpelDAOConnectionFactory daoCF) throws BpelEngineException {
+        _contexts.dao = daoCF;
+    }
+
+    public void setInMemDaoConnectionFactory(BpelDAOConnectionFactory daoCF) {
+        _contexts.inMemDao = daoCF;
+    }
+
+    public void setBindingContext(BindingContext bc) {
+        _contexts.bindingContext = bc;
     }
 
-    //   
-    // I've moved this code out of BpelEngineImpl, it should be here not there.
-    // -Maciej 12/22/06
-    // /**
-    //     
-    // */
-    // private class ProcessDefReaper implements Runnable {
-    // public void run() {
-    // try {
-    // while (true) {
-    // Thread.sleep(10000);
-    // _mngmtLock.writeLock().lock();
-    // try {
-    // for (BpelProcess process : _activeProcesses.values()) {
-    // Long lru;
-    // synchronized(_processesLRU) {
-    // lru = _processesLRU.get(process._pid);
-    // }
-    // if (lru != null && process._oprocess != null
-    // && System.currentTimeMillis() - lru > _processMaxAge) {
-    // process._oprocess = null;
-    // __log.debug("Process definition reaper cleaning " + process._pid);
-    // }
-    // Thread.sleep(10);
-    // }
-    // } finally {
-    // _mngmtLock.writeLock().unlock();
-    // }
-    // }
-    // } catch (InterruptedException e) {
-    // __log.info(e);
-    // }
-    // }
-    // }
 }

Added: incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/CountLRUDehydrationPolicy.java
URL: http://svn.apache.org/viewvc/incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/CountLRUDehydrationPolicy.java?view=auto&rev=504332
==============================================================================
--- incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/CountLRUDehydrationPolicy.java (added)
+++ incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/CountLRUDehydrationPolicy.java Tue Feb  6 14:17:56 2007
@@ -0,0 +1,56 @@
+package org.apache.ode.bpel.engine;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+
+/**
+ * @author Matthieu Riou <mriou at apache dot org>
+ */
+public class CountLRUDehydrationPolicy implements DehydrationPolicy {
+
+    /** Maximum age of a process before it is quiesced */
+    private long _processMaxAge = 20 * 60 * 1000;
+    /** Maximum process count before oldest ones get quiesced */
+    private int _processMaxCount = 1000;
+
+    public List<BpelProcess> markForDehydration(List<BpelProcess> runningProcesses) {
+        ArrayList<BpelProcess> ripped = new ArrayList<BpelProcess>();
+
+        if (_processMaxAge > 0) {
+            // The oldies have to go first
+            long now = System.currentTimeMillis();
+            for (BpelProcess process : runningProcesses) {
+                if (now - process.getLastUsed() > _processMaxAge) {
+                    ripped.add(process);
+                }
+            }
+        }
+
+        // If it's not enough, other ones must be put to the axe
+        if (runningProcesses.size() - ripped.size() > _processMaxCount) {
+            runningProcesses.removeAll(ripped);
+            Collections.sort(runningProcesses, new Comparator<BpelProcess>() {
+                public int compare(BpelProcess p1, BpelProcess p2) {
+                    if (p1.getLastUsed() > p2.getLastUsed()) return 1;
+                    if (p1.getLastUsed() < p2.getLastUsed()) return -1;
+                    return 0;
+                }
+            });
+            for (int m = _processMaxCount; m < runningProcesses.size(); m++) {
+                ripped.add(runningProcesses.get(m));
+            }
+        }
+
+        return ripped;
+    }
+
+    public void setProcessMaxAge(long processMaxAge) {
+        _processMaxAge = processMaxAge;
+    }
+
+    public void setProcessMaxCount(int processMaxCount) {
+        _processMaxCount = processMaxCount;
+    }
+}

Added: incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/DehydrationPolicy.java
URL: http://svn.apache.org/viewvc/incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/DehydrationPolicy.java?view=auto&rev=504332
==============================================================================
--- incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/DehydrationPolicy.java (added)
+++ incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/DehydrationPolicy.java Tue Feb  6 14:17:56 2007
@@ -0,0 +1,20 @@
+package org.apache.ode.bpel.engine;
+
+import java.util.List;
+
+/**
+ * Defines a policy to dehydrate running processes based on a limit in total
+ * process count or processes that haven't been used for a while.
+ * @author Matthieu Riou <mriou at apache dot org>
+ */
+public interface DehydrationPolicy {
+
+    /**
+     * Checks the currently running processes and marks some of them for
+     * dehydration according to a specifically configured policy. The
+     * returned processes will be dehydrated by the engine.
+     * @param runningProcesses all running (currently hydrated) processes
+     * @return processes elected for dehydration
+     */
+    List<BpelProcess> markForDehydration(List<BpelProcess> runningProcesses);
+}

Added: incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkMyRoleImpl.java
URL: http://svn.apache.org/viewvc/incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkMyRoleImpl.java?view=auto&rev=504332
==============================================================================
--- incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkMyRoleImpl.java (added)
+++ incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkMyRoleImpl.java Tue Feb  6 14:17:56 2007
@@ -0,0 +1,304 @@
+package org.apache.ode.bpel.engine;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.ode.bpel.common.CorrelationKey;
+import org.apache.ode.bpel.common.FaultException;
+import org.apache.ode.bpel.common.InvalidMessageException;
+import org.apache.ode.bpel.dao.CorrelatorDAO;
+import org.apache.ode.bpel.dao.MessageRouteDAO;
+import org.apache.ode.bpel.dao.ProcessDAO;
+import org.apache.ode.bpel.dao.ProcessInstanceDAO;
+import org.apache.ode.bpel.evt.CorrelationMatchEvent;
+import org.apache.ode.bpel.evt.CorrelationNoMatchEvent;
+import org.apache.ode.bpel.evt.NewProcessInstanceEvent;
+import org.apache.ode.bpel.iapi.Endpoint;
+import org.apache.ode.bpel.iapi.MessageExchange;
+import org.apache.ode.bpel.iapi.MyRoleMessageExchange;
+import org.apache.ode.bpel.iapi.ProcessState;
+import org.apache.ode.bpel.intercept.InterceptorInvoker;
+import org.apache.ode.bpel.o.OMessageVarType;
+import org.apache.ode.bpel.o.OPartnerLink;
+import org.apache.ode.bpel.o.OProcess;
+import org.apache.ode.bpel.o.OScope;
+import org.apache.ode.bpel.runtime.InvalidProcessException;
+import org.apache.ode.bpel.runtime.PROCESS;
+import org.apache.ode.utils.ArrayUtils;
+import org.apache.ode.utils.ObjectPrinter;
+import org.apache.ode.utils.msg.MessageBundle;
+import org.w3c.dom.Element;
+
+import javax.wsdl.Operation;
+import javax.xml.namespace.QName;
+import java.util.*;
+
+/**
+ * @author Matthieu Riou <mriou at apache dot org>
+ */
+class PartnerLinkMyRoleImpl extends PartnerLinkRoleImpl {
+    private static final Log __log = LogFactory.getLog(BpelProcess.class);
+    private static final Messages __msgs = MessageBundle.getMessages(Messages.class);
+
+    /** The local endpoint for this "myrole". */
+    public Endpoint _endpoint;
+
+    PartnerLinkMyRoleImpl(BpelProcess process, OPartnerLink plink, Endpoint endpoint) {
+        super(process, plink);
+        _endpoint = endpoint;
+    }
+
+    public String toString() {
+        StringBuffer buf = new StringBuffer("{PartnerLinkRole-");
+        buf.append(_plinkDef.name);
+        buf.append('.');
+        buf.append(_plinkDef.myRoleName);
+        buf.append(" on ");
+        buf.append(_endpoint);
+        buf.append('}');
+
+        return buf.toString();
+    }
+
+    /**
+     * Called when an input message has been received.
+     *
+     * @param mex
+     *            exchange to which the message is related
+     */
+    public void invokeMyRole(MyRoleMessageExchangeImpl mex) {
+        if (__log.isTraceEnabled()) {
+            __log.trace(ObjectPrinter.stringifyMethodEnter(this + ":inputMsgRcvd", new Object[] {
+                    "messageExchange", mex }));
+        }
+
+        Operation operation = getMyRoleOperation(mex.getOperationName());
+        if (operation == null) {
+            __log.error(__msgs.msgUnknownOperation(mex.getOperationName(), _plinkDef.myRolePortType.getQName()));
+            mex.setFailure(MessageExchange.FailureType.UNKNOWN_OPERATION, mex.getOperationName(), null);
+            return;
+        }
+
+        mex.getDAO().setPartnerLinkModelId(_plinkDef.getId());
+        mex.setPortOp(_plinkDef.myRolePortType, operation);
+        mex.setPattern(operation.getOutput() == null ? MessageExchange.MessageExchangePattern.REQUEST_ONLY
+                : MessageExchange.MessageExchangePattern.REQUEST_RESPONSE);
+
+        // Is this a /possible/ createInstance Operation?
+        boolean isCreateInstnace = _plinkDef.isCreateInstanceOperation(operation);
+
+        // now, the tricks begin: when a message arrives we have to see if
+        // there
+        // is anyone waiting for it.
+        // Get the correlator, a persisted communnication-reduction data
+        // structure
+        // supporting correlation correlationKey matching!
+        String correlatorId = BpelProcess.genCorrelatorId(_plinkDef, operation.getName());
+
+        CorrelatorDAO correlator = _process.getProcessDAO().getCorrelator(correlatorId);
+
+        CorrelationKey[] keys;
+        MessageRouteDAO messageRoute = null;
+
+        // We need to compute the correlation keys (based on the operation
+        // we can  infer which correlation keys to compute - this is merely a set
+        // consisting of each correlationKey used in each correlation sets
+        // that is ever referenced in an <receive>/<onMessage> on this
+        // partnerlink/operation.
+        try {
+            keys = computeCorrelationKeys(mex);
+        } catch (InvalidMessageException ime) {
+            // We'd like to do a graceful exit here, no sense in rolling back due to a
+            // a message format problem.
+            __log.debug("Unable to evaluate correlation keys, invalid message format. ",ime);
+            mex.setFailure(MessageExchange.FailureType.FORMAT_ERROR, ime.getMessage(), null);
+            return;
+        }
+
+        String mySessionId = mex.getProperty(MessageExchange.PROPERTY_SEP_MYROLE_SESSIONID);
+        String partnerSessionId = mex.getProperty(MessageExchange.PROPERTY_SEP_PARTNERROLE_SESSIONID);
+        if (__log.isDebugEnabled()) {
+            __log.debug("INPUTMSG: " + correlatorId + ": MSG RCVD keys="
+                    + ArrayUtils.makeCollection(HashSet.class, keys) + " mySessionId=" + mySessionId
+                    + " partnerSessionId=" + partnerSessionId);
+        }
+
+        CorrelationKey matchedKey = null;
+
+        // Try to find a route for one of our keys.
+        for (CorrelationKey key : keys) {
+            messageRoute = correlator.findRoute(key);
+            if (messageRoute != null) {
+                if (__log.isDebugEnabled()) {
+                    __log.debug("INPUTMSG: " + correlatorId + ": ckey " + key + " route is to " + messageRoute);
+                }
+                matchedKey = key;
+                break;
+            }
+        }
+
+        // TODO - ODE-58
+
+        // If no luck, and this operation qualifies for create-instance
+        // treatment, then create a new process
+        // instance.
+        if (messageRoute == null && isCreateInstnace) {
+            if (__log.isDebugEnabled()) {
+                __log.debug("INPUTMSG: " + correlatorId + ": routing failed, CREATING NEW INSTANCE");
+            }
+            ProcessDAO processDAO = _process.getProcessDAO();
+
+            if (_process._pconf.getState() == ProcessState.RETIRED) {
+                throw new InvalidProcessException("Process is retired.", InvalidProcessException.RETIRED_CAUSE_CODE);
+            }
+
+            if (!_process.processInterceptors(mex, InterceptorInvoker.__onNewInstanceInvoked)) {
+                __log.debug("Not creating a new instance for mex " + mex + "; interceptor prevented!");
+                return;
+            }
+
+            ProcessInstanceDAO newInstance = processDAO.createInstance(correlator);
+
+            BpelRuntimeContextImpl instance = _process
+                    .createRuntimeContext(newInstance, new PROCESS(_process.getOProcess()), mex);
+
+            // send process instance event
+            NewProcessInstanceEvent evt = new NewProcessInstanceEvent(new QName(_process.getOProcess().targetNamespace,
+                    _process.getOProcess().getName()), _process.getProcessDAO().getProcessId(), newInstance.getInstanceId());
+            evt.setPortType(mex.getPortType().getQName());
+            evt.setOperation(operation.getName());
+            evt.setMexId(mex.getMessageExchangeId());
+            _process._debugger.onEvent(evt);
+            _process.saveEvent(evt, newInstance);
+            mex.setCorrelationStatus(MyRoleMessageExchange.CorrelationStatus.CREATE_INSTANCE);
+            mex.getDAO().setInstance(newInstance);
+
+            instance.execute();
+        } else if (messageRoute != null) {
+            if (__log.isDebugEnabled()) {
+                __log.debug("INPUTMSG: " + correlatorId + ": ROUTING to instance "
+                        + messageRoute.getTargetInstance().getInstanceId());
+            }
+
+            ProcessInstanceDAO instanceDao = messageRoute.getTargetInstance();
+
+            // Reload process instance for DAO.
+            BpelRuntimeContextImpl instance = _process.createRuntimeContext(instanceDao, null, null);
+            instance.inputMsgMatch(messageRoute.getGroupId(), messageRoute.getIndex(), mex);
+
+            // Kill the route so some new message does not get routed to
+            // same process instance.
+            correlator.removeRoutes(messageRoute.getGroupId(), instanceDao);
+
+            // send process instance event
+            CorrelationMatchEvent evt = new CorrelationMatchEvent(new QName(_process.getOProcess().targetNamespace,
+                    _process.getOProcess().getName()), _process.getProcessDAO().getProcessId(),
+                    instanceDao.getInstanceId(), matchedKey);
+            evt.setPortType(mex.getPortType().getQName());
+            evt.setOperation(operation.getName());
+            evt.setMexId(mex.getMessageExchangeId());
+
+            _process._debugger.onEvent(evt);
+            // store event
+            _process.saveEvent(evt, instanceDao);
+
+            // EXPERIMENTAL -- LOCK
+            // instanceDao.lock();
+
+            mex.setCorrelationStatus(MyRoleMessageExchange.CorrelationStatus.MATCHED);
+            mex.getDAO().setInstance(messageRoute.getTargetInstance());
+            instance.execute();
+        } else {
+            if (__log.isDebugEnabled()) {
+                __log.debug("INPUTMSG: " + correlatorId + ": SAVING to DB (no match) ");
+            }
+
+            if (!mex.isAsynchronous()) {
+                mex.setFailure(MessageExchange.FailureType.NOMATCH, "No process instance matching correlation keys.", null);
+
+            } else {
+                // send event
+                CorrelationNoMatchEvent evt = new CorrelationNoMatchEvent(mex.getPortType().getQName(), mex
+                        .getOperation().getName(), mex.getMessageExchangeId(), keys);
+
+                evt.setProcessId(_process.getProcessDAO().getProcessId());
+                evt.setProcessName(new QName(_process.getOProcess().targetNamespace, _process.getOProcess().getName()));
+                _process._debugger.onEvent(evt);
+
+                mex.setCorrelationStatus(MyRoleMessageExchange.CorrelationStatus.QUEUED);
+
+                // No match, means we add message exchange to the queue.
+                correlator.enqueueMessage(mex.getDAO(), keys);
+
+            }
+        }
+
+        // Now we have to update our message exchange status. If the <reply>
+        // was not hit during the
+        // invocation, then we will be in the "REQUEST" phase which means
+        // that either this was a one-way
+        // or a two-way that needs to delivery the reply asynchronously.
+        if (mex.getStatus() == MessageExchange.Status.REQUEST) {
+            mex.setStatus(MessageExchange.Status.ASYNC);
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    private Operation getMyRoleOperation(String operationName) {
+        Operation op = _plinkDef.getMyRoleOperation(operationName);
+        return op;
+    }
+
+    private CorrelationKey[] computeCorrelationKeys(MyRoleMessageExchangeImpl mex) {
+        Operation operation = mex.getOperation();
+        Element msg = mex.getRequest().getMessage();
+        javax.wsdl.Message msgDescription = operation.getInput().getMessage();
+        List<CorrelationKey> keys = new ArrayList<CorrelationKey>();
+
+        Set<OScope.CorrelationSet> csets = _plinkDef.getCorrelationSetsForOperation(operation);
+
+        for (OScope.CorrelationSet cset : csets) {
+            CorrelationKey key = computeCorrelationKey(cset,
+                    _process.getOProcess().messageTypes.get(msgDescription.getQName()), msg);
+            keys.add(key);
+        }
+
+        // Let's creata a key based on the sessionId
+        String mySessionId = mex.getProperty(MessageExchange.PROPERTY_SEP_MYROLE_SESSIONID);
+        if (mySessionId != null)
+            keys.add(new CorrelationKey(-1, new String[] { mySessionId }));
+
+        return keys.toArray(new CorrelationKey[keys.size()]);
+    }
+
+    private CorrelationKey computeCorrelationKey(OScope.CorrelationSet cset, OMessageVarType messagetype,
+            Element msg) {
+        String[] values = new String[cset.properties.size()];
+
+        int jIdx = 0;
+        for (Iterator j = cset.properties.iterator(); j.hasNext(); ++jIdx) {
+            OProcess.OProperty property = (OProcess.OProperty) j.next();
+            OProcess.OPropertyAlias alias = property.getAlias(messagetype);
+
+            if (alias == null) {
+                // TODO: Throw a real exception! And catch this at compile
+                // time.
+                throw new IllegalArgumentException("No alias matching property '" + property.name
+                        + "' with message type '" + messagetype + "'");
+            }
+
+            String value;
+            try {
+                value = _process.extractProperty(msg, alias, msg.toString());
+            } catch (FaultException fe) {
+                String emsg = __msgs.msgPropertyAliasDerefFailedOnMessage(alias.getDescription(), fe.getMessage());
+                __log.error(emsg, fe);
+                throw new InvalidMessageException(emsg, fe);
+            }
+            values[jIdx] = value;
+        }
+
+        CorrelationKey key = new CorrelationKey(cset.getId(), values);
+        return key;
+    }
+
+}

Added: incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkPartnerRoleImpl.java
URL: http://svn.apache.org/viewvc/incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkPartnerRoleImpl.java?view=auto&rev=504332
==============================================================================
--- incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkPartnerRoleImpl.java (added)
+++ incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkPartnerRoleImpl.java Tue Feb  6 14:17:56 2007
@@ -0,0 +1,35 @@
+package org.apache.ode.bpel.engine;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.ode.bpel.iapi.Endpoint;
+import org.apache.ode.bpel.iapi.PartnerRoleChannel;
+import org.apache.ode.bpel.o.OPartnerLink;
+
+/**
+ * @author Matthieu Riou <mriou at apache dot org>
+ */
+class PartnerLinkPartnerRoleImpl extends PartnerLinkRoleImpl {
+    static final Log __log = LogFactory.getLog(BpelProcess.class);
+
+    Endpoint _initialPartner;
+
+    public PartnerRoleChannel _channel;
+
+    PartnerLinkPartnerRoleImpl(BpelProcess process, OPartnerLink plink, Endpoint initialPartner) {
+        super(process, plink);
+        _initialPartner = initialPartner;
+    }
+
+    public void processPartnerResponse(PartnerRoleMessageExchangeImpl messageExchange) {
+        if (__log.isDebugEnabled()) {
+            __log.debug("Processing partner's response for partnerLink: " + messageExchange);
+        }
+
+        BpelRuntimeContextImpl processInstance =
+                _process.createRuntimeContext(messageExchange.getDAO().getInstance(), null, null);
+        processInstance.invocationResponse(messageExchange);
+        processInstance.execute();
+    }
+
+}

Added: incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkRoleImpl.java
URL: http://svn.apache.org/viewvc/incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkRoleImpl.java?view=auto&rev=504332
==============================================================================
--- incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkRoleImpl.java (added)
+++ incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkRoleImpl.java Tue Feb  6 14:17:56 2007
@@ -0,0 +1,31 @@
+package org.apache.ode.bpel.engine;
+
+import org.apache.ode.bpel.iapi.EndpointReference;
+import org.apache.ode.bpel.o.OPartnerLink;
+
+/**
+ * @author Matthieu Riou <mriou at apache dot org>
+ */
+abstract class PartnerLinkRoleImpl {
+    protected OPartnerLink _plinkDef;
+    protected EndpointReference _initialEPR;
+    protected BpelProcess _process;
+
+    PartnerLinkRoleImpl(BpelProcess process, OPartnerLink plink) {
+        _plinkDef = plink;
+        _process = process;
+    }
+    String getPartnerLinkName() {
+        return _plinkDef.name;
+    }
+    /**
+     * Get the initial value of this role's EPR. This value is obtained from
+     * the integration layer when the process is enabled on the server.
+     *
+     * @return initial epr
+     */
+    EndpointReference getInitialEPR() {
+        return _initialEPR;
+    }
+
+}

Added: incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ProcessLifecycleCallback.java
URL: http://svn.apache.org/viewvc/incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ProcessLifecycleCallback.java?view=auto&rev=504332
==============================================================================
--- incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ProcessLifecycleCallback.java (added)
+++ incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ProcessLifecycleCallback.java Tue Feb  6 14:17:56 2007
@@ -0,0 +1,9 @@
+package org.apache.ode.bpel.engine;
+
+/**
+ * @author Matthieu Riou <mriou at apache dot org>
+ */
+public interface ProcessLifecycleCallback {
+
+    void hydrated(BpelProcess process);
+}

Modified: incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ReplacementMapImpl.java
URL: http://svn.apache.org/viewvc/incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ReplacementMapImpl.java?view=diff&rev=504332&r1=504331&r2=504332
==============================================================================
--- incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ReplacementMapImpl.java (original)
+++ incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ReplacementMapImpl.java Tue Feb  6 14:17:56 2007
@@ -18,39 +18,66 @@
  */
 package org.apache.ode.bpel.engine;
 
-import org.apache.ode.jacob.soup.ReplacementMap;
 import org.apache.ode.bpel.o.OBase;
 import org.apache.ode.bpel.o.OProcess;
+import org.apache.ode.jacob.soup.ReplacementMap;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
 
 /**
  * A JACOB {@link ReplacementMap} implementation that eliminates unnecessary serialization
  * of the (constant) compiled process model.
  */
 class ReplacementMapImpl implements ReplacementMap {
-  private OProcess _oprocess;
+    private OProcess _oprocess;
 
-  ReplacementMapImpl(OProcess oprocess) {
-    _oprocess = oprocess;
-  }
-
-  public boolean isReplacement(Object obj) {
-    return obj instanceof BpelProcess.OBaseReplacementImpl;
-  }
-
-  public Object getOriginal(Object replacement) throws IllegalArgumentException {
-    if (!(replacement instanceof BpelProcess.OBaseReplacementImpl))
-      throw new IllegalArgumentException("Not OBaseReplacementObject!");
-    return _oprocess.getChild(((BpelProcess.OBaseReplacementImpl)replacement)._id);
-  }
-
-  public Object getReplacement(Object original) throws IllegalArgumentException {
-    if (!(original instanceof OBase))
-      throw new IllegalArgumentException("Not OBase!");
-    return new BpelProcess.OBaseReplacementImpl(((OBase)original).getId());
-  }
-
-  public boolean isReplaceable(Object obj) {
-    return obj instanceof OBase;
-  }
+    ReplacementMapImpl(OProcess oprocess) {
+        _oprocess = oprocess;
+    }
+
+    public boolean isReplacement(Object obj) {
+        return obj instanceof OBaseReplacementImpl;
+    }
+
+    public Object getOriginal(Object replacement) throws IllegalArgumentException {
+        if (!(replacement instanceof OBaseReplacementImpl))
+            throw new IllegalArgumentException("Not OBaseReplacementObject!");
+        return _oprocess.getChild(((OBaseReplacementImpl)replacement)._id);
+    }
+
+    public Object getReplacement(Object original) throws IllegalArgumentException {
+        if (!(original instanceof OBase))
+            throw new IllegalArgumentException("Not OBase!");
+        return new OBaseReplacementImpl(((OBase)original).getId());
+    }
+
+    public boolean isReplaceable(Object obj) {
+        return obj instanceof OBase;
+    }
+
+    /**
+     * Replacement object for serializtation of the {@link OBase} (compiled
+     * BPEL) objects in the JACOB VPU.
+     */
+    public static final class OBaseReplacementImpl implements Externalizable {
+        private static final long serialVersionUID = 1L;
+
+        int _id;
+
+        public OBaseReplacementImpl() {
+        }
+        public OBaseReplacementImpl(int id) {
+            _id = id;
+        }
+        public void readExternal(ObjectInput in) throws IOException {
+            _id = in.readInt();
+        }
+        public void writeExternal(ObjectOutput out) throws IOException {
+            out.writeInt(_id);
+        }
+    }
 
 }

Modified: incubator/ode/trunk/bpel-runtime/src/test/java/org/apache/ode/bpel/runtime/MockBpelServer.java
URL: http://svn.apache.org/viewvc/incubator/ode/trunk/bpel-runtime/src/test/java/org/apache/ode/bpel/runtime/MockBpelServer.java?view=diff&rev=504332&r1=504331&r2=504332
==============================================================================
--- incubator/ode/trunk/bpel-runtime/src/test/java/org/apache/ode/bpel/runtime/MockBpelServer.java (original)
+++ incubator/ode/trunk/bpel-runtime/src/test/java/org/apache/ode/bpel/runtime/MockBpelServer.java Tue Feb  6 14:17:56 2007
@@ -18,35 +18,9 @@
  */
 package org.apache.ode.bpel.runtime;
 
-import java.io.File;
-import java.sql.DriverManager;
-import java.util.Collection;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
-import javax.sql.DataSource;
-import javax.transaction.TransactionManager;
-import javax.wsdl.PortType;
-import javax.xml.namespace.QName;
-
 import org.apache.ode.bpel.dao.BpelDAOConnectionFactory;
 import org.apache.ode.bpel.engine.BpelServerImpl;
-import org.apache.ode.bpel.iapi.BindingContext;
-import org.apache.ode.bpel.iapi.BpelServer;
-import org.apache.ode.bpel.iapi.ContextException;
-import org.apache.ode.bpel.iapi.Endpoint;
-import org.apache.ode.bpel.iapi.EndpointReference;
-import org.apache.ode.bpel.iapi.EndpointReferenceContext;
-import org.apache.ode.bpel.iapi.Message;
-import org.apache.ode.bpel.iapi.MessageExchangeContext;
-import org.apache.ode.bpel.iapi.MyRoleMessageExchange;
-import org.apache.ode.bpel.iapi.PartnerRoleChannel;
-import org.apache.ode.bpel.iapi.PartnerRoleMessageExchange;
-import org.apache.ode.bpel.iapi.Scheduler;
+import org.apache.ode.bpel.iapi.*;
 import org.apache.ode.bpel.scheduler.quartz.QuartzSchedulerImpl;
 import org.apache.ode.dao.jpa.ojpa.BPELDAOConnectionFactoryImpl;
 import org.apache.ode.store.ProcessStoreImpl;
@@ -58,6 +32,20 @@
 import org.w3c.dom.Document;
 import org.w3c.dom.Element;
 
+import javax.sql.DataSource;
+import javax.transaction.TransactionManager;
+import javax.wsdl.PortType;
+import javax.xml.namespace.QName;
+import java.io.File;
+import java.sql.DriverManager;
+import java.util.Collection;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
 
 class MockBpelServer {
 
@@ -242,7 +230,7 @@
 
     protected BindingContext createBindingContext() {
         _bindContext = new BindingContext() {
-            public EndpointReference activateMyRoleEndpoint(QName processId, Endpoint myRoleEndpoint, PortType portType) {
+            public EndpointReference activateMyRoleEndpoint(QName processId, Endpoint myRoleEndpoint) {
                 final Document doc = DOMUtils.newDocument();
                 Element serviceRef = doc.createElementNS(EndpointReference.SERVICE_REF_QNAME.getNamespaceURI(),
                     EndpointReference.SERVICE_REF_QNAME.getLocalPart());

Modified: incubator/ode/trunk/bpel-test/src/main/java/org/apache/ode/test/BindingContextImpl.java
URL: http://svn.apache.org/viewvc/incubator/ode/trunk/bpel-test/src/main/java/org/apache/ode/test/BindingContextImpl.java?view=diff&rev=504332&r1=504331&r2=504332
==============================================================================
--- incubator/ode/trunk/bpel-test/src/main/java/org/apache/ode/test/BindingContextImpl.java (original)
+++ incubator/ode/trunk/bpel-test/src/main/java/org/apache/ode/test/BindingContextImpl.java Tue Feb  6 14:17:56 2007
@@ -37,9 +37,6 @@
  */
 package org.apache.ode.test;
 
-import javax.wsdl.PortType;
-import javax.xml.namespace.QName;
-
 import org.apache.ode.bpel.iapi.BindingContext;
 import org.apache.ode.bpel.iapi.Endpoint;
 import org.apache.ode.bpel.iapi.EndpointReference;
@@ -48,11 +45,13 @@
 import org.w3c.dom.Document;
 import org.w3c.dom.Element;
 
+import javax.wsdl.PortType;
+import javax.xml.namespace.QName;
+
 public class BindingContextImpl implements BindingContext {
 	
 
-	public EndpointReference activateMyRoleEndpoint(QName processId, Endpoint myRoleEndpoint,
-			PortType portType) {
+	public EndpointReference activateMyRoleEndpoint(QName processId, Endpoint myRoleEndpoint) {
 		final Document doc = DOMUtils.newDocument();
 		Element serviceref = doc.createElementNS(EndpointReference.SERVICE_REF_QNAME.getNamespaceURI(),
                 EndpointReference.SERVICE_REF_QNAME.getLocalPart());

Modified: incubator/ode/trunk/jbi/src/main/java/org/apache/ode/jbi/BindingContextImpl.java
URL: http://svn.apache.org/viewvc/incubator/ode/trunk/jbi/src/main/java/org/apache/ode/jbi/BindingContextImpl.java?view=diff&rev=504332&r1=504331&r2=504332
==============================================================================
--- incubator/ode/trunk/jbi/src/main/java/org/apache/ode/jbi/BindingContextImpl.java (original)
+++ incubator/ode/trunk/jbi/src/main/java/org/apache/ode/jbi/BindingContextImpl.java Tue Feb  6 14:17:56 2007
@@ -42,10 +42,9 @@
         _ode = ode;
     }
 
-    public EndpointReference activateMyRoleEndpoint(QName processId, Endpoint myRoleEndpoint,
-            PortType portType) {
+    public EndpointReference activateMyRoleEndpoint(QName processId, Endpoint myRoleEndpoint) {
         try {
-            return _ode.activateEndpoint(processId, myRoleEndpoint, portType.getQName());
+            return _ode.activateEndpoint(processId, myRoleEndpoint);
         } catch (Exception ex) {
             throw new ContextException("Could not activate endpoint " + myRoleEndpoint + " for process " + processId,
                     ex);

Modified: incubator/ode/trunk/jbi/src/main/java/org/apache/ode/jbi/OdeContext.java
URL: http://svn.apache.org/viewvc/incubator/ode/trunk/jbi/src/main/java/org/apache/ode/jbi/OdeContext.java?view=diff&rev=504332&r1=504331&r2=504332
==============================================================================
--- incubator/ode/trunk/jbi/src/main/java/org/apache/ode/jbi/OdeContext.java (original)
+++ incubator/ode/trunk/jbi/src/main/java/org/apache/ode/jbi/OdeContext.java Tue Feb  6 14:17:56 2007
@@ -24,7 +24,6 @@
 import org.apache.ode.bpel.engine.BpelServerImpl;
 import org.apache.ode.bpel.iapi.Endpoint;
 import org.apache.ode.bpel.iapi.ProcessConf;
-import org.apache.ode.bpel.iapi.ProcessStore;
 import org.apache.ode.bpel.scheduler.quartz.QuartzSchedulerImpl;
 import org.apache.ode.jbi.msgmap.Mapper;
 import org.apache.ode.jbi.util.WSDLFlattener;
@@ -160,7 +159,7 @@
         return (TransactionManager) getContext().getTransactionManager();
     }
 
-    public MyEndpointReference activateEndpoint(QName pid, Endpoint endpoint, QName portType) throws Exception {
+    public MyEndpointReference activateEndpoint(QName pid, Endpoint endpoint) throws Exception {
         if (__log.isDebugEnabled()) {
             __log.debug("Activate endpoint: " + endpoint);
         }
@@ -168,8 +167,8 @@
         OdeService service = new OdeService(this, endpoint);
         try {
             ProcessConf pc = _store.getProcessConfiguration(pid);
-            Definition def = pc.getDefinitionForPortType(portType);
-            def = new WSDLFlattener(def).getDefinition(portType);
+            Definition def = pc.getDefinitionForService(endpoint.serviceName);
+            def = new WSDLFlattener(def).getDefinition(endpoint);
             Document doc = WSDLFactory.newInstance().newWSDLWriter().getDocument(def);
             addEndpointDoc(endpoint.serviceName, doc);
         } catch (Exception e) {

Modified: incubator/ode/trunk/jbi/src/main/java/org/apache/ode/jbi/util/WSDLFlattener.java
URL: http://svn.apache.org/viewvc/incubator/ode/trunk/jbi/src/main/java/org/apache/ode/jbi/util/WSDLFlattener.java?view=diff&rev=504332&r1=504331&r2=504332
==============================================================================
--- incubator/ode/trunk/jbi/src/main/java/org/apache/ode/jbi/util/WSDLFlattener.java (original)
+++ incubator/ode/trunk/jbi/src/main/java/org/apache/ode/jbi/util/WSDLFlattener.java Tue Feb  6 14:17:56 2007
@@ -16,32 +16,18 @@
  */
 package org.apache.ode.jbi.util;
 
-import java.net.URI;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
+import com.ibm.wsdl.extensions.schema.SchemaImpl;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.ode.bpel.iapi.Endpoint;
 
-import javax.wsdl.Definition;
-import javax.wsdl.Fault;
-import javax.wsdl.Import;
-import javax.wsdl.Input;
-import javax.wsdl.Message;
-import javax.wsdl.Operation;
-import javax.wsdl.Output;
-import javax.wsdl.Part;
-import javax.wsdl.PortType;
-import javax.wsdl.Types;
+import javax.wsdl.*;
 import javax.wsdl.extensions.ExtensibilityElement;
 import javax.wsdl.extensions.schema.SchemaImport;
 import javax.wsdl.factory.WSDLFactory;
 import javax.xml.namespace.QName;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import com.ibm.wsdl.extensions.schema.SchemaImpl;
+import java.net.URI;
+import java.util.*;
 
 public class WSDLFlattener {
 
@@ -49,7 +35,7 @@
     
     private Definition definition;
     private SchemaCollection schemas;
-    private Map flattened;
+    private Map<Endpoint,Definition> flattened;
     private boolean initialized;
     
     public WSDLFlattener() {
@@ -82,15 +68,16 @@
     
     /**
      * Retrieve a flattened definition for a given port type name.
-     * @param portType the port type to create a flat definition for
+     * @param endpoint the port type to create a flat definition for
      * @return a flat definition for the port type
      * @throws Exception if an error occurs
      */
-    public Definition getDefinition(QName portType) throws Exception {
-        Definition def = (Definition) flattened.get(portType);
+    public Definition getDefinition(Endpoint endpoint) throws Exception {
+        Definition def = (Definition) flattened.get(endpoint);
         if (def == null) {
-            def = flattenDefinition(portType);
-            flattened.put(portType, def);
+            PortType pt = def.getService(endpoint.serviceName).getPort(endpoint.portName).getBinding().getPortType();
+            def = flattenDefinition(pt.getQName());
+            flattened.put(endpoint, def);
         }
         return def;
     }