You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ac...@apache.org on 2012/02/16 23:12:36 UTC
svn commit: r1245205 [5/18] - in /incubator/giraph/trunk: ./
src/main/java/org/apache/giraph/ src/main/java/org/apache/giraph/benchmark/
src/main/java/org/apache/giraph/bsp/ src/main/java/org/apache/giraph/comm/
src/main/java/org/apache/giraph/examples...
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleFailVertex.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleFailVertex.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleFailVertex.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleFailVertex.java Thu Feb 16 22:12:31 2012
@@ -22,47 +22,50 @@ import org.apache.giraph.graph.EdgeListV
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.LongWritable;
+import org.apache.log4j.Logger;
import java.util.Iterator;
/**
* Vertex to allow unit testing of failure detection
*/
-public class SimpleFailVertex extends
- EdgeListVertex<LongWritable, DoubleWritable,
- FloatWritable, DoubleWritable> {
+public class SimpleFailVertex extends EdgeListVertex<
+ LongWritable, DoubleWritable, FloatWritable, DoubleWritable> {
+ /** Class logger */
+ private static Logger LOG = Logger.getLogger(SimpleFailVertex.class);
+ /** TODO: Change this behavior to WorkerContext */
+ private static long SUPERSTEP = 0;
- static long superstep = 0;
-
- @Override
- public void compute(Iterator<DoubleWritable> msgIterator) {
- if (getSuperstep() >= 1) {
- double sum = 0;
- while (msgIterator.hasNext()) {
- sum += msgIterator.next().get();
- }
- DoubleWritable vertexValue =
- new DoubleWritable((0.15f / getNumVertices()) + 0.85f * sum);
- setVertexValue(vertexValue);
- if (getSuperstep() < 30) {
- if (getSuperstep() == 20) {
- if (getVertexId().get() == 10L) {
- try {
- Thread.sleep(2000);
- } catch (InterruptedException e) {
- }
- System.exit(1);
- } else if (getSuperstep() - superstep > 10) {
- return;
- }
- }
- long edges = getNumOutEdges();
- sendMsgToAllEdges(
- new DoubleWritable(getVertexValue().get() / edges));
- } else {
- voteToHalt();
+ @Override
+ public void compute(Iterator<DoubleWritable> msgIterator) {
+ if (getSuperstep() >= 1) {
+ double sum = 0;
+ while (msgIterator.hasNext()) {
+ sum += msgIterator.next().get();
+ }
+ DoubleWritable vertexValue =
+ new DoubleWritable((0.15f / getNumVertices()) + 0.85f * sum);
+ setVertexValue(vertexValue);
+ if (getSuperstep() < 30) {
+ if (getSuperstep() == 20) {
+ if (getVertexId().get() == 10L) {
+ try {
+ Thread.sleep(2000);
+ } catch (InterruptedException e) {
+ LOG.info("Sleep interrupted ", e);
}
- superstep = getSuperstep();
+ System.exit(1);
+ } else if (getSuperstep() - SUPERSTEP > 10) {
+ return;
+ }
}
+ long edges = getNumOutEdges();
+ sendMsgToAllEdges(
+ new DoubleWritable(getVertexValue().get() / edges));
+ } else {
+ voteToHalt();
+ }
+ SUPERSTEP = getSuperstep();
}
+ }
}
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleMsgVertex.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleMsgVertex.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleMsgVertex.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleMsgVertex.java Thu Feb 16 22:12:31 2012
@@ -23,6 +23,7 @@ import java.util.Iterator;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
+import org.apache.log4j.Logger;
import org.apache.giraph.graph.EdgeListVertex;
@@ -30,34 +31,35 @@ import org.apache.giraph.graph.EdgeListV
* Test whether messages can be sent and received by vertices.
*/
public class SimpleMsgVertex extends
- EdgeListVertex<LongWritable, IntWritable, FloatWritable, IntWritable> {
- @Override
- public void compute(Iterator<IntWritable> msgIterator) {
- if (getVertexId().equals(new LongWritable(2))) {
- sendMsg(new LongWritable(1), new IntWritable(101));
- sendMsg(new LongWritable(1), new IntWritable(102));
- sendMsg(new LongWritable(1), new IntWritable(103));
- }
- if (!getVertexId().equals(new LongWritable(1))) {
- voteToHalt();
- }
- else {
- /* Check the messages */
- int sum = 0;
- while (msgIterator != null && msgIterator.hasNext()) {
- sum += msgIterator.next().get();
- }
- System.out.println("TestMsgVertex: Received a sum of " + sum +
- " (will stop on 306)");
+ EdgeListVertex<LongWritable, IntWritable, FloatWritable, IntWritable> {
+ /** Class logger */
+ private static Logger LOG = Logger.getLogger(SimpleMsgVertex.class);
+ @Override
+ public void compute(Iterator<IntWritable> msgIterator) {
+ if (getVertexId().equals(new LongWritable(2))) {
+ sendMsg(new LongWritable(1), new IntWritable(101));
+ sendMsg(new LongWritable(1), new IntWritable(102));
+ sendMsg(new LongWritable(1), new IntWritable(103));
+ }
+ if (!getVertexId().equals(new LongWritable(1))) {
+ voteToHalt();
+ } else {
+ /* Check the messages */
+ int sum = 0;
+ while (msgIterator != null && msgIterator.hasNext()) {
+ sum += msgIterator.next().get();
+ }
+ LOG.info("TestMsgVertex: Received a sum of " + sum +
+ " (will stop on 306)");
- if (sum == 306) {
- voteToHalt();
- }
- }
- if (getSuperstep() > 3) {
- System.err.println("TestMsgVertex: Vertex 1 failed to receive " +
- "messages in time");
- voteToHalt();
- }
+ if (sum == 306) {
+ voteToHalt();
+ }
+ }
+ if (getSuperstep() > 3) {
+ System.err.println("TestMsgVertex: Vertex 1 failed to receive " +
+ "messages in time");
+ voteToHalt();
}
+ }
}
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleMutateGraphVertex.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleMutateGraphVertex.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleMutateGraphVertex.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleMutateGraphVertex.java Thu Feb 16 22:12:31 2012
@@ -34,163 +34,171 @@ import org.apache.giraph.graph.WorkerCon
/**
* Vertex to allow unit testing of graph mutations.
*/
-public class SimpleMutateGraphVertex extends
- EdgeListVertex<LongWritable, DoubleWritable,
- FloatWritable, DoubleWritable> {
- /** Maximum number of ranges for vertex ids */
- private long maxRanges = 100;
- /** Class logger */
- private static Logger LOG =
- Logger.getLogger(SimpleMutateGraphVertex.class);
-
- /**
- * Unless we create a ridiculous number of vertices , we should not
- * collide within a vertex range defined by this method.
- *
- * @return Starting vertex id of the range
- */
- private long rangeVertexIdStart(int range) {
- return (Long.MAX_VALUE / maxRanges) * range;
+public class SimpleMutateGraphVertex extends EdgeListVertex<
+ LongWritable, DoubleWritable, FloatWritable, DoubleWritable> {
+ /** Class logger */
+ private static Logger LOG =
+ Logger.getLogger(SimpleMutateGraphVertex.class);
+ /** Maximum number of ranges for vertex ids */
+ private long maxRanges = 100;
+
+
+ /**
+ * Unless we create a ridiculous number of vertices , we should not
+ * collide within a vertex range defined by this method.
+ *
+ * @param range Range index
+ * @return Starting vertex id of the range
+ */
+ private long rangeVertexIdStart(int range) {
+ return (Long.MAX_VALUE / maxRanges) * range;
+ }
+
+ @Override
+ public void compute(Iterator<DoubleWritable> msgIterator)
+ throws IOException {
+ SimpleMutateGraphVertexWorkerContext workerContext =
+ (SimpleMutateGraphVertexWorkerContext) getWorkerContext();
+ if (getSuperstep() == 0) {
+ LOG.debug("Reached superstep " + getSuperstep());
+ } else if (getSuperstep() == 1) {
+ // Send messages to vertices that are sure not to exist
+ // (creating them)
+ LongWritable destVertexId =
+ new LongWritable(rangeVertexIdStart(1) + getVertexId().get());
+ sendMsg(destVertexId, new DoubleWritable(0.0));
+ } else if (getSuperstep() == 2) {
+ LOG.debug("Reached superstep " + getSuperstep());
+ } else if (getSuperstep() == 3) {
+ long vertexCount = workerContext.getVertexCount();
+ if (vertexCount * 2 != getNumVertices()) {
+ throw new IllegalStateException(
+ "Impossible to have " + getNumVertices() +
+ " vertices when should have " + vertexCount * 2 +
+ " on superstep " + getSuperstep());
+ }
+ long edgeCount = workerContext.getEdgeCount();
+ if (edgeCount != getNumEdges()) {
+ throw new IllegalStateException(
+ "Impossible to have " + getNumEdges() +
+ " edges when should have " + edgeCount +
+ " on superstep " + getSuperstep());
+ }
+ // Create vertices that are sure not to exist (doubling vertices)
+ LongWritable vertexIndex =
+ new LongWritable(rangeVertexIdStart(3) + getVertexId().get());
+ BasicVertex<LongWritable, DoubleWritable,
+ FloatWritable, DoubleWritable> vertex =
+ instantiateVertex(vertexIndex, null, null, null);
+ addVertexRequest(vertex);
+ // Add edges to those remote vertices as well
+ addEdgeRequest(vertexIndex,
+ new Edge<LongWritable, FloatWritable>(
+ getVertexId(), new FloatWritable(0.0f)));
+ } else if (getSuperstep() == 4) {
+ LOG.debug("Reached superstep " + getSuperstep());
+ } else if (getSuperstep() == 5) {
+ long vertexCount = workerContext.getVertexCount();
+ if (vertexCount * 2 != getNumVertices()) {
+ throw new IllegalStateException(
+ "Impossible to have " + getNumVertices() +
+ " when should have " + vertexCount * 2 +
+ " on superstep " + getSuperstep());
+ }
+ long edgeCount = workerContext.getEdgeCount();
+ if (edgeCount + vertexCount != getNumEdges()) {
+ throw new IllegalStateException(
+ "Impossible to have " + getNumEdges() +
+ " edges when should have " + edgeCount + vertexCount +
+ " on superstep " + getSuperstep());
+ }
+ // Remove the edges created in superstep 3
+ LongWritable vertexIndex =
+ new LongWritable(rangeVertexIdStart(3) + getVertexId().get());
+ workerContext.increaseEdgesRemoved();
+ removeEdgeRequest(vertexIndex, getVertexId());
+ } else if (getSuperstep() == 6) {
+ // Remove all the vertices created in superstep 3
+ if (getVertexId().compareTo(
+ new LongWritable(rangeVertexIdStart(3))) >= 0) {
+ removeVertexRequest(getVertexId());
+ }
+ } else if (getSuperstep() == 7) {
+ long origEdgeCount = workerContext.getOrigEdgeCount();
+ if (origEdgeCount != getNumEdges()) {
+ throw new IllegalStateException(
+ "Impossible to have " + getNumEdges() +
+ " edges when should have " + origEdgeCount +
+ " on superstep " + getSuperstep());
+ }
+ } else if (getSuperstep() == 8) {
+ long vertexCount = workerContext.getVertexCount();
+ if (vertexCount / 2 != getNumVertices()) {
+ throw new IllegalStateException(
+ "Impossible to have " + getNumVertices() +
+ " vertices when should have " + vertexCount / 2 +
+ " on superstep " + getSuperstep());
+ }
+ } else {
+ voteToHalt();
}
+ }
+
+ /**
+ * Worker context used with {@link SimpleMutateGraphVertex}.
+ */
+ public static class SimpleMutateGraphVertexWorkerContext
+ extends WorkerContext {
+ /** Cached vertex count */
+ private long vertexCount;
+ /** Cached edge count */
+ private long edgeCount;
+ /** Original number of edges */
+ private long origEdgeCount;
+ /** Number of edges removed during superstep */
+ private int edgesRemoved = 0;
+
+ @Override
+ public void preApplication()
+ throws InstantiationException, IllegalAccessException { }
+
+ @Override
+ public void postApplication() { }
@Override
- public void compute(Iterator<DoubleWritable> msgIterator)
- throws IOException {
+ public void preSuperstep() { }
- SimpleMutateGraphVertexWorkerContext workerContext =
- (SimpleMutateGraphVertexWorkerContext) getWorkerContext();
- if (getSuperstep() == 0) {
- } else if (getSuperstep() == 1) {
- // Send messages to vertices that are sure not to exist
- // (creating them)
- LongWritable destVertexId =
- new LongWritable(rangeVertexIdStart(1) + getVertexId().get());
- sendMsg(destVertexId, new DoubleWritable(0.0));
- } else if (getSuperstep() == 2) {
- } else if (getSuperstep() == 3) {
- long vertexCount = workerContext.getVertexCount();
- if (vertexCount * 2 != getNumVertices()) {
- throw new IllegalStateException(
- "Impossible to have " + getNumVertices() +
- " vertices when should have " + vertexCount * 2 +
- " on superstep " + getSuperstep());
- }
- long edgeCount = workerContext.getEdgeCount();
- if (edgeCount != getNumEdges()) {
- throw new IllegalStateException(
- "Impossible to have " + getNumEdges() +
- " edges when should have " + edgeCount +
- " on superstep " + getSuperstep());
- }
- // Create vertices that are sure not to exist (doubling vertices)
- LongWritable vertexIndex =
- new LongWritable(rangeVertexIdStart(3) + getVertexId().get());
- BasicVertex<LongWritable, DoubleWritable,
- FloatWritable, DoubleWritable> vertex =
- instantiateVertex(vertexIndex, null, null, null);
- addVertexRequest(vertex);
- // Add edges to those remote vertices as well
- addEdgeRequest(vertexIndex,
- new Edge<LongWritable, FloatWritable>(
- getVertexId(), new FloatWritable(0.0f)));
- } else if (getSuperstep() == 4) {
- } else if (getSuperstep() == 5) {
- long vertexCount = workerContext.getVertexCount();
- if (vertexCount * 2 != getNumVertices()) {
- throw new IllegalStateException(
- "Impossible to have " + getNumVertices() +
- " when should have " + vertexCount * 2 +
- " on superstep " + getSuperstep());
- }
- long edgeCount = workerContext.getEdgeCount();
- if (edgeCount + vertexCount != getNumEdges()) {
- throw new IllegalStateException(
- "Impossible to have " + getNumEdges() +
- " edges when should have " + edgeCount + vertexCount +
- " on superstep " + getSuperstep());
- }
- // Remove the edges created in superstep 3
- LongWritable vertexIndex =
- new LongWritable(rangeVertexIdStart(3) + getVertexId().get());
- workerContext.increaseEdgesRemoved();
- removeEdgeRequest(vertexIndex, getVertexId());
- } else if (getSuperstep() == 6) {
- // Remove all the vertices created in superstep 3
- if (getVertexId().compareTo(
- new LongWritable(rangeVertexIdStart(3))) >= 0) {
- removeVertexRequest(getVertexId());
- }
- } else if (getSuperstep() == 7) {
- long orig_edge_count = workerContext.getOrigEdgeCount();
- if (orig_edge_count != getNumEdges()) {
- throw new IllegalStateException(
- "Impossible to have " + getNumEdges() +
- " edges when should have " + orig_edge_count +
- " on superstep " + getSuperstep());
- }
- } else if (getSuperstep() == 8) {
- long vertex_count = workerContext.getVertexCount();
- if (vertex_count / 2 != getNumVertices()) {
- throw new IllegalStateException(
- "Impossible to have " + getNumVertices() +
- " vertices when should have " + vertex_count / 2 +
- " on superstep " + getSuperstep());
- }
- }
- else {
- voteToHalt();
- }
- }
-
- public static class SimpleMutateGraphVertexWorkerContext
- extends WorkerContext {
- /** Cached vertex count */
- private long vertexCount;
- /** Cached edge count */
- private long edgeCount;
- /** Original number of edges */
- private long origEdgeCount;
- /** Number of edges removed during superstep */
- private int edgesRemoved = 0;
-
- @Override
- public void preApplication()
- throws InstantiationException, IllegalAccessException { }
-
- @Override
- public void postApplication() { }
-
- @Override
- public void preSuperstep() { }
-
- @Override
- public void postSuperstep() {
- vertexCount = getNumVertices();
- edgeCount = getNumEdges();
- if (getSuperstep() == 1) {
- origEdgeCount = edgeCount;
- }
- LOG.info("Got " + vertexCount + " vertices, " +
- edgeCount + " edges on superstep " +
- getSuperstep());
- LOG.info("Removed " + edgesRemoved);
- edgesRemoved = 0;
- }
-
- public long getVertexCount() {
- return vertexCount;
- }
-
- public long getEdgeCount() {
- return edgeCount;
- }
-
- public long getOrigEdgeCount() {
- return origEdgeCount;
- }
-
- public void increaseEdgesRemoved() {
- this.edgesRemoved++;
- }
+ @Override
+ public void postSuperstep() {
+ vertexCount = getNumVertices();
+ edgeCount = getNumEdges();
+ if (getSuperstep() == 1) {
+ origEdgeCount = edgeCount;
+ }
+ LOG.info("Got " + vertexCount + " vertices, " +
+ edgeCount + " edges on superstep " +
+ getSuperstep());
+ LOG.info("Removed " + edgesRemoved);
+ edgesRemoved = 0;
+ }
+
+ public long getVertexCount() {
+ return vertexCount;
+ }
+
+ public long getEdgeCount() {
+ return edgeCount;
+ }
+
+ public long getOrigEdgeCount() {
+ return origEdgeCount;
+ }
+
+ /**
+ * Increase the number of edges removed by one.
+ */
+ public void increaseEdgesRemoved() {
+ this.edgesRemoved++;
}
+ }
}
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimplePageRankVertex.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimplePageRankVertex.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimplePageRankVertex.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimplePageRankVertex.java Thu Feb 16 22:12:31 2012
@@ -45,205 +45,230 @@ import java.util.Map;
* Demonstrates the basic Pregel PageRank implementation.
*/
public class SimplePageRankVertex extends LongDoubleFloatDoubleVertex {
- /** Number of supersteps for this test */
- public static final int MAX_SUPERSTEPS = 30;
- /** Logger */
- private static final Logger LOG =
- Logger.getLogger(SimplePageRankVertex.class);
+ /** Number of supersteps for this test */
+ public static final int MAX_SUPERSTEPS = 30;
+ /** Logger */
+ private static final Logger LOG =
+ Logger.getLogger(SimplePageRankVertex.class);
+
+ @Override
+ public void compute(Iterator<DoubleWritable> msgIterator) {
+ LongSumAggregator sumAggreg = (LongSumAggregator) getAggregator("sum");
+ MinAggregator minAggreg = (MinAggregator) getAggregator("min");
+ MaxAggregator maxAggreg = (MaxAggregator) getAggregator("max");
+ if (getSuperstep() >= 1) {
+ double sum = 0;
+ while (msgIterator.hasNext()) {
+ sum += msgIterator.next().get();
+ }
+ DoubleWritable vertexValue =
+ new DoubleWritable((0.15f / getNumVertices()) + 0.85f * sum);
+ setVertexValue(vertexValue);
+ maxAggreg.aggregate(vertexValue);
+ minAggreg.aggregate(vertexValue);
+ sumAggreg.aggregate(1L);
+ LOG.info(getVertexId() + ": PageRank=" + vertexValue +
+ " max=" + maxAggreg.getAggregatedValue() +
+ " min=" + minAggreg.getAggregatedValue());
+ }
+
+ if (getSuperstep() < MAX_SUPERSTEPS) {
+ long edges = getNumOutEdges();
+ sendMsgToAllEdges(
+ new DoubleWritable(getVertexValue().get() / edges));
+ } else {
+ voteToHalt();
+ }
+ }
+
+ /**
+ * Worker context used with {@link SimplePageRankVertex}.
+ */
+ public static class SimplePageRankVertexWorkerContext extends
+ WorkerContext {
+ /** Final max value for verification for local jobs */
+ private static double FINAL_MAX;
+ /** Final min value for verification for local jobs */
+ private static double FINAL_MIN;
+ /** Final sum value for verification for local jobs */
+ private static long FINAL_SUM;
+
+ public static double getFinalMax() {
+ return FINAL_MAX;
+ }
- @Override
- public void compute(Iterator<DoubleWritable> msgIterator) {
- LongSumAggregator sumAggreg = (LongSumAggregator) getAggregator("sum");
- MinAggregator minAggreg = (MinAggregator) getAggregator("min");
- MaxAggregator maxAggreg = (MaxAggregator) getAggregator("max");
- if (getSuperstep() >= 1) {
- double sum = 0;
- while (msgIterator.hasNext()) {
- sum += msgIterator.next().get();
- }
- DoubleWritable vertexValue =
- new DoubleWritable((0.15f / getNumVertices()) + 0.85f * sum);
- setVertexValue(vertexValue);
- maxAggreg.aggregate(vertexValue);
- minAggreg.aggregate(vertexValue);
- sumAggreg.aggregate(1L);
- LOG.info(getVertexId() + ": PageRank=" + vertexValue +
- " max=" + maxAggreg.getAggregatedValue() +
- " min=" + minAggreg.getAggregatedValue());
- }
+ public static double getFinalMin() {
+ return FINAL_MIN;
+ }
- if (getSuperstep() < MAX_SUPERSTEPS) {
- long edges = getNumOutEdges();
- sendMsgToAllEdges(
- new DoubleWritable(getVertexValue().get() / edges));
- } else {
- voteToHalt();
- }
+ public static long getFinalSum() {
+ return FINAL_SUM;
}
- public static class SimplePageRankVertexWorkerContext extends
- WorkerContext {
+ @Override
+ public void preApplication()
+ throws InstantiationException, IllegalAccessException {
+ registerAggregator("sum", LongSumAggregator.class);
+ registerAggregator("min", MinAggregator.class);
+ registerAggregator("max", MaxAggregator.class);
+ }
- public static double finalMax, finalMin;
- public static long finalSum;
-
- @Override
- public void preApplication()
- throws InstantiationException, IllegalAccessException {
-
- registerAggregator("sum", LongSumAggregator.class);
- registerAggregator("min", MinAggregator.class);
- registerAggregator("max", MaxAggregator.class);
- }
-
- @Override
- public void postApplication() {
-
- LongSumAggregator sumAggreg =
- (LongSumAggregator) getAggregator("sum");
- MinAggregator minAggreg =
- (MinAggregator) getAggregator("min");
- MaxAggregator maxAggreg =
- (MaxAggregator) getAggregator("max");
-
- finalSum = sumAggreg.getAggregatedValue().get();
- finalMax = maxAggreg.getAggregatedValue().get();
- finalMin = minAggreg.getAggregatedValue().get();
-
- LOG.info("aggregatedNumVertices=" + finalSum);
- LOG.info("aggregatedMaxPageRank=" + finalMax);
- LOG.info("aggregatedMinPageRank=" + finalMin);
- }
-
- @Override
- public void preSuperstep() {
-
- LongSumAggregator sumAggreg =
- (LongSumAggregator) getAggregator("sum");
- MinAggregator minAggreg =
- (MinAggregator) getAggregator("min");
- MaxAggregator maxAggreg =
- (MaxAggregator) getAggregator("max");
-
- if (getSuperstep() >= 3) {
- LOG.info("aggregatedNumVertices=" +
- sumAggreg.getAggregatedValue() +
- " NumVertices=" + getNumVertices());
- if (sumAggreg.getAggregatedValue().get() != getNumVertices()) {
- throw new RuntimeException("wrong value of SumAggreg: " +
- sumAggreg.getAggregatedValue() + ", should be: " +
- getNumVertices());
- }
- DoubleWritable maxPagerank =
- (DoubleWritable) maxAggreg.getAggregatedValue();
- LOG.info("aggregatedMaxPageRank=" + maxPagerank.get());
- DoubleWritable minPagerank =
- (DoubleWritable) minAggreg.getAggregatedValue();
- LOG.info("aggregatedMinPageRank=" + minPagerank.get());
- }
- useAggregator("sum");
- useAggregator("min");
- useAggregator("max");
- sumAggreg.setAggregatedValue(new LongWritable(0L));
- }
+ @Override
+ public void postApplication() {
- @Override
- public void postSuperstep() { }
+ LongSumAggregator sumAggreg =
+ (LongSumAggregator) getAggregator("sum");
+ MinAggregator minAggreg =
+ (MinAggregator) getAggregator("min");
+ MaxAggregator maxAggreg =
+ (MaxAggregator) getAggregator("max");
+
+ FINAL_SUM = sumAggreg.getAggregatedValue().get();
+ FINAL_MAX = maxAggreg.getAggregatedValue().get();
+ FINAL_MIN = minAggreg.getAggregatedValue().get();
+
+ LOG.info("aggregatedNumVertices=" + FINAL_SUM);
+ LOG.info("aggregatedMaxPageRank=" + FINAL_MAX);
+ LOG.info("aggregatedMinPageRank=" + FINAL_MIN);
}
-
- /**
- * Simple VertexReader that supports {@link SimplePageRankVertex}
- */
- public static class SimplePageRankVertexReader extends
- GeneratedVertexReader<LongWritable, DoubleWritable, FloatWritable,
- DoubleWritable> {
- /** Class logger */
- private static final Logger LOG =
- Logger.getLogger(SimplePageRankVertexReader.class);
-
- public SimplePageRankVertexReader() {
- super();
- }
- @Override
- public boolean nextVertex() {
- return totalRecords > recordsRead;
- }
+ @Override
+ public void preSuperstep() {
- @Override
- public BasicVertex<LongWritable, DoubleWritable, FloatWritable, DoubleWritable>
- getCurrentVertex() throws IOException {
- BasicVertex<LongWritable, DoubleWritable, FloatWritable, DoubleWritable>
- vertex = BspUtils.createVertex(configuration);
-
- LongWritable vertexId = new LongWritable(
- (inputSplit.getSplitIndex() * totalRecords) + recordsRead);
- DoubleWritable vertexValue = new DoubleWritable(vertexId.get() * 10d);
- long destVertexId =
- (vertexId.get() + 1) %
- (inputSplit.getNumSplits() * totalRecords);
- float edgeValue = vertexId.get() * 100f;
- Map<LongWritable, FloatWritable> edges = Maps.newHashMap();
- edges.put(new LongWritable(destVertexId), new FloatWritable(edgeValue));
- vertex.initialize(vertexId, vertexValue, edges, null);
- ++recordsRead;
- if (LOG.isInfoEnabled()) {
- LOG.info("next: Return vertexId=" + vertex.getVertexId().get() +
- ", vertexValue=" + vertex.getVertexValue() +
- ", destinationId=" + destVertexId + ", edgeValue=" + edgeValue);
- }
- return vertex;
- }
+ LongSumAggregator sumAggreg =
+ (LongSumAggregator) getAggregator("sum");
+ MinAggregator minAggreg =
+ (MinAggregator) getAggregator("min");
+ MaxAggregator maxAggreg =
+ (MaxAggregator) getAggregator("max");
+
+ if (getSuperstep() >= 3) {
+ LOG.info("aggregatedNumVertices=" +
+ sumAggreg.getAggregatedValue() +
+ " NumVertices=" + getNumVertices());
+ if (sumAggreg.getAggregatedValue().get() != getNumVertices()) {
+ throw new RuntimeException("wrong value of SumAggreg: " +
+ sumAggreg.getAggregatedValue() + ", should be: " +
+ getNumVertices());
+ }
+ DoubleWritable maxPagerank =
+ (DoubleWritable) maxAggreg.getAggregatedValue();
+ LOG.info("aggregatedMaxPageRank=" + maxPagerank.get());
+ DoubleWritable minPagerank =
+ (DoubleWritable) minAggreg.getAggregatedValue();
+ LOG.info("aggregatedMinPageRank=" + minPagerank.get());
+ }
+ useAggregator("sum");
+ useAggregator("min");
+ useAggregator("max");
+ sumAggreg.setAggregatedValue(new LongWritable(0L));
}
- /**
- * Simple VertexInputFormat that supports {@link SimplePageRankVertex}
- */
- public static class SimplePageRankVertexInputFormat extends
- GeneratedVertexInputFormat<LongWritable,
- DoubleWritable, FloatWritable, DoubleWritable> {
- @Override
- public VertexReader<LongWritable, DoubleWritable, FloatWritable, DoubleWritable>
- createVertexReader(InputSplit split,
- TaskAttemptContext context)
- throws IOException {
- return new SimplePageRankVertexReader();
- }
- }
+ @Override
+ public void postSuperstep() { }
+ }
+
+ /**
+ * Simple VertexReader that supports {@link SimplePageRankVertex}
+ */
+ public static class SimplePageRankVertexReader extends
+ GeneratedVertexReader<LongWritable, DoubleWritable, FloatWritable,
+ DoubleWritable> {
+ /** Class logger */
+ private static final Logger LOG =
+ Logger.getLogger(SimplePageRankVertexReader.class);
/**
- * Simple VertexWriter that supports {@link SimplePageRankVertex}
+ * Constructor.
*/
- public static class SimplePageRankVertexWriter extends
- TextVertexWriter<LongWritable, DoubleWritable, FloatWritable> {
- public SimplePageRankVertexWriter(
- RecordWriter<Text, Text> lineRecordWriter) {
- super(lineRecordWriter);
- }
+ public SimplePageRankVertexReader() {
+ super();
+ }
- @Override
- public void writeVertex(
- BasicVertex<LongWritable, DoubleWritable, FloatWritable, ?> vertex)
- throws IOException, InterruptedException {
- getRecordWriter().write(
- new Text(vertex.getVertexId().toString()),
- new Text(vertex.getVertexValue().toString()));
- }
+ @Override
+ public boolean nextVertex() {
+ return totalRecords > recordsRead;
}
+ @Override
+ public BasicVertex<LongWritable, DoubleWritable,
+ FloatWritable, DoubleWritable>
+ getCurrentVertex() throws IOException {
+ BasicVertex<LongWritable, DoubleWritable, FloatWritable, DoubleWritable>
+ vertex = BspUtils.createVertex(configuration);
+
+ LongWritable vertexId = new LongWritable(
+ (inputSplit.getSplitIndex() * totalRecords) + recordsRead);
+ DoubleWritable vertexValue = new DoubleWritable(vertexId.get() * 10d);
+ long destVertexId =
+ (vertexId.get() + 1) %
+ (inputSplit.getNumSplits() * totalRecords);
+ float edgeValue = vertexId.get() * 100f;
+ Map<LongWritable, FloatWritable> edges = Maps.newHashMap();
+ edges.put(new LongWritable(destVertexId), new FloatWritable(edgeValue));
+ vertex.initialize(vertexId, vertexValue, edges, null);
+ ++recordsRead;
+ if (LOG.isInfoEnabled()) {
+ LOG.info("next: Return vertexId=" + vertex.getVertexId().get() +
+ ", vertexValue=" + vertex.getVertexValue() +
+ ", destinationId=" + destVertexId + ", edgeValue=" + edgeValue);
+ }
+ return vertex;
+ }
+ }
+
+ /**
+ * Simple VertexInputFormat that supports {@link SimplePageRankVertex}
+ */
+ public static class SimplePageRankVertexInputFormat extends
+ GeneratedVertexInputFormat<LongWritable,
+ DoubleWritable, FloatWritable, DoubleWritable> {
+ @Override
+ public VertexReader<LongWritable, DoubleWritable,
+ FloatWritable, DoubleWritable> createVertexReader(InputSplit split,
+ TaskAttemptContext context)
+ throws IOException {
+ return new SimplePageRankVertexReader();
+ }
+ }
+
+ /**
+ * Simple VertexWriter that supports {@link SimplePageRankVertex}
+ */
+ public static class SimplePageRankVertexWriter extends
+ TextVertexWriter<LongWritable, DoubleWritable, FloatWritable> {
/**
- * Simple VertexOutputFormat that supports {@link SimplePageRankVertex}
+ * Constructor with line writer.
+ *
+ * @param lineRecordWriter Line writer that will do the writing.
*/
- public static class SimplePageRankVertexOutputFormat extends
- TextVertexOutputFormat<LongWritable, DoubleWritable, FloatWritable> {
+ public SimplePageRankVertexWriter(
+ RecordWriter<Text, Text> lineRecordWriter) {
+ super(lineRecordWriter);
+ }
- @Override
- public VertexWriter<LongWritable, DoubleWritable, FloatWritable>
- createVertexWriter(TaskAttemptContext context)
- throws IOException, InterruptedException {
- RecordWriter<Text, Text> recordWriter =
- textOutputFormat.getRecordWriter(context);
- return new SimplePageRankVertexWriter(recordWriter);
- }
+ @Override
+ public void writeVertex(
+ BasicVertex<LongWritable, DoubleWritable, FloatWritable, ?> vertex)
+ throws IOException, InterruptedException {
+ getRecordWriter().write(
+ new Text(vertex.getVertexId().toString()),
+ new Text(vertex.getVertexValue().toString()));
+ }
+ }
+
+ /**
+ * Simple VertexOutputFormat that supports {@link SimplePageRankVertex}
+ */
+ public static class SimplePageRankVertexOutputFormat extends
+ TextVertexOutputFormat<LongWritable, DoubleWritable, FloatWritable> {
+ @Override
+ public VertexWriter<LongWritable, DoubleWritable, FloatWritable>
+ createVertexWriter(TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ RecordWriter<Text, Text> recordWriter =
+ textOutputFormat.getRecordWriter(context);
+ return new SimplePageRankVertexWriter(recordWriter);
}
+ }
}
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleShortestPathsVertex.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleShortestPathsVertex.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleShortestPathsVertex.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleShortestPathsVertex.java Thu Feb 16 22:12:31 2012
@@ -56,215 +56,230 @@ import java.util.Map;
* Demonstrates the basic Pregel shortest paths implementation.
*/
public class SimpleShortestPathsVertex extends
- EdgeListVertex<LongWritable, DoubleWritable,
- FloatWritable, DoubleWritable> implements Tool {
- /** Configuration */
- private Configuration conf;
- /** Class logger */
- private static final Logger LOG =
- Logger.getLogger(SimpleShortestPathsVertex.class);
- /** The shortest paths id */
- public static String SOURCE_ID = "SimpleShortestPathsVertex.sourceId";
- /** Default shortest paths id */
- public static long SOURCE_ID_DEFAULT = 1;
+ EdgeListVertex<LongWritable, DoubleWritable,
+ FloatWritable, DoubleWritable> implements Tool {
+ /** The shortest paths id */
+ public static final String SOURCE_ID = "SimpleShortestPathsVertex.sourceId";
+ /** Default shortest paths id */
+ public static final long SOURCE_ID_DEFAULT = 1;
+ /** Class logger */
+ private static final Logger LOG =
+ Logger.getLogger(SimpleShortestPathsVertex.class);
+ /** Configuration */
+ private Configuration conf;
+
+ /**
+ * Is this vertex the source id?
+ *
+ * @return True if the source id
+ */
+ private boolean isSource() {
+ return getVertexId().get() ==
+ getContext().getConfiguration().getLong(SOURCE_ID,
+ SOURCE_ID_DEFAULT);
+ }
+
+ @Override
+ public void compute(Iterator<DoubleWritable> msgIterator) {
+ if (getSuperstep() == 0) {
+ setVertexValue(new DoubleWritable(Double.MAX_VALUE));
+ }
+ double minDist = isSource() ? 0d : Double.MAX_VALUE;
+ while (msgIterator.hasNext()) {
+ minDist = Math.min(minDist, msgIterator.next().get());
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Vertex " + getVertexId() + " got minDist = " + minDist +
+ " vertex value = " + getVertexValue());
+ }
+ if (minDist < getVertexValue().get()) {
+ setVertexValue(new DoubleWritable(minDist));
+ for (LongWritable targetVertexId : this) {
+ FloatWritable edgeValue = getEdgeValue(targetVertexId);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Vertex " + getVertexId() + " sent to " +
+ targetVertexId + " = " +
+ (minDist + edgeValue.get()));
+ }
+ sendMsg(targetVertexId,
+ new DoubleWritable(minDist + edgeValue.get()));
+ }
+ }
+ voteToHalt();
+ }
+
+ /**
+ * VertexInputFormat that supports {@link SimpleShortestPathsVertex}
+ */
+ public static class SimpleShortestPathsVertexInputFormat extends
+ TextVertexInputFormat<LongWritable, DoubleWritable,
+ FloatWritable, DoubleWritable> {
+ @Override
+ public VertexReader<LongWritable, DoubleWritable, FloatWritable,
+ DoubleWritable> createVertexReader(InputSplit split,
+ TaskAttemptContext context)
+ throws IOException {
+ return new SimpleShortestPathsVertexReader(
+ textInputFormat.createRecordReader(split, context));
+ }
+ }
+
+ /**
+ * VertexReader that supports {@link SimpleShortestPathsVertex}. In this
+ * case, the edge values are not used. The files should be in the
+ * following JSON format:
+ * JSONArray(<vertex id>, <vertex value>,
+ * JSONArray(JSONArray(<dest vertex id>, <edge value>), ...))
+ * Here is an example with vertex id 1, vertex value 4.3, and two edges.
+ * First edge has a destination vertex 2, edge value 2.1.
+ * Second edge has a destination vertex 3, edge value 0.7.
+ * [1,4.3,[[2,2.1],[3,0.7]]]
+ */
+ public static class SimpleShortestPathsVertexReader extends
+ TextVertexReader<LongWritable, DoubleWritable,
+ FloatWritable, DoubleWritable> {
/**
- * Is this vertex the source id?
+ * Constructor with the line record reader.
*
- * @return True if the source id
+ * @param lineRecordReader Will read from this line.
*/
- private boolean isSource() {
- return (getVertexId().get() ==
- getContext().getConfiguration().getLong(SOURCE_ID,
- SOURCE_ID_DEFAULT));
+ public SimpleShortestPathsVertexReader(
+ RecordReader<LongWritable, Text> lineRecordReader) {
+ super(lineRecordReader);
}
@Override
- public void compute(Iterator<DoubleWritable> msgIterator) {
- if (getSuperstep() == 0) {
- setVertexValue(new DoubleWritable(Double.MAX_VALUE));
- }
- double minDist = isSource() ? 0d : Double.MAX_VALUE;
- while (msgIterator.hasNext()) {
- minDist = Math.min(minDist, msgIterator.next().get());
- }
- if (LOG.isDebugEnabled()) {
- LOG.debug("Vertex " + getVertexId() + " got minDist = " + minDist +
- " vertex value = " + getVertexValue());
- }
- if (minDist < getVertexValue().get()) {
- setVertexValue(new DoubleWritable(minDist));
- for (LongWritable targetVertexId : this) {
- FloatWritable edgeValue = getEdgeValue(targetVertexId);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Vertex " + getVertexId() + " sent to " +
- targetVertexId + " = " +
- (minDist + edgeValue.get()));
- }
- sendMsg(targetVertexId,
- new DoubleWritable(minDist + edgeValue.get()));
- }
- }
- voteToHalt();
- }
-
- /**
- * VertexInputFormat that supports {@link SimpleShortestPathsVertex}
- */
- public static class SimpleShortestPathsVertexInputFormat extends
- TextVertexInputFormat<LongWritable,
- DoubleWritable,
- FloatWritable,
- DoubleWritable> {
- @Override
- public VertexReader<LongWritable, DoubleWritable, FloatWritable, DoubleWritable>
- createVertexReader(InputSplit split,
- TaskAttemptContext context)
- throws IOException {
- return new SimpleShortestPathsVertexReader(
- textInputFormat.createRecordReader(split, context));
- }
- }
-
- /**
- * VertexReader that supports {@link SimpleShortestPathsVertex}. In this
- * case, the edge values are not used. The files should be in the
- * following JSON format:
- * JSONArray(<vertex id>, <vertex value>,
- * JSONArray(JSONArray(<dest vertex id>, <edge value>), ...))
- * Here is an example with vertex id 1, vertex value 4.3, and two edges.
- * First edge has a destination vertex 2, edge value 2.1.
- * Second edge has a destination vertex 3, edge value 0.7.
- * [1,4.3,[[2,2.1],[3,0.7]]]
- */
- public static class SimpleShortestPathsVertexReader extends
- TextVertexReader<LongWritable,
- DoubleWritable, FloatWritable, DoubleWritable> {
-
- public SimpleShortestPathsVertexReader(
- RecordReader<LongWritable, Text> lineRecordReader) {
- super(lineRecordReader);
- }
-
- @Override
- public BasicVertex<LongWritable, DoubleWritable, FloatWritable,
- DoubleWritable> getCurrentVertex()
- throws IOException, InterruptedException {
- BasicVertex<LongWritable, DoubleWritable, FloatWritable,
- DoubleWritable> vertex = BspUtils.<LongWritable, DoubleWritable, FloatWritable,
- DoubleWritable>createVertex(getContext().getConfiguration());
-
- Text line = getRecordReader().getCurrentValue();
- try {
- JSONArray jsonVertex = new JSONArray(line.toString());
- LongWritable vertexId = new LongWritable(jsonVertex.getLong(0));
- DoubleWritable vertexValue = new DoubleWritable(jsonVertex.getDouble(1));
- Map<LongWritable, FloatWritable> edges = Maps.newHashMap();
- JSONArray jsonEdgeArray = jsonVertex.getJSONArray(2);
- for (int i = 0; i < jsonEdgeArray.length(); ++i) {
- JSONArray jsonEdge = jsonEdgeArray.getJSONArray(i);
- edges.put(new LongWritable(jsonEdge.getLong(0)),
- new FloatWritable((float) jsonEdge.getDouble(1)));
- }
- vertex.initialize(vertexId, vertexValue, edges, null);
- } catch (JSONException e) {
- throw new IllegalArgumentException(
- "next: Couldn't get vertex from line " + line, e);
- }
- return vertex;
- }
-
- @Override
- public boolean nextVertex() throws IOException, InterruptedException {
- return getRecordReader().nextKeyValue();
- }
- }
-
- /**
- * VertexOutputFormat that supports {@link SimpleShortestPathsVertex}
- */
- public static class SimpleShortestPathsVertexOutputFormat extends
- TextVertexOutputFormat<LongWritable, DoubleWritable,
- FloatWritable> {
-
- @Override
- public VertexWriter<LongWritable, DoubleWritable, FloatWritable>
- createVertexWriter(TaskAttemptContext context)
- throws IOException, InterruptedException {
- RecordWriter<Text, Text> recordWriter =
- textOutputFormat.getRecordWriter(context);
- return new SimpleShortestPathsVertexWriter(recordWriter);
- }
- }
-
- /**
- * VertexWriter that supports {@link SimpleShortestPathsVertex}
- */
- public static class SimpleShortestPathsVertexWriter extends
- TextVertexWriter<LongWritable, DoubleWritable, FloatWritable> {
- public SimpleShortestPathsVertexWriter(
- RecordWriter<Text, Text> lineRecordWriter) {
- super(lineRecordWriter);
- }
-
- @Override
- public void writeVertex(BasicVertex<LongWritable, DoubleWritable,
- FloatWritable, ?> vertex)
- throws IOException, InterruptedException {
- JSONArray jsonVertex = new JSONArray();
- try {
- jsonVertex.put(vertex.getVertexId().get());
- jsonVertex.put(vertex.getVertexValue().get());
- JSONArray jsonEdgeArray = new JSONArray();
- for (LongWritable targetVertexId : vertex) {
- JSONArray jsonEdge = new JSONArray();
- jsonEdge.put(targetVertexId.get());
- jsonEdge.put(vertex.getEdgeValue(targetVertexId).get());
- jsonEdgeArray.put(jsonEdge);
- }
- jsonVertex.put(jsonEdgeArray);
- } catch (JSONException e) {
- throw new IllegalArgumentException(
- "writeVertex: Couldn't write vertex " + vertex);
- }
- getRecordWriter().write(new Text(jsonVertex.toString()), null);
- }
+ public BasicVertex<LongWritable, DoubleWritable, FloatWritable,
+ DoubleWritable> getCurrentVertex()
+ throws IOException, InterruptedException {
+ BasicVertex<LongWritable, DoubleWritable, FloatWritable,
+ DoubleWritable> vertex =
+ BspUtils.<LongWritable, DoubleWritable, FloatWritable,
+ DoubleWritable>createVertex(getContext().getConfiguration());
+
+ Text line = getRecordReader().getCurrentValue();
+ try {
+ JSONArray jsonVertex = new JSONArray(line.toString());
+ LongWritable vertexId = new LongWritable(jsonVertex.getLong(0));
+ DoubleWritable vertexValue =
+ new DoubleWritable(jsonVertex.getDouble(1));
+ Map<LongWritable, FloatWritable> edges = Maps.newHashMap();
+ JSONArray jsonEdgeArray = jsonVertex.getJSONArray(2);
+ for (int i = 0; i < jsonEdgeArray.length(); ++i) {
+ JSONArray jsonEdge = jsonEdgeArray.getJSONArray(i);
+ edges.put(new LongWritable(jsonEdge.getLong(0)),
+ new FloatWritable((float) jsonEdge.getDouble(1)));
+ }
+ vertex.initialize(vertexId, vertexValue, edges, null);
+ } catch (JSONException e) {
+ throw new IllegalArgumentException(
+ "next: Couldn't get vertex from line " + line, e);
+ }
+ return vertex;
}
@Override
- public Configuration getConf() {
- return conf;
+ public boolean nextVertex() throws IOException, InterruptedException {
+ return getRecordReader().nextKeyValue();
}
+ }
+ /**
+ * VertexOutputFormat that supports {@link SimpleShortestPathsVertex}
+ */
+ public static class SimpleShortestPathsVertexOutputFormat extends
+ TextVertexOutputFormat<LongWritable, DoubleWritable,
+ FloatWritable> {
@Override
- public void setConf(Configuration conf) {
- this.conf = conf;
+ public VertexWriter<LongWritable, DoubleWritable, FloatWritable>
+ createVertexWriter(TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ RecordWriter<Text, Text> recordWriter =
+ textOutputFormat.getRecordWriter(context);
+ return new SimpleShortestPathsVertexWriter(recordWriter);
+ }
+ }
+
+ /**
+ * VertexWriter that supports {@link SimpleShortestPathsVertex}
+ */
+ public static class SimpleShortestPathsVertexWriter extends
+ TextVertexWriter<LongWritable, DoubleWritable, FloatWritable> {
+ /**
+ * Vertex writer with the internal line writer.
+ *
+ * @param lineRecordWriter Wil actually be written to.
+ */
+ public SimpleShortestPathsVertexWriter(
+ RecordWriter<Text, Text> lineRecordWriter) {
+ super(lineRecordWriter);
}
@Override
- public int run(String[] argArray) throws Exception {
- Preconditions.checkArgument(argArray.length == 4,
- "run: Must have 4 arguments <input path> <output path> " +
- "<source vertex id> <# of workers>");
-
- GiraphJob job = new GiraphJob(getConf(), getClass().getName());
- job.setVertexClass(getClass());
- job.setVertexInputFormatClass(
- SimpleShortestPathsVertexInputFormat.class);
- job.setVertexOutputFormatClass(
- SimpleShortestPathsVertexOutputFormat.class);
- FileInputFormat.addInputPath(job, new Path(argArray[0]));
- FileOutputFormat.setOutputPath(job, new Path(argArray[1]));
- job.getConfiguration().setLong(SimpleShortestPathsVertex.SOURCE_ID,
- Long.parseLong(argArray[2]));
- job.setWorkerConfiguration(Integer.parseInt(argArray[3]),
- Integer.parseInt(argArray[3]),
- 100.0f);
-
- return job.run(true) ? 0 : -1;
- }
-
- public static void main(String[] args) throws Exception {
- System.exit(ToolRunner.run(new SimpleShortestPathsVertex(), args));
- }
+ public void writeVertex(BasicVertex<LongWritable, DoubleWritable,
+ FloatWritable, ?> vertex)
+ throws IOException, InterruptedException {
+ JSONArray jsonVertex = new JSONArray();
+ try {
+ jsonVertex.put(vertex.getVertexId().get());
+ jsonVertex.put(vertex.getVertexValue().get());
+ JSONArray jsonEdgeArray = new JSONArray();
+ for (LongWritable targetVertexId : vertex) {
+ JSONArray jsonEdge = new JSONArray();
+ jsonEdge.put(targetVertexId.get());
+ jsonEdge.put(vertex.getEdgeValue(targetVertexId).get());
+ jsonEdgeArray.put(jsonEdge);
+ }
+ jsonVertex.put(jsonEdgeArray);
+ } catch (JSONException e) {
+ throw new IllegalArgumentException(
+ "writeVertex: Couldn't write vertex " + vertex);
+ }
+ getRecordWriter().write(new Text(jsonVertex.toString()), null);
+ }
+ }
+
+ @Override
+ public Configuration getConf() {
+ return conf;
+ }
+
+ @Override
+ public void setConf(Configuration conf) {
+ this.conf = conf;
+ }
+
+ @Override
+ public int run(String[] argArray) throws Exception {
+ Preconditions.checkArgument(argArray.length == 4,
+ "run: Must have 4 arguments <input path> <output path> " +
+ "<source vertex id> <# of workers>");
+
+ GiraphJob job = new GiraphJob(getConf(), getClass().getName());
+ job.setVertexClass(getClass());
+ job.setVertexInputFormatClass(
+ SimpleShortestPathsVertexInputFormat.class);
+ job.setVertexOutputFormatClass(
+ SimpleShortestPathsVertexOutputFormat.class);
+ FileInputFormat.addInputPath(job, new Path(argArray[0]));
+ FileOutputFormat.setOutputPath(job, new Path(argArray[1]));
+ job.getConfiguration().setLong(SimpleShortestPathsVertex.SOURCE_ID,
+ Long.parseLong(argArray[2]));
+ job.setWorkerConfiguration(Integer.parseInt(argArray[3]),
+ Integer.parseInt(argArray[3]),
+ 100.0f);
+
+ return job.run(true) ? 0 : -1;
+ }
+
+ /**
+ * Can be used for command line execution.
+ *
+ * @param args Command line arguments.
+ * @throws Exception
+ */
+ public static void main(String[] args) throws Exception {
+ System.exit(ToolRunner.run(new SimpleShortestPathsVertex(), args));
+ }
}
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleSumCombiner.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleSumCombiner.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleSumCombiner.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleSumCombiner.java Thu Feb 16 22:12:31 2012
@@ -31,18 +31,18 @@ import org.apache.giraph.graph.VertexCom
* Test whether combiner is called by summing up the messages.
*/
public class SimpleSumCombiner
- extends VertexCombiner<LongWritable, IntWritable> {
+ extends VertexCombiner<LongWritable, IntWritable> {
- @Override
- public Iterable<IntWritable> combine(LongWritable vertexIndex,
- Iterable<IntWritable> messages) throws IOException {
- int sum = 0;
- for (IntWritable msg : messages) {
- sum += msg.get();
- }
- List<IntWritable> value = new ArrayList<IntWritable>();
- value.add(new IntWritable(sum));
-
- return value;
+ @Override
+ public Iterable<IntWritable> combine(LongWritable vertexIndex,
+ Iterable<IntWritable> messages) throws IOException {
+ int sum = 0;
+ for (IntWritable msg : messages) {
+ sum += msg.get();
}
+ List<IntWritable> value = new ArrayList<IntWritable>();
+ value.add(new IntWritable(sum));
+
+ return value;
+ }
}
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleSuperstepVertex.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleSuperstepVertex.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleSuperstepVertex.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleSuperstepVertex.java Thu Feb 16 22:12:31 2012
@@ -44,115 +44,119 @@ import java.util.Map;
* finishes.
*/
public class SimpleSuperstepVertex extends
- EdgeListVertex<LongWritable, IntWritable, FloatWritable, IntWritable> {
- @Override
- public void compute(Iterator<IntWritable> msgIterator) {
- if (getSuperstep() > 3) {
- voteToHalt();
- }
+ EdgeListVertex<LongWritable, IntWritable, FloatWritable, IntWritable> {
+ @Override
+ public void compute(Iterator<IntWritable> msgIterator) {
+ if (getSuperstep() > 3) {
+ voteToHalt();
}
+ }
+ /**
+ * Simple VertexReader that supports {@link SimpleSuperstepVertex}
+ */
+ public static class SimpleSuperstepVertexReader extends
+ GeneratedVertexReader<LongWritable, IntWritable,
+ FloatWritable, IntWritable> {
+ /** Class logger */
+ private static final Logger LOG =
+ Logger.getLogger(SimpleSuperstepVertexReader.class);
/**
- * Simple VertexReader that supports {@link SimpleSuperstepVertex}
+ * Constructor.
*/
- public static class SimpleSuperstepVertexReader extends
- GeneratedVertexReader<LongWritable, IntWritable,
- FloatWritable, IntWritable> {
- /** Class logger */
- private static final Logger LOG =
- Logger.getLogger(SimpleSuperstepVertexReader.class);
- @Override
- public boolean nextVertex() throws IOException, InterruptedException {
- return totalRecords > recordsRead;
- }
-
- public SimpleSuperstepVertexReader() {
- super();
- }
-
- @Override
- public BasicVertex<LongWritable, IntWritable, FloatWritable,
- IntWritable> getCurrentVertex()
- throws IOException, InterruptedException {
- BasicVertex<LongWritable, IntWritable,
- FloatWritable, IntWritable> vertex =
- BspUtils.<LongWritable, IntWritable,
- FloatWritable, IntWritable>createVertex(
- configuration);
- long tmpId = reverseIdOrder ?
- ((inputSplit.getSplitIndex() + 1) * totalRecords) -
- recordsRead - 1 :
- (inputSplit.getSplitIndex() * totalRecords) + recordsRead;
- LongWritable vertexId = new LongWritable(tmpId);
- IntWritable vertexValue =
- new IntWritable((int) (vertexId.get() * 10));
- Map<LongWritable, FloatWritable> edgeMap = Maps.newHashMap();
- long destVertexId =
- (vertexId.get() + 1) %
- (inputSplit.getNumSplits() * totalRecords);
- float edgeValue = vertexId.get() * 100f;
- edgeMap.put(new LongWritable(destVertexId),
- new FloatWritable(edgeValue));
- vertex.initialize(vertexId, vertexValue, edgeMap, null);
- ++recordsRead;
- if (LOG.isInfoEnabled()) {
- LOG.info("next: Return vertexId=" + vertex.getVertexId().get() +
- ", vertexValue=" + vertex.getVertexValue() +
- ", destinationId=" + destVertexId +
- ", edgeValue=" + edgeValue);
- }
- return vertex;
- }
+ public SimpleSuperstepVertexReader() {
+ super();
}
- /**
- * Simple VertexInputFormat that supports {@link SimpleSuperstepVertex}
- */
- public static class SimpleSuperstepVertexInputFormat extends
- GeneratedVertexInputFormat<LongWritable,
- IntWritable, FloatWritable, IntWritable> {
- @Override
- public VertexReader<LongWritable, IntWritable, FloatWritable, IntWritable>
- createVertexReader(InputSplit split,
- TaskAttemptContext context)
- throws IOException {
- return new SimpleSuperstepVertexReader();
- }
+ @Override
+ public boolean nextVertex() throws IOException, InterruptedException {
+ return totalRecords > recordsRead;
}
- /**
- * Simple VertexWriter that supports {@link SimpleSuperstepVertex}
- */
- public static class SimpleSuperstepVertexWriter extends
- TextVertexWriter<LongWritable, IntWritable, FloatWritable> {
- public SimpleSuperstepVertexWriter(
- RecordWriter<Text, Text> lineRecordWriter) {
- super(lineRecordWriter);
- }
-
- @Override
- public void writeVertex(
- BasicVertex<LongWritable, IntWritable, FloatWritable, ?> vertex)
- throws IOException, InterruptedException {
- getRecordWriter().write(
- new Text(vertex.getVertexId().toString()),
- new Text(vertex.getVertexValue().toString()));
- }
+ @Override
+ public BasicVertex<LongWritable, IntWritable, FloatWritable,
+ IntWritable> getCurrentVertex()
+ throws IOException, InterruptedException {
+ BasicVertex<LongWritable, IntWritable,
+ FloatWritable, IntWritable> vertex =
+ BspUtils.<LongWritable, IntWritable, FloatWritable,
+ IntWritable>createVertex(configuration);
+ long tmpId = reverseIdOrder ?
+ ((inputSplit.getSplitIndex() + 1) * totalRecords) -
+ recordsRead - 1 :
+ (inputSplit.getSplitIndex() * totalRecords) + recordsRead;
+ LongWritable vertexId = new LongWritable(tmpId);
+ IntWritable vertexValue =
+ new IntWritable((int) (vertexId.get() * 10));
+ Map<LongWritable, FloatWritable> edgeMap = Maps.newHashMap();
+ long destVertexId =
+ (vertexId.get() + 1) %
+ (inputSplit.getNumSplits() * totalRecords);
+ float edgeValue = vertexId.get() * 100f;
+ edgeMap.put(new LongWritable(destVertexId),
+ new FloatWritable(edgeValue));
+ vertex.initialize(vertexId, vertexValue, edgeMap, null);
+ ++recordsRead;
+ if (LOG.isInfoEnabled()) {
+ LOG.info("next: Return vertexId=" + vertex.getVertexId().get() +
+ ", vertexValue=" + vertex.getVertexValue() +
+ ", destinationId=" + destVertexId +
+ ", edgeValue=" + edgeValue);
+ }
+ return vertex;
}
+ }
+ /**
+ * Simple VertexInputFormat that supports {@link SimpleSuperstepVertex}
+ */
+ public static class SimpleSuperstepVertexInputFormat extends
+ GeneratedVertexInputFormat<LongWritable,
+ IntWritable, FloatWritable, IntWritable> {
+ @Override
+ public VertexReader<LongWritable, IntWritable, FloatWritable, IntWritable>
+ createVertexReader(InputSplit split, TaskAttemptContext context)
+ throws IOException {
+ return new SimpleSuperstepVertexReader();
+ }
+ }
+
+ /**
+ * Simple VertexWriter that supports {@link SimpleSuperstepVertex}
+ */
+ public static class SimpleSuperstepVertexWriter extends
+ TextVertexWriter<LongWritable, IntWritable, FloatWritable> {
/**
- * Simple VertexOutputFormat that supports {@link SimpleSuperstepVertex}
+ * Constructor with the line record writer.
+ *
+ * @param lineRecordWriter Writer to write to.
*/
- public static class SimpleSuperstepVertexOutputFormat extends
- TextVertexOutputFormat<LongWritable, IntWritable, FloatWritable> {
+ public SimpleSuperstepVertexWriter(
+ RecordWriter<Text, Text> lineRecordWriter) {
+ super(lineRecordWriter);
+ }
- @Override
- public VertexWriter<LongWritable, IntWritable, FloatWritable>
- createVertexWriter(TaskAttemptContext context)
- throws IOException, InterruptedException {
- RecordWriter<Text, Text> recordWriter =
- textOutputFormat.getRecordWriter(context);
- return new SimpleSuperstepVertexWriter(recordWriter);
- }
+ @Override
+ public void writeVertex(BasicVertex<LongWritable, IntWritable,
+ FloatWritable, ?> vertex) throws IOException, InterruptedException {
+ getRecordWriter().write(
+ new Text(vertex.getVertexId().toString()),
+ new Text(vertex.getVertexValue().toString()));
+ }
+ }
+
+ /**
+ * Simple VertexOutputFormat that supports {@link SimpleSuperstepVertex}
+ */
+ public static class SimpleSuperstepVertexOutputFormat extends
+ TextVertexOutputFormat<LongWritable, IntWritable, FloatWritable> {
+ @Override
+ public VertexWriter<LongWritable, IntWritable, FloatWritable>
+ createVertexWriter(TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ RecordWriter<Text, Text> recordWriter =
+ textOutputFormat.getRecordWriter(context);
+ return new SimpleSuperstepVertexWriter(recordWriter);
}
+ }
}
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleTextVertexOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleTextVertexOutputFormat.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleTextVertexOutputFormat.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleTextVertexOutputFormat.java Thu Feb 16 22:12:31 2012
@@ -33,39 +33,38 @@ import org.apache.giraph.lib.TextVertexO
* Simple text based vertex output format example.
*/
public class SimpleTextVertexOutputFormat extends
- TextVertexOutputFormat<LongWritable, IntWritable, FloatWritable> {
+ TextVertexOutputFormat<LongWritable, IntWritable, FloatWritable> {
+ /**
+ * Simple text based vertex writer
+ */
+ private static class SimpleTextVertexWriter
+ extends TextVertexWriter<LongWritable, IntWritable, FloatWritable> {
/**
- * Simple text based vertex writer
+ * Initialize with the LineRecordWriter.
+ *
+ * @param lineRecordWriter Line record writer from TextOutputFormat
*/
- private static class SimpleTextVertexWriter
- extends TextVertexWriter<LongWritable, IntWritable, FloatWritable> {
-
- /**
- * Initialize with the LineRecordWriter.
- *
- * @param lineRecordWriter Line record writer from TextOutputFormat
- */
- public SimpleTextVertexWriter(
- RecordWriter<Text, Text> lineRecordWriter) {
- super(lineRecordWriter);
- }
-
- @Override
- public void writeVertex(
- BasicVertex<LongWritable, IntWritable, FloatWritable, ?> vertex)
- throws IOException, InterruptedException {
- getRecordWriter().write(
- new Text(vertex.getVertexId().toString()),
- new Text(vertex.getVertexValue().toString()));
- }
+ public SimpleTextVertexWriter(
+ RecordWriter<Text, Text> lineRecordWriter) {
+ super(lineRecordWriter);
}
@Override
- public VertexWriter<LongWritable, IntWritable, FloatWritable>
- createVertexWriter(TaskAttemptContext context)
- throws IOException, InterruptedException {
- RecordWriter<Text, Text> recordWriter =
- textOutputFormat.getRecordWriter(context);
- return new SimpleTextVertexWriter(recordWriter);
+ public void writeVertex(
+ BasicVertex<LongWritable, IntWritable, FloatWritable, ?> vertex)
+ throws IOException, InterruptedException {
+ getRecordWriter().write(
+ new Text(vertex.getVertexId().toString()),
+ new Text(vertex.getVertexValue().toString()));
}
+ }
+
+ @Override
+ public VertexWriter<LongWritable, IntWritable, FloatWritable>
+ createVertexWriter(TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ RecordWriter<Text, Text> recordWriter =
+ textOutputFormat.getRecordWriter(context);
+ return new SimpleTextVertexWriter(recordWriter);
+ }
}
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleVertexWithWorkerContext.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleVertexWithWorkerContext.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleVertexWithWorkerContext.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleVertexWithWorkerContext.java Thu Feb 16 22:12:31 2012
@@ -22,7 +22,8 @@ import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Iterator;
-import org.apache.giraph.examples.SimpleSuperstepVertex.SimpleSuperstepVertexInputFormat;
+import org.apache.giraph.examples.SimpleSuperstepVertex.
+ SimpleSuperstepVertexInputFormat;
import org.apache.giraph.graph.GiraphJob;
import org.apache.giraph.graph.EdgeListVertex;
import org.apache.giraph.graph.WorkerContext;
@@ -43,123 +44,139 @@ import org.apache.hadoop.util.ToolRunner
* computation.
*/
public class SimpleVertexWithWorkerContext extends
- EdgeListVertex<LongWritable, IntWritable, FloatWritable, DoubleWritable>
- implements Tool {
+ EdgeListVertex<LongWritable, IntWritable, FloatWritable, DoubleWritable>
+ implements Tool {
+ /** Directory name of where to write. */
+ public static final String OUTPUTDIR = "svwwc.outputdir";
+ /** Halting condition for the number of supersteps */
+ private static final int TESTLENGTH = 30;
+
+ @Override
+ public void compute(Iterator<DoubleWritable> msgIterator)
+ throws IOException {
+
+ long superstep = getSuperstep();
+
+ if (superstep < TESTLENGTH) {
+ EmitterWorkerContext emitter =
+ (EmitterWorkerContext) getWorkerContext();
+ emitter.emit("vertexId=" + getVertexId() +
+ " superstep=" + superstep + "\n");
+ } else {
+ voteToHalt();
+ }
+ }
- public static final String OUTPUTDIR = "svwwc.outputdir";
- private static final int TESTLENGTH = 30;
+ /**
+ * Example worker context to emit data as part of a superstep.
+ */
+ @SuppressWarnings("rawtypes")
+ public static class EmitterWorkerContext extends WorkerContext {
+ /** File name prefix */
+ private static final String FILENAME = "emitter_";
+ /** Output stream to dump the strings. */
+ private DataOutputStream out;
@Override
- public void compute(Iterator<DoubleWritable> msgIterator)
- throws IOException {
-
- long superstep = getSuperstep();
-
- if (superstep < TESTLENGTH) {
- EmitterWorkerContext emitter =
- (EmitterWorkerContext) getWorkerContext();
- emitter.emit("vertexId=" + getVertexId() +
- " superstep=" + superstep + "\n");
- } else {
- voteToHalt();
- }
+ public void preApplication() {
+ Context context = getContext();
+ FileSystem fs;
+
+ try {
+ fs = FileSystem.get(context.getConfiguration());
+
+ String p = context.getConfiguration()
+ .get(SimpleVertexWithWorkerContext.OUTPUTDIR);
+ if (p == null) {
+ throw new IllegalArgumentException(
+ SimpleVertexWithWorkerContext.OUTPUTDIR +
+ " undefined!");
+ }
+
+ Path path = new Path(p);
+ if (!fs.exists(path)) {
+ throw new IllegalArgumentException(path +
+ " doesn't exist");
+ }
+
+ Path outF = new Path(path, FILENAME +
+ context.getTaskAttemptID());
+ if (fs.exists(outF)) {
+ throw new IllegalArgumentException(outF +
+ " aready exists");
+ }
+
+ out = fs.create(outF);
+ } catch (IOException e) {
+ throw new RuntimeException(
+ "can't initialize WorkerContext", e);
+ }
}
- @SuppressWarnings("rawtypes")
- public static class EmitterWorkerContext extends WorkerContext {
-
- private static final String FILENAME = "emitter_";
- private DataOutputStream out;
-
- @Override
- public void preApplication() {
- Context context = getContext();
- FileSystem fs;
-
- try {
- fs = FileSystem.get(context.getConfiguration());
-
- String p = context.getConfiguration()
- .get(SimpleVertexWithWorkerContext.OUTPUTDIR);
- if (p == null) {
- throw new IllegalArgumentException(
- SimpleVertexWithWorkerContext.OUTPUTDIR +
- " undefined!");
- }
-
- Path path = new Path(p);
- if (!fs.exists(path)) {
- throw new IllegalArgumentException(path +
- " doesn't exist");
- }
-
- Path outF = new Path(path, FILENAME +
- context.getTaskAttemptID());
- if (fs.exists(outF)) {
- throw new IllegalArgumentException(outF +
- " aready exists");
- }
-
- out = fs.create(outF);
- } catch (IOException e) {
- throw new RuntimeException(
- "can't initialize WorkerContext", e);
- }
- }
-
- @Override
- public void postApplication() {
- if (out != null) {
- try {
- out.flush();
- out.close();
- } catch (IOException e) {
- throw new RuntimeException(
- "can't finalize WorkerContext", e);
- }
- out = null;
- }
+ @Override
+ public void postApplication() {
+ if (out != null) {
+ try {
+ out.flush();
+ out.close();
+ } catch (IOException e) {
+ throw new RuntimeException(
+ "can't finalize WorkerContext", e);
}
+ out = null;
+ }
+ }
- @Override
- public void preSuperstep() { }
+ @Override
+ public void preSuperstep() { }
- @Override
- public void postSuperstep() { }
+ @Override
+ public void postSuperstep() { }
- public void emit(String s) {
- try {
- out.writeUTF(s);
- } catch (IOException e) {
- throw new RuntimeException("can't emit", e);
- }
- }
+ /**
+ * Write this string to the output stream.
+ *
+ * @param s String to dump.
+ */
+ public void emit(String s) {
+ try {
+ out.writeUTF(s);
+ } catch (IOException e) {
+ throw new RuntimeException("can't emit", e);
+ }
}
+ }
- @Override
- public int run(String[] args) throws Exception {
- if (args.length != 2) {
- throw new IllegalArgumentException(
- "run: Must have 2 arguments <output path> <# of workers>");
- }
- GiraphJob job = new GiraphJob(getConf(), getClass().getName());
- job.setVertexClass(getClass());
- job.setVertexInputFormatClass(
- SimpleSuperstepVertexInputFormat.class);
- job.setWorkerContextClass(EmitterWorkerContext.class);
- Configuration conf = job.getConfiguration();
- conf.set(SimpleVertexWithWorkerContext.OUTPUTDIR, args[0]);
- job.setWorkerConfiguration(Integer.parseInt(args[1]),
- Integer.parseInt(args[1]),
- 100.0f);
- if (job.run(true) == true) {
- return 0;
- } else {
- return -1;
- }
+ @Override
+ public int run(String[] args) throws Exception {
+ if (args.length != 2) {
+ throw new IllegalArgumentException(
+ "run: Must have 2 arguments <output path> <# of workers>");
}
-
- public static void main(String[] args) throws Exception {
- System.exit(ToolRunner.run(new SimpleVertexWithWorkerContext(), args));
+ GiraphJob job = new GiraphJob(getConf(), getClass().getName());
+ job.setVertexClass(getClass());
+ job.setVertexInputFormatClass(
+ SimpleSuperstepVertexInputFormat.class);
+ job.setWorkerContextClass(EmitterWorkerContext.class);
+ Configuration conf = job.getConfiguration();
+ conf.set(SimpleVertexWithWorkerContext.OUTPUTDIR, args[0]);
+ job.setWorkerConfiguration(Integer.parseInt(args[1]),
+ Integer.parseInt(args[1]),
+ 100.0f);
+ if (job.run(true)) {
+ return 0;
+ } else {
+ return -1;
}
-}
\ No newline at end of file
+ }
+
+ /**
+ * Executable from the command line.
+ *
+ * @param args Command line arguments.
+ * @throws Exception
+ */
+ public static void main(String[] args) throws Exception {
+ System.exit(ToolRunner.run(new SimpleVertexWithWorkerContext(), args));
+ }
+}
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SumAggregator.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SumAggregator.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SumAggregator.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SumAggregator.java Thu Feb 16 22:12:31 2012
@@ -24,30 +24,38 @@ import org.apache.giraph.graph.Aggregato
/**
* Aggregator for summing up values.
- *
*/
public class SumAggregator implements Aggregator<DoubleWritable> {
-
+ /** Aggregated sum */
private double sum = 0;
+ /**
+ * Aggregate a double.
+ *
+ * @param value Value to aggregate.
+ */
public void aggregate(double value) {
- sum += value;
+ sum += value;
}
+ @Override
public void aggregate(DoubleWritable value) {
- sum += value.get();
+ sum += value.get();
}
+ @Override
public void setAggregatedValue(DoubleWritable value) {
- sum = value.get();
+ sum = value.get();
}
+ @Override
public DoubleWritable getAggregatedValue() {
- return new DoubleWritable(sum);
+ return new DoubleWritable(sum);
}
+ @Override
public DoubleWritable createAggregatedValue() {
- return new DoubleWritable();
+ return new DoubleWritable();
}
}