You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by rc...@apache.org on 2021/11/01 04:33:20 UTC

[james-project] branch master updated (7a8335f -> 5eb8a9c)

This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git.


    from 7a8335f  JAMES-3539 Fix WebPushClient
     new 34175c3  JAMES-3539 Cassandra implement for PushSubscriptionRepository
     new edc1623  JAMES-3539 Move TypeStateFactory to data-jmap
     new 0f40259  JAMES-3539 MemoryPushSubscriptionRepository should handle InvalidPushSubscriptionKeys error
     new 2983ac5  JAMES-3539 Introduce PushSubscriptionHelpers to avoid duplicated code
     new 5eb8a9c  JAMES-3539 MemoryPushSubscriptionRepository should leverage PushSubscriptionHelpers

The 5 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 server/data/data-jmap-cassandra/pom.xml            |   5 +
 .../CassandraPushSubscriptionDAO.java              | 175 +++++++++++++++++++++
 .../CassandraPushSubscriptionModule.java           |  57 +++++++
 .../CassandraPushSubscriptionRepository.java}      | 149 ++++++++----------
 .../tables/CassandraPushSubscriptionTable.java     |  28 ++--
 .../CassandraPushSubscriptionRepositoryTest.java}  |  62 +++++---
 .../pushsubscription/PushSubscriptionHelpers.java  |  60 +++++++
 .../MemoryPushSubscriptionRepository.java          |  37 ++---
 .../james/jmap/api}/change/TypeStateFactory.scala  |   3 +-
 .../james/jmap/api/model/PushSubscription.scala    |   6 +-
 .../PushSubscriptionRepositoryContract.scala       |  63 +++++++-
 .../james/jmap/change/JmapEventSerializer.scala    |   1 +
 .../apache/james/jmap/json/PushSerializer.scala    |   3 +-
 .../james/jmap/routes/EventSourceRoutes.scala      |   3 +-
 .../apache/james/jmap/routes/WebSocketRoutes.scala |   2 +-
 .../change/StateChangeEventSerializerTest.scala    |   2 +
 .../james/jmap/change/TypeStateFactoryTest.scala   |   2 +
 17 files changed, 507 insertions(+), 151 deletions(-)
 create mode 100644 server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/pushsubscription/CassandraPushSubscriptionDAO.java
 create mode 100644 server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/pushsubscription/CassandraPushSubscriptionModule.java
 copy server/data/{data-jmap/src/main/java/org/apache/james/jmap/memory/pushsubscription/MemoryPushSubscriptionRepository.java => data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/pushsubscription/CassandraPushSubscriptionRepository.java} (58%)
 copy mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/CassandraMailboxPathV2Table.java => server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/pushsubscription/tables/CassandraPushSubscriptionTable.java (70%)
 copy server/data/{data-jmap/src/test/java/org/apache/james/jmap/memory/pushsubscription/MemoryPushSubscriptionRepositoryTest.java => data-jmap-cassandra/src/test/java/org/apache/james/jmap/cassandra/pushsubscription/CassandraPushSubscriptionRepositoryTest.java} (60%)
 create mode 100644 server/data/data-jmap/src/main/java/org/apache/james/jmap/api/pushsubscription/PushSubscriptionHelpers.java
 rename server/{protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap => data/data-jmap/src/main/scala/org/apache/james/jmap/api}/change/TypeStateFactory.scala (97%)

---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org


[james-project] 03/05: JAMES-3539 MemoryPushSubscriptionRepository should handle InvalidPushSubscriptionKeys error

Posted by rc...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 0f4025952f1630458a008b36b812359a1bd9d3a7
Author: Quan Tran <hq...@linagora.com>
AuthorDate: Thu Oct 28 21:07:54 2021 +0700

    JAMES-3539 MemoryPushSubscriptionRepository should handle InvalidPushSubscriptionKeys error
---
 .../pushsubscription/MemoryPushSubscriptionRepository.java    | 11 +++++++++++
 .../org/apache/james/jmap/api/model/PushSubscription.scala    |  4 +++-
 2 files changed, 14 insertions(+), 1 deletion(-)

diff --git a/server/data/data-jmap/src/main/java/org/apache/james/jmap/memory/pushsubscription/MemoryPushSubscriptionRepository.java b/server/data/data-jmap/src/main/java/org/apache/james/jmap/memory/pushsubscription/MemoryPushSubscriptionRepository.java
index 61c5c5a..488b616 100644
--- a/server/data/data-jmap/src/main/java/org/apache/james/jmap/memory/pushsubscription/MemoryPushSubscriptionRepository.java
+++ b/server/data/data-jmap/src/main/java/org/apache/james/jmap/memory/pushsubscription/MemoryPushSubscriptionRepository.java
@@ -32,10 +32,12 @@ import javax.inject.Inject;
 import org.apache.james.core.Username;
 import org.apache.james.jmap.api.model.DeviceClientIdInvalidException;
 import org.apache.james.jmap.api.model.ExpireTimeInvalidException;
+import org.apache.james.jmap.api.model.InvalidPushSubscriptionKeys;
 import org.apache.james.jmap.api.model.PushSubscription;
 import org.apache.james.jmap.api.model.PushSubscriptionCreationRequest;
 import org.apache.james.jmap.api.model.PushSubscriptionExpiredTime;
 import org.apache.james.jmap.api.model.PushSubscriptionId;
+import org.apache.james.jmap.api.model.PushSubscriptionKeys;
 import org.apache.james.jmap.api.model.PushSubscriptionNotFoundException;
 import org.apache.james.jmap.api.model.TypeName;
 import org.apache.james.jmap.api.pushsubscription.PushSubscriptionRepository;
@@ -70,6 +72,9 @@ public class MemoryPushSubscriptionRepository implements PushSubscriptionReposit
                 if (!isUniqueDeviceClientId(username, req.deviceClientId())) {
                     sink.error(new DeviceClientIdInvalidException(req.deviceClientId(), "deviceClientId must be unique"));
                 }
+                if (isInvalidPushSubscriptionKey(req.keys())) {
+                    sink.error(new InvalidPushSubscriptionKeys(req.keys().get()));
+                }
             })
             .thenReturn(PushSubscription.from(request,
                 evaluateExpiresTime(OptionConverters.toJava(request.expires().map(PushSubscriptionExpiredTime::value)))))
@@ -158,4 +163,10 @@ public class MemoryPushSubscriptionRepository implements PushSubscriptionReposit
         return table.row(username).values().stream()
             .noneMatch(subscription -> subscription.deviceClientId().equals(deviceClientId));
     }
+
+    private boolean isInvalidPushSubscriptionKey(Option<PushSubscriptionKeys> keysOption) {
+        return OptionConverters.toJava(keysOption)
+            .map(key -> key.p256dh().isEmpty() || key.auth().isEmpty())
+            .orElse(false);
+    }
 }
