You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by rc...@apache.org on 2020/05/13 02:48:33 UTC

[james-project] 01/05: JAMES-3058 Cassandra instrumentation: Future for ongoing computation

This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 96d202e08c697f569071181be79a7228a631e22a
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Tue May 12 09:54:03 2020 +0700

    JAMES-3058 Cassandra instrumentation: Future for ongoing computation
    
    Within Cassandra instrumentation we previously relied on the following
    pattern when synchronizing Cassandra code:
    
    ```
    Mono<Void> operation = dao.updateVersion(newVersion).cache();
    
    operation.subscribeOn(Schedulers.elastic()).subscribe();
    barrier.awaitCaller();
    barrier.releaseCaller();
    operation.block();
    ```
    
    This led to an unstable test caught by our CI:
    
    ```
    [5c452decd240c33200a5bb639dce022d9846bea8] [INFO] Running org.apache.james.backends.cassandra.TestingSessionTest
    [5c452decd240c33200a5bb639dce022d9846bea8] 16:50:42.328 [ERROR] r.c.s.Schedulers - Scheduler worker in group main failed with an uncaught exception
    [5c452decd240c33200a5bb639dce022d9846bea8] org.apache.james.backends.cassandra.Scenario$InjectedFailureException: Injected failure
    [5c452decd240c33200a5bb639dce022d9846bea8] 	at org.apache.james.backends.cassandra.Scenario$Behavior.lambda$static$0(Scenario.java:45)
    [5c452decd240c33200a5bb639dce022d9846bea8] 	at org.apache.james.backends.cassandra.Scenario$Behavior.lambda$awaitOn$1(Scenario.java:53)
    [5c452decd240c33200a5bb639dce022d9846bea8] 	at org.apache.james.backends.cassandra.TestingSession.executeAsync(TestingSession.java:121)
    [5c452decd240c33200a5bb639dce022d9846bea8] 	at org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor.lambda$execute$0(CassandraAsyncExecutor.java:47)
    [5c452decd240c33200a5bb639dce022d9846bea8] 	at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:44)
    [5c452decd240c33200a5bb639dce022d9846bea8] 	at reactor.core.publisher.Mono.subscribe(Mono.java:4210)
    [5c452decd240c33200a5bb639dce022d9846bea8] 	at reactor.core.publisher.MonoIgnorePublisher.subscribe(MonoIgnorePublisher.java:47)
    [5c452decd240c33200a5bb639dce022d9846bea8] 	at reactor.core.publisher.MonoCacheTime.subscribeOrReturn(MonoCacheTime.java:132)
    [5c452decd240c33200a5bb639dce022d9846bea8] 	at reactor.core.publisher.Mono.subscribe(Mono.java:4195)
    [5c452decd240c33200a5bb639dce022d9846bea8] 	at reactor.core.publisher.MonoSubscribeOn$SubscribeOnSubscriber.run(MonoSubscribeOn.java:124)
    [5c452decd240c33200a5bb639dce022d9846bea8] 	at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:84)
    [5c452decd240c33200a5bb639dce022d9846bea8] 	at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:37)
    [5c452decd240c33200a5bb639dce022d9846bea8] 	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    [5c452decd240c33200a5bb639dce022d9846bea8] 	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
    [5c452decd240c33200a5bb639dce022d9846bea8] 	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    [5c452decd240c33200a5bb639dce022d9846bea8] 	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    [5c452decd240c33200a5bb639dce022d9846bea8] 	at java.base/java.lang.Thread.run(Thread.java:834)
    [5c452decd240c33200a5bb639dce022d9846bea8] Wrapped by: reactor.core.Exceptions$ErrorCallbackNotImplemented: org.apache.james.backends.cassandra.Scenario$InjectedFailureException: Injected failure
    [Then blocks forever]
    ```
    
    Relying on a future to represent the ongoing computation effectively solved
    the issue (we were unable to reproduce it with a @RepeatedTest(100)).
    
    The pattern then is:
    
    ```
    CompletableFuture<Void> operation = dao.updateVersion(newVersion)
        .subscribeOn(Schedulers.elastic())
        .toFuture();
    
    barrier.awaitCaller();
    barrier.releaseCaller();
    operation.get();
    ```
