You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ac...@apache.org on 2012/08/25 03:52:53 UTC

svn commit: r1377179 - in /giraph/trunk: ./ src/main/java/org/apache/giraph/bsp/ src/main/java/org/apache/giraph/comm/ src/main/java/org/apache/giraph/graph/ src/main/java/org/apache/giraph/graph/partition/ src/test/java/org/apache/giraph/ src/test/jav...

Author: aching
Date: Sat Aug 25 01:52:52 2012
New Revision: 1377179

URL: http://svn.apache.org/viewvc?rev=1377179&view=rev
Log:
GIRAPH-249: Move part of the graph out-of-core when memory is low
(apresta via aching).

Added:
    giraph/trunk/src/main/java/org/apache/giraph/graph/partition/DiskBackedPartitionStore.java
    giraph/trunk/src/main/java/org/apache/giraph/graph/partition/PartitionStore.java
    giraph/trunk/src/main/java/org/apache/giraph/graph/partition/SimplePartitionStore.java
    giraph/trunk/src/test/java/org/apache/giraph/TestMutateGraph.java
      - copied, changed from r1375824, giraph/trunk/src/test/java/org/apache/giraph/TestMutateGraphVertex.java
    giraph/trunk/src/test/java/org/apache/giraph/graph/partition/
    giraph/trunk/src/test/java/org/apache/giraph/graph/partition/TestPartitionStores.java
Removed:
    giraph/trunk/src/test/java/org/apache/giraph/TestMutateGraphVertex.java
Modified:
    giraph/trunk/CHANGELOG
    giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java
    giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java
    giraph/trunk/src/main/java/org/apache/giraph/comm/NettyWorkerClientServer.java
    giraph/trunk/src/main/java/org/apache/giraph/comm/NettyWorkerServer.java
    giraph/trunk/src/main/java/org/apache/giraph/comm/SendVertexRequest.java
    giraph/trunk/src/main/java/org/apache/giraph/comm/ServerData.java
    giraph/trunk/src/main/java/org/apache/giraph/comm/WorkerServer.java
    giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
    giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java
    giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java
    giraph/trunk/src/main/java/org/apache/giraph/graph/partition/HashWorkerPartitioner.java
    giraph/trunk/src/main/java/org/apache/giraph/graph/partition/Partition.java
    giraph/trunk/src/main/java/org/apache/giraph/graph/partition/WorkerGraphPartitioner.java
    giraph/trunk/src/test/java/org/apache/giraph/comm/ConnectionTest.java
    giraph/trunk/src/test/java/org/apache/giraph/comm/RequestFailureTest.java
    giraph/trunk/src/test/java/org/apache/giraph/comm/RequestTest.java

Modified: giraph/trunk/CHANGELOG
URL: http://svn.apache.org/viewvc/giraph/trunk/CHANGELOG?rev=1377179&r1=1377178&r2=1377179&view=diff
==============================================================================
--- giraph/trunk/CHANGELOG (original)
+++ giraph/trunk/CHANGELOG Sat Aug 25 01:52:52 2012
@@ -2,6 +2,9 @@ Giraph Change Log
 
 Release 0.2.0 - unreleased
 
+  GIRAPH-249: Move part of the graph out-of-core when memory is low
+  (apresta via aching).
+
   GIRAPH-306: Netty requests should be reliable and implement exactly
   once semantics. (aching)
 

Modified: giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java?rev=1377179&r1=1377178&r2=1377179&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java Sat Aug 25 01:52:52 2012
@@ -21,10 +21,10 @@ package org.apache.giraph.bsp;
 import java.io.IOException;
 import java.util.Collection;
 import java.util.List;
-import java.util.Map;
 
 import org.apache.giraph.comm.ServerData;
 import org.apache.giraph.graph.WorkerAggregatorUsage;
+import org.apache.giraph.graph.partition.PartitionStore;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
@@ -63,14 +63,14 @@ public interface CentralizedServiceWorke
   WorkerContext getWorkerContext();
 
   /**
-   * Get a map of the partition id to the partition for this worker.
+   * Get the partition store for this worker.
    * The partitions contain the vertices for
    * this worker and can be used to run compute() for the vertices or do
    * checkpointing.
    *
-   * @return List of partitions that this worker owns.
+   * @return The partition store for this worker.
    */
