You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ap...@apache.org on 2013/03/07 06:37:42 UTC

[5/8] GIRAPH-528: Decouple vertex implementation from edge storage (apresta)

http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/job/GiraphConfigurationValidator.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/job/GiraphConfigurationValidator.java b/giraph-core/src/main/java/org/apache/giraph/job/GiraphConfigurationValidator.java
index 16b0d48..b08c74d 100644
--- a/giraph-core/src/main/java/org/apache/giraph/job/GiraphConfigurationValidator.java
+++ b/giraph-core/src/main/java/org/apache/giraph/job/GiraphConfigurationValidator.java
@@ -18,7 +18,6 @@
 
 package org.apache.giraph.job;
 
-import org.apache.giraph.bsp.BspUtils;
 import org.apache.giraph.combiner.Combiner;
 import org.apache.giraph.conf.GiraphConstants;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
@@ -28,8 +27,9 @@ import org.apache.giraph.io.EdgeInputFormat;
 import org.apache.giraph.io.VertexInputFormat;
 import org.apache.giraph.io.VertexOutputFormat;
 import org.apache.giraph.utils.ReflectionUtils;
-import org.apache.giraph.vertex.MutableVertex;
-import org.apache.giraph.vertex.Vertex;
+import org.apache.giraph.edge.ByteArrayEdges;
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.edge.VertexEdges;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
@@ -69,6 +69,8 @@ public class GiraphConfigurationValidator<I extends WritableComparable,
   private static final int MSG_COMBINER_PARAM_INDEX = 1;
   /** E param edge input format index in classList */
   private static final int EDGE_PARAM_EDGE_INPUT_FORMAT_INDEX = 1;
+  /** E param vertex edges index in classList */
+  private static final int EDGE_PARAM_VERTEX_EDGES_INDEX = 1;
 
   /** Vertex Index Type */
   private Type vertexIndexType;
@@ -103,13 +105,14 @@ public class GiraphConfigurationValidator<I extends WritableComparable,
   public void validateConfiguration() {
     checkConfiguration();
     Class<? extends Vertex<I, V, E, M>> vertexClass =
-      BspUtils.<I, V, E, M>getVertexClass(conf);
+      conf.getVertexClass();
     List<Class<?>> classList = ReflectionUtils.getTypeArguments(
       Vertex.class, vertexClass);
     vertexIndexType = classList.get(ID_PARAM_INDEX);
     vertexValueType = classList.get(VALUE_PARAM_INDEX);
     edgeValueType = classList.get(EDGE_PARAM_INDEX);
     messageValueType = classList.get(MSG_PARAM_INDEX);
+    verifyVertexEdgesGenericTypes();
     verifyVertexInputFormatGenericTypes();
     verifyEdgeInputFormatGenericTypes();
     verifyVertexOutputFormatGenericTypes();
@@ -146,11 +149,6 @@ public class GiraphConfigurationValidator<I extends WritableComparable,
           GiraphConstants.VERTEX_INPUT_FORMAT_CLASS + " and " +
           GiraphConstants.EDGE_INPUT_FORMAT_CLASS + " must be non-null");
     }
-    if (conf.getEdgeInputFormatClass() != null &&
-        !(MutableVertex.class.isAssignableFrom(conf.getVertexClass()))) {
-      throw new IllegalArgumentException("checkConfiguration: EdgeInputFormat" +
-          " only works with mutable vertices");
-    }
     if (conf.getVertexResolverClass() == null) {
       if (LOG.isInfoEnabled()) {
         LOG.info("checkConfiguration: No class found for " +
@@ -158,12 +156,47 @@ public class GiraphConfigurationValidator<I extends WritableComparable,
             DefaultVertexResolver.class.getCanonicalName());
       }
     }
