You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ma...@apache.org on 2014/12/01 23:24:20 UTC

git commit: updated refs/heads/trunk to fda1bb3

Repository: giraph
Updated Branches:
  refs/heads/trunk bad44d472 -> fda1bb382


Improving and adding reducers

Summary:
- adding common reducers: MaxReduce, MinReduce and SumReduce. Each is written once on top of NumericTypeOps, so there is no per-type code duplication
- adding NumericTypeOps functions to support the above
- adding Varint encoding class
- renaming registerReduce -> registerReducer

Test Plan: mvn install

Reviewers: pavanka, sergey.edunov, maja.kabiljo

Reviewed By: maja.kabiljo

Differential Revision: https://reviews.facebook.net/D28983


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/fda1bb38
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/fda1bb38
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/fda1bb38

Branch: refs/heads/trunk
Commit: fda1bb3820ac574ed5d59cb779171e313798336d
Parents: bad44d4
Author: Igor Kabiljo <ik...@fb.com>
Authored: Mon Dec 1 14:21:12 2014 -0800
Committer: Maja Kabiljo <ma...@fb.com>
Committed: Mon Dec 1 14:24:10 2014 -0800

----------------------------------------------------------------------
 .../matrix/dense/FloatDenseMatrix.java          |   8 +-
 .../matrix/dense/IntDenseMatrix.java            |   8 +-
 .../matrix/sparse/DoubleSparseVector.java       |  14 +-
 .../matrix/sparse/FloatSparseMatrix.java        |   8 +-
 .../matrix/sparse/IntSparseMatrix.java          |   8 +-
 .../matrix/sparse/LongSparseVector.java         |  14 +-
 .../giraph/benchmark/ReducersBenchmark.java     |   8 +-
 .../AggregatorToGlobalCommTranslation.java      |   4 +-
 .../giraph/master/MasterAggregatorHandler.java  |   6 +-
 .../org/apache/giraph/master/MasterCompute.java |   8 +-
 .../giraph/master/MasterGlobalCommUsage.java    |   4 +-
 .../apache/giraph/reducers/impl/MaxReduce.java  |  83 ++++++++
 .../apache/giraph/reducers/impl/MinReduce.java  |  83 ++++++++
 .../apache/giraph/reducers/impl/SumReduce.java  |  81 ++++++++
 .../giraph/reducers/impl/package-info.java      |  21 ++
 .../apache/giraph/types/ops/DoubleTypeOps.java  |  34 +++-
 .../apache/giraph/types/ops/FloatTypeOps.java   |  33 +++-
 .../org/apache/giraph/types/ops/IntTypeOps.java |  33 +++-
 .../apache/giraph/types/ops/LongTypeOps.java    |  33 +++-
 .../apache/giraph/types/ops/NumericTypeOps.java |  67 +++++++
 .../org/apache/giraph/types/ops/TypeOps.java    |   3 +
 .../apache/giraph/types/ops/TypeOpsUtils.java   |  27 +++
 .../types/ops/collections/BasicArrayList.java   |  41 ++++
 .../java/org/apache/giraph/utils/Varint.java    | 198 +++++++++++++++++++
 .../org/apache/giraph/utils/WritableUtils.java  |  39 ++++
 .../worker/WorkerAggregatorDelegator.java       |   5 +
 .../giraph/worker/WorkerAggregatorHandler.java  |  16 +-
 .../apache/giraph/worker/WorkerReduceUsage.java |   9 +
 28 files changed, 854 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/fda1bb38/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/dense/FloatDenseMatrix.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/dense/FloatDenseMatrix.java b/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/dense/FloatDenseMatrix.java
