You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2020/07/17 02:24:19 UTC
[james-project] 05/31: JAMES-3305 Task manager deserialization error handling
This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit aebfaa90b906e656641f9348844187aa79e44d00
Author: LanKhuat <dl...@linagora.com>
AuthorDate: Tue Jul 14 14:23:43 2020 +0700
JAMES-3305 Task manager deserialization error handling
---
.../distributed/RabbitMQWorkQueue.java | 19 ++--
.../distributed/DistributedTaskManagerTest.java | 122 ++++++++++++++++++++-
.../distributed/RabbitMQWorkQueueTest.java | 5 +-
3 files changed, 133 insertions(+), 13 deletions(-)
diff --git a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java
index f1daf1f..6f5e22e 100644
--- a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java
+++ b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java
@@ -21,6 +21,7 @@
package org.apache.james.task.eventsourcing.distributed;
import static com.rabbitmq.client.MessageProperties.PERSISTENT_TEXT_PLAIN;
+import static org.apache.james.backends.rabbitmq.Constants.REQUEUE;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
@@ -129,13 +130,15 @@ public class RabbitMQWorkQueue implements WorkQueue {
}
private Mono<Task.Result> executeTask(AcknowledgableDelivery delivery) {
- TaskId taskId = TaskId.fromString(delivery.getProperties().getHeaders().get(TASK_ID).toString());
- return Mono.fromCallable(() -> {
- delivery.ack();
- return new String(delivery.getBody(), StandardCharsets.UTF_8);
- }).flatMap(json ->
- deserialize(json, taskId)
- .flatMap(task -> executeOnWorker(taskId, task)));
+ return Mono.fromCallable(() -> TaskId.fromString(delivery.getProperties().getHeaders().get(TASK_ID).toString()))
+ .flatMap(taskId -> deserialize(new String(delivery.getBody(), StandardCharsets.UTF_8), taskId)
+ .doOnNext(task -> delivery.ack())
+ .flatMap(task -> executeOnWorker(taskId, task)))
+ .onErrorResume(error -> {
+ LOGGER.error("Unable to process {} {}", TASK_ID, delivery.getProperties().getHeaders().get(TASK_ID), error);
+ delivery.nack(!REQUEUE);
+ return Mono.empty();
+ });
}
private Mono<Task> deserialize(String json, TaskId taskId) {
@@ -225,4 +228,4 @@ public class RabbitMQWorkQueue implements WorkQueue {
Optional.ofNullable(cancelRequestListenerHandle).ifPresent(Disposable::dispose);
Optional.ofNullable(cancelRequestListener).ifPresent(Receiver::close);
}
-}
+}
\ No newline at end of file
diff --git a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java
index db84283..37e4b7d 100644
--- a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java
+++ b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java
@@ -19,17 +19,27 @@
package org.apache.james.task.eventsourcing.distributed;
+import static com.rabbitmq.client.MessageProperties.PERSISTENT_TEXT_PLAIN;
+import static org.apache.james.backends.cassandra.Scenario.Builder.executeNormally;
+import static org.apache.james.backends.cassandra.Scenario.Builder.fail;
+import static org.apache.james.task.eventsourcing.distributed.RabbitMQWorkQueue.EXCHANGE_NAME;
+import static org.apache.james.task.eventsourcing.distributed.RabbitMQWorkQueue.ROUTING_KEY;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.awaitility.Awaitility.await;
+import static org.awaitility.Duration.FIVE_SECONDS;
+import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import java.util.stream.IntStream;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.james.backends.cassandra.CassandraCluster;
import org.apache.james.backends.cassandra.CassandraClusterExtension;
+import org.apache.james.backends.cassandra.Scenario;
import org.apache.james.backends.cassandra.components.CassandraModule;
import org.apache.james.backends.cassandra.init.CassandraZonedDateTimeModule;
import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionModule;
@@ -56,6 +66,8 @@ import org.apache.james.server.task.json.dto.TaskDTOModule;
import org.apache.james.server.task.json.dto.TestTaskDTOModules;
import org.apache.james.task.CompletedTask;
import org.apache.james.task.CountDownLatchExtension;
+import org.apache.james.task.FailedTask;
+import org.apache.james.task.FailsDeserializationTask;
import org.apache.james.task.Hostname;
import org.apache.james.task.MemoryReferenceTask;
import org.apache.james.task.Task;
@@ -63,6 +75,7 @@ import org.apache.james.task.TaskExecutionDetails;
import org.apache.james.task.TaskId;
import org.apache.james.task.TaskManager;
import org.apache.james.task.TaskManagerContract;
+import org.apache.james.task.TaskWithId;
import org.apache.james.task.WorkQueue;
import org.apache.james.task.eventsourcing.EventSourcingTaskManager;
import org.apache.james.task.eventsourcing.TaskExecutionDetailsProjection;
@@ -78,9 +91,13 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import com.google.common.collect.ImmutableBiMap;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
+import com.rabbitmq.client.AMQP;
+import reactor.core.publisher.Mono;
+import reactor.rabbitmq.OutboundMessage;
import reactor.rabbitmq.Sender;
class DistributedTaskManagerTest implements TaskManagerContract {
@@ -114,6 +131,9 @@ class DistributedTaskManagerTest implements TaskManagerContract {
static final DTOConverter<TaskExecutionDetails.AdditionalInformation, AdditionalInformationDTO> TASK_ADDITIONAL_INFORMATION_DTO_CONVERTER = DTOConverter.of(ADDITIONAL_INFORMATION_MODULE);
static final Hostname HOSTNAME = new Hostname("foo");
static final Hostname HOSTNAME_2 = new Hostname("bar");
+ static final TaskId TASK_ID = TaskId.fromString("2c7f4081-aa30-11e9-bf6c-2d3b9e84aafd");
+ static final Task TASK = new CompletedTask();
+ static final TaskWithId TASK_WITH_ID = new TaskWithId(TASK_ID, TASK);
@RegisterExtension
static final RabbitMQExtension rabbitMQExtension = RabbitMQExtension.singletonRabbitMQ();
@@ -132,6 +152,7 @@ class DistributedTaskManagerTest implements TaskManagerContract {
ImmutableSet<TaskDTOModule<?, ?>> taskDTOModules =
ImmutableSet.of(
+ TestTaskDTOModules.FAILS_DESERIALIZATION_TASK_MODULE,
TestTaskDTOModules.COMPLETED_TASK_MODULE,
TestTaskDTOModules.FAILED_TASK_MODULE,
TestTaskDTOModules.THROWING_TASK_MODULE,
@@ -337,7 +358,7 @@ class DistributedTaskManagerTest implements TaskManagerContract {
}
@Test
- void givenTwoTaskManagerIfTheFirstOneIsDownTheSecondOneShouldBeAbleToRunTheRemainingTasks(CountDownLatch countDownLatch) throws Exception {
+ void givenTwoTaskManagerIfTheFirstOneIsDownTheSecondOneShouldBeAbleToRunTheRemainingTasks(CountDownLatch countDownLatch) {
try (EventSourcingTaskManager taskManager1 = taskManager();
EventSourcingTaskManager taskManager2 = taskManager(HOSTNAME_2)) {
ImmutableBiMap<EventSourcingTaskManager, Hostname> hostNameByTaskManager = ImmutableBiMap.of(taskManager1, HOSTNAME, taskManager2, HOSTNAME_2);
@@ -364,6 +385,105 @@ class DistributedTaskManagerTest implements TaskManagerContract {
}
}
+ @Test
+ void shouldNotCrashWhenBadMessage() {
+ TaskManager taskManager = taskManager(HOSTNAME);
+
+ taskManager.submit(new FailsDeserializationTask());
+
+ TaskId id = taskManager.submit(TASK);
+
+ awaitUntilTaskHasStatus(id, TaskManager.Status.COMPLETED, taskManager);
+ }
+
+ @Test
+ void shouldNotCrashWhenBadMessages() {
+ TaskManager taskManager = taskManager(HOSTNAME);
+
+ IntStream.range(0, 100).forEach(i -> taskManager.submit(new FailsDeserializationTask()));
+
+ TaskId id = taskManager.submit(TASK);
+
+ awaitUntilTaskHasStatus(id, TaskManager.Status.COMPLETED, taskManager);
+ }
+
+ @Test
+ void shouldNotCrashWhenInvalidHeader() throws Exception {
+ TaskManager taskManager = taskManager(HOSTNAME);
+
+ AMQP.BasicProperties badProperties = new AMQP.BasicProperties.Builder()
+ .deliveryMode(PERSISTENT_TEXT_PLAIN.getDeliveryMode())
+ .priority(PERSISTENT_TEXT_PLAIN.getPriority())
+ .contentType(PERSISTENT_TEXT_PLAIN.getContentType())
+ .headers(ImmutableMap.of("abc", TASK_WITH_ID.getId().asString()))
+ .build();
+
+ rabbitMQExtension.getSender()
+ .send(Mono.just(new OutboundMessage(EXCHANGE_NAME,
+ ROUTING_KEY, badProperties, taskSerializer.serialize(TASK_WITH_ID.getTask()).getBytes(StandardCharsets.UTF_8))))
+ .block();
+
+ TaskId taskId = taskManager.submit(TASK);
+
+ await().atMost(FIVE_SECONDS).until(() -> taskManager.list(TaskManager.Status.COMPLETED).size() == 1);
+
+ assertThat(taskManager.getExecutionDetails(taskId).getStatus())
+ .isEqualTo(TaskManager.Status.COMPLETED);
+ }
+
+ @Test
+ void shouldNotCrashWhenInvalidTaskId() throws Exception {
+ TaskManager taskManager = taskManager(HOSTNAME);
+
+ AMQP.BasicProperties badProperties = new AMQP.BasicProperties.Builder()
+ .deliveryMode(PERSISTENT_TEXT_PLAIN.getDeliveryMode())
+ .priority(PERSISTENT_TEXT_PLAIN.getPriority())
+ .contentType(PERSISTENT_TEXT_PLAIN.getContentType())
+ .headers(ImmutableMap.of("taskId", "BAD_ID"))
+ .build();
+
+ rabbitMQExtension.getSender()
+ .send(Mono.just(new OutboundMessage(EXCHANGE_NAME,
+ ROUTING_KEY, badProperties, taskSerializer.serialize(TASK_WITH_ID.getTask()).getBytes(StandardCharsets.UTF_8))))
+ .block();
+
+ TaskId taskId = taskManager.submit(TASK);
+
+ await().atMost(FIVE_SECONDS).until(() -> taskManager.list(TaskManager.Status.COMPLETED).size() == 1);
+
+ assertThat(taskManager.getExecutionDetails(taskId).getStatus())
+ .isEqualTo(TaskManager.Status.COMPLETED);
+ }
+
+ @Test
+ void shouldNotCrashWhenErrorHandlingFails(CassandraCluster cassandra) throws Exception {
+ TaskManager taskManager = taskManager(HOSTNAME);
+
+ cassandra.getConf().printStatements();
+ cassandra.getConf().registerScenario(Scenario.combine(
+ executeNormally()
+ .times(2) // submit + inProgress
+ .whenQueryStartsWith("INSERT INTO eventStore"),
+ executeNormally()
+ .times(2) // submit + inProgress
+ .whenQueryStartsWith("INSERT INTO taskExecutionDetailsProjection"),
+ fail()
+ .forever()
+ .whenQueryStartsWith("INSERT INTO eventStore"),
+ fail()
+ .forever()
+ .whenQueryStartsWith("INSERT INTO taskExecutionDetailsProjection")));
+ taskManager.submit(new FailedTask());
+
+ Thread.sleep(1000);
+
+ cassandra.getConf().registerScenario(Scenario.NOTHING);
+
+ TaskId id2 = taskManager.submit(new CompletedTask());
+
+ awaitUntilTaskHasStatus(id2, TaskManager.Status.COMPLETED, taskManager);
+ }
+
private Hostname getOtherNode(ImmutableBiMap<EventSourcingTaskManager, Hostname> hostNameByTaskManager, Hostname node) {
return hostNameByTaskManager
.values()
diff --git a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueTest.java b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueTest.java
index 4eb0323..5832863 100644
--- a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueTest.java
+++ b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueTest.java
@@ -25,7 +25,6 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.awaitility.Awaitility.await;
import static org.awaitility.Duration.FIVE_HUNDRED_MILLISECONDS;
import static org.awaitility.Duration.TWO_SECONDS;
-import static org.mockito.Mockito.spy;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.IntStream;
@@ -67,7 +66,7 @@ class RabbitMQWorkQueueTest {
@BeforeEach
void setUp() {
- worker = spy(new ImmediateWorker());
+ worker = new ImmediateWorker();
serializer = JsonTaskSerializer.of(TestTaskDTOModules.COMPLETED_TASK_MODULE, TestTaskDTOModules.MEMORY_REFERENCE_TASK_MODULE.apply(new MemoryReferenceTaskStore()));
testee = new RabbitMQWorkQueue(worker, rabbitMQExtension.getSender(), rabbitMQExtension.getReceiverProvider(), serializer);
testee.start();
@@ -160,7 +159,5 @@ class RabbitMQWorkQueueTest {
assertThatThrownBy(() -> await().atMost(FIVE_HUNDRED_MILLISECONDS).untilAtomic(counter, CoreMatchers.equalTo(3L))).isInstanceOf(ConditionTimeoutException.class);
assertThatCode(() -> await().atMost(TWO_SECONDS).untilAtomic(counter, CoreMatchers.equalTo(3L))).doesNotThrowAnyException();
-
}
-
}
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org