You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by rc...@apache.org on 2020/05/08 03:40:29 UTC

[james-project] 04/04: JAMES-3140 Better handle CassandraModSeqRetries

This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 91491b393a5e80248d8e4f150d7868dfcf825a46
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Wed Apr 29 15:16:21 2020 +0700

    JAMES-3140 Better handle CassandraModSeqRetries
    
    In an empty modseq, retires did not include the initial row creation, but
    only its update.
    
    (Triggered because "100 mail should be well received" runs way faster
    with the blob store cache.)
---
 .../apache/james/backends/cassandra/Scenario.java  |  8 ++-
 .../backends/cassandra/TestingSessionTest.java     | 23 ++++-----
 .../cassandra/mail/CassandraModSeqProvider.java    | 16 ++----
 .../mail/CassandraModSeqProviderTest.java          | 57 ++++++++++++++++++++++
 4 files changed, 79 insertions(+), 25 deletions(-)

diff --git a/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/Scenario.java b/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/Scenario.java
index f2f3682..da3758d 100644
--- a/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/Scenario.java
+++ b/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/Scenario.java
@@ -33,10 +33,16 @@ import com.datastax.driver.core.Statement;
 import com.google.common.base.Preconditions;
 
 public class Scenario {
+    public static class InjectedFailureException extends RuntimeException {
+        public InjectedFailureException() {
+            super("Injected failure");
+        }
+    }
+
     @FunctionalInterface
     interface Behavior {
         Behavior THROW = (session, statement) -> {
-            throw new RuntimeException("Injected failure");
+            throw new InjectedFailureException();
         };
 
         Behavior EXECUTE_NORMALLY = Session::executeAsync;
diff --git a/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/TestingSessionTest.java b/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/TestingSessionTest.java
index 6c8dfc5..54fca6f 100644
--- a/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/TestingSessionTest.java
+++ b/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/TestingSessionTest.java
@@ -30,6 +30,7 @@ import static org.assertj.core.api.Assertions.assertThatCode;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 import org.apache.james.backends.cassandra.Scenario.Barrier;
+import org.apache.james.backends.cassandra.Scenario.InjectedFailureException;
 import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
 import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionDAO;
 import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionModule;
@@ -89,7 +90,7 @@ class TestingSessionTest {
                 .whenQueryStartsWith("SELECT value FROM schemaVersion;"));
 
         assertThatThrownBy(() -> dao.getCurrentSchemaVersion().block())
-            .isInstanceOf(RuntimeException.class);
+            .isInstanceOf(InjectedFailureException.class);
     }
 
     @Test
@@ -102,7 +103,7 @@ class TestingSessionTest {
         assertThatThrownBy(() -> new CassandraAsyncExecutor(cassandra.getConf())
                 .execute(select(VALUE).from(TABLE_NAME))
                 .block())
-            .isInstanceOf(RuntimeException.class);
+            .isInstanceOf(InjectedFailureException.class);
     }
 
     @Test
@@ -113,7 +114,7 @@ class TestingSessionTest {
                 .forAllQueries());
 
         assertThatThrownBy(() -> dao.getCurrentSchemaVersion().block())
-            .isInstanceOf(RuntimeException.class);
+            .isInstanceOf(InjectedFailureException.class);
     }
 
     @Test
@@ -142,9 +143,9 @@ class TestingSessionTest {
 
         SoftAssertions.assertSoftly(softly -> {
             assertThatThrownBy(() -> dao.getCurrentSchemaVersion().block())
-                .isInstanceOf(RuntimeException.class);
+                .isInstanceOf(InjectedFailureException.class);
             assertThatThrownBy(() -> dao.getCurrentSchemaVersion().block())
-                .isInstanceOf(RuntimeException.class);
+                .isInstanceOf(InjectedFailureException.class);
             assertThatCode(() -> dao.getCurrentSchemaVersion().block())
                 .doesNotThrowAnyException();
         });
@@ -165,7 +166,7 @@ class TestingSessionTest {
             assertThatCode(() -> dao.getCurrentSchemaVersion().block())
                 .doesNotThrowAnyException();
             assertThatThrownBy(() -> dao.getCurrentSchemaVersion().block())
-                .isInstanceOf(RuntimeException.class);
+                .isInstanceOf(InjectedFailureException.class);
             assertThatCode(() -> dao.getCurrentSchemaVersion().block())
                 .doesNotThrowAnyException();
         });
