You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by rc...@apache.org on 2021/11/01 04:20:49 UTC

[james-project] 04/08: JAMES-3539 Implement PushListener

This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit d821e5dfd992e0d10ef0ca9e1bd068d20bd45545
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Thu Oct 28 11:01:09 2021 +0700

    JAMES-3539 Implement PushListener
---
 .../james/jmap/pushsubscription/PushListener.scala |  68 ++++++++++
 .../jmap/pushsubscription/PushListenerTest.scala   | 148 +++++++++++++++++++++
 2 files changed, 216 insertions(+)

diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/pushsubscription/PushListener.scala b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/pushsubscription/PushListener.scala
new file mode 100644
index 0000000..ff0dc01
--- /dev/null
+++ b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/pushsubscription/PushListener.scala
@@ -0,0 +1,68 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ * http://www.apache.org/licenses/LICENSE-2.0                   *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.jmap.pushsubscription
+
+import java.nio.charset.StandardCharsets
+
+import javax.inject.Inject
+import org.apache.james.events.EventListener.ReactiveGroupEventListener
+import org.apache.james.events.{Event, Group}
+import org.apache.james.jmap.api.model.PushSubscription
+import org.apache.james.jmap.api.pushsubscription.PushSubscriptionRepository
+import org.apache.james.jmap.change.StateChangeEvent
+import org.apache.james.jmap.core.StateChange
+import org.apache.james.jmap.json.PushSerializer
+import org.apache.james.util.ReactorUtils
+import org.reactivestreams.Publisher
+import play.api.libs.json.Json
+import reactor.core.scala.publisher.{SFlux, SMono}
+
+case class PushListenerGroup() extends Group {} // Group returned by PushListener.getDefaultGroup.
+
+class PushListener @Inject()(pushRepository: PushSubscriptionRepository,
+                   webPushClient: WebPushClient,
+                   pushSerializer: PushSerializer) extends ReactiveGroupEventListener {
+  // Pushes StateChangeEvents to the user's validated push subscriptions; its default group is PushListenerGroup.
+  override def getDefaultGroup: Group = PushListenerGroup()
+  // Only StateChangeEvent is processed: each validated subscription of event.username is notified; other events yield SMono.empty.
+  override def reactiveEvent(event: Event): Publisher[Void] =
+    event match {
+      case event: StateChangeEvent =>
+        SFlux(pushRepository.list(event.username))
+          .filter(_.validated)
+          .flatMap(sendNotification(_, event), ReactorUtils.DEFAULT_CONCURRENCY)
+          .`then`()
+      case _ => SMono.empty
+    }
+  // Same type test as the match in reactiveEvent.
+  override def isHandling(event: Event): Boolean = event.isInstanceOf[StateChangeEvent]
+  // Pushes the state change restricted to the subscription's types, if any remains. NOTE(review): push errors are not recovered here; confirm intended.
+  private def sendNotification(pushSubscription: PushSubscription, stateChangeEvent: StateChangeEvent): Publisher[Unit] =
+    stateChangeEvent
+      .asStateChange
+      .filter(pushSubscription.types.toSet)
+      .fold(SMono.empty[Unit])(stateChange => SMono(webPushClient.push(pushSubscription.url, asPushRequest(stateChange))))
+  // Every request is built with PushTTL.MAX and the serialized state change as payload.
+  private def asPushRequest(stateChange: StateChange): PushRequest =
+    PushRequest(ttl = PushTTL.MAX, payload = asBytes(stateChange))
+  // UTF-8 bytes of the JSON produced by pushSerializer.serializeSSE.
+  private def asBytes(stateChange: StateChange) =
+    Json.stringify(pushSerializer.serializeSSE(stateChange)).getBytes(StandardCharsets.UTF_8)
+}
diff --git a/server/protocols/jmap-rfc-8621/src/test/scala/org/apache/james/jmap/pushsubscription/PushListenerTest.scala b/server/protocols/jmap-rfc-8621/src/test/scala/org/apache/james/jmap/pushsubscription/PushListenerTest.scala
new file mode 100644
index 0000000..b281a6c
--- /dev/null
+++ b/server/protocols/jmap-rfc-8621/src/test/scala/org/apache/james/jmap/pushsubscription/PushListenerTest.scala
@@ -0,0 +1,148 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ * http://www.apache.org/licenses/LICENSE-2.0                   *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.jmap.pushsubscription
+
+import java.nio.charset.StandardCharsets
+import java.time.Clock
+import java.util.UUID
+
+import com.google.common.collect.ImmutableSet
+import org.apache.james.core.Username
+import org.apache.james.events.Event.EventId
+import org.apache.james.jmap.api.model.{DeviceClientId, PushSubscriptionCreationRequest, PushSubscriptionServerURL, TypeName}
+import org.apache.james.jmap.api.pushsubscription.PushSubscriptionRepository
+import org.apache.james.jmap.change.{EmailDeliveryTypeName, EmailTypeName, MailboxTypeName, StateChangeEvent, TypeStateFactory}
+import org.apache.james.jmap.core.UuidState
+import org.apache.james.jmap.json.PushSerializer
+import org.apache.james.jmap.memory.pushsubscription.MemoryPushSubscriptionRepository
+import org.assertj.core.api.SoftAssertions
+import org.junit.jupiter.api.{BeforeEach, Test}
+import org.mockito.ArgumentMatchers.any
+import org.mockito.Mockito.{mock, verify, verifyNoInteractions, when}
+import org.mockito.{ArgumentCaptor, ArgumentMatchers}
+import reactor.core.scala.publisher.SMono
+
+import scala.jdk.OptionConverters._
+
+class PushListenerTest {
+  val bob: Username = Username.of("bob@localhost")
+  val bobAccountId: String = "405010d6c16c16dec36f3d7a7596c2d757ba7b57904adc4801a63e40914fd5c9"
+  val url: PushSubscriptionServerURL = PushSubscriptionServerURL.from("http://localhost:9999/push").get
+  // bobAccountId is the account key expected in pushed payloads; the fixtures below are rebuilt by setUp before each test.
+  var testee: PushListener = _
+  var pushSubscriptionRepository: PushSubscriptionRepository = _
+  var webPushClient: WebPushClient = _
+  // Wires the listener to a MemoryPushSubscriptionRepository and a mocked WebPushClient whose push returns an empty SMono.
+  @BeforeEach
+  def setUp(): Unit = {
+    val pushSerializer = PushSerializer(TypeStateFactory(ImmutableSet.of[TypeName](MailboxTypeName, EmailTypeName, EmailDeliveryTypeName)))
+
+    pushSubscriptionRepository = new MemoryPushSubscriptionRepository(Clock.systemUTC())
+    webPushClient = mock(classOf[WebPushClient])
+    testee = new PushListener(pushSubscriptionRepository, webPushClient, pushSerializer)
+
+    when(webPushClient.push(any(), any())).thenReturn(SMono.empty[Unit])
+  }
+  // No subscription is stored for bob, so the client must never be called.
+  @Test
+  def shouldNotPushWhenNoSubscriptions(): Unit = {
+    SMono(testee.reactiveEvent(StateChangeEvent(EventId.random(), bob,
+      Map(EmailTypeName -> UuidState(UUID.randomUUID()))))).block()
+
+    verifyNoInteractions(webPushClient)
+  }
+  // A subscription saved without validateVerificationCode being called must not receive pushes.
+  @Test
+  def shouldNotPushWhenNotVerified(): Unit = {
+    SMono(pushSubscriptionRepository.save(bob, PushSubscriptionCreationRequest(
+      deviceClientId = DeviceClientId("junit"),
+      url = url,
+      types = Seq(MailboxTypeName, EmailTypeName)))).block()
+
+    SMono(testee.reactiveEvent(StateChangeEvent(EventId.random(), bob,
+      Map(EmailTypeName -> UuidState(UUID.randomUUID()))))).block()
+
+    verifyNoInteractions(webPushClient)
+  }
+  // Validated subscription for EmailDelivery only; the event carries Email only, so nothing is pushed.
+  @Test
+  def shouldNotPushWhenTypeMismatch(): Unit = {
+    val id = SMono(pushSubscriptionRepository.save(bob, PushSubscriptionCreationRequest(
+      deviceClientId = DeviceClientId("junit"),
+      url = url,
+      types = Seq(EmailDeliveryTypeName)))).block().id
+    SMono(pushSubscriptionRepository.validateVerificationCode(bob, id)).block()
+
+    SMono(testee.reactiveEvent(StateChangeEvent(EventId.random(), bob,
+      Map(EmailTypeName -> UuidState(UUID.randomUUID()))))).block()
+
+    verifyNoInteractions(webPushClient)
+  }
+  // Validated subscription and a matching type: push to url with PushTTL.MAX, no topic, no urgency and the JSON state change as payload.
+  @Test
+  def shouldPushWhenValidated(): Unit = {
+    val id = SMono(pushSubscriptionRepository.save(bob, PushSubscriptionCreationRequest(
+      deviceClientId = DeviceClientId("junit"),
+      url = url,
+      types = Seq(EmailTypeName, MailboxTypeName)))).block().id
+    SMono(pushSubscriptionRepository.validateVerificationCode(bob, id)).block()
+
+    val state1 = UuidState(UUID.randomUUID())
+    SMono(testee.reactiveEvent(StateChangeEvent(EventId.random(), bob,
+      Map(EmailTypeName -> state1)))).block()
+
+    val argumentCaptor: ArgumentCaptor[PushRequest] = ArgumentCaptor.forClass(classOf[PushRequest])
+    verify(webPushClient).push(ArgumentMatchers.eq(url), argumentCaptor.capture())
+
+    SoftAssertions.assertSoftly(softly => {
+      softly.assertThat(argumentCaptor.getValue.ttl).isEqualTo(PushTTL.MAX)
+      softly.assertThat(argumentCaptor.getValue.topic.toJava).isEmpty
+      softly.assertThat(argumentCaptor.getValue.urgency.toJava).isEmpty
+      softly.assertThat(new String(argumentCaptor.getValue.payload, StandardCharsets.UTF_8))
+        .isEqualTo(s"""{"@type":"StateChange","changed":{"$bobAccountId":{"Email":"${state1.value.toString}"}}}""")
+    })
+  }
+  // The event also carries EmailDelivery, which the subscription did not ask for: it must be absent from the payload.
+  @Test
+  def unwantedTypesShouldBeFilteredOut(): Unit = {
+    val id = SMono(pushSubscriptionRepository.save(bob, PushSubscriptionCreationRequest(
+      deviceClientId = DeviceClientId("junit"),
+      url = url,
+      types = Seq(EmailTypeName, MailboxTypeName)))).block().id
+    SMono(pushSubscriptionRepository.validateVerificationCode(bob, id)).block()
+
+    val state1 = UuidState(UUID.randomUUID())
+    val state2 = UuidState(UUID.randomUUID())
+    val state3 = UuidState(UUID.randomUUID())
+    SMono(testee.reactiveEvent(StateChangeEvent(EventId.random(), bob,
+      Map(EmailTypeName -> state1, MailboxTypeName -> state2, EmailDeliveryTypeName -> state3)))).block()
+
+    val argumentCaptor: ArgumentCaptor[PushRequest] = ArgumentCaptor.forClass(classOf[PushRequest])
+    verify(webPushClient).push(ArgumentMatchers.eq(url), argumentCaptor.capture())
+
+    SoftAssertions.assertSoftly(softly => {
+      softly.assertThat(argumentCaptor.getValue.ttl).isEqualTo(PushTTL.MAX)
+      softly.assertThat(argumentCaptor.getValue.topic.toJava).isEmpty
+      softly.assertThat(argumentCaptor.getValue.urgency.toJava).isEmpty
+      softly.assertThat(new String(argumentCaptor.getValue.payload, StandardCharsets.UTF_8))
+        .isEqualTo(s"""{"@type":"StateChange","changed":{"$bobAccountId":{"Email":"${state1.value.toString}","Mailbox":"${state2.value.toString}"}}}""")
+    })
+  }
+}

---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org