You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by ro...@apache.org on 2019/10/09 08:28:50 UTC
[james-project] branch master updated: JAMES-2813 make TerminationSubscriberContract less time-sensitive
This is an automated email from the ASF dual-hosted git repository.
rouazana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
The following commit(s) were added to refs/heads/master by this push:
new 03df179 JAMES-2813 make TerminationSubscriberContract less time-sensitive
new 6eb6796 Merge remote-tracking branch 'mbaechler/fix-termination-subscriber-contract'
03df179 is described below
commit 03df179112e4deef2ec716517383a5f04f7fadb8
Author: Matthieu Baechler <ma...@apache.org>
AuthorDate: Mon Oct 7 17:18:57 2019 +0200
JAMES-2813 make TerminationSubscriberContract less time-sensitive
---
.../RabbitMQTerminationSubscriberTest.java | 28 +++++++++++--------
.../task/eventsourcing/TerminationSubscriber.scala | 2 +-
.../TerminationSubscriberContract.java | 32 ++++++++++++++--------
3 files changed, 38 insertions(+), 24 deletions(-)
diff --git a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriberTest.java b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriberTest.java
index a6a9ab9..ce0991d 100644
--- a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriberTest.java
+++ b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriberTest.java
@@ -21,7 +21,9 @@
package org.apache.james.task.eventsourcing.distributed;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.awaitility.Duration.ONE_MINUTE;
+import java.util.ArrayList;
import java.util.List;
import java.util.Set;
@@ -32,12 +34,12 @@ import org.apache.james.eventsourcing.eventstore.cassandra.dto.EventDTOModule;
import org.apache.james.server.task.json.JsonTaskSerializer;
import org.apache.james.task.eventsourcing.TerminationSubscriber;
import org.apache.james.task.eventsourcing.TerminationSubscriberContract;
-
-import com.github.steveash.guavate.Guavate;
+import org.awaitility.Awaitility;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
+
+import com.github.steveash.guavate.Guavate;
import reactor.core.publisher.Flux;
-import reactor.core.scheduler.Schedulers;
class RabbitMQTerminationSubscriberTest implements TerminationSubscriberContract {
private static final JsonTaskSerializer TASK_SERIALIZER = new JsonTaskSerializer();
@@ -59,15 +61,19 @@ class RabbitMQTerminationSubscriberTest implements TerminationSubscriberContract
TerminationSubscriber subscriber1 = subscriber();
TerminationSubscriber subscriber2 = subscriber();
+ Flux<Event> firstListener = Flux.from(subscriber1.listenEvents());
+ Flux<Event> secondListener = Flux.from(subscriber2.listenEvents());
+
sendEvents(subscriber1, COMPLETED_EVENT);
- List<List<Event>> listenedEvents = Flux.just(subscriber1, subscriber2)
- .subscribeOn(Schedulers.boundedElastic())
- .flatMap(this::collectEvents)
- .collectList()
- .block();
- assertThat(listenedEvents).hasSize(2);
- assertThat(listenedEvents.get(0)).containsExactly(COMPLETED_EVENT);
- assertThat(listenedEvents.get(1)).containsExactly(COMPLETED_EVENT);
+ List<Event> receivedEventsFirst = new ArrayList<>();
+ firstListener.subscribe(receivedEventsFirst::add);
+ List<Event> receivedEventsSecond = new ArrayList<>();
+ secondListener.subscribe(receivedEventsSecond::add);
+
+ Awaitility.await().atMost(ONE_MINUTE).until(() -> receivedEventsFirst.size() == 1 && receivedEventsSecond.size() == 1);
+
+ assertThat(receivedEventsFirst).containsExactly(COMPLETED_EVENT);
+ assertThat(receivedEventsSecond).containsExactly(COMPLETED_EVENT);
}
}
\ No newline at end of file
diff --git a/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/TerminationSubscriber.scala b/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/TerminationSubscriber.scala
index af23af4..b0c705b 100644
--- a/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/TerminationSubscriber.scala
+++ b/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/TerminationSubscriber.scala
@@ -36,7 +36,7 @@ trait TerminationSubscriber extends Subscriber {
}
class MemoryTerminationSubscriber extends TerminationSubscriber {
- private val events = DirectProcessor.create[Event]()
+ private val events: DirectProcessor[Event] = DirectProcessor.create[Event]()
override def addEvent(event: Event): Unit =
events.onNext(event)
diff --git a/server/task/task-memory/src/test/java/org/apache/james/task/eventsourcing/TerminationSubscriberContract.java b/server/task/task-memory/src/test/java/org/apache/james/task/eventsourcing/TerminationSubscriberContract.java
index 705b291..af7fda8 100644
--- a/server/task/task-memory/src/test/java/org/apache/james/task/eventsourcing/TerminationSubscriberContract.java
+++ b/server/task/task-memory/src/test/java/org/apache/james/task/eventsourcing/TerminationSubscriberContract.java
@@ -21,8 +21,10 @@
package org.apache.james.task.eventsourcing;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.awaitility.Duration.ONE_MINUTE;
import java.time.Duration;
+import java.util.ArrayList;
import java.util.List;
import org.apache.james.eventsourcing.Event;
@@ -31,7 +33,9 @@ import org.apache.james.task.Hostname;
import org.apache.james.task.Task;
import org.apache.james.task.TaskId;
import org.assertj.core.api.ListAssert;
+import org.awaitility.Awaitility;
import org.junit.jupiter.api.Test;
+import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@@ -97,16 +101,20 @@ public interface TerminationSubscriberContract {
default void multipleListeningEventsShouldShareEvents() {
TerminationSubscriber subscriber = subscriber();
+ Flux<Event> firstListener = Flux.from(subscriber.listenEvents());
+ Flux<Event> secondListener = Flux.from(subscriber.listenEvents());
+
sendEvents(subscriber, COMPLETED_EVENT, FAILED_EVENT, CANCELLED_EVENT);
- List<List<Event>> listenedEvents = Flux.range(0, 2)
- .subscribeOn(Schedulers.boundedElastic())
- .flatMap(ignored -> collectEvents(subscriber))
- .collectList()
- .block();
- assertThat(listenedEvents).hasSize(2);
- assertThat(listenedEvents.get(0)).containsExactly(COMPLETED_EVENT, FAILED_EVENT, CANCELLED_EVENT);
- assertThat(listenedEvents.get(1)).isEqualTo(listenedEvents.get(0));
+ List<Event> receivedEventsFirst = new ArrayList<>();
+ firstListener.subscribe(receivedEventsFirst::add);
+ List<Event> receivedEventsSecond = new ArrayList<>();
+ secondListener.subscribe(receivedEventsSecond::add);
+
+ Awaitility.await().atMost(ONE_MINUTE).until(() -> receivedEventsFirst.size() == 3 && receivedEventsSecond.size() == 3);
+
+ assertThat(receivedEventsFirst).containsExactly(COMPLETED_EVENT, FAILED_EVENT, CANCELLED_EVENT);
+ assertThat(receivedEventsSecond).containsExactly(COMPLETED_EVENT, FAILED_EVENT, CANCELLED_EVENT);
}
@Test
@@ -116,19 +124,19 @@ public interface TerminationSubscriberContract {
sendEvents(subscriber, COMPLETED_EVENT, FAILED_EVENT, CANCELLED_EVENT);
List<Event> listenedEvents = Mono.delay(DELAY_BEFORE_PUBLISHING.plus(DELAY_BETWEEN_EVENTS.multipliedBy(3).dividedBy(2)))
- .then(Mono.defer(() -> collectEvents(subscriber)))
+ .then(Mono.defer(() -> collectEvents(subscriber.listenEvents())))
.subscribeOn(Schedulers.boundedElastic())
.block();
assertThat(listenedEvents).containsExactly(FAILED_EVENT, CANCELLED_EVENT);
}
default ListAssert<Event> assertEvents(TerminationSubscriber subscriber) {
- return assertThat(collectEvents(subscriber)
+ return assertThat(collectEvents(subscriber.listenEvents())
.block());
}
- default Mono<List<Event>> collectEvents(TerminationSubscriber subscriber) {
- return Flux.from(subscriber.listenEvents())
+ default Mono<List<Event>> collectEvents(Publisher<Event> listener) {
+ return Flux.from(listener)
.subscribeOn(Schedulers.boundedElastic())
.take(DELAY_BEFORE_PUBLISHING.plus(DELAY_BETWEEN_EVENTS.multipliedBy(7)))
.collectList();
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org