You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ap...@apache.org on 2012/08/09 11:10:59 UTC

svn commit: r1371108 [1/2] - in /giraph/trunk: ./ src/main/java/org/apache/giraph/aggregators/ src/main/java/org/apache/giraph/benchmark/ src/main/java/org/apache/giraph/bsp/ src/main/java/org/apache/giraph/examples/ src/main/java/org/apache/giraph/gra...

Author: apresta
Date: Thu Aug  9 09:10:57 2012
New Revision: 1371108

URL: http://svn.apache.org/viewvc?rev=1371108&view=rev
Log:
GIRAPH-259: TestBspBasic.testBspPageRank is broken (majakabiljo via apresta)

Added:
    giraph/trunk/src/main/java/org/apache/giraph/aggregators/BasicAggregator.java
    giraph/trunk/src/main/java/org/apache/giraph/examples/AggregatorsTestVertex.java
    giraph/trunk/src/main/java/org/apache/giraph/graph/AggregatorWrapper.java
    giraph/trunk/src/main/java/org/apache/giraph/graph/MasterAggregatorUsage.java
    giraph/trunk/src/main/java/org/apache/giraph/graph/WorkerAggregatorUsage.java
    giraph/trunk/src/test/java/org/apache/giraph/TestAggregatorsHandling.java
Removed:
    giraph/trunk/src/main/java/org/apache/giraph/graph/AggregatorUsage.java
Modified:
    giraph/trunk/CHANGELOG
    giraph/trunk/src/main/java/org/apache/giraph/aggregators/BooleanAndAggregator.java
    giraph/trunk/src/main/java/org/apache/giraph/aggregators/BooleanOrAggregator.java
    giraph/trunk/src/main/java/org/apache/giraph/aggregators/BooleanOverwriteAggregator.java
    giraph/trunk/src/main/java/org/apache/giraph/aggregators/DoubleMaxAggregator.java
    giraph/trunk/src/main/java/org/apache/giraph/aggregators/DoubleMinAggregator.java
    giraph/trunk/src/main/java/org/apache/giraph/aggregators/DoubleOverwriteAggregator.java
    giraph/trunk/src/main/java/org/apache/giraph/aggregators/DoubleProductAggregator.java
    giraph/trunk/src/main/java/org/apache/giraph/aggregators/DoubleSumAggregator.java
    giraph/trunk/src/main/java/org/apache/giraph/aggregators/FloatMaxAggregator.java
    giraph/trunk/src/main/java/org/apache/giraph/aggregators/FloatMinAggregator.java
    giraph/trunk/src/main/java/org/apache/giraph/aggregators/FloatOverwriteAggregator.java
    giraph/trunk/src/main/java/org/apache/giraph/aggregators/FloatProductAggregator.java
    giraph/trunk/src/main/java/org/apache/giraph/aggregators/FloatSumAggregator.java
    giraph/trunk/src/main/java/org/apache/giraph/aggregators/IntMaxAggregator.java
    giraph/trunk/src/main/java/org/apache/giraph/aggregators/IntMinAggregator.java
    giraph/trunk/src/main/java/org/apache/giraph/aggregators/IntOverwriteAggregator.java
    giraph/trunk/src/main/java/org/apache/giraph/aggregators/IntProductAggregator.java
    giraph/trunk/src/main/java/org/apache/giraph/aggregators/IntSumAggregator.java
    giraph/trunk/src/main/java/org/apache/giraph/aggregators/LongMaxAggregator.java
    giraph/trunk/src/main/java/org/apache/giraph/aggregators/LongMinAggregator.java
    giraph/trunk/src/main/java/org/apache/giraph/aggregators/LongOverwriteAggregator.java
    giraph/trunk/src/main/java/org/apache/giraph/aggregators/LongProductAggregator.java
    giraph/trunk/src/main/java/org/apache/giraph/aggregators/LongSumAggregator.java
    giraph/trunk/src/main/java/org/apache/giraph/benchmark/RandomMessageBenchmark.java
    giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java
    giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java
    giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleAggregatorWriter.java
    giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleCheckpointVertex.java
    giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleMasterComputeVertex.java
    giraph/trunk/src/main/java/org/apache/giraph/examples/SimplePageRankVertex.java
    giraph/trunk/src/main/java/org/apache/giraph/examples/VerifyMessage.java
    giraph/trunk/src/main/java/org/apache/giraph/graph/Aggregator.java
    giraph/trunk/src/main/java/org/apache/giraph/graph/AggregatorWriter.java
    giraph/trunk/src/main/java/org/apache/giraph/graph/BspService.java
    giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java
    giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
    giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java
    giraph/trunk/src/main/java/org/apache/giraph/graph/MasterCompute.java
    giraph/trunk/src/main/java/org/apache/giraph/graph/TextAggregatorWriter.java
    giraph/trunk/src/main/java/org/apache/giraph/graph/Vertex.java
    giraph/trunk/src/main/java/org/apache/giraph/graph/WorkerContext.java
    giraph/trunk/src/test/java/org/apache/giraph/BspCase.java
    giraph/trunk/src/test/java/org/apache/giraph/TestAutoCheckpoint.java
    giraph/trunk/src/test/java/org/apache/giraph/TestBspBasic.java
    giraph/trunk/src/test/java/org/apache/giraph/TestGraphPartitioner.java
    giraph/trunk/src/test/java/org/apache/giraph/TestManualCheckpoint.java
    giraph/trunk/src/test/java/org/apache/giraph/aggregators/TestBooleanAggregators.java
    giraph/trunk/src/test/java/org/apache/giraph/aggregators/TestDoubleAggregators.java
    giraph/trunk/src/test/java/org/apache/giraph/aggregators/TestFloatAggregators.java
    giraph/trunk/src/test/java/org/apache/giraph/aggregators/TestIntAggregators.java
    giraph/trunk/src/test/java/org/apache/giraph/aggregators/TestLongAggregators.java

Modified: giraph/trunk/CHANGELOG
URL: http://svn.apache.org/viewvc/giraph/trunk/CHANGELOG?rev=1371108&r1=1371107&r2=1371108&view=diff
==============================================================================
--- giraph/trunk/CHANGELOG (original)
+++ giraph/trunk/CHANGELOG Thu Aug  9 09:10:57 2012
@@ -2,6 +2,8 @@ Giraph Change Log
 
 Release 0.2.0 - unreleased
 
+  GIRAPH-259: TestBspBasic.testBspPageRank is broken (majakabiljo via apresta)
+
   GIRAPH-256: Partitioning outgoing graph data during INPUT_SUPERSTEP by # of 
   vertices results in wide variance in RPC message sizes. (Eli Reisman via jghoman)
   

Added: giraph/trunk/src/main/java/org/apache/giraph/aggregators/BasicAggregator.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/aggregators/BasicAggregator.java?rev=1371108&view=auto
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/aggregators/BasicAggregator.java (added)
+++ giraph/trunk/src/main/java/org/apache/giraph/aggregators/BasicAggregator.java Thu Aug  9 09:10:57 2012
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.aggregators;
+
+import org.apache.giraph.graph.Aggregator;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Abstract class for {@link Aggregator}.
+ * Implements get value, set value and reset methods and has internal value
+ * object.
+ *
+ * @param <A> Aggregated value
+ */
+public abstract class BasicAggregator<A extends Writable> implements
+    Aggregator<A> {
+  /** Internal value */
+  private A value;
+
+  /**
+   * Default constructor.
+   * Creates new value object and resets the aggregator
+   */
+  public BasicAggregator() {
+    value = createInitialValue();
+  }
+
+  @Override
+  public A getAggregatedValue() {
+    return value;
+  }
+
+  @Override
+  public void setAggregatedValue(A value) {
+    this.value = value;
+  }
+
+  @Override
+  public void reset() {
+    value = createInitialValue();
+  }
+}

Modified: giraph/trunk/src/main/java/org/apache/giraph/aggregators/BooleanAndAggregator.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/aggregators/BooleanAndAggregator.java?rev=1371108&r1=1371107&r2=1371108&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/aggregators/BooleanAndAggregator.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/aggregators/BooleanAndAggregator.java Thu Aug  9 09:10:57 2012
@@ -20,51 +20,18 @@ package org.apache.giraph.aggregators;
 
 import org.apache.hadoop.io.BooleanWritable;
 
-import org.apache.giraph.graph.Aggregator;
-
 /**
  * Aggregator for calculating the AND function over boolean values.
  * The default value when nothing is aggregated is true.
  */
