You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by ro...@apache.org on 2019/11/26 15:31:40 UTC
[james-project] 05/07: JAMES-2813 replace exclusive consumer usage
with a queue with single active consumer
This is an automated email from the ASF dual-hosted git repository.
rouazana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 8ef3cabdcd3f47c42a66bc405b5c0096ef6afd3f
Author: RĂ©mi KOWALSKI <rk...@linagora.com>
AuthorDate: Mon Oct 21 10:33:22 2019 +0200
JAMES-2813 replace exclusive consumer usage with a queue with single active consumer
---
.../distributed/RabbitMQExclusiveConsumer.java | 174 ---------------------
.../distributed/RabbitMQWorkQueue.java | 13 +-
2 files changed, 7 insertions(+), 180 deletions(-)
diff --git a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQExclusiveConsumer.java b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQExclusiveConsumer.java
deleted file mode 100644
index e6d4e1c..0000000
--- a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQExclusiveConsumer.java
+++ /dev/null
@@ -1,174 +0,0 @@
-/****************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one *
- * or more contributor license agreements. See the NOTICE file *
- * distributed with this work for additional information *
- * regarding copyright ownership. The ASF licenses this file *
- * to you under the Apache License, Version 2.0 (the *
- * "License"); you may not use this file except in compliance *
- * with the License. You may obtain a copy of the License at *
- * *
- * http://www.apache.org/licenses/LICENSE-2.0 *
- * *
- * Unless required by applicable law or agreed to in writing, *
- * software distributed under the License is distributed on an *
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
- * KIND, either express or implied. See the License for the *
- * specific language governing permissions and limitations *
- * under the License. *
- ****************************************************************/
-package org.apache.james.task.eventsourcing.distributed;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.function.Function;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.ImmutableMap;
-import com.rabbitmq.client.CancelCallback;
-import com.rabbitmq.client.Channel;
-import com.rabbitmq.client.Connection;
-import com.rabbitmq.client.DeliverCallback;
-import com.rabbitmq.client.impl.recovery.AutorecoveringConnection;
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.FluxSink;
-import reactor.core.publisher.Mono;
-import reactor.core.scheduler.Scheduler;
-import reactor.core.scheduler.Schedulers;
-import reactor.rabbitmq.AcknowledgableDelivery;
-import reactor.rabbitmq.ConsumeOptions;
-import reactor.rabbitmq.RabbitFluxException;
-import reactor.rabbitmq.ReceiverOptions;
-
-/**
- * Taken from {@link reactor.rabbitmq.Receiver}
- * In order to be able to set the `exclusive` parameter to `true`
- * to the `channel.basicConsume` method.
- *
- * @deprecated to remove once the parallel execution of task has been implemented
- */
-@Deprecated
-public class RabbitMQExclusiveConsumer implements Closeable {
- private static final Function<Connection, Channel> CHANNEL_CREATION_FUNCTION = new RabbitMQExclusiveConsumer.ChannelCreationFunction();
- private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMQExclusiveConsumer.class);
-
- private static final boolean NON_LOCAL = true;
- private static final boolean EXCLUSIVE = true;
- private static final boolean AUTO_ACK = true;
-
- private static class ChannelCreationFunction implements Function<Connection, Channel> {
-
- @Override
- public Channel apply(Connection connection) {
- try {
- return connection.createChannel();
- } catch (IOException e) {
- throw new RabbitFluxException("Error while creating channel", e);
- }
- }
- }
-
- private Mono<? extends Connection> connectionMono;
- private final AtomicBoolean hasConnection;
- private final Scheduler connectionSubscriptionScheduler;
- private final boolean privateConnectionSubscriptionScheduler;
-
- public RabbitMQExclusiveConsumer(ReceiverOptions options) {
- this.privateConnectionSubscriptionScheduler = options.getConnectionSubscriptionScheduler() == null;
- this.connectionSubscriptionScheduler = privateConnectionSubscriptionScheduler ?
- createScheduler("rabbitmq-receiver-connection-subscription") : options.getConnectionSubscriptionScheduler();
- hasConnection = new AtomicBoolean(false);
- this.connectionMono = options.getConnectionMono() != null ? options.getConnectionMono() :
- Mono.fromCallable(() -> options.getConnectionFactory().newConnection())
- .doOnSubscribe(c -> hasConnection.set(true))
- .subscribeOn(this.connectionSubscriptionScheduler)
- .cache();
- }
-
- protected Scheduler createScheduler(String name) {
- return Schedulers.newBoundedElastic(Schedulers.DEFAULT_BOUNDED_ELASTIC_SIZE, Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE, name);
- }
-
- public Flux<AcknowledgableDelivery> consumeExclusiveManualAck(final String queue, ConsumeOptions options) {
- // TODO track flux so it can be disposed when the sender is closed?
- // could be also developer responsibility
- return Flux.create(emitter -> connectionMono.map(CHANNEL_CREATION_FUNCTION).subscribe(channel -> {
- try {
- if (options.getQos() != 0) {
- channel.basicQos(options.getQos());
- }
-
- DeliverCallback deliverCallback = (consumerTag, message) -> {
- AcknowledgableDelivery delivery = new AcknowledgableDelivery(message, channel, options.getExceptionHandler());
- if (options.getHookBeforeEmitBiFunction().apply(emitter.requestedFromDownstream(), delivery)) {
- emitter.next(delivery);
- }
- if (options.getStopConsumingBiFunction().apply(emitter.requestedFromDownstream(), message)) {
- emitter.complete();
- }
- };
-
- AtomicBoolean basicCancel = new AtomicBoolean(true);
- CancelCallback cancelCallback = consumerTag -> {
- LOGGER.info("Flux consumer {} has been cancelled", consumerTag);
- basicCancel.set(false);
- emitter.complete();
- };
-
- completeOnChannelShutdown(channel, emitter);
-
- Map<String, Object> arguments = ImmutableMap.of();
- final String consumerTag = channel.basicConsume(queue, !AUTO_ACK, UUID.randomUUID().toString(), !NON_LOCAL, EXCLUSIVE, arguments, deliverCallback, cancelCallback);
- AtomicBoolean cancelled = new AtomicBoolean(false);
- LOGGER.info("Consumer {} consuming from {} has been registered", consumerTag, queue);
- emitter.onDispose(() -> {
- LOGGER.info("Cancelling consumer {} consuming from {}", consumerTag, queue);
- if (cancelled.compareAndSet(false, true)) {
- try {
- if (channel.isOpen() && channel.getConnection().isOpen()) {
- if (basicCancel.compareAndSet(true, false)) {
- channel.basicCancel(consumerTag);
- }
- channel.close();
- }
- } catch (TimeoutException | IOException e) {
- // Not sure what to do, not much we can do,
- // logging should be enough.
- // Maybe one good reason to introduce an exception handler to choose more easily.
- LOGGER.warn("Error while closing channel: " + e.getMessage());
- }
- }
- });
- } catch (IOException e) {
- throw new RabbitFluxException(e);
- }
- }, emitter::error), options.getOverflowStrategy());
- }
-
- protected void completeOnChannelShutdown(Channel channel, FluxSink<?> emitter) {
- channel.addShutdownListener(reason -> {
- if (!AutorecoveringConnection.DEFAULT_CONNECTION_RECOVERY_TRIGGERING_CONDITION.test(reason)) {
- emitter.complete();
- }
- });
- }
-
- public void close() {
- if (hasConnection.getAndSet(false)) {
- try {
- // FIXME use timeout on block (should be a parameter of the Receiver)
- connectionMono.block().close();
- } catch (IOException e) {
- throw new RabbitFluxException(e);
- }
- }
- if (privateConnectionSubscriptionScheduler) {
- this.connectionSubscriptionScheduler.dispose();
- }
- }
-}
diff --git a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java
index 62a2f1b..54d8b03 100644
--- a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java
+++ b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java
@@ -24,6 +24,7 @@ import java.nio.charset.StandardCharsets;
import java.util.Optional;
import java.util.UUID;
+import org.apache.james.backends.rabbitmq.Constants;
import org.apache.james.backends.rabbitmq.ReactorRabbitMQChannelPool;
import org.apache.james.server.task.json.JsonTaskSerializer;
import org.apache.james.task.Task;
@@ -70,7 +71,8 @@ public class RabbitMQWorkQueue implements WorkQueue {
private final TaskManagerWorker worker;
private final ReactorRabbitMQChannelPool channelPool;
private final JsonTaskSerializer taskSerializer;
- private RabbitMQExclusiveConsumer receiver;
+ private Sender sender;
+ private Receiver receiver;
private UnicastProcessor<TaskId> sendCancelRequestsQueue;
private Disposable sendCancelRequestsQueueHandle;
private Disposable receiverHandle;
@@ -92,15 +94,15 @@ public class RabbitMQWorkQueue implements WorkQueue {
private void startWorkqueue() {
channelPool.getSender().declareExchange(ExchangeSpecification.exchange(EXCHANGE_NAME)).block();
- channelPool.getSender().declare(QueueSpecification.queue(QUEUE_NAME).durable(true)).block();
+ channelPool.getSender().declare(QueueSpecification.queue(QUEUE_NAME).durable(true).arguments(Constants.WITH_SINGLE_ACTIVE_CONSUMER)).block();
channelPool.getSender().bind(BindingSpecification.binding(EXCHANGE_NAME, ROUTING_KEY, QUEUE_NAME)).block();
consumeWorkqueue();
}
private void consumeWorkqueue() {
- receiver = new RabbitMQExclusiveConsumer(new ReceiverOptions().connectionMono(channelPool.getConnectionMono()));
- receiverHandle = receiver.consumeExclusiveManualAck(QUEUE_NAME, new ConsumeOptions())
+ receiver = new Receiver(new ReceiverOptions().connectionMono(channelPool.getConnectionMono()));
+ receiverHandle = receiver.consumeManualAck(QUEUE_NAME, new ConsumeOptions())
.subscribeOn(Schedulers.boundedElastic())
.flatMap(this::executeTask)
.subscribe();
@@ -195,10 +197,9 @@ public class RabbitMQWorkQueue implements WorkQueue {
@Override
public void close() {
Optional.ofNullable(receiverHandle).ifPresent(Disposable::dispose);
- Optional.ofNullable(receiver).ifPresent(RabbitMQExclusiveConsumer::close);
+ Optional.ofNullable(receiver).ifPresent(Receiver::close);
Optional.ofNullable(sendCancelRequestsQueueHandle).ifPresent(Disposable::dispose);
Optional.ofNullable(cancelRequestListenerHandle).ifPresent(Disposable::dispose);
- Optional.ofNullable(cancelRequestSender).ifPresent(Sender::close);
Optional.ofNullable(cancelRequestListener).ifPresent(Receiver::close);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org