You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ode.apache.org by ha...@apache.org on 2013/01/19 01:58:07 UTC

svn commit: r1435433 - in /ode/trunk/jacob/src: main/java/org/apache/ode/jacob/ main/java/org/apache/ode/jacob/vpu/ test/java/org/apache/ode/jacob/examples/eratosthenes/

Author: hadrian
Date: Sat Jan 19 00:58:07 2013
New Revision: 1435433

URL: http://svn.apache.org/viewvc?rev=1435433&view=rev
Log:
ODE-987. Refactor a bit the ChannelListener and pave way to simplify the api

Modified:
    ode/trunk/jacob/src/main/java/org/apache/ode/jacob/ChannelListener.java
    ode/trunk/jacob/src/main/java/org/apache/ode/jacob/JacobObject.java
    ode/trunk/jacob/src/main/java/org/apache/ode/jacob/ReceiveProcess.java
    ode/trunk/jacob/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java
    ode/trunk/jacob/src/test/java/org/apache/ode/jacob/examples/eratosthenes/Sieve.java

Modified: ode/trunk/jacob/src/main/java/org/apache/ode/jacob/ChannelListener.java
URL: http://svn.apache.org/viewvc/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/ChannelListener.java?rev=1435433&r1=1435432&r2=1435433&view=diff
==============================================================================
--- ode/trunk/jacob/src/main/java/org/apache/ode/jacob/ChannelListener.java (original)
+++ ode/trunk/jacob/src/main/java/org/apache/ode/jacob/ChannelListener.java Sat Jan 19 00:58:07 2013
@@ -19,7 +19,6 @@
 package org.apache.ode.jacob;
 
 import java.lang.reflect.Method;
-import java.util.Collections;
 import java.util.HashSet;
 import java.util.Set;
 
