You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ode.apache.org by mr...@apache.org on 2006/10/05 00:56:59 UTC

svn commit: r453057 [1/2] - in /incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel: engine/ runtime/

Author: mriou
Date: Wed Oct  4 15:56:58 2006
New Revision: 453057

URL: http://svn.apache.org/viewvc?view=rev&rev=453057
Log:
Logging faults as errors.

Modified:
    incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java
    incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/ACTIVITYGUARD.java
    incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/ASSIGN.java
    incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/EH_ALARM.java
    incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/EH_EVENT.java
    incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/FOREACH.java
    incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/INVOKE.java
    incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/PICK.java
    incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/REPLY.java
    incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/THROW.java
    incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/WHILE.java

Modified: incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java
URL: http://svn.apache.org/viewvc/incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java?view=diff&rev=453057&r1=453056&r2=453057
==============================================================================
--- incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java (original)
+++ incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java Wed Oct  4 15:56:58 2006
@@ -950,9 +950,7 @@
                             dao.setProperty(property.name.toString(), val);
                         }
                     } catch (FaultException e) {
-                        // ignore error for now
-                        // __log.warn("Error attempting to extract property '" +
-                        // property.toString() + "'", e);
+                         __log.error("Error attempting to extract property '" + property.toString() + "'", e);
                     }
                 }
             }

