You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ac...@apache.org on 2011/09/10 02:40:22 UTC

svn commit: r1167420 - in /incubator/giraph/trunk: ./ src/main/java/org/apache/giraph/bsp/ src/main/java/org/apache/giraph/comm/ src/main/java/org/apache/giraph/graph/ src/test/java/org/apache/giraph/

Author: aching
Date: Sat Sep 10 00:40:21 2011
New Revision: 1167420

URL: http://svn.apache.org/viewvc?rev=1167420&view=rev
Log:
GIRAPH-27: Mutable static global state in Vertex.java should be
refactored. jake.mannix via aching.


Added:
    incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphState.java
Modified:
    incubator/giraph/trunk/CHANGELOG
    incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedService.java
    incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java
    incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BasicVertex.java
    incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspService.java
    incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
    incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspUtils.java
    incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java
    incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/MutableVertex.java
    incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/Vertex.java
    incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexRange.java
    incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexResolver.java
    incubator/giraph/trunk/src/test/java/org/apache/giraph/TestBspBasic.java

Modified: incubator/giraph/trunk/CHANGELOG
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/CHANGELOG?rev=1167420&r1=1167419&r2=1167420&view=diff
==============================================================================
--- incubator/giraph/trunk/CHANGELOG (original)
+++ incubator/giraph/trunk/CHANGELOG Sat Sep 10 00:40:21 2011
@@ -1,28 +1,31 @@
 Giraph Change Log
 
 Release 0.70.0 - unreleased
+
+  GIRAPH-27: Mutable static global state in Vertex.java should be
+  refactored. jake.mannix via aching.
  
-  GIRAPH-25 NPE in BspServiceMaster when failing a job (aching on behalf 
-  of dvryaboy)
+  GIRAPH-25: NPE in BspServiceMaster when failing a job. dvryaboy via
+  aching.
 
-  GIRAPH-24 Job-level statistics reports one superstep greater than 
+  GIRAPH-24: Job-level statistics reports one superstep greater than 
   workers. (jghoman)
   
-  GIRAPH-18 Refactor BspServiceWorker::loadVertices(). (jghoman)
+  GIRAPH-18: Refactor BspServiceWorker::loadVertices(). (jghoman)
   
-  GIRAPH-14 Support for the Facebook Hadoop branch. (aching)
+  GIRAPH-14: Support for the Facebook Hadoop branch. (aching)
 
-  GIRAPH-16 Add Apache RAT to the verify build step. (omalley)
+  GIRAPH-16: Add Apache RAT to the verify build step. (omalley)
 
-  GIRAPH-17 Giraph doesn't give up properly after the maximum connect
+  GIRAPH-17: Giraph doesn't give up properly after the maximum connect
   attempts to ZooKeeper. (aching)
 
-  GIRAPH-2 Make the project homepage. (jghoman)
+  GIRAPH-2: Make the project homepage. (jghoman)
 
-  GIRAPH-9 Change Yahoo License Header to Apache License Header (hyunsik)
+  GIRAPH-9: Change Yahoo License Header to Apache License Header (hyunsik)
 
-  GIRAPH-6 Remove Yahoo-specific code from pom.xml. (jghoman)
+  GIRAPH-6: Remove Yahoo-specific code from pom.xml. (jghoman)
 
-  GIRAPH-5 Remove Yahoo directories after svn import from Yahoo! (aching)
+  GIRAPH-5: Remove Yahoo directories after svn import from Yahoo! (aching)
 
-  GIRAPH-3 Vertex:sentMsgToAllEdges should be sendMsg. (jghoman)
\ No newline at end of file
+  GIRAPH-3: Vertex:sentMsgToAllEdges should be sendMsg. (jghoman)
\ No newline at end of file

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedService.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedService.java?rev=1167420&r1=1167419&r2=1167420&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedService.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedService.java Sat Sep 10 00:40:21 2011
@@ -20,7 +20,7 @@ package org.apache.giraph.bsp;
 
 import java.io.IOException;
 
-import org.apache.giraph.graph.BasicVertex;
+import org.apache.giraph.graph.Vertex;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
@@ -44,7 +44,7 @@ public interface CentralizedService<I ex
      *
      * @return representation vertex
      */
