You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by rv...@apache.org on 2014/10/26 02:22:20 UTC

[37/47] Reduce/broadcast API

http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/reducers/Reducer.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/reducers/Reducer.java b/giraph-core/src/main/java/org/apache/giraph/reducers/Reducer.java
new file mode 100644
index 0000000..9f821b4
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/reducers/Reducer.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.reducers;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.giraph.utils.WritableUtils;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Pairs a {@link ReduceOperation} with the value reduced so far.
+ * All reducing is delegated to the operation; this object holds the
+ * partially reduced result and is Writable (operation, then value).
+ *
+ * @param <S> Single value type, objects passed on workers
+ * @param <R> Reduced value type
+ */
+public class Reducer<S, R extends Writable> implements Writable {
+  /** Reduce operation all reducing calls are delegated to */
+  private ReduceOperation<S, R> reduceOp;
+  /** Current (partially) reduced value */
+  private R currentValue;
+
+  /**
+   * No-arg constructor; fields stay unset (readFields fills them in)
+   */
+  public Reducer() {
+  }
+  /**
+   * Constructor starting from reduceOp.createInitialValue()
+   * @param reduceOp Reduce operations
+   */
+  public Reducer(ReduceOperation<S, R> reduceOp) {
+    this.reduceOp = reduceOp;
+    this.currentValue = reduceOp.createInitialValue();
+  }
+  /**
+   * Constructor resuming from an existing reduced value
+   * @param reduceOp Reduce operations
+   * @param currentValue current reduced value, stored as is (not copied)
+   */
+  public Reducer(ReduceOperation<S, R> reduceOp, R currentValue) {
+    this.reduceOp = reduceOp;
+    this.currentValue = currentValue;
+  }
+
+  /**
+   * Reduce given value into current value via reduceOp.reduceSingle.
+   * @param valueToReduce Single value to reduce
+   */
+  public void reduceSingle(S valueToReduce) {
+    reduceOp.reduceSingle(currentValue, valueToReduce);
+  }
+  /**
+   * Reduce given partial value into current value via reduceOp.reducePartial.
+   * @param valueToReduce Partial value to reduce
+   */
+  public void reducePartial(R valueToReduce) {
+    reduceOp.reducePartial(currentValue, valueToReduce);
+  }
+  /**
+   * Ask reduceOp for a new initial value; currentValue is not changed.
+   * @return New initial reduced value
+   */
+  public R createInitialValue() {
+    return reduceOp.createInitialValue();
+  }
+
+  public ReduceOperation<S, R> getReduceOp() {
+    return reduceOp;
+  }
+
+  public R getCurrentValue() {
+    return currentValue;
+  }
+
+  public void setCurrentValue(R currentValue) {
+    this.currentValue = currentValue;
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    WritableUtils.writeWritableObject(reduceOp, out); // operation first
+    currentValue.write(out); // then the value
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    reduceOp = WritableUtils.readWritableObject(in, null);
+    currentValue = reduceOp.createInitialValue(); // object to read into
+    currentValue.readFields(in);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/reducers/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/reducers/package-info.java b/giraph-core/src/main/java/org/apache/giraph/reducers/package-info.java
new file mode 100644
index 0000000..eeefdeb
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/reducers/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Package of Giraph reducers.
+ */
+package org.apache.giraph.reducers;

http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/utils/ConfigurationUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ConfigurationUtils.java b/giraph-core/src/main/java/org/apache/giraph/utils/ConfigurationUtils.java
index 5e046cc..3d654b4 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/ConfigurationUtils.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ConfigurationUtils.java
@@ -17,8 +17,12 @@
  */
 package org.apache.giraph.utils;
 
-import com.google.common.base.Splitter;
-import com.google.common.collect.Iterables;
+import static org.apache.giraph.conf.GiraphConstants.COMPUTATION_CLASS;
+import static org.apache.giraph.conf.GiraphConstants.TYPES_HOLDER_CLASS;
+
+import java.io.IOException;
+import java.util.List;
+
 import org.apache.commons.cli.BasicParser;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.CommandLineParser;
@@ -29,6 +33,7 @@ import org.apache.giraph.Algorithm;
 import org.apache.giraph.aggregators.AggregatorWriter;
 import org.apache.giraph.combiner.MessageCombiner;
 import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.conf.GiraphConfigurationSettable;
 import org.apache.giraph.conf.GiraphConstants;
 import org.apache.giraph.conf.GiraphTypes;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
@@ -57,11 +62,8 @@ import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
 import org.apache.zookeeper.ZooKeeper;
 
-import java.io.IOException;
-import java.util.List;
-
-import static org.apache.giraph.conf.GiraphConstants.COMPUTATION_CLASS;
-import static org.apache.giraph.conf.GiraphConstants.TYPES_HOLDER_CLASS;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Iterables;
 
 /**
  * Translate command line args into Configuration Key-Value pairs.
@@ -147,6 +149,10 @@ public final class ConfigurationUtils {
       ImmutableClassesGiraphConfiguration configuration) {
     if (configuration != null) {
       configuration.configureIfPossible(object);
+    } else if (object instanceof GiraphConfigurationSettable) {
+      throw new IllegalArgumentException(
+          "Trying to configure configurable object without value, " +
+          object.getClass());
     }
   }
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java b/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java
index 3c5cbad..923d369 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java
@@ -739,4 +739,38 @@ public class WritableUtils {
     }
   }
 
+  /**
+   * Create a copy of Writable object, by serializing and deserializing it.
+   * Throws RuntimeException if readFields leaves serialized bytes unread.
+   * @param reusableOut Reusable output stream to serialize into (is reset)
+   * @param reusableIn Reusable input stream to deserialize out of
+   * @param original Original value of which to make a copy
+   * @param <T> Type of the object
+   * @return Copy of the original value
+   */
+  public static <T extends Writable> T createCopy(
+      UnsafeByteArrayOutputStream reusableOut,
+      UnsafeReusableByteArrayInput reusableIn, T original) {
+    T copy = (T) createWritable(original.getClass(), null);
+
+    try {
+      reusableOut.reset();
+      original.write(reusableOut);
+      reusableIn.initialize(
+          reusableOut.getByteArray(), 0, reusableOut.getPos());
+      copy.readFields(reusableIn);
+
+      if (reusableIn.available() != 0) { // write/readFields mismatch
+        throw new RuntimeException("Serialization of " +
+            original.getClass() + " encountered issues, " +
+            reusableIn.available() + " bytes left to be read");
+      }
+    } catch (IOException e) {
+      throw new IllegalStateException(
+          "IOException occurred while trying to create a copy " +
+          original.getClass(), e);
+    }
+    return copy;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
index 120678f..f61e817 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
@@ -18,6 +18,27 @@
 
 package org.apache.giraph.worker;
 
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.TimeUnit;
+
+import net.iharder.Base64;
+
 import org.apache.giraph.bsp.ApplicationState;
 import org.apache.giraph.bsp.BspService;
 import org.apache.giraph.bsp.CentralizedServiceWorker;
@@ -66,10 +87,10 @@ import org.apache.giraph.partition.PartitionStore;
 import org.apache.giraph.partition.WorkerGraphPartitioner;
 import org.apache.giraph.utils.CallableFactory;
 import org.apache.giraph.utils.JMapHistoDumper;
-import org.apache.giraph.utils.ReactiveJMapHistoDumper;
 import org.apache.giraph.utils.LoggerUtils;
 import org.apache.giraph.utils.MemoryUtils;
 import org.apache.giraph.utils.ProgressableUtils;
+import org.apache.giraph.utils.ReactiveJMapHistoDumper;
 import org.apache.giraph.utils.WritableUtils;
 import org.apache.giraph.zk.BspEvent;
 import org.apache.giraph.zk.PredicateLock;
@@ -96,26 +117,6 @@ import org.json.JSONObject;
 
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
-import net.iharder.Base64;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.nio.charset.Charset;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Queue;
-import java.util.Set;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.TimeUnit;
 
 /**
  * ZooKeeper-based implementation of {@link CentralizedServiceWorker}.
@@ -162,10 +163,10 @@ public class BspServiceWorker<I extends WritableComparable,
   private final WorkerContext workerContext;
 
   /** Handler for aggregators */
-  private final WorkerAggregatorHandler aggregatorHandler;
+  private final WorkerAggregatorHandler globalCommHandler;
 
   /** Superstep output */
-  private SuperstepOutput<I, V, E> superstepOutput;
+  private final SuperstepOutput<I, V, E> superstepOutput;
 
   /** array of observers to call back to */
   private final WorkerObserver[] observers;
@@ -212,10 +213,10 @@ public class BspServiceWorker<I extends WritableComparable,
     workerAggregatorRequestProcessor =
         new NettyWorkerAggregatorRequestProcessor(getContext(), conf, this);
 
-    aggregatorHandler = new WorkerAggregatorHandler(this, conf, context);
+    globalCommHandler = new WorkerAggregatorHandler(this, conf, context);
 
     workerContext = conf.createWorkerContext();
-    workerContext.setWorkerAggregatorUsage(aggregatorHandler);
+    workerContext.setWorkerGlobalCommUsage(globalCommHandler);
 
     superstepOutput = conf.createSuperstepOutput(context);
 
@@ -584,7 +585,7 @@ public class BspServiceWorker<I extends WritableComparable,
 
     // Initialize aggregator at worker side during setup.
     // Do this just before vertex and edge loading.
-    aggregatorHandler.prepareSuperstep(workerAggregatorRequestProcessor);
+    globalCommHandler.prepareSuperstep(workerAggregatorRequestProcessor);
 
     VertexEdgeCount vertexEdgeCount;
     long entriesLoaded;
@@ -895,7 +896,7 @@ public class BspServiceWorker<I extends WritableComparable,
       postSuperstepCallbacks();
     }
 
-    aggregatorHandler.finishSuperstep(workerAggregatorRequestProcessor);
+    globalCommHandler.finishSuperstep(workerAggregatorRequestProcessor);
 
     MessageStore<I, Writable> incomingMessageStore =
         getServerData().getIncomingMessageStore();
@@ -1920,15 +1921,16 @@ else[HADOOP_NON_SECURE]*/
     return workerServer.getServerData();
   }
 
+
   @Override
   public WorkerAggregatorHandler getAggregatorHandler() {
-    return aggregatorHandler;
+    return globalCommHandler;
   }
 
   @Override
   public void prepareSuperstep() {
     if (getSuperstep() != INPUT_SUPERSTEP) {
-      aggregatorHandler.prepareSuperstep(workerAggregatorRequestProcessor);
+      globalCommHandler.prepareSuperstep(workerAggregatorRequestProcessor);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java b/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java
index 35ad94b..89f74b3 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java
@@ -18,6 +18,8 @@
 
 package org.apache.giraph.worker;
 
+import java.io.IOException;
+
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.edge.Edge;
 import org.apache.giraph.graph.VertexEdgeCount;
@@ -37,8 +39,6 @@ import org.apache.log4j.Logger;
 import com.yammer.metrics.core.Counter;
 import com.yammer.metrics.core.Meter;
 
-import java.io.IOException;
-
 /**
  * Load as many edge input splits as possible.
  * Every thread will has its own instance of WorkerClientRequestProcessor
@@ -62,7 +62,7 @@ public class EdgeInputSplitsCallable<I extends WritableComparable,
       EdgeInputSplitsCallable.class);
 
   /** Aggregator handler */
-  private final WorkerThreadAggregatorUsage aggregatorUsage;
+  private final WorkerThreadGlobalCommUsage globalCommUsage;
   /** Bsp service worker (only use thread-safe methods) */
   private final BspServiceWorker<I, V, E> bspServiceWorker;
   /** Edge input format */
@@ -105,7 +105,7 @@ public class EdgeInputSplitsCallable<I extends WritableComparable,
     this.bspServiceWorker = bspServiceWorker;
     inputSplitMaxEdges = configuration.getInputSplitMaxEdges();
     // Initialize aggregator usage.
-    this.aggregatorUsage = bspServiceWorker.getAggregatorHandler()
+    this.globalCommUsage = bspServiceWorker.getAggregatorHandler()
       .newThreadAggregatorUsage();
     edgeInputFilter = configuration.getEdgeInputFilter();
     canEmbedInIds = bspServiceWorker
@@ -147,7 +147,7 @@ public class EdgeInputSplitsCallable<I extends WritableComparable,
 
     edgeReader.initialize(inputSplit, context);
     // Set aggregator usage to edge reader
-    edgeReader.setWorkerAggregatorUse(aggregatorUsage);
+    edgeReader.setWorkerGlobalCommUsage(globalCommUsage);
 
     long inputSplitEdgesLoaded = 0;
     long inputSplitEdgesFiltered = 0;

http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/worker/MappingInputSplitsCallable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/MappingInputSplitsCallable.java b/giraph-core/src/main/java/org/apache/giraph/worker/MappingInputSplitsCallable.java
index a2279a9..f6dca25 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/MappingInputSplitsCallable.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/MappingInputSplitsCallable.java
@@ -18,21 +18,21 @@
 
 package org.apache.giraph.worker;
 
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicInteger;
+
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.io.GiraphInputFormat;
 import org.apache.giraph.io.MappingInputFormat;
 import org.apache.giraph.io.MappingReader;
-import org.apache.giraph.mapping.MappingStore;
 import org.apache.giraph.mapping.MappingEntry;
+import org.apache.giraph.mapping.MappingStore;
 import org.apache.giraph.zk.ZooKeeperExt;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.Mapper;
 
-import java.io.IOException;
-import java.util.concurrent.atomic.AtomicInteger;
-
 /**
  * Load as many mapping input splits as possible.
  * Every thread will has its own instance of WorkerClientRequestProcessor
@@ -89,11 +89,11 @@ public class MappingInputSplitsCallable<I extends WritableComparable,
         mappingInputFormat.createMappingReader(inputSplit, context);
     mappingReader.setConf(configuration);
 
-    WorkerThreadAggregatorUsage aggregatorUsage = this.bspServiceWorker
+    WorkerThreadGlobalCommUsage globalCommUsage = this.bspServiceWorker
         .getAggregatorHandler().newThreadAggregatorUsage();
 
     mappingReader.initialize(inputSplit, context);
-    mappingReader.setWorkerAggregatorUse(aggregatorUsage);
+    mappingReader.setWorkerGlobalCommUsage(globalCommUsage);
 
     int entriesLoaded = 0;
     MappingStore<I, B> mappingStore =

http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java b/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java
index 4c85765..00a2781 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java
@@ -18,6 +18,8 @@
 
 package org.apache.giraph.worker;
 
+import java.io.IOException;
+
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.edge.Edge;
 import org.apache.giraph.edge.OutEdges;
@@ -42,8 +44,6 @@ import org.apache.log4j.Logger;
 import com.yammer.metrics.core.Counter;
 import com.yammer.metrics.core.Meter;
 
-import java.io.IOException;
-
 /**
  * Load as many vertex input splits as possible.
  * Every thread will has its own instance of WorkerClientRequestProcessor
@@ -79,7 +79,7 @@ public class VertexInputSplitsCallable<I extends WritableComparable,
    * Whether the chosen {@link OutEdges} implementation allows for Edge
    * reuse.
    */
-  private boolean reuseEdgeObjects;
+  private final boolean reuseEdgeObjects;
   /** Used to translate Edges during vertex input phase based on localData */
   private final TranslateEdge<I, E> translateEdge;
 
@@ -152,13 +152,13 @@ public class VertexInputSplitsCallable<I extends WritableComparable,
         vertexInputFormat.createVertexReader(inputSplit, context);
     vertexReader.setConf(configuration);
 
-    WorkerThreadAggregatorUsage aggregatorUsage =
+    WorkerThreadGlobalCommUsage globalCommUsage =
       this.bspServiceWorker
         .getAggregatorHandler().newThreadAggregatorUsage();
 
     vertexReader.initialize(inputSplit, context);
     // Set aggregator usage to vertex reader
-    vertexReader.setWorkerAggregatorUse(aggregatorUsage);
+    vertexReader.setWorkerGlobalCommUsage(globalCommUsage);
 
     long inputSplitVerticesLoaded = 0;
     long inputSplitVerticesFiltered = 0;

http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorDelegator.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorDelegator.java b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorDelegator.java
new file mode 100644
index 0000000..5238a07
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorDelegator.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.worker;
+
+import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Forwards reduce/getBroadcast to a settable WorkerGlobalCommUsage;
+ * aggregate/getAggregatedValue are expressed through those two calls.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex data
+ * @param <E> Edge data
+ */
+public abstract class WorkerAggregatorDelegator<I extends WritableComparable,
+  V extends Writable, E extends Writable>
+  extends DefaultImmutableClassesGiraphConfigurable<I, V, E>
+  implements WorkerAggregatorUsage, WorkerGlobalCommUsage {
+
+  /** Delegate for global communication; null until the setter is called */
+  private WorkerGlobalCommUsage workerGlobalCommUsage;
+
+  /**
+   * Set the delegate that reduce and getBroadcast forward to
+   *
+   * @param workerGlobalCommUsage Worker global communication usage
+   */
+  public void setWorkerGlobalCommUsage(
+      WorkerGlobalCommUsage workerGlobalCommUsage) {
+    this.workerGlobalCommUsage = workerGlobalCommUsage;
+  }
+
+  @Override
+  public final void reduce(String name, Object value) {
+    workerGlobalCommUsage.reduce(name, value);
+  }
+
+  @Override
+  public final <B extends Writable> B getBroadcast(String name) {
+    return workerGlobalCommUsage.getBroadcast(name);
+  }
+
+  @Override
+  public final <A extends Writable> void aggregate(String name, A value) {
+    reduce(name, value); // aggregate is a reduce under the same name
+  }
+
+  @Override
+  public final <A extends Writable> A getAggregatedValue(String name) {
+    return this.<A>getBroadcast(name); // same lookup as a broadcast
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorHandler.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorHandler.java b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorHandler.java
index 45ca665..05a13a7 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorHandler.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorHandler.java
@@ -15,22 +15,26 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.giraph.worker;
 
 import java.io.IOException;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Set;
 
-import org.apache.giraph.aggregators.Aggregator;
 import org.apache.giraph.bsp.CentralizedServiceWorker;
-import org.apache.giraph.comm.aggregators.AggregatedValueOutputStream;
+import org.apache.giraph.comm.GlobalCommType;
 import org.apache.giraph.comm.aggregators.AggregatorUtils;
 import org.apache.giraph.comm.aggregators.AllAggregatorServerData;
+import org.apache.giraph.comm.aggregators.GlobalCommValueOutputStream;
 import org.apache.giraph.comm.aggregators.OwnerAggregatorServerData;
 import org.apache.giraph.comm.aggregators.WorkerAggregatorRequestProcessor;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.utils.Factory;
+import org.apache.giraph.reducers.ReduceOperation;
+import org.apache.giraph.reducers.Reducer;
+import org.apache.giraph.utils.UnsafeByteArrayOutputStream;
+import org.apache.giraph.utils.UnsafeReusableByteArrayInput;
+import org.apache.giraph.utils.WritableUtils;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.util.Progressable;
 import org.apache.log4j.Logger;
@@ -38,35 +42,18 @@ import org.apache.log4j.Logger;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 
-/**
- * Handler for aggregators on worker. Provides the aggregated values and
- * performs aggregations from user vertex code (thread-safe). Also has
- * methods for all superstep coordination related to aggregators.
- *
- * At the beginning of any superstep any worker calls prepareSuperstep(),
- * which blocks until the final aggregates from the previous superstep have
- * been delivered to the worker.
- * Next, during the superstep worker can call aggregate() and
- * getAggregatedValue() (both methods are thread safe) the former
- * computes partial aggregates for this superstep from the worker,
- * the latter returns (read-only) final aggregates from the previous superstep.
- * Finally, at the end of the superstep, the worker calls finishSuperstep(),
- * which propagates non-owned partial aggregates to the owner workers,
- * and sends the final aggregate from the owner worker to the master.
- */
-public class WorkerAggregatorHandler implements WorkerThreadAggregatorUsage {
+/** Handler for reduce/broadcast on the workers */
+public class WorkerAggregatorHandler implements WorkerThreadGlobalCommUsage {
   /** Class logger */
   private static final Logger LOG =
       Logger.getLogger(WorkerAggregatorHandler.class);
-  /** Map of values from previous superstep */
-  private final Map<String, Writable> previousAggregatedValueMap =
+  /** Map of broadcasted values */
+  private final Map<String, Writable> broadcastedMap =
       Maps.newHashMap();
-  /** Map of aggregator factories for current superstep */
-  private final Map<String, Factory<Aggregator<Writable>>>
-  currentAggregatorFactoryMap = Maps.newHashMap();
-  /** Map of aggregators for current superstep */
-  private final Map<String, Aggregator<Writable>> currentAggregatorMap =
+  /** Map of reducers currently being reduced */
+  private final Map<String, Reducer<Object, Writable>> reducerMap =
       Maps.newHashMap();
+
   /** Service worker */
   private final CentralizedServiceWorker<?, ?, ?> serviceWorker;
   /** Progressable for reporting progress */
@@ -96,29 +83,48 @@ public class WorkerAggregatorHandler implements WorkerThreadAggregatorUsage {
   }
 
   @Override
-  public <A extends Writable> void aggregate(String name, A value) {
-    Aggregator<Writable> aggregator = currentAggregatorMap.get(name);
-    if (aggregator != null) {
+  public <B extends Writable> B getBroadcast(String name) {
+    B value = (B) broadcastedMap.get(name);
+    if (value == null) {
+      LOG.warn("getBroadcast: " +
+          AggregatorUtils.getUnregisteredAggregatorMessage(name,
+              broadcastedMap.size() != 0, conf));
+    }
+    return value;
+  }
+
+  @Override
+  public void reduce(String name, Object value) {
+    Reducer<Object, Writable> reducer = reducerMap.get(name);
+    if (reducer != null) {
       progressable.progress();
-      synchronized (aggregator) {
-        aggregator.aggregate(value);
+      synchronized (reducer) {
+        reducer.reduceSingle(value);
       }
     } else {
-      throw new IllegalStateException("aggregate: " +
+      throw new IllegalStateException("reduce: " +
           AggregatorUtils.getUnregisteredAggregatorMessage(name,
-              currentAggregatorMap.size() != 0, conf));
+              reducerMap.size() != 0, conf));
     }
   }
 
-  @Override
-  public <A extends Writable> A getAggregatedValue(String name) {
-    A value = (A) previousAggregatedValueMap.get(name);
-    if (value == null) {
-      LOG.warn("getAggregatedValue: " +
+  /**
+   * Merge a partially reduced value into the named reducer's current value.
+   * @param name Name of the reducer; IllegalStateException if unknown
+   * @param valueToReduce Partial value to reduce
+   */
+  protected void reducePartial(String name, Writable valueToReduce) {
+    Reducer<Object, Writable> reducer = reducerMap.get(name);
+    if (reducer != null) {
+      progressable.progress();
+      synchronized (reducer) {
+        reducer.reducePartial(valueToReduce);
+      }
+    } else {
+      throw new IllegalStateException("reducePartial: " +
           AggregatorUtils.getUnregisteredAggregatorMessage(name,
-              previousAggregatedValueMap.size() != 0, conf));
+              reducerMap.size() != 0, conf));
     }
-    return value;
   }
 
   /**
@@ -128,53 +134,35 @@ public class WorkerAggregatorHandler implements WorkerThreadAggregatorUsage {
    */
   public void prepareSuperstep(
       WorkerAggregatorRequestProcessor requestProcessor) {
+    broadcastedMap.clear();
+    reducerMap.clear();
+
     if (LOG.isDebugEnabled()) {
       LOG.debug("prepareSuperstep: Start preparing aggregators");
     }
-    AllAggregatorServerData allAggregatorData =
+    AllAggregatorServerData allGlobalCommData =
         serviceWorker.getServerData().getAllAggregatorData();
     // Wait for my aggregators
     Iterable<byte[]> dataToDistribute =
-        allAggregatorData.getDataFromMasterWhenReady(
+        allGlobalCommData.getDataFromMasterWhenReady(
             serviceWorker.getMasterInfo());
     try {
       // Distribute my aggregators
-      requestProcessor.distributeAggregators(dataToDistribute);
+      requestProcessor.distributeReducedValues(dataToDistribute);
     } catch (IOException e) {
       throw new IllegalStateException("prepareSuperstep: " +
           "IOException occurred while trying to distribute aggregators", e);
     }
     // Wait for all other aggregators and store them
-    allAggregatorData.fillNextSuperstepMapsWhenReady(
-        getOtherWorkerIdsSet(), previousAggregatedValueMap,
-        currentAggregatorFactoryMap);
-    fillAndInitAggregatorsMap(currentAggregatorMap);
-    allAggregatorData.reset();
+    allGlobalCommData.fillNextSuperstepMapsWhenReady(
+        getOtherWorkerIdsSet(), broadcastedMap,
+        reducerMap);
     if (LOG.isDebugEnabled()) {
       LOG.debug("prepareSuperstep: Aggregators prepared");
     }
   }
 
   /**
-   * Fills aggregators map from currentAggregatorFactoryMap.
-   * All aggregators in this map will be set to initial value.
-   * @param aggregatorMap Map to fill.
-   */
-  private void fillAndInitAggregatorsMap(
-      Map<String, Aggregator<Writable>> aggregatorMap) {
-    for (Map.Entry<String, Factory<Aggregator<Writable>>> entry :
-        currentAggregatorFactoryMap.entrySet()) {
-      Aggregator<Writable> aggregator =
-          aggregatorMap.get(entry.getKey());
-      if (aggregator == null) {
-        aggregatorMap.put(entry.getKey(), entry.getValue().create());
-      } else {
-        aggregator.reset();
-      }
-    }
-  }
-
-  /**
    * Send aggregators to their owners and in the end to the master
    *
    * @param requestProcessor Request processor for aggregators
@@ -186,19 +174,19 @@ public class WorkerAggregatorHandler implements WorkerThreadAggregatorUsage {
           "workers will send their aggregated values " +
           "once they are done with superstep computation");
     }
-    OwnerAggregatorServerData ownerAggregatorData =
+    OwnerAggregatorServerData ownerGlobalCommData =
         serviceWorker.getServerData().getOwnerAggregatorData();
     // First send partial aggregated values to their owners and determine
     // which aggregators belong to this worker
-    for (Map.Entry<String, Aggregator<Writable>> entry :
-        currentAggregatorMap.entrySet()) {
+    for (Map.Entry<String, Reducer<Object, Writable>> entry :
+        reducerMap.entrySet()) {
       try {
-        boolean sent = requestProcessor.sendAggregatedValue(entry.getKey(),
-            entry.getValue().getAggregatedValue());
+        boolean sent = requestProcessor.sendReducedValue(entry.getKey(),
+            entry.getValue().getCurrentValue());
         if (!sent) {
           // If it's my aggregator, add it directly
-          ownerAggregatorData.aggregate(entry.getKey(),
-              entry.getValue().getAggregatedValue());
+          ownerGlobalCommData.reduce(entry.getKey(),
+              entry.getValue().getCurrentValue());
         }
       } catch (IOException e) {
         throw new IllegalStateException("finishSuperstep: " +
@@ -216,20 +204,21 @@ public class WorkerAggregatorHandler implements WorkerThreadAggregatorUsage {
     }
 
     // Wait to receive partial aggregated values from all other workers
-    Iterable<Map.Entry<String, Writable>> myAggregators =
-        ownerAggregatorData.getMyAggregatorValuesWhenReady(
+    Iterable<Map.Entry<String, Writable>> myReducedValues =
+        ownerGlobalCommData.getMyReducedValuesWhenReady(
             getOtherWorkerIdsSet());
 
     // Send final aggregated values to master
-    AggregatedValueOutputStream aggregatorOutput =
-        new AggregatedValueOutputStream();
-    for (Map.Entry<String, Writable> entry : myAggregators) {
+    GlobalCommValueOutputStream globalOutput =
+        new GlobalCommValueOutputStream(false);
+    for (Map.Entry<String, Writable> entry : myReducedValues) {
       try {
-        int currentSize = aggregatorOutput.addAggregator(entry.getKey(),
+        int currentSize = globalOutput.addValue(entry.getKey(),
+            GlobalCommType.REDUCED_VALUE,
             entry.getValue());
         if (currentSize > maxBytesPerAggregatorRequest) {
-          requestProcessor.sendAggregatedValuesToMaster(
-              aggregatorOutput.flush());
+          requestProcessor.sendReducedValuesToMaster(
+              globalOutput.flush());
         }
         progressable.progress();
       } catch (IOException e) {
@@ -239,7 +228,7 @@ public class WorkerAggregatorHandler implements WorkerThreadAggregatorUsage {
       }
     }
     try {
-      requestProcessor.sendAggregatedValuesToMaster(aggregatorOutput.flush());
+      requestProcessor.sendReducedValuesToMaster(globalOutput.flush());
     } catch (IOException e) {
       throw new IllegalStateException("finishSuperstep: " +
           "IOException occured while sending aggregators to master", e);
@@ -247,7 +236,7 @@ public class WorkerAggregatorHandler implements WorkerThreadAggregatorUsage {
     // Wait for master to receive aggregated values before proceeding
     serviceWorker.getWorkerClient().waitAllRequests();
 
-    ownerAggregatorData.reset();
+    ownerGlobalCommData.reset();
     if (LOG.isDebugEnabled()) {
       LOG.debug("finishSuperstep: Aggregators finished");
     }
@@ -259,9 +248,9 @@ public class WorkerAggregatorHandler implements WorkerThreadAggregatorUsage {
    *
    * @return New aggregator usage
    */
-  public WorkerThreadAggregatorUsage newThreadAggregatorUsage() {
+  public WorkerThreadGlobalCommUsage newThreadAggregatorUsage() {
     if (AggregatorUtils.useThreadLocalAggregators(conf)) {
-      return new ThreadLocalWorkerAggregatorUsage();
+      return new ThreadLocalWorkerGlobalCommUsage();
     } else {
       return this;
     }
@@ -290,56 +279,70 @@ public class WorkerAggregatorHandler implements WorkerThreadAggregatorUsage {
   }
 
   /**
-   * Not thread-safe implementation of {@link WorkerThreadAggregatorUsage}.
-   * We can use one instance of this object per thread to prevent
-   * synchronizing on each aggregate() call. In the end of superstep,
-   * values from each of these will be aggregated back to {@link
-   * WorkerAggregatorHandler}
-   */
-  public class ThreadLocalWorkerAggregatorUsage
-      implements WorkerThreadAggregatorUsage {
-    /** Thread-local aggregator map */
-    private final Map<String, Aggregator<Writable>> threadAggregatorMap;
+  * Not thread-safe implementation of {@link WorkerThreadGlobalCommUsage}.
+  * We can use one instance of this object per thread to prevent
+  * synchronizing on each reduce() call. In the end of superstep,
+  * values from each of these will be reduced back into {@link
+  * WorkerAggregatorHandler}
+  */
+  public class ThreadLocalWorkerGlobalCommUsage
+    implements WorkerThreadGlobalCommUsage {
+    /** Thread-local reducer map */
+    private final Map<String, Reducer<Object, Writable>> threadReducerMap;
 
     /**
-     * Constructor
-     *
-     * Creates new instances of all aggregators from
-     * {@link WorkerAggregatorHandler}
-     */
-    public ThreadLocalWorkerAggregatorUsage() {
-      threadAggregatorMap = Maps.newHashMapWithExpectedSize(
-          WorkerAggregatorHandler.this.currentAggregatorMap.size());
-      fillAndInitAggregatorsMap(threadAggregatorMap);
+    * Constructor
+    *
+    * Creates new instances of all reducers from
+    * {@link WorkerAggregatorHandler}
+    */
+    public ThreadLocalWorkerGlobalCommUsage() {
+      threadReducerMap = Maps.newHashMapWithExpectedSize(
+          WorkerAggregatorHandler.this.reducerMap.size());
+
+      UnsafeByteArrayOutputStream out = new UnsafeByteArrayOutputStream();
+      UnsafeReusableByteArrayInput in = new UnsafeReusableByteArrayInput();
+
+      for (Entry<String, Reducer<Object, Writable>> entry :
+          reducerMap.entrySet()) {
+        ReduceOperation<Object, Writable> globalReduceOp =
+            entry.getValue().getReduceOp();
+
+        ReduceOperation<Object, Writable> threadLocalCopy =
+            WritableUtils.createCopy(out, in, globalReduceOp);
+
+        threadReducerMap.put(entry.getKey(), new Reducer<>(threadLocalCopy));
+      }
     }
 
     @Override
-    public <A extends Writable> void aggregate(String name, A value) {
-      Aggregator<Writable> aggregator = threadAggregatorMap.get(name);
-      if (aggregator != null) {
+    public void reduce(String name, Object value) {
+      Reducer<Object, Writable> reducer = threadReducerMap.get(name);
+      if (reducer != null) {
         progressable.progress();
-        aggregator.aggregate(value);
+        reducer.reduceSingle(value);
       } else {
-        throw new IllegalStateException("aggregate: " +
+        throw new IllegalStateException("reduce: " +
             AggregatorUtils.getUnregisteredAggregatorMessage(name,
-                threadAggregatorMap.size() != 0, conf));
+                threadReducerMap.size() != 0, conf));
       }
     }
 
     @Override
-    public <A extends Writable> A getAggregatedValue(String name) {
-      return WorkerAggregatorHandler.this.<A>getAggregatedValue(name);
+    public <B extends Writable> B getBroadcast(String name) {
+      return WorkerAggregatorHandler.this.getBroadcast(name);
     }
 
     @Override
     public void finishThreadComputation() {
       // Aggregate the values this thread's vertices provided back to
       // WorkerAggregatorHandler
-      for (Map.Entry<String, Aggregator<Writable>> entry :
-          threadAggregatorMap.entrySet()) {
-        WorkerAggregatorHandler.this.aggregate(entry.getKey(),
-            entry.getValue().getAggregatedValue());
+      for (Entry<String, Reducer<Object, Writable>> entry :
+          threadReducerMap.entrySet()) {
+        WorkerAggregatorHandler.this.reducePartial(entry.getKey(),
+            entry.getValue().getCurrentValue());
       }
     }
   }
+
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/worker/WorkerContext.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerContext.java b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerContext.java
index 7a55d56..b977ea1 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerContext.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerContext.java
@@ -18,30 +18,28 @@
 
 package org.apache.giraph.worker;
 
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.List;
+
 import org.apache.giraph.bsp.CentralizedServiceWorker;
 import org.apache.giraph.comm.requests.SendWorkerToWorkerMessageRequest;
-import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
 import org.apache.giraph.graph.GraphState;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.Mapper;
 
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.List;
-
 /**
  * WorkerContext allows for the execution of user code
  * on a per-worker basis. There's one WorkerContext per worker.
  */
 @SuppressWarnings("rawtypes")
 public abstract class WorkerContext
-  extends DefaultImmutableClassesGiraphConfigurable
-  implements WorkerAggregatorUsage, Writable {
+  extends WorkerAggregatorDelegator<WritableComparable, Writable, Writable>
+  implements Writable {
   /** Global graph state */
   private GraphState graphState;
-  /** Worker aggregator usage */
-  private WorkerAggregatorUsage workerAggregatorUsage;
 
   /** Service worker */
   private CentralizedServiceWorker serviceWorker;
@@ -71,16 +69,6 @@ public abstract class WorkerContext
   }
 
   /**
-   * Set worker aggregator usage
-   *
-   * @param workerAggregatorUsage Worker aggregator usage
-   */
-  public void setWorkerAggregatorUsage(
-      WorkerAggregatorUsage workerAggregatorUsage) {
-    this.workerAggregatorUsage = workerAggregatorUsage;
-  }
-
-  /**
    * Initialize the WorkerContext.
    * This method is executed once on each Worker before the first
    * superstep starts.
@@ -196,16 +184,6 @@ public abstract class WorkerContext
     return graphState.getContext();
   }
 
-  @Override
-  public <A extends Writable> void aggregate(String name, A value) {
-    workerAggregatorUsage.aggregate(name, value);
-  }
-
-  @Override
-  public <A extends Writable> A getAggregatedValue(String name) {
-    return workerAggregatorUsage.<A>getAggregatedValue(name);
-  }
-
   /**
    * Call this to log a line to command line of the job. Use in moderation -
    * it's a synchronous call to Job client

http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/worker/WorkerGlobalCommUsage.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerGlobalCommUsage.java b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerGlobalCommUsage.java
new file mode 100644
index 0000000..39566f5
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerGlobalCommUsage.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.worker;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Methods on worker can access broadcasted values and provide
+ * values to reduce through this interface
+ */
+public interface WorkerGlobalCommUsage {
+  /**
+   * Reduce given value.
+   * @param name Name of the reducer
+   * @param value Single value to reduce
+   */
+  void reduce(String name, Object value);
+  /**
+   * Get value broadcasted from master
+   * @param name Name of the broadcasted value
+   * @return Broadcasted value
+   * @param <B> Broadcast value type
+   */
+  <B extends Writable> B getBroadcast(String name);
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/worker/WorkerThreadAggregatorUsage.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerThreadAggregatorUsage.java b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerThreadAggregatorUsage.java
deleted file mode 100644
index 194127e..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerThreadAggregatorUsage.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.worker;
-
-/**
- * {@link WorkerAggregatorUsage} which can be used in each of the
- * computation threads.
- */
-public interface WorkerThreadAggregatorUsage extends WorkerAggregatorUsage {
-  /**
-   * Call this after thread's computation is finished,
-   * i.e. when all vertices have provided their values to aggregators
-   */
-  void finishThreadComputation();
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/worker/WorkerThreadGlobalCommUsage.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerThreadGlobalCommUsage.java b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerThreadGlobalCommUsage.java
new file mode 100644
index 0000000..8edbdc7
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerThreadGlobalCommUsage.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.worker;
+
+
+/**
+ * {@link WorkerGlobalCommUsage} which can be used in each of the
+ * computation threads.
+ */
+public interface WorkerThreadGlobalCommUsage extends WorkerGlobalCommUsage {
+  /**
+   * Call this after thread's computation is finished,
+   * i.e. when all vertices have provided their values to reducers
+   */
+  void finishThreadComputation();
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-examples/src/test/java/org/apache/giraph/TestBspBasic.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/test/java/org/apache/giraph/TestBspBasic.java b/giraph-examples/src/test/java/org/apache/giraph/TestBspBasic.java
index 488e1ea..26459c0 100644
--- a/giraph-examples/src/test/java/org/apache/giraph/TestBspBasic.java
+++ b/giraph-examples/src/test/java/org/apache/giraph/TestBspBasic.java
@@ -18,6 +18,24 @@
 
 package org.apache.giraph;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.BufferedReader;
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.lang.reflect.InvocationTargetException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
 import org.apache.giraph.aggregators.TextAggregatorWriter;
 import org.apache.giraph.combiner.SimpleSumMessageCombiner;
 import org.apache.giraph.conf.GiraphConfiguration;
@@ -62,24 +80,6 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.io.Closeables;
 
-import java.io.BufferedReader;
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.lang.reflect.InvocationTargetException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertSame;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
 /**
  * Unit test for many simple BSP applications.
  */
@@ -456,8 +456,8 @@ public class
           assertEquals(maxSuperstep + 2, maxValues.size());
           assertEquals(maxSuperstep + 2, vertexCounts.size());
 
-          assertEquals(maxPageRank, (double) maxValues.get(maxSuperstep), 0d);
-          assertEquals(minPageRank, (double) minValues.get(maxSuperstep), 0d);
+          assertEquals(maxPageRank, maxValues.get(maxSuperstep), 0d);
+          assertEquals(minPageRank, minValues.get(maxSuperstep), 0d);
           assertEquals(numVertices, (long) vertexCounts.get(maxSuperstep));
 
         } finally {

http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-examples/src/test/java/org/apache/giraph/aggregators/TestAggregatorsHandling.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/test/java/org/apache/giraph/aggregators/TestAggregatorsHandling.java b/giraph-examples/src/test/java/org/apache/giraph/aggregators/TestAggregatorsHandling.java
index eb3f686..602ab32 100644
--- a/giraph-examples/src/test/java/org/apache/giraph/aggregators/TestAggregatorsHandling.java
+++ b/giraph-examples/src/test/java/org/apache/giraph/aggregators/TestAggregatorsHandling.java
@@ -18,34 +18,19 @@
 
 package org.apache.giraph.aggregators;
 
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+
 import org.apache.giraph.BspCase;
 import org.apache.giraph.comm.aggregators.AggregatorUtils;
 import org.apache.giraph.conf.GiraphConfiguration;
 import org.apache.giraph.conf.GiraphConstants;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.examples.AggregatorsTestComputation;
 import org.apache.giraph.examples.SimpleCheckpoint;
 import org.apache.giraph.job.GiraphJob;
-import org.apache.giraph.master.MasterAggregatorHandler;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.util.Progressable;
 import org.junit.Test;
-import org.mockito.Mockito;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.lang.reflect.Field;
-import java.util.Map;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
 
 /** Tests if aggregators are handled on a proper way */
 public class TestAggregatorsHandling extends BspCase {
@@ -54,21 +39,6 @@ public class TestAggregatorsHandling extends BspCase {
     super(TestAggregatorsHandling.class.getName());
   }
 
-  private Map<String, AggregatorWrapper<Writable>> getAggregatorMap
-      (MasterAggregatorHandler aggregatorHandler) {
-    try {
-      Field aggregtorMapField = aggregatorHandler.getClass().getDeclaredField
-          ("aggregatorMap");
-      aggregtorMapField.setAccessible(true);
-      return (Map<String, AggregatorWrapper<Writable>>)
-          aggregtorMapField.get(aggregatorHandler);
-    } catch (IllegalAccessException e) {
-      throw new IllegalStateException(e);
-    } catch (NoSuchFieldException e) {
-      throw new IllegalStateException(e);
-    }
-  }
-
   /** Tests if aggregators are handled on a proper way during supersteps */
   @Test
   public void testAggregatorsHandling() throws IOException,
@@ -88,64 +58,6 @@ public class TestAggregatorsHandling extends BspCase {
     assertTrue(job.run(true));
   }
 
-  /** Test if aggregators serialization captures everything */
-  @Test
-  public void testMasterAggregatorsSerialization() throws
-      IllegalAccessException, InstantiationException, IOException {
-    ImmutableClassesGiraphConfiguration conf =
-        Mockito.mock(ImmutableClassesGiraphConfiguration.class);
-    Mockito.when(conf.getAggregatorWriterClass()).thenReturn(
-        TextAggregatorWriter.class);
-    Progressable progressable = Mockito.mock(Progressable.class);
-    MasterAggregatorHandler handler =
-        new MasterAggregatorHandler(conf, progressable);
-
-    String regularAggName = "regular";
-    LongWritable regularValue = new LongWritable(5);
-    handler.registerAggregator(regularAggName, LongSumAggregator.class);
-    handler.setAggregatedValue(regularAggName, regularValue);
-
-    String persistentAggName = "persistent";
-    DoubleWritable persistentValue = new DoubleWritable(10.5);
-    handler.registerPersistentAggregator(persistentAggName,
-        DoubleOverwriteAggregator.class);
-    handler.setAggregatedValue(persistentAggName, persistentValue);
-
-    for (AggregatorWrapper<Writable> aggregator :
-        getAggregatorMap(handler).values()) {
-      aggregator.setPreviousAggregatedValue(
-          aggregator.getCurrentAggregatedValue());
-    }
-
-    ByteArrayOutputStream out = new ByteArrayOutputStream();
-    handler.write(new DataOutputStream(out));
-
-    MasterAggregatorHandler restartedHandler =
-        new MasterAggregatorHandler(conf, progressable);
-    restartedHandler.readFields(
-        new DataInputStream(new ByteArrayInputStream(out.toByteArray())));
-
-    assertEquals(2, getAggregatorMap(restartedHandler).size());
-
-    AggregatorWrapper<Writable> regularAgg =
-        getAggregatorMap(restartedHandler).get(regularAggName);
-    assertTrue(regularAgg.getAggregatorFactory().create().getClass().equals(
-        LongSumAggregator.class));
-    assertEquals(regularValue, regularAgg.getPreviousAggregatedValue());
-    assertEquals(regularValue,
-        restartedHandler.<LongWritable>getAggregatedValue(regularAggName));
-    assertFalse(regularAgg.isPersistent());
-
-    AggregatorWrapper<Writable> persistentAgg =
-        getAggregatorMap(restartedHandler).get(persistentAggName);
-    assertTrue(persistentAgg.getAggregatorFactory().create().getClass().equals
-        (DoubleOverwriteAggregator.class));
-    assertEquals(persistentValue, persistentAgg.getPreviousAggregatedValue());
-    assertEquals(persistentValue,
-        restartedHandler.<LongWritable>getAggregatedValue(persistentAggName));
-    assertTrue(persistentAgg.isPersistent());
-  }
-
   /**
    * Test if aggregators are are handled properly when restarting from a
    * checkpoint