---
 .../backends/cassandra/TestingSessionTest.java     | 28 ++++++++++++----------
 .../cassandra/mail/CassandraMailboxDAOTest.java    | 16 +++++++------
 .../mail/CassandraModSeqProviderTest.java          | 20 +++++++---------
 3 files changed, 33 insertions(+), 31 deletions(-)

diff --git a/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/TestingSessionTest.java b/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/TestingSessionTest.java
index 54fca6f..27a191a 100644
--- a/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/TestingSessionTest.java
+++ b/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/TestingSessionTest.java
@@ -29,6 +29,8 @@ import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatCode;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
+import java.util.concurrent.CompletableFuture;
+
 import org.apache.james.backends.cassandra.Scenario.Barrier;
 import org.apache.james.backends.cassandra.Scenario.InjectedFailureException;
 import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
@@ -40,7 +42,6 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
 
-import reactor.core.publisher.Mono;
 import reactor.core.scheduler.Schedulers;
 
 class TestingSessionTest {
@@ -222,7 +223,7 @@ class TestingSessionTest {
     }
 
     @Test
-    void statementShouldBeAppliedWhenBarrierIsReleased(CassandraCluster cassandra) {
+    void statementShouldBeAppliedWhenBarrierIsReleased(CassandraCluster cassandra) throws Exception {
         SchemaVersion originalSchemaVersion = new SchemaVersion(32);
         SchemaVersion newVersion = new SchemaVersion(36);
 
@@ -234,11 +235,12 @@ class TestingSessionTest {
                 .times(1)
                 .whenQueryStartsWith("INSERT INTO schemaVersion"));
 
-        Mono<Void> operation = dao.updateVersion(newVersion).cache();
+        CompletableFuture<Void> operation = dao.updateVersion(newVersion)
+            .subscribeOn(Schedulers.elastic())
+            .toFuture();
 
-        operation.subscribeOn(Schedulers.elastic()).subscribe();
         barrier.releaseCaller();
-        operation.block();
+        operation.get();
 
         assertThat(dao.getCurrentSchemaVersion().block())
             .contains(newVersion);
@@ -257,12 +259,13 @@ class TestingSessionTest {
                 .times(1)
                 .whenQueryStartsWith("INSERT INTO schemaVersion"));
 
-        Mono<Void> operation = dao.updateVersion(newVersion).cache();
+        CompletableFuture<Void> operation = dao.updateVersion(newVersion)
+            .subscribeOn(Schedulers.elastic())
+            .toFuture();
 
-        operation.subscribeOn(Schedulers.elastic()).subscribe();
         barrier.awaitCaller();
         barrier.releaseCaller();
-        operation.block();
+        operation.get();
 
         assertThat(dao.getCurrentSchemaVersion().block())
             .contains(newVersion);
@@ -281,13 +284,14 @@ class TestingSessionTest {
                 .times(1)
                 .whenQueryStartsWith("INSERT INTO schemaVersion"));
 
-        Mono<Void> operation = dao.updateVersion(newVersion).cache();
+        CompletableFuture<Void> operation = dao.updateVersion(newVersion)
+            .subscribeOn(Schedulers.elastic())
+            .toFuture();
 
-        operation.subscribeOn(Schedulers.elastic()).subscribe();
         barrier.awaitCaller();
         barrier.releaseCaller();
 
-        assertThatThrownBy(operation::block)
-            .isInstanceOf(InjectedFailureException.class);
+        assertThatThrownBy(operation::get)
+            .hasCauseInstanceOf(InjectedFailureException.class);
     }
 }
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxDAOTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxDAOTest.java
index e2d3b2f..b7b570d 100644
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxDAOTest.java
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxDAOTest.java
@@ -27,6 +27,7 @@ import static org.assertj.core.api.Assertions.assertThat;
 
 import java.util.List;
 import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
 
 import org.apache.james.backends.cassandra.CassandraCluster;
 import org.apache.james.backends.cassandra.CassandraClusterExtension;
@@ -46,7 +47,6 @@ import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
 
-import reactor.core.publisher.Mono;
 import reactor.core.scheduler.Schedulers;
 
 class CassandraMailboxDAOTest {
@@ -139,16 +139,18 @@ class CassandraMailboxDAOTest {
             .times(2)
             .whenQueryStartsWith("UPDATE mailbox SET"));
 
-        Mono<Mailbox> readMailbox1 = testee.retrieveMailbox(CASSANDRA_ID_1).cache();
-        Mono<Mailbox> readMailbox2 = testee.retrieveMailbox(CASSANDRA_ID_1).cache();
-        readMailbox1.subscribeOn(Schedulers.elastic()).subscribe();
-        readMailbox2.subscribeOn(Schedulers.elastic()).subscribe();
+        CompletableFuture<Mailbox> readMailbox1 = testee.retrieveMailbox(CASSANDRA_ID_1)
+            .subscribeOn(Schedulers.elastic())
+            .toFuture();
+        CompletableFuture<Mailbox> readMailbox2 = testee.retrieveMailbox(CASSANDRA_ID_1)
+            .subscribeOn(Schedulers.elastic())
+            .toFuture();
 
         barrier.awaitCaller();
         barrier.releaseCaller();
 
-        assertThat(readMailbox1.block().getUidValidity())
-            .isEqualTo(readMailbox2.block().getUidValidity());
+        assertThat(readMailbox1.get().getUidValidity())
+            .isEqualTo(readMailbox2.get().getUidValidity());
     }
 
     @Test
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProviderTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProviderTest.java
index e3aed85..3128df7 100644
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProviderTest.java
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProviderTest.java
@@ -24,16 +24,16 @@ import static org.apache.james.mailbox.cassandra.table.CassandraMessageModseqTab
 import static org.apache.james.mailbox.cassandra.table.CassandraMessageModseqTable.TABLE_NAME;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatCode;
-import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 import java.time.Duration;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.LongStream;
 
 import org.apache.james.backends.cassandra.CassandraCluster;
 import org.apache.james.backends.cassandra.CassandraClusterExtension;
-import org.apache.james.backends.cassandra.Scenario;
 import org.apache.james.backends.cassandra.Scenario.Barrier;
 import org.apache.james.backends.cassandra.init.configuration.CassandraConfiguration;
 import org.apache.james.core.Username;
@@ -51,7 +51,6 @@ import org.junit.jupiter.api.extension.RegisterExtension;
 import com.datastax.driver.core.querybuilder.QueryBuilder;
 import com.github.fge.lambdas.Throwing;
 
-import reactor.core.publisher.Mono;
 import reactor.core.scheduler.Schedulers;
 
 class CassandraModSeqProviderTest {
@@ -113,15 +112,12 @@ class CassandraModSeqProviderTest {
                     .times(1)
                     .whenQueryStartsWith("SELECT nextModseq FROM modseq WHERE mailboxId=:mailboxId;"));
 
-        Mono<ModSeq> operation1 = modSeqProvider.nextModSeq(CASSANDRA_ID)
+        CompletableFuture<ModSeq> operation1 = modSeqProvider.nextModSeq(CASSANDRA_ID)
             .subscribeOn(Schedulers.elastic())
-            .cache();
-        Mono<ModSeq> operation2 = modSeqProvider.nextModSeq(CASSANDRA_ID)
+            .toFuture();
+        CompletableFuture<ModSeq> operation2 = modSeqProvider.nextModSeq(CASSANDRA_ID)
             .subscribeOn(Schedulers.elastic())
-            .cache();
-
-        operation1.subscribe();
-        operation2.subscribe();
+            .toFuture();
 
         insertBarrier.awaitCaller();
         insertBarrier.releaseCaller();
@@ -136,8 +132,8 @@ class CassandraModSeqProviderTest {
 
         retryBarrier.releaseCaller();
 
-        assertThatCode(() -> operation1.block(Duration.ofSeconds(1))).doesNotThrowAnyException();
-        assertThatCode(() -> operation2.block(Duration.ofSeconds(1))).doesNotThrowAnyException();
+        assertThatCode(() -> operation1.get(1, TimeUnit.SECONDS)).doesNotThrowAnyException();
+        assertThatCode(() -> operation2.get(1, TimeUnit.SECONDS)).doesNotThrowAnyException();
     }
 
     @Test


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org