diff --git a/server/data/data-jmap/src/main/scala/org/apache/james/jmap/api/model/PushSubscription.scala b/server/data/data-jmap/src/main/scala/org/apache/james/jmap/api/model/PushSubscription.scala
index 6d7b7ac..993b9e0 100644
--- a/server/data/data-jmap/src/main/scala/org/apache/james/jmap/api/model/PushSubscription.scala
+++ b/server/data/data-jmap/src/main/scala/org/apache/james/jmap/api/model/PushSubscription.scala
@@ -116,4 +116,6 @@ case class PushSubscriptionNotFoundException(id: PushSubscriptionId) extends Run
 
 case class ExpireTimeInvalidException(expires: ZonedDateTime, message: String) extends RuntimeException
 
-case class DeviceClientIdInvalidException(deviceClientId: DeviceClientId, message: String) extends RuntimeException
\ No newline at end of file
+case class DeviceClientIdInvalidException(deviceClientId: DeviceClientId, message: String) extends RuntimeException
+
+case class InvalidPushSubscriptionKeys(keys: PushSubscriptionKeys) extends RuntimeException
\ No newline at end of file

---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org


[james-project] 02/05: JAMES-3539 Move TypeStateFactory to data-jmap

Posted by rc...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit edc1623effeb9f2568a518a8ebea91d16dc5427b
Author: Quan Tran <hq...@linagora.com>
AuthorDate: Thu Oct 28 12:09:49 2021 +0700

    JAMES-3539 Move TypeStateFactory to data-jmap
    
    We need TypeStateFactory at data-jmap level to parse string (stored in Cassandra) to TypeName
---
 .../scala/org/apache/james/jmap/api}/change/TypeStateFactory.scala     | 3 +--
 .../main/scala/org/apache/james/jmap/change/JmapEventSerializer.scala  | 1 +
 .../src/main/scala/org/apache/james/jmap/json/PushSerializer.scala     | 3 ++-
 .../main/scala/org/apache/james/jmap/routes/EventSourceRoutes.scala    | 3 ++-
 .../src/main/scala/org/apache/james/jmap/routes/WebSocketRoutes.scala  | 2 +-
 .../org/apache/james/jmap/change/StateChangeEventSerializerTest.scala  | 2 ++
 .../test/scala/org/apache/james/jmap/change/TypeStateFactoryTest.scala | 2 ++
 7 files changed, 11 insertions(+), 5 deletions(-)

diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/TypeStateFactory.scala b/server/data/data-jmap/src/main/scala/org/apache/james/jmap/api/change/TypeStateFactory.scala
similarity index 97%
rename from server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/TypeStateFactory.scala
rename to server/data/data-jmap/src/main/scala/org/apache/james/jmap/api/change/TypeStateFactory.scala
index fea1ad0..a994f0b 100644
--- a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/TypeStateFactory.scala
+++ b/server/data/data-jmap/src/main/scala/org/apache/james/jmap/api/change/TypeStateFactory.scala
@@ -17,7 +17,7 @@
  * under the License.                                           *
  * ************************************************************** */
 
-package org.apache.james.jmap.change
+package org.apache.james.jmap.api.change
 
 import javax.inject.Inject
 import org.apache.james.jmap.api.model.TypeName
@@ -33,4 +33,3 @@ case class TypeStateFactory @Inject()(setTypeName: java.util.Set[TypeName]) {
       .map(Right(_))
       .getOrElse(Left(new IllegalArgumentException(s"Unknown typeName $string")))
 }
-
diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/JmapEventSerializer.scala b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/JmapEventSerializer.scala
index 5867d94..bad3974 100644
--- a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/JmapEventSerializer.scala
+++ b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/JmapEventSerializer.scala
@@ -26,6 +26,7 @@ import javax.inject.Inject
 import org.apache.james.core.Username
 import org.apache.james.events.Event.EventId
 import org.apache.james.events.{Event, EventSerializer}
+import org.apache.james.jmap.api.change.TypeStateFactory
 import org.apache.james.jmap.api.model.{State, TypeName}
 import org.apache.james.jmap.core.UuidState
 import org.apache.james.json.JsonGenericSerializer
diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/json/PushSerializer.scala b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/json/PushSerializer.scala
index 13313a2..eafba65 100644
--- a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/json/PushSerializer.scala
+++ b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/json/PushSerializer.scala
@@ -20,8 +20,9 @@
 package org.apache.james.jmap.json
 
 import javax.inject.Inject
+import org.apache.james.jmap.api.change.TypeStateFactory
 import org.apache.james.jmap.api.model.{State, TypeName}
-import org.apache.james.jmap.change.{TypeState, TypeStateFactory}
+import org.apache.james.jmap.change.TypeState
 import org.apache.james.jmap.core.{AccountId, OutboundMessage, PingMessage, PushState, RequestId, StateChange, WebSocketError, WebSocketInboundMessage, WebSocketPushDisable, WebSocketPushEnable, WebSocketRequest, WebSocketResponse}
 import play.api.libs.json.{Format, JsError, JsNull, JsObject, JsResult, JsString, JsSuccess, JsValue, Json, OWrites, Reads, Writes}
 
diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/EventSourceRoutes.scala b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/EventSourceRoutes.scala
index f9e277f..5debc4e 100644
--- a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/EventSourceRoutes.scala
+++ b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/EventSourceRoutes.scala
@@ -33,8 +33,9 @@ import javax.inject.{Inject, Named}
 import org.apache.james.events.{EventBus, Registration}
 import org.apache.james.jmap.HttpConstants.JSON_CONTENT_TYPE
 import org.apache.james.jmap.JMAPUrls.EVENT_SOURCE
+import org.apache.james.jmap.api.change.TypeStateFactory
 import org.apache.james.jmap.api.model.TypeName
-import org.apache.james.jmap.change.{AccountIdRegistrationKey, StateChangeListener, TypeStateFactory}
+import org.apache.james.jmap.change.{AccountIdRegistrationKey, StateChangeListener}
 import org.apache.james.jmap.core.{OutboundMessage, PingMessage, ProblemDetails, StateChange}
 import org.apache.james.jmap.exceptions.UnauthorizedException
 import org.apache.james.jmap.http.rfc8621.InjectionKeys
diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/WebSocketRoutes.scala b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/WebSocketRoutes.scala
index 233dc03..df627b7 100644
--- a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/WebSocketRoutes.scala
+++ b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/WebSocketRoutes.scala
@@ -31,7 +31,7 @@ import org.apache.james.core.Username
 import org.apache.james.events.{EventBus, Registration}
 import org.apache.james.jmap.HttpConstants.JSON_CONTENT_TYPE
 import org.apache.james.jmap.JMAPUrls.JMAP_WS
