You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by rc...@apache.org on 2021/01/29 03:09:00 UTC

[james-project] branch master updated (d21c297 -> d1c14f5)

This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git.


    from d21c297  JAMES-3491 Transport independent JMAP API
     new debe5f1  JAMES-3495 Reproduce the null messageId bug
     new eee94f1  JAMES-3495 MessageIdTable: ignore partially deleted rows
     new c16615b  JAMES-3495 MessageIdTable: cleanup asynchronously partially deleted entries
     new 6185038  JAMES-3493 Add logs for start and stop of the JAMES and the JMX servers
     new dd6f816  JAMES-3494 update dependencies
     new 461086b  JAMES-3494 increment scala and refined version
     new 5d7afcb  JAMES-3494 Remove RetryWithAsyncCallback
     new e592c49  JAMES-3494 fix compilation issues
     new 4c8e8ec  JAMES-3494 Stop waiting forever
     new 1b0f3bf  JAMES-3494 remove rabbitmq dependencies to cassandra-guice module
     new d3195ac  JAMES-3494 Fix netty version
     new 14bbab3  JAMES-3494 fix json body of request by quoting mailbox ids
     new d1c14f5  JAMES-3494 fix flaky tests

The 13 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../james/eventsourcing/CommandDispatcher.scala    |  11 +-
 .../cassandra/mail/CassandraMessageIdDAO.java      |  24 +-
 .../cassandra/mail/CassandraMessageIdDAOTest.java  |  25 ++
 pom.xml                                            |   6 +-
 server/blob/blob-s3/pom.xml                        |   4 +-
 .../blob/objectstorage/aws/S3BlobStoreDAO.java     |  25 +-
 .../deduplication/DeDuplicationBlobStore.scala     |  16 +-
 server/container/guice/cassandra-guice/pom.xml     |   4 -
 .../src/test/resources/rabbitmq.properties         |   2 -
 .../guice/cassandra-rabbitmq-guice/pom.xml         |   4 +
 .../java/org/apache/james/GuiceJamesServer.java    |   2 +
 .../org/apache/james/modules/server/JMXServer.java |   6 +
 .../james/adapter/mailbox/ReIndexerManagement.java |   5 +-
 .../java/reactor/retry/RetryWithAsyncCallback.java | 270 --------------
 .../test/java/reactor/retry/RetryTestUtils.java    | 122 -------
 .../reactor/retry/RetryWithAsyncCallbackTest.java  | 391 ---------------------
 .../methods/integration/SetMessagesMethodTest.java |  28 +-
 server/protocols/jmap-rfc-8621/pom.xml             |   2 +-
 .../apache/james/jmap/method/EmailGetMethod.scala  |   2 +-
 .../jmap/method/EmailSetUpdatePerformer.scala      |   6 +-
 .../jmap/method/EmailSubmissionSetMethod.scala     |   2 +-
 .../james/jmap/method/MailboxGetMethod.scala       |   2 +-
 .../jmap/method/MailboxSetCreatePerformer.scala    |   2 +-
 .../jmap/method/VacationResponseGetMethod.scala    |   2 +-
 .../org/apache/james/jmap/routes/JmapApi.scala     |   2 +-
 server/queue/queue-activemq/pom.xml                |   2 +-
 .../org/apache/james/task/TaskManagerContract.java |   2 +-
 27 files changed, 120 insertions(+), 849 deletions(-)
 delete mode 100644 server/container/guice/cassandra-guice/src/test/resources/rabbitmq.properties
 delete mode 100644 server/container/util/src/main/java/reactor/retry/RetryWithAsyncCallback.java
 delete mode 100644 server/container/util/src/test/java/reactor/retry/RetryTestUtils.java
 delete mode 100644 server/container/util/src/test/java/reactor/retry/RetryWithAsyncCallbackTest.java


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org


[james-project] 09/13: JAMES-3494 Stop waiting forever

Posted by rc...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 4c8e8ecd9f911f7bab2e18ae6072b9885f25d489
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Tue Jan 26 08:27:16 2021 +0700

    JAMES-3494 Stop waiting forever
---
 .../java/org/apache/james/adapter/mailbox/ReIndexerManagement.java   | 5 +++--
 .../src/test/java/org/apache/james/task/TaskManagerContract.java     | 2 +-
 2 files changed, 4 insertions(+), 3 deletions(-)

diff --git a/server/container/mailbox-jmx/src/main/java/org/apache/james/adapter/mailbox/ReIndexerManagement.java b/server/container/mailbox-jmx/src/main/java/org/apache/james/adapter/mailbox/ReIndexerManagement.java
index 3cf9675..2612d01 100644
--- a/server/container/mailbox-jmx/src/main/java/org/apache/james/adapter/mailbox/ReIndexerManagement.java
+++ b/server/container/mailbox-jmx/src/main/java/org/apache/james/adapter/mailbox/ReIndexerManagement.java
@@ -22,6 +22,7 @@ package org.apache.james.adapter.mailbox;
 import java.io.Closeable;
 import java.io.IOException;
 import java.time.Duration;
+import java.time.temporal.ChronoUnit;
 
 import javax.inject.Inject;
 import javax.inject.Named;
@@ -54,7 +55,7 @@ public class ReIndexerManagement implements ReIndexerManagementMBean {
                      .addContext(MDCBuilder.ACTION, "reIndex")
                      .build()) {
             TaskId taskId = taskManager.submit(reIndexer.reIndex(new MailboxPath(namespace, Username.of(user), name), RunningOptions.DEFAULT));
-            taskManager.await(taskId, Duration.ofMillis(Long.MAX_VALUE));
+            taskManager.await(taskId, Duration.of(365, ChronoUnit.DAYS));
         } catch (IOException | TaskManager.ReachedTimeoutException e) {
             throw new RuntimeException(e);
         }
@@ -68,7 +69,7 @@ public class ReIndexerManagement implements ReIndexerManagementMBean {
                      .addContext(MDCBuilder.ACTION, "reIndex")
                      .build()) {
             TaskId taskId = taskManager.submit(reIndexer.reIndex(RunningOptions.DEFAULT));
-            taskManager.await(taskId, Duration.ofMillis(Long.MAX_VALUE));
+            taskManager.await(taskId, Duration.of(365, ChronoUnit.DAYS));
         } catch (IOException | TaskManager.ReachedTimeoutException e) {
             throw new RuntimeException(e);
         }
