You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ma...@apache.org on 2016/06/28 00:05:26 UTC

[1/4] git commit: updated refs/heads/trunk to 3793c9e

Repository: giraph
Updated Branches:
  refs/heads/trunk 8eb1f763d -> 3793c9ef6


http://git-wip-us.apache.org/repos/asf/giraph/blob/3793c9ef/giraph-core/src/test/java/org/apache/giraph/partition/TestPartitionStores.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/partition/TestPartitionStores.java b/giraph-core/src/test/java/org/apache/giraph/partition/TestPartitionStores.java
index 3bb35eb..a7451bc 100644
--- a/giraph-core/src/test/java/org/apache/giraph/partition/TestPartitionStores.java
+++ b/giraph-core/src/test/java/org/apache/giraph/partition/TestPartitionStores.java
@@ -30,6 +30,7 @@ import org.apache.giraph.conf.GiraphConstants;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.edge.EdgeFactory;
 import org.apache.giraph.graph.BasicComputation;
+import org.apache.giraph.graph.GraphTaskManager;
 import org.apache.giraph.graph.Vertex;
 import org.apache.giraph.io.formats.IdWithValueTextOutputFormat;
 import org.apache.giraph.io.formats.IntIntNullTextVertexInputFormat;
@@ -166,6 +167,9 @@ public class TestPartitionStores {
     ServerData<IntWritable, IntWritable, NullWritable>
         serverData = new ServerData<>(serviceWorker, conf, context);
     Mockito.when(serviceWorker.getServerData()).thenReturn(serverData);
+    GraphTaskManager<IntWritable, IntWritable, NullWritable>
+        graphTaskManager = new GraphTaskManager<>(context);
+    Mockito.when(serviceWorker.getGraphTaskManager()).thenReturn(graphTaskManager);
 
     DiskBackedPartitionStore<IntWritable, IntWritable, NullWritable>
         partitionStore =
@@ -192,6 +196,9 @@ public class TestPartitionStores {
     ServerData<IntWritable, IntWritable, NullWritable>
         serverData = new ServerData<>(serviceWorker, conf, context);
     Mockito.when(serviceWorker.getServerData()).thenReturn(serverData);
+    GraphTaskManager<IntWritable, IntWritable, NullWritable>
+        graphTaskManager = new GraphTaskManager<>(context);
+    Mockito.when(serviceWorker.getGraphTaskManager()).thenReturn(graphTaskManager);
 
     DiskBackedPartitionStore<IntWritable, IntWritable, NullWritable>
         partitionStore =
@@ -307,6 +314,9 @@ public class TestPartitionStores {
     ServerData<IntWritable, IntWritable, NullWritable>
         serverData = new ServerData<>(serviceWorker, conf, context);
     Mockito.when(serviceWorker.getServerData()).thenReturn(serverData);
+    GraphTaskManager<IntWritable, IntWritable, NullWritable>
+        graphTaskManager = new GraphTaskManager<>(context);
+    Mockito.when(serviceWorker.getGraphTaskManager()).thenReturn(graphTaskManager);
 
     DiskBackedPartitionStore<IntWritable, IntWritable, NullWritable>
         store =

http://git-wip-us.apache.org/repos/asf/giraph/blob/3793c9ef/giraph-rexster/giraph-rexster-io/src/main/java/org/apache/giraph/rexster/io/RexsterVertexOutputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-rexster/giraph-rexster-io/src/main/java/org/apache/giraph/rexster/io/RexsterVertexOutputFormat.java b/giraph-rexster/giraph-rexster-io/src/main/java/org/apache/giraph/rexster/io/RexsterVertexOutputFormat.java
index 9af00e0..0e8d83e 100644
--- a/giraph-rexster/giraph-rexster-io/src/main/java/org/apache/giraph/rexster/io/RexsterVertexOutputFormat.java
+++ b/giraph-rexster/giraph-rexster-io/src/main/java/org/apache/giraph/rexster/io/RexsterVertexOutputFormat.java
@@ -186,7 +186,7 @@ public class RexsterVertexOutputFormat<I extends WritableComparable,
       String id = context.getTaskAttemptID().toString();
       String zkBasePath = ZooKeeperManager.getBasePath(getConf()) +
         BspService.BASE_DIR + "/" +
-        getConf().get("mapred.job.id", "Unknown Job");
+        getConf().getJobId();
       prepareBarrier(zkBasePath);
       enterBarrier(zkBasePath, id);
       checkBarrier(zkBasePath, context);


[4/4] git commit: updated refs/heads/trunk to 3793c9e

Posted by ma...@apache.org.
Decouple out-of-core persistence infrastructure from out-of-core computation

Summary:
This diff proposes the following:
  - The persistence layer is decoupled from the out-of-core infrastructure. This way, one can simply implement different data accessors for various persistence resources. The persistence layer for reading/writing from/to the local file system is implemented in this diff.
  - Previously, out-of-core data were indexed by string literals. For more flexibility, data are now accessed through a data indexing mechanism in which a chain of indices is used to address a particular piece of data.
  - With different data accessor implementations, there may now be more emphasis on having more IO threads, and it is important that these IO threads are load-balanced. This diff therefore changes the mechanism that assigns partitions to IO threads.
  - The Kryo-based (de)serialization and RandomAccessFile changes from D59277 are also included in this diff, so they all land in one place.

Test Plan:
mvn clean verify
out-of-core snapshot test passes

Reviewers: dionysis.logothetis, maja.kabiljo, sergey.edunov

Differential Revision: https://reviews.facebook.net/D59691


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/3793c9ef
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/3793c9ef
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/3793c9ef

Branch: refs/heads/trunk
Commit: 3793c9ef69993bf4b180b6f6b15bb2a5edde5530
Parents: 8eb1f76
Author: Hassan Eslami <he...@fb.com>
Authored: Mon Jun 27 14:13:29 2016 -0700
Committer: Maja Kabiljo <ma...@fb.com>
Committed: Mon Jun 27 14:13:34 2016 -0700

----------------------------------------------------------------------
 .gitignore                                      |   2 +-
 .../java/org/apache/giraph/bsp/BspService.java  |   2 +-
 .../java/org/apache/giraph/comm/ServerData.java |   6 +-
 .../apache/giraph/conf/GiraphConfiguration.java |   9 -
 .../org/apache/giraph/conf/GiraphConstants.java |  27 +-
 .../ImmutableClassesGiraphConfiguration.java    |  19 +
 .../apache/giraph/graph/GraphTaskManager.java   |   2 +-
 .../giraph/ooc/FixedPartitionsOracle.java       | 139 ------
 .../org/apache/giraph/ooc/OutOfCoreEngine.java  |  32 +-
 .../apache/giraph/ooc/OutOfCoreIOCallable.java  |  25 +-
 .../giraph/ooc/OutOfCoreIOCallableFactory.java  |  64 +--
 .../apache/giraph/ooc/OutOfCoreIOScheduler.java |  31 +-
 .../giraph/ooc/OutOfCoreIOStatistics.java       |   2 +-
 .../org/apache/giraph/ooc/OutOfCoreOracle.java  | 131 ------
 .../giraph/ooc/SimpleGCMonitoringOracle.java    | 355 ---------------
 .../apache/giraph/ooc/ThresholdBasedOracle.java | 364 ----------------
 .../apache/giraph/ooc/command/IOCommand.java    | 104 +++++
 .../ooc/command/LoadPartitionIOCommand.java     | 102 +++++
 .../ooc/command/StoreDataBufferIOCommand.java   |  99 +++++
 .../command/StoreIncomingMessageIOCommand.java  |  69 +++
 .../ooc/command/StorePartitionIOCommand.java    |  85 ++++
 .../giraph/ooc/command/WaitIOCommand.java       |  64 +++
 .../apache/giraph/ooc/command/package-info.java |  21 +
 .../giraph/ooc/data/DiskBackedDataStore.java    | 432 +++++++++++++++++++
 .../giraph/ooc/data/DiskBackedEdgeStore.java    |  92 ++--
 .../giraph/ooc/data/DiskBackedMessageStore.java |  96 ++---
 .../ooc/data/DiskBackedPartitionStore.java      | 181 +++-----
 .../giraph/ooc/data/MetaPartitionManager.java   |  59 ++-
 .../giraph/ooc/data/OutOfCoreDataManager.java   | 401 -----------------
 .../org/apache/giraph/ooc/io/IOCommand.java     | 106 -----
 .../giraph/ooc/io/LoadPartitionIOCommand.java   | 102 -----
 .../giraph/ooc/io/StoreDataBufferIOCommand.java |  99 -----
 .../ooc/io/StoreIncomingMessageIOCommand.java   |  69 ---
 .../giraph/ooc/io/StorePartitionIOCommand.java  |  85 ----
 .../org/apache/giraph/ooc/io/WaitIOCommand.java |  64 ---
 .../org/apache/giraph/ooc/io/package-info.java  |  21 -
 .../giraph/ooc/persistence/DataIndex.java       | 198 +++++++++
 .../ooc/persistence/LocalDiskDataAccessor.java  | 252 +++++++++++
 .../ooc/persistence/OutOfCoreDataAccessor.java  | 115 +++++
 .../giraph/ooc/persistence/package-info.java    |  22 +
 .../ooc/policy/FixedPartitionsOracle.java       | 140 ++++++
 .../giraph/ooc/policy/OutOfCoreOracle.java      | 135 ++++++
 .../ooc/policy/SimpleGCMonitoringOracle.java    | 357 +++++++++++++++
 .../giraph/ooc/policy/ThresholdBasedOracle.java | 365 ++++++++++++++++
 .../apache/giraph/ooc/policy/package-info.java  |  21 +
 .../org/apache/giraph/zk/ZooKeeperManager.java  |   2 +-
 .../giraph/partition/TestPartitionStores.java   |  10 +
 .../rexster/io/RexsterVertexOutputFormat.java   |   2 +-
 48 files changed, 2871 insertions(+), 2309 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/3793c9ef/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index 3ae52e2..03acf8e 100644
--- a/.gitignore
+++ b/.gitignore
@@ -7,7 +7,7 @@
 # Build files:
 *.class
 target
-Unknown Job*
+UnknownJob*
 failed-profile.txt
 
 # IntelliJ IDEA files:

http://git-wip-us.apache.org/repos/asf/giraph/blob/3793c9ef/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java b/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java
index fc0fa95..9545a25 100644
--- a/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java
+++ b/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java
@@ -241,7 +241,7 @@ public abstract class BspService<I extends WritableComparable,
     this.context = context;
     this.graphTaskManager = graphTaskManager;
     this.conf = graphTaskManager.getConf();
-    this.jobId = conf.get("mapred.job.id", "Unknown Job");
+    this.jobId = conf.getJobId();
     this.taskPartition = conf.getTaskPartition();
     this.restartedSuperstep = conf.getLong(
         GiraphConstants.RESTART_SUPERSTEP, UNSET_SUPERSTEP);

http://git-wip-us.apache.org/repos/asf/giraph/blob/3793c9ef/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java b/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
index 4156d8c..e926b6c 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
@@ -150,7 +150,7 @@ public class ServerData<I extends WritableComparable,
       oocEngine = new OutOfCoreEngine(conf, service);
       partitionStore =
           new DiskBackedPartitionStore<I, V, E>(inMemoryPartitionStore,
-              conf, context, service, oocEngine);
+              conf, context, oocEngine);
       edgeStore =
           new DiskBackedEdgeStore<I, V, E>(inMemoryEdgeStore, conf, oocEngine);
     } else {
@@ -268,7 +268,7 @@ public class ServerData<I extends WritableComparable,
         nextCurrentMessageStore = messageStore;
       } else {
         nextCurrentMessageStore = new DiskBackedMessageStore<>(
-            conf, messageStore,
+            conf, oocEngine, messageStore,
             conf.getIncomingMessageClasses().useMessageCombiner(),
             serviceWorker.getSuperstep());
       }
@@ -280,7 +280,7 @@ public class ServerData<I extends WritableComparable,
       nextIncomingMessageStore = messageStore;
     } else {
       nextIncomingMessageStore = new DiskBackedMessageStore<>(
-          conf, messageStore,
+          conf, oocEngine, messageStore,
           conf.getOutgoingMessageClasses().useMessageCombiner(),
           serviceWorker.getSuperstep() + 1);
     }

http://git-wip-us.apache.org/repos/asf/giraph/blob/3793c9ef/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
index 7f1cb2b..4164c3a 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
@@ -1146,15 +1146,6 @@ public class GiraphConfiguration extends Configuration
   }
 
   /**
-   * Whether the application with change or not the graph topology.
-   *
-   * @return true if the graph is static, false otherwise.
-   */
-  public boolean isStaticGraph() {
-    return STATIC_GRAPH.isTrue(this);
-  }
-
-  /**
    * Get the output directory to write YourKit snapshots to
    *
    * @param context Map context

http://git-wip-us.apache.org/repos/asf/giraph/blob/3793c9ef/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
index c592a12..ee67bed 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
@@ -71,8 +71,10 @@ import org.apache.giraph.mapping.translate.TranslateEdge;
 import org.apache.giraph.master.DefaultMasterCompute;
 import org.apache.giraph.master.MasterCompute;
 import org.apache.giraph.master.MasterObserver;
-import org.apache.giraph.ooc.OutOfCoreOracle;
-import org.apache.giraph.ooc.ThresholdBasedOracle;
+import org.apache.giraph.ooc.persistence.OutOfCoreDataAccessor;
+import org.apache.giraph.ooc.persistence.LocalDiskDataAccessor;
+import org.apache.giraph.ooc.policy.OutOfCoreOracle;
+import org.apache.giraph.ooc.policy.ThresholdBasedOracle;
 import org.apache.giraph.partition.GraphPartitionerFactory;
 import org.apache.giraph.partition.HashPartitionerFactory;
 import org.apache.giraph.partition.Partition;
@@ -949,11 +951,32 @@ public interface GiraphConstants {
           "Comma-separated list of directories in the local filesystem for " +
           "out-of-core partitions.");
 
+  /**
+   * Number of IO threads used in out-of-core mechanism. If local disk is used
+   * for spilling data to and reading data from, this number should be equal to
+   * the number of available disks on each machine. In such case, one should
+   * use giraph.partitionsDirectory to specify directories mounted on different
+   * disks.
+   */
+  IntConfOption NUM_OUT_OF_CORE_THREADS =
+      new IntConfOption("giraph.numOutOfCoreThreads", 1, "Number of IO " +
+          "threads used in out-of-core mechanism. If using local disk to " +
+          "spill data, this should be equal to the number of available " +
+          "disks. In such case, use giraph.partitionsDirectory to specify " +
+          "mount points on different disks.");
+
   /** Enable out-of-core graph. */
   BooleanConfOption USE_OUT_OF_CORE_GRAPH =
       new BooleanConfOption("giraph.useOutOfCoreGraph", false,
           "Enable out-of-core graph.");
 
+  /** Class of the data accessor used as the out-of-core persistence layer */
+  ClassConfOption<OutOfCoreDataAccessor> OUT_OF_CORE_DATA_ACCESSOR =
+      ClassConfOption.create("giraph.outOfCoreDataAccessor",
+          LocalDiskDataAccessor.class, OutOfCoreDataAccessor.class,
+          "Data accessor used in out-of-core computation (local-disk, " +
+              "in-memory, HDFS, etc.)");
+
   /**
    * Out-of-core oracle that is to be used for adaptive out-of-core engine. If
    * the `MAX_PARTITIONS_IN_MEMORY` is already set, this will be over-written

http://git-wip-us.apache.org/repos/asf/giraph/blob/3793c9ef/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
index b9ecf2d..1b79cba 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
@@ -128,6 +128,8 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
    * extended data input/output classes for messages
    */
   private final boolean useBigDataIOForMessages;