-import org.apache.james.jmap.api.change.{EmailChangeRepository, MailboxChangeRepository}
+import org.apache.james.jmap.api.change.{EmailChangeRepository, MailboxChangeRepository, TypeStateFactory}
 import org.apache.james.jmap.api.model.{AccountId => JavaAccountId}
 import org.apache.james.jmap.change.{AccountIdRegistrationKey, StateChangeListener, _}
 import org.apache.james.jmap.core.{OutboundMessage, ProblemDetails, RequestId, WebSocketError, WebSocketPushDisable, WebSocketPushEnable, WebSocketRequest, WebSocketResponse, _}
diff --git a/server/protocols/jmap-rfc-8621/src/test/scala/org/apache/james/jmap/change/StateChangeEventSerializerTest.scala b/server/protocols/jmap-rfc-8621/src/test/scala/org/apache/james/jmap/change/StateChangeEventSerializerTest.scala
index 4ba5f43..08f0010 100644
--- a/server/protocols/jmap-rfc-8621/src/test/scala/org/apache/james/jmap/change/StateChangeEventSerializerTest.scala
+++ b/server/protocols/jmap-rfc-8621/src/test/scala/org/apache/james/jmap/change/StateChangeEventSerializerTest.scala
@@ -21,6 +21,8 @@ package org.apache.james.jmap.change
 import org.apache.james.JsonSerializationVerifier
 import org.apache.james.core.Username
 import org.apache.james.events.Event.EventId
+import org.apache.james.jmap.api.change
+import org.apache.james.jmap.api.change.TypeStateFactory
 import org.apache.james.jmap.api.model.TypeName
 import org.apache.james.jmap.change.StateChangeEventSerializerTest.{EVENT, EVENT_EMPTY_TYPE_STATE_MAP, EVENT_ID, EVENT_JSON, EVENT_JSON_EMPTY_TYPE_STATE_MAP, EVENT_JSON_NO_DELIVERY, EVENT_NO_DELIVERY, USERNAME}
 import org.apache.james.jmap.core.UuidState
diff --git a/server/protocols/jmap-rfc-8621/src/test/scala/org/apache/james/jmap/change/TypeStateFactoryTest.scala b/server/protocols/jmap-rfc-8621/src/test/scala/org/apache/james/jmap/change/TypeStateFactoryTest.scala
index 2803b3c..61ba9b8 100644
--- a/server/protocols/jmap-rfc-8621/src/test/scala/org/apache/james/jmap/change/TypeStateFactoryTest.scala
+++ b/server/protocols/jmap-rfc-8621/src/test/scala/org/apache/james/jmap/change/TypeStateFactoryTest.scala
@@ -19,6 +19,8 @@
 
 package org.apache.james.jmap.change
 
+import org.apache.james.jmap.api.change
+import org.apache.james.jmap.api.change.TypeStateFactory
 import org.apache.james.jmap.api.model.TypeName
 import org.assertj.core.api.Assertions.assertThat
 import org.assertj.core.api.SoftAssertions

---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org


[james-project] 04/05: JAMES-3539 Introduce PushSubscriptionHelpers to avoid duplicated code

Posted by rc...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 2983ac57799349d102ac03440bd322dbcf6392e1
Author: Quan Tran <hq...@linagora.com>
AuthorDate: Fri Oct 29 09:16:40 2021 +0700

    JAMES-3539 Introduce PushSubscriptionHelpers to avoid duplicated code
---
 .../pushsubscription/PushSubscriptionHelpers.java  | 60 ++++++++++++++++++++++
 1 file changed, 60 insertions(+)

diff --git a/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/pushsubscription/PushSubscriptionHelpers.java b/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/pushsubscription/PushSubscriptionHelpers.java
new file mode 100644
index 0000000..e9fa422
--- /dev/null
+++ b/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/pushsubscription/PushSubscriptionHelpers.java
@@ -0,0 +1,60 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.jmap.api.pushsubscription;
+
+import static org.apache.james.jmap.api.model.PushSubscription.EXPIRES_TIME_MAX_DAY;
+
+import java.time.Clock;
+import java.time.ZonedDateTime;
+import java.util.Optional;
+
+import org.apache.james.jmap.api.model.PushSubscription;
+import org.apache.james.jmap.api.model.PushSubscriptionExpiredTime;
+import org.apache.james.jmap.api.model.PushSubscriptionKeys;
+
+import scala.Option;
+import scala.jdk.javaapi.OptionConverters;
+
+public class PushSubscriptionHelpers {
+    public static boolean isInThePast(PushSubscriptionExpiredTime expire, Clock clock) {
+        return expire.isBefore(ZonedDateTime.now(clock));
+    }
+
+    public static boolean isInThePast(Option<PushSubscriptionExpiredTime> expire, Clock clock) {
+        return expire.map(value -> isInThePast(value, clock)).getOrElse(() -> false);
+    }
+
+    public static PushSubscriptionExpiredTime evaluateExpiresTime(Optional<ZonedDateTime> inputTime, Clock clock) {
+        ZonedDateTime now = ZonedDateTime.now(clock);
+        ZonedDateTime maxExpiresTime = now.plusDays(EXPIRES_TIME_MAX_DAY());
+        return PushSubscriptionExpiredTime.apply(inputTime.filter(input -> input.isBefore(maxExpiresTime))
+            .orElse(maxExpiresTime));
+    }
+
+    public static boolean isNotOutdatedSubscription(PushSubscription subscription, Clock clock) {
+        return subscription.expires().isAfter(ZonedDateTime.now(clock));
+    }
+
+    public static boolean isInvalidPushSubscriptionKey(Option<PushSubscriptionKeys> keysOption) {
+        return OptionConverters.toJava(keysOption)
+            .map(key -> key.p256dh().isEmpty() || key.auth().isEmpty())
+            .orElse(false);
+    }
+}
\ No newline at end of file

---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org


[james-project] 05/05: JAMES-3539 MemoryPushSubscriptionRepository should leverage PushSubscriptionHelpers

Posted by rc...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 5eb8a9c894c7949001a426cb1117a1de45ff9acf
Author: Quan Tran <hq...@linagora.com>
AuthorDate: Fri Oct 29 09:17:28 2021 +0700

    JAMES-3539 MemoryPushSubscriptionRepository should leverage PushSubscriptionHelpers
---
 .../MemoryPushSubscriptionRepository.java          | 38 +++++-----------------
 1 file changed, 8 insertions(+), 30 deletions(-)

diff --git a/server/data/data-jmap/src/main/java/org/apache/james/jmap/memory/pushsubscription/MemoryPushSubscriptionRepository.java b/server/data/data-jmap/src/main/java/org/apache/james/jmap/memory/pushsubscription/MemoryPushSubscriptionRepository.java
index 488b616..c6b42ed 100644
--- a/server/data/data-jmap/src/main/java/org/apache/james/jmap/memory/pushsubscription/MemoryPushSubscriptionRepository.java
+++ b/server/data/data-jmap/src/main/java/org/apache/james/jmap/memory/pushsubscription/MemoryPushSubscriptionRepository.java
@@ -19,7 +19,10 @@
 
 package org.apache.james.jmap.memory.pushsubscription;
 
