You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ac...@apache.org on 2012/02/16 23:12:36 UTC

svn commit: r1245205 [2/18] - in /incubator/giraph/trunk: ./ src/main/java/org/apache/giraph/ src/main/java/org/apache/giraph/benchmark/ src/main/java/org/apache/giraph/bsp/ src/main/java/org/apache/giraph/comm/ src/main/java/org/apache/giraph/examples...

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/benchmark/RandomMessageBenchmark.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/benchmark/RandomMessageBenchmark.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/benchmark/RandomMessageBenchmark.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/benchmark/RandomMessageBenchmark.java Thu Feb 16 22:12:31 2012
@@ -41,361 +41,386 @@ import java.util.Random;
  * Random Message Benchmark for evaluating the messaging performance.
  */
 public class RandomMessageBenchmark implements Tool {
-    /** Configuration from Configurable */
-    private Configuration conf;
+  /** How many supersteps to run */
+  public static final String SUPERSTEP_COUNT =
+      "RandomMessageBenchmark.superstepCount";
+  /** How many bytes per message */
+  public static final String NUM_BYTES_PER_MESSAGE =
+      "RandomMessageBenchmark.numBytesPerMessage";
+  /** Default bytes per message */
+  public static final int DEFAULT_NUM_BYTES_PER_MESSAGE = 16;
+  /** How many messages per edge */
+  public static final String NUM_MESSAGES_PER_EDGE =
+      "RandomMessageBenchmark.numMessagesPerEdge";
+  /** Default messages per edge */
+  public static final int DEFAULT_NUM_MESSAGES_PER_EDGE = 1;
+  /** All bytes sent during this superstep */
+  public static final String AGG_SUPERSTEP_TOTAL_BYTES =
+      "superstep total bytes sent";
+  /** All bytes sent during this application */
+  public static final String AGG_TOTAL_BYTES = "total bytes sent";
+  /** All messages during this superstep */
+  public static final String AGG_SUPERSTEP_TOTAL_MESSAGES =
+      "superstep total messages";
+  /** All messages during this application */
+  public static final String AGG_TOTAL_MESSAGES = "total messages";
+  /** All millis during this superstep */
+  public static final String AGG_SUPERSTEP_TOTAL_MILLIS =
+      "superstep total millis";
+  /** All millis during this application */
+  public static final String AGG_TOTAL_MILLIS = "total millis";
+  /** Workers for that superstep */
+  public static final String WORKERS = "workers";
+  /** Class logger */
+  private static final Logger LOG =
+    Logger.getLogger(RandomMessageBenchmarkWorkerContext.class);
+  /** Configuration from Configurable */
+  private Configuration conf;
+
+  /**
+   * {@link WorkerContext} for RandomMessageBenchmark.
+   */
+  private static class RandomMessageBenchmarkWorkerContext extends
+      WorkerContext {
+    /** Class logger */
+    private static final Logger LOG =
+      Logger.getLogger(RandomMessageBenchmarkWorkerContext.class);
+    /** Bytes to be sent */
+    private byte[] messageBytes;
+    /** Number of messages sent per edge */
+    private int numMessagesPerEdge = -1;
+    /** Number of supersteps */
+    private int numSupersteps = -1;
+    /** Random generator for random bytes message */
+    private final Random random = new Random(System.currentTimeMillis());
+    /** Start superstep millis */
+    private long startSuperstepMillis = 0;
+    /** Total bytes */
+    private long totalBytes = 0;
+    /** Total messages */
+    private long totalMessages = 0;
+    /** Total millis */
+    private long totalMillis = 0;
+    /** Class logger */
 
-    /** How many supersteps to run */
-    public static final String SUPERSTEP_COUNT =
-        "RandomMessageBenchmark.superstepCount";
-    /** How many bytes per message */
-    public static final String NUM_BYTES_PER_MESSAGE =
-        "RandomMessageBenchmark.numBytesPerMessage";
-    /** Default bytes per message */
-    public static final int DEFAULT_NUM_BYTES_PER_MESSAGE = 16;
-    /** How many messages per edge */
-    public static final String NUM_MESSAGES_PER_EDGE=
-        "RandomMessageBenchmark.numMessagesPerEdge";
-    /** Default messages per edge */
-    public static final int DEFAULT_NUM_MESSAGES_PER_EDGE = 1;
-    /** All bytes sent during this superstep */
-    public static final String AGG_SUPERSTEP_TOTAL_BYTES =
-        "superstep total bytes sent";
-    /** All bytes sent during this application */
-    public static final String AGG_TOTAL_BYTES = "total bytes sent";
-    /** All messages during this superstep */
-    public static final String AGG_SUPERSTEP_TOTAL_MESSAGES =
-        "superstep total messages";
-    /** All messages during this application */
-    public static final String AGG_TOTAL_MESSAGES = "total messages";
-    /** All millis during this superstep */
-    public static final String AGG_SUPERSTEP_TOTAL_MILLIS =
-        "superstep total millis";
-    /** All millis during this application */
-    public static final String AGG_TOTAL_MILLIS = "total millis";
-    /** Workers for that superstep */
-    public static final String WORKERS = "workers";
-
-    /**
-     * {@link WorkerContext} forRandomMessageBenchmark.
-     */
-    private static class RandomMessageBenchmarkWorkerContext extends
-            WorkerContext {
-        /** Bytes to be sent */
-        private byte[] messageBytes;
-        /** Number of messages sent per edge */
-        private int numMessagesPerEdge = -1;
-        /** Number of supersteps */
-        private int numSupersteps = -1;
-        /** Random generator for random bytes message */
-        private final Random random = new Random(System.currentTimeMillis());
-        /** Start superstep millis */
-        private long startSuperstepMillis = 0;
-        /** Total bytes */
-        private long totalBytes = 0;
-        /** Total messages */
-        private long totalMessages = 0;
-        /** Total millis */
-        private long totalMillis = 0;
-        /** Class logger */
-        private static final Logger LOG =
-            Logger.getLogger(RandomMessageBenchmarkWorkerContext.class);
-
-        @Override
-        public void preApplication()
-                throws InstantiationException, IllegalAccessException {
-            messageBytes =
-                new byte[getContext().getConfiguration().
-                         getInt(NUM_BYTES_PER_MESSAGE,
-                               DEFAULT_NUM_BYTES_PER_MESSAGE)];
-            numMessagesPerEdge =
-                getContext().getConfiguration().
-                    getInt(NUM_MESSAGES_PER_EDGE,
-                           DEFAULT_NUM_MESSAGES_PER_EDGE);
-            numSupersteps = getContext().getConfiguration().
-                                getInt(SUPERSTEP_COUNT, -1);
-            registerAggregator(AGG_SUPERSTEP_TOTAL_BYTES,
-                LongSumAggregator.class);
-            registerAggregator(AGG_SUPERSTEP_TOTAL_MESSAGES,
-                LongSumAggregator.class);
-            registerAggregator(AGG_SUPERSTEP_TOTAL_MILLIS,
-                LongSumAggregator.class);
-            registerAggregator(WORKERS,
-                LongSumAggregator.class);
-        }
-
-        @Override
-        public void preSuperstep() {
-            LongSumAggregator superstepBytesAggregator =
-                (LongSumAggregator) getAggregator(AGG_SUPERSTEP_TOTAL_BYTES);
-            LongSumAggregator superstepMessagesAggregator =
-                (LongSumAggregator) getAggregator(AGG_SUPERSTEP_TOTAL_MESSAGES);
-            LongSumAggregator superstepMillisAggregator =
-                (LongSumAggregator) getAggregator(AGG_SUPERSTEP_TOTAL_MILLIS);
-            LongSumAggregator workersAggregator =
-                (LongSumAggregator) getAggregator(WORKERS);
-
-            // For timing and tracking the supersteps
-            // - superstep 0 starts the time, but cannot display any stats
-            //   since nothing has been aggregated yet
-            // - supersteps > 0 can display the stats
-            if (getSuperstep() == 0) {
-                startSuperstepMillis = System.currentTimeMillis();
-            } else {
-                totalBytes +=
-                        superstepBytesAggregator.getAggregatedValue().get();
-                totalMessages +=
-                        superstepMessagesAggregator.getAggregatedValue().get();
-                totalMillis +=
-                        superstepMillisAggregator.getAggregatedValue().get();
-                double superstepMegabytesPerSecond =
-                        superstepBytesAggregator.getAggregatedValue().get() *
-                        workersAggregator.getAggregatedValue().get() *
-                        1000d / 1024d / 1024d /
-                        superstepMillisAggregator.getAggregatedValue().get();
-                double megabytesPerSecond = totalBytes *
-                        workersAggregator.getAggregatedValue().get() *
-                        1000d / 1024d / 1024d / totalMillis;
-                double superstepMessagesPerSecond =
-                        superstepMessagesAggregator.getAggregatedValue().get() *
-                        workersAggregator.getAggregatedValue().get() * 1000d /
-                        superstepMillisAggregator.getAggregatedValue().get();
-                double messagesPerSecond = totalMessages *
-                        workersAggregator.getAggregatedValue().get() * 1000d /
-                        totalMillis;
-                if (LOG.isInfoEnabled()) {
-                    LOG.info("Outputing statistics for superstep " +
-                             getSuperstep());
-                    LOG.info(AGG_SUPERSTEP_TOTAL_BYTES + " : " +
-                             superstepBytesAggregator.getAggregatedValue());
-                    LOG.info(AGG_TOTAL_BYTES + " : " + totalBytes);
-                    LOG.info(AGG_SUPERSTEP_TOTAL_MESSAGES + " : " +
-                             superstepMessagesAggregator.getAggregatedValue());
-                    LOG.info(AGG_TOTAL_MESSAGES + " : " + totalMessages);
-                    LOG.info(AGG_SUPERSTEP_TOTAL_MILLIS + " : " +
-                             superstepMillisAggregator.getAggregatedValue());
-                    LOG.info(AGG_TOTAL_MILLIS + " : " + totalMillis);
-                    LOG.info(WORKERS + " : " +
-                             workersAggregator.getAggregatedValue());
-                    LOG.info("Superstep megabytes / second = " +
-                             superstepMegabytesPerSecond);
-                    LOG.info("Total megabytes / second = " +
-                             megabytesPerSecond);
-                    LOG.info("Superstep messages / second = " +
-                             superstepMessagesPerSecond);
-                    LOG.info("Total messages / second = " +
-                             messagesPerSecond);
-                    LOG.info("Superstep megabytes / second / worker = " +
-                             superstepMegabytesPerSecond /
-                             workersAggregator.getAggregatedValue().get());
-                    LOG.info("Total megabytes / second / worker = " +
-                             megabytesPerSecond /
-                             workersAggregator.getAggregatedValue().get());
-                    LOG.info("Superstep messages / second / worker = " +
-                             superstepMessagesPerSecond /
-                             workersAggregator.getAggregatedValue().get());
-                    LOG.info("Total messages / second / worker = " +
-                             messagesPerSecond /
-                             workersAggregator.getAggregatedValue().get());
-                }
-            }
-
-            superstepBytesAggregator.setAggregatedValue(
-                new LongWritable(0L));
-            superstepMessagesAggregator.setAggregatedValue(
-                new LongWritable(0L));
-            workersAggregator.setAggregatedValue(
-                new LongWritable(1L));
-            useAggregator(AGG_SUPERSTEP_TOTAL_BYTES);
-            useAggregator(AGG_SUPERSTEP_TOTAL_MILLIS);
-            useAggregator(AGG_SUPERSTEP_TOTAL_MESSAGES);
-            useAggregator(WORKERS);
-        }
-
-        @Override
-        public void postSuperstep() {
-            LongSumAggregator superstepMillisAggregator =
-                (LongSumAggregator) getAggregator(AGG_SUPERSTEP_TOTAL_MILLIS);
-            long endSuperstepMillis = System.currentTimeMillis();
-            long superstepMillis = endSuperstepMillis - startSuperstepMillis;
-            startSuperstepMillis = endSuperstepMillis;
-            superstepMillisAggregator.setAggregatedValue(
-                new LongWritable(superstepMillis));
-        }
-
-        @Override
-        public void postApplication() {}
+    @Override
+    public void preApplication()
+      throws InstantiationException, IllegalAccessException {
+      messageBytes =
+        new byte[getContext().getConfiguration().
+                 getInt(NUM_BYTES_PER_MESSAGE,
+                 DEFAULT_NUM_BYTES_PER_MESSAGE)];
+      numMessagesPerEdge =
+          getContext().getConfiguration().
+          getInt(NUM_MESSAGES_PER_EDGE,
+              DEFAULT_NUM_MESSAGES_PER_EDGE);
+      numSupersteps = getContext().getConfiguration().
+          getInt(SUPERSTEP_COUNT, -1);
+      registerAggregator(AGG_SUPERSTEP_TOTAL_BYTES,
+          LongSumAggregator.class);
+      registerAggregator(AGG_SUPERSTEP_TOTAL_MESSAGES,
+          LongSumAggregator.class);
+      registerAggregator(AGG_SUPERSTEP_TOTAL_MILLIS,
+          LongSumAggregator.class);
+      registerAggregator(WORKERS,
+          LongSumAggregator.class);
+    }
 
-        public byte[] getMessageBytes() {
-            return messageBytes;
-        }
+    @Override
+    public void preSuperstep() {
+      LongSumAggregator superstepBytesAggregator =
+          (LongSumAggregator) getAggregator(AGG_SUPERSTEP_TOTAL_BYTES);
+      LongSumAggregator superstepMessagesAggregator =
+          (LongSumAggregator) getAggregator(AGG_SUPERSTEP_TOTAL_MESSAGES);
+      LongSumAggregator superstepMillisAggregator =
+          (LongSumAggregator) getAggregator(AGG_SUPERSTEP_TOTAL_MILLIS);
+      LongSumAggregator workersAggregator =
+          (LongSumAggregator) getAggregator(WORKERS);
+
+      // For timing and tracking the supersteps
+      // - superstep 0 starts the time, but cannot display any stats
+      //   since nothing has been aggregated yet
+      // - supersteps > 0 can display the stats
+      if (getSuperstep() == 0) {
+        startSuperstepMillis = System.currentTimeMillis();
+      } else {
+        totalBytes +=
+            superstepBytesAggregator.getAggregatedValue().get();
+        totalMessages +=
+            superstepMessagesAggregator.getAggregatedValue().get();
+        totalMillis +=
+            superstepMillisAggregator.getAggregatedValue().get();
+        double superstepMegabytesPerSecond =
+            superstepBytesAggregator.getAggregatedValue().get() *
+            workersAggregator.getAggregatedValue().get() *
+            1000d / 1024d / 1024d /
+            superstepMillisAggregator.getAggregatedValue().get();
+        double megabytesPerSecond = totalBytes *
+            workersAggregator.getAggregatedValue().get() *
+            1000d / 1024d / 1024d / totalMillis;
+        double superstepMessagesPerSecond =
+            superstepMessagesAggregator.getAggregatedValue().get() *
+            workersAggregator.getAggregatedValue().get() * 1000d /
+            superstepMillisAggregator.getAggregatedValue().get();
+        double messagesPerSecond = totalMessages *
+            workersAggregator.getAggregatedValue().get() * 1000d /
+            totalMillis;
+        if (LOG.isInfoEnabled()) {
+          LOG.info("Outputing statistics for superstep " +
+              getSuperstep());
+          LOG.info(AGG_SUPERSTEP_TOTAL_BYTES + " : " +
+              superstepBytesAggregator.getAggregatedValue());
+          LOG.info(AGG_TOTAL_BYTES + " : " + totalBytes);
+          LOG.info(AGG_SUPERSTEP_TOTAL_MESSAGES + " : " +
+              superstepMessagesAggregator.getAggregatedValue());
+          LOG.info(AGG_TOTAL_MESSAGES + " : " + totalMessages);
+          LOG.info(AGG_SUPERSTEP_TOTAL_MILLIS + " : " +
+              superstepMillisAggregator.getAggregatedValue());
+          LOG.info(AGG_TOTAL_MILLIS + " : " + totalMillis);
+          LOG.info(WORKERS + " : " +
+              workersAggregator.getAggregatedValue());
+          LOG.info("Superstep megabytes / second = " +
+              superstepMegabytesPerSecond);
+          LOG.info("Total megabytes / second = " +
+              megabytesPerSecond);
+          LOG.info("Superstep messages / second = " +
+              superstepMessagesPerSecond);
+          LOG.info("Total messages / second = " +
+              messagesPerSecond);
+          LOG.info("Superstep megabytes / second / worker = " +
+              superstepMegabytesPerSecond /
+              workersAggregator.getAggregatedValue().get());
+          LOG.info("Total megabytes / second / worker = " +
+              megabytesPerSecond /
+              workersAggregator.getAggregatedValue().get());
+          LOG.info("Superstep messages / second / worker = " +
+              superstepMessagesPerSecond /
+              workersAggregator.getAggregatedValue().get());
+          LOG.info("Total messages / second / worker = " +
+              messagesPerSecond /
+              workersAggregator.getAggregatedValue().get());
+        }
+      }
+
+      superstepBytesAggregator.setAggregatedValue(
+          new LongWritable(0L));
+      superstepMessagesAggregator.setAggregatedValue(
+          new LongWritable(0L));
+      workersAggregator.setAggregatedValue(
+          new LongWritable(1L));
+      useAggregator(AGG_SUPERSTEP_TOTAL_BYTES);
+      useAggregator(AGG_SUPERSTEP_TOTAL_MILLIS);
+      useAggregator(AGG_SUPERSTEP_TOTAL_MESSAGES);
+      useAggregator(WORKERS);
+    }
 
-        public int getNumMessagePerEdge() {
-            return numMessagesPerEdge;
-        }
+    @Override
+    public void postSuperstep() {
+      LongSumAggregator superstepMillisAggregator =
+          (LongSumAggregator) getAggregator(AGG_SUPERSTEP_TOTAL_MILLIS);
+      long endSuperstepMillis = System.currentTimeMillis();
+      long superstepMillis = endSuperstepMillis - startSuperstepMillis;
+      startSuperstepMillis = endSuperstepMillis;
+      superstepMillisAggregator.setAggregatedValue(
+          new LongWritable(superstepMillis));
+    }
 
-        public int getNumSupersteps() {
-            return numSupersteps;
-        }
+    @Override
+    public void postApplication() { }
 
-        public void randomizeMessageBytes() {
-            random.nextBytes(messageBytes);
-        }
+    /**
+     * Get the message bytes to be used for sending.
+     *
+     * @return Byte array used for messages.
+     */
+    public byte[] getMessageBytes() {
+      return messageBytes;
     }
 
     /**
-     * Actual message computation (messaging in this case)
+     * Get the number of messages per edge.
+     *
+     * @return Messages per edge.
      */
-    public static class RandomMessageVertex extends EdgeListVertex<
-            LongWritable, DoubleWritable, DoubleWritable, BytesWritable> {
-
-        @Override
-        public void compute(Iterator<BytesWritable> msgIterator) {
-            RandomMessageBenchmarkWorkerContext workerContext =
-                (RandomMessageBenchmarkWorkerContext) getWorkerContext();
-            LongSumAggregator superstepBytesAggregator =
-                (LongSumAggregator) getAggregator(AGG_SUPERSTEP_TOTAL_BYTES);
-            LongSumAggregator superstepMessagesAggregator =
-                (LongSumAggregator) getAggregator(AGG_SUPERSTEP_TOTAL_MESSAGES);
-            if (getSuperstep() < workerContext.getNumSupersteps()) {
-                for (int i = 0; i < workerContext.getNumMessagePerEdge();
-                        i++) {
-                    workerContext.randomizeMessageBytes();
-                    sendMsgToAllEdges(
-                        new BytesWritable(workerContext.getMessageBytes()));
-                    long bytesSent = workerContext.getMessageBytes().length *
-                        getNumOutEdges();
-                    superstepBytesAggregator.aggregate(bytesSent);
-                    superstepMessagesAggregator.aggregate(getNumOutEdges());
-                }
-            } else {
-                voteToHalt();
-            }
-        }
+    public int getNumMessagePerEdge() {
+      return numMessagesPerEdge;
     }
 
-    @Override
-    public Configuration getConf() {
-        return conf;
+    /**
+     * Get the number of supersteps.
+     *
+     * @return Number of supersteps.
+     */
+    public int getNumSupersteps() {
+      return numSupersteps;
     }
 
-    @Override
-    public void setConf(Configuration conf) {
-        this.conf = conf;
+    /**
+     * Randomize the message bytes.
+     */
+    public void randomizeMessageBytes() {
+      random.nextBytes(messageBytes);
     }
+  }
 
+  /**
+   * Actual message computation (messaging in this case)
+   */
+  public static class RandomMessageVertex extends EdgeListVertex<
+      LongWritable, DoubleWritable, DoubleWritable, BytesWritable> {
     @Override
-    public int run(String[] args) throws Exception {
-        Options options = new Options();
-        options.addOption("h", "help", false, "Help");
-        options.addOption("v", "verbose", false, "Verbose");
-        options.addOption("w",
-                "workers",
-                true,
-                "Number of workers");
-        options.addOption("b",
-                "bytes",
-                true,
-                "Message bytes per memssage");
-        options.addOption("n",
-                "number",
-                true,
-                "Number of messages per edge");
-        options.addOption("s",
-                "supersteps",
-                true,
-                "Supersteps to execute before finishing");
-        options.addOption("V",
-                "aggregateVertices",
-                true,
-                "Aggregate vertices");
-        options.addOption("e",
-                "edgesPerVertex",
-                true,
-                "Edges per vertex");
-        options.addOption("f",
-                "flusher",
-                true,
-                "Number of flush threads");
-
-        HelpFormatter formatter = new HelpFormatter();
-        if (args.length == 0) {
-            formatter.printHelp(getClass().getName(), options, true);
-            return 0;
-        }
-        CommandLineParser parser = new PosixParser();
-        CommandLine cmd = parser.parse(options, args);
-        if (cmd.hasOption('h')) {
-            formatter.printHelp(getClass().getName(), options, true);
-            return 0;
-        }
-        if (!cmd.hasOption('w')) {
-            System.out.println("Need to choose the number of workers (-w)");
-            return -1;
-        }
-        if (!cmd.hasOption('s')) {
-            System.out.println("Need to set the number of supersteps (-s)");
-            return -1;
-        }
-        if (!cmd.hasOption('V')) {
-            System.out.println("Need to set the aggregate vertices (-V)");
-            return -1;
-        }
-        if (!cmd.hasOption('e')) {
-            System.out.println("Need to set the number of edges " +
-                               "per vertex (-e)");
-            return -1;
-        }
-        if (!cmd.hasOption('b')) {
-            System.out.println("Need to set the number of message bytes (-b)");
-            return -1;
-        }
-        if (!cmd.hasOption('n')) {
-            System.out.println("Need to set the number of messages per edge (-n)");
-            return -1;
-        }
-        int workers = Integer.parseInt(cmd.getOptionValue('w'));
-        GiraphJob job = new GiraphJob(getConf(), getClass().getName());
-        job.getConfiguration().setInt(GiraphJob.CHECKPOINT_FREQUENCY, 0);
-        job.setVertexClass(RandomMessageVertex.class);
-        job.setVertexInputFormatClass(PseudoRandomVertexInputFormat.class);
-        job.setWorkerContextClass(RandomMessageBenchmarkWorkerContext.class);
-        job.setWorkerConfiguration(workers, workers, 100.0f);
-        job.getConfiguration().setLong(
-            PseudoRandomVertexInputFormat.AGGREGATE_VERTICES,
-            Long.parseLong(cmd.getOptionValue('V')));
-        job.getConfiguration().setLong(
-            PseudoRandomVertexInputFormat.EDGES_PER_VERTEX,
-            Long.parseLong(cmd.getOptionValue('e')));
-        job.getConfiguration().setInt(
-            SUPERSTEP_COUNT,
-            Integer.parseInt(cmd.getOptionValue('s')));
-        job.getConfiguration().setInt(
-            RandomMessageBenchmark.NUM_BYTES_PER_MESSAGE,
-            Integer.parseInt(cmd.getOptionValue('b')));
-        job.getConfiguration().setInt(
-            RandomMessageBenchmark.NUM_MESSAGES_PER_EDGE,
-            Integer.parseInt(cmd.getOptionValue('n')));
-
-        boolean isVerbose = false;
-        if (cmd.hasOption('v')) {
-            isVerbose = true;
-        }
-        if (cmd.hasOption('s')) {
-            getConf().setInt(SUPERSTEP_COUNT,
-                             Integer.parseInt(cmd.getOptionValue('s')));
-        }
-        if (cmd.hasOption('f')) {
-            job.getConfiguration().setInt(GiraphJob.MSG_NUM_FLUSH_THREADS,
-                Integer.parseInt(cmd.getOptionValue('f')));
-        }
-        if (job.run(isVerbose) == true) {
-            return 0;
-        } else {
-            return -1;
-        }
+    public void compute(Iterator<BytesWritable> msgIterator) {
+      RandomMessageBenchmarkWorkerContext workerContext =
+          (RandomMessageBenchmarkWorkerContext) getWorkerContext();
+      LongSumAggregator superstepBytesAggregator =
+          (LongSumAggregator) getAggregator(AGG_SUPERSTEP_TOTAL_BYTES);
+      LongSumAggregator superstepMessagesAggregator =
+          (LongSumAggregator) getAggregator(AGG_SUPERSTEP_TOTAL_MESSAGES);
+      if (getSuperstep() < workerContext.getNumSupersteps()) {
+        for (int i = 0; i < workerContext.getNumMessagePerEdge(); i++) {
+          workerContext.randomizeMessageBytes();
+          sendMsgToAllEdges(
+              new BytesWritable(workerContext.getMessageBytes()));
+          long bytesSent = workerContext.getMessageBytes().length *
+              getNumOutEdges();
+          superstepBytesAggregator.aggregate(bytesSent);
+          superstepMessagesAggregator.aggregate(getNumOutEdges());
+        }
+      } else {
+        voteToHalt();
+      }
     }
+  }
 
-    public static void main(String[] args) throws Exception {
-        System.exit(ToolRunner.run(new RandomMessageBenchmark(), args));
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+  @Override
+  public int run(String[] args) throws Exception {
+    Options options = new Options();
+    options.addOption("h", "help", false, "Help");
+    options.addOption("v", "verbose", false, "Verbose");
+    options.addOption("w",
+        "workers",
+        true,
+        "Number of workers");
+    options.addOption("b",
+        "bytes",
+        true,
+        "Message bytes per memssage");
+    options.addOption("n",
+        "number",
+        true,
+        "Number of messages per edge");
+    options.addOption("s",
+        "supersteps",
+        true,
+        "Supersteps to execute before finishing");
+    options.addOption("V",
+        "aggregateVertices",
+        true,
+        "Aggregate vertices");
+    options.addOption("e",
+        "edgesPerVertex",
+        true,
+        "Edges per vertex");
+    options.addOption("f",
+        "flusher",
+        true,
+        "Number of flush threads");
+
+    HelpFormatter formatter = new HelpFormatter();
+    if (args.length == 0) {
+      formatter.printHelp(getClass().getName(), options, true);
+      return 0;
+    }
+    CommandLineParser parser = new PosixParser();
+    CommandLine cmd = parser.parse(options, args);
+    if (cmd.hasOption('h')) {
+      formatter.printHelp(getClass().getName(), options, true);
+      return 0;
+    }
+    if (!cmd.hasOption('w')) {
+      LOG.info("Need to choose the number of workers (-w)");
+      return -1;
+    }
+    if (!cmd.hasOption('s')) {
+      LOG.info("Need to set the number of supersteps (-s)");
+      return -1;
+    }
+    if (!cmd.hasOption('V')) {
+      LOG.info("Need to set the aggregate vertices (-V)");
+      return -1;
+    }
+    if (!cmd.hasOption('e')) {
+      LOG.info("Need to set the number of edges " +
+          "per vertex (-e)");
+      return -1;
+    }
+    if (!cmd.hasOption('b')) {
+      LOG.info("Need to set the number of message bytes (-b)");
+      return -1;
     }
+    if (!cmd.hasOption('n')) {
+      LOG.info("Need to set the number of messages per edge (-n)");
+      return -1;
+    }
+    int workers = Integer.parseInt(cmd.getOptionValue('w'));
+    GiraphJob job = new GiraphJob(getConf(), getClass().getName());
+    job.getConfiguration().setInt(GiraphJob.CHECKPOINT_FREQUENCY, 0);
+    job.setVertexClass(RandomMessageVertex.class);
+    job.setVertexInputFormatClass(PseudoRandomVertexInputFormat.class);
+    job.setWorkerContextClass(RandomMessageBenchmarkWorkerContext.class);
+    job.setWorkerConfiguration(workers, workers, 100.0f);
+    job.getConfiguration().setLong(
+        PseudoRandomVertexInputFormat.AGGREGATE_VERTICES,
+        Long.parseLong(cmd.getOptionValue('V')));
+    job.getConfiguration().setLong(
+        PseudoRandomVertexInputFormat.EDGES_PER_VERTEX,
+        Long.parseLong(cmd.getOptionValue('e')));
+    job.getConfiguration().setInt(
+        SUPERSTEP_COUNT,
+        Integer.parseInt(cmd.getOptionValue('s')));
+    job.getConfiguration().setInt(
+        RandomMessageBenchmark.NUM_BYTES_PER_MESSAGE,
+        Integer.parseInt(cmd.getOptionValue('b')));
+    job.getConfiguration().setInt(
+        RandomMessageBenchmark.NUM_MESSAGES_PER_EDGE,
+        Integer.parseInt(cmd.getOptionValue('n')));
+
+    boolean isVerbose = false;
+    if (cmd.hasOption('v')) {
+      isVerbose = true;
+    }
+    if (cmd.hasOption('s')) {
+      getConf().setInt(SUPERSTEP_COUNT,
+          Integer.parseInt(cmd.getOptionValue('s')));
+    }
+    if (cmd.hasOption('f')) {
+      job.getConfiguration().setInt(GiraphJob.MSG_NUM_FLUSH_THREADS,
+          Integer.parseInt(cmd.getOptionValue('f')));
+    }
+    if (job.run(isVerbose)) {
+      return 0;
+    } else {
+      return -1;
+    }
+  }
+
+  /**
+   * Execute the benchmark.
+   *
+   * @param args Typically, this is the command line arguments.
+   * @throws Exception Any exception thrown during computation.
+   */
+  public static void main(String[] args) throws Exception {
+    System.exit(ToolRunner.run(new RandomMessageBenchmark(), args));
+  }
 }

Copied: incubator/giraph/trunk/src/main/java/org/apache/giraph/benchmark/package-info.java (from r1243701, incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/ApplicationState.java)
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/benchmark/package-info.java?p2=incubator/giraph/trunk/src/main/java/org/apache/giraph/benchmark/package-info.java&p1=incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/ApplicationState.java&r1=1243701&r2=1245205&rev=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/ApplicationState.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/benchmark/package-info.java Thu Feb 16 22:12:31 2012
@@ -15,15 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
-package org.apache.giraph.bsp;
-
 /**
- *  State of the BSP application
+ * Package of benchmarks for performance testing and optimization
  */
-public enum ApplicationState {
-    UNKNOWN, ///< Shouldn't be seen, just an initial state
-    START_SUPERSTEP, ///< Start from a desired superstep
-    FAILED, ///< Unrecoverable
-    FINISHED ///< Successful completion
-}
+package org.apache.giraph.benchmark;

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/ApplicationState.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/ApplicationState.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/ApplicationState.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/ApplicationState.java Thu Feb 16 22:12:31 2012
@@ -22,8 +22,12 @@ package org.apache.giraph.bsp;
  *  State of the BSP application
  */
 public enum ApplicationState {
-    UNKNOWN, ///< Shouldn't be seen, just an initial state
-    START_SUPERSTEP, ///< Start from a desired superstep
-    FAILED, ///< Unrecoverable
-    FINISHED ///< Successful completion
+  /** Shouldn't be seen, just an initial state */
+  UNKNOWN,
+  /** Start from a desired superstep */
+  START_SUPERSTEP,
+  /** Unrecoverable */
+  FAILED,
+  /** Successful completion */
+  FINISHED,
 }

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/BspInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/BspInputFormat.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/BspInputFormat.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/BspInputFormat.java Thu Feb 16 22:12:31 2012
@@ -39,52 +39,55 @@ import org.apache.log4j.Logger;
  * separate.  It is not meant to do any meaningful split of user-data.
  */
 public class BspInputFormat extends InputFormat<Text, Text> {
-    /** Logger */
-    private static final Logger LOG = Logger.getLogger(BspInputFormat.class);
+  /** Class Logger */
+  private static final Logger LOG = Logger.getLogger(BspInputFormat.class);
 
-    /**
-     * Get the correct number of mappers based on the configuration
-     *
-     * @param conf Configuration to determine the number of mappers
-     */
-    public static int getMaxTasks(Configuration conf) {
-        int maxWorkers = conf.getInt(GiraphJob.MAX_WORKERS, 0);
-        boolean splitMasterWorker =
-            conf.getBoolean(GiraphJob.SPLIT_MASTER_WORKER,
-                            GiraphJob.SPLIT_MASTER_WORKER_DEFAULT);
-        int maxTasks = maxWorkers;
-        if (splitMasterWorker) {
-            int zkServers =
-                conf.getInt(GiraphJob.ZOOKEEPER_SERVER_COUNT,
-                            GiraphJob.ZOOKEEPER_SERVER_COUNT_DEFAULT);
-            maxTasks += zkServers;
-        }
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("getMaxTasks: Max workers = " + maxWorkers +
-                      ", split master/worker = " + splitMasterWorker +
-                      ", total max tasks = " + maxTasks);
-        }
-        return maxTasks;
+  /**
+   * Get the correct number of mappers based on the configuration
+   *
+   * @param conf Configuration to determine the number of mappers
+   * @return Maximum number of tasks
+   */
+  public static int getMaxTasks(Configuration conf) {
+    int maxWorkers = conf.getInt(GiraphJob.MAX_WORKERS, 0);
+    boolean splitMasterWorker =
+        conf.getBoolean(GiraphJob.SPLIT_MASTER_WORKER,
+            GiraphJob.SPLIT_MASTER_WORKER_DEFAULT);
+    int maxTasks = maxWorkers;
+    if (splitMasterWorker) {
+      int zkServers =
+          conf.getInt(GiraphJob.ZOOKEEPER_SERVER_COUNT,
+              GiraphJob.ZOOKEEPER_SERVER_COUNT_DEFAULT);
+      maxTasks += zkServers;
     }
-
-    public List<InputSplit> getSplits(JobContext context)
-        throws IOException, InterruptedException {
-        Configuration conf = context.getConfiguration();
-        int maxTasks = getMaxTasks(conf);
-        if (maxTasks <= 0) {
-            throw new InterruptedException(
-                "getSplits: Cannot have maxTasks <= 0 - " + maxTasks);
-        }
-        List<InputSplit> inputSplitList = new ArrayList<InputSplit>();
-        for (int i = 0; i < maxTasks; ++i) {
-            inputSplitList.add(new BspInputSplit());
-        }
-        return inputSplitList;
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("getMaxTasks: Max workers = " + maxWorkers +
+          ", split master/worker = " + splitMasterWorker +
+          ", total max tasks = " + maxTasks);
     }
+    return maxTasks;
+  }
 
-    public RecordReader<Text, Text>
-        createRecordReader(InputSplit split, TaskAttemptContext context)
-        throws IOException, InterruptedException {
-        return new BspRecordReader();
+  @Override
+  public List<InputSplit> getSplits(JobContext context)
+    throws IOException, InterruptedException {
+    Configuration conf = context.getConfiguration();
+    int maxTasks = getMaxTasks(conf);
+    if (maxTasks <= 0) {
+      throw new InterruptedException(
+          "getSplits: Cannot have maxTasks <= 0 - " + maxTasks);
+    }
+    List<InputSplit> inputSplitList = new ArrayList<InputSplit>();
+    for (int i = 0; i < maxTasks; ++i) {
+      inputSplitList.add(new BspInputSplit());
     }
+    return inputSplitList;
+  }
+
+  @Override
+  public RecordReader<Text, Text>
+  createRecordReader(InputSplit split, TaskAttemptContext context)
+    throws IOException, InterruptedException {
+    return new BspRecordReader();
+  }
 }

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/BspInputSplit.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/BspInputSplit.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/BspInputSplit.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/BspInputSplit.java Thu Feb 16 22:12:31 2012
@@ -32,51 +32,70 @@ import org.apache.hadoop.mapreduce.Input
  * directly.
  */
 public class BspInputSplit extends InputSplit implements Writable {
-    /** Number of splits */
-    private int numSplits = -1;
-    /** Split index */
-    private int splitIndex = -1;
-
-    public BspInputSplit() {}
-
-    public BspInputSplit(int splitIndex, int numSplits) {
-        this.splitIndex = splitIndex;
-        this.numSplits = numSplits;
-    }
-
-    @Override
-    public long getLength() throws IOException, InterruptedException {
-        return 0;
-    }
-
-    @Override
-    public String[] getLocations() throws IOException, InterruptedException {
-        return new String[]{};
-    }
-
-    @Override
-    public void readFields(DataInput in) throws IOException {
-        splitIndex = in.readInt();
-        numSplits = in.readInt();
-    }
-
-    @Override
-    public void write(DataOutput out) throws IOException {
-        out.writeInt(splitIndex);
-        out.writeInt(numSplits);
-    }
-
-    public int getSplitIndex() {
-        return splitIndex;
-    }
-
-    public int getNumSplits() {
-        return numSplits;
-    }
-
-    @Override
-    public String toString() {
-        return "'" + getClass().getCanonicalName() +
-            ", index=" + getSplitIndex() + ", num=" + getNumSplits();
-    }
+  /** Number of splits */
+  private int numSplits = -1;
+  /** Split index */
+  private int splitIndex = -1;
+
+  /**
+   * Reflection constructor.
+   */
+  public BspInputSplit() { }
+
+  /**
+   * Constructor used by {@link BspInputFormat}.
+   *
+   * @param splitIndex Index of this split.
+   * @param numSplits Total number of splits.
+   */
+  public BspInputSplit(int splitIndex, int numSplits) {
+    this.splitIndex = splitIndex;
+    this.numSplits = numSplits;
+  }
+
+  @Override
+  public long getLength() throws IOException, InterruptedException {
+    return 0;
+  }
+
+  @Override
+  public String[] getLocations() throws IOException, InterruptedException {
+    return new String[]{};
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    splitIndex = in.readInt();
+    numSplits = in.readInt();
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeInt(splitIndex);
+    out.writeInt(numSplits);
+  }
+
+  /**
+   * Get the index of this split.
+   *
+   * @return Index of this split.
+   */
+  public int getSplitIndex() {
+    return splitIndex;
+  }
+
+  /**
+   * Get the number of splits for this application.
+   *
+   * @return Total number of splits.
+   */
+  public int getNumSplits() {
+    return numSplits;
+  }
+
+  @Override
+  public String toString() {
+    return "'" + getClass().getCanonicalName() +
+      ", index=" + getSplitIndex() + ", num=" + getNumSplits();
+  }
 }

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/BspOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/BspOutputFormat.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/BspOutputFormat.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/BspOutputFormat.java Thu Feb 16 22:12:31 2012
@@ -34,38 +34,38 @@ import org.apache.log4j.Logger;
  * to be called as if a normal Hadoop job.
  */
 public class BspOutputFormat extends OutputFormat<Text, Text> {
-    /** Class logger */
-    private static Logger LOG = Logger.getLogger(BspOutputFormat.class);
+  /** Class logger */
+  private static Logger LOG = Logger.getLogger(BspOutputFormat.class);
 
-    @Override
-    public void checkOutputSpecs(JobContext context)
-            throws IOException, InterruptedException {
-        if (BspUtils.getVertexOutputFormatClass(context.getConfiguration())
-                == null) {
-            LOG.warn("checkOutputSpecs: ImmutableOutputCommiter" +
-                     " will not check anything");
-            return;
-        }
-        BspUtils.createVertexOutputFormat(context.getConfiguration()).
-            checkOutputSpecs(context);
+  @Override
+  public void checkOutputSpecs(JobContext context)
+    throws IOException, InterruptedException {
+    if (BspUtils.getVertexOutputFormatClass(context.getConfiguration()) ==
+        null) {
+      LOG.warn("checkOutputSpecs: ImmutableOutputCommiter" +
+          " will not check anything");
+      return;
     }
+    BspUtils.createVertexOutputFormat(context.getConfiguration()).
+      checkOutputSpecs(context);
+  }
 
-    @Override
-    public OutputCommitter getOutputCommitter(TaskAttemptContext context)
-            throws IOException, InterruptedException {
-        if (BspUtils.getVertexOutputFormatClass(context.getConfiguration())
-                == null) {
-            LOG.warn("getOutputCommitter: Returning " +
-                     "ImmutableOutputCommiter (does nothing).");
-            return new ImmutableOutputCommitter();
-        }
-        return BspUtils.createVertexOutputFormat(context.getConfiguration()).
-            getOutputCommitter(context);
+  @Override
+  public OutputCommitter getOutputCommitter(TaskAttemptContext context)
+    throws IOException, InterruptedException {
+    if (BspUtils.getVertexOutputFormatClass(context.getConfiguration()) ==
+        null) {
+      LOG.warn("getOutputCommitter: Returning " +
+          "ImmutableOutputCommiter (does nothing).");
+      return new ImmutableOutputCommitter();
     }
+    return BspUtils.createVertexOutputFormat(context.getConfiguration()).
+        getOutputCommitter(context);
+  }
 
-    @Override
-    public RecordWriter<Text, Text> getRecordWriter(TaskAttemptContext context)
-            throws IOException, InterruptedException {
-        return new BspRecordWriter();
-    }
+  @Override
+  public RecordWriter<Text, Text> getRecordWriter(TaskAttemptContext context)
+    throws IOException, InterruptedException {
+    return new BspRecordWriter();
+  }
 }

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/BspRecordReader.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/BspRecordReader.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/BspRecordReader.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/BspRecordReader.java Thu Feb 16 22:12:31 2012
@@ -29,40 +29,45 @@ import org.apache.hadoop.io.Text;
  * Only returns a single key-value pair so that the map() can run.
  */
 class BspRecordReader extends RecordReader<Text, Text> {
-
-    private static final Text ONLY_KEY = new Text("only key");
-    private static final Text ONLY_VALUE = new Text("only value");
-
-    /** Has the one record been seen? */
-    private boolean seenRecord = false;
-
-    @Override
-    public void close() throws IOException {
-        return;
-    }
-
-    @Override
-    public float getProgress() throws IOException {
-        return (seenRecord ? 1f : 0f);
-    }
-
-    @Override
-    public Text getCurrentKey() throws IOException, InterruptedException {
-        return ONLY_KEY;
-    }
-
-    @Override
-    public Text getCurrentValue() throws IOException, InterruptedException {
-        return ONLY_VALUE;
-    }
-
-    @Override
-    public void initialize(InputSplit inputSplit, TaskAttemptContext context)
-        throws IOException, InterruptedException {
-    }
-
-    @Override
-    public boolean nextKeyValue() throws IOException, InterruptedException {
-	return (seenRecord ? false : (seenRecord = true));
+  /** Singular key object */
+  private static final Text ONLY_KEY = new Text("only key");
+  /** Single value object */
+  private static final Text ONLY_VALUE = new Text("only value");
+
+  /** Has the one record been seen? */
+  private boolean seenRecord = false;
+
+  @Override
+  public void close() throws IOException {
+    return;
+  }
+
+  @Override
+  public float getProgress() throws IOException {
+    return seenRecord ? 1f : 0f;
+  }
+
+  @Override
+  public Text getCurrentKey() throws IOException, InterruptedException {
+    return ONLY_KEY;
+  }
+
+  @Override
+  public Text getCurrentValue() throws IOException, InterruptedException {
+    return ONLY_VALUE;
+  }
+
+  @Override
+  public void initialize(InputSplit inputSplit, TaskAttemptContext context)
+    throws IOException, InterruptedException {
+  }
+
+  @Override
+  public boolean nextKeyValue() throws IOException, InterruptedException {
+    if (!seenRecord) {
+      seenRecord = true;
+      return true;
     }
+    return false;
+  }
 }

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/BspRecordWriter.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/BspRecordWriter.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/BspRecordWriter.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/BspRecordWriter.java Thu Feb 16 22:12:31 2012
@@ -31,17 +31,17 @@ import org.apache.hadoop.mapreduce.TaskA
  */
 public class BspRecordWriter extends RecordWriter<Text, Text> {
 
-    @Override
-    public void close(TaskAttemptContext context)
-            throws IOException, InterruptedException {
-        // Do nothing
-    }
+  @Override
+  public void close(TaskAttemptContext context)
+    throws IOException, InterruptedException {
+    // Do nothing
+  }
 
-    @Override
-    public void write(Text key, Text value)
-            throws IOException, InterruptedException {
-        throw new IOException("write: Cannot write with " +
-                              getClass().getName() +
-                              ".  Should never be called");
-    }
+  @Override
+  public void write(Text key, Text value)
+    throws IOException, InterruptedException {
+    throw new IOException("write: Cannot write with " +
+        getClass().getName() +
+        ".  Should never be called");
+  }
 }

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedService.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedService.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedService.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedService.java Thu Feb 16 22:12:31 2012
@@ -26,45 +26,48 @@ import java.io.IOException;
 /**
  * Basic service interface shared by both {@link CentralizedServiceMaster} and
  * {@link CentralizedServiceWorker}.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ * @param <M> Message data
  */
 @SuppressWarnings("rawtypes")
 public interface CentralizedService<I extends WritableComparable,
-                                    V extends Writable,
-                                    E extends Writable,
-                                    M extends Writable> {
-    /**
-     * Setup (must be called prior to any other function)
-     */
-    void setup();
+    V extends Writable, E extends Writable, M extends Writable> {
+  /**
+   * Setup (must be called prior to any other function)
+   */
+  void setup();
 
-    /**
-     * Get the current global superstep of the application to work on.
-     *
-     * @return global superstep (begins at INPUT_SUPERSTEP)
-     */
-    long getSuperstep();
+  /**
+   * Get the current global superstep of the application to work on.
+   *
+   * @return global superstep (begins at INPUT_SUPERSTEP)
+   */
+  long getSuperstep();
 
-    /**
-     * Get the restarted superstep
-     *
-     * @return -1 if not manually restarted, otherwise the superstep id
-     */
-    long getRestartedSuperstep();
+  /**
+   * Get the restarted superstep
+   *
+   * @return -1 if not manually restarted, otherwise the superstep id
+   */
+  long getRestartedSuperstep();
 
-    /**
-     * Given a superstep, should it be checkpointed based on the
-     * checkpoint frequency?
-     *
-     * @param superstep superstep to check against frequency
-     * @return true if checkpoint frequency met or superstep is 1.
-     */
-    boolean checkpointFrequencyMet(long superstep);
+  /**
+   * Given a superstep, should it be checkpointed based on the
+   * checkpoint frequency?
+   *
+   * @param superstep superstep to check against frequency
+   * @return true if checkpoint frequency met or superstep is 1.
+   */
+  boolean checkpointFrequencyMet(long superstep);
 
-    /**
-     * Clean up the service (no calls may be issued after this)
-     *
-     * @throws IOException
-     * @throws InterruptedException
-     */
-    void cleanup() throws IOException, InterruptedException;
+  /**
+   * Clean up the service (no calls may be issued after this)
+   *
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  void cleanup() throws IOException, InterruptedException;
 }

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java Thu Feb 16 22:12:31 2012
@@ -22,70 +22,73 @@ import java.io.IOException;
 
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.zookeeper.KeeperException;
 
 /**
  * At most, there will be one active master at a time, but many threads can
  * be trying to be the active master.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ * @param <M> Message data
  */
 @SuppressWarnings("rawtypes")
 public interface CentralizedServiceMaster<
-        I extends WritableComparable,
-        V extends Writable,
-        E extends Writable,
-        M extends Writable>
-        extends CentralizedService<I, V, E, M> {
-    /**
-     * Become the master.
-     * @return true if became the master, false if the application is done.
-     */
-    boolean becomeMaster();
-
-    /**
-     * Create the {@link InputSplit} objects from the index range based on the
-     * user-defined VertexInputFormat.  The {@link InputSplit} objects will
-     * processed by the workers later on during the INPUT_SUPERSTEP.
-     *
-     * @return Number of partitions. Returns -1 on failure to create
-     *         valid input splits.
-     */
-    int createInputSplits();
-
-    /**
-     * Master coordinates the superstep
-     *
-     * @return State of the application as a result of this superstep
-     * @throws InterruptedException
-     * @throws KeeperException
-     */
-    SuperstepState coordinateSuperstep()
-        throws KeeperException, InterruptedException;
-
-    /**
-     * Master can decide to restart from the last good checkpoint if a
-     * worker fails during a superstep.
-     *
-     * @param checkpoint Checkpoint to restart from
-     */
-    void restartFromCheckpoint(long checkpoint);
-
-    /**
-     * Get the last known good checkpoint
-     * @throws IOException
-     */
-    long getLastGoodCheckpoint() throws IOException;
-
-    /**
-     * If the master decides that this job doesn't have the resources to
-     * continue, it can fail the job.  It can also designate what to do next.
-     * Typically this is mainly informative.
-     *
-     * @param state
-     * @param applicationAttempt attempt to start on
-     * @param desiredSuperstep Superstep to restart from (if applicable)
-     */
-    void setJobState(ApplicationState state,
-                     long applicationAttempt,
-                     long desiredSuperstep);
+  I extends WritableComparable, V extends Writable, E extends Writable,
+  M extends Writable> extends CentralizedService<I, V, E, M> {
+  /**
+   * Become the master.
+   * @return true if became the master, false if the application is done.
+   */
+  boolean becomeMaster();
+
+  /**
+   * Create the {@link InputSplit} objects from the index range based on the
+   * user-defined VertexInputFormat.  The {@link InputSplit} objects will
+   * be processed by the workers later on during the INPUT_SUPERSTEP.
+   *
+   * @return Number of partitions. Returns -1 on failure to create
+   *         valid input splits.
+   */
+  int createInputSplits();
+
+  /**
+   * Master coordinates the superstep
+   *
+   * @return State of the application as a result of this superstep
+   * @throws InterruptedException
+   * @throws KeeperException
+   */
+  SuperstepState coordinateSuperstep()
+    throws KeeperException, InterruptedException;
+
+  /**
+   * Master can decide to restart from the last good checkpoint if a
+   * worker fails during a superstep.
+   *
+   * @param checkpoint Checkpoint to restart from
+   */
+  void restartFromCheckpoint(long checkpoint);
+
+  /**
+   * Get the last known good checkpoint
+   *
+   * @return Last good superstep number
+   * @throws IOException
+   */
+  long getLastGoodCheckpoint() throws IOException;
+
+  /**
+   * If the master decides that this job doesn't have the resources to
+   * continue, it can fail the job.  It can also designate what to do next.
+   * Typically this is mainly informative.
+   *
+   * @param state State of the application.
+   * @param applicationAttempt Attempt to start on
+   * @param desiredSuperstep Superstep to restart from (if applicable)
+   */
+  void setJobState(ApplicationState state,
+    long applicationAttempt,
+    long desiredSuperstep);
 }

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java Thu Feb 16 22:12:31 2012
@@ -31,141 +31,143 @@ import org.apache.giraph.graph.GraphMapp
 import org.apache.giraph.graph.partition.Partition;
 import org.apache.giraph.graph.partition.PartitionOwner;
 import org.apache.giraph.graph.partition.PartitionStats;
-import org.apache.giraph.graph.partition.WorkerGraphPartitioner;
 import org.apache.giraph.graph.WorkerInfo;
 import org.apache.giraph.graph.WorkerContext;
 
 /**
  * All workers should have access to this centralized service to
  * execute the following methods.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ * @param <M> Message data
  */
 @SuppressWarnings("rawtypes")
-public interface CentralizedServiceWorker<
-        I extends WritableComparable,
-        V extends Writable,
-        E extends Writable,
-        M extends Writable>
-        extends CentralizedService<I, V, E, M>, AggregatorUsage {
-    /**
-     * Get the worker information
-     *
-     * @return Worker information
-     */
-    WorkerInfo getWorkerInfo();
-
-   /**
-    *
-    * @return worker's WorkerContext
-    */
-    WorkerContext getWorkerContext();
-
-    /**
-     * Get a map of the partition id to the partition for this worker.
-     * The partitions contain the vertices for
-     * this worker and can be used to run compute() for the vertices or do
-     * checkpointing.
-     *
-     * @return List of partitions that this worker owns.
-     */
-    Map<Integer, Partition<I, V, E, M>> getPartitionMap();
-
-    /**
-     * Get a collection of all the partition owners.
-     *
-     * @return Collection of all the partition owners.
-     */
-    Collection<? extends PartitionOwner> getPartitionOwners();
-
-    /**
-     *  Both the vertices and the messages need to be checkpointed in order
-     *  for them to be used.  This is done after all messages have been
-     *  delivered, but prior to a superstep starting.
-     */
-    void storeCheckpoint() throws IOException;
-
-    /**
-     * Load the vertices, edges, messages from the beginning of a superstep.
-     * Will load the vertex partitions as designated by the master and set the
-     * appropriate superstep.
-     *
-     * @param superstep which checkpoint to use
-     * @throws IOException
-     */
-    void loadCheckpoint(long superstep) throws IOException;
-
-    /**
-     * Take all steps prior to actually beginning the computation of a
-     * superstep.
-     *
-     * @return Collection of all the partition owners from the master for this
-     *         superstep.
-     */
-    Collection<? extends PartitionOwner> startSuperstep();
-
-    /**
-     * Worker is done with its portion of the superstep.  Report the
-     * worker level statistics after the computation.
-     *
-     * @param partitionStatsList All the partition stats for this worker
-     * @return true if this is the last superstep, false otherwise
-     */
-    boolean finishSuperstep(List<PartitionStats> partitionStatsList);
-    /**
-     * Get the partition that a vertex index would belong to
-     *
-     * @param vertexIndex Index of the vertex that is used to find the correct
-     *        partition.
-     * @return Correct partition if exists on this worker, null otherwise.
-     */
-    public Partition<I, V, E, M> getPartition(I vertexIndex);
-
-    /**
-     * Every client will need to get a partition owner from a vertex id so that
-     * they know which worker to sent the request to.
-     *
-     * @param vertexIndex Vertex index to look for
-     * @return PartitionOnwer that should contain this vertex if it exists
-     */
-    PartitionOwner getVertexPartitionOwner(I vertexIndex);
-
-    /**
-     * Look up a vertex on a worker given its vertex index.
-     *
-     * @param vertexIndex Vertex index to look for
-     * @return Vertex if it exists on this worker.
-     */
-    BasicVertex<I, V, E, M> getVertex(I vertexIndex);
-
-    /**
-     * If desired by the user, vertex partitions are redistributed among
-     * workers according to the chosen {@link WorkerGraphPartitioner}.
-     *
-     * @param masterSetPartitionOwners Partition owner info passed from the
-     *        master.
-     */
-    void exchangeVertexPartitions(
-        Collection<? extends PartitionOwner> masterSetPartitionOwners);
-
-    /**
-     * Assign messages to a vertex (bypasses package-private access to
-     * setMessages() for internal classes).
-     *
-     * @param vertex Vertex (owned by worker)
-     * @param messageIterator Messages to assign to the vertex
-     */
-    void assignMessagesToVertex(BasicVertex<I, V, E, M> vertex,
-                                Iterable<M> messageIterator);
-
-    /**
-     * Get the GraphMapper that this service is using.  Vertices need to know
-     * this.
-     *
-     * @return BspMapper
-     */
-    GraphMapper<I, V, E, M> getGraphMapper();
-
-    /**
-     * Operations that will be called if there is a failure by a worker.
-     */
-    void failureCleanup();
+public interface CentralizedServiceWorker<I extends WritableComparable,
+  V extends Writable, E extends Writable, M extends Writable>
+  extends CentralizedService<I, V, E, M>, AggregatorUsage {
+  /**
+   * Get the worker information
+   *
+   * @return Worker information
+   */
+  WorkerInfo getWorkerInfo();
+
+  /**
+   *
+   * @return worker's WorkerContext
+   */
+  WorkerContext getWorkerContext();
+
+  /**
+   * Get a map of the partition id to the partition for this worker.
+   * The partitions contain the vertices for
+   * this worker and can be used to run compute() for the vertices or do
+   * checkpointing.
+   *
+   * @return Map of partition id to partition owned by this worker.
+   */
+  Map<Integer, Partition<I, V, E, M>> getPartitionMap();
+
+  /**
+   * Get a collection of all the partition owners.
+   *
+   * @return Collection of all the partition owners.
+   */
+  Collection<? extends PartitionOwner> getPartitionOwners();
+
+  /**
+   *  Both the vertices and the messages need to be checkpointed in order
+   *  for them to be used.  This is done after all messages have been
+   *  delivered, but prior to a superstep starting.
+   */
+  void storeCheckpoint() throws IOException;
+
+  /**
+   * Load the vertices, edges, messages from the beginning of a superstep.
+   * Will load the vertex partitions as designated by the master and set the
+   * appropriate superstep.
+   *
+   * @param superstep which checkpoint to use
+   * @throws IOException
+   */
+  void loadCheckpoint(long superstep) throws IOException;
+
+  /**
+   * Take all steps prior to actually beginning the computation of a
+   * superstep.
+   *
+   * @return Collection of all the partition owners from the master for this
+   *         superstep.
+   */
+  Collection<? extends PartitionOwner> startSuperstep();
+
+  /**
+   * Worker is done with its portion of the superstep.  Report the
+   * worker level statistics after the computation.
+   *
+   * @param partitionStatsList All the partition stats for this worker
+   * @return true if this is the last superstep, false otherwise
+   */
+  boolean finishSuperstep(List<PartitionStats> partitionStatsList);
+
+  /**
+   * Get the partition that a vertex index would belong to
+   *
+   * @param vertexIndex Index of the vertex that is used to find the correct
+   *        partition.
+   * @return Correct partition if exists on this worker, null otherwise.
+   */
+  Partition<I, V, E, M> getPartition(I vertexIndex);
+
+  /**
+   * Every client will need to get a partition owner from a vertex id so that
+   * they know which worker to send the request to.
+   *
+   * @param vertexIndex Vertex index to look for
+   * @return PartitionOwner that should contain this vertex if it exists
+   */
+  PartitionOwner getVertexPartitionOwner(I vertexIndex);
+
+  /**
+   * Look up a vertex on a worker given its vertex index.
+   *
+   * @param vertexIndex Vertex index to look for
+   * @return Vertex if it exists on this worker.
+   */
+  BasicVertex<I, V, E, M> getVertex(I vertexIndex);
+
+  /**
+   * If desired by the user, vertex partitions are redistributed among
+   * workers according to the chosen {@link WorkerGraphPartitioner}.
+   *
+   * @param masterSetPartitionOwners Partition owner info passed from the
+   *        master.
+   */
+  void exchangeVertexPartitions(
+      Collection<? extends PartitionOwner> masterSetPartitionOwners);
+
+  /**
+   * Assign messages to a vertex (bypasses package-private access to
+   * setMessages() for internal classes).
+   *
+   * @param vertex Vertex (owned by worker)
+   * @param messageIterator Messages to assign to the vertex
+   */
+  void assignMessagesToVertex(BasicVertex<I, V, E, M> vertex,
+      Iterable<M> messageIterator);
+
+  /**
+   * Get the GraphMapper that this service is using.  Vertices need to know
+   * this.
+   *
+   * @return GraphMapper used by this service
+   */
+  GraphMapper<I, V, E, M> getGraphMapper();
+
+  /**
+   * Operations that will be called if there is a failure by a worker.
+   */
+  void failureCleanup();
 }

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/ImmutableOutputCommitter.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/ImmutableOutputCommitter.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/ImmutableOutputCommitter.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/ImmutableOutputCommitter.java Thu Feb 16 22:12:31 2012
@@ -30,35 +30,35 @@ import org.apache.hadoop.mapreduce.TaskA
  * FileOutputCommitter.
  */
 public class ImmutableOutputCommitter extends OutputCommitter {
-    @Override
-    public void abortTask(TaskAttemptContext context) throws IOException {
-    }
+  @Override
+  public void abortTask(TaskAttemptContext context) throws IOException {
+  }
 
-    @Override
-    public void commitTask(TaskAttemptContext context) throws IOException {
-    }
+  @Override
+  public void commitTask(TaskAttemptContext context) throws IOException {
+  }
 
-    @Override
-    public boolean needsTaskCommit(TaskAttemptContext context)
-            throws IOException {
-        return false;
-    }
+  @Override
+  public boolean needsTaskCommit(TaskAttemptContext context)
+    throws IOException {
+    return false;
+  }
 
-    @Override
-    public void setupJob(JobContext context) throws IOException {
-    }
+  @Override
+  public void setupJob(JobContext context) throws IOException {
+  }
 
-    @Override
-    public void setupTask(TaskAttemptContext context) throws IOException {
-    }
+  @Override
+  public void setupTask(TaskAttemptContext context) throws IOException {
+  }
 
-    /*if[HADOOP_NON_SECURE]
+  /*if[HADOOP_NON_SECURE]
     @Override
     public void cleanupJob(JobContext jobContext)  throws IOException {
     }
     else[HADOOP_NON_SECURE]*/
-    @Override
-    /*end[HADOOP_NON_SECURE]*/
-    public void commitJob(JobContext jobContext) throws IOException {
-    }
+  @Override
+  /*end[HADOOP_NON_SECURE]*/
+  public void commitJob(JobContext jobContext) throws IOException {
+  }
 }

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/SuperstepState.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/SuperstepState.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/SuperstepState.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/SuperstepState.java Thu Feb 16 22:12:31 2012
@@ -22,8 +22,12 @@ package org.apache.giraph.bsp;
  * State of a coordinated superstep
  */
 public enum SuperstepState {
-    INITIAL, ///< Nothing happened yet
-    WORKER_FAILURE, ///< A worker died during this superstep
-    THIS_SUPERSTEP_DONE, ///< This superstep completed correctly
-    ALL_SUPERSTEPS_DONE, ///< All supersteps are complete
+  /** Nothing happened yet */
+  INITIAL,
+  /** A worker died during this superstep */
+  WORKER_FAILURE,
+  /** This superstep completed correctly */
+  THIS_SUPERSTEP_DONE,
+  /** All supersteps are complete */
+  ALL_SUPERSTEPS_DONE,
 }

Copied: incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/package-info.java (from r1243701, incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/ApplicationState.java)
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/package-info.java?p2=incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/package-info.java&p1=incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/ApplicationState.java&r1=1243701&r2=1245205&rev=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/ApplicationState.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/package-info.java Thu Feb 16 22:12:31 2012
@@ -15,15 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
-package org.apache.giraph.bsp;
-
 /**
- *  State of the BSP application
+ * Package of generic bulk synchronous processing objects.
  */
-public enum ApplicationState {
-    UNKNOWN, ///< Shouldn't be seen, just an initial state
-    START_SUPERSTEP, ///< Start from a desired superstep
-    FAILED, ///< Unrecoverable
-    FINISHED ///< Successful completion
-}
+package org.apache.giraph.bsp;

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/ArrayListWritable.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/ArrayListWritable.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/ArrayListWritable.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/ArrayListWritable.java Thu Feb 16 22:12:31 2012
@@ -30,84 +30,96 @@ import org.apache.hadoop.util.Reflection
 
 /**
  * A Writable for ListArray containing instances of a class.
+ *
+ * @param <M> Message data
  */
 public abstract class ArrayListWritable<M extends Writable> extends ArrayList<M>
-          implements Writable, Configurable {
-    /** Used for instantiation */
-    private Class<M> refClass = null;
-    /** Defining a layout version for a serializable class. */
-    private static final long serialVersionUID = 1L;
-    /** Configuration */
-    private Configuration conf;
-
-    /**
-     * Using the default constructor requires that the user implement
-     * setClass(), guaranteed to be invoked prior to instantiation in
-     * readFields()
-     */
-    public ArrayListWritable() {
-    }
-
-    public ArrayListWritable(ArrayListWritable<M> arrayListWritable) {
-        super(arrayListWritable);
-    }
-
-    /**
-     * This constructor allows setting the refClass during construction.
-     *
-     * @param refClass internal type class
-     */
-    public ArrayListWritable(Class<M> refClass) {
-        super();
-        this.refClass = refClass;
-    }
-
-    /**
-     * This is a one-time operation to set the class type
-     *
-     * @param refClass internal type class
-     */
-    public void setClass(Class<M> refClass) {
-        if (this.refClass != null) {
-            throw new RuntimeException(
-                "setClass: refClass is already set to " +
-                this.refClass.getName());
-        }
-        this.refClass = refClass;
-    }
-
-    /**
-     * Subclasses must set the class type appropriately and can use
-     * setClass(Class<M> refClass) to do it.
-     */
-    public abstract void setClass();
-
-    public void readFields(DataInput in) throws IOException {
-        if (this.refClass == null) {
-            setClass();
-        }
-        int numValues = in.readInt();            // read number of values
-        ensureCapacity(numValues);
-        for (int i = 0; i < numValues; i++) {
-            M value = ReflectionUtils.newInstance(refClass, conf);
-            value.readFields(in);                // read a value
-            add(value);                          // store it in values
-        }
-    }
-
-    public void write(DataOutput out) throws IOException {
-        int numValues = size();
-        out.writeInt(numValues);                 // write number of values
-        for (int i = 0; i < numValues; i++) {
-            get(i).write(out);
-        }
-    }
-
-    public final Configuration getConf() {
-        return conf;
-    }
-
-    public final void setConf(Configuration conf) {
-        this.conf = conf;
-    }
+  implements Writable, Configurable {
+  /** Defining a layout version for a serializable class. */
+  private static final long serialVersionUID = 1L;
+  /** Used for instantiation */
+  private Class<M> refClass = null;
+
+  /** Configuration */
+  private Configuration conf;
+
+  /**
+   * Using the default constructor requires that the user implement
+   * setClass(), guaranteed to be invoked prior to instantiation in
+   * readFields()
+   */
+  public ArrayListWritable() {
+  }
+
+  /**
+   * Constructor with another {@link ArrayListWritable}.
+   *
+   * @param arrayListWritable Array list whose elements are copied into this list.
+   */
+  public ArrayListWritable(ArrayListWritable<M> arrayListWritable) {
+    super(arrayListWritable);
+  }
+
+  /**
+   * This constructor allows setting the refClass during construction.
+   *
+   * @param refClass internal type class
+   */
+  public ArrayListWritable(Class<M> refClass) {
+    super();
+    this.refClass = refClass;
+  }
+
+  /**
+   * This is a one-time operation to set the class type
+   *
+   * @param refClass internal type class
+   */
+  public void setClass(Class<M> refClass) {
+    if (this.refClass != null) {
+      throw new RuntimeException(
+          "setClass: refClass is already set to " +
+              this.refClass.getName());
+    }
+    this.refClass = refClass;
+  }
+
+  /**
+   * Subclasses must set the class type appropriately and can use
+   * {@link #setClass(Class)} to do it.
+   */
+  public abstract void setClass();
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    if (this.refClass == null) {
+      setClass();
+    }
+    int numValues = in.readInt();            // read number of values
+    ensureCapacity(numValues);
+    for (int i = 0; i < numValues; i++) {
+      M value = ReflectionUtils.newInstance(refClass, conf);
+      value.readFields(in);                // read a value
+      add(value);                          // store it in values
+    }
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    int numValues = size();
+    out.writeInt(numValues);                 // write number of values
+    for (int i = 0; i < numValues; i++) {
+      get(i).write(out);
+    }
+  }
+
+  @Override
+  public final Configuration getConf() {
+    return conf;
+  }
+
+  @Override
+  public final void setConf(Configuration conf) {
+    this.conf = conf;
+  }
 }