You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by ro...@apache.org on 2019/10/09 08:28:50 UTC

[james-project] branch master updated: JAMES-2813 make TerminationSubscriberContract less time-sensitive

This is an automated email from the ASF dual-hosted git repository.

rouazana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git


The following commit(s) were added to refs/heads/master by this push:
     new 03df179  JAMES-2813 make TerminationSubscriberContract less time-sensitive
     new 6eb6796  Merge remote-tracking branch 'mbaechler/fix-termination-subscriber-contract'
03df179 is described below

commit 03df179112e4deef2ec716517383a5f04f7fadb8
Author: Matthieu Baechler <ma...@apache.org>
AuthorDate: Mon Oct 7 17:18:57 2019 +0200

    JAMES-2813 make TerminationSubscriberContract less time-sensitive
---
 .../RabbitMQTerminationSubscriberTest.java         | 28 +++++++++++--------
 .../task/eventsourcing/TerminationSubscriber.scala |  2 +-
 .../TerminationSubscriberContract.java             | 32 ++++++++++++++--------
 3 files changed, 38 insertions(+), 24 deletions(-)

diff --git a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriberTest.java b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriberTest.java
index a6a9ab9..ce0991d 100644
--- a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriberTest.java
+++ b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriberTest.java
@@ -21,7 +21,9 @@
 package org.apache.james.task.eventsourcing.distributed;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.awaitility.Duration.ONE_MINUTE;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Set;
 
@@ -32,12 +34,12 @@ import org.apache.james.eventsourcing.eventstore.cassandra.dto.EventDTOModule;
 import org.apache.james.server.task.json.JsonTaskSerializer;
 import org.apache.james.task.eventsourcing.TerminationSubscriber;
 import org.apache.james.task.eventsourcing.TerminationSubscriberContract;
-
-import com.github.steveash.guavate.Guavate;
+import org.awaitility.Awaitility;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
+
+import com.github.steveash.guavate.Guavate;
 import reactor.core.publisher.Flux;
-import reactor.core.scheduler.Schedulers;
 
 class RabbitMQTerminationSubscriberTest implements TerminationSubscriberContract {
     private static final JsonTaskSerializer TASK_SERIALIZER = new JsonTaskSerializer();
@@ -59,15 +61,19 @@ class RabbitMQTerminationSubscriberTest implements TerminationSubscriberContract
         TerminationSubscriber subscriber1 = subscriber();
         TerminationSubscriber subscriber2 = subscriber();
 
+        Flux<Event> firstListener = Flux.from(subscriber1.listenEvents());
+        Flux<Event> secondListener = Flux.from(subscriber2.listenEvents());
+
         sendEvents(subscriber1, COMPLETED_EVENT);
 
-        List<List<Event>> listenedEvents = Flux.just(subscriber1, subscriber2)
-            .subscribeOn(Schedulers.boundedElastic())
-            .flatMap(this::collectEvents)
-            .collectList()
-            .block();
-        assertThat(listenedEvents).hasSize(2);
-        assertThat(listenedEvents.get(0)).containsExactly(COMPLETED_EVENT);
-        assertThat(listenedEvents.get(1)).containsExactly(COMPLETED_EVENT);
+        List<Event> receivedEventsFirst = new ArrayList<>();
+        firstListener.subscribe(receivedEventsFirst::add);
+        List<Event> receivedEventsSecond = new ArrayList<>();
+        secondListener.subscribe(receivedEventsSecond::add);
+
+        Awaitility.await().atMost(ONE_MINUTE).until(() -> receivedEventsFirst.size() == 1 && receivedEventsSecond.size() == 1);
+
+        assertThat(receivedEventsFirst).containsExactly(COMPLETED_EVENT);
+        assertThat(receivedEventsSecond).containsExactly(COMPLETED_EVENT);
     }
 }
