You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ode.apache.org by bo...@apache.org on 2006/10/30 16:57:03 UTC
svn commit: r469170 [1/2] - in
/incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob: ./
examples/cell/ examples/sequence/ examples/synch/ soup/ vpu/
Author: boisvert
Date: Mon Oct 30 07:57:02 2006
New Revision: 469170
URL: http://svn.apache.org/viewvc?view=rev&rev=469170
Log:
(Mostly) cosmetic cleanup of JACOB module; removed some dead code as well.
Added:
incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/examples/sequence/
incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/examples/sequence/Sequence.java
Removed:
incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/ExportedChannel.java
incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/Sequence.java
incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/soup/CommMatch.java
Modified:
incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/Channel.java
incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/ChannelListener.java
incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/DebugInfo.java
incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/JacobObject.java
incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/JacobRunnable.java
incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/JacobThread.java
incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/Synch.java
incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/examples/cell/CELL_.java
incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/examples/cell/Cell.java
incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/examples/synch/SynchPrinter.java
incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/soup/Comm.java
incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/soup/CommChannel.java
incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/soup/CommGroup.java
incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/soup/CommRecv.java
incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/soup/CommSend.java
incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/soup/ExecutionQueueObject.java
incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/vpu/ChannelFactory.java
incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/vpu/ExecutionQueueImpl.java
incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java
Modified: incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/Channel.java
URL: http://svn.apache.org/viewvc/incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/Channel.java?view=diff&rev=469170&r1=469169&r2=469170
==============================================================================
--- incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/Channel.java (original)
+++ incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/Channel.java Mon Oct 30 07:57:02 2006
@@ -24,5 +24,5 @@
* Interface implemented by channel proxies.
*/
public interface Channel extends Serializable {
- public String export();
+ public String export();
}
Modified: incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/ChannelListener.java
URL: http://svn.apache.org/viewvc/incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/ChannelListener.java?view=diff&rev=469170&r1=469169&r2=469170
==============================================================================
--- incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/ChannelListener.java (original)
+++ incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/ChannelListener.java Mon Oct 30 07:57:02 2006
@@ -18,134 +18,98 @@
*/
package org.apache.ode.jacob;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
import java.lang.reflect.Method;
import java.util.Collections;
import java.util.HashSet;
-import java.util.Iterator;
import java.util.Set;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
/**
- * Base-class for method-list objects. Method-lists objects should extends
- * this class <em>and</em> implement one <code>Channel</code> interface.
+ * Base-class for method-list objects. Method-list objects should extend this
+ * class <em>and</em> implement one <code>Channel</code> interface.
*/
public abstract class ChannelListener<CT extends Channel> extends JacobObject {
- private static Log __log = LogFactory.getLog(ChannelListener.class);
+ private static Log __log = LogFactory.getLog(ChannelListener.class);
- private transient Set<Method> _implementedMethods;
- private transient CT _channel;
+ private transient Set<Method> _implementedMethods;
- protected ChannelListener(CT channel)
- throws IllegalStateException {
+ private transient CT _channel;
- if (this.getClass()
- .getSuperclass()
- .getSuperclass() != ChannelListener.class) {
- throw new IllegalStateException("Inheritence in ChannelListener classes not allowed!");
- }
-
- if (channel == null)
- throw new IllegalArgumentException("Null channel!");
- _channel = channel;
-
- }
-
- public CT getChannel() { return _channel; }
-
- public void setChannel(CT channel) { _channel = channel; }
-
- public Set<ChannelListener> or(ChannelListener other) {
- HashSet<ChannelListener> retval = new HashSet<ChannelListener>();
- retval.add(this);
- retval.add(other);
- return retval;
- }
-
- public Set<ChannelListener> or(Set<ChannelListener> other) {
- HashSet<ChannelListener> retval = new HashSet<ChannelListener>(other);
- retval.add(this);
- return retval;
- }
- /**
- * DOCUMENTME
- *
- * @return DOCUMENTME
- */
- public Set<Method> getImplementedMethods() {
- if (_implementedMethods == null) {
- Set<Method> implementedMethods = new HashSet<Method>();
- getImplementedMethods(implementedMethods, getClass().getSuperclass());
- _implementedMethods = Collections.unmodifiableSet(implementedMethods);
- }
-
- return _implementedMethods;
- }
-
- private Set<Method> getImplementedMethods(Set<Method> methods, Class clazz){
- Class[] interfaces = clazz.getInterfaces();
-
- for (int i = 0 ; i < interfaces.length; ++i) {
- if (interfaces[i] != Channel.class) {
- Method[] allmethods = interfaces[i].getDeclaredMethods();
+ protected ChannelListener(CT channel) throws IllegalStateException {
+ if (this.getClass().getSuperclass().getSuperclass() != ChannelListener.class) {
+ throw new IllegalStateException("Inheritence in ChannelListener classes not allowed!");
+ }
+ if (channel == null) {
+ throw new IllegalArgumentException("Null channel!");
+ }
+ _channel = channel;
+ }
+
+ public CT getChannel() {
+ return _channel;
+ }
- for (int j = 0; j < allmethods.length; ++j) {
- methods.add(allmethods[j]);
+ public void setChannel(CT channel) {
+ _channel = channel;
+ }
+
+ public Set<ChannelListener> or(ChannelListener other) {
+ HashSet<ChannelListener> retval = new HashSet<ChannelListener>();
+ retval.add(this);
+ retval.add(other);
+ return retval;
+ }
+
+ public Set<ChannelListener> or(Set<ChannelListener> other) {
+ HashSet<ChannelListener> retval = new HashSet<ChannelListener>(other);
+ retval.add(this);
+ return retval;
+ }
+
+ public Set<Method> getImplementedMethods() {
+ if (_implementedMethods == null) {
+ Set<Method> implementedMethods = new HashSet<Method>();
+ getImplementedMethods(implementedMethods, getClass().getSuperclass());
+ _implementedMethods = Collections.unmodifiableSet(implementedMethods);
+ }
+ return _implementedMethods;
+ }
+
+ private static Set<Method> getImplementedMethods(Set<Method> methods, Class clazz) {
+ Class[] interfaces = clazz.getInterfaces();
+ for (int i=0; i<interfaces.length; ++i) {
+ if (interfaces[i] != Channel.class) {
+ Method[] allmethods = interfaces[i].getDeclaredMethods();
+ for (int j=0; j<allmethods.length; ++j) {
+ methods.add(allmethods[j]);
+ }
+ getImplementedMethods(methods, interfaces[i]);
+ }
}
-
- getImplementedMethods(methods, interfaces[i]);
- }
- }
- return methods;
- }
-
- /**
- * DOCUMENTME
- *
- * @param method DOCUMENTME
- *
- * @return DOCUMENTME
- */
- public Method getMethod(String method) {
- for (Iterator<Method> i = getImplementedMethods().iterator();i.hasNext();) {
- Method meth = i.next();
- if (meth.getName().equals(method)) {
- return meth;
- }
- }
-
- assert !_implementedMethods.contains(method);
-
- throw new IllegalArgumentException("No such method: " + method + " in " + _implementedMethods);
-
- }
-
- /**
- * Get a description of the object for debugging purposes.
- *
- * @return human-readable description.
- */
- public String toString() {
- StringBuffer buf = new StringBuffer(getClassName());
- buf.append('{');
- for (Iterator<Method> i = getImplementedMethods().iterator(); i.hasNext();) {
- Method method = i.next();
- buf.append(method.getName());
- buf.append("()");
-
- if (i.hasNext()) {
- buf.append("&");
- }
- }
-
- buf.append('}');
-
- return buf.toString();
- }
-
- protected Log log() {
- return __log;
- }
+ return methods;
+ }
+
+ /**
+ * Get a description of the object for debugging purposes.
+ *
+ * @return human-readable description.
+ */
+ public String toString() {
+ StringBuffer buf = new StringBuffer(getClassName());
+ buf.append('{');
+ for (Method m : getImplementedMethods()) {
+ buf.append(m.getName());
+ buf.append("()");
+ buf.append("&");
+ }
+ buf.setLength(buf.length()-1);
+ buf.append('}');
+ return buf.toString();
+ }
+
+ protected Log log() {
+ return __log;
+ }
}
Modified: incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/DebugInfo.java
URL: http://svn.apache.org/viewvc/incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/DebugInfo.java?view=diff&rev=469170&r1=469169&r2=469170
==============================================================================
--- incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/DebugInfo.java (original)
+++ incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/DebugInfo.java Mon Oct 30 07:57:02 2006
@@ -21,40 +21,40 @@
import java.io.PrintStream;
import java.io.Serializable;
-
/**
* Standard debug information for channels, objects (channel reads), and
* messages (channel writes).
- *
+ *
* @author Maciej Szefler <a href="mailto:mbs@fivesight.com">mbs</a>
*/
public class DebugInfo implements Serializable {
- /** Stringified representation of the instance. */
- private String _creator = "unknown";
+ /** Stringified representation of the instance. */
+ private String _creator = "unknown";
- /** Stack trace */
- private StackTraceElement[] _stackTrace = new StackTraceElement[0];
+ /** Stack trace */
+ private StackTraceElement[] _stackTrace = new StackTraceElement[0];
- public void setCreator(String creator) {
- _creator = creator;
- }
-
- public String getCreator() {
- return _creator;
- }
-
- public void setLocation(StackTraceElement[] location) {
- _stackTrace = location;
- }
-
- public StackTraceElement[] getLocation() {
- return _stackTrace;
- }
-
- public void printStackTrace(PrintStream pw) {
- pw.println(_creator);
-
- for (int i = 0; i < _stackTrace.length; i++)
- pw.println("\tat " + _stackTrace[i]);
- }
+ public void setCreator(String creator) {
+ _creator = creator;
+ }
+
+ public String getCreator() {
+ return _creator;
+ }
+
+ public void setLocation(StackTraceElement[] location) {
+ _stackTrace = location;
+ }
+
+ public StackTraceElement[] getLocation() {
+ return _stackTrace;
+ }
+
+ public void printStackTrace(PrintStream pw) {
+ pw.println(_creator);
+
+ for (int i=0; i<_stackTrace.length; i++) {
+ pw.println("\tat " + _stackTrace[i]);
+ }
+ }
}
Modified: incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/JacobObject.java
URL: http://svn.apache.org/viewvc/incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/JacobObject.java?view=diff&rev=469170&r1=469169&r2=469170
==============================================================================
--- incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/JacobObject.java (original)
+++ incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/JacobObject.java Mon Oct 30 07:57:02 2006
@@ -18,160 +18,123 @@
*/
package org.apache.ode.jacob;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.ode.jacob.vpu.JacobVPU;
-
import java.io.Serializable;
import java.lang.reflect.Method;
-import java.util.Iterator;
import java.util.Set;
+import org.apache.ode.jacob.vpu.JacobVPU;
/**
- * Base class for constructs which rely on a Java method body to represent
- * some aspect of the process.
+ * Base class for constructs which rely on a Java method body to represent some
+ * aspect of the process.
*/
public abstract class JacobObject implements Serializable {
- private static final Log __log = LogFactory.getLog(JacobObject.class);
+ public abstract Set<Method> getImplementedMethods();
+
+ /**
+ * Get the unadorned (no package) name of this class.
+ */
+ protected String getClassName() {
+ return getClassName(getClass());
+ }
+
+ /**
+ * Get the unadorned (no package) name of a class.
+ */
+ protected static String getClassName(Class clazz) {
+ String className = clazz.getName();
+ return (className.indexOf('.') == -1) ? className : className.substring(clazz.getPackage().getName().length() + 1);
+ }
+
+ protected static Object getExtension(Class extensionClass) {
+ return JacobVPU.activeJacobThread().getExtension(extensionClass);
+ }
+
+ @SuppressWarnings("unchecked")
+ protected static <T extends Channel> T importChannel(String channelId, Class<T> channelClass) {
+ return (T) JacobVPU.activeJacobThread().importChannel(channelId, channelClass);
+ }
+
+ /**
+ * Instantiation; the Java code <code>instance(new F(x,y,z))</code> is
+ * equivalent to <code>F(x,y,z)</code> in the process calculus.
+ *
+ * @param concretion the concretion of a process template
+ */
+ protected static void instance(JacobRunnable concretion) {
+ JacobVPU.activeJacobThread().instance(concretion);
+ }
+
+ protected <T extends Channel> T newChannel(Class<T> channelType)
+ throws IllegalArgumentException
+ {
+ return newChannel(channelType, null);
+ }
+
+ /**
+ * Channel creation; the Java code <code>Channel x = newChannel(XChannel.class) ...</code>
+ * is equivalent to <code>(new x) ... </code> in the process calculus.
+ */
+ @SuppressWarnings("unchecked")
+ protected <T extends Channel> T newChannel(Class<T> channelType, String description)
+ throws IllegalArgumentException
+ {
+ return (T) JacobVPU.activeJacobThread().newChannel(channelType, toString(), description);
+ }
+
+ /**
+ * Object; the Java code "object(x, ChannelListener)" is equivalent to
+ * <code>x ? ChannelListener</code> in the process algebra.
+ *
+ * @param methodList method list for the communication reduction
+ * @see JacobThread#object
+ */
+ protected static <T extends Channel> T object(ChannelListener<T> methodList) {
+ JacobVPU.activeJacobThread().object(false, methodList);
+ return methodList.getChannel();
+ }
+
+ protected static void object(boolean replication, ChannelListener methodList) {
+ JacobVPU.activeJacobThread().object(replication, methodList);
+ }
- public abstract Set getImplementedMethods();
+ protected static void object(boolean replication, ChannelListener[] methodLists) {
+ JacobVPU.activeJacobThread().object(replication, methodLists);
+ }
+
+ protected static void object(boolean replication, Set<ChannelListener> methodLists) {
+ JacobVPU.activeJacobThread().object(replication,
+ methodLists.toArray(new ChannelListener[methodLists.size()]));
+ }
- /**
- * @see Object#toString
- */
- public String toString() {
- StringBuffer buf = new StringBuffer("<JMB:");
- buf.append(getClassName());
- buf.append(">");
-
- return buf.toString();
- }
-
- /**
- * Get the unadorned (no package) name of this class.
- *
- * @return name of this class sans package prefix
- */
- protected String getClassName() {
- return getClassName(getClass());
- }
-
- /**
- * Get the unadorned (no package) name of a class.
- *
- * @param clazz class whose name should be returned
- *
- * @return name of the passed in calss sans package prefix
- */
- protected static String getClassName(Class clazz) {
- String className = clazz.getName();
-
- return (className.indexOf('.') == -1)
- ? className
- : className.substring(clazz.getPackage().getName().length() + 1);
- }
-
- protected Object getExtension(Class extensionClass) {
- return JacobVPU.activeJacobThread().getExtension(extensionClass);
- }
-
- @SuppressWarnings("unchecked")
- protected <T extends Channel> T importChannel(String channelId, Class<T> channelClass) {
- return (T) JacobVPU.activeJacobThread().importChannel(channelId, channelClass);
- }
-
- /**
- * Instantiation; the Java code <code>instance(new F(x,y,z))</code> is
- * equivalent to <code>F(x,y,z)</code> in the process calculus.
- *
- * @param concretion the concretion of a process template
- */
- protected void instance(JacobRunnable concretion) {
- JacobVPU.activeJacobThread().instance(concretion);
- }
-
- /**
- * Get the logger for this class.
- *
- * @return static class-level {@link Log} object
- */
- protected Log log() {
- return __log;
- }
-
- protected <T extends Channel> T newChannel(Class<T> channelType) throws IllegalArgumentException {
- return newChannel(channelType, null);
-
- }
- /**
- * Channel creation; the Java code <code>Channel x =
- * newChannel(XChannel.class) ...</code> is equivalent to <code>(new x) ...
- * </code> in the process calculus.
- *
- * @param channelType
- * @param description DOCUMENTME
- *
- * @return
- *
- * @throws IllegalArgumentException
- */
- @SuppressWarnings("unchecked")
- protected <T extends Channel> T newChannel(Class<T> channelType, String description)
- throws IllegalArgumentException {
- return (T) JacobVPU.activeJacobThread().newChannel(channelType, toString(), description);
- }
-
- /**
- * Object; the Java code "object(x, ChannelListener)" is equivalent to <code>x ?
- * ChannelListener</code> in the process algebra.
- *
- * @param methodList method list for the communication reduction
- *
- * @see JacobThread#object
- */
- protected <T extends Channel> T object(ChannelListener<T> methodList) {
- JacobVPU.activeJacobThread().object(false, methodList);
- return methodList.getChannel();
- }
-
- protected void object(boolean replication, ChannelListener methodList) {
- JacobVPU.activeJacobThread().object(replication, methodList);
- }
-
- protected void object(boolean replication, ChannelListener[] methodLists) {
- JacobVPU.activeJacobThread().object(replication, methodLists);
- }
-
- protected void object(boolean replication, Set<ChannelListener> methodLists) {
- JacobVPU.activeJacobThread().object(replication, methodLists.toArray(new ChannelListener[methodLists.size()]));
- }
-
- protected <T extends Channel> T replication(ChannelListener<T> methodList) {
- JacobVPU.activeJacobThread().object(true, methodList);
- return methodList.getChannel();
- }
-
- /**
- * Obtain a replicated channel broadcaster.
- * @param channel target channel
- * @return replicated channel broadcaster
- */
- protected <T extends Channel> T replication(T channel) {
- // TODO: we should create a replicated wrapper here.
- return channel;
- }
-
- public Method getMethod(Object methodName) {
- Set implementedMethods = getImplementedMethods();
- for (Iterator i = implementedMethods.iterator(); i.hasNext();) {
- Method method = (Method) i.next();
+ protected static <T extends Channel> T replication(ChannelListener<T> methodList) {
+ JacobVPU.activeJacobThread().object(true, methodList);
+ return methodList.getChannel();
+ }
- if (method.getName().equals(methodName))
- return method;
+ /**
+ * Obtain a replicated channel broadcaster.
+ *
+ * @param channel target channel
+ * @return replicated channel broadcaster
+ */
+ protected static <T extends Channel> T replication(T channel) {
+ // TODO: we should create a replicated wrapper here.
+ return channel;
}
- throw new IllegalArgumentException("No such method \"" + methodName + "\"!");
- }
+ public Method getMethod(String methodName) {
+ Set<Method> implementedMethods = getImplementedMethods();
+ for (Method m : implementedMethods) {
+ if (m.getName().equals(methodName)) {
+ return m;
+ }
+ }
+ throw new IllegalArgumentException("No such method \"" + methodName + "\"!");
+ }
+
+ public String toString() {
+ return "<JacobObject:" + getClassName() + ">";
+ }
}
Modified: incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/JacobRunnable.java
URL: http://svn.apache.org/viewvc/incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/JacobRunnable.java?view=diff&rev=469170&r1=469169&r2=469170
==============================================================================
--- incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/JacobRunnable.java (original)
+++ incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/JacobRunnable.java Mon Oct 30 07:57:02 2006
@@ -19,101 +19,81 @@
package org.apache.ode.jacob;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.ode.utils.ArrayUtils;
-
import java.lang.reflect.Method;
import java.util.Collections;
import java.util.Set;
+import org.apache.ode.utils.ArrayUtils;
/**
* Base class for process abstractions. An abstraction is a parameterized
* process template, whose instantiation is termed a <em>concretion</em>.
* Abstractions may define a set of bound channel names or other parameters
- * which are resolved at the time of the concretion. For example the
- * process term abstraction of a memory cell:
- * <code>Cell(s,v) := s ? { read(...) = ... & write(...) = ... }</code>
- * would be represented by the following Java class:
- * <code>
+ * which are resolved at the time of the concretion. For example the process
+ * term abstraction of a memory cell:
+ * <code>Cell(s,v) := s ? { read(...) = ... & write(...) = ... }</code> would
+ * be represented by the following Java class: <code>
* <pre>
* public class Cell extends JacobRunnable {
- * private CellChannel s;
- * private Object v;
- * public Cell(CellChannel s, Object v) {
- * this.s = s;
- * this.v = v;
- * }
- * public void run() {
- * object(new CellChannelListener(s) { read(...) {...}
- * write(...) {...} } );
- * }
+ * private CellChannel s;
+ *
+ * private Object v;
+ *
+ * public Cell(CellChannel s, Object v) {
+ * this.s = s;
+ * this.v = v;
+ * }
+ *
+ * public void run() {
+ * object(new CellChannelListener(s) { read(...) {...}
+ * write(...) {...} } );
+ * }
* }
* </pre>
- * </code>
- * An example of the Java expression representing the concretion of this abstraction
- * would look like:
- * <code>
+ * </code> An example of the Java expression representing the concretion of this
+ * abstraction would look like: <code>
* <pre>
- * .
- * .
- * // (new c) Cell(c,v)
- * Integer v = Integer.valueOf(0);
- * CellChannel c = (CellChannel)newChannel(CellChanell.class);
- * instance(new Cell(c, v));
- * .
- * .
+ * .
+ * .
+ * // (new c) Cell(c,v)
+ * Integer v = Integer.valueOf(0);
+ * CellChannel c = (CellChannel)newChannel(CellChannel.class);
+ * instance(new Cell(c, v));
+ * .
+ * .
* </pre>
* </code>
- *
+ *
* @author Maciej Szefler <a href="mailto:mbs@fivesight.com" />
*/
public abstract class JacobRunnable extends JacobObject {
- private static final Log __log = LogFactory.getLog(JacobRunnable.class);
+ private static final Set<Method> IMPLEMENTED_METHODS;
+
+ static {
+ try {
+ Method m = JacobRunnable.class.getMethod("run", ArrayUtils.EMPTY_CLASS_ARRAY);
+ IMPLEMENTED_METHODS = Collections.singleton(m);
+ } catch (NoSuchMethodException e) {
+ throw new AssertionError(e);
+ }
+ }
+
+ public Set<Method> getImplementedMethods() {
+ return IMPLEMENTED_METHODS;
+ }
+
+ /**
+ * Perform the template reduction, i.e. do whatever it is that the
+ * templatized process does. This method may do some combination of in-line
+ * Java, and JACOB operations.
+ * <p>
+ * <em>Note that JACOB operations are performed in parallel, so the
+ * sequencing of JACOB operations is irrelevant</em>
+ */
+ public abstract void run();
- private static final Set<Method> IMPLEMENTED_METHODS;
- static {
- try {
- IMPLEMENTED_METHODS = Collections.singleton(JacobRunnable.class.getMethod("run", ArrayUtils.EMPTY_CLASS_ARRAY));
- } catch (NoSuchMethodException e) {
- throw new AssertionError(e);
+ public String toString() {
+ return getClassName() + "(...)";
}
- }
- public Set<Method> getImplementedMethods() {
- return IMPLEMENTED_METHODS;
- }
-
- /**
- * <p>
- * Peform the template reduction, i.e. do whatever it is that the
- * templetized process does. This method may do some combination of in-line
- * Java, and JACOB operations.
- * </p>
- *
- * <p>
- * <em>Note that JACOB operations are performed in parallel, so the
- * sequencing of JACOB operations is irrelevant</em>
- * </p>
- */
- public abstract void run();
-
- /**
- * Pretty print.
- * @see Object#toString
- */
- public String toString() {
- StringBuffer buf = new StringBuffer(getClassName());
- buf.append("(...)");
-
- return buf.toString();
- }
-
- /**
- * @see JacobObject#log
- */
- protected Log log() {
- return __log;
- }
}
Modified: incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/JacobThread.java
URL: http://svn.apache.org/viewvc/incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/JacobThread.java?view=diff&rev=469170&r1=469169&r2=469170
==============================================================================
--- incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/JacobThread.java (original)
+++ incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/JacobThread.java Mon Oct 30 07:57:02 2006
@@ -20,98 +20,74 @@
import java.lang.reflect.Method;
-
/**
- * Class exposing the JACOB operations. Note: these operations represent a
- * subset of a process algebra mapped into Java invocations; other aspects of
- * the syntax are represented natively in Java. In particular, parallel
- * composition is modelled as sequential Java invocation and if-else are
- * modelled using Java's if-else statement. Note also that the scoping rules
- * for channel names are simply the Java object visibility rules.
+ * Class exposing the JACOB operations.
+ * <p>
+ * Note: these operations represent a subset of a process algebra mapped into
+ * Java invocations; other aspects of the syntax are represented natively in Java.
+ * In particular, parallel composition is modelled as sequential Java invocation
+ * and if-else are modelled using Java's if-else statement. Note also that the
+ * scoping rules for channel names are simply the Java object visibility rules.
*/
public interface JacobThread {
- /**
- * DOCUMENTME
- *
- * @param extensionClass DOCUMENTME
- *
- * @return DOCUMENTME
- */
- public Object getExtension(Class extensionClass);
-
- /**
- * DOCUMENTME
- *
- * @param channel DOCUMENTME
- *
- * @return DOCUMENTME
- */
- public String exportChannel(Channel channel);
-
- /**
- * DOCUMENTME
- *
- * @param channelId DOCUMENTME
- * @param channelClass DOCUMENTME
- *
- * @return DOCUMENTME
- */
- public Channel importChannel(String channelId, Class channelClass);
-
- /**
- * Create a process instance i.e. a concretion of a process abstraction.
- *
- * @param concretion new concretion of some abstraction
- */
- public void instance(JacobRunnable concretion);
-
- /**
- * Send a message (object invocation). This method shouldn't really be used
- * as {@link Channel} objects may be used as proxies in this respect.
- *
- * @param channel channel on which to send the message
- * @param method method to apply
- * @param args arguments
- */
- public Channel message(Channel channel, Method method, Object[] args);
-
- /**
- * Create a new (fresh) channel.
- *
- * @param channelType type of channel to create
- *
- * @return newly created channel
- */
- public Channel newChannel(Class channelType, String creator,
- String description);
-
- /**
- * <p>
- * Receive a message on a channel, allowing for possible replication. The
- * effect of this method is to register a listener (the method list) for a
- * message on the channel to consume either one or an infinate number of
- * messages on the channel (depending on the value of the
- * <code>replicate</code> argument.
- * </p>
- *
- * <p>
- * With respect to process terms, the Java expression <code>object(false, x,
- * ChannelListener)</code> corresponds to the process term <code> x ? { ChannelListener }</code>; if
- * in the same expression the initial <code>replicate</code> parameter were
- * instead set to <code>true</code>, corresponding term would be <code> ! x
- * ? { ChannelListener }</code>.
- * </p>
- *
- * @param replicate if set the a replication operator is present
- * @param methodList object representation of the method list
- *
- * @throws IllegalArgumentException if the method list does not match the
- * channel kind
- */
- public void object(boolean replicate, ChannelListener methodList)
- throws IllegalArgumentException;
+ public Object getExtension(Class extensionClass);
+
+ public String exportChannel(Channel channel);
+
+ public Channel importChannel(String channelId, Class channelClass);
+ /**
+ * Create a process instance i.e. a concretion of a process abstraction.
+ */
+ public void instance(JacobRunnable concretion);
+
+ /**
+ * Send a message (object invocation). This method shouldn't really be used
+ * as {@link Channel} objects may be used as proxies in this respect.
+ *
+ * @param channel
+ * channel on which to send the message
+ * @param method
+ * method to apply
+ * @param args
+ * arguments
+ */
+ public Channel message(Channel channel, Method method, Object[] args);
+
+ /**
+ * Create a new channel.
+ */
+ public Channel newChannel(Class channelType, String creator,
+ String description);
+
+ /**
+ * <p>
+ * Receive a message on a channel, allowing for possible replication. The
+ * effect of this method is to register a listener (the method list) for a
+ * message on the channel to consume either one or an infinite number of
+ * messages on the channel (depending on the value of the
+ * <code>replicate</code> argument).
+ * </p>
+ *
+ * <p>
+ * With respect to process terms, the Java expression <code>object(false, x,
+ * ChannelListener)</code>
+ * corresponds to the process term <code> x ? { ChannelListener }</code>;
+ * if in the same expression the initial <code>replicate</code> parameter
+ * were instead set to <code>true</code>, corresponding term would be
+ * <code> ! x ? { ChannelListener }</code>.
+ * </p>
+ *
+ * @param replicate
+ * if set, a replication operator is present
+ * @param methodList
+ * object representation of the method list
+ * @throws IllegalArgumentException
+ * if the method list does not match the channel kind
+ */
+ public void object(boolean replicate, ChannelListener methodList)
+ throws IllegalArgumentException;
- public void object(boolean reaplicate, ChannelListener[] methodLists)
- throws IllegalArgumentException;
+ public void object(boolean reaplicate, ChannelListener[] methodLists)
+ throws IllegalArgumentException;
}
Modified: incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/Synch.java
URL: http://svn.apache.org/viewvc/incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/Synch.java?view=diff&rev=469170&r1=469169&r2=469170
==============================================================================
--- incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/Synch.java (original)
+++ incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/Synch.java Mon Oct 30 07:57:02 2006
@@ -19,12 +19,13 @@
package org.apache.ode.jacob;
/**
- * DOCUMENTME.
- * <p>Created on Mar 4, 2004 at 11:18:56 AM.</p>
- *
- * @jacob.kind
+ * Synch represents a synchronous invocation callback notification.
+ * <p>
+ * It is the only allowable return type (other than "void") for JACOB objects.
+ *
+ * @jacob.kind
* @author Maciej Szefler <a href="mailto:mbs@fivesight.com">mbs</a>
*/
-public interface Synch {
- public void ret();
+public interface Synch {
+ public void ret();
}
Modified: incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/examples/cell/CELL_.java
URL: http://svn.apache.org/viewvc/incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/examples/cell/CELL_.java?view=diff&rev=469170&r1=469169&r2=469170
==============================================================================
--- incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/examples/cell/CELL_.java (original)
+++ incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/examples/cell/CELL_.java Mon Oct 30 07:57:02 2006
@@ -21,54 +21,48 @@
import org.apache.ode.jacob.JacobRunnable;
import org.apache.ode.jacob.Val;
-
/**
* Cell process template Java representation. This class is equivalent to the
- * following process calculus expression: <code> Cell(self, val) = self ? [
- * read(r) = { Cell(self, val) | r ! val(val) } & write(newVal) = {
- * Cell(self, newVal) } ] </code>
+ * following process calculus expression:
+ * <code>
+ * Cell(self, val) = self ? [ read(r) = { Cell(self, val) | r ! val(val) } & write(newVal) = { Cell(self, newVal) } ]
+ * </code>
*/
public class CELL_<T> extends JacobRunnable {
- private CellChannel _self;
- private T _val;
+ private CellChannel _self;
+
+ private T _val;
- public CELL_(CellChannel self, T val) {
- _self = self;
- _val = val;
- }
-
- public void run() {
- // INSTANTIATION{Cell(run,val)}
- // ==> run ? [ read(r)={...} & write(newVal)={...} ]
- object(new CellChannelListener(_self) {
- private static final long serialVersionUID = 8883128084307471572L;
-
- public void read(Val r) {
- // COMMUNICATION{x & [read... & ... ] | x ! read}
- // ==> Cell(run, val) ...
- instance(new CELL_<T>(_self, _val));
-
- // ... | r ! val(val)
- r.val(_val);
-
- // Note: sequential Java above translates to parallel proc calc expression!
- }
-
- @SuppressWarnings("unchecked")
- public void write(Object newVal) {
- // COMMUNICATION{x & [... & write...]
- // ==> Cell(run, newVal)
- instance(new CELL_(_self, newVal));
- }
- });
- }
-
- /**
- * DOCUMENTME
- *
- * @return DOCUMENTME
- */
- public String toString() {
- return "CellProcess[self=" + _self + ", val=" + _val + "]";
- }
+ public CELL_(CellChannel self, T val) {
+ _self = self;
+ _val = val;
+ }
+
+ public void run() {
+ // INSTANTIATION{Cell(run,val)}
+ // ==> run ? [ read(r)={...} & write(newVal)={...} ]
+ object(new CellChannelListener(_self) {
+ private static final long serialVersionUID = 8883128084307471572L;
+
+ public void read(Val r) {
+ // COMMUNICATION{x & [read... & ... ] | x ! read} ==> Cell(run, val) ...
+ instance(new CELL_<T>(_self, _val));
+
+ // ... | r ! val(val)
+ r.val(_val);
+
+ // Note: sequential Java above translates to parallel proc calc expression!
+ }
+
+ @SuppressWarnings("unchecked")
+ public void write(Object newVal) {
+ // COMMUNICATION{x & [... & write...] ==> Cell(run, newVal)
+ instance(new CELL_(_self, newVal));
+ }
+ });
+ }
+
+ public String toString() {
+ return "CellProcess[self=" + _self + ", val=" + _val + "]";
+ }
}
Modified: incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/examples/cell/Cell.java
URL: http://svn.apache.org/viewvc/incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/examples/cell/Cell.java?view=diff&rev=469170&r1=469169&r2=469170
==============================================================================
--- incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/examples/cell/Cell.java (original)
+++ incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/examples/cell/Cell.java Mon Oct 30 07:57:02 2006
@@ -21,8 +21,8 @@
import org.apache.ode.jacob.Val;
/**
- * Channel type for a cell. The channel allows reading of and setting the
- * values of a cell.
+ * Channel type for a cell. The channel allows reading of and setting the values of a cell.
+ *
* @jacob.kind
*/
public interface Cell {
Added: incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/examples/sequence/Sequence.java
URL: http://svn.apache.org/viewvc/incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/examples/sequence/Sequence.java?view=auto&rev=469170
==============================================================================
--- incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/examples/sequence/Sequence.java (added)
+++ incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/examples/sequence/Sequence.java Mon Oct 30 07:57:02 2006
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ode.jacob.examples.sequence;
+
+import org.apache.ode.jacob.JacobRunnable;
+import org.apache.ode.jacob.SynchChannel;
+import org.apache.ode.jacob.SynchChannelListener;
+
+/**
+ * Abstract process that executes a number of steps sequentially.
+ */
+public abstract class Sequence extends JacobRunnable {
+ private int _steps;
+ private int _current;
+ private SynchChannel _done;
+
+ /**
+ * Create a {@link Sequence} with a number of steps.
+ *
+ * @param steps number of steps
+ * @param done synchronous callback
+ */
+ public Sequence(int steps, SynchChannel done) {
+ _steps = steps;
+ _current = 0;
+ _done = done;
+ }
+
+ /**
+ * Process execution block
+ */
+ public void run() {
+ if (_current >= _steps) {
+ if (_done != null) {
+ _done.ret();
+ }
+ } else {
+ SynchChannel r = newChannel(SynchChannel.class);
+ object(new SynchChannelListener(r) {
+ private static final long serialVersionUID = -6999108928780639603L;
+
+ public void ret() {
+ ++_current;
+ instance(Sequence.this);
+ }
+ });
+ instance(doStep(_current, r));
+ }
+ }
+
+ /**
+ * Execute a step
+ * @param step step number
+ * @param done notification after step completion
+ * @return runnable process
+ */
+ protected abstract JacobRunnable doStep(int step, SynchChannel done);
+}
Modified: incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/examples/synch/SynchPrinter.java
URL: http://svn.apache.org/viewvc/incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/examples/synch/SynchPrinter.java?view=diff&rev=469170&r1=469169&r2=469170
==============================================================================
--- incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/examples/synch/SynchPrinter.java (original)
+++ incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/examples/synch/SynchPrinter.java Mon Oct 30 07:57:02 2006
@@ -25,60 +25,61 @@
import org.apache.ode.jacob.vpu.JacobVPU;
/**
- * DOCUMENTME.
- * <p>Created on Mar 4, 2004 at 4:22:05 PM.</p>
+ * Example JACOB process illustrating the use of {@link SynchPrint}
*
* @author Maciej Szefler <a href="mailto:mbs@fivesight.com">mbs</a>
*/
-public class SynchPrinter {
+public class SynchPrinter {
- public static final class SystemPrinter extends JacobRunnable {
- private static final long serialVersionUID = -8516348116865575605L;
-
- private SynchPrintChannel self;
- public SystemPrinter(SynchPrintChannel self) {
- this.self = self;
- }
- public void run() {
- object(true, new SynchPrintChannelListener(self) {
- private static final long serialVersionUID = -1990741944766989782L;
-
- public SynchChannel print(String msg) {
- System.out.println(msg);
- return null;
+ public static final class SystemPrinter extends JacobRunnable {
+ private static final long serialVersionUID = -8516348116865575605L;
+
+ private SynchPrintChannel _self;
+
+ public SystemPrinter(SynchPrintChannel self) {
+ _self = self;
+ }
+
+ public void run() {
+ object(true, new SynchPrintChannelListener(_self) {
+ private static final long serialVersionUID = -1990741944766989782L;
+
+ public SynchChannel print(String msg) {
+ System.out.println(msg);
+ return null; // SynchChannel automatically created by JacobVPU
+ }
+ });
}
- });
}
- }
- public static final class Tester extends JacobRunnable {
- private static final long serialVersionUID = 7899682832271627464L;
+ public static final class Tester extends JacobRunnable {
+ private static final long serialVersionUID = 7899682832271627464L;
- public void run() {
- final SynchPrintChannel p = newChannel(SynchPrintChannel.class);
- instance(new SystemPrinter(p));
- object(new SynchChannelListener(p.print("1")) {
- public void ret() {
- object(new SynchChannelListener(p.print("2")) {
- public void ret() {
- object(new SynchChannelListener(p.print("3")) {
+ public void run() {
+ final SynchPrintChannel p = newChannel(SynchPrintChannel.class);
+ instance(new SystemPrinter(p));
+ object(new SynchChannelListener(p.print("1")) {
public void ret() {
+ object(new SynchChannelListener(p.print("2")) {
+ public void ret() {
+ object(new SynchChannelListener(p.print("3")) {
+ public void ret() {
+ }
+ });
+ }
+ });
}
- });
- }
- });
+ });
}
- });
}
- }
- public static void main(String args[]) {
- JacobVPU vpu = new JacobVPU();
- vpu.setContext(new ExecutionQueueImpl(null));
- vpu.inject(new Tester());
- while (vpu.execute()) {
- // run
+ public static void main(String args[]) {
+ JacobVPU vpu = new JacobVPU();
+ vpu.setContext(new ExecutionQueueImpl(null));
+ vpu.inject(new Tester());
+ while (vpu.execute()) {
+ // run
+ }
}
- }
}
Modified: incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/soup/Comm.java
URL: http://svn.apache.org/viewvc/incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/soup/Comm.java?view=diff&rev=469170&r1=469169&r2=469170
==============================================================================
--- incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/soup/Comm.java (original)
+++ incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/soup/Comm.java Mon Oct 30 07:57:02 2006
@@ -22,41 +22,45 @@
/**
* DOCUMENTME.
- * <p>Created on Feb 16, 2004 at 8:44:27 PM.</p>
+ * <p>
+ * Created on Feb 16, 2004 at 8:44:27 PM.
+ * </p>
*
* @author Maciej Szefler <a href="mailto:mbs@fivesight.com">mbs</a>
*/
-public abstract class Comm extends ExecutionQueueObject {
- private CommChannel _channel;
- private CommGroup _group;
-
- protected Comm() {}
-
- protected Comm(CommGroup group, CommChannel chnl) {
- _group = group;
- _channel = chnl;
- }
-
-
- public CommChannel getChannel() {
- return _channel;
- }
-
- public void setChannel(CommChannel channel) {
- _channel = channel;
- }
-
- public CommGroup getGroup() {
- return _group;
- }
-
- public void setGroup(CommGroup group) {
- if (_group != null)
- throw new IllegalStateException("Attempted to call setGroup() twice!");
- _group = group;
- }
-
- public String toString() {
- return ObjectPrinter.toString(this, new Object[] { "chnl", _channel, "group", _group });
- }
+public abstract class Comm extends ExecutionQueueObject {
+ private CommChannel _channel;
+
+ private CommGroup _group;
+
+ protected Comm() {
+ }
+
+ protected Comm(CommGroup group, CommChannel chnl) {
+ _group = group;
+ _channel = chnl;
+ }
+
+ public CommChannel getChannel() {
+ return _channel;
+ }
+
+ public void setChannel(CommChannel channel) {
+ _channel = channel;
+ }
+
+ public CommGroup getGroup() {
+ return _group;
+ }
+
+ public void setGroup(CommGroup group) {
+ if (_group != null) {
+ throw new IllegalStateException("Attempted to call setGroup() twice!");
+ }
+ _group = group;
+ }
+
+ public String toString() {
+ return ObjectPrinter.toString(this, new Object[] { "chnl", _channel, "group", _group });
+ }
}
Modified: incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/soup/CommChannel.java
URL: http://svn.apache.org/viewvc/incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/soup/CommChannel.java?view=diff&rev=469170&r1=469169&r2=469170
==============================================================================
--- incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/soup/CommChannel.java (original)
+++ incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/soup/CommChannel.java Mon Oct 30 07:57:02 2006
@@ -21,42 +21,30 @@
import org.apache.ode.utils.ObjectPrinter;
/**
- *
* DOCUMENTME.
*
- * <p>
- * Created on Feb 16, 2004 at 9:48:47 PM.
- * </p>
- *
- *
- *
* @author Maciej Szefler <a href="mailto:mbs@fivesight.com">mbs</a>
- *
*/
public class CommChannel extends ExecutionQueueObject {
- private Class _type;
+ private Class _type;
- public CommChannel(Class type) {
- _type = type;
- }
-
- public Class getType() {
- return _type;
- }
-
- public String toString() {
- StringBuffer buf = new StringBuffer(ObjectPrinter.getShortClassName(_type));
- if (getDescription() != null) {
- buf.append(':');
- buf.append(getDescription());
+ public CommChannel(Class type) {
+ _type = type;
}
- buf.append('#');
- buf.append(getId());
- return buf.toString();
- }
+ public Class getType() {
+ return _type;
+ }
-}
+ public String toString() {
+ StringBuffer buf = new StringBuffer(ObjectPrinter.getShortClassName(_type));
+ if (getDescription() != null) {
+ buf.append(':').append(getDescription());
+ }
+ buf.append('#').append(getId());
+ return buf.toString();
+ }
+}
Modified: incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/soup/CommGroup.java
URL: http://svn.apache.org/viewvc/incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/soup/CommGroup.java?view=diff&rev=469170&r1=469169&r2=469170
==============================================================================
--- incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/soup/CommGroup.java (original)
+++ incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/soup/CommGroup.java Mon Oct 30 07:57:02 2006
@@ -36,44 +36,44 @@
public class CommGroup extends ExecutionQueueObject {
- boolean _isReplicated;
- List<Comm> _comms = new ArrayList<Comm>();
+ boolean _isReplicated;
- public CommGroup(boolean replicated) {
- _isReplicated = replicated;
- }
-
- /**
- * Read the value of the replication operator flag. CommRecv (channel reads)
- * with the replication flag set are left in the queue indefinately.
- *
- * @return true or false
- */
- public boolean isReplicated() {
- return _isReplicated;
- }
-
- public void add(Comm comm) {
- comm.setGroup(this);
- _comms.add(comm);
- }
-
- public String toString() {
- StringBuffer buf = new StringBuffer();
- for (Iterator<Comm> i = _comms.iterator(); i.hasNext();) {
- buf.append(i.next());
- if (i.hasNext()) buf.append(" + ");
- }
-
- return buf.toString();
- }
-
- public String getDescription() {
- return toString();
- }
-
- public Iterator<Comm> getElements() {
- return _comms.iterator();
- }
+ List<Comm> _comms = new ArrayList<Comm>();
+
+ public CommGroup(boolean replicated) {
+ _isReplicated = replicated;
+ }
+
+ /**
+ * Read the value of the replication operator flag. CommRecv (channel reads)
+ * with the replication flag set are left in the queue indefinitely.
+ *
+ * @return true or false
+ */
+ public boolean isReplicated() {
+ return _isReplicated;
+ }
+
+ public void add(Comm comm) {
+ comm.setGroup(this);
+ _comms.add(comm);
+ }
+
+ public String toString() {
+ StringBuffer buf = new StringBuffer();
+ for (int i = 0; i < _comms.size(); ++i) {
+ if (i != 0) buf.append(" + "); // separator goes between elements only: no trailing " +" to trim, and an empty group yields ""
+ buf.append(_comms.get(i));
+ }
+ return buf.toString();
+ }
+
+ public String getDescription() {
+ return toString();
+ }
+
+ public Iterator<Comm> getElements() {
+ return _comms.iterator();
+ }
}
Modified: incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/soup/CommRecv.java
URL: http://svn.apache.org/viewvc/incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/soup/CommRecv.java?view=diff&rev=469170&r1=469169&r2=469170
==============================================================================
--- incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/soup/CommRecv.java (original)
+++ incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/soup/CommRecv.java Mon Oct 30 07:57:02 2006
@@ -21,38 +21,40 @@
import org.apache.ode.jacob.ChannelListener;
/**
- * Persistent store representation of an object (i.e. channel read) waiting
- * for a message (i.e. channel write / method application). This class
- * maintains an opaque byte array which specifies the continuation (the exact
- * nature of this data is determined by the main JACOB VPU), as well as
- * information regarding which methods are supported by the object, and
- * whether the read is of a replicated variety.
+ * Persistent store representation of an object (i.e. channel read) waiting for
+ * a message (i.e. channel write / method application). This class maintains an
+ * opaque byte array which specifies the continuation (the exact nature of this
+ * data is determined by the main JACOB VPU), as well as information regarding
+ * which methods are supported by the object, and whether the read is of a
+ * replicated variety.
*/
public class CommRecv extends Comm {
- private ChannelListener _continuation;
+ private ChannelListener _continuation;
- protected CommRecv() {}
+ protected CommRecv() {
+ }
- public CommRecv(CommChannel chnl, ChannelListener continuation) {
- super(null, chnl);
- _continuation = continuation;
- }
+ public CommRecv(CommChannel chnl, ChannelListener continuation) {
+ super(null, chnl);
+ _continuation = continuation;
+ }
- /**
- * Get the continuation for this object (channel read). The continuation is
- * what happens after a message is matched to the object. It is up to the
- * JACOB VPU to determine what is placed here, but it will generally
- * consist of some serialized representation of an appropriate ChannelListener object
- * (see {@link ChannelListener}.
- *
- * @return byte array representing the serialized form of the continuation
- */
- public ChannelListener getContinuation() { return _continuation ;}
+ /**
+ * Get the continuation for this object (channel read). The continuation is
+ * what happens after a message is matched to the object. It is up to the
+ * JACOB VPU to determine what is placed here, but it will generally consist
+ * of some serialized representation of an appropriate ChannelListener
+ * object (see {@link ChannelListener}).
+ *
+ * @return the {@link ChannelListener} continuation held by this channel read
+ */
+ public ChannelListener getContinuation() {
+ return _continuation;
+ }
- public String toString() {
- StringBuffer buf = new StringBuffer(getChannel().toString());
- buf.append(" ? ");
- buf.append(_continuation.toString());
- return buf.toString();
- }
+ public String toString() {
+ StringBuffer buf = new StringBuffer(getChannel().toString());
+ buf.append(" ? ").append(_continuation.toString());
+ return buf.toString();
+ }
}
Modified: incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/soup/CommSend.java
URL: http://svn.apache.org/viewvc/incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/soup/CommSend.java?view=diff&rev=469170&r1=469169&r2=469170
==============================================================================
--- incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/soup/CommSend.java (original)
+++ incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/soup/CommSend.java Mon Oct 30 07:57:02 2006
@@ -22,49 +22,47 @@
/**
* Persistent store representation of a message (i.e. method application /
- * channel write) waiting for a corresponding object (i.e. channel read).
- * This structure consists of a label identifying the method that should be
- * applied to the object once it is available, and the arguments that should
- * be applied to said method.
- *
+ * channel write) waiting for a corresponding object (i.e. channel read). This
+ * structure consists of a label identifying the method that should be applied
+ * to the object once it is available, and the arguments that should be applied
+ * to said method.
+ *
* @author Maciej Szefler <a href="mailto:mbs@fivesight.com">mbs</a>
*/
public class CommSend extends Comm {
- private Object[] _args;
- private Method _method;
+ private Object[] _args;
- protected CommSend() {}
+ private Method _method;
- public CommSend(CommChannel chnl, Method method, Object[] args) {
- super(null, chnl);
- _args = args;
- _method = method;
- }
-
- public Method getMethod() {
- return _method;
- }
-
- /**
- * Get the arguments for the method application.
- *
- * @return array of arguments that should be applied to the method
- */
- public Object[] getArgs() {
- return _args;
- }
-
- public String toString() {
- StringBuffer buf = new StringBuffer(getChannel().toString());
- buf.append(" ! ");
- buf.append(_method.getName());
- buf.append('(');
- for (int i = 0; _args != null && i < _args.length; ++i) {
- if (i != 0) buf.append(',');
- buf.append(_args[i]);
+ protected CommSend() {
+ }
+
+ public CommSend(CommChannel chnl, Method method, Object[] args) {
+ super(null, chnl);
+ _args = args;
+ _method = method;
+ }
+
+ public Method getMethod() {
+ return _method;
+ }
+
+ /**
+ * Get the arguments for the method application.
+ */
+ public Object[] getArgs() {
+ return _args;
+ }
+
+ public String toString() {
+ StringBuffer buf = new StringBuffer(getChannel().toString());
+ buf.append(" ! ").append(_method.getName()).append('(');
+ for (int i = 0; _args != null && i < _args.length; ++i) {
+ if (i != 0) buf.append(',');
+ buf.append(_args[i]);
+ }
+ buf.append(')');
+ return buf.toString();
}
- buf.append(')');
- return buf.toString();
- }
}
Modified: incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/soup/ExecutionQueueObject.java
URL: http://svn.apache.org/viewvc/incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/soup/ExecutionQueueObject.java?view=diff&rev=469170&r1=469169&r2=469170
==============================================================================
--- incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/soup/ExecutionQueueObject.java (original)
+++ incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/soup/ExecutionQueueObject.java Mon Oct 30 07:57:02 2006
@@ -20,41 +20,47 @@
/**
* Base class for items we find in the {@link ExecutionQueue}.
- * <p>Created on Feb 17, 2004 at 3:44:24 PM.</p>
*
* @author Maciej Szefler <a href="mailto:mbs@fivesight.com">mbs</a>
*/
public class ExecutionQueueObject {
- /** A unique idefntifer for this object in the soup (should only be set by soup). */
- private Object _id;
-
- /** A human-readable description of the object. */
- private String _description;
-
- public String getDescription() {
- return _description;
- }
-
- public void setDescription(String description) {
- if (_description != null)
- throw new IllegalStateException("Description already set for " + this);
- _description = description;
- }
-
- public void setId(Object id) {
- if (_id != null)
- throw new IllegalStateException("Object id already set for " + this);
- _id = id;
- }
-
- public Object getId() {
- return _id;
- }
-
- public boolean equals(Object obj) {
- if (_id == null || ((ExecutionQueueObject)obj)._id == null)
- return this==obj;
- return ((ExecutionQueueObject)obj)._id.equals(_id);
- }
+ /**
+ * A unique identifier for this object in the queue (should only be set by queue).
+ */
+ private Object _id;
+
+ /**
+ * A human-readable description of the object.
+ */
+ private String _description;
+
+ public String getDescription() {
+ return _description;
+ }
+
+ public void setDescription(String description) {
+ if (_description != null) {
+ throw new IllegalStateException("Description already set for " + this);
+ }
+ _description = description;
+ }
+
+ public void setId(Object id) {
+ if (_id != null) {
+ throw new IllegalStateException("Object id already set for " + this);
+ }
+ _id = id;
+ }
+
+ public Object getId() {
+ return _id;
+ }
+
+ public boolean equals(Object obj) {
+ if (!(obj instanceof ExecutionQueueObject) || _id == null || ((ExecutionQueueObject) obj)._id == null) {
+ return this == obj; // null, foreign type, or a missing id on either side: only identity can match
+ }
+ return ((ExecutionQueueObject) obj)._id.equals(_id); // both ids assigned: objects are equal when their ids are
+ }
}
Modified: incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/vpu/ChannelFactory.java
URL: http://svn.apache.org/viewvc/incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/vpu/ChannelFactory.java?view=diff&rev=469170&r1=469169&r2=469170
==============================================================================
--- incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/vpu/ChannelFactory.java (original)
+++ incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/vpu/ChannelFactory.java Mon Oct 30 07:57:02 2006
@@ -18,92 +18,76 @@
*/
package org.apache.ode.jacob.vpu;
-import org.apache.ode.jacob.Channel;
-import org.apache.ode.utils.ArrayUtils;
-
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
-public class ChannelFactory {
- private static final Method METHOD_OBJECT_EQUALS;
- private static final Method METHOD_CHANNEL_EXPORT;
-
- static {
- try {
- METHOD_OBJECT_EQUALS = Object.class.getMethod("equals", new Class[] { Object.class });
- } catch (Exception e) {
- throw new AssertionError("No equals(Object) method on Object!");
- }
+import org.apache.ode.jacob.Channel;
+import org.apache.ode.jacob.soup.CommChannel;
+import org.apache.ode.utils.ArrayUtils;
- try {
- METHOD_CHANNEL_EXPORT = Channel.class.getMethod("export", ArrayUtils.EMPTY_CLASS_ARRAY);
- } catch (Exception e) {
- throw new AssertionError("No export() method on Object!");
- }
- }
+public class ChannelFactory {
+ private static final Method METHOD_OBJECT_EQUALS;
- /**
- * DOCUMENTME
- *
- * @param channel DOCUMENTME
- *
- * @return DOCUMENTME
- */
- public static Object getBackend(Channel channel) {
- ChannelInvocationHandler cih = (ChannelInvocationHandler)Proxy.getInvocationHandler(channel);
- return cih._backend;
- }
-
-
- /**
- * DOCUMENTME
- *
- * @return DOCUMENTME
- */
- public static Channel createChannel(Object backend, Class type) {
- InvocationHandler h = new ChannelInvocationHandler(backend);
- Class[] ifaces = new Class[] { Channel.class, type };
- Object proxy = Proxy.newProxyInstance(Channel.class.getClassLoader(), ifaces, h);
- return (Channel)proxy;
- }
+ private static final Method METHOD_CHANNEL_EXPORT;
- public static final class ChannelInvocationHandler implements InvocationHandler {
- private Object _backend;
+ static {
+ try {
+ METHOD_OBJECT_EQUALS = Object.class.getMethod("equals", new Class[] { Object.class });
+ } catch (Exception e) {
+ throw new AssertionError("No equals(Object) method on Object!");
+ }
- ChannelInvocationHandler(Object backend) {
- _backend = backend;
+ try {
+ METHOD_CHANNEL_EXPORT = Channel.class.getMethod("export", ArrayUtils.EMPTY_CLASS_ARRAY);
+ } catch (Exception e) {
+ throw new AssertionError("No export() method on Object!");
+ }
}
- public String toString() {
- return _backend.toString();
+ public static Object getBackend(Channel channel) {
+ ChannelInvocationHandler cih = (ChannelInvocationHandler) Proxy.getInvocationHandler(channel);
+ return cih._backend;
}
- public boolean equals(Object other) {
- return ((ChannelInvocationHandler)other)._backend.equals(_backend);
+ public static Channel createChannel(CommChannel backend, Class type) {
+ InvocationHandler h = new ChannelInvocationHandler(backend);
+ Class[] ifaces = new Class[] { Channel.class, type };
+ Object proxy = Proxy.newProxyInstance(Channel.class.getClassLoader(), ifaces, h);
+ return (Channel) proxy;
}
- public int hashCode() {
- return _backend.hashCode();
- }
+ public static final class ChannelInvocationHandler implements InvocationHandler {
+ private CommChannel _backend;
- public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
- if (method.getDeclaringClass() == Object.class) {
- if (method.equals(METHOD_OBJECT_EQUALS)) {
- return Boolean.valueOf(this.equals(Proxy.getInvocationHandler(args[0])));
+ ChannelInvocationHandler(CommChannel backend) {
+ _backend = backend;
}
- return method.invoke(this, args);
- }
+ public String toString() {
+ return _backend.toString();
+ }
- if (method.equals(METHOD_CHANNEL_EXPORT)) {
- return JacobVPU.activeJacobThread().exportChannel((Channel)proxy);
- }
+ public boolean equals(Object other) {
+ return other instanceof ChannelInvocationHandler && ((ChannelInvocationHandler) other)._backend.equals(_backend); // null or foreign handlers compare unequal instead of throwing
+ }
- return JacobVPU.activeJacobThread().message((Channel)proxy, method, args);
- }
+ public int hashCode() {
+ return _backend.hashCode();
+ }
- }
+ public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
+ if (method.getDeclaringClass() == Object.class) {
+ if (method.equals(METHOD_OBJECT_EQUALS)) {
+ return Boolean.valueOf(this.equals(Proxy.getInvocationHandler(args[0])));
+ }
+ return method.invoke(this, args);
+ }
+ if (method.equals(METHOD_CHANNEL_EXPORT)) {
+ return JacobVPU.activeJacobThread().exportChannel((Channel) proxy);
+ }
+ return JacobVPU.activeJacobThread().message((Channel) proxy, method, args);
+ }
+ } // class ChannelInvocationHandler
}
-
Modified: incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/vpu/ExecutionQueueImpl.java
URL: http://svn.apache.org/viewvc/incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/vpu/ExecutionQueueImpl.java?view=diff&rev=469170&r1=469169&r2=469170
==============================================================================
--- incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/vpu/ExecutionQueueImpl.java (original)
+++ incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/vpu/ExecutionQueueImpl.java Mon Oct 30 07:57:02 2006
@@ -61,8 +61,7 @@
import com.thoughtworks.xstream.mapper.Mapper;
/**
- * A fast, in-memory {@link org.apache.ode.jacob.soup.ExecutionQueue}
- * implementation.
+ * A fast, in-memory {@link org.apache.ode.jacob.soup.ExecutionQueue} implementation.
*/
public class ExecutionQueueImpl implements ExecutionQueue {
/** Class-level logger. */
@@ -71,9 +70,8 @@
private ClassLoader _classLoader;
/**
- * Cached set of enqueued {@link Continuation} objects (i.e. those reed
- * using the
- * {@link #enqueueReaction(org.apache.ode.jacob.soup.Continuation) method}).
+ * Cached set of enqueued {@link Continuation} objects (i.e. those read using
+ * {@link #enqueueReaction(org.apache.ode.jacob.soup.Continuation)}).
* These reactions are "cached"--that is it is not sent directly to the DAO
* layer--to minimize unnecessary serialization/deserialization of closures.
* This is a pretty useful optimization, as most {@link Continuation}s are
@@ -310,13 +308,12 @@
// Write out the reactions.
sos.writeInt(_reactions.size());
- for (Iterator i = _reactions.iterator(); i.hasNext();) {
- Continuation continuation = (Continuation) i.next();
- sos.writeObject(continuation.getClosure());
- sos.writeUTF(continuation.getMethod().getName());
- sos.writeInt(continuation.getArgs() == null ? 0 : continuation.getArgs().length);
- for (int j = 0; continuation.getArgs() != null && j < continuation.getArgs().length; ++j)
- sos.writeObject(continuation.getArgs()[j]);
+ for (Continuation c : _reactions) {
+ sos.writeObject(c.getClosure());
+ sos.writeUTF(c.getMethod().getName());
+ sos.writeInt(c.getArgs() == null ? 0 : c.getArgs().length);
+ for (int j = 0; c.getArgs() != null && j < c.getArgs().length; ++j)
+ sos.writeObject(c.getArgs()[j]);
}
sos.writeInt(_channels.values().size());
@@ -366,15 +363,12 @@
}
// If we have no reactions, but there are some channels that have
- // external references,
- // we are not done.
+ // external references, we are not done.
for (Iterator<ChannelFrame> i = _channels.values().iterator(); i.hasNext();) {
if (i.next().refCount > 0) {
return false;
}
}
-
- // Otherwise, we are done.
return true;
}
@@ -418,55 +412,18 @@
}
}
- // // Do some cleanup, if the channel is empty we can remove it from
- // memory.
+ // Do some cleanup, if the channel is empty we can remove it from memory.
// if (cframe != null && cframe.msgFrames.isEmpty() &&
// cframe.objFrames.isEmpty() && cframe.refCount ==0)
// _channels.values().remove(cframe);
}
- // TODO revisit: apparently dead wood
- // private JacobObject cloneClosure(JacobObject closure) {
- // long startTime = System.currentTimeMillis();
- // try {
- // ByteArrayOutputStream bos = new ByteArrayOutputStream(10000);
- // ExecutionQueueOutputStream sos = new ExecutionQueueOutputStream(bos);
- // sos.writeObject(closure);
- // sos.close();
- // long readStart = System.currentTimeMillis();
- // ExecutionQueueInputStream cis = new ExecutionQueueInputStream(new
- // ByteArrayInputStream(bos.toByteArray()));
- // JacobObject ret = (JacobObject) cis.readObject();
- // cis.close();
- //
- // long copyTime = System.currentTimeMillis() - startTime;
- // long readTime = System.currentTimeMillis() - readStart;
- // long copyBytes = bos.size();
- //
- // _statistics.cloneClosureBytes += copyBytes;
- // _statistics.cloneClosureTimeMs += copyTime;
- // _statistics.cloneClosureReadTimeMs += readTime;
- // _statistics.cloneClousreCount++;
- //
- // if (__log.isDebugEnabled()) {
- // __log.debug("cloneClosure(" + closure + "): serialized " + copyBytes + "
- // bytes in " + copyTime+ "ms.");
- // }
- // return ret;
- // } catch (Exception ex) {
- // throw new RuntimeException("Internal Error in ExecutionQueueImpl.java",
- // ex);
- // }
- // }
-
/**
* Verify that a {@link ExecutionQueueObject} is new, that is it has not
* already been added to the soup.
*
- * @param so
- * object to check.
- * @throws IllegalArgumentException
- * in case the object is not new
+ * @param so object to check.
+ * @throws IllegalArgumentException in case the object is not new
*/
private void verifyNew(ExecutionQueueObject so) throws IllegalArgumentException {
if (so.getId() != null)