You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by ro...@apache.org on 2019/11/26 15:31:40 UTC

[james-project] 05/07: JAMES-2813 replace exclusive consumer usage with a queue with single active consumer

This is an automated email from the ASF dual-hosted git repository.

rouazana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 8ef3cabdcd3f47c42a66bc405b5c0096ef6afd3f
Author: RĂ©mi KOWALSKI <rk...@linagora.com>
AuthorDate: Mon Oct 21 10:33:22 2019 +0200

    JAMES-2813 replace exclusive consumer usage with a queue with single active consumer
---
 .../distributed/RabbitMQExclusiveConsumer.java     | 174 ---------------------
 .../distributed/RabbitMQWorkQueue.java             |  13 +-
 2 files changed, 7 insertions(+), 180 deletions(-)

diff --git a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQExclusiveConsumer.java b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQExclusiveConsumer.java
deleted file mode 100644
index e6d4e1c..0000000
--- a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQExclusiveConsumer.java
+++ /dev/null
@@ -1,174 +0,0 @@
-/****************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one   *
- * or more contributor license agreements.  See the NOTICE file *
- * distributed with this work for additional information        *
- * regarding copyright ownership.  The ASF licenses this file   *
- * to you under the Apache License, Version 2.0 (the            *
- * "License"); you may not use this file except in compliance   *
- * with the License.  You may obtain a copy of the License at   *
- *                                                              *
- *   http://www.apache.org/licenses/LICENSE-2.0                 *
- *                                                              *
- * Unless required by applicable law or agreed to in writing,   *
- * software distributed under the License is distributed on an  *
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
- * KIND, either express or implied.  See the License for the    *
- * specific language governing permissions and limitations      *
- * under the License.                                           *
- ****************************************************************/
-package org.apache.james.task.eventsourcing.distributed;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.function.Function;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.ImmutableMap;
-import com.rabbitmq.client.CancelCallback;
-import com.rabbitmq.client.Channel;
-import com.rabbitmq.client.Connection;
-import com.rabbitmq.client.DeliverCallback;
-import com.rabbitmq.client.impl.recovery.AutorecoveringConnection;
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.FluxSink;
-import reactor.core.publisher.Mono;
-import reactor.core.scheduler.Scheduler;
-import reactor.core.scheduler.Schedulers;
-import reactor.rabbitmq.AcknowledgableDelivery;
-import reactor.rabbitmq.ConsumeOptions;
-import reactor.rabbitmq.RabbitFluxException;
-import reactor.rabbitmq.ReceiverOptions;
-
-/**
- * Taken from {@link reactor.rabbitmq.Receiver}
- * In order to be able to set the `exclusive` parameter to `true`
- * to the `channel.basicConsume` method.
- *
- * @deprecated to remove once the parallel execution of task has been implemented
- */
-@Deprecated
-public class RabbitMQExclusiveConsumer implements Closeable {
-    private static final Function<Connection, Channel> CHANNEL_CREATION_FUNCTION = new RabbitMQExclusiveConsumer.ChannelCreationFunction();
-    private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMQExclusiveConsumer.class);
-
-    private static final boolean NON_LOCAL = true;
-    private static final boolean EXCLUSIVE = true;
-    private static final boolean AUTO_ACK = true;
-
-    private static class ChannelCreationFunction implements Function<Connection, Channel> {
-
-        @Override
-        public Channel apply(Connection connection) {
-            try {
-                return connection.createChannel();
-            } catch (IOException e) {
-                throw new RabbitFluxException("Error while creating channel", e);
-            }
-        }
-    }
-
-    private Mono<? extends Connection> connectionMono;
-    private final AtomicBoolean hasConnection;
-    private final Scheduler connectionSubscriptionScheduler;
-    private final boolean privateConnectionSubscriptionScheduler;
-
-    public RabbitMQExclusiveConsumer(ReceiverOptions options) {
-        this.privateConnectionSubscriptionScheduler = options.getConnectionSubscriptionScheduler() == null;
-        this.connectionSubscriptionScheduler = privateConnectionSubscriptionScheduler ?
-            createScheduler("rabbitmq-receiver-connection-subscription") : options.getConnectionSubscriptionScheduler();
-        hasConnection = new AtomicBoolean(false);
-        this.connectionMono = options.getConnectionMono() != null ? options.getConnectionMono() :
-            Mono.fromCallable(() -> options.getConnectionFactory().newConnection())
-                .doOnSubscribe(c -> hasConnection.set(true))
-                .subscribeOn(this.connectionSubscriptionScheduler)
-                .cache();
-    }
-
-    protected Scheduler createScheduler(String name) {
-        return Schedulers.newBoundedElastic(Schedulers.DEFAULT_BOUNDED_ELASTIC_SIZE, Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE, name);
-    }
-
-    public Flux<AcknowledgableDelivery> consumeExclusiveManualAck(final String queue, ConsumeOptions options) {
-        // TODO track flux so it can be disposed when the sender is closed?
-        // could be also developer responsibility
-        return Flux.create(emitter -> connectionMono.map(CHANNEL_CREATION_FUNCTION).subscribe(channel -> {
-            try {
-                if (options.getQos() != 0) {
-                    channel.basicQos(options.getQos());
-                }
-
-                DeliverCallback deliverCallback = (consumerTag, message) -> {
-                    AcknowledgableDelivery delivery = new AcknowledgableDelivery(message, channel, options.getExceptionHandler());
-                    if (options.getHookBeforeEmitBiFunction().apply(emitter.requestedFromDownstream(), delivery)) {
-                        emitter.next(delivery);
-                    }
-                    if (options.getStopConsumingBiFunction().apply(emitter.requestedFromDownstream(), message)) {
-                        emitter.complete();
-                    }
-                };
-
-                AtomicBoolean basicCancel = new AtomicBoolean(true);
-                CancelCallback cancelCallback = consumerTag -> {
-                    LOGGER.info("Flux consumer {} has been cancelled", consumerTag);
-                    basicCancel.set(false);
-                    emitter.complete();
-                };
-
-                completeOnChannelShutdown(channel, emitter);
-
-                Map<String, Object> arguments = ImmutableMap.of();
-                final String consumerTag = channel.basicConsume(queue, !AUTO_ACK, UUID.randomUUID().toString(), !NON_LOCAL, EXCLUSIVE, arguments, deliverCallback, cancelCallback);
-                AtomicBoolean cancelled = new AtomicBoolean(false);
-                LOGGER.info("Consumer {} consuming from {} has been registered", consumerTag, queue);
-                emitter.onDispose(() -> {
-                    LOGGER.info("Cancelling consumer {} consuming from {}", consumerTag, queue);
-                    if (cancelled.compareAndSet(false, true)) {
-                        try {
-                            if (channel.isOpen() && channel.getConnection().isOpen()) {
-                                if (basicCancel.compareAndSet(true, false)) {
-                                    channel.basicCancel(consumerTag);
-                                }
-                                channel.close();
-                            }
-                        } catch (TimeoutException | IOException e) {
-                            // Not sure what to do, not much we can do,
-                            // logging should be enough.
-                            // Maybe one good reason to introduce an exception handler to choose more easily.
-                            LOGGER.warn("Error while closing channel: " + e.getMessage());
-                        }
-                    }
-                });
-            } catch (IOException e) {
-                throw new RabbitFluxException(e);
-            }
-        }, emitter::error), options.getOverflowStrategy());
-    }
-
-    protected void completeOnChannelShutdown(Channel channel, FluxSink<?> emitter) {
-        channel.addShutdownListener(reason -> {
-            if (!AutorecoveringConnection.DEFAULT_CONNECTION_RECOVERY_TRIGGERING_CONDITION.test(reason)) {
-                emitter.complete();
-            }
-        });
-    }
-
-    public void close() {
-        if (hasConnection.getAndSet(false)) {
-            try {
-                // FIXME use timeout on block (should be a parameter of the Receiver)
-                connectionMono.block().close();
-            } catch (IOException e) {
-                throw new RabbitFluxException(e);
-            }
-        }
-        if (privateConnectionSubscriptionScheduler) {
-            this.connectionSubscriptionScheduler.dispose();
-        }
-    }
-}
diff --git a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java
index 62a2f1b..54d8b03 100644
--- a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java
+++ b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java
@@ -24,6 +24,7 @@ import java.nio.charset.StandardCharsets;
 import java.util.Optional;
 import java.util.UUID;
 
