You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ode.apache.org by ha...@apache.org on 2013/01/19 03:23:56 UTC

svn commit: r1435467 - in /ode/trunk: bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/ jacob/src/main/java/org/apache/ode/jacob/ jacob/src/main/java/org/apache/ode/jacob/vpu/

Author: hadrian
Date: Sat Jan 19 02:23:56 2013
New Revision: 1435467

URL: http://svn.apache.org/viewvc?rev=1435467&view=rev
Log:
ODE-987. Added a CompositeProcess so we could consolidate the object() methods to only one

Added:
    ode/trunk/jacob/src/main/java/org/apache/ode/jacob/CompositeProcess.java   (with props)
Modified:
    ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/ACTIVITYGUARD.java
    ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/EH_ALARM.java
    ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/INVOKE.java
    ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/PICK.java
    ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/REPEATUNTIL.java
    ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/SEQUENCE.java
    ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/WAIT.java
    ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/WHILE.java
    ode/trunk/jacob/src/main/java/org/apache/ode/jacob/ChannelListener.java
    ode/trunk/jacob/src/main/java/org/apache/ode/jacob/ProcessUtil.java
    ode/trunk/jacob/src/main/java/org/apache/ode/jacob/ReceiveProcess.java
    ode/trunk/jacob/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java

Modified: ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/ACTIVITYGUARD.java
URL: http://svn.apache.org/viewvc/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/ACTIVITYGUARD.java?rev=1435467&r1=1435466&r2=1435467&view=diff
==============================================================================
--- ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/ACTIVITYGUARD.java (original)
+++ ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/ACTIVITYGUARD.java Sat Jan 19 02:23:56 2013
@@ -52,6 +52,9 @@ import org.apache.ode.jacob.ReceiveProce
 import org.apache.ode.jacob.Synch;
 import org.w3c.dom.Element;
 