-import static org.apache.james.jmap.api.model.PushSubscription.EXPIRES_TIME_MAX_DAY;
+import static org.apache.james.jmap.api.pushsubscription.PushSubscriptionHelpers.evaluateExpiresTime;
+import static org.apache.james.jmap.api.pushsubscription.PushSubscriptionHelpers.isInThePast;
+import static org.apache.james.jmap.api.pushsubscription.PushSubscriptionHelpers.isInvalidPushSubscriptionKey;
+import static org.apache.james.jmap.api.pushsubscription.PushSubscriptionHelpers.isNotOutdatedSubscription;
 
 import java.time.Clock;
 import java.time.ZonedDateTime;
@@ -37,7 +40,6 @@ import org.apache.james.jmap.api.model.PushSubscription;
 import org.apache.james.jmap.api.model.PushSubscriptionCreationRequest;
 import org.apache.james.jmap.api.model.PushSubscriptionExpiredTime;
 import org.apache.james.jmap.api.model.PushSubscriptionId;
-import org.apache.james.jmap.api.model.PushSubscriptionKeys;
 import org.apache.james.jmap.api.model.PushSubscriptionNotFoundException;
 import org.apache.james.jmap.api.model.TypeName;
 import org.apache.james.jmap.api.pushsubscription.PushSubscriptionRepository;
@@ -48,7 +50,6 @@ import com.google.common.collect.Table;
 
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
-import scala.Option;
 import scala.jdk.javaapi.CollectionConverters;
 import scala.jdk.javaapi.OptionConverters;
 
@@ -66,7 +67,7 @@ public class MemoryPushSubscriptionRepository implements PushSubscriptionReposit
     public Publisher<PushSubscription> save(Username username, PushSubscriptionCreationRequest request) {
         return Mono.just(request)
             .handle((req, sink) -> {
-                if (isInThePast(req.expires())) {
+                if (isInThePast(req.expires(), clock)) {
                     sink.error(new ExpireTimeInvalidException(req.expires().get().value(), "expires must be greater than now"));
                 }
                 if (!isUniqueDeviceClientId(username, req.deviceClientId())) {
@@ -77,7 +78,8 @@ public class MemoryPushSubscriptionRepository implements PushSubscriptionReposit
                 }
             })
             .thenReturn(PushSubscription.from(request,
-                evaluateExpiresTime(OptionConverters.toJava(request.expires().map(PushSubscriptionExpiredTime::value)))))
+                evaluateExpiresTime(OptionConverters.toJava(request.expires().map(PushSubscriptionExpiredTime::value)),
+                    clock)))
             .doOnNext(pushSubscription -> table.put(username, pushSubscription.id(), pushSubscription));
     }
 
@@ -91,7 +93,7 @@ public class MemoryPushSubscriptionRepository implements PushSubscriptionReposit
             })
             .then(Mono.justOrEmpty(table.get(username, id))
                 .doOnNext(pushSubscription -> table.put(username, id,
-                    pushSubscription.withExpires(evaluateExpiresTime(Optional.of(newExpire)))))
+                    pushSubscription.withExpires(evaluateExpiresTime(Optional.of(newExpire), clock))))
                 .switchIfEmpty(Mono.error(() -> new PushSubscriptionNotFoundException(id)))
                 .then());
     }
@@ -140,33 +142,9 @@ public class MemoryPushSubscriptionRepository implements PushSubscriptionReposit
             .then();
     }
 
-    private boolean isInThePast(PushSubscriptionExpiredTime expire) {
-        return expire.isBefore(ZonedDateTime.now(clock));
-    }
-
-    private boolean isInThePast(Option<PushSubscriptionExpiredTime> expire) {
-        return expire.map(this::isInThePast).getOrElse(() -> false);
-    }
-
-    private PushSubscriptionExpiredTime evaluateExpiresTime(Optional<ZonedDateTime> inputTime) {
-        ZonedDateTime now = ZonedDateTime.now(clock);
-        ZonedDateTime maxExpiresTime = now.plusDays(EXPIRES_TIME_MAX_DAY());
-        return PushSubscriptionExpiredTime.apply(inputTime.filter(input -> input.isBefore(maxExpiresTime))
-            .orElse(maxExpiresTime));
-    }
-
-    private boolean isNotOutdatedSubscription(PushSubscription subscription, Clock clock) {
-        return subscription.expires().isAfter(ZonedDateTime.now(clock));
-    }
-
     private boolean isUniqueDeviceClientId(Username username, String deviceClientId) {
         return table.row(username).values().stream()
             .noneMatch(subscription -> subscription.deviceClientId().equals(deviceClientId));
     }
 
-    private boolean isInvalidPushSubscriptionKey(Option<PushSubscriptionKeys> keysOption) {
-        return OptionConverters.toJava(keysOption)
-            .map(key -> key.p256dh().isEmpty() || key.auth().isEmpty())
-            .orElse(false);
-    }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org


[james-project] 01/05: JAMES-3539 Cassandra implement for PushSubscriptionRepository

Posted by rc...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 34175c390e230b215fd494c3fa8cd5dece6c715f
Author: Quan Tran <hq...@linagora.com>
AuthorDate: Wed Oct 27 17:46:35 2021 +0700

    JAMES-3539 Cassandra implement for PushSubscriptionRepository
---
 server/data/data-jmap-cassandra/pom.xml            |   5 +
 .../CassandraPushSubscriptionDAO.java              | 175 +++++++++++++++++++++
 .../CassandraPushSubscriptionModule.java           |  57 +++++++
 .../CassandraPushSubscriptionRepository.java       | 146 +++++++++++++++++
 .../tables/CassandraPushSubscriptionTable.java     |  34 ++++
 .../CassandraPushSubscriptionRepositoryTest.java   |  62 ++++++++
 .../james/jmap/api/model/PushSubscription.scala    |   2 +
 .../PushSubscriptionRepositoryContract.scala       |  63 +++++++-
 8 files changed, 543 insertions(+), 1 deletion(-)

diff --git a/server/data/data-jmap-cassandra/pom.xml b/server/data/data-jmap-cassandra/pom.xml
index ec5a898..e5b0db7 100644
--- a/server/data/data-jmap-cassandra/pom.xml
+++ b/server/data/data-jmap-cassandra/pom.xml
@@ -92,6 +92,11 @@
         </dependency>
         <dependency>
             <groupId>${james.groupId}</groupId>
+            <artifactId>james-server-testing</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>${james.groupId}</groupId>
             <artifactId>metrics-tests</artifactId>
             <scope>test</scope>
         </dependency>
