You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ac...@apache.org on 2012/02/16 23:12:36 UTC

svn commit: r1245205 [10/18] - in /incubator/giraph/trunk: ./ src/main/java/org/apache/giraph/ src/main/java/org/apache/giraph/benchmark/ src/main/java/org/apache/giraph/bsp/ src/main/java/org/apache/giraph/comm/ src/main/java/org/apache/giraph/example...

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspUtils.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspUtils.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspUtils.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspUtils.java Thu Feb 16 22:12:31 2012
@@ -31,424 +31,479 @@ import org.apache.hadoop.util.Reflection
  * instantiate them.
  */
 public class BspUtils {
-    /**
-     * Get the user's subclassed {@link GraphPartitionerFactory}.
-     *
-     * @param conf Configuration to check
-     * @return User's graph partitioner
-     */
-    @SuppressWarnings({ "rawtypes", "unchecked" })
-    public static <I extends WritableComparable, V extends Writable,
-                   E extends Writable, M extends Writable>
-            Class<? extends GraphPartitionerFactory<I, V, E, M>>
-            getGraphPartitionerClass(Configuration conf) {
-        return (Class<? extends GraphPartitionerFactory<I, V, E, M>>)
-                conf.getClass(GiraphJob.GRAPH_PARTITIONER_FACTORY_CLASS,
-                              HashPartitionerFactory.class,
-                              GraphPartitionerFactory.class);
-    }
-
-    /**
-     * Create a user graph partitioner class
-     *
-     * @param conf Configuration to check
-     * @return Instantiated user graph partitioner class
-     */
-    @SuppressWarnings("rawtypes")
-    public static <I extends WritableComparable, V extends Writable,
-            E extends Writable, M extends Writable>
-            GraphPartitionerFactory<I, V, E, M>
-            createGraphPartitioner(Configuration conf) {
-        Class<? extends GraphPartitionerFactory<I, V, E, M>>
-            graphPartitionerFactoryClass =
-            getGraphPartitionerClass(conf);
-        return ReflectionUtils.newInstance(graphPartitionerFactoryClass, conf);
-    }
-
-    /**
-     * Create a user graph partitioner partition stats class
-     *
-     * @param conf Configuration to check
-     * @return Instantiated user graph partition stats class
-     */
-    @SuppressWarnings("rawtypes")
-    public static <I extends WritableComparable, V extends Writable,
-            E extends Writable, M extends Writable>
-            PartitionStats createGraphPartitionStats(Configuration conf) {
-        GraphPartitionerFactory<I, V, E, M> graphPartitioner =
-            createGraphPartitioner(conf);
-        return graphPartitioner.createMasterGraphPartitioner().
-            createPartitionStats();
-    }
-
-    /**
-     * Get the user's subclassed {@link VertexInputFormat}.
-     *
-     * @param conf Configuration to check
-     * @return User's vertex input format class
-     */
-    @SuppressWarnings({ "rawtypes", "unchecked" })
-    public static <I extends WritableComparable,
-                   V extends Writable,
-                   E extends Writable,
-                   M extends Writable>
-            Class<? extends VertexInputFormat<I, V, E, M>>
-            getVertexInputFormatClass(Configuration conf) {
-        return (Class<? extends VertexInputFormat<I, V, E, M>>)
-                conf.getClass(GiraphJob.VERTEX_INPUT_FORMAT_CLASS,
-                              null,
-                              VertexInputFormat.class);
-    }
-
-    /**
-     * Create a user vertex input format class
-     *
-     * @param conf Configuration to check
-     * @return Instantiated user vertex input format class
-     */
-    @SuppressWarnings("rawtypes")
-    public static <I extends WritableComparable,
-                   V extends Writable,
-                   E extends Writable,
-                   M extends Writable> VertexInputFormat<I, V, E, M>
-            createVertexInputFormat(Configuration conf) {
-        Class<? extends VertexInputFormat<I, V, E, M>> vertexInputFormatClass =
-            getVertexInputFormatClass(conf);
-        VertexInputFormat<I, V, E, M> inputFormat =
-            ReflectionUtils.newInstance(vertexInputFormatClass, conf);
-        return inputFormat;
-    }
-
-    /**
-     * Get the user's subclassed {@link VertexOutputFormat}.
-     *
-     * @param conf Configuration to check
-     * @return User's vertex output format class
-     */
-    @SuppressWarnings({ "rawtypes", "unchecked" })
-    public static <I extends WritableComparable,
-                   V extends Writable,
-                   E extends Writable>
-            Class<? extends VertexOutputFormat<I, V, E>>
-            getVertexOutputFormatClass(Configuration conf) {
-        return (Class<? extends VertexOutputFormat<I, V, E>>)
-                conf.getClass(GiraphJob.VERTEX_OUTPUT_FORMAT_CLASS,
-                              null,
-                              VertexOutputFormat.class);
-    }
-
-    /**
-     * Create a user vertex output format class
-     *
-     * @param conf Configuration to check
-     * @return Instantiated user vertex output format class
-     */
-    @SuppressWarnings("rawtypes")
-    public static <I extends WritableComparable, V extends Writable,
-            E extends Writable> VertexOutputFormat<I, V, E>
-            createVertexOutputFormat(Configuration conf) {
-        Class<? extends VertexOutputFormat<I, V, E>> vertexOutputFormatClass =
-            getVertexOutputFormatClass(conf);
-        return ReflectionUtils.newInstance(vertexOutputFormatClass, conf);
-    }
-
-    /**
-     * Get the user's subclassed {@link AggregatorWriter}.
-     *
-     * @param conf Configuration to check
-     * @return User's aggregator writer class
-     */
-    public static Class<? extends AggregatorWriter>
-            getAggregatorWriterClass(Configuration conf) {
-        return conf.getClass(GiraphJob.AGGREGATOR_WRITER_CLASS,
-                             TextAggregatorWriter.class,
-                             AggregatorWriter.class);
-    }
-
-    /**
-     * Create a user aggregator output format class
-     *
-     * @param conf Configuration to check
-     * @return Instantiated user aggregator writer class
-     */
-    public static AggregatorWriter
-            createAggregatorWriter(Configuration conf) {
-        Class<? extends AggregatorWriter> aggregatorWriterClass =
-            getAggregatorWriterClass(conf);
-        return ReflectionUtils.newInstance(aggregatorWriterClass, conf);
-    }
-
-    /**
-     * Get the user's subclassed {@link VertexCombiner}.
-     *
-     * @param conf Configuration to check
-     * @return User's vertex combiner class
-     */
-    @SuppressWarnings({ "rawtypes", "unchecked" })
-    public static <I extends WritableComparable,
-                   M extends Writable>
-            Class<? extends VertexCombiner<I, M>>
-            getVertexCombinerClass(Configuration conf) {
-        return (Class<? extends VertexCombiner<I, M>>)
-                conf.getClass(GiraphJob.VERTEX_COMBINER_CLASS,
-                              null,
-                              VertexCombiner.class);
-    }
-
-    /**
-     * Create a user vertex combiner class
-     *
-     * @param conf Configuration to check
-     * @return Instantiated user vertex combiner class
-     */
-    @SuppressWarnings("rawtypes")
-    public static <I extends WritableComparable, M extends Writable>
-            VertexCombiner<I, M> createVertexCombiner(Configuration conf) {
-        Class<? extends VertexCombiner<I, M>> vertexCombinerClass =
-            getVertexCombinerClass(conf);
-        return ReflectionUtils.newInstance(vertexCombinerClass, conf);
-    }
-
-    /**
-     * Get the user's subclassed VertexResolver.
-     *
-     * @param conf Configuration to check
-     * @return User's vertex resolver class
-     */
-    @SuppressWarnings({ "unchecked", "rawtypes" })
-    public static <I extends WritableComparable,
-                   V extends Writable,
-                   E extends Writable,
-                   M extends Writable>
-            Class<? extends VertexResolver<I, V, E, M>>
-            getVertexResolverClass(Configuration conf) {
-        return (Class<? extends VertexResolver<I, V, E, M>>)
-                conf.getClass(GiraphJob.VERTEX_RESOLVER_CLASS,
-                              VertexResolver.class,
-                              VertexResolver.class);
-    }
-
-    /**
-     * Create a user vertex revolver
-     *
-     * @param conf Configuration to check
-     * @return Instantiated user vertex resolver
-     */
-    @SuppressWarnings("rawtypes")
-    public static <I extends WritableComparable, V extends Writable,
-            E extends Writable, M extends Writable> VertexResolver<I, V, E, M>
-            createVertexResolver(Configuration conf,
-                                 GraphState<I, V, E, M> graphState) {
-        Class<? extends VertexResolver<I, V, E, M>> vertexResolverClass =
-            getVertexResolverClass(conf);
-        VertexResolver<I, V, E, M> resolver =
-            ReflectionUtils.newInstance(vertexResolverClass, conf);
-        resolver.setGraphState(graphState);
-        return resolver;
-    }
-
-   /**
-     * Get the user's subclassed WorkerContext.
-     *
-     * @param conf Configuration to check
-     * @return User's worker context class
-     */
-	public static Class<? extends WorkerContext>
-            getWorkerContextClass(Configuration conf) {
-        return (Class<? extends WorkerContext>)
-                conf.getClass(GiraphJob.WORKER_CONTEXT_CLASS,
-                              DefaultWorkerContext.class,
-                              WorkerContext.class);
-    }
-
-   /**
-     * Create a user worker context
-     *
-     * @param conf Configuration to check
-     * @return Instantiated user worker context
-     */
-    @SuppressWarnings("rawtypes")
-	public static <I extends WritableComparable,
-    			   V extends Writable,
-    			   E extends Writable,
-    			   M extends Writable>
-    		WorkerContext createWorkerContext(Configuration conf,
-                GraphState<I, V, E, M> graphState) {
-        Class<? extends WorkerContext> workerContextClass =
-            getWorkerContextClass(conf);
-        WorkerContext workerContext =
-             ReflectionUtils.newInstance(workerContextClass, conf);
-        workerContext.setGraphState(graphState);
-        return workerContext;
-    }
-
-
-    /**
-     * Get the user's subclassed {@link BasicVertex}
-     *
-     * @param conf Configuration to check
-     * @return User's vertex class
-     */
-    @SuppressWarnings({ "rawtypes", "unchecked" })
-    public static <I extends WritableComparable,
-                   V extends Writable,
-                   E extends Writable,
-                   M extends Writable>
-            Class<? extends BasicVertex<I, V, E, M>>
-            getVertexClass(Configuration conf) {
-        return (Class<? extends BasicVertex<I, V, E, M>>)
-                conf.getClass(GiraphJob.VERTEX_CLASS,
-                              null,
-                              BasicVertex.class);
-    }
-
-    /**
-     * Create a user vertex
-     *
-     * @param conf Configuration to check
-     * @return Instantiated user vertex
-     */
-    @SuppressWarnings("rawtypes")
-    public static <I extends WritableComparable, V extends Writable,
-            E extends Writable, M extends Writable> BasicVertex<I, V, E, M>
-            createVertex(Configuration conf) {
-        Class<? extends BasicVertex<I, V, E, M>> vertexClass =
-            getVertexClass(conf);
-        BasicVertex<I, V, E, M> vertex =
-            ReflectionUtils.newInstance(vertexClass, conf);
-        return vertex;
-    }
-
-    /**
-     * Get the user's subclassed vertex index class.
-     *
-     * @param conf Configuration to check
-     * @return User's vertex index class
-     */
-    @SuppressWarnings("unchecked")
-    public static <I extends Writable> Class<I>
-            getVertexIndexClass(Configuration conf) {
-        return (Class<I>) conf.getClass(GiraphJob.VERTEX_INDEX_CLASS,
-                                        WritableComparable.class);
-    }
-
-    /**
-     * Create a user vertex index
-     *
-     * @param conf Configuration to check
-     * @return Instantiated user vertex index
-     */
-    @SuppressWarnings("rawtypes")
-    public static <I extends WritableComparable>
-            I createVertexIndex(Configuration conf) {
-        Class<I> vertexClass = getVertexIndexClass(conf);
-        try {
-            return vertexClass.newInstance();
-        } catch (InstantiationException e) {
-            throw new IllegalArgumentException(
-                "createVertexIndex: Failed to instantiate", e);
-        } catch (IllegalAccessException e) {
-            throw new IllegalArgumentException(
-                "createVertexIndex: Illegally accessed", e);
-        }
-    }
-
-    /**
-     * Get the user's subclassed vertex value class.
-     *
-     * @param conf Configuration to check
-     * @return User's vertex value class
-     */
-    @SuppressWarnings("unchecked")
-    public static <V extends Writable> Class<V>
-            getVertexValueClass(Configuration conf) {
-        return (Class<V>) conf.getClass(GiraphJob.VERTEX_VALUE_CLASS,
-                                        Writable.class);
-    }
-
-    /**
-     * Create a user vertex value
-     *
-     * @param conf Configuration to check
-     * @return Instantiated user vertex value
-     */
-    public static <V extends Writable> V
-            createVertexValue(Configuration conf) {
-        Class<V> vertexValueClass = getVertexValueClass(conf);
-        try {
-            return vertexValueClass.newInstance();
-        } catch (InstantiationException e) {
-            throw new IllegalArgumentException(
-                "createVertexValue: Failed to instantiate", e);
-        } catch (IllegalAccessException e) {
-            throw new IllegalArgumentException(
-                "createVertexValue: Illegally accessed", e);
-        }
-    }
-
-    /**
-     * Get the user's subclassed edge value class.
-     *
-     * @param conf Configuration to check
-     * @return User's vertex edge value class
-     */
-    @SuppressWarnings("unchecked")
-    public static <E extends Writable> Class<E>
-            getEdgeValueClass(Configuration conf){
-        return (Class<E>) conf.getClass(GiraphJob.EDGE_VALUE_CLASS,
-                                        Writable.class);
-    }
-
-    /**
-     * Create a user edge value
-     *
-     * @param conf Configuration to check
-     * @return Instantiated user edge value
-     */
-    public static <E extends Writable> E
-            createEdgeValue(Configuration conf) {
-        Class<E> edgeValueClass = getEdgeValueClass(conf);
-        try {
-            return edgeValueClass.newInstance();
-        } catch (InstantiationException e) {
-            throw new IllegalArgumentException(
-                "createEdgeValue: Failed to instantiate", e);
-        } catch (IllegalAccessException e) {
-            throw new IllegalArgumentException(
-                "createEdgeValue: Illegally accessed", e);
-        }
-    }
-
-    /**
-     * Get the user's subclassed vertex message value class.
-     *
-     * @param conf Configuration to check
-     * @return User's vertex message value class
-     */
-    @SuppressWarnings("unchecked")
-    public static <M extends Writable> Class<M>
-            getMessageValueClass(Configuration conf) {
-        return (Class<M>) conf.getClass(GiraphJob.MESSAGE_VALUE_CLASS,
-                                        Writable.class);
-    }
-
-    /**
-     * Create a user vertex message value
-     *
-     * @param conf Configuration to check
-     * @return Instantiated user vertex message value
-     */
-    public static <M extends Writable> M
-            createMessageValue(Configuration conf) {
-        Class<M> messageValueClass = getMessageValueClass(conf);
-        try {
-            return messageValueClass.newInstance();
-        } catch (InstantiationException e) {
-            throw new IllegalArgumentException(
-                "createMessageValue: Failed to instantiate", e);
-        } catch (IllegalAccessException e) {
-            throw new IllegalArgumentException(
-                "createMessageValue: Illegally accessed", e);
-        }
+  /**
+   * Do not construct.
+   */
+  private BspUtils() { }
+
+  /**
+   * Get the user's subclassed {@link GraphPartitionerFactory}.
+   *
+   * @param <I> Vertex id
+   * @param <V> Vertex data
+   * @param <E> Edge data
+   * @param <M> Message data
+   * @param conf Configuration to check
+   * @return User's graph partitioner
+   */
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  public static <I extends WritableComparable, V extends Writable,
+  E extends Writable, M extends Writable>
+  Class<? extends GraphPartitionerFactory<I, V, E, M>>
+  getGraphPartitionerClass(Configuration conf) {
+    return (Class<? extends GraphPartitionerFactory<I, V, E, M>>)
+      conf.getClass(GiraphJob.GRAPH_PARTITIONER_FACTORY_CLASS,
+        HashPartitionerFactory.class,
+        GraphPartitionerFactory.class);
+  }
+
+  /**
+   * Create a user graph partitioner class
+   *
+   * @param <I> Vertex id
+   * @param <V> Vertex data
+   * @param <E> Edge data
+   * @param <M> Message data
+   * @param conf Configuration to check
+   * @return Instantiated user graph partitioner class
+   */
+  @SuppressWarnings("rawtypes")
+  public static <I extends WritableComparable, V extends Writable,
+  E extends Writable, M extends Writable>
+  GraphPartitionerFactory<I, V, E, M>
+  createGraphPartitioner(Configuration conf) {
+    Class<? extends GraphPartitionerFactory<I, V, E, M>>
+    graphPartitionerFactoryClass = getGraphPartitionerClass(conf);
+    return ReflectionUtils.newInstance(graphPartitionerFactoryClass, conf);
+  }
+
+  /**
+   * Create a user graph partitioner partition stats class
+   *
+   * @param <I> Vertex id
+   * @param <V> Vertex data
+   * @param <E> Edge data
+   * @param <M> Message data
+   * @param conf Configuration to check
+   * @return Instantiated user graph partition stats class
+   */
+  @SuppressWarnings("rawtypes")
+  public static <I extends WritableComparable, V extends Writable,
+  E extends Writable, M extends Writable>
+  PartitionStats createGraphPartitionStats(Configuration conf) {
+    GraphPartitionerFactory<I, V, E, M> graphPartitioner =
+      createGraphPartitioner(conf);
+    return graphPartitioner.createMasterGraphPartitioner().
+      createPartitionStats();
+  }
+
+  /**
+   * Get the user's subclassed {@link VertexInputFormat}.
+   *
+   * @param <I> Vertex id
+   * @param <V> Vertex data
+   * @param <E> Edge data
+   * @param <M> Message data
+   * @param conf Configuration to check
+   * @return User's vertex input format class
+   */
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  public static <I extends WritableComparable,
+  V extends Writable,
+  E extends Writable,
+  M extends Writable>
+  Class<? extends VertexInputFormat<I, V, E, M>>
+  getVertexInputFormatClass(Configuration conf) {
+    return (Class<? extends VertexInputFormat<I, V, E, M>>)
+      conf.getClass(GiraphJob.VERTEX_INPUT_FORMAT_CLASS,
+        null,
+        VertexInputFormat.class);
+  }
+
+  /**
+   * Create a user vertex input format class
+   *
+   * @param <I> Vertex id
+   * @param <V> Vertex data
+   * @param <E> Edge data
+   * @param <M> Message data
+   * @param conf Configuration to check
+   * @return Instantiated user vertex input format class
+   */
+  @SuppressWarnings("rawtypes")
+  public static <I extends WritableComparable,
+  V extends Writable,
+  E extends Writable,
+  M extends Writable> VertexInputFormat<I, V, E, M>
+  createVertexInputFormat(Configuration conf) {
+    Class<? extends VertexInputFormat<I, V, E, M>> vertexInputFormatClass =
+      getVertexInputFormatClass(conf);
+    VertexInputFormat<I, V, E, M> inputFormat =
+      ReflectionUtils.newInstance(vertexInputFormatClass, conf);
+    return inputFormat;
+  }
+
+  /**
+   * Get the user's subclassed {@link VertexOutputFormat}.
+   *
+   * @param <I> Vertex id
+   * @param <V> Vertex data
+   * @param <E> Edge data
+   * @param conf Configuration to check
+   * @return User's vertex output format class
+   */
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  public static <I extends WritableComparable,
+  V extends Writable,
+  E extends Writable>
+  Class<? extends VertexOutputFormat<I, V, E>>
+  getVertexOutputFormatClass(Configuration conf) {
+    return (Class<? extends VertexOutputFormat<I, V, E>>)
+      conf.getClass(GiraphJob.VERTEX_OUTPUT_FORMAT_CLASS,
+        null,
+        VertexOutputFormat.class);
+  }
+
+  /**
+   * Create a user vertex output format class
+   *
+   * @param <I> Vertex id
+   * @param <V> Vertex data
+   * @param <E> Edge data
+   * @param conf Configuration to check
+   * @return Instantiated user vertex output format class
+   */
+  @SuppressWarnings("rawtypes")
+  public static <I extends WritableComparable, V extends Writable,
+  E extends Writable> VertexOutputFormat<I, V, E>
+  createVertexOutputFormat(Configuration conf) {
+    Class<? extends VertexOutputFormat<I, V, E>> vertexOutputFormatClass =
+      getVertexOutputFormatClass(conf);
+    return ReflectionUtils.newInstance(vertexOutputFormatClass, conf);
+  }
+
+  /**
+   * Get the user's subclassed {@link AggregatorWriter}.
+   *
+   * @param conf Configuration to check
+   * @return User's aggregator writer class
+   */
+  public static Class<? extends AggregatorWriter>
+  getAggregatorWriterClass(Configuration conf) {
+    return conf.getClass(GiraphJob.AGGREGATOR_WRITER_CLASS,
+      TextAggregatorWriter.class,
+      AggregatorWriter.class);
+  }
+
+  /**
+   * Create a user aggregator output format class
+   *
+   * @param conf Configuration to check
+   * @return Instantiated user aggregator writer class
+   */
+  public static AggregatorWriter createAggregatorWriter(Configuration conf) {
+    Class<? extends AggregatorWriter> aggregatorWriterClass =
+      getAggregatorWriterClass(conf);
+    return ReflectionUtils.newInstance(aggregatorWriterClass, conf);
+  }
+
+  /**
+   * Get the user's subclassed {@link VertexCombiner}.
+   *
+   * @param <I> Vertex id
+   * @param <M> Message data
+   * @param conf Configuration to check
+   * @return User's vertex combiner class
+   */
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  public static <I extends WritableComparable, M extends Writable>
+  Class<? extends VertexCombiner<I, M>>
+  getVertexCombinerClass(Configuration conf) {
+    return (Class<? extends VertexCombiner<I, M>>)
+      conf.getClass(GiraphJob.VERTEX_COMBINER_CLASS,
+        null,
+        VertexCombiner.class);
+  }
+
+  /**
+   * Create a user vertex combiner class
+   *
+   * @param <I> Vertex id
+   * @param <M> Message data
+   * @param conf Configuration to check
+   * @return Instantiated user vertex combiner class
+   */
+  @SuppressWarnings("rawtypes")
+  public static <I extends WritableComparable, M extends Writable>
+  VertexCombiner<I, M> createVertexCombiner(Configuration conf) {
+    Class<? extends VertexCombiner<I, M>> vertexCombinerClass =
+      getVertexCombinerClass(conf);
+    return ReflectionUtils.newInstance(vertexCombinerClass, conf);
+  }
+
+  /**
+   * Get the user's subclassed VertexResolver.
+   *
+   *
+   * @param <I> Vertex id
+   * @param <V> Vertex data
+   * @param <E> Edge data
+   * @param <M> Message data
+   * @param conf Configuration to check
+   * @return User's vertex resolver class
+   */
+  @SuppressWarnings({ "unchecked", "rawtypes" })
+  public static <I extends WritableComparable, V extends Writable,
+  E extends Writable, M extends Writable>
+  Class<? extends VertexResolver<I, V, E, M>>
+  getVertexResolverClass(Configuration conf) {
+    return (Class<? extends VertexResolver<I, V, E, M>>)
+      conf.getClass(GiraphJob.VERTEX_RESOLVER_CLASS,
+        VertexResolver.class,
+        VertexResolver.class);
+  }
+
+  /**
+   * Create a user vertex revolver
+   *
+   * @param <I> Vertex id
+   * @param <V> Vertex data
+   * @param <E> Edge data
+   * @param <M> Message data
+   * @param conf Configuration to check
+   * @param graphState State of the graph from the worker
+   * @return Instantiated user vertex resolver
+   */
+  @SuppressWarnings("rawtypes")
+  public static <I extends WritableComparable, V extends Writable,
+  E extends Writable, M extends Writable> VertexResolver<I, V, E, M>
+  createVertexResolver(Configuration conf,
+    GraphState<I, V, E, M> graphState) {
+    Class<? extends VertexResolver<I, V, E, M>> vertexResolverClass =
+      getVertexResolverClass(conf);
+    VertexResolver<I, V, E, M> resolver =
+      ReflectionUtils.newInstance(vertexResolverClass, conf);
+    resolver.setGraphState(graphState);
+    return resolver;
+  }
+
+  /**
+   * Get the user's subclassed WorkerContext.
+   *
+   * @param conf Configuration to check
+   * @return User's worker context class
+   */
+  public static Class<? extends WorkerContext>
+  getWorkerContextClass(Configuration conf) {
+    return (Class<? extends WorkerContext>)
+      conf.getClass(GiraphJob.WORKER_CONTEXT_CLASS,
+        DefaultWorkerContext.class,
+        WorkerContext.class);
+  }
+
+  /**
+   * Create a user worker context
+   *
+   * @param <I> Vertex id
+   * @param <V> Vertex data
+   * @param <E> Edge data
+   * @param <M> Message data
+   * @param conf Configuration to check
+   * @param graphState State of the graph from the worker
+   * @return Instantiated user worker context
+   */
+  @SuppressWarnings("rawtypes")
+  public static <I extends WritableComparable, V extends Writable,
+  E extends Writable, M extends Writable>
+  WorkerContext createWorkerContext(Configuration conf,
+    GraphState<I, V, E, M> graphState) {
+    Class<? extends WorkerContext> workerContextClass =
+      getWorkerContextClass(conf);
+    WorkerContext workerContext =
+      ReflectionUtils.newInstance(workerContextClass, conf);
+    workerContext.setGraphState(graphState);
+    return workerContext;
+  }
+
+
+  /**
+   * Get the user's subclassed {@link BasicVertex}
+   *
+   * @param <I> Vertex id
+   * @param <V> Vertex data
+   * @param <E> Edge data
+   * @param <M> Message data
+   * @param conf Configuration to check
+   * @return User's vertex class
+   */
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  public static <I extends WritableComparable, V extends Writable,
+  E extends Writable, M extends Writable>
+  Class<? extends BasicVertex<I, V, E, M>> getVertexClass(Configuration conf) {
+    return (Class<? extends BasicVertex<I, V, E, M>>)
+      conf.getClass(GiraphJob.VERTEX_CLASS,
+        null,
+        BasicVertex.class);
+  }
+
+  /**
+   * Create a user vertex
+   *
+   * @param <I> Vertex id
+   * @param <V> Vertex data
+   * @param <E> Edge data
+   * @param <M> Message data
+   * @param conf Configuration to check
+   * @return Instantiated user vertex
+   */
+  @SuppressWarnings("rawtypes")
+  public static <I extends WritableComparable, V extends Writable,
+  E extends Writable, M extends Writable> BasicVertex<I, V, E, M>
+  createVertex(Configuration conf) {
+    Class<? extends BasicVertex<I, V, E, M>> vertexClass = getVertexClass(conf);
+    BasicVertex<I, V, E, M> vertex =
+      ReflectionUtils.newInstance(vertexClass, conf);
+    return vertex;
+  }
+
+  /**
+   * Get the user's subclassed vertex index class.
+   *
+   * @param <I> Vertex id
+   * @param conf Configuration to check
+   * @return User's vertex index class
+   */
+  @SuppressWarnings("unchecked")
+  public static <I extends Writable> Class<I>
+  getVertexIndexClass(Configuration conf) {
+    return (Class<I>) conf.getClass(GiraphJob.VERTEX_INDEX_CLASS,
+      WritableComparable.class);
+  }
+
+  /**
+   * Create a user vertex index
+   *
+   * @param <I> Vertex id
+   * @param conf Configuration to check
+   * @return Instantiated user vertex index
+   */
+  @SuppressWarnings("rawtypes")
+  public static <I extends WritableComparable>
+  I createVertexIndex(Configuration conf) {
+    Class<I> vertexClass = getVertexIndexClass(conf);
+    try {
+      return vertexClass.newInstance();
+    } catch (InstantiationException e) {
+      throw new IllegalArgumentException(
+        "createVertexIndex: Failed to instantiate", e);
+    } catch (IllegalAccessException e) {
+      throw new IllegalArgumentException(
+        "createVertexIndex: Illegally accessed", e);
+    }
+  }
+
+  /**
+   * Get the user's subclassed vertex value class.
+   *
+   * @param <V> Vertex data
+   * @param conf Configuration to check
+   * @return User's vertex value class
+   */
+  @SuppressWarnings("unchecked")
+  public static <V extends Writable> Class<V>
+  getVertexValueClass(Configuration conf) {
+    return (Class<V>) conf.getClass(GiraphJob.VERTEX_VALUE_CLASS,
+      Writable.class);
+  }
+
+  /**
+   * Create a user vertex value
+   *
+   * @param <V> Vertex data
+   * @param conf Configuration to check
+   * @return Instantiated user vertex value
+   */
+  public static <V extends Writable> V
+  createVertexValue(Configuration conf) {
+    Class<V> vertexValueClass = getVertexValueClass(conf);
+    try {
+      return vertexValueClass.newInstance();
+    } catch (InstantiationException e) {
+      throw new IllegalArgumentException(
+        "createVertexValue: Failed to instantiate", e);
+    } catch (IllegalAccessException e) {
+      throw new IllegalArgumentException(
+        "createVertexValue: Illegally accessed", e);
+    }
+  }
+
+  /**
+   * Get the user's subclassed edge value class.
+   *
+   * @param <E> Edge data
+   * @param conf Configuration to check
+   * @return User's vertex edge value class
+   */
+  @SuppressWarnings("unchecked")
+  public static <E extends Writable> Class<E>
+  getEdgeValueClass(Configuration conf) {
+    return (Class<E>) conf.getClass(GiraphJob.EDGE_VALUE_CLASS,
+      Writable.class);
+  }
+
+  /**
+   * Create a user edge value
+   *
+   * @param <E> Edge data
+   * @param conf Configuration to check
+   * @return Instantiated user edge value
+   */
+  public static <E extends Writable> E
+  createEdgeValue(Configuration conf) {
+    Class<E> edgeValueClass = getEdgeValueClass(conf);
+    try {
+      return edgeValueClass.newInstance();
+    } catch (InstantiationException e) {
+      throw new IllegalArgumentException(
+        "createEdgeValue: Failed to instantiate", e);
+    } catch (IllegalAccessException e) {
+      throw new IllegalArgumentException(
+        "createEdgeValue: Illegally accessed", e);
+    }
+  }
+
+  /**
+   * Get the user's subclassed vertex message value class.
+   *
+   * @param <M> Message data
+   * @param conf Configuration to check
+   * @return User's vertex message value class
+   */
+  @SuppressWarnings("unchecked")
+  public static <M extends Writable> Class<M>
+  getMessageValueClass(Configuration conf) {
+    return (Class<M>) conf.getClass(GiraphJob.MESSAGE_VALUE_CLASS,
+      Writable.class);
+  }
+
+  /**
+   * Create a user vertex message value
+   *
+   * @param <M> Message data
+   * @param conf Configuration to check
+   * @return Instantiated user vertex message value
+   */
+  public static <M extends Writable> M
+  createMessageValue(Configuration conf) {
+    Class<M> messageValueClass = getMessageValueClass(conf);
+    try {
+      return messageValueClass.newInstance();
+    } catch (InstantiationException e) {
+      throw new IllegalArgumentException(
+        "createMessageValue: Failed to instantiate", e);
+    } catch (IllegalAccessException e) {
+      throw new IllegalArgumentException(
+        "createMessageValue: Illegally accessed", e);
     }
+  }
 }

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/DefaultWorkerContext.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/DefaultWorkerContext.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/DefaultWorkerContext.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/DefaultWorkerContext.java Thu Feb 16 22:12:31 2012
@@ -25,17 +25,17 @@ package org.apache.giraph.graph;
  */
 public class DefaultWorkerContext extends WorkerContext {
 
-	@Override
-	public void preApplication() throws InstantiationException,
-            IllegalAccessException {
-    }
+  @Override
+  public void preApplication()
+    throws InstantiationException, IllegalAccessException {
+  }
 
-    @Override
-    public void postApplication() { }
+  @Override
+  public void postApplication() { }
 
-    @Override
-    public void preSuperstep() { }
+  @Override
+  public void preSuperstep() { }
 
-    @Override
-    public void postSuperstep() { }
-}
\ No newline at end of file
+  @Override
+  public void postSuperstep() { }
+}

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/Edge.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/Edge.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/Edge.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/Edge.java Thu Feb 16 22:12:31 2012
@@ -36,133 +36,138 @@ import java.io.IOException;
  */
 @SuppressWarnings("rawtypes")
 public class Edge<I extends WritableComparable, E extends Writable>