-  Map<Integer, Partition<I, V, E, M>> getPartitionMap();
+  PartitionStore<I, V, E, M> getPartitionStore();
 
   /**
    * Get a collection of all the partition owners.
@@ -115,15 +115,31 @@ public interface CentralizedServiceWorke
   boolean finishSuperstep(List<PartitionStats> partitionStatsList);
 
   /**
-   * Get the partition that a vertex index would belong to
+   * Get the partition that a vertex id would belong to.
    *
-   * @param vertexId Index of the vertex that is used to find the correct
+   * @param vertexId Id of the vertex that is used to find the correct
    *        partition.
    * @return Correct partition if exists on this worker, null otherwise.
    */
   Partition<I, V, E, M> getPartition(I vertexId);
 
   /**
+   * Get the partition id that a vertex id would belong to.
+   *
+   * @param vertexId Vertex id
+   * @return Partition id
+   */
+  Integer getPartitionId(I vertexId);
+
+  /**
+   * Whether a partition with given id exists on this worker.
+   *
+   * @param partitionId Partition id
+   * @return True iff this worker has the specified partition
+   */
+  boolean hasPartition(Integer partitionId);
+
+  /**
    * Every client will need to get a partition owner from a vertex id so that
    * they know which worker to sent the request to.
    *

Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java?rev=1377179&r1=1377178&r2=1377179&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java Sat Aug 25 01:52:52 2012
@@ -39,7 +39,6 @@ import org.apache.hadoop.mapreduce.Mappe
 import org.apache.log4j.Logger;
 
 import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
 import java.io.IOException;
@@ -812,19 +811,7 @@ public abstract class BasicRPCCommunicat
       LOG.debug("putVertexList: On partition id " + partitionId +
           " adding vertex list of size " + vertexList.size());
     }
-    synchronized (inPartitionVertexMap) {
-      if (vertexList.size() == 0) {
-        return;
-      }
-      if (!inPartitionVertexMap.containsKey(partitionId)) {
-        inPartitionVertexMap.put(partitionId,
-            Lists.newArrayList(vertexList));
-      } else {
-        Collection<Vertex<I, V, E, M>> tmpVertices =
-            inPartitionVertexMap.get(partitionId);
-        tmpVertices.addAll(vertexList);
-      }
-    }
+    service.getPartitionStore().addPartitionVertices(partitionId, vertexList);
   }
 
   @Override
@@ -1170,7 +1157,7 @@ public abstract class BasicRPCCommunicat
       // Assign the messages to each destination vertex (getting rid of
       // the old ones)
       for (Partition<I, V, E, M> partition :
-        service.getPartitionMap().values()) {
+          service.getPartitionStore().getPartitions()) {
         for (Vertex<I, V, E, M> vertex : partition.getVertices()) {
           List<M> msgList = inMessages.get(vertex.getId());
           if (msgList != null) {
@@ -1263,7 +1250,7 @@ public abstract class BasicRPCCommunicat
       if (partition == null) {
         throw new IllegalStateException(
             "prepareSuperstep: No partition for index " + vertexIndex +
-            " in " + service.getPartitionMap() + " should have been " +
+            " in " + service.getPartitionStore() + " should have been " +
             service.getVertexPartitionOwner(vertexIndex));
       }
       if (vertex != null) {
@@ -1315,10 +1302,4 @@ public abstract class BasicRPCCommunicat
   public String getName() {
     return myName;
   }
-
-  @Override
-  public Map<Integer, Collection<Vertex<I, V, E, M>>>
-  getInPartitionVertexMap() {
-    return inPartitionVertexMap;
-  }
 }

Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/NettyWorkerClientServer.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/NettyWorkerClientServer.java?rev=1377179&r1=1377178&r2=1377179&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/NettyWorkerClientServer.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/NettyWorkerClientServer.java Sat Aug 25 01:52:52 2012
@@ -28,8 +28,6 @@ import org.apache.hadoop.io.WritableComp
 import org.apache.hadoop.mapreduce.Mapper;
 
 import java.io.IOException;
-import java.util.Collection;
-import java.util.Map;
 
 /**
  * Netty based implementation of the {@link WorkerClientServer} interface.
@@ -126,12 +124,6 @@ public class NettyWorkerClientServer<I e
   }
 
   @Override
-  public Map<Integer, Collection<Vertex<I, V, E, M>>>
-  getInPartitionVertexMap() {
-    return server.getInPartitionVertexMap();
-  }
-
-  @Override
   public ServerData<I, V, E, M> getServerData() {
     return server.getServerData();
   }

Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/NettyWorkerServer.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/NettyWorkerServer.java?rev=1377179&r1=1377178&r2=1377179&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/NettyWorkerServer.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/NettyWorkerServer.java Sat Aug 25 01:52:52 2012
@@ -20,8 +20,8 @@ package org.apache.giraph.comm;
 
 import org.apache.giraph.bsp.CentralizedServiceWorker;
 import org.apache.giraph.comm.messages.BasicMessageStore;
-import org.apache.giraph.comm.messages.DiskBackedMessageStoreByPartition;
 import org.apache.giraph.comm.messages.DiskBackedMessageStore;
+import org.apache.giraph.comm.messages.DiskBackedMessageStoreByPartition;
 import org.apache.giraph.comm.messages.FlushableMessageStore;
 import org.apache.giraph.comm.messages.MessageStoreByPartition;
 import org.apache.giraph.comm.messages.MessageStoreFactory;
@@ -40,8 +40,6 @@ import org.apache.log4j.Logger;
 
 import com.google.common.collect.Sets;
 
-import java.util.Collection;
-import java.util.Map;
 import java.util.Set;
 
 /**
@@ -85,7 +83,7 @@ public class NettyWorkerServer<I extends
         GiraphJob.USE_OUT_OF_CORE_MESSAGES_DEFAULT);
     if (!useOutOfCoreMessaging) {
       serverData = new ServerData<I, V, E, M>(
-          SimpleMessageStore.newFactory(service, conf));
+          conf, SimpleMessageStore.newFactory(service, conf));
     } else {
       int maxMessagesInMemory = conf.getInt(GiraphJob.MAX_MESSAGES_IN_MEMORY,
           GiraphJob.MAX_MESSAGES_IN_MEMORY_DEFAULT);
@@ -97,7 +95,7 @@ public class NettyWorkerServer<I extends
       MessageStoreFactory<I, M, MessageStoreByPartition<I, M>>
           storeFactory = DiskBackedMessageStoreByPartition.newFactory(service,
               maxMessagesInMemory, partitionStoreFactory);
-      serverData = new ServerData<I, V, E, M>(storeFactory);
+      serverData = new ServerData<I, V, E, M>(conf, storeFactory);
     }
 
     nettyServer = new NettyServer<I, V, E, M>(conf, serverData);
@@ -115,18 +113,16 @@ public class NettyWorkerServer<I extends
 
     Set<I> resolveVertexIndexSet = Sets.newHashSet();
     // Keep track of the vertices which are not here but have received messages
-    for (I vertexId :
-        serverData.getCurrentMessageStore().getDestinationVertices()) {
-      Vertex<I, V, E, M> vertex = service.getVertex(vertexId);
-      if (vertex == null) {
-        if (service.getPartition(vertexId) == null) {
-          throw new IllegalStateException(
-              "prepareSuperstep: No partition for vertex index " + vertexId);
-        }
-        if (!resolveVertexIndexSet.add(vertexId)) {
-          throw new IllegalStateException(
-              "prepareSuperstep: Already has missing vertex on this " +
-                  "worker for " + vertexId);
+    for (Integer partitionId : service.getPartitionStore().getPartitionIds()) {
+      for (I vertexId : serverData.getCurrentMessageStore().
+          getPartitionDestinationVertices(partitionId)) {
+        Vertex<I, V, E, M> vertex = service.getVertex(vertexId);
+        if (vertex == null) {
+          if (!resolveVertexIndexSet.add(vertexId)) {
+            throw new IllegalStateException(
+                "prepareSuperstep: Already has missing vertex on this " +
+                    "worker for " + vertexId);
+          }
         }
       }
     }
@@ -175,7 +171,7 @@ public class NettyWorkerServer<I extends
       if (partition == null) {
         throw new IllegalStateException(
             "prepareSuperstep: No partition for index " + vertexIndex +
-            " in " + service.getPartitionMap() + " should have been " +
+            " in " + service.getPartitionStore() + " should have been " +
             service.getVertexPartitionOwner(vertexIndex));
       }
       if (vertex != null) {
@@ -193,12 +189,6 @@ public class NettyWorkerServer<I extends
   }
 
   @Override
-  public Map<Integer, Collection<Vertex<I, V, E, M>>>
-  getInPartitionVertexMap() {
-    return serverData.getPartitionVertexMap();
-  }
-
-  @Override
   public ServerData<I, V, E, M> getServerData() {
     return serverData;
   }

Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/SendVertexRequest.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/SendVertexRequest.java?rev=1377179&r1=1377178&r2=1377179&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/SendVertexRequest.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/SendVertexRequest.java Sat Aug 25 01:52:52 2012
@@ -18,21 +18,20 @@
 
 package org.apache.giraph.comm;
 
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.concurrent.ConcurrentHashMap;
-
 import org.apache.giraph.comm.RequestRegistry.Type;
-import org.apache.giraph.graph.Vertex;
 import org.apache.giraph.graph.BspUtils;
+import org.apache.giraph.graph.Vertex;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.log4j.Logger;
 
 import com.google.common.collect.Lists;
 
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Collection;
+
 /**
  * Send a collection of vertices for a partition.
  *
@@ -98,25 +97,12 @@ public class SendVertexRequest<I extends
 
   @Override
   public void doRequest(ServerData<I, V, E, M> serverData) {
-    ConcurrentHashMap<Integer, Collection<Vertex<I, V, E, M>>>
-    partitionVertexMap = serverData.getPartitionVertexMap();
     if (vertices.isEmpty()) {
       LOG.warn("doRequest: Got an empty request!");
       return;
     }
-    Collection<Vertex<I, V, E, M>> vertexMap =
-        partitionVertexMap.get(partitionId);
-    if (vertexMap == null) {
-      final Collection<Vertex<I, V, E, M>> tmpVertices  =
-          Lists.newArrayListWithCapacity(vertices.size());
-      vertexMap = partitionVertexMap.putIfAbsent(partitionId, tmpVertices);
-      if (vertexMap == null) {
-        vertexMap = tmpVertices;
-      }
-    }
-    synchronized (vertexMap) {
-      vertexMap.addAll(vertices);
-    }
+    serverData.getPartitionStore().addPartitionVertices(partitionId,
+        vertices);
   }
 }
 

Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/ServerData.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/ServerData.java?rev=1377179&r1=1377178&r2=1377179&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/ServerData.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/ServerData.java Sat Aug 25 01:52:52 2012
@@ -20,13 +20,16 @@ package org.apache.giraph.comm;
 
 import org.apache.giraph.comm.messages.MessageStoreByPartition;
 import org.apache.giraph.comm.messages.MessageStoreFactory;
-import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.graph.GiraphJob;
 import org.apache.giraph.graph.VertexMutations;
+import org.apache.giraph.graph.partition.DiskBackedPartitionStore;
+import org.apache.giraph.graph.partition.PartitionStore;
+import org.apache.giraph.graph.partition.SimplePartitionStore;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
 import java.io.IOException;
-import java.util.Collection;
 import java.util.concurrent.ConcurrentHashMap;
 
 /**
@@ -40,14 +43,8 @@ import java.util.concurrent.ConcurrentHa
 @SuppressWarnings("rawtypes")
 public class ServerData<I extends WritableComparable,
     V extends Writable, E extends Writable, M extends Writable> {
-  /**
-   * Map of partition ids to incoming vertices from other workers.
-   * (Synchronized on values)
-   */
-  private final ConcurrentHashMap<Integer, Collection<Vertex<I, V, E, M>>>
-  inPartitionVertexMap =
-      new ConcurrentHashMap<Integer, Collection<Vertex<I, V, E, M>>>();
-
+  /** Partition store for this worker. */
+  private volatile PartitionStore<I, V, E, M> partitionStore;
   /** Message store factory */
   private final
   MessageStoreFactory<I, M, MessageStoreByPartition<I, M>> messageStoreFactory;
@@ -68,23 +65,34 @@ public class ServerData<I extends Writab
   private final ConcurrentHashMap<I, VertexMutations<I, V, E, M>>
   vertexMutations = new ConcurrentHashMap<I, VertexMutations<I, V, E, M>>();
 