+    if (conf.getVertexEdgesClass() == null) {
+      if (LOG.isInfoEnabled()) {
+        LOG.info("checkConfiguration: No class found for " +
+            GiraphConstants.VERTEX_EDGES_CLASS + ", defaulting to " +
+            ByteArrayEdges.class.getCanonicalName());
+      }
+    }
+  }
+
+  /** Verify matching generic types in VertexEdges. */
+  private void verifyVertexEdgesGenericTypes() {
+    Class<? extends VertexEdges<I, E>> vertexEdgesClass =
+        conf.getVertexEdgesClass();
+    List<Class<?>> classList = ReflectionUtils.getTypeArguments(
+        VertexEdges.class, vertexEdgesClass);
+    // VertexEdges implementations can be generic, in which case there are no
+    // types to check.
+    if (classList.isEmpty()) {
+      return;
+    }
+    if (classList.get(ID_PARAM_INDEX) != null &&
+        !vertexIndexType.equals(classList.get(ID_PARAM_INDEX))) {
+      throw new IllegalArgumentException(
+          "checkClassTypes: Vertex index types don't match, " +
+              "vertex - " + vertexIndexType +
+              ", vertex edges - " + classList.get(ID_PARAM_INDEX));
+    }
+    if (classList.get(EDGE_PARAM_VERTEX_EDGES_INDEX) != null &&
+        !edgeValueType.equals(classList.get(EDGE_PARAM_VERTEX_EDGES_INDEX))) {
+      throw new IllegalArgumentException(
+          "checkClassTypes: Edge value types don't match, " +
+              "vertex - " + edgeValueType +
+              ", vertex edges - " +
+              classList.get(EDGE_PARAM_VERTEX_EDGES_INDEX));
+    }
   }
 
   /** Verify matching generic types in VertexInputFormat. */
   private void verifyVertexInputFormatGenericTypes() {
     Class<? extends VertexInputFormat<I, V, E, M>> vertexInputFormatClass =
-      BspUtils.<I, V, E, M>getVertexInputFormatClass(conf);
+      conf.getVertexInputFormatClass();
     if (vertexInputFormatClass != null) {
       List<Class<?>> classList =
           ReflectionUtils.getTypeArguments(
@@ -198,7 +231,7 @@ public class GiraphConfigurationValidator<I extends WritableComparable,
   /** Verify matching generic types in EdgeInputFormat. */
   private void verifyEdgeInputFormatGenericTypes() {
     Class<? extends EdgeInputFormat<I, E>> edgeInputFormatClass =
-        BspUtils.<I, E>getEdgeInputFormatClass(conf);
+        conf.getEdgeInputFormatClass();
     if (edgeInputFormatClass != null) {
       List<Class<?>> classList =
           ReflectionUtils.getTypeArguments(
@@ -227,7 +260,7 @@ public class GiraphConfigurationValidator<I extends WritableComparable,
   /** If there is a combiner type, verify its generic params match the job. */
   private void verifyVertexCombinerGenericTypes() {
     Class<? extends Combiner<I, M>> vertexCombinerClass =
-      BspUtils.<I, M>getCombinerClass(conf);
+      conf.getCombinerClass();
     if (vertexCombinerClass != null) {
       List<Class<?>> classList =
         ReflectionUtils.getTypeArguments(
@@ -250,8 +283,7 @@ public class GiraphConfigurationValidator<I extends WritableComparable,
   /** Verify that the output format's generic params match the job. */
   private void verifyVertexOutputFormatGenericTypes() {
     Class<? extends VertexOutputFormat<I, V, E>>
-      vertexOutputFormatClass =
-      BspUtils.<I, V, E>getVertexOutputFormatClass(conf);
+      vertexOutputFormatClass = conf.getVertexOutputFormatClass();
     if (vertexOutputFormatClass != null) {
       List<Class<?>> classList =
         ReflectionUtils.getTypeArguments(
@@ -287,7 +319,7 @@ public class GiraphConfigurationValidator<I extends WritableComparable,
    * validate the generic parameter types. */
   private void verifyVertexResolverGenericTypes() {
     Class<? extends VertexResolver<I, V, E, M>>
-      vrClass = BspUtils.<I, V, E, M>getVertexResolverClass(conf);
+      vrClass = conf.getVertexResolverClass();
     if (!DefaultVertexResolver.class.isAssignableFrom(vrClass)) {
       return;
     }

http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/partition/ByteArrayPartition.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/ByteArrayPartition.java b/giraph-core/src/main/java/org/apache/giraph/partition/ByteArrayPartition.java
index 2260837..12aa6fb 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/ByteArrayPartition.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/ByteArrayPartition.java
@@ -21,7 +21,7 @@ import com.google.common.collect.MapMaker;
 import com.google.common.primitives.Ints;
 import org.apache.giraph.utils.UnsafeByteArrayInputStream;
 import org.apache.giraph.utils.WritableUtils;
-import org.apache.giraph.vertex.Vertex;
+import org.apache.giraph.graph.Vertex;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.util.Progressable;

http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/partition/DiskBackedPartitionStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/DiskBackedPartitionStore.java b/giraph-core/src/main/java/org/apache/giraph/partition/DiskBackedPartitionStore.java
index 585ab85..6bc9591 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/DiskBackedPartitionStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/DiskBackedPartitionStore.java
@@ -25,7 +25,7 @@ import com.google.common.hash.HashFunction;
 import com.google.common.hash.Hashing;
 import org.apache.giraph.conf.GiraphConstants;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.vertex.Vertex;
+import org.apache.giraph.graph.Vertex;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.Mapper;

http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/partition/Partition.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/Partition.java b/giraph-core/src/main/java/org/apache/giraph/partition/Partition.java
index 657c054..1ca0b61 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/Partition.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/Partition.java
@@ -19,7 +19,7 @@
 package org.apache.giraph.partition;
 
 import org.apache.giraph.conf.ImmutableClassesGiraphConfigurable;
-import org.apache.giraph.vertex.Vertex;
+import org.apache.giraph.graph.Vertex;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartition.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartition.java b/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartition.java
index cbf6bc3..ae8556f 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartition.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartition.java
@@ -19,7 +19,7 @@
 package org.apache.giraph.partition;
 
 import org.apache.giraph.conf.GiraphConstants;
-import org.apache.giraph.vertex.Vertex;
+import org.apache.giraph.graph.Vertex;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.util.Progressable;

http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayEdges.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayEdges.java b/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayEdges.java
deleted file mode 100644
index 1d8fc26..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayEdges.java
+++ /dev/null
@@ -1,290 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.utils;
-
-import com.google.common.base.Function;
-import com.google.common.collect.Iterables;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.graph.Edge;
-import org.apache.giraph.graph.MutableEdge;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-
-/**
- * A list of edges backed by a byte-array.
- * The same Edge object is reused when iterating over all edges,
- * unless an alternative iterable is requested.
- * It automatically optimizes for edges with no value,
- * and also supports shallow copy from another instance.
- *
- * @param <I> Vertex id
- * @param <E> Edge value
- */
-public class ByteArrayEdges<I extends WritableComparable, E extends Writable>
-    implements Iterable<Edge<I, E>>, Writable {
-  /** Representative edge object. */
-  private MutableEdge<I, E> representativeEdge;
-  /** Serialized edges. */
-  private byte[] serializedEdges;
-  /** Number of bytes used in serializedEdges. */
-  private int serializedEdgesBytesUsed;
-  /** Number of edges. */
-  private int edgeCount;
-  /** Configuration. */
-  private ImmutableClassesGiraphConfiguration<I, ?, E, ?> configuration;
-
-  /**
-   * Constructor.
-   * Depending on the configuration, instantiates a representative edge with
-   * or without an edge value.
-   *
-   * @param conf Configuration
-   */
-  public ByteArrayEdges(ImmutableClassesGiraphConfiguration<I, ?, E, ?> conf) {
-    configuration = conf;
-    representativeEdge = configuration.createMutableEdge();
-    ExtendedDataOutput extendedOutputStream =
-        configuration.createExtendedDataOutput();
-    serializedEdges = extendedOutputStream.getByteArray();
-    serializedEdgesBytesUsed = extendedOutputStream.getPos();
-  }
-
-  /**
-   * Constructor.
-   * Takes another instance of {@link ByteArrayEdges} and makes a shallow
-   * copy of it.
-   *
-   * @param edges {@link ByteArrayEdges} to copy
-   */
-  public ByteArrayEdges(ByteArrayEdges<I, E> edges) {
-    representativeEdge = edges.representativeEdge;
-    serializedEdges = edges.serializedEdges;
-    serializedEdgesBytesUsed = edges.serializedEdgesBytesUsed;
-    edgeCount = edges.edgeCount;
-    configuration = edges.configuration;
-  }
-
-  /**
-   * Append an edge to the serialized representation.
-   *
-   * @param edge Edge to append
-   */
-  public final void appendEdge(Edge<I, E> edge) {
-    ExtendedDataOutput extendedDataOutput =
-        configuration.createExtendedDataOutput(
-            serializedEdges, serializedEdgesBytesUsed);
-    try {
-      WritableUtils.writeEdge(extendedDataOutput, edge);
-    } catch (IOException e) {
-      throw new IllegalStateException("append: Failed to write to the new " +
-          "byte array");
-    }
-    serializedEdges = extendedDataOutput.getByteArray();
-    serializedEdgesBytesUsed = extendedDataOutput.getPos();
-    ++edgeCount;
-  }
-
-  /**
-   * Set all the edges.
-   * Note: when possible, use the constructor which takes another {@link
-   * ByteArrayEdges} instead of a generic {@link Iterable}.
-   *
-   * @param edges Iterable of edges
-   */
-  public final void setEdges(Iterable<Edge<I, E>> edges) {
-    ExtendedDataOutput extendedOutputStream =
-        configuration.createExtendedDataOutput();
-    if (edges != null) {
-      for (Edge<I, E> edge : edges) {
-        try {
-          WritableUtils.writeEdge(extendedOutputStream, edge);
-        } catch (IOException e) {
-          throw new IllegalStateException("setEdges: Failed to serialize " +
-              edge);
-        }
-        ++edgeCount;
-      }
-    }
-    serializedEdges = extendedOutputStream.getByteArray();
-    serializedEdgesBytesUsed = extendedOutputStream.getPos();
-  }
-
-  /**
-   * Remove the first edge pointing to a target vertex.
-   *
-   * @param targetVertexId Target vertex id
-   * @return True if one such edge was found and removed.
-   */
-  public final boolean removeFirstEdge(I targetVertexId) {
-    // Note that this is very expensive (deserializes all edges).
-    ByteArrayEdgeIterator iterator = new ByteArrayEdgeIterator();
-    int foundStartOffset = 0;
-    while (iterator.hasNext()) {
-      Edge<I, E> edge = iterator.next();
-      if (edge.getTargetVertexId().equals(targetVertexId)) {
-        System.arraycopy(serializedEdges, iterator.extendedDataInput.getPos(),
-            serializedEdges, foundStartOffset,
-            serializedEdgesBytesUsed - iterator.extendedDataInput.getPos());
-        serializedEdgesBytesUsed -=
-            iterator.extendedDataInput.getPos() - foundStartOffset;
-        --edgeCount;
-        return true;
-      }
-      foundStartOffset = iterator.extendedDataInput.getPos();
-    }
-
-    return false;
-  }
-
-  /**
-   * Remove all edges pointing to a target vertex.
-   *
-   * @param targetVertexId Target vertex id
-   * @return The number of removed edges
-   */
-  public final int removeAllEdges(I targetVertexId) {
-    // Note that this is very expensive (deserializes all edges).
-    ByteArrayEdgeIterator iterator = new ByteArrayEdgeIterator();
-    int removedCount = 0;
-    List<Integer> foundStartOffsets = new LinkedList<Integer>();
-    List<Integer> foundEndOffsets = new LinkedList<Integer>();
-    int lastStartOffset = 0;
-    while (iterator.hasNext()) {
-      Edge<I, E> edge = iterator.next();
-      if (edge.getTargetVertexId().equals(targetVertexId)) {
-        foundStartOffsets.add(lastStartOffset);
-        foundEndOffsets.add(iterator.extendedDataInput.getPos());
-        ++removedCount;
-      }
-      lastStartOffset = iterator.extendedDataInput.getPos();
-    }
-    foundStartOffsets.add(serializedEdgesBytesUsed);
-
-    Iterator<Integer> foundStartOffsetIter = foundStartOffsets.iterator();
-    Integer foundStartOffset = foundStartOffsetIter.next();
-    for (Integer foundEndOffset : foundEndOffsets) {
-      Integer nextFoundStartOffset = foundStartOffsetIter.next();
-      System.arraycopy(serializedEdges, foundEndOffset,
-          serializedEdges, foundStartOffset,
-          nextFoundStartOffset - foundEndOffset);
-      serializedEdgesBytesUsed -= foundEndOffset - foundStartOffset;
-      foundStartOffset = nextFoundStartOffset;
-    }
-
-    edgeCount -= removedCount;
-    return removedCount;
-  }
-
-  public final int getNumEdges() {
-    return edgeCount;
-  }
-
-  /**
-   * Iterator that uses the representative edge (only one iterator allowed
-   * at a time).
-   */
-  private final class ByteArrayEdgeIterator implements Iterator<Edge<I, E>> {
-    /** Input for processing the bytes */
-    private final ExtendedDataInput extendedDataInput;
-
-    /** Constructor. */
-    ByteArrayEdgeIterator() {
-      extendedDataInput = configuration.createExtendedDataInput(
-          serializedEdges, 0, serializedEdgesBytesUsed);
-    }
-
-    @Override
-    public boolean hasNext() {
-      return serializedEdges != null && extendedDataInput.available() > 0;
-    }
-
-    @Override
-    public Edge<I, E> next() {
-      try {
-        WritableUtils.readEdge(extendedDataInput, representativeEdge);
-      } catch (IOException e) {
-        throw new IllegalStateException("next: Failed on pos " +
-            extendedDataInput.getPos() + " edge " + representativeEdge);
-      }
-      return representativeEdge;
-    }
-
-    @Override
-    public void remove() {
-      throw new IllegalAccessError("remove: Not supported");
-    }
-  }
-
-  @Override
-  public final Iterator<Edge<I, E>> iterator() {
-    return new ByteArrayEdgeIterator();
-  }
-
-  /**
-   * Release and return the current representative edge.
-   *
-   * @return The released edge
-   */
-  private Edge<I, E> releaseCurrentEdge() {
-    Edge<I, E> releasedEdge = representativeEdge;
-    representativeEdge = configuration.createMutableEdge();
-    return releasedEdge;
-  }
-
-  /**
-   * Get an iterable wrapper that creates new Edge objects on the fly.
-   *
-   * @return Edge iteratable that creates new objects
-   */
-  public final Iterable<Edge<I, E>> copyEdgeIterable() {
-    return Iterables.transform(this,
-        new Function<Edge<I, E>, Edge<I, E>>() {
-          @Override
-          public Edge<I, E> apply(Edge<I, E> input) {
-            return releaseCurrentEdge();
-          }
-        });
-  }
-
-  @Override
-  public final void readFields(DataInput in) throws IOException {
-    serializedEdgesBytesUsed = in.readInt();
-    // Only create a new buffer if the old one isn't big enough
-    if (serializedEdges == null ||
-        serializedEdgesBytesUsed > serializedEdges.length) {
-      serializedEdges = new byte[serializedEdgesBytesUsed];
-    }
-    in.readFully(serializedEdges, 0, serializedEdgesBytesUsed);
-    edgeCount = in.readInt();
-  }
-
-  @Override
-  public final void write(DataOutput out) throws IOException {
-    out.writeInt(serializedEdgesBytesUsed);
-    out.write(serializedEdges, 0, serializedEdgesBytesUsed);
-    out.writeInt(edgeCount);
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdData.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdData.java b/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdData.java
index bd464d5..2cfa661 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdData.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdData.java
@@ -220,6 +220,17 @@ public abstract class ByteArrayVertexIdData<I extends WritableComparable, T>
     public T getCurrentData() {
       return data;
     }
+
+    /**
+     * Release the current data object.
+     *
+     * @return Released data object
+     */
+    public T releaseCurrentData() {
+      T releasedData = data;
+      data = null;
+      return releasedData;
+    }
   }
 
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdEdges.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdEdges.java b/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdEdges.java
index 1cfd21e..8a5fb01 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdEdges.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdEdges.java
@@ -19,7 +19,7 @@
 package org.apache.giraph.utils;
 
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.graph.Edge;
+import org.apache.giraph.edge.Edge;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
@@ -76,9 +76,23 @@ public class ByteArrayVertexIdEdges<I extends WritableComparable,
    * lifetime of the object is only until next() is called.
    */
   public class VertexIdEdgeIterator extends VertexIdDataIterator {
+    /**
+     * Get the current edge.
+     *
+     * @return Current edge
+     */
     public Edge<I, E> getCurrentEdge() {
       return getCurrentData();
     }
+
+    /**
+     * Release the current edge.
+     *
+     * @return Released edge
+     */
+    public Edge<I, E> releaseCurrentEdge() {
+      return releaseCurrentData();
+    }
   }
 }
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdMessages.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdMessages.java b/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdMessages.java
index fd06783..0280c58 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdMessages.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdMessages.java
@@ -104,6 +104,11 @@ public class ByteArrayVertexIdMessages<I extends WritableComparable,
    * lifetime of the object is only until next() is called.
    */
   public class VertexIdMessageIterator extends VertexIdDataIterator {
+    /**
+     * Get the current message.
+     *
+     * @return Current message
+     */
     public M getCurrentMessage() {
       return getCurrentData();
     }

http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/utils/ComparisonUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ComparisonUtils.java b/giraph-core/src/main/java/org/apache/giraph/utils/ComparisonUtils.java
deleted file mode 100644
index bb940ea..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/utils/ComparisonUtils.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.utils;
-
-import java.util.Iterator;
-
-/** Simple helper class for comparisons and equality checking */
-public class ComparisonUtils {
-
-  /** Do not construct this object */
-  private ComparisonUtils() { }
-
-  /**
-   * Compare elements, sort order and length
-   *
-   * @param <T> Type of iterable to compare.
-   * @param first First iterable to compare.
-   * @param second Second iterable to compare.
-   * @return True if equal, false otherwise.
-   */
-  public static <T> boolean equal(Iterable<T> first, Iterable<T> second) {
-    return equal(first.iterator(), second.iterator());
-  }
-
-  /**
-   * Compare elements, sort order and length
-   *
-   * @param <T> Type of iterable to compare.
-   * @param first First iterable to compare.
-   * @param second Second iterable to compare.
-   * @return True if equal, false otherwise.
-   */
-  public static <T> boolean equal(Iterator<T> first, Iterator<T> second) {
-    while (first.hasNext() && second.hasNext()) {
-      T message = first.next();
-      T otherMessage = second.next();
-      /* element-wise equality */
-      if (!(message == null ? otherMessage == null :
-        message.equals(otherMessage))) {
-        return false;
-      }
-    }
-    /* length must also be equal */
-    return !(first.hasNext() || second.hasNext());
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/utils/ConfigurationUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ConfigurationUtils.java b/giraph-core/src/main/java/org/apache/giraph/utils/ConfigurationUtils.java
index bd2a9c4..41238d0 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/ConfigurationUtils.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ConfigurationUtils.java
@@ -20,24 +20,25 @@ package org.apache.giraph.utils;
 import com.google.common.base.Splitter;
 import com.google.common.collect.Iterables;
 import org.apache.commons.cli.BasicParser;
-import org.apache.commons.cli.HelpFormatter;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.HelpFormatter;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.ParseException;
+import org.apache.giraph.Algorithm;
 import org.apache.giraph.aggregators.AggregatorWriter;
 import org.apache.giraph.combiner.Combiner;
 import org.apache.giraph.conf.GiraphConfiguration;
-import org.apache.giraph.Algorithm;
 import org.apache.giraph.conf.GiraphConstants;
-import org.apache.giraph.io.formats.GiraphFileInputFormat;
-import org.apache.giraph.job.GiraphConfigurationValidator;
 import org.apache.giraph.io.EdgeInputFormat;
 import org.apache.giraph.io.VertexInputFormat;
 import org.apache.giraph.io.VertexOutputFormat;
+import org.apache.giraph.io.formats.GiraphFileInputFormat;
+import org.apache.giraph.job.GiraphConfigurationValidator;
 import org.apache.giraph.master.MasterCompute;
 import org.apache.giraph.partition.Partition;
-import org.apache.giraph.vertex.Vertex;
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.edge.VertexEdges;
 import org.apache.giraph.worker.WorkerContext;
 import org.apache.hadoop.fs.Path;
 import org.apache.log4j.Level;
@@ -73,6 +74,7 @@ public final class ConfigurationUtils {
     OPTIONS.addOption("eip", "edgeInputPath", true, "Edge input path");
     OPTIONS.addOption("op", "outputPath", true, "Vertex output path");
     OPTIONS.addOption("c", "combiner", true, "Combiner class");
+    OPTIONS.addOption("ve", "vertexEdges", true, "Vertex edges class");
     OPTIONS.addOption("wc", "workerContext", true, "WorkerContext class");
     OPTIONS.addOption("aw", "aggregatorWriter", true, "AggregatorWriter class");
     OPTIONS.addOption("mc", "masterCompute", true, "MasterCompute class");
@@ -193,6 +195,11 @@ public final class ConfigurationUtils {
           (Class<? extends Combiner>)
               Class.forName(cmd.getOptionValue("c")));
     }
+    if (cmd.hasOption("ve")) {
+      giraphConfiguration.setVertexEdgesClass(
+          (Class<? extends VertexEdges>)
+              Class.forName(cmd.getOptionValue("ve")));
+    }
     if (cmd.hasOption("wc")) {
       giraphConfiguration.setWorkerContextClass(
           (Class<? extends WorkerContext>)

http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/utils/EdgeComparator.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/EdgeComparator.java b/giraph-core/src/main/java/org/apache/giraph/utils/EdgeComparator.java
new file mode 100644
index 0000000..df40f01
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/EdgeComparator.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils;
+
+import com.google.common.collect.ComparisonChain;
+import org.apache.giraph.edge.Edge;
+import org.apache.hadoop.io.WritableComparable;
+
+import java.util.Comparator;
+
+/**
+ * Comparator for edges.
+ *
+ * @param <I> Vertex id
+ * @param <E> Edge value (needs to be WritableComparable)
+ */
+public class EdgeComparator<I extends WritableComparable,
+    E extends WritableComparable> implements Comparator<Edge<I, E>> {
+  @Override
+  public int compare(Edge<I, E> e1, Edge<I, E> e2) {
+    return compareEdges(e1, e2);
+  }
+
+  /**
+   * Compares two edges.
+   *
+   * @param e1 First edge
+   * @param e2 Second edge
+   * @param <I> Vertex id
+   * @param <E> Edge value (needs to be WritableComparable)
+   * @return Negative, zero or positive as e1 is less, equal or greater than e2
+   */
+  public static <I extends WritableComparable, E extends WritableComparable>
+  int compareEdges(Edge<I, E> e1, Edge<I, E> e2) {
+    return ComparisonChain.start()
+        .compare(e1.getTargetVertexId(), e2.getTargetVertexId())
+        .compare(e1.getValue(), e2.getValue())
+        .result();
+  }
+
+  /**
+   * Indicates whether two edges are equal.
+   *
+   * @param e1 First edge
+   * @param e2 Second edge
+   * @param <I> Vertex id
+   * @param <E> Edge value (needs to be WritableComparable)
+   * @return Whether the two edges are equal
+   */
+  public static <I extends WritableComparable, E extends WritableComparable>
+  boolean equal(Edge<I, E> e1, Edge<I, E> e2) {
+    return compareEdges(e1, e2) == 0;
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/utils/EdgeIterables.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/EdgeIterables.java b/giraph-core/src/main/java/org/apache/giraph/utils/EdgeIterables.java
index ab288fb..e945d7b 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/EdgeIterables.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/EdgeIterables.java
@@ -18,57 +18,91 @@
 
 package org.apache.giraph.utils;
 
-import org.apache.giraph.graph.Edge;
-import org.apache.giraph.graph.EdgeFactory;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Writable;
+import com.google.common.collect.Iterables;
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.edge.EdgeFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableUtils;
 
-import com.google.common.base.Function;
-import com.google.common.collect.Iterables;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
 
 /**
- * Utilities for converting between edge iterables and neighbor iterables.
+ * Utility methods for iterables of edges.
  */
 public class EdgeIterables {
   /** Utility classes shouldn't be instantiated. */
   private EdgeIterables() { }
 
   /**
-   * Convert an edge iterable into a neighbor iterable.
+   * Compare two edge iterables element-wise. The edge value type needs
+   * to be Comparable.
+   *
+   * @param e1 First edge iterable
+   * @param e2 Second edge iterable
+   * @param <I> Vertex id
+   * @param <E> Edge value
+   * @return Whether the two iterables are equal element-wise
+   */
+  public static <I extends WritableComparable, E extends WritableComparable>
+  boolean equals(Iterable<Edge<I, E>> e1, Iterable<Edge<I, E>> e2) {
+    Iterator<Edge<I, E>> i1 = e1.iterator();
+    Iterator<Edge<I, E>> i2 = e2.iterator();
+    while (i1.hasNext()) {
+      // A shorter second iterable or a mismatching pair means "not equal".
+      if (!i2.hasNext() || !EdgeComparator.equal(i1.next(), i2.next())) {
+        return false;
+      }
+    }
+    // Equal only if the second iterable is exhausted too; otherwise e2
+    // has extra trailing edges that e1 lacks.
+    return !i2.hasNext();
+  }
+
+  /**
+   * Make a deep copy of an edge iterable and return it as an {@link
+   * ArrayList}.
+   * Note: this method is slow since it has to serialize and deserialize all
+   * the ids and values. It should only be used in unit tests.
    *
-   * @param edges Edge iterable.
-   * @param <I> Vertex id type.
-   * @param <E> Edge value type.
-   * @return Neighbor iterable.
+   * @param edges Iterable of edges
+   * @param <I> Vertex id
+   * @param <E> Edge value
+   * @return A new list with copies of all the edges
    */
-  public static
-  <I extends WritableComparable, E extends Writable>
-  Iterable<I> getNeighbors(Iterable<Edge<I, E>> edges) {
-    return Iterables.transform(edges,
-        new Function<Edge<I, E>, I>() {
-          @Override
-          public I apply(Edge<I, E> edge) {
-            return edge == null ? null : edge.getTargetVertexId();
-          }
-        });
+  public static <I extends WritableComparable, E extends WritableComparable>
+  ArrayList<Edge<I, E>> copy(Iterable<Edge<I, E>> edges) {
+    Configuration conf = new Configuration();
+    ArrayList<Edge<I, E>> edgeList =
+        new ArrayList<Edge<I, E>>(Iterables.size(edges));
+    for (Edge<I, E> edge : edges) {
+      edgeList.add(EdgeFactory.create(
+          WritableUtils.clone(edge.getTargetVertexId(), conf),
+          WritableUtils.clone(edge.getValue(), conf)));
+    }
+    return edgeList;
   }
 
   /**
-   * Convert a neighbor iterable into an edge iterable.
+   * Compare two edge iterables up to reordering. The edge value type needs
+   * to be Comparable.
    *
-   * @param neighbors Neighbor iterable.
-   * @param <I> Vertex id type.
-   * @return Edge iterable.
+   * @param e1 First edge iterable
+   * @param e2 Second edge iterable
+   * @param <I> Vertex id
+   * @param <E> Edge value
+   * @return Whether the two iterables are equal up to reordering
    */
-  public static <I extends WritableComparable>
-  Iterable<Edge<I, NullWritable>> getEdges(Iterable<I> neighbors) {
-    return Iterables.transform(neighbors,
-        new Function<I, Edge<I, NullWritable>>() {
-          @Override
-          public Edge<I, NullWritable> apply(I neighbor) {
-            return EdgeFactory.create(neighbor);
-          }
-        });
+  public static <I extends WritableComparable, E extends WritableComparable>
+  boolean sameEdges(Iterable<Edge<I, E>> e1, Iterable<Edge<I, E>> e2) {
+    ArrayList<Edge<I, E>> edgeList1 = copy(e1);
+    ArrayList<Edge<I, E>> edgeList2 = copy(e2);
+    Comparator<Edge<I, E>> edgeComparator = new EdgeComparator<I, E>();
+    Collections.sort(edgeList1, edgeComparator);
+    Collections.sort(edgeList2, edgeComparator);
+    return equals(edgeList1, edgeList2);
   }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/utils/EmptyIterable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/EmptyIterable.java b/giraph-core/src/main/java/org/apache/giraph/utils/EmptyIterable.java
index 382c39c..8be82b5 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/EmptyIterable.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/EmptyIterable.java
@@ -18,47 +18,32 @@
 
 package org.apache.giraph.utils;
 
+import com.google.common.collect.Iterators;
+
 import java.util.Iterator;
-import java.util.NoSuchElementException;
 
 /**
- * Helper empty iterable when there are no messages.
+ * Singleton class for empty iterables.
  *
- * @param <M> Message data
+ * @param <T> Element type
  */
-public class EmptyIterable<M> implements Iterable<M>, Iterator<M> {
+public class EmptyIterable<T> implements Iterable<T> {
   /** Singleton empty iterable */
-  private static final EmptyIterable<Object> EMPTY_ITERABLE =
-      new EmptyIterable<Object>();
+  private static final EmptyIterable EMPTY_ITERABLE = new EmptyIterable();
 
   /**
    * Get the singleton empty iterable
    *
-   * @param <T> Type of the empty iterable
-   * @return Empty singleton iterable
+   * @param <T> Element type
+   * @return Singleton empty iterable
    */
-  public static <T> Iterable<T> emptyIterable() {
+  public static <T> Iterable<T> get() {
     return (Iterable<T>) EMPTY_ITERABLE;
   }
 
   @Override
-  public Iterator<M> iterator() {
-    return this;
-  }
-
-  @Override
-  public boolean hasNext() {
-    return false;
-  }
-
-  @Override
-  public M next() {
-    throw new NoSuchElementException();
-  }
-
-  @Override
-  public void remove() {
-    throw new UnsupportedOperationException();
+  public Iterator<T> iterator() {
+    return Iterators.emptyIterator();
   }
 }
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/utils/ExtendedByteArrayDataOutput.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ExtendedByteArrayDataOutput.java b/giraph-core/src/main/java/org/apache/giraph/utils/ExtendedByteArrayDataOutput.java
index 247130b..0ff366d 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/ExtendedByteArrayDataOutput.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ExtendedByteArrayDataOutput.java
@@ -56,9 +56,8 @@ public class ExtendedByteArrayDataOutput extends ByteArrayOutputStream
    * @param pos Position in the buffer to start writing from
    */
   public ExtendedByteArrayDataOutput(byte[] buf, int pos) {
-    this.buf = buf;
+    this(buf);
     this.count = pos;
-    dataOutput = new DataOutputStream(this);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java b/giraph-core/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java
index 889798f..7848d1c 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java
@@ -51,6 +51,7 @@ import java.util.concurrent.Executors;
  *
  * Heavily inspired from Apache Mahout's MahoutTestCase
  */
+@SuppressWarnings("unchecked")
 public class InternalVertexRunner {
   /** ZooKeeper port to use for tests */
   public static final int LOCAL_ZOOKEEPER_PORT = 22182;
@@ -124,6 +125,7 @@ public class InternalVertexRunner {
       GiraphJob job = new GiraphJob(classes.getVertexClass().getName());
       GiraphConfiguration conf = job.getConfiguration();
       conf.setVertexClass(classes.getVertexClass());
+      conf.setVertexEdgesClass(classes.getVertexEdgesClass());
       if (classes.hasVertexInputFormat()) {
         conf.setVertexInputFormatClass(classes.getVertexInputFormatClass());
       }

http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/utils/ReflectionUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ReflectionUtils.java b/giraph-core/src/main/java/org/apache/giraph/utils/ReflectionUtils.java
index ae2c556..d70eecb 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/ReflectionUtils.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ReflectionUtils.java
@@ -84,9 +84,16 @@ public class ReflectionUtils {
     // start walking up the inheritance hierarchy until we hit baseClass
     while (! getClass(type).equals(baseClass)) {
       if (type instanceof Class) {
-        // there is no useful information for us in raw types,
-        // so just keep going.
-        type = ((Class<?>) type).getGenericSuperclass();
+        Type newType = ((Class<?>) type).getGenericSuperclass();
+        if (newType == null) {
+          // we have reached an interface, so we stop here
+          break;
+        } else {
+          // there is no useful information for us in raw types,
+          // so just keep going.
+          type = newType;
+        }
+
       } else {
         ParameterizedType parameterizedType = (ParameterizedType) type;
         Class<?> rawType = (Class<?>) parameterizedType.getRawType();
@@ -161,7 +168,7 @@ public class ReflectionUtils {
   }
 
   /**
-   * Instantiate classes that are ImmmutableClasssesGiraphConfigurable
+   * Instantiate classes that are ImmutableClassesGiraphConfigurable
    *
    * @param theClass Class to instantiate
    * @param configuration Giraph configuration, may be null

http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/utils/UnmodifiableDoubleArrayIterator.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/UnmodifiableDoubleArrayIterator.java b/giraph-core/src/main/java/org/apache/giraph/utils/UnmodifiableDoubleArrayIterator.java
deleted file mode 100644
index cdf662e..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/utils/UnmodifiableDoubleArrayIterator.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.utils;
-
-import org.apache.hadoop.io.DoubleWritable;
-
-import com.google.common.collect.UnmodifiableIterator;
-
-/**
- * {@link UnmodifiableIterator} over a primitive double array
- */
-public class UnmodifiableDoubleArrayIterator extends
-    UnmodifiableIterator<DoubleWritable> {
-  /** Array to iterate over */
-  private final double[] doubleArray;
-  /** Offset to array */
-  private int offset;
-
-  /**
-   * Constructor with array to iterate over.
-   * @param doubleArray Array to iterate over.
-   */
-  public UnmodifiableDoubleArrayIterator(double[] doubleArray) {
-    this.doubleArray = doubleArray;
-    offset = 0;
-  }
-
-  @Override
-  public boolean hasNext() {
-    return offset < doubleArray.length;
-  }
-
-  @Override
-  public DoubleWritable next() {
-    return new DoubleWritable(doubleArray[offset++]);
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/utils/UnmodifiableIntArrayIterator.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/UnmodifiableIntArrayIterator.java b/giraph-core/src/main/java/org/apache/giraph/utils/UnmodifiableIntArrayIterator.java
deleted file mode 100644
index c9ba0ef..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/utils/UnmodifiableIntArrayIterator.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.utils;
-
-import com.google.common.collect.UnmodifiableIterator;
-import org.apache.hadoop.io.IntWritable;
-
-/**
- * {@link UnmodifiableIterator} over a primitive int array
- */
-public class UnmodifiableIntArrayIterator extends
-    UnmodifiableIterator<IntWritable> {
-  /** Array to iterate over */
-  private final int[] intArray;
-  /** Offset to array */
-  private int offset;
-
-  /**
-   * Constructor with array to iterate over.
-   *
-   * @param intArray Array to iterate over.
-   */
-  public UnmodifiableIntArrayIterator(int[] intArray) {
-    this.intArray = intArray;
-    offset = 0;
-  }
-
-  @Override
-  public boolean hasNext() {
-    return offset < intArray.length;
-  }
-
-  @Override
-  public IntWritable next() {
-    return new IntWritable(intArray[offset++]);
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/utils/UnmodifiableLongArrayIterator.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/UnmodifiableLongArrayIterator.java b/giraph-core/src/main/java/org/apache/giraph/utils/UnmodifiableLongArrayIterator.java
deleted file mode 100644
index c580f9c..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/utils/UnmodifiableLongArrayIterator.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.utils;
-
-import org.apache.hadoop.io.LongWritable;
-
-import com.google.common.collect.UnmodifiableIterator;
-
-/**
- * {@link UnmodifiableIterator} over a primitive long array
- */
-public class UnmodifiableLongArrayIterator extends
-    UnmodifiableIterator<LongWritable> {
-  /** Array to iterate over */
-  private final long[] longArray;
-  /** Offset to array */
-  private int offset;
-
-  /**
-   * Constructor with array to iterate over.
-   * @param longArray Array to iterate over.
-   */
-  public UnmodifiableLongArrayIterator(long[] longArray) {
-    this.longArray = longArray;
-    offset = 0;
-  }
-
-  @Override
-  public boolean hasNext() {
-    return offset < longArray.length;
-  }
-
-  @Override
-  public LongWritable next() {
-    return new LongWritable(longArray[offset++]);
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/utils/UnmodifiableLongFloatEdgeArrayIterable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/UnmodifiableLongFloatEdgeArrayIterable.java b/giraph-core/src/main/java/org/apache/giraph/utils/UnmodifiableLongFloatEdgeArrayIterable.java
deleted file mode 100644
index b767058..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/utils/UnmodifiableLongFloatEdgeArrayIterable.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.utils;
-
-import org.apache.giraph.graph.Edge;
-import org.apache.giraph.graph.EdgeFactory;
-import org.apache.hadoop.io.FloatWritable;
-import org.apache.hadoop.io.LongWritable;
-
-import com.google.common.collect.UnmodifiableIterator;
-
-import java.util.Iterator;
-
-/**
- * {@link UnmodifiableIterator} over a pair of primitive long-float arrays.
- */
-public class UnmodifiableLongFloatEdgeArrayIterable extends
-    UnmodifiableIterator<Edge<LongWritable, FloatWritable>> implements
-    Iterable<Edge<LongWritable, FloatWritable>> {
-  /** Array of IDs to iterate over */
-  private final long[] longArray;
-  /** Arrays of weights iterate over */
-  private final float[] floatArray;
-  /** Offset to array */
-  private int offset;
-
-  /**
-   * Constructor with arrays to iterate over.
-   * @param longArray Array of IDs to iterate over.
-   * @param floatArray Array of weights to iterate over.
-   */
-  public UnmodifiableLongFloatEdgeArrayIterable(final long[] longArray,
-      final float[] floatArray) {
-    this.longArray = longArray;
-    this.floatArray = floatArray;
-    offset = 0;
-  }
-
-  @Override
-  public boolean hasNext() {
-    return offset < longArray.length;
-  }
-
-  @Override
-  public Edge<LongWritable, FloatWritable> next() {
-    Edge<LongWritable, FloatWritable> retval =
-        EdgeFactory.create(new LongWritable(longArray[offset]),
-            new FloatWritable(floatArray[offset]));
-    offset++;
-    return retval;
-  }
-
-  @Override
-  public Iterator<Edge<LongWritable, FloatWritable>> iterator() {
-    return this;
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/utils/UnmodifiableLongNullEdgeArrayIterable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/UnmodifiableLongNullEdgeArrayIterable.java b/giraph-core/src/main/java/org/apache/giraph/utils/UnmodifiableLongNullEdgeArrayIterable.java
deleted file mode 100644
index 18f280a..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/utils/UnmodifiableLongNullEdgeArrayIterable.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.giraph.utils;
-
-import org.apache.giraph.graph.Edge;
-import org.apache.giraph.graph.EdgeFactory;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.NullWritable;
-
-import com.google.common.collect.UnmodifiableIterator;
-
-import java.util.Iterator;
-
-/**
- * {@link UnmodifiableIterator} over a primitive long array with NullWritable
- * edges.
- */
-public class UnmodifiableLongNullEdgeArrayIterable extends
-    UnmodifiableIterator<Edge<LongWritable, NullWritable>> implements
-    Iterable<Edge<LongWritable, NullWritable>> {
-  /** Arrays to iterate over */
-  private final long[] longArray;
-  /** Offset to array */
-  private int offset;
-
-  /**
-   * Constructor with array to iterate over.
-   * @param longArray Array to iterate over.
-   */
-  public UnmodifiableLongNullEdgeArrayIterable(final long[] longArray) {
-    this.longArray = longArray;
-    offset = 0;
-  }
-
-  @Override
-  public Iterator<Edge<LongWritable, NullWritable>> iterator() {
-    return this;
-  }
-
-  @Override
-  public boolean hasNext() {
-    return offset < longArray.length;
-  }
-
-  @Override
-  public Edge<LongWritable, NullWritable> next() {
-    Edge<LongWritable, NullWritable> retval =
-        EdgeFactory.create(new LongWritable(longArray[offset]));
-    offset++;
-    return retval;
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/utils/UnsafeByteArrayOutputStream.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/UnsafeByteArrayOutputStream.java b/giraph-core/src/main/java/org/apache/giraph/utils/UnsafeByteArrayOutputStream.java
index da6d6cd..9ff1242 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/UnsafeByteArrayOutputStream.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/UnsafeByteArrayOutputStream.java
@@ -116,7 +116,7 @@ public class UnsafeByteArrayOutputStream extends OutputStream
    * @param pos Position to write at the buffer
    */
   public UnsafeByteArrayOutputStream(byte[] buf, int pos) {
-    this.buf = buf;
+    this(buf);
     this.pos = pos;
   }
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java b/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java
index 129923d..6e7b87a 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java
@@ -18,7 +18,7 @@
 
 package org.apache.giraph.utils;
 
-import org.apache.giraph.graph.Edge;
+import org.apache.giraph.edge.Edge;
 import org.apache.giraph.zk.ZooKeeperExt;
 import org.apache.giraph.zk.ZooKeeperExt.PathStat;
 import org.apache.hadoop.conf.Configuration;

http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/vertex/ByteArrayVertex.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/vertex/ByteArrayVertex.java b/giraph-core/src/main/java/org/apache/giraph/vertex/ByteArrayVertex.java
deleted file mode 100644
index 1e56b20..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/vertex/ByteArrayVertex.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.giraph.vertex;
-
-import org.apache.giraph.graph.Edge;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.log4j.Logger;
-
-/**
- * A vertex whose edges are backed by a byte-array.
- *
- * @param <I> Vertex index value
- * @param <V> Vertex value
- * @param <E> Edge value
- * @param <M> Message data
- */
-public abstract class ByteArrayVertex<
-    I extends WritableComparable,
-    V extends Writable, E extends Writable, M extends Writable>
-    extends ByteArrayVertexBase<I, V, E, M> {
-  /** Class logger */
-  private static final Logger LOG =
-      Logger.getLogger(ByteArrayVertex.class);
-
-  @Override
-  public final boolean addEdge(Edge<I, E> edge) {
-    // Note that this is very expensive (deserializes all edges
-    // in an addEdge() request).
-    // Hopefully the user set all the edges in setEdges().
-    for (Edge<I, E> currentEdge : getEdges()) {
-      if (currentEdge.getTargetVertexId().equals(edge.getTargetVertexId())) {
-        LOG.warn("addEdge: Vertex=" + getId() +
-            ": already added an edge value for target vertex id " +
-            edge.getTargetVertexId());
-        return false;
-      }
-    }
-    appendEdge(edge);
-    return true;
-  }
-
-  @Override
-  public final int removeEdges(I targetVertexId) {
-    return removeFirstEdge(targetVertexId) ? 1 : 0;
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/vertex/ByteArrayVertexBase.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/vertex/ByteArrayVertexBase.java b/giraph-core/src/main/java/org/apache/giraph/vertex/ByteArrayVertexBase.java
deleted file mode 100644
index 26c3f62..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/vertex/ByteArrayVertexBase.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.vertex;
-
-import org.apache.giraph.graph.Edge;
-import org.apache.giraph.utils.ByteArrayEdges;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-/**
- * Common base class for byte-array backed vertices.
- *
- * @param <I> Vertex id
- * @param <V> Vertex value
- * @param <E> Edge value
- * @param <M> Message data
- */
-public abstract class ByteArrayVertexBase<
-    I extends WritableComparable,
-    V extends Writable, E extends Writable, M extends Writable>
-    extends MutableVertex<I, V, E, M> {
-  /** Serialized edge list. */
-  private ByteArrayEdges<I, E> edges;
-
-  /**
-   * Append an edge to the serialized representation.
-   *
-   * @param edge Edge to append
-   */
-  protected void appendEdge(Edge<I, E> edge) {
-    edges.appendEdge(edge);
-  }
-
-  /**
-   * Remove the first edge pointing to a target vertex.
-   *
-   * @param targetVertexId Target vertex id
-   * @return True if one such edge was found and removed.
-   */
-  protected boolean removeFirstEdge(I targetVertexId) {
-    return edges.removeFirstEdge(targetVertexId);
-  }
-
-  /**
-   * Remove all edges pointing to a target vertex.
-   *
-   * @param targetVertexId Target vertex id
-   * @return The number of removed edges
-   */
-  protected int removeAllEdges(I targetVertexId) {
-    return edges.removeAllEdges(targetVertexId);
-  }
-
-  @Override
-  public final void setEdges(Iterable<Edge<I, E>> edges) {
-    // If the edge iterable is backed by a byte-array,
-    // we simply get a shallow copy of it.
-    if (edges instanceof ByteArrayEdges) {
-      this.edges = new ByteArrayEdges<I, E>((ByteArrayEdges<I, E>) edges);
-    } else {
-      this.edges = new ByteArrayEdges<I, E>(getConf());
-      this.edges.setEdges(edges);
-    }
-  }
-
-  @Override
-  public final Iterable<Edge<I, E>> getEdges() {
-    return edges;
-  }
-
-  @Override
-  public final int getNumEdges() {
-    return edges.getNumEdges();
-  }
-
-  @Override
-  public final void readFields(DataInput in) throws IOException {
-    I vertexId = getId();
-    if (vertexId == null) {
-      vertexId = getConf().createVertexId();
-    }
-    vertexId.readFields(in);
-
-    V vertexValue = getValue();
-    if (vertexValue == null) {
-      vertexValue = getConf().createVertexValue();
-    }
-    vertexValue.readFields(in);
-
-    initialize(vertexId, vertexValue);
-
-    edges.readFields(in);
-
-    readHaltBoolean(in);
-  }
-
-  @Override
-  public final void write(DataOutput out) throws IOException {
-    getId().write(out);
-    getValue().write(out);
-
-    edges.write(out);
-
-    out.writeBoolean(isHalted());
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/vertex/EdgeListVertex.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/vertex/EdgeListVertex.java b/giraph-core/src/main/java/org/apache/giraph/vertex/EdgeListVertex.java
deleted file mode 100644
index 882bbb8..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/vertex/EdgeListVertex.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.vertex;
-
-import org.apache.giraph.graph.Edge;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.log4j.Logger;
-
-import java.util.Iterator;
-
-/**
- * User applications can subclass {@link EdgeListVertex}, which stores
- * the outbound edges in an ArrayList (less memory as the cost of expensive
- * random-access lookup).  Good for static graphs.  Not nearly as memory
- * efficient as using ByteArrayVertex + ByteArrayPartition
- * (probably about 10x more), but not bad when keeping vertices as objects in
- * memory (SimplePartition).
- *
- * @param <I> Vertex index value
- * @param <V> Vertex value
- * @param <E> Edge value
- * @param <M> Message value
- */
-@SuppressWarnings("rawtypes")
-public abstract class EdgeListVertex<I extends WritableComparable,
-    V extends Writable, E extends Writable, M extends Writable>
-    extends EdgeListVertexBase<I, V, E, M> {
-  /** Class logger */
-  private static final Logger LOG = Logger.getLogger(EdgeListVertex.class);
-  @Override
-  public final boolean addEdge(Edge<I, E> edge) {
-    for (Edge<I, E> currentEdge : getEdges()) {
-      if (currentEdge.getTargetVertexId().equals(edge.getTargetVertexId())) {
-        LOG.warn("addEdge: Vertex=" + getId() +
-            ": already added an edge value for target vertex id " +
-            edge.getTargetVertexId());
-        return false;
-      }
-    }
-    appendEdge(edge);
-    return true;
-  }
-
-  @Override
-  public int removeEdges(I targetVertexId) {
-    for (Iterator<Edge<I, E>> edges = getEdges().iterator(); edges.hasNext();) {
-      Edge<I, E> edge = edges.next();
-      if (edge.getTargetVertexId().equals(targetVertexId)) {
-        edges.remove();
-        return 1;
-      }
-    }
-    return 0;
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/vertex/EdgeListVertexBase.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/vertex/EdgeListVertexBase.java b/giraph-core/src/main/java/org/apache/giraph/vertex/EdgeListVertexBase.java
deleted file mode 100644
index ec04569..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/vertex/EdgeListVertexBase.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.vertex;
-
-import org.apache.giraph.graph.Edge;
-import org.apache.giraph.utils.WritableUtils;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.log4j.Logger;
-
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.List;
-
-/**
- * Common base class for edge-list backed vertices.
- *
- * @param <I> Vertex id
- * @param <V> Vertex value
- * @param <E> Edge value
- * @param <M> Message data
- */
-@SuppressWarnings("rawtypes")
-public abstract class EdgeListVertexBase<I extends WritableComparable,
-    V extends Writable, E extends Writable, M extends Writable>
-    extends MutableVertex<I, V, E, M> {
-  /** Class logger */
-  private static final Logger LOG = Logger.getLogger(EdgeListVertexBase.class);
-  /** List of edges */
-  private List<Edge<I, E>> edgeList = Lists.newArrayList();
-
-  /**
-   * Append an edge to the list.
-   *
-   * @param edge Edge to append
-   */
-  protected void appendEdge(Edge<I, E> edge) {
-    edgeList.add(edge);
-  }
-
-  @Override
-  public void setEdges(Iterable<Edge<I, E>> edges) {
-    edgeList.clear();
-    Iterables.addAll(edgeList, edges);
-  }
-
-  @Override
-  public Iterable<Edge<I, E>> getEdges() {
-    return edgeList;
-  }
-
-  @Override
-  public int getNumEdges() {
-    return edgeList.size();
-  }
-
-  @Override
-  public final void readFields(DataInput in) throws IOException {
-    I vertexId = getConf().createVertexId();
-    vertexId.readFields(in);
-    V vertexValue = getConf().createVertexValue();
-    vertexValue.readFields(in);
-    initialize(vertexId, vertexValue);
-
-    int numEdges = in.readInt();
-    edgeList = Lists.newArrayListWithCapacity(numEdges);
-    for (int i = 0; i < numEdges; ++i) {
-      Edge<I, E> edge = getConf().createEdge();
-      WritableUtils.readEdge(in, edge);
-      edgeList.add(edge);
-    }
-
-    readHaltBoolean(in);
-  }
-
-  @Override
-  public final void write(DataOutput out) throws IOException {
-    getId().write(out);
-    getValue().write(out);
-
-    out.writeInt(edgeList.size());
-    for (Edge<I, E> edge : edgeList) {
-      edge.getTargetVertexId().write(out);
-      edge.getValue().write(out);
-    }
-
-    out.writeBoolean(isHalted());
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/vertex/HashMapVertex.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/vertex/HashMapVertex.java b/giraph-core/src/main/java/org/apache/giraph/vertex/HashMapVertex.java
deleted file mode 100644
index 2160c3b..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/vertex/HashMapVertex.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.vertex;
-
-import org.apache.giraph.graph.Edge;
-import org.apache.giraph.graph.EdgeFactory;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.log4j.Logger;
-
-import com.google.common.base.Function;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Maps;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * User applications can subclass {@link HashMapVertex}, which stores
- * the outbound edges in a HashMap, for efficient edge random-access.  Note
- * that {@link EdgeListVertex} is much more memory efficient for static graphs.
- * User applications which need to implement their own
- * in-memory data structures should subclass {@link MutableVertex}.
- *
- * Package access will prevent users from accessing internal methods.
- *
- * @param <I> Vertex index value
- * @param <V> Vertex value
- * @param <E> Edge value
- * @param <M> Message value
- */
-@SuppressWarnings("rawtypes")
-public abstract class HashMapVertex<I extends WritableComparable,
-    V extends Writable, E extends Writable, M extends Writable>
-    extends MutableVertex<I, V, E, M> {
-  /** Class logger */
-  private static final Logger LOG = Logger.getLogger(HashMapVertex.class);
-  /** Map of target vertices and their edge values */
-  protected Map<I, E> edgeMap = new HashMap<I, E>();
-
-  @Override
-  public void setEdges(Iterable<Edge<I, E>> edges) {
-    edgeMap.clear();
-    for (Edge<I, E> edge : edges) {
-      edgeMap.put(edge.getTargetVertexId(), edge.getValue());
-    }
-  }
-
-  @Override
-  public boolean addEdge(Edge<I, E> edge) {
-    if (edgeMap.put(edge.getTargetVertexId(), edge.getValue()) != null) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("addEdge: Vertex=" + getId() +
-            ": already added an edge value for target vertex id " +
-            edge.getTargetVertexId());
-      }
-      return false;
-    } else {
-      return true;
-    }
-  }
-
-  @Override
-  public boolean hasEdge(I targetVertexId) {
-    return edgeMap.containsKey(targetVertexId);
-  }
-
-  /**
-   * Get an iterator to the edges on this vertex.
-   *
-   * @return A <em>sorted</em> iterator, as defined by the sort-order
-   *         of the vertex ids
-   */
-  @Override
-  public Iterable<Edge<I, E>> getEdges() {
-    return Iterables.transform(edgeMap.entrySet(),
-        new Function<Map.Entry<I, E>, Edge<I, E>>() {
-
-          @Override
-          public Edge<I, E> apply(Map.Entry<I, E> edge) {
-            return EdgeFactory.create(edge.getKey(), edge.getValue());
-          }
-        });
-  }
-
-  @Override
-  public E getEdgeValue(I targetVertexId) {
-    return edgeMap.get(targetVertexId);
-  }
-
-  @Override
-  public int getNumEdges() {
-    return edgeMap.size();
-  }
-
-  @Override
-  public int removeEdges(I targetVertexId) {
-    return edgeMap.remove(targetVertexId) != null ? 1 : 0;
-  }
-
-  @Override
-  public final void sendMessageToAllEdges(M message) {
-    for (I targetVertexId : edgeMap.keySet()) {
-      sendMessage(targetVertexId, message);
-    }
-  }
-
-  @Override
-  public final void readFields(DataInput in) throws IOException {
-    I vertexId = getConf().createVertexId();
-    vertexId.readFields(in);
-    V vertexValue = getConf().createVertexValue();
-    vertexValue.readFields(in);
-    initialize(vertexId, vertexValue);
-
-    int numEdges = in.readInt();
-    edgeMap = Maps.newHashMapWithExpectedSize(numEdges);
-    for (int i = 0; i < numEdges; ++i) {
-      I targetVertexId = getConf().createVertexId();
-      targetVertexId.readFields(in);
-      E edgeValue = getConf().createEdgeValue();
-      edgeValue.readFields(in);
-      edgeMap.put(targetVertexId, edgeValue);
-    }
-
-    readHaltBoolean(in);
-  }
-
-  @Override
-  public final void write(DataOutput out) throws IOException {
-    getId().write(out);
-    getValue().write(out);
-
-    out.writeInt(edgeMap.size());
-    for (Map.Entry<I, E> edge : edgeMap.entrySet()) {
-      edge.getKey().write(out);
-      edge.getValue().write(out);
-    }
-
-    out.writeBoolean(isHalted());
-  }
-
-}
-

http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/vertex/IntIntNullIntVertex.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/vertex/IntIntNullIntVertex.java b/giraph-core/src/main/java/org/apache/giraph/vertex/IntIntNullIntVertex.java
deleted file mode 100644
index a2090e8..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/vertex/IntIntNullIntVertex.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.vertex;
-
-import com.google.common.collect.Iterables;
-import org.apache.giraph.utils.UnmodifiableIntArrayIterator;
-import org.apache.hadoop.io.IntWritable;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.Iterator;
-
-/**
- * Simple implementation of {@link Vertex} using an int as id, value and
- * message.  Edges are immutable and unweighted. This class aims to be as
- * memory efficient as possible.
- */
-public abstract class IntIntNullIntVertex extends
-    SimpleVertex<IntWritable, IntWritable, IntWritable> {
-  /** Int array of neighbor vertex ids */
-  private int[] neighbors;
-
-  @Override
-  public void setNeighbors(Iterable<IntWritable> neighbors) {
-    this.neighbors =
-        new int[(neighbors != null) ? Iterables.size(neighbors) : 0];
-    int n = 0;
-    if (neighbors != null) {
-      for (IntWritable neighbor : neighbors) {
-        this.neighbors[n++] = neighbor.get();
-      }
-    }
-  }
-
-  @Override
-  public Iterable<IntWritable> getNeighbors() {
-    return new Iterable<IntWritable>() {
-      @Override
-      public Iterator<IntWritable> iterator() {
-        return new UnmodifiableIntArrayIterator(neighbors);
-      }
-    };
-  }
-
-  @Override
-  public boolean hasEdge(IntWritable targetVertexId) {
-    for (int neighbor : neighbors) {
-      if (neighbor == targetVertexId.get()) {
-        return true;
-      }
-    }
-    return false;
-  }
-
-  @Override
-  public int getNumEdges() {
-    return neighbors.length;
-  }
-
-  @Override
-  public void write(final DataOutput out) throws IOException {
-    out.writeInt(getId().get());
-    out.writeInt(getValue().get());
-    out.writeInt(neighbors.length);
-    for (int n = 0; n < neighbors.length; n++) {
-      out.writeInt(neighbors[n]);
-    }
-    out.writeBoolean(isHalted());
-  }
-
-  @Override
-  public void readFields(DataInput in) throws IOException {
-    int id = in.readInt();
-    int value = in.readInt();
-    initialize(new IntWritable(id), new IntWritable(value));
-    int numEdges = in.readInt();
-    neighbors = new int[numEdges];
-    for (int n = 0; n < numEdges; n++) {
-      neighbors[n] = in.readInt();
-    }
-    readHaltBoolean(in);
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/vertex/IntNullNullNullVertex.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/vertex/IntNullNullNullVertex.java b/giraph-core/src/main/java/org/apache/giraph/vertex/IntNullNullNullVertex.java
deleted file mode 100644
index f36f6db..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/vertex/IntNullNullNullVertex.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.vertex;
-
-import org.apache.giraph.graph.Edge;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.NullWritable;
-
-import com.google.common.collect.ImmutableList;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-/**
- * A vertex with no value, edges, or messages. Just an ID, nothing more.
- */
-public abstract class IntNullNullNullVertex extends Vertex<IntWritable,
-    NullWritable, NullWritable, NullWritable> {
-  @Override
-  public void setEdges(Iterable<Edge<IntWritable, NullWritable>> edges) { }
-
-  @Override
-  public Iterable<Edge<IntWritable, NullWritable>> getEdges() {
-    return ImmutableList.of();
-  }
-
-  @Override
-  public void write(DataOutput out) throws IOException {
-    getId().write(out);
-    out.writeBoolean(isHalted());
-  }
-
-  @Override
-  public void readFields(DataInput in) throws IOException {
-    int id = in.readInt();
-    initialize(new IntWritable(id), NullWritable.get());
-    boolean halt = in.readBoolean();
-    if (halt) {
-      voteToHalt();
-    } else {
-      wakeUp();
-    }
-  }
-}