You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ap...@apache.org on 2012/08/09 11:10:59 UTC

svn commit: r1371108 [2/2] - in /giraph/trunk: ./ src/main/java/org/apache/giraph/aggregators/ src/main/java/org/apache/giraph/benchmark/ src/main/java/org/apache/giraph/bsp/ src/main/java/org/apache/giraph/examples/ src/main/java/org/apache/giraph/gra...

Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/Aggregator.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/Aggregator.java?rev=1371108&r1=1371107&r2=1371108&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/Aggregator.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/Aggregator.java Thu Aug  9 09:10:57 2012
@@ -36,12 +36,12 @@ public interface Aggregator<A extends Wr
   void aggregate(A value);
 
   /**
-   * Set aggregated value.
-   * Can be used for initialization or reset.
+   * Return new aggregated value which is neutral to aggregate operation.
+   * Must be changeable without affecting internals of Aggregator
    *
-   * @param value Value to be set.
+   * @return Neutral value
    */
-  void setAggregatedValue(A value);
+  A createInitialValue();
 
   /**
    * Return current aggregated value.
@@ -53,10 +53,15 @@ public interface Aggregator<A extends Wr
   A getAggregatedValue();
 
   /**
-   * Return new aggregated value.
-   * Must be changeable without affecting internals of Aggregator
+   * Set aggregated value.
+   * Can be used for initialization or reset.
    *
-   * @return Writable
+   * @param value Value to be set.
+   */
+  void setAggregatedValue(A value);
+
+  /**
+   * Reset the value of aggregator to neutral value
    */
-  A createAggregatedValue();
+  void reset();
 }

Added: giraph/trunk/src/main/java/org/apache/giraph/graph/AggregatorWrapper.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/AggregatorWrapper.java?rev=1371108&view=auto
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/AggregatorWrapper.java (added)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/AggregatorWrapper.java Thu Aug  9 09:10:57 2012
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Wrapper for an aggregator. Holds a single aggregator instance, which
+ * accumulates the value being generated in the current super step, together
+ * with the value that was aggregated in the previous super step. Also tracks
+ * whether the current value was touched since the aggregator was last reset.
+ *
+ * @param <A> Aggregated value
+ */
+public class AggregatorWrapper<A extends Writable> {
+  /** False iff aggregator should be reset at the end of each super step */
+  private final boolean persistent;
+  /**
+   * Value aggregated in previous super step. Starts out as the initial
+   * (neutral) value created by the wrapped aggregator.
+   */
+  private A previousAggregatedValue;
+  /** Aggregator holding the value being generated in current super step */
+  private final Aggregator<A> currentAggregator;
+  /**
+   * Whether anyone changed current value since the moment it was reset.
+   * Set by both aggregating into and directly setting the current value.
+   */
+  private boolean changed;
+
+  /**
+   * Instantiate the wrapped aggregator through its no-argument constructor,
+   * mark it as unchanged and use its initial value as the previous value.
+   *
+   * @param aggregatorClass Class type of the aggregator
+   * @param persistent      False iff aggregator should be reset at the end of
+   *                        each super step
+   * @throws IllegalAccessException If the aggregator class or its
+   *                                no-argument constructor is not accessible
+   * @throws InstantiationException If the aggregator class cannot be
+   *                                instantiated
+   */
+  public AggregatorWrapper(Class<? extends Aggregator<A>> aggregatorClass,
+      boolean persistent) throws IllegalAccessException,
+      InstantiationException {
+    this.persistent = persistent;
+    currentAggregator = aggregatorClass.newInstance();
+    changed = false;
+    previousAggregatedValue = currentAggregator.createInitialValue();
+  }
+
+  /**
+   * Get aggregated value from previous super step
+   *
+   * @return Aggregated value from previous super step (the stored reference,
+   *         not a copy)
+   */
+  public A getPreviousAggregatedValue() {
+    return previousAggregatedValue;
+  }
+
+  /**
+   * Set aggregated value for previous super step. The reference is stored
+   * as is; no copy of the value is made.
+   *
+   * @param value Aggregated value to set
+   */
+  public void setPreviousAggregatedValue(A value) {
+    previousAggregatedValue = value;
+  }
+
+  /**
+   * Check if aggregator is persistent
+   *
+   * @return False iff aggregator should be reset at the end of each super step
+   */
+  public boolean isPersistent() {
+    return persistent;
+  }
+
+  /**
+   * Check if current aggregator was changed, i.e. whether
+   * {@link #aggregateCurrent} or {@link #setCurrentAggregatedValue} was
+   * called after construction or after the last
+   * {@link #resetCurrentAggregator}.
+   *
+   * @return Whether anyone changed current value since the moment it was reset
+   */
+  public boolean isChanged() {
+    return changed;
+  }
+
+  /**
+   * Add a new value to current aggregator and mark the wrapper as changed
+   *
+   * @param value Value to be aggregated
+   */
+  public void aggregateCurrent(A value) {
+    changed = true;
+    currentAggregator.aggregate(value);
+  }
+
+  /**
+   * Get current aggregated value, as reported by the wrapped aggregator
+   *
+   * @return Current aggregated value
+   */
+  public A getCurrentAggregatedValue() {
+    return currentAggregator.getAggregatedValue();
+  }
+
+  /**
+   * Set aggregated value of current aggregator and mark the wrapper as
+   * changed
+   *
+   * @param value Value to set it to
+   */
+  public void setCurrentAggregatedValue(A value) {
+    changed = true;
+    currentAggregator.setAggregatedValue(value);
+  }
+
+  /**
+   * Reset the value of current aggregator to neutral value and clear the
+   * changed flag. The previous aggregated value is not touched.
+   */
+  public void resetCurrentAggregator() {
+    changed = false;
+    currentAggregator.reset();
+  }
+
+  /**
+   * Return new aggregated value which is neutral to aggregate operation.
+   * Delegates to the wrapped aggregator.
+   *
+   * @return Neutral value
+   */
+  public A createInitialValue() {
+    return currentAggregator.createInitialValue();
+  }
+
+  /**
+   * Get class of wrapped aggregator (runtime class of the wrapped instance)
+   *
+   * @return Aggregator class
+   */
+  public Class<? extends Aggregator> getAggregatorClass() {
+    return currentAggregator.getClass();
+  }
+}

Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/AggregatorWriter.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/AggregatorWriter.java?rev=1371108&r1=1371107&r2=1371108&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/AggregatorWriter.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/AggregatorWriter.java Thu Aug  9 09:10:57 2012
@@ -19,7 +19,7 @@
 package org.apache.giraph.graph;
 
 import java.io.IOException;