@@ -30,25 +29,8 @@ import java.util.Set;
  */
 @SuppressWarnings("serial")
 public abstract class ChannelListener extends JacobObject {
-    private transient Set<Method> _implementedMethods;
-    private transient Channel _channel;
 
-    protected ChannelListener(Channel channel) throws IllegalStateException {
-        assert getClass().getSuperclass().getSuperclass() == ChannelListener.class :
-               "Inheritance in ChannelListener classes not allowed!";
-        if (channel == null) {
-            throw new IllegalArgumentException("Null channel!");
-        }
-        _channel = channel;
-    }
-
-    public Channel getChannel() {
-        return _channel;
-    }
-
-    public void setChannel(Channel channel) {
-        _channel = channel;
-    }
+    public abstract Set<Method> getImplementedMethods();
 
     public Set<ChannelListener> or(ChannelListener other) {
         HashSet<ChannelListener> retval = new HashSet<ChannelListener>();
@@ -63,21 +45,13 @@ public abstract class ChannelListener ex
         return retval;
     }
 
-    public Set<Method> getImplementedMethods() {
-        if (_implementedMethods == null) {
-            Set<Method> implementedMethods = new HashSet<Method>();
-            ClassUtil.getImplementedMethods(implementedMethods, getClass().getSuperclass());
-            _implementedMethods = Collections.unmodifiableSet(implementedMethods);
-        }
-        return _implementedMethods;
-    }
-
     /**
      * Get a description of the object for debugging purposes.
      *
      * @return human-readable description.
      */
     public String toString() {
+        // TODO: needs improvement
         StringBuffer buf = new StringBuffer(getClassName());
         buf.append('{');
         for (Method m : getImplementedMethods()) {

Modified: ode/trunk/jacob/src/main/java/org/apache/ode/jacob/JacobObject.java
URL: http://svn.apache.org/viewvc/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/JacobObject.java?rev=1435433&r1=1435432&r2=1435433&view=diff
==============================================================================
--- ode/trunk/jacob/src/main/java/org/apache/ode/jacob/JacobObject.java (original)
+++ ode/trunk/jacob/src/main/java/org/apache/ode/jacob/JacobObject.java Sat Jan 19 00:58:07 2013
@@ -82,9 +82,8 @@ public abstract class JacobObject implem
      * @param methodList method list for the communication reduction
      * @see JacobThread#object
      */
-    protected static Channel object(ChannelListener methodList) {
+    protected static void object(ChannelListener methodList) {
         JacobVPU.activeJacobThread().object(false, methodList);
-        return methodList.getChannel();
     }
 
     protected static void object(boolean replication, ChannelListener methodList) {
@@ -100,9 +99,8 @@ public abstract class JacobObject implem
                 methodLists.toArray(new ChannelListener[methodLists.size()]));
     }
 
-    protected static Channel replication(ChannelListener methodList) {
+    protected static void replication(ChannelListener methodList) {
         JacobVPU.activeJacobThread().object(true, methodList);
-        return methodList.getChannel();
     }
 
     /**

Modified: ode/trunk/jacob/src/main/java/org/apache/ode/jacob/ReceiveProcess.java
URL: http://svn.apache.org/viewvc/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/ReceiveProcess.java?rev=1435433&r1=1435432&r2=1435433&view=diff
==============================================================================
--- ode/trunk/jacob/src/main/java/org/apache/ode/jacob/ReceiveProcess.java (original)
+++ ode/trunk/jacob/src/main/java/org/apache/ode/jacob/ReceiveProcess.java Sat Jan 19 00:58:07 2013
@@ -27,10 +27,16 @@ import java.util.Set;
 @SuppressWarnings("serial")
 public abstract class ReceiveProcess<T extends Channel> extends ChannelListener {
     private transient Set<Method> _implementedMethods;
+    private transient Channel channel;
     private T receiver;
     
     protected ReceiveProcess(T channel, T receiver) throws IllegalStateException {
-        super(channel);
+        assert getClass().getSuperclass().getSuperclass() == ChannelListener.class :
+            "Inheritance in ChannelListener classes not allowed!";
+        if (channel == null) {
+            throw new IllegalArgumentException("Null channel!");
+        }
+        this.channel = channel;
         this.receiver = receiver;
     }
 
@@ -38,6 +44,14 @@ public abstract class ReceiveProcess<T e
         return receiver;
     }
 
+    public Channel getChannel() {
+        return channel;
+    }
+
+    public void setChannel(Channel channel) {
+        this.channel = channel;
+    }
+
     public Set<Method> getImplementedMethods() {
         if (_implementedMethods == null) {
             Set<Method> implementedMethods = new HashSet<Method>();

Modified: ode/trunk/jacob/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java
URL: http://svn.apache.org/viewvc/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java?rev=1435433&r1=1435432&r2=1435433&view=diff
==============================================================================
--- ode/trunk/jacob/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java (original)
+++ ode/trunk/jacob/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java Sat Jan 19 00:58:07 2013
@@ -330,11 +330,13 @@ public final class JacobVPU {
                 msg.append(_cycle);
                 msg.append(": ");
                 for (int i = 0; i < ml.length; ++i) {
-                    if (i != 0) msg.append(" + ");
-                    msg.append(ml[i].getChannel());
-                    msg.append(" ? ");
-                    msg.append(ml.toString());
-
+                    if (ml[i] instanceof ReceiveProcess) {
+                        ReceiveProcess<?> rp = (ReceiveProcess<?>)ml[i];
+                        if (i != 0) msg.append(" + ");
+                        msg.append(rp.getChannel());
+                        msg.append(" ? ");
+                        msg.append(rp.toString());
+                    }
                 }
                 LOG.trace(msg.toString());
             }
@@ -343,11 +345,14 @@ public final class JacobVPU {
 
             CommGroup grp = new CommGroup(replicate);
             for (int i = 0; i < ml.length; ++i) {
-                CommChannel chnl = (CommChannel) ChannelFactory.getBackend(ml[i].getChannel());
-                // TODO see below..
-                // oframe.setDebugInfo(fillDebugInfo());
-                CommRecv recv = new CommRecv(chnl, ml[i]);
-                grp.add(recv);
+                if (ml[i] instanceof ReceiveProcess) {
+                    CommChannel chnl = (CommChannel)ChannelFactory.getBackend(
+                        ((ReceiveProcess<?>)ml[i]).getChannel());
+                    // TODO see below..
+                    // oframe.setDebugInfo(fillDebugInfo());
+                    CommRecv recv = new CommRecv(chnl, ml[i]);
+                    grp.add(recv);
+                }
             }
             _executionQueue.add(grp);
         }

Modified: ode/trunk/jacob/src/test/java/org/apache/ode/jacob/examples/eratosthenes/Sieve.java
URL: http://svn.apache.org/viewvc/ode/trunk/jacob/src/test/java/org/apache/ode/jacob/examples/eratosthenes/Sieve.java?rev=1435433&r1=1435432&r2=1435433&view=diff
==============================================================================
--- ode/trunk/jacob/src/test/java/org/apache/ode/jacob/examples/eratosthenes/Sieve.java (original)
+++ ode/trunk/jacob/src/test/java/org/apache/ode/jacob/examples/eratosthenes/Sieve.java Sat Jan 19 00:58:07 2013
@@ -70,13 +70,15 @@ public class Sieve extends JacobRunnable
     }
 
     public void run() {
-        _out.val(_n, (Synch)object(new ReceiveProcess<Synch>(newChannel(Synch.class), new Synch() {
+        Synch ret = newChannel(Synch.class);
+        object(new ReceiveProcess<Synch>(ret, new Synch() {
             public void ret() {
                 instance(new Counter(_out, _n+1));
             }
         }) {
             private static final long serialVersionUID = -4336285925619915276L;
-        }));
+        });
+        _out.val(_n, ret);
     }
   }
 
@@ -104,16 +106,18 @@ public class Sieve extends JacobRunnable
     public void run() {
       object(new ReceiveProcess<NaturalNumberStream>(_in, new NaturalNumberStream() {
         public void val(final int n, final Synch ret) {
-          _primes.val(n, (Synch)object(new ReceiveProcess<Synch>(newChannel(Synch.class), new Synch() {
-            public void ret() {
-              NaturalNumberStream x = newChannel(NaturalNumberStream.class);
-              instance(new PrimeFilter(n, _in, x));
-              instance(new Head(x, _primes));
-              ret.ret();
-            }
-          }) {
-              private static final long serialVersionUID = -3009595654233593893L;
-          }));
+            Synch r = newChannel(Synch.class);
+            object(new ReceiveProcess<Synch>(r, new Synch() {
+                public void ret() {
+                  NaturalNumberStream x = newChannel(NaturalNumberStream.class);
+                  instance(new PrimeFilter(n, _in, x));
+                  instance(new Head(x, _primes));
+                  ret.ret();
+                }
+            }) {
+                private static final long serialVersionUID = -3009595654233593893L;
+            });
+            _primes.val(n, r);
        }
       }) {
           private static final long serialVersionUID = -2145752474431263689L;
@@ -166,13 +170,15 @@ public class Sieve extends JacobRunnable
        object(true, new ReceiveProcess<NaturalNumberStream>(_in, new NaturalNumberStream() {
           public void val(int n, final Synch ret) {
               if (n % _prime != 0) {
-                 _out.val(n, (Synch)object(new ReceiveProcess<Synch>(newChannel(Synch.class), new Synch() {
-                     public void ret() {
-                         ret.ret();
-                     }
-                 }) {
-                     private static final long serialVersionUID = 2523405590764193613L;
-                 }));
+                  Synch r = newChannel(Synch.class);
+                  object(new ReceiveProcess<Synch>(r, new Synch() {
+                      public void ret() {
+                          ret.ret();
+                      }
+                  }) {
+                      private static final long serialVersionUID = 2523405590764193613L;
+                  });
+                  _out.val(n, r);
               } else {
                  ret.ret();
               }