You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ma...@apache.org on 2014/10/10 00:10:21 UTC

[1/3] Reduce/broadcast API

Repository: giraph
Updated Branches:
  refs/heads/trunk 61db68912 -> f43f45009


http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/reducers/Reducer.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/reducers/Reducer.java b/giraph-core/src/main/java/org/apache/giraph/reducers/Reducer.java
new file mode 100644
index 0000000..9f821b4
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/reducers/Reducer.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.reducers;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.giraph.utils.WritableUtils;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Object responsible for performing reducing operation.
+ * Simple wrapper of ReduceOperation object and current value holding
+ * partially reduced result.
+ *
+ * @param <S> Single value type, objects passed on workers
+ * @param <R> Reduced value type
+ */
+public class Reducer<S, R extends Writable> implements Writable {
+  /** Reduce operations */
+  private ReduceOperation<S, R> reduceOp;
+  /** Current (partially) reduced value*/
+  private R currentValue;
+
+  /**
+   * Constructor
+   */
+  public Reducer() {
+  }
+  /**
+   * Constructor
+   * @param reduceOp Reduce operations
+   */
+  public Reducer(ReduceOperation<S, R> reduceOp) {
+    this.reduceOp = reduceOp;
+    this.currentValue = reduceOp.createInitialValue();
+  }
+  /**
+   * Constructor
+   * @param reduceOp Reduce operations
+   * @param currentValue current reduced value
+   */
+  public Reducer(ReduceOperation<S, R> reduceOp, R currentValue) {
+    this.reduceOp = reduceOp;
+    this.currentValue = currentValue;
+  }
+
+  /**
+   * Reduce given value into current reduced value.
+   * @param valueToReduce Single value to reduce
+   */
+  public void reduceSingle(S valueToReduce) {
+    reduceOp.reduceSingle(currentValue, valueToReduce);
+  }
+  /**
+   * Reduce given partially reduced value into current reduced value.
+   * @param valueToReduce Partial value to reduce
+   */
+  public void reducePartial(R valueToReduce) {
+    reduceOp.reducePartial(currentValue, valueToReduce);
+  }
+  /**
+   * Return new initial reduced value.
+   * @return New initial reduced value
+   */
+  public R createInitialValue() {
+    return reduceOp.createInitialValue();
+  }
+
+  public ReduceOperation<S, R> getReduceOp() {
+    return reduceOp;
+  }
+
+  public R getCurrentValue() {
+    return currentValue;
+  }
+
+  public void setCurrentValue(R currentValue) {
+    this.currentValue = currentValue;
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    WritableUtils.writeWritableObject(reduceOp, out);
+    currentValue.write(out);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    reduceOp = WritableUtils.readWritableObject(in, null);
+    currentValue = reduceOp.createInitialValue();
+    currentValue.readFields(in);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/reducers/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/reducers/package-info.java b/giraph-core/src/main/java/org/apache/giraph/reducers/package-info.java
new file mode 100644
index 0000000..eeefdeb
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/reducers/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Package of Giraph reducers.
+ */
+package org.apache.giraph.reducers;

http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/utils/ConfigurationUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ConfigurationUtils.java b/giraph-core/src/main/java/org/apache/giraph/utils/ConfigurationUtils.java
index 5e046cc..3d654b4 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/ConfigurationUtils.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ConfigurationUtils.java
@@ -17,8 +17,12 @@
  */
 package org.apache.giraph.utils;
 
-import com.google.common.base.Splitter;
-import com.google.common.collect.Iterables;
+import static org.apache.giraph.conf.GiraphConstants.COMPUTATION_CLASS;
+import static org.apache.giraph.conf.GiraphConstants.TYPES_HOLDER_CLASS;
+
+import java.io.IOException;
+import java.util.List;
+
 import org.apache.commons.cli.BasicParser;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.CommandLineParser;
@@ -29,6 +33,7 @@ import org.apache.giraph.Algorithm;
 import org.apache.giraph.aggregators.AggregatorWriter;
 import org.apache.giraph.combiner.MessageCombiner;
 import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.conf.GiraphConfigurationSettable;
 import org.apache.giraph.conf.GiraphConstants;
 import org.apache.giraph.conf.GiraphTypes;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
@@ -57,11 +62,8 @@ import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
 import org.apache.zookeeper.ZooKeeper;
 
-import java.io.IOException;
-import java.util.List;
-
-import static org.apache.giraph.conf.GiraphConstants.COMPUTATION_CLASS;
-import static org.apache.giraph.conf.GiraphConstants.TYPES_HOLDER_CLASS;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Iterables;
 
 /**
  * Translate command line args into Configuration Key-Value pairs.
@@ -147,6 +149,10 @@ public final class ConfigurationUtils {
       ImmutableClassesGiraphConfiguration configuration) {
     if (configuration != null) {
       configuration.configureIfPossible(object);
+    } else if (object instanceof GiraphConfigurationSettable) {
+      throw new IllegalArgumentException(
+          "Trying to configure configurable object without value, " +
+          object.getClass());
     }
   }
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java b/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java
index 3c5cbad..923d369 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java
@@ -739,4 +739,38 @@ public class WritableUtils {
     }
   }
 
+  /**
+   * Create a copy of Writable object, by serializing and deserializing it.
+   *
+   * @param reusableOut Reusable output stream to serialize into
+   * @param reusableIn Reusable input stream to deserialize out of
+   * @param original Original value of which to make a copy
+   * @param <T> Type of the object
+   * @return Copy of the original value
+   */
+  public static <T extends Writable> T createCopy(
+      UnsafeByteArrayOutputStream reusableOut,
+      UnsafeReusableByteArrayInput reusableIn, T original) {
+    T copy = (T) createWritable(original.getClass(), null);
+
+    try {
+      reusableOut.reset();
+      original.write(reusableOut);
+      reusableIn.initialize(
+          reusableOut.getByteArray(), 0, reusableOut.getPos());
+      copy.readFields(reusableIn);
+
+      if (reusableIn.available() != 0) {
+        throw new RuntimeException("Serialization of " +
+            original.getClass() + " encountered issues, " +
+            reusableIn.available() + " bytes left to be read");
+      }
+    } catch (IOException e) {
+      throw new IllegalStateException(
+          "IOException occurred while trying to create a copy " +
+          original.getClass(), e);
+    }
+    return copy;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
index 120678f..f61e817 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
@@ -18,6 +18,27 @@
 
 package org.apache.giraph.worker;
 
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.TimeUnit;
+
+import net.iharder.Base64;
+
 import org.apache.giraph.bsp.ApplicationState;
 import org.apache.giraph.bsp.BspService;
 import org.apache.giraph.bsp.CentralizedServiceWorker;
@@ -66,10 +87,10 @@ import org.apache.giraph.partition.PartitionStore;
 import org.apache.giraph.partition.WorkerGraphPartitioner;
 import org.apache.giraph.utils.CallableFactory;
 import org.apache.giraph.utils.JMapHistoDumper;
-import org.apache.giraph.utils.ReactiveJMapHistoDumper;
 import org.apache.giraph.utils.LoggerUtils;
 import org.apache.giraph.utils.MemoryUtils;
 import org.apache.giraph.utils.ProgressableUtils;
+import org.apache.giraph.utils.ReactiveJMapHistoDumper;
 import org.apache.giraph.utils.WritableUtils;
 import org.apache.giraph.zk.BspEvent;
 import org.apache.giraph.zk.PredicateLock;
@@ -96,26 +117,6 @@ import org.json.JSONObject;
 
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
-import net.iharder.Base64;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.nio.charset.Charset;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Queue;
-import java.util.Set;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.TimeUnit;
 
 /**
  * ZooKeeper-based implementation of {@link CentralizedServiceWorker}.
@@ -162,10 +163,10 @@ public class BspServiceWorker<I extends WritableComparable,
   private final WorkerContext workerContext;
 
   /** Handler for aggregators */
-  private final WorkerAggregatorHandler aggregatorHandler;
+  private final WorkerAggregatorHandler globalCommHandler;
 
   /** Superstep output */
-  private SuperstepOutput<I, V, E> superstepOutput;
+  private final SuperstepOutput<I, V, E> superstepOutput;
 
   /** array of observers to call back to */
   private final WorkerObserver[] observers;
@@ -212,10 +213,10 @@ public class BspServiceWorker<I extends WritableComparable,
     workerAggregatorRequestProcessor =
         new NettyWorkerAggregatorRequestProcessor(getContext(), conf, this);
 
-    aggregatorHandler = new WorkerAggregatorHandler(this, conf, context);
+    globalCommHandler = new WorkerAggregatorHandler(this, conf, context);
 
     workerContext = conf.createWorkerContext();
-    workerContext.setWorkerAggregatorUsage(aggregatorHandler);
+    workerContext.setWorkerGlobalCommUsage(globalCommHandler);
 
     superstepOutput = conf.createSuperstepOutput(context);
 
@@ -584,7 +585,7 @@ public class BspServiceWorker<I extends WritableComparable,
 
     // Initialize aggregator at worker side during setup.
     // Do this just before vertex and edge loading.
-    aggregatorHandler.prepareSuperstep(workerAggregatorRequestProcessor);
+    globalCommHandler.prepareSuperstep(workerAggregatorRequestProcessor);
 
     VertexEdgeCount vertexEdgeCount;
     long entriesLoaded;
@@ -895,7 +896,7 @@ public class BspServiceWorker<I extends WritableComparable,
       postSuperstepCallbacks();
     }
 
-    aggregatorHandler.finishSuperstep(workerAggregatorRequestProcessor);
+    globalCommHandler.finishSuperstep(workerAggregatorRequestProcessor);
 
     MessageStore<I, Writable> incomingMessageStore =
         getServerData().getIncomingMessageStore();
@@ -1920,15 +1921,16 @@ else[HADOOP_NON_SECURE]*/
     return workerServer.getServerData();
   }
 
+
   @Override
   public WorkerAggregatorHandler getAggregatorHandler() {
-    return aggregatorHandler;
+    return globalCommHandler;
   }
 
   @Override
   public void prepareSuperstep() {
     if (getSuperstep() != INPUT_SUPERSTEP) {
-      aggregatorHandler.prepareSuperstep(workerAggregatorRequestProcessor);
+      globalCommHandler.prepareSuperstep(workerAggregatorRequestProcessor);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java b/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java
index 35ad94b..89f74b3 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java
@@ -18,6 +18,8 @@
 
 package org.apache.giraph.worker;
 
+import java.io.IOException;
+
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.edge.Edge;
 import org.apache.giraph.graph.VertexEdgeCount;
@@ -37,8 +39,6 @@ import org.apache.log4j.Logger;
 import com.yammer.metrics.core.Counter;
 import com.yammer.metrics.core.Meter;
 
-import java.io.IOException;
-
 /**
  * Load as many edge input splits as possible.
  * Every thread will has its own instance of WorkerClientRequestProcessor
@@ -62,7 +62,7 @@ public class EdgeInputSplitsCallable<I extends WritableComparable,
       EdgeInputSplitsCallable.class);
 
   /** Aggregator handler */
-  private final WorkerThreadAggregatorUsage aggregatorUsage;
+  private final WorkerThreadGlobalCommUsage globalCommUsage;
   /** Bsp service worker (only use thread-safe methods) */
   private final BspServiceWorker<I, V, E> bspServiceWorker;
   /** Edge input format */
@@ -105,7 +105,7 @@ public class EdgeInputSplitsCallable<I extends WritableComparable,
     this.bspServiceWorker = bspServiceWorker;
     inputSplitMaxEdges = configuration.getInputSplitMaxEdges();
     // Initialize aggregator usage.
-    this.aggregatorUsage = bspServiceWorker.getAggregatorHandler()
+    this.globalCommUsage = bspServiceWorker.getAggregatorHandler()
       .newThreadAggregatorUsage();
     edgeInputFilter = configuration.getEdgeInputFilter();
     canEmbedInIds = bspServiceWorker
@@ -147,7 +147,7 @@ public class EdgeInputSplitsCallable<I extends WritableComparable,
 
     edgeReader.initialize(inputSplit, context);
     // Set aggregator usage to edge reader
-    edgeReader.setWorkerAggregatorUse(aggregatorUsage);
+    edgeReader.setWorkerGlobalCommUsage(globalCommUsage);
 
     long inputSplitEdgesLoaded = 0;
     long inputSplitEdgesFiltered = 0;

http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/worker/MappingInputSplitsCallable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/MappingInputSplitsCallable.java b/giraph-core/src/main/java/org/apache/giraph/worker/MappingInputSplitsCallable.java
index a2279a9..f6dca25 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/MappingInputSplitsCallable.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/MappingInputSplitsCallable.java
@@ -18,21 +18,21 @@
 
 package org.apache.giraph.worker;
 
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicInteger;
+
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.io.GiraphInputFormat;
 import org.apache.giraph.io.MappingInputFormat;
 import org.apache.giraph.io.MappingReader;
-import org.apache.giraph.mapping.MappingStore;
 import org.apache.giraph.mapping.MappingEntry;
+import org.apache.giraph.mapping.MappingStore;
 import org.apache.giraph.zk.ZooKeeperExt;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.Mapper;
 
-import java.io.IOException;
-import java.util.concurrent.atomic.AtomicInteger;
-
 /**
  * Load as many mapping input splits as possible.
  * Every thread will has its own instance of WorkerClientRequestProcessor
@@ -89,11 +89,11 @@ public class MappingInputSplitsCallable<I extends WritableComparable,
         mappingInputFormat.createMappingReader(inputSplit, context);
     mappingReader.setConf(configuration);
 
-    WorkerThreadAggregatorUsage aggregatorUsage = this.bspServiceWorker
+    WorkerThreadGlobalCommUsage globalCommUsage = this.bspServiceWorker
         .getAggregatorHandler().newThreadAggregatorUsage();
 
     mappingReader.initialize(inputSplit, context);
-    mappingReader.setWorkerAggregatorUse(aggregatorUsage);
+    mappingReader.setWorkerGlobalCommUsage(globalCommUsage);
 
     int entriesLoaded = 0;
     MappingStore<I, B> mappingStore =

http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java b/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java
index 4c85765..00a2781 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java
@@ -18,6 +18,8 @@
 
 package org.apache.giraph.worker;
 
+import java.io.IOException;
+
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.edge.Edge;
 import org.apache.giraph.edge.OutEdges;
@@ -42,8 +44,6 @@ import org.apache.log4j.Logger;
 import com.yammer.metrics.core.Counter;
 import com.yammer.metrics.core.Meter;
 
-import java.io.IOException;
-
 /**
  * Load as many vertex input splits as possible.
  * Every thread will has its own instance of WorkerClientRequestProcessor
@@ -79,7 +79,7 @@ public class VertexInputSplitsCallable<I extends WritableComparable,
    * Whether the chosen {@link OutEdges} implementation allows for Edge
    * reuse.
    */
-  private boolean reuseEdgeObjects;
+  private final boolean reuseEdgeObjects;
   /** Used to translate Edges during vertex input phase based on localData */
   private final TranslateEdge<I, E> translateEdge;
 
@@ -152,13 +152,13 @@ public class VertexInputSplitsCallable<I extends WritableComparable,
         vertexInputFormat.createVertexReader(inputSplit, context);
     vertexReader.setConf(configuration);
 
-    WorkerThreadAggregatorUsage aggregatorUsage =
+    WorkerThreadGlobalCommUsage globalCommUsage =
       this.bspServiceWorker
         .getAggregatorHandler().newThreadAggregatorUsage();
 
     vertexReader.initialize(inputSplit, context);
     // Set aggregator usage to vertex reader
-    vertexReader.setWorkerAggregatorUse(aggregatorUsage);
+    vertexReader.setWorkerGlobalCommUsage(globalCommUsage);
 
     long inputSplitVerticesLoaded = 0;
     long inputSplitVerticesFiltered = 0;

http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorDelegator.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorDelegator.java b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorDelegator.java
new file mode 100644
index 0000000..5238a07
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorDelegator.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.worker;
+
+import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Class for delegating WorkerAggregatorUsage and
+ * WorkerGlobalCommUsage methods to corresponding interface.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex data
+ * @param <E> Edge data
+ */
+public abstract class WorkerAggregatorDelegator<I extends WritableComparable,
+  V extends Writable, E extends Writable>
+  extends DefaultImmutableClassesGiraphConfigurable<I, V, E>
+  implements WorkerAggregatorUsage, WorkerGlobalCommUsage {
+
+  /** Worker aggregator usage */
+  private WorkerGlobalCommUsage workerGlobalCommUsage;
+
+  /**
+   * Set worker global communication usage
+   *
+   * @param workerGlobalCommUsage Worker global communication usage
+   */
+  public void setWorkerGlobalCommUsage(
+      WorkerGlobalCommUsage workerGlobalCommUsage) {
+    this.workerGlobalCommUsage = workerGlobalCommUsage;
+  }
+
+  @Override
+  public final void reduce(String name, Object value) {
+    workerGlobalCommUsage.reduce(name, value);
+  }
+
+  @Override
+  public final <B extends Writable> B getBroadcast(String name) {
+    return workerGlobalCommUsage.getBroadcast(name);
+  }
+
+  @Override
+  public final <A extends Writable> void aggregate(String name, A value) {
+    reduce(name, value);
+  }
+
+  @Override
+  public final <A extends Writable> A getAggregatedValue(String name) {
+    return this.<A>getBroadcast(name);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorHandler.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorHandler.java b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorHandler.java
index 45ca665..05a13a7 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorHandler.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorHandler.java
@@ -15,22 +15,26 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.giraph.worker;
 
 import java.io.IOException;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Set;
 
-import org.apache.giraph.aggregators.Aggregator;
 import org.apache.giraph.bsp.CentralizedServiceWorker;
-import org.apache.giraph.comm.aggregators.AggregatedValueOutputStream;
+import org.apache.giraph.comm.GlobalCommType;
 import org.apache.giraph.comm.aggregators.AggregatorUtils;
 import org.apache.giraph.comm.aggregators.AllAggregatorServerData;
+import org.apache.giraph.comm.aggregators.GlobalCommValueOutputStream;
 import org.apache.giraph.comm.aggregators.OwnerAggregatorServerData;
 import org.apache.giraph.comm.aggregators.WorkerAggregatorRequestProcessor;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.utils.Factory;
+import org.apache.giraph.reducers.ReduceOperation;
+import org.apache.giraph.reducers.Reducer;
+import org.apache.giraph.utils.UnsafeByteArrayOutputStream;
+import org.apache.giraph.utils.UnsafeReusableByteArrayInput;
+import org.apache.giraph.utils.WritableUtils;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.util.Progressable;
 import org.apache.log4j.Logger;
@@ -38,35 +42,18 @@ import org.apache.log4j.Logger;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 
-/**
- * Handler for aggregators on worker. Provides the aggregated values and
- * performs aggregations from user vertex code (thread-safe). Also has
- * methods for all superstep coordination related to aggregators.
- *
- * At the beginning of any superstep any worker calls prepareSuperstep(),
- * which blocks until the final aggregates from the previous superstep have
- * been delivered to the worker.
- * Next, during the superstep worker can call aggregate() and
- * getAggregatedValue() (both methods are thread safe) the former
- * computes partial aggregates for this superstep from the worker,
- * the latter returns (read-only) final aggregates from the previous superstep.
- * Finally, at the end of the superstep, the worker calls finishSuperstep(),
- * which propagates non-owned partial aggregates to the owner workers,
- * and sends the final aggregate from the owner worker to the master.
- */
-public class WorkerAggregatorHandler implements WorkerThreadAggregatorUsage {
+/** Handler for reduce/broadcast on the workers */
+public class WorkerAggregatorHandler implements WorkerThreadGlobalCommUsage {
   /** Class logger */
   private static final Logger LOG =
       Logger.getLogger(WorkerAggregatorHandler.class);
-  /** Map of values from previous superstep */
-  private final Map<String, Writable> previousAggregatedValueMap =
+  /** Map of broadcasted values */
+  private final Map<String, Writable> broadcastedMap =
       Maps.newHashMap();
-  /** Map of aggregator factories for current superstep */
-  private final Map<String, Factory<Aggregator<Writable>>>
-  currentAggregatorFactoryMap = Maps.newHashMap();
-  /** Map of aggregators for current superstep */
-  private final Map<String, Aggregator<Writable>> currentAggregatorMap =
+  /** Map of reducers currently being reduced */
+  private final Map<String, Reducer<Object, Writable>> reducerMap =
       Maps.newHashMap();
+
   /** Service worker */
   private final CentralizedServiceWorker<?, ?, ?> serviceWorker;
   /** Progressable for reporting progress */
@@ -96,29 +83,48 @@ public class WorkerAggregatorHandler implements WorkerThreadAggregatorUsage {
   }
 
   @Override
-  public <A extends Writable> void aggregate(String name, A value) {
-    Aggregator<Writable> aggregator = currentAggregatorMap.get(name);
-    if (aggregator != null) {
+  public <B extends Writable> B getBroadcast(String name) {
+    B value = (B) broadcastedMap.get(name);
+    if (value == null) {
+      LOG.warn("getBroadcast: " +
+          AggregatorUtils.getUnregisteredAggregatorMessage(name,
+              broadcastedMap.size() != 0, conf));
+    }
+    return value;
+  }
+
+  @Override
+  public void reduce(String name, Object value) {
+    Reducer<Object, Writable> reducer = reducerMap.get(name);
+    if (reducer != null) {
       progressable.progress();
-      synchronized (aggregator) {
-        aggregator.aggregate(value);
+      synchronized (reducer) {
+        reducer.reduceSingle(value);
       }
     } else {
-      throw new IllegalStateException("aggregate: " +
+      throw new IllegalStateException("reduce: " +
           AggregatorUtils.getUnregisteredAggregatorMessage(name,
-              currentAggregatorMap.size() != 0, conf));
+              reducerMap.size() != 0, conf));
     }
   }
 
-  @Override
-  public <A extends Writable> A getAggregatedValue(String name) {
-    A value = (A) previousAggregatedValueMap.get(name);
-    if (value == null) {
-      LOG.warn("getAggregatedValue: " +
+  /**
+   * Combine partially reduced value into currently reduced value.
+   * @param name Name of the reducer
+   * @param valueToReduce Partial value to reduce
+   */
+  protected void reducePartial(String name, Writable valueToReduce) {
+    Reducer<Object, Writable> reducer = reducerMap.get(name);
+    if (reducer != null) {
+      progressable.progress();
+      synchronized (reducer) {
+        reducer.reducePartial(valueToReduce);
+      }
+    } else {
+      throw new IllegalStateException("reduce: " +
           AggregatorUtils.getUnregisteredAggregatorMessage(name,
-              previousAggregatedValueMap.size() != 0, conf));
+              reducerMap.size() != 0, conf));
     }
-    return value;
   }
 
   /**
@@ -128,53 +134,35 @@ public class WorkerAggregatorHandler implements WorkerThreadAggregatorUsage {
    */
   public void prepareSuperstep(
       WorkerAggregatorRequestProcessor requestProcessor) {
+    broadcastedMap.clear();
+    reducerMap.clear();
+
     if (LOG.isDebugEnabled()) {
       LOG.debug("prepareSuperstep: Start preparing aggregators");
     }
-    AllAggregatorServerData allAggregatorData =
+    AllAggregatorServerData allGlobalCommData =
         serviceWorker.getServerData().getAllAggregatorData();
     // Wait for my aggregators
     Iterable<byte[]> dataToDistribute =
-        allAggregatorData.getDataFromMasterWhenReady(
+        allGlobalCommData.getDataFromMasterWhenReady(
             serviceWorker.getMasterInfo());
     try {
       // Distribute my aggregators
-      requestProcessor.distributeAggregators(dataToDistribute);
+      requestProcessor.distributeReducedValues(dataToDistribute);
     } catch (IOException e) {
       throw new IllegalStateException("prepareSuperstep: " +
           "IOException occurred while trying to distribute aggregators", e);
     }
     // Wait for all other aggregators and store them
-    allAggregatorData.fillNextSuperstepMapsWhenReady(
-        getOtherWorkerIdsSet(), previousAggregatedValueMap,
-        currentAggregatorFactoryMap);
-    fillAndInitAggregatorsMap(currentAggregatorMap);
-    allAggregatorData.reset();
+    allGlobalCommData.fillNextSuperstepMapsWhenReady(
+        getOtherWorkerIdsSet(), broadcastedMap,
+        reducerMap);
     if (LOG.isDebugEnabled()) {
       LOG.debug("prepareSuperstep: Aggregators prepared");
     }
   }
 
   /**
-   * Fills aggregators map from currentAggregatorFactoryMap.
-   * All aggregators in this map will be set to initial value.
-   * @param aggregatorMap Map to fill.
-   */
-  private void fillAndInitAggregatorsMap(
-      Map<String, Aggregator<Writable>> aggregatorMap) {
-    for (Map.Entry<String, Factory<Aggregator<Writable>>> entry :
-        currentAggregatorFactoryMap.entrySet()) {
-      Aggregator<Writable> aggregator =
-          aggregatorMap.get(entry.getKey());
-      if (aggregator == null) {
-        aggregatorMap.put(entry.getKey(), entry.getValue().create());
-      } else {
-        aggregator.reset();
-      }
-    }
-  }
-
-  /**
    * Send aggregators to their owners and in the end to the master
    *
    * @param requestProcessor Request processor for aggregators
@@ -186,19 +174,19 @@ public class WorkerAggregatorHandler implements WorkerThreadAggregatorUsage {
           "workers will send their aggregated values " +
           "once they are done with superstep computation");
     }
-    OwnerAggregatorServerData ownerAggregatorData =
+    OwnerAggregatorServerData ownerGlobalCommData =
         serviceWorker.getServerData().getOwnerAggregatorData();
     // First send partial aggregated values to their owners and determine
     // which aggregators belong to this worker
-    for (Map.Entry<String, Aggregator<Writable>> entry :
-        currentAggregatorMap.entrySet()) {
+    for (Map.Entry<String, Reducer<Object, Writable>> entry :
+        reducerMap.entrySet()) {
       try {
-        boolean sent = requestProcessor.sendAggregatedValue(entry.getKey(),
-            entry.getValue().getAggregatedValue());
+        boolean sent = requestProcessor.sendReducedValue(entry.getKey(),
+            entry.getValue().getCurrentValue());
         if (!sent) {
           // If it's my aggregator, add it directly
-          ownerAggregatorData.aggregate(entry.getKey(),
-              entry.getValue().getAggregatedValue());
+          ownerGlobalCommData.reduce(entry.getKey(),
+              entry.getValue().getCurrentValue());
         }
       } catch (IOException e) {
         throw new IllegalStateException("finishSuperstep: " +
@@ -216,20 +204,21 @@ public class WorkerAggregatorHandler implements WorkerThreadAggregatorUsage {
     }
 
     // Wait to receive partial aggregated values from all other workers
-    Iterable<Map.Entry<String, Writable>> myAggregators =
-        ownerAggregatorData.getMyAggregatorValuesWhenReady(
+    Iterable<Map.Entry<String, Writable>> myReducedValues =
+        ownerGlobalCommData.getMyReducedValuesWhenReady(
             getOtherWorkerIdsSet());
 
     // Send final aggregated values to master
-    AggregatedValueOutputStream aggregatorOutput =
-        new AggregatedValueOutputStream();
-    for (Map.Entry<String, Writable> entry : myAggregators) {
+    GlobalCommValueOutputStream globalOutput =
+        new GlobalCommValueOutputStream(false);
+    for (Map.Entry<String, Writable> entry : myReducedValues) {
       try {
-        int currentSize = aggregatorOutput.addAggregator(entry.getKey(),
+        int currentSize = globalOutput.addValue(entry.getKey(),
+            GlobalCommType.REDUCED_VALUE,
             entry.getValue());
         if (currentSize > maxBytesPerAggregatorRequest) {
-          requestProcessor.sendAggregatedValuesToMaster(
-              aggregatorOutput.flush());
+          requestProcessor.sendReducedValuesToMaster(
+              globalOutput.flush());
         }
         progressable.progress();
       } catch (IOException e) {
@@ -239,7 +228,7 @@ public class WorkerAggregatorHandler implements WorkerThreadAggregatorUsage {
       }
     }
     try {
-      requestProcessor.sendAggregatedValuesToMaster(aggregatorOutput.flush());
+      requestProcessor.sendReducedValuesToMaster(globalOutput.flush());
     } catch (IOException e) {
       throw new IllegalStateException("finishSuperstep: " +
           "IOException occured while sending aggregators to master", e);
@@ -247,7 +236,7 @@ public class WorkerAggregatorHandler implements WorkerThreadAggregatorUsage {
     // Wait for master to receive aggregated values before proceeding
     serviceWorker.getWorkerClient().waitAllRequests();
 
-    ownerAggregatorData.reset();
+    ownerGlobalCommData.reset();
     if (LOG.isDebugEnabled()) {
       LOG.debug("finishSuperstep: Aggregators finished");
     }
@@ -259,9 +248,9 @@ public class WorkerAggregatorHandler implements WorkerThreadAggregatorUsage {
    *
    * @return New aggregator usage
    */
-  public WorkerThreadAggregatorUsage newThreadAggregatorUsage() {
+  public WorkerThreadGlobalCommUsage newThreadAggregatorUsage() {
     if (AggregatorUtils.useThreadLocalAggregators(conf)) {
-      return new ThreadLocalWorkerAggregatorUsage();
+      return new ThreadLocalWorkerGlobalCommUsage();
     } else {
       return this;
     }
@@ -290,56 +279,70 @@ public class WorkerAggregatorHandler implements WorkerThreadAggregatorUsage {
   }
 
   /**
-   * Not thread-safe implementation of {@link WorkerThreadAggregatorUsage}.
-   * We can use one instance of this object per thread to prevent
-   * synchronizing on each aggregate() call. In the end of superstep,
-   * values from each of these will be aggregated back to {@link
-   * WorkerAggregatorHandler}
-   */
-  public class ThreadLocalWorkerAggregatorUsage
-      implements WorkerThreadAggregatorUsage {
-    /** Thread-local aggregator map */
-    private final Map<String, Aggregator<Writable>> threadAggregatorMap;
+  * Not thread-safe implementation of {@link WorkerThreadGlobalCommUsage}.
+  * We can use one instance of this object per thread to prevent
+  * synchronizing on each aggregate() call. In the end of superstep,
+  * values from each of these will be aggregated back to {@link
+  * WorkerThreadGlobalCommUsage}
+  */
+  public class ThreadLocalWorkerGlobalCommUsage
+    implements WorkerThreadGlobalCommUsage {
+    /** Thread-local reducer map */
+    private final Map<String, Reducer<Object, Writable>> threadReducerMap;
 
     /**
-     * Constructor
-     *
-     * Creates new instances of all aggregators from
-     * {@link WorkerAggregatorHandler}
-     */
-    public ThreadLocalWorkerAggregatorUsage() {
-      threadAggregatorMap = Maps.newHashMapWithExpectedSize(
-          WorkerAggregatorHandler.this.currentAggregatorMap.size());
-      fillAndInitAggregatorsMap(threadAggregatorMap);
+    * Constructor
+    *
+    * Creates new instances of all reducers from
+    * {@link WorkerAggregatorHandler}
+    */
+    public ThreadLocalWorkerGlobalCommUsage() {
+      threadReducerMap = Maps.newHashMapWithExpectedSize(
+          WorkerAggregatorHandler.this.reducerMap.size());
+
+      UnsafeByteArrayOutputStream out = new UnsafeByteArrayOutputStream();
+      UnsafeReusableByteArrayInput in = new UnsafeReusableByteArrayInput();
+
+      for (Entry<String, Reducer<Object, Writable>> entry :
+          reducerMap.entrySet()) {
+        ReduceOperation<Object, Writable> globalReduceOp =
+            entry.getValue().getReduceOp();
+
+        ReduceOperation<Object, Writable> threadLocalCopy =
+            WritableUtils.createCopy(out, in, globalReduceOp);
+
+        threadReducerMap.put(entry.getKey(), new Reducer<>(threadLocalCopy));
+      }
     }
 
     @Override
-    public <A extends Writable> void aggregate(String name, A value) {
-      Aggregator<Writable> aggregator = threadAggregatorMap.get(name);
-      if (aggregator != null) {
+    public void reduce(String name, Object value) {
+      Reducer<Object, Writable> reducer = threadReducerMap.get(name);
+      if (reducer != null) {
         progressable.progress();
-        aggregator.aggregate(value);
+        reducer.reduceSingle(value);
       } else {
-        throw new IllegalStateException("aggregate: " +
+        throw new IllegalStateException("reduce: " +
             AggregatorUtils.getUnregisteredAggregatorMessage(name,
-                threadAggregatorMap.size() != 0, conf));
+                threadReducerMap.size() != 0, conf));
       }
     }
 
     @Override
-    public <A extends Writable> A getAggregatedValue(String name) {
-      return WorkerAggregatorHandler.this.<A>getAggregatedValue(name);
+    public <B extends Writable> B getBroadcast(String name) {
+      return WorkerAggregatorHandler.this.getBroadcast(name);
     }
 
     @Override
     public void finishThreadComputation() {
       // Aggregate the values this thread's vertices provided back to
       // WorkerAggregatorHandler
-      for (Map.Entry<String, Aggregator<Writable>> entry :
-          threadAggregatorMap.entrySet()) {
-        WorkerAggregatorHandler.this.aggregate(entry.getKey(),
-            entry.getValue().getAggregatedValue());
+      for (Entry<String, Reducer<Object, Writable>> entry :
+          threadReducerMap.entrySet()) {
+        WorkerAggregatorHandler.this.reducePartial(entry.getKey(),
+            entry.getValue().getCurrentValue());
       }
     }
   }
+
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/worker/WorkerContext.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerContext.java b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerContext.java
index 7a55d56..b977ea1 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerContext.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerContext.java
@@ -18,30 +18,28 @@
 
 package org.apache.giraph.worker;
 
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.List;
+
 import org.apache.giraph.bsp.CentralizedServiceWorker;
 import org.apache.giraph.comm.requests.SendWorkerToWorkerMessageRequest;
-import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
 import org.apache.giraph.graph.GraphState;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.Mapper;
 
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.List;
-
 /**
  * WorkerContext allows for the execution of user code
  * on a per-worker basis. There's one WorkerContext per worker.
  */
 @SuppressWarnings("rawtypes")
 public abstract class WorkerContext
-  extends DefaultImmutableClassesGiraphConfigurable
-  implements WorkerAggregatorUsage, Writable {
+  extends WorkerAggregatorDelegator<WritableComparable, Writable, Writable>
+  implements Writable {
   /** Global graph state */
   private GraphState graphState;
-  /** Worker aggregator usage */
-  private WorkerAggregatorUsage workerAggregatorUsage;
 
   /** Service worker */
   private CentralizedServiceWorker serviceWorker;
@@ -71,16 +69,6 @@ public abstract class WorkerContext
   }
 
   /**
-   * Set worker aggregator usage
-   *
-   * @param workerAggregatorUsage Worker aggregator usage
-   */
-  public void setWorkerAggregatorUsage(
-      WorkerAggregatorUsage workerAggregatorUsage) {
-    this.workerAggregatorUsage = workerAggregatorUsage;
-  }
-
-  /**
    * Initialize the WorkerContext.
    * This method is executed once on each Worker before the first
    * superstep starts.
@@ -196,16 +184,6 @@ public abstract class WorkerContext
     return graphState.getContext();
   }
 
-  @Override
-  public <A extends Writable> void aggregate(String name, A value) {
-    workerAggregatorUsage.aggregate(name, value);
-  }
-
-  @Override
-  public <A extends Writable> A getAggregatedValue(String name) {
-    return workerAggregatorUsage.<A>getAggregatedValue(name);
-  }
-
   /**
    * Call this to log a line to command line of the job. Use in moderation -
    * it's a synchronous call to Job client

http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/worker/WorkerGlobalCommUsage.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerGlobalCommUsage.java b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerGlobalCommUsage.java
new file mode 100644
index 0000000..39566f5
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerGlobalCommUsage.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.worker;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Methods on worker can access broadcasted values and provide
+ * values to reduce through this interface
+ */
+public interface WorkerGlobalCommUsage {
+  /**
+   * Reduce given value.
+   * @param name Name of the reducer
+   * @param value Single value to reduce
+   */
+  void reduce(String name, Object value);
+  /**
+   * Get value broadcasted from master
+   * @param name Name of the broadcasted value
+   * @return Broadcasted value
+   * @param <B> Broadcast value type
+   */
+  <B extends Writable> B getBroadcast(String name);
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/worker/WorkerThreadAggregatorUsage.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerThreadAggregatorUsage.java b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerThreadAggregatorUsage.java
deleted file mode 100644
index 194127e..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerThreadAggregatorUsage.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.worker;
-
-/**
- * {@link WorkerAggregatorUsage} which can be used in each of the
- * computation threads.
- */
-public interface WorkerThreadAggregatorUsage extends WorkerAggregatorUsage {
-  /**
-   * Call this after thread's computation is finished,
-   * i.e. when all vertices have provided their values to aggregators
-   */
-  void finishThreadComputation();
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/worker/WorkerThreadGlobalCommUsage.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerThreadGlobalCommUsage.java b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerThreadGlobalCommUsage.java
new file mode 100644
index 0000000..8edbdc7
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerThreadGlobalCommUsage.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.worker;
+
+
+/**
+ * {@link WorkerAggregatorUsage} which can be used in each of the
+ * computation threads.
+ */
+public interface WorkerThreadGlobalCommUsage extends WorkerGlobalCommUsage {
+  /**
+   * Call this after thread's computation is finished,
+   * i.e. when all vertices have provided their values to aggregators
+   */
+  void finishThreadComputation();
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-examples/src/test/java/org/apache/giraph/TestBspBasic.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/test/java/org/apache/giraph/TestBspBasic.java b/giraph-examples/src/test/java/org/apache/giraph/TestBspBasic.java
index 488e1ea..26459c0 100644
--- a/giraph-examples/src/test/java/org/apache/giraph/TestBspBasic.java
+++ b/giraph-examples/src/test/java/org/apache/giraph/TestBspBasic.java
@@ -18,6 +18,24 @@
 
 package org.apache.giraph;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.BufferedReader;
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.lang.reflect.InvocationTargetException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
 import org.apache.giraph.aggregators.TextAggregatorWriter;
 import org.apache.giraph.combiner.SimpleSumMessageCombiner;
 import org.apache.giraph.conf.GiraphConfiguration;
@@ -62,24 +80,6 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.io.Closeables;
 
-import java.io.BufferedReader;
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.lang.reflect.InvocationTargetException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertSame;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
 /**
  * Unit test for many simple BSP applications.
  */
@@ -456,8 +456,8 @@ public class
           assertEquals(maxSuperstep + 2, maxValues.size());
           assertEquals(maxSuperstep + 2, vertexCounts.size());
 
-          assertEquals(maxPageRank, (double) maxValues.get(maxSuperstep), 0d);
-          assertEquals(minPageRank, (double) minValues.get(maxSuperstep), 0d);
+          assertEquals(maxPageRank, maxValues.get(maxSuperstep), 0d);
+          assertEquals(minPageRank, minValues.get(maxSuperstep), 0d);
           assertEquals(numVertices, (long) vertexCounts.get(maxSuperstep));
 
         } finally {

http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-examples/src/test/java/org/apache/giraph/aggregators/TestAggregatorsHandling.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/test/java/org/apache/giraph/aggregators/TestAggregatorsHandling.java b/giraph-examples/src/test/java/org/apache/giraph/aggregators/TestAggregatorsHandling.java
index eb3f686..602ab32 100644
--- a/giraph-examples/src/test/java/org/apache/giraph/aggregators/TestAggregatorsHandling.java
+++ b/giraph-examples/src/test/java/org/apache/giraph/aggregators/TestAggregatorsHandling.java
@@ -18,34 +18,19 @@
 
 package org.apache.giraph.aggregators;
 
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+
 import org.apache.giraph.BspCase;
 import org.apache.giraph.comm.aggregators.AggregatorUtils;
 import org.apache.giraph.conf.GiraphConfiguration;
 import org.apache.giraph.conf.GiraphConstants;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.examples.AggregatorsTestComputation;
 import org.apache.giraph.examples.SimpleCheckpoint;
 import org.apache.giraph.job.GiraphJob;
-import org.apache.giraph.master.MasterAggregatorHandler;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.util.Progressable;
 import org.junit.Test;
-import org.mockito.Mockito;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.lang.reflect.Field;
-import java.util.Map;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
 
 /** Tests if aggregators are handled on a proper way */
 public class TestAggregatorsHandling extends BspCase {
@@ -54,21 +39,6 @@ public class TestAggregatorsHandling extends BspCase {
     super(TestAggregatorsHandling.class.getName());
   }
 
-  private Map<String, AggregatorWrapper<Writable>> getAggregatorMap
-      (MasterAggregatorHandler aggregatorHandler) {
-    try {
-      Field aggregtorMapField = aggregatorHandler.getClass().getDeclaredField
-          ("aggregatorMap");
-      aggregtorMapField.setAccessible(true);
-      return (Map<String, AggregatorWrapper<Writable>>)
-          aggregtorMapField.get(aggregatorHandler);
-    } catch (IllegalAccessException e) {
-      throw new IllegalStateException(e);
-    } catch (NoSuchFieldException e) {
-      throw new IllegalStateException(e);
-    }
-  }
-
   /** Tests if aggregators are handled on a proper way during supersteps */
   @Test
   public void testAggregatorsHandling() throws IOException,
@@ -88,64 +58,6 @@ public class TestAggregatorsHandling extends BspCase {
     assertTrue(job.run(true));
   }
 
-  /** Test if aggregators serialization captures everything */
-  @Test
-  public void testMasterAggregatorsSerialization() throws
-      IllegalAccessException, InstantiationException, IOException {
-    ImmutableClassesGiraphConfiguration conf =
-        Mockito.mock(ImmutableClassesGiraphConfiguration.class);
-    Mockito.when(conf.getAggregatorWriterClass()).thenReturn(
-        TextAggregatorWriter.class);
-    Progressable progressable = Mockito.mock(Progressable.class);
-    MasterAggregatorHandler handler =
-        new MasterAggregatorHandler(conf, progressable);
-
-    String regularAggName = "regular";
-    LongWritable regularValue = new LongWritable(5);
-    handler.registerAggregator(regularAggName, LongSumAggregator.class);
-    handler.setAggregatedValue(regularAggName, regularValue);
-
-    String persistentAggName = "persistent";
-    DoubleWritable persistentValue = new DoubleWritable(10.5);
-    handler.registerPersistentAggregator(persistentAggName,
-        DoubleOverwriteAggregator.class);
-    handler.setAggregatedValue(persistentAggName, persistentValue);
-
-    for (AggregatorWrapper<Writable> aggregator :
-        getAggregatorMap(handler).values()) {
-      aggregator.setPreviousAggregatedValue(
-          aggregator.getCurrentAggregatedValue());
-    }
-
-    ByteArrayOutputStream out = new ByteArrayOutputStream();
-    handler.write(new DataOutputStream(out));
-
-    MasterAggregatorHandler restartedHandler =
-        new MasterAggregatorHandler(conf, progressable);
-    restartedHandler.readFields(
-        new DataInputStream(new ByteArrayInputStream(out.toByteArray())));
-
-    assertEquals(2, getAggregatorMap(restartedHandler).size());
-
-    AggregatorWrapper<Writable> regularAgg =
-        getAggregatorMap(restartedHandler).get(regularAggName);
-    assertTrue(regularAgg.getAggregatorFactory().create().getClass().equals(
-        LongSumAggregator.class));
-    assertEquals(regularValue, regularAgg.getPreviousAggregatedValue());
-    assertEquals(regularValue,
-        restartedHandler.<LongWritable>getAggregatedValue(regularAggName));
-    assertFalse(regularAgg.isPersistent());
-
-    AggregatorWrapper<Writable> persistentAgg =
-        getAggregatorMap(restartedHandler).get(persistentAggName);
-    assertTrue(persistentAgg.getAggregatorFactory().create().getClass().equals
-        (DoubleOverwriteAggregator.class));
-    assertEquals(persistentValue, persistentAgg.getPreviousAggregatedValue());
-    assertEquals(persistentValue,
-        restartedHandler.<LongWritable>getAggregatedValue(persistentAggName));
-    assertTrue(persistentAgg.isPersistent());
-  }
-
   /**
    * Test if aggregators are are handled properly when restarting from a
    * checkpoint


[2/3] Reduce/broadcast API

Posted by ma...@apache.org.
http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendReducedToMasterRequest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendReducedToMasterRequest.java b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendReducedToMasterRequest.java
new file mode 100644
index 0000000..7171f04
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendReducedToMasterRequest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm.requests;
+
+import java.io.IOException;
+
+import org.apache.giraph.master.MasterAggregatorHandler;
+
+/**
+ * Request to send final aggregated values from worker which owns
+ * aggregators to the master
+ */
+public class SendReducedToMasterRequest extends ByteArrayRequest
+    implements MasterRequest {
+
+  /**
+   * Constructor
+   *
+   * @param data Serialized aggregator data
+   */
+  public SendReducedToMasterRequest(byte[] data) {
+    super(data);
+  }
+
+  /**
+   * Constructor used for reflection only
+   */
+  public SendReducedToMasterRequest() {
+  }
+
+  @Override
+  public void doRequest(MasterAggregatorHandler aggregatorHandler) {
+    try {
+      aggregatorHandler.acceptReducedValues(getDataInput());
+    } catch (IOException e) {
+      throw new IllegalStateException("doRequest: " +
+          "IOException occurred while processing request", e);
+    }
+  }
+
+  @Override
+  public RequestType getType() {
+    return RequestType.SEND_AGGREGATORS_TO_MASTER_REQUEST;
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerAggregatorsRequest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerAggregatorsRequest.java b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerAggregatorsRequest.java
index 00a0c26..2f76e6e 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerAggregatorsRequest.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerAggregatorsRequest.java
@@ -18,15 +18,15 @@
 
 package org.apache.giraph.comm.requests;
 
+import java.io.DataInput;
+import java.io.IOException;
+
+import org.apache.giraph.comm.GlobalCommType;
 import org.apache.giraph.comm.ServerData;
-import org.apache.giraph.comm.aggregators.AggregatorUtils;
 import org.apache.giraph.comm.aggregators.OwnerAggregatorServerData;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Writable;
 
-import java.io.DataInput;
-import java.io.IOException;
-
 /**
  * Request to send partial aggregated values for current superstep (values
  * which were computed by one worker's vertices)
@@ -56,20 +56,23 @@ public class SendWorkerAggregatorsRequest extends
     OwnerAggregatorServerData aggregatorData =
         serverData.getOwnerAggregatorData();
     try {
-      int numAggregators = input.readInt();
-      for (int i = 0; i < numAggregators; i++) {
-        String aggregatorName = input.readUTF();
-        if (aggregatorName.equals(
-            AggregatorUtils.SPECIAL_COUNT_AGGREGATOR)) {
-          LongWritable count = new LongWritable(0);
-          count.readFields(input);
-          aggregatorData.receivedRequestCountFromWorker(count.get(),
+      int num = input.readInt();
+      for (int i = 0; i < num; i++) {
+        String name = input.readUTF();
+        GlobalCommType type = GlobalCommType.values()[input.readByte()];
+        if (type == GlobalCommType.SPECIAL_COUNT) {
+          LongWritable value = new LongWritable();
+          value.readFields(input);
+          aggregatorData.receivedRequestCountFromWorker(
+              value.get(),
               getSenderTaskId());
+        } else if (type == GlobalCommType.REDUCED_VALUE) {
+          Writable value = aggregatorData.createInitialValue(name);
+          value.readFields(input);
+          aggregatorData.reduce(name, value);
         } else {
-          Writable aggregatedValue =
-              aggregatorData.createAggregatorInitialValue(aggregatorName);
-          aggregatedValue.readFields(input);
-          aggregatorData.aggregate(aggregatorName, aggregatedValue);
+          throw new IllegalStateException(
+              "SendWorkerAggregatorsRequest received " + type);
         }
       }
     } catch (IOException e) {

http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/graph/AbstractComputation.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/AbstractComputation.java b/giraph-core/src/main/java/org/apache/giraph/graph/AbstractComputation.java
index e7c3084..1ea6603 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/AbstractComputation.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/AbstractComputation.java
@@ -18,20 +18,20 @@
 
 package org.apache.giraph.graph;
 
+import java.io.IOException;
+import java.util.Iterator;
+
 import org.apache.giraph.comm.WorkerClientRequestProcessor;
-import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
 import org.apache.giraph.edge.Edge;
 import org.apache.giraph.edge.OutEdges;
-import org.apache.giraph.worker.WorkerAggregatorUsage;
+import org.apache.giraph.worker.WorkerAggregatorDelegator;
 import org.apache.giraph.worker.WorkerContext;
+import org.apache.giraph.worker.WorkerGlobalCommUsage;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.log4j.Logger;
 
-import java.io.IOException;
-import java.util.Iterator;
-
 /**
  * See {@link Computation} for explanation of the interface.
  *
@@ -52,7 +52,7 @@ import java.util.Iterator;
 public abstract class AbstractComputation<I extends WritableComparable,
     V extends Writable, E extends Writable, M1 extends Writable,
     M2 extends Writable>
-    extends DefaultImmutableClassesGiraphConfigurable<I, V, E>
+    extends WorkerAggregatorDelegator<I, V, E>
     implements Computation<I, V, E, M1, M2> {
   /** Logger */
   private static final Logger LOG = Logger.getLogger(AbstractComputation.class);
@@ -63,8 +63,6 @@ public abstract class AbstractComputation<I extends WritableComparable,
   private WorkerClientRequestProcessor<I, V, E> workerClientRequestProcessor;
   /** Graph-wide BSP Mapper for this Computation */
   private GraphTaskManager<I, V, E> graphTaskManager;
-  /** Worker aggregator usage */
-  private WorkerAggregatorUsage workerAggregatorUsage;
   /** Worker context */
   private WorkerContext workerContext;
 
@@ -76,6 +74,7 @@ public abstract class AbstractComputation<I extends WritableComparable,
    *                 superstep.  Each message is only guaranteed to have
    *                 a life expectancy as long as next() is not called.
    */
+  @Override
   public abstract void compute(Vertex<I, V, E> vertex,
       Iterable<M1> messages) throws IOException;
 
@@ -103,7 +102,7 @@ public abstract class AbstractComputation<I extends WritableComparable,
    * @param graphState Graph state
    * @param workerClientRequestProcessor Processor for handling requests
    * @param graphTaskManager Graph-wide BSP Mapper for this Vertex
-   * @param workerAggregatorUsage Worker aggregator usage
+   * @param workerGlobalCommUsage Worker global communication usage
    * @param workerContext Worker context
    */
   @Override
@@ -111,12 +110,12 @@ public abstract class AbstractComputation<I extends WritableComparable,
       GraphState graphState,
       WorkerClientRequestProcessor<I, V, E> workerClientRequestProcessor,
       GraphTaskManager<I, V, E> graphTaskManager,
-      WorkerAggregatorUsage workerAggregatorUsage,
+      WorkerGlobalCommUsage workerGlobalCommUsage,
       WorkerContext workerContext) {
     this.graphState = graphState;
     this.workerClientRequestProcessor = workerClientRequestProcessor;
     this.graphTaskManager = graphTaskManager;
-    this.workerAggregatorUsage = workerAggregatorUsage;
+    this.setWorkerGlobalCommUsage(workerGlobalCommUsage);
     this.workerContext = workerContext;
   }
 
@@ -274,14 +273,4 @@ public abstract class AbstractComputation<I extends WritableComparable,
   public <W extends WorkerContext> W getWorkerContext() {
     return (W) workerContext;
   }
-
-  @Override
-  public <A extends Writable> void aggregate(String name, A value) {
-    workerAggregatorUsage.aggregate(name, value);
-  }
-
-  @Override
-  public <A extends Writable> A getAggregatedValue(String name) {
-    return workerAggregatorUsage.<A>getAggregatedValue(name);
-  }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/graph/Computation.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/Computation.java b/giraph-core/src/main/java/org/apache/giraph/graph/Computation.java
index 7a7b40f..d310da9 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/Computation.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/Computation.java
@@ -17,6 +17,9 @@
  */
 package org.apache.giraph.graph;
 
+import java.io.IOException;
+import java.util.Iterator;
+
 import org.apache.giraph.comm.WorkerClientRequestProcessor;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfigurable;
 import org.apache.giraph.conf.TypesHolder;
@@ -24,13 +27,11 @@ import org.apache.giraph.edge.Edge;
 import org.apache.giraph.edge.OutEdges;
 import org.apache.giraph.worker.WorkerAggregatorUsage;
 import org.apache.giraph.worker.WorkerContext;
+import org.apache.giraph.worker.WorkerGlobalCommUsage;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.Mapper;
 
-import java.io.IOException;
-import java.util.Iterator;
-
 /**
  * Interface for an application for computation.
  *
@@ -55,7 +56,7 @@ public interface Computation<I extends WritableComparable,
     M2 extends Writable>
     extends TypesHolder<I, V, E, M1, M2>,
     ImmutableClassesGiraphConfigurable<I, V, E>,
-    WorkerAggregatorUsage {
+    WorkerGlobalCommUsage, WorkerAggregatorUsage {
   /**
    * Must be defined by user to do computation on a single Vertex.
    *
@@ -87,13 +88,13 @@ public interface Computation<I extends WritableComparable,
    * @param graphState Graph state
    * @param workerClientRequestProcessor Processor for handling requests
    * @param graphTaskManager Graph-wide BSP Mapper for this Vertex
-   * @param workerAggregatorUsage Worker aggregator usage
+   * @param workerGlobalCommUsage Worker global communication usage
    * @param workerContext Worker context
    */
   void initialize(GraphState graphState,
       WorkerClientRequestProcessor<I, V, E> workerClientRequestProcessor,
       GraphTaskManager<I, V, E> graphTaskManager,
-      WorkerAggregatorUsage workerAggregatorUsage, WorkerContext workerContext);
+      WorkerGlobalCommUsage workerGlobalCommUsage, WorkerContext workerContext);
 
   /**
    * Retrieves the current superstep.

http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java b/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
index d9c4302..33f2255 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
@@ -36,7 +36,7 @@ import org.apache.giraph.utils.TimedLogger;
 import org.apache.giraph.utils.Trimmable;
 import org.apache.giraph.worker.WorkerContext;
 import org.apache.giraph.worker.WorkerProgress;
-import org.apache.giraph.worker.WorkerThreadAggregatorUsage;
+import org.apache.giraph.worker.WorkerThreadGlobalCommUsage;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.Mapper;
@@ -135,7 +135,7 @@ public class ComputeCallable<I extends WritableComparable, V extends Writable,
     WorkerClientRequestProcessor<I, V, E> workerClientRequestProcessor =
         new NettyWorkerClientRequestProcessor<I, V, E>(
             context, configuration, serviceWorker);
-    WorkerThreadAggregatorUsage aggregatorUsage =
+    WorkerThreadGlobalCommUsage aggregatorUsage =
         serviceWorker.getAggregatorHandler().newThreadAggregatorUsage();
     WorkerContext workerContext = serviceWorker.getWorkerContext();
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
index ba5d2fa..eb9fad3 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
@@ -18,6 +18,19 @@
 
 package org.apache.giraph.graph;
 
+import java.io.IOException;
+import java.net.URL;
+import java.net.URLDecoder;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Enumeration;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
 import org.apache.giraph.bsp.BspService;
 import org.apache.giraph.bsp.CentralizedServiceMaster;
 import org.apache.giraph.bsp.CentralizedServiceWorker;
@@ -26,9 +39,7 @@ import org.apache.giraph.comm.messages.MessageStore;
 import org.apache.giraph.conf.GiraphConstants;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.job.JobProgressTracker;
-import org.apache.giraph.scripting.ScriptLoader;
 import org.apache.giraph.master.BspServiceMaster;
-import org.apache.giraph.master.MasterAggregatorUsage;
 import org.apache.giraph.master.MasterThread;
 import org.apache.giraph.metrics.GiraphMetrics;
 import org.apache.giraph.metrics.GiraphMetricsRegistry;
@@ -40,6 +51,7 @@ import org.apache.giraph.partition.Partition;
 import org.apache.giraph.partition.PartitionOwner;
 import org.apache.giraph.partition.PartitionStats;
 import org.apache.giraph.partition.PartitionStore;
+import org.apache.giraph.scripting.ScriptLoader;
 import org.apache.giraph.utils.CallableFactory;
 import org.apache.giraph.utils.MemoryUtils;
 import org.apache.giraph.utils.ProgressableUtils;
@@ -60,19 +72,6 @@ import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.log4j.PatternLayout;
 
-import java.io.IOException;
-import java.net.URL;
-import java.net.URLDecoder;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Enumeration;
-import java.util.List;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-
 /**
  * The Giraph-specific business logic for a single BSP
  * compute node in whatever underlying type of cluster
@@ -149,7 +148,7 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable,
   /** Timer for WorkerContext#preSuperstep() */
   private GiraphTimer wcPreSuperstepTimer;
   /** The Hadoop Mapper#Context for this job */
-  private Mapper<?, ?, ?, ?>.Context context;
+  private final Mapper<?, ?, ?, ?>.Context context;
   /** is this GraphTaskManager the master? */
   private boolean isMaster;
 
@@ -497,15 +496,6 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable,
     return graphFunctions;
   }
 
-  /**
-   * Get master aggregator usage, a subset of the functionality
-   *
-   * @return Master aggregator usage interface
-   */
-  public final MasterAggregatorUsage getMasterAggregatorUsage() {
-    return serviceMaster.getAggregatorHandler();
-  }
-
   public final WorkerContext getWorkerContext() {
     return serviceWorker.getWorkerContext();
   }

http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/io/EdgeReader.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/EdgeReader.java b/giraph-core/src/main/java/org/apache/giraph/io/EdgeReader.java
index 1bc48e3..83a0369 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/EdgeReader.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/EdgeReader.java
@@ -19,9 +19,9 @@
 package org.apache.giraph.io;
 
 import java.io.IOException;
-import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
+
 import org.apache.giraph.edge.Edge;
-import org.apache.giraph.worker.WorkerAggregatorUsage;
+import org.apache.giraph.worker.WorkerAggregatorDelegator;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.InputSplit;
@@ -36,11 +36,8 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
  */
 @SuppressWarnings("rawtypes")
 public abstract class EdgeReader<I extends WritableComparable,
-    E extends Writable> extends DefaultImmutableClassesGiraphConfigurable<
-        I, Writable, E> implements WorkerAggregatorUsage {
-
-  /** Aggregator usage for edge reader */
-  private WorkerAggregatorUsage workerAggregatorUsage;
+    E extends Writable> extends WorkerAggregatorDelegator<
+        I, Writable, E> {
 
   /**
    * Use the input split and context to setup reading the edges.
@@ -56,21 +53,6 @@ public abstract class EdgeReader<I extends WritableComparable,
     throws IOException, InterruptedException;
 
   /**
-   * Set aggregator usage. It provides the functionality
-   * of aggregation operation in reading an edge.
-   * It is invoked just after initialization.
-   * E.g.,
-   * edgeReader.initialize(inputSplit, context);
-   * edgeReader.setAggregator(aggregatorUsage);
-   * This method is only for use by the infrastructure.
-   *
-   * @param agg aggregator usage for edge reader
-   */
-  public void setWorkerAggregatorUse(WorkerAggregatorUsage agg) {
-    workerAggregatorUsage = agg;
-  }
-
-  /**
    * Read the next edge.
    *
    * @return false iff there are no more edges
@@ -117,14 +99,4 @@ public abstract class EdgeReader<I extends WritableComparable,
    * @throws InterruptedException
    */
   public abstract float getProgress() throws IOException, InterruptedException;
-
-  @Override
-  public <A extends Writable> void aggregate(String name, A value) {
-    workerAggregatorUsage.aggregate(name, value);
-  }
-
-  @Override
-  public <A extends Writable> A getAggregatedValue(String name) {
-    return workerAggregatorUsage.<A>getAggregatedValue(name);
-  }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/io/MappingReader.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/MappingReader.java b/giraph-core/src/main/java/org/apache/giraph/io/MappingReader.java
index b7ce97c..7c71585 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/MappingReader.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/MappingReader.java
@@ -18,16 +18,15 @@
 
 package org.apache.giraph.io;
 
-import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
+import java.io.IOException;
+
 import org.apache.giraph.mapping.MappingEntry;
-import org.apache.giraph.worker.WorkerAggregatorUsage;
+import org.apache.giraph.worker.WorkerAggregatorDelegator;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 
-import java.io.IOException;
-
 /**
  * Will read the mapping from an input split.
  *
@@ -38,12 +37,7 @@ import java.io.IOException;
  */
 public abstract class MappingReader<I extends WritableComparable,
     V extends Writable, E extends Writable, B extends Writable>
-    extends DefaultImmutableClassesGiraphConfigurable<I, V, E>
-    implements WorkerAggregatorUsage {
-
-  /** Aggregator usage for vertex reader */
-  private WorkerAggregatorUsage workerAggregatorUsage;
-
+    extends WorkerAggregatorDelegator<I, V, E> {
   /**
    * Use the input split and context to setup reading the vertices.
    * Guaranteed to be called prior to any other function.
@@ -57,22 +51,6 @@ public abstract class MappingReader<I extends WritableComparable,
     TaskAttemptContext context)
     throws IOException, InterruptedException;
 
-
-  /**
-   * Set aggregator usage. It provides the functionality
-   * of aggregation operation in reading a vertex.
-   * It is invoked just after initialization.
-   * E.g.,
-   * vertexReader.initialize(inputSplit, context);
-   * vertexReader.setAggregator(aggregatorUsage);
-   * This method is only for use by the infrastructure.
-   *
-   * @param agg aggregator usage for vertex reader
-   */
-  public void setWorkerAggregatorUse(WorkerAggregatorUsage agg) {
-    workerAggregatorUsage = agg;
-  }
-
   /**
    *
    * @return false iff there are no more vertices
@@ -111,14 +89,4 @@ public abstract class MappingReader<I extends WritableComparable,
    * @throws InterruptedException
    */
   public abstract float getProgress() throws IOException, InterruptedException;
-
-  @Override
-  public <A extends Writable> void aggregate(String name, A value) {
-    workerAggregatorUsage.aggregate(name, value);
-  }
-
-  @Override
-  public <A extends Writable> A getAggregatedValue(String name) {
-    return workerAggregatorUsage.getAggregatedValue(name);
-  }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/io/VertexReader.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/VertexReader.java b/giraph-core/src/main/java/org/apache/giraph/io/VertexReader.java
index 94a4083..64ec800 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/VertexReader.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/VertexReader.java
@@ -18,16 +18,15 @@
 
 package org.apache.giraph.io;
 
-import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
+import java.io.IOException;
+
 import org.apache.giraph.graph.Vertex;
-import org.apache.giraph.worker.WorkerAggregatorUsage;
+import org.apache.giraph.worker.WorkerAggregatorDelegator;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 
-import java.io.IOException;
-
 /**
  * Analogous to Hadoop's RecordReader for vertices.  Will read the
  * vertices from an input split.
@@ -39,11 +38,7 @@ import java.io.IOException;
 @SuppressWarnings("rawtypes")
 public abstract class VertexReader<I extends WritableComparable,
     V extends Writable, E extends Writable> extends
-    DefaultImmutableClassesGiraphConfigurable<I, V, E>
-    implements WorkerAggregatorUsage {
-  /** Aggregator usage for vertex reader */
-  private WorkerAggregatorUsage workerAggregatorUsage;
-
+    WorkerAggregatorDelegator<I, V, E> {
   /**
    * Use the input split and context to setup reading the vertices.
    * Guaranteed to be called prior to any other function.
@@ -58,21 +53,6 @@ public abstract class VertexReader<I extends WritableComparable,
     throws IOException, InterruptedException;
 
   /**
-   * Set aggregator usage. It provides the functionality
-   * of aggregation operation in reading a vertex.
-   * It is invoked just after initialization.
-   * E.g.,
-   * vertexReader.initialize(inputSplit, context);
-   * vertexReader.setAggregator(aggregatorUsage);
-   * This method is only for use by the infrastructure.
-   *
-   * @param agg aggregator usage for vertex reader
-   */
-  public void setWorkerAggregatorUse(WorkerAggregatorUsage agg) {
-    workerAggregatorUsage = agg;
-  }
-
-  /**
    *
    * @return false iff there are no more vertices
    * @throws IOException
@@ -108,14 +88,4 @@ public abstract class VertexReader<I extends WritableComparable,
    * @throws InterruptedException
    */
   public abstract float getProgress() throws IOException, InterruptedException;
-
-  @Override
-  public <A extends Writable> void aggregate(String name, A value) {
-    workerAggregatorUsage.aggregate(name, value);
-  }
-
-  @Override
-  public <A extends Writable> A getAggregatedValue(String name) {
-    return workerAggregatorUsage.<A>getAggregatedValue(name);
-  }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedEdgeReader.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedEdgeReader.java b/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedEdgeReader.java
index 9b5e8c6..05dd5bc 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedEdgeReader.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedEdgeReader.java
@@ -18,18 +18,18 @@
 
 package org.apache.giraph.io.internal;
 
+import java.io.IOException;
+
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.edge.Edge;
 import org.apache.giraph.io.EdgeReader;
 import org.apache.giraph.job.HadoopUtils;
-import org.apache.giraph.worker.WorkerAggregatorUsage;
+import org.apache.giraph.worker.WorkerGlobalCommUsage;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 
-import java.io.IOException;
-
 /**
  * For internal use only.
  *
@@ -72,9 +72,10 @@ public class WrappedEdgeReader<I extends WritableComparable,
   }
 
   @Override
-  public void setWorkerAggregatorUse(WorkerAggregatorUsage agg) {
-    // Set aggregator usage for edge reader
-    baseEdgeReader.setWorkerAggregatorUse(agg);
+  public void setWorkerGlobalCommUsage(WorkerGlobalCommUsage usage) {
+    super.setWorkerGlobalCommUsage(usage);
+    // Set global communication usage for edge reader
+    baseEdgeReader.setWorkerGlobalCommUsage(usage);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedMappingReader.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedMappingReader.java b/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedMappingReader.java
index 7d1c4c9..659776b 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedMappingReader.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedMappingReader.java
@@ -18,18 +18,18 @@
 
 package org.apache.giraph.io.internal;
 
+import java.io.IOException;
+
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.io.MappingReader;
 import org.apache.giraph.job.HadoopUtils;
 import org.apache.giraph.mapping.MappingEntry;
-import org.apache.giraph.worker.WorkerAggregatorUsage;
+import org.apache.giraph.worker.WorkerGlobalCommUsage;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 
-import java.io.IOException;
-
 /**
  * For internal use only.
  *
@@ -74,11 +74,11 @@ public class WrappedMappingReader<I extends WritableComparable,
         HadoopUtils.makeTaskAttemptContext(getConf(), context));
   }
 
-
   @Override
-  public void setWorkerAggregatorUse(WorkerAggregatorUsage agg) {
-    // Set aggregator usage for vertex reader
-    baseMappingReader.setWorkerAggregatorUse(agg);
+  public void setWorkerGlobalCommUsage(WorkerGlobalCommUsage usage) {
+    super.setWorkerGlobalCommUsage(usage);
+    // Set global communication usage for edge reader
+    baseMappingReader.setWorkerGlobalCommUsage(usage);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedVertexReader.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedVertexReader.java b/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedVertexReader.java
index 8e25602..8c23cba 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedVertexReader.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedVertexReader.java
@@ -18,18 +18,18 @@
 
 package org.apache.giraph.io.internal;
 
+import java.io.IOException;
+
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.graph.Vertex;
 import org.apache.giraph.io.VertexReader;
 import org.apache.giraph.job.HadoopUtils;
-import org.apache.giraph.worker.WorkerAggregatorUsage;
+import org.apache.giraph.worker.WorkerGlobalCommUsage;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 
-import java.io.IOException;
-
 /**
  * For internal use only.
  *
@@ -73,9 +73,10 @@ public class WrappedVertexReader<I extends WritableComparable,
   }
 
   @Override
-  public void setWorkerAggregatorUse(WorkerAggregatorUsage agg) {
+  public void setWorkerGlobalCommUsage(WorkerGlobalCommUsage usage) {
+    super.setWorkerGlobalCommUsage(usage);
     // Set aggregator usage for vertex reader
-    baseVertexReader.setWorkerAggregatorUse(agg);
+    baseVertexReader.setWorkerGlobalCommUsage(usage);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/master/AggregatorReduceOperation.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/AggregatorReduceOperation.java b/giraph-core/src/main/java/org/apache/giraph/master/AggregatorReduceOperation.java
new file mode 100644
index 0000000..1673f6d
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/master/AggregatorReduceOperation.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.master;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.giraph.aggregators.Aggregator;
+import org.apache.giraph.reducers.OnSameReduceOperation;
+import org.apache.giraph.utils.WritableFactory;
+import org.apache.giraph.utils.WritableUtils;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Translates aggregation operation to reduce operations.
+ *
+ * @param <A> Aggregation object type
+ */
+public class AggregatorReduceOperation<A extends Writable>
+    extends OnSameReduceOperation<A> {
+  /** Aggregator factory */
+  private WritableFactory<? extends Aggregator<A>> aggregatorFactory;
+  /** Aggregator */
+  private Aggregator<A> aggregator;
+
+  /** Constructor */
+  public AggregatorReduceOperation() {
+  }
+
+  /**
+   * Constructor
+   * @param aggregatorFactory Aggregator factory
+   */
+  public AggregatorReduceOperation(
+      WritableFactory<? extends Aggregator<A>> aggregatorFactory) {
+    this.aggregatorFactory = aggregatorFactory;
+    this.aggregator = aggregatorFactory.create();
+    this.aggregator.setAggregatedValue(null);
+  }
+
+  @Override
+  public A createInitialValue() {
+    return aggregator.createInitialValue();
+  }
+
+  /**
+   * Creates copy of this object
+   * @return copy
+   */
+  public AggregatorReduceOperation<A> createCopy() {
+    return new AggregatorReduceOperation<>(aggregatorFactory);
+  }
+
+  @Override
+  public synchronized void reduceSingle(A curValue, A valueToReduce) {
+    aggregator.setAggregatedValue(curValue);
+    aggregator.aggregate(valueToReduce);
+    if (curValue != aggregator.getAggregatedValue()) {
+      throw new IllegalStateException(
+          "Aggregator " + aggregator + " aggregates by creating new value");
+    }
+    aggregator.setAggregatedValue(null);
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    WritableUtils.writeWritableObject(aggregatorFactory, out);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    aggregatorFactory = WritableUtils.readWritableObject(in, null);
+    aggregator = aggregatorFactory.create();
+    this.aggregator.setAggregatedValue(null);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/master/AggregatorToGlobalCommTranslation.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/AggregatorToGlobalCommTranslation.java b/giraph-core/src/main/java/org/apache/giraph/master/AggregatorToGlobalCommTranslation.java
new file mode 100644
index 0000000..7492fc7
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/master/AggregatorToGlobalCommTranslation.java
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.master;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map.Entry;
+
+import org.apache.giraph.aggregators.Aggregator;
+import org.apache.giraph.aggregators.ClassAggregatorFactory;
+import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
+import org.apache.giraph.utils.WritableFactory;
+import org.apache.hadoop.io.Writable;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Class that translates aggregator handling on the master to
+ * reduce and broadcast operations supported by the MasterAggregatorHandler.
+ */
+public class AggregatorToGlobalCommTranslation
+    extends DefaultImmutableClassesGiraphConfigurable
+    implements MasterAggregatorUsage, Writable {
+  /** Class providing reduce and broadcast interface to use */
+  private final MasterGlobalCommUsage globalComm;
+  /** List of registered aggregators */
+  private final HashMap<String, AggregatorWrapper<Writable>>
+  registeredAggregators = new HashMap<>();
+
+  /**
+   * Constructor
+   * @param globalComm Global communication interface
+   */
+  public AggregatorToGlobalCommTranslation(MasterGlobalCommUsage globalComm) {
+    this.globalComm = globalComm;
+  }
+
+  @Override
+  public <A extends Writable> A getAggregatedValue(String name) {
+    return globalComm.getReduced(name);
+  }
+
+  @Override
+  public <A extends Writable> void setAggregatedValue(String name, A value) {
+    AggregatorWrapper<Writable> aggregator = registeredAggregators.get(name);
+    aggregator.setCurrentValue(value);
+  }
+
+  /**
+   * Called after master compute, to do aggregator->reduce/broadcast
+   * translation
+   */
+  public void postMasterCompute() {
+    // broadcast what master set, or if it didn't broadcast reduced value
+    // register reduce with the same value
+    for (Entry<String, AggregatorWrapper<Writable>> entry :
+        registeredAggregators.entrySet()) {
+      Writable value = entry.getValue().currentValue != null ?
+          entry.getValue().getCurrentValue() :
+            globalComm.getReduced(entry.getKey());
+      if (value == null) {
+        value = entry.getValue().getReduceOp().createInitialValue();
+      }
+
+      globalComm.broadcast(entry.getKey(), value);
+      // Always register clean instance of reduceOp, not to conflict with
+      // reduceOp from previous superstep.
+      AggregatorReduceOperation<Writable> cleanReduceOp =
+          entry.getValue().createReduceOp();
+      if (entry.getValue().isPersistent()) {
+        globalComm.registerReduce(
+            entry.getKey(), cleanReduceOp, value);
+      } else {
+        globalComm.registerReduce(
+            entry.getKey(), cleanReduceOp);
+      }
+      entry.getValue().setCurrentValue(null);
+    }
+  }
+
+  @Override
+  public <A extends Writable> boolean registerAggregator(String name,
+      Class<? extends Aggregator<A>> aggregatorClass) throws
+      InstantiationException, IllegalAccessException {
+    ClassAggregatorFactory<A> aggregatorFactory =
+        new ClassAggregatorFactory<A>(aggregatorClass);
+    return registerAggregator(name, aggregatorFactory, false) != null;
+  }
+
+  @Override
+  public <A extends Writable> boolean registerAggregator(String name,
+      WritableFactory<? extends Aggregator<A>> aggregator) throws
+      InstantiationException, IllegalAccessException {
+    return registerAggregator(name, aggregator, false) != null;
+  }
+
+  @Override
+  public <A extends Writable> boolean registerPersistentAggregator(String name,
+      Class<? extends Aggregator<A>> aggregatorClass) throws
+      InstantiationException, IllegalAccessException {
+    ClassAggregatorFactory<A> aggregatorFactory =
+        new ClassAggregatorFactory<A>(aggregatorClass);
+    return registerAggregator(name, aggregatorFactory, true) != null;
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeInt(registeredAggregators.size());
+    for (Entry<String, AggregatorWrapper<Writable>> entry :
+        registeredAggregators.entrySet()) {
+      out.writeUTF(entry.getKey());
+      entry.getValue().write(out);
+    }
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    registeredAggregators.clear();
+    int numAggregators = in.readInt();
+    for (int i = 0; i < numAggregators; i++) {
+      String name = in.readUTF();
+      AggregatorWrapper<Writable> agg = new AggregatorWrapper<>();
+      agg.readFields(in);
+      registeredAggregators.put(name, agg);
+    }
+  }
+
+  /**
+   * Helper function for registering aggregators.
+   *
+   * @param name              Name of the aggregator
+   * @param aggregatorFactory Aggregator factory
+   * @param persistent        Whether aggregator is persistent or not
+   * @param <A>               Aggregated value type
+   * @return Newly registered aggregator or aggregator which was previously
+   *         created with selected name, if any
+   */
+  private <A extends Writable> AggregatorWrapper<A> registerAggregator
+  (String name, WritableFactory<? extends Aggregator<A>> aggregatorFactory,
+      boolean persistent) throws InstantiationException,
+      IllegalAccessException {
+    AggregatorWrapper<A> aggregatorWrapper =
+        (AggregatorWrapper<A>) registeredAggregators.get(name);
+    if (aggregatorWrapper == null) {
+      aggregatorWrapper =
+          new AggregatorWrapper<A>(aggregatorFactory, persistent);
+      registeredAggregators.put(
+          name, (AggregatorWrapper<Writable>) aggregatorWrapper);
+    }
+    return aggregatorWrapper;
+  }
+
+  /**
+   * Object holding all needed data related to single Aggregator
+   * @param <A> Aggregated value type
+   */
+  private static class AggregatorWrapper<A extends Writable>
+      implements Writable {
+    /** False iff aggregator should be reset at the end of each super step */
+    private boolean persistent;
+    /** Translation of aggregator to reduce operations */
+    private AggregatorReduceOperation<A> reduceOp;
+    /** Current value, set by master manually */
+    private A currentValue;
+
+    /** Constructor */
+    public AggregatorWrapper() {
+    }
+
+    /**
+     * Constructor
+     * @param aggregatorFactory Aggregator factory
+     * @param persistent Is persistent
+     */
+    public AggregatorWrapper(
+        WritableFactory<? extends Aggregator<A>> aggregatorFactory,
+        boolean persistent) {
+      this.persistent = persistent;
+      this.reduceOp = new AggregatorReduceOperation<>(aggregatorFactory);
+    }
+
+    public AggregatorReduceOperation<A> getReduceOp() {
+      return reduceOp;
+    }
+
+    /**
+     * Create a fresh instance of AggregatorReduceOperation
+     * @return fresh instance of AggregatorReduceOperation
+     */
+    public AggregatorReduceOperation<A> createReduceOp() {
+      return reduceOp.createCopy();
+    }
+
+    public A getCurrentValue() {
+      return currentValue;
+    }
+
+    public void setCurrentValue(A currentValue) {
+      this.currentValue = currentValue;
+    }
+
+    public boolean isPersistent() {
+      return persistent;
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+      out.writeBoolean(persistent);
+      reduceOp.write(out);
+
+      Preconditions.checkState(currentValue == null, "AggregatorWrapper " +
+          "shouldn't have value at the end of the superstep");
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+      persistent = in.readBoolean();
+      reduceOp = new AggregatorReduceOperation<>();
+      reduceOp.readFields(in);
+      currentValue = null;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
index efa5b87..ab1289d 100644
--- a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
+++ b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
@@ -18,11 +18,39 @@
 
 package org.apache.giraph.master;
 
+import static org.apache.giraph.conf.GiraphConstants.INPUT_SPLIT_SAMPLE_PERCENT;
+import static org.apache.giraph.conf.GiraphConstants.KEEP_ZOOKEEPER_DATA;
+import static org.apache.giraph.conf.GiraphConstants.PARTITION_LONG_TAIL_MIN_PRINT;
+import static org.apache.giraph.conf.GiraphConstants.USE_INPUT_SPLIT_LOCALITY;
 import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
 import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import net.iharder.Base64;
+
 import org.apache.commons.io.FilenameUtils;
 import org.apache.giraph.bsp.ApplicationState;
 import org.apache.giraph.bsp.BspInputFormat;
+import org.apache.giraph.bsp.BspService;
 import org.apache.giraph.bsp.CentralizedServiceMaster;
 import org.apache.giraph.bsp.CheckpointStatus;
 import org.apache.giraph.bsp.SuperstepState;
@@ -33,23 +61,17 @@ import org.apache.giraph.comm.netty.NettyMasterServer;
 import org.apache.giraph.conf.GiraphConstants;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.counters.GiraphStats;
-import org.apache.giraph.graph.InputSplitPaths;
-import org.apache.giraph.graph.GlobalStats;
 import org.apache.giraph.graph.AddressesAndPartitionsWritable;
+import org.apache.giraph.graph.GlobalStats;
 import org.apache.giraph.graph.GraphFunctions;
-import org.apache.giraph.graph.InputSplitEvents;
-import org.apache.giraph.bsp.BspService;
 import org.apache.giraph.graph.GraphState;
+import org.apache.giraph.graph.GraphTaskManager;
+import org.apache.giraph.graph.InputSplitEvents;
+import org.apache.giraph.graph.InputSplitPaths;
 import org.apache.giraph.io.EdgeInputFormat;
 import org.apache.giraph.io.GiraphInputFormat;
-import org.apache.giraph.graph.GraphTaskManager;
 import org.apache.giraph.io.MappingInputFormat;
 import org.apache.giraph.io.VertexInputFormat;
-import org.apache.giraph.partition.BasicPartitionOwner;
-import org.apache.giraph.partition.MasterGraphPartitioner;
-import org.apache.giraph.partition.PartitionOwner;
-import org.apache.giraph.partition.PartitionStats;
-import org.apache.giraph.partition.PartitionUtils;
 import org.apache.giraph.metrics.AggregatedMetrics;
 import org.apache.giraph.metrics.GiraphMetrics;
 import org.apache.giraph.metrics.GiraphTimer;
@@ -57,13 +79,18 @@ import org.apache.giraph.metrics.GiraphTimerContext;
 import org.apache.giraph.metrics.ResetSuperstepMetricsObserver;
 import org.apache.giraph.metrics.SuperstepMetricsRegistry;
 import org.apache.giraph.metrics.WorkerSuperstepMetrics;
-import org.apache.giraph.utils.CheckpointingUtils;
-import org.apache.giraph.utils.JMapHistoDumper;
-import org.apache.giraph.utils.ReactiveJMapHistoDumper;
-import org.apache.giraph.utils.ProgressableUtils;
+import org.apache.giraph.partition.BasicPartitionOwner;
+import org.apache.giraph.partition.MasterGraphPartitioner;
+import org.apache.giraph.partition.PartitionOwner;
+import org.apache.giraph.partition.PartitionStats;
+import org.apache.giraph.partition.PartitionUtils;
 import org.apache.giraph.time.SystemTime;
 import org.apache.giraph.time.Time;
+import org.apache.giraph.utils.CheckpointingUtils;
+import org.apache.giraph.utils.JMapHistoDumper;
 import org.apache.giraph.utils.LogStacktraceCallable;
+import org.apache.giraph.utils.ProgressableUtils;
+import org.apache.giraph.utils.ReactiveJMapHistoDumper;
 import org.apache.giraph.utils.WritableUtils;
 import org.apache.giraph.worker.WorkerInfo;
 import org.apache.giraph.zk.BspEvent;
@@ -89,32 +116,6 @@ import org.json.JSONObject;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
-import net.iharder.Base64;
-
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutput;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.PrintStream;
-import java.nio.charset.Charset;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.TreeSet;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-
-import static org.apache.giraph.conf.GiraphConstants.INPUT_SPLIT_SAMPLE_PERCENT;
-import static org.apache.giraph.conf.GiraphConstants.KEEP_ZOOKEEPER_DATA;
-import static org.apache.giraph.conf.GiraphConstants.PARTITION_LONG_TAIL_MIN_PRINT;
-import static org.apache.giraph.conf.GiraphConstants.USE_INPUT_SPLIT_LOCALITY;
 
 /**
  * ZooKeeper-based implementation of {@link CentralizedServiceMaster}.
@@ -167,8 +168,10 @@ public class BspServiceMaster<I extends WritableComparable,
   /** All the partition stats from the last superstep */
   private final List<PartitionStats> allPartitionStatsList =
       new ArrayList<PartitionStats>();
-  /** Handler for aggregators */
-  private MasterAggregatorHandler aggregatorHandler;
+  /** Handler for global communication */
+  private MasterAggregatorHandler globalCommHandler;
+  /** Handler for aggregators to reduce/broadcast translation */
+  private AggregatorToGlobalCommTranslation aggregatorTranslation;
   /** Master class */
   private MasterCompute masterCompute;
   /** IPC Client */
@@ -232,7 +235,7 @@ public class BspServiceMaster<I extends WritableComparable,
     this.checkpointStatus = CheckpointStatus.NONE;
 
     GiraphMetrics.get().addSuperstepResetObserver(this);
-    GiraphStats.init((Mapper.Context) context);
+    GiraphStats.init(context);
   }
 
   @Override
@@ -738,8 +741,13 @@ public class BspServiceMaster<I extends WritableComparable,
   }
 
   @Override
-  public MasterAggregatorHandler getAggregatorHandler() {
-    return aggregatorHandler;
+  public MasterAggregatorHandler getGlobalCommHandler() {
+    return globalCommHandler;
+  }
+
+  @Override
+  public AggregatorToGlobalCommTranslation getAggregatorTranslationHandler() {
+    return aggregatorTranslation;
   }
 
   @Override
@@ -811,7 +819,8 @@ public class BspServiceMaster<I extends WritableComparable,
     });
 
 
-    aggregatorHandler.readFields(finalizedStream);
+    globalCommHandler.readFields(finalizedStream);
+    aggregatorTranslation.readFields(finalizedStream);
     masterCompute.readFields(finalizedStream);
     finalizedStream.close();
 
@@ -883,9 +892,12 @@ public class BspServiceMaster<I extends WritableComparable,
         if (masterChildArr.get(0).equals(myBid)) {
           GiraphStats.getInstance().getCurrentMasterTaskPartition().
               setValue(getTaskPartition());
-          aggregatorHandler = new MasterAggregatorHandler(getConfiguration(),
-              getContext());
-          aggregatorHandler.initialize(this);
+          globalCommHandler = new MasterAggregatorHandler(
+              getConfiguration(), getContext());
+          aggregatorTranslation = new AggregatorToGlobalCommTranslation(
+              globalCommHandler);
+
+          globalCommHandler.initialize(this);
           masterCompute = getConfiguration().createMasterCompute();
           masterCompute.setMasterService(this);
 
@@ -1097,7 +1109,8 @@ public class BspServiceMaster<I extends WritableComparable,
     for (WorkerInfo chosenWorkerInfo : chosenWorkerInfoList) {
       finalizedOutputStream.writeInt(chosenWorkerInfo.getTaskId());
     }
-    aggregatorHandler.write(finalizedOutputStream);
+    globalCommHandler.write(finalizedOutputStream);
+    aggregatorTranslation.write(finalizedOutputStream);
     masterCompute.write(finalizedOutputStream);
     finalizedOutputStream.close();
     lastCheckpointedSuperstep = superstep;
@@ -1502,7 +1515,8 @@ public class BspServiceMaster<I extends WritableComparable,
    */
   private void initializeAggregatorInputSuperstep()
     throws InterruptedException {
-    aggregatorHandler.prepareSuperstep(masterClient);
+    globalCommHandler.prepareSuperstep();
+
     prepareMasterCompute(getSuperstep());
     try {
       masterCompute.initialize();
@@ -1516,7 +1530,10 @@ public class BspServiceMaster<I extends WritableComparable,
       throw new RuntimeException(
         "initializeAggregatorInputSuperstep: Failed in access", e);
     }
-    aggregatorHandler.finishSuperstep(masterClient);
+    aggregatorTranslation.postMasterCompute();
+    globalCommHandler.finishSuperstep();
+
+    globalCommHandler.sendDataToOwners(masterClient);
   }
 
   /**
@@ -1579,18 +1596,18 @@ public class BspServiceMaster<I extends WritableComparable,
       }
     }
 
+    // We need to finalize aggregators from previous superstep
+    if (getSuperstep() >= 0) {
+      aggregatorTranslation.postMasterCompute();
+      globalCommHandler.finishSuperstep();
+    }
+
     masterClient.openConnections();
 
     GiraphStats.getInstance().
         getCurrentWorkers().setValue(chosenWorkerInfoList.size());
     assignPartitionOwners();
 
-    // We need to finalize aggregators from previous superstep (send them to
-    // worker owners) after new worker assignments
-    if (getSuperstep() >= 0) {
-      aggregatorHandler.finishSuperstep(masterClient);
-    }
-
     // Finalize the valid checkpoint file prefixes and possibly
     // the aggregators.
     if (checkpointStatus != CheckpointStatus.NONE) {
@@ -1616,6 +1633,11 @@ public class BspServiceMaster<I extends WritableComparable,
       }
     }
 
+    // We need to send aggregators to worker owners after new worker assignments
+    if (getSuperstep() >= 0) {
+      globalCommHandler.sendDataToOwners(masterClient);
+    }
+
     if (getSuperstep() == INPUT_SUPERSTEP) {
       // Initialize aggregators before coordinating
       initializeAggregatorInputSuperstep();
@@ -1645,7 +1667,7 @@ public class BspServiceMaster<I extends WritableComparable,
 
     // Collect aggregator values, then run the master.compute() and
     // finally save the aggregator values
-    aggregatorHandler.prepareSuperstep(masterClient);
+    globalCommHandler.prepareSuperstep();
     SuperstepClasses superstepClasses =
       prepareMasterCompute(getSuperstep() + 1);
     doMasterCompute();
@@ -1710,7 +1732,7 @@ public class BspServiceMaster<I extends WritableComparable,
     } else {
       superstepState = SuperstepState.THIS_SUPERSTEP_DONE;
     }
-    aggregatorHandler.writeAggregators(getSuperstep(), superstepState);
+    globalCommHandler.writeAggregators(getSuperstep(), superstepState);
 
     return superstepState;
   }
@@ -1935,7 +1957,7 @@ public class BspServiceMaster<I extends WritableComparable,
         failJob(new Exception("Checkpoint and halt requested. " +
             "Killing this job."));
       }
-      aggregatorHandler.close();
+      globalCommHandler.close();
       masterClient.closeConnections();
       masterServer.close();
     }

http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/master/MasterAggregatorHandler.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/MasterAggregatorHandler.java b/giraph-core/src/main/java/org/apache/giraph/master/MasterAggregatorHandler.java
index 2b0cdd6..5f7bd73 100644
--- a/giraph-core/src/main/java/org/apache/giraph/master/MasterAggregatorHandler.java
+++ b/giraph-core/src/main/java/org/apache/giraph/master/MasterAggregatorHandler.java
@@ -15,263 +15,224 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.giraph.master;
 
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
-import java.util.AbstractMap;
 import java.util.Map;
+import java.util.Map.Entry;
 
-import org.apache.giraph.aggregators.Aggregator;
-import org.apache.giraph.aggregators.AggregatorWrapper;
 import org.apache.giraph.aggregators.AggregatorWriter;
-import org.apache.giraph.aggregators.ClassAggregatorFactory;
 import org.apache.giraph.bsp.BspService;
 import org.apache.giraph.bsp.SuperstepState;
+import org.apache.giraph.comm.GlobalCommType;
 import org.apache.giraph.comm.MasterClient;
-import org.apache.giraph.comm.aggregators.AggregatorUtils;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.utils.MasterLoggingAggregator;
-import org.apache.giraph.utils.WritableFactory;
+import org.apache.giraph.reducers.ReduceOperation;
+import org.apache.giraph.reducers.Reducer;
 import org.apache.giraph.utils.WritableUtils;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.util.Progressable;
 import org.apache.log4j.Logger;
 
-import com.google.common.base.Function;
-import com.google.common.collect.Iterables;
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Maps;
 
-/** Handler for aggregators on master */
-public class MasterAggregatorHandler implements MasterAggregatorUsage,
-    Writable {
+/** Handler for reduce/broadcast on the master */
+public class MasterAggregatorHandler
+    implements MasterGlobalCommUsage, Writable {
   /** Class logger */
   private static final Logger LOG =
       Logger.getLogger(MasterAggregatorHandler.class);
-  /**
-   * Map of aggregators.
-   * This map is used to store final aggregated values received from worker
-   * owners, and also to read and write values provided during master.compute.
-   */
-  private final Map<String, AggregatorWrapper<Writable>> aggregatorMap =
+
+  /** Map of reducers registered for the next worker computation */
+  private final Map<String, Reducer<Object, Writable>> reducerMap =
+      Maps.newHashMap();
+  /** Map of values to be sent to workers for next computation */
+  private final Map<String, Writable> broadcastMap =
       Maps.newHashMap();
-  /** Aggregator writer */
+  /** Values reduced from previous computation */
+  private final Map<String, Writable> reducedMap =
+      Maps.newHashMap();
+
+  /** Aggregator writer - for writing reduced values */
   private final AggregatorWriter aggregatorWriter;
   /** Progressable used to report progress */
   private final Progressable progressable;
-  /** Giraph configuration */
-  private final ImmutableClassesGiraphConfiguration<?, ?, ?> conf;
 
   /**
    * Constructor
    *
-   * @param conf         Giraph configuration
-   * @param progressable Progressable used for reporting progress
+   * @param conf Configuration
+   * @param progressable Progress reporter
    */
   public MasterAggregatorHandler(
       ImmutableClassesGiraphConfiguration<?, ?, ?> conf,
       Progressable progressable) {
-    this.conf = conf;
     this.progressable = progressable;
     aggregatorWriter = conf.createAggregatorWriter();
-    MasterLoggingAggregator.registerAggregator(this, conf);
   }
 
   @Override
-  public <A extends Writable> A getAggregatedValue(String name) {
-    AggregatorWrapper<? extends Writable> aggregator = aggregatorMap.get(name);
-    if (aggregator == null) {
-      LOG.warn("getAggregatedValue: " +
-          AggregatorUtils.getUnregisteredAggregatorMessage(name,
-              aggregatorMap.size() != 0, conf));
-      return null;
-    } else {
-      return (A) aggregator.getPreviousAggregatedValue();
-    }
+  public final <S, R extends Writable> void registerReduce(
+      String name, ReduceOperation<S, R> reduceOp) {
+    registerReduce(name, reduceOp, reduceOp.createInitialValue());
   }
 
   @Override
-  public <A extends Writable> void setAggregatedValue(String name, A value) {
-    AggregatorWrapper<? extends Writable> aggregator = aggregatorMap.get(name);
-    if (aggregator == null) {
-      throw new IllegalStateException(
-          "setAggregatedValue: " +
-              AggregatorUtils.getUnregisteredAggregatorMessage(name,
-                  aggregatorMap.size() != 0, conf));
+  public <S, R extends Writable> void registerReduce(
+      String name, ReduceOperation<S, R> reduceOp,
+      R globalInitialValue) {
+    if (reducerMap.containsKey(name)) {
+      throw new IllegalArgumentException(
+          "Reducer with name " + name + " was already registered");
+    }
+    if (reduceOp == null) {
+      throw new IllegalArgumentException("null reduce cannot be registered");
     }
-    ((AggregatorWrapper<A>) aggregator).setCurrentAggregatedValue(value);
-  }
 
-  @Override
-  public <A extends Writable> boolean registerAggregator(String name,
-      Class<? extends Aggregator<A>> aggregatorClass) throws
-      InstantiationException, IllegalAccessException {
-    checkAggregatorName(name);
-    ClassAggregatorFactory<A> aggregatorFactory =
-        new ClassAggregatorFactory<A>(aggregatorClass, conf);
-    return registerAggregator(name, aggregatorFactory, false) != null;
+    Reducer<S, R> reducer = new Reducer<>(reduceOp, globalInitialValue);
+    reducerMap.put(name, (Reducer<Object, Writable>) reducer);
   }
 
   @Override
-  public <A extends Writable> boolean registerAggregator(String name,
-      WritableFactory<? extends Aggregator<A>> aggregator) throws
-      InstantiationException, IllegalAccessException {
-    checkAggregatorName(name);
-    return registerAggregator(name, aggregator, false) != null;
+  public <T extends Writable> T getReduced(String name) {
+    return (T) reducedMap.get(name);
   }
 
   @Override
-  public <A extends Writable> boolean registerPersistentAggregator(String name,
-      Class<? extends Aggregator<A>> aggregatorClass) throws
-      InstantiationException, IllegalAccessException {
-    checkAggregatorName(name);
-    ClassAggregatorFactory<A> aggregatorFactory =
-        new ClassAggregatorFactory<A>(aggregatorClass, conf);
-    return registerAggregator(name, aggregatorFactory, true) != null;
-  }
-
-  /**
-   * Make sure user doesn't use AggregatorUtils.SPECIAL_COUNT_AGGREGATOR as
-   * the name of aggregator. Throw an exception if he tries to use it.
-   *
-   * @param name Name of the aggregator to check.
-   */
-  private void checkAggregatorName(String name) {
-    if (name.equals(AggregatorUtils.SPECIAL_COUNT_AGGREGATOR)) {
-      throw new IllegalStateException("checkAggregatorName: " +
-          AggregatorUtils.SPECIAL_COUNT_AGGREGATOR +
-          " is not allowed for the name of aggregator");
+  public void broadcast(String name, Writable object) {
+    if (broadcastMap.containsKey(name)) {
+      throw new IllegalArgumentException(
+          "Value already broadcasted for name " + name);
     }
-  }
-
-  /**
-   * Helper function for registering aggregators.
-   *
-   * @param name              Name of the aggregator
-   * @param aggregatorFactory Aggregator factory
-   * @param persistent        Whether aggregator is persistent or not
-   * @param <A>               Aggregated value type
-   * @return Newly registered aggregator or aggregator which was previously
-   *         created with selected name, if any
-   */
-  private <A extends Writable> AggregatorWrapper<A> registerAggregator
-  (String name, WritableFactory<? extends Aggregator<A>> aggregatorFactory,
-      boolean persistent) throws InstantiationException,
-      IllegalAccessException {
-    AggregatorWrapper<A> aggregatorWrapper =
-        (AggregatorWrapper<A>) aggregatorMap.get(name);
-    if (aggregatorWrapper == null) {
-      aggregatorWrapper =
-          new AggregatorWrapper<A>(aggregatorFactory, persistent, conf);
-      aggregatorMap.put(name, (AggregatorWrapper<Writable>) aggregatorWrapper);
+    if (object == null) {
+      throw new IllegalArgumentException("null cannot be broadcasted");
     }
-    return aggregatorWrapper;
+
+    broadcastMap.put(name, object);
   }
 
-  /**
-   * Prepare aggregators for current superstep
-   *
-   * @param masterClient IPC client on master
-   */
-  public void prepareSuperstep(MasterClient masterClient) {
+  /** Prepare reduced values for current superstep's master compute */
+  public void prepareSuperstep() {
     if (LOG.isDebugEnabled()) {
-      LOG.debug("prepareSuperstep: Start preparing aggregators");
+      LOG.debug("prepareSuperstep: Start preparing reducers");
     }
-    // prepare aggregators for master compute
-    for (AggregatorWrapper<Writable> aggregator : aggregatorMap.values()) {
-      if (aggregator.isPersistent()) {
-        aggregator.aggregateCurrent(aggregator.getPreviousAggregatedValue());
+
+    Preconditions.checkState(reducedMap.isEmpty(),
+        "reducedMap must be empty before start of the superstep");
+    Preconditions.checkState(broadcastMap.isEmpty(),
+        "broadcastMap must be empty before start of the superstep");
+
+    for (Entry<String, Reducer<Object, Writable>> entry :
+        reducerMap.entrySet()) {
+      Writable value = entry.getValue().getCurrentValue();
+      if (value == null) {
+        value = entry.getValue().createInitialValue();
       }
-      aggregator.setPreviousAggregatedValue(
-          aggregator.getCurrentAggregatedValue());
-      aggregator.resetCurrentAggregator();
-      progressable.progress();
+
+      reducedMap.put(entry.getKey(), value);
     }
-    MasterLoggingAggregator.logAggregatedValue(this, conf);
+
+    reducerMap.clear();
+
     if (LOG.isDebugEnabled()) {
       LOG.debug("prepareSuperstep: Aggregators prepared");
     }
   }
 
-  /**
-   * Finalize aggregators for current superstep and share them with workers
-   *
-   * @param masterClient IPC client on master
-   */
-  public void finishSuperstep(MasterClient masterClient) {
+  /** Finalize aggregators for current superstep */
+  public void finishSuperstep() {
     if (LOG.isDebugEnabled()) {
       LOG.debug("finishSuperstep: Start finishing aggregators");
     }
-    for (AggregatorWrapper<Writable> aggregator : aggregatorMap.values()) {
-      if (aggregator.isChanged()) {
-        // if master compute changed the value, use the one he chose
-        aggregator.setPreviousAggregatedValue(
-            aggregator.getCurrentAggregatedValue());
-        // reset aggregator for the next superstep
-        aggregator.resetCurrentAggregator();
-      }
-      progressable.progress();
+
+    reducedMap.clear();
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("finishSuperstep: Aggregators finished");
     }
+  }
 
-    // send aggregators to their owners
-    // TODO: if aggregator owner and it's value didn't change,
-    //       we don't need to resend it
+  /**
+   * Send data to workers (through owner workers)
+   *
+   * @param masterClient IPC client on master
+   */
+  public void sendDataToOwners(MasterClient masterClient) {
+    // send broadcast values and reduceOperations to their owners
     try {
-      for (Map.Entry<String, AggregatorWrapper<Writable>> entry :
-          aggregatorMap.entrySet()) {
-        masterClient.sendAggregator(entry.getKey(),
-            entry.getValue().getAggregatorFactory(),
-            entry.getValue().getPreviousAggregatedValue());
+      for (Entry<String, Reducer<Object, Writable>> entry :
+          reducerMap.entrySet()) {
+        masterClient.sendToOwner(entry.getKey(),
+            GlobalCommType.REDUCE_OPERATIONS,
+            entry.getValue().getReduceOp());
         progressable.progress();
       }
-      masterClient.finishSendingAggregatedValues();
+
+      for (Entry<String, Writable> entry : broadcastMap.entrySet()) {
+        masterClient.sendToOwner(entry.getKey(),
+            GlobalCommType.BROADCAST,
+            entry.getValue());
+        progressable.progress();
+      }
+      masterClient.finishSendingValues();
+
+      broadcastMap.clear();
     } catch (IOException e) {
       throw new IllegalStateException("finishSuperstep: " +
           "IOException occurred while sending aggregators", e);
     }
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("finishSuperstep: Aggregators finished");
-    }
   }
 
   /**
-   * Accept aggregated values sent by worker. Every aggregator will be sent
+   * Accept reduced values sent by worker. Every value will be sent
    * only once, by its owner.
    * We don't need to count the number of these requests because global
    * superstep barrier will happen after workers ensure all requests of this
    * type have been received and processed by master.
    *
-   * @param aggregatedValuesInput Input in which aggregated values are
+   * @param reducedValuesInput Input in which aggregated values are
    *                              written in the following format:
-   *                              number_of_aggregators
-   *                              name_1  value_1
-   *                              name_2  value_2
+   *                              numReducers
+   *                              name_1  REDUCED_VALUE  value_1
+   *                              name_2  REDUCED_VALUE  value_2
    *                              ...
    * @throws IOException
    */
-  public void acceptAggregatedValues(
-      DataInput aggregatedValuesInput) throws IOException {
-    int numAggregators = aggregatedValuesInput.readInt();
-    for (int i = 0; i < numAggregators; i++) {
-      String aggregatorName = aggregatedValuesInput.readUTF();
-      AggregatorWrapper<Writable> aggregator =
-          aggregatorMap.get(aggregatorName);
-      if (aggregator == null) {
+  public void acceptReducedValues(
+      DataInput reducedValuesInput) throws IOException {
+    int numReducers = reducedValuesInput.readInt();
+    for (int i = 0; i < numReducers; i++) {
+      String name = reducedValuesInput.readUTF();
+      GlobalCommType type =
+          GlobalCommType.values()[reducedValuesInput.readByte()];
+      if (type != GlobalCommType.REDUCED_VALUE) {
         throw new IllegalStateException(
-            "acceptAggregatedValues: " +
-                "Master received aggregator which isn't registered: " +
-                aggregatorName);
+            "SendReducedToMasterRequest received " + type);
+      }
+      Reducer<Object, Writable> reducer = reducerMap.get(name);
+      if (reducer == null) {
+        throw new IllegalStateException(
+            "acceptReducedValues: " +
+                "Master received reduced value which isn't registered: " +
+                name);
+      }
+
+      Writable valueToReduce = reducer.createInitialValue();
+      valueToReduce.readFields(reducedValuesInput);
+
+      if (reducer.getCurrentValue() != null) {
+        reducer.reducePartial(valueToReduce);
+      } else {
+        reducer.setCurrentValue(valueToReduce);
       }
-      Writable aggregatorValue = aggregator.createInitialValue();
-      aggregatorValue.readFields(aggregatedValuesInput);
-      aggregator.setCurrentAggregatedValue(aggregatorValue);
       progressable.progress();
     }
     if (LOG.isDebugEnabled()) {
-      LOG.debug("acceptAggregatedValues: Accepted one set with " +
-          numAggregators + " aggregated values");
+      LOG.debug("acceptReducedValues: Accepted one set with " +
+          numReducers + " aggregated values");
     }
   }
 
@@ -281,23 +242,10 @@ public class MasterAggregatorHandler implements MasterAggregatorUsage,
    * @param superstep      Superstep which just finished
    * @param superstepState State of the superstep which just finished
    */
-  public void writeAggregators(long superstep, SuperstepState superstepState) {
+  public void writeAggregators(
+      long superstep, SuperstepState superstepState) {
     try {
-      Iterable<Map.Entry<String, Writable>> iter =
-          Iterables.transform(
-              aggregatorMap.entrySet(),
-              new Function<Map.Entry<String, AggregatorWrapper<Writable>>,
-                  Map.Entry<String, Writable>>() {
-                @Override
-                public Map.Entry<String, Writable> apply(
-                    Map.Entry<String, AggregatorWrapper<Writable>> entry) {
-                  progressable.progress();
-                  return new AbstractMap.SimpleEntry<String,
-                      Writable>(entry.getKey(),
-                      entry.getValue().getPreviousAggregatedValue());
-                }
-              });
-      aggregatorWriter.writeAggregator(iter,
+      aggregatorWriter.writeAggregator(reducedMap.entrySet(),
           (superstepState == SuperstepState.ALL_SUPERSTEPS_DONE) ?
               AggregatorWriter.LAST_SUPERSTEP : superstep);
     } catch (IOException e) {
@@ -333,43 +281,44 @@ public class MasterAggregatorHandler implements MasterAggregatorUsage,
 
   @Override
   public void write(DataOutput out) throws IOException {
-    out.writeInt(aggregatorMap.size());
-    for (Map.Entry<String, AggregatorWrapper<Writable>> entry :
-        aggregatorMap.entrySet()) {
+    // At the end of superstep, only reduceOpMap can be non-empty
+    Preconditions.checkState(reducedMap.isEmpty(),
+        "reducedMap must be empty at the end of the superstep");
+
+    out.writeInt(reducerMap.size());
+    for (Entry<String, Reducer<Object, Writable>> entry :
+        reducerMap.entrySet()) {
       out.writeUTF(entry.getKey());
-      WritableUtils.writeWritableObject(
-          entry.getValue().getAggregatorFactory(), out);
-      out.writeBoolean(entry.getValue().isPersistent());
-      entry.getValue().getPreviousAggregatedValue().write(out);
+      entry.getValue().write(out);
       progressable.progress();
     }
+
+    out.writeInt(broadcastMap.size());
+    for (Entry<String, Writable> entry : broadcastMap.entrySet()) {
+      out.writeUTF(entry.getKey());
+      WritableUtils.writeWritableObject(entry.getValue(), out);
+    }
   }
 
   @Override
   public void readFields(DataInput in) throws IOException {
-    aggregatorMap.clear();
-    int numAggregators = in.readInt();
-    try {
-      for (int i = 0; i < numAggregators; i++) {
-        String aggregatorName = in.readUTF();
-        WritableFactory<Aggregator<Writable>> aggregatorFactory =
-            WritableUtils.readWritableObject(in, conf);
-        boolean isPersistent = in.readBoolean();
-        AggregatorWrapper<Writable> aggregatorWrapper = registerAggregator(
-            aggregatorName,
-            aggregatorFactory,
-            isPersistent);
-        Writable value = aggregatorWrapper.createInitialValue();
-        value.readFields(in);
-        aggregatorWrapper.setPreviousAggregatedValue(value);
-        progressable.progress();
-      }
-    } catch (InstantiationException e) {
-      throw new IllegalStateException("readFields: " +
-          "InstantiationException occurred", e);
-    } catch (IllegalAccessException e) {
-      throw new IllegalStateException("readFields: " +
-          "IllegalAccessException occurred", e);
+    reducedMap.clear();
+    broadcastMap.clear();
+    reducerMap.clear();
+
+    int numReducers = in.readInt();
+    for (int i = 0; i < numReducers; i++) {
+      String name = in.readUTF();
+      Reducer<Object, Writable> reducer = new Reducer<>();
+      reducer.readFields(in);
+      reducerMap.put(name, reducer);
+    }
+
+    int numBroadcast = in.readInt();
+    for (int i = 0; i < numBroadcast; i++) {
+      String name = in.readUTF();
+      Writable value = WritableUtils.readWritableObject(in, null);
+      broadcastMap.put(name, value);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/master/MasterCompute.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/MasterCompute.java b/giraph-core/src/main/java/org/apache/giraph/master/MasterCompute.java
index 552cca9..72e4d0a 100644
--- a/giraph-core/src/main/java/org/apache/giraph/master/MasterCompute.java
+++ b/giraph-core/src/main/java/org/apache/giraph/master/MasterCompute.java
@@ -24,6 +24,7 @@ import org.apache.giraph.combiner.MessageCombiner;
 import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
 import org.apache.giraph.graph.Computation;
 import org.apache.giraph.graph.GraphState;
+import org.apache.giraph.reducers.ReduceOperation;
 import org.apache.giraph.utils.WritableFactory;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.Mapper;
@@ -43,7 +44,7 @@ import org.apache.hadoop.mapreduce.Mapper;
  */
 public abstract class MasterCompute
     extends DefaultImmutableClassesGiraphConfigurable
-    implements MasterAggregatorUsage, Writable {
+    implements MasterAggregatorUsage, MasterGlobalCommUsage, Writable {
   /** If true, do not do anymore computation on this vertex. */
   private boolean halt = false;
   /** Master aggregator usage */
@@ -190,10 +191,33 @@ public abstract class MasterCompute
   }
 
   @Override
+  public final <S, R extends Writable> void registerReduce(
+      String name, ReduceOperation<S, R> reduceOp) {
+    serviceMaster.getGlobalCommHandler().registerReduce(name, reduceOp);
+  }
+
+  @Override
+  public final <S, R extends Writable> void registerReduce(
+      String name, ReduceOperation<S, R> reduceOp, R globalInitialValue) {
+    serviceMaster.getGlobalCommHandler().registerReduce(
+        name, reduceOp, globalInitialValue);
+  }
+
+  @Override
+  public final <T extends Writable> T getReduced(String name) {
+    return serviceMaster.getGlobalCommHandler().getReduced(name);
+  }
+
+  @Override
+  public final void broadcast(String name, Writable object) {
+    serviceMaster.getGlobalCommHandler().broadcast(name, object);
+  }
+
+  @Override
   public final <A extends Writable> boolean registerAggregator(
     String name, Class<? extends Aggregator<A>> aggregatorClass)
     throws InstantiationException, IllegalAccessException {
-    return serviceMaster.getAggregatorHandler().registerAggregator(
+    return serviceMaster.getAggregatorTranslationHandler().registerAggregator(
         name, aggregatorClass);
   }
 
@@ -201,7 +225,7 @@ public abstract class MasterCompute
   public final <A extends Writable> boolean registerAggregator(
     String name, WritableFactory<? extends Aggregator<A>> aggregator)
     throws InstantiationException, IllegalAccessException {
-    return serviceMaster.getAggregatorHandler().registerAggregator(
+    return serviceMaster.getAggregatorTranslationHandler().registerAggregator(
         name, aggregator);
   }
 
@@ -210,19 +234,21 @@ public abstract class MasterCompute
       String name,
       Class<? extends Aggregator<A>> aggregatorClass) throws
       InstantiationException, IllegalAccessException {
-    return serviceMaster.getAggregatorHandler().registerPersistentAggregator(
-        name, aggregatorClass);
+    return serviceMaster.getAggregatorTranslationHandler()
+        .registerPersistentAggregator(name, aggregatorClass);
   }
 
   @Override
   public final <A extends Writable> A getAggregatedValue(String name) {
-    return serviceMaster.getAggregatorHandler().<A>getAggregatedValue(name);
+    return serviceMaster.getAggregatorTranslationHandler()
+        .<A>getAggregatedValue(name);
   }
 
   @Override
   public final <A extends Writable> void setAggregatedValue(
       String name, A value) {
-    serviceMaster.getAggregatorHandler().setAggregatedValue(name, value);
+    serviceMaster.getAggregatorTranslationHandler()
+        .setAggregatedValue(name, value);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/master/MasterGlobalCommUsage.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/MasterGlobalCommUsage.java b/giraph-core/src/main/java/org/apache/giraph/master/MasterGlobalCommUsage.java
new file mode 100644
index 0000000..c3ce0ea
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/master/MasterGlobalCommUsage.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.master;
+
+import org.apache.giraph.reducers.ReduceOperation;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Master compute can access reduce and broadcast methods
+ * through this interface, from masterCompute method.
+ */
+public interface MasterGlobalCommUsage {
+  /**
+   * Register reducer to be reduced in the next worker computation,
+   * using given name and operations.
+   * @param name Name of the reducer
+   * @param reduceOp Reduce operations
+   * @param <S> Single value type
+   * @param <R> Reduced value type
+   */
+  <S, R extends Writable> void registerReduce(
+      String name, ReduceOperation<S, R> reduceOp);
+
+  /**
+   * Register reducer to be reduced in the next worker computation, using
+   * given name and operations, starting globally from globalInitialValue.
+   * (globalInitialValue is reduced only once, each worker will still start
+   * from neutral initial value)
+   *
+   * @param name Name of the reducer
+   * @param reduceOp Reduce operations
+   * @param globalInitialValue Global initial value
+   * @param <S> Single value type
+   * @param <R> Reduced value type
+   */
+  <S, R extends Writable> void registerReduce(
+      String name, ReduceOperation<S, R> reduceOp, R globalInitialValue);
+
+  /**
+   * Get reduced value from previous worker computation.
+   * @param name Name of the reducer
+   * @return Reduced value
+   * @param <R> Reduced value type
+   */
+  <R extends Writable> R getReduced(String name);
+
+  /**
+   * Broadcast given value to all workers for next computation.
+   * @param name Name of the broadcast object
+   * @param value Value
+   */
+  void broadcast(String name, Writable value);
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/reducers/OnSameReduceOperation.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/reducers/OnSameReduceOperation.java b/giraph-core/src/main/java/org/apache/giraph/reducers/OnSameReduceOperation.java
new file mode 100644
index 0000000..a675f4d
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/reducers/OnSameReduceOperation.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.reducers;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * ReduceOperation object when single object being reduced is of
+ * same type as reduced value.
+ *
+ * @param <R> Reduced object type.
+ */
+public abstract class OnSameReduceOperation<R extends Writable>
+    implements ReduceOperation<R, R> {
+  @Override
+  public final void reducePartial(R curValue, R valueToReduce) {
+    reduceSingle(curValue, valueToReduce);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/reducers/ReduceOperation.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/reducers/ReduceOperation.java b/giraph-core/src/main/java/org/apache/giraph/reducers/ReduceOperation.java
new file mode 100644
index 0000000..434e21a
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/reducers/ReduceOperation.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.reducers;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Reduce operations defining how to reduce single values
+ * passed on workers, into partial values on workers, and then
+ * into a single global reduced value.
+ *
+ * Object should be thread safe. Most frequently it should be
+ * immutable object, so that functions can execute concurrently.
+ * Rarely when object is mutable ({@link AggregatorReduceOperation}),
+ * i.e. stores reusable object inside, accesses should be synchronized.
+ *
+ * @param <S> Single value type, objects passed on workers
+ * @param <R> Reduced value type
+ */
+public interface ReduceOperation<S, R extends Writable> extends Writable {
+  /**
+   * Return new reduced value which is neutral to reduce operation.
+   *
+   * @return Neutral value
+   */
+  R createInitialValue();
+  /**
+   * Add a new value.
+   * Needs to be commutative and associative
+   *
+   * @param curValue Partial value into which to reduce and store the result
+   * @param valueToReduce Single value to be reduced
+   */
+  void reduceSingle(R curValue, S valueToReduce);
+  /**
+   * Add partially reduced value to current partially reduced value.
+   *
+   * @param curValue Partial value into which to reduce and store the result
+   * @param valueToReduce Partial value to be reduced
+   */
+  void reducePartial(R curValue, R valueToReduce);
+}


[3/3] git commit: updated refs/heads/trunk to f43f450

Posted by ma...@apache.org.
Reduce/broadcast API

Summary:
Adding reduce and broadcast API

Reduce and broadcast API should be a simplest comprehensive API for what it and aggregators do,
and has somewhat simpler implementation, since there is no strange interaction with values from
previous and next superstep.

It is very flexible and allows building more complex abstraction on top of it.
Aggregators API is built on top of it in AggregatorToGlobalCommTranslation,
when reverse is not true, which shows it is more flexible then previous API.

Once primitive types diff goes in, it will be trivial to create generic reducers
(instead of having huge number of specialized aggregators)

Current aggregator API has multiple issues:
- no matter whether aggregated values are needed on workers, they are distributed to them
- there is no way to register aggregator for a single superstep
- in order for master to send data to workers, it needs to go through an aggregator - even though it can only be Writable
- value to be aggregated and result of aggregation need to be of the same type
- logic of how to do aggregation is combined with how Aggregator is kept (i.e. aggregate(valueToAggregate), instead of aggregate(currentAggregatedValue, valueToAggregate)), and so every aggregator needs to extend BasicAggregator, but that still limits what can be done.

Related to https://phabricator.fb.com/D1303953

Test Plan:
All unit tests, will run some jobs on production.
Will add unit test to use reduce/broadcast directly, instead of through aggregators

Reviewers: majakabiljo, avery.ching, pavanka, sergey.edunov, maja.kabiljo

Differential Revision: https://reviews.facebook.net/D21423


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/f43f4500
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/f43f4500
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/f43f4500

Branch: refs/heads/trunk
Commit: f43f450093876ba8ae164681789cad0e6ea4b68e
Parents: 61db689
Author: Igor Kabiljo <ik...@fb.com>
Authored: Thu Oct 9 14:55:20 2014 -0700
Committer: Maja Kabiljo <ma...@fb.com>
Committed: Thu Oct 9 14:55:20 2014 -0700

----------------------------------------------------------------------
 .../giraph/aggregators/AggregatorWrapper.java   | 150 --------
 .../aggregators/ClassAggregatorFactory.java     |  17 +-
 .../giraph/benchmark/ReducersBenchmark.java     | 263 +++++++++++++
 .../giraph/bsp/CentralizedServiceMaster.java    |  20 +-
 .../org/apache/giraph/comm/GlobalCommType.java  |  32 ++
 .../org/apache/giraph/comm/MasterClient.java    |  15 +-
 .../AggregatedValueOutputStream.java            |  45 ---
 .../aggregators/AggregatorOutputStream.java     |  51 ---
 .../comm/aggregators/AggregatorUtils.java       |   6 -
 .../aggregators/AllAggregatorServerData.java    | 117 +++---
 .../GlobalCommValueOutputStream.java            |  71 ++++
 .../aggregators/OwnerAggregatorServerData.java  |  71 ++--
 .../aggregators/SendAggregatedValueCache.java   |  87 -----
 .../comm/aggregators/SendAggregatorCache.java   |  92 -----
 .../comm/aggregators/SendGlobalCommCache.java   | 102 +++++
 .../WorkerAggregatorRequestProcessor.java       |  34 +-
 .../giraph/comm/netty/NettyMasterClient.java    |  32 +-
 .../giraph/comm/netty/NettyMasterServer.java    |  10 +-
 .../NettyWorkerAggregatorRequestProcessor.java  |  44 +--
 .../handler/MasterRequestServerHandler.java     |   4 +-
 .../giraph/comm/requests/RequestType.java       |   2 +-
 .../SendAggregatorsToMasterRequest.java         |  61 ---
 .../requests/SendAggregatorsToOwnerRequest.java |  45 ++-
 .../SendAggregatorsToWorkerRequest.java         |  28 +-
 .../requests/SendReducedToMasterRequest.java    |  61 +++
 .../requests/SendWorkerAggregatorsRequest.java  |  35 +-
 .../giraph/graph/AbstractComputation.java       |  31 +-
 .../org/apache/giraph/graph/Computation.java    |  13 +-
 .../apache/giraph/graph/ComputeCallable.java    |   4 +-
 .../apache/giraph/graph/GraphTaskManager.java   |  40 +-
 .../java/org/apache/giraph/io/EdgeReader.java   |  36 +-
 .../org/apache/giraph/io/MappingReader.java     |  40 +-
 .../java/org/apache/giraph/io/VertexReader.java |  38 +-
 .../giraph/io/internal/WrappedEdgeReader.java   |  13 +-
 .../io/internal/WrappedMappingReader.java       |  14 +-
 .../giraph/io/internal/WrappedVertexReader.java |  11 +-
 .../master/AggregatorReduceOperation.java       |  92 +++++
 .../AggregatorToGlobalCommTranslation.java      | 240 ++++++++++++
 .../apache/giraph/master/BspServiceMaster.java  | 144 ++++---
 .../giraph/master/MasterAggregatorHandler.java  | 371 ++++++++-----------
 .../org/apache/giraph/master/MasterCompute.java |  40 +-
 .../giraph/master/MasterGlobalCommUsage.java    |  68 ++++
 .../giraph/reducers/OnSameReduceOperation.java  |  34 ++
 .../apache/giraph/reducers/ReduceOperation.java |  57 +++
 .../org/apache/giraph/reducers/Reducer.java     | 110 ++++++
 .../apache/giraph/reducers/package-info.java    |  21 ++
 .../apache/giraph/utils/ConfigurationUtils.java |  20 +-
 .../org/apache/giraph/utils/WritableUtils.java  |  34 ++
 .../apache/giraph/worker/BspServiceWorker.java  |  60 +--
 .../giraph/worker/EdgeInputSplitsCallable.java  |  10 +-
 .../worker/MappingInputSplitsCallable.java      |  12 +-
 .../worker/VertexInputSplitsCallable.java       |  10 +-
 .../worker/WorkerAggregatorDelegator.java       |  69 ++++
 .../giraph/worker/WorkerAggregatorHandler.java  | 241 ++++++------
 .../org/apache/giraph/worker/WorkerContext.java |  38 +-
 .../giraph/worker/WorkerGlobalCommUsage.java    |  40 ++
 .../worker/WorkerThreadAggregatorUsage.java     |  31 --
 .../worker/WorkerThreadGlobalCommUsage.java     |  32 ++
 .../java/org/apache/giraph/TestBspBasic.java    |  40 +-
 .../aggregators/TestAggregatorsHandling.java    |  96 +----
 60 files changed, 2104 insertions(+), 1541 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/aggregators/AggregatorWrapper.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/aggregators/AggregatorWrapper.java b/giraph-core/src/main/java/org/apache/giraph/aggregators/AggregatorWrapper.java
deleted file mode 100644
index fa18a64..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/aggregators/AggregatorWrapper.java
+++ /dev/null
@@ -1,150 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.aggregators;
-
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.utils.WritableFactory;
-import org.apache.hadoop.io.Writable;
-
-/**
- * Wrapper for aggregators. Keeps two instances of an aggregator - one for
- * the value from previous super step, and one for the value which is being
- * generated in current super step.
- *
- * @param <A> Aggregated value
- */
-public class AggregatorWrapper<A extends Writable> {
-  /** False iff aggregator should be reset at the end of each super step */
-  private final boolean persistent;
-  /** Value aggregated in previous super step */
-  private A previousAggregatedValue;
-  /** Aggregator factory */
-  private final WritableFactory<? extends Aggregator<A>> aggregatorFactory;
-  /** Aggregator for next super step */
-  private final Aggregator<A> currentAggregator;
-  /** Whether anyone changed current value since the moment it was reset */
-  private boolean changed;
-
-  /**
-   * @param aggregatorFactory Aggregator Factory
-   * @param persistent        False iff aggregator should be reset at the end
-   *                          of each super step
-   * @param conf              Configuration
-   */
-  public AggregatorWrapper(
-      WritableFactory<? extends Aggregator<A>> aggregatorFactory,
-      boolean persistent, ImmutableClassesGiraphConfiguration conf) {
-    this.persistent = persistent;
-    this.aggregatorFactory = aggregatorFactory;
-    currentAggregator = aggregatorFactory.create();
-    changed = false;
-    previousAggregatedValue = currentAggregator.createInitialValue();
-  }
-
-  /**
-   * Get aggregated value from previous super step
-   *
-   * @return Aggregated value from previous super step
-   */
-  public A getPreviousAggregatedValue() {
-    return previousAggregatedValue;
-  }
-
-  /**
-   * Set aggregated value for previous super step
-   *
-   * @param value Aggregated value to set
-   */
-  public void setPreviousAggregatedValue(A value) {
-    previousAggregatedValue = value;
-  }
-
-  /**
-   * Check if aggregator is persistent
-   *
-   * @return False iff aggregator should be reset at the end of each super step
-   */
-  public boolean isPersistent() {
-    return persistent;
-  }
-
-  /**
-   * Check if current aggregator was changed
-   *
-   * @return Whether anyone changed current value since the moment it was reset
-   */
-  public boolean isChanged() {
-    return changed;
-  }
-
-  /**
-   * Add a new value to current aggregator
-   *
-   * @param value Value to be aggregated
-   */
-  public synchronized void aggregateCurrent(A value) {
-    changed = true;
-    currentAggregator.aggregate(value);
-  }
-
-  /**
-   * Get current aggregated value
-   *
-   * @return Current aggregated value
-   */
-  public A getCurrentAggregatedValue() {
-    return currentAggregator.getAggregatedValue();
-  }
-
-  /**
-   * Set aggregated value of current aggregator
-   *
-   * @param value Value to set it to
-   */
-  public void setCurrentAggregatedValue(A value) {
-    changed = true;
-    currentAggregator.setAggregatedValue(value);
-  }
-
-  /**
-   * Reset the value of current aggregator to neutral value
-   */
-  public void resetCurrentAggregator() {
-    changed = false;
-    currentAggregator.reset();
-  }
-
-  /**
-   * Return new aggregated value which is neutral to aggregate operation
-   *
-   * @return Neutral value
-   */
-  public A createInitialValue() {
-    return currentAggregator.createInitialValue();
-  }
-
-  /**
-   * Get class of wrapped aggregator
-   *
-   * @return Aggregator class
-   */
-  public WritableFactory<? extends Aggregator<A>> getAggregatorFactory() {
-    return aggregatorFactory;
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/aggregators/ClassAggregatorFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/aggregators/ClassAggregatorFactory.java b/giraph-core/src/main/java/org/apache/giraph/aggregators/ClassAggregatorFactory.java
index 944656e..a022480 100644
--- a/giraph-core/src/main/java/org/apache/giraph/aggregators/ClassAggregatorFactory.java
+++ b/giraph-core/src/main/java/org/apache/giraph/aggregators/ClassAggregatorFactory.java
@@ -21,8 +21,6 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 
-import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.utils.ReflectionUtils;
 import org.apache.giraph.utils.WritableFactory;
 import org.apache.giraph.utils.WritableUtils;
@@ -36,7 +34,6 @@ import com.google.common.base.Preconditions;
  * @param <T> Aggregated value type
  */
 public class ClassAggregatorFactory<T extends Writable>
-    extends DefaultImmutableClassesGiraphConfigurable
     implements WritableFactory<Aggregator<T>> {
   /** Aggregator class */
   private Class<? extends Aggregator<T>> aggregatorClass;
@@ -51,26 +48,14 @@ public class ClassAggregatorFactory<T extends Writable>
    */
   public ClassAggregatorFactory(
       Class<? extends Aggregator<T>> aggregatorClass) {
-    this(aggregatorClass, null);
-
-  }
-
-  /**
-   * Constructor
-   * @param aggregatorClass Aggregator class
-   * @param conf Configuration
-   */
-  public ClassAggregatorFactory(Class<? extends Aggregator<T>> aggregatorClass,
-      ImmutableClassesGiraphConfiguration conf) {
     Preconditions.checkNotNull(aggregatorClass,
         "aggregatorClass cannot be null in ClassAggregatorFactory");
     this.aggregatorClass = aggregatorClass;
-    setConf(conf);
   }
 
   @Override
   public Aggregator<T> create() {
-    return ReflectionUtils.newInstance(aggregatorClass, getConf());
+    return ReflectionUtils.newInstance(aggregatorClass, null);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/benchmark/ReducersBenchmark.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/benchmark/ReducersBenchmark.java b/giraph-core/src/main/java/org/apache/giraph/benchmark/ReducersBenchmark.java
new file mode 100644
index 0000000..ce5c96e
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/benchmark/ReducersBenchmark.java
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.benchmark;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Set;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.conf.GiraphConstants;
+import org.apache.giraph.graph.BasicComputation;
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.io.formats.PseudoRandomInputFormatConstants;
+import org.apache.giraph.io.formats.PseudoRandomVertexInputFormat;
+import org.apache.giraph.master.DefaultMasterCompute;
+import org.apache.giraph.reducers.OnSameReduceOperation;
+import org.apache.giraph.worker.DefaultWorkerContext;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.util.ToolRunner;
+
+import com.google.common.collect.Sets;
+
+/**
+ * Benchmark for reducers. Also checks the correctness.
+ */
+public class ReducersBenchmark extends GiraphBenchmark {
+  /** Number of reducers setting */
+  private static final String REDUCERS_NUM = "reducersbenchmark.num";
+
+  /** Option for number of reducers */
+  private static final BenchmarkOption REDUCERS =
+      new BenchmarkOption("r", "reducers",
+          true, "Reducers", "Need to set number of reducers (-r)");
+
+  /** LongSumReducer */
+  public static class TestLongSumReducer
+      extends OnSameReduceOperation<LongWritable> {
+    /** Singleton */
+    public static final TestLongSumReducer INSTANCE = new TestLongSumReducer();
+
+    @Override
+    public LongWritable createInitialValue() {
+      return new LongWritable();
+    }
+
+    @Override
+    public void reduceSingle(
+        LongWritable curValue, LongWritable valueToReduce) {
+      curValue.set(curValue.get() + valueToReduce.get());
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+    }
+  }
+
+  /**
+   * Vertex class for ReducersBenchmark
+   */
+  public static class ReducersBenchmarkComputation extends
+      BasicComputation<LongWritable, DoubleWritable, DoubleWritable,
+          DoubleWritable> {
+    @Override
+    public void compute(
+        Vertex<LongWritable, DoubleWritable, DoubleWritable> vertex,
+        Iterable<DoubleWritable> messages) throws IOException {
+      int n = getNumReducers(getConf());
+      long superstep = getSuperstep();
+      int w = getWorkerContextReduced(getConf(), superstep);
+      for (int i = 0; i < n; i++) {
+        reduce("w" + i, new LongWritable((superstep + 1) * i));
+        reduce("p" + i, new LongWritable(i));
+
+        if (superstep > 0) {
+          assertEquals(superstep * (getTotalNumVertices() * i) + w,
+              ((LongWritable) getBroadcast("w" + i)).get());
+          assertEquals(-(superstep * i),
+              ((LongWritable) getBroadcast("m" + i)).get());
+          assertEquals(superstep * getTotalNumVertices() * i,
+              ((LongWritable) getBroadcast("p" + i)).get());
+        }
+      }
+      if (superstep > 2) {
+        vertex.voteToHalt();
+      }
+    }
+  }
+
+  /**
+   * MasterCompute class for ReducersBenchmark
+   */
+  public static class ReducersBenchmarkMasterCompute extends
+      DefaultMasterCompute {
+    @Override
+    public void compute() {
+      int n = getNumReducers(getConf());
+      long superstep = getSuperstep();
+      int w = getWorkerContextReduced(getConf(), superstep);
+      for (int i = 0; i < n; i++) {
+        String wi = "w" + i;
+        String mi = "m" + i;
+        String pi = "p" + i;
+
+        registerReduce(wi, TestLongSumReducer.INSTANCE);
+        registerReduce(mi, new TestLongSumReducer());
+
+        if (superstep > 0) {
+          broadcast(wi, getReduced(wi));
+          broadcast(mi, new LongWritable(-superstep * i));
+          broadcast(pi, getReduced(pi));
+
+          registerReduce(pi, new TestLongSumReducer(),
+              (LongWritable) getReduced(pi));
+
+          assertEquals(superstep * (getTotalNumVertices() * i) + w,
+              ((LongWritable) getReduced(wi)).get());
+          assertEquals(superstep * getTotalNumVertices() * i,
+              ((LongWritable) getReduced(pi)).get());
+        } else {
+          registerReduce(pi, new TestLongSumReducer());
+        }
+      }
+    }
+  }
+
+  /**
+   * WorkerContext class for ReducersBenchmark
+   */
+  public static class ReducersBenchmarkWorkerContext
+      extends DefaultWorkerContext {
+    @Override
+    public void preSuperstep() {
+      addToWorkerReducers(1);
+      checkReducers();
+    }
+
+    @Override
+    public void postSuperstep() {
+      addToWorkerReducers(2);
+      checkReducers();
+    }
+
+    /**
+     * Check if reducer values are correct for current superstep
+     */
+    private void checkReducers() {
+      int n = getNumReducers(getContext().getConfiguration());
+      long superstep = getSuperstep();
+      int w = getWorkerContextReduced(
+          getContext().getConfiguration(), superstep);
+      for (int i = 0; i < n; i++) {
+        if (superstep > 0) {
+          assertEquals(superstep * (getTotalNumVertices() * i) + w,
+              ((LongWritable) getBroadcast("w" + i)).get());
+          assertEquals(-(superstep * i),
+              ((LongWritable) getBroadcast("m" + i)).get());
+          assertEquals(superstep * getTotalNumVertices() * i,
+              ((LongWritable) getBroadcast("p" + i)).get());
+        }
+      }
+    }
+
+    /**
+     * Add some value to worker reducers.
+     *
+     * @param valueToAdd Which value to add
+     */
+    private void addToWorkerReducers(int valueToAdd) {
+      int n = getNumReducers(getContext().getConfiguration());
+      for (int i = 0; i < n; i++) {
+        reduce("w" + i, new LongWritable(valueToAdd));
+      }
+    }
+  }
+
+  /**
+   * Get the number of reducers from configuration
+   *
+   * @param conf Configuration
+   * @return Number of reducers
+   */
+  private static int getNumReducers(Configuration conf) {
+    return conf.getInt(REDUCERS_NUM, 0);
+  }
+
+  /**
+   * Get the value which should be reduced by worker context
+   *
+   * @param conf Configuration
+   * @param superstep Superstep
+   * @return The value which should be reduced by worker context
+   */
+  private static int getWorkerContextReduced(Configuration conf,
+      long superstep) {
+    return (superstep <= 0) ? 0 : conf.getInt("workers", 0) * 3;
+  }
+
+  /**
+   * Check if values are equal, throw an exception if they aren't
+   *
+   * @param expected Expected value
+   * @param actual Actual value
+   */
+  private static void assertEquals(long expected, long actual) {
+    if (expected != actual) {
+      throw new RuntimeException("expected: " + expected +
+          ", actual: " + actual);
+    }
+  }
+
+  @Override
+  public Set<BenchmarkOption> getBenchmarkOptions() {
+    return Sets.newHashSet(BenchmarkOption.VERTICES, REDUCERS);
+  }
+
+  @Override
+  protected void prepareConfiguration(GiraphConfiguration conf,
+      CommandLine cmd) {
+    conf.setComputationClass(ReducersBenchmarkComputation.class);
+    conf.setMasterComputeClass(ReducersBenchmarkMasterCompute.class);
+    conf.setVertexInputFormatClass(PseudoRandomVertexInputFormat.class);
+    conf.setWorkerContextClass(ReducersBenchmarkWorkerContext.class);
+    conf.setLong(PseudoRandomInputFormatConstants.AGGREGATE_VERTICES,
+        BenchmarkOption.VERTICES.getOptionLongValue(cmd));
+    conf.setLong(PseudoRandomInputFormatConstants.EDGES_PER_VERTEX, 1);
+    conf.setInt(REDUCERS_NUM, REDUCERS.getOptionIntValue(cmd));
+    conf.setInt("workers", conf.getInt(GiraphConstants.MAX_WORKERS, -1));
+  }
+
+  /**
+   * Execute the benchmark.
+   *
+   * @param args Typically the command line arguments.
+   * @throws Exception Any exception from the computation.
+   */
+  public static void main(final String[] args) throws Exception {
+    System.exit(ToolRunner.run(new ReducersBenchmark(), args));
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java b/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java
index 9b4f9d6..1e8d519 100644
--- a/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java
+++ b/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java
@@ -18,6 +18,10 @@
 
 package org.apache.giraph.bsp;
 
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.giraph.master.AggregatorToGlobalCommTranslation;
 import org.apache.giraph.master.MasterAggregatorHandler;
 import org.apache.giraph.master.MasterCompute;
 import org.apache.giraph.master.MasterInfo;
@@ -26,9 +30,6 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.zookeeper.KeeperException;
 
-import java.io.IOException;
-import java.util.List;
-
 /**
  * At most, there will be one active master at a time, but many threads can
  * be trying to be the active master.
@@ -139,11 +140,18 @@ public interface CentralizedServiceMaster<I extends WritableComparable,
     long desiredSuperstep);
 
   /**
-   * Get master aggregator handler
+   * Get handler for global communication
+   *
+   * @return Global communication handler
+   */
+  MasterAggregatorHandler getGlobalCommHandler();
+
+  /**
+   * Handler for aggregators to reduce/broadcast translation
    *
-   * @return Master aggregator handler
+   * @return aggregator translation handler
    */
-  MasterAggregatorHandler getAggregatorHandler();
+  AggregatorToGlobalCommTranslation getAggregatorTranslationHandler();
 
   /**
    * Get MasterCompute object

http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/comm/GlobalCommType.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/GlobalCommType.java b/giraph-core/src/main/java/org/apache/giraph/comm/GlobalCommType.java
new file mode 100644
index 0000000..539b3bd
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/GlobalCommType.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.comm;
+
+/**
+ * Type tag distinguishing different global communication messages.
+ */
+public enum GlobalCommType {
+  /** ReduceOperation object */
+  REDUCE_OPERATIONS,
+  /** Reduced value object */
+  REDUCED_VALUE,
+  /** Broadcasted value */
+  BROADCAST,
+  /** Special count used internally for counting requests */
+  SPECIAL_COUNT;
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/comm/MasterClient.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/MasterClient.java b/giraph-core/src/main/java/org/apache/giraph/comm/MasterClient.java
index b7718a7..aea93fd 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/MasterClient.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/MasterClient.java
@@ -20,8 +20,6 @@ package org.apache.giraph.comm;
 
 import java.io.IOException;
 
-import org.apache.giraph.aggregators.Aggregator;
-import org.apache.giraph.utils.WritableFactory;
 import org.apache.hadoop.io.Writable;
 
 /**
@@ -36,19 +34,18 @@ public interface MasterClient {
   /**
    * Sends aggregator to its owner
    *
-   * @param aggregatorName Name of the aggregator
-   * @param aggregatorFactory Aggregator factory
-   * @param aggregatedValue Value of the aggregator
+   * @param name Name of the object
+   * @param type Global communication type
+   * @param value Object value
    * @throws IOException
    */
-  void sendAggregator(String aggregatorName,
-      WritableFactory<? extends Aggregator> aggregatorFactory,
-      Writable aggregatedValue) throws IOException;
+  void sendToOwner(String name, GlobalCommType type, Writable value)
+    throws IOException;
 
   /**
    * Flush aggregated values cache.
    */
-  void finishSendingAggregatedValues() throws IOException;
+  void finishSendingValues() throws IOException;
 
   /**
    * Flush all outgoing messages.  This will synchronously ensure that all

http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/AggregatedValueOutputStream.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/AggregatedValueOutputStream.java b/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/AggregatedValueOutputStream.java
deleted file mode 100644
index 0010dba..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/AggregatedValueOutputStream.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.comm.aggregators;
-
-import org.apache.hadoop.io.Writable;
-
-import java.io.IOException;
-
-/**
- * Implementation of {@link CountingOutputStream} which allows writing of
- * aggregator values in the form of pair (name, value)
- */
-public class AggregatedValueOutputStream extends CountingOutputStream {
-  /**
-   * Write aggregator to the stream and increment internal counter
-   *
-   * @param aggregatorName Name of the aggregator
-   * @param aggregatedValue Value of aggregator
-   * @return Number of bytes occupied by the stream
-   * @throws IOException
-   */
-  public int addAggregator(String aggregatorName,
-      Writable aggregatedValue) throws IOException {
-    incrementCounter();
-    dataOutput.writeUTF(aggregatorName);
-    aggregatedValue.write(dataOutput);
-    return getSize();
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/AggregatorOutputStream.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/AggregatorOutputStream.java b/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/AggregatorOutputStream.java
deleted file mode 100644
index 79bc08a..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/AggregatorOutputStream.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.comm.aggregators;
-
-import java.io.IOException;
-
-import org.apache.giraph.aggregators.Aggregator;
-import org.apache.giraph.utils.WritableFactory;
-import org.apache.giraph.utils.WritableUtils;
-import org.apache.hadoop.io.Writable;
-
-/**
- * Implementation of {@link CountingOutputStream} which allows writing of
- * aggregators in the form of triple (name, classname, value)
- */
-public class AggregatorOutputStream extends CountingOutputStream {
-  /**
-   * Write aggregator to the stream and increment internal counter
-   *
-   * @param aggregatorName Name of the aggregator
-   * @param aggregatorFactory Aggregator factory
-   * @param aggregatedValue Value of aggregator
-   * @return Number of bytes occupied by the stream
-   * @throws IOException
-   */
-  public int addAggregator(String aggregatorName,
-      WritableFactory<? extends Aggregator> aggregatorFactory,
-      Writable aggregatedValue) throws IOException {
-    incrementCounter();
-    dataOutput.writeUTF(aggregatorName);
-    WritableUtils.writeWritableObject(aggregatorFactory, dataOutput);
-    aggregatedValue.write(dataOutput);
-    return getSize();
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/AggregatorUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/AggregatorUtils.java b/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/AggregatorUtils.java
index a94ab38..ecb3a6b 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/AggregatorUtils.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/AggregatorUtils.java
@@ -27,12 +27,6 @@ import org.apache.giraph.worker.WorkerInfo;
  * Class for aggregator constants and utility methods
  */
 public class AggregatorUtils {
-  /**
-   * Special aggregator name which will be used to send the total number of
-   * aggregators requests which should arrive
-   */
-  public static final String SPECIAL_COUNT_AGGREGATOR =
-      "__aggregatorRequestCount";
 
   /** How big a single aggregator request can be (in bytes) */
   public static final String MAX_BYTES_PER_AGGREGATOR_REQUEST =

http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/AllAggregatorServerData.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/AllAggregatorServerData.java b/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/AllAggregatorServerData.java
index effc9bf..b1c0781 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/AllAggregatorServerData.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/AllAggregatorServerData.java
@@ -21,19 +21,21 @@ package org.apache.giraph.comm.aggregators;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.ConcurrentMap;
 
-import org.apache.giraph.aggregators.Aggregator;
+import org.apache.giraph.comm.GlobalCommType;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.master.MasterInfo;
-import org.apache.giraph.utils.Factory;
+import org.apache.giraph.reducers.ReduceOperation;
+import org.apache.giraph.reducers.Reducer;
 import org.apache.giraph.utils.TaskIdsPermitsBarrier;
-import org.apache.giraph.utils.WritableFactory;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.util.Progressable;
 import org.apache.log4j.Logger;
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
@@ -51,12 +53,12 @@ public class AllAggregatorServerData {
   /** Class logger */
   private static final Logger LOG =
       Logger.getLogger(AllAggregatorServerData.class);
-  /** Map of aggregator factories */
-  private final ConcurrentMap<String, WritableFactory<Aggregator<Writable>>>
-  aggregatorFactoriesMap = Maps.newConcurrentMap();
-  /** Map of values of aggregators from previous superstep */
+  /** Map of broadcasted values from master */
   private final ConcurrentMap<String, Writable>
-  aggregatedValuesMap = Maps.newConcurrentMap();
+  broadcastedMap = Maps.newConcurrentMap();
+  /** Map of registered reducers for current superstep */
+  private final ConcurrentMap<String, ReduceOperation<Object, Writable>>
+  reduceOpMap = Maps.newConcurrentMap();
   /**
    * Counts the requests with final aggregators from master.
    * It uses values from special aggregators
@@ -97,54 +99,36 @@ public class AllAggregatorServerData {
   }
 
   /**
-   * Register the class of the aggregator, received by master or worker.
-   *
-   * @param name              Aggregator name
-   * @param aggregatorFactory Aggregator factory
-   */
-  public void registerAggregatorClass(String name,
-      WritableFactory<Aggregator<Writable>> aggregatorFactory) {
-    aggregatorFactoriesMap.put(name, aggregatorFactory);
-    progressable.progress();
-  }
-
-  /**
-   * Set the value of aggregator from previous superstep,
-   * received by master or worker.
-   *
-   * @param name Name of the aggregator
-   * @param value Value of the aggregator
-   */
-  public void setAggregatorValue(String name, Writable value) {
-    aggregatedValuesMap.put(name, value);
-    progressable.progress();
-  }
-
-  /**
-   * Create initial aggregated value for an aggregator. Used so requests
-   * would be able to deserialize data.
-   * registerAggregatorClass needs to be called first to ensure that we have
-   * the class of the aggregator.
-   *
-   * @param name Name of the aggregator
-   * @return Empty aggregated value for this aggregator
+   * Received value through global communication from master.
+   * @param name Name
+   * @param type Global communication type
+   * @param value Object value
    */
-  public Writable createAggregatorInitialValue(String name) {
-    WritableFactory<Aggregator<Writable>> aggregatorFactory =
-        aggregatorFactoriesMap.get(name);
-    synchronized (aggregatorFactory) {
-      return aggregatorFactory.create().createInitialValue();
+  public void receiveValueFromMaster(
+      String name, GlobalCommType type, Writable value) {
+    switch (type) {
+    case BROADCAST:
+      broadcastedMap.put(name, value);
+      break;
+
+    case REDUCE_OPERATIONS:
+      reduceOpMap.put(name, (ReduceOperation<Object, Writable>) value);
+      break;
+
+    default:
+      throw new IllegalArgumentException("Unkown request type " + type);
     }
+    progressable.progress();
   }
 
   /**
    * Notify this object that an aggregator request from master has been
    * received.
    *
-   * @param aggregatorData Byte request with data received from master
+   * @param data Byte request with data received from master
    */
-  public void receivedRequestFromMaster(byte[] aggregatorData) {
-    masterData.add(aggregatorData);
+  public void receivedRequestFromMaster(byte[] data) {
+    masterData.add(data);
     masterBarrier.releaseOnePermit();
   }
 
@@ -200,35 +184,32 @@ public class AllAggregatorServerData {
    * arrived, and fill the maps for next superstep when ready.
    *
    * @param workerIds All workers in the job apart from the current one
-   * @param previousAggregatedValuesMap Map of values from previous
-   *                                    superstep to fill out
-   * @param currentAggregatorFactoryMap Map of aggregators factories for
-   *                                    current superstep to fill out.
+   * @param broadcastedMapToFill Broadcast map to fill out
+   * @param reducerMapToFill Registered reducer map to fill out.
    */
   public void fillNextSuperstepMapsWhenReady(
       Set<Integer> workerIds,
-      Map<String, Writable> previousAggregatedValuesMap,
-      Map<String, Factory<Aggregator<Writable>>> currentAggregatorFactoryMap) {
+      Map<String, Writable> broadcastedMapToFill,
+      Map<String, Reducer<Object, Writable>> reducerMapToFill) {
     workersBarrier.waitForRequiredPermits(workerIds);
     if (LOG.isDebugEnabled()) {
-      LOG.debug("fillNextSuperstepMapsWhenReady: Aggregators ready");
+      LOG.debug("fillNextSuperstepMapsWhenReady: Global data ready");
     }
-    previousAggregatedValuesMap.clear();
-    previousAggregatedValuesMap.putAll(aggregatedValuesMap);
-    for (Map.Entry<String, WritableFactory<Aggregator<Writable>>> entry :
-        aggregatorFactoriesMap.entrySet()) {
-      Factory<Aggregator<Writable>> aggregatorFactory =
-          currentAggregatorFactoryMap.get(entry.getKey());
-      if (aggregatorFactory == null) {
-        currentAggregatorFactoryMap.put(entry.getKey(), entry.getValue());
-      }
+
+    Preconditions.checkArgument(broadcastedMapToFill.isEmpty(),
+        "broadcastedMap needs to be empty for filling");
+    Preconditions.checkArgument(reducerMapToFill.isEmpty(),
+        "reducerMap needs to be empty for filling");
+
+    broadcastedMapToFill.putAll(broadcastedMap);
+
+    for (Entry<String, ReduceOperation<Object, Writable>> entry :
+        reduceOpMap.entrySet()) {
+      reducerMapToFill.put(entry.getKey(), new Reducer<>(entry.getValue()));
     }
-  }
 
-  /**
-   * Prepare for next superstep
-   */
-  public void reset() {
+    broadcastedMap.clear();
+    reduceOpMap.clear();
     masterData.clear();
     if (LOG.isDebugEnabled()) {
       LOG.debug("reset: Ready for next superstep");

http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/GlobalCommValueOutputStream.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/GlobalCommValueOutputStream.java b/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/GlobalCommValueOutputStream.java
new file mode 100644
index 0000000..0add1e9
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/GlobalCommValueOutputStream.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm.aggregators;
+
+import java.io.IOException;
+
+import org.apache.giraph.comm.GlobalCommType;
+import org.apache.giraph.utils.WritableUtils;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Implementation of {@link CountingOutputStream} which allows writing of
+ * reduced values in the form of pair (name, type, value)
+ *
+ * There are two modes:
+ * - when class of the value is written into the stream.
+ * - when it isn't, and reader needs to know Class of the value in order
+ *   to read it.
+ */
+public class GlobalCommValueOutputStream extends CountingOutputStream {
+  /** whether to write Class object for values into the stream */
+  private final boolean writeClass;
+
+  /**
+   * Constructor
+   *
+   * @param writeClass boolean whether to write Class object for values
+   */
+  public GlobalCommValueOutputStream(boolean writeClass) {
+    this.writeClass = writeClass;
+  }
+
+  /**
+   * Write global communication object to the stream
+   * and increment internal counter
+   *
+   * @param name Name
+   * @param type Global communication type
+   * @param value Object value
+   * @return Number of bytes occupied by the stream
+   * @throws IOException
+   */
+  public int addValue(String name, GlobalCommType type,
+      Writable value) throws IOException {
+    incrementCounter();
+    dataOutput.writeUTF(name);
+    dataOutput.writeByte(type.ordinal());
+    if (writeClass) {
+      WritableUtils.writeWritableObject(value, dataOutput);
+    } else {
+      value.write(dataOutput);
+    }
+    return getSize();
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/OwnerAggregatorServerData.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/OwnerAggregatorServerData.java b/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/OwnerAggregatorServerData.java
index 2f3d5e5..9e92efc 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/OwnerAggregatorServerData.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/OwnerAggregatorServerData.java
@@ -23,9 +23,9 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentMap;
 
-import org.apache.giraph.aggregators.Aggregator;
+import org.apache.giraph.reducers.ReduceOperation;
+import org.apache.giraph.reducers.Reducer;
 import org.apache.giraph.utils.TaskIdsPermitsBarrier;
-import org.apache.giraph.utils.WritableFactory;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.util.Progressable;
 import org.apache.log4j.Logger;
@@ -62,13 +62,12 @@ public class OwnerAggregatorServerData {
   private static final Logger LOG =
       Logger.getLogger(OwnerAggregatorServerData.class);
   /** Map of aggregators which current worker owns */
-  private final ConcurrentMap<String, Aggregator<Writable>>
-  myAggregatorMap = Maps.newConcurrentMap();
+  private final ConcurrentMap<String, Reducer<Object, Writable>>
+  myReducerMap = Maps.newConcurrentMap();
   /**
    * Counts the requests with partial aggregated values from other workers.
-   * It uses values from special aggregators
-   * (named AggregatorUtils.SPECIAL_COUNT_AGGREGATOR)
-   * to know how many requests it has to receive.
+   * It uses GlobalCommType.SPECIAL_COUNT to know how many requests it
+   * has to receive.
    */
   private final TaskIdsPermitsBarrier workersBarrier;
   /** Progressable used to report progress */
@@ -85,49 +84,50 @@ public class OwnerAggregatorServerData {
   }
 
   /**
-   * Register an aggregator which current worker owns. Thread-safe.
+   * Register a reducer which current worker owns. Thread-safe.
    *
    * @param name Name of aggregator
-   * @param aggregatorFactory Aggregator factory
+   * @param reduceOp Reduce operation
    */
-  public void registerAggregator(String name,
-      WritableFactory<Aggregator<Writable>> aggregatorFactory) {
-    if (LOG.isDebugEnabled() && myAggregatorMap.isEmpty()) {
+  public void registerReducer(String name,
+      ReduceOperation<Object, Writable> reduceOp) {
+    if (LOG.isDebugEnabled() && myReducerMap.isEmpty()) {
       LOG.debug("registerAggregator: The first registration after a reset()");
     }
-    myAggregatorMap.putIfAbsent(name, aggregatorFactory.create());
+    myReducerMap.putIfAbsent(name, new Reducer<>(reduceOp));
     progressable.progress();
   }
 
   /**
-   * Aggregate partial value of one of current worker's aggregators.
+   * Reduce partial value of one of current worker's reducers.
    *
-   * Thread-safe. Call only after aggregators have been registered.
+   * Thread-safe. Call only after reducers have been registered.
    *
-   * @param name Name of the aggregator
-   * @param value Value to aggregate to it
+   * @param name Name of the reducer
+   * @param value Value to reduce to it
    */
-  public void aggregate(String name, Writable value) {
-    Aggregator<Writable> aggregator = myAggregatorMap.get(name);
-    synchronized (aggregator) {
-      aggregator.aggregate(value);
+  public void reduce(String name, Writable value) {
+    Reducer<Object, Writable> reducer = myReducerMap.get(name);
+    synchronized (reducer) {
+      reducer.reducePartial(value);
     }
     progressable.progress();
   }
 
+
   /**
-   * Create initial aggregated value for an aggregator. Used so requests
+   * Create initial value for a reducer. Used so requests
    * would be able to deserialize data.
    *
-   * Thread-safe. Call only after aggregators have been registered.
+   * Thread-safe. Call only after reducer has been registered.
    *
-   * @param name Name of the aggregator
-   * @return Empty aggregated value for this aggregator
+   * @param name Name of the reducer
+   * @return Empty value
    */
-  public Writable createAggregatorInitialValue(String name) {
-    Aggregator<Writable> aggregator = myAggregatorMap.get(name);
-    synchronized (aggregator) {
-      return aggregator.createInitialValue();
+  public Writable createInitialValue(String name) {
+    Reducer<Object, Writable> reducer = myReducerMap.get(name);
+    synchronized (reducer) {
+      return reducer.createInitialValue();
     }
   }
 
@@ -159,20 +159,20 @@ public class OwnerAggregatorServerData {
    * @return Iterable through final aggregated values which this worker owns
    */
   public Iterable<Map.Entry<String, Writable>>
-  getMyAggregatorValuesWhenReady(Set<Integer> workerIds) {
+  getMyReducedValuesWhenReady(Set<Integer> workerIds) {
     workersBarrier.waitForRequiredPermits(workerIds);
     if (LOG.isDebugEnabled()) {
       LOG.debug("getMyAggregatorValuesWhenReady: Values ready");
     }
-    return Iterables.transform(myAggregatorMap.entrySet(),
-        new Function<Map.Entry<String, Aggregator<Writable>>,
+    return Iterables.transform(myReducerMap.entrySet(),
+        new Function<Map.Entry<String, Reducer<Object, Writable>>,
             Map.Entry<String, Writable>>() {
           @Override
           public Map.Entry<String, Writable> apply(
-              Map.Entry<String, Aggregator<Writable>> aggregator) {
+              Map.Entry<String, Reducer<Object, Writable>> aggregator) {
             return new AbstractMap.SimpleEntry<String, Writable>(
                 aggregator.getKey(),
-                aggregator.getValue().getAggregatedValue());
+                aggregator.getValue().getCurrentValue());
           }
         });
   }
@@ -181,9 +181,10 @@ public class OwnerAggregatorServerData {
    * Prepare for next superstep
    */
   public void reset() {
-    myAggregatorMap.clear();
+    myReducerMap.clear();
     if (LOG.isDebugEnabled()) {
       LOG.debug("reset: Ready for next superstep");
     }
   }
+
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/SendAggregatedValueCache.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/SendAggregatedValueCache.java b/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/SendAggregatedValueCache.java
deleted file mode 100644
index 468ee5c..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/SendAggregatedValueCache.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.comm.aggregators;
-
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Writable;
-
-import com.google.common.collect.Maps;
-
-import java.io.IOException;
-import java.util.Map;
-
-/**
- * Takes and serializes aggregated values and keeps them grouped by owner
- * partition id, to be sent in bulk.
- */
-public class SendAggregatedValueCache extends CountingCache {
-  /** Map from worker partition id to aggregator output stream */
-  private final Map<Integer, AggregatedValueOutputStream> aggregatorMap =
-      Maps.newHashMap();
-
-  /**
-   * Add aggregated value to the cache
-   *
-   * @param taskId Task id of worker which owns the aggregator
-   * @param aggregatorName Name of the aggregator
-   * @param aggregatedValue Value of the aggregator
-   * @return Number of bytes in serialized data for this worker
-   * @throws IOException
-   */
-  public int addAggregator(Integer taskId, String aggregatorName,
-      Writable aggregatedValue) throws IOException {
-    AggregatedValueOutputStream out = aggregatorMap.get(taskId);
-    if (out == null) {
-      out = new AggregatedValueOutputStream();
-      aggregatorMap.put(taskId, out);
-    }
-    return out.addAggregator(aggregatorName, aggregatedValue);
-  }
-
-  /**
-   * Remove and get aggregators for certain worker
-   *
-   * @param taskId Partition id of worker owner
-   * @return Serialized aggregator data for this worker
-   */
-  public byte[] removeAggregators(Integer taskId) {
-    incrementCounter(taskId);
-    AggregatedValueOutputStream out = aggregatorMap.remove(taskId);
-    if (out == null) {
-      return new byte[4];
-    } else {
-      return out.flush();
-    }
-  }
-
-  /**
-   * Creates fake aggregator which will hold the total number of aggregator
-   * requests for worker with selected task id. This should be called after all
-   * aggregators for the worker have been added to the cache.
-   *
-   * @param taskId Destination worker's task id
-   * @throws IOException
-   */
-  public void addCountAggregator(Integer taskId) throws IOException {
-    // current number of requests, plus one for the last flush
-    long totalCount = getCount(taskId) + 1;
-    addAggregator(taskId, AggregatorUtils.SPECIAL_COUNT_AGGREGATOR,
-        new LongWritable(totalCount));
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/SendAggregatorCache.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/SendAggregatorCache.java b/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/SendAggregatorCache.java
deleted file mode 100644
index 8f880b4..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/SendAggregatorCache.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.comm.aggregators;
-
-import java.io.IOException;
-import java.util.Map;
-
-import org.apache.giraph.aggregators.Aggregator;
-import org.apache.giraph.utils.WritableFactory;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Writable;
-
-import com.google.common.collect.Maps;
-
-/**
- * Takes and serializes aggregators and keeps them grouped by owner
- * partition id, to be sent in bulk.
- */
-public class SendAggregatorCache extends CountingCache {
-  /** Map from worker partition id to aggregator output stream */
-  private final Map<Integer, AggregatorOutputStream> aggregatorMap =
-      Maps.newHashMap();
-
-  /**
-   * Add aggregator to the cache
-   *
-   * @param taskId Task id of worker which owns the aggregator
-   * @param aggregatorName Name of the aggregator
-   * @param aggregatorFactory Aggregator factory
-   * @param aggregatedValue Value of the aggregator
-   * @return Number of bytes in serialized data for this worker
-   * @throws IOException
-   */
-  public int addAggregator(Integer taskId, String aggregatorName,
-      WritableFactory<? extends Aggregator> aggregatorFactory,
-      Writable aggregatedValue) throws IOException {
-    AggregatorOutputStream out = aggregatorMap.get(taskId);
-    if (out == null) {
-      out = new AggregatorOutputStream();
-      aggregatorMap.put(taskId, out);
-    }
-    return out.addAggregator(aggregatorName, aggregatorFactory,
-        aggregatedValue);
-  }
-
-  /**
-   * Remove and get aggregators for certain worker
-   *
-   * @param taskId Task id of worker owner
-   * @return Serialized aggregator data for this worker
-   */
-  public byte[] removeAggregators(Integer taskId) {
-    incrementCounter(taskId);
-    AggregatorOutputStream out = aggregatorMap.remove(taskId);
-    if (out == null) {
-      return new byte[4];
-    } else {
-      return out.flush();
-    }
-  }
-
-  /**
-   * Creates fake aggregator which will hold the total number of aggregator
-   * requests for worker with selected task id. This should be called after all
-   * aggregators for the worker have been added to the cache.
-   *
-   * @param taskId Destination worker's task id
-   * @throws IOException
-   */
-  public void addCountAggregator(Integer taskId) throws IOException {
-    // current number of requests, plus one for the last flush
-    long totalCount = getCount(taskId) + 1;
-    addAggregator(taskId, AggregatorUtils.SPECIAL_COUNT_AGGREGATOR,
-        null, new LongWritable(totalCount));
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/SendGlobalCommCache.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/SendGlobalCommCache.java b/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/SendGlobalCommCache.java
new file mode 100644
index 0000000..5e10c2f
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/SendGlobalCommCache.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm.aggregators;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.giraph.comm.GlobalCommType;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+
+import com.google.common.collect.Maps;
+
+/**
+ * Takes and serializes global communication values and keeps them grouped by
+ * owner partition id, to be sent in bulk.
+ * Includes broadcast messages, reducer registrations and special count.
+ */
+public class SendGlobalCommCache extends CountingCache {
+  /** Map from worker partition id to global communication output stream */
+  private final Map<Integer, GlobalCommValueOutputStream> globalCommMap =
+      Maps.newHashMap();
+
+  /** whether to write Class object for values into the stream */
+  private final boolean writeClass;
+
+  /**
+   * Constructor
+   *
+   * @param writeClass boolean whether to write Class object for values
+   */
+  public SendGlobalCommCache(boolean writeClass) {
+    this.writeClass = writeClass;
+  }
+
+  /**
+   * Add global communication value to the cache
+   *
+   * @param taskId Task id of worker which owns the value
+   * @param name Name
+   * @param type Global communication type
+   * @param value Value
+   * @return Number of bytes in serialized data for this worker
+   * @throws IOException
+   */
+  public int addValue(Integer taskId, String name,
+      GlobalCommType type, Writable value) throws IOException {
+    GlobalCommValueOutputStream out = globalCommMap.get(taskId);
+    if (out == null) {
+      out = new GlobalCommValueOutputStream(writeClass);
+      globalCommMap.put(taskId, out);
+    }
+    return out.addValue(name, type, value);
+  }
+
+  /**
+   * Remove and get values for certain worker
+   *
+   * @param taskId Partition id of worker owner
+   * @return Serialized global communication data for this worker
+   */
+  public byte[] removeSerialized(Integer taskId) {
+    incrementCounter(taskId);
+    GlobalCommValueOutputStream out = globalCommMap.remove(taskId);
+    if (out == null) {
+      return new byte[4];
+    } else {
+      return out.flush();
+    }
+  }
+
+  /**
+   * Creates special value which will hold the total number of global
+   * communication requests for worker with selected task id. This should be
+   * called after all values for the worker have been added to the cache.
+   *
+   * @param taskId Destination worker's task id
+   * @throws IOException
+   */
+  public void addSpecialCount(Integer taskId) throws IOException {
+    // current number of requests, plus one for the last flush
+    long totalCount = getCount(taskId) + 1;
+    addValue(taskId, GlobalCommType.SPECIAL_COUNT.name(),
+        GlobalCommType.SPECIAL_COUNT, new LongWritable(totalCount));
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/WorkerAggregatorRequestProcessor.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/WorkerAggregatorRequestProcessor.java b/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/WorkerAggregatorRequestProcessor.java
index 360a39b..42009a2 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/WorkerAggregatorRequestProcessor.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/WorkerAggregatorRequestProcessor.java
@@ -18,25 +18,25 @@
 
 package org.apache.giraph.comm.aggregators;
 
-import org.apache.hadoop.io.Writable;
-
 import java.io.IOException;
 
+import org.apache.hadoop.io.Writable;
+
 /**
  * Aggregates worker aggregator requests and sends them off
  */
 public interface WorkerAggregatorRequestProcessor {
   /**
-   * Sends worker aggregated value to the owner of aggregator
+   * Sends worker reduced value to the owner of reducer
    *
-   * @param aggregatorName Name of the aggregator
-   * @param aggregatedValue Value of the aggregator
+   * @param name Name of the reducer
+   * @param reducedValue Reduced partial value
    * @throws java.io.IOException
-   * @return True if aggregated value will be sent, false if this worker is
-   * the owner of the aggregator
+   * @return True if reduced value will be sent, false if this worker is
+   * the owner of the reducer
    */
-  boolean sendAggregatedValue(String aggregatorName,
-      Writable aggregatedValue) throws IOException;
+  boolean sendReducedValue(String name,
+      Writable reducedValue) throws IOException;
 
   /**
    * Flush aggregated values cache.
@@ -46,19 +46,19 @@ public interface WorkerAggregatorRequestProcessor {
   void flush() throws IOException;
 
   /**
-   * Sends aggregated values to the master. This worker is the owner of these
-   * aggregators.
+   * Sends reduced values to the master. This worker is the owner of these
+   * reducers.
    *
-   * @param aggregatorData Serialized aggregator data
+   * @param data Serialized reduced values data
    * @throws IOException
    */
-  void sendAggregatedValuesToMaster(byte[] aggregatorData) throws IOException;
+  void sendReducedValuesToMaster(byte[] data) throws IOException;
 
   /**
-   * Sends aggregators to all other workers
+   * Sends reduced values to all other workers
    *
-   * @param aggregatorDataList Serialized aggregator data split into chunks
+   * @param reducedDataList Serialized reduced values data split into chunks
    */
-  void distributeAggregators(
-      Iterable<byte[]> aggregatorDataList) throws IOException;
+  void distributeReducedValues(
+      Iterable<byte[]> reducedDataList) throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyMasterClient.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyMasterClient.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyMasterClient.java
index 51277c9..e110782 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyMasterClient.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyMasterClient.java
@@ -20,14 +20,13 @@ package org.apache.giraph.comm.netty;
 
 import java.io.IOException;
 
-import org.apache.giraph.aggregators.Aggregator;
 import org.apache.giraph.bsp.CentralizedServiceMaster;
+import org.apache.giraph.comm.GlobalCommType;
 import org.apache.giraph.comm.MasterClient;
 import org.apache.giraph.comm.aggregators.AggregatorUtils;
-import org.apache.giraph.comm.aggregators.SendAggregatorCache;
+import org.apache.giraph.comm.aggregators.SendGlobalCommCache;
 import org.apache.giraph.comm.requests.SendAggregatorsToOwnerRequest;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.utils.WritableFactory;
 import org.apache.giraph.worker.WorkerInfo;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.Mapper;
@@ -42,8 +41,8 @@ public class NettyMasterClient implements MasterClient {
   /** Worker information for current superstep */
   private final CentralizedServiceMaster<?, ?, ?> service;
   /** Cached map of partition ids to serialized aggregator data */
-  private final SendAggregatorCache sendAggregatorCache =
-      new SendAggregatorCache();
+  private final SendGlobalCommCache sendGlobalCommCache =
+      new SendGlobalCommCache(true);
   /** How big a single aggregator request can be */
   private final int maxBytesPerAggregatorRequest;
   /** Progressable used to report progress */
@@ -78,26 +77,25 @@ public class NettyMasterClient implements MasterClient {
   }
 
   @Override
-  public void sendAggregator(String aggregatorName,
-      WritableFactory<? extends Aggregator> aggregatorFactory,
-      Writable aggregatedValue) throws IOException {
+  public void sendToOwner(String name, GlobalCommType sendType, Writable object)
+    throws IOException {
     WorkerInfo owner =
-        AggregatorUtils.getOwner(aggregatorName, service.getWorkerInfoList());
-    int currentSize = sendAggregatorCache.addAggregator(owner.getTaskId(),
-        aggregatorName, aggregatorFactory, aggregatedValue);
+        AggregatorUtils.getOwner(name, service.getWorkerInfoList());
+    int currentSize = sendGlobalCommCache.addValue(owner.getTaskId(),
+        name, sendType, object);
     if (currentSize >= maxBytesPerAggregatorRequest) {
       flushAggregatorsToWorker(owner);
     }
   }
 
   @Override
-  public void finishSendingAggregatedValues() throws IOException {
+  public void finishSendingValues() throws IOException {
     for (WorkerInfo worker : service.getWorkerInfoList()) {
-      sendAggregatorCache.addCountAggregator(worker.getTaskId());
+      sendGlobalCommCache.addSpecialCount(worker.getTaskId());
       flushAggregatorsToWorker(worker);
       progressable.progress();
     }
-    sendAggregatorCache.reset();
+    sendGlobalCommCache.reset();
   }
 
   /**
@@ -106,10 +104,10 @@ public class NettyMasterClient implements MasterClient {
    * @param worker Worker which we want to send aggregators to
    */
   private void flushAggregatorsToWorker(WorkerInfo worker) {
-    byte[] aggregatorData =
-        sendAggregatorCache.removeAggregators(worker.getTaskId());
+    byte[] data =
+        sendGlobalCommCache.removeSerialized(worker.getTaskId());
     nettyClient.sendWritableRequest(
-        worker.getTaskId(), new SendAggregatorsToOwnerRequest(aggregatorData,
+        worker.getTaskId(), new SendAggregatorsToOwnerRequest(data,
           service.getMasterInfo().getTaskId()));
   }
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyMasterServer.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyMasterServer.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyMasterServer.java
index 1c05910..60566f9 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyMasterServer.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyMasterServer.java
@@ -18,14 +18,14 @@
 
 package org.apache.giraph.comm.netty;
 
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import java.net.InetSocketAddress;
+
 import org.apache.giraph.bsp.CentralizedServiceMaster;
-import org.apache.giraph.comm.netty.handler.MasterRequestServerHandler;
 import org.apache.giraph.comm.MasterServer;
+import org.apache.giraph.comm.netty.handler.MasterRequestServerHandler;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.hadoop.util.Progressable;
 
-import java.net.InetSocketAddress;
-
 /**
  * Netty implementation of {@link MasterServer}
  */
@@ -46,7 +46,7 @@ public class NettyMasterServer implements MasterServer {
       Progressable progressable,
       Thread.UncaughtExceptionHandler exceptionHandler) {
     nettyServer = new NettyServer(conf,
-        new MasterRequestServerHandler.Factory(service.getAggregatorHandler()),
+        new MasterRequestServerHandler.Factory(service.getGlobalCommHandler()),
         service.getMasterInfo(), progressable, exceptionHandler);
     nettyServer.start();
   }

http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerAggregatorRequestProcessor.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerAggregatorRequestProcessor.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerAggregatorRequestProcessor.java
index 8b5f293..3096c6e 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerAggregatorRequestProcessor.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerAggregatorRequestProcessor.java
@@ -18,21 +18,22 @@
 
 package org.apache.giraph.comm.netty;
 
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import java.io.IOException;
+
 import org.apache.giraph.bsp.CentralizedServiceWorker;
-import org.apache.giraph.comm.aggregators.WorkerAggregatorRequestProcessor;
+import org.apache.giraph.comm.GlobalCommType;
 import org.apache.giraph.comm.WorkerClient;
 import org.apache.giraph.comm.aggregators.AggregatorUtils;
-import org.apache.giraph.comm.aggregators.SendAggregatedValueCache;
-import org.apache.giraph.comm.requests.SendAggregatorsToMasterRequest;
+import org.apache.giraph.comm.aggregators.SendGlobalCommCache;
+import org.apache.giraph.comm.aggregators.WorkerAggregatorRequestProcessor;
 import org.apache.giraph.comm.requests.SendAggregatorsToWorkerRequest;
+import org.apache.giraph.comm.requests.SendReducedToMasterRequest;
 import org.apache.giraph.comm.requests.SendWorkerAggregatorsRequest;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.worker.WorkerInfo;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.util.Progressable;
 
-import java.io.IOException;
-
 /**
  * Netty implementation of {@link WorkerAggregatorRequestProcessor}
  */
@@ -45,8 +46,8 @@ public class NettyWorkerAggregatorRequestProcessor
   /** Service worker */
   private final CentralizedServiceWorker<?, ?, ?> serviceWorker;
   /** Cached map of partition ids to serialized aggregator data */
-  private final SendAggregatedValueCache sendAggregatedValueCache =
-      new SendAggregatedValueCache();
+  private final SendGlobalCommCache sendReducedValuesCache =
+      new SendGlobalCommCache(false);
   /** How big a single aggregator request can be */
   private final int maxBytesPerAggregatorRequest;
 
@@ -71,16 +72,16 @@ public class NettyWorkerAggregatorRequestProcessor
   }
 
   @Override
-  public boolean sendAggregatedValue(String aggregatorName,
-      Writable aggregatedValue) throws IOException {
+  public boolean sendReducedValue(String name,
+      Writable reducedValue) throws IOException {
     WorkerInfo owner =
-        AggregatorUtils.getOwner(aggregatorName,
+        AggregatorUtils.getOwner(name,
             serviceWorker.getWorkerInfoList());
     if (isThisWorker(owner)) {
       return false;
     } else {
-      int currentSize = sendAggregatedValueCache.addAggregator(
-          owner.getTaskId(), aggregatorName, aggregatedValue);
+      int currentSize = sendReducedValuesCache.addValue(owner.getTaskId(),
+          name, GlobalCommType.REDUCED_VALUE, reducedValue);
       if (currentSize >= maxBytesPerAggregatorRequest) {
         flushAggregatorsToWorker(owner);
       }
@@ -92,12 +93,12 @@ public class NettyWorkerAggregatorRequestProcessor
   public void flush() throws IOException {
     for (WorkerInfo workerInfo : serviceWorker.getWorkerInfoList()) {
       if (!isThisWorker(workerInfo)) {
-        sendAggregatedValueCache.addCountAggregator(workerInfo.getTaskId());
+        sendReducedValuesCache.addSpecialCount(workerInfo.getTaskId());
         flushAggregatorsToWorker(workerInfo);
         progressable.progress();
       }
     }
-    sendAggregatedValueCache.reset();
+    sendReducedValuesCache.reset();
   }
 
   /**
@@ -106,22 +107,21 @@ public class NettyWorkerAggregatorRequestProcessor
    * @param worker Worker which we want to send aggregators to
    */
   private void flushAggregatorsToWorker(WorkerInfo worker) {
-    byte[] aggregatorData =
-        sendAggregatedValueCache.removeAggregators(worker.getTaskId());
+    byte[] data =
+        sendReducedValuesCache.removeSerialized(worker.getTaskId());
     workerClient.sendWritableRequest(worker.getTaskId(),
-        new SendWorkerAggregatorsRequest(aggregatorData,
+        new SendWorkerAggregatorsRequest(data,
             serviceWorker.getWorkerInfo().getTaskId()));
   }
 
   @Override
-  public void sendAggregatedValuesToMaster(
-      byte[] aggregatorData) throws IOException {
+  public void sendReducedValuesToMaster(byte[] data) throws IOException {
     workerClient.sendWritableRequest(serviceWorker.getMasterInfo().getTaskId(),
-        new SendAggregatorsToMasterRequest(aggregatorData));
+        new SendReducedToMasterRequest(data));
   }
 
   @Override
-  public void distributeAggregators(
+  public void distributeReducedValues(
       Iterable<byte[]> aggregatorDataList) throws IOException {
     for (byte[] aggregatorData : aggregatorDataList) {
       for (WorkerInfo worker : serviceWorker.getWorkerInfoList()) {

http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/MasterRequestServerHandler.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/MasterRequestServerHandler.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/MasterRequestServerHandler.java
index e043314..02c72f7 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/MasterRequestServerHandler.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/MasterRequestServerHandler.java
@@ -18,10 +18,10 @@
 
 package org.apache.giraph.comm.netty.handler;
 
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.comm.requests.MasterRequest;
-import org.apache.giraph.master.MasterAggregatorHandler;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.graph.TaskInfo;
+import org.apache.giraph.master.MasterAggregatorHandler;
 
 /** Handler for requests on master */
 public class MasterRequestServerHandler extends

http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/comm/requests/RequestType.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/requests/RequestType.java b/giraph-core/src/main/java/org/apache/giraph/comm/requests/RequestType.java
index c7561ee..26eaa8c 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/requests/RequestType.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/requests/RequestType.java
@@ -54,7 +54,7 @@ public enum RequestType {
   /** Send aggregated values from one worker's vertices */
   SEND_WORKER_AGGREGATORS_REQUEST(SendWorkerAggregatorsRequest.class),
   /** Send aggregated values from worker owner to master */
-  SEND_AGGREGATORS_TO_MASTER_REQUEST(SendAggregatorsToMasterRequest.class),
+  SEND_AGGREGATORS_TO_MASTER_REQUEST(SendReducedToMasterRequest.class),
   /** Send aggregators from master to worker owners */
   SEND_AGGREGATORS_TO_OWNER_REQUEST(SendAggregatorsToOwnerRequest.class),
   /** Send aggregators from worker owner to other workers */

http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendAggregatorsToMasterRequest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendAggregatorsToMasterRequest.java b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendAggregatorsToMasterRequest.java
deleted file mode 100644
index 2a05192..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendAggregatorsToMasterRequest.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.comm.requests;
-
-import org.apache.giraph.master.MasterAggregatorHandler;
-
-import java.io.IOException;
-
-/**
- * Request to send final aggregated values from worker which owns
- * aggregators to the master
- */
-public class SendAggregatorsToMasterRequest extends ByteArrayRequest
-    implements MasterRequest {
-
-  /**
-   * Constructor
-   *
-   * @param data Serialized aggregator data
-   */
-  public SendAggregatorsToMasterRequest(byte[] data) {
-    super(data);
-  }
-
-  /**
-   * Constructor used for reflection only
-   */
-  public SendAggregatorsToMasterRequest() {
-  }
-
-  @Override
-  public void doRequest(MasterAggregatorHandler aggregatorHandler) {
-    try {
-      aggregatorHandler.acceptAggregatedValues(getDataInput());
-    } catch (IOException e) {
-      throw new IllegalStateException("doRequest: " +
-          "IOException occurred while processing request", e);
-    }
-  }
-
-  @Override
-  public RequestType getType() {
-    return RequestType.SEND_AGGREGATORS_TO_MASTER_REQUEST;
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendAggregatorsToOwnerRequest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendAggregatorsToOwnerRequest.java b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendAggregatorsToOwnerRequest.java
index 10d8d02..2d5cc51 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendAggregatorsToOwnerRequest.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendAggregatorsToOwnerRequest.java
@@ -21,11 +21,12 @@ package org.apache.giraph.comm.requests;
 import java.io.DataInput;
 import java.io.IOException;
 
-import org.apache.giraph.aggregators.Aggregator;
+import org.apache.giraph.comm.GlobalCommType;
 import org.apache.giraph.comm.ServerData;
-import org.apache.giraph.comm.aggregators.AggregatorUtils;
 import org.apache.giraph.comm.aggregators.AllAggregatorServerData;
-import org.apache.giraph.utils.WritableFactory;
+import org.apache.giraph.reducers.ReduceOperation;
+import org.apache.giraph.utils.UnsafeByteArrayOutputStream;
+import org.apache.giraph.utils.UnsafeReusableByteArrayInput;
 import org.apache.giraph.utils.WritableUtils;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Writable;
@@ -55,28 +56,32 @@ public class SendAggregatorsToOwnerRequest
 
   @Override
   public void doRequest(ServerData serverData) {
+    UnsafeByteArrayOutputStream reusedOut = new UnsafeByteArrayOutputStream();
+    UnsafeReusableByteArrayInput reusedIn = new UnsafeReusableByteArrayInput();
+
     DataInput input = getDataInput();
     AllAggregatorServerData aggregatorData = serverData.getAllAggregatorData();
     try {
-      int numAggregators = input.readInt();
-      for (int i = 0; i < numAggregators; i++) {
-        String aggregatorName = input.readUTF();
-        WritableFactory<Aggregator<Writable>> aggregatorFactory =
-            WritableUtils.readWritableObject(input, conf);
-        if (aggregatorName.equals(AggregatorUtils.SPECIAL_COUNT_AGGREGATOR)) {
-          LongWritable count = new LongWritable(0);
-          count.readFields(input);
-          aggregatorData.receivedRequestCountFromMaster(count.get(),
+      int num = input.readInt();
+      for (int i = 0; i < num; i++) {
+        String name = input.readUTF();
+        GlobalCommType type = GlobalCommType.values()[input.readByte()];
+        Writable value = WritableUtils.readWritableObject(input, conf);
+        if (type == GlobalCommType.SPECIAL_COUNT) {
+          aggregatorData.receivedRequestCountFromMaster(
+              ((LongWritable) value).get(),
               getSenderTaskId());
         } else {
-          aggregatorData.registerAggregatorClass(aggregatorName,
-              aggregatorFactory);
-          Writable aggregatorValue =
-              aggregatorData.createAggregatorInitialValue(aggregatorName);
-          aggregatorValue.readFields(input);
-          aggregatorData.setAggregatorValue(aggregatorName, aggregatorValue);
-          serverData.getOwnerAggregatorData().registerAggregator(
-              aggregatorName, aggregatorFactory);
+          aggregatorData.receiveValueFromMaster(name, type, value);
+
+          if (type == GlobalCommType.REDUCE_OPERATIONS) {
+            ReduceOperation<Object, Writable> reduceOpCopy =
+                (ReduceOperation<Object, Writable>)
+                WritableUtils.createCopy(reusedOut, reusedIn, value);
+
+            serverData.getOwnerAggregatorData().registerReducer(
+                name, reduceOpCopy);
+          }
         }
       }
     } catch (IOException e) {

http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendAggregatorsToWorkerRequest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendAggregatorsToWorkerRequest.java b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendAggregatorsToWorkerRequest.java
index d469e96..361bdc9 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendAggregatorsToWorkerRequest.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendAggregatorsToWorkerRequest.java
@@ -21,11 +21,9 @@ package org.apache.giraph.comm.requests;
 import java.io.DataInput;
 import java.io.IOException;
 
-import org.apache.giraph.aggregators.Aggregator;
+import org.apache.giraph.comm.GlobalCommType;
 import org.apache.giraph.comm.ServerData;
-import org.apache.giraph.comm.aggregators.AggregatorUtils;
 import org.apache.giraph.comm.aggregators.AllAggregatorServerData;
-import org.apache.giraph.utils.WritableFactory;
 import org.apache.giraph.utils.WritableUtils;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Writable;
@@ -58,23 +56,17 @@ public class SendAggregatorsToWorkerRequest extends
     DataInput input = getDataInput();
     AllAggregatorServerData aggregatorData = serverData.getAllAggregatorData();
     try {
-      int numAggregators = input.readInt();
-      for (int i = 0; i < numAggregators; i++) {
-        String aggregatorName = input.readUTF();
-        WritableFactory<Aggregator<Writable>> aggregatorFactory =
-            WritableUtils.readWritableObject(input, conf);
-        if (aggregatorName.equals(AggregatorUtils.SPECIAL_COUNT_AGGREGATOR)) {
-          LongWritable count = new LongWritable(0);
-          count.readFields(input);
-          aggregatorData.receivedRequestCountFromWorker(count.get(),
+      int num = input.readInt();
+      for (int i = 0; i < num; i++) {
+        String name = input.readUTF();
+        GlobalCommType type = GlobalCommType.values()[input.readByte()];
+        Writable value = WritableUtils.readWritableObject(input, conf);
+        if (type == GlobalCommType.SPECIAL_COUNT) {
+          aggregatorData.receivedRequestCountFromWorker(
+              ((LongWritable) value).get(),
               getSenderTaskId());
         } else {
-          aggregatorData.registerAggregatorClass(aggregatorName,
-              aggregatorFactory);
-          Writable aggregatorValue =
-              aggregatorData.createAggregatorInitialValue(aggregatorName);
-          aggregatorValue.readFields(input);
-          aggregatorData.setAggregatorValue(aggregatorName, aggregatorValue);
+          aggregatorData.receiveValueFromMaster(name, type, value);
         }
       }
     } catch (IOException e) {