-        implements WritableComparable<Edge<I, E>>, Configurable {
-    /** Destination vertex id */
-    private I destVertexId = null;
-    /** Edge value */
-    private E edgeValue = null;
-    /** Configuration - Used to instantiate classes */
-    private Configuration conf = null;
-
-    /**
-     * Constructor for reflection
-     */
-    public Edge() {}
-
-    /**
-     * Create the edge with final values
-     *
-     * @param destVertexId
-     * @param edgeValue
-     */
-    public Edge(I destVertexId, E edgeValue) {
-        this.destVertexId = destVertexId;
-        this.edgeValue = edgeValue;
-    }
-
-    /**
-     * Get the destination vertex index of this edge
-     *
-     * @return Destination vertex index of this edge
-     */
-    public I getDestVertexId() {
-        return destVertexId;
-    }
-
-    /**
-     * Get the edge value of the edge
-     *
-     * @return Edge value of this edge
-     */
-    public E getEdgeValue() {
-        return edgeValue;
-    }
-
-    /**
-     * Set the destination vertex index of this edge.
-     *
-     * @param destVertexId new destination vertex
-     */
-    public void setDestVertexId(I destVertexId) {
-        this.destVertexId = destVertexId;
-    }
-
-    /**
-     * Set the value for this edge.
-     *
-     * @param edgeValue new edge value
-     */
-    public void setEdgeValue(E edgeValue) {
-        this.edgeValue = edgeValue;
-    }
-
-    @Override
-    public String toString() {
-        return "(DestVertexIndex = " + destVertexId +
-            ", edgeValue = " + edgeValue  + ")";
-    }
-
-    @SuppressWarnings("unchecked")
-    @Override
-    public void readFields(DataInput input) throws IOException {
-        destVertexId = (I) BspUtils.createVertexIndex(getConf());
-        destVertexId.readFields(input);
-        edgeValue = (E) BspUtils.createEdgeValue(getConf());
-        edgeValue.readFields(input);
-    }
-
-    @Override
-    public void write(DataOutput output) throws IOException {
-        if (destVertexId == null) {
-            throw new IllegalStateException(
-                "write: Null destination vertex index");
-        }
-        if (edgeValue == null) {
-            throw new IllegalStateException(
-                "write: Null edge value");
-        }
-        destVertexId.write(output);
-        edgeValue.write(output);
-    }
-
-    @Override
-    public Configuration getConf() {
-        return conf;
-    }
-
-    @Override
-    public void setConf(Configuration conf) {
-        this.conf = conf;
-    }
-
-    @SuppressWarnings("unchecked")
-    @Override
-    public int compareTo(Edge<I, E> edge) {
-        return destVertexId.compareTo(edge.getDestVertexId());
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) { return true; }
-        if (o == null || getClass() != o.getClass()) { return false; }
-
-        Edge edge = (Edge) o;
-
-        if (destVertexId != null ? !destVertexId.equals(edge.destVertexId) :
-            edge.destVertexId != null) {
-            return false;
-        }
-        if (edgeValue != null ? !edgeValue.equals(edge.edgeValue) : edge.edgeValue != null) {
-            return false;
-        }
-
-        return true;
-    }
-
-    @Override
-    public int hashCode() {
-        int result = destVertexId != null ? destVertexId.hashCode() : 0;
-        result = 31 * result + (edgeValue != null ? edgeValue.hashCode() : 0);
-        return result;
-    }
+    implements WritableComparable<Edge<I, E>>, Configurable {
+  /** Destination vertex id */
+  private I destVertexId = null;
+  /** Edge value */
+  private E edgeValue = null;
+  /** Configuration - Used to instantiate classes */
+  private Configuration conf = null;
+
+  /**
+   * Constructor for reflection
+   */
+  public Edge() { }
+
+  /**
+   * Create the edge with final values
+   *
+   * @param destVertexId Desination vertex id.
+   * @param edgeValue Value of the edge.
+   */
+  public Edge(I destVertexId, E edgeValue) {
+    this.destVertexId = destVertexId;
+    this.edgeValue = edgeValue;
+  }
+
+  /**
+   * Get the destination vertex index of this edge
+   *
+   * @return Destination vertex index of this edge
+   */
+  public I getDestVertexId() {
+    return destVertexId;
+  }
+
+  /**
+   * Get the edge value of the edge
+   *
+   * @return Edge value of this edge
+   */
+  public E getEdgeValue() {
+    return edgeValue;
+  }
+
+  /**
+   * Set the destination vertex index of this edge.
+   *
+   * @param destVertexId new destination vertex
+   */
+  public void setDestVertexId(I destVertexId) {
+    this.destVertexId = destVertexId;
+  }
+
+  /**
+   * Set the value for this edge.
+   *
+   * @param edgeValue new edge value
+   */
+  public void setEdgeValue(E edgeValue) {
+    this.edgeValue = edgeValue;
+  }
+
+  @Override
+  public String toString() {
+    return "(DestVertexIndex = " + destVertexId +
+        ", edgeValue = " + edgeValue  + ")";
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public void readFields(DataInput input) throws IOException {
+    destVertexId = (I) BspUtils.createVertexIndex(getConf());
+    destVertexId.readFields(input);
+    edgeValue = (E) BspUtils.createEdgeValue(getConf());
+    edgeValue.readFields(input);
+  }
+
+  @Override
+  public void write(DataOutput output) throws IOException {
+    if (destVertexId == null) {
+      throw new IllegalStateException(
+          "write: Null destination vertex index");
+    }
+    if (edgeValue == null) {
+      throw new IllegalStateException(
+          "write: Null edge value");
+    }
+    destVertexId.write(output);
+    edgeValue.write(output);
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public int compareTo(Edge<I, E> edge) {
+    return destVertexId.compareTo(edge.getDestVertexId());
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+
+    Edge edge = (Edge) o;
+
+    if (destVertexId != null ? !destVertexId.equals(edge.destVertexId) :
+      edge.destVertexId != null) {
+      return false;
+    }
+    if (edgeValue != null ?
+        !edgeValue.equals(edge.edgeValue) : edge.edgeValue != null) {
+      return false;
+    }
+
+    return true;
+  }
+
+  @Override
+  public int hashCode() {
+    int result = destVertexId != null ? destVertexId.hashCode() : 0;
+    result = 31 * result + (edgeValue != null ? edgeValue.hashCode() : 0);
+    return result;
+  }
 }

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/EdgeListVertex.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/EdgeListVertex.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/EdgeListVertex.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/EdgeListVertex.java Thu Feb 16 22:12:31 2012
@@ -48,265 +48,266 @@ import java.util.Map;
  */
 @SuppressWarnings("rawtypes")
 public abstract class EdgeListVertex<I extends WritableComparable,
-        V extends Writable,
-        E extends Writable, M extends Writable>
-        extends MutableVertex<I, V, E, M> {
-    /** Class logger */
-    private static final Logger LOG = Logger.getLogger(EdgeListVertex.class);
-    /** Vertex id */
-    private I vertexId = null;
-    /** Vertex value */
-    private V vertexValue = null;
-    /** List of the dest edge indices */
-    private List<I> destEdgeIndexList;
-    /** List of the dest edge values */
-    /** Map of destination vertices and their edge values */
-    private List<E> destEdgeValueList;
-    /** List of incoming messages from the previous superstep */
-    private List<M> msgList;
-
-    @Override
-    public void initialize(I vertexId, V vertexValue,
-                           Map<I, E> edges,
-                           Iterable<M> messages) {
-        if (vertexId != null) {
-            setVertexId(vertexId);
-        }
-        if (vertexValue != null) {
-            setVertexValue(vertexValue);
-        }
-        if (edges != null && !edges.isEmpty()) {
-            destEdgeIndexList = Lists.newArrayListWithCapacity(edges.size());
-            destEdgeValueList = Lists.newArrayListWithCapacity(edges.size());
-            List<I> sortedIndexList = new ArrayList<I>(edges.keySet());
-            Collections.sort(sortedIndexList, new VertexIdComparator());
-            for (I index : sortedIndexList) {
-                destEdgeIndexList.add(index);
-                destEdgeValueList.add(edges.get(index));
-            }
-            sortedIndexList.clear();
-        } else {
-            destEdgeIndexList = Lists.newArrayListWithCapacity(0);
-            destEdgeValueList = Lists.newArrayListWithCapacity(0);
-        }
-        if (messages != null) {
-            msgList = Lists.newArrayListWithCapacity(Iterables.size(messages));
-            Iterables.<M>addAll(msgList, messages);
-        } else {
-            msgList = Lists.newArrayListWithCapacity(0);
-        }
-    }
-
-    @Override
-    public boolean equals(Object other) {
-        if (other instanceof EdgeListVertex) {
-            @SuppressWarnings("unchecked")
-            EdgeListVertex<I, V, E, M> otherVertex = (EdgeListVertex) other;
-            if (!getVertexId().equals(otherVertex.getVertexId())) {
-                return false;
-            }
-            if (!getVertexValue().equals(otherVertex.getVertexValue())) {
-                return false;
-            }
-            if (!ComparisonUtils.equal(getMessages(),
-                    otherVertex.getMessages())) {
-                return false;
-            }
-            return ComparisonUtils.equal(iterator(), otherVertex.iterator());
-        }
+    V extends Writable, E extends Writable, M extends Writable>
+    extends MutableVertex<I, V, E, M> {
+  /** Class logger */
+  private static final Logger LOG = Logger.getLogger(EdgeListVertex.class);
+  /** Vertex id */
+  private I vertexId = null;
+  /** Vertex value */
+  private V vertexValue = null;
+  /** List of the dest edge indices */
+  private List<I> destEdgeIndexList;
+  /** List of the dest edge values */
+  private List<E> destEdgeValueList;
+  /** List of incoming messages from the previous superstep */
+  private List<M> msgList;
+
+  @Override
+  public void initialize(I vertexId, V vertexValue,
+      Map<I, E> edges,
+      Iterable<M> messages) {
+    if (vertexId != null) {
+      setVertexId(vertexId);
+    }
+    if (vertexValue != null) {
+      setVertexValue(vertexValue);
+    }
+    if (edges != null && !edges.isEmpty()) {
+      destEdgeIndexList = Lists.newArrayListWithCapacity(edges.size());
+      destEdgeValueList = Lists.newArrayListWithCapacity(edges.size());
+      List<I> sortedIndexList = new ArrayList<I>(edges.keySet());
+      Collections.sort(sortedIndexList, new VertexIdComparator());
+      for (I index : sortedIndexList) {
+        destEdgeIndexList.add(index);
+        destEdgeValueList.add(edges.get(index));
+      }
+      sortedIndexList.clear();
+    } else {
+      destEdgeIndexList = Lists.newArrayListWithCapacity(0);
+      destEdgeValueList = Lists.newArrayListWithCapacity(0);
+    }
+    if (messages != null) {
+      msgList = Lists.newArrayListWithCapacity(Iterables.size(messages));
+      Iterables.<M>addAll(msgList, messages);
+    } else {
+      msgList = Lists.newArrayListWithCapacity(0);
+    }
+  }
+
+  @Override
+  public int hashCode() {
+    return vertexId.hashCode() * 37 + vertexValue.hashCode();
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (other instanceof EdgeListVertex) {
+      @SuppressWarnings("unchecked")
+      EdgeListVertex<I, V, E, M> otherVertex = (EdgeListVertex) other;
+      if (!getVertexId().equals(otherVertex.getVertexId())) {
         return false;
+      }
+      if (!getVertexValue().equals(otherVertex.getVertexValue())) {
+        return false;
+      }
+      if (!ComparisonUtils.equal(getMessages(),
+          otherVertex.getMessages())) {
+        return false;
+      }
+      return ComparisonUtils.equal(iterator(), otherVertex.iterator());
     }
+    return false;
+  }
 
-    /**
-     * Comparator for the vertex id
-     */
-    private class VertexIdComparator implements Comparator<I> {
-        @SuppressWarnings("unchecked")
-        @Override
-        public int compare(I index1, I index2) {
-            return index1.compareTo(index2);
-        }
-    }
-
-    @Override
-    public final boolean addEdge(I targetVertexId, E edgeValue) {
-        System.out.println("addEdge: " + targetVertexId + " " + edgeValue + " " + destEdgeIndexList);
-        int pos = Collections.binarySearch(destEdgeIndexList,
-                                           targetVertexId,
-                                           new VertexIdComparator());
-        if (pos < 0) {
-            destEdgeIndexList.add(-1 * (pos + 1), targetVertexId);
-            destEdgeValueList.add(-1 * (pos + 1), edgeValue);
-            return true;
-        } else {
-            LOG.warn("addEdge: Vertex=" + vertexId +
-                     ": already added an edge value for dest vertex id " +
-                     targetVertexId);
-            return false;
-        }
-    }
-
-    @Override
-    public long getSuperstep() {
-        return getGraphState().getSuperstep();
-    }
-
-    @Override
-    public final void setVertexId(I vertexId) {
-        this.vertexId = vertexId;
-    }
-
-    @Override
-    public final I getVertexId() {
-        return vertexId;
-    }
-
-    @Override
-    public final V getVertexValue() {
-        return vertexValue;
-    }
-
-    @Override
-    public final void setVertexValue(V vertexValue) {
-        this.vertexValue = vertexValue;
-    }
-
-    @Override
-    public E getEdgeValue(I targetVertexId) {
-        int pos = Collections.binarySearch(destEdgeIndexList,
-                targetVertexId,
-                new VertexIdComparator());
-        if (pos < 0) {
-            return null;
-        } else {
-            return destEdgeValueList.get(pos);
-        }
-    }
-
-    @Override
-    public boolean hasEdge(I targetVertexId) {
-        int pos = Collections.binarySearch(destEdgeIndexList,
-                targetVertexId,
-                new VertexIdComparator());
-        if (pos < 0) {
-            return false;
-        } else {
-            return true;
-        }
-    }
-
-    /**
-     * Get an iterator to the edges on this vertex.
-     *
-     * @return A <em>sorted</em> iterator, as defined by the sort-order
-     *         of the vertex ids
-     */
-    @Override
-    public Iterator<I> iterator() {
-        return destEdgeIndexList.iterator();
-    }
-
-    @Override
-    public int getNumOutEdges() {
-        return destEdgeIndexList.size();
-    }
-
-    @Override
-    public E removeEdge(I targetVertexId) {
-        int pos = Collections.binarySearch(destEdgeIndexList,
-                targetVertexId,
-                new VertexIdComparator());
-        if (pos < 0) {
-            return null;
-        } else {
-            destEdgeIndexList.remove(pos);
-            return destEdgeValueList.remove(pos);
-        }
-    }
-
-    @Override
-    public final void sendMsgToAllEdges(M msg) {
-        if (msg == null) {
-            throw new IllegalArgumentException(
-                "sendMsgToAllEdges: Cannot send null message to all edges");
-        }
-        for (I index : destEdgeIndexList) {
-            sendMsg(index, msg);
-        }
-    }
-
-    @Override
-    final public void readFields(DataInput in) throws IOException {
-        vertexId = BspUtils.<I>createVertexIndex(getConf());
-        vertexId.readFields(in);
-        boolean hasVertexValue = in.readBoolean();
-        if (hasVertexValue) {
-            vertexValue = BspUtils.<V>createVertexValue(getConf());
-            vertexValue.readFields(in);
-        }
-        int edgeListCount = in.readInt();
-        destEdgeIndexList = Lists.newArrayListWithCapacity(edgeListCount);
-        destEdgeValueList = Lists.newArrayListWithCapacity(edgeListCount);
-        for (int i = 0; i < edgeListCount; ++i) {
-            I vertexId = BspUtils.<I>createVertexIndex(getConf());
-            E edgeValue = BspUtils.<E>createEdgeValue(getConf());
-            vertexId.readFields(in);
-            edgeValue.readFields(in);
-            destEdgeIndexList.add(vertexId);
-            destEdgeValueList.add(edgeValue);
-        }
-        int msgListSize = in.readInt();
-        msgList = Lists.newArrayListWithCapacity(msgListSize);
-        for (int i = 0; i < msgListSize; ++i) {
-            M msg = BspUtils.<M>createMessageValue(getConf());
-            msg.readFields(in);
-            msgList.add(msg);
-        }
-        halt = in.readBoolean();
-    }
-
-    @Override
-    final public void write(DataOutput out) throws IOException {
-        vertexId.write(out);
-        out.writeBoolean(vertexValue != null);
-        if (vertexValue != null) {
-            vertexValue.write(out);
-        }
-        out.writeInt(destEdgeIndexList.size());
-        for (int i = 0 ; i < destEdgeIndexList.size(); ++i) {
-            destEdgeIndexList.get(i).write(out);
-            destEdgeValueList.get(i).write(out);
-        }
-        out.writeInt(msgList.size());
-        for (M msg : msgList) {
-            msg.write(out);
-        }
-        out.writeBoolean(halt);
-    }
-
-    @Override
-    void putMessages(Iterable<M> messages) {
-        msgList.clear();
-        for (M message : messages) {
-            msgList.add(message);
-        }
-    }
-
-    @Override
-    public Iterable<M> getMessages() {
-        return Iterables.unmodifiableIterable(msgList);
-    }
-
-    @Override
-    void releaseResources() {
-        // Hint to GC to free the messages
-        msgList.clear();
-    }
-
-    @Override
-    public String toString() {
-        return "Vertex(id=" + getVertexId() + ",value=" + getVertexValue() +
-            ",#edges=" + getNumOutEdges() + ")";
-    }
+  /**
+   * Comparator for the vertex id
+   */
+  private class VertexIdComparator implements Comparator<I> {
+    @SuppressWarnings("unchecked")
+    @Override
+    public int compare(I index1, I index2) {
+      return index1.compareTo(index2);
+    }
+  }
+
+  @Override
+  public final boolean addEdge(I targetVertexId, E edgeValue) {
+    int pos = Collections.binarySearch(destEdgeIndexList,
+        targetVertexId,
+        new VertexIdComparator());
+    if (pos < 0) {
+      destEdgeIndexList.add(-1 * (pos + 1), targetVertexId);
+      destEdgeValueList.add(-1 * (pos + 1), edgeValue);
+      return true;
+    } else {
+      LOG.warn("addEdge: Vertex=" + vertexId +
+          ": already added an edge value for dest vertex id " +
+          targetVertexId);
+      return false;
+    }
+  }
+
+  @Override
+  public long getSuperstep() {
+    return getGraphState().getSuperstep();
+  }
+
+  @Override
+  public final void setVertexId(I vertexId) {
+    this.vertexId = vertexId;
+  }
+
+  @Override
+  public final I getVertexId() {
+    return vertexId;
+  }
+
+  @Override
+  public final V getVertexValue() {
+    return vertexValue;
+  }
+
+  @Override
+  public final void setVertexValue(V vertexValue) {
+    this.vertexValue = vertexValue;
+  }
+
+  @Override
+  public E getEdgeValue(I targetVertexId) {
+    int pos = Collections.binarySearch(destEdgeIndexList,
+        targetVertexId,
+        new VertexIdComparator());
+    if (pos < 0) {
+      return null;
+    } else {
+      return destEdgeValueList.get(pos);
+    }
+  }
+
+  @Override
+  public boolean hasEdge(I targetVertexId) {
+    int pos = Collections.binarySearch(destEdgeIndexList,
+        targetVertexId,
+        new VertexIdComparator());
+    if (pos < 0) {
+      return false;
+    }
+    return true;
+  }
+
+  /**
+   * Get an iterator to the edges on this vertex.
+   *
+   * @return A <em>sorted</em> iterator, as defined by the sort-order
+   *         of the vertex ids
+   */
+  @Override
+  public Iterator<I> iterator() {
+    return destEdgeIndexList.iterator();
+  }
+
+  @Override
+  public int getNumOutEdges() {
+    return destEdgeIndexList.size();
+  }
+
+  @Override
+  public E removeEdge(I targetVertexId) {
+    int pos = Collections.binarySearch(destEdgeIndexList,
+        targetVertexId,
+        new VertexIdComparator());
+    if (pos < 0) {
+      return null;
+    } else {
+      destEdgeIndexList.remove(pos);
+      return destEdgeValueList.remove(pos);
+    }
+  }
+
+  @Override
+  public final void sendMsgToAllEdges(M msg) {
+    if (msg == null) {
+      throw new IllegalArgumentException(
+          "sendMsgToAllEdges: Cannot send null message to all edges");
+    }
+    for (I index : destEdgeIndexList) {
+      sendMsg(index, msg);
+    }
+  }
+
+  @Override
+  public final void readFields(DataInput in) throws IOException {
+    vertexId = BspUtils.<I>createVertexIndex(getConf());
+    vertexId.readFields(in);
+    boolean hasVertexValue = in.readBoolean();
+    if (hasVertexValue) {
+      vertexValue = BspUtils.<V>createVertexValue(getConf());
+      vertexValue.readFields(in);
+    }
+    int edgeListCount = in.readInt();
+    destEdgeIndexList = Lists.newArrayListWithCapacity(edgeListCount);
+    destEdgeValueList = Lists.newArrayListWithCapacity(edgeListCount);
+    for (int i = 0; i < edgeListCount; ++i) {
+      I destVertexId = BspUtils.<I>createVertexIndex(getConf());
+      E edgeValue = BspUtils.<E>createEdgeValue(getConf());
+      destVertexId.readFields(in);
+      edgeValue.readFields(in);
+      destEdgeIndexList.add(destVertexId);
+      destEdgeValueList.add(edgeValue);
+    }
+    int msgListSize = in.readInt();
+    msgList = Lists.newArrayListWithCapacity(msgListSize);
+    for (int i = 0; i < msgListSize; ++i) {
+      M msg = BspUtils.<M>createMessageValue(getConf());
+      msg.readFields(in);
+      msgList.add(msg);
+    }
+    halt = in.readBoolean();
+  }
+
+  @Override
+  public final void write(DataOutput out) throws IOException {
+    vertexId.write(out);
+    out.writeBoolean(vertexValue != null);
+    if (vertexValue != null) {
+      vertexValue.write(out);
+    }
+    out.writeInt(destEdgeIndexList.size());
+    for (int i = 0; i < destEdgeIndexList.size(); ++i) {
+      destEdgeIndexList.get(i).write(out);
+      destEdgeValueList.get(i).write(out);
+    }
+    out.writeInt(msgList.size());
+    for (M msg : msgList) {
+      msg.write(out);
+    }
+    out.writeBoolean(halt);
+  }
+
+  @Override
+  void putMessages(Iterable<M> messages) {
+    msgList.clear();
+    for (M message : messages) {
+      msgList.add(message);
+    }
+  }
+
+  @Override
+  public Iterable<M> getMessages() {
+    return Iterables.unmodifiableIterable(msgList);
+  }
+
+  @Override
+  void releaseResources() {
+    // Hint to GC to free the messages
+    msgList.clear();
+  }
+
+  @Override
+  public String toString() {
+    return "Vertex(id=" + getVertexId() + ",value=" + getVertexValue() +
+        ",#edges=" + getNumOutEdges() + ")";
+  }
 }