You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ode.apache.org by mr...@apache.org on 2009/01/07 16:58:11 UTC
svn commit: r732377 - in /ode/branches/APACHE_ODE_1.X:
bpel-dao/src/main/java/org/apache/ode/bpel/dao/
bpel-runtime/src/main/java/org/apache/ode/bpel/engine/migration/
dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/
Author: mriou
Date: Wed Jan 7 07:58:10 2009
New Revision: 732377
URL: http://svn.apache.org/viewvc?rev=732377&view=rev
Log:
Migration to using names in place of omodel ids in correlation and partner link references from both the database and the runtime. Problem was that omodel ids tend to change a lot as soon as the process gets even slightly updated. And then past instances can't complete anymore.
Added:
ode/branches/APACHE_ODE_1.X/bpel-dao/src/main/java/org/apache/ode/bpel/dao/CorrelatorMessageDAO.java
ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/migration/
ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/migration/CorrelationKeyMigration.java
ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/migration/CorrelationKeySetMigration.java
ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/migration/CorrelatorsMigration.java
ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/migration/Migration.java
ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/migration/MigrationHandler.java
ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/migration/OldCorrelationKey.java
ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/migration/OldSelector.java
ode/branches/APACHE_ODE_1.X/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/CorrelatorMessageDaoImpl.java
Added: ode/branches/APACHE_ODE_1.X/bpel-dao/src/main/java/org/apache/ode/bpel/dao/CorrelatorMessageDAO.java
URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/bpel-dao/src/main/java/org/apache/ode/bpel/dao/CorrelatorMessageDAO.java?rev=732377&view=auto
==============================================================================
--- ode/branches/APACHE_ODE_1.X/bpel-dao/src/main/java/org/apache/ode/bpel/dao/CorrelatorMessageDAO.java (added)
+++ ode/branches/APACHE_ODE_1.X/bpel-dao/src/main/java/org/apache/ode/bpel/dao/CorrelatorMessageDAO.java Wed Jan 7 07:58:10 2009
@@ -0,0 +1,10 @@
+package org.apache.ode.bpel.dao;
+
+import org.apache.ode.bpel.common.CorrelationKey;
+
+/**
+ * Data access object exposing the correlation key attached to a correlator message.
+ */
+public interface CorrelatorMessageDAO {
+
+ /** Returns the correlation key currently held by this message. */
+ CorrelationKey getCorrelationKey();
+
+ /** Replaces the correlation key held by this message with <code>ckey</code>. */
+ void setCorrelationKey(CorrelationKey ckey);
+}
Added: ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/migration/CorrelationKeyMigration.java
URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/migration/CorrelationKeyMigration.java?rev=732377&view=auto
==============================================================================
--- ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/migration/CorrelationKeyMigration.java (added)
+++ ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/migration/CorrelationKeyMigration.java Wed Jan 7 07:58:10 2009
@@ -0,0 +1,114 @@
+package org.apache.ode.bpel.engine.migration;
+
+import org.apache.ode.bpel.engine.BpelProcess;
+import org.apache.ode.bpel.engine.ReplacementMapImpl;
+import org.apache.ode.bpel.engine.OutstandingRequestManager;
+import org.apache.ode.bpel.dao.*;
+import org.apache.ode.bpel.common.CorrelationKey;
+import org.apache.ode.bpel.o.*;
+import org.apache.ode.bpel.runtime.Selector;
+import org.apache.ode.jacob.vpu.ExecutionQueueImpl;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import javax.xml.namespace.QName;
+import javax.wsdl.Operation;
+import java.util.*;
+import java.io.ObjectStreamClass;
+import java.io.FileInputStream;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+
+/**
+ * Migrates the correlation key values from a scheme containing the OModel correlation
+ * set id to one using its name. So something like 1~abc~de will become foo~abc~de.
+ */
+public class CorrelationKeyMigration implements Migration {
+ private static final Log __log = LogFactory.getLog(CorrelationKeyMigration.class);
+
+ public boolean migrate(Set<BpelProcess> registeredProcesses, BpelDAOConnection connection) {
+ // Map only used to avoid duplicates, set would force to re-implement equals
+ HashMap<Long,ProcessInstanceDAO> instances = new HashMap<Long,ProcessInstanceDAO>();
+
+ // Migrate correlation set values
+ Collection<CorrelationSetDAO> csets = connection.getActiveCorrelationSets();
+ for (CorrelationSetDAO cset : csets) {
+ CorrelationKey ckey = cset.getValue();
+ instances.put(cset.getInstance().getInstanceId(), cset.getInstance());
+ Integer ckeyInt = asInt(ckey.getCorrelationSetName());
+ if (ckeyInt != null) {
+ OScope.CorrelationSet ocset = findCorrelationById(ckeyInt, registeredProcesses, cset.getProcess().getProcessId());
+ if (ocset == null) __log.debug("Correlation set not found, couldn't upgrade set " + ckey.toCanonicalString());
+ else {
+ cset.setValue(null, new CorrelationKey(ocset.name, ckey.getValues()));
+ }
+ }
+ }
+
+ // Migrate routes and message queue for each correlator
+ for (BpelProcess process : registeredProcesses) {
+ __log.debug("Migrating correlators for process " + process.getConf().getProcessId());
+ ProcessDAO processDao = connection.getProcess(process.getConf().getProcessId());
+
+ for (OPartnerLink plink : process.getOProcess().getAllPartnerLinks()) {
+ if (plink.hasMyRole()) {
+ for (Iterator opI = plink.myRolePortType.getOperations().iterator(); opI.hasNext();) {
+ Operation op = (Operation)opI.next();
+ try {
+ CorrelatorDAO corr = processDao.getCorrelator(plink.getName() + "." + op.getName());
+ // Changing all routes
+ for (MessageRouteDAO routeDAO : corr.getAllRoutes()) {
+ CorrelationKey oldKey = routeDAO.getCorrelationKey();
+ Integer ckeyInt = asInt(oldKey.getCorrelationSetName());
+ if (ckeyInt != null) {
+ OScope.CorrelationSet ocset = findCorrelationById(ckeyInt, registeredProcesses, process.getConf().getProcessId());
+ if (ocset == null) __log.debug("Correlation set not found, couldn't upgrade route " + oldKey.toCanonicalString());
+ else {
+ routeDAO.setCorrelationKey(new CorrelationKey(ocset.name, oldKey.getValues()));
+ }
+ }
+ }
+ // Changing all queued messages
+ for (CorrelatorMessageDAO corrMsgDAO : corr.getAllMessages()) {
+ CorrelationKey oldKey = corrMsgDAO.getCorrelationKey();
+ Integer ckeyInt = asInt(oldKey.getCorrelationSetName());
+ if (ckeyInt != null) {
+ OScope.CorrelationSet ocset = findCorrelationById(ckeyInt, registeredProcesses, process.getConf().getProcessId());
+ if (ocset == null) __log.debug("Correlation set not found, couldn't upgrade route " + oldKey.toCanonicalString());
+ else {
+ corrMsgDAO.setCorrelationKey(new CorrelationKey(ocset.name, oldKey.getValues()));
+ }
+ }
+ }
+ __log.debug("Migrated routes and message queue for correlator " + plink.getName() + "." + op.getName());
+ } catch (IllegalArgumentException e) {
+ __log.debug("Correlator with id " + plink.getId() + "." +
+ op.getName() + " couldn't be found, skipping.");
+ }
+ }
+ }
+ }
+ }
+
+ return true;
+ }
+
+ /** Parses <code>s</code> as a decimal integer, yielding <code>null</code> when it isn't one. */
+ private Integer asInt(String s) {
+     Integer parsed;
+     try {
+         parsed = Integer.valueOf(s);
+     } catch (NumberFormatException notANumber) {
+         parsed = null;
+     }
+     return parsed;
+ }
+
+ /**
+  * Looks up the child with id <code>ckeyInt</code> in the model of the process identified
+  * by <code>processId</code> and returns it when it is a correlation set.
+  *
+  * @return the matching correlation set, or <code>null</code> when none is found
+  */
+ private OScope.CorrelationSet findCorrelationById(int ckeyInt, Set<BpelProcess> processes, QName processId) {
+     for (BpelProcess candidate : processes) {
+         if (!candidate.getConf().getProcessId().equals(processId)) continue;
+         OBase child = candidate.getOProcess().getChild(ckeyInt);
+         if (child instanceof OScope.CorrelationSet) {
+             return (OScope.CorrelationSet) child;
+         }
+     }
+     return null;
+ }
+
+}
Added: ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/migration/CorrelationKeySetMigration.java
URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/migration/CorrelationKeySetMigration.java?rev=732377&view=auto
==============================================================================
--- ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/migration/CorrelationKeySetMigration.java (added)
+++ ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/migration/CorrelationKeySetMigration.java Wed Jan 7 07:58:10 2009
@@ -0,0 +1,172 @@
+package org.apache.ode.bpel.engine.migration;
+
+import org.apache.ode.bpel.engine.BpelProcess;
+import org.apache.ode.bpel.engine.OutstandingRequestManager;
+import org.apache.ode.bpel.engine.ReplacementMapImpl;
+import org.apache.ode.bpel.dao.BpelDAOConnection;
+import org.apache.ode.bpel.dao.ProcessInstanceDAO;
+import org.apache.ode.bpel.dao.ProcessDAO;
+import org.apache.ode.bpel.runtime.Selector;
+import org.apache.ode.bpel.common.CorrelationKey;
+import org.apache.ode.bpel.common.CorrelationKeySet;
+import org.apache.ode.bpel.o.OProcess;
+import org.apache.ode.jacob.vpu.ExecutionQueueImpl;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import javax.xml.namespace.QName;
+import java.util.Set;
+import java.util.Collection;
+import java.util.concurrent.ConcurrentHashMap;
+import java.io.ObjectStreamClass;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+
+/**
+ * Migrates the database from using single correlations to multiple correlations support.
+ */
+public class CorrelationKeySetMigration implements Migration {
+ private static final Log __log = LogFactory.getLog(CorrelationKeySetMigration.class);
+
+ /**
+  * Rewrites the selectors stored in the execution state of every active instance of the
+  * registered processes so that they carry a correlation key set. Each state is read in
+  * one of two serialized formats (v1 or v2); the format that worked last is tried first
+  * for the next instance and the other one is used as fallback.
+  *
+  * @param registeredProcesses processes whose active instances are migrated
+  * @param connection DAO connection used to load the processes
+  * @return always <code>true</code>; instances readable in neither format are logged and skipped
+  */
+ public boolean migrate(Set<BpelProcess> registeredProcesses, BpelDAOConnection connection) {
+     boolean v1First = true;
+     for (BpelProcess process : registeredProcesses) {
+         ProcessDAO processDao = connection.getProcess(process.getConf().getProcessId());
+         Collection<ProcessInstanceDAO> pis = processDao.getActiveInstances();
+
+         // Migrate the correlation key stored in the jacob state of the instance
+         for (ProcessInstanceDAO instance : pis) {
+             __log.debug("Migrating correlation key in jacob for instance " + instance.getInstanceId());
+             OProcess oproc = findOProcess(registeredProcesses, instance.getProcess().getProcessId());
+             boolean migrated;
+             if (v1First) {
+                 migrated = updateV1Key(instance, oproc);
+                 if (!migrated) {
+                     v1First = false;
+                     migrated = updateV2Key(instance, oproc);
+                 }
+             } else {
+                 migrated = updateV2Key(instance, oproc);
+                 if (!migrated) {
+                     v1First = true;
+                     migrated = updateV1Key(instance, oproc);
+                 }
+             }
+             // Previously a state unreadable in both formats was skipped without any trace
+             if (!migrated) {
+                 __log.warn("Couldn't read the execution state of instance " + instance.getInstanceId()
+                         + " in either the v1 or the v2 format, its correlation keys were not migrated.");
+             }
+         }
+     }
+
+     return true;
+ }
+
+ /**
+  * Converts the selectors of an instance whose execution state was serialized in the v1
+  * format, i.e. with correlation keys read as {@link OldCorrelationKey}. Every selector
+  * is rebuilt as a new Selector holding a one-element key set; the set name of the new
+  * key is the old numeric id rendered as a string.
+  *
+  * @return <code>false</code> when the state couldn't be read as v1, <code>true</code> once
+  *         the converted state has been written back to the instance
+  */
+ private boolean updateV1Key(ProcessInstanceDAO instance, OProcess oproc) {
+ ExecutionQueueImpl soup;
+ try {
+ soup = readOldState(instance, oproc, getClass().getClassLoader(), true);
+ } catch (Exception e) {
+ __log.debug(" failed to read a v1 state for instance " + instance.getInstanceId());
+ // Drop the class descriptor overrides registered by readOldState
+ ExecutionQueueImpl._classDescriptors.clear();
+ return false;
+ }
+ try {
+ OutstandingRequestManager orm = (OutstandingRequestManager) soup.getGlobalData();
+ for (OutstandingRequestManager.Entry entry : orm._byChannel.values()) {
+ Selector[] newSelectors = new Selector[entry.selectors.length];
+ int index = 0;
+ for (Object selector : entry.selectors) {
+ // The state was read with the Old* descriptors, so the elements are OldSelector instances
+ OldSelector sel = (OldSelector)selector;
+ Object selCKey = sel.correlationKey;
+ OldCorrelationKey old = (OldCorrelationKey) selCKey;
+ __log.debug(" Changing V1 key " + old.toCanonicalString());
+
+ CorrelationKeySet newKeySet = new CorrelationKeySet();
+ newKeySet.add(new CorrelationKey(""+old.getCSetId(), old.getValues()));
+ // NOTE(review): the trailing "one" argument is presumably a routing mode - confirm against Selector
+ Selector newSelector = new Selector(sel.idx, sel.plinkInstance, sel.opName,
+ sel.oneWay, sel.messageExchangeId, newKeySet, "one");
+ newSelector.correlationKey = new CorrelationKey(""+old.getCSetId(), old.getValues());
+ newSelectors[index++] = newSelector;
+ }
+ entry.selectors = newSelectors;
+ }
+
+ writeOldState(instance, soup);
+ } finally {
+ // Always remove the descriptor overrides, even when the conversion fails midway
+ ExecutionQueueImpl._classDescriptors.clear();
+ }
+ return true;
+ }
+
+ /**
+  * Converts the selectors of an instance whose execution state was serialized in the v2
+  * format, i.e. with selectors read as {@link OldSelector} but correlation keys already
+  * being CorrelationKey objects. Every selector is rebuilt as a new Selector holding a
+  * one-element key set.
+  *
+  * @return <code>false</code> when the state couldn't be read as v2, <code>true</code> once
+  *         the converted state has been written back to the instance
+  */
+ private boolean updateV2Key(ProcessInstanceDAO instance, OProcess oproc) {
+     ExecutionQueueImpl soup;
+     try {
+         soup = readOldState(instance, oproc, getClass().getClassLoader(), false);
+     } catch (Exception e) {
+         __log.debug(" failed to read a v2 state for instance " + instance.getInstanceId());
+         ExecutionQueueImpl._classDescriptors.clear();
+         return false;
+     }
+     try {
+         OutstandingRequestManager orm = (OutstandingRequestManager) soup.getGlobalData();
+         for (OutstandingRequestManager.Entry entry : orm._byChannel.values()) {
+             Selector[] newSelectors = new Selector[entry.selectors.length];
+             int index = 0;
+             for (Object selector : entry.selectors) {
+                 OldSelector sel = (OldSelector)selector;
+                 CorrelationKey selCKey = (CorrelationKey) sel.correlationKey;
+                 __log.debug(" Changing V2 key " + selCKey.toCanonicalString());
+
+                 CorrelationKeySet newKeySet = new CorrelationKeySet();
+                 newKeySet.add(new CorrelationKey(""+selCKey.getCorrelationSetName(), selCKey.getValues()));
+                 Selector newSelector = new Selector(sel.idx, sel.plinkInstance, sel.opName,
+                         sel.oneWay, sel.messageExchangeId, newKeySet, "one");
+                 newSelector.correlationKey = new CorrelationKey(""+selCKey.getCorrelationSetName(), selCKey.getValues());
+                 newSelectors[index++] = newSelector;
+             }
+             entry.selectors = newSelectors;
+         }
+
+         writeOldState(instance, soup);
+     } finally {
+         // Same guarantee as updateV1Key: the static descriptor overrides registered by
+         // readOldState must not outlive this call, even when the conversion throws
+         ExecutionQueueImpl._classDescriptors.clear();
+     }
+     return true;
+ }
+
+ /**
+  * Deserializes the execution state of <code>instance</code> while substituting the
+  * migration stand-in classes for the current runtime classes. The substitutions are
+  * registered in the static <code>ExecutionQueueImpl._classDescriptors</code> map and
+  * stay registered after this method returns; callers are responsible for clearing them.
+  *
+  * @param instance instance whose execution state is read
+  * @param oprocess process model used to build the replacement map
+  * @param cl class loader used for the execution queue and for resolving the stand-ins
+  * @param changeKey when <code>true</code>, CorrelationKey is also read as OldCorrelationKey
+  * @return the execution queue populated from the stored state
+  * @throws RuntimeException wrapping any failure to resolve the classes or read the state
+  */
+ private ExecutionQueueImpl readOldState(ProcessInstanceDAO instance, OProcess oprocess,
+         ClassLoader cl, boolean changeKey) {
+     try {
+         ExecutionQueueImpl soup = new ExecutionQueueImpl(cl);
+         ObjectStreamClass osc;
+         if (changeKey) {
+             osc = ObjectStreamClass.lookup(Class.forName(
+                     "org.apache.ode.bpel.engine.migration.OldCorrelationKey", true, cl));
+             ExecutionQueueImpl._classDescriptors.put("org.apache.ode.bpel.common.CorrelationKey", osc);
+         }
+         osc = ObjectStreamClass.lookup(Class.forName(
+                 "org.apache.ode.bpel.engine.migration.OldSelector", true, cl));
+         ExecutionQueueImpl._classDescriptors.put("org.apache.ode.bpel.runtime.Selector", osc);
+         // Resolve the array type through the same loader as the element type above
+         osc = ObjectStreamClass.lookup(Class.forName(
+                 "[Lorg.apache.ode.bpel.engine.migration.OldSelector;", true, cl));
+         ExecutionQueueImpl._classDescriptors.put("[Lorg.apache.ode.bpel.runtime.Selector;", osc);
+
+         soup.setReplacementMap(new ReplacementMapImpl(oprocess));
+         ByteArrayInputStream iis = new ByteArrayInputStream(instance.getExecutionState());
+         soup.read(iis);
+         return soup;
+     } catch (Exception e) {
+         throw new RuntimeException(e);
+     }
+ }
+
+ /**
+  * Serializes <code>soup</code> and stores the bytes as the execution state of
+  * <code>instance</code>, then clears the static class descriptor overrides.
+  *
+  * @throws RuntimeException wrapping any failure raised while writing or storing the state
+  */
+ private void writeOldState(ProcessInstanceDAO instance, ExecutionQueueImpl soup) {
+     ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+     try {
+         soup.write(buffer);
+         buffer.close();
+         byte[] serialized = buffer.toByteArray();
+         instance.setExecutionState(serialized);
+         ExecutionQueueImpl._classDescriptors.clear();
+     } catch (Exception e) {
+         throw new RuntimeException(e);
+     }
+ }
+
+ /**
+  * Returns the compiled model of the registered process whose id equals <code>name</code>,
+  * or <code>null</code> when no registered process has that id.
+  */
+ private OProcess findOProcess(Set<BpelProcess> registeredProcesses, QName name) {
+     OProcess match = null;
+     for (BpelProcess candidate : registeredProcesses) {
+         QName candidateId = candidate.getConf().getProcessId();
+         if (candidateId.equals(name)) {
+             match = candidate.getOProcess();
+             break;
+         }
+     }
+     return match;
+ }
+
+}
Added: ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/migration/CorrelatorsMigration.java
URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/migration/CorrelatorsMigration.java?rev=732377&view=auto
==============================================================================
--- ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/migration/CorrelatorsMigration.java (added)
+++ ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/migration/CorrelatorsMigration.java Wed Jan 7 07:58:10 2009
@@ -0,0 +1,45 @@
+package org.apache.ode.bpel.engine.migration;
+
+import org.apache.ode.bpel.engine.BpelProcess;
+import org.apache.ode.bpel.dao.BpelDAOConnection;
+import org.apache.ode.bpel.dao.ProcessDAO;
+import org.apache.ode.bpel.dao.CorrelatorDAO;
+import org.apache.ode.bpel.o.OPartnerLink;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import javax.wsdl.Operation;
+import java.util.Set;
+import java.util.Iterator;
+
+/**
+ * Migrates correlators used for routing from a selection scheme that's based on
+ * partnerLinkId+"."+operationName to partnerLinkName+"."+operationName
+ */
+public class CorrelatorsMigration implements Migration {
+    private static final Log __log = LogFactory.getLog(CorrelatorsMigration.class);
+
+    /**
+     * Re-keys the correlator of every my-role operation of the registered processes:
+     * the correlator found under partnerLinkId.operationName gets the id
+     * partnerLinkName.operationName. Correlators that can't be found are skipped.
+     *
+     * @return always <code>true</code>
+     */
+    public boolean migrate(Set<BpelProcess> registeredProcesses, BpelDAOConnection connection) {
+        for (BpelProcess process : registeredProcesses) {
+            __log.debug("Migrating correlators for process " + process.getConf().getProcessId());
+            ProcessDAO processDao = connection.getProcess(process.getConf().getProcessId());
+
+            for (OPartnerLink plink : process.getOProcess().getAllPartnerLinks()) {
+                if (!plink.hasMyRole()) {
+                    continue;
+                }
+                Iterator operations = plink.myRolePortType.getOperations().iterator();
+                while (operations.hasNext()) {
+                    Operation op = (Operation) operations.next();
+                    String idBasedKey = plink.getId() + "." + op.getName();
+                    String nameBasedKey = plink.getName() + "." + op.getName();
+                    try {
+                        CorrelatorDAO corr = processDao.getCorrelator(idBasedKey);
+                        corr.setCorrelatorId(nameBasedKey);
+                        __log.debug("Migrated correlator " + nameBasedKey);
+                    } catch (IllegalArgumentException e) {
+                        __log.debug("Correlator with id " + idBasedKey + " couldn't be found, skipping.");
+                    }
+                }
+            }
+        }
+        return true;
+    }
+}
Added: ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/migration/Migration.java
URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/migration/Migration.java?rev=732377&view=auto
==============================================================================
--- ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/migration/Migration.java (added)
+++ ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/migration/Migration.java Wed Jan 7 07:58:10 2009
@@ -0,0 +1,22 @@
+package org.apache.ode.bpel.engine.migration;
+
+import org.apache.ode.bpel.engine.BpelProcess;
+import org.apache.ode.bpel.dao.BpelDAOConnection;
+
+import java.util.Set;
+
+/**
+ * Implement and add to the list of migrations in MigrationHandler to allow database
+ * level migration.
+ */
+public interface Migration {
+
+ /**
+ * All database migrations are run in the same transaction so if one fails, they will
+ * all be rolled back. There are two ways to fail: either return false or throw an
+ * exception. The difference is that throwing an exception will stop the server
+ * startup whereas returning false will let the server continue its starting
+ * cycle and run on the non-migrated data.
+ *
+ * @param registeredProcesses the processes currently registered with the engine
+ * @param connection DAO connection to run the migration against
+ * @return <code>true</code> on success, <code>false</code> to roll back without stopping startup
+ */
+ boolean migrate(Set<BpelProcess> registeredProcesses, BpelDAOConnection connection);
+}
Added: ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/migration/MigrationHandler.java
URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/migration/MigrationHandler.java?rev=732377&view=auto
==============================================================================
--- ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/migration/MigrationHandler.java (added)
+++ ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/migration/MigrationHandler.java Wed Jan 7 07:58:10 2009
@@ -0,0 +1,115 @@
+package org.apache.ode.bpel.engine.migration;
+
+import org.apache.ode.bpel.engine.Contexts;
+import org.apache.ode.bpel.engine.BpelDatabase;
+import org.apache.ode.bpel.engine.BpelProcess;
+import org.apache.ode.bpel.dao.BpelDAOConnection;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.sql.*;
+import java.util.*;
+import java.util.concurrent.Callable;
+
+/**
+ * Checks database schema versions and migrates when necessary.
+ */
+public class MigrationHandler {
+    private static final Log __log = LogFactory.getLog(MigrationHandler.class);
+
+    /** Schema version written to the database once the pending migrations have succeeded. */
+    public static final int CURRENT_SCHEMA_VERSION = 3;
+
+    private final Contexts _contexts;
+    /** Pairs of { schema version introducing the migration, migration }, in execution order. */
+    private final List<Object[]> migrations = new ArrayList<Object[]>() {{
+        add(new Object[] { 2, new CorrelatorsMigration() });
+        add(new Object[] { 2, new CorrelationKeyMigration() });
+        add(new Object[] { 3, new CorrelationKeySetMigration() });
+    }};
+
+    public MigrationHandler(Contexts _contexts) {
+        this._contexts = _contexts;
+    }
+
+    /**
+     * Runs, inside a single scheduler transaction, every migration registered for a
+     * schema version greater than the one stored in the database. When all of them
+     * succeed the stored version is set to {@link #CURRENT_SCHEMA_VERSION}; when one
+     * returns <code>false</code> the transaction is marked for rollback.
+     *
+     * @param registeredProcesses processes handed to each migration
+     * @return <code>false</code> when no datasource or schema version is available or a
+     *         migration reported failure, <code>true</code> otherwise
+     * @throws RuntimeException wrapping any exception raised while migrating
+     */
+    public boolean migrate(final Set<BpelProcess> registeredProcesses) {
+        if (_contexts.dao.getDataSource() == null) {
+            __log.debug("No datasource available, stopping migration. Probably running fully in-memory.");
+            return false;
+        }
+
+        final int version = getDbVersion();
+        if (version == -1) {
+            __log.info("No schema version available from the database, migrations will be skipped.");
+            return false;
+        }
+
+        try {
+            boolean success = _contexts.scheduler.execTransaction(new Callable<Boolean>() {
+                public Boolean call() throws Exception {
+                    boolean res = true;
+                    boolean migrated = false;
+                    for (Object[] me : migrations) {
+                        if (((Integer)me[0]) > version) {
+                            __log.debug("Running migration " + me[1]);
+                            // The migration is the left operand so it still runs after an earlier failure
+                            res = ((Migration)me[1]).migrate(registeredProcesses, _contexts.dao.getConnection()) && res;
+                            migrated = true;
+                        }
+                    }
+                    if (!res) _contexts.scheduler.setRollbackOnly();
+                    else if (migrated) setDbVersion(CURRENT_SCHEMA_VERSION);
+                    return res;
+                }
+            });
+            return success;
+        } catch (Exception e) {
+            __log.error("An error occured while migrating your database to a newer version of ODE, changes have " +
+                    "been aborted", e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Reads the schema version stored in ODE_SCHEMA_VERSION.
+     *
+     * @return the stored version, or -1 when it can't be read (no table, no row, query failure)
+     */
+    private int getDbVersion() {
+        int version = -1;
+        Connection conn = null;
+        PreparedStatement stmt = null;
+        ResultSet rs = null;
+        try {
+            conn = _contexts.dao.getDataSource().getConnection();
+            stmt = conn.prepareStatement("SELECT VERSION FROM ODE_SCHEMA_VERSION");
+            rs = stmt.executeQuery();
+            if (rs.next()) version = rs.getInt("VERSION");
+        } catch (Exception e) {
+            // Not fatal on purpose: the caller skips the migrations when the version is -1
+            __log.debug("Couldn't read the schema version from the database.", e);
+        } finally {
+            close(rs, stmt, conn);
+        }
+        return version;
+    }
+
+    /**
+     * Stores <code>version</code> in ODE_SCHEMA_VERSION.
+     *
+     * @throws RuntimeException when the update fails or touches no row, so that the
+     *         surrounding migration transaction is aborted instead of silently leaving
+     *         migrated data behind a stale version number
+     */
+    private void setDbVersion(int version) {
+        Connection conn = null;
+        PreparedStatement stmt = null;
+        try {
+            conn = _contexts.dao.getDataSource().getConnection();
+            stmt = conn.prepareStatement("UPDATE ODE_SCHEMA_VERSION SET VERSION = ?");
+            stmt.setInt(1, version);
+            int res = stmt.executeUpdate();
+            // This should never happen but who knows?
+            if (res == 0) throw new RuntimeException("Couldn't update schema version.");
+        } catch (SQLException e) {
+            throw new RuntimeException("Couldn't update schema version.", e);
+        } finally {
+            close(null, stmt, conn);
+        }
+    }
+
+    /**
+     * Closes the given JDBC resources, skipping nulls. Every resource gets its close
+     * attempt even when an earlier one fails; the first failure is rethrown afterwards.
+     */
+    private static void close(ResultSet rs, Statement stmt, Connection conn) {
+        SQLException failure = null;
+        try {
+            if (rs != null) rs.close();
+        } catch (SQLException e) {
+            failure = e;
+        }
+        try {
+            if (stmt != null) stmt.close();
+        } catch (SQLException e) {
+            if (failure == null) failure = e;
+        }
+        try {
+            if (conn != null) conn.close();
+        } catch (SQLException e) {
+            if (failure == null) failure = e;
+        }
+        if (failure != null) throw new RuntimeException(failure);
+    }
+}
Added: ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/migration/OldCorrelationKey.java
URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/migration/OldCorrelationKey.java?rev=732377&view=auto
==============================================================================
--- ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/migration/OldCorrelationKey.java (added)
+++ ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/migration/OldCorrelationKey.java Wed Jan 7 07:58:10 2009
@@ -0,0 +1,167 @@
+package org.apache.ode.bpel.engine.migration;
+
+import org.apache.ode.utils.CollectionUtils;
+
+import java.util.List;
+import java.util.ArrayList;
+import java.io.Serializable;
+
+public class OldCorrelationKey implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ /** CorrelationSet identifier. */
+ private int _csetId;
+ /** Key values. */
+ private String _keyValues[];
+
+ /**
+ * Constructor.
+ *
+ * @param csetId
+ * correlation set identifier
+ * @param keyValues
+ * correlation key values
+ */
+ public OldCorrelationKey(int csetId, String[] keyValues) {
+ _csetId = csetId;
+ _keyValues = keyValues;
+ }
+
+ public OldCorrelationKey(String canonicalForm) {
+ int firstTilde = canonicalForm.indexOf('~');
+ _csetId = Integer.parseInt(canonicalForm.substring(0, firstTilde == -1 ? canonicalForm.length() : firstTilde));
+
+ if (firstTilde != -1) {
+ List<String> keys = new ArrayList<String>();
+ char chars[] = canonicalForm.toCharArray();
+ StringBuffer work = new StringBuffer();
+ for (int i = firstTilde + 1; i < chars.length; ++i) {
+ boolean isLast = (i == chars.length - 1);
+ if (chars[i] == '~' && !isLast && chars[i + 1] == '~') {
+ work.append(chars[i++]);
+ } else if (chars[i] == '~') {
+ keys.add(work.toString());
+ work = new StringBuffer();
+ } else {
+ work.append(chars[i]);
+ }
+ }
+ keys.add(work.toString());
+ _keyValues = new String[keys.size()];
+ keys.toArray(_keyValues);
+ } else {
+ _keyValues = new String[0];
+ }
+ }
+
+ /** Return the OCorrelation id for the correlation set */
+ public int getCSetId() {
+ return _csetId;
+ }
+
+ /** Return the values for the correlation set */
+ public String[] getValues() {
+ return _keyValues;
+ }
+
+ /**
+ * Check if this key matches any member in a set of keys.
+ *
+ * @param keys
+ * set of keys to match against
+ *
+ * @return <code>true</code> if one of the keys in the set
+ * <code>equals(..)</code> this key, <code>false</code>
+ * otherwise
+ */
+ public boolean isMatch(OldCorrelationKey[] keys) {
+ for (OldCorrelationKey key : keys)
+ if (key.equals(this)) {
+ return true;
+ }
+
+ return false;
+ }
+
+ /**
+ * Equals comperator method.
+ *
+ * @param o
+ * <code>CorrelationKey</code> object to compare with
+ *
+ * @return <code>true</code> if the given object
+ */
+ public boolean equals(Object o) {
+ OldCorrelationKey okey = (OldCorrelationKey) o;
+
+ if (okey == null || okey._csetId != _csetId || okey._keyValues.length != _keyValues.length)
+ return false;
+
+ for (int i = 0; i < _keyValues.length; ++i)
+ if (!_keyValues[i].equals(okey._keyValues[i]))
+ return false;
+
+ return true;
+ }
+
+ /**
+ * Generate a hash code from the hash codes of the elements.
+ *
+ * @see java.util.HashMap#hashCode
+ * @see Object#hashCode
+ */
+ public int hashCode() {
+ int hashCode = _csetId;
+ for (String _keyValue : _keyValues)
+ hashCode ^= _keyValue.hashCode();
+ return hashCode;
+ }
+
+ public List<String> toCanonicalList() {
+ ArrayList<String> ret = new ArrayList<String>(_keyValues.length + 1);
+ ret.add(((Integer) _csetId).toString());
+ for (String i : _keyValues)
+ ret.add(i);
+ return ret;
+ }
+
+ /**
+ * @see Object#toString
+ */
+ public String toString() {
+ StringBuffer buf = new StringBuffer("{CorrelationKey ");
+ buf.append("setId=");
+ buf.append(_csetId);
+ buf.append(", values=");
+ buf.append(CollectionUtils.makeCollection(ArrayList.class, _keyValues));
+ buf.append('}');
+
+ return buf.toString();
+ }
+
+ public String toCanonicalString() {
+ StringBuffer buf = new StringBuffer();
+ buf.append(this.getCSetId());
+ buf.append('~');
+ for (int i = 0; i < getValues().length; ++i) {
+ if (i != 0)
+ buf.append('~');
+ escapeTilde(buf, getValues()[i]);
+ }
+ return buf.toString();
+ }
+
+ static void escapeTilde(StringBuffer buf, String str) {
+ if (str == null)
+ return;
+ char[] chars = str.toCharArray();
+ for (char achar : chars) {
+ if (achar == '~') {
+ buf.append("~~");
+ } else {
+ buf.append(achar);
+ }
+ }
+ }
+}
Added: ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/migration/OldSelector.java
URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/migration/OldSelector.java?rev=732377&view=auto
==============================================================================
--- ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/migration/OldSelector.java (added)
+++ ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/migration/OldSelector.java Wed Jan 7 07:58:10 2009
@@ -0,0 +1,37 @@
+package org.apache.ode.bpel.engine.migration;
+
+import org.apache.ode.bpel.runtime.PartnerLinkInstance;
+import org.apache.ode.utils.ObjectPrinter;
+
+import java.io.Serializable;
+
+/**
+ * Serializable stand-in used while reading execution states written before the
+ * migration: CorrelationKeySetMigration registers this class' stream descriptor in
+ * place of the one of org.apache.ode.bpel.runtime.Selector. The fields below are the
+ * ones the migration copies into the new Selector instances.
+ */
+public class OldSelector implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ public final PartnerLinkInstance plinkInstance;
+ // here for the backward compatibility
+ // Typed as Object because the migration casts it either to OldCorrelationKey or to CorrelationKey
+ @SuppressWarnings("unused")
+ public Object correlationKey = null;
+ public final String opName;
+ public final String messageExchangeId;
+ public final int idx;
+ public final boolean oneWay;
+
+ /** Creates a selector without correlation key; <code>mexId</code> is stored as the message exchange id. */
+ OldSelector(int idx, PartnerLinkInstance plinkInstance, String opName, boolean oneWay, String mexId) {
+ this.idx = idx;
+ this.plinkInstance = plinkInstance;
+ this.opName = opName;
+ this.messageExchangeId = mexId;
+ this.oneWay = oneWay;
+ }
+
+ /** Renders the selector fields (the correlation key excepted) through ObjectPrinter. */
+ public String toString() {
+ return ObjectPrinter.toString(this, new Object[] {
+ "plinkInstnace", plinkInstance,
+ "opName" ,opName,
+ "oneWay", oneWay ? "yes" : "no",
+ "mexId", messageExchangeId,
+ "idx", Integer.valueOf(idx)
+ });
+ }
+}
Added: ode/branches/APACHE_ODE_1.X/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/CorrelatorMessageDaoImpl.java
URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/CorrelatorMessageDaoImpl.java?rev=732377&view=auto
==============================================================================
--- ode/branches/APACHE_ODE_1.X/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/CorrelatorMessageDaoImpl.java (added)
+++ ode/branches/APACHE_ODE_1.X/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/CorrelatorMessageDaoImpl.java Wed Jan 7 07:58:10 2009
@@ -0,0 +1,25 @@
+package org.apache.ode.daohib.bpel;
+
+import org.apache.ode.bpel.dao.CorrelatorMessageDAO;
+import org.apache.ode.bpel.common.CorrelationKey;
+import org.apache.ode.daohib.bpel.hobj.HCorrelatorMessage;
+import org.apache.ode.daohib.SessionManager;
+
+/**
+ * Hibernate-backed {@link CorrelatorMessageDAO} that keeps the correlation key on the
+ * wrapped {@link HCorrelatorMessage} in its canonical string form.
+ */
+public class CorrelatorMessageDaoImpl extends HibernateDao implements CorrelatorMessageDAO {
+
+    private final HCorrelatorMessage _hobj;
+
+    public CorrelatorMessageDaoImpl(SessionManager sm, HCorrelatorMessage hobj) {
+        super(sm, hobj);
+        // Trace label now names this class; it used to name CorrelatorDaoImpl
+        entering("CorrelatorMessageDaoImpl.CorrelatorMessageDaoImpl");
+        _hobj = hobj;
+    }
+
+    /** Builds a new correlation key from the canonical string stored on the persistent object. */
+    public CorrelationKey getCorrelationKey() {
+        return new CorrelationKey(_hobj.getCorrelationKey());
+    }
+
+    /** Stores the canonical string form of <code>ckey</code> on the persistent object. */
+    public void setCorrelationKey(CorrelationKey ckey) {
+        _hobj.setCorrelationKey(ckey.toCanonicalString());
+    }
+}