You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ac...@apache.org on 2012/10/03 04:57:51 UTC

svn commit: r1393264 - in /giraph/trunk: ./ src/main/java/org/apache/giraph/bsp/ src/main/java/org/apache/giraph/graph/ src/test/java/org/apache/giraph/ src/test/java/org/apache/giraph/graph/

Author: aching
Date: Wed Oct  3 02:57:50 2012
New Revision: 1393264

URL: http://svn.apache.org/viewvc?rev=1393264&view=rev
Log:
GIRAPH-293: Should aggregators be checkpointed?

Added:
    giraph/trunk/src/main/java/org/apache/giraph/graph/AggregatorHandler.java
    giraph/trunk/src/main/java/org/apache/giraph/graph/MasterAggregatorHandler.java
    giraph/trunk/src/main/java/org/apache/giraph/graph/WorkerAggregatorHandler.java
    giraph/trunk/src/test/java/org/apache/giraph/graph/TestAggregatorsHandling.java
Removed:
    giraph/trunk/src/test/java/org/apache/giraph/TestAggregatorsHandling.java
Modified:
    giraph/trunk/CHANGELOG
    giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java
    giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java
    giraph/trunk/src/main/java/org/apache/giraph/graph/BspService.java
    giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java
    giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
    giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java
    giraph/trunk/src/main/java/org/apache/giraph/graph/Vertex.java

Modified: giraph/trunk/CHANGELOG
URL: http://svn.apache.org/viewvc/giraph/trunk/CHANGELOG?rev=1393264&r1=1393263&r2=1393264&view=diff
==============================================================================
--- giraph/trunk/CHANGELOG (original)
+++ giraph/trunk/CHANGELOG Wed Oct  3 02:57:50 2012
@@ -2,6 +2,9 @@ Giraph Change Log
 
 Release 0.2.0 - unreleased
 