-    BasicVertex<I, V, E, M> getRepresentativeVertex();
+    Vertex<I, V, E, M> getRepresentativeVertex();
 
     /**
      * Get the current global superstep of the application to work on.

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java?rev=1167420&r1=1167419&r2=1167420&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java Sat Sep 10 00:40:21 2011
@@ -1017,10 +1017,11 @@ end[HADOOP_FACEBOOK]*/
         // Resolve all graph mutations
         for (I vertexIndex : resolveVertexIndexSet) {
             VertexResolver<I, V, E, M> vertexResolver =
-                BspUtils.createVertexResolver(conf);
+                BspUtils.createVertexResolver(
+                    conf, service.getGraphMapper().getGraphState());
             VertexRange<I, V, E, M> vertexRange =
                 service.getVertexRange(service.getSuperstep() - 1, vertexIndex);
-            BasicVertex<I, V, E, M> originalVertex =
+            Vertex<I, V, E, M> originalVertex =
                 vertexRange.getVertexMap().get(vertexIndex);
             List<M> msgList = inMessages.get(vertexIndex);
             if (originalVertex != null) {
@@ -1043,7 +1044,8 @@ end[HADOOP_FACEBOOK]*/
 
             if (vertex != null) {
                 ((MutableVertex<I, V, E, M>) vertex).setVertexId(vertexIndex);
-                vertexRange.getVertexMap().put(vertex.getVertexId(), vertex);
+                vertexRange.getVertexMap().put(vertex.getVertexId(),
+                                               (Vertex<I, V, E, M>) vertex);
             } else if (originalVertex != null) {
                 vertexRange.getVertexMap().remove(originalVertex.getVertexId());
             }

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BasicVertex.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BasicVertex.java?rev=1167420&r1=1167419&r2=1167420&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BasicVertex.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BasicVertex.java Sat Sep 10 00:40:21 2011
@@ -1,4 +1,4 @@
-/*
+ /*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -18,14 +18,15 @@
 
 package org.apache.giraph.graph;
 
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Mapper;
+
 import java.io.IOException;
 import java.util.Iterator;
 import java.util.List;
 import java.util.SortedMap;
 
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
 /**
  * Basic interface for writing a BSP application for computation.
  *
@@ -35,11 +36,12 @@ import org.apache.hadoop.io.WritableComp
  * @param <M> message data
  */
 @SuppressWarnings("rawtypes")
-public interface BasicVertex<I extends WritableComparable,
-                             V extends Writable,
-                             E extends Writable,
-                             M extends Writable>
-                             extends AggregatorUsage {
+public abstract class BasicVertex<I extends WritableComparable,
+        V extends Writable, E extends Writable, M extends Writable>
+        implements AggregatorUsage {
+    /** Global graph state **/
+    private GraphState<I,V,E,M> graphState;
+
     /**
      * Optionally defined by the user to be executed once on all workers
      * before application has started.
@@ -47,26 +49,26 @@ public interface BasicVertex<I extends W
      * @throws IllegalAccessException
      * @throws InstantiationException
      */
-    void preApplication()
+    public abstract void preApplication()
         throws InstantiationException, IllegalAccessException;
 
     /**
      * Optionally defined by the user to be executed once on all workers
      * after the application has completed.
      */
-    void postApplication();
+    public abstract void postApplication();
 
     /**
      * Optionally defined by the user to be executed once prior to vertex
      * processing on a worker for the current superstep.
      */
-    void preSuperstep();
+    public abstract void preSuperstep();
 
     /**
      * Optionally defined by the user to be executed once after all vertex
      * processing on a worker for the current superstep.
      */
-    void postSuperstep();
+    public abstract void postSuperstep();
 
     /**
      * Must be defined by user to do computation on a single Vertex.
@@ -75,33 +77,35 @@ public interface BasicVertex<I extends W
      *        vertex in the previous superstep
      * @throws IOException
      */
-    void compute(Iterator<M> msgIterator) throws IOException;
+    public abstract void compute(Iterator<M> msgIterator) throws IOException;
 
     /**
      * Retrieves the current superstep.
      *
      * @return Current superstep
      */
-    long getSuperstep();
+    public long getSuperstep() {
+        return getGraphState().getSuperstep();
+    }
 
     /**
      * Get the vertex id
      */
-    I getVertexId();
+    public abstract I getVertexId();
 
     /**
      * Get the vertex value (data stored with vertex)
      *
      * @return Vertex value
      */
-    V getVertexValue();
+    public abstract V getVertexValue();
 
     /**
      * Set the vertex data (immediately visible in the computation)
      *
      * @param vertexValue Vertex data to be set
      */
-    void setVertexValue(V vertexValue);
+    public abstract void setVertexValue(V vertexValue);
 
     /**
      * Get the total (all workers) number of vertices that
@@ -109,7 +113,9 @@ public interface BasicVertex<I extends W
      *
      * @return Total number of vertices (-1 if first superstep)
      */
-    long getNumVertices();
+    public long getNumVertices() {
+        return getGraphState().getNumVertices();
+    }
 
     /**
      * Get the total (all workers) number of edges that
@@ -117,7 +123,9 @@ public interface BasicVertex<I extends W
      *
      * @return Total number of edges (-1 if first superstep)
      */
-    long getNumEdges();
+    public long getNumEdges() {
+        return getGraphState().getNumEdges();
+    }
 
     /**
      * Every vertex has edges to other vertices.  Get a handle to the outward
@@ -125,7 +133,7 @@ public interface BasicVertex<I extends W
      *
      * @return Map of the destination vertex index to the {@link Edge}
      */
-    SortedMap<I, Edge<I, E>> getOutEdgeMap();
+    public abstract SortedMap<I, Edge<I, E>> getOutEdgeMap();
 
     /**
      * Send a message to a vertex id.
@@ -133,12 +141,19 @@ public interface BasicVertex<I extends W
      * @param id vertex id to send the message to
      * @param msg message data to send
      */
-    void sendMsg(I id, M msg);
+    public void sendMsg(I id, M msg) {
+        if (msg == null) {
+            throw new IllegalArgumentException(
+                "sendMsg: Cannot send null message to " + id);
+        }
+        getGraphState().getGraphMapper().getWorkerCommunications().
+            sendMessageReq(id, msg);
+    }
 
     /**
      * Send a message to all edges.
      */
-    void sendMsgToAllEdges(M msg);
+    public abstract void sendMsgToAllEdges(M msg);
 
     /**
      * After this is called, the compute() code will no longer be called for
@@ -146,16 +161,64 @@ public interface BasicVertex<I extends W
      * will be called once again until this function is called.  The application
      * finishes only when all vertices vote to halt.
      */
-    void voteToHalt();
+    public abstract void voteToHalt();
 
     /**
      * Is this vertex done?
      */
-    boolean isHalted();
+    public abstract boolean isHalted();
 
     /**
      *  Get the list of incoming messages from the previous superstep.  Same as
      *  the message iterator passed to compute().
      */
-    List<M> getMsgList();
+    public abstract List<M> getMsgList();
+
+    /**
+     * Get the graph state for all workers.
+     *
+     * @return Graph state for all workers
+     */
+    GraphState<I, V, E, M> getGraphState() {
+        return graphState;
+    }
+
+    /**
+     * Set the graph state for all workers.
+     *
+     * @param graphState Graph state for all workers
+     */
+    void setGraphState(GraphState<I, V, E, M> graphState) {
+        this.graphState = graphState;
+    }
+
+    /**
+     * Get the mapper context
+     *
+     * @return Mapper context
+     */
+    public Mapper.Context getContext() {
+        return getGraphState().getContext();
+    }
+
+    @Override
+    public final <A extends Writable> Aggregator<A> registerAggregator(
+            String name,
+            Class<? extends Aggregator<A>> aggregatorClass)
+            throws InstantiationException, IllegalAccessException {
+        return getGraphState().getGraphMapper().getAggregatorUsage().
+            registerAggregator(name, aggregatorClass);
+    }
+
+    @Override
+    public final Aggregator<? extends Writable> getAggregator(String name) {
+        return getGraphState().getGraphMapper().getAggregatorUsage().
+            getAggregator(name);
+    }
+
+    @Override
+    public final boolean useAggregator(String name) {
+        return getGraphState().getGraphMapper().getAggregatorUsage().
+            useAggregator(name);
+    }
 }

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspService.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspService.java?rev=1167420&r1=1167419&r2=1167420&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspService.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspService.java Sat Sep 10 00:40:21 2011
@@ -18,41 +18,39 @@
 
 package org.apache.giraph.graph;
 
-import java.io.IOException;
-import java.security.InvalidParameterException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.NavigableMap;
-import java.util.TreeMap;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-
-import org.apache.log4j.Logger;
+import org.apache.giraph.bsp.CentralizedService;
+import org.apache.giraph.zk.BspEvent;
+import org.apache.giraph.zk.PredicateLock;
+import org.apache.giraph.zk.ZooKeeperExt;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.Mapper;
-
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.Watcher.Event.EventType;
 import org.apache.zookeeper.Watcher.Event.KeeperState;
-import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.ZooDefs.Ids;
 import org.json.JSONArray;
 import org.json.JSONException;
 import org.json.JSONObject;
 
-import org.apache.giraph.bsp.CentralizedService;
-import org.apache.giraph.zk.BspEvent;
-import org.apache.giraph.zk.PredicateLock;
-import org.apache.giraph.zk.ZooKeeperExt;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.security.InvalidParameterException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.NavigableMap;
+import java.util.TreeMap;
 
 /**
  * Zookeeper-based implementation of {@link CentralizedService}.
@@ -631,7 +629,9 @@ public abstract class BspService <
         this.hostnamePartitionId = hostname + "_" + getTaskPartition();
 
         this.representativeVertex =
-            BspUtils.<I, V, E, M>createVertex(getConfiguration());
+            BspUtils.<I, V, E, M>createVertex(
+                getConfiguration(),
+                getGraphMapper().getGraphState());
 
         this.checkpointFrequency =
             conf.getInt(GiraphJob.CHECKPOINT_FREQUENCY,
@@ -670,7 +670,12 @@ public abstract class BspService <
         return jobId;
     }
 
-    final public BasicVertex<I, V, E, M> getRepresentativeVertex() {
+    /**
+     * Get the representative vertex
+     *
+     * @return Representative vertex for this service.
+     */
+    final public Vertex<I, V, E, M> getRepresentativeVertex() {
         return representativeVertex;
     }
 

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java?rev=1167420&r1=1167419&r2=1167420&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java Sat Sep 10 00:40:21 2011
@@ -18,6 +18,28 @@
 
 package org.apache.giraph.graph;
 
+import net.iharder.Base64;
+import org.apache.giraph.bsp.ApplicationState;
+import org.apache.giraph.bsp.CentralizedServiceWorker;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher.Event.EventType;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.data.Stat;
+import org.json.JSONArray;
+import org.json.JSONException;
+import org.json.JSONObject;
+
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
@@ -36,33 +58,6 @@ import java.util.SortedMap;
 import java.util.TreeMap;
 import java.util.TreeSet;
 
-import net.iharder.Base64;
-
-import org.json.JSONArray;
-import org.json.JSONException;
-import org.json.JSONObject;
-
-import org.apache.log4j.Logger;
-import org.apache.giraph.bsp.CentralizedServiceWorker;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.util.ReflectionUtils;
-
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher.Event.EventType;
-import org.apache.zookeeper.ZooDefs.Ids;
-import org.apache.zookeeper.data.Stat;
-
-import org.apache.giraph.bsp.ApplicationState;
-
 /**
  * ZooKeeper-based implementation of {@link CentralizedServiceWorker}.
  */
@@ -387,16 +382,17 @@ public class BspServiceWorker<
      * @throws InterruptedException
      */
     private List<Vertex<I, V, E, M>> readVerticesFromInputSplit(
-        InputSplit inputSplit) throws IOException, InterruptedException {
-        List<Vertex<I, V, E, M>> vertexList = new ArrayList<Vertex<I, V, E, M>>();
+            InputSplit inputSplit) throws IOException, InterruptedException {
+        List<Vertex<I, V, E, M>> vertexList =
+            new ArrayList<Vertex<I, V, E, M>>();
         VertexInputFormat<I, V, E> vertexInputFormat =
             BspUtils.<I, V, E>createVertexInputFormat(getConfiguration());
         VertexReader<I, V, E> vertexReader =
             vertexInputFormat.createVertexReader(inputSplit, getContext());
         vertexReader.initialize(inputSplit, getContext());
         Vertex<I, V, E, M> readerVertex =
-            BspUtils.<I, V, E, M>createVertex(getConfiguration());
-
+            BspUtils.<I, V, E, M>createVertex(
+                getConfiguration(), getGraphMapper().getGraphState());
         while (vertexReader.next(readerVertex)) {
             if (readerVertex.getVertexId() == null) {
                 throw new IllegalArgumentException(
@@ -422,7 +418,8 @@ public class BspServiceWorker<
                 }
             }
             vertexList.add(readerVertex);
-            readerVertex = BspUtils.<I, V, E, M>createVertex(getConfiguration());
+            readerVertex = BspUtils.<I, V, E, M>createVertex(getConfiguration(),
+                getGraphMapper().getGraphState());
             getContext().progress();
         }
         vertexReader.close();
@@ -510,8 +507,7 @@ public class BspServiceWorker<
             }
             VertexRange<I, V, E, M> range =
                 vertexRangeMap.get(currentVertexIndexMax);
-            SortedMap<I, BasicVertex<I, V, E, M>> vertexMap =
-                range.getVertexMap();
+            SortedMap<I, Vertex<I, V, E, M>> vertexMap = range.getVertexMap();
             if (vertexMap.put(vertex.getVertexId(), vertex) != null) {
                 throw new IllegalStateException(
                     "loadVertices: Already contains vertex " +
@@ -1179,7 +1175,9 @@ public class BspServiceWorker<
         VertexRange<I, V, E, M> vertexRange = getVertexRangeMap().get(maxIndex);
         for (int i = 0; i < vertexCount; ++i) {
             Vertex<I, V, E, M> vertex =
-                BspUtils.<I, V, E, M>createVertex(getConfiguration());
+                BspUtils.<I, V, E, M>createVertex(
+                    getConfiguration(),
+                    getGraphMapper().getGraphState());
             vertex.readFields(dataStream);
             // Add the vertex
             if (vertexRange.getVertexMap().put(vertex.getVertexId(), vertex)
@@ -1364,7 +1362,7 @@ public class BspServiceWorker<
                     continue;
                 }
 
-                SortedMap<I, BasicVertex<I, V, E, M>> vertexMap =
+                SortedMap<I, Vertex<I, V, E, M>> vertexMap =
                     getVertexRangeMap().get(entry.getKey()).getVertexMap();
                 if (vertexMap.size() != 0) {
                     throw new RuntimeException(
@@ -1378,7 +1376,7 @@ public class BspServiceWorker<
                              entry.getValue().size() +
                              " vertices for max index " + entry.getKey());
                 }
-                for (BasicVertex<I, V, E, M> vertex : entry.getValue()) {
+                for (Vertex<I, V, E, M> vertex : entry.getValue()) {
                     if (vertexMap.put(vertex.getVertexId(), vertex) != null) {
                         throw new IllegalStateException(
                             "exchangeVertexRanges: Vertex " + vertex +

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspUtils.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspUtils.java?rev=1167420&r1=1167419&r2=1167420&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspUtils.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspUtils.java Sat Sep 10 00:40:21 2011
@@ -152,7 +152,8 @@ public class BspUtils {
      */
     @SuppressWarnings("rawtypes")
     public static <I extends WritableComparable, V extends Writable,
-            E extends Writable, M extends Writable> VertexRangeBalancer<I, V, E, M>
+            E extends Writable, M extends Writable>
+            VertexRangeBalancer<I, V, E, M>
             createVertexRangeBalancer(Configuration conf) {
         Class<? extends VertexRangeBalancer<I, V, E, M>>
             vertexRangeBalancerClass = getVertexRangeBalancerClass(conf);
@@ -187,10 +188,14 @@ public class BspUtils {
     @SuppressWarnings("rawtypes")
     public static <I extends WritableComparable, V extends Writable,
             E extends Writable, M extends Writable> VertexResolver<I, V, E, M>
-            createVertexResolver(Configuration conf) {
+            createVertexResolver(Configuration conf,
+                                 GraphState<I, V, E, M> graphState) {
         Class<? extends VertexResolver<I, V, E, M>> vertexResolverClass =
             getVertexResolverClass(conf);
-        return ReflectionUtils.newInstance(vertexResolverClass, conf);
+        VertexResolver<I, V, E, M> resolver =
+            ReflectionUtils.newInstance(vertexResolverClass, conf);
+        resolver.setGraphState(graphState);
+        return resolver;
     }
 
     /**
@@ -221,10 +226,14 @@ public class BspUtils {
     @SuppressWarnings("rawtypes")
     public static <I extends WritableComparable, V extends Writable,
             E extends Writable, M extends Writable> Vertex<I, V, E, M>
-            createVertex(Configuration conf) {
+            createVertex(Configuration conf,
+            GraphState<I, V, E, M> graphState) {
         Class<? extends Vertex<I, V, E, M>> vertexClass =
             getVertexClass(conf);
-        return ReflectionUtils.newInstance(vertexClass, conf);
+        Vertex<I, V, E, M> vertex =
+            ReflectionUtils.newInstance(vertexClass, conf);
+        vertex.setGraphState(graphState);
+        return vertex;
     }
 
     /**

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java?rev=1167420&r1=1167419&r2=1167420&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java Sat Sep 10 00:40:21 2011
@@ -18,15 +18,6 @@
 
 package org.apache.giraph.graph;
 
-import java.io.IOException;
-import java.lang.reflect.Type;
-import java.net.URL;
-import java.net.URLDecoder;
-import java.util.Enumeration;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
 import org.apache.giraph.bsp.CentralizedServiceWorker;
 import org.apache.giraph.comm.RPCCommunications;
 import org.apache.giraph.comm.ServerInterface;
@@ -39,6 +30,15 @@ import org.apache.hadoop.io.WritableComp
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.log4j.Logger;
 
+import java.io.IOException;
+import java.lang.reflect.Type;
+import java.net.URL;
+import java.net.URLDecoder;
+import java.util.Enumeration;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
 /**
  * This mapper that will execute the BSP graph tasks.  Since this mapper will
  * not be passing data by key-value pairs through the MR framework, the
@@ -66,6 +66,11 @@ public class GraphMapper<I extends Writa
     private boolean done = false;
     /** What kind of functions is this mapper doing? */
     private MapFunctions mapFunctions = MapFunctions.UNKNOWN;
+    /**
+     * Graph state for all vertices that is used for the duration of
+     * this mapper.
+     */
+    private GraphState<I,V,E,M> graphState = new GraphState<I, V, E, M>();
 
     /** What kinds of functions to run on this mapper */
     public enum MapFunctions {
@@ -103,6 +108,10 @@ public class GraphMapper<I extends Writa
         return serviceWorker;
     }
 
+    public final GraphState<I,V,E,M> getGraphState() {
+      return graphState;
+    }
+
     /**
      * Default handler for uncaught exceptions.
      */
@@ -380,6 +389,7 @@ public class GraphMapper<I extends Writa
     public void setup(Context context)
             throws IOException, InterruptedException {
         context.setStatus("setup: Beginning mapper setup.");
+        graphState.setContext(context);
         // Setting the default handler for uncaught exceptions.
         Thread.setDefaultUncaughtExceptionHandler(
             new OverrideExceptionHandler());
@@ -391,8 +401,6 @@ public class GraphMapper<I extends Writa
         }
         // Ensure the user classes have matching types and figure them out
         determineClassTypes(conf);
-        Vertex.setGraphMapper(this);
-        Vertex.setContext(context);
 
         // Do some initial setup (possibly starting up a Zookeeper service)
         context.setStatus("setup: Initializing Zookeeper services.");
@@ -506,7 +514,13 @@ public class GraphMapper<I extends Writa
         }
         mapAlreadyRun = true;
 
+        graphState.setSuperstep(serviceWorker.getSuperstep()).
+            setContext(context).setGraphMapper(this).
+            setNumEdges(serviceWorker.getTotalEdges()).
+            setNumVertices(serviceWorker.getTotalVertices());
+
         try {
+            serviceWorker.getRepresentativeVertex().setGraphState(graphState);
             serviceWorker.getRepresentativeVertex().preApplication();
         } catch (InstantiationException e) {
             LOG.fatal("map: preApplication failed in instantiation", e);
@@ -526,9 +540,14 @@ public class GraphMapper<I extends Writa
         do {
             long superstep = serviceWorker.getSuperstep();
 
+            graphState.setSuperstep(superstep)
+                      .setNumEdges(serviceWorker.getTotalEdges())
+                      .setNumVertices(serviceWorker.getTotalVertices());
+
             if (commService != null) {
                 commService.prepareSuperstep();
             }
+
             serviceWorker.startSuperstep();
             if (zkManager != null && zkManager.runsZooKeeper()) {
                 if (LOG.isInfoEnabled()) {
@@ -570,10 +589,6 @@ public class GraphMapper<I extends Writa
             serviceWorker.exchangeVertexRanges();
             context.progress();
 
-            Vertex.setSuperstep(superstep);
-            Vertex.setNumVertices(serviceWorker.getTotalVertices());
-            Vertex.setNumEdges(serviceWorker.getTotalEdges());
-
             serviceWorker.getRepresentativeVertex().preSuperstep();
             context.progress();
 
@@ -591,8 +606,11 @@ public class GraphMapper<I extends Writa
                     continue;
                 }
 
-                for (BasicVertex<I, V, E, M> vertex :
+                for (Vertex<I, V, E, M> vertex :
                         entry.getValue().getVertexMap().values()) {
+                    // Make sure every vertex has the current
+                    // graphState before computing
+                    vertex.setGraphState(graphState);
                     if (vertex.isHalted() &&
                             !vertex.getMsgList().isEmpty()) {
                         Vertex<I, V, E, M> activatedVertex =

Added: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphState.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphState.java?rev=1167420&view=auto
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphState.java (added)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphState.java Sat Sep 10 00:40:21 2011
@@ -0,0 +1,75 @@
+package org.apache.giraph.graph;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Mapper;
+
+/**
+ * Global state of the graph.  Should be treated as a singleton (but is kept
+ * as a regular bean to facilitate ease of unit testing).
+ *
+ * @param <I> vertex id
+ * @param <V> vertex data
+ * @param <E> edge data
+ * @param <M> message data
+ */
+@SuppressWarnings("rawtypes")
+public class GraphState<I extends WritableComparable, V extends Writable,
+        E extends Writable, M extends Writable> {
+    /** Graph-wide superstep */
+    private long superstep = 0;
+    /** Graph-wide number of vertices */
+    private long numVertices = -1;
+    /** Graph-wide number of edges */
+    private long numEdges = -1;
+    /** Graph-wide map context */
+    private Mapper.Context context = null;
+    /** Graph-wide BSP Mapper (shared by the vertices using this state) */
+    private GraphMapper<I, V, E, M> graphMapper = null;
+
+    public long getSuperstep() {
+        return superstep;
+    }
+
+    public GraphState<I, V, E, M> setSuperstep(long superstep) {
+        this.superstep = superstep;
+        return this;
+    }
+
+    public long getNumVertices() {
+        return numVertices;
+    }
+
+    public GraphState<I, V, E, M> setNumVertices(long numVertices) {
+        this.numVertices = numVertices;
+        return this;
+    }
+
+    public long getNumEdges() {
+        return numEdges;
+    }
+
+    public GraphState<I, V, E, M> setNumEdges(long numEdges) {
+        this.numEdges = numEdges;
+        return this;
+    }
+
+    public Mapper.Context getContext() {
+        return context;
+    }
+
+    public GraphState<I, V , E ,M> setContext(Mapper.Context context) {
+        this.context = context;
+        return this;
+    }
+
+    public GraphMapper<I, V, E, M> getGraphMapper() {
+        return graphMapper;
+    }
+
+    public GraphState<I, V, E, M> setGraphMapper(
+            GraphMapper<I, V, E, M> graphMapper) {
+        this.graphMapper = graphMapper;
+        return this;
+    }
+}

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/MutableVertex.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/MutableVertex.java?rev=1167420&r1=1167419&r2=1167420&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/MutableVertex.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/MutableVertex.java Sat Sep 10 00:40:21 2011
@@ -28,17 +28,15 @@ import org.apache.hadoop.io.WritableComp
  * or mutate the graph.
  */
 @SuppressWarnings("rawtypes")
-public interface MutableVertex<I extends WritableComparable,
-                               V extends Writable,
-                               E extends Writable,
-                               M extends Writable>
-                               extends BasicVertex<I, V, E, M>, Writable {
+public abstract class MutableVertex<I extends WritableComparable,
+        V extends Writable, E extends Writable, M extends Writable>
+        extends BasicVertex<I, V, E, M> implements Writable {
     /**
      * Set the vertex id
      *
      * @param id Vertex id is set to this (instantiated by the user)
      */
-    void setVertexId(I id);
+    public abstract void setVertexId(I id);
 
     /**
      * Add an edge for this vertex (happens immediately)
@@ -46,7 +44,7 @@ public interface MutableVertex<I extends
      * @param edge Edge to be added
      * @return Return true if succeeded, false otherwise
      */
-    boolean addEdge(Edge<I, E> edge);
+    public abstract boolean addEdge(Edge<I, E> edge);
 
     /**
      * Create a vertex for use in addVertexRequest().  Still need to get the
@@ -54,7 +52,12 @@ public interface MutableVertex<I extends
      *
      * @return Created vertex for addVertexRequest.
      */
-    MutableVertex<I, V, E, M> instantiateVertex();
+    public MutableVertex<I, V, E, M> instantiateVertex() {
+        Vertex<I, V, E, M> mutableVertex =
+            BspUtils.createVertex(getContext().getConfiguration(),
+                                  getGraphState());
+        return mutableVertex;
+    }
 
     /**
      * Sends a request to create a vertex that will be available during the
@@ -62,7 +65,11 @@ public interface MutableVertex<I extends
      *
      * @param vertex User created vertex
      */
-    void addVertexRequest(MutableVertex<I, V, E, M> vertex) throws IOException;
+    public void addVertexRequest(MutableVertex<I, V, E, M> vertex)
+            throws IOException {
+        getGraphState().getGraphMapper().getWorkerCommunications().
+        addVertexReq(vertex);
+}
 
     /**
      * Request to remove a vertex from the graph
@@ -70,7 +77,10 @@ public interface MutableVertex<I extends
      *
      * @param vertexId Id of the vertex to be removed.
      */
-    void removeVertexRequest(I vertexId) throws IOException;
+    public void removeVertexRequest(I vertexId) throws IOException {
+        getGraphState().getGraphMapper().getWorkerCommunications().
+        removeVertexReq(vertexId);
+    }
 
     /**
      * Request to add an edge of a vertex in the graph
@@ -79,7 +89,11 @@ public interface MutableVertex<I extends
      * @param sourceVertexId Source vertex id of edge
      * @param edge Edge to add
      */
-    void addEdgeRequest(I sourceVertexId, Edge<I, E> edge) throws IOException;
+    public void addEdgeRequest(I sourceVertexId, Edge<I, E> edge)
+            throws IOException {
+        getGraphState().getGraphMapper().getWorkerCommunications().
+            addEdgeReq(sourceVertexId, edge);
+    }
 
     /**
      * Request to remove an edge of a vertex from the graph
@@ -88,5 +102,9 @@ public interface MutableVertex<I extends
      * @param sourceVertexId Source vertex id of edge
      * @param destVertexId Destination vertex id of edge
      */
-    void removeEdgeRequest(I sourceVertexId, I destVertexId) throws IOException;
+    public void removeEdgeRequest(I sourceVertexId, I destVertexId)
+            throws IOException {
+        getGraphState().getGraphMapper().getWorkerCommunications().
+            removeEdgeReq(sourceVertexId, destVertexId);
+    }
 }

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/Vertex.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/Vertex.java?rev=1167420&r1=1167419&r2=1167420&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/Vertex.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/Vertex.java Sat Sep 10 00:40:21 2011
@@ -18,6 +18,10 @@
 
 package org.apache.giraph.graph;
 
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.log4j.Logger;
+
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
@@ -26,13 +30,6 @@ import java.util.List;
 import java.util.SortedMap;
 import java.util.TreeMap;
 
-import org.apache.log4j.Logger;
-
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapreduce.Mapper;
-
-
 /**
  * User applications should all subclass {@link Vertex}.  Package access
  * should prevent users from accessing internal methods.
@@ -43,24 +40,11 @@ import org.apache.hadoop.mapreduce.Mappe
  * @param <M> Message value
  */
 @SuppressWarnings("rawtypes")
-public abstract class Vertex<
-        I extends WritableComparable,
-        V extends Writable,
-        E extends Writable,
-        M extends Writable>
-        implements MutableVertex<I, V, E, M> {
+public abstract class Vertex<I extends WritableComparable, V extends Writable,
+        E extends Writable, M extends Writable>
+        extends MutableVertex<I, V, E, M> {
     /** Class logger */
     private static final Logger LOG = Logger.getLogger(Vertex.class);
-    /** Class-wide superstep */
-    private static long superstep = 0;
-    /** Class-wide number of vertices */
-    private static long numVertices = -1;
-    /** Class-wide number of edges */
-    private static long numEdges = -1;
-    /** Class-wide map context */
-    private static Mapper.Context context = null;
-    /** Class-wide BSP Mapper for this Vertex */
-    private static GraphMapper<?, ? ,?, ?> graphMapper = null;
     /** Vertex id */
     private I vertexId = null;
     /** Vertex value */
@@ -119,32 +103,6 @@ public abstract class Vertex<
         return vertexId;
     }
 
-    /**
-     * Set the GraphMapper for this vertex (internal use).
-     *
-     * @param graphMapper Mapper to use for communication
-     */
-    final static <I extends WritableComparable,
-            V extends Writable, E extends Writable,
-            M extends Writable> void
-            setGraphMapper(GraphMapper<I, V, E, M> graphMapper) {
-        Vertex.graphMapper = graphMapper;
-    }
-
-    /**
-     * Set the global superstep for all the vertices (internal use)
-     *
-     * @param superstep New superstep
-     */
-    static void setSuperstep(long superstep) {
-        Vertex.superstep = superstep;
-    }
-
-    @Override
-    public final long getSuperstep() {
-        return superstep;
-    }
-
     @Override
     public final V getVertexValue() {
         return vertexValue;
@@ -155,50 +113,11 @@ public abstract class Vertex<
         this.vertexValue = vertexValue;
     }
 
-    /**
-     * Set the total number of vertices from the last superstep.
-     *
-     * @param numVertices Aggregate vertices in the last superstep
-     */
-    static void setNumVertices(long numVertices) {
-        Vertex.numVertices = numVertices;
-    }
-
-    @Override
-    public final long getNumVertices() {
-        return numVertices;
-    }
-
-    /**
-     * Set the total number of edges from the last superstep.
-     *
-     * @param numEdges Aggregate edges in the last superstep
-     */
-    static void setNumEdges(long numEdges) {
-        Vertex.numEdges = numEdges;
-    }
-
-    @Override
-    public final long getNumEdges() {
-        return numEdges;
-    }
-
     @Override
     public final SortedMap<I, Edge<I, E>> getOutEdgeMap() {
         return destEdgeMap;
     }
 
-    @SuppressWarnings("unchecked")
-    @Override
-    public final void sendMsg(I id, M msg) {
-        if (msg == null) {
-            throw new IllegalArgumentException(
-                "sendMsg: Cannot send null message to " + id);
-        }
-        ((GraphMapper<I, V, E, M>) graphMapper).
-            getWorkerCommunications().sendMessageReq(id, msg);
-    }
-
     @Override
     public final void sendMsgToAllEdges(M msg) {
         if (msg == null) {
@@ -211,45 +130,6 @@ public abstract class Vertex<
     }
 
     @Override
-    public MutableVertex<I, V, E, M> instantiateVertex() {
-        Vertex<I, V, E, M> mutableVertex =
-            BspUtils.<I, V, E, M>createVertex(getContext().getConfiguration());
-        return mutableVertex;
-    }
-
-    @SuppressWarnings("unchecked")
-    @Override
-    public void addVertexRequest(MutableVertex<I, V, E, M> vertex)
-            throws IOException {
-        ((GraphMapper<I, V, E, M>) graphMapper).
-            getWorkerCommunications().addVertexReq(vertex);
-    }
-
-    @SuppressWarnings("unchecked")
-    @Override
-    public void removeVertexRequest(I vertexId) throws IOException {
-        ((GraphMapper<I, V, E, M>) graphMapper).
-            getWorkerCommunications().removeVertexReq(vertexId);
-    }
-
-    @SuppressWarnings("unchecked")
-    @Override
-    public void addEdgeRequest(I vertexIndex,
-                               Edge<I, E> edge) throws IOException {
-        ((GraphMapper<I, V, E, M>) graphMapper).
-            getWorkerCommunications().addEdgeReq(vertexIndex, edge);
-    }
-
-    @SuppressWarnings("unchecked")
-    @Override
-    public void removeEdgeRequest(I sourceVertexId,
-                                  I destVertexId) throws IOException {
-        ((GraphMapper<I, V, E, M>) graphMapper).
-            getWorkerCommunications().removeEdgeReq(sourceVertexId,
-                                                    destVertexId);
-    }
-
-    @Override
     public final void voteToHalt() {
         halt = true;
     }
@@ -262,12 +142,12 @@ public abstract class Vertex<
     @Override
     final public void readFields(DataInput in) throws IOException {
         vertexId =
-            BspUtils.<I>createVertexIndex(getContext().getConfiguration());
+            BspUtils.createVertexIndex(getContext().getConfiguration());
         vertexId.readFields(in);
         boolean hasVertexValue = in.readBoolean();
         if (hasVertexValue) {
             vertexValue =
-                BspUtils.<V>createVertexValue(getContext().getConfiguration());
+                BspUtils.createVertexValue(getContext().getConfiguration());
             vertexValue.readFields(in);
         }
         long edgeMapSize = in.readLong();
@@ -280,7 +160,7 @@ public abstract class Vertex<
         long msgListSize = in.readLong();
         for (long i = 0; i < msgListSize; ++i) {
             M msg =
-                BspUtils.<M>createMessageValue(getContext().getConfiguration());
+                BspUtils.createMessageValue(getContext().getConfiguration());
             msg.readFields(in);
             msgList.add(msg);
         }
@@ -306,37 +186,10 @@ public abstract class Vertex<
     }
 
     @Override
-    public final <A extends Writable> Aggregator<A> registerAggregator(
-            String name,
-            Class<? extends Aggregator<A>> aggregatorClass)
-            throws InstantiationException, IllegalAccessException {
-        return graphMapper.getAggregatorUsage().registerAggregator(
-            name, aggregatorClass);
-    }
-
-    @Override
-    public final Aggregator<? extends Writable> getAggregator(String name) {
-        return graphMapper.getAggregatorUsage().getAggregator(name);
-    }
-
-    @Override
-    public final boolean useAggregator(String name) {
-        return graphMapper.getAggregatorUsage().useAggregator(name);
-    }
-
-    @Override
     public List<M> getMsgList() {
         return msgList;
     }
 
-    public final Mapper<?, ?, ?, ?>.Context getContext() {
-        return context;
-    }
-
-    final static void setContext(Mapper<?, ?, ?, ?>.Context context) {
-        Vertex.context = context;
-    }
-
     @Override
     public String toString() {
         return "Vertex(id=" + getVertexId() + ",value=" + getVertexValue() +

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexRange.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexRange.java?rev=1167420&r1=1167419&r2=1167420&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexRange.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexRange.java Sat Sep 10 00:40:21 2011
@@ -68,8 +68,8 @@ public class VertexRange<I extends Writa
     /** Checkpoint file prefix (null if not recovering from a checkpoint) */
     private String checkpointfilePrefix = null;
     /** Vertex map for this range (keyed by index) */
-    private final SortedMap<I, BasicVertex<I, V, E, M>> vertexMap =
-        new TreeMap<I, BasicVertex<I, V, E, M>>();
+    private final SortedMap<I, Vertex<I, V, E, M>> vertexMap =
+        new TreeMap<I, Vertex<I, V, E, M>>();
     /** Class logger */
     private static final Logger LOG = Logger.getLogger(VertexRange.class);
 
@@ -207,7 +207,7 @@ public class VertexRange<I extends Writa
      *
      * @return Map of vertices (keyed by index)
      */
-    public SortedMap<I, BasicVertex<I, V, E, M>> getVertexMap() {
+    public SortedMap<I, Vertex<I, V, E, M>> getVertexMap() {
         return vertexMap;
     }
 

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexResolver.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexResolver.java?rev=1167420&r1=1167419&r2=1167420&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexResolver.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexResolver.java Sat Sep 10 00:40:21 2011
@@ -18,14 +18,14 @@
 
 package org.apache.giraph.graph;
 
-import java.util.List;
-
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.log4j.Logger;
 
+import java.util.List;
+
 /**
  * Default implementation of how to resolve vertex creation/removal, messages
  * to nonexistent vertices, etc.
@@ -41,6 +41,9 @@ public class VertexResolver<I extends Wr
         implements BasicVertexResolver<I, V, E, M>, Configurable {
     /** Configuration */
     private Configuration conf = null;
+
+    private GraphState<I,V,E,M> graphState;
+
     /** Class logger */
     private static final Logger LOG = Logger.getLogger(VertexResolver.class);
 
@@ -107,7 +110,7 @@ public class VertexResolver<I extends Wr
 
     @Override
     public MutableVertex<I, V, E, M> instantiateVertex() {
-        return BspUtils.<I, V, E, M>createVertex(getConf());
+        return BspUtils.<I, V, E, M>createVertex(getConf(), graphState);
     }
 
     @Override
@@ -119,4 +122,8 @@ public class VertexResolver<I extends Wr
     public void setConf(Configuration conf) {
         this.conf = conf;
     }
+
+    public void setGraphState(GraphState<I, V, E, M> graphState) {
+      this.graphState = graphState;
+    }
 }

Modified: incubator/giraph/trunk/src/test/java/org/apache/giraph/TestBspBasic.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/test/java/org/apache/giraph/TestBspBasic.java?rev=1167420&r1=1167419&r2=1167420&view=diff
==============================================================================
--- incubator/giraph/trunk/src/test/java/org/apache/giraph/TestBspBasic.java (original)
+++ incubator/giraph/trunk/src/test/java/org/apache/giraph/TestBspBasic.java Sat Sep 10 00:40:21 2011
@@ -18,35 +18,41 @@
 
 package org.apache.giraph;
 
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.lang.reflect.InvocationTargetException;
-
-import java.util.List;
-
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.JobID;
+import junit.framework.Test;
+import junit.framework.TestSuite;
 import org.apache.giraph.examples.GeneratedVertexReader;
 import org.apache.giraph.examples.SimpleCombinerVertex;
 import org.apache.giraph.examples.SimpleFailVertex;
 import org.apache.giraph.examples.SimpleMsgVertex;
 import org.apache.giraph.examples.SimplePageRankVertex;
-import org.apache.giraph.examples.SimpleShortestPathsVertex;
 import org.apache.giraph.examples.SimplePageRankVertex.SimplePageRankVertexInputFormat;
+import org.apache.giraph.examples.SimpleShortestPathsVertex;
 import org.apache.giraph.examples.SimpleShortestPathsVertex.SimpleShortestPathsVertexOutputFormat;
 import org.apache.giraph.examples.SimpleSumCombiner;
 import org.apache.giraph.examples.SimpleSuperstepVertex;
 import org.apache.giraph.examples.SimpleSuperstepVertex.SimpleSuperstepVertexInputFormat;
 import org.apache.giraph.examples.SimpleSuperstepVertex.SimpleSuperstepVertexOutputFormat;
+import org.apache.giraph.graph.BspUtils;
 import org.apache.giraph.graph.GiraphJob;
-import junit.framework.Test;
-import junit.framework.TestSuite;
+import org.apache.giraph.graph.GraphState;
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.graph.VertexInputFormat;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.util.List;
 
 /**
  * Unit test for many simple BSP applications.
@@ -87,16 +93,22 @@ public class TestBspBasic extends BspCas
         InvocationTargetException, SecurityException, NoSuchMethodException {
         System.out.println("testInstantiateVertex: java.class.path=" +
                            System.getProperty("java.class.path"));
-        java.lang.reflect.Constructor<?> ctor =
-            SimpleSuperstepVertex.class.getConstructor();
-        assertNotNull(ctor);
-        SimpleSuperstepVertex test =
-            (SimpleSuperstepVertex) ctor.newInstance();
+        GiraphJob job = new GiraphJob(getCallingMethodName());
+        job.setVertexClass(SimpleSuperstepVertex.class);
+        job.setVertexInputFormatClass(
+            SimpleSuperstepVertex.SimpleSuperstepVertexInputFormat.class);
+        GraphState<LongWritable, IntWritable, FloatWritable, IntWritable> gs =
+            new GraphState<LongWritable, IntWritable,
+                           FloatWritable, IntWritable>();
+        Vertex<LongWritable, IntWritable, FloatWritable, IntWritable> vertex =
+            BspUtils.<LongWritable, IntWritable, FloatWritable, IntWritable>
+            createVertex(job.getConfiguration(), gs);
         System.out.println("testInstantiateVertex: superstep=" +
-                           test.getSuperstep());
-        SimpleSuperstepVertex.SimpleSuperstepVertexInputFormat inputFormat =
-            SimpleSuperstepVertex.SimpleSuperstepVertexInputFormat
-            .class.newInstance();
+                           vertex.getSuperstep());
+        VertexInputFormat<LongWritable, IntWritable, FloatWritable>
+            inputFormat =
+                BspUtils.<LongWritable, IntWritable, FloatWritable>
+                createVertexInputFormat(job.getConfiguration());
         List<InputSplit> splitArray =
             inputFormat.getSplits(
                 new JobContext(new Configuration(), new JobID()), 1);