You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ap...@apache.org on 2013/03/20 21:08:28 UTC

git commit: GIRAPH-547: Allow in-place modification of edges (apresta)

Updated Branches:
  refs/heads/trunk 40bc599b8 -> 5bb956cad


GIRAPH-547: Allow in-place modification of edges (apresta)


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/5bb956ca
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/5bb956ca
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/5bb956ca

Branch: refs/heads/trunk
Commit: 5bb956cad2d9c4beded036553f715f0e3156d665
Parents: 40bc599
Author: Alessandro Presta <al...@fb.com>
Authored: Wed Mar 13 19:18:39 2013 -0700
Committer: Alessandro Presta <al...@fb.com>
Committed: Wed Mar 20 13:08:17 2013 -0700

----------------------------------------------------------------------
 CHANGELOG                                          |    2 +
 .../conf/ImmutableClassesGiraphConfiguration.java  |   29 ++-
 .../org/apache/giraph/edge/ArrayListEdges.java     |   11 +-
 .../org/apache/giraph/edge/ByteArrayEdges.java     |    4 +-
 .../java/org/apache/giraph/edge/DefaultEdge.java   |    2 +-
 .../java/org/apache/giraph/edge/EdgeFactory.java   |   14 +-
 .../java/org/apache/giraph/edge/EdgeNoValue.java   |    2 +-
 .../java/org/apache/giraph/edge/EdgeStore.java     |    3 +-
 .../java/org/apache/giraph/edge/HashMapEdges.java  |   35 ++-
 .../org/apache/giraph/edge/HashMultimapEdges.java  |    4 +-
 .../apache/giraph/edge/LongDoubleArrayEdges.java   |   76 ++++++-
 .../apache/giraph/edge/LongDoubleHashMapEdges.java |   80 ++++++-
 .../org/apache/giraph/edge/LongNullArrayEdges.java |   44 +++--
 .../apache/giraph/edge/LongNullHashSetEdges.java   |   30 ++-
 .../org/apache/giraph/edge/MapMutableEdge.java     |   61 +++++
 .../java/org/apache/giraph/edge/MutableEdge.java   |   12 +-
 .../apache/giraph/edge/MutableEdgesIterable.java   |  102 ++++++++
 .../apache/giraph/edge/MutableEdgesWrapper.java    |  183 +++++++++++++++
 .../org/apache/giraph/edge/MutableVertexEdges.java |   42 ++++
 .../java/org/apache/giraph/edge/ReusableEdge.java  |   43 ++++
 .../giraph/edge/StrictRandomAccessVertexEdges.java |    9 +
 .../org/apache/giraph/graph/ComputeCallable.java   |    2 +
 .../main/java/org/apache/giraph/graph/Vertex.java  |   66 ++++++
 .../giraph/utils/ByteArrayVertexIdEdges.java       |    2 +-
 .../org/apache/giraph/utils/PairListWritable.java  |   76 ------
 .../org/apache/giraph/edge/TestNullValueEdges.java |   46 ++++-
 .../giraph/edge/TestStrictRandomAccessEdges.java   |    3 +
 .../apache/giraph/graph/TestVertexAndEdges.java    |  169 +++++++++++++-
 .../giraph/hive/input/edge/HiveEdgeReader.java     |   10 +-
 29 files changed, 1002 insertions(+), 160 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/5bb956ca/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index 462f104..d1983d0 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,8 @@
 Giraph Change Log
 
 Release 0.2.0 - unreleased
+  GIRAPH-547: Allow in-place modification of edges (apresta)
+
   GIRAPH-537: Fix log messages produced by aggregators (majakabiljo)
 
   GIRAPH-480: Add convergence detection to org.apache.giraph.examples.RandomWalkVertex (ssc)

http://git-wip-us.apache.org/repos/asf/giraph/blob/5bb956ca/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
index 7075999..4fedc46 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
@@ -20,6 +20,8 @@ package org.apache.giraph.conf;
 
 import org.apache.giraph.aggregators.AggregatorWriter;
 import org.apache.giraph.combiner.Combiner;
+import org.apache.giraph.edge.ReusableEdge;
+import org.apache.giraph.edge.ReuseObjectsVertexEdges;
 import org.apache.giraph.graph.GraphState;
 import org.apache.giraph.graph.VertexResolver;
 import org.apache.giraph.io.EdgeInputFormat;
@@ -40,7 +42,6 @@ import org.apache.giraph.utils.UnsafeByteArrayInputStream;
 import org.apache.giraph.utils.UnsafeByteArrayOutputStream;
 import org.apache.giraph.edge.Edge;
 import org.apache.giraph.edge.EdgeFactory;
-import org.apache.giraph.edge.MutableEdge;
 import org.apache.giraph.graph.Vertex;
 import org.apache.giraph.edge.VertexEdges;
 import org.apache.giraph.worker.WorkerContext;
@@ -508,15 +509,15 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
   }
 
   /**
-   * Create a mutable user edge.
+   * Create a reusable edge.
    *
-   * @return Instantiated mutable user edge.
+   * @return Instantiated reusable edge.
    */
-  public MutableEdge<I, E> createMutableEdge() {
+  public ReusableEdge<I, E> createReusableEdge() {
     if (isEdgeValueNullWritable()) {
-      return (MutableEdge<I, E>) EdgeFactory.createMutable(createVertexId());
+      return (ReusableEdge<I, E>) EdgeFactory.createReusable(createVertexId());
     } else {
-      return EdgeFactory.createMutable(createVertexId(), createEdgeValue());
+      return EdgeFactory.createReusable(createVertexId(), createEdgeValue());
     }
   }
 
@@ -562,6 +563,22 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
   }
 
   /**
+   * True if the {@link VertexEdges} implementation copies the passed edges
+   * to its own data structure, i.e. it doesn't keep references to Edge
+   * objects, target vertex ids or edge values passed to add() or
+   * initialize().
+   * This makes it possible to reuse edge objects passed to the data
+   * structure, to minimize object instantiation (see for example
+   * EdgeStore#addPartitionEdges()).
+   *
+   * @return True iff we can reuse the edge objects
+   */
+  public boolean reuseEdgeObjects() {
+    return ReuseObjectsVertexEdges.class.isAssignableFrom(
+        getVertexEdgesClass());
+  }
+
+  /**
    * Create a user {@link VertexEdges}
    *
    * @return Instantiated user VertexEdges

http://git-wip-us.apache.org/repos/asf/giraph/blob/5bb956ca/giraph-core/src/main/java/org/apache/giraph/edge/ArrayListEdges.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/ArrayListEdges.java b/giraph-core/src/main/java/org/apache/giraph/edge/ArrayListEdges.java
index 98b1aef..dda7568 100644
--- a/giraph-core/src/main/java/org/apache/giraph/edge/ArrayListEdges.java
+++ b/giraph-core/src/main/java/org/apache/giraph/edge/ArrayListEdges.java
@@ -37,7 +37,8 @@ import java.util.Iterator;
  * @param <E> Edge value
  */
 public class ArrayListEdges<I extends WritableComparable, E extends Writable>
