You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ode.apache.org by mr...@apache.org on 2009/01/07 16:58:11 UTC

svn commit: r732377 - in /ode/branches/APACHE_ODE_1.X: bpel-dao/src/main/java/org/apache/ode/bpel/dao/ bpel-runtime/src/main/java/org/apache/ode/bpel/engine/migration/ dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/

Author: mriou
Date: Wed Jan  7 07:58:10 2009
New Revision: 732377

URL: http://svn.apache.org/viewvc?rev=732377&view=rev
Log:
Migration to using names in place of omodel ids in correlation and partner link references from both the database and the runtime. Problem was that omodel ids tend to change a lot as soon as the process gets even slightly updated. And then past instances can't complete anymore.

Added:
    ode/branches/APACHE_ODE_1.X/bpel-dao/src/main/java/org/apache/ode/bpel/dao/CorrelatorMessageDAO.java
    ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/migration/
    ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/migration/CorrelationKeyMigration.java
    ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/migration/CorrelationKeySetMigration.java
    ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/migration/CorrelatorsMigration.java
    ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/migration/Migration.java
    ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/migration/MigrationHandler.java
    ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/migration/OldCorrelationKey.java
    ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/migration/OldSelector.java
    ode/branches/APACHE_ODE_1.X/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/CorrelatorMessageDaoImpl.java

Added: ode/branches/APACHE_ODE_1.X/bpel-dao/src/main/java/org/apache/ode/bpel/dao/CorrelatorMessageDAO.java
URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/bpel-dao/src/main/java/org/apache/ode/bpel/dao/CorrelatorMessageDAO.java?rev=732377&view=auto
==============================================================================
--- ode/branches/APACHE_ODE_1.X/bpel-dao/src/main/java/org/apache/ode/bpel/dao/CorrelatorMessageDAO.java (added)
+++ ode/branches/APACHE_ODE_1.X/bpel-dao/src/main/java/org/apache/ode/bpel/dao/CorrelatorMessageDAO.java Wed Jan  7 07:58:10 2009
@@ -0,0 +1,10 @@
+package org.apache.ode.bpel.dao;
+
+import org.apache.ode.bpel.common.CorrelationKey;
+
+public interface CorrelatorMessageDAO {
+
+    CorrelationKey getCorrelationKey();
+
+    void setCorrelationKey(CorrelationKey ckey);
+}

