You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@hama.apache.org by "Edward J. Yoon" <ed...@apache.org> on 2014/01/23 08:40:15 UTC

Re: svn commit: r1556691 - in /hama/trunk: ./ examples/src/main/java/org/apache/hama/examples/ examples/src/test/java/org/apache/hama/examples/ graph/src/main/java/org/apache/hama/graph/ graph/src/test/java/org/apache/hama/graph/ graph/src/test/java/org/ap...

This patch is totally wrote wrongly. I'll revert this commit and
re-open the HAMA-838.

Please assign me.

On Thu, Jan 9, 2014 at 10:10 AM,  <ed...@apache.org> wrote:
> Author: edwardyoon
> Date: Thu Jan  9 01:10:59 2014
> New Revision: 1556691
>
> URL: http://svn.apache.org/r1556691
> Log:
> HAMA-838: Refactor aggregators
>
> Added:
>     hama/trunk/examples/src/test/java/org/apache/hama/examples/AggregatorsTest.java
> Modified:
>     hama/trunk/CHANGES.txt
>     hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java
>     hama/trunk/graph/src/main/java/org/apache/hama/graph/AggregationRunner.java
>     hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java
>     hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java
>     hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
>     hama/trunk/graph/src/main/java/org/apache/hama/graph/OffHeapVerticesInfo.java
>     hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java
>     hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInterface.java
>     hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java
>     hama/trunk/graph/src/test/java/org/apache/hama/graph/example/PageRank.java
>
> Modified: hama/trunk/CHANGES.txt
> URL: http://svn.apache.org/viewvc/hama/trunk/CHANGES.txt?rev=1556691&r1=1556690&r2=1556691&view=diff
> ==============================================================================
> --- hama/trunk/CHANGES.txt (original)
> +++ hama/trunk/CHANGES.txt Thu Jan  9 01:10:59 2014
> @@ -20,6 +20,7 @@ Release 0.7.0 (unreleased changes)
>
>    IMPROVEMENTS
>
> +   HAMA-838: Refactor aggregators (Anastasis Andronidis)
>     HAMA-783: Improve the InMemory verticesInfo implementations (edwardyoon)
>     HAMA-829: Improve code and fix Javadoc warnings in org.apache.hama.pipes (Martin Illecker)
>     HAMA-808: Hama Pipes Testcase (Martin Illecker)
>
> Modified: hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java
> URL: http://svn.apache.org/viewvc/hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java?rev=1556691&r1=1556690&r2=1556691&view=diff
> ==============================================================================
> --- hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java (original)
> +++ hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java Thu Jan  9 01:10:59 2014
> @@ -40,6 +40,7 @@ import org.apache.hama.graph.VertexInput
>   * Real pagerank with dangling node contribution.
>   */
>  public class PageRank {
> +  private static final String AVG_AGGREGATOR = "average.aggregator";
>
>    public static class PageRankVertex extends
>        Vertex<Text, NullWritable, DoubleWritable> {
> @@ -63,29 +64,29 @@ public class PageRank {
>      public void compute(Iterable<DoubleWritable> messages) throws IOException {
>        // initialize this vertex to 1 / count of global vertices in this graph
>        if (this.getSuperstepCount() == 0) {
> -        this.setValue(new DoubleWritable(1.0 / this.getNumVertices()));
> +        setValue(new DoubleWritable(1.0 / this.getNumVertices()));
>        } else if (this.getSuperstepCount() >= 1) {
>          double sum = 0;
>          for (DoubleWritable msg : messages) {
>            sum += msg.get();
>          }
>          double alpha = (1.0d - DAMPING_FACTOR) / this.getNumVertices();
> -        this.setValue(new DoubleWritable(alpha + (sum * DAMPING_FACTOR)));
> +        setValue(new DoubleWritable(alpha + (sum * DAMPING_FACTOR)));
> +        aggregate(AVG_AGGREGATOR, this.getValue());
>        }
>
>        // if we have not reached our global error yet, then proceed.
> -      DoubleWritable globalError = getLastAggregatedValue(0);
> +      DoubleWritable globalError = (DoubleWritable) getAggregatedValue(AVG_AGGREGATOR);
> +
>        if (globalError != null && this.getSuperstepCount() > 2
>            && MAXIMUM_CONVERGENCE_ERROR > globalError.get()) {
>          voteToHalt();
> -        return;
> +      } else {
> +        // in each superstep we are going to send a new rank to our neighbours
> +        sendMessageToNeighbors(new DoubleWritable(this.getValue().get()
> +            / this.getEdges().size()));
>        }
> -
> -      // in each superstep we are going to send a new rank to our neighbours
> -      sendMessageToNeighbors(new DoubleWritable(this.getValue().get()
> -          / this.getEdges().size()));
>      }
> -
>    }
>
>    public static class PagerankSeqReader
> @@ -126,7 +127,7 @@ public class PageRank {
>      }
>
>      // error
> -    pageJob.setAggregatorClass(AverageAggregator.class);
> +    pageJob.registerAggregator(AVG_AGGREGATOR, AverageAggregator.class);
>
>      // Vertex reader
>      pageJob.setVertexInputReaderClass(PagerankSeqReader.class);
>
> Added: hama/trunk/examples/src/test/java/org/apache/hama/examples/AggregatorsTest.java
> URL: http://svn.apache.org/viewvc/hama/trunk/examples/src/test/java/org/apache/hama/examples/AggregatorsTest.java?rev=1556691&view=auto
> ==============================================================================
> --- hama/trunk/examples/src/test/java/org/apache/hama/examples/AggregatorsTest.java (added)
> +++ hama/trunk/examples/src/test/java/org/apache/hama/examples/AggregatorsTest.java Thu Jan  9 01:10:59 2014
> @@ -0,0 +1,208 @@
> +/**
> + * Licensed to the Apache Software Foundation (ASF) under one
> + * or more contributor license agreements.  See the NOTICE file
> + * distributed with this work for additional information
> + * regarding copyright ownership.  The ASF licenses this file
> + * to you under the Apache License, Version 2.0 (the
> + * "License"); you may not use this file except in compliance
> + * with the License.  You may obtain a copy of the License at
> + *
> + *     http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +package org.apache.hama.examples;
> +
> +import java.io.BufferedReader;
> +import java.io.IOException;
> +import java.io.InputStreamReader;
> +
> +import junit.framework.TestCase;
> +
> +import org.apache.hadoop.conf.Configuration;
> +import org.apache.hadoop.fs.FileStatus;
> +import org.apache.hadoop.fs.FileSystem;
> +import org.apache.hadoop.fs.Path;
> +import org.apache.hadoop.io.DoubleWritable;
> +import org.apache.hadoop.io.LongWritable;
> +import org.apache.hadoop.io.NullWritable;
> +import org.apache.hadoop.io.Text;
> +import org.apache.hama.HamaConfiguration;
> +import org.apache.hama.bsp.HashPartitioner;
> +import org.apache.hama.bsp.TextInputFormat;
> +import org.apache.hama.bsp.TextOutputFormat;
> +import org.apache.hama.graph.GraphJob;
> +import org.apache.hama.graph.SumAggregator;
> +import org.apache.hama.graph.Vertex;
> +import org.apache.hama.graph.VertexInputReader;
> +import org.junit.Test;
> +
> +/**
> + * Unit test for aggregators
> + */
> +public class AggregatorsTest extends TestCase {
> +  private static String OUTPUT = "/tmp/page-out";
> +  private Configuration conf = new HamaConfiguration();
> +  private FileSystem fs;
> +
> +  private void deleteTempDirs() {
> +    try {
> +      if (fs.exists(new Path(OUTPUT)))
> +        fs.delete(new Path(OUTPUT), true);
> +    } catch (IOException e) {
> +      e.printStackTrace();
> +    }
> +  }
> +
> +  private void verifyResult() throws IOException {
> +    FileStatus[] globStatus = fs.globStatus(new Path(OUTPUT + "/part-*"));
> +    for (FileStatus fts : globStatus) {
> +      BufferedReader reader = new BufferedReader(new InputStreamReader(
> +          fs.open(fts.getPath())));
> +      String line = null;
> +
> +      String[] results = { "6.0", "2.0", "3.0", "4.0" };
> +
> +      for (int i = 1; i < 5; i++) {
> +        line = reader.readLine();
> +        String[] split = line.split("\t");
> +        assertTrue(split[0].equals(String.valueOf(i)));
> +        assertTrue(split[1].equals(results[i - 1]));
> +        System.out.println(split[0] + " : " + split[1]);
> +      }
> +    }
> +  }
> +
> +  @Override
> +  protected void setUp() throws Exception {
> +    super.setUp();
> +    fs = FileSystem.get(conf);
> +  }
> +
> +  @Test
> +  public void test() throws IOException, InterruptedException,
> +      ClassNotFoundException {
> +    try {
> +      CustomAggregators
> +          .main(new String[] { "src/test/resources/dg.txt", OUTPUT });
> +      verifyResult();
> +    } finally {
> +      deleteTempDirs();
> +    }
> +  }
> +
> +  static class CustomAggregators {
> +
> +    public static class GraphTextReader
> +        extends
> +        VertexInputReader<LongWritable, Text, Text, NullWritable, DoubleWritable> {
> +
> +      @Override
> +      public boolean parseVertex(LongWritable key, Text value,
> +          Vertex<Text, NullWritable, DoubleWritable> vertex) throws Exception {
> +
> +        vertex.setVertexID(value);
> +        vertex
> +            .setValue(new DoubleWritable(Double.parseDouble(value.toString())));
> +
> +        return true;
> +      }
> +    }
> +
> +    public static class GraphVertex extends
> +        Vertex<Text, NullWritable, DoubleWritable> {
> +
> +      @Override
> +      public void compute(Iterable<DoubleWritable> msgs) throws IOException {
> +
> +        // We will send 2 custom messages on superstep 2 and 4 only!
> +        if (this.getSuperstepCount() == 2) {
> +          this.aggregate("mySum", new DoubleWritable(1.0));
> +        }
> +
> +        if (this.getSuperstepCount() == 4) {
> +          this.aggregate("mySum", new DoubleWritable(2.0));
> +        }
> +
> +        // We will get the first aggrigation result from our custom aggregator
> +        // on superstep 3,
> +        // and we will store the result only in vertex 4.
> +        // This vertex should have value = 4.
> +        if (this.getSuperstepCount() == 3
> +            && this.getVertexID().toString().equals("4")) {
> +          this.setValue((DoubleWritable) this.getAggregatedValue("mySum"));
> +        }
> +
> +        // By setting vertex number 3 to halt, we will see a change on the
> +        // aggregating results
> +        // in both custom and global aggregators.
> +        // This vertex should have value = 3.
> +        if (this.getSuperstepCount() == 3
> +            && this.getVertexID().toString().equals("3")) {
> +          this.voteToHalt();
> +        }
> +
> +        // This vertex should have value = 6 (3 vertices are working x 2 the
> +        // custom value)
> +        if (this.getSuperstepCount() == 5
> +            && this.getVertexID().toString().equals("1")) {
> +          this.setValue((DoubleWritable) this.getAggregatedValue("mySum"));
> +        }
> +
> +        if (this.getSuperstepCount() == 6) {
> +          this.voteToHalt();
> +        }
> +      }
> +    }
> +
> +    public static void main(String[] args) throws IOException,
> +        InterruptedException, ClassNotFoundException {
> +      if (args.length != 2) {
> +        printUsage();
> +      }
> +      HamaConfiguration conf = new HamaConfiguration(new Configuration());
> +      GraphJob graphJob = createJob(args, conf);
> +      long startTime = System.currentTimeMillis();
> +      if (graphJob.waitForCompletion(true)) {
> +        System.out.println("Job Finished in "
> +            + (System.currentTimeMillis() - startTime) / 1000.0 + " seconds");
> +      }
> +    }
> +
> +    private static void printUsage() {
> +      System.out.println("Usage: <input> <output>");
> +      System.exit(-1);
> +    }
> +
> +    private static GraphJob createJob(String[] args, HamaConfiguration conf)
> +        throws IOException {
> +      GraphJob graphJob = new GraphJob(conf, CustomAggregators.class);
> +      graphJob.setJobName("Custom Aggregators");
> +      graphJob.setVertexClass(GraphVertex.class);
> +
> +      graphJob.registerAggregator("mySum", SumAggregator.class);
> +
> +      graphJob.setInputPath(new Path(args[0]));
> +      graphJob.setOutputPath(new Path(args[1]));
> +
> +      graphJob.setVertexIDClass(Text.class);
> +      graphJob.setVertexValueClass(DoubleWritable.class);
> +      graphJob.setEdgeValueClass(NullWritable.class);
> +
> +      graphJob.setInputFormat(TextInputFormat.class);
> +
> +      graphJob.setVertexInputReaderClass(GraphTextReader.class);
> +      graphJob.setPartitioner(HashPartitioner.class);
> +
> +      graphJob.setOutputFormat(TextOutputFormat.class);
> +      graphJob.setOutputKeyClass(Text.class);
> +      graphJob.setOutputValueClass(DoubleWritable.class);
> +
> +      return graphJob;
> +    }
> +  }
> +}
>
> Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/AggregationRunner.java
> URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/AggregationRunner.java?rev=1556691&r1=1556690&r2=1556691&view=diff
> ==============================================================================
> --- hama/trunk/graph/src/main/java/org/apache/hama/graph/AggregationRunner.java (original)
> +++ hama/trunk/graph/src/main/java/org/apache/hama/graph/AggregationRunner.java Thu Jan  9 01:10:59 2014
> @@ -17,22 +17,20 @@
>   */
>  package org.apache.hama.graph;
>
> -import java.io.IOException;
> +import java.util.HashMap;
>  import java.util.HashSet;
> +import java.util.Map;
> +import java.util.Map.Entry;
>  import java.util.Set;
>
>  import org.apache.hadoop.conf.Configuration;
>  import org.apache.hadoop.io.IntWritable;
> -import org.apache.hadoop.io.LongWritable;
>  import org.apache.hadoop.io.MapWritable;
>  import org.apache.hadoop.io.Text;
>  import org.apache.hadoop.io.Writable;
>  import org.apache.hadoop.io.WritableComparable;
>  import org.apache.hadoop.util.ReflectionUtils;
>  import org.apache.hama.bsp.BSPPeer;
> -import org.apache.hama.bsp.sync.SyncException;
> -
> -import com.google.common.base.Preconditions;
>
>  /**
>   * Runner class to do the tasks that need to be done if aggregation was
> @@ -41,117 +39,31 @@ import com.google.common.base.Preconditi
>   */
>  @SuppressWarnings("rawtypes")
>  public final class AggregationRunner<V extends WritableComparable, E extends Writable, M extends Writable> {
> +  private Map<String, Aggregator> Aggregators;
> +  private Map<String, Writable> aggregatorResults;
> +  private Set<String> aggregatorsUsed;
>
> -  // multiple aggregator arrays
> -  private Aggregator<M, Vertex<V, E, M>>[] aggregators;
> -  private Set<Integer> skipAggregators;
> -  private Writable[] globalAggregatorResult;
> -  private IntWritable[] globalAggregatorIncrement;
> -  private boolean[] isAbstractAggregator;
> -  private String[] aggregatorClassNames;
> -  private Text[] aggregatorValueFlag;
> -  private Text[] aggregatorIncrementFlag;
> -  // aggregator on the master side
> -  private Aggregator<M, Vertex<V, E, M>>[] masterAggregator;
> -
> -  private boolean enabled = false;
>    private Configuration conf;
> +  private Text textWrap = new Text();
>
> -  @SuppressWarnings("unchecked")
>    public void setupAggregators(
>        BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer) {
>      this.conf = peer.getConfiguration();
> -    String aggregatorClasses = peer.getConfiguration().get(
> +
> +    this.aggregatorResults = new HashMap<String, Writable>(4);
> +    this.Aggregators = new HashMap<String, Aggregator>(4);
> +    this.aggregatorsUsed = new HashSet<String>(4);
> +
> +    String customAggregatorClasses = peer.getConfiguration().get(
>          GraphJob.AGGREGATOR_CLASS_ATTR);
> -    this.skipAggregators = new HashSet<Integer>();
> -    if (aggregatorClasses != null) {
> -      enabled = true;
> -      aggregatorClassNames = aggregatorClasses.split(";");
> -      // init to the split size
> -      aggregators = new Aggregator[aggregatorClassNames.length];
> -      globalAggregatorResult = new Writable[aggregatorClassNames.length];
> -      globalAggregatorIncrement = new IntWritable[aggregatorClassNames.length];
> -      isAbstractAggregator = new boolean[aggregatorClassNames.length];
> -      aggregatorValueFlag = new Text[aggregatorClassNames.length];
> -      aggregatorIncrementFlag = new Text[aggregatorClassNames.length];
> -      if (GraphJobRunner.isMasterTask(peer)) {
> -        masterAggregator = new Aggregator[aggregatorClassNames.length];
> -      }
> -      for (int i = 0; i < aggregatorClassNames.length; i++) {
> -        aggregators[i] = getNewAggregator(aggregatorClassNames[i]);
> -        aggregatorValueFlag[i] = new Text(
> -            GraphJobRunner.S_FLAG_AGGREGATOR_VALUE + ";" + i);
> -        aggregatorIncrementFlag[i] = new Text(
> -            GraphJobRunner.S_FLAG_AGGREGATOR_INCREMENT + ";" + i);
> -        if (aggregators[i] instanceof AbstractAggregator) {
> -          isAbstractAggregator[i] = true;
> -        }
> -        if (GraphJobRunner.isMasterTask(peer)) {
> -          masterAggregator[i] = getNewAggregator(aggregatorClassNames[i]);
> -        }
> -      }
> -    }
> -  }
>
> -  /**
> -   * Runs the aggregators by sending their values to the master task.
> -   * @param changedVertexCnt
> -   */
> -  public void sendAggregatorValues(
> -      BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer,
> -      int activeVertices, int changedVertexCnt) throws IOException {
> -    // send msgCounts to the master task
> -    MapWritable updatedCnt = new MapWritable();
> -    updatedCnt.put(GraphJobRunner.FLAG_MESSAGE_COUNTS, new IntWritable(
> -        activeVertices));
> -    // send total number of vertices changes
> -    updatedCnt.put(GraphJobRunner.FLAG_VERTEX_ALTER_COUNTER, new LongWritable(
> -        changedVertexCnt));
> -    // also send aggregated values to the master
> -    if (aggregators != null) {
> -      for (int i = 0; i < this.aggregators.length; i++) {
> -        if (!this.skipAggregators.contains(i)) {
> -          updatedCnt.put(aggregatorValueFlag[i], aggregators[i].getValue());
> -          if (isAbstractAggregator[i]) {
> -            updatedCnt.put(aggregatorIncrementFlag[i],
> -                ((AbstractAggregator<M, Vertex<V, E, M>>) aggregators[i])
> -                    .getTimesAggregated());
> -          }
> -        }
> -      }
> -      for (int i = 0; i < aggregators.length; i++) {
> -        if (!this.skipAggregators.contains(i)) {
> -          // now create new aggregators for the next iteration
> -          aggregators[i] = getNewAggregator(aggregatorClassNames[i]);
> -          if (GraphJobRunner.isMasterTask(peer)) {
> -            masterAggregator[i] = getNewAggregator(aggregatorClassNames[i]);
> -          }
> -        }
> -      }
> -    }
> -    peer.send(GraphJobRunner.getMasterTask(peer), new GraphJobMessage(
> -        updatedCnt));
> -  }
> +    if (customAggregatorClasses != null) {
> +      String[] custAggrs = customAggregatorClasses.split(";");
>
> -  /**
> -   * Aggregates the last value before computation and the value after the
> -   * computation.
> -   *
> -   * @param lastValue the value before compute().
> -   * @param v the vertex.
> -   */
> -  public void aggregateVertex(M lastValue, Vertex<V, E, M> v) {
> -    if (isEnabled()) {
> -      for (int i = 0; i < this.aggregators.length; i++) {
> -        if (!this.skipAggregators.contains(i)) {
> -          Aggregator<M, Vertex<V, E, M>> aggregator = this.aggregators[i];
> -          aggregator.aggregate(v, v.getValue());
> -          if (isAbstractAggregator[i]) {
> -            AbstractAggregator<M, Vertex<V, E, M>> intern = (AbstractAggregator<M, Vertex<V, E, M>>) aggregator;
> -            intern.aggregate(v, lastValue, v.getValue());
> -            intern.aggregateInternal();
> -          }
> -        }
> +      for (String aggr : custAggrs) {
> +        String[] Name_AggrClass = aggr.split("@", 2);
> +        this.Aggregators.put(Name_AggrClass[0],
> +            getNewAggregator(Name_AggrClass[1]));
>        }
>      }
>    }
> @@ -161,26 +73,20 @@ public final class AggregationRunner<V e
>     * peer and updates the given map accordingly.
>     */
>    public void doMasterAggregation(MapWritable updatedCnt) {
> -    if (isEnabled()) {
> -      // work through the master aggregators
> -      for (int i = 0; i < masterAggregator.length; i++) {
> -        if (!this.skipAggregators.contains(i)) {
> -          Writable lastAggregatedValue = masterAggregator[i].getValue();
> -          if (isAbstractAggregator[i]) {
> -            final AbstractAggregator<M, Vertex<V, E, M>> intern = ((AbstractAggregator<M, Vertex<V, E, M>>) masterAggregator[i]);
> -            final Writable finalizeAggregation = intern.finalizeAggregation();
> -            if (intern.finalizeAggregation() != null) {
> -              lastAggregatedValue = finalizeAggregation;
> -            }
> -            // this count is usually the times of active
> -            // vertices in the graph
> -            updatedCnt.put(aggregatorIncrementFlag[i],
> -                intern.getTimesAggregated());
> -          }
> -          updatedCnt.put(aggregatorValueFlag[i], lastAggregatedValue);
> -        }
> -      }
> +    // Get results only from used aggregators.
> +    for (String name : this.aggregatorsUsed) {
> +      updatedCnt.put(new Text(name), this.Aggregators.get(name).getValue());
> +    }
> +    this.aggregatorsUsed.clear();
> +
> +    // Reset all custom aggregators. TODO: Change the aggregation interface to
> +    // include clean() method.
> +    Map<String, Aggregator> tmp = new HashMap<String, Aggregator>(4);
> +    for (Entry<String, Aggregator> e : this.Aggregators.entrySet()) {
> +      String aggClass = e.getValue().getClass().getName();
> +      tmp.put(e.getKey(), getNewAggregator(aggClass));
>      }
> +    this.Aggregators = tmp;
>    }
>
>    /**
> @@ -190,13 +96,20 @@ public final class AggregationRunner<V e
>     *         we haven't seen any messages anymore.
>     */
>    public boolean receiveAggregatedValues(MapWritable updatedValues,
> -      long iteration) throws IOException, SyncException, InterruptedException {
> -    // map is the first value that is in the queue
> -    for (int i = 0; i < aggregators.length; i++) {
> -        globalAggregatorResult[i] = updatedValues.get(aggregatorValueFlag[i]);
> -        globalAggregatorIncrement[i] = (IntWritable) updatedValues
> -            .get(aggregatorIncrementFlag[i]);
> +      long iteration) {
> +    // In every superstep, we create a new result collection as we don't save
> +    // history.
> +    // If a value is missing, the user will take a null result. By creating a
> +    // new collection
> +    // every time, we can reduce the network cost (because we send less
> +    // information by skipping null values)
> +    // But we are losing in GC.
> +    this.aggregatorResults = new HashMap<String, Writable>(4);
> +    for (String name : this.Aggregators.keySet()) {
> +      this.textWrap.set(name);
> +      this.aggregatorResults.put(name, updatedValues.get(textWrap));
>      }
> +
>      IntWritable count = (IntWritable) updatedValues
>          .get(GraphJobRunner.FLAG_MESSAGE_COUNTS);
>      if (count != null && count.get() == Integer.MIN_VALUE) {
> @@ -206,47 +119,16 @@ public final class AggregationRunner<V e
>    }
>
>    /**
> -   * @return true if aggregators were defined. Normally used by the internal
> -   *         stateful methods, outside shouldn't use it too extensively.
> +   * Method to let the custom master aggregator read messages from peers and
> +   * aggregate a value.
>     */
> -  public boolean isEnabled() {
> -    return enabled;
> -  }
> -
> -  /**
> -   * Method to let the master read messages from peers and aggregate a value.
> -   */
> -  public void masterReadAggregatedValue(Text textIndex, M value) {
> -    int index = Integer.parseInt(textIndex.toString().split(";")[1]);
> -    masterAggregator[index].aggregate(null, value);
> -  }
> -
> -  /**
> -   * Method to let the master read messages from peers and aggregate the
> -   * incremental value.
> -   */
> -  public void masterReadAggregatedIncrementalValue(Text textIndex, M value) {
> -    int index = Integer.parseInt(textIndex.toString().split(";")[1]);
> -    if (isAbstractAggregator[index]) {
> -      ((AbstractAggregator<M, Vertex<V, E, M>>) masterAggregator[index])
> -          .addTimesAggregated(((IntWritable) value).get());
> -    }
> -  }
> +  @SuppressWarnings("unchecked")
> +  public void masterAggregation(Text name, Writable value) {
> +    String nameIdx = name.toString().split(";", 2)[1];
> +    this.Aggregators.get(nameIdx).aggregate(null, value);
>
> -  /**
> -   * This method adds an id of an aggregator that will be skipped in the current
> -   * superstep.
> -   */
> -  public void addSkipAggregator(int index) {
> -    this.skipAggregators.add(index);
> -  }
> -
> -  /**
> -   * This method adds an id of an aggregator that will be skipped in the current
> -   * superstep.
> -   */
> -  void resetSkipAggregators() {
> -    this.skipAggregators.clear();
> +    // When it's time to send the values, we can see which aggregators are used.
> +    this.aggregatorsUsed.add(nameIdx);
>    }
>
>    @SuppressWarnings("unchecked")
> @@ -261,13 +143,7 @@ public final class AggregationRunner<V e
>          + " could not be found or instantiated!");
>    }
>
> -  public final Writable getLastAggregatedValue(int index) {
> -    return globalAggregatorResult[Preconditions.checkPositionIndex(index,
> -        globalAggregatorResult.length)];
> -  }
> -
> -  public final IntWritable getNumLastAggregatedVertices(int index) {
> -    return globalAggregatorIncrement[Preconditions.checkPositionIndex(index,
> -        globalAggregatorIncrement.length)];
> +  public final Writable getAggregatedValue(String name) {
> +    return this.aggregatorResults.get(name);
>    }
>  }
>
> Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java
> URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java?rev=1556691&r1=1556690&r2=1556691&view=diff
> ==============================================================================
> --- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java (original)
> +++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java Thu Jan  9 01:10:59 2014
> @@ -102,23 +102,20 @@ public class GraphJob extends BSPJob {
>    }
>
>    /**
> -   * Set the aggregator for the job.
> -   */
> -  @SuppressWarnings({ "rawtypes", "unchecked" })
> -  public void setAggregatorClass(Class<? extends Aggregator> cls) {
> -    this.setAggregatorClass(new Class[] { cls });
> -  }
> -
> -  /**
> -   * Sets multiple aggregators for the job.
> -   */
> +  * Custom aggregator registration. Add a custom aggregator
> +  * that will aggregate massages sent from the user.
> +  *
> +  * @param name identifies an aggregator
> +  * @param aggregatorClass the aggregator class
> +  */
>    @SuppressWarnings("rawtypes")
> -  public void setAggregatorClass(Class<? extends Aggregator>... cls) {
> -    String classNames = "";
> -    for (Class<? extends Aggregator> cl : cls) {
> -      classNames += cl.getName() + ";";
> -    }
> -    conf.set(AGGREGATOR_CLASS_ATTR, classNames);
> +  public void registerAggregator(String name, Class<? extends
> +      Aggregator> aggregatorClass) {
> +    String prevAggrs = this.conf.get(AGGREGATOR_CLASS_ATTR, "");
> +
> +    prevAggrs += name + "@" + aggregatorClass.getName() + ";";
> +
> +    this.conf.set(AGGREGATOR_CLASS_ATTR, prevAggrs);
>    }
>
>    /**
>
> Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java
> URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java?rev=1556691&r1=1556690&r2=1556691&view=diff
> ==============================================================================
> --- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java (original)
> +++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java Thu Jan  9 01:10:59 2014
> @@ -92,7 +92,6 @@ public final class GraphJobMessage imple
>      } else {
>        vertexId.write(out);
>      }
> -
>    }
>
>    public void fastReadFields(DataInput in) throws IOException {
> @@ -217,6 +216,7 @@ public final class GraphJobMessage imple
>        buffer = new DataInputBuffer();
>      }
>
> +    @Override
>      public synchronized int compare(byte[] b1, int s1, int l1, byte[] b2,
>          int s2, int l2) {
>        try {
>
> Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
> URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java?rev=1556691&r1=1556690&r2=1556691&view=diff
> ==============================================================================
> --- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java (original)
> +++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java Thu Jan  9 01:10:59 2014
> @@ -64,12 +64,11 @@ public final class GraphJobRunner<V exte
>    // make sure that these values don't collide with the vertex names
>    public static final String S_FLAG_MESSAGE_COUNTS = "hama.0";
>    public static final String S_FLAG_AGGREGATOR_VALUE = "hama.1";
> -  public static final String S_FLAG_AGGREGATOR_INCREMENT = "hama.2";
> -  public static final String S_FLAG_VERTEX_INCREASE = "hama.3";
> -  public static final String S_FLAG_VERTEX_DECREASE = "hama.4";
> -  public static final String S_FLAG_VERTEX_ALTER_COUNTER = "hama.5";
> -  public static final String S_FLAG_VERTEX_TOTAL_VERTICES = "hama.6";
> -  public static final String S_FLAG_AGGREGATOR_SKIP = "hama.7";
> +  public static final String S_FLAG_VERTEX_INCREASE = "hama.2";
> +  public static final String S_FLAG_VERTEX_DECREASE = "hama.3";
> +  public static final String S_FLAG_VERTEX_ALTER_COUNTER = "hama.4";
> +  public static final String S_FLAG_VERTEX_TOTAL_VERTICES = "hama.5";
> +
>    public static final Text FLAG_MESSAGE_COUNTS = new Text(S_FLAG_MESSAGE_COUNTS);
>    public static final Text FLAG_VERTEX_INCREASE = new Text(
>        S_FLAG_VERTEX_INCREASE);
> @@ -79,8 +78,6 @@ public final class GraphJobRunner<V exte
>        S_FLAG_VERTEX_ALTER_COUNTER);
>    public static final Text FLAG_VERTEX_TOTAL_VERTICES = new Text(
>        S_FLAG_VERTEX_TOTAL_VERTICES);
> -  public static final Text FLAG_AGGREGATOR_SKIP = new Text(
> -      S_FLAG_AGGREGATOR_SKIP);
>
>    public static final String MESSAGE_COMBINER_CLASS_KEY = "hama.vertex.message.combiner.class";
>    public static final String VERTEX_CLASS_KEY = "hama.graph.vertex.class";
> @@ -183,22 +180,10 @@ public final class GraphJobRunner<V exte
>        BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
>        throws IOException, SyncException, InterruptedException {
>
> -    if (isMasterTask(peer) && iteration == 1) {
> -      MapWritable updatedCnt = new MapWritable();
> -      updatedCnt.put(
> -          FLAG_VERTEX_TOTAL_VERTICES,
> -          new LongWritable((peer.getCounter(GraphJobCounter.INPUT_VERTICES)
> -              .getCounter())));
> -      // send the updates from the master tasks back to the slaves
> -      for (String peerName : peer.getAllPeerNames()) {
> -        peer.send(peerName, new GraphJobMessage(updatedCnt));
> -      }
> -    }
> -
> -    // this is only done in every second iteration
> -    if (isMasterTask(peer) && iteration > 1) {
> +    // This run only on master
> +    if (isMasterTask(peer)) {
>        MapWritable updatedCnt = new MapWritable();
> -      // send total number of vertices.
> +      // send total number of vertices
>        updatedCnt.put(
>            FLAG_VERTEX_TOTAL_VERTICES,
>            new LongWritable((peer.getCounter(GraphJobCounter.INPUT_VERTICES)
> @@ -214,26 +199,26 @@ public final class GraphJobRunner<V exte
>          peer.send(peerName, new GraphJobMessage(updatedCnt));
>        }
>      }
> -    if (getAggregationRunner().isEnabled() && iteration > 1) {
> -      // in case we need to sync, we need to replay the messages that already
> -      // are added to the queue. This prevents loosing messages when using
> -      // aggregators.
> -      if (firstVertexMessage != null) {
> -        peer.send(peer.getPeerName(), firstVertexMessage);
> -      }
> -      GraphJobMessage msg = null;
> -      while ((msg = peer.getCurrentMessage()) != null) {
> -        peer.send(peer.getPeerName(), msg);
> -      }
> -      // now sync
> -      peer.sync();
> -      // now the map message must be read that might be send from the master
> -      updated = getAggregationRunner().receiveAggregatedValues(
> -          peer.getCurrentMessage().getMap(), iteration);
> -      // set the first vertex message back to the message it had before sync
> -      firstVertexMessage = peer.getCurrentMessage();
> +
> +    // in case we need to sync, we need to replay the messages that already
> +    // are added to the queue. This prevents loosing messages when using
> +    // aggregators.
> +    if (firstVertexMessage != null) {
> +      peer.send(peer.getPeerName(), firstVertexMessage);
>      }
> -    this.aggregationRunner.resetSkipAggregators();
> +
> +    GraphJobMessage msg = null;
> +    while ((msg = peer.getCurrentMessage()) != null) {
> +      peer.send(peer.getPeerName(), msg);
> +    }
> +
> +    // now sync
> +    peer.sync();
> +    // now the map message must be read that might be send from the master
> +    updated = getAggregationRunner().receiveAggregatedValues(
> +        peer.getCurrentMessage().getMap(), iteration);
> +    // set the first vertex message back to the message it had before sync
> +    firstVertexMessage = peer.getCurrentMessage();
>      return firstVertexMessage;
>    }
>
> @@ -274,7 +259,6 @@ public final class GraphJobRunner<V exte
>        }
>
>        if (!vertex.isHalted()) {
> -        M lastValue = vertex.getValue();
>          if (iterable == null) {
>            vertex.compute(Collections.<M> emptyList());
>          } else {
> @@ -286,7 +270,6 @@ public final class GraphJobRunner<V exte
>            }
>            currentMessage = iterable.getOverflowMessage();
>          }
> -        getAggregationRunner().aggregateVertex(lastValue, vertex);
>          activeVertices++;
>        }
>
> @@ -296,8 +279,7 @@ public final class GraphJobRunner<V exte
>      }
>      vertices.finishSuperstep();
>
> -    getAggregationRunner().sendAggregatorValues(peer, activeVertices,
> -        this.changedVertexCnt);
> +    sendControllValues(activeVertices, this.changedVertexCnt);
>      iteration++;
>    }
>
> @@ -357,15 +339,13 @@ public final class GraphJobRunner<V exte
>      while (skippingIterator.hasNext()) {
>        Vertex<V, E, M> vertex = skippingIterator.next();
>
> -      M lastValue = vertex.getValue();
>        // Calls setup method.
>        vertex.setup(conf);
>        vertex.compute(Collections.singleton(vertex.getValue()));
> -      getAggregationRunner().aggregateVertex(lastValue, vertex);
>        vertices.finishVertexComputation(vertex);
>      }
>      vertices.finishSuperstep();
> -    getAggregationRunner().sendAggregatorValues(peer, 1, this.changedVertexCnt);
> +    sendControllValues(1, this.changedVertexCnt);
>      iteration++;
>    }
>
> @@ -594,14 +574,10 @@ public final class GraphJobRunner<V exte
>              } else {
>                globalUpdateCounts += ((IntWritable) e.getValue()).get();
>              }
> -          } else if (getAggregationRunner().isEnabled()
> -              && vertexID.toString().startsWith(S_FLAG_AGGREGATOR_VALUE)) {
> -            getAggregationRunner().masterReadAggregatedValue(vertexID,
> -                (M) e.getValue());
> -          } else if (getAggregationRunner().isEnabled()
> -              && vertexID.toString().startsWith(S_FLAG_AGGREGATOR_INCREMENT)) {
> -            getAggregationRunner().masterReadAggregatedIncrementalValue(
> -                vertexID, (M) e.getValue());
> +
> +          } else if (vertexID.toString().startsWith(S_FLAG_AGGREGATOR_VALUE)) {
> +            this.getAggregationRunner().masterAggregation(vertexID,
> +                e.getValue());
>            } else if (FLAG_VERTEX_INCREASE.equals(vertexID)) {
>              dynamicAdditions = true;
>              addVertex((Vertex<V, E, M>) e.getValue());
> @@ -619,21 +595,11 @@ public final class GraphJobRunner<V exte
>                    "A message to increase vertex count is in a wrong place: "
>                        + peer);
>              }
> -          } else if (FLAG_AGGREGATOR_SKIP.equals(vertexID)) {
> -            if (isMasterTask(peer)) {
> -              this.getAggregationRunner().addSkipAggregator(
> -                  ((IntWritable) e.getValue()).get());
> -            } else {
> -              throw new UnsupportedOperationException(
> -                  "A message to skip aggregators is in a wrong peer: " + peer);
> -            }
>            }
>          }
> -
>        } else {
>          throw new UnsupportedOperationException("Unknown message type: " + msg);
>        }
> -
>      }
>
>      // If we applied any changes to vertices, we need to call finishAdditions
> @@ -677,23 +643,20 @@ public final class GraphJobRunner<V exte
>    }
>
>    /**
> -   * Gets the last aggregated value at the given index. The index is dependend
> -   * on how the aggregators were configured during job setup phase.
> -   *
> -   * @return the value of the aggregator, or null if none was defined.
> -   */
> -  public final Writable getLastAggregatedValue(int index) {
> -    return getAggregationRunner().getLastAggregatedValue(index);
> -  }
> -
> -  /**
> -   * Gets the last aggregated number of vertices at the given index. The index
> -   * is dependend on how the aggregators were configured during job setup phase.
> +   * Runs internal aggregators and send their values to the master task.
>     *
> -   * @return the value of the aggregator, or null if none was defined.
> +   * @param activeVertices number of active vertices in this peer
> +   * @param changedVertexCnt number of added/removed vertices in a superstep
>     */
> -  public final IntWritable getNumLastAggregatedVertices(int index) {
> -    return getAggregationRunner().getNumLastAggregatedVertices(index);
> +  private void sendControllValues(int activeVertices, int changedVertexCnt)
> +      throws IOException {
> +    // send msgCounts to the master task
> +    MapWritable updatedCnt = new MapWritable();
> +    updatedCnt.put(FLAG_MESSAGE_COUNTS, new IntWritable(activeVertices));
> +    // send total number of vertices changes
> +    updatedCnt.put(FLAG_VERTEX_ALTER_COUNTER,
> +        new LongWritable(changedVertexCnt));
> +    peer.send(getMasterTask(peer), new GraphJobMessage(updatedCnt));
>    }
>
>    /**
>
> Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/OffHeapVerticesInfo.java
> URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/OffHeapVerticesInfo.java?rev=1556691&r1=1556690&r2=1556691&view=diff
> ==============================================================================
> --- hama/trunk/graph/src/main/java/org/apache/hama/graph/OffHeapVerticesInfo.java (original)
> +++ hama/trunk/graph/src/main/java/org/apache/hama/graph/OffHeapVerticesInfo.java Thu Jan  9 01:10:59 2014
> @@ -82,6 +82,7 @@ public class OffHeapVerticesInfo<V exten
>      vertices.dump();
>    }
>
> +  @Override
>    public void addVertex(Vertex<V, E, M> vertex) {
>      vertices.put(vertex.getVertexID(), vertex);
>    }
> @@ -108,6 +109,7 @@ public class OffHeapVerticesInfo<V exten
>      vertices.clear();
>    }
>
> +  @Override
>    public int size() {
>      return (int) this.vertices.entries();
>    }
>
> Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java
> URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java?rev=1556691&r1=1556690&r2=1556691&view=diff
> ==============================================================================
> --- hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java (original)
> +++ hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java Thu Jan  9 01:10:59 2014
> @@ -27,8 +27,8 @@ import java.io.IOException;
>  import java.util.ArrayList;
>  import java.util.List;
>
> -import org.apache.hadoop.io.IntWritable;
>  import org.apache.hadoop.io.MapWritable;
> +import org.apache.hadoop.io.Text;
>  import org.apache.hadoop.io.Writable;
>  import org.apache.hadoop.io.WritableComparable;
>  import org.apache.hama.HamaConfiguration;
> @@ -75,7 +75,7 @@ public abstract class Vertex<V extends W
>    @Override
>    public void setup(HamaConfiguration conf) {
>    }
> -
> +
>    @Override
>    public void sendMessage(Edge<V, E> e, M msg) throws IOException {
>      runner.getPeer().send(getDestinationPeerName(e),
> @@ -120,24 +120,26 @@ public abstract class Vertex<V extends W
>    private void alterVertexCounter(int i) throws IOException {
>      this.runner.setChangedVertexCnt(this.runner.getChangedVertexCnt() + i);
>    }
> -
> +
>    @Override
> -  public void addVertex(V vertexID, List<Edge<V, E>> edges, M value) throws IOException {
> +  public void addVertex(V vertexID, List<Edge<V, E>> edges, M value)
> +      throws IOException {
>      MapWritable msg = new MapWritable();
>      // Create the new vertex.
> -    Vertex<V, E, M> vertex = GraphJobRunner.<V, E, M> newVertexInstance(GraphJobRunner.VERTEX_CLASS);
> +    Vertex<V, E, M> vertex = GraphJobRunner
> +        .<V, E, M> newVertexInstance(GraphJobRunner.VERTEX_CLASS);
>      vertex.setEdges(edges);
>      vertex.setValue(value);
>      vertex.setVertexID(vertexID);
> -
> +
>      msg.put(GraphJobRunner.FLAG_VERTEX_INCREASE, vertex);
>      // Find the proper partition to host the new vertex.
> -    int partition = getPartitioner().getPartition(vertexID, value,
> +    int partition = getPartitioner().getPartition(vertexID, value,
>          runner.getPeer().getNumPeers());
>      String destPeer = runner.getPeer().getAllPeerNames()[partition];
> -
> +
>      runner.getPeer().send(destPeer, new GraphJobMessage(msg));
> -
> +
>      alterVertexCounter(1);
>    }
>
> @@ -145,11 +147,11 @@ public abstract class Vertex<V extends W
>    public void remove() throws IOException {
>      MapWritable msg = new MapWritable();
>      msg.put(GraphJobRunner.FLAG_VERTEX_DECREASE, this.vertexID);
> -
> +
>      // Get master task peer.
>      String destPeer = GraphJobRunner.getMasterTask(this.getPeer());
>      runner.getPeer().send(destPeer, new GraphJobMessage(msg));
> -
> +
>      alterVertexCounter(-1);
>    }
>
> @@ -192,31 +194,6 @@ public abstract class Vertex<V extends W
>      return runner.getMaxIteration();
>    }
>
> -  /**
> -   * Get the last aggregated value of the defined aggregator, null if nothing
> -   * was configured or not returned a result. You have to supply an index, the
> -   * index is defined by the order you set the aggregator classes in
> -   * {@link GraphJob#setAggregatorClass(Class...)}. Index is starting at zero,
> -   * so if you have a single aggregator you can retrieve it via
> -   * {@link #getLastAggregatedValue}(0).
> -   */
> -  @SuppressWarnings("unchecked")
> -  public M getLastAggregatedValue(int index) {
> -    return (M) runner.getLastAggregatedValue(index);
> -  }
> -
> -  /**
> -   * Get the number of aggregated vertices in the last superstep. Or null if no
> -   * aggregator is available.You have to supply an index, the index is defined
> -   * by the order you set the aggregator classes in
> -   * {@link GraphJob#setAggregatorClass(Class...)}. Index is starting at zero,
> -   * so if you have a single aggregator you can retrieve it via
> -   * {@link #getNumLastAggregatedVertices}(0).
> -   */
> -  public IntWritable getNumLastAggregatedVertices(int index) {
> -    return runner.getNumLastAggregatedVertices(index);
> -  }
> -
>    public int getNumPeers() {
>      return runner.getPeer().getNumPeers();
>    }
> @@ -245,21 +222,6 @@ public abstract class Vertex<V extends W
>      this.votedToHalt = true;
>    }
>
> -  /**
> -   * Disable an aggregator for the next superstep. The returning value of
> -   * the aggregator will be null.
> -   */
> -  public void skipAggregator(int index) throws IOException {
> -    MapWritable msg = new MapWritable();
> -    msg.put(GraphJobRunner.FLAG_AGGREGATOR_SKIP, new IntWritable(index));
> -
> -    this.runner.getAggregationRunner().addSkipAggregator(index);
> -
> -    // Get master task peer.
> -    String destPeer = GraphJobRunner.getMasterTask(this.getPeer());
> -    runner.getPeer().send(destPeer, new GraphJobMessage(msg));
> -  }
> -
>    void setActive() {
>      this.votedToHalt = false;
>    }
> @@ -314,7 +276,7 @@ public abstract class Vertex<V extends W
>        }
>        this.value.readFields(in);
>      }
> -
> +
>      this.edges = new ArrayList<Edge<V, E>>();
>      if (in.readBoolean()) {
>        int num = in.readInt();
> @@ -345,7 +307,7 @@ public abstract class Vertex<V extends W
>        out.writeBoolean(true);
>        vertexID.write(out);
>      }
> -
> +
>      if (value == null) {
>        out.writeBoolean(false);
>      } else {
> @@ -399,6 +361,30 @@ public abstract class Vertex<V extends W
>
>    }
>
> +  /**
> +   * Provides a value to the specified aggregator.
> +   *
> +   * @throws IOException
> +   *
> +   * @param name identifies an aggregator
> +   * @param value value to be aggregated
> +   */
> +  @Override
> +  public void aggregate(String name, M value) throws IOException {
> +    MapWritable msg = new MapWritable();
> +    msg.put(new Text(GraphJobRunner.S_FLAG_AGGREGATOR_VALUE + ";" + name),
> +        value);
> +
> +    // Get master task peer.
> +    String destPeer = GraphJobRunner.getMasterTask(this.getPeer());
> +    runner.getPeer().send(destPeer, new GraphJobMessage(msg));
> +  }
> +
> +  @Override
> +  public Writable getAggregatedValue(String name) {
> +    return this.runner.getAggregationRunner().getAggregatedValue(name);
> +  }
> +
>    protected void setRunner(GraphJobRunner<V, E, M> runner) {
>      this.runner = runner;
>    }
>
> Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInterface.java
> URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInterface.java?rev=1556691&r1=1556690&r2=1556691&view=diff
> ==============================================================================
> --- hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInterface.java (original)
> +++ hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInterface.java Thu Jan  9 01:10:59 2014
> @@ -112,4 +112,19 @@ public interface VertexInterface<V exten
>     */
>    public M getValue();
>
> +  /**
> +   * Provides a value to the specified aggregator.
> +   *
> +   * @throws IOException
> +   *
> +   * @param name identifies a aggregator
> +   * @param value value to be aggregated
> +   */
> +  public void aggregate(String name, M value) throws IOException;
> +
> +  /**
> +   * Returns the value of the specified aggregator.
> +   */
> +  public Writable getAggregatedValue(String name);
> +
>  }
>
> Modified: hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java
> URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java?rev=1556691&r1=1556690&r2=1556691&view=diff
> ==============================================================================
> --- hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java (original)
> +++ hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java Thu Jan  9 01:10:59 2014
> @@ -44,7 +44,7 @@ import org.junit.Before;
>  public class TestSubmitGraphJob extends TestBSPMasterGroomServer {
>
>    String[] input = new String[] { "stackoverflow.com\tyahoo.com",
> -      "facebook.com\ttwitter.com",
> +      "facebook.com\ttwitter.com",
>        "facebook.com\tgoogle.com\tnasa.gov",
>        "yahoo.com\tnasa.gov\tstackoverflow.com",
>        "twitter.com\tgoogle.com\tfacebook.com",
> @@ -56,6 +56,7 @@ public class TestSubmitGraphJob extends
>    @SuppressWarnings("rawtypes")
>    private static final List<Class<? extends VerticesInfo>> vi = new ArrayList<Class<? extends VerticesInfo>>();
>
> +  @Override
>    @Before
>    public void setUp() throws Exception {
>      super.setUp();
> @@ -84,7 +85,7 @@ public class TestSubmitGraphJob extends
>      // set the defaults
>      bsp.setMaxIteration(30);
>
> -    bsp.setAggregatorClass(AverageAggregator.class);
> +    bsp.registerAggregator("avg", AverageAggregator.class);
>
>      bsp.setInputFormat(SequenceFileInputFormat.class);
>      bsp.setInputKeyClass(Text.class);
>
> Modified: hama/trunk/graph/src/test/java/org/apache/hama/graph/example/PageRank.java
> URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/test/java/org/apache/hama/graph/example/PageRank.java?rev=1556691&r1=1556690&r2=1556691&view=diff
> ==============================================================================
> --- hama/trunk/graph/src/test/java/org/apache/hama/graph/example/PageRank.java (original)
> +++ hama/trunk/graph/src/test/java/org/apache/hama/graph/example/PageRank.java Thu Jan  9 01:10:59 2014
> @@ -43,7 +43,7 @@ public class PageRank {
>
>    public static class PageRankVertex extends
>        Vertex<Text, NullWritable, DoubleWritable> {
> -
> +
>      static double DAMPING_FACTOR = 0.85;
>      static double MAXIMUM_CONVERGENCE_ERROR = 0.001;
>
> @@ -74,7 +74,7 @@ public class PageRank {
>        }
>
>        // if we have not reached our global error yet, then proceed.
> -      DoubleWritable globalError = getLastAggregatedValue(0);
> +      DoubleWritable globalError = (DoubleWritable) getAggregatedValue("avg");
>        if (globalError != null && this.getSuperstepCount() > 2
>            && MAXIMUM_CONVERGENCE_ERROR > globalError.get()) {
>          voteToHalt();
> @@ -84,6 +84,8 @@ public class PageRank {
>        // in each superstep we are going to send a new rank to our neighbours
>        sendMessageToNeighbors(new DoubleWritable(this.getValue().get()
>            / this.getEdges().size()));
> +
> +      this.aggregate("avg", this.getValue());
>      }
>
>    }
> @@ -126,7 +128,7 @@ public class PageRank {
>      }
>
>      // error
> -    pageJob.setAggregatorClass(AverageAggregator.class);
> +    pageJob.registerAggregator("avg", AverageAggregator.class);
>
>      // Vertex reader
>      pageJob.setVertexInputReaderClass(PagerankSeqReader.class);
>
>



-- 
Best Regards, Edward J. Yoon
@eddieyoon