You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by jh...@apache.org on 2022/09/21 19:21:59 UTC
[james-project] 02/04: james-3805 Fix unstable tests
This is an automated email from the ASF dual-hosted git repository.
jhelou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit a58e3fced0a9205bd63fd4235c09a54428b28e9a
Author: Matthieu Baechler <ma...@apache.org>
AuthorDate: Tue Sep 13 22:06:46 2022 +0200
james-3805 Fix unstable tests
Co-Authored-By: Jean Helou <jh...@codamens.fr>
---
.../james/queue/pulsar/PulsarMailQueue.scala | 24 ++++++++++++++--------
.../james/queue/pulsar/PulsarMailQueueTest.java | 14 ++++++++-----
2 files changed, 25 insertions(+), 13 deletions(-)
diff --git a/server/queue/queue-pulsar/src/main/scala/org/apache/james/queue/pulsar/PulsarMailQueue.scala b/server/queue/queue-pulsar/src/main/scala/org/apache/james/queue/pulsar/PulsarMailQueue.scala
index 3215c727cb..58caee322c 100644
--- a/server/queue/queue-pulsar/src/main/scala/org/apache/james/queue/pulsar/PulsarMailQueue.scala
+++ b/server/queue/queue-pulsar/src/main/scala/org/apache/james/queue/pulsar/PulsarMailQueue.scala
@@ -22,10 +22,8 @@ package org.apache.james.queue.pulsar
import java.time.{Instant, ZonedDateTime, Duration => JavaDuration}
import java.util.concurrent.TimeUnit
import java.util.{Date, UUID}
-
import javax.mail.MessagingException
import javax.mail.internet.MimeMessage
-
import scala.concurrent._
import scala.concurrent.duration._
import scala.jdk.CollectionConverters._
@@ -46,7 +44,6 @@ import org.apache.pulsar.client.admin.PulsarAdmin
import org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException
import org.apache.pulsar.client.api.{Schema, SubscriptionInitialPosition, SubscriptionType}
import org.reactivestreams.Publisher
-
import akka.actor.{ActorRef, ActorSystem}
import akka.stream.scaladsl.{Flow, Keep, RunnableGraph, Sink, Source, SourceQueueWithComplete, StreamConverters}
import akka.stream.{Attributes, OverflowStrategy}
@@ -55,9 +52,11 @@ import akka.{Done, NotUsed}
import com.sksamuel.pulsar4s._
import com.sksamuel.pulsar4s.akka.streams
import com.sksamuel.pulsar4s.akka.streams.{CommittableMessage, Control}
-
+import org.slf4j.LoggerFactory
import play.api.libs.json._
+import scala.util.Failure
+
private[pulsar] object serializers {
implicit val headerFormat: Format[Header] = Json.format[Header]
implicit val enqueueIdFormat: Format[EnqueueId] = new Format[EnqueueId] {
@@ -99,6 +98,8 @@ class PulsarMailQueue(
type MessageAsJson = String
+ private val logger = LoggerFactory.getLogger(this.getClass)
+
private val awaitTimeout = 10.seconds
gaugeRegistry.register(QUEUE_SIZE_METRIC_NAME_PREFIX + config.name, () => getSize)
@@ -250,9 +251,11 @@ class PulsarMailQueue(
).ask[(Option[MailMetadata], Option[MimeMessagePartsId], CommittableMessage[String])](filterActor)
.flatMapConcat {
case (None, Some(partsId), committableMessage) =>
- committableMessage.ack()
- deleteMimeMessage(partsId)
- .flatMapConcat(_ => Source.empty)
+ Source.lazyFuture(() => committableMessage.ack())
+ .flatMapConcat(_ =>
+ deleteMimeMessage(partsId)
+ .flatMapConcat(_ => Source.empty)
+ )
case (Some(metadata), _, committableMessage) =>
val partsId = metadata.partsId
Source
@@ -270,7 +273,12 @@ class PulsarMailQueue(
if (success) {
dequeueMetrics.increment()
Await.ready(message.ack(cumulative = false), awaitTimeout)
- deleteMimeMessage(partsId).run()
+ val eventualDone = deleteMimeMessage(partsId).run()
+
+ eventualDone.onComplete {
+ case Failure(e) => logger.error("Failed to delete parts {} for mail {}", partsId, mail.getName(), e)
+ case _ => logger.trace("Deleted parts {} for mail {}", partsId, mail.getName())
+ }
} else {
Await.ready(message.nack(), awaitTimeout)
}
diff --git a/server/queue/queue-pulsar/src/test/java/org/apache/james/queue/pulsar/PulsarMailQueueTest.java b/server/queue/queue-pulsar/src/test/java/org/apache/james/queue/pulsar/PulsarMailQueueTest.java
index 36d118b09f..8bbd2524b3 100644
--- a/server/queue/queue-pulsar/src/test/java/org/apache/james/queue/pulsar/PulsarMailQueueTest.java
+++ b/server/queue/queue-pulsar/src/test/java/org/apache/james/queue/pulsar/PulsarMailQueueTest.java
@@ -22,6 +22,7 @@ package org.apache.james.queue.pulsar;
import static org.apache.james.queue.api.Mails.defaultMail;
import static org.assertj.core.api.Assertions.assertThat;
+import java.time.Duration;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
@@ -33,6 +34,7 @@ import javax.mail.internet.MimeMessage;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.james.backends.pulsar.DockerPulsarExtension;
import org.apache.james.backends.pulsar.PulsarConfiguration;
+import org.apache.james.blob.api.BlobId;
import org.apache.james.blob.api.BucketName;
import org.apache.james.blob.api.HashBlobId;
import org.apache.james.blob.api.Store;
@@ -61,6 +63,7 @@ import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
+import org.reactivestreams.Publisher;
import com.github.fge.lambdas.Throwing;
import com.sksamuel.pulsar4s.ConsumerMessage;
@@ -257,8 +260,6 @@ public class PulsarMailQueueTest implements MailQueueContract, MailQueueMetricCo
Awaitility.await().untilAsserted(this::assertThatStoreIsEmpty);
}
- @Disabled("JAMES-3805 PulsarMailQueueTest::removeShouldRemoveMailFromStoreWhenFilteredOut is unstable")
- // https://ci-builds.apache.org/job/james/job/ApacheJames/job/PR-1109/12/testReport/junit/org.apache.james.queue.pulsar/PulsarMailQueueTest/removeShouldRemoveMailFromStoreWhenFilteredOut/
@Test
void removeShouldRemoveMailFromStoreWhenFilteredOut() throws Exception {
enQueue(defaultMail()
@@ -267,7 +268,11 @@ public class PulsarMailQueueTest implements MailQueueContract, MailQueueMetricCo
enQueue(defaultMail()
.name("name2")
.build());
+ enQueue(defaultMail()
+ .name("name3")
+ .build());
+ // remove() alone does not delete the mail from the store; the deletion only happens once a dequeue is attempted
getManageableMailQueue().remove(ManageableMailQueue.Type.Name, "name2");
awaitRemove();
@@ -276,10 +281,9 @@ public class PulsarMailQueueTest implements MailQueueContract, MailQueueMetricCo
.toIterable()
.extracting(ManageableMailQueue.MailQueueItemView::getMail)
.extracting(Mail::getName)
- .containsExactly("name1");
+ .containsExactly("name1", "name3");
- MailQueue.MailQueueItem mailQueueItem = Flux.from(getMailQueue().deQueue()).blockFirst();
- mailQueueItem.done(true);
+ Flux.from(getMailQueue().deQueue()).take(2).doOnNext(Throwing.consumer(x -> x.done(true))).blockLast();
Awaitility.await().untilAsserted(this::assertThatStoreIsEmpty);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org