You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ed...@apache.org on 2016/03/15 18:40:36 UTC

[4/5] git commit: updated refs/heads/trunk to fafecee

http://git-wip-us.apache.org/repos/asf/giraph/blob/fafecee7/giraph-core/src/main/java/org/apache/giraph/ooc/DiskBackedPartitionStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/DiskBackedPartitionStore.java b/giraph-core/src/main/java/org/apache/giraph/ooc/DiskBackedPartitionStore.java
deleted file mode 100644
index cdafa3f..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/ooc/DiskBackedPartitionStore.java
+++ /dev/null
@@ -1,2149 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.ooc;
-
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import com.google.common.hash.HashFunction;
-import com.google.common.hash.Hashing;
-import org.apache.commons.lang3.tuple.MutablePair;
-import org.apache.commons.lang3.tuple.Pair;
-import org.apache.giraph.bsp.CentralizedServiceWorker;
-import org.apache.giraph.comm.messages.MessageStore;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.conf.IntConfOption;
-import org.apache.giraph.edge.OutEdges;
-import org.apache.giraph.graph.Vertex;
-import org.apache.giraph.partition.Partition;
-import org.apache.giraph.partition.PartitionStore;
-import org.apache.giraph.utils.ByteArrayOneMessageToManyIds;
-import org.apache.giraph.utils.ByteArrayVertexIdEdges;
-import org.apache.giraph.utils.ByteArrayVertexIdMessages;
-import org.apache.giraph.utils.ExtendedDataOutput;
-import org.apache.giraph.utils.PairList;
-import org.apache.giraph.utils.VertexIdEdges;
-import org.apache.giraph.utils.VertexIdMessages;
-import org.apache.giraph.utils.VertexIterator;
-import org.apache.giraph.utils.WritableUtils;
-import org.apache.giraph.worker.BspServiceWorker;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.log4j.Logger;
-
-import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
-import java.io.DataInput;
-import java.io.DataInputStream;
-import java.io.DataOutput;
-import java.io.DataOutputStream;
-import java.io.EOFException;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import static org.apache.giraph.conf.GiraphConstants.MAX_PARTITIONS_IN_MEMORY;
-import static org.apache.giraph.conf.GiraphConstants.ONE_MB;
-import static org.apache.giraph.conf.GiraphConstants.PARTITIONS_DIRECTORY;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Preconditions.checkState;
-
-/**
- * Disk-backed PartitionStore. An instance of this class can be coupled with an
- * out-of-core engine. Out-of-core engine is responsible to determine when to
- * offload and what to offload to disk. The instance of this class handles the
- * interactions with disk.
- *
- * This class provides efficient scheduling mechanism while iterating over
- * partitions. It prefers spilling in-memory processed partitions, but the
- * scheduling can be improved upon further.
- *
- * @param <I> Vertex id
- * @param <V> Vertex data
- * @param <E> Edge data
- */
-@SuppressWarnings("rawtypes")
-public class DiskBackedPartitionStore<I extends WritableComparable,
-    V extends Writable, E extends Writable>
-    extends PartitionStore<I, V, E> {
-  /**
-   * Minimum size of a buffer (in bytes) to flush to disk. This is used to
-   * decide whether vertex/edge buffers are large enough to flush to disk.
-   */
-  public static final IntConfOption MINIMUM_BUFFER_SIZE_TO_FLUSH =
-      new IntConfOption("giraph.flushBufferSize", 8 * ONE_MB,
-          "Minimum size of a buffer (in bytes) to flush to disk. ");
-
-  /** Class logger. */
-  private static final Logger LOG =
-      Logger.getLogger(DiskBackedPartitionStore.class);
-
-  /** Cached value for MINIMUM_BUFFER_SIZE_TO_FLUSH */
-  private final int minBuffSize;
-  /**
-   * States the partition can be found in:
-   * INIT: the partition has just been created
-   * ACTIVE: there is at least one thread who holds a reference to the partition
-   *         and uses it
-   * INACTIVE: the partition is not being used by anyone, but it is in memory
-   * IN_TRANSIT: the partition is being transferred to disk, the transfer is
-   *             not yet complete
-   * ON_DISK: the partition resides on disk
-   */
-  private enum State { INIT, ACTIVE, INACTIVE, IN_TRANSIT, ON_DISK };
-
-  /** Hash map containing all the partitions  */
-  private final ConcurrentMap<Integer, MetaPartition> partitions =
-    Maps.newConcurrentMap();
-
-  /**
-   * Contains partitions that has been processed in the current iteration cycle,
-   * and are not in use by any thread. The 'State' of these partitions can only
-   * be INACTIVE, IN_TRANSIT, and ON_DISK.
-   */
-  private final Map<State, Set<Integer>> processedPartitions;
-  /**
-   * Contains partitions that has *not* been processed in the current iteration
-   * cycle. Similar to processedPartitions, 'State' if these partitions can only
-   * be INACTIVE, IN_TRANSIT, and ON_DISK.
-   */
-  private final Map<State, Set<Integer>> unProcessedPartitions;
-
-  /**
-   * Read/Write lock to avoid interleaving of the process of starting a new
-   * iteration cycle and the process of spilling data to disk. This is necessary
-   * as starting a new iteration changes the data structure holding data that is
-   * being spilled to disk. Spilling of different data can happen at the same
-   * time (a read lock used for spilling), and cannot be overlapped with
-   * change of data structure holding the data.
-   */
-  private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
-
-  /** Base path where the partition files are written to */
-  private final String[] basePaths;
-  /** Used to hash partition Ids */
-  private final HashFunction hasher = Hashing.murmur3_32();
-  /** Maximum number of partition slots in memory */
-  private final AtomicInteger maxPartitionsInMem = new AtomicInteger(-1);
-  /** Number of slots used */
-  private final AtomicInteger numPartitionsInMem = new AtomicInteger(0);
-
-  /** Out-of-core engine */
-  private final OutOfCoreEngine oocEngine;
-  /** If moving of edges to vertices in INPUT_SUPERSTEP has been started */
-  private volatile boolean movingEdges;
-  /** Whether the partition store is initialized */
-  private volatile AtomicBoolean isInitialized;
-  /** Whether the number of partitions are fixed as requested by user */
-  private final boolean isNumPartitionsFixed;
-
-  /**
-   * Map of partition ids to list of input vertex buffers. The map will have an
-   * entry only for partitions that are currently out-of-core. We keep the
-   * aggregate size of buffers in as part of the values of the map to estimate
-   * how much memory would be free if we offload this buffer to disk.
-   */
-  private final ConcurrentMap<Integer, Pair<Integer, List<ExtendedDataOutput>>>
-      pendingInputVertices = Maps.newConcurrentMap();
-  /**
-   * When a partition is out-of-core, and we also offloaded some of its vertex
-   * buffers, we have to keep track of how many buffers we offloaded to disk.
-   * This contains this value for out-of-core partitions.
-   */
-  private final ConcurrentMap<Integer, Integer> numPendingInputVerticesOnDisk =
-      Maps.newConcurrentMap();
-  /** Lock to avoid overlap of addition and removal on pendingInputVertices */
-  private final ReadWriteLock vertexBufferRWLock = new ReentrantReadWriteLock();
-
-  /**
-   * Similar to vertex buffer, but used for input edges (see comments for
-   * pendingInputVertices).
-   */
-  private final ConcurrentMap<Integer, Pair<Integer, List<VertexIdEdges<I, E>>>>
-      pendingInputEdges = Maps.newConcurrentMap();
-  /** Similar to numPendingInputVerticesOnDisk but used for edge buffers */
-  private final ConcurrentMap<Integer, Integer> numPendingInputEdgesOnDisk =
-      Maps.newConcurrentMap();
-  /** Lock to avoid overlap of addition and removal on pendingInputEdges */
-  private final ReadWriteLock edgeBufferRWLock = new ReentrantReadWriteLock();
-
-  /**
-   * For each out-of-core partitions, whether its edge store is also
-   * offloaded to disk in INPUT_SUPERSTEP.
-   */
-  private final ConcurrentMap<Integer, Boolean> hasEdgeStoreOnDisk =
-      Maps.newConcurrentMap();
-
-  /**
-   * Type of VertexIdMessage class (container for serialized messages) received
-   * for a particular message. If we write the received messages to disk before
-   * adding them to message store, we need this type when we want to read the
-   * messages back from disk (so that we know how to read the messages from
-   * disk).
-   */
-  private enum SerializedMessageClass {
-    /** ByteArrayVertexIdMessages */
-    BYTE_ARRAY_VERTEX_ID_MESSAGES,
-    /** ByteArrayOneMEssageToManyIds */
-    BYTE_ARRAY_ONE_MESSAGE_TO_MANY_IDS
-  }
-
-  /**
-   * Similar to vertex buffer and edge buffer, but used for messages (see
-   * comments for pendingInputVertices).
-   */
-  private volatile ConcurrentMap<Integer,
-      Pair<Integer, List<VertexIdMessages<I, Writable>>>>
-      pendingIncomingMessages = Maps.newConcurrentMap();
-  /** Whether a partition has any incoming message buffer on disk */
-  private volatile ConcurrentMap<Integer, Boolean> incomingMessagesOnDisk =
-      Maps.newConcurrentMap();
-
-  /**
-   * Similar to pendingIncomingMessages, but is used for messages for current
-   * superstep instead.
-   */
-  private volatile ConcurrentMap<Integer,
-      Pair<Integer, List<VertexIdMessages<I, Writable>>>>
-      pendingCurrentMessages = Maps.newConcurrentMap();
-  /** Similar to incomingMessagesOnDisk for messages for current superstep */
-  private volatile ConcurrentMap<Integer, Boolean> currentMessagesOnDisk =
-      Maps.newConcurrentMap();
-
-  /**
-   * Lock to avoid overlap of addition and removal of pending message buffers
-   */
-  private final ReadWriteLock messageBufferRWLock =
-      new ReentrantReadWriteLock();
-
-  /**
-   * Constructor
-   *
-   * @param conf Configuration
-   * @param context Context
-   * @param serviceWorker service worker reference
-   */
-  public DiskBackedPartitionStore(
-      ImmutableClassesGiraphConfiguration<I, V, E> conf,
-      Mapper<?, ?, ?, ?>.Context context,
-      CentralizedServiceWorker<I, V, E> serviceWorker) {
-    super(conf, context, serviceWorker);
-    this.minBuffSize = MINIMUM_BUFFER_SIZE_TO_FLUSH.get(conf);
-    int userMaxNumPartitions = MAX_PARTITIONS_IN_MEMORY.get(conf);
-    if (userMaxNumPartitions > 0) {
-      this.isNumPartitionsFixed = true;
-      this.maxPartitionsInMem.set(userMaxNumPartitions);
-      oocEngine = null;
-    } else {
-      this.isNumPartitionsFixed = false;
-      this.oocEngine =
-          new AdaptiveOutOfCoreEngine<I, V, E>(conf, serviceWorker);
-    }
-    this.movingEdges = false;
-    this.isInitialized = new AtomicBoolean(false);
-
-    this.processedPartitions = Maps.newHashMap();
-    this.processedPartitions
-        .put(State.INACTIVE, Sets.<Integer>newLinkedHashSet());
-    this.processedPartitions
-        .put(State.IN_TRANSIT, Sets.<Integer>newLinkedHashSet());
-    this.processedPartitions
-        .put(State.ON_DISK, Sets.<Integer>newLinkedHashSet());
-
-    this.unProcessedPartitions = Maps.newHashMap();
-    this.unProcessedPartitions
-        .put(State.INACTIVE, Sets.<Integer>newLinkedHashSet());
-    this.unProcessedPartitions
-        .put(State.IN_TRANSIT, Sets.<Integer>newLinkedHashSet());
-    this.unProcessedPartitions
-        .put(State.ON_DISK, Sets.<Integer>newLinkedHashSet());
-
-    // Take advantage of multiple disks
-    String[] userPaths = PARTITIONS_DIRECTORY.getArray(conf);
-    basePaths = new String[userPaths.length];
-    int i = 0;
-    for (String path : userPaths) {
-      basePaths[i++] = path + "/" + conf.get("mapred.job.id", "Unknown Job");
-    }
-    if (LOG.isInfoEnabled()) {
-      LOG.info("DiskBackedPartitionStore with isStaticGraph=" +
-          conf.isStaticGraph() + ((userMaxNumPartitions > 0) ?
-          (" with maximum " + userMaxNumPartitions + " partitions in memory.") :
-          "."));
-    }
-  }
-
-  /**
-   * @return maximum number of partitions allowed in memory
-   */
-  public int getNumPartitionSlots() {
-    return maxPartitionsInMem.get();
-  }
-
-  /**
-   * @return number of partitions in memory
-   */
-  public int getNumPartitionInMemory() {
-    return numPartitionsInMem.get();
-  }
-
-  /**
-   * Sets the maximum number of partitions allowed in memory
-   *
-   * @param numPartitions Number of partitions to allow in memory
-   */
-  public void setNumPartitionSlots(int numPartitions) {
-    maxPartitionsInMem.set(numPartitions);
-  }
-
-  @Override
-  public void initialize() {
-    // "initialize" is called right before partition assignment in setup
-    // process. However, it might be the case that this worker is a bit slow
-    // and other workers start sending vertices/edges (in input superstep)
-    // to this worker before the initialize is called. So, we put a guard in
-    // necessary places to make sure the 'initialize' is called at a proper time
-    // and also only once.
-    if (isInitialized.compareAndSet(false, true)) {
-      // Set the maximum number of partition slots in memory if unset
-      if (maxPartitionsInMem.get() == -1) {
-        maxPartitionsInMem.set(serviceWorker.getNumPartitionsOwned());
-        // Check if master has not done partition assignment yet (may happen in
-        // test codes)
-        if (maxPartitionsInMem.get() == 0) {
-          LOG.warn("initialize: partitions assigned to this worker is not " +
-              "known yet");
-          maxPartitionsInMem.set(partitions.size());
-          if (maxPartitionsInMem.get() == 0) {
-            maxPartitionsInMem.set(Integer.MAX_VALUE);
-          }
-        }
-        if (LOG.isInfoEnabled()) {
-          LOG.info("initialize: set the max number of partitions in memory " +
-              "to " + maxPartitionsInMem.get());
-        }
-        oocEngine.initialize();
-      }
-    }
-  }
-
-  @Override
-  public Iterable<Integer> getPartitionIds() {
-    return Iterables.unmodifiableIterable(partitions.keySet());
-  }
-
-  @Override
-  public boolean hasPartition(final Integer id) {
-    return partitions.containsKey(id);
-  }
-
-  @Override
-  public int getNumPartitions() {
-    return partitions.size();
-  }
-
-  @Override
-  public long getPartitionVertexCount(Integer partitionId) {
-    MetaPartition meta = partitions.get(partitionId);
-    if (meta == null) {
-      return 0;
-    } else if (meta.getState() == State.ON_DISK) {
-      return meta.getVertexCount();
-    } else {
-      return meta.getPartition().getVertexCount();
-    }
-  }
-
-  @Override
-  public long getPartitionEdgeCount(Integer partitionId) {
-    MetaPartition meta = partitions.get(partitionId);
-    if (meta == null) {
-      return 0;
-    } else if (meta.getState() == State.ON_DISK) {
-      return meta.getEdgeCount();
-    } else {
-      return meta.getPartition().getEdgeCount();
-    }
-  }
-
-  /**
-   * Spill one partition to disk.
-   */
-  @edu.umd.cs.findbugs.annotations.SuppressWarnings("TLW_TWO_LOCK_WAIT")
-  private void swapOnePartitionToDisk() {
-    Integer partitionId;
-    // The only partitions in memory are IN_TRANSIT, ACTIVE, and INACTIVE ones.
-    // If a partition is currently in transit, it means an OOC thread is
-    // pushing the partition to disk, or a compute thread is swapping the
-    // partition to open up space for another partition. So, such partitions
-    // eventually will free up space in memory. However, this method is usually
-    // called at critical points where freeing up space in memory is crucial.
-    // So, we should look into a partition to swap amongst other in-memory
-    // partitions. An in-memory partition that is not in-transit can be in
-    // three states:
-    //   1) already processed, which we can simply swap it to disk,
-    //   2) non-processed but active (means someone is in the middle of
-    //      processing the partition). In this case we cannot touch the
-    //      partition until its processing is done.
-    //   3) un-processed and inactive. It is bad to swap this partition to disk
-    //      as someone will load it again for processing in future. But, this
-    //      method is called to strictly swap a partition to disk. So, if there
-    //      is no partition in state 1, we should swap a partition in state 3 to
-    //      disk.
-    rwLock.readLock().lock();
-    synchronized (processedPartitions) {
-      partitionId = popFromSet(processedPartitions.get(State.INACTIVE));
-    }
-    if (partitionId == null) {
-      synchronized (unProcessedPartitions) {
-        partitionId = popFromSet(unProcessedPartitions.get(State.INACTIVE));
-      }
-      if (partitionId == null) {
-        // At this point some partitions are being processed and we should
-        // wait until their processing is done
-        synchronized (processedPartitions) {
-          partitionId = popFromSet(processedPartitions.get(State.INACTIVE));
-          while (partitionId == null) {
-            try {
-              // Here is the only place we wait on 'processedPartition', and
-              // this wait is only for INACTIVE entry of the map. So, only at
-              // times where a partition is added to INACTIVE entry of this map,
-              // we should call '.notifyAll()'. Although this might seem a bad
-              // practice, decoupling the INACTIVE entry from this map makes the
-              // synchronization mechanism cumbersome and error-prone.
-              processedPartitions.wait();
-            } catch (InterruptedException e) {
-              throw new IllegalStateException("swapOnePartitionToDisk: Caught" +
-                  "InterruptedException while waiting on a partition to" +
-                  "become inactive in memory and swapping it to disk");
-            }
-            partitionId = popFromSet(processedPartitions.get(State.INACTIVE));
-          }
-        }
-      }
-    }
-
-    if (LOG.isInfoEnabled()) {
-      LOG.info("swapOnePartitionToDisk: decided to swap partition " +
-          partitionId + " to disk");
-    }
-    MetaPartition swapOutPartition = partitions.get(partitionId);
-    checkNotNull(swapOutPartition,
-        "swapOnePartitionToDisk: the partition is not found to spill to disk " +
-            "(impossible)");
-
-    // Since the partition is popped from the maps, it is not going to be
-    // processed (or change its process state) until spilling of the partition
-    // is done (the only way to access a partition is through
-    // processedPartitions or unProcessedPartitions Map, so once a partition is
-    // popped from a map, there is no need for synchronization on that
-    // partition).
-    Map<State, Set<Integer>> ownerMap = (swapOutPartition.isProcessed()) ?
-        processedPartitions :
-        unProcessedPartitions;
-
-    // Here is the *only* place that holds a lock on an in-transit partition.
-    // Anywhere else in the code should call wait() on the in-transit partition
-    // to release the lock. This is an important optimization as we are no
-    // longer have to keep the lock while partition is being transferred to
-    // disk.
-    synchronized (swapOutPartition) {
-      swapOutPartition.setState(State.IN_TRANSIT);
-      synchronized (ownerMap) {
-        ownerMap.get(State.IN_TRANSIT).add(partitionId);
-      }
-    }
-
-    try {
-      if (LOG.isInfoEnabled()) {
-        LOG.info("swapOnePartitionToDisk: start swapping partition " +
-            partitionId + " to disk.");
-      }
-      offloadPartition(swapOutPartition);
-      if (LOG.isInfoEnabled()) {
-        LOG.info("swapOnePartitionToDisk: done swapping partition " +
-            partitionId + " to disk.");
-      }
-    } catch (IOException e) {
-      throw new IllegalStateException(
-          "swapOnePartitionToDisk: Failed while offloading partition " +
-              partitionId);
-    }
-
-    synchronized (swapOutPartition) {
-      synchronized (ownerMap) {
-        boolean stillInMap = ownerMap.get(State.IN_TRANSIT).remove(partitionId);
-        swapOutPartition.setOnDisk();
-        numPartitionsInMem.getAndDecrement();
-        // If a compute thread gets an IN_TRANSIT partition (as the last resort
-        // to get the next partition to process), 'swapOutPartition' may no
-        // longer be in its map. But, if it is in its own map, we should update
-        // the map.
-        if (stillInMap) {
-          ownerMap.get(State.ON_DISK).add(partitionId);
-        }
-      }
-      // notifying all threads that are waiting for this partition to spill to
-      // disk.
-      swapOutPartition.notifyAll();
-    }
-    rwLock.readLock().unlock();
-  }
-
-  /**
-   * Decrement maximum number of partitions allowed in memory by one and pushes
-   * one partition to disk if necessary.
-   */
-  public void spillOnePartition() {
-    if (maxPartitionsInMem.getAndDecrement() <= numPartitionsInMem.get()) {
-      swapOnePartitionToDisk();
-    }
-  }
-
-  @Override
-  public Partition<I, V, E> getNextPartition() {
-    Integer partitionId;
-    // We prioritize accesses to currently in-memory partitions first. If there
-    // is no such partition, we choose amongst on-disk partitions. This is a
-    // preferable choice over in-transit partitions since we can start bringing
-    // on-disk partitions to memory right away, while if we choose in-transit
-    // partitions, we first have to wait for the transit to be complete, and
-    // then bring the partition back in memory again.
-    synchronized (unProcessedPartitions) {
-      partitionId = popFromSet(unProcessedPartitions.get(State.INACTIVE));
-      if (partitionId == null) {
-        partitionId = popFromSet(unProcessedPartitions.get(State.ON_DISK));
-        if (partitionId == null) {
-          partitionId = popFromSet(unProcessedPartitions.get(State.IN_TRANSIT));
-        }
-      }
-    }
-
-    // Check if we are at the end of the current iteration cycle
-    if (partitionId == null) {
-      return null;
-    }
-
-    MetaPartition meta = partitions.get(partitionId);
-    checkNotNull(meta, "getNextPartition: partition " + partitionId +
-        " does not exist (impossible)");
-
-    // The only time we iterate through all partitions in INPUT_SUPERSTEP is
-    // when we want to move
-    // edges from edge store to vertices. So, we check if we have anything in
-    // edge store for the chosen partition, and if there is no edge store for
-    // this partition, we skip processing it. This avoids unnecessary loading
-    // of on-disk partitions that does not have edge store.
-    if (movingEdges) {
-      boolean shouldProcess = false;
-      synchronized (meta) {
-        if (meta.getState() == State.INACTIVE) {
-          shouldProcess = edgeStore.hasPartitionEdges(partitionId);
-        } else { // either ON_DISK or IN_TRANSIT
-          Integer numBuf = numPendingInputEdgesOnDisk.get(partitionId);
-          Boolean hasStore = hasEdgeStoreOnDisk.get(partitionId);
-          shouldProcess =
-              (numBuf != null && numBuf != 0) || (hasStore != null && hasStore);
-        }
-        if (!shouldProcess) {
-          meta.setProcessed(true);
-          synchronized (processedPartitions) {
-            processedPartitions.get(meta.getState()).add(partitionId);
-            if (meta.getState() == State.INACTIVE) {
-              processedPartitions.notifyAll();
-            }
-          }
-        }
-      }
-      if (!shouldProcess) {
-        return getNextPartition();
-      }
-    }
-    getPartition(meta);
-    return meta.getPartition();
-  }
-
-  /**
-   * Method that gets a partition from the store.
-   * The partition is produced as a side effect of the computation and is
-   * reflected inside the META object provided as parameter.
-   * This function is thread-safe since it locks the whole computation
-   * on the MetaPartition provided.
-   *
-   * When a thread tries to access an element on disk, it waits until space
-   * becomes available in memory by swapping partitions to disk.
-   *
-   * @param meta meta partition container with the partition itself
-   */
-  private void getPartition(MetaPartition meta) {
-    int partitionId = meta.getId();
-    synchronized (meta) {
-      boolean partitionInMemory = false;
-      while (!partitionInMemory) {
-        switch (meta.getState()) {
-        case INACTIVE:
-          // Check if the message store for the current superstep is in memory,
-          // and if not, load it from the disk.
-          Boolean messagesOnDisk = currentMessagesOnDisk.get(partitionId);
-          if (messagesOnDisk != null && messagesOnDisk) {
-            try {
-              loadMessages(partitionId);
-            } catch (IOException e) {
-              throw new IllegalStateException("getPartition: failed while " +
-                  "loading messages of current superstep for partition " +
-                  partitionId);
-            }
-          }
-          meta.setState(State.ACTIVE);
-          partitionInMemory = true;
-          break;
-        case IN_TRANSIT:
-          try {
-            // Wait until the partition transfer to disk is complete
-            meta.wait();
-          } catch (InterruptedException e) {
-            throw new IllegalStateException("getPartition: exception " +
-                "while waiting on IN_TRANSIT partition " + partitionId + " to" +
-                " fully spill to disk.");
-          }
-          break;
-        case ON_DISK:
-          boolean spaceAvailable = false;
-
-          while (numPartitionsInMem.get() >= maxPartitionsInMem.get()) {
-            swapOnePartitionToDisk();
-          }
-
-          // Reserve the space in memory for the partition
-          if (numPartitionsInMem.incrementAndGet() <=
-              maxPartitionsInMem.get()) {
-            spaceAvailable = true;
-          } else {
-            numPartitionsInMem.decrementAndGet();
-          }
-
-          if (spaceAvailable) {
-            Partition<I, V, E> partition;
-            try {
-              if (LOG.isInfoEnabled()) {
-                LOG.info("getPartition: start reading partition " +
-                    partitionId + " from disk");
-              }
-              partition = loadPartition(meta);
-              if (LOG.isInfoEnabled()) {
-                LOG.info("getPartition: done reading partition " +
-                    partitionId + " from disk");
-              }
-            } catch (IOException e) {
-              LOG.error("getPartition: Failed while Loading Partition " +
-                  "from disk: " + e.getMessage());
-              throw new IllegalStateException(e);
-            }
-            meta.setActive(partition);
-            partitionInMemory = true;
-          }
-          break;
-        default:
-          throw new IllegalStateException("illegal state " + meta.getState() +
-              " for partition " + meta.getId());
-        }
-      }
-    }
-  }
-
-  @Override
-  public void prepareSuperstep() {
-    rwLock.writeLock().lock();
-    super.prepareSuperstep();
-    pendingCurrentMessages = pendingIncomingMessages;
-    currentMessagesOnDisk = incomingMessagesOnDisk;
-    pendingIncomingMessages = Maps.newConcurrentMap();
-    incomingMessagesOnDisk = Maps.newConcurrentMap();
-    rwLock.writeLock().unlock();
-  }
-
-  /**
-   * Spill message buffers (either buffers for messages for current superstep,
-   * or buffers for incoming messages) of a given partition to disk. Note that
-   * the partition should be ON_DISK or IN_TRANSIT.
-   *
-   * @param partitionId Id of the partition to spill its message buffers
-   * @throws IOException
-   */
-  @edu.umd.cs.findbugs.annotations.SuppressWarnings(
-      "UL_UNRELEASED_LOCK_EXCEPTION_PATH")
-  public void spillPartitionMessages(Integer partitionId) throws IOException {
-    rwLock.readLock().lock();
-    spillMessages(partitionId, pendingCurrentMessages,
-        serviceWorker.getSuperstep());
-    spillMessages(partitionId, pendingIncomingMessages,
-        serviceWorker.getSuperstep() + 1);
-    rwLock.readLock().unlock();
-  }
-
-  /**
-   * Spill message buffers of a particular type of message (current or incoming
-   * buffer) for a partition to disk.
-   *
-   * @param partitionId Id of the partition to spill the messages for
-   * @param pendingMessages The map to get the message buffers from
-   * @param superstep Superstep of which we want to offload messages. This is
-   *                  equal to current superstep number if we want to offload
-   *                  buffers for currentMessageStore, and is equal to next
-   *                  superstep number if we want to offload buffer for
-   *                  incomingMessageStore
-   * @throws IOException
-   */
-  private void spillMessages(Integer partitionId,
-      ConcurrentMap<Integer, Pair<Integer, List<VertexIdMessages<I, Writable>>>>
-          pendingMessages, long superstep) throws IOException {
-    Pair<Integer, List<VertexIdMessages<I, Writable>>> entry;
-    messageBufferRWLock.writeLock().lock();
-    entry = pendingMessages.remove(partitionId);
-    if (entry != null && entry.getLeft() < minBuffSize) {
-      pendingMessages.put(partitionId, entry);
-      entry = null;
-    }
-    messageBufferRWLock.writeLock().unlock();
-
-    if (entry == null) {
-      return;
-    }
-
-    // Sanity check
-    checkState(!entry.getRight().isEmpty(),
-        "spillMessages: the message buffer that is supposed to be flushed to " +
-            "disk does not exist.");
-
-    File file = new File(getPendingMessagesBufferPath(partitionId, superstep));
-
-    FileOutputStream fos = new FileOutputStream(file, true);
-    BufferedOutputStream bos = new BufferedOutputStream(fos);
-    DataOutputStream dos = new DataOutputStream(bos);
-    for (VertexIdMessages<I, Writable> messages : entry.getRight()) {
-      SerializedMessageClass messageClass;
-      if (messages instanceof ByteArrayVertexIdMessages) {
-        messageClass = SerializedMessageClass.BYTE_ARRAY_VERTEX_ID_MESSAGES;
-      } else if (messages instanceof ByteArrayOneMessageToManyIds) {
-        messageClass =
-            SerializedMessageClass.BYTE_ARRAY_ONE_MESSAGE_TO_MANY_IDS;
-      } else {
-        throw new IllegalStateException("spillMessages: serialized message " +
-            "type is not supported");
-      }
-      dos.writeInt(messageClass.ordinal());
-      messages.write(dos);
-    }
-    dos.close();
-  }
-
-  /**
-   * Looks through all partitions already on disk, and see if any of them has
-   * enough pending message in its buffer in memory. This can be message buffer
-   * of current superstep, or incoming superstep. If so, put that partition
-   * along with an approximate amount of memory it took (in bytes) in a list to
-   * return.
-
-   * @return List of pairs (partitionId, sizeInByte) of the partitions where
-   *         their pending messages are worth flushing to disk
-   */
-  public PairList<Integer, Integer> getOocPartitionIdsWithPendingMessages() {
-    PairList<Integer, Integer> pairList = new PairList<>();
-    pairList.initialize();
-    Set<Integer> partitionIds = Sets.newHashSet();
-    // First, iterating over pending incoming messages
-    if (pendingIncomingMessages != null) {
-      for (Entry<Integer, Pair<Integer, List<VertexIdMessages<I, Writable>>>>
-           entry : pendingIncomingMessages.entrySet()) {
-        if (entry.getValue().getLeft() > minBuffSize) {
-          pairList.add(entry.getKey(), entry.getValue().getLeft());
-          partitionIds.add(entry.getKey());
-        }
-      }
-    }
-    // Second, iterating over pending current messages (i.e. pending incoming
-    // messages of previous superstep)
-    if (pendingCurrentMessages != null) {
-      for (Entry<Integer, Pair<Integer, List<VertexIdMessages<I, Writable>>>>
-           entry : pendingCurrentMessages.entrySet()) {
-        if (entry.getValue().getLeft() > minBuffSize &&
-            !partitionIds.contains(entry.getKey())) {
-          pairList.add(entry.getKey(), entry.getValue().getLeft());
-        }
-      }
-    }
-    return pairList;
-  }
-
-  @Override
-  public <M extends Writable> void addPartitionCurrentMessages(
-      int partitionId, VertexIdMessages<I, M> messages) throws IOException {
-    // Current messages are only added to the store in the event of partition
-    // migration. Presumably the partition has just migrated and its data is
-    // still available in memory. Note that partition migration only happens at
-    // the beginning of a superstep.
-    ((MessageStore<I, M>) currentMessageStore)
-        .addPartitionMessages(partitionId, messages);
-  }
-
-  @Override
-  public <M extends Writable> void addPartitionIncomingMessages(
-      int partitionId, VertexIdMessages<I, M> messages) throws IOException {
-    if (conf.getIncomingMessageClasses().useMessageCombiner()) {
-      ((MessageStore<I, M>) incomingMessageStore)
-          .addPartitionMessages(partitionId, messages);
-    } else {
-      MetaPartition meta = partitions.get(partitionId);
-      checkNotNull(meta, "addPartitionIncomingMessages: trying to add " +
-          "messages to partition " + partitionId + " which does not exist " +
-          "in the partition set of this worker!");
-
-      synchronized (meta) {
-        switch (meta.getState()) {
-        case INACTIVE:
-        case ACTIVE:
-          // A partition might be in memory, but its message store might still
-          // be on disk. This happens because while we are loading the partition
-          // to memory, we only load its current messages, not the incoming
-          // messages. If a new superstep has been started, while the partition
-          // is still in memory, the incoming message store in the previous
-          // superstep (which is the current messages in the current superstep)
-          // is on disk.
-          // This may also happen when a partition is offloaded to disk while
-          // it was unprocessed, and then again loaded in the same superstep for
-          // processing.
-          Boolean isMsgOnDisk = incomingMessagesOnDisk.get(partitionId);
-          if (isMsgOnDisk == null || !isMsgOnDisk) {
-            ((MessageStore<I, M>) incomingMessageStore)
-                .addPartitionMessages(partitionId, messages);
-            break;
-          }
-          // Continue to IN_TRANSIT and ON_DISK cases as the partition is in
-          // memory, but it's messages are not yet loaded
-          // CHECKSTYLE: stop FallThrough
-        case IN_TRANSIT:
-        case ON_DISK:
-          // CHECKSTYLE: resume FallThrough
-          List<VertexIdMessages<I, Writable>> newMessages =
-              new ArrayList<VertexIdMessages<I, Writable>>();
-          newMessages.add((VertexIdMessages<I, Writable>) messages);
-          int length = messages.getSerializedSize();
-          Pair<Integer, List<VertexIdMessages<I, Writable>>> newPair =
-              new MutablePair<>(length, newMessages);
-          messageBufferRWLock.readLock().lock();
-          Pair<Integer, List<VertexIdMessages<I, Writable>>> oldPair =
-              pendingIncomingMessages.putIfAbsent(partitionId, newPair);
-          if (oldPair != null) {
-            synchronized (oldPair) {
-              MutablePair<Integer, List<VertexIdMessages<I, Writable>>> pair =
-                  (MutablePair<Integer, List<VertexIdMessages<I, Writable>>>)
-                      oldPair;
-              pair.setLeft(pair.getLeft() + length);
-              pair.getRight().add((VertexIdMessages<I, Writable>) messages);
-            }
-          }
-          messageBufferRWLock.readLock().unlock();
-          // In the case that the number of partitions is asked to be fixed by
-          // the user, we should offload the message buffers as necessary.
-          if (isNumPartitionsFixed &&
-              pendingIncomingMessages.get(partitionId).getLeft() >
-                  minBuffSize) {
-            try {
-              spillPartitionMessages(partitionId);
-            } catch (IOException e) {
-              throw new IllegalStateException("addPartitionIncomingMessages: " +
-                  "spilling message buffers for partition " + partitionId +
-                  " failed!");
-            }
-          }
-          break;
-        default:
-          throw new IllegalStateException("addPartitionIncomingMessages: " +
-              "illegal state " + meta.getState() + " for partition " +
-              meta.getId());
-        }
-      }
-    }
-  }
-
-  /**
-   * Spills edge store generated for specified partition in INPUT_SUPERSTEP
-   * Note that the partition should be ON_DISK or IN_TRANSIT.
-   *
-   * @param partitionId Id of partition to spill its edge buffer
-   */
-  public void spillPartitionInputEdgeStore(Integer partitionId)
-      throws IOException {
-    rwLock.readLock().lock();
-    if (movingEdges) {
-      rwLock.readLock().unlock();
-      return;
-    }
-    Pair<Integer, List<VertexIdEdges<I, E>>> entry;
-
-    // Look at the comment for the similar logic in
-    // 'spillPartitionInputVertexBuffer' for why this lock is necessary.
-    edgeBufferRWLock.writeLock().lock();
-    entry = pendingInputEdges.remove(partitionId);
-    edgeBufferRWLock.writeLock().unlock();
-
-    // Check if the intermediate edge store has already been moved to vertices
-    if (entry == null) {
-      rwLock.readLock().unlock();
-      return;
-    }
-
-    // Sanity check
-    checkState(!entry.getRight().isEmpty(),
-        "spillPartitionInputEdgeStore: the edge buffer that is supposed to " +
-            "be flushed to disk does not exist.");
-
-    List<VertexIdEdges<I, E>> bufferList = entry.getRight();
-    Integer numBuffers = numPendingInputEdgesOnDisk.putIfAbsent(partitionId,
-        bufferList.size());
-    if (numBuffers != null) {
-      numPendingInputEdgesOnDisk.replace(partitionId,
-          numBuffers + bufferList.size());
-    }
-
-    File file = new File(getPendingEdgesBufferPath(partitionId));
-    FileOutputStream fos = new FileOutputStream(file, true);
-    BufferedOutputStream bos = new BufferedOutputStream(fos);
-    DataOutputStream dos = new DataOutputStream(bos);
-    for (VertexIdEdges<I, E> edges : entry.getRight()) {
-      edges.write(dos);
-    }
-    dos.close();
-    rwLock.readLock().unlock();
-  }
-
-  /**
-   * Looks through all partitions already on disk, and see if any of them has
-   * enough pending edges in its buffer in memory. If so, put that
-   * partition along with an approximate amount of memory it took (in bytes) in
-   * a list to return.
-
-   * @return List of pairs (partitionId, sizeInByte) of the partitions where
-   *         their pending edge store in input superstep in worth flushing to
-   *         disk
-   */
-  public PairList<Integer, Integer> getOocPartitionIdsWithPendingInputEdges() {
-    PairList<Integer, Integer> pairList = new PairList<>();
-    pairList.initialize();
-    if (!movingEdges) {
-      for (Entry<Integer, Pair<Integer, List<VertexIdEdges<I, E>>>> entry :
-          pendingInputEdges.entrySet()) {
-        if (entry.getValue().getLeft() > minBuffSize) {
-          pairList.add(entry.getKey(), entry.getValue().getLeft());
-        }
-      }
-    }
-    return pairList;
-  }
-
-  @Override
-  @edu.umd.cs.findbugs.annotations.SuppressWarnings("SF_SWITCH_FALLTHROUGH")
-  public void addPartitionEdges(Integer partitionId,
-      VertexIdEdges<I, E> edges) {
-    if (!isInitialized.get()) {
-      initialize();
-    }
-
-    MetaPartition meta = new MetaPartition(partitionId);
-    MetaPartition temp = partitions.putIfAbsent(partitionId, meta);
-    if (temp != null) {
-      meta = temp;
-    }
-
-    boolean createPartition = false;
-    synchronized (meta) {
-      switch (meta.getState()) {
-      case INIT:
-        Partition<I, V, E> partition =
-            conf.createPartition(partitionId, context);
-        meta.setPartition(partition);
-        // This is set to processed so that in the very next iteration cycle,
-        // when startIteration is called, all partitions seem to be processed
-        // and ready for the next iteration cycle. Otherwise, startIteration
-        // fails in its sanity check due to finding an unprocessed partition.
-        meta.setProcessed(true);
-        numPartitionsInMem.getAndIncrement();
-        meta.setState(State.INACTIVE);
-      synchronized (processedPartitions) {
-        processedPartitions.get(State.INACTIVE).add(partitionId);
-        processedPartitions.notifyAll();
-      }
-        createPartition = true;
-        // Continue to INACTIVE case to add the edges to the partition
-        // CHECKSTYLE: stop FallThrough
-      case INACTIVE:
-        // CHECKSTYLE: resume FallThrough
-        edgeStore.addPartitionEdges(partitionId, edges);
-        break;
-      case IN_TRANSIT:
-      case ON_DISK:
-        // Adding edges to in-memory buffer of the partition
-        List<VertexIdEdges<I, E>> newEdges =
-            new ArrayList<VertexIdEdges<I, E>>();
-        newEdges.add(edges);
-        int length = edges.getSerializedSize();
-        Pair<Integer, List<VertexIdEdges<I, E>>> newPair =
-            new MutablePair<>(length, newEdges);
-        edgeBufferRWLock.readLock().lock();
-        Pair<Integer, List<VertexIdEdges<I, E>>> oldPair =
-            pendingInputEdges.putIfAbsent(partitionId, newPair);
-        if (oldPair != null) {
-          synchronized (oldPair) {
-            MutablePair<Integer, List<VertexIdEdges<I, E>>> pair =
-                (MutablePair<Integer, List<VertexIdEdges<I, E>>>) oldPair;
-            pair.setLeft(pair.getLeft() + length);
-            pair.getRight().add(edges);
-          }
-        }
-        edgeBufferRWLock.readLock().unlock();
-        // In the case that the number of partitions is asked to be fixed by the
-        // user, we should offload the edge store as necessary.
-        if (isNumPartitionsFixed &&
-            pendingInputEdges.get(partitionId).getLeft() > minBuffSize) {
-          try {
-            spillPartitionInputEdgeStore(partitionId);
-          } catch (IOException e) {
-            throw new IllegalStateException("addPartitionEdges: spilling " +
-                "edge store for partition " + partitionId + " failed!");
-          }
-        }
-        break;
-      default:
-        throw new IllegalStateException("illegal state " + meta.getState() +
-            " for partition " + meta.getId());
-      }
-    }
-    // If creation of a new partition is violating the policy of maximum number
-    // of partitions in memory, we should spill a partition to disk.
-    if (createPartition &&
-        numPartitionsInMem.get() > maxPartitionsInMem.get()) {
-      swapOnePartitionToDisk();
-    }
-  }
-
-  /**
-   * Spills vertex buffer generated for specified partition in INPUT_SUPERSTEP
-   * Note that the partition should be ON_DISK or IN_TRANSIT.
-   *
-   * @param partitionId Id of partition to spill its vertex buffer
-   */
-  public void spillPartitionInputVertexBuffer(Integer partitionId)
-      throws IOException {
-    rwLock.readLock().lock();
-    if (movingEdges) {
-      rwLock.readLock().unlock();
-      return;
-    }
-    Pair<Integer, List<ExtendedDataOutput>> entry;
-    // Synchronization on the concurrent map is necessary to avoid inconsistent
-    // structure while execution of this method is interleaved with the
-    // execution of addPartitionVertices. For instance, consider
-    // the case where a thread wants to modify the value of an entry in
-    // addPartitionVertices while another thread is running this
-    // method removing the entry from the map. If removal and offloading the
-    // entry's value to disk happens first, the modification by former thread
-    // would be lost.
-    vertexBufferRWLock.writeLock().lock();
-    entry = pendingInputVertices.remove(partitionId);
-    vertexBufferRWLock.writeLock().unlock();
-
-    // Check if vertex buffer has already been merged with the partition
-    if (entry == null) {
-      rwLock.readLock().unlock();
-      return;
-    }
-    // Sanity check
-    checkState(!entry.getRight().isEmpty(),
-        "spillPartitionInputVertexBuffer: the vertex buffer that is " +
-            "supposed to be flushed to disk does not exist.");
-
-    List<ExtendedDataOutput> bufferList = entry.getRight();
-    Integer numBuffers = numPendingInputVerticesOnDisk.putIfAbsent(partitionId,
-        bufferList.size());
-    if (numBuffers != null) {
-      numPendingInputVerticesOnDisk.replace(partitionId,
-          numBuffers + bufferList.size());
-    }
-
-    File file = new File(getPendingVerticesBufferPath(partitionId));
-    FileOutputStream fos = new FileOutputStream(file, true);
-    BufferedOutputStream bos = new BufferedOutputStream(fos);
-    DataOutputStream dos = new DataOutputStream(bos);
-    for (ExtendedDataOutput extendedDataOutput : bufferList) {
-      WritableUtils.writeExtendedDataOutput(extendedDataOutput, dos);
-    }
-    dos.close();
-    rwLock.readLock().unlock();
-  }
-
-  /**
-   * Looks through all partitions already on disk, and see if any of them has
-   * enough pending vertices in its buffer in memory. If so, put that
-   * partition along with an approximate amount of memory it took (in bytes) in
-   * a list to return.
-   *
-   * @return List of pairs (partitionId, sizeInByte) of the partitions where
-   *         their pending vertex buffer in input superstep is worth flushing to
-   *         disk
-   */
-  public PairList<Integer, Integer>
-  getOocPartitionIdsWithPendingInputVertices() {
-    PairList<Integer, Integer> pairList = new PairList<>();
-    pairList.initialize();
-    if (!movingEdges) {
-      for (Entry<Integer, Pair<Integer, List<ExtendedDataOutput>>> entry :
-          pendingInputVertices.entrySet()) {
-        if (entry.getValue().getLeft() > minBuffSize) {
-          pairList.add(entry.getKey(), entry.getValue().getLeft());
-        }
-      }
-    }
-    return pairList;
-  }
-
-  @Override
-  @edu.umd.cs.findbugs.annotations.SuppressWarnings("SF_SWITCH_FALLTHROUGH")
-  public void addPartitionVertices(Integer partitionId,
-      ExtendedDataOutput extendedDataOutput) {
-    if (!isInitialized.get()) {
-      initialize();
-    }
-
-    MetaPartition meta = new MetaPartition(partitionId);
-    MetaPartition temp = partitions.putIfAbsent(partitionId, meta);
-    if (temp != null) {
-      meta = temp;
-    }
-
-    boolean createPartition = false;
-    synchronized (meta) {
-      switch (meta.getState()) {
-      case INIT:
-        Partition<I, V, E> partition =
-            conf.createPartition(partitionId, context);
-        meta.setPartition(partition);
-        // Look at the comments in 'addPartitionVertices' for why we set the
-        // this to true.
-        meta.setProcessed(true);
-        numPartitionsInMem.getAndIncrement();
-        meta.setState(State.INACTIVE);
-      synchronized (processedPartitions) {
-        processedPartitions.get(State.INACTIVE).add(partitionId);
-        processedPartitions.notifyAll();
-      }
-        createPartition = true;
-        // Continue to INACTIVE case to add the vertices to the partition
-        // CHECKSTYLE: stop FallThrough
-      case INACTIVE:
-        // CHECKSTYLE: resume FallThrough
-        meta.getPartition().addPartitionVertices(
-            new VertexIterator<I, V, E>(extendedDataOutput, conf));
-        break;
-      case IN_TRANSIT:
-      case ON_DISK:
-        // Adding vertices to in-memory buffer of the partition
-        List<ExtendedDataOutput> vertices = new ArrayList<ExtendedDataOutput>();
-        vertices.add(extendedDataOutput);
-        int length = extendedDataOutput.getPos();
-        Pair<Integer, List<ExtendedDataOutput>> newPair =
-            new MutablePair<>(length, vertices);
-        vertexBufferRWLock.readLock().lock();
-        Pair<Integer, List<ExtendedDataOutput>> oldPair =
-            pendingInputVertices.putIfAbsent(partitionId, newPair);
-        if (oldPair != null) {
-          synchronized (oldPair) {
-            MutablePair<Integer, List<ExtendedDataOutput>> pair =
-                (MutablePair<Integer, List<ExtendedDataOutput>>) oldPair;
-            pair.setLeft(pair.getLeft() + length);
-            pair.getRight().add(extendedDataOutput);
-          }
-        }
-        vertexBufferRWLock.readLock().unlock();
-        // In the case that the number of partitions is asked to be fixed by the
-        // user, we should offload the edge store as necessary.
-        if (isNumPartitionsFixed &&
-            pendingInputVertices.get(partitionId).getLeft() > minBuffSize) {
-          try {
-            spillPartitionInputVertexBuffer(partitionId);
-          } catch (IOException e) {
-            throw new IllegalStateException("addPartitionVertices: spilling " +
-                "vertex buffer for partition " + partitionId + " failed!");
-          }
-        }
-        break;
-      default:
-        throw new IllegalStateException("illegal state " + meta.getState() +
-            " for partition " + meta.getId());
-      }
-    }
-    // If creation of a new partition is violating the policy of maximum number
-    // of partitions in memory, we should spill a partition to disk.
-    if (createPartition &&
-        numPartitionsInMem.get() > maxPartitionsInMem.get()) {
-      swapOnePartitionToDisk();
-    }
-  }
-
-  @Override
-  public void putPartition(Partition<I, V, E> partition) {
-    checkArgument(partition != null);
-
-    Integer id = partition.getId();
-    MetaPartition meta = partitions.get(id);
-    checkNotNull(meta, "putPartition: partition to put does " +
-        "not exist in the store (impossible)");
-    synchronized (meta) {
-      checkState(meta.getState() == State.ACTIVE,
-          "It is not possible to put back a partition which is not ACTIVE. " +
-              "meta = " + meta.toString());
-
-      meta.setState(State.INACTIVE);
-      meta.setProcessed(true);
-      synchronized (processedPartitions) {
-        processedPartitions.get(State.INACTIVE).add(id);
-        // Notify OOC threads waiting for a partition to become available to put
-        // on disk.
-        processedPartitions.notifyAll();
-      }
-    }
-  }
-
-  @Override
-  public Partition<I, V, E> removePartition(Integer partitionId) {
-    if (hasPartition(partitionId)) {
-      MetaPartition meta = partitions.remove(partitionId);
-      // Since this method is called outside of the iteration cycle, all
-      // partitions in the store should be in the processed state.
-      checkState(processedPartitions.get(meta.getState()).remove(partitionId),
-          "removePartition: partition that is about to remove is not in " +
-              "processed list (impossible)");
-
-      getPartition(meta);
-      numPartitionsInMem.getAndDecrement();
-      return meta.getPartition();
-    }
-    return null;
-  }
-
-  @Override
-  public boolean addPartition(Partition<I, V, E> partition) {
-    if (!isInitialized.get()) {
-      initialize();
-    }
-
-    Integer id = partition.getId();
-    MetaPartition meta = new MetaPartition(id);
-    MetaPartition temp = partitions.putIfAbsent(id, meta);
-    if (temp != null) {
-      return false;
-    }
-
-    if (LOG.isInfoEnabled()) {
-      LOG.info("addPartition: partition " + id + " is  added to the store.");
-    }
-
-    meta.setPartition(partition);
-    meta.setState(State.INACTIVE);
-    meta.setProcessed(true);
-    synchronized (processedPartitions) {
-      processedPartitions.get(State.INACTIVE).add(id);
-      processedPartitions.notifyAll();
-    }
-    numPartitionsInMem.getAndIncrement();
-    // Swapping partitions to disk if addition of this partition violates the
-    // requirement on the number of partitions.
-    if (numPartitionsInMem.get() > maxPartitionsInMem.get()) {
-      swapOnePartitionToDisk();
-    }
-    return true;
-  }
-
-  @Override
-  public void shutdown() {
-    // Sanity check to check there is nothing left from previous superstep
-    checkState(unProcessedPartitions.get(State.INACTIVE).isEmpty() &&
-            unProcessedPartitions.get(State.IN_TRANSIT).isEmpty() &&
-            unProcessedPartitions.get(State.ON_DISK).isEmpty(),
-        "shutdown: There are some unprocessed partitions left from the " +
-            "previous superstep. This should not be possible.");
-
-    for (MetaPartition meta : partitions.values()) {
-      synchronized (meta) {
-        while (meta.getState() == State.IN_TRANSIT) {
-          try {
-            meta.wait();
-          } catch (InterruptedException e) {
-            throw new IllegalStateException("shutdown: exception while" +
-                "waiting on an IN_TRANSIT partition to be written on disk");
-          }
-        }
-        if (meta.getState() == State.ON_DISK) {
-          deletePartitionFiles(meta.getId());
-        }
-      }
-    }
-
-    if (oocEngine != null) {
-      oocEngine.shutdown();
-    }
-  }
-
-  @Override
-  public void startIteration() {
-    if (!isInitialized.get()) {
-      initialize();
-    }
-    if (LOG.isInfoEnabled()) {
-      LOG.info("startIteration: with " + numPartitionsInMem.get() +
-          " partitions in memory, there can be maximum " + maxPartitionsInMem +
-          " partitions in memory out of " + partitions.size() + " that " +
-          "belongs to this worker.");
-    }
-    // Sanity check to make sure nothing left unprocessed from previous
-    // superstep
-    checkState(unProcessedPartitions.get(State.INACTIVE).isEmpty() &&
-            unProcessedPartitions.get(State.IN_TRANSIT).isEmpty() &&
-            unProcessedPartitions.get(State.ON_DISK).isEmpty(),
-        "startIteration: There are some unprocessed and/or " +
-            "in-transition partitions left from the previous superstep. " +
-            "This should not be possible.");
-
-    rwLock.writeLock().lock();
-    for (MetaPartition meta : partitions.values()) {
-      // Sanity check
-      checkState(meta.isProcessed(), "startIteration: " +
-          "meta-partition " + meta + " has not been processed in the " +
-          "previous superstep.");
-
-      // The only case where a partition can be IN_TRANSIT is where it is still
-      // being offloaded to disk, and it happens only in swapOnePartitionToDisk,
-      // where we at least hold a read lock while transfer is in progress. Since
-      // the write lock is held in this section, no partition should be
-      // IN_TRANSIT.
-      checkState(meta.getState() != State.IN_TRANSIT,
-          "startIteration: meta-partition " + meta + " is still IN_TRANSIT " +
-              "(impossible)");
-
-      meta.setProcessed(false);
-    }
-
-    unProcessedPartitions.clear();
-    unProcessedPartitions.putAll(processedPartitions);
-    processedPartitions.clear();
-    processedPartitions
-        .put(State.INACTIVE, Sets.<Integer>newLinkedHashSet());
-    processedPartitions
-        .put(State.IN_TRANSIT, Sets.<Integer>newLinkedHashSet());
-    processedPartitions
-        .put(State.ON_DISK, Sets.<Integer>newLinkedHashSet());
-    rwLock.writeLock().unlock();
-    LOG.info("startIteration: done preparing the iteration");
-  }
-
-  @Override
-  public void moveEdgesToVertices() {
-    movingEdges = true;
-    edgeStore.moveEdgesToVertices();
-    movingEdges = false;
-  }
-
-  /**
-   * Pops an entry from the specified set. This is guaranteed that the set is
-   * being accessed from within a lock.
-   *
-   * @param set set to pop an entry from
-   * @return popped entry from the given set
-   */
-  private Integer popFromSet(Set<Integer> set) {
-    if (!set.isEmpty()) {
-      Iterator<Integer> it = set.iterator();
-      Integer id = it.next();
-      it.remove();
-      return id;
-    }
-    return null;
-  }
-
-  /**
-   * Allows more partitions to be stored in memory.
-   *
-   * @param numPartitionsToIncrease How many more partitions to allow in the
-   *                                store
-   */
-  public void increasePartitionSlots(Integer numPartitionsToIncrease) {
-    maxPartitionsInMem.getAndAdd(numPartitionsToIncrease);
-    if (LOG.isInfoEnabled()) {
-      LOG.info("increasePartitionSlots: allowing partition store to have " +
-          numPartitionsToIncrease + " more partitions. Now, partition store " +
-          "can have up to " + maxPartitionsInMem.get() + " partitions.");
-    }
-  }
-
-  @Override
-  public String toString() {
-    StringBuilder sb = new StringBuilder();
-    for (MetaPartition e : partitions.values()) {
-      sb.append(e.toString() + "\n");
-    }
-    return sb.toString();
-  }
-
-  /**
-   * Writes vertex data (Id, value and halted state) to stream.
-   *
-   * @param output The output stream
-   * @param vertex The vertex to serialize
-   * @throws IOException
-   */
-  private void writeVertexData(DataOutput output, Vertex<I, V, E> vertex)
-    throws IOException {
-
-    vertex.getId().write(output);
-    vertex.getValue().write(output);
-    output.writeBoolean(vertex.isHalted());
-  }
-
-  /**
-   * Writes vertex edges (Id, edges) to stream.
-   *
-   * @param output The output stream
-   * @param vertex The vertex to serialize
-   * @throws IOException
-   */
-  @SuppressWarnings("unchecked")
-  private void writeOutEdges(DataOutput output, Vertex<I, V, E> vertex)
-    throws IOException {
-
-    vertex.getId().write(output);
-    OutEdges<I, E> edges = (OutEdges<I, E>) vertex.getEdges();
-    edges.write(output);
-  }
-
-  /**
-   * Read vertex data from an input and initialize the vertex.
-   *
-   * @param in The input stream
-   * @param vertex The vertex to initialize
-   * @throws IOException
-   */
-  private void readVertexData(DataInput in, Vertex<I, V, E> vertex)
-    throws IOException {
-
-    I id = conf.createVertexId();
-    id.readFields(in);
-    V value = conf.createVertexValue();
-    value.readFields(in);
-    OutEdges<I, E> edges = conf.createAndInitializeOutEdges(0);
-    vertex.initialize(id, value, edges);
-    if (in.readBoolean()) {
-      vertex.voteToHalt();
-    } else {
-      vertex.wakeUp();
-    }
-  }
-
-  /**
-   * Read vertex edges from an input and set them to the vertex.
-   *
-   * @param in The input stream
-   * @param partition The partition owning the vertex
-   * @throws IOException
-   */
-  @SuppressWarnings("unchecked")
-  private void readOutEdges(DataInput in, Partition<I, V, E> partition)
-    throws IOException {
-
-    I id = conf.createVertexId();
-    id.readFields(in);
-    Vertex<I, V, E> v = partition.getVertex(id);
-    OutEdges<I, E> edges = (OutEdges<I, E>) v.getEdges();
-    edges.readFields(in);
-    partition.saveVertex(v);
-  }
-
-  /**
-   * Load messages for a given partition for the current superstep to memory.
-   *
-   * @param partitionId Id of the partition to load the messages for
-   * @throws IOException
-   */
-  private void loadMessages(int partitionId) throws IOException {
-    // Messages for current superstep
-    if (currentMessageStore != null &&
-        !conf.getOutgoingMessageClasses().useMessageCombiner()) {
-      checkState(!currentMessageStore.hasMessagesForPartition(partitionId),
-          "loadMessages: partition " + partitionId + " is on disk, " +
-              "but its message store is in memory (impossible)");
-      // First, reading the message store for the partition if there is any
-      File file = new File(
-          getMessagesPath(partitionId, serviceWorker.getSuperstep()));
-      if (file.exists()) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("loadMessages: loading message store of partition " +
-              partitionId);
-        }
-        FileInputStream filein = new FileInputStream(file);
-        BufferedInputStream bufferin = new BufferedInputStream(filein);
-        DataInputStream inputStream = new DataInputStream(bufferin);
-        currentMessageStore.readFieldsForPartition(inputStream, partitionId);
-        inputStream.close();
-        checkState(file.delete(), "loadMessages: failed to delete %s.",
-            file.getAbsolutePath());
-      }
-
-      messageBufferRWLock.writeLock().lock();
-      Pair<Integer, List<VertexIdMessages<I, Writable>>> pendingMessages =
-          pendingCurrentMessages.remove(partitionId);
-      messageBufferRWLock.writeLock().unlock();
-
-      // Second, reading message buffers (incoming messages in previous
-      // superstep)
-      file = new File(getPendingMessagesBufferPath(partitionId,
-          serviceWorker.getSuperstep()));
-      if (file.exists()) {
-        FileInputStream filein = new FileInputStream(file);
-        BufferedInputStream bufferin = new BufferedInputStream(filein);
-        DataInputStream inputStream = new DataInputStream(bufferin);
-        while (true) {
-          int type;
-          try {
-            type = inputStream.readInt();
-          } catch (EOFException e) {
-            // Reached end of file, so all the records are read.
-            break;
-          }
-          SerializedMessageClass messageClass =
-              SerializedMessageClass.values()[type];
-          VertexIdMessages<I, Writable> vim;
-          switch (messageClass) {
-          case BYTE_ARRAY_VERTEX_ID_MESSAGES:
-            vim = new ByteArrayVertexIdMessages<>(
-                conf.createOutgoingMessageValueFactory());
-            vim.setConf(conf);
-            break;
-          case BYTE_ARRAY_ONE_MESSAGE_TO_MANY_IDS:
-            vim = new ByteArrayOneMessageToManyIds<>(
-                conf.createOutgoingMessageValueFactory());
-            vim.setConf(conf);
-            break;
-          default:
-            throw new IllegalStateException("loadMessages: unsupported " +
-                "serialized message type!");
-          }
-          vim.readFields(inputStream);
-          currentMessageStore.addPartitionMessages(partitionId, vim);
-        }
-        inputStream.close();
-        checkState(!file.delete(), "loadMessages: failed to delete %s",
-            file.getAbsolutePath());
-      }
-
-      // Third, applying message buffers already in memory
-      if (pendingMessages != null) {
-        for (VertexIdMessages<I, Writable> vim : pendingMessages.getRight()) {
-          currentMessageStore.addPartitionMessages(partitionId, vim);
-        }
-      }
-      currentMessagesOnDisk.put(partitionId, false);
-    }
-  }
-
-  /**
-   * Load a partition from disk. It deletes the files after the load,
-   * except for the edges, if the graph is static.
-   *
-   * @param meta meta partition to load the partition of
-   * @return The partition
-   * @throws IOException
-   */
-  @SuppressWarnings("unchecked")
-  private Partition<I, V, E> loadPartition(MetaPartition meta)
-      throws IOException {
-    Integer partitionId = meta.getId();
-    long numVertices = meta.getVertexCount();
-    Partition<I, V, E> partition = conf.createPartition(partitionId, context);
-
-    // Vertices
-    File file = new File(getVerticesPath(partitionId));
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("loadPartition: loading partition vertices " +
-        partition.getId() + " from " + file.getAbsolutePath());
-    }
-
-    FileInputStream filein = new FileInputStream(file);
-    BufferedInputStream bufferin = new BufferedInputStream(filein);
-    DataInputStream inputStream  = new DataInputStream(bufferin);
-    for (int i = 0; i < numVertices; ++i) {
-      Vertex<I, V , E> vertex = conf.createVertex();
-      readVertexData(inputStream, vertex);
-      partition.putVertex(vertex);
-    }
-    inputStream.close();
-    checkState(file.delete(), "loadPartition: failed to delete %s",
-        file.getAbsolutePath());
-
-    // Edges
-    file = new File(getEdgesPath(partitionId));
-
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("loadPartition: loading partition edges " +
-        partition.getId() + " from " + file.getAbsolutePath());
-    }
-
-    filein = new FileInputStream(file);
-    bufferin = new BufferedInputStream(filein);
-    inputStream  = new DataInputStream(bufferin);
-    for (int i = 0; i < numVertices; ++i) {
-      readOutEdges(inputStream, partition);
-    }
-    inputStream.close();
-    // If the graph is static and it is not INPUT_SUPERSTEP, keep the file
-    // around.
-    if (!conf.isStaticGraph() ||
-        serviceWorker.getSuperstep() == BspServiceWorker.INPUT_SUPERSTEP) {
-      checkState(file.delete(), "loadPartition: failed to delete %s",
-          file.getAbsolutePath());
-    }
-
-    // Load message for the current superstep
-    loadMessages(partitionId);
-
-    // Input vertex buffers
-    // First, applying vertex buffers on disk (since they came earlier)
-    Integer numBuffers = numPendingInputVerticesOnDisk.remove(partitionId);
-    if (numBuffers != null) {
-      file = new File(getPendingVerticesBufferPath(partitionId));
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("loadPartition: loading " + numBuffers + " input vertex " +
-            "buffers of partition " + partitionId + " from " +
-            file.getAbsolutePath());
-      }
-      filein = new FileInputStream(file);
-      bufferin = new BufferedInputStream(filein);
-      inputStream = new DataInputStream(bufferin);
-      for (int i = 0; i < numBuffers; ++i) {
-        ExtendedDataOutput extendedDataOutput =
-            WritableUtils.readExtendedDataOutput(inputStream, conf);
-        partition.addPartitionVertices(
-            new VertexIterator<I, V, E>(extendedDataOutput, conf));
-      }
-      inputStream.close();
-      checkState(file.delete(), "loadPartition: failed to delete %s",
-          file.getAbsolutePath());
-    }
-    // Second, applying vertex buffers already in memory
-    Pair<Integer, List<ExtendedDataOutput>> vertexPair;
-    vertexBufferRWLock.writeLock().lock();
-    vertexPair = pendingInputVertices.remove(partitionId);
-    vertexBufferRWLock.writeLock().unlock();
-    if (vertexPair != null) {
-      for (ExtendedDataOutput extendedDataOutput : vertexPair.getRight()) {
-        partition.addPartitionVertices(
-            new VertexIterator<I, V, E>(extendedDataOutput, conf));
-      }
-    }
-
-    // Edge store
-    if (serviceWorker.getSuperstep() == BspServiceWorker.INPUT_SUPERSTEP) {
-      checkState(hasEdgeStoreOnDisk.containsKey(partitionId),
-          "loadPartition: partition is written to disk in INPUT_SUPERSTEP, " +
-              "but it is not clear whether its edge store is on disk or not " +
-              "(impossible)");
-
-      if (hasEdgeStoreOnDisk.remove(partitionId)) {
-        file = new File(getEdgeStorePath(partitionId));
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("loadPartition: loading edge store of partition " +
-              partitionId + " from " + file.getAbsolutePath());
-        }
-        filein = new FileInputStream(file);
-        bufferin = new BufferedInputStream(filein);
-        inputStream = new DataInputStream(bufferin);
-        edgeStore.readPartitionEdgeStore(partitionId, inputStream);
-        inputStream.close();
-        checkState(file.delete(), "loadPartition: failed to delete %s",
-            file.getAbsolutePath());
-      }
-
-      // Input edge buffers
-      // First, applying edge buffers on disk (since they came earlier)
-      numBuffers = numPendingInputEdgesOnDisk.remove(partitionId);
-      if (numBuffers != null) {
-        file = new File(getPendingEdgesBufferPath(partitionId));
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("loadPartition: loading " + numBuffers + " input edge " +
-              "buffers of partition " + partitionId + " from " +
-              file.getAbsolutePath());
-        }
-        filein = new FileInputStream(file);
-        bufferin = new BufferedInputStream(filein);
-        inputStream = new DataInputStream(bufferin);
-        for (int i = 0; i < numBuffers; ++i) {
-          VertexIdEdges<I, E> vertexIdEdges =
-              new ByteArrayVertexIdEdges<I, E>();
-          vertexIdEdges.setConf(conf);
-          vertexIdEdges.readFields(inputStream);
-          edgeStore.addPartitionEdges(partitionId, vertexIdEdges);
-        }
-        inputStream.close();
-        checkState(file.delete(), "loadPartition: failed to delete %s",
-            file.getAbsolutePath());
-      }
-      // Second, applying edge buffers already in memory
-      Pair<Integer, List<VertexIdEdges<I, E>>> edgePair = null;
-      edgeBufferRWLock.writeLock().lock();
-      edgePair = pendingInputEdges.remove(partitionId);
-      edgeBufferRWLock.writeLock().unlock();
-      if (edgePair != null) {
-        for (VertexIdEdges<I, E> vertexIdEdges : edgePair.getRight()) {
-          edgeStore.addPartitionEdges(partitionId, vertexIdEdges);
-        }
-      }
-    }
-    return partition;
-  }
-
-  /**
-   * Write a partition to disk.
-   *
-   * @param meta meta partition containing the partition to offload
-   * @throws IOException
-   */
-  private void offloadPartition(MetaPartition meta) throws IOException {
-    Partition<I, V, E> partition = meta.getPartition();
-    int partitionId = meta.getId();
-    File file = new File(getVerticesPath(partitionId));
-    File parent = file.getParentFile();
-    if (!parent.exists() && !parent.mkdirs() && LOG.isDebugEnabled()) {
-      LOG.debug("offloadPartition: directory " + parent.getAbsolutePath() +
-        " already exists.");
-    }
-
-    checkState(file.createNewFile(),
-        "offloadPartition: file %s already exists.", parent.getAbsolutePath());
-
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("offloadPartition: writing partition vertices " +
-        partitionId + " to " + file.getAbsolutePath());
-    }
-
-    FileOutputStream fileout = new FileOutputStream(file);
-    BufferedOutputStream bufferout = new BufferedOutputStream(fileout);
-    DataOutputStream outputStream  = new DataOutputStream(bufferout);
-    for (Vertex<I, V, E> vertex : partition) {
-      writeVertexData(outputStream, vertex);
-    }
-    outputStream.close();
-
-    // Avoid writing back edges if we have already written them once and
-    // the graph is not changing.
-    // If we are in the input superstep, we need to write the files
-    // at least the first time, even though the graph is static.
-    file = new File(getEdgesPath(partitionId));
-    if (serviceWorker.getSuperstep() == BspServiceWorker.INPUT_SUPERSTEP ||
-        meta.getPrevVertexCount() != partition.getVertexCount() ||
-        !conf.isStaticGraph() || !file.exists()) {
-
-      meta.setPrevVertexCount(partition.getVertexCount());
-
-      if (!file.createNewFile() && LOG.isDebugEnabled()) {
-        LOG.debug("offloadPartition: file " + file.getAbsolutePath() +
-            " already exists.");
-      }
-
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("offloadPartition: writing partition edges " +
-          partitionId + " to " + file.getAbsolutePath());
-      }
-
-      fileout = new FileOutputStream(file);
-      bufferout = new BufferedOutputStream(fileout);
-      outputStream = new DataOutputStream(bufferout);
-      for (Vertex<I, V, E> vertex : partition) {
-        writeOutEdges(outputStream, vertex);
-      }
-      outputStream.close();
-    }
-
-    if (currentMessageStore != null &&
-        !conf.getOutgoingMessageClasses().useMessageCombiner() &&
-        currentMessageStore.hasMessagesForPartition(partitionId)) {
-      writeMessageData(currentMessageStore, currentMessagesOnDisk, partitionId,
-          serviceWorker.getSuperstep());
-    }
-    if (incomingMessageStore != null &&
-        !conf.getIncomingMessageClasses().useMessageCombiner() &&
-        incomingMessageStore.hasMessagesForPartition(partitionId)) {
-      writeMessageData(incomingMessageStore, incomingMessagesOnDisk,
-          partitionId, serviceWorker.getSuperstep() + 1);
-    }
-
-    // Writing edge store to disk in the input superstep
-    if (serviceWorker.getSuperstep() == BspServiceWorker.INPUT_SUPERSTEP) {
-      if (edgeStore.hasPartitionEdges(partitionId)) {
-        hasEdgeStoreOnDisk.put(partitionId, true);
-        file = new File(getEdgeStorePath(partitionId));
-        if (!file.createNewFile() && LOG.isDebugEnabled()) {
-          LOG.debug("offloadPartition: file " + file.getAbsolutePath() +
-              " already exists.");
-        }
-
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("offloadPartition: writing partition edge store of " +
-              partitionId + " to " + file.getAbsolutePath());
-        }
-
-        fileout = new FileOutputStream(file);
-        bufferout = new BufferedOutputStream(fileout);
-        outputStream = new DataOutputStream(bufferout);
-        edgeStore.writePartitionEdgeStore(partitionId, outputStream);
-        outputStream.close();
-      } else {
-        hasEdgeStoreOnDisk.put(partitionId, false);
-      }
-    }
-  }
-
-  /**
-   * Offload message data of a particular type of store (current or incoming) to
-   * disk.
-   *
-   * @param messageStore The message store to write to disk
-   * @param messagesOnDisk Map to update and let others know that this message
-   *                       store is on disk
-   * @param partitionId Id of the partition we want to offload the message store
-   *                    of
-   * @param superstep Superstep for which we want to offload message data for.
-   *                  It is equal the current superstep number for offloading
-   *                  currentMessageStore, and is equal to next superstep
-   *                  number for offloading incomingMessageStore
-   * @throws IOException
-   */
-  private void writeMessageData(MessageStore<I, Writable> messageStore,
-      ConcurrentMap<Integer, Boolean> messagesOnDisk, int partitionId,
-      long superstep) throws IOException {
-    File file = new File(getMessagesPath(partitionId, superstep));
-    checkState(!file.exists(),
-        "writeMessageData: message store file for partition " +
-            partitionId + " for messages in superstep " +
-            superstep + " already exist (impossible).");
-
-    checkState(file.createNewFile(),
-        "offloadPartition: cannot create message store file for " +
-            "partition " + partitionId);
-
-    FileOutputStream fileout = new FileOutputStream(file);
-    BufferedOutputStream bufferout = new BufferedOutputStream(fileout);
-    DataOutputStream outputStream = new DataOutputStream(bufferout);
-    messageStore.writePartition(outputStream, partitionId);
-    messageStore.clearPartition(partitionId);
-    outputStream.close();
-    messagesOnDisk.put(partitionId, true);
-  }
-
-  /**
-   * Delete a partition's files.
-   *
-   * @param id The id of the partition owning the file.
-   */
-  private void deletePartitionFiles(Integer id) {
-    // File containing vertices
-    File file = new File(getVerticesPath(id));
-    checkState(!file.exists() || file.delete(),
-        "deletePartitionFiles: Failed to delete file " +
-            file.getAbsolutePath());
-
-    // File containing edges
-    file = new File(getEdgesPath(id));
-    checkState(!file.exists() || file.delete(),
-        "deletePartitionFiles: Failed to delete file " +
-            file.getAbsolutePath());
-  }
-
-  /**
-   * Get the path and basename of the storage files.
-   *
-   * @param partitionId The partition
-   * @return The path to the given partition
-   */
-  private String getPartitionPath(Integer partitionId) {
-    int hash = hasher.hashInt(partitionId).asInt();
-    int idx = Math.abs(hash % basePaths.length);
-    return basePaths[idx] + "/partition-" + partitionId;
-  }
-
-  /**
-   * Get the path to the file where vertices are stored.
-   *
-   * @param partitionId The partition
-   * @return The path to the vertices file
-   */
-  private String getVerticesPath(Integer partitionId) {
-    return getPartitionPath(partitionId) + "_vertices";
-  }
-
-  /**
-   * Get the path to the file where pending vertices in INPUT_SUPERSTEP
-   * are stored.
-   *
-   * @param partitionId The partition
-   * @return The path to the file
-   */
-  private String getPendingVerticesBufferPath(Integer partitionId) {
-    return getPartitionPath(partitionId) + "_pending_vertices";
-  }
-
-  /**
-   * Get the path to the file where edge store of a partition in INPUT_SUPERSTEP
-   * is stored.
-   *
-   * @param partitionId The partition
-   * @return The path to the file
-   */
-  private String getEdgeStorePath(Integer partitionId) {
-    return getPartitionPath(partitionId) + "_edge_store";
-  }
-
-  /**
-   * Get the path to the file where pending edges in INPUT_SUPERSTEP
-   * are stored.
-   *
-   * @param partitionId The partition
-   * @return The path to the file
-   */
-  private String getPendingEdgesBufferPath(Integer partitionId) {
-    return getPartitionPath(partitionId) + "_pending_edges";
-  }
-
-  /**
-   * Get the path to the file where edges are stored.
-   *
-   * @param partitionId The partition
-   * @return The path to the edges file
-   */
-  private String getEdgesPath(Integer partitionId) {
-    return getPartitionPath(partitionId) + "_edges";
-  }
-
-  /**
-   * Get the path to the file where pending incoming messages are stored.
-   *
-   * @param partitionId The partition
-   * @param superstep superstep number
-   * @return The path to the file
-   */
-  private String getPendingMessagesBufferPath(Integer partitionId,
-      long superstep) {
-    return getPartitionPath(partitionId) + "_pending_messages_" + superstep;
-  }
-
-  /**
-   * Get the path to the file where messages are stored.
-   *
-   * @param partitionId The partition
-   * @param superstep superstep number
-   * @return The path to the file
-   */
-  private String getMessagesPath(Integer partitionId, long superstep) {
-    return getPartitionPath(partitionId) + "_messages_" + superstep;
-  }
-
-  /**
-   * Partition container holding additional meta data associated with each
-   * partition.
-   */
-  private class MetaPartition {
-    // ---- META INFORMATION ----
-    /** ID of the partition */
-    private int id;
-    /** State in which the partition is */
-    private State state;
-    /** Number of vertices contained in the partition */
-    private long vertexCount;
-    /** Previous number of vertices contained in the partition */
-    private long prevVertexCount;
-    /** Number of edges contained in the partition */
-    private long edgeCount;
-    /**
-     * Whether the partition is already processed in the current iteration
-     * cycle
-     */
-    private boolean isProcessed;
-
-    // ---- PARTITION ----
-    /** the actual partition. Depending on the state of the partition,
-        this object could be empty. */
-    private Partition<I, V, E> partition;
-
-    /**
-     * Initialization of the metadata enriched partition.
-     *
-     * @param id id of the partition
-     */
-    public MetaPartition(int id) {
-      this.id = id;
-      this.state = State.INIT;
-      this.vertexCount = 0;
-      this.prevVertexCount = 0;
-      this.edgeCount = 0;
-      this.isProcessed = false;
-
-      this.partition = null;
-    }
-
-    /**
-     * @return the id
-     */
-    public int getId() {
-      return id;
-    }
-
-    /**
-     * @return the state
-     */
-    public State getState() {
-      return state;
-    }
-
-    /**
-     * This function sets the metadata for on-disk partition.
-     */
-    public void setOnDisk() {
-      this.state = State.ON_DISK;
-      this.vertexCount = partition.getVertexCount();
-      this.edgeCount = partition.getEdgeCount();
-      this.partition = null;
-    }
-
-    /**
-     *
-     * @param partition the partition associated to this container
-     */
-    public void setActive(Partition<I, V, E> partition) {
-      if (partition != null) {
-        this.partition = partition;
-      }
-      this.state = State.ACTIVE;
-      this.prevVertexCount = this.vertexCount;
-      this.vertexCount = 0;
-    }
-
-    /**
-     * @param state the state to set
-     */
-    public void setState(State state) {
-      this.state = state;
-    }
-
-    /**
-     * set previous number of vertexes
-     * @param vertexCount number of vertexes
-     */
-    public void setPrevVertexCount(long vertexCount) {
-      this.prevVertexCount = vertexCount;
-    }
-
-    /**
-     * @return the vertexCount
-     */
-    public long getPrevVertexCount() {
-      return prevVertexCount;
-    }
-
-    /**
-     * @return the vertexCount
-     */
-    public long getVertexCount() {
-      return vertexCount;
-    }
-
-    /**
-     * @return the edgeCount
-     */
-    public long getEdgeCount() {
-      return edgeCount;
-    }
-
-    /**
-     * @return true iff the partition is marked as processed.
-     */
-    public boolean isProcessed() {
-      return isProcessed;
-    }
-
-    /**
-     * Set the state of this partition in terms of being already processed or
-     * not
-     * @param isProcessed whether the partition is processed or not
-     */
-    public void setProcessed(boolean isProcessed) {
-      this.isProcessed = isProcessed;
-    }
-
-    /**
-     * @return the partition
-     */
-    public Partition<I, V, E> getPartition() {
-      return partition;
-    }
-
-    /**
-     * @param partition the partition to set
-     */
-    public void setPartition(Partition<I, V, E> partition) {
-      this.partition = partition;
-    }
-
-    @Override
-    public String toString() {
-      StringBuffer sb = new StringBuffer();
-
-      sb.append("Meta Data: { ");
-      sb.append("ID: " + id + "; ");
-      sb.append("State: " + state + "; ");
-      sb.append("Number of Vertices: " + vertexCount + "; ");
-      sb.append("Previous number of Vertices: " + prevVertexCount + "; ");
-      sb.append("Number of edges: " + edgeCount + "; ");
-      sb.append("Is processed: " + isProcessed + "; }");
-      sb.append("Partition: " + partition + "; }");
-
-      return sb.toString();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/fafecee7/giraph-core/src/main/java/org/apache/giraph/ooc/FixedOutOfCoreEngine.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/FixedOutOfCoreEngine.java b/giraph-core/src/main/java/org/apache/giraph/ooc/FixedOutOfCoreEngine.java
new file mode 100644
index 0000000..9324239
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/ooc/FixedOutOfCoreEngine.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.ooc;
+
+import org.apache.giraph.bsp.CentralizedServiceWorker;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.ooc.data.MetaPartitionManager;
+import org.apache.giraph.ooc.io.IOCommand;
+import org.apache.giraph.ooc.io.LoadPartitionIOCommand;
+import org.apache.giraph.ooc.io.StorePartitionIOCommand;
+import org.apache.giraph.ooc.io.WaitIOCommand;
+import org.apache.log4j.Logger;
+
+/**
+ * Out-of-core engine maintaining fixed number of partitions in memory.
+ */
+public class FixedOutOfCoreEngine extends OutOfCoreEngine {
+  /** Class logger. */
+  private static final Logger LOG =
+      Logger.getLogger(FixedOutOfCoreEngine.class);
+  /**
+   * When getting partitions, how many milliseconds to wait if no partition was
+   * available in memory
+   */
+  private static final long MSEC_TO_WAIT = 1000;
+  /**
+   * Dummy object to wait on until a partition becomes available in memory
+   * for processing
+   */
+  private final Object partitionAvailable = new Object();
+
+  /**
+   * Constructor
+   *
+   * @param conf Configuration
+   * @param service Service worker
+   * @param maxPartitionsInMemory Maximum number of partitions that can be kept
+   *                              in memory
+   */
+  public FixedOutOfCoreEngine(ImmutableClassesGiraphConfiguration<?, ?, ?> conf,
+                              CentralizedServiceWorker<?, ?, ?> service,
+                              int maxPartitionsInMemory) {
+    super(conf, service);
+    this.ioScheduler = new FixedOutOfCoreIOScheduler(maxPartitionsInMemory,
+        numIOThreads, this, conf);
+  }
+
+  @Override
+  public Integer getNextPartition() {
+    Integer partitionId;
+    synchronized (partitionAvailable) {
+      while ((partitionId = metaPartitionManager.getNextPartition()) == null) {
+        try {
+          if (LOG.isInfoEnabled()) {
+            LOG.info("getNextPartition: waiting until a partition becomes " +
+                "available!");
+          }
+          partitionAvailable.wait(MSEC_TO_WAIT);
+        } catch (InterruptedException e) {
+          throw new IllegalStateException("getNextPartition: caught " +
+              "InterruptedException while waiting to retrieve a partition to " +
+              "process");
+        }
+        if (jobFailed) {
+          throw new RuntimeException("Job Failed due to a failure in an " +
+              "out-of-core IO thread");
+        }
+      }
+    }
+    if (partitionId == MetaPartitionManager.NO_PARTITION_TO_PROCESS) {
+      partitionId = null;
+    }
+    return partitionId;
+  }
+
+  @Override
+  public void doneProcessingPartition(int partitionId) {
+    metaPartitionManager.setPartitionIsProcessed(partitionId);
+    // Put the partition in store IO command queue and announce this partition
+    // as a candidate to offload to disk.
+    if (LOG.isInfoEnabled()) {
+      LOG.info("doneProcessingPartition: processing partition " + partitionId +
+          " is done!");
+    }
+    ioScheduler.addIOCommand(new StorePartitionIOCommand(this, partitionId));
+  }
+
+  @Override
+  public void startIteration() {
+    getSuperstepLock().writeLock().lock();
+    metaPartitionManager.resetPartition();
+    ((FixedOutOfCoreIOScheduler) ioScheduler).clearStoreCommandQueue();
+    getSuperstepLock().writeLock().unlock();
+  }
+
+  @Override
+  public void retrievePartition(int partitionId) {
+    long superstep = service.getSuperstep();
+    if (metaPartitionManager.isPartitionOnDisk(partitionId)) {
+      ioScheduler.addIOCommand(new LoadPartitionIOCommand(this, partitionId,
+          superstep));
+      synchronized (partitionAvailable) {
+        while (metaPartitionManager.isPartitionOnDisk(partitionId)) {
+          try {
+            if (LOG.isInfoEnabled()) {
+              LOG.info("retrievePartition: waiting until partition " +
+                  partitionId + " becomes available");
+            }
+            partitionAvailable.wait();
+          } catch (InterruptedException e) {
+            throw new IllegalStateException("retrievePartition: caught " +
+                "InterruptedException while waiting to retrieve partition " +
+                partitionId);
+          }
+        }
+      }
+    }
+  }
+
+  @Override
+  public void ioCommandCompleted(IOCommand command) {
+    if (command instanceof LoadPartitionIOCommand ||
+        command instanceof WaitIOCommand) {
+      // Notifying compute threads who are waiting for a partition to become
+      // available in memory to process.
+      synchronized (partitionAvailable) {
+        partitionAvailable.notifyAll();
+      }
+    }
+  }
+}