+  /** Is the graph static (meaning there is no mutation)? */
+  private final boolean isStaticGraph;
 
   /**
    * Constructor.  Takes the configuration and then gets the classes out of
@@ -144,6 +146,7 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
         GiraphConstants.GRAPH_TYPE_LANGUAGES, conf);
     valueNeedsWrappers = PerGraphTypeBoolean.readFromConf(
         GiraphConstants.GRAPH_TYPES_NEEDS_WRAPPERS, conf);
+    isStaticGraph = GiraphConstants.STATIC_GRAPH.get(this);
     valueFactories = new ValueFactories<I, V, E>(this);
   }
 
@@ -1326,4 +1329,20 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
       return null;
     }
   }
+
+  /**
+   * Whether or not the application will change the graph topology.
+   *
+   * @return true if the graph is static, false otherwise.
+   */
+  public boolean isStaticGraph() {
+    return isStaticGraph;
+  }
+
+  /**
+   * @return job id
+   */
+  public String getJobId() {
+    return get("mapred.job.id", "UnknownJob");
+  }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/3793c9ef/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
index 725d327..a1d8522 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
@@ -1054,7 +1054,7 @@ end[PURE_YARN]*/
    * @return Time spent in GC recorder by the GC listener
    */
   public long getSuperstepGCTime() {
-    return gcTimeMetric.count();
+    return (gcTimeMetric == null) ? 0 : gcTimeMetric.count();
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/giraph/blob/3793c9ef/giraph-core/src/main/java/org/apache/giraph/ooc/FixedPartitionsOracle.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/FixedPartitionsOracle.java b/giraph-core/src/main/java/org/apache/giraph/ooc/FixedPartitionsOracle.java
deleted file mode 100644
index f7badcb..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/ooc/FixedPartitionsOracle.java
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.ooc;
-
-import com.sun.management.GarbageCollectionNotificationInfo;
-import org.apache.giraph.conf.GiraphConstants;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.ooc.io.IOCommand;
-import org.apache.giraph.ooc.io.LoadPartitionIOCommand;
-import org.apache.giraph.ooc.io.StorePartitionIOCommand;
-import org.apache.log4j.Logger;
-
-import java.util.concurrent.atomic.AtomicInteger;
-
-/** Oracle for fixed out-of-core mechanism */
-public class FixedPartitionsOracle implements OutOfCoreOracle {
-  /** Class logger */
-  private static final Logger LOG =
-      Logger.getLogger(FixedPartitionsOracle.class);
-  /** Maximum number of partitions to be kept in memory */
-  private final int maxPartitionsInMemory;
-  /**
-   * Number of partitions to be added (loaded) or removed (stored) to/from
-   * memory. Each outstanding load partition counts +1 and each outstanding
-   * store partition counts -1 toward this counter.
-   */
-  private final AtomicInteger deltaNumPartitionsInMemory =
-      new AtomicInteger(0);
-  /** Out-of-core engine */
-  private final OutOfCoreEngine oocEngine;
-
-  /**
-   * Constructor
-   *
-   * @param conf configuration
-   * @param oocEngine out-of-core engine
-   */
-  public FixedPartitionsOracle(ImmutableClassesGiraphConfiguration conf,
-                               OutOfCoreEngine oocEngine) {
-    this.maxPartitionsInMemory =
-        GiraphConstants.MAX_PARTITIONS_IN_MEMORY.get(conf);
-    this.oocEngine = oocEngine;
-  }
-
-  @Override
-  public IOAction[] getNextIOActions() {
-    int numPartitionsInMemory =
-        oocEngine.getMetaPartitionManager().getNumInMemoryPartitions();
-    if (LOG.isInfoEnabled()) {
-      LOG.info("getNextIOActions: calling with " + numPartitionsInMemory +
-          " partitions in memory, " + deltaNumPartitionsInMemory.get() +
-          " to be loaded");
-    }
-    int numPartitions =
-        numPartitionsInMemory + deltaNumPartitionsInMemory.get();
-    // Fixed out-of-core policy:
-    //   - if the number of partitions in memory is less than the max number of
-    //     partitions in memory, we should load a partition to memory. This
-    //     basically means we are prefetching partition's data either for the
-    //     current superstep, or for the next superstep.
-    //   - if the number of partitions in memory is equal to the the max number
-    //     of partitions in memory, we do a 'soft store', meaning, we store
-    //     processed partition to disk only if there is an unprocessed partition
-    //     on disk. This basically makes room for unprocessed partitions on disk
-    //     to be prefetched.
-    //   - if the number of partitions in memory is more than the max number of
-    //     partitions in memory, we do a 'hard store', meaning we store a
-    //     partition to disk, regardless of its processing state.
-    if (numPartitions < maxPartitionsInMemory) {
-      return new IOAction[]{
-        IOAction.LOAD_PARTITION,
-        IOAction.STORE_MESSAGES_AND_BUFFERS};
-    } else if (numPartitions > maxPartitionsInMemory) {
-      LOG.warn("getNextIOActions: number of partitions in memory passed the " +
-          "specified threshold!");
-      return new IOAction[]{
-        IOAction.STORE_PARTITION,
-        IOAction.STORE_MESSAGES_AND_BUFFERS};
-    } else {
-      return new IOAction[]{
-        IOAction.STORE_MESSAGES_AND_BUFFERS,
-        IOAction.LOAD_TO_SWAP_PARTITION};
-    }
-  }
-
-  @Override
-  public boolean approve(IOCommand command) {
-    int numPartitionsInMemory = oocEngine.getMetaPartitionManager()
-        .getNumInMemoryPartitions();
-    // If loading a partition result in having more partition in memory, the
-    // command should be denied. Also, if number of partitions in memory is
-    // already less than the max number of partitions, any command for storing
-    // a partition should be denied.
-    if (command instanceof LoadPartitionIOCommand &&
-        numPartitionsInMemory + deltaNumPartitionsInMemory.getAndIncrement() >
-            maxPartitionsInMemory) {
-      deltaNumPartitionsInMemory.getAndDecrement();
-      return false;
-
-    } else if (command instanceof StorePartitionIOCommand &&
-        numPartitionsInMemory + deltaNumPartitionsInMemory.getAndDecrement() <
-            maxPartitionsInMemory) {
-      deltaNumPartitionsInMemory.getAndIncrement();
-      return false;
-    }
-    return true;
-  }
-
-  @Override
-  public void commandCompleted(IOCommand command) {
-    if (command instanceof LoadPartitionIOCommand) {
-      deltaNumPartitionsInMemory.getAndDecrement();
-    } else if (command instanceof StorePartitionIOCommand) {
-      deltaNumPartitionsInMemory.getAndIncrement();
-    }
-  }
-
-  @Override
-  public void gcCompleted(GarbageCollectionNotificationInfo gcInfo) { }
-
-  @Override
-  public void shutdown() { }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/3793c9ef/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreEngine.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreEngine.java b/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreEngine.java
index 2037abe..3187468 100644
--- a/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreEngine.java
+++ b/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreEngine.java
@@ -30,8 +30,11 @@ import org.apache.giraph.metrics.GiraphMetrics;
 import org.apache.giraph.metrics.ResetSuperstepMetricsObserver;
 import org.apache.giraph.metrics.SuperstepMetricsRegistry;
 import org.apache.giraph.ooc.data.MetaPartitionManager;
-import org.apache.giraph.ooc.io.IOCommand;
-import org.apache.giraph.ooc.io.LoadPartitionIOCommand;
+import org.apache.giraph.ooc.command.IOCommand;
+import org.apache.giraph.ooc.command.LoadPartitionIOCommand;
+import org.apache.giraph.ooc.persistence.OutOfCoreDataAccessor;
+import org.apache.giraph.ooc.policy.FixedPartitionsOracle;
+import org.apache.giraph.ooc.policy.OutOfCoreOracle;
 import org.apache.giraph.utils.AdjustableSemaphore;
 import org.apache.giraph.worker.BspServiceWorker;
 import org.apache.log4j.Logger;
@@ -87,6 +90,8 @@ public class OutOfCoreEngine implements ResetSuperstepMetricsObserver {
    * with out-of-core operations (actual IO operations).
    */
   private final ReadWriteLock superstepLock = new ReentrantReadWriteLock();
+  /** Data accessor object (DAO) used as persistence layer in out-of-core */
+  private final OutOfCoreDataAccessor dataAccessor;
   /** Callable factory for IO threads */
   private final OutOfCoreIOCallableFactory oocIOCallableFactory;
   /**
@@ -149,9 +154,20 @@ public class OutOfCoreEngine implements ResetSuperstepMetricsObserver {
   public OutOfCoreEngine(ImmutableClassesGiraphConfiguration<?, ?, ?> conf,
                          CentralizedServiceWorker<?, ?, ?> service) {
     this.service = service;
-    this.oocIOCallableFactory = new OutOfCoreIOCallableFactory(conf, this);
-    /* How many disk (i.e. IO threads) do we have? */
-    int numIOThreads = oocIOCallableFactory.getNumDisks();
+    Class<? extends OutOfCoreDataAccessor> accessorClass =
+        GiraphConstants.OUT_OF_CORE_DATA_ACCESSOR.get(conf);
+    try {
+      Constructor<?> constructor = accessorClass.getConstructor(
+          ImmutableClassesGiraphConfiguration.class);
+      this.dataAccessor = (OutOfCoreDataAccessor) constructor.newInstance(conf);
+    } catch (NoSuchMethodException | InstantiationException |
+        InvocationTargetException | IllegalAccessException e) {
+      throw new IllegalStateException("OutOfCoreEngine: caught exception " +
+          "while creating the data accessor instance!", e);
+    }
+    int numIOThreads = dataAccessor.getNumAccessorThreads();
+    this.oocIOCallableFactory =
+        new OutOfCoreIOCallableFactory(this, numIOThreads);
     this.ioScheduler = new OutOfCoreIOScheduler(conf, this, numIOThreads);
     this.metaPartitionManager = new MetaPartitionManager(numIOThreads, this);
     this.statistics = new OutOfCoreIOStatistics(conf, numIOThreads);
@@ -188,6 +204,7 @@ public class OutOfCoreEngine implements ResetSuperstepMetricsObserver {
    * Initialize/Start the out-of-core engine.
    */
   public void initialize() {
+    dataAccessor.initialize();
     oocIOCallableFactory.createCallable();
   }
 
@@ -201,6 +218,7 @@ public class OutOfCoreEngine implements ResetSuperstepMetricsObserver {
     }
     ioScheduler.shutdown();
     oocIOCallableFactory.shutdown();
+    dataAccessor.shutdown();
   }
 
   /**
@@ -500,4 +518,8 @@ public class OutOfCoreEngine implements ResetSuperstepMetricsObserver {
   public void setFlowControl(FlowControl flowControl) {
     this.flowControl = flowControl;
   }
+
+  public OutOfCoreDataAccessor getDataAccessor() {
+    return dataAccessor;
+  }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/3793c9ef/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOCallable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOCallable.java b/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOCallable.java
index 962bd6a..bea3994 100644
--- a/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOCallable.java
+++ b/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOCallable.java
@@ -23,9 +23,9 @@ import com.yammer.metrics.core.Histogram;
 import org.apache.giraph.metrics.GiraphMetrics;
 import org.apache.giraph.metrics.ResetSuperstepMetricsObserver;
 import org.apache.giraph.metrics.SuperstepMetricsRegistry;
-import org.apache.giraph.ooc.io.IOCommand;
-import org.apache.giraph.ooc.io.LoadPartitionIOCommand;
-import org.apache.giraph.ooc.io.WaitIOCommand;
+import org.apache.giraph.ooc.command.IOCommand;
+import org.apache.giraph.ooc.command.LoadPartitionIOCommand;
+import org.apache.giraph.ooc.command.WaitIOCommand;
 import org.apache.log4j.Logger;
 
 import java.util.concurrent.Callable;
@@ -47,8 +47,6 @@ public class OutOfCoreIOCallable implements Callable<Void>,
   private static final Logger LOG = Logger.getLogger(OutOfCoreIOCallable.class);
   /** Out-of-core engine */
   private final OutOfCoreEngine oocEngine;
-  /** Base path that this thread will write to/read from */
-  private final String basePath;
   /** Thread id/Disk id */
   private final int diskId;
   /** How many bytes of data is read from disk */
@@ -64,13 +62,10 @@ public class OutOfCoreIOCallable implements Callable<Void>,
    * Constructor
    *
    * @param oocEngine out-of-core engine
-   * @param basePath base path this thread will be using
    * @param diskId thread id/disk id
    */
-  public OutOfCoreIOCallable(OutOfCoreEngine oocEngine, String basePath,
-                             int diskId) {
+  public OutOfCoreIOCallable(OutOfCoreEngine oocEngine, int diskId) {
     this.oocEngine = oocEngine;
-    this.basePath = basePath;
     this.diskId = diskId;
     newSuperstep(GiraphMetrics.get().perSuperstep());
     GiraphMetrics.get().addSuperstepResetObserver(this);
@@ -98,15 +93,23 @@ public class OutOfCoreIOCallable implements Callable<Void>,
       long bytes;
       // CHECKSTYLE: stop IllegalCatch
       try {
+        long timeInGC = oocEngine.getServiceWorker().getGraphTaskManager()
+            .getSuperstepGCTime();
         long startTime = System.currentTimeMillis();
-        commandExecuted = command.execute(basePath);
+        commandExecuted = command.execute();
         duration = System.currentTimeMillis() - startTime;
+        timeInGC = oocEngine.getServiceWorker().getGraphTaskManager()
+            .getSuperstepGCTime() - timeInGC;
         bytes = command.bytesTransferred();
         if (LOG.isInfoEnabled()) {
           LOG.info("call: thread " + diskId + "'s command " + command +
               " completed: bytes= " + bytes + ", duration=" + duration + ", " +
               "bandwidth=" + String.format("%.2f", (double) bytes / duration *
-              1000 / 1024 / 1024));
+              1000 / 1024 / 1024) +
+              ((command instanceof WaitIOCommand) ? "" :
+                  (", bandwidth (excluding GC time)=" + String.format("%.2f",
+                      (double) bytes / (duration - timeInGC) *
+                          1000 / 1024 / 1024))));
         }
       } catch (Exception e) {
         oocEngine.failTheJob();

http://git-wip-us.apache.org/repos/asf/giraph/blob/3793c9ef/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOCallableFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOCallableFactory.java b/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOCallableFactory.java
index d4fea22..6aeb196 100644
--- a/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOCallableFactory.java
+++ b/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOCallableFactory.java
@@ -18,13 +18,11 @@
 
 package org.apache.giraph.ooc;
 
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.utils.CallableFactory;
 import org.apache.giraph.utils.LogStacktraceCallable;
 import org.apache.giraph.utils.ThreadUtils;
 import org.apache.log4j.Logger;
 
-import java.io.File;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.Callable;
@@ -35,9 +33,6 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
-import static com.google.common.base.Preconditions.checkState;
-import static org.apache.giraph.conf.GiraphConstants.PARTITIONS_DIRECTORY;
-
 /**
  * Factory class to create IO threads for out-of-core engine.
  */
@@ -49,37 +44,22 @@ public class OutOfCoreIOCallableFactory {
   private final OutOfCoreEngine oocEngine;
   /** Result of IO threads at the end of the computation */
   private final List<Future> results;
-  /** How many disks (i.e. IO threads) do we have? */
-  private int numDisks;
-  /** Path prefix for different disks */
-  private final String[] basePaths;
+  /** Number of threads used for IO operations */
+  private final int numIOThreads;
   /** Executor service for IO threads */
   private ExecutorService outOfCoreIOExecutor;
+
   /**
    * Constructor
    *
-   * @param conf Configuration
    * @param oocEngine Out-of-core engine
+   * @param numIOThreads Number of IO threads used
    */
-  public OutOfCoreIOCallableFactory(
-      ImmutableClassesGiraphConfiguration<?, ?, ?> conf,
-      OutOfCoreEngine oocEngine) {
+  public OutOfCoreIOCallableFactory(OutOfCoreEngine oocEngine,
+                                    int numIOThreads) {
     this.oocEngine = oocEngine;
-    this.results = new ArrayList<>();
-    // Take advantage of multiple disks
-    String[] userPaths = PARTITIONS_DIRECTORY.getArray(conf);
-    this.numDisks = userPaths.length;
-    this.basePaths = new String[numDisks];
-    int ptr = 0;
-    for (String path : userPaths) {
-      File file = new File(path);
-      if (!file.exists()) {
-        checkState(file.mkdirs(), "OutOfCoreIOCallableFactory: cannot create " +
-            "directory " + file.getAbsolutePath());
-      }
-      basePaths[ptr] = path + "/" + conf.get("mapred.job.id", "Unknown Job");
-      ptr++;
-    }
+    this.numIOThreads = numIOThreads;
+    this.results = new ArrayList<>(numIOThreads);
   }
 
   /**
@@ -90,11 +70,10 @@ public class OutOfCoreIOCallableFactory {
       new CallableFactory<Void>() {
         @Override
         public Callable<Void> newCallable(int callableId) {
-          return new OutOfCoreIOCallable(oocEngine, basePaths[callableId],
-              callableId);
+          return new OutOfCoreIOCallable(oocEngine, callableId);
         }
       };
-    outOfCoreIOExecutor = new ThreadPoolExecutor(numDisks, numDisks, 0L,
+    outOfCoreIOExecutor = new ThreadPoolExecutor(numIOThreads, numIOThreads, 0L,
         TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(),
         ThreadUtils.createThreadFactory("ooc-io-%d")) {
       @Override
@@ -120,7 +99,7 @@ public class OutOfCoreIOCallableFactory {
       }
     };
 
-    for (int i = 0; i < numDisks; ++i) {
+    for (int i = 0; i < numIOThreads; ++i) {
       Future<Void> future = outOfCoreIOExecutor.submit(
           new LogStacktraceCallable<>(
               outOfCoreIOCallableFactory.newCallable(i)));
@@ -131,15 +110,6 @@ public class OutOfCoreIOCallableFactory {
   }
 
   /**
-   * How many disks do we have?
-   *
-   * @return number of disks (IO threads)
-   */
-  public int getNumDisks() {
-    return numDisks;
-  }
-
-  /**
    * Check whether all IO threads terminated gracefully.
    */
   public void shutdown() {
@@ -156,7 +126,7 @@ public class OutOfCoreIOCallableFactory {
             "InterruptedException while waiting for IO threads to finish");
       }
     }
-    for (int i = 0; i < numDisks; ++i) {
+    for (int i = 0; i < numIOThreads; ++i) {
       try {
         // Check whether the tread terminated gracefully
         results.get(i).get();
@@ -170,15 +140,5 @@ public class OutOfCoreIOCallableFactory {
         throw new IllegalStateException(e);
       }
     }
-    for (String path : basePaths) {
-      File file = new File(path).getParentFile();
-      for (String subFileName : file.list()) {
-        File subFile = new File(file.getPath(), subFileName);
-        checkState(subFile.delete(), "shutdown: cannot delete file %s",
-            subFile.getAbsoluteFile());
-      }
-      checkState(file.delete(), "shutdown: cannot delete directory %s",
-          file.getAbsoluteFile());
-    }
   }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/3793c9ef/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOScheduler.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOScheduler.java b/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOScheduler.java
index 6428c30..906607d 100644
--- a/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOScheduler.java
+++ b/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOScheduler.java
@@ -18,15 +18,15 @@
 
 package org.apache.giraph.ooc;
 
-import com.google.common.hash.Hashing;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.conf.IntConfOption;
-import org.apache.giraph.ooc.io.IOCommand;
-import org.apache.giraph.ooc.io.LoadPartitionIOCommand;
-import org.apache.giraph.ooc.io.StoreDataBufferIOCommand;
-import org.apache.giraph.ooc.io.StoreIncomingMessageIOCommand;
-import org.apache.giraph.ooc.io.StorePartitionIOCommand;
-import org.apache.giraph.ooc.io.WaitIOCommand;
+import org.apache.giraph.ooc.command.IOCommand;
+import org.apache.giraph.ooc.command.LoadPartitionIOCommand;
+import org.apache.giraph.ooc.command.StoreDataBufferIOCommand;
+import org.apache.giraph.ooc.command.StoreIncomingMessageIOCommand;
+import org.apache.giraph.ooc.command.StorePartitionIOCommand;
+import org.apache.giraph.ooc.command.WaitIOCommand;
+import org.apache.giraph.ooc.policy.OutOfCoreOracle;
 import org.apache.log4j.Logger;
 
 import java.util.ArrayList;
@@ -57,8 +57,6 @@ public class OutOfCoreIOScheduler {
   private final OutOfCoreEngine oocEngine;
   /** How much an IO thread should wait if there is no IO command */
   private final int waitInterval;
-  /** How many disks (i.e. IO threads) do we have? */
-  private final int numDisks;
   /**
    * Queue of IO commands for loading partitions to memory. Load commands are
    * urgent and should be done once loading data is a viable IO command.
@@ -77,7 +75,6 @@ public class OutOfCoreIOScheduler {
   OutOfCoreIOScheduler(final ImmutableClassesGiraphConfiguration conf,
                        OutOfCoreEngine oocEngine, int numDisks) {
     this.oocEngine = oocEngine;
-    this.numDisks = numDisks;
     this.waitInterval = OOC_WAIT_INTERVAL.get(conf);
     threadLoadCommandQueue = new ArrayList<>(numDisks);
     for (int i = 0; i < numDisks; ++i) {
@@ -88,17 +85,6 @@ public class OutOfCoreIOScheduler {
   }
 
   /**
-   * Get the thread id that is responsible for a particular partition
-   *
-   * @param partitionId id of the given partition
-   * @return id of the thread responsible for the given partition
-   */
-  public int getOwnerThreadId(int partitionId) {
-    int result = Hashing.murmur3_32().hashInt(partitionId).asInt() % numDisks;
-    return (result >= 0) ? result : (result + numDisks);
-  }
-
-  /**
    * Generate and return the next appropriate IO command for a given thread
    *
    * @param threadId id of the thread ready to execute the next IO command
@@ -254,8 +240,9 @@ public class OutOfCoreIOScheduler {
    * @param ioCommand IO command to add to the scheduler
    */
   public void addIOCommand(IOCommand ioCommand) {
-    int ownerThread = getOwnerThreadId(ioCommand.getPartitionId());
     if (ioCommand instanceof LoadPartitionIOCommand) {
+      int ownerThread = oocEngine.getMetaPartitionManager()
+          .getOwnerThreadId(ioCommand.getPartitionId());
       threadLoadCommandQueue.get(ownerThread).offer(ioCommand);
     } else {
       throw new IllegalStateException("addIOCommand: IO command type is not " +

http://git-wip-us.apache.org/repos/asf/giraph/blob/3793c9ef/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOStatistics.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOStatistics.java b/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOStatistics.java
index a225a4c..44a0d2f 100644
--- a/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOStatistics.java
+++ b/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOStatistics.java
@@ -22,7 +22,7 @@ import com.google.common.collect.Maps;
 import org.apache.giraph.conf.GiraphConstants;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.conf.IntConfOption;
-import org.apache.giraph.ooc.io.IOCommand.IOCommandType;
+import org.apache.giraph.ooc.command.IOCommand.IOCommandType;
 import org.apache.log4j.Logger;
 
 import java.util.Map;

http://git-wip-us.apache.org/repos/asf/giraph/blob/3793c9ef/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreOracle.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreOracle.java b/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreOracle.java
deleted file mode 100644
index fa8e6bd..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreOracle.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.ooc;
-
-import com.sun.management.GarbageCollectionNotificationInfo;
-import org.apache.giraph.ooc.io.IOCommand;
-
-/**
- * Interface for any out-of-core oracle. An out-of-core oracle is the brain of
- * the out-of-core mechanism, determining/deciding on out-of-core actions (load
- * or store) that should happen.
- */
-public interface OutOfCoreOracle {
-  /**
-   * Different types of IO actions that can potentially lead to a more desired
-   * state of computation for out-of-core mechanism. These actions are issued
-   * based on the status of the memory (memory pressure, rate of data transfer
-   * to memory, etc.)
-   */
-  enum IOAction {
-    /**
-     * Either of:
-     *    - storing incoming messages of any partition currently on disk, or
-     *    - storing incoming messages' raw data buffer of any partition
-     *      currently on disk, or
-     *    - storing partitions' raw data buffer for those partitions that are
-     *      currently on disk.
-     */
-    STORE_MESSAGES_AND_BUFFERS,
-    /**
-     * Storing a partition that is *processed* in the current iteration cycle.
-     * This action is also known as "soft store"
-     */
-    STORE_PROCESSED_PARTITION,
-    /**
-     * Storing a partition from memory on disk, prioritizing to *processed*
-     * partitions on memory. However, if there is no *processed* partition,
-     * store should happen at any cost, even if an *unprocessed* partition has
-     * to be stored. This action is also know as "hard store".
-     */
-    STORE_PARTITION,
-    /**
-     * Loading an *unprocessed* partition from disk to memory, only if there are
-     * *processed* partitions in memory. This action basically initiates a swap
-     * operation.
-     */
-    LOAD_TO_SWAP_PARTITION,
-    /**
-     * Loading an *unprocessed* partition from disk to memory. This action is
-     * also known as "soft load".
-     */
-    LOAD_UNPROCESSED_PARTITION,
-    /**
-     * Loading a partition (prioritizing *unprocessed* over *processed*) from
-     * disk to memory. Loading a *processed* partition to memory is a prefetch
-     * of that partition to be processed in the next superstep. This action is
-     * also known as "hard load".
-     */
-    LOAD_PARTITION,
-    /**
-     * Loading a partition regardless of the memory situation. An out-of-core
-     * mechanism may use this action to signal IO threads that it is allowed to
-     * load a partition that is specifically requested.
-     */
-    URGENT_LOAD_PARTITION
-  }
-
-  /**
-   * Get the next set of viable IO actions to help bring memory to a more
-   * desired state.
-   *
-   * @return an array of viable IO actions, sorted from highest priority to
-   *         lowest priority
-   */
-  IOAction[] getNextIOActions();
-
-  /**
-   * Whether a command is appropriate to bring the memory to a more desired
-   * state. A command is not executed unless it is approved by the oracle. This
-   * method is specially important where there are multiple IO threads
-   * performing IO operations for the out-of-core mechanism. The approval
-   * becomes significantly important to prevent all IO threads from performing
-   * identical command type, if that is a necessity. For instance, execution of
-   * a particular command type by only one thread may bring the memory to a
-   * desired state, and the rest of IO threads may perform other types of
-   * commands.
-   *
-   * @param command the IO command that is about to execute
-   * @return 'true' if the command is approved for execution. 'false' if the
-   *         command should not be executed
-   */
-  boolean approve(IOCommand command);
-
-  /**
-   * Notification of command completion. Oracle may update its status and commit
-   * the changes a command may cause.
-   *
-   * @param command the IO command that is completed
-   */
-  void commandCompleted(IOCommand command);
-
-  /**
-   * Notification of GC completion. Oracle may take certain decisions based on
-   * GC information (such as amount of time it took, memory it reclaimed, etc.)
-   *
-   * @param gcInfo GC information
-   */
-  void gcCompleted(GarbageCollectionNotificationInfo gcInfo);
-
-  /**
-   * Shut down the out-of-core oracle. Necessary specifically for cases where
-   * out-of-core oracle is using additional monitoring threads.
-   */
-  void shutdown();
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/3793c9ef/giraph-core/src/main/java/org/apache/giraph/ooc/SimpleGCMonitoringOracle.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/SimpleGCMonitoringOracle.java b/giraph-core/src/main/java/org/apache/giraph/ooc/SimpleGCMonitoringOracle.java
deleted file mode 100644
index 0dfc9de..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/ooc/SimpleGCMonitoringOracle.java
+++ /dev/null
@@ -1,355 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.ooc;
-
-import com.google.common.collect.Maps;
-import com.sun.management.GarbageCollectionNotificationInfo;
-import org.apache.giraph.conf.FloatConfOption;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.ooc.io.IOCommand;
-import org.apache.giraph.ooc.io.LoadPartitionIOCommand;
-import org.apache.giraph.ooc.io.WaitIOCommand;
-import org.apache.log4j.Logger;
-
-import java.lang.management.MemoryUsage;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-
-/**
- * Out-of-core oracle to adaptively control data kept in memory, with the goal
- * of keeping the memory state constantly at a desired state. This oracle
- * monitors GC behavior to keep track of memory pressure.
- *
- * After each GC is done, this oracle retrieve statistics about the memory
- * pressure (memory used, max memory, and how far away memory is compared to a
- * max optimal pressure). Based on the the past 2 recent memory statistics,
- * the oracle predicts the status of the memory, and sets the rate of load/store
- * of data from/to disk. If the rate of loading data from disk is 'l', and the
- * rate of storing data to disk is 's', the rate of data injection to memory
- * from disk can be denoted as 'l-s'. This oracle determines what 'l-s' should
- * be based on the prediction of memory status.
- *
- * Assume that based on the previous GC call the memory usage at time t_0 is
- * m_0, and based on the most recent GC call the memory usage at time t_1 is
- * m_1. So, the rate of memory increase is alpha = (m_1 - m_0) / (t_1 - t_0).
- * Assume that the ideal memory pressure happens when the memory usage is
- * m_ideal. So, at time 't_2 = t_1 + (t_1 - t_0)', we want m_ideal. That means
- * the ideal rate would be beta = (m_ideal - m_1) / (t_2 - t_1). If the date
- * injection rate to memory so far was i, the new injection rate should be:
- * i_new = i - (alpha - beta)
- */
-public class SimpleGCMonitoringOracle implements OutOfCoreOracle {
-  /**
-   * The optimal memory pressure at which GC behavior is close to ideal. This
-   * fraction may be dependant on the GC strategy used for running a job, but
-   * generally should not be dependent on the graph processing application.
-   */
-  public static final FloatConfOption OPTIMAL_MEMORY_PRESSURE =
-      new FloatConfOption("giraph.optimalMemoryPressure", 0.8f,
-          "The memory pressure (fraction of used memory) at which the job " +
-              "shows the optimal GC behavior. This fraction may be dependent " +
-              "on the GC strategy used in running the job.");
-
-  /** Class logger */
-  private static final Logger LOG =
-      Logger.getLogger(SimpleGCMonitoringOracle.class);
-  /** Cached value for OPTIMAL_MEMORY_PRESSURE */
-  private final float optimalMemoryPressure;
-  /** Out-of-core engine */
-  private final OutOfCoreEngine oocEngine;
-  /** Status of memory from the last GC call */
-  private GCObservation lastGCObservation;
-  /** Desired rate of data injection to memory */
-  private final AtomicLong desiredDiskToMemoryDataRate =
-      new AtomicLong(0);
-  /** Number of on the fly (outstanding) IO commands for each command type */
-  private final Map<IOCommand.IOCommandType, AtomicInteger> commandOccurrences =
-      Maps.newConcurrentMap();
-
-  /**
-   * Constructor
-   *
-   * @param conf configuration
-   * @param oocEngine out-of-core engine
-   */
-  public SimpleGCMonitoringOracle(ImmutableClassesGiraphConfiguration conf,
-                                  OutOfCoreEngine oocEngine) {
-    this.optimalMemoryPressure = OPTIMAL_MEMORY_PRESSURE.get(conf);
-    this.oocEngine = oocEngine;
-    this.lastGCObservation = new GCObservation(-1, 0, 0);
-    for (IOCommand.IOCommandType type : IOCommand.IOCommandType.values()) {
-      commandOccurrences.put(type, new AtomicInteger(0));
-    }
-  }
-
-  @Override
-  public synchronized void gcCompleted(GarbageCollectionNotificationInfo
-                                             gcInfo) {
-    long time = System.currentTimeMillis();
-    Map<String, MemoryUsage> memAfter = gcInfo.getGcInfo()
-        .getMemoryUsageAfterGc();
-    long usedMemory = 0;
-    long maxMemory = 0;
-    for (MemoryUsage memDetail : memAfter.values()) {
-      usedMemory += memDetail.getUsed();
-      maxMemory += memDetail.getMax();
-    }
-    GCObservation observation = new GCObservation(time, usedMemory, maxMemory);
-    if (LOG.isInfoEnabled()) {
-      LOG.info("gcCompleted: GC completed with: " + observation);
-    }
-    // Whether this is not the first GC call in the application
-    if (lastGCObservation.isValid()) {
-      long deltaDataRate =
-          lastGCObservation.getDesiredDeltaDataRate(observation);
-      long diskBandwidthEstimate =
-          oocEngine.getIOStatistics().getDiskBandwidth();
-      // Update the desired data injection rate to memory. The data injection
-      // rate cannot be less than -disk_bandwidth (the extreme case happens if
-      // we only do 'store'), and cannot be more than disk_bandwidth (the
-      // extreme case happens if we only do 'load').
-      long dataInjectionRate = desiredDiskToMemoryDataRate.get();
-      desiredDiskToMemoryDataRate.set(Math.max(
-          Math.min(desiredDiskToMemoryDataRate.get() - deltaDataRate,
-              diskBandwidthEstimate), -diskBandwidthEstimate));
-      if (LOG.isInfoEnabled()) {
-        LOG.info("gcCompleted: changing data injection rate from " +
-            String.format("%.2f", dataInjectionRate / 1024.0 / 1024.0) +
-            " to " + String.format("%.2f", desiredDiskToMemoryDataRate.get() /
-            1024.0 / 1024.0));
-      }
-    }
-    lastGCObservation = observation;
-  }
-
-  /**
-   * Get the current data injection rate to memory based on the commands ran
-   * in the history (retrieved from statistics collector), and outstanding
-   * commands issued by the IO scheduler.
-   *
-   * @return the current data injection rate to memory
-   */
-  private long getCurrentDataInjectionRate() {
-    long effectiveBytesTransferred = 0;
-    long effectiveDuration = 0;
-    for (IOCommand.IOCommandType type : IOCommand.IOCommandType.values()) {
-      OutOfCoreIOStatistics.BytesDuration stats =
-          oocEngine.getIOStatistics().getCommandTypeStats(type);
-      int occurrence = commandOccurrences.get(type).get();
-      long typeBytesTransferred = stats.getBytes();
-      long typeDuration = stats.getDuration();
-      // If there is an outstanding command, we still do not know how many bytes
-      // it will transfer, and how long it will take. So, we guesstimate these
-      // numbers based on other similar commands happened in the history. We
-      // simply take the average number of bytes transferred for the particular
-      // command, and we take average duration for the particular command. We
-      // should multiply these numbers by the number of outstanding commands of
-      // this particular command type.
-      if (stats.getOccurrence() != 0) {
-        typeBytesTransferred += stats.getBytes() / stats.getOccurrence() *
-            occurrence;
-        typeDuration += stats.getDuration() / stats.getOccurrence() *
-            occurrence;
-      }
-      if (type == IOCommand.IOCommandType.LOAD_PARTITION) {
-        effectiveBytesTransferred += typeBytesTransferred;
-      } else {
-        // Store (data going out of memory), or wait (no data transferred)
-        effectiveBytesTransferred -= typeBytesTransferred;
-      }
-      effectiveDuration += typeDuration;
-    }
-    if (effectiveDuration == 0) {
-      return 0;
-    } else {
-      return effectiveBytesTransferred / effectiveDuration;
-    }
-  }
-
-  @Override
-  public IOAction[] getNextIOActions() {
-    long error = (long) (oocEngine.getIOStatistics().getDiskBandwidth() * 0.05);
-    long desiredRate = desiredDiskToMemoryDataRate.get();
-    long currentRate = getCurrentDataInjectionRate();
-    if (desiredRate > error) {
-      // 'l-s' is positive, we should do more load than store.
-      if (currentRate > desiredRate + error) {
-        // We should decrease 'l-s'. This can be done either by increasing 's'
-        // or issuing wait command. We prioritize wait over hard store.
-        return new IOAction[]{
-          IOAction.STORE_MESSAGES_AND_BUFFERS,
-          IOAction.STORE_PROCESSED_PARTITION};
-      } else if (currentRate < desiredRate - error) {
-        // We should increase 'l-s'. We can simply load partitions/data.
-        return new IOAction[]{IOAction.LOAD_PARTITION};
-      } else {
-        // We are in a proper state and we should keep up with the rate. We can
-        // either soft store data or load data (hard load, since we desired rate
-        // is positive).
-        return new IOAction[]{
-          IOAction.STORE_MESSAGES_AND_BUFFERS,
-          IOAction.STORE_PROCESSED_PARTITION,
-          IOAction.LOAD_PARTITION};
-      }
-    } else if (desiredRate < -error) {
-      // 'l-s' is negative, we should do more store than load.
-      if (currentRate < desiredRate - error) {
-        // We should increase 'l-s', but we should be cautious. We only do soft
-        // load, or wait.
-        return new IOAction[]{IOAction.LOAD_UNPROCESSED_PARTITION};
-      } else if (currentRate > desiredRate + error) {
-        // We should reduce 'l-s', we do hard store.
-        return new IOAction[]{
-          IOAction.STORE_MESSAGES_AND_BUFFERS,
-          IOAction.STORE_PARTITION};
-      } else {
-        // We should keep up with the rate. We can either soft store data, or
-        // soft load data.
-        return new IOAction[]{
-          IOAction.STORE_MESSAGES_AND_BUFFERS,
-          IOAction.STORE_PROCESSED_PARTITION,
-          IOAction.LOAD_UNPROCESSED_PARTITION};
-      }
-    } else {
-      // 'l-s' is almost zero. If current rate is over the desired rate, we do
-      // soft store. If the current rate is below the desired rate, we do soft
-      // load.
-      if (currentRate > desiredRate + error) {
-        return new IOAction[]{
-          IOAction.STORE_MESSAGES_AND_BUFFERS,
-          IOAction.STORE_PROCESSED_PARTITION};
-      } else if (currentRate < desiredRate - error) {
-        return new IOAction[]{IOAction.LOAD_UNPROCESSED_PARTITION};
-      } else {
-        return new IOAction[]{
-          IOAction.STORE_MESSAGES_AND_BUFFERS,
-          IOAction.STORE_PROCESSED_PARTITION,
-          IOAction.LOAD_UNPROCESSED_PARTITION};
-      }
-    }
-  }
-
-  @Override
-  public synchronized boolean approve(IOCommand command) {
-    long error = (long) (oocEngine.getIOStatistics().getDiskBandwidth() * 0.05);
-    long desiredRate = desiredDiskToMemoryDataRate.get();
-    long currentRate = getCurrentDataInjectionRate();
-    // The command is denied iff the current rate is above the desired rate and
-    // we are doing load (instead of store), or the current rate is below the
-    // desired rate and we are doing store (instead of loading).
-    if (currentRate > desiredRate + error &&
-        command instanceof LoadPartitionIOCommand) {
-      return false;
-    }
-    if (currentRate < desiredRate - error &&
-        !(command instanceof LoadPartitionIOCommand) &&
-        !(command instanceof WaitIOCommand)) {
-      return false;
-    }
-    commandOccurrences.get(command.getType()).getAndIncrement();
-    return true;
-  }
-
-  @Override
-  public void commandCompleted(IOCommand command) {
-    commandOccurrences.get(command.getType()).getAndDecrement();
-  }
-
-  @Override
-  public void shutdown() { }
-
-  /** Helper class to record memory status after GC calls */
-  private class GCObservation {
-    /** The time at which the GC happened (in milliseconds) */
-    private long time;
-    /** Amount of memory used after the GC call */
-    private long usedMemory;
-    /** Maximum amounts of memory reported by GC listener */
-    private long maxMemory;
-
-    /**
-     * Constructor
-     *
-     * @param time time of GC
-     * @param usedMemory amount of used memory after GC
-     * @param maxMemory amount of all available memory based on GC observation
-     */
-    public GCObservation(long time, long usedMemory, long maxMemory) {
-      this.time = time;
-      this.usedMemory = usedMemory;
-      this.maxMemory = maxMemory;
-    }
-
-    /**
-     * Is this a valid observation?
-     *
-     * @return true iff it is a valid observation
-     */
-    public boolean isValid() {
-      return time > 0;
-    }
-
-    /**
-     * Considering a new observation of memory status after the most recent GC,
-     * what is the desired rate for data injection to memory.
-     *
-     * @param newObservation the most recent GC observation
-     * @return desired rate of data injection to memory
-     */
-    public long getDesiredDeltaDataRate(GCObservation newObservation) {
-      long newUsedMemory = newObservation.usedMemory;
-      long newMaxMemory = newObservation.maxMemory;
-      long lastUsedMemory = usedMemory;
-      long lastMaxMemory = maxMemory;
-      // Scale the memory status of two GC observation to be the same
-      long scaledMaxMemory = Math.min(lastMaxMemory, newMaxMemory);
-      newUsedMemory =
-          (long) (((double) scaledMaxMemory / newMaxMemory) * newUsedMemory);
-      lastUsedMemory =
-          (long) (((double) scaledMaxMemory / lastMaxMemory) * lastUsedMemory);
-      long desiredUsedMemory = (long) (optimalMemoryPressure * scaledMaxMemory);
-      if (LOG.isInfoEnabled()) {
-        LOG.info("getDesiredDeltaDataRate: " + String.format("previous usage " +
-            "= %.2f MB, ", lastUsedMemory / 1024.0 / 1024.0) + String.format(
-            "current usage = %.2f MB, ", newUsedMemory / 1024.0 / 1024.0) +
-            String.format("ideal usage = %.2f MB", desiredUsedMemory / 1024.0 /
-                1024.0));
-      }
-      long interval = newObservation.time - time;
-      if (interval == 0) {
-        interval = 1;
-        LOG.warn("getDesiredDeltaRate: two GC happened almost at the same " +
-            "time!");
-      }
-      long currentDataRate = (long) ((double) (newUsedMemory -
-          lastUsedMemory) / interval * 1000);
-      long desiredDataRate = (long) ((double) (desiredUsedMemory -
-          newUsedMemory) / interval * 1000);
-      return currentDataRate - desiredDataRate;
-    }
-
-    @Override
-    public String toString() {
-      return String.format("(usedMemory: %.2f MB, maxMemory: %.2f MB at " +
-          "time: %d ms)", usedMemory / 1024.0 / 1024.0,
-          maxMemory / 1024.0 / 1024.0, time);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/3793c9ef/giraph-core/src/main/java/org/apache/giraph/ooc/ThresholdBasedOracle.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/ThresholdBasedOracle.java b/giraph-core/src/main/java/org/apache/giraph/ooc/ThresholdBasedOracle.java
deleted file mode 100644
index 3e05dce..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/ooc/ThresholdBasedOracle.java
+++ /dev/null
@@ -1,364 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.ooc;
-
-import com.sun.management.GarbageCollectionNotificationInfo;
-import org.apache.giraph.comm.flow_control.CreditBasedFlowControl;
-import org.apache.giraph.comm.flow_control.FlowControl;
-import org.apache.giraph.comm.netty.NettyClient;
-import org.apache.giraph.conf.FloatConfOption;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.conf.LongConfOption;
-import org.apache.giraph.ooc.io.IOCommand;
-import org.apache.giraph.utils.CallableFactory;
-import org.apache.giraph.utils.LogStacktraceCallable;
-import org.apache.giraph.utils.MemoryUtils;
-import org.apache.giraph.utils.ThreadUtils;
-import org.apache.log4j.Logger;
-
-import java.util.concurrent.Callable;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-
-import static com.google.common.base.Preconditions.checkState;
-
-/**
- * Out-of-core oracle to adaptively control data kept in memory, with the goal
- * of keeping the memory usage at a desired state. Out-of-core policy in this
- * oracle is based on several user-defined thresholds. Also, this oracle spawns
- * a thread to periodically check the memory usage. This thread would issue
- * manual GC calls if JVM fails to call major/full GC for a while and the amount
- * of used memory is about to cause high-memory pressure. This oracle, also,
- * monitors GC activities. The monitoring mechanism looks for major/full GC
- * calls, and updates out-of-core decisions based on the amount of available
- * memory after such GCs. There are three out-of-core decisions:
- *  - Which IO operations should be done (load/offload of partitions and
- *    messages)
- *  - What the incoming messages rate should be (updating credits announced by
- *    this worker in credit-based flow-control mechanism)
- *  - How many processing threads should remain active (tethering rate of
- *    data generation)
- *
- * The following table shows the relationship of these decisions and
- * used-defined thresholds.
- * --------------------------------------------------------------
- * Memory Pressure     |  Manual |   IO   | Credit   | Active   |
- * (memory usage)      |   GC?   | Action |          | Threads  |
- * --------------------------------------------------------------
- *                     |  Yes    | hard   |  0       |  0       |
- *                     |         | store  |          |          |
- * failPressure -------------------------------------------------
- *                     |  Yes    | hard   |  0       | fraction |
- *                     |         | store  |          |          |
- * emergencyPressure --------------------------------------------
- *                     |  Yes    | hard   | fraction |  max     |
- *                     |         | store  |          |          |
- * highPressure -------------------------------------------------
- *                     |  No     | soft   | fraction |  max     |
- *                     |         | store  |          |          |
- * optimalPressure ----------------------------------------------
- *                     |  No     | soft   |  max     |  max     |
- *                     |         | load   |          |          |
- * lowPressure --------------------------------------------------
- *                     |  No     | hard   |  max     |  max     |
- *                     |         | load   |          |          |
- * --------------------------------------------------------------
- *
- */
-public class ThresholdBasedOracle implements OutOfCoreOracle {
-  /** The memory pressure at/above which the job would fail */
-  public static final FloatConfOption FAIL_MEMORY_PRESSURE =
-      new FloatConfOption("giraph.memory.failPressure", 0.975f,
-          "The memory pressure (fraction of used memory) at/above which the " +
-              "job would fail.");
-  /**
-   * The memory pressure at which the job is cloe to fail, even though we were
-   * using maximal disk bandwidth and minimal network rate. We should reduce
-   * job processing rate.
-   */
-  public static final FloatConfOption EMERGENCY_MEMORY_PRESSURE =
-      new FloatConfOption("giraph.memory.emergencyPressure", 0.925f,
-          "The memory pressure (fraction of used memory) at which the job " +
-              "is close to fail, hence we should reduce its processing rate " +
-              "as much as possible.");
-  /** The memory pressure at which the job is suffering from GC overhead. */
-  public static final FloatConfOption HIGH_MEMORY_PRESSURE =
-      new FloatConfOption("giraph.memory.highPressure", 0.875f,
-          "The memory pressure (fraction of used memory) at which the job " +
-              "is suffering from GC overhead.");
-  /**
-   * The memory pressure at which we expect GC to perform optimally for a
-   * memory intensive job.
-   */
-  public static final FloatConfOption OPTIMAL_MEMORY_PRESSURE =
-      new FloatConfOption("giraph.memory.optimalPressure", 0.8f,
-          "The memory pressure (fraction of used memory) at which a " +
-              "memory-intensive job shows the optimal GC behavior.");
-  /**
-   * The memory pressure at/below which the job can use more memory without
-   * suffering from GC overhead.
-   */
-  public static final FloatConfOption LOW_MEMORY_PRESSURE =
-      new FloatConfOption("giraph.memory.lowPressure", 0.7f,
-          "The memory pressure (fraction of used memory) at/below which the " +
-              "job can use more memory without suffering the performance.");
-  /** The interval at which memory observer thread wakes up. */
-  public static final LongConfOption CHECK_MEMORY_INTERVAL =
-      new LongConfOption("giraph.checkMemoryInterval", 2500,
-          "The interval/period where memory observer thread wakes up and " +
-              "monitors memory footprint (in milliseconds)");
-  /**
-   * Memory observer thread would manually call GC if major/full GC has not
-   * been called for a while. The period where we expect GC to be happened in
-   * past is specified in this parameter
-   */
-  public static final LongConfOption LAST_GC_CALL_INTERVAL =
-      new LongConfOption("giraph.lastGcCallInterval", 10 * 1000,
-          "How long after last major/full GC should we call manual GC?");
-
-  /** Class logger */
-  private static final Logger LOG =
-      Logger.getLogger(ThresholdBasedOracle.class);
-  /** Cached value for FAIL_MEMORY_PRESSURE */
-  private final float failMemoryPressure;
-  /** Cached value for EMERGENCY_MEMORY_PRESSURE */
-  private final float emergencyMemoryPressure;
-  /** Cached value for HIGH_MEMORY_PRESSURE */
-  private final float highMemoryPressure;
-  /** Cached value for OPTIMAL_MEMORY_PRESSURE */
-  private final float optimalMemoryPressure;
-  /** Cached value for LOW_MEMORY_PRESSURE */
-  private final float lowMemoryPressure;
-  /** Cached value for CHECK_MEMORY_INTERVAL */
-  private final long checkMemoryInterval;
-  /** Cached value for LAST_GC_CALL_INTERVAL */
-  private final long lastGCCallInterval;
-  /**
-   * Cached value for NettyClient.MAX_NUM_OF_OPEN_REQUESTS_PER_WORKER (max
-   * credit used for credit-based flow-control mechanism)
-   */
-  private final short maxRequestsCredit;
-  /**
-   * Whether the job is shutting down. Used for terminating the memory
-   * observer thread.
-   */
-  private final CountDownLatch shouldTerminate;
-  /** Result of memory observer thread */
-  private final Future<Void> checkMemoryThreadResult;
-  /** Out-of-core engine */
-  private final OutOfCoreEngine oocEngine;
-  /** Last time a major/full GC has been called (in milliseconds) */
-  private volatile long lastMajorGCTime;
-  /** Last time a non major/full GC has been called (in milliseconds) */
-  private volatile long lastMinorGCTime;
-
-  /**
-   * Constructor
-   *
-   * @param conf configuration
-   * @param oocEngine out-of-core engine
-   */
-  public ThresholdBasedOracle(ImmutableClassesGiraphConfiguration conf,
-                              OutOfCoreEngine oocEngine) {
-    this.failMemoryPressure = FAIL_MEMORY_PRESSURE.get(conf);
-    this.emergencyMemoryPressure = EMERGENCY_MEMORY_PRESSURE.get(conf);
-    this.highMemoryPressure = HIGH_MEMORY_PRESSURE.get(conf);
-    this.optimalMemoryPressure = OPTIMAL_MEMORY_PRESSURE.get(conf);
-    this.lowMemoryPressure = LOW_MEMORY_PRESSURE.get(conf);
-    this.checkMemoryInterval = CHECK_MEMORY_INTERVAL.get(conf);
-    this.lastGCCallInterval = LAST_GC_CALL_INTERVAL.get(conf);
-    this.maxRequestsCredit = (short)
-        CreditBasedFlowControl.MAX_NUM_OF_OPEN_REQUESTS_PER_WORKER.get(conf);
-    NettyClient.LIMIT_OPEN_REQUESTS_PER_WORKER.setIfUnset(conf, true);
-    boolean useCredit = NettyClient.LIMIT_OPEN_REQUESTS_PER_WORKER.get(conf);
-    checkState(useCredit, "ThresholdBasedOracle: credit-based flow control " +
-        "must be enabled. Use giraph.waitForPerWorkerRequests=true");
-    this.shouldTerminate = new CountDownLatch(1);
-    this.oocEngine = oocEngine;
-    this.lastMajorGCTime = 0;
-
-    CallableFactory<Void> callableFactory = new CallableFactory<Void>() {
-      @Override
-      public Callable<Void> newCallable(int callableId) {
-        return new Callable<Void>() {
-          @Override
-          public Void call() throws Exception {
-            while (true) {
-              boolean done = shouldTerminate.await(checkMemoryInterval,
-                  TimeUnit.MILLISECONDS);
-              if (done) {
-                break;
-              }
-              double usedMemoryFraction = 1 - MemoryUtils.freeMemoryFraction();
-              long time = System.currentTimeMillis();
-              if ((usedMemoryFraction > highMemoryPressure &&
-                  time - lastMajorGCTime >= lastGCCallInterval) ||
-                  (usedMemoryFraction > optimalMemoryPressure &&
-                  time - lastMajorGCTime >= lastGCCallInterval &&
-                  time - lastMinorGCTime >= lastGCCallInterval)) {
-                if (LOG.isInfoEnabled()) {
-                  LOG.info("call: last GC happened a while ago and the " +
-                      "amount of used memory is high (used memory " +
-                      "fraction is " +
-                      String.format("%.2f", usedMemoryFraction) + "). " +
-                      "Calling GC manually");
-                }
-                System.gc();
-                time = System.currentTimeMillis() - time;
-                usedMemoryFraction = 1 - MemoryUtils.freeMemoryFraction();
-                if (LOG.isInfoEnabled()) {
-                  LOG.info("call: manual GC is done. It took " +
-                      String.format("%.2f", (double) time / 1000) +
-                      " seconds. Used memory fraction is " +
-                      String.format("%.2f", usedMemoryFraction));
-                }
-              }
-              updateRates(usedMemoryFraction);
-            }
-            return null;
-          }
-        };
-      }
-    };
-    ExecutorService executor = Executors.newSingleThreadExecutor(
-        ThreadUtils.createThreadFactory("check-memory"));
-    this.checkMemoryThreadResult = executor.submit(new LogStacktraceCallable<>(
-        callableFactory.newCallable(0)));
-    executor.shutdown();
-  }
-
-  /**
-   * upon major/full GC calls.
-   */
-  /**
-   * Update statistics and rate regarding communication credits and number of
-   * active threads.
-   *
-   * @param usedMemoryFraction the fraction of used memory over max memory
-   */
-  public void updateRates(double usedMemoryFraction) {
-    // Update the fraction of processing threads that should remain active
-    if (usedMemoryFraction >= failMemoryPressure) {
-      oocEngine.updateActiveThreadsFraction(0);
-    } else if (usedMemoryFraction < emergencyMemoryPressure) {
-      oocEngine.updateActiveThreadsFraction(1);
-    } else {
-      oocEngine.updateActiveThreadsFraction(1 -
-          (usedMemoryFraction - emergencyMemoryPressure) /
-              (failMemoryPressure - emergencyMemoryPressure));
-    }
-
-    // Update the fraction of credit that should be used in credit-based flow-
-    // control
-    if (usedMemoryFraction >= emergencyMemoryPressure) {
-      updateRequestsCredit((short) 0);
-    } else if (usedMemoryFraction < optimalMemoryPressure) {
-      updateRequestsCredit(maxRequestsCredit);
-    } else {
-      updateRequestsCredit((short) (maxRequestsCredit *
-          (1 - (usedMemoryFraction - optimalMemoryPressure) /
-              (emergencyMemoryPressure - optimalMemoryPressure))));
-    }
-  }
-
-  @Override
-  public IOAction[] getNextIOActions() {
-    double usedMemoryFraction = 1 - MemoryUtils.freeMemoryFraction();
-    if (LOG.isInfoEnabled()) {
-      LOG.info(String.format("getNextIOActions: usedMemoryFraction = %.2f",
-          usedMemoryFraction));
-    }
-    if (usedMemoryFraction > highMemoryPressure) {
-      return new IOAction[]{
-        IOAction.STORE_MESSAGES_AND_BUFFERS,
-        IOAction.STORE_PARTITION};
-    } else if (usedMemoryFraction > optimalMemoryPressure) {
-      return new IOAction[]{
-        IOAction.LOAD_UNPROCESSED_PARTITION,
-        IOAction.STORE_MESSAGES_AND_BUFFERS,
-        IOAction.STORE_PROCESSED_PARTITION};
-    } else if (usedMemoryFraction > lowMemoryPressure) {
-      return new IOAction[]{
-        IOAction.LOAD_UNPROCESSED_PARTITION,
-        IOAction.STORE_MESSAGES_AND_BUFFERS,
-        IOAction.LOAD_PARTITION};
-    } else {
-      return new IOAction[]{IOAction.LOAD_PARTITION};
-    }
-  }
-
-  @Override
-  public boolean approve(IOCommand command) {
-    return true;
-  }
-
-  @Override
-  public void commandCompleted(IOCommand command) {
-    // Do nothing
-  }
-
-  @Override
-  public void gcCompleted(GarbageCollectionNotificationInfo gcInfo) {
-    String gcAction = gcInfo.getGcAction().toLowerCase();
-    if (gcAction.contains("full") || gcAction.contains("major")) {
-      if (!gcInfo.getGcCause().contains("No GC")) {
-        lastMajorGCTime = System.currentTimeMillis();
-      }
-    } else {
-      lastMinorGCTime = System.currentTimeMillis();
-    }
-  }
-
-  @Override
-  public void shutdown() {
-    shouldTerminate.countDown();
-    try {
-      checkMemoryThreadResult.get();
-    } catch (InterruptedException | ExecutionException e) {
-      LOG.error("shutdown: caught exception while waiting on check-memory " +
-          "thread to terminate!");
-      throw new IllegalStateException(e);
-    }
-    if (LOG.isInfoEnabled()) {
-      LOG.info("shutdown: ThresholdBasedOracle shutdown complete!");
-    }
-  }
-
-  /**
-   * Update the credit announced for this worker in Netty. The lower the credit
-   * is, the lower rate incoming messages arrive at this worker. Thus, credit
-   * is an indirect way of controlling amount of memory incoming messages would
-   * take.
-   *
-   * @param newCredit the new credit to announce to other workers
-   */
-  private void updateRequestsCredit(short newCredit) {
-    if (LOG.isInfoEnabled()) {
-      LOG.info("updateRequestsCredit: updating the credit to " + newCredit);
-    }
-    FlowControl flowControl = oocEngine.getFlowControl();
-    if (flowControl != null) {
-      ((CreditBasedFlowControl) flowControl).updateCredit(newCredit);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/3793c9ef/giraph-core/src/main/java/org/apache/giraph/ooc/command/IOCommand.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/command/IOCommand.java b/giraph-core/src/main/java/org/apache/giraph/ooc/command/IOCommand.java
new file mode 100644
index 0000000..b6c986d
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/ooc/command/IOCommand.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.ooc.command;
+
+import org.apache.giraph.ooc.OutOfCoreEngine;
+
+import java.io.IOException;
+
+/**
+ * Base class for a single out-of-core IO operation, i.e. a unit of work that
+ * moves data between memory and disk.
+ */
+public abstract class IOCommand {
+  /** The different kinds of IO commands */
+  public enum IOCommandType {
+    /** Bring a partition into memory */
+    LOAD_PARTITION,
+    /** Write a partition to disk */
+    STORE_PARTITION,
+    /** Write the incoming messages of a partition to disk */
+    STORE_MESSAGE,
+    /**
+     * Write the raw message/buffer data of a partition that is currently
+     * out-of-core to disk
+     */
+    STORE_BUFFER,
+    /** Perform no IO */
+    WAIT
+  }
+
+  /** Out-of-core engine this command works through */
+  protected final OutOfCoreEngine oocEngine;
+  /** Id of the partition this command reads or writes */
+  protected final int partitionId;
+  /**
+   * Running total of bytes moved between memory and disk while this command
+   * executes (subclasses accumulate into it)
+   */
+  protected long numBytesTransferred = 0;
+
+  /**
+   * Create a command bound to an engine and a partition.
+   *
+   * @param oocEngine Out-of-core engine
+   * @param partitionId Id of the partition involved in the IO
+   */
+  public IOCommand(OutOfCoreEngine oocEngine, int partitionId) {
+    this.partitionId = partitionId;
+    this.oocEngine = oocEngine;
+  }
+
+  /**
+   * Partition this command operates on.
+   *
+   * @return id of the partition
+   */
+  public int getPartitionId() {
+    return partitionId;
+  }
+
+  /**
+   * Perform the IO (loading or storing data) and update the data stores to
+   * reflect it. Implementations report whether any data was actually moved.
+   *
+   * @return true iff the command resulted in data being loaded or stored
+   * @throws IOException if the underlying load/store fails
+   */
+  public abstract boolean execute() throws IOException;
+
+  /**
+   * Kind of this command.
+   *
+   * @return the command's type
+   */
+  public abstract IOCommandType getType();
+
+  /**
+   * Bytes moved between memory and disk so far by this command.
+   *
+   * @return number of bytes loaded/stored during execution of the command
+   */
+  public long bytesTransferred() {
+    return numBytesTransferred;
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/giraph/blob/3793c9ef/giraph-core/src/main/java/org/apache/giraph/ooc/command/LoadPartitionIOCommand.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/command/LoadPartitionIOCommand.java b/giraph-core/src/main/java/org/apache/giraph/ooc/command/LoadPartitionIOCommand.java
new file mode 100644
index 0000000..ee12159
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/ooc/command/LoadPartitionIOCommand.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.ooc.command;
+
+import com.google.common.base.Preconditions;
+import org.apache.giraph.bsp.BspService;
+import org.apache.giraph.comm.messages.MessageStore;
+import org.apache.giraph.ooc.OutOfCoreEngine;
+import org.apache.giraph.ooc.data.DiskBackedEdgeStore;
+import org.apache.giraph.ooc.data.DiskBackedMessageStore;
+import org.apache.giraph.ooc.data.DiskBackedPartitionStore;
+
+import java.io.IOException;
+
+/**
+ * IOCommand to load partition data, edge data (if in INPUT_SUPERSTEP), and
+ * message data (if in compute supersteps). Also, this command can be used to
+ * prefetch a partition to be processed in the next superstep.
+ */
+public class LoadPartitionIOCommand extends IOCommand {
+  /**
+   * Which superstep this partition should be loaded for? (can be current
+   * superstep or next superstep -- in case of prefetching).
+   */
+  private final long superstep;
+
+  /**
+   * Constructor
+   *
+   * @param oocEngine out-of-core engine
+   * @param partitionId id of the partition to be loaded
+   * @param superstep superstep to load the partition for
+   */
+  public LoadPartitionIOCommand(OutOfCoreEngine oocEngine, int partitionId,
+                                long superstep) {
+    super(oocEngine, partitionId);
+    this.superstep = superstep;
+  }
+
+  /**
+   * Load the partition's vertex data. Its edge data is loaded too when the
+   * target superstep is the current one and that is INPUT_SUPERSTEP. Its
+   * messages come from the current message store when loading for the
+   * current superstep, or from the incoming message store when prefetching
+   * for the next superstep.
+   *
+   * @return true iff the meta-partition manager allowed the load to start
+   *         (and the data was then loaded)
+   * @throws IOException propagated from the disk-backed stores
+   * @throws IllegalStateException if the target superstep is neither the
+   *         current superstep nor the one right after it
+   */
+  @Override
+  public boolean execute() throws IOException {
+    boolean executed = false;
+    if (oocEngine.getMetaPartitionManager()
+        .startLoadingPartition(partitionId, superstep)) {
+      long currentSuperstep = oocEngine.getSuperstep();
+      DiskBackedPartitionStore partitionStore =
+          (DiskBackedPartitionStore)
+              oocEngine.getServerData().getPartitionStore();
+      numBytesTransferred +=
+          partitionStore.loadPartitionData(partitionId);
+      // Edge data is only loaded when loading for the current INPUT_SUPERSTEP
+      if (currentSuperstep == BspService.INPUT_SUPERSTEP &&
+          superstep == currentSuperstep) {
+        DiskBackedEdgeStore edgeStore =
+            (DiskBackedEdgeStore) oocEngine.getServerData().getEdgeStore();
+        numBytesTransferred +=
+            edgeStore.loadPartitionData(partitionId);
+      }
+      MessageStore messageStore;
+      if (currentSuperstep == superstep) {
+        messageStore = oocEngine.getServerData().getCurrentMessageStore();
+      } else {
+        // Prefetching is only supported one superstep ahead
+        Preconditions.checkState(superstep == currentSuperstep + 1,
+            "execute: partition %s requested for superstep %s, but only " +
+            "the current superstep (%s) or the next one can be loaded",
+            partitionId, superstep, currentSuperstep);
+        messageStore = oocEngine.getServerData().getIncomingMessageStore();
+      }
+      // The message store may be null; message loading is skipped then
+      if (messageStore != null) {
+        numBytesTransferred += ((DiskBackedMessageStore) messageStore)
+            .loadPartitionData(partitionId);
+      }
+      oocEngine.getMetaPartitionManager()
+          .doneLoadingPartition(partitionId, superstep);
+      executed = true;
+    }
+    return executed;
+  }
+
+  @Override
+  public IOCommandType getType() {
+    return IOCommandType.LOAD_PARTITION;
+  }
+
+  @Override
+  public String toString() {
+    return "LoadPartitionIOCommand: (partitionId = " + partitionId + ", " +
+        "superstep = " + superstep + ")";
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/3793c9ef/giraph-core/src/main/java/org/apache/giraph/ooc/command/StoreDataBufferIOCommand.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/command/StoreDataBufferIOCommand.java b/giraph-core/src/main/java/org/apache/giraph/ooc/command/StoreDataBufferIOCommand.java
new file mode 100644
index 0000000..beda796
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/ooc/command/StoreDataBufferIOCommand.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.ooc.command;
+
+import org.apache.giraph.ooc.OutOfCoreEngine;
+import org.apache.giraph.ooc.data.DiskBackedEdgeStore;
+import org.apache.giraph.ooc.data.DiskBackedMessageStore;
+import org.apache.giraph.ooc.data.DiskBackedPartitionStore;
+
+import java.io.IOException;
+
+/**
+ * IOCommand to store raw data buffers on disk.
+ */
+public class StoreDataBufferIOCommand extends IOCommand {
+  /**
+   * Types of raw data buffer to offload to disk (either vertices/edges buffer
+   * in INPUT_SUPERSTEP or incoming message buffer).
+   */
+  public enum DataBufferType { PARTITION, MESSAGE };
+  /**
+   * Type of the buffer to store on disk.
+   */
+  private final DataBufferType type;
+
+  /**
+   * Constructor
+   *
+   * @param oocEngine out-of-core engine
+   * @param partitionId id of the partition to offload its buffers
+   * @param type type of the buffer to store on disk
+   */
+  public StoreDataBufferIOCommand(OutOfCoreEngine oocEngine,
+                                  int partitionId,
+                                  DataBufferType type) {
+    super(oocEngine, partitionId);
+    this.type = type;
+  }
+
+  @Override
+  public boolean execute() throws IOException {
+    boolean executed = false;
+    if (oocEngine.getMetaPartitionManager()
+        .startOffloadingBuffer(partitionId)) {
+      switch (type) {
+      case PARTITION:
+        DiskBackedPartitionStore partitionStore =
+            (DiskBackedPartitionStore)
+                oocEngine.getServerData().getPartitionStore();
+        numBytesTransferred +=
+            partitionStore.offloadBuffers(partitionId);
+        DiskBackedEdgeStore edgeStore =
+            (DiskBackedEdgeStore) oocEngine.getServerData().getEdgeStore();
+        numBytesTransferred += edgeStore.offloadBuffers(partitionId);
+        break;
+      case MESSAGE:
+        DiskBackedMessageStore messageStore =
+            (DiskBackedMessageStore)
+                oocEngine.getServerData().getIncomingMessageStore();
+        numBytesTransferred +=
+            messageStore.offloadBuffers(partitionId);
+        break;
+      default:
+        throw new IllegalStateException("execute: requested data buffer type " +
+            "does not exist!");
+      }
+      oocEngine.getMetaPartitionManager().doneOffloadingBuffer(partitionId);
+      executed = true;
+    }
+    return executed;
+  }
+
+  @Override
+  public IOCommandType getType() {
+    return IOCommandType.STORE_BUFFER;
+  }
+
+  @Override
+  public String toString() {
+    return "StoreDataBufferIOCommand: (partitionId = " + partitionId + ", " +
+        "type = " + type.name() + ")";
+  }
+}


[3/4] git commit: updated refs/heads/trunk to 3793c9e

Posted by ma...@apache.org.
http://git-wip-us.apache.org/repos/asf/giraph/blob/3793c9ef/giraph-core/src/main/java/org/apache/giraph/ooc/command/StoreIncomingMessageIOCommand.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/command/StoreIncomingMessageIOCommand.java b/giraph-core/src/main/java/org/apache/giraph/ooc/command/StoreIncomingMessageIOCommand.java
new file mode 100644
index 0000000..b38f957
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/ooc/command/StoreIncomingMessageIOCommand.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.ooc.command;
+
+import org.apache.giraph.ooc.OutOfCoreEngine;
+import org.apache.giraph.ooc.data.DiskBackedMessageStore;
+
+import java.io.IOException;
+
+import static com.google.common.base.Preconditions.checkState;
+
+/**
+ * IOCommand to store incoming messages of a particular partition.
+ */
+public class StoreIncomingMessageIOCommand extends IOCommand {
+  /**
+   * Constructor
+   *
+   * @param oocEngine out-of-core engine
+   * @param partitionId id of the partition to store its incoming messages
+   */
+  public StoreIncomingMessageIOCommand(OutOfCoreEngine oocEngine,
+                                       int partitionId) {
+    super(oocEngine, partitionId);
+  }
+
+  /**
+   * Offload the incoming messages of the partition to disk through the
+   * incoming message store.
+   *
+   * @return true iff the meta-partition manager allowed the offload to start
+   *         (and the messages were then stored)
+   * @throws IOException propagated from the disk-backed message store
+   * @throws IllegalStateException if there is no incoming message store
+   */
+  @Override
+  public boolean execute() throws IOException {
+    boolean executed = false;
+    if (oocEngine.getMetaPartitionManager()
+        .startOffloadingMessages(partitionId)) {
+      DiskBackedMessageStore messageStore =
+          (DiskBackedMessageStore)
+              oocEngine.getServerData().getIncomingMessageStore();
+      checkState(messageStore != null,
+          "execute: incoming message store is null; cannot offload " +
+          "messages of partition %s", partitionId);
+      numBytesTransferred +=
+          messageStore.offloadPartitionData(partitionId);
+      oocEngine.getMetaPartitionManager().doneOffloadingMessages(partitionId);
+      executed = true;
+    }
+    return executed;
+  }
+
+  @Override
+  public IOCommandType getType() {
+    return IOCommandType.STORE_MESSAGE;
+  }
+
+  @Override
+  public String toString() {
+    return "StoreIncomingMessageIOCommand: (partitionId = " + partitionId + ")";
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/3793c9ef/giraph-core/src/main/java/org/apache/giraph/ooc/command/StorePartitionIOCommand.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/command/StorePartitionIOCommand.java b/giraph-core/src/main/java/org/apache/giraph/ooc/command/StorePartitionIOCommand.java
new file mode 100644
index 0000000..31fa345
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/ooc/command/StorePartitionIOCommand.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.ooc.command;
+
+import org.apache.giraph.bsp.BspService;
+import org.apache.giraph.comm.messages.MessageStore;
+import org.apache.giraph.ooc.data.DiskBackedEdgeStore;
+import org.apache.giraph.ooc.data.DiskBackedMessageStore;
+import org.apache.giraph.ooc.data.DiskBackedPartitionStore;
+import org.apache.giraph.ooc.OutOfCoreEngine;
+
+import java.io.IOException;
+
+/**
+ * IOCommand to store partition data, edge data (if in INPUT_SUPERSTEP), and
+ * message data (if in compute supersteps).
+ */
+public class StorePartitionIOCommand extends IOCommand {
+  /**
+   * Creates a command that offloads all data of one partition.
+   *
+   * @param oocEngine out-of-core engine
+   * @param partitionId id of the partition whose data is stored
+   */
+  public StorePartitionIOCommand(OutOfCoreEngine oocEngine,
+                                 int partitionId) {
+    super(oocEngine, partitionId);
+  }
+
+  /**
+   * Offloads the partition data to disk. The offload then continues with
+   * either the partition's edges (input superstep) or its current messages
+   * (any other superstep, when a current message store exists).
+   *
+   * @return true if the partition was offloaded, false if the meta-partition
+   *         manager did not let the offload start
+   * @throws IOException if offloading any of the data fails
+   */
+  @Override
+  public boolean execute() throws IOException {
+    if (!oocEngine.getMetaPartitionManager()
+        .startOffloadingPartition(partitionId)) {
+      return false;
+    }
+    DiskBackedPartitionStore partitions = (DiskBackedPartitionStore)
+        oocEngine.getServerData().getPartitionStore();
+    numBytesTransferred += partitions.offloadPartitionData(partitionId);
+    boolean inputSuperstep =
+        oocEngine.getSuperstep() == BspService.INPUT_SUPERSTEP;
+    if (inputSuperstep) {
+      // During the input superstep edges are held by the disk-backed edge
+      // store, so they are offloaded together with the partition.
+      DiskBackedEdgeStore edges = (DiskBackedEdgeStore)
+          oocEngine.getServerData().getEdgeStore();
+      numBytesTransferred += edges.offloadPartitionData(partitionId);
+    } else {
+      MessageStore currentMessages =
+          oocEngine.getServerData().getCurrentMessageStore();
+      if (currentMessages != null) {
+        DiskBackedMessageStore diskBackedMessages =
+            (DiskBackedMessageStore) currentMessages;
+        numBytesTransferred +=
+            diskBackedMessages.offloadPartitionData(partitionId);
+      }
+    }
+    oocEngine.getMetaPartitionManager().doneOffloadingPartition(partitionId);
+    return true;
+  }
+
+  @Override
+  public IOCommandType getType() {
+    return IOCommandType.STORE_PARTITION;
+  }
+
+  @Override
+  public String toString() {
+    return "StorePartitionIOCommand: (partitionId = " + partitionId + ")";
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/3793c9ef/giraph-core/src/main/java/org/apache/giraph/ooc/command/WaitIOCommand.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/command/WaitIOCommand.java b/giraph-core/src/main/java/org/apache/giraph/ooc/command/WaitIOCommand.java
new file mode 100644
index 0000000..83540c1
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/ooc/command/WaitIOCommand.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.ooc.command;
+
+import org.apache.giraph.ooc.OutOfCoreEngine;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * IOCommand that moves no data to/from disk; it only keeps the IO thread idle
+ * for a given duration.
+ */
+public class WaitIOCommand extends IOCommand {
+  /** How long the IO thread should stay idle (in milliseconds) */
+  private final long waitDuration;
+
+  /**
+   * Constructor
+   *
+   * @param oocEngine out-of-core engine
+   * @param waitDuration duration of wait, in milliseconds
+   */
+  public WaitIOCommand(OutOfCoreEngine oocEngine, long waitDuration) {
+    // -1 is passed in the partition-id slot: a wait is not tied to any
+    // partition.
+    super(oocEngine, -1);
+    this.waitDuration = waitDuration;
+  }
+
+  /**
+   * Puts the calling thread to sleep for {@code waitDuration} milliseconds.
+   *
+   * @return always true
+   * @throws IOException declared by the overridden method; not thrown here
+   * @throws IllegalStateException if the thread is interrupted while waiting
+   *         (the interrupt status is restored before throwing)
+   */
+  @Override
+  public boolean execute() throws IOException {
+    try {
+      TimeUnit.MILLISECONDS.sleep(waitDuration);
+    } catch (InterruptedException e) {
+      // Restore the interrupt flag so the owning thread/executor can still
+      // observe the interruption, and keep the original exception as cause.
+      Thread.currentThread().interrupt();
+      throw new IllegalStateException("execute: caught InterruptedException " +
+          "while IO thread is waiting!", e);
+    }
+    return true;
+  }
+
+  @Override
+  public IOCommandType getType() {
+    return IOCommandType.WAIT;
+  }
+
+  @Override
+  public String toString() {
+    return "WaitIOCommand: (duration = " + waitDuration + "ms)";
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/3793c9ef/giraph-core/src/main/java/org/apache/giraph/ooc/command/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/command/package-info.java b/giraph-core/src/main/java/org/apache/giraph/ooc/command/package-info.java
new file mode 100644
index 0000000..930b139
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/ooc/command/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Package of classes related to IO commands in the out-of-core mechanism.
+ */
+package org.apache.giraph.ooc.command;

http://git-wip-us.apache.org/repos/asf/giraph/blob/3793c9ef/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedDataStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedDataStore.java b/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedDataStore.java
new file mode 100644
index 0000000..7265410
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedDataStore.java
@@ -0,0 +1,432 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.ooc.data;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.commons.lang3.tuple.MutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.conf.IntConfOption;
+import org.apache.giraph.ooc.OutOfCoreEngine;
+import org.apache.giraph.ooc.persistence.DataIndex;
+import org.apache.giraph.ooc.persistence.DataIndex.NumericIndexEntry;
+import org.apache.giraph.ooc.persistence.OutOfCoreDataAccessor;
+import org.apache.log4j.Logger;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+import static org.apache.giraph.conf.GiraphConstants.ONE_MB;
+
+/**
+ * This class provides basic operations for data structures that have to
+ * participate in out-of-core mechanism. The main subclasses of this class are:
+ *  - DiskBackedPartitionStore (for partition data)
+ *  - DiskBackedMessageStore (for messages)
+ *  - DiskBackedEdgeStore (for edges read in INPUT_SUPERSTEP)
+ * Basically, any data structure that may cause OOM to happen can be implemented
+ * as a subclass of this class.
+ *
+ * There are two different terms used in the rest of this class:
+ *  - "data store" refers to in-memory representation of data. Usually this is
+ *    stored per-partition in in-memory implementations of data structures. For
+ *    instance, the "data store" of a DiskBackedPartitionStore would be the
+ *    collection of all partitions kept in the in-memory partition store within
+ *    the DiskBackedPartitionStore.
+ *  - "raw data buffer" refers to raw data which were supposed to be
+ *    de-serialized and added to the data store, but they remain 'as is' in the
+ *    memory because their corresponding partition is offloaded to disk and is
+ *    not available in the data store.
+ *
+ * @param <T> raw data format of the data store subclassing this class
+ */
+public abstract class DiskBackedDataStore<T> {
+  /**
+   * Minimum size of a buffer (in bytes) to flush to disk. This is used to
+   * decide whether vertex/edge buffers are large enough to flush to disk.
+   */
+  public static final IntConfOption MINIMUM_BUFFER_SIZE_TO_FLUSH =
+      new IntConfOption("giraph.flushBufferSize", 8 * ONE_MB,
+          "Minimum size of a buffer (in bytes) to flush to disk.");
+
+  /** Class logger. */
+  private static final Logger LOG = Logger.getLogger(
+      DiskBackedDataStore.class);
+  /** Out-of-core engine */
+  protected final OutOfCoreEngine oocEngine;
+  /**
+   * Set containing ids of all partitions where the partition data is in some
+   * file on disk.
+   * Note that the out-of-core mechanism may decide to put the data for a
+   * partition on disk, while the partition data is empty. For instance, at the
+   * beginning of a superstep, out-of-core mechanism may decide to put incoming
+   * messages of a partition on disk, while the partition has not received any
+   * messages. In such scenarios, the "out-of-core mechanism" thinks that the
+   * partition data is on disk, while disk-backed data stores may want to
+   * optimize for IO/metadata accesses and decide not to create/write anything
+   * on files on disk.
+   * In summary, there is a subtle difference between this field and the
+   * `hasPartitionDataOnDisk` field. Basically, this field is used for
+   * optimizing IO (mainly metadata) accesses by disk-backed stores, while
+   * `hasPartitionDataOnDisk` is the view that out-of-core mechanism has
+   * regarding partition storage statuses. Since out-of-core mechanism does not
+   * know about the actual data for a partition, these two fields have to be
+   * separate.
+   */
+  protected final Set<Integer> hasPartitionDataOnFile =
+      Sets.newConcurrentHashSet();
+  /** Cached value for MINIMUM_BUFFER_SIZE_TO_FLUSH */
+  private final int minBufferSizeToOffload;
+  /** Set containing ids of all out-of-core partitions */
+  private final Set<Integer> hasPartitionDataOnDisk =
+      Sets.newConcurrentHashSet();
+  /**
+   * Map of partition ids to list of raw data buffers. The map will have entries
+   * only for partitions that their in-memory data structures are currently
+   * offloaded to disk. We keep the aggregate size of buffers for each partition
+   * as part of the values in the map to estimate how much memory we can free up
+   * if we offload data buffers of a particular partition to disk.
+   */
+  private final ConcurrentMap<Integer, Pair<Integer, List<T>>> dataBuffers =
+      Maps.newConcurrentMap();
+  /**
+   * Map of partition ids to number of raw data buffers offloaded to disk for
+   * each partition. The map will have entries only for partitions that their
+   * in-memory data structures are currently out of core. It is necessary to
+   * know the number of data buffers on disk for a particular partition when we
+   * are loading all these buffers back in memory.
+   */
+  private final ConcurrentMap<Integer, Integer> numDataBuffersOnDisk =
+      Maps.newConcurrentMap();
+  /**
+   * Lock to avoid overlapping of read and write on data associated with each
+   * partition.
+   */
+  private final ConcurrentMap<Integer, ReadWriteLock> locks =
+      Maps.newConcurrentMap();
+
+  /**
+   * Constructor.
+   *
+   * @param conf Configuration
+   * @param oocEngine Out-of-core engine
+   */
+  DiskBackedDataStore(ImmutableClassesGiraphConfiguration conf,
+                      OutOfCoreEngine oocEngine) {
+    this.minBufferSizeToOffload = MINIMUM_BUFFER_SIZE_TO_FLUSH.get(conf);
+    this.oocEngine = oocEngine;
+  }
+
+  /**
+   * Retrieves a lock for a given partition. If the lock for the given partition
+   * does not exist, creates a new lock.
+   *
+   * @param partitionId id of the partition the lock is needed for
+   * @return lock for a given partition
+   */
+  private ReadWriteLock getPartitionLock(int partitionId) {
+    ReadWriteLock readWriteLock = locks.get(partitionId);
+    if (readWriteLock == null) {
+      readWriteLock = new ReentrantReadWriteLock();
+      // Another thread may have installed a lock concurrently; use theirs.
+      ReadWriteLock temp = locks.putIfAbsent(partitionId, readWriteLock);
+      if (temp != null) {
+        readWriteLock = temp;
+      }
+    }
+    return readWriteLock;
+  }
+
+  /**
+   * Adds a data entry for a given partition to the current data store. If data
+   * of a given partition in data store is already offloaded to disk, adds the
+   * data entry to appropriate raw data buffer list.
+   *
+   * @param partitionId id of the partition to add the data entry to
+   * @param entry data entry to add
+   */
+  protected void addEntry(int partitionId, T entry) {
+    // Addition of data entries to a data store is much more common than
+    // out-of-core operations. Besides, in-memory data store implementations
+    // existing in the code base already account for parallel addition to data
+    // stores. Therefore, using read lock would optimize for parallel addition
+    // to data stores, especially for cases where the addition should happen
+    // for partitions that are entirely in memory.
+    ReadWriteLock rwLock = getPartitionLock(partitionId);
+    rwLock.readLock().lock();
+    try {
+      if (hasPartitionDataOnDisk.contains(partitionId)) {
+        List<T> entryList = new ArrayList<>();
+        entryList.add(entry);
+        int entrySize = entrySerializedSize(entry);
+        MutablePair<Integer, List<T>> newPair =
+            new MutablePair<>(entrySize, entryList);
+        Pair<Integer, List<T>> oldPair =
+            dataBuffers.putIfAbsent(partitionId, newPair);
+        if (oldPair != null) {
+          // Several readers may append to the same buffer list concurrently,
+          // so updates to an existing pair are serialized on the pair itself.
+          synchronized (oldPair) {
+            MutablePair<Integer, List<T>> existing =
+                (MutablePair<Integer, List<T>>) oldPair;
+            existing.setLeft(existing.getLeft() + entrySize);
+            existing.getRight().add(entry);
+          }
+        }
+      } else {
+        addEntryToInMemoryPartitionData(partitionId, entry);
+      }
+    } finally {
+      rwLock.readLock().unlock();
+    }
+  }
+
+  /**
+   * Loads and assembles all data for a given partition, and puts it into the
+   * data store. Returns the number of bytes transferred from disk to memory in
+   * the loading process.
+   *
+   * @param partitionId id of the partition to load and assemble all data for
+   * @return number of bytes loaded from disk to memory
+   * @throws IOException
+   */
+  public abstract long loadPartitionData(int partitionId) throws IOException;
+
+  /**
+   * The proxy method that does the actual operation for `loadPartitionData`,
+   * but uses the data index given by the caller.
+   *
+   * @param partitionId id of the partition to load and assemble all data for
+   * @param index data index chain for the data to load
+   * @return number of bytes loaded from disk to memory
+   * @throws IOException
+   */
+  protected long loadPartitionDataProxy(int partitionId, DataIndex index)
+      throws IOException {
+    long numBytes = 0;
+    ReadWriteLock rwLock = getPartitionLock(partitionId);
+    rwLock.writeLock().lock();
+    try {
+      if (hasPartitionDataOnDisk.contains(partitionId)) {
+        int ioThreadId =
+            oocEngine.getMetaPartitionManager().getOwnerThreadId(partitionId);
+        numBytes += loadInMemoryPartitionData(partitionId, ioThreadId,
+            index.addIndex(
+                NumericIndexEntry.createPartitionEntry(partitionId)));
+        hasPartitionDataOnDisk.remove(partitionId);
+        // Loading raw data buffers from disk if there is any and applying
+        // those to already loaded in-memory data.
+        Integer numBuffers = numDataBuffersOnDisk.remove(partitionId);
+        if (numBuffers != null) {
+          checkState(numBuffers > 0);
+          index.addIndex(DataIndex.TypeIndexEntry.BUFFER);
+          OutOfCoreDataAccessor.DataInputWrapper inputWrapper =
+              oocEngine.getDataAccessor().prepareInput(ioThreadId,
+                  index.copy());
+          for (int i = 0; i < numBuffers; ++i) {
+            T entry = readNextEntry(inputWrapper.getDataInput());
+            addEntryToInMemoryPartitionData(partitionId, entry);
+          }
+          numBytes += inputWrapper.finalizeInput(true);
+          index.removeLastIndex();
+        }
+        index.removeLastIndex();
+        // Applying in-memory raw data buffers to in-memory partition data.
+        Pair<Integer, List<T>> pair = dataBuffers.remove(partitionId);
+        if (pair != null) {
+          for (T entry : pair.getRight()) {
+            addEntryToInMemoryPartitionData(partitionId, entry);
+          }
+        }
+      }
+    } finally {
+      // Always release the lock, even if reading from disk fails.
+      rwLock.writeLock().unlock();
+    }
+    return numBytes;
+  }
+
+  /**
+   * Offloads partition data of a given partition in the data store to disk, and
+   * returns the number of bytes offloaded from memory to disk.
+   *
+   * @param partitionId id of the partition to offload its data
+   * @return number of bytes offloaded from memory to disk
+   * @throws IOException
+   */
+  public abstract long offloadPartitionData(int partitionId) throws IOException;
+
+  /**
+   * The proxy method that does the actual operation for `offloadPartitionData`,
+   * but uses the data index given by the caller.
+   *
+   * @param partitionId id of the partition to offload its data
+   * @param index data index chain for the data to offload
+   * @return number of bytes offloaded from memory to disk
+   * @throws IOException
+   */
+  protected long offloadPartitionDataProxy(
+      int partitionId, DataIndex index) throws IOException {
+    ReadWriteLock rwLock = getPartitionLock(partitionId);
+    rwLock.writeLock().lock();
+    try {
+      // From now on, new entries for this partition go to raw data buffers.
+      hasPartitionDataOnDisk.add(partitionId);
+    } finally {
+      rwLock.writeLock().unlock();
+    }
+    int ioThreadId =
+        oocEngine.getMetaPartitionManager().getOwnerThreadId(partitionId);
+    long numBytes = offloadInMemoryPartitionData(partitionId, ioThreadId,
+        index.addIndex(NumericIndexEntry.createPartitionEntry(partitionId)));
+    index.removeLastIndex();
+    return numBytes;
+  }
+
+  /**
+   * Offloads raw data buffers of a given partition to disk, and returns the
+   * number of bytes offloaded from memory to disk.
+   *
+   * @param partitionId id of the partition to offload its raw data buffers
+   * @return number of bytes offloaded from memory to disk
+   * @throws IOException
+   */
+  public abstract long offloadBuffers(int partitionId) throws IOException;
+
+  /**
+   * The proxy method that does the actual operation for `offloadBuffers`,
+   * but uses the data index given by the caller. Buffers are only offloaded if
+   * their aggregate size is at least the minimum flush size.
+   *
+   * @param partitionId id of the partition to offload its raw data buffers
+   * @param index data index chain for the data to offload its buffers
+   * @return number of bytes offloaded from memory to disk
+   * @throws IOException
+   */
+  protected long offloadBuffersProxy(int partitionId, DataIndex index)
+      throws IOException {
+    Pair<Integer, List<T>> pair = dataBuffers.get(partitionId);
+    if (pair == null || pair.getLeft() < minBufferSizeToOffload) {
+      return 0;
+    }
+    ReadWriteLock rwLock = getPartitionLock(partitionId);
+    rwLock.writeLock().lock();
+    try {
+      // Detach the buffers under the write lock so no reader is appending.
+      pair = dataBuffers.remove(partitionId);
+    } finally {
+      rwLock.writeLock().unlock();
+    }
+    checkNotNull(pair);
+    checkState(!pair.getRight().isEmpty());
+    int ioThreadId =
+        oocEngine.getMetaPartitionManager().getOwnerThreadId(partitionId);
+    index.addIndex(NumericIndexEntry.createPartitionEntry(partitionId))
+        .addIndex(DataIndex.TypeIndexEntry.BUFFER);
+    OutOfCoreDataAccessor.DataOutputWrapper outputWrapper =
+        oocEngine.getDataAccessor().prepareOutput(ioThreadId, index.copy(),
+            true);
+    for (T entry : pair.getRight()) {
+      writeEntry(entry, outputWrapper.getDataOutput());
+    }
+    long numBytes = outputWrapper.finalizeOutput();
+    index.removeLastIndex().removeLastIndex();
+    int numBuffers = pair.getRight().size();
+    // Atomically add numBuffers to the on-disk count. The 3-argument replace
+    // only succeeds if no other thread changed the count in the meantime;
+    // otherwise retry, re-inserting if the entry was removed concurrently.
+    Integer oldNumBuffersOnDisk =
+        numDataBuffersOnDisk.putIfAbsent(partitionId, numBuffers);
+    while (oldNumBuffersOnDisk != null &&
+        !numDataBuffersOnDisk.replace(partitionId, oldNumBuffersOnDisk,
+            oldNumBuffersOnDisk + numBuffers)) {
+      oldNumBuffersOnDisk =
+          numDataBuffersOnDisk.putIfAbsent(partitionId, numBuffers);
+    }
+    return numBytes;
+  }
+
+  /**
+   * Looks through all partitions that their data is not in the data store (is
+   * offloaded to disk), and sees if any of them has enough raw data buffer in
+   * memory. If so, puts that partition in a set to return.
+   *
+   * @return Set of partition ids of all partition raw buffers where the
+   *         aggregate size of buffers is at least the minimum flush size, i.e.
+   *         it is worth flushing those buffers to disk
+   */
+  public Set<Integer> getCandidateBuffersToOffload() {
+    Set<Integer> result = new HashSet<>();
+    for (Map.Entry<Integer, Pair<Integer, List<T>>> entry :
+        dataBuffers.entrySet()) {
+      // Same (inclusive) threshold as offloadBuffersProxy uses.
+      if (entry.getValue().getLeft() >= minBufferSizeToOffload) {
+        result.add(entry.getKey());
+      }
+    }
+    return result;
+  }
+
+  /**
+   * Writes a single raw entry to a given output stream.
+   *
+   * @param entry entry to write to output
+   * @param out output stream to write the entry to
+   * @throws IOException
+   */
+  protected abstract void writeEntry(T entry, DataOutput out)
+      throws IOException;
+
+  /**
+   * Reads the next available raw entry from a given input stream.
+   *
+   * @param in input stream to read the entry from
+   * @return entry read from an input stream
+   * @throws IOException
+   */
+  protected abstract T readNextEntry(DataInput in) throws IOException;
+
+  /**
+   * Loads data of a partition into data store. Returns number of bytes loaded.
+   *
+   * @param partitionId id of the partition to load its data
+   * @param ioThreadId id of the IO thread performing the load
+   * @param index data index chain for the data to load
+   * @return number of bytes loaded from disk to memory
+   * @throws IOException
+   */
+  protected abstract long loadInMemoryPartitionData(
+      int partitionId, int ioThreadId, DataIndex index) throws IOException;
+
+  /**
+   * Offloads data of a partition in data store to disk. Returns the number of
+   * bytes offloaded to disk.
+   *
+   * @param partitionId id of the partition to offload to disk
+   * @param ioThreadId id of the IO thread performing the offload
+   * @param index data index chain for the data to offload
+   * @return number of bytes offloaded from memory to disk
+   * @throws IOException
+   */
+  protected abstract long offloadInMemoryPartitionData(
+      int partitionId, int ioThreadId, DataIndex index) throws IOException;
+
+  /**
+   * Gets the size of a given entry in bytes.
+   *
+   * @param entry input entry to find its size
+   * @return size of given input entry in bytes
+   */
+  protected abstract int entrySerializedSize(T entry);
+
+  /**
+   * Adds a single entry for a given partition to the in-memory data store.
+   *
+   * @param partitionId id of the partition to add the data to
+   * @param entry input entry to add to the data store
+   */
+  protected abstract void addEntryToInMemoryPartitionData(int partitionId,
+                                                          T entry);
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/3793c9ef/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedEdgeStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedEdgeStore.java b/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedEdgeStore.java
index 53de52f..e727fbd 100644
--- a/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedEdgeStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedEdgeStore.java
@@ -21,25 +21,18 @@ package org.apache.giraph.ooc.data;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.edge.EdgeStore;
 import org.apache.giraph.ooc.OutOfCoreEngine;
+import org.apache.giraph.ooc.persistence.DataIndex;
+import org.apache.giraph.ooc.persistence.OutOfCoreDataAccessor;
 import org.apache.giraph.utils.ByteArrayVertexIdEdges;
 import org.apache.giraph.utils.VertexIdEdges;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.log4j.Logger;
 
-import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
 import java.io.DataInput;
-import java.io.DataInputStream;
 import java.io.DataOutput;
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
 import java.io.IOException;
 
-import static com.google.common.base.Preconditions.checkState;
-
 /**
  * Implementation of an edge-store used for out-of-core mechanism.
  *
@@ -49,7 +42,7 @@ import static com.google.common.base.Preconditions.checkState;
  */
 public class DiskBackedEdgeStore<I extends WritableComparable,
     V extends Writable, E extends Writable>
-    extends OutOfCoreDataManager<VertexIdEdges<I, E>>
+    extends DiskBackedDataStore<VertexIdEdges<I, E>>
     implements EdgeStore<I, V, E> {
   /** Class logger. */
   private static final Logger LOG = Logger.getLogger(DiskBackedEdgeStore.class);
@@ -57,8 +50,6 @@ public class DiskBackedEdgeStore<I extends WritableComparable,
   private final EdgeStore<I, V, E> edgeStore;
   /** Configuration */
   private final ImmutableClassesGiraphConfiguration<I, V, E> conf;
-  /** Out-of-core engine */
-  private final OutOfCoreEngine oocEngine;
 
   /**
    * Constructor
@@ -72,10 +63,9 @@ public class DiskBackedEdgeStore<I extends WritableComparable,
       EdgeStore<I, V, E> edgeStore,
       ImmutableClassesGiraphConfiguration<I, V, E> conf,
       OutOfCoreEngine oocEngine) {
-    super(conf);
+    super(conf, oocEngine);
     this.edgeStore = edgeStore;
     this.conf = conf;
-    this.oocEngine = oocEngine;
   }
 
   @Override
@@ -114,32 +104,25 @@ public class DiskBackedEdgeStore<I extends WritableComparable,
         "should not be called for DiskBackedEdgeStore!");
   }
 
-  /**
-   * Gets the path that should be used specifically for edge data.
-   *
-   * @param basePath path prefix to build the actual path from
-   * @return path to files specific for edge data
-   */
-  private static String getPath(String basePath) {
-    return basePath + "_edge_store";
-  }
-
   @Override
-  public long loadPartitionData(int partitionId, String basePath)
+  public long loadPartitionData(int partitionId)
       throws IOException {
-    return super.loadPartitionData(partitionId, getPath(basePath));
+    return loadPartitionDataProxy(partitionId,
+        new DataIndex().addIndex(DataIndex.TypeIndexEntry.EDGE_STORE));
   }
 
   @Override
-  public long offloadPartitionData(int partitionId, String basePath)
+  public long offloadPartitionData(int partitionId)
       throws IOException {
-    return super.offloadPartitionData(partitionId, getPath(basePath));
+    return offloadPartitionDataProxy(partitionId,
+        new DataIndex().addIndex(DataIndex.TypeIndexEntry.EDGE_STORE));
   }
 
   @Override
-  public long offloadBuffers(int partitionId, String basePath)
+  public long offloadBuffers(int partitionId)
       throws IOException {
-    return super.offloadBuffers(partitionId, getPath(basePath));
+    return offloadBuffersProxy(partitionId,
+        new DataIndex().addIndex(DataIndex.TypeIndexEntry.EDGE_STORE));
   }
 
   @Override
@@ -157,44 +140,31 @@ public class DiskBackedEdgeStore<I extends WritableComparable,
   }
 
   @Override
-  protected long loadInMemoryPartitionData(int partitionId, String path)
-      throws IOException {
+  protected long loadInMemoryPartitionData(
+      int partitionId, int ioThreadId, DataIndex index) throws IOException {
     long numBytes = 0;
-    File file = new File(path);
-    if (file.exists()) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("loadInMemoryPartitionData: loading edge data for " +
-            "partition " + partitionId + " from " + file.getAbsolutePath());
-      }
-      FileInputStream fis = new FileInputStream(file);
-      BufferedInputStream bis = new BufferedInputStream(fis);
-      DataInputStream dis = new DataInputStream(bis);
-      edgeStore.readPartitionEdgeStore(partitionId, dis);
-      dis.close();
-      numBytes = file.length();
-      checkState(file.delete(), "loadInMemoryPartitionData: failed to delete " +
-          "%s.", file.getAbsoluteFile());
+    if (hasPartitionDataOnFile.remove(partitionId)) {
+      OutOfCoreDataAccessor.DataInputWrapper inputWrapper =
+          oocEngine.getDataAccessor().prepareInput(ioThreadId, index.copy());
+      edgeStore.readPartitionEdgeStore(partitionId,
+          inputWrapper.getDataInput());
+      numBytes = inputWrapper.finalizeInput(true);
     }
     return numBytes;
   }
 
   @Override
-  protected long offloadInMemoryPartitionData(int partitionId, String path)
-      throws IOException {
+  protected long offloadInMemoryPartitionData(
+      int partitionId, int ioThreadId, DataIndex index) throws IOException {
     long numBytes = 0;
     if (edgeStore.hasEdgesForPartition(partitionId)) {
-      File file = new File(path);
-      checkState(!file.exists(), "offloadInMemoryPartitionData: edge store " +
-          "file %s already exist", file.getAbsoluteFile());
-      checkState(file.createNewFile(),
-          "offloadInMemoryPartitionData: cannot create edge store file %s",
-          file.getAbsoluteFile());
-      FileOutputStream fos = new FileOutputStream(file);
-      BufferedOutputStream bos = new BufferedOutputStream(fos);
-      DataOutputStream dos = new DataOutputStream(bos);
-      edgeStore.writePartitionEdgeStore(partitionId, dos);
-      dos.close();
-      numBytes = dos.size();
+      OutOfCoreDataAccessor.DataOutputWrapper outputWrapper =
+          oocEngine.getDataAccessor().prepareOutput(ioThreadId, index.copy(),
+              false);
+      edgeStore.writePartitionEdgeStore(partitionId,
+          outputWrapper.getDataOutput());
+      numBytes = outputWrapper.finalizeOutput();
+      hasPartitionDataOnFile.add(partitionId);
     }
     return numBytes;
   }
@@ -205,7 +175,7 @@ public class DiskBackedEdgeStore<I extends WritableComparable,
   }
 
   @Override
-  protected void addEntryToImMemoryPartitionData(int partitionId,
+  protected void addEntryToInMemoryPartitionData(int partitionId,
                                                  VertexIdEdges<I, E> edges) {
     oocEngine.getMetaPartitionManager().addPartition(partitionId);
     edgeStore.addPartitionEdges(partitionId, edges);

http://git-wip-us.apache.org/repos/asf/giraph/blob/3793c9ef/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedMessageStore.java
index 94ba83a..c8d0f79 100644
--- a/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedMessageStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedMessageStore.java
@@ -21,6 +21,10 @@ package org.apache.giraph.ooc.data;
 import org.apache.giraph.comm.messages.MessageStore;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.factories.MessageValueFactory;
+import org.apache.giraph.ooc.OutOfCoreEngine;
+import org.apache.giraph.ooc.persistence.DataIndex;
+import org.apache.giraph.ooc.persistence.DataIndex.NumericIndexEntry;
+import org.apache.giraph.ooc.persistence.OutOfCoreDataAccessor;
 import org.apache.giraph.utils.ByteArrayOneMessageToManyIds;
 import org.apache.giraph.utils.ByteArrayVertexIdMessages;
 import org.apache.giraph.utils.VertexIdMessages;
@@ -28,19 +32,10 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.log4j.Logger;
 
-import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
 import java.io.DataInput;
-import java.io.DataInputStream;
 import java.io.DataOutput;
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
 import java.io.IOException;
 
-import static com.google.common.base.Preconditions.checkState;
-
 /**
  * Implementation of a message store used for out-of-core mechanism.
  *
@@ -48,7 +43,7 @@ import static com.google.common.base.Preconditions.checkState;
  * @param <M> Message data
  */
 public class DiskBackedMessageStore<I extends WritableComparable,
-    M extends Writable> extends OutOfCoreDataManager<VertexIdMessages<I, M>>
+    M extends Writable> extends DiskBackedDataStore<VertexIdMessages<I, M>>
     implements MessageStore<I, M> {
   /** Class logger. */
   private static final Logger LOG =
@@ -82,6 +77,7 @@ public class DiskBackedMessageStore<I extends WritableComparable,
    * Constructor
    *
    * @param config Configuration
+   * @param oocEngine Out-of-core engine
    * @param messageStore In-memory message store for which out-of-core message
    *                     store would be wrapper
    * @param useMessageCombiner Whether message combiner is used for this message
@@ -90,9 +86,10 @@ public class DiskBackedMessageStore<I extends WritableComparable,
    */
   public DiskBackedMessageStore(ImmutableClassesGiraphConfiguration<I, ?, ?>
                                     config,
+                                OutOfCoreEngine oocEngine,
                                 MessageStore<I, M> messageStore,
                                 boolean useMessageCombiner, long superstep) {
-    super(config);
+    super(config, oocEngine);
     this.config = config;
     this.messageStore = messageStore;
     this.useMessageCombiner = useMessageCombiner;
@@ -140,43 +137,38 @@ public class DiskBackedMessageStore<I extends WritableComparable,
     }
   }
 
-  /**
-   * Gets the path that should be used specifically for message data.
-   *
-   * @param basePath path prefix to build the actual path from
-   * @param superstep superstep for which message data should be stored
-   * @return path to files specific for message data
-   */
-  private static String getPath(String basePath, long superstep) {
-    return basePath + "_messages-S" + superstep;
-  }
 
   @Override
-  public long loadPartitionData(int partitionId, String basePath)
+  public long loadPartitionData(int partitionId)
       throws IOException {
     if (!useMessageCombiner) {
-      return super.loadPartitionData(partitionId, getPath(basePath, superstep));
+      return loadPartitionDataProxy(partitionId,
+          new DataIndex().addIndex(DataIndex.TypeIndexEntry.MESSAGE)
+              .addIndex(NumericIndexEntry.createSuperstepEntry(superstep)));
     } else {
       return 0;
     }
   }
 
   @Override
-  public long offloadPartitionData(int partitionId, String basePath)
+  public long offloadPartitionData(int partitionId)
       throws IOException {
     if (!useMessageCombiner) {
-      return
-          super.offloadPartitionData(partitionId, getPath(basePath, superstep));
+      return offloadPartitionDataProxy(partitionId,
+          new DataIndex().addIndex(DataIndex.TypeIndexEntry.MESSAGE)
+              .addIndex(NumericIndexEntry.createSuperstepEntry(superstep)));
     } else {
       return 0;
     }
   }
 
   @Override
-  public long offloadBuffers(int partitionId, String basePath)
+  public long offloadBuffers(int partitionId)
       throws IOException {
     if (!useMessageCombiner) {
-      return super.offloadBuffers(partitionId, getPath(basePath, superstep));
+      return offloadBuffersProxy(partitionId,
+          new DataIndex().addIndex(DataIndex.TypeIndexEntry.MESSAGE)
+              .addIndex(NumericIndexEntry.createSuperstepEntry(superstep)));
     } else {
       return 0;
     }
@@ -250,45 +242,31 @@ public class DiskBackedMessageStore<I extends WritableComparable,
   }
 
   @Override
-  protected long loadInMemoryPartitionData(int partitionId, String basePath)
-      throws IOException {
+  protected long loadInMemoryPartitionData(int partitionId, int ioThreadId,
+                                           DataIndex index) throws IOException {
     long numBytes = 0;
-    File file = new File(basePath);
-    if (file.exists()) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("loadInMemoryPartitionData: loading message data for " +
-            "partition " + partitionId + " from " + file.getAbsolutePath());
-      }
-      FileInputStream fis = new FileInputStream(file);
-      BufferedInputStream bis = new BufferedInputStream(fis);
-      DataInputStream dis = new DataInputStream(bis);
-      messageStore.readFieldsForPartition(dis, partitionId);
-      dis.close();
-      numBytes = file.length();
-      checkState(file.delete(), "loadInMemoryPartitionData: failed to delete " +
-          "%s.", file.getAbsoluteFile());
+    if (hasPartitionDataOnFile.remove(partitionId)) {
+      OutOfCoreDataAccessor.DataInputWrapper inputWrapper =
+          oocEngine.getDataAccessor().prepareInput(ioThreadId, index.copy());
+      messageStore.readFieldsForPartition(inputWrapper.getDataInput(),
+          partitionId);
+      numBytes = inputWrapper.finalizeInput(true);
     }
     return numBytes;
   }
 
   @Override
-  protected long offloadInMemoryPartitionData(int partitionId, String basePath)
-      throws IOException {
+  protected long offloadInMemoryPartitionData(
+      int partitionId, int ioThreadId, DataIndex index) throws IOException {
     long numBytes = 0;
     if (messageStore.hasMessagesForPartition(partitionId)) {
-      File file = new File(basePath);
-      checkState(!file.exists(), "offloadInMemoryPartitionData: message store" +
-          " file %s already exist", file.getAbsoluteFile());
-      checkState(file.createNewFile(),
-          "offloadInMemoryPartitionData: cannot create message store file %s",
-          file.getAbsoluteFile());
-      FileOutputStream fileout = new FileOutputStream(file);
-      BufferedOutputStream bufferout = new BufferedOutputStream(fileout);
-      DataOutputStream outputStream = new DataOutputStream(bufferout);
-      messageStore.writePartition(outputStream, partitionId);
+      OutOfCoreDataAccessor.DataOutputWrapper outputWrapper =
+          oocEngine.getDataAccessor().prepareOutput(ioThreadId, index.copy(),
+              false);
+      messageStore.writePartition(outputWrapper.getDataOutput(), partitionId);
       messageStore.clearPartition(partitionId);
-      outputStream.close();
-      numBytes += outputStream.size();
+      numBytes = outputWrapper.finalizeOutput();
+      hasPartitionDataOnFile.add(partitionId);
     }
     return numBytes;
   }
@@ -299,7 +277,7 @@ public class DiskBackedMessageStore<I extends WritableComparable,
   }
 
   @Override
-  protected void addEntryToImMemoryPartitionData(int partitionId,
+  protected void addEntryToInMemoryPartitionData(int partitionId,
                                                  VertexIdMessages<I, M>
                                                      messages) {
     messageStore.addPartitionMessages(partitionId, messages);

http://git-wip-us.apache.org/repos/asf/giraph/blob/3793c9ef/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedPartitionStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedPartitionStore.java b/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedPartitionStore.java
index 2a5e47a..6b7822f 100644
--- a/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedPartitionStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedPartitionStore.java
@@ -20,11 +20,12 @@ package org.apache.giraph.ooc.data;
 
 import com.google.common.collect.Maps;
 import org.apache.giraph.bsp.BspService;
-import org.apache.giraph.bsp.CentralizedServiceWorker;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.edge.OutEdges;
 import org.apache.giraph.graph.Vertex;
 import org.apache.giraph.ooc.OutOfCoreEngine;
+import org.apache.giraph.ooc.persistence.DataIndex;
+import org.apache.giraph.ooc.persistence.OutOfCoreDataAccessor;
 import org.apache.giraph.partition.Partition;
 import org.apache.giraph.partition.PartitionStore;
 import org.apache.giraph.utils.ExtendedDataOutput;
@@ -35,25 +36,17 @@ import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.log4j.Logger;
 
-import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
 import java.io.DataInput;
-import java.io.DataInputStream;
 import java.io.DataOutput;
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
 import java.io.IOException;
 import java.util.Map;
 
 import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Preconditions.checkState;
 
 /**
  * Implementation of a partition-store used for out-of-core mechanism.
  * Partition store is responsible for partition data, as well as data buffers in
- * INPUT_SUPERSTEP ("raw data buffer" -- defined in OutOfCoreDataManager --
+ * INPUT_SUPERSTEP ("raw data buffer" -- defined in DiskBackedDataStore --
  * refers to vertex buffers in INPUT_SUPERSTEP).
  *
  * @param <I> Vertex id
@@ -62,7 +55,7 @@ import static com.google.common.base.Preconditions.checkState;
  */
 public class DiskBackedPartitionStore<I extends WritableComparable,
     V extends Writable, E extends Writable>
-    extends OutOfCoreDataManager<ExtendedDataOutput>
+    extends DiskBackedDataStore<ExtendedDataOutput>
     implements PartitionStore<I, V, E> {
   /** Class logger. */
   private static final Logger LOG =
@@ -71,10 +64,6 @@ public class DiskBackedPartitionStore<I extends WritableComparable,
   private final ImmutableClassesGiraphConfiguration<I, V, E> conf;
   /** Job context (for progress) */
   private final Mapper<?, ?, ?, ?>.Context context;
-  /** Service worker */
-  private final CentralizedServiceWorker<I, V, E> serviceWorker;
-  /** Out-of-core engine */
-  private final OutOfCoreEngine oocEngine;
   /** In-memory partition store */
   private final PartitionStore<I, V, E> partitionStore;
   /**
@@ -99,21 +88,17 @@ public class DiskBackedPartitionStore<I extends WritableComparable,
    *                       partition store would be a wrapper
    * @param conf Configuration
    * @param context Job context
-   * @param serviceWorker Service worker
    * @param oocEngine Out-of-core engine
    */
   public DiskBackedPartitionStore(
       PartitionStore<I, V, E> partitionStore,
       ImmutableClassesGiraphConfiguration<I, V, E> conf,
       Mapper<?, ?, ?, ?>.Context context,
-      CentralizedServiceWorker<I, V, E> serviceWorker,
       OutOfCoreEngine oocEngine) {
-    super(conf);
+    super(conf, oocEngine);
     this.partitionStore = partitionStore;
     this.conf = conf;
     this.context = context;
-    this.serviceWorker = serviceWorker;
-    this.oocEngine = oocEngine;
   }
 
   @Override
@@ -222,36 +207,6 @@ public class DiskBackedPartitionStore<I extends WritableComparable,
   }
 
   /**
-   * Gets the path that should be used specifically for partition data.
-   *
-   * @param basePath path prefix to build the actual path from
-   * @return path to files specific for partition data
-   */
-  private static String getPath(String basePath) {
-    return basePath + "_partition";
-  }
-
-  /**
-   * Get the path to the file where vertices are stored.
-   *
-   * @param basePath path prefix to build the actual path from
-   * @return The path to the vertices file
-   */
-  private static String getVerticesPath(String basePath) {
-    return basePath + "_vertices";
-  }
-
-  /**
-   * Get the path to the file where edges are stored.
-   *
-   * @param basePath path prefix to build the actual path from
-   * @return The path to the edges file
-   */
-  private static String getEdgesPath(String basePath) {
-    return basePath + "_edges";
-  }
-
-  /**
    * Read vertex data from an input and initialize the vertex.
    *
    * @param in     The input stream
@@ -295,54 +250,42 @@ public class DiskBackedPartitionStore<I extends WritableComparable,
   }
 
   @Override
-  protected long loadInMemoryPartitionData(int partitionId, String path)
-      throws IOException {
+  protected long loadInMemoryPartitionData(int partitionId, int ioThreadId,
+                                           DataIndex index) throws IOException {
     long numBytes = 0;
     // Load vertices
-    File file = new File(getVerticesPath(path));
-    if (file.exists()) {
+    if (hasPartitionDataOnFile.remove(partitionId)) {
       Partition<I, V, E> partition = conf.createPartition(partitionId, context);
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("loadInMemoryPartitionData: loading partition vertices " +
-            partitionId + " from " + file.getAbsolutePath());
-      }
-
-      FileInputStream fis = new FileInputStream(file);
-      BufferedInputStream bis = new BufferedInputStream(fis);
-      DataInputStream inputStream = new DataInputStream(bis);
-      long numVertices = inputStream.readLong();
+      OutOfCoreDataAccessor dataAccessor = oocEngine.getDataAccessor();
+      index.addIndex(DataIndex.TypeIndexEntry.PARTITION_VERTICES);
+      OutOfCoreDataAccessor.DataInputWrapper inputWrapper =
+          dataAccessor.prepareInput(ioThreadId, index.copy());
+      DataInput dataInput = inputWrapper.getDataInput();
+      long numVertices = dataInput.readLong();
       for (long i = 0; i < numVertices; ++i) {
         Vertex<I, V, E> vertex = conf.createVertex();
-        readVertexData(inputStream, vertex);
+        readVertexData(dataInput, vertex);
         partition.putVertex(vertex);
       }
-      inputStream.close();
-      numBytes += file.length();
-      checkState(file.delete(), "loadInMemoryPartitionData: failed to delete " +
-          "%s", file.getAbsolutePath());
+      numBytes += inputWrapper.finalizeInput(true);
 
       // Load edges
-      file = new File(getEdgesPath(path));
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("loadInMemoryPartitionData: loading partition edges " +
-            partitionId + " from " + file.getAbsolutePath());
-      }
-
-      fis = new FileInputStream(file);
-      bis = new BufferedInputStream(fis);
-      inputStream = new DataInputStream(bis);
+      index.removeLastIndex()
+          .addIndex(DataIndex.TypeIndexEntry.PARTITION_EDGES);
+      inputWrapper = dataAccessor.prepareInput(ioThreadId, index.copy());
+      dataInput = inputWrapper.getDataInput();
       for (int i = 0; i < numVertices; ++i) {
-        readOutEdges(inputStream, partition);
+        readOutEdges(dataInput, partition);
       }
-      inputStream.close();
-      numBytes += file.length();
       // If the graph is static and it is not INPUT_SUPERSTEP, keep the file
       // around.
+      boolean shouldDeleteEdges = false;
       if (!conf.isStaticGraph() ||
           oocEngine.getSuperstep() == BspService.INPUT_SUPERSTEP) {
-        checkState(file.delete(), "loadPartition: failed to delete %s",
-            file.getAbsolutePath());
+        shouldDeleteEdges = true;
       }
+      numBytes += inputWrapper.finalizeInput(shouldDeleteEdges);
+      index.removeLastIndex();
       partitionStore.addPartition(partition);
     }
     return numBytes;
@@ -354,7 +297,7 @@ public class DiskBackedPartitionStore<I extends WritableComparable,
   }
 
   @Override
-  protected void addEntryToImMemoryPartitionData(int partitionId,
+  protected void addEntryToInMemoryPartitionData(int partitionId,
                                                  ExtendedDataOutput vertices) {
     if (!partitionStore.hasPartition(partitionId)) {
       oocEngine.getMetaPartitionManager().addPartition(partitionId);
@@ -363,15 +306,17 @@ public class DiskBackedPartitionStore<I extends WritableComparable,
   }
 
   @Override
-  public long loadPartitionData(int partitionId, String basePath)
+  public long loadPartitionData(int partitionId)
       throws IOException {
-    return super.loadPartitionData(partitionId, getPath(basePath));
+    return loadPartitionDataProxy(partitionId,
+        new DataIndex().addIndex(DataIndex.TypeIndexEntry.PARTITION));
   }
 
   @Override
-  public long offloadPartitionData(int partitionId, String basePath)
+  public long offloadPartitionData(int partitionId)
       throws IOException {
-    return super.offloadPartitionData(partitionId, getPath(basePath));
+    return offloadPartitionDataProxy(partitionId,
+        new DataIndex().addIndex(DataIndex.TypeIndexEntry.PARTITION));
   }
 
   /**
@@ -409,61 +354,44 @@ public class DiskBackedPartitionStore<I extends WritableComparable,
   }
 
   @Override
-  protected long offloadInMemoryPartitionData(int partitionId, String path)
-      throws IOException {
+  protected long offloadInMemoryPartitionData(
+      int partitionId, int ioThreadId, DataIndex index) throws IOException {
     long numBytes = 0;
     if (partitionStore.hasPartition(partitionId)) {
+      OutOfCoreDataAccessor dataAccessor = oocEngine.getDataAccessor();
       partitionVertexCount.put(partitionId,
           partitionStore.getPartitionVertexCount(partitionId));
       partitionEdgeCount.put(partitionId,
           partitionStore.getPartitionEdgeCount(partitionId));
       Partition<I, V, E> partition =
           partitionStore.removePartition(partitionId);
-      File file = new File(getVerticesPath(path));
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("offloadInMemoryPartitionData: writing partition vertices " +
-            partitionId + " to " + file.getAbsolutePath());
-      }
-      checkState(!file.exists(), "offloadInMemoryPartitionData: partition " +
-          "store file %s already exist", file.getAbsoluteFile());
-      checkState(file.createNewFile(),
-          "offloadInMemoryPartitionData: file %s already exists.",
-          file.getAbsolutePath());
-
-      FileOutputStream fileout = new FileOutputStream(file);
-      BufferedOutputStream bufferout = new BufferedOutputStream(fileout);
-      DataOutputStream outputStream = new DataOutputStream(bufferout);
-      outputStream.writeLong(partition.getVertexCount());
+      index.addIndex(DataIndex.TypeIndexEntry.PARTITION_VERTICES);
+      OutOfCoreDataAccessor.DataOutputWrapper outputWrapper =
+          dataAccessor.prepareOutput(ioThreadId, index.copy(), false);
+      DataOutput dataOutput = outputWrapper.getDataOutput();
+      dataOutput.writeLong(partition.getVertexCount());
       for (Vertex<I, V, E> vertex : partition) {
-        writeVertexData(outputStream, vertex);
+        writeVertexData(dataOutput, vertex);
       }
-      outputStream.close();
-      numBytes += outputStream.size();
-
+      numBytes += outputWrapper.finalizeOutput();
+      index.removeLastIndex();
       // Avoid writing back edges if we have already written them once and
       // the graph is not changing.
       // If we are in the input superstep, we need to write the files
       // at least the first time, even though the graph is static.
-      file = new File(getEdgesPath(path));
+      index.addIndex(DataIndex.TypeIndexEntry.PARTITION_EDGES);
       if (oocEngine.getSuperstep() == BspServiceWorker.INPUT_SUPERSTEP ||
-          partitionVertexCount.get(partitionId) == null ||
-          partitionVertexCount.get(partitionId) != partition.getVertexCount() ||
-          !conf.isStaticGraph() || !file.exists()) {
-        checkState(file.createNewFile(), "offloadInMemoryPartitionData: file " +
-            "%s already exists.", file.getAbsolutePath());
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("offloadInMemoryPartitionData: writing partition edges " +
-              partitionId + " to " + file.getAbsolutePath());
-        }
-        fileout = new FileOutputStream(file);
-        bufferout = new BufferedOutputStream(fileout);
-        outputStream = new DataOutputStream(bufferout);
+          !conf.isStaticGraph() ||
+          !dataAccessor.dataExist(ioThreadId, index)) {
+        outputWrapper = dataAccessor.prepareOutput(ioThreadId, index.copy(),
+            false);
         for (Vertex<I, V, E> vertex : partition) {
-          writeOutEdges(outputStream, vertex);
+          writeOutEdges(outputWrapper.getDataOutput(), vertex);
         }
-        outputStream.close();
-        numBytes += outputStream.size();
+        numBytes += outputWrapper.finalizeOutput();
       }
+      index.removeLastIndex();
+      hasPartitionDataOnFile.add(partitionId);
     }
     return numBytes;
   }
@@ -475,9 +403,10 @@ public class DiskBackedPartitionStore<I extends WritableComparable,
   }
 
   @Override
-  public long offloadBuffers(int partitionId, String basePath)
+  public long offloadBuffers(int partitionId)
       throws IOException {
-    return super.offloadBuffers(partitionId, getPath(basePath));
+    return offloadBuffersProxy(partitionId,
+        new DataIndex().addIndex(DataIndex.TypeIndexEntry.PARTITION));
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/giraph/blob/3793c9ef/giraph-core/src/main/java/org/apache/giraph/ooc/data/MetaPartitionManager.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/data/MetaPartitionManager.java b/giraph-core/src/main/java/org/apache/giraph/ooc/data/MetaPartitionManager.java
index 1332a3a..64e3aed 100644
--- a/giraph-core/src/main/java/org/apache/giraph/ooc/data/MetaPartitionManager.java
+++ b/giraph-core/src/main/java/org/apache/giraph/ooc/data/MetaPartitionManager.java
@@ -99,6 +99,21 @@ public class MetaPartitionManager {
    */
   private final AtomicDouble lowestGraphFractionInMemory =
       new AtomicDouble(1);
+  /**
+   * Map of partition ids to their indices. index of a partition is the order
+   * with which the partition has been inserted. Partitions are indexed as 0, 1,
+   * 2, etc. This indexing is later used to find the id of the IO thread who is
+   * responsible for handling a partition. Partitions are assigned to IO threads
+   * in a round-robin fashion based on their indices.
+   */
+  private final ConcurrentMap<Integer, Integer> partitionIndex =
+      Maps.newConcurrentMap();
+  /**
+   * Sequential counter used to assign indices to partitions as they are added
+   */
+  private final AtomicInteger indexCounter = new AtomicInteger(0);
+  /** How many disks (i.e. IO threads) do we have? */
+  private final int numIOThreads;
 
   /**
    * Constructor
@@ -117,6 +132,7 @@ public class MetaPartitionManager {
     }
     this.oocEngine = oocEngine;
     this.randomGenerator = new Random();
+    this.numIOThreads = numIOThreads;
   }
 
   /**
@@ -131,7 +147,7 @@ public class MetaPartitionManager {
   /**
    * Get total number of partitions
    *
-   * @return total number of partition
+   * @return total number of partitions
    */
   public int getNumPartitions() {
     return partitions.size();
@@ -175,6 +191,18 @@ public class MetaPartitionManager {
   }
 
   /**
+   * Get the thread id that is responsible for a particular partition
+   *
+   * @param partitionId id of the given partition
+   * @return id of the thread responsible for the given partition
+   */
+  public int getOwnerThreadId(int partitionId) {
+    Integer index = partitionIndex.get(partitionId);
+    checkState(index != null);
+    return index % numIOThreads;
+  }
+
+  /**
    * Add a partition
    *
    * @param partitionId id of a partition to add
@@ -184,8 +212,9 @@ public class MetaPartitionManager {
     MetaPartition temp = partitions.putIfAbsent(partitionId, meta);
     // Check if the given partition is new
     if (temp == null) {
-      int ownerThread = oocEngine.getIOScheduler()
-          .getOwnerThreadId(partitionId);
+      int index = indexCounter.getAndIncrement();
+      checkState(partitionIndex.putIfAbsent(partitionId, index) == null);
+      int ownerThread = getOwnerThreadId(partitionId);
       perThreadPartitionDictionary.get(ownerThread).addPartition(meta);
       numInMemoryPartitions.getAndIncrement();
     }
@@ -199,7 +228,7 @@ public class MetaPartitionManager {
    */
   public void removePartition(Integer partitionId) {
     MetaPartition meta = partitions.remove(partitionId);
-    int ownerThread = oocEngine.getIOScheduler().getOwnerThreadId(partitionId);
+    int ownerThread = getOwnerThreadId(partitionId);
     perThreadPartitionDictionary.get(ownerThread).removePartition(meta);
     checkState(!meta.isOnDisk());
     numInMemoryPartitions.getAndDecrement();
@@ -424,7 +453,7 @@ public class MetaPartitionManager {
    */
   public void markPartitionAsInProcess(int partitionId) {
     MetaPartition meta = partitions.get(partitionId);
-    int ownerThread = oocEngine.getIOScheduler().getOwnerThreadId(partitionId);
+    int ownerThread = getOwnerThreadId(partitionId);
     synchronized (meta) {
       perThreadPartitionDictionary.get(ownerThread).removePartition(meta);
       meta.setProcessingState(ProcessingState.IN_PROCESS);
@@ -468,7 +497,7 @@ public class MetaPartitionManager {
    */
   public void setPartitionIsProcessed(int partitionId) {
     MetaPartition meta = partitions.get(partitionId);
-    int ownerThread = oocEngine.getIOScheduler().getOwnerThreadId(partitionId);
+    int ownerThread = getOwnerThreadId(partitionId);
     synchronized (meta) {
       perThreadPartitionDictionary.get(ownerThread).removePartition(meta);
       meta.setProcessingState(ProcessingState.PROCESSED);
@@ -508,7 +537,7 @@ public class MetaPartitionManager {
   public void doneLoadingPartition(int partitionId, long superstep) {
     MetaPartition meta = partitions.get(partitionId);
     numInMemoryPartitions.getAndIncrement();
-    int owner = oocEngine.getIOScheduler().getOwnerThreadId(partitionId);
+    int owner = getOwnerThreadId(partitionId);
     synchronized (meta) {
       perThreadPartitionDictionary.get(owner).removePartition(meta);
       meta.setPartitionState(StorageState.IN_MEM);
@@ -535,8 +564,7 @@ public class MetaPartitionManager {
    */
   public boolean startOffloadingMessages(int partitionId) {
     MetaPartition meta = partitions.get(partitionId);
-    int ownerThread =
-        oocEngine.getIOScheduler().getOwnerThreadId(partitionId);
+    int ownerThread = getOwnerThreadId(partitionId);
     synchronized (meta) {
       if (meta.getIncomingMessagesState() == StorageState.IN_MEM) {
         perThreadPartitionDictionary.get(ownerThread).removePartition(meta);
@@ -558,8 +586,7 @@ public class MetaPartitionManager {
    */
   public void doneOffloadingMessages(int partitionId) {
     MetaPartition meta = partitions.get(partitionId);
-    int ownerThread =
-        oocEngine.getIOScheduler().getOwnerThreadId(partitionId);
+    int ownerThread = getOwnerThreadId(partitionId);
     synchronized (meta) {
       perThreadPartitionDictionary.get(ownerThread).removePartition(meta);
       meta.setIncomingMessagesState(StorageState.ON_DISK);
@@ -598,7 +625,7 @@ public class MetaPartitionManager {
    */
   public boolean startOffloadingPartition(int partitionId) {
     MetaPartition meta = partitions.get(partitionId);
-    int owner = oocEngine.getIOScheduler().getOwnerThreadId(partitionId);
+    int owner = getOwnerThreadId(partitionId);
     synchronized (meta) {
       if (meta.getProcessingState() != ProcessingState.IN_PROCESS &&
           (meta.getPartitionState() == StorageState.IN_MEM ||
@@ -624,7 +651,7 @@ public class MetaPartitionManager {
     numInMemoryPartitions.getAndDecrement();
     updateGraphFractionInMemory();
     MetaPartition meta = partitions.get(partitionId);
-    int owner = oocEngine.getIOScheduler().getOwnerThreadId(partitionId);
+    int owner = getOwnerThreadId(partitionId);
     synchronized (meta) {
       perThreadPartitionDictionary.get(owner).removePartition(meta);
       meta.setPartitionState(StorageState.ON_DISK);
@@ -639,8 +666,7 @@ public class MetaPartitionManager {
    */
   public void resetPartitions() {
     for (MetaPartition meta : partitions.values()) {
-      int owner =
-          oocEngine.getIOScheduler().getOwnerThreadId(meta.getPartitionId());
+      int owner = getOwnerThreadId(meta.getPartitionId());
       perThreadPartitionDictionary.get(owner).removePartition(meta);
       meta.resetPartition();
       perThreadPartitionDictionary.get(owner).addPartition(meta);
@@ -659,8 +685,7 @@ public class MetaPartitionManager {
    */
   public void resetMessages() {
     for (MetaPartition meta : partitions.values()) {
-      int owner =
-          oocEngine.getIOScheduler().getOwnerThreadId(meta.getPartitionId());
+      int owner = getOwnerThreadId(meta.getPartitionId());
       perThreadPartitionDictionary.get(owner).removePartition(meta);
       meta.resetMessages();
       if (meta.getPartitionState() == StorageState.IN_MEM &&

http://git-wip-us.apache.org/repos/asf/giraph/blob/3793c9ef/giraph-core/src/main/java/org/apache/giraph/ooc/data/OutOfCoreDataManager.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/data/OutOfCoreDataManager.java b/giraph-core/src/main/java/org/apache/giraph/ooc/data/OutOfCoreDataManager.java
deleted file mode 100644
index 325850c..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/ooc/data/OutOfCoreDataManager.java
+++ /dev/null
@@ -1,401 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.ooc.data;
-
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import org.apache.commons.lang3.tuple.MutablePair;
-import org.apache.commons.lang3.tuple.Pair;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.conf.IntConfOption;
-import org.apache.log4j.Logger;
-
-import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
-import java.io.DataInput;
-import java.io.DataInputStream;
-import java.io.DataOutput;
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Preconditions.checkState;
-import static org.apache.giraph.conf.GiraphConstants.ONE_MB;
-
-/**
- * This class provides basic operations for data structures that have to
- * participate in out-of-core mechanism. Essential subclasses of this class are:
- *  - DiskBackedPartitionStore (for partition data)
- *  - DiskBackedMessageStore (for messages)
- *  - DiskBackedEdgeStore (for edges read in INPUT_SUPERSTEP)
- * Basically, any data structure that may cause OOM to happen can be implemented
- * as a subclass of this class.
- *
- * There are two different terms used in the rest of this class:
- *  - "data store" refers to in-memory representation of data. Usually this is
- *    stored per-partition in in-memory implementations of data structures. For
- *    instance, "data store" of a DiskBackedPartitionStore would collection of
- *    all partitions kept in the in-memory partition store within the
- *    DiskBackedPartitionStore.
- *  - "raw data buffer" refers to raw data which were supposed to be
- *    de-serialized and added to the data store, but they remain 'as is' in the
- *    memory because their corresponding partition is offloaded to disk and is
- *    not available in the data store.
- *
- * @param <T> raw data format of the data store subclassing this class
- */
-public abstract class OutOfCoreDataManager<T> {
-  /**
-   * Minimum size of a buffer (in bytes) to flush to disk. This is used to
-   * decide whether vertex/edge buffers are large enough to flush to disk.
-   */
-  public static final IntConfOption MINIMUM_BUFFER_SIZE_TO_FLUSH =
-      new IntConfOption("giraph.flushBufferSize", 8 * ONE_MB,
-          "Minimum size of a buffer (in bytes) to flush to disk.");
-
-  /** Class logger. */
-  private static final Logger LOG = Logger.getLogger(
-      OutOfCoreDataManager.class);
-  /** Cached value for MINIMUM_BUFFER_SIZE_TO_FLUSH */
-  private final int minBufferSizeToOffload;
-  /** Set containing ids of all out-of-core partitions */
-  private final Set<Integer> hasPartitionDataOnDisk =
-      Sets.newConcurrentHashSet();
-  /**
-   * Map of partition ids to list of raw data buffers. The map will have entries
-   * only for partitions that their in-memory data structures are currently
-   * offloaded to disk. We keep the aggregate size of buffers for each partition
-   * as part of the values in the map to estimate how much memory we can free up
-   * if we offload data buffers of a particular partition to disk.
-   */
-  private final ConcurrentMap<Integer, Pair<Integer, List<T>>> dataBuffers =
-      Maps.newConcurrentMap();
-  /**
-   * Map of partition ids to number of raw data buffers offloaded to disk for
-   * each partition. The map will have entries only for partitions that their
-   * in-memory data structures are currently out of core. It is necessary to
-   * know the number of data buffers on disk for a particular partition when we
-   * are loading all these buffers back in memory.
-   */
-  private final ConcurrentMap<Integer, Integer> numDataBuffersOnDisk =
-      Maps.newConcurrentMap();
-  /**
-   * Lock to avoid overlapping of read and write on data associated with each
-   * partition.
-   * */
-  private final ConcurrentMap<Integer, ReadWriteLock> locks =
-      Maps.newConcurrentMap();
-
-  /**
-   * Constructor.
-   *
-   * @param conf Configuration
-   */
-  OutOfCoreDataManager(ImmutableClassesGiraphConfiguration conf) {
-    this.minBufferSizeToOffload = MINIMUM_BUFFER_SIZE_TO_FLUSH.get(conf);
-  }
-
-  /**
-   * Retrieves a lock for a given partition. If the lock for the given partition
-   * does not exist, creates a new lock.
-   *
-   * @param partitionId id of the partition the lock is needed for
-   * @return lock for a given partition
-   */
-  private ReadWriteLock getPartitionLock(int partitionId) {
-    ReadWriteLock readWriteLock = locks.get(partitionId);
-    if (readWriteLock == null) {
-      readWriteLock = new ReentrantReadWriteLock();
-      ReadWriteLock temp = locks.putIfAbsent(partitionId, readWriteLock);
-      if (temp != null) {
-        readWriteLock = temp;
-      }
-    }
-    return readWriteLock;
-  }
-
-  /**
-   * Adds a data entry for a given partition to the current data store. If data
-   * of a given partition in data store is already offloaded to disk, adds the
-   * data entry to appropriate raw data buffer list.
-   *
-   * @param partitionId id of the partition to add the data entry to
-   * @param entry data entry to add
-   */
-  protected void addEntry(int partitionId, T entry) {
-    // Addition of data entries to a data store is much more common than
-    // out-of-core operations. Besides, in-memory data store implementations
-    // existing in the code base already account for parallel addition to data
-    // stores. Therefore, using read lock would optimize for parallel addition
-    // to data stores, specially for cases where the addition should happen for
-    // partitions that are entirely in memory.
-    ReadWriteLock rwLock = getPartitionLock(partitionId);
-    rwLock.readLock().lock();
-    if (hasPartitionDataOnDisk.contains(partitionId)) {
-      List<T> entryList = new ArrayList<>();
-      entryList.add(entry);
-      int entrySize = entrySerializedSize(entry);
-      MutablePair<Integer, List<T>> newPair =
-          new MutablePair<>(entrySize, entryList);
-      Pair<Integer, List<T>> oldPair =
-          dataBuffers.putIfAbsent(partitionId, newPair);
-      if (oldPair != null) {
-        synchronized (oldPair) {
-          newPair = (MutablePair<Integer, List<T>>) oldPair;
-          newPair.setLeft(oldPair.getLeft() + entrySize);
-          newPair.getRight().add(entry);
-        }
-      }
-    } else {
-      addEntryToImMemoryPartitionData(partitionId, entry);
-    }
-    rwLock.readLock().unlock();
-  }
-
-  /**
-   * Loads and assembles all data for a given partition, and put it into the
-   * data store. Returns the number of bytes transferred from disk to memory in
-   * the loading process.
-   *
-   * @param partitionId id of the partition to load ana assemble all data for
-   * @param basePath path to load the data from
-   * @return number of bytes loaded from disk to memory
-   * @throws IOException
-   */
-  public long loadPartitionData(int partitionId, String basePath)
-      throws IOException {
-    long numBytes = 0;
-    ReadWriteLock rwLock = getPartitionLock(partitionId);
-    rwLock.writeLock().lock();
-    if (hasPartitionDataOnDisk.contains(partitionId)) {
-      numBytes += loadInMemoryPartitionData(partitionId,
-          getPath(basePath, partitionId));
-      hasPartitionDataOnDisk.remove(partitionId);
-      // Loading raw data buffers from disk if there is any and applying those
-      // to already loaded in-memory data.
-      Integer numBuffers = numDataBuffersOnDisk.remove(partitionId);
-      if (numBuffers != null) {
-        checkState(numBuffers > 0);
-        File file = new File(getBuffersPath(basePath, partitionId));
-        checkState(file.exists());
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("loadPartitionData: loading " + numBuffers + " buffers of" +
-              " partition " + partitionId + " from " + file.getAbsolutePath());
-        }
-        FileInputStream fis = new FileInputStream(file);
-        BufferedInputStream bis = new BufferedInputStream(fis);
-        DataInputStream dis = new DataInputStream(bis);
-        for (int i = 0; i < numBuffers; ++i) {
-          T entry = readNextEntry(dis);
-          addEntryToImMemoryPartitionData(partitionId, entry);
-        }
-        dis.close();
-        numBytes +=  file.length();
-        checkState(file.delete(), "loadPartitionData: failed to delete %s.",
-            file.getAbsoluteFile());
-      }
-      // Applying in-memory raw data buffers to in-memory partition data.
-      Pair<Integer, List<T>> pair = dataBuffers.remove(partitionId);
-      if (pair != null) {
-        for (T entry : pair.getValue()) {
-          addEntryToImMemoryPartitionData(partitionId, entry);
-        }
-      }
-    }
-    rwLock.writeLock().unlock();
-    return numBytes;
-  }
-
-  /**
-   * Offloads partition data of a given partition in the data store to disk, and
-   * returns the number of bytes offloaded from memory to disk.
-   *
-   * @param partitionId id of the partition to offload its data
-   * @param basePath path to offload the data to
-   * @return number of bytes offloaded from memory to disk
-   * @throws IOException
-   */
-  @edu.umd.cs.findbugs.annotations.SuppressWarnings(
-      "UL_UNRELEASED_LOCK_EXCEPTION_PATH")
-  public long offloadPartitionData(int partitionId, String basePath)
-      throws IOException {
-    ReadWriteLock rwLock = getPartitionLock(partitionId);
-    rwLock.writeLock().lock();
-    hasPartitionDataOnDisk.add(partitionId);
-    rwLock.writeLock().unlock();
-    return offloadInMemoryPartitionData(partitionId,
-        getPath(basePath, partitionId));
-  }
-
-  /**
-   * Offloads raw data buffers of a given partition to disk, and returns the
-   * number of bytes offloaded from memory to disk.
-   *
-   * @param partitionId id of the partition to offload its raw data buffers
-   * @param basePath path to offload the data to
-   * @return number of bytes offloaded from memory to disk
-   * @throws IOException
-   */
-  public long offloadBuffers(int partitionId, String basePath)
-      throws IOException {
-    Pair<Integer, List<T>> pair = dataBuffers.get(partitionId);
-    if (pair == null || pair.getLeft() < minBufferSizeToOffload) {
-      return 0;
-    }
-    ReadWriteLock rwLock = getPartitionLock(partitionId);
-    rwLock.writeLock().lock();
-    pair = dataBuffers.remove(partitionId);
-    rwLock.writeLock().unlock();
-    checkNotNull(pair);
-    checkState(!pair.getRight().isEmpty());
-    File file = new File(getBuffersPath(basePath, partitionId));
-    FileOutputStream fos = new FileOutputStream(file, true);
-    BufferedOutputStream bos = new BufferedOutputStream(fos);
-    DataOutputStream dos = new DataOutputStream(bos);
-    for (T entry : pair.getRight()) {
-      writeEntry(entry, dos);
-    }
-    dos.close();
-    long numBytes = dos.size();
-    int numBuffers = pair.getRight().size();
-    Integer oldNumBuffersOnDisk =
-        numDataBuffersOnDisk.putIfAbsent(partitionId, numBuffers);
-    if (oldNumBuffersOnDisk != null) {
-      numDataBuffersOnDisk.replace(partitionId,
-          oldNumBuffersOnDisk + numBuffers);
-    }
-    return numBytes;
-  }
-
-  /**
-   * Looks through all partitions that their data is not in the data store (is
-   * offloaded to disk), and sees if any of them has enough raw data buffer in
-   * memory. If so, puts that partition in a list to return.
-   *
-   * @return Set of partition ids of all partition raw buffers where the
-   *         aggregate size of buffers are large enough and it is worth flushing
-   *         those buffers to disk
-   */
-  public Set<Integer> getCandidateBuffersToOffload() {
-    Set<Integer> result = new HashSet<>();
-    for (Map.Entry<Integer, Pair<Integer, List<T>>> entry :
-        dataBuffers.entrySet()) {
-      if (entry.getValue().getLeft() > minBufferSizeToOffload) {
-        result.add(entry.getKey());
-      }
-    }
-    return result;
-  }
-
-  /**
-   * Creates the path to read/write partition data from/to for a given
-   * partition.
-   *
-   * @param basePath path prefix to create the actual path from
-   * @param partitionId id of the partition
-   * @return path to read/write data from/to
-   */
-  private static String getPath(String basePath, int partitionId) {
-    return basePath + "-P" + partitionId;
-  }
-
-  /**
-   * Creates the path to read/write raw data buffers of a given partition
-   * from/to.
-   *
-   * @param basePath path prefix to create the actual path from
-   * @param partitionId id of the partition
-   * @return path to read/write raw data buffer to/from
-   */
-  private static String getBuffersPath(String basePath, int partitionId) {
-    return getPath(basePath, partitionId) + "_buffers";
-  }
-
-  /**
-   * Writes a single raw entry to a given output stream.
-   *
-   * @param entry entry to write to output
-   * @param out output stream to write the entry to
-   * @throws IOException
-   */
-  protected abstract void writeEntry(T entry, DataOutput out)
-      throws IOException;
-
-  /**
-   * Reads the next available raw entry from a given input stream.
-   *
-   * @param in input stream to read the entry from
-   * @return entry read from an input stream
-   * @throws IOException
-   */
-  protected abstract T readNextEntry(DataInput in) throws IOException;
-
-  /**
-   * Loads data of a partition into data store. Returns number of bytes loaded.
-   *
-   * @param partitionId id of the partition to load its data
-   * @param path path from which data should be loaded
-   * @return number of bytes loaded from disk to memory
-   * @throws IOException
-   */
-  protected abstract long loadInMemoryPartitionData(int partitionId,
-                                                    String path)
-      throws IOException;
-
-  /**
-   * Offloads data of a partition in data store to disk. Returns the number of
-   * bytes offloaded to disk
-   *
-   * @param partitionId id of the partition to offload to disk
-   * @param path path to which data should be offloaded
-   * @return number of bytes offloaded from memory to disk
-   * @throws IOException
-   */
-  protected abstract long offloadInMemoryPartitionData(int partitionId,
-                                                       String path)
-      throws IOException;
-
-  /**
-   * Gets the size of a given entry in bytes.
-   *
-   * @param entry input entry to find its size
-   * @return size of given input entry in bytes
-   */
-  protected abstract int entrySerializedSize(T entry);
-
-  /**
-   * Adds a single entry for a given partition to the in-memory data store.
-   *
-   * @param partitionId id of the partition to add the data to
-   * @param entry input entry to add to the data store
-   */
-  protected abstract void addEntryToImMemoryPartitionData(int partitionId,
-                                                          T entry);
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/3793c9ef/giraph-core/src/main/java/org/apache/giraph/ooc/io/IOCommand.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/io/IOCommand.java b/giraph-core/src/main/java/org/apache/giraph/ooc/io/IOCommand.java
deleted file mode 100644
index e84ad29..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/ooc/io/IOCommand.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.ooc.io;
-
-import org.apache.giraph.ooc.OutOfCoreEngine;
-
-import java.io.IOException;
-
-/**
- * Base class of all out-of-core IO commands. An IO command moves the data of
- * one partition between memory and disk (or, for {@link IOCommandType#WAIT},
- * performs no IO at all) and keeps count of how many bytes it transferred.
- */
-public abstract class IOCommand {
-  /** Kinds of IO command understood by the out-of-core mechanism */
-  public enum IOCommandType {
-    /** Bring a partition back into memory */
-    LOAD_PARTITION,
-    /** Write a partition out to disk */
-    STORE_PARTITION,
-    /** Write the incoming messages of a partition out to disk */
-    STORE_MESSAGE,
-    /**
-     * Write the raw message/buffer data collected for a partition that is
-     * currently out of core out to disk
-     */
-    STORE_BUFFER,
-    /** Perform no IO */
-    WAIT
-  }
-
-  /** Engine driving the out-of-core mechanism */
-  protected final OutOfCoreEngine oocEngine;
-  /** Partition whose data this command moves */
-  protected final int partitionId;
-  /**
-   * Running total of bytes loaded/stored by {@link #execute(String)}; stays at
-   * zero until subclasses add to it
-   */
-  protected long numBytesTransferred;
-
-  /**
-   * Creates a command that targets a single partition.
-   *
-   * @param oocEngine Out-of-core engine
-   * @param partitionId Id of the partition involved in the IO
-   */
-  public IOCommand(OutOfCoreEngine oocEngine, int partitionId) {
-    this.partitionId = partitionId;
-    this.oocEngine = oocEngine;
-  }
-
-  /**
-   * Performs the IO (loading or storing data) and updates the affected data
-   * stores to match.
-   *
-   * @param basePath prefix of the files/folders this command reads from or
-   *                 writes to
-   * @return true iff data was actually loaded or stored
-   * @throws IOException if reading or writing the data fails
-   */
-  public abstract boolean execute(String basePath) throws IOException;
-
-  /**
-   * Reports which kind of IO this command performs.
-   *
-   * @return kind of this command
-   */
-  public abstract IOCommandType getType();
-
-  /**
-   * Reports the partition this command operates on.
-   *
-   * @return id of the partition
-   */
-  public int getPartitionId() {
-    return partitionId;
-  }
-
-  /**
-   * Reports how many bytes this command has moved between memory and disk.
-   *
-   * @return number of bytes loaded/stored so far
-   */
-  public long bytesTransferred() {
-    return numBytesTransferred;
-  }
-}
-


[2/4] git commit: updated refs/heads/trunk to 3793c9e

Posted by ma...@apache.org.
http://git-wip-us.apache.org/repos/asf/giraph/blob/3793c9ef/giraph-core/src/main/java/org/apache/giraph/ooc/io/LoadPartitionIOCommand.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/io/LoadPartitionIOCommand.java b/giraph-core/src/main/java/org/apache/giraph/ooc/io/LoadPartitionIOCommand.java
deleted file mode 100644
index ce24fe2..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/ooc/io/LoadPartitionIOCommand.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.ooc.io;
-
-import com.google.common.base.Preconditions;
-import org.apache.giraph.bsp.BspService;
-import org.apache.giraph.comm.messages.MessageStore;
-import org.apache.giraph.ooc.OutOfCoreEngine;
-import org.apache.giraph.ooc.data.DiskBackedEdgeStore;
-import org.apache.giraph.ooc.data.DiskBackedMessageStore;
-import org.apache.giraph.ooc.data.DiskBackedPartitionStore;
-
-import java.io.IOException;
-
-/**
- * IO command that brings a partition back into memory: its partition data,
- * its edge data when loading for the current INPUT_SUPERSTEP, and its message
- * data. It can also prefetch a partition for the superstep after the current
- * one, in which case the incoming message store is loaded instead of the
- * current one.
- */
-public class LoadPartitionIOCommand extends IOCommand {
-  /**
-   * Superstep the partition is loaded for: the current superstep, or the next
-   * one when prefetching.
-   */
-  private final long superstep;
-
-  /**
-   * Creates a load command for one partition.
-   *
-   * @param oocEngine out-of-core engine
-   * @param partitionId id of the partition to be loaded
-   * @param superstep superstep to load the partition for
-   */
-  public LoadPartitionIOCommand(OutOfCoreEngine oocEngine, int partitionId,
-                                long superstep) {
-    super(oocEngine, partitionId);
-    this.superstep = superstep;
-  }
-
-  @Override
-  public boolean execute(String basePath) throws IOException {
-    if (!oocEngine.getMetaPartitionManager()
-        .startLoadingPartition(partitionId, superstep)) {
-      return false;
-    }
-    long currentSuperstep = oocEngine.getSuperstep();
-    boolean forCurrentSuperstep = superstep == currentSuperstep;
-
-    DiskBackedPartitionStore partitionStore =
-        (DiskBackedPartitionStore)
-            oocEngine.getServerData().getPartitionStore();
-    numBytesTransferred +=
-        partitionStore.loadPartitionData(partitionId, basePath);
-
-    // Edge data only needs loading for the input superstep itself.
-    if (forCurrentSuperstep &&
-        currentSuperstep == BspService.INPUT_SUPERSTEP) {
-      DiskBackedEdgeStore edgeStore =
-          (DiskBackedEdgeStore) oocEngine.getServerData().getEdgeStore();
-      numBytesTransferred +=
-          edgeStore.loadPartitionData(partitionId, basePath);
-    }
-
-    MessageStore messageStore = selectMessageStore(currentSuperstep);
-    if (messageStore != null) {
-      DiskBackedMessageStore diskBackedMessages =
-          (DiskBackedMessageStore) messageStore;
-      numBytesTransferred +=
-          diskBackedMessages.loadPartitionData(partitionId, basePath);
-    }
-
-    oocEngine.getMetaPartitionManager()
-        .doneLoadingPartition(partitionId, superstep);
-    return true;
-  }
-
-  /**
-   * Picks the message store whose data for this partition must be loaded.
-   *
-   * @param currentSuperstep superstep the engine is currently in
-   * @return the current message store when loading for the current superstep,
-   *         otherwise the incoming message store; either may be null
-   */
-  private MessageStore selectMessageStore(long currentSuperstep) {
-    if (superstep == currentSuperstep) {
-      return oocEngine.getServerData().getCurrentMessageStore();
-    }
-    // Prefetching is only allowed exactly one superstep ahead.
-    Preconditions.checkState(superstep == currentSuperstep + 1);
-    return oocEngine.getServerData().getIncomingMessageStore();
-  }
-
-  @Override
-  public IOCommandType getType() {
-    return IOCommandType.LOAD_PARTITION;
-  }
-
-  @Override
-  public String toString() {
-    return "LoadPartitionIOCommand: (partitionId = " + partitionId +
-        ", superstep = " + superstep + ")";
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/3793c9ef/giraph-core/src/main/java/org/apache/giraph/ooc/io/StoreDataBufferIOCommand.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/io/StoreDataBufferIOCommand.java b/giraph-core/src/main/java/org/apache/giraph/ooc/io/StoreDataBufferIOCommand.java
deleted file mode 100644
index f1769dd..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/ooc/io/StoreDataBufferIOCommand.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.ooc.io;
-
-import org.apache.giraph.ooc.OutOfCoreEngine;
-import org.apache.giraph.ooc.data.DiskBackedEdgeStore;
-import org.apache.giraph.ooc.data.DiskBackedMessageStore;
-import org.apache.giraph.ooc.data.DiskBackedPartitionStore;
-
-import java.io.IOException;
-
-/**
- * IO command that writes the raw data buffers of a partition to disk.
- */
-public class StoreDataBufferIOCommand extends IOCommand {
-  /**
-   * Kinds of raw data buffer that can be written out: vertex/edge buffers
-   * (in INPUT_SUPERSTEP) or incoming message buffers.
-   */
-  public enum DataBufferType { PARTITION, MESSAGE }
-
-  /** Which kind of raw data buffer this command writes out */
-  private final DataBufferType type;
-
-  /**
-   * Creates a buffer-offloading command for one partition.
-   *
-   * @param oocEngine out-of-core engine
-   * @param partitionId id of the partition to offload its buffers
-   * @param type type of the buffer to store on disk
-   */
-  public StoreDataBufferIOCommand(OutOfCoreEngine oocEngine,
-                                  int partitionId,
-                                  DataBufferType type) {
-    super(oocEngine, partitionId);
-    this.type = type;
-  }
-
-  @Override
-  public boolean execute(String basePath) throws IOException {
-    if (!oocEngine.getMetaPartitionManager()
-        .startOffloadingBuffer(partitionId)) {
-      return false;
-    }
-    switch (type) {
-    case PARTITION:
-      offloadPartitionAndEdgeBuffers(basePath);
-      break;
-    case MESSAGE:
-      offloadMessageBuffers(basePath);
-      break;
-    default:
-      throw new IllegalStateException("execute: requested data buffer type " +
-          "does not exist!");
-    }
-    oocEngine.getMetaPartitionManager().doneOffloadingBuffer(partitionId);
-    return true;
-  }
-
-  /**
-   * Writes out the partition store buffers, then the edge store buffers, of
-   * this partition, adding each store's byte count as it completes.
-   *
-   * @param basePath path prefix to write the buffers under
-   * @throws IOException if writing either store's buffers fails
-   */
-  private void offloadPartitionAndEdgeBuffers(String basePath)
-      throws IOException {
-    DiskBackedPartitionStore partitionStore =
-        (DiskBackedPartitionStore)
-            oocEngine.getServerData().getPartitionStore();
-    numBytesTransferred +=
-        partitionStore.offloadBuffers(partitionId, basePath);
-    DiskBackedEdgeStore edgeStore =
-        (DiskBackedEdgeStore) oocEngine.getServerData().getEdgeStore();
-    numBytesTransferred += edgeStore.offloadBuffers(partitionId, basePath);
-  }
-
-  /**
-   * Writes out the incoming message store buffers of this partition.
-   *
-   * @param basePath path prefix to write the buffers under
-   * @throws IOException if writing the buffers fails
-   */
-  private void offloadMessageBuffers(String basePath) throws IOException {
-    DiskBackedMessageStore messageStore =
-        (DiskBackedMessageStore)
-            oocEngine.getServerData().getIncomingMessageStore();
-    numBytesTransferred +=
-        messageStore.offloadBuffers(partitionId, basePath);
-  }
-
-  @Override
-  public IOCommandType getType() {
-    return IOCommandType.STORE_BUFFER;
-  }
-
-  @Override
-  public String toString() {
-    return "StoreDataBufferIOCommand: (partitionId = " + partitionId +
-        ", type = " + type.name() + ")";
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/3793c9ef/giraph-core/src/main/java/org/apache/giraph/ooc/io/StoreIncomingMessageIOCommand.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/io/StoreIncomingMessageIOCommand.java b/giraph-core/src/main/java/org/apache/giraph/ooc/io/StoreIncomingMessageIOCommand.java
deleted file mode 100644
index c9d8829..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/ooc/io/StoreIncomingMessageIOCommand.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.ooc.io;
-
-import org.apache.giraph.ooc.OutOfCoreEngine;
-import org.apache.giraph.ooc.data.DiskBackedMessageStore;
-
-import java.io.IOException;
-
-import static com.google.common.base.Preconditions.checkState;
-
-/**
- * IO command that writes the incoming messages of one partition to disk.
- */
-public class StoreIncomingMessageIOCommand extends IOCommand {
-  /**
-   * Creates a message-offloading command for one partition.
-   *
-   * @param oocEngine out-of-core engine
-   * @param partitionId id of the partition to store its incoming messages
-   */
-  public StoreIncomingMessageIOCommand(OutOfCoreEngine oocEngine,
-                                       int partitionId) {
-    super(oocEngine, partitionId);
-  }
-
-  @Override
-  public boolean execute(String basePath) throws IOException {
-    if (!oocEngine.getMetaPartitionManager()
-        .startOffloadingMessages(partitionId)) {
-      return false;
-    }
-    DiskBackedMessageStore incomingStore =
-        (DiskBackedMessageStore)
-            oocEngine.getServerData().getIncomingMessageStore();
-    // An incoming message store is required to offload messages.
-    checkState(incomingStore != null);
-    numBytesTransferred +=
-        incomingStore.offloadPartitionData(partitionId, basePath);
-    oocEngine.getMetaPartitionManager().doneOffloadingMessages(partitionId);
-    return true;
-  }
-
-  @Override
-  public IOCommandType getType() {
-    return IOCommandType.STORE_MESSAGE;
-  }
-
-  @Override
-  public String toString() {
-    return "StoreIncomingMessageIOCommand: (partitionId = " + partitionId + ")";
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/3793c9ef/giraph-core/src/main/java/org/apache/giraph/ooc/io/StorePartitionIOCommand.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/io/StorePartitionIOCommand.java b/giraph-core/src/main/java/org/apache/giraph/ooc/io/StorePartitionIOCommand.java
deleted file mode 100644
index 797ac9d..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/ooc/io/StorePartitionIOCommand.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.ooc.io;
-
-import org.apache.giraph.bsp.BspService;
-import org.apache.giraph.comm.messages.MessageStore;
-import org.apache.giraph.ooc.data.DiskBackedEdgeStore;
-import org.apache.giraph.ooc.data.DiskBackedMessageStore;
-import org.apache.giraph.ooc.data.DiskBackedPartitionStore;
-import org.apache.giraph.ooc.OutOfCoreEngine;
-
-import java.io.IOException;
-
-/**
- * IOCommand to store partition data, edge data (if in INPUT_SUPERSTEP), and
- * message data (if in compute supersteps).
- */
-public class StorePartitionIOCommand extends IOCommand {
-  /**
-   * Constructor
-   *
-   * @param oocEngine out-of-core engine
-   * @param partitionId id of the partition to store its data
-   */
-  public StorePartitionIOCommand(OutOfCoreEngine oocEngine,
-                                 int partitionId) {
-    super(oocEngine, partitionId);
-  }
-
-  @Override
-  public boolean execute(String basePath) throws IOException {
-    boolean executed = false;
-    if (oocEngine.getMetaPartitionManager()
-        .startOffloadingPartition(partitionId)) {
-      DiskBackedPartitionStore partitionStore =
-          (DiskBackedPartitionStore)
-              oocEngine.getServerData().getPartitionStore();
-      numBytesTransferred +=
-          partitionStore.offloadPartitionData(partitionId, basePath);
-      if (oocEngine.getSuperstep() != BspService.INPUT_SUPERSTEP) {
-        MessageStore messageStore =
-            oocEngine.getServerData().getCurrentMessageStore();
-        if (messageStore != null) {
-          numBytesTransferred += ((DiskBackedMessageStore) messageStore)
-              .offloadPartitionData(partitionId, basePath);
-        }
-      } else {
-        DiskBackedEdgeStore edgeStore =
-            (DiskBackedEdgeStore)
-                oocEngine.getServerData().getEdgeStore();
-        numBytesTransferred +=
-            edgeStore.offloadPartitionData(partitionId, basePath);
-      }
-      oocEngine.getMetaPartitionManager().doneOffloadingPartition(partitionId);
-      executed = true;
-    }
-    return executed;
-  }
-
-  @Override
-  public IOCommandType getType() {
-    return IOCommandType.STORE_PARTITION;
-  }
-
-  @Override
-  public String toString() {
-    return "StorePartitionIOCommand: (partitionId = " + partitionId + ")";
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/3793c9ef/giraph-core/src/main/java/org/apache/giraph/ooc/io/WaitIOCommand.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/io/WaitIOCommand.java b/giraph-core/src/main/java/org/apache/giraph/ooc/io/WaitIOCommand.java
deleted file mode 100644
index 74e72eb..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/ooc/io/WaitIOCommand.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.ooc.io;
-
-import org.apache.giraph.ooc.OutOfCoreEngine;
-
-import java.io.IOException;
-import java.util.concurrent.TimeUnit;
-
-/**
- * IOCommand to do nothing regarding moving data to/from disk.
- */
-public class WaitIOCommand extends IOCommand {
-  /** How long should the disk be idle? (in milliseconds) */
-  private final long waitDuration;
-
-  /**
-   * Constructor
-   *
-   * @param oocEngine out-of-core engine
-   * @param waitDuration duration of wait
-   */
-  public WaitIOCommand(OutOfCoreEngine oocEngine, long waitDuration) {
-    super(oocEngine, -1);
-    this.waitDuration = waitDuration;
-  }
-
-  @Override
-  public boolean execute(String basePath) throws IOException {
-    try {
-      TimeUnit.MILLISECONDS.sleep(waitDuration);
-    } catch (InterruptedException e) {
-      throw new IllegalStateException("execute: caught InterruptedException " +
-          "while IO thread is waiting!");
-    }
-    return true;
-  }
-
-  @Override
-  public IOCommandType getType() {
-    return IOCommandType.WAIT;
-  }
-
-  @Override
-  public String toString() {
-    return "WaitIOCommand: (duration = " + waitDuration + "ms)";
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/3793c9ef/giraph-core/src/main/java/org/apache/giraph/ooc/io/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/io/package-info.java b/giraph-core/src/main/java/org/apache/giraph/ooc/io/package-info.java
deleted file mode 100644
index 2230ec4..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/ooc/io/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * Package of classes related to IO operations in out-of-core mechanism
- */
-package org.apache.giraph.ooc.io;

http://git-wip-us.apache.org/repos/asf/giraph/blob/3793c9ef/giraph-core/src/main/java/org/apache/giraph/ooc/persistence/DataIndex.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/persistence/DataIndex.java b/giraph-core/src/main/java/org/apache/giraph/ooc/persistence/DataIndex.java
new file mode 100644
index 0000000..d44204b
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/ooc/persistence/DataIndex.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.ooc.persistence;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Index chain used in out-of-core data accessor object (DAO) to access
+ * serialized data. The chain is an ordered list of entries; its string form
+ * is the concatenation of the entries' string forms, e.g. a partition entry
+ * for partition 5 followed by {@link TypeIndexEntry#PARTITION_VERTICES}
+ * renders as "_P5_vertices".
+ */
+public class DataIndex {
+  /** Chain of data indices */
+  private final List<DataIndexEntry> indexList = new ArrayList<>(5);
+
+  /**
+   * Add an index to the end of the index chain
+   *
+   * @param entry the entry to add to the chain
+   * @return the index chain itself, to allow call chaining
+   */
+  public DataIndex addIndex(DataIndexEntry entry) {
+    indexList.add(entry);
+    return this;
+  }
+
+  /**
+   * Remove/Pop the last index in the index chain
+   *
+   * @return the index chain itself, to allow call chaining
+   * @throws IndexOutOfBoundsException if the chain is empty
+   */
+  public DataIndex removeLastIndex() {
+    indexList.remove(indexList.size() - 1);
+    return this;
+  }
+
+  /**
+   * Create a copy of the existing DataIndex. Modifying the chain of the copy
+   * does not affect the original chain (and vice versa).
+   *
+   * @return a copy of the existing index chain
+   */
+  public DataIndex copy() {
+    DataIndex index = new DataIndex();
+    // Shallow copy: the entry types defined here (TypeIndexEntry and
+    // NumericIndexEntry) are immutable, so entries can be shared.
+    index.indexList.addAll(indexList);
+    return index;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (!(obj instanceof DataIndex)) {
+      return false;
+    }
+    DataIndex dataIndex = (DataIndex) obj;
+    return indexList.equals(dataIndex.indexList);
+  }
+
+  @Override
+  public int hashCode() {
+    return indexList.hashCode();
+  }
+
+  /**
+   * @return concatenation of the string forms of all entries in the chain,
+   *         in order (empty string for an empty chain)
+   */
+  @Override
+  public String toString() {
+    // Local, single-threaded builder: no need for StringBuffer's locking
+    StringBuilder sb = new StringBuilder();
+    for (DataIndexEntry entry : indexList) {
+      sb.append(entry);
+    }
+    return sb.toString();
+  }
+
+  /** Interface to unify different types of entries used as index chain */
+  public interface DataIndexEntry { }
+
+  /**
+   * Different static types of index chain entry
+   */
+  public enum TypeIndexEntry implements DataIndexEntry {
+    /** The whole partition */
+    PARTITION("_partition"),
+    /** Partition vertices */
+    PARTITION_VERTICES("_vertices"),
+    /** Partition edges */
+    PARTITION_EDGES("_edges"),
+    /** Partition messages */
+    MESSAGE("_messages"),
+    /** Edges stored in edge store for a partition */
+    EDGE_STORE("_edge_store"),
+    /** Raw data buffer (refer to DiskBackedDataStore) */
+    BUFFER("_buffer");
+
+    /** String realization of entry type */
+    private final String name;
+
+    /**
+     * Constructor
+     *
+     * @param name name of the type
+     */
+    TypeIndexEntry(String name) {
+      this.name = name;
+    }
+
+    /**
+     * @return the string realization of this entry type (e.g. "_partition")
+     */
+    public String getName() {
+      return name;
+    }
+
+    @Override
+    public String toString() {
+      return name;
+    }
+  }
+
+  /**
+   * Class representing any index chain that depends on something with id.
+   * Generally this is used for identifying indices in two types:
+   *  - Index entry based on superstep id ('S' and the superstep number)
+   *  - Index entry based on partition id ('P' and the partition id)
+   */
+  public static final class NumericIndexEntry implements DataIndexEntry {
+    /** Type of index */
+    private final char type;
+    /** Id of the index associated with the specified type */
+    private final long id;
+
+    /**
+     * Constructor
+     *
+     * @param type type of index (for now 'S' for superstep, or 'P' for
+     *             partition)
+     * @param id id of the index associated with the given type
+     */
+    private NumericIndexEntry(char type, long id) {
+      this.type = type;
+      this.id = id;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (!(obj instanceof NumericIndexEntry)) {
+        return false;
+      }
+      NumericIndexEntry index = (NumericIndexEntry) obj;
+      return index.type == type && index.id == id;
+    }
+
+    @Override
+    public int hashCode() {
+      int result = 17;
+      result = result * 37 + type;
+      // Fold both halves of the 64-bit id into the hash
+      result = result * 37 + (int) id;
+      result = result * 37 + (int) (id >> 32);
+      return result;
+    }
+
+    /**
+     * @return "_" followed by the type character and the id, e.g. "_P5"
+     */
+    @Override
+    public String toString() {
+      return String.format("_%c%d", type, id);
+    }
+
+    /**
+     * Create a data index entry for a given partition
+     *
+     * @param partitionId id of the partition
+     * @return data index entry for a given partition
+     */
+    public static NumericIndexEntry createPartitionEntry(int partitionId) {
+      return new NumericIndexEntry('P', partitionId);
+    }
+
+    /**
+     * Create a data index entry for a given superstep
+     *
+     * @param superstep the superstep number
+     * @return data index entry for a given superstep
+     */
+    public static NumericIndexEntry createSuperstepEntry(long superstep) {
+      return new NumericIndexEntry('S', superstep);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/3793c9ef/giraph-core/src/main/java/org/apache/giraph/ooc/persistence/LocalDiskDataAccessor.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/persistence/LocalDiskDataAccessor.java b/giraph-core/src/main/java/org/apache/giraph/ooc/persistence/LocalDiskDataAccessor.java
new file mode 100644
index 0000000..2e42906
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/ooc/persistence/LocalDiskDataAccessor.java
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.ooc.persistence;
+
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.KryoDataInput;
+import com.esotericsoftware.kryo.io.KryoDataOutput;
+import com.esotericsoftware.kryo.io.Output;
+import com.esotericsoftware.kryo.io.UnsafeInput;
+import com.esotericsoftware.kryo.io.UnsafeOutput;
+import org.apache.giraph.conf.GiraphConstants;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.conf.IntConfOption;
+import org.apache.log4j.Logger;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+
+import static com.google.common.base.Preconditions.checkState;
+import static org.apache.giraph.conf.GiraphConstants.ONE_MB;
+
+/**
+ * Data accessor object to read/write data in local disk.
+ * Each IO thread is bound to one directory given in
+ * 'giraph.partitionsDirectory'; the data for an index chain is stored in the
+ * file {@code <directory>/<jobId><index chain string>}.
+ * Note: This class assumes that the data are partitioned across IO threads,
+ *       i.e. each part of data can be accessed by one and only one IO thread
+ *       throughout the execution. Also, each IO thread reads a particular
+ *       type of data completely and, only then, it can read other type of data;
+ *       i.e. an IO thread cannot be used to read two different files at the
+ *       same time. These assumptions are based on the assumptions that the
+ *       current out-of-core mechanism is designed for.
+ */
+public class LocalDiskDataAccessor implements OutOfCoreDataAccessor {
+  /**
+   * Size of the buffer used for (de)serializing data when reading/writing
+   * from/to disk
+   */
+  public static final IntConfOption OOC_DISK_BUFFER_SIZE =
+      new IntConfOption("graph.oocDiskBufferSize", 4 * ONE_MB,
+          "size of the buffer when (de)serializing data for reading/writing " +
+              "from/to disk");
+
+  /** Class logger */
+  private static final Logger LOG =
+      Logger.getLogger(LocalDiskDataAccessor.class);
+  /**
+   * In-memory buffer used for (de)serializing data when reading/writing
+   * from/to disk using Kryo. There is one buffer per IO thread, shared by all
+   * readers/writers of that thread; this is only safe because a thread works
+   * on one file at a time (see the class comment).
+   */
+  private final byte[][] perThreadBuffers;
+  /** Path prefix for different disks */
+  private final String[] basePaths;
+  /** How many disks (i.e. IO threads) do we have? */
+  private final int numDisks;
+
+  /**
+   * Constructor. Creates every configured partitions directory that does not
+   * exist yet.
+   *
+   * @param conf Configuration
+   */
+  public LocalDiskDataAccessor(
+      ImmutableClassesGiraphConfiguration<?, ?, ?> conf) {
+    // Take advantage of multiple disks
+    String[] userPaths = GiraphConstants.PARTITIONS_DIRECTORY.getArray(conf);
+    this.numDisks = userPaths.length;
+    if (!GiraphConstants.NUM_OUT_OF_CORE_THREADS.isDefaultValue(conf) ||
+        GiraphConstants.NUM_OUT_OF_CORE_THREADS.get(conf) != numDisks) {
+      LOG.warn("LocalDiskDataAccessor: with this data accessor, number of " +
+          "out-of-core threads is only specified by the number of " +
+          "directories given by 'giraph.partitionsDirectory' flag! Now using " +
+          numDisks + " IO threads!");
+    }
+    this.basePaths = new String[numDisks];
+    int ptr = 0;
+    String jobId = conf.getJobId();
+    for (String path : userPaths) {
+      File file = new File(path);
+      if (!file.exists()) {
+        checkState(file.mkdirs(), "LocalDiskDataAccessor: cannot create " +
+            "directory " + file.getAbsolutePath());
+      }
+      basePaths[ptr] = path + "/" + jobId;
+      ptr++;
+    }
+    final int diskBufferSize = OOC_DISK_BUFFER_SIZE.get(conf);
+    this.perThreadBuffers = new byte[numDisks][diskBufferSize];
+  }
+
+  @Override
+  public void initialize() { }
+
+  /**
+   * Delete every file in each configured partitions directory, then the
+   * directory itself. A directory that no longer exists (e.g. because it was
+   * listed more than once in 'giraph.partitionsDirectory' and has already
+   * been removed by an earlier iteration) is skipped.
+   * NOTE(review): this removes all files in the directory, not only the ones
+   * written by this job; confirm the directory is dedicated to this worker.
+   */
+  @Override
+  public void shutdown() {
+    for (String path : basePaths) {
+      File file = new File(path).getParentFile();
+      if (!file.exists()) {
+        continue;
+      }
+      // File.list() returns null (rather than throwing) on failure
+      String[] subFileNames = file.list();
+      checkState(subFileNames != null, "shutdown: cannot list directory %s",
+          file.getAbsoluteFile());
+      for (String subFileName : subFileNames) {
+        File subFile = new File(file.getPath(), subFileName);
+        checkState(subFile.delete(), "shutdown: cannot delete file %s",
+            subFile.getAbsoluteFile());
+      }
+      checkState(file.delete(), "shutdown: cannot delete directory %s",
+          file.getAbsoluteFile());
+    }
+  }
+
+  @Override
+  public int getNumAccessorThreads() {
+    return numDisks;
+  }
+
+  @Override
+  public DataInputWrapper prepareInput(int threadId, DataIndex index)
+      throws IOException {
+    return new LocalDiskDataInputWrapper(basePaths[threadId] + index.toString(),
+        perThreadBuffers[threadId]);
+  }
+
+  @Override
+  public DataOutputWrapper prepareOutput(
+      int threadId, DataIndex index, boolean shouldAppend) throws IOException {
+    return new LocalDiskDataOutputWrapper(
+        basePaths[threadId] + index.toString(), shouldAppend,
+        perThreadBuffers[threadId]);
+  }
+
+  @Override
+  public boolean dataExist(int threadId, DataIndex index) {
+    return new File(basePaths[threadId] + index.toString()).exists();
+  }
+
+  /** Implementation of <code>DataInput</code> wrapper for local disk reader */
+  private static class LocalDiskDataInputWrapper implements DataInputWrapper {
+    /** File used to read the data from */
+    private final File file;
+    /** Kryo's handle to read the data */
+    private final Input input;
+
+    /**
+     * Constructor
+     *
+     * @param fileName file name
+     * @param buffer reusable byte buffer that will be used in Kryo's Input
+     *               reader
+     * @throws IOException if the file cannot be opened for reading
+     */
+    @edu.umd.cs.findbugs.annotations.SuppressWarnings(
+        "OBL_UNSATISFIED_OBLIGATION")
+    LocalDiskDataInputWrapper(String fileName, byte[] buffer)
+        throws IOException {
+      file = new File(fileName);
+      // Opening a file happens for every partition load: log at DEBUG only
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("LocalDiskDataInputWrapper: obtaining a data input from " +
+            "local file " + file.getAbsolutePath());
+      }
+      input = new UnsafeInput(buffer);
+      input.setInputStream(new FileInputStream(
+          new RandomAccessFile(file, "r").getFD()));
+    }
+
+    @Override
+    public DataInput getDataInput() {
+      return new KryoDataInput(input);
+    }
+
+    @Override
+    public long finalizeInput(boolean deleteOnClose) {
+      input.close();
+      long count = input.total();
+      checkState(!deleteOnClose || file.delete(),
+          "finalizeInput: failed to delete %s.", file.getAbsoluteFile());
+      return count;
+    }
+  }
+
+  /** Implementation of <code>DataOutput</code> wrapper for local disk writer */
+  private static class LocalDiskDataOutputWrapper implements DataOutputWrapper {
+    /** File used to write the data to */
+    private final File file;
+    /** Kryo's handle to write the data */
+    private final Output output;
+
+    /**
+     * Constructor
+     *
+     * @param fileName file name
+     * @param shouldAppend whether the <code>DataOutput</code> should be used
+     *                     for appending to already existing files
+     * @param buffer reusable byte buffer that will be used in Kryo's Output
+     *               writer
+     * @throws IOException if the file cannot be opened for writing
+     */
+    @edu.umd.cs.findbugs.annotations.SuppressWarnings(
+        "OBL_UNSATISFIED_OBLIGATION")
+    LocalDiskDataOutputWrapper(String fileName, boolean shouldAppend,
+                               byte[] buffer) throws IOException {
+      file = new File(fileName);
+      // Opening a file happens for every partition offload: log at DEBUG only
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("LocalDiskDataOutputWrapper: obtaining a data output from " +
+            "local file " + file.getAbsolutePath());
+        // Debug-mode sanity check: a non-append write must start a new file
+        if (!shouldAppend) {
+          checkState(!file.exists(), "LocalDiskDataOutputWrapper: file %s " +
+              "already exist", file.getAbsoluteFile());
+          checkState(file.createNewFile(), "LocalDiskDataOutputWrapper: " +
+              "cannot create file %s", file.getAbsolutePath());
+        }
+      }
+      output = new UnsafeOutput(buffer);
+      RandomAccessFile raf = new RandomAccessFile(file, "rw");
+      if (shouldAppend) {
+        raf.seek(file.length());
+      }
+      output.setOutputStream(new FileOutputStream(raf.getFD()));
+    }
+
+    @Override
+    public DataOutput getDataOutput() {
+      return new KryoDataOutput(output);
+    }
+
+    @Override
+    public long finalizeOutput() {
+      output.close();
+      long count = output.total();
+      return count;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/3793c9ef/giraph-core/src/main/java/org/apache/giraph/ooc/persistence/OutOfCoreDataAccessor.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/persistence/OutOfCoreDataAccessor.java b/giraph-core/src/main/java/org/apache/giraph/ooc/persistence/OutOfCoreDataAccessor.java
new file mode 100644
index 0000000..d4ddc62
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/ooc/persistence/OutOfCoreDataAccessor.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.ooc.persistence;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * Interface representing data accessor object (DAO) used as persistence layer
+ * in out-of-core mechanism.
+ * Note: any class implementing this interface should have one and only one
+ *       constructor taking one and only one argument of type
+ *       <code>ImmutableClassesGiraphConfiguration</code>
+ */
+public interface OutOfCoreDataAccessor {
+  /** Initialize the DAO */
+  void initialize();
+
+  /** Shut down the DAO */
+  void shutdown();
+
+  /**
+   * @return the number of threads involved in data persistence
+   */
+  int getNumAccessorThreads();
+
+  /**
+   * Prepare a wrapper containing <code>DataInput</code> representation for a
+   * given thread involved in persistence for a given index chain for data.
+   *
+   * @param threadId id of the thread involved in persistence
+   * @param index index chain used to access the serialized data from
+   * @return the wrapper for <code>DataInput</code> representation of data
+   * @throws IOException if the input cannot be prepared
+   */
+  DataInputWrapper prepareInput(int threadId, DataIndex index)
+      throws IOException;
+
+  /**
+   * Prepare a wrapper containing <code>DataOutput</code> representation for a
+   * given thread involved in persistence for a given index chain for data.
+   *
+   * @param threadId id of the thread involved in persistence
+   * @param index index chain used to access the serialized data from
+   * @param shouldAppend whether the <code>DataOutput</code> should be used for
+   *                     appending to already existing data for the given index
+   *                     or the <code>DataOutput</code> should create new
+   *                     instance to store serialized data
+   * @return the wrapper for <code>DataOutput</code> representation of data
+   * @throws IOException if the output cannot be prepared
+   */
+  DataOutputWrapper prepareOutput(int threadId, DataIndex index,
+                                  boolean shouldAppend) throws IOException;
+
+  /**
+   * Check whether the data for the given thread and index chain exists.
+   *
+   * @param threadId id of the thread involved in persistence
+   * @param index index chain used to access the data
+   * @return True if the data exists for the given index chain for the given
+   *         thread, False otherwise
+   */
+  boolean dataExist(int threadId, DataIndex index);
+
+  /** Interface to wrap <code>DataInput</code> */
+  interface DataInputWrapper {
+    /**
+     * @return the <code>DataInput</code>
+     */
+    DataInput getDataInput();
+
+    /**
+     * Finalize and close the <code>DataInput</code> used for persistence.
+     *
+     * @param deleteOnClose whether the source of <code>DataInput</code>
+     *                      should be deleted on closing/finalizing
+     * @return number of bytes read from <code>DataInput</code> since it was
+     *         opened
+     */
+    long finalizeInput(boolean deleteOnClose);
+  }
+
+  /** Interface to wrap <code>DataOutput</code> */
+  interface DataOutputWrapper {
+    /**
+     * @return the <code>DataOutput</code>
+     */
+    DataOutput getDataOutput();
+
+    /**
+     * Finalize and close the <code>DataOutput</code> used for persistence.
+     *
+     * @return number of bytes written to <code>DataOutput</code> since it was
+     *         opened
+     */
+    long finalizeOutput();
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/3793c9ef/giraph-core/src/main/java/org/apache/giraph/ooc/persistence/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/persistence/package-info.java b/giraph-core/src/main/java/org/apache/giraph/ooc/persistence/package-info.java
new file mode 100644
index 0000000..adf8dba
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/ooc/persistence/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Package of classes related to IO abstraction or persistence layer used for
+ * out-of-core mechanism
+ */
+package org.apache.giraph.ooc.persistence;

http://git-wip-us.apache.org/repos/asf/giraph/blob/3793c9ef/giraph-core/src/main/java/org/apache/giraph/ooc/policy/FixedPartitionsOracle.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/policy/FixedPartitionsOracle.java b/giraph-core/src/main/java/org/apache/giraph/ooc/policy/FixedPartitionsOracle.java
new file mode 100644
index 0000000..ffc5f7f
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/ooc/policy/FixedPartitionsOracle.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.ooc.policy;
+
+import com.sun.management.GarbageCollectionNotificationInfo;
+import org.apache.giraph.conf.GiraphConstants;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.ooc.OutOfCoreEngine;
+import org.apache.giraph.ooc.command.IOCommand;
+import org.apache.giraph.ooc.command.LoadPartitionIOCommand;
+import org.apache.giraph.ooc.command.StorePartitionIOCommand;
+import org.apache.log4j.Logger;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Oracle for the fixed out-of-core mechanism: it tries to keep the number of
+ * partitions in memory at the value configured by
+ * <code>GiraphConstants.MAX_PARTITIONS_IN_MEMORY</code>.
+ */
+public class FixedPartitionsOracle implements OutOfCoreOracle {
+  /** Class logger */
+  private static final Logger LOG =
+      Logger.getLogger(FixedPartitionsOracle.class);
+  /** Maximum number of partitions to be kept in memory */
+  private final int maxPartitionsInMemory;
+  /**
+   * Net number of partitions about to be added (loaded) to or removed
+   * (stored) from memory by approved but not yet completed commands. Each
+   * outstanding load partition counts +1 and each outstanding store partition
+   * counts -1 toward this counter, so the value may be negative.
+   */
+  private final AtomicInteger deltaNumPartitionsInMemory =
+      new AtomicInteger(0);
+  /** Out-of-core engine */
+  private final OutOfCoreEngine oocEngine;
+
+  /**
+   * Constructor
+   *
+   * @param conf configuration
+   * @param oocEngine out-of-core engine
+   */
+  public FixedPartitionsOracle(ImmutableClassesGiraphConfiguration conf,
+                               OutOfCoreEngine oocEngine) {
+    this.maxPartitionsInMemory =
+        GiraphConstants.MAX_PARTITIONS_IN_MEMORY.get(conf);
+    this.oocEngine = oocEngine;
+  }
+
+  /**
+   * Decide the next IO actions by comparing the projected number of
+   * partitions in memory (partitions currently in memory plus the net effect
+   * of outstanding load/store commands) with the configured maximum.
+   *
+   * @return viable IO actions, sorted from highest to lowest priority
+   */
+  @Override
+  public IOAction[] getNextIOActions() {
+    int numPartitionsInMemory =
+        oocEngine.getMetaPartitionManager().getNumInMemoryPartitions();
+    // Read the pending counter once so the log and the decision agree.
+    int pendingDelta = deltaNumPartitionsInMemory.get();
+    if (LOG.isInfoEnabled()) {
+      LOG.info("getNextIOActions: calling with " + numPartitionsInMemory +
+          " partitions in memory, " + pendingDelta +
+          " net partitions pending (outstanding loads minus stores)");
+    }
+    int numPartitions = numPartitionsInMemory + pendingDelta;
+    // Fixed out-of-core policy:
+    //   - if the number of partitions in memory is less than the max number of
+    //     partitions in memory, we should load a partition to memory. This
+    //     basically means we are prefetching partition's data either for the
+    //     current superstep, or for the next superstep.
+    //   - if the number of partitions in memory is equal to the max number of
+    //     partitions in memory, we start a swap: an unprocessed partition on
+    //     disk is loaded only if there are processed partitions in memory.
+    //     The load pushes the count above the max, which the next call turns
+    //     into a store.
+    //   - if the number of partitions in memory is more than the max number of
+    //     partitions in memory, we do a 'hard store', meaning we store a
+    //     partition to disk, regardless of its processing state.
+    if (numPartitions < maxPartitionsInMemory) {
+      return new IOAction[]{
+        IOAction.LOAD_PARTITION,
+        IOAction.STORE_MESSAGES_AND_BUFFERS};
+    } else if (numPartitions > maxPartitionsInMemory) {
+      LOG.warn("getNextIOActions: number of partitions in memory passed the " +
+          "specified threshold!");
+      return new IOAction[]{
+        IOAction.STORE_PARTITION,
+        IOAction.STORE_MESSAGES_AND_BUFFERS};
+    } else {
+      return new IOAction[]{
+        IOAction.STORE_MESSAGES_AND_BUFFERS,
+        IOAction.LOAD_TO_SWAP_PARTITION};
+    }
+  }
+
+  /**
+   * Approve or deny a partition load/store so that the number of partitions
+   * in memory stays around the configured maximum. An approved command is
+   * immediately reflected in the pending counter; a denied one is rolled
+   * back. Commands other than partition loads/stores are always approved.
+   *
+   * @param command the IO command that is about to execute
+   * @return true if the command may execute, false otherwise
+   */
+  @Override
+  public boolean approve(IOCommand command) {
+    int numPartitionsInMemory = oocEngine.getMetaPartitionManager()
+        .getNumInMemoryPartitions();
+    if (command instanceof LoadPartitionIOCommand) {
+      // Reserve the slot first; getAndIncrement returns the value before the
+      // reservation. A load is denied only if memory is already above the
+      // limit. Loading while exactly at the limit is allowed, which is the
+      // state in which getNextIOActions issues LOAD_TO_SWAP_PARTITION.
+      int pendingBefore = deltaNumPartitionsInMemory.getAndIncrement();
+      if (numPartitionsInMemory + pendingBefore > maxPartitionsInMemory) {
+        deltaNumPartitionsInMemory.getAndDecrement();
+        return false;
+      }
+    } else if (command instanceof StorePartitionIOCommand) {
+      // Symmetrically, a store is denied only if memory is already below the
+      // limit.
+      int pendingBefore = deltaNumPartitionsInMemory.getAndDecrement();
+      if (numPartitionsInMemory + pendingBefore < maxPartitionsInMemory) {
+        deltaNumPartitionsInMemory.getAndIncrement();
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Release the pending reservation made in {@link #approve} for a completed
+   * partition load/store. Presumably the meta partition manager accounts for
+   * the partition itself by now.
+   *
+   * @param command the IO command that is completed
+   */
+  @Override
+  public void commandCompleted(IOCommand command) {
+    if (command instanceof LoadPartitionIOCommand) {
+      deltaNumPartitionsInMemory.getAndDecrement();
+    } else if (command instanceof StorePartitionIOCommand) {
+      deltaNumPartitionsInMemory.getAndIncrement();
+    }
+  }
+
+  /** This oracle does not react to GC events. */
+  @Override
+  public void gcCompleted(GarbageCollectionNotificationInfo gcInfo) { }
+
+  /** Nothing to release: this oracle starts no threads. */
+  @Override
+  public void shutdown() { }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/3793c9ef/giraph-core/src/main/java/org/apache/giraph/ooc/policy/OutOfCoreOracle.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/policy/OutOfCoreOracle.java b/giraph-core/src/main/java/org/apache/giraph/ooc/policy/OutOfCoreOracle.java
new file mode 100644
index 0000000..45b9914
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/ooc/policy/OutOfCoreOracle.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.ooc.policy;
+
+import com.sun.management.GarbageCollectionNotificationInfo;
+import org.apache.giraph.ooc.command.IOCommand;
+
+/**
+ * Interface for any out-of-core oracle. An out-of-core oracle is the brain of
+ * the out-of-core mechanism: it decides which out-of-core actions (load or
+ * store) should happen.
+ * Note: any class implementing this interface should have one and only one
+ *       constructor taking exactly two arguments, of types
+ *       <code>ImmutableClassesGiraphConfiguration</code> and
+ *       <code>OutOfCoreEngine</code>
+ */
+public interface OutOfCoreOracle {
+  /**
+   * Different types of IO actions that can potentially lead to a more desired
+   * state of computation for the out-of-core mechanism. These actions are
+   * issued based on the status of the memory (memory pressure, rate of data
+   * transfer to memory, etc.)
+   */
+  enum IOAction {
+    /**
+     * Any one of:
+     * <ul>
+     *   <li>storing incoming messages of any partition currently on disk,</li>
+     *   <li>storing the raw data buffer of incoming messages of any partition
+     *       currently on disk,</li>
+     *   <li>storing the raw data buffer of partitions that are currently on
+     *       disk.</li>
+     * </ul>
+     */
+    STORE_MESSAGES_AND_BUFFERS,
+    /**
+     * Storing a partition that is *processed* in the current iteration cycle.
+     * This action is also known as "soft store".
+     */
+    STORE_PROCESSED_PARTITION,
+    /**
+     * Storing a partition from memory to disk, preferring *processed*
+     * partitions in memory. However, if there is no *processed* partition,
+     * the store should happen at any cost, even if an *unprocessed* partition
+     * has to be stored. This action is also known as "hard store".
+     */
+    STORE_PARTITION,
+    /**
+     * Loading an *unprocessed* partition from disk to memory, only if there are
+     * *processed* partitions in memory. This action basically initiates a swap
+     * operation.
+     */
+    LOAD_TO_SWAP_PARTITION,
+    /**
+     * Loading an *unprocessed* partition from disk to memory. This action is
+     * also known as "soft load".
+     */
+    LOAD_UNPROCESSED_PARTITION,
+    /**
+     * Loading a partition (prioritizing *unprocessed* over *processed*) from
+     * disk to memory. Loading a *processed* partition to memory is a prefetch
+     * of that partition to be processed in the next superstep. This action is
+     * also known as "hard load".
+     */
+    LOAD_PARTITION,
+    /**
+     * Loading a partition regardless of the memory situation. An out-of-core
+     * mechanism may use this action to signal IO threads that they are allowed
+     * to load a partition that is specifically requested.
+     */
+    URGENT_LOAD_PARTITION
+  }
+
+  /**
+   * Get the next set of viable IO actions to help bring memory to a more
+   * desired state.
+   *
+   * @return an array of viable IO actions, sorted from highest priority to
+   *         lowest priority
+   */
+  IOAction[] getNextIOActions();
+
+  /**
+   * Whether a command is appropriate to bring the memory to a more desired
+   * state. A command is not executed unless it is approved by the oracle. This
+   * method is especially important when multiple IO threads perform IO
+   * operations for the out-of-core mechanism: approval prevents all IO
+   * threads from performing the same command type when that is not needed.
+   * For instance, execution of a particular command type by only one thread
+   * may bring the memory to a desired state, while the rest of the IO threads
+   * perform other types of commands.
+   *
+   * @param command the IO command that is about to execute
+   * @return 'true' if the command is approved for execution. 'false' if the
+   *         command should not be executed
+   */
+  boolean approve(IOCommand command);
+
+  /**
+   * Notification of command completion. The oracle may update its status and
+   * commit the changes a command may cause.
+   *
+   * @param command the IO command that is completed
+   */
+  void commandCompleted(IOCommand command);
+
+  /**
+   * Notification of GC completion. The oracle may take certain decisions based
+   * on GC information (such as the amount of time it took, the memory it
+   * reclaimed, etc.)
+   *
+   * @param gcInfo GC information
+   */
+  void gcCompleted(GarbageCollectionNotificationInfo gcInfo);
+
+  /**
+   * Shut down the out-of-core oracle. Necessary specifically for cases where
+   * the out-of-core oracle uses additional monitoring threads.
+   */
+  void shutdown();
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/3793c9ef/giraph-core/src/main/java/org/apache/giraph/ooc/policy/SimpleGCMonitoringOracle.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/policy/SimpleGCMonitoringOracle.java b/giraph-core/src/main/java/org/apache/giraph/ooc/policy/SimpleGCMonitoringOracle.java
new file mode 100644
index 0000000..477b3ec
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/ooc/policy/SimpleGCMonitoringOracle.java
@@ -0,0 +1,357 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.ooc.policy;
+
+import com.google.common.collect.Maps;
+import com.sun.management.GarbageCollectionNotificationInfo;
+import org.apache.giraph.conf.FloatConfOption;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.ooc.OutOfCoreEngine;
+import org.apache.giraph.ooc.OutOfCoreIOStatistics;
+import org.apache.giraph.ooc.command.IOCommand;
+import org.apache.giraph.ooc.command.LoadPartitionIOCommand;
+import org.apache.giraph.ooc.command.WaitIOCommand;
+import org.apache.log4j.Logger;
+
+import java.lang.management.MemoryUsage;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Out-of-core oracle to adaptively control data kept in memory, with the goal
+ * of keeping the memory state constantly at a desired state. This oracle
+ * monitors GC behavior to keep track of memory pressure.
+ *
+ * After each GC is done, this oracle retrieve statistics about the memory
+ * pressure (memory used, max memory, and how far away memory is compared to a
+ * max optimal pressure). Based on the the past 2 recent memory statistics,
+ * the oracle predicts the status of the memory, and sets the rate of load/store
+ * of data from/to disk. If the rate of loading data from disk is 'l', and the
+ * rate of storing data to disk is 's', the rate of data injection to memory
+ * from disk can be denoted as 'l-s'. This oracle determines what 'l-s' should
+ * be based on the prediction of memory status.
+ *
+ * Assume that based on the previous GC call the memory usage at time t_0 is
+ * m_0, and based on the most recent GC call the memory usage at time t_1 is
+ * m_1. So, the rate of memory increase is alpha = (m_1 - m_0) / (t_1 - t_0).
+ * Assume that the ideal memory pressure happens when the memory usage is
+ * m_ideal. So, at time 't_2 = t_1 + (t_1 - t_0)', we want m_ideal. That means
+ * the ideal rate would be beta = (m_ideal - m_1) / (t_2 - t_1). If the date
+ * injection rate to memory so far was i, the new injection rate should be:
+ * i_new = i - (alpha - beta)
+ */
+public class SimpleGCMonitoringOracle implements OutOfCoreOracle {
+  /**
+   * The optimal memory pressure at which GC behavior is close to ideal. This
+   * fraction may be dependant on the GC strategy used for running a job, but
+   * generally should not be dependent on the graph processing application.
+   */
+  public static final FloatConfOption OPTIMAL_MEMORY_PRESSURE =
+      new FloatConfOption("giraph.optimalMemoryPressure", 0.8f,
+          "The memory pressure (fraction of used memory) at which the job " +
+              "shows the optimal GC behavior. This fraction may be dependent " +
+              "on the GC strategy used in running the job.");
+
+  /** Class logger */
+  private static final Logger LOG =
+      Logger.getLogger(SimpleGCMonitoringOracle.class);
+  /** Cached value for OPTIMAL_MEMORY_PRESSURE */
+  private final float optimalMemoryPressure;
+  /** Out-of-core engine */
+  private final OutOfCoreEngine oocEngine;
+  /** Status of memory from the last GC call */
+  private GCObservation lastGCObservation;
+  /** Desired rate of data injection to memory */
+  private final AtomicLong desiredDiskToMemoryDataRate =
+      new AtomicLong(0);
+  /** Number of on the fly (outstanding) IO commands for each command type */
+  private final Map<IOCommand.IOCommandType, AtomicInteger> commandOccurrences =
+      Maps.newConcurrentMap();
+
+  /**
+   * Constructor
+   *
+   * @param conf configuration
+   * @param oocEngine out-of-core engine
+   */
+  public SimpleGCMonitoringOracle(ImmutableClassesGiraphConfiguration conf,
+                                  OutOfCoreEngine oocEngine) {
+    this.optimalMemoryPressure = OPTIMAL_MEMORY_PRESSURE.get(conf);
+    this.oocEngine = oocEngine;
+    this.lastGCObservation = new GCObservation(-1, 0, 0);
+    for (IOCommand.IOCommandType type : IOCommand.IOCommandType.values()) {
+      commandOccurrences.put(type, new AtomicInteger(0));
+    }
+  }
+
+  @Override
+  public synchronized void gcCompleted(GarbageCollectionNotificationInfo
+                                             gcInfo) {
+    long time = System.currentTimeMillis();
+    Map<String, MemoryUsage> memAfter = gcInfo.getGcInfo()
+        .getMemoryUsageAfterGc();
+    long usedMemory = 0;
+    long maxMemory = 0;
+    for (MemoryUsage memDetail : memAfter.values()) {
+      usedMemory += memDetail.getUsed();
+      maxMemory += memDetail.getMax();
+    }
+    GCObservation observation = new GCObservation(time, usedMemory, maxMemory);
+    if (LOG.isInfoEnabled()) {
+      LOG.info("gcCompleted: GC completed with: " + observation);
+    }
+    // Whether this is not the first GC call in the application
+    if (lastGCObservation.isValid()) {
+      long deltaDataRate =
+          lastGCObservation.getDesiredDeltaDataRate(observation);
+      long diskBandwidthEstimate =
+          oocEngine.getIOStatistics().getDiskBandwidth();
+      // Update the desired data injection rate to memory. The data injection
+      // rate cannot be less than -disk_bandwidth (the extreme case happens if
+      // we only do 'store'), and cannot be more than disk_bandwidth (the
+      // extreme case happens if we only do 'load').
+      long dataInjectionRate = desiredDiskToMemoryDataRate.get();
+      desiredDiskToMemoryDataRate.set(Math.max(
+          Math.min(desiredDiskToMemoryDataRate.get() - deltaDataRate,
+              diskBandwidthEstimate), -diskBandwidthEstimate));
+      if (LOG.isInfoEnabled()) {
+        LOG.info("gcCompleted: changing data injection rate from " +
+            String.format("%.2f", dataInjectionRate / 1024.0 / 1024.0) +
+            " to " + String.format("%.2f", desiredDiskToMemoryDataRate.get() /
+            1024.0 / 1024.0));
+      }
+    }
+    lastGCObservation = observation;
+  }
+
+  /**
+   * Get the current data injection rate to memory based on the commands ran
+   * in the history (retrieved from statistics collector), and outstanding
+   * commands issued by the IO scheduler.
+   *
+   * @return the current data injection rate to memory
+   */
+  private long getCurrentDataInjectionRate() {
+    long effectiveBytesTransferred = 0;
+    long effectiveDuration = 0;
+    for (IOCommand.IOCommandType type : IOCommand.IOCommandType.values()) {
+      OutOfCoreIOStatistics.BytesDuration stats =
+          oocEngine.getIOStatistics().getCommandTypeStats(type);
+      int occurrence = commandOccurrences.get(type).get();
+      long typeBytesTransferred = stats.getBytes();
+      long typeDuration = stats.getDuration();
+      // If there is an outstanding command, we still do not know how many bytes
+      // it will transfer, and how long it will take. So, we guesstimate these
+      // numbers based on other similar commands happened in the history. We
+      // simply take the average number of bytes transferred for the particular
+      // command, and we take average duration for the particular command. We
+      // should multiply these numbers by the number of outstanding commands of
+      // this particular command type.
+      if (stats.getOccurrence() != 0) {
+        typeBytesTransferred += stats.getBytes() / stats.getOccurrence() *
+            occurrence;
+        typeDuration += stats.getDuration() / stats.getOccurrence() *
+            occurrence;
+      }
+      if (type == IOCommand.IOCommandType.LOAD_PARTITION) {
+        effectiveBytesTransferred += typeBytesTransferred;
+      } else {
+        // Store (data going out of memory), or wait (no data transferred)
+        effectiveBytesTransferred -= typeBytesTransferred;
+      }
+      effectiveDuration += typeDuration;
+    }
+    if (effectiveDuration == 0) {
+      return 0;
+    } else {
+      return effectiveBytesTransferred / effectiveDuration;
+    }
+  }
+
+  @Override
+  public IOAction[] getNextIOActions() {
+    long error = (long) (oocEngine.getIOStatistics().getDiskBandwidth() * 0.05);
+    long desiredRate = desiredDiskToMemoryDataRate.get();
+    long currentRate = getCurrentDataInjectionRate();
+    if (desiredRate > error) {
+      // 'l-s' is positive, we should do more load than store.
+      if (currentRate > desiredRate + error) {
+        // We should decrease 'l-s'. This can be done either by increasing 's'
+        // or issuing wait command. We prioritize wait over hard store.
+        return new IOAction[]{
+          IOAction.STORE_MESSAGES_AND_BUFFERS,
+          IOAction.STORE_PROCESSED_PARTITION};
+      } else if (currentRate < desiredRate - error) {
+        // We should increase 'l-s'. We can simply load partitions/data.
+        return new IOAction[]{IOAction.LOAD_PARTITION};
+      } else {
+        // We are in a proper state and we should keep up with the rate. We can
+        // either soft store data or load data (hard load, since we desired rate
+        // is positive).
+        return new IOAction[]{
+          IOAction.STORE_MESSAGES_AND_BUFFERS,
+          IOAction.STORE_PROCESSED_PARTITION,
+          IOAction.LOAD_PARTITION};
+      }
+    } else if (desiredRate < -error) {
+      // 'l-s' is negative, we should do more store than load.
+      if (currentRate < desiredRate - error) {
+        // We should increase 'l-s', but we should be cautious. We only do soft
+        // load, or wait.
+        return new IOAction[]{IOAction.LOAD_UNPROCESSED_PARTITION};
+      } else if (currentRate > desiredRate + error) {
+        // We should reduce 'l-s', we do hard store.
+        return new IOAction[]{
+          IOAction.STORE_MESSAGES_AND_BUFFERS,
+          IOAction.STORE_PARTITION};
+      } else {
+        // We should keep up with the rate. We can either soft store data, or
+        // soft load data.
+        return new IOAction[]{
+          IOAction.STORE_MESSAGES_AND_BUFFERS,
+          IOAction.STORE_PROCESSED_PARTITION,
+          IOAction.LOAD_UNPROCESSED_PARTITION};
+      }
+    } else {
+      // 'l-s' is almost zero. If current rate is over the desired rate, we do
+      // soft store. If the current rate is below the desired rate, we do soft
+      // load.
+      if (currentRate > desiredRate + error) {
+        return new IOAction[]{
+          IOAction.STORE_MESSAGES_AND_BUFFERS,
+          IOAction.STORE_PROCESSED_PARTITION};
+      } else if (currentRate < desiredRate - error) {
+        return new IOAction[]{IOAction.LOAD_UNPROCESSED_PARTITION};
+      } else {
+        return new IOAction[]{
+          IOAction.STORE_MESSAGES_AND_BUFFERS,
+          IOAction.STORE_PROCESSED_PARTITION,
+          IOAction.LOAD_UNPROCESSED_PARTITION};
+      }
+    }
+  }
+
+  @Override
+  public synchronized boolean approve(IOCommand command) {
+    long error = (long) (oocEngine.getIOStatistics().getDiskBandwidth() * 0.05);
+    long desiredRate = desiredDiskToMemoryDataRate.get();
+    long currentRate = getCurrentDataInjectionRate();
+    // The command is denied iff the current rate is above the desired rate and
+    // we are doing load (instead of store), or the current rate is below the
+    // desired rate and we are doing store (instead of loading).
+    if (currentRate > desiredRate + error &&
+        command instanceof LoadPartitionIOCommand) {
+      return false;
+    }
+    if (currentRate < desiredRate - error &&
+        !(command instanceof LoadPartitionIOCommand) &&
+        !(command instanceof WaitIOCommand)) {
+      return false;
+    }
+    commandOccurrences.get(command.getType()).getAndIncrement();
+    return true;
+  }
+
+  @Override
+  public void commandCompleted(IOCommand command) {
+    commandOccurrences.get(command.getType()).getAndDecrement();
+  }
+
+  @Override
+  public void shutdown() { }
+
+  /** Helper class to record memory status after GC calls */
+  private class GCObservation {
+    /** The time at which the GC happened (in milliseconds) */
+    private long time;
+    /** Amount of memory used after the GC call */
+    private long usedMemory;
+    /** Maximum amounts of memory reported by GC listener */
+    private long maxMemory;
+
+    /**
+     * Constructor
+     *
+     * @param time time of GC
+     * @param usedMemory amount of used memory after GC
+     * @param maxMemory amount of all available memory based on GC observation
+     */
+    public GCObservation(long time, long usedMemory, long maxMemory) {
+      this.time = time;
+      this.usedMemory = usedMemory;
+      this.maxMemory = maxMemory;
+    }
+
+    /**
+     * Is this a valid observation?
+     *
+     * @return true iff it is a valid observation
+     */
+    public boolean isValid() {
+      return time > 0;
+    }
+
+    /**
+     * Considering a new observation of memory status after the most recent GC,
+     * what is the desired rate for data injection to memory.
+     *
+     * @param newObservation the most recent GC observation
+     * @return desired rate of data injection to memory
+     */
+    public long getDesiredDeltaDataRate(GCObservation newObservation) {
+      long newUsedMemory = newObservation.usedMemory;
+      long newMaxMemory = newObservation.maxMemory;
+      long lastUsedMemory = usedMemory;
+      long lastMaxMemory = maxMemory;
+      // Scale the memory status of two GC observation to be the same
+      long scaledMaxMemory = Math.min(lastMaxMemory, newMaxMemory);
+      newUsedMemory =
+          (long) (((double) scaledMaxMemory / newMaxMemory) * newUsedMemory);
+      lastUsedMemory =
+          (long) (((double) scaledMaxMemory / lastMaxMemory) * lastUsedMemory);
+      long desiredUsedMemory = (long) (optimalMemoryPressure * scaledMaxMemory);
+      if (LOG.isInfoEnabled()) {
+        LOG.info("getDesiredDeltaDataRate: " + String.format("previous usage " +
+            "= %.2f MB, ", lastUsedMemory / 1024.0 / 1024.0) + String.format(
+            "current usage = %.2f MB, ", newUsedMemory / 1024.0 / 1024.0) +
+            String.format("ideal usage = %.2f MB", desiredUsedMemory / 1024.0 /
+                1024.0));
+      }
+      long interval = newObservation.time - time;
+      if (interval == 0) {
+        interval = 1;
+        LOG.warn("getDesiredDeltaRate: two GC happened almost at the same " +
+            "time!");
+      }
+      long currentDataRate = (long) ((double) (newUsedMemory -
+          lastUsedMemory) / interval * 1000);
+      long desiredDataRate = (long) ((double) (desiredUsedMemory -
+          newUsedMemory) / interval * 1000);
+      return currentDataRate - desiredDataRate;
+    }
+
+    @Override
+    public String toString() {
+      return String.format("(usedMemory: %.2f MB, maxMemory: %.2f MB at " +
+          "time: %d ms)", usedMemory / 1024.0 / 1024.0,
+          maxMemory / 1024.0 / 1024.0, time);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/3793c9ef/giraph-core/src/main/java/org/apache/giraph/ooc/policy/ThresholdBasedOracle.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/policy/ThresholdBasedOracle.java b/giraph-core/src/main/java/org/apache/giraph/ooc/policy/ThresholdBasedOracle.java
new file mode 100644
index 0000000..ff2b3f7
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/ooc/policy/ThresholdBasedOracle.java
@@ -0,0 +1,365 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.ooc.policy;
+
+import com.sun.management.GarbageCollectionNotificationInfo;
+import org.apache.giraph.comm.flow_control.CreditBasedFlowControl;
+import org.apache.giraph.comm.flow_control.FlowControl;
+import org.apache.giraph.comm.netty.NettyClient;
+import org.apache.giraph.conf.FloatConfOption;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.conf.LongConfOption;
+import org.apache.giraph.ooc.OutOfCoreEngine;
+import org.apache.giraph.ooc.command.IOCommand;
+import org.apache.giraph.utils.CallableFactory;
+import org.apache.giraph.utils.LogStacktraceCallable;
+import org.apache.giraph.utils.MemoryUtils;
+import org.apache.giraph.utils.ThreadUtils;
+import org.apache.log4j.Logger;
+
+import java.util.Locale;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import static com.google.common.base.Preconditions.checkState;
+
+/**
+ * Out-of-core oracle to adaptively control data kept in memory, with the goal
+ * of keeping the memory usage at a desired state. Out-of-core policy in this
+ * oracle is based on several user-defined thresholds. Also, this oracle spawns
+ * a thread to periodically check the memory usage. This thread issues manual
+ * GC calls if the JVM has not run a major/full GC for a while and the amount
+ * of used memory is about to cause high memory pressure. The same thread
+ * periodically refreshes the credit and active-thread decisions. This oracle
+ * also monitors GC activity: it records when major/full and minor GCs happen
+ * so the memory observer thread can decide whether a manual GC is needed.
+ * There are three out-of-core decisions:
+ *  - Which IO operations should be done (load/offload of partitions and
+ *    messages)
+ *  - What the incoming messages rate should be (updating credits announced by
+ *    this worker in credit-based flow-control mechanism)
+ *  - How many processing threads should remain active (throttling the rate of
+ *    data generation)
+ *
+ * The following table shows the relationship of these decisions and
+ * user-defined thresholds.
+ * --------------------------------------------------------------
+ * Memory Pressure     |  Manual |   IO   | Credit   | Active   |
+ * (memory usage)      |   GC?   | Action |          | Threads  |
+ * --------------------------------------------------------------
+ *                     |  Yes    | hard   |  0       |  0       |
+ *                     |         | store  |          |          |
+ * failPressure -------------------------------------------------
+ *                     |  Yes    | hard   |  0       | fraction |
+ *                     |         | store  |          |          |
+ * emergencyPressure --------------------------------------------
+ *                     |  Yes    | hard   | fraction |  max     |
+ *                     |         | store  |          |          |
+ * highPressure -------------------------------------------------
+ *                     |  No     | soft   | fraction |  max     |
+ *                     |         | store  |          |          |
+ * optimalPressure ----------------------------------------------
+ *                     |  No     | soft   |  max     |  max     |
+ *                     |         | load   |          |          |
+ * lowPressure --------------------------------------------------
+ *                     |  No     | hard   |  max     |  max     |
+ *                     |         | load   |          |          |
+ * --------------------------------------------------------------
+ *
+ */
+public class ThresholdBasedOracle implements OutOfCoreOracle {
+  /** The memory pressure at/above which the job would fail */
+  public static final FloatConfOption FAIL_MEMORY_PRESSURE =
+      new FloatConfOption("giraph.memory.failPressure", 0.975f,
+          "The memory pressure (fraction of used memory) at/above which the " +
+              "job would fail.");
+  /**
+   * The memory pressure at which the job is close to failing, even though we
+   * are using maximal disk bandwidth and minimal network rate. We should
+   * reduce the job's processing rate.
+   */
+  public static final FloatConfOption EMERGENCY_MEMORY_PRESSURE =
+      new FloatConfOption("giraph.memory.emergencyPressure", 0.925f,
+          "The memory pressure (fraction of used memory) at which the job " +
+              "is close to fail, hence we should reduce its processing rate " +
+              "as much as possible.");
+  /** The memory pressure at which the job is suffering from GC overhead. */
+  public static final FloatConfOption HIGH_MEMORY_PRESSURE =
+      new FloatConfOption("giraph.memory.highPressure", 0.875f,
+          "The memory pressure (fraction of used memory) at which the job " +
+              "is suffering from GC overhead.");
+  /**
+   * The memory pressure at which we expect GC to perform optimally for a
+   * memory intensive job.
+   */
+  public static final FloatConfOption OPTIMAL_MEMORY_PRESSURE =
+      new FloatConfOption("giraph.memory.optimalPressure", 0.8f,
+          "The memory pressure (fraction of used memory) at which a " +
+              "memory-intensive job shows the optimal GC behavior.");
+  /**
+   * The memory pressure at/below which the job can use more memory without
+   * suffering from GC overhead.
+   */
+  public static final FloatConfOption LOW_MEMORY_PRESSURE =
+      new FloatConfOption("giraph.memory.lowPressure", 0.7f,
+          "The memory pressure (fraction of used memory) at/below which the " +
+              "job can use more memory without suffering the performance.");
+  /** The interval at which memory observer thread wakes up. */
+  public static final LongConfOption CHECK_MEMORY_INTERVAL =
+      new LongConfOption("giraph.checkMemoryInterval", 2500,
+          "The interval/period where memory observer thread wakes up and " +
+              "monitors memory footprint (in milliseconds)");
+  /**
+   * Memory observer thread would manually call GC if major/full GC has not
+   * been called for a while. This parameter specifies the period (in
+   * milliseconds) within which we expect a GC to have happened.
+   */
+  public static final LongConfOption LAST_GC_CALL_INTERVAL =
+      new LongConfOption("giraph.lastGcCallInterval", 10 * 1000,
+          "How long after last major/full GC should we call manual GC?");
+
+  /** Class logger */
+  private static final Logger LOG =
+      Logger.getLogger(ThresholdBasedOracle.class);
+  /** Cached value for FAIL_MEMORY_PRESSURE */
+  private final float failMemoryPressure;
+  /** Cached value for EMERGENCY_MEMORY_PRESSURE */
+  private final float emergencyMemoryPressure;
+  /** Cached value for HIGH_MEMORY_PRESSURE */
+  private final float highMemoryPressure;
+  /** Cached value for OPTIMAL_MEMORY_PRESSURE */
+  private final float optimalMemoryPressure;
+  /** Cached value for LOW_MEMORY_PRESSURE */
+  private final float lowMemoryPressure;
+  /** Cached value for CHECK_MEMORY_INTERVAL */
+  private final long checkMemoryInterval;
+  /** Cached value for LAST_GC_CALL_INTERVAL */
+  private final long lastGCCallInterval;
+  /**
+   * Cached value for NettyClient.MAX_NUM_OF_OPEN_REQUESTS_PER_WORKER (max
+   * credit used for credit-based flow-control mechanism)
+   */
+  private final short maxRequestsCredit;
+  /**
+   * Whether the job is shutting down. Used for terminating the memory
+   * observer thread.
+   */
+  private final CountDownLatch shouldTerminate;
+  /** Result of memory observer thread */
+  private final Future<Void> checkMemoryThreadResult;
+  /** Out-of-core engine */
+  private final OutOfCoreEngine oocEngine;
+  /**
+   * Last time a major/full GC has been called (in milliseconds). Written by
+   * gcCompleted() and read by the memory observer thread, hence volatile.
+   */
+  private volatile long lastMajorGCTime;
+  /**
+   * Last time a non major/full GC has been called (in milliseconds). Written
+   * by gcCompleted() and read by the memory observer thread, hence volatile.
+   */
+  private volatile long lastMinorGCTime;
+
+  /**
+   * Constructor. Caches the configured thresholds, forces credit-based flow
+   * control on, and starts the memory observer thread.
+   *
+   * @param conf configuration
+   * @param oocEngine out-of-core engine
+   */
+  public ThresholdBasedOracle(ImmutableClassesGiraphConfiguration conf,
+                              OutOfCoreEngine oocEngine) {
+    this.failMemoryPressure = FAIL_MEMORY_PRESSURE.get(conf);
+    this.emergencyMemoryPressure = EMERGENCY_MEMORY_PRESSURE.get(conf);
+    this.highMemoryPressure = HIGH_MEMORY_PRESSURE.get(conf);
+    this.optimalMemoryPressure = OPTIMAL_MEMORY_PRESSURE.get(conf);
+    this.lowMemoryPressure = LOW_MEMORY_PRESSURE.get(conf);
+    this.checkMemoryInterval = CHECK_MEMORY_INTERVAL.get(conf);
+    this.lastGCCallInterval = LAST_GC_CALL_INTERVAL.get(conf);
+    this.maxRequestsCredit = (short)
+        CreditBasedFlowControl.MAX_NUM_OF_OPEN_REQUESTS_PER_WORKER.get(conf);
+    // This oracle controls the incoming message rate through credits, so
+    // credit-based flow control is turned on unless explicitly configured.
+    NettyClient.LIMIT_OPEN_REQUESTS_PER_WORKER.setIfUnset(conf, true);
+    boolean useCredit = NettyClient.LIMIT_OPEN_REQUESTS_PER_WORKER.get(conf);
+    checkState(useCredit, "ThresholdBasedOracle: credit-based flow control " +
+        "must be enabled. Use giraph.waitForPerWorkerRequests=true");
+    this.shouldTerminate = new CountDownLatch(1);
+    this.oocEngine = oocEngine;
+    this.lastMajorGCTime = 0;
+    this.lastMinorGCTime = 0;
+
+    CallableFactory<Void> callableFactory = new CallableFactory<Void>() {
+      @Override
+      public Callable<Void> newCallable(int callableId) {
+        return new Callable<Void>() {
+          @Override
+          public Void call() throws Exception {
+            while (true) {
+              // Wake up every checkMemoryInterval ms, or exit as soon as
+              // shutdown() releases the latch.
+              boolean done = shouldTerminate.await(checkMemoryInterval,
+                  TimeUnit.MILLISECONDS);
+              if (done) {
+                break;
+              }
+              double usedMemoryFraction = 1 - MemoryUtils.freeMemoryFraction();
+              long now = System.currentTimeMillis();
+              // Call GC manually if memory is high and no major GC happened
+              // recently, or if memory is above optimal and no GC of any
+              // kind happened recently.
+              if ((usedMemoryFraction > highMemoryPressure &&
+                  now - lastMajorGCTime >= lastGCCallInterval) ||
+                  (usedMemoryFraction > optimalMemoryPressure &&
+                  now - lastMajorGCTime >= lastGCCallInterval &&
+                  now - lastMinorGCTime >= lastGCCallInterval)) {
+                if (LOG.isInfoEnabled()) {
+                  LOG.info("call: last GC happened a while ago and the " +
+                      "amount of used memory is high (used memory " +
+                      "fraction is " +
+                      String.format("%.2f", usedMemoryFraction) + "). " +
+                      "Calling GC manually");
+                }
+                System.gc();
+                long gcDuration = System.currentTimeMillis() - now;
+                usedMemoryFraction = 1 - MemoryUtils.freeMemoryFraction();
+                if (LOG.isInfoEnabled()) {
+                  LOG.info("call: manual GC is done. It took " +
+                      String.format("%.2f", (double) gcDuration / 1000) +
+                      " seconds. Used memory fraction is " +
+                      String.format("%.2f", usedMemoryFraction));
+                }
+              }
+              updateRates(usedMemoryFraction);
+            }
+            return null;
+          }
+        };
+      }
+    };
+    ExecutorService executor = Executors.newSingleThreadExecutor(
+        ThreadUtils.createThreadFactory("check-memory"));
+    this.checkMemoryThreadResult = executor.submit(new LogStacktraceCallable<>(
+        callableFactory.newCallable(0)));
+    // No further tasks are submitted; the running observer is unaffected.
+    executor.shutdown();
+  }
+
+  /**
+   * Update statistics and rate regarding communication credits and number of
+   * active threads.
+   *
+   * Active threads: full below emergencyPressure, zero at/above failPressure,
+   * and linearly decreasing in between. Credit: max below optimalPressure,
+   * zero at/above emergencyPressure, and linearly decreasing in between.
+   *
+   * @param usedMemoryFraction the fraction of used memory over max memory
+   */
+  public void updateRates(double usedMemoryFraction) {
+    // Update the fraction of processing threads that should remain active
+    if (usedMemoryFraction >= failMemoryPressure) {
+      oocEngine.updateActiveThreadsFraction(0);
+    } else if (usedMemoryFraction < emergencyMemoryPressure) {
+      oocEngine.updateActiveThreadsFraction(1);
+    } else {
+      // Reached only when emergencyMemoryPressure < failMemoryPressure, so
+      // the denominator is positive.
+      oocEngine.updateActiveThreadsFraction(1 -
+          (usedMemoryFraction - emergencyMemoryPressure) /
+              (failMemoryPressure - emergencyMemoryPressure));
+    }
+
+    // Update the fraction of credit that should be used in credit-based flow-
+    // control
+    if (usedMemoryFraction >= emergencyMemoryPressure) {
+      updateRequestsCredit((short) 0);
+    } else if (usedMemoryFraction < optimalMemoryPressure) {
+      updateRequestsCredit(maxRequestsCredit);
+    } else {
+      // Reached only when optimalMemoryPressure < emergencyMemoryPressure.
+      updateRequestsCredit((short) (maxRequestsCredit *
+          (1 - (usedMemoryFraction - optimalMemoryPressure) /
+              (emergencyMemoryPressure - optimalMemoryPressure))));
+    }
+  }
+
+  @Override
+  public IOAction[] getNextIOActions() {
+    double usedMemoryFraction = 1 - MemoryUtils.freeMemoryFraction();
+    if (LOG.isInfoEnabled()) {
+      LOG.info(String.format("getNextIOActions: usedMemoryFraction = %.2f",
+          usedMemoryFraction));
+    }
+    if (usedMemoryFraction > highMemoryPressure) {
+      // Hard store: offload anything that can be offloaded.
+      return new IOAction[]{
+        IOAction.STORE_MESSAGES_AND_BUFFERS,
+        IOAction.STORE_PARTITION};
+    } else if (usedMemoryFraction > optimalMemoryPressure) {
+      // Soft store: only offload partitions already processed.
+      return new IOAction[]{
+        IOAction.LOAD_UNPROCESSED_PARTITION,
+        IOAction.STORE_MESSAGES_AND_BUFFERS,
+        IOAction.STORE_PROCESSED_PARTITION};
+    } else if (usedMemoryFraction > lowMemoryPressure) {
+      // Soft load.
+      return new IOAction[]{
+        IOAction.LOAD_UNPROCESSED_PARTITION,
+        IOAction.STORE_MESSAGES_AND_BUFFERS,
+        IOAction.LOAD_PARTITION};
+    } else {
+      // Hard load: plenty of memory available.
+      return new IOAction[]{IOAction.LOAD_PARTITION};
+    }
+  }
+
+  @Override
+  public boolean approve(IOCommand command) {
+    return true;
+  }
+
+  @Override
+  public void commandCompleted(IOCommand command) {
+    // Do nothing
+  }
+
+  /**
+   * Record the time of a completed GC. Major/full GCs (excluding those whose
+   * cause contains "No GC") update lastMajorGCTime; any other GC updates
+   * lastMinorGCTime.
+   *
+   * @param gcInfo GC notification information
+   */
+  @Override
+  public void gcCompleted(GarbageCollectionNotificationInfo gcInfo) {
+    // Locale.ROOT keeps the match independent of the JVM default locale.
+    String gcAction = gcInfo.getGcAction().toLowerCase(Locale.ROOT);
+    if (gcAction.contains("full") || gcAction.contains("major")) {
+      if (!gcInfo.getGcCause().contains("No GC")) {
+        lastMajorGCTime = System.currentTimeMillis();
+      }
+    } else {
+      lastMinorGCTime = System.currentTimeMillis();
+    }
+  }
+
+  /**
+   * Signal the memory observer thread to stop and wait for it to finish.
+   *
+   * @throws IllegalStateException if waiting is interrupted (the interrupt
+   *         status is restored) or the observer thread failed
+   */
+  @Override
+  public void shutdown() {
+    shouldTerminate.countDown();
+    try {
+      checkMemoryThreadResult.get();
+    } catch (InterruptedException | ExecutionException e) {
+      if (e instanceof InterruptedException) {
+        // Preserve the interrupt so callers up the stack can observe it.
+        Thread.currentThread().interrupt();
+      }
+      LOG.error("shutdown: caught exception while waiting on check-memory " +
+          "thread to terminate!", e);
+      throw new IllegalStateException(e);
+    }
+    if (LOG.isInfoEnabled()) {
+      LOG.info("shutdown: ThresholdBasedOracle shutdown complete!");
+    }
+  }
+
+  /**
+   * Update the credit announced for this worker in Netty. The lower the credit
+   * is, the lower rate incoming messages arrive at this worker. Thus, credit
+   * is an indirect way of controlling amount of memory incoming messages would
+   * take.
+   *
+   * @param newCredit the new credit to announce to other workers
+   */
+  private void updateRequestsCredit(short newCredit) {
+    if (LOG.isInfoEnabled()) {
+      LOG.info("updateRequestsCredit: updating the credit to " + newCredit);
+    }
+    FlowControl flowControl = oocEngine.getFlowControl();
+    // Flow control may not be available yet; skip the update in that case.
+    if (flowControl != null) {
+      // NOTE(review): assumes the constructor's credit-based requirement
+      // guarantees a CreditBasedFlowControl here — confirm.
+      ((CreditBasedFlowControl) flowControl).updateCredit(newCredit);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/3793c9ef/giraph-core/src/main/java/org/apache/giraph/ooc/policy/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/policy/package-info.java b/giraph-core/src/main/java/org/apache/giraph/ooc/policy/package-info.java
new file mode 100644
index 0000000..c58289f
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/ooc/policy/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Package of classes related to out-of-core policy
+ */
+package org.apache.giraph.ooc.policy;

http://git-wip-us.apache.org/repos/asf/giraph/blob/3793c9ef/giraph-core/src/main/java/org/apache/giraph/zk/ZooKeeperManager.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/zk/ZooKeeperManager.java b/giraph-core/src/main/java/org/apache/giraph/zk/ZooKeeperManager.java
index d3ace99..c54e7b2 100644
--- a/giraph-core/src/main/java/org/apache/giraph/zk/ZooKeeperManager.java
+++ b/giraph-core/src/main/java/org/apache/giraph/zk/ZooKeeperManager.java
@@ -123,7 +123,7 @@ public class ZooKeeperManager {
     this.context = context;
     this.conf = configuration;
     taskPartition = conf.getTaskPartition();
-    jobId = conf.get("mapred.job.id", "Unknown Job");
+    jobId = conf.getJobId();
     baseDirectory =
         new Path(ZOOKEEPER_MANAGER_DIRECTORY.getWithDefault(conf,
             getFinalZooKeeperPath()));