You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ode.apache.org by mr...@apache.org on 2006/10/05 00:56:59 UTC
svn commit: r453057 [1/2] - in
/incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel:
engine/ runtime/
Author: mriou
Date: Wed Oct 4 15:56:58 2006
New Revision: 453057
URL: http://svn.apache.org/viewvc?view=rev&rev=453057
Log:
Logging faults as errors.
Modified:
incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java
incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/ACTIVITYGUARD.java
incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/ASSIGN.java
incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/EH_ALARM.java
incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/EH_EVENT.java
incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/FOREACH.java
incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/INVOKE.java
incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/PICK.java
incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/REPLY.java
incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/THROW.java
incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/WHILE.java
Modified: incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java
URL: http://svn.apache.org/viewvc/incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java?view=diff&rev=453057&r1=453056&r2=453057
==============================================================================
--- incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java (original)
+++ incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java Wed Oct 4 15:56:58 2006
@@ -950,9 +950,7 @@
dao.setProperty(property.name.toString(), val);
}
} catch (FaultException e) {
- // ignore error for now
- // __log.warn("Error attempting to extract property '" +
- // property.toString() + "'", e);
+ __log.error("Error attempting to extract property '" + property.toString() + "'", e);
}
}
}
Modified: incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/ACTIVITYGUARD.java
URL: http://svn.apache.org/viewvc/incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/ACTIVITYGUARD.java?view=diff&rev=453057&r1=453056&r2=453057
==============================================================================
--- incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/ACTIVITYGUARD.java (original)
+++ incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/ACTIVITYGUARD.java Wed Oct 4 15:56:58 2006
@@ -44,171 +44,172 @@
import java.util.Set;
class ACTIVITYGUARD extends ACTIVITY {
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = 1L;
- private static final Log __log = LogFactory.getLog(ACTIVITYGUARD.class);
+ private static final Log __log = LogFactory.getLog(ACTIVITYGUARD.class);
- private static final ActivityTemplateFactory __activityTemplateFactory = new ActivityTemplateFactory();
- private OActivity _oactivity;
+ private static final ActivityTemplateFactory __activityTemplateFactory = new ActivityTemplateFactory();
+ private OActivity _oactivity;
- /** Link values. */
- private Map<OLink, Boolean> _linkVals = new HashMap<OLink, Boolean>();
-
- /** Flag to prevent duplicate ActivityEnabledEvents */
- private boolean _firstTime = true;
-
- public ACTIVITYGUARD(ActivityInfo self, ScopeFrame scopeFrame, LinkFrame linkFrame) {
- super(self, scopeFrame, linkFrame);
- _oactivity = self.o;
- }
-
- public void run() {
- // Send a notification of the activity being enabled,
- if (_firstTime) {
- sendEvent(new ActivityEnabledEvent());
- _firstTime = false;
- }
-
- if (_linkVals.keySet().containsAll(_oactivity.targetLinks)) {
- if (evaluateJoinCondition()) {
-
- ActivityExecStartEvent aese = new ActivityExecStartEvent();
- sendEvent(aese);
- // intercept completion channel in order to execute transition conditions.
- ActivityInfo activity = new ActivityInfo(genMonotonic(),_self.o,_self.self, newChannel(ParentScopeChannel.class));
- instance(createActivity(activity));
- instance(new TCONDINTERCEPT(activity.parent));
- } else {
- // Join Failure.
- _self.parent.completed(createFault(_oactivity.getOwner().constants.qnJoinFailure,_oactivity),
- CompensationHandler.emptySet());
-
- // Dead path activity.
- dpe(_oactivity);
- }
- } else /* don't know all our links statuses */ {
- Set<ChannelListener> mlset = new HashSet<ChannelListener>();
- mlset.add(new TerminationChannelListener(_self.self) {
- private static final long serialVersionUID = 5094153128476008961L;
-
- public void terminate() {
- // Complete immediately, without faulting or registering any comps.
- _self.parent.completed(null, CompensationHandler.emptySet());
-
- // Dead-path activity
- dpe(_oactivity);
- }
- });
- for (Iterator<OLink> i = _oactivity.targetLinks.iterator();i.hasNext();) {
- final OLink link = i.next();
- mlset.add(new LinkStatusChannelListener(_linkFrame.resolve(link).sub) {
- private static final long serialVersionUID = 1024137371118887935L;
-
- public void linkStatus(boolean value) {
- _linkVals.put(link, Boolean.valueOf(value));
- instance(ACTIVITYGUARD.this);
- }
- });
- }
-
- object(false, mlset);
- }
- }
-
-
- private boolean evaluateTransitionCondition(OExpression transitionCondition)
- throws FaultException {
- if (transitionCondition == null)
- return true;
-
- try {
- return getBpelRuntimeContext().getExpLangRuntime().evaluateAsBoolean(transitionCondition,
- new ExprEvaluationContextImpl(_scopeFrame, getBpelRuntimeContext()));
- } catch (EvaluationException e) {
- String msg = "Error in transition condition detected at runtime; condition=" + transitionCondition;
- __log.error(msg,e);
- throw new InvalidProcessException(msg, e);
- }
- }
-
- /**
- * Evaluate an activity's join condition.
- * @return <code>true</code> if join condition evaluates to true.
- */
- private boolean evaluateJoinCondition() {
- // For activities with no link targets, the join condition is always satisfied.
- if (_oactivity.targetLinks.size() == 0)
- return true;
-
- // For activities with no join condition, an OR condition is assumed.
- if (_oactivity.joinCondition == null)
- return _linkVals.values().contains(Boolean.TRUE);
-
- try {
- return getBpelRuntimeContext().getExpLangRuntime().evaluateAsBoolean(_oactivity.joinCondition,
- new ExprEvaluationContextImpl(null,null,_linkVals));
- } catch (Exception e) {
- String msg = "Unexpected error evaluating a join condition: " + _oactivity.joinCondition;
- __log.error(msg,e);
- throw new InvalidProcessException(msg,e);
- }
- }
-
- private static ACTIVITY createActivity(ActivityInfo activity, ScopeFrame scopeFrame, LinkFrame linkFrame) {
- return __activityTemplateFactory.createInstance(activity.o,activity, scopeFrame, linkFrame);
- }
-
- private ACTIVITY createActivity(ActivityInfo activity) {
- return createActivity(activity,_scopeFrame, _linkFrame);
- }
-
-
- /**
- * Intercepts the
- * {@link ParentScopeChannel#completed(org.apache.ode.bpel.runtime.channels.FaultData, java.util.Set<org.apache.ode.bpel.runtime.CompensationHandler>)}
- * call, to evaluate transition conditions before returning to the parent.
- */
- private class TCONDINTERCEPT extends BpelJacobRunnable {
- private static final long serialVersionUID = 4014873396828400441L;
- ParentScopeChannel _in;
+ /** Link values. */
+ private Map<OLink, Boolean> _linkVals = new HashMap<OLink, Boolean>();
- public TCONDINTERCEPT(ParentScopeChannel in) {
- _in = in;
+ /** Flag to prevent duplicate ActivityEnabledEvents */
+ private boolean _firstTime = true;
+
+ public ACTIVITYGUARD(ActivityInfo self, ScopeFrame scopeFrame, LinkFrame linkFrame) {
+ super(self, scopeFrame, linkFrame);
+ _oactivity = self.o;
}
public void run() {
- object(new ParentScopeChannelListener(_in) {
- private static final long serialVersionUID = 2667359535900385952L;
+ // Send a notification of the activity being enabled,
+ if (_firstTime) {
+ sendEvent(new ActivityEnabledEvent());
+ _firstTime = false;
+ }
+
+ if (_linkVals.keySet().containsAll(_oactivity.targetLinks)) {
+ if (evaluateJoinCondition()) {
- public void compensate(OScope scope, SynchChannel ret) {
- _self.parent.compensate(scope,ret);
- instance(TCONDINTERCEPT.this);
- }
-
- public void completed(FaultData faultData, Set<CompensationHandler> compensations) {
- sendEvent(new ActivityExecEndEvent());
- if (faultData != null) {
- dpe(_oactivity.sourceLinks);
- _self.parent.completed(faultData, compensations);
- } else {
- FaultData fault = null;
- for (Iterator<OLink> i = _oactivity.sourceLinks.iterator();i.hasNext();) {
- OLink olink = i.next();
- LinkInfo linfo = _linkFrame.resolve(olink);
- try {
- boolean val = evaluateTransitionCondition(olink.transitionCondition);
- linfo.pub.linkStatus(val);
- } catch (FaultException e) {
- linfo.pub.linkStatus(false);
- if (fault == null)
- fault = createFault(e.getQName(),olink.transitionCondition);
- }
+ ActivityExecStartEvent aese = new ActivityExecStartEvent();
+ sendEvent(aese);
+ // intercept completion channel in order to execute transition conditions.
+ ActivityInfo activity = new ActivityInfo(genMonotonic(),_self.o,_self.self, newChannel(ParentScopeChannel.class));
+ instance(createActivity(activity));
+ instance(new TCONDINTERCEPT(activity.parent));
+ } else {
+ // Join Failure.
+ _self.parent.completed(createFault(_oactivity.getOwner().constants.qnJoinFailure,_oactivity),
+ CompensationHandler.emptySet());
+
+ // Dead path activity.
+ dpe(_oactivity);
+ }
+ } else /* don't know all our links statuses */ {
+ Set<ChannelListener> mlset = new HashSet<ChannelListener>();
+ mlset.add(new TerminationChannelListener(_self.self) {
+ private static final long serialVersionUID = 5094153128476008961L;
+
+ public void terminate() {
+ // Complete immediately, without faulting or registering any comps.
+ _self.parent.completed(null, CompensationHandler.emptySet());
+
+ // Dead-path activity
+ dpe(_oactivity);
+ }
+ });
+ for (Iterator<OLink> i = _oactivity.targetLinks.iterator();i.hasNext();) {
+ final OLink link = i.next();
+ mlset.add(new LinkStatusChannelListener(_linkFrame.resolve(link).sub) {
+ private static final long serialVersionUID = 1024137371118887935L;
+
+ public void linkStatus(boolean value) {
+ _linkVals.put(link, Boolean.valueOf(value));
+ instance(ACTIVITYGUARD.this);
+ }
+ });
}
- _self.parent.completed(fault, compensations);
- }
+
+ object(false, mlset);
+ }
+ }
+
+
+ private boolean evaluateTransitionCondition(OExpression transitionCondition)
+ throws FaultException {
+ if (transitionCondition == null)
+ return true;
+
+ try {
+ return getBpelRuntimeContext().getExpLangRuntime().evaluateAsBoolean(transitionCondition,
+ new ExprEvaluationContextImpl(_scopeFrame, getBpelRuntimeContext()));
+ } catch (EvaluationException e) {
+ String msg = "Error in transition condition detected at runtime; condition=" + transitionCondition;
+ __log.error(msg,e);
+ throw new InvalidProcessException(msg, e);
+ }
+ }
+
+ /**
+ * Evaluate an activity's join condition.
+ * @return <code>true</code> if join condition evaluates to true.
+ */
+ private boolean evaluateJoinCondition() {
+ // For activities with no link targets, the join condition is always satisfied.
+ if (_oactivity.targetLinks.size() == 0)
+ return true;
+
+ // For activities with no join condition, an OR condition is assumed.
+ if (_oactivity.joinCondition == null)
+ return _linkVals.values().contains(Boolean.TRUE);
+
+ try {
+ return getBpelRuntimeContext().getExpLangRuntime().evaluateAsBoolean(_oactivity.joinCondition,
+ new ExprEvaluationContextImpl(null,null,_linkVals));
+ } catch (Exception e) {
+ String msg = "Unexpected error evaluating a join condition: " + _oactivity.joinCondition;
+ __log.error(msg,e);
+ throw new InvalidProcessException(msg,e);
+ }
+ }
+
+ private static ACTIVITY createActivity(ActivityInfo activity, ScopeFrame scopeFrame, LinkFrame linkFrame) {
+ return __activityTemplateFactory.createInstance(activity.o,activity, scopeFrame, linkFrame);
+ }
+
+ private ACTIVITY createActivity(ActivityInfo activity) {
+ return createActivity(activity,_scopeFrame, _linkFrame);
+ }
+
+
+ /**
+ * Intercepts the
+ * {@link ParentScopeChannel#completed(org.apache.ode.bpel.runtime.channels.FaultData, java.util.Set<org.apache.ode.bpel.runtime.CompensationHandler>)}
+ * call, to evaluate transition conditions before returning to the parent.
+ */
+ private class TCONDINTERCEPT extends BpelJacobRunnable {
+ private static final long serialVersionUID = 4014873396828400441L;
+ ParentScopeChannel _in;
+
+ public TCONDINTERCEPT(ParentScopeChannel in) {
+ _in = in;
}
- });
+ public void run() {
+ object(new ParentScopeChannelListener(_in) {
+ private static final long serialVersionUID = 2667359535900385952L;
+
+ public void compensate(OScope scope, SynchChannel ret) {
+ _self.parent.compensate(scope,ret);
+ instance(TCONDINTERCEPT.this);
+ }
+
+ public void completed(FaultData faultData, Set<CompensationHandler> compensations) {
+ sendEvent(new ActivityExecEndEvent());
+ if (faultData != null) {
+ dpe(_oactivity.sourceLinks);
+ _self.parent.completed(faultData, compensations);
+ } else {
+ FaultData fault = null;
+ for (Iterator<OLink> i = _oactivity.sourceLinks.iterator();i.hasNext();) {
+ OLink olink = i.next();
+ LinkInfo linfo = _linkFrame.resolve(olink);
+ try {
+ boolean val = evaluateTransitionCondition(olink.transitionCondition);
+ linfo.pub.linkStatus(val);
+ } catch (FaultException e) {
+ linfo.pub.linkStatus(false);
+ __log.error(e);
+ if (fault == null)
+ fault = createFault(e.getQName(),olink.transitionCondition);
+ }
+ }
+ _self.parent.completed(fault, compensations);
+ }
+ }
+ });
+
+ }
}
- }
}
Modified: incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/ASSIGN.java
URL: http://svn.apache.org/viewvc/incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/ASSIGN.java?view=diff&rev=453057&r1=453056&r2=453057
==============================================================================
--- incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/ASSIGN.java (original)
+++ incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/ASSIGN.java Wed Oct 4 15:56:58 2006
@@ -89,9 +89,8 @@
}
if (faultData != null) {
- if (__log.isDebugEnabled())
- __log.debug("Assignment Fault: " + faultData.getFaultName()
- + ",lineNo=" + faultData.getFaultLineNo());
+ __log.error("Assignment Fault: " + faultData.getFaultName()
+ + ",lineNo=" + faultData.getFaultLineNo());
_self.parent.completed(faultData, CompensationHandler.emptySet());
} else {
_self.parent.completed(null, CompensationHandler.emptySet());
Modified: incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/EH_ALARM.java
URL: http://svn.apache.org/viewvc/incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/EH_ALARM.java?view=diff&rev=453057&r1=453056&r2=453057
==============================================================================
--- incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/EH_ALARM.java (original)
+++ incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/EH_ALARM.java Wed Oct 4 15:56:58 2006
@@ -33,6 +33,8 @@
import org.apache.ode.bpel.runtime.channels.TimerResponseChannel;
import org.apache.ode.bpel.runtime.channels.TimerResponseChannelListener;
import org.apache.ode.jacob.SynchChannel;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import java.util.Calendar;
import java.util.HashSet;
@@ -45,194 +47,200 @@
* the event handler completes naturally.
*/
class EH_ALARM extends BpelJacobRunnable {
- private static final long serialVersionUID = 1L;
- private ParentScopeChannel _psc;
- private TerminationChannel _tc;
- private OEventHandler.OAlarm _oalarm;
- private ScopeFrame _scopeFrame;
- private EventHandlerControlChannel _cc;
- private Set<CompensationHandler> _comps = new HashSet<CompensationHandler>();
-
- /**
- * Concretion constructor.
- * @param psc a link to our parent.
- * @param tc channel we listen on for termination requests
- * @param cc channel we listen on for "stop" requests
- * @param o our prototype / compiled representation
- * @param scopeFrame the {@link ScopeFrame} in which we are executing
- */
- EH_ALARM(ParentScopeChannel psc, TerminationChannel tc, EventHandlerControlChannel cc, OEventHandler.OAlarm o, ScopeFrame scopeFrame) {
- _psc = psc;
- _tc = tc;
- _cc = cc;
- _scopeFrame = scopeFrame;
- _oalarm = o;
- }
-
- public void run() {
-
- Calendar alarm = Calendar.getInstance();
-
- if (_oalarm.forExpr != null)
- try {
- getBpelRuntimeContext().getExpLangRuntime().evaluateAsDuration(_oalarm.forExpr, getEvaluationContext()).addTo(alarm);
- } catch (EvaluationException e) {
- throw new InvalidProcessException(e);
- } catch (FaultException e) {
- _psc.completed(createFault(e.getQName(),_oalarm.forExpr), _comps);
- return;
- }
- else if (_oalarm.untilExpr != null)
- try {
- alarm.setTime(getBpelRuntimeContext().getExpLangRuntime().evaluateAsDate(_oalarm.untilExpr, getEvaluationContext()).getTime());
- } catch (EvaluationException e) {
- throw new InvalidProcessException(e);
- } catch (FaultException e) {
- _psc.completed(createFault(e.getQName(),_oalarm.untilExpr), _comps);
- return;
- }
-
- // We reduce to waiting for the alarm to be triggered.
- instance(new WAIT(alarm));
- }
-
- protected EvaluationContext getEvaluationContext() {
- return new ExprEvaluationContextImpl(_scopeFrame,getBpelRuntimeContext());
- }
-
- /**
- * Template used to wait until a given time, reduing to a {@link FIRE} after the
- * elapsed time. This template also monitors the termination and event-control channels
- * for requests from parent.
- */
- private class WAIT extends BpelJacobRunnable {
- private static final long serialVersionUID = -1426724996925898213L;
- Calendar _alarm;
+ private static final Log __log = LogFactory.getLog(EH_ALARM.class);
+
+ private static final long serialVersionUID = 1L;
+
+ private ParentScopeChannel _psc;
+ private TerminationChannel _tc;
+ private OEventHandler.OAlarm _oalarm;
+ private ScopeFrame _scopeFrame;
+ private EventHandlerControlChannel _cc;
+ private Set<CompensationHandler> _comps = new HashSet<CompensationHandler>();
/**
* Concretion constructor.
- * @param alarm date at which time to fire
+ * @param psc a link to our parent.
+ * @param tc channel we listen on for termination requests
+ * @param cc channel we listen on for "stop" requests
+ * @param o our prototype / compiled representation
+ * @param scopeFrame the {@link ScopeFrame} in which we are executing
*/
- WAIT(Calendar alarm) {
- _alarm = alarm;
+ EH_ALARM(ParentScopeChannel psc, TerminationChannel tc, EventHandlerControlChannel cc, OEventHandler.OAlarm o, ScopeFrame scopeFrame) {
+ _psc = psc;
+ _tc = tc;
+ _cc = cc;
+ _scopeFrame = scopeFrame;
+ _oalarm = o;
}
public void run() {
- Calendar now = Calendar.getInstance();
-
- if (now.before(_alarm)) {
- TimerResponseChannel trc = newChannel(TimerResponseChannel.class);
- getBpelRuntimeContext().registerTimer(trc,_alarm.getTime());
- object(false,new TimerResponseChannelListener(trc){
- private static final long serialVersionUID = 1110683632756756017L;
-
- public void onTimeout() {
- // This is what we are waiting for, fire the activity
- instance(new FIRE());
- }
-
- public void onCancel() {
- _psc.completed(null, _comps);
- }
- }.or(new EventHandlerControlChannelListener(_cc) {
- private static final long serialVersionUID = -7750428941445331236L;
-
- public void stop() {
- _psc.completed(null, _comps);
- }
-
- }.or(new TerminationChannelListener(_tc) {
- private static final long serialVersionUID = 6100105997983514609L;
-
- public void terminate() {
- _psc.completed(null, _comps);
- }
- })));
- } else /* now is later then alarm time */ {
- // If the alarm has passed we fire the nested activity
- instance(new FIRE());
- }
-
- }
- }
-
- /**
- * Snipped that fires the alarm activity.
- */
- private class FIRE extends BpelJacobRunnable {
- private static final long serialVersionUID = -7261315204412433250L;
-
- public void run() {
- // Start the child activity.
- ActivityInfo child = new ActivityInfo(genMonotonic(),
- _oalarm.activity,
- newChannel(TerminationChannel.class), newChannel(ParentScopeChannel.class));
- instance(createChild(child, _scopeFrame, new LinkFrame(null) ));
- instance(new ACTIVE(child));
- }
- }
-
- /**
- * Snippet that is used to monitor a running activity.
- */
- private class ACTIVE extends BpelJacobRunnable {
- private static final long serialVersionUID = -2166253425722769701L;
- private ActivityInfo _activity;
+ Calendar alarm = Calendar.getInstance();
- /** Indicates whether our parent has requested a stop. */
- private boolean _stopped = false;
+ if (_oalarm.forExpr != null)
+ try {
+ getBpelRuntimeContext().getExpLangRuntime().evaluateAsDuration(_oalarm.forExpr, getEvaluationContext()).addTo(alarm);
+ } catch (EvaluationException e) {
+ throw new InvalidProcessException(e);
+ } catch (FaultException e) {
+ __log.error(e);
+ _psc.completed(createFault(e.getQName(),_oalarm.forExpr), _comps);
+ return;
+ }
+ else if (_oalarm.untilExpr != null)
+ try {
+ alarm.setTime(getBpelRuntimeContext().getExpLangRuntime().evaluateAsDate(_oalarm.untilExpr, getEvaluationContext()).getTime());
+ } catch (EvaluationException e) {
+ throw new InvalidProcessException(e);
+ } catch (FaultException e) {
+ __log.error(e);
+ _psc.completed(createFault(e.getQName(),_oalarm.untilExpr), _comps);
+ return;
+ }
- ACTIVE(ActivityInfo activity) {
- _activity = activity;
+ // We reduce to waiting for the alarm to be triggered.
+ instance(new WAIT(alarm));
}
- public void run() {
- object(false,new ParentScopeChannelListener(_activity.parent){
- private static final long serialVersionUID = -3357030137175178040L;
+ protected EvaluationContext getEvaluationContext() {
+ return new ExprEvaluationContextImpl(_scopeFrame,getBpelRuntimeContext());
+ }
- public void compensate(OScope scope, SynchChannel ret) {
- _psc.compensate(scope,ret);
- instance(ACTIVE.this);
+ /**
+ * Template used to wait until a given time, reducing to a {@link FIRE} after the
+ * elapsed time. This template also monitors the termination and event-control channels
+ * for requests from parent.
+ */
+ private class WAIT extends BpelJacobRunnable {
+ private static final long serialVersionUID = -1426724996925898213L;
+ Calendar _alarm;
+
+ /**
+ * Concretion constructor.
+ * @param alarm date at which time to fire
+ */
+ WAIT(Calendar alarm) {
+ _alarm = alarm;
}
- public void completed(FaultData faultData, Set<CompensationHandler> compensations) {
- _comps.addAll(compensations);
- if (!_stopped && _oalarm.repeatExpr != null) {
- Calendar next = Calendar.getInstance();
- try {
- getBpelRuntimeContext().getExpLangRuntime().evaluateAsDuration(_oalarm.forExpr, getEvaluationContext()).addTo(next);
- } catch (EvaluationException e) {
- throw new InvalidProcessException(e);
- } catch (FaultException e) {
- _psc.completed(createFault(e.getQName(),_oalarm.forExpr), _comps);
- return;
+ public void run() {
+ Calendar now = Calendar.getInstance();
+
+ if (now.before(_alarm)) {
+ TimerResponseChannel trc = newChannel(TimerResponseChannel.class);
+ getBpelRuntimeContext().registerTimer(trc,_alarm.getTime());
+ object(false,new TimerResponseChannelListener(trc){
+ private static final long serialVersionUID = 1110683632756756017L;
+
+ public void onTimeout() {
+ // This is what we are waiting for, fire the activity
+ instance(new FIRE());
+ }
+
+ public void onCancel() {
+ _psc.completed(null, _comps);
+ }
+ }.or(new EventHandlerControlChannelListener(_cc) {
+ private static final long serialVersionUID = -7750428941445331236L;
+
+ public void stop() {
+ _psc.completed(null, _comps);
+ }
+
+ }.or(new TerminationChannelListener(_tc) {
+ private static final long serialVersionUID = 6100105997983514609L;
+
+ public void terminate() {
+ _psc.completed(null, _comps);
+ }
+ })));
+ } else /* now is later than alarm time */ {
+ // If the alarm has passed we fire the nested activity
+ instance(new FIRE());
}
- instance(new WAIT(next));
- } else {
- _psc.completed(faultData, _comps);
- }
+
}
+ }
- }.or(new EventHandlerControlChannelListener(_cc) {
- private static final long serialVersionUID = -3873619538789039424L;
+ /**
+ * Snippet that fires the alarm activity.
+ */
+ private class FIRE extends BpelJacobRunnable {
+ private static final long serialVersionUID = -7261315204412433250L;
- public void stop() {
- _stopped = true;
- instance(ACTIVE.this);
+ public void run() {
+ // Start the child activity.
+ ActivityInfo child = new ActivityInfo(genMonotonic(),
+ _oalarm.activity,
+ newChannel(TerminationChannel.class), newChannel(ParentScopeChannel.class));
+ instance(createChild(child, _scopeFrame, new LinkFrame(null) ));
+ instance(new ACTIVE(child));
}
+ }
+
+ /**
+ * Snippet that is used to monitor a running activity.
+ */
+ private class ACTIVE extends BpelJacobRunnable {
+ private static final long serialVersionUID = -2166253425722769701L;
+
+ private ActivityInfo _activity;
- }.or(new TerminationChannelListener(_tc) {
- private static final long serialVersionUID = -4566956567870652885L;
+ /** Indicates whether our parent has requested a stop. */
+ private boolean _stopped = false;
- public void terminate() {
- replication(_activity.self).terminate();
- _stopped = true;
- instance(ACTIVE.this);
+ ACTIVE(ActivityInfo activity) {
+ _activity = activity;
}
- })));
+ public void run() {
+ object(false,new ParentScopeChannelListener(_activity.parent){
+ private static final long serialVersionUID = -3357030137175178040L;
+
+ public void compensate(OScope scope, SynchChannel ret) {
+ _psc.compensate(scope,ret);
+ instance(ACTIVE.this);
+ }
+
+ public void completed(FaultData faultData, Set<CompensationHandler> compensations) {
+ _comps.addAll(compensations);
+ if (!_stopped && _oalarm.repeatExpr != null) {
+ Calendar next = Calendar.getInstance();
+ try {
+ getBpelRuntimeContext().getExpLangRuntime().evaluateAsDuration(_oalarm.forExpr, getEvaluationContext()).addTo(next);
+ } catch (EvaluationException e) {
+ throw new InvalidProcessException(e);
+ } catch (FaultException e) {
+ __log.error(e);
+ _psc.completed(createFault(e.getQName(),_oalarm.forExpr), _comps);
+ return;
+ }
+ instance(new WAIT(next));
+ } else {
+ _psc.completed(faultData, _comps);
+ }
+ }
+
+ }.or(new EventHandlerControlChannelListener(_cc) {
+ private static final long serialVersionUID = -3873619538789039424L;
+
+ public void stop() {
+ _stopped = true;
+ instance(ACTIVE.this);
+ }
+
+ }.or(new TerminationChannelListener(_tc) {
+ private static final long serialVersionUID = -4566956567870652885L;
+
+ public void terminate() {
+ replication(_activity.self).terminate();
+ _stopped = true;
+ instance(ACTIVE.this);
+ }
+ })));
+
+ }
}
- }
}
Modified: incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/EH_EVENT.java
URL: http://svn.apache.org/viewvc/incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/EH_EVENT.java?view=diff&rev=453057&r1=453056&r2=453057
==============================================================================
--- incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/EH_EVENT.java (original)
+++ incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/EH_EVENT.java Wed Oct 4 15:56:58 2006
@@ -46,247 +46,249 @@
*/
class EH_EVENT extends BpelJacobRunnable {
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = 1L;
- private static final Log __log = LogFactory.getLog(EH_EVENT.class);
+ private static final Log __log = LogFactory.getLog(EH_EVENT.class);
- private EventHandlerControlChannel _ehc;
- private TerminationChannel _tc;
- private ParentScopeChannel _psc;
- private ScopeFrame _scopeFrame;
- private OEventHandler.OEvent _oevent;
+ private EventHandlerControlChannel _ehc;
+ private TerminationChannel _tc;
+ private ParentScopeChannel _psc;
+ private ScopeFrame _scopeFrame;
+ private OEventHandler.OEvent _oevent;
- /** Registered compensation handlers. */
- private Set<CompensationHandler> _comps = new HashSet<CompensationHandler>();
+ /** Registered compensation handlers. */
+ private Set<CompensationHandler> _comps = new HashSet<CompensationHandler>();
- private FaultData _fault;
+ private FaultData _fault;
- /** Active instances (we can have more than one!) */
- private Set<ActivityInfo> _active = new HashSet<ActivityInfo>();
+ /** Active instances (we can have more than one!) */
+ private Set<ActivityInfo> _active = new HashSet<ActivityInfo>();
- /** Whether a stop has been requested; if so no more new instances. */
- private boolean _stopped;
+ /** Whether a stop has been requested; if so no more new instances. */
+ private boolean _stopped;
- /** Has a termination of this handler been requested. */
- private boolean _terminated;
+ /** Has a termination of this handler been requested. */
+ private boolean _terminated;
- private boolean _childrenTerminated;
+ private boolean _childrenTerminated;
- EH_EVENT(ParentScopeChannel psc,TerminationChannel tc, EventHandlerControlChannel ehc, OEventHandler.OEvent o, ScopeFrame scopeFrame) {
- _scopeFrame = scopeFrame;
- _oevent = o;
- _tc = tc;
- _psc = psc;
- _ehc = ehc;
- }
-
-
- public void run() {
- instance(new SELECT());
- }
-
- /**
- * Terminate all the active activities.
- */
- private void terminateActive() {
- if (!_childrenTerminated) {
- for (ActivityInfo tact : _active) {
- replication(tact.self).terminate();
- }
- _childrenTerminated = true;
+ EH_EVENT(ParentScopeChannel psc,TerminationChannel tc, EventHandlerControlChannel ehc, OEventHandler.OEvent o, ScopeFrame scopeFrame) {
+ _scopeFrame = scopeFrame;
+ _oevent = o;
+ _tc = tc;
+ _psc = psc;
+ _ehc = ehc;
}
- }
- /**
- * Template that does the actual selection interaction with the runtime system, and
- * then waits on the pick response channel.
- */
- class SELECT extends BpelJacobRunnable {
- private static final long serialVersionUID = 1L;
- /**
- * @see org.apache.ode.jacob.JacobRunnable#run()
- */
public void run() {
- Selector selector;
- try {
- PickResponseChannel pickResponseChannel = newChannel(PickResponseChannel.class);
- CorrelationKey key;
- PartnerLinkInstance pLinkInstance = _scopeFrame.resolve(_oevent.partnerLink);
- if (_oevent.matchCorrelation == null) {
- // Adding a route for opaque correlation. In this case correlation is done on "out-of-band" session id.
- String sessionId = getBpelRuntimeContext().fetchMySessionId(pLinkInstance);
- key = new CorrelationKey(-1, new String[] {sessionId});
- } else {
- if (!getBpelRuntimeContext().isCorrelationInitialized(_scopeFrame.resolve(_oevent.matchCorrelation))) {
- throw new FaultException(_oevent.getOwner().constants.qnCorrelationViolation,"Correlation not initialized.");
- }
- key = getBpelRuntimeContext().readCorrelation(_scopeFrame.resolve(_oevent.matchCorrelation));
- assert key != null;
- }
-
- selector = new Selector(0,pLinkInstance,_oevent.operation.getName(), _oevent.operation.getOutput() == null, _oevent.messageExchangeId, key);
- getBpelRuntimeContext().select(pickResponseChannel, null, false, new Selector[] { selector} );
- instance(new WAITING(pickResponseChannel));
- } catch(FaultException e){
- if (_fault == null) {
- _fault = createFault(e.getQName(), _oevent);
- }
- terminateActive();
- instance(new WAITING(null));
- }
+ instance(new SELECT());
}
- }
-
- /**
- * Template that represents the waiting for a pick response.
- */
- private class WAITING extends BpelJacobRunnable {
- private static final long serialVersionUID = 1L;
- private PickResponseChannel _pickResponseChannel;
- private WAITING(PickResponseChannel pickResponseChannel) {
- _pickResponseChannel = pickResponseChannel;
+ /**
+ * Terminate all the active activities.
+ */
+ private void terminateActive() {
+ if (!_childrenTerminated) {
+ for (ActivityInfo tact : _active) {
+ replication(tact.self).terminate();
+ }
+ _childrenTerminated = true;
+ }
}
+ /**
+ * Template that does the actual selection interaction with the runtime system, and
+ * then waits on the pick response channel.
+ */
+ class SELECT extends BpelJacobRunnable {
- public void run() {
+ private static final long serialVersionUID = 1L;
- if (!_active.isEmpty() || _pickResponseChannel != null) {
- HashSet<ChannelListener> mlset = new HashSet<ChannelListener>();
+ /**
+ * @see org.apache.ode.jacob.JacobRunnable#run()
+ */
+ public void run() {
+ Selector selector;
+ try {
+ PickResponseChannel pickResponseChannel = newChannel(PickResponseChannel.class);
+ CorrelationKey key;
+ PartnerLinkInstance pLinkInstance = _scopeFrame.resolve(_oevent.partnerLink);
+ if (_oevent.matchCorrelation == null) {
+ // Adding a route for opaque correlation. In this case correlation is done on "out-of-band" session id.
+ String sessionId = getBpelRuntimeContext().fetchMySessionId(pLinkInstance);
+ key = new CorrelationKey(-1, new String[] {sessionId});
+ } else {
+ if (!getBpelRuntimeContext().isCorrelationInitialized(_scopeFrame.resolve(_oevent.matchCorrelation))) {
+ throw new FaultException(_oevent.getOwner().constants.qnCorrelationViolation,"Correlation not initialized.");
+ }
+ key = getBpelRuntimeContext().readCorrelation(_scopeFrame.resolve(_oevent.matchCorrelation));
+ assert key != null;
+ }
- if (!_terminated) {
- mlset.add(new TerminationChannelListener(_tc) {
- private static final long serialVersionUID = 7666910462948788042L;
-
- public void terminate() {
- terminateActive();
- _terminated = true;
- if (_pickResponseChannel != null)
- getBpelRuntimeContext().cancel(_pickResponseChannel);
- instance(WAITING.this);
+ selector = new Selector(0,pLinkInstance,_oevent.operation.getName(), _oevent.operation.getOutput() == null, _oevent.messageExchangeId, key);
+ getBpelRuntimeContext().select(pickResponseChannel, null, false, new Selector[] { selector} );
+ instance(new WAITING(pickResponseChannel));
+ } catch(FaultException e){
+ __log.error(e);
+ if (_fault == null) {
+ _fault = createFault(e.getQName(), _oevent);
+ }
+ terminateActive();
+ instance(new WAITING(null));
}
- });
-
}
+ }
- if (!_stopped) {
- mlset.add(new EventHandlerControlChannelListener(_ehc) {
- private static final long serialVersionUID = -1050788954724647970L;
-
- public void stop() {
- _stopped = true;
- if (_pickResponseChannel != null)
- getBpelRuntimeContext().cancel(_pickResponseChannel);
- instance(WAITING.this);
- }
- });
+ /**
+ * Template that represents the waiting for a pick response.
+ */
+ private class WAITING extends BpelJacobRunnable {
+ private static final long serialVersionUID = 1L;
+ private PickResponseChannel _pickResponseChannel;
+ private WAITING(PickResponseChannel pickResponseChannel) {
+ _pickResponseChannel = pickResponseChannel;
}
- for (final ActivityInfo ai : _active) {
- mlset.add(new ParentScopeChannelListener(ai.parent) {
- private static final long serialVersionUID = 5341207762415360982L;
-
- public void compensate(OScope scope, SynchChannel ret) {
- _psc.compensate(scope, ret);
- instance(WAITING.this);
- }
+ public void run() {
- public void completed(FaultData faultData, Set<CompensationHandler> compensations) {
- _active.remove(ai);
- _comps.addAll(compensations);
- if (faultData != null && _fault == null) {
- _fault = faultData;
- terminateActive();
- }
+ if (!_active.isEmpty() || _pickResponseChannel != null) {
+ HashSet<ChannelListener> mlset = new HashSet<ChannelListener>();
- instance(WAITING.this);
- }
- });
- }
+ if (!_terminated) {
+ mlset.add(new TerminationChannelListener(_tc) {
+ private static final long serialVersionUID = 7666910462948788042L;
+
+ public void terminate() {
+ terminateActive();
+ _terminated = true;
+ if (_pickResponseChannel != null)
+ getBpelRuntimeContext().cancel(_pickResponseChannel);
+ instance(WAITING.this);
+ }
+ });
- if (_pickResponseChannel != null)
- mlset.add(new PickResponseChannelListener(_pickResponseChannel) {
- private static final long serialVersionUID = -4929999153478677288L;
-
-
- public void onRequestRcvd(int selectorIdx, String mexId) {
- Element msgEl = getBpelRuntimeContext().getMyRequest(mexId);
- try {
- getBpelRuntimeContext().initializeVariable(_scopeFrame.resolve(_oevent.variable),msgEl);
- } catch (Exception ex) {
- __log.error(ex);
- throw new InvalidProcessException(ex);
- }
-
- try {
- for (OScope.CorrelationSet cset : _oevent.initCorrelations) {
- initializeCorrelation(_scopeFrame.resolve(cset), _scopeFrame.resolve(_oevent.variable));
- }
-
- if (_oevent.partnerLink.hasPartnerRole()) {
- // Trying to initialize partner epr based on a message-provided epr/session.
- if (!getBpelRuntimeContext().isPartnerRoleEndpointInitialized(_scopeFrame
- .resolve(_oevent.partnerLink))) {
- Node fromEpr = getBpelRuntimeContext().getSourceEPR(mexId);
- if (fromEpr != null) {
- getBpelRuntimeContext().writeEndpointReference(
- _scopeFrame.resolve(_oevent.partnerLink), (Element) fromEpr);
- }
- }
-
- String partnersSessionId = getBpelRuntimeContext().getSourceSessionId(mexId);
- if (partnersSessionId != null)
- getBpelRuntimeContext().initializePartnersSessionId(_scopeFrame.resolve(_oevent.partnerLink),
- partnersSessionId);
- }
-
-
-
-
- } catch (FaultException e) {
- if (_fault == null) {
- _fault = createFault(e.getQName(), _oevent);
- terminateActive();
}
- instance(new WAITING(null));
- return;
- }
- // activate 'onMessage' activity; we'll do this even if a stop/terminate has been
- // requested becasue we cannot undo the receipt of the message at this point.
- ActivityInfo child = new ActivityInfo(genMonotonic(),
- _oevent.activity,
- newChannel(TerminationChannel.class), newChannel(ParentScopeChannel.class));
- _active.add(child);
- // If we previously terminated the other activiites, then we do the same
- // here; this is easier then undoing the receive.
- if (_childrenTerminated)
- replication(child.self).terminate();
- instance(createChild(child,_scopeFrame, new LinkFrame(null)));
-
- if (_terminated || _stopped || _fault != null)
- instance(new WAITING(null));
- else
- instance(new SELECT());
- }
+ if (!_stopped) {
+ mlset.add(new EventHandlerControlChannelListener(_ehc) {
+ private static final long serialVersionUID = -1050788954724647970L;
+
+ public void stop() {
+ _stopped = true;
+ if (_pickResponseChannel != null)
+ getBpelRuntimeContext().cancel(_pickResponseChannel);
+ instance(WAITING.this);
+ }
+ });
+ }
- public void onTimeout() {
- instance(new WAITING(null));
- }
+ for (final ActivityInfo ai : _active) {
+ mlset.add(new ParentScopeChannelListener(ai.parent) {
+ private static final long serialVersionUID = 5341207762415360982L;
+
+ public void compensate(OScope scope, SynchChannel ret) {
+ _psc.compensate(scope, ret);
+ instance(WAITING.this);
+ }
+
+ public void completed(FaultData faultData, Set<CompensationHandler> compensations) {
+ _active.remove(ai);
+ _comps.addAll(compensations);
+ if (faultData != null && _fault == null) {
+ _fault = faultData;
+ terminateActive();
+ }
+
+ instance(WAITING.this);
+ }
+ });
+ }
- public void onCancel() {
- instance(new WAITING(null));
+ if (_pickResponseChannel != null)
+ mlset.add(new PickResponseChannelListener(_pickResponseChannel) {
+ private static final long serialVersionUID = -4929999153478677288L;
+
+
+ public void onRequestRcvd(int selectorIdx, String mexId) {
+ Element msgEl = getBpelRuntimeContext().getMyRequest(mexId);
+ try {
+ getBpelRuntimeContext().initializeVariable(_scopeFrame.resolve(_oevent.variable),msgEl);
+ } catch (Exception ex) {
+ __log.error(ex);
+ throw new InvalidProcessException(ex);
+ }
+
+ try {
+ for (OScope.CorrelationSet cset : _oevent.initCorrelations) {
+ initializeCorrelation(_scopeFrame.resolve(cset), _scopeFrame.resolve(_oevent.variable));
+ }
+
+ if (_oevent.partnerLink.hasPartnerRole()) {
+ // Trying to initialize partner epr based on a message-provided epr/session.
+ if (!getBpelRuntimeContext().isPartnerRoleEndpointInitialized(_scopeFrame
+ .resolve(_oevent.partnerLink))) {
+ Node fromEpr = getBpelRuntimeContext().getSourceEPR(mexId);
+ if (fromEpr != null) {
+ getBpelRuntimeContext().writeEndpointReference(
+ _scopeFrame.resolve(_oevent.partnerLink), (Element) fromEpr);
+ }
+ }
+
+ String partnersSessionId = getBpelRuntimeContext().getSourceSessionId(mexId);
+ if (partnersSessionId != null)
+ getBpelRuntimeContext().initializePartnersSessionId(_scopeFrame.resolve(_oevent.partnerLink),
+ partnersSessionId);
+ }
+
+
+
+
+ } catch (FaultException e) {
+ __log.error(e);
+ if (_fault == null) {
+ _fault = createFault(e.getQName(), _oevent);
+ terminateActive();
+ }
+ instance(new WAITING(null));
+ return;
+ }
+
+ // activate 'onMessage' activity; we'll do this even if a stop/terminate has been
+ // requested becasue we cannot undo the receipt of the message at this point.
+ ActivityInfo child = new ActivityInfo(genMonotonic(),
+ _oevent.activity,
+ newChannel(TerminationChannel.class), newChannel(ParentScopeChannel.class));
+ _active.add(child);
+ // If we previously terminated the other activiites, then we do the same
+ // here; this is easier then undoing the receive.
+ if (_childrenTerminated)
+ replication(child.self).terminate();
+ instance(createChild(child,_scopeFrame, new LinkFrame(null)));
+
+ if (_terminated || _stopped || _fault != null)
+ instance(new WAITING(null));
+ else
+ instance(new SELECT());
+ }
+
+
+ public void onTimeout() {
+ instance(new WAITING(null));
+ }
+
+ public void onCancel() {
+ instance(new WAITING(null));
+ }
+ });
+
+ object(false, mlset);
+ } else /* Nothing more to do. */ {
+ _psc.completed(_fault, _comps);
}
- });
+ }
- object(false, mlset);
- } else /* Nothing more to do. */ {
- _psc.completed(_fault, _comps);
- }
}
-
- }
}
Modified: incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/FOREACH.java
URL: http://svn.apache.org/viewvc/incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/FOREACH.java?view=diff&rev=453057&r1=453056&r2=453057
==============================================================================
--- incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/FOREACH.java (original)
+++ incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/FOREACH.java Wed Oct 4 15:56:58 2006
@@ -46,180 +46,181 @@
public class FOREACH extends ACTIVITY {
- private static final long serialVersionUID = 1L;
- private static final Log __log = LogFactory.getLog(FOREACH.class);
+ private static final long serialVersionUID = 1L;
+ private static final Log __log = LogFactory.getLog(FOREACH.class);
- private OForEach _oforEach;
- private Set<ChildInfo> _children = new HashSet<ChildInfo>();
- private Set<CompensationHandler> _compHandlers = new HashSet<CompensationHandler>();
- private int _startCounter = -1;
- private int _finalCounter = -1;
- private int _currentCounter = -1;
- private int _completedCounter = 0;
- private int _completionCounter = -1;
-
- public FOREACH(ActivityInfo self, ScopeFrame frame, LinkFrame linkFrame) {
- super(self,frame, linkFrame);
- _oforEach = (OForEach) self.o;
- }
-
- public void run() {
- try {
- _startCounter = evaluateCondition(_oforEach.startCounterValue);
- _finalCounter = evaluateCondition(_oforEach.finalCounterValue);
- if (_oforEach.completionCondition != null) {
- _completionCounter = evaluateCondition(_oforEach.completionCondition.branchCount);
- }
- _currentCounter = _startCounter;
- } catch (FaultException fe) {
- _self.parent.completed(createFault(fe.getQName(), _self.o), _compHandlers);
- return;
- }
-
- // Checking for bpws:invalidBranchCondition when the counter limit is superior
- // to the maximum number of children
- if (_completionCounter > 0 && _completionCounter > _finalCounter - _startCounter) {
- _self.parent.completed(
- createFault(_oforEach.getOwner().constants.qnInvalidBranchCondition, _self.o), _compHandlers);
- return;
- }
-
- // There's really nothing to do
- if (_finalCounter < _startCounter || _completionCounter == 0) {
- _self.parent.completed(null, _compHandlers);
- } else {
- // If we're parrallel, starting all our child copies, otherwise one will suffice.
- if (_oforEach.parallel) {
- for (int m = _startCounter; m <= _finalCounter; m++) {
- newChild();
- }
- } else newChild();
- instance(new ACTIVE());
+ private OForEach _oforEach;
+ private Set<ChildInfo> _children = new HashSet<ChildInfo>();
+ private Set<CompensationHandler> _compHandlers = new HashSet<CompensationHandler>();
+ private int _startCounter = -1;
+ private int _finalCounter = -1;
+ private int _currentCounter = -1;
+ private int _completedCounter = 0;
+ private int _completionCounter = -1;
+
+ public FOREACH(ActivityInfo self, ScopeFrame frame, LinkFrame linkFrame) {
+ super(self,frame, linkFrame);
+ _oforEach = (OForEach) self.o;
}
- }
-
- private class ACTIVE extends BpelJacobRunnable {
- private static final long serialVersionUID = -5642862698981385732L;
-
- private FaultData _fault;
- private boolean _terminateRequested = false;
public void run() {
- Iterator<ChildInfo> active = active();
- // Continuing as long as a child is active
- if (active().hasNext()) {
-
- Set<ChannelListener> mlSet = new HashSet<ChannelListener>();
- mlSet.add(new TerminationChannelListener(_self.self) {
- private static final long serialVersionUID = 2554750257484084466L;
-
- public void terminate() {
- // Terminating all children before sepuku
- for (Iterator<ChildInfo> i = active(); i.hasNext(); )
- replication(i.next().activity.self).terminate();
- _terminateRequested = true;
- instance(ACTIVE.this);
- }
- });
- for (;active.hasNext();) {
- // Checking out our children
- final ChildInfo child = active.next();
- mlSet.add(new ParentScopeChannelListener(child.activity.parent) {
- private static final long serialVersionUID = -8027205709961438172L;
-
- public void compensate(OScope scope, SynchChannel ret) {
- // Forward compensation to parent
- _self.parent.compensate(scope, ret);
- instance(ACTIVE.this);
+ try {
+ _startCounter = evaluateCondition(_oforEach.startCounterValue);
+ _finalCounter = evaluateCondition(_oforEach.finalCounterValue);
+ if (_oforEach.completionCondition != null) {
+ _completionCounter = evaluateCondition(_oforEach.completionCondition.branchCount);
}
+ _currentCounter = _startCounter;
+ } catch (FaultException fe) {
+ __log.error(fe);
+ _self.parent.completed(createFault(fe.getQName(), _self.o), _compHandlers);
+ return;
+ }
+
+ // Checking for bpws:invalidBranchCondition when the counter limit is superior
+ // to the maximum number of children
+ if (_completionCounter > 0 && _completionCounter > _finalCounter - _startCounter) {
+ _self.parent.completed(
+ createFault(_oforEach.getOwner().constants.qnInvalidBranchCondition, _self.o), _compHandlers);
+ return;
+ }
+
+ // There's really nothing to do
+ if (_finalCounter < _startCounter || _completionCounter == 0) {
+ _self.parent.completed(null, _compHandlers);
+ } else {
+ // If we're parrallel, starting all our child copies, otherwise one will suffice.
+ if (_oforEach.parallel) {
+ for (int m = _startCounter; m <= _finalCounter; m++) {
+ newChild();
+ }
+ } else newChild();
+ instance(new ACTIVE());
+ }
+ }
+
+ private class ACTIVE extends BpelJacobRunnable {
+ private static final long serialVersionUID = -5642862698981385732L;
+
+ private FaultData _fault;
+ private boolean _terminateRequested = false;
- public void completed(FaultData faultData, Set<CompensationHandler> compensations) {
- child.completed = true;
- //
- if (_completionCounter > 0 && _oforEach.completionCondition.successfulBranchesOnly) {
- if (faultData != null) _completedCounter++;
- } else _completedCounter++;
-
- _compHandlers.addAll(compensations);
-
- // Keeping the fault to let everybody know
- if (faultData != null && _fault == null) {
- _fault = faultData;
- }
- if (shouldContinue() && _fault == null && !_terminateRequested) {
- // Everything fine. If parrallel, just let our children be, otherwise making a new child
- if (!_oforEach.parallel) newChild();
- } else {
- // Work is done or something wrong happened, children shouldn't continue
- for (Iterator<ChildInfo> i = active(); i.hasNext(); )
- replication(i.next().activity.self).terminate();
- }
- instance(ACTIVE.this);
+ public void run() {
+ Iterator<ChildInfo> active = active();
+ // Continuing as long as a child is active
+ if (active().hasNext()) {
+
+ Set<ChannelListener> mlSet = new HashSet<ChannelListener>();
+ mlSet.add(new TerminationChannelListener(_self.self) {
+ private static final long serialVersionUID = 2554750257484084466L;
+
+ public void terminate() {
+ // Terminating all children before sepuku
+ for (Iterator<ChildInfo> i = active(); i.hasNext(); )
+ replication(i.next().activity.self).terminate();
+ _terminateRequested = true;
+ instance(ACTIVE.this);
+ }
+ });
+ for (;active.hasNext();) {
+ // Checking out our children
+ final ChildInfo child = active.next();
+ mlSet.add(new ParentScopeChannelListener(child.activity.parent) {
+ private static final long serialVersionUID = -8027205709961438172L;
+
+ public void compensate(OScope scope, SynchChannel ret) {
+ // Forward compensation to parent
+ _self.parent.compensate(scope, ret);
+ instance(ACTIVE.this);
+ }
+
+ public void completed(FaultData faultData, Set<CompensationHandler> compensations) {
+ child.completed = true;
+ //
+ if (_completionCounter > 0 && _oforEach.completionCondition.successfulBranchesOnly) {
+ if (faultData != null) _completedCounter++;
+ } else _completedCounter++;
+
+ _compHandlers.addAll(compensations);
+
+ // Keeping the fault to let everybody know
+ if (faultData != null && _fault == null) {
+ _fault = faultData;
+ }
+ if (shouldContinue() && _fault == null && !_terminateRequested) {
+ // Everything fine. If parrallel, just let our children be, otherwise making a new child
+ if (!_oforEach.parallel) newChild();
+ } else {
+ // Work is done or something wrong happened, children shouldn't continue
+ for (Iterator<ChildInfo> i = active(); i.hasNext(); )
+ replication(i.next().activity.self).terminate();
+ }
+ instance(ACTIVE.this);
+ }
+ });
+ }
+ object(false,mlSet);
+ } else {
+ // No children left, either because they've all been executed or because we
+ // had to make them stop.
+ _self.parent.completed(_fault, _compHandlers);
}
- });
}
- object(false,mlSet);
- } else {
- // No children left, either because they've all been executed or because we
- // had to make them stop.
- _self.parent.completed(_fault, _compHandlers);
- }
- }
- }
-
- private boolean shouldContinue() {
- boolean stop = false;
- if (_completionCounter > 0) {
- stop = (_completedCounter >= _completionCounter) || stop;
- }
- stop = (_startCounter + _completedCounter > _finalCounter) || stop;
- return !stop;
- }
-
- private int evaluateCondition(OExpression condition)
- throws FaultException {
- try {
- return getBpelRuntimeContext().getExpLangRuntime().
- evaluateAsNumber(condition, getEvaluationContext()).intValue();
- } catch (EvaluationException e) {
- String msg;
- if (condition instanceof OXPath10Expression)
- msg = "ForEach counter value " + ((OXPath10Expression)condition).xpath +
- " couldn't be evaluated as xs:unsignedInt.";
- else msg = "ForEach counter value couldn't be evaluated as xs:unsignedInt.";
- __log.error(msg, e);
- throw new FaultException(_oforEach.getOwner().constants.qnForEachCounterError,msg);
- }
- }
-
- private void newChild() {
- ChildInfo child = new ChildInfo(new ActivityInfo(genMonotonic(), _oforEach.innerScope,
- newChannel(TerminationChannel.class), newChannel(ParentScopeChannel.class)));
- _children.add(child);
-
- // Creating the current counter value node
- Document doc = DOMUtils.newDocument();
- Node counterNode = doc.createTextNode(""+_currentCounter++);
-
- // Instantiating the scope directly to keep control of its scope frame, allows
- // the introduction of the counter variable in there (monkey business that is).
- ScopeFrame newFrame = new ScopeFrame(
- _oforEach.innerScope, getBpelRuntimeContext().createScopeInstance(_scopeFrame.scopeInstanceId,
- _oforEach.innerScope), _scopeFrame, null);
- getBpelRuntimeContext().initializeVariable(newFrame.resolve(_oforEach.counterVariable), counterNode);
- instance(new SCOPE(child.activity, newFrame, _linkFrame));
- }
-
- public String toString() {
- return "<T:Act:Flow:" + _oforEach.name + ">";
- }
-
- private Iterator<ChildInfo> active() {
- return new FilterIterator<ChildInfo>(_children.iterator(), new MemberOfFunction<ChildInfo>() {
- public boolean isMember(ChildInfo childInfo) {
- return !childInfo.completed;
- }
- });
- }
+ }
+
+ private boolean shouldContinue() {
+ boolean stop = false;
+ if (_completionCounter > 0) {
+ stop = (_completedCounter >= _completionCounter) || stop;
+ }
+ stop = (_startCounter + _completedCounter > _finalCounter) || stop;
+ return !stop;
+ }
+
+ private int evaluateCondition(OExpression condition)
+ throws FaultException {
+ try {
+ return getBpelRuntimeContext().getExpLangRuntime().
+ evaluateAsNumber(condition, getEvaluationContext()).intValue();
+ } catch (EvaluationException e) {
+ String msg;
+ if (condition instanceof OXPath10Expression)
+ msg = "ForEach counter value " + ((OXPath10Expression)condition).xpath +
+ " couldn't be evaluated as xs:unsignedInt.";
+ else msg = "ForEach counter value couldn't be evaluated as xs:unsignedInt.";
+ __log.error(msg, e);
+ throw new FaultException(_oforEach.getOwner().constants.qnForEachCounterError,msg);
+ }
+ }
+
+ private void newChild() {
+ ChildInfo child = new ChildInfo(new ActivityInfo(genMonotonic(), _oforEach.innerScope,
+ newChannel(TerminationChannel.class), newChannel(ParentScopeChannel.class)));
+ _children.add(child);
+
+ // Creating the current counter value node
+ Document doc = DOMUtils.newDocument();
+ Node counterNode = doc.createTextNode(""+_currentCounter++);
+
+ // Instantiating the scope directly to keep control of its scope frame, allows
+ // the introduction of the counter variable in there (monkey business that is).
+ ScopeFrame newFrame = new ScopeFrame(
+ _oforEach.innerScope, getBpelRuntimeContext().createScopeInstance(_scopeFrame.scopeInstanceId,
+ _oforEach.innerScope), _scopeFrame, null);
+ getBpelRuntimeContext().initializeVariable(newFrame.resolve(_oforEach.counterVariable), counterNode);
+ instance(new SCOPE(child.activity, newFrame, _linkFrame));
+ }
+
+ public String toString() {
+ return "<T:Act:Flow:" + _oforEach.name + ">";
+ }
+
+ private Iterator<ChildInfo> active() {
+ return new FilterIterator<ChildInfo>(_children.iterator(), new MemberOfFunction<ChildInfo>() {
+ public boolean isMember(ChildInfo childInfo) {
+ return !childInfo.completed;
+ }
+ });
+ }
}
Modified: incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/INVOKE.java
URL: http://svn.apache.org/viewvc/incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/INVOKE.java?view=diff&rev=453057&r1=453056&r2=453057
==============================================================================
--- incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/INVOKE.java (original)
+++ incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/INVOKE.java Wed Oct 4 15:56:58 2006
@@ -37,6 +37,8 @@
import org.apache.ode.bpel.runtime.channels.TerminationChannelListener;
import org.apache.ode.bpel.runtime.channels.TimerResponseChannel;
import org.apache.ode.bpel.runtime.channels.TimerResponseChannelListener;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
@@ -44,222 +46,227 @@
* JacobRunnable that performs the work of the <code>invoke</code> activity.
*/
public class INVOKE extends ACTIVITY {
- private static final long serialVersionUID = 992248281026821783L;
+ private static final long serialVersionUID = 992248281026821783L;
- private OInvoke _oinvoke;
- // Records number of invocations on the activity.
- private int _invoked;
- // Date/time of last failure.
- private Date _lastFailure;
- // Reason for last failure.
- private String _failureReason;
- // Data associated with failure.
- private Element _failureData;
-
-
- public INVOKE(ActivityInfo self, ScopeFrame scopeFrame, LinkFrame linkFrame) {
- super(self, scopeFrame, linkFrame);
- _oinvoke = (OInvoke) _self.o;
- _invoked = 0;
- }
-
- public final void run() {
- Element outboundMsg;
- try {
- outboundMsg = setupOutbound(_oinvoke, _oinvoke.initCorrelationsInput);
- } catch (FaultException e) {
- FaultData fault = createFault(e.getQName(), _oinvoke);
- _self.parent.completed(fault, CompensationHandler.emptySet());
- return;
+ private static final Log __log = LogFactory.getLog(INVOKE.class);
+
+ private OInvoke _oinvoke;
+ // Records number of invocations on the activity.
+ private int _invoked;
+ // Date/time of last failure.
+ private Date _lastFailure;
+ // Reason for last failure.
+ private String _failureReason;
+ // Data associated with failure.
+ private Element _failureData;
+
+
+ public INVOKE(ActivityInfo self, ScopeFrame scopeFrame, LinkFrame linkFrame) {
+ super(self, scopeFrame, linkFrame);
+ _oinvoke = (OInvoke) _self.o;
+ _invoked = 0;
}
- ++_invoked;
- // if there is no output variable, then this is a one-way invoke
- boolean isTwoWay = _oinvoke.outputVar != null;
+ public final void run() {
+ Element outboundMsg;
+ try {
+ outboundMsg = setupOutbound(_oinvoke, _oinvoke.initCorrelationsInput);
+ } catch (FaultException e) {
+ __log.error(e);
+ FaultData fault = createFault(e.getQName(), _oinvoke);
+ _self.parent.completed(fault, CompensationHandler.emptySet());
+ return;
+ }
+ ++_invoked;
+
+ // if there is no output variable, then this is a one-way invoke
+ boolean isTwoWay = _oinvoke.outputVar != null;
+
+ try {
+ if (!isTwoWay) {
+ FaultData faultData = null;
+ getBpelRuntimeContext().invoke(
+ _scopeFrame.resolve(_oinvoke.partnerLink),
+ _oinvoke.operation,
+ outboundMsg,
+ null);
+
+ _self.parent.completed(faultData, CompensationHandler.emptySet());
+
+ } else /* two-way */{
+ final VariableInstance outputVar = _scopeFrame
+ .resolve(_oinvoke.outputVar);
+ InvokeResponseChannel invokeResponseChannel = newChannel(InvokeResponseChannel.class);
+
+ final String mexId = getBpelRuntimeContext().invoke(
+ _scopeFrame.resolve(_oinvoke.partnerLink), _oinvoke.operation,
+ outboundMsg,
+ invokeResponseChannel);
+
+ object(new InvokeResponseChannelListener(invokeResponseChannel) {
+ private static final long serialVersionUID = 4496880438819196765L;
+
+ public void onResponse() {
+ // we don't have to write variable data -> this already
+ // happened in the nativeAPI impl
+ FaultData fault = null;
+
+ Element response;
+ try {
+ response = getBpelRuntimeContext().getPartnerResponse(mexId);
+ } catch (Exception ex) {
+ // TODO: Better error handling
+ throw new RuntimeException(ex);
+ }
+
+ getBpelRuntimeContext().initializeVariable(outputVar, response);
+
+ try {
+ for (OScope.CorrelationSet anInitCorrelationsOutput : _oinvoke.initCorrelationsOutput) {
+ initializeCorrelation(_scopeFrame.resolve(anInitCorrelationsOutput), outputVar);
+ }
+ if (_oinvoke.partnerLink.hasPartnerRole()) {
+ // Trying to initialize partner epr based on a message-provided epr/session.
+ if (!getBpelRuntimeContext().isPartnerRoleEndpointInitialized(_scopeFrame
+ .resolve(_oinvoke.partnerLink))) {
+
+ Node fromEpr = getBpelRuntimeContext().getSourceEPR(mexId);
+ if (fromEpr != null) {
+ getBpelRuntimeContext().writeEndpointReference(
+ _scopeFrame.resolve(_oinvoke.partnerLink), (Element) fromEpr);
+ }
+ }
+
+ String partnersSessionId = getBpelRuntimeContext().getSourceSessionId(mexId);
+ if (partnersSessionId != null)
+ getBpelRuntimeContext().initializePartnersSessionId(_scopeFrame.resolve(_oinvoke.partnerLink),
+ partnersSessionId);
+
+ }
+ } catch (FaultException e) {
+ __log.error(e);
+ fault = createFault(e.getQName(), _oinvoke);
+ }
+
+ // TODO update output variable with data from non-initiate
+ // correlation sets
+ _self.parent.completed(fault, CompensationHandler.emptySet());
+ }
+
+ public void onFault() {
+ QName faultName = getBpelRuntimeContext().getPartnerFault(mexId);
+ Element msg = getBpelRuntimeContext().getPartnerResponse(mexId);
+ QName msgType = getBpelRuntimeContext().getPartnerResponseType(
+ mexId);
+ FaultData fault = createFault(faultName, msg,
+ _oinvoke.getOwner().messageTypes.get(msgType), _self.o);
+ _self.parent.completed(fault, CompensationHandler.emptySet());
+ }
- try {
- if (!isTwoWay) {
- FaultData faultData = null;
- getBpelRuntimeContext().invoke(
- _scopeFrame.resolve(_oinvoke.partnerLink),
- _oinvoke.operation,
- outboundMsg,
- null);
-
- _self.parent.completed(faultData, CompensationHandler.emptySet());
-
- } else /* two-way */{
- final VariableInstance outputVar = _scopeFrame
- .resolve(_oinvoke.outputVar);
- InvokeResponseChannel invokeResponseChannel = newChannel(InvokeResponseChannel.class);
-
- final String mexId = getBpelRuntimeContext().invoke(
- _scopeFrame.resolve(_oinvoke.partnerLink), _oinvoke.operation,
- outboundMsg,
- invokeResponseChannel);
-
- object(new InvokeResponseChannelListener(invokeResponseChannel) {
- private static final long serialVersionUID = 4496880438819196765L;
-
- public void onResponse() {
- // we don't have to write variable data -> this already
- // happened in the nativeAPI impl
- FaultData fault = null;
-
- Element response;
- try {
- response = getBpelRuntimeContext().getPartnerResponse(mexId);
- } catch (Exception ex) {
- // TODO: Better error handling
- throw new RuntimeException(ex);
- }
-
- getBpelRuntimeContext().initializeVariable(outputVar, response);
-
- try {
- for (OScope.CorrelationSet anInitCorrelationsOutput : _oinvoke.initCorrelationsOutput) {
- initializeCorrelation(_scopeFrame.resolve(anInitCorrelationsOutput), outputVar);
- }
- if (_oinvoke.partnerLink.hasPartnerRole()) {
- // Trying to initialize partner epr based on a message-provided epr/session.
- if (!getBpelRuntimeContext().isPartnerRoleEndpointInitialized(_scopeFrame
- .resolve(_oinvoke.partnerLink))) {
-
- Node fromEpr = getBpelRuntimeContext().getSourceEPR(mexId);
- if (fromEpr != null) {
- getBpelRuntimeContext().writeEndpointReference(
- _scopeFrame.resolve(_oinvoke.partnerLink), (Element) fromEpr);
+ public void onFailure() {
+ // This indicates a communication failure. We don't throw a fault,
+ // because there is no fault; instead we'll re-incarnate the invoke
+ // and either retry or indicate a failure condition and wait for the
+ // admin to resume the process.
+ INVOKE.this.retryOrFailure(getBpelRuntimeContext().getPartnerFaultExplanation(mexId), null);
}
- }
-
- String partnersSessionId = getBpelRuntimeContext().getSourceSessionId(mexId);
- if (partnersSessionId != null)
- getBpelRuntimeContext().initializePartnersSessionId(_scopeFrame.resolve(_oinvoke.partnerLink),
- partnersSessionId);
-
- }
- } catch (FaultException e) {
- fault = createFault(e.getQName(), _oinvoke);
+ });
}
+ } catch (FaultException fault) {
+ __log.error(fault);
+ FaultData faultData = createFault(fault.getQName(), _oinvoke, fault
+ .getMessage());
+ _self.parent.completed(faultData, CompensationHandler.emptySet());
+ }
+ }
- // TODO update output variable with data from non-initiate
- // correlation sets
- _self.parent.completed(fault, CompensationHandler.emptySet());
- }
+ private Element setupOutbound(OInvoke oinvoke,
+ Collection<OScope.CorrelationSet> outboundInitiations)
+ throws FaultException {
+ if (outboundInitiations.size() > 0) {
+ for (OScope.CorrelationSet c : outboundInitiations) {
+ initializeCorrelation(_scopeFrame.resolve(c), _scopeFrame.resolve(oinvoke.inputVar));
+ }
+ }
- public void onFault() {
- QName faultName = getBpelRuntimeContext().getPartnerFault(mexId);
- Element msg = getBpelRuntimeContext().getPartnerResponse(mexId);
- QName msgType = getBpelRuntimeContext().getPartnerResponseType(
- mexId);
- FaultData fault = createFault(faultName, msg,
- _oinvoke.getOwner().messageTypes.get(msgType), _self.o);
- _self.parent.completed(fault, CompensationHandler.emptySet());
- }
+ Node outboundMsg = getBpelRuntimeContext().fetchVariableData(
+ _scopeFrame.resolve(oinvoke.inputVar), false);
- public void onFailure() {
- // This indicates a communication failure. We don't throw a fault,
- // because there is no fault, instead we'll re-incarnate the invoke
- // and either retry or indicate failure condition.
- // admin to resume the process.
- INVOKE.this.retryOrFailure(getBpelRuntimeContext().getPartnerFaultExplanation(mexId), null);
- }
- });
- }
- } catch (FaultException fault) {
- FaultData faultData = createFault(fault.getQName(), _oinvoke, fault
- .getMessage());
- _self.parent.completed(faultData, CompensationHandler.emptySet());
- }
- }
+ // TODO outbound message should be updated with non-initiate correlation
+ // sets
+ assert outboundMsg instanceof Element;
- private Element setupOutbound(OInvoke oinvoke,
- Collection<OScope.CorrelationSet> outboundInitiations)
- throws FaultException {
- if (outboundInitiations.size() > 0) {
- for (OScope.CorrelationSet c : outboundInitiations) {
- initializeCorrelation(_scopeFrame.resolve(c), _scopeFrame.resolve(oinvoke.inputVar));
- }
+ return (Element) outboundMsg;
}
- Node outboundMsg = getBpelRuntimeContext().fetchVariableData(
- _scopeFrame.resolve(oinvoke.inputVar), false);
-
- // TODO outbound message should be updated with non-initiate correlation
- // sets
- assert outboundMsg instanceof Element;
-
- return (Element) outboundMsg;
- }
-
- private void retryOrFailure(String reason, Element data) {
- _lastFailure = new Date();
- _failureReason = reason;
- _failureData = data;
-
- if (_self.getFailureHandling().faultOnFailure) {
- // No attempt to retry or enter activity recovery state, simply fault.
- FaultData faultData = createFault(FailureHandling.FAILURE_FAULT_NAME, _oinvoke, reason);
- _self.parent.completed(faultData, CompensationHandler.emptySet());
- return;
+ private void retryOrFailure(String reason, Element data) {
+ _lastFailure = new Date();
+ _failureReason = reason;
+ _failureData = data;
+
+ if (_self.getFailureHandling().faultOnFailure) {
+ // No attempt to retry or enter activity recovery state, simply fault.
+ FaultData faultData = createFault(FailureHandling.FAILURE_FAULT_NAME, _oinvoke, reason);
+ _self.parent.completed(faultData, CompensationHandler.emptySet());
+ return;
+ }
+ // If the maximum number of retries has been exceeded, enter activity recovery state.
+ if (_invoked > _self.getFailureHandling().retryFor) {
+ requireRecovery();
+ return;
+ }
+
+ Date future = new Date(new Date().getTime() + (_self.getFailureHandling().retryDelay * 1000));
+ final TimerResponseChannel timerChannel = newChannel(TimerResponseChannel.class);
+ getBpelRuntimeContext().registerTimer(timerChannel, future);
+ object(false, new TimerResponseChannelListener(timerChannel) {
+ public void onTimeout() {
+ instance(INVOKE.this);
+ }
+ public void onCancel() {
+ INVOKE.this.requireRecovery();
+ }
+ }.or(new TerminationChannelListener(_self.self) {
+ public void terminate() {
+ _self.parent.completed(null, CompensationHandler.emptySet());
+ object(new TimerResponseChannelListener(timerChannel) {
+ public void onTimeout() { }
+ public void onCancel() { }
+ });
+ }
+ }));
}
- // If maximum number of retries, enter activity recovery state.
- if (_invoked > _self.getFailureHandling().retryFor) {
- requireRecovery();
- return;
+
+ private void requireRecovery() {
+ sendEvent(new ActivityFailureEvent(_failureReason));
+ final ActivityRecoveryChannel recoveryChannel = newChannel(ActivityRecoveryChannel.class);
+ getBpelRuntimeContext().registerActivityForRecovery(recoveryChannel, _self.aId, _failureReason, _lastFailure, _failureData,
+ new String[] { "retry", "cancel", "fault" }, _invoked - 1);
+ object(false, new ActivityRecoveryChannelListener(recoveryChannel) {
+ public void retry() {
+ sendEvent(new ActivityRecoveryEvent("retry"));
+ getBpelRuntimeContext().unregisterActivityForRecovery(recoveryChannel);
+ instance(INVOKE.this);
+ }
+ public void cancel() {
+ sendEvent(new ActivityRecoveryEvent("cancel"));
+ getBpelRuntimeContext().unregisterActivityForRecovery(recoveryChannel);
+ _self.parent.completed(null, CompensationHandler.emptySet());
+ }
+ public void fault(FaultData faultData) {
+ sendEvent(new ActivityRecoveryEvent("fault"));
+ getBpelRuntimeContext().unregisterActivityForRecovery(recoveryChannel);
+ // TODO: real fault name.
+ if (faultData == null)
+ faultData = createFault(FailureHandling.FAILURE_FAULT_NAME, _self.o, _failureReason);
+ _self.parent.completed(faultData, CompensationHandler.emptySet());
+ }
+ }.or(new TerminationChannelListener(_self.self) {
+ public void terminate() {
+ getBpelRuntimeContext().unregisterActivityForRecovery(recoveryChannel);
+ _self.parent.completed(null, CompensationHandler.emptySet());
+ }
+ }));
}
-
- Date future = new Date(new Date().getTime() + (_self.getFailureHandling().retryDelay * 1000));
- final TimerResponseChannel timerChannel = newChannel(TimerResponseChannel.class);
- getBpelRuntimeContext().registerTimer(timerChannel, future);
- object(false, new TimerResponseChannelListener(timerChannel) {
- public void onTimeout() {
- instance(INVOKE.this);
- }
- public void onCancel() {
- INVOKE.this.requireRecovery();
- }
- }.or(new TerminationChannelListener(_self.self) {
- public void terminate() {
- _self.parent.completed(null, CompensationHandler.emptySet());
- object(new TimerResponseChannelListener(timerChannel) {
- public void onTimeout() { }
- public void onCancel() { }
- });
- }
- }));
- }
-
- private void requireRecovery() {
- sendEvent(new ActivityFailureEvent(_failureReason));
- final ActivityRecoveryChannel recoveryChannel = newChannel(ActivityRecoveryChannel.class);
- getBpelRuntimeContext().registerActivityForRecovery(recoveryChannel, _self.aId, _failureReason, _lastFailure, _failureData,
- new String[] { "retry", "cancel", "fault" }, _invoked - 1);
- object(false, new ActivityRecoveryChannelListener(recoveryChannel) {
- public void retry() {
- sendEvent(new ActivityRecoveryEvent("retry"));
- getBpelRuntimeContext().unregisterActivityForRecovery(recoveryChannel);
- instance(INVOKE.this);
- }
- public void cancel() {
- sendEvent(new ActivityRecoveryEvent("cancel"));
- getBpelRuntimeContext().unregisterActivityForRecovery(recoveryChannel);
- _self.parent.completed(null, CompensationHandler.emptySet());
- }
- public void fault(FaultData faultData) {
- sendEvent(new ActivityRecoveryEvent("fault"));
- getBpelRuntimeContext().unregisterActivityForRecovery(recoveryChannel);
- // TODO: real fault name.
- if (faultData == null)
- faultData = createFault(FailureHandling.FAILURE_FAULT_NAME, _self.o, _failureReason);
- _self.parent.completed(faultData, CompensationHandler.emptySet());
- }
- }.or(new TerminationChannelListener(_self.self) {
- public void terminate() {
- getBpelRuntimeContext().unregisterActivityForRecovery(recoveryChannel);
- _self.parent.completed(null, CompensationHandler.emptySet());
- }
- }));
- }
}