You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2018/01/07 06:21:58 UTC

[bookkeeper] branch master updated: Introduce MockClock and MockExecutorController to improve timer-based testing

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 7fe87f9  Introduce MockClock and MockExecutorController to improve timer-based testing
7fe87f9 is described below

commit 7fe87f9354a04bb6789f8111242d35a1b730d3ba
Author: Sijie Guo <si...@apache.org>
AuthorDate: Sat Jan 6 22:21:52 2018 -0800

    Introduce MockClock and MockExecutorController to improve timer-based testing
    
    Descriptions of the changes in this PR:
    
    - introduce MockClock and MockExecutorController to control time advancing to trigger execution on tasks
    - convert LedgerDirsMonitor from using Thread to use ScheduledExecutorService (moved the logic in `run` method to `check` without code changes)
    - change TestLedgerDirsManager to use MockClock and MockExecutorController to remove the usage of `Thread.sleep`
    
    Master Issue:  #943
    
    Author: Sijie Guo <si...@apache.org>
    
    Reviewers: Jia Zhai <None>, Enrico Olivelli <eo...@gmail.com>
    
    This closes #944 from sijie/mock_scheduler
---
 .../common/testing/executors/MockClock.java        |  65 +++++++
 .../common/testing/executors/MockClockTest.java    |  46 +++++
 .../testing/executors/MockExecutorController.java  | 215 +++++++++++++++++++++
 .../executors/MockExecutorControllerTest.java      | 104 ++++++++++
 bookkeeper-server/pom.xml                          |   7 +
 .../bookkeeper/bookie/LedgerDirsMonitor.java       | 178 ++++++++---------
 .../bookkeeper/bookie/TestLedgerDirsManager.java   |  49 +++--
 7 files changed, 560 insertions(+), 104 deletions(-)

