You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ed...@apache.org on 2016/05/20 22:14:29 UTC

[5/5] git commit: updated refs/heads/trunk to 6256a76

Integrating out-of-core mechanism with credit-based flow-control and data generation tethering

Summary: This diff integrates out-of-core infrastructure with credit-based flow control and adds the ability to tether the rate of data generation/processing. Data generation/processing rate is controlled by changing the number of active processing (input/compute) threads. This diff also implements a new (and more performant) adaptive out-of-core policy.

Test Plan:
mvn clean verify
all snapshot tests including ones with large data pass
Running adaptive out-of-core on large graph with very limited memory does not fail.
This diff should keep *any* reasonable job from failing!

Reviewers: maja.kabiljo, sergey.edunov, avery.ching, dionysis.logothetis

Reviewed By: dionysis.logothetis

Subscribers: ramesh-muthusamy

Differential Revision: https://reviews.facebook.net/D55479


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/6256a761
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/6256a761
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/6256a761

Branch: refs/heads/trunk
Commit: 6256a761d61a5b27a05878da2449ce8537d60c99
Parents: 4321e44
Author: Sergey Edunov <ed...@fb.com>
Authored: Fri May 20 15:14:08 2016 -0700
Committer: Sergey Edunov <ed...@fb.com>
Committed: Fri May 20 15:14:08 2016 -0700

----------------------------------------------------------------------
 .../java/org/apache/giraph/comm/ServerData.java |   14 +-
 .../flow_control/CreditBasedFlowControl.java    |  400 ++++++-
 .../giraph/comm/flow_control/FlowControl.java   |   19 +-
 .../comm/flow_control/NoOpFlowControl.java      |   14 +-
 .../comm/flow_control/StaticFlowControl.java    |   32 +-
 .../apache/giraph/comm/netty/NettyClient.java   |   39 +-
 .../handler/AddressRequestIdGenerator.java      |   53 -
 .../netty/handler/RequestServerHandler.java     |    6 +-
 .../netty/handler/ResponseClientHandler.java    |    4 +-
 .../netty/handler/TaskRequestIdGenerator.java   |   51 +
 .../giraph/comm/requests/RequestType.java       |    4 +-
 .../giraph/comm/requests/SendResumeRequest.java |   80 ++
 .../apache/giraph/conf/GiraphConfiguration.java |    4 -
 .../org/apache/giraph/conf/GiraphConstants.java |   18 +-
 .../org/apache/giraph/counters/GiraphStats.java |   34 +-
 .../apache/giraph/edge/AbstractEdgeStore.java   |   23 +-
 .../apache/giraph/graph/ComputeCallable.java    |   62 +-
 .../org/apache/giraph/graph/GlobalStats.java    |   30 +
 .../apache/giraph/graph/GraphTaskManager.java   |   66 +-
 .../apache/giraph/master/BspServiceMaster.java  |    8 +
 .../apache/giraph/metrics/AggregatedMetric.java |   46 +-
 .../giraph/metrics/AggregatedMetricDouble.java  |   50 +
 .../giraph/metrics/AggregatedMetricLong.java    |   50 +
 .../giraph/metrics/AggregatedMetrics.java       |   62 +-
 .../giraph/metrics/ValueWithHostname.java       |   18 +-
 .../giraph/metrics/WorkerSuperstepMetrics.java  |   62 +
 .../apache/giraph/ooc/FixedOutOfCoreEngine.java |  147 ---
 .../giraph/ooc/FixedOutOfCoreIOScheduler.java   |  211 ----
 .../giraph/ooc/FixedPartitionsOracle.java       |  139 +++
 .../org/apache/giraph/ooc/OutOfCoreEngine.java  |  382 +++++-
 .../apache/giraph/ooc/OutOfCoreIOCallable.java  |   71 +-
 .../apache/giraph/ooc/OutOfCoreIOScheduler.java |  182 ++-
 .../giraph/ooc/OutOfCoreIOStatistics.java       |  360 ++++++
 .../org/apache/giraph/ooc/OutOfCoreOracle.java  |  131 +++
 .../giraph/ooc/SimpleGCMonitoringOracle.java    |  355 ++++++
 .../apache/giraph/ooc/ThresholdBasedOracle.java |  364 ++++++
 .../giraph/ooc/data/DiskBackedEdgeStore.java    |   22 +-
 .../giraph/ooc/data/DiskBackedMessageStore.java |   43 +-
 .../ooc/data/DiskBackedPartitionStore.java      |   46 +-
 .../giraph/ooc/data/MetaPartitionManager.java   |  677 +++++------
 .../giraph/ooc/data/OutOfCoreDataManager.java   |   42 +-
 .../org/apache/giraph/ooc/io/IOCommand.java     |   47 +-
 .../giraph/ooc/io/LoadPartitionIOCommand.java   |   20 +-
 .../giraph/ooc/io/StoreDataBufferIOCommand.java |   18 +-
 .../ooc/io/StoreIncomingMessageIOCommand.java   |   13 +-
 .../giraph/ooc/io/StorePartitionIOCommand.java  |   21 +-
 .../org/apache/giraph/ooc/io/WaitIOCommand.java |    8 +-
 .../giraph/utils/AdjustableSemaphore.java       |    6 +
 .../apache/giraph/worker/BspServiceWorker.java  |    5 +
 .../giraph/worker/EdgeInputSplitsCallable.java  |   10 +
 .../giraph/worker/InputSplitsCallable.java      |   20 +-
 .../worker/VertexInputSplitsCallable.java       |   10 +
 .../giraph/partition/TestPartitionStores.java   |    6 +-
 .../java/org/apache/giraph/TestOutOfCore.java   |    4 +-
 src/site/xdoc/options.xml                       | 1095 ------------------
 55 files changed, 3593 insertions(+), 2111 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/6256a761/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java b/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
index be34820..69fbfee 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
@@ -44,7 +44,6 @@ import org.apache.giraph.graph.VertexResolver;
 import org.apache.giraph.ooc.data.DiskBackedEdgeStore;
 import org.apache.giraph.ooc.data.DiskBackedMessageStore;
 import org.apache.giraph.ooc.data.DiskBackedPartitionStore;
-import org.apache.giraph.ooc.FixedOutOfCoreEngine;
 import org.apache.giraph.ooc.OutOfCoreEngine;
 import org.apache.giraph.partition.Partition;
 import org.apache.giraph.partition.PartitionStore;