+import org.apache.james.backends.rabbitmq.Constants;
 import org.apache.james.backends.rabbitmq.ReactorRabbitMQChannelPool;
 import org.apache.james.server.task.json.JsonTaskSerializer;
 import org.apache.james.task.Task;
@@ -70,7 +71,8 @@ public class RabbitMQWorkQueue implements WorkQueue {
     private final TaskManagerWorker worker;
     private final ReactorRabbitMQChannelPool channelPool;
     private final JsonTaskSerializer taskSerializer;
-    private RabbitMQExclusiveConsumer receiver;
+    private Sender sender;
+    private Receiver receiver;
     private UnicastProcessor<TaskId> sendCancelRequestsQueue;
     private Disposable sendCancelRequestsQueueHandle;
     private Disposable receiverHandle;
@@ -92,15 +94,15 @@ public class RabbitMQWorkQueue implements WorkQueue {
 
     private void startWorkqueue() {
         channelPool.getSender().declareExchange(ExchangeSpecification.exchange(EXCHANGE_NAME)).block();
-        channelPool.getSender().declare(QueueSpecification.queue(QUEUE_NAME).durable(true)).block();
+        channelPool.getSender().declare(QueueSpecification.queue(QUEUE_NAME).durable(true).arguments(Constants.WITH_SINGLE_ACTIVE_CONSUMER)).block();
         channelPool.getSender().bind(BindingSpecification.binding(EXCHANGE_NAME, ROUTING_KEY, QUEUE_NAME)).block();
 
         consumeWorkqueue();
     }
 
     private void consumeWorkqueue() {
-        receiver = new RabbitMQExclusiveConsumer(new ReceiverOptions().connectionMono(channelPool.getConnectionMono()));
-        receiverHandle = receiver.consumeExclusiveManualAck(QUEUE_NAME, new ConsumeOptions())
+        receiver = new Receiver(new ReceiverOptions().connectionMono(channelPool.getConnectionMono()));
+        receiverHandle = receiver.consumeManualAck(QUEUE_NAME, new ConsumeOptions())
             .subscribeOn(Schedulers.boundedElastic())
             .flatMap(this::executeTask)
             .subscribe();
@@ -195,10 +197,9 @@ public class RabbitMQWorkQueue implements WorkQueue {
     @Override
     public void close() {
         Optional.ofNullable(receiverHandle).ifPresent(Disposable::dispose);
-        Optional.ofNullable(receiver).ifPresent(RabbitMQExclusiveConsumer::close);
+        Optional.ofNullable(receiver).ifPresent(Receiver::close);
         Optional.ofNullable(sendCancelRequestsQueueHandle).ifPresent(Disposable::dispose);
         Optional.ofNullable(cancelRequestListenerHandle).ifPresent(Disposable::dispose);
-        Optional.ofNullable(cancelRequestSender).ifPresent(Sender::close);
         Optional.ofNullable(cancelRequestListener).ifPresent(Receiver::close);
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org