You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by jh...@apache.org on 2022/09/21 19:21:58 UTC
[james-project] 01/04: james-3805 fix failing test
This is an automated email from the ASF dual-hosted git repository.
jhelou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 14cdaadde7f764e5a3e47677a333b4e8890337b1
Author: Matthieu Baechler <ma...@apache.org>
AuthorDate: Tue Sep 6 22:48:47 2022 +0200
james-3805 fix failing test
Co-Authored-By: Jean Helou <jh...@codamens.fr>
---
.../backends/pulsar/DockerPulsarExtension.java | 6 ++-
.../apache/james/queue/api/MailQueueContract.java | 6 ++-
.../james/queue/pulsar/PulsarMailQueue.scala | 45 ++++++++++------------
.../pulsar/PulsarMailQueueConfiguration.scala | 12 ++++++
.../queue/pulsar/PulsarMailQueueFactory.scala | 15 ++++----
.../james/queue/pulsar/PulsarMailQueueTest.java | 25 ++++++------
6 files changed, 60 insertions(+), 49 deletions(-)
diff --git a/backends-common/pulsar/src/test/java/org/apache/james/backends/pulsar/DockerPulsarExtension.java b/backends-common/pulsar/src/test/java/org/apache/james/backends/pulsar/DockerPulsarExtension.java
index 138cb14150..957de0c7ed 100644
--- a/backends-common/pulsar/src/test/java/org/apache/james/backends/pulsar/DockerPulsarExtension.java
+++ b/backends-common/pulsar/src/test/java/org/apache/james/backends/pulsar/DockerPulsarExtension.java
@@ -65,8 +65,10 @@ public class DockerPulsarExtension implements
}
PulsarConfiguration pulsarConfiguration() {
- return new PulsarConfiguration(container.getPulsarBrokerUrl(),
- container.getHttpServiceUrl(), new Namespace("public/" + RandomStringUtils.randomAlphabetic(10)));
+ return new PulsarConfiguration(
+ container.getPulsarBrokerUrl(),
+ container.getHttpServiceUrl(),
+ new Namespace("public/" + RandomStringUtils.randomAlphabetic(10)));
}
public PulsarConfiguration getConfiguration() {
diff --git a/server/queue/queue-api/src/test/java/org/apache/james/queue/api/MailQueueContract.java b/server/queue/queue-api/src/test/java/org/apache/james/queue/api/MailQueueContract.java
index e237d5ea3d..dc32a86428 100644
--- a/server/queue/queue-api/src/test/java/org/apache/james/queue/api/MailQueueContract.java
+++ b/server/queue/queue-api/src/test/java/org/apache/james/queue/api/MailQueueContract.java
@@ -67,6 +67,10 @@ public interface MailQueueContract {
MailQueue getMailQueue();
+ default int getMailQueueMaxConcurrency() {
+ return Integer.MAX_VALUE;
+ }
+
default void enQueue(Mail mail) throws MailQueue.MailQueueException {
getMailQueue().enQueue(mail);
}
@@ -571,7 +575,7 @@ public interface MailQueueContract {
Flux.range(0, nbMails)
.flatMap(Throwing.function(i -> testee.enqueueReactive(defaultMail()
.name("name" + i)
- .build())))
+ .build())), getMailQueueMaxConcurrency())
.blockLast();
ConcurrentLinkedDeque<Mail> dequeuedMails = new ConcurrentLinkedDeque<>();
diff --git a/server/queue/queue-pulsar/src/main/scala/org/apache/james/queue/pulsar/PulsarMailQueue.scala b/server/queue/queue-pulsar/src/main/scala/org/apache/james/queue/pulsar/PulsarMailQueue.scala
index c332149da3..3215c727cb 100644
--- a/server/queue/queue-pulsar/src/main/scala/org/apache/james/queue/pulsar/PulsarMailQueue.scala
+++ b/server/queue/queue-pulsar/src/main/scala/org/apache/james/queue/pulsar/PulsarMailQueue.scala
@@ -32,7 +32,7 @@ import scala.jdk.CollectionConverters._
import scala.jdk.DurationConverters._
import scala.math.Ordered.orderingToOrdered
-import org.apache.james.backends.pulsar.{PulsarConfiguration, PulsarReader}
+import org.apache.james.backends.pulsar.PulsarReader
import org.apache.james.blob.api.{BlobId, ObjectNotFoundException, Store}
import org.apache.james.blob.mail.MimeMessagePartsId
import org.apache.james.core.{MailAddress, MaybeSender}
@@ -85,8 +85,7 @@ private[pulsar] object schemas {
* A filter cannot remove messages that are enqueued after the call to the `remove` method.
*/
class PulsarMailQueue(
- name: MailQueueName,
- config: PulsarConfiguration,
+ config: PulsarMailQueueConfiguration,
blobIdFactory: BlobId.Factory,
mimeMessageStore: Store[MimeMessage, MimeMessagePartsId],
mailQueueItemDecoratorFactory: MailQueueItemDecoratorFactory,
@@ -100,29 +99,27 @@ class PulsarMailQueue(
type MessageAsJson = String
- private val enqueueBufferSize = 10
- private val requeueBufferSize = 10
private val awaitTimeout = 10.seconds
- gaugeRegistry.register(QUEUE_SIZE_METRIC_NAME_PREFIX + name, () => getSize)
- private val dequeueMetrics = metricFactory.generate(DEQUEUED_METRIC_NAME_PREFIX + name.asString)
- private val enqueueMetric = metricFactory.generate(ENQUEUED_METRIC_NAME_PREFIX + name.asString)
+ gaugeRegistry.register(QUEUE_SIZE_METRIC_NAME_PREFIX + config.name, () => getSize)
+ private val dequeueMetrics = metricFactory.generate(DEQUEUED_METRIC_NAME_PREFIX + config.name.asString)
+ private val enqueueMetric = metricFactory.generate(ENQUEUED_METRIC_NAME_PREFIX + config.name.asString)
private implicit val implicitSystem: ActorSystem = system
private implicit val ec: ExecutionContextExecutor = system.dispatcher
private implicit val implicitBlobIdFactory: BlobId.Factory = blobIdFactory
- private implicit val client: PulsarAsyncClient = PulsarClient(config.brokerUri)
+ private implicit val client: PulsarAsyncClient = PulsarClient(config.pulsar.brokerUri)
private val admin = {
val builder = PulsarAdmin.builder()
- builder.serviceHttpUrl(config.adminUri).build()
+ builder.serviceHttpUrl(config.pulsar.adminUri).build()
}
- private val outTopic = Topic(s"persistent://${config.namespace.asString}/James-${name.asString()}")
- private val scheduledTopic = Topic(s"persistent://${config.namespace.asString}/${name.asString()}-scheduled")
- private val filterTopic = Topic(s"persistent://${config.namespace.asString}/pmq-filter-${name.asString()}")
- private val filterScheduledTopic = Topic(s"persistent://${config.namespace.asString}/pmq-filter-scheduled-${name.asString()}")
- private val subscription = Subscription("subscription-" + name.asString())
- private val scheduledSubscription = Subscription("scheduled-subscription-" + name.asString())
+ private val outTopic = Topic(s"persistent://${config.pulsar.namespace.asString}/James-${config.name.asString()}")
+ private val scheduledTopic = Topic(s"persistent://${config.pulsar.namespace.asString}/${config.name.asString()}-scheduled")
+ private val filterTopic = Topic(s"persistent://${config.pulsar.namespace.asString}/pmq-filter-${config.name.asString()}")
+ private val filterScheduledTopic = Topic(s"persistent://${config.pulsar.namespace.asString}/pmq-filter-scheduled-${config.name.asString()}")
+ private val subscription = Subscription("subscription-" + config.name.asString())
+ private val scheduledSubscription = Subscription("scheduled-subscription-" + config.name.asString())
private val outTopicProducer = client.producer(ProducerConfig(outTopic, enableBatching = Some(false)))
private val scheduledTopicProducer = client.producer(ProducerConfig(scheduledTopic, enableBatching = Some(false)))
@@ -178,7 +175,7 @@ class PulsarMailQueue(
*/
private val enqueueFlow: RunnableGraph[SourceQueueWithComplete[(Mail, Duration, Promise[Done])]] =
Source
- .queue[(Mail, Duration, Promise[Done])](enqueueBufferSize, OverflowStrategy.backpressure)
+ .queue[(Mail, Duration, Promise[Done])](config.enqueueBufferSize, OverflowStrategy.backpressure, config.maxEnqueueConcurrency)
.flatMapConcat(saveMail.tupled)
.via(buildProducerMessage)
.wireTap(_ => enqueueMetric.increment())
@@ -190,7 +187,7 @@ class PulsarMailQueue(
* Scheduled messages go through this source when delay expires
*/
private val requeueFlow: RunnableGraph[SourceQueueWithComplete[ProducerMessage[MessageAsJson]]] = Source
- .queue[ProducerMessage[MessageAsJson]](requeueBufferSize, OverflowStrategy.backpressure)
+ .queue[ProducerMessage[MessageAsJson]](config.requeueBufferSize, OverflowStrategy.backpressure)
.via(debugLogger("requeue"))
.to(sinkOf(outTopicProducer))
@@ -230,7 +227,7 @@ class PulsarMailQueue(
streams.committableSource(consumer)
.via(filteringFlow(filterStage))
.map { case (mail, partsId, message) => new PulsarMailQueueItem(mail, partsId, message) }
- .map(mailQueueItemDecoratorFactory.decorate(_, name))
+ .map(mailQueueItemDecoratorFactory.decorate(_, config.name))
.alsoTo(counter)
// akka streams virtual publisher handles a subscription timeout to the
// exposed publisher which will terminate the stream if the timeout is not
@@ -294,13 +291,13 @@ class PulsarMailQueue(
private val filtersCommandFlowControl: Control =
filtersCommandFlow(
filterTopic,
- Subscription("filter-subscription-" + name.asString() + "-" + UUID.randomUUID().toString),
+ Subscription("filter-subscription-" + config.name.asString() + "-" + UUID.randomUUID().toString),
filterStage
).run()
private val scheduledFiltersCommandFlowControl: Control =
filtersCommandFlow(
filterScheduledTopic,
- Subscription("filter-scheduled-subscription-" + name.asString() + "-" + UUID.randomUUID().toString),
+ Subscription("filter-scheduled-subscription-" + config.name.asString() + "-" + UUID.randomUUID().toString),
filterScheduledStage
).run()
@@ -345,7 +342,7 @@ class PulsarMailQueue(
/**
* @inheritdoc
*/
- override val getName: MailQueueName = name
+ override val getName: MailQueueName = config.name
/**
* @inheritdoc
@@ -359,7 +356,7 @@ class PulsarMailQueue(
private def syncEnqueue(mail: Mail, delay: Duration): Unit = {
metricFactory.decorateSupplierWithTimerMetric(
- ENQUEUED_TIMER_METRIC_NAME_PREFIX + name.asString,
+ ENQUEUED_TIMER_METRIC_NAME_PREFIX + config.name.asString,
() => Await.result(internalEnqueue(mail, delay), awaitTimeout)
)
}
@@ -369,7 +366,7 @@ class PulsarMailQueue(
*/
override def enqueueReactive(mail: Mail): Publisher[Void] = {
metricFactory.decoratePublisherWithTimerMetric(
- ENQUEUED_TIMER_METRIC_NAME_PREFIX + name.asString,
+ ENQUEUED_TIMER_METRIC_NAME_PREFIX + config.name.asString,
Source.lazyFuture(() => internalEnqueue(mail, Duration.Undefined)).runWith(Sink.asPublisher[Void](fanout = true))
)
}
diff --git a/server/queue/queue-pulsar/src/main/scala/org/apache/james/queue/pulsar/PulsarMailQueueConfiguration.scala b/server/queue/queue-pulsar/src/main/scala/org/apache/james/queue/pulsar/PulsarMailQueueConfiguration.scala
new file mode 100644
index 0000000000..06891ae685
--- /dev/null
+++ b/server/queue/queue-pulsar/src/main/scala/org/apache/james/queue/pulsar/PulsarMailQueueConfiguration.scala
@@ -0,0 +1,12 @@
+package org.apache.james.queue.pulsar
+
+import org.apache.james.backends.pulsar.PulsarConfiguration
+import org.apache.james.queue.api.MailQueueName
+
+case class PulsarMailQueueConfiguration(
+ name: MailQueueName,
+ pulsar: PulsarConfiguration,
+ maxEnqueueConcurrency: Int = 10,
+ enqueueBufferSize: Int = 10,
+ requeueBufferSize: Int = 10
+)
diff --git a/server/queue/queue-pulsar/src/main/scala/org/apache/james/queue/pulsar/PulsarMailQueueFactory.scala b/server/queue/queue-pulsar/src/main/scala/org/apache/james/queue/pulsar/PulsarMailQueueFactory.scala
index 96fe42e6b2..fda7581ac2 100644
--- a/server/queue/queue-pulsar/src/main/scala/org/apache/james/queue/pulsar/PulsarMailQueueFactory.scala
+++ b/server/queue/queue-pulsar/src/main/scala/org/apache/james/queue/pulsar/PulsarMailQueueFactory.scala
@@ -37,7 +37,7 @@ import scala.jdk.CollectionConverters._
import scala.jdk.OptionConverters._
import scala.util.Try
-class PulsarMailQueueFactory @Inject()(config: PulsarConfiguration,
+class PulsarMailQueueFactory @Inject()(pulsarConfiguration: PulsarConfiguration,
blobIdFactory: BlobId.Factory,
mimeMessageStore: Store[MimeMessage, MimeMessagePartsId],
mailQueueItemDecoratorFactory: MailQueueItemDecoratorFactory,
@@ -46,7 +46,7 @@ class PulsarMailQueueFactory @Inject()(config: PulsarConfiguration,
) extends MailQueueFactory[PulsarMailQueue] {
private val queues: AtomicReference[Map[MailQueueName, PulsarMailQueue]] = new AtomicReference(Map.empty)
private val admin =
- PulsarAdmin.builder().serviceHttpUrl(config.adminUri).build()
+ PulsarAdmin.builder().serviceHttpUrl(pulsarConfiguration.adminUri).build()
private val system: ActorSystem = ActorSystem("pulsar-mailqueue")
@@ -60,7 +60,7 @@ class PulsarMailQueueFactory @Inject()(config: PulsarConfiguration,
}
override def getQueue(name: MailQueueName, count: MailQueueFactory.PrefetchCount): Optional[PulsarMailQueue] = {
- Try(admin.topics().getInternalInfo(s"persistent://${config.namespace.asString}/James-${name.asString()}")).toOption.map(_ =>
+ Try(admin.topics().getInternalInfo(s"persistent://${pulsarConfiguration.namespace.asString}/James-${name.asString()}")).toOption.map(_ =>
createQueue(name, count)
).toJava
}
@@ -69,8 +69,7 @@ class PulsarMailQueueFactory @Inject()(config: PulsarConfiguration,
queues.updateAndGet(map => {
val queue = map.get(name)
.fold(new PulsarMailQueue(
- name,
- config,
+ PulsarMailQueueConfiguration(name, pulsarConfiguration),
blobIdFactory,
mimeMessageStore,
mailQueueItemDecoratorFactory,
@@ -84,10 +83,10 @@ class PulsarMailQueueFactory @Inject()(config: PulsarConfiguration,
override def listCreatedMailQueues(): util.Set[MailQueueName] =
admin.topics()
- .getList(config.namespace.asString)
+ .getList(pulsarConfiguration.namespace.asString)
.asScala
- .filter(_.startsWith(s"persistent://${config.namespace.asString}/James-"))
- .map(_.replace(s"persistent://${config.namespace.asString}/James-", ""))
+ .filter(_.startsWith(s"persistent://${pulsarConfiguration.namespace.asString}/James-"))
+ .map(_.replace(s"persistent://${pulsarConfiguration.namespace.asString}/James-", ""))
.map(MailQueueName.of)
.toSet
.asJava
diff --git a/server/queue/queue-pulsar/src/test/java/org/apache/james/queue/pulsar/PulsarMailQueueTest.java b/server/queue/queue-pulsar/src/test/java/org/apache/james/queue/pulsar/PulsarMailQueueTest.java
index dfdb5ba67d..36d118b09f 100644
--- a/server/queue/queue-pulsar/src/test/java/org/apache/james/queue/pulsar/PulsarMailQueueTest.java
+++ b/server/queue/queue-pulsar/src/test/java/org/apache/james/queue/pulsar/PulsarMailQueueTest.java
@@ -73,6 +73,7 @@ import scala.jdk.javaapi.OptionConverters;
@ExtendWith(DockerPulsarExtension.class)
public class PulsarMailQueueTest implements MailQueueContract, MailQueueMetricContract, ManageableMailQueueContract, DelayedMailQueueContract, DelayedManageableMailQueueContract {
+ int MAX_CONCURRENCY = 10;
PulsarMailQueue mailQueue;
private HashBlobId.Factory blobIdFactory;
@@ -80,7 +81,7 @@ public class PulsarMailQueueTest implements MailQueueContract, MailQueueMetricCo
private MailQueueItemDecoratorFactory factory;
private MailQueueName mailQueueName;
private MailQueueMetricExtension.MailQueueMetricTestSystem metricTestSystem;
- private PulsarConfiguration config;
+ private PulsarConfiguration pulsarConfiguration;
private ActorSystem system;
private MemoryBlobStoreDAO memoryBlobStore;
@@ -119,16 +120,22 @@ public class PulsarMailQueueTest implements MailQueueContract, MailQueueMetricCo
return mailQueue;
}
+ @Override
+ public int getMailQueueMaxConcurrency() {
+ return MAX_CONCURRENCY;
+ }
+
@Override
public ManageableMailQueue getManageableMailQueue() {
return mailQueue;
}
public PulsarMailQueue newInstance(DockerPulsarExtension.DockerPulsar pulsar) {
- config = pulsar.getConfiguration();
+ pulsarConfiguration = pulsar.getConfiguration();
+ int enqueueBufferSize = 10;
+ int requeueBufferSize = 10;
return new PulsarMailQueue(
- mailQueueName,
- config,
+ new PulsarMailQueueConfiguration(mailQueueName, pulsarConfiguration, MAX_CONCURRENCY, enqueueBufferSize, requeueBufferSize),
blobIdFactory,
mimeMessageStore,
factory,
@@ -175,16 +182,6 @@ public class PulsarMailQueueTest implements MailQueueContract, MailQueueMetricCo
assertThat(deadletterMessage).contains("BAD");
}
- @Test
- // JAMES-3805 PulsarMailQueueTest.dequeueShouldBeConcurrent is unstable
- // java.lang.IllegalStateException: Too many concurrent offers. Specified maximum is 1. You have to wait for one previous future to be resolved to send another request
- // at akka.stream.impl.QueueSource$$anon$1.bufferElem(QueueSource.scala:115)
- @Tag(Unstable.TAG)
- @Override
- public void dequeueShouldBeConcurrent() {
- MailQueueMetricContract.super.dequeueShouldBeConcurrent();
- }
-
@Test
// JAMES-3808 PulsarMailQueueTest::clearShouldNotFailWhenBrowsingIterating is unstable
// org.apache.pulsar.client.admin.PulsarAdminException$ServerSideErrorException: HTTP 500 Internal Server Error
---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org