@@ -149,16 +148,7 @@ public class ServerData<I extends WritableComparable,
     PartitionStore<I, V, E> inMemoryPartitionStore =
         new SimplePartitionStore<I, V, E>(conf, context);
     if (GiraphConstants.USE_OUT_OF_CORE_GRAPH.get(conf)) {
-      int maxPartitionsInMemory =
-          GiraphConstants.MAX_PARTITIONS_IN_MEMORY.get(conf);
-      if (maxPartitionsInMemory == 0) {
-        throw new IllegalStateException("ServerData: Adaptive " +
-            "out-of-core engine is not supported yet! Number of partitions in" +
-            " memory should be greater than 0.");
-      } else {
-        oocEngine = new FixedOutOfCoreEngine(conf, service,
-            maxPartitionsInMemory);
-      }
+      oocEngine = new OutOfCoreEngine(conf, service);
       partitionStore =
           new DiskBackedPartitionStore<I, V, E>(inMemoryPartitionStore,
               conf, context, service, oocEngine);
@@ -312,7 +302,7 @@ public class ServerData<I extends WritableComparable,
     currentMessageStore = nextCurrentMessageStore;
     incomingMessageStore = nextIncomingMessageStore;
     if (oocEngine != null) {
-      oocEngine.getMetaPartitionManager().resetMessages();
+      oocEngine.reset();
       oocEngine.getSuperstepLock().writeLock().unlock();
     }
     currentMessageStore.finalizeStore();

http://git-wip-us.apache.org/repos/asf/giraph/blob/6256a761/giraph-core/src/main/java/org/apache/giraph/comm/flow_control/CreditBasedFlowControl.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/flow_control/CreditBasedFlowControl.java b/giraph-core/src/main/java/org/apache/giraph/comm/flow_control/CreditBasedFlowControl.java
index ff82dd1..36d4f20 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/flow_control/CreditBasedFlowControl.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/flow_control/CreditBasedFlowControl.java
@@ -18,18 +18,36 @@
 
 package org.apache.giraph.comm.flow_control;
 
+import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.commons.lang3.tuple.MutablePair;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.giraph.comm.netty.NettyClient;
 import org.apache.giraph.comm.netty.handler.AckSignalFlag;
+import org.apache.giraph.comm.requests.SendResumeRequest;
 import org.apache.giraph.comm.requests.WritableRequest;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.conf.IntConfOption;
 import org.apache.giraph.utils.AdjustableSemaphore;
+import org.apache.giraph.utils.CallableFactory;
+import org.apache.giraph.utils.LogStacktraceCallable;
+import org.apache.giraph.utils.ThreadUtils;
 import org.apache.log4j.Logger;
 
 import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
 import java.util.Deque;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -38,9 +56,27 @@ import static com.google.common.base.Preconditions.checkState;
 import static org.apache.giraph.conf.GiraphConstants.WAITING_REQUEST_MSECS;
 
 /**
- * Representation of credit-based flow control policy where each worker has a
- * constant user-defined credit. The number of open requests to a particular
- * worker cannot be more than its specified credit.
+ * Representation of credit-based flow control policy. With this policy there
+ * can be limited number of open requests from any worker x to any other worker
+ * y. This number is called 'credit'. Let's denote this number by C{x->y}. This
+ * implementation assumes that for a particular worker W, all values of C{x->W}
+ * are the same. Let's denote this value by CR_W. CR_W may change due to other
+ * reasons (e.g. memory pressure observed in an out-of-core mechanism). However,
+ * CR_W is always in range [0, MAX_CR], where MAX_CR is a user-defined constant.
+ * Note that MAX_CR should be representable by at most 14 bits.
+ *
+ * In this implementation, the value of CR_W is announced to other workers along
+ * with every ACK response envelope going out of W.
+ * Therefore, for non-zero values of CR_W, other workers know the instant value
+ * of CR_W, hence they can control the number of open requests they have to W.
+ * However, it is possible that W announces 0 as CR_W. In this case, other
+ * workers stop opening more requests to W, hence they will not get any new
+ * response envelope from W. This means other workers should be notified with
+ * a dedicated request to resume sending more requests once CR_W becomes
+ * non-zero. In this implementation, once CR_W is announced as 0 to a particular
+ * worker U, we keep U in a set, so later on we can send a 'resume signal' to U
+ * once CR_W becomes non-zero. Resume signals are sent by a separate, dedicated
+ * thread.
  */
 public class CreditBasedFlowControl implements FlowControl {
   /**
@@ -69,28 +105,87 @@ public class CreditBasedFlowControl implements FlowControl {
   private final int unsentWaitMsecs;
   /** Waiting interval for checking outstanding requests msecs */
   private final int waitingRequestMsecs;
-  /** Maximum number of open requests we can have for each worker */
-  private final int maxOpenRequestsPerWorker;
+  /**
+   * Maximum number of open requests each worker can have to this worker at
+   * each moment (CR_W -defined above- for this worker)
+   */
+  private volatile short maxOpenRequestsPerWorker;
   /** Total number of unsent, cached requests */
   private final AtomicInteger aggregateUnsentRequests = new AtomicInteger(0);
   /**
-   * Map of requests permits per worker. Key in the map is the worker id and the
-   * value is the semaphore to control the number of open requests for the
-   * particular worker. Basically, the number of available permits on this
-   * semaphore is the credit available for the worker.
+   * Map of requests permits per worker. Keys in the map are worker ids and
+   * values are pairs (X, Y) where:
+   *   X: is the semaphore to control the number of open requests for a
+   *      particular worker. Basically, the number of available permits on a
+   *      semaphore is the credit available for the worker associated with that
+   *      semaphore.
+   *   Y: is the timestamp of the latest message (resume signal or ACK response)
+   *      that changed the number of permits in the semaphore.
+   * The idea behind keeping a timestamp is to avoid any issue that may happen
+   * due to out-of-order message delivery. For example, consider this scenario:
+   * an ACK response is sent to a worker announcing the credit is 0. Later on,
+   * a resume signal announcing a non-zero credit is sent to the same worker.
+   * Now, if the resume signal is received before the ACK message, the worker
+   * would incorrectly assume credit value of 0, and would avoid sending any
+   * messages, which may lead to a live-lock.
+   *
+   * The timestamp value is simply the request id generated by NettyClient.
+   * These ids are generated in consecutive order, hence simulating the concept
+   * of timestamp. However, the timestamp value should be sent along with
+   * any ACK response envelope. The ACK response envelope is already very small
+   * (maybe 10-20 bytes). So, the timestamp value should not add much overhead
+   * to it. Instead of sending the whole long value request id (8 bytes) as the
+   * timestamp, we can simply send the least significant 2 bytes of it. This is
+   * a valid timestamp, as the credit value can be 0x3FFF (=16383) at most. This
+   * means there will be at most 0x3FFF messages on the fly at each moment,
+   * which means that the timestamp value sent by all messages in fly will fall
+   * into a range of size 0x3FFF. So, it is enough to only consider timestamp
+   * values twice as big as the mentioned range to be able to accurately
+   * determine ordering even when values wrap around. This means we only need to
+   * consider 15 least significant bits of request ids as timestamp values.
+   *
+   * The ACK response value contains following information (from least
+   * significant to most significant):
+   *  - 16 bits timestamp
+   *  - 14 bits credit value
+   *  - 1 bit specifying whether one end of communication is master and hence
+   *    credit based flow control should be ignored
+   *  - 1 bit response flag
    */
-  private final ConcurrentMap<Integer, AdjustableSemaphore>
+  private final ConcurrentMap<Integer, Pair<AdjustableSemaphore, Integer>>
       perWorkerOpenRequestMap = Maps.newConcurrentMap();
   /** Map of unsent cached requests per worker */
   private final ConcurrentMap<Integer, Deque<WritableRequest>>
       perWorkerUnsentRequestMap = Maps.newConcurrentMap();
   /**
+   * Set of workers that should be notified to resume sending more requests if
+   * the credit becomes non-zero
+   */
+  private final Set<Integer> workersToResume = Sets.newHashSet();
+  /**
+   * Resume signals are not using any credit, so they should be treated
+   * differently than normal requests. Resume signals should be ignored in
+   * accounting for credits in credit-based flow control. The following map
+   * keeps sets of request ids, for resume signals sent to other workers that
+   * are still "open". The set of request ids used for resume signals for a
+   * worker is important so we can determine if a received response is for a
+   * resume signal or not.
+   */
+  private final Map<Integer, Set<Long>> resumeRequestsId =
+      Maps.newConcurrentMap();
+  /**
    * Semaphore to control number of cached unsent requests. Maximum number of
    * permits of this semaphore should be equal to MAX_NUM_OF_UNSENT_REQUESTS.
    */
   private final Semaphore unsentRequestPermit;
   /** Netty client used for sending requests */
   private final NettyClient nettyClient;
+  /**
+   * Result of execution for the thread responsible for sending resume signals
+   */
+  private final Future<Void> resumeThreadResult;
+  /** Whether we are shutting down the execution */
+  private volatile boolean shouldTerminate;
 
   /**
    * Constructor
@@ -100,31 +195,92 @@ public class CreditBasedFlowControl implements FlowControl {
   public CreditBasedFlowControl(ImmutableClassesGiraphConfiguration conf,
                                 NettyClient nettyClient) {
     this.nettyClient = nettyClient;
-    maxOpenRequestsPerWorker = MAX_NUM_OF_OPEN_REQUESTS_PER_WORKER.get(conf);
+    maxOpenRequestsPerWorker =
+        (short) MAX_NUM_OF_OPEN_REQUESTS_PER_WORKER.get(conf);
     checkState(maxOpenRequestsPerWorker < 0x4000 &&
         maxOpenRequestsPerWorker > 0, "NettyClient: max number of open " +
         "requests should be in range (0, " + 0x4FFF + ")");
     unsentRequestPermit = new Semaphore(MAX_NUM_OF_UNSENT_REQUESTS.get(conf));
     unsentWaitMsecs = UNSENT_CACHE_WAIT_INTERVAL.get(conf);
     waitingRequestMsecs = WAITING_REQUEST_MSECS.get(conf);
+    shouldTerminate = false;
+    CallableFactory<Void> callableFactory = new CallableFactory<Void>() {
+      @Override
+      public Callable<Void> newCallable(int callableId) {
+        return new Callable<Void>() {
+          @Override
+          public Void call() throws Exception {
+            while (true) {
+              synchronized (workersToResume) {
+                if (shouldTerminate) {
+                  break;
+                }
+                for (Integer workerId : workersToResume) {
+                  if (maxOpenRequestsPerWorker != 0) {
+                    sendResumeSignal(workerId);
+                  } else {
+                    break;
+                  }
+                }
+                try {
+                  workersToResume.wait();
+                } catch (InterruptedException e) {
+                  throw new IllegalStateException("call: caught exception " +
+                      "while waiting for resume-sender thread to be notified!",
+                      e);
+                }
+              }
+            }
+            return null;
+          }
+        };
+      }
+    };
+
+    ExecutorService executor = Executors.newSingleThreadExecutor(
+        ThreadUtils.createThreadFactory("resume-sender"));
+    resumeThreadResult = executor.submit(new LogStacktraceCallable<>(
+        callableFactory.newCallable(0)));
+    executor.shutdown();
+  }
+
+  /**
+   * Send resume signal request to a given worker
+   *
+   * @param workerId id of the worker to send the resume signal to
+   */
+  private void sendResumeSignal(int workerId) {
+    WritableRequest request = new SendResumeRequest(maxOpenRequestsPerWorker);
+    Long resumeId = nettyClient.doSend(workerId, request);
+    checkState(resumeId != null);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("sendResumeSignal: sending signal to worker " + workerId +
+          " with credit=" + maxOpenRequestsPerWorker + ", ID=" +
+          (resumeId & 0xFFFF));
+    }
+    resumeRequestsId.get(workerId).add(resumeId);
   }
 
   @Override
   public void sendRequest(int destTaskId, WritableRequest request) {
-    AdjustableSemaphore openRequestPermit =
+    Pair<AdjustableSemaphore, Integer> pair =
         perWorkerOpenRequestMap.get(destTaskId);
     // Check if this is the first time sending a request to a worker. If so, we
     // should the worker id to necessary bookkeeping data structure.
-    if (openRequestPermit == null) {
-      openRequestPermit = new AdjustableSemaphore(maxOpenRequestsPerWorker);
-      AdjustableSemaphore temp = perWorkerOpenRequestMap.putIfAbsent(destTaskId,
-          openRequestPermit);
-      perWorkerUnsentRequestMap
-          .putIfAbsent(destTaskId, new ArrayDeque<WritableRequest>());
+    if (pair == null) {
+      pair = new MutablePair<>(
+          new AdjustableSemaphore(maxOpenRequestsPerWorker), -1);
+      Pair<AdjustableSemaphore, Integer> temp =
+          perWorkerOpenRequestMap.putIfAbsent(destTaskId, pair);
+      perWorkerUnsentRequestMap.putIfAbsent(
+          destTaskId, new ArrayDeque<WritableRequest>());
+      resumeRequestsId.putIfAbsent(
+          destTaskId, Sets.<Long>newConcurrentHashSet());
       if (temp != null) {
-        openRequestPermit = temp;
+        pair = temp;
       }
     }
+    AdjustableSemaphore openRequestPermit = pair.getLeft();
     // Try to reserve a spot for the request amongst the open requests of
     // the destination worker.
     boolean shouldSend = openRequestPermit.tryAcquire();
@@ -198,8 +354,8 @@ public class CreditBasedFlowControl implements FlowControl {
    * @param response response received
    * @return true iff credit should be ignored, false otherwise
    */
-  private boolean shouldIgnoreCredit(short response) {
-    return ((short) ((response >> 14) & 1)) == 1;
+  private boolean shouldIgnoreCredit(int response) {
+    return ((short) ((response >> (14 + 16)) & 1)) == 1;
   }
 
   /**
@@ -208,8 +364,18 @@ public class CreditBasedFlowControl implements FlowControl {
    * @param response response received
    * @return credit from the received response
    */
-  private short getCredit(short response) {
-    return (short) (response & 0x3FFF);
+  private short getCredit(int response) {
+    return (short) ((response >> 16) & 0x3FFF);
+  }
+
+  /**
+   * Get the timestamp from a response
+   *
+   * @param response response received
+   * @return timestamp from the received response
+   */
+  private int getTimestamp(int response) {
+    return response & 0xFFFF;
   }
 
   /**
@@ -219,15 +385,73 @@ public class CreditBasedFlowControl implements FlowControl {
    * @return AckSignalFlag coming with the response
    */
   @Override
-  public AckSignalFlag getAckSignalFlag(short response) {
-    return AckSignalFlag.values()[(response >> 15) & 1];
+  public AckSignalFlag getAckSignalFlag(int response) {
+    return AckSignalFlag.values()[(response >> (16 + 14 + 1)) & 1];
   }
 
   @Override
-  public short calculateResponse(AckSignalFlag flag, int taskId) {
+  public int calculateResponse(AckSignalFlag flag, int taskId) {
     boolean ignoreCredit = nettyClient.masterInvolved(taskId);
-    return (short) ((flag.ordinal() << 15) |
-        ((ignoreCredit ? 1 : 0) << 14) | (maxOpenRequestsPerWorker & 0x3FFF));
+    if (!ignoreCredit && maxOpenRequestsPerWorker == 0) {
+      synchronized (workersToResume) {
+        workersToResume.add(taskId);
+      }
+    }
+    int timestamp = (int) (nettyClient.getNextRequestId(taskId) & 0xFFFF);
+    return (flag.ordinal() << (16 + 14 + 1)) |
+        ((ignoreCredit ? 1 : 0) << (16 + 14)) |
+        (maxOpenRequestsPerWorker << 16) |
+        timestamp;
+  }
+
+  @Override
+  public void shutdown() {
+    synchronized (workersToResume) {
+      shouldTerminate = true;
+      workersToResume.notifyAll();
+    }
+    try {
+      resumeThreadResult.get();
+    } catch (InterruptedException | ExecutionException e) {
+      throw new IllegalStateException("shutdown: caught exception while" +
+          "getting result of resume-sender thread");
+    }
+  }
+
+  @Override
+  public void logInfo() {
+    if (LOG.isInfoEnabled()) {
+      // Count how many unsent requests each task has
+      Map<Integer, Integer> unsentRequestCounts = Maps.newHashMap();
+      for (Map.Entry<Integer, Deque<WritableRequest>> entry :
+          perWorkerUnsentRequestMap.entrySet()) {
+        unsentRequestCounts.put(entry.getKey(), entry.getValue().size());
+      }
+      ArrayList<Map.Entry<Integer, Integer>> sorted =
+          Lists.newArrayList(unsentRequestCounts.entrySet());
+      Collections.sort(sorted, new Comparator<Map.Entry<Integer, Integer>>() {
+        @Override
+        public int compare(Map.Entry<Integer, Integer> entry1,
+                           Map.Entry<Integer, Integer> entry2) {
+          int value1 = entry1.getValue();
+          int value2 = entry2.getValue();
+          return (value1 < value2) ? 1 : ((value1 == value2) ? 0 : -1);
+        }
+      });
+      StringBuilder message = new StringBuilder();
+      message.append("logInfo: ").append(aggregateUnsentRequests.get())
+          .append(" unsent requests in total. ");
+      int itemsToPrint = Math.min(10, sorted.size());
+      for (int i = 0; i < itemsToPrint; ++i) {
+        message.append(sorted.get(i).getValue())
+            .append(" unsent requests for taskId=")
+            .append(sorted.get(i).getKey()).append(" (credit=")
+            .append(perWorkerOpenRequestMap.get(sorted.get(i).getKey())
+                .getKey().getMaxPermits())
+            .append("), ");
+      }
+      LOG.info(message);
+    }
   }
 
   @Override
@@ -240,7 +464,7 @@ public class CreditBasedFlowControl implements FlowControl {
         try {
           aggregateUnsentRequests.wait(waitingRequestMsecs);
         } catch (InterruptedException e) {
-          throw new IllegalStateException("waitSomeRequests: failed while " +
+          throw new IllegalStateException("waitAllRequests: failed while " +
               "waiting on open/cached requests");
         }
       }
@@ -257,21 +481,52 @@ public class CreditBasedFlowControl implements FlowControl {
   }
 
   @Override
-  public void messageAckReceived(int taskId, short response) {
-    boolean shouldIgnoreCredit = shouldIgnoreCredit(response);
+  public void messageAckReceived(int taskId, long requestId, int response) {
+    boolean ignoreCredit = shouldIgnoreCredit(response);
     short credit = getCredit(response);
-    AdjustableSemaphore openRequestPermit =
-        perWorkerOpenRequestMap.get(taskId);
-    openRequestPermit.release();
-    if (!shouldIgnoreCredit) {
-      openRequestPermit.setMaxPermits(credit);
+    int timestamp = getTimestamp(response);
+    MutablePair<AdjustableSemaphore, Integer> pair =
+        (MutablePair<AdjustableSemaphore, Integer>)
+            perWorkerOpenRequestMap.get(taskId);
+    AdjustableSemaphore openRequestPermit = pair.getLeft();
+    // Release a permit on open requests if we received ACK of a request other
+    // than a Resume request (resume requests are always sent regardless of
+    // number of open requests)
+    if (!resumeRequestsId.get(taskId).remove(requestId)) {
+      openRequestPermit.release();
+    } else if (LOG.isDebugEnabled()) {
+      LOG.debug("messageAckReceived: ACK of resume received from " + taskId +
+          " timestamp=" + timestamp);
+    }
+    if (!ignoreCredit) {
+      synchronized (pair) {
+        if (compareTimestamps(timestamp, pair.getRight()) > 0) {
+          pair.setRight(timestamp);
+          openRequestPermit.setMaxPermits(credit);
+        } else if (LOG.isDebugEnabled()) {
+          LOG.debug("messageAckReceived: received out-of-order messages." +
+              "Received timestamp=" + timestamp + " and current timestamp=" +
+              pair.getRight());
+        }
+      }
     }
-    Deque<WritableRequest> requestDeque =
-        perWorkerUnsentRequestMap.get(taskId);
     // Since we received a response and we changed the credit of the sender
     // client, we may be able to send some more requests to the sender
     // client. So, we try to send as much request as we can to the sender
     // client.
+    trySendCachedRequests(taskId);
+  }
+
+  /**
+   * Try to send as many cached requests as possible to a given worker
+   *
+   * @param taskId id of the worker to send cached requests to
+   */
+  private void trySendCachedRequests(int taskId) {
+    Deque<WritableRequest> requestDeque =
+        perWorkerUnsentRequestMap.get(taskId);
+    AdjustableSemaphore openRequestPermit =
+        perWorkerOpenRequestMap.get(taskId).getLeft();
     while (true) {
       WritableRequest request;
       synchronized (requestDeque) {
@@ -285,6 +540,7 @@ public class CreditBasedFlowControl implements FlowControl {
           break;
         }
       }
+      unsentRequestPermit.release();
       // At this point, we have a request, and we reserved a credit for the
       // sender client. So, we send the request to the client and update
       // the state.
@@ -294,7 +550,73 @@ public class CreditBasedFlowControl implements FlowControl {
           aggregateUnsentRequests.notifyAll();
         }
       }
-      unsentRequestPermit.release();
     }
   }
+
+  /**
+   * Update the max credit that is announced to other workers
+   *
+   * @param newCredit new credit
+   */
+  public void updateCredit(short newCredit) {
+    newCredit = (short) Math.max(0, Math.min(0x3FFF, newCredit));
+    // Check whether we should send resume signals to some workers
+    if (maxOpenRequestsPerWorker == 0 && newCredit != 0) {
+      maxOpenRequestsPerWorker = newCredit;
+      synchronized (workersToResume) {
+        workersToResume.notifyAll();
+      }
+    } else {
+      maxOpenRequestsPerWorker = newCredit;
+    }
+  }
+
+  /**
+   * Compare two timestamps accounting for wrap around. Note that the timestamp
+   * values should be in a range that fits into 14 bits values. This means if
+   * the difference of the two given timestamp is large, we are dealing with one
+   * value being wrapped around.
+   *
+   * @param ts1 first timestamp
+   * @param ts2 second timestamp
+   * @return positive value if first timestamp is later than second timestamp,
+   *         negative otherwise
+   */
+  private int compareTimestamps(int ts1, int ts2) {
+    int diff = ts1 - ts2;
+    if (Math.abs(diff) < 0x7FFF) {
+      return diff;
+    } else {
+      return -diff;
+    }
+  }
+
+  /**
+   * Process a resume signal that came from a given worker
+   *
+   * @param clientId id of the worker that sent the signal
+   * @param credit the credit value sent along with the resume signal
+   * @param requestId timestamp (request id) of the resume signal
+   */
+  public void processResumeSignal(int clientId, short credit, long requestId) {
+    int timestamp = (int) (requestId & 0xFFFF);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("processResumeSignal: resume signal from " + clientId +
+          " with timestamp=" + timestamp);
+    }
+    MutablePair<AdjustableSemaphore, Integer> pair =
+        (MutablePair<AdjustableSemaphore, Integer>)
+            perWorkerOpenRequestMap.get(clientId);
+    synchronized (pair) {
+      if (compareTimestamps(timestamp, pair.getRight()) > 0) {
+        pair.setRight(timestamp);
+        pair.getLeft().setMaxPermits(credit);
+      } else if (LOG.isDebugEnabled()) {
+        LOG.debug("processResumeSignal: received out-of-order messages. " +
+            "Received timestamp=" + timestamp + " and current timestamp=" +
+            pair.getRight());
+      }
+    }
+    trySendCachedRequests(clientId);
+  }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/6256a761/giraph-core/src/main/java/org/apache/giraph/comm/flow_control/FlowControl.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/flow_control/FlowControl.java b/giraph-core/src/main/java/org/apache/giraph/comm/flow_control/FlowControl.java
index 4eda193..4072af7 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/flow_control/FlowControl.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/flow_control/FlowControl.java
@@ -45,9 +45,10 @@ public interface FlowControl {
    * Notify the flow control policy that an open request is completed.
    *
    * @param taskId id of the task to which the open request is completed
-   * @param response the response heard from the task
+   * @param requestId id of the open request which is completed
+   * @param response the response heard from the client
    */
-  void messageAckReceived(int taskId, short response);
+  void messageAckReceived(int taskId, long requestId, int response);
 
   /**
    * Decode the acknowledgement signal from the response after an open request
@@ -56,7 +57,7 @@ public interface FlowControl {
    * @param response the response heard after completion of a request
    * @return the Acknowledgement signal decoded from the response
    */
-  AckSignalFlag getAckSignalFlag(short response);
+  AckSignalFlag getAckSignalFlag(int response);
 
   /**
    * There may be requests in possession of the flow control mechanism, as the
@@ -79,5 +80,15 @@ public interface FlowControl {
    * @param taskId id of the task the acknowledgement is for
    * @return the response to piggyback along with the acknowledgement message
    */
-  short calculateResponse(AckSignalFlag flag, int taskId);
+  int calculateResponse(AckSignalFlag flag, int taskId);
+
+  /**
+   * Shutdown the flow control policy
+   */
+  void shutdown();
+
+  /**
+   * Log the status of the flow control
+   */
+  void logInfo();
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/6256a761/giraph-core/src/main/java/org/apache/giraph/comm/flow_control/NoOpFlowControl.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/flow_control/NoOpFlowControl.java b/giraph-core/src/main/java/org/apache/giraph/comm/flow_control/NoOpFlowControl.java
index d50fe92..c97c967 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/flow_control/NoOpFlowControl.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/flow_control/NoOpFlowControl.java
@@ -45,10 +45,10 @@ public class NoOpFlowControl implements FlowControl {
   }
 
   @Override
-  public void messageAckReceived(int taskId, short response) { }
+  public void messageAckReceived(int taskId, long requestId, int response) { }
 
   @Override
-  public AckSignalFlag getAckSignalFlag(short response) {
+  public AckSignalFlag getAckSignalFlag(int response) {
     return AckSignalFlag.values()[response];
   }
 
@@ -61,7 +61,13 @@ public class NoOpFlowControl implements FlowControl {
   }
 
   @Override
-  public short calculateResponse(AckSignalFlag alreadyDone, int taskId) {
-    return (short) alreadyDone.ordinal();
+  public int calculateResponse(AckSignalFlag alreadyDone, int taskId) {
+    return alreadyDone.ordinal();
   }
+
+  @Override
+  public void shutdown() { }
+
+  @Override
+  public void logInfo() { }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/6256a761/giraph-core/src/main/java/org/apache/giraph/comm/flow_control/StaticFlowControl.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/flow_control/StaticFlowControl.java b/giraph-core/src/main/java/org/apache/giraph/comm/flow_control/StaticFlowControl.java
index 1fc43a7..6d67afd 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/flow_control/StaticFlowControl.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/flow_control/StaticFlowControl.java
@@ -31,6 +31,8 @@ import org.apache.giraph.metrics.ResetSuperstepMetricsObserver;
 import org.apache.giraph.metrics.SuperstepMetricsRegistry;
 import org.apache.log4j.Logger;
 
+import java.util.concurrent.atomic.AtomicInteger;
+
 import static org.apache.giraph.conf.GiraphConstants.WAITING_REQUEST_MSECS;
 
 /**
@@ -69,6 +71,8 @@ public class StaticFlowControl implements
   private final Object requestSpotAvailable = new Object();
   /** Counter for time spent waiting on too many open requests */
   private Counter timeWaitingOnOpenRequests;
+  /** Number of threads waiting on too many open requests */
+  private final AtomicInteger numWaitingThreads = new AtomicInteger(0);
 
   /**
    * Constructor
@@ -108,11 +112,12 @@ public class StaticFlowControl implements
   }
 
   /**
-   * Ensure that at most maxOpenRequests are not complete.  Periodically,
-   * check the state of every request.  If we find the connection failed,
-   * re-establish it and re-send the request.
+   * Ensure that at most numberOfRequestsToProceed are not complete.
+   * Periodically, check the state of every request.  If we find the connection
+   * failed, re-establish it and re-send the request.
    */
   private void waitSomeRequests() {
+    numWaitingThreads.getAndIncrement();
     while (nettyClient.getNumberOfOpenRequests() > numberOfRequestsToProceed) {
       // Wait for requests to complete for some time
       synchronized (requestSpotAvailable) {
@@ -129,23 +134,36 @@ public class StaticFlowControl implements
       }
       nettyClient.logAndSanityCheck();
     }
+    numWaitingThreads.getAndDecrement();
   }
 
   @Override
-  public void messageAckReceived(int taskId, short response) {
+  public void messageAckReceived(int taskId, long requestId, int response) {
     synchronized (requestSpotAvailable) {
       requestSpotAvailable.notifyAll();
     }
   }
 
   @Override
-  public AckSignalFlag getAckSignalFlag(short response) {
+  public AckSignalFlag getAckSignalFlag(int response) {
     return AckSignalFlag.values()[response];
   }
 
   @Override
-  public short calculateResponse(AckSignalFlag alreadyDone, int taskId) {
-    return (short) alreadyDone.ordinal();
+  public int calculateResponse(AckSignalFlag alreadyDone, int clientId) {
+    return alreadyDone.ordinal();
+  }
+
+  @Override
+  public void shutdown() { }
+
+  @Override
+  public void logInfo() {
+    if (LOG.isInfoEnabled()) {
+      LOG.info("logInfo: " + numWaitingThreads.get() + " threads waiting " +
+          "until number of open requests falls below " +
+          numberOfRequestsToProceed);
+    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/giraph/blob/6256a761/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
index c185fdc..217dba6 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
@@ -23,7 +23,7 @@ import org.apache.giraph.comm.flow_control.FlowControl;
 import org.apache.giraph.comm.flow_control.NoOpFlowControl;
 import org.apache.giraph.comm.flow_control.StaticFlowControl;
 import org.apache.giraph.comm.netty.handler.AckSignalFlag;
-import org.apache.giraph.comm.netty.handler.AddressRequestIdGenerator;
+import org.apache.giraph.comm.netty.handler.TaskRequestIdGenerator;
 import org.apache.giraph.comm.netty.handler.ClientRequestId;
 import org.apache.giraph.comm.netty.handler.RequestEncoder;
 import org.apache.giraph.comm.netty.handler.RequestInfo;
@@ -174,12 +174,12 @@ public class NettyClient {
   /** Waiting interval for checking outstanding requests msecs */
   private final int waitingRequestMsecs;
   /** Timed logger for printing request debugging */
-  private final TimedLogger requestLogger = new TimedLogger(15 * 1000, LOG);
+  private final TimedLogger requestLogger;
   /** Worker executor group */
   private final EventLoopGroup workerGroup;
-  /** Address request id generator */
-  private final AddressRequestIdGenerator addressRequestIdGenerator =
-      new AddressRequestIdGenerator();
+  /** Task request id generator */
+  private final TaskRequestIdGenerator taskRequestIdGenerator =
+      new TaskRequestIdGenerator();
   /** Task info */
   private final TaskInfo myTaskInfo;
   /** Maximum thread pool size */
@@ -245,6 +245,7 @@ public class NettyClient {
     waitTimeBetweenConnectionRetriesMs =
         WAIT_TIME_BETWEEN_CONNECTION_RETRIES_MS.get(conf);
     waitingRequestMsecs = WAITING_REQUEST_MSECS.get(conf);
+    requestLogger = new TimedLogger(waitingRequestMsecs, LOG);
     maxPoolSize = GiraphConstants.NETTY_CLIENT_THREADS.get(conf);
     maxResolveAddressAttempts = MAX_RESOLVE_ADDRESS_ATTEMPTS.get(conf);
 
@@ -621,6 +622,7 @@ public class NettyClient {
     if (LOG.isInfoEnabled()) {
       LOG.info("stop: Halting netty client");
     }
+    flowControl.shutdown();
     // Close connections asynchronously, in a Netty-approved
     // way, without cleaning up thread pools until all channels
     // in addressChannelMap are closed (success or failure)
@@ -730,14 +732,16 @@ public class NettyClient {
    *
    * @param destTaskId destination to send the request to
    * @param request request itself
+   * @return request id generated for sending the request
    */
-  public void doSend(int destTaskId, WritableRequest request) {
+  public Long doSend(int destTaskId, WritableRequest request) {
     InetSocketAddress remoteServer = taskIdAddressMap.get(destTaskId);
     if (clientRequestIdRequestInfoMap.isEmpty()) {
       inboundByteCounter.resetAll();
       outboundByteCounter.resetAll();
     }
     boolean registerRequest = true;
+    Long requestId = null;
 /*if_not[HADOOP_NON_SECURE]*/
     if (request.getType() == RequestType.SASL_TOKEN_MESSAGE_REQUEST) {
       registerRequest = false;
@@ -748,8 +752,8 @@ public class NettyClient {
     RequestInfo newRequestInfo = new RequestInfo(remoteServer, request);
     if (registerRequest) {
       request.setClientId(myTaskInfo.getTaskId());
-      request.setRequestId(
-        addressRequestIdGenerator.getNextRequestId(remoteServer));
+      requestId = taskRequestIdGenerator.getNextRequestId(destTaskId);
+      request.setRequestId(requestId);
       ClientRequestId clientRequestId =
         new ClientRequestId(destTaskId, request.getRequestId());
       RequestInfo oldRequestInfo = clientRequestIdRequestInfoMap.putIfAbsent(
@@ -769,6 +773,7 @@ public class NettyClient {
     ChannelFuture writeFuture = channel.write(request);
     newRequestInfo.setWriteFuture(writeFuture);
     writeFuture.addListener(logErrorListener);
+    return requestId;
   }
 
   /**
@@ -779,7 +784,7 @@ public class NettyClient {
    * @param response Actual response
    * @param shouldDrop Drop the message?
    */
-  public void messageReceived(int senderId, long requestId, short response,
+  public void messageReceived(int senderId, long requestId, int response,
       boolean shouldDrop) {
     if (shouldDrop) {
       synchronized (clientRequestIdRequestInfoMap) {
@@ -806,7 +811,7 @@ public class NettyClient {
             requestInfo + ".  Waiting on " +
             clientRequestIdRequestInfoMap.size() + " requests");
       }
-      flowControl.messageAckReceived(senderId, response);
+      flowControl.messageAckReceived(senderId, requestId, response);
       // Help #waitAllRequests() to finish faster
       synchronized (clientRequestIdRequestInfoMap) {
         clientRequestIdRequestInfoMap.notifyAll();
@@ -862,8 +867,7 @@ public class NettyClient {
       LOG.info("logInfoAboutOpenRequests: Waiting interval of " +
           waitingRequestMsecs + " msecs, " +
           clientRequestIdRequestInfoMap.size() +
-          " open requests, " + flowControl.getNumberOfUnsentRequests() +
-          " cached unsent requests, " + inboundByteCounter.getMetrics() + "\n" +
+          " open requests, " + inboundByteCounter.getMetrics() + "\n" +
           outboundByteCounter.getMetrics());
 
       if (clientRequestIdRequestInfoMap.size() < MAX_REQUESTS_TO_LIST) {
@@ -907,6 +911,7 @@ public class NettyClient {
             .append(", ");
       }
       LOG.info(message);
+      flowControl.logInfo();
     }
   }
 
@@ -1022,6 +1027,16 @@ public class NettyClient {
   }
 
   /**
+   * Generate and get the next request id to be used for a given worker
+   *
+   * @param taskId id of the worker to generate the next request id
+   * @return request id
+   */
+  public Long getNextRequestId(int taskId) {
+    return taskRequestIdGenerator.getNextRequestId(taskId);
+  }
+
+  /**
    * @return number of open requests
    */
   public int getNumberOfOpenRequests() {

http://git-wip-us.apache.org/repos/asf/giraph/blob/6256a761/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/AddressRequestIdGenerator.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/AddressRequestIdGenerator.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/AddressRequestIdGenerator.java
deleted file mode 100644
index 8ba5b96..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/AddressRequestIdGenerator.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.comm.netty.handler;
-
-import com.google.common.collect.Maps;
-import java.net.InetSocketAddress;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicLong;
-
-/**
- * Generate different request ids based on the address of the well known
- * port on the workers.  Thread-safe.
- */
-public class AddressRequestIdGenerator {
-  /** Address request generator map */
-  private final ConcurrentMap<InetSocketAddress, AtomicLong>
-  addressRequestGeneratorMap = Maps.newConcurrentMap();
-
-  /**
-   * Get the next request id for a given destination.  Thread-safe.
-   *
-   * @param address Address of the worker (consistent during a superstep)
-   * @return Valid request id
-   */
-  public Long getNextRequestId(InetSocketAddress address) {
-    AtomicLong requestGenerator = addressRequestGeneratorMap.get(address);
-    if (requestGenerator == null) {
-      requestGenerator = new AtomicLong(0);
-      AtomicLong oldRequestGenerator =
-          addressRequestGeneratorMap.putIfAbsent(address, requestGenerator);
-      if (oldRequestGenerator != null) {
-        requestGenerator = oldRequestGenerator;
-      }
-    }
-    return requestGenerator.getAndIncrement();
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/6256a761/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestServerHandler.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestServerHandler.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestServerHandler.java
index df50e2a..7bb4464 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestServerHandler.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestServerHandler.java
@@ -44,7 +44,7 @@ import static org.apache.giraph.conf.GiraphConstants.NETTY_SIMULATE_FIRST_REQUES
 public abstract class RequestServerHandler<R> extends
   ChannelInboundHandlerAdapter {
   /** Number of bytes in the encoded response */
-  public static final int RESPONSE_BYTES = 14;
+  public static final int RESPONSE_BYTES = 16;
   /** Time class to use */
   private static Time TIME = SystemTime.get();
   /** Class logger */
@@ -137,9 +137,9 @@ public abstract class RequestServerHandler<R> extends
     ByteBuf buffer = ctx.alloc().buffer(RESPONSE_BYTES);
     buffer.writeInt(myTaskInfo.getTaskId());
     buffer.writeLong(request.getRequestId());
-    short signal =
+    int signal =
         flowControl.calculateResponse(alreadyDone, request.getClientId());
-    buffer.writeShort(signal);
+    buffer.writeInt(signal);
     ctx.write(buffer);
     // NettyServer is bootstrapped with auto-read set to true by default. After
     // the first request is processed, we set auto-read to false. This prevents

http://git-wip-us.apache.org/repos/asf/giraph/blob/6256a761/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/ResponseClientHandler.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/ResponseClientHandler.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/ResponseClientHandler.java
index 54cb201..12dde3b 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/ResponseClientHandler.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/ResponseClientHandler.java
@@ -64,11 +64,11 @@ public class ResponseClientHandler extends ChannelInboundHandlerAdapter {
     ByteBuf buf = (ByteBuf) msg;
     int senderId = -1;
     long requestId = -1;
-    short response = -1;
+    int response = -1;
     try {
       senderId = buf.readInt();
       requestId = buf.readLong();
-      response = buf.readShort();
+      response = buf.readInt();
     } catch (IndexOutOfBoundsException e) {
       throw new IllegalStateException(
           "channelRead: Got IndexOutOfBoundsException ", e);

http://git-wip-us.apache.org/repos/asf/giraph/blob/6256a761/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/TaskRequestIdGenerator.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/TaskRequestIdGenerator.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/TaskRequestIdGenerator.java
new file mode 100644
index 0000000..b172e9a
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/TaskRequestIdGenerator.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm.netty.handler;
+
+import com.google.common.collect.Maps;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Generate different request ids based on the task id, using a separate
+ * counter for every task.  Thread-safe.
+ */
+public class TaskRequestIdGenerator {
+  /** Counter of the next request id to hand out, kept per task id */
+  private final ConcurrentMap<Integer, AtomicLong>
+      taskRequestGeneratorMap = Maps.newConcurrentMap();
+
+  /**
+   * Get the next request id for a given destination.  Thread-safe.
+   *
+   * @param taskId id of the task (consistent during a superstep)
+   * @return Valid request id
+   */
+  public Long getNextRequestId(Integer taskId) {
+    AtomicLong counter = taskRequestGeneratorMap.get(taskId);
+    if (counter == null) {
+      // No counter yet for this task: offer a new one starting at 0, but use
+      // the counter already in the map if another thread registered first.
+      AtomicLong fresh = new AtomicLong(0);
+      AtomicLong winner = taskRequestGeneratorMap.putIfAbsent(taskId, fresh);
+      counter = (winner == null) ? fresh : winner;
+    }
+    return counter.getAndIncrement();
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/6256a761/giraph-core/src/main/java/org/apache/giraph/comm/requests/RequestType.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/requests/RequestType.java b/giraph-core/src/main/java/org/apache/giraph/comm/requests/RequestType.java
index bebac28..627c2af 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/requests/RequestType.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/requests/RequestType.java
@@ -64,7 +64,9 @@ else[HADOOP_NON_SECURE]*/
   /** Send request for input split from worker to master */
   ASK_FOR_INPUT_SPLIT_REQUEST(AskForInputSplitRequest.class),
   /** Send request with granted input split from master to workers */
-  REPLY_WITH_INPUT_SPLIT_REQUEST(ReplyWithInputSplitRequest.class);
+  REPLY_WITH_INPUT_SPLIT_REQUEST(ReplyWithInputSplitRequest.class),
+  /** Send request to resume sending messages (used in flow-control) */
+  SEND_RESUME_REQUEST(SendResumeRequest.class);
 
   /** Class of request which this type corresponds to */
   private final Class<? extends WritableRequest> requestClass;

http://git-wip-us.apache.org/repos/asf/giraph/blob/6256a761/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendResumeRequest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendResumeRequest.java b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendResumeRequest.java
new file mode 100644
index 0000000..0e5e8bb
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendResumeRequest.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm.requests;
+
+import org.apache.giraph.comm.ServerData;
+import org.apache.giraph.comm.flow_control.CreditBasedFlowControl;
+import org.apache.giraph.comm.flow_control.FlowControl;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import static com.google.common.base.Preconditions.checkState;
+
+/**
+ * Request telling the receiving worker that it may resume sending messages to
+ * the worker that issued this request. It is used by adaptive credit-based
+ * flow control: a worker (W) may hand a credit of 0 to another worker (U) so
+ * that U stops sending messages to W, and later sends U this request to let
+ * it continue. The request also carries the new credit value W announces to
+ * U.
+ */
+public class SendResumeRequest extends WritableRequest
+    implements WorkerRequest {
+  /** Credit value announced along with the resume signal */
+  private short credit;
+
+  /** No-argument constructor, used for reflection only */
+  public SendResumeRequest() { }
+
+  /**
+   * Constructor
+   *
+   * @param credit credit value to announce; checked to be positive
+   */
+  public SendResumeRequest(short credit) {
+    checkState(credit > 0);
+    this.credit = credit;
+  }
+
+  @Override
+  public void doRequest(ServerData serverData) {
+    FlowControl control =
+        serverData.getServiceWorker().getWorkerClient().getFlowControl();
+    checkState(control != null);
+    CreditBasedFlowControl creditControl = (CreditBasedFlowControl) control;
+    creditControl.processResumeSignal(getClientId(), credit, getRequestId());
+  }
+
+  @Override
+  public RequestType getType() {
+    return RequestType.SEND_RESUME_REQUEST;
+  }
+
+  @Override
+  void readFieldsRequest(DataInput input) throws IOException {
+    this.credit = input.readShort();
+  }
+
+  @Override
+  void writeRequest(DataOutput output) throws IOException {
+    output.writeShort(this.credit);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/6256a761/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
index 78bd5ef..df79b7f 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
@@ -957,10 +957,6 @@ public class GiraphConfiguration extends Configuration
     return NUM_COMPUTE_THREADS.get(this);
   }
 
-  public int getNumOocThreads() {
-    return NUM_OOC_THREADS.get(this);
-  }
-
   /**
    * Set the number of input split threads
    *

http://git-wip-us.apache.org/repos/asf/giraph/blob/6256a761/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
index b5bb9ed..17e030f 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
@@ -71,6 +71,8 @@ import org.apache.giraph.mapping.translate.TranslateEdge;
 import org.apache.giraph.master.DefaultMasterCompute;
 import org.apache.giraph.master.MasterCompute;
 import org.apache.giraph.master.MasterObserver;
+import org.apache.giraph.ooc.OutOfCoreOracle;
+import org.apache.giraph.ooc.ThresholdBasedOracle;
 import org.apache.giraph.partition.GraphPartitionerFactory;
 import org.apache.giraph.partition.HashPartitionerFactory;
 import org.apache.giraph.partition.Partition;
@@ -995,10 +997,16 @@ public interface GiraphConstants {
       new BooleanConfOption("giraph.useOutOfCoreGraph", false,
           "Enable out-of-core graph.");
 
-  /** Number of threads participating in swapping graph/messages to disk. */
-  IntConfOption NUM_OOC_THREADS =
-      new IntConfOption("giraph.numOutOfCoreThreads", 1,
-          "Number of threads participating in swapping data to disk.");
+  /**
+   * Out-of-core oracle that is to be used for adaptive out-of-core engine. If
+   * the `MAX_PARTITIONS_IN_MEMORY` is already set, this will be over-written
+   * to be `FixedPartitionsOracle`.
+   */
+  ClassConfOption<OutOfCoreOracle> OUT_OF_CORE_ORACLE =
+      ClassConfOption.create("giraph.outOfCoreOracle",
+          ThresholdBasedOracle.class, OutOfCoreOracle.class,
+          "Out-of-core oracle that is to be used for adaptive out-of-core " +
+              "engine");
 
   /** Maximum number of partitions to hold in memory for each worker. */
   IntConfOption MAX_PARTITIONS_IN_MEMORY =
@@ -1006,8 +1014,6 @@ public interface GiraphConstants {
           "Maximum number of partitions to hold in memory for each worker. By" +
               " default it is set to 0 (for adaptive out-of-core mechanism");
 
-
-
   /** Directory to write YourKit snapshots to */
   String YOURKIT_OUTPUT_DIR = "giraph.yourkit.outputDir";
   /** Default directory to write YourKit snapshots to */

http://git-wip-us.apache.org/repos/asf/giraph/blob/6256a761/giraph-core/src/main/java/org/apache/giraph/counters/GiraphStats.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/counters/GiraphStats.java b/giraph-core/src/main/java/org/apache/giraph/counters/GiraphStats.java
index d96b474..0cb8486 100644
--- a/giraph-core/src/main/java/org/apache/giraph/counters/GiraphStats.java
+++ b/giraph-core/src/main/java/org/apache/giraph/counters/GiraphStats.java
@@ -47,7 +47,7 @@ public class GiraphStats extends HadoopCountersBase {
     = "Aggregate sent messages";
   /** aggregate sent messages bytes counter name */
   public static final String AGGREGATE_SENT_MESSAGE_BYTES_NAME
-    = "Aggregate sent message message bytes";
+    = "Aggregate sent message bytes";
   /** workers counter name */
   public static final String CURRENT_WORKERS_NAME = "Current workers";
   /** current master partition task counter name */
@@ -56,6 +56,12 @@ public class GiraphStats extends HadoopCountersBase {
   /** last checkpointed superstep counter name */
   public static final String LAST_CHECKPOINTED_SUPERSTEP_NAME =
       "Last checkpointed superstep";
+  /** aggregate bytes loaded from local disks in out-of-core */
+  public static final String OOC_BYTES_LOADED_NAME =
+      "Aggregate bytes loaded from local disks (out-of-core)";
+  /** aggregate bytes stored to local disks in out-of-core */
+  public static final String OOC_BYTES_STORED_NAME =
+      "Aggregate bytes stored to local disks (out-of-core)";
 
   /** Singleton instance for everyone to use */
   private static GiraphStats INSTANCE;
@@ -82,8 +88,12 @@ public class GiraphStats extends HadoopCountersBase {
   private static final int AGG_SENT_MESSAGES = 9;
   /** Aggregate sent message bytes counter */
   private static final int AGG_SENT_MESSAGE_BYTES = 10;
+  /** Aggregate OOC loaded bytes counter */
+  private static final int OOC_BYTES_LOADED = 11;
+  /** Aggregate OOC stored bytes counter */
+  private static final int OOC_BYTES_STORED = 12;
   /** Number of counters in this class */
-  private static final int NUM_COUNTERS = 11;
+  private static final int NUM_COUNTERS = 13;
 
   /** All the counters stored */
   private final GiraphHadoopCounter[] counters;
@@ -111,6 +121,8 @@ public class GiraphStats extends HadoopCountersBase {
         getCounter(AGGREGATE_SENT_MESSAGES_NAME);
     counters[AGG_SENT_MESSAGE_BYTES] =
         getCounter(AGGREGATE_SENT_MESSAGE_BYTES_NAME);
+    counters[OOC_BYTES_LOADED] = getCounter(OOC_BYTES_LOADED_NAME);
+    counters[OOC_BYTES_STORED] = getCounter(OOC_BYTES_STORED_NAME);
   }
 
   /**
@@ -230,6 +242,24 @@ public class GiraphStats extends HadoopCountersBase {
     return counters[LAST_CHECKPOINTED_SUPERSTEP];
   }
 
+  /**
+   * Get the counter of aggregate bytes loaded from local disks (out-of-core)
+   *
+   * @return OOCBytesLoaded counter
+   */
+  public GiraphHadoopCounter getAggregateOOCBytesLoaded() {
+    return counters[OOC_BYTES_LOADED];
+  }
+
+  /**
+   * Get the counter of aggregate bytes stored to local disks (out-of-core)
+   *
+   * @return OOCBytesStored counter
+   */
+  public GiraphHadoopCounter getAggregateOOCBytesStored() {
+    return counters[OOC_BYTES_STORED];
+  }
+
   @Override
   public Iterator<GiraphHadoopCounter> iterator() {
     return Arrays.asList(counters).iterator();

http://git-wip-us.apache.org/repos/asf/giraph/blob/6256a761/giraph-core/src/main/java/org/apache/giraph/edge/AbstractEdgeStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/AbstractEdgeStore.java b/giraph-core/src/main/java/org/apache/giraph/edge/AbstractEdgeStore.java
index 0f3d668..104cae2 100644
--- a/giraph-core/src/main/java/org/apache/giraph/edge/AbstractEdgeStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/edge/AbstractEdgeStore.java
@@ -23,6 +23,7 @@ import org.apache.giraph.bsp.CentralizedServiceWorker;
 import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.ooc.OutOfCoreEngine;
 import org.apache.giraph.partition.Partition;
 import org.apache.giraph.utils.CallableFactory;
 import org.apache.giraph.utils.ProgressableUtils;
@@ -78,6 +79,8 @@ public abstract class AbstractEdgeStore<I extends WritableComparable,
    * from the one used during computation.
    */
   protected boolean useInputOutEdges;
+  /** Whether we spilled edges on disk */
+  private boolean hasEdgesOnDisk = false;
 
   /**
    * Constructor.
@@ -169,6 +172,9 @@ public abstract class AbstractEdgeStore<I extends WritableComparable,
     Map<K, OutEdges<I, E>> edges = transientEdges.remove(partitionId);
     if (edges != null) {
       output.writeInt(edges.size());
+      if (edges.size() > 0) {
+        hasEdgesOnDisk = true;
+      }
       for (Map.Entry<K, OutEdges<I, E>> edge : edges.entrySet()) {
         writeVertexKey(edge.getKey(), output);
         edge.getValue().write(output);
@@ -242,7 +248,7 @@ public abstract class AbstractEdgeStore<I extends WritableComparable,
   @Override
   public void moveEdgesToVertices() {
     final boolean createSourceVertex = configuration.getCreateSourceVertex();
-    if (transientEdges.isEmpty()) {
+    if (transientEdges.isEmpty() && !hasEdgesOnDisk) {
       if (LOG.isInfoEnabled()) {
         LOG.info("moveEdgesToVertices: No edges to move");
       }
@@ -264,6 +270,10 @@ public abstract class AbstractEdgeStore<I extends WritableComparable,
           public Void call() throws Exception {
             Integer partitionId;
             I representativeVertexId = configuration.createVertexId();
+            OutOfCoreEngine oocEngine = service.getServerData().getOocEngine();
+            if (oocEngine != null) {
+              oocEngine.processingThreadStart();
+            }
             while (true) {
               Partition<I, V, E> partition =
                   service.getPartitionStore().getNextPartition();
@@ -280,7 +290,15 @@ public abstract class AbstractEdgeStore<I extends WritableComparable,
               Iterator<Et> iterator =
                   getPartitionEdgesIterator(partitionEdges);
               // process all vertices in given partition
+              int count = 0;
               while (iterator.hasNext()) {
+                // If out-of-core mechanism is used, check whether this thread
+                // can stay active or it should temporarily suspend and stop
+                // processing and generating more data for the moment.
+                if (oocEngine != null &&
+                    (++count & OutOfCoreEngine.CHECK_IN_INTERVAL) == 0) {
+                  oocEngine.activeThreadCheckIn();
+                }
                 Et entry = iterator.next();
                 I vertexId = getVertexId(entry, representativeVertexId);
                 OutEdges<I, E> outEdges = convertInputToComputeEdges(
@@ -320,6 +338,9 @@ public abstract class AbstractEdgeStore<I extends WritableComparable,
               // partition after modifying it.
               service.getPartitionStore().putPartition(partition);
             }
+            if (oocEngine != null) {
+              oocEngine.processingThreadFinish();
+            }
             return null;
           }
         };

http://git-wip-us.apache.org/repos/asf/giraph/blob/6256a761/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java b/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
index d59d044..78c1ec3 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
@@ -31,6 +31,7 @@ import org.apache.giraph.io.SimpleVertexWriter;
 import org.apache.giraph.metrics.GiraphMetrics;
 import org.apache.giraph.metrics.MetricNames;
 import org.apache.giraph.metrics.SuperstepMetricsRegistry;
+import org.apache.giraph.ooc.OutOfCoreEngine;
 import org.apache.giraph.partition.Partition;
 import org.apache.giraph.partition.PartitionStats;
 import org.apache.giraph.partition.PartitionStore;
@@ -99,6 +100,12 @@ public class ComputeCallable<I extends WritableComparable, V extends Writable,
   private final Counter messageBytesSentCounter;
   /** Compute time per partition */
   private final Histogram histogramComputePerPartition;
+  /** GC time per compute thread */
+  private final Histogram histogramGCTimePerThread;
+  /** Wait time per compute thread */
+  private final Histogram histogramWaitTimePerThread;
+  /** Processing time per compute thread */
+  private final Histogram histogramProcessingTimePerThread;
 
   /**
    * Constructor
@@ -125,6 +132,11 @@ public class ComputeCallable<I extends WritableComparable, V extends Writable,
       metrics.getCounter(MetricNames.MESSAGE_BYTES_SENT);
     histogramComputePerPartition = metrics.getUniformHistogram(
         MetricNames.HISTOGRAM_COMPUTE_PER_PARTITION);
+    histogramGCTimePerThread = metrics.getUniformHistogram("gc-per-thread-ms");
+    histogramWaitTimePerThread =
+        metrics.getUniformHistogram("wait-per-thread-ms");
+    histogramProcessingTimePerThread =
+        metrics.getUniformHistogram("processing-per-thread-ms");
   }
 
   @Override
@@ -148,17 +160,32 @@ public class ComputeCallable<I extends WritableComparable, V extends Writable,
 
     List<PartitionStats> partitionStatsList = Lists.newArrayList();
     PartitionStore<I, V, E> partitionStore = serviceWorker.getPartitionStore();
+    OutOfCoreEngine oocEngine = serviceWorker.getServerData().getOocEngine();
+    GraphTaskManager<I, V, E> taskManager = serviceWorker.getGraphTaskManager();
+    if (oocEngine != null) {
+      oocEngine.processingThreadStart();
+    }
+    long timeWaiting = 0;
+    long timeProcessing = 0;
+    long timeDoingGC = 0;
     while (true) {
       long startTime = System.currentTimeMillis();
+      long startGCTime = taskManager.getSuperstepGCTime();
       Partition<I, V, E> partition = partitionStore.getNextPartition();
+      long timeDoingGCWhileWaiting =
+          taskManager.getSuperstepGCTime() - startGCTime;
+      timeDoingGC += timeDoingGCWhileWaiting;
+      timeWaiting += System.currentTimeMillis() - startTime -
+          timeDoingGCWhileWaiting;
       if (partition == null) {
         break;
       }
-
+      long startProcessingTime = System.currentTimeMillis();
+      startGCTime = taskManager.getSuperstepGCTime();
       try {
         serviceWorker.getServerData().resolvePartitionMutation(partition);
         PartitionStats partitionStats =
-            computePartition(computation, partition);
+            computePartition(computation, partition, oocEngine);
         partitionStatsList.add(partitionStats);
         long partitionMsgs = workerClientRequestProcessor.resetMessageCount();
         partitionStats.addMessagesSentCount(partitionMsgs);
@@ -180,10 +207,17 @@ public class ComputeCallable<I extends WritableComparable, V extends Writable,
       } finally {
         partitionStore.putPartition(partition);
       }
+      long timeDoingGCWhileProcessing =
+          taskManager.getSuperstepGCTime() - startGCTime;
+      timeDoingGC += timeDoingGCWhileProcessing;
+      timeProcessing += System.currentTimeMillis() - startProcessingTime -
+          timeDoingGCWhileProcessing;
       histogramComputePerPartition.update(
           System.currentTimeMillis() - startTime);
     }
-
+    histogramGCTimePerThread.update(timeDoingGC);
+    histogramWaitTimePerThread.update(timeWaiting);
+    histogramProcessingTimePerThread.update(timeProcessing);
     computation.postSuperstep();
 
     // Return VertexWriter after the usage
@@ -194,7 +228,12 @@ public class ComputeCallable<I extends WritableComparable, V extends Writable,
           Time.NS_PER_SECOND_AS_FLOAT;
       LOG.info("call: Computation took " + seconds + " secs for "  +
           partitionStatsList.size() + " partitions on superstep " +
-          graphState.getSuperstep() + ".  Flushing started");
+          graphState.getSuperstep() + ".  Flushing started (time waiting on " +
+          "partitions was " +
+          String.format("%.2f s", timeWaiting / 1000.0) + ", time processing " +
+          "partitions was " + String.format("%.2f s", timeProcessing / 1000.0) +
+          ", time spent on gc was " +
+          String.format("%.2f s", timeDoingGC / 1000.0) + ")");
     }
     try {
       workerClientRequestProcessor.flush();
@@ -211,6 +250,9 @@ public class ComputeCallable<I extends WritableComparable, V extends Writable,
     } catch (IOException e) {
       throw new IllegalStateException("call: Flushing failed.", e);
     }
+    if (oocEngine != null) {
+      oocEngine.processingThreadFinish();
+    }
     return partitionStatsList;
   }
 
@@ -219,17 +261,27 @@ public class ComputeCallable<I extends WritableComparable, V extends Writable,
    *
    * @param computation Computation to use
    * @param partition Partition to compute
+   * @param oocEngine out-of-core engine
    * @return Partition stats for this computed partition
    */
   private PartitionStats computePartition(
       Computation<I, V, E, M1, M2> computation,
-      Partition<I, V, E> partition) throws IOException, InterruptedException {
+      Partition<I, V, E> partition, OutOfCoreEngine oocEngine)
+      throws IOException, InterruptedException {
     PartitionStats partitionStats =
         new PartitionStats(partition.getId(), 0, 0, 0, 0, 0);
     long verticesComputedProgress = 0;
     // Make sure this is thread-safe across runs
     synchronized (partition) {
+      int count = 0;
       for (Vertex<I, V, E> vertex : partition) {
+        // If out-of-core mechanism is used, check whether this thread
+        // can stay active or it should temporarily suspend and stop
+        // processing and generating more data for the moment.
+        if (oocEngine != null &&
+            (++count & OutOfCoreEngine.CHECK_IN_INTERVAL) == 0) {
+          oocEngine.activeThreadCheckIn();
+        }
         Iterable<M1> messages = messageStore.getVertexMessages(vertex.getId());
         if (vertex.isHalted() && !Iterables.isEmpty(messages)) {
           vertex.wakeUp();

http://git-wip-us.apache.org/repos/asf/giraph/blob/6256a761/giraph-core/src/main/java/org/apache/giraph/graph/GlobalStats.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GlobalStats.java b/giraph-core/src/main/java/org/apache/giraph/graph/GlobalStats.java
index 499c862..dab3c2f 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/GlobalStats.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/GlobalStats.java
@@ -42,6 +42,10 @@ public class GlobalStats implements Writable {
   private long messageBytesCount = 0;
   /** Whether the computation should be halted */
   private boolean haltComputation = false;
+  /** Bytes of data stored to disk in the last superstep */
+  private long oocStoreBytesCount = 0;
+  /** Bytes of data loaded from disk in the last superstep */
+  private long oocLoadBytesCount = 0;
   /**
    * Master's decision on whether we should checkpoint and
    * what to do next.
@@ -88,6 +92,14 @@ public class GlobalStats implements Writable {
     haltComputation = value;
   }
 
+  public long getOocStoreBytesCount() {
+    return oocStoreBytesCount;
+  }
+
+  public long getOocLoadBytesCount() {
+    return oocLoadBytesCount;
+  }
+
   public CheckpointStatus getCheckpointStatus() {
     return checkpointStatus;
   }
@@ -97,6 +109,24 @@ public class GlobalStats implements Writable {
   }
 
   /**
+   * Add bytes loaded to the global stats.
+   *
+   * @param oocLoadBytesCount number of bytes to be added
+   */
+  public void addOocLoadBytesCount(long oocLoadBytesCount) {
+    this.oocLoadBytesCount += oocLoadBytesCount;
+  }
+
+  /**
+   * Add bytes stored to the global stats.
+   *
+   * @param oocStoreBytesCount number of bytes to be added
+   */
+  public void addOocStoreBytesCount(long oocStoreBytesCount) {
+    this.oocStoreBytesCount += oocStoreBytesCount;
+  }
+
+  /**
    * Add messages to the global stats.
    *
    * @param messageCount Number of messages to be added.

http://git-wip-us.apache.org/repos/asf/giraph/blob/6256a761/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
index 19ac615..87d5248 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
@@ -19,6 +19,8 @@
 package org.apache.giraph.graph;
 
 import java.io.IOException;
+import java.lang.management.GarbageCollectorMXBean;
+import java.lang.management.ManagementFactory;
 import java.net.URL;
 import java.net.URLDecoder;
 import java.util.ArrayList;
@@ -29,6 +31,8 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
+import com.sun.management.GarbageCollectionNotificationInfo;
+import com.yammer.metrics.core.Counter;
 import org.apache.giraph.bsp.BspService;
 import org.apache.giraph.bsp.CentralizedServiceMaster;
 import org.apache.giraph.bsp.CentralizedServiceWorker;
@@ -46,6 +50,7 @@ import org.apache.giraph.metrics.GiraphTimer;
 import org.apache.giraph.metrics.GiraphTimerContext;
 import org.apache.giraph.metrics.ResetSuperstepMetricsObserver;
 import org.apache.giraph.metrics.SuperstepMetricsRegistry;
+import org.apache.giraph.ooc.OutOfCoreEngine;
 import org.apache.giraph.partition.PartitionOwner;
 import org.apache.giraph.partition.PartitionStats;
 import org.apache.giraph.partition.PartitionStore;
@@ -70,6 +75,11 @@ import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.log4j.PatternLayout;
 
+import javax.management.Notification;
+import javax.management.NotificationEmitter;
+import javax.management.NotificationListener;
+import javax.management.openmbean.CompositeData;
+
 /**
  * The Giraph-specific business logic for a single BSP
  * compute node in whatever underlying type of cluster
@@ -110,6 +120,8 @@ end[PURE_YARN]*/
       "time-to-first-message-ms";
   /** Name of metric for time from first message till last message flushed */
   public static final String TIMER_COMMUNICATION_TIME = "communication-time-ms";
+  /** Name of metric for time spent doing GC per superstep in msec */
+  public static final String TIMER_SUPERSTEP_GC_TIME = "superstep-gc-time-ms";
 
   /** Class logger */
   private static final Logger LOG = Logger.getLogger(GraphTaskManager.class);
@@ -156,6 +168,8 @@ end[PURE_YARN]*/
   private GiraphTimerContext communicationTimerContext;
   /** Timer for WorkerContext#preSuperstep() */
   private GiraphTimer wcPreSuperstepTimer;
+  /** Counter to keep aggregated time spent in GC in a superstep */
+  private Counter gcTimeMetric;
   /** The Hadoop Mapper#Context for this job */
   private final Mapper<?, ?, ?, ?>.Context context;
   /** is this GraphTaskManager the master? */
@@ -293,7 +307,9 @@ end[PURE_YARN]*/
       return;
     }
     preLoadOnWorkerObservers();
+    GiraphTimerContext superstepTimerContext = superstepTimer.time();
     finishedSuperstepStats = serviceWorker.setup();
+    superstepTimerContext.stop();
     if (collectInputSuperstepStats(finishedSuperstepStats)) {
       return;
     }
@@ -304,8 +320,7 @@ end[PURE_YARN]*/
     // main superstep processing loop
     while (!finishedSuperstepStats.allVerticesHalted()) {
       final long superstep = serviceWorker.getSuperstep();
-      GiraphTimerContext superstepTimerContext =
-        getTimerForThisSuperstep(superstep);
+      superstepTimerContext = getTimerForThisSuperstep(superstep);
       GraphState graphState = new GraphState(superstep,
           finishedSuperstepStats.getVertexCount(),
           finishedSuperstepStats.getEdgeCount(),
@@ -619,6 +634,7 @@ end[PURE_YARN]*/
         LOG.info("setup: Starting up BspServiceWorker...");
       }
       serviceWorker = new BspServiceWorker<I, V, E>(context, this);
+      installGCMonitoring();
       if (LOG.isInfoEnabled()) {
         LOG.info("setup: Registering health of this worker...");
       }
@@ -626,6 +642,45 @@ end[PURE_YARN]*/
   }
 
   /**
+   * Install GC monitoring. This method intercepts all GCs, logs each GC, and
+   * notifies the out-of-core engine (if any is used) about the GC.
+   */
+  private void installGCMonitoring() {
+    List<GarbageCollectorMXBean> mxBeans = ManagementFactory
+        .getGarbageCollectorMXBeans();
+    final OutOfCoreEngine oocEngine =
+        serviceWorker.getServerData().getOocEngine();
+    for (GarbageCollectorMXBean gcBean : mxBeans) {
+      NotificationEmitter emitter = (NotificationEmitter) gcBean;
+      NotificationListener listener = new NotificationListener() {
+        @Override
+        public void handleNotification(Notification notification,
+                                       Object handle) {
+          if (notification.getType().equals(GarbageCollectionNotificationInfo
+              .GARBAGE_COLLECTION_NOTIFICATION)) {
+            GarbageCollectionNotificationInfo info =
+                GarbageCollectionNotificationInfo.from(
+                    (CompositeData) notification.getUserData());
+
+            if (LOG.isInfoEnabled()) {
+              LOG.info("installGCMonitoring: name = " + info.getGcName() +
+                  ", action = " + info.getGcAction() + ", cause = " +
+                  info.getGcCause() + ", duration = " +
+                  info.getGcInfo().getDuration() + "ms");
+            }
+            gcTimeMetric.inc(info.getGcInfo().getDuration());
+            if (oocEngine != null) {
+              oocEngine.gcCompleted(info);
+            }
+          }
+        }
+      };
+      //Add the listener
+      emitter.addNotificationListener(listener, null, null);
+    }
+  }
+
+  /**
    * Initialize the root logger and appender to the settings in conf.
    */
   private void initializeAndConfigureLogging() {
@@ -680,6 +735,7 @@ end[PURE_YARN]*/
         TIMER_TIME_TO_FIRST_MSG, TimeUnit.MICROSECONDS);
     communicationTimer = new GiraphTimer(superstepMetrics,
         TIMER_COMMUNICATION_TIME, TimeUnit.MILLISECONDS);
+    gcTimeMetric = superstepMetrics.getCounter(TIMER_SUPERSTEP_GC_TIME);
     wcPreSuperstepTimer = new GiraphTimer(superstepMetrics,
         "worker-context-pre-superstep", TimeUnit.MILLISECONDS);
   }
@@ -1000,6 +1056,12 @@ end[PURE_YARN]*/
     return conf;
   }
 
+  /**
+   * @return Time spent in GC recorded by the GC listener
+   */
+  public long getSuperstepGCTime() {
+    return gcTimeMetric.count();
+  }
 
   /**
    * Default handler for uncaught exceptions.

http://git-wip-us.apache.org/repos/asf/giraph/blob/6256a761/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
index cf8d1bd..8372bd3 100644
--- a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
+++ b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
@@ -942,6 +942,10 @@ public class BspServiceMaster<I extends WritableComparable,
                   workerFinishedInfoObj.getString(
                       JSONOBJ_METRICS_KEY)),
               workerMetrics);
+          globalStats.addOocLoadBytesCount(
+              workerMetrics.getBytesLoadedFromDisk());
+          globalStats.addOocStoreBytesCount(
+              workerMetrics.getBytesStoredOnDisk());
           aggregatedMetrics.add(workerMetrics, hostnamePartitionId);
         }
       } catch (JSONException e) {
@@ -2050,5 +2054,9 @@ public class BspServiceMaster<I extends WritableComparable,
     gs.getAggregateSentMessages().increment(globalStats.getMessageCount());
     gs.getAggregateSentMessageBytes()
       .increment(globalStats.getMessageBytesCount());
+    gs.getAggregateOOCBytesLoaded()
+      .increment(globalStats.getOocLoadBytesCount());
+    gs.getAggregateOOCBytesStored()
+      .increment(globalStats.getOocStoreBytesCount());
   }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/6256a761/giraph-core/src/main/java/org/apache/giraph/metrics/AggregatedMetric.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/metrics/AggregatedMetric.java b/giraph-core/src/main/java/org/apache/giraph/metrics/AggregatedMetric.java
index 9d782c4..f99043f 100644
--- a/giraph-core/src/main/java/org/apache/giraph/metrics/AggregatedMetric.java
+++ b/giraph-core/src/main/java/org/apache/giraph/metrics/AggregatedMetric.java
@@ -20,41 +20,26 @@ package org.apache.giraph.metrics;
 
 /**
  * An aggregator over metrics from multiple hosts. Computes min, max, and mean.
+ *
+ * @param <T> value type
  */
-public class AggregatedMetric {
+public abstract class AggregatedMetric<T extends Number> {
   /** Minimum value seen with the host that it came from */
-  private ValueWithHostname min;
+  protected ValueWithHostname<T> min;
   /** Maximum value seen with the host that it came from */
-  private ValueWithHostname max;
+  protected ValueWithHostname<T> max;
   /** Total of all the values seen */
-  private long sum;
+  protected T sum;
   /** Number of values seen */
-  private long count;
-
-  /**
-   * Create new aggregated metric.
-   */
-  public AggregatedMetric() {
-    min = new ValueWithHostname(Long.MAX_VALUE);
-    max = new ValueWithHostname(Long.MIN_VALUE);
-  }
+  protected long count;
 
   /**
    * Add another item to the aggregation.
    *
-   * @param value long value to add
+   * @param value value to add
    * @param hostnamePartitionId String hostname it came from
    */
-  public void addItem(long value, String hostnamePartitionId) {
-    if (value < min.getValue()) {
-      min.set(value, hostnamePartitionId);
-    }
-    if (value > max.getValue()) {
-      max.set(value, hostnamePartitionId);
-    }
-    sum += value;
-    count++;
-  }
+  public abstract void addItem(T value, String hostnamePartitionId);
 
   /**
    * Whether this AggregatedMetric has any data.
@@ -70,7 +55,7 @@ public class AggregatedMetric {
    *
    * @return ValueWithHostname for minimum
    */
-  public ValueWithHostname min() {
+  public ValueWithHostname<T> min() {
     return min;
   }
 
@@ -79,16 +64,16 @@ public class AggregatedMetric {
    *
    * @return ValueWithHostname for maximum
    */
-  public ValueWithHostname max() {
+  public ValueWithHostname<T> max() {
     return max;
   }
 
   /**
    * Get total of all the values seen
    *
-   * @return long total of values seen
+   * @return total of values seen
    */
-  public long sum() {
+  public T sum() {
     return sum;
   }
 
@@ -97,9 +82,7 @@ public class AggregatedMetric {
    *
    * @return computed average of all the values
    */
-  public double mean() {
-    return sum / (double) count;
-  }
+  public abstract double mean();
 
   /**
    * Get number of values seen
@@ -110,3 +93,4 @@ public class AggregatedMetric {
     return count;
   }
 }
+

http://git-wip-us.apache.org/repos/asf/giraph/blob/6256a761/giraph-core/src/main/java/org/apache/giraph/metrics/AggregatedMetricDouble.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/metrics/AggregatedMetricDouble.java b/giraph-core/src/main/java/org/apache/giraph/metrics/AggregatedMetricDouble.java
new file mode 100644
index 0000000..abd0ff6
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/metrics/AggregatedMetricDouble.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.metrics;
+
+/**
+ * Aggregator over metrics with double values
+ */
+public final class AggregatedMetricDouble extends AggregatedMetric<Double> {
+  /**
+   * Constructor. Max starts at -Double.MAX_VALUE (MIN_VALUE is positive).
+   */
+  public AggregatedMetricDouble() {
+    min = new ValueWithHostname<>(Double.MAX_VALUE);
+    max = new ValueWithHostname<>(-Double.MAX_VALUE);
+    sum = 0.0;
+  }
+
+  @Override
+  public void addItem(Double value, String hostnamePartitionId) {
+    if (value < min.getValue()) {
+      min.set(value, hostnamePartitionId);
+    }
+    if (value > max.getValue()) {
+      max.set(value, hostnamePartitionId);
+    }
+    sum += value;
+    count++;
+  }
+
+  @Override
+  public double mean() {
+    return sum / count;
+  }
+}