You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by rc...@apache.org on 2021/11/01 04:20:49 UTC
[james-project] 04/08: JAMES-3539 Implement PushListener
This is an automated email from the ASF dual-hosted git repository.
rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit d821e5dfd992e0d10ef0ca9e1bd068d20bd45545
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Thu Oct 28 11:01:09 2021 +0700
JAMES-3539 Implement PushListener
---
.../james/jmap/pushsubscription/PushListener.scala | 68 ++++++++++
.../jmap/pushsubscription/PushListenerTest.scala | 148 +++++++++++++++++++++
2 files changed, 216 insertions(+)
diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/pushsubscription/PushListener.scala b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/pushsubscription/PushListener.scala
new file mode 100644
index 0000000..ff0dc01
--- /dev/null
+++ b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/pushsubscription/PushListener.scala
@@ -0,0 +1,68 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.jmap.pushsubscription
+
+import java.nio.charset.StandardCharsets
+
+import javax.inject.Inject
+import org.apache.james.events.EventListener.ReactiveGroupEventListener
+import org.apache.james.events.{Event, Group}
+import org.apache.james.jmap.api.model.PushSubscription
+import org.apache.james.jmap.api.pushsubscription.PushSubscriptionRepository
+import org.apache.james.jmap.change.StateChangeEvent
+import org.apache.james.jmap.core.StateChange
+import org.apache.james.jmap.json.PushSerializer
+import org.apache.james.util.ReactorUtils
+import org.reactivestreams.Publisher
+import play.api.libs.json.Json
+import reactor.core.scala.publisher.{SFlux, SMono}
+
+/** Event group returned by `PushListener.getDefaultGroup`. */
+case class PushListenerGroup() extends Group
+
+/**
+ * Group event listener relaying JMAP state changes to a user's push subscriptions.
+ *
+ * On a [[StateChangeEvent]], the subscriptions listed by the repository for the event's
+ * username are narrowed to the validated ones; each of them is then sent, through the
+ * [[WebPushClient]], the state change restricted to the types it declares.
+ */
+class PushListener @Inject()(pushRepository: PushSubscriptionRepository,
+                             webPushClient: WebPushClient,
+                             pushSerializer: PushSerializer) extends ReactiveGroupEventListener {
+
+  override def getDefaultGroup: Group = PushListenerGroup()
+
+  /** Only [[StateChangeEvent]] instances are handled. */
+  override def isHandling(event: Event): Boolean =
+    event match {
+      case _: StateChangeEvent => true
+      case _ => false
+    }
+
+  /**
+   * Pushes a state change to every validated subscription of the event's user.
+   * Events of any other kind complete without doing anything.
+   */
+  override def reactiveEvent(event: Event): Publisher[Void] =
+    event match {
+      case stateChangeEvent: StateChangeEvent =>
+        SFlux(pushRepository.list(stateChangeEvent.username))
+          .filter(subscription => subscription.validated)
+          .flatMap(
+            subscription => sendNotification(subscription, stateChangeEvent),
+            ReactorUtils.DEFAULT_CONCURRENCY)
+          .`then`()
+      case _ => SMono.empty
+    }
+
+  /**
+   * Sends to one subscription the part of the state change matching its types.
+   * Nothing is pushed when the filtering leaves no state change.
+   */
+  private def sendNotification(pushSubscription: PushSubscription, stateChangeEvent: StateChangeEvent): Publisher[Unit] = {
+    val subscribedTypes = pushSubscription.types.toSet
+    stateChangeEvent.asStateChange.filter(subscribedTypes) match {
+      case Some(stateChange) => SMono(webPushClient.push(pushSubscription.url, asPushRequest(stateChange)))
+      case None => SMono.empty[Unit]
+    }
+  }
+
+  /** Builds a push request with the maximum TTL whose payload is the UTF-8 encoded JSON of the state change. */
+  private def asPushRequest(stateChange: StateChange): PushRequest = {
+    val json = Json.stringify(pushSerializer.serializeSSE(stateChange))
+    PushRequest(ttl = PushTTL.MAX, payload = json.getBytes(StandardCharsets.UTF_8))
+  }
+}
diff --git a/server/protocols/jmap-rfc-8621/src/test/scala/org/apache/james/jmap/pushsubscription/PushListenerTest.scala b/server/protocols/jmap-rfc-8621/src/test/scala/org/apache/james/jmap/pushsubscription/PushListenerTest.scala
new file mode 100644
index 0000000..b281a6c
--- /dev/null
+++ b/server/protocols/jmap-rfc-8621/src/test/scala/org/apache/james/jmap/pushsubscription/PushListenerTest.scala
@@ -0,0 +1,148 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.jmap.pushsubscription
+
+import java.nio.charset.StandardCharsets
+import java.time.Clock
+import java.util.UUID
+
+import com.google.common.collect.ImmutableSet
+import org.apache.james.core.Username
+import org.apache.james.events.Event.EventId
+import org.apache.james.jmap.api.model.{DeviceClientId, PushSubscriptionCreationRequest, PushSubscriptionServerURL, TypeName}
+import org.apache.james.jmap.api.pushsubscription.PushSubscriptionRepository
+import org.apache.james.jmap.change.{EmailDeliveryTypeName, EmailTypeName, MailboxTypeName, StateChangeEvent, TypeStateFactory}
+import org.apache.james.jmap.core.UuidState
+import org.apache.james.jmap.json.PushSerializer
+import org.apache.james.jmap.memory.pushsubscription.MemoryPushSubscriptionRepository
+import org.assertj.core.api.SoftAssertions
+import org.junit.jupiter.api.{BeforeEach, Test}
+import org.mockito.ArgumentMatchers.any
+import org.mockito.Mockito.{mock, verify, verifyNoInteractions, when}
+import org.mockito.{ArgumentCaptor, ArgumentMatchers}
+import reactor.core.scala.publisher.SMono
+
+import scala.jdk.OptionConverters._
+
+/**
+ * Unit tests of [[PushListener]] relying on an in-memory subscription repository
+ * and a mocked [[WebPushClient]].
+ */
+class PushListenerTest {
+  val bob: Username = Username.of("bob@localhost")
+  val bobAccountId: String = "405010d6c16c16dec36f3d7a7596c2d757ba7b57904adc4801a63e40914fd5c9"
+  val url: PushSubscriptionServerURL = PushSubscriptionServerURL.from("http://localhost:9999/push").get
+
+  var testee: PushListener = _
+  var pushSubscriptionRepository: PushSubscriptionRepository = _
+  var webPushClient: WebPushClient = _
+
+  @BeforeEach
+  def setUp(): Unit = {
+    // The mocked client accepts any push and completes without emitting.
+    webPushClient = mock(classOf[WebPushClient])
+    when(webPushClient.push(any(), any())).thenReturn(SMono.empty[Unit])
+
+    pushSubscriptionRepository = new MemoryPushSubscriptionRepository(Clock.systemUTC())
+
+    val supportedTypes = ImmutableSet.of[TypeName](MailboxTypeName, EmailTypeName, EmailDeliveryTypeName)
+    testee = new PushListener(pushSubscriptionRepository, webPushClient, PushSerializer(TypeStateFactory(supportedTypes)))
+  }
+
+  // Saves a subscription for bob on the test URL, leaving it not validated.
+  private def register(types: Seq[TypeName]) =
+    SMono(pushSubscriptionRepository.save(bob, PushSubscriptionCreationRequest(
+      deviceClientId = DeviceClientId("junit"),
+      url = url,
+      types = types))).block()
+
+  // Saves a subscription for bob, then validates its verification code.
+  private def registerValidated(types: Seq[TypeName]) = {
+    val id = register(types).id
+    SMono(pushSubscriptionRepository.validateVerificationCode(bob, id)).block()
+  }
+
+  // Verifies a single push on the test URL and checks the captured request.
+  private def assertPushed(expectedPayload: String): Unit = {
+    val captor: ArgumentCaptor[PushRequest] = ArgumentCaptor.forClass(classOf[PushRequest])
+    verify(webPushClient).push(ArgumentMatchers.eq(url), captor.capture())
+    val request = captor.getValue
+
+    SoftAssertions.assertSoftly(softly => {
+      softly.assertThat(request.ttl).isEqualTo(PushTTL.MAX)
+      softly.assertThat(request.topic.toJava).isEmpty
+      softly.assertThat(request.urgency.toJava).isEmpty
+      softly.assertThat(new String(request.payload, StandardCharsets.UTF_8))
+        .isEqualTo(expectedPayload)
+    })
+  }
+
+  @Test
+  def shouldNotPushWhenNoSubscriptions(): Unit = {
+    val event = StateChangeEvent(EventId.random(), bob,
+      Map(EmailTypeName -> UuidState(UUID.randomUUID())))
+
+    SMono(testee.reactiveEvent(event)).block()
+
+    verifyNoInteractions(webPushClient)
+  }
+
+  @Test
+  def shouldNotPushWhenNotVerified(): Unit = {
+    register(Seq(MailboxTypeName, EmailTypeName))
+    val event = StateChangeEvent(EventId.random(), bob,
+      Map(EmailTypeName -> UuidState(UUID.randomUUID())))
+
+    SMono(testee.reactiveEvent(event)).block()
+
+    verifyNoInteractions(webPushClient)
+  }
+
+  @Test
+  def shouldNotPushWhenTypeMismatch(): Unit = {
+    registerValidated(Seq(EmailDeliveryTypeName))
+    val event = StateChangeEvent(EventId.random(), bob,
+      Map(EmailTypeName -> UuidState(UUID.randomUUID())))
+
+    SMono(testee.reactiveEvent(event)).block()
+
+    verifyNoInteractions(webPushClient)
+  }
+
+  @Test
+  def shouldPushWhenValidated(): Unit = {
+    registerValidated(Seq(EmailTypeName, MailboxTypeName))
+    val state1 = UuidState(UUID.randomUUID())
+    val event = StateChangeEvent(EventId.random(), bob,
+      Map(EmailTypeName -> state1))
+
+    SMono(testee.reactiveEvent(event)).block()
+
+    assertPushed(s"""{"@type":"StateChange","changed":{"$bobAccountId":{"Email":"${state1.value.toString}"}}}""")
+  }
+
+  @Test
+  def unwantedTypesShouldBeFilteredOut(): Unit = {
+    registerValidated(Seq(EmailTypeName, MailboxTypeName))
+    val state1 = UuidState(UUID.randomUUID())
+    val state2 = UuidState(UUID.randomUUID())
+    val state3 = UuidState(UUID.randomUUID())
+    val event = StateChangeEvent(EventId.random(), bob,
+      Map(EmailTypeName -> state1, MailboxTypeName -> state2, EmailDeliveryTypeName -> state3))
+
+    SMono(testee.reactiveEvent(event)).block()
+
+    // The subscription does not list EmailDelivery, so state3 must be absent from the payload.
+    assertPushed(s"""{"@type":"StateChange","changed":{"$bobAccountId":{"Email":"${state1.value.toString}","Mailbox":"${state2.value.toString}"}}}""")
+  }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org