You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ode.apache.org by ms...@apache.org on 2007/09/27 02:51:21 UTC

svn commit: r579857 - /ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/SCOPEACT.java

Author: mszefler
Date: Wed Sep 26 17:51:21 2007
New Revision: 579857

URL: http://svn.apache.org/viewvc?rev=579857&view=rev
Log:
Corrected link status deferral.
- Always use deferal for atomic/isolated
- use UNLOCKER to set the state of the LINKSTATUSINTERCEPTOR

Modified:
    ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/SCOPEACT.java

Modified: ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/SCOPEACT.java
URL: http://svn.apache.org/viewvc/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/SCOPEACT.java?rev=579857&r1=579856&r2=579857&view=diff
==============================================================================
--- ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/SCOPEACT.java (original)
+++ ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/SCOPEACT.java Wed Sep 26 17:51:21 2007
@@ -20,19 +20,27 @@
 
 import java.io.Serializable;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
+import org.apache.ode.bpel.o.OLink;
 import org.apache.ode.bpel.o.OScope;
 import org.apache.ode.bpel.o.OScope.Variable;
 import org.apache.ode.bpel.runtime.channels.FaultData;
+import org.apache.ode.bpel.runtime.channels.LinkStatusChannel;
+import org.apache.ode.bpel.runtime.channels.LinkStatusChannelListener;
 import org.apache.ode.bpel.runtime.channels.ParentScopeChannel;
 import org.apache.ode.bpel.runtime.channels.ParentScopeChannelListener;
 import org.apache.ode.bpel.runtime.channels.ReadWriteLockChannel;
+import org.apache.ode.jacob.ChannelListener;
 import org.apache.ode.jacob.SynchChannel;
 import org.apache.ode.jacob.SynchChannelListener;
