You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by jh...@apache.org on 2022/09/21 19:21:59 UTC

[james-project] 02/04: james-3805 Fix unstable tests

This is an automated email from the ASF dual-hosted git repository.

jhelou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit a58e3fced0a9205bd63fd4235c09a54428b28e9a
Author: Matthieu Baechler <ma...@apache.org>
AuthorDate: Tue Sep 13 22:06:46 2022 +0200

    james-3805 Fix unstable tests
    
    Co-Authored-By: Jean Helou <jh...@codamens.fr>
---
 .../james/queue/pulsar/PulsarMailQueue.scala       | 24 ++++++++++++++--------
 .../james/queue/pulsar/PulsarMailQueueTest.java    | 14 ++++++++-----
 2 files changed, 25 insertions(+), 13 deletions(-)

diff --git a/server/queue/queue-pulsar/src/main/scala/org/apache/james/queue/pulsar/PulsarMailQueue.scala b/server/queue/queue-pulsar/src/main/scala/org/apache/james/queue/pulsar/PulsarMailQueue.scala
index 3215c727cb..58caee322c 100644
--- a/server/queue/queue-pulsar/src/main/scala/org/apache/james/queue/pulsar/PulsarMailQueue.scala
+++ b/server/queue/queue-pulsar/src/main/scala/org/apache/james/queue/pulsar/PulsarMailQueue.scala
@@ -22,10 +22,8 @@ package org.apache.james.queue.pulsar
 import java.time.{Instant, ZonedDateTime, Duration => JavaDuration}
 import java.util.concurrent.TimeUnit
 import java.util.{Date, UUID}
-
 import javax.mail.MessagingException
 import javax.mail.internet.MimeMessage
-
 import scala.concurrent._
 import scala.concurrent.duration._
 import scala.jdk.CollectionConverters._
@@ -46,7 +44,6 @@ import org.apache.pulsar.client.admin.PulsarAdmin
 import org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException
 import org.apache.pulsar.client.api.{Schema, SubscriptionInitialPosition, SubscriptionType}
 import org.reactivestreams.Publisher
-
 import akka.actor.{ActorRef, ActorSystem}
 import akka.stream.scaladsl.{Flow, Keep, RunnableGraph, Sink, Source, SourceQueueWithComplete, StreamConverters}
 import akka.stream.{Attributes, OverflowStrategy}
@@ -55,9 +52,11 @@ import akka.{Done, NotUsed}
 import com.sksamuel.pulsar4s._
 import com.sksamuel.pulsar4s.akka.streams
 import com.sksamuel.pulsar4s.akka.streams.{CommittableMessage, Control}
-
+import org.slf4j.LoggerFactory
 import play.api.libs.json._
 
