You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ap...@apache.org on 2013/03/07 06:37:41 UTC

[6/8] GIRAPH-528: Decouple vertex implementation from edge storage (apresta)

http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/edge/StrictRandomAccessVertexEdges.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/StrictRandomAccessVertexEdges.java b/giraph-core/src/main/java/org/apache/giraph/edge/StrictRandomAccessVertexEdges.java
new file mode 100644
index 0000000..36381a7
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/edge/StrictRandomAccessVertexEdges.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.edge;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Interface for {@link VertexEdges} implementations that provide efficient
+ * random access to the edges given the target vertex id.
+ * This version is for strict graphs (i.e. assumes no parallel edges).
+ *
+ * @param <I> Vertex id
+ * @param <E> Edge value
+ */
+public interface StrictRandomAccessVertexEdges<I extends WritableComparable,
+    E extends Writable> extends VertexEdges<I, E> {
+  /**
+   * Return the edge value for the given target vertex id (or null if there
+   * is no edge pointing to it).
+   *
+   * @param targetVertexId Target vertex id
+   * @return Edge value
+   */
+  E getEdgeValue(I targetVertexId);
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/edge/VertexEdges.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/VertexEdges.java b/giraph-core/src/main/java/org/apache/giraph/edge/VertexEdges.java
new file mode 100644
index 0000000..bb885b7
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/edge/VertexEdges.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.edge;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Interface for data structures that store out-edges for a vertex.
+ *
+ * @param <I> Vertex id
+ * @param <E> Edge value
+ */
+public interface VertexEdges<I extends WritableComparable, E extends Writable>
+    extends Iterable<Edge<I, E>>, Writable {
+  /**
+   * Initialize the data structure and set the edges from an iterable.
+   * This method (or one of the two alternatives) must be called
+   * after instantiation, unless readFields() is called.
+   * Note: whether parallel edges are allowed or not depends on the
+   * implementation.
+   *
+   * @param edges Iterable of edges
+   */
+  void initialize(Iterable<Edge<I, E>> edges);
+
+  /**
+   * Initialize the data structure with the specified initial capacity.
+   * This method (or one of the two alternatives) must be called
+   * after instantiation, unless readFields() is called.
+   *
+   * @param capacity Initial capacity
+   */
+  void initialize(int capacity);
+
+  /**
+   * Initialize the data structure with the default initial capacity.
+   * This method (or one of the two alternatives) must be called
+   * after instantiation, unless readFields() is called.
+   *
+   */
+  void initialize();
+
+  /**
+   * Add an edge.
+   * Note: whether parallel edges are allowed or not depends on the
+   * implementation.
+   *
+   * @param edge Edge to add
+   */
+  void add(Edge<I, E> edge);
+
+  /**
+   * Remove all edges to the given target vertex.
+   * Note: the implementation will vary depending on whether parallel edges
+   * are allowed or not.
+   *
+   * @param targetVertexId Target vertex id
+   */
+  void remove(I targetVertexId);
+
+  /**
+   * Return the number of edges.
+   *
+   * @return Number of edges
+   */
+  int size();
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/edge/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/package-info.java b/giraph-core/src/main/java/org/apache/giraph/edge/package-info.java
new file mode 100644
index 0000000..2281509
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/edge/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Package of Vertex implementations.
+ */
+package org.apache.giraph.edge;

http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java b/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
index c7aff7c..439ee5b 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
@@ -34,7 +34,6 @@ import org.apache.giraph.time.Time;
 import org.apache.giraph.time.Times;
 import org.apache.giraph.utils.MemoryUtils;
 import org.apache.giraph.utils.TimedLogger;
-import org.apache.giraph.vertex.Vertex;
 import org.apache.giraph.worker.WorkerContext;
 import org.apache.giraph.worker.WorkerThreadAggregatorUsage;
 import org.apache.hadoop.io.Writable;
@@ -212,8 +211,7 @@ public class ComputeCallable<I extends WritableComparable, V extends Writable,
         // Make sure every vertex has this thread's
         // graphState before computing
         vertex.setGraphState(graphState);
-        Iterable<M> messages =
-            messageStore.getVertexMessages(vertex.getId());
+        Iterable<M> messages = messageStore.getVertexMessages(vertex.getId());
         if (vertex.isHalted() && !Iterables.isEmpty(messages)) {
           vertex.wakeUp();
         }

http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/graph/DefaultEdge.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/DefaultEdge.java b/giraph-core/src/main/java/org/apache/giraph/graph/DefaultEdge.java
deleted file mode 100644
index 039f0d7..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/graph/DefaultEdge.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.graph;
-
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
-import com.google.common.base.Objects;
-
-/**
- * A complete edge, the target vertex and the edge value.  Can only be one
- * edge with a destination vertex id per edge map.
- *
- * @param <I> Vertex index
- * @param <E> Edge value
- */
-@SuppressWarnings("rawtypes")
-public class DefaultEdge<I extends WritableComparable, E extends Writable>
-    implements MutableEdge<I, E> {
-  /** Target vertex id */
-  private I targetVertexId = null;
-  /** Edge value */
-  private E value = null;
-
-  /**
-   * Constructor for reflection
-   */
-  public DefaultEdge() { }
-
-  /**
-   * Create the edge with final values. Don't call, use EdgeFactory instead.
-   *
-   * @param targetVertexId Desination vertex id.
-   * @param value Value of the edge.
-   */
-  DefaultEdge(I targetVertexId, E value) {
-    this.targetVertexId = targetVertexId;
-    this.value = value;
-  }
-
-  @Override
-  public I getTargetVertexId() {
-    return targetVertexId;
-  }
-
-  @Override
-  public E getValue() {
-    return value;
-  }
-
-  @Override
-  public void setTargetVertexId(I targetVertexId) {
-    this.targetVertexId = targetVertexId;
-  }
-
-  @Override
-  public void setValue(E value) {
-    this.value = value;
-  }
-
-  @Override
-  public String toString() {
-    return "(TargetVertexId = " + targetVertexId + ", " +
-        "value = " + value + ")";
-  }
-
-  @SuppressWarnings("unchecked")
-  @Override
-  public int compareTo(Edge<I, E> edge) {
-    return targetVertexId.compareTo(edge.getTargetVertexId());
-  }
-
-  @Override
-  public boolean equals(Object o) {
-    if (this == o) {
-      return true;
-    }
-    if (o == null || getClass() != o.getClass()) {
-      return false;
-    }
-
-    DefaultEdge edge = (DefaultEdge) o;
-    return Objects.equal(targetVertexId, edge.targetVertexId) &&
-        Objects.equal(value, edge.value);
-  }
-
-  @Override
-  public int hashCode() {
-    return Objects.hashCode(targetVertexId, value);
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/graph/DefaultVertexResolver.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/DefaultVertexResolver.java b/giraph-core/src/main/java/org/apache/giraph/graph/DefaultVertexResolver.java
index c88b2b9..52df38d 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/DefaultVertexResolver.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/DefaultVertexResolver.java
@@ -20,8 +20,7 @@ package org.apache.giraph.graph;
 
 import org.apache.giraph.conf.ImmutableClassesGiraphConfigurable;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.vertex.MutableVertex;
-import org.apache.giraph.vertex.Vertex;
+import org.apache.giraph.edge.Edge;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.log4j.Logger;
@@ -87,9 +86,8 @@ public class DefaultVertexResolver<I extends WritableComparable,
       return;
     }
     if (hasEdgeRemovals(vertexChanges)) {
-      MutableVertex<I, V, E, M> mv = (MutableVertex<I, V, E, M>) vertex;
       for (I removedDestVertex : vertexChanges.getRemovedEdgeList()) {
-        mv.removeEdges(removedDestVertex);
+        vertex.removeEdges(removedDestVertex);
       }
     }
   }
@@ -156,9 +154,8 @@ public class DefaultVertexResolver<I extends WritableComparable,
       return;
     }
     if (hasEdgeAdditions(vertexChanges)) {
-      MutableVertex<I, V, E, M> mv = (MutableVertex<I, V, E, M>) vertex;
       for (Edge<I, E> edge : vertexChanges.getAddedEdgeList()) {
-        mv.addEdge(edge);
+        vertex.addEdge(edge);
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/graph/Edge.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/Edge.java b/giraph-core/src/main/java/org/apache/giraph/graph/Edge.java
deleted file mode 100644
index 185e3c3..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/graph/Edge.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.graph;
-
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
-/**
- * A complete edge, the target vertex and the edge value.  Can only be one
- * edge with a destination vertex id per edge map.
- *
- * @param <I> Vertex index
- * @param <E> Edge value
- */
-public interface Edge<I extends WritableComparable, E extends Writable>
-    extends Comparable<Edge<I, E>> {
-  /**
-   * Get the target vertex index of this edge
-   *
-   * @return Target vertex index of this edge
-   */
-  I getTargetVertexId();
-
-  /**
-   * Get the edge value of the edge
-   *
-   * @return Edge value of this edge
-   */
-  E getValue();
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/graph/EdgeFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/EdgeFactory.java b/giraph-core/src/main/java/org/apache/giraph/graph/EdgeFactory.java
deleted file mode 100644
index a3e6efb..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/graph/EdgeFactory.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.graph;
-
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
-/**
- * Factory for creating Edges
- */
-public class EdgeFactory {
-  /** Do not construct */
-  private EdgeFactory() { }
-
-  /**
-   * Create an edge pointing to a given ID with a value
-   *
-   * @param id target ID
-   * @param value edge value
-   * @param <I> Vertex ID type
-   * @param <E> Edge Value type
-   * @return Edge pointing to ID with value
-   */
-  public static <I extends WritableComparable,
-                 E extends Writable>
-  Edge<I, E> create(I id, E value) {
-    return createMutable(id, value);
-  }
-
-  /**
-   * Create an edge pointing to a given ID without a value
-   *
-   * @param id target ID
-   * @param <I> Vertex ID type
-   * @return Edge pointing to ID without a value
-   */
-  public static <I extends WritableComparable>
-  Edge<I, NullWritable> create(I id) {
-    return createMutable(id);
-  }
-
-  /**
-   * Create a mutable edge pointing to a given ID with a value
-   *
-   * @param id target ID
-   * @param value edge value
-   * @param <I> Vertex ID type
-   * @param <E> Edge Value type
-   * @return Edge pointing to ID with value
-   */
-  public static <I extends WritableComparable,
-                 E extends Writable>
-  MutableEdge<I, E> createMutable(I id, E value) {
-    if (value instanceof NullWritable) {
-      return (MutableEdge<I, E>) createMutable(id);
-    } else {
-      return new DefaultEdge<I, E>(id, value);
-    }
-  }
-
-  /**
-   * Create a mutable edge pointing to a given ID with a value
-   *
-   * @param id target ID
-   * @param <I> Vertex ID type
-   * @return Edge pointing to ID with value
-   */
-  public static <I extends WritableComparable>
-  MutableEdge<I, NullWritable> createMutable(I id) {
-    return new EdgeNoValue<I>(id);
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/graph/EdgeNoValue.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/EdgeNoValue.java b/giraph-core/src/main/java/org/apache/giraph/graph/EdgeNoValue.java
deleted file mode 100644
index 4ac6759..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/graph/EdgeNoValue.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.graph;
-
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.WritableComparable;
-
-import com.google.common.base.Objects;
-
-/**
- * An edge that has no value.
- *
- * @param <I> Vertex ID
- */
-public class EdgeNoValue<I extends WritableComparable>
-    implements MutableEdge<I, NullWritable> {
-  /** Target vertex id */
-  private I targetVertexId = null;
-
-  /** Empty constructor */
-  EdgeNoValue() { }
-
-  /**
-   * Constructor with target vertex ID. Don't call, use EdgeFactory instead.
-   *
-   * @param targetVertexId vertex ID
-   */
-  EdgeNoValue(I targetVertexId) {
-    this.targetVertexId = targetVertexId;
-  }
-
-  @Override
-  public void setTargetVertexId(I targetVertexId) {
-    this.targetVertexId = targetVertexId;
-  }
-
-  @Override
-  public void setValue(NullWritable value) {
-    // do nothing
-  }
-
-  @Override
-  public I getTargetVertexId() {
-    return targetVertexId;
-  }
-
-  @Override
-  public NullWritable getValue() {
-    return NullWritable.get();
-  }
-
-  @Override
-  public boolean equals(Object o) {
-    if (this == o) {
-      return true;
-    }
-    if (o == null || getClass() != o.getClass()) {
-      return false;
-    }
-
-    EdgeNoValue edge = (EdgeNoValue) o;
-    return Objects.equal(targetVertexId, edge.targetVertexId);
-  }
-
-  @Override
-  public int hashCode() {
-    return Objects.hashCode(targetVertexId);
-  }
-
-  @Override
-  public int compareTo(Edge<I, NullWritable> o) {
-    return targetVertexId.compareTo(o.getTargetVertexId());
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/graph/EdgeStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/EdgeStore.java b/giraph-core/src/main/java/org/apache/giraph/graph/EdgeStore.java
deleted file mode 100644
index 6210367..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/graph/EdgeStore.java
+++ /dev/null
@@ -1,176 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.graph;
-
-import com.google.common.collect.MapMaker;
-import org.apache.giraph.bsp.CentralizedServiceWorker;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.partition.Partition;
-import org.apache.giraph.utils.ByteArrayEdges;
-import org.apache.giraph.utils.ByteArrayVertexIdEdges;
-import org.apache.giraph.vertex.Vertex;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.util.Progressable;
-import org.apache.log4j.Logger;
-
-import java.util.Map;
-import java.util.concurrent.ConcurrentMap;
-
-/**
- * Collects incoming edges for vertices owned by this worker.
- * Note: the current implementation is simply a bridge between
- * incoming requests and vertices. In the future, EdgeStore may become an
- * interface allowing for alternative, pluggable implementations of edge
- * storage without having to extend Vertex.
- *
- * @param <I> Vertex id
- * @param <V> Vertex value
- * @param <E> Edge value
- * @param <M> Message data
- */
-public class EdgeStore<I extends WritableComparable,
-    V extends Writable, E extends Writable, M extends Writable> {
-  /** Class logger */
-  private static final Logger LOG = Logger.getLogger(EdgeStore.class);
-  /** Service worker. */
-  private CentralizedServiceWorker<I, V, E, M> service;
-  /** Giraph configuration. */
-  private ImmutableClassesGiraphConfiguration<I, V, E, M> configuration;
-  /** Progressable to report progress. */
-  private Progressable progressable;
-  /** Map used to temporarily store incoming edges. */
-  private ConcurrentMap<Integer,
-      ConcurrentMap<I, ByteArrayEdges<I, E>>> transientEdges;
-  /**
-   * Whether we should reuse edge objects (cached to avoid expensive calls
-   * to the configuration).
-   */
-  private boolean reuseIncomingEdgeObjects;
-
-  /**
-   * Constructor.
-   *
-   * @param service Service worker
-   * @param configuration Configuration
-   * @param progressable Progressable
-   */
-  public EdgeStore(
-      CentralizedServiceWorker<I, V, E, M> service,
-      ImmutableClassesGiraphConfiguration<I, V, E, M> configuration,
-      Progressable progressable) {
-    this.service = service;
-    this.configuration = configuration;
-    this.progressable = progressable;
-    reuseIncomingEdgeObjects = configuration.reuseIncomingEdgeObjects();
-    transientEdges = new MapMaker().concurrencyLevel(
-        configuration.getNettyServerExecutionConcurrency()).makeMap();
-  }
-
-  /**
-   * Add edges belonging to a given partition on this worker.
-   * Note: This method is thread-safe.
-   *
-   * @param partitionId Partition id for the incoming edges.
-   * @param edges Incoming edges
-   */
-  public void addPartitionEdges(
-      int partitionId, ByteArrayVertexIdEdges<I, E> edges) {
-    ConcurrentMap<I, ByteArrayEdges<I, E>> partitionEdges =
-        transientEdges.get(partitionId);
-    if (partitionEdges == null) {
-      ConcurrentMap<I, ByteArrayEdges<I, E>> newPartitionEdges =
-          new MapMaker().concurrencyLevel(
-              configuration.getNettyServerExecutionConcurrency()).makeMap();
-      partitionEdges = transientEdges.putIfAbsent(partitionId,
-          newPartitionEdges);
-      if (partitionEdges == null) {
-        partitionEdges = newPartitionEdges;
-      }
-    }
-    ByteArrayVertexIdEdges<I, E>.VertexIdEdgeIterator vertexIdEdgeIterator =
-        edges.getVertexIdEdgeIterator();
-    while (vertexIdEdgeIterator.hasNext()) {
-      vertexIdEdgeIterator.next();
-      I vertexId = vertexIdEdgeIterator.getCurrentVertexId();
-      Edge<I, E> edge = vertexIdEdgeIterator.getCurrentEdge();
-      ByteArrayEdges<I, E> vertexEdges = partitionEdges.get(vertexId);
-      if (vertexEdges == null) {
-        ByteArrayEdges<I, E> newVertexEdges =
-            new ByteArrayEdges<I, E>(configuration);
-        vertexEdges = partitionEdges.putIfAbsent(vertexId, newVertexEdges);
-        if (vertexEdges == null) {
-          vertexEdges = newVertexEdges;
-          // Since we had to use the vertex id as a new key in the map,
-          // we need to release the object.
-          vertexIdEdgeIterator.releaseCurrentVertexId();
-        }
-      }
-      synchronized (vertexEdges) {
-        vertexEdges.appendEdge(edge);
-      }
-    }
-  }
-
-  /**
-   * Move all edges from temporary storage to their source vertices.
-   * Note: this method is not thread-safe.
-   */
-  public void moveEdgesToVertices() {
-    if (LOG.isInfoEnabled()) {
-      LOG.info("moveEdgesToVertices: Moving incoming edges to vertices.");
-    }
-    for (Map.Entry<Integer, ConcurrentMap<I,
-        ByteArrayEdges<I, E>>> partitionEdges : transientEdges.entrySet()) {
-      Partition<I, V, E, M> partition =
-          service.getPartitionStore().getPartition(partitionEdges.getKey());
-      for (I vertexId : partitionEdges.getValue().keySet()) {
-        // Depending on whether the vertex implementation keeps references to
-        // the Edge objects or not, we may be able to reuse objects when
-        // iterating.
-        Iterable<Edge<I, E>> edgesIterable = reuseIncomingEdgeObjects ?
-            partitionEdges.getValue().remove(vertexId) :
-            partitionEdges.getValue().remove(vertexId).copyEdgeIterable();
-        Vertex<I, V, E, M> vertex = partition.getVertex(vertexId);
-        // If the source vertex doesn't exist, create it. Otherwise,
-        // just set the edges.
-        if (vertex == null) {
-          vertex = configuration.createVertex();
-          vertex.initialize(vertexId, configuration.createVertexValue(),
-              edgesIterable);
-          partition.putVertex(vertex);
-        } else {
-          vertex.setEdges(edgesIterable);
-          // Some Partition implementations (e.g. ByteArrayPartition) require
-          // us to put back the vertex after modifying it.
-          partition.saveVertex(vertex);
-        }
-        progressable.progress();
-      }
-      // Some PartitionStore implementations (e.g. DiskBackedPartitionStore)
-      // require us to put back the partition after modifying it.
-      service.getPartitionStore().putPartition(partition);
-    }
-    if (LOG.isInfoEnabled()) {
-      LOG.info("moveEdgesToVertices: Finished moving incoming edges to " +
-          "vertices.");
-    }
-    transientEdges.clear();
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/graph/GiraphTransferRegulator.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GiraphTransferRegulator.java b/giraph-core/src/main/java/org/apache/giraph/graph/GiraphTransferRegulator.java
index 726c21e..3c2286d 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/GiraphTransferRegulator.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/GiraphTransferRegulator.java
@@ -17,7 +17,6 @@
  */
 package org.apache.giraph.graph;
 
-import org.apache.giraph.vertex.Vertex;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;

http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
index 20fa5c5..e74c59a 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
@@ -18,8 +18,9 @@
 
 package org.apache.giraph.graph;
 
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.giraph.bsp.BspService;
-import org.apache.giraph.bsp.BspUtils;
 import org.apache.giraph.bsp.CentralizedServiceMaster;
 import org.apache.giraph.bsp.CentralizedServiceWorker;
 import org.apache.giraph.comm.messages.MessageStoreByPartition;
@@ -41,7 +42,6 @@ import org.apache.giraph.time.Time;
 import org.apache.giraph.utils.MemoryUtils;
 import org.apache.giraph.utils.ProgressableUtils;
 import org.apache.giraph.utils.ReflectionUtils;
-import org.apache.giraph.vertex.Vertex;
 import org.apache.giraph.worker.BspServiceWorker;
 import org.apache.giraph.worker.WorkerAggregatorUsage;
 import org.apache.giraph.worker.WorkerContext;
@@ -57,9 +57,6 @@ import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
 import org.apache.log4j.PatternLayout;
 
-import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
 import java.io.IOException;
 import java.lang.reflect.Type;
 import java.net.URL;
@@ -433,8 +430,10 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable,
    * @param conf the Configuration object for this job run.
    */
   public void determineClassTypes(Configuration conf) {
+    ImmutableClassesGiraphConfiguration giraphConf =
+        new ImmutableClassesGiraphConfiguration(conf);
     Class<? extends Vertex<I, V, E, M>> vertexClass =
-        BspUtils.<I, V, E, M>getVertexClass(conf);
+        giraphConf.getVertexClass();
     List<Class<?>> classList = ReflectionUtils.<Vertex>getTypeArguments(
         Vertex.class, vertexClass);
     Type vertexIndexType = classList.get(0);

http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/graph/MutableEdge.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/MutableEdge.java b/giraph-core/src/main/java/org/apache/giraph/graph/MutableEdge.java
deleted file mode 100644
index 52e4c47..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/graph/MutableEdge.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.graph;
-
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
-/**
- * A complete edge, the target vertex and the edge value.  Can only be one
- * edge with a destination vertex id per edge map. This edge can be mutated,
- * that is you can set it's target vertex ID and edge value.
- *
- * @param <I> Vertex index
- * @param <E> Edge value
- */
-public interface MutableEdge<I extends WritableComparable, E extends Writable>
-    extends Edge<I, E> {
-  /**
-   * Set the destination vertex index of this edge.
-   *
-   * @param targetVertexId new destination vertex
-   */
-  void setTargetVertexId(I targetVertexId);
-
-  /**
-   * Set the value for this edge.
-   *
-   * @param value new edge value
-   */
-  void setValue(E value);
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/graph/ReverseEdgeDuplicator.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/ReverseEdgeDuplicator.java b/giraph-core/src/main/java/org/apache/giraph/graph/ReverseEdgeDuplicator.java
deleted file mode 100644
index 4415cc2..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/graph/ReverseEdgeDuplicator.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.graph;
-
-import org.apache.giraph.io.EdgeReader;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-
-import java.io.IOException;
-
-/**
- * An EdgeReader that creates the opposite direction edge for each edge read.
- * Used to create an undirected graph from a directed input.
- * This class is a decorator around any other EdgeReader.
- *
- * @param <I> Vertex ID
- * @param <E> Edge Value
- */
-public class ReverseEdgeDuplicator<I extends WritableComparable,
-    E extends Writable> implements EdgeReader<I, E> {
-  /** The underlying EdgeReader to wrap */
-  private final EdgeReader<I, E> baseReader;
-
-  /** Whether the reverse edge stored currently is valid */
-  private boolean haveReverseEdge = true;
-  /** Reverse of the edge last read */
-  private Edge<I, E> reverseEdge;
-  /** Reverse source of last edge, in other words last edge's target */
-  private I reverseSourceId;
-
-  /**
-   * Constructor
-   * @param baseReader EdgeReader to wrap
-   */
-  public ReverseEdgeDuplicator(EdgeReader<I, E> baseReader) {
-    this.baseReader = baseReader;
-  }
-
-  /**
-   * Get wrapped EdgeReader
-   * @return EdgeReader
-   */
-  public EdgeReader<I, E> getBaseReader() {
-    return baseReader;
-  }
-
-  @Override
-  public void initialize(InputSplit inputSplit, TaskAttemptContext context)
-    throws IOException, InterruptedException {
-    baseReader.initialize(inputSplit, context);
-    haveReverseEdge = true;
-  }
-
-  @Override
-  public boolean nextEdge() throws IOException, InterruptedException {
-    boolean result = true;
-    if (haveReverseEdge) {
-      result = baseReader.nextEdge();
-      haveReverseEdge = false;
-    } else {
-      Edge<I, E> currentEdge = baseReader.getCurrentEdge();
-      reverseSourceId = currentEdge.getTargetVertexId();
-      reverseEdge = EdgeFactory.create(baseReader.getCurrentSourceId(),
-          currentEdge.getValue());
-      haveReverseEdge = true;
-    }
-    return result;
-  }
-
-  @Override
-  public I getCurrentSourceId() throws IOException, InterruptedException {
-    if (haveReverseEdge) {
-      return reverseSourceId;
-    } else {
-      return baseReader.getCurrentSourceId();
-    }
-  }
-
-  @Override
-  public Edge<I, E> getCurrentEdge() throws IOException, InterruptedException {
-    if (haveReverseEdge) {
-      return reverseEdge;
-    } else {
-      return baseReader.getCurrentEdge();
-    }
-  }
-
-  @Override
-  public void close() throws IOException {
-    baseReader.close();
-  }
-
-  @Override
-  public float getProgress() throws IOException, InterruptedException {
-    return baseReader.getProgress();
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/graph/Vertex.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/Vertex.java b/giraph-core/src/main/java/org/apache/giraph/graph/Vertex.java
new file mode 100644
index 0000000..c8abab2
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/Vertex.java
@@ -0,0 +1,480 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph;
+
+import com.google.common.collect.UnmodifiableIterator;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfigurable;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.edge.MultiRandomAccessVertexEdges;
+import org.apache.giraph.edge.StrictRandomAccessVertexEdges;
+import org.apache.giraph.edge.VertexEdges;
+import org.apache.giraph.partition.PartitionContext;
+import org.apache.giraph.worker.WorkerAggregatorUsage;
+import org.apache.giraph.worker.WorkerContext;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Mapper;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Iterator;
+
+/**
+ * Basic abstract class for writing a BSP application for computation.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex data
+ * @param <E> Edge data
+ * @param <M> Message data
+ */
+public abstract class Vertex<I extends WritableComparable,
+    V extends Writable, E extends Writable, M extends Writable>
+    implements WorkerAggregatorUsage, Writable,
+    ImmutableClassesGiraphConfigurable<I, V, E, M> {
+  /** Vertex id. */
+  private I id;
+  /** Vertex value. */
+  private V value;
+  /** Outgoing edges. */
+  private VertexEdges<I, E> edges;
+  /** If true, do not do anymore computation on this vertex. */
+  private boolean halt;
+  /** Global graph state **/
+  private GraphState<I, V, E, M> graphState;
+  /** Configuration */
+  private ImmutableClassesGiraphConfiguration<I, V, E, M> conf;
+
+  /**
+   * Initialize id, value, and edges.
+   * This method (or the alternative form initialize(id, value)) must be called
+   * after instantiation, unless readFields() is called.
+   *
+   * @param id Vertex id
+   * @param value Vertex value
+   * @param edges Iterable of edges
+   */
+  public void initialize(I id, V value, Iterable<Edge<I, E>> edges) {
+    this.id = id;
+    this.value = value;
+    setEdges(edges);
+  }
+
+  /**
+   * Initialize id and value. Vertex edges will be empty.
+   * This method (or the alternative form initialize(id, value, edges))
+   * must be called after instantiation, unless readFields() is called.
+   *
+   * @param id Vertex id
+   * @param value Vertex value
+   */
+  public void initialize(I id, V value) {
+    this.id = id;
+    this.value = value;
+    this.edges = conf.createAndInitializeVertexEdges(0);
+  }
+
+  /**
+   * Set the outgoing edges for this vertex.
+   *
+   * @param edges Iterable of edges
+   */
+  public void setEdges(Iterable<Edge<I, E>> edges) {
+    // If the iterable is actually an instance of VertexEdges,
+    // we simply take the reference.
+    // Otherwise, we initialize a new VertexEdges.
+    if (edges instanceof VertexEdges) {
+      this.edges = (VertexEdges<I, E>) edges;
+    } else {
+      this.edges = conf.createAndInitializeVertexEdges(edges);
+    }
+  }
+
+  /**
+   * Must be defined by user to do computation on a single Vertex.
+   *
+   * @param messages Messages that were sent to this vertex in the previous
+   *                 superstep.  Each message is only guaranteed to have
+   *                 a life expectancy as long as next() is not called.
+   * @throws IOException
+   */
+  public abstract void compute(Iterable<M> messages) throws IOException;
+
+  /**
+   * Retrieves the current superstep.
+   *
+   * @return Current superstep
+   */
+  public long getSuperstep() {
+    return graphState.getSuperstep();
+  }
+
+  /**
+   * Get the vertex id.
+   *
+   * @return My vertex id.
+   */
+  public I getId() {
+    return id;
+  }
+
+  /**
+   * Get the vertex value (data stored with vertex)
+   *
+   * @return Vertex value
+   */
+  public V getValue() {
+    return value;
+  }
+
+  /**
+   * Set the vertex data (immediately visible in the computation)
+   *
+   * @param value Vertex data to be set
+   */
+  public void setValue(V value) {
+    this.value = value;
+  }
+
+  /**
+   * Get the total (all workers) number of vertices that
+   * existed in the previous superstep.
+   *
+   * @return Total number of vertices (-1 if first superstep)
+   */
+  public long getTotalNumVertices() {
+    return graphState.getTotalNumVertices();
+  }
+
+  /**
+   * Get the total (all workers) number of edges that
+   * existed in the previous superstep.
+   *
+   * @return Total number of edges (-1 if first superstep)
+   */
+  public long getTotalNumEdges() {
+    return graphState.getTotalNumEdges();
+  }
+
+  /**
+   * Get a read-only view of the out-edges of this vertex.
+   * Note: edge objects returned by this iterable may be invalidated as soon
+   * as the next element is requested. Thus, keeping a reference to an edge
+   * almost always leads to undesired behavior.
+   *
+   * @return the out edges (sort order determined by subclass implementation).
+   */
+  public Iterable<Edge<I, E>> getEdges() {
+    return edges;
+  }
+
+  /**
+   * Get the number of outgoing edges on this vertex.
+   *
+   * @return the total number of outbound edges from this vertex
+   */
+  public int getNumEdges() {
+    return edges.size();
+  }
+
+  /**
+   * Return the value of the first edge with the given target vertex id,
+   * or null if there is no such edge.
+   * Note: edge value objects returned by this method may be invalidated by
+   * the next call. Thus, keeping a reference to an edge value almost always
+   * leads to undesired behavior.
+   *
+   * @param targetVertexId Target vertex id
+   * @return Edge value (or null if missing)
+   */
+  public E getEdgeValue(I targetVertexId) {
+    if (edges instanceof StrictRandomAccessVertexEdges) {
+      return ((StrictRandomAccessVertexEdges<I, E>) edges)
+          .getEdgeValue(targetVertexId);
+    } else {
+      for (Edge<I, E> edge : edges) {
+        if (edge.getTargetVertexId().equals(targetVertexId)) {
+          return edge.getValue();
+        }
+      }
+      return null;
+    }
+  }
+
+  /**
+   * Get an iterable over the values of all edges with the given target
+   * vertex id. This only makes sense for multigraphs (i.e. graphs with
+   * parallel edges).
+   * Note: edge value objects returned by this method may be invalidated as
+   * soon as the next element is requested. Thus, keeping a reference to an
+   * edge value almost always leads to undesired behavior.
+   *
+   * @param targetVertexId Target vertex id
+   * @return Iterable of edge values
+   */
+  public Iterable<E> getAllEdgeValues(final I targetVertexId) {
+    if (edges instanceof MultiRandomAccessVertexEdges) {
+      return ((MultiRandomAccessVertexEdges<I, E>) edges)
+          .getAllEdgeValues(targetVertexId);
+    } else {
+      return new Iterable<E>() {
+        @Override
+        public Iterator<E> iterator() {
+          return new UnmodifiableIterator<E>() {
+            /** Iterator over all edges. */
+            private Iterator<Edge<I, E>> edgeIterator = edges.iterator();
+            /** Last matching edge found. */
+            private Edge<I, E> currentEdge;
+
+            @Override
+            public boolean hasNext() {
+              while (edgeIterator.hasNext()) {
+                currentEdge = edgeIterator.next();
+                if (currentEdge.getTargetVertexId().equals(targetVertexId)) {
+                  return true;
+                }
+              }
+              return false;
+            }
+
+            @Override
+            public E next() {
+              return currentEdge.getValue();
+            }
+          };
+        }
+      };
+    }
+  }
+
+  /**
+   * Send a message to a vertex id.  The message should not be mutated after
+   * this method returns or else undefined results could occur.
+   *
+   * @param id Vertex id to send the message to
+   * @param message Message data to send.  Note that after the message is sent,
+   *        the user should not modify the object.
+   */
+  public void sendMessage(I id, M message) {
+    if (graphState.getWorkerClientRequestProcessor().
+          sendMessageRequest(id, message)) {
+      graphState.getGraphTaskManager().notifySentMessages();
+    }
+  }
+
+  /**
+   * Send a message to all edges.
+   *
+   * @param message Message sent to all edges.
+   */
+  public void sendMessageToAllEdges(M message) {
+    for (Edge<I, E> edge : getEdges()) {
+      sendMessage(edge.getTargetVertexId(), message);
+    }
+  }
+
+  /**
+   * After this is called, the compute() code will no longer be called for
+   * this vertex unless a message is sent to it.  Then the compute() code
+   * will be called once again until this function is called.  The
+   * application finishes only when all vertices vote to halt.
+   */
+  public void voteToHalt() {
+    halt = true;
+  }
+
+  /**
+   * Re-activate vertex if halted.
+   */
+  public void wakeUp() {
+    halt = false;
+  }
+
+  /**
+   * Is this vertex done?
+   *
+   * @return True if halted, false otherwise.
+   */
+  public boolean isHalted() {
+    return halt;
+  }
+
+  /**
+   * Add an edge for this vertex (happens immediately)
+   *
+   * @param edge Edge to add
+   */
+  public void addEdge(Edge<I, E> edge) {
+    edges.add(edge);
+  }
+
+  /**
+   * Removes all edges pointing to the given vertex id.
+   *
+   * @param targetVertexId the target vertex id
+   */
+  public void removeEdges(I targetVertexId) {
+    edges.remove(targetVertexId);
+  }
+
+  /**
+   * Sends a request to create a vertex that will be available during the
+   * next superstep.
+   *
+   * @param id Vertex id
+   * @param value Vertex value
+   * @param edges Initial edges
+   */
+  public void addVertexRequest(I id, V value, VertexEdges<I, E> edges)
+    throws IOException {
+    Vertex<I, V, E, M> vertex = conf.createVertex();
+    vertex.initialize(id, value, edges);
+    graphState.getWorkerClientRequestProcessor().addVertexRequest(vertex);
+  }
+
+  /**
+   * Sends a request to create a vertex that will be available during the
+   * next superstep.
+   *
+   * @param id Vertex id
+   * @param value Vertex value
+   */
+  public void addVertexRequest(I id, V value) throws IOException {
+    addVertexRequest(id, value, conf.createVertexEdges());
+  }
+
+  /**
+   * Request to remove a vertex from the graph
+   * (applied just prior to the next superstep).
+   *
+   * @param vertexId Id of the vertex to be removed.
+   */
+  public void removeVertexRequest(I vertexId) throws IOException {
+    graphState.getWorkerClientRequestProcessor().
+        removeVertexRequest(vertexId);
+  }
+
+  /**
+   * Request to add an edge of a vertex in the graph
+   * (processed just prior to the next superstep)
+   *
+   * @param sourceVertexId Source vertex id of edge
+   * @param edge Edge to add
+   */
+  public void addEdgeRequest(I sourceVertexId, Edge<I, E> edge)
+    throws IOException {
+    graphState.getWorkerClientRequestProcessor().
+        addEdgeRequest(sourceVertexId, edge);
+  }
+
+  /**
+   * Request to remove all edges from a given source vertex to a given target
+   * vertex (processed just prior to the next superstep).
+   *
+   * @param sourceVertexId Source vertex id
+   * @param targetVertexId Target vertex id
+   */
+  public void removeEdgesRequest(I sourceVertexId, I targetVertexId)
+    throws IOException {
+    graphState.getWorkerClientRequestProcessor().
+        removeEdgesRequest(sourceVertexId, targetVertexId);
+  }
+
+  /**
+   * Set the graph state for all workers
+   *
+   * @param graphState Graph state for all workers
+   */
+  public void setGraphState(GraphState<I, V, E, M> graphState) {
+    this.graphState = graphState;
+  }
+
+  /**
+   * Get the mapper context
+   *
+   * @return Mapper context
+   */
+  public Mapper.Context getContext() {
+    return graphState.getContext();
+  }
+
+  /**
+   * Get the partition context
+   *
+   * @return Partition context
+   */
+  public PartitionContext getPartitionContext() {
+    return graphState.getPartitionContext();
+  }
+
+  /**
+   * Get the worker context
+   *
+   * @return WorkerContext context
+   */
+  public WorkerContext getWorkerContext() {
+    return graphState.getGraphTaskManager().getWorkerContext();
+  }
+
+  @Override
+  public <A extends Writable> void aggregate(String name, A value) {
+    graphState.getWorkerAggregatorUsage().aggregate(name, value);
+  }
+
+  @Override
+  public <A extends Writable> A getAggregatedValue(String name) {
+    return graphState.getWorkerAggregatorUsage().getAggregatedValue(name);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    id = conf.createVertexId();
+    id.readFields(in);
+    value = conf.createVertexValue();
+    value.readFields(in);
+    edges = conf.createVertexEdges();
+    edges.readFields(in);
+    halt = in.readBoolean();
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    id.write(out);
+    value.write(out);
+    edges.write(out);
+    out.writeBoolean(halt);
+  }
+
+  @Override
+  public ImmutableClassesGiraphConfiguration<I, V, E, M> getConf() {
+    return conf;
+  }
+
+  @Override
+  public void setConf(ImmutableClassesGiraphConfiguration<I, V, E, M> conf) {
+    this.conf = conf;
+  }
+
+  @Override
+  public String toString() {
+    return "Vertex(id=" + getId() + ",value=" + getValue() +
+        ",#edges=" + getNumEdges() + ")";
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/graph/VertexChanges.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/VertexChanges.java b/giraph-core/src/main/java/org/apache/giraph/graph/VertexChanges.java
index ef61dbb..9474636 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/VertexChanges.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/VertexChanges.java
@@ -18,7 +18,7 @@
 
 package org.apache.giraph.graph;
 
-import org.apache.giraph.vertex.Vertex;
+import org.apache.giraph.edge.Edge;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/graph/VertexMutations.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/VertexMutations.java b/giraph-core/src/main/java/org/apache/giraph/graph/VertexMutations.java
index fa33341..ea50f25 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/VertexMutations.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/VertexMutations.java
@@ -20,8 +20,8 @@ package org.apache.giraph.graph;
 
 import org.apache.giraph.conf.ImmutableClassesGiraphConfigurable;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.edge.Edge;
 import org.apache.giraph.utils.WritableUtils;
-import org.apache.giraph.vertex.Vertex;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.json.JSONException;

http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/graph/VertexResolver.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/VertexResolver.java b/giraph-core/src/main/java/org/apache/giraph/graph/VertexResolver.java
index 4a36706..1fc0ddc 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/VertexResolver.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/VertexResolver.java
@@ -18,7 +18,6 @@
 
 package org.apache.giraph.graph;
 
-import org.apache.giraph.vertex.Vertex;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/io/EdgeReader.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/EdgeReader.java b/giraph-core/src/main/java/org/apache/giraph/io/EdgeReader.java
index ed6fad1..f6dccdc 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/EdgeReader.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/EdgeReader.java
@@ -18,7 +18,7 @@
 
 package org.apache.giraph.io;
 
-import org.apache.giraph.graph.Edge;
+import org.apache.giraph.edge.Edge;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.InputSplit;

http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/io/ReverseEdgeDuplicator.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/ReverseEdgeDuplicator.java b/giraph-core/src/main/java/org/apache/giraph/io/ReverseEdgeDuplicator.java
new file mode 100644
index 0000000..e85931f
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/ReverseEdgeDuplicator.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io;
+
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.edge.EdgeFactory;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import java.io.IOException;
+
+/**
+ * An EdgeReader that creates the opposite direction edge for each edge read.
+ * Used to create an undirected graph from a directed input.
+ * This class is a decorator around any other EdgeReader.
+ *
+ * @param <I> Vertex ID
+ * @param <E> Edge Value
+ */
+public class ReverseEdgeDuplicator<I extends WritableComparable,
+    E extends Writable> implements EdgeReader<I, E> {
+  /** The underlying EdgeReader to wrap */
+  private final EdgeReader<I, E> baseReader;
+
+  /** Whether the reverse edge stored currently is valid */
+  private boolean haveReverseEdge = true;
+  /** Reverse of the edge last read */
+  private Edge<I, E> reverseEdge;
+  /** Reverse source of last edge, in other words last edge's target */
+  private I reverseSourceId;
+
+  /**
+   * Constructor
+   * @param baseReader EdgeReader to wrap
+   */
+  public ReverseEdgeDuplicator(EdgeReader<I, E> baseReader) {
+    this.baseReader = baseReader;
+  }
+
+  /**
+   * Get wrapped EdgeReader
+   * @return EdgeReader
+   */
+  public EdgeReader<I, E> getBaseReader() {
+    return baseReader;
+  }
+
+  @Override
+  public void initialize(InputSplit inputSplit, TaskAttemptContext context)
+    throws IOException, InterruptedException {
+    baseReader.initialize(inputSplit, context);
+    haveReverseEdge = true;
+  }
+
+  @Override
+  public boolean nextEdge() throws IOException, InterruptedException {
+    boolean result = true;
+    if (haveReverseEdge) {
+      result = baseReader.nextEdge();
+      haveReverseEdge = false;
+    } else {
+      Edge<I, E> currentEdge = baseReader.getCurrentEdge();
+      reverseSourceId = currentEdge.getTargetVertexId();
+      reverseEdge = EdgeFactory.create(baseReader.getCurrentSourceId(),
+          currentEdge.getValue());
+      haveReverseEdge = true;
+    }
+    return result;
+  }
+
+  @Override
+  public I getCurrentSourceId() throws IOException, InterruptedException {
+    if (haveReverseEdge) {
+      return reverseSourceId;
+    } else {
+      return baseReader.getCurrentSourceId();
+    }
+  }
+
+  @Override
+  public Edge<I, E> getCurrentEdge() throws IOException, InterruptedException {
+    if (haveReverseEdge) {
+      return reverseEdge;
+    } else {
+      return baseReader.getCurrentEdge();
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    baseReader.close();
+  }
+
+  @Override
+  public float getProgress() throws IOException, InterruptedException {
+    return baseReader.getProgress();
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/io/VertexReader.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/VertexReader.java b/giraph-core/src/main/java/org/apache/giraph/io/VertexReader.java
index 1b1c896..3487cee 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/VertexReader.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/VertexReader.java
@@ -18,7 +18,7 @@
 
 package org.apache.giraph.io;
 
-import org.apache.giraph.vertex.Vertex;
+import org.apache.giraph.graph.Vertex;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.InputSplit;

http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/io/VertexValueReader.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/VertexValueReader.java b/giraph-core/src/main/java/org/apache/giraph/io/VertexValueReader.java
index 923ca5c..3ccb0fd 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/VertexValueReader.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/VertexValueReader.java
@@ -19,7 +19,7 @@
 package org.apache.giraph.io;
 
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.vertex.Vertex;
+import org.apache.giraph.graph.Vertex;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.InputSplit;

http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/io/VertexWriter.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/VertexWriter.java b/giraph-core/src/main/java/org/apache/giraph/io/VertexWriter.java
index 82a19bb..38c5548 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/VertexWriter.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/VertexWriter.java
@@ -20,7 +20,7 @@ package org.apache.giraph.io;
 
 import java.io.IOException;
 
-import org.apache.giraph.vertex.Vertex;
+import org.apache.giraph.graph.Vertex;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;

http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/io/formats/AdjacencyListTextVertexInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/AdjacencyListTextVertexInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/AdjacencyListTextVertexInputFormat.java
index 5092352..8fe0db6 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/formats/AdjacencyListTextVertexInputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/AdjacencyListTextVertexInputFormat.java
@@ -17,7 +17,8 @@
  */
 package org.apache.giraph.io.formats;
 
-import org.apache.giraph.graph.Edge;
+import com.google.common.collect.Lists;
+import org.apache.giraph.edge.Edge;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
@@ -25,8 +26,6 @@ import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 
-import com.google.common.collect.Lists;
-
 import java.io.IOException;
 import java.util.List;
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/io/formats/AdjacencyListTextVertexOutputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/AdjacencyListTextVertexOutputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/AdjacencyListTextVertexOutputFormat.java
index 934663e..5815403 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/formats/AdjacencyListTextVertexOutputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/AdjacencyListTextVertexOutputFormat.java
@@ -17,8 +17,8 @@
  */
 package org.apache.giraph.io.formats;
 
-import org.apache.giraph.graph.Edge;
-import org.apache.giraph.vertex.Vertex;
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.graph.Vertex;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;

http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/io/formats/IdWithValueTextOutputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/IdWithValueTextOutputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/IdWithValueTextOutputFormat.java
index 352f054..fe4a1d5 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/formats/IdWithValueTextOutputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/IdWithValueTextOutputFormat.java
@@ -19,7 +19,7 @@
 package org.apache.giraph.io.formats;
 
 
-import org.apache.giraph.vertex.Vertex;
+import org.apache.giraph.graph.Vertex;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;

http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/io/formats/IntIntNullIntTextInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/IntIntNullIntTextInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/IntIntNullIntTextInputFormat.java
index b00e495..28539f5 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/formats/IntIntNullIntTextInputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/IntIntNullIntTextInputFormat.java
@@ -18,16 +18,15 @@
 
 package org.apache.giraph.io.formats;
 
-import org.apache.giraph.graph.Edge;
-import org.apache.giraph.graph.EdgeFactory;
+import com.google.common.collect.Lists;
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.edge.EdgeFactory;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 
-import com.google.common.collect.Lists;
-
 import java.io.IOException;
 import java.util.List;
 import java.util.regex.Pattern;

http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/io/formats/IntNullNullNullTextInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/IntNullNullNullTextInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/IntNullNullNullTextInputFormat.java
index dda3f2f..4950d21 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/formats/IntNullNullNullTextInputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/IntNullNullNullTextInputFormat.java
@@ -17,15 +17,14 @@
  */
 package org.apache.giraph.io.formats;
 
-import org.apache.giraph.graph.Edge;
+import com.google.common.collect.ImmutableList;
+import org.apache.giraph.edge.Edge;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 
-import com.google.common.collect.ImmutableList;
-
 import java.io.IOException;
 
 /**

http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/io/formats/IntNullReverseTextEdgeInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/IntNullReverseTextEdgeInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/IntNullReverseTextEdgeInputFormat.java
index 1e3b643..0270348 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/formats/IntNullReverseTextEdgeInputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/IntNullReverseTextEdgeInputFormat.java
@@ -18,8 +18,8 @@
 
 package org.apache.giraph.io.formats;
 
-import org.apache.giraph.graph.ReverseEdgeDuplicator;
 import org.apache.giraph.io.EdgeReader;
+import org.apache.giraph.io.ReverseEdgeDuplicator;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.InputSplit;

http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/io/formats/JsonBase64VertexInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/JsonBase64VertexInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/JsonBase64VertexInputFormat.java
index 21ca427..6eaf7dc 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/formats/JsonBase64VertexInputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/JsonBase64VertexInputFormat.java
@@ -18,8 +18,10 @@
 
 package org.apache.giraph.io.formats;
 
-import org.apache.giraph.graph.Edge;
-import org.apache.giraph.graph.EdgeFactory;
+import com.google.common.collect.Lists;
+import net.iharder.Base64;
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.edge.EdgeFactory;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
@@ -29,9 +31,6 @@ import org.json.JSONArray;
 import org.json.JSONException;
 import org.json.JSONObject;
 
-import com.google.common.collect.Lists;
-import net.iharder.Base64;
-
 import java.io.ByteArrayInputStream;
 import java.io.DataInput;
 import java.io.DataInputStream;

http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/io/formats/JsonBase64VertexOutputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/JsonBase64VertexOutputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/JsonBase64VertexOutputFormat.java
index 0599742..7d8fcf6 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/formats/JsonBase64VertexOutputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/JsonBase64VertexOutputFormat.java
@@ -18,8 +18,9 @@
 
 package org.apache.giraph.io.formats;
 
-import org.apache.giraph.graph.Edge;
-import org.apache.giraph.vertex.Vertex;
+import net.iharder.Base64;
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.graph.Vertex;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
@@ -28,8 +29,6 @@ import org.json.JSONArray;
 import org.json.JSONException;
 import org.json.JSONObject;
 
-import net.iharder.Base64;
-
 import java.io.ByteArrayOutputStream;
 import java.io.DataOutput;
 import java.io.DataOutputStream;

http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/io/formats/JsonLongDoubleFloatDoubleVertexInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/JsonLongDoubleFloatDoubleVertexInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/JsonLongDoubleFloatDoubleVertexInputFormat.java
index 2df20f1..2ac2dad 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/formats/JsonLongDoubleFloatDoubleVertexInputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/JsonLongDoubleFloatDoubleVertexInputFormat.java
@@ -17,9 +17,10 @@
  */
 package org.apache.giraph.io.formats;
 
-import org.apache.giraph.graph.Edge;
-import org.apache.giraph.graph.EdgeFactory;
-import org.apache.giraph.vertex.Vertex;
+import com.google.common.collect.Lists;
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.edge.EdgeFactory;
+import org.apache.giraph.graph.Vertex;
 import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hadoop.io.FloatWritable;
 import org.apache.hadoop.io.LongWritable;
@@ -29,8 +30,6 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.json.JSONArray;
 import org.json.JSONException;
 
-import com.google.common.collect.Lists;
-
 import java.io.IOException;
 import java.util.List;
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/io/formats/JsonLongDoubleFloatDoubleVertexOutputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/JsonLongDoubleFloatDoubleVertexOutputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/JsonLongDoubleFloatDoubleVertexOutputFormat.java
index 9a751ae..d0a3305 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/formats/JsonLongDoubleFloatDoubleVertexOutputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/JsonLongDoubleFloatDoubleVertexOutputFormat.java
@@ -18,8 +18,8 @@
 
 package org.apache.giraph.io.formats;
 
-import org.apache.giraph.graph.Edge;
-import org.apache.giraph.vertex.Vertex;
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.graph.Vertex;
 import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hadoop.io.FloatWritable;
 import org.apache.hadoop.io.LongWritable;

http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/io/formats/LongDoubleDoubleAdjacencyListVertexInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/LongDoubleDoubleAdjacencyListVertexInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/LongDoubleDoubleAdjacencyListVertexInputFormat.java
index 4e35201..09fb991 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/formats/LongDoubleDoubleAdjacencyListVertexInputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/LongDoubleDoubleAdjacencyListVertexInputFormat.java
@@ -17,8 +17,8 @@
  */
 package org.apache.giraph.io.formats;
 
-import org.apache.giraph.graph.Edge;
-import org.apache.giraph.graph.EdgeFactory;
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.edge.EdgeFactory;
 import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Writable;

http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomEdgeInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomEdgeInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomEdgeInputFormat.java
index 2024863..cd454e3 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomEdgeInputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomEdgeInputFormat.java
@@ -18,12 +18,13 @@
 
 package org.apache.giraph.io.formats;
 
+import com.google.common.collect.Sets;
 import org.apache.giraph.bsp.BspInputSplit;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.graph.Edge;
-import org.apache.giraph.graph.EdgeFactory;
 import org.apache.giraph.io.EdgeInputFormat;
 import org.apache.giraph.io.EdgeReader;
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.edge.EdgeFactory;
 import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.mapreduce.InputSplit;
@@ -31,8 +32,6 @@ import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.log4j.Logger;
 
-import com.google.common.collect.Sets;
-
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
@@ -91,7 +90,7 @@ public class PseudoRandomEdgeInputFormat
     /** Aggregate vertices (all input splits). */
     private long aggregateVertices = -1;
     /** Edges per vertex. */
-    private long edgesPerVertex = -1;
+    private int edgesPerVertex = -1;
     /** BspInputSplit (used only for index). */
     private BspInputSplit bspInputSplit;
     /** Saved configuration */
@@ -129,7 +128,7 @@ public class PseudoRandomEdgeInputFormat
             "initialize: Got " + inputSplit.getClass() +
                 " instead of " + BspInputSplit.class);
       }
-      edgesPerVertex = configuration.getLong(
+      edgesPerVertex = configuration.getInt(
           PseudoRandomInputFormatConstants.EDGES_PER_VERTEX, 0);
       if (edgesPerVertex <= 0) {
         throw new IllegalArgumentException(
@@ -184,8 +183,8 @@ public class PseudoRandomEdgeInputFormat
             "" + destVertexId + ")");
       }
       return EdgeFactory.create(
-              destVertexId,
-              new DoubleWritable(random.nextDouble()));
+          destVertexId,
+          new DoubleWritable(random.nextDouble()));
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomVertexInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomVertexInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomVertexInputFormat.java
index 4da8f9d..40a20e1 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomVertexInputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomVertexInputFormat.java
@@ -18,15 +18,14 @@
 
 package org.apache.giraph.io.formats;
 
-import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import org.apache.giraph.bsp.BspInputSplit;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.graph.Edge;
-import org.apache.giraph.graph.EdgeFactory;
 import org.apache.giraph.io.VertexInputFormat;
 import org.apache.giraph.io.VertexReader;
-import org.apache.giraph.vertex.Vertex;
+import org.apache.giraph.edge.EdgeFactory;
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.edge.VertexEdges;
 import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Writable;
@@ -88,7 +87,7 @@ public class PseudoRandomVertexInputFormat<M extends Writable> extends
     /** Aggregate vertices (all input splits). */
     private long aggregateVertices = -1;
     /** Edges per vertex. */
-    private long edgesPerVertex = -1;
+    private int edgesPerVertex = -1;
     /** BspInputSplit (used only for index). */
     private BspInputSplit bspInputSplit;
     /** Saved configuration */
@@ -132,7 +131,7 @@ public class PseudoRandomVertexInputFormat<M extends Writable> extends
             "initialize: Got " + inputSplit.getClass() +
             " instead of " + BspInputSplit.class);
       }
-      edgesPerVertex = configuration.getLong(
+      edgesPerVertex = configuration.getInt(
           PseudoRandomInputFormatConstants.EDGES_PER_VERTEX, 0);
       if (edgesPerVertex <= 0) {
         throw new IllegalArgumentException(
@@ -161,8 +160,10 @@ public class PseudoRandomVertexInputFormat<M extends Writable> extends
       // same.
       Random rand = new Random(vertexId);
       DoubleWritable vertexValue = new DoubleWritable(rand.nextDouble());
-      List<Edge<LongWritable, DoubleWritable>> edges =
-          Lists.newArrayListWithCapacity((int) edgesPerVertex);
+      // In order to save memory and avoid copying, we add directly to a
+      // VertexEdges instance.
+      VertexEdges<LongWritable, DoubleWritable> edges =
+          configuration.createAndInitializeVertexEdges(edgesPerVertex);
       Set<LongWritable> destVertices = Sets.newHashSet();
       for (long i = 0; i < edgesPerVertex; ++i) {
         LongWritable destVertexId = new LongWritable();

http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/io/formats/SequenceFileVertexInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/SequenceFileVertexInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/SequenceFileVertexInputFormat.java
index 6a5813b..1071196 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/formats/SequenceFileVertexInputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/SequenceFileVertexInputFormat.java
@@ -17,7 +17,7 @@
  */
 package org.apache.giraph.io.formats;
 
-import org.apache.giraph.vertex.Vertex;
+import org.apache.giraph.graph.Vertex;
 import org.apache.giraph.io.VertexInputFormat;
 import org.apache.giraph.io.VertexReader;
 import org.apache.hadoop.io.Writable;

http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/io/formats/SequenceFileVertexOutputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/SequenceFileVertexOutputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/SequenceFileVertexOutputFormat.java
index 0538db9..468e6bd 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/formats/SequenceFileVertexOutputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/SequenceFileVertexOutputFormat.java
@@ -19,7 +19,7 @@ package org.apache.giraph.io.formats;
 
 import org.apache.giraph.io.VertexOutputFormat;
 import org.apache.giraph.io.VertexWriter;
-import org.apache.giraph.vertex.Vertex;
+import org.apache.giraph.graph.Vertex;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.JobContext;

http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/io/formats/TextDoubleDoubleAdjacencyListVertexInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/TextDoubleDoubleAdjacencyListVertexInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/TextDoubleDoubleAdjacencyListVertexInputFormat.java
index 36d00db..f7da40f 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/formats/TextDoubleDoubleAdjacencyListVertexInputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/TextDoubleDoubleAdjacencyListVertexInputFormat.java
@@ -17,8 +17,8 @@
  */
 package org.apache.giraph.io.formats;
 
-import org.apache.giraph.graph.Edge;
-import org.apache.giraph.graph.EdgeFactory;
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.edge.EdgeFactory;
 import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;

http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/io/formats/TextEdgeInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/TextEdgeInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/TextEdgeInputFormat.java
index c9f5df1..0aae894 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/formats/TextEdgeInputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/TextEdgeInputFormat.java
@@ -19,10 +19,10 @@
 package org.apache.giraph.io.formats;
 
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.graph.Edge;
-import org.apache.giraph.graph.EdgeFactory;
 import org.apache.giraph.io.EdgeInputFormat;
 import org.apache.giraph.io.EdgeReader;
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.edge.EdgeFactory;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;

http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/io/formats/TextVertexInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/TextVertexInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/TextVertexInputFormat.java
index e359f66..898e57f 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/formats/TextVertexInputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/TextVertexInputFormat.java
@@ -19,10 +19,10 @@
 package org.apache.giraph.io.formats;
 
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.graph.Edge;
 import org.apache.giraph.io.VertexInputFormat;
 import org.apache.giraph.io.VertexReader;
-import org.apache.giraph.vertex.Vertex;
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.graph.Vertex;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;

http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/io/formats/TextVertexOutputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/TextVertexOutputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/TextVertexOutputFormat.java
index 9f1fe1f..ad96cfe 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/formats/TextVertexOutputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/TextVertexOutputFormat.java
@@ -18,7 +18,7 @@
 
 package org.apache.giraph.io.formats;
 
-import org.apache.giraph.vertex.Vertex;
+import org.apache.giraph.graph.Vertex;
 import org.apache.giraph.io.VertexOutputFormat;
 import org.apache.giraph.io.VertexWriter;
 import org.apache.hadoop.io.Text;