You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ar...@apache.org on 2015/04/21 20:18:24 UTC

[2/3] hadoop git commit: HDFS-8163. Using monotonicNow for block report scheduling causes test failures on recently restarted systems. (Arpit Agarwal)

HDFS-8163. Using monotonicNow for block report scheduling causes test failures on recently restarted systems. (Arpit Agarwal)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/7b3acc5c
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/7b3acc5c
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/7b3acc5c

Branch: refs/heads/branch-2
Commit: 7b3acc5c9dc6240fc15ff399061ffc8b5017bfd8
Parents: e7bb0fc
Author: Arpit Agarwal <ar...@apache.org>
Authored: Tue Apr 21 10:58:05 2015 -0700
Committer: Arpit Agarwal <ar...@apache.org>
Committed: Tue Apr 21 10:58:12 2015 -0700

----------------------------------------------------------------------
 .../main/java/org/apache/hadoop/util/Time.java  |   2 +
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |   3 +
 .../hdfs/server/datanode/BPOfferService.java    |   2 +-
 .../hdfs/server/datanode/BPServiceActor.java    | 203 ++++++++++++-------
 .../datanode/TestBpServiceActorScheduler.java   | 163 +++++++++++++++
 5 files changed, 299 insertions(+), 74 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/7b3acc5c/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/Time.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/Time.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/Time.java
