You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ac...@apache.org on 2011/11/07 23:09:36 UTC
svn commit: r1198972 - in /incubator/giraph/trunk: ./
src/main/java/org/apache/giraph/benchmark/
src/main/java/org/apache/giraph/bsp/
src/main/java/org/apache/giraph/examples/
src/main/java/org/apache/giraph/graph/ src/test/java/org/apache/giraph/
Author: aching
Date: Mon Nov 7 22:09:35 2011
New Revision: 1198972
URL: http://svn.apache.org/viewvc?rev=1198972&view=rev
Log:
GIRAPH-47: Export Worker's Context/State to vertices through
pre/post/Application/Superstep. (cmartella via aching)
Added:
incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleVertexWithWorkerContext.java
incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/DefaultWorkerContext.java
incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/WorkerContext.java
Modified:
incubator/giraph/trunk/CHANGELOG
incubator/giraph/trunk/CODE_CONVENTIONS
incubator/giraph/trunk/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java
incubator/giraph/trunk/src/main/java/org/apache/giraph/benchmark/RandomMessageBenchmark.java
incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedService.java
incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java
incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleCheckpointVertex.java
incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleMutateGraphVertex.java
incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimplePageRankVertex.java
incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BasicVertex.java
incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspService.java
incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspUtils.java
incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java
incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java
incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/LongDoubleFloatDoubleVertex.java
incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/Vertex.java
incubator/giraph/trunk/src/test/java/org/apache/giraph/TestAutoCheckpoint.java
incubator/giraph/trunk/src/test/java/org/apache/giraph/TestBspBasic.java
incubator/giraph/trunk/src/test/java/org/apache/giraph/TestManualCheckpoint.java
incubator/giraph/trunk/src/test/java/org/apache/giraph/TestMutateGraphVertex.java
incubator/giraph/trunk/src/test/java/org/apache/giraph/TestVertexRangeBalancer.java
Modified: incubator/giraph/trunk/CHANGELOG
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/CHANGELOG?rev=1198972&r1=1198971&r2=1198972&view=diff
==============================================================================
--- incubator/giraph/trunk/CHANGELOG (original)
+++ incubator/giraph/trunk/CHANGELOG Mon Nov 7 22:09:35 2011
@@ -2,6 +2,9 @@ Giraph Change Log
Release 0.70.0 - unreleased
+ GIRAPH-47: Export Worker's Context/State to vertices through
+ pre/post/Application/Superstep. (cmartella via aching)
+
GIRAPH-71: SequenceFileVertexInputFormat missing license header;
rat fails. (jghoman)
Modified: incubator/giraph/trunk/CODE_CONVENTIONS
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/CODE_CONVENTIONS?rev=1198972&r1=1198971&r2=1198972&view=diff
==============================================================================
--- incubator/giraph/trunk/CODE_CONVENTIONS (original)
+++ incubator/giraph/trunk/CODE_CONVENTIONS Mon Nov 7 22:09:35 2011
@@ -49,7 +49,9 @@ if (LOG.isInfoEnabled()) {
}
- All classes, members, and member methods should have Javadoc in the following
- style. C-style comments for javadoc and // comments for non-javadoc.
+ style. C-style comments for javadoc and // comments for non-javadoc. Also, the comment
+ block should have a line break that separates the comment section and the @ section.
+ See below.
/**
* This is an example class
@@ -76,6 +78,11 @@ public class Giraffe {
}
}
-- Class members should not begin with 'm_' or '_'.
+- Class members should not begin with 'm_' or '_'
- No warnings allowed, but be as specific as possible with warning suppression
-- Prefer to avoid abbreviations when reasonable (i.e. 'msg' vs 'message')
\ No newline at end of file
+- Prefer to avoid abbreviations when reasonable (i.e. 'msg' vs 'message')
+- Static variable names should be entirely capitalized and separated by '_'
+ (e.g. private static int FOO_BAR_BAR = 2)
+- Non-static variable and method names should not begin capitalized and should only use
+ alphanumeric characters (e.g. int fooBarBar)
+- All class names begin capitalized then use lower casing (e.g. class FooBarBar)
\ No newline at end of file
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java?rev=1198972&r1=1198971&r2=1198972&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java Mon Nov 7 22:09:35 2011
@@ -46,19 +46,6 @@ public class PageRankBenchmark extends
public static String SUPERSTEP_COUNT = "PageRankBenchmark.superstepCount";
@Override
- public void preApplication()
- throws InstantiationException, IllegalAccessException {
- }
-
- @Override
- public void postApplication() {
- }
-
- @Override
- public void preSuperstep() {
- }
-
- @Override
public void compute(Iterator<DoubleWritable> msgIterator) {
if (getSuperstep() >= 1) {
double sum = 0;
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/benchmark/RandomMessageBenchmark.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/benchmark/RandomMessageBenchmark.java?rev=1198972&r1=1198971&r2=1198972&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/benchmark/RandomMessageBenchmark.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/benchmark/RandomMessageBenchmark.java Mon Nov 7 22:09:35 2011
@@ -45,38 +45,28 @@ public class RandomMessageBenchmark exte
private Configuration conf;
/** How many supersteps to run */
- public static String SUPERSTEP_COUNT = "RandomMessageBenchmark.superstepCount";
-
+ public static String SUPERSTEP_COUNT =
+ "RandomMessageBenchmark.superstepCount";
+
/** How many bytes per message */
- public static String NUM_BYTES_PER_MESSAGE = "RandomMessageBenchmark.numBytesPerMessage";
+ public static String NUM_BYTES_PER_MESSAGE =
+ "RandomMessageBenchmark.numBytesPerMessage";
/** How many bytes per message */
- public static String NUM_MESSAGES_PER_VERTEX = "RandomMessageBenchmark.numMessagesPerVertex";
-
+ public static String NUM_MESSAGES_PER_VERTEX =
+ "RandomMessageBenchmark.numMessagesPerVertex";
+
/** Random generator for random bytes message */
private Random rnd = new Random(System.currentTimeMillis());
@Override
- public void preApplication()
- throws InstantiationException, IllegalAccessException {
- }
-
- @Override
- public void postApplication() {
- }
-
- @Override
- public void preSuperstep() {
- }
-
- @Override
public void compute(Iterator<BytesWritable> msgIterator) {
byte [] message = new byte[getConf().getInt(NUM_BYTES_PER_MESSAGE, 16)];
int numMessage = getConf().getInt(NUM_MESSAGES_PER_VERTEX, 1);
if (getSuperstep() < getConf().getInt(SUPERSTEP_COUNT, -1)) {
for (int i=0; i < numMessage; i++) {
- rnd.nextBytes(message);
+ rnd.nextBytes(message);
sendMsgToAllEdges(new BytesWritable(message));
- }
+ }
} else {
voteToHalt();
}
@@ -125,7 +115,7 @@ public class RandomMessageBenchmark exte
"flusher",
true,
"Number of flush threads");
-
+
HelpFormatter formatter = new HelpFormatter();
if (args.length == 0) {
formatter.printHelp(getClass().getName(), options, true);
@@ -178,11 +168,11 @@ public class RandomMessageBenchmark exte
SUPERSTEP_COUNT,
Integer.parseInt(cmd.getOptionValue('s')));
job.getConfiguration().setInt(
- RandomMessageBenchmark.NUM_BYTES_PER_MESSAGE,
- Integer.parseInt(cmd.getOptionValue('b')));
+ RandomMessageBenchmark.NUM_BYTES_PER_MESSAGE,
+ Integer.parseInt(cmd.getOptionValue('b')));
job.getConfiguration().setInt(
- RandomMessageBenchmark.NUM_MESSAGES_PER_VERTEX,
- Integer.parseInt(cmd.getOptionValue('n')));
+ RandomMessageBenchmark.NUM_MESSAGES_PER_VERTEX,
+ Integer.parseInt(cmd.getOptionValue('n')));
boolean isVerbose = false;
if (cmd.hasOption('v')) {
@@ -193,8 +183,8 @@ public class RandomMessageBenchmark exte
Integer.parseInt(cmd.getOptionValue('s')));
}
if (cmd.hasOption('f')) {
- job.getConfiguration().setInt(GiraphJob.MSG_NUM_FLUSH_THREADS,
- Integer.parseInt(cmd.getOptionValue('f')));
+ job.getConfiguration().setInt(GiraphJob.MSG_NUM_FLUSH_THREADS,
+ Integer.parseInt(cmd.getOptionValue('f')));
}
if (job.run(isVerbose) == true) {
return 0;
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedService.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedService.java?rev=1198972&r1=1198971&r2=1198972&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedService.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedService.java Mon Nov 7 22:09:35 2011
@@ -18,7 +18,6 @@
package org.apache.giraph.bsp;
-import org.apache.giraph.graph.BasicVertex;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
@@ -39,14 +38,6 @@ public interface CentralizedService<I ex
void setup();
/**
- * Get the representative Vertex for this worker. It can used to
- * call pre/post application/superstep methods defined by the user.
- *
- * @return representation vertex
- */
- BasicVertex<I, V, E, M> getRepresentativeVertex();
-
- /**
* Get the current global superstep of the application to work on.
*
* @return global superstep (begins at INPUT_SUPERSTEP)
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java?rev=1198972&r1=1198971&r2=1198972&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java Mon Nov 7 22:09:35 2011
@@ -27,6 +27,7 @@ import org.apache.giraph.graph.Aggregato
import org.apache.giraph.graph.GraphMapper;
import org.apache.giraph.graph.VertexRange;
import org.apache.giraph.graph.BasicVertexRangeBalancer;
+import org.apache.giraph.graph.WorkerContext;
/**
* All workers should have access to this centralized service to
@@ -53,6 +54,12 @@ public interface CentralizedServiceWorke
*/
int getPort();
+ /**
+ *
+ * @return worker's WorkerContext
+ */
+ WorkerContext getWorkerContext();
+
/**
* Get a synchronized map to the partitions and their sorted vertex lists.
* This could be used to run compute for the vertices or checkpointing.
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleCheckpointVertex.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleCheckpointVertex.java?rev=1198972&r1=1198971&r2=1198972&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleCheckpointVertex.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleCheckpointVertex.java Mon Nov 7 22:09:35 2011
@@ -28,6 +28,7 @@ import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
+import org.apache.log4j.Logger;
import java.util.Iterator;
@@ -39,21 +40,14 @@ import java.util.Iterator;
public class SimpleCheckpointVertex extends
Vertex<LongWritable, IntWritable, FloatWritable, FloatWritable>
implements Tool {
+ private static Logger LOG =
+ Logger.getLogger(SimpleCheckpointVertex.class);
/** Configuration */
private Configuration conf;
- /** User can access this after the application finishes if local */
- public static long finalSum;
- /** Number of supersteps to run (6 by default) */
- private static int supersteps = 6;
- /** Filename to indicate whether a fault was found */
- public final String faultFile = "/tmp/faultFile";
/** Which superstep to cause the worker to fail */
public final int faultingSuperstep = 4;
/** Vertex id to fault on */
public final long faultingVertexId = 1;
- /** Enable the fault at the particular vertex id and superstep? */
- private static boolean enableFault = false;
-
/** Dynamically set number of supersteps */
public static final String SUPERSTEP_COUNT =
"simpleCheckpointVertex.superstepCount";
@@ -62,32 +56,16 @@ public class SimpleCheckpointVertex exte
"simpleCheckpointVertex.enableFault";
@Override
- public void preApplication() throws InstantiationException, IllegalAccessException {
- registerAggregator(LongSumAggregator.class.getName(),
- LongSumAggregator.class);
- LongSumAggregator sumAggregator = (LongSumAggregator)
- getAggregator(LongSumAggregator.class.getName());
- sumAggregator.setAggregatedValue(new LongWritable(0));
- supersteps = getConf().getInt(SUPERSTEP_COUNT, supersteps);
- enableFault = getConf().getBoolean(ENABLE_FAULT, false);
- }
-
- @Override
- public void postApplication() {
- LongSumAggregator sumAggregator = (LongSumAggregator)
- getAggregator(LongSumAggregator.class.getName());
- finalSum = sumAggregator.getAggregatedValue().get();
- }
-
- @Override
- public void preSuperstep() {
- useAggregator(LongSumAggregator.class.getName());
- }
-
- @Override
public void compute(Iterator<FloatWritable> msgIterator) {
+ SimpleCheckpointVertexWorkerContext workerContext =
+ (SimpleCheckpointVertexWorkerContext) getWorkerContext();
+
LongSumAggregator sumAggregator = (LongSumAggregator)
getAggregator(LongSumAggregator.class.getName());
+
+ boolean enableFault = workerContext.getEnableFault();
+ int supersteps = workerContext.getSupersteps();
+
if (enableFault && (getSuperstep() == faultingSuperstep) &&
(getContext().getTaskAttemptID().getId() == 0) &&
(getVertexId().get() == faultingVertexId)) {
@@ -133,6 +111,56 @@ public class SimpleCheckpointVertex exte
sendMsg(targetVertexId, new FloatWritable(edgeValue.get()));
}
}
+
+ public static class SimpleCheckpointVertexWorkerContext
+ extends WorkerContext {
+ /** User can access this after the application finishes if local */
+ public static long finalSum;
+ /** Number of supersteps to run (6 by default) */
+ private int supersteps = 6;
+ /** Filename to indicate whether a fault was found */
+ public final String faultFile = "/tmp/faultFile";
+ /** Enable the fault at the particular vertex id and superstep? */
+ private boolean enableFault = false;
+
+ @Override
+ public void preApplication()
+ throws InstantiationException, IllegalAccessException {
+ registerAggregator(LongSumAggregator.class.getName(),
+ LongSumAggregator.class);
+ LongSumAggregator sumAggregator = (LongSumAggregator)
+ getAggregator(LongSumAggregator.class.getName());
+ sumAggregator.setAggregatedValue(new LongWritable(0));
+ supersteps = getContext().getConfiguration()
+ .getInt(SUPERSTEP_COUNT, supersteps);
+ enableFault = getContext().getConfiguration()
+ .getBoolean(ENABLE_FAULT, false);
+ }
+
+ @Override
+ public void postApplication() {
+ LongSumAggregator sumAggregator = (LongSumAggregator)
+ getAggregator(LongSumAggregator.class.getName());
+ finalSum = sumAggregator.getAggregatedValue().get();
+ LOG.info("finalSum="+ finalSum);
+ }
+
+ @Override
+ public void preSuperstep() {
+ useAggregator(LongSumAggregator.class.getName());
+ }
+
+ @Override
+ public void postSuperstep() { }
+
+ public int getSupersteps() {
+ return this.supersteps;
+ }
+
+ public boolean getEnableFault() {
+ return this.enableFault;
+ }
+ }
@Override
public int run(String[] args) throws Exception {
@@ -175,18 +203,15 @@ public class SimpleCheckpointVertex exte
return -1;
}
- getConf().setClass(GiraphJob.VERTEX_CLASS, getClass(), Vertex.class);
- getConf().setClass(GiraphJob.VERTEX_INPUT_FORMAT_CLASS,
- GeneratedVertexInputFormat.class,
- VertexInputFormat.class);
- getConf().setClass(GiraphJob.VERTEX_OUTPUT_FORMAT_CLASS,
- SimpleTextVertexOutputFormat.class,
- VertexOutputFormat.class);
- getConf().setInt(GiraphJob.MIN_WORKERS,
- Integer.parseInt(cmd.getOptionValue('w')));
- getConf().setInt(GiraphJob.MAX_WORKERS,
- Integer.parseInt(cmd.getOptionValue('w')));
GiraphJob bspJob = new GiraphJob(getConf(), getClass().getName());
+ bspJob.setVertexClass(getClass());
+ bspJob.setVertexInputFormatClass(GeneratedVertexInputFormat.class);
+ bspJob.setVertexOutputFormatClass(SimpleTextVertexOutputFormat.class);
+ bspJob.setWorkerContextClass(SimpleCheckpointVertexWorkerContext.class);
+ int minWorkers = Integer.parseInt(cmd.getOptionValue('w'));
+ int maxWorkers = Integer.parseInt(cmd.getOptionValue('w'));
+ bspJob.setWorkerConfiguration(minWorkers, maxWorkers, 100.0f);
+
FileOutputFormat.setOutputPath(bspJob,
new Path(cmd.getOptionValue('o')));
boolean verbose = false;
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleMutateGraphVertex.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleMutateGraphVertex.java?rev=1198972&r1=1198971&r2=1198972&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleMutateGraphVertex.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleMutateGraphVertex.java Mon Nov 7 22:09:35 2011
@@ -29,6 +29,7 @@ import org.apache.log4j.Logger;
import org.apache.giraph.graph.Edge;
import org.apache.giraph.graph.Vertex;
import org.apache.giraph.graph.MutableVertex;
+import org.apache.giraph.graph.WorkerContext;
/**
* Vertex to allow unit testing of graph mutations.
@@ -36,19 +37,11 @@ import org.apache.giraph.graph.MutableVe
public class SimpleMutateGraphVertex extends
Vertex<LongWritable, DoubleWritable,
FloatWritable, DoubleWritable> {
- private static int edgesRemoved = 0;
-
+ /** Maximum number of ranges for vertex ids */
+ private long maxRanges = 100;
/** Class logger */
private static Logger LOG =
Logger.getLogger(SimpleMutateGraphVertex.class);
- /** Cached vertex count */
- private static long VERTEX_COUNT;
- /** Cached edge count */
- private static long EDGE_COUNT;
- /** Original cached edge count */
- private static long ORIG_EDGE_COUNT;
- /** Maximum number of ranges for vertex ids */
- private static long MAX_RANGES = 100;
/**
* Unless we create a ridiculous number of vertices , we should not
@@ -57,12 +50,16 @@ public class SimpleMutateGraphVertex ext
* @return Starting vertex id of the range
*/
private long rangeVertexIdStart(int range) {
- return (Long.MAX_VALUE / MAX_RANGES) * range;
+ return (Long.MAX_VALUE / maxRanges) * range;
}
@Override
public void compute(Iterator<DoubleWritable> msgIterator) throws IOException {
- if (getSuperstep() == 1) {
+
+ SimpleMutateGraphVertexWorkerContext workerContext =
+ (SimpleMutateGraphVertexWorkerContext) getWorkerContext();
+
+ if (getSuperstep() == 1) {
// Send messages to vertices that are sure not to exist
// (creating them)
LongWritable destVertexId =
@@ -70,16 +67,18 @@ public class SimpleMutateGraphVertex ext
sendMsg(destVertexId, new DoubleWritable(0.0));
} else if (getSuperstep() == 2) {
} else if (getSuperstep() == 3) {
- if (VERTEX_COUNT * 2 != getNumVertices()) {
+ long vertex_count = workerContext.getVertexCount();
+ if (vertex_count * 2 != getNumVertices()) {
throw new IllegalStateException(
"Impossible to have " + getNumVertices() +
- " vertices when should have " + VERTEX_COUNT * 2 +
+ " vertices when should have " + vertex_count * 2 +
" on superstep " + getSuperstep());
}
- if (EDGE_COUNT != getNumEdges()) {
+ long edge_count = workerContext.getEdgeCount();
+ if (edge_count != getNumEdges()) {
throw new IllegalStateException(
"Impossible to have " + getNumEdges() +
- " edges when should have " + EDGE_COUNT +
+ " edges when should have " + edge_count +
" on superstep " + getSuperstep());
}
// Create vertices that are sure not to exist (doubling vertices)
@@ -95,22 +94,24 @@ public class SimpleMutateGraphVertex ext
getVertexId(), new FloatWritable(0.0f)));
} else if (getSuperstep() == 4) {
} else if (getSuperstep() == 5) {
- if (VERTEX_COUNT * 2 != getNumVertices()) {
+ long vertex_count = workerContext.getVertexCount();
+ if (vertex_count * 2 != getNumVertices()) {
throw new IllegalStateException(
"Impossible to have " + getNumVertices() +
- " when should have " + VERTEX_COUNT * 2 +
+ " when should have " + vertex_count * 2 +
" on superstep " + getSuperstep());
}
- if (EDGE_COUNT + VERTEX_COUNT != getNumEdges()) {
+ long edge_count = workerContext.getEdgeCount();
+ if (edge_count + vertex_count != getNumEdges()) {
throw new IllegalStateException(
"Impossible to have " + getNumEdges() +
- " edges when should have " + EDGE_COUNT + VERTEX_COUNT +
+ " edges when should have " + edge_count + vertex_count +
" on superstep " + getSuperstep());
}
// Remove the edges created in superstep 3
LongWritable vertexIndex =
new LongWritable(rangeVertexIdStart(3) + getVertexId().get());
- ++edgesRemoved;
+ workerContext.increaseEdgesRemoved();
removeEdgeRequest(vertexIndex, getVertexId());
} else if (getSuperstep() == 6) {
// Remove all the vertices created in superstep 3
@@ -119,17 +120,19 @@ public class SimpleMutateGraphVertex ext
removeVertexRequest(getVertexId());
}
} else if (getSuperstep() == 7) {
- if (ORIG_EDGE_COUNT != getNumEdges()) {
+ long orig_edge_count = workerContext.getOrigEdgeCount();
+ if (orig_edge_count != getNumEdges()) {
throw new IllegalStateException(
"Impossible to have " + getNumEdges() +
- " edges when should have " + ORIG_EDGE_COUNT +
+ " edges when should have " + orig_edge_count +
" on superstep " + getSuperstep());
}
} else if (getSuperstep() == 8) {
- if (VERTEX_COUNT / 2 != getNumVertices()) {
+ long vertex_count = workerContext.getVertexCount();
+ if (vertex_count / 2 != getNumVertices()) {
throw new IllegalStateException(
"Impossible to have " + getNumVertices() +
- " vertices when should have " + VERTEX_COUNT / 2 +
+ " vertices when should have " + vertex_count / 2 +
" on superstep " + getSuperstep());
}
}
@@ -138,17 +141,54 @@ public class SimpleMutateGraphVertex ext
}
}
- @Override
- public void postSuperstep() {
- VERTEX_COUNT = getNumVertices();
- EDGE_COUNT = getNumEdges();
- if (getSuperstep() == 1) {
- ORIG_EDGE_COUNT = EDGE_COUNT;
- }
- LOG.info("Got " + VERTEX_COUNT + " vertices, " +
- EDGE_COUNT + " edges on superstep " +
- getSuperstep());
- LOG.info("Removed " + edgesRemoved);
- edgesRemoved = 0;
+ public static class SimpleMutateGraphVertexWorkerContext extends WorkerContext {
+ /** Cached vertex count */
+ private long vertexCount;
+ /** Cached edge count */
+ private long edgeCount;
+ /** Original number of edges */
+ private long origEdgeCount;
+ /** Number of edges removed during superstep */
+ private int edgesRemoved = 0;
+
+ @Override
+ public void preApplication()
+ throws InstantiationException, IllegalAccessException { }
+
+ @Override
+ public void postApplication() { }
+
+ @Override
+ public void preSuperstep() {
+ vertexCount = getNumVertices();
+ edgeCount = getNumEdges();
+ if (getSuperstep() == 1) {
+ origEdgeCount = edgeCount;
+ }
+ LOG.info("Got " + vertexCount + " vertices, " +
+ edgeCount + " edges on superstep " +
+ getSuperstep());
+ LOG.info("Removed " + edgesRemoved);
+ edgesRemoved = 0;
+ }
+
+ @Override
+ public void postSuperstep() { }
+
+ public long getVertexCount() {
+ return vertexCount;
+ }
+
+ public long getEdgeCount() {
+ return edgeCount;
+ }
+
+ public long getOrigEdgeCount() {
+ return origEdgeCount;
+ }
+
+ public void increaseEdgesRemoved() {
+ this.edgesRemoved++;
+ }
}
}
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimplePageRankVertex.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimplePageRankVertex.java?rev=1198972&r1=1198971&r2=1198972&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimplePageRankVertex.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimplePageRankVertex.java Mon Nov 7 22:09:35 2011
@@ -19,11 +19,13 @@
package org.apache.giraph.examples;
import com.google.common.collect.Maps;
+
import org.apache.giraph.graph.BasicVertex;
import org.apache.giraph.graph.BspUtils;
import org.apache.giraph.graph.LongDoubleFloatDoubleVertex;
import org.apache.giraph.graph.VertexReader;
import org.apache.giraph.graph.VertexWriter;
+import org.apache.giraph.graph.WorkerContext;
import org.apache.giraph.lib.TextVertexOutputFormat;
import org.apache.giraph.lib.TextVertexOutputFormat.TextVertexWriter;
import org.apache.hadoop.io.DoubleWritable;
@@ -43,63 +45,11 @@ import java.util.Map;
* Demonstrates the basic Pregel PageRank implementation.
*/
public class SimplePageRankVertex extends LongDoubleFloatDoubleVertex {
- /** User can access this sum after the application finishes if local */
- public static long finalSum;
- /** User can access this min after the application finishes if local */
- public static double finalMin;
- /** User can access this max after the application finishes if local */
- public static double finalMax;
- /** Logger */
+ /** Logger */
private static final Logger LOG =
Logger.getLogger(SimplePageRankVertex.class);
@Override
- public void preApplication()
- throws InstantiationException, IllegalAccessException {
- registerAggregator("sum", LongSumAggregator.class);
- registerAggregator("min", MinAggregator.class);
- registerAggregator("max", MaxAggregator.class);
- }
-
- @Override
- public void postApplication() {
- LongSumAggregator sumAggreg = (LongSumAggregator) getAggregator("sum");
- MinAggregator minAggreg = (MinAggregator) getAggregator("min");
- MaxAggregator maxAggreg = (MaxAggregator) getAggregator("max");
- finalSum = sumAggreg.getAggregatedValue().get();
- finalMin = minAggreg.getAggregatedValue().get();
- finalMax = maxAggreg.getAggregatedValue().get();
-
- }
-
- @Override
- public void preSuperstep() {
- LongSumAggregator sumAggreg = (LongSumAggregator) getAggregator("sum");
- MinAggregator minAggreg = (MinAggregator) getAggregator("min");
- MaxAggregator maxAggreg = (MaxAggregator) getAggregator("max");
- if (getSuperstep() >= 3) {
- LOG.info("aggregatedNumVertices=" +
- sumAggreg.getAggregatedValue() +
- " NumVertices=" + getNumVertices());
- if (sumAggreg.getAggregatedValue().get() != getNumVertices()) {
- throw new RuntimeException("wrong value of SumAggreg: " +
- sumAggreg.getAggregatedValue() + ", should be: " +
- getNumVertices());
- }
- DoubleWritable maxPagerank =
- (DoubleWritable)maxAggreg.getAggregatedValue();
- LOG.info("aggregatedMaxPageRank=" + maxPagerank.get());
- DoubleWritable minPagerank =
- (DoubleWritable)minAggreg.getAggregatedValue();
- LOG.info("aggregatedMinPageRank=" + minPagerank.get());
- }
- useAggregator("sum");
- useAggregator("min");
- useAggregator("max");
- sumAggreg.setAggregatedValue(new LongWritable(0L));
- }
-
- @Override
public void compute(Iterator<DoubleWritable> msgIterator) {
LongSumAggregator sumAggreg = (LongSumAggregator) getAggregator("sum");
MinAggregator minAggreg = (MinAggregator) getAggregator("min");
@@ -129,6 +79,76 @@ public class SimplePageRankVertex extend
}
}
+ public static class SimplePageRankVertexWorkerContext extends
+ WorkerContext {
+
+ public static double finalMax, finalMin;
+ public static long finalSum;
+
+ @Override
+ public void preApplication()
+ throws InstantiationException, IllegalAccessException {
+
+ registerAggregator("sum", LongSumAggregator.class);
+ registerAggregator("min", MinAggregator.class);
+ registerAggregator("max", MaxAggregator.class);
+ }
+
+ @Override
+ public void postApplication() {
+
+ LongSumAggregator sumAggreg =
+ (LongSumAggregator) getAggregator("sum");
+ MinAggregator minAggreg =
+ (MinAggregator) getAggregator("min");
+ MaxAggregator maxAggreg =
+ (MaxAggregator) getAggregator("max");
+
+ finalSum = sumAggreg.getAggregatedValue().get();
+ finalMax = maxAggreg.getAggregatedValue().get();
+ finalMin = minAggreg.getAggregatedValue().get();
+
+ LOG.info("aggregatedNumVertices=" + finalSum);
+ LOG.info("aggregatedMaxPageRank=" + finalMax);
+ LOG.info("aggregatedMinPageRank=" + finalMin);
+ }
+
+ @Override
+ public void preSuperstep() {
+
+ LongSumAggregator sumAggreg =
+ (LongSumAggregator) getAggregator("sum");
+ MinAggregator minAggreg =
+ (MinAggregator) getAggregator("min");
+ MaxAggregator maxAggreg =
+ (MaxAggregator) getAggregator("max");
+
+ if (getSuperstep() >= 3) {
+ LOG.info("aggregatedNumVertices=" +
+ sumAggreg.getAggregatedValue() +
+ " NumVertices=" + getNumVertices());
+ if (sumAggreg.getAggregatedValue().get() != getNumVertices()) {
+ throw new RuntimeException("wrong value of SumAggreg: " +
+ sumAggreg.getAggregatedValue() + ", should be: " +
+ getNumVertices());
+ }
+ DoubleWritable maxPagerank =
+ (DoubleWritable) maxAggreg.getAggregatedValue();
+ LOG.info("aggregatedMaxPageRank=" + maxPagerank.get());
+ DoubleWritable minPagerank =
+ (DoubleWritable) minAggreg.getAggregatedValue();
+ LOG.info("aggregatedMinPageRank=" + minPagerank.get());
+ }
+ useAggregator("sum");
+ useAggregator("min");
+ useAggregator("max");
+ sumAggreg.setAggregatedValue(new LongWritable(0L));
+ }
+
+ @Override
+ public void postSuperstep() { }
+ }
+
/**
* Simple VertexReader that supports {@link SimplePageRankVertex}
*/
Added: incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleVertexWithWorkerContext.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleVertexWithWorkerContext.java?rev=1198972&view=auto
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleVertexWithWorkerContext.java (added)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleVertexWithWorkerContext.java Mon Nov 7 22:09:35 2011
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.examples;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.giraph.examples.SimpleSuperstepVertex.SimpleSuperstepVertexInputFormat;
+import org.apache.giraph.graph.GiraphJob;
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.graph.WorkerContext;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.Mapper.Context;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+/**
+ * Fully runnable example of how to
+ * emit worker data to HDFS during a graph
+ * computation. Every vertex writes one record per superstep through the
+ * worker-wide {@link EmitterWorkerContext}.
+ */
+public class SimpleVertexWithWorkerContext extends
+        Vertex<LongWritable, IntWritable, FloatWritable, DoubleWritable>
+        implements Tool {
+
+    /** Configuration key naming the directory the emitter files go to */
+    public static final String OUTPUTDIR = "svwwc.outputdir";
+    /** First superstep at which vertices stop emitting and vote to halt */
+    private static final int TESTLENGTH = 30;
+
+    /**
+     * Emit one record through the worker context for every superstep
+     * below {@link #TESTLENGTH}; vote to halt afterwards.
+     *
+     * @param msgIterator Iterator over the incoming messages (not used)
+     * @throws IOException declared by the overridden method
+     */
+    @Override
+    public void compute(Iterator<DoubleWritable> msgIterator)
+            throws IOException {
+
+        long superstep = getSuperstep();
+
+        if (superstep < TESTLENGTH) {
+            EmitterWorkerContext emitter =
+                (EmitterWorkerContext) getWorkerContext();
+            emitter.emit("vertexId=" + getVertexId() +
+                         " superstep=" + superstep + "\n");
+        } else {
+            voteToHalt();
+        }
+    }
+
+    /**
+     * Worker context that creates one output file per task attempt inside
+     * the directory configured under
+     * {@link SimpleVertexWithWorkerContext#OUTPUTDIR} and lets vertices
+     * append strings to it.
+     */
+    @SuppressWarnings("rawtypes")
+    public static class EmitterWorkerContext extends WorkerContext {
+
+        /** Prefix of the per-task-attempt output file name */
+        private static final String FILENAME = "emitter_";
+        /** Open output stream; null until preApplication has created it */
+        private DataOutputStream out;
+
+        /**
+         * Create this worker's output file. The directory must already
+         * exist and the file (prefix plus task attempt id) must not.
+         *
+         * @throws IllegalArgumentException if the output directory is not
+         *         configured or does not exist, or if the output file
+         *         already exists
+         * @throws RuntimeException if accessing the file system fails
+         */
+        @Override
+        public void preApplication() {
+            Context context = getContext();
+            FileSystem fs;
+
+            try {
+                fs = FileSystem.get(context.getConfiguration());
+
+                String p = context.getConfiguration()
+                    .get(SimpleVertexWithWorkerContext.OUTPUTDIR);
+                if (p == null) {
+                    throw new IllegalArgumentException(
+                        SimpleVertexWithWorkerContext.OUTPUTDIR +
+                        " undefined!");
+                }
+
+                Path path = new Path(p);
+                if (!fs.exists(path)) {
+                    throw new IllegalArgumentException(path +
+                        " doesn't exist");
+                }
+
+                Path outF = new Path(path, FILENAME +
+                    context.getTaskAttemptID());
+                if (fs.exists(outF)) {
+                    throw new IllegalArgumentException(outF +
+                        " already exists");
+                }
+
+                out = fs.create(outF);
+            } catch (IOException e) {
+                throw new RuntimeException(
+                    "can't initialize WorkerContext", e);
+            }
+        }
+
+        /**
+         * Flush and close the output file if it was opened. The stream is
+         * closed even when the flush fails, and the reference is always
+         * cleared afterwards.
+         *
+         * @throws RuntimeException if flushing or closing fails
+         */
+        @Override
+        public void postApplication() {
+            if (out == null) {
+                return;
+            }
+            try {
+                try {
+                    out.flush();
+                } finally {
+                    // Close even if flush() threw, so the stream
+                    // is not leaked.
+                    out.close();
+                }
+            } catch (IOException e) {
+                throw new RuntimeException(
+                    "can't finalize WorkerContext", e);
+            } finally {
+                // Never keep a reference to a closed or broken stream.
+                out = null;
+            }
+        }
+
+        /** No-op: nothing to do before a superstep. */
+        @Override
+        public void preSuperstep() { }
+
+        /** No-op: nothing to do after a superstep. */
+        @Override
+        public void postSuperstep() { }
+
+        /**
+         * Append a string to this worker's output file using
+         * {@link DataOutputStream#writeUTF(String)}.
+         *
+         * @param s String to write
+         * @throws IllegalStateException if the output file is not open
+         * @throws RuntimeException if the write fails
+         */
+        public void emit(String s) {
+            if (out == null) {
+                throw new IllegalStateException(
+                    "emit: output stream is not open");
+            }
+            try {
+                out.writeUTF(s);
+            } catch (IOException e) {
+                throw new RuntimeException("can't emit", e);
+            }
+        }
+    }
+
+    /**
+     * Configure and run the example job.
+     *
+     * @param args Exactly two arguments: output path, number of workers
+     * @return 0 if the job succeeded, -1 otherwise
+     * @throws IllegalArgumentException if the argument count is not 2
+     * @throws Exception if setting up or running the job fails
+     */
+    @Override
+    public int run(String[] args) throws Exception {
+        if (args.length != 2) {
+            throw new IllegalArgumentException(
+                "run: Must have 2 arguments <output path> <# of workers>");
+        }
+        GiraphJob job = new GiraphJob(getConf(), getClass().getName());
+        job.setVertexClass(getClass());
+        job.setVertexInputFormatClass(
+            SimpleSuperstepVertexInputFormat.class);
+        job.setWorkerContextClass(EmitterWorkerContext.class);
+        Configuration conf = job.getConfiguration();
+        conf.set(SimpleVertexWithWorkerContext.OUTPUTDIR, args[0]);
+        // The same worker count is passed for both count arguments.
+        int workers = Integer.parseInt(args[1]);
+        job.setWorkerConfiguration(workers, workers, 100.0f);
+        return job.run(true) ? 0 : -1;
+    }
+
+    /**
+     * Command line entry point; exits with the status returned by
+     * {@link #run(String[])}.
+     *
+     * @param args Command line arguments, see {@link #run(String[])}
+     * @throws Exception if the tool fails
+     */
+    public static void main(String[] args) throws Exception {
+        System.exit(ToolRunner.run(new SimpleVertexWithWorkerContext(), args));
+    }
+}
\ No newline at end of file
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BasicVertex.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BasicVertex.java?rev=1198972&r1=1198971&r2=1198972&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BasicVertex.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BasicVertex.java Mon Nov 7 22:09:35 2011
@@ -49,34 +49,6 @@ public abstract class BasicVertex<I exte
public abstract void initialize(I vertexId, V vertexValue, Map<I, E> edges, List<M> messages);
/**
- * Optionally defined by the user to be executed once on all workers
- * before application has started.
- *
- * @throws IllegalAccessException
- * @throws InstantiationException
- */
- public abstract void preApplication()
- throws InstantiationException, IllegalAccessException;
-
- /**
- * Optionally defined by the user to be executed once on all workers
- * after the application has completed.
- */
- public abstract void postApplication();
-
- /**
- * Optionally defined by the user to be executed once prior to vertex
- * processing on a worker for the current superstep.
- */
- public abstract void preSuperstep();
-
- /**
- * Optionally defined by the user to be executed once after all vertex
- * processing on a worker for the current superstep.
- */
- public abstract void postSuperstep();
-
- /**
* Must be defined by user to do computation on a single Vertex.
*
* @param msgIterator Iterator to the messages that were sent to this
@@ -233,6 +205,15 @@ public abstract class BasicVertex<I exte
public Mapper.Context getContext() {
return getGraphState().getContext();
}
+
+ /**
+ * Get the worker context.
+ *
+ * @return The WorkerContext returned by the GraphMapper of this
+ * vertex's graph state
+ */
+ public WorkerContext getWorkerContext() {
+ return getGraphState().getGraphMapper().getWorkerContext();
+ }
@Override
public final <A extends Writable> Aggregator<A> registerAggregator(
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspService.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspService.java?rev=1198972&r1=1198971&r2=1198972&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspService.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspService.java Mon Nov 7 22:09:35 2011
@@ -123,8 +123,6 @@ public abstract class BspService <
private static final Logger LOG = Logger.getLogger(BspService.class);
/** File system */
private final FileSystem fs;
- /** Used to call pre/post application/superstep methods */
- private final BasicVertex<I, V, E, M> representativeVertex;
/** Checkpoint frequency */
private int checkpointFrequency = -1;
/** Vertex range map based on the superstep below */
@@ -627,11 +625,6 @@ public abstract class BspService <
throw new RuntimeException(e);
}
this.hostnamePartitionId = hostname + "_" + getTaskPartition();
-
- this.representativeVertex =
- BspUtils.<I, V, E, M>createVertex(
- getConfiguration());
- this.representativeVertex.setGraphState(getGraphMapper().getGraphState());
this.checkpointFrequency =
conf.getInt(GiraphJob.CHECKPOINT_FREQUENCY,
GiraphJob.CHECKPOINT_FREQUENCY_DEFAULT);
@@ -670,15 +663,6 @@ public abstract class BspService <
}
/**
- * Get the representative vertex
- *
- * @return Representative vertex for this service.
- */
- final public BasicVertex<I, V, E, M> getRepresentativeVertex() {
- return representativeVertex;
- }
-
- /**
* Get the latest application attempt and cache it.
*
* @return the latest application attempt
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java?rev=1198972&r1=1198971&r2=1198972&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java Mon Nov 7 22:09:35 2011
@@ -81,6 +81,8 @@ public class BspServiceWorker<
private final int finalRpcPort;
/** List of aggregators currently in use */
private Set<String> aggregatorInUse = new TreeSet<String>();
+ /** Worker Context */
+ private WorkerContext workerContext;
/** Class logger */
private static final Logger LOG = Logger.getLogger(BspServiceWorker.class);
@@ -93,11 +95,17 @@ public class BspServiceWorker<
getConfiguration().getInt(GiraphJob.RPC_INITIAL_PORT,
GiraphJob.RPC_INITIAL_PORT_DEFAULT) +
getTaskPartition();
+ this.workerContext = BspUtils.createWorkerContext(getConfiguration(),
+ graphMapper.getGraphState());
}
public int getPort() {
return finalRpcPort;
}
+
+ /**
+ * Get the worker context held by this service.
+ *
+ * @return Worker context of this worker
+ */
+ public WorkerContext getWorkerContext() {
+ return workerContext;
+ }
/**
* Intended to check the health of the node. For instance, can it ssh,
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspUtils.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspUtils.java?rev=1198972&r1=1198971&r2=1198972&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspUtils.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspUtils.java Mon Nov 7 22:09:35 2011
@@ -202,6 +202,42 @@ public class BspUtils {
resolver.setGraphState(graphState);
return resolver;
}
+
+ /**
+  * Look up the WorkerContext subclass configured under
+  * {@link GiraphJob#WORKER_CONTEXT_CLASS}, falling back to
+  * {@link DefaultWorkerContext} when none is set.
+  *
+  * @param conf Configuration to check
+  * @return User's worker context class
+  */
+ public static Class<? extends WorkerContext>
+ getWorkerContextClass(Configuration conf) {
+     Class<? extends WorkerContext> contextClass =
+         conf.getClass(GiraphJob.WORKER_CONTEXT_CLASS,
+                       DefaultWorkerContext.class,
+                       WorkerContext.class);
+     return contextClass;
+ }
+
+ /**
+  * Instantiate the configured worker context class and attach the
+  * given graph state to it.
+  *
+  * @param conf Configuration to check
+  * @param graphState Graph state handed to the new worker context
+  * @return Instantiated user worker context
+  */
+ @SuppressWarnings("rawtypes")
+ public static <I extends WritableComparable,
+                V extends Writable,
+                E extends Writable,
+                M extends Writable>
+ WorkerContext createWorkerContext(Configuration conf,
+                                   GraphState<I, V, E, M> graphState) {
+     WorkerContext context =
+         ReflectionUtils.newInstance(getWorkerContextClass(conf), conf);
+     context.setGraphState(graphState);
+     return context;
+ }
+
/**
* Get the user's subclassed Vertex.
Added: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/DefaultWorkerContext.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/DefaultWorkerContext.java?rev=1198972&view=auto
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/DefaultWorkerContext.java (added)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/DefaultWorkerContext.java Mon Nov 7 22:09:35 2011
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph;
+
+/**
+ * A no-op implementation of {@link WorkerContext}. This is the default
+ * implementation when no WorkerContext is defined by the user. All four
+ * callbacks are empty.
+ */
+public class DefaultWorkerContext extends WorkerContext {
+
+ /** Does nothing. */
+ @Override
+ public void preApplication() { }
+
+ /** Does nothing. */
+ @Override
+ public void postApplication() { }
+
+ /** Does nothing. */
+ @Override
+ public void preSuperstep() { }
+
+ /** Does nothing. */
+ @Override
+ public void postSuperstep() { }
+}
\ No newline at end of file
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java?rev=1198972&r1=1198971&r2=1198972&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java Mon Nov 7 22:09:35 2011
@@ -58,6 +58,8 @@ public class GiraphJob extends Job {
public static final String EDGE_VALUE_CLASS = "giraph.edgeValueClass";
/** Message value class */
public static final String MESSAGE_VALUE_CLASS = "giraph.messageValueClass";
+ /** Worker context class */
+ public static final String WORKER_CONTEXT_CLASS = "giraph.workerContextClass";
/**
* Minimum number of simultaneous workers before this job can run (int)
@@ -392,7 +394,19 @@ public class GiraphJob extends Job {
vertexResolverClass,
VertexResolver.class);
}
-
+
+ /**
+ * Set the worker context class (optional). The class is stored in the
+ * job configuration under {@link #WORKER_CONTEXT_CLASS}.
+ *
+ * @param workerContextClass Determines what code is executed on each
+ * worker before and after each superstep and before and after the
+ * whole computation
+ */
+ final public void setWorkerContextClass(Class<?> workerContextClass) {
+ getConfiguration().setClass(WORKER_CONTEXT_CLASS,
+ workerContextClass,
+ WorkerContext.class);
+ }
+
/**
* Set worker configuration for determining what is required for
* a superstep.
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java?rev=1198972&r1=1198971&r2=1198972&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java Mon Nov 7 22:09:35 2011
@@ -107,6 +107,10 @@ public class GraphMapper<I extends Writa
public final AggregatorUsage getAggregatorUsage() {
return serviceWorker;
}
+
+ /**
+ * Get the worker context of this mapper's worker service.
+ *
+ * @return Worker context obtained from the service worker
+ */
+ public final WorkerContext getWorkerContext() {
+ return serviceWorker.getWorkerContext();
+ }
public final GraphState<I,V,E,M> getGraphState() {
return graphState;
@@ -519,8 +523,7 @@ public class GraphMapper<I extends Writa
.setNumVertices(serviceWorker.getTotalVertices());
try {
- serviceWorker.getRepresentativeVertex().setGraphState(graphState);
- serviceWorker.getRepresentativeVertex().preApplication();
+ serviceWorker.getWorkerContext().preApplication();
} catch (InstantiationException e) {
LOG.fatal("map: preApplication failed in instantiation", e);
throw new RuntimeException(
@@ -588,7 +591,8 @@ public class GraphMapper<I extends Writa
serviceWorker.exchangeVertexRanges();
context.progress();
- serviceWorker.getRepresentativeVertex().preSuperstep();
+ serviceWorker.getWorkerContext().setGraphState(graphState);
+ serviceWorker.getWorkerContext().preSuperstep();
context.progress();
workerFinishedVertices = 0;
@@ -631,7 +635,7 @@ public class GraphMapper<I extends Writa
}
}
- serviceWorker.getRepresentativeVertex().postSuperstep();
+ serviceWorker.getWorkerContext().postSuperstep();
context.progress();
if (LOG.isInfoEnabled()) {
LOG.info("map: totalMem="
@@ -649,7 +653,7 @@ public class GraphMapper<I extends Writa
"(global vertices marked done)");
}
- serviceWorker.getRepresentativeVertex().postApplication();
+ serviceWorker.getWorkerContext().postApplication();
context.progress();
}
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/LongDoubleFloatDoubleVertex.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/LongDoubleFloatDoubleVertex.java?rev=1198972&r1=1198971&r2=1198972&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/LongDoubleFloatDoubleVertex.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/LongDoubleFloatDoubleVertex.java Mon Nov 7 22:09:35 2011
@@ -67,28 +67,6 @@ public abstract class LongDoubleFloatDou
}
@Override
- public void preApplication()
- throws InstantiationException, IllegalAccessException {
- // Do nothing, might be overriden by the user
- }
-
- @Override
- public void postApplication() {
- // Do nothing, might be overriden by the user
- }
-
- @Override
- public void preSuperstep() {
- // Do nothing, might be overriden by the user
- }
-
- @Override
- public void postSuperstep() {
- // Do nothing, might be overriden by the user
- }
-
-
- @Override
public final boolean addEdge(LongWritable targetId, FloatWritable edgeValue) {
if (verticesWithEdgeValues.put(targetId.get(), edgeValue.get())) {
if (LOG.isDebugEnabled()) {
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/Vertex.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/Vertex.java?rev=1198972&r1=1198971&r2=1198972&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/Vertex.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/Vertex.java Mon Nov 7 22:09:35 2011
@@ -82,27 +82,6 @@ public abstract class Vertex<I extends W
}
@Override
- public void preApplication()
- throws InstantiationException, IllegalAccessException {
- // Do nothing, might be overriden by the user
- }
-
- @Override
- public void postApplication() {
- // Do nothing, might be overriden by the user
- }
-
- @Override
- public void preSuperstep() {
- // Do nothing, might be overriden by the user
- }
-
- @Override
- public void postSuperstep() {
- // Do nothing, might be overriden by the user
- }
-
- @Override
public final boolean addEdge(I targetVertexId, E edgeValue) {
if (destEdgeMap.put(
targetVertexId,
Added: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/WorkerContext.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/WorkerContext.java?rev=1198972&view=auto
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/WorkerContext.java (added)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/WorkerContext.java Mon Nov 7 22:09:35 2011
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.Mapper;
+
+/**
+ * WorkerContext allows for the execution of user code
+ * on a per-worker basis. There's one WorkerContext per worker.
+ * Subclasses implement the four pre/post application/superstep callbacks;
+ * the accessors and aggregator methods delegate to the graph state set
+ * through {@link #setGraphState(GraphState)}.
+ */
+@SuppressWarnings("rawtypes")
+public abstract class WorkerContext implements AggregatorUsage {
+ /** Global graph state */
+ private GraphState graphState;
+
+ /**
+ * Set the graph state that every accessor of this class reads from.
+ * The accessors dereference it without a null check, so it has to be
+ * set before any of them is called.
+ *
+ * @param graphState Graph state to use
+ */
+ public void setGraphState(GraphState graphState) {
+ this.graphState = graphState;
+ }
+
+ /**
+ * Initialize the WorkerContext.
+ * This method is executed once on each Worker before the first
+ * superstep starts.
+ *
+ * @throws IllegalAccessException
+ * @throws InstantiationException
+ */
+ public abstract void preApplication() throws InstantiationException,
+ IllegalAccessException;
+
+ /**
+ * Finalize the WorkerContext.
+ * This method is executed once on each Worker after the last
+ * superstep ends.
+ */
+ public abstract void postApplication();
+
+ /**
+ * Execute user code.
+ * This method is executed once on each Worker before each
+ * superstep starts.
+ */
+ public abstract void preSuperstep();
+
+ /**
+ * Execute user code.
+ * This method is executed once on each Worker after each
+ * superstep ends.
+ */
+ public abstract void postSuperstep();
+
+ /**
+ * Retrieves the current superstep.
+ *
+ * @return Current superstep
+ */
+ public long getSuperstep() {
+ return graphState.getSuperstep();
+ }
+
+ /**
+ * Get the total (all workers) number of vertices that
+ * existed in the previous superstep.
+ *
+ * @return Total number of vertices (-1 if first superstep)
+ */
+ public long getNumVertices() {
+ return graphState.getNumVertices();
+ }
+
+ /**
+ * Get the total (all workers) number of edges that
+ * existed in the previous superstep.
+ *
+ * @return Total number of edges (-1 if first superstep)
+ */
+ public long getNumEdges() {
+ return graphState.getNumEdges();
+ }
+
+ /**
+ * Get the mapper context
+ *
+ * @return Mapper context
+ */
+ public Mapper.Context getContext() {
+ return graphState.getContext();
+ }
+
+ /**
+ * Register an aggregator by delegating to the AggregatorUsage of the
+ * graph state's GraphMapper.
+ *
+ * @param name Name of the aggregator
+ * @param aggregatorClass Class of the aggregator to register
+ * @return Whatever the delegate's registerAggregator returns
+ * @throws InstantiationException
+ * @throws IllegalAccessException
+ */
+ @Override
+ public final <A extends Writable> Aggregator<A> registerAggregator(
+ String name,
+ Class<? extends Aggregator<A>> aggregatorClass)
+ throws InstantiationException, IllegalAccessException {
+ return graphState.getGraphMapper().getAggregatorUsage().
+ registerAggregator(name, aggregatorClass);
+ }
+
+ /**
+ * Look up an aggregator by delegating to the AggregatorUsage of the
+ * graph state's GraphMapper.
+ *
+ * @param name Name of the aggregator
+ * @return Whatever the delegate's getAggregator returns
+ */
+ @Override
+ public final Aggregator<? extends Writable> getAggregator(String name) {
+ return graphState.getGraphMapper().getAggregatorUsage().
+ getAggregator(name);
+ }
+
+ /**
+ * Mark an aggregator as used by delegating to the AggregatorUsage of
+ * the graph state's GraphMapper.
+ *
+ * @param name Name of the aggregator
+ * @return Whatever the delegate's useAggregator returns
+ */
+ @Override
+ public final boolean useAggregator(String name) {
+ return graphState.getGraphMapper().getAggregatorUsage().
+ useAggregator(name);
+ }
+}
\ No newline at end of file
Modified: incubator/giraph/trunk/src/test/java/org/apache/giraph/TestAutoCheckpoint.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/test/java/org/apache/giraph/TestAutoCheckpoint.java?rev=1198972&r1=1198971&r2=1198972&view=diff
==============================================================================
--- incubator/giraph/trunk/src/test/java/org/apache/giraph/TestAutoCheckpoint.java (original)
+++ incubator/giraph/trunk/src/test/java/org/apache/giraph/TestAutoCheckpoint.java Mon Nov 7 22:09:35 2011
@@ -81,6 +81,8 @@ public class TestAutoCheckpoint extends
job.setVertexClass(SimpleCheckpointVertex.class);
job.setVertexInputFormatClass(SimpleSuperstepVertexInputFormat.class);
job.setVertexOutputFormatClass(SimpleSuperstepVertexOutputFormat.class);
+ job.setWorkerContextClass(
+ SimpleCheckpointVertex.SimpleCheckpointVertexWorkerContext.class);
Path outputPath = new Path("/tmp/" + getCallingMethodName());
removeAndSetOutput(job, outputPath);
assertTrue(job.run(true));
Modified: incubator/giraph/trunk/src/test/java/org/apache/giraph/TestBspBasic.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/test/java/org/apache/giraph/TestBspBasic.java?rev=1198972&r1=1198971&r2=1198972&view=diff
==============================================================================
--- incubator/giraph/trunk/src/test/java/org/apache/giraph/TestBspBasic.java (original)
+++ incubator/giraph/trunk/src/test/java/org/apache/giraph/TestBspBasic.java Mon Nov 7 22:09:35 2011
@@ -273,12 +273,17 @@ public class TestBspBasic extends BspCas
GiraphJob job = new GiraphJob(getCallingMethodName());
setupConfiguration(job);
job.setVertexClass(SimplePageRankVertex.class);
+ job.setWorkerContextClass(
+ SimplePageRankVertex.SimplePageRankVertexWorkerContext.class);
job.setVertexInputFormatClass(SimplePageRankVertexInputFormat.class);
assertTrue(job.run(true));
if (getJobTracker() == null) {
- double maxPageRank = SimplePageRankVertex.finalMax;
- double minPageRank = SimplePageRankVertex.finalMin;
- long numVertices = SimplePageRankVertex.finalSum;
+ double maxPageRank =
+ SimplePageRankVertex.SimplePageRankVertexWorkerContext.finalMax;
+ double minPageRank =
+ SimplePageRankVertex.SimplePageRankVertexWorkerContext.finalMin;
+ long numVertices =
+ SimplePageRankVertex.SimplePageRankVertexWorkerContext.finalSum;
System.out.println("testBspPageRank: maxPageRank=" + maxPageRank +
" minPageRank=" + minPageRank +
" numVertices=" + numVertices);
Modified: incubator/giraph/trunk/src/test/java/org/apache/giraph/TestManualCheckpoint.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/test/java/org/apache/giraph/TestManualCheckpoint.java?rev=1198972&r1=1198971&r2=1198972&view=diff
==============================================================================
--- incubator/giraph/trunk/src/test/java/org/apache/giraph/TestManualCheckpoint.java (original)
+++ incubator/giraph/trunk/src/test/java/org/apache/giraph/TestManualCheckpoint.java Mon Nov 7 22:09:35 2011
@@ -68,6 +68,8 @@ public class TestManualCheckpoint extend
job.getConfiguration().setBoolean(
GiraphJob.CLEANUP_CHECKPOINTS_AFTER_SUCCESS, false);
job.setVertexClass(SimpleCheckpointVertex.class);
+ job.setWorkerContextClass(
+ SimpleCheckpointVertex.SimpleCheckpointVertexWorkerContext.class);
job.setVertexInputFormatClass(SimpleSuperstepVertexInputFormat.class);
job.setVertexOutputFormatClass(SimpleSuperstepVertexOutputFormat.class);
Path outputPath = new Path("/tmp/" + getCallingMethodName());
@@ -78,7 +80,8 @@ public class TestManualCheckpoint extend
if (getJobTracker() == null) {
FileStatus fileStatus = getSinglePartFileStatus(job, outputPath);
fileLen = fileStatus.getLen();
- idSum = SimpleCheckpointVertex.finalSum;
+ idSum =
+ SimpleCheckpointVertex.SimpleCheckpointVertexWorkerContext.finalSum;
System.out.println("testBspCheckpoint: idSum = " + idSum +
" fileLen = " + fileLen);
}
@@ -94,6 +97,8 @@ public class TestManualCheckpoint extend
HDFS_CHECKPOINT_DIR);
restartedJob.getConfiguration().setLong(GiraphJob.RESTART_SUPERSTEP, 2);
restartedJob.setVertexClass(SimpleCheckpointVertex.class);
+ restartedJob.setWorkerContextClass(
+ SimpleCheckpointVertex.SimpleCheckpointVertexWorkerContext.class);
restartedJob.setVertexInputFormatClass(
SimpleSuperstepVertexInputFormat.class);
restartedJob.setVertexOutputFormatClass(
@@ -105,7 +110,8 @@ public class TestManualCheckpoint extend
FileStatus fileStatus = getSinglePartFileStatus(job, outputPath);
fileLen = fileStatus.getLen();
assertTrue(fileStatus.getLen() == fileLen);
- long idSumRestarted = SimpleCheckpointVertex.finalSum;
+ long idSumRestarted =
+ SimpleCheckpointVertex.SimpleCheckpointVertexWorkerContext.finalSum;
System.out.println("testBspCheckpoint: idSumRestarted = " +
idSumRestarted);
assertTrue(idSum == idSumRestarted);
Modified: incubator/giraph/trunk/src/test/java/org/apache/giraph/TestMutateGraphVertex.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/test/java/org/apache/giraph/TestMutateGraphVertex.java?rev=1198972&r1=1198971&r2=1198972&view=diff
==============================================================================
--- incubator/giraph/trunk/src/test/java/org/apache/giraph/TestMutateGraphVertex.java (original)
+++ incubator/giraph/trunk/src/test/java/org/apache/giraph/TestMutateGraphVertex.java Mon Nov 7 22:09:35 2011
@@ -60,6 +60,8 @@ public class TestMutateGraphVertex exten
GiraphJob job = new GiraphJob(getCallingMethodName());
setupConfiguration(job);
job.setVertexClass(SimpleMutateGraphVertex.class);
+ job.setWorkerContextClass(
+ SimpleMutateGraphVertex.SimpleMutateGraphVertexWorkerContext.class);
job.setVertexInputFormatClass(SimplePageRankVertexInputFormat.class);
job.setVertexOutputFormatClass(SimplePageRankVertexOutputFormat.class);
Path outputPath = new Path("/tmp/" + getCallingMethodName());
Modified: incubator/giraph/trunk/src/test/java/org/apache/giraph/TestVertexRangeBalancer.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/test/java/org/apache/giraph/TestVertexRangeBalancer.java?rev=1198972&r1=1198971&r2=1198972&view=diff
==============================================================================
--- incubator/giraph/trunk/src/test/java/org/apache/giraph/TestVertexRangeBalancer.java (original)
+++ incubator/giraph/trunk/src/test/java/org/apache/giraph/TestVertexRangeBalancer.java Mon Nov 7 22:09:35 2011
@@ -65,6 +65,8 @@ public class TestVertexRangeBalancer ext
GiraphJob job = new GiraphJob("testStaticBalancer");
setupConfiguration(job);
job.setVertexClass(SimpleCheckpointVertex.class);
+ job.setWorkerContextClass(
+ SimpleCheckpointVertex.SimpleCheckpointVertexWorkerContext.class);
job.setVertexInputFormatClass(SimpleSuperstepVertexInputFormat.class);
job.setVertexOutputFormatClass(SimpleSuperstepVertexOutputFormat.class);
Path outputPath = new Path("/tmp/testStaticBalancer");
@@ -86,6 +88,8 @@ public class TestVertexRangeBalancer ext
job = new GiraphJob("testSuperstepBalancer");
setupConfiguration(job);
job.setVertexClass(SimpleCheckpointVertex.class);
+ job.setWorkerContextClass(
+ SimpleCheckpointVertex.SimpleCheckpointVertexWorkerContext.class);
job.setVertexInputFormatClass(SimpleSuperstepVertexInputFormat.class);
job.setVertexOutputFormatClass(SimpleSuperstepVertexOutputFormat.class);
job.setVertexRangeBalancerClass(SuperstepBalancer.class);
@@ -106,6 +110,8 @@ public class TestVertexRangeBalancer ext
job = new GiraphJob("testAutoBalancer");
setupConfiguration(job);
job.setVertexClass(SimpleCheckpointVertex.class);
+ job.setWorkerContextClass(
+ SimpleCheckpointVertex.SimpleCheckpointVertexWorkerContext.class);
job.setVertexInputFormatClass(SimpleSuperstepVertexInputFormat.class);
job.setVertexOutputFormatClass(SimpleSuperstepVertexOutputFormat.class);
job.setVertexRangeBalancerClass(AutoBalancer.class);