You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2020/04/02 04:32:19 UTC

[james-project] 12/15: JAMES-3078 Allow writing reactive methods

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 8d756d9bd4ff58b6e66283c5c03300dcc488e4f2
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Sun Mar 22 18:03:17 2020 +0700

    JAMES-3078 Allow writing reactive methods
---
 .../james/jmap/draft/methods/GetFilterMethod.java  |   2 +-
 .../jmap/draft/methods/GetMailboxesMethod.java     |   2 +-
 .../jmap/draft/methods/GetMessageListMethod.java   |   4 +-
 .../jmap/draft/methods/GetMessagesMethod.java      |   2 +-
 .../draft/methods/GetVacationResponseMethod.java   |   2 +-
 .../jmap/draft/methods/JmapResponseWriter.java     |   6 +-
 .../jmap/draft/methods/JmapResponseWriterImpl.java |   5 +-
 .../apache/james/jmap/draft/methods/Method.java    |  16 ++-
 .../james/jmap/draft/methods/RequestHandler.java   |  22 +--
 .../james/jmap/draft/methods/SetFilterMethod.java  |   2 +-
 .../jmap/draft/methods/SetMailboxesMethod.java     |   2 +-
 .../jmap/draft/methods/SetMessagesMethod.java      |   2 +-
 .../draft/methods/SetVacationResponseMethod.java   |   2 +-
 .../org/apache/james/jmap/http/JMAPApiRoutes.java  |   8 +-
 .../jmap/draft/methods/GetMailboxesMethodTest.java |  24 ++--
 .../jmap/draft/methods/GetMessagesMethodTest.java  |  46 +++---
 .../methods/GetVacationResponseMethodTest.java     |   6 +-
 .../draft/methods/JmapResponseWriterImplTest.java  | 155 +++++++++++----------
 .../jmap/draft/methods/RequestHandlerTest.java     |   7 +-
 .../jmap/draft/methods/SetMailboxesMethodTest.java |  12 +-
 .../methods/SetVacationResponseMethodTest.java     |  18 +--
 .../apache/james/jmap/http/JMAPApiRoutesTest.java  |   5 +-
 22 files changed, 185 insertions(+), 165 deletions(-)

diff --git a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/GetFilterMethod.java b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/GetFilterMethod.java
index 390fb69..5bb238c 100644
--- a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/GetFilterMethod.java
+++ b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/GetFilterMethod.java
@@ -65,7 +65,7 @@ public class GetFilterMethod implements Method {
     }
 
     @Override
