Posted to commits@ode.apache.org by Apache Wiki <wi...@apache.org> on 2006/06/20 01:04:43 UTC

[Ode Wiki] Update of "Jacob" by MatthieuRiou

The following page has been changed by MatthieuRiou:
http://wiki.apache.org/ode/Jacob

New page:
= Introduction =

PXE's BPEL implementation relies on the JACOB framework to implement the BPEL constructs. The framework provides the mechanism necessary to deal with two key issues in implementing BPEL constructs:

  1. Persistence of execution state.
  1. Concurrency.

By rolling up these concerns in the framework, the implementation of the BPEL constructs can be simpler by limiting itself to implementing the BPEL logic and not the infrastructure necessary to support it.

= Rationale behind the model =

Let's start from the most classical example of all:

{{{#!java
void process(order) {
  billingService.bill(order.billing);
  shippingService.ship(order.product, order.shipping, self);
  shipping = receive("shipping");
  order.customer.send(shipping.details);
}
}}}

It's Java-like pseudocode that works extremely well, unless:

  1. You fail somewhere in the middle and the customer has to order a second time.
  1. You have too many waiting threads and the VM crashes.

So you change it to:

{{{#!java
void process(order) {
  billingService.bill(order.billing);
  shippingService.ship(order.product, order.shipping, self);
  listenOn("shipping", part2);
}
}}}

{{{#!java
void part2(shipping) {
  order.customer.send(shipping.details);
}
}}}

That's better, but there are still many points of failure where you can't tell whether you actually billed the customer and shipped the product. So:

{{{#!java
void process(order) {
  billingService.bill(order.billing);
  continue("part2");
}

void part2() {
  shippingService.ship(order.product, order.shipping, self);
  listenOn("shipping", part3);
}

void part3(shipping) {
  order.customer.send(shipping.details);
}
}}}

You're just fracturing the code with two primitives: continue, which lets you persist state and move on to the next step, and listenOn, which lets you persist state and wait for an external event.
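
As a rough illustration of these two primitives, here is a minimal, self-contained sketch. Everything in it is an assumption made for the example: StepEngine, continueWith, deliver and the in-memory journal (which only stands in for a persistent store) are hypothetical names, not part of PXE or JACOB, and the third step is called part3 simply to keep the step names distinct.

{{{#!java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/** Hypothetical mini engine: none of these names come from PXE or JACOB. */
class StepEngine {
  private final Map<String, Consumer<String>> steps = new HashMap<>();
  private final Deque<String> ready = new ArrayDeque<>();      // steps scheduled by continueWith
  private final Map<String, String> waiting = new HashMap<>(); // event name -> step name
  final List<String> journal = new ArrayList<>();              // stands in for a persistent store

  void define(String name, Consumer<String> body) { steps.put(name, body); }

  /** "continue": record the next step, then return so the current step can end. */
  void continueWith(String step) {
    journal.add("continue:" + step);
    ready.add(step);
  }

  /** "listenOn": record which step handles the event, then return. */
  void listenOn(String event, String step) {
    journal.add("listenOn:" + event + "->" + step);
    waiting.put(event, step);
  }

  /** Runs scheduled steps until none is ready. */
  void run() {
    while (!ready.isEmpty()) {
      steps.get(ready.poll()).accept(null);
    }
  }

  /** Delivers an external event to the step that was listening for it. */
  void deliver(String event, String payload) {
    String step = waiting.remove(event);
    if (step != null) {
      steps.get(step).accept(payload);
      run();
    }
  }
}

class OrderDemo {
  static List<String> runOrder() {
    List<String> log = new ArrayList<>();
    StepEngine engine = new StepEngine();
    engine.define("process", ignored -> { log.add("bill"); engine.continueWith("part2"); });
    engine.define("part2", ignored -> { log.add("ship"); engine.listenOn("shipping", "part3"); });
    engine.define("part3", details -> log.add("notify customer: " + details));
    engine.continueWith("process");
    engine.run();                              // runs process and part2, then parks
    engine.deliver("shipping", "tracking-42"); // the external event resumes part3
    return log;
  }

  public static void main(String[] args) {
    System.out.println(runOrder());
  }
}
}}}

Each step returns as soon as it has recorded what should happen next, so no thread is ever blocked waiting, and the journal entry is the point where a real engine could write its state to disk.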

Fracturing the code also addresses concurrency of execution even if you only have one thread. For example you could have a large number of 'process' calls to bill and ship an order. As we've broken the whole processing down into several small parts, we can control when each of these parts actually gets called and executed, and we're free to order them as we like.

Let's say we have a small process doing in a sequence:
{{{
1. Invoke
2. Receive
3. Wait
4. Invoke
}}}
If we have 2 parallel executions of this process and implement it in a simple way, we'd have:
{{{
1. Invoke1
2. Receive1
3. Wait1
4. Invoke1
5. Invoke2
6. Receive2
7. Wait2
8. Invoke2
}}}
However, if we break down the code as shown above by introducing a "middle man" (or stack) and do not allow activities to call each other directly (that is, the activity does not directly call the sequence, which in turn directly calls the next activity), we could well obtain the following:
{{{
1. Invoke1
5. Invoke2
2. Receive1
3. Wait1
6. Receive2
7. Wait2
4. Invoke1
8. Invoke2
}}}
From a client standpoint, we've achieved concurrency of execution even with one thread.

Next step is adding links, fault handling/termination, compensation and event handlers and seeing that continue/listenOn is all you need. The last step is just adding implementation details.
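
The single-threaded interleaving described in this section can be sketched with a plain queue playing the "middle man". This is only an illustration under assumptions: InterleaveDemo and Task are hypothetical names, and a FIFO queue gives a strict round-robin, whereas a real scheduler is free to pick another order such as the one listed above.

{{{#!java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical sketch: two process instances interleaved by one thread. */
class InterleaveDemo {
  static final String[] STEPS = {"Invoke", "Receive", "Wait", "Invoke"};

  /** One pending step of one process instance. */
  static final class Task {
    final int instance;
    final int step;
    Task(int instance, int step) { this.instance = instance; this.step = step; }
  }

  static List<String> run(int instances) {
    Deque<Task> queue = new ArrayDeque<>();
    List<String> trace = new ArrayList<>();
    for (int i = 1; i <= instances; i++) {
      queue.add(new Task(i, 0));
    }
    while (!queue.isEmpty()) {
      Task t = queue.poll();
      trace.add(STEPS[t.step] + t.instance);
      // Instead of calling the next step directly, hand it back to the queue.
      if (t.step + 1 < STEPS.length) {
        queue.add(new Task(t.instance, t.step + 1));
      }
    }
    return trace;
  }

  public static void main(String[] args) {
    System.out.println(run(2));
  }
}
}}}

Because no step ever calls its successor, the loop in run() is the only place that decides what executes next, which is what makes the reordering possible.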

= Jacob Example =

Consider the issue of persistence. Imagine a simplified and naive implementation of the BPEL constructs <sequence>, <wait>, and <empty>:

{{{#!java
class Sequence extends Activity {
  /** From BPEL Definition */
  OSequence def;
  ...
  void run() {
     for (OActivity child:def.children)
       createActivity(child).run();
  }
}

class Wait extends Activity {
  /** From BPEL Definition */
  OWait def;
  ...
  void run() {
     Thread.sleep(def.duration);
  }
}

class Empty extends Activity {
  /** From BPEL Definition */
  OEmpty def;
  ...
  void run() {
     // <empty> activity: do nothing.
  }
}
}}}

The above is the "natural" implementation of the constructs in the Java language. However, this implementation has some serious and obvious problems: the <wait> may specify a duration of days, in which case a system failure during those days of waiting would mean that the process execution state is lost. It would be nice to replace Thread.sleep() in the Wait class with some sort of "suspend" operator that would save the execution state to disk.

However there are practical issues to "suspending" to disk. At the point we wish to suspend, the call stack looks like:
{{{
Sequence.run()
Wait.run()
}}}

In order to save the process to disk, we need to end the current thread of control which means popping both stack frames. To do this we have no choice but to require the implementation of Wait and Sequence to "cooperate" with this requirement thereby greatly complicating the implementation of those constructs. This also means that the "natural" implementation model cannot be used directly.

JACOB aims to solve this problem by providing an alternate "natural" model that allows execution state to be suspended /without cooperation from the implementation classes/. The idea in JACOB is to flatten the call stack and rely on explicit communication channels to handle control flow. We now consider a simplified JACOB representation of our three BPEL activities:

{{{#!java
class Empty  {
  OEmpty def;

  /** channel we use to notify parent we are done. */
  CompletionChannel myCompletionChannel;
  ...
  void self() {
     // <empty> activity: do nothing...except to
     // notify our parent that we are done.
     myCompletionChannel.completed();
  }
}

class Sequence  {
  OSequence def;
  CompletionChannel myCompletionChannel;
  ...
  void self() {
    // Start off by instantiating a sequential child runner for the first
    // (0th) child...
    instance(new SequenceChildRunner(0));
  }

  class SequenceChildRunner {
    int currentChild;
    SequenceChildRunner(int childNumber) { currentChild = childNumber; }
    void self() {
       if(currentChild == def.children.size()) {
         // We are past the last child, the sequence is done.
         myCompletionChannel.completed();
       } else {
         // We still have more children to run..
  
         // Create a completion channel for our child.
         CompletionChannel childCompletionChannel = 
            newChannel(CompletionChannel.class);

         // create a child activity based on the activity type
         // and parameterized with the model and the completion
         // channel we just created
         Activity childActivity = 
            createActivity(def.children.get(currentChild),
                           childCompletionChannel);

         // instantiate the child activity
         instance(childActivity);

         // create an object to wait for the "completed()" notification
         // from the child activity.
         object(new CompletionChannelML(childCompletionChannel) {
            void completed() {
               // Ok, finished with the child, create a runner
               // to do the next child.
               instance(new SequenceChildRunner(currentChild+1));
            }
         });
       }
    }
  }
}

class Wait extends Activity {
  OWait def;
  CompletionChannel myCompletionChannel;
   
  ...
  void self() {
     // Create a channel for an externally managed alarm.
     TimerChannel timerChannel = newChannel(TimerChannel.class);
     // register the alarm with the runtime.
     getRuntimeContext().registerTimer(timerChannel, def.duration);

     // create an object to wait for the alarm and then send an
     // activity completed message to our parent.
     object(new TimerChannelML(timerChannel) {
        void onTimer() {
           myCompletionChannel.completed();
        }
     });
   
  }
}
}}}

So the Jacob constructs help us break up the execution stack.

= Main Jacob Concepts =

== Channels ==

As briefly demonstrated above, channels are interfaces used for communication between activities in the PXE engine. There are several types of channels, such as TerminationChannel, ParentScopeChannel or CompensationChannel (their respective purposes should be obvious from their names). Some basic channels are provided to all activities when they're created to allow them to interact with their environment. For example, when an activity wants to notify its parent that it has completed, it just calls the channel it was given for its parent (see the Empty example above, which calls myCompletionChannel.completed()).

Don't look for channel implementations because there are none. The implementation of a channel is provided through a dynamic proxy (see ChannelFactory.createChannel() and ChannelFactory.ChannelInvocationHandler for more). That's one of the levels of decoupling between invocation and actual execution in Jacob.
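
To give an idea of how a channel can exist without an implementation class, here is a minimal sketch built on the standard java.lang.reflect.Proxy. It does not reproduce ChannelFactory: the ResultChannel interface, the newChannel helper and the pending queue are hypothetical and only show the principle that a call on the channel is recorded rather than executed.

{{{#!java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical channel interface, modeled on the simplified examples above. */
interface ResultChannel {
  void completed();
}

class ProxyChannelDemo {
  /** Recorded invocations; stands in for the engine's internal queue. */
  static final Deque<String> pending = new ArrayDeque<>();

  /** Creates a channel of the given interface type with no hand-written implementation. */
  static <T> T newChannel(Class<T> type, String id) {
    InvocationHandler handler = (proxy, method, args) -> {
      // Nothing is executed here: the call is only recorded for later matching.
      pending.add(id + "." + method.getName());
      return null;
    };
    Object channel = Proxy.newProxyInstance(
        type.getClassLoader(), new Class<?>[] {type}, handler);
    return type.cast(channel);
  }

  public static void main(String[] args) {
    ResultChannel channel = newChannel(ResultChannel.class, "child-1");
    channel.completed(); // returns immediately, the invocation is just queued
    System.out.println(pending);
  }
}
}}}

The caller sees an ordinary Java interface, while the handler decides when (and whether) anything actually runs in response.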

== JavaClosure / Abstraction ==

From Wikipedia: "A closure combines the code of a function with a special lexical environment bound to that function (scope). Closure lexical variables differ from global variables in that they do not occupy the global variable namespace. They differ from object oriented object variables in that they are bound to functions, not objects." Normally closures aren't supported in Java, so JavaClosure tries to fill that gap. It's not a true closure anyway, which makes things easier: closures in Jacob are statically coded, whereas in most languages supporting closures they are dynamic. So basically, in Jacob, a closure is expected to implement some methods, and it provides other utility methods to manipulate channels and replicate itself.

Abstraction is just a closure that requires the implementation of only one method: self(). As /all activities inherit from Abstraction/, they're all supposed to implement their main processing in this self() method. Their initialization occurs in their respective constructors.

== Method Lists (MLs) ==

ML classes can be seen as the other end of a channel. Only they're not invoked directly when one calls a channel method, but only once the Jacob engine has popped the channel invocation from its internal stack (again you can see how the execution stack gets broken here).

Usually ML implementations in PXE are inlined because it's just easier to declare them in the activity's self() method. For example, if you look at the Sequence example shown above you'll see something like:

{{{#!java
    void self() {
	...
         // create an object to wait for the "completed()" notification
         // from the child activity.
         object(new CompletionChannelML(childCompletionChannel) {
            void completed() {
               // Ok, finished with the child, create a runner
               // to do the next child.
               instance(new SequenceChildRunner(currentChild+1));
            }
         });
       }
    }
}}}

The object method here is inherited from JavaClosure and is just a way to hand our ML to Jacob, so that the Jacob runtime can match it with an incoming channel message later on.

== VPU and Soup ==

The VPU is where all the Jacob processing occurs. When a JavaClosure is injected into the VPU, it's actually registered as a Reaction, which just wraps the closure together with the method to call on the closure to execute it (in our case always the self() method, as we're only dealing with Abstraction instances).

The Soup is just a container for all the artifacts managed by the VPU (mostly channels and reactions) to organize them in queues where artifacts can be pushed and popped. It also records some execution statistics.

So the VPU's main processing is just dequeuing a reaction from the soup and executing it. That's all (check JacobVPU.execute(), you'll see that I'm not lying). However, when the Abstraction (usually an activity) gets executed, the following things can happen:

* if other abstractions (usually other activities) are created, they will be appended to the reaction queue,
* if new channels are created, they will be saved for later usage,
* if channels are invoked, the message will be saved to match against a new ML,
* if a new ML instance is created, it will be submitted to the VPU that will try to match it against a channel invocation.
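
The loop and the four cases above can be condensed into a small sketch. It is a deliberately reduced model, not the JacobVPU API: MiniVpu, inject, send and listen are hypothetical names, channels are plain strings, and only one listener per channel is supported.

{{{#!java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/** Hypothetical miniature of the loop described above; not the JacobVPU API. */
class MiniVpu {
  private final Deque<Runnable> reactions = new ArrayDeque<>();
  private final Map<String, Deque<String>> messages = new HashMap<>();     // sent, not yet matched
  private final Map<String, Consumer<String>> listeners = new HashMap<>(); // waiting MLs

  /** Like instance(...): queue an abstraction for a later turn. */
  void inject(Runnable abstraction) { reactions.add(abstraction); }

  /** Channel invocation: match a waiting listener, or store the message. */
  void send(String channel, String message) {
    Consumer<String> ml = listeners.remove(channel);
    if (ml != null) {
      reactions.add(() -> ml.accept(message));
    } else {
      messages.computeIfAbsent(channel, k -> new ArrayDeque<>()).add(message);
    }
  }

  /** Like object(...): match a stored message, or wait for one. */
  void listen(String channel, Consumer<String> ml) {
    Deque<String> stored = messages.get(channel);
    if (stored != null && !stored.isEmpty()) {
      String message = stored.poll();
      reactions.add(() -> ml.accept(message));
    } else {
      listeners.put(channel, ml);
    }
  }

  /** Dequeues one reaction and runs it. Returns false when the queue is empty. */
  boolean execute() {
    Runnable next = reactions.poll();
    if (next == null) {
      return false;
    }
    next.run();
    return true;
  }
}

class MiniVpuDemo {
  static List<String> run() {
    List<String> trace = new ArrayList<>();
    MiniVpu vpu = new MiniVpu();
    // Parent: start a child, then register a listener for its completion.
    vpu.inject(() -> {
      trace.add("parent starts child");
      vpu.inject(() -> { trace.add("child runs"); vpu.send("child.done", "ok"); });
      vpu.listen("child.done", msg -> trace.add("parent sees " + msg));
    });
    while (vpu.execute()) {
      // one reaction per turn
    }
    return trace;
  }

  public static void main(String[] args) {
    System.out.println(run());
  }
}
}}}

Note that the match itself produces a new reaction instead of calling the listener on the spot, so the sender's stack frame is always gone before the receiver runs.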

There's one more thing that should be mentioned here. Reactions (and hence Abstractions) don't "stay" in the VPU queues. They just get popped, executed and that's it. So if an abstraction must last more than one execution, it should simply fork itself. This explains why in our Sequence example already pasted above we see the line:

{{{#!java
   instance(new SequenceChildRunner(currentChild+1));
}}}

This simply adds a new SequenceChildRunner that will monitor the completion of the next child. If you browse PXE's activity code you will even find things like instance(this), which directly enqueues a new instance of the same Jacob abstraction.

= Walking through examples =

== While ==

{{{#!xml
<process name="while1" 
    targetNamespace="http://pxe/bpel/unit-test" 
    xmlns:bpws="http://schemas.xmlsoap.org/ws/2003/03/business-process/"
    xmlns="http://schemas.xmlsoap.org/ws/2003/03/business-process/"
    xmlns:tns="http://pxe/bpel/unit-test"
    xmlns:xsd="http://www.w3.org/2001/XMLSchema"
    xmlns:test="http://pxe/bpel/unit-test.wsdl" 
    suppressJoinFailure="yes">
   <partnerLinks>
      <partnerLink name="testPartnerLink" 
         partnerLinkType="test:TestPartnerLinkType" 
         myRole="me" />
   </partnerLinks>

   <variables>
     <variable name="var1" messageType="test:TestMessage2"/>
   </variables>

   <sequence>
       <receive 
          createInstance="yes"
          name="startReceive"
          partnerLink="testPartnerLink"
          portType="test:TestPortType"
          operation="testOperation"
          variable="var1"/>
      <while condition="bpws:getVariableData('var1', 'TestPart') &lt; 10">
        <assign>
           <copy>
             <from expression="bpws:getVariableData('var1', 'TestPart') + 1"/>
             <to variable="var1" part="TestPart"/>
          </copy>
        </assign>
       </while>
       <reply name="endReply" 
              operation="testOperation" 
              partnerLink="testPartnerLink"
              portType="test:TestPortType" 
              variable="var1"/>
   </sequence>
</process>
}}}

Everything starts with a receive. So our entry point here in our Jacob-focused discussion is going to be [BpelProcess.PartnerLinkMyRoleImpl.inputMsgRcvd|http://svn.apache.org/repos/asf/incubator/ode/scratch/pxe/bpel-runtime/src/main/java/com/fs/pxe/bpel/runtime/BpelProcess.java]. The code that matters to us now is the following (executed when a message is targeted at a createInstance receive):

{{{#!java
    BpelRuntimeContextImpl instance = createRuntimeContext(newInstance, new PROCESS(_oprocess), messageExchange);
    ...
    // run the vpu
    instance.execute();
}}}

If you check the code executed by BpelRuntimeContextImpl constructor you'll see among other things the following:

{{{#!java
    if (PROCESS != null) {
      vpu.inject(PROCESS);
    }
}}}

The process itself gets injected. When executed, PROCESS just instantiates a scope to control the execution of its child activity and starts listening on its compensation and completion channels. From the process we go to a scope, then our main sequence and finally our receive.

A receive is just mapped to a pick onMessage, so its Jacob implementation is found in PICK. PICK is just about isolating the right correlations, selecting a message for them, then waiting for the message. In our createInstance case we're more interested in the following code, located in BpelRuntimeContextImpl.select() (and called by PICK):

{{{#!java
    if (_instantiatingMessageExchange != null && _dao.getState() == ProcessState.STATE_READY) {
      for (int i = 0 ; i < correlators.size(); ++i) {
        CorrelatorDAO ci = correlators.get(i);
        if (ci.equals(_dao.getInstantiatingCorrelator())) {
          inputMsgMatch(pickResponseChannelStr, i, _instantiatingMessageExchange);
          return;
        }
      }
    }
}}}

Which just happens to call something like:

{{{#!java
    vpu.inject(new Abstraction() {
        public void self() {
            PickResponseChannel responseChannel = importChannel(responsechannel, PickResponseChannel.class);
            responseChannel.onRequestRcvd(idx, mex);
        }
    });
}}}

That's where things really start. When injected, this abstraction just calls the response channel for our receive. The other side of this channel is implemented as an ML in the PICK:

{{{#!java
    object(false, new PickResponseML(_pickResponseChannel) {
        public void onRequestRcvd(int selectorIdx, Object msgex) {
            ...
          ActivityInfo child = new ActivityInfo(genMonotonic(), onMessage.activity, _self.self, _self.parent);
          instance(createChild(child,_scopeFrame,_linkFrame));
        }
    });
}}}

This method just does what a receive needs to do (like variable and correlation initialization) and creates a new child. When dealing with a real pick, this child would be the onMessage activity; in the case of a receive, however, it is an empty activity. So when does our receive complete? Well, when the child completes. As you can see in the child's ActivityInfo constructor call, we're passing it the same parent channel (_self.parent) that we've been given. So when the child completes, the receive's parent is notified directly, which means our receive doesn't need to do it itself. And an empty completes immediately:

{{{#!java
    _self.parent.completed(null, CompensationHandler.emptySet());
}}}

The parent sequence gets notified almost immediately after the onRequestRcvd() method finishes.

Now how does our sequence get control back? Well, once again, let's look at the ML, the other side of the channel. As one of the most important jobs of the VPU is matching channel invocations and MLs, we'll get to the sequence by its ParentScopeML implementation:

{{{#!java
class SEQUENCE extends ACTIVITY {
  ...
  private class ACTIVE extends BpelAbstraction {
    ....
    public void self() {
      ...
      object(new ParentScopeML(_child.parent) {
        public void compensate(OScope scope, SynchChannel ret) {
          _self.parent.compensate(scope,ret);
          instance(ACTIVE.this);
        }

        public void completed(FaultData faultData, Set<CompensationHandler> compensations) {
          HashSet<CompensationHandler> comps = new HashSet<CompensationHandler>(_compensations);
          comps.addAll(compensations);
          if (faultData != null || _terminateRequested || _remaining.size() <= 1) {
            _self.parent.completed(faultData, comps);
          } else /* !fault && ! terminateRequested && !remaining.isEmpty */ {
            ArrayList<OActivity> remaining = new ArrayList<OActivity>(_remaining);
            remaining.remove(0);
            instance(new SEQUENCE(_self, _scopeFrame, _linkFrame, remaining, comps));
          }
        }
      });
    }
  }
  ...
}
}}}

The method that will get executed is of course the completed() method. It simply notifies the sequence's own parent if a fault has been thrown, a termination has been requested, or no further child activities remain. Being of an optimistic nature, we'll check what happens when everything goes just fine. In that case the activity that just completed is removed from the remaining list and the SEQUENCE abstraction itself is reinstantiated. Which leads us to what the SEQUENCE does:
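
Before looking at it, the decision taken in completed() can be isolated in a few lines. This is only a sketch under assumptions: SequenceStepDemo and onChildCompleted are hypothetical names, activities are reduced to strings, and a null return stands in for notifying the parent. As far as the snippet above shows, the head of the remaining list is the child that has just completed, which would explain why the test is size() <= 1 rather than isEmpty().

{{{#!java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical reduction of the completed() decision to a pure function. */
class SequenceStepDemo {
  /**
   * Returns the activities the next SEQUENCE instance would receive,
   * or null when the sequence should notify its parent instead.
   * remaining.get(0) is assumed to be the child that just completed.
   */
  static List<String> onChildCompleted(List<String> remaining,
                                       boolean fault,
                                       boolean terminateRequested) {
    // Any one of the three conditions is enough to stop.
    if (fault || terminateRequested || remaining.size() <= 1) {
      return null;
    }
    List<String> next = new ArrayList<>(remaining);
    next.remove(0); // drop the child that just completed
    return next;
  }

  public static void main(String[] args) {
    List<String> remaining = List.of("receive", "while", "reply");
    while (remaining != null) {
      System.out.println("running " + remaining.get(0));
      remaining = onChildCompleted(remaining, false, false);
    }
    System.out.println("sequence completed");
  }
}
}}}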

{{{#!java
  public void self() {
    final ActivityInfo child = new  ActivityInfo(genMonotonic(),
            _remaining.get(0),
            newChannel(TerminationChannel.class), newChannel(ParentScopeChannel.class));
    instance(createChild(child, _scopeFrame, _linkFrame));
    instance(new ACTIVE(child));
  }
}}}

As you can see, it just instantiates the next child abstraction and also another abstraction named ACTIVE. So what's this ACTIVE that we've already seen a bit before? Well, it's just the abstraction that keeps on following child activities when they execute. It's more of a convention for all containment-based activities in PXE (while, sequence, pick, ...) that the main activity abstraction just kicks off the processing. Then an ACTIVE (also sometimes called WAITER) abstraction takes care of following the children.

Rolling on to the next step, we've just instantiated an abstraction for the while in our example process, as it's the next child of the sequence. So what happens there?

{{{#!java
  public void self() {
    boolean condResult = false;
    try {
      condResult = checkCondition();
    } catch (FaultException fe) {
      _self.parent.completed(createFault(fe.getQName(), _self.o),_compHandlers);
      return;
    }
    if (condResult) {
      ActivityInfo child = new ActivityInfo(genMonotonic(),
              getOWhile().activity,
              newChannel(TerminationChannel.class), newChannel(ParentScopeChannel.class));
      instance(createChild(child, _scopeFrame, _linkFrame));
      instance(new WAITER(child));
    } else /* stop. */ {
      _self.parent.completed(null, _compHandlers);
    }
  }
}}}

Now you should be getting more familiar with that sort of code. First step is evaluating the while condition. If it turns out it's true, then a child abstraction gets created as well as a WAITER to follow its execution. The WAITER implementation is again pretty straightforward:

{{{#!java
  private class WAITER extends BpelAbstraction {
    private ActivityInfo _child;
    private boolean _terminated;

    WAITER(ActivityInfo child) {
      _child = child;
    }

    public void self() {
      object(false, new TerminationML(_self.self) {
        public void terminate() {
          _terminated = true;
          replication(_child.self).terminate();
          instance(WAITER.this);
        }
      }.or(new ParentScopeML(_child.parent) {
        public void compensate(OScope scope, SynchChannel ret) {
          _self.parent.compensate(scope,ret);
          instance(WAITER.this);
        }
        public void completed(FaultData faultData, Set<CompensationHandler> compensations) {
          _compHandlers.addAll(compensations);
          if (_terminated || faultData != null)
            _self.parent.completed(faultData, compensations);
          else
            instance(WHILE.this);
        }
      }));
    }
  }
}}}

Termination and compensation aren't doing anything really interesting. Completion, just like for the sequence, re-instantiates the WHILE abstraction. And that's how we get our loop, by re-instantiating the main WHILE abstraction (again evaluating the condition and creating a child if it's true).
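
To make the shape of that loop concrete, here is a minimal sketch in which the loop exists only because an abstraction puts itself back on a queue. It is not PXE code: WhileLoopDemo and its methods are hypothetical, the queue of Runnable stands in for the VPU, and the child and its WAITER are merged into a single step for brevity.

{{{#!java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical sketch of a loop built purely from re-instantiation. */
class WhileLoopDemo {
  private final Deque<Runnable> queue = new ArrayDeque<>();
  private int var1;
  private int whileInstances = 0;
  private boolean completed = false;

  WhileLoopDemo(int start) { var1 = start; }

  /** Stand-in for WHILE.self(): test the condition, then run the child or finish. */
  private void whileSelf() {
    whileInstances++;
    if (var1 < 10) {
      queue.add(this::assignChild);
    } else {
      completed = true; // stands in for _self.parent.completed(...)
    }
  }

  /** Stand-in for the assign child plus the WAITER's completed() callback. */
  private void assignChild() {
    var1 = var1 + 1;
    queue.add(this::whileSelf); // like instance(WHILE.this)
  }

  /** Runs to completion and returns how many times the while was instantiated. */
  int run() {
    queue.add(this::whileSelf);
    while (!queue.isEmpty()) {
      queue.poll().run();
    }
    return whileInstances;
  }

  int value() { return var1; }

  boolean isCompleted() { return completed; }

  public static void main(String[] args) {
    WhileLoopDemo demo = new WhileLoopDemo(7);
    System.out.println(demo.run() + " instantiations, var1 = " + demo.value());
  }
}
}}}

There is no Java loop around the condition itself: each pass ends, and the next one only happens because a fresh reaction was queued.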

Finally, when the while condition becomes false, it notifies its parent channel. The sequence then goes to our last activity: reply. As expected, the reply replies, just sending the variable content and notifying its parent of completion. The sequence has no more children to execute, so it also notifies its own parent, which is the process. We then just declare the process to be completed and that's it! We're done!