diff --git a/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/pushsubscription/CassandraPushSubscriptionDAO.java b/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/pushsubscription/CassandraPushSubscriptionDAO.java
new file mode 100644
index 0000000..99b222d
--- /dev/null
+++ b/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/pushsubscription/CassandraPushSubscriptionDAO.java
@@ -0,0 +1,175 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.jmap.cassandra.pushsubscription;
+
+import static com.datastax.driver.core.querybuilder.QueryBuilder.bindMarker;
+import static com.datastax.driver.core.querybuilder.QueryBuilder.delete;
+import static com.datastax.driver.core.querybuilder.QueryBuilder.eq;
+import static com.datastax.driver.core.querybuilder.QueryBuilder.insertInto;
+import static com.datastax.driver.core.querybuilder.QueryBuilder.select;
+import static org.apache.james.jmap.cassandra.pushsubscription.tables.CassandraPushSubscriptionTable.DEVICE_CLIENT_ID;
+import static org.apache.james.jmap.cassandra.pushsubscription.tables.CassandraPushSubscriptionTable.ENCRYPT_AUTH_SECRET;
+import static org.apache.james.jmap.cassandra.pushsubscription.tables.CassandraPushSubscriptionTable.ENCRYPT_PUBLIC_KEY;
+import static org.apache.james.jmap.cassandra.pushsubscription.tables.CassandraPushSubscriptionTable.EXPIRES;
+import static org.apache.james.jmap.cassandra.pushsubscription.tables.CassandraPushSubscriptionTable.ID;
+import static org.apache.james.jmap.cassandra.pushsubscription.tables.CassandraPushSubscriptionTable.TABLE_NAME;
+import static org.apache.james.jmap.cassandra.pushsubscription.tables.CassandraPushSubscriptionTable.TYPES;
+import static org.apache.james.jmap.cassandra.pushsubscription.tables.CassandraPushSubscriptionTable.URL;
+import static org.apache.james.jmap.cassandra.pushsubscription.tables.CassandraPushSubscriptionTable.USER;
+import static org.apache.james.jmap.cassandra.pushsubscription.tables.CassandraPushSubscriptionTable.VALIDATED;
+import static org.apache.james.jmap.cassandra.pushsubscription.tables.CassandraPushSubscriptionTable.VERIFICATION_CODE;
+
+import java.time.Instant;
+import java.time.ZoneOffset;
+import java.time.ZonedDateTime;
+import java.util.Date;
+import java.util.Set;
+
+import javax.inject.Inject;
+
+import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
+import org.apache.james.core.Username;
+import org.apache.james.jmap.api.change.TypeStateFactory;
+import org.apache.james.jmap.api.model.DeviceClientId;
+import org.apache.james.jmap.api.model.PushSubscription;
+import org.apache.james.jmap.api.model.PushSubscriptionExpiredTime;
+import org.apache.james.jmap.api.model.PushSubscriptionId;
+import org.apache.james.jmap.api.model.PushSubscriptionKeys;
+import org.apache.james.jmap.api.model.PushSubscriptionServerURL;
+import org.apache.james.jmap.api.model.TypeName;
+import org.apache.james.jmap.api.model.VerificationCode;
+
+import com.datastax.driver.core.BoundStatement;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.Session;
+import com.google.common.collect.ImmutableSet;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import scala.Option;
+import scala.collection.immutable.Seq;
+import scala.jdk.javaapi.CollectionConverters;
+import scala.jdk.javaapi.OptionConverters;
+
+public class CassandraPushSubscriptionDAO {
+    private final TypeStateFactory typeStateFactory;
+    private final CassandraAsyncExecutor executor;
+    private final PreparedStatement insert;
+    private final PreparedStatement selectAll;
+    private final PreparedStatement deleteOne;
+
+    @Inject
+    public CassandraPushSubscriptionDAO(Session session, TypeStateFactory typeStateFactory) {
+        executor = new CassandraAsyncExecutor(session);
+
+        insert = session.prepare(insertInto(TABLE_NAME)
+            .value(USER, bindMarker(USER))
+            .value(DEVICE_CLIENT_ID, bindMarker(DEVICE_CLIENT_ID))
+            .value(ID, bindMarker(ID))
+            .value(EXPIRES, bindMarker(EXPIRES))
+            .value(TYPES, bindMarker(TYPES))
+            .value(URL, bindMarker(URL))
+            .value(VERIFICATION_CODE, bindMarker(VERIFICATION_CODE))
+            .value(ENCRYPT_PUBLIC_KEY, bindMarker(ENCRYPT_PUBLIC_KEY))
+            .value(ENCRYPT_AUTH_SECRET, bindMarker(ENCRYPT_AUTH_SECRET))
+            .value(VALIDATED, bindMarker(VALIDATED)));
+
+        selectAll = session.prepare(select()
+            .from(TABLE_NAME)
+            .where(eq(USER, bindMarker(USER))));
+
+        deleteOne = session.prepare(delete()
+            .from(TABLE_NAME)
+            .where(eq(USER, bindMarker(USER)))
+            .and(eq(DEVICE_CLIENT_ID, bindMarker(DEVICE_CLIENT_ID))));
+
+        this.typeStateFactory = typeStateFactory;
+    }
+
+    public Mono<PushSubscription> insert(Username username, PushSubscription subscription) {
+        Set<String> typeNames = CollectionConverters.asJava(subscription.types()
+            .map(TypeName::asString)
+            .toSet());
+        Instant utcInstant = subscription.expires().value().withZoneSameInstant(ZoneOffset.UTC).toInstant();
+
+        BoundStatement insertSubscription = insert.bind()
+            .setString(USER, username.asString())
+            .setString(DEVICE_CLIENT_ID, subscription.deviceClientId())
+            .setUUID(ID, subscription.id().value())
+            .setTimestamp(EXPIRES, Date.from(utcInstant))
+            .setSet(TYPES, typeNames)
+            .setString(URL, subscription.url().value().toString())
+            .setString(VERIFICATION_CODE, subscription.verificationCode())
+            .setBool(VALIDATED, subscription.validated());
+
+        OptionConverters.toJava(subscription.keys())
+            .map(keys -> insertSubscription.setString(ENCRYPT_PUBLIC_KEY, keys.p256dh())
+                .setString(ENCRYPT_AUTH_SECRET, keys.auth()));
+
+        return executor.executeVoid(insertSubscription)
+            .thenReturn(subscription);
+    }
+
+    public Flux<PushSubscription> selectAll(Username username) {
+        return executor.executeRows(selectAll.bind().setString(USER, username.asString()))
+            .map(this::toPushSubscription);
+    }
+
+    public Mono<Void> deleteOne(Username username, String deviceClientId) {
+        return executor.executeVoid(deleteOne.bind()
+            .setString(USER, username.asString())
+            .setString(DEVICE_CLIENT_ID, deviceClientId));
+    }
+
+    private PushSubscription toPushSubscription(Row row) {
+        return PushSubscription.apply(
+            PushSubscriptionId.apply(row.getUUID(ID)),
+            DeviceClientId.apply(row.getString(DEVICE_CLIENT_ID)),
+            PushSubscriptionServerURL.from(row.getString(URL)).get(),
+            toKeys(row),
+            VerificationCode.apply(row.getString(VERIFICATION_CODE)),
+            row.getBool(VALIDATED),
+            toExpires(row),
+            toTypes(row));
+    }
+
+    private Option<PushSubscriptionKeys> toKeys(Row row) {
+        String p256dh = row.getString(ENCRYPT_PUBLIC_KEY);
+        String auth = row.getString(ENCRYPT_AUTH_SECRET);
+        if (p256dh == null && auth == null) {
+            return Option.empty();
+        } else {
+            return Option.apply(PushSubscriptionKeys.apply(p256dh, auth));
+        }
+    }
+
+    private PushSubscriptionExpiredTime toExpires(Row row) {
+        return PushSubscriptionExpiredTime.apply(
+            ZonedDateTime.ofInstant(row.getTimestamp(EXPIRES).toInstant(), ZoneOffset.UTC));
+    }
+
+    private Seq<TypeName> toTypes(Row row) {
+        return CollectionConverters.asScala(row.getSet(TYPES, String.class).stream()
+                .map(string -> typeStateFactory.parse(string).right().get())
+                .collect(ImmutableSet.toImmutableSet()))
+            .toSeq();
+    }
+}
\ No newline at end of file
diff --git a/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/pushsubscription/CassandraPushSubscriptionModule.java b/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/pushsubscription/CassandraPushSubscriptionModule.java
new file mode 100644
index 0000000..49e1065
--- /dev/null
+++ b/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/pushsubscription/CassandraPushSubscriptionModule.java
@@ -0,0 +1,57 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.jmap.cassandra.pushsubscription;
+
+import static com.datastax.driver.core.DataType.cboolean;
+import static com.datastax.driver.core.DataType.frozenSet;
+import static com.datastax.driver.core.DataType.text;
+import static com.datastax.driver.core.DataType.timestamp;
+import static com.datastax.driver.core.DataType.uuid;
+import static org.apache.james.jmap.cassandra.pushsubscription.tables.CassandraPushSubscriptionTable.DEVICE_CLIENT_ID;
+import static org.apache.james.jmap.cassandra.pushsubscription.tables.CassandraPushSubscriptionTable.ENCRYPT_AUTH_SECRET;
+import static org.apache.james.jmap.cassandra.pushsubscription.tables.CassandraPushSubscriptionTable.ENCRYPT_PUBLIC_KEY;
+import static org.apache.james.jmap.cassandra.pushsubscription.tables.CassandraPushSubscriptionTable.EXPIRES;
+import static org.apache.james.jmap.cassandra.pushsubscription.tables.CassandraPushSubscriptionTable.ID;
+import static org.apache.james.jmap.cassandra.pushsubscription.tables.CassandraPushSubscriptionTable.TYPES;
+import static org.apache.james.jmap.cassandra.pushsubscription.tables.CassandraPushSubscriptionTable.URL;
+import static org.apache.james.jmap.cassandra.pushsubscription.tables.CassandraPushSubscriptionTable.USER;
+import static org.apache.james.jmap.cassandra.pushsubscription.tables.CassandraPushSubscriptionTable.VALIDATED;
+import static org.apache.james.jmap.cassandra.pushsubscription.tables.CassandraPushSubscriptionTable.VERIFICATION_CODE;
+
+import org.apache.james.backends.cassandra.components.CassandraModule;
+import org.apache.james.jmap.cassandra.pushsubscription.tables.CassandraPushSubscriptionTable;
+
+public interface CassandraPushSubscriptionModule {
+    CassandraModule MODULE = CassandraModule.builder()
+        .table(CassandraPushSubscriptionTable.TABLE_NAME)
+        .comment("Hold user push subscriptions data")
+        .statement(statement -> statement
+            .addPartitionKey(USER, text())
+            .addClusteringColumn(DEVICE_CLIENT_ID, text())
+            .addColumn(ID, uuid())
+            .addColumn(EXPIRES, timestamp())
+            .addColumn(TYPES, frozenSet(text()))
+            .addColumn(URL, text())
+            .addColumn(VERIFICATION_CODE, text())
+            .addColumn(ENCRYPT_PUBLIC_KEY, text())
+            .addColumn(ENCRYPT_AUTH_SECRET, text())
+            .addColumn(VALIDATED, cboolean()))
+        .build();
+}
\ No newline at end of file
diff --git a/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/pushsubscription/CassandraPushSubscriptionRepository.java b/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/pushsubscription/CassandraPushSubscriptionRepository.java
new file mode 100644
index 0000000..a6d2c16
--- /dev/null
+++ b/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/pushsubscription/CassandraPushSubscriptionRepository.java
@@ -0,0 +1,146 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.jmap.cassandra.pushsubscription;
+
+import static org.apache.james.jmap.api.pushsubscription.PushSubscriptionHelpers.evaluateExpiresTime;
+import static org.apache.james.jmap.api.pushsubscription.PushSubscriptionHelpers.isInThePast;
+import static org.apache.james.jmap.api.pushsubscription.PushSubscriptionHelpers.isInvalidPushSubscriptionKey;
+import static org.apache.james.jmap.api.pushsubscription.PushSubscriptionHelpers.isNotOutdatedSubscription;
+
+import java.time.Clock;
+import java.time.ZonedDateTime;
+import java.util.Optional;
+import java.util.Set;
+
+import javax.inject.Inject;
+
+import org.apache.james.core.Username;
+import org.apache.james.jmap.api.model.DeviceClientIdInvalidException;
+import org.apache.james.jmap.api.model.ExpireTimeInvalidException;
+import org.apache.james.jmap.api.model.InvalidPushSubscriptionKeys;
+import org.apache.james.jmap.api.model.PushSubscription;
+import org.apache.james.jmap.api.model.PushSubscriptionCreationRequest;
+import org.apache.james.jmap.api.model.PushSubscriptionExpiredTime;
+import org.apache.james.jmap.api.model.PushSubscriptionId;
+import org.apache.james.jmap.api.model.PushSubscriptionNotFoundException;
+import org.apache.james.jmap.api.model.TypeName;
+import org.apache.james.jmap.api.pushsubscription.PushSubscriptionRepository;
+import org.reactivestreams.Publisher;
+
+import reactor.core.publisher.Mono;
+import scala.jdk.javaapi.CollectionConverters;
+import scala.jdk.javaapi.OptionConverters;
+
+public class CassandraPushSubscriptionRepository implements PushSubscriptionRepository {
+    private final CassandraPushSubscriptionDAO dao;
+    private final Clock clock;
+
+    @Inject
+    public CassandraPushSubscriptionRepository(CassandraPushSubscriptionDAO dao, Clock clock) {
+        this.dao = dao;
+        this.clock = clock;
+    }
+
+    @Override
+    public Publisher<PushSubscription> save(Username username, PushSubscriptionCreationRequest request) {
+        return Mono.just(request)
+            .handle((req, sink) -> {
+                if (isInThePast(req.expires(), clock)) {
+                    sink.error(new ExpireTimeInvalidException(req.expires().get().value(), "expires must be greater than now"));
+                }
+                if (!isUniqueDeviceClientId(username, req.deviceClientId())) {
+                    sink.error(new DeviceClientIdInvalidException(req.deviceClientId(), "deviceClientId must be unique"));
+                }
+                if (isInvalidPushSubscriptionKey(req.keys())) {
+                    sink.error(new InvalidPushSubscriptionKeys(req.keys().get()));
+                }
+            })
+            .thenReturn(PushSubscription.from(request,
+                evaluateExpiresTime(OptionConverters.toJava(request.expires().map(PushSubscriptionExpiredTime::value)),
+                    clock)))
+            .flatMap(subscription -> dao.insert(username, subscription).thenReturn(subscription));
+    }
+
+    @Override
+    public Publisher<Void> updateExpireTime(Username username, PushSubscriptionId id, ZonedDateTime newExpire) {
+        return Mono.just(newExpire)
+            .handle((inputTime, sink) -> {
+                if (newExpire.isBefore(ZonedDateTime.now(clock))) {
+                    sink.error(new ExpireTimeInvalidException(inputTime, "expires must be greater than now"));
+                }
+            })
+            .then(retrieveByPushSubscriptionId(username, id)
+                .flatMap(subscription -> dao.insert(username,
+                    subscription.withExpires(evaluateExpiresTime(Optional.of(newExpire), clock))))
+                .switchIfEmpty(Mono.error(() -> new PushSubscriptionNotFoundException(id)))
+                .then());
+    }
+
+    @Override
+    public Publisher<Void> updateTypes(Username username, PushSubscriptionId id, Set<TypeName> types) {
+        return retrieveByPushSubscriptionId(username, id)
+            .map(subscription -> subscription.withTypes(CollectionConverters.asScala(types).toSeq()))
+            .flatMap(newPushSubscription -> dao.insert(username, newPushSubscription))
+            .switchIfEmpty(Mono.error(() -> new PushSubscriptionNotFoundException(id)))
+            .then();
+    }
+
+    @Override
+    public Publisher<Void> validateVerificationCode(Username username, PushSubscriptionId id) {
+        return retrieveByPushSubscriptionId(username, id)
+            .map(PushSubscription::verified)
+            .flatMap(newPushSubscription -> dao.insert(username, newPushSubscription))
+            .switchIfEmpty(Mono.error(() -> new PushSubscriptionNotFoundException(id)))
+            .then();
+    }
+
+    @Override
+    public Publisher<Void> revoke(Username username, PushSubscriptionId id) {
+        return Mono.from(retrieveByPushSubscriptionId(username, id))
+            .flatMap(subscription -> dao.deleteOne(username, subscription.deviceClientId()))
+            .switchIfEmpty(Mono.empty());
+    }
+
+    @Override
+    public Publisher<PushSubscription> get(Username username, Set<PushSubscriptionId> ids) {
+        return dao.selectAll(username)
+            .filter(subscription -> ids.contains(subscription.id()))
+            .filter(subscription -> isNotOutdatedSubscription(subscription, clock));
+    }
+
+    @Override
+    public Publisher<PushSubscription> list(Username username) {
+        return dao.selectAll(username)
+            .filter(subscription -> isNotOutdatedSubscription(subscription, clock));
+    }
+
+    private Mono<PushSubscription> retrieveByPushSubscriptionId(Username username, PushSubscriptionId id) {
+        return dao.selectAll(username).filter(subscription -> subscription.id().equals(id)).next();
+    }
+
+    private boolean isUniqueDeviceClientId(Username username, String deviceClientId) {
+        return Boolean.TRUE.equals(dao.selectAll(username)
+            .filter(subscription -> subscription.deviceClientId().equals(deviceClientId))
+            .count()
+            .map(value -> value == 0)
+            .block());
+    }
+
+}
\ No newline at end of file
diff --git a/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/pushsubscription/tables/CassandraPushSubscriptionTable.java b/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/pushsubscription/tables/CassandraPushSubscriptionTable.java
new file mode 100644
index 0000000..7d56857
--- /dev/null
+++ b/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/pushsubscription/tables/CassandraPushSubscriptionTable.java
@@ -0,0 +1,34 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.jmap.cassandra.pushsubscription.tables;
+
+public interface CassandraPushSubscriptionTable {
+    String TABLE_NAME = "push_subscription";
+    String USER = "user";
+    String DEVICE_CLIENT_ID = "device_client_id";
+    String ID = "id";
+    String EXPIRES = "expires";
+    String TYPES = "types";
+    String URL = "url";
+    String VERIFICATION_CODE = "verification_code";
+    String ENCRYPT_PUBLIC_KEY = "encrypt_public_key";
+    String ENCRYPT_AUTH_SECRET = "encrypt_auth_secret";
+    String VALIDATED = "validated";
+}
\ No newline at end of file
diff --git a/server/data/data-jmap-cassandra/src/test/java/org/apache/james/jmap/cassandra/pushsubscription/CassandraPushSubscriptionRepositoryTest.java b/server/data/data-jmap-cassandra/src/test/java/org/apache/james/jmap/cassandra/pushsubscription/CassandraPushSubscriptionRepositoryTest.java
new file mode 100644
index 0000000..b411746
--- /dev/null
+++ b/server/data/data-jmap-cassandra/src/test/java/org/apache/james/jmap/cassandra/pushsubscription/CassandraPushSubscriptionRepositoryTest.java
@@ -0,0 +1,62 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.jmap.cassandra.pushsubscription;
+
+import org.apache.james.backends.cassandra.CassandraCluster;
+import org.apache.james.backends.cassandra.CassandraClusterExtension;
+import org.apache.james.backends.cassandra.components.CassandraModule;
+import org.apache.james.jmap.api.change.TypeStateFactory;
+import org.apache.james.jmap.api.pushsubscription.PushSubscriptionRepository;
+import org.apache.james.jmap.api.pushsubscription.PushSubscriptionRepositoryContract;
+import org.apache.james.utils.UpdatableTickingClock;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import scala.jdk.javaapi.CollectionConverters;
+
+
+public class CassandraPushSubscriptionRepositoryTest implements PushSubscriptionRepositoryContract {
+    static final CassandraModule MODULE = CassandraPushSubscriptionModule.MODULE;
+
+    UpdatableTickingClock clock;
+    PushSubscriptionRepository pushSubscriptionRepository;
+
+    @RegisterExtension
+    static CassandraClusterExtension cassandraCluster = new CassandraClusterExtension(MODULE);
+
+    @BeforeEach
+    void setup(CassandraCluster cassandra) {
+        clock = new UpdatableTickingClock(PushSubscriptionRepositoryContract.NOW());
+        pushSubscriptionRepository = new CassandraPushSubscriptionRepository(
+            new CassandraPushSubscriptionDAO(cassandra.getConf(),
+                new TypeStateFactory(CollectionConverters.asJava(PushSubscriptionRepositoryContract.TYPE_NAME_SET()))),
+            clock);
+    }
+
+    @Override
+    public UpdatableTickingClock clock() {
+        return clock;
+    }
+
+    @Override
+    public PushSubscriptionRepository testee() {
+        return pushSubscriptionRepository;
+    }
+}
diff --git a/server/data/data-jmap/src/main/scala/org/apache/james/jmap/api/model/PushSubscription.scala b/server/data/data-jmap/src/main/scala/org/apache/james/jmap/api/model/PushSubscription.scala
index a88a5f8..6d7b7ac 100644
--- a/server/data/data-jmap/src/main/scala/org/apache/james/jmap/api/model/PushSubscription.scala
+++ b/server/data/data-jmap/src/main/scala/org/apache/james/jmap/api/model/PushSubscription.scala
@@ -30,6 +30,8 @@ import com.google.crypto.tink.subtle.EllipticCurves
 
 import scala.util.Try
 
