You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ar...@apache.org on 2015/04/21 20:18:25 UTC
[3/3] hadoop git commit: HDFS-8163. Using monotonicNow for block
report scheduling causes test failures on recently restarted systems. (Arpit
Agarwal)
HDFS-8163. Using monotonicNow for block report scheduling causes test failures on recently restarted systems. (Arpit Agarwal)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/b17d365f
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/b17d365f
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/b17d365f
Branch: refs/heads/branch-2.7
Commit: b17d365fa1d762ed56fc2077fb0f8396383602f3
Parents: 7dd5f42
Author: Arpit Agarwal <ar...@apache.org>
Authored: Tue Apr 21 10:58:05 2015 -0700
Committer: Arpit Agarwal <ar...@apache.org>
Committed: Tue Apr 21 11:17:27 2015 -0700
----------------------------------------------------------------------
.../main/java/org/apache/hadoop/util/Time.java | 2 +
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 +
.../hdfs/server/datanode/BPOfferService.java | 2 +-
.../hdfs/server/datanode/BPServiceActor.java | 203 ++++++++++++-------
.../datanode/TestBpServiceActorScheduler.java | 163 +++++++++++++++
5 files changed, 301 insertions(+), 72 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b17d365f/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/Time.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/Time.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/Time.java
index 347cdc5..987e1b0 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/Time.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/Time.java
@@ -42,6 +42,8 @@ public final class Time {
* milliseconds, and not affected by settimeofday or similar system clock
* changes. This is appropriate to use when computing how much longer to
* wait for an interval to expire.
+ * This function can return a negative value and it must be handled correctly
+ * by callers. See the documentation of System#nanoTime for caveats.
* @return a monotonic clock that counts in milliseconds.
*/
public static long monotonicNow() {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b17d365f/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index fb7e7e7..37fa10e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -38,6 +38,9 @@ Release 2.7.1 - UNRELEASED
HDFS-7916. 'reportBadBlocks' from datanodes to standby Node BPServiceActor
goes for infinite loop (vinayakumarb)
+ HDFS-8163. Using monotonicNow for block report scheduling causes
+ test failures on recently restarted systems. (Arpit Agarwal)
+
Release 2.7.0 - 2015-04-20
INCOMPATIBLE CHANGES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b17d365f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
index 3e7c897..36a868e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
@@ -429,7 +429,7 @@ class BPOfferService {
*/
void scheduleBlockReport(long delay) {
for (BPServiceActor actor : bpServices) {
- actor.scheduleBlockReport(delay);
+ actor.getScheduler().scheduleBlockReport(delay);
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b17d365f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
index 10cce45..49a1991 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
@@ -58,6 +58,7 @@ import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.VersionInfo;
import org.apache.hadoop.util.VersionUtil;
@@ -82,20 +83,13 @@ class BPServiceActor implements Runnable {
final BPOfferService bpos;
- // lastBlockReport, lastDeletedReport and lastHeartbeat may be assigned/read
- // by testing threads (through BPServiceActor#triggerXXX), while also
- // assigned/read by the actor thread. Thus they should be declared as volatile
- // to make sure the "happens-before" consistency.
- volatile long lastBlockReport = 0;
volatile long lastDeletedReport = 0;
- boolean resetBlockReportTime = true;
-
volatile long lastCacheReport = 0;
+ private final Scheduler scheduler;
Thread bpThread;
DatanodeProtocolClientSideTranslatorPB bpNamenode;
- private volatile long lastHeartbeat = 0;
static enum RunningState {
CONNECTING, INIT_FAILED, RUNNING, EXITED, FAILED;
@@ -129,6 +123,7 @@ class BPServiceActor implements Runnable {
this.dn = bpos.getDataNode();
this.nnAddr = nnAddr;
this.dnConf = dn.getDnConf();
+ scheduler = new Scheduler(dnConf.heartBeatInterval, dnConf.blockReportInterval);
}
boolean isAlive() {
@@ -231,33 +226,6 @@ class BPServiceActor implements Runnable {
register(nsInfo);
}
- // This is useful to make sure NN gets Heartbeat before Blockreport
- // upon NN restart while DN keeps retrying Otherwise,
- // 1. NN restarts.
- // 2. Heartbeat RPC will retry and succeed. NN asks DN to reregister.
- // 3. After reregistration completes, DN will send Blockreport first.
- // 4. Given NN receives Blockreport after Heartbeat, it won't mark
- // DatanodeStorageInfo#blockContentsStale to false until the next
- // Blockreport.
- void scheduleHeartbeat() {
- lastHeartbeat = 0;
- }
-
- /**
- * This methods arranges for the data node to send the block report at
- * the next heartbeat.
- */
- void scheduleBlockReport(long delay) {
- if (delay > 0) { // send BR after random delay
- lastBlockReport = monotonicNow()
- - ( dnConf.blockReportInterval - DFSUtil.getRandom().nextInt((int)(delay)));
- } else { // send at next heartbeat
- lastBlockReport = lastHeartbeat - dnConf.blockReportInterval;
- }
- resetBlockReportTime = true; // reset future BRs for randomness
- }
-
-
/**
* Report received blocks and delete hints to the Namenode for each
* storage.
@@ -386,10 +354,10 @@ class BPServiceActor implements Runnable {
@VisibleForTesting
void triggerBlockReportForTests() {
synchronized (pendingIncrementalBRperStorage) {
- lastBlockReport = 0;
- lastHeartbeat = 0;
+ scheduler.scheduleHeartbeat();
+ long nextBlockReportTime = scheduler.scheduleBlockReport(0);
pendingIncrementalBRperStorage.notifyAll();
- while (lastBlockReport == 0) {
+ while (nextBlockReportTime - scheduler.nextBlockReportTime >= 0) {
try {
pendingIncrementalBRperStorage.wait(100);
} catch (InterruptedException e) {
@@ -402,9 +370,9 @@ class BPServiceActor implements Runnable {
@VisibleForTesting
void triggerHeartbeatForTests() {
synchronized (pendingIncrementalBRperStorage) {
- lastHeartbeat = 0;
+ final long nextHeartbeatTime = scheduler.scheduleHeartbeat();
pendingIncrementalBRperStorage.notifyAll();
- while (lastHeartbeat == 0) {
+ while (nextHeartbeatTime - scheduler.nextHeartbeatTime >= 0) {
try {
pendingIncrementalBRperStorage.wait(100);
} catch (InterruptedException e) {
@@ -453,8 +421,8 @@ class BPServiceActor implements Runnable {
*/
List<DatanodeCommand> blockReport() throws IOException {
// send block report if timer has expired.
- final long startTime = monotonicNow();
- if (startTime - lastBlockReport <= dnConf.blockReportInterval) {
+ final long startTime = scheduler.monotonicNow();
+ if (!scheduler.isBlockReportDue()) {
return null;
}
@@ -536,29 +504,10 @@ class BPServiceActor implements Runnable {
(nCmds + " commands: " + Joiner.on("; ").join(cmds)))) +
".");
}
- scheduleNextBlockReport(startTime);
+ scheduler.scheduleNextBlockReport();
return cmds.size() == 0 ? null : cmds;
}
- private void scheduleNextBlockReport(long previousReportStartTime) {
- // If we have sent the first set of block reports, then wait a random
- // time before we start the periodic block reports.
- if (resetBlockReportTime) {
- lastBlockReport = previousReportStartTime -
- DFSUtil.getRandom().nextInt((int)(dnConf.blockReportInterval));
- resetBlockReportTime = false;
- } else {
- /* say the last block report was at 8:20:14. The current report
- * should have started around 9:20:14 (default 1 hour interval).
- * If current time is :
- * 1) normal like 9:20:18, next report should be at 10:20:14
- * 2) unexpected like 11:35:43, next report should be at 12:20:14
- */
- lastBlockReport += (monotonicNow() - lastBlockReport) /
- dnConf.blockReportInterval * dnConf.blockReportInterval;
- }
- }
-
DatanodeCommand cacheReport() throws IOException {
// If caching is disabled, do not send a cache report
if (dn.getFSDataset().getCacheCapacity() == 0) {
@@ -685,12 +634,13 @@ class BPServiceActor implements Runnable {
//
while (shouldRun()) {
try {
- final long startTime = monotonicNow();
+ final long startTime = scheduler.monotonicNow();
//
// Every so often, send heartbeat or block-report
//
- if (startTime - lastHeartbeat >= dnConf.heartBeatInterval) {
+ final boolean sendHeartbeat = scheduler.isHeartbeatDue(startTime);
+ if (sendHeartbeat) {
//
// All heartbeat messages include following info:
// -- Datanode name
@@ -698,11 +648,11 @@ class BPServiceActor implements Runnable {
// -- Total capacity
// -- Bytes remaining
//
- lastHeartbeat = startTime;
+ scheduler.scheduleNextHeartbeat();
if (!dn.areHeartbeatsDisabledForTests()) {
HeartbeatResponse resp = sendHeartBeat();
assert resp != null;
- dn.getMetrics().addHeartbeat(monotonicNow() - startTime);
+ dn.getMetrics().addHeartbeat(scheduler.monotonicNow() - startTime);
// If the state of this NN has changed (eg STANDBY->ACTIVE)
// then let the BPOfferService update itself.
@@ -745,8 +695,7 @@ class BPServiceActor implements Runnable {
// There is no work to do; sleep until hearbeat timer elapses,
// or work arrives, and then iterate again.
//
- long waitTime = dnConf.heartBeatInterval -
- (monotonicNow() - lastHeartbeat);
+ long waitTime = scheduler.getHeartbeatWaitTime();
synchronized(pendingIncrementalBRperStorage) {
if (waitTime > 0 && !sendImmediateIBR) {
try {
@@ -819,7 +768,7 @@ class BPServiceActor implements Runnable {
bpos.registrationSucceeded(this, bpRegistration);
// random short delay - helps scatter the BR from all DNs
- scheduleBlockReport(dnConf.initialBlockReportDelay);
+ scheduler.scheduleBlockReport(dnConf.initialBlockReportDelay);
}
@@ -934,7 +883,7 @@ class BPServiceActor implements Runnable {
NamespaceInfo nsInfo = retrieveNamespaceInfo();
// and re-register
register(nsInfo);
- scheduleHeartbeat();
+ scheduler.scheduleHeartbeat();
}
}
@@ -1012,7 +961,7 @@ class BPServiceActor implements Runnable {
} else {
LOG.info(bpos.toString() + ": scheduling a full block report.");
synchronized(pendingIncrementalBRperStorage) {
- lastBlockReport = 0;
+ scheduler.scheduleBlockReport(0);
pendingIncrementalBRperStorage.notifyAll();
}
}
@@ -1043,4 +992,116 @@ class BPServiceActor implements Runnable {
}
}
}
+
+ Scheduler getScheduler() {
+ return scheduler;
+ }
+
+ /**
+ * Utility class that wraps the timestamp computations for scheduling
+ * heartbeats and block reports.
+ */
+ static class Scheduler {
+ // nextBlockReportTime and nextHeartbeatTime may be assigned/read
+ // by testing threads (through BPServiceActor#triggerXXX), while also
+ // assigned/read by the actor thread.
+ @VisibleForTesting
+ volatile long nextBlockReportTime = monotonicNow();
+
+ @VisibleForTesting
+ volatile long nextHeartbeatTime = monotonicNow();
+
+ @VisibleForTesting
+ boolean resetBlockReportTime = true;
+
+ private final long heartbeatIntervalMs;
+ private final long blockReportIntervalMs;
+
+ Scheduler(long heartbeatIntervalMs, long blockReportIntervalMs) {
+ this.heartbeatIntervalMs = heartbeatIntervalMs;
+ this.blockReportIntervalMs = blockReportIntervalMs;
+ }
+
+ // This is useful to make sure NN gets Heartbeat before Blockreport
+ // upon NN restart while DN keeps retrying. Otherwise,
+ // 1. NN restarts.
+ // 2. Heartbeat RPC will retry and succeed. NN asks DN to reregister.
+ // 3. After reregistration completes, DN will send Blockreport first.
+ // 4. Given NN receives Blockreport after Heartbeat, it won't mark
+ // DatanodeStorageInfo#blockContentsStale to false until the next
+ // Blockreport.
+ long scheduleHeartbeat() {
+ nextHeartbeatTime = monotonicNow();
+ return nextHeartbeatTime;
+ }
+
+ long scheduleNextHeartbeat() {
+ // Numerical overflow is possible here and is okay.
+ nextHeartbeatTime += heartbeatIntervalMs;
+ return nextHeartbeatTime;
+ }
+
+ boolean isHeartbeatDue(long startTime) {
+ return (nextHeartbeatTime - startTime <= 0);
+ }
+
+ boolean isBlockReportDue() {
+ return nextBlockReportTime - monotonicNow() <= 0;
+ }
+
+ /**
+ * Arranges for the data node to send a block report: after a random delay in
+ * [0, delay) ms when delay is positive, otherwise at the next heartbeat.
+ */
+ long scheduleBlockReport(long delay) {
+ if (delay > 0) { // send BR after random delay
+ // Numerical overflow is possible here and is okay.
+ nextBlockReportTime =
+ monotonicNow() + DFSUtil.getRandom().nextInt((int) (delay));
+ } else { // send at next heartbeat
+ nextBlockReportTime = monotonicNow();
+ }
+ resetBlockReportTime = true; // reset future BRs for randomness
+ return nextBlockReportTime;
+ }
+
+ /**
+ * Schedule the next block report after the block report interval. If the
+ * current block report was delayed then the next block report is sent per
+ * the original schedule.
+ * Numerical overflow is possible here and is okay.
+ */
+ void scheduleNextBlockReport() {
+ // If we have sent the first set of block reports, then wait a random
+ // time before we start the periodic block reports.
+ if (resetBlockReportTime) {
+ nextBlockReportTime = monotonicNow() +
+ DFSUtil.getRandom().nextInt((int)(blockReportIntervalMs));
+ resetBlockReportTime = false;
+ } else {
+ /* say the last block report was at 8:20:14. The current report
+ * should have started around 9:20:14 (default 1 hour interval).
+ * If current time is :
+ * 1) normal like 9:20:18, next report should be at 10:20:14
+ * 2) unexpected like 11:35:43, next report should be at 12:20:14
+ */
+ nextBlockReportTime +=
+ (((monotonicNow() - nextBlockReportTime + blockReportIntervalMs) /
+ blockReportIntervalMs)) * blockReportIntervalMs;
+ }
+ }
+
+ long getHeartbeatWaitTime() {
+ return nextHeartbeatTime - monotonicNow();
+ }
+
+ /**
+ * Wrapped for testing.
+ * @return the current value of Time#monotonicNow()
+ */
+ @VisibleForTesting
+ public long monotonicNow() {
+ return Time.monotonicNow();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b17d365f/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBpServiceActorScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBpServiceActorScheduler.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBpServiceActorScheduler.java
new file mode 100644
index 0000000..0d7484c
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBpServiceActorScheduler.java
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+
+import static java.lang.Math.abs;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
+import org.apache.hadoop.hdfs.server.datanode.BPServiceActor.Scheduler;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+
+
+/**
+ * Verify the block report and heartbeat scheduling logic of BPServiceActor
+ * using a few different values.
+ */
+public class TestBpServiceActorScheduler {
+ protected static final Log LOG = LogFactory.getLog(TestBpServiceActorScheduler.class);
+
+ @Rule
+ public Timeout timeout = new Timeout(300000);
+
+ private static final long HEARTBEAT_INTERVAL_MS = 5000; // 5 seconds
+ private static final long BLOCK_REPORT_INTERVAL_MS = 10000; // 10 seconds
+ private final Random random = new Random(System.nanoTime());
+
+ @Test
+ public void testInit() {
+ for (final long now : getTimestamps()) {
+ Scheduler scheduler = makeMockScheduler(now);
+ assertTrue(scheduler.isHeartbeatDue(now));
+ assertTrue(scheduler.isBlockReportDue());
+ }
+ }
+
+ @Test
+ public void testScheduleBlockReportImmediate() {
+ for (final long now : getTimestamps()) {
+ Scheduler scheduler = makeMockScheduler(now);
+ scheduler.scheduleBlockReport(0);
+ assertTrue(scheduler.resetBlockReportTime);
+ assertThat(scheduler.nextBlockReportTime, is(now));
+ }
+ }
+
+ @Test
+ public void testScheduleBlockReportDelayed() {
+ for (final long now : getTimestamps()) {
+ Scheduler scheduler = makeMockScheduler(now);
+ final long delayMs = 10;
+ scheduler.scheduleBlockReport(delayMs);
+ assertTrue(scheduler.resetBlockReportTime);
+ assertTrue(scheduler.nextBlockReportTime - now >= 0);
+ assertTrue(scheduler.nextBlockReportTime - (now + delayMs) < 0);
+ }
+ }
+
+ /**
+ * If resetBlockReportTime is true then the next block report must be scheduled
+ * in the range [now, now + BLOCK_REPORT_INTERVAL_MS).
+ */
+ @Test
+ public void testScheduleNextBlockReport() {
+ for (final long now : getTimestamps()) {
+ Scheduler scheduler = makeMockScheduler(now);
+ assertTrue(scheduler.resetBlockReportTime);
+ scheduler.scheduleNextBlockReport();
+ assertTrue(scheduler.nextBlockReportTime - (now + BLOCK_REPORT_INTERVAL_MS) < 0);
+ }
+ }
+
+ /**
+ * If resetBlockReportTime is false then the next block report must be scheduled
+ * exactly at (now + BLOCK_REPORT_INTERVAL_MS).
+ */
+ @Test
+ public void testScheduleNextBlockReport2() {
+ for (final long now : getTimestamps()) {
+ Scheduler scheduler = makeMockScheduler(now);
+ scheduler.resetBlockReportTime = false;
+ scheduler.scheduleNextBlockReport();
+ assertThat(scheduler.nextBlockReportTime, is(now + BLOCK_REPORT_INTERVAL_MS));
+ }
+ }
+
+ /**
+ * Tests the case when a block report was delayed past its scheduled time.
+ * In that case the next block report should not be delayed for a full interval.
+ */
+ @Test
+ public void testScheduleNextBlockReport3() {
+ for (final long now : getTimestamps()) {
+ Scheduler scheduler = makeMockScheduler(now);
+ scheduler.resetBlockReportTime = false;
+
+ // Make it look like the block report was scheduled to be sent between 1-3
+ // intervals ago but sent just now.
+ final long blockReportDelay =
+ BLOCK_REPORT_INTERVAL_MS + random.nextInt(2 * (int) BLOCK_REPORT_INTERVAL_MS);
+ final long origBlockReportTime = now - blockReportDelay;
+ scheduler.nextBlockReportTime = origBlockReportTime;
+ scheduler.scheduleNextBlockReport();
+ assertTrue(scheduler.nextBlockReportTime - now < BLOCK_REPORT_INTERVAL_MS);
+ assertTrue(((scheduler.nextBlockReportTime - origBlockReportTime) % BLOCK_REPORT_INTERVAL_MS) == 0);
+ }
+ }
+
+ @Test
+ public void testScheduleHeartbeat() {
+ for (final long now : getTimestamps()) {
+ Scheduler scheduler = makeMockScheduler(now);
+ scheduler.scheduleNextHeartbeat();
+ assertFalse(scheduler.isHeartbeatDue(now));
+ scheduler.scheduleHeartbeat();
+ assertTrue(scheduler.isHeartbeatDue(now));
+ }
+ }
+
+ private Scheduler makeMockScheduler(long now) {
+ LOG.info("Using now = " + now);
+ Scheduler mockScheduler = spy(new Scheduler(HEARTBEAT_INTERVAL_MS, BLOCK_REPORT_INTERVAL_MS));
+ doReturn(now).when(mockScheduler).monotonicNow();
+ mockScheduler.nextBlockReportTime = now;
+ mockScheduler.nextHeartbeatTime = now;
+ return mockScheduler;
+ }
+
+ List<Long> getTimestamps() {
+ return Arrays.asList(
+ 0L, Long.MIN_VALUE, Long.MAX_VALUE, // test boundaries
+ Long.MAX_VALUE - 1, // test integer overflow
+ abs(random.nextLong()), // positive random
+ -abs(random.nextLong())); // negative random
+ }
+}