You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2019/01/08 08:01:02 UTC
[43/47] james-project git commit: MAILBOX-367 RabbitMQEvenBus should
publish events
MAILBOX-367 RabbitMQEvenBus should publish events
Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/b1e241c4
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/b1e241c4
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/b1e241c4
Branch: refs/heads/master
Commit: b1e241c4dc8e39174efa12eb400cd2a2f0cc633b
Parents: f1ac836
Author: datph <dp...@linagora.com>
Authored: Mon Jan 7 10:33:37 2019 +0700
Committer: Benoit Tellier <bt...@linagora.com>
Committed: Tue Jan 8 14:47:47 2019 +0700
----------------------------------------------------------------------
.../backend/rabbitmq/RabbitMQExtension.java | 9 +-
mailbox/event/event-rabbitmq/pom.xml | 101 ++++++++++++++++
.../james/mailbox/events/RabbitMQEventBus.java | 89 ++++++++++++++
.../events/RabbitMQEventBusPublishingTest.java | 121 +++++++++++++++++++
mailbox/pom.xml | 1 +
pom.xml | 7 +-
6 files changed, 325 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/james-project/blob/b1e241c4/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/RabbitMQExtension.java
----------------------------------------------------------------------
diff --git a/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/RabbitMQExtension.java b/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/RabbitMQExtension.java
index d6d2ead..a64c95f 100644
--- a/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/RabbitMQExtension.java
+++ b/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/RabbitMQExtension.java
@@ -43,6 +43,7 @@ public class RabbitMQExtension implements BeforeAllCallback, BeforeEachCallback,
private DockerRabbitMQ rabbitMQ;
private SimpleChannelPool simpleChannelPool;
+ private RabbitMQConnectionFactory connectionFactory;
@Override
public void beforeAll(ExtensionContext context) {
@@ -52,7 +53,7 @@ public class RabbitMQExtension implements BeforeAllCallback, BeforeEachCallback,
@Override
public void beforeEach(ExtensionContext extensionContext) throws Exception {
- RabbitMQConnectionFactory connectionFactory = createRabbitConnectionFactory();
+ connectionFactory = createRabbitConnectionFactory();
this.simpleChannelPool = new SimpleChannelPool(connectionFactory);
}
@@ -84,6 +85,10 @@ public class RabbitMQExtension implements BeforeAllCallback, BeforeEachCallback,
return rabbitMQ;
}
+ public RabbitMQConnectionFactory getConnectionFactory() {
+ return connectionFactory;
+ }
+
private RabbitMQConnectionFactory createRabbitConnectionFactory() throws URISyntaxException {
RabbitMQConfiguration rabbitMQConfiguration = RabbitMQConfiguration.builder()
.amqpUri(rabbitMQ.amqpUri())
@@ -98,4 +103,4 @@ public class RabbitMQExtension implements BeforeAllCallback, BeforeEachCallback,
rabbitMQConfiguration,
new AsyncRetryExecutor(Executors.newSingleThreadScheduledExecutor(threadFactory)));
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/james-project/blob/b1e241c4/mailbox/event/event-rabbitmq/pom.xml
----------------------------------------------------------------------
diff --git a/mailbox/event/event-rabbitmq/pom.xml b/mailbox/event/event-rabbitmq/pom.xml
new file mode 100644
index 0000000..c671468
--- /dev/null
+++ b/mailbox/event/event-rabbitmq/pom.xml
@@ -0,0 +1,101 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <artifactId>apache-james-mailbox</artifactId>
+ <groupId>org.apache.james</groupId>
+ <version>3.3.0-SNAPSHOT</version>
+ <relativePath>../../pom.xml</relativePath>
+ </parent>
+
+ <artifactId>apache-james-mailbox-event-rabbitmq</artifactId>
+ <name>Apache James :: Mailbox :: Event :: RabbitMQ implementation</name>
+ <description>RabbitMQ implementation for the eventbus API</description>
+
+ <dependencies>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>apache-james-backends-rabbitmq</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>apache-james-backends-rabbitmq</artifactId>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>apache-james-mailbox-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>apache-james-mailbox-api</artifactId>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>apache-james-mailbox-event-json</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>${james.groupId}</groupId>
+ <artifactId>james-server-testing</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.jayway.awaitility</groupId>
+ <artifactId>awaitility</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>io.projectreactor</groupId>
+ <artifactId>reactor-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.projectreactor.rabbitmq</groupId>
+ <artifactId>reactor-rabbitmq</artifactId>
+ <version>1.0.0.RELEASE</version>
+ </dependency>
+ <dependency>
+ <groupId>org.assertj</groupId>
+ <artifactId>assertj-core</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.junit.jupiter</groupId>
+ <artifactId>junit-jupiter-engine</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.junit.platform</groupId>
+ <artifactId>junit-platform-launcher</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.testcontainers</groupId>
+ <artifactId>testcontainers</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+
+</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/james-project/blob/b1e241c4/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java
----------------------------------------------------------------------
diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java
new file mode 100644
index 0000000..4ce792a
--- /dev/null
+++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java
@@ -0,0 +1,89 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.mailbox.events;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Set;
+
+import org.apache.commons.lang3.NotImplementedException;
+import org.apache.james.backend.rabbitmq.RabbitMQConnectionFactory;
+import org.apache.james.event.json.EventSerializer;
+import org.apache.james.mailbox.Event;
+import org.apache.james.mailbox.MailboxListener;
+
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
+import reactor.rabbitmq.ExchangeSpecification;
+import reactor.rabbitmq.OutboundMessage;
+import reactor.rabbitmq.RabbitFlux;
+import reactor.rabbitmq.Sender;
+import reactor.rabbitmq.SenderOptions;
+
+public class RabbitMQEventBus implements EventBus {
+ static final String MAILBOX_EVENT = "mailboxEvent";
+ static final String MAILBOX_EVENT_EXCHANGE_NAME = MAILBOX_EVENT + "-exchange";
+ static final String EMPTY_ROUTING_KEY = "";
+
+ private static final boolean DURABLE = true;
+ private static final String DIRECT_EXCHANGE = "direct";
+
+ private final EventSerializer eventSerializer;
+ private final Sender sender;
+
+ RabbitMQEventBus(RabbitMQConnectionFactory rabbitMQConnectionFactory, EventSerializer eventSerializer) {
+ this.eventSerializer = eventSerializer;
+ SenderOptions senderOption = new SenderOptions().connectionMono(Mono.fromSupplier(rabbitMQConnectionFactory::create));
+ this.sender = RabbitFlux.createSender(senderOption);
+ }
+
+ Mono<Void> start() {
+ return sender.declareExchange(ExchangeSpecification.exchange(MAILBOX_EVENT_EXCHANGE_NAME)
+ .durable(DURABLE)
+ .type(DIRECT_EXCHANGE))
+ .subscribeOn(Schedulers.elastic())
+ .then();
+ }
+
+ @Override
+ public Registration register(MailboxListener listener, RegistrationKey key) {
+ throw new NotImplementedException("will implement latter");
+ }
+
+ @Override
+ public Registration register(MailboxListener listener, Group group) {
+ throw new NotImplementedException("will implement latter");
+ }
+
+ @Override
+ public Mono<Void> dispatch(Event event, Set<RegistrationKey> key) {
+ Mono<OutboundMessage> outboundMessage = Mono.just(event)
+ .publishOn(Schedulers.parallel())
+ .map(this::serializeEvent)
+ .map(payload -> new OutboundMessage(MAILBOX_EVENT_EXCHANGE_NAME, EMPTY_ROUTING_KEY, payload));
+
+ Mono<Void> publishMono = sender.send(outboundMessage).cache();
+ publishMono.subscribe();
+ return publishMono;
+ }
+
+ private byte[] serializeEvent(Event event) {
+ return eventSerializer.toJson(event).getBytes(StandardCharsets.UTF_8);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/james-project/blob/b1e241c4/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusPublishingTest.java
----------------------------------------------------------------------
diff --git a/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusPublishingTest.java b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusPublishingTest.java
new file mode 100644
index 0000000..50a37ca
--- /dev/null
+++ b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusPublishingTest.java
@@ -0,0 +1,121 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.mailbox.events;
+
+import static org.apache.james.backend.rabbitmq.Constants.AUTO_DELETE;
+import static org.apache.james.backend.rabbitmq.Constants.DURABLE;
+import static org.apache.james.backend.rabbitmq.Constants.EXCLUSIVE;
+import static org.apache.james.backend.rabbitmq.Constants.NO_ARGUMENTS;
+import static org.apache.james.mailbox.events.EventBusContract.EVENT;
+import static org.apache.james.mailbox.events.EventBusContract.NO_KEYS;
+import static org.apache.james.mailbox.events.RabbitMQEventBus.EMPTY_ROUTING_KEY;
+import static org.apache.james.mailbox.events.RabbitMQEventBus.MAILBOX_EVENT;
+import static org.apache.james.mailbox.events.RabbitMQEventBus.MAILBOX_EVENT_EXCHANGE_NAME;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.nio.charset.StandardCharsets;
+
+import org.apache.james.backend.rabbitmq.RabbitMQConnectionFactory;
+import org.apache.james.backend.rabbitmq.RabbitMQExtension;
+import org.apache.james.event.json.EventSerializer;
+import org.apache.james.mailbox.Event;
+import org.apache.james.mailbox.model.TestId;
+import org.apache.james.mailbox.model.TestMessageId;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import com.google.common.collect.ImmutableSet;
+
+import reactor.core.publisher.Mono;
+import reactor.rabbitmq.BindingSpecification;
+import reactor.rabbitmq.QueueSpecification;
+import reactor.rabbitmq.RabbitFlux;
+import reactor.rabbitmq.Receiver;
+import reactor.rabbitmq.ReceiverOptions;
+import reactor.rabbitmq.Sender;
+import reactor.rabbitmq.SenderOptions;
+
+class RabbitMQEventBusPublishingTest {
+ private static final String MAILBOX_WORK_QUEUE_NAME = MAILBOX_EVENT + "-workQueue";
+
+ @RegisterExtension
+ static RabbitMQExtension rabbitMQExtension = new RabbitMQExtension();
+
+ private RabbitMQEventBus eventBus;
+ private EventSerializer eventSerializer;
+ private RabbitMQConnectionFactory connectionFactory;
+
+ @BeforeEach
+ void setUp() {
+ connectionFactory = rabbitMQExtension.getConnectionFactory();
+
+ eventSerializer = new EventSerializer(new TestId.Factory(), new TestMessageId.Factory());
+ eventBus = new RabbitMQEventBus(connectionFactory, eventSerializer);
+ eventBus.start().block();
+
+ createQueue();
+ }
+
+ private void createQueue() {
+ SenderOptions senderOption = new SenderOptions()
+ .connectionMono(Mono.fromSupplier(connectionFactory::create));
+ Sender sender = RabbitFlux.createSender(senderOption);
+
+ sender.declareQueue(QueueSpecification.queue(MAILBOX_WORK_QUEUE_NAME)
+ .durable(DURABLE)
+ .exclusive(!EXCLUSIVE)
+ .autoDelete(!AUTO_DELETE)
+ .arguments(NO_ARGUMENTS))
+ .block();
+ sender.bind(BindingSpecification.binding()
+ .exchange(MAILBOX_EVENT_EXCHANGE_NAME)
+ .queue(MAILBOX_WORK_QUEUE_NAME)
+ .routingKey(EMPTY_ROUTING_KEY))
+ .block();
+ }
+
+ @Test
+ void dispatchShouldPublishSerializedEventToRabbitMQ() {
+ eventBus.dispatch(EVENT, NO_KEYS).block();
+
+ assertThat(dequeueEvent()).isEqualTo(EVENT);
+ }
+
+ @Test
+ void dispatchShouldPublishSerializedEventToRabbitMQWhenNotBlocking() {
+ eventBus.dispatch(EVENT, NO_KEYS);
+
+ assertThat(dequeueEvent()).isEqualTo(EVENT);
+ }
+
+
+ private Event dequeueEvent() {
+ RabbitMQConnectionFactory connectionFactory = rabbitMQExtension.getConnectionFactory();
+ Receiver receiver = RabbitFlux.createReceiver(new ReceiverOptions().connectionMono(Mono.just(connectionFactory.create())));
+
+ byte[] eventInBytes = receiver.consumeAutoAck(MAILBOX_WORK_QUEUE_NAME)
+ .blockFirst()
+ .getBody();
+
+ return eventSerializer.fromJson(new String(eventInBytes, StandardCharsets.UTF_8))
+ .get();
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/james-project/blob/b1e241c4/mailbox/pom.xml
----------------------------------------------------------------------
diff --git a/mailbox/pom.xml b/mailbox/pom.xml
index f7f7869..e363b7e 100644
--- a/mailbox/pom.xml
+++ b/mailbox/pom.xml
@@ -42,6 +42,7 @@
<module>elasticsearch</module>
<module>event/event-memory</module>
+ <module>event/event-rabbitmq</module>
<module>event/json</module>
<module>jpa</module>
http://git-wip-us.apache.org/repos/asf/james-project/blob/b1e241c4/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 1cc4a2d..56486ae 100644
--- a/pom.xml
+++ b/pom.xml
@@ -762,6 +762,11 @@
</dependency>
<dependency>
<groupId>${james.groupId}</groupId>
+ <artifactId>apache-james-mailbox-event-json</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>${james.groupId}</groupId>
<artifactId>apache-james-mailbox-event-memory</artifactId>
<version>${project.version}</version>
</dependency>
@@ -3376,4 +3381,4 @@
-->
</plugins>
</reporting>
-</project>
+</project>
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org