+  GIRAPH-293: Should aggregators be checkpointed? (majakabiljo via
+  aching)
+
   GIRAPH-355: Partition.readFields crashes. (maja via aching)
 
   GIRAPH-354: Giraph Formats should use hcatalog-core. (nitayj via

Modified: giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java?rev=1393264&r1=1393263&r2=1393264&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java Wed Oct  3 02:57:50 2012
@@ -37,7 +37,7 @@ import org.apache.zookeeper.KeeperExcept
 @SuppressWarnings("rawtypes")
 public interface CentralizedServiceMaster<I extends WritableComparable,
     V extends Writable, E extends Writable, M extends Writable> extends
-    CentralizedService<I, V, E, M>, MasterAggregatorUsage {
+    CentralizedService<I, V, E, M> {
   /**
    * Become the master.
    * @return true if became the master, false if the application is done.
@@ -92,4 +92,11 @@ public interface CentralizedServiceMaste
   void setJobState(ApplicationState state,
     long applicationAttempt,
     long desiredSuperstep);
+
+  /**
+   * Get master aggregator usage
+   *
+   * @return Master aggregator usage
+   */
+  MasterAggregatorUsage getAggregatorUsage();
 }

Modified: giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java?rev=1393264&r1=1393263&r2=1393264&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java Wed Oct  3 02:57:50 2012
@@ -48,7 +48,7 @@ import org.apache.giraph.graph.WorkerCon
 @SuppressWarnings("rawtypes")
 public interface CentralizedServiceWorker<I extends WritableComparable,
   V extends Writable, E extends Writable, M extends Writable>
-  extends CentralizedService<I, V, E, M>, WorkerAggregatorUsage {
+  extends CentralizedService<I, V, E, M> {
   /**
    * Get the worker information
    *
@@ -201,4 +201,11 @@ public interface CentralizedServiceWorke
    * @return Server data
    */
   ServerData<I, V, E, M> getServerData();
+
+  /**
+   * Get worker aggregator usage
+   *
+   * @return Worker aggregator usage
+   */
+  WorkerAggregatorUsage getAggregatorUsage();
 }

Added: giraph/trunk/src/main/java/org/apache/giraph/graph/AggregatorHandler.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/AggregatorHandler.java?rev=1393264&view=auto
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/AggregatorHandler.java (added)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/AggregatorHandler.java Wed Oct  3 02:57:50 2012
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph;
+
+import org.apache.hadoop.io.Writable;
+
+import com.google.common.collect.Maps;
+
+import java.util.Map;
+
+/**
+ * Class which handles all the actions with aggregators
+ */
+public abstract class AggregatorHandler {
+  /** Map of aggregators */
+  private final Map<String, AggregatorWrapper<Writable>> aggregatorMap;
+
+  /**
+   * Default constructor
+   */
+  protected AggregatorHandler() {
+    aggregatorMap = Maps.newHashMap();
+  }
+
+  /**
+   * Get value of an aggregator.
+   *
+   * @param name Name of aggregator
+   * @param <A> Aggregated value
+   * @return Value of the aggregator
+   */
+  public <A extends Writable> A getAggregatedValue(String name) {
+    AggregatorWrapper<? extends Writable> aggregator = getAggregator(name);
+    if (aggregator == null) {
+      return null;
+    } else {
+      return (A) aggregator.getPreviousAggregatedValue();
+    }
+  }
+
+  /**
+   * Get aggregator by name.
+   *
+   * @param name Name of aggregator
+   * @return Aggregator or null when not registered
+   */
+  protected AggregatorWrapper<Writable> getAggregator(String name) {
+    return aggregatorMap.get(name);
+  }
+
+  /**
+   * Get map of aggregators
+   *
+   * @return Aggregators map
+   */
+  protected Map<String, AggregatorWrapper<Writable>> getAggregatorMap() {
+    return aggregatorMap;
+  }
+
+  /**
+   * Register an aggregator with name and class.
+   *
+   * @param <A> Aggregator type
+   * @param name Name of the aggregator
+   * @param aggregatorClass Class of the aggregator
+   * @param persistent False iff aggregator should be reset at the end of
+   *                   every super step
+   * @return Aggregator
+   * @throws IllegalAccessException
+   * @throws InstantiationException
+   */
+  protected  <A extends Writable> AggregatorWrapper<A> registerAggregator(
+      String name, Class<? extends Aggregator<A>> aggregatorClass,
+      boolean persistent) throws InstantiationException,
+      IllegalAccessException {
+    if (getAggregator(name) != null) {
+      return null;
+    }
+    AggregatorWrapper<A> aggregator =
+        new AggregatorWrapper<A>(aggregatorClass, persistent);
+    AggregatorWrapper<Writable> writableAggregator =
+        (AggregatorWrapper<Writable>) aggregator;
+    aggregatorMap.put(name, writableAggregator);
+    return aggregator;
+  }
+
+  /**
+   * Register an aggregator with name and className.
+   *
+   * @param <A> Aggregator type
+   * @param name Name of the aggregator
+   * @param aggregatorClassName Name of the aggregator class
+   * @param persistent False iff aggregator should be reset at the end of
+   *                   every super step
+   * @return Aggregator
+   */
+  protected <A extends Writable> AggregatorWrapper<A> registerAggregator(
+      String name, String aggregatorClassName, boolean persistent) {
+    AggregatorWrapper<Writable> aggregatorWrapper = getAggregator(name);
+    if (aggregatorWrapper == null) {
+      try {
+        Class<? extends Aggregator<Writable>> aggregatorClass =
+            (Class<? extends Aggregator<Writable>>)
+                Class.forName(aggregatorClassName);
+        aggregatorWrapper =
+            registerAggregator(name, aggregatorClass, persistent);
+      } catch (ClassNotFoundException e) {
+        throw new IllegalStateException("Failed to create aggregator " +
+            name + " of class " + aggregatorClassName +
+            " with ClassNotFoundException", e);
+      } catch (InstantiationException e) {
+        throw new IllegalStateException("Failed to create aggregator " +
+            name + " of class " + aggregatorClassName +
+            " with InstantiationException", e);
+      } catch (IllegalAccessException e) {
+        throw new IllegalStateException("Failed to create aggregator " +
+            name + " of class " + aggregatorClassName +
+            " with IllegalAccessException", e);
+      }
+    }
+    return (AggregatorWrapper<A>) aggregatorWrapper;
+  }
+}

Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/BspService.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/BspService.java?rev=1393264&r1=1393263&r2=1393264&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/BspService.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/BspService.java Wed Oct  3 02:57:50 2012
@@ -49,8 +49,6 @@ import java.security.InvalidParameterExc
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
 
 /**
  * Zookeeper-based implementation of {@link CentralizedService}.
@@ -161,13 +159,6 @@ public abstract class BspService<I exten
   /** JSON superstep key */
   public static final String JSONOBJ_SUPERSTEP_KEY =
       "_superstepKey";
-  /** Aggregator name key */
-  public static final String AGGREGATOR_NAME_KEY = "_aggregatorNameKey";
-  /** Aggregator class name key */
-  public static final String AGGREGATOR_CLASS_NAME_KEY =
-      "_aggregatorClassNameKey";
-  /** Aggregator value key */
-  public static final String AGGREGATOR_VALUE_KEY = "_aggregatorValueKey";
   /** Suffix denotes a worker */
   public static final String WORKER_SUFFIX = "_worker";
   /** Suffix denotes a master */
@@ -266,9 +257,6 @@ public abstract class BspService<I exten
   private final FileSystem fs;
   /** Checkpoint frequency */
   private final int checkpointFrequency;
-  /** Map of aggregators */
-  private Map<String, AggregatorWrapper<Writable>> aggregatorMap =
-      new TreeMap<String, AggregatorWrapper<Writable>>();
 
   /**
    * Constructor.
@@ -903,71 +891,6 @@ public abstract class BspService<I exten
   }
 
   /**
-   * Register an aggregator with name.
-   *
-   * @param <A> Aggregator type
-   * @param name Name of the aggregator
-   * @param aggregatorClass Class of the aggregator
-   * @param persistent False iff aggregator should be reset at the end of
-   *                   every super step
-   * @return Aggregator
-   * @throws IllegalAccessException
-   * @throws InstantiationException
-   */
-  protected <A extends Writable> AggregatorWrapper<A> registerAggregator(
-      String name, Class<? extends Aggregator<A>> aggregatorClass,
-      boolean persistent) throws InstantiationException,
-      IllegalAccessException {
-    if (aggregatorMap.get(name) != null) {
-      return null;
-    }
-    AggregatorWrapper<A> aggregator =
-        new AggregatorWrapper<A>(aggregatorClass, persistent);
-    AggregatorWrapper<Writable> writableAggregator =
-        (AggregatorWrapper<Writable>) aggregator;
-    aggregatorMap.put(name, writableAggregator);
-    if (LOG.isInfoEnabled()) {
-      LOG.info("registerAggregator: registered " + name);
-    }
-    return aggregator;
-  }
-
-  /**
-   * Get aggregator by name.
-   *
-   * @param name Name of aggregator
-   * @return Aggregator or null when not registered
-   */
-  protected AggregatorWrapper<? extends Writable> getAggregator(String name) {
-    return aggregatorMap.get(name);
-  }
-
-  /**
-   * Get value of an aggregator.
-   *
-   * @param name Name of aggregator
-   * @param <A> Aggregated value
-   * @return Value of the aggregator
-   */
-  public <A extends Writable> A getAggregatedValue(String name) {
-    AggregatorWrapper<? extends Writable> aggregator = getAggregator(name);
-    if (aggregator == null) {
-      return null;
-    } else {
-      return (A) aggregator.getPreviousAggregatedValue();
-    }
-  }
-
-  /**
-   * Get the aggregator map.
-   *
-   * @return Map of aggregator names to aggregator
-   */
-  protected Map<String, AggregatorWrapper<Writable>> getAggregatorMap() {
-    return aggregatorMap;
-  }
-
-  /**
    * Register a BspEvent.  Ensure that it will be signaled
    * by catastrophic failure so that threads waiting on an event signal
    * will be unblocked.

Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java?rev=1393264&r1=1393263&r2=1393264&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java Wed Oct  3 02:57:50 2012
@@ -58,13 +58,11 @@ import org.json.JSONObject;
 
 import net.iharder.Base64;
 
-import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutput;
 import java.io.DataOutputStream;
 import java.io.IOException;
-import java.util.AbstractMap;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -72,16 +70,12 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.Set;
 import java.util.TreeSet;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
-import com.google.common.base.Function;
-import com.google.common.collect.Iterables;
-
 /**
  * ZooKeeper-based implementation of {@link CentralizedServiceMaster}.
  *
@@ -147,8 +141,8 @@ public class BspServiceMaster<I extends 
   /** All the partition stats from the last superstep */
   private final List<PartitionStats> allPartitionStatsList =
       new ArrayList<PartitionStats>();
-  /** Aggregator writer */
-  private AggregatorWriter aggregatorWriter;
+  /** Handler for aggregators */
+  private MasterAggregatorHandler aggregatorHandler;
   /** Master class */
   private MasterCompute masterCompute;
   /** Communication service */
@@ -581,6 +575,11 @@ public class BspServiceMaster<I extends 
     return splitList.size();
   }
 
+  @Override
+  public MasterAggregatorUsage getAggregatorUsage() {
+    return aggregatorHandler;
+  }
+
   /**
    * Read the finalized checkpoint file and associated metadata files for the
    * checkpoint.  Modifies the {@link PartitionOwner} objects to get the
@@ -615,37 +614,8 @@ public class BspServiceMaster<I extends 
       validMetadataPathList.add(new Path(metadataFilePath));
     }
 
-    // Set the merged aggregator data if it exists.
-    int aggregatorDataSize = finalizedStream.readInt();
-    if (aggregatorDataSize > 0) {
-      byte [] aggregatorZkData = new byte[aggregatorDataSize];
-      int actualDataRead =
-          finalizedStream.read(aggregatorZkData, 0, aggregatorDataSize);
-      if (actualDataRead != aggregatorDataSize) {
-        throw new RuntimeException(
-            "prepareCheckpointRestart: Only read " + actualDataRead +
-            " of " + aggregatorDataSize + " aggregator bytes from " +
-            finalizedCheckpointPath);
-      }
-      String mergedAggregatorPath =
-          getMergedAggregatorPath(getApplicationAttempt(), superstep - 1);
-      if (LOG.isInfoEnabled()) {
-        LOG.info("prepareCheckpointRestart: Reloading merged " +
-            "aggregator " + "data '" +
-            Arrays.toString(aggregatorZkData) +
-            "' to previous checkpoint in path " +
-            mergedAggregatorPath);
-      }
-      if (getZkExt().exists(mergedAggregatorPath, false) == null) {
-        getZkExt().createExt(mergedAggregatorPath,
-            aggregatorZkData,
-            Ids.OPEN_ACL_UNSAFE,
-            CreateMode.PERSISTENT,
-            true);
-      } else {
-        getZkExt().setData(mergedAggregatorPath, aggregatorZkData, -1);
-      }
-    }
+    aggregatorHandler.readFields(finalizedStream);
+    aggregatorHandler.finishSuperstep(superstep - 1, this);
     masterCompute.readFields(finalizedStream);
     finalizedStream.close();
 
@@ -763,14 +733,8 @@ public class BspServiceMaster<I extends 
               getTaskPartition() -
               currentMasterTaskPartitionCounter.getValue());
           masterCompute = getConfiguration().createMasterCompute();
-          aggregatorWriter = getConfiguration().createAggregatorWriter();
-          try {
-            aggregatorWriter.initialize(getContext(),
-                getApplicationAttempt());
-          } catch (IOException e) {
-            throw new IllegalStateException("becomeMaster: " +
-                "Couldn't initialize aggregatorWriter", e);
-          }
+          aggregatorHandler = new MasterAggregatorHandler(getConfiguration());
+          aggregatorHandler.initialize(this);
 
           if (getConfiguration().getUseNetty()) {
             commService = new NettyMasterClientServer(
@@ -870,165 +834,6 @@ public class BspServiceMaster<I extends 
   }
 
   /**
-   * Get the aggregator values for a particular superstep and aggregate them.
-   *
-   * @param superstep superstep to check
-   */
-  private void collectAndProcessAggregatorValues(long superstep) {
-    String workerFinishedPath =
-        getWorkerFinishedPath(getApplicationAttempt(), superstep);
-    List<String> hostnameIdPathList = null;
-    try {
-      hostnameIdPathList =
-          getZkExt().getChildrenExt(
-              workerFinishedPath, false, false, true);
-    } catch (KeeperException e) {
-      throw new IllegalStateException(
-          "collectAndProcessAggregatorValues: KeeperException", e);
-    } catch (InterruptedException e) {
-      throw new IllegalStateException(
-          "collectAndProcessAggregatorValues: InterruptedException", e);
-    }
-
-    for (String hostnameIdPath : hostnameIdPathList) {
-      JSONObject workerFinishedInfoObj = null;
-      byte[] aggregatorArray = null;
-      try {
-        byte [] zkData =
-            getZkExt().getData(hostnameIdPath, false, null);
-        workerFinishedInfoObj = new JSONObject(new String(zkData));
-      } catch (KeeperException e) {
-        throw new IllegalStateException(
-            "collectAndProcessAggregatorValues: KeeperException", e);
-      } catch (InterruptedException e) {
-        throw new IllegalStateException(
-            "collectAndProcessAggregatorValues: InterruptedException",
-            e);
-      } catch (JSONException e) {
-        throw new IllegalStateException(
-            "collectAndProcessAggregatorValues: JSONException", e);
-      }
-      try {
-        aggregatorArray = Base64.decode(workerFinishedInfoObj.getString(
-            JSONOBJ_AGGREGATOR_VALUE_ARRAY_KEY));
-      } catch (JSONException e) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("collectAndProcessAggregatorValues: " +
-              "No aggregators" + " for " + hostnameIdPath);
-        }
-        continue;
-      } catch (IOException e) {
-        throw new IllegalStateException(
-            "collectAndProcessAggregatorValues: IOException", e);
-      }
-
-      DataInputStream input =
-          new DataInputStream(new ByteArrayInputStream(aggregatorArray));
-      try {
-        while (input.available() > 0) {
-          String aggregatorName = input.readUTF();
-          AggregatorWrapper<Writable> aggregator =
-              getAggregatorMap().get(aggregatorName);
-          if (aggregator == null) {
-            throw new IllegalStateException(
-                "collectAndProcessAggregatorValues: " +
-                    "Master received aggregator which isn't registered: " +
-                    aggregatorName);
-          }
-          Writable aggregatorValue = aggregator.createInitialValue();
-          aggregatorValue.readFields(input);
-          aggregator.aggregateCurrent(aggregatorValue);
-        }
-      } catch (IOException e) {
-        throw new IllegalStateException(
-            "collectAndProcessAggregatorValues: " +
-                "IOException when reading aggregator data", e);
-      }
-    }
-
-    if (LOG.isInfoEnabled()) {
-      LOG.info("collectAndProcessAggregatorValues: Processed aggregators");
-    }
-
-    // prepare aggregators for master compute
-    for (AggregatorWrapper<Writable> aggregator :
-        getAggregatorMap().values()) {
-      if (aggregator.isPersistent()) {
-        aggregator.aggregateCurrent(aggregator.getPreviousAggregatedValue());
-      }
-      aggregator.setPreviousAggregatedValue(
-          aggregator.getCurrentAggregatedValue());
-      aggregator.resetCurrentAggregator();
-    }
-  }
-
-  /**
-   * Save the supplied aggregator values.
-   *
-   * @param superstep superstep for which to save values
-   */
-  private void saveAggregatorValues(long superstep) {
-    Map<String, AggregatorWrapper<Writable>> aggregatorMap =
-        getAggregatorMap();
-
-    for (AggregatorWrapper<Writable> aggregator : aggregatorMap.values()) {
-      if (aggregator.isChanged()) {
-        // if master compute changed the value, use the one he chose
-        aggregator.setPreviousAggregatedValue(
-            aggregator.getCurrentAggregatedValue());
-        // reset aggregator for the next superstep
-        aggregator.resetCurrentAggregator();
-      }
-    }
-
-    if (aggregatorMap.size() > 0) {
-      String mergedAggregatorPath =
-          getMergedAggregatorPath(getApplicationAttempt(), superstep);
-
-      ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
-      DataOutput output = new DataOutputStream(outputStream);
-      try {
-        output.writeInt(aggregatorMap.size());
-      } catch (IOException e) {
-        e.printStackTrace();
-      }
-      for (Map.Entry<String, AggregatorWrapper<Writable>> entry :
-          aggregatorMap.entrySet()) {
-        try {
-          output.writeUTF(entry.getKey());
-          output.writeUTF(entry.getValue().getAggregatorClass().getName());
-          entry.getValue().getPreviousAggregatedValue().write(output);
-        } catch (IOException e) {
-          throw new IllegalStateException("saveAggregatorValues: " +
-              "IllegalStateException", e);
-        }
-      }
-
-      try {
-        getZkExt().createExt(mergedAggregatorPath,
-            outputStream.toByteArray(),
-            Ids.OPEN_ACL_UNSAFE,
-            CreateMode.PERSISTENT,
-            true);
-      } catch (KeeperException.NodeExistsException e) {
-        LOG.warn("saveAggregatorValues: " +
-            mergedAggregatorPath + " already exists!");
-      } catch (KeeperException e) {
-        throw new IllegalStateException(
-            "saveAggregatorValues: KeeperException", e);
-      } catch (InterruptedException e) {
-        throw new IllegalStateException(
-            "saveAggregatorValues: IllegalStateException",
-            e);
-      }
-      if (LOG.isInfoEnabled()) {
-        LOG.info("saveAggregatorValues: Finished loading " +
-            mergedAggregatorPath);
-      }
-    }
-  }
-
-  /**
    * Finalize the checkpoint file prefixes by taking the chosen workers and
    * writing them to a finalized file.  Also write out the master
    * aggregated aggregator array from the previous superstep.
@@ -1056,7 +861,7 @@ public class BspServiceMaster<I extends 
     // <global statistics>
     // <number of files>
     // <used file prefix 0><used file prefix 1>...
-    // <aggregator data length><aggregators as a serialized JSON byte array>
+    // <aggregator data>
     // <masterCompute data>
     FSDataOutputStream finalizedOutputStream =
         getFs().create(finalizedCheckpointPath);
@@ -1073,16 +878,7 @@ public class BspServiceMaster<I extends 
               chosenWorkerInfo.getHostnameId();
       finalizedOutputStream.writeUTF(chosenWorkerInfoPrefix);
     }
-    String mergedAggregatorPath =
-        getMergedAggregatorPath(getApplicationAttempt(), superstep - 1);
-    if (getZkExt().exists(mergedAggregatorPath, false) != null) {
-      byte [] aggregatorZkData =
-          getZkExt().getData(mergedAggregatorPath, false, null);
-      finalizedOutputStream.writeInt(aggregatorZkData.length);
-      finalizedOutputStream.write(aggregatorZkData);
-    } else {
-      finalizedOutputStream.writeInt(0);
-    }
+    aggregatorHandler.write(finalizedOutputStream);
     masterCompute.write(finalizedOutputStream);
     finalizedOutputStream.close();
     lastCheckpointedSuperstep = superstep;
@@ -1512,9 +1308,9 @@ public class BspServiceMaster<I extends 
 
     // Collect aggregator values, then run the master.compute() and
     // finally save the aggregator values
-    collectAndProcessAggregatorValues(getSuperstep());
+    aggregatorHandler.prepareSuperstep(getSuperstep(), this);
     runMasterCompute(getSuperstep());
-    saveAggregatorValues(getSuperstep());
+    aggregatorHandler.finishSuperstep(getSuperstep(), this);
 
     // If the master is halted or all the vertices voted to halt and there
     // are no more messages in the system, stop the computation
@@ -1546,28 +1342,7 @@ public class BspServiceMaster<I extends 
     } else {
       superstepState = SuperstepState.THIS_SUPERSTEP_DONE;
     }
-    try {
-      Iterable<Map.Entry<String, Writable>> iter =
-          Iterables.transform(
-              getAggregatorMap().entrySet(),
-              new Function<Entry<String, AggregatorWrapper<Writable>>,
-                  Entry<String, Writable>>() {
-                @Override
-                public Entry<String, Writable> apply(
-                    Entry<String, AggregatorWrapper<Writable>> entry) {
-                  return new AbstractMap.SimpleEntry<String,
-                      Writable>(entry.getKey(),
-                      entry.getValue().getPreviousAggregatedValue());
-                }
-              });
-      aggregatorWriter.writeAggregator(iter,
-          (superstepState == SuperstepState.ALL_SUPERSTEPS_DONE) ?
-              AggregatorWriter.LAST_SUPERSTEP : getSuperstep());
-    } catch (IOException e) {
-      throw new IllegalStateException(
-          "coordinateSuperstep: IOException while " +
-              "writing aggregators data", e);
-    }
+    aggregatorHandler.writeAggregators(getSuperstep(), superstepState);
 
     return superstepState;
   }
@@ -1736,7 +1511,7 @@ public class BspServiceMaster<I extends 
               " succeeded ");
         }
       }
-      aggregatorWriter.close();
+      aggregatorHandler.close();
 
       if (getConfiguration().getUseNetty()) {
         commService.closeConnections();
@@ -1836,31 +1611,6 @@ public class BspServiceMaster<I extends 
     return foundEvent;
   }
 
-  @Override
-  public <A extends Writable> boolean registerAggregator(String name,
-      Class<? extends Aggregator<A>> aggregatorClass) throws
-      InstantiationException, IllegalAccessException {
-    return registerAggregator(name, aggregatorClass, false) != null;
-  }
-
-  @Override
-  public <A extends Writable> boolean registerPersistentAggregator(String name,
-      Class<? extends Aggregator<A>> aggregatorClass) throws
-      InstantiationException, IllegalAccessException {
-    return registerAggregator(name, aggregatorClass, true) != null;
-  }
-
-  @Override
-  public <A extends Writable> void setAggregatedValue(String name, A value) {
-    AggregatorWrapper<? extends Writable> aggregator = getAggregator(name);
-    if (aggregator == null) {
-      throw new IllegalStateException(
-          "setAggregatedValue: Tried to set value of aggregator which wasn't" +
-              " registered " + name);
-    }
-    ((AggregatorWrapper<A>) aggregator).setCurrentAggregatedValue(value);
-  }
-
   /**
    * Set values of counters to match the ones from {@link GlobalStats}
    *

Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java?rev=1393264&r1=1393263&r2=1393264&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java Wed Oct  3 02:57:50 2012
@@ -59,7 +59,6 @@ import net.iharder.Base64;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
-import java.io.DataInput;
 import java.io.DataInputStream;
 import java.io.DataOutput;
 import java.io.DataOutputStream;
@@ -123,6 +122,8 @@ public class BspServiceWorker<I extends 
    * Partition store for worker (only used by the Hadoop RPC implementation).
    */
   private final PartitionStore<I, V, E, M> workerPartitionStore;