index ce75d6d..588a92a 100644
--- a/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/dense/FloatDenseMatrix.java
+++ b/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/dense/FloatDenseMatrix.java
@@ -26,9 +26,9 @@ import java.util.ArrayList;
  */
 public class FloatDenseMatrix {
   /** The number of rows in the matrix */
-  private int numRows;
+  private final int numRows;
   /** The number of columns in the matrix */
-  private int numColumns;
+  private final int numColumns;
   /** The rows of the matrix */
   private ArrayList<FloatDenseVector> rows = null;
 
@@ -109,7 +109,7 @@ public class FloatDenseMatrix {
    * @param i the row number
    * @return the row of the matrix
    */
-  FloatDenseVector getRow(int i) {
+  public FloatDenseVector getRow(int i) {
     return rows.get(i);
   }
 
@@ -118,7 +118,7 @@ public class FloatDenseMatrix {
    *
    * @param vec the vector to add
    */
-  void addRow(FloatDenseVector vec) {
+  public void addRow(FloatDenseVector vec) {
     if (rows.size() >= numRows) {
       throw new RuntimeException("Cannot add more rows!");
     }

http://git-wip-us.apache.org/repos/asf/giraph/blob/fda1bb38/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/dense/IntDenseMatrix.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/dense/IntDenseMatrix.java b/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/dense/IntDenseMatrix.java
index ed85574..9ec5439 100644
--- a/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/dense/IntDenseMatrix.java
+++ b/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/dense/IntDenseMatrix.java
@@ -26,9 +26,9 @@ import java.util.ArrayList;
  */
 public class IntDenseMatrix {
   /** The number of rows in the matrix */
-  private int numRows;
+  private final int numRows;
   /** The number of columns in the matrix */
-  private int numColumns;
+  private final int numColumns;
   /** The rows of the matrix */
   private ArrayList<IntDenseVector> rows = null;
 
@@ -109,7 +109,7 @@ public class IntDenseMatrix {
    * @param i the row number
    * @return the row of the matrix
    */
-  IntDenseVector getRow(int i) {
+  public IntDenseVector getRow(int i) {
     return rows.get(i);
   }
 
@@ -118,7 +118,7 @@ public class IntDenseMatrix {
    *
    * @param vec the vector to add
    */
-  void addRow(IntDenseVector vec) {
+  public void addRow(IntDenseVector vec) {
     if (rows.size() >= numRows) {
       throw new RuntimeException("Cannot add more rows!");
     }

http://git-wip-us.apache.org/repos/asf/giraph/blob/fda1bb38/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/sparse/DoubleSparseVector.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/sparse/DoubleSparseVector.java b/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/sparse/DoubleSparseVector.java
index fb54459..7abf0e6 100644
--- a/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/sparse/DoubleSparseVector.java
+++ b/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/sparse/DoubleSparseVector.java
@@ -19,14 +19,13 @@
 package org.apache.giraph.aggregators.matrix.sparse;
 
 import it.unimi.dsi.fastutil.ints.Int2DoubleMap;
+import it.unimi.dsi.fastutil.ints.Int2DoubleOpenHashMap;
+import it.unimi.dsi.fastutil.objects.ObjectIterator;
 
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 
-import it.unimi.dsi.fastutil.ints.Int2DoubleOpenHashMap;
-import it.unimi.dsi.fastutil.objects.ObjectIterator;
-
 import org.apache.hadoop.io.Writable;
 
 /**
@@ -85,6 +84,15 @@ public class DoubleSparseVector implements Writable {
   }
 
   /**
+   * Increment value for a given key
+   * @param key Key
+   * @param value Increment
+   */
+  public void add(int key, double value) {
+    entries.addTo(key, value);
+  }
+
+  /**
    * Clear the contents of the vector.
    */
   public void clear() {

http://git-wip-us.apache.org/repos/asf/giraph/blob/fda1bb38/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/sparse/FloatSparseMatrix.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/sparse/FloatSparseMatrix.java b/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/sparse/FloatSparseMatrix.java
index a54ae31..e021555 100644
--- a/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/sparse/FloatSparseMatrix.java
+++ b/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/sparse/FloatSparseMatrix.java
@@ -26,9 +26,9 @@ import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
  */
 public class FloatSparseMatrix {
   /** The number of rows in the matrix */
-  private int numRows;
+  private final int numRows;
   /** The rows of the matrix */
-  private Int2ObjectOpenHashMap<FloatSparseVector> rows;
+  private final Int2ObjectOpenHashMap<FloatSparseVector> rows;
 
   /**
    * Create a new matrix with the given number of rows.
@@ -88,7 +88,7 @@ public class FloatSparseMatrix {
    * @param i the row number
    * @return the row of the matrix
    */
-  FloatSparseVector getRow(int i) {
+  public FloatSparseVector getRow(int i) {
     return rows.get(i);
   }
 
@@ -98,7 +98,7 @@ public class FloatSparseMatrix {
    * @param i the row
    * @param vec the vector to set as the row
    */
-  void setRow(int i, FloatSparseVector vec) {
+  public void setRow(int i, FloatSparseVector vec) {
     rows.put(i, vec);
   }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/fda1bb38/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/sparse/IntSparseMatrix.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/sparse/IntSparseMatrix.java b/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/sparse/IntSparseMatrix.java
index b7cde77..29e8482 100644
--- a/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/sparse/IntSparseMatrix.java
+++ b/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/sparse/IntSparseMatrix.java
@@ -26,9 +26,9 @@ import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
  */
 public class IntSparseMatrix {
   /** The number of rows in the matrix */
-  private int numRows;
+  private final int numRows;
   /** The rows of the matrix */
-  private Int2ObjectOpenHashMap<IntSparseVector> rows;
+  private final Int2ObjectOpenHashMap<IntSparseVector> rows;
 
   /**
    * Create a new matrix with the given number of rows.
@@ -88,7 +88,7 @@ public class IntSparseMatrix {
    * @param i the row number
    * @return the row of the matrix
    */
-  IntSparseVector getRow(int i) {
+  public IntSparseVector getRow(int i) {
     return rows.get(i);
   }
 
@@ -98,7 +98,7 @@ public class IntSparseMatrix {
    * @param i the row
    * @param vec the vector to set as the row
    */
-  void setRow(int i, IntSparseVector vec) {
+  public void setRow(int i, IntSparseVector vec) {
     rows.put(i, vec);
   }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/fda1bb38/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/sparse/LongSparseVector.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/sparse/LongSparseVector.java b/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/sparse/LongSparseVector.java
index 6337215..6a16525 100644
--- a/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/sparse/LongSparseVector.java
+++ b/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/sparse/LongSparseVector.java
@@ -19,14 +19,13 @@
 package org.apache.giraph.aggregators.matrix.sparse;
 
 import it.unimi.dsi.fastutil.ints.Int2LongMap;
+import it.unimi.dsi.fastutil.ints.Int2LongOpenHashMap;
+import it.unimi.dsi.fastutil.objects.ObjectIterator;
 
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 
-import it.unimi.dsi.fastutil.ints.Int2LongOpenHashMap;
-import it.unimi.dsi.fastutil.objects.ObjectIterator;
-
 import org.apache.hadoop.io.Writable;
 
 /**
@@ -85,6 +84,15 @@ public class LongSparseVector implements Writable {
   }
 
   /**
+   * Increment value for a given key
+   * @param key Key
+   * @param value Increment
+   */
+  public void add(int key, long value) {
+    entries.addTo(key, value);
+  }
+
+  /**
    * Clear the contents of the vector.
    */
   public void clear() {

http://git-wip-us.apache.org/repos/asf/giraph/blob/fda1bb38/giraph-core/src/main/java/org/apache/giraph/benchmark/ReducersBenchmark.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/benchmark/ReducersBenchmark.java b/giraph-core/src/main/java/org/apache/giraph/benchmark/ReducersBenchmark.java
index 263274d..16c33e9 100644
--- a/giraph-core/src/main/java/org/apache/giraph/benchmark/ReducersBenchmark.java
+++ b/giraph-core/src/main/java/org/apache/giraph/benchmark/ReducersBenchmark.java
@@ -126,15 +126,15 @@ public class ReducersBenchmark extends GiraphBenchmark {
         String mi = "m" + i;
         String pi = "p" + i;
 
-        registerReduce(wi, TestLongSumReducer.INSTANCE);
-        registerReduce(mi, new TestLongSumReducer());
+        registerReducer(wi, TestLongSumReducer.INSTANCE);
+        registerReducer(mi, new TestLongSumReducer());
 
         if (superstep > 0) {
           broadcast(wi, getReduced(wi));
           broadcast(mi, new LongWritable(-superstep * i));
           broadcast(pi, getReduced(pi));
 
-          registerReduce(pi, new TestLongSumReducer(),
+          registerReducer(pi, new TestLongSumReducer(),
               (LongWritable) getReduced(pi));
 
           assertEquals(superstep * (getTotalNumVertices() * i) + w,
@@ -142,7 +142,7 @@ public class ReducersBenchmark extends GiraphBenchmark {
           assertEquals(superstep * getTotalNumVertices() * i,
               ((LongWritable) getReduced(pi)).get());
         } else {
-          registerReduce(pi, new TestLongSumReducer());
+          registerReducer(pi, new TestLongSumReducer());
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/giraph/blob/fda1bb38/giraph-core/src/main/java/org/apache/giraph/master/AggregatorToGlobalCommTranslation.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/AggregatorToGlobalCommTranslation.java b/giraph-core/src/main/java/org/apache/giraph/master/AggregatorToGlobalCommTranslation.java
index eb25182..fa3f376 100644
--- a/giraph-core/src/main/java/org/apache/giraph/master/AggregatorToGlobalCommTranslation.java
+++ b/giraph-core/src/main/java/org/apache/giraph/master/AggregatorToGlobalCommTranslation.java
@@ -135,10 +135,10 @@ public class AggregatorToGlobalCommTranslation
       AggregatorReduceOperation<Writable> cleanReduceOp =
           entry.getValue().createReduceOp();
       if (entry.getValue().isPersistent()) {
-        globalComm.registerReduce(
+        globalComm.registerReducer(
             entry.getKey(), cleanReduceOp, value);
       } else {
-        globalComm.registerReduce(
+        globalComm.registerReducer(
             entry.getKey(), cleanReduceOp);
       }
       entry.getValue().setCurrentValue(null);

http://git-wip-us.apache.org/repos/asf/giraph/blob/fda1bb38/giraph-core/src/main/java/org/apache/giraph/master/MasterAggregatorHandler.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/MasterAggregatorHandler.java b/giraph-core/src/main/java/org/apache/giraph/master/MasterAggregatorHandler.java
index ccee656..98de9d6 100644
--- a/giraph-core/src/main/java/org/apache/giraph/master/MasterAggregatorHandler.java
+++ b/giraph-core/src/main/java/org/apache/giraph/master/MasterAggregatorHandler.java
@@ -80,13 +80,13 @@ public class MasterAggregatorHandler
   }
 
   @Override
-  public final <S, R extends Writable> void registerReduce(
+  public final <S, R extends Writable> void registerReducer(
       String name, ReduceOperation<S, R> reduceOp) {
-    registerReduce(name, reduceOp, reduceOp.createInitialValue());
+    registerReducer(name, reduceOp, reduceOp.createInitialValue());
   }
 
   @Override
-  public <S, R extends Writable> void registerReduce(
+  public <S, R extends Writable> void registerReducer(
       String name, ReduceOperation<S, R> reduceOp,
       R globalInitialValue) {
     if (reducerMap.containsKey(name)) {

http://git-wip-us.apache.org/repos/asf/giraph/blob/fda1bb38/giraph-core/src/main/java/org/apache/giraph/master/MasterCompute.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/MasterCompute.java b/giraph-core/src/main/java/org/apache/giraph/master/MasterCompute.java
index 68eb416..eb4144a 100644
--- a/giraph-core/src/main/java/org/apache/giraph/master/MasterCompute.java
+++ b/giraph-core/src/main/java/org/apache/giraph/master/MasterCompute.java
@@ -190,15 +190,15 @@ public abstract class MasterCompute
   }
 
   @Override
-  public final <S, R extends Writable> void registerReduce(
+  public final <S, R extends Writable> void registerReducer(
       String name, ReduceOperation<S, R> reduceOp) {
-    serviceMaster.getGlobalCommHandler().registerReduce(name, reduceOp);
+    serviceMaster.getGlobalCommHandler().registerReducer(name, reduceOp);
   }
 
   @Override
-  public final <S, R extends Writable> void registerReduce(
+  public final <S, R extends Writable> void registerReducer(
       String name, ReduceOperation<S, R> reduceOp, R globalInitialValue) {
-    serviceMaster.getGlobalCommHandler().registerReduce(
+    serviceMaster.getGlobalCommHandler().registerReducer(
         name, reduceOp, globalInitialValue);
   }
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/fda1bb38/giraph-core/src/main/java/org/apache/giraph/master/MasterGlobalCommUsage.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/MasterGlobalCommUsage.java b/giraph-core/src/main/java/org/apache/giraph/master/MasterGlobalCommUsage.java
index c3ce0ea..7ee9048 100644
--- a/giraph-core/src/main/java/org/apache/giraph/master/MasterGlobalCommUsage.java
+++ b/giraph-core/src/main/java/org/apache/giraph/master/MasterGlobalCommUsage.java
@@ -33,7 +33,7 @@ public interface MasterGlobalCommUsage {
    * @param <S> Single value type
    * @param <R> Reduced value type
    */
-  <S, R extends Writable> void registerReduce(
+  <S, R extends Writable> void registerReducer(
       String name, ReduceOperation<S, R> reduceOp);
 
   /**
@@ -48,7 +48,7 @@ public interface MasterGlobalCommUsage {
    * @param <S> Single value type
    * @param <R> Reduced value type
    */
-  <S, R extends Writable> void registerReduce(
+  <S, R extends Writable> void registerReducer(
       String name, ReduceOperation<S, R> reduceOp, R globalInitialValue);
 
   /**

http://git-wip-us.apache.org/repos/asf/giraph/blob/fda1bb38/giraph-core/src/main/java/org/apache/giraph/reducers/impl/MaxReduce.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/reducers/impl/MaxReduce.java b/giraph-core/src/main/java/org/apache/giraph/reducers/impl/MaxReduce.java
new file mode 100644
index 0000000..9d603a1
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/reducers/impl/MaxReduce.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.reducers.impl;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.giraph.reducers.OnSameReduceOperation;
+import org.apache.giraph.types.ops.DoubleTypeOps;
+import org.apache.giraph.types.ops.LongTypeOps;
+import org.apache.giraph.types.ops.NumericTypeOps;
+import org.apache.giraph.types.ops.TypeOpsUtils;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Reducer for calculating max of values
+ * @param <T> Value type
+ */
+public class MaxReduce<T extends WritableComparable>
+    extends OnSameReduceOperation<T> {
+  /** DoubleWritable specialization */
+  public static final MaxReduce<DoubleWritable> DOUBLE =
+      new MaxReduce<>(DoubleTypeOps.INSTANCE);
+  /** LongWritable specialization */
+  public static final MaxReduce<LongWritable> LONG =
+      new MaxReduce<>(LongTypeOps.INSTANCE);
+
+  /** Value type operations */
+  private NumericTypeOps<T> typeOps;
+
+  /** Constructor used for deserialization only */
+  public MaxReduce() {
+  }
+
+  /**
+   * Constructor
+   * @param typeOps Value type operations
+   */
+  public MaxReduce(NumericTypeOps<T> typeOps) {
+    this.typeOps = typeOps;
+  }
+
+  @Override
+  public T createInitialValue() {
+    return typeOps.createMinNegativeValue();
+  }
+
+  @Override
+  public T reduceSingle(T curValue, T valueToReduce) {
+    if (curValue.compareTo(valueToReduce) < 0) {
+      typeOps.set(curValue, valueToReduce);
+    }
+    return curValue;
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    TypeOpsUtils.writeTypeOps(typeOps, out);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    typeOps = TypeOpsUtils.readTypeOps(in);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/fda1bb38/giraph-core/src/main/java/org/apache/giraph/reducers/impl/MinReduce.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/reducers/impl/MinReduce.java b/giraph-core/src/main/java/org/apache/giraph/reducers/impl/MinReduce.java
new file mode 100644
index 0000000..9972340
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/reducers/impl/MinReduce.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.reducers.impl;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.giraph.reducers.OnSameReduceOperation;
+import org.apache.giraph.types.ops.DoubleTypeOps;
+import org.apache.giraph.types.ops.LongTypeOps;
+import org.apache.giraph.types.ops.NumericTypeOps;
+import org.apache.giraph.types.ops.TypeOpsUtils;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Reducer for calculating min of values
+ * @param <T> Value type
+ */
+public class MinReduce<T extends WritableComparable>
+    extends OnSameReduceOperation<T> {
+  /** DoubleWritable specialization */
+  public static final MinReduce<DoubleWritable> DOUBLE =
+      new MinReduce<>(DoubleTypeOps.INSTANCE);
+  /** LongWritable specialization */
+  public static final MinReduce<LongWritable> LONG =
+      new MinReduce<>(LongTypeOps.INSTANCE);
+
+  /** Value type operations */
+  private NumericTypeOps<T> typeOps;
+
+  /** Constructor used for deserialization only */
+  public MinReduce() {
+  }
+
+  /**
+   * Constructor
+   * @param typeOps Value type operations
+   */
+  public MinReduce(NumericTypeOps<T> typeOps) {
+    this.typeOps = typeOps;
+  }
+
+  @Override
+  public T createInitialValue() {
+    return typeOps.createMaxPositiveValue();
+  }
+
+  @Override
+  public T reduceSingle(T curValue, T valueToReduce) {
+    if (curValue.compareTo(valueToReduce) > 0) {
+      typeOps.set(curValue, valueToReduce);
+    }
+    return curValue;
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    TypeOpsUtils.writeTypeOps(typeOps, out);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    typeOps = TypeOpsUtils.readTypeOps(in);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/fda1bb38/giraph-core/src/main/java/org/apache/giraph/reducers/impl/SumReduce.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/reducers/impl/SumReduce.java b/giraph-core/src/main/java/org/apache/giraph/reducers/impl/SumReduce.java
new file mode 100644
index 0000000..3138733
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/reducers/impl/SumReduce.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.reducers.impl;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.giraph.reducers.OnSameReduceOperation;
+import org.apache.giraph.types.ops.DoubleTypeOps;
+import org.apache.giraph.types.ops.LongTypeOps;
+import org.apache.giraph.types.ops.NumericTypeOps;
+import org.apache.giraph.types.ops.TypeOpsUtils;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Reducer for calculating sum of values
+ * @param <T> Value type
+ */
+public class SumReduce<T extends Writable>
+    extends OnSameReduceOperation<T> {
+  /** DoubleWritable specialization */
+  public static final SumReduce<DoubleWritable> DOUBLE =
+      new SumReduce<>(DoubleTypeOps.INSTANCE);
+  /** LongWritable specialization */
+  public static final SumReduce<LongWritable> LONG =
+      new SumReduce<>(LongTypeOps.INSTANCE);
+
+  /** Value type operations */
+  private NumericTypeOps<T> typeOps;
+
+  /** Constructor used for deserialization only */
+  public SumReduce() {
+  }
+
+  /**
+   * Constructor
+   * @param typeOps Value type operations
+   */
+  public SumReduce(NumericTypeOps<T> typeOps) {
+    this.typeOps = typeOps;
+  }
+
+  @Override
+  public T createInitialValue() {
+    return typeOps.createZero();
+  }
+
+  @Override
+  public T reduceSingle(T curValue, T valueToReduce) {
+    typeOps.plusInto(curValue, valueToReduce);
+    return curValue;
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    TypeOpsUtils.writeTypeOps(typeOps, out);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    typeOps = TypeOpsUtils.readTypeOps(in);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/fda1bb38/giraph-core/src/main/java/org/apache/giraph/reducers/impl/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/reducers/impl/package-info.java b/giraph-core/src/main/java/org/apache/giraph/reducers/impl/package-info.java
new file mode 100644
index 0000000..ba61ce8
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/reducers/impl/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Package of Giraph reducers.
+ */
+package org.apache.giraph.reducers.impl;

http://git-wip-us.apache.org/repos/asf/giraph/blob/fda1bb38/giraph-core/src/main/java/org/apache/giraph/types/ops/DoubleTypeOps.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/types/ops/DoubleTypeOps.java b/giraph-core/src/main/java/org/apache/giraph/types/ops/DoubleTypeOps.java
index af8c38f..1ca7796 100644
--- a/giraph-core/src/main/java/org/apache/giraph/types/ops/DoubleTypeOps.java
+++ b/giraph-core/src/main/java/org/apache/giraph/types/ops/DoubleTypeOps.java
@@ -21,7 +21,9 @@ import org.apache.giraph.types.ops.collections.BasicArrayList.BasicDoubleArrayLi
 import org.apache.hadoop.io.DoubleWritable;
 
 /** TypeOps implementation for working with DoubleWritable type */
-public enum DoubleTypeOps implements PrimitiveTypeOps<DoubleWritable> {
+public enum DoubleTypeOps
+    implements PrimitiveTypeOps<DoubleWritable>,
+    NumericTypeOps<DoubleWritable> {
   /** Singleton instance */
   INSTANCE();
 
@@ -49,4 +51,34 @@ public enum DoubleTypeOps implements PrimitiveTypeOps<DoubleWritable> {
   public BasicDoubleArrayList createArrayList(int capacity) {
     return new BasicDoubleArrayList(capacity);
   }
+
+  @Override
+  public DoubleWritable createMinNegativeValue() {
+    return new DoubleWritable(Double.NEGATIVE_INFINITY);
+  }
+
+  @Override
+  public DoubleWritable createMaxPositiveValue() {
+    return new DoubleWritable(Double.POSITIVE_INFINITY);
+  }
+
+  @Override
+  public DoubleWritable createZero() {
+    return new DoubleWritable(0);
+  }
+
+  @Override
+  public void plusInto(DoubleWritable value, DoubleWritable increment) {
+    value.set(value.get() + increment.get());
+  }
+
+  @Override
+  public void multiplyInto(DoubleWritable value, DoubleWritable multiplier) {
+    value.set(value.get() * multiplier.get());
+  }
+
+  @Override
+  public void negate(DoubleWritable value) {
+    value.set(-value.get());
+  }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/fda1bb38/giraph-core/src/main/java/org/apache/giraph/types/ops/FloatTypeOps.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/types/ops/FloatTypeOps.java b/giraph-core/src/main/java/org/apache/giraph/types/ops/FloatTypeOps.java
index 3ca8409..3c69868 100644
--- a/giraph-core/src/main/java/org/apache/giraph/types/ops/FloatTypeOps.java
+++ b/giraph-core/src/main/java/org/apache/giraph/types/ops/FloatTypeOps.java
@@ -21,7 +21,8 @@ import org.apache.giraph.types.ops.collections.BasicArrayList.BasicFloatArrayLis
 import org.apache.hadoop.io.FloatWritable;
 
 /** TypeOps implementation for working with FloatWritable type */
-public enum FloatTypeOps implements PrimitiveTypeOps<FloatWritable> {
+public enum FloatTypeOps
+    implements PrimitiveTypeOps<FloatWritable>, NumericTypeOps<FloatWritable> {
   /** Singleton instance */
   INSTANCE();
 
@@ -49,4 +50,34 @@ public enum FloatTypeOps implements PrimitiveTypeOps<FloatWritable> {
   public BasicFloatArrayList createArrayList(int capacity) {
     return new BasicFloatArrayList(capacity);
   }
+
+  @Override
+  public FloatWritable createMinNegativeValue() {
+    return new FloatWritable(Float.NEGATIVE_INFINITY);
+  }
+
+  @Override
+  public FloatWritable createMaxPositiveValue() {
+    return new FloatWritable(Float.POSITIVE_INFINITY);
+  }
+
+  @Override
+  public FloatWritable createZero() {
+    return new FloatWritable(0);
+  }
+
+  @Override
+  public void plusInto(FloatWritable value, FloatWritable increment) {
+    value.set(value.get() + increment.get());
+  }
+
+  @Override
+  public void multiplyInto(FloatWritable value, FloatWritable multiplier) {
+    value.set(value.get() * multiplier.get());
+  }
+
+  @Override
+  public void negate(FloatWritable value) {
+    value.set(-value.get());
+  }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/fda1bb38/giraph-core/src/main/java/org/apache/giraph/types/ops/IntTypeOps.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/types/ops/IntTypeOps.java b/giraph-core/src/main/java/org/apache/giraph/types/ops/IntTypeOps.java
index f9a32c0..57e1b53 100644
--- a/giraph-core/src/main/java/org/apache/giraph/types/ops/IntTypeOps.java
+++ b/giraph-core/src/main/java/org/apache/giraph/types/ops/IntTypeOps.java
@@ -26,7 +26,8 @@ import org.apache.giraph.types.ops.collections.BasicSet.BasicIntOpenHashSet;
 import org.apache.hadoop.io.IntWritable;
 
 /** TypeOps implementation for working with IntWritable type */
-public enum IntTypeOps implements PrimitiveIdTypeOps<IntWritable> {
+public enum IntTypeOps
+    implements PrimitiveIdTypeOps<IntWritable>, NumericTypeOps<IntWritable> {
   /** Singleton instance */
   INSTANCE;
 
@@ -65,4 +66,34 @@ public enum IntTypeOps implements PrimitiveIdTypeOps<IntWritable> {
       int capacity) {
     return new BasicInt2ObjectOpenHashMap<>(capacity);
   }
+
+  @Override
+  public IntWritable createMinNegativeValue() {
+    return new IntWritable(Integer.MIN_VALUE);
+  }
+
+  @Override
+  public IntWritable createMaxPositiveValue() {
+    return new IntWritable(Integer.MAX_VALUE);
+  }
+
+  @Override
+  public IntWritable createZero() {
+    return new IntWritable(0);
+  }
+
+  @Override
+  public void plusInto(IntWritable value, IntWritable increment) {
+    value.set(value.get() + increment.get());
+  }
+
+  @Override
+  public void multiplyInto(IntWritable value, IntWritable multiplier) {
+    value.set(value.get() * multiplier.get());
+  }
+
+  @Override
+  public void negate(IntWritable value) {
+    value.set(-value.get());
+  }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/fda1bb38/giraph-core/src/main/java/org/apache/giraph/types/ops/LongTypeOps.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/types/ops/LongTypeOps.java b/giraph-core/src/main/java/org/apache/giraph/types/ops/LongTypeOps.java
index 4e5ca54..d7fa198 100644
--- a/giraph-core/src/main/java/org/apache/giraph/types/ops/LongTypeOps.java
+++ b/giraph-core/src/main/java/org/apache/giraph/types/ops/LongTypeOps.java
@@ -26,7 +26,8 @@ import org.apache.giraph.types.ops.collections.BasicSet.BasicLongOpenHashSet;
 import org.apache.hadoop.io.LongWritable;
 
 /** TypeOps implementation for working with LongWritable type */
-public enum LongTypeOps implements PrimitiveIdTypeOps<LongWritable> {
+public enum LongTypeOps
+    implements PrimitiveIdTypeOps<LongWritable>, NumericTypeOps<LongWritable> {
   /** Singleton instance */
   INSTANCE;
 
@@ -65,4 +66,34 @@ public enum LongTypeOps implements PrimitiveIdTypeOps<LongWritable> {
       int capacity) {
     return new BasicLong2ObjectOpenHashMap<>(capacity);
   }
+
+  @Override
+  public LongWritable createMinNegativeValue() {
+    return new LongWritable(Long.MIN_VALUE);
+  }
+
+  @Override
+  public LongWritable createMaxPositiveValue() {
+    return new LongWritable(Long.MAX_VALUE);
+  }
+
+  @Override
+  public LongWritable createZero() {
+    return new LongWritable(0);
+  }
+
+  @Override
+  public void plusInto(LongWritable value, LongWritable increment) {
+    value.set(value.get() + increment.get());
+  }
+
+  @Override
+  public void multiplyInto(LongWritable value, LongWritable multiplier) {
+    value.set(value.get() * multiplier.get());
+  }
+
+  @Override
+  public void negate(LongWritable value) {
+    value.set(-value.get());
+  }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/fda1bb38/giraph-core/src/main/java/org/apache/giraph/types/ops/NumericTypeOps.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/types/ops/NumericTypeOps.java b/giraph-core/src/main/java/org/apache/giraph/types/ops/NumericTypeOps.java
new file mode 100644
index 0000000..396c914
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/types/ops/NumericTypeOps.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.types.ops;
+
+/**
+ * Numeric type operations, allowing working generically with types,
+ * but still having efficient code.
+ *
+ * Using any of the provided operations should lead to no boxing/unboxing.
+ *
+ * @param <T> Type
+ */
+public interface NumericTypeOps<T> extends TypeOps<T> {
+  /**
+   * Minimal negative value representable via current type.
+   * Negative infinity for floating point numbers.
+   * @return New object with min negative value
+   */
+  T createMinNegativeValue();
+  /**
+   * Maximal positive value representable via current type.
+   * Positive infinity for floating point numbers.
+   * @return New object with max positive value
+   */
+  T createMaxPositiveValue();
+  /**
+   * Value of zero
+   * @return New object with value of zero
+   */
+  T createZero();
+
+  /**
+   * value+=increment
+   *
+   * @param value Value to modify
+   * @param increment Increment
+   */
+  void plusInto(T value, T increment);
+  /**
+   * value*=multiplier
+   *
+   * @param value Value to modify
+   * @param multiplier Multiplier
+   */
+  void multiplyInto(T value, T multiplier);
+
+  /**
+   * value=-value
+   * @param value Value to negate
+   */
+  void negate(T value);
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/fda1bb38/giraph-core/src/main/java/org/apache/giraph/types/ops/TypeOps.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/types/ops/TypeOps.java b/giraph-core/src/main/java/org/apache/giraph/types/ops/TypeOps.java
index b7f9479..c4bd702 100644
--- a/giraph-core/src/main/java/org/apache/giraph/types/ops/TypeOps.java
+++ b/giraph-core/src/main/java/org/apache/giraph/types/ops/TypeOps.java
@@ -23,6 +23,9 @@ package org.apache.giraph.types.ops;
  * but still having efficient code.
  * For example, by reducing object allocation via reuse.
  *
+ * Use enum singleton pattern, having single enum value - INSTANCE.
+ * Serialization code depends on implementations being enums.
+ *
  * @param <T> Type
  */
 public interface TypeOps<T> {

http://git-wip-us.apache.org/repos/asf/giraph/blob/fda1bb38/giraph-core/src/main/java/org/apache/giraph/types/ops/TypeOpsUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/types/ops/TypeOpsUtils.java b/giraph-core/src/main/java/org/apache/giraph/types/ops/TypeOpsUtils.java
index df5f2bd..785fda1 100644
--- a/giraph-core/src/main/java/org/apache/giraph/types/ops/TypeOpsUtils.java
+++ b/giraph-core/src/main/java/org/apache/giraph/types/ops/TypeOpsUtils.java
@@ -17,6 +17,11 @@
  */
 package org.apache.giraph.types.ops;
 
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.giraph.utils.WritableUtils;
 import org.apache.hadoop.io.BooleanWritable;
 import org.apache.hadoop.io.ByteWritable;
 import org.apache.hadoop.io.DoubleWritable;
@@ -146,4 +151,26 @@ public class TypeOpsUtils {
           type + " not supported in TypeOps");
     }
   }
+
+  /**
+   * Write TypeOps object into a stream
+   * @param typeOps type ops instance
+   * @param output output stream
+   * @param <T> Corresponding type
+   */
+  public static <T> void writeTypeOps(TypeOps<T> typeOps,
+      DataOutput output) throws IOException {
+    WritableUtils.writeEnum((Enum) typeOps, output);
+  }
+
+  /**
+   * Read TypeOps object from the stream
+   * @param input input stream
+   * @param <O> Concrete TypeOps type
+   * @return type ops instance
+   */
+  public static <O extends TypeOps<?>> O readTypeOps(
+      DataInput input) throws IOException {
+    return  (O) WritableUtils.readEnum(input);
+  }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/fda1bb38/giraph-core/src/main/java/org/apache/giraph/types/ops/collections/BasicArrayList.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/types/ops/collections/BasicArrayList.java b/giraph-core/src/main/java/org/apache/giraph/types/ops/collections/BasicArrayList.java
index df5ca24..a96fb69 100644
--- a/giraph-core/src/main/java/org/apache/giraph/types/ops/collections/BasicArrayList.java
+++ b/giraph-core/src/main/java/org/apache/giraph/types/ops/collections/BasicArrayList.java
@@ -58,6 +58,17 @@ public abstract class BasicArrayList<T> implements Writable {
    */
   public abstract int size();
   /**
+   * Sets the size of this list.
+   *
+   * <P>
+   * If the specified size is smaller than the current size,
+   * the last elements are discarded.
+   * Otherwise, they are filled with 0/<code>null</code>/<code>false</code>.
+   *
+   * @param newSize the new size.
+   */
+  public abstract void size(int newSize);
+  /**
    * Capacity of currently allocated memory
    * @return capacity
    */
@@ -168,6 +179,11 @@ public abstract class BasicArrayList<T> implements Writable {
     }
 
     @Override
+    public void size(int newSize) {
+      list.size(newSize);
+    }
+
+    @Override
     public int capacity() {
       return list.elements().length;
     }
@@ -250,6 +266,11 @@ public abstract class BasicArrayList<T> implements Writable {
     }
 
     @Override
+    public void size(int newSize) {
+      list.size(newSize);
+    }
+
+    @Override
     public int capacity() {
       return list.elements().length;
     }
@@ -332,6 +353,11 @@ public abstract class BasicArrayList<T> implements Writable {
     }
 
     @Override
+    public void size(int newSize) {
+      list.size(newSize);
+    }
+
+    @Override
     public int capacity() {
       return list.elements().length;
     }
@@ -414,6 +440,11 @@ public abstract class BasicArrayList<T> implements Writable {
     }
 
     @Override
+    public void size(int newSize) {
+      list.size(newSize);
+    }
+
+    @Override
     public int capacity() {
       return list.elements().length;
     }
@@ -496,6 +527,11 @@ public abstract class BasicArrayList<T> implements Writable {
     }
 
     @Override
+    public void size(int newSize) {
+      list.size(newSize);
+    }
+
+    @Override
     public int capacity() {
       return list.elements().length;
     }
@@ -578,6 +614,11 @@ public abstract class BasicArrayList<T> implements Writable {
     }
 
     @Override
+    public void size(int newSize) {
+      list.size(newSize);
+    }
+
+    @Override
     public int capacity() {
       return list.elements().length;
     }

http://git-wip-us.apache.org/repos/asf/giraph/blob/fda1bb38/giraph-core/src/main/java/org/apache/giraph/utils/Varint.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/Varint.java b/giraph-core/src/main/java/org/apache/giraph/utils/Varint.java
new file mode 100644
index 0000000..89d4e90
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/Varint.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.utils;
+
+/**
+ * This Code is Copied from main/java/org/apache/mahout/math/Varint.java
+ *
+ * Only modification is throwing exceptions for passing negative values to
+ * unsigned functions, instead of serializing them.
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * <p>
+ * Encodes signed and unsigned values using a common variable-length scheme,
+ * found for example in <a
+ * href="http://code.google.com/apis/protocolbuffers/docs/encoding.html">
+ * Google's Protocol Buffers</a>. It uses fewer bytes to encode smaller values,
+ * but will use slightly more bytes to encode large values.
+ * </p>
+ * <p/>
+ * <p>
+ * Signed values are further encoded using so-called zig-zag encoding in order
+ * to make them "compatible" with variable-length encoding.
+ * </p>
+ */
+public final class Varint {
+
+  /**
+   * private constructor
+   */
+  private Varint() {
+  }
+
+  /**
+   * Encodes a value using the variable-length encoding from <a
+   * href="http://code.google.com/apis/protocolbuffers/docs/encoding.html">
+   * Google Protocol Buffers</a>. Zig-zag is not used, so input must not be
+   * negative. This class provides no signed (zig-zag) writer, and
+   * this method throws {@link IllegalArgumentException} for negative
+   * input instead of encoding it as a large unsigned value.
+   *
+   * @param value
+   *          value to encode
+   * @param out
+   *          to write bytes to
+   * @throws IOException
+   *           if {@link DataOutput} throws {@link IOException}
+   */
+  public static void writeUnsignedVarLong(
+      long value, DataOutput out) throws IOException {
+    if (value < 0) {
+      throw new IllegalArgumentException(
+          "Negative value passed into writeUnsignedVarLong - " + value);
+    }
+    while ((value & 0xFFFFFFFFFFFFFF80L) != 0L) {
+      out.writeByte(((int) value & 0x7F) | 0x80);
+      value >>>= 7;
+    }
+    out.writeByte((int) value & 0x7F);
+  }
+
+  /**
+   * @see #writeUnsignedVarLong(long, DataOutput)
+   * @param value
+   *          value to encode
+   * @param out
+   *          to write bytes to
+   */
+  public static void writeUnsignedVarInt(
+      int value, DataOutput out) throws IOException {
+    if (value < 0) {
+      throw new IllegalArgumentException(
+          "Negative value passed into writeUnsignedVarInt - " + value);
+    }
+    while ((value & 0xFFFFFF80) != 0L) {
+      out.writeByte((value & 0x7F) | 0x80);
+      value >>>= 7;
+    }
+    out.writeByte(value & 0x7F);
+  }
+
+  /**
+   * @param in
+   *          to read bytes from
+   * @return decode value
+   * @throws IOException
+   *           if {@link DataInput} throws {@link IOException}
+   * @throws IllegalArgumentException
+   *           if variable-length value does not terminate after 10 bytes have
+   *           been read
+   * @see #writeUnsignedVarLong(long, DataOutput)
+   */
+  public static long readUnsignedVarLong(DataInput in) throws IOException {
+    long value = 0L;
+    int i = 0;
+    long b = in.readByte();
+    while ((b & 0x80L) != 0) {
+      value |= (b & 0x7F) << i;
+      i += 7;
+      if (i > 63) {
+        throw new IllegalArgumentException(
+            "Variable length quantity is too long");
+      }
+      b = in.readByte();
+    }
+    return value | (b << i);
+  }
+
+  /**
+   * @throws IllegalArgumentException
+   *           if variable-length value does not terminate after
+   *           6 bytes have been read
+   * @throws IOException
+   *           if {@link DataInput} throws {@link IOException}
+   * @param in to read bytes from.
+   * @return decode value.
+   */
+  public static int readUnsignedVarInt(DataInput in) throws IOException {
+    int value = 0;
+    int i = 0;
+    int b = in.readByte();
+    while ((b & 0x80) != 0) {
+      value |= (b & 0x7F) << i;
+      i += 7;
+      if (i > 35) {
+        throw new IllegalArgumentException(
+            "Variable length quantity is too long");
+      }
+      b = in.readByte();
+    }
+    return value | (b << i);
+  }
+  /**
+   * Simulation for what will happen when writing an unsigned long value
+   * as varlong.
+   * @param value the value
+   * @return the number of bytes needed to write value.
+   * @throws IOException
+   */
+  public static long sizeOfUnsignedVarLong(long value) throws IOException {
+    long cnt = 0;
+    while ((value & 0xFFFFFFFFFFFFFF80L) != 0L) {
+      cnt++;
+      value >>>= 7;
+    }
+    return ++cnt;
+  }
+
+  /**
+   * Simulation for what will happen when writing an unsigned int value
+   * as varint.
+   * @param value the value
+   * @return the number of bytes needed to write value.
+   * @throws IOException
+   */
+  public static long sizeOfUnsignedVarInt(int value) throws IOException {
+    long cnt = 0;
+    while ((value & 0xFFFFFF80) != 0L) {
+      cnt++;
+      value >>>= 7;
+    }
+    return ++cnt;
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/fda1bb38/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java b/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java
index 3c37bec..592ef7e 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java
@@ -27,6 +27,7 @@ import java.io.DataInputStream;
 import java.io.DataOutput;
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -857,4 +858,42 @@ public class WritableUtils {
     return res;
   }
 
+  /**
+   * Writes enum into a stream, by serializing class name and its index
+   * @param enumValue Enum value
+   * @param output Output stream
+   * @param <T> Enum type
+   */
+  public static <T extends Enum<T>> void writeEnum(T enumValue,
+      DataOutput output) throws IOException {
+    writeClass(
+        enumValue != null ? enumValue.getDeclaringClass() : null, output);
+    if (enumValue != null) {
+      Varint.writeUnsignedVarInt(enumValue.ordinal(), output);
+    }
+  }
+
+  /**
+   * Reads enum from the stream, serialized by writeEnum
+   * @param input Input stream
+   * @param <T> Enum type
+   * @return Enum value
+   */
+  public static <T extends Enum<T>> T readEnum(DataInput input) throws
+      IOException {
+    Class<T> clazz = readClass(input);
+    if (clazz != null) {
+      int ordinal = Varint.readUnsignedVarInt(input);
+      try {
+        T[] values = (T[]) clazz.getDeclaredMethod("values").invoke(null);
+        return values[ordinal];
+      } catch (IllegalAccessException | IllegalArgumentException
+          | InvocationTargetException | NoSuchMethodException
+          | SecurityException e) {
+        throw new IOException("Cannot read enum", e);
+      }
+    } else {
+      return null;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/fda1bb38/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorDelegator.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorDelegator.java b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorDelegator.java
index 916e7a0..6472850 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorDelegator.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorDelegator.java
@@ -54,6 +54,11 @@ public abstract class WorkerAggregatorDelegator<I extends WritableComparable,
   }
 
   @Override
+  public void reducePartial(String name, Writable value) {
+    workerGlobalCommUsage.reducePartial(name, value);
+  }
+
+  @Override
   public final <B extends Writable> B getBroadcast(String name) {
     return workerGlobalCommUsage.getBroadcast(name);
   }

http://git-wip-us.apache.org/repos/asf/giraph/blob/fda1bb38/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorHandler.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorHandler.java b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorHandler.java
index ee47542..96d239d 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorHandler.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorHandler.java
@@ -113,7 +113,8 @@ public class WorkerAggregatorHandler implements WorkerThreadGlobalCommUsage {
    * @param name Name of the reducer
    * @param valueToReduce Partial value to reduce
    */
-  protected void reducePartial(String name, Writable valueToReduce) {
+  @Override
+  public void reducePartial(String name, Writable valueToReduce) {
     Reducer<Object, Writable> reducer = reducerMap.get(name);
     if (reducer != null) {
       progressable.progress();
@@ -329,6 +330,19 @@ public class WorkerAggregatorHandler implements WorkerThreadGlobalCommUsage {
     }
 
     @Override
+    public void reducePartial(String name, Writable value) {
+      Reducer<Object, Writable> reducer = threadReducerMap.get(name);
+      if (reducer != null) {
+        progressable.progress();
+        reducer.reducePartial(value);
+      } else {
+        throw new IllegalStateException("reducePartial: " +
+            AggregatorUtils.getUnregisteredAggregatorMessage(name,
+                threadReducerMap.size() != 0, conf));
+      }
+    }
+
+    @Override
     public <B extends Writable> B getBroadcast(String name) {
       return WorkerAggregatorHandler.this.getBroadcast(name);
     }

http://git-wip-us.apache.org/repos/asf/giraph/blob/fda1bb38/giraph-core/src/main/java/org/apache/giraph/worker/WorkerReduceUsage.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerReduceUsage.java b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerReduceUsage.java
index 9c2e90d..fe7cd32 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerReduceUsage.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerReduceUsage.java
@@ -17,6 +17,8 @@
  */
 package org.apache.giraph.worker;
 
+import org.apache.hadoop.io.Writable;
+
 /**
  * Methods on worker can provide values to reduce through this interface
  */
@@ -27,4 +29,11 @@ public interface WorkerReduceUsage {
    * @param value Single value to reduce
    */
   void reduce(String name, Object value);
+
+  /**
+   * Reduce given partial value.
+   * @param name Name of the reducer
+   * @param value Partial value to reduce
+   */
+  void reducePartial(String name, Writable value);
 }