You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by jh...@apache.org on 2023/01/26 22:22:46 UTC

[james-project] branch pulsar-authentication created (now 3e2c33cca3)

This is an automated email from the ASF dual-hosted git repository.

jhelou pushed a change to branch pulsar-authentication
in repository https://gitbox.apache.org/repos/asf/james-project.git


      at 3e2c33cca3 [JAMES-3687] refactors client creation to PulsarConfiguration

This branch includes the following new commits:

     new 01e9ef7a7e [JAMES-3687] pulls pulsar-client explicitly to enforce its version
     new a2ca77601d [JAMES-3687] adds support for authentication scheme in the configuration
     new 3e2c33cca3 [JAMES-3687] refactors client creation to PulsarConfiguration

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org


[james-project] 02/03: [JAMES-3687] adds support for authentication scheme in the configuration

Posted by jh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jhelou pushed a commit to branch pulsar-authentication
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit a2ca77601da3eae981ecf7d57e4942da59ecc620
Author: Jean Helou <jh...@codamens.fr>
AuthorDate: Thu Nov 24 23:28:00 2022 +0100

    [JAMES-3687] adds support for authentication scheme in the configuration
---
 .../backends/pulsar/PulsarConfiguration.scala      |  59 ++++++++++--
 .../backends/pulsar/DockerPulsarExtension.java     |   4 +-
 .../backends/pulsar/PulsarConfigurationTest.java   | 103 ++++++++++++++++++++-
 3 files changed, 157 insertions(+), 9 deletions(-)

diff --git a/backends-common/pulsar/src/main/scala/org/apache/james/backends/pulsar/PulsarConfiguration.scala b/backends-common/pulsar/src/main/scala/org/apache/james/backends/pulsar/PulsarConfiguration.scala
index b3cbda2bd8..7e790f29c6 100644
--- a/backends-common/pulsar/src/main/scala/org/apache/james/backends/pulsar/PulsarConfiguration.scala
+++ b/backends-common/pulsar/src/main/scala/org/apache/james/backends/pulsar/PulsarConfiguration.scala
@@ -1,4 +1,4 @@
-/****************************************************************
+/** **************************************************************
  * Licensed to the Apache Software Foundation (ASF) under one   *
  * or more contributor license agreements.  See the NOTICE file *
  * distributed with this work for additional information        *
@@ -6,16 +6,16 @@
  * to you under the Apache License, Version 2.0 (the            *
  * "License"); you may not use this file except in compliance   *
  * with the License.  You may obtain a copy of the License at   *
- *                                                              *
- *   http://www.apache.org/licenses/LICENSE-2.0                 *
- *                                                              *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0                 *
+ * *
  * Unless required by applicable law or agreed to in writing,   *
  * software distributed under the License is distributed on an  *
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
  * KIND, either express or implied.  See the License for the    *
  * specific language governing permissions and limitations      *
  * under the License.                                           *
- ****************************************************************/
+ * ************************************************************** */
 
 package org.apache.james.backends.pulsar
 
