You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ode.apache.org by ha...@apache.org on 2013/01/19 03:23:56 UTC
svn commit: r1435467 - in /ode/trunk:
bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/
jacob/src/main/java/org/apache/ode/jacob/
jacob/src/main/java/org/apache/ode/jacob/vpu/
Author: hadrian
Date: Sat Jan 19 02:23:56 2013
New Revision: 1435467
URL: http://svn.apache.org/viewvc?rev=1435467&view=rev
Log:
ODE-987. Added a CompositeProcess so we could consolidate the object() methods to only one
Added:
ode/trunk/jacob/src/main/java/org/apache/ode/jacob/CompositeProcess.java (with props)
Modified:
ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/ACTIVITYGUARD.java
ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/EH_ALARM.java
ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/INVOKE.java
ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/PICK.java
ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/REPEATUNTIL.java
ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/SEQUENCE.java
ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/WAIT.java
ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/WHILE.java
ode/trunk/jacob/src/main/java/org/apache/ode/jacob/ChannelListener.java
ode/trunk/jacob/src/main/java/org/apache/ode/jacob/ProcessUtil.java
ode/trunk/jacob/src/main/java/org/apache/ode/jacob/ReceiveProcess.java
ode/trunk/jacob/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java
Modified: ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/ACTIVITYGUARD.java
URL: http://svn.apache.org/viewvc/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/ACTIVITYGUARD.java?rev=1435467&r1=1435466&r2=1435467&view=diff
==============================================================================
--- ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/ACTIVITYGUARD.java (original)
+++ ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/ACTIVITYGUARD.java Sat Jan 19 02:23:56 2013
@@ -52,6 +52,9 @@ import org.apache.ode.jacob.ReceiveProce
import org.apache.ode.jacob.Synch;
import org.w3c.dom.Element;
+import static org.apache.ode.jacob.ProcessUtil.compose;
+
+
class ACTIVITYGUARD extends ACTIVITY {
private static final long serialVersionUID = 1L;
@@ -292,7 +295,7 @@ class ACTIVITYGUARD extends ACTIVITY {
getBpelRuntimeContext().registerActivityForRecovery(
recoveryChannel, _self.aId, _failure.reason, _failure.dateTime, _failure.data,
new String[] { "retry", "cancel", "fault" }, _failure.retryCount);
- object(false, new ReceiveProcess<ActivityRecovery>(recoveryChannel, new ActivityRecovery() {
+ object(false, compose(new ReceiveProcess<ActivityRecovery>(recoveryChannel, new ActivityRecovery() {
public void retry() {
if (__log.isDebugEnabled())
__log.debug("ActivityRecovery: Retrying activity " + _self.aId + " (user initiated)");
@@ -319,7 +322,7 @@ class ACTIVITYGUARD extends ACTIVITY {
}
}){
private static final long serialVersionUID = 8397883882810521685L;
- }.or(new ReceiveProcess<Termination>(_self.self, new Termination() {
+ }).or(new ReceiveProcess<Termination>(_self.self, new Termination() {
public void terminate() {
if (__log.isDebugEnabled())
__log.debug("ActivityRecovery: Cancelling activity " + _self.aId + " (terminated by scope)");
Modified: ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/EH_ALARM.java
URL: http://svn.apache.org/viewvc/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/EH_ALARM.java?rev=1435467&r1=1435466&r2=1435467&view=diff
==============================================================================
--- ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/EH_ALARM.java (original)
+++ ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/EH_ALARM.java Sat Jan 19 02:23:56 2013
@@ -35,10 +35,14 @@ import org.apache.ode.bpel.runtime.chann
import org.apache.ode.bpel.runtime.channels.Termination;
import org.apache.ode.bpel.runtime.channels.TimerResponse;
import org.apache.ode.jacob.ChannelListener;
+import org.apache.ode.jacob.CompositeProcess;
import org.apache.ode.jacob.ReceiveProcess;
import org.apache.ode.jacob.Synch;
import org.w3c.dom.Element;
+import static org.apache.ode.jacob.ProcessUtil.compose;
+
+
/**
* Alarm event handler. This process template manages a single alarm event handler.
* It acts like an activity in that it can be terminated, but also adds a channel for
@@ -127,14 +131,13 @@ class EH_ALARM extends BpelJacobRunnable
public void run() {
Calendar now = Calendar.getInstance();
- Set<ChannelListener> listeners = new ReceiveProcess<EventHandlerControl>(_cc, new EventHandlerControl() {
+ CompositeProcess listeners = compose(new ReceiveProcess<EventHandlerControl>(_cc, new EventHandlerControl() {
public void stop() {
_psc.completed(null, _comps);
}
-
}){
private static final long serialVersionUID = -7750428941445331236L;
- }.or(new ReceiveProcess<Termination>(_tc, new Termination() {
+ }).or(new ReceiveProcess<Termination>(_tc, new Termination() {
public void terminate() {
_psc.completed(null, _comps);
}
@@ -148,7 +151,7 @@ class EH_ALARM extends BpelJacobRunnable
TimerResponse trc = newChannel(TimerResponse.class);
getBpelRuntimeContext().registerTimer(trc,_alarm.getTime());
- listeners.add(new ReceiveProcess<TimerResponse>(trc, new TimerResponse(){
+ listeners.or(new ReceiveProcess<TimerResponse>(trc, new TimerResponse(){
public void onTimeout() {
// This is what we are waiting for, fire the activity
instance(new FIRE());
@@ -204,7 +207,7 @@ class EH_ALARM extends BpelJacobRunnable
}
public void run() {
- object(false,new ReceiveProcess<ParentScope>(_activity.parent, new ParentScope() {
+ object(false, compose(new ReceiveProcess<ParentScope>(_activity.parent, new ParentScope() {
public void compensate(OScope scope, Synch ret) {
_psc.compensate(scope,ret);
instance(ACTIVE.this);
@@ -238,14 +241,14 @@ class EH_ALARM extends BpelJacobRunnable
public void failure(String reason, Element data) { completed(null, CompensationHandler.emptySet()); }
}){
private static final long serialVersionUID = -3357030137175178040L;
- }.or(new ReceiveProcess<EventHandlerControl>(_cc, new EventHandlerControl() {
+ }).or(new ReceiveProcess<EventHandlerControl>(_cc, new EventHandlerControl() {
public void stop() {
_stopped = true;
instance(ACTIVE.this);
}
}){
private static final long serialVersionUID = -3873619538789039424L;
- }.or(new ReceiveProcess<Termination>(_tc, new Termination() {
+ }).or(new ReceiveProcess<Termination>(_tc, new Termination() {
public void terminate() {
replication(_activity.self).terminate();
_stopped = true;
@@ -253,7 +256,7 @@ class EH_ALARM extends BpelJacobRunnable
}
}){
private static final long serialVersionUID = -4566956567870652885L;
- })));
+ }));
}
}
}
Modified: ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/INVOKE.java
URL: http://svn.apache.org/viewvc/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/INVOKE.java?rev=1435467&r1=1435466&r2=1435467&view=diff
==============================================================================
--- ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/INVOKE.java (original)
+++ ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/INVOKE.java Sat Jan 19 02:23:56 2013
@@ -38,6 +38,9 @@ import org.apache.ode.utils.DOMUtils;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
+import static org.apache.ode.jacob.ProcessUtil.compose;
+
+
/**
* JacobRunnable that performs the work of the <code>invoke</code> activity.
*/
@@ -96,7 +99,7 @@ public class INVOKE extends ACTIVITY {
_scopeFrame.resolve(_oinvoke.partnerLink), _oinvoke.operation,
outboundMsg, invokeResponseChannel);
- object(false, new ReceiveProcess<InvokeResponse>(invokeResponseChannel, new InvokeResponse() {
+ object(false, compose(new ReceiveProcess<InvokeResponse>(invokeResponseChannel, new InvokeResponse() {
public void onResponse() {
// we don't have to write variable data -> this already
// happened in the nativeAPI impl
@@ -190,7 +193,7 @@ public class INVOKE extends ACTIVITY {
}){
private static final long serialVersionUID = 4496880438819196765L;
- }.or(new ReceiveProcess<Termination>(_self.self, new Termination() {
+ }).or(new ReceiveProcess<Termination>(_self.self, new Termination() {
public void terminate() {
_self.parent.completed(null, CompensationHandler.emptySet());
}
Modified: ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/PICK.java
URL: http://svn.apache.org/viewvc/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/PICK.java?rev=1435467&r1=1435466&r2=1435467&view=diff
==============================================================================
--- ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/PICK.java (original)
+++ ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/PICK.java Sat Jan 19 02:23:56 2013
@@ -50,6 +50,9 @@ import org.apache.ode.utils.xsd.Duration
import org.w3c.dom.Element;
import org.w3c.dom.Node;
+import static org.apache.ode.jacob.ProcessUtil.compose;
+
+
/**
* Template for the BPEL <code>pick</code> activity.
*/
@@ -279,7 +282,7 @@ class PICK extends ACTIVITY {
}
public void run() {
- object(false, new ReceiveProcess<PickResponse>(_pickResponseChannel, new PickResponse() {
+ object(false, compose(new ReceiveProcess<PickResponse>(_pickResponseChannel, new PickResponse() {
public void onRequestRcvd(int selectorIdx, String mexId) {
OPickReceive.OnMessage onMessage = _opick.onMessages.get(selectorIdx);
@@ -373,7 +376,7 @@ class PICK extends ACTIVITY {
}){
private static final long serialVersionUID = -8237296827418738011L;
- }.or(new ReceiveProcess<Termination>(_self.self, new Termination() {
+ }).or(new ReceiveProcess<Termination>(_self.self, new Termination() {
public void terminate() {
getBpelRuntimeContext().cancel(_pickResponseChannel);
instance(WAITING.this);
Modified: ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/REPEATUNTIL.java
URL: http://svn.apache.org/viewvc/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/REPEATUNTIL.java?rev=1435467&r1=1435466&r2=1435467&view=diff
==============================================================================
--- ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/REPEATUNTIL.java (original)
+++ ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/REPEATUNTIL.java Sat Jan 19 02:23:56 2013
@@ -34,6 +34,9 @@ import org.apache.ode.jacob.ReceiveProce
import org.apache.ode.jacob.Synch;
import org.w3c.dom.Element;
+import static org.apache.ode.jacob.ProcessUtil.compose;
+
+
public class REPEATUNTIL extends ACTIVITY {
private static final long serialVersionUID = 1L;
@@ -94,7 +97,7 @@ public class REPEATUNTIL extends ACTIVIT
}
public void run() {
- object(false, new ReceiveProcess<Termination>(_self.self, new Termination() {
+ object(false, compose(new ReceiveProcess<Termination>(_self.self, new Termination() {
public void terminate() {
_terminated = true;
replication(_child.self).terminate();
@@ -102,7 +105,7 @@ public class REPEATUNTIL extends ACTIVIT
}
}) {
private static final long serialVersionUID = -5471984635653784051L;
- }.or(new ReceiveProcess<ParentScope>(_child.parent, new ParentScope() {
+ }).or(new ReceiveProcess<ParentScope>(_child.parent, new ParentScope() {
public void compensate(OScope scope, Synch ret) {
_self.parent.compensate(scope,ret);
instance(WAITER.this);
Modified: ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/SEQUENCE.java
URL: http://svn.apache.org/viewvc/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/SEQUENCE.java?rev=1435467&r1=1435466&r2=1435467&view=diff
==============================================================================
--- ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/SEQUENCE.java (original)
+++ ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/SEQUENCE.java Sat Jan 19 02:23:56 2013
@@ -35,6 +35,9 @@ import org.apache.ode.jacob.ReceiveProce
import org.apache.ode.jacob.Synch;
import org.w3c.dom.Element;
+import static org.apache.ode.jacob.ProcessUtil.compose;
+
+
/**
* Implementation of the BPEL <sequence> activity.
*/
@@ -75,7 +78,7 @@ class SEQUENCE extends ACTIVITY {
}
public void run() {
- object(false, new ReceiveProcess<Termination>(_self.self, new Termination() {
+ object(false, compose(new ReceiveProcess<Termination>(_self.self, new Termination() {
public void terminate() {
replication(_child.self).terminate();
@@ -89,7 +92,7 @@ class SEQUENCE extends ACTIVITY {
}
}) {
private static final long serialVersionUID = -2680515407515637639L;
- }.or(new ReceiveProcess<ParentScope>(_child.parent, new ParentScope() {
+ }).or(new ReceiveProcess<ParentScope>(_child.parent, new ParentScope() {
public void compensate(OScope scope, Synch ret) {
_self.parent.compensate(scope,ret);
instance(ACTIVE.this);
Modified: ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/WAIT.java
URL: http://svn.apache.org/viewvc/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/WAIT.java?rev=1435467&r1=1435466&r2=1435467&view=diff
==============================================================================
--- ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/WAIT.java (original)
+++ ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/WAIT.java Sat Jan 19 02:23:56 2013
@@ -32,6 +32,8 @@ import org.apache.ode.bpel.runtime.chann
import org.apache.ode.jacob.ReceiveProcess;
import org.apache.ode.utils.xsd.Duration;
+import static org.apache.ode.jacob.ProcessUtil.compose;
+
/**
* JacobRunnable that performs the work of the <code><wait></code> activity.
@@ -66,7 +68,7 @@ class WAIT extends ACTIVITY {
final TimerResponse timerChannel = newChannel(TimerResponse.class);
getBpelRuntimeContext().registerTimer(timerChannel, dueDate);
- object(false, new ReceiveProcess<TimerResponse>(timerChannel, new TimerResponse() {
+ object(false, compose(new ReceiveProcess<TimerResponse>(timerChannel, new TimerResponse() {
public void onTimeout() {
_self.parent.completed(null, CompensationHandler.emptySet());
}
@@ -74,9 +76,9 @@ class WAIT extends ACTIVITY {
public void onCancel() {
_self.parent.completed(null, CompensationHandler.emptySet());
}
- }){
+ }) {
private static final long serialVersionUID = 3120518305645437327L;
- }.or(new ReceiveProcess<Termination>(_self.self, new Termination() {
+ }).or(new ReceiveProcess<Termination>(_self.self, new Termination() {
public void terminate() {
_self.parent.completed(null, CompensationHandler.emptySet());
object(new ReceiveProcess<TimerResponse>(timerChannel, new TimerResponse() {
@@ -94,7 +96,7 @@ class WAIT extends ACTIVITY {
}) {
private static final long serialVersionUID = -2791243270691333946L;
}));
- }else{
+ } else {
_self.parent.completed(null, CompensationHandler.emptySet());
}
}
Modified: ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/WHILE.java
URL: http://svn.apache.org/viewvc/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/WHILE.java?rev=1435467&r1=1435466&r2=1435467&view=diff
==============================================================================
--- ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/WHILE.java (original)
+++ ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/WHILE.java Sat Jan 19 02:23:56 2013
@@ -34,6 +34,9 @@ import org.apache.ode.jacob.ReceiveProce
import org.apache.ode.jacob.Synch;
import org.w3c.dom.Element;
+import static org.apache.ode.jacob.ProcessUtil.compose;
+
+
/**
* BPEL <while> activity
*/
@@ -112,7 +115,7 @@ class WHILE extends ACTIVITY {
}
public void run() {
- object(false, new ReceiveProcess<Termination>(_self.self, new Termination() {
+ object(false, compose(new ReceiveProcess<Termination>(_self.self, new Termination() {
public void terminate() {
_terminated = true;
replication(_child.self).terminate();
@@ -120,7 +123,7 @@ class WHILE extends ACTIVITY {
}
}) {
private static final long serialVersionUID = -5471984635653784051L;
- }.or(new ReceiveProcess<ParentScope>(_child.parent, new ParentScope() {
+ }).or(new ReceiveProcess<ParentScope>(_child.parent, new ParentScope() {
public void compensate(OScope scope, Synch ret) {
_self.parent.compensate(scope,ret);
instance(WAITER.this);
Modified: ode/trunk/jacob/src/main/java/org/apache/ode/jacob/ChannelListener.java
URL: http://svn.apache.org/viewvc/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/ChannelListener.java?rev=1435467&r1=1435466&r2=1435467&view=diff
==============================================================================
--- ode/trunk/jacob/src/main/java/org/apache/ode/jacob/ChannelListener.java (original)
+++ ode/trunk/jacob/src/main/java/org/apache/ode/jacob/ChannelListener.java Sat Jan 19 02:23:56 2013
@@ -18,9 +18,6 @@
*/
package org.apache.ode.jacob;
-import java.lang.reflect.Method;
-import java.util.HashSet;
-import java.util.Set;
/**
@@ -29,38 +26,4 @@ import java.util.Set;
*/
@SuppressWarnings("serial")
public abstract class ChannelListener extends JacobObject {
-
- public abstract Set<Method> getImplementedMethods();
-
- public Set<ChannelListener> or(ChannelListener other) {
- HashSet<ChannelListener> retval = new HashSet<ChannelListener>();
- retval.add(this);
- retval.add(other);
- return retval;
- }
-
- public Set<ChannelListener> or(Set<ChannelListener> other) {
- HashSet<ChannelListener> retval = new HashSet<ChannelListener>(other);
- retval.add(this);
- return retval;
- }
-
- /**
- * Get a description of the object for debugging purposes.
- *
- * @return human-readable description.
- */
- public String toString() {
- // TODO: needs improvement
- StringBuffer buf = new StringBuffer(getClassName());
- buf.append('{');
- for (Method m : getImplementedMethods()) {
- buf.append(m.getName());
- buf.append("()");
- buf.append("&");
- }
- buf.setLength(buf.length()-1);
- buf.append('}');
- return buf.toString();
- }
}
Added: ode/trunk/jacob/src/main/java/org/apache/ode/jacob/CompositeProcess.java
URL: http://svn.apache.org/viewvc/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/CompositeProcess.java?rev=1435467&view=auto
==============================================================================
--- ode/trunk/jacob/src/main/java/org/apache/ode/jacob/CompositeProcess.java (added)
+++ ode/trunk/jacob/src/main/java/org/apache/ode/jacob/CompositeProcess.java Sat Jan 19 02:23:56 2013
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ode.jacob;
+
+import java.lang.reflect.Method;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+
+@SuppressWarnings("serial")
+public final class CompositeProcess extends ChannelListener {
+ private Set<ChannelListener> processes = new HashSet<ChannelListener>();
+
+ public CompositeProcess() {
+ }
+
+ public Method getMethod(String methodName) {
+ // Must call getMethod(String) on each of the getProcesses(). Use instanceof if necessary.
+ throw new IllegalStateException("Calling getMethod() on a CompositeProcess is illegal.");
+ }
+
+ public Set<Method> getImplementedMethods() {
+ // Must call getImplementedMethods on each of the getProcesses(). Use instanceof if necessary.
+ throw new IllegalStateException("Calling getImplementationMethods() on a CompositeProcess is illegal.");
+ }
+
+ public Set<ChannelListener> getProcesses() {
+ return Collections.unmodifiableSet(processes);
+ }
+
+ public CompositeProcess or(ChannelListener process) {
+ processes.add(process);
+ return this;
+ }
+}
Propchange: ode/trunk/jacob/src/main/java/org/apache/ode/jacob/CompositeProcess.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: ode/trunk/jacob/src/main/java/org/apache/ode/jacob/ProcessUtil.java
URL: http://svn.apache.org/viewvc/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/ProcessUtil.java?rev=1435467&r1=1435466&r2=1435467&view=diff
==============================================================================
--- ode/trunk/jacob/src/main/java/org/apache/ode/jacob/ProcessUtil.java (original)
+++ ode/trunk/jacob/src/main/java/org/apache/ode/jacob/ProcessUtil.java Sat Jan 19 02:23:56 2013
@@ -37,6 +37,11 @@ public final class ProcessUtil {
+ channel == null ? "<null>" : channel.getClass().toString());
}
+ public static CompositeProcess compose(ReceiveProcess<?> process) {
+ CompositeProcess result = new CompositeProcess();
+ return result.or(process);
+ }
+
@SuppressWarnings("serial")
public static <T extends Channel> ChannelListener receive(T proxy, T listener) {
// TODO: NOTE: this *only* works when the listnere doesn't need to be Serialiazble really
Modified: ode/trunk/jacob/src/main/java/org/apache/ode/jacob/ReceiveProcess.java
URL: http://svn.apache.org/viewvc/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/ReceiveProcess.java?rev=1435467&r1=1435466&r2=1435467&view=diff
==============================================================================
--- ode/trunk/jacob/src/main/java/org/apache/ode/jacob/ReceiveProcess.java (original)
+++ ode/trunk/jacob/src/main/java/org/apache/ode/jacob/ReceiveProcess.java Sat Jan 19 02:23:56 2013
@@ -30,7 +30,7 @@ public abstract class ReceiveProcess<T e
private transient Channel channel;
private T receiver;
- protected ReceiveProcess(T channel, T receiver) throws IllegalStateException {
+ protected ReceiveProcess(T channel, T receiver) {
assert getClass().getSuperclass().getSuperclass() == ChannelListener.class :
"Inheritance in ChannelListener classes not allowed!";
if (channel == null) {
@@ -60,4 +60,18 @@ public abstract class ReceiveProcess<T e
}
return _implementedMethods;
}
+
+ public String toString() {
+ // TODO: needs improvement
+ StringBuffer buf = new StringBuffer(getClassName());
+ buf.append('{');
+ for (Method m : getImplementedMethods()) {
+ buf.append(m.getName());
+ buf.append("()");
+ buf.append(",");
+ }
+ buf.setLength(buf.length()-1);
+ buf.append('}');
+ return buf.toString();
+ }
}
Modified: ode/trunk/jacob/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java
URL: http://svn.apache.org/viewvc/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java?rev=1435467&r1=1435466&r2=1435467&view=diff
==============================================================================
--- ode/trunk/jacob/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java (original)
+++ ode/trunk/jacob/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java Sat Jan 19 02:23:56 2013
@@ -26,6 +26,7 @@ import java.util.Stack;
import org.apache.ode.jacob.Channel;
import org.apache.ode.jacob.ChannelListener;
+import org.apache.ode.jacob.CompositeProcess;
import org.apache.ode.jacob.JacobObject;
import org.apache.ode.jacob.JacobRunnable;
import org.apache.ode.jacob.JacobThread;
@@ -345,14 +346,7 @@ public final class JacobVPU {
CommGroup grp = new CommGroup(replicate);
for (int i = 0; i < ml.length; ++i) {
- if (ml[i] instanceof ReceiveProcess) {
- CommChannel chnl = (CommChannel)ChannelFactory.getBackend(
- ((ReceiveProcess<?>)ml[i]).getChannel());
- // TODO see below..
- // oframe.setDebugInfo(fillDebugInfo());
- CommRecv recv = new CommRecv(chnl, ml[i]);
- grp.add(recv);
- }
+ addCommChannel(grp, ml[i]);
}
_executionQueue.add(grp);
}
@@ -361,6 +355,23 @@ public final class JacobVPU {
object(replicate, new ChannelListener[] { methodList });
}
+ private void addCommChannel(CommGroup group, ChannelListener receiver) {
+ if (receiver instanceof CompositeProcess) {
+ for (ChannelListener r : ((CompositeProcess)receiver).getProcesses()) {
+ addCommChannel(group, r);
+ }
+ } else if (receiver instanceof ReceiveProcess) {
+ CommChannel chnl = (CommChannel)ChannelFactory.getBackend(
+ ((ReceiveProcess<?>)receiver).getChannel());
+ // TODO see below..
+ // oframe.setDebugInfo(fillDebugInfo());
+ CommRecv recv = new CommRecv(chnl, receiver);
+ group.add(recv);
+ } else {
+ throw new IllegalStateException("Don't know how to handle Process type...");
+ }
+ }
+
/* UNUSED
private DebugInfo fillDebugInfo() {
// Some of the debug information is a bit lengthy, so lets not put