-public class BooleanAndAggregator implements Aggregator<BooleanWritable> {
-  /** Internal result */
-  private boolean result = true;
-
-  /**
-   * Aggregate with a primitive boolean.
-   *
-   * @param value Boolean value to aggregate.
-   */
-  public void aggregate(boolean value) {
-    result = result && value;
-  }
-
+public class BooleanAndAggregator extends BasicAggregator<BooleanWritable> {
   @Override
   public void aggregate(BooleanWritable value) {
-    result = result && value.get();
-  }
-
-  /**
-   * Set aggregated value using a primitive boolean.
-   *
-   * @param value Boolean value to set.
-   */
-  public void setAggregatedValue(boolean value) {
-    result = value;
-  }
-
-  @Override
-  public void setAggregatedValue(BooleanWritable value) {
-    result = value.get();
-  }
-
-  @Override
-  public BooleanWritable getAggregatedValue() {
-    return new BooleanWritable(result);
+    getAggregatedValue().set(getAggregatedValue().get() && value.get());
   }
 
   @Override
-  public BooleanWritable createAggregatedValue() {
-    return new BooleanWritable();
+  public BooleanWritable createInitialValue() {
+    return new BooleanWritable(true);
   }
 }

Modified: giraph/trunk/src/main/java/org/apache/giraph/aggregators/BooleanOrAggregator.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/aggregators/BooleanOrAggregator.java?rev=1371108&r1=1371107&r2=1371108&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/aggregators/BooleanOrAggregator.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/aggregators/BooleanOrAggregator.java Thu Aug  9 09:10:57 2012
@@ -20,51 +20,18 @@ package org.apache.giraph.aggregators;
 
 import org.apache.hadoop.io.BooleanWritable;
 
-import org.apache.giraph.graph.Aggregator;
-
 /**
  * Aggregator for calculating the OR function over boolean values.
  * The default value when nothing is aggregated is false.
  */
-public class BooleanOrAggregator implements Aggregator<BooleanWritable> {
-  /** Internal result */
-  private boolean result = false;
-
-  /**
-   * Aggregate with a primitive boolean.
-   *
-   * @param value Boolean value to aggregate.
-   */
-  public void aggregate(boolean value) {
-    result = result || value;
-  }
-
+public class BooleanOrAggregator extends BasicAggregator<BooleanWritable> {
   @Override
   public void aggregate(BooleanWritable value) {
-    result = result || value.get();
-  }
-
-  /**
-   * Set aggregated value using a primitive boolean.
-   *
-   * @param value Boolean value to set.
-   */
-  public void setAggregatedValue(boolean value) {
-    result = value;
-  }
-
-  @Override
-  public void setAggregatedValue(BooleanWritable value) {
-    result = value.get();
-  }
-
-  @Override
-  public BooleanWritable getAggregatedValue() {
-    return new BooleanWritable(result);
+    getAggregatedValue().set(getAggregatedValue().get() || value.get());
   }
 
   @Override
-  public BooleanWritable createAggregatedValue() {
-    return new BooleanWritable();
+  public BooleanWritable createInitialValue() {
+    return new BooleanWritable(false);
   }
 }

Modified: giraph/trunk/src/main/java/org/apache/giraph/aggregators/BooleanOverwriteAggregator.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/aggregators/BooleanOverwriteAggregator.java?rev=1371108&r1=1371107&r2=1371108&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/aggregators/BooleanOverwriteAggregator.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/aggregators/BooleanOverwriteAggregator.java Thu Aug  9 09:10:57 2012
@@ -20,8 +20,6 @@ package org.apache.giraph.aggregators;
 
 import org.apache.hadoop.io.BooleanWritable;
 
-import org.apache.giraph.graph.Aggregator;
-
 /**
  * Aggregator that stores a value that is overwritten once another value is
  * aggregated. This aggregator is useful for one-to-many communication from
@@ -29,45 +27,15 @@ import org.apache.giraph.graph.Aggregato
  * to this aggregator, its behavior is non-deterministic. The default value
  * for this aggregator is false.
  */
-public class BooleanOverwriteAggregator implements Aggregator<BooleanWritable> {
-  /** Internal result */
-  private boolean result = false;
-
-  /**
-   * Aggregate with a primitive boolean.
-   *
-   * @param value Boolean value to aggregate.
-   */
-  public void aggregate(boolean value) {
-    result = value;
-  }
-
+public class BooleanOverwriteAggregator extends
+    BasicAggregator<BooleanWritable> {
   @Override
   public void aggregate(BooleanWritable value) {
-    result = value.get();
-  }
-
-  /**
-   * Set aggregated value using a primitive boolean.
-   *
-   * @param value Boolean value to set.
-   */
-  public void setAggregatedValue(boolean value) {
-    result = value;
-  }
-
-  @Override
-  public void setAggregatedValue(BooleanWritable value) {
-    result = value.get();
-  }
-
-  @Override
-  public BooleanWritable getAggregatedValue() {
-    return new BooleanWritable(result);
+    getAggregatedValue().set(value.get());
   }
 
   @Override
-  public BooleanWritable createAggregatedValue() {
-    return new BooleanWritable();
+  public BooleanWritable createInitialValue() {
+    return new BooleanWritable(false);
   }
 }

Modified: giraph/trunk/src/main/java/org/apache/giraph/aggregators/DoubleMaxAggregator.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/aggregators/DoubleMaxAggregator.java?rev=1371108&r1=1371107&r2=1371108&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/aggregators/DoubleMaxAggregator.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/aggregators/DoubleMaxAggregator.java Thu Aug  9 09:10:57 2012
@@ -20,57 +20,18 @@ package org.apache.giraph.aggregators;
 
 import org.apache.hadoop.io.DoubleWritable;
 
-import org.apache.giraph.graph.Aggregator;
-
 /**
  * Aggregator for getting max double value.
- *
  */
-public class DoubleMaxAggregator implements Aggregator<DoubleWritable> {
-  /** Saved maximum value */
-  private double max = Double.MIN_VALUE;
-
-  /**
-   * Aggregate with a primitive double.
-   *
-   * @param value Double value to aggregate.
-   */
-  public void aggregate(double value) {
-    double val = value;
-    if (val > max) {
-      max = val;
-    }
-  }
-
+public class DoubleMaxAggregator extends BasicAggregator<DoubleWritable> {
   @Override
   public void aggregate(DoubleWritable value) {
-    double val = value.get();
-    if (val > max) {
-      max = val;
-    }
-  }
-
-  /**
-   * Set aggregated value using a primitive double.
-   *
-   * @param value Double value to set.
-   */
-  public void setAggregatedValue(double value) {
-    max = value;
-  }
-
-  @Override
-  public void setAggregatedValue(DoubleWritable value) {
-    max = value.get();
-  }
-
-  @Override
-  public DoubleWritable getAggregatedValue() {
-    return new DoubleWritable(max);
+    getAggregatedValue().set(
+        Math.max(getAggregatedValue().get(), value.get()));
   }
 
   @Override
-  public DoubleWritable createAggregatedValue() {
-    return new DoubleWritable();
+  public DoubleWritable createInitialValue() {
+    return new DoubleWritable(Double.MIN_VALUE);
   }
 }

Modified: giraph/trunk/src/main/java/org/apache/giraph/aggregators/DoubleMinAggregator.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/aggregators/DoubleMinAggregator.java?rev=1371108&r1=1371107&r2=1371108&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/aggregators/DoubleMinAggregator.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/aggregators/DoubleMinAggregator.java Thu Aug  9 09:10:57 2012
@@ -20,57 +20,18 @@ package org.apache.giraph.aggregators;
 
 import org.apache.hadoop.io.DoubleWritable;
 
-import org.apache.giraph.graph.Aggregator;
-
 /**
  * Aggregator for getting min double value.
  */
-public class DoubleMinAggregator implements Aggregator<DoubleWritable> {
-  /** Internal aggregator */
-  private double min = Double.MAX_VALUE;
-
-  /**
-   * Aggregate with a primitive double.
-   *
-   * @param value Double value to aggregate.
-   */
-  public void aggregate(double value) {
-    double val = value;
-    if (val < min) {
-      min = val;
-    }
-  }
-
+public class DoubleMinAggregator extends BasicAggregator<DoubleWritable> {
   @Override
   public void aggregate(DoubleWritable value) {
-    double val = value.get();
-    if (val < min) {
-      min = val;
-    }
-  }
-
-  /**
-   * Set aggregated value using a primitive double.
-   *
-   * @param value Double value to set.
-   */
-  public void setAggregatedValue(double value) {
-    min = value;
+    getAggregatedValue().set(
+        Math.min(getAggregatedValue().get(), value.get()));
   }
 
   @Override
-  public void setAggregatedValue(DoubleWritable value) {
-    min = value.get();
+  public DoubleWritable createInitialValue() {
+    return new DoubleWritable(Double.MAX_VALUE);
   }
-
-  @Override
-  public DoubleWritable getAggregatedValue() {
-    return new DoubleWritable(min);
-  }
-
-  @Override
-  public DoubleWritable createAggregatedValue() {
-    return new DoubleWritable();
-  }
-
 }

Modified: giraph/trunk/src/main/java/org/apache/giraph/aggregators/DoubleOverwriteAggregator.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/aggregators/DoubleOverwriteAggregator.java?rev=1371108&r1=1371107&r2=1371108&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/aggregators/DoubleOverwriteAggregator.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/aggregators/DoubleOverwriteAggregator.java Thu Aug  9 09:10:57 2012
@@ -20,53 +20,21 @@ package org.apache.giraph.aggregators;
 
 import org.apache.hadoop.io.DoubleWritable;
 
-import org.apache.giraph.graph.Aggregator;
-
 /**
  * Aggregator that stores a value that is overwritten once another value is
  * aggregated. This aggregator is useful for one-to-many communication from
  * master.compute() or from a special vertex. In case multiple vertices write
  * to this aggregator, its behavior is non-deterministic.
  */
-public class DoubleOverwriteAggregator implements Aggregator<DoubleWritable> {
-  /** Internal result */
-  private double result = 0.0;
-
-  /**
-   * Aggregate with a primitive double.
-   *
-   * @param value Double value to aggregate.
-   */
-  public void aggregate(double value) {
-    result = value;
-  }
-
+public class DoubleOverwriteAggregator extends
+    BasicAggregator<DoubleWritable> {
   @Override
   public void aggregate(DoubleWritable value) {
-    result = value.get();
-  }
-
-  /**
-   * Set aggregated value using a primitive double.
-   *
-   * @param value Double value to set.
-   */
-  public void setAggregatedValue(double value) {
-    result = value;
-  }
-
-  @Override
-  public void setAggregatedValue(DoubleWritable value) {
-    result = value.get();
-  }
-
-  @Override
-  public DoubleWritable getAggregatedValue() {
-    return new DoubleWritable(result);
+    getAggregatedValue().set(value.get());
   }
 
   @Override
-  public DoubleWritable createAggregatedValue() {
-    return new DoubleWritable();
+  public DoubleWritable createInitialValue() {
+    return new DoubleWritable(0);
   }
 }

Modified: giraph/trunk/src/main/java/org/apache/giraph/aggregators/DoubleProductAggregator.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/aggregators/DoubleProductAggregator.java?rev=1371108&r1=1371107&r2=1371108&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/aggregators/DoubleProductAggregator.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/aggregators/DoubleProductAggregator.java Thu Aug  9 09:10:57 2012
@@ -20,51 +20,17 @@ package org.apache.giraph.aggregators;
 
 import org.apache.hadoop.io.DoubleWritable;
 
-import org.apache.giraph.graph.Aggregator;
-
 /**
  * Aggregator for calculating products of double values.
  */
-public class DoubleProductAggregator implements Aggregator<DoubleWritable> {
-  /** Aggregated product */
-  private double product = 1.0;
-
-  /**
-   * Aggregate a primitive double.
-   *
-   * @param value Double value to aggregate.
-   */
-  public void aggregate(double value) {
-    product *= value;
-  }
-
+public class DoubleProductAggregator extends BasicAggregator<DoubleWritable> {
   @Override
   public void aggregate(DoubleWritable value) {
-    product *= value.get();
-  }
-
-  /**
-   * Set aggregated value using a primitive double.
-   *
-   * @param value Double value to set.
-   */
-  public void setAggregatedValue(double value) {
-    product = value;
+    getAggregatedValue().set(getAggregatedValue().get() * value.get());
   }
 
   @Override
-  public void setAggregatedValue(DoubleWritable value) {
-    product = value.get();
+  public DoubleWritable createInitialValue() {
+    return new DoubleWritable(1);
   }
-
-  @Override
-  public DoubleWritable getAggregatedValue() {
-    return new DoubleWritable(product);
-  }
-
-  @Override
-  public DoubleWritable createAggregatedValue() {
-    return new DoubleWritable();
-  }
-
 }

Modified: giraph/trunk/src/main/java/org/apache/giraph/aggregators/DoubleSumAggregator.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/aggregators/DoubleSumAggregator.java?rev=1371108&r1=1371107&r2=1371108&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/aggregators/DoubleSumAggregator.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/aggregators/DoubleSumAggregator.java Thu Aug  9 09:10:57 2012
@@ -20,51 +20,15 @@ package org.apache.giraph.aggregators;
 
 import org.apache.hadoop.io.DoubleWritable;
 
-import org.apache.giraph.graph.Aggregator;
-
-/**
- * Aggregator for summing up double values.
- */
-public class DoubleSumAggregator implements Aggregator<DoubleWritable> {
-  /** Aggregated sum */
-  private double sum = 0;
-
-  /**
-   * Aggregate a primitive double.
-   *
-   * @param value Double value to aggregate.
-   */
-  public void aggregate(double value) {
-    sum += value;
-  }
-
+/** Aggregator for summing up double values. */
+public class DoubleSumAggregator extends BasicAggregator<DoubleWritable> {
   @Override
   public void aggregate(DoubleWritable value) {
-    sum += value.get();
-  }
-
-  /**
-   * Set aggregated value using a primitive double.
-   *
-   * @param value Double value to set.
-   */
-  public void setAggregatedValue(double value) {
-    sum = value;
+    getAggregatedValue().set(getAggregatedValue().get() + value.get());
   }
 
   @Override
-  public void setAggregatedValue(DoubleWritable value) {
-    sum = value.get();
+  public DoubleWritable createInitialValue() {
+    return new DoubleWritable(0);
   }
-
-  @Override
-  public DoubleWritable getAggregatedValue() {
-    return new DoubleWritable(sum);
-  }
-
-  @Override
-  public DoubleWritable createAggregatedValue() {
-    return new DoubleWritable();
-  }
-
 }

Modified: giraph/trunk/src/main/java/org/apache/giraph/aggregators/FloatMaxAggregator.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/aggregators/FloatMaxAggregator.java?rev=1371108&r1=1371107&r2=1371108&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/aggregators/FloatMaxAggregator.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/aggregators/FloatMaxAggregator.java Thu Aug  9 09:10:57 2012
@@ -20,57 +20,18 @@ package org.apache.giraph.aggregators;
 
 import org.apache.hadoop.io.FloatWritable;
 
-import org.apache.giraph.graph.Aggregator;
-
 /**
  * Aggregator for getting max float value.
- *
  */
-public class FloatMaxAggregator implements Aggregator<FloatWritable> {
-  /** Saved maximum value */
-  private float max = Float.MIN_VALUE;
-
-  /**
-   * Aggregate with a primitive float.
-   *
-   * @param value Float value to aggregate.
-   */
-  public void aggregate(float value) {
-    float val = value;
-    if (val > max) {
-      max = val;
-    }
-  }
-
+public class FloatMaxAggregator extends BasicAggregator<FloatWritable> {
   @Override
   public void aggregate(FloatWritable value) {
-    float val = value.get();
-    if (val > max) {
-      max = val;
-    }
-  }
-
-  /**
-   * Set aggregated value using a primitive float.
-   *
-   * @param value Float value to set.
-   */
-  public void setAggregatedValue(float value) {
-    max = value;
-  }
-
-  @Override
-  public void setAggregatedValue(FloatWritable value) {
-    max = value.get();
-  }
-
-  @Override
-  public FloatWritable getAggregatedValue() {
-    return new FloatWritable(max);
+    getAggregatedValue().set(
+        Math.max(getAggregatedValue().get(), value.get()));
   }
 
   @Override
-  public FloatWritable createAggregatedValue() {
-    return new FloatWritable();
+  public FloatWritable createInitialValue() {
+    return new FloatWritable(Float.MIN_VALUE);
   }
 }

Modified: giraph/trunk/src/main/java/org/apache/giraph/aggregators/FloatMinAggregator.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/aggregators/FloatMinAggregator.java?rev=1371108&r1=1371107&r2=1371108&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/aggregators/FloatMinAggregator.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/aggregators/FloatMinAggregator.java Thu Aug  9 09:10:57 2012
@@ -20,57 +20,18 @@ package org.apache.giraph.aggregators;
 
 import org.apache.hadoop.io.FloatWritable;
 
-import org.apache.giraph.graph.Aggregator;
-
 /**
  * Aggregator for getting min float value.
  */
-public class FloatMinAggregator implements Aggregator<FloatWritable> {
-  /** Internal aggregator */
-  private float min = Float.MAX_VALUE;
-
-  /**
-   * Aggregate with a primitive float.
-   *
-   * @param value Float value to aggregate.
-   */
-  public void aggregate(float value) {
-    float val = value;
-    if (val < min) {
-      min = val;
-    }
-  }
-
+public class FloatMinAggregator extends BasicAggregator<FloatWritable> {
   @Override
   public void aggregate(FloatWritable value) {
-    float val = value.get();
-    if (val < min) {
-      min = val;
-    }
-  }
-
-  /**
-   * Set aggregated value using a primitive float.
-   *
-   * @param value Float value to set.
-   */
-  public void setAggregatedValue(float value) {
-    min = value;
+    getAggregatedValue().set(
+        Math.min(getAggregatedValue().get(), value.get()));
   }
 
   @Override
-  public void setAggregatedValue(FloatWritable value) {
-    min = value.get();
+  public FloatWritable createInitialValue() {
+    return new FloatWritable(Float.MAX_VALUE);
   }
-
-  @Override
-  public FloatWritable getAggregatedValue() {
-    return new FloatWritable(min);
-  }
-
-  @Override
-  public FloatWritable createAggregatedValue() {
-    return new FloatWritable();
-  }
-
 }

Modified: giraph/trunk/src/main/java/org/apache/giraph/aggregators/FloatOverwriteAggregator.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/aggregators/FloatOverwriteAggregator.java?rev=1371108&r1=1371107&r2=1371108&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/aggregators/FloatOverwriteAggregator.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/aggregators/FloatOverwriteAggregator.java Thu Aug  9 09:10:57 2012
@@ -20,53 +20,20 @@ package org.apache.giraph.aggregators;
 
 import org.apache.hadoop.io.FloatWritable;
 
-import org.apache.giraph.graph.Aggregator;
-
 /**
  * Aggregator that stores a value that is overwritten once another value is
  * aggregated. This aggregator is useful for one-to-many communication from
  * master.compute() or from a special vertex. In case multiple vertices write
  * to this aggregator, its behavior is non-deterministic.
  */
-public class FloatOverwriteAggregator implements Aggregator<FloatWritable> {
-  /** Internal result */
-  private float result = 0.0f;
-
-  /**
-   * Aggregate with a primitive float.
-   *
-   * @param value Float value to aggregate.
-   */
-  public void aggregate(float value) {
-    result = value;
-  }
-
+public class FloatOverwriteAggregator extends BasicAggregator<FloatWritable> {
   @Override
   public void aggregate(FloatWritable value) {
-    result = value.get();
-  }
-
-  /**
-   * Set aggregated value using a primitive float.
-   *
-   * @param value Float value to set.
-   */
-  public void setAggregatedValue(float value) {
-    result = value;
-  }
-
-  @Override
-  public void setAggregatedValue(FloatWritable value) {
-    result = value.get();
-  }
-
-  @Override
-  public FloatWritable getAggregatedValue() {
-    return new FloatWritable(result);
+    getAggregatedValue().set(value.get());
   }
 
   @Override
-  public FloatWritable createAggregatedValue() {
-    return new FloatWritable();
+  public FloatWritable createInitialValue() {
+    return new FloatWritable(0);
   }
 }

Modified: giraph/trunk/src/main/java/org/apache/giraph/aggregators/FloatProductAggregator.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/aggregators/FloatProductAggregator.java?rev=1371108&r1=1371107&r2=1371108&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/aggregators/FloatProductAggregator.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/aggregators/FloatProductAggregator.java Thu Aug  9 09:10:57 2012
@@ -20,51 +20,17 @@ package org.apache.giraph.aggregators;
 
 import org.apache.hadoop.io.FloatWritable;
 
-import org.apache.giraph.graph.Aggregator;
-
 /**
  * Aggregator for calculating products of float values.
  */
-public class FloatProductAggregator implements Aggregator<FloatWritable> {
-  /** Aggregated product */
-  private float product = 1.0f;
-
-  /**
-   * Aggregate a primitive float.
-   *
-   * @param value Float value to aggregate.
-   */
-  public void aggregate(float value) {
-    product *= value;
-  }
-
+public class FloatProductAggregator extends BasicAggregator<FloatWritable> {
   @Override
   public void aggregate(FloatWritable value) {
-    product *= value.get();
-  }
-
-  /**
-   * Set aggregated value using a primitive float.
-   *
-   * @param value Float value to set.
-   */
-  public void setAggregatedValue(float value) {
-    product = value;
+    getAggregatedValue().set(getAggregatedValue().get() * value.get());
   }
 
   @Override
-  public void setAggregatedValue(FloatWritable value) {
-    product = value.get();
+  public FloatWritable createInitialValue() {
+    return new FloatWritable(1);
   }
-
-  @Override
-  public FloatWritable getAggregatedValue() {
-    return new FloatWritable(product);
-  }
-
-  @Override
-  public FloatWritable createAggregatedValue() {
-    return new FloatWritable();
-  }
-
 }

Modified: giraph/trunk/src/main/java/org/apache/giraph/aggregators/FloatSumAggregator.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/aggregators/FloatSumAggregator.java?rev=1371108&r1=1371107&r2=1371108&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/aggregators/FloatSumAggregator.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/aggregators/FloatSumAggregator.java Thu Aug  9 09:10:57 2012
@@ -20,51 +20,17 @@ package org.apache.giraph.aggregators;
 
 import org.apache.hadoop.io.FloatWritable;
 
-import org.apache.giraph.graph.Aggregator;
-
 /**
  * Aggregator for summing up float values.
  */
-public class FloatSumAggregator implements Aggregator<FloatWritable> {
-  /** Aggregated sum */
-  private float sum = 0;
-
-  /**
-   * Aggregate a primitive float.
-   *
-   * @param value Float value to aggregate.
-   */
-  public void aggregate(float value) {
-    sum += value;
-  }
-
+public class FloatSumAggregator extends BasicAggregator<FloatWritable> {
   @Override
   public void aggregate(FloatWritable value) {
-    sum += value.get();
-  }
-
-  /**
-   * Set aggregated value using a primitive float.
-   *
-   * @param value Float value to set.
-   */
-  public void setAggregatedValue(float value) {
-    sum = value;
+    getAggregatedValue().set(getAggregatedValue().get() + value.get());
   }
 
   @Override
-  public void setAggregatedValue(FloatWritable value) {
-    sum = value.get();
+  public FloatWritable createInitialValue() {
+    return new FloatWritable(0);
   }
-
-  @Override
-  public FloatWritable getAggregatedValue() {
-    return new FloatWritable(sum);
-  }
-
-  @Override
-  public FloatWritable createAggregatedValue() {
-    return new FloatWritable();
-  }
-
 }

Modified: giraph/trunk/src/main/java/org/apache/giraph/aggregators/IntMaxAggregator.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/aggregators/IntMaxAggregator.java?rev=1371108&r1=1371107&r2=1371108&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/aggregators/IntMaxAggregator.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/aggregators/IntMaxAggregator.java Thu Aug  9 09:10:57 2012
@@ -20,57 +20,18 @@ package org.apache.giraph.aggregators;
 
 import org.apache.hadoop.io.IntWritable;
 
-import org.apache.giraph.graph.Aggregator;
-
 /**
  * Aggregator for getting max integer value.
- *
  */
-public class IntMaxAggregator implements Aggregator<IntWritable> {
-  /** Saved maximum value */
-  private int max = Integer.MIN_VALUE;
-
-  /**
-   * Aggregate with a primitive integer.
-   *
-   * @param value Integer value to aggregate.
-   */
-  public void aggregate(int value) {
-    int val = value;
-    if (val > max) {
-      max = val;
-    }
-  }
-
+public class IntMaxAggregator extends BasicAggregator<IntWritable> {
   @Override
   public void aggregate(IntWritable value) {
-    int val = value.get();
-    if (val > max) {
-      max = val;
-    }
-  }
-
-  /**
-   * Set aggregated value using a primitive integer.
-   *
-   * @param value Integer value to set.
-   */
-  public void setAggregatedValue(int value) {
-    max = value;
-  }
-
-  @Override
-  public void setAggregatedValue(IntWritable value) {
-    max = value.get();
-  }
-
-  @Override
-  public IntWritable getAggregatedValue() {
-    return new IntWritable(max);
+    getAggregatedValue().set(
+        Math.max(getAggregatedValue().get(), value.get()));
   }
 
   @Override
-  public IntWritable createAggregatedValue() {
-    return new IntWritable();
+  public IntWritable createInitialValue() {
+    return new IntWritable(Integer.MIN_VALUE);
   }
 }

Modified: giraph/trunk/src/main/java/org/apache/giraph/aggregators/IntMinAggregator.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/aggregators/IntMinAggregator.java?rev=1371108&r1=1371107&r2=1371108&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/aggregators/IntMinAggregator.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/aggregators/IntMinAggregator.java Thu Aug  9 09:10:57 2012
@@ -20,57 +20,18 @@ package org.apache.giraph.aggregators;
 
 import org.apache.hadoop.io.IntWritable;
 
-import org.apache.giraph.graph.Aggregator;
-
 /**
  * Aggregator for getting min integer value.
  */
-public class IntMinAggregator implements Aggregator<IntWritable> {
-  /** Internal aggregator */
-  private int min = Integer.MAX_VALUE;
-
-  /**
-   * Aggregate with a primitive integer.
-   *
-   * @param value Integer value to aggregate.
-   */
-  public void aggregate(int value) {
-    int val = value;
-    if (val < min) {
-      min = val;
-    }
-  }
-
+public class IntMinAggregator extends BasicAggregator<IntWritable> {
   @Override
   public void aggregate(IntWritable value) {
-    int val = value.get();
-    if (val < min) {
-      min = val;
-    }
-  }
-
-  /**
-   * Set aggregated value using a primitive integer.
-   *
-   * @param value Integer value to set.
-   */
-  public void setAggregatedValue(int value) {
-    min = value;
+    getAggregatedValue().set(
+        Math.min(getAggregatedValue().get(), value.get()));
   }
 
   @Override
-  public void setAggregatedValue(IntWritable value) {
-    min = value.get();
+  public IntWritable createInitialValue() {
+    return new IntWritable(Integer.MAX_VALUE);
   }
-
-  @Override
-  public IntWritable getAggregatedValue() {
-    return new IntWritable(min);
-  }
-
-  @Override
-  public IntWritable createAggregatedValue() {
-    return new IntWritable();
-  }
-
 }

Modified: giraph/trunk/src/main/java/org/apache/giraph/aggregators/IntOverwriteAggregator.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/aggregators/IntOverwriteAggregator.java?rev=1371108&r1=1371107&r2=1371108&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/aggregators/IntOverwriteAggregator.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/aggregators/IntOverwriteAggregator.java Thu Aug  9 09:10:57 2012
@@ -20,53 +20,20 @@ package org.apache.giraph.aggregators;
 
 import org.apache.hadoop.io.IntWritable;
 
-import org.apache.giraph.graph.Aggregator;
-
 /**
  * Aggregator that stores a value that is overwritten once another value is
  * aggregated. This aggregator is useful for one-to-many communication from
  * master.compute() or from a special vertex. In case multiple vertices write
  * to this aggregator, its behavior is non-deterministic.
  */
-public class IntOverwriteAggregator implements Aggregator<IntWritable> {
-  /** Internal result */
-  private int result = 0;
-
-  /**
-   * Aggregate with a primitive integer.
-   *
-   * @param value Integer value to aggregate.
-   */
-  public void aggregate(int value) {
-    result = value;
-  }
-
+public class IntOverwriteAggregator extends BasicAggregator<IntWritable> {
   @Override
   public void aggregate(IntWritable value) {
-    result = value.get();
-  }
-
-  /**
-   * Set aggregated value using a primitive integer.
-   *
-   * @param value Integer value to set.
-   */
-  public void setAggregatedValue(int value) {
-    result = value;
-  }
-
-  @Override
-  public void setAggregatedValue(IntWritable value) {
-    result = value.get();
-  }
-
-  @Override
-  public IntWritable getAggregatedValue() {
-    return new IntWritable(result);
+    getAggregatedValue().set(value.get());
   }
 
   @Override
-  public IntWritable createAggregatedValue() {
-    return new IntWritable();
+  public IntWritable createInitialValue() {
+    return new IntWritable(0);
   }
 }

Modified: giraph/trunk/src/main/java/org/apache/giraph/aggregators/IntProductAggregator.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/aggregators/IntProductAggregator.java?rev=1371108&r1=1371107&r2=1371108&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/aggregators/IntProductAggregator.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/aggregators/IntProductAggregator.java Thu Aug  9 09:10:57 2012
@@ -20,50 +20,17 @@ package org.apache.giraph.aggregators;
 
 import org.apache.hadoop.io.IntWritable;
 
-import org.apache.giraph.graph.Aggregator;
-
 /**
  * Aggregator for calculating products of long and integer values.
  */
-public class IntProductAggregator implements Aggregator<IntWritable> {
-  /** Internal product */
-  private int product = 1;
-
-  /**
-   * Aggregate a primitive integer.
-   *
-   * @param value Integer value to aggregate.
-   */
-  public void aggregate(int value) {
-    product *= value;
-  }
-
+public class IntProductAggregator extends BasicAggregator<IntWritable> {
   @Override
   public void aggregate(IntWritable value) {
-    product *= value.get();
-  }
-
-  /**
-   * Set aggregated value using a primitive integer.
-   *
-   * @param value Integer value to set.
-   */
-  public void setAggregatedValue(int value) {
-    product = value;
-  }
-
-  @Override
-  public void setAggregatedValue(IntWritable value) {
-    product = value.get();
-  }
-
-  @Override
-  public IntWritable getAggregatedValue() {
-    return new IntWritable(product);
+    getAggregatedValue().set(getAggregatedValue().get() * value.get());
   }
 
   @Override
-  public IntWritable createAggregatedValue() {
-    return new IntWritable();
+  public IntWritable createInitialValue() {
+    return new IntWritable(1);
   }
 }

Modified: giraph/trunk/src/main/java/org/apache/giraph/aggregators/IntSumAggregator.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/aggregators/IntSumAggregator.java?rev=1371108&r1=1371107&r2=1371108&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/aggregators/IntSumAggregator.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/aggregators/IntSumAggregator.java Thu Aug  9 09:10:57 2012
@@ -20,50 +20,17 @@ package org.apache.giraph.aggregators;
 
 import org.apache.hadoop.io.IntWritable;
 
-import org.apache.giraph.graph.Aggregator;
-
 /**
  * Aggregator for summing up integer values.
  */
-public class IntSumAggregator implements Aggregator<IntWritable> {
-  /** Internal sum */
-  private int sum = 0;
-
-  /**
-   * Aggregate a primitive integer.
-   *
-   * @param value Integer value to aggregate.
-   */
-  public void aggregate(int value) {
-    sum += value;
-  }
-
+public class IntSumAggregator extends BasicAggregator<IntWritable> {
   @Override
   public void aggregate(IntWritable value) {
-    sum += value.get();
-  }
-
-  /**
-   * Set aggregated value using a primitive integer.
-   *
-   * @param value Integer value to set.
-   */
-  public void setAggregatedValue(int value) {
-    sum = value;
-  }
-
-  @Override
-  public void setAggregatedValue(IntWritable value) {
-    sum = value.get();
-  }
-
-  @Override
-  public IntWritable getAggregatedValue() {
-    return new IntWritable(sum);
+    getAggregatedValue().set(getAggregatedValue().get() + value.get());
   }
 
   @Override
-  public IntWritable createAggregatedValue() {
-    return new IntWritable();
+  public IntWritable createInitialValue() {
+    return new IntWritable(0);
   }
 }

Modified: giraph/trunk/src/main/java/org/apache/giraph/aggregators/LongMaxAggregator.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/aggregators/LongMaxAggregator.java?rev=1371108&r1=1371107&r2=1371108&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/aggregators/LongMaxAggregator.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/aggregators/LongMaxAggregator.java Thu Aug  9 09:10:57 2012
@@ -20,57 +20,18 @@ package org.apache.giraph.aggregators;
 
 import org.apache.hadoop.io.LongWritable;
 
-import org.apache.giraph.graph.Aggregator;
-
 /**
  * Aggregator for getting max long value.
- *
  */
-public class LongMaxAggregator implements Aggregator<LongWritable> {
-  /** Saved maximum value */
-  private long max = Long.MIN_VALUE;
-
-  /**
-   * Aggregate with a primitive long.
-   *
-   * @param value Long value to aggregate.
-   */
-  public void aggregate(long value) {
-    long val = value;
-    if (val > max) {
-      max = val;
-    }
-  }
-
+public class LongMaxAggregator extends BasicAggregator<LongWritable> {
   @Override
   public void aggregate(LongWritable value) {
-    long val = value.get();
-    if (val > max) {
-      max = val;
-    }
-  }
-
-  /**
-   * Set aggregated value using a primitive long.
-   *
-   * @param value Long value to set.
-   */
-  public void setAggregatedValue(long value) {
-    max = value;
-  }
-
-  @Override
-  public void setAggregatedValue(LongWritable value) {
-    max = value.get();
-  }
-
-  @Override
-  public LongWritable getAggregatedValue() {
-    return new LongWritable(max);
+    getAggregatedValue().set(
+        Math.max(getAggregatedValue().get(), value.get()));
   }
 
   @Override
-  public LongWritable createAggregatedValue() {
-    return new LongWritable();
+  public LongWritable createInitialValue() {
+    return new LongWritable(Long.MIN_VALUE);
   }
 }

Modified: giraph/trunk/src/main/java/org/apache/giraph/aggregators/LongMinAggregator.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/aggregators/LongMinAggregator.java?rev=1371108&r1=1371107&r2=1371108&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/aggregators/LongMinAggregator.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/aggregators/LongMinAggregator.java Thu Aug  9 09:10:57 2012
@@ -20,57 +20,18 @@ package org.apache.giraph.aggregators;
 
 import org.apache.hadoop.io.LongWritable;
 
-import org.apache.giraph.graph.Aggregator;
-
 /**
  * Aggregator for getting min long value.
  */
-public class LongMinAggregator implements Aggregator<LongWritable> {
-  /** Internal aggregator */
-  private long min = Long.MAX_VALUE;
-
-  /**
-   * Aggregate with a primitive long.
-   *
-   * @param value Long value to aggregate.
-   */
-  public void aggregate(long value) {
-    long val = value;
-    if (val < min) {
-      min = val;
-    }
-  }
-
+public class LongMinAggregator extends BasicAggregator<LongWritable> {
   @Override
   public void aggregate(LongWritable value) {
-    long val = value.get();
-    if (val < min) {
-      min = val;
-    }
-  }
-
-  /**
-   * Set aggregated value using a primitive long.
-   *
-   * @param value Long value to set.
-   */
-  public void setAggregatedValue(long value) {
-    min = value;
+    getAggregatedValue().set(
+        Math.min(getAggregatedValue().get(), value.get()));
   }
 
   @Override
-  public void setAggregatedValue(LongWritable value) {
-    min = value.get();
+  public LongWritable createInitialValue() {
+    return new LongWritable(Long.MAX_VALUE);
   }
-
-  @Override
-  public LongWritable getAggregatedValue() {
-    return new LongWritable(min);
-  }
-
-  @Override
-  public LongWritable createAggregatedValue() {
-    return new LongWritable();
-  }
-
 }

Modified: giraph/trunk/src/main/java/org/apache/giraph/aggregators/LongOverwriteAggregator.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/aggregators/LongOverwriteAggregator.java?rev=1371108&r1=1371107&r2=1371108&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/aggregators/LongOverwriteAggregator.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/aggregators/LongOverwriteAggregator.java Thu Aug  9 09:10:57 2012
@@ -20,53 +20,20 @@ package org.apache.giraph.aggregators;
 
 import org.apache.hadoop.io.LongWritable;
 
-import org.apache.giraph.graph.Aggregator;
-
 /**
  * Aggregator that stores a value that is overwritten once another value is
  * aggregated. This aggregator is useful for one-to-many communication from
  * master.compute() or from a special vertex. In case multiple vertices write
  * to this aggregator, its behavior is non-deterministic.
  */
-public class LongOverwriteAggregator implements Aggregator<LongWritable> {
-  /** Internal result */
-  private long result = 0L;
-
-  /**
-   * Aggregate with a primitive long.
-   *
-   * @param value Long value to aggregate.
-   */
-  public void aggregate(long value) {
-    result = value;
-  }
-
+public class LongOverwriteAggregator extends BasicAggregator<LongWritable> {
   @Override
   public void aggregate(LongWritable value) {
-    result = value.get();
-  }
-
-  /**
-   * Set aggregated value using a primitive long.
-   *
-   * @param value Long value to set.
-   */
-  public void setAggregatedValue(long value) {
-    result = value;
-  }
-
-  @Override
-  public void setAggregatedValue(LongWritable value) {
-    result = value.get();
-  }
-
-  @Override
-  public LongWritable getAggregatedValue() {
-    return new LongWritable(result);
+    getAggregatedValue().set(value.get());
   }
 
   @Override
-  public LongWritable createAggregatedValue() {
-    return new LongWritable();
+  public LongWritable createInitialValue() {
+    return new LongWritable(0);
   }
 }

Modified: giraph/trunk/src/main/java/org/apache/giraph/aggregators/LongProductAggregator.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/aggregators/LongProductAggregator.java?rev=1371108&r1=1371107&r2=1371108&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/aggregators/LongProductAggregator.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/aggregators/LongProductAggregator.java Thu Aug  9 09:10:57 2012
@@ -20,50 +20,17 @@ package org.apache.giraph.aggregators;
 
 import org.apache.hadoop.io.LongWritable;
 
-import org.apache.giraph.graph.Aggregator;
-
 /**
  * Aggregator for calculating products of long values.
  */
-public class LongProductAggregator implements Aggregator<LongWritable> {
-  /** Internal product */
-  private long product = 1L;
-
-  /**
-   * Aggregate a primitive long.
-   *
-   * @param value Long value to aggregate.
-   */
-  public void aggregate(long value) {
-    product *= value;
-  }
-
+public class LongProductAggregator extends BasicAggregator<LongWritable> {
   @Override
   public void aggregate(LongWritable value) {
-    product *= value.get();
-  }
-
-  /**
-   * Set aggregated value using a primitive long.
-   *
-   * @param value Long value to set.
-   */
-  public void setAggregatedValue(long value) {
-    product = value;
-  }
-
-  @Override
-  public void setAggregatedValue(LongWritable value) {
-    product = value.get();
-  }
-
-  @Override
-  public LongWritable getAggregatedValue() {
-    return new LongWritable(product);
+    getAggregatedValue().set(getAggregatedValue().get() * value.get());
   }
 
   @Override
-  public LongWritable createAggregatedValue() {
-    return new LongWritable();
+  public LongWritable createInitialValue() {
+    return new LongWritable(1);
   }
 }

Modified: giraph/trunk/src/main/java/org/apache/giraph/aggregators/LongSumAggregator.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/aggregators/LongSumAggregator.java?rev=1371108&r1=1371107&r2=1371108&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/aggregators/LongSumAggregator.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/aggregators/LongSumAggregator.java Thu Aug  9 09:10:57 2012
@@ -20,50 +20,17 @@ package org.apache.giraph.aggregators;
 
 import org.apache.hadoop.io.LongWritable;
 
-import org.apache.giraph.graph.Aggregator;
-
 /**
  * Aggregator for summing up long values.
  */
-public class LongSumAggregator implements Aggregator<LongWritable> {
-  /** Internal sum */
-  private long sum = 0;
-
-  /**
-   * Aggregate a primitive long.
-   *
-   * @param value Long value to aggregate.
-   */
-  public void aggregate(long value) {
-    sum += value;
-  }
-
+public class LongSumAggregator extends BasicAggregator<LongWritable> {
   @Override
   public void aggregate(LongWritable value) {
-    sum += value.get();
-  }
-
-  /**
-   * Set aggregated value using a primitive long.
-   *
-   * @param value Long value to set.
-   */
-  public void setAggregatedValue(long value) {
-    sum = value;
-  }
-
-  @Override
-  public void setAggregatedValue(LongWritable value) {
-    sum = value.get();
-  }
-
-  @Override
-  public LongWritable getAggregatedValue() {
-    return new LongWritable(sum);
+    getAggregatedValue().set(getAggregatedValue().get() + value.get());
   }
 
   @Override
-  public LongWritable createAggregatedValue() {
-    return new LongWritable();
+  public LongWritable createInitialValue() {
+    return new LongWritable(0);
   }
 }

Modified: giraph/trunk/src/main/java/org/apache/giraph/benchmark/RandomMessageBenchmark.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/benchmark/RandomMessageBenchmark.java?rev=1371108&r1=1371107&r2=1371108&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/benchmark/RandomMessageBenchmark.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/benchmark/RandomMessageBenchmark.java Thu Aug  9 09:10:57 2012
@@ -24,6 +24,7 @@ import org.apache.commons.cli.HelpFormat
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.PosixParser;
 import org.apache.giraph.aggregators.LongSumAggregator;
+import org.apache.giraph.graph.DefaultMasterCompute;
 import org.apache.giraph.graph.EdgeListVertex;
 import org.apache.giraph.graph.GiraphJob;
 import org.apache.giraph.graph.WorkerContext;
@@ -115,26 +116,17 @@ public class RandomMessageBenchmark impl
               DEFAULT_NUM_MESSAGES_PER_EDGE);
       numSupersteps = getContext().getConfiguration().
           getInt(SUPERSTEP_COUNT, -1);
-      registerAggregator(AGG_SUPERSTEP_TOTAL_BYTES,
-          LongSumAggregator.class);
-      registerAggregator(AGG_SUPERSTEP_TOTAL_MESSAGES,
-          LongSumAggregator.class);
-      registerAggregator(AGG_SUPERSTEP_TOTAL_MILLIS,
-          LongSumAggregator.class);
-      registerAggregator(WORKERS,
-          LongSumAggregator.class);
     }
 
     @Override
     public void preSuperstep() {
-      LongSumAggregator superstepBytesAggregator =
-          (LongSumAggregator) getAggregator(AGG_SUPERSTEP_TOTAL_BYTES);
-      LongSumAggregator superstepMessagesAggregator =
-          (LongSumAggregator) getAggregator(AGG_SUPERSTEP_TOTAL_MESSAGES);
-      LongSumAggregator superstepMillisAggregator =
-          (LongSumAggregator) getAggregator(AGG_SUPERSTEP_TOTAL_MILLIS);
-      LongSumAggregator workersAggregator =
-          (LongSumAggregator) getAggregator(WORKERS);
+      long superstepBytes = this.<LongWritable>
+          getAggregatedValue(AGG_SUPERSTEP_TOTAL_BYTES).get();
+      long superstepMessages = this.<LongWritable>
+          getAggregatedValue(AGG_SUPERSTEP_TOTAL_MESSAGES).get();
+      long superstepMillis = this.<LongWritable>
+          getAggregatedValue(AGG_SUPERSTEP_TOTAL_MILLIS).get();
+      long workers = this.<LongWritable>getAggregatedValue(WORKERS).get();
 
       // For timing and tracking the supersteps
       // - superstep 0 starts the time, but cannot display any stats
@@ -143,41 +135,26 @@ public class RandomMessageBenchmark impl
       if (getSuperstep() == 0) {
         startSuperstepMillis = System.currentTimeMillis();
       } else {
-        totalBytes +=
-            superstepBytesAggregator.getAggregatedValue().get();
-        totalMessages +=
-            superstepMessagesAggregator.getAggregatedValue().get();
-        totalMillis +=
-            superstepMillisAggregator.getAggregatedValue().get();
+        totalBytes += superstepBytes;
+        totalMessages += superstepMessages;
+        totalMillis += superstepMillis;
         double superstepMegabytesPerSecond =
-            superstepBytesAggregator.getAggregatedValue().get() *
-            workersAggregator.getAggregatedValue().get() *
-            1000d / 1024d / 1024d /
-            superstepMillisAggregator.getAggregatedValue().get();
+            superstepBytes * workers * 1000d / 1024d / 1024d / superstepMillis;
         double megabytesPerSecond = totalBytes *
-            workersAggregator.getAggregatedValue().get() *
-            1000d / 1024d / 1024d / totalMillis;
+            workers * 1000d / 1024d / 1024d / totalMillis;
         double superstepMessagesPerSecond =
-            superstepMessagesAggregator.getAggregatedValue().get() *
-            workersAggregator.getAggregatedValue().get() * 1000d /
-            superstepMillisAggregator.getAggregatedValue().get();
-        double messagesPerSecond = totalMessages *
-            workersAggregator.getAggregatedValue().get() * 1000d /
-            totalMillis;
+            superstepMessages * workers * 1000d / superstepMillis;
+        double messagesPerSecond =
+            totalMessages * workers * 1000d / totalMillis;
         if (LOG.isInfoEnabled()) {
-          LOG.info("Outputing statistics for superstep " +
-              getSuperstep());
-          LOG.info(AGG_SUPERSTEP_TOTAL_BYTES + " : " +
-              superstepBytesAggregator.getAggregatedValue());
+          LOG.info("Outputing statistics for superstep " + getSuperstep());
+          LOG.info(AGG_SUPERSTEP_TOTAL_BYTES + " : " + superstepBytes);
           LOG.info(AGG_TOTAL_BYTES + " : " + totalBytes);
-          LOG.info(AGG_SUPERSTEP_TOTAL_MESSAGES + " : " +
-              superstepMessagesAggregator.getAggregatedValue());
+          LOG.info(AGG_SUPERSTEP_TOTAL_MESSAGES + " : " + superstepMessages);
           LOG.info(AGG_TOTAL_MESSAGES + " : " + totalMessages);
-          LOG.info(AGG_SUPERSTEP_TOTAL_MILLIS + " : " +
-              superstepMillisAggregator.getAggregatedValue());
+          LOG.info(AGG_SUPERSTEP_TOTAL_MILLIS + " : " + superstepMillis);
           LOG.info(AGG_TOTAL_MILLIS + " : " + totalMillis);
-          LOG.info(WORKERS + " : " +
-              workersAggregator.getAggregatedValue());
+          LOG.info(WORKERS + " : " + workers);
           LOG.info("Superstep megabytes / second = " +
               superstepMegabytesPerSecond);
           LOG.info("Total megabytes / second = " +
@@ -187,41 +164,25 @@ public class RandomMessageBenchmark impl
           LOG.info("Total messages / second = " +
               messagesPerSecond);
           LOG.info("Superstep megabytes / second / worker = " +
-              superstepMegabytesPerSecond /
-              workersAggregator.getAggregatedValue().get());
+              superstepMegabytesPerSecond / workers);
           LOG.info("Total megabytes / second / worker = " +
-              megabytesPerSecond /
-              workersAggregator.getAggregatedValue().get());
+              megabytesPerSecond / workers);
           LOG.info("Superstep messages / second / worker = " +
-              superstepMessagesPerSecond /
-              workersAggregator.getAggregatedValue().get());
+              superstepMessagesPerSecond / workers);
           LOG.info("Total messages / second / worker = " +
-              messagesPerSecond /
-              workersAggregator.getAggregatedValue().get());
+              messagesPerSecond / workers);
         }
       }
 
-      superstepBytesAggregator.setAggregatedValue(
-          new LongWritable(0L));
-      superstepMessagesAggregator.setAggregatedValue(
-          new LongWritable(0L));
-      workersAggregator.setAggregatedValue(
-          new LongWritable(1L));
-      useAggregator(AGG_SUPERSTEP_TOTAL_BYTES);
-      useAggregator(AGG_SUPERSTEP_TOTAL_MILLIS);
-      useAggregator(AGG_SUPERSTEP_TOTAL_MESSAGES);
-      useAggregator(WORKERS);
+      aggregate(WORKERS, new LongWritable(1));
     }
 
     @Override
     public void postSuperstep() {
-      LongSumAggregator superstepMillisAggregator =
-          (LongSumAggregator) getAggregator(AGG_SUPERSTEP_TOTAL_MILLIS);
       long endSuperstepMillis = System.currentTimeMillis();
       long superstepMillis = endSuperstepMillis - startSuperstepMillis;
       startSuperstepMillis = endSuperstepMillis;
-      superstepMillisAggregator.setAggregatedValue(
-          new LongWritable(superstepMillis));
+      aggregate(AGG_SUPERSTEP_TOTAL_MILLIS, new LongWritable(superstepMillis));
     }
 
     @Override
@@ -263,6 +224,26 @@ public class RandomMessageBenchmark impl
   }
 
   /**
+   * Master compute associated with {@link RandomMessageBenchmark}.
+   * It registers required aggregators.
+   */
+  public static class RandomMessageBenchmarkMasterCompute extends
+      DefaultMasterCompute {
+    @Override
+    public void initialize() throws InstantiationException,
+        IllegalAccessException {
+      registerAggregator(AGG_SUPERSTEP_TOTAL_BYTES,
+          LongSumAggregator.class);
+      registerAggregator(AGG_SUPERSTEP_TOTAL_MESSAGES,
+          LongSumAggregator.class);
+      registerAggregator(AGG_SUPERSTEP_TOTAL_MILLIS,
+          LongSumAggregator.class);
+      registerAggregator(WORKERS,
+          LongSumAggregator.class);
+    }
+  }
+
+  /**
    * Actual message computation (messaging in this case)
    */
   public static class RandomMessageVertex extends EdgeListVertex<
@@ -271,10 +252,6 @@ public class RandomMessageBenchmark impl
     public void compute(Iterable<BytesWritable> messages) {
       RandomMessageBenchmarkWorkerContext workerContext =
           (RandomMessageBenchmarkWorkerContext) getWorkerContext();
-      LongSumAggregator superstepBytesAggregator =
-          (LongSumAggregator) getAggregator(AGG_SUPERSTEP_TOTAL_BYTES);
-      LongSumAggregator superstepMessagesAggregator =
-          (LongSumAggregator) getAggregator(AGG_SUPERSTEP_TOTAL_MESSAGES);
       if (getSuperstep() < workerContext.getNumSupersteps()) {
         for (int i = 0; i < workerContext.getNumMessagePerEdge(); i++) {
           workerContext.randomizeMessageBytes();
@@ -282,8 +259,9 @@ public class RandomMessageBenchmark impl
               new BytesWritable(workerContext.getMessageBytes()));
           long bytesSent = workerContext.getMessageBytes().length *
               getNumEdges();
-          superstepBytesAggregator.aggregate(bytesSent);
-          superstepMessagesAggregator.aggregate(getNumEdges());
+          aggregate(AGG_SUPERSTEP_TOTAL_BYTES, new LongWritable(bytesSent));
+          aggregate(AGG_SUPERSTEP_TOTAL_MESSAGES,
+              new LongWritable(getNumEdges()));
         }
       } else {
         voteToHalt();
@@ -377,6 +355,7 @@ public class RandomMessageBenchmark impl
     job.setVertexClass(RandomMessageVertex.class);
     job.setVertexInputFormatClass(PseudoRandomVertexInputFormat.class);
     job.setWorkerContextClass(RandomMessageBenchmarkWorkerContext.class);
+    job.setMasterComputeClass(RandomMessageBenchmarkMasterCompute.class);
     job.setWorkerConfiguration(workers, workers, 100.0f);
     job.getConfiguration().setLong(
         PseudoRandomVertexInputFormat.AGGREGATE_VERTICES,

Modified: giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java?rev=1371108&r1=1371107&r2=1371108&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java Thu Aug  9 09:10:57 2012
@@ -20,7 +20,7 @@ package org.apache.giraph.bsp;
 
 import java.io.IOException;
 
-import org.apache.giraph.graph.AggregatorUsage;
+import org.apache.giraph.graph.MasterAggregatorUsage;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.zookeeper.KeeperException;
@@ -35,9 +35,9 @@ import org.apache.zookeeper.KeeperExcept
  * @param <M> Message data
  */
 @SuppressWarnings("rawtypes")
-public interface CentralizedServiceMaster<
-  I extends WritableComparable, V extends Writable, E extends Writable,
-  M extends Writable> extends CentralizedService<I, V, E, M>, AggregatorUsage {
+public interface CentralizedServiceMaster<I extends WritableComparable,
+    V extends Writable, E extends Writable, M extends Writable> extends
+    CentralizedService<I, V, E, M>, MasterAggregatorUsage {
   /**
    * Become the master.
    * @return true if became the master, false if the application is done.

Modified: giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java?rev=1371108&r1=1371107&r2=1371108&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java Thu Aug  9 09:10:57 2012
@@ -24,10 +24,10 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.giraph.comm.ServerData;
+import org.apache.giraph.graph.WorkerAggregatorUsage;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
-import org.apache.giraph.graph.AggregatorUsage;
 import org.apache.giraph.graph.Vertex;
 import org.apache.giraph.graph.GraphMapper;
 import org.apache.giraph.graph.partition.Partition;
@@ -48,7 +48,7 @@ import org.apache.giraph.graph.WorkerCon
 @SuppressWarnings("rawtypes")
 public interface CentralizedServiceWorker<I extends WritableComparable,
   V extends Writable, E extends Writable, M extends Writable>
-  extends CentralizedService<I, V, E, M>, AggregatorUsage {
+  extends CentralizedService<I, V, E, M>, WorkerAggregatorUsage {
   /**
    * Get the worker information
    *

Added: giraph/trunk/src/main/java/org/apache/giraph/examples/AggregatorsTestVertex.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/examples/AggregatorsTestVertex.java?rev=1371108&view=auto
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/examples/AggregatorsTestVertex.java (added)
+++ giraph/trunk/src/main/java/org/apache/giraph/examples/AggregatorsTestVertex.java Thu Aug  9 09:10:57 2012
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.examples;
+
+import org.apache.giraph.aggregators.LongSumAggregator;
+import org.apache.giraph.graph.DefaultMasterCompute;
+import org.apache.giraph.graph.EdgeListVertex;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.LongWritable;
+
+import java.io.IOException;
+
+/** Vertex which uses aggrergators. To be used for testing. */
+public class AggregatorsTestVertex extends
+    EdgeListVertex<LongWritable, DoubleWritable, FloatWritable,
+        DoubleWritable> {
+
+  /** Name of regular aggregator */
+  private static final String REGULAR_AGG = "regular";
+  /** Name of persistent aggregator */
+  private static final String PERSISTENT_AGG = "persistent";
+  /** Name of master overwriting aggregator */
+  private static final String MASTER_WRITE_AGG = "master";
+  /** Value which master compute will use */
+  private static final long MASTER_VALUE = 12345;
+
+  @Override
+  public void compute(Iterable<DoubleWritable> messages) throws IOException {
+    long superstep = getSuperstep();
+
+    LongWritable myValue = new LongWritable(1L << superstep);
+    aggregate(REGULAR_AGG, myValue);
+    aggregate(PERSISTENT_AGG, myValue);
+
+    long nv = getTotalNumVertices();
+    if (superstep > 0) {
+      assertEquals(nv * (1L << (superstep - 1)),
+          ((LongWritable) getAggregatedValue(REGULAR_AGG)).get());
+    } else {
+      assertEquals(0,
+          ((LongWritable) getAggregatedValue(REGULAR_AGG)).get());
+    }
+    assertEquals(nv * ((1L << superstep) - 1),
+        ((LongWritable) getAggregatedValue(PERSISTENT_AGG)).get());
+    assertEquals(MASTER_VALUE * (1L << superstep),
+        ((LongWritable) getAggregatedValue(MASTER_WRITE_AGG)).get());
+
+    if (getSuperstep() == 10) {
+      voteToHalt();
+    }
+  }
+
+  /** Master compute which uses aggregators. To be used for testing. */
+  public static class AggregatorsTestMasterCompute extends
+      DefaultMasterCompute {
+    @Override
+    public void compute() {
+      long superstep = getSuperstep();
+
+      LongWritable myValue =
+          new LongWritable(MASTER_VALUE * (1L << superstep));
+      setAggregatedValue(MASTER_WRITE_AGG, myValue);
+
+      long nv = getTotalNumVertices();
+      if (superstep > 0) {
+        assertEquals(nv * (1L << (superstep - 1)),
+            ((LongWritable) getAggregatedValue(REGULAR_AGG)).get());
+      } else {
+        assertEquals(0,
+            ((LongWritable) getAggregatedValue(REGULAR_AGG)).get());
+      }
+      assertEquals(nv * ((1L << superstep) - 1),
+          ((LongWritable) getAggregatedValue(PERSISTENT_AGG)).get());
+    }
+
+    @Override
+    public void initialize() throws InstantiationException,
+        IllegalAccessException {
+      registerAggregator(REGULAR_AGG, LongSumAggregator.class);
+      registerPersistentAggregator(PERSISTENT_AGG,
+          LongSumAggregator.class);
+      registerAggregator(MASTER_WRITE_AGG, LongSumAggregator.class);
+    }
+  }
+
+  /**
+   * Throws exception if values are not equal.
+   *
+   * @param expected Expected value
+   * @param actual   Actual value
+   */
+  private static void assertEquals(long expected, long actual) {
+    if (expected != actual) {
+      throw new RuntimeException("expected: " + expected +
+          ", actual: " + actual);
+    }
+  }
+}

Modified: giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleAggregatorWriter.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleAggregatorWriter.java?rev=1371108&r1=1371107&r2=1371108&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleAggregatorWriter.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleAggregatorWriter.java Thu Aug  9 09:10:57 2012
@@ -19,10 +19,8 @@
 package org.apache.giraph.examples;
 
 import java.io.IOException;
-import java.util.Map;
 import java.util.Map.Entry;
 
-import org.apache.giraph.graph.Aggregator;
 import org.apache.giraph.graph.AggregatorWriter;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
@@ -57,11 +55,11 @@ public class SimpleAggregatorWriter impl
   }
 
   @Override
-  public void writeAggregator(Map<String, Aggregator<Writable>> map,
+  public void writeAggregator(
+      Iterable<Entry<String, Writable>> aggregatorMap,
       long superstep) throws IOException {
-
-    for (Entry<String, Aggregator<Writable>> aggregator: map.entrySet()) {
-      aggregator.getValue().getAggregatedValue().write(output);
+    for (Entry<String, Writable> entry : aggregatorMap) {
+      entry.getValue().write(output);
     }
     output.flush();
   }

Modified: giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleCheckpointVertex.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleCheckpointVertex.java?rev=1371108&r1=1371107&r2=1371108&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleCheckpointVertex.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleCheckpointVertex.java Thu Aug  9 09:10:57 2012
@@ -24,6 +24,7 @@ import org.apache.commons.cli.HelpFormat
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.PosixParser;
 import org.apache.giraph.aggregators.LongSumAggregator;
+import org.apache.giraph.graph.DefaultMasterCompute;
 import org.apache.giraph.graph.Edge;
 import org.apache.giraph.graph.EdgeListVertex;
 import org.apache.giraph.graph.GiraphJob;
@@ -67,9 +68,6 @@ public class SimpleCheckpointVertex exte
     SimpleCheckpointVertexWorkerContext workerContext =
         (SimpleCheckpointVertexWorkerContext) getWorkerContext();
 
-    LongSumAggregator sumAggregator = (LongSumAggregator)
-        getAggregator(LongSumAggregator.class.getName());
-
     boolean enableFault = workerContext.getEnableFault();
     int supersteps = workerContext.getSupersteps();
 
@@ -86,10 +84,12 @@ public class SimpleCheckpointVertex exte
       voteToHalt();
       return;
     }
-    LOG.info("compute: " + sumAggregator);
-    sumAggregator.aggregate(getId().get());
-    LOG.info("compute: sum = " +
-        sumAggregator.getAggregatedValue().get() +
+    long sumAgg = this.<LongWritable>getAggregatedValue(
+        LongSumAggregator.class.getName()).get();
+    LOG.info("compute: " + sumAgg);
+    aggregate(LongSumAggregator.class.getName(),
+        new LongWritable(getId().get()));
+    LOG.info("compute: sum = " + sumAgg +
         " for vertex " + getId());
     float msgValue = 0.0f;
     for (FloatWritable message : messages) {
@@ -139,11 +139,6 @@ public class SimpleCheckpointVertex exte
     @Override
     public void preApplication()
       throws InstantiationException, IllegalAccessException {
-      registerAggregator(LongSumAggregator.class.getName(),
-          LongSumAggregator.class);
-      LongSumAggregator sumAggregator = (LongSumAggregator)
-          getAggregator(LongSumAggregator.class.getName());
-      sumAggregator.setAggregatedValue(0);
       supersteps = getContext().getConfiguration()
           .getInt(SUPERSTEP_COUNT, supersteps);
       enableFault = getContext().getConfiguration()
@@ -152,15 +147,13 @@ public class SimpleCheckpointVertex exte
 
     @Override
     public void postApplication() {
-      LongSumAggregator sumAggregator = (LongSumAggregator)
-          getAggregator(LongSumAggregator.class.getName());
-      FINAL_SUM = sumAggregator.getAggregatedValue().get();
+      FINAL_SUM = this.<LongWritable>getAggregatedValue(
+          LongSumAggregator.class.getName()).get();
       LOG.info("FINAL_SUM=" + FINAL_SUM);
     }
 
     @Override
     public void preSuperstep() {
-      useAggregator(LongSumAggregator.class.getName());
     }
 
     @Override
@@ -221,6 +214,7 @@ public class SimpleCheckpointVertex exte
     bspJob.setVertexInputFormatClass(GeneratedVertexInputFormat.class);
     bspJob.setVertexOutputFormatClass(SimpleTextVertexOutputFormat.class);
     bspJob.setWorkerContextClass(SimpleCheckpointVertexWorkerContext.class);
+    bspJob.setMasterComputeClass(SimpleCheckpointVertexMasterCompute.class);
     int minWorkers = Integer.parseInt(cmd.getOptionValue('w'));
     int maxWorkers = Integer.parseInt(cmd.getOptionValue('w'));
     bspJob.setWorkerConfiguration(minWorkers, maxWorkers, 100.0f);
@@ -243,6 +237,20 @@ public class SimpleCheckpointVertex exte
   }
 
   /**
+   * Master compute associated with {@link SimpleCheckpointVertex}.
+   * It registers required aggregators.
+   */
+  public static class SimpleCheckpointVertexMasterCompute extends
+      DefaultMasterCompute {
+    @Override
+    public void initialize() throws InstantiationException,
+        IllegalAccessException {
+      registerAggregator(LongSumAggregator.class.getName(),
+          LongSumAggregator.class);
+    }
+  }
+
+  /**
    * Executable from the command line.
    *
    * @param args Command line args.

Modified: giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleMasterComputeVertex.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleMasterComputeVertex.java?rev=1371108&r1=1371107&r2=1371108&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleMasterComputeVertex.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleMasterComputeVertex.java Thu Aug  9 09:10:57 2012
@@ -19,16 +19,12 @@
 package org.apache.giraph.examples;
 
 import org.apache.giraph.aggregators.DoubleOverwriteAggregator;
+import org.apache.giraph.graph.DefaultMasterCompute;
 import org.apache.giraph.graph.LongDoubleFloatDoubleVertex;
-import org.apache.giraph.graph.MasterCompute;
 import org.apache.giraph.graph.WorkerContext;
 import org.apache.hadoop.io.DoubleWritable;
 import org.apache.log4j.Logger;
 
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
 /**
  * Demonstrates a computation with a centralized part implemented via a
  * MasterCompute.
@@ -42,10 +38,8 @@ public class SimpleMasterComputeVertex e
 
   @Override
   public void compute(Iterable<DoubleWritable> messages) {
-    DoubleOverwriteAggregator agg =
-        (DoubleOverwriteAggregator) getAggregator(SMC_AGG);
     double oldSum = getSuperstep() == 0 ? 0 : getValue().get();
-    double newValue = agg.getAggregatedValue().get();
+    double newValue = this.<DoubleWritable>getAggregatedValue(SMC_AGG).get();
     double newSum = oldSum + newValue;
     setValue(new DoubleWritable(newSum));
     SimpleMasterComputeWorkerContext workerContext =
@@ -65,12 +59,10 @@ public class SimpleMasterComputeVertex e
     @Override
     public void preApplication()
       throws InstantiationException, IllegalAccessException {
-      registerAggregator(SMC_AGG, DoubleOverwriteAggregator.class);
     }
 
     @Override
     public void preSuperstep() {
-      useAggregator(SMC_AGG);
     }
 
     @Override
@@ -94,20 +86,11 @@ public class SimpleMasterComputeVertex e
    * MasterCompute used with {@link SimpleMasterComputeVertex}.
    */
   public static class SimpleMasterCompute
-      extends MasterCompute {
-    @Override
-    public void write(DataOutput out) throws IOException {
-    }
-
-    @Override
-    public void readFields(DataInput in) throws IOException {
-    }
-
+      extends DefaultMasterCompute {
     @Override
     public void compute() {
-      DoubleOverwriteAggregator agg =
-          (DoubleOverwriteAggregator) getAggregator(SMC_AGG);
-      agg.aggregate(((double) getSuperstep()) / 2 + 1);
+      setAggregatedValue(SMC_AGG,
+          new DoubleWritable(((double) getSuperstep()) / 2 + 1));
       if (getSuperstep() == 10) {
         haltComputation();
       }

Modified: giraph/trunk/src/main/java/org/apache/giraph/examples/SimplePageRankVertex.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/examples/SimplePageRankVertex.java?rev=1371108&r1=1371107&r2=1371108&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/examples/SimplePageRankVertex.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/examples/SimplePageRankVertex.java Thu Aug  9 09:10:57 2012
@@ -22,6 +22,7 @@ import org.apache.giraph.aggregators.Dou
 import org.apache.giraph.aggregators.DoubleMinAggregator;
 import org.apache.giraph.aggregators.LongSumAggregator;
 import org.apache.giraph.graph.BspUtils;
+import org.apache.giraph.graph.DefaultMasterCompute;
 import org.apache.giraph.graph.LongDoubleFloatDoubleVertex;
 import org.apache.giraph.graph.Vertex;
 import org.apache.giraph.graph.VertexReader;
@@ -55,13 +56,15 @@ public class SimplePageRankVertex extend
   /** Logger */
   private static final Logger LOG =
       Logger.getLogger(SimplePageRankVertex.class);
+  /** Sum aggregator name */
+  private static String SUM_AGG = "sum";
+  /** Min aggregator name */
+  private static String MIN_AGG = "min";
+  /** Max aggregator name */
+  private static String MAX_AGG = "max";
 
   @Override
   public void compute(Iterable<DoubleWritable> messages) {
-    LongSumAggregator sumAggreg = (LongSumAggregator) getAggregator("sum");
-    DoubleMinAggregator minAggreg = (DoubleMinAggregator) getAggregator("min");
-    DoubleMaxAggregator maxAggreg = (DoubleMaxAggregator) getAggregator("max");
-
     if (getSuperstep() >= 1) {
       double sum = 0;
       for (DoubleWritable message : messages) {
@@ -70,12 +73,12 @@ public class SimplePageRankVertex extend
       DoubleWritable vertexValue =
           new DoubleWritable((0.15f / getTotalNumVertices()) + 0.85f * sum);
       setValue(vertexValue);
-      maxAggreg.aggregate(vertexValue);
-      minAggreg.aggregate(vertexValue);
-      sumAggreg.aggregate(1L);
+      aggregate(MAX_AGG, vertexValue);
+      aggregate(MIN_AGG, vertexValue);
+      aggregate(SUM_AGG, new LongWritable(1));
       LOG.info(getId() + ": PageRank=" + vertexValue +
-          " max=" + maxAggreg.getAggregatedValue() +
-          " min=" + minAggreg.getAggregatedValue());
+          " max=" + getAggregatedValue(MAX_AGG) +
+          " min=" + getAggregatedValue(MIN_AGG));
     }
 
     if (getSuperstep() < MAX_SUPERSTEPS) {
@@ -114,24 +117,13 @@ public class SimplePageRankVertex extend
     @Override
     public void preApplication()
       throws InstantiationException, IllegalAccessException {
-      registerAggregator("sum", LongSumAggregator.class);
-      registerAggregator("min", DoubleMinAggregator.class);
-      registerAggregator("max", DoubleMaxAggregator.class);
     }
 
     @Override
     public void postApplication() {
-
-      LongSumAggregator sumAggreg =
-          (LongSumAggregator) getAggregator("sum");
-      DoubleMinAggregator minAggreg =
-          (DoubleMinAggregator) getAggregator("min");
-      DoubleMaxAggregator maxAggreg =
-          (DoubleMaxAggregator) getAggregator("max");
-
-      FINAL_SUM = sumAggreg.getAggregatedValue().get();
-      FINAL_MAX = maxAggreg.getAggregatedValue().get();
-      FINAL_MIN = minAggreg.getAggregatedValue().get();
+      FINAL_SUM = this.<LongWritable>getAggregatedValue(SUM_AGG).get();
+      FINAL_MAX = this.<DoubleWritable>getAggregatedValue(MAX_AGG).get();
+      FINAL_MIN = this.<DoubleWritable>getAggregatedValue(MIN_AGG).get();
 
       LOG.info("aggregatedNumVertices=" + FINAL_SUM);
       LOG.info("aggregatedMaxPageRank=" + FINAL_MAX);
@@ -140,31 +132,21 @@ public class SimplePageRankVertex extend
 
     @Override
     public void preSuperstep() {
-
-      LongSumAggregator sumAggreg = (LongSumAggregator) getAggregator("sum");
-      DoubleMinAggregator minAggreg =
-          (DoubleMinAggregator) getAggregator("min");
-      DoubleMaxAggregator maxAggreg =
-          (DoubleMaxAggregator) getAggregator("max");
-
       if (getSuperstep() >= 3) {
         LOG.info("aggregatedNumVertices=" +
-            sumAggreg.getAggregatedValue() +
+            getAggregatedValue(SUM_AGG) +
             " NumVertices=" + getTotalNumVertices());
-        if (sumAggreg.getAggregatedValue().get() != getTotalNumVertices()) {
+        if (this.<LongWritable>getAggregatedValue(SUM_AGG).get() !=
+            getTotalNumVertices()) {
           throw new RuntimeException("wrong value of SumAggreg: " +
-              sumAggreg.getAggregatedValue() + ", should be: " +
+              getAggregatedValue(SUM_AGG) + ", should be: " +
               getTotalNumVertices());
         }
-        DoubleWritable maxPagerank = maxAggreg.getAggregatedValue();
+        DoubleWritable maxPagerank = getAggregatedValue(MAX_AGG);
         LOG.info("aggregatedMaxPageRank=" + maxPagerank.get());
-        DoubleWritable minPagerank = minAggreg.getAggregatedValue();
+        DoubleWritable minPagerank = getAggregatedValue(MIN_AGG);
         LOG.info("aggregatedMinPageRank=" + minPagerank.get());
       }
-      useAggregator("sum");
-      useAggregator("min");
-      useAggregator("max");
-      sumAggreg.setAggregatedValue(0L);
     }
 
     @Override
@@ -172,6 +154,21 @@ public class SimplePageRankVertex extend
   }
 
   /**
+   * Master compute associated with {@link SimplePageRankVertex}.
+   * It registers required aggregators.
+   */
+  public static class SimplePageRankVertexMasterCompute extends
+      DefaultMasterCompute {
+    @Override
+    public void initialize() throws InstantiationException,
+        IllegalAccessException {
+      registerAggregator(SUM_AGG, LongSumAggregator.class);
+      registerPersistentAggregator(MIN_AGG, DoubleMinAggregator.class);
+      registerPersistentAggregator(MAX_AGG, DoubleMaxAggregator.class);
+    }
+  }
+
+  /**
    * Simple VertexReader that supports {@link SimplePageRankVertex}
    */
   public static class SimplePageRankVertexReader extends

Modified: giraph/trunk/src/main/java/org/apache/giraph/examples/VerifyMessage.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/examples/VerifyMessage.java?rev=1371108&r1=1371107&r2=1371108&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/examples/VerifyMessage.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/examples/VerifyMessage.java Thu Aug  9 09:10:57 2012
@@ -19,6 +19,7 @@
 package org.apache.giraph.examples;
 
 import org.apache.giraph.aggregators.LongSumAggregator;
+import org.apache.giraph.graph.DefaultMasterCompute;
 import org.apache.giraph.graph.Edge;
 import org.apache.giraph.graph.EdgeListVertex;
 import org.apache.giraph.graph.WorkerContext;
@@ -116,25 +117,19 @@ public class VerifyMessage {
       @Override
       public void preApplication() throws InstantiationException,
       IllegalAccessException {
-        registerAggregator(LongSumAggregator.class.getName(),
-            LongSumAggregator.class);
-        LongSumAggregator sumAggregator = (LongSumAggregator)
-            getAggregator(LongSumAggregator.class.getName());
-        sumAggregator.setAggregatedValue(0);
         SUPERSTEPS = getContext().getConfiguration().getInt(
             SUPERSTEP_COUNT, SUPERSTEPS);
       }
 
       @Override
       public void postApplication() {
-        LongSumAggregator sumAggregator = (LongSumAggregator)
-            getAggregator(LongSumAggregator.class.getName());
-        FINAL_SUM = sumAggregator.getAggregatedValue().get();
+        LongWritable sumAggregatorValue =
+            getAggregatedValue(LongSumAggregator.class.getName());
+        FINAL_SUM = sumAggregatorValue.get();
       }
 
       @Override
       public void preSuperstep() {
-        useAggregator(LongSumAggregator.class.getName());
       }
 
       @Override
@@ -143,19 +138,18 @@ public class VerifyMessage {
 
     @Override
     public void compute(Iterable<VerifiableMessage> messages) {
-      LongSumAggregator sumAggregator = (LongSumAggregator)
-          getAggregator(LongSumAggregator.class.getName());
+      String sumAggregatorName = LongSumAggregator.class.getName();
       if (getSuperstep() > SUPERSTEPS) {
         voteToHalt();
         return;
       }
       if (LOG.isDebugEnabled()) {
-        LOG.debug("compute: " + sumAggregator);
+        LOG.debug("compute: " + getAggregatedValue(sumAggregatorName));
       }
-      sumAggregator.aggregate(getId().get());
+      aggregate(sumAggregatorName, new LongWritable(getId().get()));
       if (LOG.isDebugEnabled()) {
         LOG.debug("compute: sum = " +
-            sumAggregator.getAggregatedValue().get() +
+            this.<LongWritable>getAggregatedValue(sumAggregatorName).get() +
             " for vertex " + getId());
       }
       float msgValue = 0.0f;
@@ -206,4 +200,18 @@ public class VerifyMessage {
       }
     }
   }
+
+  /**
+   * Master compute associated with {@link VerifyMessageVertex}.
+   * It registers required aggregators.
+   */
+  public static class VerifyMessageMasterCompute extends
+      DefaultMasterCompute {
+    @Override
+    public void initialize() throws InstantiationException,
+        IllegalAccessException {
+      registerAggregator(LongSumAggregator.class.getName(),
+          LongSumAggregator.class);
+    }
+  }
 }