+  /** Handler for aggregators */
+  private final WorkerAggregatorHandler aggregatorHandler;
 
   /**
    * Constructor for setting up the worker.
@@ -185,6 +186,8 @@ public class BspServiceWorker<I extends 
           new SimplePartitionStore<I, V, E, M>(getConfiguration(),
               getContext());
     }
+
+    aggregatorHandler = new WorkerAggregatorHandler();
   }
 
   public WorkerContext getWorkerContext() {
@@ -700,134 +703,6 @@ public class BspServiceWorker<I extends 
     finishSuperstep(partitionStatsList);
   }
 
-  @Override
-  public <A extends Writable> void aggregate(String name, A value) {
-    AggregatorWrapper<? extends Writable> aggregator = getAggregator(name);
-    if (aggregator != null) {
-      ((AggregatorWrapper<A>) aggregator).aggregateCurrent(value);
-    } else {
-      throw new IllegalStateException("aggregate: Tried to aggregate value " +
-          "to unregistered aggregator " + name);
-    }
-  }
-
-  /**
-   *  Marshal the aggregator values of the worker to a byte array that will
-   *  later be aggregated by master.
-   *
-   * @param superstep Superstep to marshall on
-   * @return Byte array of the aggreagtor values
-   */
-  private byte[] marshalAggregatorValues(long superstep) {
-    if (superstep == INPUT_SUPERSTEP) {
-      return new byte[0];
-    }
-
-    ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
-    DataOutputStream output = new DataOutputStream(outputStream);
-    for (Entry<String, AggregatorWrapper<Writable>> entry :
-        getAggregatorMap().entrySet()) {
-      if (entry.getValue().isChanged()) {
-        try {
-          output.writeUTF(entry.getKey());
-          entry.getValue().getCurrentAggregatedValue().write(output);
-        } catch (IOException e) {
-          throw new IllegalStateException("Failed to marshall aggregator " +
-              "with IOException " + entry.getKey(), e);
-        }
-      }
-    }
-
-    if (LOG.isInfoEnabled()) {
-      LOG.info(
-          "marshalAggregatorValues: Finished assembling aggregator values");
-    }
-    return outputStream.toByteArray();
-  }
-
-  /**
-   * Get values of aggregators aggregated by master in previous superstep.
-   *
-   * @param superstep Superstep to get the aggregated values from
-   */
-  private void getAggregatorValues(long superstep) {
-    // prepare aggregators for reading and next superstep
-    for (AggregatorWrapper<Writable> aggregator :
-        getAggregatorMap().values()) {
-      aggregator.setPreviousAggregatedValue(aggregator.createInitialValue());
-      aggregator.resetCurrentAggregator();
-    }
-    String mergedAggregatorPath =
-        getMergedAggregatorPath(getApplicationAttempt(), superstep - 1);
-
-    byte[] aggregatorArray = null;
-    try {
-      aggregatorArray = getZkExt().getData(mergedAggregatorPath, false, null);
-    } catch (KeeperException.NoNodeException e) {
-      LOG.info("getAggregatorValues: no aggregators in " +
-          mergedAggregatorPath + " on superstep " + superstep);
-      return;
-    } catch (KeeperException e) {
-      throw new IllegalStateException("Failed to get data for " +
-          mergedAggregatorPath + " with KeeperException", e);
-    } catch (InterruptedException e) {
-      throw new IllegalStateException("Failed to get data for " +
-          mergedAggregatorPath + " with InterruptedException", e);
-    }
-
-    DataInput input =
-        new DataInputStream(new ByteArrayInputStream(aggregatorArray));
-    int numAggregators = 0;
-
-    try {
-      numAggregators = input.readInt();
-    } catch (IOException e) {
-      throw new IllegalStateException("getAggregatorValues: " +
-          "Failed to decode data", e);
-    }
-
-    for (int i = 0; i < numAggregators; i++) {
-      try {
-        String aggregatorName = input.readUTF();
-        String aggregatorClassName = input.readUTF();
-        AggregatorWrapper<Writable> aggregatorWrapper =
-            getAggregatorMap().get(aggregatorName);
-        if (aggregatorWrapper == null) {
-          try {
-            Class<? extends Aggregator<Writable>> aggregatorClass =
-                (Class<? extends Aggregator<Writable>>)
-                    Class.forName(aggregatorClassName);
-            aggregatorWrapper =
-                registerAggregator(aggregatorName, aggregatorClass, false);
-          } catch (ClassNotFoundException e) {
-            throw new IllegalStateException("Failed to create aggregator " +
-                aggregatorName + " of class " + aggregatorClassName +
-                " with ClassNotFoundException", e);
-          } catch (InstantiationException e) {
-            throw new IllegalStateException("Failed to create aggregator " +
-                aggregatorName + " of class " + aggregatorClassName +
-                " with InstantiationException", e);
-          } catch (IllegalAccessException e) {
-            throw new IllegalStateException("Failed to create aggregator " +
-                aggregatorName + " of class " + aggregatorClassName +
-                " with IllegalAccessException", e);
-          }
-        }
-        Writable aggregatorValue = aggregatorWrapper.createInitialValue();
-        aggregatorValue.readFields(input);
-        aggregatorWrapper.setPreviousAggregatedValue(aggregatorValue);
-      } catch (IOException e) {
-        throw new IllegalStateException(
-            "Failed to decode data for index " + i, e);
-      }
-    }
-
-    if (LOG.isInfoEnabled()) {
-      LOG.info("getAggregatorValues: Finished loading " +
-          mergedAggregatorPath);
-    }
-  }
-
   /**
    * Register the health of this worker for a given superstep
    *
@@ -967,7 +842,7 @@ public class BspServiceWorker<I extends 
     }
 
     if (getSuperstep() != INPUT_SUPERSTEP) {
-      getAggregatorValues(getSuperstep());
+      aggregatorHandler.prepareSuperstep(getSuperstep(), this);
     }
     getContext().setStatus("startSuperstep: " +
         getGraphMapper().getMapFunctions().toString() +
@@ -1017,7 +892,7 @@ public class BspServiceWorker<I extends 
     }
 
     byte[] aggregatorArray =
-        marshalAggregatorValues(getSuperstep());
+        aggregatorHandler.finishSuperstep(getSuperstep());
     Collection<PartitionStats> finalizedPartitionStats =
         workerGraphPartitioner.finalizePartitionStats(
             partitionStatsList, getPartitionStore());
@@ -1618,4 +1493,9 @@ public class BspServiceWorker<I extends 
   public ServerData<I, V, E, M> getServerData() {
     return commService.getServerData();
   }
+
+  @Override
+  public WorkerAggregatorUsage getAggregatorUsage() {
+    return aggregatorHandler;
+  }
 }

Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java?rev=1393264&r1=1393263&r2=1393264&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java Wed Oct  3 02:57:50 2012
@@ -119,7 +119,7 @@ public class GraphMapper<I extends Writa
    * @return Worker aggregator usage interface
    */
   public final WorkerAggregatorUsage getWorkerAggregatorUsage() {
-    return serviceWorker;
+    return serviceWorker.getAggregatorUsage();
   }
 
   /**
@@ -128,7 +128,7 @@ public class GraphMapper<I extends Writa
    * @return Master aggregator usage interface
    */
   public final MasterAggregatorUsage getMasterAggregatorUsage() {
-    return serviceMaster;
+    return serviceMaster.getAggregatorUsage();
   }
 
   public final WorkerContext getWorkerContext() {

Added: giraph/trunk/src/main/java/org/apache/giraph/graph/MasterAggregatorHandler.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/MasterAggregatorHandler.java?rev=1393264&view=auto
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/MasterAggregatorHandler.java (added)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/MasterAggregatorHandler.java Wed Oct  3 02:57:50 2012
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph;
+
+import org.apache.giraph.bsp.SuperstepState;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooDefs;
+import org.json.JSONException;
+import org.json.JSONObject;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Iterables;
+
+import net.iharder.Base64;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.AbstractMap;
+import java.util.List;
+import java.util.Map;
+
+/** Master implementation of {@link AggregatorHandler} */
+public class MasterAggregatorHandler extends AggregatorHandler implements
+    MasterAggregatorUsage, Writable {
+  /** Class logger */
+  private static final Logger LOG =
+      Logger.getLogger(MasterAggregatorHandler.class);
+  /** Aggregator writer */
+  private final AggregatorWriter aggregatorWriter;
+
+  /**
+   * @param config Hadoop configuration
+   */
+  public MasterAggregatorHandler(Configuration config) {
+    aggregatorWriter = BspUtils.createAggregatorWriter(config);
+  }
+
+  @Override
+  public <A extends Writable> void setAggregatedValue(String name, A value) {
+    AggregatorWrapper<? extends Writable> aggregator = getAggregator(name);
+    if (aggregator == null) {
+      throw new IllegalStateException(
+          "setAggregatedValue: Tried to set value of aggregator which wasn't" +
+              " registered " + name);
+    }
+    ((AggregatorWrapper<A>) aggregator).setCurrentAggregatedValue(value);
+  }
+
+  @Override
+  public <A extends Writable> boolean registerAggregator(String name,
+      Class<? extends Aggregator<A>> aggregatorClass) throws
+      InstantiationException, IllegalAccessException {
+    return registerAggregator(name, aggregatorClass, false) != null;
+  }
+
+  @Override
+  public <A extends Writable> boolean registerPersistentAggregator(String name,
+      Class<? extends Aggregator<A>> aggregatorClass) throws
+      InstantiationException, IllegalAccessException {
+    return registerAggregator(name, aggregatorClass, true) != null;
+  }
+
+  /**
+   * Get aggregator values supplied by workers for a particular superstep and
+   * aggregate them
+   *
+   * @param superstep Superstep which we are preparing for
+   * @param service BspService to get zookeeper info from
+   */
+  public void prepareSuperstep(long superstep, BspService service) {
+    String workerFinishedPath =
+        service.getWorkerFinishedPath(
+            service.getApplicationAttempt(), superstep);
+    List<String> hostnameIdPathList = null;
+    try {
+      hostnameIdPathList =
+          service.getZkExt().getChildrenExt(
+              workerFinishedPath, false, false, true);
+    } catch (KeeperException e) {
+      throw new IllegalStateException(
+          "collectAndProcessAggregatorValues: KeeperException", e);
+    } catch (InterruptedException e) {
+      throw new IllegalStateException(
+          "collectAndProcessAggregatorValues: InterruptedException", e);
+    }
+
+    for (String hostnameIdPath : hostnameIdPathList) {
+      JSONObject workerFinishedInfoObj = null;
+      byte[] aggregatorArray = null;
+      try {
+        byte[] zkData =
+            service.getZkExt().getData(hostnameIdPath, false, null);
+        workerFinishedInfoObj = new JSONObject(new String(zkData));
+      } catch (KeeperException e) {
+        throw new IllegalStateException(
+            "collectAndProcessAggregatorValues: KeeperException", e);
+      } catch (InterruptedException e) {
+        throw new IllegalStateException(
+            "collectAndProcessAggregatorValues: InterruptedException",
+            e);
+      } catch (JSONException e) {
+        throw new IllegalStateException(
+            "collectAndProcessAggregatorValues: JSONException", e);
+      }
+      try {
+        aggregatorArray = Base64.decode(workerFinishedInfoObj.getString(
+            service.JSONOBJ_AGGREGATOR_VALUE_ARRAY_KEY));
+      } catch (JSONException e) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("collectAndProcessAggregatorValues: " +
+              "No aggregators" + " for " + hostnameIdPath);
+        }
+        continue;
+      } catch (IOException e) {
+        throw new IllegalStateException(
+            "collectAndProcessAggregatorValues: IOException", e);
+      }
+
+      DataInputStream input =
+          new DataInputStream(new ByteArrayInputStream(aggregatorArray));
+      try {
+        while (input.available() > 0) {
+          String aggregatorName = input.readUTF();
+          AggregatorWrapper<Writable> aggregator =
+              getAggregatorMap().get(aggregatorName);
+          if (aggregator == null) {
+            throw new IllegalStateException(
+                "collectAndProcessAggregatorValues: " +
+                    "Master received aggregator which isn't registered: " +
+                    aggregatorName);
+          }
+          Writable aggregatorValue = aggregator.createInitialValue();
+          aggregatorValue.readFields(input);
+          aggregator.aggregateCurrent(aggregatorValue);
+        }
+      } catch (IOException e) {
+        throw new IllegalStateException(
+            "collectAndProcessAggregatorValues: " +
+                "IOException when reading aggregator data", e);
+      }
+    }
+
+    if (LOG.isInfoEnabled()) {
+      LOG.info("collectAndProcessAggregatorValues: Processed aggregators");
+    }
+
+    // prepare aggregators for master compute
+    for (AggregatorWrapper<Writable> aggregator :
+        getAggregatorMap().values()) {
+      if (aggregator.isPersistent()) {
+        aggregator.aggregateCurrent(aggregator.getPreviousAggregatedValue());
+      }
+      aggregator.setPreviousAggregatedValue(
+          aggregator.getCurrentAggregatedValue());
+      aggregator.resetCurrentAggregator();
+    }
+  }
+
+  /**
+   * Save the supplied aggregator values.
+   *
+   * @param superstep Superstep which we are finishing.
+   * @param service BspService to get zookeeper info from
+   */
+  public void finishSuperstep(long superstep, BspService service) {
+    Map<String, AggregatorWrapper<Writable>> aggregatorMap =
+        getAggregatorMap();
+
+    for (AggregatorWrapper<Writable> aggregator : aggregatorMap.values()) {
+      if (aggregator.isChanged()) {
+        // if master compute changed the value, use the one he chose
+        aggregator.setPreviousAggregatedValue(
+            aggregator.getCurrentAggregatedValue());
+        // reset aggregator for the next superstep
+        aggregator.resetCurrentAggregator();
+      }
+    }
+
+    if (aggregatorMap.size() > 0) {
+      String mergedAggregatorPath =
+          service.getMergedAggregatorPath(
+              service.getApplicationAttempt(),
+              superstep);
+
+      ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+      DataOutput output = new DataOutputStream(outputStream);
+      try {
+        output.writeInt(aggregatorMap.size());
+      } catch (IOException e) {
+        e.printStackTrace();
+      }
+      for (Map.Entry<String, AggregatorWrapper<Writable>> entry :
+          aggregatorMap.entrySet()) {
+        try {
+          output.writeUTF(entry.getKey());
+          output.writeUTF(entry.getValue().getAggregatorClass().getName());
+          entry.getValue().getPreviousAggregatedValue().write(output);
+        } catch (IOException e) {
+          throw new IllegalStateException("saveAggregatorValues: " +
+              "IllegalStateException", e);
+        }
+      }
+
+      try {
+        service.getZkExt().createExt(mergedAggregatorPath,
+            outputStream.toByteArray(),
+            ZooDefs.Ids.OPEN_ACL_UNSAFE,
+            CreateMode.PERSISTENT,
+            true);
+      } catch (KeeperException.NodeExistsException e) {
+        LOG.warn("saveAggregatorValues: " +
+            mergedAggregatorPath + " already exists!");
+      } catch (KeeperException e) {
+        throw new IllegalStateException(
+            "saveAggregatorValues: KeeperException", e);
+      } catch (InterruptedException e) {
+        throw new IllegalStateException(
+            "saveAggregatorValues: IllegalStateException",
+            e);
+      }
+      if (LOG.isInfoEnabled()) {
+        LOG.info("saveAggregatorValues: Finished loading " +
+            mergedAggregatorPath);
+      }
+    }
+  }
+
+  /**
+   * Write aggregators to {@link AggregatorWriter}
+   *
+   * @param superstep Superstep which just finished
+   * @param superstepState State of the superstep which just finished
+   */
+  public void writeAggregators(long superstep,
+      SuperstepState superstepState) {
+    try {
+      Iterable<Map.Entry<String, Writable>> iter =
+          Iterables.transform(
+              getAggregatorMap().entrySet(),
+              new Function<Map.Entry<String, AggregatorWrapper<Writable>>,
+                  Map.Entry<String, Writable>>() {
+                @Override
+                public Map.Entry<String, Writable> apply(
+                    Map.Entry<String, AggregatorWrapper<Writable>> entry) {
+                  return new AbstractMap.SimpleEntry<String,
+                      Writable>(entry.getKey(),
+                      entry.getValue().getPreviousAggregatedValue());
+                }
+              });
+      aggregatorWriter.writeAggregator(iter,
+          (superstepState == SuperstepState.ALL_SUPERSTEPS_DONE) ?
+              AggregatorWriter.LAST_SUPERSTEP : superstep);
+    } catch (IOException e) {
+      throw new IllegalStateException(
+          "coordinateSuperstep: IOException while " +
+              "writing aggregators data", e);
+    }
+  }
+
+  /**
+   * Initialize {@link AggregatorWriter}
+   *
+   * @param service BspService
+   */
+  public void initialize(BspService service) {
+    try {
+      aggregatorWriter.initialize(service.getContext(),
+          service.getApplicationAttempt());
+    } catch (IOException e) {
+      throw new IllegalStateException("MasterAggregatorHandler: " +
+          "Couldn't initialize aggregatorWriter", e);
+    }
+  }
+
+  /**
+   * Close {@link AggregatorWriter}
+   *
+   * @throws IOException
+   */
+  public void close() throws IOException {
+    aggregatorWriter.close();
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    Map<String, AggregatorWrapper<Writable>> aggregatorMap =
+        getAggregatorMap();
+    out.writeInt(aggregatorMap.size());
+    for (Map.Entry<String, AggregatorWrapper<Writable>> entry :
+        aggregatorMap.entrySet()) {
+      out.writeUTF(entry.getKey());
+      out.writeUTF(entry.getValue().getAggregatorClass().getName());
+      out.writeBoolean(entry.getValue().isPersistent());
+      entry.getValue().getPreviousAggregatedValue().write(out);
+    }
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    Map<String, AggregatorWrapper<Writable>> aggregatorMap =
+        getAggregatorMap();
+    aggregatorMap.clear();
+    int numAggregators = in.readInt();
+    for (int i = 0; i < numAggregators; i++) {
+      String aggregatorName = in.readUTF();
+      String aggregatorClassName = in.readUTF();
+      boolean isPersistent = in.readBoolean();
+      AggregatorWrapper<Writable> aggregator =
+          registerAggregator(aggregatorName, aggregatorClassName,
+              isPersistent);
+      Writable value = aggregator.createInitialValue();
+      value.readFields(in);
+      aggregator.setPreviousAggregatedValue(value);
+    }
+  }
+}

Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/Vertex.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/Vertex.java?rev=1393264&r1=1393263&r2=1393264&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/Vertex.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/Vertex.java Wed Oct  3 02:57:50 2012
@@ -295,7 +295,7 @@ public abstract class Vertex<I extends W
    *
    * @param graphState Graph state for all workers
    */
