You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2018/01/07 06:21:58 UTC
[bookkeeper] branch master updated: Introduce MockClock and
MockExecutorController to improve timer-based testing
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 7fe87f9 Introduce MockClock and MockExecutorController to improve timer-based testing
7fe87f9 is described below
commit 7fe87f9354a04bb6789f8111242d35a1b730d3ba
Author: Sijie Guo <si...@apache.org>
AuthorDate: Sat Jan 6 22:21:52 2018 -0800
Introduce MockClock and MockExecutorController to improve timer-based testing
Descriptions of the changes in this PR:
- introduce MockClock and MockExecutorController so that tests can advance time manually to trigger the execution of scheduled tasks
- convert LedgerDirsMonitor from a Thread to a task scheduled on a ScheduledExecutorService (the logic of the `run` method was moved to `check` without code changes)
- change TestLedgerDirsManager to use MockClock and MockExecutorController to remove the usage of `Thread.sleep`
Master Issue: #943
Author: Sijie Guo <si...@apache.org>
Reviewers: Jia Zhai <None>, Enrico Olivelli <eo...@gmail.com>
This closes #944 from sijie/mock_scheduler
---
.../common/testing/executors/MockClock.java | 65 +++++++
.../common/testing/executors/MockClockTest.java | 46 +++++
.../testing/executors/MockExecutorController.java | 215 +++++++++++++++++++++
.../executors/MockExecutorControllerTest.java | 104 ++++++++++
bookkeeper-server/pom.xml | 7 +
.../bookkeeper/bookie/LedgerDirsMonitor.java | 178 ++++++++---------
.../bookkeeper/bookie/TestLedgerDirsManager.java | 49 +++--
7 files changed, 560 insertions(+), 104 deletions(-)
diff --git a/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/testing/executors/MockClock.java b/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/testing/executors/MockClock.java
new file mode 100644
index 0000000..7a640ca
--- /dev/null
+++ b/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/testing/executors/MockClock.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.common.testing.executors;
+
+import java.time.Clock;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.ZoneId;
+
+/**
+ * A mock implementation of {@link Clock}.
+ */
+public class MockClock extends Clock {
+
+ private final ZoneId zoneId;
+ private Instant now = Instant.ofEpochMilli(0);
+
+ public MockClock() {
+ this(ZoneId.systemDefault());
+ }
+
+ private MockClock(ZoneId zoneId) {
+ this.zoneId = zoneId;
+ }
+
+ @Override
+ public ZoneId getZone() {
+ return zoneId;
+ }
+
+ @Override
+ public MockClock withZone(ZoneId zone) {
+ return new MockClock(zone);
+ }
+
+ @Override
+ public Instant instant() {
+ return now;
+ }
+
+ /**
+ * Advance the clock by the given amount of time.
+ *
+ * @param duration duration to advance.
+ */
+ public void advance(Duration duration) {
+ now = now.plus(duration);
+ }
+}
diff --git a/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/testing/executors/MockClockTest.java b/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/testing/executors/MockClockTest.java
new file mode 100644
index 0000000..7ca3279
--- /dev/null
+++ b/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/testing/executors/MockClockTest.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.common.testing.executors;
+
+import static org.junit.Assert.assertEquals;
+
+import java.time.Duration;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Test {@link MockClock}.
+ */
+public class MockClockTest {
+
+ private MockClock clock;
+
+ @Before
+ public void setup() {
+ this.clock = new MockClock();
+ }
+
+ @Test
+ public void testAdvance() {
+ assertEquals(0L, clock.millis());
+ clock.advance(Duration.ofMillis(10));
+ assertEquals(10L, clock.millis());
+ }
+
+}
diff --git a/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/testing/executors/MockExecutorController.java b/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/testing/executors/MockExecutorController.java
new file mode 100644
index 0000000..12b3a23
--- /dev/null
+++ b/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/testing/executors/MockExecutorController.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.common.testing.executors;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.doAnswer;
+
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.SettableFuture;
+import java.time.Duration;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Delayed;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import lombok.AccessLevel;
+import lombok.Data;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.mockito.stubbing.Answer;
+
+/**
+ * A mocked scheduled executor that records scheduled tasks and executes them when the clock is
+ * advanced past their execution time.
+ */
+@Slf4j
+@NoArgsConstructor(access = AccessLevel.PUBLIC)
+public class MockExecutorController {
+
+ @Data
+ @Getter
+ private class DeferredTask implements ScheduledFuture<Void> {
+
+ private final Runnable runnable;
+ private final long scheduledAtMillis;
+ @Getter
+ private final CompletableFuture<Void> future;
+
+ public DeferredTask(Runnable runnable,
+ long delayTimeMs) {
+ this.runnable = runnable;
+ this.scheduledAtMillis = delayTimeMs + clock.millis();
+ this.future = FutureUtils.createFuture();
+ }
+
+ @Override
+ public long getDelay(TimeUnit unit) {
+ return unit.convert(scheduledAtMillis - clock.millis(), TimeUnit.MILLISECONDS);
+ }
+
+ @Override
+ public int compareTo(Delayed o) {
+ return Long.compare(getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS));
+ }
+
+ @Override
+ public boolean cancel(boolean mayInterruptIfRunning) {
+ return future.cancel(mayInterruptIfRunning);
+ }
+
+ @Override
+ public boolean isCancelled() {
+ return future.isCancelled();
+ }
+
+ @Override
+ public boolean isDone() {
+ return future.isDone();
+ }
+
+ @Override
+ public Void get() throws InterruptedException, ExecutionException {
+ future.get();
+ return null;
+ }
+
+ @Override
+ public Void get(long timeout, TimeUnit unit)
+ throws InterruptedException, ExecutionException, TimeoutException {
+ future.get(timeout, unit);
+ return null;
+ }
+
+ void run() {
+ runnable.run();
+ FutureUtils.complete(future, null);
+ }
+
+ }
+
+ @Getter
+ private final MockClock clock = new MockClock();
+ private final List<DeferredTask> deferredTasks = Lists.newArrayList();
+
+ public MockExecutorController controlSubmit(ScheduledExecutorService service) {
+ doAnswer(answerNow()).when(service).submit(any(Runnable.class));
+ return this;
+ }
+
+ public MockExecutorController controlExecute(ScheduledExecutorService service) {
+ doAnswer(answerNow()).when(service).execute(any(Runnable.class));
+ return this;
+ }
+
+ public MockExecutorController controlSchedule(ScheduledExecutorService service) {
+ doAnswer(answerDelay(this)).when(service).schedule(any(Runnable.class), anyLong(), any(TimeUnit.class));
+ return this;
+ }
+
+ public MockExecutorController controlScheduleAtFixedRate(ScheduledExecutorService service,
+ int maxInvocations) {
+ doAnswer(answerAtFixedRate(this, maxInvocations))
+ .when(service)
+ .scheduleAtFixedRate(any(Runnable.class), anyLong(), anyLong(), any(TimeUnit.class));
+ return this;
+ }
+
+ private static Answer<ScheduledFuture<?>> answerAtFixedRate(MockExecutorController controller, int numTimes) {
+ return invocationOnMock -> {
+ Runnable task = invocationOnMock.getArgument(0);
+ long initialDelay = invocationOnMock.getArgument(1);
+ long delay = invocationOnMock.getArgument(2);
+ TimeUnit unit = invocationOnMock.getArgument(3);
+
+ DeferredTask deferredTask = null;
+ for (int i = 0; i < numTimes; i++) {
+ long delayMs = unit.toMillis(initialDelay) + i * unit.toMillis(delay);
+
+ deferredTask = controller.addDelayedTask(
+ controller,
+ delayMs,
+ task);
+ }
+ return deferredTask;
+ };
+ }
+
+ private static Answer<ScheduledFuture<?>> answerDelay(MockExecutorController executor) {
+ return invocationOnMock -> {
+ // A zero delay fires right away; dequeue the task first so a later advance() cannot run it twice.
+ Runnable task = invocationOnMock.getArgument(0);
+ long value = invocationOnMock.getArgument(1);
+ TimeUnit unit = invocationOnMock.getArgument(2);
+ DeferredTask deferredTask = executor.addDelayedTask(executor, unit.toMillis(value), task);
+ if (value <= 0) {
+ executor.deferredTasks.remove(deferredTask);
+ deferredTask.run();
+ }
+ return deferredTask;
+ };
+ }
+
+ private static Answer<Future<?>> answerNow() {
+ return invocationOnMock -> {
+
+ Runnable task = invocationOnMock.getArgument(0);
+ task.run();
+ SettableFuture<Void> future = SettableFuture.create();
+ future.set(null);
+ return future;
+ };
+ }
+
+ private DeferredTask addDelayedTask(
+ MockExecutorController executor,
+ long delayTimeMs,
+ Runnable task) {
+ checkArgument(delayTimeMs >= 0);
+ DeferredTask deferredTask = new DeferredTask(task, delayTimeMs);
+ executor.deferredTasks.add(deferredTask);
+ return deferredTask;
+ }
+
+ public void advance(Duration duration) {
+ clock.advance(duration);
+ Iterator<DeferredTask> entries = deferredTasks.iterator();
+ List<DeferredTask> toExecute = Lists.newArrayList();
+ while (entries.hasNext()) {
+ DeferredTask next = entries.next();
+ if (next.getDelay(TimeUnit.MILLISECONDS) <= 0) {
+ entries.remove();
+ toExecute.add(next);
+ }
+ }
+ for (DeferredTask task : toExecute) {
+ task.run();
+ }
+ }
+
+}
diff --git a/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/testing/executors/MockExecutorControllerTest.java b/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/testing/executors/MockExecutorControllerTest.java
new file mode 100644
index 0000000..50ea923
--- /dev/null
+++ b/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/testing/executors/MockExecutorControllerTest.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.common.testing.executors;
+
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.time.Duration;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Test {@link MockExecutorController}.
+ */
+public class MockExecutorControllerTest {
+
+ private static final int MAX_SCHEDULES = 5;
+
+ private ScheduledExecutorService executor;
+ private MockExecutorController mockExecutorControl;
+
+ @Before
+ public void setup() {
+ this.executor = mock(ScheduledExecutorService.class);
+ this.mockExecutorControl = new MockExecutorController()
+ .controlExecute(executor)
+ .controlSubmit(executor)
+ .controlSchedule(executor)
+ .controlScheduleAtFixedRate(executor, MAX_SCHEDULES);
+ }
+
+ @Test
+ public void testSubmit() {
+ Runnable task = mock(Runnable.class);
+ doNothing().when(task).run();
+ executor.submit(task);
+ verify(task, times(1)).run();
+ }
+
+ @Test
+ public void testExecute() {
+ Runnable task = mock(Runnable.class);
+ doNothing().when(task).run();
+ executor.execute(task);
+ verify(task, times(1)).run();
+ }
+
+ @Test
+ public void testDelay() {
+ Runnable task = mock(Runnable.class);
+ doNothing().when(task).run();
+ executor.schedule(task, 10, TimeUnit.MILLISECONDS);
+ mockExecutorControl.advance(Duration.ofMillis(5));
+ verify(task, times(0)).run();
+ mockExecutorControl.advance(Duration.ofMillis(10));
+ verify(task, times(1)).run();
+ }
+
+ @Test
+ public void testScheduleAtFixedRate() {
+ Runnable task = mock(Runnable.class);
+ doNothing().when(task).run();
+ executor.scheduleAtFixedRate(task, 5, 10, TimeUnit.MILLISECONDS);
+
+ // first delay
+ mockExecutorControl.advance(Duration.ofMillis(2));
+ verify(task, times(0)).run();
+ mockExecutorControl.advance(Duration.ofMillis(3));
+ verify(task, times(1)).run();
+
+ // subsequent delays
+ for (int i = 1; i < MAX_SCHEDULES; i++) {
+ mockExecutorControl.advance(Duration.ofMillis(2));
+ verify(task, times(i)).run();
+ mockExecutorControl.advance(Duration.ofMillis(8));
+ verify(task, times(i + 1)).run();
+ }
+
+ // no more invocations
+ mockExecutorControl.advance(Duration.ofMillis(500));
+ verify(task, times(MAX_SCHEDULES)).run();
+ }
+
+}
diff --git a/bookkeeper-server/pom.xml b/bookkeeper-server/pom.xml
index 7b62fb7..bbf218f 100644
--- a/bookkeeper-server/pom.xml
+++ b/bookkeeper-server/pom.xml
@@ -172,6 +172,13 @@
</dependency>
<!-- testing dependencies -->
<dependency>
+ <groupId>org.apache.bookkeeper</groupId>
+ <artifactId>bookkeeper-common</artifactId>
+ <version>${project.parent.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-minikdc</artifactId>
<version>2.7.3</version>
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDirsMonitor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDirsMonitor.java
index a9ebc36..afd8ad8 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDirsMonitor.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDirsMonitor.java
@@ -21,12 +21,17 @@
package org.apache.bookkeeper.bookie;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.bookie.LedgerDirsManager.LedgerDirsListener;
import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException;
import org.apache.bookkeeper.conf.ServerConfiguration;
@@ -40,7 +45,7 @@ import org.slf4j.LoggerFactory;
/**
* Thread to monitor the disk space periodically.
*/
-class LedgerDirsMonitor extends BookieThread {
+class LedgerDirsMonitor {
private static final Logger LOG = LoggerFactory.getLogger(LedgerDirsMonitor.class);
private final int interval;
@@ -48,10 +53,12 @@ class LedgerDirsMonitor extends BookieThread {
private final ConcurrentMap<File, Float> diskUsages;
private final DiskChecker diskChecker;
private final LedgerDirsManager ldm;
+ private ScheduledExecutorService executor;
+ private ScheduledFuture<?> checkTask;
- public LedgerDirsMonitor(final ServerConfiguration conf, final DiskChecker diskChecker,
+ public LedgerDirsMonitor(final ServerConfiguration conf,
+ final DiskChecker diskChecker,
final LedgerDirsManager ldm) {
- super("LedgerDirsMonitorThread");
this.interval = conf.getDiskCheckInterval();
this.conf = conf;
this.diskChecker = diskChecker;
@@ -59,98 +66,88 @@ class LedgerDirsMonitor extends BookieThread {
this.ldm = ldm;
}
- @Override
- public void run() {
- while (true) {
- try {
- List<File> writableDirs = ldm.getWritableLedgerDirs();
- // Check all writable dirs disk space usage.
- for (File dir : writableDirs) {
+ private void check() {
+ try {
+ List<File> writableDirs = ldm.getWritableLedgerDirs();
+ // Check all writable dirs disk space usage.
+ for (File dir : writableDirs) {
+ try {
+ diskUsages.put(dir, diskChecker.checkDir(dir));
+ } catch (DiskErrorException e) {
+ LOG.error("Ledger directory {} failed on disk checking : ", dir, e);
+ // Notify disk failure to all listeners
+ for (LedgerDirsListener listener : ldm.getListeners()) {
+ listener.diskFailed(dir);
+ }
+ } catch (DiskWarnThresholdException e) {
+ LOG.warn("Ledger directory {} is almost full.", dir);
+ diskUsages.put(dir, e.getUsage());
+ for (LedgerDirsListener listener : ldm.getListeners()) {
+ listener.diskAlmostFull(dir);
+ }
+ } catch (DiskOutOfSpaceException e) {
+ LOG.error("Ledger directory {} is out-of-space.", dir);
+ diskUsages.put(dir, e.getUsage());
+ // Notify disk full to all listeners
+ ldm.addToFilledDirs(dir);
+ }
+ }
+ // Let's get NoWritableLedgerDirException without waiting for the next iteration
+ // in case we are out of writable dirs
+ // otherwise for the duration of {interval} we end up in the state where
+ // bookie cannot get writable dir but considered to be writable
+ ldm.getWritableLedgerDirs();
+ } catch (NoWritableLedgerDirException e) {
+ for (LedgerDirsListener listener : ldm.getListeners()) {
+ listener.allDisksFull();
+ }
+ }
+
+ List<File> fullfilledDirs = new ArrayList<File>(ldm.getFullFilledLedgerDirs());
+ boolean hasWritableLedgerDirs = ldm.hasWritableLedgerDirs();
+ float totalDiskUsage = 0;
+
+ // When bookie is in READONLY mode .i.e there are no writableLedgerDirs:
+ // - Check if the total disk usage is below DiskLowWaterMarkUsageThreshold.
+ // - If So, walk through the entire list of fullfilledDirs and add them back to writableLedgerDirs list if
+ // their usage is < conf.getDiskUsageThreshold.
+ try {
+ if (hasWritableLedgerDirs
+ || (totalDiskUsage = diskChecker.getTotalDiskUsage(ldm.getAllLedgerDirs())) < conf
+ .getDiskLowWaterMarkUsageThreshold()) {
+ // Check all full-filled disk space usage
+ for (File dir : fullfilledDirs) {
try {
diskUsages.put(dir, diskChecker.checkDir(dir));
+ ldm.addToWritableDirs(dir, true);
} catch (DiskErrorException e) {
- LOG.error("Ledger directory {} failed on disk checking : ", dir, e);
- // Notify disk failure to all listeners
+ // Notify disk failure to all the listeners
for (LedgerDirsListener listener : ldm.getListeners()) {
listener.diskFailed(dir);
}
} catch (DiskWarnThresholdException e) {
- LOG.warn("Ledger directory {} is almost full.", dir);
diskUsages.put(dir, e.getUsage());
- for (LedgerDirsListener listener : ldm.getListeners()) {
- listener.diskAlmostFull(dir);
- }
+ // the full-filled dir become writable but still
+ // above
+ // warn threshold
+ ldm.addToWritableDirs(dir, false);
} catch (DiskOutOfSpaceException e) {
- LOG.error("Ledger directory {} is out-of-space.", dir);
+ // the full-filled dir is still full-filled
diskUsages.put(dir, e.getUsage());
- // Notify disk full to all listeners
- ldm.addToFilledDirs(dir);
}
}
- // Let's get NoWritableLedgerDirException without waiting for the next iteration
- // in case we are out of writable dirs
- // otherwise for the duration of {interval} we end up in the state where
- // bookie cannot get writable dir but considered to be writable
- ldm.getWritableLedgerDirs();
- } catch (NoWritableLedgerDirException e) {
- for (LedgerDirsListener listener : ldm.getListeners()) {
- listener.allDisksFull();
- }
+ } else {
+ LOG.debug(
+ "Current TotalDiskUsage: {} is greater than LWMThreshold: {}."
+ + " So not adding any filledDir to WritableDirsList",
+ totalDiskUsage, conf.getDiskLowWaterMarkUsageThreshold());
}
-
- List<File> fullfilledDirs = new ArrayList<File>(ldm.getFullFilledLedgerDirs());
- boolean hasWritableLedgerDirs = ldm.hasWritableLedgerDirs();
- float totalDiskUsage = 0;
-
- // When bookie is in READONLY mode .i.e there are no writableLedgerDirs:
- // - Check if the total disk usage is below DiskLowWaterMarkUsageThreshold.
- // - If So, walk through the entire list of fullfilledDirs and add them back to writableLedgerDirs list if
- // their usage is < conf.getDiskUsageThreshold.
- try {
- if (hasWritableLedgerDirs
- || (totalDiskUsage = diskChecker.getTotalDiskUsage(ldm.getAllLedgerDirs())) < conf
- .getDiskLowWaterMarkUsageThreshold()) {
- // Check all full-filled disk space usage
- for (File dir : fullfilledDirs) {
- try {
- diskUsages.put(dir, diskChecker.checkDir(dir));
- ldm.addToWritableDirs(dir, true);
- } catch (DiskErrorException e) {
- // Notify disk failure to all the listeners
- for (LedgerDirsListener listener : ldm.getListeners()) {
- listener.diskFailed(dir);
- }
- } catch (DiskWarnThresholdException e) {
- diskUsages.put(dir, e.getUsage());
- // the full-filled dir become writable but still
- // above
- // warn threshold
- ldm.addToWritableDirs(dir, false);
- } catch (DiskOutOfSpaceException e) {
- // the full-filled dir is still full-filled
- diskUsages.put(dir, e.getUsage());
- }
- }
- } else {
- LOG.debug(
- "Current TotalDiskUsage: {} is greater than LWMThreshold: {}."
- + " So not adding any filledDir to WritableDirsList",
- totalDiskUsage, conf.getDiskLowWaterMarkUsageThreshold());
- }
- } catch (IOException ioe) {
- LOG.error("Got IOException while monitoring Dirs", ioe);
- for (LedgerDirsListener listener : ldm.getListeners()) {
- listener.fatalError();
- }
- }
- try {
- Thread.sleep(interval);
- } catch (InterruptedException e) {
- LOG.info("LedgerDirsMonitor thread is interrupted");
- break;
+ } catch (IOException ioe) {
+ LOG.error("Got IOException while monitoring Dirs", ioe);
+ for (LedgerDirsListener listener : ldm.getListeners()) {
+ listener.fatalError();
}
}
- LOG.info("LedgerDirsMonitorThread exited!");
}
/**
@@ -167,20 +164,25 @@ class LedgerDirsMonitor extends BookieThread {
}
// start the daemon for disk monitoring
- @Override
public void start() {
- this.setDaemon(true);
- super.start();
+ this.executor = Executors.newSingleThreadScheduledExecutor(
+ new ThreadFactoryBuilder()
+ .setNameFormat("LedgerDirsMonitorThread")
+ .setDaemon(true)
+ .build());
+ this.checkTask = this.executor.scheduleAtFixedRate(() -> check(), interval, interval, TimeUnit.MILLISECONDS);
}
// shutdown disk monitoring daemon
public void shutdown() {
LOG.info("Shutting down LedgerDirsMonitor");
- this.interrupt();
- try {
- this.join();
- } catch (InterruptedException e) {
- // Ignore
+ if (null != checkTask) {
+ if (!checkTask.cancel(true)) { // cancel() returns false when the task could not be cancelled
+ LOG.debug("Failed to cancel check task in LedgerDirsMonitor");
+ }
+ }
+ if (null != executor) {
+ executor.shutdown();
}
}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestLedgerDirsManager.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestLedgerDirsManager.java
index 7a7f672..4efc69f 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestLedgerDirsManager.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestLedgerDirsManager.java
@@ -28,16 +28,20 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+import static org.mockito.ArgumentMatchers.any;
import java.io.File;
import java.io.IOException;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
import org.apache.bookkeeper.bookie.LedgerDirsManager.LedgerDirsListener;
import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException;
+import org.apache.bookkeeper.common.testing.executors.MockExecutorController;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.conf.TestBKConfiguration;
import org.apache.bookkeeper.stats.Gauge;
@@ -48,14 +52,17 @@ import org.apache.commons.io.FileUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
/**
* Test LedgerDirsManager.
*/
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(LedgerDirsMonitor.class)
public class TestLedgerDirsManager {
- private static final Logger LOG = LoggerFactory.getLogger(TestLedgerDirsManager.class);
ServerConfiguration conf;
File curDir;
@@ -70,6 +77,10 @@ public class TestLedgerDirsManager {
final List<File> tempDirs = new ArrayList<File>();
+ // Thread used by monitor
+ ScheduledExecutorService executor;
+ MockExecutorController executorController;
+
File createTempDir(String prefix, String suffix) throws IOException {
File dir = IOUtils.createTempDir(prefix, suffix);
tempDirs.add(dir);
@@ -78,6 +89,8 @@ public class TestLedgerDirsManager {
@Before
public void setUp() throws Exception {
+ PowerMockito.mockStatic(Executors.class);
+
File tmpDir = createTempDir("bkTest", ".dir");
curDir = Bookie.getCurrentDirectory(tmpDir);
Bookie.checkDirectoryStructure(curDir);
@@ -88,6 +101,12 @@ public class TestLedgerDirsManager {
conf.setDiskCheckInterval(diskCheckInterval);
conf.setIsForceGCAllowWhenNoSpace(true);
+ executor = PowerMockito.mock(ScheduledExecutorService.class);
+ executorController = new MockExecutorController()
+ .controlScheduleAtFixedRate(executor, 10);
+ PowerMockito.when(Executors.newSingleThreadScheduledExecutor(any()))
+ .thenReturn(executor);
+
mockDiskChecker = new MockDiskChecker(threshold, warnThreshold);
statsProvider = new TestStatsProvider();
statsLogger = statsProvider.getStatsLogger("test");
@@ -162,7 +181,6 @@ public class TestLedgerDirsManager {
@Test
public void testLedgerDirsMonitorDuringTransition() throws Exception {
-
MockLedgerDirsListener mockLedgerDirsListener = new MockLedgerDirsListener();
dirsManager.addLedgerDirsListener(mockLedgerDirsListener);
ledgerMonitor.start();
@@ -170,12 +188,11 @@ public class TestLedgerDirsManager {
assertFalse(mockLedgerDirsListener.readOnly);
mockDiskChecker.setUsage(threshold + 0.05f);
- Thread.sleep((diskCheckInterval * 2) + 100);
-
+ executorController.advance(Duration.ofMillis(diskCheckInterval));
assertTrue(mockLedgerDirsListener.readOnly);
- mockDiskChecker.setUsage(threshold - 0.05f);
- Thread.sleep(diskCheckInterval + 100);
+ mockDiskChecker.setUsage(threshold - 0.05f);
+ executorController.advance(Duration.ofMillis(diskCheckInterval));
assertFalse(mockLedgerDirsListener.readOnly);
}
@@ -205,36 +222,36 @@ public class TestLedgerDirsManager {
dirsManager.addLedgerDirsListener(mockLedgerDirsListener);
ledgerMonitor.start();
- Thread.sleep((diskCheckInterval * 2) + 100);
+ executorController.advance(Duration.ofMillis(diskCheckInterval));
assertFalse(mockLedgerDirsListener.readOnly);
// go above LWM but below threshold
// should still be writable
mockDiskChecker.setUsage(lwm2nospace);
- Thread.sleep((diskCheckInterval * 2) + 100);
+ executorController.advance(Duration.ofMillis(diskCheckInterval));
assertFalse(mockLedgerDirsListener.readOnly);
// exceed the threshold, should go to readonly
mockDiskChecker.setUsage(nospaceExceeded);
- Thread.sleep(diskCheckInterval + 100);
+ executorController.advance(Duration.ofMillis(diskCheckInterval));
assertTrue(mockLedgerDirsListener.readOnly);
// drop below threshold but above LWM
// should stay read-only
mockDiskChecker.setUsage(lwm2nospace);
- Thread.sleep((diskCheckInterval * 2) + 100);
+ executorController.advance(Duration.ofMillis(diskCheckInterval));
assertTrue(mockLedgerDirsListener.readOnly);
// drop below LWM
// should become writable
mockDiskChecker.setUsage(lwm2warn);
- Thread.sleep((diskCheckInterval * 2) + 100);
+ executorController.advance(Duration.ofMillis(diskCheckInterval));
assertFalse(mockLedgerDirsListener.readOnly);
// go above LWM but below threshold
// should still be writable
mockDiskChecker.setUsage(lwm2nospace);
- Thread.sleep((diskCheckInterval * 2) + 100);
+ executorController.advance(Duration.ofMillis(diskCheckInterval));
assertFalse(mockLedgerDirsListener.readOnly);
}
@@ -330,7 +347,7 @@ public class TestLedgerDirsManager {
usageMap.put(dir1, dir1Usage);
usageMap.put(dir2, dir2Usage);
mockDiskChecker.setUsageMap(usageMap);
- Thread.sleep((diskCheckInterval * 2) + 100);
+ executorController.advance(Duration.ofMillis(diskCheckInterval));
float sample1 = getGauge(dir1.getParent()).getSample().floatValue();
float sample2 = getGauge(dir2.getParent()).getSample().floatValue();
--
To stop receiving notification emails like this one, please contact
['"commits@bookkeeper.apache.org" <co...@bookkeeper.apache.org>'].