You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2019/11/04 11:23:42 UTC
[james-project] 06/30: JAMES-2937 Get rid of simple channelPool
This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 7423ee71cba86f767d82c7d3dcca50af9d574caa
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Thu Oct 31 13:46:15 2019 +0700
JAMES-2937 Get rid of simple channelPool
---
.../backends/rabbitmq/RabbitMQChannelPool.java | 55 ---------
.../james/backends/rabbitmq/SimpleChannelPool.java | 128 ---------------------
.../james/modules/rabbitmq/RabbitMQModule.java | 5 -
3 files changed, 188 deletions(-)
diff --git a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/RabbitMQChannelPool.java b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/RabbitMQChannelPool.java
deleted file mode 100644
index a544efb..0000000
--- a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/RabbitMQChannelPool.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/****************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one *
- * or more contributor license agreements. See the NOTICE file *
- * distributed with this work for additional information *
- * regarding copyright ownership. The ASF licenses this file *
- * to you under the Apache License, Version 2.0 (the *
- * "License"); you may not use this file except in compliance *
- * with the License. You may obtain a copy of the License at *
- * *
- * http://www.apache.org/licenses/LICENSE-2.0 *
- * *
- * Unless required by applicable law or agreed to in writing, *
- * software distributed under the License is distributed on an *
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
- * KIND, either express or implied. See the License for the *
- * specific language governing permissions and limitations *
- * under the License. *
- ****************************************************************/
-
-package org.apache.james.backends.rabbitmq;
-
-import com.rabbitmq.client.Channel;
-import reactor.core.publisher.Flux;
-import reactor.rabbitmq.AcknowledgableDelivery;
-
-public interface RabbitMQChannelPool {
- class ConnectionFailedException extends RuntimeException {
- public ConnectionFailedException(Throwable cause) {
- super(cause);
- }
- }
-
- @FunctionalInterface
- interface RabbitFunction<T, E extends Throwable> {
- T execute(Channel channel) throws E;
- }
-
- @FunctionalInterface
- interface RabbitConsumer<E extends Throwable> {
- void execute(Channel channel) throws E;
- }
-
- <T, E extends Throwable> T execute(RabbitFunction<T, E> f)
- throws E, ConnectionFailedException;
-
-
- <E extends Throwable> void execute(RabbitConsumer<E> f)
- throws E, ConnectionFailedException;
-
- Flux<AcknowledgableDelivery> receive(String queueName);
-
- boolean tryConnection();
-
- void close() throws Exception;
-}
diff --git a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/SimpleChannelPool.java b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/SimpleChannelPool.java
deleted file mode 100644
index c322af7..0000000
--- a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/SimpleChannelPool.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/****************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one *
- * or more contributor license agreements. See the NOTICE file *
- * distributed with this work for additional information *
- * regarding copyright ownership. The ASF licenses this file *
- * to you under the Apache License, Version 2.0 (the *
- * "License"); you may not use this file except in compliance *
- * with the License. You may obtain a copy of the License at *
- * *
- * http://www.apache.org/licenses/LICENSE-2.0 *
- * *
- * Unless required by applicable law or agreed to in writing, *
- * software distributed under the License is distributed on an *
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
- * KIND, either express or implied. See the License for the *
- * specific language governing permissions and limitations *
- * under the License. *
- ****************************************************************/
-
-package org.apache.james.backends.rabbitmq;
-
-import java.io.IOException;
-import java.time.Duration;
-import java.util.Optional;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicReference;
-
-import javax.annotation.PreDestroy;
-import javax.inject.Inject;
-
-import com.github.fge.lambdas.Throwing;
-import com.google.common.annotations.VisibleForTesting;
-import com.rabbitmq.client.Channel;
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
-import reactor.core.scheduler.Schedulers;
-import reactor.rabbitmq.AcknowledgableDelivery;
-import reactor.rabbitmq.RabbitFlux;
-import reactor.rabbitmq.Receiver;
-import reactor.rabbitmq.ReceiverOptions;
-
-public class SimpleChannelPool implements RabbitMQChannelPool {
- private final AtomicReference<Channel> channelReference;
- private final Receiver rabbitFlux;
- private final SimpleConnectionPool connectionPool;
-
- @Inject
- @VisibleForTesting
- SimpleChannelPool(SimpleConnectionPool connectionPool) {
- this.connectionPool = connectionPool;
- this.channelReference = new AtomicReference<>();
- this.rabbitFlux = RabbitFlux
- .createReceiver(new ReceiverOptions().connectionMono(connectionPool.getResilientConnection()));
- }
-
- @Override
- public Flux<AcknowledgableDelivery> receive(String queueName) {
- return rabbitFlux.consumeManualAck(queueName);
- }
-
- @Override
- public <T, E extends Throwable> T execute(RabbitFunction<T, E> f) throws E, ConnectionFailedException {
- return f.execute(getResilientChannel().block());
- }
-
- @Override
- public <E extends Throwable> void execute(RabbitConsumer<E> f) throws E, ConnectionFailedException {
- f.execute(getResilientChannel().block());
- }
-
- @PreDestroy
- @Override
- public void close() {
- Optional.ofNullable(channelReference.get())
- .filter(Channel::isOpen)
- .ifPresent(Throwing.<Channel>consumer(Channel::close).orDoNothing());
-
- try {
- rabbitFlux.close();
- } catch (Throwable ignored) {
- //ignore exception during close
- }
- }
-
- private Mono<Channel> getResilientChannel() {
- int numRetries = 100;
- Duration initialDelay = Duration.ofMillis(100);
- Duration forever = Duration.ofMillis(Long.MAX_VALUE);
- return Mono.defer(this::getOpenChannel)
- .retryBackoff(numRetries, initialDelay, forever, Schedulers.boundedElastic());
- }
-
- private Mono<Channel> getOpenChannel() {
- Channel previous = channelReference.get();
- return Mono.justOrEmpty(previous)
- .publishOn(Schedulers.boundedElastic())
- .filter(Channel::isOpen)
- .switchIfEmpty(connectionPool.getResilientConnection()
- .flatMap(connection -> Mono.fromCallable(connection::createChannel)))
- .flatMap(current -> replaceCurrentChannel(previous, current))
- .onErrorMap(t -> new RuntimeException("unable to create and register a new Channel", t));
- }
-
- private Mono<Channel> replaceCurrentChannel(Channel previous, Channel current) {
- if (channelReference.compareAndSet(previous, current)) {
- return Mono.just(current);
- } else {
- try {
- current.close();
- } catch (IOException | TimeoutException e) {
- //error below
- }
- return Mono.error(new RuntimeException("unable to create and register a new Channel"));
- }
- }
-
- @Override
- public boolean tryConnection() {
- try {
- return connectionPool.tryConnection() &&
- getOpenChannel()
- .blockOptional()
- .isPresent();
- } catch (Throwable t) {
- return false;
- }
- }
-}
diff --git a/server/container/guice/rabbitmq/src/main/java/org/apache/james/modules/rabbitmq/RabbitMQModule.java b/server/container/guice/rabbitmq/src/main/java/org/apache/james/modules/rabbitmq/RabbitMQModule.java
index 4226aea..e887d6a 100644
--- a/server/container/guice/rabbitmq/src/main/java/org/apache/james/modules/rabbitmq/RabbitMQModule.java
+++ b/server/container/guice/rabbitmq/src/main/java/org/apache/james/modules/rabbitmq/RabbitMQModule.java
@@ -26,11 +26,9 @@ import javax.inject.Singleton;
import org.apache.commons.configuration2.Configuration;
import org.apache.commons.configuration2.ex.ConfigurationException;
import org.apache.james.backends.cassandra.components.CassandraModule;
-import org.apache.james.backends.rabbitmq.RabbitMQChannelPool;
import org.apache.james.backends.rabbitmq.RabbitMQConfiguration;
import org.apache.james.backends.rabbitmq.RabbitMQHealthCheck;
import org.apache.james.backends.rabbitmq.ReactorRabbitMQChannelPool;
-import org.apache.james.backends.rabbitmq.SimpleChannelPool;
import org.apache.james.core.healthcheck.HealthCheck;
import org.apache.james.eventsourcing.eventstore.cassandra.dto.EventDTOModule;
import org.apache.james.queue.api.MailQueueFactory;
@@ -79,9 +77,6 @@ public class RabbitMQModule extends AbstractModule {
bind(CassandraMailQueueMailDelete.class).in(Scopes.SINGLETON);
bind(CassandraMailQueueMailStore.class).in(Scopes.SINGLETON);
- bind(SimpleChannelPool.class).in(Scopes.SINGLETON);
- bind(RabbitMQChannelPool.class).to(SimpleChannelPool.class);
-
Multibinder<CassandraModule> cassandraModuleBinder = Multibinder.newSetBinder(binder(), CassandraModule.class);
cassandraModuleBinder.addBinding().toInstance(CassandraMailQueueViewModule.MODULE);
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org