+import static org.apache.ode.jacob.ProcessUtil.compose;
+
+
 class ACTIVITYGUARD extends ACTIVITY {
     private static final long serialVersionUID = 1L;
 
@@ -292,7 +295,7 @@ class ACTIVITYGUARD extends ACTIVITY {
                     getBpelRuntimeContext().registerActivityForRecovery(
                         recoveryChannel, _self.aId, _failure.reason, _failure.dateTime, _failure.data,
                         new String[] { "retry", "cancel", "fault" }, _failure.retryCount);
-                    object(false, new ReceiveProcess<ActivityRecovery>(recoveryChannel, new ActivityRecovery() {
+                    object(false, compose(new ReceiveProcess<ActivityRecovery>(recoveryChannel, new ActivityRecovery() {
                         public void retry() {
                             if (__log.isDebugEnabled())
                                 __log.debug("ActivityRecovery: Retrying activity " + _self.aId + " (user initiated)");
@@ -319,7 +322,7 @@ class ACTIVITYGUARD extends ACTIVITY {
                         }
                     }){
                         private static final long serialVersionUID = 8397883882810521685L;
-                    }.or(new ReceiveProcess<Termination>(_self.self, new Termination() {
+                    }).or(new ReceiveProcess<Termination>(_self.self, new Termination() {
                         public void terminate() {
                             if (__log.isDebugEnabled())
                                 __log.debug("ActivityRecovery: Cancelling activity " + _self.aId + " (terminated by scope)");

Modified: ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/EH_ALARM.java
URL: http://svn.apache.org/viewvc/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/EH_ALARM.java?rev=1435467&r1=1435466&r2=1435467&view=diff
==============================================================================
--- ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/EH_ALARM.java (original)
+++ ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/EH_ALARM.java Sat Jan 19 02:23:56 2013
@@ -35,10 +35,14 @@ import org.apache.ode.bpel.runtime.chann
 import org.apache.ode.bpel.runtime.channels.Termination;
 import org.apache.ode.bpel.runtime.channels.TimerResponse;
 import org.apache.ode.jacob.ChannelListener;
+import org.apache.ode.jacob.CompositeProcess;
 import org.apache.ode.jacob.ReceiveProcess;
 import org.apache.ode.jacob.Synch;
 import org.w3c.dom.Element;
 
+import static org.apache.ode.jacob.ProcessUtil.compose;
+
+
 /**
  * Alarm event handler. This process template manages a single alarm event handler.
  * It acts like an activity in that it can be terminated, but also adds a channel for
@@ -127,14 +131,13 @@ class EH_ALARM extends BpelJacobRunnable
         public void run() {
             Calendar now = Calendar.getInstance();
 
-            Set<ChannelListener> listeners = new ReceiveProcess<EventHandlerControl>(_cc, new EventHandlerControl() {
+            CompositeProcess listeners = compose(new ReceiveProcess<EventHandlerControl>(_cc, new EventHandlerControl() {
                 public void stop() {
                     _psc.completed(null, _comps);
                 }
-
             }){
                 private static final long serialVersionUID = -7750428941445331236L;
-            }.or(new ReceiveProcess<Termination>(_tc, new Termination() {
+            }).or(new ReceiveProcess<Termination>(_tc, new Termination() {
                 public void terminate() {
                     _psc.completed(null, _comps);
                 }
@@ -148,7 +151,7 @@ class EH_ALARM extends BpelJacobRunnable
                 TimerResponse trc = newChannel(TimerResponse.class);
                 getBpelRuntimeContext().registerTimer(trc,_alarm.getTime());
 
-                listeners.add(new ReceiveProcess<TimerResponse>(trc, new TimerResponse(){
+                listeners.or(new ReceiveProcess<TimerResponse>(trc, new TimerResponse(){
                     public void onTimeout() {
                         // This is what we are waiting for, fire the activity
                         instance(new FIRE());
@@ -204,7 +207,7 @@ class EH_ALARM extends BpelJacobRunnable
         }
 
         public void run() {
-            object(false,new ReceiveProcess<ParentScope>(_activity.parent, new ParentScope() {
+            object(false, compose(new ReceiveProcess<ParentScope>(_activity.parent, new ParentScope() {
                 public void compensate(OScope scope, Synch ret) {
                     _psc.compensate(scope,ret);
                     instance(ACTIVE.this);
@@ -238,14 +241,14 @@ class EH_ALARM extends BpelJacobRunnable
                 public void failure(String reason, Element data) { completed(null, CompensationHandler.emptySet()); }
             }){
                 private static final long serialVersionUID = -3357030137175178040L;
-            }.or(new ReceiveProcess<EventHandlerControl>(_cc, new EventHandlerControl() {
+            }).or(new ReceiveProcess<EventHandlerControl>(_cc, new EventHandlerControl() {
                 public void stop() {
                     _stopped = true;
                     instance(ACTIVE.this);
                 }
             }){
                 private static final long serialVersionUID = -3873619538789039424L;
-            }.or(new ReceiveProcess<Termination>(_tc, new Termination() {
+            }).or(new ReceiveProcess<Termination>(_tc, new Termination() {
                 public void terminate() {
                     replication(_activity.self).terminate();
                     _stopped = true;
@@ -253,7 +256,7 @@ class EH_ALARM extends BpelJacobRunnable
                 }
             }){
                 private static final long serialVersionUID = -4566956567870652885L;
-            })));
+            }));
         }
     }
 }

Modified: ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/INVOKE.java
URL: http://svn.apache.org/viewvc/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/INVOKE.java?rev=1435467&r1=1435466&r2=1435467&view=diff
==============================================================================
--- ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/INVOKE.java (original)
+++ ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/INVOKE.java Sat Jan 19 02:23:56 2013
@@ -38,6 +38,9 @@ import org.apache.ode.utils.DOMUtils;
 import org.w3c.dom.Element;
 import org.w3c.dom.Node;
 
+import static org.apache.ode.jacob.ProcessUtil.compose;
+
+
 /**
  * JacobRunnable that performs the work of the <code>invoke</code> activity.
  */
@@ -96,7 +99,7 @@ public class INVOKE extends ACTIVITY {
                         _scopeFrame.resolve(_oinvoke.partnerLink), _oinvoke.operation,
                         outboundMsg, invokeResponseChannel);
 
-                object(false, new ReceiveProcess<InvokeResponse>(invokeResponseChannel, new InvokeResponse() {
+                object(false, compose(new ReceiveProcess<InvokeResponse>(invokeResponseChannel, new InvokeResponse() {
                     public void onResponse() {
                         // we don't have to write variable data -> this already
                         // happened in the nativeAPI impl
@@ -190,7 +193,7 @@ public class INVOKE extends ACTIVITY {
 
                 }){
                     private static final long serialVersionUID = 4496880438819196765L;
-                }.or(new ReceiveProcess<Termination>(_self.self, new Termination() {
+                }).or(new ReceiveProcess<Termination>(_self.self, new Termination() {
                     public void terminate() {
                         _self.parent.completed(null, CompensationHandler.emptySet());
                     }

Modified: ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/PICK.java
URL: http://svn.apache.org/viewvc/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/PICK.java?rev=1435467&r1=1435466&r2=1435467&view=diff
==============================================================================
--- ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/PICK.java (original)
+++ ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/PICK.java Sat Jan 19 02:23:56 2013
@@ -50,6 +50,9 @@ import org.apache.ode.utils.xsd.Duration
 import org.w3c.dom.Element;
 import org.w3c.dom.Node;
 
+import static org.apache.ode.jacob.ProcessUtil.compose;
+
+
 /**
  * Template for the BPEL <code>pick</code> activity.
  */
@@ -279,7 +282,7 @@ class PICK extends ACTIVITY {
         }
 
         public void run() {
-            object(false, new ReceiveProcess<PickResponse>(_pickResponseChannel, new PickResponse() {
+            object(false, compose(new ReceiveProcess<PickResponse>(_pickResponseChannel, new PickResponse() {
                 public void onRequestRcvd(int selectorIdx, String mexId) {
                     OPickReceive.OnMessage onMessage = _opick.onMessages.get(selectorIdx);
 
@@ -373,7 +376,7 @@ class PICK extends ACTIVITY {
 
             }){
                 private static final long serialVersionUID = -8237296827418738011L;
-            }.or(new ReceiveProcess<Termination>(_self.self, new Termination() {
+            }).or(new ReceiveProcess<Termination>(_self.self, new Termination() {
                 public void terminate() {
                     getBpelRuntimeContext().cancel(_pickResponseChannel);
                     instance(WAITING.this);

Modified: ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/REPEATUNTIL.java
URL: http://svn.apache.org/viewvc/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/REPEATUNTIL.java?rev=1435467&r1=1435466&r2=1435467&view=diff
==============================================================================
--- ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/REPEATUNTIL.java (original)
+++ ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/REPEATUNTIL.java Sat Jan 19 02:23:56 2013
@@ -34,6 +34,9 @@ import org.apache.ode.jacob.ReceiveProce
 import org.apache.ode.jacob.Synch;
 import org.w3c.dom.Element;
 
+import static org.apache.ode.jacob.ProcessUtil.compose;
+
+
 public class REPEATUNTIL extends ACTIVITY {
     private static final long serialVersionUID = 1L;
 
@@ -94,7 +97,7 @@ public class REPEATUNTIL extends ACTIVIT
         }
 
         public void run() {
-            object(false, new ReceiveProcess<Termination>(_self.self, new Termination() {
+            object(false, compose(new ReceiveProcess<Termination>(_self.self, new Termination() {
                 public void terminate() {
                     _terminated = true;
                     replication(_child.self).terminate();
@@ -102,7 +105,7 @@ public class REPEATUNTIL extends ACTIVIT
                 }
             }) {
                 private static final long serialVersionUID = -5471984635653784051L;
-            }.or(new ReceiveProcess<ParentScope>(_child.parent, new ParentScope() {
+            }).or(new ReceiveProcess<ParentScope>(_child.parent, new ParentScope() {
                 public void compensate(OScope scope, Synch ret) {
                     _self.parent.compensate(scope,ret);
                     instance(WAITER.this);

Modified: ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/SEQUENCE.java
URL: http://svn.apache.org/viewvc/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/SEQUENCE.java?rev=1435467&r1=1435466&r2=1435467&view=diff
==============================================================================
--- ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/SEQUENCE.java (original)
+++ ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/SEQUENCE.java Sat Jan 19 02:23:56 2013
@@ -35,6 +35,9 @@ import org.apache.ode.jacob.ReceiveProce
 import org.apache.ode.jacob.Synch;
 import org.w3c.dom.Element;
 
+import static org.apache.ode.jacob.ProcessUtil.compose;
+
+
 /**
  * Implementation of the BPEL &lt;sequence&gt; activity.
  */
@@ -75,7 +78,7 @@ class SEQUENCE extends ACTIVITY {
         }
 
         public void run() {
-            object(false, new ReceiveProcess<Termination>(_self.self, new Termination() {
+            object(false, compose(new ReceiveProcess<Termination>(_self.self, new Termination() {
                 public void terminate() {
                     replication(_child.self).terminate();
 
@@ -89,7 +92,7 @@ class SEQUENCE extends ACTIVITY {
                 }
             }) {
                 private static final long serialVersionUID = -2680515407515637639L;
-            }.or(new ReceiveProcess<ParentScope>(_child.parent, new ParentScope() {
+            }).or(new ReceiveProcess<ParentScope>(_child.parent, new ParentScope() {
                 public void compensate(OScope scope, Synch ret) {
                     _self.parent.compensate(scope,ret);
                     instance(ACTIVE.this);

Modified: ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/WAIT.java
URL: http://svn.apache.org/viewvc/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/WAIT.java?rev=1435467&r1=1435466&r2=1435467&view=diff
==============================================================================
--- ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/WAIT.java (original)
+++ ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/WAIT.java Sat Jan 19 02:23:56 2013
@@ -32,6 +32,8 @@ import org.apache.ode.bpel.runtime.chann
 import org.apache.ode.jacob.ReceiveProcess;
 import org.apache.ode.utils.xsd.Duration;
 
+import static org.apache.ode.jacob.ProcessUtil.compose;
+
 
 /**
  * JacobRunnable that performs the work of the <code>&lt;wait&gt;</code> activity.
@@ -66,7 +68,7 @@ class WAIT extends ACTIVITY {
             final TimerResponse timerChannel = newChannel(TimerResponse.class);
             getBpelRuntimeContext().registerTimer(timerChannel, dueDate);
 
-            object(false, new ReceiveProcess<TimerResponse>(timerChannel, new TimerResponse() {
+            object(false, compose(new ReceiveProcess<TimerResponse>(timerChannel, new TimerResponse() {
                 public void onTimeout() {
                     _self.parent.completed(null, CompensationHandler.emptySet());
                 }
@@ -74,9 +76,9 @@ class WAIT extends ACTIVITY {
                 public void onCancel() {
                     _self.parent.completed(null, CompensationHandler.emptySet());
                 }
-            }){
+            }) {
                 private static final long serialVersionUID = 3120518305645437327L;
-            }.or(new ReceiveProcess<Termination>(_self.self, new Termination() {
+            }).or(new ReceiveProcess<Termination>(_self.self, new Termination() {
                 public void terminate() {
                     _self.parent.completed(null, CompensationHandler.emptySet());
                     object(new ReceiveProcess<TimerResponse>(timerChannel, new TimerResponse() {
@@ -94,7 +96,7 @@ class WAIT extends ACTIVITY {
             }) {
                 private static final long serialVersionUID = -2791243270691333946L;
             }));
-        }else{
+        } else {
             _self.parent.completed(null, CompensationHandler.emptySet());
         }
     }

Modified: ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/WHILE.java
URL: http://svn.apache.org/viewvc/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/WHILE.java?rev=1435467&r1=1435466&r2=1435467&view=diff
==============================================================================
--- ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/WHILE.java (original)
+++ ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/WHILE.java Sat Jan 19 02:23:56 2013
@@ -34,6 +34,9 @@ import org.apache.ode.jacob.ReceiveProce
 import org.apache.ode.jacob.Synch;
 import org.w3c.dom.Element;
 
+import static org.apache.ode.jacob.ProcessUtil.compose;
+
+
 /**
  * BPEL &lt;while&gt; activity
  */
@@ -112,7 +115,7 @@ class WHILE extends ACTIVITY {
         }
 
         public void run() {
-            object(false, new ReceiveProcess<Termination>(_self.self, new Termination() {
+            object(false, compose(new ReceiveProcess<Termination>(_self.self, new Termination() {
                 public void terminate() {
                     _terminated = true;
                     replication(_child.self).terminate();
@@ -120,7 +123,7 @@ class WHILE extends ACTIVITY {
                 }
             }) {
                 private static final long serialVersionUID = -5471984635653784051L;
-            }.or(new ReceiveProcess<ParentScope>(_child.parent, new ParentScope() {
+            }).or(new ReceiveProcess<ParentScope>(_child.parent, new ParentScope() {
                 public void compensate(OScope scope, Synch ret) {
                     _self.parent.compensate(scope,ret);
                     instance(WAITER.this);

Modified: ode/trunk/jacob/src/main/java/org/apache/ode/jacob/ChannelListener.java
URL: http://svn.apache.org/viewvc/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/ChannelListener.java?rev=1435467&r1=1435466&r2=1435467&view=diff
==============================================================================
--- ode/trunk/jacob/src/main/java/org/apache/ode/jacob/ChannelListener.java (original)
+++ ode/trunk/jacob/src/main/java/org/apache/ode/jacob/ChannelListener.java Sat Jan 19 02:23:56 2013
@@ -18,9 +18,6 @@
  */
 package org.apache.ode.jacob;
 
-import java.lang.reflect.Method;
-import java.util.HashSet;
-import java.util.Set;
 
 
 /**
@@ -29,38 +26,4 @@ import java.util.Set;
  */
 @SuppressWarnings("serial")
 public abstract class ChannelListener extends JacobObject {
-
-    public abstract Set<Method> getImplementedMethods();
-
-    public Set<ChannelListener> or(ChannelListener other) {
-        HashSet<ChannelListener> retval = new HashSet<ChannelListener>();
-        retval.add(this);
-        retval.add(other);
-        return retval;
-    }
-
-    public Set<ChannelListener> or(Set<ChannelListener> other) {
-        HashSet<ChannelListener> retval = new HashSet<ChannelListener>(other);
-        retval.add(this);
-        return retval;
-    }
-
-    /**
-     * Get a description of the object for debugging purposes.
-     *
-     * @return human-readable description.
-     */
-    public String toString() {
-        // TODO: needs improvement
-        StringBuffer buf = new StringBuffer(getClassName());
-        buf.append('{');
-        for (Method m : getImplementedMethods()) {
-            buf.append(m.getName());
-            buf.append("()");
-            buf.append("&");
-        }
-        buf.setLength(buf.length()-1);
-        buf.append('}');
-        return buf.toString();
-    }
 }

Added: ode/trunk/jacob/src/main/java/org/apache/ode/jacob/CompositeProcess.java
URL: http://svn.apache.org/viewvc/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/CompositeProcess.java?rev=1435467&view=auto
==============================================================================
--- ode/trunk/jacob/src/main/java/org/apache/ode/jacob/CompositeProcess.java (added)
+++ ode/trunk/jacob/src/main/java/org/apache/ode/jacob/CompositeProcess.java Sat Jan 19 02:23:56 2013
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ode.jacob;
+
+import java.lang.reflect.Method;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+
+/**
+ * A {@link ChannelListener} that groups other listeners into one unit.
+ * <p>
+ * Members are added with {@link #or(ChannelListener)} and read back with
+ * {@link #getProcesses()}. The reflective accessors {@link #getMethod(String)} and
+ * {@link #getImplementedMethods()} are unsupported here and always throw.
+ */
+@SuppressWarnings("serial")
+public final class CompositeProcess extends ChannelListener {
+    /** Member listeners; stored in a HashSet, so iteration order is unspecified. */
+    private Set<ChannelListener> processes = new HashSet<ChannelListener>();
+
+    /** Creates a composite with no members. */
+    public CompositeProcess() {
+    }
+
+    /**
+     * Unsupported on a composite: call {@code getMethod(String)} on each member of
+     * {@link #getProcesses()} instead (use {@code instanceof} if necessary).
+     *
+     * @param methodName ignored
+     * @return never returns normally
+     * @throws IllegalStateException always
+     */
+    public Method getMethod(String methodName) {
+        throw new IllegalStateException("Calling getMethod() on a CompositeProcess is illegal.");
+    }
+
+    /**
+     * Unsupported on a composite: call {@code getImplementedMethods()} on each member of
+     * {@link #getProcesses()} instead (use {@code instanceof} if necessary).
+     *
+     * @return never returns normally
+     * @throws IllegalStateException always
+     */
+    public Set<Method> getImplementedMethods() {
+        throw new IllegalStateException("Calling getImplementedMethods() on a CompositeProcess is illegal.");
+    }
+
+    /**
+     * Returns the member listeners.
+     *
+     * @return an unmodifiable view of the members
+     */
+    public Set<ChannelListener> getProcesses() {
+        return Collections.unmodifiableSet(processes);
+    }
+
+    /**
+     * Adds a listener to this composite.
+     *
+     * @param process the listener to add
+     * @return this composite, so that calls can be chained
+     */
+    public CompositeProcess or(ChannelListener process) {
+        processes.add(process);
+        return this;
+    }
+}

Propchange: ode/trunk/jacob/src/main/java/org/apache/ode/jacob/CompositeProcess.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: ode/trunk/jacob/src/main/java/org/apache/ode/jacob/ProcessUtil.java
URL: http://svn.apache.org/viewvc/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/ProcessUtil.java?rev=1435467&r1=1435466&r2=1435467&view=diff
==============================================================================
--- ode/trunk/jacob/src/main/java/org/apache/ode/jacob/ProcessUtil.java (original)
+++ ode/trunk/jacob/src/main/java/org/apache/ode/jacob/ProcessUtil.java Sat Jan 19 02:23:56 2013
@@ -37,6 +37,11 @@ public final class ProcessUtil {
     	    + channel == null ? "<null>" : channel.getClass().toString());
     }
     
+    public static CompositeProcess compose(ReceiveProcess<?> process) {
+        // Start a new composite seeded with this process; more can be chained on with or().
+        return new CompositeProcess().or(process);
+    }
+
     @SuppressWarnings("serial")
 	public static <T extends Channel> ChannelListener receive(T proxy, T listener) {
     	// TODO: NOTE: this *only* works when the listnere doesn't need to be Serialiazble really

Modified: ode/trunk/jacob/src/main/java/org/apache/ode/jacob/ReceiveProcess.java
URL: http://svn.apache.org/viewvc/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/ReceiveProcess.java?rev=1435467&r1=1435466&r2=1435467&view=diff
==============================================================================
--- ode/trunk/jacob/src/main/java/org/apache/ode/jacob/ReceiveProcess.java (original)
+++ ode/trunk/jacob/src/main/java/org/apache/ode/jacob/ReceiveProcess.java Sat Jan 19 02:23:56 2013
@@ -30,7 +30,7 @@ public abstract class ReceiveProcess<T e
     private transient Channel channel;
     private T receiver;
     
-    protected ReceiveProcess(T channel, T receiver) throws IllegalStateException {
+    protected ReceiveProcess(T channel, T receiver) {
         assert getClass().getSuperclass().getSuperclass() == ChannelListener.class :
             "Inheritance in ChannelListener classes not allowed!";
         if (channel == null) {
@@ -60,4 +60,18 @@ public abstract class ReceiveProcess<T e
         }
         return _implementedMethods;
     }
+
+    public String toString() {
+        // Renders "ClassName{a(),b()}". The separator is emitted before every name but the
+        // first, so an empty method set yields "ClassName{}" instead of losing the '{'.
+        StringBuilder buf = new StringBuilder(getClassName());
+        buf.append('{');
+        String separator = "";
+        for (Method m : getImplementedMethods()) {
+            buf.append(separator).append(m.getName()).append("()");
+            separator = ",";
+        }
+        buf.append('}');
+        return buf.toString();
+    }
 }

Modified: ode/trunk/jacob/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java
URL: http://svn.apache.org/viewvc/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java?rev=1435467&r1=1435466&r2=1435467&view=diff
==============================================================================
--- ode/trunk/jacob/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java (original)
+++ ode/trunk/jacob/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java Sat Jan 19 02:23:56 2013
@@ -26,6 +26,7 @@ import java.util.Stack;
 
 import org.apache.ode.jacob.Channel;
 import org.apache.ode.jacob.ChannelListener;
+import org.apache.ode.jacob.CompositeProcess;
 import org.apache.ode.jacob.JacobObject;
 import org.apache.ode.jacob.JacobRunnable;
 import org.apache.ode.jacob.JacobThread;
@@ -345,14 +346,7 @@ public final class JacobVPU {
 
             CommGroup grp = new CommGroup(replicate);
             for (int i = 0; i < ml.length; ++i) {
-                if (ml[i] instanceof ReceiveProcess) {
-                    CommChannel chnl = (CommChannel)ChannelFactory.getBackend(
-                        ((ReceiveProcess<?>)ml[i]).getChannel());
-                    // TODO see below..
-                    // oframe.setDebugInfo(fillDebugInfo());
-                    CommRecv recv = new CommRecv(chnl, ml[i]);
-                    grp.add(recv);
-                }
+                addCommChannel(grp, ml[i]);
             }
             _executionQueue.add(grp);
         }
@@ -361,6 +355,23 @@ public final class JacobVPU {
             object(replicate, new ChannelListener[] { methodList });
         }
 
+        private void addCommChannel(CommGroup group, ChannelListener receiver) {
+            // Flatten composites recursively; every ReceiveProcess leaf adds one CommRecv to the group.
+            if (receiver instanceof CompositeProcess) {
+                for (ChannelListener member : ((CompositeProcess) receiver).getProcesses()) {
+                    addCommChannel(group, member);
+                }
+                return;
+            }
+            if (!(receiver instanceof ReceiveProcess)) {
+                throw new IllegalStateException("Don't know how to handle Process type...");
+            }
+            // TODO see below..
+            // oframe.setDebugInfo(fillDebugInfo());
+            Object backend = ChannelFactory.getBackend(((ReceiveProcess<?>) receiver).getChannel());
+            group.add(new CommRecv((CommChannel) backend, receiver));
+        }
+
         /* UNUSED
          private DebugInfo fillDebugInfo() {
             // Some of the debug information is a bit lengthy, so lets not put