You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain text version.
Posted to commits@hama.apache.org by ed...@apache.org on 2012/03/08 11:55:58 UTC

svn commit: r1298351 - in /incubator/hama/trunk: ./ core/src/main/java/org/apache/hama/bsp/ examples/src/main/java/org/apache/hama/examples/ graph/src/main/java/org/apache/hama/graph/

Author: edwardyoon
Date: Thu Mar  8 10:55:58 2012
New Revision: 1298351

URL: http://svn.apache.org/viewvc?rev=1298351&view=rev
Log:
Add MinIntCombiner to SSSP example

Modified:
    incubator/hama/trunk/CHANGES.txt
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/Combiner.java
    incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/CombineExample.java
    incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/SSSP.java
    incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java
    incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
    incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInterface.java

Modified: incubator/hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=1298351&r1=1298350&r2=1298351&view=diff
==============================================================================
--- incubator/hama/trunk/CHANGES.txt (original)
+++ incubator/hama/trunk/CHANGES.txt Thu Mar  8 10:55:58 2012
@@ -4,6 +4,7 @@ Release 0.5 - Unreleased
 
   NEW FEATURES
 
+   HAMA-518: Add MinIntCombiner to SSSP example (edwardyoon)
    HAMA-367: Runtime Compression of BSP Messages to Improve the Performance (Apurv Verma via tjungblut)
    HAMA-501: Add Avro RPC (tjungblut)
    HAMA-456: Add basic Graph interfaces and GraphJobRunner (edwardyoon)

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java?rev=1298351&r1=1298350&r2=1298351&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java Thu Mar  8 10:55:58 2012
@@ -311,7 +311,10 @@ public final class BSPPeerImpl<K1, V1, K
       Combiner<M> combiner = (Combiner<M>) ReflectionUtils.newInstance(
           conf.getClass("bsp.combiner.class", Combiner.class), conf);
 
