You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ni...@apache.org on 2013/01/04 21:52:39 UTC

[6/23] GIRAPH-409: Refactor / cleanups (nitay)

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/utils/Times.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/Times.java b/giraph-core/src/main/java/org/apache/giraph/utils/Times.java
deleted file mode 100644
index 38b7d60..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/utils/Times.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.utils;
-
-import java.util.concurrent.TimeUnit;
-
-/**
- * Utility methods for Time classes.
- */
-public class Times {
-  /** Do not instantiate */
-  private Times() { }
-
-  /**
-   * Convenience method to measure time in a given TimeUnit.
-   *
-   * @param time Time instance to use
-   * @param timeUnit TimeUnit to measure in
-   * @return long measured time in TimeUnit dimension
-   */
-  public static long get(Time time, TimeUnit timeUnit) {
-    return timeUnit.convert(time.getNanoseconds(), TimeUnit.NANOSECONDS);
-  }
-
-  /**
-   * Convenience method to get time since the beginning of an event in a given
-   * TimeUnit.
-   *
-   * @param time Time object used for measuring.
-   * @param timeUnit TimeUnit to use for dimension.
-   * @param startTime beginning time to diff against
-   * @return time elapsed since startTime in TimeUnit dimension.
-   */
-  public static long getDiff(Time time, TimeUnit timeUnit, long startTime) {
-    return get(time, timeUnit) - startTime;
-  }
-
-  /**
-   * Convenience method to get milliseconds since a previous milliseconds
-   * point.
-   *
-   * @param time Time instance to use
-   * @param previousMilliseconds Previous milliseconds
-   * @return Milliseconds elapsed since the previous milliseconds
-   */
-  public static long getMillisecondsSince(Time time,
-                                          long previousMilliseconds) {
-    return time.getMilliseconds() - previousMilliseconds;
-  }
-
-  /**
-   * Convenience method to get milliseconds since a previous milliseconds
-   * point.
-   *
-   * @param time Time instance to use
-   * @param previousMs Previous milliseconds
-   * @return Milliseconds elapsed since the previous milliseconds
-   */
-  public static long getMsSince(Time time, long previousMs) {
-    return getMillisecondsSince(time, previousMs);
-  }
-
-  /**
-   * Convenience method to get microseconds since a previous microseconds point.
-   *
-   * @param time Time instance to use
-   * @param previousMicros Previous microseconds
-   * @return Microseconds elapsed since the previous microseconds
-   */
-  public static long getMicrosSince(Time time, long previousMicros) {
-    return time.getMicroseconds() - previousMicros;
-  }
-
-  /**
-   * Convenience method to get nanoseconds since a previous nanoseconds
-   * point.
-   *
-   * @param time Time instance to use
-   * @param previousNanoseconds Previous nanoseconds
-   * @return Nanoseconds elapsed since the previous nanoseconds
-   */
-  public static long getNanosecondsSince(Time time, long previousNanoseconds) {
-    return time.getNanoseconds() - previousNanoseconds;
-  }
-
-  /**
-   * Convenience method to get nanoseconds since a previous nanoseconds
-   * point.
-   *
-   * @param time Time instance to use
-   * @param previousNanos Previous nanoseconds
-   * @return Nanoseconds elapsed since the previous nanoseconds
-   */
-  public static long getNanosSince(Time time, long previousNanos) {
-    return getNanosecondsSince(time, previousNanos);
-  }
-
-  /**
-   * Convenience method to get seconds since a previous seconds
-   * point.
-   *
-   * @param time Time instance to use
-   * @param previousSeconds Previous seconds
-   * @return Seconds elapsed since the previous seconds
-   */
-  public static int getSecondsSince(Time time, int previousSeconds) {
-    return time.getSeconds() - previousSeconds;
-  }
-
-  /**
-   * Convenience method to get seconds since a previous seconds
-   * point.
-   *
-   * @param time Time instance to use
-   * @param previousSec Previous seconds
-   * @return Seconds elapsed since the previous seconds
-   */
-  public static int getSecSince(Time time, int previousSec) {
-    return getSecondsSince(time, previousSec);
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/vertex/EdgeListVertex.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/vertex/EdgeListVertex.java b/giraph-core/src/main/java/org/apache/giraph/vertex/EdgeListVertex.java
new file mode 100644
index 0000000..9ae692f
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/vertex/EdgeListVertex.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.vertex;
+
+import org.apache.giraph.graph.Edge;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.log4j.Logger;
+
+import java.util.Iterator;
+
+/**
+ * User applications can subclass {@link EdgeListVertex}, which stores
+ * the outbound edges in an ArrayList (less memory as the cost of expensive
+ * random-access lookup).  Good for static graphs.  Not nearly as memory
+ * efficient as using RepresentativeVertex + ByteArrayPartition
+ * (probably about 10x more), but not bad when keeping vertices as objects in
+ * memory (SimplePartition).
+ *
+ * @param <I> Vertex index value
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ * @param <M> Message value
+ */
+@SuppressWarnings("rawtypes")
+public abstract class EdgeListVertex<I extends WritableComparable,
+    V extends Writable, E extends Writable, M extends Writable>
+    extends EdgeListVertexBase<I, V, E, M> {
+  /** Class logger */
+  private static final Logger LOG = Logger.getLogger(EdgeListVertex.class);
+  @Override
+  public final boolean addEdge(Edge<I, E> edge) {
+    for (Edge<I, E> currentEdge : getEdges()) {
+      if (currentEdge.getTargetVertexId().equals(edge.getTargetVertexId())) {
+        LOG.warn("addEdge: Vertex=" + getId() +
+            ": already added an edge value for target vertex id " +
+            edge.getTargetVertexId());
+        return false;
+      }
+    }
+    appendEdge(edge);
+    return true;
+  }
+
+  @Override
+  public int removeEdges(I targetVertexId) {
+    for (Iterator<Edge<I, E>> edges = getEdges().iterator(); edges.hasNext();) {
+      Edge<I, E> edge = edges.next();
+      if (edge.getTargetVertexId().equals(targetVertexId)) {
+        edges.remove();
+        return 1;
+      }
+    }
+    return 0;
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/vertex/EdgeListVertexBase.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/vertex/EdgeListVertexBase.java b/giraph-core/src/main/java/org/apache/giraph/vertex/EdgeListVertexBase.java
new file mode 100644
index 0000000..3f785ff
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/vertex/EdgeListVertexBase.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.vertex;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+
+import org.apache.giraph.graph.Edge;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.log4j.Logger;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Common base class for edge-list backed vertices.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ * @param <M> Message data
+ */
+@SuppressWarnings("rawtypes")
+public abstract class EdgeListVertexBase<I extends WritableComparable,
+    V extends Writable, E extends Writable, M extends Writable>
+    extends MutableVertex<I, V, E, M> {
+  /** Class logger */
+  private static final Logger LOG = Logger.getLogger(EdgeListVertexBase.class);
+  /** List of edges */
+  private List<Edge<I, E>> edgeList = Lists.newArrayList();
+
+  /**
+   * Append an edge to the list.
+   *
+   * @param edge Edge to append
+   */
+  protected void appendEdge(Edge<I, E> edge) {
+    edgeList.add(edge);
+  }
+
+  @Override
+  public void setEdges(Iterable<Edge<I, E>> edges) {
+    edgeList.clear();
+    Iterables.addAll(edgeList, edges);
+  }
+
+  @Override
+  public Iterable<Edge<I, E>> getEdges() {
+    return edgeList;
+  }
+
+  @Override
+  public int getNumEdges() {
+    return edgeList.size();
+  }
+
+  @Override
+  public final void readFields(DataInput in) throws IOException {
+    I vertexId = getConf().createVertexId();
+    vertexId.readFields(in);
+    V vertexValue = getConf().createVertexValue();
+    vertexValue.readFields(in);
+    initialize(vertexId, vertexValue);
+
+    int numEdges = in.readInt();
+    edgeList = Lists.newArrayListWithCapacity(numEdges);
+    for (int i = 0; i < numEdges; ++i) {
+      I targetVertexId = getConf().createVertexId();
+      targetVertexId.readFields(in);
+      E edgeValue = getConf().createEdgeValue();
+      edgeValue.readFields(in);
+      edgeList.add(new Edge<I, E>(targetVertexId, edgeValue));
+    }
+
+    readHaltBoolean(in);
+  }
+
+  @Override
+  public final void write(DataOutput out) throws IOException {
+    getId().write(out);
+    getValue().write(out);
+
+    out.writeInt(edgeList.size());
+    for (Edge<I, E> edge : edgeList) {
+      edge.getTargetVertexId().write(out);
+      edge.getValue().write(out);
+    }
+
+    out.writeBoolean(isHalted());
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/vertex/HashMapVertex.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/vertex/HashMapVertex.java b/giraph-core/src/main/java/org/apache/giraph/vertex/HashMapVertex.java
new file mode 100644
index 0000000..a1b3adf
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/vertex/HashMapVertex.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.vertex;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+
+import org.apache.giraph.graph.Edge;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.log4j.Logger;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * User applications can subclass {@link HashMapVertex}, which stores
+ * the outbound edges in a HashMap, for efficient edge random-access.  Note
+ * that {@link EdgeListVertex} is much more memory efficient for static graphs.
+ * User applications which need to implement their own
+ * in-memory data structures should subclass {@link MutableVertex}.
+ *
+ * Package access will prevent users from accessing internal methods.
+ *
+ * @param <I> Vertex index value
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ * @param <M> Message value
+ */
+@SuppressWarnings("rawtypes")
+public abstract class HashMapVertex<I extends WritableComparable,
+    V extends Writable, E extends Writable, M extends Writable>
+    extends MutableVertex<I, V, E, M> {
+  /** Class logger */
+  private static final Logger LOG = Logger.getLogger(HashMapVertex.class);
+  /** Map of target vertices and their edge values */
+  protected Map<I, E> edgeMap = new HashMap<I, E>();
+
+  @Override
+  public void setEdges(Iterable<Edge<I, E>> edges) {
+    edgeMap.clear();
+    for (Edge<I, E> edge : edges) {
+      edgeMap.put(edge.getTargetVertexId(), edge.getValue());
+    }
+  }
+
+  @Override
+  public boolean addEdge(Edge<I, E> edge) {
+    if (edgeMap.put(edge.getTargetVertexId(), edge.getValue()) != null) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("addEdge: Vertex=" + getId() +
+            ": already added an edge value for target vertex id " +
+            edge.getTargetVertexId());
+      }
+      return false;
+    } else {
+      return true;
+    }
+  }
+
+  @Override
+  public boolean hasEdge(I targetVertexId) {
+    return edgeMap.containsKey(targetVertexId);
+  }
+
+  /**
+   * Get an iterator to the edges on this vertex.
+   *
+   * @return A <em>sorted</em> iterator, as defined by the sort-order
+   *         of the vertex ids
+   */
+  @Override
+  public Iterable<Edge<I, E>> getEdges() {
+    return Iterables.transform(edgeMap.entrySet(),
+        new Function<Map.Entry<I, E>, Edge<I, E>>() {
+
+          @Override
+          public Edge<I, E> apply(Map.Entry<I, E> edge) {
+            return new Edge<I, E>(edge.getKey(), edge.getValue());
+          }
+        });
+  }
+
+  @Override
+  public E getEdgeValue(I targetVertexId) {
+    return edgeMap.get(targetVertexId);
+  }
+
+  @Override
+  public int getNumEdges() {
+    return edgeMap.size();
+  }
+
+  @Override
+  public int removeEdges(I targetVertexId) {
+    return edgeMap.remove(targetVertexId) != null ? 1 : 0;
+  }
+
+  @Override
+  public final void sendMessageToAllEdges(M message) {
+    for (I targetVertexId : edgeMap.keySet()) {
+      sendMessage(targetVertexId, message);
+    }
+  }
+
+  @Override
+  public final void readFields(DataInput in) throws IOException {
+    I vertexId = getConf().createVertexId();
+    vertexId.readFields(in);
+    V vertexValue = getConf().createVertexValue();
+    vertexValue.readFields(in);
+    initialize(vertexId, vertexValue);
+
+    int numEdges = in.readInt();
+    edgeMap = Maps.newHashMapWithExpectedSize(numEdges);
+    for (int i = 0; i < numEdges; ++i) {
+      I targetVertexId = getConf().createVertexId();
+      targetVertexId.readFields(in);
+      E edgeValue = getConf().createEdgeValue();
+      edgeValue.readFields(in);
+      edgeMap.put(targetVertexId, edgeValue);
+    }
+
+    readHaltBoolean(in);
+  }
+
+  @Override
+  public final void write(DataOutput out) throws IOException {
+    getId().write(out);
+    getValue().write(out);
+
+    out.writeInt(edgeMap.size());
+    for (Map.Entry<I, E> edge : edgeMap.entrySet()) {
+      edge.getKey().write(out);
+      edge.getValue().write(out);
+    }
+
+    out.writeBoolean(isHalted());
+  }
+
+}
+

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/vertex/IntIntNullIntVertex.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/vertex/IntIntNullIntVertex.java b/giraph-core/src/main/java/org/apache/giraph/vertex/IntIntNullIntVertex.java
new file mode 100644
index 0000000..a2090e8
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/vertex/IntIntNullIntVertex.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.vertex;
+
+import com.google.common.collect.Iterables;
+import org.apache.giraph.utils.UnmodifiableIntArrayIterator;
+import org.apache.hadoop.io.IntWritable;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Iterator;
+
+/**
+ * Simple implementation of {@link Vertex} using an int as id, value and
+ * message.  Edges are immutable and unweighted. This class aims to be as
+ * memory efficient as possible.
+ */
+public abstract class IntIntNullIntVertex extends
+    SimpleVertex<IntWritable, IntWritable, IntWritable> {
+  /** Int array of neighbor vertex ids */
+  private int[] neighbors;
+
+  @Override
+  public void setNeighbors(Iterable<IntWritable> neighbors) {
+    this.neighbors =
+        new int[(neighbors != null) ? Iterables.size(neighbors) : 0];
+    int n = 0;
+    if (neighbors != null) {
+      for (IntWritable neighbor : neighbors) {
+        this.neighbors[n++] = neighbor.get();
+      }
+    }
+  }
+
+  @Override
+  public Iterable<IntWritable> getNeighbors() {
+    return new Iterable<IntWritable>() {
+      @Override
+      public Iterator<IntWritable> iterator() {
+        return new UnmodifiableIntArrayIterator(neighbors);
+      }
+    };
+  }
+
+  @Override
+  public boolean hasEdge(IntWritable targetVertexId) {
+    for (int neighbor : neighbors) {
+      if (neighbor == targetVertexId.get()) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  @Override
+  public int getNumEdges() {
+    return neighbors.length;
+  }
+
+  @Override
+  public void write(final DataOutput out) throws IOException {
+    out.writeInt(getId().get());
+    out.writeInt(getValue().get());
+    out.writeInt(neighbors.length);
+    for (int n = 0; n < neighbors.length; n++) {
+      out.writeInt(neighbors[n]);
+    }
+    out.writeBoolean(isHalted());
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    int id = in.readInt();
+    int value = in.readInt();
+    initialize(new IntWritable(id), new IntWritable(value));
+    int numEdges = in.readInt();
+    neighbors = new int[numEdges];
+    for (int n = 0; n < numEdges; n++) {
+      neighbors[n] = in.readInt();
+    }
+    readHaltBoolean(in);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/vertex/IntNullNullNullVertex.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/vertex/IntNullNullNullVertex.java b/giraph-core/src/main/java/org/apache/giraph/vertex/IntNullNullNullVertex.java
new file mode 100644
index 0000000..ff4f5d9
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/vertex/IntNullNullNullVertex.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.vertex;
+
+import com.google.common.collect.ImmutableList;
+
+import org.apache.giraph.graph.Edge;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * A vertex with no value, edges, or messages. Just an ID, nothing more.
+ */
+public abstract class IntNullNullNullVertex extends Vertex<IntWritable,
+    NullWritable, NullWritable, NullWritable> {
+  @Override
+  public void setEdges(Iterable<Edge<IntWritable, NullWritable>> edges) { }
+
+  @Override
+  public Iterable<Edge<IntWritable, NullWritable>> getEdges() {
+    return ImmutableList.of();
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    getId().write(out);
+    out.writeBoolean(isHalted());
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    int id = in.readInt();
+    initialize(new IntWritable(id), NullWritable.get());
+    boolean halt = in.readBoolean();
+    if (halt) {
+      voteToHalt();
+    } else {
+      wakeUp();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/vertex/LongDoubleFloatDoubleEdgeListVertex.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/vertex/LongDoubleFloatDoubleEdgeListVertex.java b/giraph-core/src/main/java/org/apache/giraph/vertex/LongDoubleFloatDoubleEdgeListVertex.java
new file mode 100644
index 0000000..aabc91a
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/vertex/LongDoubleFloatDoubleEdgeListVertex.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.vertex;
+
+import com.google.common.collect.Iterables;
+
+import org.apache.giraph.graph.Edge;
+import org.apache.giraph.utils.UnmodifiableLongFloatEdgeArrayIterable;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.LongWritable;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * Compact vertex representation with primitive arrays.
+ */
+public abstract class LongDoubleFloatDoubleEdgeListVertex
+    extends Vertex<LongWritable, DoubleWritable,
+                   FloatWritable, DoubleWritable> {
+  /** long represented vertex id */
+  private long id;
+  /** double represented vertex value */
+  private double value;
+  /** long array of neighbor vertex IDs */
+  private long[] neighbors;
+  /** float array of edge weights */
+  private float[] edgeWeights;
+
+  @Override
+  public void initialize(LongWritable vertexId, DoubleWritable vertexValue) {
+    id = vertexId.get();
+    value = vertexValue.get();
+  }
+
+  @Override
+  public void initialize(LongWritable vertexId, DoubleWritable vertexValue,
+                         Iterable<Edge<LongWritable, FloatWritable>> edges) {
+    id = vertexId.get();
+    value = vertexValue.get();
+    setEdges(edges);
+  }
+
+  @Override
+  public void setEdges(Iterable<Edge<LongWritable, FloatWritable>> edges) {
+    neighbors = new long[(edges != null) ? Iterables.size(edges) : 0];
+    edgeWeights = new float[(edges != null) ? Iterables.size(edges) : 0];
+    int n = 0;
+    if (edges != null) {
+      for (Edge<LongWritable, FloatWritable> edge : edges) {
+        neighbors[n] = edge.getTargetVertexId().get();
+        edgeWeights[n] = edge.getValue().get();
+        n++;
+      }
+    }
+  }
+
+  @Override
+  public LongWritable getId() {
+    return new LongWritable(id);
+  }
+
+  @Override
+  public DoubleWritable getValue() {
+    return new DoubleWritable(value);
+  }
+
+  @Override
+  public void setValue(DoubleWritable vertexValue) {
+    value = vertexValue.get();
+  }
+
+  @Override
+  public Iterable<Edge<LongWritable, FloatWritable>> getEdges() {
+    return new UnmodifiableLongFloatEdgeArrayIterable(neighbors, edgeWeights);
+  }
+
+  @Override
+  public FloatWritable getEdgeValue(LongWritable targetVertexId) {
+    int idx = 0;
+    for (long neighbor : neighbors) {
+      if (neighbor == targetVertexId.get()) {
+        return new FloatWritable(edgeWeights[idx]);
+      }
+      idx++;
+    }
+    return null;
+  }
+
+  @Override
+  public boolean hasEdge(LongWritable targetVertexId) {
+    for (long neighbor : neighbors) {
+      if (neighbor == targetVertexId.get()) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  @Override
+  public int getNumEdges() {
+    return neighbors.length;
+  }
+
+  @Override
+  public void sendMessageToAllEdges(final DoubleWritable message) {
+    for (long neighbor : neighbors) {
+      sendMessage(new LongWritable(neighbor), message);
+    }
+  }
+
+  @Override
+  public void write(final DataOutput out) throws IOException {
+    out.writeLong(id);
+    out.writeDouble(value);
+    out.writeInt(neighbors.length);
+    for (int n = 0; n < neighbors.length; n++) {
+      out.writeLong(neighbors[n]);
+    }
+    for (int n = 0; n < edgeWeights.length; n++) {
+      out.writeFloat(edgeWeights[n]);
+    }
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    id = in.readLong();
+    value = in.readDouble();
+    int numEdges = in.readInt();
+    neighbors = new long[numEdges];
+    for (int n = 0; n < numEdges; n++) {
+      neighbors[n] = in.readLong();
+    }
+    edgeWeights = new float[numEdges];
+    for (int n = 0; n < numEdges; n++) {
+      edgeWeights[n] = in.readFloat();
+    }
+  }
+
+  @Override
+  public int hashCode() {
+    final int prime = 31;
+    int result = 1;
+    result = prime * result + (int) (id ^ (id >>> 32));
+    return result;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null) {
+      return false;
+    }
+    if (!(obj instanceof LongDoubleFloatDoubleEdgeListVertex)) {
+      return false;
+    }
+    LongDoubleFloatDoubleEdgeListVertex other =
+        (LongDoubleFloatDoubleEdgeListVertex) obj;
+    if (id != other.id) {
+      return false;
+    }
+    return true;
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/vertex/LongDoubleFloatDoubleVertex.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/vertex/LongDoubleFloatDoubleVertex.java b/giraph-core/src/main/java/org/apache/giraph/vertex/LongDoubleFloatDoubleVertex.java
new file mode 100644
index 0000000..feb67a6
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/vertex/LongDoubleFloatDoubleVertex.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.vertex;
+
+import com.google.common.collect.UnmodifiableIterator;
+
+import org.apache.giraph.graph.Edge;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.log4j.Logger;
+import org.apache.mahout.math.function.LongFloatProcedure;
+import org.apache.mahout.math.list.DoubleArrayList;
+import org.apache.mahout.math.map.OpenLongFloatHashMap;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Iterator;
+
+/**
+ * Optimized vertex implementation for
+ * <LongWritable, DoubleWritable, FloatWritable, DoubleWritable>
+ */
+public abstract class LongDoubleFloatDoubleVertex
+    extends MutableVertex<LongWritable, DoubleWritable, FloatWritable,
+        DoubleWritable> {
+  /** Class logger */
+  private static final Logger LOG =
+      Logger.getLogger(LongDoubleFloatDoubleVertex.class);
+  /** Stores the edges */
+  private OpenLongFloatHashMap edgeMap =
+      new OpenLongFloatHashMap();
+
+  @Override
+  public void setEdges(Iterable<Edge<LongWritable, FloatWritable>> edges) {
+    if (edges != null) {
+      for (Edge<LongWritable, FloatWritable> edge : edges) {
+        edgeMap.put(edge.getTargetVertexId().get(), edge.getValue().get());
+      }
+    }
+  }
+
+  @Override
+  public boolean addEdge(Edge<LongWritable, FloatWritable> edge) {
+    if (edgeMap.put(edge.getTargetVertexId().get(),
+        edge.getValue().get())) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("addEdge: Vertex=" + getId() +
+            ": already added an edge value for dest vertex id " +
+            edge.getTargetVertexId());
+      }
+      return false;
+    } else {
+      return true;
+    }
+  }
+
+  @Override
+  public int removeEdges(LongWritable targetVertexId) {
+    long target = targetVertexId.get();
+    if (edgeMap.containsKey(target)) {
+      edgeMap.removeKey(target);
+      return 1;
+    } else {
+      return 0;
+    }
+  }
+
+  @Override
+  public Iterable<Edge<LongWritable, FloatWritable>> getEdges() {
+    final long[] targetVertices = edgeMap.keys().elements();
+    final int numEdges = edgeMap.size();
+
+    return new Iterable<Edge<LongWritable, FloatWritable>>() {
+      @Override
+      public Iterator<Edge<LongWritable, FloatWritable>> iterator() {
+        return new Iterator<Edge<LongWritable, FloatWritable>>() {
+          private int offset = 0;
+
+          @Override
+          public boolean hasNext() {
+            return offset < numEdges;
+          }
+
+          @Override
+          public Edge<LongWritable, FloatWritable> next() {
+            long targetVertex = targetVertices[offset++];
+            return new Edge<LongWritable, FloatWritable>(
+                new LongWritable(targetVertex),
+                new FloatWritable(targetVertex));
+          }
+
+          @Override
+          public void remove() {
+            throw new UnsupportedOperationException(
+                "Mutation disallowed for edge list via iterator");
+          }
+        };
+      }
+    };
+  }
+
+  @Override
+  public boolean hasEdge(LongWritable targetVertexId) {
+    return edgeMap.containsKey(targetVertexId.get());
+  }
+
+  @Override
+  public int getNumEdges() {
+    return edgeMap.size();
+  }
+
+  @Override
+  public final void readFields(DataInput in) throws IOException {
+    long id = in.readLong();
+    double value = in.readDouble();
+    initialize(new LongWritable(id), new DoubleWritable(value));
+    edgeMap.clear();
+    long edgeMapSize = in.readLong();
+    for (long i = 0; i < edgeMapSize; ++i) {
+      long targetVertexId = in.readLong();
+      float edgeValue = in.readFloat();
+      edgeMap.put(targetVertexId, edgeValue);
+    }
+    readHaltBoolean(in);
+  }
+
+  @Override
+  public final void write(final DataOutput out) throws IOException {
+    out.writeLong(getId().get());
+    out.writeDouble(getValue().get());
+    out.writeLong(edgeMap.size());
+    edgeMap.forEachPair(new LongFloatProcedure() {
+      @Override
+      public boolean apply(long destVertexId, float edgeValue) {
+        try {
+          out.writeLong(destVertexId);
+          out.writeFloat(edgeValue);
+        } catch (IOException e) {
+          throw new IllegalStateException(
+              "apply: IOException when not allowed", e);
+        }
+        return true;
+      }
+    });
+    out.writeBoolean(isHalted());
+  }
+
+  /**
+   * Helper iterable over the messages.
+   */
+  private static class UnmodifiableDoubleWritableIterable
+    implements Iterable<DoubleWritable> {
+    /** Backing store of messages */
+    private final DoubleArrayList elementList;
+
+    /**
+     * Constructor.
+     *
+     * @param elementList Backing store of element list.
+     */
+    public UnmodifiableDoubleWritableIterable(
+        DoubleArrayList elementList) {
+      this.elementList = elementList;
+    }
+
+    @Override
+    public Iterator<DoubleWritable> iterator() {
+      return new UnmodifiableDoubleWritableIterator(
+          elementList);
+    }
+  }
+
+  /**
+   * Iterator over the messages.
+   */
+  private static class UnmodifiableDoubleWritableIterator
+      extends UnmodifiableIterator<DoubleWritable> {
+    /** Double backing list */
+    private final DoubleArrayList elementList;
+    /** Offset into the backing list */
+    private int offset = 0;
+
+    /**
+     * Constructor.
+     *
+     * @param elementList Backing store of element list.
+     */
+    UnmodifiableDoubleWritableIterator(DoubleArrayList elementList) {
+      this.elementList = elementList;
+    }
+
+    @Override
+    public boolean hasNext() {
+      return offset < elementList.size();
+    }
+
+    @Override
+    public DoubleWritable next() {
+      return new DoubleWritable(elementList.get(offset++));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/vertex/LongDoubleNullDoubleVertex.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/vertex/LongDoubleNullDoubleVertex.java b/giraph-core/src/main/java/org/apache/giraph/vertex/LongDoubleNullDoubleVertex.java
new file mode 100644
index 0000000..b86e29f
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/vertex/LongDoubleNullDoubleVertex.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.vertex;
+
+import com.google.common.collect.Iterables;
+
+import org.apache.giraph.graph.Edge;
+import org.apache.giraph.utils.UnmodifiableLongNullEdgeArrayIterable;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Collections;
+
+/**
+ * Compact vertex representation with primitive arrays and null edges.
+ */
+public abstract class LongDoubleNullDoubleVertex
+    extends Vertex<LongWritable, DoubleWritable, NullWritable, DoubleWritable> {
+  /** long represented vertex id */
+  private long id;
+  /** double represented vertex value */
+  private double value;
+  /** long array of neighbor vertex ids */
+  private long[] neighbors;
+
+  @Override
+  public void initialize(LongWritable vertexId, DoubleWritable vertexValue) {
+    id = vertexId.get();
+    value = vertexValue.get();
+    setEdges(Collections.<Edge<LongWritable, NullWritable>>emptyList());
+  }
+
+  @Override
+  public void initialize(LongWritable vertexId, DoubleWritable vertexValue,
+                         Iterable<Edge<LongWritable, NullWritable>> edges) {
+    id = vertexId.get();
+    value = vertexValue.get();
+    setEdges(edges);
+  }
+
+  @Override
+  public void setEdges(Iterable<Edge<LongWritable, NullWritable>> edges) {
+    neighbors = new long[(edges != null) ? Iterables.size(edges) : 0];
+    int n = 0;
+    if (edges != null) {
+      for (Edge<LongWritable, NullWritable> edge : edges) {
+        neighbors[n++] = edge.getTargetVertexId().get();
+      }
+    }
+  }
+
+  @Override
+  public LongWritable getId() {
+    return new LongWritable(id);
+  }
+
+  @Override
+  public DoubleWritable getValue() {
+    return new DoubleWritable(value);
+  }
+
+  @Override
+  public void setValue(DoubleWritable vertexValue) {
+    value = vertexValue.get();
+  }
+
+  @Override
+  public Iterable<Edge<LongWritable, NullWritable>> getEdges() {
+    return new UnmodifiableLongNullEdgeArrayIterable(neighbors);
+  }
+
+  @Override
+  public NullWritable getEdgeValue(LongWritable targetVertexId) {
+    return NullWritable.get();
+  }
+
+  @Override
+  public boolean hasEdge(LongWritable targetVertexId) {
+    for (long neighbor : neighbors) {
+      if (neighbor == targetVertexId.get()) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  @Override
+  public int getNumEdges() {
+    return neighbors.length;
+  }
+
+  @Override
+  public void sendMessageToAllEdges(final DoubleWritable message) {
+    for (long neighbor : neighbors) {
+      sendMessage(new LongWritable(neighbor), message);
+    }
+  }
+
+  @Override
+  public void write(final DataOutput out) throws IOException {
+    out.writeLong(id);
+    out.writeDouble(value);
+    out.writeInt(neighbors.length);
+    for (int n = 0; n < neighbors.length; n++) {
+      out.writeLong(neighbors[n]);
+    }
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    id = in.readLong();
+    value = in.readDouble();
+    int numEdges = in.readInt();
+    neighbors = new long[numEdges];
+    for (int n = 0; n < numEdges; n++) {
+      neighbors[n] = in.readLong();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/vertex/MultiGraphEdgeListVertex.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/vertex/MultiGraphEdgeListVertex.java b/giraph-core/src/main/java/org/apache/giraph/vertex/MultiGraphEdgeListVertex.java
new file mode 100644
index 0000000..b60d895
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/vertex/MultiGraphEdgeListVertex.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.vertex;
+
+import org.apache.giraph.graph.Edge;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import java.util.Iterator;
+
+/**
+ * An edge-list backed vertex that allows for parallel edges.
+ * This can be used not only to support mutable multigraphs,
+ * but also to make mutations and edge-based input efficient without
+ * resorting to a hash-map backed vertex.
+ *
+ * Note: removeEdge() here removes all edges pointing to the target vertex,
+ * but returns only one of them (or null if there are no such edges).
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ * @param <M> Message data
+ */
+public abstract class MultiGraphEdgeListVertex<I extends WritableComparable,
+    V extends Writable, E extends Writable, M extends Writable>
+    extends EdgeListVertexBase<I, V, E, M> {
+  @Override
+  public final boolean addEdge(Edge<I, E> edge) {
+    appendEdge(edge);
+    return true;
+  }
+
+  @Override
+  public int removeEdges(I targetVertexId) {
+    int removedCount = 0;
+    for (Iterator<Edge<I, E>> edges = getEdges().iterator(); edges.hasNext();) {
+      Edge<I, E> edge = edges.next();
+      if (edge.getTargetVertexId().equals(targetVertexId)) {
+        ++removedCount;
+        edges.remove();
+      }
+    }
+    return removedCount;
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/vertex/MultiGraphRepresentativeVertex.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/vertex/MultiGraphRepresentativeVertex.java b/giraph-core/src/main/java/org/apache/giraph/vertex/MultiGraphRepresentativeVertex.java
new file mode 100644
index 0000000..4733e2a
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/vertex/MultiGraphRepresentativeVertex.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.vertex;
+
+import org.apache.giraph.graph.Edge;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Similar to {@link RepresentativeVertex}, but allows for parallel edges.
+ *
+ * Note:  removeEdge() here removes all edges pointing to the target vertex,
+ * but returns only one of them (or null if there are no such edges).
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ * @param <M> Message data
+ */
+public abstract class MultiGraphRepresentativeVertex<I extends
+    WritableComparable, V extends Writable, E extends Writable,
+    M extends Writable> extends RepresentativeVertexBase<I, V, E, M> {
+  @Override
+  public final boolean addEdge(Edge<I, E> edge) {
+    appendEdge(edge);
+    return true;
+  }
+
+  @Override
+  public final int removeEdges(I targetVertexId) {
+    return removeAllEdges(targetVertexId);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/vertex/MutableVertex.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/vertex/MutableVertex.java b/giraph-core/src/main/java/org/apache/giraph/vertex/MutableVertex.java
new file mode 100644
index 0000000..a6d7ce5
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/vertex/MutableVertex.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.vertex;
+
+import org.apache.giraph.graph.Edge;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import java.io.IOException;
+import java.util.Collections;
+
+/**
+ * Interface used by VertexReader to set the properties of a new vertex
+ * or mutate the graph.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex data
+ * @param <E> Edge data
+ * @param <M> Message data
+ */
+@SuppressWarnings("rawtypes")
+public abstract class MutableVertex<I extends WritableComparable,
+    V extends Writable, E extends Writable, M extends Writable>
+    extends Vertex<I, V, E, M> {
+  /**
+   * Add an edge for this vertex (happens immediately)
+   *
+   * @param edge Edge to add
+   * @return Return true if succeeded, false otherwise
+   */
+  public abstract boolean addEdge(Edge<I, E> edge);
+
+  /**
+   * Removes all edges pointing to the given vertex id.
+   *
+   * @param targetVertexId the target vertex id
+   * @return the number of removed edges
+   */
+  public abstract int removeEdges(I targetVertexId);
+
+  /**
+   * Sends a request to create a vertex that will be available during the
+   * next superstep.
+   *
+   * @param id Vertex id
+   * @param value Vertex value
+   * @param edges Initial edges
+   */
+  public void addVertexRequest(I id, V value, Iterable<Edge<I, E>> edges)
+    throws IOException {
+    Vertex<I, V, E, M> vertex = getConf().createVertex();
+    vertex.initialize(id, value, edges);
+    getGraphState().getWorkerClientRequestProcessor().addVertexRequest(vertex);
+  }
+
+  /**
+   * Sends a request to create a vertex that will be available during the
+   * next superstep.
+   *
+   * @param id Vertex id
+   * @param value Vertex value
+   */
+  public void addVertexRequest(I id, V value) throws IOException {
+    addVertexRequest(id, value, Collections.<Edge<I, E>>emptyList());
+  }
+
+  /**
+   * Request to remove a vertex from the graph
+   * (applied just prior to the next superstep).
+   *
+   * @param vertexId Id of the vertex to be removed.
+   */
+  public void removeVertexRequest(I vertexId) throws IOException {
+    getGraphState().getWorkerClientRequestProcessor().
+        removeVertexRequest(vertexId);
+  }
+
+  /**
+   * Request to add an edge of a vertex in the graph
+   * (processed just prior to the next superstep)
+   *
+   * @param sourceVertexId Source vertex id of edge
+   * @param edge Edge to add
+   */
+  public void addEdgeRequest(I sourceVertexId, Edge<I, E> edge)
+    throws IOException {
+    getGraphState().getWorkerClientRequestProcessor().
+        addEdgeRequest(sourceVertexId, edge);
+  }
+
+  /**
+   * Request to remove all edges from a given source vertex to a given target
+   * vertex (processed just prior to the next superstep).
+   *
+   * @param sourceVertexId Source vertex id
+   * @param targetVertexId Target vertex id
+   */
+  public void removeEdgesRequest(I sourceVertexId, I targetVertexId)
+    throws IOException {
+    getGraphState().getWorkerClientRequestProcessor().
+        removeEdgesRequest(sourceVertexId, targetVertexId);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/vertex/RepresentativeVertex.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/vertex/RepresentativeVertex.java b/giraph-core/src/main/java/org/apache/giraph/vertex/RepresentativeVertex.java
new file mode 100644
index 0000000..f805007
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/vertex/RepresentativeVertex.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.vertex;
+
+import org.apache.giraph.graph.Edge;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.log4j.Logger;
+
+/**
+ * This vertex should only be used in conjunction with ByteArrayPartition since
+ * it has special code to deserialize by reusing objects and not instantiating
+ * new ones.  If used without ByteArrayPartition, it will cause a lot of
+ * wasted memory.
+ *
+ * Also, this vertex is optimized for space and not efficient for either adding
+ * or random access of edges.
+ *
+ * @param <I> Vertex index value
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ * @param <M> Message data
+ */
+public abstract class RepresentativeVertex<
+    I extends WritableComparable,
+    V extends Writable, E extends Writable, M extends Writable>
+    extends RepresentativeVertexBase<I, V, E, M> {
+  /** Class logger */
+  private static final Logger LOG =
+      Logger.getLogger(RepresentativeVertex.class);
+
+  @Override
+  public final boolean addEdge(Edge<I, E> edge) {
+    // Note that this is very expensive (deserializes all edges
+    // in an addEdge() request).
+    // Hopefully the user set all the edges in setEdges().
+    for (Edge<I, E> currentEdge : getEdges()) {
+      if (currentEdge.getTargetVertexId().equals(edge.getTargetVertexId())) {
+        LOG.warn("addEdge: Vertex=" + getId() +
+            ": already added an edge value for target vertex id " +
+            edge.getTargetVertexId());
+        return false;
+      }
+    }
+    appendEdge(edge);
+    return true;
+  }
+
+  @Override
+  public final int removeEdges(I targetVertexId) {
+    return removeFirstEdge(targetVertexId) ? 1 : 0;
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/vertex/RepresentativeVertexBase.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/vertex/RepresentativeVertexBase.java b/giraph-core/src/main/java/org/apache/giraph/vertex/RepresentativeVertexBase.java
new file mode 100644
index 0000000..2dc840b
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/vertex/RepresentativeVertexBase.java
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.vertex;
+
+import org.apache.giraph.graph.Edge;
+import org.apache.giraph.utils.ExtendedDataInput;
+import org.apache.giraph.utils.ExtendedDataOutput;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.log4j.Logger;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * Common base class for representative vertices.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ * @param <M> Message data
+ */
+public abstract class RepresentativeVertexBase<
+    I extends WritableComparable,
+    V extends Writable, E extends Writable, M extends Writable>
+    extends MutableVertex<I, V, E, M> implements Iterable<Edge<I, E>> {
+  /** Class logger */
+  private static final Logger LOG =
+      Logger.getLogger(RepresentativeVertex.class);
+  /** Representative edge */
+  private final Edge<I, E> representativeEdge = new Edge<I, E>();
+  /** Serialized edges */
+  private byte[] serializedEdges;
+  /** Byte used in serializedEdges */
+  private int serializedEdgesBytesUsed;
+  /** Number of edges */
+  private int edgeCount;
+
+  /**
+   * Append an edge to the serialized representation.
+   *
+   * @param edge Edge to append
+   */
+  protected void appendEdge(Edge<I, E> edge) {
+    ExtendedDataOutput extendedDataOutput =
+        getConf().createExtendedDataOutput(
+            serializedEdges, serializedEdgesBytesUsed);
+    try {
+      edge.getTargetVertexId().write(extendedDataOutput);
+      edge.getValue().write(extendedDataOutput);
+    } catch (IOException e) {
+      throw new IllegalStateException("addEdge: Failed to write to the " +
+          "new byte array");
+    }
+    serializedEdges = extendedDataOutput.getByteArray();
+    serializedEdgesBytesUsed = extendedDataOutput.getPos();
+    ++edgeCount;
+  }
+
+  /**
+   * Remove the first edge pointing to a target vertex.
+   *
+   * @param targetVertexId Target vertex id
+   * @return True if one such edge was found and removed.
+   */
+  protected boolean removeFirstEdge(I targetVertexId) {
+    // Note that this is very expensive (deserializes all edges
+    // in an removedge() request).
+    // Hopefully the user set all the edges correctly in setEdges().
+    RepresentativeEdgeIterator iterator = new RepresentativeEdgeIterator();
+    int foundStartOffset = 0;
+    while (iterator.hasNext()) {
+      Edge<I, E> edge = iterator.next();
+      if (edge.getTargetVertexId().equals(targetVertexId)) {
+        System.arraycopy(serializedEdges, iterator.extendedDataInput.getPos(),
+            serializedEdges, foundStartOffset,
+            serializedEdgesBytesUsed - iterator.extendedDataInput.getPos());
+        serializedEdgesBytesUsed -=
+            iterator.extendedDataInput.getPos() - foundStartOffset;
+        --edgeCount;
+        return true;
+      }
+      foundStartOffset = iterator.extendedDataInput.getPos();
+    }
+
+    return false;
+  }
+
+  /**
+   * Remove all edges pointing to a target vertex.
+   *
+   * @param targetVertexId Target vertex id
+   * @return The number of removed edges
+   */
+  protected int removeAllEdges(I targetVertexId) {
+    // Note that this is very expensive (deserializes all edges
+    // in an removedge() request).
+    // Hopefully the user set all the edges correctly in setEdges().
+    RepresentativeEdgeIterator iterator = new RepresentativeEdgeIterator();
+    int removedCount = 0;
+    List<Integer> foundStartOffsets = new LinkedList<Integer>();
+    List<Integer> foundEndOffsets = new LinkedList<Integer>();
+    int lastStartOffset = 0;
+    while (iterator.hasNext()) {
+      Edge<I, E> edge = iterator.next();
+      if (edge.getTargetVertexId().equals(targetVertexId)) {
+        foundStartOffsets.add(lastStartOffset);
+        foundEndOffsets.add(iterator.extendedDataInput.getPos());
+        ++removedCount;
+      }
+      lastStartOffset = iterator.extendedDataInput.getPos();
+    }
+    foundStartOffsets.add(serializedEdgesBytesUsed);
+
+    Iterator<Integer> foundStartOffsetIter = foundStartOffsets.iterator();
+    Integer foundStartOffset = foundStartOffsetIter.next();
+    for (Integer foundEndOffset : foundEndOffsets) {
+      Integer nextFoundStartOffset = foundStartOffsetIter.next();
+      System.arraycopy(serializedEdges, foundEndOffset,
+          serializedEdges, foundStartOffset,
+          nextFoundStartOffset - foundEndOffset);
+      serializedEdgesBytesUsed -= foundEndOffset - foundStartOffset;
+      foundStartOffset = nextFoundStartOffset;
+    }
+
+    edgeCount -= removedCount;
+    return removedCount;
+  }
+
+  @Override
+  public final void initialize(I id, V value, Iterable<Edge<I, E>> edges) {
+    // Make sure the initial values exist
+    representativeEdge.setTargetVertexId(getConf().createVertexId());
+    representativeEdge.setValue(getConf().createEdgeValue());
+    super.initialize(id, value, edges);
+  }
+
+  @Override
+  public final void initialize(I id, V value) {
+    // Make sure the initial values exist
+    representativeEdge.setTargetVertexId(getConf().createVertexId());
+    representativeEdge.setValue(getConf().createEdgeValue());
+    super.initialize(id, value);
+  }
+
+  /**
+   * Iterator that uses the representative edge (only one iterator allowed
+   * at a time)
+   */
+  private final class RepresentativeEdgeIterator implements
+      Iterator<Edge<I, E>> {
+    /** Input for processing the bytes */
+    private final ExtendedDataInput extendedDataInput;
+
+    /** Constructor. */
+    RepresentativeEdgeIterator() {
+      extendedDataInput = getConf().createExtendedDataInput(
+          serializedEdges, 0, serializedEdgesBytesUsed);
+    }
+    @Override
+    public boolean hasNext() {
+      return serializedEdges != null && extendedDataInput.available() > 0;
+    }
+
+    @Override
+    public Edge<I, E> next() {
+      try {
+        representativeEdge.getTargetVertexId().readFields(extendedDataInput);
+        representativeEdge.getValue().readFields(extendedDataInput);
+      } catch (IOException e) {
+        throw new IllegalStateException("next: Failed on pos " +
+            extendedDataInput.getPos() + " edge " + representativeEdge);
+      }
+      return representativeEdge;
+    }
+
+    @Override
+    public void remove() {
+      throw new IllegalAccessError("remove: Not supported");
+    }
+  }
+
+  @Override
+  public final Iterator<Edge<I, E>> iterator() {
+    return new RepresentativeEdgeIterator();
+  }
+
+  @Override
+  public final void setEdges(Iterable<Edge<I, E>> edges) {
+    ExtendedDataOutput extendedOutputStream =
+        getConf().createExtendedDataOutput();
+    if (edges != null) {
+      for (Edge<I, E> edge : edges) {
+        try {
+          edge.getTargetVertexId().write(extendedOutputStream);
+          edge.getValue().write(extendedOutputStream);
+        } catch (IOException e) {
+          throw new IllegalStateException("setEdges: Failed to serialize " +
+              edge);
+        }
+        ++edgeCount;
+      }
+    }
+    serializedEdges = extendedOutputStream.getByteArray();
+    serializedEdgesBytesUsed = extendedOutputStream.getPos();
+  }
+
+  @Override
+  public final Iterable<Edge<I, E>> getEdges() {
+    return this;
+  }
+
+  @Override
+  public final int getNumEdges() {
+    return edgeCount;
+  }
+
+  @Override
+  public final void readFields(DataInput in) throws IOException {
+    // Ensure these objects are present
+    if (representativeEdge.getTargetVertexId() == null) {
+      representativeEdge.setTargetVertexId(getConf().createVertexId());
+    }
+
+    if (representativeEdge.getValue() == null) {
+      representativeEdge.setValue(getConf().createEdgeValue());
+    }
+
+    I vertexId = getId();
+    if (vertexId == null) {
+      vertexId = getConf().createVertexId();
+    }
+    vertexId.readFields(in);
+
+    V vertexValue = getValue();
+    if (vertexValue == null) {
+      vertexValue = getConf().createVertexValue();
+    }
+    vertexValue.readFields(in);
+
+    initialize(vertexId, vertexValue);
+
+    serializedEdgesBytesUsed = in.readInt();
+    // Only create a new buffer if the old one isn't big enough
+    if (serializedEdges == null ||
+        serializedEdgesBytesUsed > serializedEdges.length) {
+      serializedEdges = new byte[serializedEdgesBytesUsed];
+    }
+    in.readFully(serializedEdges, 0, serializedEdgesBytesUsed);
+    edgeCount = in.readInt();
+
+    readHaltBoolean(in);
+  }
+
+  @Override
+  public final void write(DataOutput out) throws IOException {
+    getId().write(out);
+    getValue().write(out);
+
+    out.writeInt(serializedEdgesBytesUsed);
+    out.write(serializedEdges, 0, serializedEdgesBytesUsed);
+    out.writeInt(edgeCount);
+
+    out.writeBoolean(isHalted());
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/vertex/SimpleMutableVertex.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/vertex/SimpleMutableVertex.java b/giraph-core/src/main/java/org/apache/giraph/vertex/SimpleMutableVertex.java
new file mode 100644
index 0000000..9def05d
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/vertex/SimpleMutableVertex.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.vertex;
+
+import com.google.common.collect.Lists;
+
+import org.apache.giraph.graph.Edge;
+import org.apache.giraph.utils.EdgeIterables;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Mutable vertex with no edge values.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex data
+ * @param <M> Message data
+ */
+public abstract class SimpleMutableVertex<I extends WritableComparable,
+    V extends Writable, M extends Writable> extends MutableVertex<I, V,
+        NullWritable, M> {
+  /**
+   * Set the neighbors of this vertex.
+   *
+   * @param neighbors Iterable of destination vertex ids.
+   */
+  public abstract void setNeighbors(Iterable<I> neighbors);
+
+  @Override
+  public void setEdges(Iterable<Edge<I, NullWritable>> edges) {
+    setNeighbors(EdgeIterables.getNeighbors(edges));
+  }
+
+  /**
+   * Get a read-only view of the neighbors of this
+   * vertex, i.e. the target vertices of its out-edges.
+   *
+   * @return the neighbors (sort order determined by subclass implementation).
+   */
+  public abstract Iterable<I> getNeighbors();
+
+  @Override
+  public Iterable<Edge<I, NullWritable>> getEdges() {
+    return EdgeIterables.getEdges(getNeighbors());
+  }
+
+  @Override
+  public NullWritable getEdgeValue(I targetVertexId) {
+    return NullWritable.get();
+  }
+
+  /**
+   * Add an edge for this vertex (happens immediately)
+   *
+   * @param targetVertexId target vertex
+   * @return Return true if succeeded, false otherwise
+   */
+  public abstract boolean addEdge(I targetVertexId);
+
+  @Override
+  public boolean addEdge(Edge<I, NullWritable> edge) {
+    return addEdge(edge.getTargetVertexId());
+  }
+
+  /**
+   * Request to add an edge of a vertex in the graph
+   * (processed just prior to the next superstep)
+   *
+   * @param sourceVertexId Source vertex id of edge
+   */
+  public void addEdgeRequest(I sourceVertexId) throws IOException {
+    getGraphState().getWorkerClientRequestProcessor().
+        addEdgeRequest(sourceVertexId, new Edge<I,
+            NullWritable>(sourceVertexId, NullWritable.get()));
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    I vertexId = (I) getConf().createVertexId();
+    vertexId.readFields(in);
+    V vertexValue = (V) getConf().createVertexValue();
+    vertexValue.readFields(in);
+
+    int numEdges = in.readInt();
+    List<Edge<I, NullWritable>> edges =
+        Lists.newArrayListWithCapacity(numEdges);
+    for (int i = 0; i < numEdges; ++i) {
+      I targetVertexId = (I) getConf().createVertexId();
+      targetVertexId.readFields(in);
+      edges.add(new Edge<I, NullWritable>(targetVertexId, NullWritable.get()));
+    }
+
+    initialize(vertexId, vertexValue, edges);
+
+    readHaltBoolean(in);
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    getId().write(out);
+    getValue().write(out);
+
+    out.writeInt(getNumEdges());
+    for (I neighbor : getNeighbors()) {
+      neighbor.write(out);
+    }
+
+    out.writeBoolean(isHalted());
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/vertex/SimpleVertex.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/vertex/SimpleVertex.java b/giraph-core/src/main/java/org/apache/giraph/vertex/SimpleVertex.java
new file mode 100644
index 0000000..38485de
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/vertex/SimpleVertex.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.vertex;
+
+import com.google.common.collect.Lists;
+
+import org.apache.giraph.graph.Edge;
+import org.apache.giraph.utils.EdgeIterables;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Vertex with no edge values.
+ * @param <I> Vertex id
+ * @param <V> Vertex data
+ * @param <M> Message data
+ */
+public abstract class SimpleVertex<I extends WritableComparable,
+    V extends Writable, M extends Writable> extends Vertex<I, V,
+        NullWritable, M> {
+  /**
+   * Set the neighbors of this vertex.
+   *
+   * @param neighbors Iterable of destination vertex ids.
+   */
+  public abstract void setNeighbors(Iterable<I> neighbors);
+
+  @Override
+  public void setEdges(Iterable<Edge<I, NullWritable>> edges) {
+    setNeighbors(EdgeIterables.getNeighbors(edges));
+  }
+
+  /**
+   * Get a read-only view of the neighbors of this
+   * vertex, i.e. the target vertices of its out-edges.
+   *
+   * @return the neighbors (sort order determined by subclass implementation).
+   */
+  public abstract Iterable<I> getNeighbors();
+
+  @Override
+  public Iterable<Edge<I, NullWritable>> getEdges() {
+    return EdgeIterables.getEdges(getNeighbors());
+  }
+
+  @Override
+  public NullWritable getEdgeValue(I targetVertexId) {
+    return NullWritable.get();
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    I vertexId = getConf().createVertexId();
+    vertexId.readFields(in);
+    V vertexValue = getConf().createVertexValue();
+    vertexValue.readFields(in);
+
+    int numEdges = in.readInt();
+    List<I> neighbors = Lists.newArrayListWithCapacity(numEdges);
+    for (int i = 0; i < numEdges; ++i) {
+      I targetVertexId = getConf().createVertexId();
+      targetVertexId.readFields(in);
+      neighbors.add(targetVertexId);
+    }
+
+    initialize(vertexId, vertexValue);
+    setNeighbors(neighbors);
+
+    readHaltBoolean(in);
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    getId().write(out);
+    getValue().write(out);
+
+    out.writeInt(getNumEdges());
+    for (I neighbor : getNeighbors()) {
+      neighbor.write(out);
+    }
+
+    out.writeBoolean(isHalted());
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/vertex/Vertex.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/vertex/Vertex.java b/giraph-core/src/main/java/org/apache/giraph/vertex/Vertex.java
new file mode 100644
index 0000000..d1fbe14
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/vertex/Vertex.java
@@ -0,0 +1,410 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.vertex;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfigurable;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.graph.Edge;
+import org.apache.giraph.graph.GraphState;
+import org.apache.giraph.partition.PartitionOwner;
+import org.apache.giraph.worker.WorkerAggregatorUsage;
+import org.apache.giraph.worker.WorkerContext;
+import org.apache.giraph.worker.WorkerInfo;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Mapper;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Basic interface for writing a BSP application for computation.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex data
+ * @param <E> Edge data
+ * @param <M> Message data
+ */
+@SuppressWarnings("rawtypes")
+public abstract class Vertex<I extends WritableComparable,
+    V extends Writable, E extends Writable, M extends Writable>
+    implements WorkerAggregatorUsage, Writable,
+    ImmutableClassesGiraphConfigurable<I, V, E, M> {
+  /** Vertex id. */
+  private I id;
+  /** Vertex value. */
+  private V value;
+  /** If true, do not do anymore computation on this vertex. */
+  private boolean halt;
+  /** Global graph state **/
+  private GraphState<I, V, E, M> graphState;
+  /** Configuration */
+  private ImmutableClassesGiraphConfiguration<I, V, E, M> conf;
+
+  /**
+   * This method must be called after instantiation of a vertex
+   * with ImmutableClassesGiraphConfiguration
+   * unless deserialization from readFields() is
+   * called.
+   *
+   * @param id Will be the vertex id
+   * @param value Will be the vertex value
+   * @param edges Iterable of edges
+   */
+  public void initialize(I id, V value, Iterable<Edge<I, E>> edges) {
+    this.id = id;
+    this.value = value;
+    setEdges(edges);
+  }
+
+  /**
+   * This method only sets id and value. Can be used by Vertex
+   * implementations in readFields().
+   *
+   * @param id Vertex id
+   * @param value Vertex value
+   */
+  public void initialize(I id, V value) {
+    this.id = id;
+    this.value = value;
+    setEdges(Collections.<Edge<I, E>>emptyList());
+  }
+
+  /**
+   * Set the outgoing edges for this vertex.
+   *
+   * @param edges Iterable of edges
+   */
+  public abstract void setEdges(Iterable<Edge<I, E>> edges);
+
+  /**
+   * Must be defined by user to do computation on a single Vertex.
+   *
+   * @param messages Messages that were sent to this vertex in the previous
+   *                 superstep.  Each message is only guaranteed to have
+   *                 a life expectancy as long as next() is not called.
+   * @throws IOException
+   */
+  public abstract void compute(Iterable<M> messages) throws IOException;
+
+  /**
+   * Retrieves the current superstep.
+   *
+   * @return Current superstep
+   */
+  public long getSuperstep() {
+    return getGraphState().getSuperstep();
+  }
+
+  /**
+   * Get the vertex id.
+   *
+   * @return My vertex id.
+   */
+  public I getId() {
+    return id;
+  }
+
+  /**
+   * Get the vertex value (data stored with vertex)
+   *
+   * @return Vertex value
+   */
+  public V getValue() {
+    return value;
+  }
+
+  /**
+   * Set the vertex data (immediately visible in the computation)
+   *
+   * @param value Vertex data to be set
+   */
+  public void setValue(V value) {
+    this.value = value;
+  }
+
+  /**
+   * Get the total (all workers) number of vertices that
+   * existed in the previous superstep.
+   *
+   * @return Total number of vertices (-1 if first superstep)
+   */
+  public long getTotalNumVertices() {
+    return getGraphState().getTotalNumVertices();
+  }
+
+  /**
+   * Get the total (all workers) number of edges that
+   * existed in the previous superstep.
+   *
+   * @return Total number of edges (-1 if first superstep)
+   */
+  public long getTotalNumEdges() {
+    return getGraphState().getTotalNumEdges();
+  }
+
+  /**
+   * Get a read-only view of the out-edges of this vertex.
+   *
+   * @return the out edges (sort order determined by subclass implementation).
+   */
+  public abstract Iterable<Edge<I, E>> getEdges();
+
+  /**
+   * Does an edge with the target vertex id exist?
+   *
+   * @param targetVertexId Target vertex id to check
+   * @return true if there is an edge to the target
+   */
+  public boolean hasEdge(I targetVertexId) {
+    for (Edge<I, E> edge : getEdges()) {
+      if (edge.getTargetVertexId().equals(targetVertexId)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Get the edge value associated with a target vertex id.
+   *
+   * @param targetVertexId Target vertex id to check
+   *
+   * @return the value of the edge to targetVertexId (or null if there
+   *         is no edge to it)
+   */
+  public E getEdgeValue(I targetVertexId) {
+    for (Edge<I, E> edge : getEdges()) {
+      if (edge.getTargetVertexId().equals(targetVertexId)) {
+        return edge.getValue();
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Get the number of outgoing edges on this vertex.
+   *
+   * @return the total number of outbound edges from this vertex
+   */
+  public int getNumEdges() {
+    return Iterables.size(getEdges());
+  }
+
+  /**
+   * Send a message to a vertex id.  The message should not be mutated after
+   * this method returns or else undefined results could occur.
+   *
+   * @param id Vertex id to send the message to
+   * @param message Message data to send.  Note that after the message is sent,
+   *        the user should not modify the object.
+   */
+  public void sendMessage(I id, M message) {
+    if (message == null) {
+      throw new IllegalArgumentException(
+          "sendMessage: Cannot send null message to " + id);
+    }
+    if (graphState.getWorkerClientRequestProcessor().
+          sendMessageRequest(id, message)) {
+      graphState.getGraphMapper().notifySentMessages();
+    }
+  }
+
+  /**
+   * Lookup WorkerInfo for myself.
+   *
+   * @return WorkerInfo about worker holding this Vertex.
+   */
+  public WorkerInfo getMyWorkerInfo() {
+    return getVertexWorkerInfo(id);
+  }
+
+  /**
+   * Lookup WorkerInfo for a Vertex.
+   *
+   * @param vertexId VertexId to lookup
+   * @return WorkerInfo about worker holding this Vertex.
+   */
+  public WorkerInfo getVertexWorkerInfo(I vertexId) {
+    return getVertexPartitionOwner(vertexId).getWorkerInfo();
+  }
+
+  /**
+   * Lookup PartitionOwner for a Vertex
+   *
+   * @param vertexId id of Vertex to look up.
+   * @return PartitionOwner holding Vertex
+   */
+  private PartitionOwner getVertexPartitionOwner(I vertexId) {
+    return getGraphState().getWorkerClientRequestProcessor().
+        getVertexPartitionOwner(vertexId);
+  }
+
+  /**
+   * Send a message to all edges.
+   *
+   * @param message Message sent to all edges.
+   */
+  public void sendMessageToAllEdges(M message) {
+    for (Edge<I, E> edge : getEdges()) {
+      sendMessage(edge.getTargetVertexId(), message);
+    }
+  }
+
+  /**
+   * After this is called, the compute() code will no longer be called for
+   * this vertex unless a message is sent to it.  Then the compute() code
+   * will be called once again until this function is called.  The
+   * application finishes only when all vertices vote to halt.
+   */
+  public void voteToHalt() {
+    halt = true;
+  }
+
+  /**
+   * Re-activate vertex if halted.
+   */
+  public void wakeUp() {
+    halt = false;
+  }
+
+  /**
+   * Is this vertex done?
+   *
+   * @return True if halted, false otherwise.
+   */
+  public boolean isHalted() {
+    return halt;
+  }
+
+  /**
+   * Get the graph state for all workers.
+   *
+   * @return Graph state for all workers
+   */
+  public GraphState<I, V, E, M> getGraphState() {
+    return graphState;
+  }
+
+  /**
+   * Set the graph state for all workers
+   *
+   * @param graphState Graph state for all workers
+   */
+  public void setGraphState(GraphState<I, V, E, M> graphState) {
+    this.graphState = graphState;
+  }
+
+  /**
+   * Get the mapper context
+   *
+   * @return Mapper context
+   */
+  public Mapper.Context getContext() {
+    return getGraphState().getContext();
+  }
+
+  /**
+   * Get the worker context
+   *
+   * @return WorkerContext context
+   */
+  public WorkerContext getWorkerContext() {
+    return getGraphState().getGraphMapper().getWorkerContext();
+  }
+
+  @Override
+  public <A extends Writable> void aggregate(String name, A value) {
+    getGraphState().getWorkerAggregatorUsage().
+        aggregate(name, value);
+  }
+
+  @Override
+  public <A extends Writable> A getAggregatedValue(String name) {
+    return getGraphState().getWorkerAggregatorUsage().
+        <A>getAggregatedValue(name);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    I vertexId = (I) getConf().createVertexId();
+    vertexId.readFields(in);
+    V vertexValue = (V) getConf().createVertexValue();
+    vertexValue.readFields(in);
+
+    int numEdges = in.readInt();
+    List<Edge<I, E>> edges = Lists.newArrayListWithCapacity(numEdges);
+    for (int i = 0; i < numEdges; ++i) {
+      I targetVertexId = (I) getConf().createVertexId();
+      targetVertexId.readFields(in);
+      E edgeValue = (E) getConf().createEdgeValue();
+      edgeValue.readFields(in);
+      edges.add(new Edge<I, E>(targetVertexId, edgeValue));
+    }
+
+    initialize(vertexId, vertexValue, edges);
+
+    readHaltBoolean(in);
+  }
+
+  /**
+   * Helper method for subclasses which implement their own readFields() to use.
+   *
+   * @param in DataInput to read from.
+   * @throws IOException If anything goes wrong during read.
+   */
+  protected void readHaltBoolean(DataInput in) throws IOException {
+    halt = in.readBoolean();
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    getId().write(out);
+    getValue().write(out);
+
+    out.writeInt(getNumEdges());
+    for (Edge<I, E> edge : getEdges()) {
+      edge.getTargetVertexId().write(out);
+      edge.getValue().write(out);
+    }
+
+    out.writeBoolean(halt);
+  }
+
+  @Override
+  public ImmutableClassesGiraphConfiguration<I, V, E, M> getConf() {
+    return conf;
+  }
+
+  @Override
+  public void setConf(ImmutableClassesGiraphConfiguration<I, V, E, M> conf) {
+    this.conf = conf;
+  }
+
+  @Override
+  public String toString() {
+    return "Vertex(id=" + getId() + ",value=" + getValue() +
+        ",#edges=" + getNumEdges() + ")";
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/vertex/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/vertex/package-info.java b/giraph-core/src/main/java/org/apache/giraph/vertex/package-info.java
new file mode 100644
index 0000000..5d983e2
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/vertex/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Package of Vertex implementations.
+ */
+package org.apache.giraph.vertex;