\ No newline at end of file
diff --git a/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/TerminationSubscriber.scala b/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/TerminationSubscriber.scala
index af23af4..b0c705b 100644
--- a/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/TerminationSubscriber.scala
+++ b/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/TerminationSubscriber.scala
@@ -36,7 +36,7 @@ trait TerminationSubscriber extends Subscriber {
 }
 
 class MemoryTerminationSubscriber extends TerminationSubscriber {
-  private val events = DirectProcessor.create[Event]()
+  private val events: DirectProcessor[Event] = DirectProcessor.create[Event]()
 
   override def addEvent(event: Event): Unit =
     events.onNext(event)
diff --git a/server/task/task-memory/src/test/java/org/apache/james/task/eventsourcing/TerminationSubscriberContract.java b/server/task/task-memory/src/test/java/org/apache/james/task/eventsourcing/TerminationSubscriberContract.java
index 705b291..af7fda8 100644
--- a/server/task/task-memory/src/test/java/org/apache/james/task/eventsourcing/TerminationSubscriberContract.java
+++ b/server/task/task-memory/src/test/java/org/apache/james/task/eventsourcing/TerminationSubscriberContract.java
@@ -21,8 +21,10 @@
 package org.apache.james.task.eventsourcing;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.awaitility.Duration.ONE_MINUTE;
 
 import java.time.Duration;
+import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.james.eventsourcing.Event;
@@ -31,7 +33,9 @@ import org.apache.james.task.Hostname;
 import org.apache.james.task.Task;
 import org.apache.james.task.TaskId;
 import org.assertj.core.api.ListAssert;
+import org.awaitility.Awaitility;
 import org.junit.jupiter.api.Test;
+import org.reactivestreams.Publisher;
 
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
@@ -97,16 +101,20 @@ public interface TerminationSubscriberContract {
     default void multipleListeningEventsShouldShareEvents() {
         TerminationSubscriber subscriber = subscriber();
 
+        Flux<Event> firstListener = Flux.from(subscriber.listenEvents());
+        Flux<Event> secondListener = Flux.from(subscriber.listenEvents());
+
         sendEvents(subscriber, COMPLETED_EVENT, FAILED_EVENT, CANCELLED_EVENT);
 
-        List<List<Event>> listenedEvents = Flux.range(0, 2)
-            .subscribeOn(Schedulers.boundedElastic())
-            .flatMap(ignored -> collectEvents(subscriber))
-            .collectList()
-            .block();
-        assertThat(listenedEvents).hasSize(2);
-        assertThat(listenedEvents.get(0)).containsExactly(COMPLETED_EVENT, FAILED_EVENT, CANCELLED_EVENT);
-        assertThat(listenedEvents.get(1)).isEqualTo(listenedEvents.get(0));
+        List<Event> receivedEventsFirst = new ArrayList<>();
+        firstListener.subscribe(receivedEventsFirst::add);
+        List<Event> receivedEventsSecond = new ArrayList<>();
+        secondListener.subscribe(receivedEventsSecond::add);
+
+        Awaitility.await().atMost(ONE_MINUTE).until(() -> receivedEventsFirst.size() == 3 && receivedEventsSecond.size() == 3);
+
+        assertThat(receivedEventsFirst).containsExactly(COMPLETED_EVENT, FAILED_EVENT, CANCELLED_EVENT);
+        assertThat(receivedEventsSecond).containsExactly(COMPLETED_EVENT, FAILED_EVENT, CANCELLED_EVENT);
     }
 
     @Test
@@ -116,19 +124,19 @@ public interface TerminationSubscriberContract {
         sendEvents(subscriber, COMPLETED_EVENT, FAILED_EVENT, CANCELLED_EVENT);
 
         List<Event> listenedEvents = Mono.delay(DELAY_BEFORE_PUBLISHING.plus(DELAY_BETWEEN_EVENTS.multipliedBy(3).dividedBy(2)))
-            .then(Mono.defer(() -> collectEvents(subscriber)))
+            .then(Mono.defer(() -> collectEvents(subscriber.listenEvents())))
             .subscribeOn(Schedulers.boundedElastic())
             .block();
         assertThat(listenedEvents).containsExactly(FAILED_EVENT, CANCELLED_EVENT);
     }
 
     default ListAssert<Event> assertEvents(TerminationSubscriber subscriber) {
-        return assertThat(collectEvents(subscriber)
+        return assertThat(collectEvents(subscriber.listenEvents())
             .block());
     }
 
-    default Mono<List<Event>> collectEvents(TerminationSubscriber subscriber) {
-        return Flux.from(subscriber.listenEvents())
+    default Mono<List<Event>> collectEvents(Publisher<Event> listener) {
+        return Flux.from(listener)
             .subscribeOn(Schedulers.boundedElastic())
             .take(DELAY_BEFORE_PUBLISHING.plus(DELAY_BETWEEN_EVENTS.multipliedBy(7)))
             .collectList();


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org