You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by bt...@apache.org on 2022/09/22 02:51:04 UTC

[james-project] 07/07: task/task-distributed - fixing NullPointerException when executeTask

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch 3.7.x
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit ed7660c8b12cc644933cce21bef55d60489a4748
Author: Tung Van TRAN <vt...@linagora.com>
AuthorDate: Thu Jul 28 00:44:55 2022 +0700

    task/task-distributed - fixing NullPointerException when executeTask
---
 .../james/task/eventsourcing/distributed/RabbitMQWorkQueue.java   | 7 +++++--
 .../eventsourcing/distributed/DistributedTaskManagerTest.java     | 8 +++++++-
 2 files changed, 12 insertions(+), 3 deletions(-)

diff --git a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java
index 60c9a3b5ae..1b3efd113f 100644
--- a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java
+++ b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java
@@ -151,8 +151,11 @@ public class RabbitMQWorkQueue implements WorkQueue {
     }
 
     private Mono<Task.Result> executeTask(AcknowledgableDelivery delivery) {
-        return Mono.fromCallable(() -> TaskId.fromString(delivery.getProperties().getHeaders().get(TASK_ID).toString()))
-            .flatMap(taskId -> deserialize(new String(delivery.getBody(), StandardCharsets.UTF_8), taskId)
+        return Mono.fromCallable(() -> delivery.getProperties().getHeaders())
+            .map(headers -> headers.get(TASK_ID))
+            .map(taskIdValue -> TaskId.fromString(taskIdValue.toString()))
+            .flatMap(taskId -> Mono.fromCallable(() -> new String(delivery.getBody(), StandardCharsets.UTF_8))
+                .flatMap(bodyValue -> deserialize(bodyValue, taskId))
                 .doOnNext(task -> delivery.ack())
                 .flatMap(task -> executeOnWorker(taskId, task)))
             .onErrorResume(error -> {
diff --git a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java
index de2a3caf4d..d220a5f451 100644
--- a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java
+++ b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java
@@ -46,6 +46,7 @@ import org.apache.james.backends.cassandra.components.CassandraModule;
 import org.apache.james.backends.cassandra.init.CassandraZonedDateTimeModule;
 import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionModule;
 import org.apache.james.backends.rabbitmq.RabbitMQExtension;
+import org.apache.james.backends.rabbitmq.RabbitMQManagementAPI;
 import org.apache.james.backends.rabbitmq.ReceiverProvider;
 import org.apache.james.eventsourcing.Event;
 import org.apache.james.eventsourcing.EventSourcingSystem;
@@ -189,6 +190,8 @@ class DistributedTaskManagerTest implements TaskManagerContract {
 
     @BeforeEach
     void setUp(EventStore eventStore) {
+        memoryReferenceTaskStore = new MemoryReferenceTaskStore();
+        memoryReferenceWithCounterTaskStore = new MemoryReferenceWithCounterTaskStore();
         CassandraCluster cassandra = CASSANDRA_CLUSTER.getCassandraCluster();
         CassandraTaskExecutionDetailsProjectionDAO projectionDAO = new CassandraTaskExecutionDetailsProjectionDAO(cassandra.getConf(), cassandra.getTypesProvider(), JSON_TASK_ADDITIONAL_INFORMATION_SERIALIZER);
         this.executionDetailsProjection = new CassandraTaskExecutionDetailsProjection(projectionDAO);
@@ -203,9 +206,12 @@ class DistributedTaskManagerTest implements TaskManagerContract {
     }
 
     @AfterEach
-    void tearDown() {
+    void tearDown() throws Exception {
         terminationSubscribers.forEach(RabbitMQTerminationSubscriber::close);
         workQueueSupplier.stopWorkQueues();
+        RabbitMQManagementAPI managementAPI = rabbitMQExtension.managementAPI();
+        managementAPI.listQueues()
+            .forEach(queue -> managementAPI.deleteQueue("/", queue.getName()));
     }
 
     public EventSourcingTaskManager taskManager() {


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org