You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by jh...@apache.org on 2022/09/21 19:21:58 UTC

[james-project] 01/04: james-3805 fix failing test

This is an automated email from the ASF dual-hosted git repository.

jhelou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 14cdaadde7f764e5a3e47677a333b4e8890337b1
Author: Matthieu Baechler <ma...@apache.org>
AuthorDate: Tue Sep 6 22:48:47 2022 +0200

    james-3805 fix failing test
    
    Co-Authored-By: Jean Helou <jh...@codamens.fr>
---
 .../backends/pulsar/DockerPulsarExtension.java     |  6 ++-
 .../apache/james/queue/api/MailQueueContract.java  |  6 ++-
 .../james/queue/pulsar/PulsarMailQueue.scala       | 45 ++++++++++------------
 .../pulsar/PulsarMailQueueConfiguration.scala      | 12 ++++++
 .../queue/pulsar/PulsarMailQueueFactory.scala      | 15 ++++----
 .../james/queue/pulsar/PulsarMailQueueTest.java    | 25 ++++++------
 6 files changed, 60 insertions(+), 49 deletions(-)

diff --git a/backends-common/pulsar/src/test/java/org/apache/james/backends/pulsar/DockerPulsarExtension.java b/backends-common/pulsar/src/test/java/org/apache/james/backends/pulsar/DockerPulsarExtension.java
index 138cb14150..957de0c7ed 100644
--- a/backends-common/pulsar/src/test/java/org/apache/james/backends/pulsar/DockerPulsarExtension.java
+++ b/backends-common/pulsar/src/test/java/org/apache/james/backends/pulsar/DockerPulsarExtension.java
@@ -65,8 +65,10 @@ public class DockerPulsarExtension implements
     }
 
     PulsarConfiguration pulsarConfiguration() {
-        return new PulsarConfiguration(container.getPulsarBrokerUrl(),
-                container.getHttpServiceUrl(), new Namespace("public/" + RandomStringUtils.randomAlphabetic(10)));
+        return new PulsarConfiguration(
+            container.getPulsarBrokerUrl(),
+            container.getHttpServiceUrl(),
+            new Namespace("public/" + RandomStringUtils.randomAlphabetic(10)));
     }
 
     public PulsarConfiguration getConfiguration() {
diff --git a/server/queue/queue-api/src/test/java/org/apache/james/queue/api/MailQueueContract.java b/server/queue/queue-api/src/test/java/org/apache/james/queue/api/MailQueueContract.java
index e237d5ea3d..dc32a86428 100644
--- a/server/queue/queue-api/src/test/java/org/apache/james/queue/api/MailQueueContract.java
+++ b/server/queue/queue-api/src/test/java/org/apache/james/queue/api/MailQueueContract.java
@@ -67,6 +67,10 @@ public interface MailQueueContract {
 
     MailQueue getMailQueue();
 
+    default int getMailQueueMaxConcurrency() {
+        return Integer.MAX_VALUE;
+    }
+
     default void enQueue(Mail mail) throws MailQueue.MailQueueException {
         getMailQueue().enQueue(mail);
     }
@@ -571,7 +575,7 @@ public interface MailQueueContract {
         Flux.range(0, nbMails)
             .flatMap(Throwing.function(i -> testee.enqueueReactive(defaultMail()
                 .name("name" + i)
-                .build())))
+                .build())), getMailQueueMaxConcurrency())
             .blockLast();
 
         ConcurrentLinkedDeque<Mail> dequeuedMails = new ConcurrentLinkedDeque<>();
diff --git a/server/queue/queue-pulsar/src/main/scala/org/apache/james/queue/pulsar/PulsarMailQueue.scala b/server/queue/queue-pulsar/src/main/scala/org/apache/james/queue/pulsar/PulsarMailQueue.scala
index c332149da3..3215c727cb 100644
--- a/server/queue/queue-pulsar/src/main/scala/org/apache/james/queue/pulsar/PulsarMailQueue.scala
+++ b/server/queue/queue-pulsar/src/main/scala/org/apache/james/queue/pulsar/PulsarMailQueue.scala
@@ -32,7 +32,7 @@ import scala.jdk.CollectionConverters._
 import scala.jdk.DurationConverters._
 import scala.math.Ordered.orderingToOrdered
 