diff --git a/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/testing/executors/MockClock.java b/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/testing/executors/MockClock.java
new file mode 100644
index 0000000..7a640ca
--- /dev/null
+++ b/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/testing/executors/MockClock.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.common.testing.executors;
+
+import java.time.Clock;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.ZoneId;
+
+/**
+ * A mock implementation of {@link Clock}.
+ */
+public class MockClock extends Clock {
+
+    private final ZoneId zoneId;
+    private Instant now = Instant.ofEpochMilli(0);
+
+    public MockClock() {
+        this(ZoneId.systemDefault());
+    }
+
+    private MockClock(ZoneId zoneId) {
+        this.zoneId = zoneId;
+    }
+
+    @Override
+    public ZoneId getZone() {
+        return zoneId;
+    }
+
+    @Override
+    public MockClock withZone(ZoneId zone) {
+        return new MockClock(zone);
+    }
+
+    @Override
+    public Instant instant() {
+        return now;
+    }
+
+    /**
+     * Advance the clock by the given amount of time.
+     *
+     * @param duration duration to advance.
+     */
+    public void advance(Duration duration) {
+        now = now.plus(duration);
+    }
+}
diff --git a/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/testing/executors/MockClockTest.java b/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/testing/executors/MockClockTest.java
new file mode 100644
index 0000000..7ca3279
--- /dev/null
+++ b/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/testing/executors/MockClockTest.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.common.testing.executors;
+
+import static org.junit.Assert.assertEquals;
+
+import java.time.Duration;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Test {@link MockClock}.
+ */
+public class MockClockTest {
+
+    private MockClock clock;
+
+    @Before
+    public void setup() {
+        this.clock = new MockClock();
+    }
+
+    @Test
+    public void testAdvance() {
+        assertEquals(0L, clock.millis());
+        clock.advance(Duration.ofMillis(10));
+        assertEquals(10L, clock.millis());
+    }
+
+}
diff --git a/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/testing/executors/MockExecutorController.java b/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/testing/executors/MockExecutorController.java
new file mode 100644
index 0000000..12b3a23
--- /dev/null
+++ b/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/testing/executors/MockExecutorController.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.common.testing.executors;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.doAnswer;
+
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.SettableFuture;
+import java.time.Duration;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Delayed;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import lombok.AccessLevel;
+import lombok.Data;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.mockito.stubbing.Answer;
+
+/**
+ * A mocked scheduled executor that records scheduled tasks and executes them when the clock is
+ * advanced past their execution time.
+ */
+@Slf4j
+@NoArgsConstructor(access = AccessLevel.PUBLIC)
+public class MockExecutorController {
+
+    @Data
+    @Getter
+    private class DeferredTask implements ScheduledFuture<Void> {
+
+        private final Runnable runnable;
+        private final long scheduledAtMillis;
+        @Getter
+        private final CompletableFuture<Void> future;
+
+        public DeferredTask(Runnable runnable,
+                            long delayTimeMs) {
+            this.runnable = runnable;
+            this.scheduledAtMillis = delayTimeMs + clock.millis();
+            this.future = FutureUtils.createFuture();
+        }
+
+        @Override
+        public long getDelay(TimeUnit unit) {
+            return unit.convert(scheduledAtMillis - clock.millis(), TimeUnit.MILLISECONDS);
+        }
+
+        @Override
+        public int compareTo(Delayed o) {
+            return Long.compare(getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS));
+        }
+
+        @Override
+        public boolean cancel(boolean mayInterruptIfRunning) {
+            return future.cancel(mayInterruptIfRunning);
+        }
+
+        @Override
+        public boolean isCancelled() {
+            return future.isCancelled();
+        }
+
+        @Override
+        public boolean isDone() {
+            return future.isDone();
+        }
+
+        @Override
+        public Void get() throws InterruptedException, ExecutionException {
+            future.get();
+            return null;
+        }
+
+        @Override
+        public Void get(long timeout, TimeUnit unit)
+                throws InterruptedException, ExecutionException, TimeoutException {
+            future.get(timeout, unit);
+            return null;
+        }
+
+        void run() {
+            runnable.run();
+            FutureUtils.complete(future, null);
+        }
+
+    }
+
+    @Getter
+    private final MockClock clock = new MockClock();
+    private final List<DeferredTask> deferredTasks = Lists.newArrayList();
+
+    public MockExecutorController controlSubmit(ScheduledExecutorService service) {
+        doAnswer(answerNow()).when(service).submit(any(Runnable.class));
+        return this;
+    }
+
+    public MockExecutorController controlExecute(ScheduledExecutorService service) {
+        doAnswer(answerNow()).when(service).execute(any(Runnable.class));
+        return this;
+    }
+
+    public MockExecutorController controlSchedule(ScheduledExecutorService service) {
+        doAnswer(answerDelay(this)).when(service).schedule(any(Runnable.class), anyLong(), any(TimeUnit.class));
+        return this;
+    }
+
+    public MockExecutorController controlScheduleAtFixedRate(ScheduledExecutorService service,
+                                                             int maxInvocations) {
+        doAnswer(answerAtFixedRate(this, maxInvocations))
+            .when(service)
+            .scheduleAtFixedRate(any(Runnable.class), anyLong(), anyLong(), any(TimeUnit.class));
+        return this;
+    }
+
+    private static Answer<ScheduledFuture<?>> answerAtFixedRate(MockExecutorController controller, int numTimes) {
+        return invocationOnMock -> {
+            Runnable task = invocationOnMock.getArgument(0);
+            long initialDelay = invocationOnMock.getArgument(1);
+            long delay = invocationOnMock.getArgument(2);
+            TimeUnit unit = invocationOnMock.getArgument(3);
+
+            DeferredTask deferredTask = null;
+            for (int i = 0; i < numTimes; i++) {
+                long delayMs = unit.toMillis(initialDelay) + i * unit.toMillis(delay);
+
+                deferredTask = controller.addDelayedTask(
+                    controller,
+                    delayMs,
+                    task);
+            }
+            return deferredTask;
+        };
+    }
+
+    private static Answer<ScheduledFuture<?>> answerDelay(MockExecutorController executor) {
+        return invocationOnMock -> {
+
+           Runnable task = invocationOnMock.getArgument(0);
+           long value = invocationOnMock.getArgument(1);
+           TimeUnit unit = invocationOnMock.getArgument(2);
+           DeferredTask deferredTask = executor.addDelayedTask(executor, unit.toMillis(value), task);
+           if (value <= 0) {
+               task.run();
+               FutureUtils.complete(deferredTask.future, null);
+           }
+           return deferredTask;
+       };
+    }
+
+    private static Answer<Future<?>> answerNow() {
+        return invocationOnMock -> {
+
+           Runnable task = invocationOnMock.getArgument(0);
+           task.run();
+           SettableFuture<Void> future = SettableFuture.create();
+           future.set(null);
+           return future;
+       };
+    }
+
+    private DeferredTask addDelayedTask(
+            MockExecutorController executor,
+            long delayTimeMs,
+            Runnable task) {
+        checkArgument(delayTimeMs >= 0);
+        DeferredTask deferredTask = new DeferredTask(task, delayTimeMs);
+        executor.deferredTasks.add(deferredTask);
+        return deferredTask;
+    }
+
+    public void advance(Duration duration) {
+        clock.advance(duration);
+        Iterator<DeferredTask> entries = deferredTasks.iterator();
+        List<DeferredTask> toExecute = Lists.newArrayList();
+        while (entries.hasNext()) {
+            DeferredTask next = entries.next();
+            if (next.getDelay(TimeUnit.MILLISECONDS) <= 0) {
+                entries.remove();
+                toExecute.add(next);
+            }
+        }
+        for (DeferredTask task : toExecute) {
+            task.run();
+        }
+    }
+
+}
diff --git a/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/testing/executors/MockExecutorControllerTest.java b/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/testing/executors/MockExecutorControllerTest.java
new file mode 100644
index 0000000..50ea923
--- /dev/null
+++ b/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/testing/executors/MockExecutorControllerTest.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.common.testing.executors;
+
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.time.Duration;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Test {@link MockExecutorController}.
+ */
+public class MockExecutorControllerTest {
+
+    private static final int MAX_SCHEDULES = 5;
+
+    private ScheduledExecutorService executor;
+    private MockExecutorController mockExecutorControl;
+
+    @Before
+    public void setup() {
+        this.executor = mock(ScheduledExecutorService.class);
+        this.mockExecutorControl = new MockExecutorController()
+            .controlExecute(executor)
+            .controlSubmit(executor)
+            .controlSchedule(executor)
+            .controlScheduleAtFixedRate(executor, MAX_SCHEDULES);
+    }
+
+    @Test
+    public void testSubmit() {
+        Runnable task = mock(Runnable.class);
+        doNothing().when(task).run();
+        executor.submit(task);
+        verify(task, times(1)).run();
+    }
+
+    @Test
+    public void testExecute() {
+        Runnable task = mock(Runnable.class);
+        doNothing().when(task).run();
+        executor.execute(task);
+        verify(task, times(1)).run();
+    }
+
+    @Test
+    public void testDelay() {
+        Runnable task = mock(Runnable.class);
+        doNothing().when(task).run();
+        executor.schedule(task, 10, TimeUnit.MILLISECONDS);
+        mockExecutorControl.advance(Duration.ofMillis(5));
+        verify(task, times(0)).run();
+        mockExecutorControl.advance(Duration.ofMillis(10));
+        verify(task, times(1)).run();
+    }
+
+    @Test
+    public void testScheduleAtFixedRate() {
+        Runnable task = mock(Runnable.class);
+        doNothing().when(task).run();
+        executor.scheduleAtFixedRate(task, 5, 10, TimeUnit.MILLISECONDS);
+
+        // first delay
+        mockExecutorControl.advance(Duration.ofMillis(2));
+        verify(task, times(0)).run();
+        mockExecutorControl.advance(Duration.ofMillis(3));
+        verify(task, times(1)).run();
+
+        // subsequent delays
+        for (int i = 1; i < MAX_SCHEDULES; i++) {
+            mockExecutorControl.advance(Duration.ofMillis(2));
+            verify(task, times(i)).run();
+            mockExecutorControl.advance(Duration.ofMillis(8));
+            verify(task, times(i + 1)).run();
+        }
+
+        // no more invocations
+        mockExecutorControl.advance(Duration.ofMillis(500));
+        verify(task, times(MAX_SCHEDULES)).run();
+    }
+
+}
diff --git a/bookkeeper-server/pom.xml b/bookkeeper-server/pom.xml
index 7b62fb7..bbf218f 100644
--- a/bookkeeper-server/pom.xml
+++ b/bookkeeper-server/pom.xml
@@ -172,6 +172,13 @@
     </dependency>
     <!-- testing dependencies -->
     <dependency>