@@ -180,11 +181,11 @@ class TestingSessionTest {
 
         SoftAssertions.assertSoftly(softly -> {
             assertThatThrownBy(() -> dao.getCurrentSchemaVersion().block())
-                .isInstanceOf(RuntimeException.class);
+                .isInstanceOf(InjectedFailureException.class);
             assertThatThrownBy(() -> dao.getCurrentSchemaVersion().block())
-                .isInstanceOf(RuntimeException.class);
+                .isInstanceOf(InjectedFailureException.class);
             assertThatThrownBy(() -> dao.getCurrentSchemaVersion().block())
-                .isInstanceOf(RuntimeException.class);
+                .isInstanceOf(InjectedFailureException.class);
         });
     }
 
@@ -198,7 +199,7 @@ class TestingSessionTest {
         dao.updateVersion(new SchemaVersion(36)).block();
 
         assertThatThrownBy(() -> dao.getCurrentSchemaVersion().block())
-            .isInstanceOf(RuntimeException.class);
+            .isInstanceOf(InjectedFailureException.class);
     }
 
     @Test
@@ -287,6 +288,6 @@ class TestingSessionTest {
         barrier.releaseCaller();
 
         assertThatThrownBy(operation::block)
-            .isInstanceOf(RuntimeException.class);
+            .isInstanceOf(InjectedFailureException.class);
     }
 }
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProvider.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProvider.java
index f3ee258..15f1a2d 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProvider.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProvider.java
@@ -56,7 +56,6 @@ import reactor.util.retry.Retry;
 public class CassandraModSeqProvider implements ModSeqProvider {
 
     public static final String MOD_SEQ_CONDITION = "modSeqCondition";
-    private final long maxModSeqRetries;
 
     public static class ExceptionRelay extends RuntimeException {
         private final MailboxException underlying;
@@ -83,6 +82,7 @@ public class CassandraModSeqProvider implements ModSeqProvider {
     }
 
     private final CassandraAsyncExecutor cassandraAsyncExecutor;
+    private final long maxModSeqRetries;
     private final PreparedStatement select;
     private final PreparedStatement update;
     private final PreparedStatement insert;
@@ -178,24 +178,14 @@ public class CassandraModSeqProvider implements ModSeqProvider {
     }
 
     public Mono<ModSeq> nextModSeq(CassandraId mailboxId) {
+        Duration firstBackoff = Duration.ofMillis(10);
+
         return findHighestModSeq(mailboxId)
             .flatMap(maybeHighestModSeq -> maybeHighestModSeq
                         .map(highestModSeq -> tryUpdateModSeq(mailboxId, highestModSeq))
                         .orElseGet(() -> tryInsertModSeq(mailboxId, ModSeq.first())))
-            .switchIfEmpty(handleRetries(mailboxId));
-    }
-
-    private Mono<ModSeq> handleRetries(CassandraId mailboxId) {
-        Duration firstBackoff = Duration.ofMillis(10);
-        return tryFindThenUpdateOnce(mailboxId)
             .single()
             .retryWhen(Retry.backoff(maxModSeqRetries, firstBackoff).scheduler(Schedulers.elastic()));
     }
 
-    private Mono<ModSeq> tryFindThenUpdateOnce(CassandraId mailboxId) {
-        return Mono.defer(() -> findHighestModSeq(mailboxId)
-            .<ModSeq>handle((t, sink) -> t.ifPresent(sink::next))
-            .flatMap(highestModSeq -> tryUpdateModSeq(mailboxId, highestModSeq)));
-    }
-
 }
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProviderTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProviderTest.java
index 6c328a4..e3aed85 100644
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProviderTest.java
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProviderTest.java
@@ -18,7 +18,13 @@
  ****************************************************************/
 package org.apache.james.mailbox.cassandra.mail;
 
+import static org.apache.james.backends.cassandra.Scenario.Builder.awaitOn;
+import static org.apache.james.backends.cassandra.Scenario.Builder.executeNormally;
+import static org.apache.james.mailbox.cassandra.table.CassandraMessageModseqTable.MAILBOX_ID;
+import static org.apache.james.mailbox.cassandra.table.CassandraMessageModseqTable.TABLE_NAME;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatCode;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 import java.time.Duration;
 import java.util.concurrent.ConcurrentSkipListSet;
@@ -27,6 +33,8 @@ import java.util.stream.LongStream;
 
 import org.apache.james.backends.cassandra.CassandraCluster;
 import org.apache.james.backends.cassandra.CassandraClusterExtension;
+import org.apache.james.backends.cassandra.Scenario;
+import org.apache.james.backends.cassandra.Scenario.Barrier;
 import org.apache.james.backends.cassandra.init.configuration.CassandraConfiguration;
 import org.apache.james.core.Username;
 import org.apache.james.mailbox.ModSeq;
@@ -40,8 +48,12 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
 
+import com.datastax.driver.core.querybuilder.QueryBuilder;
 import com.github.fge.lambdas.Throwing;
 
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
+
 class CassandraModSeqProviderTest {
     private static final CassandraId CASSANDRA_ID = new CassandraId.Factory().fromString("e22b3ac0-a80b-11e7-bb00-777268d65503");
 
@@ -84,6 +96,51 @@ class CassandraModSeqProviderTest {
     }
 
     @Test
+    void failedInsertsShouldBeRetried(CassandraCluster cassandra) throws Exception {
+        Barrier insertBarrier = new Barrier(2);
+        Barrier retryBarrier = new Barrier(1);
+        cassandra.getConf()
+            .registerScenario(
+                executeNormally()
+                    .times(2)
+                    .whenQueryStartsWith("SELECT nextModseq FROM modseq WHERE mailboxId=:mailboxId;"),
+                awaitOn(insertBarrier)
+                    .thenExecuteNormally()
+                    .times(2)
+                    .whenQueryStartsWith("INSERT INTO modseq (nextModseq,mailboxId) VALUES (:nextModseq,:mailboxId) IF NOT EXISTS;"),
+                awaitOn(retryBarrier)
+                    .thenExecuteNormally()
+                    .times(1)
+                    .whenQueryStartsWith("SELECT nextModseq FROM modseq WHERE mailboxId=:mailboxId;"));
+
+        Mono<ModSeq> operation1 = modSeqProvider.nextModSeq(CASSANDRA_ID)
+            .subscribeOn(Schedulers.elastic())
+            .cache();
+        Mono<ModSeq> operation2 = modSeqProvider.nextModSeq(CASSANDRA_ID)
+            .subscribeOn(Schedulers.elastic())
+            .cache();
+
+        operation1.subscribe();
+        operation2.subscribe();
+
+        insertBarrier.awaitCaller();
+        insertBarrier.releaseCaller();
+
+        retryBarrier.awaitCaller();
+        retryBarrier.releaseCaller();
+
+        // Artificially fail the insert failure
+        cassandra.getConf()
+            .execute(QueryBuilder.delete().from(TABLE_NAME)
+                .where(QueryBuilder.eq(MAILBOX_ID, CASSANDRA_ID.asUuid())));
+
+        retryBarrier.releaseCaller();
+
+        assertThatCode(() -> operation1.block(Duration.ofSeconds(1))).doesNotThrowAnyException();
+        assertThatCode(() -> operation2.block(Duration.ofSeconds(1))).doesNotThrowAnyException();
+    }
+
+    @Test
     void nextModSeqShouldGenerateUniqueValuesWhenParallelCalls() throws ExecutionException, InterruptedException {
         int nbEntries = 10;
 


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org