You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2019/11/04 11:23:42 UTC

[james-project] 06/30: JAMES-2937 Get rid of simple channelPool

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 7423ee71cba86f767d82c7d3dcca50af9d574caa
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Thu Oct 31 13:46:15 2019 +0700

    JAMES-2937 Get rid of simple channelPool
---
 .../backends/rabbitmq/RabbitMQChannelPool.java     |  55 ---------
 .../james/backends/rabbitmq/SimpleChannelPool.java | 128 ---------------------
 .../james/modules/rabbitmq/RabbitMQModule.java     |   5 -
 3 files changed, 188 deletions(-)

diff --git a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/RabbitMQChannelPool.java b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/RabbitMQChannelPool.java
deleted file mode 100644
index a544efb..0000000
--- a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/RabbitMQChannelPool.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/****************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one   *
- * or more contributor license agreements.  See the NOTICE file *
- * distributed with this work for additional information        *
- * regarding copyright ownership.  The ASF licenses this file   *
- * to you under the Apache License, Version 2.0 (the            *
- * "License"); you may not use this file except in compliance   *
- * with the License.  You may obtain a copy of the License at   *
- *                                                              *
- *   http://www.apache.org/licenses/LICENSE-2.0                 *
- *                                                              *
- * Unless required by applicable law or agreed to in writing,   *
- * software distributed under the License is distributed on an  *
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
- * KIND, either express or implied.  See the License for the    *
- * specific language governing permissions and limitations      *
- * under the License.                                           *
- ****************************************************************/
-
-package org.apache.james.backends.rabbitmq;
-
-import com.rabbitmq.client.Channel;
-import reactor.core.publisher.Flux;
-import reactor.rabbitmq.AcknowledgableDelivery;
-
-public interface RabbitMQChannelPool {
-    class ConnectionFailedException extends RuntimeException {
-        public ConnectionFailedException(Throwable cause) {
-            super(cause);
-        }
-    }
-
-    @FunctionalInterface
-    interface RabbitFunction<T, E extends Throwable> {
-        T execute(Channel channel) throws E;
-    }
-
-    @FunctionalInterface
-    interface RabbitConsumer<E extends Throwable> {
-        void execute(Channel channel) throws E;
-    }
-
-    <T, E extends Throwable> T execute(RabbitFunction<T, E> f)
-        throws E, ConnectionFailedException;
-
-
-    <E extends Throwable> void execute(RabbitConsumer<E> f)
-        throws E, ConnectionFailedException;
-
-    Flux<AcknowledgableDelivery> receive(String queueName);
-
-    boolean tryConnection();
-
-    void close() throws Exception;
-}
diff --git a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/SimpleChannelPool.java b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/SimpleChannelPool.java
deleted file mode 100644
index c322af7..0000000
--- a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/SimpleChannelPool.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/****************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one   *
- * or more contributor license agreements.  See the NOTICE file *
- * distributed with this work for additional information        *
- * regarding copyright ownership.  The ASF licenses this file   *
- * to you under the Apache License, Version 2.0 (the            *
- * "License"); you may not use this file except in compliance   *
- * with the License.  You may obtain a copy of the License at   *
- *                                                              *
- *   http://www.apache.org/licenses/LICENSE-2.0                 *
- *                                                              *
- * Unless required by applicable law or agreed to in writing,   *
- * software distributed under the License is distributed on an  *
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
- * KIND, either express or implied.  See the License for the    *
- * specific language governing permissions and limitations      *
- * under the License.                                           *
- ****************************************************************/
-
-package org.apache.james.backends.rabbitmq;
-
-import java.io.IOException;
-import java.time.Duration;
-import java.util.Optional;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicReference;
-
-import javax.annotation.PreDestroy;
-import javax.inject.Inject;
-
-import com.github.fge.lambdas.Throwing;
-import com.google.common.annotations.VisibleForTesting;
-import com.rabbitmq.client.Channel;
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
-import reactor.core.scheduler.Schedulers;
-import reactor.rabbitmq.AcknowledgableDelivery;
-import reactor.rabbitmq.RabbitFlux;
-import reactor.rabbitmq.Receiver;
-import reactor.rabbitmq.ReceiverOptions;
-
-public class SimpleChannelPool implements RabbitMQChannelPool {
-    private final AtomicReference<Channel> channelReference;
-    private final Receiver rabbitFlux;
-    private final SimpleConnectionPool connectionPool;
-
-    @Inject
-    @VisibleForTesting
-    SimpleChannelPool(SimpleConnectionPool connectionPool) {
-        this.connectionPool = connectionPool;
-        this.channelReference = new AtomicReference<>();
-        this.rabbitFlux = RabbitFlux
-            .createReceiver(new ReceiverOptions().connectionMono(connectionPool.getResilientConnection()));
-    }
-
-    @Override
-    public Flux<AcknowledgableDelivery> receive(String queueName) {
-        return rabbitFlux.consumeManualAck(queueName);
-    }
-
-    @Override
-    public <T, E extends Throwable> T execute(RabbitFunction<T, E> f) throws E, ConnectionFailedException {
-        return f.execute(getResilientChannel().block());
-    }
-
-    @Override
-    public <E extends Throwable> void execute(RabbitConsumer<E> f) throws E, ConnectionFailedException {
-        f.execute(getResilientChannel().block());
-    }
-
-    @PreDestroy
-    @Override
-    public void close() {
-        Optional.ofNullable(channelReference.get())
-            .filter(Channel::isOpen)
-            .ifPresent(Throwing.<Channel>consumer(Channel::close).orDoNothing());
-
-        try {
-            rabbitFlux.close();
-        } catch (Throwable ignored) {
-            //ignore exception during close
-        }
-    }
-
-    private Mono<Channel> getResilientChannel() {
-        int numRetries = 100;
-        Duration initialDelay = Duration.ofMillis(100);
-        Duration forever = Duration.ofMillis(Long.MAX_VALUE);
-        return Mono.defer(this::getOpenChannel)
-            .retryBackoff(numRetries, initialDelay, forever, Schedulers.boundedElastic());
-    }
-
-    private Mono<Channel> getOpenChannel() {
-        Channel previous = channelReference.get();
-        return Mono.justOrEmpty(previous)
-            .publishOn(Schedulers.boundedElastic())
-            .filter(Channel::isOpen)
-            .switchIfEmpty(connectionPool.getResilientConnection()
-                .flatMap(connection -> Mono.fromCallable(connection::createChannel)))
-            .flatMap(current -> replaceCurrentChannel(previous, current))
-            .onErrorMap(t -> new RuntimeException("unable to create and register a new Channel", t));
-    }
-
-    private Mono<Channel> replaceCurrentChannel(Channel previous, Channel current) {
-        if (channelReference.compareAndSet(previous, current)) {
-            return Mono.just(current);
-        } else {
-            try {
-                current.close();
-            } catch (IOException | TimeoutException e) {
-                //error below
-            }
-            return Mono.error(new RuntimeException("unable to create and register a new Channel"));
-        }
-    }
-
-    @Override
-    public boolean tryConnection() {
-        try {
-            return connectionPool.tryConnection() &&
-                getOpenChannel()
-                    .blockOptional()
-                    .isPresent();
-        } catch (Throwable t) {
-            return false;
-        }
-    }
-}
diff --git a/server/container/guice/rabbitmq/src/main/java/org/apache/james/modules/rabbitmq/RabbitMQModule.java b/server/container/guice/rabbitmq/src/main/java/org/apache/james/modules/rabbitmq/RabbitMQModule.java
index 4226aea..e887d6a 100644
--- a/server/container/guice/rabbitmq/src/main/java/org/apache/james/modules/rabbitmq/RabbitMQModule.java
+++ b/server/container/guice/rabbitmq/src/main/java/org/apache/james/modules/rabbitmq/RabbitMQModule.java
@@ -26,11 +26,9 @@ import javax.inject.Singleton;
 import org.apache.commons.configuration2.Configuration;
 import org.apache.commons.configuration2.ex.ConfigurationException;
 import org.apache.james.backends.cassandra.components.CassandraModule;