-      return combiner.combine(messages);
+      BSPMessageBundle<M> bundle = new BSPMessageBundle<M>();
+      bundle.addMessage(combiner.combine(messages));
+      
+      return bundle;
     } else {
       BSPMessageBundle<M> bundle = new BSPMessageBundle<M>();
       for (M message : messages) {

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/Combiner.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/Combiner.java?rev=1298351&r1=1298350&r2=1298351&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/Combiner.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/Combiner.java Thu Mar  8 10:55:58 2012
@@ -27,6 +27,6 @@ public abstract class Combiner<M extends
    * @param messages
    * @return the combined message
    */
-  public abstract BSPMessageBundle<M> combine(Iterable<M> messages);
+  public abstract M combine(Iterable<M> messages);
   
 }

Modified: incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/CombineExample.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/CombineExample.java?rev=1298351&r1=1298350&r2=1298351&view=diff
==============================================================================
--- incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/CombineExample.java (original)
+++ incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/CombineExample.java Thu Mar  8 10:55:58 2012
@@ -33,7 +33,6 @@ import org.apache.hadoop.io.Text;
 import org.apache.hama.HamaConfiguration;
 import org.apache.hama.bsp.BSP;
 import org.apache.hama.bsp.BSPJob;
-import org.apache.hama.bsp.BSPMessageBundle;
 import org.apache.hama.bsp.BSPPeer;
 import org.apache.hama.bsp.Combiner;
 import org.apache.hama.bsp.FileOutputFormat;
@@ -71,8 +70,7 @@ public class CombineExample {
   public static class SumCombiner extends Combiner<IntWritable> {
 
     @Override
-    public BSPMessageBundle<IntWritable> combine(Iterable<IntWritable> messages) {
-      BSPMessageBundle<IntWritable> bundle = new BSPMessageBundle<IntWritable>();
+    public IntWritable combine(Iterable<IntWritable> messages) {
       int sum = 0;
 
       Iterator<IntWritable> it = messages.iterator();
@@ -80,8 +78,7 @@ public class CombineExample {
         sum += it.next().get();
       }
 
-      bundle.addMessage(new IntWritable(sum));
-      return bundle;
+      return new IntWritable(sum);
     }
   }
 

Modified: incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/SSSP.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/SSSP.java?rev=1298351&r1=1298350&r2=1298351&view=diff
==============================================================================
--- incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/SSSP.java (original)
+++ incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/SSSP.java Thu Mar  8 10:55:58 2012
@@ -24,6 +24,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hama.HamaConfiguration;
+import org.apache.hama.bsp.Combiner;
 import org.apache.hama.bsp.HashPartitioner;
 import org.apache.hama.bsp.SequenceFileInputFormat;
 import org.apache.hama.bsp.SequenceFileOutputFormat;
@@ -67,6 +68,23 @@ public class SSSP {
     }
   }
 
+  public static class MinIntCombiner extends Combiner<IntWritable> {
+
+    @Override
+    public IntWritable combine(Iterable<IntWritable> messages) {
+      int minDist = Integer.MAX_VALUE;
+
+      Iterator<IntWritable> it = messages.iterator();
+      while (it.hasNext()) {
+        int msgValue = it.next().get();
+        if (minDist > msgValue)
+          minDist = msgValue;
+      }
+
+      return new IntWritable(minDist);
+    }
+  }
+
   private static void printUsage() {
     System.out.println("Usage: <startnode> <input> <output> [tasks]");
     System.exit(-1);
@@ -92,6 +110,7 @@ public class SSSP {
     }
 
     ssspJob.setVertexClass(ShortestPathVertex.class);
+    ssspJob.setCombinerClass(MinIntCombiner.class);
     ssspJob.setInputFormat(SequenceFileInputFormat.class);
     ssspJob.setInputKeyClass(VertexWritable.class);
     ssspJob.setInputValueClass(VertexArrayWritable.class);
@@ -100,6 +119,8 @@ public class SSSP {
     ssspJob.setOutputFormat(SequenceFileOutputFormat.class);
     ssspJob.setOutputKeyClass(Text.class);
     ssspJob.setOutputValueClass(IntWritable.class);
+    // Iterate until all the nodes have been reached.
+    ssspJob.setMaxIteration(Integer.MAX_VALUE);
 
     long startTime = System.currentTimeMillis();
     if (ssspJob.waitForCompletion(true)) {

Modified: incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java?rev=1298351&r1=1298350&r2=1298351&view=diff
==============================================================================
--- incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java (original)
+++ incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java Thu Mar  8 10:55:58 2012
@@ -22,11 +22,14 @@ import java.io.IOException;
 import org.apache.hadoop.io.Writable;
 import org.apache.hama.HamaConfiguration;
 import org.apache.hama.bsp.BSPJob;
+import org.apache.hama.bsp.Combiner;
 
 public class GraphJob extends BSPJob {
   public final static String VERTEX_CLASS_ATTR = "hama.graph.vertex.class";
+  public final static String VERTEX_MESSAGE_COMBINER_CLASS_ATTR = "hama.vertex.message.combiner.class";
 
-  public GraphJob(HamaConfiguration conf, Class<?> exampleClass) throws IOException {
+  public GraphJob(HamaConfiguration conf, Class<?> exampleClass)
+      throws IOException {
     super(conf);
     this.setBspClass(GraphJobRunner.class);
     this.setJarByClass(exampleClass);
@@ -48,4 +51,11 @@ public class GraphJob extends BSPJob {
     return (Class<? extends Vertex<? extends Writable>>) conf.getClass(
         VERTEX_CLASS_ATTR, Vertex.class);
   }
+
+  @Override
+  public void setCombinerClass(Class<? extends Combiner<? extends Writable>> cls) {
+    ensureState(JobState.DEFINE);
+    conf.setClass(VERTEX_MESSAGE_COMBINER_CLASS_ATTR, cls, Combiner.class);
+  }
+
 }

Modified: incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java?rev=1298351&r1=1298350&r2=1298351&view=diff
==============================================================================
--- incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java (original)
+++ incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java Thu Mar  8 10:55:58 2012
@@ -35,57 +35,37 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hama.bsp.BSP;
 import org.apache.hama.bsp.BSPPeer;
+import org.apache.hama.bsp.Combiner;
 import org.apache.hama.bsp.sync.SyncException;
 import org.apache.hama.util.KeyValuePair;
 
-@SuppressWarnings("rawtypes")
+@SuppressWarnings("unchecked")
 public class GraphJobRunner extends BSP {
   public static final Log LOG = LogFactory.getLog(GraphJobRunner.class);
+  private Configuration conf;
+  private Combiner<? extends Writable> combiner;
   private Map<String, Vertex> vertices = new HashMap<String, Vertex>();
-  private String masterTask;
+
   private String FLAG_MESSAGE = "hama.graph.msg.counts";
+  private final String MESSAGE_COMBINER_CLASS = "hama.vertex.message.combiner.class";
+
+  private String masterTask;
+  private boolean updated = true;
+  private int globalUpdateCounts = 0;
 
-  @SuppressWarnings("unchecked")
   @Override
   public void bsp(BSPPeer peer) throws IOException, SyncException,
       InterruptedException {
     int maxIteration = peer.getConfiguration().getInt(
         "hama.graph.max.iteration", 30);
 
-    boolean updated = true;
     int iteration = 0;
     while (updated && iteration < maxIteration) {
-      int globalUpdateCounts = 0;
+      globalUpdateCounts = 0;
       peer.sync();
 
-      MapWritable msg = null;
-      Map<String, LinkedList<Writable>> msgMap = new HashMap<String, LinkedList<Writable>>();
-      while ((msg = (MapWritable) peer.getCurrentMessage()) != null) {
-
-        for (Entry<Writable, Writable> e : msg.entrySet()) {
-          String vertexID = ((Text) e.getKey()).toString();
-
-          if (vertexID.toString().equals(FLAG_MESSAGE)) {
-            if (((IntWritable) e.getValue()).get() == Integer.MIN_VALUE) {
-              updated = false;
-            } else {
-              globalUpdateCounts += ((IntWritable) e.getValue()).get();
-            }
-          } else {
-            Writable value = e.getValue();
-
-            if (msgMap.containsKey(vertexID)) {
-              LinkedList<Writable> msgs = msgMap.get(vertexID);
-              msgs.add(value);
-              msgMap.put(vertexID, msgs);
-            } else {
-              LinkedList<Writable> msgs = new LinkedList<Writable>();
-              msgs.add(value);
-              msgMap.put(vertexID, msgs);
-            }
-          }
-        }
-      }
+      // Map <vertexID, messages>
+      Map<String, LinkedList<Writable>> messages = parseMessages(peer);
 
       // exit if there's no update made
       if (globalUpdateCounts == 0 && peer.getPeerName().equals(masterTask)
@@ -101,32 +81,91 @@ public class GraphJobRunner extends BSP 
 
       // send msgCounts to the master task
       MapWritable updatedCnt = new MapWritable();
-      updatedCnt.put(new Text(FLAG_MESSAGE), new IntWritable(msgMap.size()));
+      updatedCnt.put(new Text(FLAG_MESSAGE), new IntWritable(messages.size()));
       peer.send(masterTask, updatedCnt);
 
-      for (Map.Entry<String, LinkedList<Writable>> e : msgMap.entrySet()) {
-        if (e.getValue().size() > 0) {
-          vertices.get(e.getKey()).compute(e.getValue().iterator());
+      for (Map.Entry<String, LinkedList<Writable>> e : messages.entrySet()) {
+        LinkedList msgs = e.getValue();
+        if (combiner != null) {
+          Writable combined = combiner.combine(msgs);
+          msgs = new LinkedList();
+          msgs.add(combined);
         }
+
+        vertices.get(e.getKey()).compute(msgs.iterator());
       }
       iteration++;
     }
   }
 
-  @SuppressWarnings("unchecked")
+  private Map<String, LinkedList<Writable>> parseMessages(BSPPeer peer)
+      throws IOException {
+    MapWritable msg = null;
+    Map<String, LinkedList<Writable>> msgMap = new HashMap<String, LinkedList<Writable>>();
+    while ((msg = (MapWritable) peer.getCurrentMessage()) != null) {
+
+      for (Entry<Writable, Writable> e : msg.entrySet()) {
+        String vertexID = ((Text) e.getKey()).toString();
+
+        if (vertexID.toString().equals(FLAG_MESSAGE)) {
+          if (((IntWritable) e.getValue()).get() == Integer.MIN_VALUE) {
+            updated = false;
+          } else {
+            globalUpdateCounts += ((IntWritable) e.getValue()).get();
+          }
+        } else {
+          Writable value = e.getValue();
+
+          if (msgMap.containsKey(vertexID)) {
+            LinkedList<Writable> msgs = msgMap.get(vertexID);
+            msgs.add(value);
+            msgMap.put(vertexID, msgs);
+          } else {
+            LinkedList<Writable> msgs = new LinkedList<Writable>();
+            msgs.add(value);
+            msgMap.put(vertexID, msgs);
+          }
+        }
+      }
+    }
+
+    return msgMap;
+  }
+
   public void setup(BSPPeer peer) throws IOException, SyncException,
       InterruptedException {
-    Configuration conf = peer.getConfiguration();
+    this.conf = peer.getConfiguration();
     // Choose one as a master to collect global updates
-    masterTask = peer.getPeerName(0);
-    LOG.debug("vertex class: " + conf.get("hama.graph.vertex.class"));
+    this.masterTask = peer.getPeerName(0);
+
+    if (!conf.getClass(MESSAGE_COMBINER_CLASS, Combiner.class).equals(
+        Combiner.class)) {
+      LOG.debug("Combiner class: " + conf.get(MESSAGE_COMBINER_CLASS));
+
+      combiner = (Combiner<? extends Writable>) ReflectionUtils.newInstance(
+          conf.getClass("hama.vertex.message.combiner.class", Combiner.class),
+          conf);
+    }
 
+    loadVertices(peer);
+    long numberVertices = vertices.size() * peer.getNumPeers();
+
+    for (Map.Entry<String, Vertex> e : vertices.entrySet()) {
+      e.getValue().setNumVertices(numberVertices);
+
+      LinkedList<Writable> msgIterator = new LinkedList<Writable>();
+      msgIterator.add(e.getValue().getValue());
+      e.getValue().compute(msgIterator.iterator());
+    }
+  }
+
+  private void loadVertices(BSPPeer peer) throws IOException {
+    LOG.debug("vertex class: " + conf.get("hama.graph.vertex.class"));
     KeyValuePair<? extends VertexWritable, ? extends VertexArrayWritable> next = null;
     while ((next = peer.readNext()) != null) {
       Vertex<? extends Writable> vertex = (Vertex<? extends Writable>) ReflectionUtils
-          .newInstance(
-              peer.getConfiguration().getClass("hama.graph.vertex.class",
-                  Vertex.class), peer.getConfiguration());
+          .newInstance(conf.getClass("hama.graph.vertex.class", Vertex.class),
+              conf);
       vertex.setVertexID(next.getKey().getName());
       vertex.peer = peer;
 
@@ -141,27 +180,11 @@ public class GraphJobRunner extends BSP 
       vertex.edges = edges;
       vertices.put(next.getKey().getName(), vertex);
     }
-
-    long numberVertices = vertices.size() * peer.getNumPeers();
-    startVertexCompute(numberVertices);
-  }
-
-  @SuppressWarnings("unchecked")
-  private void startVertexCompute(long numberVertices) throws IOException {
-    for (Map.Entry<String, Vertex> e : vertices.entrySet()) {
-      // TODO there's no another way to set numVertices?
-      e.getValue().setNumVertices(numberVertices);
-
-      LinkedList<Writable> msgIterator = new LinkedList<Writable>();
-      msgIterator.add(e.getValue().getValue());
-      e.getValue().compute(msgIterator.iterator());
-    }
   }
 
   /**
    * Just write <new Text(vertexID), (Writable) value> pair as a result
    */
-  @SuppressWarnings("unchecked")
   public void cleanup(BSPPeer peer) throws IOException {
     for (Map.Entry<String, Vertex> e : vertices.entrySet()) {
       peer.write(new Text(e.getValue().getVertexID()), e.getValue().getValue());

Modified: incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInterface.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInterface.java?rev=1298351&r1=1298350&r2=1298351&view=diff
==============================================================================
--- incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInterface.java (original)
+++ incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInterface.java Thu Mar  8 10:55:58 2012
@@ -21,7 +21,9 @@ import java.io.IOException;
 import java.util.Iterator;
 import java.util.List;
 
-public interface VertexInterface<MSGTYPE> {
+import org.apache.hadoop.io.Writable;
+
+public interface VertexInterface<MSGTYPE extends Writable> {
 
   /**
    * @return the vertex ID.