@@ -27,16 +27,45 @@ object PulsarConfiguration {
   val BROKER_URI_PROPERTY_NAME = "broker.uri"
   val ADMIN_URI_PROPERTY_NAME = "admin.uri"
   val NAMESPACE_PROPERTY_NAME = "namespace"
+  val AUTHENTICATION_TYPE_PROPERTY_NAME = "authentication.type"
+  val AUTHENTICATION_TYPE_NO_AUTH = "no-auth"
+  val AUTHENTICATION_TYPE_AUTH_TOKEN = "token"
+  val AUTHENTICATION_TYPE_AUTH_BASIC = "basic"
+  val AUTHENTICATION_TOKEN_PROPERTY_NAME = "authentication.token"
+  val AUTHENTICATION_BASIC_USERID_PROPERTY_NAME = "authentication.basic.userId"
+  val AUTHENTICATION_BASIC_PASSWORD_PROPERTY_NAME = "authentication.basic.password"
+
 
   def from(configuration: Configuration): PulsarConfiguration = {
     val brokerUri: String = extractUri(configuration, BROKER_URI_PROPERTY_NAME)
     val adminUri: String = extractUri(configuration, ADMIN_URI_PROPERTY_NAME)
+    val authTypeString: String = configuration.getString(AUTHENTICATION_TYPE_PROPERTY_NAME, AUTHENTICATION_TYPE_NO_AUTH);
+    val auth = authTypeString match {
+      case AUTHENTICATION_TYPE_NO_AUTH => Auth.NoAuth
+
+      case AUTHENTICATION_TYPE_AUTH_TOKEN =>
+        val token = configuration.getString(AUTHENTICATION_TOKEN_PROPERTY_NAME)
+        if (Strings.isNullOrEmpty(token))
+          throw new IllegalStateException(s"You need to specify a non-empty value for ${AUTHENTICATION_TOKEN_PROPERTY_NAME}")
+        Auth.Token(token)
+
+      case AUTHENTICATION_TYPE_AUTH_BASIC =>
+        val userId = configuration.getString(AUTHENTICATION_BASIC_USERID_PROPERTY_NAME)
+        if (Strings.isNullOrEmpty(userId))
+          throw new IllegalStateException(s"You need to specify a non-empty value for ${AUTHENTICATION_BASIC_USERID_PROPERTY_NAME}")
+        val password = configuration.getString(AUTHENTICATION_BASIC_PASSWORD_PROPERTY_NAME)
+        if (Strings.isNullOrEmpty(password))
+          throw new IllegalStateException(s"You need to specify a non-empty value for ${AUTHENTICATION_BASIC_PASSWORD_PROPERTY_NAME}")
+        Auth.Basic(userId, password)
 
+      case _ =>
+        throw new NotImplementedError(s"Authentication type $authTypeString is not implemented")
+    }
 
     val namespace = configuration.getString(NAMESPACE_PROPERTY_NAME)
     if (Strings.isNullOrEmpty(namespace))
       throw new IllegalStateException(s"You need to specify the pulsar namespace as ${NAMESPACE_PROPERTY_NAME}")
-    new PulsarConfiguration(brokerUri, adminUri, Namespace(namespace))
+    new PulsarConfiguration(brokerUri, adminUri, Namespace(namespace), auth)
   }
 
   private def extractUri(configuration: Configuration, uriPropertyName: String): String = {
@@ -55,4 +84,20 @@ object PulsarConfiguration {
 
 case class Namespace(asString: String)
 
-case class PulsarConfiguration(brokerUri: String, adminUri: String, namespace: Namespace)
\ No newline at end of file
+sealed trait Auth
+
+object Auth {
+  def noAuth() = NoAuth
+
+  case object NoAuth extends Auth
+
+  def token(value: String) = Token(value)
+
+  case class Token(value: String) extends Auth
+
+  def basic(userId: String, password: String) = Basic(userId, password)
+
+  case class Basic(userId: String, password: String) extends Auth
+}
+
+case class PulsarConfiguration(brokerUri: String, adminUri: String, namespace: Namespace, auth: Auth = Auth.NoAuth)
\ No newline at end of file
diff --git a/backends-common/pulsar/src/test/java/org/apache/james/backends/pulsar/DockerPulsarExtension.java b/backends-common/pulsar/src/test/java/org/apache/james/backends/pulsar/DockerPulsarExtension.java
index 12b418676f..d535ba390d 100644
--- a/backends-common/pulsar/src/test/java/org/apache/james/backends/pulsar/DockerPulsarExtension.java
+++ b/backends-common/pulsar/src/test/java/org/apache/james/backends/pulsar/DockerPulsarExtension.java
@@ -70,7 +70,9 @@ public class DockerPulsarExtension implements
         return new PulsarConfiguration(
                 container.getPulsarBrokerUrl(),
                 container.getHttpServiceUrl(),
-                new Namespace("test/" + RandomStringUtils.randomAlphabetic(10)));
+                new Namespace("test/" + RandomStringUtils.randomAlphabetic(10)),
+                Auth.noAuth()
+        );
     }
 
     public PulsarConfiguration getConfiguration() {
diff --git a/backends-common/pulsar/src/test/java/org/apache/james/backends/pulsar/PulsarConfigurationTest.java b/backends-common/pulsar/src/test/java/org/apache/james/backends/pulsar/PulsarConfigurationTest.java
index 43eb4463a5..9fedee288f 100644
--- a/backends-common/pulsar/src/test/java/org/apache/james/backends/pulsar/PulsarConfigurationTest.java
+++ b/backends-common/pulsar/src/test/java/org/apache/james/backends/pulsar/PulsarConfigurationTest.java
@@ -20,7 +20,10 @@
 package org.apache.james.backends.pulsar;
 
 import org.apache.commons.configuration2.PropertiesConfiguration;
+import org.apache.james.backends.pulsar.Auth.NoAuth$;
 import org.junit.jupiter.api.Test;
+import org.testcontainers.shaded.org.apache.commons.lang3.NotImplementedException;
+import scala.NotImplementedError;
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -155,8 +158,106 @@ class PulsarConfigurationTest {
 
         configuration.addProperty("namespace", namespace);
         assertThat(PulsarConfiguration.from(configuration))
-                .isEqualTo(new PulsarConfiguration(brokerUri, adminUri, new Namespace(namespace)));
+                .isEqualTo(new PulsarConfiguration(brokerUri, adminUri, new Namespace(namespace), Auth.noAuth()));
     }
+    @Test
+    void fromShouldThrowWithTokenAuthenticationWhenTokenIsMissing() {
+        PropertiesConfiguration configuration = new PropertiesConfiguration();
+        String brokerUri = "pulsar://localhost.test:6650/";
+        String adminUri = "http://localhost:8090";
+        String authenticationType = "token";
+
+        configuration.addProperty("broker.uri", brokerUri);
+        configuration.addProperty("admin.uri", adminUri);
+        configuration.addProperty("authentication.type", authenticationType);
+
+        String namespace = "namespace";
+
+        configuration.addProperty("namespace", namespace);
+        assertThatThrownBy(() -> PulsarConfiguration.from(configuration))
+                .isInstanceOf(IllegalStateException.class)
+                .hasMessage("You need to specify a non-empty value for " + PulsarConfiguration.AUTHENTICATION_TOKEN_PROPERTY_NAME());
+    }
+    @Test
+    void fromShouldReturnTheConfigurationWithTokenAuthenticationWhenRequiredParametersAreGiven() {
+        PropertiesConfiguration configuration = new PropertiesConfiguration();
+        String brokerUri = "pulsar://localhost.test:6650/";
+        String adminUri = "http://localhost:8090";
+        String authenticationType = "basic";
+        String authenticationUserId = "userId";
+        String authenticationPassword = "password";
+
+        configuration.addProperty("broker.uri", brokerUri);
+        configuration.addProperty("admin.uri", adminUri);
+        configuration.addProperty("authentication.type", authenticationType);
+        configuration.addProperty("authentication.basic.userId", authenticationUserId);
+        configuration.addProperty("authentication.basic.password", authenticationPassword);
+
+        String namespace = "namespace";
+
+        configuration.addProperty("namespace", namespace);
+        assertThat(PulsarConfiguration.from(configuration))
+                .isEqualTo(new PulsarConfiguration(brokerUri, adminUri, new Namespace(namespace), Auth.basic(authenticationUserId, authenticationPassword)));
+    }
+    @Test
+    void fromShouldThrowWithBasicAuthenticationWhenUserIdIsMissing() {
+        PropertiesConfiguration configuration = new PropertiesConfiguration();
+        String brokerUri = "pulsar://localhost.test:6650/";
+        String adminUri = "http://localhost:8090";
+        String authenticationType = "basic";
+        String authenticationUserId = "userId";
+        String authenticationPassword = "password";
+
+        configuration.addProperty("broker.uri", brokerUri);
+        configuration.addProperty("admin.uri", adminUri);
+        configuration.addProperty("authentication.type", authenticationType);
+        configuration.addProperty("authentication.basic.password", authenticationPassword);
 
+        String namespace = "namespace";
+
+        configuration.addProperty("namespace", namespace);
+        assertThatThrownBy(() -> PulsarConfiguration.from(configuration))
+                .isInstanceOf(IllegalStateException.class)
+                .hasMessage("You need to specify a non-empty value for " + PulsarConfiguration.AUTHENTICATION_BASIC_USERID_PROPERTY_NAME());
+    }
+    @Test
+    void fromShouldThrowWithBasicAuthenticationWhenPasswordIsMissing() {
+        PropertiesConfiguration configuration = new PropertiesConfiguration();
+        String brokerUri = "pulsar://localhost.test:6650/";
+        String adminUri = "http://localhost:8090";
+        String authenticationType = "basic";
+        String authenticationUserId = "userId";
+        String authenticationPassword = "password";
+
+        configuration.addProperty("broker.uri", brokerUri);
+        configuration.addProperty("admin.uri", adminUri);
+        configuration.addProperty("authentication.type", authenticationType);
+        configuration.addProperty("authentication.basic.userId", authenticationUserId);
+
+        String namespace = "namespace";
+
+        configuration.addProperty("namespace", namespace);
 
+        assertThatThrownBy(() -> PulsarConfiguration.from(configuration))
+                .isInstanceOf(IllegalStateException.class)
+                .hasMessage("You need to specify a non-empty value for " + PulsarConfiguration.AUTHENTICATION_BASIC_PASSWORD_PROPERTY_NAME());
+    }
+
+    @Test
+    void fromShouldThrowWithUnknownAuthenticationType() {
+        PropertiesConfiguration configuration = new PropertiesConfiguration();
+        String brokerUri = "pulsar://localhost.test:6650/";
+        String adminUri = "http://localhost:8090";
+
+        configuration.addProperty("broker.uri", brokerUri);
+        configuration.addProperty("admin.uri", adminUri);
+        configuration.addProperty("authentication.type", "biscuit");
+
+        String namespace = "namespace";
+
+        configuration.addProperty("namespace", namespace);
+        assertThatThrownBy(() -> PulsarConfiguration.from(configuration))
+                .isInstanceOf(NotImplementedError.class)
+                .hasMessage("Authentication type biscuit is not implemented");
+    }
 }
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org


[james-project] 01/03: [JAMES-3687] pulls pulsar-client explicitly to enforce its version

Posted by jh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jhelou pushed a commit to branch pulsar-authentication
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 01e9ef7a7e320cc16ee4383285f688be6c7e8e2b
Author: Jean Helou <jh...@codamens.fr>
AuthorDate: Tue Nov 8 23:21:01 2022 +0100

    [JAMES-3687] pulls pulsar-client explicitly to enforce its version
    
    Without this the version is resolved through transitive dependencies
    which may be misaligned with the version of the admin client. This
    resulted in conflicts in the shaded netty brought in by both clients.
---
 backends-common/pulsar/pom.xml | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/backends-common/pulsar/pom.xml b/backends-common/pulsar/pom.xml
index 678b44654c..24f3223097 100644
--- a/backends-common/pulsar/pom.xml
+++ b/backends-common/pulsar/pom.xml
@@ -85,6 +85,11 @@
             <groupId>org.apache.commons</groupId>
             <artifactId>commons-configuration2</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.pulsar</groupId>
+            <artifactId>pulsar-client</artifactId>
+            <version>2.9.3</version>
+        </dependency>
         <dependency>
             <groupId>org.apache.pulsar</groupId>
             <artifactId>pulsar-client-admin</artifactId>


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org


[james-project] 03/03: [JAMES-3687] refactors client creation to PulsarConfiguration

Posted by jh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jhelou pushed a commit to branch pulsar-authentication
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 3e2c33cca34e28df4e7b99bcb437e7fdc544ed1d
Author: Jean Helou <jh...@codamens.fr>
AuthorDate: Thu Nov 24 23:47:46 2022 +0100

    [JAMES-3687] refactors client creation to PulsarConfiguration
---
 .../backends/pulsar/PulsarConfiguration.scala      | 30 ++++++++++-
 .../james/queue/pulsar/PulsarMailQueue.scala       | 59 ++++++++++------------
 .../queue/pulsar/PulsarMailQueueFactory.scala      |  7 ++-
 3 files changed, 60 insertions(+), 36 deletions(-)

diff --git a/backends-common/pulsar/src/main/scala/org/apache/james/backends/pulsar/PulsarConfiguration.scala b/backends-common/pulsar/src/main/scala/org/apache/james/backends/pulsar/PulsarConfiguration.scala
index 7e790f29c6..e50842821b 100644
--- a/backends-common/pulsar/src/main/scala/org/apache/james/backends/pulsar/PulsarConfiguration.scala
+++ b/backends-common/pulsar/src/main/scala/org/apache/james/backends/pulsar/PulsarConfiguration.scala
@@ -22,6 +22,11 @@ package org.apache.james.backends.pulsar
 import java.net.{URI, URISyntaxException}
 import org.apache.commons.configuration2.Configuration
 import com.google.common.base.Strings
+import com.sksamuel.pulsar4s.{PulsarAsyncClient, PulsarClient, PulsarClientConfig}
+import org.apache.pulsar.client.admin.PulsarAdmin
+import org.apache.pulsar.client.impl.auth.{AuthenticationBasic, AuthenticationDisabled, AuthenticationToken}
+
+import scala.jdk.CollectionConverters.MapHasAsJava
 
 object PulsarConfiguration {
   val BROKER_URI_PROPERTY_NAME = "broker.uri"
@@ -100,4 +105,27 @@ object Auth {
   case class Basic(userId: String, password: String) extends Auth
 }
 
-case class PulsarConfiguration(brokerUri: String, adminUri: String, namespace: Namespace, auth: Auth = Auth.NoAuth)
\ No newline at end of file
+case class PulsarConfiguration(brokerUri: String, adminUri: String, namespace: Namespace, auth: Auth = Auth.NoAuth) {
+  private val pulsarAuth = auth match {
+    case Auth.NoAuth => new AuthenticationDisabled()
+    case Auth.Token(value) => new AuthenticationToken(value)
+    case Auth.Basic(userId, password) =>
+      val basic = new AuthenticationBasic()
+      basic.configure(Map("userId" -> userId, "password" -> password).asJava)
+      basic
+  }
+
+  def adminClient(): PulsarAdmin =
+    PulsarAdmin.builder()
+      .serviceHttpUrl(adminUri)
+      .authentication(pulsarAuth)
+      .build()
+
+  def asyncClient(): PulsarAsyncClient =
+    PulsarClient(
+      PulsarClientConfig(
+        serviceUrl = brokerUri,
+        authentication = Some(pulsarAuth)
+      )
+    )
+}
\ No newline at end of file
diff --git a/server/queue/queue-pulsar/src/main/scala/org/apache/james/queue/pulsar/PulsarMailQueue.scala b/server/queue/queue-pulsar/src/main/scala/org/apache/james/queue/pulsar/PulsarMailQueue.scala
index a6b352cde8..1b6e6f9b16 100644
--- a/server/queue/queue-pulsar/src/main/scala/org/apache/james/queue/pulsar/PulsarMailQueue.scala
+++ b/server/queue/queue-pulsar/src/main/scala/org/apache/james/queue/pulsar/PulsarMailQueue.scala
@@ -1,4 +1,4 @@
-/****************************************************************
+/** **************************************************************
  * Licensed to the Apache Software Foundation (ASF) under one   *
  * or more contributor license agreements.  See the NOTICE file *
  * distributed with this work for additional information        *
@@ -6,23 +6,19 @@
  * to you under the Apache License, Version 2.0 (the            *
  * "License"); you may not use this file except in compliance   *
  * with the License.  You may obtain a copy of the License at   *
- *                                                              *
- *   http://www.apache.org/licenses/LICENSE-2.0                 *
- *                                                              *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0                 *
+ * *
  * Unless required by applicable law or agreed to in writing,   *
  * software distributed under the License is distributed on an  *
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
  * KIND, either express or implied.  See the License for the    *
  * specific language governing permissions and limitations      *
  * under the License.                                           *
- ****************************************************************/
+ * ************************************************************** */
 
 package org.apache.james.queue.pulsar
 
-import java.time.{Instant, ZonedDateTime, Duration => JavaDuration}
-import java.util.concurrent.TimeUnit
-import java.util.{Date, UUID}
-
 import akka.actor.{ActorRef, ActorSystem}
 import akka.stream.scaladsl.{Flow, Keep, RunnableGraph, Sink, Source, SourceQueueWithComplete, StreamConverters}
 import akka.stream.{Attributes, OverflowStrategy}
@@ -31,8 +27,6 @@ import akka.{Done, NotUsed}
 import com.sksamuel.pulsar4s._
 import com.sksamuel.pulsar4s.akka.streams
 import com.sksamuel.pulsar4s.akka.streams.{CommittableMessage, Control}
-import javax.mail.MessagingException
-import javax.mail.internet.MimeMessage
 import org.apache.james.backends.pulsar.PulsarReader
 import org.apache.james.blob.api.{BlobId, ObjectNotFoundException, Store}
 import org.apache.james.blob.mail.MimeMessagePartsId
@@ -44,13 +38,17 @@ import org.apache.james.queue.api._
 import org.apache.james.queue.pulsar.EnqueueId.EnqueueId
 import org.apache.james.server.core.MailImpl
 import org.apache.mailet._
-import org.apache.pulsar.client.admin.PulsarAdmin
 import org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException
 import org.apache.pulsar.client.api.{Schema, SubscriptionInitialPosition, SubscriptionType}
 import org.reactivestreams.Publisher
 import org.slf4j.LoggerFactory
 import play.api.libs.json._
 
+import java.time.{Instant, ZonedDateTime, Duration => JavaDuration}
+import java.util.concurrent.TimeUnit
+import java.util.{Date, UUID}
+import javax.mail.MessagingException
+import javax.mail.internet.MimeMessage
 import scala.concurrent._
 import scala.concurrent.duration._
 import scala.jdk.CollectionConverters._
@@ -85,14 +83,14 @@ private[pulsar] object schemas {
  * A filter cannot remove messages that are enqueued after the call to the `remove` method.
  */
 class PulsarMailQueue(
-  config: PulsarMailQueueConfiguration,
-  blobIdFactory: BlobId.Factory,
-  mimeMessageStore: Store[MimeMessage, MimeMessagePartsId],
-  mailQueueItemDecoratorFactory: MailQueueItemDecoratorFactory,
-  metricFactory: MetricFactory,
-  gaugeRegistry: GaugeRegistry,
-  system: ActorSystem
-) extends MailQueue with ManageableMailQueue {
+                       config: PulsarMailQueueConfiguration,
+                       blobIdFactory: BlobId.Factory,
+                       mimeMessageStore: Store[MimeMessage, MimeMessagePartsId],
+                       mailQueueItemDecoratorFactory: MailQueueItemDecoratorFactory,
+                       metricFactory: MetricFactory,
+                       gaugeRegistry: GaugeRegistry,
+                       system: ActorSystem
+                     ) extends MailQueue with ManageableMailQueue {
 
   import schemas._
   import serializers._
@@ -110,11 +108,8 @@ class PulsarMailQueue(
   private implicit val implicitSystem: ActorSystem = system
   private implicit val ec: ExecutionContextExecutor = system.dispatcher
   private implicit val implicitBlobIdFactory: BlobId.Factory = blobIdFactory
-  private implicit val client: PulsarAsyncClient = PulsarClient(config.pulsar.brokerUri)
-  private val admin = {
-    val builder = PulsarAdmin.builder()
-    builder.serviceHttpUrl(config.pulsar.adminUri).build()
-  }
+  private implicit val client: PulsarAsyncClient = config.pulsar.asyncClient()
+  private val admin = config.pulsar.adminClient()
 
   private val outTopic = Topic(s"persistent://${config.pulsar.namespace.asString}/James-${config.name.asString()}")
   private val scheduledTopic = Topic(s"persistent://${config.pulsar.namespace.asString}/${config.name.asString()}-scheduled")
@@ -212,7 +207,7 @@ class PulsarMailQueue(
   private val filterScheduledStage: ActorRef = system.actorOf(FilterStage.props)
   private val requeueMessage = Flow.apply[CommittableMessage[String]]
     .via(filteringFlow(filterScheduledStage))
-    .flatMapConcat{case (_,_,message) => Source.future(requeue.offer(ProducerMessage(message.message.value)).map(_ => message))}
+    .flatMapConcat { case (_, _, message) => Source.future(requeue.offer(ProducerMessage(message.message.value)).map(_ => message)) }
     .flatMapConcat(message => Source.future(message.ack(cumulative = false)))
     .toMat(Sink.ignore)(Keep.none)
 
@@ -244,7 +239,7 @@ class PulsarMailQueue(
       .toMat(Sink.asPublisher[MailQueue.MailQueueItem](true).withAttributes(Attributes.inputBuffer(initial = 1, max = 1)))(Keep.both)
   }
 
-  private def filteringFlow(filterActor:ActorRef) = {
+  private def filteringFlow(filterActor: ActorRef) = {
     implicit val timeout: Timeout = Timeout(1, TimeUnit.SECONDS)
     Flow.apply[CommittableMessage[String]].map(message =>
       (Json.fromJson[MailMetadata](Json.parse(message.message.value)).get,
@@ -261,7 +256,7 @@ class PulsarMailQueue(
           val partsId = metadata.partsId
           Source
             .fromPublisher(readMimeMessage(partsId))
-            .collect{ case Some(message) => message }
+            .collect { case Some(message) => message }
             .map(message => (readMail(metadata, message), partsId, committableMessage))
       }
   }
@@ -325,7 +320,7 @@ class PulsarMailQueue(
    *
    * @see [[FilterStage]]
    */
-  private def filtersCommandFlow(topic:Topic, filterSubscription: Subscription, filteringStage: ActorRef) = {
+  private def filtersCommandFlow(topic: Topic, filterSubscription: Subscription, filteringStage: ActorRef) = {
     val logInvalidFilterPayload = Flow.apply[JsResult[Filter]]
       .collectType[JsError]
       .map(error => "unable to parse filter" + Json.prettyPrint(JsError.toJson(error)))
@@ -543,11 +538,12 @@ class PulsarMailQueue(
    * This is reliant on the FilterStage implementation being able to deduplicate
    * filters. The current implementation defined filters as value objects and stores
    * them in a Set which will effectively dedpulicate them.
+   *
    * @see org.apache.james.queue.pulsar.FilterStage.filters
    * @param producer
    * @param filter
    */
-  private def publishFilter(producer:Producer[String])(filter:Filter): Unit ={
+  private def publishFilter(producer: Producer[String])(filter: Filter): Unit = {
     import Filter._
     // Optimizes for the local/single instance case, the duplicated filter
     // received through pulsar will be eliminated by the filter stage as
@@ -586,13 +582,14 @@ class PulsarMailQueue(
           .bodyBlobId(blobIdFactory.from(metadata.bodyBlobId))
           .build()
         Source.fromPublisher(readMimeMessage(partsId))
-          .collect{ case Some(message) => message }
+          .collect { case Some(message) => message }
           .map(message => readMail(metadata, message))
       })
 
     new ManageableMailQueue.MailQueueIterator() {
       private val javaStream = browseableMails.runWith(StreamConverters.asJavaStream[Mail]())
       private val iterator = javaStream.iterator()
+
       /**
        * @inheritdoc
        */
diff --git a/server/queue/queue-pulsar/src/main/scala/org/apache/james/queue/pulsar/PulsarMailQueueFactory.scala b/server/queue/queue-pulsar/src/main/scala/org/apache/james/queue/pulsar/PulsarMailQueueFactory.scala
index fda7581ac2..536f8575cb 100644
--- a/server/queue/queue-pulsar/src/main/scala/org/apache/james/queue/pulsar/PulsarMailQueueFactory.scala
+++ b/server/queue/queue-pulsar/src/main/scala/org/apache/james/queue/pulsar/PulsarMailQueueFactory.scala
@@ -20,12 +20,13 @@
 package org.apache.james.queue.pulsar
 
 import akka.actor.ActorSystem
-import org.apache.james.backends.pulsar.PulsarConfiguration
+import org.apache.james.backends.pulsar.{Auth, PulsarConfiguration}
 import org.apache.james.blob.api.{BlobId, Store}
 import org.apache.james.blob.mail.MimeMessagePartsId
 import org.apache.james.metrics.api.{GaugeRegistry, MetricFactory}
 import org.apache.james.queue.api.{MailQueueFactory, MailQueueItemDecoratorFactory, MailQueueName}
 import org.apache.pulsar.client.admin.PulsarAdmin
+import org.apache.pulsar.client.impl.auth.{AuthenticationBasic, AuthenticationToken}
 
 import java.util
 import java.util.Optional
@@ -45,9 +46,7 @@ class PulsarMailQueueFactory @Inject()(pulsarConfiguration: PulsarConfiguration,
   gaugeRegistry: GaugeRegistry
 ) extends MailQueueFactory[PulsarMailQueue] {
   private val queues: AtomicReference[Map[MailQueueName, PulsarMailQueue]] = new AtomicReference(Map.empty)
-  private val admin =
-    PulsarAdmin.builder().serviceHttpUrl(pulsarConfiguration.adminUri).build()
-
+  private val admin = pulsarConfiguration.adminClient()
   private val system: ActorSystem = ActorSystem("pulsar-mailqueue")
 
   @PreDestroy


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org