You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by ed...@apache.org on 2012/06/14 06:06:26 UTC

svn commit: r1350083 - in /incubator/hama/trunk: ./ examples/src/main/java/org/apache/hama/examples/ examples/src/test/java/org/apache/hama/examples/ graph/src/main/java/org/apache/hama/graph/

Author: edwardyoon
Date: Thu Jun 14 04:06:25 2012
New Revision: 1350083

URL: http://svn.apache.org/viewvc?rev=1350083&view=rev
Log:
Add voteToHalt() mechanism in Graph API

Modified:
    incubator/hama/trunk/CHANGES.txt
    incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/MindistSearch.java
    incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/MindistSearchTest.java
    incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
    incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java
    incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInterface.java

Modified: incubator/hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=1350083&r1=1350082&r2=1350083&view=diff
==============================================================================
--- incubator/hama/trunk/CHANGES.txt (original)
+++ incubator/hama/trunk/CHANGES.txt Thu Jun 14 04:06:25 2012
@@ -4,6 +4,7 @@ Release 0.5 - April 10, 2012 
 
   NEW FEATURES
 
+   HAMA-588: Add voteToHalt() mechanism in Graph API (edwardyoon)
    HAMA-566: Add disk-based queue (tjungblut)
    HAMA-552: Add a sorted message queue (tjungblut)   
    HAMA-556: Graph package to support stopping the iterations when the node changes are within the tolerance value as in the case of page rank (tjungblut)