-    extends ConfigurableVertexEdges<I, E> {
+    extends ConfigurableVertexEdges<I, E>
+    implements MutableVertexEdges<I, E> {
   /** List of edges. */
   private ArrayList<Edge<I, E>> edgeList;
 
@@ -89,6 +90,14 @@ public class ArrayListEdges<I extends WritableComparable, E extends Writable>
   }
 
   @Override
+  @SuppressWarnings("unchecked")
+  public Iterator<MutableEdge<I, E>> mutableIterator() {
+    // The downcast is fine because all concrete Edge implementations are
+    // mutable, but we only expose the mutation functionality when appropriate.
+    return (Iterator) iterator();
+  }
+
+  @Override
   public void write(DataOutput out) throws IOException {
     out.writeInt(edgeList.size());
     for (Edge<I, E> edge : edgeList) {

http://git-wip-us.apache.org/repos/asf/giraph/blob/5bb956ca/giraph-core/src/main/java/org/apache/giraph/edge/ByteArrayEdges.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/ByteArrayEdges.java b/giraph-core/src/main/java/org/apache/giraph/edge/ByteArrayEdges.java
index 6201d25..16748ef 100644
--- a/giraph-core/src/main/java/org/apache/giraph/edge/ByteArrayEdges.java
+++ b/giraph-core/src/main/java/org/apache/giraph/edge/ByteArrayEdges.java
@@ -142,8 +142,8 @@ public class ByteArrayEdges<I extends WritableComparable, E extends Writable>
         getConf().createExtendedDataInput(
             serializedEdges, 0, serializedEdgesBytesUsed);
     /** Representative edge object. */
-    private MutableEdge<I, E> representativeEdge =
-        getConf().createMutableEdge();
+    private ReusableEdge<I, E> representativeEdge =
+        getConf().createReusableEdge();
 
     @Override
     public boolean hasNext() {

http://git-wip-us.apache.org/repos/asf/giraph/blob/5bb956ca/giraph-core/src/main/java/org/apache/giraph/edge/DefaultEdge.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/DefaultEdge.java b/giraph-core/src/main/java/org/apache/giraph/edge/DefaultEdge.java
index 461bff3..d7cfb2f 100644
--- a/giraph-core/src/main/java/org/apache/giraph/edge/DefaultEdge.java
+++ b/giraph-core/src/main/java/org/apache/giraph/edge/DefaultEdge.java
@@ -30,7 +30,7 @@ import org.apache.hadoop.io.WritableComparable;
  */
 @SuppressWarnings("rawtypes")
 public class DefaultEdge<I extends WritableComparable, E extends Writable>
-    implements MutableEdge<I, E> {
+    implements ReusableEdge<I, E> {
   /** Target vertex id */
   private I targetVertexId = null;
   /** Edge value */

http://git-wip-us.apache.org/repos/asf/giraph/blob/5bb956ca/giraph-core/src/main/java/org/apache/giraph/edge/EdgeFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/EdgeFactory.java b/giraph-core/src/main/java/org/apache/giraph/edge/EdgeFactory.java
index 3599207..c0d1a0c 100644
--- a/giraph-core/src/main/java/org/apache/giraph/edge/EdgeFactory.java
+++ b/giraph-core/src/main/java/org/apache/giraph/edge/EdgeFactory.java
@@ -41,7 +41,7 @@ public class EdgeFactory {
   public static <I extends WritableComparable,
                  E extends Writable>
   Edge<I, E> create(I id, E value) {
-    return createMutable(id, value);
+    return createReusable(id, value);
   }
 
   /**
@@ -53,11 +53,11 @@ public class EdgeFactory {
    */
   public static <I extends WritableComparable>
   Edge<I, NullWritable> create(I id) {
-    return createMutable(id);
+    return createReusable(id);
   }
 
   /**
-   * Create a mutable edge pointing to a given ID with a value
+   * Create a reusable edge pointing to a given ID with a value
    *
    * @param id target ID
    * @param value edge value
@@ -67,23 +67,23 @@ public class EdgeFactory {
    */
   public static <I extends WritableComparable,
                  E extends Writable>
-  MutableEdge<I, E> createMutable(I id, E value) {
+  ReusableEdge<I, E> createReusable(I id, E value) {
     if (value instanceof NullWritable) {
-      return (MutableEdge<I, E>) createMutable(id);
+      return (ReusableEdge<I, E>) createReusable(id);
     } else {
       return new DefaultEdge<I, E>(id, value);
     }
   }
 
   /**
-   * Create a mutable edge pointing to a given ID with a value
+   * Create a reusable edge pointing to a given ID with no value
    *
    * @param id target ID
    * @param <I> Vertex ID type
    * @return Edge pointing to ID with value
    */
   public static <I extends WritableComparable>
-  MutableEdge<I, NullWritable> createMutable(I id) {
+  ReusableEdge<I, NullWritable> createReusable(I id) {
     return new EdgeNoValue<I>(id);
   }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/5bb956ca/giraph-core/src/main/java/org/apache/giraph/edge/EdgeNoValue.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/EdgeNoValue.java b/giraph-core/src/main/java/org/apache/giraph/edge/EdgeNoValue.java
index dd22aec..306a413 100644
--- a/giraph-core/src/main/java/org/apache/giraph/edge/EdgeNoValue.java
+++ b/giraph-core/src/main/java/org/apache/giraph/edge/EdgeNoValue.java
@@ -27,7 +27,7 @@ import org.apache.hadoop.io.WritableComparable;
  * @param <I> Vertex ID
  */
 public class EdgeNoValue<I extends WritableComparable>
-    implements MutableEdge<I, NullWritable> {
+    implements ReusableEdge<I, NullWritable> {
   /** Target vertex id */
   private I targetVertexId = null;
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/5bb956ca/giraph-core/src/main/java/org/apache/giraph/edge/EdgeStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/EdgeStore.java b/giraph-core/src/main/java/org/apache/giraph/edge/EdgeStore.java
index 1f6e9bb..234c267 100644
--- a/giraph-core/src/main/java/org/apache/giraph/edge/EdgeStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/edge/EdgeStore.java
@@ -75,8 +75,7 @@ public class EdgeStore<I extends WritableComparable,
     this.progressable = progressable;
     transientEdges = new MapMaker().concurrencyLevel(
         configuration.getNettyServerExecutionConcurrency()).makeMap();
-    reuseEdgeObjects = ReuseObjectsVertexEdges.class.isAssignableFrom(
-        configuration.getVertexEdgesClass());
+    reuseEdgeObjects = configuration.reuseEdgeObjects();
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/giraph/blob/5bb956ca/giraph-core/src/main/java/org/apache/giraph/edge/HashMapEdges.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/HashMapEdges.java b/giraph-core/src/main/java/org/apache/giraph/edge/HashMapEdges.java
index 2600992..9fa7b64 100644
--- a/giraph-core/src/main/java/org/apache/giraph/edge/HashMapEdges.java
+++ b/giraph-core/src/main/java/org/apache/giraph/edge/HashMapEdges.java
@@ -19,7 +19,6 @@
 package org.apache.giraph.edge;
 
 import com.google.common.collect.Maps;
-import com.google.common.collect.UnmodifiableIterator;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
@@ -42,7 +41,8 @@ import java.util.Map;
  */
 public class HashMapEdges<I extends WritableComparable, E extends Writable>
     extends ConfigurableVertexEdges<I, E>
-    implements StrictRandomAccessVertexEdges<I, E> {
+    implements StrictRandomAccessVertexEdges<I, E>,
+    MutableVertexEdges<I, E> {
   /** Map from target vertex id to edge value. */
   private HashMap<I, E> edgeMap;
 
@@ -86,6 +86,13 @@ public class HashMapEdges<I extends WritableComparable, E extends Writable>
   }
 
   @Override
+  public void setEdgeValue(I targetVertexId, E edgeValue) {
+    if (edgeMap.containsKey(targetVertexId)) {
+      edgeMap.put(targetVertexId, edgeValue);
+    }
+  }
+
+  @Override
   public int size() {
     return edgeMap.size();
   }
@@ -93,13 +100,20 @@ public class HashMapEdges<I extends WritableComparable, E extends Writable>
   @Override
   public Iterator<Edge<I, E>> iterator() {
     // Returns an iterator that reuses objects.
-    return new UnmodifiableIterator<Edge<I, E>>() {
+    // The downcast is fine because all concrete Edge implementations are
+    // mutable, but we only expose the mutation functionality when appropriate.
+    return (Iterator) mutableIterator();
+  }
+
+  @Override
+  public Iterator<MutableEdge<I, E>> mutableIterator() {
+    return new Iterator<MutableEdge<I, E>>() {
       /** Wrapped map iterator. */
       private Iterator<Map.Entry<I, E>> mapIterator =
           edgeMap.entrySet().iterator();
       /** Representative edge object. */
-      private MutableEdge<I, E> representativeEdge =
-          getConf().createMutableEdge();
+      private MapMutableEdge<I, E> representativeEdge =
+          new MapMutableEdge<I, E>();
 
       @Override
       public boolean hasNext() {
@@ -107,12 +121,15 @@ public class HashMapEdges<I extends WritableComparable, E extends Writable>
       }
 
       @Override
-      public Edge<I, E> next() {
-        Map.Entry<I, E> nextEntry = mapIterator.next();
-        representativeEdge.setTargetVertexId(nextEntry.getKey());
-        representativeEdge.setValue(nextEntry.getValue());
+      public MutableEdge<I, E> next() {
+        representativeEdge.setEntry(mapIterator.next());
         return representativeEdge;
       }
+
+      @Override
+      public void remove() {
+        mapIterator.remove();
+      }
     };
   }
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/5bb956ca/giraph-core/src/main/java/org/apache/giraph/edge/HashMultimapEdges.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/HashMultimapEdges.java b/giraph-core/src/main/java/org/apache/giraph/edge/HashMultimapEdges.java
index 143d7a4..123d49f 100644
--- a/giraph-core/src/main/java/org/apache/giraph/edge/HashMultimapEdges.java
+++ b/giraph-core/src/main/java/org/apache/giraph/edge/HashMultimapEdges.java
@@ -111,8 +111,8 @@ public class HashMultimapEdges<I extends WritableComparable, E extends Writable>
       private Iterator<Map.Entry<I, E>> mapIterator =
           edgeMultimap.entries().iterator();
       /** Representative edge object. */
-      private MutableEdge<I, E> representativeEdge =
-          getConf().createMutableEdge();
+      private ReusableEdge<I, E> representativeEdge =
+          getConf().createReusableEdge();
 
       @Override
       public boolean hasNext() {

http://git-wip-us.apache.org/repos/asf/giraph/blob/5bb956ca/giraph-core/src/main/java/org/apache/giraph/edge/LongDoubleArrayEdges.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/LongDoubleArrayEdges.java b/giraph-core/src/main/java/org/apache/giraph/edge/LongDoubleArrayEdges.java
index f164484..0487d92 100644
--- a/giraph-core/src/main/java/org/apache/giraph/edge/LongDoubleArrayEdges.java
+++ b/giraph-core/src/main/java/org/apache/giraph/edge/LongDoubleArrayEdges.java
@@ -41,7 +41,8 @@ import java.util.Iterator;
  */
 public class LongDoubleArrayEdges
     extends ConfigurableVertexEdges<LongWritable, DoubleWritable>
-    implements ReuseObjectsVertexEdges<LongWritable, DoubleWritable> {
+    implements ReuseObjectsVertexEdges<LongWritable, DoubleWritable>,
+    MutableVertexEdges<LongWritable, DoubleWritable> {
   /** Array of target vertex ids. */
   private LongArrayList neighbors;
   /** Array of edge values. */
@@ -97,7 +98,7 @@ public class LongDoubleArrayEdges
    *
    * @param i Position of edge to be removed
    */
-  private void remove(int i) {
+  private void removeAt(int i) {
     // The order of the edges is irrelevant, so we can simply replace
     // the deleted edge with the rightmost element, thus achieving constant
     // time.
@@ -108,18 +109,19 @@ public class LongDoubleArrayEdges
       neighbors.set(i, neighbors.popLong());
       edgeValues.set(i, edgeValues.popDouble());
     }
+    // If needed after the removal, trim the arrays.
+    trim();
   }
 
   @Override
   public void remove(LongWritable targetVertexId) {
-    // Thanks to the constant-time implementation of remove(int),
+    // Thanks to the constant-time implementation of removeAt(int),
     // we can remove all matching edges in linear time.
     for (int i = neighbors.size() - 1; i >= 0; --i) {
       if (neighbors.get(i) == targetVertexId.get()) {
-        remove(i);
+        removeAt(i);
       }
     }
-    trim();
   }
 
   @Override
@@ -153,6 +155,70 @@ public class LongDoubleArrayEdges
     };
   }
 
+  /** Helper class for a mutable edge that modifies the backing arrays. */
+  private class LongDoubleArrayMutableEdge
+      extends DefaultEdge<LongWritable, DoubleWritable> {
+    /** Index of the edge in the backing arrays. */
+    private int index;
+
+    /** Constructor. */
+    public LongDoubleArrayMutableEdge() {
+      super(new LongWritable(), new DoubleWritable());
+    }
+
+    /**
+     * Make the edge point to the given index in the backing arrays.
+     *
+     * @param index Index in the arrays
+     */
+    public void setIndex(int index) {
+      // Update the id and value objects from the superclass.
+      getTargetVertexId().set(neighbors.get(index));
+      getValue().set(edgeValues.get(index));
+      // Update the index.
+      this.index = index;
+    }
+
+    @Override
+    public void setValue(DoubleWritable value) {
+      // Update the value object from the superclass.
+      getValue().set(value.get());
+      // Update the value stored in the backing array.
+      edgeValues.set(index, value.get());
+    }
+  }
+
+  @Override
+  public Iterator<MutableEdge<LongWritable, DoubleWritable>>
+  mutableIterator() {
+    return new Iterator<MutableEdge<LongWritable, DoubleWritable>>() {
+      /** Current position in the array. */
+      private int offset = 0;
+      /** Representative edge object. */
+      private LongDoubleArrayMutableEdge representativeEdge =
+          new LongDoubleArrayMutableEdge();
+
+      @Override
+      public boolean hasNext() {
+        return offset < neighbors.size();
+      }
+
+      @Override
+      public MutableEdge<LongWritable, DoubleWritable> next() {
+        representativeEdge.setIndex(offset++);
+        return representativeEdge;
+      }
+
+      @Override
+      public void remove() {
+        // Since removeAt() might replace the deleted edge with the last edge
+        // in the array, we need to decrease the offset so that the latter
+        // won't be skipped.
+        removeAt(--offset);
+      }
+    };
+  }
+
   @Override
   public void write(DataOutput out) throws IOException {
     out.writeInt(neighbors.size());

http://git-wip-us.apache.org/repos/asf/giraph/blob/5bb956ca/giraph-core/src/main/java/org/apache/giraph/edge/LongDoubleHashMapEdges.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/LongDoubleHashMapEdges.java b/giraph-core/src/main/java/org/apache/giraph/edge/LongDoubleHashMapEdges.java
index 68bd85f..867a356 100644
--- a/giraph-core/src/main/java/org/apache/giraph/edge/LongDoubleHashMapEdges.java
+++ b/giraph-core/src/main/java/org/apache/giraph/edge/LongDoubleHashMapEdges.java
@@ -42,7 +42,8 @@ import java.util.Iterator;
 public class LongDoubleHashMapEdges
     extends ConfigurableVertexEdges<LongWritable, DoubleWritable>
     implements StrictRandomAccessVertexEdges<LongWritable, DoubleWritable>,
-    ReuseObjectsVertexEdges<LongWritable, DoubleWritable> {
+    ReuseObjectsVertexEdges<LongWritable, DoubleWritable>,
+    MutableVertexEdges<LongWritable, DoubleWritable> {
   /** Hash map from target vertex id to edge value. */
   private Long2DoubleOpenHashMap edgeMap;
   /** Representative edge value object, used by getEdgeValue(). */
@@ -96,6 +97,14 @@ public class LongDoubleHashMapEdges
   }
 
   @Override
+  public void setEdgeValue(LongWritable targetVertexId,
+                           DoubleWritable edgeValue) {
+    if (edgeMap.containsKey(targetVertexId.get())) {
+      edgeMap.put(targetVertexId.get(), edgeValue.get());
+    }
+  }
+
+  @Override
   public int size() {
     return edgeMap.size();
   }
@@ -108,8 +117,8 @@ public class LongDoubleHashMapEdges
       private ObjectIterator<Long2DoubleMap.Entry> mapIterator =
           edgeMap.long2DoubleEntrySet().fastIterator();
       /** Representative edge object. */
-      private MutableEdge<LongWritable, DoubleWritable> representativeEdge =
-          getConf().createMutableEdge();
+      private ReusableEdge<LongWritable, DoubleWritable> representativeEdge =
+          getConf().createReusableEdge();
 
       @Override
       public boolean hasNext() {
@@ -126,6 +135,71 @@ public class LongDoubleHashMapEdges
     };
   }
 
+  /** Helper class for a mutable edge that modifies the backing map entry. */
+  private static class LongDoubleHashMapMutableEdge
+      extends DefaultEdge<LongWritable, DoubleWritable> {
+    /** Backing entry for the edge in the map. */
+    private Long2DoubleMap.Entry entry;
+
+    /** Constructor. */
+    public LongDoubleHashMapMutableEdge() {
+      super(new LongWritable(), new DoubleWritable());
+    }
+
+    /**
+     * Make the edge point to the given entry in the backing map.
+     *
+     * @param entry Backing entry
+     */
+    public void setEntry(Long2DoubleMap.Entry entry) {
+      // Update the id and value objects from the superclass.
+      getTargetVertexId().set(entry.getLongKey());
+      getValue().set(entry.getValue());
+      // Update the entry.
+      this.entry = entry;
+    }
+
+    @Override
+    public void setValue(DoubleWritable value) {
+      // Update the value object from the superclass.
+      getValue().set(value.get());
+      // Update the value stored in the backing map.
+      entry.setValue(value.get());
+    }
+  }
+
+  @Override
+  public Iterator<MutableEdge<LongWritable, DoubleWritable>> mutableIterator() {
+    return new Iterator<MutableEdge<LongWritable, DoubleWritable>>() {
+      /**
+       * Wrapped map iterator.
+       * Note: we cannot use the fast iterator in this case,
+       * because we need to call setValue() on an entry.
+       */
+      private ObjectIterator<Long2DoubleMap.Entry> mapIterator =
+          edgeMap.long2DoubleEntrySet().iterator();
+      /** Representative edge object. */
+      private LongDoubleHashMapMutableEdge representativeEdge =
+          new LongDoubleHashMapMutableEdge();
+
+      @Override
+      public boolean hasNext() {
+        return mapIterator.hasNext();
+      }
+
+      @Override
+      public MutableEdge<LongWritable, DoubleWritable> next() {
+        representativeEdge.setEntry(mapIterator.next());
+        return representativeEdge;
+      }
+
+      @Override
+      public void remove() {
+        mapIterator.remove();
+      }
+    };
+  }
+
   @Override
   public void write(DataOutput out) throws IOException {
     out.writeInt(edgeMap.size());

http://git-wip-us.apache.org/repos/asf/giraph/blob/5bb956ca/giraph-core/src/main/java/org/apache/giraph/edge/LongNullArrayEdges.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/LongNullArrayEdges.java b/giraph-core/src/main/java/org/apache/giraph/edge/LongNullArrayEdges.java
index 528acb2..de4e310 100644
--- a/giraph-core/src/main/java/org/apache/giraph/edge/LongNullArrayEdges.java
+++ b/giraph-core/src/main/java/org/apache/giraph/edge/LongNullArrayEdges.java
@@ -18,7 +18,6 @@
 
 package org.apache.giraph.edge;
 
-import com.google.common.collect.UnmodifiableIterator;
 import it.unimi.dsi.fastutil.longs.LongArrayList;
 import it.unimi.dsi.fastutil.longs.LongIterator;
 import org.apache.hadoop.io.LongWritable;
@@ -39,7 +38,8 @@ import java.util.Iterator;
  */
 public class LongNullArrayEdges
     extends ConfigurableVertexEdges<LongWritable, NullWritable>
-    implements ReuseObjectsVertexEdges<LongWritable, NullWritable> {
+    implements ReuseObjectsVertexEdges<LongWritable, NullWritable>,
+    MutableVertexEdges<LongWritable, NullWritable> {
   /** Array of target vertex ids. */
   private LongArrayList neighbors;
 
@@ -89,7 +89,7 @@ public class LongNullArrayEdges
    *
    * @param i Position of edge to be removed
    */
-  private void remove(int i) {
+  private void removeAt(int i) {
     // The order of the edges is irrelevant, so we can simply replace
     // the deleted edge with the rightmost element, thus achieving constant
     // time.
@@ -98,18 +98,19 @@ public class LongNullArrayEdges
     } else {
       neighbors.set(i, neighbors.popLong());
     }
+    // If needed after the removal, trim the array.
+    trim();
   }
 
   @Override
   public void remove(LongWritable targetVertexId) {
-    // Thanks to the constant-time implementation of remove(int),
+    // Thanks to the constant-time implementation of removeAt(int),
     // we can remove all matching edges in linear time.
     for (int i = neighbors.size() - 1; i >= 0; --i) {
       if (neighbors.get(i) == targetVertexId.get()) {
-        remove(i);
+        removeAt(i);
       }
     }
-    trim();
   }
 
   @Override
@@ -120,23 +121,38 @@ public class LongNullArrayEdges
   @Override
   public Iterator<Edge<LongWritable, NullWritable>> iterator() {
     // Returns an iterator that reuses objects.
-    return new UnmodifiableIterator<Edge<LongWritable, NullWritable>>() {
-      /** Wrapped neighbors iterator. */
-      private LongIterator neighborsIt = neighbors.iterator();
+    // The downcast is fine because all concrete Edge implementations are
+    // mutable, but we only expose the mutation functionality when appropriate.
+    return (Iterator) mutableIterator();
+  }
+
+  @Override
+  public Iterator<MutableEdge<LongWritable, NullWritable>> mutableIterator() {
+    return new Iterator<MutableEdge<LongWritable, NullWritable>>() {
+      /** Current position in the array. */
+      private int offset = 0;
       /** Representative edge object. */
-      private Edge<LongWritable, NullWritable> representativeEdge =
-          getConf().createEdge();
+      private MutableEdge<LongWritable, NullWritable> representativeEdge =
+          getConf().createReusableEdge();
 
       @Override
       public boolean hasNext() {
-        return neighborsIt.hasNext();
+        return offset < neighbors.size();
       }
 
       @Override
-      public Edge<LongWritable, NullWritable> next() {
-        representativeEdge.getTargetVertexId().set(neighborsIt.nextLong());
+      public MutableEdge<LongWritable, NullWritable> next() {
+        representativeEdge.getTargetVertexId().set(neighbors.get(offset++));
         return representativeEdge;
       }
+
+      @Override
+      public void remove() {
+        // Since removeAt() might replace the deleted edge with the last edge
+        // in the array, we need to decrease the offset so that the latter
+        // won't be skipped.
+        removeAt(--offset);
+      }
     };
   }
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/5bb956ca/giraph-core/src/main/java/org/apache/giraph/edge/LongNullHashSetEdges.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/LongNullHashSetEdges.java b/giraph-core/src/main/java/org/apache/giraph/edge/LongNullHashSetEdges.java
index 26c57ae..094d471 100644
--- a/giraph-core/src/main/java/org/apache/giraph/edge/LongNullHashSetEdges.java
+++ b/giraph-core/src/main/java/org/apache/giraph/edge/LongNullHashSetEdges.java
@@ -18,7 +18,6 @@
 
 package org.apache.giraph.edge;
 
-import com.google.common.collect.UnmodifiableIterator;
 import it.unimi.dsi.fastutil.longs.LongIterator;
 import it.unimi.dsi.fastutil.longs.LongOpenHashSet;
 import org.apache.hadoop.io.LongWritable;
@@ -40,8 +39,8 @@ import java.util.Iterator;
  */
 public class LongNullHashSetEdges
     extends ConfigurableVertexEdges<LongWritable, NullWritable>
-    implements StrictRandomAccessVertexEdges<LongWritable, NullWritable>,
-    ReuseObjectsVertexEdges<LongWritable, NullWritable> {
+    implements ReuseObjectsVertexEdges<LongWritable, NullWritable>,
+    MutableVertexEdges<LongWritable, NullWritable> {
   /** Hash set of target vertex ids. */
   private LongOpenHashSet neighbors;
 
@@ -81,11 +80,6 @@ public class LongNullHashSetEdges
   }
 
   @Override
-  public NullWritable getEdgeValue(LongWritable targetVertexId) {
-    return NullWritable.get();
-  }
-
-  @Override
   public int size() {
     return neighbors.size();
   }
@@ -93,12 +87,19 @@ public class LongNullHashSetEdges
   @Override
   public Iterator<Edge<LongWritable, NullWritable>> iterator() {
     // Returns an iterator that reuses objects.
-    return new UnmodifiableIterator<Edge<LongWritable, NullWritable>>() {
+    // The cast is safe because every MutableEdge is an Edge and the iterator
+    // only hands out elements; mutation is only exposed where appropriate.
+    return (Iterator) mutableIterator();
+  }
+
+  @Override
+  public Iterator<MutableEdge<LongWritable, NullWritable>> mutableIterator() {
+    return new Iterator<MutableEdge<LongWritable, NullWritable>>() {
       /** Wrapped neighbors iterator. */
       private LongIterator neighborsIt = neighbors.iterator();
       /** Representative edge object. */
-      private MutableEdge<LongWritable, NullWritable> representativeEdge =
-          getConf().createMutableEdge();
+      private ReusableEdge<LongWritable, NullWritable> representativeEdge =
+          getConf().createReusableEdge();
 
       @Override
       public boolean hasNext() {
@@ -106,10 +107,15 @@ public class LongNullHashSetEdges
       }
 
       @Override
-      public Edge<LongWritable, NullWritable> next() {
+      public MutableEdge<LongWritable, NullWritable> next() {
         representativeEdge.getTargetVertexId().set(neighborsIt.nextLong());
         return representativeEdge;
       }
+
+      @Override
+      public void remove() {
+        neighborsIt.remove();
+      }
     };
   }
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/5bb956ca/giraph-core/src/main/java/org/apache/giraph/edge/MapMutableEdge.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/MapMutableEdge.java b/giraph-core/src/main/java/org/apache/giraph/edge/MapMutableEdge.java
new file mode 100644
index 0000000..cb8702c
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/edge/MapMutableEdge.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.edge;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import java.util.Map;
+
+/**
+ * Helper class for a mutable edge that modifies the backing map entry.
+ *
+ * @param <I> Vertex id
+ * @param <E> Edge value
+ */
+public class MapMutableEdge<I extends WritableComparable, E extends Writable>
+    implements MutableEdge<I, E> {
+  /** Backing entry for the edge in the map. */
+  private Map.Entry<I, E> entry;
+
+  /**
+   * Set the backing entry for this edge in the map.
+   *
+   * @param entry Backing entry
+   */
+  public void setEntry(Map.Entry<I, E> entry) {
+    this.entry = entry;
+  }
+
+  @Override
+  public void setValue(E value) {
+    // Replace the value in the map.
+    entry.setValue(value);
+  }
+
+  @Override
+  public I getTargetVertexId() {
+    return entry.getKey();
+  }
+
+  @Override
+  public E getValue() {
+    return entry.getValue();
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/5bb956ca/giraph-core/src/main/java/org/apache/giraph/edge/MutableEdge.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/MutableEdge.java b/giraph-core/src/main/java/org/apache/giraph/edge/MutableEdge.java
index bf00b4f..78e34e6 100644
--- a/giraph-core/src/main/java/org/apache/giraph/edge/MutableEdge.java
+++ b/giraph-core/src/main/java/org/apache/giraph/edge/MutableEdge.java
@@ -22,9 +22,7 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
 /**
- * A complete edge, the target vertex and the edge value.  Can only be one
- * edge with a destination vertex id per edge map. This edge can be mutated,
- * that is you can set it's target vertex ID and edge value.
+ * An edge whose value can be modified.
  *
  * @param <I> Vertex index
  * @param <E> Edge value
@@ -32,16 +30,10 @@ import org.apache.hadoop.io.WritableComparable;
 public interface MutableEdge<I extends WritableComparable, E extends Writable>
     extends Edge<I, E> {
   /**
-   * Set the destination vertex index of this edge.
-   *
-   * @param targetVertexId new destination vertex
-   */
-  void setTargetVertexId(I targetVertexId);
-
-  /**
    * Set the value for this edge.
    *
    * @param value new edge value
    */
   void setValue(E value);
 }
+

http://git-wip-us.apache.org/repos/asf/giraph/blob/5bb956ca/giraph-core/src/main/java/org/apache/giraph/edge/MutableEdgesIterable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/MutableEdgesIterable.java b/giraph-core/src/main/java/org/apache/giraph/edge/MutableEdgesIterable.java
new file mode 100644
index 0000000..9dab09c
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/edge/MutableEdgesIterable.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.edge;
+
+import org.apache.giraph.graph.Vertex;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import java.util.Iterator;
+
+/**
+ * Helper class to provide a mutable iterable over the edges when the chosen
+ * {@link VertexEdges} doesn't offer a specialized one.
+ *
+ * @param <I> Vertex id
+ * @param <E> Edge value
+ */
+public class MutableEdgesIterable<I extends WritableComparable,
+    E extends Writable> implements Iterable<MutableEdge<I, E>> {
+  /** Vertex that owns the out-edges. */
+  private Vertex<I, ?, E, ?> vertex;
+
+  /**
+   * Constructor.
+   *
+   * @param vertex Owning vertex
+   */
+  public MutableEdgesIterable(Vertex<I, ?, E, ?> vertex) {
+    this.vertex = vertex;
+  }
+
+  @Override
+  public Iterator<MutableEdge<I, E>> iterator() {
+    final MutableEdgesWrapper<I, E> mutableEdgesWrapper =
+        MutableEdgesWrapper.wrap((VertexEdges<I, E>) vertex.getEdges(),
+            vertex.getConf());
+    vertex.setEdges(mutableEdgesWrapper);
+
+    return new Iterator<MutableEdge<I, E>>() {
+      /** Iterator over the old edges. */
+      private Iterator<Edge<I, E>> oldEdgesIterator =
+          mutableEdgesWrapper.getOldEdgesIterator();
+      /** New edges data structure. */
+      private VertexEdges<I, E> newEdges = mutableEdgesWrapper.getNewEdges();
+
+      @Override
+      public boolean hasNext() {
+        // If the current edge is not null,
+        // we need to add it to the new edges.
+        Edge<I, E> currentEdge = mutableEdgesWrapper.getCurrentEdge();
+        if (currentEdge != null) {
+          newEdges.add(currentEdge);
+          mutableEdgesWrapper.setCurrentEdge(null);
+        }
+        if (!oldEdgesIterator.hasNext()) {
+          vertex.setEdges(newEdges);
+          return false;
+        } else {
+          return true;
+        }
+      }
+
+      @Override
+      public MutableEdge<I, E> next() {
+        // If the current edge is not null,
+        // we need to add it to the new edges.
+        MutableEdge<I, E> currentEdge =
+            mutableEdgesWrapper.getCurrentEdge();
+        if (currentEdge != null) {
+          newEdges.add(currentEdge);
+        }
+        // Read the next edge and return it.
+        currentEdge = (MutableEdge<I, E>) oldEdgesIterator.next();
+        mutableEdgesWrapper.setCurrentEdge(currentEdge);
+        return currentEdge;
+      }
+
+      @Override
+      public void remove() {
+        // Set the current edge to null, so that it's not added to the
+        // new edges.
+        mutableEdgesWrapper.setCurrentEdge(null);
+      }
+    };
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/5bb956ca/giraph-core/src/main/java/org/apache/giraph/edge/MutableEdgesWrapper.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/MutableEdgesWrapper.java b/giraph-core/src/main/java/org/apache/giraph/edge/MutableEdgesWrapper.java
new file mode 100644
index 0000000..5b20bf7
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/edge/MutableEdgesWrapper.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.edge;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Iterator;
+
+/**
+ * Helper class that wraps the current out-edges and inserts them into a new
+ * data structure as they are iterated over.
+ * Used by Vertex to provide a mutable iterator when the chosen
+ * {@link VertexEdges} doesn't offer a specialized one.
+ * The edges are "unwrapped" back to the chosen {@link VertexEdges} data
+ * structure as soon as possible: either when the iterator is exhausted,
+ * or after compute() if iteration has been terminated early.
+ *
+ * @param <I> Vertex id
+ * @param <E> Edge value
+ */
+public class MutableEdgesWrapper<I extends WritableComparable,
+    E extends Writable> implements VertexEdges<I, E> {
+  /** New edges data structure (initially empty). */
+  private final VertexEdges<I, E> newEdges;
+  /** Iterator over the old edges. */
+  private final Iterator<Edge<I, E>> oldEdgesIterator;
+  /** Last edge that was returned during iteration. */
+  private MutableEdge<I, E> currentEdge;
+
+  /**
+   * Private constructor: instantiation happens through the {@code wrap()}
+   * factory method.
+   *
+   * @param oldEdges Current out-edges
+   * @param newEdges New (empty) edges data structure
+   */
+  private MutableEdgesWrapper(VertexEdges<I, E> oldEdges,
+                              VertexEdges<I, E> newEdges) {
+    oldEdgesIterator = oldEdges.iterator();
+    this.newEdges = newEdges;
+  }
+
+  /**
+   * Factory method to create a new wrapper over the existing out-edges.
+   *
+   * @param edges Current out-edges
+   * @param conf Configuration
+   * @param <I> Vertex id
+   * @param <E> Edge value
+   * @return The wrapped edges
+   */
+  public static <I extends WritableComparable, E extends Writable>
+  MutableEdgesWrapper<I, E> wrap(
+      VertexEdges<I, E> edges,
+      ImmutableClassesGiraphConfiguration<I, ?, E, ?> conf) {
+    MutableEdgesWrapper<I, E> wrapper = new MutableEdgesWrapper<I, E>(
+        edges, conf.createAndInitializeVertexEdges(edges.size()));
+    return wrapper;
+  }
+
+  /**
+   * Moves all the remaining edges to the new data structure, and returns it.
+   *
+   * @return The new {@link VertexEdges} data structure.
+   */
+  public VertexEdges<I, E> unwrap() {
+    if (currentEdge != null) {
+      newEdges.add(currentEdge);
+      currentEdge = null;
+    }
+    while (oldEdgesIterator.hasNext()) {
+      newEdges.add(oldEdgesIterator.next());
+    }
+    return newEdges;
+  }
+
+  /**
+   * Get the new {@link VertexEdges} data structure.
+   *
+   * @return New edges
+   */
+  public VertexEdges<I, E> getNewEdges() {
+    return newEdges;
+  }
+
+  /**
+   * Get the iterator over the old edges data structure.
+   *
+   * @return Old edges iterator
+   */
+  public Iterator<Edge<I, E>> getOldEdgesIterator() {
+    return oldEdgesIterator;
+  }
+
+  /**
+   * Get the last edge returned by the mutable iterator.
+   *
+   * @return Last edge iterated on
+   */
+  public MutableEdge<I, E> getCurrentEdge() {
+    return currentEdge;
+  }
+
+  /**
+   * Set the last edge returned by the mutable iterator.
+   *
+   * @param edge Last edge iterated on
+   */
+  public void setCurrentEdge(MutableEdge<I, E> edge) {
+    currentEdge = edge;
+  }
+
+  @Override
+  public void initialize(Iterable<Edge<I, E>> edges) {
+    throw new IllegalStateException("initialize: MutableEdgesWrapper should " +
+        "never be initialized.");
+  }
+
+  @Override
+  public void initialize(int capacity) {
+    throw new IllegalStateException("initialize: MutableEdgesWrapper should " +
+        "never be initialized.");
+  }
+
+  @Override
+  public void initialize() {
+    throw new IllegalStateException("initialize: MutableEdgesWrapper should " +
+        "never be initialized.");
+  }
+
+  @Override
+  public void add(Edge<I, E> edge) {
+    unwrap().add(edge);
+  }
+
+  @Override
+  public void remove(I targetVertexId) {
+    unwrap().remove(targetVertexId);
+  }
+
+  @Override
+  public int size() {
+    return unwrap().size();
+  }
+
+  @Override
+  public Iterator<Edge<I, E>> iterator() {
+    return unwrap().iterator();
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    throw new IllegalStateException("write: MutableEdgesWrapper should " +
+        "never be serialized.");
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    throw new IllegalStateException("readFields: MutableEdgesWrapper should " +
+        "never be deserialized.");
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/5bb956ca/giraph-core/src/main/java/org/apache/giraph/edge/MutableVertexEdges.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/MutableVertexEdges.java b/giraph-core/src/main/java/org/apache/giraph/edge/MutableVertexEdges.java
new file mode 100644
index 0000000..503de92
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/edge/MutableVertexEdges.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.edge;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import java.util.Iterator;
+
+/**
+ * Interface for {@link VertexEdges} implementations that have an optimized
+ * mutable edge iterator.
+ *
+ * @param <I> Vertex id
+ * @param <E> Edge value
+ */
+public interface MutableVertexEdges<I extends WritableComparable,
+    E extends Writable> extends VertexEdges<I, E> {
+  /**
+   * Returns an iterator over edges that can be modified in-place,
+   * either by changing the current edge value or by removing the current edge.
+   *
+   * @return A mutable edge iterator
+   */
+  Iterator<MutableEdge<I, E>> mutableIterator();
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/5bb956ca/giraph-core/src/main/java/org/apache/giraph/edge/ReusableEdge.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/ReusableEdge.java b/giraph-core/src/main/java/org/apache/giraph/edge/ReusableEdge.java
new file mode 100644
index 0000000..d55e25b
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/edge/ReusableEdge.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.edge;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * A complete edge, the target vertex and the edge value.  There can only be
+ * one edge with a given destination vertex id per edge map. This edge can be
+ * reused, that is, you can set its target vertex ID and edge value.
+ * Note: this class is useful for certain optimizations,
+ * but it's not meant to be exposed to the user. Look at {@link MutableEdge}
+ * instead.
+ *
+ * @param <I> Vertex index
+ * @param <E> Edge value
+ */
+public interface ReusableEdge<I extends WritableComparable, E extends Writable>
+    extends MutableEdge<I, E> {
+  /**
+   * Set the destination vertex index of this edge.
+   *
+   * @param targetVertexId new destination vertex
+   */
+  void setTargetVertexId(I targetVertexId);
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/5bb956ca/giraph-core/src/main/java/org/apache/giraph/edge/StrictRandomAccessVertexEdges.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/StrictRandomAccessVertexEdges.java b/giraph-core/src/main/java/org/apache/giraph/edge/StrictRandomAccessVertexEdges.java
index 36381a7..5fdc2d2 100644
--- a/giraph-core/src/main/java/org/apache/giraph/edge/StrictRandomAccessVertexEdges.java
+++ b/giraph-core/src/main/java/org/apache/giraph/edge/StrictRandomAccessVertexEdges.java
@@ -39,4 +39,13 @@ public interface StrictRandomAccessVertexEdges<I extends WritableComparable,
    * @return Edge value
    */
   E getEdgeValue(I targetVertexId);
+
+  /**
+   * Set the edge value for the given target vertex id (if an edge to that
+   * vertex exists).
+   *
+   * @param targetVertexId Target vertex id
+   * @param edgeValue Edge value
+   */
+  void setEdgeValue(I targetVertexId, E edgeValue);
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/5bb956ca/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java b/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
index 439ee5b..4840471 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
@@ -223,6 +223,8 @@ public class ComputeCallable<I extends WritableComparable, V extends Writable,
           } finally {
             computeOneTimerContext.stop();
           }
+          // Need to unwrap the mutated edges (possibly)
+          vertex.unwrapMutableEdges();
           // Need to save the vertex changes (possibly)
           partition.saveVertex(vertex);
         }

http://git-wip-us.apache.org/repos/asf/giraph/blob/5bb956ca/giraph-core/src/main/java/org/apache/giraph/graph/Vertex.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/Vertex.java b/giraph-core/src/main/java/org/apache/giraph/graph/Vertex.java
index 66f081a..3031274 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/Vertex.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/Vertex.java
@@ -23,6 +23,10 @@ import org.apache.giraph.conf.ImmutableClassesGiraphConfigurable;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.edge.Edge;
 import org.apache.giraph.edge.MultiRandomAccessVertexEdges;
+import org.apache.giraph.edge.MutableEdge;
+import org.apache.giraph.edge.MutableEdgesIterable;
+import org.apache.giraph.edge.MutableEdgesWrapper;
+import org.apache.giraph.edge.MutableVertexEdges;
 import org.apache.giraph.edge.StrictRandomAccessVertexEdges;
 import org.apache.giraph.edge.VertexEdges;
 import org.apache.giraph.partition.PartitionContext;
@@ -186,6 +190,42 @@ public abstract class Vertex<I extends WritableComparable,
   }
 
   /**
+   * Get an iterable of out-edges that can be modified in-place.
+   * This can mean changing the current edge value or removing the current edge
+   * (by using the iterator version).
+   * Note: do not modify the edges through other methods while iterating.
+   *
+   * @return An iterable of mutable out-edges
+   */
+  public Iterable<MutableEdge<I, E>> getMutableEdges() {
+    // If the VertexEdges implementation has a specialized mutable iterator,
+    // we use that; otherwise, we build a new data structure as we iterate
+    // over the current edges.
+    if (edges instanceof MutableVertexEdges) {
+      return new Iterable<MutableEdge<I, E>>() {
+        @Override
+        public Iterator<MutableEdge<I, E>> iterator() {
+          return ((MutableVertexEdges<I, E>) edges).mutableIterator();
+        }
+      };
+    } else {
+      return new MutableEdgesIterable<I, E>(this);
+    }
+  }
+
+  /**
+   * If a {@link MutableEdgesWrapper} was used to provide a mutable iterator,
+   * copy any remaining edges to the new {@link VertexEdges} data
+   * structure and keep a direct reference to it (thus discarding the wrapper).
+   * Called by the Giraph infrastructure after computation.
+   */
+  public void unwrapMutableEdges() {
+    if (edges instanceof MutableEdgesWrapper) {
+      edges = ((MutableEdgesWrapper<I, E>) edges).unwrap();
+    }
+  }
+
+  /**
    * Get the number of outgoing edges on this vertex.
    *
    * @return the total number of outbound edges from this vertex
@@ -205,6 +245,8 @@ public abstract class Vertex<I extends WritableComparable,
    * @return Edge value (or null if missing)
    */
   public E getEdgeValue(I targetVertexId) {
+    // If the VertexEdges implementation has a specialized random-access
+    // method, we use that; otherwise, we scan the edges.
     if (edges instanceof StrictRandomAccessVertexEdges) {
       return ((StrictRandomAccessVertexEdges<I, E>) edges)
           .getEdgeValue(targetVertexId);
@@ -219,6 +261,28 @@ public abstract class Vertex<I extends WritableComparable,
   }
 
   /**
+   * If an edge to the target vertex exists, set it to the given edge value.
+   * This only makes sense with strict graphs.
+   *
+   * @param targetVertexId Target vertex id
+   * @param edgeValue Edge value
+   */
+  public void setEdgeValue(I targetVertexId, E edgeValue) {
+    // If the VertexEdges implementation has a specialized random-access
+    // method, we use that; otherwise, we scan the edges.
+    if (edges instanceof StrictRandomAccessVertexEdges) {
+      ((StrictRandomAccessVertexEdges<I, E>) edges).setEdgeValue(
+          targetVertexId, edgeValue);
+    } else {
+      for (MutableEdge<I, E> edge : getMutableEdges()) {
+        if (edge.getTargetVertexId().equals(targetVertexId)) {
+          edge.setValue(edgeValue);
+        }
+      }
+    }
+  }
+
+  /**
    * Get an iterable over the values of all edges with the given target
    * vertex id. This only makes sense for multigraphs (i.e. graphs with
    * parallel edges).
@@ -230,6 +294,8 @@ public abstract class Vertex<I extends WritableComparable,
    * @return Iterable of edge values
    */
   public Iterable<E> getAllEdgeValues(final I targetVertexId) {
+    // If the VertexEdges implementation has a specialized random-access
+    // method, we use that; otherwise, we scan the edges.
     if (edges instanceof MultiRandomAccessVertexEdges) {
       return ((MultiRandomAccessVertexEdges<I, E>) edges)
           .getAllEdgeValues(targetVertexId);

http://git-wip-us.apache.org/repos/asf/giraph/blob/5bb956ca/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdEdges.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdEdges.java b/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdEdges.java
index 8a5fb01..2c5f2f7 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdEdges.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdEdges.java
@@ -47,7 +47,7 @@ public class ByteArrayVertexIdEdges<I extends WritableComparable,
 
   @Override
   public Edge<I, E> createData() {
-    return getConf().createMutableEdge();
+    return getConf().createReusableEdge();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/giraph/blob/5bb956ca/giraph-core/src/main/java/org/apache/giraph/utils/PairListWritable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/PairListWritable.java b/giraph-core/src/main/java/org/apache/giraph/utils/PairListWritable.java
deleted file mode 100644
index 413656b..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/utils/PairListWritable.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.utils;
-
-import org.apache.hadoop.io.Writable;
-
-import com.google.common.collect.Lists;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-/**
- * Writable implementation of {@link PairList}.
- *
- * @param <U> Type of the first element in a pair
- * @param <V> Type of the second element in a pair
- */
-public abstract class PairListWritable<U extends Writable,
-    V extends Writable> extends PairList<U, V> implements Writable {
-  /**
-   * Create an empty instance of the first element in the pair,
-   * so we could read it from {@DataInput}.
-   *
-   * @return New instance of the first element in the pair
-   */
-  protected abstract U newFirstInstance();
-
-  /**
-   * Create an empty instance of the second element in the pair,
-   * so we could read it from {@DataInput}.
-   *
-   * @return New instance of the second element in the pair
-   */
-  protected abstract V newSecondInstance();
-
-  @Override
-  public void write(DataOutput output) throws IOException {
-    int size = getSize();
-    output.writeInt(size);
-    for (int i = 0; i < size; i++) {
-      firstList.get(i).write(output);
-      secondList.get(i).write(output);
-    }
-  }
-
-  @Override
-  public void readFields(DataInput input) throws IOException {
-    int size = input.readInt();
-    firstList = Lists.newArrayListWithCapacity(size);
-    secondList = Lists.newArrayListWithCapacity(size);
-    while (size-- > 0) {
-      U first = newFirstInstance();
-      first.readFields(input);
-      V second = newSecondInstance();
-      second.readFields(input);
-      add(first, second);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/5bb956ca/giraph-core/src/test/java/org/apache/giraph/edge/TestNullValueEdges.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/edge/TestNullValueEdges.java b/giraph-core/src/test/java/org/apache/giraph/edge/TestNullValueEdges.java
index ef3fbc8..79bf093 100644
--- a/giraph-core/src/test/java/org/apache/giraph/edge/TestNullValueEdges.java
+++ b/giraph-core/src/test/java/org/apache/giraph/edge/TestNullValueEdges.java
@@ -25,6 +25,7 @@ import org.junit.Before;
 import org.junit.Test;
 
 import java.util.Collection;
+import java.util.Iterator;
 import java.util.List;
 
 import static org.apache.giraph.graph.TestVertexAndEdges.instantiateVertexEdges;
@@ -35,7 +36,7 @@ import static org.junit.Assert.assertEquals;
  */
 public class TestNullValueEdges {
   /** {@link VertexEdges} classes to be tested. */
-  private Collection<Class<? extends VertexEdges>>
+  private Collection<Class<? extends MutableVertexEdges>>
       edgesClasses = Lists.newArrayList();
 
   @Before
@@ -65,10 +66,51 @@ public class TestNullValueEdges {
     edges.initialize(initialEdges);
     assertEquals(3, edges.size());
 
-    edges.add(EdgeFactory.createMutable(new LongWritable(4)));
+    edges.add(EdgeFactory.createReusable(new LongWritable(4)));
     assertEquals(4, edges.size());
 
     edges.remove(new LongWritable(2));
     assertEquals(3, edges.size());
   }
+
+  /**
+   * Test in-place edge mutations via the iterator returned by {@link
+   * MutableVertexEdges#mutableIterator()}.
+   */
+  @Test
+  public void testMutateEdges() {
+    for (Class<? extends MutableVertexEdges> edgesClass : edgesClasses) {
+      testMutateEdgesClass(edgesClass);
+    }
+  }
+
+  private void testMutateEdgesClass(
+      Class<? extends MutableVertexEdges> edgesClass) {
+    MutableVertexEdges<LongWritable, NullWritable> edges =
+       (MutableVertexEdges<LongWritable, NullWritable>)
+           instantiateVertexEdges(edgesClass);
+
+    edges.initialize();
+
+    // Add 10 edges with id i, for i = 0..9
+    for (int i = 0; i < 10; ++i) {
+      edges.add(EdgeFactory.create(new LongWritable(i)));
+    }
+
+    // Use the mutable iterator to remove edges with even id
+    Iterator<MutableEdge<LongWritable, NullWritable>> edgeIt =
+        edges.mutableIterator();
+    while (edgeIt.hasNext()) {
+      if (edgeIt.next().getTargetVertexId().get() % 2 == 0) {
+        edgeIt.remove();
+      }
+    }
+
+    // We should now have 5 edges
+    assertEquals(5, edges.size());
+    // The edge ids should be all odd
+    for (Edge<LongWritable, NullWritable> edge : edges) {
+      assertEquals(1, edge.getTargetVertexId().get() % 2);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/5bb956ca/giraph-core/src/test/java/org/apache/giraph/edge/TestStrictRandomAccessEdges.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/edge/TestStrictRandomAccessEdges.java b/giraph-core/src/test/java/org/apache/giraph/edge/TestStrictRandomAccessEdges.java
index e6daa94..d3ad069 100644
--- a/giraph-core/src/test/java/org/apache/giraph/edge/TestStrictRandomAccessEdges.java
+++ b/giraph-core/src/test/java/org/apache/giraph/edge/TestStrictRandomAccessEdges.java
@@ -74,5 +74,8 @@ public class TestStrictRandomAccessEdges {
 
       assertEquals(3.0, edges.getEdgeValue(new LongWritable(3)).get(), 0.0);
       assertNull(edges.getEdgeValue(new LongWritable(55)));
+
+      edges.setEdgeValue(new LongWritable(2), new DoubleWritable(33.0));
+      assertEquals(33.0, edges.getEdgeValue(new LongWritable(2)).get(), 0);
     }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/5bb956ca/giraph-core/src/test/java/org/apache/giraph/graph/TestVertexAndEdges.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/graph/TestVertexAndEdges.java b/giraph-core/src/test/java/org/apache/giraph/graph/TestVertexAndEdges.java
index 425abe7..fb5b685 100644
--- a/giraph-core/src/test/java/org/apache/giraph/graph/TestVertexAndEdges.java
+++ b/giraph-core/src/test/java/org/apache/giraph/graph/TestVertexAndEdges.java
@@ -28,6 +28,7 @@ import org.apache.giraph.edge.HashMapEdges;
 import org.apache.giraph.edge.HashMultimapEdges;
 import org.apache.giraph.edge.LongDoubleArrayEdges;
 import org.apache.giraph.edge.LongDoubleHashMapEdges;
+import org.apache.giraph.edge.MutableEdge;
 import org.apache.giraph.edge.VertexEdges;
 import org.apache.giraph.time.SystemTime;
 import org.apache.giraph.time.Time;
@@ -44,8 +45,11 @@ import org.apache.hadoop.io.LongWritable;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.io.DataInput;
+import java.io.DataOutput;
 import java.io.IOException;
 import java.util.Collection;
+import java.util.Iterator;
 import java.util.List;
 
 import static org.junit.Assert.assertEquals;
@@ -53,7 +57,7 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
 /**
- * Test {@link Vertex} functionality across the provided {@link org.apache.giraph.edge.VertexEdges}
+ * Test {@link Vertex} functionality across the provided {@link VertexEdges}
  * classes.
  */
 public class TestVertexAndEdges {
@@ -72,8 +76,82 @@ public class TestVertexAndEdges {
     public void compute(Iterable<LongWritable> messages) { }
   }
 
+  /**
+   * A basic {@link VertexEdges} implementation that doesn't provide any
+   * special functionality. Used to test the default implementations of
+   * Vertex#getEdgeValue(), Vertex#getMutableEdges(), etc.
+   */
+  public static class TestVertexEdges
+      implements VertexEdges<LongWritable, DoubleWritable> {
+    private List<Edge<LongWritable, DoubleWritable>> edgeList;
+
+
+    @Override
+    public void initialize(Iterable<Edge<LongWritable, DoubleWritable>> edges) {
+      this.edgeList = Lists.newArrayList(edges);
+    }
+
+    @Override
+    public void initialize(int capacity) {
+      this.edgeList = Lists.newArrayListWithCapacity(capacity);
+    }
+
+    @Override
+    public void initialize() {
+      this.edgeList = Lists.newArrayList();
+    }
+
+    @Override
+    public void add(Edge<LongWritable, DoubleWritable> edge) {
+      edgeList.add(edge);
+    }
+
+    @Override
+    public void remove(LongWritable targetVertexId) {
+      for (Iterator<Edge<LongWritable, DoubleWritable>> edges =
+               edgeList.iterator(); edges.hasNext();) {
+        Edge<LongWritable, DoubleWritable> edge = edges.next();
+        if (edge.getTargetVertexId().equals(targetVertexId)) {
+          edges.remove();
+        }
+      }
+    }
+
+    @Override
+    public int size() {
+      return edgeList.size();
+    }
+
+    @Override
+    public Iterator<Edge<LongWritable, DoubleWritable>> iterator() {
+      return edgeList.iterator();
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+      out.writeInt(edgeList.size());
+      for (Edge<LongWritable, DoubleWritable> edge : edgeList) {
+        edge.getTargetVertexId().write(out);
+        edge.getValue().write(out);
+      }
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+      int numEdges = in.readInt();
+      initialize(numEdges);
+      for (int i = 0; i < numEdges; ++i) {
+        Edge<LongWritable, DoubleWritable> edge = EdgeFactory.createReusable(
+            new LongWritable(), new DoubleWritable());
+        WritableUtils.readEdge(in, edge);
+        edgeList.add(edge);
+      }
+    }
+  }
+
   @Before
   public void setUp() {
+    edgesClasses.add(TestVertexEdges.class);
     edgesClasses.add(ByteArrayEdges.class);
     edgesClasses.add(ArrayListEdges.class);
     edgesClasses.add(HashMapEdges.class);
@@ -157,6 +235,95 @@ public class TestVertexAndEdges {
     for (Edge<LongWritable, DoubleWritable> edge : vertex.getEdges()) {
       assert(edge.getTargetVertexId().get() != 500);
     }
+
+    vertex.setEdgeValue(new LongWritable(10), new DoubleWritable(33.0));
+    assertEquals(33.0, vertex.getEdgeValue(new LongWritable(10)).get(), 0);
+  }
+
+  /**
+   * Test in-place edge mutations via the iterable returned by {@link
+   * Vertex#getMutableEdges()}.
+   */
+  @Test
+  public void testMutateEdges() {
+    for (Class<? extends VertexEdges> edgesClass : edgesClasses) {
+      testMutateEdgesClass(edgesClass);
+    }
+  }
+
+  private void testMutateEdgesClass(Class<? extends VertexEdges> edgesClass) {
+    Vertex<LongWritable, FloatWritable, DoubleWritable, LongWritable> vertex =
+        instantiateVertex(edgesClass);
+    VertexEdges<LongWritable, DoubleWritable> vertexEdges =
+        instantiateVertexEdges(edgesClass);
+
+    vertexEdges.initialize();
+    vertex.initialize(new LongWritable(0), new FloatWritable(0), vertexEdges);
+
+    // Add 10 edges with id i, value i for i = 0..9
+    for (int i = 0; i < 10; ++i) {
+      vertex.addEdge(EdgeFactory.create(
+          new LongWritable(i), new DoubleWritable(i)));
+    }
+
+    // Use the mutable iterable to multiply each edge value by 2
+    for (MutableEdge<LongWritable, DoubleWritable> edge :
+        vertex.getMutableEdges()) {
+      edge.setValue(new DoubleWritable(edge.getValue().get() * 2));
+    }
+
+    // We should still have 10 edges
+    assertEquals(10, vertex.getNumEdges());
+    // The edge values should now be double the ids
+    for (Edge<LongWritable, DoubleWritable> edge : vertex.getEdges()) {
+      long id = edge.getTargetVertexId().get();
+      double value = edge.getValue().get();
+      assertEquals(id * 2, value, 0);
+    }
+
+    // Use the mutable iterator to remove edges with even id
+    Iterator<MutableEdge<LongWritable, DoubleWritable>> edgeIt =
+        vertex.getMutableEdges().iterator();
+    while (edgeIt.hasNext()) {
+      if (edgeIt.next().getTargetVertexId().get() % 2 == 0) {
+        edgeIt.remove();
+      }
+    }
+
+    // We should now have 5 edges
+    assertEquals(5, vertex.getNumEdges());
+    // The edge ids should be all odd
+    for (Edge<LongWritable, DoubleWritable> edge : vertex.getEdges()) {
+      assertEquals(1, edge.getTargetVertexId().get() % 2);
+    }
+
+    // Breaking iteration early should not make us lose edges.
+    // This version uses repeated calls to next():
+    Iterator<MutableEdge<LongWritable, DoubleWritable>> it =
+        vertex.getMutableEdges().iterator();
+    it.next();
+    it.next();
+    assertEquals(5, vertex.getNumEdges());
+
+    // This version uses a for-each loop, and the break statement:
+    int i = 2;
+    for (MutableEdge<LongWritable, DoubleWritable> edge :
+        vertex.getMutableEdges()) {
+      System.out.println(edge.toString());
+      if (i-- == 0) {
+        break;
+      }
+    }
+    assertEquals(5, vertex.getNumEdges());
+
+    // This version uses a normal, immutable iterable:
+    i = 2;
+    for (Edge<LongWritable, DoubleWritable> edge : vertex.getEdges()) {
+      if (i-- == 0) {
+        break;
+      }
+    }
+    assertEquals(5, vertex.getNumEdges());
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/giraph/blob/5bb956ca/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveEdgeReader.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveEdgeReader.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveEdgeReader.java
index e0c502c..6fb183a 100644
--- a/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveEdgeReader.java
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveEdgeReader.java
@@ -20,7 +20,7 @@ package org.apache.giraph.hive.input.edge;
 
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.edge.Edge;
-import org.apache.giraph.edge.MutableEdge;
+import org.apache.giraph.edge.ReusableEdge;
 import org.apache.giraph.io.EdgeReader;
 import org.apache.giraph.utils.ReflectionUtils;
 import org.apache.hadoop.io.Writable;
@@ -62,7 +62,7 @@ public class HiveEdgeReader<I extends WritableComparable, E extends Writable>
    * If we are reusing edges this will be the single edge to read into.
    * Otherwise if it's null we will create a new edge each time.
    */
-  private MutableEdge<I, E> edgeToReuse = null;
+  private ReusableEdge<I, E> edgeToReuse = null;
 
   /**
    * Get underlying Hive record reader used.
@@ -117,7 +117,7 @@ public class HiveEdgeReader<I extends WritableComparable, E extends Writable>
     conf = new ImmutableClassesGiraphConfiguration(context.getConfiguration());
     instantiateHiveToEdgeFromConf();
     if (conf.getBoolean(REUSE_EDGE_KEY, false)) {
-      edgeToReuse = conf.createMutableEdge();
+      edgeToReuse = conf.createReusableEdge();
     }
   }
 
@@ -160,9 +160,9 @@ public class HiveEdgeReader<I extends WritableComparable, E extends Writable>
   public Edge<I, E> getCurrentEdge() throws IOException,
       InterruptedException {
     HiveRecord record = hiveRecordReader.getCurrentValue();
-    MutableEdge<I, E> edge = edgeToReuse;
+    ReusableEdge<I, E> edge = edgeToReuse;
     if (edge == null) {
-      edge = conf.createMutableEdge();
+      edge = conf.createReusableEdge();
     }
     edge.setValue(hiveToEdge.getEdgeValue(record));
     edge.setTargetVertexId(hiveToEdge.getTargetVertexId(record));