You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ode.apache.org by ha...@apache.org on 2013/01/19 01:58:07 UTC
svn commit: r1435433 - in /ode/trunk/jacob/src:
main/java/org/apache/ode/jacob/ main/java/org/apache/ode/jacob/vpu/
test/java/org/apache/ode/jacob/examples/eratosthenes/
Author: hadrian
Date: Sat Jan 19 00:58:07 2013
New Revision: 1435433
URL: http://svn.apache.org/viewvc?rev=1435433&view=rev
Log:
ODE-987. Refactor a bit the ChannelListener and pave way to simplify the api
Modified:
ode/trunk/jacob/src/main/java/org/apache/ode/jacob/ChannelListener.java
ode/trunk/jacob/src/main/java/org/apache/ode/jacob/JacobObject.java
ode/trunk/jacob/src/main/java/org/apache/ode/jacob/ReceiveProcess.java
ode/trunk/jacob/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java
ode/trunk/jacob/src/test/java/org/apache/ode/jacob/examples/eratosthenes/Sieve.java
Modified: ode/trunk/jacob/src/main/java/org/apache/ode/jacob/ChannelListener.java
URL: http://svn.apache.org/viewvc/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/ChannelListener.java?rev=1435433&r1=1435432&r2=1435433&view=diff
==============================================================================
--- ode/trunk/jacob/src/main/java/org/apache/ode/jacob/ChannelListener.java (original)
+++ ode/trunk/jacob/src/main/java/org/apache/ode/jacob/ChannelListener.java Sat Jan 19 00:58:07 2013
@@ -19,7 +19,6 @@
package org.apache.ode.jacob;
import java.lang.reflect.Method;
-import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
@@ -30,25 +29,8 @@ import java.util.Set;
*/
@SuppressWarnings("serial")
public abstract class ChannelListener extends JacobObject {
- private transient Set<Method> _implementedMethods;
- private transient Channel _channel;
- protected ChannelListener(Channel channel) throws IllegalStateException {
- assert getClass().getSuperclass().getSuperclass() == ChannelListener.class :
- "Inheritance in ChannelListener classes not allowed!";
- if (channel == null) {
- throw new IllegalArgumentException("Null channel!");
- }
- _channel = channel;
- }
-
- public Channel getChannel() {
- return _channel;
- }
-
- public void setChannel(Channel channel) {
- _channel = channel;
- }
+ public abstract Set<Method> getImplementedMethods();
public Set<ChannelListener> or(ChannelListener other) {
HashSet<ChannelListener> retval = new HashSet<ChannelListener>();
@@ -63,21 +45,13 @@ public abstract class ChannelListener ex
return retval;
}
- public Set<Method> getImplementedMethods() {
- if (_implementedMethods == null) {
- Set<Method> implementedMethods = new HashSet<Method>();
- ClassUtil.getImplementedMethods(implementedMethods, getClass().getSuperclass());
- _implementedMethods = Collections.unmodifiableSet(implementedMethods);
- }
- return _implementedMethods;
- }
-
/**
* Get a description of the object for debugging purposes.
*
* @return human-readable description.
*/
public String toString() {
+ // TODO: needs improvement
StringBuffer buf = new StringBuffer(getClassName());
buf.append('{');
for (Method m : getImplementedMethods()) {
Modified: ode/trunk/jacob/src/main/java/org/apache/ode/jacob/JacobObject.java
URL: http://svn.apache.org/viewvc/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/JacobObject.java?rev=1435433&r1=1435432&r2=1435433&view=diff
==============================================================================
--- ode/trunk/jacob/src/main/java/org/apache/ode/jacob/JacobObject.java (original)
+++ ode/trunk/jacob/src/main/java/org/apache/ode/jacob/JacobObject.java Sat Jan 19 00:58:07 2013
@@ -82,9 +82,8 @@ public abstract class JacobObject implem
* @param methodList method list for the communication reduction
* @see JacobThread#object
*/
- protected static Channel object(ChannelListener methodList) {
+ protected static void object(ChannelListener methodList) {
JacobVPU.activeJacobThread().object(false, methodList);
- return methodList.getChannel();
}
protected static void object(boolean replication, ChannelListener methodList) {
@@ -100,9 +99,8 @@ public abstract class JacobObject implem
methodLists.toArray(new ChannelListener[methodLists.size()]));
}
- protected static Channel replication(ChannelListener methodList) {
+ protected static void replication(ChannelListener methodList) {
JacobVPU.activeJacobThread().object(true, methodList);
- return methodList.getChannel();
}
/**
Modified: ode/trunk/jacob/src/main/java/org/apache/ode/jacob/ReceiveProcess.java
URL: http://svn.apache.org/viewvc/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/ReceiveProcess.java?rev=1435433&r1=1435432&r2=1435433&view=diff
==============================================================================
--- ode/trunk/jacob/src/main/java/org/apache/ode/jacob/ReceiveProcess.java (original)
+++ ode/trunk/jacob/src/main/java/org/apache/ode/jacob/ReceiveProcess.java Sat Jan 19 00:58:07 2013
@@ -27,10 +27,16 @@ import java.util.Set;
@SuppressWarnings("serial")
public abstract class ReceiveProcess<T extends Channel> extends ChannelListener {
private transient Set<Method> _implementedMethods;
+ private transient Channel channel;
private T receiver;
protected ReceiveProcess(T channel, T receiver) throws IllegalStateException {
- super(channel);
+ assert getClass().getSuperclass().getSuperclass() == ChannelListener.class :
+ "Inheritance in ChannelListener classes not allowed!";
+ if (channel == null) {
+ throw new IllegalArgumentException("Null channel!");
+ }
+ this.channel = channel;
this.receiver = receiver;
}
@@ -38,6 +44,14 @@ public abstract class ReceiveProcess<T e
return receiver;
}
+ public Channel getChannel() {
+ return channel;
+ }
+
+ public void setChannel(Channel channel) {
+ this.channel = channel;
+ }
+
public Set<Method> getImplementedMethods() {
if (_implementedMethods == null) {
Set<Method> implementedMethods = new HashSet<Method>();
Modified: ode/trunk/jacob/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java
URL: http://svn.apache.org/viewvc/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java?rev=1435433&r1=1435432&r2=1435433&view=diff
==============================================================================
--- ode/trunk/jacob/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java (original)
+++ ode/trunk/jacob/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java Sat Jan 19 00:58:07 2013
@@ -330,11 +330,13 @@ public final class JacobVPU {
msg.append(_cycle);
msg.append(": ");
for (int i = 0; i < ml.length; ++i) {
- if (i != 0) msg.append(" + ");
- msg.append(ml[i].getChannel());
- msg.append(" ? ");
- msg.append(ml.toString());
-
+ if (ml[i] instanceof ReceiveProcess) {
+ ReceiveProcess<?> rp = (ReceiveProcess<?>)ml[i];
+ if (i != 0) msg.append(" + ");
+ msg.append(rp.getChannel());
+ msg.append(" ? ");
+ msg.append(rp.toString());
+ }
}
LOG.trace(msg.toString());
}
@@ -343,11 +345,14 @@ public final class JacobVPU {
CommGroup grp = new CommGroup(replicate);
for (int i = 0; i < ml.length; ++i) {
- CommChannel chnl = (CommChannel) ChannelFactory.getBackend(ml[i].getChannel());
- // TODO see below..
- // oframe.setDebugInfo(fillDebugInfo());
- CommRecv recv = new CommRecv(chnl, ml[i]);
- grp.add(recv);
+ if (ml[i] instanceof ReceiveProcess) {
+ CommChannel chnl = (CommChannel)ChannelFactory.getBackend(
+ ((ReceiveProcess<?>)ml[i]).getChannel());
+ // TODO see below..
+ // oframe.setDebugInfo(fillDebugInfo());
+ CommRecv recv = new CommRecv(chnl, ml[i]);
+ grp.add(recv);
+ }
}
_executionQueue.add(grp);
}
Modified: ode/trunk/jacob/src/test/java/org/apache/ode/jacob/examples/eratosthenes/Sieve.java
URL: http://svn.apache.org/viewvc/ode/trunk/jacob/src/test/java/org/apache/ode/jacob/examples/eratosthenes/Sieve.java?rev=1435433&r1=1435432&r2=1435433&view=diff
==============================================================================
--- ode/trunk/jacob/src/test/java/org/apache/ode/jacob/examples/eratosthenes/Sieve.java (original)
+++ ode/trunk/jacob/src/test/java/org/apache/ode/jacob/examples/eratosthenes/Sieve.java Sat Jan 19 00:58:07 2013
@@ -70,13 +70,15 @@ public class Sieve extends JacobRunnable
}
public void run() {
- _out.val(_n, (Synch)object(new ReceiveProcess<Synch>(newChannel(Synch.class), new Synch() {
+ Synch ret = newChannel(Synch.class);
+ object(new ReceiveProcess<Synch>(ret, new Synch() {
public void ret() {
instance(new Counter(_out, _n+1));
}
}) {
private static final long serialVersionUID = -4336285925619915276L;
- }));
+ });
+ _out.val(_n, ret);
}
}
@@ -104,16 +106,18 @@ public class Sieve extends JacobRunnable
public void run() {
object(new ReceiveProcess<NaturalNumberStream>(_in, new NaturalNumberStream() {
public void val(final int n, final Synch ret) {
- _primes.val(n, (Synch)object(new ReceiveProcess<Synch>(newChannel(Synch.class), new Synch() {
- public void ret() {
- NaturalNumberStream x = newChannel(NaturalNumberStream.class);
- instance(new PrimeFilter(n, _in, x));
- instance(new Head(x, _primes));
- ret.ret();
- }
- }) {
- private static final long serialVersionUID = -3009595654233593893L;
- }));
+ Synch r = newChannel(Synch.class);
+ object(new ReceiveProcess<Synch>(r, new Synch() {
+ public void ret() {
+ NaturalNumberStream x = newChannel(NaturalNumberStream.class);
+ instance(new PrimeFilter(n, _in, x));
+ instance(new Head(x, _primes));
+ ret.ret();
+ }
+ }) {
+ private static final long serialVersionUID = -3009595654233593893L;
+ });
+ _primes.val(n, r);
}
}) {
private static final long serialVersionUID = -2145752474431263689L;
@@ -166,13 +170,15 @@ public class Sieve extends JacobRunnable
object(true, new ReceiveProcess<NaturalNumberStream>(_in, new NaturalNumberStream() {
public void val(int n, final Synch ret) {
if (n % _prime != 0) {
- _out.val(n, (Synch)object(new ReceiveProcess<Synch>(newChannel(Synch.class), new Synch() {
- public void ret() {
- ret.ret();
- }
- }) {
- private static final long serialVersionUID = 2523405590764193613L;
- }));
+ Synch r = newChannel(Synch.class);
+ object(new ReceiveProcess<Synch>(r, new Synch() {
+ public void ret() {
+ ret.ret();
+ }
+ }) {
+ private static final long serialVersionUID = 2523405590764193613L;
+ });
+ _out.val(n, r);
} else {
ret.ret();
}