diff --git a/server/task/task-api/src/test/java/org/apache/james/task/TaskManagerContract.java b/server/task/task-api/src/test/java/org/apache/james/task/TaskManagerContract.java
index 406671f..517c737 100644
--- a/server/task/task-api/src/test/java/org/apache/james/task/TaskManagerContract.java
+++ b/server/task/task-api/src/test/java/org/apache/james/task/TaskManagerContract.java
@@ -47,7 +47,7 @@ public interface TaskManagerContract {
         .pollDelay(slowPacedPollInterval)
         .await();
     ConditionFactory awaitAtMostFiveSeconds = calmlyAwait.atMost(FIVE_SECONDS);
-    java.time.Duration TIMEOUT = java.time.Duration.ofMillis(Long.MAX_VALUE);
+    java.time.Duration TIMEOUT = java.time.Duration.ofMinutes(15);
 
     TaskManager taskManager();
 


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org


[james-project] 11/13: JAMES-3494 Fix netty version

Posted by rc...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit d3195ac0104af8dc15ed8ce826d2c796be509bfd
Author: Rémi Kowalski <rk...@linagora.com>
AuthorDate: Tue Jan 26 16:49:09 2021 +0100

    JAMES-3494 Fix netty version
    
    A conflict with the netty version brought in by reactor-netty was preventing JamesServerJmap from handling requests
    in projects depending on ActiveMQ
---
 server/blob/blob-s3/pom.xml         | 4 ++--
 server/queue/queue-activemq/pom.xml | 2 +-
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/server/blob/blob-s3/pom.xml b/server/blob/blob-s3/pom.xml
index 00ffb49..4bc87ab 100644
--- a/server/blob/blob-s3/pom.xml
+++ b/server/blob/blob-s3/pom.xml
@@ -76,12 +76,12 @@
         <dependency>
             <groupId>io.netty</groupId>
             <artifactId>netty-codec</artifactId>
-            <version>4.1.46.Final</version>
+            <version>4.1.56.Final</version>
         </dependency>
         <dependency>
             <groupId>io.netty</groupId>
             <artifactId>netty-handler</artifactId>
-            <version>4.1.46.Final</version>
+            <version>4.1.56.Final</version>
         </dependency>
         <dependency>
             <groupId>io.projectreactor.addons</groupId>
diff --git a/server/queue/queue-activemq/pom.xml b/server/queue/queue-activemq/pom.xml
index a3c1a0c..8cd7f4b 100644
--- a/server/queue/queue-activemq/pom.xml
+++ b/server/queue/queue-activemq/pom.xml
@@ -38,7 +38,7 @@
             <?SORTPOM IGNORE?>
             <groupId>io.netty</groupId>
             <artifactId>netty-transport</artifactId>
-            <version>4.1.48.Final</version>
+            <version>4.1.56.Final</version>
             <?SORTPOM RESUME?>
         </dependency>
         <dependency>


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org


[james-project] 07/13: JAMES-3494 Remove RetryWithAsyncCallback

Posted by rc...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 5d7afcb8debc4f1d00b444be0cab05a647f0b824
Author: Rémi Kowalski <rk...@linagora.com>
AuthorDate: Mon Jan 25 15:07:55 2021 +0100

    JAMES-3494 Remove RetryWithAsyncCallback
    
    Now that Reactor is updated, RetryBackoffSpec provides doBeforeRetryAsync
---
 .../blob/objectstorage/aws/S3BlobStoreDAO.java     |  25 +-
 .../java/reactor/retry/RetryWithAsyncCallback.java | 270 --------------
 .../test/java/reactor/retry/RetryTestUtils.java    | 122 -------
 .../reactor/retry/RetryWithAsyncCallbackTest.java  | 391 ---------------------
 4 files changed, 15 insertions(+), 793 deletions(-)

diff --git a/server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreDAO.java b/server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreDAO.java
index fa00d0c..37f2810 100644
--- a/server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreDAO.java
+++ b/server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreDAO.java
@@ -55,8 +55,7 @@ import reactor.core.publisher.Mono;
 import reactor.core.scheduler.Schedulers;
 import reactor.pool.InstrumentedPool;
 import reactor.pool.PoolBuilder;
-import reactor.retry.Retry;
-import reactor.retry.RetryWithAsyncCallback;
+import reactor.util.retry.RetryBackoffSpec;
 import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
 import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
 import software.amazon.awssdk.core.BytesWrapper;
@@ -250,14 +249,20 @@ public class S3BlobStoreDAO implements BlobStoreDAO, Startable, Closeable {
             .then();
     }
 
-    private Retry<Object> createBucketOnRetry(BucketName bucketName) {
-        return RetryWithAsyncCallback.onlyIf(retryContext -> retryContext.exception() instanceof NoSuchBucketException)
-            .exponentialBackoff(FIRST_BACK_OFF, FOREVER)
-            .withBackoffScheduler(Schedulers.elastic())
-            .retryMax(MAX_RETRIES)
-            .onRetryWithMono(retryContext -> clientPool.withPoolable(client -> Mono
-                .fromFuture(client.createBucket(builder -> builder.bucket(bucketName.asString())))
-                .onErrorResume(BucketAlreadyOwnedByYouException.class, e -> Mono.empty())).next());
+    private RetryBackoffSpec createBucketOnRetry(BucketName bucketName) {
+        return RetryBackoffSpec.backoff(MAX_RETRIES, FIRST_BACK_OFF)
+            .maxAttempts(MAX_RETRIES)
+            .doBeforeRetryAsync(retrySignal -> {
+                if (retrySignal.failure() instanceof NoSuchBucketException) {
+                    return clientPool.withPoolable(client -> Mono
+                        .fromFuture(client.createBucket(builder -> builder.bucket(bucketName.asString())))
+                        .onErrorResume(BucketAlreadyOwnedByYouException.class, e -> Mono.empty()))
+                        .next()
+                        .then();
+                } else {
+                    return Mono.error(retrySignal.failure());
+                }
+            });
     }
 
     @Override
diff --git a/server/container/util/src/main/java/reactor/retry/RetryWithAsyncCallback.java b/server/container/util/src/main/java/reactor/retry/RetryWithAsyncCallback.java
deleted file mode 100644
index 33220bc..0000000
--- a/server/container/util/src/main/java/reactor/retry/RetryWithAsyncCallback.java
+++ /dev/null
@@ -1,270 +0,0 @@
-/****************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one   *
- * or more contributor license agreements.  See the NOTICE file *
- * distributed with this work for additional information        *
- * regarding copyright ownership.  The ASF licenses this file   *
- * to you under the Apache License, Version 2.0 (the            *
- * "License"); you may not use this file except in compliance   *
- * with the License.  You may obtain a copy of the License at   *
- *                                                              *
- *   http://www.apache.org/licenses/LICENSE-2.0                 *
- *                                                              *
- * Unless required by applicable law or agreed to in writing,   *
- * software distributed under the License is distributed on an  *
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
- * KIND, either express or implied.  See the License for the    *
- * specific language governing permissions and limitations      *
- * under the License.                                           *
- ****************************************************************/
-
-package reactor.retry;
-
-import java.time.Duration;
-import java.time.Instant;
-import java.util.function.Consumer;
-import java.util.function.Function;
-import java.util.function.Predicate;
-
-import org.reactivestreams.Publisher;
-
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
-import reactor.core.scheduler.Scheduler;
-import reactor.util.Logger;
-import reactor.util.Loggers;
-
-/**
- * This class is a copy of reactor.retry.DefaultRetry.
- * Its goal is to provide a way to execute an async action before retrying.
- * To do so it provides a retryWithMono method which is the async equivalent of the synchronous method doOnRetry.
- *
- * This is a temporary solution as this new requirement has been exposed in an issue in the reactor project.
- * see : https://github.com/reactor/reactor-addons/issues/220
- *
- */
-public class RetryWithAsyncCallback<T> extends AbstractRetry<T, Throwable> implements Retry<T> {
-
-    static final Logger log = Loggers.getLogger(RetryWithAsyncCallback.class);
-    static final Consumer<? super RetryContext<?>> NOOP_ON_RETRY = r -> { };
-    static final Function<? super RetryContext<?>, Mono<?>> NOOP_ON_RETRY_MONO = r -> Mono.empty();
-
-    /**
-     * Returns a retry function that retries any exception, once.
-     * More constraints may be added using {@link #retryMax(long)} or {@link #timeout(Duration)}.
-     *
-     * @return retry function that retries on any exception
-     */
-    public static <T> RetryWithAsyncCallback<T> any() {
-        return RetryWithAsyncCallback.<T>create(context -> true);
-    }
-
-    /**
-     * Returns a retry function that retries errors resulting from any of the
-     * specified exceptions, once.
-     * More constraints may be added using {@link #retryMax(long)}
-     * or {@link #timeout(Duration)}.
-     *
-     * @param retriableExceptions Exceptions that may be retried
-     * @return retry function that retries indefinitely, only for specified exceptions
-     */
-    @SafeVarargs
-    public static <T> RetryWithAsyncCallback<T> anyOf(Class<? extends Throwable>... retriableExceptions) {
-        Predicate<? super RetryContext<T>> predicate = context -> {
-            Throwable exception = context.exception();
-            if (exception == null) {
-                return true;
-            }
-            for (Class<? extends Throwable> clazz : retriableExceptions) {
-                if (clazz.isInstance(exception)) {
-                    return true;
-                }
-            }
-            return false;
-        };
-        return RetryWithAsyncCallback.<T>create(predicate);
-    }
-
-    /**
-     * Returns a retry function that retries errors resulting from all exceptions except
-     * the specified non-retriable exceptions, once.
-     * More constraints may be added using
-     * {@link #retryMax(long)} or {@link #timeout(Duration)}.
-     *
-     * @param nonRetriableExceptions exceptions that may not be retried
-     * @return retry function that retries all exceptions except the specified non-retriable exceptions.
-     */
-    @SafeVarargs
-    public static <T> RetryWithAsyncCallback<T> allBut(final Class<? extends Throwable>... nonRetriableExceptions) {
-        Predicate<? super RetryContext<T>> predicate = context -> {
-            Throwable exception = context.exception();
-            if (exception == null) {
-                return true;
-            }
-            for (Class<? extends Throwable> clazz : nonRetriableExceptions) {
-                if (clazz.isInstance(exception)) {
-                    return false;
-                }
-            }
-            return true;
-        };
-        return RetryWithAsyncCallback.<T>create(predicate);
-    }
-
-    /**
-     * Retry function that retries only if the predicate returns true, with no limit to
-     * the number of attempts.
-     * @param predicate Predicate that determines if next retry is performed
-     * @return Retry function with predicate
-     */
-    public static <T> RetryWithAsyncCallback<T> onlyIf(Predicate<? super RetryContext<T>> predicate) {
-        return RetryWithAsyncCallback.create(predicate).retryMax(Long.MAX_VALUE);
-    }
-
-    public static <T> RetryWithAsyncCallback<T> create(Predicate<? super RetryContext<T>> retryPredicate) {
-        return new RetryWithAsyncCallback<T>(retryPredicate,
-            Long.MAX_VALUE,
-            null,
-            Backoff.zero(),
-            Jitter.noJitter(),
-            null,
-            NOOP_ON_RETRY,
-            NOOP_ON_RETRY_MONO,
-            (T) null);
-    }
-
-    final Predicate<? super RetryContext<T>> retryPredicate;
-    final Consumer<? super RetryContext<T>> onRetry;
-    final Function<? super RetryContext<T>, Mono<?>> onRetryMono;
-
-    RetryWithAsyncCallback(Predicate<? super RetryContext<T>> retryPredicate,
-                           long maxIterations,
-                           Duration timeout,
-                           Backoff backoff,
-                           Jitter jitter,
-                           Scheduler backoffScheduler,
-                           final Consumer<? super RetryContext<T>> onRetry,
-                           Function<? super RetryContext<T>, Mono<?>> onRetryMono,
-                           T applicationContext) {
-        super(maxIterations, timeout, backoff, jitter, backoffScheduler, applicationContext);
-        this.retryPredicate = retryPredicate;
-        this.onRetry = onRetry;
-        this.onRetryMono = onRetryMono;
-    }
-
-    @Override
-    public RetryWithAsyncCallback<T> fixedBackoff(Duration backoffInterval) {
-        return backoff(Backoff.fixed(backoffInterval));
-    }
-
-    @Override
-    public RetryWithAsyncCallback<T> noBackoff() {
-        return backoff(Backoff.zero());
-    }
-
-    @Override
-    public RetryWithAsyncCallback<T> exponentialBackoff(Duration firstBackoff, Duration maxBackoff) {
-        return backoff(Backoff.exponential(firstBackoff, maxBackoff, 2, false));
-    }
-
-    @Override
-    public RetryWithAsyncCallback<T> exponentialBackoffWithJitter(Duration firstBackoff, Duration maxBackoff) {
-        return backoff(Backoff.exponential(firstBackoff, maxBackoff, 2, false)).jitter(Jitter.random());
-    }
-
-    @Override
-    public RetryWithAsyncCallback<T> randomBackoff(Duration firstBackoff, Duration maxBackoff) {
-        return backoff(Backoff.exponential(firstBackoff, maxBackoff, 3, true)).jitter(Jitter.random());
-    }
-
-    @Override
-    public RetryWithAsyncCallback<T> withApplicationContext(T applicationContext) {
-        return new RetryWithAsyncCallback<>(retryPredicate, maxIterations, timeout,
-            backoff, jitter, backoffScheduler, onRetry, onRetryMono, applicationContext);
-    }
-
-    @Override
-    public RetryWithAsyncCallback<T> doOnRetry(Consumer<? super RetryContext<T>> onRetry) {
-        return new RetryWithAsyncCallback<>(retryPredicate, maxIterations, timeout,
-            backoff, jitter, backoffScheduler, onRetry, onRetryMono, applicationContext);
-    }
-
-    public RetryWithAsyncCallback<T> onRetryWithMono(Function<? super RetryContext<T>, Mono<?>> onRetryMono) {
-        return new RetryWithAsyncCallback<>(retryPredicate, maxIterations, timeout,
-            backoff, jitter, backoffScheduler, onRetry, onRetryMono, applicationContext);
-    }
-
-    @Override
-    public RetryWithAsyncCallback<T> retryOnce() {
-        return retryMax(1);
-    }
-
-    @Override
-    public RetryWithAsyncCallback<T> retryMax(long maxIterations) {
-        if (maxIterations < 0) {
-            throw new IllegalArgumentException("maxIterations should be >= 0");
-        }
-        return new RetryWithAsyncCallback<>(retryPredicate, maxIterations, timeout,
-            backoff, jitter, backoffScheduler, onRetry, onRetryMono, applicationContext);
-    }
-
-    @Override
-    public RetryWithAsyncCallback<T> timeout(Duration timeout) {
-        if (timeout.isNegative()) {
-            throw new IllegalArgumentException("timeout should be >= 0");
-        }
-        return new RetryWithAsyncCallback<>(retryPredicate, maxIterations, timeout,
-            backoff, jitter, backoffScheduler, onRetry, onRetryMono, applicationContext);
-    }
-
-    @Override
-    public RetryWithAsyncCallback<T> backoff(Backoff backoff) {
-        return new RetryWithAsyncCallback<>(retryPredicate, maxIterations, timeout,
-            backoff, jitter, backoffScheduler, onRetry, onRetryMono, applicationContext);
-    }
-
-    @Override
-    public RetryWithAsyncCallback<T> jitter(Jitter jitter) {
-        return new RetryWithAsyncCallback<>(retryPredicate, maxIterations, timeout,
-            backoff, jitter, backoffScheduler, onRetry, onRetryMono, applicationContext);
-    }
-
-    @Override
-    public RetryWithAsyncCallback<T> withBackoffScheduler(Scheduler scheduler) {
-        return new RetryWithAsyncCallback<>(retryPredicate, maxIterations, timeout,
-            backoff, jitter, scheduler, onRetry, onRetryMono, applicationContext);
-    }
-
-    @Override
-    public Publisher<Long> apply(Flux<Throwable> errors) {
-        Instant timeoutInstant = calculateTimeout();
-        DefaultContext<T> context = new DefaultContext<>(applicationContext, 0L, null, null);
-        return errors.index()
-            .concatMap(tuple -> retry(tuple.getT2(), tuple.getT1() + 1L, timeoutInstant, context));
-    }
-
-    Publisher<Long> retry(Throwable e, long iteration, Instant timeoutInstant, DefaultContext<T> context) {
-        DefaultContext<T> tmpContext = new DefaultContext<>(applicationContext, iteration, context.lastBackoff, e);
-        BackoffDelay nextBackoff = calculateBackoff(tmpContext, timeoutInstant);
-        DefaultContext<T> retryContext = new DefaultContext<T>(applicationContext, iteration, nextBackoff, e);
-        context.lastBackoff = nextBackoff;
-
-        if (!retryPredicate.test(retryContext)) {
-            log.debug("Stopping retries since predicate returned false, retry context: {}", retryContext);
-            return Mono.error(e);
-        } else if (nextBackoff == RETRY_EXHAUSTED) {
-            log.debug("Retries exhausted, retry context: {}", retryContext);
-            return Mono.error(new RetryExhaustedException(e));
-        } else {
-            log.debug("Scheduling retry attempt, retry context: {}", retryContext);
-            onRetry.accept(retryContext);
-            return onRetryMono.apply(retryContext)
-                .then(Mono.from(retryMono(nextBackoff.delay())));
-        }
-    }
-
-    @Override
-    public String toString() {
-        return "Retry{max=" + this.maxIterations + ",backoff=" + backoff + ",jitter=" +
-            jitter + "}";
-    }
-}
diff --git a/server/container/util/src/test/java/reactor/retry/RetryTestUtils.java b/server/container/util/src/test/java/reactor/retry/RetryTestUtils.java
deleted file mode 100644
index 075dcd9..0000000
--- a/server/container/util/src/test/java/reactor/retry/RetryTestUtils.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/****************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one   *
- * or more contributor license agreements.  See the NOTICE file *
- * distributed with this work for additional information        *
- * regarding copyright ownership.  The ASF licenses this file   *
- * to you under the Apache License, Version 2.0 (the            *
- * "License"); you may not use this file except in compliance   *
- * with the License.  You may obtain a copy of the License at   *
- *                                                              *
- *   http://www.apache.org/licenses/LICENSE-2.0                 *
- *                                                              *
- * Unless required by applicable law or agreed to in writing,   *
- * software distributed under the License is distributed on an  *
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
- * KIND, either express or implied.  See the License for the    *
- * specific language governing permissions and limitations      *
- * under the License.                                           *
- ****************************************************************/
-
-package reactor.retry;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-
-import java.time.Duration;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Queue;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.function.Consumer;
-import java.util.function.Function;
-
-import org.reactivestreams.Publisher;
-
-import reactor.core.publisher.Flux;
-
-public class RetryTestUtils {
-
-	static void assertDelays(Queue<? extends IterationContext<?>> retries, Long... delayMs) {
-		assertEquals(delayMs.length, retries.size());
-		int index = 0;
-		for (Iterator<? extends IterationContext<?>> it = retries.iterator(); it.hasNext(); ) {
-			IterationContext<?> repeatContext = it.next();
-			assertEquals(delayMs[index].longValue(), repeatContext.backoff().toMillis());
-			index++;
-		}
-	}
-
-	static void assertRandomDelays(Queue<? extends IterationContext<?>> retries, int firstMs, int maxMs) {
-		long prevMs = 0;
-		int randomValues = 0;
-		for (IterationContext<?> context : retries) {
-			long backoffMs = context.backoff().toMillis();
-			assertTrue("Unexpected delay " + backoffMs, backoffMs >= firstMs && backoffMs <= maxMs);
-			if (backoffMs != firstMs && backoffMs != prevMs)
-				randomValues++;
-			prevMs = backoffMs;
-		}
-		assertTrue("Delays not random", randomValues >= 2); // Allow for at most one edge case.
-	}
-
-	static <T> void testReuseInParallel(int threads, int iterations,
-			Function<Backoff, Function<Flux<T>, Publisher<Long>>> retryOrRepeat,
-			Consumer<Function<Flux<T>, Publisher<Long>>> testTask) throws Exception {
-		int repeatCount = iterations - 1;
-		AtomicInteger nextBackoff = new AtomicInteger();
-		// Keep track of the number of backoff invocations per instance
-		ConcurrentHashMap<Long, Integer> backoffCounts = new ConcurrentHashMap<>();
-		// Use a countdown latch to get all instances to stop in the first backoff callback
-		CountDownLatch latch = new CountDownLatch(threads);
-		Backoff customBackoff = context -> {
-			Duration backoff = context.backoff();
-			if (latch.getCount() > 0) {
-				assertNull("Wrong context, backoff must be null", backoff);
-				backoff = Duration.ofMillis(nextBackoff.incrementAndGet());
-				backoffCounts.put(backoff.toMillis(), 1);
-				latch.countDown();
-				try {
-					latch.await(10, TimeUnit.SECONDS);
-				}
-				catch (Exception e) {
-					// ignore, errors are handled later
-				}
-			} else {
-				assertNotNull("Wrong context, backoff must not be null", backoff);
-				long index = backoff.toMillis();
-				backoffCounts.put(index, backoffCounts.get(index) + 1);
-			}
-			return new BackoffDelay(backoff);
-		};
-		Function<Flux<T>, Publisher<Long>> retryFunc = retryOrRepeat.apply(customBackoff);
-		ExecutorService executor = Executors.newFixedThreadPool(threads);
-		List<Future<?>> futures = new ArrayList<>();
-		try {
-			for (int i = 0; i < threads; i++) {
-				Runnable runnable = () -> testTask.accept(retryFunc);
-				futures.add(executor.submit(runnable));
-			}
-			for (Future<?> future : futures)
-				future.get(5, TimeUnit.SECONDS);
-		}
-		finally {
-			executor.shutdownNow();
-		}
-
-		assertEquals(0, latch.getCount());
-		assertEquals(threads, backoffCounts.size());
-		for (Integer count : backoffCounts.values()) {
-			//backoff not invoked anymore when maxIteration reached
-			assertEquals(repeatCount, count.intValue());
-		}
-	}
-}
diff --git a/server/container/util/src/test/java/reactor/retry/RetryWithAsyncCallbackTest.java b/server/container/util/src/test/java/reactor/retry/RetryWithAsyncCallbackTest.java
deleted file mode 100644
index 2f2aad5..0000000
--- a/server/container/util/src/test/java/reactor/retry/RetryWithAsyncCallbackTest.java
+++ /dev/null
@@ -1,391 +0,0 @@
-/****************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one   *
- * or more contributor license agreements.  See the NOTICE file *
- * distributed with this work for additional information        *
- * regarding copyright ownership.  The ASF licenses this file   *
- * to you under the Apache License, Version 2.0 (the            *
- * "License"); you may not use this file except in compliance   *
- * with the License.  You may obtain a copy of the License at   *
- *                                                              *
- *   http://www.apache.org/licenses/LICENSE-2.0                 *
- *                                                              *
- * Unless required by applicable law or agreed to in writing,   *
- * software distributed under the License is distributed on an  *
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
- * KIND, either express or implied.  See the License for the    *
- * specific language governing permissions and limitations      *
- * under the License.                                           *
- ****************************************************************/
-
-package reactor.retry;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-
-import java.io.IOException;
-import java.net.SocketException;
-import java.time.Duration;
-import java.util.Iterator;
-import java.util.Queue;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.Semaphore;
-import java.util.function.Consumer;
-
-import org.junit.Test;
-
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
-import reactor.core.scheduler.Schedulers;
-import reactor.test.StepVerifier;
-
-public class RetryWithAsyncCallbackTest {
-
-    private Queue<RetryContext<?>> retries = new ConcurrentLinkedQueue<>();
-
-    @Test
-    public void shouldTimeoutRetryWithVirtualTime() {
-        // given
-        final int minBackoff = 1;
-        final int maxBackoff = 5;
-        final int timeout = 10;
-
-        // then
-        StepVerifier.withVirtualTime(() ->
-            Mono.<String>error(new RuntimeException("Something went wrong"))
-                .retryWhen(RetryWithAsyncCallback.anyOf(Exception.class)
-                    .exponentialBackoffWithJitter(Duration.ofSeconds(minBackoff), Duration.ofSeconds(maxBackoff))
-                    .timeout(Duration.ofSeconds(timeout)))
-                .subscribeOn(Schedulers.elastic()))
-            .expectSubscription()
-//				.expectNoEvent(Duration.ofSeconds(timeout))
-            .thenAwait(Duration.ofSeconds(timeout))
-            .expectError(RetryExhaustedException.class)
-            .verify(Duration.ofSeconds(timeout));
-    }
-
-    @Test
-    public void fluxRetryNoBackoff() {
-        Flux<Integer> flux = Flux.concat(Flux.range(0, 2), Flux.error(new IOException()))
-            .retryWhen(RetryWithAsyncCallback.any().noBackoff().retryMax(2).doOnRetry(onRetry()));
-
-        StepVerifier.create(flux)
-            .expectNext(0, 1, 0, 1, 0, 1)
-            .verifyError(RetryExhaustedException.class);
-        assertRetries(IOException.class, IOException.class);
-        RetryTestUtils.assertDelays(retries, 0L, 0L);
-    }
-
-    @Test
-    public void monoRetryNoBackoff() {
-        Mono<?> mono = Mono.error(new IOException())
-            .retryWhen(RetryWithAsyncCallback.any().noBackoff().retryMax(2).doOnRetry(onRetry()));
-
-        StepVerifier.create(mono)
-            .verifyError(RetryExhaustedException.class);
-        assertRetries(IOException.class, IOException.class);
-        RetryTestUtils.assertDelays(retries, 0L, 0L);
-    }
-
-    @Test
-    public void fluxRetryFixedBackoff() {
-        Flux<Integer> flux = Flux.concat(Flux.range(0, 2), Flux.error(new IOException()))
-            .retryWhen(RetryWithAsyncCallback.any().fixedBackoff(Duration.ofMillis(500)).retryOnce().doOnRetry(onRetry()));
-
-        StepVerifier.withVirtualTime(() -> flux)
-            .expectNext(0, 1)
-            .expectNoEvent(Duration.ofMillis(300))
-            .thenAwait(Duration.ofMillis(300))
-            .expectNext(0, 1)
-            .verifyError(RetryExhaustedException.class);
-        assertRetries(IOException.class);
-        RetryTestUtils.assertDelays(retries, 500L);
-    }
-
-    @Test
-    public void monoRetryFixedBackoff() {
-        Mono<?> mono = Mono.error(new IOException())
-            .retryWhen(RetryWithAsyncCallback.any().fixedBackoff(Duration.ofMillis(500)).retryOnce().doOnRetry(onRetry()));
-
-        StepVerifier.withVirtualTime(() -> mono)
-            .expectSubscription()
-            .expectNoEvent(Duration.ofMillis(300))
-            .thenAwait(Duration.ofMillis(300))
-            .verifyError(RetryExhaustedException.class);
-
-        assertRetries(IOException.class);
-        RetryTestUtils.assertDelays(retries, 500L);
-    }
-
-
-    @Test
-    public void fluxRetryExponentialBackoff() {
-        Flux<Integer> flux = Flux.concat(Flux.range(0, 2), Flux.error(new IOException()))
-            .retryWhen(RetryWithAsyncCallback.any()
-                .exponentialBackoff(Duration.ofMillis(100), Duration.ofMillis(500))
-                .timeout(Duration.ofMillis(1500))
-                .doOnRetry(onRetry()));
-
-        StepVerifier.create(flux)
-            .expectNext(0, 1)
-            .expectNoEvent(Duration.ofMillis(50))  // delay=100
-            .expectNext(0, 1)
-            .expectNoEvent(Duration.ofMillis(150)) // delay=200
-            .expectNext(0, 1)
-            .expectNoEvent(Duration.ofMillis(250)) // delay=400
-            .expectNext(0, 1)
-            .expectNoEvent(Duration.ofMillis(450)) // delay=500
-            .expectNext(0, 1)
-            .verifyErrorMatches(e -> isRetryExhausted(e, IOException.class));
-
-        assertRetries(IOException.class, IOException.class, IOException.class, IOException.class);
-        RetryTestUtils.assertDelays(retries, 100L, 200L, 400L, 500L);
-    }
-    @Test
-    public void monoRetryExponentialBackoff() {
-        Mono<?> mono = Mono.error(new IOException())
-            .retryWhen(RetryWithAsyncCallback.any()
-                .exponentialBackoff(Duration.ofMillis(100), Duration.ofMillis(500))
-                .retryMax(4)
-                .doOnRetry(onRetry()));
-
-        StepVerifier.withVirtualTime(() -> mono)
-            .expectSubscription()
-            .thenAwait(Duration.ofMillis(100))
-            .thenAwait(Duration.ofMillis(200))
-            .thenAwait(Duration.ofMillis(400))
-            .thenAwait(Duration.ofMillis(500))
-            .verifyError(RetryExhaustedException.class);
-
-        assertRetries(IOException.class, IOException.class, IOException.class, IOException.class);
-        RetryTestUtils.assertDelays(retries, 100L, 200L, 400L, 500L);
-    }
-
-    @Test
-    public void fluxRetryRandomBackoff() {
-        Flux<Integer> flux = Flux.concat(Flux.range(0, 2), Flux.error(new IOException()))
-            .retryWhen(RetryWithAsyncCallback.any()
-                .randomBackoff(Duration.ofMillis(100), Duration.ofMillis(2000))
-                .retryMax(4)
-                .doOnRetry(onRetry()));
-
-        StepVerifier.create(flux)
-            .expectNext(0, 1, 0, 1, 0, 1, 0, 1, 0, 1)
-            .verifyErrorMatches(e -> isRetryExhausted(e, IOException.class));
-
-        assertRetries(IOException.class, IOException.class, IOException.class, IOException.class);
-        RetryTestUtils.assertRandomDelays(retries, 100, 2000);
-    }
-
-    @Test
-    public void monoRetryRandomBackoff() {
-        Mono<?> mono = Mono.error(new IOException())
-            .retryWhen(RetryWithAsyncCallback.any()
-                .randomBackoff(Duration.ofMillis(100), Duration.ofMillis(2000))
-                .retryMax(4)
-                .doOnRetry(onRetry()));
-
-        StepVerifier.withVirtualTime(() -> mono)
-            .expectSubscription()
-            .thenAwait(Duration.ofMillis(100))
-            .thenAwait(Duration.ofMillis(2000))
-            .thenAwait(Duration.ofMillis(2000))
-            .thenAwait(Duration.ofMillis(2000))
-            .verifyError(RetryExhaustedException.class);
-
-        assertRetries(IOException.class, IOException.class, IOException.class, IOException.class);
-        RetryTestUtils.assertRandomDelays(retries, 100, 2000);
-    }
-
-
-    @Test
-    public void fluxRetriableExceptions() {
-        Flux<Integer> flux = Flux.concat(Flux.range(0, 2), Flux.error(new SocketException()))
-            .retryWhen(RetryWithAsyncCallback.anyOf(IOException.class).retryOnce().doOnRetry(onRetry()));
-
-        StepVerifier.create(flux)
-            .expectNext(0, 1, 0, 1)
-            .verifyErrorMatches(e -> isRetryExhausted(e, SocketException.class));
-
-        Flux<Integer> nonRetriable = Flux.concat(Flux.range(0, 2), Flux.error(new RuntimeException()))
-            .retryWhen(RetryWithAsyncCallback.anyOf(IOException.class).retryOnce().doOnRetry(onRetry()));
-        StepVerifier.create(nonRetriable)
-            .expectNext(0, 1)
-            .verifyError(RuntimeException.class);
-
-    }
-
-    @Test
-    public void fluxNonRetriableExceptions() {
-
-        Retry<?> retry = RetryWithAsyncCallback.allBut(RuntimeException.class).retryOnce().doOnRetry(onRetry());
-        Flux<Integer> flux = Flux.concat(Flux.range(0, 2), Flux.error(new IllegalStateException())).retryWhen(retry);
-
-        StepVerifier.create(flux)
-            .expectNext(0, 1)
-            .verifyError(IllegalStateException.class);
-
-
-        Flux<Integer> retriable = Flux.concat(Flux.range(0, 2), Flux.error(new SocketException())).retryWhen(retry);
-        StepVerifier.create(retriable)
-            .expectNext(0, 1, 0, 1)
-            .verifyErrorMatches(e -> isRetryExhausted(e, SocketException.class));
-    }
-
-    @Test
-    public void fluxRetryAnyException() {
-        Retry<?> retry = RetryWithAsyncCallback.any().retryOnce().doOnRetry(onRetry());
-
-        Flux<Integer> flux = Flux.concat(Flux.range(0, 2), Flux.error(new SocketException())).retryWhen(retry);
-        StepVerifier.create(flux)
-            .expectNext(0, 1, 0, 1)
-            .verifyErrorMatches(e -> isRetryExhausted(e, SocketException.class));
-
-        Flux<Integer> flux2 = Flux.concat(Flux.range(0, 2), Flux.error(new RuntimeException())).retryWhen(retry);
-        StepVerifier.create(flux2)
-            .expectNext(0, 1, 0, 1)
-            .verifyErrorMatches(e -> isRetryExhausted(e, RuntimeException.class));
-
-    }
-
-    @Test
-    public void fluxRetryOnPredicate() {
-        Flux<Integer> flux = Flux.concat(Flux.range(0, 2), Flux.error(new SocketException()))
-            .retryWhen(RetryWithAsyncCallback.onlyIf(context -> context.iteration() < 3).doOnRetry(onRetry()));
-
-        StepVerifier.create(flux)
-            .expectNext(0, 1, 0, 1, 0, 1)
-            .verifyError(SocketException.class);
-    }
-
-
-    @Test
-    public void doOnRetry() {
-        Semaphore semaphore = new Semaphore(0);
-        Retry<?> retry = RetryWithAsyncCallback.any()
-            .retryOnce()
-            .fixedBackoff(Duration.ofMillis(500))
-            .doOnRetry(context -> semaphore.release());
-
-        StepVerifier.withVirtualTime(() -> Flux.range(0, 2).concatWith(Mono.error(new SocketException())).retryWhen(retry))
-            .expectNext(0, 1)
-            .then(semaphore::acquireUninterruptibly)
-            .expectNoEvent(Duration.ofMillis(400))
-            .thenAwait(Duration.ofMillis(200))
-            .expectNext(0, 1)
-            .verifyErrorMatches(e -> isRetryExhausted(e, SocketException.class));
-
-        StepVerifier.withVirtualTime(() -> Mono.error(new SocketException()).retryWhen(retry.noBackoff()))
-            .then(semaphore::acquireUninterruptibly)
-            .verifyErrorMatches(e -> isRetryExhausted(e, SocketException.class));
-    }
-
-    @Test
-    public void onRetryWithMono() {
-        Semaphore semaphore = new Semaphore(0);
-        Retry<?> retry = RetryWithAsyncCallback.any()
-            .retryOnce()
-            .fixedBackoff(Duration.ofMillis(500))
-            .onRetryWithMono(context -> Mono.fromCallable(() -> { semaphore.release(); return 0; }));
-
-        StepVerifier.withVirtualTime(() -> Flux.range(0, 2).concatWith(Mono.error(new SocketException())).retryWhen(retry))
-            .expectNext(0, 1)
-            .then(semaphore::acquireUninterruptibly)
-            .expectNoEvent(Duration.ofMillis(400))
-            .thenAwait(Duration.ofMillis(200))
-            .expectNext(0, 1)
-            .verifyErrorMatches(e -> isRetryExhausted(e, SocketException.class));
-
-        StepVerifier.withVirtualTime(() -> Mono.error(new SocketException()).retryWhen(retry.noBackoff()))
-            .then(semaphore::acquireUninterruptibly)
-            .verifyErrorMatches(e -> isRetryExhausted(e, SocketException.class));
-    }
-
-    @Test
-    public void retryApplicationContext() {
-        class AppContext {
-            boolean needsRollback;
-            void rollback() {
-                needsRollback = false;
-            }
-            void run() {
-                assertFalse("Rollback not performed", needsRollback);
-                needsRollback = true;
-            }
-        }
-        AppContext appContext = new AppContext();
-        Retry<?> retry = RetryWithAsyncCallback.<AppContext>any().withApplicationContext(appContext)
-            .retryMax(2)
-            .doOnRetry(context -> {
-                AppContext ac = context.applicationContext();
-                assertNotNull("Application context not propagated", ac);
-                ac.rollback();
-            });
-
-        StepVerifier.withVirtualTime(() -> Mono.error(new RuntimeException()).doOnNext(i -> appContext.run()).retryWhen(retry))
-            .verifyErrorMatches(e -> isRetryExhausted(e, RuntimeException.class));
-
-    }
-
-    @Test
-    public void fluxRetryCompose() {
-        Retry<?> retry = RetryWithAsyncCallback.any().noBackoff().retryMax(2).doOnRetry(this.onRetry());
-        Flux<Integer> flux = Flux.concat(Flux.range(0, 2), Flux.error(new IOException())).as(retry::apply);
-
-        StepVerifier.create(flux)
-            .expectNext(0, 1, 0, 1, 0, 1)
-            .verifyError(RetryExhaustedException.class);
-        assertRetries(IOException.class, IOException.class);
-    }
-
-    @Test
-    public void monoRetryCompose() {
-        Retry<?> retry = RetryWithAsyncCallback.any().noBackoff().retryMax(2).doOnRetry(this.onRetry());
-        Flux<?> flux = Mono.error(new IOException()).as(retry::apply);
-
-        StepVerifier.create(flux)
-            .verifyError(RetryExhaustedException.class);
-        assertRetries(IOException.class, IOException.class);
-    }
-
-    @Test
-    public void functionReuseInParallel() throws Exception {
-        int retryCount = 19;
-        int range = 100;
-        Integer[] values = new Integer[(retryCount + 1) * range];
-        for (int i = 0; i <= retryCount; i++) {
-            for (int j = 1; j <= range; j++)
-                values[i * range + j - 1] = j;
-        }
-        RetryTestUtils.testReuseInParallel(2, 20,
-            backoff -> RetryWithAsyncCallback.<Integer>any().retryMax(19).backoff(backoff),
-            retryFunc -> StepVerifier.create(Flux.range(1, range).concatWith(Mono.error(new SocketException())).retryWhen(retryFunc))
-                .expectNext(values)
-                .verifyErrorMatches(e -> isRetryExhausted(e, SocketException.class)));
-    }
-
-    Consumer<? super RetryContext<?>> onRetry() {
-        return context -> retries.add(context);
-    }
-
-    @SafeVarargs
-    private final void assertRetries(Class<? extends Throwable>... exceptions) {
-        assertEquals(exceptions.length, retries.size());
-        int index = 0;
-        for (Iterator<RetryContext<?>> it = retries.iterator(); it.hasNext(); ) {
-            RetryContext<?> retryContext = it.next();
-            assertEquals(index + 1, retryContext.iteration());
-            assertEquals(exceptions[index], retryContext.exception().getClass());
-            index++;
-        }
-    }
-
-    static boolean isRetryExhausted(Throwable e, Class<? extends Throwable> cause) {
-        return e instanceof RetryExhaustedException && cause.isInstance(e.getCause());
-    }
-
-    @Test
-    public void retryToString() {
-        System.out.println(RetryWithAsyncCallback.any().noBackoff().retryMax(2).toString());
-    }
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org


[james-project] 13/13: JAMES-3494 fix flaky tests

Posted by rc...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit d1c14f5344746c20b16baaefd3b049c44d4e7f0d
Author: Rémi Kowalski <rk...@linagora.com>
AuthorDate: Wed Jan 27 10:16:32 2021 +0100

    JAMES-3494 fix flaky tests
---
 .../james/jmap/draft/methods/integration/SetMessagesMethodTest.java   | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/server/protocols/jmap-draft-integration-testing/jmap-draft-integration-testing-common/src/test/java/org/apache/james/jmap/draft/methods/integration/SetMessagesMethodTest.java b/server/protocols/jmap-draft-integration-testing/jmap-draft-integration-testing-common/src/test/java/org/apache/james/jmap/draft/methods/integration/SetMessagesMethodTest.java
index e898c0d..0c95481 100644
--- a/server/protocols/jmap-draft-integration-testing/jmap-draft-integration-testing-common/src/test/java/org/apache/james/jmap/draft/methods/integration/SetMessagesMethodTest.java
+++ b/server/protocols/jmap-draft-integration-testing/jmap-draft-integration-testing-common/src/test/java/org/apache/james/jmap/draft/methods/integration/SetMessagesMethodTest.java
@@ -356,7 +356,7 @@ public abstract class SetMessagesMethodTest {
             .body(NAME, equalTo("messagesSet"))
             .body(ARGUMENTS + ".destroyed", hasSize(2))
             .body(ARGUMENTS + ".notDestroyed", aMapWithSize(1))
-            .body(ARGUMENTS + ".destroyed", contains(message1.getMessageId().serialize(), message3.getMessageId().serialize()))
+            .body(ARGUMENTS + ".destroyed", containsInAnyOrder(message1.getMessageId().serialize(), message3.getMessageId().serialize()))
             .body(ARGUMENTS + ".notDestroyed", hasEntry(equalTo(missingMessageId), Matchers.allOf(
                 hasEntry("type", "notFound"),
                 hasEntry("description", "The message " + missingMessageId + " can't be found")))
@@ -4050,7 +4050,7 @@ public abstract class SetMessagesMethodTest {
             .body(ARGUMENTS + ".notCreated", hasKey(messageCreationId))
             .body(notCreatedPath + ".type", equalTo("invalidProperties"))
             .body(notCreatedPath + ".properties", contains("attachments"))
-            .body(notCreatedPath + ".attachmentsNotFound", contains("brokenId1", "brokenId2"))
+            .body(notCreatedPath + ".attachmentsNotFound", containsInAnyOrder("brokenId1", "brokenId2"))
             .body(ARGUMENTS + ".created", aMapWithSize(0));
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org


[james-project] 10/13: JAMES-3494 remove rabbitmq dependencies from cassandra-guice module

Posted by rc...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 1b0f3bf89a8fa0f690cc93639cd09e621fc6556a
Author: Rémi Kowalski <rk...@linagora.com>
AuthorDate: Tue Jan 26 15:52:23 2021 +0100

    JAMES-3494 remove rabbitmq dependencies from cassandra-guice module
---
 server/container/guice/cassandra-guice/pom.xml                        | 4 ----
 .../guice/cassandra-guice/src/test/resources/rabbitmq.properties      | 2 --
 server/container/guice/cassandra-rabbitmq-guice/pom.xml               | 4 ++++
 3 files changed, 4 insertions(+), 6 deletions(-)

diff --git a/server/container/guice/cassandra-guice/pom.xml b/server/container/guice/cassandra-guice/pom.xml
index b0f1fa2..28ee945 100644
--- a/server/container/guice/cassandra-guice/pom.xml
+++ b/server/container/guice/cassandra-guice/pom.xml
@@ -296,10 +296,6 @@
         </dependency>
         <dependency>
             <groupId>${james.groupId}</groupId>
-            <artifactId>queue-rabbitmq-guice</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>${james.groupId}</groupId>
             <artifactId>quota-mailing-cassandra</artifactId>
         </dependency>
         <dependency>
diff --git a/server/container/guice/cassandra-guice/src/test/resources/rabbitmq.properties b/server/container/guice/cassandra-guice/src/test/resources/rabbitmq.properties
deleted file mode 100644
index 25d0dd6..0000000
--- a/server/container/guice/cassandra-guice/src/test/resources/rabbitmq.properties
+++ /dev/null
@@ -1,2 +0,0 @@
-uri=amqp://james:james@rabbitmq_host:5672
-management.uri=http://james:james@rabbitmq_host:15672/api/
\ No newline at end of file
diff --git a/server/container/guice/cassandra-rabbitmq-guice/pom.xml b/server/container/guice/cassandra-rabbitmq-guice/pom.xml
index fa427f1..a75b05e 100644
--- a/server/container/guice/cassandra-rabbitmq-guice/pom.xml
+++ b/server/container/guice/cassandra-rabbitmq-guice/pom.xml
@@ -165,6 +165,10 @@
         </dependency>
         <dependency>
             <groupId>${james.groupId}</groupId>
+            <artifactId>queue-rabbitmq-guice</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>${james.groupId}</groupId>
             <artifactId>testing-base</artifactId>
             <scope>test</scope>
         </dependency>


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org


[james-project] 02/13: JAMES-3495 MessageIdTable: ignore partially deleted rows

Posted by rc...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit eee94f10671580ceda427c2a48ff5d90fa271b8c
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Wed Jan 27 20:30:05 2021 +0700

    JAMES-3495 MessageIdTable: ignore partially deleted rows
---
 .../mailbox/cassandra/mail/CassandraMessageIdDAO.java | 19 +++++++++++++------
 .../cassandra/mail/CassandraMessageIdDAOTest.java     |  2 --
 2 files changed, 13 insertions(+), 8 deletions(-)

diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdDAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdDAO.java
index 6d7f6bc..567ee53 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdDAO.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdDAO.java
@@ -41,6 +41,7 @@ import static org.apache.james.mailbox.cassandra.table.Flag.SEEN;
 import static org.apache.james.mailbox.cassandra.table.Flag.USER;
 import static org.apache.james.mailbox.cassandra.table.Flag.USER_FLAGS;
 import static org.apache.james.mailbox.cassandra.table.MessageIdToImapUid.MOD_SEQ;
+import static org.apache.james.util.ReactorUtils.publishIfPresent;
 
 import java.util.Optional;
 
@@ -256,7 +257,6 @@ public class CassandraMessageIdDAO {
     private Mono<Optional<ComposedMessageIdWithMetaData>> asOptionalOfCassandraMessageId(Mono<Row> row) {
         return row
                 .map(this::fromRowToComposedMessageIdWithFlags)
-                .map(Optional::of)
                 .defaultIfEmpty(Optional.empty());
     }
 
@@ -268,7 +268,8 @@ public class CassandraMessageIdDAO {
 
     public Flux<ComposedMessageIdWithMetaData> retrieveMessages(CassandraId mailboxId, MessageRange set, Limit limit) {
         return retrieveRows(mailboxId, set, limit)
-            .map(this::fromRowToComposedMessageIdWithFlags);
+            .map(this::fromRowToComposedMessageIdWithFlags)
+            .handle(publishIfPresent());
     }
 
     public Flux<MessageUid> listUids(CassandraId mailboxId) {
@@ -279,7 +280,8 @@ public class CassandraMessageIdDAO {
 
     public Flux<ComposedMessageIdWithMetaData> retrieveAllMessages() {
         return cassandraAsyncExecutor.executeRows(listStatement.bind())
-            .map(this::fromRowToComposedMessageIdWithFlags);
+            .map(this::fromRowToComposedMessageIdWithFlags)
+            .handle(publishIfPresent());
     }
 
     private Flux<Row> retrieveRows(CassandraId mailboxId, MessageRange set, Limit limit) {
@@ -329,14 +331,19 @@ public class CassandraMessageIdDAO {
                 .setLong(IMAP_UID_LTE, to.asLong())));
     }
 
-    private ComposedMessageIdWithMetaData fromRowToComposedMessageIdWithFlags(Row row) {
-        return ComposedMessageIdWithMetaData.builder()
+    private Optional<ComposedMessageIdWithMetaData> fromRowToComposedMessageIdWithFlags(Row row) {
+        if (row.getUUID(MESSAGE_ID) == null) {
+            // Out of order updates with concurrent deletes can result in the row being partially deleted
+            // We filter out such records
+            return Optional.empty();
+        }
+        return Optional.of(ComposedMessageIdWithMetaData.builder()
                 .composedMessageId(new ComposedMessageId(
                         CassandraId.of(row.getUUID(MAILBOX_ID)),
                         messageIdFactory.of(row.getUUID(MESSAGE_ID)),
                         MessageUid.of(row.getLong(IMAP_UID))))
                 .flags(FlagsExtractor.getFlags(row))
                 .modSeq(ModSeq.of(row.getLong(MOD_SEQ)))
-                .build();
+                .build());
     }
 }
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdDAOTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdDAOTest.java
index d72d2c2..2ae675e 100644
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdDAOTest.java
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdDAOTest.java
@@ -39,7 +39,6 @@ import org.apache.james.mailbox.model.ComposedMessageIdWithMetaData;
 import org.apache.james.mailbox.model.MessageRange;
 import org.apache.james.util.streams.Limit;
 import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
 
@@ -86,7 +85,6 @@ class CassandraMessageIdDAOTest {
         assertThat(message.isPresent()).isFalse();
     }
 
-    @Disabled("A record with a 'null' messageId is returned")
     @Test
     void outOfOrderUpdatesShouldBeIgnored() {
         CassandraId mailboxId = CassandraId.timeBased();


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org


[james-project] 04/13: JAMES-3493 Add logs for start and stop of the JAMES and the JMX servers

Posted by rc...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 61850385e475fa75f948001a6ba361ed30edc859
Author: Rene Cordier <rc...@linagora.com>
AuthorDate: Wed Jan 27 10:18:54 2021 +0700

    JAMES-3493 Add logs for start and stop of the JAMES and the JMX servers
---
 .../src/main/java/org/apache/james/GuiceJamesServer.java            | 2 ++
 .../src/main/java/org/apache/james/modules/server/JMXServer.java    | 6 ++++++
 2 files changed, 8 insertions(+)

diff --git a/server/container/guice/guice-common/src/main/java/org/apache/james/GuiceJamesServer.java b/server/container/guice/guice-common/src/main/java/org/apache/james/GuiceJamesServer.java
index f149796..36ebe4a 100644
--- a/server/container/guice/guice-common/src/main/java/org/apache/james/GuiceJamesServer.java
+++ b/server/container/guice/guice-common/src/main/java/org/apache/james/GuiceJamesServer.java
@@ -93,6 +93,7 @@ public class GuiceJamesServer {
             injector.getInstance(InitializationOperations.class).initModules();
             guiceProbeProvider = injector.getInstance(GuiceProbeProvider.class);
             isStartedProbe.notifyStarted();
+            LOGGER.info("JAMES server started");
         } catch (Throwable e) {
             LOGGER.error("Fatal error while starting James", e);
             throw e;
@@ -104,6 +105,7 @@ public class GuiceJamesServer {
         if (preDestroy != null) {
             preDestroy.stage();
         }
+        LOGGER.info("JAMES server stopped");
     }
 
     public boolean isStarted() {
diff --git a/server/container/guice/jmx/src/main/java/org/apache/james/modules/server/JMXServer.java b/server/container/guice/jmx/src/main/java/org/apache/james/modules/server/JMXServer.java
index aa51ec2..e1ffb3c 100644
--- a/server/container/guice/jmx/src/main/java/org/apache/james/modules/server/JMXServer.java
+++ b/server/container/guice/jmx/src/main/java/org/apache/james/modules/server/JMXServer.java
@@ -36,11 +36,15 @@ import javax.management.remote.JMXServiceURL;
 
 import org.apache.james.lifecycle.api.Startable;
 import org.apache.james.util.RestrictingRMISocketFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.github.fge.lambdas.Throwing;
 import com.google.common.collect.ImmutableMap;
 
 public class JMXServer implements Startable {
+    private static final Logger LOGGER = LoggerFactory.getLogger(JMXServer.class);
+
     private final JmxConfiguration jmxConfiguration;
     private final Set<String> registeredKeys;
     private final Object lock;
@@ -101,6 +105,7 @@ public class JMXServer implements Startable {
                 ManagementFactory.getPlatformMBeanServer());
 
             jmxConnectorServer.start();
+            LOGGER.info("JMX server started");
         } catch (Exception e) {
             throw new RuntimeException(e);
         }
@@ -115,6 +120,7 @@ public class JMXServer implements Startable {
             restrictingRMISocketFactory.getSockets()
                 .forEach(Throwing.consumer(ServerSocket::close)
                     .sneakyThrow());
+            LOGGER.info("JMX server stopped");
         } catch (Exception e) {
             throw new RuntimeException(e);
         }


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org


[james-project] 03/13: JAMES-3495 MessageIdTable: cleanup asynchronously partially deleted entries

Posted by rc...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit c16615b512e2969a34c739c73f6214c77b243c54
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Wed Jan 27 22:11:00 2021 +0700

    JAMES-3495 MessageIdTable: cleanup asynchronously partially deleted entries
---
 .../apache/james/mailbox/cassandra/mail/CassandraMessageIdDAO.java | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)

diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdDAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdDAO.java
index 567ee53..d5ccb2c 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdDAO.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdDAO.java
@@ -68,6 +68,7 @@ import com.google.common.collect.ImmutableSet;
 
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
 
 public class CassandraMessageIdDAO {
 
@@ -334,7 +335,11 @@ public class CassandraMessageIdDAO {
     private Optional<ComposedMessageIdWithMetaData> fromRowToComposedMessageIdWithFlags(Row row) {
         if (row.getUUID(MESSAGE_ID) == null) {
             // Out of order updates with concurrent deletes can result in the row being partially deleted
-            // We filter out such records
+            // We filter out such records, and clean them up.
+            delete(CassandraId.of(row.getUUID(MAILBOX_ID)),
+                MessageUid.of(row.getLong(IMAP_UID)))
+                .subscribeOn(Schedulers.elastic())
+                .subscribe();
             return Optional.empty();
         }
         return Optional.of(ComposedMessageIdWithMetaData.builder()


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org


[james-project] 06/13: JAMES-3494 increment scala and refined version

Posted by rc...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 461086b8802533fe38d9f5afe4a693539cefc024
Author: Rémi Kowalski <rk...@linagora.com>
AuthorDate: Mon Jan 25 15:57:32 2021 +0100

    JAMES-3494 increment scala and refined version
---
 pom.xml                                | 2 +-
 server/protocols/jmap-rfc-8621/pom.xml | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/pom.xml b/pom.xml
index ea61d1e..dbb7bc0 100644
--- a/pom.xml
+++ b/pom.xml
@@ -646,7 +646,7 @@
         <bouncycastle.version>1.68</bouncycastle.version>
 
         <scala.base>2.13</scala.base>
-        <scala.version>${scala.base}.1</scala.version>
+        <scala.version>${scala.base}.4</scala.version>
         <scalatestplus-play.version>5.0.0</scalatestplus-play.version>
         <doclint>none</doclint>
     </properties>
diff --git a/server/protocols/jmap-rfc-8621/pom.xml b/server/protocols/jmap-rfc-8621/pom.xml
index 5d0a424..c8ae1da 100644
--- a/server/protocols/jmap-rfc-8621/pom.xml
+++ b/server/protocols/jmap-rfc-8621/pom.xml
@@ -119,7 +119,7 @@
         <dependency>
             <groupId>eu.timepit</groupId>
             <artifactId>refined_${scala.base}</artifactId>
-            <version>0.9.13</version>
+            <version>0.9.20</version>
         </dependency>
         <dependency>
             <groupId>io.projectreactor</groupId>


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org


[james-project] 01/13: JAMES-3495 Reproduce the null messageId bug

Posted by rc...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit debe5f18cf12502b994750f19db95ca22fc383ae
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Wed Jan 27 20:24:45 2021 +0700

    JAMES-3495 Reproduce the null messageId bug
    
    This likely occurs due to the EXPUNGE model used by IMAP (Update the message
    to mark it as deleted then remove it)
---
 .../cassandra/mail/CassandraMessageIdDAOTest.java  | 27 ++++++++++++++++++++++
 1 file changed, 27 insertions(+)

diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdDAOTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdDAOTest.java
index e76637e..d72d2c2 100644
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdDAOTest.java
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdDAOTest.java
@@ -39,6 +39,7 @@ import org.apache.james.mailbox.model.ComposedMessageIdWithMetaData;
 import org.apache.james.mailbox.model.MessageRange;
 import org.apache.james.util.streams.Limit;
 import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
 
@@ -85,6 +86,32 @@ class CassandraMessageIdDAOTest {
         assertThat(message.isPresent()).isFalse();
     }
 
+    @Disabled("A record with a 'null' messageId is returned")
+    @Test
+    void outOfOrderUpdatesShouldBeIgnored() {
+        CassandraId mailboxId = CassandraId.timeBased();
+        MessageUid messageUid = MessageUid.of(1);
+        CassandraMessageId messageId = messageIdFactory.generate();
+        testee.insert(ComposedMessageIdWithMetaData.builder()
+                .composedMessageId(new ComposedMessageId(mailboxId, messageId, messageUid))
+                .flags(new Flags())
+                .modSeq(ModSeq.of(1))
+                .build())
+            .block();
+
+        testee.delete(mailboxId, messageUid).block();
+
+        testee.updateMetadata(ComposedMessageIdWithMetaData.builder()
+                .composedMessageId(new ComposedMessageId(mailboxId, messageId, messageUid))
+                .flags(new Flags(org.apache.james.mailbox.cassandra.table.Flag.ANSWERED))
+                .modSeq(ModSeq.of(2))
+                .build())
+            .block();
+
+        Optional<ComposedMessageIdWithMetaData> message = testee.retrieve(mailboxId, messageUid).block();
+        assertThat(message.isPresent()).isFalse();
+    }
+
     @Test
     void deleteShouldDeleteOnlyConcernedRowWhenMultipleRowExists() {
         CassandraId mailboxId = CassandraId.timeBased();


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org


[james-project] 08/13: JAMES-3494 fix compilation issues

Posted by rc...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit e592c4913388c9ba3c0f1fea60c15b737b186197
Author: Rémi Kowalski <rk...@linagora.com>
AuthorDate: Mon Jan 25 15:51:40 2021 +0100

    JAMES-3494 fix compilation issues
---
 .../apache/james/eventsourcing/CommandDispatcher.scala   | 11 ++++++-----
 .../blob/deduplication/DeDuplicationBlobStore.scala      | 16 +++++++++++-----
 .../org/apache/james/jmap/method/EmailGetMethod.scala    |  2 +-
 .../james/jmap/method/EmailSetUpdatePerformer.scala      |  6 ++----
 .../james/jmap/method/EmailSubmissionSetMethod.scala     |  2 +-
 .../org/apache/james/jmap/method/MailboxGetMethod.scala  |  2 +-
 .../james/jmap/method/MailboxSetCreatePerformer.scala    |  2 +-
 .../james/jmap/method/VacationResponseGetMethod.scala    |  2 +-
 .../scala/org/apache/james/jmap/routes/JmapApi.scala     |  2 +-
 9 files changed, 25 insertions(+), 20 deletions(-)

diff --git a/event-sourcing/event-sourcing-core/src/main/scala/org/apache/james/eventsourcing/CommandDispatcher.scala b/event-sourcing/event-sourcing-core/src/main/scala/org/apache/james/eventsourcing/CommandDispatcher.scala
index a547bc4..ce2e40c 100644
--- a/event-sourcing/event-sourcing-core/src/main/scala/org/apache/james/eventsourcing/CommandDispatcher.scala
+++ b/event-sourcing/event-sourcing-core/src/main/scala/org/apache/james/eventsourcing/CommandDispatcher.scala
@@ -19,12 +19,13 @@
 package org.apache.james.eventsourcing
 
 import java.util
-
 import com.google.common.base.Preconditions
+
 import javax.inject.Inject
 import org.apache.james.eventsourcing.eventstore.EventStoreFailedException
 import org.reactivestreams.Publisher
 import reactor.core.scala.publisher.SMono
+import reactor.util.retry.Retry
 
 import scala.jdk.CollectionConverters._
 
@@ -51,11 +52,11 @@ class CommandDispatcher @Inject()(eventBus: EventBus, handlers: Set[CommandHandl
 
   def dispatch(c: Command): Publisher[util.List[_ <: Event]] = {
     tryDispatch(c)
-      .retry(CommandDispatcher.MAX_RETRY, {
+      .retryWhen(Retry.max(CommandDispatcher.MAX_RETRY)
+        .filter {
         case _: EventStoreFailedException => true
         case _ => false
-      })
-      .onErrorMap({
+      }).onErrorMap({
         case _: EventStoreFailedException => CommandDispatcher.TooManyRetries(c, CommandDispatcher.MAX_RETRY)
         case error => error
       })
@@ -75,7 +76,7 @@ class CommandDispatcher @Inject()(eventBus: EventBus, handlers: Set[CommandHandl
         SMono(eventsPublisher)
           .flatMap(events => eventBus.publish(events.asScala).`then`(SMono.just(events)))
       case _ =>
-        SMono.raiseError(CommandDispatcher.UnknownCommandException(c))
+        SMono.error(CommandDispatcher.UnknownCommandException(c))
     }
   }
 
diff --git a/server/blob/blob-storage-strategy/src/main/scala/org/apache/james/server/blob/deduplication/DeDuplicationBlobStore.scala b/server/blob/blob-storage-strategy/src/main/scala/org/apache/james/server/blob/deduplication/DeDuplicationBlobStore.scala
index 6cab727..a32bdff 100644
--- a/server/blob/blob-storage-strategy/src/main/scala/org/apache/james/server/blob/deduplication/DeDuplicationBlobStore.scala
+++ b/server/blob/blob-storage-strategy/src/main/scala/org/apache/james/server/blob/deduplication/DeDuplicationBlobStore.scala
@@ -20,10 +20,11 @@
 package org.apache.james.server.blob.deduplication
 
 import java.io.InputStream
-
+import java.util.concurrent.Callable
 import com.google.common.base.Preconditions
 import com.google.common.hash.{Hashing, HashingInputStream}
 import com.google.common.io.{ByteSource, FileBackedOutputStream}
+
 import javax.inject.{Inject, Named}
 import org.apache.commons.io.IOUtils
 import org.apache.james.blob.api.{BlobId, BlobStore, BlobStoreDAO, BucketName}
@@ -32,6 +33,8 @@ import reactor.core.publisher.Mono
 import reactor.core.scala.publisher.SMono
 import reactor.util.function.{Tuple2, Tuples}
 
+import scala.compat.java8.FunctionConverters._
+
 object DeDuplicationBlobStore {
   val DEFAULT_BUCKET = "defaultBucket"
   val LAZY_RESOURCE_CLEANUP = false
@@ -56,10 +59,13 @@ class DeDuplicationBlobStore @Inject()(blobStoreDAO: BlobStoreDAO,
     Preconditions.checkNotNull(bucketName)
     Preconditions.checkNotNull(data)
     val hashingInputStream = new HashingInputStream(Hashing.sha256, data)
-    val sourceSupplier: FileBackedOutputStream => SMono[BlobId] = (fileBackedOutputStream: FileBackedOutputStream) => saveAndGenerateBlobId(bucketName, hashingInputStream, fileBackedOutputStream)
-    Mono.using(() => new FileBackedOutputStream(DeDuplicationBlobStore.FILE_THRESHOLD),
-      sourceSupplier,
-      (fileBackedOutputStream: FileBackedOutputStream) => fileBackedOutputStream.reset(),
+    val sourceSupplier: FileBackedOutputStream => Mono[BlobId] = (fileBackedOutputStream: FileBackedOutputStream) => saveAndGenerateBlobId(bucketName, hashingInputStream, fileBackedOutputStream).asJava()
+    val ressourceSupplier: Callable[FileBackedOutputStream] = () => new FileBackedOutputStream(DeDuplicationBlobStore.FILE_THRESHOLD)
+
+    Mono.using(
+      ressourceSupplier,
+      sourceSupplier.asJava,
+      ((fileBackedOutputStream: FileBackedOutputStream) => fileBackedOutputStream.reset()).asJava,
       DeDuplicationBlobStore.LAZY_RESOURCE_CLEANUP)
   }
 
diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/EmailGetMethod.scala b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/EmailGetMethod.scala
index 3ee9579..736b656 100644
--- a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/EmailGetMethod.scala
+++ b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/EmailGetMethod.scala
@@ -167,7 +167,7 @@ class EmailGetMethod @Inject() (readerFactory: EmailViewReaderFactory,
     }))
 
     SFlux.merge(Seq(retrieveEmails(messagesIds, mailboxSession, request), parsingErrors))
-      .reduce(EmailGetResults.empty(), EmailGetResults.merge)
+      .reduce(EmailGetResults.empty())(EmailGetResults.merge)
   }
 
   private def asMessageId(id: UnparsedEmailId): Either[(UnparsedEmailId, IllegalArgumentException),  MessageId] =
diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/EmailSetUpdatePerformer.scala b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/EmailSetUpdatePerformer.scala
index 8f11639..d40273b 100644
--- a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/EmailSetUpdatePerformer.scala
+++ b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/EmailSetUpdatePerformer.scala
@@ -169,8 +169,7 @@ class EmailSetUpdatePerformer @Inject() (serializer: EmailSetSerializer,
 
   private def updateByRange(ranges: List[MessageRange],
                             metaData: Map[MessageId, Traversable[ComposedMessageIdWithMetaData]],
-                            operation: Consumer[MessageRange]): SMono[Seq[EmailUpdateResult]] = {
-
+                            operation: Consumer[MessageRange]): SMono[Seq[EmailUpdateResult]] =
     SFlux.fromIterable(ranges)
       .concatMap(range => {
         val messageIds = metaData.filter(entry => entry._2.exists(composedId => range.includes(composedId.getComposedMessageId.getUid)))
@@ -183,8 +182,7 @@ class EmailSetUpdatePerformer @Inject() (serializer: EmailSetSerializer,
           .onErrorResume(e => SMono.just(messageIds.map(id => EmailUpdateFailure(EmailSet.asUnparsed(id), e))))
           .subscribeOn(Schedulers.elastic())
       })
-      .reduce(Seq(), _ ++ _)
-  }
+      .reduce(Seq[EmailUpdateResult]())( _ ++ _)
 
   private def updateEachMessage(validUpdates: List[(MessageId, ValidatedEmailSetUpdate)],
                                 metaData: Map[MessageId, Traversable[ComposedMessageIdWithMetaData]],
diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/EmailSubmissionSetMethod.scala b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/EmailSubmissionSetMethod.scala
index b5791b4..e63e283 100644
--- a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/EmailSubmissionSetMethod.scala
+++ b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/EmailSubmissionSetMethod.scala
@@ -162,7 +162,7 @@ class EmailSubmissionSetMethod @Inject()(serializer: EmailSubmissionSetSerialize
     SFlux.fromIterable(request.create
       .getOrElse(Map.empty)
       .view)
-      .foldLeft((CreationResults(Nil), processingContext)) {
+      .fold((CreationResults(Nil), processingContext)) {
         (acc : (CreationResults, ProcessingContext), elem: (EmailSubmissionCreationId, JsObject)) => {
           val (emailSubmissionCreationId, jsObject) = elem
           val (creationResult, updatedProcessingContext) = createSubmission(session, emailSubmissionCreationId, jsObject, acc._2)
diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/MailboxGetMethod.scala b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/MailboxGetMethod.scala
index dc7f123..04cd1f8 100644
--- a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/MailboxGetMethod.scala
+++ b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/MailboxGetMethod.scala
@@ -79,7 +79,7 @@ class MailboxGetMethod @Inject() (serializer: MailboxSerializer,
     val requestedProperties: Properties = request.properties.getOrElse(Mailbox.allProperties)
     (requestedProperties -- Mailbox.allProperties match {
       case invalidProperties if invalidProperties.isEmpty() => getMailboxes(capabilities, request, mailboxSession)
-        .reduce(MailboxGetResults.empty(), MailboxGetResults.merge)
+        .reduce(MailboxGetResults.empty())(MailboxGetResults.merge)
         .flatMap(mailboxes => retrieveState(capabilities, mailboxSession)
           .map(state => mailboxes.asResponse(request.accountId, state)))
         .map(mailboxGetResponse => Invocation(
diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/MailboxSetCreatePerformer.scala b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/MailboxSetCreatePerformer.scala
index 4fe260a..e9db0ef 100644
--- a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/MailboxSetCreatePerformer.scala
+++ b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/MailboxSetCreatePerformer.scala
@@ -88,7 +88,7 @@ class MailboxSetCreatePerformer @Inject()(serializer: MailboxSerializer,
     SFlux.fromIterable(mailboxSetRequest.create
       .getOrElse(Map.empty)
       .view)
-      .foldLeft((MailboxCreationResults(Nil), processingContext)){
+      .fold((MailboxCreationResults(Nil), processingContext)){
         (acc : (MailboxCreationResults, ProcessingContext), elem: (MailboxCreationId, JsObject)) => {
           val (mailboxCreationId, jsObject) = elem
           val (creationResult, updatedProcessingContext) = createMailbox(mailboxSession, mailboxCreationId, jsObject, acc._2)
diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/VacationResponseGetMethod.scala b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/VacationResponseGetMethod.scala
index 0bdcb31..6683f84 100644
--- a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/VacationResponseGetMethod.scala
+++ b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/VacationResponseGetMethod.scala
@@ -67,7 +67,7 @@ class VacationResponseGetMethod @Inject()(vacationRepository: VacationRepository
       val requestedProperties: Properties = request.properties.getOrElse(VacationResponse.allProperties)
       (requestedProperties -- VacationResponse.allProperties match {
         case invalidProperties if invalidProperties.isEmpty() => getVacationResponse(request, mailboxSession)
-          .reduce(VacationResponseGetResult.empty, VacationResponseGetResult.merge)
+          .reduce(VacationResponseGetResult.empty)(VacationResponseGetResult.merge)
           .map(vacationResult => vacationResult.asResponse(request.accountId))
           .map(vacationResponseGetResponse => Invocation(
             methodName = methodName,
diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/JmapApi.scala b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/JmapApi.scala
index a8ca359..65a0320 100644
--- a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/JmapApi.scala
+++ b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/JmapApi.scala
@@ -59,7 +59,7 @@ class JMAPApi (methods: Set[Method], defaultCapabilities: Set[Capability]) {
 
   private def processSequentiallyAndUpdateContext(requestObject: RequestObject, mailboxSession: MailboxSession, processingContext: ProcessingContext, capabilities: Set[CapabilityIdentifier]): SMono[Seq[(InvocationWithContext)]] =
     SFlux.fromIterable(requestObject.methodCalls)
-      .foldLeft(List[SFlux[InvocationWithContext]]())((acc, elem) => {
+      .fold(List[SFlux[InvocationWithContext]]())((acc, elem) => {
         val lastProcessingContext: SMono[ProcessingContext] = acc.headOption
           .map(last => SMono.fromPublisher(Flux.from(last.map(_.processingContext)).last()))
           .getOrElse(SMono.just(processingContext))


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org


[james-project] 12/13: JAMES-3494 fix json body of request by quoting mailbox ids

Posted by rc...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 14bbab3475e8c139863c6acca4fc48efebf4073d
Author: Rémi Kowalski <rk...@linagora.com>
AuthorDate: Wed Jan 27 09:54:59 2021 +0100

    JAMES-3494 fix json body of request by quoting mailbox ids
---
 .../methods/integration/SetMessagesMethodTest.java | 24 +++++++++++-----------
 1 file changed, 12 insertions(+), 12 deletions(-)

diff --git a/server/protocols/jmap-draft-integration-testing/jmap-draft-integration-testing-common/src/test/java/org/apache/james/jmap/draft/methods/integration/SetMessagesMethodTest.java b/server/protocols/jmap-draft-integration-testing/jmap-draft-integration-testing-common/src/test/java/org/apache/james/jmap/draft/methods/integration/SetMessagesMethodTest.java
index fc5d16a..e898c0d 100644
--- a/server/protocols/jmap-draft-integration-testing/jmap-draft-integration-testing-common/src/test/java/org/apache/james/jmap/draft/methods/integration/SetMessagesMethodTest.java
+++ b/server/protocols/jmap-draft-integration-testing/jmap-draft-integration-testing-common/src/test/java/org/apache/james/jmap/draft/methods/integration/SetMessagesMethodTest.java
@@ -1181,18 +1181,18 @@ public abstract class SetMessagesMethodTest {
         given()
             .header("Authorization", accessToken.asString())
             .body(String.format("[[\"setMessages\", {\"update\": {" +
-                    "  \"%s\" : { \"mailboxIds\": [" + mailboxId.serialize() + "]}, " +
-                    "  \"%s\" : { \"mailboxIds\": [" + mailboxId.serialize() + "]}, " +
-                    "  \"%s\" : { \"mailboxIds\": [" + mailboxId.serialize() + "]}, " +
-                    "  \"%s\" : { \"mailboxIds\": [" + mailboxId.serialize() + "]}, " +
-                    "  \"%s\" : { \"mailboxIds\": [" + mailboxId.serialize() + "]}, " +
-                    "  \"%s\" : { \"mailboxIds\": [" + mailboxId.serialize() + "]}, " +
-                    "  \"%s\" : { \"mailboxIds\": [" + mailboxId.serialize() + "]}, " +
-                    "  \"%s\" : { \"mailboxIds\": [" + mailboxId.serialize() + "]}, " +
-                    "  \"%s\" : { \"mailboxIds\": [" + mailboxId.serialize() + "]}, " +
-                    "  \"%s\" : { \"mailboxIds\": [" + mailboxId.serialize() + "]}, " +
-                    "  \"%s\" : { \"mailboxIds\": [" + mailboxId.serialize() + "]}, " +
-                    "  \"%s\" : { \"mailboxIds\": [" + mailboxId.serialize() + "]} " +
+                    "  \"%s\" : { \"mailboxIds\": [\"" + mailboxId.serialize() + "\"]}, " +
+                    "  \"%s\" : { \"mailboxIds\": [\"" + mailboxId.serialize() + "\"]}, " +
+                    "  \"%s\" : { \"mailboxIds\": [\"" + mailboxId.serialize() + "\"]}, " +
+                    "  \"%s\" : { \"mailboxIds\": [\"" + mailboxId.serialize() + "\"]}, " +
+                    "  \"%s\" : { \"mailboxIds\": [\"" + mailboxId.serialize() + "\"]}, " +
+                    "  \"%s\" : { \"mailboxIds\": [\"" + mailboxId.serialize() + "\"]}, " +
+                    "  \"%s\" : { \"mailboxIds\": [\"" + mailboxId.serialize() + "\"]}, " +
+                    "  \"%s\" : { \"mailboxIds\": [\"" + mailboxId.serialize() + "\"]}, " +
+                    "  \"%s\" : { \"mailboxIds\": [\"" + mailboxId.serialize() + "\"]}, " +
+                    "  \"%s\" : { \"mailboxIds\": [\"" + mailboxId.serialize() + "\"]}, " +
+                    "  \"%s\" : { \"mailboxIds\": [\"" + mailboxId.serialize() + "\"]}, " +
+                    "  \"%s\" : { \"mailboxIds\": [\"" + mailboxId.serialize() + "\"]} " +
                     "} }, \"#0\"]]", serializedMessageId1, serializedMessageId2, serializedMessageId3,
                 serializedMessageId4, serializedMessageId5, serializedMessageId6,
                 serializedMessageId7, serializedMessageId8, serializedMessageId9,


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org


[james-project] 05/13: JAMES-3494 update dependencies

Posted by rc...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit dd6f8165ec8e8838776fd6e3dd91298c9f4c9c91
Author: Rémi Kowalski <rk...@linagora.com>
AuthorDate: Mon Jan 25 15:04:09 2021 +0100

    JAMES-3494 update dependencies
---
 pom.xml | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/pom.xml b/pom.xml
index e4f7545..ea61d1e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -2250,14 +2250,14 @@
             <dependency>
                 <groupId>io.projectreactor</groupId>
                 <artifactId>reactor-bom</artifactId>
-                <version>Dysprosium-SR6</version>
+                <version>2020.0.3</version>
                 <type>pom</type>
                 <scope>import</scope>
             </dependency>
             <dependency>
                 <groupId>io.projectreactor</groupId>
                 <artifactId>reactor-scala-extensions_${scala.base}</artifactId>
-                <version>0.5.1</version>
+                <version>0.8.0</version>
             </dependency>
             <dependency>
                 <groupId>io.rest-assured</groupId>


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org