-  /** @param messageStoreFactory Factory for message stores */
-  public ServerData(MessageStoreFactory<I, M, MessageStoreByPartition<I, M>>
-      messageStoreFactory) {
+  /**
+   * Constructor.
+   *
+   * @param configuration Configuration
+   * @param messageStoreFactory Factory for message stores
+   */
+  public ServerData(Configuration configuration,
+                    MessageStoreFactory<I, M, MessageStoreByPartition<I, M>>
+                        messageStoreFactory) {
 
     this.messageStoreFactory = messageStoreFactory;
     currentMessageStore = messageStoreFactory.newStore();
     incomingMessageStore = messageStoreFactory.newStore();
+    if (configuration.getBoolean(GiraphJob.USE_OUT_OF_CORE_GRAPH,
+        GiraphJob.USE_OUT_OF_CORE_GRAPH_DEFAULT)) {
+      partitionStore = new DiskBackedPartitionStore<I, V, E, M>(configuration);
+    } else {
+      partitionStore = new SimplePartitionStore<I, V, E, M>(configuration);
+    }
   }
 
   /**
-   * Get the partition vertices (synchronize on the values)
+   * Return the partition store for this worker.
    *
-   * @return Partition vertices
+   * @return The partition store
    */
-  public ConcurrentHashMap<Integer, Collection<Vertex<I, V, E, M>>>
-  getPartitionVertexMap() {
-    return inPartitionVertexMap;
+  public PartitionStore<I, V, E, M> getPartitionStore() {
+    return partitionStore;
   }
 
   /**

Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/WorkerServer.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/WorkerServer.java?rev=1377179&r1=1377178&r2=1377179&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/WorkerServer.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/WorkerServer.java Sat Aug 25 01:52:52 2012
@@ -18,13 +18,10 @@
 
 package org.apache.giraph.comm;
 
-import org.apache.giraph.graph.Vertex;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
 import java.io.Closeable;
-import java.util.Collection;
-import java.util.Map;
 
 /**
  * Interface for message communication server.
@@ -52,15 +49,6 @@ public interface WorkerServer<I extends 
   void prepareSuperstep();
 
   /**
-   * Get the vertices that were sent in the last iteration.  After getting
-   * the map, the user should synchronize with it to insure it
-   * is thread-safe.
-   *
-   * @return map of vertex ranges to vertices
-   */
-  Map<Integer, Collection<Vertex<I, V, E, M>>> getInPartitionVertexMap();
-
-  /**
    * Get server data
    *
    * @return Server data

Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java?rev=1377179&r1=1377178&r2=1377179&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java Sat Aug 25 01:52:52 2012
@@ -24,11 +24,12 @@ import org.apache.giraph.comm.NettyWorke
 import org.apache.giraph.comm.RPCCommunications;
 import org.apache.giraph.comm.ServerData;
 import org.apache.giraph.comm.WorkerClientServer;
-import org.apache.giraph.comm.WorkerServer;
 import org.apache.giraph.graph.partition.Partition;
 import org.apache.giraph.graph.partition.PartitionExchange;
 import org.apache.giraph.graph.partition.PartitionOwner;
 import org.apache.giraph.graph.partition.PartitionStats;
+import org.apache.giraph.graph.partition.PartitionStore;
+import org.apache.giraph.graph.partition.SimplePartitionStore;
 import org.apache.giraph.graph.partition.WorkerGraphPartitioner;
 import org.apache.giraph.utils.MemoryUtils;
 import org.apache.giraph.utils.WritableUtils;
@@ -100,9 +101,6 @@ public class BspServiceWorker<I extends 
   inputSplitCache = new HashMap<PartitionOwner, Partition<I, V, E, M>>();
   /** Communication service */
   private final WorkerClientServer<I, V, E, M> commService;
-  /** Structure to store the partitions on this worker */
-  private final Map<Integer, Partition<I, V, E, M>> workerPartitionMap =
-      new HashMap<Integer, Partition<I, V, E, M>>();
   /** Have the partition exchange children (workers) changed? */
   private final BspEvent partitionExchangeChildrenChanged;
   /** Regulates the size of outgoing Collections of vertices read
@@ -118,6 +116,10 @@ public class BspServiceWorker<I extends 
   private long totalEdgesLoaded = 0;
   /** Input split max vertices (-1 denotes all) */
   private final long inputSplitMaxVertices;
+  /**
+   * Partition store for the Hadoop RPC path only; null when Netty is used.
+   */
+  private final PartitionStore<I, V, E, M> workerPartitionStore;
 
   /**
    * Constructor for setting up the worker.
@@ -171,6 +173,13 @@ public class BspServiceWorker<I extends 
     this.workerContext =
         BspUtils.createWorkerContext(getConfiguration(),
             graphMapper.getGraphState());
+
+    if (useNetty) {
+      workerPartitionStore = null;
+    } else {
+      workerPartitionStore =
+          new SimplePartitionStore<I, V, E, M>(getConfiguration());
+    }
   }
 
   public WorkerContext getWorkerContext() {
@@ -558,7 +567,7 @@ public class BspServiceWorker<I extends 
     Collection<? extends PartitionOwner> masterSetPartitionOwners =
         startSuperstep();
     workerGraphPartitioner.updatePartitionOwners(
-        getWorkerInfo(), masterSetPartitionOwners, getPartitionMap());
+        getWorkerInfo(), masterSetPartitionOwners, getPartitionStore());
 
     commService.setup();
 
@@ -647,18 +656,15 @@ public class BspServiceWorker<I extends 
       getInputSplitsAllDoneEvent().reset();
     }
 
-    // At this point all vertices have been sent to their destinations.
-    // Move them to the worker, creating the empty partitions
-    movePartitionsToWorker(commService);
+    // Create remaining partitions owned by this worker.
     for (PartitionOwner partitionOwner : masterSetPartitionOwners) {
       if (partitionOwner.getWorkerInfo().equals(getWorkerInfo()) &&
-          !getPartitionMap().containsKey(
+          !getPartitionStore().hasPartition(
               partitionOwner.getPartitionId())) {
         Partition<I, V, E, M> partition =
             new Partition<I, V, E, M>(getConfiguration(),
                 partitionOwner.getPartitionId());
-        getPartitionMap().put(partitionOwner.getPartitionId(),
-            partition);
+        getPartitionStore().addPartition(partition);
       }
     }
 
@@ -666,7 +672,8 @@ public class BspServiceWorker<I extends 
     // if necessary
     List<PartitionStats> partitionStatsList =
         new ArrayList<PartitionStats>();
-    for (Partition<I, V, E, M> partition : getPartitionMap().values()) {
+    for (Partition<I, V, E, M> partition :
+        getPartitionStore().getPartitions()) {
       PartitionStats partitionStats =
           new PartitionStats(partition.getId(),
               partition.getVertices().size(),
@@ -675,7 +682,7 @@ public class BspServiceWorker<I extends 
       partitionStatsList.add(partitionStats);
     }
     workerGraphPartitioner.finalizePartitionStats(
-        partitionStatsList, workerPartitionMap);
+        partitionStatsList, getPartitionStore());
 
     finishSuperstep(partitionStatsList);
   }
@@ -985,7 +992,7 @@ public class BspServiceWorker<I extends 
 
     if (LOG.isInfoEnabled()) {
       LOG.info("finishSuperstep: Superstep " + getSuperstep() +
-          ", mesages = " + workerSentMessages + " " +
+          ", messages = " + workerSentMessages + " " +
           MemoryUtils.getRuntimeMemoryStats());
     }
 
@@ -993,7 +1000,7 @@ public class BspServiceWorker<I extends 
         marshalAggregatorValues(getSuperstep());
     Collection<PartitionStats> finalizedPartitionStats =
         workerGraphPartitioner.finalizePartitionStats(
-            partitionStatsList, workerPartitionMap);
+            partitionStatsList, getPartitionStore());
     List<PartitionStats> finalizedPartitionStatsList =
         new ArrayList<PartitionStats>(finalizedPartitionStats);
     byte [] partitionStatsBytes =
@@ -1086,7 +1093,8 @@ public class BspServiceWorker<I extends 
     VertexWriter<I, V, E> vertexWriter =
         vertexOutputFormat.createVertexWriter(getContext());
     vertexWriter.initialize(getContext());
-    for (Partition<I, V, E, M> partition : workerPartitionMap.values()) {
+    for (Partition<I, V, E, M> partition :
+        getPartitionStore().getPartitions()) {
       for (Vertex<I, V, E, M> vertex : partition.getVertices()) {
         vertexWriter.writeVertex(vertex);
       }
@@ -1185,7 +1193,8 @@ public class BspServiceWorker<I extends 
         getFs().create(verticesFilePath);
     ByteArrayOutputStream metadataByteStream = new ByteArrayOutputStream();
     DataOutput metadataOutput = new DataOutputStream(metadataByteStream);
-    for (Partition<I, V, E, M> partition : workerPartitionMap.values()) {
+    for (Partition<I, V, E, M> partition :
+        getPartitionStore().getPartitions()) {
       long startPos = verticesOutputStream.getPos();
       partition.write(verticesOutputStream);
       // write messages
@@ -1212,7 +1221,7 @@ public class BspServiceWorker<I extends 
     // needs to know how many partitions this worker owns
     FSDataOutputStream metadataOutputStream =
         getFs().create(metadataFilePath);
-    metadataOutputStream.writeInt(workerPartitionMap.size());
+    metadataOutputStream.writeInt(getPartitionStore().getNumPartitions());
     metadataOutputStream.write(metadataByteStream.toByteArray());
     metadataOutputStream.close();
     verticesOutputStream.close();
@@ -1313,11 +1322,12 @@ public class BspServiceWorker<I extends 
             LOG.info("loadCheckpoint: Loaded partition " +
                 partition);
           }
-          if (getPartitionMap().put(partitionId, partition) != null) {
+          if (getPartitionStore().hasPartition(partitionId)) {
             throw new IllegalStateException(
                 "loadCheckpoint: Already has partition owner " +
                     partitionOwner);
           }
+          getPartitionStore().addPartition(partition);
           ++loadedPartitions;
         } catch (IOException e) {
           throw new RuntimeException(
@@ -1370,7 +1380,7 @@ public class BspServiceWorker<I extends 
       randomEntryList) {
       for (Integer partitionId : workerPartitionList.getValue()) {
         Partition<I, V, E, M> partition =
-            getPartitionMap().get(partitionId);
+            getPartitionStore().removePartition(partitionId);
         if (partition == null) {
           throw new IllegalStateException(
               "sendWorkerPartitions: Couldn't find partition " +
@@ -1385,7 +1395,6 @@ public class BspServiceWorker<I extends 
         getGraphMapper().getGraphState().getWorkerCommunications().
             sendPartitionRequest(workerPartitionList.getKey(),
                 partition);
-        getPartitionMap().remove(partitionId);
       }
     }
 
@@ -1428,12 +1437,12 @@ public class BspServiceWorker<I extends 
     // 5. Add the partitions to myself.
     PartitionExchange partitionExchange =
         workerGraphPartitioner.updatePartitionOwners(
-            getWorkerInfo(), masterSetPartitionOwners, getPartitionMap());
+            getWorkerInfo(), masterSetPartitionOwners, getPartitionStore());
     commService.fixPartitionIdToSocketAddrMap();
 
     Map<WorkerInfo, List<Integer>> sendWorkerPartitionMap =
         partitionExchange.getSendWorkerPartitionMap();
-    if (!workerPartitionMap.isEmpty()) {
+    if (!getPartitionStore().isEmpty()) {
       sendWorkerPartitions(sendWorkerPartitionMap);
     }
 
@@ -1446,7 +1455,7 @@ public class BspServiceWorker<I extends 
             "exchangeVertexPartitions: Duplicate entry " + tmpWorkerInfo);
       }
     }
-    if (myDependencyWorkerSet.isEmpty() && workerPartitionMap.isEmpty()) {
+    if (myDependencyWorkerSet.isEmpty() && getPartitionStore().isEmpty()) {
       if (LOG.isInfoEnabled()) {
         LOG.info("exchangeVertexPartitions: Nothing to exchange, " +
             "exiting early");
@@ -1481,56 +1490,6 @@ public class BspServiceWorker<I extends 
     if (LOG.isInfoEnabled()) {
       LOG.info("exchangeVertexPartitions: Done with exchange.");
     }
-
-    // Add the partitions sent earlier
-    movePartitionsToWorker(commService);
-  }
-
-  /**
-   * Partitions that are exchanged need to be moved from the communication
-   * service to the worker.
-   *
-   * @param commService Communication service where the partitions are
-   *        temporarily stored.
-   */
-  private void movePartitionsToWorker(
-      WorkerServer<I, V, E, M> commService) {
-    Map<Integer, Collection<Vertex<I, V, E, M>>> inPartitionVertexMap =
-        commService.getInPartitionVertexMap();
-    synchronized (inPartitionVertexMap) {
-      for (Entry<Integer, Collection<Vertex<I, V, E, M>>> entry :
-        inPartitionVertexMap.entrySet()) {
-        if (getPartitionMap().containsKey(entry.getKey())) {
-          throw new IllegalStateException(
-              "moveVerticesToWorker: Already has partition " +
-                  getPartitionMap().get(entry.getKey()) +
-                  ", cannot receive vertex list of size " +
-                  entry.getValue().size());
-        }
-
-        Partition<I, V, E, M> tmpPartition =
-            new Partition<I, V, E, M>(getConfiguration(),
-                entry.getKey());
-        synchronized (entry.getValue()) {
-          for (Vertex<I, V, E, M> vertex : entry.getValue()) {
-            if (tmpPartition.putVertex(vertex) != null) {
-              throw new IllegalStateException(
-                  "moveVerticesToWorker: Vertex " + vertex +
-                  " already exists!");
-            }
-          }
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("movePartitionsToWorker: Adding " +
-                entry.getValue().size() +
-                " vertices for partition id " + entry.getKey());
-          }
-          getPartitionMap().put(tmpPartition.getId(),
-              tmpPartition);
-          entry.getValue().clear();
-        }
-      }
-      inPartitionVertexMap.clear();
-    }
   }
 
   /**
@@ -1586,8 +1545,12 @@ public class BspServiceWorker<I extends 
   }
 
   @Override
-  public Map<Integer, Partition<I, V, E, M>> getPartitionMap() {
-    return workerPartitionMap;
+  public PartitionStore<I, V, E, M> getPartitionStore() {
+    if (workerPartitionStore != null) {
+      return workerPartitionStore;
+    } else {
+      return getServerData().getPartitionStore();
+    }
   }
 
   @Override
@@ -1600,22 +1563,27 @@ public class BspServiceWorker<I extends 
     return workerGraphPartitioner.getPartitionOwner(vertexId);
   }
 
-  /**
-   * Get the partition for a vertex index.
-   *
-   * @param vertexId Vertex index to search for the partition.
-   * @return Partition that owns this vertex.
-   */
+  @Override
   public Partition<I, V, E, M> getPartition(I vertexId) {
+    return getPartitionStore().getPartition(getPartitionId(vertexId));
+  }
+
+  @Override
+  public Integer getPartitionId(I vertexId) {
     PartitionOwner partitionOwner = getVertexPartitionOwner(vertexId);
-    return workerPartitionMap.get(partitionOwner.getPartitionId());
+    return partitionOwner.getPartitionId();
+  }
+
+  @Override
+  public boolean hasPartition(Integer partitionId) {
+    return getPartitionStore().hasPartition(partitionId);
   }
 
   @Override
   public Vertex<I, V, E, M> getVertex(I vertexId) {
     PartitionOwner partitionOwner = getVertexPartitionOwner(vertexId);
-    if (workerPartitionMap.containsKey(partitionOwner.getPartitionId())) {
-      return workerPartitionMap.get(
+    if (getPartitionStore().hasPartition(partitionOwner.getPartitionId())) {
+      return getPartitionStore().getPartition(
           partitionOwner.getPartitionId()).getVertex(vertexId);
     } else {
       return null;

Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java?rev=1377179&r1=1377178&r2=1377179&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java Sat Aug 25 01:52:52 2012
@@ -436,6 +436,24 @@ public class GiraphJob {
   /** Default size of buffer when reading and writing messages out-of-core. */
   public static final int MESSAGES_BUFFER_SIZE_DEFAULT = 8192;
 
+  /** Directory in the local filesystem for out-of-core partitions. */
+  public static final String PARTITIONS_DIRECTORY =
+      "giraph.partitionsDirectory";
+  /** Default directory for out-of-core partitions. */
+  public static final String PARTITIONS_DIRECTORY_DEFAULT = "_bsp/_partitions";
+
+  /** Enable out-of-core graph. */
+  public static final String USE_OUT_OF_CORE_GRAPH =
+      "giraph.useOutOfCoreGraph";
+  /** Default is not to use out-of-core graph. */
+  public static final boolean USE_OUT_OF_CORE_GRAPH_DEFAULT = false;
+
+  /** Maximum number of partitions to hold in memory for each worker. */
+  public static final String MAX_PARTITIONS_IN_MEMORY =
+      "giraph.maxPartitionsInMemory";
+  /** Default maximum number of in-memory partitions. */
+  public static final int MAX_PARTITIONS_IN_MEMORY_DEFAULT = 10;
+
   /** Keep the zookeeper output for debugging? Default is to remove it. */
   public static final String KEEP_ZOOKEEPER_DATA =
       "giraph.keepZooKeeperData";

Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java?rev=1377179&r1=1377178&r2=1377179&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java Sat Aug 25 01:52:52 2012
@@ -477,11 +477,10 @@ public class GraphMapper<I extends Writa
       TimedLogger partitionLogger = new TimedLogger(15000, LOG);
       int completedPartitions = 0;
       for (Partition<I, V, E, M> partition :
-        serviceWorker.getPartitionMap().values()) {
+        serviceWorker.getPartitionStore().getPartitions()) {
         PartitionStats partitionStats =
             new PartitionStats(partition.getId(), 0, 0, 0);
-        for (Vertex<I, V, E, M> vertex :
-          partition.getVertices()) {
+        for (Vertex<I, V, E, M> vertex : partition.getVertices()) {
           // Make sure every vertex has the current
           // graphState before computing
           vertex.setGraphState(graphState);
@@ -522,8 +521,8 @@ public class GraphMapper<I extends Writa
         partitionStatsList.add(partitionStats);
         ++completedPartitions;
         partitionLogger.info("map: Completed " + completedPartitions + " of " +
-            serviceWorker.getPartitionMap().size() + " partitions " +
-            MemoryUtils.getRuntimeMemoryStats());
+            serviceWorker.getPartitionStore().getNumPartitions() +
+            " partitions " + MemoryUtils.getRuntimeMemoryStats());
       }
     } while (!serviceWorker.finishSuperstep(partitionStatsList));
     if (LOG.isInfoEnabled()) {

Added: giraph/trunk/src/main/java/org/apache/giraph/graph/partition/DiskBackedPartitionStore.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/DiskBackedPartitionStore.java?rev=1377179&view=auto
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/partition/DiskBackedPartitionStore.java (added)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/partition/DiskBackedPartitionStore.java Sat Aug 25 01:52:52 2012
@@ -0,0 +1,368 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph.partition;
+
+import org.apache.giraph.graph.BspUtils;
+import org.apache.giraph.graph.GiraphJob;
+import org.apache.giraph.graph.Vertex;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.log4j.Logger;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * A partition store that can possibly spill to disk.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex data
+ * @param <E> Edge data
+ * @param <M> Message data
+ */
+public class DiskBackedPartitionStore<I extends WritableComparable,
+    V extends Writable, E extends Writable, M extends Writable>
+    extends PartitionStore<I, V, E, M> {
+  /** Class logger. */
+  private static final Logger LOG =
+      Logger.getLogger(DiskBackedPartitionStore.class);
+  /** Map of partitions kept in memory. */
+  private final ConcurrentMap<Integer, Partition<I, V, E, M>>
+  inMemoryPartitions = new ConcurrentHashMap<Integer, Partition<I, V, E, M>>();
+  /** Maximum number of partitions to keep in memory. */
+  private int maxInMemoryPartitions;
+  /** Map of partitions kept out-of-core. The values are partition sizes. */
+  private final ConcurrentMap<Integer, Integer> onDiskPartitions =
+      Maps.newConcurrentMap();
+  /** Directory on the local file system for storing out-of-core partitions. */
+  private final String basePath;
+  /** Configuration. */
+  private final Configuration conf;
+  /** Slot for loading out-of-core partitions. */
+  private Partition<I, V, E, M> loadedPartition;
+  /** Locks for accessing and modifying partitions. */
+  private final ConcurrentMap<Integer, Lock> partitionLocks =
+      Maps.newConcurrentMap();
+
+  /**
+   * Build a store from the job configuration: reads the in-memory partition
+   * limit and derives the local path under which partitions are spilled.
+   *
+   * @param conf Configuration
+   */
+  public DiskBackedPartitionStore(Configuration conf) {
+    this.conf = conf;
+    int configuredLimit = conf.getInt(GiraphJob.MAX_PARTITIONS_IN_MEMORY,
+        GiraphJob.MAX_PARTITIONS_IN_MEMORY_DEFAULT);
+    // Clamp to 1 so a zero or negative setting is never used as-is
+    maxInMemoryPartitions = Math.max(1, configuredLimit);
+    String jobId = conf.get("mapred.job.id", "Unknown Job");
+    String partitionsDirectory = conf.get(GiraphJob.PARTITIONS_DIRECTORY,
+        GiraphJob.PARTITIONS_DIRECTORY_DEFAULT);
+    // The job id is prepended directly, with no separator in between
+    basePath = jobId + partitionsDirectory;
+  }
+
+  /**
+   * Compute the location of the file backing a partition, i.e.
+   * {@code <basePath>/partition-<id>}.
+   *
+   * @param partitionId The partition
+   * @return The path to the given partition
+   */
+  private String getPartitionPath(Integer partitionId) {
+    String fileName = "partition-" + partitionId;
+    return basePath + "/" + fileName;
+  }
+
+  /**
+   * Register a fresh lock for a partition and hand it back already locked.
+   * The lock is acquired before it is published in the lock map, so any
+   * thread that finds it there blocks until the creator unlocks it.
+   *
+   * @param partitionId Partition id
+   * @return A newly created lock, or null if already present
+   */
+  private Lock createLock(Integer partitionId) {
+    Lock candidate = new ReentrantLock(true);
+    candidate.lock();
+    Lock existing = partitionLocks.putIfAbsent(partitionId, candidate);
+    // A non-null result means another lock was registered first; the
+    // candidate was never published and is simply dropped.
+    return existing == null ? candidate : null;
+  }
+
+  /**
+   * Look up the lock registered for a partition id.
+   *
+   * @param partitionId Partition id
+   * @return The lock, or null if no lock is registered for that id
+   */
+  private Lock getLock(Integer partitionId) {
+    Lock registered = partitionLocks.get(partitionId);
+    return registered;
+  }
+
+  /**
+   * Write a partition to disk. The vertices are serialized one after the
+   * other into the partition's file; an existing file for the same id is
+   * overwritten, since the stream is not opened in append mode.
+   *
+   * @param partition The partition object to write
+   * @throws java.io.IOException
+   */
+  private void writePartition(Partition<I, V, E, M> partition)
+    throws IOException {
+    File file = new File(getPartitionPath(partition.getId()));
+    file.getParentFile().mkdirs();
+    file.createNewFile();
+    DataOutputStream outputStream = new DataOutputStream(
+        new BufferedOutputStream(new FileOutputStream(file)));
+    try {
+      for (Vertex<I, V, E, M> vertex : partition.getVertices()) {
+        vertex.write(outputStream);
+      }
+    } finally {
+      // Release the file handle even if serializing a vertex fails
+      outputStream.close();
+    }
+  }
+
+  /**
+   * Read a partition from disk. The number of vertices to deserialize is
+   * taken from the on-disk partition map. The backing file is deleted once
+   * the partition has been read successfully.
+   *
+   * @param partitionId Id of the partition to read
+   * @return The partition object
+   * @throws IOException
+   */
+  private Partition<I, V, E, M> readPartition(Integer partitionId)
+    throws IOException {
+    Partition<I, V, E, M> partition = new Partition<I, V, E, M>(conf,
+        partitionId);
+    File file = new File(getPartitionPath(partitionId));
+    DataInputStream inputStream = new DataInputStream(
+        new BufferedInputStream(new FileInputStream(file)));
+    try {
+      int numVertices = onDiskPartitions.get(partitionId);
+      for (int i = 0; i < numVertices; ++i) {
+        Vertex<I, V, E, M> vertex = BspUtils.<I, V, E, M>createVertex(conf);
+        vertex.readFields(inputStream);
+        partition.putVertex(vertex);
+      }
+    } finally {
+      // Release the file handle even if deserialization fails
+      inputStream.close();
+    }
+    // Only reached on success, so a failed read leaves the file in place
+    file.delete();
+    return partition;
+  }
+
+  /**
+   * Append some vertices to an out-of-core partition. The partition file is
+   * opened in append mode, so the vertices are serialized after the existing
+   * content. The caller is responsible for updating the stored vertex count.
+   *
+   * @param partitionId Id of the destination partition
+   * @param vertices Vertices to be added
+   * @throws IOException
+   */
+  private void appendVertices(Integer partitionId,
+                              Collection<Vertex<I, V, E, M>> vertices)
+    throws IOException {
+    File file = new File(getPartitionPath(partitionId));
+    DataOutputStream outputStream = new DataOutputStream(
+        new BufferedOutputStream(new FileOutputStream(file, true)));
+    try {
+      for (Vertex<I, V, E, M> vertex : vertices) {
+        vertex.write(outputStream);
+      }
+    } finally {
+      // Release the file handle even if serializing a vertex fails
+      outputStream.close();
+    }
+  }
+
+  /**
+   * Load an out-of-core partition in memory, making it the partition held in
+   * the loading slot. If the slot holds a different partition, that partition
+   * is first written back to disk and recorded in the on-disk map.
+   *
+   * @param partitionId Partition id
+   */
+  private void loadPartition(Integer partitionId) {
+    if (loadedPartition != null) {
+      if (loadedPartition.getId() == partitionId) {
+        // The requested partition already occupies the slot
+        return;
+      }
+      if (LOG.isInfoEnabled()) {
+        LOG.info("loadPartition: moving partition " + loadedPartition.getId() +
+            " out of core");
+      }
+      try {
+        writePartition(loadedPartition);
+        onDiskPartitions.put(loadedPartition.getId(),
+            loadedPartition.getVertices().size());
+        loadedPartition = null;
+      } catch (IOException e) {
+        throw new IllegalStateException("loadPartition: failed writing " +
+            "partition " + loadedPartition.getId() + " to disk", e);
+      }
+    }
+    if (LOG.isInfoEnabled()) {
+      LOG.info("loadPartition: loading partition " + partitionId +
+          " in memory");
+    }
+    try {
+      loadedPartition = readPartition(partitionId);
+    } catch (IOException e) {
+      // Keep the underlying I/O error as the cause, as the write path does
+      throw new IllegalStateException("loadPartition: failed reading " +
+          "partition " + partitionId + " from disk", e);
+    }
+  }
+
+  /**
+   * Add a new partition without requiring a lock. The partition is kept in
+   * memory while {@code inMemoryPartitions.size() + 1} stays below the
+   * configured maximum; otherwise it is written to disk and its vertex count
+   * is recorded in the on-disk map.
+   * NOTE(review): the "+ 1" presumably reserves room for the partition held
+   * in the loading slot -- confirm.
+   *
+   * @param partition Partition to be added
+   */
+  private void addPartitionNoLock(Partition<I, V, E, M> partition) {
+    // Check and insert under one monitor so the size test stays accurate
+    synchronized (inMemoryPartitions) {
+      if (inMemoryPartitions.size() + 1 < maxInMemoryPartitions) {
+        inMemoryPartitions.put(partition.getId(), partition);
+
+        return;
+      }
+    }
+    try {
+      writePartition(partition);
+      onDiskPartitions.put(partition.getId(), partition.getVertices().size());
+    } catch (IOException e) {
+      // Keep the underlying I/O error as the cause for diagnosis
+      throw new IllegalStateException("addPartition: failed writing " +
+          "partition " + partition.getId() + " to disk", e);
+    }
+  }
+
+  /**
+   * Add a new partition to the store, either in memory or on disk.
+   *
+   * @param partition Partition
+   * @throws IllegalStateException if a partition with the same id is
+   *         already registered, or if writing it to disk fails
+   */
+  @Override
+  public void addPartition(Partition<I, V, E, M> partition) {
+    Lock lock = createLock(partition.getId());
+    if (lock == null) {
+      throw new IllegalStateException("addPartition: partition " +
+          partition.getId() + " already exists");
+    }
+    try {
+      addPartitionNoLock(partition);
+    } finally {
+      // createLock() returned the lock already held; always release it so a
+      // failed write cannot block other threads on this partition forever
+      lock.unlock();
+    }
+  }
+
+  /**
+   * Add some vertices to a (possibly existing) partition. Three cases:
+   * the partition is in memory (vertices are put directly), it is
+   * out-of-core (vertices go to the loaded copy or are appended to its
+   * file, under the partition lock), or it does not exist yet (an empty
+   * partition is created, then the call is repeated).
+   *
+   * @param partitionId Id of the destination partition
+   * @param vertices Vertices
+   * @throws IllegalStateException if writing to disk fails
+   */
+  @Override
+  public void addPartitionVertices(Integer partitionId,
+                                   Collection<Vertex<I, V, E, M>> vertices) {
+    if (inMemoryPartitions.containsKey(partitionId)) {
+      Partition<I, V, E, M> partition = inMemoryPartitions.get(partitionId);
+      partition.putVertices(vertices);
+    } else if (onDiskPartitions.containsKey(partitionId)) {
+      Lock lock = getLock(partitionId);
+      lock.lock();
+      try {
+        if (loadedPartition != null &&
+            loadedPartition.getId() == partitionId) {
+          loadedPartition.putVertices(vertices);
+        } else {
+          try {
+            appendVertices(partitionId, vertices);
+            onDiskPartitions.put(partitionId,
+                onDiskPartitions.get(partitionId) + vertices.size());
+          } catch (IOException e) {
+            throw new IllegalStateException("addPartitionVertices: failed " +
+                "writing vertices to partition " + partitionId + " on disk", e);
+          }
+        }
+      } finally {
+        // Release the lock even when the append fails
+        lock.unlock();
+      }
+    } else {
+      Lock lock = createLock(partitionId);
+      if (lock != null) {
+        try {
+          addPartitionNoLock(new Partition<I, V, E, M>(conf, partitionId));
+        } finally {
+          // createLock() returned the lock already held
+          lock.unlock();
+        }
+      } else {
+        // Another thread is already creating the partition,
+        // so we make sure it's done before repeating the call.
+        lock = getLock(partitionId);
+        lock.lock();
+        lock.unlock();
+      }
+      addPartitionVertices(partitionId, vertices);
+    }
+  }
+
+  /**
+   * Get a partition, loading it from disk into the loading slot when it is
+   * stored out-of-core.
+   *
+   * @param partitionId Partition id
+   * @return The requested partition
+   * @throws IllegalStateException if the partition is neither in memory nor
+   *         on disk
+   */
+  @Override
+  public Partition<I, V, E, M> getPartition(Integer partitionId) {
+    if (inMemoryPartitions.containsKey(partitionId)) {
+      return inMemoryPartitions.get(partitionId);
+    }
+    if (!onDiskPartitions.containsKey(partitionId)) {
+      throw new IllegalStateException("getPartition: partition " +
+          partitionId + " does not exist");
+    }
+    loadPartition(partitionId);
+    return loadedPartition;
+  }
+
+  @Override
+  public Partition<I, V, E, M> removePartition(Integer partitionId) {
+    partitionLocks.remove(partitionId);
+    if (onDiskPartitions.containsKey(partitionId)) {
+      Partition<I, V, E, M> partition;
+      if (loadedPartition != null && loadedPartition.getId() == partitionId) {
+        partition = loadedPartition;
+        loadedPartition = null;
+      } else {
+        try {
+          partition = readPartition(partitionId);
+        } catch (IOException e) {
+          throw new IllegalStateException("removePartition: failed reading " +
+              "partition " + partitionId + " from disk", e);
+        }
+      }
+      onDiskPartitions.remove(partitionId);
+      return partition;
+    } else {
+      return inMemoryPartitions.remove(partitionId);
+    }
+  }
+
+  /**
+   * Discard a partition without returning it. An out-of-core partition is
+   * dropped from the loading slot when it is the loaded one; otherwise its
+   * file is deleted without being read.
+   *
+   * @param partitionId Partition id
+   */
+  @Override
+  public void deletePartition(Integer partitionId) {
+    partitionLocks.remove(partitionId);
+    if (inMemoryPartitions.containsKey(partitionId)) {
+      inMemoryPartitions.remove(partitionId);
+      return;
+    }
+    if (loadedPartition == null || loadedPartition.getId() != partitionId) {
+      File partitionFile = new File(getPartitionPath(partitionId));
+      partitionFile.delete();
+    } else {
+      loadedPartition = null;
+    }
+    onDiskPartitions.remove(partitionId);
+  }
+
+  @Override
+  public boolean hasPartition(Integer partitionId) {
+    // Presence is tracked through the lock map: createLock() registers an
+    // entry for each new partition, and removePartition()/deletePartition()
+    // drop it.
+    return partitionLocks.containsKey(partitionId);
+  }
+
+  @Override
+  public Iterable<Integer> getPartitionIds() {
+    // Ids of the in-memory partitions first, followed by the ids recorded
+    // in the on-disk map; both are views over the underlying key sets.
+    return Iterables.concat(inMemoryPartitions.keySet(),
+        onDiskPartitions.keySet());
+  }
+
+  @Override
+  public int getNumPartitions() {
+    // Counted through the lock map, the same structure hasPartition() uses
+    return partitionLocks.size();
+  }
+
+}

Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/partition/HashWorkerPartitioner.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/HashWorkerPartitioner.java?rev=1377179&r1=1377178&r2=1377179&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/partition/HashWorkerPartitioner.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/partition/HashWorkerPartitioner.java Sat Aug 25 01:52:52 2012
@@ -18,6 +18,10 @@
 
 package org.apache.giraph.graph.partition;
 
+import org.apache.giraph.graph.WorkerInfo;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
@@ -26,10 +30,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import org.apache.giraph.graph.WorkerInfo;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
 /**
  * Implements hash-based partitioning from the id hash code.
  *
@@ -62,7 +62,7 @@ public class HashWorkerPartitioner<I ext
   @Override
   public Collection<PartitionStats> finalizePartitionStats(
       Collection<PartitionStats> workerPartitionStats,
-      Map<Integer, Partition<I, V, E, M>> partitionMap) {
+      PartitionStore<I, V, E, M> partitionStore) {
     // No modification necessary
     return workerPartitionStats;
   }
@@ -71,7 +71,7 @@ public class HashWorkerPartitioner<I ext
   public PartitionExchange updatePartitionOwners(
       WorkerInfo myWorkerInfo,
       Collection<? extends PartitionOwner> masterSetPartitionOwners,
-      Map<Integer, Partition<I, V, E, M>> partitionMap) {
+      PartitionStore<I, V, E, M> partitionStore) {
     synchronized (partitionOwnerList) {
       partitionOwnerList.clear();
       partitionOwnerList.addAll(masterSetPartitionOwners);

Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/partition/Partition.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/Partition.java?rev=1377179&r1=1377178&r2=1377179&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/partition/Partition.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/partition/Partition.java Sat Aug 25 01:52:52 2012
@@ -31,7 +31,8 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.util.Collection;
-import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentSkipListMap;
 
 /**
  * A generic container that stores vertices.  Vertex ids will map to exactly
@@ -51,7 +52,7 @@ public class Partition<I extends Writabl
   /** Partition id */
   private final int id;
   /** Vertex map for this range (keyed by index) */
-  private final Map<I, Vertex<I, V, E, M>> vertexMap;
+  private final ConcurrentMap<I, Vertex<I, V, E, M>> vertexMap;
 
   /**
    * Constructor.
@@ -64,9 +65,9 @@ public class Partition<I extends Writabl
     this.id = id;
     if (conf.getBoolean(GiraphJob.USE_OUT_OF_CORE_MESSAGES,
         GiraphJob.USE_OUT_OF_CORE_MESSAGES_DEFAULT)) {
-      vertexMap = Maps.newTreeMap();
+      vertexMap = new ConcurrentSkipListMap<I, Vertex<I, V, E, M>>();
     } else {
-      vertexMap = Maps.newHashMap();
+      vertexMap = Maps.newConcurrentMap();
     }
   }
 
@@ -110,6 +111,17 @@ public class Partition<I extends Writabl
   }
 
   /**
+   * Put several vertices in the partition.
+   *
+   * @param vertices Vertices to add
+   */
+  public void putVertices(Collection<Vertex<I, V, E , M>> vertices) {
+    for (Vertex<I, V, E , M> vertex : vertices) {
+      vertexMap.put(vertex.getId(), vertex);
+    }
+  }
+
+  /**
    * Get the number of edges in this partition.  Computed on the fly.
    *
    * @return Number of edges.

Added: giraph/trunk/src/main/java/org/apache/giraph/graph/partition/PartitionStore.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/PartitionStore.java?rev=1377179&view=auto
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/partition/PartitionStore.java (added)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/partition/PartitionStore.java Sat Aug 25 01:52:52 2012
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph.partition;
+
+import org.apache.giraph.graph.Vertex;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Iterables;
+
+import java.util.Collection;
+
+/**
+ * Structure that stores partitions for a worker. Concrete stores decide
+ * where the partitions actually live; this class only fixes the operations
+ * and provides {@link #isEmpty()} and {@link #getPartitions()} on top of
+ * them.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex data
+ * @param <E> Edge data
+ * @param <M> Message data
+ */
+public abstract class PartitionStore<I extends WritableComparable,
+    V extends Writable, E extends Writable, M extends Writable> {
+
+  /**
+   * Add a new partition to the store.
+   *
+   * @param partition Partition
+   */
+  public abstract void addPartition(Partition<I, V, E, M> partition);
+
+  /**
+   * Add some vertices to a (possibly existing) partition.
+   *
+   * @param partitionId Id of the destination partition
+   * @param vertices Vertices
+   */
+  public abstract void addPartitionVertices(
+      Integer partitionId, Collection<Vertex<I, V, E, M>> vertices);
+
+  /**
+   * Get a partition.
+   *
+   * @param partitionId Partition id
+   * @return The requested partition
+   */
+  public abstract Partition<I, V, E, M> getPartition(Integer partitionId);
+
+  /**
+   * Remove a partition and return it.
+   *
+   * @param partitionId Partition id
+   * @return The removed partition
+   */
+  public abstract Partition<I, V, E, M> removePartition(Integer partitionId);
+
+  /**
+   * Just delete a partition
+   * (more efficient than {@link #removePartition(Integer)} if the
+   * partition is out of core).
+   *
+   * @param partitionId Partition id
+   */
+  public abstract void deletePartition(Integer partitionId);
+
+  /**
+   * Whether a specific partition is present in the store.
+   *
+   * @param partitionId Partition id
+   * @return True iff the partition is present
+   */
+  public abstract boolean hasPartition(Integer partitionId);
+
+  /**
+   * Return the ids of all the stored partitions as an Iterable.
+   *
+   * @return The partition ids
+   */
+  public abstract Iterable<Integer> getPartitionIds();
+
+  /**
+   * Return the number of stored partitions.
+   *
+   * @return The number of partitions
+   */
+  public abstract int getNumPartitions();
+
+  /**
+   * Whether the partition store is empty, as reported by
+   * {@link #getNumPartitions()}.
+   *
+   * @return True iff there are no partitions in the store
+   */
+  public boolean isEmpty() {
+    return getNumPartitions() == 0;
+  }
+
+  /**
+   * Return all the stored partitions as an Iterable. The result is a lazy
+   * view over {@link #getPartitionIds()}: {@link #getPartition(Integer)} is
+   * called for each id as the view is iterated, and again on every new
+   * iteration.
+   *
+   * @return The partition objects
+   */
+  public Iterable<Partition<I, V, E, M>> getPartitions() {
+    return Iterables.transform(getPartitionIds(),
+        new Function<Integer, Partition<I, V, E, M>>() {
+          @Override
+          public Partition<I, V, E, M> apply(Integer partitionId) {
+            return getPartition(partitionId);
+          }
+        });
+  }
+}

Added: giraph/trunk/src/main/java/org/apache/giraph/graph/partition/SimplePartitionStore.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/SimplePartitionStore.java?rev=1377179&view=auto
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/partition/SimplePartitionStore.java (added)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/partition/SimplePartitionStore.java Sat Aug 25 01:52:52 2012
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph.partition;
+
+import org.apache.giraph.graph.Vertex;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import com.google.common.collect.Maps;
+
+import java.util.Collection;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * A simple in-memory partition store.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex data
+ * @param <E> Edge data
+ * @param <M> Message data
+ */
+public class SimplePartitionStore<I extends WritableComparable,
+    V extends Writable, E extends Writable, M extends Writable>
+    extends PartitionStore<I, V, E, M> {
+  /** Map of stored partitions. */
+  private final ConcurrentMap<Integer, Partition<I, V, E, M>> partitions =
+      Maps.newConcurrentMap();
+  /** Configuration. */
+  private final Configuration conf;
+
+  /**
+   * Constructor.
+   *
+   * @param conf Configuration
+   */
+  public SimplePartitionStore(Configuration conf) {
+    this.conf = conf;
+  }
+
+  @Override
+  public void addPartition(Partition<I, V, E, M> partition) {
+    if (partitions.putIfAbsent(partition.getId(), partition) != null) {
+      throw new IllegalStateException("addPartition: partition " +
+          partition.getId() + " already exists");
+    }
+  }
+
+  @Override
+  public void addPartitionVertices(Integer partitionId,
+                                   Collection<Vertex<I, V, E, M>> vertices) {
+    Partition<I, V, E, M> partition = partitions.get(partitionId);
+    if (partition == null) {
+      Partition<I, V, E, M> newPartition = new Partition<I, V, E, M>(conf,
+          partitionId);
+      partition = partitions.putIfAbsent(partitionId, newPartition);
+      if (partition == null) {
+        partition = newPartition;
+      }
+    }
+    partition.putVertices(vertices);
+  }
+
+  @Override
+  public Partition<I, V, E, M> getPartition(Integer partitionId) {
+    return partitions.get(partitionId);
+  }
+
+  @Override
+  public Partition<I, V, E, M> removePartition(Integer partitionId) {
+    return partitions.remove(partitionId);
+  }
+
+  @Override
+  public void deletePartition(Integer partitionId) {
+    partitions.remove(partitionId);
+  }
+
+  @Override
+  public boolean hasPartition(Integer partitionId) {
+    return partitions.containsKey(partitionId);
+  }
+
+  @Override
+  public Iterable<Integer> getPartitionIds() {
+    return partitions.keySet();
+  }
+
+  @Override
+  public int getNumPartitions() {
+    return partitions.size();
+  }
+
+
+}

Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/partition/WorkerGraphPartitioner.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/WorkerGraphPartitioner.java?rev=1377179&r1=1377178&r2=1377179&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/partition/WorkerGraphPartitioner.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/partition/WorkerGraphPartitioner.java Sat Aug 25 01:52:52 2012
@@ -18,13 +18,12 @@
 
 package org.apache.giraph.graph.partition;
 
-import java.util.Collection;
-import java.util.Map;
-
 import org.apache.giraph.graph.WorkerInfo;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
+import java.util.Collection;
+
 /**
  * Stores the {@link PartitionOwner} objects from the master and provides the
  * mapping of vertex to {@link PartitionOwner}. Also generates the partition
@@ -62,13 +61,13 @@ public interface WorkerGraphPartitioner<
    *
    * @param workerPartitionStats Stats generated by the infrastructure during
    *        the superstep
-   * @param partitionMap Map of all the partitions owned by this worker
+   * @param partitionStore Partition store for this worker
    *        (could be used to provide more useful stat information)
    * @return Final partition stats
    */
   Collection<PartitionStats> finalizePartitionStats(
       Collection<PartitionStats> workerPartitionStats,
-      Map<Integer, Partition<I, V, E, M>> partitionMap);
+      PartitionStore<I, V, E, M> partitionStore);
 
   /**
    * Get the partitions owners and update locally.  Returns the partitions
@@ -77,14 +76,14 @@ public interface WorkerGraphPartitioner<
    * @param myWorkerInfo Worker info.
    * @param masterSetPartitionOwners Master set partition owners, received
    *        prior to beginning the superstep
-   * @param partitionMap Map of all the partitions owned by this worker
+   * @param partitionStore Partition store for this worker
    *        (can be used to fill the return map of partitions to send)
    * @return Information for the partition exchange.
    */
   PartitionExchange updatePartitionOwners(
       WorkerInfo myWorkerInfo,
       Collection<? extends PartitionOwner> masterSetPartitionOwners,
-      Map<Integer, Partition<I, V, E, M>> partitionMap);
+      PartitionStore<I, V, E, M> partitionStore);
 
   /**
    * Get a collection of the {@link PartitionOwner} objects.

Copied: giraph/trunk/src/test/java/org/apache/giraph/TestMutateGraph.java (from r1375824, giraph/trunk/src/test/java/org/apache/giraph/TestMutateGraphVertex.java)
URL: http://svn.apache.org/viewvc/giraph/trunk/src/test/java/org/apache/giraph/TestMutateGraph.java?p2=giraph/trunk/src/test/java/org/apache/giraph/TestMutateGraph.java&p1=giraph/trunk/src/test/java/org/apache/giraph/TestMutateGraphVertex.java&r1=1375824&r2=1377179&rev=1377179&view=diff
==============================================================================
--- giraph/trunk/src/test/java/org/apache/giraph/TestMutateGraphVertex.java (original)
+++ giraph/trunk/src/test/java/org/apache/giraph/TestMutateGraph.java Sat Aug 25 01:52:52 2012
@@ -18,24 +18,23 @@
 
 package org.apache.giraph;
 
-import static org.junit.Assert.assertTrue;
-
-import java.io.IOException;
-
 import org.apache.giraph.examples.SimpleMutateGraphVertex;
 import org.apache.giraph.examples.SimplePageRankVertex.SimplePageRankVertexInputFormat;
 import org.apache.giraph.examples.SimplePageRankVertex.SimplePageRankVertexOutputFormat;
 import org.apache.giraph.graph.GiraphJob;
-import org.apache.hadoop.fs.Path;
 import org.junit.Test;
 
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+
 /**
  * Unit test for graph mutation
  */
-public class TestMutateGraphVertex extends BspCase {
+public class TestMutateGraph extends BspCase {
 
-    public TestMutateGraphVertex() {
-        super(TestMutateGraphVertex.class.getName());
+    public TestMutateGraph() {
+        super(TestMutateGraph.class.getName());
     }
 
     /**

Modified: giraph/trunk/src/test/java/org/apache/giraph/comm/ConnectionTest.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/test/java/org/apache/giraph/comm/ConnectionTest.java?rev=1377179&r1=1377178&r2=1377179&view=diff
==============================================================================
--- giraph/trunk/src/test/java/org/apache/giraph/comm/ConnectionTest.java (original)
+++ giraph/trunk/src/test/java/org/apache/giraph/comm/ConnectionTest.java Sat Aug 25 01:52:52 2012
@@ -18,26 +18,22 @@
 
 package org.apache.giraph.comm;
 
-import com.google.common.collect.Sets;
-import java.util.Set;
 import org.apache.giraph.comm.messages.SimpleMessageStore;
-import org.apache.giraph.graph.GiraphJob;
 import org.apache.giraph.utils.MockUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.mapreduce.Mapper.Context;
-import org.jboss.netty.channel.socket.DefaultSocketChannelConfig;
 import org.junit.Test;
 
-import static org.junit.Assert.assertEquals;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
+import com.google.common.collect.Sets;
+
 import java.io.IOException;
 import java.net.InetSocketAddress;
-import java.util.ArrayList;
 import java.util.Collections;
-import java.util.List;
+import java.util.Set;
 
 /**
  * Test the netty connections
@@ -57,7 +53,7 @@ public class ConnectionTest {
 
     ServerData<IntWritable, IntWritable, IntWritable, IntWritable> serverData =
         new ServerData<IntWritable, IntWritable, IntWritable, IntWritable>
-            (SimpleMessageStore.newFactory(
+            (conf, SimpleMessageStore.newFactory(
                 MockUtils.mockServiceGetVertexPartitionOwner(1), conf));
     NettyServer<IntWritable, IntWritable, IntWritable, IntWritable> server =
         new NettyServer<IntWritable, IntWritable, IntWritable, IntWritable>(
@@ -87,7 +83,7 @@ public class ConnectionTest {
 
     ServerData<IntWritable, IntWritable, IntWritable, IntWritable> serverData =
         new ServerData<IntWritable, IntWritable, IntWritable, IntWritable>
-            (SimpleMessageStore.newFactory(
+            (conf, SimpleMessageStore.newFactory(
                 MockUtils.mockServiceGetVertexPartitionOwner(1), conf));
 
     NettyServer<IntWritable, IntWritable, IntWritable, IntWritable> server1 =
@@ -132,7 +128,7 @@ public class ConnectionTest {
 
     ServerData<IntWritable, IntWritable, IntWritable, IntWritable> serverData =
         new ServerData<IntWritable, IntWritable, IntWritable, IntWritable>
-            (SimpleMessageStore.newFactory(
+            (conf, SimpleMessageStore.newFactory(
                 MockUtils.mockServiceGetVertexPartitionOwner(1), conf));
     NettyServer<IntWritable, IntWritable, IntWritable, IntWritable> server =
         new NettyServer<IntWritable, IntWritable, IntWritable, IntWritable>(

Modified: giraph/trunk/src/test/java/org/apache/giraph/comm/RequestFailureTest.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/test/java/org/apache/giraph/comm/RequestFailureTest.java?rev=1377179&r1=1377178&r2=1377179&view=diff
==============================================================================
--- giraph/trunk/src/test/java/org/apache/giraph/comm/RequestFailureTest.java (original)
+++ giraph/trunk/src/test/java/org/apache/giraph/comm/RequestFailureTest.java Sat Aug 25 01:52:52 2012
@@ -18,13 +18,6 @@
 
 package org.apache.giraph.comm;
 
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Map;
-
 import org.apache.giraph.comm.messages.SimpleMessageStore;
 import org.apache.giraph.graph.EdgeListVertex;
 import org.apache.giraph.graph.GiraphJob;
@@ -42,6 +35,14 @@ import static org.junit.Assert.assertEqu
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+
 /**
  * Test all the netty failure scenarios
  */
@@ -136,7 +137,7 @@ public class RequestFailureTest {
     // Start the service
     serverData =
         new ServerData<IntWritable, IntWritable, IntWritable, IntWritable>
-            (SimpleMessageStore.newFactory(
+            (conf, SimpleMessageStore.newFactory(
                 MockUtils.mockServiceGetVertexPartitionOwner(1), conf));
     server =
         new NettyServer<IntWritable, IntWritable, IntWritable, IntWritable>(
@@ -176,7 +177,7 @@ public class RequestFailureTest {
     // Start the service
     serverData =
         new ServerData<IntWritable, IntWritable, IntWritable, IntWritable>
-            (SimpleMessageStore.newFactory(
+            (conf, SimpleMessageStore.newFactory(
                 MockUtils.mockServiceGetVertexPartitionOwner(1), conf));
     server =
         new NettyServer<IntWritable, IntWritable, IntWritable, IntWritable>(
@@ -216,7 +217,7 @@ public class RequestFailureTest {
     // Start the service
     serverData =
         new ServerData<IntWritable, IntWritable, IntWritable, IntWritable>
-            (SimpleMessageStore.newFactory(
+            (conf, SimpleMessageStore.newFactory(
                 MockUtils.mockServiceGetVertexPartitionOwner(1), conf));
     server =
         new NettyServer<IntWritable, IntWritable, IntWritable, IntWritable>(

Modified: giraph/trunk/src/test/java/org/apache/giraph/comm/RequestTest.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/test/java/org/apache/giraph/comm/RequestTest.java?rev=1377179&r1=1377178&r2=1377179&view=diff
==============================================================================
--- giraph/trunk/src/test/java/org/apache/giraph/comm/RequestTest.java (original)
+++ giraph/trunk/src/test/java/org/apache/giraph/comm/RequestTest.java Sat Aug 25 01:52:52 2012
@@ -24,6 +24,7 @@ import org.apache.giraph.graph.EdgeListV
 import org.apache.giraph.graph.GiraphJob;
 import org.apache.giraph.graph.Vertex;
 import org.apache.giraph.graph.VertexMutations;
+import org.apache.giraph.graph.partition.PartitionStore;
 import org.apache.giraph.utils.MockUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.IntWritable;
@@ -96,7 +97,7 @@ public class RequestTest {
     // Start the service
     serverData =
         new ServerData<IntWritable, IntWritable, IntWritable, IntWritable>
-            (SimpleMessageStore.newFactory(
+            (conf, SimpleMessageStore.newFactory(
                 MockUtils.mockServiceGetVertexPartitionOwner(1), conf));
     server =
         new NettyServer<IntWritable, IntWritable, IntWritable, IntWritable>(
@@ -135,19 +136,17 @@ public class RequestTest {
     server.stop();
 
     // Check the output
-    Map<Integer, Collection<Vertex<IntWritable, IntWritable,
-    IntWritable, IntWritable>>> partitionVertexMap =
-        serverData.getPartitionVertexMap();
-    synchronized (partitionVertexMap) {
-      assertTrue(partitionVertexMap.containsKey(partitionId));
-      int total = 0;
-      for (Vertex<IntWritable, IntWritable,
-          IntWritable, IntWritable> vertex :
-            (partitionVertexMap.get(partitionId))) {
-        total += vertex.getId().get();
-      }
-      assertEquals(total, 45);
+    PartitionStore<IntWritable, IntWritable,
+        IntWritable, IntWritable> partitionStore =
+        serverData.getPartitionStore();
+    assertTrue(partitionStore.hasPartition(partitionId));
+    int total = 0;
+    for (Vertex<IntWritable, IntWritable,
+        IntWritable, IntWritable> vertex :
+        partitionStore.getPartition(partitionId).getVertices()) {
+      total += vertex.getId().get();
     }
+    assertEquals(total, 45);
   }
 
   @Test

Added: giraph/trunk/src/test/java/org/apache/giraph/graph/partition/TestPartitionStores.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/test/java/org/apache/giraph/graph/partition/TestPartitionStores.java?rev=1377179&view=auto
==============================================================================
--- giraph/trunk/src/test/java/org/apache/giraph/graph/partition/TestPartitionStores.java (added)
+++ giraph/trunk/src/test/java/org/apache/giraph/graph/partition/TestPartitionStores.java Sat Aug 25 01:52:52 2012
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph.partition;
+
+import org.apache.giraph.graph.GiraphJob;
+import org.apache.giraph.graph.IntIntNullIntVertex;
+import org.apache.giraph.graph.Vertex;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+
+import java.io.IOException;
+
+/**
+ * Test case for partition stores. The same read/write scenario is run
+ * against every {@link PartitionStore} implementation under test.
+ */
+public class TestPartitionStores {
+  /** Concrete vertex for the tests; compute() is intentionally a no-op. */
+  private static class MyVertex extends IntIntNullIntVertex {
+    @Override
+    public void compute(Iterable<IntWritable> messages) throws IOException {}
+  }
+
+  /**
+   * Create the configuration shared by all the tests.
+   *
+   * @return Configuration with the vertex, vertex id, vertex value, edge
+   *         value and message value classes set
+   */
+  private Configuration createConfiguration() {
+    Configuration conf = new Configuration();
+    conf.setClass(GiraphJob.VERTEX_CLASS, MyVertex.class,
+        Vertex.class);
+    conf.setClass(GiraphJob.VERTEX_ID_CLASS, IntWritable.class,
+        WritableComparable.class);
+    conf.setClass(GiraphJob.VERTEX_VALUE_CLASS, IntWritable.class,
+        Writable.class);
+    conf.setClass(GiraphJob.EDGE_VALUE_CLASS, NullWritable.class,
+        Writable.class);
+    conf.setClass(GiraphJob.MESSAGE_VALUE_CLASS, IntWritable.class,
+        Writable.class);
+    return conf;
+  }
+
+  /**
+   * Create a vertex whose id and value are both set to the given number.
+   * The two remaining initialize() arguments are passed as null.
+   *
+   * @param id Number used as both vertex id and vertex value
+   * @return Initialized vertex
+   */
+  private Vertex<IntWritable, IntWritable, NullWritable,
+      IntWritable> createVertex(int id) {
+    Vertex<IntWritable, IntWritable, NullWritable, IntWritable> vertex =
+        new MyVertex();
+    vertex.initialize(new IntWritable(id), new IntWritable(id), null, null);
+    return vertex;
+  }
+
+  /**
+   * Create a partition holding the given vertices.
+   *
+   * @param conf Configuration
+   * @param id Partition id
+   * @param vertices Vertices to put into the partition
+   * @return New partition
+   */
+  private Partition<IntWritable, IntWritable, NullWritable,
+      IntWritable> createPartition(Configuration conf, Integer id,
+                                   Vertex<IntWritable, IntWritable,
+                                       NullWritable, IntWritable>... vertices) {
+    Partition<IntWritable, IntWritable, NullWritable, IntWritable> partition =
+        new Partition<IntWritable, IntWritable, NullWritable,
+            IntWritable>(conf, id);
+    for (Vertex<IntWritable, IntWritable, NullWritable, IntWritable> v :
+        vertices) {
+      partition.putVertex(v);
+    }
+    return partition;
+  }
+
+  @Test
+  public void testSimplePartitionStore() {
+    Configuration conf = createConfiguration();
+
+    PartitionStore<IntWritable, IntWritable, NullWritable, IntWritable>
+        partitionStore = new SimplePartitionStore<IntWritable, IntWritable,
+        NullWritable, IntWritable>(conf);
+    testReadWrite(partitionStore, conf);
+  }
+
+  @Test
+  public void testDiskBackedPartitionStore() {
+    Configuration conf = createConfiguration();
+    conf.setBoolean(GiraphJob.USE_OUT_OF_CORE_GRAPH, true);
+
+    // Run the scenario twice, with the in-memory partition limit set to 1
+    // and then to 2; a fresh store is created for each run.
+    conf.setInt(GiraphJob.MAX_PARTITIONS_IN_MEMORY, 1);
+    PartitionStore<IntWritable, IntWritable, NullWritable, IntWritable>
+        partitionStore = new DiskBackedPartitionStore<IntWritable,
+                IntWritable, NullWritable, IntWritable>(conf);
+    testReadWrite(partitionStore, conf);
+
+    conf.setInt(GiraphJob.MAX_PARTITIONS_IN_MEMORY, 2);
+    partitionStore = new DiskBackedPartitionStore<IntWritable,
+            IntWritable, NullWritable, IntWritable>(conf);
+    testReadWrite(partitionStore, conf);
+  }
+
+  /**
+   * Fill the store with partitions 1 to 4, remove partition 3, and check
+   * what the store reports; then delete partition 2 and check the count.
+   *
+   * @param partitionStore Store under test, expected to start out empty
+   * @param conf Configuration used to create the partitions
+   */
+  public void testReadWrite(PartitionStore<IntWritable, IntWritable,
+      NullWritable, IntWritable> partitionStore, Configuration conf) {
+    // Partition 1 ends up with vertices 1, 2 and 6.
+    partitionStore.addPartition(
+        createPartition(conf, 1, createVertex(1), createVertex(2)));
+    // Partition 2 ends up with vertices 3 and 4.
+    partitionStore.addPartition(createPartition(conf, 2, createVertex(3)));
+    partitionStore.addPartitionVertices(2,
+        Lists.newArrayList(createVertex(4)));
+    partitionStore.addPartition(createPartition(conf, 3, createVertex(5)));
+    partitionStore.addPartitionVertices(1,
+        Lists.newArrayList(createVertex(6)));
+    // Partition 4 is never added explicitly: adding vertices to it must be
+    // enough for the store to report it afterwards.
+    partitionStore.addPartitionVertices(4,
+        Lists.newArrayList(createVertex(7)));
+
+    Partition<IntWritable, IntWritable, NullWritable, IntWritable> partition1 =
+        partitionStore.getPartition(1);
+    Partition<IntWritable, IntWritable, NullWritable, IntWritable> partition2 =
+        partitionStore.getPartition(2);
+    // Partition 3 is taken out of the store, so only 1, 2 and 4 remain.
+    Partition<IntWritable, IntWritable, NullWritable,
+        IntWritable> partition3 = partitionStore.removePartition(3);
+    Partition<IntWritable, IntWritable, NullWritable, IntWritable> partition4 =
+        partitionStore.getPartition(4);
+
+    assertEquals(3, partitionStore.getNumPartitions());
+    assertEquals(3, Iterables.size(partitionStore.getPartitionIds()));
+    assertEquals(3, Iterables.size(partitionStore.getPartitions()));
+    assertTrue(partitionStore.hasPartition(1));
+    assertTrue(partitionStore.hasPartition(2));
+    assertFalse(partitionStore.hasPartition(3));
+    assertTrue(partitionStore.hasPartition(4));
+    assertEquals(3, partition1.getVertices().size());
+    assertEquals(2, partition2.getVertices().size());
+    assertEquals(1, partition3.getVertices().size());
+    assertEquals(1, partition4.getVertices().size());
+
+    partitionStore.deletePartition(2);
+
+    assertEquals(2, partitionStore.getNumPartitions());
+  }
+}