You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by ed...@apache.org on 2012/03/08 11:55:58 UTC
svn commit: r1298351 - in /incubator/hama/trunk: ./
core/src/main/java/org/apache/hama/bsp/
examples/src/main/java/org/apache/hama/examples/
graph/src/main/java/org/apache/hama/graph/
Author: edwardyoon
Date: Thu Mar 8 10:55:58 2012
New Revision: 1298351
URL: http://svn.apache.org/viewvc?rev=1298351&view=rev
Log:
Add MinIntCombiner to SSSP example
Modified:
incubator/hama/trunk/CHANGES.txt
incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/Combiner.java
incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/CombineExample.java
incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/SSSP.java
incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java
incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInterface.java
Modified: incubator/hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=1298351&r1=1298350&r2=1298351&view=diff
==============================================================================
--- incubator/hama/trunk/CHANGES.txt (original)
+++ incubator/hama/trunk/CHANGES.txt Thu Mar 8 10:55:58 2012
@@ -4,6 +4,7 @@ Release 0.5 - Unreleased
NEW FEATURES
+ HAMA-518: Add MinIntCombiner to SSSP example (edwardyoon)
HAMA-367: Runtime Compression of BSP Messages to Improve the Performance (Apurv Verma via tjungblut)
HAMA-501: Add Avro RPC (tjungblut)
HAMA-456: Add basic Graph interfaces and GraphJobRunner (edwardyoon)
Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java?rev=1298351&r1=1298350&r2=1298351&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java Thu Mar 8 10:55:58 2012
@@ -311,7 +311,10 @@ public final class BSPPeerImpl<K1, V1, K
Combiner<M> combiner = (Combiner<M>) ReflectionUtils.newInstance(
conf.getClass("bsp.combiner.class", Combiner.class), conf);
- return combiner.combine(messages);
+ BSPMessageBundle<M> bundle = new BSPMessageBundle<M>();
+ bundle.addMessage(combiner.combine(messages));
+
+ return bundle;
} else {
BSPMessageBundle<M> bundle = new BSPMessageBundle<M>();
for (M message : messages) {
Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/Combiner.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/Combiner.java?rev=1298351&r1=1298350&r2=1298351&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/Combiner.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/Combiner.java Thu Mar 8 10:55:58 2012
@@ -27,6 +27,6 @@ public abstract class Combiner<M extends
* @param messages
* @return the combined message
*/
- public abstract BSPMessageBundle<M> combine(Iterable<M> messages);
+ public abstract M combine(Iterable<M> messages);
}
Modified: incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/CombineExample.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/CombineExample.java?rev=1298351&r1=1298350&r2=1298351&view=diff
==============================================================================
--- incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/CombineExample.java (original)
+++ incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/CombineExample.java Thu Mar 8 10:55:58 2012
@@ -33,7 +33,6 @@ import org.apache.hadoop.io.Text;
import org.apache.hama.HamaConfiguration;
import org.apache.hama.bsp.BSP;
import org.apache.hama.bsp.BSPJob;
-import org.apache.hama.bsp.BSPMessageBundle;
import org.apache.hama.bsp.BSPPeer;
import org.apache.hama.bsp.Combiner;
import org.apache.hama.bsp.FileOutputFormat;
@@ -71,8 +70,7 @@ public class CombineExample {
public static class SumCombiner extends Combiner<IntWritable> {
@Override
- public BSPMessageBundle<IntWritable> combine(Iterable<IntWritable> messages) {
- BSPMessageBundle<IntWritable> bundle = new BSPMessageBundle<IntWritable>();
+ public IntWritable combine(Iterable<IntWritable> messages) {
int sum = 0;
Iterator<IntWritable> it = messages.iterator();
@@ -80,8 +78,7 @@ public class CombineExample {
sum += it.next().get();
}
- bundle.addMessage(new IntWritable(sum));
- return bundle;
+ return new IntWritable(sum);
}
}
Modified: incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/SSSP.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/SSSP.java?rev=1298351&r1=1298350&r2=1298351&view=diff
==============================================================================
--- incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/SSSP.java (original)
+++ incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/SSSP.java Thu Mar 8 10:55:58 2012
@@ -24,6 +24,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hama.HamaConfiguration;
+import org.apache.hama.bsp.Combiner;
import org.apache.hama.bsp.HashPartitioner;
import org.apache.hama.bsp.SequenceFileInputFormat;
import org.apache.hama.bsp.SequenceFileOutputFormat;
@@ -67,6 +68,23 @@ public class SSSP {
}
}
+ public static class MinIntCombiner extends Combiner<IntWritable> {
+
+ @Override
+ public IntWritable combine(Iterable<IntWritable> messages) {
+ int minDist = Integer.MAX_VALUE;
+
+ Iterator<IntWritable> it = messages.iterator();
+ while (it.hasNext()) {
+ int msgValue = it.next().get();
+ if (minDist > msgValue)
+ minDist = msgValue;
+ }
+
+ return new IntWritable(minDist);
+ }
+ }
+
private static void printUsage() {
System.out.println("Usage: <startnode> <input> <output> [tasks]");
System.exit(-1);
@@ -92,6 +110,7 @@ public class SSSP {
}
ssspJob.setVertexClass(ShortestPathVertex.class);
+ ssspJob.setCombinerClass(MinIntCombiner.class);
ssspJob.setInputFormat(SequenceFileInputFormat.class);
ssspJob.setInputKeyClass(VertexWritable.class);
ssspJob.setInputValueClass(VertexArrayWritable.class);
@@ -100,6 +119,8 @@ public class SSSP {
ssspJob.setOutputFormat(SequenceFileOutputFormat.class);
ssspJob.setOutputKeyClass(Text.class);
ssspJob.setOutputValueClass(IntWritable.class);
+ // Iterate until all the nodes have been reached.
+ ssspJob.setMaxIteration(Integer.MAX_VALUE);
long startTime = System.currentTimeMillis();
if (ssspJob.waitForCompletion(true)) {
Modified: incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java?rev=1298351&r1=1298350&r2=1298351&view=diff
==============================================================================
--- incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java (original)
+++ incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java Thu Mar 8 10:55:58 2012
@@ -22,11 +22,14 @@ import java.io.IOException;
import org.apache.hadoop.io.Writable;
import org.apache.hama.HamaConfiguration;
import org.apache.hama.bsp.BSPJob;
+import org.apache.hama.bsp.Combiner;
public class GraphJob extends BSPJob {
public final static String VERTEX_CLASS_ATTR = "hama.graph.vertex.class";
+ public final static String VERTEX_MESSAGE_COMBINER_CLASS_ATTR = "hama.vertex.message.combiner.class";
- public GraphJob(HamaConfiguration conf, Class<?> exampleClass) throws IOException {
+ public GraphJob(HamaConfiguration conf, Class<?> exampleClass)
+ throws IOException {
super(conf);
this.setBspClass(GraphJobRunner.class);
this.setJarByClass(exampleClass);
@@ -48,4 +51,11 @@ public class GraphJob extends BSPJob {
return (Class<? extends Vertex<? extends Writable>>) conf.getClass(
VERTEX_CLASS_ATTR, Vertex.class);
}
+
+ @Override
+ public void setCombinerClass(Class<? extends Combiner<? extends Writable>> cls) {
+ ensureState(JobState.DEFINE);
+ conf.setClass(VERTEX_MESSAGE_COMBINER_CLASS_ATTR, cls, Combiner.class);
+ }
+
}
Modified: incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java?rev=1298351&r1=1298350&r2=1298351&view=diff
==============================================================================
--- incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java (original)
+++ incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java Thu Mar 8 10:55:58 2012
@@ -35,57 +35,37 @@ import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hama.bsp.BSP;
import org.apache.hama.bsp.BSPPeer;
+import org.apache.hama.bsp.Combiner;
import org.apache.hama.bsp.sync.SyncException;
import org.apache.hama.util.KeyValuePair;
-@SuppressWarnings("rawtypes")
+@SuppressWarnings("unchecked")
public class GraphJobRunner extends BSP {
public static final Log LOG = LogFactory.getLog(GraphJobRunner.class);
+ private Configuration conf;
+ private Combiner<? extends Writable> combiner;
private Map<String, Vertex> vertices = new HashMap<String, Vertex>();
- private String masterTask;
+
private String FLAG_MESSAGE = "hama.graph.msg.counts";
+ private final String MESSAGE_COMBINER_CLASS = "hama.vertex.message.combiner.class";
+
+ private String masterTask;
+ private boolean updated = true;
+ private int globalUpdateCounts = 0;
- @SuppressWarnings("unchecked")
@Override
public void bsp(BSPPeer peer) throws IOException, SyncException,
InterruptedException {
int maxIteration = peer.getConfiguration().getInt(
"hama.graph.max.iteration", 30);
- boolean updated = true;
int iteration = 0;
while (updated && iteration < maxIteration) {
- int globalUpdateCounts = 0;
+ globalUpdateCounts = 0;
peer.sync();
- MapWritable msg = null;
- Map<String, LinkedList<Writable>> msgMap = new HashMap<String, LinkedList<Writable>>();
- while ((msg = (MapWritable) peer.getCurrentMessage()) != null) {
-
- for (Entry<Writable, Writable> e : msg.entrySet()) {
- String vertexID = ((Text) e.getKey()).toString();
-
- if (vertexID.toString().equals(FLAG_MESSAGE)) {
- if (((IntWritable) e.getValue()).get() == Integer.MIN_VALUE) {
- updated = false;
- } else {
- globalUpdateCounts += ((IntWritable) e.getValue()).get();
- }
- } else {
- Writable value = e.getValue();
-
- if (msgMap.containsKey(vertexID)) {
- LinkedList<Writable> msgs = msgMap.get(vertexID);
- msgs.add(value);
- msgMap.put(vertexID, msgs);
- } else {
- LinkedList<Writable> msgs = new LinkedList<Writable>();
- msgs.add(value);
- msgMap.put(vertexID, msgs);
- }
- }
- }
- }
+ // Map <vertexID, messages>
+ Map<String, LinkedList<Writable>> messages = parseMessages(peer);
// exit if there's no update made
if (globalUpdateCounts == 0 && peer.getPeerName().equals(masterTask)
@@ -101,32 +81,91 @@ public class GraphJobRunner extends BSP
// send msgCounts to the master task
MapWritable updatedCnt = new MapWritable();
- updatedCnt.put(new Text(FLAG_MESSAGE), new IntWritable(msgMap.size()));
+ updatedCnt.put(new Text(FLAG_MESSAGE), new IntWritable(messages.size()));
peer.send(masterTask, updatedCnt);
- for (Map.Entry<String, LinkedList<Writable>> e : msgMap.entrySet()) {
- if (e.getValue().size() > 0) {
- vertices.get(e.getKey()).compute(e.getValue().iterator());
+ for (Map.Entry<String, LinkedList<Writable>> e : messages.entrySet()) {
+ LinkedList msgs = e.getValue();
+ if (combiner != null) {
+ Writable combined = combiner.combine(msgs);
+ msgs = new LinkedList();
+ msgs.add(combined);
}
+
+ vertices.get(e.getKey()).compute(msgs.iterator());
}
iteration++;
}
}
- @SuppressWarnings("unchecked")
+ private Map<String, LinkedList<Writable>> parseMessages(BSPPeer peer)
+ throws IOException {
+ MapWritable msg = null;
+ Map<String, LinkedList<Writable>> msgMap = new HashMap<String, LinkedList<Writable>>();
+ while ((msg = (MapWritable) peer.getCurrentMessage()) != null) {
+
+ for (Entry<Writable, Writable> e : msg.entrySet()) {
+ String vertexID = ((Text) e.getKey()).toString();
+
+ if (vertexID.toString().equals(FLAG_MESSAGE)) {
+ if (((IntWritable) e.getValue()).get() == Integer.MIN_VALUE) {
+ updated = false;
+ } else {
+ globalUpdateCounts += ((IntWritable) e.getValue()).get();
+ }
+ } else {
+ Writable value = e.getValue();
+
+ if (msgMap.containsKey(vertexID)) {
+ LinkedList<Writable> msgs = msgMap.get(vertexID);
+ msgs.add(value);
+ msgMap.put(vertexID, msgs);
+ } else {
+ LinkedList<Writable> msgs = new LinkedList<Writable>();
+ msgs.add(value);
+ msgMap.put(vertexID, msgs);
+ }
+ }
+ }
+ }
+
+ return msgMap;
+ }
+
public void setup(BSPPeer peer) throws IOException, SyncException,
InterruptedException {
- Configuration conf = peer.getConfiguration();
+ this.conf = peer.getConfiguration();
// Choose one as a master to collect global updates
- masterTask = peer.getPeerName(0);
- LOG.debug("vertex class: " + conf.get("hama.graph.vertex.class"));
+ this.masterTask = peer.getPeerName(0);
+
+ if (!conf.getClass(MESSAGE_COMBINER_CLASS, Combiner.class).equals(
+ Combiner.class)) {
+ LOG.debug("Combiner class: " + conf.get(MESSAGE_COMBINER_CLASS));
+
+ combiner = (Combiner<? extends Writable>) ReflectionUtils.newInstance(
+ conf.getClass("hama.vertex.message.combiner.class", Combiner.class),
+ conf);
+ }
+ loadVertices(peer);
+ long numberVertices = vertices.size() * peer.getNumPeers();
+
+ for (Map.Entry<String, Vertex> e : vertices.entrySet()) {
+ e.getValue().setNumVertices(numberVertices);
+
+ LinkedList<Writable> msgIterator = new LinkedList<Writable>();
+ msgIterator.add(e.getValue().getValue());
+ e.getValue().compute(msgIterator.iterator());
+ }
+ }
+
+ private void loadVertices(BSPPeer peer) throws IOException {
+ LOG.debug("vertex class: " + conf.get("hama.graph.vertex.class"));
KeyValuePair<? extends VertexWritable, ? extends VertexArrayWritable> next = null;
while ((next = peer.readNext()) != null) {
Vertex<? extends Writable> vertex = (Vertex<? extends Writable>) ReflectionUtils
- .newInstance(
- peer.getConfiguration().getClass("hama.graph.vertex.class",
- Vertex.class), peer.getConfiguration());
+ .newInstance(conf.getClass("hama.graph.vertex.class", Vertex.class),
+ conf);
vertex.setVertexID(next.getKey().getName());
vertex.peer = peer;
@@ -141,27 +180,11 @@ public class GraphJobRunner extends BSP
vertex.edges = edges;
vertices.put(next.getKey().getName(), vertex);
}
-
- long numberVertices = vertices.size() * peer.getNumPeers();
- startVertexCompute(numberVertices);
- }
-
- @SuppressWarnings("unchecked")
- private void startVertexCompute(long numberVertices) throws IOException {
- for (Map.Entry<String, Vertex> e : vertices.entrySet()) {
- // TODO there's no another way to set numVertices?
- e.getValue().setNumVertices(numberVertices);
-
- LinkedList<Writable> msgIterator = new LinkedList<Writable>();
- msgIterator.add(e.getValue().getValue());
- e.getValue().compute(msgIterator.iterator());
- }
}
/**
* Just write <new Text(vertexID), (Writable) value> pair as a result
*/
- @SuppressWarnings("unchecked")
public void cleanup(BSPPeer peer) throws IOException {
for (Map.Entry<String, Vertex> e : vertices.entrySet()) {
peer.write(new Text(e.getValue().getVertexID()), e.getValue().getValue());
Modified: incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInterface.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInterface.java?rev=1298351&r1=1298350&r2=1298351&view=diff
==============================================================================
--- incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInterface.java (original)
+++ incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInterface.java Thu Mar 8 10:55:58 2012
@@ -21,7 +21,9 @@ import java.io.IOException;
import java.util.Iterator;
import java.util.List;
-public interface VertexInterface<MSGTYPE> {
+import org.apache.hadoop.io.Writable;
+
+public interface VertexInterface<MSGTYPE extends Writable> {
/**
* @return the vertex ID.