You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2019/01/11 09:58:15 UTC
[2/8] james-project git commit: MAILBOX-368 RabbitMQEventBus group registration
MAILBOX-368 RabbitMQEventBus group registration
Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/ab2fa642
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/ab2fa642
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/ab2fa642
Branch: refs/heads/master
Commit: ab2fa64264a67c702575a1c50fddeecfd9080027
Parents: 16ac299
Author: datph <dp...@linagora.com>
Authored: Tue Jan 8 14:21:15 2019 +0700
Committer: Benoit Tellier <bt...@linagora.com>
Committed: Fri Jan 11 16:57:08 2019 +0700
----------------------------------------------------------------------
mailbox/event/event-rabbitmq/pom.xml | 5 +
.../james/mailbox/events/EventDispatcher.java | 69 +++++++++
.../james/mailbox/events/GroupRegistration.java | 152 +++++++++++++++++++
.../events/GroupRegistrationHandler.java | 70 +++++++++
.../james/mailbox/events/RabbitMQEventBus.java | 54 +++----
.../events/RabbitMQEventBusPublishingTest.java | 121 ---------------
.../mailbox/events/RabbitMQEventBusTest.java | 148 ++++++++++++++++++
7 files changed, 467 insertions(+), 152 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/james-project/blob/ab2fa642/mailbox/event/event-rabbitmq/pom.xml
----------------------------------------------------------------------
diff --git a/mailbox/event/event-rabbitmq/pom.xml b/mailbox/event/event-rabbitmq/pom.xml
index c671468..0085059 100644
--- a/mailbox/event/event-rabbitmq/pom.xml
+++ b/mailbox/event/event-rabbitmq/pom.xml
@@ -91,6 +91,11 @@
<scope>test</scope>
</dependency>
<dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-core</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers</artifactId>
<scope>test</scope>
http://git-wip-us.apache.org/repos/asf/james-project/blob/ab2fa642/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/EventDispatcher.java
----------------------------------------------------------------------
diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/EventDispatcher.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/EventDispatcher.java
new file mode 100644
index 0000000..b01fc53
--- /dev/null
+++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/EventDispatcher.java
@@ -0,0 +1,69 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.mailbox.events;
+
+import static org.apache.james.backend.rabbitmq.Constants.DIRECT_EXCHANGE;
+import static org.apache.james.backend.rabbitmq.Constants.DURABLE;
+import static org.apache.james.backend.rabbitmq.Constants.EMPTY_ROUTING_KEY;
+import static org.apache.james.mailbox.events.RabbitMQEventBus.MAILBOX_EVENT_EXCHANGE_NAME;
+
+import java.nio.charset.StandardCharsets;
+
+import org.apache.james.event.json.EventSerializer;
+import org.apache.james.mailbox.Event;
+
+import reactor.core.publisher.Mono;
+import reactor.core.publisher.MonoProcessor;
+import reactor.core.scheduler.Schedulers;
+import reactor.rabbitmq.ExchangeSpecification;
+import reactor.rabbitmq.OutboundMessage;
+import reactor.rabbitmq.Sender;
+
+public class EventDispatcher {
+ private final EventSerializer eventSerializer;
+ private final Sender sender;
+
+ EventDispatcher(EventSerializer eventSerializer, Sender sender) {
+ this.eventSerializer = eventSerializer;
+ this.sender = sender;
+ }
+
+ void start() {
+ sender.declareExchange(ExchangeSpecification.exchange(MAILBOX_EVENT_EXCHANGE_NAME)
+ .durable(DURABLE)
+ .type(DIRECT_EXCHANGE))
+ .block();
+ }
+
+ Mono<Void> dispatch(Event event) {
+ Mono<OutboundMessage> outboundMessage = Mono.just(event)
+ .publishOn(Schedulers.parallel())
+ .map(this::serializeEvent)
+ .map(payload -> new OutboundMessage(MAILBOX_EVENT_EXCHANGE_NAME, EMPTY_ROUTING_KEY, payload));
+
+ return sender.send(outboundMessage)
+ .subscribeWith(MonoProcessor.create());
+ }
+
+ private byte[] serializeEvent(Event event) {
+ return eventSerializer.toJson(event).getBytes(StandardCharsets.UTF_8);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/james-project/blob/ab2fa642/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java
----------------------------------------------------------------------
diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java
new file mode 100644
index 0000000..36db6ae
--- /dev/null
+++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java
@@ -0,0 +1,152 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.mailbox.events;
+
+import static org.apache.james.backend.rabbitmq.Constants.AUTO_DELETE;
+import static org.apache.james.backend.rabbitmq.Constants.DURABLE;
+import static org.apache.james.backend.rabbitmq.Constants.EMPTY_ROUTING_KEY;
+import static org.apache.james.backend.rabbitmq.Constants.EXCLUSIVE;
+import static org.apache.james.backend.rabbitmq.Constants.NO_ARGUMENTS;
+import static org.apache.james.mailbox.events.RabbitMQEventBus.MAILBOX_EVENT;
+import static org.apache.james.mailbox.events.RabbitMQEventBus.MAILBOX_EVENT_EXCHANGE_NAME;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Objects;
+import java.util.Optional;
+
+import org.apache.james.event.json.EventSerializer;
+import org.apache.james.mailbox.Event;
+import org.apache.james.mailbox.MailboxListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.Delivery;
+
+import play.api.libs.json.JsResult;
+import reactor.core.Disposable;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
+import reactor.rabbitmq.BindingSpecification;
+import reactor.rabbitmq.QueueSpecification;
+import reactor.rabbitmq.RabbitFlux;
+import reactor.rabbitmq.Receiver;
+import reactor.rabbitmq.ReceiverOptions;
+import reactor.rabbitmq.Sender;
+
+class GroupRegistration implements Registration {
+ static class WorkQueueName {
+ @VisibleForTesting
+ static WorkQueueName of(Class<? extends Group> clazz) {
+ return new WorkQueueName(clazz.getName());
+ }
+
+ static WorkQueueName of(Group group) {
+ return of(group.getClass());
+ }
+
+ static final String MAILBOX_EVENT_WORK_QUEUE_PREFIX = MAILBOX_EVENT + "-workQueue-";
+
+ private final String name;
+
+ private WorkQueueName(String name) {
+ Preconditions.checkArgument(!Strings.isNullOrEmpty(name), "Queue name must be specified");
+ this.name = name;
+ }
+
+ String asString() {
+ return MAILBOX_EVENT_WORK_QUEUE_PREFIX + name;
+ }
+ }
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(GroupRegistration.class);
+
+ private final MailboxListener mailboxListener;
+ private final WorkQueueName queueName;
+ private final Receiver receiver;
+ private final Runnable unregisterGroup;
+ private final Sender sender;
+ private final EventSerializer eventSerializer;
+ private Optional<Disposable> receiverSubscriber;
+
+ GroupRegistration(Mono<Connection> connectionSupplier, Sender sender, EventSerializer eventSerializer,
+ MailboxListener mailboxListener, Group group, Runnable unregisterGroup) {
+ this.eventSerializer = eventSerializer;
+ this.mailboxListener = mailboxListener;
+ this.queueName = WorkQueueName.of(group);
+ this.sender = sender;
+ this.receiver = RabbitFlux.createReceiver(new ReceiverOptions().connectionMono(connectionSupplier));
+ this.receiverSubscriber = Optional.empty();
+ this.unregisterGroup = unregisterGroup;
+ }
+
+ GroupRegistration start() {
+ createGroupWorkQueue()
+ .doOnSuccess(any -> this.subscribeWorkQueue())
+ .block();
+ return this;
+ }
+
+ private Mono<Void> createGroupWorkQueue() {
+ return Flux.concat(
+ sender.declareQueue(QueueSpecification.queue(queueName.asString())
+ .durable(DURABLE)
+ .exclusive(!EXCLUSIVE)
+ .autoDelete(!AUTO_DELETE)
+ .arguments(NO_ARGUMENTS)),
+ sender.bind(BindingSpecification.binding()
+ .exchange(MAILBOX_EVENT_EXCHANGE_NAME)
+ .queue(queueName.asString())
+ .routingKey(EMPTY_ROUTING_KEY)))
+ .then();
+ }
+
+ private void subscribeWorkQueue() {
+ receiverSubscriber = Optional.of(receiver.consumeAutoAck(queueName.asString())
+ .subscribeOn(Schedulers.parallel())
+ .map(Delivery::getBody)
+ .filter(Objects::nonNull)
+ .map(eventInBytes -> new String(eventInBytes, StandardCharsets.UTF_8))
+ .map(eventSerializer::fromJson)
+ .map(JsResult::get)
+ .subscribeOn(Schedulers.elastic())
+ .subscribe(event -> deliverEvent(mailboxListener, event)));
+ }
+
+ private void deliverEvent(MailboxListener mailboxListener, Event event) {
+ try {
+ mailboxListener.event(event);
+ } catch (Exception e) {
+ LOGGER.error("Exception happens when handling event of user {}", event.getUser().asString(), e);
+ }
+ }
+
+ @Override
+ public void unregister() {
+ receiverSubscriber.filter(subscriber -> !subscriber.isDisposed())
+ .ifPresent(subscriber -> subscriber.dispose());
+ receiver.close();
+ unregisterGroup.run();
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/james-project/blob/ab2fa642/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistrationHandler.java
----------------------------------------------------------------------
diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistrationHandler.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistrationHandler.java
new file mode 100644
index 0000000..c0f4339
--- /dev/null
+++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistrationHandler.java
@@ -0,0 +1,70 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.mailbox.events;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.james.event.json.EventSerializer;
+import org.apache.james.mailbox.MailboxListener;
+
+import com.rabbitmq.client.Connection;
+
+import reactor.core.publisher.Mono;
+import reactor.rabbitmq.Sender;
+
+class GroupRegistrationHandler {
+ private final Map<Group, GroupRegistration> groupRegistrations;
+ private final EventSerializer eventSerializer;
+ private final Sender sender;
+ private final Mono<Connection> connectionMono;
+
+ GroupRegistrationHandler(EventSerializer eventSerializer, Sender sender, Mono<Connection> connectionMono) {
+ this.eventSerializer = eventSerializer;
+ this.sender = sender;
+ this.connectionMono = connectionMono;
+ this.groupRegistrations = new ConcurrentHashMap<>();
+ }
+
+ void stop() {
+ groupRegistrations.values().forEach(GroupRegistration::unregister);
+ }
+
+ Registration register(MailboxListener listener, Group group) {
+ return groupRegistrations
+ .compute(group, (groupToRegister, oldGroupRegistration) -> {
+ if (oldGroupRegistration != null) {
+ throw new GroupAlreadyRegistered(group);
+ }
+ return newGroupRegistration(listener, groupToRegister);
+ })
+ .start();
+ }
+
+ private GroupRegistration newGroupRegistration(MailboxListener listener, Group group) {
+ return new GroupRegistration(
+ connectionMono,
+ sender,
+ eventSerializer,
+ listener,
+ group,
+ () -> groupRegistrations.remove(group));
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/james-project/blob/ab2fa642/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java
----------------------------------------------------------------------
diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java
index 4ce792a..accd6cc 100644
--- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java
+++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java
@@ -19,19 +19,19 @@
package org.apache.james.mailbox.events;
-import java.nio.charset.StandardCharsets;
import java.util.Set;
+import javax.annotation.PreDestroy;
+
import org.apache.commons.lang3.NotImplementedException;
import org.apache.james.backend.rabbitmq.RabbitMQConnectionFactory;
import org.apache.james.event.json.EventSerializer;
import org.apache.james.mailbox.Event;
import org.apache.james.mailbox.MailboxListener;
+import com.rabbitmq.client.Connection;
+
import reactor.core.publisher.Mono;
-import reactor.core.scheduler.Schedulers;
-import reactor.rabbitmq.ExchangeSpecification;
-import reactor.rabbitmq.OutboundMessage;
import reactor.rabbitmq.RabbitFlux;
import reactor.rabbitmq.Sender;
import reactor.rabbitmq.SenderOptions;
@@ -39,26 +39,26 @@ import reactor.rabbitmq.SenderOptions;
public class RabbitMQEventBus implements EventBus {
static final String MAILBOX_EVENT = "mailboxEvent";
static final String MAILBOX_EVENT_EXCHANGE_NAME = MAILBOX_EVENT + "-exchange";
- static final String EMPTY_ROUTING_KEY = "";
-
- private static final boolean DURABLE = true;
- private static final String DIRECT_EXCHANGE = "direct";
- private final EventSerializer eventSerializer;
private final Sender sender;
+ private final GroupRegistrationHandler groupRegistrationHandler;
+ private final EventDispatcher eventDispatcher;
RabbitMQEventBus(RabbitMQConnectionFactory rabbitMQConnectionFactory, EventSerializer eventSerializer) {
- this.eventSerializer = eventSerializer;
- SenderOptions senderOption = new SenderOptions().connectionMono(Mono.fromSupplier(rabbitMQConnectionFactory::create));
- this.sender = RabbitFlux.createSender(senderOption);
+ Mono<Connection> connectionMono = Mono.fromSupplier(rabbitMQConnectionFactory::create).cache();
+ this.sender = RabbitFlux.createSender(new SenderOptions().connectionMono(connectionMono));
+ this.groupRegistrationHandler = new GroupRegistrationHandler(eventSerializer, sender, connectionMono);
+ this.eventDispatcher = new EventDispatcher(eventSerializer, sender);
+ }
+
+ public void start() {
+ eventDispatcher.start();
}
- Mono<Void> start() {
- return sender.declareExchange(ExchangeSpecification.exchange(MAILBOX_EVENT_EXCHANGE_NAME)
- .durable(DURABLE)
- .type(DIRECT_EXCHANGE))
- .subscribeOn(Schedulers.elastic())
- .then();
+ @PreDestroy
+ public void stop() {
+ groupRegistrationHandler.stop();
+ sender.close();
}
@Override
@@ -68,22 +68,14 @@ public class RabbitMQEventBus implements EventBus {
@Override
public Registration register(MailboxListener listener, Group group) {
- throw new NotImplementedException("will implement latter");
+ return groupRegistrationHandler.register(listener, group);
}
@Override
public Mono<Void> dispatch(Event event, Set<RegistrationKey> key) {
- Mono<OutboundMessage> outboundMessage = Mono.just(event)
- .publishOn(Schedulers.parallel())
- .map(this::serializeEvent)
- .map(payload -> new OutboundMessage(MAILBOX_EVENT_EXCHANGE_NAME, EMPTY_ROUTING_KEY, payload));
-
- Mono<Void> publishMono = sender.send(outboundMessage).cache();
- publishMono.subscribe();
- return publishMono;
- }
-
- private byte[] serializeEvent(Event event) {
- return eventSerializer.toJson(event).getBytes(StandardCharsets.UTF_8);
+ if (!event.isNoop()) {
+ return eventDispatcher.dispatch(event);
+ }
+ return Mono.empty();
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/james-project/blob/ab2fa642/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusPublishingTest.java
----------------------------------------------------------------------
diff --git a/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusPublishingTest.java b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusPublishingTest.java
deleted file mode 100644
index 50a37ca..0000000
--- a/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusPublishingTest.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/****************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one *
- * or more contributor license agreements. See the NOTICE file *
- * distributed with this work for additional information *
- * regarding copyright ownership. The ASF licenses this file *
- * to you under the Apache License, Version 2.0 (the *
- * "License"); you may not use this file except in compliance *
- * with the License. You may obtain a copy of the License at *
- * *
- * http://www.apache.org/licenses/LICENSE-2.0 *
- * *
- * Unless required by applicable law or agreed to in writing, *
- * software distributed under the License is distributed on an *
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
- * KIND, either express or implied. See the License for the *
- * specific language governing permissions and limitations *
- * under the License. *
- ****************************************************************/
-
-package org.apache.james.mailbox.events;
-
-import static org.apache.james.backend.rabbitmq.Constants.AUTO_DELETE;
-import static org.apache.james.backend.rabbitmq.Constants.DURABLE;
-import static org.apache.james.backend.rabbitmq.Constants.EXCLUSIVE;
-import static org.apache.james.backend.rabbitmq.Constants.NO_ARGUMENTS;
-import static org.apache.james.mailbox.events.EventBusContract.EVENT;
-import static org.apache.james.mailbox.events.EventBusContract.NO_KEYS;
-import static org.apache.james.mailbox.events.RabbitMQEventBus.EMPTY_ROUTING_KEY;
-import static org.apache.james.mailbox.events.RabbitMQEventBus.MAILBOX_EVENT;
-import static org.apache.james.mailbox.events.RabbitMQEventBus.MAILBOX_EVENT_EXCHANGE_NAME;
-import static org.assertj.core.api.Assertions.assertThat;
-
-import java.nio.charset.StandardCharsets;
-
-import org.apache.james.backend.rabbitmq.RabbitMQConnectionFactory;
-import org.apache.james.backend.rabbitmq.RabbitMQExtension;
-import org.apache.james.event.json.EventSerializer;
-import org.apache.james.mailbox.Event;
-import org.apache.james.mailbox.model.TestId;
-import org.apache.james.mailbox.model.TestMessageId;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.RegisterExtension;
-
-import com.google.common.collect.ImmutableSet;
-
-import reactor.core.publisher.Mono;
-import reactor.rabbitmq.BindingSpecification;
-import reactor.rabbitmq.QueueSpecification;
-import reactor.rabbitmq.RabbitFlux;
-import reactor.rabbitmq.Receiver;
-import reactor.rabbitmq.ReceiverOptions;
-import reactor.rabbitmq.Sender;
-import reactor.rabbitmq.SenderOptions;
-
-class RabbitMQEventBusPublishingTest {
- private static final String MAILBOX_WORK_QUEUE_NAME = MAILBOX_EVENT + "-workQueue";
-
- @RegisterExtension
- static RabbitMQExtension rabbitMQExtension = new RabbitMQExtension();
-
- private RabbitMQEventBus eventBus;
- private EventSerializer eventSerializer;
- private RabbitMQConnectionFactory connectionFactory;
-
- @BeforeEach
- void setUp() {
- connectionFactory = rabbitMQExtension.getConnectionFactory();
-
- eventSerializer = new EventSerializer(new TestId.Factory(), new TestMessageId.Factory());
- eventBus = new RabbitMQEventBus(connectionFactory, eventSerializer);
- eventBus.start().block();
-
- createQueue();
- }
-
- private void createQueue() {
- SenderOptions senderOption = new SenderOptions()
- .connectionMono(Mono.fromSupplier(connectionFactory::create));
- Sender sender = RabbitFlux.createSender(senderOption);
-
- sender.declareQueue(QueueSpecification.queue(MAILBOX_WORK_QUEUE_NAME)
- .durable(DURABLE)
- .exclusive(!EXCLUSIVE)
- .autoDelete(!AUTO_DELETE)
- .arguments(NO_ARGUMENTS))
- .block();
- sender.bind(BindingSpecification.binding()
- .exchange(MAILBOX_EVENT_EXCHANGE_NAME)
- .queue(MAILBOX_WORK_QUEUE_NAME)
- .routingKey(EMPTY_ROUTING_KEY))
- .block();
- }
-
- @Test
- void dispatchShouldPublishSerializedEventToRabbitMQ() {
- eventBus.dispatch(EVENT, NO_KEYS).block();
-
- assertThat(dequeueEvent()).isEqualTo(EVENT);
- }
-
- @Test
- void dispatchShouldPublishSerializedEventToRabbitMQWhenNotBlocking() {
- eventBus.dispatch(EVENT, NO_KEYS);
-
- assertThat(dequeueEvent()).isEqualTo(EVENT);
- }
-
-
- private Event dequeueEvent() {
- RabbitMQConnectionFactory connectionFactory = rabbitMQExtension.getConnectionFactory();
- Receiver receiver = RabbitFlux.createReceiver(new ReceiverOptions().connectionMono(Mono.just(connectionFactory.create())));
-
- byte[] eventInBytes = receiver.consumeAutoAck(MAILBOX_WORK_QUEUE_NAME)
- .blockFirst()
- .getBody();
-
- return eventSerializer.fromJson(new String(eventInBytes, StandardCharsets.UTF_8))
- .get();
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/james-project/blob/ab2fa642/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
----------------------------------------------------------------------
diff --git a/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
new file mode 100644
index 0000000..b39fb42
--- /dev/null
+++ b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
@@ -0,0 +1,148 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.mailbox.events;
+
+import static org.apache.james.backend.rabbitmq.Constants.AUTO_DELETE;
+import static org.apache.james.backend.rabbitmq.Constants.DURABLE;
+import static org.apache.james.backend.rabbitmq.Constants.EMPTY_ROUTING_KEY;
+import static org.apache.james.backend.rabbitmq.Constants.EXCLUSIVE;
+import static org.apache.james.backend.rabbitmq.Constants.NO_ARGUMENTS;
+import static org.apache.james.mailbox.events.EventBusTestFixture.ALL_GROUPS;
+import static org.apache.james.mailbox.events.EventBusTestFixture.EVENT;
+import static org.apache.james.mailbox.events.EventBusTestFixture.NO_KEYS;
+import static org.apache.james.mailbox.events.RabbitMQEventBus.MAILBOX_EVENT;
+import static org.apache.james.mailbox.events.RabbitMQEventBus.MAILBOX_EVENT_EXCHANGE_NAME;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.nio.charset.StandardCharsets;
+
+import org.apache.james.backend.rabbitmq.RabbitMQConnectionFactory;
+import org.apache.james.backend.rabbitmq.RabbitMQExtension;
+import org.apache.james.event.json.EventSerializer;
+import org.apache.james.mailbox.Event;
+import org.apache.james.mailbox.model.TestId;
+import org.apache.james.mailbox.model.TestMessageId;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Nested;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import com.rabbitmq.client.Connection;
+
+import reactor.core.publisher.Mono;
+import reactor.rabbitmq.BindingSpecification;
+import reactor.rabbitmq.QueueSpecification;
+import reactor.rabbitmq.RabbitFlux;
+import reactor.rabbitmq.Receiver;
+import reactor.rabbitmq.ReceiverOptions;
+import reactor.rabbitmq.Sender;
+import reactor.rabbitmq.SenderOptions;
+
+class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract {
+
+ @RegisterExtension
+ static RabbitMQExtension rabbitMQExtension = new RabbitMQExtension();
+
+ private RabbitMQEventBus eventBus;
+ private Sender sender;
+ private RabbitMQConnectionFactory connectionFactory;
+ private EventSerializer eventSerializer;
+
+ @BeforeEach
+ void setUp() {
+ connectionFactory = rabbitMQExtension.getConnectionFactory();
+ Mono<Connection> connectionMono = Mono.fromSupplier(connectionFactory::create).cache();
+
+ TestId.Factory mailboxIdFactory = new TestId.Factory();
+ eventSerializer = new EventSerializer(mailboxIdFactory, new TestMessageId.Factory());
+
+ eventBus = new RabbitMQEventBus(connectionFactory, eventSerializer);
+ eventBus.start();
+ sender = RabbitFlux.createSender(new SenderOptions().connectionMono(connectionMono));
+ }
+
+ @AfterEach
+ void tearDown() {
+ eventBus.stop();
+ ALL_GROUPS.stream()
+ .map(groupClass -> GroupRegistration.WorkQueueName.of(groupClass).asString())
+ .forEach(queueName -> sender.delete(QueueSpecification.queue(queueName)).block());
+ }
+
+ @Override
+ public EventBus eventBus() {
+ return eventBus;
+ }
+
+ @Nested
+ class PublishingTest {
+ private static final String MAILBOX_WORK_QUEUE_NAME = MAILBOX_EVENT + "-workQueue";
+
+ @BeforeEach
+ void setUp() {
+ createQueue();
+ }
+
+ private void createQueue() {
+ SenderOptions senderOption = new SenderOptions()
+ .connectionMono(Mono.fromSupplier(connectionFactory::create));
+ Sender sender = RabbitFlux.createSender(senderOption);
+
+ sender.declareQueue(QueueSpecification.queue(MAILBOX_WORK_QUEUE_NAME)
+ .durable(DURABLE)
+ .exclusive(!EXCLUSIVE)
+ .autoDelete(!AUTO_DELETE)
+ .arguments(NO_ARGUMENTS))
+ .block();
+ sender.bind(BindingSpecification.binding()
+ .exchange(MAILBOX_EVENT_EXCHANGE_NAME)
+ .queue(MAILBOX_WORK_QUEUE_NAME)
+ .routingKey(EMPTY_ROUTING_KEY))
+ .block();
+ }
+
+ @Test
+ void dispatchShouldPublishSerializedEventToRabbitMQ() {
+ eventBus.dispatch(EVENT, NO_KEYS).block();
+
+ assertThat(dequeueEvent()).isEqualTo(EVENT);
+ }
+
+ @Test
+ void dispatchShouldPublishSerializedEventToRabbitMQWhenNotBlocking() {
+ eventBus.dispatch(EVENT, NO_KEYS);
+
+ assertThat(dequeueEvent()).isEqualTo(EVENT);
+ }
+
+ private Event dequeueEvent() {
+ RabbitMQConnectionFactory connectionFactory = rabbitMQExtension.getConnectionFactory();
+ Receiver receiver = RabbitFlux.createReceiver(new ReceiverOptions().connectionMono(Mono.just(connectionFactory.create())));
+
+ byte[] eventInBytes = receiver.consumeAutoAck(MAILBOX_WORK_QUEUE_NAME)
+ .blockFirst()
+ .getBody();
+
+ return eventSerializer.fromJson(new String(eventInBytes, StandardCharsets.UTF_8))
+ .get();
+ }
+ }
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org