You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by bt...@apache.org on 2022/09/22 02:51:04 UTC
[james-project] 07/07: task/task-distributed - fixing NullPointerException when executeTask
This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch 3.7.x
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit ed7660c8b12cc644933cce21bef55d60489a4748
Author: Tung Van TRAN <vt...@linagora.com>
AuthorDate: Thu Jul 28 00:44:55 2022 +0700
task/task-distributed - fixing NullPointerException when executeTask
---
.../james/task/eventsourcing/distributed/RabbitMQWorkQueue.java | 7 +++++--
.../eventsourcing/distributed/DistributedTaskManagerTest.java | 8 +++++++-
2 files changed, 12 insertions(+), 3 deletions(-)
diff --git a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java
index 60c9a3b5ae..1b3efd113f 100644
--- a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java
+++ b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java
@@ -151,8 +151,11 @@ public class RabbitMQWorkQueue implements WorkQueue {
}
private Mono<Task.Result> executeTask(AcknowledgableDelivery delivery) {
- return Mono.fromCallable(() -> TaskId.fromString(delivery.getProperties().getHeaders().get(TASK_ID).toString()))
- .flatMap(taskId -> deserialize(new String(delivery.getBody(), StandardCharsets.UTF_8), taskId)
+ return Mono.fromCallable(() -> delivery.getProperties().getHeaders())
+ .map(headers -> headers.get(TASK_ID))
+ .map(taskIdValue -> TaskId.fromString(taskIdValue.toString()))
+ .flatMap(taskId -> Mono.fromCallable(() -> new String(delivery.getBody(), StandardCharsets.UTF_8))
+ .flatMap(bodyValue -> deserialize(bodyValue, taskId))
.doOnNext(task -> delivery.ack())
.flatMap(task -> executeOnWorker(taskId, task)))
.onErrorResume(error -> {
diff --git a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java
index de2a3caf4d..d220a5f451 100644
--- a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java
+++ b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java
@@ -46,6 +46,7 @@ import org.apache.james.backends.cassandra.components.CassandraModule;
import org.apache.james.backends.cassandra.init.CassandraZonedDateTimeModule;
import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionModule;
import org.apache.james.backends.rabbitmq.RabbitMQExtension;
+import org.apache.james.backends.rabbitmq.RabbitMQManagementAPI;
import org.apache.james.backends.rabbitmq.ReceiverProvider;
import org.apache.james.eventsourcing.Event;
import org.apache.james.eventsourcing.EventSourcingSystem;
@@ -189,6 +190,8 @@ class DistributedTaskManagerTest implements TaskManagerContract {
@BeforeEach
void setUp(EventStore eventStore) {
+ memoryReferenceTaskStore = new MemoryReferenceTaskStore();
+ memoryReferenceWithCounterTaskStore = new MemoryReferenceWithCounterTaskStore();
CassandraCluster cassandra = CASSANDRA_CLUSTER.getCassandraCluster();
CassandraTaskExecutionDetailsProjectionDAO projectionDAO = new CassandraTaskExecutionDetailsProjectionDAO(cassandra.getConf(), cassandra.getTypesProvider(), JSON_TASK_ADDITIONAL_INFORMATION_SERIALIZER);
this.executionDetailsProjection = new CassandraTaskExecutionDetailsProjection(projectionDAO);
@@ -203,9 +206,12 @@ class DistributedTaskManagerTest implements TaskManagerContract {
}
@AfterEach
- void tearDown() {
+ void tearDown() throws Exception {
terminationSubscribers.forEach(RabbitMQTerminationSubscriber::close);
workQueueSupplier.stopWorkQueues();
+ RabbitMQManagementAPI managementAPI = rabbitMQExtension.managementAPI();
+ managementAPI.listQueues()
+ .forEach(queue -> managementAPI.deleteQueue("/", queue.getName()));
}
public EventSourcingTaskManager taskManager() {
---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org