You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by rc...@apache.org on 2022/09/28 09:39:18 UTC

[james-project] branch master updated: [JAMES-3696] makes PulsarMailQueue tests more stable

This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git


The following commit(s) were added to refs/heads/master by this push:
     new 236984344f [JAMES-3696] makes PulsarMailQueue tests more stable
236984344f is described below

commit 236984344f7a71aa829260f8fd12b3dda29fe9a8
Author: Jean Helou <jh...@codamens.fr>
AuthorDate: Mon Sep 26 13:02:25 2022 +0200

    [JAMES-3696] makes PulsarMailQueue tests more stable
    
    Prior to this patch, filter registration went through a Pulsar delivery even for a non-distributed James. This meant that tests verifying the filtering behaviour had to wait for the message to be consumed back from the queue.
    
    This led us to introduce an artificial wait to ensure that the filter was correctly registered. Unfortunately, the delay was not always enough, especially on the CI, which led to flaky tests.
    
    I propose to add a small optimization which registers the filter with the receiving instance synchronously before publishing it to Pulsar.
    This way, the wait time for filter registration can be much smaller.
    
    Because the filter stage uses a set, adding the same filter multiple times should have a negligible impact on performance.
---
 .../scala/org/apache/james/queue/pulsar/FilterStage.scala  |  3 +++
 .../org/apache/james/queue/pulsar/PulsarMailQueue.scala    | 14 ++++++++++++++
 .../org/apache/james/queue/pulsar/PulsarMailQueueTest.java |  7 +------
 3 files changed, 18 insertions(+), 6 deletions(-)

diff --git a/server/queue/queue-pulsar/src/main/scala/org/apache/james/queue/pulsar/FilterStage.scala b/server/queue/queue-pulsar/src/main/scala/org/apache/james/queue/pulsar/FilterStage.scala
index ce50e03738..07ac094850 100644
--- a/server/queue/queue-pulsar/src/main/scala/org/apache/james/queue/pulsar/FilterStage.scala
+++ b/server/queue/queue-pulsar/src/main/scala/org/apache/james/queue/pulsar/FilterStage.scala
@@ -27,6 +27,9 @@ import org.apache.james.blob.api.BlobId
 import scala.math.Ordered.orderingToOrdered
 
 private[pulsar] class FilterStage(implicit val blobIdFactory:BlobId.Factory) extends Actor with ActorLogging {
+  // PulsarMailQueue#publishFilter relies on the stage being able to
+  // deduplicate filters. The deduplication capability comes from this being
+  // a Set and Filters being value objects.
   private var filters = Set.empty[Filter]
   private val name = self.path.name
 
diff --git a/server/queue/queue-pulsar/src/main/scala/org/apache/james/queue/pulsar/PulsarMailQueue.scala b/server/queue/queue-pulsar/src/main/scala/org/apache/james/queue/pulsar/PulsarMailQueue.scala
index 58caee322c..bcbf67c86b 100644
--- a/server/queue/queue-pulsar/src/main/scala/org/apache/james/queue/pulsar/PulsarMailQueue.scala
+++ b/server/queue/queue-pulsar/src/main/scala/org/apache/james/queue/pulsar/PulsarMailQueue.scala
@@ -535,8 +535,22 @@ class PulsarMailQueue(
     }
   }
 
+  /**
+   * The publish filter implementation optimizes for the local/single instance case.
+   *
+   * This is reliant on the FilterStage implementation being able to deduplicate
+   * filters. The current implementation defines filters as value objects and stores
+   * them in a Set which will effectively deduplicate them.
+   * @see org.apache.james.queue.pulsar.FilterStage.filters
+   * @param producer
+   * @param filter
+   */
   private def publishFilter(producer:Producer[String])(filter:Filter): Unit ={
     import Filter._
+    // Optimizes for the local/single instance case, the duplicated filter
+    // received through pulsar will be eliminated by the filter stage as
+    // filters are stored in a set @see org.apache.james.queue.pulsar.FilterStage.filters
+    filterStage ! filter
     producer.send(Json.stringify(Json.toJson(filter)))
   }
 
diff --git a/server/queue/queue-pulsar/src/test/java/org/apache/james/queue/pulsar/PulsarMailQueueTest.java b/server/queue/queue-pulsar/src/test/java/org/apache/james/queue/pulsar/PulsarMailQueueTest.java
index 3743817c5b..a52ebb2be6 100644
--- a/server/queue/queue-pulsar/src/test/java/org/apache/james/queue/pulsar/PulsarMailQueueTest.java
+++ b/server/queue/queue-pulsar/src/test/java/org/apache/james/queue/pulsar/PulsarMailQueueTest.java
@@ -22,7 +22,6 @@ package org.apache.james.queue.pulsar;
 import static org.apache.james.queue.api.Mails.defaultMail;
 import static org.assertj.core.api.Assertions.assertThat;
 
-import java.time.Duration;
 import java.util.List;
 import java.util.Objects;
 import java.util.Optional;
@@ -34,14 +33,12 @@ import javax.mail.internet.MimeMessage;
 import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.james.backends.pulsar.DockerPulsarExtension;
 import org.apache.james.backends.pulsar.PulsarConfiguration;
-import org.apache.james.blob.api.BlobId;
 import org.apache.james.blob.api.BucketName;
 import org.apache.james.blob.api.HashBlobId;
 import org.apache.james.blob.api.Store;
 import org.apache.james.blob.mail.MimeMessagePartsId;
 import org.apache.james.blob.mail.MimeMessageStore;
 import org.apache.james.blob.memory.MemoryBlobStoreDAO;
-import org.apache.james.junit.categories.Unstable;
 import org.apache.james.queue.api.DelayedMailQueueContract;
 import org.apache.james.queue.api.DelayedManageableMailQueueContract;
 import org.apache.james.queue.api.MailQueue;
@@ -60,10 +57,8 @@ import org.awaitility.Awaitility;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Disabled;
-import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
-import org.reactivestreams.Publisher;
 
 import com.github.fge.lambdas.Throwing;
 import com.sksamuel.pulsar4s.ConsumerMessage;
@@ -112,7 +107,7 @@ public class PulsarMailQueueTest implements MailQueueContract, MailQueueMetricCo
     @Override
     public void awaitRemove() {
         try {
-            Thread.sleep(100);
+            Thread.sleep(50);
         } catch (InterruptedException e) {
             throw new RuntimeException(e);
         }


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org