index b988923..20e2965 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/Time.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/Time.java
@@ -47,6 +47,8 @@ public final class Time {
    * milliseconds, and not affected by settimeofday or similar system clock
    * changes.  This is appropriate to use when computing how much longer to
    * wait for an interval to expire.
+   * This function can return a negative value and it must be handled correctly
+   * by callers. See the documentation of System#nanoTime for caveats.
    * @return a monotonic clock that counts in milliseconds.
    */
   public static long monotonicNow() {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7b3acc5c/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index d8558e1..51168e2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -248,6 +248,9 @@ Release 2.7.1 - UNRELEASED
     HDFS-7916. 'reportBadBlocks' from datanodes to standby Node BPServiceActor
     goes for infinite loop (vinayakumarb)
 
+    HDFS-8163. Using monotonicNow for block report scheduling causes
+    test failures on recently restarted systems. (Arpit Agarwal)
+
 Release 2.7.0 - 2015-04-20
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7b3acc5c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
index 67979f3..5097e4a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
@@ -430,7 +430,7 @@ class BPOfferService {
    */
   void scheduleBlockReport(long delay) {
     for (BPServiceActor actor : bpServices) {
-      actor.scheduleBlockReport(delay);
+      actor.getScheduler().scheduleBlockReport(delay);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7b3acc5c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
index ba22225..5bc505f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
@@ -58,6 +58,7 @@ import org.apache.hadoop.hdfs.server.protocol.StorageReport;
 import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.util.Time;
 import org.apache.hadoop.util.VersionInfo;
 import org.apache.hadoop.util.VersionUtil;
 
@@ -82,19 +83,11 @@ class BPServiceActor implements Runnable {
 
   final BPOfferService bpos;
   
-  // lastBlockReport and lastHeartbeat may be assigned/read
-  // by testing threads (through BPServiceActor#triggerXXX), while also 
-  // assigned/read by the actor thread. Thus they should be declared as volatile
-  // to make sure the "happens-before" consistency.
-  volatile long lastBlockReport = 0;
-
-  boolean resetBlockReportTime = true;
-
   volatile long lastCacheReport = 0;
+  private final Scheduler scheduler;
 
   Thread bpThread;
   DatanodeProtocolClientSideTranslatorPB bpNamenode;
-  private volatile long lastHeartbeat = 0;
 
   static enum RunningState {
     CONNECTING, INIT_FAILED, RUNNING, EXITED, FAILED;
@@ -130,6 +123,7 @@ class BPServiceActor implements Runnable {
     this.nnAddr = nnAddr;
     this.dnConf = dn.getDnConf();
     prevBlockReportId = DFSUtil.getRandom().nextLong();
+    scheduler = new Scheduler(dnConf.heartBeatInterval, dnConf.blockReportInterval);
   }
 
   boolean isAlive() {
@@ -232,33 +226,6 @@ class BPServiceActor implements Runnable {
     register(nsInfo);
   }
 
-  // This is useful to make sure NN gets Heartbeat before Blockreport
-  // upon NN restart while DN keeps retrying Otherwise,
-  // 1. NN restarts.
-  // 2. Heartbeat RPC will retry and succeed. NN asks DN to reregister.
-  // 3. After reregistration completes, DN will send Blockreport first.
-  // 4. Given NN receives Blockreport after Heartbeat, it won't mark
-  //    DatanodeStorageInfo#blockContentsStale to false until the next
-  //    Blockreport.
-  void scheduleHeartbeat() {
-    lastHeartbeat = 0;
-  }
-
-  /**
-   * This methods  arranges for the data node to send the block report at 
-   * the next heartbeat.
-   */
-  void scheduleBlockReport(long delay) {
-    if (delay > 0) { // send BR after random delay
-      lastBlockReport = monotonicNow()
-      - ( dnConf.blockReportInterval - DFSUtil.getRandom().nextInt((int)(delay)));
-    } else { // send at next heartbeat
-      lastBlockReport = lastHeartbeat - dnConf.blockReportInterval;
-    }
-    resetBlockReportTime = true; // reset future BRs for randomness
-  }
-
-  
   /**
    * Report received blocks and delete hints to the Namenode for each
    * storage.
@@ -387,10 +354,10 @@ class BPServiceActor implements Runnable {
   @VisibleForTesting
   void triggerBlockReportForTests() {
     synchronized (pendingIncrementalBRperStorage) {
-      lastBlockReport = 0;
-      lastHeartbeat = 0;
+      scheduler.scheduleHeartbeat();
+      long nextBlockReportTime = scheduler.scheduleBlockReport(0);
       pendingIncrementalBRperStorage.notifyAll();
-      while (lastBlockReport == 0) {
+      while (nextBlockReportTime - scheduler.nextBlockReportTime >= 0) {
         try {
           pendingIncrementalBRperStorage.wait(100);
         } catch (InterruptedException e) {
@@ -403,9 +370,9 @@ class BPServiceActor implements Runnable {
   @VisibleForTesting
   void triggerHeartbeatForTests() {
     synchronized (pendingIncrementalBRperStorage) {
-      lastHeartbeat = 0;
+      final long nextHeartbeatTime = scheduler.scheduleHeartbeat();
       pendingIncrementalBRperStorage.notifyAll();
-      while (lastHeartbeat == 0) {
+      while (nextHeartbeatTime - scheduler.nextHeartbeatTime >= 0) {
         try {
           pendingIncrementalBRperStorage.wait(100);
         } catch (InterruptedException e) {
@@ -454,8 +421,7 @@ class BPServiceActor implements Runnable {
    */
   List<DatanodeCommand> blockReport() throws IOException {
     // send block report if timer has expired.
-    final long startTime = monotonicNow();
-    if (startTime - lastBlockReport <= dnConf.blockReportInterval) {
+    if (!scheduler.isBlockReportDue()) {
       return null;
     }
 
@@ -536,29 +502,10 @@ class BPServiceActor implements Runnable {
                   (nCmds + " commands: " + Joiner.on("; ").join(cmds)))) +
           ".");
     }
-    scheduleNextBlockReport(startTime);
+    scheduler.scheduleNextBlockReport();
     return cmds.size() == 0 ? null : cmds;
   }
 
-  private void scheduleNextBlockReport(long previousReportStartTime) {
-    // If we have sent the first set of block reports, then wait a random
-    // time before we start the periodic block reports.
-    if (resetBlockReportTime) {
-      lastBlockReport = previousReportStartTime -
-          DFSUtil.getRandom().nextInt((int)(dnConf.blockReportInterval));
-      resetBlockReportTime = false;
-    } else {
-      /* say the last block report was at 8:20:14. The current report
-       * should have started around 9:20:14 (default 1 hour interval).
-       * If current time is :
-       *   1) normal like 9:20:18, next report should be at 10:20:14
-       *   2) unexpected like 11:35:43, next report should be at 12:20:14
-       */
-      lastBlockReport += (monotonicNow() - lastBlockReport) /
-          dnConf.blockReportInterval * dnConf.blockReportInterval;
-    }
-  }
-
   DatanodeCommand cacheReport() throws IOException {
     // If caching is disabled, do not send a cache report
     if (dn.getFSDataset().getCacheCapacity() == 0) {
@@ -686,13 +633,12 @@ class BPServiceActor implements Runnable {
     //
     while (shouldRun()) {
       try {
-        final long startTime = monotonicNow();
+        final long startTime = scheduler.monotonicNow();
 
         //
         // Every so often, send heartbeat or block-report
         //
-        boolean sendHeartbeat =
-            startTime - lastHeartbeat >= dnConf.heartBeatInterval;
+        final boolean sendHeartbeat = scheduler.isHeartbeatDue(startTime);
         if (sendHeartbeat) {
           //
           // All heartbeat messages include following info:
@@ -701,11 +647,11 @@ class BPServiceActor implements Runnable {
           // -- Total capacity
           // -- Bytes remaining
           //
-          lastHeartbeat = startTime;
+          scheduler.scheduleNextHeartbeat();
           if (!dn.areHeartbeatsDisabledForTests()) {
             HeartbeatResponse resp = sendHeartBeat();
             assert resp != null;
-            dn.getMetrics().addHeartbeat(monotonicNow() - startTime);
+            dn.getMetrics().addHeartbeat(scheduler.monotonicNow() - startTime);
 
             // If the state of this NN has changed (eg STANDBY->ACTIVE)
             // then let the BPOfferService update itself.
@@ -746,8 +692,7 @@ class BPServiceActor implements Runnable {
         // There is no work to do;  sleep until hearbeat timer elapses, 
         // or work arrives, and then iterate again.
         //
-        long waitTime = dnConf.heartBeatInterval - 
-        (monotonicNow() - lastHeartbeat);
+        long waitTime = scheduler.getHeartbeatWaitTime();
         synchronized(pendingIncrementalBRperStorage) {
           if (waitTime > 0 && !sendImmediateIBR) {
             try {
@@ -820,7 +765,7 @@ class BPServiceActor implements Runnable {
     bpos.registrationSucceeded(this, bpRegistration);
 
     // random short delay - helps scatter the BR from all DNs
-    scheduleBlockReport(dnConf.initialBlockReportDelay);
+    scheduler.scheduleBlockReport(dnConf.initialBlockReportDelay);
   }
 
 
@@ -935,7 +880,7 @@ class BPServiceActor implements Runnable {
       NamespaceInfo nsInfo = retrieveNamespaceInfo();
       // and re-register
       register(nsInfo);
-      scheduleHeartbeat();
+      scheduler.scheduleHeartbeat();
     }
   }
 
@@ -1013,7 +958,7 @@ class BPServiceActor implements Runnable {
     } else {
       LOG.info(bpos.toString() + ": scheduling a full block report.");
       synchronized(pendingIncrementalBRperStorage) {
-        lastBlockReport = 0;
+        scheduler.scheduleBlockReport(0);
         pendingIncrementalBRperStorage.notifyAll();
       }
     }
@@ -1044,4 +989,116 @@ class BPServiceActor implements Runnable {
       }
     }
   }
+
+  Scheduler getScheduler() {
+    return scheduler;
+  }
+
+  /**
+   * Utility class that wraps the timestamp computations for scheduling
+   * heartbeats and block reports.
+   */
+  static class Scheduler {
+    // nextBlockReportTime and nextHeartbeatTime may be assigned/read
+    // by testing threads (through BPServiceActor#triggerXXX), while also
+    // assigned/read by the actor thread.
+    @VisibleForTesting
+    volatile long nextBlockReportTime = monotonicNow();
+
+    @VisibleForTesting
+    volatile long nextHeartbeatTime = monotonicNow();
+
+    @VisibleForTesting
+    boolean resetBlockReportTime = true;
+
+    private final long heartbeatIntervalMs;
+    private final long blockReportIntervalMs;
+
+    Scheduler(long heartbeatIntervalMs, long blockReportIntervalMs) {
+      this.heartbeatIntervalMs = heartbeatIntervalMs;
+      this.blockReportIntervalMs = blockReportIntervalMs;
+    }
+
+    // This is useful to make sure NN gets Heartbeat before Blockreport
+    // upon NN restart while DN keeps retrying Otherwise,
+    // 1. NN restarts.
+    // 2. Heartbeat RPC will retry and succeed. NN asks DN to reregister.
+    // 3. After reregistration completes, DN will send Blockreport first.
+    // 4. Given NN receives Blockreport after Heartbeat, it won't mark
+    //    DatanodeStorageInfo#blockContentsStale to false until the next
+    //    Blockreport.
+    long scheduleHeartbeat() {
+      nextHeartbeatTime = monotonicNow();
+      return nextHeartbeatTime;
+    }
+
+    long scheduleNextHeartbeat() {
+      // Numerical overflow is possible here and is okay.
+      nextHeartbeatTime += heartbeatIntervalMs;
+      return nextHeartbeatTime;
+    }
+
+    boolean isHeartbeatDue(long startTime) {
+      return (nextHeartbeatTime - startTime <= 0);
+    }
+
+    boolean isBlockReportDue() {
+      return nextBlockReportTime - monotonicNow() <= 0;
+    }
+
+    /**
+     * This methods  arranges for the data node to send the block report at
+     * the next heartbeat.
+     */
+    long scheduleBlockReport(long delay) {
+      if (delay > 0) { // send BR after random delay
+        // Numerical overflow is possible here and is okay.
+        nextBlockReportTime =
+            monotonicNow() + DFSUtil.getRandom().nextInt((int) (delay));
+      } else { // send at next heartbeat
+        nextBlockReportTime = monotonicNow();
+      }
+      resetBlockReportTime = true; // reset future BRs for randomness
+      return nextBlockReportTime;
+    }
+
+    /**
+     * Schedule the next block report after the block report interval. If the
+     * current block report was delayed then the next block report is sent per
+     * the original schedule.
+     * Numerical overflow is possible here.
+     */
+    void scheduleNextBlockReport() {
+      // If we have sent the first set of block reports, then wait a random
+      // time before we start the periodic block reports.
+      if (resetBlockReportTime) {
+        nextBlockReportTime = monotonicNow() +
+            DFSUtil.getRandom().nextInt((int)(blockReportIntervalMs));
+        resetBlockReportTime = false;
+      } else {
+        /* say the last block report was at 8:20:14. The current report
+         * should have started around 9:20:14 (default 1 hour interval).
+         * If current time is :
+         *   1) normal like 9:20:18, next report should be at 10:20:14
+         *   2) unexpected like 11:35:43, next report should be at 12:20:14
+         */
+        nextBlockReportTime +=
+              (((monotonicNow() - nextBlockReportTime + blockReportIntervalMs) /
+                  blockReportIntervalMs)) * blockReportIntervalMs;
+      }
+    }
+
+    long getHeartbeatWaitTime() {
+      return nextHeartbeatTime - monotonicNow();
+    }
+
+    /**
+     * Wrapped for testing.
+     * @return
+     */
+    @VisibleForTesting
+    public long monotonicNow() {
+      return Time.monotonicNow();
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7b3acc5c/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBpServiceActorScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBpServiceActorScheduler.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBpServiceActorScheduler.java
new file mode 100644
index 0000000..0d7484c
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBpServiceActorScheduler.java
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+
+import static java.lang.Math.abs;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
+import org.apache.hadoop.hdfs.server.datanode.BPServiceActor.Scheduler;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+
+
+/**
+ * Verify the block report and heartbeat scheduling logic of BPServiceActor
+ * using a few different values .
+ */
+public class TestBpServiceActorScheduler {
+  protected static final Log LOG = LogFactory.getLog(TestBpServiceActorScheduler.class);
+
+  @Rule
+  public Timeout timeout = new Timeout(300000);
+
+  private static final long HEARTBEAT_INTERVAL_MS = 5000;      // 5 seconds
+  private static final long BLOCK_REPORT_INTERVAL_MS = 10000;  // 10 seconds
+  private final Random random = new Random(System.nanoTime());
+
+  @Test
+  public void testInit() {
+    for (final long now : getTimestamps()) {
+      Scheduler scheduler = makeMockScheduler(now);
+      assertTrue(scheduler.isHeartbeatDue(now));
+      assertTrue(scheduler.isBlockReportDue());
+    }
+  }
+
+  @Test
+  public void testScheduleBlockReportImmediate() {
+    for (final long now : getTimestamps()) {
+      Scheduler scheduler = makeMockScheduler(now);
+      scheduler.scheduleBlockReport(0);
+      assertTrue(scheduler.resetBlockReportTime);
+      assertThat(scheduler.nextBlockReportTime, is(now));
+    }
+  }
+
+  @Test
+  public void testScheduleBlockReportDelayed() {
+    for (final long now : getTimestamps()) {
+      Scheduler scheduler = makeMockScheduler(now);
+      final long delayMs = 10;
+      scheduler.scheduleBlockReport(delayMs);
+      assertTrue(scheduler.resetBlockReportTime);
+      assertTrue(scheduler.nextBlockReportTime - now >= 0);
+      assertTrue(scheduler.nextBlockReportTime - (now + delayMs) < 0);
+    }
+  }
+
+  /**
+   * If resetBlockReportTime is true then the next block report must be scheduled
+   * in the range [now, now + BLOCK_REPORT_INTERVAL_SEC).
+   */
+  @Test
+  public void testScheduleNextBlockReport() {
+    for (final long now : getTimestamps()) {
+      Scheduler scheduler = makeMockScheduler(now);
+      assertTrue(scheduler.resetBlockReportTime);
+      scheduler.scheduleNextBlockReport();
+      assertTrue(scheduler.nextBlockReportTime - (now + BLOCK_REPORT_INTERVAL_MS) < 0);
+    }
+  }
+
+  /**
+   * If resetBlockReportTime is false then the next block report must be scheduled
+   * exactly at (now + BLOCK_REPORT_INTERVAL_SEC).
+   */
+  @Test
+  public void testScheduleNextBlockReport2() {
+    for (final long now : getTimestamps()) {
+      Scheduler scheduler = makeMockScheduler(now);
+      scheduler.resetBlockReportTime = false;
+      scheduler.scheduleNextBlockReport();
+      assertThat(scheduler.nextBlockReportTime, is(now + BLOCK_REPORT_INTERVAL_MS));
+    }
+  }
+
+  /**
+   * Tests the case when a block report was delayed past its scheduled time.
+   * In that case the next block report should not be delayed for a full interval.
+   */
+  @Test
+  public void testScheduleNextBlockReport3() {
+    for (final long now : getTimestamps()) {
+      Scheduler scheduler = makeMockScheduler(now);
+      scheduler.resetBlockReportTime = false;
+
+      // Make it look like the block report was scheduled to be sent between 1-3
+      // intervals ago but sent just now.
+      final long blockReportDelay =
+          BLOCK_REPORT_INTERVAL_MS + random.nextInt(2 * (int) BLOCK_REPORT_INTERVAL_MS);
+      final long origBlockReportTime = now - blockReportDelay;
+      scheduler.nextBlockReportTime = origBlockReportTime;
+      scheduler.scheduleNextBlockReport();
+      assertTrue(scheduler.nextBlockReportTime - now < BLOCK_REPORT_INTERVAL_MS);
+      assertTrue(((scheduler.nextBlockReportTime - origBlockReportTime) % BLOCK_REPORT_INTERVAL_MS) == 0);
+    }
+  }
+
+  @Test
+  public void testScheduleHeartbeat() {
+    for (final long now : getTimestamps()) {
+      Scheduler scheduler = makeMockScheduler(now);
+      scheduler.scheduleNextHeartbeat();
+      assertFalse(scheduler.isHeartbeatDue(now));
+      scheduler.scheduleHeartbeat();
+      assertTrue(scheduler.isHeartbeatDue(now));
+    }
+  }
+
+  private Scheduler makeMockScheduler(long now) {
+    LOG.info("Using now = " + now);
+    Scheduler mockScheduler = spy(new Scheduler(HEARTBEAT_INTERVAL_MS, BLOCK_REPORT_INTERVAL_MS));
+    doReturn(now).when(mockScheduler).monotonicNow();
+    mockScheduler.nextBlockReportTime = now;
+    mockScheduler.nextHeartbeatTime = now;
+    return mockScheduler;
+  }
+
+  List<Long> getTimestamps() {
+    return Arrays.asList(
+        0L, Long.MIN_VALUE, Long.MAX_VALUE, // test boundaries
+        Long.MAX_VALUE - 1,                 // test integer overflow
+        abs(random.nextLong()),             // positive random
+        -abs(random.nextLong()));           // negative random
+  }
+}