Modified: incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/MindistSearch.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/MindistSearch.java?rev=1350083&r1=1350082&r2=1350083&view=diff
==============================================================================
--- incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/MindistSearch.java (original)
+++ incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/MindistSearch.java Thu Jun 14 04:06:25 2012
@@ -66,10 +66,14 @@ public class MindistSearch {
         boolean updated = false;
         while (messages.hasNext()) {
           Text next = messages.next();
-          if (currentComponent.compareTo(next) > 0) {
-            updated = true;
-            setValue(next);
-          }
+          if(currentComponent != null && next != null) {
+            if (currentComponent.compareTo(next) > 0) {
+              updated = true;
+              setValue(next);
+            }
+          } else {
+            this.voteToHalt();
+          } 
         }
         if (updated) {
           sendMessageToNeighbors(getValue());

Modified: incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/MindistSearchTest.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/MindistSearchTest.java?rev=1350083&r1=1350082&r2=1350083&view=diff
==============================================================================
--- incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/MindistSearchTest.java (original)
+++ incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/MindistSearchTest.java Thu Jun 14 04:06:25 2012
@@ -42,10 +42,10 @@ public class MindistSearchTest extends T
   String[] input = new String[] { "0", "1\t4\t7", "2\t3\t8", "3\t5", "4\t1", "5\t6",
       "6", "7", "8\t3", "9\t0" };
 
-  private static String INPUT = "/tmp/pagerank-tmp.seq";
-  private static String TEXT_INPUT = "/tmp/pagerank.txt";
-  private static String TEXT_OUTPUT = INPUT + "pagerank.txt.seq";
-  private static String OUTPUT = "/tmp/pagerank-out";
+  private static String INPUT = "/tmp/mdst-tmp.seq";
+  private static String TEXT_INPUT = "/tmp/mdst.txt";
+  private static String TEXT_OUTPUT = INPUT + "mdst.txt.seq";
+  private static String OUTPUT = "/tmp/mdst-out";
   private Configuration conf = new HamaConfiguration();
   private FileSystem fs;
 
@@ -58,7 +58,7 @@ public class MindistSearchTest extends T
   public void testMindistSearch() throws Exception {
     generateTestData();
     try {
-      MindistSearch.main(new String[] { INPUT, OUTPUT });
+      MindistSearch.main(new String[] { INPUT, OUTPUT, "30", "2" });
 
       verifyResult();
     } finally {

Modified: incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java?rev=1350083&r1=1350082&r2=1350083&view=diff
==============================================================================
--- incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java (original)
+++ incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java Thu Jun 14 04:06:25 2012
@@ -22,7 +22,6 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -259,33 +258,39 @@ public final class GraphJobRunner<V exte
       }
 
       int messagesSize = messages.size();
-      Iterator<Entry<V, LinkedList<M>>> iterator = messages.entrySet()
-          .iterator();
-      while (iterator.hasNext()) {
-        Entry<V, LinkedList<M>> e = iterator.next();
-        LinkedList<M> msgs = e.getValue();
-        if (combiner != null) {
-          M combined = combiner.combine(msgs);
+
+      for (Vertex<V, E, M> vertex : vertices.values()) {
+        LinkedList<M> msgs = messages.get(vertex.getVertexID());
+        if (vertex.isHalted() && msgs != null) {
+          vertex.votedToHalt = false;
+        }
+        if (msgs == null) {
           msgs = new LinkedList<M>();
-          msgs.add(combined);
         }
-        Vertex<V, E, M> vertex = vertices.get(e.getKey());
-        M lastValue = vertex.getValue();
-        vertex.compute(msgs.iterator());
-        if (aggregators != null) {
-          if (this.aggregators != null) {
-            for (int i = 0; i < this.aggregators.length; i++) {
-              Aggregator<M, Vertex<V, E, M>> aggregator = this.aggregators[i];
-              aggregator.aggregate(vertex, vertex.getValue());
-              if (isAbstractAggregator[i]) {
-                AbstractAggregator<M, Vertex<V, E, M>> intern = ((AbstractAggregator<M, Vertex<V, E, M>>) aggregator);
-                intern.aggregate(vertex, lastValue, vertex.getValue());
-                intern.aggregateInternal();
+        
+        if (!vertex.isHalted()) {
+          if (combiner != null) {
+            M combined = combiner.combine(msgs);
+            msgs = new LinkedList<M>();
+            msgs.add(combined);
+          }
+          M lastValue = vertex.getValue();
+          vertex.compute(msgs.iterator());
+
+          if (aggregators != null) {
+            if (this.aggregators != null) {
+              for (int i = 0; i < this.aggregators.length; i++) {
+                Aggregator<M, Vertex<V, E, M>> aggregator = this.aggregators[i];
+                aggregator.aggregate(vertex, vertex.getValue());
+                if (isAbstractAggregator[i]) {
+                  AbstractAggregator<M, Vertex<V, E, M>> intern = ((AbstractAggregator<M, Vertex<V, E, M>>) aggregator);
+                  intern.aggregate(vertex, lastValue, vertex.getValue());
+                  intern.aggregateInternal();
+                }
               }
             }
           }
         }
-        iterator.remove();
       }
 
       runAggregators(peer, messagesSize);
@@ -476,11 +481,11 @@ public final class GraphJobRunner<V exte
   /**
    * @return a new vertex instance
    */
-  public static <VERTEX_ID extends Writable, VERTEX_VALUE extends Writable, EDGE_VALUE_TYPE extends Writable> Vertex<VERTEX_ID, VERTEX_VALUE, EDGE_VALUE_TYPE> newVertexInstance(
+  public static <V extends Writable, E extends Writable, M extends Writable> Vertex<V, E, M> newVertexInstance(
       Class<?> vertexClass, Configuration conf) {
     @SuppressWarnings("unchecked")
-    Vertex<VERTEX_ID, VERTEX_VALUE, EDGE_VALUE_TYPE> vertex = (Vertex<VERTEX_ID, VERTEX_VALUE, EDGE_VALUE_TYPE>) ReflectionUtils
-        .newInstance(vertexClass, conf);
+    Vertex<V, E, M> vertex = (Vertex<V, E, M>) ReflectionUtils.newInstance(
+      vertexClass, conf);
     return vertex;
   }
 

Modified: incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java?rev=1350083&r1=1350082&r2=1350083&view=diff
==============================================================================
--- incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java (original)
+++ incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java Thu Jun 14 04:06:25 2012
@@ -36,6 +36,8 @@ public abstract class Vertex<V extends W
   private BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer;
   private List<Edge<V, E>> edges;
 
+  protected boolean votedToHalt = false;
+
   public Configuration getConf() {
     return peer.getConfiguration();
   }
@@ -163,6 +165,15 @@ public abstract class Vertex<V extends W
   }
 
   @Override
+  public void voteToHalt() {
+    this.votedToHalt = true;  
+  }
+  
+  public boolean isHalted() {
+    return votedToHalt;  
+  }
+
+  @Override
   public int hashCode() {
     final int prime = 31;
     int result = 1;

Modified: incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInterface.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInterface.java?rev=1350083&r1=1350082&r2=1350083&view=diff
==============================================================================
--- incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInterface.java (original)
+++ incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInterface.java Thu Jun 14 04:06:25 2012
@@ -81,6 +81,11 @@ public interface VertexInterface<V exten
   public long getSuperstepCount();
 
   /**
+   * Vote to halt.
+   */
+  public void voteToHalt();
+
+  /**
    * Sets the vertex value
    */
   public void setValue(M value);