-import org.apache.james.backends.pulsar.{PulsarConfiguration, PulsarReader}
+import org.apache.james.backends.pulsar.PulsarReader
 import org.apache.james.blob.api.{BlobId, ObjectNotFoundException, Store}
 import org.apache.james.blob.mail.MimeMessagePartsId
 import org.apache.james.core.{MailAddress, MaybeSender}
@@ -85,8 +85,7 @@ private[pulsar] object schemas {
  * A filter cannot remove messages that are enqueued after the call to the `remove` method.
  */
 class PulsarMailQueue(
-  name: MailQueueName,
-  config: PulsarConfiguration,
+  config: PulsarMailQueueConfiguration,
   blobIdFactory: BlobId.Factory,
   mimeMessageStore: Store[MimeMessage, MimeMessagePartsId],
   mailQueueItemDecoratorFactory: MailQueueItemDecoratorFactory,
@@ -100,29 +99,27 @@ class PulsarMailQueue(
 
   type MessageAsJson = String
 
-  private val enqueueBufferSize = 10
-  private val requeueBufferSize = 10
   private val awaitTimeout = 10.seconds
 
-  gaugeRegistry.register(QUEUE_SIZE_METRIC_NAME_PREFIX + name, () => getSize)
-  private val dequeueMetrics = metricFactory.generate(DEQUEUED_METRIC_NAME_PREFIX + name.asString)
-  private val enqueueMetric = metricFactory.generate(ENQUEUED_METRIC_NAME_PREFIX + name.asString)
+  gaugeRegistry.register(QUEUE_SIZE_METRIC_NAME_PREFIX + config.name, () => getSize)
+  private val dequeueMetrics = metricFactory.generate(DEQUEUED_METRIC_NAME_PREFIX + config.name.asString)
+  private val enqueueMetric = metricFactory.generate(ENQUEUED_METRIC_NAME_PREFIX + config.name.asString)
 
   private implicit val implicitSystem: ActorSystem = system
   private implicit val ec: ExecutionContextExecutor = system.dispatcher
   private implicit val implicitBlobIdFactory: BlobId.Factory = blobIdFactory
-  private implicit val client: PulsarAsyncClient = PulsarClient(config.brokerUri)
+  private implicit val client: PulsarAsyncClient = PulsarClient(config.pulsar.brokerUri)
   private val admin = {
     val builder = PulsarAdmin.builder()
-    builder.serviceHttpUrl(config.adminUri).build()
+    builder.serviceHttpUrl(config.pulsar.adminUri).build()
   }
 
-  private val outTopic = Topic(s"persistent://${config.namespace.asString}/James-${name.asString()}")
-  private val scheduledTopic = Topic(s"persistent://${config.namespace.asString}/${name.asString()}-scheduled")
-  private val filterTopic = Topic(s"persistent://${config.namespace.asString}/pmq-filter-${name.asString()}")
-  private val filterScheduledTopic = Topic(s"persistent://${config.namespace.asString}/pmq-filter-scheduled-${name.asString()}")
-  private val subscription = Subscription("subscription-" + name.asString())
-  private val scheduledSubscription = Subscription("scheduled-subscription-" + name.asString())
+  private val outTopic = Topic(s"persistent://${config.pulsar.namespace.asString}/James-${config.name.asString()}")
+  private val scheduledTopic = Topic(s"persistent://${config.pulsar.namespace.asString}/${config.name.asString()}-scheduled")
+  private val filterTopic = Topic(s"persistent://${config.pulsar.namespace.asString}/pmq-filter-${config.name.asString()}")
+  private val filterScheduledTopic = Topic(s"persistent://${config.pulsar.namespace.asString}/pmq-filter-scheduled-${config.name.asString()}")
+  private val subscription = Subscription("subscription-" + config.name.asString())
+  private val scheduledSubscription = Subscription("scheduled-subscription-" + config.name.asString())
 
   private val outTopicProducer = client.producer(ProducerConfig(outTopic, enableBatching = Some(false)))
   private val scheduledTopicProducer = client.producer(ProducerConfig(scheduledTopic, enableBatching = Some(false)))
@@ -178,7 +175,7 @@ class PulsarMailQueue(
    */
   private val enqueueFlow: RunnableGraph[SourceQueueWithComplete[(Mail, Duration, Promise[Done])]] =
     Source
-      .queue[(Mail, Duration, Promise[Done])](enqueueBufferSize, OverflowStrategy.backpressure)
+      .queue[(Mail, Duration, Promise[Done])](config.enqueueBufferSize, OverflowStrategy.backpressure, config.maxEnqueueConcurrency)
       .flatMapConcat(saveMail.tupled)
       .via(buildProducerMessage)
       .wireTap(_ => enqueueMetric.increment())
@@ -190,7 +187,7 @@ class PulsarMailQueue(
    * Scheduled messages go through this source when delay expires
    */
   private val requeueFlow: RunnableGraph[SourceQueueWithComplete[ProducerMessage[MessageAsJson]]] = Source
-    .queue[ProducerMessage[MessageAsJson]](requeueBufferSize, OverflowStrategy.backpressure)
+    .queue[ProducerMessage[MessageAsJson]](config.requeueBufferSize, OverflowStrategy.backpressure)
     .via(debugLogger("requeue"))
     .to(sinkOf(outTopicProducer))
 
@@ -230,7 +227,7 @@ class PulsarMailQueue(
     streams.committableSource(consumer)
       .via(filteringFlow(filterStage))
       .map { case (mail, partsId, message) => new PulsarMailQueueItem(mail, partsId, message) }
-      .map(mailQueueItemDecoratorFactory.decorate(_, name))
+      .map(mailQueueItemDecoratorFactory.decorate(_, config.name))
       .alsoTo(counter)
       // akka streams virtual publisher handles a subscription timeout to the
       // exposed publisher which will terminate the stream if the timeout is not
@@ -294,13 +291,13 @@ class PulsarMailQueue(
   private val filtersCommandFlowControl: Control =
     filtersCommandFlow(
       filterTopic,
-      Subscription("filter-subscription-" + name.asString() + "-" + UUID.randomUUID().toString),
+      Subscription("filter-subscription-" + config.name.asString() + "-" + UUID.randomUUID().toString),
       filterStage
     ).run()
   private val scheduledFiltersCommandFlowControl: Control =
     filtersCommandFlow(
       filterScheduledTopic,
-      Subscription("filter-scheduled-subscription-" + name.asString() + "-" + UUID.randomUUID().toString),
+      Subscription("filter-scheduled-subscription-" + config.name.asString() + "-" + UUID.randomUUID().toString),
       filterScheduledStage
     ).run()
 
@@ -345,7 +342,7 @@ class PulsarMailQueue(
   /**
    * @inheritdoc
    */
-  override val getName: MailQueueName = name
+  override val getName: MailQueueName = config.name
 
   /**
    * @inheritdoc
@@ -359,7 +356,7 @@ class PulsarMailQueue(
 
   private def syncEnqueue(mail: Mail, delay: Duration): Unit = {
     metricFactory.decorateSupplierWithTimerMetric(
-      ENQUEUED_TIMER_METRIC_NAME_PREFIX + name.asString,
+      ENQUEUED_TIMER_METRIC_NAME_PREFIX + config.name.asString,
       () => Await.result(internalEnqueue(mail, delay), awaitTimeout)
     )
   }
@@ -369,7 +366,7 @@ class PulsarMailQueue(
    */
   override def enqueueReactive(mail: Mail): Publisher[Void] = {
     metricFactory.decoratePublisherWithTimerMetric(
-      ENQUEUED_TIMER_METRIC_NAME_PREFIX + name.asString,
+      ENQUEUED_TIMER_METRIC_NAME_PREFIX + config.name.asString,
       Source.lazyFuture(() => internalEnqueue(mail, Duration.Undefined)).runWith(Sink.asPublisher[Void](fanout = true))
     )
   }
diff --git a/server/queue/queue-pulsar/src/main/scala/org/apache/james/queue/pulsar/PulsarMailQueueConfiguration.scala b/server/queue/queue-pulsar/src/main/scala/org/apache/james/queue/pulsar/PulsarMailQueueConfiguration.scala
new file mode 100644
index 0000000000..06891ae685
--- /dev/null
+++ b/server/queue/queue-pulsar/src/main/scala/org/apache/james/queue/pulsar/PulsarMailQueueConfiguration.scala
@@ -0,0 +1,12 @@
+package org.apache.james.queue.pulsar
+
+import org.apache.james.backends.pulsar.PulsarConfiguration
+import org.apache.james.queue.api.MailQueueName
+
+case class PulsarMailQueueConfiguration(
+  name: MailQueueName,
+  pulsar: PulsarConfiguration,
+  maxEnqueueConcurrency: Int = 10,
+  enqueueBufferSize: Int = 10,
+  requeueBufferSize: Int = 10
+)
diff --git a/server/queue/queue-pulsar/src/main/scala/org/apache/james/queue/pulsar/PulsarMailQueueFactory.scala b/server/queue/queue-pulsar/src/main/scala/org/apache/james/queue/pulsar/PulsarMailQueueFactory.scala
index 96fe42e6b2..fda7581ac2 100644
--- a/server/queue/queue-pulsar/src/main/scala/org/apache/james/queue/pulsar/PulsarMailQueueFactory.scala
+++ b/server/queue/queue-pulsar/src/main/scala/org/apache/james/queue/pulsar/PulsarMailQueueFactory.scala
@@ -37,7 +37,7 @@ import scala.jdk.CollectionConverters._
 import scala.jdk.OptionConverters._
 import scala.util.Try
 
-class PulsarMailQueueFactory @Inject()(config: PulsarConfiguration,
+class PulsarMailQueueFactory @Inject()(pulsarConfiguration: PulsarConfiguration,
   blobIdFactory: BlobId.Factory,
   mimeMessageStore: Store[MimeMessage, MimeMessagePartsId],
   mailQueueItemDecoratorFactory: MailQueueItemDecoratorFactory,
@@ -46,7 +46,7 @@ class PulsarMailQueueFactory @Inject()(config: PulsarConfiguration,
 ) extends MailQueueFactory[PulsarMailQueue] {
   private val queues: AtomicReference[Map[MailQueueName, PulsarMailQueue]] = new AtomicReference(Map.empty)
   private val admin =
-    PulsarAdmin.builder().serviceHttpUrl(config.adminUri).build()
+    PulsarAdmin.builder().serviceHttpUrl(pulsarConfiguration.adminUri).build()
 
   private val system: ActorSystem = ActorSystem("pulsar-mailqueue")
 
@@ -60,7 +60,7 @@ class PulsarMailQueueFactory @Inject()(config: PulsarConfiguration,
   }
 
   override def getQueue(name: MailQueueName, count: MailQueueFactory.PrefetchCount): Optional[PulsarMailQueue] = {
-    Try(admin.topics().getInternalInfo(s"persistent://${config.namespace.asString}/James-${name.asString()}")).toOption.map(_ =>
+    Try(admin.topics().getInternalInfo(s"persistent://${pulsarConfiguration.namespace.asString}/James-${name.asString()}")).toOption.map(_ =>
       createQueue(name, count)
     ).toJava
   }
@@ -69,8 +69,7 @@ class PulsarMailQueueFactory @Inject()(config: PulsarConfiguration,
     queues.updateAndGet(map => {
       val queue = map.get(name)
         .fold(new PulsarMailQueue(
-          name,
-          config,
+          PulsarMailQueueConfiguration(name, pulsarConfiguration),
           blobIdFactory,
           mimeMessageStore,
           mailQueueItemDecoratorFactory,
@@ -84,10 +83,10 @@ class PulsarMailQueueFactory @Inject()(config: PulsarConfiguration,
 
   override def listCreatedMailQueues(): util.Set[MailQueueName] =
     admin.topics()
-      .getList(config.namespace.asString)
+      .getList(pulsarConfiguration.namespace.asString)
       .asScala
-      .filter(_.startsWith(s"persistent://${config.namespace.asString}/James-"))
-      .map(_.replace(s"persistent://${config.namespace.asString}/James-", ""))
+      .filter(_.startsWith(s"persistent://${pulsarConfiguration.namespace.asString}/James-"))
+      .map(_.replace(s"persistent://${pulsarConfiguration.namespace.asString}/James-", ""))
       .map(MailQueueName.of)
       .toSet
       .asJava
diff --git a/server/queue/queue-pulsar/src/test/java/org/apache/james/queue/pulsar/PulsarMailQueueTest.java b/server/queue/queue-pulsar/src/test/java/org/apache/james/queue/pulsar/PulsarMailQueueTest.java
index dfdb5ba67d..36d118b09f 100644
--- a/server/queue/queue-pulsar/src/test/java/org/apache/james/queue/pulsar/PulsarMailQueueTest.java
+++ b/server/queue/queue-pulsar/src/test/java/org/apache/james/queue/pulsar/PulsarMailQueueTest.java
@@ -73,6 +73,7 @@ import scala.jdk.javaapi.OptionConverters;
 @ExtendWith(DockerPulsarExtension.class)
 public class PulsarMailQueueTest implements MailQueueContract, MailQueueMetricContract, ManageableMailQueueContract, DelayedMailQueueContract, DelayedManageableMailQueueContract {
 
+    int MAX_CONCURRENCY = 10;
     PulsarMailQueue mailQueue;
 
     private HashBlobId.Factory blobIdFactory;
@@ -80,7 +81,7 @@ public class PulsarMailQueueTest implements MailQueueContract, MailQueueMetricCo
     private MailQueueItemDecoratorFactory factory;
     private MailQueueName mailQueueName;
     private MailQueueMetricExtension.MailQueueMetricTestSystem metricTestSystem;
-    private PulsarConfiguration config;
+    private PulsarConfiguration pulsarConfiguration;
     private ActorSystem system;
     private MemoryBlobStoreDAO memoryBlobStore;
 
@@ -119,16 +120,22 @@ public class PulsarMailQueueTest implements MailQueueContract, MailQueueMetricCo
         return mailQueue;
     }
 
+    @Override
+    public int getMailQueueMaxConcurrency() {
+        return MAX_CONCURRENCY;
+    }
+
     @Override
     public ManageableMailQueue getManageableMailQueue() {
         return mailQueue;
     }
 
     public PulsarMailQueue newInstance(DockerPulsarExtension.DockerPulsar pulsar) {
-        config = pulsar.getConfiguration();
+        pulsarConfiguration = pulsar.getConfiguration();
+        int enqueueBufferSize = 10;
+        int requeueBufferSize = 10;
         return new PulsarMailQueue(
-                mailQueueName,
-                config,
+                new PulsarMailQueueConfiguration(mailQueueName, pulsarConfiguration, MAX_CONCURRENCY, enqueueBufferSize, requeueBufferSize),
                 blobIdFactory,
                 mimeMessageStore,
                 factory,
@@ -175,16 +182,6 @@ public class PulsarMailQueueTest implements MailQueueContract, MailQueueMetricCo
         assertThat(deadletterMessage).contains("BAD");
     }
 
-    @Test
-    // JAMES-3805 PulsarMailQueueTest.dequeueShouldBeConcurrent is unstable
-    // java.lang.IllegalStateException: Too many concurrent offers. Specified maximum is 1. You have to wait for one previous future to be resolved to send another request
-    //    at akka.stream.impl.QueueSource$$anon$1.bufferElem(QueueSource.scala:115)
-    @Tag(Unstable.TAG)
-    @Override
-    public void dequeueShouldBeConcurrent() {
-        MailQueueMetricContract.super.dequeueShouldBeConcurrent();
-    }
-
     @Test
     // JAMES-3808 PulsarMailQueueTest::clearShouldNotFailWhenBrowsingIterating is unstable
     // org.apache.pulsar.client.admin.PulsarAdminException$ServerSideErrorException: HTTP 500 Internal Server Error


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org