Posted to commits@giraph.apache.org by rv...@apache.org on 2014/10/26 02:22:21 UTC

[38/47] Reduce/broadcast API

http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendReducedToMasterRequest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendReducedToMasterRequest.java b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendReducedToMasterRequest.java
new file mode 100644
index 0000000..7171f04
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendReducedToMasterRequest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm.requests;
+
+import java.io.IOException;
+
+import org.apache.giraph.master.MasterAggregatorHandler;
+
+/**
+ * Request to send final reduced values from the worker which owns
+ * them to the master
+ */
+public class SendReducedToMasterRequest extends ByteArrayRequest
+    implements MasterRequest {
+
+  /**
+   * Constructor
+   *
+   * @param data Serialized aggregator data
+   */
+  public SendReducedToMasterRequest(byte[] data) {
+    super(data);
+  }
+
+  /**
+   * Constructor used for reflection only
+   */
+  public SendReducedToMasterRequest() {
+  }
+
+  @Override
+  public void doRequest(MasterAggregatorHandler aggregatorHandler) {
+    try {
+      aggregatorHandler.acceptReducedValues(getDataInput());
+    } catch (IOException e) {
+      throw new IllegalStateException("doRequest: " +
+          "IOException occurred while processing request", e);
+    }
+  }
+
+  @Override
+  public RequestType getType() {
+    return RequestType.SEND_AGGREGATORS_TO_MASTER_REQUEST;
+  }
+}
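
A minimal sketch of the intended flow for this request; the byte payload and the
handler instance are assumptions standing in for what the owner worker serializes
and for the master's MasterAggregatorHandler:

    // Illustrative only -- not part of the patch.
    void sendReducedValuesToMaster(byte[] reducedData,
        MasterAggregatorHandler handler) {
      SendReducedToMasterRequest request =
          new SendReducedToMasterRequest(reducedData);
      // On the master, the request server dispatches MasterRequests like this,
      // which deserializes the payload via acceptReducedValues(getDataInput()):
      request.doRequest(handler);
    }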

http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerAggregatorsRequest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerAggregatorsRequest.java b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerAggregatorsRequest.java
index 00a0c26..2f76e6e 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerAggregatorsRequest.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerAggregatorsRequest.java
@@ -18,15 +18,15 @@
 
 package org.apache.giraph.comm.requests;
 
+import java.io.DataInput;
+import java.io.IOException;
+
+import org.apache.giraph.comm.GlobalCommType;
 import org.apache.giraph.comm.ServerData;
-import org.apache.giraph.comm.aggregators.AggregatorUtils;
 import org.apache.giraph.comm.aggregators.OwnerAggregatorServerData;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Writable;
 
-import java.io.DataInput;
-import java.io.IOException;
-
 /**
  * Request to send partial aggregated values for current superstep (values
  * which were computed by one worker's vertices)
@@ -56,20 +56,23 @@ public class SendWorkerAggregatorsRequest extends
     OwnerAggregatorServerData aggregatorData =
         serverData.getOwnerAggregatorData();
     try {
-      int numAggregators = input.readInt();
-      for (int i = 0; i < numAggregators; i++) {
-        String aggregatorName = input.readUTF();
-        if (aggregatorName.equals(
-            AggregatorUtils.SPECIAL_COUNT_AGGREGATOR)) {
-          LongWritable count = new LongWritable(0);
-          count.readFields(input);
-          aggregatorData.receivedRequestCountFromWorker(count.get(),
+      int num = input.readInt();
+      for (int i = 0; i < num; i++) {
+        String name = input.readUTF();
+        GlobalCommType type = GlobalCommType.values()[input.readByte()];
+        if (type == GlobalCommType.SPECIAL_COUNT) {
+          LongWritable value = new LongWritable();
+          value.readFields(input);
+          aggregatorData.receivedRequestCountFromWorker(
+              value.get(),
               getSenderTaskId());
+        } else if (type == GlobalCommType.REDUCED_VALUE) {
+          Writable value = aggregatorData.createInitialValue(name);
+          value.readFields(input);
+          aggregatorData.reduce(name, value);
         } else {
-          Writable aggregatedValue =
-              aggregatorData.createAggregatorInitialValue(aggregatorName);
-          aggregatedValue.readFields(input);
-          aggregatorData.aggregate(aggregatorName, aggregatedValue);
+          throw new IllegalStateException(
+              "SendWorkerAggregatorsRequest received " + type);
         }
       }
     } catch (IOException e) {
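
The wire format consumed above is: an entry count, then for each entry a UTF name,
a GlobalCommType ordinal byte, and the serialized value. A hypothetical sender-side
sketch matching that format (the real writer lives in the worker-side global
communication code, which is not part of this hunk):

    // Hypothetical helper, mirroring what doRequest() reads.
    void writeSingleReducedValue(DataOutput out, String name, Writable value)
        throws IOException {
      out.writeInt(1);                                        // number of entries
      out.writeUTF(name);                                     // reducer name
      out.writeByte(GlobalCommType.REDUCED_VALUE.ordinal());  // type tag
      value.write(out);                                       // partially reduced value
    }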

http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/graph/AbstractComputation.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/AbstractComputation.java b/giraph-core/src/main/java/org/apache/giraph/graph/AbstractComputation.java
index e7c3084..1ea6603 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/AbstractComputation.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/AbstractComputation.java
@@ -18,20 +18,20 @@
 
 package org.apache.giraph.graph;
 
+import java.io.IOException;
+import java.util.Iterator;
+
 import org.apache.giraph.comm.WorkerClientRequestProcessor;
-import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
 import org.apache.giraph.edge.Edge;
 import org.apache.giraph.edge.OutEdges;
-import org.apache.giraph.worker.WorkerAggregatorUsage;
+import org.apache.giraph.worker.WorkerAggregatorDelegator;
 import org.apache.giraph.worker.WorkerContext;
+import org.apache.giraph.worker.WorkerGlobalCommUsage;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.log4j.Logger;
 
-import java.io.IOException;
-import java.util.Iterator;
-
 /**
  * See {@link Computation} for explanation of the interface.
  *
@@ -52,7 +52,7 @@ import java.util.Iterator;
 public abstract class AbstractComputation<I extends WritableComparable,
     V extends Writable, E extends Writable, M1 extends Writable,
     M2 extends Writable>
-    extends DefaultImmutableClassesGiraphConfigurable<I, V, E>
+    extends WorkerAggregatorDelegator<I, V, E>
     implements Computation<I, V, E, M1, M2> {
   /** Logger */
   private static final Logger LOG = Logger.getLogger(AbstractComputation.class);
@@ -63,8 +63,6 @@ public abstract class AbstractComputation<I extends WritableComparable,
   private WorkerClientRequestProcessor<I, V, E> workerClientRequestProcessor;
   /** Graph-wide BSP Mapper for this Computation */
   private GraphTaskManager<I, V, E> graphTaskManager;
-  /** Worker aggregator usage */
-  private WorkerAggregatorUsage workerAggregatorUsage;
   /** Worker context */
   private WorkerContext workerContext;
 
@@ -76,6 +74,7 @@ public abstract class AbstractComputation<I extends WritableComparable,
    *                 superstep.  Each message is only guaranteed to have
    *                 a life expectancy as long as next() is not called.
    */
+  @Override
   public abstract void compute(Vertex<I, V, E> vertex,
       Iterable<M1> messages) throws IOException;
 
@@ -103,7 +102,7 @@ public abstract class AbstractComputation<I extends WritableComparable,
    * @param graphState Graph state
    * @param workerClientRequestProcessor Processor for handling requests
    * @param graphTaskManager Graph-wide BSP Mapper for this Vertex
-   * @param workerAggregatorUsage Worker aggregator usage
+   * @param workerGlobalCommUsage Worker global communication usage
    * @param workerContext Worker context
    */
   @Override
@@ -111,12 +110,12 @@ public abstract class AbstractComputation<I extends WritableComparable,
       GraphState graphState,
       WorkerClientRequestProcessor<I, V, E> workerClientRequestProcessor,
       GraphTaskManager<I, V, E> graphTaskManager,
-      WorkerAggregatorUsage workerAggregatorUsage,
+      WorkerGlobalCommUsage workerGlobalCommUsage,
       WorkerContext workerContext) {
     this.graphState = graphState;
     this.workerClientRequestProcessor = workerClientRequestProcessor;
     this.graphTaskManager = graphTaskManager;
-    this.workerAggregatorUsage = workerAggregatorUsage;
+    this.setWorkerGlobalCommUsage(workerGlobalCommUsage);
     this.workerContext = workerContext;
   }
 
@@ -274,14 +273,4 @@ public abstract class AbstractComputation<I extends WritableComparable,
   public <W extends WorkerContext> W getWorkerContext() {
     return (W) workerContext;
   }
-
-  @Override
-  public <A extends Writable> void aggregate(String name, A value) {
-    workerAggregatorUsage.aggregate(name, value);
-  }
-
-  @Override
-  public <A extends Writable> A getAggregatedValue(String name) {
-    return workerAggregatorUsage.<A>getAggregatedValue(name);
-  }
 }
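
User-facing computations are unaffected by this refactoring: aggregate() and
getAggregatedValue() are no longer implemented here but inherited from
WorkerAggregatorDelegator. A minimal sketch, assuming the stock BasicComputation
base class and an aggregator registered under the illustrative name "sum":

    public class DegreeSum extends BasicComputation<
        LongWritable, DoubleWritable, FloatWritable, DoubleWritable> {
      @Override
      public void compute(
          Vertex<LongWritable, DoubleWritable, FloatWritable> vertex,
          Iterable<DoubleWritable> messages) {
        // Resolved through WorkerAggregatorDelegator rather than a stored
        // WorkerAggregatorUsage field:
        aggregate("sum", new LongWritable(vertex.getNumEdges()));
        vertex.voteToHalt();
      }
    }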

http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/graph/Computation.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/Computation.java b/giraph-core/src/main/java/org/apache/giraph/graph/Computation.java
index 7a7b40f..d310da9 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/Computation.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/Computation.java
@@ -17,6 +17,9 @@
  */
 package org.apache.giraph.graph;
 
+import java.io.IOException;
+import java.util.Iterator;
+
 import org.apache.giraph.comm.WorkerClientRequestProcessor;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfigurable;
 import org.apache.giraph.conf.TypesHolder;
@@ -24,13 +27,11 @@ import org.apache.giraph.edge.Edge;
 import org.apache.giraph.edge.OutEdges;
 import org.apache.giraph.worker.WorkerAggregatorUsage;
 import org.apache.giraph.worker.WorkerContext;
+import org.apache.giraph.worker.WorkerGlobalCommUsage;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.Mapper;
 
-import java.io.IOException;
-import java.util.Iterator;
-
 /**
  * Interface for an application for computation.
  *
@@ -55,7 +56,7 @@ public interface Computation<I extends WritableComparable,
     M2 extends Writable>
     extends TypesHolder<I, V, E, M1, M2>,
     ImmutableClassesGiraphConfigurable<I, V, E>,
-    WorkerAggregatorUsage {
+    WorkerGlobalCommUsage, WorkerAggregatorUsage {
   /**
    * Must be defined by user to do computation on a single Vertex.
    *
@@ -87,13 +88,13 @@ public interface Computation<I extends WritableComparable,
    * @param graphState Graph state
    * @param workerClientRequestProcessor Processor for handling requests
    * @param graphTaskManager Graph-wide BSP Mapper for this Vertex
-   * @param workerAggregatorUsage Worker aggregator usage
+   * @param workerGlobalCommUsage Worker global communication usage
    * @param workerContext Worker context
    */
   void initialize(GraphState graphState,
       WorkerClientRequestProcessor<I, V, E> workerClientRequestProcessor,
       GraphTaskManager<I, V, E> graphTaskManager,
-      WorkerAggregatorUsage workerAggregatorUsage, WorkerContext workerContext);
+      WorkerGlobalCommUsage workerGlobalCommUsage, WorkerContext workerContext);
 
   /**
    * Retrieves the current superstep.

http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java b/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
index d9c4302..33f2255 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
@@ -36,7 +36,7 @@ import org.apache.giraph.utils.TimedLogger;
 import org.apache.giraph.utils.Trimmable;
 import org.apache.giraph.worker.WorkerContext;
 import org.apache.giraph.worker.WorkerProgress;
-import org.apache.giraph.worker.WorkerThreadAggregatorUsage;
+import org.apache.giraph.worker.WorkerThreadGlobalCommUsage;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.Mapper;
@@ -135,7 +135,7 @@ public class ComputeCallable<I extends WritableComparable, V extends Writable,
     WorkerClientRequestProcessor<I, V, E> workerClientRequestProcessor =
         new NettyWorkerClientRequestProcessor<I, V, E>(
             context, configuration, serviceWorker);
-    WorkerThreadAggregatorUsage aggregatorUsage =
+    WorkerThreadGlobalCommUsage aggregatorUsage =
         serviceWorker.getAggregatorHandler().newThreadAggregatorUsage();
     WorkerContext workerContext = serviceWorker.getWorkerContext();
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
index ba5d2fa..eb9fad3 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
@@ -18,6 +18,19 @@
 
 package org.apache.giraph.graph;
 
+import java.io.IOException;
+import java.net.URL;
+import java.net.URLDecoder;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Enumeration;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
 import org.apache.giraph.bsp.BspService;
 import org.apache.giraph.bsp.CentralizedServiceMaster;
 import org.apache.giraph.bsp.CentralizedServiceWorker;
@@ -26,9 +39,7 @@ import org.apache.giraph.comm.messages.MessageStore;
 import org.apache.giraph.conf.GiraphConstants;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.job.JobProgressTracker;
-import org.apache.giraph.scripting.ScriptLoader;
 import org.apache.giraph.master.BspServiceMaster;
-import org.apache.giraph.master.MasterAggregatorUsage;
 import org.apache.giraph.master.MasterThread;
 import org.apache.giraph.metrics.GiraphMetrics;
 import org.apache.giraph.metrics.GiraphMetricsRegistry;
@@ -40,6 +51,7 @@ import org.apache.giraph.partition.Partition;
 import org.apache.giraph.partition.PartitionOwner;
 import org.apache.giraph.partition.PartitionStats;
 import org.apache.giraph.partition.PartitionStore;
+import org.apache.giraph.scripting.ScriptLoader;
 import org.apache.giraph.utils.CallableFactory;
 import org.apache.giraph.utils.MemoryUtils;
 import org.apache.giraph.utils.ProgressableUtils;
@@ -60,19 +72,6 @@ import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.log4j.PatternLayout;
 
-import java.io.IOException;
-import java.net.URL;
-import java.net.URLDecoder;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Enumeration;
-import java.util.List;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-
 /**
  * The Giraph-specific business logic for a single BSP
  * compute node in whatever underlying type of cluster
@@ -149,7 +148,7 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable,
   /** Timer for WorkerContext#preSuperstep() */
   private GiraphTimer wcPreSuperstepTimer;
   /** The Hadoop Mapper#Context for this job */
-  private Mapper<?, ?, ?, ?>.Context context;
+  private final Mapper<?, ?, ?, ?>.Context context;
   /** is this GraphTaskManager the master? */
   private boolean isMaster;
 
@@ -497,15 +496,6 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable,
     return graphFunctions;
   }
 
-  /**
-   * Get master aggregator usage, a subset of the functionality
-   *
-   * @return Master aggregator usage interface
-   */
-  public final MasterAggregatorUsage getMasterAggregatorUsage() {
-    return serviceMaster.getAggregatorHandler();
-  }
-
   public final WorkerContext getWorkerContext() {
     return serviceWorker.getWorkerContext();
   }

http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/io/EdgeReader.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/EdgeReader.java b/giraph-core/src/main/java/org/apache/giraph/io/EdgeReader.java
index 1bc48e3..83a0369 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/EdgeReader.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/EdgeReader.java
@@ -19,9 +19,9 @@
 package org.apache.giraph.io;
 
 import java.io.IOException;
-import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
+
 import org.apache.giraph.edge.Edge;
-import org.apache.giraph.worker.WorkerAggregatorUsage;
+import org.apache.giraph.worker.WorkerAggregatorDelegator;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.InputSplit;
@@ -36,11 +36,8 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
  */
 @SuppressWarnings("rawtypes")
 public abstract class EdgeReader<I extends WritableComparable,
-    E extends Writable> extends DefaultImmutableClassesGiraphConfigurable<
-        I, Writable, E> implements WorkerAggregatorUsage {
-
-  /** Aggregator usage for edge reader */
-  private WorkerAggregatorUsage workerAggregatorUsage;
+    E extends Writable> extends WorkerAggregatorDelegator<
+        I, Writable, E> {
 
   /**
    * Use the input split and context to setup reading the edges.
@@ -56,21 +53,6 @@ public abstract class EdgeReader<I extends WritableComparable,
     throws IOException, InterruptedException;
 
   /**
-   * Set aggregator usage. It provides the functionality
-   * of aggregation operation in reading an edge.
-   * It is invoked just after initialization.
-   * E.g.,
-   * edgeReader.initialize(inputSplit, context);
-   * edgeReader.setAggregator(aggregatorUsage);
-   * This method is only for use by the infrastructure.
-   *
-   * @param agg aggregator usage for edge reader
-   */
-  public void setWorkerAggregatorUse(WorkerAggregatorUsage agg) {
-    workerAggregatorUsage = agg;
-  }
-
-  /**
    * Read the next edge.
    *
    * @return false iff there are no more edges
@@ -117,14 +99,4 @@ public abstract class EdgeReader<I extends WritableComparable,
    * @throws InterruptedException
    */
   public abstract float getProgress() throws IOException, InterruptedException;
-
-  @Override
-  public <A extends Writable> void aggregate(String name, A value) {
-    workerAggregatorUsage.aggregate(name, value);
-  }
-
-  @Override
-  public <A extends Writable> A getAggregatedValue(String name) {
-    return workerAggregatorUsage.<A>getAggregatedValue(name);
-  }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/io/MappingReader.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/MappingReader.java b/giraph-core/src/main/java/org/apache/giraph/io/MappingReader.java
index b7ce97c..7c71585 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/MappingReader.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/MappingReader.java
@@ -18,16 +18,15 @@
 
 package org.apache.giraph.io;
 
-import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
+import java.io.IOException;
+
 import org.apache.giraph.mapping.MappingEntry;
-import org.apache.giraph.worker.WorkerAggregatorUsage;
+import org.apache.giraph.worker.WorkerAggregatorDelegator;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 
-import java.io.IOException;
-
 /**
  * Will read the mapping from an input split.
  *
@@ -38,12 +37,7 @@ import java.io.IOException;
  */
 public abstract class MappingReader<I extends WritableComparable,
     V extends Writable, E extends Writable, B extends Writable>
-    extends DefaultImmutableClassesGiraphConfigurable<I, V, E>
-    implements WorkerAggregatorUsage {
-
-  /** Aggregator usage for vertex reader */
-  private WorkerAggregatorUsage workerAggregatorUsage;
-
+    extends WorkerAggregatorDelegator<I, V, E> {
   /**
    * Use the input split and context to setup reading the vertices.
    * Guaranteed to be called prior to any other function.
@@ -57,22 +51,6 @@ public abstract class MappingReader<I extends WritableComparable,
     TaskAttemptContext context)
     throws IOException, InterruptedException;
 
-
-  /**
-   * Set aggregator usage. It provides the functionality
-   * of aggregation operation in reading a vertex.
-   * It is invoked just after initialization.
-   * E.g.,
-   * vertexReader.initialize(inputSplit, context);
-   * vertexReader.setAggregator(aggregatorUsage);
-   * This method is only for use by the infrastructure.
-   *
-   * @param agg aggregator usage for vertex reader
-   */
-  public void setWorkerAggregatorUse(WorkerAggregatorUsage agg) {
-    workerAggregatorUsage = agg;
-  }
-
   /**
    *
    * @return false iff there are no more vertices
@@ -111,14 +89,4 @@ public abstract class MappingReader<I extends WritableComparable,
    * @throws InterruptedException
    */
   public abstract float getProgress() throws IOException, InterruptedException;
-
-  @Override
-  public <A extends Writable> void aggregate(String name, A value) {
-    workerAggregatorUsage.aggregate(name, value);
-  }
-
-  @Override
-  public <A extends Writable> A getAggregatedValue(String name) {
-    return workerAggregatorUsage.getAggregatedValue(name);
-  }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/io/VertexReader.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/VertexReader.java b/giraph-core/src/main/java/org/apache/giraph/io/VertexReader.java
index 94a4083..64ec800 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/VertexReader.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/VertexReader.java
@@ -18,16 +18,15 @@
 
 package org.apache.giraph.io;
 
-import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
+import java.io.IOException;
+
 import org.apache.giraph.graph.Vertex;
-import org.apache.giraph.worker.WorkerAggregatorUsage;
+import org.apache.giraph.worker.WorkerAggregatorDelegator;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 
-import java.io.IOException;
-
 /**
  * Analogous to Hadoop's RecordReader for vertices.  Will read the
  * vertices from an input split.
@@ -39,11 +38,7 @@ import java.io.IOException;
 @SuppressWarnings("rawtypes")
 public abstract class VertexReader<I extends WritableComparable,
     V extends Writable, E extends Writable> extends
-    DefaultImmutableClassesGiraphConfigurable<I, V, E>
-    implements WorkerAggregatorUsage {
-  /** Aggregator usage for vertex reader */
-  private WorkerAggregatorUsage workerAggregatorUsage;
-
+    WorkerAggregatorDelegator<I, V, E> {
   /**
    * Use the input split and context to setup reading the vertices.
    * Guaranteed to be called prior to any other function.
@@ -58,21 +53,6 @@ public abstract class VertexReader<I extends WritableComparable,
     throws IOException, InterruptedException;
 
   /**
-   * Set aggregator usage. It provides the functionality
-   * of aggregation operation in reading a vertex.
-   * It is invoked just after initialization.
-   * E.g.,
-   * vertexReader.initialize(inputSplit, context);
-   * vertexReader.setAggregator(aggregatorUsage);
-   * This method is only for use by the infrastructure.
-   *
-   * @param agg aggregator usage for vertex reader
-   */
-  public void setWorkerAggregatorUse(WorkerAggregatorUsage agg) {
-    workerAggregatorUsage = agg;
-  }
-
-  /**
    *
    * @return false iff there are no more vertices
    * @throws IOException
@@ -108,14 +88,4 @@ public abstract class VertexReader<I extends WritableComparable,
    * @throws InterruptedException
    */
   public abstract float getProgress() throws IOException, InterruptedException;
-
-  @Override
-  public <A extends Writable> void aggregate(String name, A value) {
-    workerAggregatorUsage.aggregate(name, value);
-  }
-
-  @Override
-  public <A extends Writable> A getAggregatedValue(String name) {
-    return workerAggregatorUsage.<A>getAggregatedValue(name);
-  }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedEdgeReader.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedEdgeReader.java b/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedEdgeReader.java
index 9b5e8c6..05dd5bc 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedEdgeReader.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedEdgeReader.java
@@ -18,18 +18,18 @@
 
 package org.apache.giraph.io.internal;
 
+import java.io.IOException;
+
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.edge.Edge;
 import org.apache.giraph.io.EdgeReader;
 import org.apache.giraph.job.HadoopUtils;
-import org.apache.giraph.worker.WorkerAggregatorUsage;
+import org.apache.giraph.worker.WorkerGlobalCommUsage;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 
-import java.io.IOException;
-
 /**
  * For internal use only.
  *
@@ -72,9 +72,10 @@ public class WrappedEdgeReader<I extends WritableComparable,
   }
 
   @Override
-  public void setWorkerAggregatorUse(WorkerAggregatorUsage agg) {
-    // Set aggregator usage for edge reader
-    baseEdgeReader.setWorkerAggregatorUse(agg);
+  public void setWorkerGlobalCommUsage(WorkerGlobalCommUsage usage) {
+    super.setWorkerGlobalCommUsage(usage);
+    // Set global communication usage for edge reader
+    baseEdgeReader.setWorkerGlobalCommUsage(usage);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedMappingReader.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedMappingReader.java b/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedMappingReader.java
index 7d1c4c9..659776b 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedMappingReader.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedMappingReader.java
@@ -18,18 +18,18 @@
 
 package org.apache.giraph.io.internal;
 
+import java.io.IOException;
+
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.io.MappingReader;
 import org.apache.giraph.job.HadoopUtils;
 import org.apache.giraph.mapping.MappingEntry;
-import org.apache.giraph.worker.WorkerAggregatorUsage;
+import org.apache.giraph.worker.WorkerGlobalCommUsage;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 
-import java.io.IOException;
-
 /**
  * For internal use only.
  *
@@ -74,11 +74,11 @@ public class WrappedMappingReader<I extends WritableComparable,
         HadoopUtils.makeTaskAttemptContext(getConf(), context));
   }
 
-
   @Override
-  public void setWorkerAggregatorUse(WorkerAggregatorUsage agg) {
-    // Set aggregator usage for vertex reader
-    baseMappingReader.setWorkerAggregatorUse(agg);
+  public void setWorkerGlobalCommUsage(WorkerGlobalCommUsage usage) {
+    super.setWorkerGlobalCommUsage(usage);
+    // Set global communication usage for mapping reader
+    baseMappingReader.setWorkerGlobalCommUsage(usage);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedVertexReader.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedVertexReader.java b/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedVertexReader.java
index 8e25602..8c23cba 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedVertexReader.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedVertexReader.java
@@ -18,18 +18,18 @@
 
 package org.apache.giraph.io.internal;
 
+import java.io.IOException;
+
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.graph.Vertex;
 import org.apache.giraph.io.VertexReader;
 import org.apache.giraph.job.HadoopUtils;
-import org.apache.giraph.worker.WorkerAggregatorUsage;
+import org.apache.giraph.worker.WorkerGlobalCommUsage;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 
-import java.io.IOException;
-
 /**
  * For internal use only.
  *
@@ -73,9 +73,10 @@ public class WrappedVertexReader<I extends WritableComparable,
   }
 
   @Override
-  public void setWorkerAggregatorUse(WorkerAggregatorUsage agg) {
+  public void setWorkerGlobalCommUsage(WorkerGlobalCommUsage usage) {
+    super.setWorkerGlobalCommUsage(usage);
-    // Set aggregator usage for vertex reader
-    baseVertexReader.setWorkerAggregatorUse(agg);
+    // Set global communication usage for vertex reader
+    baseVertexReader.setWorkerGlobalCommUsage(usage);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/master/AggregatorReduceOperation.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/AggregatorReduceOperation.java b/giraph-core/src/main/java/org/apache/giraph/master/AggregatorReduceOperation.java
new file mode 100644
index 0000000..1673f6d
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/master/AggregatorReduceOperation.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.master;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.giraph.aggregators.Aggregator;
+import org.apache.giraph.reducers.OnSameReduceOperation;
+import org.apache.giraph.utils.WritableFactory;
+import org.apache.giraph.utils.WritableUtils;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Translates aggregation operation to reduce operations.
+ *
+ * @param <A> Aggregation object type
+ */
+public class AggregatorReduceOperation<A extends Writable>
+    extends OnSameReduceOperation<A> {
+  /** Aggregator factory */
+  private WritableFactory<? extends Aggregator<A>> aggregatorFactory;
+  /** Aggregator */
+  private Aggregator<A> aggregator;
+
+  /** Constructor */
+  public AggregatorReduceOperation() {
+  }
+
+  /**
+   * Constructor
+   * @param aggregatorFactory Aggregator factory
+   */
+  public AggregatorReduceOperation(
+      WritableFactory<? extends Aggregator<A>> aggregatorFactory) {
+    this.aggregatorFactory = aggregatorFactory;
+    this.aggregator = aggregatorFactory.create();
+    this.aggregator.setAggregatedValue(null);
+  }
+
+  @Override
+  public A createInitialValue() {
+    return aggregator.createInitialValue();
+  }
+
+  /**
+   * Creates copy of this object
+   * @return copy
+   */
+  public AggregatorReduceOperation<A> createCopy() {
+    return new AggregatorReduceOperation<>(aggregatorFactory);
+  }
+
+  @Override
+  public synchronized void reduceSingle(A curValue, A valueToReduce) {
+    aggregator.setAggregatedValue(curValue);
+    aggregator.aggregate(valueToReduce);
+    if (curValue != aggregator.getAggregatedValue()) {
+      throw new IllegalStateException(
+          "Aggregator " + aggregator + " aggregates by creating new value");
+    }
+    aggregator.setAggregatedValue(null);
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    WritableUtils.writeWritableObject(aggregatorFactory, out);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    aggregatorFactory = WritableUtils.readWritableObject(in, null);
+    aggregator = aggregatorFactory.create();
+    this.aggregator.setAggregatedValue(null);
+  }
+}
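
To see how an existing Aggregator is driven through the new reduce interface, here
is a small sketch assuming Giraph's stock LongSumAggregator together with the
ClassAggregatorFactory used elsewhere in this patch:

    // Wrap a plain aggregator so it can be registered as a reduce operation.
    AggregatorReduceOperation<LongWritable> op =
        new AggregatorReduceOperation<>(
            new ClassAggregatorFactory<LongWritable>(LongSumAggregator.class));

    LongWritable running = op.createInitialValue();  // 0 for a sum aggregator
    op.reduceSingle(running, new LongWritable(5));   // running is now 5
    op.reduceSingle(running, new LongWritable(7));   // running is now 12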

http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/master/AggregatorToGlobalCommTranslation.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/AggregatorToGlobalCommTranslation.java b/giraph-core/src/main/java/org/apache/giraph/master/AggregatorToGlobalCommTranslation.java
new file mode 100644
index 0000000..7492fc7
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/master/AggregatorToGlobalCommTranslation.java
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.master;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map.Entry;
+
+import org.apache.giraph.aggregators.Aggregator;
+import org.apache.giraph.aggregators.ClassAggregatorFactory;
+import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
+import org.apache.giraph.utils.WritableFactory;
+import org.apache.hadoop.io.Writable;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Class that translates aggregator handling on the master to
+ * reduce and broadcast operations supported by the MasterAggregatorHandler.
+ */
+public class AggregatorToGlobalCommTranslation
+    extends DefaultImmutableClassesGiraphConfigurable
+    implements MasterAggregatorUsage, Writable {
+  /** Class providing reduce and broadcast interface to use */
+  private final MasterGlobalCommUsage globalComm;
+  /** List of registered aggregators */
+  private final HashMap<String, AggregatorWrapper<Writable>>
+  registeredAggregators = new HashMap<>();
+
+  /**
+   * Constructor
+   * @param globalComm Global communication interface
+   */
+  public AggregatorToGlobalCommTranslation(MasterGlobalCommUsage globalComm) {
+    this.globalComm = globalComm;
+  }
+
+  @Override
+  public <A extends Writable> A getAggregatedValue(String name) {
+    return globalComm.getReduced(name);
+  }
+
+  @Override
+  public <A extends Writable> void setAggregatedValue(String name, A value) {
+    AggregatorWrapper<Writable> aggregator = registeredAggregators.get(name);
+    aggregator.setCurrentValue(value);
+  }
+
+  /**
+   * Called after master compute, to do aggregator->reduce/broadcast
+   * translation
+   */
+  public void postMasterCompute() {
+    // Broadcast what the master set or, if it didn't set anything, the
+    // reduced value; then re-register a clean reduce op for the next superstep
+    for (Entry<String, AggregatorWrapper<Writable>> entry :
+        registeredAggregators.entrySet()) {
+      Writable value = entry.getValue().currentValue != null ?
+          entry.getValue().getCurrentValue() :
+            globalComm.getReduced(entry.getKey());
+      if (value == null) {
+        value = entry.getValue().getReduceOp().createInitialValue();
+      }
+
+      globalComm.broadcast(entry.getKey(), value);
+      // Always register clean instance of reduceOp, not to conflict with
+      // reduceOp from previous superstep.
+      AggregatorReduceOperation<Writable> cleanReduceOp =
+          entry.getValue().createReduceOp();
+      if (entry.getValue().isPersistent()) {
+        globalComm.registerReduce(
+            entry.getKey(), cleanReduceOp, value);
+      } else {
+        globalComm.registerReduce(
+            entry.getKey(), cleanReduceOp);
+      }
+      entry.getValue().setCurrentValue(null);
+    }
+  }
+
+  @Override
+  public <A extends Writable> boolean registerAggregator(String name,
+      Class<? extends Aggregator<A>> aggregatorClass) throws
+      InstantiationException, IllegalAccessException {
+    ClassAggregatorFactory<A> aggregatorFactory =
+        new ClassAggregatorFactory<A>(aggregatorClass);
+    return registerAggregator(name, aggregatorFactory, false) != null;
+  }
+
+  @Override
+  public <A extends Writable> boolean registerAggregator(String name,
+      WritableFactory<? extends Aggregator<A>> aggregator) throws
+      InstantiationException, IllegalAccessException {
+    return registerAggregator(name, aggregator, false) != null;
+  }
+
+  @Override
+  public <A extends Writable> boolean registerPersistentAggregator(String name,
+      Class<? extends Aggregator<A>> aggregatorClass) throws
+      InstantiationException, IllegalAccessException {
+    ClassAggregatorFactory<A> aggregatorFactory =
+        new ClassAggregatorFactory<A>(aggregatorClass);
+    return registerAggregator(name, aggregatorFactory, true) != null;
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeInt(registeredAggregators.size());
+    for (Entry<String, AggregatorWrapper<Writable>> entry :
+        registeredAggregators.entrySet()) {
+      out.writeUTF(entry.getKey());
+      entry.getValue().write(out);
+    }
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    registeredAggregators.clear();
+    int numAggregators = in.readInt();
+    for (int i = 0; i < numAggregators; i++) {
+      String name = in.readUTF();
+      AggregatorWrapper<Writable> agg = new AggregatorWrapper<>();
+      agg.readFields(in);
+      registeredAggregators.put(name, agg);
+    }
+  }
+
+  /**
+   * Helper function for registering aggregators.
+   *
+   * @param name              Name of the aggregator
+   * @param aggregatorFactory Aggregator factory
+   * @param persistent        Whether aggregator is persistent or not
+   * @param <A>               Aggregated value type
+   * @return Newly registered aggregator, or the aggregator previously
+   *         registered under the same name, if any
+   */
+  private <A extends Writable> AggregatorWrapper<A> registerAggregator
+  (String name, WritableFactory<? extends Aggregator<A>> aggregatorFactory,
+      boolean persistent) throws InstantiationException,
+      IllegalAccessException {
+    AggregatorWrapper<A> aggregatorWrapper =
+        (AggregatorWrapper<A>) registeredAggregators.get(name);
+    if (aggregatorWrapper == null) {
+      aggregatorWrapper =
+          new AggregatorWrapper<A>(aggregatorFactory, persistent);
+      registeredAggregators.put(
+          name, (AggregatorWrapper<Writable>) aggregatorWrapper);
+    }
+    return aggregatorWrapper;
+  }
+
+  /**
+   * Object holding all needed data related to a single Aggregator
+   * @param <A> Aggregated value type
+   */
+  private static class AggregatorWrapper<A extends Writable>
+      implements Writable {
+    /** False iff the aggregator should be reset at the end of each superstep */
+    private boolean persistent;
+    /** Translation of aggregator to reduce operations */
+    private AggregatorReduceOperation<A> reduceOp;
+    /** Current value, set by master manually */
+    private A currentValue;
+
+    /** Constructor */
+    public AggregatorWrapper() {
+    }
+
+    /**
+     * Constructor
+     * @param aggregatorFactory Aggregator factory
+     * @param persistent Is persistent
+     */
+    public AggregatorWrapper(
+        WritableFactory<? extends Aggregator<A>> aggregatorFactory,
+        boolean persistent) {
+      this.persistent = persistent;
+      this.reduceOp = new AggregatorReduceOperation<>(aggregatorFactory);
+    }
+
+    public AggregatorReduceOperation<A> getReduceOp() {
+      return reduceOp;
+    }
+
+    /**
+     * Create a fresh instance of AggregatorReduceOperation
+     * @return fresh instance of AggregatorReduceOperation
+     */
+    public AggregatorReduceOperation<A> createReduceOp() {
+      return reduceOp.createCopy();
+    }
+
+    public A getCurrentValue() {
+      return currentValue;
+    }
+
+    public void setCurrentValue(A currentValue) {
+      this.currentValue = currentValue;
+    }
+
+    public boolean isPersistent() {
+      return persistent;
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+      out.writeBoolean(persistent);
+      reduceOp.write(out);
+
+      Preconditions.checkState(currentValue == null, "AggregatorWrapper " +
+          "shouldn't have value at the end of the superstep");
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+      persistent = in.readBoolean();
+      reduceOp = new AggregatorReduceOperation<>();
+      reduceOp.readFields(in);
+      currentValue = null;
+    }
+  }
+}
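
The translation layer keeps the old MasterAggregatorUsage contract on top of the
new reduce/broadcast primitives. A sketch of one master cycle, with the "sum" name
and Giraph's stock LongSumAggregator chosen purely for illustration:

    // Illustrative only; 'globalCommHandler' is the master's MasterAggregatorHandler.
    void masterAggregatorCycle(MasterAggregatorHandler globalCommHandler)
        throws InstantiationException, IllegalAccessException {
      AggregatorToGlobalCommTranslation translation =
          new AggregatorToGlobalCommTranslation(globalCommHandler);
      // The master registers and sets aggregators exactly as before ...
      translation.registerPersistentAggregator("sum", LongSumAggregator.class);
      translation.setAggregatedValue("sum", new LongWritable(42));
      // ... and after master.compute() each aggregator is turned into a broadcast
      // of its current (or previously reduced) value plus a clean reduce op:
      translation.postMasterCompute();
    }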

http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
index efa5b87..ab1289d 100644
--- a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
+++ b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
@@ -18,11 +18,39 @@
 
 package org.apache.giraph.master;
 
+import static org.apache.giraph.conf.GiraphConstants.INPUT_SPLIT_SAMPLE_PERCENT;
+import static org.apache.giraph.conf.GiraphConstants.KEEP_ZOOKEEPER_DATA;
+import static org.apache.giraph.conf.GiraphConstants.PARTITION_LONG_TAIL_MIN_PRINT;
+import static org.apache.giraph.conf.GiraphConstants.USE_INPUT_SPLIT_LOCALITY;
 import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
 import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import net.iharder.Base64;
+
 import org.apache.commons.io.FilenameUtils;
 import org.apache.giraph.bsp.ApplicationState;
 import org.apache.giraph.bsp.BspInputFormat;
+import org.apache.giraph.bsp.BspService;
 import org.apache.giraph.bsp.CentralizedServiceMaster;
 import org.apache.giraph.bsp.CheckpointStatus;
 import org.apache.giraph.bsp.SuperstepState;
@@ -33,23 +61,17 @@ import org.apache.giraph.comm.netty.NettyMasterServer;
 import org.apache.giraph.conf.GiraphConstants;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.counters.GiraphStats;
-import org.apache.giraph.graph.InputSplitPaths;
-import org.apache.giraph.graph.GlobalStats;
 import org.apache.giraph.graph.AddressesAndPartitionsWritable;
+import org.apache.giraph.graph.GlobalStats;
 import org.apache.giraph.graph.GraphFunctions;
-import org.apache.giraph.graph.InputSplitEvents;
-import org.apache.giraph.bsp.BspService;
 import org.apache.giraph.graph.GraphState;
+import org.apache.giraph.graph.GraphTaskManager;
+import org.apache.giraph.graph.InputSplitEvents;
+import org.apache.giraph.graph.InputSplitPaths;
 import org.apache.giraph.io.EdgeInputFormat;
 import org.apache.giraph.io.GiraphInputFormat;
-import org.apache.giraph.graph.GraphTaskManager;
 import org.apache.giraph.io.MappingInputFormat;
 import org.apache.giraph.io.VertexInputFormat;
-import org.apache.giraph.partition.BasicPartitionOwner;
-import org.apache.giraph.partition.MasterGraphPartitioner;
-import org.apache.giraph.partition.PartitionOwner;
-import org.apache.giraph.partition.PartitionStats;
-import org.apache.giraph.partition.PartitionUtils;
 import org.apache.giraph.metrics.AggregatedMetrics;
 import org.apache.giraph.metrics.GiraphMetrics;
 import org.apache.giraph.metrics.GiraphTimer;
@@ -57,13 +79,18 @@ import org.apache.giraph.metrics.GiraphTimerContext;
 import org.apache.giraph.metrics.ResetSuperstepMetricsObserver;
 import org.apache.giraph.metrics.SuperstepMetricsRegistry;
 import org.apache.giraph.metrics.WorkerSuperstepMetrics;
-import org.apache.giraph.utils.CheckpointingUtils;
-import org.apache.giraph.utils.JMapHistoDumper;
-import org.apache.giraph.utils.ReactiveJMapHistoDumper;
-import org.apache.giraph.utils.ProgressableUtils;
+import org.apache.giraph.partition.BasicPartitionOwner;
+import org.apache.giraph.partition.MasterGraphPartitioner;
+import org.apache.giraph.partition.PartitionOwner;
+import org.apache.giraph.partition.PartitionStats;
+import org.apache.giraph.partition.PartitionUtils;
 import org.apache.giraph.time.SystemTime;
 import org.apache.giraph.time.Time;
+import org.apache.giraph.utils.CheckpointingUtils;
+import org.apache.giraph.utils.JMapHistoDumper;
 import org.apache.giraph.utils.LogStacktraceCallable;
+import org.apache.giraph.utils.ProgressableUtils;
+import org.apache.giraph.utils.ReactiveJMapHistoDumper;
 import org.apache.giraph.utils.WritableUtils;
 import org.apache.giraph.worker.WorkerInfo;
 import org.apache.giraph.zk.BspEvent;
@@ -89,32 +116,6 @@ import org.json.JSONObject;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
-import net.iharder.Base64;
-
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutput;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.PrintStream;
-import java.nio.charset.Charset;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.TreeSet;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-
-import static org.apache.giraph.conf.GiraphConstants.INPUT_SPLIT_SAMPLE_PERCENT;
-import static org.apache.giraph.conf.GiraphConstants.KEEP_ZOOKEEPER_DATA;
-import static org.apache.giraph.conf.GiraphConstants.PARTITION_LONG_TAIL_MIN_PRINT;
-import static org.apache.giraph.conf.GiraphConstants.USE_INPUT_SPLIT_LOCALITY;
 
 /**
  * ZooKeeper-based implementation of {@link CentralizedServiceMaster}.
@@ -167,8 +168,10 @@ public class BspServiceMaster<I extends WritableComparable,
   /** All the partition stats from the last superstep */
   private final List<PartitionStats> allPartitionStatsList =
       new ArrayList<PartitionStats>();
-  /** Handler for aggregators */
-  private MasterAggregatorHandler aggregatorHandler;
+  /** Handler for global communication */
+  private MasterAggregatorHandler globalCommHandler;
+  /** Handler for translating aggregators to reduce/broadcast operations */
+  private AggregatorToGlobalCommTranslation aggregatorTranslation;
   /** Master class */
   private MasterCompute masterCompute;
   /** IPC Client */
@@ -232,7 +235,7 @@ public class BspServiceMaster<I extends WritableComparable,
     this.checkpointStatus = CheckpointStatus.NONE;
 
     GiraphMetrics.get().addSuperstepResetObserver(this);
-    GiraphStats.init((Mapper.Context) context);
+    GiraphStats.init(context);
   }
 
   @Override
@@ -738,8 +741,13 @@ public class BspServiceMaster<I extends WritableComparable,
   }
 
   @Override
-  public MasterAggregatorHandler getAggregatorHandler() {
-    return aggregatorHandler;
+  public MasterAggregatorHandler getGlobalCommHandler() {
+    return globalCommHandler;
+  }
+
+  @Override
+  public AggregatorToGlobalCommTranslation getAggregatorTranslationHandler() {
+    return aggregatorTranslation;
   }
 
   @Override
@@ -811,7 +819,8 @@ public class BspServiceMaster<I extends WritableComparable,
     });
 
 
-    aggregatorHandler.readFields(finalizedStream);
+    globalCommHandler.readFields(finalizedStream);
+    aggregatorTranslation.readFields(finalizedStream);
     masterCompute.readFields(finalizedStream);
     finalizedStream.close();
 
@@ -883,9 +892,12 @@ public class BspServiceMaster<I extends WritableComparable,
         if (masterChildArr.get(0).equals(myBid)) {
           GiraphStats.getInstance().getCurrentMasterTaskPartition().
               setValue(getTaskPartition());
-          aggregatorHandler = new MasterAggregatorHandler(getConfiguration(),
-              getContext());
-          aggregatorHandler.initialize(this);
+          globalCommHandler = new MasterAggregatorHandler(
+              getConfiguration(), getContext());
+          aggregatorTranslation = new AggregatorToGlobalCommTranslation(
+              globalCommHandler);
+
+          globalCommHandler.initialize(this);
           masterCompute = getConfiguration().createMasterCompute();
           masterCompute.setMasterService(this);
 
@@ -1097,7 +1109,8 @@ public class BspServiceMaster<I extends WritableComparable,
     for (WorkerInfo chosenWorkerInfo : chosenWorkerInfoList) {
       finalizedOutputStream.writeInt(chosenWorkerInfo.getTaskId());
     }
-    aggregatorHandler.write(finalizedOutputStream);
+    globalCommHandler.write(finalizedOutputStream);
+    aggregatorTranslation.write(finalizedOutputStream);
     masterCompute.write(finalizedOutputStream);
     finalizedOutputStream.close();
     lastCheckpointedSuperstep = superstep;
@@ -1502,7 +1515,8 @@ public class BspServiceMaster<I extends WritableComparable,
    */
   private void initializeAggregatorInputSuperstep()
     throws InterruptedException {
-    aggregatorHandler.prepareSuperstep(masterClient);
+    globalCommHandler.prepareSuperstep();
+
     prepareMasterCompute(getSuperstep());
     try {
       masterCompute.initialize();
@@ -1516,7 +1530,10 @@ public class BspServiceMaster<I extends WritableComparable,
       throw new RuntimeException(
         "initializeAggregatorInputSuperstep: Failed in access", e);
     }
-    aggregatorHandler.finishSuperstep(masterClient);
+    aggregatorTranslation.postMasterCompute();
+    globalCommHandler.finishSuperstep();
+
+    globalCommHandler.sendDataToOwners(masterClient);
   }
 
   /**
@@ -1579,18 +1596,18 @@ public class BspServiceMaster<I extends WritableComparable,
       }
     }
 
+    // We need to finalize aggregators from previous superstep
+    if (getSuperstep() >= 0) {
+      aggregatorTranslation.postMasterCompute();
+      globalCommHandler.finishSuperstep();
+    }
+
     masterClient.openConnections();
 
     GiraphStats.getInstance().
         getCurrentWorkers().setValue(chosenWorkerInfoList.size());
     assignPartitionOwners();
 
-    // We need to finalize aggregators from previous superstep (send them to
-    // worker owners) after new worker assignments
-    if (getSuperstep() >= 0) {
-      aggregatorHandler.finishSuperstep(masterClient);
-    }
-
     // Finalize the valid checkpoint file prefixes and possibly
     // the aggregators.
     if (checkpointStatus != CheckpointStatus.NONE) {
@@ -1616,6 +1633,11 @@ public class BspServiceMaster<I extends WritableComparable,
       }
     }
 
+    // We need to send aggregators to worker owners after new worker assignments
+    if (getSuperstep() >= 0) {
+      globalCommHandler.sendDataToOwners(masterClient);
+    }
+
     if (getSuperstep() == INPUT_SUPERSTEP) {
       // Initialize aggregators before coordinating
       initializeAggregatorInputSuperstep();
@@ -1645,7 +1667,7 @@ public class BspServiceMaster<I extends WritableComparable,
 
     // Collect aggregator values, then run the master.compute() and
     // finally save the aggregator values
-    aggregatorHandler.prepareSuperstep(masterClient);
+    globalCommHandler.prepareSuperstep();
     SuperstepClasses superstepClasses =
       prepareMasterCompute(getSuperstep() + 1);
     doMasterCompute();
@@ -1710,7 +1732,7 @@ public class BspServiceMaster<I extends WritableComparable,
     } else {
       superstepState = SuperstepState.THIS_SUPERSTEP_DONE;
     }
-    aggregatorHandler.writeAggregators(getSuperstep(), superstepState);
+    globalCommHandler.writeAggregators(getSuperstep(), superstepState);
 
     return superstepState;
   }
@@ -1935,7 +1957,7 @@ public class BspServiceMaster<I extends WritableComparable,
         failJob(new Exception("Checkpoint and halt requested. " +
             "Killing this job."));
       }
-      aggregatorHandler.close();
+      globalCommHandler.close();
       masterClient.closeConnections();
       masterServer.close();
     }
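
Pulled out of the hunks above, the per-superstep flow on the master now looks
roughly like this (simplified; checkpointing, the input superstep and error
handling omitted):

    if (getSuperstep() >= 0) {
      aggregatorTranslation.postMasterCompute();  // aggregators -> broadcast/reduce
      globalCommHandler.finishSuperstep();        // finalize previous superstep
    }
    assignPartitionOwners();
    if (getSuperstep() >= 0) {
      globalCommHandler.sendDataToOwners(masterClient);  // after new assignments
    }
    // ... workers run the superstep ...
    globalCommHandler.prepareSuperstep();         // collect values reduced by workers
    doMasterCompute();                            // master.compute() runs here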

http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/master/MasterAggregatorHandler.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/MasterAggregatorHandler.java b/giraph-core/src/main/java/org/apache/giraph/master/MasterAggregatorHandler.java
index 2b0cdd6..5f7bd73 100644
--- a/giraph-core/src/main/java/org/apache/giraph/master/MasterAggregatorHandler.java
+++ b/giraph-core/src/main/java/org/apache/giraph/master/MasterAggregatorHandler.java
@@ -15,263 +15,224 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.giraph.master;
 
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
-import java.util.AbstractMap;
 import java.util.Map;
+import java.util.Map.Entry;
 
-import org.apache.giraph.aggregators.Aggregator;
-import org.apache.giraph.aggregators.AggregatorWrapper;
 import org.apache.giraph.aggregators.AggregatorWriter;
-import org.apache.giraph.aggregators.ClassAggregatorFactory;
 import org.apache.giraph.bsp.BspService;
 import org.apache.giraph.bsp.SuperstepState;
+import org.apache.giraph.comm.GlobalCommType;
 import org.apache.giraph.comm.MasterClient;
-import org.apache.giraph.comm.aggregators.AggregatorUtils;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.utils.MasterLoggingAggregator;
-import org.apache.giraph.utils.WritableFactory;
+import org.apache.giraph.reducers.ReduceOperation;
+import org.apache.giraph.reducers.Reducer;
 import org.apache.giraph.utils.WritableUtils;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.util.Progressable;
 import org.apache.log4j.Logger;
 
-import com.google.common.base.Function;
-import com.google.common.collect.Iterables;
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Maps;
 
-/** Handler for aggregators on master */
-public class MasterAggregatorHandler implements MasterAggregatorUsage,
-    Writable {
+/** Handler for reduce/broadcast on the master */
+public class MasterAggregatorHandler
+    implements MasterGlobalCommUsage, Writable {
   /** Class logger */
   private static final Logger LOG =
       Logger.getLogger(MasterAggregatorHandler.class);
-  /**
-   * Map of aggregators.
-   * This map is used to store final aggregated values received from worker
-   * owners, and also to read and write values provided during master.compute.
-   */
-  private final Map<String, AggregatorWrapper<Writable>> aggregatorMap =
+
+  /** Map of reducers registered for the next worker computation */
+  private final Map<String, Reducer<Object, Writable>> reducerMap =
+      Maps.newHashMap();
+  /** Map of values to be sent to workers for next computation */
+  private final Map<String, Writable> broadcastMap =
       Maps.newHashMap();
-  /** Aggregator writer */
+  /** Values reduced from previous computation */
+  private final Map<String, Writable> reducedMap =
+      Maps.newHashMap();
+
+  /** Aggregator writer - for writing reduced values */
   private final AggregatorWriter aggregatorWriter;
   /** Progressable used to report progress */
   private final Progressable progressable;
-  /** Giraph configuration */
-  private final ImmutableClassesGiraphConfiguration<?, ?, ?> conf;
 
   /**
    * Constructor
    *
-   * @param conf         Giraph configuration
-   * @param progressable Progressable used for reporting progress
+   * @param conf Configuration
+   * @param progressable Progress reporter
    */
   public MasterAggregatorHandler(
       ImmutableClassesGiraphConfiguration<?, ?, ?> conf,
       Progressable progressable) {
-    this.conf = conf;
     this.progressable = progressable;
     aggregatorWriter = conf.createAggregatorWriter();
-    MasterLoggingAggregator.registerAggregator(this, conf);
   }
 
   @Override
-  public <A extends Writable> A getAggregatedValue(String name) {
-    AggregatorWrapper<? extends Writable> aggregator = aggregatorMap.get(name);
-    if (aggregator == null) {
-      LOG.warn("getAggregatedValue: " +
-          AggregatorUtils.getUnregisteredAggregatorMessage(name,
-              aggregatorMap.size() != 0, conf));
-      return null;
-    } else {
-      return (A) aggregator.getPreviousAggregatedValue();
-    }
+  public final <S, R extends Writable> void registerReduce(
+      String name, ReduceOperation<S, R> reduceOp) {
+    registerReduce(name, reduceOp, reduceOp.createInitialValue());
   }
 
   @Override
-  public <A extends Writable> void setAggregatedValue(String name, A value) {
-    AggregatorWrapper<? extends Writable> aggregator = aggregatorMap.get(name);
-    if (aggregator == null) {
-      throw new IllegalStateException(
-          "setAggregatedValue: " +
-              AggregatorUtils.getUnregisteredAggregatorMessage(name,
-                  aggregatorMap.size() != 0, conf));
+  public <S, R extends Writable> void registerReduce(
+      String name, ReduceOperation<S, R> reduceOp,
+      R globalInitialValue) {
+    if (reducerMap.containsKey(name)) {
+      throw new IllegalArgumentException(
+          "Reducer with name " + name + " was already registered");
+    }
+    if (reduceOp == null) {
+      throw new IllegalArgumentException("null reduce cannot be registered");
     }
-    ((AggregatorWrapper<A>) aggregator).setCurrentAggregatedValue(value);
-  }
 
-  @Override
-  public <A extends Writable> boolean registerAggregator(String name,
-      Class<? extends Aggregator<A>> aggregatorClass) throws
-      InstantiationException, IllegalAccessException {
-    checkAggregatorName(name);
-    ClassAggregatorFactory<A> aggregatorFactory =
-        new ClassAggregatorFactory<A>(aggregatorClass, conf);
-    return registerAggregator(name, aggregatorFactory, false) != null;
+    Reducer<S, R> reducer = new Reducer<>(reduceOp, globalInitialValue);
+    reducerMap.put(name, (Reducer<Object, Writable>) reducer);
   }
 
   @Override
-  public <A extends Writable> boolean registerAggregator(String name,
-      WritableFactory<? extends Aggregator<A>> aggregator) throws
-      InstantiationException, IllegalAccessException {
-    checkAggregatorName(name);
-    return registerAggregator(name, aggregator, false) != null;
+  public <T extends Writable> T getReduced(String name) {
+    return (T) reducedMap.get(name);
   }
 
   @Override
-  public <A extends Writable> boolean registerPersistentAggregator(String name,
-      Class<? extends Aggregator<A>> aggregatorClass) throws
-      InstantiationException, IllegalAccessException {
-    checkAggregatorName(name);
-    ClassAggregatorFactory<A> aggregatorFactory =
-        new ClassAggregatorFactory<A>(aggregatorClass, conf);
-    return registerAggregator(name, aggregatorFactory, true) != null;
-  }
-
-  /**
-   * Make sure user doesn't use AggregatorUtils.SPECIAL_COUNT_AGGREGATOR as
-   * the name of aggregator. Throw an exception if he tries to use it.
-   *
-   * @param name Name of the aggregator to check.
-   */
-  private void checkAggregatorName(String name) {
-    if (name.equals(AggregatorUtils.SPECIAL_COUNT_AGGREGATOR)) {
-      throw new IllegalStateException("checkAggregatorName: " +
-          AggregatorUtils.SPECIAL_COUNT_AGGREGATOR +
-          " is not allowed for the name of aggregator");
+  public void broadcast(String name, Writable object) {
+    if (broadcastMap.containsKey(name)) {
+      throw new IllegalArgumentException(
+          "Value already broadcasted for name " + name);
     }
-  }
-
-  /**
-   * Helper function for registering aggregators.
-   *
-   * @param name              Name of the aggregator
-   * @param aggregatorFactory Aggregator factory
-   * @param persistent        Whether aggregator is persistent or not
-   * @param <A>               Aggregated value type
-   * @return Newly registered aggregator or aggregator which was previously
-   *         created with selected name, if any
-   */
-  private <A extends Writable> AggregatorWrapper<A> registerAggregator
-  (String name, WritableFactory<? extends Aggregator<A>> aggregatorFactory,
-      boolean persistent) throws InstantiationException,
-      IllegalAccessException {
-    AggregatorWrapper<A> aggregatorWrapper =
-        (AggregatorWrapper<A>) aggregatorMap.get(name);
-    if (aggregatorWrapper == null) {
-      aggregatorWrapper =
-          new AggregatorWrapper<A>(aggregatorFactory, persistent, conf);
-      aggregatorMap.put(name, (AggregatorWrapper<Writable>) aggregatorWrapper);
+    if (object == null) {
+      throw new IllegalArgumentException("null cannot be broadcasted");
     }
-    return aggregatorWrapper;
+
+    broadcastMap.put(name, object);
   }
 
-  /**
-   * Prepare aggregators for current superstep
-   *
-   * @param masterClient IPC client on master
-   */
-  public void prepareSuperstep(MasterClient masterClient) {
+  /** Prepare reduced values for current superstep's master compute */
+  public void prepareSuperstep() {
     if (LOG.isDebugEnabled()) {
-      LOG.debug("prepareSuperstep: Start preparing aggregators");
+      LOG.debug("prepareSuperstep: Start preparing reducers");
     }
-    // prepare aggregators for master compute
-    for (AggregatorWrapper<Writable> aggregator : aggregatorMap.values()) {
-      if (aggregator.isPersistent()) {
-        aggregator.aggregateCurrent(aggregator.getPreviousAggregatedValue());
+
+    Preconditions.checkState(reducedMap.isEmpty(),
+        "reducedMap must be empty before start of the superstep");
+    Preconditions.checkState(broadcastMap.isEmpty(),
+        "broadcastMap must be empty before start of the superstep");
+
+    for (Entry<String, Reducer<Object, Writable>> entry :
+        reducerMap.entrySet()) {
+      Writable value = entry.getValue().getCurrentValue();
+      if (value == null) {
+        value = entry.getValue().createInitialValue();
       }
-      aggregator.setPreviousAggregatedValue(
-          aggregator.getCurrentAggregatedValue());
-      aggregator.resetCurrentAggregator();
-      progressable.progress();
+
+      reducedMap.put(entry.getKey(), value);
     }
-    MasterLoggingAggregator.logAggregatedValue(this, conf);
+
+    reducerMap.clear();
+
     if (LOG.isDebugEnabled()) {
       LOG.debug("prepareSuperstep: Aggregators prepared");
     }
   }
 
-  /**
-   * Finalize aggregators for current superstep and share them with workers
-   *
-   * @param masterClient IPC client on master
-   */
-  public void finishSuperstep(MasterClient masterClient) {
+  /** Finalize aggregators for current superstep */
+  public void finishSuperstep() {
     if (LOG.isDebugEnabled()) {
       LOG.debug("finishSuperstep: Start finishing aggregators");
     }
-    for (AggregatorWrapper<Writable> aggregator : aggregatorMap.values()) {
-      if (aggregator.isChanged()) {
-        // if master compute changed the value, use the one he chose
-        aggregator.setPreviousAggregatedValue(
-            aggregator.getCurrentAggregatedValue());
-        // reset aggregator for the next superstep
-        aggregator.resetCurrentAggregator();
-      }
-      progressable.progress();
+
+    reducedMap.clear();
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("finishSuperstep: Aggregators finished");
     }
+  }
 
-    // send aggregators to their owners
-    // TODO: if aggregator owner and it's value didn't change,
-    //       we don't need to resend it
+  /**
+   * Send data to workers (through owner workers)
+   *
+   * @param masterClient IPC client on master
+   */
+  public void sendDataToOwners(MasterClient masterClient) {
+    // send broadcast values and reduce operations to their owners
     try {
-      for (Map.Entry<String, AggregatorWrapper<Writable>> entry :
-          aggregatorMap.entrySet()) {
-        masterClient.sendAggregator(entry.getKey(),
-            entry.getValue().getAggregatorFactory(),
-            entry.getValue().getPreviousAggregatedValue());
+      for (Entry<String, Reducer<Object, Writable>> entry :
+          reducerMap.entrySet()) {
+        masterClient.sendToOwner(entry.getKey(),
+            GlobalCommType.REDUCE_OPERATIONS,
+            entry.getValue().getReduceOp());
         progressable.progress();
       }
-      masterClient.finishSendingAggregatedValues();
+
+      for (Entry<String, Writable> entry : broadcastMap.entrySet()) {
+        masterClient.sendToOwner(entry.getKey(),
+            GlobalCommType.BROADCAST,
+            entry.getValue());
+        progressable.progress();
+      }
+      masterClient.finishSendingValues();
+
+      broadcastMap.clear();
     } catch (IOException e) {
       throw new IllegalStateException("finishSuperstep: " +
           "IOException occurred while sending aggregators", e);
     }
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("finishSuperstep: Aggregators finished");
-    }
   }
 
   /**
-   * Accept aggregated values sent by worker. Every aggregator will be sent
+   * Accept reduced values sent by worker. Every value will be sent
    * only once, by its owner.
    * We don't need to count the number of these requests because global
    * superstep barrier will happen after workers ensure all requests of this
    * type have been received and processed by master.
    *
-   * @param aggregatedValuesInput Input in which aggregated values are
+   * @param reducedValuesInput Input in which reduced values are
    *                              written in the following format:
-   *                              number_of_aggregators
-   *                              name_1  value_1
-   *                              name_2  value_2
+   *                              numReducers
+   *                              name_1  REDUCED_VALUE  value_1
+   *                              name_2  REDUCED_VALUE  value_2
    *                              ...
    * @throws IOException
    */
-  public void acceptAggregatedValues(
-      DataInput aggregatedValuesInput) throws IOException {
-    int numAggregators = aggregatedValuesInput.readInt();
-    for (int i = 0; i < numAggregators; i++) {
-      String aggregatorName = aggregatedValuesInput.readUTF();
-      AggregatorWrapper<Writable> aggregator =
-          aggregatorMap.get(aggregatorName);
-      if (aggregator == null) {
+  public void acceptReducedValues(
+      DataInput reducedValuesInput) throws IOException {
+    int numReducers = reducedValuesInput.readInt();
+    for (int i = 0; i < numReducers; i++) {
+      String name = reducedValuesInput.readUTF();
+      GlobalCommType type =
+          GlobalCommType.values()[reducedValuesInput.readByte()];
+      if (type != GlobalCommType.REDUCED_VALUE) {
         throw new IllegalStateException(
-            "acceptAggregatedValues: " +
-                "Master received aggregator which isn't registered: " +
-                aggregatorName);
+            "SendReducedToMasterRequest received " + type);
+      }
+      Reducer<Object, Writable> reducer = reducerMap.get(name);
+      if (reducer == null) {
+        throw new IllegalStateException(
+            "acceptReducedValues: " +
+                "Master received reduced value which isn't registered: " +
+                name);
+      }
+
+      Writable valueToReduce = reducer.createInitialValue();
+      valueToReduce.readFields(reducedValuesInput);
+
+      if (reducer.getCurrentValue() != null) {
+        reducer.reducePartial(valueToReduce);
+      } else {
+        reducer.setCurrentValue(valueToReduce);
       }
-      Writable aggregatorValue = aggregator.createInitialValue();
-      aggregatorValue.readFields(aggregatedValuesInput);
-      aggregator.setCurrentAggregatedValue(aggregatorValue);
       progressable.progress();
     }
     if (LOG.isDebugEnabled()) {
-      LOG.debug("acceptAggregatedValues: Accepted one set with " +
-          numAggregators + " aggregated values");
+      LOG.debug("acceptReducedValues: Accepted one set with " +
+          numReducers + " aggregated values");
     }
   }
 
@@ -281,23 +242,10 @@ public class MasterAggregatorHandler implements MasterAggregatorUsage,
    * @param superstep      Superstep which just finished
    * @param superstepState State of the superstep which just finished
    */
-  public void writeAggregators(long superstep, SuperstepState superstepState) {
+  public void writeAggregators(
+      long superstep, SuperstepState superstepState) {
     try {
-      Iterable<Map.Entry<String, Writable>> iter =
-          Iterables.transform(
-              aggregatorMap.entrySet(),
-              new Function<Map.Entry<String, AggregatorWrapper<Writable>>,
-                  Map.Entry<String, Writable>>() {
-                @Override
-                public Map.Entry<String, Writable> apply(
-                    Map.Entry<String, AggregatorWrapper<Writable>> entry) {
-                  progressable.progress();
-                  return new AbstractMap.SimpleEntry<String,
-                      Writable>(entry.getKey(),
-                      entry.getValue().getPreviousAggregatedValue());
-                }
-              });
-      aggregatorWriter.writeAggregator(iter,
+      aggregatorWriter.writeAggregator(reducedMap.entrySet(),
           (superstepState == SuperstepState.ALL_SUPERSTEPS_DONE) ?
               AggregatorWriter.LAST_SUPERSTEP : superstep);
     } catch (IOException e) {
@@ -333,43 +281,44 @@ public class MasterAggregatorHandler implements MasterAggregatorUsage,
 
   @Override
   public void write(DataOutput out) throws IOException {
-    out.writeInt(aggregatorMap.size());
-    for (Map.Entry<String, AggregatorWrapper<Writable>> entry :
-        aggregatorMap.entrySet()) {
+    // Only reducerMap and broadcastMap can be non-empty at end of superstep
+    Preconditions.checkState(reducedMap.isEmpty(),
+        "reducedMap must be empty at the end of the superstep");
+
+    out.writeInt(reducerMap.size());
+    for (Entry<String, Reducer<Object, Writable>> entry :
+        reducerMap.entrySet()) {
       out.writeUTF(entry.getKey());
-      WritableUtils.writeWritableObject(
-          entry.getValue().getAggregatorFactory(), out);
-      out.writeBoolean(entry.getValue().isPersistent());
-      entry.getValue().getPreviousAggregatedValue().write(out);
+      entry.getValue().write(out);
       progressable.progress();
     }
+
+    out.writeInt(broadcastMap.size());
+    for (Entry<String, Writable> entry : broadcastMap.entrySet()) {
+      out.writeUTF(entry.getKey());
+      WritableUtils.writeWritableObject(entry.getValue(), out);
+    }
   }
 
   @Override
   public void readFields(DataInput in) throws IOException {
-    aggregatorMap.clear();
-    int numAggregators = in.readInt();
-    try {
-      for (int i = 0; i < numAggregators; i++) {
-        String aggregatorName = in.readUTF();
-        WritableFactory<Aggregator<Writable>> aggregatorFactory =
-            WritableUtils.readWritableObject(in, conf);
-        boolean isPersistent = in.readBoolean();
-        AggregatorWrapper<Writable> aggregatorWrapper = registerAggregator(
-            aggregatorName,
-            aggregatorFactory,
-            isPersistent);
-        Writable value = aggregatorWrapper.createInitialValue();
-        value.readFields(in);
-        aggregatorWrapper.setPreviousAggregatedValue(value);
-        progressable.progress();
-      }
-    } catch (InstantiationException e) {
-      throw new IllegalStateException("readFields: " +
-          "InstantiationException occurred", e);
-    } catch (IllegalAccessException e) {
-      throw new IllegalStateException("readFields: " +
-          "IllegalAccessException occurred", e);
+    reducedMap.clear();
+    broadcastMap.clear();
+    reducerMap.clear();
+
+    int numReducers = in.readInt();
+    for (int i = 0; i < numReducers; i++) {
+      String name = in.readUTF();
+      Reducer<Object, Writable> reducer = new Reducer<>();
+      reducer.readFields(in);
+      reducerMap.put(name, reducer);
+    }
+
+    int numBroadcast = in.readInt();
+    for (int i = 0; i < numBroadcast; i++) {
+      String name = in.readUTF();
+      Writable value = WritableUtils.readWritableObject(in, null);
+      broadcastMap.put(name, value);
     }
   }
 }
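
For readers following the new reduce flow: the merge logic in acceptReducedValues() above combines the partial value sent by each owner worker with reducePartial(), or seeds the reducer with setCurrentValue() if nothing has arrived yet. A minimal, self-contained sketch of that behaviour follows; it is illustrative only and not part of this patch, and SumLongs and ReducedValueMergeSketch are hypothetical names:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.giraph.reducers.ReduceOperation;
import org.apache.giraph.reducers.Reducer;
import org.apache.hadoop.io.LongWritable;

public class ReducedValueMergeSketch {
  /** Hypothetical sum-of-longs reduce operation (not part of this patch). */
  public static class SumLongs
      implements ReduceOperation<LongWritable, LongWritable> {
    @Override
    public LongWritable createInitialValue() {
      return new LongWritable(0);
    }

    @Override
    public void reduceSingle(LongWritable curValue, LongWritable value) {
      curValue.set(curValue.get() + value.get());
    }

    @Override
    public void reducePartial(LongWritable curValue, LongWritable value) {
      curValue.set(curValue.get() + value.get());
    }

    @Override
    public void write(DataOutput out) throws IOException {
      // Stateless operation, nothing to serialize.
    }

    @Override
    public void readFields(DataInput in) throws IOException {
      // Stateless operation, nothing to deserialize.
    }
  }

  public static void main(String[] args) {
    Reducer<LongWritable, LongWritable> reducer =
        new Reducer<>(new SumLongs(), new LongWritable(0));
    // Simulate partial sums arriving from two owner workers, merged the same
    // way acceptReducedValues() merges them.
    for (long partial : new long[] {3, 4}) {
      LongWritable valueToReduce = new LongWritable(partial);
      if (reducer.getCurrentValue() != null) {
        reducer.reducePartial(valueToReduce);
      } else {
        reducer.setCurrentValue(valueToReduce);
      }
    }
    System.out.println(reducer.getCurrentValue()); // prints 7
  }
}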

http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/master/MasterCompute.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/MasterCompute.java b/giraph-core/src/main/java/org/apache/giraph/master/MasterCompute.java
index 552cca9..72e4d0a 100644
--- a/giraph-core/src/main/java/org/apache/giraph/master/MasterCompute.java
+++ b/giraph-core/src/main/java/org/apache/giraph/master/MasterCompute.java
@@ -24,6 +24,7 @@ import org.apache.giraph.combiner.MessageCombiner;
 import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
 import org.apache.giraph.graph.Computation;
 import org.apache.giraph.graph.GraphState;
+import org.apache.giraph.reducers.ReduceOperation;
 import org.apache.giraph.utils.WritableFactory;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.Mapper;
@@ -43,7 +44,7 @@ import org.apache.hadoop.mapreduce.Mapper;
  */
 public abstract class MasterCompute
     extends DefaultImmutableClassesGiraphConfigurable
-    implements MasterAggregatorUsage, Writable {
+    implements MasterAggregatorUsage, MasterGlobalCommUsage, Writable {
   /** If true, do not do anymore computation on this vertex. */
   private boolean halt = false;
   /** Master aggregator usage */
@@ -190,10 +191,33 @@ public abstract class MasterCompute
   }
 
   @Override
+  public final <S, R extends Writable> void registerReduce(
+      String name, ReduceOperation<S, R> reduceOp) {
+    serviceMaster.getGlobalCommHandler().registerReduce(name, reduceOp);
+  }
+
+  @Override
+  public final <S, R extends Writable> void registerReduce(
+      String name, ReduceOperation<S, R> reduceOp, R globalInitialValue) {
+    serviceMaster.getGlobalCommHandler().registerReduce(
+        name, reduceOp, globalInitialValue);
+  }
+
+  @Override
+  public final <T extends Writable> T getReduced(String name) {
+    return serviceMaster.getGlobalCommHandler().getReduced(name);
+  }
+
+  @Override
+  public final void broadcast(String name, Writable object) {
+    serviceMaster.getGlobalCommHandler().broadcast(name, object);
+  }
+
+  @Override
   public final <A extends Writable> boolean registerAggregator(
     String name, Class<? extends Aggregator<A>> aggregatorClass)
     throws InstantiationException, IllegalAccessException {
-    return serviceMaster.getAggregatorHandler().registerAggregator(
+    return serviceMaster.getAggregatorTranslationHandler().registerAggregator(
         name, aggregatorClass);
   }
 
@@ -201,7 +225,7 @@ public abstract class MasterCompute
   public final <A extends Writable> boolean registerAggregator(
     String name, WritableFactory<? extends Aggregator<A>> aggregator)
     throws InstantiationException, IllegalAccessException {
-    return serviceMaster.getAggregatorHandler().registerAggregator(
+    return serviceMaster.getAggregatorTranslationHandler().registerAggregator(
         name, aggregator);
   }
 
@@ -210,19 +234,21 @@ public abstract class MasterCompute
       String name,
       Class<? extends Aggregator<A>> aggregatorClass) throws
       InstantiationException, IllegalAccessException {
-    return serviceMaster.getAggregatorHandler().registerPersistentAggregator(
-        name, aggregatorClass);
+    return serviceMaster.getAggregatorTranslationHandler()
+        .registerPersistentAggregator(name, aggregatorClass);
   }
 
   @Override
   public final <A extends Writable> A getAggregatedValue(String name) {
-    return serviceMaster.getAggregatorHandler().<A>getAggregatedValue(name);
+    return serviceMaster.getAggregatorTranslationHandler()
+        .<A>getAggregatedValue(name);
   }
 
   @Override
   public final <A extends Writable> void setAggregatedValue(
       String name, A value) {
-    serviceMaster.getAggregatorHandler().setAggregatedValue(name, value);
+    serviceMaster.getAggregatorTranslationHandler()
+        .setAggregatedValue(name, value);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/master/MasterGlobalCommUsage.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/MasterGlobalCommUsage.java b/giraph-core/src/main/java/org/apache/giraph/master/MasterGlobalCommUsage.java
new file mode 100644
index 0000000..c3ce0ea
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/master/MasterGlobalCommUsage.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.master;
+
+import org.apache.giraph.reducers.ReduceOperation;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * MasterCompute can access reduce and broadcast methods
+ * through this interface.
+ */
+public interface MasterGlobalCommUsage {
+  /**
+   * Register a reducer to be reduced in the next worker computation,
+   * using the given name and reduce operation.
+   * @param name Name of the reducer
+   * @param reduceOp Reduce operation
+   * @param <S> Single value type
+   * @param <R> Reduced value type
+   */
+  <S, R extends Writable> void registerReduce(
+      String name, ReduceOperation<S, R> reduceOp);
+
+  /**
+   * Register a reducer to be reduced in the next worker computation, using
+   * the given name and reduce operation, starting globally from
+   * globalInitialValue. (globalInitialValue is reduced only once; each
+   * worker still starts from the neutral initial value.)
+   *
+   * @param name Name of the reducer
+   * @param reduceOp Reduce operation
+   * @param globalInitialValue Global initial value
+   * @param <S> Single value type
+   * @param <R> Reduced value type
+   */
+  <S, R extends Writable> void registerReduce(
+      String name, ReduceOperation<S, R> reduceOp, R globalInitialValue);
+
+  /**
+   * Get reduced value from previous worker computation.
+   * @param name Name of the reducer
+   * @return Reduced value
+   * @param <R> Reduced value type
+   */
+  <R extends Writable> R getReduced(String name);
+
+  /**
+   * Broadcast given value to all workers for next computation.
+   * @param name Name of the broadcast object
+   * @param value Value
+   */
+  void broadcast(String name, Writable value);
+}
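
As a usage sketch, a master compute could drive this interface as shown below. This is illustrative only and not part of the patch: the class name and the "edge.count" keys are made up, and SumLongs is assumed to be a ReduceOperation<LongWritable, LongWritable> such as the hypothetical one sketched after MasterAggregatorHandler above.

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.giraph.master.MasterCompute;
import org.apache.hadoop.io.LongWritable;

/** Hypothetical master compute using the reduce/broadcast API. */
public class SumMasterCompute extends MasterCompute {
  @Override
  public void initialize() {
  }

  @Override
  public void compute() {
    if (getSuperstep() > 0) {
      // Value reduced by the workers during the previous computation.
      LongWritable total = getReduced("edge.count");
      // Share it with all workers for the next computation.
      broadcast("edge.count.broadcast", total);
    }
    // Reducers are registered per computation, so register it again for the
    // next worker computation. SumLongs is an assumed ReduceOperation.
    registerReduce("edge.count", new SumLongs());
  }

  @Override
  public void write(DataOutput out) throws IOException {
  }

  @Override
  public void readFields(DataInput in) throws IOException {
  }
}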

http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/reducers/OnSameReduceOperation.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/reducers/OnSameReduceOperation.java b/giraph-core/src/main/java/org/apache/giraph/reducers/OnSameReduceOperation.java
new file mode 100644
index 0000000..a675f4d
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/reducers/OnSameReduceOperation.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.reducers;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * ReduceOperation to use when the single object being reduced is of
+ * the same type as the reduced value.
+ *
+ * @param <R> Reduced object type.
+ */
+public abstract class OnSameReduceOperation<R extends Writable>
+    implements ReduceOperation<R, R> {
+  @Override
+  public final void reducePartial(R curValue, R valueToReduce) {
+    reduceSingle(curValue, valueToReduce);
+  }
+}
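
For example, a concrete subclass only needs createInitialValue() and reduceSingle(); reducePartial() is inherited. A hypothetical max-of-doubles operation (not part of this patch) might look like:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.giraph.reducers.OnSameReduceOperation;
import org.apache.hadoop.io.DoubleWritable;

/** Hypothetical max-of-doubles reducer; single and reduced types coincide. */
public class MaxDoubleReduce extends OnSameReduceOperation<DoubleWritable> {
  @Override
  public DoubleWritable createInitialValue() {
    // Neutral element for max.
    return new DoubleWritable(Double.NEGATIVE_INFINITY);
  }

  @Override
  public void reduceSingle(DoubleWritable curValue,
      DoubleWritable valueToReduce) {
    if (valueToReduce.get() > curValue.get()) {
      curValue.set(valueToReduce.get());
    }
  }

  @Override
  public void write(DataOutput out) throws IOException {
    // Stateless operation, nothing to serialize.
  }

  @Override
  public void readFields(DataInput in) throws IOException {
    // Stateless operation, nothing to deserialize.
  }
}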

http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/reducers/ReduceOperation.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/reducers/ReduceOperation.java b/giraph-core/src/main/java/org/apache/giraph/reducers/ReduceOperation.java
new file mode 100644
index 0000000..434e21a
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/reducers/ReduceOperation.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.reducers;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * A reduce operation defines how to reduce single values
+ * passed on workers into partial values on workers, and then
+ * into a single global reduced value.
+ *
+ * Implementations should be thread safe. Most often they should be
+ * immutable, so that methods can execute concurrently. In the rare case
+ * where an implementation is mutable ({@link AggregatorReduceOperation}),
+ * i.e. it stores a reusable object inside, accesses must be synchronized.
+ *
+ * @param <S> Single value type, objects passed on workers
+ * @param <R> Reduced value type
+ */
+public interface ReduceOperation<S, R extends Writable> extends Writable {
+  /**
+   * Return a new reduced value which is neutral to the reduce operation.
+   *
+   * @return Neutral value
+   */
+  R createInitialValue();
+  /**
+   * Add a new value.
+   * Needs to be commutative and associative.
+   *
+   * @param curValue Partial value into which to reduce and store the result
+   * @param valueToReduce Single value to be reduced
+   */
+  void reduceSingle(R curValue, S valueToReduce);
+  /**
+   * Add partially reduced value to current partially reduced value.
+   *
+   * @param curValue Partial value into which to reduce and store the result
+   * @param valueToReduce Partial value to be reduced
+   */
+  void reducePartial(R curValue, R valueToReduce);
+}
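
To illustrate the case where the single type differs from the reduced type, here is a hypothetical operation (not part of this patch) that counts how many Text values the workers reduced into a single LongWritable:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.giraph.reducers.ReduceOperation;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

/** Hypothetical count-of-texts reducer; Text singles, LongWritable result. */
public class CountTextsReduce implements ReduceOperation<Text, LongWritable> {
  @Override
  public LongWritable createInitialValue() {
    return new LongWritable(0);
  }

  @Override
  public void reduceSingle(LongWritable curValue, Text valueToReduce) {
    // Each single value contributes one to the count.
    curValue.set(curValue.get() + 1);
  }

  @Override
  public void reducePartial(LongWritable curValue, LongWritable valueToReduce) {
    // Merging partial counts is addition: commutative and associative.
    curValue.set(curValue.get() + valueToReduce.get());
  }

  @Override
  public void write(DataOutput out) throws IOException {
    // Stateless operation, nothing to serialize.
  }

  @Override
  public void readFields(DataInput in) throws IOException {
    // Stateless operation, nothing to deserialize.
  }
}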