+import scala.util.Failure
+
 private[pulsar] object serializers {
   implicit val headerFormat: Format[Header] = Json.format[Header]
   implicit val enqueueIdFormat: Format[EnqueueId] = new Format[EnqueueId] {
@@ -99,6 +98,8 @@ class PulsarMailQueue(
 
   type MessageAsJson = String
 
+  private val logger = LoggerFactory.getLogger(this.getClass)
+
   private val awaitTimeout = 10.seconds
 
   gaugeRegistry.register(QUEUE_SIZE_METRIC_NAME_PREFIX + config.name, () => getSize)
@@ -250,9 +251,11 @@ class PulsarMailQueue(
     ).ask[(Option[MailMetadata], Option[MimeMessagePartsId], CommittableMessage[String])](filterActor)
       .flatMapConcat {
         case (None, Some(partsId), committableMessage) =>
-          committableMessage.ack()
-          deleteMimeMessage(partsId)
-            .flatMapConcat(_ => Source.empty)
+          Source.lazyFuture(() => committableMessage.ack())
+            .flatMapConcat(_ =>
+              deleteMimeMessage(partsId)
+                .flatMapConcat(_ => Source.empty)
+            )
         case (Some(metadata), _, committableMessage) =>
           val partsId = metadata.partsId
           Source
@@ -270,7 +273,12 @@ class PulsarMailQueue(
       if (success) {
         dequeueMetrics.increment()
         Await.ready(message.ack(cumulative = false), awaitTimeout)
-        deleteMimeMessage(partsId).run()
+        val eventualDone = deleteMimeMessage(partsId).run()
+
+        eventualDone.onComplete {
+          case Failure(e) => logger.error("Failed to delete parts {} for mail {}", partsId, mail.getName(), e)
+          case _ => logger.trace("Deleted parts {} for mail {}", partsId, mail.getName())
+        }
       } else {
         Await.ready(message.nack(), awaitTimeout)
       }
diff --git a/server/queue/queue-pulsar/src/test/java/org/apache/james/queue/pulsar/PulsarMailQueueTest.java b/server/queue/queue-pulsar/src/test/java/org/apache/james/queue/pulsar/PulsarMailQueueTest.java
index 36d118b09f..8bbd2524b3 100644
--- a/server/queue/queue-pulsar/src/test/java/org/apache/james/queue/pulsar/PulsarMailQueueTest.java
+++ b/server/queue/queue-pulsar/src/test/java/org/apache/james/queue/pulsar/PulsarMailQueueTest.java
@@ -22,6 +22,7 @@ package org.apache.james.queue.pulsar;
 import static org.apache.james.queue.api.Mails.defaultMail;
 import static org.assertj.core.api.Assertions.assertThat;
 
+import java.time.Duration;
 import java.util.List;
 import java.util.Objects;
 import java.util.Optional;
@@ -33,6 +34,7 @@ import javax.mail.internet.MimeMessage;
 import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.james.backends.pulsar.DockerPulsarExtension;
 import org.apache.james.backends.pulsar.PulsarConfiguration;
+import org.apache.james.blob.api.BlobId;
 import org.apache.james.blob.api.BucketName;
 import org.apache.james.blob.api.HashBlobId;
 import org.apache.james.blob.api.Store;
@@ -61,6 +63,7 @@ import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
+import org.reactivestreams.Publisher;
 
 import com.github.fge.lambdas.Throwing;
 import com.sksamuel.pulsar4s.ConsumerMessage;
@@ -257,8 +260,6 @@ public class PulsarMailQueueTest implements MailQueueContract, MailQueueMetricCo
         Awaitility.await().untilAsserted(this::assertThatStoreIsEmpty);
     }
 
-    @Disabled("JAMES-3805 PulsarMailQueueTest::removeShouldRemoveMailFromStoreWhenFilteredOut is unstable")
-    // https://ci-builds.apache.org/job/james/job/ApacheJames/job/PR-1109/12/testReport/junit/org.apache.james.queue.pulsar/PulsarMailQueueTest/removeShouldRemoveMailFromStoreWhenFilteredOut/
     @Test
     void removeShouldRemoveMailFromStoreWhenFilteredOut() throws Exception {
         enQueue(defaultMail()
@@ -267,7 +268,11 @@ public class PulsarMailQueueTest implements MailQueueContract, MailQueueMetricCo
         enQueue(defaultMail()
                 .name("name2")
                 .build());
+        enQueue(defaultMail()
+                .name("name3")
+                .build());
 
+        //this won't delete the mail from the store until we try a dequeue
         getManageableMailQueue().remove(ManageableMailQueue.Type.Name, "name2");
 
         awaitRemove();
@@ -276,10 +281,9 @@ public class PulsarMailQueueTest implements MailQueueContract, MailQueueMetricCo
                 .toIterable()
                 .extracting(ManageableMailQueue.MailQueueItemView::getMail)
                 .extracting(Mail::getName)
-                .containsExactly("name1");
+                .containsExactly("name1", "name3");
 
-        MailQueue.MailQueueItem mailQueueItem = Flux.from(getMailQueue().deQueue()).blockFirst();
-        mailQueueItem.done(true);
+        Flux.from(getMailQueue().deQueue()).take(2).doOnNext(Throwing.consumer(x -> x.done(true))).blockLast();
         Awaitility.await().untilAsserted(this::assertThatStoreIsEmpty);
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org