+import scala.util.Try
+
 object PushSubscriptionId {
   def generate(): PushSubscriptionId = PushSubscriptionId(UUID.randomUUID)
 }
diff --git a/server/data/data-jmap/src/test/scala/org/apache/james/jmap/api/pushsubscription/PushSubscriptionRepositoryContract.scala b/server/data/data-jmap/src/test/scala/org/apache/james/jmap/api/pushsubscription/PushSubscriptionRepositoryContract.scala
index cf6d28c..7c87498 100644
--- a/server/data/data-jmap/src/test/scala/org/apache/james/jmap/api/pushsubscription/PushSubscriptionRepositoryContract.scala
+++ b/server/data/data-jmap/src/test/scala/org/apache/james/jmap/api/pushsubscription/PushSubscriptionRepositoryContract.scala
@@ -23,7 +23,7 @@ import java.net.URL
 import java.time.{Clock, Instant, ZoneId, ZonedDateTime}
 
 import org.apache.james.core.Username
-import org.apache.james.jmap.api.model.{DeviceClientId, DeviceClientIdInvalidException, ExpireTimeInvalidException, PushSubscription, PushSubscriptionCreationRequest, PushSubscriptionExpiredTime, PushSubscriptionId, PushSubscriptionNotFoundException, PushSubscriptionServerURL, State, TypeName}
+import org.apache.james.jmap.api.model.{DeviceClientId, DeviceClientIdInvalidException, ExpireTimeInvalidException, InvalidPushSubscriptionKeys, PushSubscription, PushSubscriptionCreationRequest, PushSubscriptionExpiredTime, PushSubscriptionId, PushSubscriptionKeys, PushSubscriptionNotFoundException, PushSubscriptionServerURL, State, TypeName}
 import org.apache.james.jmap.api.pushsubscription.PushSubscriptionRepositoryContract.{ALICE, INVALID_EXPIRE, MAX_EXPIRE, VALID_EXPIRE}
 import org.apache.james.utils.UpdatableTickingClock
 import org.assertj.core.api.Assertions.{assertThat, assertThatCode, assertThatThrownBy}