Added: ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/migration/CorrelationKeyMigration.java
URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/migration/CorrelationKeyMigration.java?rev=732377&view=auto
==============================================================================
--- ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/migration/CorrelationKeyMigration.java (added)
+++ ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/migration/CorrelationKeyMigration.java Wed Jan  7 07:58:10 2009
@@ -0,0 +1,114 @@
+package org.apache.ode.bpel.engine.migration;
+
+import org.apache.ode.bpel.engine.BpelProcess;
+import org.apache.ode.bpel.engine.ReplacementMapImpl;
+import org.apache.ode.bpel.engine.OutstandingRequestManager;
+import org.apache.ode.bpel.dao.*;
+import org.apache.ode.bpel.common.CorrelationKey;
+import org.apache.ode.bpel.o.*;
+import org.apache.ode.bpel.runtime.Selector;
+import org.apache.ode.jacob.vpu.ExecutionQueueImpl;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import javax.xml.namespace.QName;
+import javax.wsdl.Operation;
+import java.util.*;
+import java.io.ObjectStreamClass;
+import java.io.FileInputStream;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+
+/**
+ * Migrates the correlation key values to a scheme containing the OModel correlation
+ * set id to one using its name. So something like 1~abc~de will become foo~abc~de.
+ */
+public class CorrelationKeyMigration implements Migration {
+    private static final Log __log = LogFactory.getLog(CorrelationKeyMigration.class);
+
+    public boolean migrate(Set<BpelProcess> registeredProcesses, BpelDAOConnection connection) {
+        // Map only used to avoid duplicates, set would force to re-implement equals
+        HashMap<Long,ProcessInstanceDAO> instances = new HashMap<Long,ProcessInstanceDAO>();
+
+        // Migrate correlation set values
+        Collection<CorrelationSetDAO> csets = connection.getActiveCorrelationSets();
+        for (CorrelationSetDAO cset : csets) {
+            CorrelationKey ckey = cset.getValue();
+            instances.put(cset.getInstance().getInstanceId(), cset.getInstance());
+            Integer ckeyInt = asInt(ckey.getCorrelationSetName());
+            if (ckeyInt != null) {
+                OScope.CorrelationSet ocset = findCorrelationById(ckeyInt, registeredProcesses, cset.getProcess().getProcessId());
+                if (ocset == null) __log.debug("Correlation set not found, couldn't upgrade set " + ckey.toCanonicalString());
+                else {
+                    cset.setValue(null, new CorrelationKey(ocset.name, ckey.getValues()));
+                }
+            }
+        }
+
+        // Migrate routes and message queue for each correlator
+        for (BpelProcess process : registeredProcesses) {
+            __log.debug("Migrating correlators for process " + process.getConf().getProcessId());
+            ProcessDAO processDao = connection.getProcess(process.getConf().getProcessId());
+
+            for (OPartnerLink plink : process.getOProcess().getAllPartnerLinks()) {
+                if (plink.hasMyRole()) {
+                    for (Iterator opI = plink.myRolePortType.getOperations().iterator(); opI.hasNext();) {
+                        Operation op = (Operation)opI.next();
+                        try {
+                            CorrelatorDAO corr = processDao.getCorrelator(plink.getName() + "." + op.getName());
+                            // Changing all routes
+                            for (MessageRouteDAO routeDAO : corr.getAllRoutes()) {
+                                CorrelationKey oldKey = routeDAO.getCorrelationKey();
+                                Integer ckeyInt = asInt(oldKey.getCorrelationSetName());
+                                if (ckeyInt != null) {
+                                    OScope.CorrelationSet ocset = findCorrelationById(ckeyInt, registeredProcesses, process.getConf().getProcessId());
+                                    if (ocset == null) __log.debug("Correlation set not found, couldn't upgrade route " + oldKey.toCanonicalString());
+                                    else {
+                                        routeDAO.setCorrelationKey(new CorrelationKey(ocset.name, oldKey.getValues()));
+                                    }
+                                }
+                            }
+                            // Changing all queued messages
+                            for (CorrelatorMessageDAO corrMsgDAO : corr.getAllMessages()) {
+                                CorrelationKey oldKey = corrMsgDAO.getCorrelationKey();
+                                Integer ckeyInt = asInt(oldKey.getCorrelationSetName());
+                                if (ckeyInt != null) {
+                                    OScope.CorrelationSet ocset = findCorrelationById(ckeyInt, registeredProcesses, process.getConf().getProcessId());
+                                    if (ocset == null) __log.debug("Correlation set not found, couldn't upgrade route " + oldKey.toCanonicalString());
+                                    else {
+                                        corrMsgDAO.setCorrelationKey(new CorrelationKey(ocset.name, oldKey.getValues()));
+                                    }
+                                }
+                            }
+                            __log.debug("Migrated routes and message queue for correlator " + plink.getName() + "." + op.getName());
+                        } catch (IllegalArgumentException e) {
+                            __log.debug("Correlator with id " + plink.getId() + "." +
+                                    op.getName() + " couldn't be found, skipping.");
+                        }
+                    }
+                }
+            }
+        }
+
+        return true;
+    }
+
+    private Integer asInt(String s) {
+        try {
+            return Integer.parseInt(s);
+        } catch (NumberFormatException e) {
+            return null;
+        }
+    }
+
+    private OScope.CorrelationSet findCorrelationById(int ckeyInt, Set<BpelProcess> processes, QName processId) {
+        for (BpelProcess process : processes) {
+            if (process.getConf().getProcessId().equals(processId)) {
+                OBase ocset = process.getOProcess().getChild(ckeyInt);
+                if (ocset instanceof OScope.CorrelationSet) return (OScope.CorrelationSet)ocset;
+            }
+        }
+        return null;
+    }
+
+}

