You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ac...@apache.org on 2012/02/16 23:12:36 UTC
svn commit: r1245205 [2/18] - in /incubator/giraph/trunk: ./
src/main/java/org/apache/giraph/ src/main/java/org/apache/giraph/benchmark/
src/main/java/org/apache/giraph/bsp/ src/main/java/org/apache/giraph/comm/
src/main/java/org/apache/giraph/examples...
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/benchmark/RandomMessageBenchmark.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/benchmark/RandomMessageBenchmark.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/benchmark/RandomMessageBenchmark.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/benchmark/RandomMessageBenchmark.java Thu Feb 16 22:12:31 2012
@@ -41,361 +41,386 @@ import java.util.Random;
* Random Message Benchmark for evaluating the messaging performance.
*/
public class RandomMessageBenchmark implements Tool {
- /** Configuration from Configurable */
- private Configuration conf;
+ /** How many supersteps to run */
+ public static final String SUPERSTEP_COUNT =
+ "RandomMessageBenchmark.superstepCount";
+ /** How many bytes per message */
+ public static final String NUM_BYTES_PER_MESSAGE =
+ "RandomMessageBenchmark.numBytesPerMessage";
+ /** Default bytes per message */
+ public static final int DEFAULT_NUM_BYTES_PER_MESSAGE = 16;
+ /** How many messages per edge */
+ public static final String NUM_MESSAGES_PER_EDGE =
+ "RandomMessageBenchmark.numMessagesPerEdge";
+ /** Default messages per edge */
+ public static final int DEFAULT_NUM_MESSAGES_PER_EDGE = 1;
+ /** All bytes sent during this superstep */
+ public static final String AGG_SUPERSTEP_TOTAL_BYTES =
+ "superstep total bytes sent";
+ /** All bytes sent during this application */
+ public static final String AGG_TOTAL_BYTES = "total bytes sent";
+ /** All messages during this superstep */
+ public static final String AGG_SUPERSTEP_TOTAL_MESSAGES =
+ "superstep total messages";
+ /** All messages during this application */
+ public static final String AGG_TOTAL_MESSAGES = "total messages";
+ /** All millis during this superstep */
+ public static final String AGG_SUPERSTEP_TOTAL_MILLIS =
+ "superstep total millis";
+ /** All millis during this application */
+ public static final String AGG_TOTAL_MILLIS = "total millis";
+ /** Workers for that superstep */
+ public static final String WORKERS = "workers";
+ /** Class logger for the benchmark driver (used by run()) */
+ private static final Logger LOG =
+ Logger.getLogger(RandomMessageBenchmark.class);
+ /** Configuration from Configurable */
+ private Configuration conf;
+
+ /**
+ * {@link WorkerContext} for RandomMessageBenchmark.
+ */
+ private static class RandomMessageBenchmarkWorkerContext extends
+ WorkerContext {
+ /** Class logger */
+ private static final Logger LOG =
+ Logger.getLogger(RandomMessageBenchmarkWorkerContext.class);
+ /** Bytes to be sent */
+ private byte[] messageBytes;
+ /** Number of messages sent per edge */
+ private int numMessagesPerEdge = -1;
+ /** Number of supersteps */
+ private int numSupersteps = -1;
+ /** Random generator for random bytes message */
+ private final Random random = new Random(System.currentTimeMillis());
+ /** Start superstep millis */
+ private long startSuperstepMillis = 0;
+ /** Total bytes */
+ private long totalBytes = 0;
+ /** Total messages */
+ private long totalMessages = 0;
+ /** Total millis */
+ private long totalMillis = 0;
+
- /** How many supersteps to run */
- public static final String SUPERSTEP_COUNT =
- "RandomMessageBenchmark.superstepCount";
- /** How many bytes per message */
- public static final String NUM_BYTES_PER_MESSAGE =
- "RandomMessageBenchmark.numBytesPerMessage";
- /** Default bytes per message */
- public static final int DEFAULT_NUM_BYTES_PER_MESSAGE = 16;
- /** How many messages per edge */
- public static final String NUM_MESSAGES_PER_EDGE=
- "RandomMessageBenchmark.numMessagesPerEdge";
- /** Default messages per edge */
- public static final int DEFAULT_NUM_MESSAGES_PER_EDGE = 1;
- /** All bytes sent during this superstep */
- public static final String AGG_SUPERSTEP_TOTAL_BYTES =
- "superstep total bytes sent";
- /** All bytes sent during this application */
- public static final String AGG_TOTAL_BYTES = "total bytes sent";
- /** All messages during this superstep */
- public static final String AGG_SUPERSTEP_TOTAL_MESSAGES =
- "superstep total messages";
- /** All messages during this application */
- public static final String AGG_TOTAL_MESSAGES = "total messages";
- /** All millis during this superstep */
- public static final String AGG_SUPERSTEP_TOTAL_MILLIS =
- "superstep total millis";
- /** All millis during this application */
- public static final String AGG_TOTAL_MILLIS = "total millis";
- /** Workers for that superstep */
- public static final String WORKERS = "workers";
-
- /**
- * {@link WorkerContext} forRandomMessageBenchmark.
- */
- private static class RandomMessageBenchmarkWorkerContext extends
- WorkerContext {
- /** Bytes to be sent */
- private byte[] messageBytes;
- /** Number of messages sent per edge */
- private int numMessagesPerEdge = -1;
- /** Number of supersteps */
- private int numSupersteps = -1;
- /** Random generator for random bytes message */
- private final Random random = new Random(System.currentTimeMillis());
- /** Start superstep millis */
- private long startSuperstepMillis = 0;
- /** Total bytes */
- private long totalBytes = 0;
- /** Total messages */
- private long totalMessages = 0;
- /** Total millis */
- private long totalMillis = 0;
- /** Class logger */
- private static final Logger LOG =
- Logger.getLogger(RandomMessageBenchmarkWorkerContext.class);
-
- @Override
- public void preApplication()
- throws InstantiationException, IllegalAccessException {
- messageBytes =
- new byte[getContext().getConfiguration().
- getInt(NUM_BYTES_PER_MESSAGE,
- DEFAULT_NUM_BYTES_PER_MESSAGE)];
- numMessagesPerEdge =
- getContext().getConfiguration().
- getInt(NUM_MESSAGES_PER_EDGE,
- DEFAULT_NUM_MESSAGES_PER_EDGE);
- numSupersteps = getContext().getConfiguration().
- getInt(SUPERSTEP_COUNT, -1);
- registerAggregator(AGG_SUPERSTEP_TOTAL_BYTES,
- LongSumAggregator.class);
- registerAggregator(AGG_SUPERSTEP_TOTAL_MESSAGES,
- LongSumAggregator.class);
- registerAggregator(AGG_SUPERSTEP_TOTAL_MILLIS,
- LongSumAggregator.class);
- registerAggregator(WORKERS,
- LongSumAggregator.class);
- }
-
- @Override
- public void preSuperstep() {
- LongSumAggregator superstepBytesAggregator =
- (LongSumAggregator) getAggregator(AGG_SUPERSTEP_TOTAL_BYTES);
- LongSumAggregator superstepMessagesAggregator =
- (LongSumAggregator) getAggregator(AGG_SUPERSTEP_TOTAL_MESSAGES);
- LongSumAggregator superstepMillisAggregator =
- (LongSumAggregator) getAggregator(AGG_SUPERSTEP_TOTAL_MILLIS);
- LongSumAggregator workersAggregator =
- (LongSumAggregator) getAggregator(WORKERS);
-
- // For timing and tracking the supersteps
- // - superstep 0 starts the time, but cannot display any stats
- // since nothing has been aggregated yet
- // - supersteps > 0 can display the stats
- if (getSuperstep() == 0) {
- startSuperstepMillis = System.currentTimeMillis();
- } else {
- totalBytes +=
- superstepBytesAggregator.getAggregatedValue().get();
- totalMessages +=
- superstepMessagesAggregator.getAggregatedValue().get();
- totalMillis +=
- superstepMillisAggregator.getAggregatedValue().get();
- double superstepMegabytesPerSecond =
- superstepBytesAggregator.getAggregatedValue().get() *
- workersAggregator.getAggregatedValue().get() *
- 1000d / 1024d / 1024d /
- superstepMillisAggregator.getAggregatedValue().get();
- double megabytesPerSecond = totalBytes *
- workersAggregator.getAggregatedValue().get() *
- 1000d / 1024d / 1024d / totalMillis;
- double superstepMessagesPerSecond =
- superstepMessagesAggregator.getAggregatedValue().get() *
- workersAggregator.getAggregatedValue().get() * 1000d /
- superstepMillisAggregator.getAggregatedValue().get();
- double messagesPerSecond = totalMessages *
- workersAggregator.getAggregatedValue().get() * 1000d /
- totalMillis;
- if (LOG.isInfoEnabled()) {
- LOG.info("Outputing statistics for superstep " +
- getSuperstep());
- LOG.info(AGG_SUPERSTEP_TOTAL_BYTES + " : " +
- superstepBytesAggregator.getAggregatedValue());
- LOG.info(AGG_TOTAL_BYTES + " : " + totalBytes);
- LOG.info(AGG_SUPERSTEP_TOTAL_MESSAGES + " : " +
- superstepMessagesAggregator.getAggregatedValue());
- LOG.info(AGG_TOTAL_MESSAGES + " : " + totalMessages);
- LOG.info(AGG_SUPERSTEP_TOTAL_MILLIS + " : " +
- superstepMillisAggregator.getAggregatedValue());
- LOG.info(AGG_TOTAL_MILLIS + " : " + totalMillis);
- LOG.info(WORKERS + " : " +
- workersAggregator.getAggregatedValue());
- LOG.info("Superstep megabytes / second = " +
- superstepMegabytesPerSecond);
- LOG.info("Total megabytes / second = " +
- megabytesPerSecond);
- LOG.info("Superstep messages / second = " +
- superstepMessagesPerSecond);
- LOG.info("Total messages / second = " +
- messagesPerSecond);
- LOG.info("Superstep megabytes / second / worker = " +
- superstepMegabytesPerSecond /
- workersAggregator.getAggregatedValue().get());
- LOG.info("Total megabytes / second / worker = " +
- megabytesPerSecond /
- workersAggregator.getAggregatedValue().get());
- LOG.info("Superstep messages / second / worker = " +
- superstepMessagesPerSecond /
- workersAggregator.getAggregatedValue().get());
- LOG.info("Total messages / second / worker = " +
- messagesPerSecond /
- workersAggregator.getAggregatedValue().get());
- }
- }
-
- superstepBytesAggregator.setAggregatedValue(
- new LongWritable(0L));
- superstepMessagesAggregator.setAggregatedValue(
- new LongWritable(0L));
- workersAggregator.setAggregatedValue(
- new LongWritable(1L));
- useAggregator(AGG_SUPERSTEP_TOTAL_BYTES);
- useAggregator(AGG_SUPERSTEP_TOTAL_MILLIS);
- useAggregator(AGG_SUPERSTEP_TOTAL_MESSAGES);
- useAggregator(WORKERS);
- }
-
- @Override
- public void postSuperstep() {
- LongSumAggregator superstepMillisAggregator =
- (LongSumAggregator) getAggregator(AGG_SUPERSTEP_TOTAL_MILLIS);
- long endSuperstepMillis = System.currentTimeMillis();
- long superstepMillis = endSuperstepMillis - startSuperstepMillis;
- startSuperstepMillis = endSuperstepMillis;
- superstepMillisAggregator.setAggregatedValue(
- new LongWritable(superstepMillis));
- }
-
- @Override
- public void postApplication() {}
+ @Override
+ public void preApplication()
+ throws InstantiationException, IllegalAccessException {
+ messageBytes =
+ new byte[getContext().getConfiguration().
+ getInt(NUM_BYTES_PER_MESSAGE,
+ DEFAULT_NUM_BYTES_PER_MESSAGE)];
+ numMessagesPerEdge =
+ getContext().getConfiguration().
+ getInt(NUM_MESSAGES_PER_EDGE,
+ DEFAULT_NUM_MESSAGES_PER_EDGE);
+ numSupersteps = getContext().getConfiguration().
+ getInt(SUPERSTEP_COUNT, -1);
+ registerAggregator(AGG_SUPERSTEP_TOTAL_BYTES,
+ LongSumAggregator.class);
+ registerAggregator(AGG_SUPERSTEP_TOTAL_MESSAGES,
+ LongSumAggregator.class);
+ registerAggregator(AGG_SUPERSTEP_TOTAL_MILLIS,
+ LongSumAggregator.class);
+ registerAggregator(WORKERS,
+ LongSumAggregator.class);
+ }
- public byte[] getMessageBytes() {
- return messageBytes;
- }
+ @Override
+ public void preSuperstep() {
+ LongSumAggregator superstepBytesAggregator =
+ (LongSumAggregator) getAggregator(AGG_SUPERSTEP_TOTAL_BYTES);
+ LongSumAggregator superstepMessagesAggregator =
+ (LongSumAggregator) getAggregator(AGG_SUPERSTEP_TOTAL_MESSAGES);
+ LongSumAggregator superstepMillisAggregator =
+ (LongSumAggregator) getAggregator(AGG_SUPERSTEP_TOTAL_MILLIS);
+ LongSumAggregator workersAggregator =
+ (LongSumAggregator) getAggregator(WORKERS);
+
+ // For timing and tracking the supersteps
+ // - superstep 0 starts the time, but cannot display any stats
+ // since nothing has been aggregated yet
+ // - supersteps > 0 can display the stats
+ if (getSuperstep() == 0) {
+ startSuperstepMillis = System.currentTimeMillis();
+ } else {
+ totalBytes +=
+ superstepBytesAggregator.getAggregatedValue().get();
+ totalMessages +=
+ superstepMessagesAggregator.getAggregatedValue().get();
+ totalMillis +=
+ superstepMillisAggregator.getAggregatedValue().get();
+ double superstepMegabytesPerSecond =
+ superstepBytesAggregator.getAggregatedValue().get() *
+ workersAggregator.getAggregatedValue().get() *
+ 1000d / 1024d / 1024d /
+ superstepMillisAggregator.getAggregatedValue().get();
+ double megabytesPerSecond = totalBytes *
+ workersAggregator.getAggregatedValue().get() *
+ 1000d / 1024d / 1024d / totalMillis;
+ double superstepMessagesPerSecond =
+ superstepMessagesAggregator.getAggregatedValue().get() *
+ workersAggregator.getAggregatedValue().get() * 1000d /
+ superstepMillisAggregator.getAggregatedValue().get();
+ double messagesPerSecond = totalMessages *
+ workersAggregator.getAggregatedValue().get() * 1000d /
+ totalMillis;
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Outputing statistics for superstep " +
+ getSuperstep());
+ LOG.info(AGG_SUPERSTEP_TOTAL_BYTES + " : " +
+ superstepBytesAggregator.getAggregatedValue());
+ LOG.info(AGG_TOTAL_BYTES + " : " + totalBytes);
+ LOG.info(AGG_SUPERSTEP_TOTAL_MESSAGES + " : " +
+ superstepMessagesAggregator.getAggregatedValue());
+ LOG.info(AGG_TOTAL_MESSAGES + " : " + totalMessages);
+ LOG.info(AGG_SUPERSTEP_TOTAL_MILLIS + " : " +
+ superstepMillisAggregator.getAggregatedValue());
+ LOG.info(AGG_TOTAL_MILLIS + " : " + totalMillis);
+ LOG.info(WORKERS + " : " +
+ workersAggregator.getAggregatedValue());
+ LOG.info("Superstep megabytes / second = " +
+ superstepMegabytesPerSecond);
+ LOG.info("Total megabytes / second = " +
+ megabytesPerSecond);
+ LOG.info("Superstep messages / second = " +
+ superstepMessagesPerSecond);
+ LOG.info("Total messages / second = " +
+ messagesPerSecond);
+ LOG.info("Superstep megabytes / second / worker = " +
+ superstepMegabytesPerSecond /
+ workersAggregator.getAggregatedValue().get());
+ LOG.info("Total megabytes / second / worker = " +
+ megabytesPerSecond /
+ workersAggregator.getAggregatedValue().get());
+ LOG.info("Superstep messages / second / worker = " +
+ superstepMessagesPerSecond /
+ workersAggregator.getAggregatedValue().get());
+ LOG.info("Total messages / second / worker = " +
+ messagesPerSecond /
+ workersAggregator.getAggregatedValue().get());
+ }
+ }
+
+ superstepBytesAggregator.setAggregatedValue(
+ new LongWritable(0L));
+ superstepMessagesAggregator.setAggregatedValue(
+ new LongWritable(0L));
+ workersAggregator.setAggregatedValue(
+ new LongWritable(1L));
+ useAggregator(AGG_SUPERSTEP_TOTAL_BYTES);
+ useAggregator(AGG_SUPERSTEP_TOTAL_MILLIS);
+ useAggregator(AGG_SUPERSTEP_TOTAL_MESSAGES);
+ useAggregator(WORKERS);
+ }
- public int getNumMessagePerEdge() {
- return numMessagesPerEdge;
- }
+ @Override
+ public void postSuperstep() {
+ LongSumAggregator superstepMillisAggregator =
+ (LongSumAggregator) getAggregator(AGG_SUPERSTEP_TOTAL_MILLIS);
+ long endSuperstepMillis = System.currentTimeMillis();
+ long superstepMillis = endSuperstepMillis - startSuperstepMillis;
+ startSuperstepMillis = endSuperstepMillis;
+ superstepMillisAggregator.setAggregatedValue(
+ new LongWritable(superstepMillis));
+ }
- public int getNumSupersteps() {
- return numSupersteps;
- }
+ @Override
+ public void postApplication() { }
- public void randomizeMessageBytes() {
- random.nextBytes(messageBytes);
- }
+ /**
+ * Get the message bytes to be used for sending.
+ *
+ * @return Byte array used for messages.
+ */
+ public byte[] getMessageBytes() {
+ return messageBytes;
}
/**
- * Actual message computation (messaging in this case)
+ * Get the number of messages sent per edge.
+ *
+ * @return Messages per edge.
*/
- public static class RandomMessageVertex extends EdgeListVertex<
- LongWritable, DoubleWritable, DoubleWritable, BytesWritable> {
-
- @Override
- public void compute(Iterator<BytesWritable> msgIterator) {
- RandomMessageBenchmarkWorkerContext workerContext =
- (RandomMessageBenchmarkWorkerContext) getWorkerContext();
- LongSumAggregator superstepBytesAggregator =
- (LongSumAggregator) getAggregator(AGG_SUPERSTEP_TOTAL_BYTES);
- LongSumAggregator superstepMessagesAggregator =
- (LongSumAggregator) getAggregator(AGG_SUPERSTEP_TOTAL_MESSAGES);
- if (getSuperstep() < workerContext.getNumSupersteps()) {
- for (int i = 0; i < workerContext.getNumMessagePerEdge();
- i++) {
- workerContext.randomizeMessageBytes();
- sendMsgToAllEdges(
- new BytesWritable(workerContext.getMessageBytes()));
- long bytesSent = workerContext.getMessageBytes().length *
- getNumOutEdges();
- superstepBytesAggregator.aggregate(bytesSent);
- superstepMessagesAggregator.aggregate(getNumOutEdges());
- }
- } else {
- voteToHalt();
- }
- }
+ public int getNumMessagePerEdge() {
+ return numMessagesPerEdge;
}
- @Override
- public Configuration getConf() {
- return conf;
+ /**
+ * Get the number of supersteps.
+ *
+ * @return Number of supersteps.
+ */
+ public int getNumSupersteps() {
+ return numSupersteps;
}
- @Override
- public void setConf(Configuration conf) {
- this.conf = conf;
+ /**
+ * Randomize the message bytes.
+ */
+ public void randomizeMessageBytes() {
+ random.nextBytes(messageBytes);
}
+ }
+ /**
+ * Actual message computation (messaging in this case)
+ */
+ public static class RandomMessageVertex extends EdgeListVertex<
+ LongWritable, DoubleWritable, DoubleWritable, BytesWritable> {
@Override
- public int run(String[] args) throws Exception {
- Options options = new Options();
- options.addOption("h", "help", false, "Help");
- options.addOption("v", "verbose", false, "Verbose");
- options.addOption("w",
- "workers",
- true,
- "Number of workers");
- options.addOption("b",
- "bytes",
- true,
- "Message bytes per memssage");
- options.addOption("n",
- "number",
- true,
- "Number of messages per edge");
- options.addOption("s",
- "supersteps",
- true,
- "Supersteps to execute before finishing");
- options.addOption("V",
- "aggregateVertices",
- true,
- "Aggregate vertices");
- options.addOption("e",
- "edgesPerVertex",
- true,
- "Edges per vertex");
- options.addOption("f",
- "flusher",
- true,
- "Number of flush threads");
-
- HelpFormatter formatter = new HelpFormatter();
- if (args.length == 0) {
- formatter.printHelp(getClass().getName(), options, true);
- return 0;
- }
- CommandLineParser parser = new PosixParser();
- CommandLine cmd = parser.parse(options, args);
- if (cmd.hasOption('h')) {
- formatter.printHelp(getClass().getName(), options, true);
- return 0;
- }
- if (!cmd.hasOption('w')) {
- System.out.println("Need to choose the number of workers (-w)");
- return -1;
- }
- if (!cmd.hasOption('s')) {
- System.out.println("Need to set the number of supersteps (-s)");
- return -1;
- }
- if (!cmd.hasOption('V')) {
- System.out.println("Need to set the aggregate vertices (-V)");
- return -1;
- }
- if (!cmd.hasOption('e')) {
- System.out.println("Need to set the number of edges " +
- "per vertex (-e)");
- return -1;
- }
- if (!cmd.hasOption('b')) {
- System.out.println("Need to set the number of message bytes (-b)");
- return -1;
- }
- if (!cmd.hasOption('n')) {
- System.out.println("Need to set the number of messages per edge (-n)");
- return -1;
- }
- int workers = Integer.parseInt(cmd.getOptionValue('w'));
- GiraphJob job = new GiraphJob(getConf(), getClass().getName());
- job.getConfiguration().setInt(GiraphJob.CHECKPOINT_FREQUENCY, 0);
- job.setVertexClass(RandomMessageVertex.class);
- job.setVertexInputFormatClass(PseudoRandomVertexInputFormat.class);
- job.setWorkerContextClass(RandomMessageBenchmarkWorkerContext.class);
- job.setWorkerConfiguration(workers, workers, 100.0f);
- job.getConfiguration().setLong(
- PseudoRandomVertexInputFormat.AGGREGATE_VERTICES,
- Long.parseLong(cmd.getOptionValue('V')));
- job.getConfiguration().setLong(
- PseudoRandomVertexInputFormat.EDGES_PER_VERTEX,
- Long.parseLong(cmd.getOptionValue('e')));
- job.getConfiguration().setInt(
- SUPERSTEP_COUNT,
- Integer.parseInt(cmd.getOptionValue('s')));
- job.getConfiguration().setInt(
- RandomMessageBenchmark.NUM_BYTES_PER_MESSAGE,
- Integer.parseInt(cmd.getOptionValue('b')));
- job.getConfiguration().setInt(
- RandomMessageBenchmark.NUM_MESSAGES_PER_EDGE,
- Integer.parseInt(cmd.getOptionValue('n')));
-
- boolean isVerbose = false;
- if (cmd.hasOption('v')) {
- isVerbose = true;
- }
- if (cmd.hasOption('s')) {
- getConf().setInt(SUPERSTEP_COUNT,
- Integer.parseInt(cmd.getOptionValue('s')));
- }
- if (cmd.hasOption('f')) {
- job.getConfiguration().setInt(GiraphJob.MSG_NUM_FLUSH_THREADS,
- Integer.parseInt(cmd.getOptionValue('f')));
- }
- if (job.run(isVerbose) == true) {
- return 0;
- } else {
- return -1;
- }
+ public void compute(Iterator<BytesWritable> msgIterator) {
+ RandomMessageBenchmarkWorkerContext workerContext =
+ (RandomMessageBenchmarkWorkerContext) getWorkerContext();
+ LongSumAggregator superstepBytesAggregator =
+ (LongSumAggregator) getAggregator(AGG_SUPERSTEP_TOTAL_BYTES);
+ LongSumAggregator superstepMessagesAggregator =
+ (LongSumAggregator) getAggregator(AGG_SUPERSTEP_TOTAL_MESSAGES);
+ if (getSuperstep() < workerContext.getNumSupersteps()) {
+ for (int i = 0; i < workerContext.getNumMessagePerEdge(); i++) {
+ workerContext.randomizeMessageBytes();
+ sendMsgToAllEdges(
+ new BytesWritable(workerContext.getMessageBytes()));
+ long bytesSent = workerContext.getMessageBytes().length *
+ getNumOutEdges();
+ superstepBytesAggregator.aggregate(bytesSent);
+ superstepMessagesAggregator.aggregate(getNumOutEdges());
+ }
+ } else {
+ voteToHalt();
+ }
}
+ }
- public static void main(String[] args) throws Exception {
- System.exit(ToolRunner.run(new RandomMessageBenchmark(), args));
+ @Override
+ public Configuration getConf() {
+ return conf;
+ }
+
+ @Override
+ public void setConf(Configuration conf) {
+ this.conf = conf;
+ }
+
+ @Override
+ public int run(String[] args) throws Exception {
+ Options options = new Options();
+ options.addOption("h", "help", false, "Help");
+ options.addOption("v", "verbose", false, "Verbose");
+ options.addOption("w",
+ "workers",
+ true,
+ "Number of workers");
+ options.addOption("b",
+ "bytes",
+ true,
+ "Message bytes per message");
+ options.addOption("n",
+ "number",
+ true,
+ "Number of messages per edge");
+ options.addOption("s",
+ "supersteps",
+ true,
+ "Supersteps to execute before finishing");
+ options.addOption("V",
+ "aggregateVertices",
+ true,
+ "Aggregate vertices");
+ options.addOption("e",
+ "edgesPerVertex",
+ true,
+ "Edges per vertex");
+ options.addOption("f",
+ "flusher",
+ true,
+ "Number of flush threads");
+
+ HelpFormatter formatter = new HelpFormatter();
+ if (args.length == 0) {
+ formatter.printHelp(getClass().getName(), options, true);
+ return 0;
+ }
+ CommandLineParser parser = new PosixParser();
+ CommandLine cmd = parser.parse(options, args);
+ if (cmd.hasOption('h')) {
+ formatter.printHelp(getClass().getName(), options, true);
+ return 0;
+ }
+ if (!cmd.hasOption('w')) {
+ LOG.info("Need to choose the number of workers (-w)");
+ return -1;
+ }
+ if (!cmd.hasOption('s')) {
+ LOG.info("Need to set the number of supersteps (-s)");
+ return -1;
+ }
+ if (!cmd.hasOption('V')) {
+ LOG.info("Need to set the aggregate vertices (-V)");
+ return -1;
+ }
+ if (!cmd.hasOption('e')) {
+ LOG.info("Need to set the number of edges " +
+ "per vertex (-e)");
+ return -1;
+ }
+ if (!cmd.hasOption('b')) {
+ LOG.info("Need to set the number of message bytes (-b)");
+ return -1;
}
+ if (!cmd.hasOption('n')) {
+ LOG.info("Need to set the number of messages per edge (-n)");
+ return -1;
+ }
+ int workers = Integer.parseInt(cmd.getOptionValue('w'));
+ GiraphJob job = new GiraphJob(getConf(), getClass().getName());
+ job.getConfiguration().setInt(GiraphJob.CHECKPOINT_FREQUENCY, 0);
+ job.setVertexClass(RandomMessageVertex.class);
+ job.setVertexInputFormatClass(PseudoRandomVertexInputFormat.class);
+ job.setWorkerContextClass(RandomMessageBenchmarkWorkerContext.class);
+ job.setWorkerConfiguration(workers, workers, 100.0f);
+ job.getConfiguration().setLong(
+ PseudoRandomVertexInputFormat.AGGREGATE_VERTICES,
+ Long.parseLong(cmd.getOptionValue('V')));
+ job.getConfiguration().setLong(
+ PseudoRandomVertexInputFormat.EDGES_PER_VERTEX,
+ Long.parseLong(cmd.getOptionValue('e')));
+ job.getConfiguration().setInt(
+ SUPERSTEP_COUNT,
+ Integer.parseInt(cmd.getOptionValue('s')));
+ job.getConfiguration().setInt(
+ RandomMessageBenchmark.NUM_BYTES_PER_MESSAGE,
+ Integer.parseInt(cmd.getOptionValue('b')));
+ job.getConfiguration().setInt(
+ RandomMessageBenchmark.NUM_MESSAGES_PER_EDGE,
+ Integer.parseInt(cmd.getOptionValue('n')));
+
+ boolean isVerbose = false;
+ if (cmd.hasOption('v')) {
+ isVerbose = true;
+ }
+ if (cmd.hasOption('s')) {
+ getConf().setInt(SUPERSTEP_COUNT,
+ Integer.parseInt(cmd.getOptionValue('s')));
+ }
+ if (cmd.hasOption('f')) {
+ job.getConfiguration().setInt(GiraphJob.MSG_NUM_FLUSH_THREADS,
+ Integer.parseInt(cmd.getOptionValue('f')));
+ }
+ if (job.run(isVerbose)) {
+ return 0;
+ } else {
+ return -1;
+ }
+ }
+
+ /**
+ * Execute the benchmark.
+ *
+ * @param args Typically, this is the command line arguments.
+ * @throws Exception Any exception thrown during computation.
+ */
+ public static void main(String[] args) throws Exception {
+ System.exit(ToolRunner.run(new RandomMessageBenchmark(), args));
+ }
}
Copied: incubator/giraph/trunk/src/main/java/org/apache/giraph/benchmark/package-info.java (from r1243701, incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/ApplicationState.java)
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/benchmark/package-info.java?p2=incubator/giraph/trunk/src/main/java/org/apache/giraph/benchmark/package-info.java&p1=incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/ApplicationState.java&r1=1243701&r2=1245205&rev=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/ApplicationState.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/benchmark/package-info.java Thu Feb 16 22:12:31 2012
@@ -15,15 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
-package org.apache.giraph.bsp;
-
/**
- * State of the BSP application
+ * Package of benchmarks for performance testing and optimization
*/
-public enum ApplicationState {
- UNKNOWN, ///< Shouldn't be seen, just an initial state
- START_SUPERSTEP, ///< Start from a desired superstep
- FAILED, ///< Unrecoverable
- FINISHED ///< Successful completion
-}
+package org.apache.giraph.benchmark;
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/ApplicationState.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/ApplicationState.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/ApplicationState.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/ApplicationState.java Thu Feb 16 22:12:31 2012
@@ -22,8 +22,12 @@ package org.apache.giraph.bsp;
* State of the BSP application
*/
public enum ApplicationState {
- UNKNOWN, ///< Shouldn't be seen, just an initial state
- START_SUPERSTEP, ///< Start from a desired superstep
- FAILED, ///< Unrecoverable
- FINISHED ///< Successful completion
+ /** Shouldn't be seen, just an initial state */
+ UNKNOWN,
+ /** Start from a desired superstep */
+ START_SUPERSTEP,
+ /** Unrecoverable */
+ FAILED,
+ /** Successful completion */
+ FINISHED,
}
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/BspInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/BspInputFormat.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/BspInputFormat.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/BspInputFormat.java Thu Feb 16 22:12:31 2012
@@ -39,52 +39,55 @@ import org.apache.log4j.Logger;
* separate. It is not meant to do any meaningful split of user-data.
*/
public class BspInputFormat extends InputFormat<Text, Text> {
- /** Logger */
- private static final Logger LOG = Logger.getLogger(BspInputFormat.class);
+ /** Class Logger */
+ private static final Logger LOG = Logger.getLogger(BspInputFormat.class);
- /**
- * Get the correct number of mappers based on the configuration
- *
- * @param conf Configuration to determine the number of mappers
- */
- public static int getMaxTasks(Configuration conf) {
- int maxWorkers = conf.getInt(GiraphJob.MAX_WORKERS, 0);
- boolean splitMasterWorker =
- conf.getBoolean(GiraphJob.SPLIT_MASTER_WORKER,
- GiraphJob.SPLIT_MASTER_WORKER_DEFAULT);
- int maxTasks = maxWorkers;
- if (splitMasterWorker) {
- int zkServers =
- conf.getInt(GiraphJob.ZOOKEEPER_SERVER_COUNT,
- GiraphJob.ZOOKEEPER_SERVER_COUNT_DEFAULT);
- maxTasks += zkServers;
- }
- if (LOG.isDebugEnabled()) {
- LOG.debug("getMaxTasks: Max workers = " + maxWorkers +
- ", split master/worker = " + splitMasterWorker +
- ", total max tasks = " + maxTasks);
- }
- return maxTasks;
+ /**
+ * Get the correct number of mappers based on the configuration
+ *
+ * @param conf Configuration to determine the number of mappers
+ * @return Maximum number of tasks
+ */
+ public static int getMaxTasks(Configuration conf) {
+ int maxWorkers = conf.getInt(GiraphJob.MAX_WORKERS, 0);
+ boolean splitMasterWorker =
+ conf.getBoolean(GiraphJob.SPLIT_MASTER_WORKER,
+ GiraphJob.SPLIT_MASTER_WORKER_DEFAULT);
+ int maxTasks = maxWorkers;
+ if (splitMasterWorker) {
+ int zkServers =
+ conf.getInt(GiraphJob.ZOOKEEPER_SERVER_COUNT,
+ GiraphJob.ZOOKEEPER_SERVER_COUNT_DEFAULT);
+ maxTasks += zkServers;
}
-
- public List<InputSplit> getSplits(JobContext context)
- throws IOException, InterruptedException {
- Configuration conf = context.getConfiguration();
- int maxTasks = getMaxTasks(conf);
- if (maxTasks <= 0) {
- throw new InterruptedException(
- "getSplits: Cannot have maxTasks <= 0 - " + maxTasks);
- }
- List<InputSplit> inputSplitList = new ArrayList<InputSplit>();
- for (int i = 0; i < maxTasks; ++i) {
- inputSplitList.add(new BspInputSplit());
- }
- return inputSplitList;
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("getMaxTasks: Max workers = " + maxWorkers +
+ ", split master/worker = " + splitMasterWorker +
+ ", total max tasks = " + maxTasks);
}
+ return maxTasks;
+ }
- public RecordReader<Text, Text>
- createRecordReader(InputSplit split, TaskAttemptContext context)
- throws IOException, InterruptedException {
- return new BspRecordReader();
+ @Override
+ public List<InputSplit> getSplits(JobContext context)
+ throws IOException, InterruptedException {
+ Configuration conf = context.getConfiguration();
+ int maxTasks = getMaxTasks(conf);
+ if (maxTasks <= 0) {
+ throw new InterruptedException(
+ "getSplits: Cannot have maxTasks <= 0 - " + maxTasks);
+ }
+ List<InputSplit> inputSplitList = new ArrayList<InputSplit>();
+ for (int i = 0; i < maxTasks; ++i) {
+ inputSplitList.add(new BspInputSplit());
}
+ return inputSplitList;
+ }
+
+ @Override
+ public RecordReader<Text, Text>
+ createRecordReader(InputSplit split, TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ return new BspRecordReader();
+ }
}
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/BspInputSplit.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/BspInputSplit.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/BspInputSplit.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/BspInputSplit.java Thu Feb 16 22:12:31 2012
@@ -32,51 +32,70 @@ import org.apache.hadoop.mapreduce.Input
* directly.
*/
public class BspInputSplit extends InputSplit implements Writable {
- /** Number of splits */
- private int numSplits = -1;
- /** Split index */
- private int splitIndex = -1;
-
- public BspInputSplit() {}
-
- public BspInputSplit(int splitIndex, int numSplits) {
- this.splitIndex = splitIndex;
- this.numSplits = numSplits;
- }
-
- @Override
- public long getLength() throws IOException, InterruptedException {
- return 0;
- }
-
- @Override
- public String[] getLocations() throws IOException, InterruptedException {
- return new String[]{};
- }
-
- @Override
- public void readFields(DataInput in) throws IOException {
- splitIndex = in.readInt();
- numSplits = in.readInt();
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
- out.writeInt(splitIndex);
- out.writeInt(numSplits);
- }
-
- public int getSplitIndex() {
- return splitIndex;
- }
-
- public int getNumSplits() {
- return numSplits;
- }
-
- @Override
- public String toString() {
- return "'" + getClass().getCanonicalName() +
- ", index=" + getSplitIndex() + ", num=" + getNumSplits();
- }
+ /** Number of splits */
+ private int numSplits = -1;
+ /** Split index */
+ private int splitIndex = -1;
+
+ /**
+ * Reflection constructor.
+ */
+ public BspInputSplit() { }
+
+ /**
+ * Constructor used by {@link BspInputFormat}.
+ *
+ * @param splitIndex Index of this split.
+ * @param numSplits Total number of splits.
+ */
+ public BspInputSplit(int splitIndex, int numSplits) {
+ this.splitIndex = splitIndex;
+ this.numSplits = numSplits;
+ }
+
+ @Override
+ public long getLength() throws IOException, InterruptedException {
+ return 0;
+ }
+
+ @Override
+ public String[] getLocations() throws IOException, InterruptedException {
+ return new String[]{};
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ splitIndex = in.readInt();
+ numSplits = in.readInt();
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.writeInt(splitIndex);
+ out.writeInt(numSplits);
+ }
+
+ /**
+ * Get the index of this split.
+ *
+ * @return Index of this split.
+ */
+ public int getSplitIndex() {
+ return splitIndex;
+ }
+
+ /**
+ * Get the number of splits for this application.
+ *
+ * @return Total number of splits.
+ */
+ public int getNumSplits() {
+ return numSplits;
+ }
+
+ @Override
+ public String toString() {
+ return "'" + getClass().getCanonicalName() +
+ ", index=" + getSplitIndex() + ", num=" + getNumSplits();
+ }
}
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/BspOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/BspOutputFormat.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/BspOutputFormat.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/BspOutputFormat.java Thu Feb 16 22:12:31 2012
@@ -34,38 +34,38 @@ import org.apache.log4j.Logger;
* to be called as if a normal Hadoop job.
*/
public class BspOutputFormat extends OutputFormat<Text, Text> {
- /** Class logger */
- private static Logger LOG = Logger.getLogger(BspOutputFormat.class);
+ /** Class logger */
+ private static Logger LOG = Logger.getLogger(BspOutputFormat.class);
- @Override
- public void checkOutputSpecs(JobContext context)
- throws IOException, InterruptedException {
- if (BspUtils.getVertexOutputFormatClass(context.getConfiguration())
- == null) {
- LOG.warn("checkOutputSpecs: ImmutableOutputCommiter" +
- " will not check anything");
- return;
- }
- BspUtils.createVertexOutputFormat(context.getConfiguration()).
- checkOutputSpecs(context);
+ @Override
+ public void checkOutputSpecs(JobContext context)
+ throws IOException, InterruptedException {
+ if (BspUtils.getVertexOutputFormatClass(context.getConfiguration()) ==
+ null) {
+ LOG.warn("checkOutputSpecs: ImmutableOutputCommiter" +
+ " will not check anything");
+ return;
}
+ BspUtils.createVertexOutputFormat(context.getConfiguration()).
+ checkOutputSpecs(context);
+ }
- @Override
- public OutputCommitter getOutputCommitter(TaskAttemptContext context)
- throws IOException, InterruptedException {
- if (BspUtils.getVertexOutputFormatClass(context.getConfiguration())
- == null) {
- LOG.warn("getOutputCommitter: Returning " +
- "ImmutableOutputCommiter (does nothing).");
- return new ImmutableOutputCommitter();
- }
- return BspUtils.createVertexOutputFormat(context.getConfiguration()).
- getOutputCommitter(context);
+ @Override
+ public OutputCommitter getOutputCommitter(TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ if (BspUtils.getVertexOutputFormatClass(context.getConfiguration()) ==
+ null) {
+ LOG.warn("getOutputCommitter: Returning " +
+ "ImmutableOutputCommiter (does nothing).");
+ return new ImmutableOutputCommitter();
}
+ return BspUtils.createVertexOutputFormat(context.getConfiguration()).
+ getOutputCommitter(context);
+ }
- @Override
- public RecordWriter<Text, Text> getRecordWriter(TaskAttemptContext context)
- throws IOException, InterruptedException {
- return new BspRecordWriter();
- }
+ @Override
+ public RecordWriter<Text, Text> getRecordWriter(TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ return new BspRecordWriter();
+ }
}
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/BspRecordReader.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/BspRecordReader.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/BspRecordReader.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/BspRecordReader.java Thu Feb 16 22:12:31 2012
@@ -29,40 +29,45 @@ import org.apache.hadoop.io.Text;
* Only returns a single key-value pair so that the map() can run.
*/
class BspRecordReader extends RecordReader<Text, Text> {
-
- private static final Text ONLY_KEY = new Text("only key");
- private static final Text ONLY_VALUE = new Text("only value");
-
- /** Has the one record been seen? */
- private boolean seenRecord = false;
-
- @Override
- public void close() throws IOException {
- return;
- }
-
- @Override
- public float getProgress() throws IOException {
- return (seenRecord ? 1f : 0f);
- }
-
- @Override
- public Text getCurrentKey() throws IOException, InterruptedException {
- return ONLY_KEY;
- }
-
- @Override
- public Text getCurrentValue() throws IOException, InterruptedException {
- return ONLY_VALUE;
- }
-
- @Override
- public void initialize(InputSplit inputSplit, TaskAttemptContext context)
- throws IOException, InterruptedException {
- }
-
- @Override
- public boolean nextKeyValue() throws IOException, InterruptedException {
- return (seenRecord ? false : (seenRecord = true));
+ /** Singular key object */
+ private static final Text ONLY_KEY = new Text("only key");
+ /** Single value object */
+ private static final Text ONLY_VALUE = new Text("only value");
+
+ /** Has the one record been seen? */
+ private boolean seenRecord = false;
+
+ @Override
+ public void close() throws IOException {
+ return;
+ }
+
+ @Override
+ public float getProgress() throws IOException {
+ return seenRecord ? 1f : 0f;
+ }
+
+ @Override
+ public Text getCurrentKey() throws IOException, InterruptedException {
+ return ONLY_KEY;
+ }
+
+ @Override
+ public Text getCurrentValue() throws IOException, InterruptedException {
+ return ONLY_VALUE;
+ }
+
+ @Override
+ public void initialize(InputSplit inputSplit, TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ }
+
+ @Override
+ public boolean nextKeyValue() throws IOException, InterruptedException {
+ if (!seenRecord) {
+ seenRecord = true;
+ return true;
}
+ return false;
+ }
}
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/BspRecordWriter.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/BspRecordWriter.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/BspRecordWriter.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/BspRecordWriter.java Thu Feb 16 22:12:31 2012
@@ -31,17 +31,17 @@ import org.apache.hadoop.mapreduce.TaskA
*/
public class BspRecordWriter extends RecordWriter<Text, Text> {
- @Override
- public void close(TaskAttemptContext context)
- throws IOException, InterruptedException {
- // Do nothing
- }
+ @Override
+ public void close(TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ // Do nothing
+ }
- @Override
- public void write(Text key, Text value)
- throws IOException, InterruptedException {
- throw new IOException("write: Cannot write with " +
- getClass().getName() +
- ". Should never be called");
- }
+ @Override
+ public void write(Text key, Text value)
+ throws IOException, InterruptedException {
+ throw new IOException("write: Cannot write with " +
+ getClass().getName() +
+ ". Should never be called");
+ }
}
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedService.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedService.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedService.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedService.java Thu Feb 16 22:12:31 2012
@@ -26,45 +26,48 @@ import java.io.IOException;
/**
* Basic service interface shared by both {@link CentralizedServiceMaster} and
* {@link CentralizedServiceWorker}.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ * @param <M> Message data
*/
@SuppressWarnings("rawtypes")
public interface CentralizedService<I extends WritableComparable,
- V extends Writable,
- E extends Writable,
- M extends Writable> {
- /**
- * Setup (must be called prior to any other function)
- */
- void setup();
+ V extends Writable, E extends Writable, M extends Writable> {
+ /**
+ * Setup (must be called prior to any other function)
+ */
+ void setup();
- /**
- * Get the current global superstep of the application to work on.
- *
- * @return global superstep (begins at INPUT_SUPERSTEP)
- */
- long getSuperstep();
+ /**
+ * Get the current global superstep of the application to work on.
+ *
+ * @return global superstep (begins at INPUT_SUPERSTEP)
+ */
+ long getSuperstep();
- /**
- * Get the restarted superstep
- *
- * @return -1 if not manually restarted, otherwise the superstep id
- */
- long getRestartedSuperstep();
+ /**
+ * Get the restarted superstep
+ *
+ * @return -1 if not manually restarted, otherwise the superstep id
+ */
+ long getRestartedSuperstep();
- /**
- * Given a superstep, should it be checkpointed based on the
- * checkpoint frequency?
- *
- * @param superstep superstep to check against frequency
- * @return true if checkpoint frequency met or superstep is 1.
- */
- boolean checkpointFrequencyMet(long superstep);
+ /**
+ * Given a superstep, should it be checkpointed based on the
+ * checkpoint frequency?
+ *
+ * @param superstep superstep to check against frequency
+ * @return true if checkpoint frequency met or superstep is 1.
+ */
+ boolean checkpointFrequencyMet(long superstep);
- /**
- * Clean up the service (no calls may be issued after this)
- *
- * @throws IOException
- * @throws InterruptedException
- */
- void cleanup() throws IOException, InterruptedException;
+ /**
+ * Clean up the service (no calls may be issued after this)
+ *
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ void cleanup() throws IOException, InterruptedException;
}
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java Thu Feb 16 22:12:31 2012
@@ -22,70 +22,73 @@ import java.io.IOException;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.zookeeper.KeeperException;
/**
* At most, there will be one active master at a time, but many threads can
* be trying to be the active master.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ * @param <M> Message data
*/
@SuppressWarnings("rawtypes")
public interface CentralizedServiceMaster<
- I extends WritableComparable,
- V extends Writable,
- E extends Writable,
- M extends Writable>
- extends CentralizedService<I, V, E, M> {
- /**
- * Become the master.
- * @return true if became the master, false if the application is done.
- */
- boolean becomeMaster();
-
- /**
- * Create the {@link InputSplit} objects from the index range based on the
- * user-defined VertexInputFormat. The {@link InputSplit} objects will
- * processed by the workers later on during the INPUT_SUPERSTEP.
- *
- * @return Number of partitions. Returns -1 on failure to create
- * valid input splits.
- */
- int createInputSplits();
-
- /**
- * Master coordinates the superstep
- *
- * @return State of the application as a result of this superstep
- * @throws InterruptedException
- * @throws KeeperException
- */
- SuperstepState coordinateSuperstep()
- throws KeeperException, InterruptedException;
-
- /**
- * Master can decide to restart from the last good checkpoint if a
- * worker fails during a superstep.
- *
- * @param checkpoint Checkpoint to restart from
- */
- void restartFromCheckpoint(long checkpoint);
-
- /**
- * Get the last known good checkpoint
- * @throws IOException
- */
- long getLastGoodCheckpoint() throws IOException;
-
- /**
- * If the master decides that this job doesn't have the resources to
- * continue, it can fail the job. It can also designate what to do next.
- * Typically this is mainly informative.
- *
- * @param state
- * @param applicationAttempt attempt to start on
- * @param desiredSuperstep Superstep to restart from (if applicable)
- */
- void setJobState(ApplicationState state,
- long applicationAttempt,
- long desiredSuperstep);
+ I extends WritableComparable, V extends Writable, E extends Writable,
+ M extends Writable> extends CentralizedService<I, V, E, M> {
+ /**
+ * Become the master.
+ * @return true if became the master, false if the application is done.
+ */
+ boolean becomeMaster();
+
+ /**
+ * Create the {@link InputSplit} objects from the index range based on the
+ * user-defined VertexInputFormat. The {@link InputSplit} objects will
+ * be processed by the workers later on during the INPUT_SUPERSTEP.
+ *
+ * @return Number of partitions. Returns -1 on failure to create
+ * valid input splits.
+ */
+ int createInputSplits();
+
+ /**
+ * Master coordinates the superstep
+ *
+ * @return State of the application as a result of this superstep
+ * @throws InterruptedException
+ * @throws KeeperException
+ */
+ SuperstepState coordinateSuperstep()
+ throws KeeperException, InterruptedException;
+
+ /**
+ * Master can decide to restart from the last good checkpoint if a
+ * worker fails during a superstep.
+ *
+ * @param checkpoint Checkpoint to restart from
+ */
+ void restartFromCheckpoint(long checkpoint);
+
+ /**
+ * Get the last known good checkpoint
+ *
+ * @return Last good superstep number
+ * @throws IOException
+ */
+ long getLastGoodCheckpoint() throws IOException;
+
+ /**
+ * If the master decides that this job doesn't have the resources to
+ * continue, it can fail the job. It can also designate what to do next.
+ * Typically this is mainly informative.
+ *
+ * @param state State of the application.
+ * @param applicationAttempt Attempt to start on
+ * @param desiredSuperstep Superstep to restart from (if applicable)
+ */
+ void setJobState(ApplicationState state,
+ long applicationAttempt,
+ long desiredSuperstep);
}
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java Thu Feb 16 22:12:31 2012
@@ -31,141 +31,143 @@ import org.apache.giraph.graph.GraphMapp
import org.apache.giraph.graph.partition.Partition;
import org.apache.giraph.graph.partition.PartitionOwner;
import org.apache.giraph.graph.partition.PartitionStats;
-import org.apache.giraph.graph.partition.WorkerGraphPartitioner;
import org.apache.giraph.graph.WorkerInfo;
import org.apache.giraph.graph.WorkerContext;
/**
* All workers should have access to this centralized service to
* execute the following methods.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ * @param <M> Message data
*/
@SuppressWarnings("rawtypes")
-public interface CentralizedServiceWorker<
- I extends WritableComparable,
- V extends Writable,
- E extends Writable,
- M extends Writable>
- extends CentralizedService<I, V, E, M>, AggregatorUsage {
- /**
- * Get the worker information
- *
- * @return Worker information
- */
- WorkerInfo getWorkerInfo();
-
- /**
- *
- * @return worker's WorkerContext
- */
- WorkerContext getWorkerContext();
-
- /**
- * Get a map of the partition id to the partition for this worker.
- * The partitions contain the vertices for
- * this worker and can be used to run compute() for the vertices or do
- * checkpointing.
- *
- * @return List of partitions that this worker owns.
- */
- Map<Integer, Partition<I, V, E, M>> getPartitionMap();
-
- /**
- * Get a collection of all the partition owners.
- *
- * @return Collection of all the partition owners.
- */
- Collection<? extends PartitionOwner> getPartitionOwners();
-
- /**
- * Both the vertices and the messages need to be checkpointed in order
- * for them to be used. This is done after all messages have been
- * delivered, but prior to a superstep starting.
- */
- void storeCheckpoint() throws IOException;
-
- /**
- * Load the vertices, edges, messages from the beginning of a superstep.
- * Will load the vertex partitions as designated by the master and set the
- * appropriate superstep.
- *
- * @param superstep which checkpoint to use
- * @throws IOException
- */
- void loadCheckpoint(long superstep) throws IOException;
-
- /**
- * Take all steps prior to actually beginning the computation of a
- * superstep.
- *
- * @return Collection of all the partition owners from the master for this
- * superstep.
- */
- Collection<? extends PartitionOwner> startSuperstep();
-
- /**
- * Worker is done with its portion of the superstep. Report the
- * worker level statistics after the computation.
- *
- * @param partitionStatsList All the partition stats for this worker
- * @return true if this is the last superstep, false otherwise
- */
- boolean finishSuperstep(List<PartitionStats> partitionStatsList);
- /**
- * Get the partition that a vertex index would belong to
- *
- * @param vertexIndex Index of the vertex that is used to find the correct
- * partition.
- * @return Correct partition if exists on this worker, null otherwise.
- */
- public Partition<I, V, E, M> getPartition(I vertexIndex);
-
- /**
- * Every client will need to get a partition owner from a vertex id so that
- * they know which worker to sent the request to.
- *
- * @param vertexIndex Vertex index to look for
- * @return PartitionOnwer that should contain this vertex if it exists
- */
- PartitionOwner getVertexPartitionOwner(I vertexIndex);
-
- /**
- * Look up a vertex on a worker given its vertex index.
- *
- * @param vertexIndex Vertex index to look for
- * @return Vertex if it exists on this worker.
- */
- BasicVertex<I, V, E, M> getVertex(I vertexIndex);
-
- /**
- * If desired by the user, vertex partitions are redistributed among
- * workers according to the chosen {@link WorkerGraphPartitioner}.
- *
- * @param masterSetPartitionOwners Partition owner info passed from the
- * master.
- */
- void exchangeVertexPartitions(
- Collection<? extends PartitionOwner> masterSetPartitionOwners);
-
- /**
- * Assign messages to a vertex (bypasses package-private access to
- * setMessages() for internal classes).
- *
- * @param vertex Vertex (owned by worker)
- * @param messageIterator Messages to assign to the vertex
- */
- void assignMessagesToVertex(BasicVertex<I, V, E, M> vertex,
- Iterable<M> messageIterator);
-
- /**
- * Get the GraphMapper that this service is using. Vertices need to know
- * this.
- *
- * @return BspMapper
- */
- GraphMapper<I, V, E, M> getGraphMapper();
-
- /**
- * Operations that will be called if there is a failure by a worker.
- */
- void failureCleanup();
+public interface CentralizedServiceWorker<I extends WritableComparable,
+ V extends Writable, E extends Writable, M extends Writable>
+ extends CentralizedService<I, V, E, M>, AggregatorUsage {
+ /**
+ * Get the worker information
+ *
+ * @return Worker information
+ */
+ WorkerInfo getWorkerInfo();
+
+ /**
+ * Get the WorkerContext of this worker.
+ * @return worker's WorkerContext
+ */
+ WorkerContext getWorkerContext();
+
+ /**
+ * Get a map of the partition id to the partition for this worker.
+ * The partitions contain the vertices for
+ * this worker and can be used to run compute() for the vertices or do
+ * checkpointing.
+ *
+ * @return Map of partition ids to the partitions that this worker owns.
+ */
+ Map<Integer, Partition<I, V, E, M>> getPartitionMap();
+
+ /**
+ * Get a collection of all the partition owners.
+ *
+ * @return Collection of all the partition owners.
+ */
+ Collection<? extends PartitionOwner> getPartitionOwners();
+
+ /**
+ * Both the vertices and the messages need to be checkpointed in order
+ * for them to be used. This is done after all messages have been
+ * delivered, but prior to a superstep starting.
+ */
+ void storeCheckpoint() throws IOException;
+
+ /**
+ * Load the vertices, edges, messages from the beginning of a superstep.
+ * Will load the vertex partitions as designated by the master and set the
+ * appropriate superstep.
+ *
+ * @param superstep which checkpoint to use
+ * @throws IOException
+ */
+ void loadCheckpoint(long superstep) throws IOException;
+
+ /**
+ * Take all steps prior to actually beginning the computation of a
+ * superstep.
+ *
+ * @return Collection of all the partition owners from the master for this
+ * superstep.
+ */
+ Collection<? extends PartitionOwner> startSuperstep();
+
+ /**
+ * Worker is done with its portion of the superstep. Report the
+ * worker level statistics after the computation.
+ *
+ * @param partitionStatsList All the partition stats for this worker
+ * @return true if this is the last superstep, false otherwise
+ */
+ boolean finishSuperstep(List<PartitionStats> partitionStatsList);
+
+ /**
+ * Get the partition that a vertex index would belong to
+ *
+ * @param vertexIndex Index of the vertex that is used to find the correct
+ * partition.
+ * @return Correct partition if exists on this worker, null otherwise.
+ */
+ Partition<I, V, E, M> getPartition(I vertexIndex);
+
+ /**
+ * Every client will need to get a partition owner from a vertex id so that
+ * they know which worker to send the request to.
+ *
+ * @param vertexIndex Vertex index to look for
+ * @return PartitionOwner that should contain this vertex if it exists
+ */
+ PartitionOwner getVertexPartitionOwner(I vertexIndex);
+
+ /**
+ * Look up a vertex on a worker given its vertex index.
+ *
+ * @param vertexIndex Vertex index to look for
+ * @return Vertex if it exists on this worker.
+ */
+ BasicVertex<I, V, E, M> getVertex(I vertexIndex);
+
+ /**
+ * If desired by the user, vertex partitions are redistributed among
+ * workers according to the chosen {@link WorkerGraphPartitioner}.
+ *
+ * @param masterSetPartitionOwners Partition owner info passed from the
+ * master.
+ */
+ void exchangeVertexPartitions(
+ Collection<? extends PartitionOwner> masterSetPartitionOwners);
+
+ /**
+ * Assign messages to a vertex (bypasses package-private access to
+ * setMessages() for internal classes).
+ *
+ * @param vertex Vertex (owned by worker)
+ * @param messageIterator Messages to assign to the vertex
+ */
+ void assignMessagesToVertex(BasicVertex<I, V, E, M> vertex,
+ Iterable<M> messageIterator);
+
+ /**
+ * Get the GraphMapper that this service is using. Vertices need to know
+ * this.
+ *
+ * @return GraphMapper used by this service
+ */
+ GraphMapper<I, V, E, M> getGraphMapper();
+
+ /**
+ * Operations that will be called if there is a failure by a worker.
+ */
+ void failureCleanup();
}
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/ImmutableOutputCommitter.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/ImmutableOutputCommitter.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/ImmutableOutputCommitter.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/ImmutableOutputCommitter.java Thu Feb 16 22:12:31 2012
@@ -30,35 +30,35 @@ import org.apache.hadoop.mapreduce.TaskA
* FileOutputCommitter.
*/
public class ImmutableOutputCommitter extends OutputCommitter {
- @Override
- public void abortTask(TaskAttemptContext context) throws IOException {
- }
+ @Override
+ public void abortTask(TaskAttemptContext context) throws IOException {
+ }
- @Override
- public void commitTask(TaskAttemptContext context) throws IOException {
- }
+ @Override
+ public void commitTask(TaskAttemptContext context) throws IOException {
+ }
- @Override
- public boolean needsTaskCommit(TaskAttemptContext context)
- throws IOException {
- return false;
- }
+ @Override
+ public boolean needsTaskCommit(TaskAttemptContext context)
+ throws IOException {
+ return false;
+ }
- @Override
- public void setupJob(JobContext context) throws IOException {
- }
+ @Override
+ public void setupJob(JobContext context) throws IOException {
+ }
- @Override
- public void setupTask(TaskAttemptContext context) throws IOException {
- }
+ @Override
+ public void setupTask(TaskAttemptContext context) throws IOException {
+ }
- /*if[HADOOP_NON_SECURE]
+ /*if[HADOOP_NON_SECURE]
@Override
public void cleanupJob(JobContext jobContext) throws IOException {
}
else[HADOOP_NON_SECURE]*/
- @Override
- /*end[HADOOP_NON_SECURE]*/
- public void commitJob(JobContext jobContext) throws IOException {
- }
+ @Override
+ /*end[HADOOP_NON_SECURE]*/
+ public void commitJob(JobContext jobContext) throws IOException {
+ }
}
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/SuperstepState.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/SuperstepState.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/SuperstepState.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/SuperstepState.java Thu Feb 16 22:12:31 2012
@@ -22,8 +22,12 @@ package org.apache.giraph.bsp;
* State of a coordinated superstep
*/
public enum SuperstepState {
- INITIAL, ///< Nothing happened yet
- WORKER_FAILURE, ///< A worker died during this superstep
- THIS_SUPERSTEP_DONE, ///< This superstep completed correctly
- ALL_SUPERSTEPS_DONE, ///< All supersteps are complete
+ /** Nothing happened yet */
+ INITIAL,
+ /** A worker died during this superstep */
+ WORKER_FAILURE,
+ /** This superstep completed correctly */
+ THIS_SUPERSTEP_DONE,
+ /** All supersteps are complete */
+ ALL_SUPERSTEPS_DONE,
}
Copied: incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/package-info.java (from r1243701, incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/ApplicationState.java)
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/package-info.java?p2=incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/package-info.java&p1=incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/ApplicationState.java&r1=1243701&r2=1245205&rev=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/ApplicationState.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/package-info.java Thu Feb 16 22:12:31 2012
@@ -15,15 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
-package org.apache.giraph.bsp;
-
/**
- * State of the BSP application
+ * Package of generic bulk synchronous processing objects.
*/
-public enum ApplicationState {
- UNKNOWN, ///< Shouldn't be seen, just an initial state
- START_SUPERSTEP, ///< Start from a desired superstep
- FAILED, ///< Unrecoverable
- FINISHED ///< Successful completion
-}
+package org.apache.giraph.bsp;
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/ArrayListWritable.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/ArrayListWritable.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/ArrayListWritable.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/ArrayListWritable.java Thu Feb 16 22:12:31 2012
@@ -30,84 +30,96 @@ import org.apache.hadoop.util.Reflection
/**
* A Writable for ListArray containing instances of a class.
+ *
+ * @param <M> Message data
*/
public abstract class ArrayListWritable<M extends Writable> extends ArrayList<M>
- implements Writable, Configurable {
- /** Used for instantiation */
- private Class<M> refClass = null;
- /** Defining a layout version for a serializable class. */
- private static final long serialVersionUID = 1L;
- /** Configuration */
- private Configuration conf;
-
- /**
- * Using the default constructor requires that the user implement
- * setClass(), guaranteed to be invoked prior to instantiation in
- * readFields()
- */
- public ArrayListWritable() {
- }
-
- public ArrayListWritable(ArrayListWritable<M> arrayListWritable) {
- super(arrayListWritable);
- }
-
- /**
- * This constructor allows setting the refClass during construction.
- *
- * @param refClass internal type class
- */
- public ArrayListWritable(Class<M> refClass) {
- super();
- this.refClass = refClass;
- }
-
- /**
- * This is a one-time operation to set the class type
- *
- * @param refClass internal type class
- */
- public void setClass(Class<M> refClass) {
- if (this.refClass != null) {
- throw new RuntimeException(
- "setClass: refClass is already set to " +
- this.refClass.getName());
- }
- this.refClass = refClass;
- }
-
- /**
- * Subclasses must set the class type appropriately and can use
- * setClass(Class<M> refClass) to do it.
- */
- public abstract void setClass();
-
- public void readFields(DataInput in) throws IOException {
- if (this.refClass == null) {
- setClass();
- }
- int numValues = in.readInt(); // read number of values
- ensureCapacity(numValues);
- for (int i = 0; i < numValues; i++) {
- M value = ReflectionUtils.newInstance(refClass, conf);
- value.readFields(in); // read a value
- add(value); // store it in values
- }
- }
-
- public void write(DataOutput out) throws IOException {
- int numValues = size();
- out.writeInt(numValues); // write number of values
- for (int i = 0; i < numValues; i++) {
- get(i).write(out);
- }
- }
-
- public final Configuration getConf() {
- return conf;
- }
-
- public final void setConf(Configuration conf) {
- this.conf = conf;
- }
+ implements Writable, Configurable {
+ /** Defining a layout version for a serializable class. */
+ private static final long serialVersionUID = 1L;
+ /**
+ * Class of the list elements; readFields() instantiates each element from
+ * it. Set by the constructor, setClass(Class), or (lazily, from
+ * readFields()) the subclass's setClass().
+ */
+ private Class<M> refClass = null;
+
+ /** Configuration handed to each element instantiated in readFields() */
+ private Configuration conf;
+
+ /**
+ * Creates an empty list whose element class is not yet known.
+ * Subclasses must implement setClass(); readFields() calls it before
+ * instantiating any element if the class is still unset.
+ */
+ public ArrayListWritable() {
+ super();
+ }
+
+ /**
+ * Constructor with another {@link ArrayListWritable}.
+ * Copies the element references of the given list (shallow copy, via the
+ * ArrayList copy constructor). The source's element class and
+ * configuration are NOT copied: refClass stays unset (resolved later via
+ * setClass() in readFields()) and conf stays null until setConf() is called.
+ *
+ * @param arrayListWritable Array list whose elements are copied.
+ */
+ public ArrayListWritable(ArrayListWritable<M> arrayListWritable) {
+ super(arrayListWritable);
+ }
+
+ /**
+ * Creates an empty list with the element class supplied up front; when it
+ * is non-null, readFields() never needs to call setClass().
+ *
+ * @param refClass internal type class
+ */
+ public ArrayListWritable(Class<M> refClass) {
+ this.refClass = refClass;
+ }
+
+ /**
+ * This is a one-time operation to set the class type of the elements.
+ *
+ * @param refClass internal type class, must not be null
+ * @throws IllegalArgumentException if refClass is null
+ * @throws IllegalStateException if the class type has already been set
+ */
+ public void setClass(Class<M> refClass) {
+ if (refClass == null) {
+ // A null class would leave the list "unset" and fail later, far from
+ // the real cause, when readFields() tries to instantiate elements.
+ throw new IllegalArgumentException("setClass: refClass must not be null");
+ }
+ if (this.refClass != null) {
+ throw new IllegalStateException(
+ "setClass: refClass is already set to " +
+ this.refClass.getName());
+ }
+ this.refClass = refClass;
+ }
+
+ /**
+ * Subclasses must set the class type appropriately and can use
+ * setClass(Class<M> refClass) to do it. readFields() calls this method
+ * when the element class has not been set yet.
+ */
+ public abstract void setClass();
+
+ /**
+ * Deserializes the list, replacing any elements it currently holds.
+ * If the element class has not been set, setClass() is called first; each
+ * element is then instantiated with the current configuration and read
+ * from the input.
+ *
+ * @param in Input to read from
+ * @throws IOException if reading from the input fails
+ */
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ if (this.refClass == null) {
+ setClass();
+ }
+ // Writable instances are commonly reused across deserializations, so
+ // discard stale elements instead of appending the new ones after them.
+ clear();
+ int numValues = in.readInt(); // read number of values
+ ensureCapacity(numValues);
+ for (int i = 0; i < numValues; i++) {
+ M value = ReflectionUtils.newInstance(refClass, conf);
+ value.readFields(in); // read a value
+ add(value); // store it in values
+ }
+ }
+
+ /**
+ * Serializes the list as its element count followed by each element,
+ * in list order.
+ *
+ * @param out Output to write to
+ * @throws IOException if writing to the output fails
+ */
+ @Override
+ public void write(DataOutput out) throws IOException {
+ final int count = size();
+ out.writeInt(count); // the count precedes the elements
+ int index = 0;
+ while (index < count) {
+ get(index).write(out);
+ index++;
+ }
+ }
+
+ /**
+ * Returns the configuration used to instantiate elements in readFields().
+ *
+ * @return the configuration, or null if setConf() has not been called
+ */
+ @Override
+ public final Configuration getConf() {
+ return conf;
+ }
+
+ /**
+ * Sets the configuration passed to each element instantiated in
+ * readFields().
+ *
+ * @param conf configuration to use
+ */
+ @Override
+ public final void setConf(Configuration conf) {
+ this.conf = conf;
+ }
}