Posted to commits@giraph.apache.org by ac...@apache.org on 2011/11/15 01:54:22 UTC

svn commit: r1201987 [3/5] - in /incubator/giraph/trunk: ./ src/main/java/org/apache/giraph/benchmark/ src/main/java/org/apache/giraph/bsp/ src/main/java/org/apache/giraph/comm/ src/main/java/org/apache/giraph/examples/ src/main/java/org/apache/giraph/...

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java?rev=1201987&r1=1201986&r2=1201987&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java Tue Nov 15 00:54:20 2011
@@ -19,8 +19,19 @@
 package org.apache.giraph.graph;
 
 import net.iharder.Base64;
+
 import org.apache.giraph.bsp.ApplicationState;
 import org.apache.giraph.bsp.CentralizedServiceWorker;
+import org.apache.giraph.comm.RPCCommunications;
+import org.apache.giraph.comm.ServerInterface;
+import org.apache.giraph.graph.partition.Partition;
+import org.apache.giraph.graph.partition.PartitionExchange;
+import org.apache.giraph.graph.partition.PartitionOwner;
+import org.apache.giraph.graph.partition.PartitionStats;
+import org.apache.giraph.graph.partition.WorkerGraphPartitioner;
+import org.apache.giraph.utils.WritableUtils;
+import org.apache.giraph.zk.BspEvent;
+import org.apache.giraph.zk.PredicateLock;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
@@ -47,15 +58,16 @@ import java.io.DataOutput;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.net.UnknownHostException;
 import java.util.ArrayList;
-import java.util.Iterator;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.NavigableMap;
 import java.util.Set;
-import java.util.SortedMap;
-import java.util.TreeMap;
 import java.util.TreeSet;
 
 /**
@@ -71,38 +83,63 @@ public class BspServiceWorker<
         implements CentralizedServiceWorker<I, V, E, M> {
     /** Number of input splits */
     private int inputSplitCount = -1;
-    /** Cached aggregate number of vertices in the entire application */
-    private long totalVertices = -1;
-    /** Cached aggregate number of edges in the entire application */
-    private long totalEdges = -1;
     /** My process health znode */
     private String myHealthZnode;
-    /** Final server RPC port */
-    private final int finalRpcPort;
     /** List of aggregators currently in use */
     private Set<String> aggregatorInUse = new TreeSet<String>();
+    /** Worker info */
+    private final WorkerInfo workerInfo;
+    /** Worker graph partitioner */
+    private final WorkerGraphPartitioner<I, V, E, M> workerGraphPartitioner;
+    /** Input split vertex cache (only used when loading from input split) */
+    private final Map<PartitionOwner, Partition<I, V, E, M>>
+        inputSplitCache = new HashMap<PartitionOwner, Partition<I, V, E, M>>();
+    /** Communication service */
+    private final ServerInterface<I, V, E, M> commService;
+    /** Structure to store the partitions on this worker */
+    private final Map<Integer, Partition<I, V, E, M>> workerPartitionMap =
+        new HashMap<Integer, Partition<I, V, E, M>>();
+    /** Have the partition exchange children (workers) changed? */
+    private final BspEvent partitionExchangeChildrenChanged =
+        new PredicateLock();
+    /** Max vertices per partition before sending */
+    private final int maxVerticesPerPartition;
     /** Worker Context */
-    private WorkerContext workerContext;
+    private final WorkerContext workerContext;
+    /** Total vertices loaded */
+    private long totalVerticesLoaded = 0;
+    /** Total edges loaded */
+    private long totalEdgesLoaded = 0;
     /** Class logger */
     private static final Logger LOG = Logger.getLogger(BspServiceWorker.class);
 
-    public BspServiceWorker(String serverPortList,
-                            int sessionMsecTimeout,
-                            Mapper<?, ?, ?, ?>.Context context,
-                            GraphMapper<I, V, E, M> graphMapper) {
+    public BspServiceWorker(
+            String serverPortList,
+            int sessionMsecTimeout,
+            Mapper<?, ?, ?, ?>.Context context,
+            GraphMapper<I, V, E, M> graphMapper,
+            GraphState<I, V, E, M> graphState)
+            throws UnknownHostException, IOException, InterruptedException {
         super(serverPortList, sessionMsecTimeout, context, graphMapper);
-        this.finalRpcPort =
+        int finalRpcPort =
             getConfiguration().getInt(GiraphJob.RPC_INITIAL_PORT,
-                          GiraphJob.RPC_INITIAL_PORT_DEFAULT) +
-                          getTaskPartition();
-        this.workerContext = BspUtils.createWorkerContext(getConfiguration(), 
+                                      GiraphJob.RPC_INITIAL_PORT_DEFAULT) +
+                                      getTaskPartition();
+        maxVerticesPerPartition =
+            getConfiguration().getInt(
+                GiraphJob.MAX_VERTICES_PER_PARTITION,
+                GiraphJob.MAX_VERTICES_PER_PARTITION_DEFAULT);
+        workerInfo =
+            new WorkerInfo(getHostname(), getTaskPartition(), finalRpcPort);
+        workerGraphPartitioner =
+            getGraphPartitionerFactory().createWorkerGraphPartitioner();
+        commService = new RPCCommunications<I, V, E, M>(
+            context, this, graphState);
+        graphState.setWorkerCommunications(commService);
+        this.workerContext = BspUtils.createWorkerContext(getConfiguration(),
             graphMapper.getGraphState());
     }
 
-    public int getPort() {
-        return finalRpcPort;
-    }
-    
     public WorkerContext getWorkerContext() {
     	return workerContext;
     }
@@ -182,8 +219,10 @@ public class BspServiceWorker<
                                        CreateMode.EPHEMERAL,
                                        false);
                         reservedInputSplitPath = inputSplitPath;
-                        LOG.info("reserveInputSplit: Reserved input split " +
-                                 "path " + reservedInputSplitPath);
+                        if (LOG.isInfoEnabled()) {
+                            LOG.info("reserveInputSplit: Reserved input " +
+                                     "split path " + reservedInputSplitPath);
+                        }
                         return reservedInputSplitPath;
                     } catch (KeeperException.NodeExistsException e) {
                         LOG.info("reserveInputSplit: Couldn't reserve (already " +
@@ -211,80 +250,11 @@ public class BspServiceWorker<
     }
 
     /**
-     * Each worker will set the vertex ranges that it has found for a given
-     * InputSplit. After this, the InputSplit is considered finished.
-     *
-     * @param inputSplitPath path to the input split znode
-     * @param maxIndexStatMap maps max vertex indexes to a list containing
-     *        the number of vertices (index 0) and the number of edges (index 1)
-     *        in each partition (can be null, where nothing is written)
-     */
-    private void setInputSplitVertexRanges(
-            String inputSplitPath,
-            Map<I, List<Long>> maxIndexStatMap) {
-        String inputSplitFinishedPath =
-            inputSplitPath + INPUT_SPLIT_FINISHED_NODE;
-        byte [] zkData = null;
-        JSONArray statArray = new JSONArray();
-        if (maxIndexStatMap != null) {
-            for (Map.Entry<I, List<Long>> entry : maxIndexStatMap.entrySet()) {
-                try {
-                    ByteArrayOutputStream outputStream =
-                        new ByteArrayOutputStream();
-                    DataOutput output = new DataOutputStream(outputStream);
-                    ((Writable) entry.getKey()).write(output);
-
-                    JSONObject vertexRangeObj = new JSONObject();
-                    vertexRangeObj.put(JSONOBJ_NUM_VERTICES_KEY,
-                                       entry.getValue().get(0));
-                    vertexRangeObj.put(JSONOBJ_NUM_EDGES_KEY,
-                                       entry.getValue().get(1));
-                    vertexRangeObj.put(JSONOBJ_HOSTNAME_ID_KEY,
-                                       getHostnamePartitionId());
-                    vertexRangeObj.put(JSONOBJ_MAX_VERTEX_INDEX_KEY,
-                                       Base64.encodeBytes(
-                                           outputStream.toByteArray()));
-                    vertexRangeObj.put(JSONOBJ_NUM_MESSAGES_KEY, 0L);
-                    statArray.put(vertexRangeObj);
-                    if (LOG.isInfoEnabled()) {
-                        LOG.info("setInputSplitVertexRanges: " +
-                                 "Trying to add vertexRangeObj " +
-                                 "(max vertex index = " + entry.getKey() + " " +
-                                 vertexRangeObj + " to InputSplit path " +
-                                 inputSplitPath);
-                    }
-                } catch (Exception e) {
-                    throw new RuntimeException(
-                        "setInputSplitVertexRanges: Failed to add " +
-                        "vertex range " + entry.getKey(), e);
-                }
-            }
-            zkData = statArray.toString().getBytes();
-        }
-        try {
-            getZkExt().createExt(inputSplitFinishedPath,
-                                 zkData,
-                                 Ids.OPEN_ACL_UNSAFE,
-                                 CreateMode.PERSISTENT,
-                                 true);
-        } catch (KeeperException.NodeExistsException e) {
-            LOG.warn("setLocalVertexRanges: " + inputSplitFinishedPath +
-                     " already exists!");
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-        if (LOG.isInfoEnabled()) {
-            LOG.info("setInputSplitVertexRanges: Finished loading " +
-                     inputSplitPath + " with vertexRanges - " + statArray);
-        }
-    }
-
-    /**
      * Load the vertices from the user-defined VertexReader into our partitions
      * of vertex ranges.  Do this until all the InputSplits have been processed.
      * All workers will try to do as many InputSplits as they can.  The master
      * will monitor progress and stop this once all the InputSplits have been
-     * loaded and check-pointed.  The InputSplits must be sorted.
+     * loaded and check-pointed.
      *
      * @throws IOException
      * @throws IllegalAccessException
@@ -292,21 +262,22 @@ public class BspServiceWorker<
      * @throws ClassNotFoundException
      * @throws InterruptedException
      */
-    private void loadVertices() throws IOException, ClassNotFoundException,
+    private VertexEdgeCount loadVertices() throws IOException, ClassNotFoundException,
             InterruptedException, InstantiationException,
             IllegalAccessException {
         String inputSplitPath = null;
+        VertexEdgeCount vertexEdgeCount = new VertexEdgeCount();
         while ((inputSplitPath = reserveInputSplit()) != null) {
-            Map<I, List<Long>> maxIndexStatMap =
-                loadVerticesFromInputSplit(inputSplitPath);
-            setInputSplitVertexRanges(inputSplitPath, maxIndexStatMap);
+            vertexEdgeCount = vertexEdgeCount.incrVertexEdgeCount(
+                loadVerticesFromInputSplit(inputSplitPath));
         }
+        return vertexEdgeCount;
     }
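
The VertexEdgeCount type used above is introduced elsewhere in this
commit. For readers skimming the diff, here is a minimal sketch of the
immutable accumulator its call sites imply; only the two constructors and
incrVertexEdgeCount are visible in this file, so the field names and
everything else are assumptions:

    public class VertexEdgeCount {
        private final long vertexCount;
        private final long edgeCount;

        public VertexEdgeCount() {
            this(0, 0);
        }

        public VertexEdgeCount(long vertexCount, long edgeCount) {
            this.vertexCount = vertexCount;
            this.edgeCount = edgeCount;
        }

        /** Returns a new object, so per-split counts sum without mutation. */
        public VertexEdgeCount incrVertexEdgeCount(VertexEdgeCount other) {
            return new VertexEdgeCount(vertexCount + other.vertexCount,
                                       edgeCount + other.edgeCount);
        }
    }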
 
     /**
-     * Extract vertices from input split, generating mapping of Indices to
-     * statistics about the vertices. As a side effect, load the vertices
-     * into local global map
+     * Extract vertices from input split, saving them into a mini cache of
+     * partitions.  Periodically flush the cache of vertices when a limit is
+     * reached.  Mark the input split finished when done.
      *
      * @param inputSplitPath ZK location of input split
      * @return Mapping of vertex indices and statistics, or null if no data read
@@ -316,29 +287,44 @@ public class BspServiceWorker<
      * @throws InstantiationException
      * @throws IllegalAccessException
      */
-    private Map<I, List<Long>> loadVerticesFromInputSplit(String inputSplitPath)
+    private VertexEdgeCount loadVerticesFromInputSplit(String inputSplitPath)
         throws IOException, ClassNotFoundException, InterruptedException,
                InstantiationException, IllegalAccessException {
         InputSplit inputSplit = getInputSplitForVertices(inputSplitPath);
-
-        List<BasicVertex<I, V, E, M>> vertexList =
+        VertexEdgeCount vertexEdgeCount =
             readVerticesFromInputSplit(inputSplit);
 
-        if (LOG.isInfoEnabled()) {
-            LOG.info("loadVertices: Got " + vertexList.size() +
-                 " vertices from input split " + inputSplit);
+        // Flush the remaining cached vertices
+        for (Entry<PartitionOwner, Partition<I, V, E, M>> entry :
+                inputSplitCache.entrySet()) {
+            if (!entry.getValue().getVertices().isEmpty()) {
+                commService.sendPartitionReq(entry.getKey().getWorkerInfo(),
+                                             entry.getValue());
+                entry.getValue().getVertices().clear();
+            }
         }
+        inputSplitCache.clear();
 
-        if (vertexList.isEmpty()) {
-            return null;
+        // Mark this input split finished so the master sees it is done
+        String inputSplitFinishedPath =
+            inputSplitPath + INPUT_SPLIT_FINISHED_NODE;
+        try {
+            getZkExt().createExt(inputSplitFinishedPath,
+                                 null,
+                                 Ids.OPEN_ACL_UNSAFE,
+                                 CreateMode.PERSISTENT,
+                                 true);
+        } catch (KeeperException.NodeExistsException e) {
+            LOG.warn("loadVerticesFromInputSplit: " + inputSplitFinishedPath +
+                     " already exists!");
+        } catch (Exception e) {
+            throw new RuntimeException(e);
         }
-
-        NavigableMap<I, VertexRange<I, V, E, M>> vertexRangeMap =
-            getVertexRanges(inputSplit, vertexList);
-
-        Map<I, List<Long>> maxIndexStatMap = getMaxIndexStatMap(vertexRangeMap);
-
-        return maxIndexStatMap;
+        if (LOG.isInfoEnabled()) {
+            LOG.info("loadVerticesFromInputSplit: Finished loading " +
+                     inputSplitPath + " " + vertexEdgeCount);
+        }
+        return vertexEdgeCount;
     }
 
     /**
@@ -374,7 +360,7 @@ public class BspServiceWorker<
         ((Writable) inputSplit).readFields(inputStream);
 
         if (LOG.isInfoEnabled()) {
-            LOG.info("loadVertices: Reserved " + inputSplitPath +
+            LOG.info("getInputSplitForVertices: Reserved " + inputSplitPath +
                  " from ZooKeeper and got input split '" +
                  inputSplit.toString() + "'");
         }
@@ -389,17 +375,18 @@ public class BspServiceWorker<
      * @throws IOException
      * @throws InterruptedException
      */
-    private List<BasicVertex<I, V, E, M>> readVerticesFromInputSplit(
+    private VertexEdgeCount readVerticesFromInputSplit(
             InputSplit inputSplit) throws IOException, InterruptedException {
-        List<BasicVertex<I, V, E, M>> vertexList =
-            new ArrayList<BasicVertex<I, V, E, M>>();
         VertexInputFormat<I, V, E, M> vertexInputFormat =
             BspUtils.<I, V, E, M>createVertexInputFormat(getConfiguration());
         VertexReader<I, V, E, M> vertexReader =
             vertexInputFormat.createVertexReader(inputSplit, getContext());
         vertexReader.initialize(inputSplit, getContext());
+        long vertexCount = 0;
+        long edgeCount = 0;
         while (vertexReader.nextVertex()) {
-            BasicVertex<I, V, E, M> readerVertex = vertexReader.getCurrentVertex();
+            BasicVertex<I, V, E, M> readerVertex =
+                vertexReader.getCurrentVertex();
             if (readerVertex.getVertexId() == null) {
                 throw new IllegalArgumentException(
                     "loadVertices: Vertex reader returned a vertex " +
@@ -409,180 +396,49 @@ public class BspServiceWorker<
                 readerVertex.setVertexValue(
                     BspUtils.<V>createVertexValue(getConfiguration()));
             }
-            // Vertices must be ordered
-            if (!vertexList.isEmpty()) {
-                @SuppressWarnings("unchecked")
-                int compareTo =
-                    vertexList.get(vertexList.size() - 1).
-                    getVertexId().compareTo(readerVertex.getVertexId());
-                if (compareTo > 0) {
-                    throw new IllegalArgumentException(
-                        "loadVertices: Illegal out of order vertices " +
-                        "from vertex reader previous vertex = " +
-                        vertexList.get(vertexList.size() - 1) +
-                        ", next vertex = " + readerVertex);
-                }
+            PartitionOwner partitionOwner =
+                workerGraphPartitioner.getPartitionOwner(
+                    readerVertex.getVertexId());
+            Partition<I, V, E, M> partition =
+                inputSplitCache.get(partitionOwner);
+            if (partition == null) {
+                partition = new Partition<I, V, E, M>(
+                    getConfiguration(),
+                    partitionOwner.getPartitionId());
+                inputSplitCache.put(partitionOwner, partition);
+            }
+            partition.putVertex(readerVertex);
+            if (partition.getVertices().size() >= maxVerticesPerPartition) {
+                commService.sendPartitionReq(partitionOwner.getWorkerInfo(),
+                                             partition);
+                partition.getVertices().clear();
             }
-            vertexList.add(readerVertex);
+            ++vertexCount;
+            edgeCount += readerVertex.getNumOutEdges();
             getContext().progress();
-        }
-        vertexReader.close();
-
-        return vertexList;
-    }
 
-    /**
-     * Partition the vertices into their respective VertexRanges.  The number of
-     * vertex ranges may be up to half of the number of available workers and
-     * must reach a minimum size.
-     *
-     * @param inputSplit inputSplit for these vertices
-     * @param vertexList vertices from which to build the ranges
-     * @return Ordered map of maxi vertex ID in range <-> vertices within range
-     * @throws InstantiationException
-     * @throws IllegalAccessException
-     * @throws IOException
-     */
-    private NavigableMap<I, VertexRange<I, V, E, M>> getVertexRanges(
-        InputSplit inputSplit, List<BasicVertex<I, V, E, M>> vertexList)
-        throws InstantiationException, IllegalAccessException, IOException {
-
-        NavigableMap<I, VertexRange<I, V, E, M>> vertexRangeMap =
-            new TreeMap<I, VertexRange<I, V, E, M>>();
-        long vertexRangesPerInputSplit = getVertexRangesPerInputSplit(inputSplit);
-        long vertexRangeSize = vertexList.size() / vertexRangesPerInputSplit;
-        long minPerVertexRange =
-            getConfiguration().getLong(
-                GiraphJob.MIN_VERTICES_PER_RANGE,
-                GiraphJob.MIN_VERTICES_PER_RANGE_DEFAULT);
-        if (vertexRangeSize < minPerVertexRange) {
-            vertexRangeSize = minPerVertexRange;
-        }
-        I vertexIdMax = null;
-
-        // Identify the endpoints of the ranges, create empty placeholders
-        for (int i = 0; i < vertexList.size(); ++i) {
-            if ((vertexIdMax != null) && ((i % vertexRangeSize) == 0)) {
-                VertexRange<I, V, E, M> vertexRange =
-                    new VertexRange<I, V, E, M>(
-                        null, -1, null, vertexIdMax, null);
-                vertexRangeMap.put(vertexIdMax, vertexRange);
-                vertexIdMax = null;
-            }
-
-            if (vertexIdMax == null) {
-                vertexIdMax = vertexList.get(i).getVertexId();
-            } else {
-                @SuppressWarnings("unchecked")
-                int compareTo =
-                    vertexList.get(i).getVertexId().compareTo(vertexIdMax);
-                if (compareTo > 0) {
-                    vertexIdMax = vertexList.get(i).getVertexId();
-                }
-            }
-        }
-        if (vertexIdMax == null) {
-            throw new RuntimeException("loadVertices: Encountered " +
-                                       "impossible null vertexIdMax.");
-        }
-        VertexRange<I, V, E, M> finalRange =
-            new VertexRange<I, V, E, M>(null, -1, null, vertexIdMax, null);
-        vertexRangeMap.put(vertexIdMax, finalRange);
-
-        // Now iterate over the defined ranges, placing each vertex in its range
-        Iterator<I> maxIndexVertexMapIt = vertexRangeMap.keySet().iterator();
-        I currentVertexIndexMax = maxIndexVertexMapIt.next();
-        for (BasicVertex<I, V, E, M> vertex : vertexList) {
-            @SuppressWarnings("unchecked")
-            int compareTo = vertex.getVertexId().compareTo(currentVertexIndexMax);
-            if (compareTo > 0) {
-                if (!maxIndexVertexMapIt.hasNext()) {
-                    throw new RuntimeException(
-                        "loadVertices: Impossible that vertex " +
-                        vertex.getVertexId() + " > " +
-                        currentVertexIndexMax);
+            ++totalVerticesLoaded;
+            totalEdgesLoaded += readerVertex.getNumOutEdges();
+            // Update status every half a million vertices
+            if ((totalVerticesLoaded % 500000) == 0) {
+                String status = "readVerticesFromInputSplit: Loaded " +
+                    totalVerticesLoaded + " vertices and " +
+                    totalEdgesLoaded + " edges, " +
+                    "totalMem = " + Runtime.getRuntime().totalMemory() +
+                    ", maxMem = " + Runtime.getRuntime().maxMemory() +
+                    ", freeMem = " + Runtime.getRuntime().freeMemory() + " " +
+                    getGraphMapper().getMapFunctions().toString() +
+                    " - Attempt=" + getApplicationAttempt() +
+                    ", Superstep=" + getSuperstep();
+                if (LOG.isInfoEnabled()) {
+                    LOG.info(status);
                 }
-                currentVertexIndexMax = maxIndexVertexMapIt.next();
-            }
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("loadVertices: Adding vertex with index = " +
-                          vertex.getVertexId() + " to vertex range max = " +
-                          currentVertexIndexMax);
-            }
-            VertexRange<I, V, E, M> range =
-                vertexRangeMap.get(currentVertexIndexMax);
-            SortedMap<I, BasicVertex<I, V, E, M>> vertexMap = range.getVertexMap();
-            if (vertexMap.put(vertex.getVertexId(), vertex) != null) {
-                throw new IllegalStateException(
-                    "loadVertices: Already contains vertex " +
-                    vertex.toString() + " in vertex range max " +
-                    currentVertexIndexMax);
+                getContext().setStatus(status);
             }
         }
-        return vertexRangeMap;
-    }
-
-    /**
-     * Determine the number of VertexRanges per InputSplit.
-     *
-     * @param inputSplit inputSplit to generate number for
-     * @return number of VertexRanges per split
-     */
-    private long getVertexRangesPerInputSplit(InputSplit inputSplit) {
-        // ZooKeeper has a limit of the data in a single znode of 1 MB and
-        // each entry can go be on the average somewhat more than 300 bytes
-        final long
-            maxVertexRangesPerInputSplit = 1024 * 1024 / 350 / inputSplitCount;
-
-        long vertexRangesPerInputSplit = (long) (inputSplitCount *
-            getConfiguration().getFloat(
-                GiraphJob.TOTAL_INPUT_SPLIT_MULTIPLIER,
-                GiraphJob.TOTAL_INPUT_SPLIT_MULTIPLIER_DEFAULT));
-        if (vertexRangesPerInputSplit == 0) {
-            vertexRangesPerInputSplit = 1;
-        } else if (vertexRangesPerInputSplit > maxVertexRangesPerInputSplit) {
-            LOG.warn("loadVertices: Using " + maxVertexRangesPerInputSplit +
-                     " instead of " + vertexRangesPerInputSplit +
-                     " vertex ranges on input split " + inputSplit);
-            vertexRangesPerInputSplit = maxVertexRangesPerInputSplit;
-        }
-        return vertexRangesPerInputSplit;
-    }
-
-    /**
-     * Generate a mapping of max vertex indices to two-element lists consisting
-     * of the number of vertices and the number of edges in each partition.
-     * Additionally, adds entries of the vertexRangeMap to local map.
-     *
-     * @param vertexRangeMap Mapping of indices to VertexRanges
-     * @return  mapping of vertices to stats, or null if no vertices
-     *          are written for the partition
-     */
-    private Map<I, List<Long>> getMaxIndexStatMap(
-        NavigableMap<I, VertexRange<I, V, E, M>> vertexRangeMap) {
-        Map<I, List<Long>> maxIndexStatMap = new TreeMap<I, List<Long>>();
-
-        for (Entry<I, VertexRange<I, V, E, M>> entry :
-                 vertexRangeMap.entrySet()) {
-            List<Long> statList = new ArrayList<Long>(2);
-            long vertexRangeEdgeCount = 0;
-            for (BasicVertex<I, V, E, M> vertex :
-                entry.getValue().getVertexMap().values()) {
-                vertexRangeEdgeCount += vertex.getNumOutEdges();
-            }
-            statList.add(Long.valueOf(entry.getValue().getVertexMap().size()));
-            statList.add(Long.valueOf(vertexRangeEdgeCount));
-            if (LOG.isInfoEnabled()) {
-                LOG.info("loadVertices: Got " + statList.get(0) +
-                     " vertices and " + statList.get(1) +
-                     " edges from vertex range max index " + entry.getKey());
-            }
-            maxIndexStatMap.put(entry.getKey(), statList);
+        vertexReader.close();
 
-            // Add the local vertex ranges to the stored vertex ranges
-            getStorableVertexRangeMap().put(entry.getKey(), entry.getValue());
-        }
-        return maxIndexStatMap;
+        return new VertexEdgeCount(vertexCount, edgeCount);
     }
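
The loop above buffers each vertex in a per-owner Partition and ships the
partition over RPC once it reaches maxVerticesPerPartition vertices. A
generic, self-contained sketch of that flush-on-threshold pattern follows;
BoundedCache is illustrative only (in the code above the key is a
PartitionOwner and flush() is commService.sendPartitionReq):

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class BoundedCache<K, V> {
        private final Map<K, List<V>> cache = new HashMap<K, List<V>>();
        private final int maxPerKey;

        public BoundedCache(int maxPerKey) {
            this.maxPerKey = maxPerKey;
        }

        /** Buffer a value; flush and clear the buffer once it fills up. */
        public void add(K key, V value) {
            List<V> buffered = cache.get(key);
            if (buffered == null) {
                buffered = new ArrayList<V>();
                cache.put(key, buffered);
            }
            buffered.add(value);
            if (buffered.size() >= maxPerKey) {
                flush(key, buffered);
                buffered.clear();
            }
        }

        /** Stand-in for commService.sendPartitionReq(owner, partition). */
        protected void flush(K key, List<V> buffered) {
            System.out.println("sending " + buffered.size() + " to " + key);
        }
    }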
 
     @Override
@@ -620,7 +476,13 @@ public class BspServiceWorker<
             }
         }
 
-        startSuperstep();
+        // Add the partitions that this worker owns
+        Collection<? extends PartitionOwner> masterSetPartitionOwners =
+            startSuperstep();
+
+        workerGraphPartitioner.updatePartitionOwners(
+            getWorkerInfo(), masterSetPartitionOwners, getPartitionMap());
+        commService.setup();
 
         // Ensure the InputSplits are ready for processing before processing
         while (true) {
@@ -643,22 +505,50 @@ public class BspServiceWorker<
         }
 
         getContext().progress();
+
         try {
-            loadVertices();
+            VertexEdgeCount vertexEdgeCount = loadVertices();
+            if (LOG.isInfoEnabled()) {
+                LOG.info("setup: Finally loaded a total of " +
+                         vertexEdgeCount);
+            }
         } catch (Exception e) {
             LOG.error("setup: loadVertices failed - ", e);
             throw new IllegalStateException("setup: loadVertices failed", e);
         }
+        getContext().progress();
 
-        long workerVertices = 0;
-        long workerEdges = 0;
-        for (VertexRange<I, V, E, M> vertexRange :
-                getStorableVertexRangeMap().values()) {
-            workerVertices += vertexRange.getVertexCount();
-            workerEdges += vertexRange.getEdgeCount();
+        // At this point all vertices have been sent to their destinations.
+        // Move them to the worker, creating any empty partitions
+        movePartitionsToWorker(commService);
+        for (PartitionOwner partitionOwner : masterSetPartitionOwners) {
+            if (partitionOwner.getWorkerInfo().equals(getWorkerInfo()) &&
+                !getPartitionMap().containsKey(
+                    partitionOwner.getPartitionId())) {
+                Partition<I, V, E, M> partition =
+                    new Partition<I, V, E, M>(getConfiguration(),
+                                              partitionOwner.getPartitionId());
+                getPartitionMap().put(partitionOwner.getPartitionId(),
+                                      partition);
+            }
         }
 
-        finishSuperstep(0, workerVertices, workerEdges, 0);
+        // Generate the partition stats for the input superstep and
+        // process them if necessary
+        List<PartitionStats> partitionStatsList =
+            new ArrayList<PartitionStats>();
+        for (Partition<I, V, E, M> partition : getPartitionMap().values()) {
+            PartitionStats partitionStats =
+                new PartitionStats(partition.getPartitionId(),
+                                   partition.getVertices().size(),
+                                   0,
+                                   partition.getEdgeCount());
+            partitionStatsList.add(partitionStats);
+        }
+        workerGraphPartitioner.finalizePartitionStats(
+            partitionStatsList, workerPartitionMap);
+
+        finishSuperstep(partitionStatsList, 0);
     }
 
     /**
@@ -753,9 +643,9 @@ public class BspServiceWorker<
                     new DataInputStream(input));
                 aggregator.setAggregatedValue(aggregatorValue);
                 if (LOG.isDebugEnabled()) {
-                    LOG.info("getAggregatorValues: " +
-                             "Got aggregator=" + aggregatorName + " value=" +
-                             aggregatorValue);
+                    LOG.debug("getAggregatorValues: " +
+                              "Got aggregator=" + aggregatorName + " value=" +
+                              aggregatorValue);
                 }
             } catch (Exception e) {
                 throw new RuntimeException(e);
@@ -777,25 +667,25 @@ public class BspServiceWorker<
         JSONArray hostnamePort = new JSONArray();
         hostnamePort.put(getHostname());
 
-        hostnamePort.put(finalRpcPort);
+        hostnamePort.put(workerInfo.getPort());
 
         String myHealthPath = null;
         if (isHealthy()) {
-            myHealthPath = getWorkerHealthyPath(getApplicationAttempt(),
-                                                getSuperstep());
+            myHealthPath = getWorkerInfoHealthyPath(getApplicationAttempt(),
+                                                    getSuperstep());
         }
         else {
-            myHealthPath = getWorkerUnhealthyPath(getApplicationAttempt(),
-                                                  getSuperstep());
+            myHealthPath = getWorkerInfoUnhealthyPath(getApplicationAttempt(),
+                                                      getSuperstep());
         }
-        myHealthPath = myHealthPath + "/" + getHostnamePartitionId();
+        myHealthPath = myHealthPath + "/" + workerInfo.getHostnameId();
         try {
-            myHealthZnode =
-                getZkExt().createExt(myHealthPath,
-                                     hostnamePort.toString().getBytes(),
-                                     Ids.OPEN_ACL_UNSAFE,
-                                     CreateMode.EPHEMERAL,
-                                     true);
+            myHealthZnode = getZkExt().createExt(
+                myHealthPath,
+                WritableUtils.writeToByteArray(workerInfo),
+                Ids.OPEN_ACL_UNSAFE,
+                CreateMode.EPHEMERAL,
+                true);
         } catch (KeeperException.NodeExistsException e) {
             LOG.warn("registerHealth: myHealthPath already exists (likely " +
                      "from previous failure): " + myHealthPath +
@@ -816,51 +706,74 @@ public class BspServiceWorker<
             LOG.info("registerHealth: Created my health node for attempt=" +
                      getApplicationAttempt() + ", superstep=" +
                      getSuperstep() + " with " + myHealthZnode +
-                     " and hostnamePort = " + hostnamePort.toString());
+                     " and workerInfo= " + workerInfo);
         }
     }
 
-    public boolean startSuperstep() {
+    @Override
+    public Collection<? extends PartitionOwner> startSuperstep() {
         // Algorithm:
-        // 1. Register my health for the next superstep.
-        // 2. Wait until the vertex range assignment is complete (unless
-        //    superstep 0).
+        // 1. Communication service will combine messages from the
+        //    previous superstep
+        // 2. Register my health for the next superstep.
+        // 3. Wait until the partition assignment is complete and get it
+        // 4. Get the aggregator values from the previous superstep
+        if (getSuperstep() != INPUT_SUPERSTEP) {
+            commService.prepareSuperstep();
+        }
+
         registerHealth(getSuperstep());
 
-        String vertexRangeAssignmentsNode = null;
-        if (getSuperstep() > INPUT_SUPERSTEP) {
-            vertexRangeAssignmentsNode =
-                getVertexRangeAssignmentsPath(getApplicationAttempt(),
-                                              getSuperstep());
-            try {
-                while (getZkExt().exists(vertexRangeAssignmentsNode, true) ==
-                        null) {
-                    getVertexRangeAssignmentsReadyChangedEvent().waitForever();
-                    getVertexRangeAssignmentsReadyChangedEvent().reset();
-                }
-            } catch (Exception e) {
-                throw new RuntimeException(e);
-            }
-            if (LOG.isInfoEnabled()) {
-                LOG.info("startSuperstep: Ready for computation on superstep " +
-                         getSuperstep() + " since worker " +
-                         "selection and vertex range assignments are done in " +
-                         vertexRangeAssignmentsNode);
-            }
+        String partitionAssignmentsNode =
+            getPartitionAssignmentsPath(getApplicationAttempt(),
+                                        getSuperstep());
+        Collection<? extends PartitionOwner> masterSetPartitionOwners;
+        try {
+            while (getZkExt().exists(partitionAssignmentsNode, true) ==
+                    null) {
+                getPartitionAssignmentsReadyChangedEvent().waitForever();
+                getPartitionAssignmentsReadyChangedEvent().reset();
+            }
+            List<? extends Writable> writableList =
+                WritableUtils.readListFieldsFromZnode(
+                    getZkExt(),
+                    partitionAssignmentsNode,
+                    false,
+                    null,
+                    workerGraphPartitioner.createPartitionOwner().getClass(),
+                    getConfiguration());
+
+            @SuppressWarnings("unchecked")
+            Collection<? extends PartitionOwner> castedWritableList =
+                (Collection<? extends PartitionOwner>) writableList;
+            masterSetPartitionOwners = castedWritableList;
+        } catch (KeeperException e) {
+            throw new IllegalStateException(
+                "startSuperstep: KeeperException getting assignments", e);
+        } catch (InterruptedException e) {
+            throw new IllegalStateException(
+                "startSuperstep: InterruptedException getting assignments", e);
         }
 
-        getAggregatorValues(getSuperstep());
+        if (LOG.isInfoEnabled()) {
+            LOG.info("startSuperstep: Ready for computation on superstep " +
+                     getSuperstep() + " since worker " +
+                     "selection and vertex range assignments are done in " +
+                     partitionAssignmentsNode);
+        }
+
+        if (getSuperstep() != INPUT_SUPERSTEP) {
+            getAggregatorValues(getSuperstep());
+        }
         getContext().setStatus("startSuperstep: " +
                                getGraphMapper().getMapFunctions().toString() +
                                " - Attempt=" + getApplicationAttempt() +
                                ", Superstep=" + getSuperstep());
-        return true;
+        return masterSetPartitionOwners;
     }
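
The exists()-watch-wait loop above (also used in finishSuperstep below) is
the standard ZooKeeper idiom for blocking until another process creates a
znode; since watches are one-shot, existence is re-checked after every
wake-up. A minimal sketch with the raw ZooKeeper client, using a
CountDownLatch where Giraph uses its BspEvent/PredicateLock:

    import java.util.concurrent.CountDownLatch;
    import org.apache.zookeeper.WatchedEvent;
    import org.apache.zookeeper.Watcher;
    import org.apache.zookeeper.ZooKeeper;

    public class ZnodeWaiter {
        /** Block until the znode exists, re-arming the watch as needed. */
        public static void waitForZnode(ZooKeeper zk, String path)
                throws Exception {
            while (true) {
                final CountDownLatch fired = new CountDownLatch(1);
                Watcher watcher = new Watcher() {
                    public void process(WatchedEvent event) {
                        fired.countDown();  // creation (or other) event
                    }
                };
                if (zk.exists(path, watcher) != null) {
                    return;                 // znode already there
                }
                fired.await();              // woken when the watch fires
            }
        }
    }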
 
     @Override
-    public boolean finishSuperstep(long workerFinishedVertices,
-                                   long workerVertices,
-                                   long workerEdges,
+    public boolean finishSuperstep(List<PartitionStats> partitionStatsList,
                                    long workersSentMessages) {
         // This barrier blocks until success (or the master signals it to
         // restart).
@@ -872,18 +785,27 @@ public class BspServiceWorker<
         // of this worker
         // 3. Let the master know it is finished.
         // 4. Then it waits for the master to say whether to stop or not.
+        try {
+            commService.flush(getContext());
+        } catch (IOException e) {
+            throw new IllegalStateException(
+                "finishSuperstep: flush failed", e);
+        }
         JSONArray aggregatorValueArray =
             marshalAggregatorValues(getSuperstep());
+        Collection<PartitionStats> finalizedPartitionStats =
+            workerGraphPartitioner.finalizePartitionStats(
+                partitionStatsList, workerPartitionMap);
+        List<PartitionStats> finalizedPartitionStatsList =
+            new ArrayList<PartitionStats>(finalizedPartitionStats);
+        byte [] partitionStatsBytes =
+            WritableUtils.writeListToByteArray(finalizedPartitionStatsList);
         JSONObject workerFinishedInfoObj = new JSONObject();
         try {
             workerFinishedInfoObj.put(JSONOBJ_AGGREGATOR_VALUE_ARRAY_KEY,
                                       aggregatorValueArray);
-            workerFinishedInfoObj.put(JSONOBJ_FINISHED_VERTICES_KEY,
-                                      workerFinishedVertices);
-            workerFinishedInfoObj.put(JSONOBJ_NUM_VERTICES_KEY,
-                                      workerVertices);
-            workerFinishedInfoObj.put(JSONOBJ_NUM_EDGES_KEY,
-                                      workerEdges);
+            workerFinishedInfoObj.put(JSONOBJ_PARTITION_STATS_KEY,
+                                      Base64.encodeBytes(partitionStatsBytes));
             workerFinishedInfoObj.put(JSONOBJ_NUM_MESSAGES_KEY,
                                       workersSentMessages);
         } catch (JSONException e) {
@@ -905,54 +827,46 @@ public class BspServiceWorker<
             throw new RuntimeException(e);
         }
 
+        getContext().setStatus("finishSuperstep: (waiting for rest " +
+                               "of workers) " +
+                               getGraphMapper().getMapFunctions().toString() +
+                               " - Attempt=" + getApplicationAttempt() +
+                               ", Superstep=" + getSuperstep());
+
         String superstepFinishedNode =
             getSuperstepFinishedPath(getApplicationAttempt(), getSuperstep());
-        JSONObject globalStatsObject = null;
         try {
             while (getZkExt().exists(superstepFinishedNode, true) == null) {
                 getSuperstepFinishedEvent().waitForever();
                 getSuperstepFinishedEvent().reset();
             }
-            globalStatsObject = new JSONObject(
-                new String(getZkExt().getData(superstepFinishedNode,
-                                              false,
-                                              null)));
-        } catch (Exception e) {
-            throw new RuntimeException(
+        } catch (KeeperException e) {
+            throw new IllegalStateException(
+                "finishSuperstep: Failed while waiting for master to " +
+                "signal completion of superstep " + getSuperstep(), e);
+        } catch (InterruptedException e) {
+            throw new IllegalStateException(
                 "finishSuperstep: Failed while waiting for master to " +
                 "signal completion of superstep " + getSuperstep(), e);
         }
-        long finishedVertices =
-            globalStatsObject.optLong(JSONOBJ_FINISHED_VERTICES_KEY);
-        totalVertices =
-            globalStatsObject.optLong(JSONOBJ_NUM_VERTICES_KEY);
-        totalEdges =
-            globalStatsObject.optLong(JSONOBJ_NUM_EDGES_KEY);
-        long sentMessages =
-            globalStatsObject.optLong(JSONOBJ_NUM_MESSAGES_KEY);
+        GlobalStats globalStats = new GlobalStats();
+        WritableUtils.readFieldsFromZnode(
+            getZkExt(), superstepFinishedNode, false, null, globalStats);
         if (LOG.isInfoEnabled()) {
             LOG.info("finishSuperstep: Completed superstep " + getSuperstep() +
-                     " with total finished vertices = " + finishedVertices +
-                     " of out total vertices = " + totalVertices +
-                     ", total edges = " + totalEdges + ", sent messages = " +
-                     sentMessages);
+                     " with global stats " + globalStats);
         }
         incrCachedSuperstep();
-        getContext().setStatus("finishSuperstep: " +
+        getContext().setStatus("finishSuperstep: (all workers done) " +
                                getGraphMapper().getMapFunctions().toString() +
                                " - Attempt=" + getApplicationAttempt() +
                                ", Superstep=" + getSuperstep());
-        return ((finishedVertices == totalVertices) && (sentMessages == 0));
-    }
-
-    @Override
-    public long getTotalVertices() {
-        return totalVertices;
-    }
-
-    @Override
-    public long getTotalEdges() {
-        return totalEdges;
+        getGraphMapper().getGraphState().
+            setNumEdges(globalStats.getEdgeCount()).
+            setNumVertices(globalStats.getVertexCount());
+        return ((globalStats.getFinishedVertexCount() ==
+                globalStats.getVertexCount()) &&
+                (globalStats.getMessageCount() == 0));
     }
 
     /**
@@ -973,10 +887,8 @@ public class BspServiceWorker<
         VertexWriter<I, V, E> vertexWriter =
             vertexOutputFormat.createVertexWriter(getContext());
         vertexWriter.initialize(getContext());
-        for (Map.Entry<I, VertexRange<I, V, E, M>> entry :
-            getVertexRangeMap().entrySet()) {
-            for (BasicVertex<I, V, E, M> vertex :
-                entry.getValue().getVertexMap().values()) {
+        for (Partition<I, V, E, M> partition : workerPartitionMap.values()) {
+            for (BasicVertex<I, V, E, M> vertex : partition.getVertices()) {
                 vertexWriter.writeVertex(vertex);
             }
         }
@@ -985,6 +897,7 @@ public class BspServiceWorker<
 
     @Override
     public void cleanup() throws IOException, InterruptedException {
+        commService.closeConnections();
         setCachedSuperstep(getSuperstep() - 1);
         saveVertices();
          // All worker processes should denote they are done by adding special
@@ -1024,27 +937,20 @@ public class BspServiceWorker<
             // cleanup phase -- just log the error
             LOG.error("cleanup: Zookeeper failed to close with " + e);
         }
-    }
-
-    @Override
-    public VertexRange<I, V, E, M> getVertexRange(long superstep, I index) {
-        I maxVertexIndex = getVertexRangeMap(superstep).ceilingKey(index);
 
-        if (maxVertexIndex == null) {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("getVertexRange: no partition for " +
-                          "destination vertex " +
-                          index + " -- returning last partition");
-            }
-            return getVertexRangeMap(superstep).lastEntry().getValue();
-        }
-        else {
-            return getVertexRangeMap(superstep).get(maxVertexIndex);
-        }
+        // Preferably we would shut down the service only after
+        // all clients have disconnected (or the resulting exceptions
+        // on the client side are ignored).
+        commService.close();
     }
 
     @Override
     public void storeCheckpoint() throws IOException {
+        getContext().setStatus("storeCheckpoint: Starting checkpoint " +
+                getGraphMapper().getMapFunctions().toString() +
+                " - Attempt=" + getApplicationAttempt() +
+                ", Superstep=" + getSuperstep());
+
         // Algorithm:
         // For each partition, dump vertices and messages
         Path metadataFilePath =
@@ -1060,91 +966,46 @@ public class BspServiceWorker<
                      getHostnamePartitionId() +
                      CHECKPOINT_VALID_POSTFIX);
 
-        // Remove these files if they already exist
-        try {
-            getFs().delete(validFilePath, false);
-            LOG.warn("storeCheckpoint: Removed file " + validFilePath);
-        } catch (IOException e) {
+        // Remove these files if they already exist (they shouldn't, unless
+        // this worker previously failed)
+        if (getFs().delete(validFilePath, false)) {
+            LOG.warn("storeCheckpoint: Removed valid file " +
+                     validFilePath);
+        }
+        if (getFs().delete(metadataFilePath, false)) {
+            LOG.warn("storeCheckpoint: Removed metadata file " +
+                     metadataFilePath);
         }
-        try {
-            getFs().delete(metadataFilePath, false);
-            LOG.warn("storeCheckpoint: Removed file " + metadataFilePath);
-        } catch (IOException e) {
-        }
-        try {
-            getFs().delete(verticesFilePath, false);
+        if (getFs().delete(verticesFilePath, false)) {
             LOG.warn("storeCheckpoint: Removed file " + verticesFilePath);
-        } catch (IOException e) {
         }
 
         FSDataOutputStream verticesOutputStream =
             getFs().create(verticesFilePath);
         ByteArrayOutputStream metadataByteStream = new ByteArrayOutputStream();
         DataOutput metadataOutput = new DataOutputStream(metadataByteStream);
-        long workerVertexRanges = 0;
-        for (Map.Entry<I, VertexRange<I, V, E, M>> entry :
-                getVertexRangeMap().entrySet()) {
-            // Only write out the partitions the worker is responsible for
-            if (!entry.getValue().getHostnameId().equals(
-                    getHostnamePartitionId())) {
-                continue;
-            }
-
-            ++workerVertexRanges;
-            // Write the vertices (index, data, edges and messages)
-            // Format:
-            // <vertex count>
-            //   <v0 id><v0 value>
-            //     <v0 num edges>
-            //       <v0 edge 0 dest><v0 edge 0 value>
-            //       <v0 edge 1 dest><v0 edge 1 value>...
-            //     <v0 message count>
-            //       <v0 msg 0><v0 msg 1>...
+        for (Partition<I, V, E, M> partition : workerPartitionMap.values()) {
             long startPos = verticesOutputStream.getPos();
-            verticesOutputStream.writeLong(
-                entry.getValue().getVertexMap().size());
-            for (BasicVertex<I, V, E, M> vertex :
-                    entry.getValue().getVertexMap().values()) {
-                ByteArrayOutputStream vertexByteStream =
-                    new ByteArrayOutputStream();
-                DataOutput vertexOutput =
-                    new DataOutputStream(vertexByteStream);
-                vertex.write(vertexOutput);
-                verticesOutputStream.write(vertexByteStream.toByteArray());
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("storeCheckpoint: Wrote vertex id = " +
-                              vertex.getVertexId() + " with " +
-                              vertex.getNumOutEdges() + " edges and " +
-                              vertex.getMsgList().size() + " messages (" +
-                              vertexByteStream.size() + " total bytes)");
-                }
-            }
-            // Write the metadata for this vertex range
+            partition.write(verticesOutputStream);
+            // Write the metadata for this partition
             // Format:
             // <index count>
-            //   <index 0 start pos><# vertices><# edges><max index 0>
-            //   <index 1 start pos><# vertices><# edges><max index 1>...
+            //   <index 0 start pos><partition id>
+            //   <index 1 start pos><partition id>
             metadataOutput.writeLong(startPos);
-            metadataOutput.writeLong(entry.getValue().getVertexMap().size());
-            long edgeCount = 0;
-            for (BasicVertex<I, V, E, M> vertex :
-                    entry.getValue().getVertexMap().values()) {
-                edgeCount += vertex.getNumOutEdges();
-            }
-            metadataOutput.writeLong(edgeCount);
-            entry.getKey().write(metadataOutput);
+            metadataOutput.writeInt(partition.getPartitionId());
             if (LOG.isDebugEnabled()) {
                 LOG.debug("storeCheckpoint: Vertex file starting " +
                           "offset = " + startPos + ", length = " +
                           (verticesOutputStream.getPos() - startPos) +
-                          ", max index of vertex range = " + entry.getKey());
+                          ", partition = " + partition.toString());
             }
         }
         // Metadata is buffered and written at the end since it's small and
-        // needs to know how many vertex ranges this worker owns
+        // needs to know how many partitions this worker owns
         FSDataOutputStream metadataOutputStream =
             getFs().create(metadataFilePath);
-        metadataOutputStream.writeLong(workerVertexRanges);
+        metadataOutputStream.writeInt(workerPartitionMap.size());
         metadataOutputStream.write(metadataByteStream.toByteArray());
         metadataOutputStream.close();
         verticesOutputStream.close();
@@ -1157,246 +1018,254 @@ public class BspServiceWorker<
         getFs().createNewFile(validFilePath);
     }
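
Piecing together the writes above, a checkpoint for a worker that owns two
partitions would be laid out roughly as follows (the partition ids and the
81920-byte offset are made-up values for illustration):

    metadata file (buffered, written last):
        int   2          number of partitions on this worker
        long  0          start offset of partition 3 in the vertices file
        int   3          partition id
        long  81920      start offset of partition 7
        int   7          partition id

    vertices file:
        bytes [0, 81920)    Partition.write() output for partition 3
        bytes [81920, ...)  Partition.write() output for partition 7

    valid file: created empty at the very end, marking the checkpoint
        usable; loadCheckpoint below reads the metadata back in the same
        order.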
 
-    /**
-     * Load a single vertex range from checkpoint files.
-     *
-     * @param maxIndex denotes the vertex range
-     * @param dataFileName name of the data file
-     * @param startPos position to start from in data file
-     * @throws IOException
-     * @throws IllegalAccessException
-     * @throws InstantiationException
-     */
-    private void loadVertexRange(I maxIndex,
-                                 String dataFileName,
-                                 long startPos)
-        throws IOException, InstantiationException, IllegalAccessException {
-        // Read in the reverse order from storeCheckpoint()
-        DataInputStream dataStream = getFs().open(new Path(dataFileName));
-        if (dataStream.skip(startPos) != startPos) {
-            throw new IllegalStateException(
-                "loadVertexRange: Failed to skip " + startPos);
-        }
-        long vertexCount = dataStream.readLong();
-        VertexRange<I, V, E, M> vertexRange = getVertexRangeMap().get(maxIndex);
-        for (int i = 0; i < vertexCount; ++i) {
-            BasicVertex<I, V, E, M> vertex = BspUtils.<I, V, E, M>createVertex(getConfiguration());
-            vertex.setGraphState(getGraphMapper().getGraphState());
-            vertex.readFields(dataStream);
-            // Add the vertex
-            if (vertexRange.getVertexMap().put(vertex.getVertexId(), vertex)
-                    != null) {
-                throw new IllegalStateException(
-                    "loadVertexRange: Vertex "  + vertex + " already exists");
-            }
-        }
-        if (LOG.isInfoEnabled()) {
-            LOG.info("loadVertexRange: " + vertexCount + " vertices in " +
-                     dataFileName);
-        }
-        dataStream.close();
-    }
-
     @Override
     public void loadCheckpoint(long superstep) {
         // Algorithm:
-        // Check all the vertex ranges for this worker and load the ones
-        // that match my hostname and id.
-        I maxVertexIndex = BspUtils.<I>createVertexIndex(getConfiguration());
-        long startPos = -1;
-        long vertexRangeCount = -1;
-        for (VertexRange<I, V, E, M> vertexRange :
-                getVertexRangeMap().values()) {
-            if (vertexRange.getHostnameId()
-                    .compareTo(getHostnamePartitionId()) == 0) {
+        // Examine all the partition owners and load the ones
+        // that match my hostname and id from the master-designated checkpoint
+        // prefixes.
+        long startPos = 0;
+        int loadedPartitions = 0;
+        for (PartitionOwner partitionOwner :
+                workerGraphPartitioner.getPartitionOwners()) {
+            if (partitionOwner.getWorkerInfo().equals(getWorkerInfo())) {
                 String metadataFile =
-                    vertexRange.getCheckpointFilePrefix() +
+                    partitionOwner.getCheckpointFilesPrefix() +
                     CHECKPOINT_METADATA_POSTFIX;
+                String partitionsFile =
+                    partitionOwner.getCheckpointFilesPrefix() +
+                    CHECKPOINT_VERTICES_POSTFIX;
                 try {
+                    int partitionId = -1;
                     DataInputStream metadataStream =
                         getFs().open(new Path(metadataFile));
-                    vertexRangeCount = metadataStream.readLong();
-                    for (int i = 0; i < vertexRangeCount; ++i) {
+                    int partitions = metadataStream.readInt();
+                    for (int i = 0; i < partitions; ++i) {
                         startPos = metadataStream.readLong();
-                        // Skip the vertex count
-                        metadataStream.readLong();
-                        // Skip the edge count
-                        metadataStream.readLong();
-                        maxVertexIndex.readFields(metadataStream);
-                        @SuppressWarnings("unchecked")
-                        int compareTo =
-                            vertexRange.getMaxIndex().compareTo(maxVertexIndex);
-                        if (LOG.isDebugEnabled()) {
-	                        LOG.debug("loadCheckpoint: Comparing " +
-	                                  vertexRange.getMaxIndex() + " and " +
-	                                  maxVertexIndex + " = " + compareTo);
-                        }
-                        if (compareTo == 0) {
-                            loadVertexRange(
-                                vertexRange.getMaxIndex(),
-                                vertexRange.getCheckpointFilePrefix() +
-                                    CHECKPOINT_VERTICES_POSTFIX,
-                                startPos);
+                        partitionId = metadataStream.readInt();
+                        if (partitionId == partitionOwner.getPartitionId()) {
+                            break;
                         }
                     }
+                    if (partitionId != partitionOwner.getPartitionId()) {
+                        throw new IllegalStateException(
+                           "loadCheckpoint: " + partitionOwner +
+                           " not found!");
+                    }
                     metadataStream.close();
-                } catch (Exception e) {
-                    throw new RuntimeException(e);
+                    Partition<I, V, E, M> partition =
+                        new Partition<I, V, E, M>(
+                            getConfiguration(),
+                            partitionId);
+                    DataInputStream partitionsStream =
+                        getFs().open(new Path(partitionsFile));
+                    if (partitionsStream.skip(startPos) != startPos) {
+                        throw new IllegalStateException(
+                            "loadCheckpoint: Failed to skip " + startPos +
+                            " on " + partitionsFile);
+                    }
+                    partition.readFields(partitionsStream);
+                    partitionsStream.close();
+                    if (LOG.isInfoEnabled()) {
+                        LOG.info("loadCheckpoint: Loaded partition " +
+                                 partition);
+                    }
+                    if (getPartitionMap().put(partitionId, partition) != null) {
+                        throw new IllegalStateException(
+                            "loadCheckpoint: Already has partition owner " +
+                            partitionOwner);
+                    }
+                    ++loadedPartitions;
+                } catch (IOException e) {
+                    throw new RuntimeException(
+                        "loadCheckpoint: Failed to load checkpoint data " +
+                        "for " + partitionOwner, e);
                 }
             }
         }
+        if (LOG.isInfoEnabled()) {
+            LOG.info("loadCheckpoint: Loaded " + loadedPartitions +
+                    " partitions out of " +
+                    workerGraphPartitioner.getPartitionOwners().size() +
+                    " total.");
+        }
+        // The communication service needs to set up its connections prior
+        // to processing vertices.
+        commService.setup();
     }
 
-    @Override
-    public final void exchangeVertexRanges() {
-        boolean syncRequired = false;
-        for (Entry<I, VertexRange<I, V, E, M>> entry :
-                getVertexRangeMap().entrySet()) {
-            final int previousPort = entry.getValue().getPreviousPort();
-            final String previousHostname =
-                entry.getValue().getPreviousHostname();
-            final int port = entry.getValue().getPort();
-            final String hostname = entry.getValue().getHostname();
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("exchangeVertexRanges: For max index " +
-                          entry.getKey() + ", count " +
-                          entry.getValue().getVertexMap().size() +
-                          ", has previous port " +
-                          previousPort + ", previous hostname "
-                          + previousHostname +
-                          ", port " + port + ", hostname " + hostname);
-            }
-            if (previousPort == -1) {
-                continue;
-            }
-
-            if ((previousPort == finalRpcPort) &&
-                    getHostname().equals(previousHostname) &&
-                    ((port != finalRpcPort) ||
-                            !(getHostname().equals(hostname)))) {
-                if (!syncRequired) {
-                    getGraphMapper().getWorkerCommunications().
-                        cleanCachedVertexAddressMap();
-                }
-                List<BasicVertex<I, V, E, M>> vertexList =
-                    new ArrayList<BasicVertex<I, V, E, M>>(
-                        entry.getValue().getVertexMap().values());
-                if (vertexList != null) {
-                    LOG.info("exchangeVertexRanges: Sending vertex range " +
-                             entry.getKey() + " with " +
-                             vertexList.size() + " elements to " + hostname +
-                             ":" + port);
-                    getGraphMapper().getWorkerCommunications().sendVertexListReq(
-                        entry.getKey(), vertexList);
-                    vertexList.clear();
-                    entry.getValue().getVertexMap().clear();
-                    if (LOG.isInfoEnabled()) {
-                        LOG.info("exchangeVertexRanges: Sent vertex range " +
-                                 entry.getKey() + " with " +
-                                 vertexList.size() +
-                                 " elements to " + hostname +
-                                 ":" + port + " " + vertexList.size() + " " +
-                                 entry.getValue().getVertexMap().size());
-                    }
+    /**
+     * Send the worker partitions to their destination workers
+     *
+     * @param workerPartitionMap Map of worker info to the partitions stored
+     *        on this worker to be sent
+     */
+    private void sendWorkerPartitions(
+            Map<WorkerInfo, List<Integer>> workerPartitionMap) {
+        List<Entry<WorkerInfo, List<Integer>>> randomEntryList =
+            new ArrayList<Entry<WorkerInfo, List<Integer>>>(
+                workerPartitionMap.entrySet());
+        Collections.shuffle(randomEntryList);
+        for (Entry<WorkerInfo, List<Integer>> workerPartitionList :
+                randomEntryList) {
+            for (Integer partitionId : workerPartitionList.getValue()) {
+                Partition<I, V, E, M> partition =
+                    getPartitionMap().get(partitionId);
+                if (partition == null) {
+                    throw new IllegalStateException(
+                        "sendWorkerPartitions: Couldn't find partition " +
+                        partitionId + " to send to " +
+                        workerPartitionList.getKey());
                 }
-                syncRequired = true;
-            }
-            else if ((port == finalRpcPort) &&
-                    getHostname().equals(hostname) &&
-                    ((previousPort != finalRpcPort) ||
-                            !(getHostname().equals(previousHostname)))) {
                 if (LOG.isInfoEnabled()) {
-                    LOG.info("exchangeVertexRanges: Receiving " +
-                             entry.getKey() + " from " +
-                             previousHostname + ":" + previousPort);
-                }
-                if (!syncRequired) {
-                    getGraphMapper().getWorkerCommunications().
-                        cleanCachedVertexAddressMap();
-                }
-                VertexRange<I, V, E, M> destVertexRange =
-                    getVertexRangeMap().get(entry.getKey());
-                if ((destVertexRange.getVertexMap() != null) &&
-                        !destVertexRange.getVertexMap().isEmpty()) {
-                    throw new RuntimeException(
-                        "exchangeVertexRanges: Cannot receive max index " +
-                        entry.getKey() + " since already have " +
-                        destVertexRange.getVertexMap().size() + " elements.");
+                    LOG.info("sendWorkerPartitions: Sending partition " +
+                             partitionId + " to worker " +
+                             workerPartitionList.getKey());
                 }
-                syncRequired = true;
+                getGraphMapper().getGraphState().getWorkerCommunications().
+                    sendPartitionReq(workerPartitionList.getKey(),
+                                     partition);
+                getPartitionMap().remove(partitionId);
             }
         }
 
-        // All senders and receivers must agree they are finished
-        if (syncRequired) {
-            String myVertexRangeExchangePath =
-                getVertexRangeExchangePath(getApplicationAttempt(),
-                                           getSuperstep()) +
-                                           "/" + getHostnamePartitionId();
-            String vertexRangeExchangeFinishedPath =
-                getVertexRangeExchangeFinishedPath(getApplicationAttempt(),
-                                                   getSuperstep());
-            LOG.info("exchangeVertexRanges: Ready with path " +
-                     myVertexRangeExchangePath);
-            try {
-                getZkExt().createExt(myVertexRangeExchangePath,
-                                     null,
-                                     Ids.OPEN_ACL_UNSAFE,
-                                     CreateMode.PERSISTENT,
-                                     true);
-                LOG.info("exchangeVertexRanges: Waiting on change to " +
-                         vertexRangeExchangeFinishedPath);
-                while (getZkExt().exists(vertexRangeExchangeFinishedPath, true)
-                        == null) {
-                    getVertexRangeExchangeFinishedChangedEvent().waitForever();
-                    getVertexRangeExchangeFinishedChangedEvent().reset();
-                }
-            } catch (KeeperException e) {
-                throw new RuntimeException(e);
-            } catch (InterruptedException e) {
-                throw new RuntimeException(e);
-            }
+        String myPartitionExchangeDonePath =
+            getPartitionExchangeWorkerPath(
+                getApplicationAttempt(), getSuperstep(), getWorkerInfo());
+        try {
+            getZkExt().createExt(myPartitionExchangeDonePath,
+                    null,
+                    Ids.OPEN_ACL_UNSAFE,
+                    CreateMode.PERSISTENT,
+                    true);
+        } catch (KeeperException e) {
+            throw new IllegalStateException(
+                    "sendWorkerPartitions: KeeperException creating " +
+                    myPartitionExchangeDonePath, e);
+        } catch (InterruptedException e) {
+            throw new IllegalStateException(
+                    "sendWorkerPartitions: InterruptedException creating " +
+                    myPartitionExchangeDonePath, e);
         }
+        if (LOG.isInfoEnabled()) {
+            LOG.info("sendWorkerPartitions: Done sending all my partitions.");
+        }
+    }
 
-        // Add the vertices that were sent earlier.
-        Map<I, List<BasicVertex<I, V, E, M>>> inVertexRangeMap =
-            getGraphMapper().getWorkerCommunications().getInVertexRangeMap();
-        synchronized (inVertexRangeMap) {
-            for (Entry<I, List<BasicVertex<I, V, E, M>>> entry :
-                    inVertexRangeMap.entrySet()) {
-                if (entry.getValue() == null || entry.getValue().isEmpty()) {
-                    continue;
-                }
+    @Override
+    public final void exchangeVertexPartitions(
+            Collection<? extends PartitionOwner> masterSetPartitionOwners) {
+        // 1. Fix the addresses of the partition ids if they have changed.
+        // 2. Send all the partitions to their destination workers in a random
+        //    fashion.
+        // 3. Notify completion with a ZooKeeper stamp.
+        // 4. Wait for all my dependencies to be done (if any).
+        // 5. Add the partitions to myself.
+        PartitionExchange partitionExchange =
+            workerGraphPartitioner.updatePartitionOwners(
+                getWorkerInfo(), masterSetPartitionOwners, getPartitionMap());
+        commService.fixPartitionIdToSocketAddrMap();
+
+        Map<WorkerInfo, List<Integer>> workerPartitionMap =
+            partitionExchange.getSendWorkerPartitionMap();
+        if (!workerPartitionMap.isEmpty()) {
+            sendWorkerPartitions(workerPartitionMap);
+        }
+
+        Set<WorkerInfo> myDependencyWorkerSet =
+            partitionExchange.getMyDependencyWorkerSet();
+        Set<String> workerIdSet = new HashSet<String>();
+        for (WorkerInfo workerInfo : myDependencyWorkerSet) {
+            if (!workerIdSet.add(workerInfo.getHostnameId())) {
+                throw new IllegalStateException(
+                    "exchangeVertexPartitions: Duplicate entry " + workerInfo);
+            }
+        }
+        if (myDependencyWorkerSet.isEmpty() && workerPartitionMap.isEmpty()) {
+            if (LOG.isInfoEnabled()) {
+                LOG.info("exchangeVertexPartitions: Nothing to exchange, " +
+                         "exiting early");
+            }
+            return;
+        }
 
-                SortedMap<I, BasicVertex<I, V, E, M>> vertexMap =
-                    getVertexRangeMap().get(entry.getKey()).getVertexMap();
-                if (vertexMap.size() != 0) {
-                    throw new RuntimeException(
-                        "exchangeVertexRanges: Failed to import vertex range " +
-                        entry.getKey() + " of size " + entry.getValue().size() +
-                        " since it is already of size " + vertexMap.size() +
-                        " but should be empty!");
+        String vertexExchangePath =
+            getPartitionExchangePath(getApplicationAttempt(), getSuperstep());
+        List<String> workerDoneList;
+        try {
+            while (true) {
+                workerDoneList = getZkExt().getChildrenExt(
+                    vertexExchangePath, true, false, false);
+                workerIdSet.removeAll(workerDoneList);
+                if (workerIdSet.isEmpty()) {
+                    break;
                 }
                 if (LOG.isInfoEnabled()) {
-                    LOG.info("exchangeVertexRanges: Adding " +
-                             entry.getValue().size() +
-                             " vertices for max index " + entry.getKey());
+                    LOG.info("exchangeVertexPartitions: Waiting for workers " +
+                             workerIdSet);
                 }
+                getPartitionExchangeChildrenChangedEvent().waitForever();
+                getPartitionExchangeChildrenChangedEvent().reset();
+            }
+        } catch (KeeperException e) {
+            throw new IllegalStateException(
+                "exchangeVertexPartitions: KeeperException waiting on " +
+                vertexExchangePath, e);
+        } catch (InterruptedException e) {
+            throw new IllegalStateException(
+                "exchangeVertexPartitions: InterruptedException waiting on " +
+                vertexExchangePath, e);
+        }
+
+        if (LOG.isInfoEnabled()) {
+            LOG.info("exchangeVertexPartitions: Done with exchange.");
+        }
+
+        // Add the partitions sent earlier
+        movePartitionsToWorker(commService);
+    }
+
+    /**
+     * Partitions that are exchanged need to be moved from the communication
+     * service to the worker.
+     *
+     * @param commService Communication service where the partitions are
+     *        temporarily stored.
+     */
+    private void movePartitionsToWorker(
+            ServerInterface<I, V, E, M> commService) {
+        Map<Integer, List<BasicVertex<I, V, E, M>>> inPartitionVertexMap =
+                commService.getInPartitionVertexMap();
+        synchronized (inPartitionVertexMap) {
+            for (Entry<Integer, List<BasicVertex<I, V, E, M>>> entry :
+                inPartitionVertexMap.entrySet()) {
+                if (getPartitionMap().containsKey(entry.getKey())) {
+                    throw new IllegalStateException(
+                        "movePartitionsToWorker: Already has partition " +
+                        entry.getKey());
+                }
+
+                Partition<I, V, E, M> tmpPartition = new Partition<I, V, E, M>(
+                    getConfiguration(),
+                    entry.getKey());
                 for (BasicVertex<I, V, E, M> vertex : entry.getValue()) {
-                    if (vertexMap.put(vertex.getVertexId(), vertex) != null) {
+                    if (tmpPartition.putVertex(vertex) != null) {
                         throw new IllegalStateException(
-                            "exchangeVertexRanges: Vertex " + vertex +
+                            "movePartitionsToWorker: Vertex " + vertex +
                             " already exists!");
                     }
                 }
+                if (LOG.isInfoEnabled()) {
+                    LOG.info("movePartitionsToWorker: Adding " +
+                            entry.getValue().size() +
+                            " vertices for partition id " + entry.getKey());
+                }
+                getPartitionMap().put(tmpPartition.getPartitionId(),
+                                      tmpPartition);
                 entry.getValue().clear();
             }
+            inPartitionVertexMap.clear();
         }
     }
 
-    @Override
-    public NavigableMap<I, VertexRange<I, V, E, M>> getVertexRangeMap() {
-        return getVertexRangeMap(getSuperstep());
+    /** Get the event signaled when workers finish sending partitions. */
+    final public BspEvent getPartitionExchangeChildrenChangedEvent() {
+        return partitionExchangeChildrenChanged;
     }
 
     @Override
@@ -1423,8 +1292,53 @@ public class BspServiceWorker<
                     "processEvent: Couldn't properly get job state from " +
                     jsonObj.toString());
             }
-            return true;
+            foundEvent = true;
+        } else if (event.getPath().contains(PARTITION_EXCHANGE_DIR) &&
+                   event.getType() == EventType.NodeChildrenChanged) {
+            if (LOG.isInfoEnabled()) {
+                LOG.info("processEvent: partitionExchangeChildrenChanged " +
+                         "(at least one worker is done sending partitions)");
+            }
+            partitionExchangeChildrenChanged.signal();
+            foundEvent = true;
         }
+
         return foundEvent;
     }
+
+    @Override
+    public WorkerInfo getWorkerInfo() {
+        return workerInfo;
+    }
+
+    @Override
+    public Map<Integer, Partition<I, V, E, M>> getPartitionMap() {
+        return workerPartitionMap;
+    }
+
+    @Override
+    public Collection<? extends PartitionOwner> getPartitionOwners() {
+        return workerGraphPartitioner.getPartitionOwners();
+    }
+
+    @Override
+    public PartitionOwner getVertexPartitionOwner(I vertexIndex) {
+        return workerGraphPartitioner.getPartitionOwner(vertexIndex);
+    }
+
+    /** Get the partition owning a vertex index, or null if not local. */
+    public Partition<I, V, E, M> getPartition(I vertexIndex) {
+        PartitionOwner partitionOwner = getVertexPartitionOwner(vertexIndex);
+        return workerPartitionMap.get(partitionOwner.getPartitionId());
+    }
+
+    @Override
+    public BasicVertex<I, V, E, M> getVertex(I vertexIndex) {
+        PartitionOwner partitionOwner = getVertexPartitionOwner(vertexIndex);
+        if (workerPartitionMap.containsKey(partitionOwner.getPartitionId())) {
+            return workerPartitionMap.get(
+                partitionOwner.getPartitionId()).getVertex(vertexIndex);
+        } else {
+            return null;
+        }
+    }
 }
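
For reference, the checkpoint metadata consumed by loadCheckpoint() above is
a flat stream: an int partition count, followed by one (long byte offset,
int partition id) pair per partition, where the offset points into the
companion vertices file. A minimal sketch of that layout, using plain
java.io streams and hypothetical local file names purely for illustration
(the committed code reads through getFs() from HDFS):

    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.FileInputStream;
    import java.io.FileOutputStream;
    import java.io.IOException;

    public class CheckpointMetadataSketch {
        /** Write metadata in the layout loadCheckpoint() expects. */
        public static void write(String file, long[] offsets, int[] ids)
                throws IOException {
            DataOutputStream out =
                new DataOutputStream(new FileOutputStream(file));
            out.writeInt(ids.length);           // number of partitions
            for (int i = 0; i < ids.length; ++i) {
                out.writeLong(offsets[i]);      // offset into vertices file
                out.writeInt(ids[i]);           // partition id at that offset
            }
            out.close();
        }

        /** Scan for a partition id, mirroring loadCheckpoint()'s loop. */
        public static long findOffset(String file, int wantedId)
                throws IOException {
            DataInputStream in =
                new DataInputStream(new FileInputStream(file));
            try {
                int partitions = in.readInt();
                for (int i = 0; i < partitions; ++i) {
                    long startPos = in.readLong();
                    if (in.readInt() == wantedId) {
                        return startPos;
                    }
                }
                throw new IllegalStateException(
                    "Partition " + wantedId + " not found");
            } finally {
                in.close();
            }
        }
    }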
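
The ZooKeeper handshake in sendWorkerPartitions() and
exchangeVertexPartitions() amounts to a simple barrier: every sender stamps
a child znode when done, and every receiver waits until all the workers it
depends on have stamped. A minimal sketch against the stock ZooKeeper
client API, assuming the exchange path already exists and polling instead
of blocking on a watch event as the committed code does:

    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    import org.apache.zookeeper.CreateMode;
    import org.apache.zookeeper.KeeperException;
    import org.apache.zookeeper.ZooDefs.Ids;
    import org.apache.zookeeper.ZooKeeper;

    public class ExchangeBarrierSketch {
        public static void await(ZooKeeper zk, String exchangePath,
                String myId, Set<String> dependencyIds)
                throws KeeperException, InterruptedException {
            // Stamp my own completion under the exchange path.
            zk.create(exchangePath + "/" + myId, null,
                      Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            // Wait until every dependency has stamped as well.
            Set<String> waiting = new HashSet<String>(dependencyIds);
            while (!waiting.isEmpty()) {
                List<String> done = zk.getChildren(exchangePath, false);
                waiting.removeAll(done);
                if (!waiting.isEmpty()) {
                    Thread.sleep(100);
                }
            }
        }
    }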

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspUtils.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspUtils.java?rev=1201987&r1=1201986&r2=1201987&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspUtils.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspUtils.java Tue Nov 15 00:54:20 2011
@@ -18,6 +18,9 @@
 
 package org.apache.giraph.graph;
 
+import org.apache.giraph.graph.partition.GraphPartitionerFactory;
+import org.apache.giraph.graph.partition.HashPartitionerFactory;
+import org.apache.giraph.graph.partition.PartitionStats;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
@@ -29,6 +32,56 @@ import org.apache.hadoop.util.Reflection
  */
 public class BspUtils {
     /**
+     * Get the user's subclassed {@link GraphPartitionerFactory}.
+     *
+     * @param conf Configuration to check
+     * @return User's graph partitioner
+     */
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    public static <I extends WritableComparable, V extends Writable,
+                   E extends Writable, M extends Writable>
+            Class<? extends GraphPartitionerFactory<I, V, E, M>>
+            getGraphPartitionerClass(Configuration conf) {
+        return (Class<? extends GraphPartitionerFactory<I, V, E, M>>)
+                conf.getClass(GiraphJob.GRAPH_PARTITIONER_FACTORY_CLASS,
+                              HashPartitionerFactory.class,
+                              GraphPartitionerFactory.class);
+    }
+
+    /**
+     * Create an instance of the user's graph partitioner factory
+     *
+     * @param conf Configuration to check
+     * @return Instantiated user graph partitioner factory
+     */
+    @SuppressWarnings("rawtypes")
+    public static <I extends WritableComparable, V extends Writable,
+            E extends Writable, M extends Writable>
+            GraphPartitionerFactory<I, V, E, M>
+            createGraphPartitioner(Configuration conf) {
+        Class<? extends GraphPartitionerFactory<I, V, E, M>>
+            graphPartitionerFactoryClass =
+            getGraphPartitionerClass(conf);
+        return ReflectionUtils.newInstance(graphPartitionerFactoryClass, conf);
+    }
+
+    /**
+     * Create the partition stats used by the user's graph partitioner
+     *
+     * @param conf Configuration to check
+     * @return Instantiated partition stats
+     */
+    @SuppressWarnings("rawtypes")
+    public static <I extends WritableComparable, V extends Writable,
+            E extends Writable, M extends Writable>
+            PartitionStats createGraphPartitionStats(Configuration conf) {
+        GraphPartitionerFactory<I, V, E, M> graphPartitioner =
+            createGraphPartitioner(conf);
+        return graphPartitioner.createMasterGraphPartitioner().
+            createPartitionStats();
+    }
+
+    /**
      * Get the user's subclassed {@link VertexInputFormat}.
      *
      * @param conf Configuration to check
@@ -131,41 +184,6 @@ public class BspUtils {
     }
 
     /**
-     * Get the user's subclassed vertex range balancer
-     *
-     * @param conf Configuration to check
-     * @return User's vertex range balancer class
-     */
-    @SuppressWarnings({ "rawtypes", "unchecked" })
-    public static <I extends WritableComparable,
-                   V extends Writable,
-                   E extends Writable,
-                   M extends Writable>
-            Class<? extends VertexRangeBalancer<I, V, E, M>>
-            getVertexRangeBalancerClass(Configuration conf) {
-        return (Class<? extends VertexRangeBalancer<I, V, E, M>>)
-                conf.getClass(GiraphJob.VERTEX_RANGE_BALANCER_CLASS,
-                              StaticBalancer.class,
-                              BasicVertexRangeBalancer.class);
-    }
-
-    /**
-     * Create a user vertex range balancer class
-     *
-     * @param conf Configuration to check
-     * @return Instantiated user vertex input format class
-     */
-    @SuppressWarnings("rawtypes")
-    public static <I extends WritableComparable, V extends Writable,
-            E extends Writable, M extends Writable>
-            VertexRangeBalancer<I, V, E, M>
-            createVertexRangeBalancer(Configuration conf) {
-        Class<? extends VertexRangeBalancer<I, V, E, M>>
-            vertexRangeBalancerClass = getVertexRangeBalancerClass(conf);
-        return ReflectionUtils.newInstance(vertexRangeBalancerClass, conf);
-    }
-
-    /**
      * Get the user's subclassed VertexResolver.
      *
      * @param conf Configuration to check
@@ -202,7 +220,7 @@ public class BspUtils {
         resolver.setGraphState(graphState);
         return resolver;
     }
-    
+
    /**
      * Get the user's subclassed WorkerContext.
      *
@@ -216,7 +234,7 @@ public class BspUtils {
                               DefaultWorkerContext.class,
                               WorkerContext.class);
     }
-     
+
    /**
      * Create a user worker context
      *
@@ -224,12 +242,12 @@ public class BspUtils {
      * @return Instantiated user worker context
      */
     @SuppressWarnings("rawtypes")
-	public static <I extends WritableComparable, 
+	public static <I extends WritableComparable,
     			   V extends Writable,
-    			   E extends Writable, 
-    			   M extends Writable> 
+    			   E extends Writable,
+    			   M extends Writable>
     		WorkerContext createWorkerContext(Configuration conf,
-            		GraphState<I, V, E, M> graphState) {
+                GraphState<I, V, E, M> graphState) {
         Class<? extends WorkerContext> workerContextClass =
             getWorkerContextClass(conf);
         WorkerContext workerContext =
@@ -237,7 +255,7 @@ public class BspUtils {
         workerContext.setGraphState(graphState);
         return workerContext;
     }
-    
+
 
     /**
      * Get the user's subclassed Vertex.
@@ -268,8 +286,10 @@ public class BspUtils {
     public static <I extends WritableComparable, V extends Writable,
             E extends Writable, M extends Writable> BasicVertex<I, V, E, M>
             createVertex(Configuration conf) {
-        Class<? extends BasicVertex<I, V, E, M>> vertexClass = getVertexClass(conf);
-        BasicVertex<I, V, E, M> vertex = ReflectionUtils.newInstance(vertexClass, conf);
+        Class<? extends BasicVertex<I, V, E, M>> vertexClass =
+            getVertexClass(conf);
+        BasicVertex<I, V, E, M> vertex =
+            ReflectionUtils.newInstance(vertexClass, conf);
         return vertex;
     }
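
The new factory hooks are driven entirely by the Configuration, so
selecting a partitioner needs nothing beyond one setClass() call. A
minimal usage sketch, assuming only the classes named in this commit
(HashPartitionerFactory is already the default when
giraph.graphPartitionerFactoryClass is unset, so the setClass() call here
is purely illustrative):

    import org.apache.giraph.graph.BspUtils;
    import org.apache.giraph.graph.GiraphJob;
    import org.apache.giraph.graph.partition.GraphPartitionerFactory;
    import org.apache.giraph.graph.partition.HashPartitionerFactory;
    import org.apache.hadoop.conf.Configuration;

    public class PartitionerConfigSketch {
        public static void main(String[] args) {
            Configuration conf = new Configuration();
            conf.setClass(GiraphJob.GRAPH_PARTITIONER_FACTORY_CLASS,
                          HashPartitionerFactory.class,
                          GraphPartitionerFactory.class);
            GraphPartitionerFactory<?, ?, ?, ?> factory =
                BspUtils.createGraphPartitioner(conf);
            System.out.println("Using " + factory.getClass().getName());
        }
    }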
 

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java?rev=1201987&r1=1201986&r2=1201987&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java Tue Nov 15 00:54:20 2011
@@ -20,6 +20,7 @@ package org.apache.giraph.graph;
 
 import org.apache.giraph.bsp.BspInputFormat;
 import org.apache.giraph.bsp.BspOutputFormat;
+import org.apache.giraph.graph.partition.GraphPartitionerFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.log4j.Logger;
@@ -43,12 +44,12 @@ public class GiraphJob extends Job {
     /** Vertex combiner class - optional */
     public static final String VERTEX_COMBINER_CLASS =
         "giraph.combinerClass";
-    /** Vertex range balancer class - optional */
-    public static final String VERTEX_RANGE_BALANCER_CLASS =
-        "giraph.vertexRangeBalancerClass";
     /** Vertex resolver class - optional */
     public static final String VERTEX_RESOLVER_CLASS =
         "giraph.vertexResolverClass";
+    /** Graph partitioner factory class - optional */
+    public static final String GRAPH_PARTITIONER_FACTORY_CLASS =
+        "giraph.graphPartitionerFactoryClass";
 
     /** Vertex index class */
     public static final String VERTEX_INDEX_CLASS = "giraph.vertexIndexClass";
@@ -143,11 +144,20 @@ public class GiraphJob extends Job {
     /** Default maximum number of RPC handlers */
     public static final int RPC_NUM_HANDLERS_DEFAULT = 100;
 
+    /**
+     *  Maximum number of vertices per partition before sending
+     *  (input superstep only).
+     */
+    public static final String MAX_VERTICES_PER_PARTITION =
+        "giraph.maxVerticesPerPartition";
+    /** Default maximum number of vertices per partition before sending. */
+    public static final int MAX_VERTICES_PER_PARTITION_DEFAULT = 100000;
+
     /** Maximum number of messages per peer before flush */
     public static final String MSG_SIZE = "giraph.msgSize";
     /** Default maximum number of messages per peer before flush */
     public static final int MSG_SIZE_DEFAULT = 1000;
-    
+
     /** Number of flush threads per peer */
     public static final String MSG_NUM_FLUSH_THREADS = "giraph.msgNumFlushThreads";
 
@@ -191,7 +201,7 @@ public class GiraphJob extends Job {
         "giraph.zkJavaOpts";
     /** Default java opts passed to ZooKeeper startup */
     public static final String ZOOKEEPER_JAVA_OPTS_DEFAULT =
-        "-Xmx256m -XX:ParallelGCThreads=4 -XX:+UseConcMarkSweepGC " +
+        "-Xmx512m -XX:ParallelGCThreads=4 -XX:+UseConcMarkSweepGC " +
         "-XX:CMSInitiatingOccupancyFraction=70 -XX:MaxGCPauseMillis=100";
 
     /**
@@ -372,16 +382,15 @@ public class GiraphJob extends Job {
     }
 
     /**
-     * Set the vertex range balancer class (optional)
+     * Set the graph partitioner class (optional)
      *
-     * @param vertexRangeBalancerClass Determines how vertex
-     *        ranges are balanced prior to each superstep
+     * @param graphPartitionerClass Determines how the graph is partitioned
      */
-    final public void setVertexRangeBalancerClass(
-            Class<?> vertexRangeBalancerClass) {
-        getConfiguration().setClass(VERTEX_RANGE_BALANCER_CLASS,
-                                    vertexRangeBalancerClass,
-                                    VertexRangeBalancer.class);
+    final public void setGraphPartitionerFactoryClass(
+            Class<?> graphPartitionerFactoryClass) {
+        getConfiguration().setClass(GRAPH_PARTITIONER_FACTORY_CLASS,
+                                    graphPartitionerFactoryClass,
+                                    GraphPartitionerFactory.class);
     }
 
     /**
@@ -394,7 +403,7 @@ public class GiraphJob extends Job {
                                     vertexResolverClass,
                                     VertexResolver.class);
     }
-    
+
    /**
     * Set the worker context class (optional)
     *
@@ -406,7 +415,7 @@ public class GiraphJob extends Job {
                                     workerContextClass,
                                     WorkerContext.class);
     }
-    
+
     /**
      * Set worker configuration for determining what is required for
      * a superstep.
@@ -499,6 +508,9 @@ public class GiraphJob extends Job {
         setIntConfIfDefault("mapred.job.map.memory.mb", 1024);
         setIntConfIfDefault("mapred.job.reduce.memory.mb", 1024);
 
+        // Speculative execution doesn't make sense for Giraph
+        conf.setBoolean("mapred.map.tasks.speculative.execution", false);
+
         if (getJar() == null) {
             setJarByClass(GiraphJob.class);
         }
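
At the job level the same choice is exposed through
setGraphPartitionerFactoryClass(), and the new input-superstep threshold
can be tuned alongside it. A sketch, assuming an already-constructed
GiraphJob (a user-supplied factory class would normally replace
HashPartitionerFactory here):

    import org.apache.giraph.graph.GiraphJob;
    import org.apache.giraph.graph.partition.HashPartitionerFactory;

    public class JobWiringSketch {
        /** Wire the partitioner knobs onto an existing GiraphJob. */
        public static void configure(GiraphJob job) {
            job.setGraphPartitionerFactoryClass(HashPartitionerFactory.class);
            // Spill partitions to their owners after 50k vertices during
            // input loading (the shipped default is 100000).
            job.getConfiguration().setInt(
                GiraphJob.MAX_VERTICES_PER_PARTITION, 50000);
        }
    }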

Added: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GlobalStats.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GlobalStats.java?rev=1201987&view=auto
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GlobalStats.java (added)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GlobalStats.java Tue Nov 15 00:54:20 2011
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.giraph.graph.partition.PartitionStats;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Global statistics aggregated by the master from worker partition stats.
+ */
+public class GlobalStats implements Writable {
+    private long vertexCount = 0;
+    private long finishedVertexCount = 0;
+    private long edgeCount = 0;
+    private long messageCount = 0;
+
+    /** Accumulate one partition's stats into the global totals. */
+    public void addPartitionStats(PartitionStats partitionStats) {
+        this.vertexCount += partitionStats.getVertexCount();
+        this.finishedVertexCount += partitionStats.getFinishedVertexCount();
+        this.edgeCount += partitionStats.getEdgeCount();
+    }
+
+    public long getVertexCount() {
+        return vertexCount;
+    }
+
+    public long getFinishedVertexCount() {
+        return finishedVertexCount;
+    }
+
+    public long getEdgeCount() {
+        return edgeCount;
+    }
+
+    public long getMessageCount() {
+        return messageCount;
+    }
+
+    /** Add to the aggregate message count. */
+    public void addMessageCount(long messageCount) {
+        this.messageCount += messageCount;
+    }
+
+    @Override
+    public void readFields(DataInput input) throws IOException {
+        vertexCount = input.readLong();
+        finishedVertexCount = input.readLong();
+        edgeCount = input.readLong();
+        messageCount = input.readLong();
+    }
+
+    @Override
+    public void write(DataOutput output) throws IOException {
+        output.writeLong(vertexCount);
+        output.writeLong(finishedVertexCount);
+        output.writeLong(edgeCount);
+        output.writeLong(messageCount);
+    }
+
+    @Override
+    public String toString() {
+        return "(vtx=" + vertexCount + ",finVtx=" +
+               finishedVertexCount + ",edges=" + edgeCount + ",msgCount=" +
+               messageCount + ")";
+    }
+}
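
Since GlobalStats is a plain Writable, it can be exercised in isolation. A
self-contained round-trip sketch; PartitionStats aggregation is omitted
because its construction is not shown in this commit:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    import org.apache.giraph.graph.GlobalStats;

    public class GlobalStatsRoundTrip {
        public static void main(String[] args) throws IOException {
            GlobalStats stats = new GlobalStats();
            // The one mutator usable without a PartitionStats instance.
            stats.addMessageCount(42);

            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            stats.write(new DataOutputStream(bytes));

            GlobalStats copy = new GlobalStats();
            copy.readFields(new DataInputStream(
                new ByteArrayInputStream(bytes.toByteArray())));
            // Prints "(vtx=0,finVtx=0,edges=0,msgCount=42)".
            System.out.println(copy);
        }
    }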