Added: ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/migration/CorrelationKeySetMigration.java
URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/migration/CorrelationKeySetMigration.java?rev=732377&view=auto
==============================================================================
--- ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/migration/CorrelationKeySetMigration.java (added)
+++ ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/migration/CorrelationKeySetMigration.java Wed Jan  7 07:58:10 2009
@@ -0,0 +1,172 @@
+package org.apache.ode.bpel.engine.migration;
+
+import org.apache.ode.bpel.engine.BpelProcess;
+import org.apache.ode.bpel.engine.OutstandingRequestManager;
+import org.apache.ode.bpel.engine.ReplacementMapImpl;
+import org.apache.ode.bpel.dao.BpelDAOConnection;
+import org.apache.ode.bpel.dao.ProcessInstanceDAO;
+import org.apache.ode.bpel.dao.ProcessDAO;
+import org.apache.ode.bpel.runtime.Selector;
+import org.apache.ode.bpel.common.CorrelationKey;
+import org.apache.ode.bpel.common.CorrelationKeySet;
+import org.apache.ode.bpel.o.OProcess;
+import org.apache.ode.jacob.vpu.ExecutionQueueImpl;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import javax.xml.namespace.QName;
+import java.util.Set;
+import java.util.Collection;
+import java.util.concurrent.ConcurrentHashMap;
+import java.io.ObjectStreamClass;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+
+/**
+ * Migrates the database from using single correlations to multiple correlations support.
+ */
+public class CorrelationKeySetMigration implements Migration {
+    private static final Log __log = LogFactory.getLog(CorrelationKeySetMigration.class);
+
+    public boolean migrate(Set<BpelProcess> registeredProcesses, BpelDAOConnection connection) {
+        boolean v1First = true;
+        for (BpelProcess process : registeredProcesses) {
+            ProcessDAO processDao = connection.getProcess(process.getConf().getProcessId());
+            Collection<ProcessInstanceDAO> pis = processDao.getActiveInstances();
+
+            // Migrate the correlation key stored in the jacob state of the instance
+            for (ProcessInstanceDAO instance : pis) {
+                __log.debug("Migrating correlation key in jacob for instance " + instance.getInstanceId());
+                OProcess oproc = findOProcess(registeredProcesses, instance.getProcess().getProcessId());
+                if (v1First) {
+                    if (!updateV1Key(instance, oproc)) {
+                        v1First = false;
+                        updateV2Key(instance, oproc);
+                    }
+                } else {
+                    if (!updateV2Key(instance, oproc)) {
+                        v1First = true;
+                        updateV1Key(instance, oproc);
+                    }
+                }
+            }
+        }
+
+        return true;
+    }
+
+    private boolean updateV1Key(ProcessInstanceDAO instance, OProcess oproc) {
+        ExecutionQueueImpl soup;
+        try {
+            soup = readOldState(instance, oproc, getClass().getClassLoader(), true);
+        } catch (Exception e) {
+            __log.debug("  failed to read a v1 state for instance " + instance.getInstanceId());
+            ExecutionQueueImpl._classDescriptors.clear();
+            return false;
+        }
+        try {
+            OutstandingRequestManager orm = (OutstandingRequestManager) soup.getGlobalData();
+            for (OutstandingRequestManager.Entry entry : orm._byChannel.values()) {
+                Selector[] newSelectors = new Selector[entry.selectors.length];
+                int index = 0;
+                for (Object selector : entry.selectors) {
+                    OldSelector sel = (OldSelector)selector;
+                    Object selCKey = sel.correlationKey;
+                    OldCorrelationKey old = (OldCorrelationKey) selCKey;
+                    __log.debug("   Changing V1 key " + old.toCanonicalString());
+
+                    CorrelationKeySet newKeySet = new CorrelationKeySet();
+                    newKeySet.add(new CorrelationKey(""+old.getCSetId(), old.getValues()));
+                    Selector newSelector = new Selector(sel.idx, sel.plinkInstance, sel.opName,
+                            sel.oneWay, sel.messageExchangeId, newKeySet, "one");
+                    newSelector.correlationKey = new CorrelationKey(""+old.getCSetId(), old.getValues());
+                    newSelectors[index++] = newSelector;
+                }
+                entry.selectors = newSelectors;
+            }
+
+            writeOldState(instance, soup);
+        } finally {
+            ExecutionQueueImpl._classDescriptors.clear();
+        }
+        return true;
+    }
+
+    private boolean updateV2Key(ProcessInstanceDAO instance, OProcess oproc) {
+        ExecutionQueueImpl soup;
+        try {
+            soup = readOldState(instance, oproc, getClass().getClassLoader(), false);
+        } catch (Exception e) {
+            __log.debug("  failed to read a v2 state for instance " + instance.getInstanceId());
+            ExecutionQueueImpl._classDescriptors.clear();
+            return false;
+        }
+        OutstandingRequestManager orm = (OutstandingRequestManager) soup.getGlobalData();
+        for (OutstandingRequestManager.Entry entry : orm._byChannel.values()) {
+            Selector[] newSelectors = new Selector[entry.selectors.length];
+            int index = 0;
+            for (Object selector : entry.selectors) {
+                OldSelector sel = (OldSelector)selector;
+                CorrelationKey selCKey = (CorrelationKey) sel.correlationKey;
+                __log.debug("   Changing V2 key " + selCKey.toCanonicalString());
+
+                CorrelationKeySet newKeySet = new CorrelationKeySet();
+                newKeySet.add(new CorrelationKey(""+selCKey.getCorrelationSetName(), selCKey.getValues()));
+                Selector newSelector = new Selector(sel.idx, sel.plinkInstance, sel.opName,
+                        sel.oneWay, sel.messageExchangeId, newKeySet, "one");
+                newSelector.correlationKey = new CorrelationKey(""+selCKey.getCorrelationSetName(), selCKey.getValues());
+                newSelectors[index++] = newSelector;
+            }
+            entry.selectors = newSelectors;
+        }
+
+        writeOldState(instance, soup);
+        return true;
+    }
+
+    private ExecutionQueueImpl readOldState(ProcessInstanceDAO instance, OProcess oprocess,
+                                            ClassLoader cl, boolean changeKey) {
+        try {
+            ExecutionQueueImpl soup = new ExecutionQueueImpl(cl);
+            ObjectStreamClass osc;
+            if (changeKey) {
+                osc = ObjectStreamClass.lookup(Class.forName(
+                        "org.apache.ode.bpel.engine.migration.OldCorrelationKey", true, cl));
+                ExecutionQueueImpl._classDescriptors.put("org.apache.ode.bpel.common.CorrelationKey", osc);
+            }
+            osc = ObjectStreamClass.lookup(Class.forName(
+                    "org.apache.ode.bpel.engine.migration.OldSelector", true, cl));
+            ExecutionQueueImpl._classDescriptors.put("org.apache.ode.bpel.runtime.Selector", osc);
+            osc = ObjectStreamClass.lookup(Class.forName(
+                    "[Lorg.apache.ode.bpel.engine.migration.OldSelector;", true, getClass().getClassLoader()));
+            ExecutionQueueImpl._classDescriptors.put("[Lorg.apache.ode.bpel.runtime.Selector;", osc);
+
+            soup.setReplacementMap(new ReplacementMapImpl(oprocess));
+            ByteArrayInputStream iis = new ByteArrayInputStream(instance.getExecutionState());
+            soup.read(iis);
+            return soup;
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private void writeOldState(ProcessInstanceDAO instance, ExecutionQueueImpl soup) {
+        try {
+            ByteArrayOutputStream bos = new ByteArrayOutputStream();
+            soup.write(bos);
+            bos.close();
+            instance.setExecutionState(bos.toByteArray());
+            ExecutionQueueImpl._classDescriptors.clear();
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private OProcess findOProcess(Set<BpelProcess> registeredProcesses, QName name) {
+        for (BpelProcess process : registeredProcesses) {
+            if (process.getConf().getProcessId().equals(name)) return process.getOProcess();
+        }
+        return null;
+    }
+
+}

Added: ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/migration/CorrelatorsMigration.java
URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/migration/CorrelatorsMigration.java?rev=732377&view=auto
==============================================================================
--- ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/migration/CorrelatorsMigration.java (added)
+++ ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/migration/CorrelatorsMigration.java Wed Jan  7 07:58:10 2009
@@ -0,0 +1,45 @@
+package org.apache.ode.bpel.engine.migration;
+
+import org.apache.ode.bpel.engine.BpelProcess;
+import org.apache.ode.bpel.dao.BpelDAOConnection;
+import org.apache.ode.bpel.dao.ProcessDAO;
+import org.apache.ode.bpel.dao.CorrelatorDAO;
+import org.apache.ode.bpel.o.OPartnerLink;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import javax.wsdl.Operation;
+import java.util.Set;
+import java.util.Iterator;
+
+/**
+ * Migrates correlators used for routing from a selection scheme that's based on
+ * partnerLinkId+"."+operationName to partnerLinkName+"."+operationName
+ */
+public class CorrelatorsMigration implements Migration {
+    private static final Log __log = LogFactory.getLog(CorrelatorsMigration.class);
+
+    public boolean migrate(Set<BpelProcess> registeredProcesses, BpelDAOConnection connection) {
+        for (BpelProcess process : registeredProcesses) {
+            __log.debug("Migrating correlators for process " + process.getConf().getProcessId());
+            ProcessDAO processDao = connection.getProcess(process.getConf().getProcessId());
+            
+            for (OPartnerLink plink : process.getOProcess().getAllPartnerLinks()) {
+                if (plink.hasMyRole()) {
+                    for (Iterator opI = plink.myRolePortType.getOperations().iterator(); opI.hasNext();) {
+                        Operation op = (Operation)opI.next();
+                        try {
+                            CorrelatorDAO corr = processDao.getCorrelator(plink.getId() + "." + op.getName());
+                            corr.setCorrelatorId(plink.getName() + "." + op.getName());
+                            __log.debug("Migrated correlator " + plink.getName() + "." + op.getName());
+                        } catch (IllegalArgumentException e) {
+                            __log.debug("Correlator with id " + plink.getId() + "." +
+                                    op.getName() + " couldn't be found, skipping.");
+                        }
+                    }
+                }
+            }
+        }
+        return true;
+    }
+}

Added: ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/migration/Migration.java
URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/migration/Migration.java?rev=732377&view=auto
==============================================================================
--- ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/migration/Migration.java (added)
+++ ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/migration/Migration.java Wed Jan  7 07:58:10 2009
@@ -0,0 +1,22 @@
+package org.apache.ode.bpel.engine.migration;
+
+import org.apache.ode.bpel.engine.BpelProcess;
+import org.apache.ode.bpel.dao.BpelDAOConnection;
+
+import java.util.Set;
+
+/**
+ * Implement and add to the list of migrations in MigrationHandler to allow database
+ * level migration.
+ */
+public interface Migration {
+
+    /**
+     * All database migrations are run in the same transaction so if one fails, they will
+     * all be rollbacked. There are two ways to fail: either return false or throw an
+     * exception. The difference is that throwing an exception will stop the server
+     * startup whereas returning false will let the server continue its starting
+     * cycle and run on the non-migrated data.
+     */
+    boolean migrate(Set<BpelProcess> registeredProcesses, BpelDAOConnection connection);
+}

Added: ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/migration/MigrationHandler.java
URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/migration/MigrationHandler.java?rev=732377&view=auto
==============================================================================
--- ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/migration/MigrationHandler.java (added)
+++ ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/migration/MigrationHandler.java Wed Jan  7 07:58:10 2009
@@ -0,0 +1,115 @@
+package org.apache.ode.bpel.engine.migration;
+
+import org.apache.ode.bpel.engine.Contexts;
+import org.apache.ode.bpel.engine.BpelDatabase;
+import org.apache.ode.bpel.engine.BpelProcess;
+import org.apache.ode.bpel.dao.BpelDAOConnection;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.sql.*;
+import java.util.*;
+import java.util.concurrent.Callable;
+
+/**
+ * Checks database schema versions and migrates when necessary.
+ */
+public class MigrationHandler {
+    private static final Log __log = LogFactory.getLog(MigrationHandler.class);
+
+    public static final int CURRENT_SCHEMA_VERSION = 3;
+
+    private Contexts _contexts;
+    private List<Object[]> migrations = new ArrayList<Object[]>() {{
+        add(new Object[] { 2, new CorrelatorsMigration() });
+        add(new Object[] { 2, new CorrelationKeyMigration() });
+        add(new Object[] { 3, new CorrelationKeySetMigration() });
+    }};
+
+    public MigrationHandler(Contexts _contexts) {
+        this._contexts = _contexts;
+    }
+
+    public boolean migrate(final Set<BpelProcess> registeredProcesses) {
+        if (_contexts.dao.getDataSource() == null) {
+            __log.debug("No datasource available, stopping migration. Probably running fully in-memory.");
+            return false;
+        }
+
+        final int version = getDbVersion();
+        if (version == -1) {
+            __log.info("No schema version available from the database, migrations will be skipped.");
+            return false;
+        }
+
+        try {
+            boolean success = _contexts.scheduler.execTransaction(new Callable<Boolean>() {
+                public Boolean call() throws Exception {
+                    boolean res = true;
+                    boolean migrated = false;
+                    for (Object[] me : migrations) {
+                        if (((Integer)me[0]) > version) {
+                            __log.debug("Running migration " + me[1]);
+                            res = ((Migration)me[1]).migrate(registeredProcesses, _contexts.dao.getConnection()) && res;
+                            migrated = true;
+                        }
+                    }
+                    if (!res) _contexts.scheduler.setRollbackOnly();
+                    else if (migrated) setDbVersion(CURRENT_SCHEMA_VERSION);
+                    return res;
+                }
+            });
+            return success;
+        } catch (Exception e) {
+            __log.error("An error occured while migrating your database to a newer version of ODE, changes have " +
+                    "been aborted", e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    private int getDbVersion() {
+        int version = -1;
+        Connection conn = null;
+        PreparedStatement stmt = null;
+        ResultSet rs = null;
+        try {
+            conn = _contexts.dao.getDataSource().getConnection();
+            stmt = conn.prepareStatement("SELECT VERSION FROM ODE_SCHEMA_VERSION");
+            rs = stmt.executeQuery();
+            if (rs.next()) version = rs.getInt("VERSION");
+        } catch (Exception e) {
+            // Swallow, we'll just abort based on the version number
+        } finally {
+            try {
+                if (rs != null) rs.close();
+                if (stmt != null) stmt.close();
+                if (conn != null) conn.close();
+            } catch (SQLException e) {
+                throw new RuntimeException(e);
+            }
+        }
+        return version;
+    }
+
+    private void setDbVersion(int version) {
+        Connection conn = null;
+        Statement stmt = null;
+        try {
+            conn = _contexts.dao.getDataSource().getConnection();
+            stmt = conn.createStatement();
+            int res = stmt.executeUpdate("UPDATE ODE_SCHEMA_VERSION SET VERSION = " + version);
+            // This should never happen but who knows?
+            if (res == 0) throw new RuntimeException("Couldn't update schema version.");
+        } catch (Exception e) {
+            // Swallow, we'll just abort based on the version number
+        } finally {
+            try {
+                if (stmt != null) stmt.close();
+                if (conn != null) conn.close();
+            } catch (SQLException e) {
+                throw new RuntimeException(e);
+            }
+        }
+
+    }
+}

Added: ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/migration/OldCorrelationKey.java
URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/migration/OldCorrelationKey.java?rev=732377&view=auto
==============================================================================
--- ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/migration/OldCorrelationKey.java (added)
+++ ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/migration/OldCorrelationKey.java Wed Jan  7 07:58:10 2009
@@ -0,0 +1,167 @@
+package org.apache.ode.bpel.engine.migration;
+
+import org.apache.ode.utils.CollectionUtils;
+
+import java.util.List;
+import java.util.ArrayList;
+import java.io.Serializable;
+
+public class OldCorrelationKey implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    /** CorrelationSet identifier. */
+    private int _csetId;
+    /** Key values. */
+    private String _keyValues[];
+
+    /**
+     * Constructor.
+     *
+     * @param csetId
+     *            correlation set identifier
+     * @param keyValues
+     *            correlation key values
+     */
+    public OldCorrelationKey(int csetId, String[] keyValues) {
+        _csetId = csetId;
+        _keyValues = keyValues;
+    }
+
+    public OldCorrelationKey(String canonicalForm) {
+        int firstTilde = canonicalForm.indexOf('~');
+        _csetId = Integer.parseInt(canonicalForm.substring(0, firstTilde == -1 ? canonicalForm.length() : firstTilde));
+
+        if (firstTilde != -1) {
+            List<String> keys = new ArrayList<String>();
+            char chars[] = canonicalForm.toCharArray();
+            StringBuffer work = new StringBuffer();
+            for (int i = firstTilde + 1; i < chars.length; ++i) {
+                boolean isLast = (i == chars.length - 1);
+                if (chars[i] == '~' && !isLast && chars[i + 1] == '~') {
+                    work.append(chars[i++]);
+                } else if (chars[i] == '~') {
+                    keys.add(work.toString());
+                    work = new StringBuffer();
+                } else {
+                    work.append(chars[i]);
+                }
+            }
+            keys.add(work.toString());
+            _keyValues = new String[keys.size()];
+            keys.toArray(_keyValues);
+        } else {
+            _keyValues = new String[0];
+        }
+    }
+
+    /** Return the OCorrelation id for the correlation set */
+    public int getCSetId() {
+        return _csetId;
+    }
+
+    /** Return the values for the correlation set */
+    public String[] getValues() {
+        return _keyValues;
+    }
+
+    /**
+     * Check if this key matches any member in a set of keys.
+     *
+     * @param keys
+     *            set of keys to match against
+     *
+     * @return <code>true</code> if one of the keys in the set
+     *         <code>equals(..)</code> this key, <code>false</code>
+     *         otherwise
+     */
+    public boolean isMatch(OldCorrelationKey[] keys) {
+        for (OldCorrelationKey key : keys)
+            if (key.equals(this)) {
+                return true;
+            }
+
+        return false;
+    }
+
+    /**
+     * Equals comperator method.
+     *
+     * @param o
+     *            <code>CorrelationKey</code> object to compare with
+     *
+     * @return <code>true</code> if the given object
+     */
+    public boolean equals(Object o) {
+        OldCorrelationKey okey = (OldCorrelationKey) o;
+
+        if (okey == null || okey._csetId != _csetId || okey._keyValues.length != _keyValues.length)
+            return false;
+
+        for (int i = 0; i < _keyValues.length; ++i)
+            if (!_keyValues[i].equals(okey._keyValues[i]))
+                return false;
+
+        return true;
+    }
+
+    /**
+     * Generate a hash code from the hash codes of the elements.
+     *
+     * @see java.util.HashMap#hashCode
+     * @see Object#hashCode
+     */
+    public int hashCode() {
+        int hashCode = _csetId;
+        for (String _keyValue : _keyValues)
+            hashCode ^= _keyValue.hashCode();
+        return hashCode;
+    }
+
+    public List<String> toCanonicalList() {
+        ArrayList<String> ret = new ArrayList<String>(_keyValues.length + 1);
+        ret.add(((Integer) _csetId).toString());
+        for (String i : _keyValues)
+            ret.add(i);
+        return ret;
+    }
+
+    /**
+     * @see Object#toString
+     */
+    public String toString() {
+        StringBuffer buf = new StringBuffer("{CorrelationKey ");
+        buf.append("setId=");
+        buf.append(_csetId);
+        buf.append(", values=");
+        buf.append(CollectionUtils.makeCollection(ArrayList.class, _keyValues));
+        buf.append('}');
+
+        return buf.toString();
+    }
+
+    public String toCanonicalString() {
+        StringBuffer buf = new StringBuffer();
+        buf.append(this.getCSetId());
+        buf.append('~');
+        for (int i = 0; i < getValues().length; ++i) {
+            if (i != 0)
+                buf.append('~');
+            escapeTilde(buf, getValues()[i]);
+        }
+        return buf.toString();
+    }
+
+    static void escapeTilde(StringBuffer buf, String str) {
+        if (str == null)
+            return;
+        char[] chars = str.toCharArray();
+        for (char achar : chars) {
+            if (achar == '~') {
+                buf.append("~~");
+            } else {
+                buf.append(achar);
+            }
+        }
+    }
+}

Added: ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/migration/OldSelector.java
URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/migration/OldSelector.java?rev=732377&view=auto
==============================================================================
--- ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/migration/OldSelector.java (added)
+++ ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/migration/OldSelector.java Wed Jan  7 07:58:10 2009
@@ -0,0 +1,37 @@
+package org.apache.ode.bpel.engine.migration;
+
+import org.apache.ode.bpel.runtime.PartnerLinkInstance;
+import org.apache.ode.utils.ObjectPrinter;
+
+import java.io.Serializable;
+
+public class OldSelector implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    public final PartnerLinkInstance plinkInstance;
+    // here for the backward compatibility
+    @SuppressWarnings("unused")
+    public Object correlationKey = null;
+  public final String opName;
+  public final String messageExchangeId;
+  public final int idx;
+  public final boolean oneWay;
+
+  OldSelector(int idx, PartnerLinkInstance plinkInstance, String opName, boolean oneWay, String mexId) {
+    this.idx = idx;
+    this.plinkInstance = plinkInstance;
+    this.opName = opName;
+    this.messageExchangeId = mexId;
+    this.oneWay = oneWay;
+  }
+
+  public String toString() {
+    return ObjectPrinter.toString(this, new Object[] {
+      "plinkInstnace", plinkInstance,
+      "opName" ,opName,
+      "oneWay", oneWay ? "yes" : "no",
+      "mexId", messageExchangeId,
+      "idx", Integer.valueOf(idx)
+    });
+  }
+}

Added: ode/branches/APACHE_ODE_1.X/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/CorrelatorMessageDaoImpl.java
URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/CorrelatorMessageDaoImpl.java?rev=732377&view=auto
==============================================================================
--- ode/branches/APACHE_ODE_1.X/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/CorrelatorMessageDaoImpl.java (added)
+++ ode/branches/APACHE_ODE_1.X/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/CorrelatorMessageDaoImpl.java Wed Jan  7 07:58:10 2009
@@ -0,0 +1,25 @@
+package org.apache.ode.daohib.bpel;
+
+import org.apache.ode.bpel.dao.CorrelatorMessageDAO;
+import org.apache.ode.bpel.common.CorrelationKey;
+import org.apache.ode.daohib.bpel.hobj.HCorrelatorMessage;
+import org.apache.ode.daohib.SessionManager;
+
+public class CorrelatorMessageDaoImpl extends HibernateDao implements CorrelatorMessageDAO {
+
+    private HCorrelatorMessage _hobj;
+
+    public CorrelatorMessageDaoImpl(SessionManager sm, HCorrelatorMessage hobj) {
+        super(sm, hobj);
+        entering("CorrelatorDaoImpl.CorrelatorDaoImpl");
+        _hobj = hobj;
+    }
+
+    public CorrelationKey getCorrelationKey() {
+        return new CorrelationKey(_hobj.getCorrelationKey());
+    }
+
+    public void setCorrelationKey(CorrelationKey ckey) {
+        _hobj.setCorrelationKey(ckey.toCanonicalString());
+    }
+}