You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@hama.apache.org by "Edward J. Yoon" <ed...@apache.org> on 2014/01/23 08:40:15 UTC
Re: svn commit: r1556691 - in /hama/trunk: ./ examples/src/main/java/org/apache/hama/examples/
examples/src/test/java/org/apache/hama/examples/ graph/src/main/java/org/apache/hama/graph/
graph/src/test/java/org/apache/hama/graph/ graph/src/test/java/org/ap...
This patch is totally wrote wrongly. I'll revert this commit and
re-open the HAMA-838.
Please assign me.
On Thu, Jan 9, 2014 at 10:10 AM, <ed...@apache.org> wrote:
> Author: edwardyoon
> Date: Thu Jan 9 01:10:59 2014
> New Revision: 1556691
>
> URL: http://svn.apache.org/r1556691
> Log:
> HAMA-838: Refactor aggregators
>
> Added:
> hama/trunk/examples/src/test/java/org/apache/hama/examples/AggregatorsTest.java
> Modified:
> hama/trunk/CHANGES.txt
> hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java
> hama/trunk/graph/src/main/java/org/apache/hama/graph/AggregationRunner.java
> hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java
> hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java
> hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
> hama/trunk/graph/src/main/java/org/apache/hama/graph/OffHeapVerticesInfo.java
> hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java
> hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInterface.java
> hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java
> hama/trunk/graph/src/test/java/org/apache/hama/graph/example/PageRank.java
>
> Modified: hama/trunk/CHANGES.txt
> URL: http://svn.apache.org/viewvc/hama/trunk/CHANGES.txt?rev=1556691&r1=1556690&r2=1556691&view=diff
> ==============================================================================
> --- hama/trunk/CHANGES.txt (original)
> +++ hama/trunk/CHANGES.txt Thu Jan 9 01:10:59 2014
> @@ -20,6 +20,7 @@ Release 0.7.0 (unreleased changes)
>
> IMPROVEMENTS
>
> + HAMA-838: Refactor aggregators (Anastasis Andronidis)
> HAMA-783: Improve the InMemory verticesInfo implementations (edwardyoon)
> HAMA-829: Improve code and fix Javadoc warnings in org.apache.hama.pipes (Martin Illecker)
> HAMA-808: Hama Pipes Testcase (Martin Illecker)
>
> Modified: hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java
> URL: http://svn.apache.org/viewvc/hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java?rev=1556691&r1=1556690&r2=1556691&view=diff
> ==============================================================================
> --- hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java (original)
> +++ hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java Thu Jan 9 01:10:59 2014
> @@ -40,6 +40,7 @@ import org.apache.hama.graph.VertexInput
> * Real pagerank with dangling node contribution.
> */
> public class PageRank {
> + private static final String AVG_AGGREGATOR = "average.aggregator";
>
> public static class PageRankVertex extends
> Vertex<Text, NullWritable, DoubleWritable> {
> @@ -63,29 +64,29 @@ public class PageRank {
> public void compute(Iterable<DoubleWritable> messages) throws IOException {
> // initialize this vertex to 1 / count of global vertices in this graph
> if (this.getSuperstepCount() == 0) {
> - this.setValue(new DoubleWritable(1.0 / this.getNumVertices()));
> + setValue(new DoubleWritable(1.0 / this.getNumVertices()));
> } else if (this.getSuperstepCount() >= 1) {
> double sum = 0;
> for (DoubleWritable msg : messages) {
> sum += msg.get();
> }
> double alpha = (1.0d - DAMPING_FACTOR) / this.getNumVertices();
> - this.setValue(new DoubleWritable(alpha + (sum * DAMPING_FACTOR)));
> + setValue(new DoubleWritable(alpha + (sum * DAMPING_FACTOR)));
> + aggregate(AVG_AGGREGATOR, this.getValue());
> }
>
> // if we have not reached our global error yet, then proceed.
> - DoubleWritable globalError = getLastAggregatedValue(0);
> + DoubleWritable globalError = (DoubleWritable) getAggregatedValue(AVG_AGGREGATOR);
> +
> if (globalError != null && this.getSuperstepCount() > 2
> && MAXIMUM_CONVERGENCE_ERROR > globalError.get()) {
> voteToHalt();
> - return;
> + } else {
> + // in each superstep we are going to send a new rank to our neighbours
> + sendMessageToNeighbors(new DoubleWritable(this.getValue().get()
> + / this.getEdges().size()));
> }
> -
> - // in each superstep we are going to send a new rank to our neighbours
> - sendMessageToNeighbors(new DoubleWritable(this.getValue().get()
> - / this.getEdges().size()));
> }
> -
> }
>
> public static class PagerankSeqReader
> @@ -126,7 +127,7 @@ public class PageRank {
> }
>
> // error
> - pageJob.setAggregatorClass(AverageAggregator.class);
> + pageJob.registerAggregator(AVG_AGGREGATOR, AverageAggregator.class);
>
> // Vertex reader
> pageJob.setVertexInputReaderClass(PagerankSeqReader.class);
>
> Added: hama/trunk/examples/src/test/java/org/apache/hama/examples/AggregatorsTest.java
> URL: http://svn.apache.org/viewvc/hama/trunk/examples/src/test/java/org/apache/hama/examples/AggregatorsTest.java?rev=1556691&view=auto
> ==============================================================================
> --- hama/trunk/examples/src/test/java/org/apache/hama/examples/AggregatorsTest.java (added)
> +++ hama/trunk/examples/src/test/java/org/apache/hama/examples/AggregatorsTest.java Thu Jan 9 01:10:59 2014
> @@ -0,0 +1,208 @@
> +/**
> + * Licensed to the Apache Software Foundation (ASF) under one
> + * or more contributor license agreements. See the NOTICE file
> + * distributed with this work for additional information
> + * regarding copyright ownership. The ASF licenses this file
> + * to you under the Apache License, Version 2.0 (the
> + * "License"); you may not use this file except in compliance
> + * with the License. You may obtain a copy of the License at
> + *
> + * http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +package org.apache.hama.examples;
> +
> +import java.io.BufferedReader;
> +import java.io.IOException;
> +import java.io.InputStreamReader;
> +
> +import junit.framework.TestCase;
> +
> +import org.apache.hadoop.conf.Configuration;
> +import org.apache.hadoop.fs.FileStatus;
> +import org.apache.hadoop.fs.FileSystem;
> +import org.apache.hadoop.fs.Path;
> +import org.apache.hadoop.io.DoubleWritable;
> +import org.apache.hadoop.io.LongWritable;
> +import org.apache.hadoop.io.NullWritable;
> +import org.apache.hadoop.io.Text;
> +import org.apache.hama.HamaConfiguration;
> +import org.apache.hama.bsp.HashPartitioner;
> +import org.apache.hama.bsp.TextInputFormat;
> +import org.apache.hama.bsp.TextOutputFormat;
> +import org.apache.hama.graph.GraphJob;
> +import org.apache.hama.graph.SumAggregator;
> +import org.apache.hama.graph.Vertex;
> +import org.apache.hama.graph.VertexInputReader;
> +import org.junit.Test;
> +
> +/**
> + * Unit test for aggregators
> + */
> +public class AggregatorsTest extends TestCase {
> + private static String OUTPUT = "/tmp/page-out";
> + private Configuration conf = new HamaConfiguration();
> + private FileSystem fs;
> +
> + private void deleteTempDirs() {
> + try {
> + if (fs.exists(new Path(OUTPUT)))
> + fs.delete(new Path(OUTPUT), true);
> + } catch (IOException e) {
> + e.printStackTrace();
> + }
> + }
> +
> + private void verifyResult() throws IOException {
> + FileStatus[] globStatus = fs.globStatus(new Path(OUTPUT + "/part-*"));
> + for (FileStatus fts : globStatus) {
> + BufferedReader reader = new BufferedReader(new InputStreamReader(
> + fs.open(fts.getPath())));
> + String line = null;
> +
> + String[] results = { "6.0", "2.0", "3.0", "4.0" };
> +
> + for (int i = 1; i < 5; i++) {
> + line = reader.readLine();
> + String[] split = line.split("\t");
> + assertTrue(split[0].equals(String.valueOf(i)));
> + assertTrue(split[1].equals(results[i - 1]));
> + System.out.println(split[0] + " : " + split[1]);
> + }
> + }
> + }
> +
> + @Override
> + protected void setUp() throws Exception {
> + super.setUp();
> + fs = FileSystem.get(conf);
> + }
> +
> + @Test
> + public void test() throws IOException, InterruptedException,
> + ClassNotFoundException {
> + try {
> + CustomAggregators
> + .main(new String[] { "src/test/resources/dg.txt", OUTPUT });
> + verifyResult();
> + } finally {
> + deleteTempDirs();
> + }
> + }
> +
> + static class CustomAggregators {
> +
> + public static class GraphTextReader
> + extends
> + VertexInputReader<LongWritable, Text, Text, NullWritable, DoubleWritable> {
> +
> + @Override
> + public boolean parseVertex(LongWritable key, Text value,
> + Vertex<Text, NullWritable, DoubleWritable> vertex) throws Exception {
> +
> + vertex.setVertexID(value);
> + vertex
> + .setValue(new DoubleWritable(Double.parseDouble(value.toString())));
> +
> + return true;
> + }
> + }
> +
> + public static class GraphVertex extends
> + Vertex<Text, NullWritable, DoubleWritable> {
> +
> + @Override
> + public void compute(Iterable<DoubleWritable> msgs) throws IOException {
> +
> + // We will send 2 custom messages on superstep 2 and 4 only!
> + if (this.getSuperstepCount() == 2) {
> + this.aggregate("mySum", new DoubleWritable(1.0));
> + }
> +
> + if (this.getSuperstepCount() == 4) {
> + this.aggregate("mySum", new DoubleWritable(2.0));
> + }
> +
> + // We will get the first aggrigation result from our custom aggregator
> + // on superstep 3,
> + // and we will store the result only in vertex 4.
> + // This vertex should have value = 4.
> + if (this.getSuperstepCount() == 3
> + && this.getVertexID().toString().equals("4")) {
> + this.setValue((DoubleWritable) this.getAggregatedValue("mySum"));
> + }
> +
> + // By setting vertex number 3 to halt, we will see a change on the
> + // aggregating results
> + // in both custom and global aggregators.
> + // This vertex should have value = 3.
> + if (this.getSuperstepCount() == 3
> + && this.getVertexID().toString().equals("3")) {
> + this.voteToHalt();
> + }
> +
> + // This vertex should have value = 6 (3 vertices are working x 2 the
> + // custom value)
> + if (this.getSuperstepCount() == 5
> + && this.getVertexID().toString().equals("1")) {
> + this.setValue((DoubleWritable) this.getAggregatedValue("mySum"));
> + }
> +
> + if (this.getSuperstepCount() == 6) {
> + this.voteToHalt();
> + }
> + }
> + }
> +
> + public static void main(String[] args) throws IOException,
> + InterruptedException, ClassNotFoundException {
> + if (args.length != 2) {
> + printUsage();
> + }
> + HamaConfiguration conf = new HamaConfiguration(new Configuration());
> + GraphJob graphJob = createJob(args, conf);
> + long startTime = System.currentTimeMillis();
> + if (graphJob.waitForCompletion(true)) {
> + System.out.println("Job Finished in "
> + + (System.currentTimeMillis() - startTime) / 1000.0 + " seconds");
> + }
> + }
> +
> + private static void printUsage() {
> + System.out.println("Usage: <input> <output>");
> + System.exit(-1);
> + }
> +
> + private static GraphJob createJob(String[] args, HamaConfiguration conf)
> + throws IOException {
> + GraphJob graphJob = new GraphJob(conf, CustomAggregators.class);
> + graphJob.setJobName("Custom Aggregators");
> + graphJob.setVertexClass(GraphVertex.class);
> +
> + graphJob.registerAggregator("mySum", SumAggregator.class);
> +
> + graphJob.setInputPath(new Path(args[0]));
> + graphJob.setOutputPath(new Path(args[1]));
> +
> + graphJob.setVertexIDClass(Text.class);
> + graphJob.setVertexValueClass(DoubleWritable.class);
> + graphJob.setEdgeValueClass(NullWritable.class);
> +
> + graphJob.setInputFormat(TextInputFormat.class);
> +
> + graphJob.setVertexInputReaderClass(GraphTextReader.class);
> + graphJob.setPartitioner(HashPartitioner.class);
> +
> + graphJob.setOutputFormat(TextOutputFormat.class);
> + graphJob.setOutputKeyClass(Text.class);
> + graphJob.setOutputValueClass(DoubleWritable.class);
> +
> + return graphJob;
> + }
> + }
> +}
>
> Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/AggregationRunner.java
> URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/AggregationRunner.java?rev=1556691&r1=1556690&r2=1556691&view=diff
> ==============================================================================
> --- hama/trunk/graph/src/main/java/org/apache/hama/graph/AggregationRunner.java (original)
> +++ hama/trunk/graph/src/main/java/org/apache/hama/graph/AggregationRunner.java Thu Jan 9 01:10:59 2014
> @@ -17,22 +17,20 @@
> */
> package org.apache.hama.graph;
>
> -import java.io.IOException;
> +import java.util.HashMap;
> import java.util.HashSet;
> +import java.util.Map;
> +import java.util.Map.Entry;
> import java.util.Set;
>
> import org.apache.hadoop.conf.Configuration;
> import org.apache.hadoop.io.IntWritable;
> -import org.apache.hadoop.io.LongWritable;
> import org.apache.hadoop.io.MapWritable;
> import org.apache.hadoop.io.Text;
> import org.apache.hadoop.io.Writable;
> import org.apache.hadoop.io.WritableComparable;
> import org.apache.hadoop.util.ReflectionUtils;
> import org.apache.hama.bsp.BSPPeer;
> -import org.apache.hama.bsp.sync.SyncException;
> -
> -import com.google.common.base.Preconditions;
>
> /**
> * Runner class to do the tasks that need to be done if aggregation was
> @@ -41,117 +39,31 @@ import com.google.common.base.Preconditi
> */
> @SuppressWarnings("rawtypes")
> public final class AggregationRunner<V extends WritableComparable, E extends Writable, M extends Writable> {
> + private Map<String, Aggregator> Aggregators;
> + private Map<String, Writable> aggregatorResults;
> + private Set<String> aggregatorsUsed;
>
> - // multiple aggregator arrays
> - private Aggregator<M, Vertex<V, E, M>>[] aggregators;
> - private Set<Integer> skipAggregators;
> - private Writable[] globalAggregatorResult;
> - private IntWritable[] globalAggregatorIncrement;
> - private boolean[] isAbstractAggregator;
> - private String[] aggregatorClassNames;
> - private Text[] aggregatorValueFlag;
> - private Text[] aggregatorIncrementFlag;
> - // aggregator on the master side
> - private Aggregator<M, Vertex<V, E, M>>[] masterAggregator;
> -
> - private boolean enabled = false;
> private Configuration conf;
> + private Text textWrap = new Text();
>
> - @SuppressWarnings("unchecked")
> public void setupAggregators(
> BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer) {
> this.conf = peer.getConfiguration();
> - String aggregatorClasses = peer.getConfiguration().get(
> +
> + this.aggregatorResults = new HashMap<String, Writable>(4);
> + this.Aggregators = new HashMap<String, Aggregator>(4);
> + this.aggregatorsUsed = new HashSet<String>(4);
> +
> + String customAggregatorClasses = peer.getConfiguration().get(
> GraphJob.AGGREGATOR_CLASS_ATTR);
> - this.skipAggregators = new HashSet<Integer>();
> - if (aggregatorClasses != null) {
> - enabled = true;
> - aggregatorClassNames = aggregatorClasses.split(";");
> - // init to the split size
> - aggregators = new Aggregator[aggregatorClassNames.length];
> - globalAggregatorResult = new Writable[aggregatorClassNames.length];
> - globalAggregatorIncrement = new IntWritable[aggregatorClassNames.length];
> - isAbstractAggregator = new boolean[aggregatorClassNames.length];
> - aggregatorValueFlag = new Text[aggregatorClassNames.length];
> - aggregatorIncrementFlag = new Text[aggregatorClassNames.length];
> - if (GraphJobRunner.isMasterTask(peer)) {
> - masterAggregator = new Aggregator[aggregatorClassNames.length];
> - }
> - for (int i = 0; i < aggregatorClassNames.length; i++) {
> - aggregators[i] = getNewAggregator(aggregatorClassNames[i]);
> - aggregatorValueFlag[i] = new Text(
> - GraphJobRunner.S_FLAG_AGGREGATOR_VALUE + ";" + i);
> - aggregatorIncrementFlag[i] = new Text(
> - GraphJobRunner.S_FLAG_AGGREGATOR_INCREMENT + ";" + i);
> - if (aggregators[i] instanceof AbstractAggregator) {
> - isAbstractAggregator[i] = true;
> - }
> - if (GraphJobRunner.isMasterTask(peer)) {
> - masterAggregator[i] = getNewAggregator(aggregatorClassNames[i]);
> - }
> - }
> - }
> - }
>
> - /**
> - * Runs the aggregators by sending their values to the master task.
> - * @param changedVertexCnt
> - */
> - public void sendAggregatorValues(
> - BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer,
> - int activeVertices, int changedVertexCnt) throws IOException {
> - // send msgCounts to the master task
> - MapWritable updatedCnt = new MapWritable();
> - updatedCnt.put(GraphJobRunner.FLAG_MESSAGE_COUNTS, new IntWritable(
> - activeVertices));
> - // send total number of vertices changes
> - updatedCnt.put(GraphJobRunner.FLAG_VERTEX_ALTER_COUNTER, new LongWritable(
> - changedVertexCnt));
> - // also send aggregated values to the master
> - if (aggregators != null) {
> - for (int i = 0; i < this.aggregators.length; i++) {
> - if (!this.skipAggregators.contains(i)) {
> - updatedCnt.put(aggregatorValueFlag[i], aggregators[i].getValue());
> - if (isAbstractAggregator[i]) {
> - updatedCnt.put(aggregatorIncrementFlag[i],
> - ((AbstractAggregator<M, Vertex<V, E, M>>) aggregators[i])
> - .getTimesAggregated());
> - }
> - }
> - }
> - for (int i = 0; i < aggregators.length; i++) {
> - if (!this.skipAggregators.contains(i)) {
> - // now create new aggregators for the next iteration
> - aggregators[i] = getNewAggregator(aggregatorClassNames[i]);
> - if (GraphJobRunner.isMasterTask(peer)) {
> - masterAggregator[i] = getNewAggregator(aggregatorClassNames[i]);
> - }
> - }
> - }
> - }
> - peer.send(GraphJobRunner.getMasterTask(peer), new GraphJobMessage(
> - updatedCnt));
> - }
> + if (customAggregatorClasses != null) {
> + String[] custAggrs = customAggregatorClasses.split(";");
>
> - /**
> - * Aggregates the last value before computation and the value after the
> - * computation.
> - *
> - * @param lastValue the value before compute().
> - * @param v the vertex.
> - */
> - public void aggregateVertex(M lastValue, Vertex<V, E, M> v) {
> - if (isEnabled()) {
> - for (int i = 0; i < this.aggregators.length; i++) {
> - if (!this.skipAggregators.contains(i)) {
> - Aggregator<M, Vertex<V, E, M>> aggregator = this.aggregators[i];
> - aggregator.aggregate(v, v.getValue());
> - if (isAbstractAggregator[i]) {
> - AbstractAggregator<M, Vertex<V, E, M>> intern = (AbstractAggregator<M, Vertex<V, E, M>>) aggregator;
> - intern.aggregate(v, lastValue, v.getValue());
> - intern.aggregateInternal();
> - }
> - }
> + for (String aggr : custAggrs) {
> + String[] Name_AggrClass = aggr.split("@", 2);
> + this.Aggregators.put(Name_AggrClass[0],
> + getNewAggregator(Name_AggrClass[1]));
> }
> }
> }
> @@ -161,26 +73,20 @@ public final class AggregationRunner<V e
> * peer and updates the given map accordingly.
> */
> public void doMasterAggregation(MapWritable updatedCnt) {
> - if (isEnabled()) {
> - // work through the master aggregators
> - for (int i = 0; i < masterAggregator.length; i++) {
> - if (!this.skipAggregators.contains(i)) {
> - Writable lastAggregatedValue = masterAggregator[i].getValue();
> - if (isAbstractAggregator[i]) {
> - final AbstractAggregator<M, Vertex<V, E, M>> intern = ((AbstractAggregator<M, Vertex<V, E, M>>) masterAggregator[i]);
> - final Writable finalizeAggregation = intern.finalizeAggregation();
> - if (intern.finalizeAggregation() != null) {
> - lastAggregatedValue = finalizeAggregation;
> - }
> - // this count is usually the times of active
> - // vertices in the graph
> - updatedCnt.put(aggregatorIncrementFlag[i],
> - intern.getTimesAggregated());
> - }
> - updatedCnt.put(aggregatorValueFlag[i], lastAggregatedValue);
> - }
> - }
> + // Get results only from used aggregators.
> + for (String name : this.aggregatorsUsed) {
> + updatedCnt.put(new Text(name), this.Aggregators.get(name).getValue());
> + }
> + this.aggregatorsUsed.clear();
> +
> + // Reset all custom aggregators. TODO: Change the aggregation interface to
> + // include clean() method.
> + Map<String, Aggregator> tmp = new HashMap<String, Aggregator>(4);
> + for (Entry<String, Aggregator> e : this.Aggregators.entrySet()) {
> + String aggClass = e.getValue().getClass().getName();
> + tmp.put(e.getKey(), getNewAggregator(aggClass));
> }
> + this.Aggregators = tmp;
> }
>
> /**
> @@ -190,13 +96,20 @@ public final class AggregationRunner<V e
> * we haven't seen any messages anymore.
> */
> public boolean receiveAggregatedValues(MapWritable updatedValues,
> - long iteration) throws IOException, SyncException, InterruptedException {
> - // map is the first value that is in the queue
> - for (int i = 0; i < aggregators.length; i++) {
> - globalAggregatorResult[i] = updatedValues.get(aggregatorValueFlag[i]);
> - globalAggregatorIncrement[i] = (IntWritable) updatedValues
> - .get(aggregatorIncrementFlag[i]);
> + long iteration) {
> + // In every superstep, we create a new result collection as we don't save
> + // history.
> + // If a value is missing, the user will take a null result. By creating a
> + // new collection
> + // every time, we can reduce the network cost (because we send less
> + // information by skipping null values)
> + // But we are losing in GC.
> + this.aggregatorResults = new HashMap<String, Writable>(4);
> + for (String name : this.Aggregators.keySet()) {
> + this.textWrap.set(name);
> + this.aggregatorResults.put(name, updatedValues.get(textWrap));
> }
> +
> IntWritable count = (IntWritable) updatedValues
> .get(GraphJobRunner.FLAG_MESSAGE_COUNTS);
> if (count != null && count.get() == Integer.MIN_VALUE) {
> @@ -206,47 +119,16 @@ public final class AggregationRunner<V e
> }
>
> /**
> - * @return true if aggregators were defined. Normally used by the internal
> - * stateful methods, outside shouldn't use it too extensively.
> + * Method to let the custom master aggregator read messages from peers and
> + * aggregate a value.
> */
> - public boolean isEnabled() {
> - return enabled;
> - }
> -
> - /**
> - * Method to let the master read messages from peers and aggregate a value.
> - */
> - public void masterReadAggregatedValue(Text textIndex, M value) {
> - int index = Integer.parseInt(textIndex.toString().split(";")[1]);
> - masterAggregator[index].aggregate(null, value);
> - }
> -
> - /**
> - * Method to let the master read messages from peers and aggregate the
> - * incremental value.
> - */
> - public void masterReadAggregatedIncrementalValue(Text textIndex, M value) {
> - int index = Integer.parseInt(textIndex.toString().split(";")[1]);
> - if (isAbstractAggregator[index]) {
> - ((AbstractAggregator<M, Vertex<V, E, M>>) masterAggregator[index])
> - .addTimesAggregated(((IntWritable) value).get());
> - }
> - }
> + @SuppressWarnings("unchecked")
> + public void masterAggregation(Text name, Writable value) {
> + String nameIdx = name.toString().split(";", 2)[1];
> + this.Aggregators.get(nameIdx).aggregate(null, value);
>
> - /**
> - * This method adds an id of an aggregator that will be skipped in the current
> - * superstep.
> - */
> - public void addSkipAggregator(int index) {
> - this.skipAggregators.add(index);
> - }
> -
> - /**
> - * This method adds an id of an aggregator that will be skipped in the current
> - * superstep.
> - */
> - void resetSkipAggregators() {
> - this.skipAggregators.clear();
> + // When it's time to send the values, we can see which aggregators are used.
> + this.aggregatorsUsed.add(nameIdx);
> }
>
> @SuppressWarnings("unchecked")
> @@ -261,13 +143,7 @@ public final class AggregationRunner<V e
> + " could not be found or instantiated!");
> }
>
> - public final Writable getLastAggregatedValue(int index) {
> - return globalAggregatorResult[Preconditions.checkPositionIndex(index,
> - globalAggregatorResult.length)];
> - }
> -
> - public final IntWritable getNumLastAggregatedVertices(int index) {
> - return globalAggregatorIncrement[Preconditions.checkPositionIndex(index,
> - globalAggregatorIncrement.length)];
> + public final Writable getAggregatedValue(String name) {
> + return this.aggregatorResults.get(name);
> }
> }
>
> Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java
> URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java?rev=1556691&r1=1556690&r2=1556691&view=diff
> ==============================================================================
> --- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java (original)
> +++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java Thu Jan 9 01:10:59 2014
> @@ -102,23 +102,20 @@ public class GraphJob extends BSPJob {
> }
>
> /**
> - * Set the aggregator for the job.
> - */
> - @SuppressWarnings({ "rawtypes", "unchecked" })
> - public void setAggregatorClass(Class<? extends Aggregator> cls) {
> - this.setAggregatorClass(new Class[] { cls });
> - }
> -
> - /**
> - * Sets multiple aggregators for the job.
> - */
> + * Custom aggregator registration. Add a custom aggregator
> + * that will aggregate massages sent from the user.
> + *
> + * @param name identifies an aggregator
> + * @param aggregatorClass the aggregator class
> + */
> @SuppressWarnings("rawtypes")
> - public void setAggregatorClass(Class<? extends Aggregator>... cls) {
> - String classNames = "";
> - for (Class<? extends Aggregator> cl : cls) {
> - classNames += cl.getName() + ";";
> - }
> - conf.set(AGGREGATOR_CLASS_ATTR, classNames);
> + public void registerAggregator(String name, Class<? extends
> + Aggregator> aggregatorClass) {
> + String prevAggrs = this.conf.get(AGGREGATOR_CLASS_ATTR, "");
> +
> + prevAggrs += name + "@" + aggregatorClass.getName() + ";";
> +
> + this.conf.set(AGGREGATOR_CLASS_ATTR, prevAggrs);
> }
>
> /**
>
> Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java
> URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java?rev=1556691&r1=1556690&r2=1556691&view=diff
> ==============================================================================
> --- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java (original)
> +++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java Thu Jan 9 01:10:59 2014
> @@ -92,7 +92,6 @@ public final class GraphJobMessage imple
> } else {
> vertexId.write(out);
> }
> -
> }
>
> public void fastReadFields(DataInput in) throws IOException {
> @@ -217,6 +216,7 @@ public final class GraphJobMessage imple
> buffer = new DataInputBuffer();
> }
>
> + @Override
> public synchronized int compare(byte[] b1, int s1, int l1, byte[] b2,
> int s2, int l2) {
> try {
>
> Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
> URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java?rev=1556691&r1=1556690&r2=1556691&view=diff
> ==============================================================================
> --- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java (original)
> +++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java Thu Jan 9 01:10:59 2014
> @@ -64,12 +64,11 @@ public final class GraphJobRunner<V exte
> // make sure that these values don't collide with the vertex names
> public static final String S_FLAG_MESSAGE_COUNTS = "hama.0";
> public static final String S_FLAG_AGGREGATOR_VALUE = "hama.1";
> - public static final String S_FLAG_AGGREGATOR_INCREMENT = "hama.2";
> - public static final String S_FLAG_VERTEX_INCREASE = "hama.3";
> - public static final String S_FLAG_VERTEX_DECREASE = "hama.4";
> - public static final String S_FLAG_VERTEX_ALTER_COUNTER = "hama.5";
> - public static final String S_FLAG_VERTEX_TOTAL_VERTICES = "hama.6";
> - public static final String S_FLAG_AGGREGATOR_SKIP = "hama.7";
> + public static final String S_FLAG_VERTEX_INCREASE = "hama.2";
> + public static final String S_FLAG_VERTEX_DECREASE = "hama.3";
> + public static final String S_FLAG_VERTEX_ALTER_COUNTER = "hama.4";
> + public static final String S_FLAG_VERTEX_TOTAL_VERTICES = "hama.5";
> +
> public static final Text FLAG_MESSAGE_COUNTS = new Text(S_FLAG_MESSAGE_COUNTS);
> public static final Text FLAG_VERTEX_INCREASE = new Text(
> S_FLAG_VERTEX_INCREASE);
> @@ -79,8 +78,6 @@ public final class GraphJobRunner<V exte
> S_FLAG_VERTEX_ALTER_COUNTER);
> public static final Text FLAG_VERTEX_TOTAL_VERTICES = new Text(
> S_FLAG_VERTEX_TOTAL_VERTICES);
> - public static final Text FLAG_AGGREGATOR_SKIP = new Text(
> - S_FLAG_AGGREGATOR_SKIP);
>
> public static final String MESSAGE_COMBINER_CLASS_KEY = "hama.vertex.message.combiner.class";
> public static final String VERTEX_CLASS_KEY = "hama.graph.vertex.class";
> @@ -183,22 +180,10 @@ public final class GraphJobRunner<V exte
> BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
> throws IOException, SyncException, InterruptedException {
>
> - if (isMasterTask(peer) && iteration == 1) {
> - MapWritable updatedCnt = new MapWritable();
> - updatedCnt.put(
> - FLAG_VERTEX_TOTAL_VERTICES,
> - new LongWritable((peer.getCounter(GraphJobCounter.INPUT_VERTICES)
> - .getCounter())));
> - // send the updates from the master tasks back to the slaves
> - for (String peerName : peer.getAllPeerNames()) {
> - peer.send(peerName, new GraphJobMessage(updatedCnt));
> - }
> - }
> -
> - // this is only done in every second iteration
> - if (isMasterTask(peer) && iteration > 1) {
> + // This run only on master
> + if (isMasterTask(peer)) {
> MapWritable updatedCnt = new MapWritable();
> - // send total number of vertices.
> + // send total number of vertices
> updatedCnt.put(
> FLAG_VERTEX_TOTAL_VERTICES,
> new LongWritable((peer.getCounter(GraphJobCounter.INPUT_VERTICES)
> @@ -214,26 +199,26 @@ public final class GraphJobRunner<V exte
> peer.send(peerName, new GraphJobMessage(updatedCnt));
> }
> }
> - if (getAggregationRunner().isEnabled() && iteration > 1) {
> - // in case we need to sync, we need to replay the messages that already
> - // are added to the queue. This prevents loosing messages when using
> - // aggregators.
> - if (firstVertexMessage != null) {
> - peer.send(peer.getPeerName(), firstVertexMessage);
> - }
> - GraphJobMessage msg = null;
> - while ((msg = peer.getCurrentMessage()) != null) {
> - peer.send(peer.getPeerName(), msg);
> - }
> - // now sync
> - peer.sync();
> - // now the map message must be read that might be send from the master
> - updated = getAggregationRunner().receiveAggregatedValues(
> - peer.getCurrentMessage().getMap(), iteration);
> - // set the first vertex message back to the message it had before sync
> - firstVertexMessage = peer.getCurrentMessage();
> +
> + // in case we need to sync, we need to replay the messages that already
> + // are added to the queue. This prevents loosing messages when using
> + // aggregators.
> + if (firstVertexMessage != null) {
> + peer.send(peer.getPeerName(), firstVertexMessage);
> }
> - this.aggregationRunner.resetSkipAggregators();
> +
> + GraphJobMessage msg = null;
> + while ((msg = peer.getCurrentMessage()) != null) {
> + peer.send(peer.getPeerName(), msg);
> + }
> +
> + // now sync
> + peer.sync();
> + // now the map message must be read that might be send from the master
> + updated = getAggregationRunner().receiveAggregatedValues(
> + peer.getCurrentMessage().getMap(), iteration);
> + // set the first vertex message back to the message it had before sync
> + firstVertexMessage = peer.getCurrentMessage();
> return firstVertexMessage;
> }
>
> @@ -274,7 +259,6 @@ public final class GraphJobRunner<V exte
> }
>
> if (!vertex.isHalted()) {
> - M lastValue = vertex.getValue();
> if (iterable == null) {
> vertex.compute(Collections.<M> emptyList());
> } else {
> @@ -286,7 +270,6 @@ public final class GraphJobRunner<V exte
> }
> currentMessage = iterable.getOverflowMessage();
> }
> - getAggregationRunner().aggregateVertex(lastValue, vertex);
> activeVertices++;
> }
>
> @@ -296,8 +279,7 @@ public final class GraphJobRunner<V exte
> }
> vertices.finishSuperstep();
>
> - getAggregationRunner().sendAggregatorValues(peer, activeVertices,
> - this.changedVertexCnt);
> + sendControllValues(activeVertices, this.changedVertexCnt);
> iteration++;
> }
>
> @@ -357,15 +339,13 @@ public final class GraphJobRunner<V exte
> while (skippingIterator.hasNext()) {
> Vertex<V, E, M> vertex = skippingIterator.next();
>
> - M lastValue = vertex.getValue();
> // Calls setup method.
> vertex.setup(conf);
> vertex.compute(Collections.singleton(vertex.getValue()));
> - getAggregationRunner().aggregateVertex(lastValue, vertex);
> vertices.finishVertexComputation(vertex);
> }
> vertices.finishSuperstep();
> - getAggregationRunner().sendAggregatorValues(peer, 1, this.changedVertexCnt);
> + sendControllValues(1, this.changedVertexCnt);
> iteration++;
> }
>
> @@ -594,14 +574,10 @@ public final class GraphJobRunner<V exte
> } else {
> globalUpdateCounts += ((IntWritable) e.getValue()).get();
> }
> - } else if (getAggregationRunner().isEnabled()
> - && vertexID.toString().startsWith(S_FLAG_AGGREGATOR_VALUE)) {
> - getAggregationRunner().masterReadAggregatedValue(vertexID,
> - (M) e.getValue());
> - } else if (getAggregationRunner().isEnabled()
> - && vertexID.toString().startsWith(S_FLAG_AGGREGATOR_INCREMENT)) {
> - getAggregationRunner().masterReadAggregatedIncrementalValue(
> - vertexID, (M) e.getValue());
> +
> + } else if (vertexID.toString().startsWith(S_FLAG_AGGREGATOR_VALUE)) {
> + this.getAggregationRunner().masterAggregation(vertexID,
> + e.getValue());
> } else if (FLAG_VERTEX_INCREASE.equals(vertexID)) {
> dynamicAdditions = true;
> addVertex((Vertex<V, E, M>) e.getValue());
> @@ -619,21 +595,11 @@ public final class GraphJobRunner<V exte
> "A message to increase vertex count is in a wrong place: "
> + peer);
> }
> - } else if (FLAG_AGGREGATOR_SKIP.equals(vertexID)) {
> - if (isMasterTask(peer)) {
> - this.getAggregationRunner().addSkipAggregator(
> - ((IntWritable) e.getValue()).get());
> - } else {
> - throw new UnsupportedOperationException(
> - "A message to skip aggregators is in a wrong peer: " + peer);
> - }
> }
> }
> -
> } else {
> throw new UnsupportedOperationException("Unknown message type: " + msg);
> }
> -
> }
>
> // If we applied any changes to vertices, we need to call finishAdditions
> @@ -677,23 +643,20 @@ public final class GraphJobRunner<V exte
> }
>
> /**
> - * Gets the last aggregated value at the given index. The index is dependend
> - * on how the aggregators were configured during job setup phase.
> - *
> - * @return the value of the aggregator, or null if none was defined.
> - */
> - public final Writable getLastAggregatedValue(int index) {
> - return getAggregationRunner().getLastAggregatedValue(index);
> - }
> -
> - /**
> - * Gets the last aggregated number of vertices at the given index. The index
> - * is dependend on how the aggregators were configured during job setup phase.
> + * Runs internal aggregators and send their values to the master task.
> *
> - * @return the value of the aggregator, or null if none was defined.
> + * @param activeVertices number of active vertices in this peer
> + * @param changedVertexCnt number of added/removed vertices in a superstep
> */
> - public final IntWritable getNumLastAggregatedVertices(int index) {
> - return getAggregationRunner().getNumLastAggregatedVertices(index);
> + private void sendControllValues(int activeVertices, int changedVertexCnt)
> + throws IOException {
> + // send msgCounts to the master task
> + MapWritable updatedCnt = new MapWritable();
> + updatedCnt.put(FLAG_MESSAGE_COUNTS, new IntWritable(activeVertices));
> + // send total number of vertices changes
> + updatedCnt.put(FLAG_VERTEX_ALTER_COUNTER,
> + new LongWritable(changedVertexCnt));
> + peer.send(getMasterTask(peer), new GraphJobMessage(updatedCnt));
> }
>
> /**
>
> Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/OffHeapVerticesInfo.java
> URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/OffHeapVerticesInfo.java?rev=1556691&r1=1556690&r2=1556691&view=diff
> ==============================================================================
> --- hama/trunk/graph/src/main/java/org/apache/hama/graph/OffHeapVerticesInfo.java (original)
> +++ hama/trunk/graph/src/main/java/org/apache/hama/graph/OffHeapVerticesInfo.java Thu Jan 9 01:10:59 2014
> @@ -82,6 +82,7 @@ public class OffHeapVerticesInfo<V exten
> vertices.dump();
> }
>
> + @Override
> public void addVertex(Vertex<V, E, M> vertex) {
> vertices.put(vertex.getVertexID(), vertex);
> }
> @@ -108,6 +109,7 @@ public class OffHeapVerticesInfo<V exten
> vertices.clear();
> }
>
> + @Override
> public int size() {
> return (int) this.vertices.entries();
> }
>
> Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java
> URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java?rev=1556691&r1=1556690&r2=1556691&view=diff
> ==============================================================================
> --- hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java (original)
> +++ hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java Thu Jan 9 01:10:59 2014
> @@ -27,8 +27,8 @@ import java.io.IOException;
> import java.util.ArrayList;
> import java.util.List;
>
> -import org.apache.hadoop.io.IntWritable;
> import org.apache.hadoop.io.MapWritable;
> +import org.apache.hadoop.io.Text;
> import org.apache.hadoop.io.Writable;
> import org.apache.hadoop.io.WritableComparable;
> import org.apache.hama.HamaConfiguration;
> @@ -75,7 +75,7 @@ public abstract class Vertex<V extends W
> @Override
> public void setup(HamaConfiguration conf) {
> }
> -
> +
> @Override
> public void sendMessage(Edge<V, E> e, M msg) throws IOException {
> runner.getPeer().send(getDestinationPeerName(e),
> @@ -120,24 +120,26 @@ public abstract class Vertex<V extends W
> private void alterVertexCounter(int i) throws IOException {
> this.runner.setChangedVertexCnt(this.runner.getChangedVertexCnt() + i);
> }
> -
> +
> @Override
> - public void addVertex(V vertexID, List<Edge<V, E>> edges, M value) throws IOException {
> + public void addVertex(V vertexID, List<Edge<V, E>> edges, M value)
> + throws IOException {
> MapWritable msg = new MapWritable();
> // Create the new vertex.
> - Vertex<V, E, M> vertex = GraphJobRunner.<V, E, M> newVertexInstance(GraphJobRunner.VERTEX_CLASS);
> + Vertex<V, E, M> vertex = GraphJobRunner
> + .<V, E, M> newVertexInstance(GraphJobRunner.VERTEX_CLASS);
> vertex.setEdges(edges);
> vertex.setValue(value);
> vertex.setVertexID(vertexID);
> -
> +
> msg.put(GraphJobRunner.FLAG_VERTEX_INCREASE, vertex);
> // Find the proper partition to host the new vertex.
> - int partition = getPartitioner().getPartition(vertexID, value,
> + int partition = getPartitioner().getPartition(vertexID, value,
> runner.getPeer().getNumPeers());
> String destPeer = runner.getPeer().getAllPeerNames()[partition];
> -
> +
> runner.getPeer().send(destPeer, new GraphJobMessage(msg));
> -
> +
> alterVertexCounter(1);
> }
>
> @@ -145,11 +147,11 @@ public abstract class Vertex<V extends W
> public void remove() throws IOException {
> MapWritable msg = new MapWritable();
> msg.put(GraphJobRunner.FLAG_VERTEX_DECREASE, this.vertexID);
> -
> +
> // Get master task peer.
> String destPeer = GraphJobRunner.getMasterTask(this.getPeer());
> runner.getPeer().send(destPeer, new GraphJobMessage(msg));
> -
> +
> alterVertexCounter(-1);
> }
>
> @@ -192,31 +194,6 @@ public abstract class Vertex<V extends W
> return runner.getMaxIteration();
> }
>
> - /**
> - * Get the last aggregated value of the defined aggregator, null if nothing
> - * was configured or not returned a result. You have to supply an index, the
> - * index is defined by the order you set the aggregator classes in
> - * {@link GraphJob#setAggregatorClass(Class...)}. Index is starting at zero,
> - * so if you have a single aggregator you can retrieve it via
> - * {@link #getLastAggregatedValue}(0).
> - */
> - @SuppressWarnings("unchecked")
> - public M getLastAggregatedValue(int index) {
> - return (M) runner.getLastAggregatedValue(index);
> - }
> -
> - /**
> - * Get the number of aggregated vertices in the last superstep. Or null if no
> - * aggregator is available.You have to supply an index, the index is defined
> - * by the order you set the aggregator classes in
> - * {@link GraphJob#setAggregatorClass(Class...)}. Index is starting at zero,
> - * so if you have a single aggregator you can retrieve it via
> - * {@link #getNumLastAggregatedVertices}(0).
> - */
> - public IntWritable getNumLastAggregatedVertices(int index) {
> - return runner.getNumLastAggregatedVertices(index);
> - }
> -
> public int getNumPeers() {
> return runner.getPeer().getNumPeers();
> }
> @@ -245,21 +222,6 @@ public abstract class Vertex<V extends W
> this.votedToHalt = true;
> }
>
> - /**
> - * Disable an aggregator for the next superstep. The returning value of
> - * the aggregator will be null.
> - */
> - public void skipAggregator(int index) throws IOException {
> - MapWritable msg = new MapWritable();
> - msg.put(GraphJobRunner.FLAG_AGGREGATOR_SKIP, new IntWritable(index));
> -
> - this.runner.getAggregationRunner().addSkipAggregator(index);
> -
> - // Get master task peer.
> - String destPeer = GraphJobRunner.getMasterTask(this.getPeer());
> - runner.getPeer().send(destPeer, new GraphJobMessage(msg));
> - }
> -
> void setActive() {
> this.votedToHalt = false;
> }
> @@ -314,7 +276,7 @@ public abstract class Vertex<V extends W
> }
> this.value.readFields(in);
> }
> -
> +
> this.edges = new ArrayList<Edge<V, E>>();
> if (in.readBoolean()) {
> int num = in.readInt();
> @@ -345,7 +307,7 @@ public abstract class Vertex<V extends W
> out.writeBoolean(true);
> vertexID.write(out);
> }
> -
> +
> if (value == null) {
> out.writeBoolean(false);
> } else {
> @@ -399,6 +361,30 @@ public abstract class Vertex<V extends W
>
> }
>
> + /**
> + * Provides a value to the specified aggregator.
> + *
> + * @throws IOException
> + *
> + * @param name identifies an aggregator
> + * @param value value to be aggregated
> + */
> + @Override
> + public void aggregate(String name, M value) throws IOException {
> + MapWritable msg = new MapWritable();
> + msg.put(new Text(GraphJobRunner.S_FLAG_AGGREGATOR_VALUE + ";" + name),
> + value);
> +
> + // Get master task peer.
> + String destPeer = GraphJobRunner.getMasterTask(this.getPeer());
> + runner.getPeer().send(destPeer, new GraphJobMessage(msg));
> + }
> +
> + @Override
> + public Writable getAggregatedValue(String name) {
> + return this.runner.getAggregationRunner().getAggregatedValue(name);
> + }
> +
> protected void setRunner(GraphJobRunner<V, E, M> runner) {
> this.runner = runner;
> }
>
> Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInterface.java
> URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInterface.java?rev=1556691&r1=1556690&r2=1556691&view=diff
> ==============================================================================
> --- hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInterface.java (original)
> +++ hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInterface.java Thu Jan 9 01:10:59 2014
> @@ -112,4 +112,19 @@ public interface VertexInterface<V exten
> */
> public M getValue();
>
> + /**
> + * Provides a value to the specified aggregator.
> + *
> + * @throws IOException
> + *
> + * @param name identifies a aggregator
> + * @param value value to be aggregated
> + */
> + public void aggregate(String name, M value) throws IOException;
> +
> + /**
> + * Returns the value of the specified aggregator.
> + */
> + public Writable getAggregatedValue(String name);
> +
> }
>
> Modified: hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java
> URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java?rev=1556691&r1=1556690&r2=1556691&view=diff
> ==============================================================================
> --- hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java (original)
> +++ hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java Thu Jan 9 01:10:59 2014
> @@ -44,7 +44,7 @@ import org.junit.Before;
> public class TestSubmitGraphJob extends TestBSPMasterGroomServer {
>
> String[] input = new String[] { "stackoverflow.com\tyahoo.com",
> - "facebook.com\ttwitter.com",
> + "facebook.com\ttwitter.com",
> "facebook.com\tgoogle.com\tnasa.gov",
> "yahoo.com\tnasa.gov\tstackoverflow.com",
> "twitter.com\tgoogle.com\tfacebook.com",
> @@ -56,6 +56,7 @@ public class TestSubmitGraphJob extends
> @SuppressWarnings("rawtypes")
> private static final List<Class<? extends VerticesInfo>> vi = new ArrayList<Class<? extends VerticesInfo>>();
>
> + @Override
> @Before
> public void setUp() throws Exception {
> super.setUp();
> @@ -84,7 +85,7 @@ public class TestSubmitGraphJob extends
> // set the defaults
> bsp.setMaxIteration(30);
>
> - bsp.setAggregatorClass(AverageAggregator.class);
> + bsp.registerAggregator("avg", AverageAggregator.class);
>
> bsp.setInputFormat(SequenceFileInputFormat.class);
> bsp.setInputKeyClass(Text.class);
>
> Modified: hama/trunk/graph/src/test/java/org/apache/hama/graph/example/PageRank.java
> URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/test/java/org/apache/hama/graph/example/PageRank.java?rev=1556691&r1=1556690&r2=1556691&view=diff
> ==============================================================================
> --- hama/trunk/graph/src/test/java/org/apache/hama/graph/example/PageRank.java (original)
> +++ hama/trunk/graph/src/test/java/org/apache/hama/graph/example/PageRank.java Thu Jan 9 01:10:59 2014
> @@ -43,7 +43,7 @@ public class PageRank {
>
> public static class PageRankVertex extends
> Vertex<Text, NullWritable, DoubleWritable> {
> -
> +
> static double DAMPING_FACTOR = 0.85;
> static double MAXIMUM_CONVERGENCE_ERROR = 0.001;
>
> @@ -74,7 +74,7 @@ public class PageRank {
> }
>
> // if we have not reached our global error yet, then proceed.
> - DoubleWritable globalError = getLastAggregatedValue(0);
> + DoubleWritable globalError = (DoubleWritable) getAggregatedValue("avg");
> if (globalError != null && this.getSuperstepCount() > 2
> && MAXIMUM_CONVERGENCE_ERROR > globalError.get()) {
> voteToHalt();
> @@ -84,6 +84,8 @@ public class PageRank {
> // in each superstep we are going to send a new rank to our neighbours
> sendMessageToNeighbors(new DoubleWritable(this.getValue().get()
> / this.getEdges().size()));
> +
> + this.aggregate("avg", this.getValue());
> }
>
> }
> @@ -126,7 +128,7 @@ public class PageRank {
> }
>
> // error
> - pageJob.setAggregatorClass(AverageAggregator.class);
> + pageJob.registerAggregator("avg", AverageAggregator.class);
>
> // Vertex reader
> pageJob.setVertexInputReaderClass(PagerankSeqReader.class);
>
>
--
Best Regards, Edward J. Yoon
@eddieyoon