You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by eo...@apache.org on 2021/11/09 19:33:05 UTC

[bookkeeper] branch master updated: fix: Stream Storage tests are flaky; statelib test times out (#2883)

This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 06fe2ab  fix: Stream Storage tests are flaky; statelib test times out (#2883)
06fe2ab is described below

commit 06fe2ab00612c7b91bc1f07aa27fbaaedb492bf2
Author: Andrey Yegorov <86...@users.noreply.github.com>
AuthorDate: Tue Nov 9 11:32:59 2021 -0800

    fix: Stream Storage tests are flaky; statelib test times out (#2883)
---
 .../kv/TestRocksdbKVAsyncStoreWithCheckpoints.java |  6 ++++-
 .../testing/executors/MockExecutorController.java  | 26 +++++++++++++++++++++-
 2 files changed, 30 insertions(+), 2 deletions(-)

diff --git a/stream/statelib/src/test/java/org/apache/bookkeeper/statelib/impl/kv/TestRocksdbKVAsyncStoreWithCheckpoints.java b/stream/statelib/src/test/java/org/apache/bookkeeper/statelib/impl/kv/TestRocksdbKVAsyncStoreWithCheckpoints.java
index 3dfd3ac..d3c41a3 100644
--- a/stream/statelib/src/test/java/org/apache/bookkeeper/statelib/impl/kv/TestRocksdbKVAsyncStoreWithCheckpoints.java
+++ b/stream/statelib/src/test/java/org/apache/bookkeeper/statelib/impl/kv/TestRocksdbKVAsyncStoreWithCheckpoints.java
@@ -20,6 +20,7 @@ package org.apache.bookkeeper.statelib.impl.kv;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.apache.bookkeeper.common.concurrent.FutureUtils.result;
+import static org.apache.bookkeeper.statelib.testing.executors.MockExecutorController.THREAD_NAME_PREFIX;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -28,6 +29,8 @@ import static org.mockito.Mockito.mock;
 
 import com.google.common.io.MoreFiles;
 import com.google.common.io.RecursiveDeleteOption;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
 import java.io.File;
 import java.net.URI;
 import java.nio.file.Files;
@@ -119,7 +122,8 @@ public class TestRocksdbKVAsyncStoreWithCheckpoints extends TestDistributedLogBa
         checkpointStore = new FSCheckpointManager(remoteDir);
 
         // initialize the scheduler
-        realWriteExecutor = Executors.newSingleThreadExecutor();
+        realWriteExecutor = Executors.newSingleThreadExecutor(
+                new ThreadFactoryBuilder().setNameFormat(THREAD_NAME_PREFIX + "%d").build());
         mockWriteExecutor = mock(ScheduledExecutorService.class);
         writeExecutorController = new MockExecutorController(realWriteExecutor)
             .controlExecute(mockWriteExecutor)
diff --git a/stream/statelib/src/test/java/org/apache/bookkeeper/statelib/testing/executors/MockExecutorController.java b/stream/statelib/src/test/java/org/apache/bookkeeper/statelib/testing/executors/MockExecutorController.java
index 2575bbd..0b27377 100644
--- a/stream/statelib/src/test/java/org/apache/bookkeeper/statelib/testing/executors/MockExecutorController.java
+++ b/stream/statelib/src/test/java/org/apache/bookkeeper/statelib/testing/executors/MockExecutorController.java
@@ -19,6 +19,8 @@
 package org.apache.bookkeeper.statelib.testing.executors;
 
 import static com.google.common.base.Preconditions.checkArgument;
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.CoreMatchers.not;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.Mockito.doAnswer;
@@ -41,6 +43,7 @@ import lombok.Data;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.junit.Assert;
 import org.mockito.stubbing.Answer;
 
 /**
@@ -49,6 +52,7 @@ import org.mockito.stubbing.Answer;
  */
 @Slf4j
 public class MockExecutorController {
+    public static final String THREAD_NAME_PREFIX = "realWriteExecutor-";
 
     @Data
     @Getter
@@ -126,19 +130,32 @@ public class MockExecutorController {
             runnable.run();
         } else {
             try {
+                Assert.assertThat("calling this on the same thread will result in deadlock",
+                        Thread.currentThread().getName(),
+                        not(containsString(THREAD_NAME_PREFIX)));
                 executor.submit(runnable).get();
             } catch (InterruptedException | ExecutionException e) {
+                log.warn("runTask failed", e);
             }
         }
     }
 
+    private Future<?> runTaskAsync(Runnable runnable) {
+        if (null == executor) {
+            runnable.run();
+            return CompletableFuture.completedFuture(null);
+        } else {
+            return executor.submit(runnable);
+        }
+    }
+
     public MockExecutorController controlSubmit(ScheduledExecutorService service) {
         doAnswer(answerNow(this)).when(service).submit(any(Runnable.class));
         return this;
     }
 
     public MockExecutorController controlExecute(ScheduledExecutorService service) {
-        doAnswer(answerNow(this)).when(service).execute(any(Runnable.class));
+        doAnswer(answerNowAsync(this)).when(service).execute(any(Runnable.class));
         return this;
     }
 
@@ -190,6 +207,13 @@ public class MockExecutorController {
         };
     }
 
+    private static Answer<Future<?>> answerNowAsync(MockExecutorController controller) {
+        return invocationOnMock -> {
+            Runnable task = invocationOnMock.getArgument(0);
+            return controller.runTaskAsync(task);
+        };
+    }
+
     private static Answer<Future<?>> answerNow(MockExecutorController controller) {
         return invocationOnMock -> {