+      <groupId>org.apache.bookkeeper</groupId>
+      <artifactId>bookkeeper-common</artifactId>
+      <version>${project.parent.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-minikdc</artifactId>
       <version>2.7.3</version>
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDirsMonitor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDirsMonitor.java
index a9ebc36..afd8ad8 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDirsMonitor.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDirsMonitor.java
@@ -21,12 +21,17 @@
 
 package org.apache.bookkeeper.bookie;
 
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ConcurrentMap;
 
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
 import org.apache.bookkeeper.bookie.LedgerDirsManager.LedgerDirsListener;
 import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException;
 import org.apache.bookkeeper.conf.ServerConfiguration;
@@ -40,7 +45,7 @@ import org.slf4j.LoggerFactory;
 /**
  * Thread to monitor the disk space periodically.
  */
-class LedgerDirsMonitor extends BookieThread {
+class LedgerDirsMonitor {
     private static final Logger LOG = LoggerFactory.getLogger(LedgerDirsMonitor.class);
 
     private final int interval;
@@ -48,10 +53,12 @@ class LedgerDirsMonitor extends BookieThread {
     private final ConcurrentMap<File, Float> diskUsages;
     private final DiskChecker diskChecker;
     private final LedgerDirsManager ldm;
+    private ScheduledExecutorService executor;
+    private ScheduledFuture<?> checkTask;
 
-    public LedgerDirsMonitor(final ServerConfiguration conf, final DiskChecker diskChecker,
+    public LedgerDirsMonitor(final ServerConfiguration conf,
+                             final DiskChecker diskChecker,
                              final LedgerDirsManager ldm) {
-        super("LedgerDirsMonitorThread");
         this.interval = conf.getDiskCheckInterval();
         this.conf = conf;
         this.diskChecker = diskChecker;
@@ -59,98 +66,88 @@ class LedgerDirsMonitor extends BookieThread {
         this.ldm = ldm;
     }
 
-    @Override
-    public void run() {
-        while (true) {
-            try {
-                List<File> writableDirs = ldm.getWritableLedgerDirs();
-                // Check all writable dirs disk space usage.
-                for (File dir : writableDirs) {
+    private void check() {
+        try {
+            List<File> writableDirs = ldm.getWritableLedgerDirs();
+            // Check all writable dirs disk space usage.
+            for (File dir : writableDirs) {
+                try {
+                    diskUsages.put(dir, diskChecker.checkDir(dir));
+                } catch (DiskErrorException e) {
+                    LOG.error("Ledger directory {} failed on disk checking : ", dir, e);
+                    // Notify disk failure to all listeners
+                    for (LedgerDirsListener listener : ldm.getListeners()) {
+                        listener.diskFailed(dir);
+                    }
+                } catch (DiskWarnThresholdException e) {
+                    LOG.warn("Ledger directory {} is almost full.", dir);
+                    diskUsages.put(dir, e.getUsage());
+                    for (LedgerDirsListener listener : ldm.getListeners()) {
+                        listener.diskAlmostFull(dir);
+                    }
+                } catch (DiskOutOfSpaceException e) {
+                    LOG.error("Ledger directory {} is out-of-space.", dir);
+                    diskUsages.put(dir, e.getUsage());
+                    // Notify disk full to all listeners
+                    ldm.addToFilledDirs(dir);
+                }
+            }
+            // Let's get NoWritableLedgerDirException without waiting for the next iteration
+            // in case we are out of writable dirs
+            // otherwise for the duration of {interval} we end up in the state where
+            // bookie cannot get writable dir but considered to be writable
+            ldm.getWritableLedgerDirs();
+        } catch (NoWritableLedgerDirException e) {
+            for (LedgerDirsListener listener : ldm.getListeners()) {
+                listener.allDisksFull();
+            }
+        }
+
+        List<File> fullfilledDirs = new ArrayList<File>(ldm.getFullFilledLedgerDirs());
+        boolean hasWritableLedgerDirs = ldm.hasWritableLedgerDirs();
+        float totalDiskUsage = 0;
+
+        // When bookie is in READONLY mode .i.e there are no writableLedgerDirs:
+        // - Check if the total disk usage is below DiskLowWaterMarkUsageThreshold.
+        // - If So, walk through the entire list of fullfilledDirs and add them back to writableLedgerDirs list if
+        // their usage is < conf.getDiskUsageThreshold.
+        try {
+            if (hasWritableLedgerDirs
+                    || (totalDiskUsage = diskChecker.getTotalDiskUsage(ldm.getAllLedgerDirs())) < conf
+                            .getDiskLowWaterMarkUsageThreshold()) {
+                // Check all full-filled disk space usage
+                for (File dir : fullfilledDirs) {
                     try {
                         diskUsages.put(dir, diskChecker.checkDir(dir));
+                        ldm.addToWritableDirs(dir, true);
                     } catch (DiskErrorException e) {
-                        LOG.error("Ledger directory {} failed on disk checking : ", dir, e);
-                        // Notify disk failure to all listeners
+                        // Notify disk failure to all the listeners
                         for (LedgerDirsListener listener : ldm.getListeners()) {
                             listener.diskFailed(dir);
                         }
                     } catch (DiskWarnThresholdException e) {
-                        LOG.warn("Ledger directory {} is almost full.", dir);
                         diskUsages.put(dir, e.getUsage());
-                        for (LedgerDirsListener listener : ldm.getListeners()) {
-                            listener.diskAlmostFull(dir);
-                        }
+                        // the full-filled dir become writable but still
+                        // above
+                        // warn threshold
+                        ldm.addToWritableDirs(dir, false);
                     } catch (DiskOutOfSpaceException e) {
-                        LOG.error("Ledger directory {} is out-of-space.", dir);
+                        // the full-filled dir is still full-filled
                         diskUsages.put(dir, e.getUsage());
-                        // Notify disk full to all listeners
-                        ldm.addToFilledDirs(dir);
                     }
                 }
-                // Let's get NoWritableLedgerDirException without waiting for the next iteration
-                // in case we are out of writable dirs
-                // otherwise for the duration of {interval} we end up in the state where
-                // bookie cannot get writable dir but considered to be writable
-                ldm.getWritableLedgerDirs();
-            } catch (NoWritableLedgerDirException e) {
-                for (LedgerDirsListener listener : ldm.getListeners()) {
-                    listener.allDisksFull();
-                }
+            } else {
+                LOG.debug(
+                        "Current TotalDiskUsage: {} is greater than LWMThreshold: {}."
+                            + " So not adding any filledDir to WritableDirsList",
+                        totalDiskUsage, conf.getDiskLowWaterMarkUsageThreshold());
             }
-
-            List<File> fullfilledDirs = new ArrayList<File>(ldm.getFullFilledLedgerDirs());
-            boolean hasWritableLedgerDirs = ldm.hasWritableLedgerDirs();
-            float totalDiskUsage = 0;
-
-            // When bookie is in READONLY mode .i.e there are no writableLedgerDirs:
-            // - Check if the total disk usage is below DiskLowWaterMarkUsageThreshold.
-            // - If So, walk through the entire list of fullfilledDirs and add them back to writableLedgerDirs list if
-            // their usage is < conf.getDiskUsageThreshold.
-            try {
-                if (hasWritableLedgerDirs
-                        || (totalDiskUsage = diskChecker.getTotalDiskUsage(ldm.getAllLedgerDirs())) < conf
-                                .getDiskLowWaterMarkUsageThreshold()) {
-                    // Check all full-filled disk space usage
-                    for (File dir : fullfilledDirs) {
-                        try {
-                            diskUsages.put(dir, diskChecker.checkDir(dir));
-                            ldm.addToWritableDirs(dir, true);
-                        } catch (DiskErrorException e) {
-                            // Notify disk failure to all the listeners
-                            for (LedgerDirsListener listener : ldm.getListeners()) {
-                                listener.diskFailed(dir);
-                            }
-                        } catch (DiskWarnThresholdException e) {
-                            diskUsages.put(dir, e.getUsage());
-                            // the full-filled dir become writable but still
-                            // above
-                            // warn threshold
-                            ldm.addToWritableDirs(dir, false);
-                        } catch (DiskOutOfSpaceException e) {
-                            // the full-filled dir is still full-filled
-                            diskUsages.put(dir, e.getUsage());
-                        }
-                    }
-                } else {
-                    LOG.debug(
-                            "Current TotalDiskUsage: {} is greater than LWMThreshold: {}."
-                                + " So not adding any filledDir to WritableDirsList",
-                            totalDiskUsage, conf.getDiskLowWaterMarkUsageThreshold());
-                }
-            } catch (IOException ioe) {
-                LOG.error("Got IOException while monitoring Dirs", ioe);
-                for (LedgerDirsListener listener : ldm.getListeners()) {
-                    listener.fatalError();
-                }
-            }
-            try {
-                Thread.sleep(interval);
-            } catch (InterruptedException e) {
-                LOG.info("LedgerDirsMonitor thread is interrupted");
-                break;
+        } catch (IOException ioe) {
+            LOG.error("Got IOException while monitoring Dirs", ioe);
+            for (LedgerDirsListener listener : ldm.getListeners()) {
+                listener.fatalError();
             }
         }
-        LOG.info("LedgerDirsMonitorThread exited!");
     }
 
     /**
@@ -167,20 +164,25 @@ class LedgerDirsMonitor extends BookieThread {
     }
 
     // start the daemon for disk monitoring
-    @Override
     public void start() {
-        this.setDaemon(true);
-        super.start();
+        this.executor = Executors.newSingleThreadScheduledExecutor(
+            new ThreadFactoryBuilder()
+                .setNameFormat("LedgerDirsMonitorThread")
+                .setDaemon(true)
+                .build());
+        this.checkTask = this.executor.scheduleAtFixedRate(() -> check(), interval, interval, TimeUnit.MILLISECONDS);
     }
 
     // shutdown disk monitoring daemon
     public void shutdown() {
         LOG.info("Shutting down LedgerDirsMonitor");
-        this.interrupt();
-        try {
-            this.join();
-        } catch (InterruptedException e) {
-            // Ignore
+        if (null != checkTask) {
+            if (checkTask.cancel(true)) {
+                LOG.debug("Failed to cancel check task in LedgerDirsMonitor");
+            }
+        }
+        if (null != executor) {
+            executor.shutdown();
         }
     }
 
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestLedgerDirsManager.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestLedgerDirsManager.java
index 7a7f672..4efc69f 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestLedgerDirsManager.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestLedgerDirsManager.java
@@ -28,16 +28,20 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.mockito.ArgumentMatchers.any;
 
 import java.io.File;
 import java.io.IOException;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 import org.apache.bookkeeper.bookie.LedgerDirsManager.LedgerDirsListener;
 import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException;
+import org.apache.bookkeeper.common.testing.executors.MockExecutorController;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.conf.TestBKConfiguration;
 import org.apache.bookkeeper.stats.Gauge;
@@ -48,14 +52,17 @@ import org.apache.commons.io.FileUtils;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
 
 /**
  * Test LedgerDirsManager.
  */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(LedgerDirsMonitor.class)
 public class TestLedgerDirsManager {
-    private static final Logger LOG = LoggerFactory.getLogger(TestLedgerDirsManager.class);
 
     ServerConfiguration conf;
     File curDir;
@@ -70,6 +77,10 @@ public class TestLedgerDirsManager {
 
     final List<File> tempDirs = new ArrayList<File>();
 
+    // Thread used by monitor
+    ScheduledExecutorService executor;
+    MockExecutorController executorController;
+
     File createTempDir(String prefix, String suffix) throws IOException {
         File dir = IOUtils.createTempDir(prefix, suffix);
         tempDirs.add(dir);
@@ -78,6 +89,8 @@ public class TestLedgerDirsManager {
 
     @Before
     public void setUp() throws Exception {
+        PowerMockito.mockStatic(Executors.class);
+
         File tmpDir = createTempDir("bkTest", ".dir");
         curDir = Bookie.getCurrentDirectory(tmpDir);
         Bookie.checkDirectoryStructure(curDir);
@@ -88,6 +101,12 @@ public class TestLedgerDirsManager {
         conf.setDiskCheckInterval(diskCheckInterval);
         conf.setIsForceGCAllowWhenNoSpace(true);
 
+        executor = PowerMockito.mock(ScheduledExecutorService.class);
+        executorController = new MockExecutorController()
+            .controlScheduleAtFixedRate(executor, 10);
+        PowerMockito.when(Executors.newSingleThreadScheduledExecutor(any()))
+            .thenReturn(executor);
+
         mockDiskChecker = new MockDiskChecker(threshold, warnThreshold);
         statsProvider = new TestStatsProvider();
         statsLogger = statsProvider.getStatsLogger("test");
@@ -162,7 +181,6 @@ public class TestLedgerDirsManager {
 
     @Test
     public void testLedgerDirsMonitorDuringTransition() throws Exception {
-
         MockLedgerDirsListener mockLedgerDirsListener = new MockLedgerDirsListener();
         dirsManager.addLedgerDirsListener(mockLedgerDirsListener);
         ledgerMonitor.start();
@@ -170,12 +188,11 @@ public class TestLedgerDirsManager {
         assertFalse(mockLedgerDirsListener.readOnly);
         mockDiskChecker.setUsage(threshold + 0.05f);
 
-        Thread.sleep((diskCheckInterval * 2) + 100);
-
+        executorController.advance(Duration.ofMillis(diskCheckInterval));
         assertTrue(mockLedgerDirsListener.readOnly);
-        mockDiskChecker.setUsage(threshold - 0.05f);
 
-        Thread.sleep(diskCheckInterval + 100);
+        mockDiskChecker.setUsage(threshold - 0.05f);
+        executorController.advance(Duration.ofMillis(diskCheckInterval));
 
         assertFalse(mockLedgerDirsListener.readOnly);
     }
@@ -205,36 +222,36 @@ public class TestLedgerDirsManager {
         dirsManager.addLedgerDirsListener(mockLedgerDirsListener);
         ledgerMonitor.start();
 
-        Thread.sleep((diskCheckInterval * 2) + 100);
+        executorController.advance(Duration.ofMillis(diskCheckInterval));
         assertFalse(mockLedgerDirsListener.readOnly);
 
         // go above LWM but below threshold
         // should still be writable
         mockDiskChecker.setUsage(lwm2nospace);
-        Thread.sleep((diskCheckInterval * 2) + 100);
+        executorController.advance(Duration.ofMillis(diskCheckInterval));
         assertFalse(mockLedgerDirsListener.readOnly);
 
         // exceed the threshold, should go to readonly
         mockDiskChecker.setUsage(nospaceExceeded);
-        Thread.sleep(diskCheckInterval + 100);
+        executorController.advance(Duration.ofMillis(diskCheckInterval));
         assertTrue(mockLedgerDirsListener.readOnly);
 
         // drop below threshold but above LWM
         // should stay read-only
         mockDiskChecker.setUsage(lwm2nospace);
-        Thread.sleep((diskCheckInterval * 2) + 100);
+        executorController.advance(Duration.ofMillis(diskCheckInterval));
         assertTrue(mockLedgerDirsListener.readOnly);
 
         // drop below LWM
         // should become writable
         mockDiskChecker.setUsage(lwm2warn);
-        Thread.sleep((diskCheckInterval * 2) + 100);
+        executorController.advance(Duration.ofMillis(diskCheckInterval));
         assertFalse(mockLedgerDirsListener.readOnly);
 
         // go above LWM but below threshold
         // should still be writable
         mockDiskChecker.setUsage(lwm2nospace);
-        Thread.sleep((diskCheckInterval * 2) + 100);
+        executorController.advance(Duration.ofMillis(diskCheckInterval));
         assertFalse(mockLedgerDirsListener.readOnly);
     }
 
@@ -330,7 +347,7 @@ public class TestLedgerDirsManager {
         usageMap.put(dir1, dir1Usage);
         usageMap.put(dir2, dir2Usage);
         mockDiskChecker.setUsageMap(usageMap);
-        Thread.sleep((diskCheckInterval * 2) + 100);
+        executorController.advance(Duration.ofMillis(diskCheckInterval));
 
         float sample1 = getGauge(dir1.getParent()).getSample().floatValue();
         float sample2 = getGauge(dir2.getParent()).getSample().floatValue();

-- 
To stop receiving notification emails like this one, please contact
['"commits@bookkeeper.apache.org" <co...@bookkeeper.apache.org>'].