+import org.apache.ode.jacob.ValChannel;
+import org.apache.ode.jacob.ValChannelListener;
 import org.w3c.dom.Element;
 
 /**
@@ -54,7 +62,19 @@
             ScopeFrame newFrame = new ScopeFrame((OScope) _self.o, getBpelRuntimeContext().createScopeInstance(
                     _scopeFrame.scopeInstanceId, (OScope) _self.o), _scopeFrame, null);
 
-            instance(new SCOPE(_self, newFrame, _linkFrame));
+            // Depending on whether we are ATOMIC or not, we'll need to create outgoing link status interceptors
+            LinkFrame linkframe;
+            if (((OScope) _self.o).atomicScope && !_self.o.outgoingLinks.isEmpty()) {
+                ValChannel linkInterceptorControl = newChannel(ValChannel.class);
+                ParentScopeChannel psc = newChannel(ParentScopeChannel.class);
+                linkframe = createInterceptorLinkFrame();
+                instance(new LINKSTATUSINTERCEPTOR(linkInterceptorControl,linkframe));
+                instance(new UNLOCKER(psc, _self.parent, null, Collections.<IsolationLock>emptyList(), linkInterceptorControl));
+                _self.parent = psc;
+            } else
+                linkframe = _linkFrame;
+            
+            instance(new SCOPE(_self, newFrame, linkframe));
         }
 
     }
@@ -81,15 +101,123 @@
         return requiredLocks;
     }
 
+    /**
+     * Create outgoing link interceptors. Necessary for ISOLATED and ATOMIC (non-standard ext) scopes. I.e we need to prevent the
+     * links from coming out until the scope completes successfully.
+     * 
+     */
+    private LinkFrame createInterceptorLinkFrame() {
+        LinkFrame newframe = new LinkFrame(_linkFrame);
+        for (OLink outlink : _self.o.outgoingLinks) {
+            LinkInfo original = _linkFrame.resolve(outlink);
+            LinkStatusChannel newchannel = newChannel(LinkStatusChannel.class);
+            newframe.links.put(original.olink, new LinkInfo(original.olink, newchannel));
+        }
+        return newframe;
+    }
+
+    /**
+     * Link Status interceptor. Used in ISOLATED and ATOMIC scopes to prevent links from getting out until its time.
+     * 
+     * @author Maciej Szefler <mszefler at gmail dot com>
+     * 
+     */
+    private class LINKSTATUSINTERCEPTOR extends BpelJacobRunnable {
+        private static final long serialVersionUID = 3104008741240676253L;
+
+        /** We'll listen here for notification that its ok to send links status out. */
+        private final ValChannel _self;
+
+        private final LinkFrame _interceptedChannels;
+
+        /** The statuses that have been received */
+        private final Map<OLink, Boolean> _statuses = new HashMap<OLink, Boolean>();
+
+        /** NULL means defer links, TRUE means passthrough, FALSE means send FALSE */
+        private Boolean _status;
+
+        LINKSTATUSINTERCEPTOR(ValChannel self, LinkFrame interceptedChannels) {
+            _self = self;
+            _interceptedChannels = interceptedChannels;
+        }
+
+        @Override
+        public void run() {
+
+            Set<ChannelListener> mlset = new HashSet<ChannelListener>();
+            
+            if (_status == null)
+                mlset.add(new ValChannelListener(_self) {
+    
+                    private static final long serialVersionUID = 5029554538593371750L;
+    
+                    /** Our owner will notify us when it becomes clear what to do with the links. */
+                    public void val(Object retVal) {
+                        _status = (Boolean) retVal;
+                        for (OLink available : _statuses.keySet())
+                            _linkFrame.resolve(available).channel.linkStatus(_statuses.get(available) && _status);
+    
+                        // Check if we still need to wait around for more links.
+                        if (!isDone())
+                            instance(LINKSTATUSINTERCEPTOR.this);
+    
+                    }
+    
+                });
+
+            for (final Map.Entry<OLink, LinkInfo> m : _interceptedChannels.links.entrySet()) {
+                if (_statuses.containsKey(m.getKey()))
+                    continue;
+            
+                mlset.add(new LinkStatusChannelListener(m.getValue().channel) {
+                    private static final long serialVersionUID = 1568144473514091593L;
+
+                    public void linkStatus(boolean value) {
+                        _statuses.put(m.getKey(), value);
+                        if (_status != null)
+                            _linkFrame.resolve(m.getKey()).channel.linkStatus(value && _status);
+                        
+                        if (!isDone())
+                            instance(LINKSTATUSINTERCEPTOR.this);
+
+                    }
+
+                });
+            }
+            
+            object(false, mlset);
+
+        }
+
+        /**
+         * Did we get all the links we need?
+         * @return
+         */
+        private boolean isDone() {
+            return (_statuses.keySet().size() < SCOPEACT.this._self.o.outgoingLinks.size());
+
+        }
+
+    }
+    
+    
+    /**
+     * Guard for ISOLATED scopes to prevent start until all locks are acquired.
+     * 
+     * @author Maciej Szefler <mszefler at gmail dot com>
+     *
+     */
     private class ISOLATEDGUARD extends BpelJacobRunnable {
 
         private static final long serialVersionUID = -5017579415744600900L;
-        
+
         final List<IsolationLock> _locksNeeded;
+
         final LinkedList<IsolationLock> _locksAcquired = new LinkedList<IsolationLock>();
+
         final SynchChannel _synchChannel;
 
-        public ISOLATEDGUARD(List<IsolationLock> locks, SynchChannel synchChannel) {
+        ISOLATEDGUARD(List<IsolationLock> locks, SynchChannel synchChannel) {
             _locksNeeded = locks;
             _synchChannel = synchChannel;
         }
@@ -98,18 +226,18 @@
         public void run() {
             if (_locksNeeded.isEmpty()) {
                 // acquired all locks.
-                
+
                 ScopeFrame newFrame = new ScopeFrame((OScope) _self.o, getBpelRuntimeContext().createScopeInstance(
                         _scopeFrame.scopeInstanceId, (OScope) _self.o), _scopeFrame, null);
-                // need to make sure to release the locks after the scope completes, to do this, intercept messages
-                // on the parent scope channel.
-                if (!_locksAcquired.isEmpty()) {
-                    final ParentScopeChannel parent = _self.parent;
-                    _self.parent = newChannel(ParentScopeChannel.class);
-                    instance(new UNLOCKER(_self.parent, parent,_synchChannel, _locksAcquired));
-                }
+
                 
-                instance(new SCOPE(_self, newFrame, _linkFrame));
+                final ParentScopeChannel parent = _self.parent;
+                _self.parent = newChannel(ParentScopeChannel.class);
+                ValChannel lsi = newChannel(ValChannel.class);
+                instance(new UNLOCKER(_self.parent, parent, _synchChannel, _locksAcquired, lsi));
+                LinkFrame linkframe = createInterceptorLinkFrame();
+                instance(new LINKSTATUSINTERCEPTOR(lsi,linkframe));
+                instance(new SCOPE(_self, newFrame, linkframe));
                 return;
             } else {
                 // try to acquire the locks in sequence (IMPORTANT) not all at once.
@@ -118,36 +246,49 @@
                     il.lockChannel.writeLock(_synchChannel);
                 else
                     il.lockChannel.readLock(_synchChannel);
-                
+
                 object(new SynchChannelListener(_synchChannel) {
-                        private static final long serialVersionUID = 2857261074409098274L;
+                    private static final long serialVersionUID = 2857261074409098274L;
 
-                        public void ret() {
-                            _locksAcquired.add(_locksNeeded.remove(0));
-                            instance(ISOLATEDGUARD.this);
-                        }
+                    public void ret() {
+                        _locksAcquired.add(_locksNeeded.remove(0));
+                        instance(ISOLATEDGUARD.this);
+                    }
                 });
-            
-            }    
+
+            }
         }
-        
+
     }
 
