Posted to commits@giraph.apache.org by cl...@apache.org on 2013/02/05 13:21:11 UTC

git commit: GIRAPH-461: Convert static assignment of in-memory partitions with LRU cache

Updated Branches:
  refs/heads/trunk af1c39b43 -> 62c12fa0b


GIRAPH-461: Convert static assignment of in-memory partitions with LRU cache


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/62c12fa0
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/62c12fa0
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/62c12fa0

Branch: refs/heads/trunk
Commit: 62c12fa0b899ee76962d6e3fa05298e2a23d5e68
Parents: af1c39b
Author: Claudio Martella <cl...@apache.org>
Authored: Tue Feb 5 13:02:27 2013 +0100
Committer: Claudio Martella <cl...@apache.org>
Committed: Tue Feb 5 13:02:27 2013 +0100

----------------------------------------------------------------------
 CHANGELOG                                          |    2 +
 .../giraph/bsp/CentralizedServiceWorker.java       |   19 -
 .../giraph/comm/netty/NettyWorkerServer.java       |    2 +
 .../org/apache/giraph/graph/ComputeCallable.java   |    2 +
 .../giraph/partition/DiskBackedPartitionStore.java |  872 +++++++++++----
 .../apache/giraph/partition/PartitionStore.java    |   28 +-
 .../giraph/partition/SimplePartitionStore.java     |    3 +-
 .../org/apache/giraph/worker/BspServiceWorker.java |   35 +-
 .../java/org/apache/giraph/comm/RequestTest.java   |    7 +-
 .../giraph/partition/TestPartitionStores.java      |   17 +-
 10 files changed, 708 insertions(+), 279 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/62c12fa0/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index aec8fdf..2d25746 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,8 @@
 Giraph Change Log
 
 Release 0.2.0 - unreleased
+  GIRAPH-461: Convert static assignment of in-memory partitions with LRU cache (claudio)
+
   GIRAPH-494: Make Edge an interface (nitay)
 
   GIRAPH-492: Saving vertices has no status report, making it hard to 

http://git-wip-us.apache.org/repos/asf/giraph/blob/62c12fa0/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java b/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java
index 30d4462..71f8f72 100644
--- a/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java
+++ b/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java
@@ -34,8 +34,6 @@ import org.apache.giraph.partition.PartitionStore;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
-import org.apache.giraph.vertex.Vertex;
-import org.apache.giraph.partition.Partition;
 import org.apache.giraph.partition.PartitionOwner;
 import org.apache.giraph.partition.PartitionStats;
 import org.apache.giraph.worker.WorkerInfo;
@@ -135,15 +133,6 @@ public interface CentralizedServiceWorker<I extends WritableComparable,
       List<PartitionStats> partitionStatsList);
 
   /**
-   * Get the partition that a vertex id would belong to.
-   *
-   * @param vertexId Id of the vertex that is used to find the correct
-   *        partition.
-   * @return Correct partition if exists on this worker, null otherwise.
-   */
-  Partition<I, V, E, M> getPartition(I vertexId);
-
-  /**
    * Get the partition id that a vertex id would belong to.
    *
    * @param vertexId Vertex id
@@ -176,14 +165,6 @@ public interface CentralizedServiceWorker<I extends WritableComparable,
   Iterable<? extends PartitionOwner> getPartitionOwners();
 
   /**
-   * Look up a vertex on a worker given its vertex index.
-   *
-   * @param vertexId Vertex index to look for
-   * @return Vertex if it exists on this worker.
-   */
-  Vertex<I, V, E, M> getVertex(I vertexId);
-
-  /**
    * If desired by the user, vertex partitions are redistributed among
    * workers according to the chosen WorkerGraphPartitioner.
    *

http://git-wip-us.apache.org/repos/asf/giraph/blob/62c12fa0/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java
index e2866fd..1b7cc54 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java
@@ -184,6 +184,7 @@ public class NettyWorkerServer<I extends WritableComparable,
             }
           }
         }
+        service.getPartitionStore().putPartition(partition);
       }
     }
     // Resolve all graph mutations
@@ -226,6 +227,7 @@ public class NettyWorkerServer<I extends WritableComparable,
           partition.removeVertex(originalVertex.getId());
         }
       }
+      service.getPartitionStore().putPartition(partition);
     }
     if (!serverData.getVertexMutations().isEmpty()) {
       throw new IllegalStateException("resolveMutations: Illegally " +

http://git-wip-us.apache.org/repos/asf/giraph/blob/62c12fa0/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java b/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
index 042fd47..a87561d 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
@@ -157,6 +157,8 @@ public class ComputeCallable<I extends WritableComparable, V extends Writable,
       } catch (IOException e) {
         throw new IllegalStateException("call: Caught unexpected IOException," +
             " failing.", e);
+      } finally {
+        serviceWorker.getPartitionStore().putPartition(partition);
       }
     }
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/62c12fa0/giraph-core/src/main/java/org/apache/giraph/partition/DiskBackedPartitionStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/DiskBackedPartitionStore.java b/giraph-core/src/main/java/org/apache/giraph/partition/DiskBackedPartitionStore.java
index 09e5d75..844a229 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/DiskBackedPartitionStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/DiskBackedPartitionStore.java
@@ -18,69 +18,106 @@
 
 package org.apache.giraph.partition;
 
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.AbstractExecutorService;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
 import org.apache.giraph.conf.GiraphConstants;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.vertex.Vertex;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Mapper.Context;
 import org.apache.log4j.Logger;
 
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Maps;
-
-import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
+import com.google.common.collect.Sets;
 
 /**
- * A partition store that can possibly spill to disk.
+ * Disk-backed PartitionStore. Partitions are stored in memory on an LRU basis.
+ * Thread-safe, but expects the caller to synchronize between deletes, adds,
+ * puts and gets.
  *
  * @param <I> Vertex id
  * @param <V> Vertex data
  * @param <E> Edge data
  * @param <M> Message data
  */
+@SuppressWarnings("rawtypes")
 public class DiskBackedPartitionStore<I extends WritableComparable,
     V extends Writable, E extends Writable, M extends Writable>
     extends PartitionStore<I, V, E, M> {
   /** Class logger. */
   private static final Logger LOG =
       Logger.getLogger(DiskBackedPartitionStore.class);
-  /** Map of partitions kept in memory. */
-  private final ConcurrentMap<Integer, Partition<I, V, E, M>>
-  inMemoryPartitions = new ConcurrentHashMap<Integer, Partition<I, V, E, M>>();
-  /** Maximum number of partitions to keep in memory. */
-  private int maxInMemoryPartitions;
-  /** Map of partitions kept out-of-core. The values are partition sizes. */
-  private final ConcurrentMap<Integer, Integer> onDiskPartitions =
-      Maps.newConcurrentMap();
-  /** Directory on the local file system for storing out-of-core partitions. */
-  private final String basePath;
-  /** Configuration. */
+  /** States the partition can be found in */
+  private enum State { ACTIVE, INACTIVE, LOADING, OFFLOADING, ONDISK };
+  /** Global lock for the whole partition store */
+  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+  /**
+   * Global write lock. Must be held to modify class state for reads and writes.
+   * Conditions are bound to this lock.
+   */
+  private final Lock wLock = lock.writeLock();
+  /** The ids of the partitions contained in the store */
+  private final Set<Integer> partitionIds = Sets.newHashSet();
+  /** Partitions' states store */
+  private final Map<Integer, State> states = Maps.newHashMap();
+  /** Current active partitions, which have not been put back yet */
+  private final Map<Integer, Partition<I, V, E, M>> active = Maps.newHashMap();
+  /** Inactive partitions to re-activate or spill to disk to make space */
+  private final Map<Integer, Partition<I, V, E, M>> inactive =
+      Maps.newLinkedHashMap();
+  /** Ids of partitions stored on disk and number of vertices contained */
+  private final Map<Integer, Integer> onDisk = Maps.newHashMap();
+  /** Per-partition user counters (meaningful only for active partitions) */
+  private final Map<Integer, Integer> counters = Maps.newHashMap();
+  /** Conditions used to wait for and signal a partition's change of state */
+  private final Map<Integer, Condition> pending = Maps.newHashMap();
+  /**
+   * Used to signal threads waiting to load partitions, either when new
+   * inactive partitions become available or when free slots open up.
+   */
+  private final Condition notEmpty = wLock.newCondition();
+  /** Executor for user requests. Runs tasks in the caller's thread */
+  private final ExecutorService pool = new DirectExecutorService();
+  /** Giraph configuration */
   private final ImmutableClassesGiraphConfiguration<I, V, E, M> conf;
-  /** Slot for loading out-of-core partitions. */
-  private Partition<I, V, E, M> loadedPartition;
-  /** Locks for accessing and modifying partitions. */
-  private final ConcurrentMap<Integer, Lock> partitionLocks =
-      Maps.newConcurrentMap();
-  /** Context used to report progress */
-  private final Mapper<?, ?, ?, ?>.Context context;
+  /** Mapper context */
+  private final Context context;
+  /** Base path where the partition files are written to */
+  private final String basePath;
+  /** Maximum number of slots */
+  private final int maxInMemoryPartitions;
+  /** Number of slots used */
+  private int inMemoryPartitions;
 
   /**
-   * Constructor.
+   * Constructor
    *
    * @param conf Configuration
-   * @param context Mapper context
+   * @param context Context
    */
   public DiskBackedPartitionStore(
       ImmutableClassesGiraphConfiguration<I, V, E, M> conf,
@@ -96,76 +133,233 @@ public class DiskBackedPartitionStore<I extends WritableComparable,
             GiraphConstants.PARTITIONS_DIRECTORY_DEFAULT);
   }
 
-  /**
-   * Get the path to the file where a partition is stored.
-   *
-   * @param partitionId The partition
-   * @return The path to the given partition
-   */
-  private String getPartitionPath(Integer partitionId) {
-    return basePath + "/partition-" + partitionId;
+  @Override
+  public Iterable<Integer> getPartitionIds() {
+    try {
+      return pool.submit(new Callable<Iterable<Integer>>() {
+
+        @Override
+        public Iterable<Integer> call() throws Exception {
+          wLock.lock();
+          try {
+            return Iterables.unmodifiableIterable(partitionIds);
+          } finally {
+            wLock.unlock();
+          }
+        }
+      }).get();
+    } catch (InterruptedException e) {
+      throw new IllegalStateException(
+          "getPartitionIds: cannot retrieve partition ids", e);
+    } catch (ExecutionException e) {
+      throw new IllegalStateException(
+          "getPartitionIds: cannot retrieve partition ids", e);
+    }
   }
 
-  /**
-   * Create a new lock for a partition, lock it, and return it. If already
-   * existing, return null.
-   *
-   * @param partitionId Partition id
-   * @return A newly created lock, or null if already present
-   */
-  private Lock createLock(Integer partitionId) {
-    Lock lock = new ReentrantLock(true);
-    lock.lock();
-    if (partitionLocks.putIfAbsent(partitionId, lock) != null) {
-      return null;
+  @Override
+  public boolean hasPartition(final Integer id) {
+    try {
+      return pool.submit(new Callable<Boolean>() {
+
+        @Override
+        public Boolean call() throws Exception {
+          wLock.lock();
+          try {
+            return partitionIds.contains(id);
+          } finally {
+            wLock.unlock();
+          }
+        }
+      }).get();
+    } catch (InterruptedException e) {
+      throw new IllegalStateException(
+          "hasPartition: cannot check partition", e);
+    } catch (ExecutionException e) {
+      throw new IllegalStateException(
+          "hasPartition: cannot check partition", e);
+    }
+  }
+
+  @Override
+  public int getNumPartitions() {
+    try {
+      return pool.submit(new Callable<Integer>() {
+
+        @Override
+        public Integer call() throws Exception {
+          wLock.lock();
+          try {
+            return partitionIds.size();
+          } finally {
+            wLock.unlock();
+          }
+        }
+      }).get();
+    } catch (InterruptedException e) {
+      throw new IllegalStateException(
+          "getNumPartitions: cannot retrieve partition ids", e);
+    } catch (ExecutionException e) {
+      throw new IllegalStateException(
+          "getNumPartitions: cannot retrieve partition ids", e);
+    }
+  }
+
+  @Override
+  public Partition<I, V, E, M> getPartition(Integer id) {
+    try {
+      return pool.submit(new GetPartition(id)).get();
+    } catch (InterruptedException e) {
+      throw new IllegalStateException(
+          "getPartition: cannot retrieve partition " + id, e);
+    } catch (ExecutionException e) {
+      throw new IllegalStateException(
+          "getPartition: cannot retrieve partition " + id, e);
+    }
+  }
+
+  @Override
+  public void putPartition(Partition<I, V, E, M> partition) {
+    Integer id = partition.getId();
+    try {
+      pool.submit(new PutPartition(id, partition)).get();
+    } catch (InterruptedException e) {
+      throw new IllegalStateException(
+          "putPartition: cannot put back partition " + id, e);
+    } catch (ExecutionException e) {
+      throw new IllegalStateException(
+          "putPartition: cannot put back partition " + id, e);
     }
-    return lock;
+  }
+
+  @Override
+  public void deletePartition(Integer id) {
+    try {
+      pool.submit(new DeletePartition(id)).get();
+    } catch (InterruptedException e) {
+      throw new IllegalStateException(
+          "deletePartition: cannot delete partition " + id, e);
+    } catch (ExecutionException e) {
+      throw new IllegalStateException(
+          "deletePartition: cannot delete partition " + id, e);
+    }
+  }
+
+  @Override
+  public Partition<I, V, E, M> removePartition(Integer id) {
+    Partition<I, V, E, M> partition = getPartition(id);
+    // we put it back, so the partition can turn INACTIVE and be deleted.
+    putPartition(partition);
+    deletePartition(id);
+    return partition;
+  }
+
+  @Override
+  public void addPartition(Partition<I, V, E, M> partition) {
+    Integer id = partition.getId();
+    try {
+      pool.submit(new AddPartition(partition.getId(), partition)).get();
+    } catch (InterruptedException e) {
+      throw new IllegalStateException(
+          "addPartition: cannot add partition " + id, e);
+    } catch (ExecutionException e) {
+      throw new IllegalStateException(
+          "addPartition: cannot add partition " + id, e);
+    }
+  }
+
+  @Override
+  public void shutdown() {
+    try {
+      pool.shutdown();
+      try {
+        if (!pool.awaitTermination(120, TimeUnit.SECONDS)) {
+          pool.shutdownNow();
+        }
+      } catch (InterruptedException e) {
+        pool.shutdownNow();
+      }
+    } finally {
+      for (Integer id : onDisk.keySet()) {
+        deletePartitionFile(id);
+      }
+    }
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append(partitionIds.toString());
+    sb.append("\nActive\n");
+    for (Entry<Integer, Partition<I, V, E, M>> e : active.entrySet()) {
+      sb.append(e.getKey() + ":" + e.getValue() + "\n");
+    }
+    sb.append("Inactive\n");
+    for (Entry<Integer, Partition<I, V, E, M>> e : inactive.entrySet()) {
+      sb.append(e.getKey() + ":" + e.getValue() + "\n");
+    }
+    sb.append("OnDisk\n");
+    for (Entry<Integer, Integer> e : onDisk.entrySet()) {
+      sb.append(e.getKey() + ":" + e.getValue() + "\n");
+    }
+    sb.append("Counters\n");
+    for (Entry<Integer, Integer> e : counters.entrySet()) {
+      sb.append(e.getKey() + ":" + e.getValue() + "\n");
+    }
+    sb.append("Pending\n");
+    for (Entry<Integer, Condition> e : pending.entrySet()) {
+      sb.append(e.getKey() + "\n");
+    }
+    return sb.toString();
   }
 
   /**
-   * Get the lock for a partition id.
+   * Increment the number of active users for a partition. Caller should hold
+   * the global write lock.
    *
-   * @param partitionId Partition id
-   * @return The lock
+   * @param id The id of the counter to increment
+   * @return The new value
    */
-  private Lock getLock(Integer partitionId) {
-    return partitionLocks.get(partitionId);
+  private Integer incrementCounter(Integer id) {
+    Integer count = counters.get(id);
+    if (count == null) {
+      count = 0;
+    }
+    counters.put(id, ++count);
+    return count;
   }
 
   /**
-   * Write a partition to disk.
+   * Decrement the number of active users for a partition. Caller should hold
+   * the global write lock.
    *
-   * @param partition The partition object to write
-   * @throws java.io.IOException
+   * @param id The id of the counter to decrement
+   * @return The new value
    */
-  private void writePartition(Partition<I, V, E, M> partition)
-    throws IOException {
-    File file = new File(getPartitionPath(partition.getId()));
-    file.getParentFile().mkdirs();
-    file.createNewFile();
-    DataOutputStream outputStream = new DataOutputStream(
-        new BufferedOutputStream(new FileOutputStream(file)));
-    for (Vertex<I, V, E, M> vertex : partition) {
-      vertex.write(outputStream);
+  private Integer decrementCounter(Integer id) {
+    Integer count = counters.get(id);
+    if (count == null) {
+      throw new IllegalStateException("no counter for partition " + id);
     }
-    outputStream.close();
+    counters.put(id, --count);
+    return count;
   }
 
   /**
-   * Read a partition from disk.
+   * Load a partition from disk. It deletes the file after the load.
    *
-   * @param partitionId Id of the partition to read
-   * @return The partition object
+   * @param id The id of the partition to load
+   * @param numVertices The number of vertices contained on disk
+   * @return The partition
    * @throws IOException
    */
-  private Partition<I, V, E, M> readPartition(Integer partitionId)
+  private Partition<I, V, E, M> loadPartition(Integer id, int numVertices)
     throws IOException {
     Partition<I, V, E, M> partition =
-        conf.createPartition(partitionId, context);
-    File file = new File(getPartitionPath(partitionId));
+        conf.createPartition(id, context);
+    File file = new File(getPartitionPath(id));
     DataInputStream inputStream = new DataInputStream(
         new BufferedInputStream(new FileInputStream(file)));
-    int numVertices = onDiskPartitions.get(partitionId);
     for (int i = 0; i < numVertices; ++i) {
       Vertex<I, V, E, M> vertex = conf.createVertex();
       vertex.readFields(inputStream);
@@ -177,16 +371,22 @@ public class DiskBackedPartitionStore<I extends WritableComparable,
   }
 
   /**
-   * Append some vertices of another partition to an out-of-core partition.
+   * Write a partition to disk.
    *
-   * @param partition Partition to add
+   * @param partition The partition to offload
    * @throws IOException
    */
-  private void appendPartitionOutOfCore(Partition<I, V, E, M> partition)
+  private void offloadPartition(Partition<I, V, E, M> partition)
     throws IOException {
+    if (LOG.isInfoEnabled()) {
+      LOG.info("offloadPartition: writing partition " + partition.getId() +
+          " to disk.");
+    }
     File file = new File(getPartitionPath(partition.getId()));
+    file.getParentFile().mkdirs();
+    file.createNewFile();
     DataOutputStream outputStream = new DataOutputStream(
-        new BufferedOutputStream(new FileOutputStream(file, true)));
+        new BufferedOutputStream(new FileOutputStream(file)));
     for (Vertex<I, V, E, M> vertex : partition) {
       vertex.write(outputStream);
     }
@@ -194,171 +394,407 @@ public class DiskBackedPartitionStore<I extends WritableComparable,
   }
 
   /**
-   * Load an out-of-core partition in memory.
+   * Append a partition's vertices to the end of its file on disk. Expects the
+   * caller to hold the global lock.
    *
-   * @param partitionId Partition id
+   * @param partition The partition
+   * @throws IOException
    */
-  private void loadPartition(Integer partitionId) {
-    if (loadedPartition != null) {
-      if (loadedPartition.getId() == partitionId) {
-        return;
-      }
-      if (LOG.isInfoEnabled()) {
-        LOG.info("loadPartition: moving partition " + loadedPartition.getId() +
-            " out of core with size " + loadedPartition.getVertexCount());
-      }
-      try {
-        writePartition(loadedPartition);
-        onDiskPartitions.put(loadedPartition.getId(),
-            (int) loadedPartition.getVertexCount());
-        loadedPartition = null;
-      } catch (IOException e) {
-        throw new IllegalStateException("loadPartition: failed writing " +
-            "partition " + loadedPartition.getId() + " to disk", e);
-      }
-    }
-    if (LOG.isInfoEnabled()) {
-      LOG.info("loadPartition: loading partition " + partitionId +
-          " in memory");
-    }
-    try {
-      loadedPartition = readPartition(partitionId);
-    } catch (IOException e) {
-      throw new IllegalStateException("loadPartition: failed reading " +
-          "partition " + partitionId + " from disk");
+  private void addToOOCPartition(Partition<I, V, E, M> partition)
+    throws IOException {
+    Integer id = partition.getId();
+    Integer count = onDisk.get(id);
+    onDisk.put(id, count + (int) partition.getVertexCount());
+    File file = new File(getPartitionPath(id));
+    DataOutputStream outputStream = new DataOutputStream(
+        new BufferedOutputStream(new FileOutputStream(file, true)));
+    for (Vertex<I, V, E, M> vertex : partition) {
+      vertex.write(outputStream);
     }
+    outputStream.close();
   }
 
   /**
-   * Add a new partition without requiring a lock.
+   * Delete a partition file
    *
-   * @param partition Partition to be added
+   * @param id The id of the partition owning the file.
    */
-  private void addPartitionNoLock(Partition<I, V, E, M> partition) {
-    synchronized (inMemoryPartitions) {
-      if (inMemoryPartitions.size() + 1 < maxInMemoryPartitions) {
-        inMemoryPartitions.put(partition.getId(), partition);
+  public void deletePartitionFile(Integer id) {
+    File file = new File(getPartitionPath(id));
+    file.delete();
+  }
 
-        return;
-      }
+  /**
+   * Get the path to the file where a partition is stored.
+   *
+   * @param partitionId The partition
+   * @return The path to the given partition
+   */
+  private String getPartitionPath(Integer partitionId) {
+    return basePath + "/partition-" + partitionId;
+  }
+
+  /**
+   * Task that gets a partition from the store
+   */
+  private class GetPartition implements Callable<Partition<I, V, E, M>> {
+    /** Partition id */
+    private Integer id;
+
+    /**
+     * Constructor
+     *
+     * @param id Partition id
+     */
+    public GetPartition(Integer id) {
+      this.id = id;
     }
-    try {
-      writePartition(partition);
-      onDiskPartitions.put(partition.getId(),
-          (int) partition.getVertexCount());
-    } catch (IOException e) {
-      throw new IllegalStateException("addPartition: failed writing " +
-          "partition " + partition.getId() + "to disk");
+
+    /**
+     * Removes and returns the least recently used entry.
+     *
+     * @return The least recently used entry.
+     */
+    private Entry<Integer, Partition<I, V, E, M>> getLRUEntry() {
+      Iterator<Entry<Integer, Partition<I, V, E, M>>> i =
+          inactive.entrySet().iterator();
+      Entry<Integer, Partition<I, V, E, M>> lruEntry = i.next();
+      i.remove();
+      return lruEntry;
     }
-  }
 
-  @Override
-  public void addPartition(Partition<I, V, E, M> partition) {
-    if (inMemoryPartitions.containsKey(partition.getId())) {
-      Partition<I, V, E, M> existingPartition =
-          inMemoryPartitions.get(partition.getId());
-      existingPartition.addPartition(partition);
-    } else if (onDiskPartitions.containsKey(partition.getId())) {
-      Lock lock = getLock(partition.getId());
-      lock.lock();
-      if (loadedPartition != null && loadedPartition.getId() ==
-          partition.getId()) {
-        loadedPartition.addPartition(partition);
-      } else {
+    @Override
+    public Partition<I, V, E, M> call() throws Exception {
+      Partition<I, V, E, M> partition = null;
+
+      while (partition == null) {
+        wLock.lock();
         try {
-          appendPartitionOutOfCore(partition);
-          onDiskPartitions.put(partition.getId(),
-              onDiskPartitions.get(partition.getId()) +
-                  (int) partition.getVertexCount());
-        } catch (IOException e) {
-          throw new IllegalStateException("addPartition: failed " +
-              "writing vertices to partition " + partition.getId() + " on disk",
-              e);
+          State pState = states.get(id);
+          switch (pState) {
+          case ONDISK:
+            Entry<Integer, Partition<I, V, E, M>> lru = null;
+            states.put(id, State.LOADING);
+            int numVertices = onDisk.remove(id);
+            /*
+             * Wait until we have space in memory or inactive data for a switch
+             */
+            while (inMemoryPartitions >= maxInMemoryPartitions &&
+                inactive.size() == 0) {
+              notEmpty.await();
+            }
+            /*
+             * we have to make some space first
+             */
+            if (inMemoryPartitions >= maxInMemoryPartitions) {
+              lru = getLRUEntry();
+              states.put(lru.getKey(), State.OFFLOADING);
+              pending.get(lru.getKey()).signalAll();
+            } else { // there is space, just add it to the in-memory partitions
+              inMemoryPartitions++;
+            }
+            /*
+             * Do the I/O without contention; threads interested in these
+             * partitions will wait on the corresponding Condition.
+             */
+            wLock.unlock();
+            if (lru != null) {
+              offloadPartition(lru.getValue());
+            }
+            partition = loadPartition(id, numVertices);
+            wLock.lock();
+            /*
+             * update state and signal the pending threads
+             */
+            if (lru != null) {
+              states.put(lru.getKey(), State.ONDISK);
+              onDisk.put(lru.getKey(), (int) lru.getValue().getVertexCount());
+              pending.get(lru.getKey()).signalAll();
+            }
+            active.put(id, partition);
+            states.put(id, State.ACTIVE);
+            pending.get(id).signalAll();
+            incrementCounter(id);
+            break;
+          case INACTIVE:
+            partition = inactive.remove(id);
+            active.put(id, partition);
+            states.put(id, State.ACTIVE);
+            incrementCounter(id);
+            break;
+          case ACTIVE:
+            partition = active.get(id);
+            incrementCounter(id);
+            break;
+          case LOADING:
+            pending.get(id).await();
+            break;
+          case OFFLOADING:
+            pending.get(id).await();
+            break;
+          default:
+            throw new IllegalStateException(
+                "illegal state " + pState + " for partition " + id);
+          }
+        } finally {
+          wLock.unlock();
         }
       }
-      lock.unlock();
-    } else {
-      Lock lock = createLock(partition.getId());
-      if (lock != null) {
-        addPartitionNoLock(partition);
-        lock.unlock();
-      } else {
-        // Another thread is already creating the partition,
-        // so we make sure it's done before repeating the call.
-        lock = getLock(partition.getId());
-        lock.lock();
-        lock.unlock();
-        addPartition(partition);
-      }
+      return partition;
     }
   }
 
-  @Override
-  public Partition<I, V, E, M> getPartition(Integer partitionId) {
-    if (inMemoryPartitions.containsKey(partitionId)) {
-      return inMemoryPartitions.get(partitionId);
-    } else if (onDiskPartitions.containsKey(partitionId)) {
-      loadPartition(partitionId);
-      return loadedPartition;
-    } else {
-      throw new IllegalStateException("getPartition: partition " +
-          partitionId + " does not exist");
+  /**
+   * Task that puts a partition back to the store
+   */
+  private class PutPartition implements Callable<Void> {
+    /** Partition id */
+    private Integer id;
+
+    /**
+     * Constructor
+     *
+     * @param id The partition id
+     * @param partition The partition
+     */
+    public PutPartition(Integer id, Partition<I, V, E, M> partition) {
+      this.id = id;
     }
-  }
 
-  @Override
-  public Partition<I, V, E, M> removePartition(Integer partitionId) {
-    partitionLocks.remove(partitionId);
-    if (onDiskPartitions.containsKey(partitionId)) {
-      Partition<I, V, E, M> partition;
-      if (loadedPartition != null && loadedPartition.getId() == partitionId) {
-        partition = loadedPartition;
-        loadedPartition = null;
-      } else {
-        try {
-          partition = readPartition(partitionId);
-        } catch (IOException e) {
-          throw new IllegalStateException("removePartition: failed reading " +
-              "partition " + partitionId + " from disk", e);
+    @Override
+    public Void call() throws Exception {
+      wLock.lock();
+      try {
+        if (decrementCounter(id) == 0) {
+          inactive.put(id, active.remove(id));
+          states.put(id, State.INACTIVE);
+          pending.get(id).signalAll();
+          notEmpty.signal();
         }
+        return null;
+      } finally {
+        wLock.unlock();
       }
-      onDiskPartitions.remove(partitionId);
-      return partition;
-    } else {
-      return inMemoryPartitions.remove(partitionId);
     }
   }
 
-  @Override
-  public void deletePartition(Integer partitionId) {
-    partitionLocks.remove(partitionId);
-    if (inMemoryPartitions.containsKey(partitionId)) {
-      inMemoryPartitions.remove(partitionId);
-    } else {
-      if (loadedPartition != null && loadedPartition.getId() == partitionId) {
-        loadedPartition = null;
-      } else {
-        File file = new File(getPartitionPath(partitionId));
-        file.delete();
+  /**
+   * Task that adds a partition to the store
+   */
+  private class AddPartition implements Callable<Void> {
+    /** Partition id */
+    private Integer id;
+    /** Partition */
+    private Partition<I, V, E, M> partition;
+
+    /**
+     * Constructor
+     *
+     * @param id The partition id
+     * @param partition The partition
+     */
+    public AddPartition(Integer id, Partition<I, V, E, M> partition) {
+      this.id = id;
+      this.partition = partition;
+    }
+
+    @Override
+    public Void call() throws Exception {
+
+      wLock.lock();
+      try {
+        if (partitionIds.contains(id)) {
+          Partition<I, V, E, M> existing = null;
+          boolean isOOC = false;
+          boolean done  = false;
+          while (!done) {
+            State pState = states.get(id);
+            switch (pState) {
+            case ONDISK:
+              isOOC = true;
+              done  = true;
+              break;
+            /*
+             * just add data to the in-memory partitions,
+             * concurrency should be managed by the caller.
+             */
+            case INACTIVE:
+              existing = inactive.get(id);
+              done = true;
+              break;
+            case ACTIVE:
+              existing = active.get(id);
+              done = true;
+              break;
+            case LOADING:
+              pending.get(id).await();
+              break;
+            case OFFLOADING:
+              pending.get(id).await();
+              break;
+            default:
+              throw new IllegalStateException(
+                  "illegal state " + pState + " for partition " + id);
+            }
+          }
+          if (isOOC) {
+            addToOOCPartition(partition);
+          } else {
+            existing.addPartition(partition);
+          }
+        } else {
+          Condition newC = wLock.newCondition();
+          pending.put(id, newC);
+          partitionIds.add(id);
+          if (inMemoryPartitions < maxInMemoryPartitions) {
+            inMemoryPartitions++;
+            states.put(id, State.INACTIVE);
+            inactive.put(id, partition);
+            notEmpty.signal();
+          } else {
+            states.put(id, State.OFFLOADING);
+            onDisk.put(id, (int) partition.getVertexCount());
+            wLock.unlock();
+            offloadPartition(partition);
+            wLock.lock();
+            states.put(id, State.ONDISK);
+            newC.signalAll();
+          }
+        }
+        return null;
+      } finally {
+        wLock.unlock();
       }
-      onDiskPartitions.remove(partitionId);
     }
   }
 
-  @Override
-  public boolean hasPartition(Integer partitionId) {
-    return partitionLocks.containsKey(partitionId);
-  }
+  /**
+   * Task that deletes a partition from the store
+   */
+  private class DeletePartition implements Callable<Void> {
+    /** Partition id */
+    private Integer id;
 
-  @Override
-  public Iterable<Integer> getPartitionIds() {
-    return Iterables.concat(inMemoryPartitions.keySet(),
-        onDiskPartitions.keySet());
-  }
+    /**
+     * Constructor
+     *
+     * @param id The partition id
+     */
+    public DeletePartition(Integer id) {
+      this.id = id;
+    }
 
-  @Override
-  public int getNumPartitions() {
-    return partitionLocks.size();
+    @Override
+    public Void call() throws Exception {
+      boolean done = false;
+
+      wLock.lock();
+      try {
+        while (!done) {
+          State pState = states.get(id);
+          switch (pState) {
+          case ONDISK:
+            onDisk.remove(id);
+            deletePartitionFile(id);
+            done = true;
+            break;
+          case INACTIVE:
+            inactive.remove(id);
+            inMemoryPartitions--;
+            notEmpty.signal();
+            done = true;
+            break;
+          case ACTIVE:
+            pending.get(id).await();
+            break;
+          case LOADING:
+            pending.get(id).await();
+            break;
+          case OFFLOADING:
+            pending.get(id).await();
+            break;
+          default:
+            throw new IllegalStateException(
+                "illegal state " + pState + " for partition " + id);
+          }
+        }
+        partitionIds.remove(id);
+        states.remove(id);
+        counters.remove(id);
+        pending.remove(id).signalAll();
+        return null;
+      } finally {
+        wLock.unlock();
+      }
+    }
   }
 
+  /**
+   * Direct executor service that runs tasks in the calling thread.
+   */
+  private class DirectExecutorService extends AbstractExecutorService {
+    /** Executor state */
+    private volatile boolean shutdown = false;
+
+    /**
+     * Constructor
+     */
+    public DirectExecutorService() { }
+
+    /**
+     * Execute the task in the calling thread.
+     *
+     * @param task Task to execute
+     */
+    public void execute(Runnable task) {
+      task.run();
+    }
+
+    /**
+     * Shutdown the executor.
+     */
+    public void shutdown() {
+      this.shutdown = true;
+    }
+
+    /**
+     * Shutdown the executor and return the current queue (empty).
+     *
+     * @return The list of awaiting tasks
+     */
+    public List<Runnable> shutdownNow() {
+      this.shutdown = true;
+      return Collections.emptyList();
+    }
+
+    /**
+     * Return current shutdown state.
+     *
+     * @return Shutdown state
+     */
+    public boolean isShutdown() {
+      return shutdown;
+    }
+
+    /**
+     * Return current termination state.
+     *
+     * @return Termination state
+     */
+    public boolean isTerminated() {
+      return shutdown;
+    }
+
+    /**
+     * Do nothing and return shutdown state.
+     *
+     * @param timeout Timeout
+     * @param unit Time unit
+     * @return Shutdown state
+     */
+    public boolean awaitTermination(long timeout, TimeUnit unit)
+      throws InterruptedException {
+      return shutdown;
+    }
+  }
 }
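
The new store keeps inactive partitions in an insertion-ordered LinkedHashMap and,
once all in-memory slots are taken, evicts the entry at the head of that map (the
partition put back longest ago) to disk before loading the requested one. Below is
a minimal, self-contained sketch of that eviction bookkeeping; the class and field
names are illustrative only and not part of this patch.

import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

/** Sketch of the insertion-ordered LRU bookkeeping used by the store above. */
public class LruSketch {
  /** Inactive partitions, oldest first; mirrors the store's LinkedHashMap. */
  private final Map<Integer, String> inactive =
      new LinkedHashMap<Integer, String>();

  /** Remove and return the least recently used (oldest) entry. */
  Map.Entry<Integer, String> evictOldest() {
    Iterator<Map.Entry<Integer, String>> it = inactive.entrySet().iterator();
    Map.Entry<Integer, String> lru = it.next();
    it.remove();
    return lru;
  }

  public static void main(String[] args) {
    LruSketch sketch = new LruSketch();
    sketch.inactive.put(1, "partition-1");
    sketch.inactive.put(2, "partition-2");
    // Evicts partition 1, the entry that became inactive first.
    System.out.println(sketch.evictOldest().getKey());
  }
}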

http://git-wip-us.apache.org/repos/asf/giraph/blob/62c12fa0/giraph-core/src/main/java/org/apache/giraph/partition/PartitionStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/PartitionStore.java b/giraph-core/src/main/java/org/apache/giraph/partition/PartitionStore.java
index 3e8dda9..4206ce3 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/PartitionStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/PartitionStore.java
@@ -18,8 +18,6 @@
 
 package org.apache.giraph.partition;
 
-import com.google.common.base.Function;
-import com.google.common.collect.Iterables;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
@@ -43,7 +41,8 @@ public abstract class PartitionStore<I extends WritableComparable,
   public abstract void addPartition(Partition<I, V, E, M> partition);
 
   /**
-   * Get a partition.
+   * Get a partition. Note: the caller has to put it back to the store through
+   * {@link #putPartition(Partition)} after use.
    *
    * @param partitionId Partition id
    * @return The requested partition
@@ -51,6 +50,14 @@ public abstract class PartitionStore<I extends WritableComparable,
   public abstract Partition<I, V, E, M> getPartition(Integer partitionId);
 
   /**
+   * Put a partition back to the store. Use this method to put a partition
+   * back after it has been retrieved through {@link #getPartition(Integer)}.
+   *
+   * @param partition Partition
+   */
+  public abstract void putPartition(Partition<I, V, E, M> partition);
+
+  /**
    * Remove a partition and return it.
    *
    * @param partitionId Partition id
@@ -99,18 +106,7 @@ public abstract class PartitionStore<I extends WritableComparable,
   }
 
   /**
-   * Return all the stored partitions as an Iterable.  Note that this may force
-   * out-of-core partitions to be loaded into memory if using out-of-core.
-   *
-   * @return The partition objects
+   * Called at the end of the computation.
    */
-  public Iterable<Partition<I, V, E, M>> getPartitions() {
-    return Iterables.transform(getPartitionIds(),
-        new Function<Integer, Partition<I, V, E, M>>() {
-          @Override
-          public Partition<I, V, E, M> apply(Integer partitionId) {
-            return getPartition(partitionId);
-          }
-        });
-  }
+  public void shutdown() { }
 }
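
With getPartitions() gone, partition access becomes a checkout/return protocol:
every getPartition() call must be paired with a putPartition(), as the new call
sites in ComputeCallable, NettyWorkerServer and BspServiceWorker show, and
shutdown() is invoked once at the end of the computation to release any spilled
files. A minimal sketch of a caller under this contract follows; the helper class
and the concrete Writable type parameters are illustrative only.

import org.apache.giraph.partition.Partition;
import org.apache.giraph.partition.PartitionStore;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.LongWritable;

/** Hypothetical helper showing the get/put discipline required by the new API. */
public class PartitionStoreUsage {
  /** Sum the vertex counts of all partitions, returning each one after use. */
  public static long countVertices(
      PartitionStore<LongWritable, DoubleWritable,
          FloatWritable, DoubleWritable> store) {
    long vertices = 0;
    for (Integer partitionId : store.getPartitionIds()) {
      Partition<LongWritable, DoubleWritable, FloatWritable, DoubleWritable>
          partition = store.getPartition(partitionId);
      try {
        vertices += partition.getVertexCount();
      } finally {
        // Put the partition back so it can turn INACTIVE and be spilled if needed.
        store.putPartition(partition);
      }
    }
    return vertices;
  }
}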

http://git-wip-us.apache.org/repos/asf/giraph/blob/62c12fa0/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartitionStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartitionStore.java b/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartitionStore.java
index 7bd0bb1..74cc3a7 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartitionStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartitionStore.java
@@ -101,5 +101,6 @@ public class SimplePartitionStore<I extends WritableComparable,
     return partitions.size();
   }
 
-
+  @Override
+  public void putPartition(Partition<I, V, E, M> partition) { }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/62c12fa0/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
index f542344..a48c5ea 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
@@ -546,8 +546,9 @@ else[HADOOP_NON_SECURE]*/
     // if necessary
     List<PartitionStats> partitionStatsList =
         new ArrayList<PartitionStats>();
-    for (Partition<I, V, E, M> partition :
-        getPartitionStore().getPartitions()) {
+    for (Integer partitionId : getPartitionStore().getPartitionIds()) {
+      Partition<I, V, E, M> partition =
+          getPartitionStore().getPartition(partitionId);
       PartitionStats partitionStats =
           new PartitionStats(partition.getId(),
               partition.getVertexCount(),
@@ -555,6 +556,7 @@ else[HADOOP_NON_SECURE]*/
               partition.getEdgeCount(),
               0);
       partitionStatsList.add(partitionStats);
+      getPartitionStore().putPartition(partition);
     }
     workerGraphPartitioner.finalizePartitionStats(
         partitionStatsList, getPartitionStore());
@@ -894,8 +896,9 @@ else[HADOOP_NON_SECURE]*/
     long nextPrintMsecs = System.currentTimeMillis() + 15000;
     int partitionIndex = 0;
     int numPartitions = getPartitionStore().getNumPartitions();
-    for (Partition<I, V, E, M> partition :
-        getPartitionStore().getPartitions()) {
+    for (Integer partitionId : getPartitionStore().getPartitionIds()) {
+      Partition<I, V, E, M> partition =
+          getPartitionStore().getPartition(partitionId);
       for (Vertex<I, V, E, M> vertex : partition) {
         getContext().progress();
         vertexWriter.writeVertex(vertex);
@@ -914,6 +917,7 @@ else[HADOOP_NON_SECURE]*/
           nextPrintVertices = verticesWritten + 250000;
         }
       }
+      getPartitionStore().putPartition(partition);
       getContext().progress();
       ++partitionIndex;
     }
@@ -928,6 +932,7 @@ else[HADOOP_NON_SECURE]*/
     workerClient.closeConnections();
     setCachedSuperstep(getSuperstep() - 1);
     saveVertices(finishedSuperstepStats.getLocalVertexCount());
+    getPartitionStore().shutdown();
     // All worker processes should denote they are done by adding special
     // znode.  Once the number of znodes equals the number of partitions
     // for workers and masters, the master will clean up the ZooKeeper
@@ -1017,8 +1022,9 @@ else[HADOOP_NON_SECURE]*/
         getFs().create(verticesFilePath);
     ByteArrayOutputStream metadataByteStream = new ByteArrayOutputStream();
     DataOutput metadataOutput = new DataOutputStream(metadataByteStream);
-    for (Partition<I, V, E, M> partition :
-        getPartitionStore().getPartitions()) {
+    for (Integer partitionId : getPartitionStore().getPartitionIds()) {
+      Partition<I, V, E, M> partition =
+          getPartitionStore().getPartition(partitionId);
       long startPos = verticesOutputStream.getPos();
       partition.write(verticesOutputStream);
       // write messages
@@ -1037,6 +1043,7 @@ else[HADOOP_NON_SECURE]*/
             (verticesOutputStream.getPos() - startPos) +
             ", partition = " + partition.toString());
       }
+      getPartitionStore().putPartition(partition);
       getContext().progress();
     }
     // Metadata is buffered and written at the end since it's small and
@@ -1388,11 +1395,6 @@ else[HADOOP_NON_SECURE]*/
   }
 
   @Override
-  public Partition<I, V, E, M> getPartition(I vertexId) {
-    return getPartitionStore().getPartition(getPartitionId(vertexId));
-  }
-
-  @Override
   public Integer getPartitionId(I vertexId) {
     PartitionOwner partitionOwner = getVertexPartitionOwner(vertexId);
     return partitionOwner.getPartitionId();
@@ -1404,17 +1406,6 @@ else[HADOOP_NON_SECURE]*/
   }
 
   @Override
-  public Vertex<I, V, E, M> getVertex(I vertexId) {
-    PartitionOwner partitionOwner = getVertexPartitionOwner(vertexId);
-    if (getPartitionStore().hasPartition(partitionOwner.getPartitionId())) {
-      return getPartitionStore().getPartition(
-          partitionOwner.getPartitionId()).getVertex(vertexId);
-    } else {
-      return null;
-    }
-  }
-
-  @Override
   public ServerData<I, V, E, M> getServerData() {
     return workerServer.getServerData();
   }

http://git-wip-us.apache.org/repos/asf/giraph/blob/62c12fa0/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java b/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java
index 7187928..d779fe4 100644
--- a/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java
+++ b/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java
@@ -137,12 +137,15 @@ public class RequestTest {
         serverData.getPartitionStore();
     assertTrue(partitionStore.hasPartition(partitionId));
     int total = 0;
+    Partition<IntWritable, IntWritable, IntWritable, IntWritable> partition2 =
+        partitionStore.getPartition(partitionId);
     for (Vertex<IntWritable, IntWritable,
-        IntWritable, IntWritable> vertex :
-        partitionStore.getPartition(partitionId)) {
+        IntWritable, IntWritable> vertex : partition2) {
       total += vertex.getId().get();
     }
+    partitionStore.putPartition(partition2);
     assertEquals(total, 45);
+    partitionStore.shutdown();
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/giraph/blob/62c12fa0/giraph-core/src/test/java/org/apache/giraph/partition/TestPartitionStores.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/partition/TestPartitionStores.java b/giraph-core/src/test/java/org/apache/giraph/partition/TestPartitionStores.java
index b02ed3a..b4cddf6 100644
--- a/giraph-core/src/test/java/org/apache/giraph/partition/TestPartitionStores.java
+++ b/giraph-core/src/test/java/org/apache/giraph/partition/TestPartitionStores.java
@@ -34,6 +34,8 @@ import org.junit.Test;
 import com.google.common.collect.Iterables;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -80,6 +82,7 @@ public class TestPartitionStores {
         partitionStore = new SimplePartitionStore<IntWritable, IntWritable,
                 NullWritable, IntWritable>(conf, context);
     testReadWrite(partitionStore, conf);
+    partitionStore.shutdown();
   }
 
   @Test
@@ -137,11 +140,13 @@ public class TestPartitionStores {
         partitionStore = new DiskBackedPartitionStore<IntWritable,
                         IntWritable, NullWritable, IntWritable>(conf, context);
     testReadWrite(partitionStore, conf);
+    partitionStore.shutdown();
 
     conf.setInt(GiraphConstants.MAX_PARTITIONS_IN_MEMORY, 2);
     partitionStore = new DiskBackedPartitionStore<IntWritable,
             IntWritable, NullWritable, IntWritable>(conf, context);
     testReadWrite(partitionStore, conf);
+    partitionStore.shutdown();
   }
 
   /**
@@ -185,16 +190,26 @@ public class TestPartitionStores {
 
     Partition<IntWritable, IntWritable, NullWritable, IntWritable> partition1 =
         partitionStore.getPartition(1);
+    partitionStore.putPartition(partition1);
     Partition<IntWritable, IntWritable, NullWritable, IntWritable> partition2 =
         partitionStore.getPartition(2);
+    partitionStore.putPartition(partition2);
     Partition<IntWritable, IntWritable, NullWritable,
         IntWritable> partition3 = partitionStore.removePartition(3);
     Partition<IntWritable, IntWritable, NullWritable, IntWritable> partition4 =
         partitionStore.getPartition(4);
+    partitionStore.putPartition(partition4);
 
     assertEquals(3, partitionStore.getNumPartitions());
     assertEquals(3, Iterables.size(partitionStore.getPartitionIds()));
-    assertEquals(3, Iterables.size(partitionStore.getPartitions()));
+    int partitionsNumber = 0;
+    for (Integer partitionId : partitionStore.getPartitionIds()) {
+      Partition<IntWritable, IntWritable, NullWritable, IntWritable> p = 
+          partitionStore.getPartition(partitionId);
+      partitionStore.putPartition(p);
+      partitionsNumber++;
+    }
+    assertEquals(3, partitionsNumber);
     assertTrue(partitionStore.hasPartition(1));
     assertTrue(partitionStore.hasPartition(2));
     assertFalse(partitionStore.hasPartition(3));