@@ -61,6 +61,7 @@ case class CustomState(value: String) extends State {
 }
 
 object PushSubscriptionRepositoryContract {
+  val TYPE_NAME_SET: Set[TypeName] = Set(CustomTypeName1, CustomTypeName2)
   val NOW: Instant = Instant.parse("2021-10-25T07:05:39.160Z")
   val ZONE_ID: ZoneId = ZoneId.of("UTC")
   val CLOCK: Clock = Clock.fixed(NOW, ZONE_ID)
@@ -370,5 +371,65 @@ trait PushSubscriptionRepositoryContract {
     assertThatThrownBy(() => SMono.fromPublisher(testee.validateVerificationCode(ALICE, randomId)).block())
       .isInstanceOf(classOf[PushSubscriptionNotFoundException])
   }
+
+  @Test
+  def saveSubscriptionWithFullKeyPairShouldSucceed(): Unit = {
+    val fullKeyPair = Some(PushSubscriptionKeys.apply(p256dh = "p256h", auth = "auth"))
+    val validRequest = PushSubscriptionCreationRequest(
+      deviceClientId = DeviceClientId("1"),
+      url = PushSubscriptionServerURL(new URL("https://example.com/push")),
+      types = Seq(CustomTypeName1),
+      keys = fullKeyPair)
+
+    val pushSubscriptionId1 = SMono.fromPublisher(testee.save(ALICE, validRequest)).block().id
+
+    val pushSubscriptions = SFlux.fromPublisher(testee.get(ALICE, Set(pushSubscriptionId1).asJava)).collectSeq().block()
+
+    assertThat(pushSubscriptions.map(_.keys).toList.asJava).containsExactlyInAnyOrder(fullKeyPair)
+  }
+
+  @Test
+  def saveSubscriptionWithNoneKeyPairShouldSucceed(): Unit = {
+    val emptyKeyPair = None
+    val validRequest = PushSubscriptionCreationRequest(
+      deviceClientId = DeviceClientId("1"),
+      url = PushSubscriptionServerURL(new URL("https://example.com/push")),
+      types = Seq(CustomTypeName1),
+      keys = emptyKeyPair)
+    val pushSubscriptionId1 = SMono.fromPublisher(testee.save(ALICE, validRequest)).block().id
+
+    val pushSubscriptions = SFlux.fromPublisher(testee.get(ALICE, Set(pushSubscriptionId1).asJava)).collectSeq().block()
+
+    assertThat(pushSubscriptions.map(_.keys).toList.asJava).containsExactlyInAnyOrder(emptyKeyPair)
+  }
+
+  @Test
+  def saveSubscriptionWithEmptyP256hKeyShouldFail(): Unit = {
+    val emptyP256hKey = Some(PushSubscriptionKeys.apply(p256dh = "", auth = "auth"))
+
+    val validRequest = PushSubscriptionCreationRequest(
+      deviceClientId = DeviceClientId("1"),
+      url = PushSubscriptionServerURL(new URL("https://example.com/push")),
+      types = Seq(CustomTypeName1),
+      keys = emptyP256hKey)
+
+    assertThatThrownBy(() => SMono.fromPublisher(testee.save(ALICE, validRequest)).block())
+      .isInstanceOf(classOf[InvalidPushSubscriptionKeys])
+  }
+
+  @Test
+  def saveSubscriptionWithEmptyAuthKeyShouldFail(): Unit = {
+    val emptyAuthKey = Some(PushSubscriptionKeys.apply(p256dh = "p256dh", auth = ""))
+
+    val validRequest = PushSubscriptionCreationRequest(
+      deviceClientId = DeviceClientId("1"),
+      url = PushSubscriptionServerURL(new URL("https://example.com/push")),
+      types = Seq(CustomTypeName1),
+      keys = emptyAuthKey)
+
+    assertThatThrownBy(() => SMono.fromPublisher(testee.save(ALICE, validRequest)).block())
+      .isInstanceOf(classOf[InvalidPushSubscriptionKeys])
+  }
+
 }
 

---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org