You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ode.apache.org by va...@apache.org on 2010/10/11 21:20:59 UTC
svn commit: r1021476 - in
/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine:
BpelRuntimeContextImpl.java IMAManager.java IMAManager2.java
migration/IMAManagerMigration.java migration/MigrationHandler.java
Author: vanto
Date: Mon Oct 11 19:20:59 2010
New Revision: 1021476
URL: http://svn.apache.org/viewvc?rev=1021476&view=rev
Log:
Better fix for ODE-888, provides migration for running instances.
Added:
ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/IMAManager2.java (with props)
ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/migration/IMAManagerMigration.java (with props)
Modified:
ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java
ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/IMAManager.java
ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/migration/MigrationHandler.java
Modified: ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java
URL: http://svn.apache.org/viewvc/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java?rev=1021476&r1=1021475&r2=1021476&view=diff
==============================================================================
--- ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java (original)
+++ ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java Mon Oct 11 19:20:59 2010
@@ -134,7 +134,7 @@ public class BpelRuntimeContextImpl impl
protected OutstandingRequestManager _outstandingRequests;
- protected IMAManager _imaManager;
+ protected IMAManager2 _imaManager;
protected BpelProcess _bpelProcess;
@@ -157,14 +157,14 @@ public class BpelRuntimeContextImpl impl
_soup = new ExecutionQueueImpl(null);
_soup.setReplacementMap(_bpelProcess.getReplacementMap(dao.getProcess().getProcessId()));
_outstandingRequests = null;
- _imaManager = new IMAManager();
+ _imaManager = new IMAManager2();
_vpu.setContext(_soup);
if (bpelProcess.isInMemory()) {
ProcessInstanceDaoImpl inmem = (ProcessInstanceDaoImpl) _dao;
if (inmem.getSoup() != null) {
_soup = (ExecutionQueueImpl) inmem.getSoup();
- _imaManager = (IMAManager) _soup.getGlobalData();
+ _imaManager = (IMAManager2) _soup.getGlobalData();
_vpu.setContext(_soup);
}
} else {
@@ -176,7 +176,7 @@ public class BpelRuntimeContextImpl impl
} catch (Exception ex) {
throw new RuntimeException(ex);
}
- _imaManager = (IMAManager) _soup.getGlobalData();
+ _imaManager = (IMAManager2) _soup.getGlobalData();
}
}
Modified: ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/IMAManager.java
URL: http://svn.apache.org/viewvc/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/IMAManager.java?rev=1021476&r1=1021475&r2=1021476&view=diff
==============================================================================
--- ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/IMAManager.java (original)
+++ ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/IMAManager.java Mon Oct 11 19:20:59 2010
@@ -29,7 +29,6 @@ import javax.wsdl.OperationType;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.ode.bpel.common.CorrelationKeySet;
import org.apache.ode.bpel.runtime.PartnerLinkInstance;
import org.apache.ode.bpel.runtime.Selector;
import org.apache.ode.utils.ObjectPrinter;
@@ -39,7 +38,9 @@ import org.apache.ode.utils.ObjectPrinte
* This class handles behaviour of IMAs (Inbound Message Activities) as specified in WS BPEL.
* This includes detecting conflictingReceive and conflictingRequest faults.
* </p>
+ * @deprecated use IMAManager2 instead.
*/
+@Deprecated
public class IMAManager implements Serializable {
private static final long serialVersionUID = -5556374398943757951L;
@@ -64,7 +65,7 @@ public class IMAManager implements Seria
Set<RequestIdTuple> workingSet = new HashSet<RequestIdTuple>(_byRid.keySet());
for (int i = 0; i < selectors.length; ++i) {
- final RequestIdTuple rid = new RequestIdTuple(selectors[i].plinkInstance, selectors[i].opName, selectors[i].correlationKeySet);
+ final RequestIdTuple rid = new RequestIdTuple(selectors[i].plinkInstance, selectors[i].opName);
if (workingSet.contains(rid)) {
return i;
}
@@ -94,7 +95,7 @@ public class IMAManager implements Seria
Entry entry = new Entry(pickResponseChannel, selectors);
for (int i = 0; i < selectors.length; ++i) {
- final RequestIdTuple rid = new RequestIdTuple(selectors[i].plinkInstance, selectors[i].opName, selectors[i].correlationKeySet);
+ final RequestIdTuple rid = new RequestIdTuple(selectors[i].plinkInstance, selectors[i].opName);
if (_byRid.containsKey(rid)) {
String errmsg = "INTERNAL ERROR: Duplicate ENTRY for RID " + rid;
__log.fatal(errmsg);
@@ -187,7 +188,7 @@ public class IMAManager implements Seria
_byOrid.put(orid, oldEntry.mexRef);
} else {
//registered IMA
- RequestIdTuple rid = new RequestIdTuple(oldRid.partnerLink, oldRid.opName, null);
+ RequestIdTuple rid = new RequestIdTuple(oldRid.partnerLink, oldRid.opName);
Entry entry = new Entry(oldEntry.pickResponseChannel, (Selector[]) oldEntry.selectors);
_byRid.put(rid, entry);
_byChannel.put(entry.pickResponseChannel, entry);
@@ -216,6 +217,27 @@ public class IMAManager implements Seria
public String toString() {
return ObjectPrinter.toString(this, new Object[] { "byRid", _byRid, "byOrid", _byOrid, "byChannel", _byChannel });
}
+
+ public IMAManager2 toIMAManager2() {
+ IMAManager2 newIMA = new IMAManager2();
+ for (String channel : _byChannel.keySet()) {
+ IMAManager2.Entry entry = new IMAManager2.Entry(_byChannel.get(channel).pickResponseChannel, _byChannel.get(channel).selectors);
+ newIMA._byChannel.put(channel, entry);
+ }
+ for (OutstandingRequestIdTuple orid : _byOrid.keySet()) {
+ IMAManager2.OutstandingRequestIdTuple newOrid = new IMAManager2.OutstandingRequestIdTuple(orid.partnerLink, orid.opName, orid.mexId);
+ newIMA._byOrid.put(newOrid, _byOrid.get(orid));
+ }
+ for (RequestIdTuple rid : _byRid.keySet()) {
+ IMAManager2.Entry entry = new IMAManager2.Entry(_byRid.get(rid).pickResponseChannel, _byRid.get(rid).selectors);
+ for (Selector sel : entry.selectors) {
+ IMAManager2.RequestIdTuple newRid = new IMAManager2.RequestIdTuple(rid.partnerLink, rid.opName, sel.correlationKeySet);
+ newIMA._byRid.put(newRid, entry);
+ }
+ }
+
+ return newIMA;
+ }
private class RequestIdTuple implements Serializable {
private static final long serialVersionUID = -1059389611839777482L;
@@ -223,74 +245,24 @@ public class IMAManager implements Seria
PartnerLinkInstance partnerLink;
/** Name of the operation. */
String opName;
- /** cset */
- CorrelationKeySet ckeySet;
- /** migrated tuple. This is true if the tuple has been created based on an old tuple which didn't contain a cset.*/
- boolean isMigrated = false;
/** Constructor. */
- private RequestIdTuple(PartnerLinkInstance partnerLink, String opName, CorrelationKeySet ckeySet) {
+ private RequestIdTuple(PartnerLinkInstance partnerLink, String opName) {
this.partnerLink = partnerLink;
this.opName = opName;
- this.ckeySet = ckeySet;
- if (ckeySet == null) {
- this.isMigrated = true;
- }
- }
-
- public String toString() {
- return ObjectPrinter.toString(this, new Object[] { "partnerLink", partnerLink, "opName", opName, "cSet", ckeySet});
}
- @Override
public int hashCode() {
- final int prime = 31;
- int result = 1;
- result = prime * result + getOuterType().hashCode();
- result = prime * result
- + ((ckeySet == null) ? 0 : ckeySet.hashCode());
- result = prime * result
- + ((opName == null) ? 0 : opName.hashCode());
- if (!isMigrated) {
- result = prime * result
- + ((partnerLink == null) ? 0 : partnerLink.hashCode());
- }
- return result;
+ return this.partnerLink.hashCode() ^ this.opName.hashCode();
}
- @Override
public boolean equals(Object obj) {
- if (this == obj)
- return true;
- if (obj == null)
- return false;
- if (getClass() != obj.getClass())
- return false;
RequestIdTuple other = (RequestIdTuple) obj;
- if (!getOuterType().equals(other.getOuterType()))
- return false;
- if (!isMigrated) {
- if (ckeySet == null) {
- if (other.ckeySet != null)
- return false;
- } else if (!ckeySet.equals(other.ckeySet))
- return false;
- }
- if (opName == null) {
- if (other.opName != null)
- return false;
- } else if (!opName.equals(other.opName))
- return false;
- if (partnerLink == null) {
- if (other.partnerLink != null)
- return false;
- } else if (!partnerLink.equals(other.partnerLink))
- return false;
- return true;
- }
+ return other.partnerLink.equals(partnerLink) && other.opName.equals(opName);
+ }
- private IMAManager getOuterType() {
- return IMAManager.this;
+ public String toString() {
+ return ObjectPrinter.toString(this, new Object[] { "partnerLink", partnerLink, "opName", opName});
}
}
Added: ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/IMAManager2.java
URL: http://svn.apache.org/viewvc/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/IMAManager2.java?rev=1021476&view=auto
==============================================================================
--- ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/IMAManager2.java (added)
+++ ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/IMAManager2.java Mon Oct 11 19:20:59 2010
@@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ode.bpel.engine;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import javax.wsdl.OperationType;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.ode.bpel.common.CorrelationKeySet;
+import org.apache.ode.bpel.runtime.PartnerLinkInstance;
+import org.apache.ode.bpel.runtime.Selector;
+import org.apache.ode.utils.ObjectPrinter;
+
+/**
+ * <p>
+ * This class handles behaviour of IMAs (Inbound Message Activities) as specified in WS BPEL.
+ * This includes detecting conflictingReceive and conflictingRequest faults.
+ * </p>
+ */
+public class IMAManager2 implements Serializable {
+ private static final long serialVersionUID = -5556374398943757951L;
+
+ private static final Log __log = LogFactory.getLog(IMAManager2.class);
+
+ // holds rid for registered IMAs
+ public final Map<RequestIdTuple, Entry> _byRid = new HashMap<RequestIdTuple, Entry>();
+ // holds outstanding rid that are now waiting to reply (Open IMAs)
+ public final Map<OutstandingRequestIdTuple, String> _byOrid = new HashMap<OutstandingRequestIdTuple, String>();
+ public final Map<String, Entry> _byChannel = new HashMap<String, Entry>();
+
+ /**
+ * finds conflictingReceive
+ *
+ * @param selectors
+ * @return
+ */
+ int findConflict(Selector selectors[]) {
+ if (__log.isTraceEnabled()) {
+ __log.trace(ObjectPrinter.stringifyMethodEnter("findConflict", new Object[] { "selectors", selectors }));
+ }
+
+ Set<RequestIdTuple> workingSet = new HashSet<RequestIdTuple>(_byRid.keySet());
+ for (int i = 0; i < selectors.length; ++i) {
+ final RequestIdTuple rid = new RequestIdTuple(selectors[i].plinkInstance, selectors[i].opName, selectors[i].correlationKeySet);
+ if (workingSet.contains(rid)) {
+ return i;
+ }
+ workingSet.add(rid);
+ }
+ return -1;
+ }
+
+ /**
+ * Register IMA
+ *
+ * @param pickResponseChannel
+ * response channel associated with this receive/pick
+ * @param selectors
+ * selectors for this receive/pick
+ */
+ void register(String pickResponseChannel, Selector selectors[]) {
+ if (__log.isTraceEnabled()) {
+ __log.trace(ObjectPrinter.stringifyMethodEnter("register", new Object[] { "pickResponseChannel", pickResponseChannel, "selectors", selectors }));
+ }
+
+ if (_byChannel.containsKey(pickResponseChannel)) {
+ String errmsg = "INTERNAL ERROR: Duplicate ENTRY for RESPONSE CHANNEL " + pickResponseChannel;
+ __log.fatal(errmsg);
+ throw new IllegalArgumentException(errmsg);
+ }
+
+ Entry entry = new Entry(pickResponseChannel, selectors);
+ for (int i = 0; i < selectors.length; ++i) {
+ final RequestIdTuple rid = new RequestIdTuple(selectors[i].plinkInstance, selectors[i].opName, selectors[i].correlationKeySet);
+ if (_byRid.containsKey(rid)) {
+ String errmsg = "INTERNAL ERROR: Duplicate ENTRY for RID " + rid;
+ __log.fatal(errmsg);
+ throw new IllegalStateException(errmsg);
+ }
+ _byRid.put(rid, entry);
+ }
+
+ _byChannel.put(pickResponseChannel, entry);
+ }
+
+ /**
+ * Registers Open IMA.
+ * It doesn't open IMA for non two way operations.
+ *
+ * @param partnerLink
+ * @param opName
+ * @param mexId
+ * @param mexRef
+ * @return
+ */
+ String processOutstandingRequest(PartnerLinkInstance partnerLink, String opName, String mexId, String mexRef) {
+ if (__log.isTraceEnabled()) {
+ __log.trace(ObjectPrinter.stringifyMethodEnter("process", new Object[] { "partnerLinkInstance", partnerLink, "operationName", opName, "messageExchangeId", mexId, "mexRef", mexRef }));
+ }
+ final OutstandingRequestIdTuple orid = new OutstandingRequestIdTuple(partnerLink, opName, mexId);
+ if (_byOrid.containsKey(orid)) {
+ //conflictingRequest found
+ return mexRef;
+ }
+ // We convert into outstanding request only for in-out operations (pending release operation)
+ if (partnerLink.partnerLink.getMyRoleOperation(opName).getStyle().equals(OperationType.REQUEST_RESPONSE)) {
+ _byOrid.put(orid, mexRef);
+ }
+ return null;
+ }
+
+ /**
+ * This is used to remove IMA from registered state.
+ *
+ * @see #register(String, Selector[])
+ * @param pickResponseChannel
+ */
+ void cancel(String pickResponseChannel, boolean isTimer) {
+ if (__log.isTraceEnabled())
+ __log.trace(ObjectPrinter.stringifyMethodEnter("cancel", new Object[] { "pickResponseChannel", pickResponseChannel }));
+
+ Entry entry = _byChannel.remove(pickResponseChannel);
+ if (entry != null) {
+ while (_byRid.values().remove(entry));
+ } else if (!isTimer){
+ String errmsg = "INTERNAL ERROR: No ENTRY for RESPONSE CHANNEL " + pickResponseChannel;
+ __log.fatal(errmsg);
+ throw new IllegalArgumentException(errmsg);
+ }
+ }
+
+ /**
+ * Release Open IMA.
+ *
+ * @param plinkInstnace
+ * partner link
+ * @param opName
+ * operation
+ * @param mexId
+ * message exchange identifier IN THE BPEL SENSE OF THE TERM (i.e. a receive/reply disambiguator).
+ * @return message exchange identifier associated with the registration that matches the parameters
+ */
+ public String release(PartnerLinkInstance plinkInstnace, String opName, String mexId) {
+ if (__log.isTraceEnabled())
+ __log.trace(ObjectPrinter.stringifyMethodEnter("release", new Object[] { "plinkInstance", plinkInstnace, "opName", opName, "mexId", mexId }));
+
+ final OutstandingRequestIdTuple orid = new OutstandingRequestIdTuple(plinkInstnace, opName, mexId);
+ String mexRef = _byOrid.remove(orid);
+ if (mexRef == null) {
+ if (__log.isDebugEnabled()) {
+ __log.debug("==release: ORID " + orid + " not found in " + _byOrid);
+ }
+ return null;
+ }
+ return mexRef;
+ }
+
+ /**
+ * "Release" all Open IMAs
+ *
+ * @return a list of message exchange identifiers for message exchanges that were begun (receive/pick got a message) but not yet completed (reply not yet sent)
+ */
+ public String[] releaseAll() {
+ if (__log.isTraceEnabled())
+ __log.trace(ObjectPrinter.stringifyMethodEnter("releaseAll", null));
+
+ ArrayList<String> mexRefs = new ArrayList<String>();
+ while (!_byOrid.isEmpty()) {
+ String mexRef = _byOrid.entrySet().iterator().next().getValue();
+ mexRefs.add(mexRef);
+ _byOrid.values().remove(mexRef);
+ }
+ return mexRefs.toArray(new String[mexRefs.size()]);
+ }
+
+ public String toString() {
+ return ObjectPrinter.toString(this, new Object[] { "byRid", _byRid, "byOrid", _byOrid, "byChannel", _byChannel });
+ }
+
+ public static class RequestIdTuple implements Serializable {
+ private static final long serialVersionUID = -1059389611839777482L;
+ /** On which partner link it was received. */
+ PartnerLinkInstance partnerLink;
+ /** Name of the operation. */
+ String opName;
+ /** cset */
+ CorrelationKeySet ckeySet;
+
+ /** Constructor. */
+ RequestIdTuple(PartnerLinkInstance partnerLink, String opName, CorrelationKeySet ckeySet) {
+ this.partnerLink = partnerLink;
+ this.opName = opName;
+ this.ckeySet = ckeySet;
+ }
+
+ public String toString() {
+ return ObjectPrinter.toString(this, new Object[] { "partnerLink", partnerLink, "opName", opName, "cSet", ckeySet});
+ }
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result
+ + ((ckeySet == null) ? 0 : ckeySet.hashCode());
+ result = prime * result
+ + ((opName == null) ? 0 : opName.hashCode());
+ result = prime * result
+ + ((partnerLink == null) ? 0 : partnerLink.hashCode());
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null) {
+ return false;
+ }
+ if (!(obj instanceof RequestIdTuple)) {
+ return false;
+ }
+ RequestIdTuple other = (RequestIdTuple) obj;
+ if (ckeySet == null) {
+ if (other.ckeySet != null) {
+ return false;
+ }
+ } else if (!ckeySet.equals(other.ckeySet)) {
+ return false;
+ }
+ if (opName == null) {
+ if (other.opName != null) {
+ return false;
+ }
+ } else if (!opName.equals(other.opName)) {
+ return false;
+ }
+ if (partnerLink == null) {
+ if (other.partnerLink != null) {
+ return false;
+ }
+ } else if (!partnerLink.equals(other.partnerLink)) {
+ return false;
+ }
+ return true;
+ }
+ }
+
+ public static class OutstandingRequestIdTuple implements Serializable {
+ private static final long serialVersionUID = -1059389611839777482L;
+ /** On which partner link it was received. */
+ PartnerLinkInstance partnerLink;
+ /** Name of the operation. */
+ String opName;
+ /** Message exchange identifier. */
+ String mexId;
+
+ /** Constructor. */
+ OutstandingRequestIdTuple(PartnerLinkInstance partnerLink, String opName, String mexId) {
+ this.partnerLink = partnerLink;
+ this.opName = opName;
+ this.mexId = mexId == null ? "" : mexId;
+ }
+
+ public int hashCode() {
+ return this.partnerLink.hashCode() ^ this.opName.hashCode() ^ this.mexId.hashCode();
+ }
+
+ public boolean equals(Object obj) {
+ OutstandingRequestIdTuple other = (OutstandingRequestIdTuple) obj;
+ return other.partnerLink.equals(partnerLink) && other.opName.equals(opName) && other.mexId.equals(mexId);
+ }
+
+ public String toString() {
+ return ObjectPrinter.toString(this, new Object[] { "partnerLink", partnerLink, "opName", opName, "mexId", mexId });
+ }
+ }
+
+ public static class Entry implements Serializable {
+ private static final long serialVersionUID = -583743124656582887L;
+ final String pickResponseChannel;
+ public Selector[] selectors;
+
+ Entry(String pickResponseChannel, Selector[] selectors) {
+ this.pickResponseChannel = pickResponseChannel;
+ this.selectors = selectors;
+ }
+
+ public String toString() {
+ return ObjectPrinter.toString(this, new Object[] { "pickResponseChannel", pickResponseChannel, "selectors", selectors });
+ }
+ }
+}
Propchange: ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/IMAManager2.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/migration/IMAManagerMigration.java
URL: http://svn.apache.org/viewvc/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/migration/IMAManagerMigration.java?rev=1021476&view=auto
==============================================================================
--- ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/migration/IMAManagerMigration.java (added)
+++ ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/migration/IMAManagerMigration.java Mon Oct 11 19:20:59 2010
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ode.bpel.engine.migration;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.util.Collection;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.ode.bpel.dao.BpelDAOConnection;
+import org.apache.ode.bpel.dao.ProcessDAO;
+import org.apache.ode.bpel.dao.ProcessInstanceDAO;
+import org.apache.ode.bpel.engine.BpelProcess;
+import org.apache.ode.bpel.engine.IMAManager;
+import org.apache.ode.bpel.engine.IMAManager2;
+import org.apache.ode.jacob.vpu.ExecutionQueueImpl;
+
+/**
+ * Migrates OutstandingRequestManager to IMAManager
+ *
+ */
+public class IMAManagerMigration implements Migration {
+ private static Log __log = LogFactory.getLog(IMAManagerMigration.class);
+
+ public boolean migrate(Set<BpelProcess> registeredProcesses, BpelDAOConnection connection) {
+ boolean migrationResult = true;
+ for (BpelProcess process : registeredProcesses) {
+ ProcessDAO processDao = connection.getProcess(process.getConf().getProcessId());
+ Collection<ProcessInstanceDAO> pis = processDao.getActiveInstances();
+
+ for (ProcessInstanceDAO instance : pis) {
+ __log.debug("Migrating from IMAManager to IMAManager2 for instance " + instance.getInstanceId());
+
+ try {
+ if (instance.getExecutionState() == null) {
+ //Completed instance
+ __log.debug("Skipped");
+ } else {
+ ExecutionQueueImpl soup = new ExecutionQueueImpl(this.getClass().getClassLoader());
+ soup.setReplacementMap(process.getReplacementMap(processDao.getProcessId()));
+ soup.read(new ByteArrayInputStream(instance.getExecutionState()));
+ Object data = soup.getGlobalData();
+ if (data instanceof IMAManager) {
+ IMAManager imaOld = (IMAManager) data;
+
+ soup.setGlobalData(imaOld.toIMAManager2());
+
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ soup.write(bos);
+ instance.setExecutionState(bos.toByteArray());
+ __log.debug("Migrated outstanding requests for instance " + instance.getInstanceId());
+ }
+ }
+ } catch (Exception e) {
+ __log.debug("", e);
+ __log.error("Error migrating outstanding requests for instance " + instance.getInstanceId());
+ migrationResult = false;
+ }
+ }
+ }
+
+ return migrationResult;
+ }
+}
Propchange: ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/migration/IMAManagerMigration.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/migration/MigrationHandler.java
URL: http://svn.apache.org/viewvc/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/migration/MigrationHandler.java?rev=1021476&r1=1021475&r2=1021476&view=diff
==============================================================================
--- ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/migration/MigrationHandler.java (original)
+++ ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/migration/MigrationHandler.java Mon Oct 11 19:20:59 2010
@@ -42,7 +42,7 @@ import org.apache.ode.bpel.engine.Contex
public class MigrationHandler {
private static final Log __log = LogFactory.getLog(MigrationHandler.class);
- public static final int CURRENT_SCHEMA_VERSION = 6;
+ public static final int CURRENT_SCHEMA_VERSION = 7;
private Contexts _contexts;
@@ -53,6 +53,7 @@ public class MigrationHandler {
add(new MigrationLink(4, 3, new Migration[] { new CorrelationKeySetMigration() } ));
add(new MigrationLink(3, 5, new Migration[] { new CorrelationKeySetDataMigration() } ));
add(new MigrationLink(5, 6, new Migration[] { new OutstandingRequestsMigration() } ));
+ add(new MigrationLink(6, 7, new Migration[] { new IMAManagerMigration() } ));
}};