-import java.util.Map;
+import java.util.Map.Entry;
 
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.Mapper.Context;
@@ -28,7 +28,7 @@ import org.apache.hadoop.mapreduce.Mappe
  *  An AggregatorWriter is used to export Aggregators during or at the end of
  *  each computation. It runs on the master and it's called at the end of each
  *  superstep. The special signal {@link AggregatorWriter#LAST_SUPERSTEP} is
- *  passed to {@link AggregatorWriter#writeAggregator(Map, long)} as the
+ *  passed to {@link AggregatorWriter#writeAggregator(Iterable, long)} as the
  *  superstep value to signal the end of computation.
  */
 public interface AggregatorWriter {
@@ -53,19 +53,19 @@ public interface AggregatorWriter {
    * whether to write the aggregators values for the current superstep. For
    * the last superstep, {@link AggregatorWriter#LAST_SUPERSTEP} is passed.
    *
-   * @param aggregatorMap Map of aggregators to write
+   * @param aggregatorMap Map from aggregator name to aggregator value
    * @param superstep Current superstep
    * @throws IOException
    */
   void writeAggregator(
-      Map<String, Aggregator<Writable>> aggregatorMap,
+      Iterable<Entry<String, Writable>> aggregatorMap,
       long superstep) throws IOException;
 
   /**
    * The method is called at the end of a successful computation. The method
    * is not called when the job fails and a new master is elected. For this
    * reason it's advised to flush data at the end of
-   * {@link AggregatorWriter#writeAggregator(Map, long)}.
+   * {@link AggregatorWriter#writeAggregator(Iterable, long)}.
    *
    * @throws IOException
    */

Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/BspService.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/BspService.java?rev=1371108&r1=1371107&r2=1371108&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/BspService.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/BspService.java Thu Aug  9 09:10:57 2012
@@ -259,8 +259,8 @@ public abstract class BspService<I exten
   /** Checkpoint frequency */
   private final int checkpointFrequency;
   /** Map of aggregators */
-  private Map<String, Aggregator<Writable>> aggregatorMap =
-      new TreeMap<String, Aggregator<Writable>>();
+  private Map<String, AggregatorWrapper<Writable>> aggregatorMap =
+      new TreeMap<String, AggregatorWrapper<Writable>>();
 
   /**
    * Constructor.
@@ -884,22 +884,23 @@ public abstract class BspService<I exten
    * @param <A> Aggregator type
    * @param name Name of the aggregator
    * @param aggregatorClass Class of the aggregator
+   * @param persistent False iff aggregator should be reset at the end of
+   *                   every super step
    * @return Aggregator
    * @throws IllegalAccessException
    * @throws InstantiationException
    */
-  public final <A extends Writable> Aggregator<A> registerAggregator(
-    String name,
-    Class<? extends Aggregator<A>> aggregatorClass)
-    throws InstantiationException, IllegalAccessException {
+  protected <A extends Writable> AggregatorWrapper<A> registerAggregator(
+      String name, Class<? extends Aggregator<A>> aggregatorClass,
+      boolean persistent) throws InstantiationException,
+      IllegalAccessException {
     if (aggregatorMap.get(name) != null) {
       return null;
     }
-    Aggregator<A> aggregator =
-        (Aggregator<A>) aggregatorClass.newInstance();
-    @SuppressWarnings("unchecked")
-    Aggregator<Writable> writableAggregator =
-      (Aggregator<Writable>) aggregator;
+    AggregatorWrapper<A> aggregator =
+        new AggregatorWrapper<A>(aggregatorClass, persistent);
+    AggregatorWrapper<Writable> writableAggregator =
+        (AggregatorWrapper<Writable>) aggregator;
     aggregatorMap.put(name, writableAggregator);
     if (LOG.isInfoEnabled()) {
       LOG.info("registerAggregator: registered " + name);
@@ -913,16 +914,32 @@ public abstract class BspService<I exten
    * @param name Name of aggregator
    * @return Aggregator or null when not registered
    */
-  public final Aggregator<? extends Writable> getAggregator(String name) {
+  protected AggregatorWrapper<? extends Writable> getAggregator(String name) {
     return aggregatorMap.get(name);
   }
 
   /**
+   * Get value of an aggregator.
+   *
+   * @param name Name of aggregator
+   * @param <A> Aggregated value
+   * @return Value of the aggregator
+   */
+  public <A extends Writable> A getAggregatedValue(String name) {
+    AggregatorWrapper<? extends Writable> aggregator = getAggregator(name);
+    if (aggregator == null) {
+      return null;
+    } else {
+      return (A) aggregator.getPreviousAggregatedValue();
+    }
+  }
+
+  /**
    * Get the aggregator map.
    *
    * @return Map of aggregator names to aggregator
    */
-  public Map<String, Aggregator<Writable>> getAggregatorMap() {
+  protected Map<String, AggregatorWrapper<Writable>> getAggregatorMap() {
     return aggregatorMap;
   }
 

Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java?rev=1371108&r1=1371107&r2=1371108&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java Thu Aug  9 09:10:57 2012
@@ -48,7 +48,6 @@ import org.apache.zookeeper.KeeperExcept
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher.Event.EventType;
 import org.apache.zookeeper.ZooDefs.Ids;
-import org.json.JSONArray;
 import org.json.JSONException;
 import org.json.JSONObject;
 
@@ -60,7 +59,7 @@ import java.io.DataInputStream;
 import java.io.DataOutput;
 import java.io.DataOutputStream;
 import java.io.IOException;
-import java.io.InputStream;
+import java.util.AbstractMap;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -68,11 +67,15 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Set;
 import java.util.TreeSet;
 
+import com.google.common.base.Function;
+import com.google.common.collect.Iterables;
+
 /**
- * ZooKeeper-based implementation of {@link CentralizedService}.
+ * ZooKeeper-based implementation of {@link CentralizedServiceMaster}.
  *
  * @param <I> Vertex id
  * @param <V> Vertex data
@@ -869,7 +872,7 @@ public class BspServiceMaster<I extends 
 
     for (String hostnameIdPath : hostnameIdPathList) {
       JSONObject workerFinishedInfoObj = null;
-      JSONArray aggregatorArray = null;
+      byte[] aggregatorArray = null;
       try {
         byte [] zkData =
             getZkExt().getData(hostnameIdPath, false, null);
@@ -886,89 +889,57 @@ public class BspServiceMaster<I extends 
             "collectAndProcessAggregatorValues: JSONException", e);
       }
       try {
-        aggregatorArray = workerFinishedInfoObj.getJSONArray(
-            JSONOBJ_AGGREGATOR_VALUE_ARRAY_KEY);
+        aggregatorArray = Base64.decode(workerFinishedInfoObj.getString(
+            JSONOBJ_AGGREGATOR_VALUE_ARRAY_KEY));
       } catch (JSONException e) {
         if (LOG.isDebugEnabled()) {
           LOG.debug("collectAndProcessAggregatorValues: " +
               "No aggregators" + " for " + hostnameIdPath);
         }
         continue;
+      } catch (IOException e) {
+        throw new IllegalStateException(
+            "collectAndProcessAggregatorValues: IOException", e);
       }
-      for (int i = 0; i < aggregatorArray.length(); ++i) {
-        try {
-          if (LOG.isInfoEnabled()) {
-            LOG.info("collectAndProcessAggregatorValues: " +
-                "Getting aggregators from " +
-                aggregatorArray.getJSONObject(i));
-          }
-          String aggregatorName =
-              aggregatorArray.getJSONObject(i).getString(
-                  AGGREGATOR_NAME_KEY);
-          String aggregatorClassName =
-              aggregatorArray.getJSONObject(i).getString(
-                  AGGREGATOR_CLASS_NAME_KEY);
-          @SuppressWarnings("unchecked")
-          Aggregator<Writable> aggregator =
-            (Aggregator<Writable>) getAggregator(aggregatorName);
-          boolean firstTime = false;
+
+      DataInputStream input =
+          new DataInputStream(new ByteArrayInputStream(aggregatorArray));
+      try {
+        while (input.available() > 0) {
+          String aggregatorName = input.readUTF();
+          AggregatorWrapper<Writable> aggregator =
+              getAggregatorMap().get(aggregatorName);
           if (aggregator == null) {
-            @SuppressWarnings("unchecked")
-            Class<? extends Aggregator<Writable>> aggregatorClass =
-              (Class<? extends Aggregator<Writable>>)
-              Class.forName(aggregatorClassName);
-            aggregator = registerAggregator(
-                aggregatorName,
-                aggregatorClass);
-            firstTime = true;
-          }
-          Writable aggregatorValue =
-              aggregator.createAggregatedValue();
-          InputStream input =
-              new ByteArrayInputStream(
-                  Base64.decode(
-                      aggregatorArray.getJSONObject(i).
-                      getString(AGGREGATOR_VALUE_KEY)));
-          aggregatorValue.readFields(new DataInputStream(input));
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("collectAndProcessAggregatorValues: " +
-                "aggregator value size=" + input.available() +
-                " for aggregator=" + aggregatorName +
-                " value=" + aggregatorValue);
-          }
-          if (firstTime) {
-            aggregator.setAggregatedValue(aggregatorValue);
-          } else {
-            aggregator.aggregate(aggregatorValue);
+            throw new IllegalStateException(
+                "collectAndProcessAggregatorValues: " +
+                    "Master received aggregator which isn't registered: " +
+                    aggregatorName);
           }
-        } catch (IOException e) {
-          throw new IllegalStateException(
-              "collectAndProcessAggregatorValues: " +
-                  "IOException when reading aggregator data " +
-                  aggregatorArray, e);
-        } catch (JSONException e) {
-          throw new IllegalStateException(
-              "collectAndProcessAggregatorValues: " +
-                  "JSONException when reading aggregator data " +
-                  aggregatorArray, e);
-        } catch (ClassNotFoundException e) {
-          throw new IllegalStateException(
-              "collectAndProcessAggregatorValues: " +
-                  "ClassNotFoundException when reading aggregator data " +
-                  aggregatorArray, e);
-        } catch (InstantiationException e) {
-          throw new IllegalStateException(
-              "collectAndProcessAggregatorValues: " +
-                  "InstantiationException when reading aggregator data " +
-                  aggregatorArray, e);
-        } catch (IllegalAccessException e) {
-          throw new IllegalStateException(
-              "collectAndProcessAggregatorValues: " +
-                  "IOException when reading aggregator data " +
-                  aggregatorArray, e);
+          Writable aggregatorValue = aggregator.createInitialValue();
+          aggregatorValue.readFields(input);
+          aggregator.aggregateCurrent(aggregatorValue);
         }
+      } catch (IOException e) {
+        throw new IllegalStateException(
+            "collectAndProcessAggregatorValues: " +
+                "IOException when reading aggregator data", e);
       }
     }
+
+    if (LOG.isInfoEnabled()) {
+      LOG.info("collectAndProcessAggregatorValues: Processed aggregators");
+    }
+
+    // prepare aggregators for master compute
+    for (AggregatorWrapper<Writable> aggregator :
+        getAggregatorMap().values()) {
+      if (aggregator.isPersistent()) {
+        aggregator.aggregateCurrent(aggregator.getPreviousAggregatedValue());
+      }
+      aggregator.setPreviousAggregatedValue(
+          aggregator.getCurrentAggregatedValue());
+      aggregator.resetCurrentAggregator();
+    }
   }
 
   /**
@@ -977,48 +948,45 @@ public class BspServiceMaster<I extends 
    * @param superstep superstep for which to save values
    */
   private void saveAggregatorValues(long superstep) {
-    Map<String, Aggregator<Writable>> aggregatorMap = getAggregatorMap();
+    Map<String, AggregatorWrapper<Writable>> aggregatorMap =
+        getAggregatorMap();
+
+    for (AggregatorWrapper<Writable> aggregator : aggregatorMap.values()) {
+      if (aggregator.isChanged()) {
+        // if master compute changed the value, use the one it chose
+        aggregator.setPreviousAggregatedValue(
+            aggregator.getCurrentAggregatedValue());
+        // reset aggregator for the next superstep
+        aggregator.resetCurrentAggregator();
+      }
+    }
+
     if (aggregatorMap.size() > 0) {
       String mergedAggregatorPath =
           getMergedAggregatorPath(getApplicationAttempt(), superstep);
-      byte [] zkData = null;
-      JSONArray aggregatorArray = new JSONArray();
-      for (Map.Entry<String, Aggregator<Writable>> entry :
-        aggregatorMap.entrySet()) {
+
+      ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+      DataOutput output = new DataOutputStream(outputStream);
+      try {
+        output.writeInt(aggregatorMap.size());
+      } catch (IOException e) {
+        e.printStackTrace();
+      }
+      for (Map.Entry<String, AggregatorWrapper<Writable>> entry :
+          aggregatorMap.entrySet()) {
         try {
-          ByteArrayOutputStream outputStream =
-              new ByteArrayOutputStream();
-          DataOutput output = new DataOutputStream(outputStream);
-          entry.getValue().getAggregatedValue().write(output);
-
-          JSONObject aggregatorObj = new JSONObject();
-          aggregatorObj.put(AGGREGATOR_NAME_KEY,
-              entry.getKey());
-          aggregatorObj.put(
-              AGGREGATOR_VALUE_KEY,
-              Base64.encodeBytes(outputStream.toByteArray()));
-          aggregatorArray.put(aggregatorObj);
-          if (LOG.isInfoEnabled()) {
-            LOG.info("saveAggregatorValues: " +
-                "Trying to add aggregatorObj " +
-                aggregatorObj + "(" +
-                    entry.getValue().getAggregatedValue() +
-                    ") to merged aggregator path " +
-                    mergedAggregatorPath);
-          }
+          output.writeUTF(entry.getKey());
+          output.writeUTF(entry.getValue().getAggregatorClass().getName());
+          entry.getValue().getPreviousAggregatedValue().write(output);
         } catch (IOException e) {
-          throw new IllegalStateException(
-              "saveAggregatorValues: " +
-                  "IllegalStateException", e);
-        } catch (JSONException e) {
-          throw new IllegalStateException(
-              "saveAggregatorValues: JSONException", e);
+          throw new IllegalStateException("saveAggregatorValues: " +
+              "IllegalStateException", e);
         }
       }
+
       try {
-        zkData = aggregatorArray.toString().getBytes();
         getZkExt().createExt(mergedAggregatorPath,
-            zkData,
+            outputStream.toByteArray(),
             Ids.OPEN_ACL_UNSAFE,
             CreateMode.PERSISTENT,
             true);
@@ -1034,10 +1002,8 @@ public class BspServiceMaster<I extends 
             e);
       }
       if (LOG.isInfoEnabled()) {
-        LOG.info("saveAggregatorValues: Finished " +
-            "loading " +
-            mergedAggregatorPath + " with aggregator values " +
-            aggregatorArray);
+        LOG.info("saveAggregatorValues: Finished loading " +
+            mergedAggregatorPath);
       }
     }
   }
@@ -1534,7 +1500,20 @@ public class BspServiceMaster<I extends 
       superstepState = SuperstepState.THIS_SUPERSTEP_DONE;
     }
     try {
-      aggregatorWriter.writeAggregator(getAggregatorMap(),
+      Iterable<Map.Entry<String, Writable>> iter =
+          Iterables.transform(
+              getAggregatorMap().entrySet(),
+              new Function<Entry<String, AggregatorWrapper<Writable>>,
+                  Entry<String, Writable>>() {
+                @Override
+                public Entry<String, Writable> apply(
+                    Entry<String, AggregatorWrapper<Writable>> entry) {
+                  return new AbstractMap.SimpleEntry<String,
+                      Writable>(entry.getKey(),
+                      entry.getValue().getPreviousAggregatedValue());
+                }
+              });
+      aggregatorWriter.writeAggregator(iter,
           (superstepState == SuperstepState.ALL_SUPERSTEPS_DONE) ?
               AggregatorWriter.LAST_SUPERSTEP : getSuperstep());
     } catch (IOException e) {
@@ -1787,14 +1766,28 @@ public class BspServiceMaster<I extends 
     return foundEvent;
   }
 
-  /**
-   * Use an aggregator in this superstep. Note that the master uses all
-   * aggregators by default, so calling this function is not neccessary.
-   *
-   * @param name Name of aggregator (should be unique)
-   * @return boolean (always true)
-   */
-  public boolean useAggregator(String name) {
-    return true;
+  @Override
+  public <A extends Writable> boolean registerAggregator(String name,
+      Class<? extends Aggregator<A>> aggregatorClass) throws
+      InstantiationException, IllegalAccessException {
+    return registerAggregator(name, aggregatorClass, false) != null;
+  }
+
+  @Override
+  public <A extends Writable> boolean registerPersistentAggregator(String name,
+      Class<? extends Aggregator<A>> aggregatorClass) throws
+      InstantiationException, IllegalAccessException {
+    return registerAggregator(name, aggregatorClass, true) != null;
+  }
+
+  @Override
+  public <A extends Writable> void setAggregatedValue(String name, A value) {
+    AggregatorWrapper<? extends Writable> aggregator = getAggregator(name);
+    if (aggregator == null) {
+      throw new IllegalStateException(
+          "setAggregatedValue: Tried to set value of aggregator which wasn't" +
+              " registered " + name);
+    }
+    ((AggregatorWrapper<A>) aggregator).setCurrentAggregatedValue(value);
   }
 }

Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java?rev=1371108&r1=1371107&r2=1371108&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java Thu Aug  9 09:10:57 2012
@@ -57,11 +57,11 @@ import net.iharder.Base64;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
 import java.io.DataInputStream;
 import java.io.DataOutput;
 import java.io.DataOutputStream;
 import java.io.IOException;
-import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -71,7 +71,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
-import java.util.TreeSet;
 
 /**
  * ZooKeeper-based implementation of {@link CentralizedServiceWorker}.
@@ -92,8 +91,6 @@ public class BspServiceWorker<I extends 
   private int inputSplitCount = -1;
   /** My process health znode */
   private String myHealthZnode;
-  /** List of aggregators currently in use */
-  private Set<String> aggregatorInUse = new TreeSet<String>();
   /** Worker info */
   private final WorkerInfo workerInfo;
   /** Worker graph partitioner */
@@ -130,7 +127,6 @@ public class BspServiceWorker<I extends 
    * @param context Mapper context
    * @param graphMapper Graph mapper
    * @param graphState Global graph state
-   * @throws UnknownHostException
    * @throws IOException
    * @throws InterruptedException
    */
@@ -194,21 +190,6 @@ public class BspServiceWorker<I extends 
   }
 
   /**
-   * Use an aggregator in this superstep.
-   *
-   * @param name Name of aggregator (should be unique)
-   * @return boolean (false when aggregator not registered)
-   */
-  public boolean useAggregator(String name) {
-    if (getAggregatorMap().get(name) == null) {
-      LOG.error("userAggregator: Aggregator=" + name + " not registered");
-      return false;
-    }
-    aggregatorInUse.add(name);
-    return true;
-  }
-
-  /**
    * Try to reserve an InputSplit for loading.  While InputSplits exists that
    * are not finished, wait until they are.
    *
@@ -692,57 +673,49 @@ public class BspServiceWorker<I extends 
     finishSuperstep(partitionStatsList);
   }
 
+  @Override
+  public <A extends Writable> void aggregate(String name, A value) {
+    AggregatorWrapper<? extends Writable> aggregator = getAggregator(name);
+    if (aggregator != null) {
+      ((AggregatorWrapper<A>) aggregator).aggregateCurrent(value);
+    } else {
+      throw new IllegalStateException("aggregate: Tried to aggregate value " +
+          "to unregistered aggregator " + name);
+    }
+  }
+
   /**
-   *  Marshal the aggregator values of to a JSONArray that will later be
-   *  aggregated by master.  Reset the 'use' of aggregators in the next
-   *  superstep
+   *  Marshal the aggregator values of the worker to a byte array that will
+   *  later be aggregated by master.
    *
    * @param superstep Superstep to marshall on
-   * @return JSON array of the aggreagtor values
+   * @return Byte array of the aggregator values
    */
-  private JSONArray marshalAggregatorValues(long superstep) {
-    JSONArray aggregatorArray = new JSONArray();
-    if ((superstep == INPUT_SUPERSTEP) || (aggregatorInUse.size() == 0)) {
-      return aggregatorArray;
+  private byte[] marshalAggregatorValues(long superstep) {
+    if (superstep == INPUT_SUPERSTEP) {
+      return new byte[0];
     }
 
-    for (String name : aggregatorInUse) {
-      try {
-        Aggregator<Writable> aggregator = getAggregatorMap().get(name);
-        ByteArrayOutputStream outputStream =
-            new ByteArrayOutputStream();
-        DataOutput output = new DataOutputStream(outputStream);
-        aggregator.getAggregatedValue().write(output);
-
-        JSONObject aggregatorObj = new JSONObject();
-        aggregatorObj.put(AGGREGATOR_NAME_KEY, name);
-        aggregatorObj.put(AGGREGATOR_CLASS_NAME_KEY,
-            aggregator.getClass().getName());
-        aggregatorObj.put(
-            AGGREGATOR_VALUE_KEY,
-            Base64.encodeBytes(outputStream.toByteArray()));
-        aggregatorArray.put(aggregatorObj);
-        if (LOG.isInfoEnabled()) {
-          LOG.info("marshalAggregatorValues: " +
-              "Found aggregatorObj " +
-              aggregatorObj + ", value (" +
-              aggregator.getAggregatedValue() + ")");
+    ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+    DataOutputStream output = new DataOutputStream(outputStream);
+    for (Entry<String, AggregatorWrapper<Writable>> entry :
+        getAggregatorMap().entrySet()) {
+      if (entry.getValue().isChanged()) {
+        try {
+          output.writeUTF(entry.getKey());
+          entry.getValue().getCurrentAggregatedValue().write(output);
+        } catch (IOException e) {
+          throw new IllegalStateException("Failed to marshall aggregator " +
+              "with IOException " + entry.getKey(), e);
         }
-      } catch (JSONException e) {
-        throw new IllegalStateException("Failed to marshall aggregator " +
-            "with JSONException " + name, e);
-      } catch (IOException e) {
-        throw new IllegalStateException("Failed to marshall aggregator " +
-            "with IOException " + name, e);
       }
     }
 
     if (LOG.isInfoEnabled()) {
-      LOG.info("marshalAggregatorValues: Finished assembling " +
-          "aggregator values in JSONArray - " + aggregatorArray);
+      LOG.info(
+          "marshalAggregatorValues: Finished assembling aggregator values");
     }
-    aggregatorInUse.clear();
-    return aggregatorArray;
+    return outputStream.toByteArray();
   }
 
   /**
@@ -751,13 +724,18 @@ public class BspServiceWorker<I extends 
    * @param superstep Superstep to get the aggregated values from
    */
   private void getAggregatorValues(long superstep) {
+    // prepare aggregators for reading and next superstep
+    for (AggregatorWrapper<Writable> aggregator :
+        getAggregatorMap().values()) {
+      aggregator.setPreviousAggregatedValue(aggregator.createInitialValue());
+      aggregator.resetCurrentAggregator();
+    }
     String mergedAggregatorPath =
         getMergedAggregatorPath(getApplicationAttempt(), superstep - 1);
-    JSONArray aggregatorArray = null;
+
+    byte[] aggregatorArray = null;
     try {
-      byte[] zkData =
-          getZkExt().getData(mergedAggregatorPath, false, null);
-      aggregatorArray = new JSONArray(new String(zkData));
+      aggregatorArray = getZkExt().getData(mergedAggregatorPath, false, null);
     } catch (KeeperException.NoNodeException e) {
       LOG.info("getAggregatorValues: no aggregators in " +
           mergedAggregatorPath + " on superstep " + superstep);
@@ -768,49 +746,58 @@ public class BspServiceWorker<I extends 
     } catch (InterruptedException e) {
       throw new IllegalStateException("Failed to get data for " +
           mergedAggregatorPath + " with InterruptedException", e);
-    } catch (JSONException e) {
-      throw new IllegalStateException("Failed to get data for " +
-          mergedAggregatorPath + " with JSONException", e);
     }
-    for (int i = 0; i < aggregatorArray.length(); ++i) {
+
+    DataInput input =
+        new DataInputStream(new ByteArrayInputStream(aggregatorArray));
+    int numAggregators = 0;
+
+    try {
+      numAggregators = input.readInt();
+    } catch (IOException e) {
+      throw new IllegalStateException("getAggregatorValues: " +
+          "Failed to decode data", e);
+    }
+
+    for (int i = 0; i < numAggregators; i++) {
       try {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("getAggregatorValues: " +
-              "Getting aggregators from " +
-              aggregatorArray.getJSONObject(i));
-        }
-        String aggregatorName = aggregatorArray.getJSONObject(i).
-            getString(AGGREGATOR_NAME_KEY);
-        Aggregator<Writable> aggregator =
+        String aggregatorName = input.readUTF();
+        String aggregatorClassName = input.readUTF();
+        AggregatorWrapper<Writable> aggregatorWrapper =
             getAggregatorMap().get(aggregatorName);
-        if (aggregator == null) {
-          continue;
-        }
-        Writable aggregatorValue = aggregator.getAggregatedValue();
-        InputStream input =
-            new ByteArrayInputStream(
-                Base64.decode(aggregatorArray.getJSONObject(i).
-                    getString(AGGREGATOR_VALUE_KEY)));
-        aggregatorValue.readFields(
-            new DataInputStream(input));
-        aggregator.setAggregatedValue(aggregatorValue);
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("getAggregatorValues: " +
-              "Got aggregator=" + aggregatorName + " value=" +
-              aggregatorValue);
+        if (aggregatorWrapper == null) {
+          try {
+            Class<? extends Aggregator<Writable>> aggregatorClass =
+                (Class<? extends Aggregator<Writable>>)
+                    Class.forName(aggregatorClassName);
+            aggregatorWrapper =
+                registerAggregator(aggregatorName, aggregatorClass, false);
+          } catch (ClassNotFoundException e) {
+            throw new IllegalStateException("Failed to create aggregator " +
+                aggregatorName + " of class " + aggregatorClassName +
+                " with ClassNotFoundException", e);
+          } catch (InstantiationException e) {
+            throw new IllegalStateException("Failed to create aggregator " +
+                aggregatorName + " of class " + aggregatorClassName +
+                " with InstantiationException", e);
+          } catch (IllegalAccessException e) {
+            throw new IllegalStateException("Failed to create aggregator " +
+                aggregatorName + " of class " + aggregatorClassName +
+                " with IllegalAccessException", e);
+          }
         }
-      } catch (JSONException e) {
-        throw new IllegalStateException("Failed to decode data for index " +
-            i + " with KeeperException", e);
+        Writable aggregatorValue = aggregatorWrapper.createInitialValue();
+        aggregatorValue.readFields(input);
+        aggregatorWrapper.setPreviousAggregatedValue(aggregatorValue);
       } catch (IOException e) {
-        throw new IllegalStateException("Failed to decode data for index " +
-            i + " with KeeperException", e);
+        throw new IllegalStateException(
+            "Failed to decode data for index " + i, e);
       }
     }
+
     if (LOG.isInfoEnabled()) {
       LOG.info("getAggregatorValues: Finished loading " +
-          mergedAggregatorPath + " with aggregator values " +
-          aggregatorArray);
+          mergedAggregatorPath);
     }
   }
 
@@ -988,7 +975,7 @@ public class BspServiceWorker<I extends 
           MemoryUtils.getRuntimeMemoryStats());
     }
 
-    JSONArray aggregatorValueArray =
+    byte[] aggregatorArray =
         marshalAggregatorValues(getSuperstep());
     Collection<PartitionStats> finalizedPartitionStats =
         workerGraphPartitioner.finalizePartitionStats(
@@ -1000,7 +987,7 @@ public class BspServiceWorker<I extends 
     JSONObject workerFinishedInfoObj = new JSONObject();
     try {
       workerFinishedInfoObj.put(JSONOBJ_AGGREGATOR_VALUE_ARRAY_KEY,
-          aggregatorValueArray);
+          Base64.encodeBytes(aggregatorArray));
       workerFinishedInfoObj.put(JSONOBJ_PARTITION_STATS_KEY,
           Base64.encodeBytes(partitionStatsBytes));
       workerFinishedInfoObj.put(JSONOBJ_NUM_MESSAGES_KEY,

Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java?rev=1371108&r1=1371107&r2=1371108&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java Thu Aug  9 09:10:57 2012
@@ -111,19 +111,21 @@ public class GraphMapper<I extends Writa
   }
 
   /**
-   * Get the aggregator usage, a subset of the functionality
+   * Get worker aggregator usage, a subset of the functionality
    *
-   * @return Aggregator usage interface
+   * @return Worker aggregator usage interface
    */
-  public final AggregatorUsage getAggregatorUsage() {
-    AggregatorUsage result = null;
-    if (serviceWorker != null) {
-      result = serviceWorker;
-    }
-    if (serviceMaster != null) {
-      result = serviceMaster;
-    }
-    return result;
+  public final WorkerAggregatorUsage getWorkerAggregatorUsage() {
+    return serviceWorker;
+  }
+
+  /**
+   * Get master aggregator usage, a subset of the functionality
+   *
+   * @return Master aggregator usage interface
+   */
+  public final MasterAggregatorUsage getMasterAggregatorUsage() {
+    return serviceMaster;
   }
 
   public final WorkerContext getWorkerContext() {

Added: giraph/trunk/src/main/java/org/apache/giraph/graph/MasterAggregatorUsage.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/MasterAggregatorUsage.java?rev=1371108&view=auto
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/MasterAggregatorUsage.java (added)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/MasterAggregatorUsage.java Thu Aug  9 09:10:57 2012
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Master compute can access and change aggregators through this interface
+ */
+public interface MasterAggregatorUsage {
+  /**
+   * Register an aggregator in preSuperstep() and/or preApplication(). This
+   * aggregator will have its value reset at the end of each superstep.
+   *
+   * @param name Name of aggregator
+   * @param aggregatorClass Class type of the aggregator
+   * @param <A> Aggregated value type
+   * @return True iff aggregator wasn't already registered
+   */
+  <A extends Writable> boolean registerAggregator(String name,
+      Class<? extends Aggregator<A>> aggregatorClass) throws
+      InstantiationException, IllegalAccessException;
+
+  /**
+   * Register a persistent aggregator in preSuperstep() and/or
+   * preApplication(). This aggregator will not have its value reset at the
+   * end of each superstep.
+   *
+   * @param name Name of aggregator
+   * @param aggregatorClass Class type of the aggregator
+   * @param <A> Aggregated value type
+   * @return True iff aggregator wasn't already registered
+   */
+  <A extends Writable> boolean registerPersistentAggregator(String name,
+      Class<? extends Aggregator<A>> aggregatorClass) throws
+      InstantiationException, IllegalAccessException;
+
+  /**
+   * Get value of an aggregator.
+   *
+   * @param name Name of aggregator
+   * @param <A> Aggregated value
+   * @return Value of the aggregator
+   */
+  <A extends Writable> A getAggregatedValue(String name);
+
+  /**
+   * Sets value of an aggregator.
+   *
+   * @param name Name of aggregator
+   * @param value Value to set
+   * @param <A> Aggregated value
+   */
+  <A extends Writable> void setAggregatedValue(String name, A value);
+}

Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/MasterCompute.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/MasterCompute.java?rev=1371108&r1=1371107&r2=1371108&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/MasterCompute.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/MasterCompute.java Thu Aug  9 09:10:57 2012
@@ -40,7 +40,7 @@ import org.apache.hadoop.mapreduce.Mappe
  * not have to be called.
  */
 @SuppressWarnings("rawtypes")
-public abstract class MasterCompute implements AggregatorUsage, Writable,
+public abstract class MasterCompute implements MasterAggregatorUsage, Writable,
     Configurable {
   /** If true, do not do anymore computation on this vertex. */
   private boolean halt = false;
@@ -135,23 +135,32 @@ public abstract class MasterCompute impl
   }
 
   @Override
-  public final <A extends Writable> Aggregator<A> registerAggregator(
+  public final <A extends Writable> boolean registerAggregator(
     String name, Class<? extends Aggregator<A>> aggregatorClass)
     throws InstantiationException, IllegalAccessException {
-    return getGraphState().getGraphMapper().getAggregatorUsage().
+    return getGraphState().getGraphMapper().getMasterAggregatorUsage().
         registerAggregator(name, aggregatorClass);
   }
 
   @Override
-  public final Aggregator<? extends Writable> getAggregator(String name) {
-    return getGraphState().getGraphMapper().getAggregatorUsage().
-      getAggregator(name);
+  public final <A extends Writable> boolean registerPersistentAggregator(
+      String name,
+      Class<? extends Aggregator<A>> aggregatorClass) throws
+      InstantiationException, IllegalAccessException {
+    return getGraphState().getGraphMapper().getMasterAggregatorUsage().
+        registerPersistentAggregator(name, aggregatorClass);
   }
 
   @Override
-  public final boolean useAggregator(String name) {
-    return getGraphState().getGraphMapper().getAggregatorUsage().
-      useAggregator(name);
+  public <A extends Writable> A getAggregatedValue(String name) {
+    return getGraphState().getGraphMapper().getMasterAggregatorUsage().
+        getAggregatedValue(name);
+  }
+
+  @Override
+  public <A extends Writable> void setAggregatedValue(String name, A value) {
+    getGraphState().getGraphMapper().getMasterAggregatorUsage().
+        setAggregatedValue(name, value);
   }
 
   @Override

Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/TextAggregatorWriter.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/TextAggregatorWriter.java?rev=1371108&r1=1371107&r2=1371108&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/TextAggregatorWriter.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/TextAggregatorWriter.java Thu Aug  9 09:10:57 2012
@@ -19,7 +19,6 @@
 package org.apache.giraph.graph;
 
 import java.io.IOException;
-import java.util.Map;
 import java.util.Map.Entry;
 
 import com.google.common.base.Charsets;
@@ -77,13 +76,13 @@ public class TextAggregatorWriter implem
   }
 
   @Override
-  public final void writeAggregator(
-      Map<String, Aggregator<Writable>> aggregators,
+  public void writeAggregator(
+      Iterable<Entry<String, Writable>> aggregatorMap,
       long superstep) throws IOException {
     if (shouldWrite(superstep)) {
-      for (Entry<String, Aggregator<Writable>> a: aggregators.entrySet()) {
-        byte[] bytes = aggregatorToString(a.getKey(), a.getValue(), superstep)
-            .getBytes(Charsets.UTF_8);
+      for (Entry<String, Writable> entry : aggregatorMap) {
+        byte[] bytes = aggregatorToString(entry.getKey(), entry.getValue(),
+            superstep).getBytes(Charsets.UTF_8);
         output.write(bytes, 0, bytes.length);
       }
       output.flush();
@@ -95,17 +94,15 @@ public class TextAggregatorWriter implem
    * Override this if you want to implement your own text format.
    *
    * @param aggregatorName Name of the aggregator
-   * @param a Aggregator
+   * @param value Value of aggregator
    * @param superstep Current superstep
    * @return The String representation for the aggregator
    */
   protected String aggregatorToString(String aggregatorName,
-      Aggregator<Writable> a,
+      Writable value,
       long superstep) {
-
     return new StringBuilder("superstep=").append(superstep).append("\t")
-        .append(aggregatorName).append("=").append(a.getAggregatedValue())
-        .append("\t").append(a.getClass().getCanonicalName()).append("\n")
+        .append(aggregatorName).append("=").append(value).append("\n")
         .toString();
   }
 

Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/Vertex.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/Vertex.java?rev=1371108&r1=1371107&r2=1371108&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/Vertex.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/Vertex.java Thu Aug  9 09:10:57 2012
@@ -45,7 +45,7 @@ import java.util.Map;
 @SuppressWarnings("rawtypes")
 public abstract class Vertex<I extends WritableComparable,
     V extends Writable, E extends Writable, M extends Writable>
-    implements AggregatorUsage, Writable, Configurable {
+    implements WorkerAggregatorUsage, Writable, Configurable {
   /** Vertex id. */
   private I id;
   /** Vertex value. */
@@ -315,23 +315,15 @@ public abstract class Vertex<I extends W
   }
 
   @Override
-  public final <A extends Writable> Aggregator<A> registerAggregator(
-    String name, Class<? extends Aggregator<A>> aggregatorClass)
-    throws InstantiationException, IllegalAccessException {
-    return getGraphState().getGraphMapper().getAggregatorUsage().
-        registerAggregator(name, aggregatorClass);
+  public <A extends Writable> void aggregate(String name, A value) {
+    getGraphState().getGraphMapper().getWorkerAggregatorUsage().
+        aggregate(name, value);
   }
 
   @Override
-  public final Aggregator<? extends Writable> getAggregator(String name) {
-    return getGraphState().getGraphMapper().getAggregatorUsage().
-      getAggregator(name);
-  }
-
-  @Override
-  public final boolean useAggregator(String name) {
-    return getGraphState().getGraphMapper().getAggregatorUsage().
-      useAggregator(name);
+  public <A extends Writable> A getAggregatedValue(String name) {
+    return getGraphState().getGraphMapper().getWorkerAggregatorUsage().
+        getAggregatedValue(name);
   }
 
   @Override

Added: giraph/trunk/src/main/java/org/apache/giraph/graph/WorkerAggregatorUsage.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/WorkerAggregatorUsage.java?rev=1371108&view=auto
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/WorkerAggregatorUsage.java (added)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/WorkerAggregatorUsage.java Thu Aug  9 09:10:57 2012
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Vertex classes can access and change aggregators through this interface
+ */
+public interface WorkerAggregatorUsage {
+  /**
+   * Aggregate a new value into the named aggregator
+   *
+   * @param name Name of aggregator
+   * @param value Value to add
+   * @param <A> Aggregated value
+   */
+  <A extends Writable> void aggregate(String name, A value);
+
+  /**
+   * Get value of an aggregator.
+   *
+   * @param name Name of aggregator
+   * @param <A> Aggregated value
+   * @return Value of the aggregator
+   */
+  <A extends Writable> A getAggregatedValue(String name);
+}

Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/WorkerContext.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/WorkerContext.java?rev=1371108&r1=1371107&r2=1371108&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/WorkerContext.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/WorkerContext.java Thu Aug  9 09:10:57 2012
@@ -26,7 +26,7 @@ import org.apache.hadoop.mapreduce.Mappe
  * on a per-worker basis. There's one WorkerContext per worker.
  */
 @SuppressWarnings("rawtypes")
-public abstract class WorkerContext implements AggregatorUsage {
+public abstract class WorkerContext implements WorkerAggregatorUsage {
   /** Global graph state */
   private GraphState graphState;
 
@@ -110,23 +110,14 @@ public abstract class WorkerContext impl
   }
 
   @Override
-  public final <A extends Writable> Aggregator<A> registerAggregator(
-    String name,
-    Class<? extends Aggregator<A>> aggregatorClass)
-    throws InstantiationException, IllegalAccessException {
-    return graphState.getGraphMapper().getAggregatorUsage().
-        registerAggregator(name, aggregatorClass);
+  public <A extends Writable> void aggregate(String name, A value) {
+    graphState.getGraphMapper().getWorkerAggregatorUsage().
+        aggregate(name, value);
   }
 
   @Override
-  public final Aggregator<? extends Writable> getAggregator(String name) {
-    return graphState.getGraphMapper().getAggregatorUsage().
-        getAggregator(name);
-  }
-
-  @Override
-  public final boolean useAggregator(String name) {
-    return graphState.getGraphMapper().getAggregatorUsage().
-        useAggregator(name);
+  public <A extends Writable> A getAggregatedValue(String name) {
+    return graphState.getGraphMapper().getWorkerAggregatorUsage().
+        getAggregatedValue(name);
   }
 }

Modified: giraph/trunk/src/test/java/org/apache/giraph/BspCase.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/test/java/org/apache/giraph/BspCase.java?rev=1371108&r1=1371107&r2=1371108&view=diff
==============================================================================
--- giraph/trunk/src/test/java/org/apache/giraph/BspCase.java (original)
+++ giraph/trunk/src/test/java/org/apache/giraph/BspCase.java Thu Aug  9 09:10:57 2012
@@ -174,6 +174,27 @@ public class BspCase implements Watcher 
   protected GiraphJob prepareJob(String name, Class<?> vertexClass,
       Class<?> workerContextClass, Class<?> vertexInputFormatClass,
       Class<?> vertexOutputFormatClass, Path outputPath) throws IOException {
+    return prepareJob(name, vertexClass, workerContextClass, null,
+        vertexInputFormatClass, vertexOutputFormatClass, outputPath);
+  }
+
+  /**
+   * Prepare a GiraphJob for test purposes
+   *
+   * @param name  identifying name for the job
+   * @param vertexClass class of the vertex to run
+   * @param workerContextClass class of the workercontext to use
+   * @param masterComputeClass class of mastercompute to use
+   * @param vertexInputFormatClass  inputformat to use
+   * @param vertexOutputFormatClass outputformat to use
+   * @param outputPath  destination path for the output
+   * @return  fully configured job instance
+   * @throws IOException
+   */
+  protected GiraphJob prepareJob(String name, Class<?> vertexClass,
+      Class<?> workerContextClass, Class<?> masterComputeClass,
+      Class<?> vertexInputFormatClass, Class<?> vertexOutputFormatClass,
+      Path outputPath) throws IOException {
     GiraphJob job = new GiraphJob(name);
     setupConfiguration(job);
     job.setVertexClass(vertexClass);
@@ -182,6 +203,9 @@ public class BspCase implements Watcher 
     if (workerContextClass != null) {
       job.setWorkerContextClass(workerContextClass);
     }
+    if (masterComputeClass != null) {
+      job.setMasterComputeClass(masterComputeClass);
+    }
     if (vertexOutputFormatClass != null) {
       job.setVertexOutputFormatClass(vertexOutputFormatClass);
     }

Added: giraph/trunk/src/test/java/org/apache/giraph/TestAggregatorsHandling.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/test/java/org/apache/giraph/TestAggregatorsHandling.java?rev=1371108&view=auto
==============================================================================
--- giraph/trunk/src/test/java/org/apache/giraph/TestAggregatorsHandling.java (added)
+++ giraph/trunk/src/test/java/org/apache/giraph/TestAggregatorsHandling.java Thu Aug  9 09:10:57 2012
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph;
+
+import org.apache.giraph.examples.AggregatorsTestVertex;
+import org.apache.giraph.examples.SimplePageRankVertex;
+import org.apache.giraph.graph.GiraphJob;
+import org.junit.Test;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+
+/** Tests whether aggregators are handled properly during supersteps */
+public class TestAggregatorsHandling extends BspCase {
+
+  public TestAggregatorsHandling() {
+    super(TestAggregatorsHandling.class.getName());
+  }
+
+  @Test
+  public void testAggregatorsHandling() throws IOException,
+      ClassNotFoundException, InterruptedException {
+    GiraphJob job = prepareJob(getCallingMethodName(),
+        AggregatorsTestVertex.class,
+        SimplePageRankVertex.SimplePageRankVertexInputFormat.class);
+    job.setMasterComputeClass(
+        AggregatorsTestVertex.AggregatorsTestMasterCompute.class);
+    job.getConfiguration().setBoolean(GiraphJob.USE_NETTY, true);
+    assertTrue(job.run(true));
+  }
+}

Modified: giraph/trunk/src/test/java/org/apache/giraph/TestAutoCheckpoint.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/test/java/org/apache/giraph/TestAutoCheckpoint.java?rev=1371108&r1=1371107&r2=1371108&view=diff
==============================================================================
--- giraph/trunk/src/test/java/org/apache/giraph/TestAutoCheckpoint.java (original)
+++ giraph/trunk/src/test/java/org/apache/giraph/TestAutoCheckpoint.java Thu Aug  9 09:10:57 2012
@@ -59,6 +59,7 @@ public class TestAutoCheckpoint extends 
     GiraphJob job = prepareJob(getCallingMethodName(),
         SimpleCheckpointVertex.class,
         SimpleCheckpointVertex.SimpleCheckpointVertexWorkerContext.class,
+        SimpleCheckpointVertex.SimpleCheckpointVertexMasterCompute.class,
         SimpleSuperstepVertexInputFormat.class,
         SimpleSuperstepVertexOutputFormat.class,
         outputPath);

Modified: giraph/trunk/src/test/java/org/apache/giraph/TestBspBasic.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/test/java/org/apache/giraph/TestBspBasic.java?rev=1371108&r1=1371107&r2=1371108&view=diff
==============================================================================
--- giraph/trunk/src/test/java/org/apache/giraph/TestBspBasic.java (original)
+++ giraph/trunk/src/test/java/org/apache/giraph/TestBspBasic.java Thu Aug  9 09:10:57 2012
@@ -301,6 +301,8 @@ public class TestBspBasic extends BspCas
         SimplePageRankVertex.class, SimplePageRankVertexInputFormat.class);
     job.setWorkerContextClass(
         SimplePageRankVertex.SimplePageRankVertexWorkerContext.class);
+    job.setMasterComputeClass(
+        SimplePageRankVertex.SimplePageRankVertexMasterCompute.class);
     assertTrue(job.run(true));
     if (!runningInDistributedMode()) {
       double maxPageRank =
@@ -362,6 +364,8 @@ public class TestBspBasic extends BspCas
         outputPath);
     job.setWorkerContextClass(
         SimplePageRankVertex.SimplePageRankVertexWorkerContext.class);
+    job.setMasterComputeClass(
+        SimplePageRankVertex.SimplePageRankVertexMasterCompute.class);
 
     Configuration conf = job.getConfiguration();
 
@@ -401,23 +405,23 @@ public class TestBspBasic extends BspCas
             String[] tokens = line.split("\t");
             int superstep = Integer.parseInt(tokens[0].split("=")[1]);
             String value = (tokens[1].split("=")[1]);
-            String aggregatorName = tokens[2];
+            String aggregatorName = (tokens[1].split("=")[0]);
 
-            if (DoubleMinAggregator.class.getName().equals(aggregatorName)) {
+            if ("min".equals(aggregatorName)) {
               minValues.put(superstep, Double.parseDouble(value));
             }
-            if (DoubleMaxAggregator.class.getName().equals(aggregatorName)) {
+            if ("max".equals(aggregatorName)) {
               maxValues.put(superstep, Double.parseDouble(value));
             }
-            if (LongSumAggregator.class.getName().equals(aggregatorName)) {
+            if ("sum".equals(aggregatorName)) {
               vertexCounts.put(superstep, Long.parseLong(value));
             }
           }
 
           int maxSuperstep = SimplePageRankVertex.MAX_SUPERSTEPS;
-          assertEquals(maxSuperstep + 1, minValues.size());
-          assertEquals(maxSuperstep + 1, maxValues.size());
-          assertEquals(maxSuperstep + 1, vertexCounts.size());
+          assertEquals(maxSuperstep + 2, minValues.size());
+          assertEquals(maxSuperstep + 2, maxValues.size());
+          assertEquals(maxSuperstep + 2, vertexCounts.size());
 
           assertEquals(maxPageRank, maxValues.get(maxSuperstep));
           assertEquals(minPageRank, minValues.get(maxSuperstep));

Modified: giraph/trunk/src/test/java/org/apache/giraph/TestGraphPartitioner.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/test/java/org/apache/giraph/TestGraphPartitioner.java?rev=1371108&r1=1371107&r2=1371108&view=diff
==============================================================================
--- giraph/trunk/src/test/java/org/apache/giraph/TestGraphPartitioner.java (original)
+++ giraph/trunk/src/test/java/org/apache/giraph/TestGraphPartitioner.java Thu Aug  9 09:10:57 2012
@@ -77,6 +77,7 @@ public class TestGraphPartitioner extend
         GiraphJob job = prepareJob("testVertexBalancer",
             SimpleCheckpointVertex.class,
             SimpleCheckpointVertex.SimpleCheckpointVertexWorkerContext.class,
+            SimpleCheckpointVertex.SimpleCheckpointVertexMasterCompute.class,
             SimpleSuperstepVertexInputFormat.class,
             SimpleSuperstepVertexOutputFormat.class, outputPath);
 
@@ -91,6 +92,7 @@ public class TestGraphPartitioner extend
         outputPath = getTempPath("testHashPartitioner");
         job = prepareJob("testHashPartitioner", SimpleCheckpointVertex.class,
             SimpleCheckpointVertex.SimpleCheckpointVertexWorkerContext.class,
+            SimpleCheckpointVertex.SimpleCheckpointVertexMasterCompute.class,
             SimpleSuperstepVertexInputFormat.class,
             SimpleSuperstepVertexOutputFormat.class, outputPath);
         assertTrue(job.run(true));
@@ -100,6 +102,7 @@ public class TestGraphPartitioner extend
         job = prepareJob("testSuperstepHashPartitioner",
             SimpleCheckpointVertex.class,
             SimpleCheckpointVertex.SimpleCheckpointVertexWorkerContext.class,
+            SimpleCheckpointVertex.SimpleCheckpointVertexMasterCompute.class,
             SimpleSuperstepVertexInputFormat.class,
             SimpleSuperstepVertexOutputFormat.class,
             outputPath);
@@ -115,6 +118,8 @@ public class TestGraphPartitioner extend
         job.setVertexClass(SimpleCheckpointVertex.class);
         job.setWorkerContextClass(
             SimpleCheckpointVertex.SimpleCheckpointVertexWorkerContext.class);
+        job.setMasterComputeClass(
+            SimpleCheckpointVertex.SimpleCheckpointVertexMasterCompute.class);
         job.setVertexInputFormatClass(SimpleSuperstepVertexInputFormat.class);
         job.setVertexOutputFormatClass(SimpleSuperstepVertexOutputFormat.class);
         job.setGraphPartitionerFactoryClass(
@@ -128,6 +133,7 @@ public class TestGraphPartitioner extend
         job = prepareJob("testReverseIdSuperstepHashPartitioner",
             SimpleCheckpointVertex.class,
             SimpleCheckpointVertex.SimpleCheckpointVertexWorkerContext.class,
+            SimpleCheckpointVertex.SimpleCheckpointVertexMasterCompute.class,
             SimpleSuperstepVertexInputFormat.class,
             SimpleSuperstepVertexOutputFormat.class, outputPath);
         job.setGraphPartitionerFactoryClass(

Modified: giraph/trunk/src/test/java/org/apache/giraph/TestManualCheckpoint.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/test/java/org/apache/giraph/TestManualCheckpoint.java?rev=1371108&r1=1371107&r2=1371108&view=diff
==============================================================================
--- giraph/trunk/src/test/java/org/apache/giraph/TestManualCheckpoint.java (original)
+++ giraph/trunk/src/test/java/org/apache/giraph/TestManualCheckpoint.java Thu Aug  9 09:10:57 2012
@@ -53,6 +53,7 @@ public class TestManualCheckpoint extend
     GiraphJob job = prepareJob(getCallingMethodName(),
         SimpleCheckpointVertex.class,
         SimpleCheckpointVertex.SimpleCheckpointVertexWorkerContext.class,
+        SimpleCheckpointVertex.SimpleCheckpointVertexMasterCompute.class,
         SimpleSuperstepVertexInputFormat.class,
         SimpleSuperstepVertexOutputFormat.class, outputPath);
 
@@ -81,8 +82,11 @@ public class TestManualCheckpoint extend
     GiraphJob restartedJob = prepareJob(getCallingMethodName() + "Restarted",
         SimpleCheckpointVertex.class,
         SimpleCheckpointVertex.SimpleCheckpointVertexWorkerContext.class,
+        SimpleCheckpointVertex.SimpleCheckpointVertexMasterCompute.class,
         SimpleSuperstepVertexInputFormat.class,
         SimpleSuperstepVertexOutputFormat.class, outputPath);
+    restartedJob.setMasterComputeClass(
+        SimpleCheckpointVertex.SimpleCheckpointVertexMasterCompute.class);
     restartedJob.getConfiguration().set(GiraphJob.CHECKPOINT_DIRECTORY,
         checkpointsDir.toString());
 

Modified: giraph/trunk/src/test/java/org/apache/giraph/aggregators/TestBooleanAggregators.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/test/java/org/apache/giraph/aggregators/TestBooleanAggregators.java?rev=1371108&r1=1371107&r2=1371108&view=diff
==============================================================================
--- giraph/trunk/src/test/java/org/apache/giraph/aggregators/TestBooleanAggregators.java (original)
+++ giraph/trunk/src/test/java/org/apache/giraph/aggregators/TestBooleanAggregators.java Thu Aug  9 09:10:57 2012
@@ -29,13 +29,13 @@ public class TestBooleanAggregators {
   public void testAndAggregator() {
     BooleanAndAggregator and = new BooleanAndAggregator();
     assertEquals(true, and.getAggregatedValue().get());
-    and.aggregate(true);
+    and.aggregate(new BooleanWritable(true));
     assertEquals(true, and.getAggregatedValue().get());
     and.aggregate(new BooleanWritable(false));
     assertEquals(false, and.getAggregatedValue().get());
-    and.setAggregatedValue(true);
+    and.setAggregatedValue(new BooleanWritable(true));
     assertEquals(true, and.getAggregatedValue().get());
-    BooleanWritable bw = and.createAggregatedValue();
+    BooleanWritable bw = and.createInitialValue();
     assertNotNull(bw);
   }
 
@@ -43,26 +43,26 @@ public class TestBooleanAggregators {
   public void testOrAggregator() {
     BooleanOrAggregator or = new BooleanOrAggregator();
     assertEquals(false, or.getAggregatedValue().get());
-    or.aggregate(false);
+    or.aggregate(new BooleanWritable(false));
     assertEquals(false, or.getAggregatedValue().get());
     or.aggregate(new BooleanWritable(true));
     assertEquals(true, or.getAggregatedValue().get());
-    or.setAggregatedValue(false);
+    or.setAggregatedValue(new BooleanWritable(false));
     assertEquals(false, or.getAggregatedValue().get());
-    BooleanWritable bw = or.createAggregatedValue();
+    BooleanWritable bw = or.createInitialValue();
     assertNotNull(bw);
   }
 
   @Test
   public void testOverwriteAggregator() {
     BooleanOverwriteAggregator overwrite = new BooleanOverwriteAggregator();
-    overwrite.aggregate(true);
+    overwrite.aggregate(new BooleanWritable(true));
     assertEquals(true, overwrite.getAggregatedValue().get());
     overwrite.aggregate(new BooleanWritable(false));
     assertEquals(false, overwrite.getAggregatedValue().get());
-    overwrite.setAggregatedValue(true);
+    overwrite.setAggregatedValue(new BooleanWritable(true));
     assertEquals(true, overwrite.getAggregatedValue().get());
-    BooleanWritable bw = overwrite.createAggregatedValue();
+    BooleanWritable bw = overwrite.createInitialValue();
     assertNotNull(bw);
   }
 

Modified: giraph/trunk/src/test/java/org/apache/giraph/aggregators/TestDoubleAggregators.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/test/java/org/apache/giraph/aggregators/TestDoubleAggregators.java?rev=1371108&r1=1371107&r2=1371108&view=diff
==============================================================================
--- giraph/trunk/src/test/java/org/apache/giraph/aggregators/TestDoubleAggregators.java (original)
+++ giraph/trunk/src/test/java/org/apache/giraph/aggregators/TestDoubleAggregators.java Thu Aug  9 09:10:57 2012
@@ -28,61 +28,61 @@ public class TestDoubleAggregators {
   @Test
   public void testMaxAggregator() {
     DoubleMaxAggregator max = new DoubleMaxAggregator();
-    max.aggregate(2.0);
+    max.aggregate(new DoubleWritable(2.0));
     max.aggregate(new DoubleWritable(3.0));
     assertEquals(3.0, max.getAggregatedValue().get());
-    max.setAggregatedValue(1.0);
+    max.setAggregatedValue(new DoubleWritable(1.0));
     assertEquals(1.0, max.getAggregatedValue().get());
-    DoubleWritable dw = max.createAggregatedValue();
+    DoubleWritable dw = max.createInitialValue();
     assertNotNull(dw);
   }
 
   @Test
   public void testMinAggregator() {
     DoubleMinAggregator min = new DoubleMinAggregator();
-    min.aggregate(3.0);
+    min.aggregate(new DoubleWritable(3.0));
     min.aggregate(new DoubleWritable(2.0));
     assertEquals(2.0, min.getAggregatedValue().get());
-    min.setAggregatedValue(3.0);
+    min.setAggregatedValue(new DoubleWritable(3.0));
     assertEquals(3.0, min.getAggregatedValue().get());
-    DoubleWritable dw = min.createAggregatedValue();
+    DoubleWritable dw = min.createInitialValue();
     assertNotNull(dw);
   }
 
   @Test
   public void testOverwriteAggregator() {
     DoubleOverwriteAggregator overwrite = new DoubleOverwriteAggregator();
-    overwrite.aggregate(1.0);
+    overwrite.aggregate(new DoubleWritable(1.0));
     assertEquals(1.0, overwrite.getAggregatedValue().get());
     overwrite.aggregate(new DoubleWritable(2.0));
     assertEquals(2.0, overwrite.getAggregatedValue().get());
-    overwrite.setAggregatedValue(3.0);
+    overwrite.setAggregatedValue(new DoubleWritable(3.0));
     assertEquals(3.0, overwrite.getAggregatedValue().get());
-    DoubleWritable dw = overwrite.createAggregatedValue();
+    DoubleWritable dw = overwrite.createInitialValue();
     assertNotNull(dw);
   }
-  
+
   @Test
   public void testProductAggregator() {
     DoubleProductAggregator product = new DoubleProductAggregator();
-    product.aggregate(6.0);
+    product.aggregate(new DoubleWritable(6.0));
     product.aggregate(new DoubleWritable(7.0));
     assertEquals(42.0, product.getAggregatedValue().get());
-    product.setAggregatedValue(1.0);
+    product.setAggregatedValue(new DoubleWritable(1.0));
     assertEquals(1.0, product.getAggregatedValue().get());
-    DoubleWritable dw = product.createAggregatedValue();
+    DoubleWritable dw = product.createInitialValue();
     assertNotNull(dw);
   }
 
   @Test
   public void testSumAggregator() {
     DoubleSumAggregator sum = new DoubleSumAggregator();
-    sum.aggregate(1.0);
+    sum.aggregate(new DoubleWritable(1.0));
     sum.aggregate(new DoubleWritable(2.0));
     assertEquals(3.0, sum.getAggregatedValue().get());
-    sum.setAggregatedValue(4.0);
+    sum.setAggregatedValue(new DoubleWritable(4.0));
     assertEquals(4.0, sum.getAggregatedValue().get());
-    DoubleWritable dw = sum.createAggregatedValue();
+    DoubleWritable dw = sum.createInitialValue();
     assertNotNull(dw);
   }
 

Modified: giraph/trunk/src/test/java/org/apache/giraph/aggregators/TestFloatAggregators.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/test/java/org/apache/giraph/aggregators/TestFloatAggregators.java?rev=1371108&r1=1371107&r2=1371108&view=diff
==============================================================================
--- giraph/trunk/src/test/java/org/apache/giraph/aggregators/TestFloatAggregators.java (original)
+++ giraph/trunk/src/test/java/org/apache/giraph/aggregators/TestFloatAggregators.java Thu Aug  9 09:10:57 2012
@@ -28,61 +28,61 @@ public class TestFloatAggregators {
   @Test
   public void testMaxAggregator() {
     FloatMaxAggregator max = new FloatMaxAggregator();
-    max.aggregate(2.0f);
+    max.aggregate(new FloatWritable(2.0f));
     max.aggregate(new FloatWritable(3.0f));
     assertEquals(3.0f, max.getAggregatedValue().get());
-    max.setAggregatedValue(1.0f);
+    max.setAggregatedValue(new FloatWritable(1.0f));
     assertEquals(1.0f, max.getAggregatedValue().get());
-    FloatWritable fw = max.createAggregatedValue();
+    FloatWritable fw = max.createInitialValue();
     assertNotNull(fw);
   }
 
   @Test
   public void testMinAggregator() {
     FloatMinAggregator min = new FloatMinAggregator();
-    min.aggregate(3.0f);
+    min.aggregate(new FloatWritable(3.0f));
     min.aggregate(new FloatWritable(2.0f));
     assertEquals(2.0f, min.getAggregatedValue().get());
-    min.setAggregatedValue(3.0f);
+    min.setAggregatedValue(new FloatWritable(3.0f));
     assertEquals(3.0f, min.getAggregatedValue().get());
-    FloatWritable fw = min.createAggregatedValue();
+    FloatWritable fw = min.createInitialValue();
     assertNotNull(fw);
   }
 
   @Test
   public void testOverwriteAggregator() {
     FloatOverwriteAggregator overwrite = new FloatOverwriteAggregator();
-    overwrite.aggregate(1.0f);
+    overwrite.aggregate(new FloatWritable(1.0f));
     assertEquals(1.0f, overwrite.getAggregatedValue().get());
     overwrite.aggregate(new FloatWritable(2.0f));
     assertEquals(2.0f, overwrite.getAggregatedValue().get());
-    overwrite.setAggregatedValue(3.0f);
+    overwrite.setAggregatedValue(new FloatWritable(3.0f));
     assertEquals(3.0f, overwrite.getAggregatedValue().get());
-    FloatWritable fw = overwrite.createAggregatedValue();
+    FloatWritable fw = overwrite.createInitialValue();
     assertNotNull(fw);
   }
-  
+
   @Test
   public void testProductAggregator() {
     FloatProductAggregator product = new FloatProductAggregator();
-    product.aggregate(6.0f);
+    product.aggregate(new FloatWritable(6.0f));
     product.aggregate(new FloatWritable(7.0f));
     assertEquals(42.0f, product.getAggregatedValue().get());
-    product.setAggregatedValue(1.0f);
+    product.setAggregatedValue(new FloatWritable(1.0f));
     assertEquals(1.0f, product.getAggregatedValue().get());
-    FloatWritable fw = product.createAggregatedValue();
+    FloatWritable fw = product.createInitialValue();
     assertNotNull(fw);
   }
 
   @Test
   public void testSumAggregator() {
     FloatSumAggregator sum = new FloatSumAggregator();
-    sum.aggregate(1.0f);
+    sum.aggregate(new FloatWritable(1.0f));
     sum.aggregate(new FloatWritable(2.0f));
     assertEquals(3.0f, sum.getAggregatedValue().get());
-    sum.setAggregatedValue(4.0f);
+    sum.setAggregatedValue(new FloatWritable(4.0f));
     assertEquals(4.0f, sum.getAggregatedValue().get());
-    FloatWritable fw = sum.createAggregatedValue();
+    FloatWritable fw = sum.createInitialValue();
     assertNotNull(fw);
   }
 

Modified: giraph/trunk/src/test/java/org/apache/giraph/aggregators/TestIntAggregators.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/test/java/org/apache/giraph/aggregators/TestIntAggregators.java?rev=1371108&r1=1371107&r2=1371108&view=diff
==============================================================================
--- giraph/trunk/src/test/java/org/apache/giraph/aggregators/TestIntAggregators.java (original)
+++ giraph/trunk/src/test/java/org/apache/giraph/aggregators/TestIntAggregators.java Thu Aug  9 09:10:57 2012
@@ -28,61 +28,61 @@ public class TestIntAggregators {
   @Test
   public void testMaxAggregator() {
     IntMaxAggregator max = new IntMaxAggregator();
-    max.aggregate(2);
+    max.aggregate(new IntWritable(2));
     max.aggregate(new IntWritable(3));
     assertEquals(3, max.getAggregatedValue().get());
-    max.setAggregatedValue(1);
+    max.setAggregatedValue(new IntWritable(1));
     assertEquals(1, max.getAggregatedValue().get());
-    IntWritable iw = max.createAggregatedValue();
+    IntWritable iw = max.createInitialValue();
     assertNotNull(iw);
   }
 
   @Test
   public void testMinAggregator() {
     IntMinAggregator min = new IntMinAggregator();
-    min.aggregate(3);
+    min.aggregate(new IntWritable(3));
     min.aggregate(new IntWritable(2));
     assertEquals(2, min.getAggregatedValue().get());
-    min.setAggregatedValue(3);
+    min.setAggregatedValue(new IntWritable(3));
     assertEquals(3, min.getAggregatedValue().get());
-    IntWritable iw = min.createAggregatedValue();
+    IntWritable iw = min.createInitialValue();
     assertNotNull(iw);
   }
 
   @Test
   public void testOverwriteAggregator() {
     IntOverwriteAggregator overwrite = new IntOverwriteAggregator();
-    overwrite.aggregate(1);
+    overwrite.aggregate(new IntWritable(1));
     assertEquals(1, overwrite.getAggregatedValue().get());
     overwrite.aggregate(new IntWritable(2));
     assertEquals(2, overwrite.getAggregatedValue().get());
-    overwrite.setAggregatedValue(3);
+    overwrite.setAggregatedValue(new IntWritable(3));
     assertEquals(3, overwrite.getAggregatedValue().get());
-    IntWritable iw = overwrite.createAggregatedValue();
+    IntWritable iw = overwrite.createInitialValue();
     assertNotNull(iw);
   }
   
   @Test
   public void testProductAggregator() {
     IntProductAggregator product = new IntProductAggregator();
-    product.aggregate(6);
+    product.aggregate(new IntWritable(6));
     product.aggregate(new IntWritable(7));
     assertEquals(42, product.getAggregatedValue().get());
-    product.setAggregatedValue(1);
+    product.setAggregatedValue(new IntWritable(1));
     assertEquals(1, product.getAggregatedValue().get());
-    IntWritable iw = product.createAggregatedValue();
+    IntWritable iw = product.createInitialValue();
     assertNotNull(iw);
   }
 
   @Test
   public void testSumAggregator() {
     IntSumAggregator sum = new IntSumAggregator();
-    sum.aggregate(1);
+    sum.aggregate(new IntWritable(1));
     sum.aggregate(new IntWritable(2));
     assertEquals(3, sum.getAggregatedValue().get());
-    sum.setAggregatedValue(4);
+    sum.setAggregatedValue(new IntWritable(4));
     assertEquals(4, sum.getAggregatedValue().get());
-    IntWritable iw = sum.createAggregatedValue();
+    IntWritable iw = sum.createInitialValue();
     assertNotNull(iw);
   }
 

Modified: giraph/trunk/src/test/java/org/apache/giraph/aggregators/TestLongAggregators.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/test/java/org/apache/giraph/aggregators/TestLongAggregators.java?rev=1371108&r1=1371107&r2=1371108&view=diff
==============================================================================
--- giraph/trunk/src/test/java/org/apache/giraph/aggregators/TestLongAggregators.java (original)
+++ giraph/trunk/src/test/java/org/apache/giraph/aggregators/TestLongAggregators.java Thu Aug  9 09:10:57 2012
@@ -28,61 +28,61 @@ public class TestLongAggregators {
   @Test
   public void testMaxAggregator() {
     LongMaxAggregator max = new LongMaxAggregator();
-    max.aggregate(2L);
+    max.aggregate(new LongWritable(2L));
     max.aggregate(new LongWritable(3L));
     assertEquals(3L, max.getAggregatedValue().get());
-    max.setAggregatedValue(1L);
+    max.setAggregatedValue(new LongWritable(1L));
     assertEquals(1L, max.getAggregatedValue().get());
-    LongWritable lw = max.createAggregatedValue();
+    LongWritable lw = max.createInitialValue();
     assertNotNull(lw);
   }
 
   @Test
   public void testMinAggregator() {
     LongMinAggregator min = new LongMinAggregator();
-    min.aggregate(3L);
+    min.aggregate(new LongWritable(3L));
     min.aggregate(new LongWritable(2L));
     assertEquals(2L, min.getAggregatedValue().get());
-    min.setAggregatedValue(3L);
+    min.setAggregatedValue(new LongWritable(3L));
     assertEquals(3L, min.getAggregatedValue().get());
-    LongWritable lw = min.createAggregatedValue();
+    LongWritable lw = min.createInitialValue();
     assertNotNull(lw);
   }
 
   @Test
   public void testOverwriteAggregator() {
     LongOverwriteAggregator overwrite = new LongOverwriteAggregator();
-    overwrite.aggregate(1L);
+    overwrite.aggregate(new LongWritable(1L));
     assertEquals(1L, overwrite.getAggregatedValue().get());
     overwrite.aggregate(new LongWritable(2L));
     assertEquals(2L, overwrite.getAggregatedValue().get());
-    overwrite.setAggregatedValue(3L);
+    overwrite.setAggregatedValue(new LongWritable(3L));
     assertEquals(3L, overwrite.getAggregatedValue().get());
-    LongWritable lw = overwrite.createAggregatedValue();
+    LongWritable lw = overwrite.createInitialValue();
     assertNotNull(lw);
   }
   
   @Test
   public void testProductAggregator() {
     LongProductAggregator product = new LongProductAggregator();
-    product.aggregate(6L);
+    product.aggregate(new LongWritable(6L));
     product.aggregate(new LongWritable(7L));
     assertEquals(42L, product.getAggregatedValue().get());
-    product.setAggregatedValue(1L);
+    product.setAggregatedValue(new LongWritable(1L));
     assertEquals(1L, product.getAggregatedValue().get());
-    LongWritable lw = product.createAggregatedValue();
+    LongWritable lw = product.createInitialValue();
     assertNotNull(lw);
   }
 
   @Test
   public void testSumAggregator() {
     LongSumAggregator sum = new LongSumAggregator();
-    sum.aggregate(1L);
+    sum.aggregate(new LongWritable(1L));
     sum.aggregate(new LongWritable(2L));
     assertEquals(3L, sum.getAggregatedValue().get());
-    sum.setAggregatedValue(4L);
+    sum.setAggregatedValue(new LongWritable(4L));
     assertEquals(4L, sum.getAggregatedValue().get());
-    LongWritable lw = sum.createAggregatedValue();
+    LongWritable lw = sum.createInitialValue();
     assertNotNull(lw);
   }