-  void setGraphState(GraphState<I, V, E, M> graphState) {
+  public void setGraphState(GraphState<I, V, E, M> graphState) {
     this.graphState = graphState;
   }
 

Added: giraph/trunk/src/main/java/org/apache/giraph/graph/WorkerAggregatorHandler.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/WorkerAggregatorHandler.java?rev=1393264&view=auto
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/WorkerAggregatorHandler.java (added)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/WorkerAggregatorHandler.java Wed Oct  3 02:57:50 2012
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.KeeperException;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * Worker implementation of {@link AggregatorHandler}
+ */
+public class WorkerAggregatorHandler extends AggregatorHandler implements
+    WorkerAggregatorUsage {
+  /** Class logger */
+  private static final Logger LOG =
+      Logger.getLogger(WorkerAggregatorHandler.class);
+
+  @Override
+  public <A extends Writable> void aggregate(String name, A value) {
+    AggregatorWrapper<? extends Writable> aggregator = getAggregator(name);
+    if (aggregator != null) {
+      ((AggregatorWrapper<A>) aggregator).aggregateCurrent(value);
+    } else {
+      throw new IllegalStateException("aggregate: Tried to aggregate value " +
+          "to unregistered aggregator " + name);
+    }
+  }
+
+  /**
+   * Get aggregator values aggregated by master in previous superstep
+   *
+   * @param superstep Superstep which we are preparing for
+   * @param service BspService to get zookeeper info from
+   */
+  public void prepareSuperstep(long superstep, BspService service) {
+    // prepare aggregators for reading and next superstep
+    for (AggregatorWrapper<Writable> aggregator :
+        getAggregatorMap().values()) {
+      aggregator.setPreviousAggregatedValue(aggregator.createInitialValue());
+      aggregator.resetCurrentAggregator();
+    }
+    String mergedAggregatorPath =
+        service.getMergedAggregatorPath(service.getApplicationAttempt(),
+            superstep - 1);
+
+    byte[] aggregatorArray;
+    try {
+      aggregatorArray =
+          service.getZkExt().getData(mergedAggregatorPath, false, null);
+    } catch (KeeperException.NoNodeException e) {
+      LOG.info("getAggregatorValues: no aggregators in " +
+          mergedAggregatorPath + " on superstep " + superstep);
+      return;
+    } catch (KeeperException e) {
+      throw new IllegalStateException("Failed to get data for " +
+          mergedAggregatorPath + " with KeeperException", e);
+    } catch (InterruptedException e) {
+      throw new IllegalStateException("Failed to get data for " +
+          mergedAggregatorPath + " with InterruptedException", e);
+    }
+
+    DataInput input =
+        new DataInputStream(new ByteArrayInputStream(aggregatorArray));
+    int numAggregators = 0;
+
+    try {
+      numAggregators = input.readInt();
+    } catch (IOException e) {
+      throw new IllegalStateException("getAggregatorValues: " +
+          "Failed to decode data", e);
+    }
+
+    for (int i = 0; i < numAggregators; i++) {
+      try {
+        String aggregatorName = input.readUTF();
+        String aggregatorClassName = input.readUTF();
+        AggregatorWrapper<Writable> aggregator =
+            registerAggregator(aggregatorName, aggregatorClassName, false);
+        Writable aggregatorValue = aggregator.createInitialValue();
+        aggregatorValue.readFields(input);
+        aggregator.setPreviousAggregatedValue(aggregatorValue);
+      } catch (IOException e) {
+        throw new IllegalStateException(
+            "Failed to decode data for index " + i, e);
+      }
+    }
+
+    if (LOG.isInfoEnabled()) {
+      LOG.info("getAggregatorValues: Finished loading " +
+          mergedAggregatorPath);
+    }
+  }
+
+  /**
+   * Put aggregator values of the worker to a byte array that will later be
+   * aggregated by master.
+   *
+   * @param superstep Superstep which we are finishing.
+   * @return Byte array of the aggreagtor values
+   */
+  public byte[] finishSuperstep(long superstep) {
+    if (superstep == BspService.INPUT_SUPERSTEP) {
+      return new byte[0];
+    }
+
+    ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+    DataOutputStream output = new DataOutputStream(outputStream);
+    for (Map.Entry<String, AggregatorWrapper<Writable>> entry :
+        getAggregatorMap().entrySet()) {
+      if (entry.getValue().isChanged()) {
+        try {
+          output.writeUTF(entry.getKey());
+          entry.getValue().getCurrentAggregatedValue().write(output);
+        } catch (IOException e) {
+          throw new IllegalStateException("Failed to marshall aggregator " +
+              "with IOException " + entry.getKey(), e);
+        }
+      }
+    }
+
+    if (LOG.isInfoEnabled()) {
+      LOG.info(
+          "marshalAggregatorValues: Finished assembling aggregator values");
+    }
+    return outputStream.toByteArray();
+  }
+}

Added: giraph/trunk/src/test/java/org/apache/giraph/graph/TestAggregatorsHandling.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/test/java/org/apache/giraph/graph/TestAggregatorsHandling.java?rev=1393264&view=auto
==============================================================================
--- giraph/trunk/src/test/java/org/apache/giraph/graph/TestAggregatorsHandling.java (added)
+++ giraph/trunk/src/test/java/org/apache/giraph/graph/TestAggregatorsHandling.java Wed Oct  3 02:57:50 2012
@@ -0,0 +1,162 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.giraph.graph;
+
+import org.apache.giraph.BspCase;
+import org.apache.giraph.GiraphConfiguration;
+import org.apache.giraph.aggregators.DoubleOverwriteAggregator;
+import org.apache.giraph.aggregators.LongSumAggregator;
+import org.apache.giraph.examples.AggregatorsTestVertex;
+import org.apache.giraph.examples.SimpleCheckpointVertex;
+import org.apache.giraph.examples.SimplePageRankVertex;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+/** Tests if aggregators are handled on a proper way */
+public class TestAggregatorsHandling extends BspCase {
+
+  public TestAggregatorsHandling() {
+    super(TestAggregatorsHandling.class.getName());
+  }
+
+  /** Tests if aggregators are handled on a proper way during supersteps */
+  @Test
+  public void testAggregatorsHandling() throws IOException,
+      ClassNotFoundException, InterruptedException {
+    GiraphJob job = prepareJob(getCallingMethodName(),
+        AggregatorsTestVertex.class,
+        SimplePageRankVertex.SimplePageRankVertexInputFormat.class);
+    job.getConfiguration().setMasterComputeClass(
+        AggregatorsTestVertex.AggregatorsTestMasterCompute.class);
+    assertTrue(job.run(true));
+  }
+
+  /** Test if aggregators serialization captures everything */
+  @Test
+  public void testMasterAggregatorsSerialization() throws
+      IllegalAccessException, InstantiationException, IOException {
+    MasterAggregatorHandler handler =
+        new MasterAggregatorHandler(new Configuration());
+
+    String regularAggName = "regular";
+    LongWritable regularValue = new LongWritable(5);
+    handler.registerAggregator(regularAggName, LongSumAggregator.class);
+    handler.setAggregatedValue(regularAggName, regularValue);
+
+    String persistentAggName = "persistent";
+    DoubleWritable persistentValue = new DoubleWritable(10.5);
+    handler.registerPersistentAggregator(persistentAggName,
+        DoubleOverwriteAggregator.class);
+    handler.setAggregatedValue(persistentAggName, persistentValue);
+
+    for (AggregatorWrapper<Writable> aggregator :
+        handler.getAggregatorMap().values()) {
+      aggregator.setPreviousAggregatedValue(
+          aggregator.getCurrentAggregatedValue());
+    }
+
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
+    handler.write(new DataOutputStream(out));
+
+    MasterAggregatorHandler restartedHandler =
+        new MasterAggregatorHandler(new Configuration());
+    restartedHandler.readFields(
+        new DataInputStream(new ByteArrayInputStream(out.toByteArray())));
+
+    assertEquals(2, restartedHandler.getAggregatorMap().size());
+
+    AggregatorWrapper<Writable> regularAgg =
+        restartedHandler.getAggregatorMap().get(regularAggName);
+    assertTrue(
+        regularAgg.getAggregatorClass().equals(LongSumAggregator.class));
+    assertEquals(regularValue, regularAgg.getPreviousAggregatedValue());
+    assertEquals(regularValue,
+        restartedHandler.<LongWritable>getAggregatedValue(regularAggName));
+    assertFalse(regularAgg.isPersistent());
+
+    AggregatorWrapper<Writable> persistentAgg =
+        restartedHandler.getAggregatorMap().get(persistentAggName);
+    assertTrue(persistentAgg.getAggregatorClass().equals
+        (DoubleOverwriteAggregator.class));
+    assertEquals(persistentValue, persistentAgg.getPreviousAggregatedValue());
+    assertEquals(persistentValue,
+        restartedHandler.<LongWritable>getAggregatedValue(persistentAggName));
+    assertTrue(persistentAgg.isPersistent());
+  }
+
+  /**
+   * Test if aggregators are are handled properly when restarting from a
+   * checkpoint
+   */
+  @Test
+  public void testAggregatorsCheckpointing() throws ClassNotFoundException,
+      IOException, InterruptedException {
+    Path checkpointsDir = getTempPath("checkPointsForTesting");
+    Path outputPath = getTempPath(getCallingMethodName());
+    GiraphJob job = prepareJob(getCallingMethodName(),
+        AggregatorsTestVertex.class,
+        null,
+        AggregatorsTestVertex.AggregatorsTestMasterCompute.class,
+        SimplePageRankVertex.SimplePageRankVertexInputFormat.class,
+        null,
+        outputPath);
+
+    job.getConfiguration().set(GiraphConfiguration.CHECKPOINT_DIRECTORY,
+        checkpointsDir.toString());
+    job.getConfiguration().setBoolean(
+        GiraphConfiguration.CLEANUP_CHECKPOINTS_AFTER_SUCCESS, false);
+    job.getConfiguration().setInt(GiraphConfiguration.CHECKPOINT_FREQUENCY, 4);
+
+    assertTrue(job.run(true));
+
+    // Restart the test from superstep 4
+    System.out.println("testAggregatorsCheckpointing: Restarting from " +
+        "superstep 4 with checkpoint path = " + checkpointsDir);
+    outputPath = getTempPath(getCallingMethodName() + "Restarted");
+    GiraphJob restartedJob = prepareJob(getCallingMethodName() + "Restarted",
+        AggregatorsTestVertex.class,
+        null,
+        AggregatorsTestVertex.AggregatorsTestMasterCompute.class,
+        SimplePageRankVertex.SimplePageRankVertexInputFormat.class,
+        null,
+        outputPath);
+    job.getConfiguration().setMasterComputeClass(
+        SimpleCheckpointVertex.SimpleCheckpointVertexMasterCompute.class);
+    restartedJob.getConfiguration().set(
+        GiraphConfiguration.CHECKPOINT_DIRECTORY, checkpointsDir.toString());
+    restartedJob.getConfiguration().setLong(
+        GiraphConfiguration.RESTART_SUPERSTEP, 4);
+
+    assertTrue(restartedJob.run(true));
+  }
+}