-    
+    /**
+     * Interceptor that waits for the scope to finish and unlock the acquired locks.
+     * 
+     * @author Maciej Szefler <mszefler at gmail dot com>
+     *
+     */
     private static class UNLOCKER extends BpelJacobRunnable {
 
         private static final long serialVersionUID = -476393080609348172L;
-        
+
         private final ParentScopeChannel _self;
+
         private final ParentScopeChannel _parent;
+
         private final SynchChannel _synchChannel;
-        private final LinkedList<IsolationLock> _locks;
+        
+        private final List<IsolationLock> _locks;
+
+        private final ValChannel _linkStatusInterceptor;
 
-        public UNLOCKER(ParentScopeChannel self, ParentScopeChannel parent, SynchChannel synchChannel, LinkedList<IsolationLock> locksAcquired) {
+        public UNLOCKER(ParentScopeChannel self, ParentScopeChannel parent, SynchChannel synchChannel,
+                List<IsolationLock> locksAcquired,
+                ValChannel linkStatusInterceptor) {
             _self = self;
             _parent = parent;
             _synchChannel = synchChannel;
             _locks = locksAcquired;
+            _linkStatusInterceptor = linkStatusInterceptor;
         }
 
         @Override
@@ -158,6 +299,7 @@
                 public void cancelled() {
                     _parent.cancelled();
                     unlockAll();
+                    _linkStatusInterceptor.val(false);
                     // no more listening.
                 }
 
@@ -169,30 +311,32 @@
 
                 public void completed(FaultData faultData, Set<CompensationHandler> compensations) {
                     _parent.completed(faultData, compensations);
+                    _linkStatusInterceptor.val(faultData == null);
                     unlockAll();
                     // no more listening
-                    
+
                 }
 
                 public void failure(String reason, Element data) {
                     _parent.failure(reason, data);
+                    _linkStatusInterceptor.val(false);
                     unlockAll();
                     // no more listening
                 }
-                
-            });            
+
+            });
         }
 
         /**
-         * Unlock all the acquired locks. 
-         *
+         * Unlock all the acquired locks.
+         * 
          */
         private void unlockAll() {
-            for (IsolationLock il : _locks) 
+            for (IsolationLock il : _locks)
                 il.lockChannel.unlock(_synchChannel);
             _locks.clear();
         }
-        
+
     }
 
     /**
@@ -201,7 +345,7 @@
      * @author Maciej Szefler <mszefler at gmail dot com>
      * 
      */
-    private static class IsolationLock implements Comparable<IsolationLock>,Serializable {
+    private static class IsolationLock implements Comparable<IsolationLock>, Serializable {
         private static final long serialVersionUID = 4214864393241172705L;
 
         OScope.Variable guardedObject;