You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ode.apache.org by ms...@apache.org on 2007/09/27 02:51:21 UTC
svn commit: r579857 -
/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/SCOPEACT.java
Author: mszefler
Date: Wed Sep 26 17:51:21 2007
New Revision: 579857
URL: http://svn.apache.org/viewvc?rev=579857&view=rev
Log:
Corrected link status deferral.
- Always use deferal for atomic/isolated
- use UNLOCKER to set the state of the LINKSTATUSINTERCEPTOR
Modified:
ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/SCOPEACT.java
Modified: ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/SCOPEACT.java
URL: http://svn.apache.org/viewvc/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/SCOPEACT.java?rev=579857&r1=579856&r2=579857&view=diff
==============================================================================
--- ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/SCOPEACT.java (original)
+++ ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/SCOPEACT.java Wed Sep 26 17:51:21 2007
@@ -20,19 +20,27 @@
import java.io.Serializable;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
import java.util.Set;
+import org.apache.ode.bpel.o.OLink;
import org.apache.ode.bpel.o.OScope;
import org.apache.ode.bpel.o.OScope.Variable;
import org.apache.ode.bpel.runtime.channels.FaultData;
+import org.apache.ode.bpel.runtime.channels.LinkStatusChannel;
+import org.apache.ode.bpel.runtime.channels.LinkStatusChannelListener;
import org.apache.ode.bpel.runtime.channels.ParentScopeChannel;
import org.apache.ode.bpel.runtime.channels.ParentScopeChannelListener;
import org.apache.ode.bpel.runtime.channels.ReadWriteLockChannel;
+import org.apache.ode.jacob.ChannelListener;
import org.apache.ode.jacob.SynchChannel;
import org.apache.ode.jacob.SynchChannelListener;
+import org.apache.ode.jacob.ValChannel;
+import org.apache.ode.jacob.ValChannelListener;
import org.w3c.dom.Element;
/**
@@ -54,7 +62,19 @@
ScopeFrame newFrame = new ScopeFrame((OScope) _self.o, getBpelRuntimeContext().createScopeInstance(
_scopeFrame.scopeInstanceId, (OScope) _self.o), _scopeFrame, null);
- instance(new SCOPE(_self, newFrame, _linkFrame));
+ // Depending on whether we are ATOMIC or not, we'll need to create outgoing link status interceptors
+ LinkFrame linkframe;
+ if (((OScope) _self.o).atomicScope && !_self.o.outgoingLinks.isEmpty()) {
+ ValChannel linkInterceptorControl = newChannel(ValChannel.class);
+ ParentScopeChannel psc = newChannel(ParentScopeChannel.class);
+ linkframe = createInterceptorLinkFrame();
+ instance(new LINKSTATUSINTERCEPTOR(linkInterceptorControl,linkframe));
+ instance(new UNLOCKER(psc, _self.parent, null, Collections.<IsolationLock>emptyList(), linkInterceptorControl));
+ _self.parent = psc;
+ } else
+ linkframe = _linkFrame;
+
+ instance(new SCOPE(_self, newFrame, linkframe));
}
}
@@ -81,15 +101,123 @@
return requiredLocks;
}
+ /**
+ * Create outgoing link interceptors. Necessary for ISOLATED and ATOMIC (non-standard ext) scopes. I.e we need to prevent the
+ * links from coming out until the scope completes successfully.
+ *
+ */
+ private LinkFrame createInterceptorLinkFrame() {
+ LinkFrame newframe = new LinkFrame(_linkFrame);
+ for (OLink outlink : _self.o.outgoingLinks) {
+ LinkInfo original = _linkFrame.resolve(outlink);
+ LinkStatusChannel newchannel = newChannel(LinkStatusChannel.class);
+ newframe.links.put(original.olink, new LinkInfo(original.olink, newchannel));
+ }
+ return newframe;
+ }
+
+ /**
+ * Link Status interceptor. Used in ISOLATED and ATOMIC scopes to prevent links from getting out until its time.
+ *
+ * @author Maciej Szefler <mszefler at gmail dot com>
+ *
+ */
+ private class LINKSTATUSINTERCEPTOR extends BpelJacobRunnable {
+ private static final long serialVersionUID = 3104008741240676253L;
+
+ /** We'll listen here for notification that its ok to send links status out. */
+ private final ValChannel _self;
+
+ private final LinkFrame _interceptedChannels;
+
+ /** The statuses that have been received */
+ private final Map<OLink, Boolean> _statuses = new HashMap<OLink, Boolean>();
+
+ /** NULL means defer links, TRUE means passthrough, FALSE means send FALSE */
+ private Boolean _status;
+
+ LINKSTATUSINTERCEPTOR(ValChannel self, LinkFrame interceptedChannels) {
+ _self = self;
+ _interceptedChannels = interceptedChannels;
+ }
+
+ @Override
+ public void run() {
+
+ Set<ChannelListener> mlset = new HashSet<ChannelListener>();
+
+ if (_status == null)
+ mlset.add(new ValChannelListener(_self) {
+
+ private static final long serialVersionUID = 5029554538593371750L;
+
+ /** Our owner will notify us when it becomes clear what to do with the links. */
+ public void val(Object retVal) {
+ _status = (Boolean) retVal;
+ for (OLink available : _statuses.keySet())
+ _linkFrame.resolve(available).channel.linkStatus(_statuses.get(available) && _status);
+
+ // Check if we still need to wait around for more links.
+ if (!isDone())
+ instance(LINKSTATUSINTERCEPTOR.this);
+
+ }
+
+ });
+
+ for (final Map.Entry<OLink, LinkInfo> m : _interceptedChannels.links.entrySet()) {
+ if (_statuses.containsKey(m.getKey()))
+ continue;
+
+ mlset.add(new LinkStatusChannelListener(m.getValue().channel) {
+ private static final long serialVersionUID = 1568144473514091593L;
+
+ public void linkStatus(boolean value) {
+ _statuses.put(m.getKey(), value);
+ if (_status != null)
+ _linkFrame.resolve(m.getKey()).channel.linkStatus(value && _status);
+
+ if (!isDone())
+ instance(LINKSTATUSINTERCEPTOR.this);
+
+ }
+
+ });
+ }
+
+ object(false, mlset);
+
+ }
+
+ /**
+ * Did we get all the links we need?
+ * @return
+ */
+ private boolean isDone() {
+ return (_statuses.keySet().size() < SCOPEACT.this._self.o.outgoingLinks.size());
+
+ }
+
+ }
+
+
+ /**
+ * Guard for ISOLATED scopes to prevent start until all locks are acquired.
+ *
+ * @author Maciej Szefler <mszefler at gmail dot com>
+ *
+ */
private class ISOLATEDGUARD extends BpelJacobRunnable {
private static final long serialVersionUID = -5017579415744600900L;
-
+
final List<IsolationLock> _locksNeeded;
+
final LinkedList<IsolationLock> _locksAcquired = new LinkedList<IsolationLock>();
+
final SynchChannel _synchChannel;
- public ISOLATEDGUARD(List<IsolationLock> locks, SynchChannel synchChannel) {
+ ISOLATEDGUARD(List<IsolationLock> locks, SynchChannel synchChannel) {
_locksNeeded = locks;
_synchChannel = synchChannel;
}
@@ -98,18 +226,18 @@
public void run() {
if (_locksNeeded.isEmpty()) {
// acquired all locks.
-
+
ScopeFrame newFrame = new ScopeFrame((OScope) _self.o, getBpelRuntimeContext().createScopeInstance(
_scopeFrame.scopeInstanceId, (OScope) _self.o), _scopeFrame, null);
- // need to make sure to release the locks after the scope completes, to do this, intercept messages
- // on the parent scope channel.
- if (!_locksAcquired.isEmpty()) {
- final ParentScopeChannel parent = _self.parent;
- _self.parent = newChannel(ParentScopeChannel.class);
- instance(new UNLOCKER(_self.parent, parent,_synchChannel, _locksAcquired));
- }
+
- instance(new SCOPE(_self, newFrame, _linkFrame));
+ final ParentScopeChannel parent = _self.parent;
+ _self.parent = newChannel(ParentScopeChannel.class);
+ ValChannel lsi = newChannel(ValChannel.class);
+ instance(new UNLOCKER(_self.parent, parent, _synchChannel, _locksAcquired, lsi));
+ LinkFrame linkframe = createInterceptorLinkFrame();
+ instance(new LINKSTATUSINTERCEPTOR(lsi,linkframe));
+ instance(new SCOPE(_self, newFrame, linkframe));
return;
} else {
// try to acquire the locks in sequence (IMPORTANT) not all at once.
@@ -118,36 +246,49 @@
il.lockChannel.writeLock(_synchChannel);
else
il.lockChannel.readLock(_synchChannel);
-
+
object(new SynchChannelListener(_synchChannel) {
- private static final long serialVersionUID = 2857261074409098274L;
+ private static final long serialVersionUID = 2857261074409098274L;
- public void ret() {
- _locksAcquired.add(_locksNeeded.remove(0));
- instance(ISOLATEDGUARD.this);
- }
+ public void ret() {
+ _locksAcquired.add(_locksNeeded.remove(0));
+ instance(ISOLATEDGUARD.this);
+ }
});
-
- }
+
+ }
}
-
+
}
-
+ /**
+ * Interceptor that waits for the scope to finish and unlock the acquired locks.
+ *
+ * @author Maciej Szefler <mszefler at gmail dot com>
+ *
+ */
private static class UNLOCKER extends BpelJacobRunnable {
private static final long serialVersionUID = -476393080609348172L;
-
+
private final ParentScopeChannel _self;
+
private final ParentScopeChannel _parent;
+
private final SynchChannel _synchChannel;
- private final LinkedList<IsolationLock> _locks;
+
+ private final List<IsolationLock> _locks;
+
+ private final ValChannel _linkStatusInterceptor;
- public UNLOCKER(ParentScopeChannel self, ParentScopeChannel parent, SynchChannel synchChannel, LinkedList<IsolationLock> locksAcquired) {
+ public UNLOCKER(ParentScopeChannel self, ParentScopeChannel parent, SynchChannel synchChannel,
+ List<IsolationLock> locksAcquired,
+ ValChannel linkStatusInterceptor) {
_self = self;
_parent = parent;
_synchChannel = synchChannel;
_locks = locksAcquired;
+ _linkStatusInterceptor = linkStatusInterceptor;
}
@Override
@@ -158,6 +299,7 @@
public void cancelled() {
_parent.cancelled();
unlockAll();
+ _linkStatusInterceptor.val(false);
// no more listening.
}
@@ -169,30 +311,32 @@
public void completed(FaultData faultData, Set<CompensationHandler> compensations) {
_parent.completed(faultData, compensations);
+ _linkStatusInterceptor.val(faultData == null);
unlockAll();
// no more listening
-
+
}
public void failure(String reason, Element data) {
_parent.failure(reason, data);
+ _linkStatusInterceptor.val(false);
unlockAll();
// no more listening
}
-
- });
+
+ });
}
/**
- * Unlock all the acquired locks.
- *
+ * Unlock all the acquired locks.
+ *
*/
private void unlockAll() {
- for (IsolationLock il : _locks)
+ for (IsolationLock il : _locks)
il.lockChannel.unlock(_synchChannel);
_locks.clear();
}
-
+
}
/**
@@ -201,7 +345,7 @@
* @author Maciej Szefler <mszefler at gmail dot com>
*
*/
- private static class IsolationLock implements Comparable<IsolationLock>,Serializable {
+ private static class IsolationLock implements Comparable<IsolationLock>, Serializable {
private static final long serialVersionUID = 4214864393241172705L;
OScope.Variable guardedObject;