You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by ed...@apache.org on 2012/06/14 06:06:26 UTC
svn commit: r1350083 - in /incubator/hama/trunk: ./
examples/src/main/java/org/apache/hama/examples/
examples/src/test/java/org/apache/hama/examples/
graph/src/main/java/org/apache/hama/graph/
Author: edwardyoon
Date: Thu Jun 14 04:06:25 2012
New Revision: 1350083
URL: http://svn.apache.org/viewvc?rev=1350083&view=rev
Log:
Add voteToHalt() mechanism in Graph API
Modified:
incubator/hama/trunk/CHANGES.txt
incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/MindistSearch.java
incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/MindistSearchTest.java
incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java
incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInterface.java
Modified: incubator/hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=1350083&r1=1350082&r2=1350083&view=diff
==============================================================================
--- incubator/hama/trunk/CHANGES.txt (original)
+++ incubator/hama/trunk/CHANGES.txt Thu Jun 14 04:06:25 2012
@@ -4,6 +4,7 @@ Release 0.5 - April 10, 2012
NEW FEATURES
+ HAMA-588: Add voteToHalt() mechanism in Graph API (edwardyoon)
HAMA-566: Add disk-based queue (tjungblut)
HAMA-552: Add a sorted message queue (tjungblut)
HAMA-556: Graph package to support stopping the interations when the node changes are within the tolerance value as in the case of page rank (tjungblut)
Modified: incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/MindistSearch.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/MindistSearch.java?rev=1350083&r1=1350082&r2=1350083&view=diff
==============================================================================
--- incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/MindistSearch.java (original)
+++ incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/MindistSearch.java Thu Jun 14 04:06:25 2012
@@ -66,10 +66,14 @@ public class MindistSearch {
boolean updated = false;
while (messages.hasNext()) {
Text next = messages.next();
- if (currentComponent.compareTo(next) > 0) {
- updated = true;
- setValue(next);
- }
+ if(currentComponent != null && next != null) {
+ if (currentComponent.compareTo(next) > 0) {
+ updated = true;
+ setValue(next);
+ }
+ } else {
+ this.voteToHalt();
+ }
}
if (updated) {
sendMessageToNeighbors(getValue());
Modified: incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/MindistSearchTest.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/MindistSearchTest.java?rev=1350083&r1=1350082&r2=1350083&view=diff
==============================================================================
--- incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/MindistSearchTest.java (original)
+++ incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/MindistSearchTest.java Thu Jun 14 04:06:25 2012
@@ -42,10 +42,10 @@ public class MindistSearchTest extends T
String[] input = new String[] { "0", "1\t4\t7", "2\t3\t8", "3\t5", "4\t1", "5\t6",
"6", "7", "8\t3", "9\t0" };
- private static String INPUT = "/tmp/pagerank-tmp.seq";
- private static String TEXT_INPUT = "/tmp/pagerank.txt";
- private static String TEXT_OUTPUT = INPUT + "pagerank.txt.seq";
- private static String OUTPUT = "/tmp/pagerank-out";
+ private static String INPUT = "/tmp/mdst-tmp.seq";
+ private static String TEXT_INPUT = "/tmp/mdst.txt";
+ private static String TEXT_OUTPUT = INPUT + "mdst.txt.seq";
+ private static String OUTPUT = "/tmp/mdst-out";
private Configuration conf = new HamaConfiguration();
private FileSystem fs;
@@ -58,7 +58,7 @@ public class MindistSearchTest extends T
public void testMindistSearch() throws Exception {
generateTestData();
try {
- MindistSearch.main(new String[] { INPUT, OUTPUT });
+ MindistSearch.main(new String[] { INPUT, OUTPUT, "30", "2" });
verifyResult();
} finally {
Modified: incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java?rev=1350083&r1=1350082&r2=1350083&view=diff
==============================================================================
--- incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java (original)
+++ incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java Thu Jun 14 04:06:25 2012
@@ -22,7 +22,6 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
-import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -259,33 +258,39 @@ public final class GraphJobRunner<V exte
}
int messagesSize = messages.size();
- Iterator<Entry<V, LinkedList<M>>> iterator = messages.entrySet()
- .iterator();
- while (iterator.hasNext()) {
- Entry<V, LinkedList<M>> e = iterator.next();
- LinkedList<M> msgs = e.getValue();
- if (combiner != null) {
- M combined = combiner.combine(msgs);
+
+ for (Vertex<V, E, M> vertex : vertices.values()) {
+ LinkedList<M> msgs = messages.get(vertex.getVertexID());
+ if (vertex.isHalted() && msgs != null) {
+ vertex.votedToHalt = false;
+ }
+ if (msgs == null) {
msgs = new LinkedList<M>();
- msgs.add(combined);
}
- Vertex<V, E, M> vertex = vertices.get(e.getKey());
- M lastValue = vertex.getValue();
- vertex.compute(msgs.iterator());
- if (aggregators != null) {
- if (this.aggregators != null) {
- for (int i = 0; i < this.aggregators.length; i++) {
- Aggregator<M, Vertex<V, E, M>> aggregator = this.aggregators[i];
- aggregator.aggregate(vertex, vertex.getValue());
- if (isAbstractAggregator[i]) {
- AbstractAggregator<M, Vertex<V, E, M>> intern = ((AbstractAggregator<M, Vertex<V, E, M>>) aggregator);
- intern.aggregate(vertex, lastValue, vertex.getValue());
- intern.aggregateInternal();
+
+ if (!vertex.isHalted()) {
+ if (combiner != null) {
+ M combined = combiner.combine(msgs);
+ msgs = new LinkedList<M>();
+ msgs.add(combined);
+ }
+ M lastValue = vertex.getValue();
+ vertex.compute(msgs.iterator());
+
+ if (aggregators != null) {
+ if (this.aggregators != null) {
+ for (int i = 0; i < this.aggregators.length; i++) {
+ Aggregator<M, Vertex<V, E, M>> aggregator = this.aggregators[i];
+ aggregator.aggregate(vertex, vertex.getValue());
+ if (isAbstractAggregator[i]) {
+ AbstractAggregator<M, Vertex<V, E, M>> intern = ((AbstractAggregator<M, Vertex<V, E, M>>) aggregator);
+ intern.aggregate(vertex, lastValue, vertex.getValue());
+ intern.aggregateInternal();
+ }
}
}
}
}
- iterator.remove();
}
runAggregators(peer, messagesSize);
@@ -476,11 +481,11 @@ public final class GraphJobRunner<V exte
/**
* @return a new vertex instance
*/
- public static <VERTEX_ID extends Writable, VERTEX_VALUE extends Writable, EDGE_VALUE_TYPE extends Writable> Vertex<VERTEX_ID, VERTEX_VALUE, EDGE_VALUE_TYPE> newVertexInstance(
+ public static <V extends Writable, E extends Writable, M extends Writable> Vertex<V, E, M> newVertexInstance(
Class<?> vertexClass, Configuration conf) {
@SuppressWarnings("unchecked")
- Vertex<VERTEX_ID, VERTEX_VALUE, EDGE_VALUE_TYPE> vertex = (Vertex<VERTEX_ID, VERTEX_VALUE, EDGE_VALUE_TYPE>) ReflectionUtils
- .newInstance(vertexClass, conf);
+ Vertex<V, E, M> vertex = (Vertex<V, E, M>) ReflectionUtils.newInstance(
+ vertexClass, conf);
return vertex;
}
Modified: incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java?rev=1350083&r1=1350082&r2=1350083&view=diff
==============================================================================
--- incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java (original)
+++ incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java Thu Jun 14 04:06:25 2012
@@ -36,6 +36,8 @@ public abstract class Vertex<V extends W
private BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer;
private List<Edge<V, E>> edges;
+ protected boolean votedToHalt = false;
+
public Configuration getConf() {
return peer.getConfiguration();
}
@@ -163,6 +165,15 @@ public abstract class Vertex<V extends W
}
@Override
+ public void voteToHalt() {
+ this.votedToHalt = true;
+ }
+
+ public boolean isHalted() {
+ return votedToHalt;
+ }
+
+ @Override
public int hashCode() {
final int prime = 31;
int result = 1;
Modified: incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInterface.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInterface.java?rev=1350083&r1=1350082&r2=1350083&view=diff
==============================================================================
--- incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInterface.java (original)
+++ incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInterface.java Thu Jun 14 04:06:25 2012
@@ -81,6 +81,11 @@ public interface VertexInterface<V exten
public long getSuperstepCount();
/**
+ * Vote to halt.
+ */
+ public void voteToHalt();
+
+ /**
* Sets the vertex value
*/
public void setValue(M value);