Modified: incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/ACTIVITYGUARD.java
URL: http://svn.apache.org/viewvc/incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/ACTIVITYGUARD.java?view=diff&rev=453057&r1=453056&r2=453057
==============================================================================
--- incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/ACTIVITYGUARD.java (original)
+++ incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/ACTIVITYGUARD.java Wed Oct  4 15:56:58 2006
@@ -44,171 +44,172 @@
 import java.util.Set;
 
 class ACTIVITYGUARD extends ACTIVITY {
-	private static final long serialVersionUID = 1L;
+    private static final long serialVersionUID = 1L;
 
-	private static final Log __log = LogFactory.getLog(ACTIVITYGUARD.class);
+    private static final Log __log = LogFactory.getLog(ACTIVITYGUARD.class);
 
-  private static final ActivityTemplateFactory __activityTemplateFactory = new ActivityTemplateFactory();
-  private OActivity _oactivity;
+    private static final ActivityTemplateFactory __activityTemplateFactory = new ActivityTemplateFactory();
+    private OActivity _oactivity;
 
-  /** Link values. */
-  private Map<OLink, Boolean> _linkVals = new HashMap<OLink, Boolean>();
-  
-  /** Flag to prevent duplicate ActivityEnabledEvents */
-  private boolean _firstTime = true;
-
-  public ACTIVITYGUARD(ActivityInfo self, ScopeFrame scopeFrame, LinkFrame linkFrame) {
-    super(self, scopeFrame, linkFrame);
-    _oactivity = self.o;
-  }
-
-  public void run() {
-    // Send a notification of the activity being enabled, 
-    if (_firstTime) {
-      sendEvent(new ActivityEnabledEvent());
-      _firstTime = false;
-    }
-    
-    if (_linkVals.keySet().containsAll(_oactivity.targetLinks)) {
-      if (evaluateJoinCondition()) {
-
-        ActivityExecStartEvent aese = new ActivityExecStartEvent();
-        sendEvent(aese);
-        // intercept completion channel in order to execute transition conditions.
-        ActivityInfo activity = new ActivityInfo(genMonotonic(),_self.o,_self.self, newChannel(ParentScopeChannel.class));
-        instance(createActivity(activity));
-        instance(new TCONDINTERCEPT(activity.parent));
-      } else {
-        // Join Failure.
-        _self.parent.completed(createFault(_oactivity.getOwner().constants.qnJoinFailure,_oactivity),
-                CompensationHandler.emptySet());
-
-        // Dead path activity.
-        dpe(_oactivity);
-      }
-    } else /* don't know all our links statuses */ {
-      Set<ChannelListener> mlset = new HashSet<ChannelListener>();
-      mlset.add(new TerminationChannelListener(_self.self) {
-        private static final long serialVersionUID = 5094153128476008961L;
-
-        public void terminate() {
-          // Complete immediately, without faulting or registering any comps.
-          _self.parent.completed(null, CompensationHandler.emptySet());
-
-          // Dead-path activity
-          dpe(_oactivity);
-        }
-      });
-      for (Iterator<OLink> i = _oactivity.targetLinks.iterator();i.hasNext();) {
-        final OLink link = i.next();
-        mlset.add(new LinkStatusChannelListener(_linkFrame.resolve(link).sub) {
-        private static final long serialVersionUID = 1024137371118887935L;
-
-        public void linkStatus(boolean value) {
-            _linkVals.put(link, Boolean.valueOf(value));
-            instance(ACTIVITYGUARD.this);
-          }
-        });
-      }
-
-      object(false, mlset);
-    }
-  }
-
-
-  private boolean evaluateTransitionCondition(OExpression transitionCondition)
-          throws FaultException {
-    if (transitionCondition == null)
-      return true;
-
-    try {
-      return getBpelRuntimeContext().getExpLangRuntime().evaluateAsBoolean(transitionCondition,
-              new ExprEvaluationContextImpl(_scopeFrame, getBpelRuntimeContext()));
-    } catch (EvaluationException e) {
-      String msg = "Error in transition condition detected at runtime; condition=" + transitionCondition;
-      __log.error(msg,e);
-      throw new InvalidProcessException(msg, e);
-    }
-  }
-
-  /**
-   * Evaluate an activity's join condition.
-   * @return <code>true</code> if join condition evaluates to true.
-   */
-  private boolean evaluateJoinCondition() {
-    // For activities with no link targets, the join condition is always satisfied.
-    if (_oactivity.targetLinks.size() == 0)
-      return true;
-
-    // For activities with no join condition, an OR condition is assumed.
-    if (_oactivity.joinCondition == null)
-      return _linkVals.values().contains(Boolean.TRUE);
-
-    try {
-      return getBpelRuntimeContext().getExpLangRuntime().evaluateAsBoolean(_oactivity.joinCondition,
-              new ExprEvaluationContextImpl(null,null,_linkVals));
-    } catch (Exception e) {
-      String msg = "Unexpected error evaluating a join condition: " + _oactivity.joinCondition;
-      __log.error(msg,e);
-      throw new InvalidProcessException(msg,e);
-    }
-  }
-
-  private static ACTIVITY createActivity(ActivityInfo activity, ScopeFrame scopeFrame, LinkFrame linkFrame) {
-    return __activityTemplateFactory.createInstance(activity.o,activity, scopeFrame, linkFrame);
-  }
-
-  private ACTIVITY createActivity(ActivityInfo activity) {
-    return createActivity(activity,_scopeFrame, _linkFrame);
-  }
-
-
-  /**
-   * Intercepts the
-   * {@link ParentScopeChannel#completed(org.apache.ode.bpel.runtime.channels.FaultData, java.util.Set<org.apache.ode.bpel.runtime.CompensationHandler>)}
-   * call, to evaluate transition conditions before returning to the parent.
-   */
-  private class TCONDINTERCEPT extends BpelJacobRunnable {
-    private static final long serialVersionUID = 4014873396828400441L;
-    ParentScopeChannel _in;
+    /** Link values. */
+    private Map<OLink, Boolean> _linkVals = new HashMap<OLink, Boolean>();
 
-    public TCONDINTERCEPT(ParentScopeChannel in) {
-      _in = in;
+    /** Flag to prevent duplicate ActivityEnabledEvents */
+    private boolean _firstTime = true;
+
+    public ACTIVITYGUARD(ActivityInfo self, ScopeFrame scopeFrame, LinkFrame linkFrame) {
+        super(self, scopeFrame, linkFrame);
+        _oactivity = self.o;
     }
 
     public void run() {
-      object(new ParentScopeChannelListener(_in) {
-        private static final long serialVersionUID = 2667359535900385952L;
+        // Send a notification of the activity being enabled,
+        if (_firstTime) {
+            sendEvent(new ActivityEnabledEvent());
+            _firstTime = false;
+        }
+
+        if (_linkVals.keySet().containsAll(_oactivity.targetLinks)) {
+            if (evaluateJoinCondition()) {
 
-        public void compensate(OScope scope, SynchChannel ret) {
-          _self.parent.compensate(scope,ret);
-          instance(TCONDINTERCEPT.this);
-        }
-
-        public void completed(FaultData faultData, Set<CompensationHandler> compensations) {
-        	sendEvent(new ActivityExecEndEvent());
-          if (faultData != null) {
-            dpe(_oactivity.sourceLinks);
-            _self.parent.completed(faultData, compensations);
-          } else {
-            FaultData fault = null;
-            for (Iterator<OLink> i = _oactivity.sourceLinks.iterator();i.hasNext();) {
-              OLink olink = i.next();
-              LinkInfo linfo = _linkFrame.resolve(olink);
-              try {
-                boolean val = evaluateTransitionCondition(olink.transitionCondition);
-                linfo.pub.linkStatus(val);
-              } catch (FaultException e) {
-                linfo.pub.linkStatus(false);
-                if (fault == null)
-                  fault = createFault(e.getQName(),olink.transitionCondition);
-              }
+                ActivityExecStartEvent aese = new ActivityExecStartEvent();
+                sendEvent(aese);
+                // intercept completion channel in order to execute transition conditions.
+                ActivityInfo activity = new ActivityInfo(genMonotonic(),_self.o,_self.self, newChannel(ParentScopeChannel.class));
+                instance(createActivity(activity));
+                instance(new TCONDINTERCEPT(activity.parent));
+            } else {
+                // Join Failure.
+                _self.parent.completed(createFault(_oactivity.getOwner().constants.qnJoinFailure,_oactivity),
+                        CompensationHandler.emptySet());
+
+                // Dead path activity.
+                dpe(_oactivity);
+            }
+        } else /* don't know all our links statuses */ {
+            Set<ChannelListener> mlset = new HashSet<ChannelListener>();
+            mlset.add(new TerminationChannelListener(_self.self) {
+                private static final long serialVersionUID = 5094153128476008961L;
+
+                public void terminate() {
+                    // Complete immediately, without faulting or registering any comps.
+                    _self.parent.completed(null, CompensationHandler.emptySet());
+
+                    // Dead-path activity
+                    dpe(_oactivity);
+                }
+            });
+            for (Iterator<OLink> i = _oactivity.targetLinks.iterator();i.hasNext();) {
+                final OLink link = i.next();
+                mlset.add(new LinkStatusChannelListener(_linkFrame.resolve(link).sub) {
+                    private static final long serialVersionUID = 1024137371118887935L;
+
+                    public void linkStatus(boolean value) {
+                        _linkVals.put(link, Boolean.valueOf(value));
+                        instance(ACTIVITYGUARD.this);
+                    }
+                });
             }
-            _self.parent.completed(fault, compensations);
-          }
+
+            object(false, mlset);
+        }
+    }
+
+
+    private boolean evaluateTransitionCondition(OExpression transitionCondition)
+            throws FaultException {
+        if (transitionCondition == null)
+            return true;
+
+        try {
+            return getBpelRuntimeContext().getExpLangRuntime().evaluateAsBoolean(transitionCondition,
+                    new ExprEvaluationContextImpl(_scopeFrame, getBpelRuntimeContext()));
+        } catch (EvaluationException e) {
+            String msg = "Error in transition condition detected at runtime; condition=" + transitionCondition;
+            __log.error(msg,e);
+            throw new InvalidProcessException(msg, e);
+        }
+    }
+
+    /**
+     * Evaluate an activity's join condition.
+     * @return <code>true</code> if join condition evaluates to true.
+     */
+    private boolean evaluateJoinCondition() {
+        // For activities with no link targets, the join condition is always satisfied.
+        if (_oactivity.targetLinks.size() == 0)
+            return true;
+
+        // For activities with no join condition, an OR condition is assumed.
+        if (_oactivity.joinCondition == null)
+            return _linkVals.values().contains(Boolean.TRUE);
+
+        try {
+            return getBpelRuntimeContext().getExpLangRuntime().evaluateAsBoolean(_oactivity.joinCondition,
+                    new ExprEvaluationContextImpl(null,null,_linkVals));
+        } catch (Exception e) {
+            String msg = "Unexpected error evaluating a join condition: " + _oactivity.joinCondition;
+            __log.error(msg,e);
+            throw new InvalidProcessException(msg,e);
+        }
+    }
+
+    private static ACTIVITY createActivity(ActivityInfo activity, ScopeFrame scopeFrame, LinkFrame linkFrame) {
+        return __activityTemplateFactory.createInstance(activity.o,activity, scopeFrame, linkFrame);
+    }
+
+    private ACTIVITY createActivity(ActivityInfo activity) {
+        return createActivity(activity,_scopeFrame, _linkFrame);
+    }
+
+
+    /**
+     * Intercepts the
+     * {@link ParentScopeChannel#completed(org.apache.ode.bpel.runtime.channels.FaultData, java.util.Set<org.apache.ode.bpel.runtime.CompensationHandler>)}
+     * call, to evaluate transition conditions before returning to the parent.
+     */
+    private class TCONDINTERCEPT extends BpelJacobRunnable {
+        private static final long serialVersionUID = 4014873396828400441L;
+        ParentScopeChannel _in;
+
+        public TCONDINTERCEPT(ParentScopeChannel in) {
+            _in = in;
         }
-      });
 
+        public void run() {
+            object(new ParentScopeChannelListener(_in) {
+                private static final long serialVersionUID = 2667359535900385952L;
+
+                public void compensate(OScope scope, SynchChannel ret) {
+                    _self.parent.compensate(scope,ret);
+                    instance(TCONDINTERCEPT.this);
+                }
+
+                public void completed(FaultData faultData, Set<CompensationHandler> compensations) {
+                    sendEvent(new ActivityExecEndEvent());
+                    if (faultData != null) {
+                        dpe(_oactivity.sourceLinks);
+                        _self.parent.completed(faultData, compensations);
+                    } else {
+                        FaultData fault = null;
+                        for (Iterator<OLink> i = _oactivity.sourceLinks.iterator();i.hasNext();) {
+                            OLink olink = i.next();
+                            LinkInfo linfo = _linkFrame.resolve(olink);
+                            try {
+                                boolean val = evaluateTransitionCondition(olink.transitionCondition);
+                                linfo.pub.linkStatus(val);
+                            } catch (FaultException e) {
+                                linfo.pub.linkStatus(false);
+                                __log.error(e);
+                                if (fault == null)
+                                    fault = createFault(e.getQName(),olink.transitionCondition);
+                            }
+                        }
+                        _self.parent.completed(fault, compensations);
+                    }
+                }
+            });
+
+        }
     }
-  }
 }

Modified: incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/ASSIGN.java
URL: http://svn.apache.org/viewvc/incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/ASSIGN.java?view=diff&rev=453057&r1=453056&r2=453057
==============================================================================
--- incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/ASSIGN.java (original)
+++ incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/ASSIGN.java Wed Oct  4 15:56:58 2006
@@ -89,9 +89,8 @@
         }
 
         if (faultData != null) {
-            if (__log.isDebugEnabled())
-                __log.debug("Assignment Fault: " + faultData.getFaultName()
-                        + ",lineNo=" + faultData.getFaultLineNo());
+            __log.error("Assignment Fault: " + faultData.getFaultName()
+                    + ",lineNo=" + faultData.getFaultLineNo());
             _self.parent.completed(faultData, CompensationHandler.emptySet());
         } else {
             _self.parent.completed(null, CompensationHandler.emptySet());

Modified: incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/EH_ALARM.java
URL: http://svn.apache.org/viewvc/incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/EH_ALARM.java?view=diff&rev=453057&r1=453056&r2=453057
==============================================================================
--- incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/EH_ALARM.java (original)
+++ incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/EH_ALARM.java Wed Oct  4 15:56:58 2006
@@ -33,6 +33,8 @@
 import org.apache.ode.bpel.runtime.channels.TimerResponseChannel;
 import org.apache.ode.bpel.runtime.channels.TimerResponseChannelListener;
 import org.apache.ode.jacob.SynchChannel;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
 import java.util.Calendar;
 import java.util.HashSet;
@@ -45,194 +47,200 @@
  * the event handler completes naturally.
  */
 class EH_ALARM extends BpelJacobRunnable {
-	private static final long serialVersionUID = 1L;
 
-	private ParentScopeChannel _psc;
-  private TerminationChannel _tc;
-  private OEventHandler.OAlarm _oalarm;
-  private ScopeFrame _scopeFrame;
-  private EventHandlerControlChannel _cc;
-  private Set<CompensationHandler> _comps = new HashSet<CompensationHandler>();
-
-  /**
-   * Concretion constructor.
-   * @param psc a link to our parent.
-   * @param tc channel we listen on for termination requests
-   * @param cc channel we listen on for "stop" requests
-   * @param o our prototype / compiled representation
-   * @param scopeFrame the {@link ScopeFrame} in which we are executing
-   */
-  EH_ALARM(ParentScopeChannel psc, TerminationChannel tc, EventHandlerControlChannel cc, OEventHandler.OAlarm o, ScopeFrame scopeFrame) {
-    _psc = psc;
-    _tc = tc;
-    _cc = cc;
-    _scopeFrame = scopeFrame;
-    _oalarm  = o;
-  }
-
-  public void run() {
-
-    Calendar alarm = Calendar.getInstance();
-
-    if (_oalarm.forExpr != null)
-      try {
-        getBpelRuntimeContext().getExpLangRuntime().evaluateAsDuration(_oalarm.forExpr, getEvaluationContext()).addTo(alarm);
-      } catch (EvaluationException e) {
-        throw new InvalidProcessException(e);
-      } catch (FaultException e) {
-        _psc.completed(createFault(e.getQName(),_oalarm.forExpr), _comps);
-        return;
-      }
-    else if (_oalarm.untilExpr != null)
-      try {
-        alarm.setTime(getBpelRuntimeContext().getExpLangRuntime().evaluateAsDate(_oalarm.untilExpr, getEvaluationContext()).getTime());
-      } catch (EvaluationException e) {
-        throw new InvalidProcessException(e);
-      } catch (FaultException e) {
-        _psc.completed(createFault(e.getQName(),_oalarm.untilExpr), _comps);
-        return;
-      }
-
-    // We reduce to waiting for the alarm to be triggered.
-    instance(new WAIT(alarm));
-  }
-
-  protected EvaluationContext getEvaluationContext() {
-    return new ExprEvaluationContextImpl(_scopeFrame,getBpelRuntimeContext());
-  }
-
-  /**
-   * Template used to wait until a given time, reduing to a {@link FIRE} after the
-   * elapsed time. This template also monitors the termination and event-control channels
-   * for requests from parent.
-   */
-  private class WAIT extends BpelJacobRunnable {
-    private static final long serialVersionUID = -1426724996925898213L;
-    Calendar _alarm;
+    private static final Log __log = LogFactory.getLog(EH_ALARM.class);
+
+    private static final long serialVersionUID = 1L;
+
+    private ParentScopeChannel _psc;
+    private TerminationChannel _tc;
+    private OEventHandler.OAlarm _oalarm;
+    private ScopeFrame _scopeFrame;
+    private EventHandlerControlChannel _cc;
+    private Set<CompensationHandler> _comps = new HashSet<CompensationHandler>();
 
     /**
      * Concretion constructor.
-     * @param alarm date at which time to fire
+     * @param psc a link to our parent.
+     * @param tc channel we listen on for termination requests
+     * @param cc channel we listen on for "stop" requests
+     * @param o our prototype / compiled representation
+     * @param scopeFrame the {@link ScopeFrame} in which we are executing
      */
-    WAIT(Calendar alarm) {
-      _alarm = alarm;
+    EH_ALARM(ParentScopeChannel psc, TerminationChannel tc, EventHandlerControlChannel cc, OEventHandler.OAlarm o, ScopeFrame scopeFrame) {
+        _psc = psc;
+        _tc = tc;
+        _cc = cc;
+        _scopeFrame = scopeFrame;
+        _oalarm  = o;
     }
 
     public void run() {
-      Calendar now = Calendar.getInstance();
-
-      if (now.before(_alarm)) {
-        TimerResponseChannel trc = newChannel(TimerResponseChannel.class);
-        getBpelRuntimeContext().registerTimer(trc,_alarm.getTime());
-        object(false,new TimerResponseChannelListener(trc){
-        private static final long serialVersionUID = 1110683632756756017L;
-
-        public void onTimeout() {
-            // This is what we are waiting for, fire the activity
-            instance(new FIRE());
-          }
-
-          public void onCancel() {
-            _psc.completed(null, _comps);
-          }
-        }.or(new EventHandlerControlChannelListener(_cc) {
-        private static final long serialVersionUID = -7750428941445331236L;
-
-        public void stop() {
-            _psc.completed(null, _comps);
-          }
-
-        }.or(new TerminationChannelListener(_tc) {
-        private static final long serialVersionUID = 6100105997983514609L;
-
-        public void terminate() {
-            _psc.completed(null, _comps);
-          }
-        })));
-      } else /* now is later then alarm time */ {
-        // If the alarm has passed we fire the nested activity
-        instance(new FIRE());
-      }
-
-    }
-  }
-
-  /**
-   * Snipped that fires the alarm activity.
-   */
-  private class FIRE extends BpelJacobRunnable {
-    private static final long serialVersionUID = -7261315204412433250L;
-
-    public void run() {
-      // Start the child activity.
-      ActivityInfo child = new ActivityInfo(genMonotonic(),
-              _oalarm.activity,
-              newChannel(TerminationChannel.class), newChannel(ParentScopeChannel.class));
-      instance(createChild(child, _scopeFrame, new LinkFrame(null) ));
-      instance(new ACTIVE(child));
-    }
-  }
-
-  /**
-   * Snippet that is used to monitor a running activity.
-   */
-  private class ACTIVE extends BpelJacobRunnable {
-    private static final long serialVersionUID = -2166253425722769701L;
 
-    private ActivityInfo _activity;
+        Calendar alarm = Calendar.getInstance();
 
-    /** Indicates whether our parent has requested a stop. */
-    private boolean _stopped = false;
+        if (_oalarm.forExpr != null)
+            try {
+                getBpelRuntimeContext().getExpLangRuntime().evaluateAsDuration(_oalarm.forExpr, getEvaluationContext()).addTo(alarm);
+            } catch (EvaluationException e) {
+                throw new InvalidProcessException(e);
+            } catch (FaultException e) {
+                __log.error(e);
+                _psc.completed(createFault(e.getQName(),_oalarm.forExpr), _comps);
+                return;
+            }
+        else if (_oalarm.untilExpr != null)
+            try {
+                alarm.setTime(getBpelRuntimeContext().getExpLangRuntime().evaluateAsDate(_oalarm.untilExpr, getEvaluationContext()).getTime());
+            } catch (EvaluationException e) {
+                throw new InvalidProcessException(e);
+            } catch (FaultException e) {
+                __log.error(e);
+                _psc.completed(createFault(e.getQName(),_oalarm.untilExpr), _comps);
+                return;
+            }
 
-    ACTIVE(ActivityInfo activity) {
-      _activity = activity;
+        // We reduce to waiting for the alarm to be triggered.
+        instance(new WAIT(alarm));
     }
 
-    public void run() {
-      object(false,new ParentScopeChannelListener(_activity.parent){
-        private static final long serialVersionUID = -3357030137175178040L;
+    protected EvaluationContext getEvaluationContext() {
+        return new ExprEvaluationContextImpl(_scopeFrame,getBpelRuntimeContext());
+    }
 
-        public void compensate(OScope scope, SynchChannel ret) {
-          _psc.compensate(scope,ret);
-          instance(ACTIVE.this);
+    /**
+     * Template used to wait until a given time, reducing to a {@link FIRE} after the
+     * elapsed time. This template also monitors the termination and event-control channels
+     * for requests from parent.
+     */
+    private class WAIT extends BpelJacobRunnable {
+        private static final long serialVersionUID = -1426724996925898213L;
+        Calendar _alarm;
+
+        /**
+         * Concretion constructor.
+         * @param alarm date at which time to fire
+         */
+        WAIT(Calendar alarm) {
+            _alarm = alarm;
         }
 
-        public void completed(FaultData faultData, Set<CompensationHandler> compensations) {
-          _comps.addAll(compensations);
-          if (!_stopped && _oalarm.repeatExpr != null) {
-            Calendar next = Calendar.getInstance();
-            try {
-              getBpelRuntimeContext().getExpLangRuntime().evaluateAsDuration(_oalarm.forExpr, getEvaluationContext()).addTo(next);
-            } catch (EvaluationException e) {
-              throw new InvalidProcessException(e);
-            } catch (FaultException e) {
-              _psc.completed(createFault(e.getQName(),_oalarm.forExpr), _comps);
-              return;
+        public void run() {
+            Calendar now = Calendar.getInstance();
+
+            if (now.before(_alarm)) {
+                TimerResponseChannel trc = newChannel(TimerResponseChannel.class);
+                getBpelRuntimeContext().registerTimer(trc,_alarm.getTime());
+                object(false,new TimerResponseChannelListener(trc){
+                    private static final long serialVersionUID = 1110683632756756017L;
+
+                    public void onTimeout() {
+                        // This is what we are waiting for, fire the activity
+                        instance(new FIRE());
+                    }
+
+                    public void onCancel() {
+                        _psc.completed(null, _comps);
+                    }
+                }.or(new EventHandlerControlChannelListener(_cc) {
+                    private static final long serialVersionUID = -7750428941445331236L;
+
+                    public void stop() {
+                        _psc.completed(null, _comps);
+                    }
+
+                }.or(new TerminationChannelListener(_tc) {
+                    private static final long serialVersionUID = 6100105997983514609L;
+
+                    public void terminate() {
+                        _psc.completed(null, _comps);
+                    }
+                })));
+            } else /* now is later than alarm time */ {
+                // If the alarm has passed we fire the nested activity
+                instance(new FIRE());
             }
-            instance(new WAIT(next));
-          } else {
-            _psc.completed(faultData, _comps);
-          }
+
         }
+    }
 
-      }.or(new EventHandlerControlChannelListener(_cc) {
-        private static final long serialVersionUID = -3873619538789039424L;
+    /**
+     * Snippet that fires the alarm activity.
+     */
+    private class FIRE extends BpelJacobRunnable {
+        private static final long serialVersionUID = -7261315204412433250L;
 
-        public void stop() {
-          _stopped = true;
-          instance(ACTIVE.this);
+        public void run() {
+            // Start the child activity.
+            ActivityInfo child = new ActivityInfo(genMonotonic(),
+                    _oalarm.activity,
+                    newChannel(TerminationChannel.class), newChannel(ParentScopeChannel.class));
+            instance(createChild(child, _scopeFrame, new LinkFrame(null) ));
+            instance(new ACTIVE(child));
         }
+    }
+
+    /**
+     * Snippet that is used to monitor a running activity.
+     */
+    private class ACTIVE extends BpelJacobRunnable {
+        private static final long serialVersionUID = -2166253425722769701L;
+
+        private ActivityInfo _activity;
 
-      }.or(new TerminationChannelListener(_tc) {
-        private static final long serialVersionUID = -4566956567870652885L;
+        /** Indicates whether our parent has requested a stop. */
+        private boolean _stopped = false;
 
-        public void terminate() {
-          replication(_activity.self).terminate();
-          _stopped = true;
-          instance(ACTIVE.this);
+        ACTIVE(ActivityInfo activity) {
+            _activity = activity;
         }
-      })));
 
+        public void run() {
+            object(false,new ParentScopeChannelListener(_activity.parent){
+                private static final long serialVersionUID = -3357030137175178040L;
+
+                public void compensate(OScope scope, SynchChannel ret) {
+                    _psc.compensate(scope,ret);
+                    instance(ACTIVE.this);
+                }
+
+                public void completed(FaultData faultData, Set<CompensationHandler> compensations) {
+                    _comps.addAll(compensations);
+                    if (!_stopped && _oalarm.repeatExpr != null) {
+                        Calendar next = Calendar.getInstance();
+                        try {
+                            getBpelRuntimeContext().getExpLangRuntime().evaluateAsDuration(_oalarm.forExpr, getEvaluationContext()).addTo(next);
+                        } catch (EvaluationException e) {
+                            throw new InvalidProcessException(e);
+                        } catch (FaultException e) {
+                            __log.error(e);
+                            _psc.completed(createFault(e.getQName(),_oalarm.forExpr), _comps);
+                            return;
+                        }
+                        instance(new WAIT(next));
+                    } else {
+                        _psc.completed(faultData, _comps);
+                    }
+                }
+
+            }.or(new EventHandlerControlChannelListener(_cc) {
+                private static final long serialVersionUID = -3873619538789039424L;
+
+                public void stop() {
+                    _stopped = true;
+                    instance(ACTIVE.this);
+                }
+
+            }.or(new TerminationChannelListener(_tc) {
+                private static final long serialVersionUID = -4566956567870652885L;
+
+                public void terminate() {
+                    replication(_activity.self).terminate();
+                    _stopped = true;
+                    instance(ACTIVE.this);
+                }
+            })));
+
+        }
     }
-  }
 }

Modified: incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/EH_EVENT.java
URL: http://svn.apache.org/viewvc/incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/EH_EVENT.java?view=diff&rev=453057&r1=453056&r2=453057
==============================================================================
--- incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/EH_EVENT.java (original)
+++ incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/EH_EVENT.java Wed Oct  4 15:56:58 2006
@@ -46,247 +46,249 @@
  */
 class EH_EVENT extends BpelJacobRunnable {
 
-	private static final long serialVersionUID = 1L;
+    private static final long serialVersionUID = 1L;
 
-	private static final Log __log = LogFactory.getLog(EH_EVENT.class);
+    private static final Log __log = LogFactory.getLog(EH_EVENT.class);
 
-  private EventHandlerControlChannel _ehc;
-  private TerminationChannel _tc;
-  private ParentScopeChannel _psc;
-  private ScopeFrame _scopeFrame;
-  private OEventHandler.OEvent _oevent;
+    private EventHandlerControlChannel _ehc;
+    private TerminationChannel _tc;
+    private ParentScopeChannel _psc;
+    private ScopeFrame _scopeFrame;
+    private OEventHandler.OEvent _oevent;
 
-  /** Registered compensation handlers. */
-  private Set<CompensationHandler> _comps = new HashSet<CompensationHandler>();
+    /** Registered compensation handlers. */
+    private Set<CompensationHandler> _comps = new HashSet<CompensationHandler>();
 
-  private FaultData _fault;
+    private FaultData _fault;
 
-  /** Active instances (we can have more than one!) */
-  private Set<ActivityInfo> _active = new HashSet<ActivityInfo>();
+    /** Active instances (we can have more than one!) */
+    private Set<ActivityInfo> _active = new HashSet<ActivityInfo>();
 
-  /** Whether a stop has been requested; if so no more new instances. */
-  private boolean _stopped;
+    /** Whether a stop has been requested; if so no more new instances. */
+    private boolean _stopped;
 
-  /** Has a termination of this handler been requested. */
-  private boolean _terminated;
+    /** Has a termination of this handler been requested. */
+    private boolean _terminated;
 
-  private boolean _childrenTerminated;
+    private boolean _childrenTerminated;
 
 
-  EH_EVENT(ParentScopeChannel psc,TerminationChannel tc, EventHandlerControlChannel ehc, OEventHandler.OEvent o, ScopeFrame scopeFrame) {
-    _scopeFrame = scopeFrame;
-    _oevent = o;
-    _tc = tc;
-    _psc = psc;
-    _ehc = ehc;
-  }
-
-
-  public void run() {
-    instance(new SELECT());
-  }
-
-  /**
-   * Terminate all the active activities.
-   */
-  private void terminateActive() {
-    if (!_childrenTerminated) {
-      for (ActivityInfo tact : _active) {
-        replication(tact.self).terminate();
-      }
-      _childrenTerminated = true;
+    EH_EVENT(ParentScopeChannel psc,TerminationChannel tc, EventHandlerControlChannel ehc, OEventHandler.OEvent o, ScopeFrame scopeFrame) {
+        _scopeFrame = scopeFrame;
+        _oevent = o;
+        _tc = tc;
+        _psc = psc;
+        _ehc = ehc;
     }
-  }
-  /**
-   * Template that does the actual selection interaction with the runtime system, and
-   * then waits on the pick response channel.
-   */
-  class SELECT extends BpelJacobRunnable {
 
-		private static final long serialVersionUID = 1L;
 
-		/**
-     * @see org.apache.ode.jacob.JacobRunnable#run()
-     */
     public void run() {
-      Selector selector;
-      try {
-        PickResponseChannel pickResponseChannel = newChannel(PickResponseChannel.class);
-        CorrelationKey key;
-        PartnerLinkInstance pLinkInstance = _scopeFrame.resolve(_oevent.partnerLink);
-        if (_oevent.matchCorrelation == null) {
-          // Adding a route for opaque correlation. In this case correlation is done on "out-of-band" session id.
-          String sessionId = getBpelRuntimeContext().fetchMySessionId(pLinkInstance);
-          key = new CorrelationKey(-1, new String[] {sessionId});
-        } else {
-          if (!getBpelRuntimeContext().isCorrelationInitialized(_scopeFrame.resolve(_oevent.matchCorrelation))) {
-            throw new FaultException(_oevent.getOwner().constants.qnCorrelationViolation,"Correlation not initialized.");
-          }
-          key = getBpelRuntimeContext().readCorrelation(_scopeFrame.resolve(_oevent.matchCorrelation));
-          assert key != null;
-        }
-
-        selector =  new Selector(0,pLinkInstance,_oevent.operation.getName(), _oevent.operation.getOutput() == null, _oevent.messageExchangeId, key);
-        getBpelRuntimeContext().select(pickResponseChannel, null, false, new Selector[] { selector} );
-        instance(new WAITING(pickResponseChannel));
-      } catch(FaultException e){
-        if (_fault == null) {
-          _fault = createFault(e.getQName(), _oevent);
-        }
-        terminateActive();
-        instance(new WAITING(null));
-      }
+        instance(new SELECT());
     }
-  }
-
-  /**
-   * Template that represents the waiting for a pick response.
-   */
-  private class WAITING extends BpelJacobRunnable {
-		private static final long serialVersionUID = 1L;
-		private PickResponseChannel _pickResponseChannel;
 
-    private WAITING(PickResponseChannel pickResponseChannel) {
-      _pickResponseChannel = pickResponseChannel;
+    /**
+     * Asks every currently active child activity to terminate. The request is
+     * issued at most once: later calls return without doing anything.
+     */
+    private void terminateActive() {
+        if (_childrenTerminated) return;
+        for (ActivityInfo running : _active) {
+            replication(running.self).terminate();
+        }
+        _childrenTerminated = true;
     }
+    /**
+     * Template that does the actual selection interaction with the runtime system, and
+     * then waits on the pick response channel.
+     */
+    class SELECT extends BpelJacobRunnable {
 
-    public void run() {
+        private static final long serialVersionUID = 1L;
 
-      if (!_active.isEmpty() || _pickResponseChannel != null) {
-        HashSet<ChannelListener> mlset = new HashSet<ChannelListener>();
+        /**
+         * @see org.apache.ode.jacob.JacobRunnable#run()
+         */
+        public void run() {
+            Selector selector;
+            try {
+                PickResponseChannel pickResponseChannel = newChannel(PickResponseChannel.class);
+                CorrelationKey key;
+                PartnerLinkInstance pLinkInstance = _scopeFrame.resolve(_oevent.partnerLink);
+                if (_oevent.matchCorrelation == null) {
+                    // Adding a route for opaque correlation. In this case correlation is done on "out-of-band" session id.
+                    String sessionId = getBpelRuntimeContext().fetchMySessionId(pLinkInstance);
+                    key = new CorrelationKey(-1, new String[] {sessionId});
+                } else {
+                    if (!getBpelRuntimeContext().isCorrelationInitialized(_scopeFrame.resolve(_oevent.matchCorrelation))) {
+                        throw new FaultException(_oevent.getOwner().constants.qnCorrelationViolation,"Correlation not initialized.");
+                    }
+                    key = getBpelRuntimeContext().readCorrelation(_scopeFrame.resolve(_oevent.matchCorrelation));
+                    assert key != null;
+                }
 
-        if (!_terminated) {
-          mlset.add(new TerminationChannelListener(_tc) {
-            private static final long serialVersionUID = 7666910462948788042L;
-
-            public void terminate() {
-              terminateActive();
-              _terminated = true;
-              if (_pickResponseChannel != null)
-                getBpelRuntimeContext().cancel(_pickResponseChannel);
-              instance(WAITING.this);
+                selector =  new Selector(0,pLinkInstance,_oevent.operation.getName(), _oevent.operation.getOutput() == null, _oevent.messageExchangeId, key);
+                getBpelRuntimeContext().select(pickResponseChannel, null, false, new Selector[] { selector} );
+                instance(new WAITING(pickResponseChannel));
+            } catch(FaultException e){
+                __log.error(e);
+                if (_fault == null) {
+                    _fault = createFault(e.getQName(), _oevent);
+                }
+                terminateActive();
+                instance(new WAITING(null));
             }
-          });
-
         }
+    }
 
-        if (!_stopped) {
-          mlset.add(new EventHandlerControlChannelListener(_ehc) {
-            private static final long serialVersionUID = -1050788954724647970L;
-
-            public void stop() {
-              _stopped = true;
-              if (_pickResponseChannel != null)
-                getBpelRuntimeContext().cancel(_pickResponseChannel);
-              instance(WAITING.this);
-            }
-          });
+    /**
+     * Template that represents the waiting for a pick response.
+     */
+    private class WAITING extends BpelJacobRunnable {
+        private static final long serialVersionUID = 1L;
+        private PickResponseChannel _pickResponseChannel;
 
+        private WAITING(PickResponseChannel pickResponseChannel) {
+            _pickResponseChannel = pickResponseChannel;
         }
 
-        for (final ActivityInfo ai : _active) {
-          mlset.add(new ParentScopeChannelListener(ai.parent) {
-            private static final long serialVersionUID = 5341207762415360982L;
-
-            public void compensate(OScope scope, SynchChannel ret) {
-              _psc.compensate(scope, ret);
-              instance(WAITING.this);
-            }
+        public void run() {
 
-            public void completed(FaultData faultData, Set<CompensationHandler> compensations) {
-              _active.remove(ai);
-              _comps.addAll(compensations);
-              if (faultData != null && _fault == null) {
-                _fault = faultData;
-                terminateActive();
-              }
+            if (!_active.isEmpty() || _pickResponseChannel != null) {
+                HashSet<ChannelListener> mlset = new HashSet<ChannelListener>();
 
-              instance(WAITING.this);
-            }
-          });
-        }
+                if (!_terminated) {
+                    mlset.add(new TerminationChannelListener(_tc) {
+                        private static final long serialVersionUID = 7666910462948788042L;
+
+                        public void terminate() {
+                            terminateActive();
+                            _terminated = true;
+                            if (_pickResponseChannel != null)
+                                getBpelRuntimeContext().cancel(_pickResponseChannel);
+                            instance(WAITING.this);
+                        }
+                    });
 
-        if (_pickResponseChannel != null)
-          mlset.add(new PickResponseChannelListener(_pickResponseChannel) {
-            private static final long serialVersionUID = -4929999153478677288L;
-
-
-            public void onRequestRcvd(int selectorIdx, String mexId) {
-              Element msgEl = getBpelRuntimeContext().getMyRequest(mexId);
-              try {
-                getBpelRuntimeContext().initializeVariable(_scopeFrame.resolve(_oevent.variable),msgEl);
-              } catch (Exception ex) {
-                __log.error(ex);
-                throw new InvalidProcessException(ex);
-              }
-
-              try {
-                for (OScope.CorrelationSet cset : _oevent.initCorrelations) {
-                  initializeCorrelation(_scopeFrame.resolve(cset), _scopeFrame.resolve(_oevent.variable));
-                }
-                
-                if (_oevent.partnerLink.hasPartnerRole()) {
-                  // Trying to initialize partner epr based on a message-provided epr/session.
-                  if (!getBpelRuntimeContext().isPartnerRoleEndpointInitialized(_scopeFrame
-                          .resolve(_oevent.partnerLink))) {
-                      Node fromEpr = getBpelRuntimeContext().getSourceEPR(mexId);
-                      if (fromEpr != null) {
-                        getBpelRuntimeContext().writeEndpointReference(
-                                _scopeFrame.resolve(_oevent.partnerLink), (Element) fromEpr);
-                      }
-                  }
-                  
-                  String partnersSessionId = getBpelRuntimeContext().getSourceSessionId(mexId);
-                  if (partnersSessionId != null)
-                      getBpelRuntimeContext().initializePartnersSessionId(_scopeFrame.resolve(_oevent.partnerLink),
-                              partnersSessionId);
-                }
-                
-               
-                
-                
-              } catch (FaultException e) {
-                if (_fault == null) {
-                  _fault = createFault(e.getQName(), _oevent);
-                  terminateActive();
                 }
-                instance(new WAITING(null));
-                return;
-              }
 
-              // activate 'onMessage' activity; we'll do this even if a stop/terminate has been
-              // requested becasue we cannot undo the receipt of the message at this point.
-              ActivityInfo child = new ActivityInfo(genMonotonic(),
-                      _oevent.activity,
-                      newChannel(TerminationChannel.class), newChannel(ParentScopeChannel.class));
-              _active.add(child);
-              // If we previously terminated the other activiites, then we do the same
-              // here; this is easier then undoing the receive.
-              if (_childrenTerminated)
-                replication(child.self).terminate();
-              instance(createChild(child,_scopeFrame, new LinkFrame(null)));
-
-              if (_terminated || _stopped || _fault != null)
-                instance(new WAITING(null));
-              else
-                instance(new SELECT());
-            }
+                if (!_stopped) {
+                    mlset.add(new EventHandlerControlChannelListener(_ehc) {
+                        private static final long serialVersionUID = -1050788954724647970L;
+
+                        public void stop() {
+                            _stopped = true;
+                            if (_pickResponseChannel != null)
+                                getBpelRuntimeContext().cancel(_pickResponseChannel);
+                            instance(WAITING.this);
+                        }
+                    });
 
+                }
 
-            public void onTimeout() {
-              instance(new WAITING(null));
-            }
+                for (final ActivityInfo ai : _active) {
+                    mlset.add(new ParentScopeChannelListener(ai.parent) {
+                        private static final long serialVersionUID = 5341207762415360982L;
+
+                        public void compensate(OScope scope, SynchChannel ret) {
+                            _psc.compensate(scope, ret);
+                            instance(WAITING.this);
+                        }
+
+                        public void completed(FaultData faultData, Set<CompensationHandler> compensations) {
+                            _active.remove(ai);
+                            _comps.addAll(compensations);
+                            if (faultData != null && _fault == null) {
+                                _fault = faultData;
+                                terminateActive();
+                            }
+
+                            instance(WAITING.this);
+                        }
+                    });
+                }
 
-            public void onCancel() {
-              instance(new WAITING(null));
+                if (_pickResponseChannel != null)
+                    mlset.add(new PickResponseChannelListener(_pickResponseChannel) {
+                        private static final long serialVersionUID = -4929999153478677288L;
+
+
+                        public void onRequestRcvd(int selectorIdx, String mexId) {
+                            Element msgEl = getBpelRuntimeContext().getMyRequest(mexId);
+                            try {
+                                getBpelRuntimeContext().initializeVariable(_scopeFrame.resolve(_oevent.variable),msgEl);
+                            } catch (Exception ex) {
+                                __log.error(ex);
+                                throw new InvalidProcessException(ex);
+                            }
+                            // Initialize correlation sets and, when available, the partner's EPR and session id.
+                            try {
+                                for (OScope.CorrelationSet cset : _oevent.initCorrelations) {
+                                    initializeCorrelation(_scopeFrame.resolve(cset), _scopeFrame.resolve(_oevent.variable));
+                                }
+
+                                if (_oevent.partnerLink.hasPartnerRole()) {
+                                    // Trying to initialize partner epr based on a message-provided epr/session.
+                                    if (!getBpelRuntimeContext().isPartnerRoleEndpointInitialized(_scopeFrame
+                                            .resolve(_oevent.partnerLink))) {
+                                        Node fromEpr = getBpelRuntimeContext().getSourceEPR(mexId);
+                                        if (fromEpr != null) {
+                                            getBpelRuntimeContext().writeEndpointReference(
+                                                    _scopeFrame.resolve(_oevent.partnerLink), (Element) fromEpr);
+                                        }
+                                    }
+
+                                    String partnersSessionId = getBpelRuntimeContext().getSourceSessionId(mexId);
+                                    if (partnersSessionId != null)
+                                        getBpelRuntimeContext().initializePartnersSessionId(_scopeFrame.resolve(_oevent.partnerLink),
+                                                partnersSessionId);
+                                }
+
+
+
+
+                            } catch (FaultException e) {
+                                __log.error(e);
+                                if (_fault == null) {
+                                    _fault = createFault(e.getQName(), _oevent);
+                                    terminateActive();
+                                }
+                                instance(new WAITING(null));
+                                return;
+                            }
+
+                            // Activate the 'onMessage' activity; we do this even if a stop/terminate has been
+                            // requested because we cannot undo the receipt of the message at this point.
+                            ActivityInfo child = new ActivityInfo(genMonotonic(),
+                                    _oevent.activity,
+                                    newChannel(TerminationChannel.class), newChannel(ParentScopeChannel.class));
+                            _active.add(child);
+                            // If we previously terminated the other activities, then we do the same
+                            // here; this is easier than undoing the receive.
+                            if (_childrenTerminated)
+                                replication(child.self).terminate();
+                            instance(createChild(child,_scopeFrame, new LinkFrame(null)));
+
+                            if (_terminated || _stopped || _fault != null)
+                                instance(new WAITING(null));
+                            else
+                                instance(new SELECT());
+                        }
+
+
+                        public void onTimeout() {
+                            instance(new WAITING(null));
+                        }
+
+                        public void onCancel() {
+                            instance(new WAITING(null));
+                        }
+                    });
+
+                object(false, mlset);
+            } else /* Nothing more to do. */ {
+                _psc.completed(_fault, _comps);
             }
-          });
+        }
 
-        object(false, mlset);
-      } else /* Nothing more to do. */ {
-        _psc.completed(_fault, _comps);
-      }
     }
-
-  }
 }

Modified: incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/FOREACH.java
URL: http://svn.apache.org/viewvc/incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/FOREACH.java?view=diff&rev=453057&r1=453056&r2=453057
==============================================================================
--- incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/FOREACH.java (original)
+++ incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/FOREACH.java Wed Oct  4 15:56:58 2006
@@ -46,180 +46,181 @@
 
 public class FOREACH extends ACTIVITY {
 
-  private static final long serialVersionUID = 1L;
-  private static final Log __log = LogFactory.getLog(FOREACH.class);
+    private static final long serialVersionUID = 1L;
+    private static final Log __log = LogFactory.getLog(FOREACH.class);
 
-  private OForEach _oforEach;
-  private Set<ChildInfo> _children = new HashSet<ChildInfo>();
-  private Set<CompensationHandler> _compHandlers = new HashSet<CompensationHandler>();
-  private int _startCounter = -1;
-  private int _finalCounter = -1;
-  private int _currentCounter = -1;
-  private int _completedCounter = 0;
-  private int _completionCounter = -1;
-
-  public FOREACH(ActivityInfo self, ScopeFrame frame, LinkFrame linkFrame) {
-    super(self,frame, linkFrame);
-    _oforEach = (OForEach) self.o;
-  }
-
-  public void run() {
-    try {
-      _startCounter = evaluateCondition(_oforEach.startCounterValue);
-      _finalCounter = evaluateCondition(_oforEach.finalCounterValue);
-      if (_oforEach.completionCondition != null) {
-        _completionCounter = evaluateCondition(_oforEach.completionCondition.branchCount);
-      }
-      _currentCounter = _startCounter;
-    } catch (FaultException fe) {
-      _self.parent.completed(createFault(fe.getQName(), _self.o), _compHandlers);
-      return;
-    }
-
-    // Checking for bpws:invalidBranchCondition when the counter limit is superior
-    // to the maximum number of children
-    if (_completionCounter > 0 && _completionCounter > _finalCounter - _startCounter) {
-      _self.parent.completed(
-              createFault(_oforEach.getOwner().constants.qnInvalidBranchCondition, _self.o), _compHandlers);
-      return;
-    }
-
-    // There's really nothing to do
-    if (_finalCounter < _startCounter || _completionCounter == 0) {
-      _self.parent.completed(null, _compHandlers);
-    } else {
-      // If we're parrallel, starting all our child copies, otherwise one will suffice.
-      if (_oforEach.parallel) {
-        for (int m = _startCounter; m <= _finalCounter; m++) {
-          newChild();
-        }
-      } else newChild();
-      instance(new ACTIVE());
+    private OForEach _oforEach;
+    private Set<ChildInfo> _children = new HashSet<ChildInfo>();
+    private Set<CompensationHandler> _compHandlers = new HashSet<CompensationHandler>();
+    private int _startCounter = -1;
+    private int _finalCounter = -1;
+    private int _currentCounter = -1;
+    private int _completedCounter = 0;
+    private int _completionCounter = -1;
+
+    public FOREACH(ActivityInfo self, ScopeFrame frame, LinkFrame linkFrame) {
+        super(self,frame, linkFrame);
+        _oforEach = (OForEach) self.o;
     }
-  }
-
-  private class ACTIVE extends BpelJacobRunnable {
-    private static final long serialVersionUID = -5642862698981385732L;
-
-    private FaultData _fault;
-    private boolean _terminateRequested = false;
 
     public void run() {
-      Iterator<ChildInfo> active = active();
-      // Continuing as long as a child is active
-      if (active().hasNext()) {
-
-        Set<ChannelListener> mlSet = new HashSet<ChannelListener>();
-        mlSet.add(new TerminationChannelListener(_self.self) {
-          private static final long serialVersionUID = 2554750257484084466L;
-
-          public void terminate() {
-            // Terminating all children before sepuku
-            for (Iterator<ChildInfo> i = active(); i.hasNext(); )
-              replication(i.next().activity.self).terminate();
-            _terminateRequested = true;
-            instance(ACTIVE.this);
-          }
-        });
-        for (;active.hasNext();) {
-          // Checking out our children
-          final ChildInfo child = active.next();
-          mlSet.add(new ParentScopeChannelListener(child.activity.parent) {
-            private static final long serialVersionUID = -8027205709961438172L;
-
-            public void compensate(OScope scope, SynchChannel ret) {
-              // Forward compensation to parent
-              _self.parent.compensate(scope, ret);
-              instance(ACTIVE.this);
+        try {
+            _startCounter = evaluateCondition(_oforEach.startCounterValue);
+            _finalCounter = evaluateCondition(_oforEach.finalCounterValue);
+            if (_oforEach.completionCondition != null) {
+                _completionCounter = evaluateCondition(_oforEach.completionCondition.branchCount);
             }
+            _currentCounter = _startCounter;
+        } catch (FaultException fe) {
+            __log.error(fe);
+            _self.parent.completed(createFault(fe.getQName(), _self.o), _compHandlers);
+            return;
+        }
+
+        // Checking for bpws:invalidBranchCondition: the counter range is inclusive, so the
+        // maximum number of children is _finalCounter - _startCounter + 1
+        if (_completionCounter > 0 && _completionCounter > _finalCounter - _startCounter + 1) {
+            _self.parent.completed(
+                    createFault(_oforEach.getOwner().constants.qnInvalidBranchCondition, _self.o), _compHandlers);
+            return;
+        }
+
+        // There's really nothing to do
+        if (_finalCounter < _startCounter || _completionCounter == 0) {
+            _self.parent.completed(null, _compHandlers);
+        } else {
+            // If we're parallel, starting all our child copies, otherwise one will suffice.
+            if (_oforEach.parallel) {
+                for (int m = _startCounter; m <= _finalCounter; m++) {
+                    newChild();
+                }
+            } else newChild();
+            instance(new ACTIVE());
+        }
+    }
+
+    private class ACTIVE extends BpelJacobRunnable {
+        private static final long serialVersionUID = -5642862698981385732L;
+
+        private FaultData _fault;
+        private boolean _terminateRequested = false;
 
-            public void completed(FaultData faultData, Set<CompensationHandler> compensations) {
-              child.completed = true;
-              //
-              if (_completionCounter > 0 && _oforEach.completionCondition.successfulBranchesOnly) {
-                 if (faultData != null) _completedCounter++;
-              } else _completedCounter++;
-
-              _compHandlers.addAll(compensations);
-
-              // Keeping the fault to let everybody know
-              if (faultData != null && _fault == null) {
-                _fault = faultData;
-              }
-              if (shouldContinue() && _fault == null && !_terminateRequested) {
-                // Everything fine. If parrallel, just let our children be, otherwise making a new child
-                if (!_oforEach.parallel) newChild();
-              } else {
-                // Work is done or something wrong happened, children shouldn't continue
-                for (Iterator<ChildInfo> i = active(); i.hasNext(); )
-                  replication(i.next().activity.self).terminate();
-              }
-              instance(ACTIVE.this);
+        public void run() {
+            Iterator<ChildInfo> active = active();
+            // Continuing as long as a child is active
+            if (active().hasNext()) {
+
+                Set<ChannelListener> mlSet = new HashSet<ChannelListener>();
+                mlSet.add(new TerminationChannelListener(_self.self) {
+                    private static final long serialVersionUID = 2554750257484084466L;
+
+                    public void terminate() {
+                        // Forward the termination to every still-active child, then record the request
+                        for (Iterator<ChildInfo> i = active(); i.hasNext(); )
+                            replication(i.next().activity.self).terminate();
+                        _terminateRequested = true;
+                        instance(ACTIVE.this);
+                    }
+                });
+                for (;active.hasNext();) {
+                    // Checking out our children
+                    final ChildInfo child = active.next();
+                    mlSet.add(new ParentScopeChannelListener(child.activity.parent) {
+                        private static final long serialVersionUID = -8027205709961438172L;
+
+                        public void compensate(OScope scope, SynchChannel ret) {
+                            // Forward compensation to parent
+                            _self.parent.compensate(scope, ret);
+                            instance(ACTIVE.this);
+                        }
+
+                        public void completed(FaultData faultData, Set<CompensationHandler> compensations) {
+                            child.completed = true;
+                            // With successfulBranchesOnly, only branches that ended without a fault count
+                            if (_completionCounter > 0 && _oforEach.completionCondition.successfulBranchesOnly) {
+                                if (faultData == null) _completedCounter++;
+                            } else _completedCounter++;
+
+                            _compHandlers.addAll(compensations);
+
+                            // Keeping the fault to let everybody know
+                            if (faultData != null && _fault == null) {
+                                _fault = faultData;
+                            }
+                            if (shouldContinue() && _fault == null && !_terminateRequested) {
+                                // Everything fine. If parallel, just let our children be, otherwise making a new child
+                                if (!_oforEach.parallel) newChild();
+                            } else {
+                                // Work is done or something wrong happened, children shouldn't continue
+                                for (Iterator<ChildInfo> i = active(); i.hasNext(); )
+                                    replication(i.next().activity.self).terminate();
+                            }
+                            instance(ACTIVE.this);
+                        }
+                    });
+                }
+                object(false,mlSet);
+            } else {
+                // No children left, either because they've all been executed or because we
+                // had to make them stop.
+                _self.parent.completed(_fault, _compHandlers);
             }
-          });
         }
-        object(false,mlSet);
-      } else {
-        // No children left, either because they've all been executed or because we
-        // had to make them stop.
-        _self.parent.completed(_fault, _compHandlers);
-      }
-    }
-  }
-
-  private boolean shouldContinue() {
-    boolean stop = false;
-    if (_completionCounter > 0) {
-      stop = (_completedCounter >= _completionCounter) || stop;
-    }
-    stop = (_startCounter + _completedCounter > _finalCounter) || stop;
-    return !stop;
-  }
-
-  private int evaluateCondition(OExpression condition)
-          throws FaultException {
-    try {
-      return getBpelRuntimeContext().getExpLangRuntime().
-              evaluateAsNumber(condition, getEvaluationContext()).intValue();
-    } catch (EvaluationException e) {
-      String msg;
-      if (condition instanceof OXPath10Expression)
-        msg = "ForEach counter value " + ((OXPath10Expression)condition).xpath +
-                " couldn't be evaluated as xs:unsignedInt.";
-      else msg = "ForEach counter value couldn't be evaluated as xs:unsignedInt.";
-      __log.error(msg, e);
-      throw new FaultException(_oforEach.getOwner().constants.qnForEachCounterError,msg);
-    }
-  }
-
-  private void newChild() {
-    ChildInfo child = new ChildInfo(new ActivityInfo(genMonotonic(), _oforEach.innerScope,
-            newChannel(TerminationChannel.class), newChannel(ParentScopeChannel.class)));
-    _children.add(child);
-
-    // Creating the current counter value node
-    Document doc = DOMUtils.newDocument();
-    Node counterNode = doc.createTextNode(""+_currentCounter++);
-
-    // Instantiating the scope directly to keep control of its scope frame, allows
-    // the introduction of the counter variable in there (monkey business that is).
-    ScopeFrame newFrame = new ScopeFrame(
-            _oforEach.innerScope, getBpelRuntimeContext().createScopeInstance(_scopeFrame.scopeInstanceId,
-            _oforEach.innerScope), _scopeFrame, null);
-    getBpelRuntimeContext().initializeVariable(newFrame.resolve(_oforEach.counterVariable), counterNode);
-    instance(new SCOPE(child.activity, newFrame, _linkFrame));
-  }
-
-  public String toString() {
-    return "<T:Act:Flow:" + _oforEach.name + ">";
-  }
-
-  private Iterator<ChildInfo> active() {
-    return new FilterIterator<ChildInfo>(_children.iterator(), new MemberOfFunction<ChildInfo>() {
-      public boolean isMember(ChildInfo childInfo) {
-        return !childInfo.completed;
-      }
-    });
-  }
+    }
+
+    private boolean shouldContinue() {
+        boolean stop = false;
+        if (_completionCounter > 0) {
+            stop = (_completedCounter >= _completionCounter) || stop;
+        }
+        stop = (_startCounter + _completedCounter > _finalCounter) || stop;
+        return !stop;
+    }
+
+    private int evaluateCondition(OExpression condition)
+            throws FaultException {
+        try {
+            return getBpelRuntimeContext().getExpLangRuntime().
+                    evaluateAsNumber(condition, getEvaluationContext()).intValue();
+        } catch (EvaluationException e) {
+            String msg;
+            if (condition instanceof OXPath10Expression)
+                msg = "ForEach counter value " + ((OXPath10Expression)condition).xpath +
+                        " couldn't be evaluated as xs:unsignedInt.";
+            else msg = "ForEach counter value couldn't be evaluated as xs:unsignedInt.";
+            __log.error(msg, e);
+            throw new FaultException(_oforEach.getOwner().constants.qnForEachCounterError,msg);
+        }
+    }
+
+    private void newChild() {
+        ChildInfo child = new ChildInfo(new ActivityInfo(genMonotonic(), _oforEach.innerScope,
+                newChannel(TerminationChannel.class), newChannel(ParentScopeChannel.class)));
+        _children.add(child);
+
+        // Creating the current counter value node
+        Document doc = DOMUtils.newDocument();
+        Node counterNode = doc.createTextNode(""+_currentCounter++);
+
+        // Instantiating the scope directly to keep control of its scope frame, allows
+        // the introduction of the counter variable in there (monkey business that is).
+        ScopeFrame newFrame = new ScopeFrame(
+                _oforEach.innerScope, getBpelRuntimeContext().createScopeInstance(_scopeFrame.scopeInstanceId,
+                _oforEach.innerScope), _scopeFrame, null);
+        getBpelRuntimeContext().initializeVariable(newFrame.resolve(_oforEach.counterVariable), counterNode);
+        instance(new SCOPE(child.activity, newFrame, _linkFrame));
+    }
+
+    public String toString() {
+        return "<T:Act:Flow:" + _oforEach.name + ">";
+    }
+
+    private Iterator<ChildInfo> active() {
+        return new FilterIterator<ChildInfo>(_children.iterator(), new MemberOfFunction<ChildInfo>() {
+            public boolean isMember(ChildInfo childInfo) {
+                return !childInfo.completed;
+            }
+        });
+    }
 
 }

Modified: incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/INVOKE.java
URL: http://svn.apache.org/viewvc/incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/INVOKE.java?view=diff&rev=453057&r1=453056&r2=453057
==============================================================================
--- incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/INVOKE.java (original)
+++ incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/INVOKE.java Wed Oct  4 15:56:58 2006
@@ -37,6 +37,8 @@
 import org.apache.ode.bpel.runtime.channels.TerminationChannelListener;
 import org.apache.ode.bpel.runtime.channels.TimerResponseChannel;
 import org.apache.ode.bpel.runtime.channels.TimerResponseChannelListener;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.w3c.dom.Element;
 import org.w3c.dom.Node;
 
@@ -44,222 +46,227 @@
  * JacobRunnable that performs the work of the <code>invoke</code> activity.
  */
 public class INVOKE extends ACTIVITY {
-  private static final long serialVersionUID = 992248281026821783L;
+    private static final long serialVersionUID = 992248281026821783L;
 
-  private OInvoke _oinvoke;
-  // Records number of invocations on the activity.
-  private int     _invoked;
-  // Date/time of last failure.
-  private Date    _lastFailure;
-  // Reason for last failure.
-  private String  _failureReason;
-  // Data associated with failure.
-  private Element _failureData;
-
-
-  public INVOKE(ActivityInfo self, ScopeFrame scopeFrame, LinkFrame linkFrame) {
-    super(self, scopeFrame, linkFrame);
-    _oinvoke = (OInvoke) _self.o;
-    _invoked = 0;
-  }
-
-  public final void run() {
-    Element outboundMsg;
-    try {
-      outboundMsg = setupOutbound(_oinvoke, _oinvoke.initCorrelationsInput);
-    } catch (FaultException e) {
-      FaultData fault = createFault(e.getQName(), _oinvoke);
-      _self.parent.completed(fault, CompensationHandler.emptySet());
-      return;
+    private static final Log __log = LogFactory.getLog(INVOKE.class);
+
+    private OInvoke _oinvoke;
+    // Records number of invocations on the activity.
+    private int     _invoked;
+    // Date/time of last failure.
+    private Date    _lastFailure;
+    // Reason for last failure.
+    private String  _failureReason;
+    // Data associated with failure.
+    private Element _failureData;
+
+
+    public INVOKE(ActivityInfo self, ScopeFrame scopeFrame, LinkFrame linkFrame) {
+        super(self, scopeFrame, linkFrame);
+        _oinvoke = (OInvoke) _self.o;
+        _invoked = 0;
     }
-    ++_invoked;
 
-    // if there is no output variable, then this is a one-way invoke
-    boolean isTwoWay = _oinvoke.outputVar != null;
+    public final void run() {
+        Element outboundMsg;
+        try {
+            outboundMsg = setupOutbound(_oinvoke, _oinvoke.initCorrelationsInput);
+        } catch (FaultException e) {
+            __log.error(e);
+            FaultData fault = createFault(e.getQName(), _oinvoke);
+            _self.parent.completed(fault, CompensationHandler.emptySet());
+            return;
+        }
+        ++_invoked;
+
+        // if there is no output variable, then this is a one-way invoke
+        boolean isTwoWay = _oinvoke.outputVar != null;
+
+        try {
+            if (!isTwoWay) {
+                FaultData faultData = null;
+                getBpelRuntimeContext().invoke(
+                        _scopeFrame.resolve(_oinvoke.partnerLink),
+                        _oinvoke.operation,
+                        outboundMsg,
+                        null);
+
+                _self.parent.completed(faultData, CompensationHandler.emptySet());
+
+            } else /* two-way */{
+                final VariableInstance outputVar = _scopeFrame
+                        .resolve(_oinvoke.outputVar);
+                InvokeResponseChannel invokeResponseChannel = newChannel(InvokeResponseChannel.class);
+
+                final String mexId = getBpelRuntimeContext().invoke(
+                        _scopeFrame.resolve(_oinvoke.partnerLink), _oinvoke.operation,
+                        outboundMsg,
+                        invokeResponseChannel);
+
+                object(new InvokeResponseChannelListener(invokeResponseChannel) {
+                    private static final long serialVersionUID = 4496880438819196765L;
+
+                    public void onResponse() {
+                        // we don't have to write variable data -> this already
+                        // happened in the nativeAPI impl
+                        FaultData fault = null;
+
+                        Element response;
+                        try {
+                            response = getBpelRuntimeContext().getPartnerResponse(mexId);
+                        } catch (Exception ex) {
+                            // TODO: Better error handling
+                            throw new RuntimeException(ex);
+                        }
+
+                        getBpelRuntimeContext().initializeVariable(outputVar, response);
+
+                        try {
+                            for (OScope.CorrelationSet anInitCorrelationsOutput : _oinvoke.initCorrelationsOutput) {
+                                initializeCorrelation(_scopeFrame.resolve(anInitCorrelationsOutput), outputVar);
+                            }
+                            if (_oinvoke.partnerLink.hasPartnerRole()) {
+                                // Trying to initialize partner epr based on a message-provided epr/session.
+                                if (!getBpelRuntimeContext().isPartnerRoleEndpointInitialized(_scopeFrame
+                                        .resolve(_oinvoke.partnerLink))) {
+
+                                    Node fromEpr = getBpelRuntimeContext().getSourceEPR(mexId);
+                                    if (fromEpr != null) {
+                                        getBpelRuntimeContext().writeEndpointReference(
+                                                _scopeFrame.resolve(_oinvoke.partnerLink), (Element) fromEpr);
+                                    }
+                                }
+
+                                String partnersSessionId = getBpelRuntimeContext().getSourceSessionId(mexId);
+                                if (partnersSessionId != null)
+                                    getBpelRuntimeContext().initializePartnersSessionId(_scopeFrame.resolve(_oinvoke.partnerLink),
+                                            partnersSessionId);
+
+                            }
+                        } catch (FaultException e) {
+                            __log.error(e);
+                            fault = createFault(e.getQName(), _oinvoke);
+                        }
+
+                        // TODO update output variable with data from non-initiate
+                        // correlation sets
+                        _self.parent.completed(fault, CompensationHandler.emptySet());
+                    }
+
+                    public void onFault() {
+                        QName faultName = getBpelRuntimeContext().getPartnerFault(mexId);
+                        Element msg = getBpelRuntimeContext().getPartnerResponse(mexId);
+                        QName msgType = getBpelRuntimeContext().getPartnerResponseType(
+                                mexId);
+                        FaultData fault = createFault(faultName, msg,
+                                _oinvoke.getOwner().messageTypes.get(msgType), _self.o);
+                        _self.parent.completed(fault, CompensationHandler.emptySet());
+                    }
 
-    try {
-      if (!isTwoWay) {
-        FaultData faultData = null;
-        getBpelRuntimeContext().invoke(
-            _scopeFrame.resolve(_oinvoke.partnerLink),
-            _oinvoke.operation,
-            outboundMsg,
-            null);
-
-        _self.parent.completed(faultData, CompensationHandler.emptySet());
-
-      } else /* two-way */{
-        final VariableInstance outputVar = _scopeFrame
-            .resolve(_oinvoke.outputVar);
-        InvokeResponseChannel invokeResponseChannel = newChannel(InvokeResponseChannel.class);
-
-        final String mexId = getBpelRuntimeContext().invoke(
-            _scopeFrame.resolve(_oinvoke.partnerLink), _oinvoke.operation,
-            outboundMsg,
-            invokeResponseChannel);
-
-        object(new InvokeResponseChannelListener(invokeResponseChannel) {
-          private static final long serialVersionUID = 4496880438819196765L;
-
-          public void onResponse() {
-            // we don't have to write variable data -> this already
-            // happened in the nativeAPI impl
-            FaultData fault = null;
-
-            Element response;
-            try {
-              response = getBpelRuntimeContext().getPartnerResponse(mexId);
-            } catch (Exception ex) {
-              // TODO: Better error handling
-              throw new RuntimeException(ex);
-            }
-           
-            getBpelRuntimeContext().initializeVariable(outputVar, response);
-
-            try {
-              for (OScope.CorrelationSet anInitCorrelationsOutput : _oinvoke.initCorrelationsOutput) {
-                initializeCorrelation(_scopeFrame.resolve(anInitCorrelationsOutput), outputVar);
-              }
-              if (_oinvoke.partnerLink.hasPartnerRole()) {
-                // Trying to initialize partner epr based on a message-provided epr/session.
-                if (!getBpelRuntimeContext().isPartnerRoleEndpointInitialized(_scopeFrame
-                        .resolve(_oinvoke.partnerLink))) {
-    
-                    Node fromEpr = getBpelRuntimeContext().getSourceEPR(mexId);
-                    if (fromEpr != null) {
-                      getBpelRuntimeContext().writeEndpointReference(
-                              _scopeFrame.resolve(_oinvoke.partnerLink), (Element) fromEpr);
+                    public void onFailure() {
+                        // This indicates a communication failure. We don't throw a fault,
+                        // because there is no fault, instead we'll re-incarnate the invoke
+                        // and either retry or indicate failure condition.
+                        // admin to resume the process.
+                        INVOKE.this.retryOrFailure(getBpelRuntimeContext().getPartnerFaultExplanation(mexId), null);
                     }
-                }                    
-                
-                String partnersSessionId = getBpelRuntimeContext().getSourceSessionId(mexId);
-                if (partnersSessionId != null)
-                    getBpelRuntimeContext().initializePartnersSessionId(_scopeFrame.resolve(_oinvoke.partnerLink),
-                            partnersSessionId);
-                
-              }
-            } catch (FaultException e) {
-              fault = createFault(e.getQName(), _oinvoke);
+                });
             }
+        } catch (FaultException fault) {
+            __log.error(fault);
+            FaultData faultData = createFault(fault.getQName(), _oinvoke, fault
+                    .getMessage());
+            _self.parent.completed(faultData, CompensationHandler.emptySet());
+        }
+    }
 
-            // TODO update output variable with data from non-initiate
-            // correlation sets
-            _self.parent.completed(fault, CompensationHandler.emptySet());
-          }
+    private Element setupOutbound(OInvoke oinvoke,
+                                  Collection<OScope.CorrelationSet> outboundInitiations)
+            throws FaultException {
+        if (outboundInitiations.size() > 0) {
+            for (OScope.CorrelationSet c : outboundInitiations) {
+                initializeCorrelation(_scopeFrame.resolve(c), _scopeFrame.resolve(oinvoke.inputVar));
+            }
+        }
 
-          public void onFault() {
-            QName faultName = getBpelRuntimeContext().getPartnerFault(mexId);
-            Element msg = getBpelRuntimeContext().getPartnerResponse(mexId);
-            QName msgType = getBpelRuntimeContext().getPartnerResponseType(
-                mexId);
-            FaultData fault = createFault(faultName, msg,
-                _oinvoke.getOwner().messageTypes.get(msgType), _self.o);
-            _self.parent.completed(fault, CompensationHandler.emptySet());
-          }
+        Node outboundMsg = getBpelRuntimeContext().fetchVariableData(
+                _scopeFrame.resolve(oinvoke.inputVar), false);
 
-          public void onFailure() {
-            // This indicates a communication failure. We don't throw a fault,
-            // because there is no fault, instead we'll re-incarnate the invoke
-            // and either retry or indicate failure condition.
-            // admin to resume the process.
-            INVOKE.this.retryOrFailure(getBpelRuntimeContext().getPartnerFaultExplanation(mexId), null);
-          }
-        });
-      }
-    } catch (FaultException fault) {
-      FaultData faultData = createFault(fault.getQName(), _oinvoke, fault
-          .getMessage());
-      _self.parent.completed(faultData, CompensationHandler.emptySet());
-    }
-  }
+        // TODO outbound message should be updated with non-initiate correlation
+        // sets
+        assert outboundMsg instanceof Element;
 
-  private Element setupOutbound(OInvoke oinvoke,
-                                Collection<OScope.CorrelationSet> outboundInitiations)
-      throws FaultException {
-    if (outboundInitiations.size() > 0) {
-      for (OScope.CorrelationSet c : outboundInitiations) {
-        initializeCorrelation(_scopeFrame.resolve(c), _scopeFrame.resolve(oinvoke.inputVar));
-      }
+        return (Element) outboundMsg;
     }
 
-    Node outboundMsg = getBpelRuntimeContext().fetchVariableData(
-        _scopeFrame.resolve(oinvoke.inputVar), false);
-
-    // TODO outbound message should be updated with non-initiate correlation
-    // sets
-    assert outboundMsg instanceof Element;
-
-    return (Element) outboundMsg;
-  }
-
-  private void retryOrFailure(String reason, Element data) {
-    _lastFailure = new Date();
-    _failureReason = reason;
-    _failureData = data;
-
-    if (_self.getFailureHandling().faultOnFailure) {
-      // No attempt to retry or enter activity recovery state, simply fault.
-      FaultData faultData = createFault(FailureHandling.FAILURE_FAULT_NAME, _oinvoke, reason);
-      _self.parent.completed(faultData, CompensationHandler.emptySet());
-      return;
+    private void retryOrFailure(String reason, Element data) {
+        _lastFailure = new Date();
+        _failureReason = reason;
+        _failureData = data;
+
+        if (_self.getFailureHandling().faultOnFailure) {
+            // No attempt to retry or enter activity recovery state, simply fault.
+            FaultData faultData = createFault(FailureHandling.FAILURE_FAULT_NAME, _oinvoke, reason);
+            _self.parent.completed(faultData, CompensationHandler.emptySet());
+            return;
+        }
+        // If maximum number of retries, enter activity recovery state.
+        if (_invoked > _self.getFailureHandling().retryFor) {
+            requireRecovery();
+            return;
+        }
+
+        Date future = new Date(new Date().getTime() + (_self.getFailureHandling().retryDelay * 1000));
+        final TimerResponseChannel timerChannel = newChannel(TimerResponseChannel.class);
+        getBpelRuntimeContext().registerTimer(timerChannel, future);
+        object(false, new TimerResponseChannelListener(timerChannel) {
+            public void onTimeout() {
+                instance(INVOKE.this);
+            }
+            public void onCancel() {
+                INVOKE.this.requireRecovery();
+            }
+        }.or(new TerminationChannelListener(_self.self) {
+            public void terminate() {
+                _self.parent.completed(null, CompensationHandler.emptySet());
+                object(new TimerResponseChannelListener(timerChannel) {
+                    public void onTimeout() { }
+                    public void onCancel() { }
+                });
+            }
+        }));
     }
-    // If maximum number of retries, enter activity recovery state.  
-    if (_invoked > _self.getFailureHandling().retryFor) {
-      requireRecovery();
-      return;
+
+    private void requireRecovery() {
+        sendEvent(new ActivityFailureEvent(_failureReason));
+        final ActivityRecoveryChannel recoveryChannel = newChannel(ActivityRecoveryChannel.class);
+        getBpelRuntimeContext().registerActivityForRecovery(recoveryChannel, _self.aId, _failureReason, _lastFailure, _failureData,
+                new String[] { "retry", "cancel", "fault" }, _invoked - 1);
+        object(false, new ActivityRecoveryChannelListener(recoveryChannel) {
+            public void retry() {
+                sendEvent(new ActivityRecoveryEvent("retry"));
+                getBpelRuntimeContext().unregisterActivityForRecovery(recoveryChannel);
+                instance(INVOKE.this);
+            }
+            public void cancel() {
+                sendEvent(new ActivityRecoveryEvent("cancel"));
+                getBpelRuntimeContext().unregisterActivityForRecovery(recoveryChannel);
+                _self.parent.completed(null, CompensationHandler.emptySet());
+            }
+            public void fault(FaultData faultData) {
+                sendEvent(new ActivityRecoveryEvent("fault"));
+                getBpelRuntimeContext().unregisterActivityForRecovery(recoveryChannel);
+                // TODO: real fault name.
+                if (faultData == null)
+                    faultData = createFault(FailureHandling.FAILURE_FAULT_NAME, _self.o, _failureReason);
+                _self.parent.completed(faultData, CompensationHandler.emptySet());
+            }
+        }.or(new TerminationChannelListener(_self.self) {
+            public void terminate() {
+                getBpelRuntimeContext().unregisterActivityForRecovery(recoveryChannel);
+                _self.parent.completed(null, CompensationHandler.emptySet());
+            }
+        }));
     }
-    
-    Date future = new Date(new Date().getTime() + (_self.getFailureHandling().retryDelay * 1000));
-    final TimerResponseChannel timerChannel = newChannel(TimerResponseChannel.class);
-    getBpelRuntimeContext().registerTimer(timerChannel, future);
-    object(false, new TimerResponseChannelListener(timerChannel) {
-      public void onTimeout() {
-        instance(INVOKE.this);
-      }
-      public void onCancel() {
-        INVOKE.this.requireRecovery();
-      }
-    }.or(new TerminationChannelListener(_self.self) {
-      public void terminate() {
-        _self.parent.completed(null, CompensationHandler.emptySet());
-        object(new TimerResponseChannelListener(timerChannel) {
-          public void onTimeout() { }
-          public void onCancel() { }
-        });
-      }
-    }));
-  }
-
-  private void requireRecovery() {
-    sendEvent(new ActivityFailureEvent(_failureReason));
-    final ActivityRecoveryChannel recoveryChannel = newChannel(ActivityRecoveryChannel.class);
-    getBpelRuntimeContext().registerActivityForRecovery(recoveryChannel, _self.aId, _failureReason, _lastFailure, _failureData,
-      new String[] { "retry", "cancel", "fault" }, _invoked - 1);
-    object(false, new ActivityRecoveryChannelListener(recoveryChannel) {
-      public void retry() {
-        sendEvent(new ActivityRecoveryEvent("retry"));
-        getBpelRuntimeContext().unregisterActivityForRecovery(recoveryChannel);
-        instance(INVOKE.this);
-      }
-      public void cancel() {
-        sendEvent(new ActivityRecoveryEvent("cancel"));
-        getBpelRuntimeContext().unregisterActivityForRecovery(recoveryChannel);
-        _self.parent.completed(null, CompensationHandler.emptySet());
-      }
-      public void fault(FaultData faultData) {
-        sendEvent(new ActivityRecoveryEvent("fault"));
-        getBpelRuntimeContext().unregisterActivityForRecovery(recoveryChannel);
-        // TODO: real fault name.
-        if (faultData == null)
-          faultData = createFault(FailureHandling.FAILURE_FAULT_NAME, _self.o, _failureReason);
-        _self.parent.completed(faultData, CompensationHandler.emptySet());
-      }
-    }.or(new TerminationChannelListener(_self.self) {
-      public void terminate() {
-        getBpelRuntimeContext().unregisterActivityForRecovery(recoveryChannel);
-        _self.parent.completed(null, CompensationHandler.emptySet());
-      }
-    }));
-  }
 
 }