-    public Stream<JmapResponse> process(JmapRequest request, MethodCallId methodCallId, MailboxSession mailboxSession) {
+    public Stream<JmapResponse> processToStream(JmapRequest request, MethodCallId methodCallId, MailboxSession mailboxSession) {
         Preconditions.checkNotNull(request);
         Preconditions.checkNotNull(methodCallId);
         Preconditions.checkNotNull(mailboxSession);
diff --git a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/GetMailboxesMethod.java b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/GetMailboxesMethod.java
index 375524f..da0ad86 100644
--- a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/GetMailboxesMethod.java
+++ b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/GetMailboxesMethod.java
@@ -88,7 +88,7 @@ public class GetMailboxesMethod implements Method {
     }
 
     @Override
-    public Stream<JmapResponse> process(JmapRequest request, MethodCallId methodCallId, MailboxSession mailboxSession) {
+    public Stream<JmapResponse> processToStream(JmapRequest request, MethodCallId methodCallId, MailboxSession mailboxSession) {
         Preconditions.checkArgument(request instanceof GetMailboxesRequest);
         GetMailboxesRequest mailboxesRequest = (GetMailboxesRequest) request;
 
diff --git a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/GetMessageListMethod.java b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/GetMessageListMethod.java
index eb6e66e..19c7503 100644
--- a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/GetMessageListMethod.java
+++ b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/GetMessageListMethod.java
@@ -89,7 +89,7 @@ public class GetMessageListMethod implements Method {
     }
 
     @Override
-    public Stream<JmapResponse> process(JmapRequest request, MethodCallId methodCallId, MailboxSession mailboxSession) {
+    public Stream<JmapResponse> processToStream(JmapRequest request, MethodCallId methodCallId, MailboxSession mailboxSession) {
         Preconditions.checkArgument(request instanceof GetMessageListRequest);
 
         GetMessageListRequest messageListRequest = (GetMessageListRequest) request;
@@ -180,7 +180,7 @@ public class GetMessageListMethod implements Method {
                     .ids(messageListResponse.getMessageIds())
                     .properties(messageListRequest.getFetchMessageProperties())
                     .build();
-            return getMessagesMethod.process(getMessagesRequest, methodCallId, mailboxSession);
+            return getMessagesMethod.processToStream(getMessagesRequest, methodCallId, mailboxSession);
         }
         return Stream.empty();
     }
diff --git a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/GetMessagesMethod.java b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/GetMessagesMethod.java
index 383455e..d16367f 100644
--- a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/GetMessagesMethod.java
+++ b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/GetMessagesMethod.java
@@ -81,7 +81,7 @@ public class GetMessagesMethod implements Method {
     }
     
     @Override
-    public Stream<JmapResponse> process(JmapRequest request, MethodCallId methodCallId, MailboxSession mailboxSession) {
+    public Stream<JmapResponse> processToStream(JmapRequest request, MethodCallId methodCallId, MailboxSession mailboxSession) {
         Preconditions.checkNotNull(request);
         Preconditions.checkNotNull(mailboxSession);
         Preconditions.checkArgument(request instanceof GetMessagesRequest);
diff --git a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/GetVacationResponseMethod.java b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/GetVacationResponseMethod.java
index 6717312..18c3017 100644
--- a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/GetVacationResponseMethod.java
+++ b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/GetVacationResponseMethod.java
@@ -64,7 +64,7 @@ public class GetVacationResponseMethod implements Method {
     }
 
     @Override
-    public Stream<JmapResponse> process(JmapRequest request, MethodCallId methodCallId, MailboxSession mailboxSession) {
+    public Stream<JmapResponse> processToStream(JmapRequest request, MethodCallId methodCallId, MailboxSession mailboxSession) {
         Preconditions.checkNotNull(request);
         Preconditions.checkNotNull(methodCallId);
         Preconditions.checkNotNull(mailboxSession);
diff --git a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/JmapResponseWriter.java b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/JmapResponseWriter.java
index fcf4bd3..2bd49a2 100644
--- a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/JmapResponseWriter.java
+++ b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/JmapResponseWriter.java
@@ -19,12 +19,12 @@
 
 package org.apache.james.jmap.draft.methods;
 
-import java.util.stream.Stream;
-
 import org.apache.james.jmap.draft.model.InvocationResponse;
 
+import reactor.core.publisher.Flux;
+
 public interface JmapResponseWriter {
 
-    Stream<InvocationResponse> formatMethodResponse(Stream<JmapResponse> jmapResponse);
+    Flux<InvocationResponse> formatMethodResponse(Flux<JmapResponse> jmapResponse);
 
 }
\ No newline at end of file
diff --git a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/JmapResponseWriterImpl.java b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/JmapResponseWriterImpl.java
index 13a043c..a0e1dd8 100644
--- a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/JmapResponseWriterImpl.java
+++ b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/JmapResponseWriterImpl.java
@@ -21,7 +21,6 @@ package org.apache.james.jmap.draft.methods;
 
 import java.util.Optional;
 import java.util.Set;
-import java.util.stream.Stream;
 
 import javax.inject.Inject;
 
@@ -36,6 +35,8 @@ import com.fasterxml.jackson.databind.ser.impl.SimpleBeanPropertyFilter;
 import com.fasterxml.jackson.databind.ser.impl.SimpleFilterProvider;
 import com.github.steveash.guavate.Guavate;
 
+import reactor.core.publisher.Flux;
+
 public class JmapResponseWriterImpl implements JmapResponseWriter {
 
     public static final String PROPERTIES_FILTER = "propertiesFilter";
@@ -47,7 +48,7 @@ public class JmapResponseWriterImpl implements JmapResponseWriter {
     }
 
     @Override
-    public Stream<InvocationResponse> formatMethodResponse(Stream<JmapResponse> jmapResponses) {
+    public Flux<InvocationResponse> formatMethodResponse(Flux<JmapResponse> jmapResponses) {
         return jmapResponses.map(jmapResponse -> {
             ObjectMapper objectMapper = newConfiguredObjectMapper(jmapResponse);
 
diff --git a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/Method.java b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/Method.java
index 62909b6..564f6de 100644
--- a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/Method.java
+++ b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/Method.java
@@ -30,6 +30,10 @@ import org.apache.james.mailbox.MailboxSession;
 import com.fasterxml.jackson.annotation.JsonValue;
 import com.google.common.base.Preconditions;
 
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
+
 public interface Method {
 
     String JMAP_PREFIX = "JMAP-";
@@ -123,7 +127,15 @@ public interface Method {
     Request.Name requestHandled();
 
     Class<? extends JmapRequest> requestType();
-    
-    Stream<JmapResponse> process(JmapRequest request, MethodCallId methodCallId, MailboxSession mailboxSession);
 
+    default Flux<JmapResponse> process(JmapRequest request, MethodCallId methodCallId, MailboxSession mailboxSession) {
+        return Mono.fromCallable(() -> processToStream(request, methodCallId, mailboxSession))
+            .flatMapMany(Flux::fromStream)
+            .subscribeOn(Schedulers.elastic());
+    }
+
+    default Stream<JmapResponse> processToStream(JmapRequest request, MethodCallId methodCallId, MailboxSession mailboxSession) {
+        return process(request, methodCallId, mailboxSession)
+            .toStream();
+    }
 }
diff --git a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/RequestHandler.java b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/RequestHandler.java
index 261c06f..2b71839 100644
--- a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/RequestHandler.java
+++ b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/RequestHandler.java
@@ -26,7 +26,6 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.function.Function;
 import java.util.stream.Collectors;
-import java.util.stream.Stream;
 
 import javax.inject.Inject;
 
@@ -39,6 +38,8 @@ import org.apache.james.util.MDCBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import reactor.core.publisher.Flux;
+
 public class RequestHandler {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(RequestHandler.class);
@@ -55,7 +56,7 @@ public class RequestHandler {
                 .collect(Collectors.toMap(Method::requestHandled, Function.identity()));
     }
 
-    public Stream<InvocationResponse> handle(AuthenticatedRequest request) throws IOException {
+    public Flux<InvocationResponse> handle(AuthenticatedRequest request) {
         Optional<MailboxSession> mailboxSession = Optional.ofNullable(request.getMailboxSession());
         try (Closeable closeable =
                  MDCBuilder.create()
@@ -67,23 +68,24 @@ public class RequestHandler {
                 .map(extractAndProcess(request))
                 .map(jmapResponseWriter::formatMethodResponse)
                 .orElseThrow(() -> new IllegalStateException("unknown method " + request.getMethodName()));
+        } catch (IOException e) {
+            throw new RuntimeException(e);
         }
     }
     
-    private Function<Method, Stream<JmapResponse>> extractAndProcess(AuthenticatedRequest request) {
+    private Function<Method, Flux<JmapResponse>> extractAndProcess(AuthenticatedRequest request) {
         MailboxSession mailboxSession = request.getMailboxSession();
         return (Method method) -> {
                     try {
                         JmapRequest jmapRequest = jmapRequestParser.extractJmapRequest(request, method.requestType());
-                        return method.process(jmapRequest, request.getMethodCallId(), mailboxSession);
+                        return method.process(jmapRequest, request.getMethodCallId(), mailboxSession)
+                            .onErrorResume(JmapFieldNotSupportedException.class, e -> errorNotImplemented(e, request));
                     } catch (IOException e) {
                         LOGGER.error("Error occured while parsing the request.", e);
                         if (e.getCause() instanceof JmapFieldNotSupportedException) {
                             return errorNotImplemented((JmapFieldNotSupportedException) e.getCause(), request);
                         }
                         return error(request, generateInvalidArgumentError(e.getMessage()));
-                    } catch (JmapFieldNotSupportedException e) {
-                        return errorNotImplemented(e, request);
                     }
                 };
     }
@@ -95,16 +97,16 @@ public class RequestHandler {
             .build();
     }
 
-    private Stream<JmapResponse> errorNotImplemented(JmapFieldNotSupportedException error, AuthenticatedRequest request) {
-        return Stream.of(
+    private Flux<JmapResponse> errorNotImplemented(JmapFieldNotSupportedException error, AuthenticatedRequest request) {
+        return Flux.just(
                 JmapResponse.builder()
                     .methodCallId(request.getMethodCallId())
                     .error(generateInvalidArgumentError("The field '" + error.getField() + "' of '" + error.getIssuer() + "' is not supported"))
                     .build());
     }
 
-    private Stream<JmapResponse> error(AuthenticatedRequest request, ErrorResponse error) {
-        return Stream.of(
+    private Flux<JmapResponse> error(AuthenticatedRequest request, ErrorResponse error) {
+        return Flux.just(
                 JmapResponse.builder()
                     .methodCallId(request.getMethodCallId())
                     .error(error)
diff --git a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SetFilterMethod.java b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SetFilterMethod.java
index 2cf6523..b6e82d7 100644
--- a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SetFilterMethod.java
+++ b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SetFilterMethod.java
@@ -102,7 +102,7 @@ public class SetFilterMethod implements Method {
     }
 
     @Override
-    public Stream<JmapResponse> process(JmapRequest request, MethodCallId methodCallId, MailboxSession mailboxSession) {
+    public Stream<JmapResponse> processToStream(JmapRequest request, MethodCallId methodCallId, MailboxSession mailboxSession) {
         Preconditions.checkNotNull(request);
         Preconditions.checkNotNull(methodCallId);
         Preconditions.checkNotNull(mailboxSession);
diff --git a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SetMailboxesMethod.java b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SetMailboxesMethod.java
index 56f6ed9..32291fe 100644
--- a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SetMailboxesMethod.java
+++ b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SetMailboxesMethod.java
@@ -59,7 +59,7 @@ public class SetMailboxesMethod implements Method {
     }
 
     @Override
-    public Stream<JmapResponse> process(JmapRequest request, MethodCallId methodCallId, MailboxSession mailboxSession) {
+    public Stream<JmapResponse> processToStream(JmapRequest request, MethodCallId methodCallId, MailboxSession mailboxSession) {
         Preconditions.checkNotNull(request);
         Preconditions.checkNotNull(methodCallId);
         Preconditions.checkNotNull(mailboxSession);
diff --git a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SetMessagesMethod.java b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SetMessagesMethod.java
index 886896d..dd7b385 100644
--- a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SetMessagesMethod.java
+++ b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SetMessagesMethod.java
@@ -59,7 +59,7 @@ public class SetMessagesMethod implements Method {
     }
 
     @Override
-    public Stream<JmapResponse> process(JmapRequest request, MethodCallId methodCallId, MailboxSession mailboxSession) {
+    public Stream<JmapResponse> processToStream(JmapRequest request, MethodCallId methodCallId, MailboxSession mailboxSession) {
         Preconditions.checkArgument(request instanceof SetMessagesRequest);
         SetMessagesRequest setMessagesRequest = (SetMessagesRequest) request;
 
diff --git a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SetVacationResponseMethod.java b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SetVacationResponseMethod.java
index 0f11529..215223c 100644
--- a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SetVacationResponseMethod.java
+++ b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SetVacationResponseMethod.java
@@ -69,7 +69,7 @@ public class SetVacationResponseMethod implements Method {
     }
 
     @Override
-    public Stream<JmapResponse> process(JmapRequest request, MethodCallId methodCallId, MailboxSession mailboxSession) {
+    public Stream<JmapResponse> processToStream(JmapRequest request, MethodCallId methodCallId, MailboxSession mailboxSession) {
         Preconditions.checkNotNull(request);
         Preconditions.checkNotNull(methodCallId);
         Preconditions.checkNotNull(mailboxSession);
diff --git a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/http/JMAPApiRoutes.java b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/http/JMAPApiRoutes.java
index bbe3269..9973616 100644
--- a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/http/JMAPApiRoutes.java
+++ b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/http/JMAPApiRoutes.java
@@ -104,7 +104,7 @@ public class JMAPApiRoutes implements JMAPRoutes {
             requestAsJsonStream(request)
                 .map(InvocationRequest::deserialize)
                 .map(invocationRequest -> AuthenticatedRequest.decorate(invocationRequest, session))
-                .concatMap(this::handle)
+                .concatMap(requestHandler::handle)
                 .map(InvocationResponse::asProtocolSpecification);
 
         return sendResponses(response, responses);
@@ -125,12 +125,6 @@ public class JMAPApiRoutes implements JMAPRoutes {
                 .then());
     }
 
-    private Flux<? extends InvocationResponse> handle(AuthenticatedRequest request) {
-        return Mono.fromCallable(() -> requestHandler.handle(request))
-            .flatMapMany(Flux::fromStream)
-            .subscribeOn(Schedulers.elastic());
-    }
-
     private Flux<JsonNode[]> requestAsJsonStream(HttpServerRequest req) {
         return req.receive().aggregate().asInputStream()
             .map(inputStream -> {
diff --git a/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/draft/methods/GetMailboxesMethodTest.java b/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/draft/methods/GetMailboxesMethodTest.java
index 2ac5c67..f0b9cca 100644
--- a/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/draft/methods/GetMailboxesMethodTest.java
+++ b/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/draft/methods/GetMailboxesMethodTest.java
@@ -88,7 +88,7 @@ public class GetMailboxesMethodTest {
 
         MailboxSession mailboxSession = mailboxManager.createSystemSession(USERNAME);
 
-        List<JmapResponse> getMailboxesResponse = getMailboxesMethod.process(getMailboxesRequest, methodCallId, mailboxSession).collect(Collectors.toList());
+        List<JmapResponse> getMailboxesResponse = getMailboxesMethod.processToStream(getMailboxesRequest, methodCallId, mailboxSession).collect(Collectors.toList());
 
         assertThat(getMailboxesResponse)
                 .hasSize(1)
@@ -112,7 +112,7 @@ public class GetMailboxesMethodTest {
                 .build();
         MailboxSession session = MailboxSessionUtil.create(USERNAME);
 
-        List<JmapResponse> getMailboxesResponse = testee.process(getMailboxesRequest, methodCallId, session).collect(Collectors.toList());
+        List<JmapResponse> getMailboxesResponse = testee.processToStream(getMailboxesRequest, methodCallId, session).collect(Collectors.toList());
 
         assertThat(getMailboxesResponse)
                 .hasSize(1)
@@ -142,7 +142,7 @@ public class GetMailboxesMethodTest {
         GetMailboxesRequest getMailboxesRequest = GetMailboxesRequest.builder()
                 .build();
 
-        List<JmapResponse> getMailboxesResponse = getMailboxesMethod.process(getMailboxesRequest, methodCallId, mailboxSession).collect(Collectors.toList());
+        List<JmapResponse> getMailboxesResponse = getMailboxesMethod.processToStream(getMailboxesRequest, methodCallId, mailboxSession).collect(Collectors.toList());
 
         assertThat(getMailboxesResponse)
                 .hasSize(1)
@@ -167,7 +167,7 @@ public class GetMailboxesMethodTest {
         GetMailboxesRequest getMailboxesRequest = GetMailboxesRequest.builder()
                 .build();
 
-        List<JmapResponse> getMailboxesResponse = getMailboxesMethod.process(getMailboxesRequest, methodCallId, userSession).collect(Collectors.toList());
+        List<JmapResponse> getMailboxesResponse = getMailboxesMethod.processToStream(getMailboxesRequest, methodCallId, userSession).collect(Collectors.toList());
 
         assertThat(getMailboxesResponse)
                 .hasSize(1)
@@ -188,7 +188,7 @@ public class GetMailboxesMethodTest {
         GetMailboxesRequest getMailboxesRequest = GetMailboxesRequest.builder()
                 .build();
 
-        List<JmapResponse> getMailboxesResponse = getMailboxesMethod.process(getMailboxesRequest, methodCallId, mailboxSession).collect(Collectors.toList());
+        List<JmapResponse> getMailboxesResponse = getMailboxesMethod.processToStream(getMailboxesRequest, methodCallId, mailboxSession).collect(Collectors.toList());
 
         assertThat(getMailboxesResponse)
                 .hasSize(1)
@@ -209,7 +209,7 @@ public class GetMailboxesMethodTest {
         GetMailboxesRequest getMailboxesRequest = GetMailboxesRequest.builder()
                 .build();
 
-        List<JmapResponse> getMailboxesResponse = getMailboxesMethod.process(getMailboxesRequest, methodCallId, mailboxSession).collect(Collectors.toList());
+        List<JmapResponse> getMailboxesResponse = getMailboxesMethod.processToStream(getMailboxesRequest, methodCallId, mailboxSession).collect(Collectors.toList());
 
         assertThat(getMailboxesResponse)
                 .hasSize(1)
@@ -230,7 +230,7 @@ public class GetMailboxesMethodTest {
         GetMailboxesRequest getMailboxesRequest = GetMailboxesRequest.builder()
                 .build();
 
-        List<JmapResponse> getMailboxesResponse = getMailboxesMethod.process(getMailboxesRequest, methodCallId, mailboxSession).collect(Collectors.toList());
+        List<JmapResponse> getMailboxesResponse = getMailboxesMethod.processToStream(getMailboxesRequest, methodCallId, mailboxSession).collect(Collectors.toList());
 
         assertThat(getMailboxesResponse)
                 .hasSize(1)
@@ -258,7 +258,7 @@ public class GetMailboxesMethodTest {
         GetMailboxesRequest getMailboxesRequest = GetMailboxesRequest.builder()
                 .build();
 
-        List<JmapResponse> getMailboxesResponse = getMailboxesMethod.process(getMailboxesRequest, methodCallId, mailboxSession).collect(Collectors.toList());
+        List<JmapResponse> getMailboxesResponse = getMailboxesMethod.processToStream(getMailboxesRequest, methodCallId, mailboxSession).collect(Collectors.toList());
 
         assertThat(getMailboxesResponse)
                 .hasSize(1)
@@ -288,7 +288,7 @@ public class GetMailboxesMethodTest {
         GetMailboxesRequest getMailboxesRequest = GetMailboxesRequest.builder()
                 .build();
 
-        List<JmapResponse> getMailboxesResponse = getMailboxesMethod.process(getMailboxesRequest, methodCallId, mailboxSession).collect(Collectors.toList());
+        List<JmapResponse> getMailboxesResponse = getMailboxesMethod.processToStream(getMailboxesRequest, methodCallId, mailboxSession).collect(Collectors.toList());
 
         assertThat(getMailboxesResponse)
                 .hasSize(1)
@@ -318,7 +318,7 @@ public class GetMailboxesMethodTest {
         GetMailboxesRequest getMailboxesRequest = GetMailboxesRequest.builder()
                 .build();
 
-        List<JmapResponse> getMailboxesResponse = getMailboxesMethod.process(getMailboxesRequest, methodCallId, mailboxSession).collect(Collectors.toList());
+        List<JmapResponse> getMailboxesResponse = getMailboxesMethod.processToStream(getMailboxesRequest, methodCallId, mailboxSession).collect(Collectors.toList());
 
         assertThat(getMailboxesResponse)
                 .hasSize(1)
@@ -357,7 +357,7 @@ public class GetMailboxesMethodTest {
         GetMailboxesRequest getMailboxesRequest = GetMailboxesRequest.builder()
                 .build();
 
-        List<JmapResponse> getMailboxesResponse = getMailboxesMethod.process(getMailboxesRequest, methodCallId, mailboxSession).collect(Collectors.toList());
+        List<JmapResponse> getMailboxesResponse = getMailboxesMethod.processToStream(getMailboxesRequest, methodCallId, mailboxSession).collect(Collectors.toList());
 
         assertThat(getMailboxesResponse)
                 .hasSize(1)
@@ -387,7 +387,7 @@ public class GetMailboxesMethodTest {
         GetMailboxesRequest getMailboxesRequest = GetMailboxesRequest.builder()
                 .build();
 
-        List<JmapResponse> getMailboxesResponse = getMailboxesMethod.process(getMailboxesRequest, methodCallId, mailboxSession).collect(Collectors.toList());
+        List<JmapResponse> getMailboxesResponse = getMailboxesMethod.processToStream(getMailboxesRequest, methodCallId, mailboxSession).collect(Collectors.toList());
 
         assertThat(getMailboxesResponse)
                 .hasSize(1)
diff --git a/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/draft/methods/GetMessagesMethodTest.java b/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/draft/methods/GetMessagesMethodTest.java
index 35c2bcc..41b2ad1 100644
--- a/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/draft/methods/GetMessagesMethodTest.java
+++ b/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/draft/methods/GetMessagesMethodTest.java
@@ -155,24 +155,24 @@ public class GetMessagesMethodTest {
     @Test
     public void processShouldThrowWhenNullRequest() {
         GetMessagesRequest request = null;
-        assertThatThrownBy(() -> testee.process(request, mock(MethodCallId.class), mock(MailboxSession.class))).isInstanceOf(NullPointerException.class);
+        assertThatThrownBy(() -> testee.processToStream(request, mock(MethodCallId.class), mock(MailboxSession.class))).isInstanceOf(NullPointerException.class);
     }
 
     @Test
     public void processShouldThrowWhenNullSession() {
         MailboxSession mailboxSession = null;
-        assertThatThrownBy(() -> testee.process(mock(GetMessagesRequest.class), mock(MethodCallId.class), mailboxSession)).isInstanceOf(NullPointerException.class);
+        assertThatThrownBy(() -> testee.processToStream(mock(GetMessagesRequest.class), mock(MethodCallId.class), mailboxSession)).isInstanceOf(NullPointerException.class);
     }
 
     @Test
     public void processShouldThrowWhenNullMethodCallId() {
         MethodCallId methodCallId = null;
-        assertThatThrownBy(() -> testee.process(mock(GetMessagesRequest.class), methodCallId, mock(MailboxSession.class))).isInstanceOf(NullPointerException.class);
+        assertThatThrownBy(() -> testee.processToStream(mock(GetMessagesRequest.class), methodCallId, mock(MailboxSession.class))).isInstanceOf(NullPointerException.class);
     }
 
     @Test
     public void processShouldThrowWhenRequestHasAccountId() {
-        assertThatThrownBy(() -> testee.process(
+        assertThatThrownBy(() -> testee.processToStream(
             GetMessagesRequest.builder().accountId("abc").build(), mock(MethodCallId.class), mock(MailboxSession.class))).isInstanceOf(NotImplementedException.class);
     }
 
@@ -190,7 +190,7 @@ public class GetMessagesMethodTest {
                 message3.getMessageId()))
             .build();
 
-        List<JmapResponse> result = testee.process(request, methodCallId, session).collect(Collectors.toList());
+        List<JmapResponse> result = testee.processToStream(request, methodCallId, session).collect(Collectors.toList());
 
         assertThat(result).hasSize(1)
             .extracting(JmapResponse::getResponse)
@@ -221,7 +221,7 @@ public class GetMessagesMethodTest {
             .ids(ImmutableList.of(message.getMessageId()))
             .build();
 
-        List<JmapResponse> result = testee.process(request, methodCallId, session).collect(Collectors.toList());
+        List<JmapResponse> result = testee.processToStream(request, methodCallId, session).collect(Collectors.toList());
 
         assertThat(result).hasSize(1)
             .extracting(JmapResponse::getResponse)
@@ -244,7 +244,7 @@ public class GetMessagesMethodTest {
             .properties(ImmutableList.of())
             .build();
 
-        List<JmapResponse> result = testee.process(request, methodCallId, session).collect(Collectors.toList());
+        List<JmapResponse> result = testee.processToStream(request, methodCallId, session).collect(Collectors.toList());
 
         assertThat(result).hasSize(1);
         assertThat(result.get(0).getProperties())
@@ -261,7 +261,7 @@ public class GetMessagesMethodTest {
             .ids(ImmutableList.of(message1.getMessageId()))
             .build();
 
-        List<JmapResponse> result = testee.process(request, methodCallId, session).collect(Collectors.toList());
+        List<JmapResponse> result = testee.processToStream(request, methodCallId, session).collect(Collectors.toList());
         assertThat(result).hasSize(1);
         assertThat(result.get(0).getProperties())
             .isEqualTo(Optional.of(MessageProperty.allOutputProperties()));
@@ -280,7 +280,7 @@ public class GetMessagesMethodTest {
 
         Set<MessageProperty> expected = Sets.newHashSet(MessageProperty.id, MessageProperty.subject);
 
-        List<JmapResponse> result = testee.process(request, methodCallId, session).collect(Collectors.toList());
+        List<JmapResponse> result = testee.processToStream(request, methodCallId, session).collect(Collectors.toList());
         assertThat(result).hasSize(1);
         assertThat(result.get(0).getProperties())
             .isEqualTo(Optional.of(expected));
@@ -299,7 +299,7 @@ public class GetMessagesMethodTest {
 
         Set<MessageProperty> expected = Sets.newHashSet(MessageProperty.id, MessageProperty.textBody);
 
-        List<JmapResponse> result = testee.process(request, methodCallId, session).collect(Collectors.toList());
+        List<JmapResponse> result = testee.processToStream(request, methodCallId, session).collect(Collectors.toList());
 
         assertThat(result).hasSize(1);
         assertThat(result.get(0).getProperties())
@@ -322,7 +322,7 @@ public class GetMessagesMethodTest {
             .ids(ImmutableList.of(message.getMessageId()))
             .build();
 
-        List<JmapResponse> result = testee.process(request, methodCallId, session).collect(Collectors.toList());
+        List<JmapResponse> result = testee.processToStream(request, methodCallId, session).collect(Collectors.toList());
 
         assertThat(result).hasSize(1)
             .extracting(JmapResponse::getResponse)
@@ -350,7 +350,7 @@ public class GetMessagesMethodTest {
             .ids(ImmutableList.of(message.getMessageId()))
             .build();
 
-        List<JmapResponse> result = testee.process(request, methodCallId, session).collect(Collectors.toList());
+        List<JmapResponse> result = testee.processToStream(request, methodCallId, session).collect(Collectors.toList());
 
         assertThat(result).hasSize(1)
             .extracting(JmapResponse::getResponse)
@@ -384,7 +384,7 @@ public class GetMessagesMethodTest {
             .ids(ImmutableList.of(message.getMessageId()))
             .build();
 
-        List<JmapResponse> result = testee.process(request, methodCallId, session).collect(Collectors.toList());
+        List<JmapResponse> result = testee.processToStream(request, methodCallId, session).collect(Collectors.toList());
 
         assertThat(result).hasSize(1)
             .extracting(JmapResponse::getResponse)
@@ -418,7 +418,7 @@ public class GetMessagesMethodTest {
 
         Set<MessageProperty> expected = Sets.newHashSet(MessageProperty.id, MessageProperty.headers);
 
-        List<JmapResponse> result = testee.process(request, methodCallId, session).collect(Collectors.toList());
+        List<JmapResponse> result = testee.processToStream(request, methodCallId, session).collect(Collectors.toList());
 
         assertThat(result).hasSize(1);
         assertThat(result.get(0).getProperties())
@@ -444,7 +444,7 @@ public class GetMessagesMethodTest {
             .properties(ImmutableList.of("headers.from", "headers.heADER2"))
             .build();
 
-        List<JmapResponse> result = testee.process(request, methodCallId, session).collect(Collectors.toList());
+        List<JmapResponse> result = testee.processToStream(request, methodCallId, session).collect(Collectors.toList());
 
         assertThat(result)
             .hasSize(1)
@@ -481,7 +481,7 @@ public class GetMessagesMethodTest {
             .properties(ImmutableList.of("mailboxIds"))
             .build();
 
-        List<JmapResponse> result = testee.process(request, methodCallId, session).collect(Collectors.toList());
+        List<JmapResponse> result = testee.processToStream(request, methodCallId, session).collect(Collectors.toList());
 
         assertThat(result).hasSize(1);
         Method.Response response = result.get(0).getResponse();
@@ -518,7 +518,7 @@ public class GetMessagesMethodTest {
             .properties(ImmutableList.of("mailboxIds"))
             .build();
 
-        List<JmapResponse> result = testee.process(request, methodCallId, session).collect(Collectors.toList());
+        List<JmapResponse> result = testee.processToStream(request, methodCallId, session).collect(Collectors.toList());
 
         assertThat(result).hasSize(1);
         Method.Response response = result.get(0).getResponse();
@@ -553,7 +553,7 @@ public class GetMessagesMethodTest {
             .properties(ImmutableList.of("mailboxIds", "textBody"))
             .build();
 
-        List<JmapResponse> result = testee.process(request, methodCallId, session).collect(Collectors.toList());
+        List<JmapResponse> result = testee.processToStream(request, methodCallId, session).collect(Collectors.toList());
 
         assertThat(result).hasSize(1);
         Method.Response response = result.get(0).getResponse();
@@ -588,7 +588,7 @@ public class GetMessagesMethodTest {
             .properties(ImmutableList.of("mailboxIds", "to"))
             .build();
 
-        List<JmapResponse> result = testee.process(request, methodCallId, session).collect(Collectors.toList());
+        List<JmapResponse> result = testee.processToStream(request, methodCallId, session).collect(Collectors.toList());
 
         assertThat(result).hasSize(1);
         Method.Response response = result.get(0).getResponse();
@@ -624,7 +624,7 @@ public class GetMessagesMethodTest {
             .properties(ImmutableList.of("mailboxIds"))
             .build();
 
-        List<JmapResponse> responses = testee.process(request, methodCallId, session).collect(Guavate.toImmutableList());
+        List<JmapResponse> responses = testee.processToStream(request, methodCallId, session).collect(Guavate.toImmutableList());
 
         assertThat(responses).hasSize(1);
         Method.Response response = responses.get(0).getResponse();
@@ -661,7 +661,7 @@ public class GetMessagesMethodTest {
                 message3.getMessageId()))
             .build();
 
-        List<JmapResponse> result = testee.process(request, methodCallId, session).collect(Collectors.toList());
+        List<JmapResponse> result = testee.processToStream(request, methodCallId, session).collect(Collectors.toList());
 
         assertThat(result).hasSize(1)
             .extracting(JmapResponse::getResponse)
@@ -720,7 +720,7 @@ public class GetMessagesMethodTest {
                 message3.getMessageId()))
             .build();
 
-        List<JmapResponse> result = testee.process(request, methodCallId, session).collect(Collectors.toList());
+        List<JmapResponse> result = testee.processToStream(request, methodCallId, session).collect(Collectors.toList());
 
         assertThat(result).hasSize(1)
             .extracting(JmapResponse::getResponse)
@@ -762,7 +762,7 @@ public class GetMessagesMethodTest {
             .ids(ImmutableList.of(message1.getMessageId()))
             .build();
 
-        List<JmapResponse> result = testee.process(request, methodCallId, session).collect(Collectors.toList());
+        List<JmapResponse> result = testee.processToStream(request, methodCallId, session).collect(Collectors.toList());
 
         assertThat(result).hasSize(1)
             .extracting(JmapResponse::getResponse)
diff --git a/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/draft/methods/GetVacationResponseMethodTest.java b/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/draft/methods/GetVacationResponseMethodTest.java
index 9b41160..242bc8c 100644
--- a/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/draft/methods/GetVacationResponseMethodTest.java
+++ b/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/draft/methods/GetVacationResponseMethodTest.java
@@ -105,7 +105,7 @@ public class GetVacationResponseMethodTest {
 
         GetVacationRequest getVacationRequest = GetVacationRequest.builder().build();
 
-        Stream<JmapResponse> result = testee.process(getVacationRequest, methodCallId, mailboxSession);
+        Stream<JmapResponse> result = testee.processToStream(getVacationRequest, methodCallId, mailboxSession);
 
         JmapResponse expected = JmapResponse.builder()
             .methodCallId(methodCallId)
@@ -137,7 +137,7 @@ public class GetVacationResponseMethodTest {
 
         GetVacationRequest getVacationRequest = GetVacationRequest.builder().build();
 
-        Stream<JmapResponse> result = testee.process(getVacationRequest, methodCallId, mailboxSession);
+        Stream<JmapResponse> result = testee.processToStream(getVacationRequest, methodCallId, mailboxSession);
 
         JmapResponse expected = JmapResponse.builder()
             .methodCallId(methodCallId)
@@ -171,7 +171,7 @@ public class GetVacationResponseMethodTest {
 
         GetVacationRequest getVacationRequest = GetVacationRequest.builder().build();
 
-        Stream<JmapResponse> result = testee.process(getVacationRequest, methodCallId, mailboxSession);
+        Stream<JmapResponse> result = testee.processToStream(getVacationRequest, methodCallId, mailboxSession);
 
         JmapResponse expected = JmapResponse.builder()
             .methodCallId(methodCallId)
diff --git a/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/draft/methods/JmapResponseWriterImplTest.java b/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/draft/methods/JmapResponseWriterImplTest.java
index 6c496ed..67469eb 100644
--- a/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/draft/methods/JmapResponseWriterImplTest.java
+++ b/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/draft/methods/JmapResponseWriterImplTest.java
@@ -28,10 +28,10 @@ import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import org.apache.james.jmap.draft.json.ObjectMapperFactory;
-import org.apache.james.jmap.draft.model.MethodCallId;
-import org.apache.james.jmap.draft.model.Property;
 import org.apache.james.jmap.draft.model.InvocationRequest;
 import org.apache.james.jmap.draft.model.InvocationResponse;
+import org.apache.james.jmap.draft.model.MethodCallId;
+import org.apache.james.jmap.draft.model.Property;
 import org.apache.james.mailbox.inmemory.InMemoryId;
 import org.apache.james.mailbox.inmemory.InMemoryMessageId;
 import org.junit.Before;
@@ -46,6 +46,8 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 
+import reactor.core.publisher.Flux;
+
 public class JmapResponseWriterImplTest {
     private JmapResponseWriterImpl testee;
 
@@ -61,16 +63,17 @@ public class JmapResponseWriterImplTest {
         String expectedMethodCallId = "#1";
         String expectedId = "myId";
 
-        Stream<InvocationResponse> response = testee.formatMethodResponse(Stream.of(JmapResponse
-                .builder()
-                .methodCallId(MethodCallId.of(expectedMethodCallId))
-                .response(null)
-                .build()));
+        Stream<InvocationResponse> response = testee.formatMethodResponse(Flux.just(JmapResponse
+            .builder()
+            .methodCallId(MethodCallId.of(expectedMethodCallId))
+            .response(null)
+            .build()))
+            .toStream();
 
         List<InvocationResponse> responseList = response.collect(Collectors.toList());
         assertThat(responseList).hasSize(1)
-                .extracting(InvocationResponse::getResponseName, x -> x.getResults().get("id").asText(), InvocationResponse::getMethodCallId)
-                .containsExactly(tuple(expectedMethod, expectedId, expectedMethodCallId));
+            .extracting(InvocationResponse::getResponseName, x -> x.getResults().get("id").asText(), InvocationResponse::getMethodCallId)
+            .containsExactly(tuple(expectedMethod, expectedId, expectedMethodCallId));
     }
 
     @Test
@@ -82,24 +85,25 @@ public class JmapResponseWriterImplTest {
         responseClass.id = expectedId;
 
         List<InvocationResponse> response = testee.formatMethodResponse(
-                Stream.of(JmapResponse
+            Flux.just(JmapResponse
                 .builder()
                 .responseName(Method.Response.name("unknownMethod"))
                 .methodCallId(MethodCallId.of(expectedMethodCallId))
                 .response(responseClass)
                 .build()))
-                .collect(Collectors.toList());
+            .collectList()
+            .block();
 
         assertThat(response).hasSize(1)
-                .extracting(InvocationResponse::getResponseName, x -> x.getResults().get("id").asText(), InvocationResponse::getMethodCallId)
-                .containsExactly(tuple(Method.Response.name("unknownMethod"), expectedId, MethodCallId.of(expectedMethodCallId)));
+            .extracting(InvocationResponse::getResponseName, x -> x.getResults().get("id").asText(), InvocationResponse::getMethodCallId)
+            .containsExactly(tuple(Method.Response.name("unknownMethod"), expectedId, MethodCallId.of(expectedMethodCallId)));
     }
 
     private static class ResponseClass implements Method.Response {
 
         @SuppressWarnings("unused")
         public String id;
-        
+
     }
 
     @Test
@@ -109,14 +113,15 @@ public class JmapResponseWriterImplTest {
         Property property = () -> "id";
 
         List<InvocationResponse> response = testee.formatMethodResponse(
-                Stream.of(JmapResponse
+            Flux.just(JmapResponse
                 .builder()
                 .responseName(Method.Response.name("unknownMethod"))
                 .methodCallId(MethodCallId.of("#1"))
                 .properties(ImmutableSet.of(property))
                 .response(responseClass)
                 .build()))
-                .collect(Collectors.toList());
+            .collectList()
+            .block();
 
         assertThat(response).hasSize(1);
         JsonNode firstObject = Iterables.getOnlyElement(response).getResults().get("list").elements().next();
@@ -125,7 +130,6 @@ public class JmapResponseWriterImplTest {
     }
 
 
-
     @Test
     public void formatMethodResponseShouldNotFilterFieldsWhenSecondCallWithoutProperties() {
         ObjectResponseClass responseClass = new ObjectResponseClass();
@@ -134,22 +138,24 @@ public class JmapResponseWriterImplTest {
 
         @SuppressWarnings("unused")
         Stream<InvocationResponse> ignoredResponse = testee.formatMethodResponse(
-                Stream.of(JmapResponse
-                        .builder()
-                        .responseName(Method.Response.name("unknownMethod"))
-                        .methodCallId(MethodCallId.of("#1"))
-                        .properties(ImmutableSet.of(property))
-                        .response(responseClass)
-                        .build()));
+            Flux.just(JmapResponse
+                .builder()
+                .responseName(Method.Response.name("unknownMethod"))
+                .methodCallId(MethodCallId.of("#1"))
+                .properties(ImmutableSet.of(property))
+                .response(responseClass)
+                .build()))
+            .toStream();
 
         List<InvocationResponse> response = testee.formatMethodResponse(
-                Stream.of(JmapResponse
+            Flux.just(JmapResponse
                 .builder()
                 .responseName(Method.Response.name("unknownMethod"))
                 .methodCallId(MethodCallId.of("#1"))
                 .response(responseClass)
                 .build()))
-                .collect(Collectors.toList());
+            .collectList()
+            .block();
 
         assertThat(response).hasSize(1);
         JsonNode firstObject = Iterables.getOnlyElement(response).getResults().get("list").elements().next();
@@ -166,28 +172,29 @@ public class JmapResponseWriterImplTest {
         Property nameProperty = () -> "name";
 
         List<InvocationResponse> response = testee.formatMethodResponse(
-                Stream.of(JmapResponse
-                            .builder()
-                            .responseName(Method.Response.name("unknownMethod"))
-                            .methodCallId(MethodCallId.of("#1"))
-                            .properties(ImmutableSet.of(idProperty, nameProperty))
-                            .response(responseClass)
-                            .build(),
-                        JmapResponse
-                            .builder()
-                            .responseName(Method.Response.name("unknownMethod"))
-                            .methodCallId(MethodCallId.of("#1"))
-                            .properties(ImmutableSet.of(idProperty))
-                            .response(responseClass)
-                            .build()))
-                .collect(Collectors.toList());
+            Flux.just(JmapResponse
+                    .builder()
+                    .responseName(Method.Response.name("unknownMethod"))
+                    .methodCallId(MethodCallId.of("#1"))
+                    .properties(ImmutableSet.of(idProperty, nameProperty))
+                    .response(responseClass)
+                    .build(),
+                JmapResponse
+                    .builder()
+                    .responseName(Method.Response.name("unknownMethod"))
+                    .methodCallId(MethodCallId.of("#1"))
+                    .properties(ImmutableSet.of(idProperty))
+                    .response(responseClass)
+                    .build()))
+            .collectList()
+            .block();
 
         assertThat(response).hasSize(2)
-                .extracting(x -> x.getResults().get("list").elements().next())
-                .extracting(
-                        x -> x.get("id").asText(),
-                        x -> Optional.ofNullable(x.get("name")).map(JsonNode::asText).orElse(null))
-                .containsExactly(tuple("id", "name"), tuple("id", null));
+            .extracting(x -> x.getResults().get("list").elements().next())
+            .extracting(
+                x -> x.get("id").asText(),
+                x -> Optional.ofNullable(x.get("name")).map(JsonNode::asText).orElse(null))
+            .containsExactly(tuple("id", "name"), tuple("id", null));
     }
 
     @SuppressWarnings("unused")
@@ -196,13 +203,13 @@ public class JmapResponseWriterImplTest {
         private static class Foo {
             public String id;
             public String name;
-            
+
             public Foo(String id, String name) {
                 this.id = id;
                 this.name = name;
             }
         }
-        
+
         public List<Foo> list;
     }
 
@@ -212,21 +219,22 @@ public class JmapResponseWriterImplTest {
 
         ObjectNode parameters = new ObjectNode(new JsonNodeFactory(false));
         parameters.put("id", "myId");
-        JsonNode[] nodes = new JsonNode[] { new ObjectNode(new JsonNodeFactory(false)).textNode("unknwonMethod"),
-                parameters,
-                new ObjectNode(new JsonNodeFactory(false)).textNode(expectedMethodCallId)};
+        JsonNode[] nodes = new JsonNode[]{new ObjectNode(new JsonNodeFactory(false)).textNode("unknwonMethod"),
+            parameters,
+            new ObjectNode(new JsonNodeFactory(false)).textNode(expectedMethodCallId)};
 
         List<InvocationResponse> response = testee.formatMethodResponse(
-                Stream.of(JmapResponse
-                    .builder()
-                    .methodCallId(InvocationRequest.deserialize(nodes).getMethodCallId())
-                    .error()
-                    .build()))
-                .collect(Collectors.toList());
+            Flux.just(JmapResponse
+                .builder()
+                .methodCallId(InvocationRequest.deserialize(nodes).getMethodCallId())
+                .error()
+                .build()))
+            .collectList()
+            .block();
 
         assertThat(response).hasSize(1)
-                .extracting(InvocationResponse::getResponseName, x -> x.getResults().get("type").asText(), InvocationResponse::getMethodCallId)
-                .containsExactly(tuple(ErrorResponse.ERROR_METHOD, ErrorResponse.DEFAULT_ERROR_MESSAGE, MethodCallId.of(expectedMethodCallId)));
+            .extracting(InvocationResponse::getResponseName, x -> x.getResults().get("type").asText(), InvocationResponse::getMethodCallId)
+            .containsExactly(tuple(ErrorResponse.ERROR_METHOD, ErrorResponse.DEFAULT_ERROR_MESSAGE, MethodCallId.of(expectedMethodCallId)));
     }
 
     @Test
@@ -235,25 +243,26 @@ public class JmapResponseWriterImplTest {
 
         ObjectNode parameters = new ObjectNode(new JsonNodeFactory(false));
         parameters.put("id", "myId");
-        JsonNode[] nodes = new JsonNode[] { new ObjectNode(new JsonNodeFactory(false)).textNode("unknwonMethod"),
-                parameters,
-                new ObjectNode(new JsonNodeFactory(false)).textNode(expectedMethodCallId)};
+        JsonNode[] nodes = new JsonNode[]{new ObjectNode(new JsonNodeFactory(false)).textNode("unknwonMethod"),
+            parameters,
+            new ObjectNode(new JsonNodeFactory(false)).textNode(expectedMethodCallId)};
 
         List<InvocationResponse> response = testee.formatMethodResponse(
-                Stream.of(JmapResponse
+            Flux.just(JmapResponse
+                .builder()
+                .methodCallId(InvocationRequest.deserialize(nodes).getMethodCallId())
+                .error(ErrorResponse
                     .builder()
-                    .methodCallId(InvocationRequest.deserialize(nodes).getMethodCallId())
-                    .error(ErrorResponse
-                            .builder()
-                            .type("errorType")
-                            .description("complete description")
-                            .build())
-                    .build()))
-                .collect(Collectors.toList());
+                    .type("errorType")
+                    .description("complete description")
+                    .build())
+                .build()))
+            .collectList()
+            .block();
 
         assertThat(response).hasSize(1)
-                .extracting(InvocationResponse::getResponseName, x -> x.getResults().get("type").asText(), x -> x.getResults().get("description").asText(), InvocationResponse::getMethodCallId)
-                .containsExactly(tuple(ErrorResponse.ERROR_METHOD, "errorType", "complete description", MethodCallId.of(expectedMethodCallId)));
+            .extracting(InvocationResponse::getResponseName, x -> x.getResults().get("type").asText(), x -> x.getResults().get("description").asText(), InvocationResponse::getMethodCallId)
+            .containsExactly(tuple(ErrorResponse.ERROR_METHOD, "errorType", "complete description", MethodCallId.of(expectedMethodCallId)));
     }
 
 }
diff --git a/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/draft/methods/RequestHandlerTest.java b/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/draft/methods/RequestHandlerTest.java
index 0eed4d8..2934f62 100644
--- a/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/draft/methods/RequestHandlerTest.java
+++ b/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/draft/methods/RequestHandlerTest.java
@@ -106,7 +106,7 @@ public class RequestHandlerTest {
         }
 
         @Override
-        public Stream<JmapResponse> process(JmapRequest request, MethodCallId methodCallId, MailboxSession mailboxSession) {
+        public Stream<JmapResponse> processToStream(JmapRequest request, MethodCallId methodCallId, MailboxSession mailboxSession) {
             Preconditions.checkArgument(request instanceof TestJmapRequest);
             TestJmapRequest typedRequest = (TestJmapRequest) request;
             return Stream.of(
@@ -193,7 +193,7 @@ public class RequestHandlerTest {
         }
         
         @Override
-        public Stream<JmapResponse> process(JmapRequest request, MethodCallId methodCallId, MailboxSession mailboxSession) {
+        public Stream<JmapResponse> processToStream(JmapRequest request, MethodCallId methodCallId, MailboxSession mailboxSession) {
             return null;
         }
     }
@@ -209,7 +209,8 @@ public class RequestHandlerTest {
                 new ObjectNode(new JsonNodeFactory(false)).textNode("#1")};
 
         List<InvocationResponse> responses = testee.handle(AuthenticatedRequest.decorate(InvocationRequest.deserialize(nodes), session))
-                .collect(Collectors.toList());
+                .collectList()
+                .block();
 
         assertThat(responses).hasSize(1)
                 .extracting(
diff --git a/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/draft/methods/SetMailboxesMethodTest.java b/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/draft/methods/SetMailboxesMethodTest.java
index 1ba5182..cb8be95 100644
--- a/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/draft/methods/SetMailboxesMethodTest.java
+++ b/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/draft/methods/SetMailboxesMethodTest.java
@@ -61,7 +61,7 @@ public class SetMailboxesMethodTest {
     public void processShouldThrowWhenNullJmapRequest() {
         MailboxSession session = mock(MailboxSession.class);
         JmapRequest nullJmapRequest = null;
-        assertThatThrownBy(() -> new SetMailboxesMethod(NO_PROCESSOR, TIME_METRIC_FACTORY).process(nullJmapRequest, MethodCallId.of("methodCallId"), session))
+        assertThatThrownBy(() -> new SetMailboxesMethod(NO_PROCESSOR, TIME_METRIC_FACTORY).processToStream(nullJmapRequest, MethodCallId.of("methodCallId"), session))
             .isInstanceOf(NullPointerException.class);
     }
 
@@ -70,7 +70,7 @@ public class SetMailboxesMethodTest {
         MailboxSession session = mock(MailboxSession.class);
         JmapRequest jmapRequest = mock(JmapRequest.class);
         MethodCallId nullMethodCallId = null;
-        assertThatThrownBy(() -> new SetMailboxesMethod(NO_PROCESSOR, TIME_METRIC_FACTORY).process(jmapRequest, nullMethodCallId, session))
+        assertThatThrownBy(() -> new SetMailboxesMethod(NO_PROCESSOR, TIME_METRIC_FACTORY).processToStream(jmapRequest, nullMethodCallId, session))
             .isInstanceOf(NullPointerException.class);
     }
 
@@ -78,7 +78,7 @@ public class SetMailboxesMethodTest {
     public void processShouldThrowWhenNullMailboxSession() {
         MailboxSession nullMailboxSession = null;
         JmapRequest jmapRequest = mock(JmapRequest.class);
-        assertThatThrownBy(() -> new SetMailboxesMethod(NO_PROCESSOR, TIME_METRIC_FACTORY).process(jmapRequest, MethodCallId.of("methodCallId"), nullMailboxSession))
+        assertThatThrownBy(() -> new SetMailboxesMethod(NO_PROCESSOR, TIME_METRIC_FACTORY).processToStream(jmapRequest, MethodCallId.of("methodCallId"), nullMailboxSession))
             .isInstanceOf(NullPointerException.class);
     }
 
@@ -86,7 +86,7 @@ public class SetMailboxesMethodTest {
     public void processShouldThrowWhenJmapRequestTypeMismatch() {
         MailboxSession session = mock(MailboxSession.class);
         JmapRequest getMailboxesRequest = GetMailboxesRequest.builder().build();
-        assertThatThrownBy(() -> new SetMailboxesMethod(NO_PROCESSOR, TIME_METRIC_FACTORY).process(getMailboxesRequest, MethodCallId.of("methodCallId"), session))
+        assertThatThrownBy(() -> new SetMailboxesMethod(NO_PROCESSOR, TIME_METRIC_FACTORY).processToStream(getMailboxesRequest, MethodCallId.of("methodCallId"), session))
             .isInstanceOf(IllegalArgumentException.class);
     }
 
@@ -110,7 +110,7 @@ public class SetMailboxesMethodTest {
 
         Stream<JmapResponse> actual =
             new SetMailboxesMethod(ImmutableSet.of(creatorProcessor), TIME_METRIC_FACTORY)
-                    .process(creationRequest, MethodCallId.of("methodCallId"), session);
+                    .processToStream(creationRequest, MethodCallId.of("methodCallId"), session);
 
         assertThat(actual).contains(jmapResponse);
     }
@@ -133,7 +133,7 @@ public class SetMailboxesMethodTest {
 
         Stream<JmapResponse> actual =
             new SetMailboxesMethod(ImmutableSet.of(destructorProcessor), TIME_METRIC_FACTORY)
-                    .process(destructionRequest, MethodCallId.of("methodCallId"), session);
+                    .processToStream(destructionRequest, MethodCallId.of("methodCallId"), session);
 
         assertThat(actual).contains(jmapResponse);
     }
diff --git a/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/draft/methods/SetVacationResponseMethodTest.java b/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/draft/methods/SetVacationResponseMethodTest.java
index c45f3f4..50695c3 100644
--- a/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/draft/methods/SetVacationResponseMethodTest.java
+++ b/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/draft/methods/SetVacationResponseMethodTest.java
@@ -73,22 +73,22 @@ public class SetVacationResponseMethodTest {
 
     @Test(expected = NullPointerException.class)
     public void processShouldThrowOnNullRequest() {
-        testee.process(null, mock(MethodCallId.class), mock(MailboxSession.class));
+        testee.processToStream(null, mock(MethodCallId.class), mock(MailboxSession.class));
     }
 
     @Test(expected = NullPointerException.class)
     public void processShouldThrowOnNullMethodCallId() {
-        testee.process(mock(SetMailboxesRequest.class), null, mock(MailboxSession.class));
+        testee.processToStream(mock(SetMailboxesRequest.class), null, mock(MailboxSession.class));
     }
 
     @Test(expected = NullPointerException.class)
     public void processShouldThrowOnNullMailboxSession() {
-        testee.process(mock(SetMailboxesRequest.class), mock(MethodCallId.class), null);
+        testee.processToStream(mock(SetMailboxesRequest.class), mock(MethodCallId.class), null);
     }
 
     @Test(expected = IllegalArgumentException.class)
     public void processShouldThrowOnWrongRequestType() {
-        testee.process(mock(GetMailboxesRequest.class), mock(MethodCallId.class), mock(MailboxSession.class));
+        testee.processToStream(mock(GetMailboxesRequest.class), mock(MethodCallId.class), mock(MailboxSession.class));
     }
 
     @Test
@@ -97,7 +97,7 @@ public class SetVacationResponseMethodTest {
             .update(ImmutableMap.of())
             .build();
 
-        Stream<JmapResponse> result = testee.process(setVacationRequest, methodCallId, mock(MailboxSession.class));
+        Stream<JmapResponse> result = testee.processToStream(setVacationRequest, methodCallId, mock(MailboxSession.class));
 
         JmapResponse expected = JmapResponse.builder()
             .methodCallId(methodCallId)
@@ -120,7 +120,7 @@ public class SetVacationResponseMethodTest {
                 .build()))
             .build();
 
-        Stream<JmapResponse> result = testee.process(setVacationRequest, methodCallId, mock(MailboxSession.class));
+        Stream<JmapResponse> result = testee.processToStream(setVacationRequest, methodCallId, mock(MailboxSession.class));
 
         JmapResponse expected = JmapResponse.builder()
             .methodCallId(methodCallId)
@@ -148,7 +148,7 @@ public class SetVacationResponseMethodTest {
                     .build()))
             .build();
 
-        Stream<JmapResponse> result = testee.process(setVacationRequest, methodCallId, mock(MailboxSession.class));
+        Stream<JmapResponse> result = testee.processToStream(setVacationRequest, methodCallId, mock(MailboxSession.class));
 
         JmapResponse expected = JmapResponse.builder()
             .methodCallId(methodCallId)
@@ -178,7 +178,7 @@ public class SetVacationResponseMethodTest {
         when(notificationRegistry.flush(any()))
             .thenReturn(Mono.empty());
 
-        Stream<JmapResponse> result = testee.process(setVacationRequest, methodCallId, mailboxSession);
+        Stream<JmapResponse> result = testee.processToStream(setVacationRequest, methodCallId, mailboxSession);
 
         JmapResponse expected = JmapResponse.builder()
             .methodCallId(methodCallId)
@@ -205,7 +205,7 @@ public class SetVacationResponseMethodTest {
             .build();
         when(mailboxSession.getUser()).thenReturn(USERNAME);
 
-        Stream<JmapResponse> result = testee.process(setVacationRequest, methodCallId, mailboxSession);
+        Stream<JmapResponse> result = testee.processToStream(setVacationRequest, methodCallId, mailboxSession);
 
         JmapResponse expected = JmapResponse.builder()
             .methodCallId(methodCallId)
diff --git a/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/http/JMAPApiRoutesTest.java b/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/http/JMAPApiRoutesTest.java
index 2a31456..e8cafca 100644
--- a/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/http/JMAPApiRoutesTest.java
+++ b/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/http/JMAPApiRoutesTest.java
@@ -48,6 +48,7 @@ import com.fasterxml.jackson.databind.node.ObjectNode;
 import io.restassured.RestAssured;
 import io.restassured.builder.RequestSpecBuilder;
 import io.restassured.http.ContentType;
+import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.netty.DisposableServer;
 import reactor.netty.http.server.HttpServer;
@@ -115,7 +116,7 @@ public class JMAPApiRoutesTest {
         json.put("type", "invalidArgument");
 
         when(requestHandler.handle(any()))
-            .thenReturn(Stream.of(new InvocationResponse(ErrorResponse.ERROR_METHOD, json, MethodCallId.of("#0"))));
+            .thenReturn(Flux.just(new InvocationResponse(ErrorResponse.ERROR_METHOD, json, MethodCallId.of("#0"))));
 
         given()
             .body("[[\"getAccounts\", {\"state\":false}, \"#0\"]]")
@@ -137,7 +138,7 @@ public class JMAPApiRoutesTest {
         arrayNode.add(list);
 
         when(requestHandler.handle(any()))
-            .thenReturn(Stream.of(new InvocationResponse(Method.Response.name("accounts"), json, MethodCallId.of("#0"))));
+            .thenReturn(Flux.just(new InvocationResponse(Method.Response.name("accounts"), json, MethodCallId.of("#0"))));
 
         given()
             .body("[[\"getAccounts\", {}, \"#0\"]]")


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org