Posted to commits@giraph.apache.org by ed...@apache.org on 2016/05/20 22:14:26 UTC

[2/5] git commit: updated refs/heads/trunk to 6256a76

http://git-wip-us.apache.org/repos/asf/giraph/blob/6256a761/giraph-core/src/main/java/org/apache/giraph/ooc/data/MetaPartitionManager.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/data/MetaPartitionManager.java b/giraph-core/src/main/java/org/apache/giraph/ooc/data/MetaPartitionManager.java
index 6c08dfd..784d578 100644
--- a/giraph-core/src/main/java/org/apache/giraph/ooc/data/MetaPartitionManager.java
+++ b/giraph-core/src/main/java/org/apache/giraph/ooc/data/MetaPartitionManager.java
@@ -20,6 +20,7 @@ package org.apache.giraph.ooc.data;
 
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
+import org.apache.giraph.bsp.BspService;
 import org.apache.giraph.ooc.OutOfCoreEngine;
 import org.apache.giraph.worker.BspServiceWorker;
 import org.apache.log4j.Logger;
@@ -27,7 +28,6 @@ import org.apache.log4j.Logger;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
 import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.ConcurrentMap;
@@ -59,11 +59,11 @@ public class MetaPartitionManager {
 
   /** Number of in-memory partitions */
   private final AtomicInteger numInMemoryPartitions = new AtomicInteger(0);
-  /** Map of partitions to their meta information */
+  /** Map (dictionary) of partitions to their meta information */
   private final ConcurrentMap<Integer, MetaPartition> partitions =
       Maps.newConcurrentMap();
-  /** List of partitions assigned to each IO threads */
-  private final List<PerThreadPartitionStatus> perThreadPartitions;
+  /** Reverse dictionaries of partitions assigned to each IO thread */
+  private final List<MetaPartitionDictionary> perThreadPartitionDictionary;
   /** For each IO thread, set of partition ids that are on-disk and have
    * 'large enough' vertex/edge buffers to be offloaded on disk
    */
@@ -95,11 +95,11 @@ public class MetaPartitionManager {
    * @param oocEngine out-of-core engine
    */
   public MetaPartitionManager(int numIOThreads, OutOfCoreEngine oocEngine) {
-    perThreadPartitions = new ArrayList<>(numIOThreads);
+    perThreadPartitionDictionary = new ArrayList<>(numIOThreads);
     perThreadVertexEdgeBuffers = new ArrayList<>(numIOThreads);
     perThreadMessageBuffers = new ArrayList<>(numIOThreads);
     for (int i = 0; i < numIOThreads; ++i) {
-      perThreadPartitions.add(new PerThreadPartitionStatus());
+      perThreadPartitionDictionary.add(new MetaPartitionDictionary());
       perThreadMessageBuffers.add(Sets.<Integer>newConcurrentHashSet());
       perThreadVertexEdgeBuffers.add(Sets.<Integer>newConcurrentHashSet());
     }
@@ -156,11 +156,7 @@ public class MetaPartitionManager {
     if (temp == null) {
       int ownerThread = oocEngine.getIOScheduler()
           .getOwnerThreadId(partitionId);
-      Set<MetaPartition> partitionSet =
-          perThreadPartitions.get(ownerThread).getInMemoryProcessed();
-      synchronized (partitionSet) {
-        partitionSet.add(meta);
-      }
+      perThreadPartitionDictionary.get(ownerThread).addPartition(meta);
       numInMemoryPartitions.getAndIncrement();
     }
   }
@@ -173,6 +169,8 @@ public class MetaPartitionManager {
    */
   public void removePartition(Integer partitionId) {
     MetaPartition meta = partitions.remove(partitionId);
+    int ownerThread = oocEngine.getIOScheduler().getOwnerThreadId(partitionId);
+    perThreadPartitionDictionary.get(ownerThread).removePartition(meta);
     checkState(!meta.isOnDisk());
     numInMemoryPartitions.getAndDecrement();
   }
@@ -216,20 +214,46 @@ public class MetaPartitionManager {
    * @return id of the partition to offload on disk
    */
   public Integer getOffloadPartitionId(int threadId) {
-    Set<MetaPartition> partitionSet = perThreadPartitions.get(threadId)
-        .getInMemoryProcessed();
-    synchronized (partitionSet) {
-      MetaPartition meta = peekFromSet(partitionSet);
-      if (meta != null) {
-        return meta.getPartitionId();
-      }
-    }
-    partitionSet = perThreadPartitions.get(threadId).getInMemoryUnprocessed();
-    synchronized (partitionSet) {
-      MetaPartition meta = peekFromSet(partitionSet);
-      if (meta != null) {
-        return meta.getPartitionId();
-      }
+    MetaPartition meta = perThreadPartitionDictionary.get(threadId).lookup(
+        ProcessingState.PROCESSED,
+        StorageState.IN_MEM,
+        StorageState.IN_MEM,
+        null);
+    if (meta != null) {
+      return meta.getPartitionId();
+    }
+    meta = perThreadPartitionDictionary.get(threadId).lookup(
+        ProcessingState.PROCESSED,
+        StorageState.IN_MEM,
+        null,
+        null);
+    if (meta != null) {
+      return meta.getPartitionId();
+    }
+    meta = perThreadPartitionDictionary.get(threadId).lookup(
+        ProcessingState.PROCESSED,
+        null,
+        StorageState.IN_MEM,
+        null);
+    if (meta != null) {
+      return meta.getPartitionId();
+    }
+
+    meta = perThreadPartitionDictionary.get(threadId).lookup(
+        ProcessingState.UNPROCESSED,
+        StorageState.IN_MEM,
+        null,
+        null);
+    if (meta != null) {
+      return meta.getPartitionId();
+    }
+    meta = perThreadPartitionDictionary.get(threadId).lookup(
+        ProcessingState.UNPROCESSED,
+        null,
+        StorageState.IN_MEM,
+        null);
+    if (meta != null) {
+      return meta.getPartitionId();
     }
     return null;
   }
@@ -241,8 +265,7 @@ public class MetaPartitionManager {
    * @return id of the partition to offload its vertex/edge buffers on disk
    */
   public Integer getOffloadPartitionBufferId(int threadId) {
-    if (oocEngine.getServiceWorker().getSuperstep() ==
-        BspServiceWorker.INPUT_SUPERSTEP) {
+    if (oocEngine.getSuperstep() == BspServiceWorker.INPUT_SUPERSTEP) {
       Integer partitionId =
           popFromSet(perThreadVertexEdgeBuffers.get(threadId));
       if (partitionId == null) {
@@ -270,8 +293,7 @@ public class MetaPartitionManager {
    * @return id of the partition to offload its message buffer on disk
    */
   public Integer getOffloadMessageBufferId(int threadId) {
-    if (oocEngine.getServiceWorker().getSuperstep() !=
-        BspServiceWorker.INPUT_SUPERSTEP) {
+    if (oocEngine.getSuperstep() != BspServiceWorker.INPUT_SUPERSTEP) {
       Integer partitionId =
           popFromSet(perThreadMessageBuffers.get(threadId));
       if (partitionId == null) {
@@ -297,53 +319,115 @@ public class MetaPartitionManager {
    * @return id of the partition to offload its message on disk
    */
   public Integer getOffloadMessageId(int threadId) {
-    Set<MetaPartition> partitionSet = perThreadPartitions.get(threadId)
-        .getInDiskProcessed();
-    synchronized (partitionSet) {
-      for (MetaPartition meta : partitionSet) {
-        if (meta.getIncomingMessagesState() == StorageState.IN_MEM) {
-          return meta.getPartitionId();
-        }
-      }
-    }
-    partitionSet = perThreadPartitions.get(threadId).getInDiskUnprocessed();
-    synchronized (partitionSet) {
-      for (MetaPartition meta : partitionSet) {
-        if (meta.getIncomingMessagesState() == StorageState.IN_MEM) {
-          return meta.getPartitionId();
-        }
-      }
+    if (oocEngine.getSuperstep() == BspService.INPUT_SUPERSTEP) {
+      return null;
+    }
+    MetaPartition meta = perThreadPartitionDictionary.get(threadId).lookup(
+        ProcessingState.PROCESSED,
+        StorageState.ON_DISK,
+        null,
+        StorageState.IN_MEM);
+    if (meta != null) {
+      return meta.getPartitionId();
+    }
+
+    meta = perThreadPartitionDictionary.get(threadId).lookup(
+        ProcessingState.UNPROCESSED,
+        StorageState.ON_DISK,
+        null,
+        StorageState.IN_MEM);
+    if (meta != null) {
+      return meta.getPartitionId();
     }
     return null;
   }
 
   /**
-   * Get id of a partition to prefetch its data to memory
+   * Get id of a partition to load its data to memory
    *
    * @param threadId id of the thread who is going to load the partition data
    * @return id of the partition to load its data to memory
    */
-  public Integer getPrefetchPartitionId(int threadId) {
-    Set<MetaPartition> partitionSet =
-        perThreadPartitions.get(threadId).getInDiskUnprocessed();
-    synchronized (partitionSet) {
-      MetaPartition meta = peekFromSet(partitionSet);
-      return (meta != null) ? meta.getPartitionId() : null;
+  public Integer getLoadPartitionId(int threadId) {
+    MetaPartition meta = perThreadPartitionDictionary.get(threadId).lookup(
+        ProcessingState.UNPROCESSED,
+        StorageState.IN_MEM,
+        StorageState.ON_DISK,
+        null);
+    if (meta != null) {
+      return meta.getPartitionId();
+    }
+
+    meta = perThreadPartitionDictionary.get(threadId).lookup(
+        ProcessingState.UNPROCESSED,
+        StorageState.ON_DISK,
+        null,
+        null);
+    if (meta != null) {
+      return meta.getPartitionId();
+    }
+
+    meta = perThreadPartitionDictionary.get(threadId).lookup(
+        ProcessingState.PROCESSED,
+        StorageState.ON_DISK,
+        null,
+        null);
+    if (meta != null) {
+      return meta.getPartitionId();
+    }
+
+    meta = perThreadPartitionDictionary.get(threadId).lookup(
+        ProcessingState.PROCESSED,
+        null,
+        StorageState.ON_DISK,
+        null);
+    if (meta != null) {
+      return meta.getPartitionId();
     }
+    return null;
   }
 
   /**
-   * Mark a partition inaccessible to IO and compute threads
+   * Mark a partition as being 'IN_PROCESS'
    *
    * @param partitionId id of the partition to mark
    */
-  public void makePartitionInaccessible(int partitionId) {
+  public void markPartitionAsInProcess(int partitionId) {
     MetaPartition meta = partitions.get(partitionId);
-    perThreadPartitions.get(oocEngine.getIOScheduler()
-        .getOwnerThreadId(partitionId))
-        .remove(meta);
+    int ownerThread = oocEngine.getIOScheduler().getOwnerThreadId(partitionId);
     synchronized (meta) {
+      perThreadPartitionDictionary.get(ownerThread).removePartition(meta);
       meta.setProcessingState(ProcessingState.IN_PROCESS);
+      perThreadPartitionDictionary.get(ownerThread).addPartition(meta);
+    }
+  }
+
+  /**
+   * Whether there is any processed partition stored in memory (excluding those
+   * that are prefetched to be computed in the next superstep).
+   *
+   * @return true iff there is any processed partition in memory
+   */
+  public boolean hasProcessedOnMemory() {
+    for (MetaPartitionDictionary dictionary : perThreadPartitionDictionary) {
+      if (dictionary.hasProcessedOnMemory()) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Whether a partition is *processed* in the current iteration cycle over
+   * partitions.
+   *
+   * @param partitionId id of the partition to check
+   * @return true iff processing the given partition is done
+   */
+  public boolean isPartitionProcessed(Integer partitionId) {
+    MetaPartition meta = partitions.get(partitionId);
+    synchronized (meta) {
+      return meta.getProcessingState() == ProcessingState.PROCESSED;
     }
   }
 
@@ -354,14 +438,11 @@ public class MetaPartitionManager {
    */
   public void setPartitionIsProcessed(int partitionId) {
     MetaPartition meta = partitions.get(partitionId);
+    int ownerThread = oocEngine.getIOScheduler().getOwnerThreadId(partitionId);
     synchronized (meta) {
+      perThreadPartitionDictionary.get(ownerThread).removePartition(meta);
       meta.setProcessingState(ProcessingState.PROCESSED);
-    }
-    Set<MetaPartition> partitionSet = perThreadPartitions
-        .get(oocEngine.getIOScheduler().getOwnerThreadId(partitionId))
-        .getInMemoryProcessed();
-    synchronized (partitionSet) {
-      partitionSet.add(meta);
+      perThreadPartitionDictionary.get(ownerThread).addPartition(meta);
     }
     numPartitionsProcessed.getAndIncrement();
   }
@@ -378,7 +459,7 @@ public class MetaPartitionManager {
     MetaPartition meta = partitions.get(partitionId);
     synchronized (meta) {
       boolean shouldLoad = meta.getPartitionState() == StorageState.ON_DISK;
-      if (superstep == oocEngine.getServiceWorker().getSuperstep()) {
+      if (superstep == oocEngine.getSuperstep()) {
         shouldLoad |= meta.getCurrentMessagesState() == StorageState.ON_DISK;
       } else {
         shouldLoad |= meta.getIncomingMessagesState() == StorageState.ON_DISK;
@@ -391,25 +472,27 @@ public class MetaPartitionManager {
    * Notify this meta store that load of a partition for a specific superstep
    * is completed
    *
-   * @param partitionId id of a the partition that load is completed
+   * @param partitionId id of the partition for which the load is completed
    * @param superstep superstep in which the partition is loaded for
    */
   public void doneLoadingPartition(int partitionId, long superstep) {
     MetaPartition meta = partitions.get(partitionId);
     numInMemoryPartitions.getAndIncrement();
     int owner = oocEngine.getIOScheduler().getOwnerThreadId(partitionId);
-    boolean removed = perThreadPartitions.get(owner)
-        .remove(meta, StorageState.ON_DISK);
-    if (removed || meta.getProcessingState() == ProcessingState.IN_PROCESS) {
-      synchronized (meta) {
-        meta.setPartitionState(StorageState.IN_MEM);
-        if (superstep == oocEngine.getServiceWorker().getSuperstep()) {
-          meta.setCurrentMessagesState(StorageState.IN_MEM);
-        } else {
-          meta.setIncomingMessagesState(StorageState.IN_MEM);
-        }
+    synchronized (meta) {
+      perThreadPartitionDictionary.get(owner).removePartition(meta);
+      meta.setPartitionState(StorageState.IN_MEM);
+      if (superstep == oocEngine.getSuperstep()) {
+        meta.setCurrentMessagesState(StorageState.IN_MEM);
+      } else {
+        meta.setIncomingMessagesState(StorageState.IN_MEM);
+      }
+      // Check whether load was to prefetch a partition from disk to memory for
+      // the next superstep
+      if (meta.getProcessingState() == ProcessingState.PROCESSED) {
+        perThreadPartitionDictionary.get(owner).increaseNumPrefetch();
       }
-      perThreadPartitions.get(owner).add(meta, StorageState.IN_MEM);
+      perThreadPartitionDictionary.get(owner).addPartition(meta);
     }
   }
 
@@ -422,9 +505,13 @@ public class MetaPartitionManager {
    */
   public boolean startOffloadingMessages(int partitionId) {
     MetaPartition meta = partitions.get(partitionId);
+    int ownerThread =
+        oocEngine.getIOScheduler().getOwnerThreadId(partitionId);
     synchronized (meta) {
       if (meta.getIncomingMessagesState() == StorageState.IN_MEM) {
+        perThreadPartitionDictionary.get(ownerThread).removePartition(meta);
         meta.setIncomingMessagesState(StorageState.IN_TRANSIT);
+        perThreadPartitionDictionary.get(ownerThread).addPartition(meta);
         return true;
       } else {
         return false;
@@ -441,8 +528,12 @@ public class MetaPartitionManager {
    */
   public void doneOffloadingMessages(int partitionId) {
     MetaPartition meta = partitions.get(partitionId);
+    int ownerThread =
+        oocEngine.getIOScheduler().getOwnerThreadId(partitionId);
     synchronized (meta) {
+      perThreadPartitionDictionary.get(ownerThread).removePartition(meta);
       meta.setIncomingMessagesState(StorageState.ON_DISK);
+      perThreadPartitionDictionary.get(ownerThread).addPartition(meta);
     }
   }
 
@@ -478,17 +569,18 @@ public class MetaPartitionManager {
   public boolean startOffloadingPartition(int partitionId) {
     MetaPartition meta = partitions.get(partitionId);
     int owner = oocEngine.getIOScheduler().getOwnerThreadId(partitionId);
-    boolean removed = perThreadPartitions.get(owner)
-        .remove(meta, StorageState.IN_MEM);
-    if (removed) {
-      synchronized (meta) {
+    synchronized (meta) {
+      if (meta.getProcessingState() != ProcessingState.IN_PROCESS &&
+          (meta.getPartitionState() == StorageState.IN_MEM ||
+          meta.getCurrentMessagesState() == StorageState.IN_MEM)) {
+        perThreadPartitionDictionary.get(owner).removePartition(meta);
         meta.setPartitionState(StorageState.IN_TRANSIT);
         meta.setCurrentMessagesState(StorageState.IN_TRANSIT);
+        perThreadPartitionDictionary.get(owner).addPartition(meta);
+        return true;
+      } else {
+        return false;
       }
-      perThreadPartitions.get(owner).add(meta, StorageState.IN_TRANSIT);
-      return true;
-    } else {
-      return false;
     }
   }
 
@@ -502,14 +594,11 @@ public class MetaPartitionManager {
     numInMemoryPartitions.getAndDecrement();
     MetaPartition meta = partitions.get(partitionId);
     int owner = oocEngine.getIOScheduler().getOwnerThreadId(partitionId);
-    boolean removed = perThreadPartitions.get(owner)
-        .remove(meta, StorageState.IN_TRANSIT);
-    if (removed) {
-      synchronized (meta) {
-        meta.setPartitionState(StorageState.ON_DISK);
-        meta.setCurrentMessagesState(StorageState.ON_DISK);
-      }
-      perThreadPartitions.get(owner).add(meta, StorageState.ON_DISK);
+    synchronized (meta) {
+      perThreadPartitionDictionary.get(owner).removePartition(meta);
+      meta.setPartitionState(StorageState.ON_DISK);
+      meta.setCurrentMessagesState(StorageState.ON_DISK);
+      perThreadPartitionDictionary.get(owner).addPartition(meta);
     }
   }
 
@@ -517,15 +606,17 @@ public class MetaPartitionManager {
    * Reset the meta store for a new iteration cycle over all partitions.
    * Note: this is not thread-safe and should be called from a single thread.
    */
-  public void resetPartition() {
+  public void resetPartitions() {
     for (MetaPartition meta : partitions.values()) {
+      int owner =
+          oocEngine.getIOScheduler().getOwnerThreadId(meta.getPartitionId());
+      perThreadPartitionDictionary.get(owner).removePartition(meta);
       meta.resetPartition();
+      perThreadPartitionDictionary.get(owner).addPartition(meta);
     }
-    int numPartition = 0;
-    for (PerThreadPartitionStatus status : perThreadPartitions) {
-      numPartition += status.reset();
+    for (MetaPartitionDictionary dictionary : perThreadPartitionDictionary) {
+      dictionary.reset();
     }
-    checkState(numPartition == partitions.size());
     numPartitionsProcessed.set(0);
   }
 
@@ -535,41 +626,22 @@ public class MetaPartitionManager {
    */
   public void resetMessages() {
     for (MetaPartition meta : partitions.values()) {
+      int owner =
+          oocEngine.getIOScheduler().getOwnerThreadId(meta.getPartitionId());
+      perThreadPartitionDictionary.get(owner).removePartition(meta);
       meta.resetMessages();
-    }
-    // After swapping incoming messages and current messages, it may be the case
-    // that a partition has data in memory (partitionState == IN_MEM), but now
-    // its current messages are on disk (currentMessageState == ON_DISK). So, we
-    // have to mark the partition as ON_DISK, and load its messages once it is
-    // about to be processed.
-    for (PerThreadPartitionStatus status : perThreadPartitions) {
-      Set<MetaPartition> partitionSet = status.getInMemoryUnprocessed();
-      Iterator<MetaPartition> it = partitionSet.iterator();
-      while (it.hasNext()) {
-        MetaPartition meta = it.next();
-        if (meta.getCurrentMessagesState() == StorageState.ON_DISK) {
-          it.remove();
-          status.getInDiskUnprocessed().add(meta);
-          numInMemoryPartitions.getAndDecrement();
-        }
-      }
-      partitionSet = status.getInMemoryProcessed();
-      it = partitionSet.iterator();
-      while (it.hasNext()) {
-        MetaPartition meta = it.next();
-        if (meta.getCurrentMessagesState() == StorageState.ON_DISK) {
-          it.remove();
-          status.getInDiskProcessed().add(meta);
-          numInMemoryPartitions.getAndDecrement();
-        }
+      if (meta.getPartitionState() == StorageState.IN_MEM &&
+          meta.getCurrentMessagesState() == StorageState.ON_DISK) {
+        numInMemoryPartitions.getAndDecrement();
       }
+      perThreadPartitionDictionary.get(owner).addPartition(meta);
     }
   }
 
   /**
    * Return the id of an unprocessed partition in memory. If all partitions are
    * processed, return an appropriate 'finisher signal'. If there are
-   * unprocessed partitions, but none are is memory, return null.
+   * unprocessed partitions, but none are in memory, return null.
    *
    * @return id of the partition to be processed next.
    */
@@ -577,20 +649,40 @@ public class MetaPartitionManager {
     if (numPartitionsProcessed.get() >= partitions.size()) {
       return NO_PARTITION_TO_PROCESS;
     }
-    int numThreads = perThreadPartitions.size();
+    int numThreads = perThreadPartitionDictionary.size();
     int index = randomGenerator.nextInt(numThreads);
     int startIndex = index;
+    MetaPartition meta;
     do {
-      Set<MetaPartition> partitionSet =
-          perThreadPartitions.get(index).getInMemoryUnprocessed();
-      MetaPartition meta;
-      synchronized (partitionSet) {
-        meta = popFromSet(partitionSet);
-      }
-      if (meta != null) {
-        synchronized (meta) {
-          meta.setProcessingState(ProcessingState.IN_PROCESS);
-          return meta.getPartitionId();
+      // We first look up a partition in the reverse dictionary. If there is a
+      // partition with the given properties, we then check whether we can
+      // return it as the next partition to process. If we cannot, there may
+      // still be other partitions in the dictionary, so we will continue
+      // looping through all of them. If all the partitions with our desired
+      // properties have been examined, we break out of the loop.
+      while (true) {
+        meta = perThreadPartitionDictionary.get(index).lookup(
+            ProcessingState.UNPROCESSED,
+            StorageState.IN_MEM,
+            StorageState.IN_MEM,
+            null);
+        if (meta != null) {
+          // Here we should check whether 'meta' still has the same properties
+          // as when it was looked up in the dictionary. Its state may change
+          // between the time it is looked up and the moment this synchronized
+          // block is entered.
+          synchronized (meta) {
+            if (meta.getProcessingState() == ProcessingState.UNPROCESSED &&
+                meta.getPartitionState() == StorageState.IN_MEM &&
+                meta.getCurrentMessagesState() == StorageState.IN_MEM) {
+              perThreadPartitionDictionary.get(index).removePartition(meta);
+              meta.setProcessingState(ProcessingState.IN_PROCESS);
+              perThreadPartitionDictionary.get(index).addPartition(meta);
+              return meta.getPartitionId();
+            }
+          }
+        } else {
+          break;
         }
       }
       index = (index + 1) % numThreads;
@@ -607,7 +699,9 @@ public class MetaPartitionManager {
    */
   public boolean isPartitionOnDisk(int partitionId) {
     MetaPartition meta = partitions.get(partitionId);
-    return meta.isOnDisk();
+    synchronized (meta) {
+      return meta.isOnDisk();
+    }
   }
 
   /**
@@ -623,18 +717,19 @@ public class MetaPartitionManager {
     /** Storage state of partition data */
     private StorageState partitionState;
     /** Processing state of a partition */
-    private volatile ProcessingState processingState;
+    private ProcessingState processingState;
 
     /**
      * Constructor
      *
      * @param partitionId id of the partition
      */
-    MetaPartition(int partitionId) {
+    public MetaPartition(int partitionId) {
       this.partitionId = partitionId;
-      this.processingState = ProcessingState.PROCESSED;
+      this.processingState = ProcessingState.UNPROCESSED;
       this.partitionState = StorageState.IN_MEM;
       this.currentMessagesState = StorageState.IN_MEM;
+      this.incomingMessagesState = StorageState.IN_MEM;
     }
 
     @Override
@@ -649,65 +744,30 @@ public class MetaPartitionManager {
       return sb.toString();
     }
 
-    /**
-     * Get id of the partition
-     *
-     * @return id of the partition
-     */
     public int getPartitionId() {
       return partitionId;
     }
 
-    /**
-     * Get storage state of incoming messages of the partition
-     *
-     * @return storage state of incoming messages
-     */
     public StorageState getIncomingMessagesState() {
       return incomingMessagesState;
     }
 
-    /**
-     * Set storage state of incoming messages of the partition
-     *
-     * @param incomingMessagesState storage state of incoming messages
-     */
     public void setIncomingMessagesState(StorageState incomingMessagesState) {
       this.incomingMessagesState = incomingMessagesState;
     }
 
-    /**
-     * Get storage state of current messages of the partition
-     *
-     * @return storage state of current messages
-     */
     public StorageState getCurrentMessagesState() {
       return currentMessagesState;
     }
 
-    /**
-     * Set storage state of current messages of the partition
-     *
-     * @param currentMessagesState storage state of current messages
-     */
     public void setCurrentMessagesState(StorageState currentMessagesState) {
       this.currentMessagesState = currentMessagesState;
     }
 
-    /**
-     * Get storage state of the partition
-     *
-     * @return storage state of the partition
-     */
     public StorageState getPartitionState() {
       return partitionState;
     }
 
-    /**
-     * Set storage state of the partition
-     *
-     * @param state storage state of the partition
-     */
     public void setPartitionState(StorageState state) {
       this.partitionState = state;
     }
@@ -748,200 +808,167 @@ public class MetaPartitionManager {
   }
 
   /**
-   * Representation of partitions' state per IO thread
+   * Class representing a reverse dictionary for partitions. The main operation
+   * of the reverse dictionary is to look up a partition with certain
+   * properties. The responsibility of keeping the dictionary consistent when
+   * a partition property changes lies with the code that changes the property.
+   * One can simply remove the partition from the dictionary, change the
+   * property (or properties), and then add the partition back to the dictionary.
    */
-  private static class PerThreadPartitionStatus {
+  private static class MetaPartitionDictionary {
     /**
-     * Contains partitions that has been processed in the current iteration
-     * cycle, and are not in use by any thread.
+     * Sets of partitions for each possible combination of properties. Each
+     * partition can have 4 properties, and each property can have any of 3
+     * different values. The properties are as follows (in the order in which
+     * they are used as the dimensions of the following 4-D array):
+     *  - processing status (PROCESSED, UNPROCESSED, or IN_PROCESS)
+     *  - partition storage status (IN_MEM, IN_TRANSIT, ON_DISK)
+     *  - current messages storage status (IN_MEM, IN_TRANSIT, ON_DISK)
+     *  - incoming messages storage status (IN_MEM, IN_TRANSIT, ON_DISK)
      */
-    private Map<StorageState, Set<MetaPartition>>
-        processedPartitions = Maps.newConcurrentMap();
+    private final Set<MetaPartition>[][][][] partitions =
+        (Set<MetaPartition>[][][][]) new Set<?>[3][3][3][3];
     /**
-     * Contains partitions that has *NOT* been processed in the current
-     * iteration cycle, and are not in use by any thread.
+     * Number of partitions that have been prefetched to be computed in the
+     * next superstep
      */
-    private Map<StorageState, Set<MetaPartition>>
-        unprocessedPartitions = Maps.newConcurrentMap();
+    private final AtomicInteger numPrefetch = new AtomicInteger(0);
 
     /**
      * Constructor
      */
-    public PerThreadPartitionStatus() {
-      processedPartitions.put(StorageState.IN_MEM,
-          Sets.<MetaPartition>newLinkedHashSet());
-      processedPartitions.put(StorageState.ON_DISK,
-          Sets.<MetaPartition>newLinkedHashSet());
-      processedPartitions.put(StorageState.IN_TRANSIT,
-          Sets.<MetaPartition>newLinkedHashSet());
-
-      unprocessedPartitions.put(StorageState.IN_MEM,
-          Sets.<MetaPartition>newLinkedHashSet());
-      unprocessedPartitions.put(StorageState.ON_DISK,
-          Sets.<MetaPartition>newLinkedHashSet());
-      unprocessedPartitions.put(StorageState.IN_TRANSIT,
-          Sets.<MetaPartition>newLinkedHashSet());
-    }
-
-    @Override
-    public String toString() {
-      StringBuffer sb = new StringBuffer();
-      sb.append("\nProcessed Partitions: " + processedPartitions + "; ");
-      sb.append("\nUnprocessedPartitions: " + unprocessedPartitions);
-      return sb.toString();
-    }
-
-    /**
-     * Get set of partitions that are in memory and are processed
-     *
-     * @return set of partition that are in memory are are processed
-     */
-    public Set<MetaPartition> getInMemoryProcessed() {
-      return processedPartitions.get(StorageState.IN_MEM);
-    }
-
-    /**
-     * Get set of partitions that are in memory are are not processed
-     *
-     * @return set of partitions that are in memory and are not processed
-     */
-    public Set<MetaPartition> getInMemoryUnprocessed() {
-      return unprocessedPartitions.get(StorageState.IN_MEM);
+    public MetaPartitionDictionary() {
+      for (int i = 0; i < 3; ++i) {
+        for (int j = 0; j < 3; ++j) {
+          for (int k = 0; k < 3; ++k) {
+            for (int t = 0; t < 3; ++t) {
+              partitions[i][j][k][t] = Sets.newLinkedHashSet();
+            }
+          }
+        }
+      }
     }
 
     /**
-     * Get set of partitions that are on disk and are processed
+     * Get a partition set associated with property combination that a given
+     * partition has
      *
-     * @return set of partitions that are on disk and are processed
+     * @param meta meta partition containing properties of a partition
+     * @return partition set with the same property combination as the given
+     *         meta partition
      */
-    public Set<MetaPartition> getInDiskProcessed() {
-      return processedPartitions.get(StorageState.ON_DISK);
+    private Set<MetaPartition> getSet(MetaPartition meta) {
+      return partitions[meta.getProcessingState().ordinal()]
+          [meta.getPartitionState().ordinal()]
+          [meta.getCurrentMessagesState().ordinal()]
+          [meta.getIncomingMessagesState().ordinal()];
     }
 
     /**
-     * Get set of partitions that are on disk and are not processed
+     * Add a partition to the dictionary
      *
-     * @return set of partitions that are on disk and are not processed
+     * @param meta meta information of the partition to add
      */
-    public Set<MetaPartition> getInDiskUnprocessed() {
-      return unprocessedPartitions.get(StorageState.ON_DISK);
+    public void addPartition(MetaPartition meta) {
+      Set<MetaPartition> partitionSet = getSet(meta);
+      synchronized (partitionSet) {
+        partitionSet.add(meta);
+      }
     }
 
     /**
-     * Remove a partition from meta information
+     * Remove a partition from the dictionary
      *
-     * @param meta meta-information of a partition to be removed
+     * @param meta meta information of the partition to remove
      */
-    public void remove(MetaPartition meta) {
-      Set<MetaPartition> partitionSet;
-      partitionSet = processedPartitions.get(StorageState.IN_MEM);
-      synchronized (partitionSet) {
-        if (partitionSet.remove(meta)) {
-          return;
-        }
-      }
-      partitionSet = unprocessedPartitions.get(StorageState.IN_MEM);
-      synchronized (partitionSet) {
-        if (partitionSet.remove(meta)) {
-          return;
-        }
-      }
-      partitionSet = processedPartitions.get(StorageState.IN_TRANSIT);
-      synchronized (partitionSet) {
-        if (partitionSet.remove(meta)) {
-          return;
-        }
-      }
-      partitionSet = unprocessedPartitions.get(StorageState.IN_TRANSIT);
-      synchronized (partitionSet) {
-        if (partitionSet.remove(meta)) {
-          return;
-        }
-      }
-      partitionSet = processedPartitions.get(StorageState.ON_DISK);
-      synchronized (partitionSet) {
-        if (partitionSet.remove(meta)) {
-          return;
-        }
-      }
-      partitionSet = unprocessedPartitions.get(StorageState.ON_DISK);
+    public void removePartition(MetaPartition meta) {
+      Set<MetaPartition> partitionSet = getSet(meta);
       synchronized (partitionSet) {
         partitionSet.remove(meta);
       }
     }
 
     /**
-     * Reset meta-information for the next iteration cycle over all partitions
+     * Look up a partition with given properties. One can use a wildcard for
+     * a property in the lookup operation (by passing null as that property).
      *
-     * @return total number of partitions kept for this thread
+     * @param processingState processing state property
+     * @param partitionStorageState partition storage property
+     * @param currentMessagesState current messages storage property
+     * @param incomingMessagesState incoming messages storage property
+     * @return a meta partition in the dictionary with the given combination of
+     *         properties. If there is no such partition, return null
      */
-    public int reset() {
-      checkState(unprocessedPartitions.get(StorageState.IN_MEM).size() == 0);
-      checkState(unprocessedPartitions.get(StorageState.IN_TRANSIT).size() ==
-          0);
-      checkState(unprocessedPartitions.get(StorageState.ON_DISK).size() == 0);
-      unprocessedPartitions.clear();
-      unprocessedPartitions.putAll(processedPartitions);
-      processedPartitions.clear();
-      processedPartitions.put(StorageState.IN_MEM,
-          Sets.<MetaPartition>newLinkedHashSet());
-      processedPartitions.put(StorageState.ON_DISK,
-          Sets.<MetaPartition>newLinkedHashSet());
-      processedPartitions.put(StorageState.IN_TRANSIT,
-          Sets.<MetaPartition>newLinkedHashSet());
-      return unprocessedPartitions.get(StorageState.IN_MEM).size() +
-          unprocessedPartitions.get(StorageState.IN_TRANSIT).size() +
-          unprocessedPartitions.get(StorageState.ON_DISK).size();
+    public MetaPartition lookup(ProcessingState processingState,
+                                StorageState partitionStorageState,
+                                StorageState currentMessagesState,
+                                StorageState incomingMessagesState) {
+      int iStart =
+          (processingState == null) ? 0 : processingState.ordinal();
+      int iEnd =
+          (processingState == null) ? 3 : (processingState.ordinal() + 1);
+      int jStart =
+          (partitionStorageState == null) ? 0 : partitionStorageState.ordinal();
+      int jEnd = (partitionStorageState == null) ? 3 :
+              (partitionStorageState.ordinal() + 1);
+      int kStart =
+          (currentMessagesState == null) ? 0 : currentMessagesState.ordinal();
+      int kEnd = (currentMessagesState == null) ? 3 :
+              (currentMessagesState.ordinal() + 1);
+      int tStart =
+          (incomingMessagesState == null) ? 0 : incomingMessagesState.ordinal();
+      int tEnd = (incomingMessagesState == null) ? 3 :
+          (incomingMessagesState.ordinal() + 1);
+      for (int i = iStart; i < iEnd; ++i) {
+        for (int j = jStart; j < jEnd; ++j) {
+          for (int k = kStart; k < kEnd; ++k) {
+            for (int t = tStart; t < tEnd; ++t) {
+              Set<MetaPartition> partitionSet = partitions[i][j][k][t];
+              synchronized (partitionSet) {
+                MetaPartition meta = peekFromSet(partitionSet);
+                if (meta != null) {
+                  return meta;
+                }
+              }
+            }
+          }
+        }
+      }
+      return null;
     }
 
     /**
-     * Remove a partition from partition set of a given state
+     * Whether there is an in-memory partition that is processed already,
+     * excluding those partitions that are prefetched
      *
-     * @param meta meta partition to remove
-     * @param state state from which the partition should be removed
-     * @return true iff the partition is actually removed
+     * @return true if there is a processed in-memory partition
      */
-    public boolean remove(MetaPartition meta, StorageState state) {
-      boolean removed = false;
-      Set<MetaPartition> partitionSet = null;
-      if (meta.getProcessingState() == ProcessingState.UNPROCESSED) {
-        partitionSet = unprocessedPartitions.get(state);
-      } else if (meta.getProcessingState() == ProcessingState.PROCESSED) {
-        partitionSet = processedPartitions.get(state);
-      } else {
-        LOG.info("remove: partition " + meta.getPartitionId() + " is " +
-            "already being processed! This should happen only if partition " +
-            "removal is done before start of an iteration over all partitions");
-      }
-      if (partitionSet != null) {
-        synchronized (partitionSet) {
-          removed = partitionSet.remove(meta);
+    public boolean hasProcessedOnMemory() {
+      int count = 0;
+      for (int i = 0; i < 3; ++i) {
+        for (int j = 0; j < 3; ++j) {
+          Set<MetaPartition> partitionSet =
+              partitions[ProcessingState.PROCESSED.ordinal()]
+                  [StorageState.IN_MEM.ordinal()][i][j];
+          synchronized (partitionSet) {
+            count += partitionSet.size();
+          }
         }
       }
-      return removed;
+      return count - numPrefetch.get() != 0;
+    }
+
+    /** Increase the number of prefetched partitions by 1 */
+    public void increaseNumPrefetch() {
+      numPrefetch.getAndIncrement();
     }
 
     /**
-     * Add a partition to partition set of a given state
-     *
-     * @param meta meta partition to add
-     * @param state state to which the partition should be added
+     * Reset the dictionary preparing it for the next iteration cycle over
+     * partitions
      */
-    public void add(MetaPartition meta, StorageState state) {
-      Set<MetaPartition> partitionSet = null;
-      if (meta.getProcessingState() == ProcessingState.UNPROCESSED) {
-        partitionSet = unprocessedPartitions.get(state);
-      } else if (meta.getProcessingState() == ProcessingState.PROCESSED) {
-        partitionSet = processedPartitions.get(state);
-      } else {
-        LOG.info("add: partition " + meta.getPartitionId() + " is already " +
-            "being processed!");
-      }
-      if (partitionSet != null) {
-        synchronized (partitionSet) {
-          partitionSet.add(meta);
-        }
-      }
+    public void reset() {
+      numPrefetch.set(0);
     }
   }
 }

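For illustration only (this sketch is not part of the patch): the MetaPartitionDictionary above keeps one insertion-ordered set per combination of the four partition properties, so a partition must be removed under its old property values and re-added under the new ones whenever a property changes. The same idiom, reduced to a single hypothetical property, looks roughly like this:

import java.util.LinkedHashSet;
import java.util.Set;

/** Hypothetical one-property analogue of MetaPartitionDictionary. */
class TinyDictionary {
  enum State { IN_MEM, IN_TRANSIT, ON_DISK }

  /** One insertion-ordered bucket per property value. */
  private final Set<Integer>[] buckets =
      (Set<Integer>[]) new Set<?>[State.values().length];

  TinyDictionary() {
    for (int i = 0; i < buckets.length; ++i) {
      buckets[i] = new LinkedHashSet<>();
    }
  }

  /** Add a partition under its current state. */
  void add(int partitionId, State state) {
    buckets[state.ordinal()].add(partitionId);
  }

  /** Consistency idiom: remove under the old state, re-add under the new one. */
  void changeState(int partitionId, State oldState, State newState) {
    buckets[oldState.ordinal()].remove(partitionId);
    buckets[newState.ordinal()].add(partitionId);
  }

  /** Wildcard lookup: a null state means "any state". */
  Integer lookup(State state) {
    int start = (state == null) ? 0 : state.ordinal();
    int end = (state == null) ? buckets.length : state.ordinal() + 1;
    for (int i = start; i < end; ++i) {
      for (Integer id : buckets[i]) {
        return id;
      }
    }
    return null;
  }
}

In the patch, lookup() takes four such properties and treats null as a wildcard for each dimension.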
http://git-wip-us.apache.org/repos/asf/giraph/blob/6256a761/giraph-core/src/main/java/org/apache/giraph/ooc/data/OutOfCoreDataManager.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/data/OutOfCoreDataManager.java b/giraph-core/src/main/java/org/apache/giraph/ooc/data/OutOfCoreDataManager.java
index 7d97e51..325850c 100644
--- a/giraph-core/src/main/java/org/apache/giraph/ooc/data/OutOfCoreDataManager.java
+++ b/giraph-core/src/main/java/org/apache/giraph/ooc/data/OutOfCoreDataManager.java
@@ -181,18 +181,22 @@ public abstract class OutOfCoreDataManager<T> {
 
   /**
    * Loads and assembles all data for a given partition, and put it into the
-   * data store.
+   * data store. Returns the number of bytes transferred from disk to memory in
+   * the loading process.
    *
   * @param partitionId id of the partition to load and assemble all data for
    * @param basePath path to load the data from
+   * @return number of bytes loaded from disk to memory
    * @throws IOException
    */
-  public void loadPartitionData(int partitionId, String basePath)
+  public long loadPartitionData(int partitionId, String basePath)
       throws IOException {
+    long numBytes = 0;
     ReadWriteLock rwLock = getPartitionLock(partitionId);
     rwLock.writeLock().lock();
     if (hasPartitionDataOnDisk.contains(partitionId)) {
-      loadInMemoryPartitionData(partitionId, getPath(basePath, partitionId));
+      numBytes += loadInMemoryPartitionData(partitionId,
+          getPath(basePath, partitionId));
       hasPartitionDataOnDisk.remove(partitionId);
       // Loading raw data buffers from disk if there is any and applying those
       // to already loaded in-memory data.
@@ -213,6 +217,7 @@ public abstract class OutOfCoreDataManager<T> {
           addEntryToImMemoryPartitionData(partitionId, entry);
         }
         dis.close();
+        numBytes += file.length();
         checkState(file.delete(), "loadPartitionData: failed to delete %s.",
             file.getAbsoluteFile());
       }
@@ -225,38 +230,44 @@ public abstract class OutOfCoreDataManager<T> {
       }
     }
     rwLock.writeLock().unlock();
+    return numBytes;
   }
 
   /**
-   * Offloads partition data of a given partition in the data store to disk
+   * Offloads partition data of a given partition in the data store to disk, and
+   * returns the number of bytes offloaded from memory to disk.
    *
    * @param partitionId id of the partition to offload its data
    * @param basePath path to offload the data to
+   * @return number of bytes offloaded from memory to disk
    * @throws IOException
    */
   @edu.umd.cs.findbugs.annotations.SuppressWarnings(
       "UL_UNRELEASED_LOCK_EXCEPTION_PATH")
-  public void offloadPartitionData(int partitionId, String basePath)
+  public long offloadPartitionData(int partitionId, String basePath)
       throws IOException {
     ReadWriteLock rwLock = getPartitionLock(partitionId);
     rwLock.writeLock().lock();
     hasPartitionDataOnDisk.add(partitionId);
     rwLock.writeLock().unlock();
-    offloadInMemoryPartitionData(partitionId, getPath(basePath, partitionId));
+    return offloadInMemoryPartitionData(partitionId,
+        getPath(basePath, partitionId));
   }
 
   /**
-   * Offloads raw data buffers of a given partition to disk
+   * Offloads raw data buffers of a given partition to disk, and returns the
+   * number of bytes offloaded from memory to disk.
    *
    * @param partitionId id of the partition to offload its raw data buffers
    * @param basePath path to offload the data to
+   * @return number of bytes offloaded from memory to disk
    * @throws IOException
    */
-  public void offloadBuffers(int partitionId, String basePath)
+  public long offloadBuffers(int partitionId, String basePath)
       throws IOException {
     Pair<Integer, List<T>> pair = dataBuffers.get(partitionId);
     if (pair == null || pair.getLeft() < minBufferSizeToOffload) {
-      return;
+      return 0;
     }
     ReadWriteLock rwLock = getPartitionLock(partitionId);
     rwLock.writeLock().lock();
@@ -272,6 +283,7 @@ public abstract class OutOfCoreDataManager<T> {
       writeEntry(entry, dos);
     }
     dos.close();
+    long numBytes = dos.size();
     int numBuffers = pair.getRight().size();
     Integer oldNumBuffersOnDisk =
         numDataBuffersOnDisk.putIfAbsent(partitionId, numBuffers);
@@ -279,6 +291,7 @@ public abstract class OutOfCoreDataManager<T> {
       numDataBuffersOnDisk.replace(partitionId,
           oldNumBuffersOnDisk + numBuffers);
     }
+    return numBytes;
   }
 
   /**
@@ -345,24 +358,27 @@ public abstract class OutOfCoreDataManager<T> {
   protected abstract T readNextEntry(DataInput in) throws IOException;
 
   /**
-   * Loads data of a partition into data store.
+   * Loads data of a partition into data store. Returns number of bytes loaded.
    *
    * @param partitionId id of the partition to load its data
    * @param path path from which data should be loaded
+   * @return number of bytes loaded from disk to memory
    * @throws IOException
    */
-  protected abstract void loadInMemoryPartitionData(int partitionId,
+  protected abstract long loadInMemoryPartitionData(int partitionId,
                                                     String path)
       throws IOException;
 
   /**
-   * Offloads data of a partition in data store to disk.
+   * Offloads data of a partition in data store to disk. Returns the number of
+   * bytes offloaded to disk
    *
    * @param partitionId id of the partition to offload to disk
    * @param path path to which data should be offloaded
+   * @return number of bytes offloaded from memory to disk
    * @throws IOException
    */
-  protected abstract void offloadInMemoryPartitionData(int partitionId,
+  protected abstract long offloadInMemoryPartitionData(int partitionId,
                                                        String path)
       throws IOException;
 

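For illustration only (not part of the patch): with the changed signatures above, each concrete data manager is expected to report the number of bytes it moves between memory and disk. A hedged sketch of how the two abstract methods might be implemented, with hypothetical class and file-handling details:

import java.io.BufferedOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;

/** Hypothetical data manager illustrating the new byte-count contract. */
class CountingDataManager {

  protected long offloadInMemoryPartitionData(int partitionId, String path)
      throws IOException {
    DataOutputStream dos = new DataOutputStream(
        new BufferedOutputStream(new FileOutputStream(new File(path))));
    // ... serialize the partition's in-memory data here ...
    dos.close();
    // DataOutputStream#size() is the number of bytes written so far.
    return dos.size();
  }

  protected long loadInMemoryPartitionData(int partitionId, String path)
      throws IOException {
    File file = new File(path);
    long numBytes = file.length();
    DataInputStream dis = new DataInputStream(new FileInputStream(file));
    // ... deserialize the partition's data here ...
    dis.close();
    // The whole file was brought back into memory.
    return numBytes;
  }
}

In the real code these would override the abstract methods of OutOfCoreDataManager; the sketch only shows where the returned byte counts could come from.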
http://git-wip-us.apache.org/repos/asf/giraph/blob/6256a761/giraph-core/src/main/java/org/apache/giraph/ooc/io/IOCommand.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/io/IOCommand.java b/giraph-core/src/main/java/org/apache/giraph/ooc/io/IOCommand.java
index eb6d2c9..e84ad29 100644
--- a/giraph-core/src/main/java/org/apache/giraph/ooc/io/IOCommand.java
+++ b/giraph-core/src/main/java/org/apache/giraph/ooc/io/IOCommand.java
@@ -27,10 +27,32 @@ import java.io.IOException;
  * out-of-core mechanism.
  */
 public abstract class IOCommand {
+  /** Type of IO command */
+  public enum IOCommandType {
+    /** Loading a partition */
+    LOAD_PARTITION,
+    /** Storing a partition */
+    STORE_PARTITION,
+    /** Storing incoming messages of a partition */
+    STORE_MESSAGE,
+    /**
+     * Storing raw data buffers (vertex/edge or message buffers) of a currently
+     * out-of-core partition
+     */
+    STORE_BUFFER,
+    /** Performing no IO, simply waiting for a while */
+    WAIT
+  }
+
   /** Id of the partition involved for the IO */
   protected final int partitionId;
   /** Out-of-core engine */
   protected final OutOfCoreEngine oocEngine;
+  /**
+   * Number of bytes transferred to/from memory (loaded/stored) during the
+   * execution of the command
+   */
+  protected long numBytesTransferred;
 
   /**
    * Constructor
@@ -41,10 +63,11 @@ public abstract class IOCommand {
   public IOCommand(OutOfCoreEngine oocEngine, int partitionId) {
     this.oocEngine = oocEngine;
     this.partitionId = partitionId;
+    this.numBytesTransferred = 0;
   }
 
   /**
-   * GEt the id of the partition involved in the IO
+   * Get the id of the partition involved in the IO
    *
    * @return id of the partition
    */
@@ -54,12 +77,30 @@ public abstract class IOCommand {
 
   /**
    * Execute (load/store of data) the IO command, and change the data stores
-   * appropriately based on the data loaded/stored.
+   * appropriately based on the data loaded/stored. Return true iff the command
+   * is actually executed (resulted in loading or storing data).
    *
    * @param basePath the base path (prefix) to the files/folders IO command
    *                 should read/write data from/to
+   * @return whether the command is actually executed
    * @throws IOException
    */
-  public abstract void execute(String basePath) throws IOException;
+  public abstract boolean execute(String basePath) throws IOException;
+
+  /**
+   * Get the type of the command.
+   *
+   * @return type of the command
+   */
+  public abstract IOCommandType getType();
+
+  /**
+   * Get the number of bytes transferred (loaded/stored from/to disk).
+   *
+   * @return number of bytes transferred during the execution of the command
+   */
+  public long bytesTransferred() {
+    return numBytesTransferred;
+  }
 }
 

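For illustration only (not part of the patch): the new execute() return value, getType(), and bytesTransferred() are enough for a caller to account for IO work per command type. A hypothetical consumer could look like this:

import java.io.IOException;
import org.apache.giraph.ooc.io.IOCommand;

/** Hypothetical per-type accounting of bytes moved by IO commands. */
class IOCommandAccounting {
  private final long[] bytesPerType =
      new long[IOCommand.IOCommandType.values().length];

  /** Run a command and, if it actually did IO, record the bytes it moved. */
  void executeAndRecord(IOCommand command, String basePath)
      throws IOException {
    boolean executed = command.execute(basePath);
    if (executed) {
      bytesPerType[command.getType().ordinal()] += command.bytesTransferred();
    }
  }

  long bytesFor(IOCommand.IOCommandType type) {
    return bytesPerType[type.ordinal()];
  }
}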
http://git-wip-us.apache.org/repos/asf/giraph/blob/6256a761/giraph-core/src/main/java/org/apache/giraph/ooc/io/LoadPartitionIOCommand.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/io/LoadPartitionIOCommand.java b/giraph-core/src/main/java/org/apache/giraph/ooc/io/LoadPartitionIOCommand.java
index c28a0da..ce24fe2 100644
--- a/giraph-core/src/main/java/org/apache/giraph/ooc/io/LoadPartitionIOCommand.java
+++ b/giraph-core/src/main/java/org/apache/giraph/ooc/io/LoadPartitionIOCommand.java
@@ -54,19 +54,22 @@ public class LoadPartitionIOCommand extends IOCommand {
   }
 
   @Override
-  public void execute(String basePath) throws IOException {
+  public boolean execute(String basePath) throws IOException {
+    boolean executed = false;
     if (oocEngine.getMetaPartitionManager()
         .startLoadingPartition(partitionId, superstep)) {
-      long currentSuperstep = oocEngine.getServiceWorker().getSuperstep();
+      long currentSuperstep = oocEngine.getSuperstep();
       DiskBackedPartitionStore partitionStore =
           (DiskBackedPartitionStore)
               oocEngine.getServerData().getPartitionStore();
-      partitionStore.loadPartitionData(partitionId, basePath);
+      numBytesTransferred +=
+          partitionStore.loadPartitionData(partitionId, basePath);
       if (currentSuperstep == BspService.INPUT_SUPERSTEP &&
           superstep == currentSuperstep) {
         DiskBackedEdgeStore edgeStore =
             (DiskBackedEdgeStore) oocEngine.getServerData().getEdgeStore();
-        edgeStore.loadPartitionData(partitionId, basePath);
+        numBytesTransferred +=
+            edgeStore.loadPartitionData(partitionId, basePath);
       }
       MessageStore messageStore;
       if (currentSuperstep == superstep) {
@@ -76,12 +79,19 @@ public class LoadPartitionIOCommand extends IOCommand {
         messageStore = oocEngine.getServerData().getIncomingMessageStore();
       }
       if (messageStore != null) {
-        ((DiskBackedMessageStore) messageStore)
+        numBytesTransferred += ((DiskBackedMessageStore) messageStore)
             .loadPartitionData(partitionId, basePath);
       }
       oocEngine.getMetaPartitionManager()
           .doneLoadingPartition(partitionId, superstep);
+      executed = true;
     }
+    return executed;
+  }
+
+  @Override
+  public IOCommandType getType() {
+    return IOCommandType.LOAD_PARTITION;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/giraph/blob/6256a761/giraph-core/src/main/java/org/apache/giraph/ooc/io/StoreDataBufferIOCommand.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/io/StoreDataBufferIOCommand.java b/giraph-core/src/main/java/org/apache/giraph/ooc/io/StoreDataBufferIOCommand.java
index 41a0682..f1769dd 100644
--- a/giraph-core/src/main/java/org/apache/giraph/ooc/io/StoreDataBufferIOCommand.java
+++ b/giraph-core/src/main/java/org/apache/giraph/ooc/io/StoreDataBufferIOCommand.java
@@ -54,7 +54,8 @@ public class StoreDataBufferIOCommand extends IOCommand {
   }
 
   @Override
-  public void execute(String basePath) throws IOException {
+  public boolean execute(String basePath) throws IOException {
+    boolean executed = false;
     if (oocEngine.getMetaPartitionManager()
         .startOffloadingBuffer(partitionId)) {
       switch (type) {
@@ -62,23 +63,32 @@ public class StoreDataBufferIOCommand extends IOCommand {
         DiskBackedPartitionStore partitionStore =
             (DiskBackedPartitionStore)
                 oocEngine.getServerData().getPartitionStore();
-        partitionStore.offloadBuffers(partitionId, basePath);
+        numBytesTransferred +=
+            partitionStore.offloadBuffers(partitionId, basePath);
         DiskBackedEdgeStore edgeStore =
             (DiskBackedEdgeStore) oocEngine.getServerData().getEdgeStore();
-        edgeStore.offloadBuffers(partitionId, basePath);
+        numBytesTransferred += edgeStore.offloadBuffers(partitionId, basePath);
         break;
       case MESSAGE:
         DiskBackedMessageStore messageStore =
             (DiskBackedMessageStore)
                 oocEngine.getServerData().getIncomingMessageStore();
-        messageStore.offloadBuffers(partitionId, basePath);
+        numBytesTransferred +=
+            messageStore.offloadBuffers(partitionId, basePath);
         break;
       default:
         throw new IllegalStateException("execute: requested data buffer type " +
             "does not exist!");
       }
       oocEngine.getMetaPartitionManager().doneOffloadingBuffer(partitionId);
+      executed = true;
     }
+    return executed;
+  }
+
+  @Override
+  public IOCommandType getType() {
+    return IOCommandType.STORE_BUFFER;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/giraph/blob/6256a761/giraph-core/src/main/java/org/apache/giraph/ooc/io/StoreIncomingMessageIOCommand.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/io/StoreIncomingMessageIOCommand.java b/giraph-core/src/main/java/org/apache/giraph/ooc/io/StoreIncomingMessageIOCommand.java
index 9c1c0a2..c9d8829 100644
--- a/giraph-core/src/main/java/org/apache/giraph/ooc/io/StoreIncomingMessageIOCommand.java
+++ b/giraph-core/src/main/java/org/apache/giraph/ooc/io/StoreIncomingMessageIOCommand.java
@@ -41,16 +41,25 @@ public class StoreIncomingMessageIOCommand extends IOCommand {
   }
 
   @Override
-  public void execute(String basePath) throws IOException {
+  public boolean execute(String basePath) throws IOException {
+    boolean executed = false;
     if (oocEngine.getMetaPartitionManager()
         .startOffloadingMessages(partitionId)) {
       DiskBackedMessageStore messageStore =
           (DiskBackedMessageStore)
               oocEngine.getServerData().getIncomingMessageStore();
       checkState(messageStore != null);
-      messageStore.offloadPartitionData(partitionId, basePath);
+      numBytesTransferred +=
+          messageStore.offloadPartitionData(partitionId, basePath);
       oocEngine.getMetaPartitionManager().doneOffloadingMessages(partitionId);
+      executed = true;
     }
+    return executed;
+  }
+
+  @Override
+  public IOCommandType getType() {
+    return IOCommandType.STORE_MESSAGE;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/giraph/blob/6256a761/giraph-core/src/main/java/org/apache/giraph/ooc/io/StorePartitionIOCommand.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/io/StorePartitionIOCommand.java b/giraph-core/src/main/java/org/apache/giraph/ooc/io/StorePartitionIOCommand.java
index 77955dc..797ac9d 100644
--- a/giraph-core/src/main/java/org/apache/giraph/ooc/io/StorePartitionIOCommand.java
+++ b/giraph-core/src/main/java/org/apache/giraph/ooc/io/StorePartitionIOCommand.java
@@ -44,29 +44,38 @@ public class StorePartitionIOCommand extends IOCommand {
   }
 
   @Override
-  public void execute(String basePath) throws IOException {
+  public boolean execute(String basePath) throws IOException {
+    boolean executed = false;
     if (oocEngine.getMetaPartitionManager()
         .startOffloadingPartition(partitionId)) {
       DiskBackedPartitionStore partitionStore =
           (DiskBackedPartitionStore)
               oocEngine.getServerData().getPartitionStore();
-      partitionStore.offloadPartitionData(partitionId, basePath);
-      if (oocEngine.getServiceWorker().getSuperstep() !=
-          BspService.INPUT_SUPERSTEP) {
+      numBytesTransferred +=
+          partitionStore.offloadPartitionData(partitionId, basePath);
+      if (oocEngine.getSuperstep() != BspService.INPUT_SUPERSTEP) {
         MessageStore messageStore =
             oocEngine.getServerData().getCurrentMessageStore();
         if (messageStore != null) {
-          ((DiskBackedMessageStore) messageStore)
+          numBytesTransferred += ((DiskBackedMessageStore) messageStore)
               .offloadPartitionData(partitionId, basePath);
         }
       } else {
         DiskBackedEdgeStore edgeStore =
             (DiskBackedEdgeStore)
                 oocEngine.getServerData().getEdgeStore();
-        edgeStore.offloadPartitionData(partitionId, basePath);
+        numBytesTransferred +=
+            edgeStore.offloadPartitionData(partitionId, basePath);
       }
       oocEngine.getMetaPartitionManager().doneOffloadingPartition(partitionId);
+      executed = true;
     }
+    return executed;
+  }
+
+  @Override
+  public IOCommandType getType() {
+    return IOCommandType.STORE_PARTITION;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/giraph/blob/6256a761/giraph-core/src/main/java/org/apache/giraph/ooc/io/WaitIOCommand.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/io/WaitIOCommand.java b/giraph-core/src/main/java/org/apache/giraph/ooc/io/WaitIOCommand.java
index b6e0546..74e72eb 100644
--- a/giraph-core/src/main/java/org/apache/giraph/ooc/io/WaitIOCommand.java
+++ b/giraph-core/src/main/java/org/apache/giraph/ooc/io/WaitIOCommand.java
@@ -42,13 +42,19 @@ public class WaitIOCommand extends IOCommand {
   }
 
   @Override
-  public void execute(String basePath) throws IOException {
+  public boolean execute(String basePath) throws IOException {
     try {
       TimeUnit.MILLISECONDS.sleep(waitDuration);
     } catch (InterruptedException e) {
       throw new IllegalStateException("execute: caught InterruptedException " +
           "while IO thread is waiting!");
     }
+    return true;
+  }
+
+  @Override
+  public IOCommandType getType() {
+    return IOCommandType.WAIT;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/giraph/blob/6256a761/giraph-core/src/main/java/org/apache/giraph/utils/AdjustableSemaphore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/AdjustableSemaphore.java b/giraph-core/src/main/java/org/apache/giraph/utils/AdjustableSemaphore.java
index 09c1bdf..43a92a8 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/AdjustableSemaphore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/AdjustableSemaphore.java
@@ -38,6 +38,12 @@ public final class AdjustableSemaphore extends Semaphore {
     maxPermits = permits;
   }
 
+  @edu.umd.cs.findbugs.annotations.SuppressWarnings(
+      "UG_SYNC_SET_UNSYNC_GET")
+  public int getMaxPermits() {
+    return maxPermits;
+  }
+
   /**
    * Adjusts the maximum number of available permits.
    *

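Editor's note: this hunk exposes getMaxPermits() on AdjustableSemaphore, with a FindBugs suppression because the getter reads the field without synchronization. For context, here is a standalone sketch of how such an adjustable-capacity semaphore is typically built on java.util.concurrent.Semaphore; setMaxPermits below is this sketch's name for the adjust method, not necessarily the one used in the Giraph class.

import java.util.concurrent.Semaphore;

// Standalone sketch, not the Giraph class itself.
final class AdjustableSemaphoreSketch extends Semaphore {
  private int maxPermits;

  AdjustableSemaphoreSketch(int permits) {
    super(permits);
    this.maxPermits = permits;
  }

  /** Current maximum; an unsynchronized read may be slightly stale, which is
   *  why the real getter carries the UG_SYNC_SET_UNSYNC_GET suppression. */
  int getMaxPermits() {
    return maxPermits;
  }

  /** Grow or shrink the maximum number of available permits at runtime. */
  synchronized void setMaxPermits(int newMax) {
    int delta = newMax - maxPermits;
    if (delta > 0) {
      release(delta);           // add permits
    } else if (delta < 0) {
      reducePermits(-delta);    // remove permits (count may go negative)
    }
    maxPermits = newMax;
  }
}
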
http://git-wip-us.apache.org/repos/asf/giraph/blob/6256a761/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
index d29e46d..bf48ea8 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
@@ -75,6 +75,7 @@ import org.apache.giraph.metrics.GiraphTimerContext;
 import org.apache.giraph.metrics.ResetSuperstepMetricsObserver;
 import org.apache.giraph.metrics.SuperstepMetricsRegistry;
 import org.apache.giraph.metrics.WorkerSuperstepMetrics;
+import org.apache.giraph.ooc.OutOfCoreEngine;
 import org.apache.giraph.partition.Partition;
 import org.apache.giraph.partition.PartitionExchange;
 import org.apache.giraph.partition.PartitionOwner;
@@ -213,6 +214,10 @@ public class BspServiceWorker<I extends WritableComparable,
     workerClient = new NettyWorkerClient<I, V, E>(context, conf, this,
         graphTaskManager.createUncaughtExceptionHandler());
     workerServer.setFlowControl(workerClient.getFlowControl());
+    OutOfCoreEngine oocEngine = workerServer.getServerData().getOocEngine();
+    if (oocEngine != null) {
+      oocEngine.setFlowControl(workerClient.getFlowControl());
+    }
 
     workerAggregatorRequestProcessor =
         new NettyWorkerAggregatorRequestProcessor(getContext(), conf, this);

http://git-wip-us.apache.org/repos/asf/giraph/blob/6256a761/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java b/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java
index b7f1eb6..87583ed 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java
@@ -27,6 +27,7 @@ import org.apache.giraph.io.EdgeInputFormat;
 import org.apache.giraph.io.EdgeReader;
 import org.apache.giraph.io.filters.EdgeInputFilter;
 import org.apache.giraph.io.InputType;
+import org.apache.giraph.ooc.OutOfCoreEngine;
 import org.apache.giraph.utils.LoggerUtils;
 import org.apache.giraph.utils.MemoryUtils;
 import org.apache.hadoop.io.Writable;
@@ -154,7 +155,16 @@ public class EdgeInputSplitsCallable<I extends WritableComparable,
     long inputSplitEdgesLoaded = 0;
     long inputSplitEdgesFiltered = 0;
 
+    int count = 0;
+    OutOfCoreEngine oocEngine = bspServiceWorker.getServerData().getOocEngine();
     while (edgeReader.nextEdge()) {
+      // If the out-of-core mechanism is used, check whether this thread
+      // can stay active or whether it should temporarily suspend and stop
+      // processing and generating more data for the moment.
+      if (oocEngine != null &&
+          (++count & OutOfCoreEngine.CHECK_IN_INTERVAL) == 0) {
+        oocEngine.activeThreadCheckIn();
+      }
       I sourceId = edgeReader.getCurrentSourceId();
       Edge<I, E> readerEdge = edgeReader.getCurrentEdge();
       if (sourceId == null) {

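Editor's note: the edge-loading loop above now checks in with the out-of-core engine once every CHECK_IN_INTERVAL edges, using a bit-and as a cheap modulo. Below is a self-contained sketch of that pattern; it assumes the interval constant has the form 2^k - 1 (so the mask test is equivalent to "count % 2^k == 0"), and the mask value and helper names are illustrative, not Giraph's.

// Sketch of the periodic check-in pattern.
final class CheckInLoopSketch {
  private static final int CHECK_IN_MASK = (1 << 10) - 1; // every 1024 items

  interface CheckInTarget {
    /** Blocks the calling thread if the out-of-core engine asked it to pause. */
    void activeThreadCheckIn();
  }

  static void processAll(java.util.Iterator<String> items, CheckInTarget ooc) {
    int count = 0;
    while (items.hasNext()) {
      if (ooc != null && (++count & CHECK_IN_MASK) == 0) {
        ooc.activeThreadCheckIn(); // cheap most of the time, blocks only if asked
      }
      String item = items.next();
      // ... load 'item' into the local stores ...
    }
  }
}
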
http://git-wip-us.apache.org/repos/asf/giraph/blob/6256a761/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java b/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java
index 92b23bd..40a3bb0 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java
@@ -23,6 +23,7 @@ import java.io.DataInputStream;
 import java.io.IOException;
 import java.util.concurrent.Callable;
 
+import org.apache.giraph.bsp.CentralizedServiceWorker;
 import org.apache.giraph.comm.WorkerClientRequestProcessor;
 import org.apache.giraph.comm.netty.NettyWorkerClientRequestProcessor;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
@@ -33,6 +34,7 @@ import org.apache.giraph.metrics.GiraphMetrics;
 import org.apache.giraph.metrics.GiraphMetricsRegistry;
 import org.apache.giraph.metrics.MeterDesc;
 import org.apache.giraph.metrics.MetricNames;
+import org.apache.giraph.ooc.OutOfCoreEngine;
 import org.apache.giraph.time.SystemTime;
 import org.apache.giraph.time.Time;
 import org.apache.giraph.time.Times;
@@ -78,8 +80,9 @@ public abstract class InputSplitsCallable<I extends WritableComparable,
   private final long startNanos = TIME.getNanoseconds();
   /** Whether to prioritize local input splits. */
   private final boolean useLocality;
+  /** Service worker */
+  private final CentralizedServiceWorker<I, V, E> serviceWorker;
 
-  // CHECKSTYLE: stop ParameterNumberCheck
   /**
    * Constructor.
    *
@@ -101,8 +104,8 @@ public abstract class InputSplitsCallable<I extends WritableComparable,
     this.useLocality = configuration.useInputSplitLocality();
     this.splitsHandler = splitsHandler;
     this.configuration = configuration;
+    this.serviceWorker = bspServiceWorker;
   }
-  // CHECKSTYLE: resume ParameterNumberCheck
 
   /**
    * Get input format
@@ -208,13 +211,26 @@ public abstract class InputSplitsCallable<I extends WritableComparable,
     byte[] serializedInputSplit;
     int inputSplitsProcessed = 0;
     try {
+      OutOfCoreEngine oocEngine = serviceWorker.getServerData().getOocEngine();
+      if (oocEngine != null) {
+        oocEngine.processingThreadStart();
+      }
       while ((serializedInputSplit =
           splitsHandler.reserveInputSplit(getInputType())) != null) {
+        // If the out-of-core mechanism is used, check whether this thread
+        // can stay active or whether it should temporarily suspend and stop
+        // processing and generating more data for the moment.
+        if (oocEngine != null) {
+          oocEngine.activeThreadCheckIn();
+        }
         vertexEdgeCount = vertexEdgeCount.incrVertexEdgeCount(
             loadInputSplit(serializedInputSplit));
         context.progress();
         ++inputSplitsProcessed;
       }
+      if (oocEngine != null) {
+        oocEngine.processingThreadFinish();
+      }
     } catch (InterruptedException e) {
       throw new IllegalStateException("call: InterruptedException", e);
     } catch (IOException e) {

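Editor's note: here the input-split loop is bracketed by processingThreadStart() and processingThreadFinish(), with an activeThreadCheckIn() per reserved split, so the out-of-core engine knows how many producer threads exist and can pause them under memory pressure. A minimal sketch of that producer lifecycle follows; the OocEngine interface is a stand-in for Giraph's OutOfCoreEngine, and placing the finish call in a finally block is this sketch's choice (the patch calls it at the end of the try block).

final class ProducerLifecycleSketch {
  interface OocEngine {
    void processingThreadStart();
    void activeThreadCheckIn();
    void processingThreadFinish();
  }

  static int consumeSplits(java.util.Queue<byte[]> splits, OocEngine ooc) {
    int processed = 0;
    if (ooc != null) {
      ooc.processingThreadStart();   // register this producer thread
    }
    try {
      byte[] split;
      while ((split = splits.poll()) != null) {
        if (ooc != null) {
          ooc.activeThreadCheckIn(); // may block if memory pressure is high
        }
        // ... deserialize and load the split ...
        ++processed;
      }
    } finally {
      if (ooc != null) {
        ooc.processingThreadFinish(); // deregister this producer thread
      }
    }
    return processed;
  }
}
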
http://git-wip-us.apache.org/repos/asf/giraph/blob/6256a761/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java b/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java
index 540a6b4..a3f1300 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java
@@ -31,6 +31,7 @@ import org.apache.giraph.io.VertexReader;
 import org.apache.giraph.io.filters.VertexInputFilter;
 import org.apache.giraph.mapping.translate.TranslateEdge;
 import org.apache.giraph.io.InputType;
+import org.apache.giraph.ooc.OutOfCoreEngine;
 import org.apache.giraph.partition.PartitionOwner;
 import org.apache.giraph.utils.LoggerUtils;
 import org.apache.giraph.utils.MemoryUtils;
@@ -168,7 +169,16 @@ public class VertexInputSplitsCallable<I extends WritableComparable,
     long edgesSinceLastUpdate = 0;
     long inputSplitEdgesLoaded = 0;
 
+    int count = 0;
+    OutOfCoreEngine oocEngine = bspServiceWorker.getServerData().getOocEngine();
     while (vertexReader.nextVertex()) {
+      // If the out-of-core mechanism is used, check whether this thread
+      // can stay active or whether it should temporarily suspend and stop
+      // processing and generating more data for the moment.
+      if (oocEngine != null &&
+          (++count & OutOfCoreEngine.CHECK_IN_INTERVAL) == 0) {
+        oocEngine.activeThreadCheckIn();
+      }
       Vertex<I, V, E> readerVertex = vertexReader.getCurrentVertex();
       if (readerVertex.getId() == null) {
         throw new IllegalArgumentException(

http://git-wip-us.apache.org/repos/asf/giraph/blob/6256a761/giraph-core/src/test/java/org/apache/giraph/partition/TestPartitionStores.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/partition/TestPartitionStores.java b/giraph-core/src/test/java/org/apache/giraph/partition/TestPartitionStores.java
index 7893940..3bb35eb 100644
--- a/giraph-core/src/test/java/org/apache/giraph/partition/TestPartitionStores.java
+++ b/giraph-core/src/test/java/org/apache/giraph/partition/TestPartitionStores.java
@@ -24,6 +24,7 @@ import org.apache.commons.io.FileUtils;
 import org.apache.giraph.bsp.BspService;
 import org.apache.giraph.bsp.CentralizedServiceWorker;
 import org.apache.giraph.comm.ServerData;
+import org.apache.giraph.comm.netty.NettyClient;
 import org.apache.giraph.conf.GiraphConfiguration;
 import org.apache.giraph.conf.GiraphConstants;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
@@ -279,13 +280,14 @@ public class TestPartitionStores {
     GiraphConstants.STATIC_GRAPH.set(conf, true);
     testMultiThreaded();
   }
-/*
+
   @Test
   public void testDiskBackedPartitionStoreAdaptiveOOC() throws Exception {
     GiraphConstants.STATIC_GRAPH.set(conf, true);
+    NettyClient.LIMIT_OPEN_REQUESTS_PER_WORKER.set(conf, true);
     testMultiThreaded();
   }
-*/
+
   private void testMultiThreaded() throws Exception {
     final AtomicInteger vertexCounter = new AtomicInteger(0);
     ExecutorService pool = Executors.newFixedThreadPool(NUM_OF_THREADS);

http://git-wip-us.apache.org/repos/asf/giraph/blob/6256a761/giraph-examples/src/test/java/org/apache/giraph/TestOutOfCore.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/test/java/org/apache/giraph/TestOutOfCore.java b/giraph-examples/src/test/java/org/apache/giraph/TestOutOfCore.java
index 6fdfc75..397605d 100644
--- a/giraph-examples/src/test/java/org/apache/giraph/TestOutOfCore.java
+++ b/giraph-examples/src/test/java/org/apache/giraph/TestOutOfCore.java
@@ -18,6 +18,7 @@
 
 package org.apache.giraph;
 
+import org.apache.giraph.comm.netty.NettyClient;
 import org.apache.giraph.conf.GiraphConfiguration;
 import org.apache.giraph.conf.GiraphConstants;
 import org.apache.giraph.examples.GeneratedVertexReader;
@@ -62,12 +63,13 @@ public class TestOutOfCore extends BspCase {
         SimplePageRankComputation.SimplePageRankWorkerContext.class);
     conf.setMasterComputeClass(
         SimplePageRankComputation.SimplePageRankMasterCompute.class);
+    GiraphConstants.METRICS_ENABLE.set(conf, true);
     GiraphConstants.USER_PARTITION_COUNT.set(conf, NUM_PARTITIONS);
     GiraphConstants.USE_OUT_OF_CORE_GRAPH.set(conf, true);
+    NettyClient.LIMIT_OPEN_REQUESTS_PER_WORKER.set(conf, true);
     GiraphConstants.MAX_PARTITIONS_IN_MEMORY.set(conf, NUM_PARTITIONS_IN_MEMORY);
     GiraphConstants.NUM_COMPUTE_THREADS.set(conf, 8);
     GiraphConstants.NUM_INPUT_THREADS.set(conf, 8);
-    GiraphConstants.NUM_OOC_THREADS.set(conf, 4);
     GiraphConstants.NUM_OUTPUT_THREADS.set(conf, 8);
     GiraphConstants.PARTITIONS_DIRECTORY.set(conf, "disk0,disk1,disk2");
     GiraphJob job = prepareJob(getCallingMethodName(), conf,