You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by eo...@apache.org on 2021/11/09 19:33:05 UTC
[bookkeeper] branch master updated: fix: Stream Storage tests are
flaky; statelib test times out (#2883)
This is an automated email from the ASF dual-hosted git repository.
eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 06fe2ab fix: Stream Storage tests are flaky; statelib test times out (#2883)
06fe2ab is described below
commit 06fe2ab00612c7b91bc1f07aa27fbaaedb492bf2
Author: Andrey Yegorov <86...@users.noreply.github.com>
AuthorDate: Tue Nov 9 11:32:59 2021 -0800
fix: Stream Storage tests are flaky; statelib test times out (#2883)
---
.../kv/TestRocksdbKVAsyncStoreWithCheckpoints.java | 6 ++++-
.../testing/executors/MockExecutorController.java | 26 +++++++++++++++++++++-
2 files changed, 30 insertions(+), 2 deletions(-)
diff --git a/stream/statelib/src/test/java/org/apache/bookkeeper/statelib/impl/kv/TestRocksdbKVAsyncStoreWithCheckpoints.java b/stream/statelib/src/test/java/org/apache/bookkeeper/statelib/impl/kv/TestRocksdbKVAsyncStoreWithCheckpoints.java
index 3dfd3ac..d3c41a3 100644
--- a/stream/statelib/src/test/java/org/apache/bookkeeper/statelib/impl/kv/TestRocksdbKVAsyncStoreWithCheckpoints.java
+++ b/stream/statelib/src/test/java/org/apache/bookkeeper/statelib/impl/kv/TestRocksdbKVAsyncStoreWithCheckpoints.java
@@ -20,6 +20,7 @@ package org.apache.bookkeeper.statelib.impl.kv;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.bookkeeper.common.concurrent.FutureUtils.result;
+import static org.apache.bookkeeper.statelib.testing.executors.MockExecutorController.THREAD_NAME_PREFIX;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -28,6 +29,8 @@ import static org.mockito.Mockito.mock;
import com.google.common.io.MoreFiles;
import com.google.common.io.RecursiveDeleteOption;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
import java.io.File;
import java.net.URI;
import java.nio.file.Files;
@@ -119,7 +122,8 @@ public class TestRocksdbKVAsyncStoreWithCheckpoints extends TestDistributedLogBa
checkpointStore = new FSCheckpointManager(remoteDir);
// initialize the scheduler
- realWriteExecutor = Executors.newSingleThreadExecutor();
+ realWriteExecutor = Executors.newSingleThreadExecutor(
+ new ThreadFactoryBuilder().setNameFormat(THREAD_NAME_PREFIX + "%d").build());
mockWriteExecutor = mock(ScheduledExecutorService.class);
writeExecutorController = new MockExecutorController(realWriteExecutor)
.controlExecute(mockWriteExecutor)
diff --git a/stream/statelib/src/test/java/org/apache/bookkeeper/statelib/testing/executors/MockExecutorController.java b/stream/statelib/src/test/java/org/apache/bookkeeper/statelib/testing/executors/MockExecutorController.java
index 2575bbd..0b27377 100644
--- a/stream/statelib/src/test/java/org/apache/bookkeeper/statelib/testing/executors/MockExecutorController.java
+++ b/stream/statelib/src/test/java/org/apache/bookkeeper/statelib/testing/executors/MockExecutorController.java
@@ -19,6 +19,8 @@
package org.apache.bookkeeper.statelib.testing.executors;
import static com.google.common.base.Preconditions.checkArgument;
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.CoreMatchers.not;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.doAnswer;
@@ -41,6 +43,7 @@ import lombok.Data;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.junit.Assert;
import org.mockito.stubbing.Answer;
/**
@@ -49,6 +52,7 @@ import org.mockito.stubbing.Answer;
*/
@Slf4j
public class MockExecutorController {
+ public static final String THREAD_NAME_PREFIX = "realWriteExecutor-";
@Data
@Getter
@@ -126,19 +130,32 @@ public class MockExecutorController {
runnable.run();
} else {
try {
+ Assert.assertThat("calling this on the same thread will result in deadlock",
+ Thread.currentThread().getName(),
+ not(containsString(THREAD_NAME_PREFIX)));
executor.submit(runnable).get();
} catch (InterruptedException | ExecutionException e) {
+ log.warn("runTask failed", e);
}
}
}
+ private Future<?> runTaskAsync(Runnable runnable) {
+ if (null == executor) {
+ runnable.run();
+ return CompletableFuture.completedFuture(null);
+ } else {
+ return executor.submit(runnable);
+ }
+ }
+
public MockExecutorController controlSubmit(ScheduledExecutorService service) {
doAnswer(answerNow(this)).when(service).submit(any(Runnable.class));
return this;
}
public MockExecutorController controlExecute(ScheduledExecutorService service) {
- doAnswer(answerNow(this)).when(service).execute(any(Runnable.class));
+ doAnswer(answerNowAsync(this)).when(service).execute(any(Runnable.class));
return this;
}
@@ -190,6 +207,13 @@ public class MockExecutorController {
};
}
+ private static Answer<Future<?>> answerNowAsync(MockExecutorController controller) {
+ return invocationOnMock -> {
+ Runnable task = invocationOnMock.getArgument(0);
+ return controller.runTaskAsync(task);
+ };
+ }
+
private static Answer<Future<?>> answerNow(MockExecutorController controller) {
return invocationOnMock -> {