-import org.apache.james.backends.rabbitmq.RabbitMQChannelPool;
 import org.apache.james.backends.rabbitmq.RabbitMQConfiguration;
 import org.apache.james.backends.rabbitmq.RabbitMQHealthCheck;
 import org.apache.james.backends.rabbitmq.ReactorRabbitMQChannelPool;
-import org.apache.james.backends.rabbitmq.SimpleChannelPool;
 import org.apache.james.core.healthcheck.HealthCheck;
 import org.apache.james.eventsourcing.eventstore.cassandra.dto.EventDTOModule;
 import org.apache.james.queue.api.MailQueueFactory;
@@ -79,9 +77,6 @@ public class RabbitMQModule extends AbstractModule {
         bind(CassandraMailQueueMailDelete.class).in(Scopes.SINGLETON);
         bind(CassandraMailQueueMailStore.class).in(Scopes.SINGLETON);
 
-        bind(SimpleChannelPool.class).in(Scopes.SINGLETON);
-        bind(RabbitMQChannelPool.class).to(SimpleChannelPool.class);
-
         Multibinder<CassandraModule> cassandraModuleBinder = Multibinder.newSetBinder(binder(), CassandraModule.class);
         cassandraModuleBinder.addBinding().toInstance(CassandraMailQueueViewModule.MODULE);
 


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org