You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by rc...@apache.org on 2021/11/01 04:33:21 UTC

[james-project] 01/05: JAMES-3539 Cassandra implement for PushSubscriptionRepository

This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 34175c390e230b215fd494c3fa8cd5dece6c715f
Author: Quan Tran <hq...@linagora.com>
AuthorDate: Wed Oct 27 17:46:35 2021 +0700

    JAMES-3539 Cassandra implement for PushSubscriptionRepository
---
 server/data/data-jmap-cassandra/pom.xml            |   5 +
 .../CassandraPushSubscriptionDAO.java              | 175 +++++++++++++++++++++
 .../CassandraPushSubscriptionModule.java           |  57 +++++++
 .../CassandraPushSubscriptionRepository.java       | 146 +++++++++++++++++
 .../tables/CassandraPushSubscriptionTable.java     |  34 ++++
 .../CassandraPushSubscriptionRepositoryTest.java   |  62 ++++++++
 .../james/jmap/api/model/PushSubscription.scala    |   2 +
 .../PushSubscriptionRepositoryContract.scala       |  63 +++++++-
 8 files changed, 543 insertions(+), 1 deletion(-)

diff --git a/server/data/data-jmap-cassandra/pom.xml b/server/data/data-jmap-cassandra/pom.xml
index ec5a898..e5b0db7 100644
--- a/server/data/data-jmap-cassandra/pom.xml
+++ b/server/data/data-jmap-cassandra/pom.xml
@@ -92,6 +92,11 @@
         </dependency>
         <dependency>
             <groupId>${james.groupId}</groupId>
+            <artifactId>james-server-testing</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>${james.groupId}</groupId>
             <artifactId>metrics-tests</artifactId>
             <scope>test</scope>
         </dependency>
diff --git a/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/pushsubscription/CassandraPushSubscriptionDAO.java b/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/pushsubscription/CassandraPushSubscriptionDAO.java
new file mode 100644
index 0000000..99b222d
--- /dev/null
+++ b/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/pushsubscription/CassandraPushSubscriptionDAO.java
@@ -0,0 +1,175 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.jmap.cassandra.pushsubscription;
+
+import static com.datastax.driver.core.querybuilder.QueryBuilder.bindMarker;
+import static com.datastax.driver.core.querybuilder.QueryBuilder.delete;
+import static com.datastax.driver.core.querybuilder.QueryBuilder.eq;
+import static com.datastax.driver.core.querybuilder.QueryBuilder.insertInto;
+import static com.datastax.driver.core.querybuilder.QueryBuilder.select;
+import static org.apache.james.jmap.cassandra.pushsubscription.tables.CassandraPushSubscriptionTable.DEVICE_CLIENT_ID;
+import static org.apache.james.jmap.cassandra.pushsubscription.tables.CassandraPushSubscriptionTable.ENCRYPT_AUTH_SECRET;
+import static org.apache.james.jmap.cassandra.pushsubscription.tables.CassandraPushSubscriptionTable.ENCRYPT_PUBLIC_KEY;
+import static org.apache.james.jmap.cassandra.pushsubscription.tables.CassandraPushSubscriptionTable.EXPIRES;
+import static org.apache.james.jmap.cassandra.pushsubscription.tables.CassandraPushSubscriptionTable.ID;
+import static org.apache.james.jmap.cassandra.pushsubscription.tables.CassandraPushSubscriptionTable.TABLE_NAME;
+import static org.apache.james.jmap.cassandra.pushsubscription.tables.CassandraPushSubscriptionTable.TYPES;
+import static org.apache.james.jmap.cassandra.pushsubscription.tables.CassandraPushSubscriptionTable.URL;
+import static org.apache.james.jmap.cassandra.pushsubscription.tables.CassandraPushSubscriptionTable.USER;
+import static org.apache.james.jmap.cassandra.pushsubscription.tables.CassandraPushSubscriptionTable.VALIDATED;
+import static org.apache.james.jmap.cassandra.pushsubscription.tables.CassandraPushSubscriptionTable.VERIFICATION_CODE;
+
+import java.time.Instant;
+import java.time.ZoneOffset;
+import java.time.ZonedDateTime;
+import java.util.Date;
+import java.util.Set;
+
+import javax.inject.Inject;
+
+import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
+import org.apache.james.core.Username;
+import org.apache.james.jmap.api.change.TypeStateFactory;
+import org.apache.james.jmap.api.model.DeviceClientId;
+import org.apache.james.jmap.api.model.PushSubscription;
+import org.apache.james.jmap.api.model.PushSubscriptionExpiredTime;
+import org.apache.james.jmap.api.model.PushSubscriptionId;
+import org.apache.james.jmap.api.model.PushSubscriptionKeys;
+import org.apache.james.jmap.api.model.PushSubscriptionServerURL;
+import org.apache.james.jmap.api.model.TypeName;
+import org.apache.james.jmap.api.model.VerificationCode;
+
+import com.datastax.driver.core.BoundStatement;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.Session;
+import com.google.common.collect.ImmutableSet;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import scala.Option;
+import scala.collection.immutable.Seq;
+import scala.jdk.javaapi.CollectionConverters;
+import scala.jdk.javaapi.OptionConverters;
+
+/**
+ * Data access object for the Cassandra table holding JMAP push subscriptions.
+ *
+ * <p>Rows are addressed by user and device client id: reads select every row of a
+ * user, deletes target a single (user, device client id) pair, and inserts bind
+ * every column of a subscription.
+ */
+public class CassandraPushSubscriptionDAO {
+    private final TypeStateFactory typeStateFactory;
+    private final CassandraAsyncExecutor executor;
+    private final PreparedStatement insert;
+    private final PreparedStatement selectAll;
+    private final PreparedStatement deleteOne;
+
+    /**
+     * Prepares the insert, select-by-user and delete-by-device statements once.
+     *
+     * @param session Cassandra session used to prepare and execute statements
+     * @param typeStateFactory factory used to parse stored type names back into {@link TypeName}
+     */
+    @Inject
+    public CassandraPushSubscriptionDAO(Session session, TypeStateFactory typeStateFactory) {
+        this.typeStateFactory = typeStateFactory;
+        this.executor = new CassandraAsyncExecutor(session);
+
+        this.insert = session.prepare(insertInto(TABLE_NAME)
+            .value(USER, bindMarker(USER))
+            .value(DEVICE_CLIENT_ID, bindMarker(DEVICE_CLIENT_ID))
+            .value(ID, bindMarker(ID))
+            .value(EXPIRES, bindMarker(EXPIRES))
+            .value(TYPES, bindMarker(TYPES))
+            .value(URL, bindMarker(URL))
+            .value(VERIFICATION_CODE, bindMarker(VERIFICATION_CODE))
+            .value(ENCRYPT_PUBLIC_KEY, bindMarker(ENCRYPT_PUBLIC_KEY))
+            .value(ENCRYPT_AUTH_SECRET, bindMarker(ENCRYPT_AUTH_SECRET))
+            .value(VALIDATED, bindMarker(VALIDATED)));
+
+        this.selectAll = session.prepare(select()
+            .from(TABLE_NAME)
+            .where(eq(USER, bindMarker(USER))));
+
+        this.deleteOne = session.prepare(delete()
+            .from(TABLE_NAME)
+            .where(eq(USER, bindMarker(USER)))
+            .and(eq(DEVICE_CLIENT_ID, bindMarker(DEVICE_CLIENT_ID))));
+    }
+
+    /**
+     * Writes the given subscription for the given user.
+     *
+     * @param username owner of the subscription
+     * @param subscription subscription to store
+     * @return a Mono emitting {@code subscription} once the write has been executed
+     */
+    public Mono<PushSubscription> insert(Username username, PushSubscription subscription) {
+        Set<String> typeNames = CollectionConverters.asJava(subscription.types()
+            .map(TypeName::asString)
+            .toSet());
+        // An Instant is zone independent, so no zone conversion is needed before extracting it.
+        Instant expires = subscription.expires().value().toInstant();
+
+        BoundStatement insertSubscription = insert.bind()
+            .setString(USER, username.asString())
+            .setString(DEVICE_CLIENT_ID, subscription.deviceClientId())
+            .setUUID(ID, subscription.id().value())
+            .setTimestamp(EXPIRES, Date.from(expires))
+            .setSet(TYPES, typeNames)
+            .setString(URL, subscription.url().value().toString())
+            .setString(VERIFICATION_CODE, subscription.verificationCode())
+            .setBool(VALIDATED, subscription.validated());
+
+        // The encryption key columns are only bound when the subscription carries keys.
+        OptionConverters.toJava(subscription.keys())
+            .ifPresent(keys -> insertSubscription
+                .setString(ENCRYPT_PUBLIC_KEY, keys.p256dh())
+                .setString(ENCRYPT_AUTH_SECRET, keys.auth()));
+
+        return executor.executeVoid(insertSubscription)
+            .thenReturn(subscription);
+    }
+
+    /**
+     * Reads every subscription row stored for the given user.
+     *
+     * @param username owner of the subscriptions
+     * @return the user's subscriptions; no expiry filtering is applied here
+     */
+    public Flux<PushSubscription> selectAll(Username username) {
+        return executor.executeRows(selectAll.bind().setString(USER, username.asString()))
+            .map(this::toPushSubscription);
+    }
+
+    /**
+     * Deletes the row identified by the given user and device client id.
+     *
+     * @param username owner of the subscription
+     * @param deviceClientId device client id of the subscription to remove
+     * @return a Mono completing once the delete has been executed
+     */
+    public Mono<Void> deleteOne(Username username, String deviceClientId) {
+        return executor.executeVoid(deleteOne.bind()
+            .setString(USER, username.asString())
+            .setString(DEVICE_CLIENT_ID, deviceClientId));
+    }
+
+    /** Rebuilds a {@link PushSubscription} from a table row. */
+    private PushSubscription toPushSubscription(Row row) {
+        return PushSubscription.apply(
+            PushSubscriptionId.apply(row.getUUID(ID)),
+            DeviceClientId.apply(row.getString(DEVICE_CLIENT_ID)),
+            // NOTE(review): get() presumably throws if the stored URL no longer parses - confirm.
+            PushSubscriptionServerURL.from(row.getString(URL)).get(),
+            toKeys(row),
+            VerificationCode.apply(row.getString(VERIFICATION_CODE)),
+            row.getBool(VALIDATED),
+            toExpires(row),
+            toTypes(row));
+    }
+
+    /**
+     * Reads the encryption keys of a row. Keys are reported absent only when both
+     * key columns are null; if a single column is null the keys object is still
+     * built, with a null member.
+     */
+    private Option<PushSubscriptionKeys> toKeys(Row row) {
+        String p256dh = row.getString(ENCRYPT_PUBLIC_KEY);
+        String auth = row.getString(ENCRYPT_AUTH_SECRET);
+        if (p256dh == null && auth == null) {
+            return Option.empty();
+        }
+        return Option.apply(PushSubscriptionKeys.apply(p256dh, auth));
+    }
+
+    /** Reads the expiry timestamp of a row as a UTC date-time. */
+    private PushSubscriptionExpiredTime toExpires(Row row) {
+        return PushSubscriptionExpiredTime.apply(
+            ZonedDateTime.ofInstant(row.getTimestamp(EXPIRES).toInstant(), ZoneOffset.UTC));
+    }
+
+    /** Parses the stored type names of a row back into {@link TypeName} values. */
+    private Seq<TypeName> toTypes(Row row) {
+        return CollectionConverters.asScala(row.getSet(TYPES, String.class).stream()
+                // NOTE(review): right().get() presumably throws when the factory does not
+                // know the stored name - confirm this is the intended failure mode.
+                .map(string -> typeStateFactory.parse(string).right().get())
+                .collect(ImmutableSet.toImmutableSet()))
+            .toSeq();
+    }
+}
\ No newline at end of file
diff --git a/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/pushsubscription/CassandraPushSubscriptionModule.java b/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/pushsubscription/CassandraPushSubscriptionModule.java
new file mode 100644
index 0000000..49e1065
--- /dev/null
+++ b/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/pushsubscription/CassandraPushSubscriptionModule.java
@@ -0,0 +1,57 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.jmap.cassandra.pushsubscription;
+
+import static com.datastax.driver.core.DataType.cboolean;
+import static com.datastax.driver.core.DataType.frozenSet;
+import static com.datastax.driver.core.DataType.text;
+import static com.datastax.driver.core.DataType.timestamp;
+import static com.datastax.driver.core.DataType.uuid;
+import static org.apache.james.jmap.cassandra.pushsubscription.tables.CassandraPushSubscriptionTable.DEVICE_CLIENT_ID;
+import static org.apache.james.jmap.cassandra.pushsubscription.tables.CassandraPushSubscriptionTable.ENCRYPT_AUTH_SECRET;
+import static org.apache.james.jmap.cassandra.pushsubscription.tables.CassandraPushSubscriptionTable.ENCRYPT_PUBLIC_KEY;
+import static org.apache.james.jmap.cassandra.pushsubscription.tables.CassandraPushSubscriptionTable.EXPIRES;
+import static org.apache.james.jmap.cassandra.pushsubscription.tables.CassandraPushSubscriptionTable.ID;
+import static org.apache.james.jmap.cassandra.pushsubscription.tables.CassandraPushSubscriptionTable.TYPES;
+import static org.apache.james.jmap.cassandra.pushsubscription.tables.CassandraPushSubscriptionTable.URL;
+import static org.apache.james.jmap.cassandra.pushsubscription.tables.CassandraPushSubscriptionTable.USER;
+import static org.apache.james.jmap.cassandra.pushsubscription.tables.CassandraPushSubscriptionTable.VALIDATED;
+import static org.apache.james.jmap.cassandra.pushsubscription.tables.CassandraPushSubscriptionTable.VERIFICATION_CODE;
+
+import org.apache.james.backends.cassandra.components.CassandraModule;
+import org.apache.james.jmap.cassandra.pushsubscription.tables.CassandraPushSubscriptionTable;
+
+/**
+ * Cassandra schema definition of the push subscription table.
+ *
+ * <p>The user is the partition key and the device client id is the clustering
+ * column; all other subscription attributes are regular columns.
+ */
+public interface CassandraPushSubscriptionModule {
+    CassandraModule MODULE = CassandraModule.builder()
+        .table(CassandraPushSubscriptionTable.TABLE_NAME)
+        .comment("Hold user push subscriptions data")
+        .statement(statement -> statement
+            .addPartitionKey(USER, text())
+            .addClusteringColumn(DEVICE_CLIENT_ID, text())
+            .addColumn(ID, uuid())
+            .addColumn(EXPIRES, timestamp())
+            .addColumn(TYPES, frozenSet(text()))
+            .addColumn(URL, text())
+            .addColumn(VERIFICATION_CODE, text())
+            // Both encryption key columns are left unset for subscriptions created without keys.
+            .addColumn(ENCRYPT_PUBLIC_KEY, text())
+            .addColumn(ENCRYPT_AUTH_SECRET, text())
+            .addColumn(VALIDATED, cboolean()))
+        .build();
+}
\ No newline at end of file
diff --git a/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/pushsubscription/CassandraPushSubscriptionRepository.java b/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/pushsubscription/CassandraPushSubscriptionRepository.java
new file mode 100644
index 0000000..a6d2c16
--- /dev/null
+++ b/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/pushsubscription/CassandraPushSubscriptionRepository.java
@@ -0,0 +1,146 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.jmap.cassandra.pushsubscription;
+
+import static org.apache.james.jmap.api.pushsubscription.PushSubscriptionHelpers.evaluateExpiresTime;
+import static org.apache.james.jmap.api.pushsubscription.PushSubscriptionHelpers.isInThePast;
+import static org.apache.james.jmap.api.pushsubscription.PushSubscriptionHelpers.isInvalidPushSubscriptionKey;
+import static org.apache.james.jmap.api.pushsubscription.PushSubscriptionHelpers.isNotOutdatedSubscription;
+
+import java.time.Clock;
+import java.time.ZonedDateTime;
+import java.util.Optional;
+import java.util.Set;
+
+import javax.inject.Inject;
+
+import org.apache.james.core.Username;
+import org.apache.james.jmap.api.model.DeviceClientIdInvalidException;
+import org.apache.james.jmap.api.model.ExpireTimeInvalidException;
+import org.apache.james.jmap.api.model.InvalidPushSubscriptionKeys;
+import org.apache.james.jmap.api.model.PushSubscription;
+import org.apache.james.jmap.api.model.PushSubscriptionCreationRequest;
+import org.apache.james.jmap.api.model.PushSubscriptionExpiredTime;
+import org.apache.james.jmap.api.model.PushSubscriptionId;
+import org.apache.james.jmap.api.model.PushSubscriptionNotFoundException;
+import org.apache.james.jmap.api.model.TypeName;
+import org.apache.james.jmap.api.pushsubscription.PushSubscriptionRepository;
+import org.reactivestreams.Publisher;
+
+import reactor.core.publisher.Mono;
+import scala.jdk.javaapi.CollectionConverters;
+import scala.jdk.javaapi.OptionConverters;
+
+public class CassandraPushSubscriptionRepository implements PushSubscriptionRepository {
+    private final CassandraPushSubscriptionDAO dao;
+    private final Clock clock;
+
+    @Inject
+    public CassandraPushSubscriptionRepository(CassandraPushSubscriptionDAO dao, Clock clock) {
+        this.dao = dao;
+        this.clock = clock;
+    }
+
+    @Override
+    public Publisher<PushSubscription> save(Username username, PushSubscriptionCreationRequest request) {
+        return Mono.just(request)
+            .handle((req, sink) -> {
+                if (isInThePast(req.expires(), clock)) {
+                    sink.error(new ExpireTimeInvalidException(req.expires().get().value(), "expires must be greater than now"));
+                }
+                if (!isUniqueDeviceClientId(username, req.deviceClientId())) {
+                    sink.error(new DeviceClientIdInvalidException(req.deviceClientId(), "deviceClientId must be unique"));
+                }
+                if (isInvalidPushSubscriptionKey(req.keys())) {
+                    sink.error(new InvalidPushSubscriptionKeys(req.keys().get()));
+                }
+            })
+            .thenReturn(PushSubscription.from(request,
+                evaluateExpiresTime(OptionConverters.toJava(request.expires().map(PushSubscriptionExpiredTime::value)),
+                    clock)))
+            .flatMap(subscription -> dao.insert(username, subscription).thenReturn(subscription));
+    }
+
+    @Override
+    public Publisher<Void> updateExpireTime(Username username, PushSubscriptionId id, ZonedDateTime newExpire) {
+        return Mono.just(newExpire)
+            .handle((inputTime, sink) -> {
+                if (newExpire.isBefore(ZonedDateTime.now(clock))) {
+                    sink.error(new ExpireTimeInvalidException(inputTime, "expires must be greater than now"));
+                }
+            })
+            .then(retrieveByPushSubscriptionId(username, id)
+                .flatMap(subscription -> dao.insert(username,
+                    subscription.withExpires(evaluateExpiresTime(Optional.of(newExpire), clock))))
+                .switchIfEmpty(Mono.error(() -> new PushSubscriptionNotFoundException(id)))
+                .then());
+    }
+
+    @Override
+    public Publisher<Void> updateTypes(Username username, PushSubscriptionId id, Set<TypeName> types) {
+        return retrieveByPushSubscriptionId(username, id)
+            .map(subscription -> subscription.withTypes(CollectionConverters.asScala(types).toSeq()))
+            .flatMap(newPushSubscription -> dao.insert(username, newPushSubscription))
+            .switchIfEmpty(Mono.error(() -> new PushSubscriptionNotFoundException(id)))
+            .then();
+    }
+
+    @Override
+    public Publisher<Void> validateVerificationCode(Username username, PushSubscriptionId id) {
+        return retrieveByPushSubscriptionId(username, id)
+            .map(PushSubscription::verified)
+            .flatMap(newPushSubscription -> dao.insert(username, newPushSubscription))
+            .switchIfEmpty(Mono.error(() -> new PushSubscriptionNotFoundException(id)))
+            .then();
+    }
+
+    @Override
+    public Publisher<Void> revoke(Username username, PushSubscriptionId id) {
+        return Mono.from(retrieveByPushSubscriptionId(username, id))
+            .flatMap(subscription -> dao.deleteOne(username, subscription.deviceClientId()))
+            .switchIfEmpty(Mono.empty());
+    }
+
+    @Override
+    public Publisher<PushSubscription> get(Username username, Set<PushSubscriptionId> ids) {
+        return dao.selectAll(username)
+            .filter(subscription -> ids.contains(subscription.id()))
+            .filter(subscription -> isNotOutdatedSubscription(subscription, clock));
+    }
+
+    @Override
+    public Publisher<PushSubscription> list(Username username) {
+        return dao.selectAll(username)
+            .filter(subscription -> isNotOutdatedSubscription(subscription, clock));
+    }
+
+    private Mono<PushSubscription> retrieveByPushSubscriptionId(Username username, PushSubscriptionId id) {
+        return dao.selectAll(username).filter(subscription -> subscription.id().equals(id)).next();
+    }
+
+    private boolean isUniqueDeviceClientId(Username username, String deviceClientId) {
+        return Boolean.TRUE.equals(dao.selectAll(username)
+            .filter(subscription -> subscription.deviceClientId().equals(deviceClientId))
+            .count()
+            .map(value -> value == 0)
+            .block());
+    }
+
+}
\ No newline at end of file
diff --git a/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/pushsubscription/tables/CassandraPushSubscriptionTable.java b/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/pushsubscription/tables/CassandraPushSubscriptionTable.java
new file mode 100644
index 0000000..7d56857
--- /dev/null
+++ b/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/pushsubscription/tables/CassandraPushSubscriptionTable.java
@@ -0,0 +1,34 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.jmap.cassandra.pushsubscription.tables;
+
+/**
+ * Table and column names of the Cassandra push subscription table.
+ */
+public interface CassandraPushSubscriptionTable {
+    String TABLE_NAME = "push_subscription";
+    // Row addressing: owner of the subscription and the client-chosen device id.
+    String USER = "user";
+    String DEVICE_CLIENT_ID = "device_client_id";
+    // Subscription attributes.
+    String ID = "id";
+    String EXPIRES = "expires";
+    String TYPES = "types";
+    String URL = "url";
+    String VERIFICATION_CODE = "verification_code";
+    // Optional encryption keys of the subscription.
+    String ENCRYPT_PUBLIC_KEY = "encrypt_public_key";
+    String ENCRYPT_AUTH_SECRET = "encrypt_auth_secret";
+    String VALIDATED = "validated";
+}
\ No newline at end of file
diff --git a/server/data/data-jmap-cassandra/src/test/java/org/apache/james/jmap/cassandra/pushsubscription/CassandraPushSubscriptionRepositoryTest.java b/server/data/data-jmap-cassandra/src/test/java/org/apache/james/jmap/cassandra/pushsubscription/CassandraPushSubscriptionRepositoryTest.java
new file mode 100644
index 0000000..b411746
--- /dev/null
+++ b/server/data/data-jmap-cassandra/src/test/java/org/apache/james/jmap/cassandra/pushsubscription/CassandraPushSubscriptionRepositoryTest.java
@@ -0,0 +1,62 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.jmap.cassandra.pushsubscription;
+
+import org.apache.james.backends.cassandra.CassandraCluster;
+import org.apache.james.backends.cassandra.CassandraClusterExtension;
+import org.apache.james.backends.cassandra.components.CassandraModule;
+import org.apache.james.jmap.api.change.TypeStateFactory;
+import org.apache.james.jmap.api.pushsubscription.PushSubscriptionRepository;
+import org.apache.james.jmap.api.pushsubscription.PushSubscriptionRepositoryContract;
+import org.apache.james.utils.UpdatableTickingClock;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import scala.jdk.javaapi.CollectionConverters;
+
+
+public class CassandraPushSubscriptionRepositoryTest implements PushSubscriptionRepositoryContract {
+    static final CassandraModule MODULE = CassandraPushSubscriptionModule.MODULE;
+
+    UpdatableTickingClock clock;
+    PushSubscriptionRepository pushSubscriptionRepository;
+
+    @RegisterExtension
+    static CassandraClusterExtension cassandraCluster = new CassandraClusterExtension(MODULE);
+
+    @BeforeEach
+    void setup(CassandraCluster cassandra) {
+        clock = new UpdatableTickingClock(PushSubscriptionRepositoryContract.NOW());
+        pushSubscriptionRepository = new CassandraPushSubscriptionRepository(
+            new CassandraPushSubscriptionDAO(cassandra.getConf(),
+                new TypeStateFactory(CollectionConverters.asJava(PushSubscriptionRepositoryContract.TYPE_NAME_SET()))),
+            clock);
+    }
+
+    @Override
+    public UpdatableTickingClock clock() {
+        return clock;
+    }
+
+    @Override
+    public PushSubscriptionRepository testee() {
+        return pushSubscriptionRepository;
+    }
+}
diff --git a/server/data/data-jmap/src/main/scala/org/apache/james/jmap/api/model/PushSubscription.scala b/server/data/data-jmap/src/main/scala/org/apache/james/jmap/api/model/PushSubscription.scala
index a88a5f8..6d7b7ac 100644
--- a/server/data/data-jmap/src/main/scala/org/apache/james/jmap/api/model/PushSubscription.scala
+++ b/server/data/data-jmap/src/main/scala/org/apache/james/jmap/api/model/PushSubscription.scala
@@ -30,6 +30,8 @@ import com.google.crypto.tink.subtle.EllipticCurves
 
 import scala.util.Try
 
+import scala.util.Try
+
 object PushSubscriptionId {
   def generate(): PushSubscriptionId = PushSubscriptionId(UUID.randomUUID)
 }
diff --git a/server/data/data-jmap/src/test/scala/org/apache/james/jmap/api/pushsubscription/PushSubscriptionRepositoryContract.scala b/server/data/data-jmap/src/test/scala/org/apache/james/jmap/api/pushsubscription/PushSubscriptionRepositoryContract.scala
index cf6d28c..7c87498 100644
--- a/server/data/data-jmap/src/test/scala/org/apache/james/jmap/api/pushsubscription/PushSubscriptionRepositoryContract.scala
+++ b/server/data/data-jmap/src/test/scala/org/apache/james/jmap/api/pushsubscription/PushSubscriptionRepositoryContract.scala
@@ -23,7 +23,7 @@ import java.net.URL
 import java.time.{Clock, Instant, ZoneId, ZonedDateTime}
 
 import org.apache.james.core.Username
-import org.apache.james.jmap.api.model.{DeviceClientId, DeviceClientIdInvalidException, ExpireTimeInvalidException, PushSubscription, PushSubscriptionCreationRequest, PushSubscriptionExpiredTime, PushSubscriptionId, PushSubscriptionNotFoundException, PushSubscriptionServerURL, State, TypeName}
+import org.apache.james.jmap.api.model.{DeviceClientId, DeviceClientIdInvalidException, ExpireTimeInvalidException, InvalidPushSubscriptionKeys, PushSubscription, PushSubscriptionCreationRequest, PushSubscriptionExpiredTime, PushSubscriptionId, PushSubscriptionKeys, PushSubscriptionNotFoundException, PushSubscriptionServerURL, State, TypeName}
 import org.apache.james.jmap.api.pushsubscription.PushSubscriptionRepositoryContract.{ALICE, INVALID_EXPIRE, MAX_EXPIRE, VALID_EXPIRE}
 import org.apache.james.utils.UpdatableTickingClock
 import org.assertj.core.api.Assertions.{assertThat, assertThatCode, assertThatThrownBy}
@@ -61,6 +61,7 @@ case class CustomState(value: String) extends State {
 }
 
 object PushSubscriptionRepositoryContract {
+  val TYPE_NAME_SET: Set[TypeName] = Set(CustomTypeName1, CustomTypeName2)
   val NOW: Instant = Instant.parse("2021-10-25T07:05:39.160Z")
   val ZONE_ID: ZoneId = ZoneId.of("UTC")
   val CLOCK: Clock = Clock.fixed(NOW, ZONE_ID)
@@ -370,5 +371,65 @@ trait PushSubscriptionRepositoryContract {
     assertThatThrownBy(() => SMono.fromPublisher(testee.validateVerificationCode(ALICE, randomId)).block())
       .isInstanceOf(classOf[PushSubscriptionNotFoundException])
   }
+
+  // Keys supplied at creation must be read back unchanged.
+  @Test
+  def saveSubscriptionWithFullKeyPairShouldSucceed(): Unit = {
+    val expectedKeys = Some(PushSubscriptionKeys(p256dh = "p256h", auth = "auth"))
+    val creationRequest = PushSubscriptionCreationRequest(
+      deviceClientId = DeviceClientId("1"),
+      url = PushSubscriptionServerURL(new URL("https://example.com/push")),
+      types = Seq(CustomTypeName1),
+      keys = expectedKeys)
+
+    val savedId = SMono.fromPublisher(testee.save(ALICE, creationRequest)).block().id
+    val stored = SFlux.fromPublisher(testee.get(ALICE, Set(savedId).asJava))
+      .collectSeq()
+      .block()
+
+    assertThat(stored.map(_.keys).toList.asJava).containsExactlyInAnyOrder(expectedKeys)
+  }
+
+  // A subscription created without keys must be read back without keys.
+  @Test
+  def saveSubscriptionWithNoneKeyPairShouldSucceed(): Unit = {
+    val noKeys = None
+    val creationRequest = PushSubscriptionCreationRequest(
+      deviceClientId = DeviceClientId("1"),
+      url = PushSubscriptionServerURL(new URL("https://example.com/push")),
+      types = Seq(CustomTypeName1),
+      keys = noKeys)
+
+    val savedId = SMono.fromPublisher(testee.save(ALICE, creationRequest)).block().id
+    val stored = SFlux.fromPublisher(testee.get(ALICE, Set(savedId).asJava))
+      .collectSeq()
+      .block()
+
+    assertThat(stored.map(_.keys).toList.asJava).containsExactlyInAnyOrder(noKeys)
+  }
+
+  // An empty p256dh key makes the request invalid.
+  @Test
+  def saveSubscriptionWithEmptyP256hKeyShouldFail(): Unit = {
+    val keysWithEmptyP256dh = Some(PushSubscriptionKeys(p256dh = "", auth = "auth"))
+    val invalidRequest = PushSubscriptionCreationRequest(
+      deviceClientId = DeviceClientId("1"),
+      url = PushSubscriptionServerURL(new URL("https://example.com/push")),
+      types = Seq(CustomTypeName1),
+      keys = keysWithEmptyP256dh)
+
+    assertThatThrownBy(() => SMono.fromPublisher(testee.save(ALICE, invalidRequest)).block())
+      .isInstanceOf(classOf[InvalidPushSubscriptionKeys])
+  }
+
+  // An empty auth secret makes the request invalid.
+  @Test
+  def saveSubscriptionWithEmptyAuthKeyShouldFail(): Unit = {
+    val keysWithEmptyAuth = Some(PushSubscriptionKeys(p256dh = "p256dh", auth = ""))
+    val invalidRequest = PushSubscriptionCreationRequest(
+      deviceClientId = DeviceClientId("1"),
+      url = PushSubscriptionServerURL(new URL("https://example.com/push")),
+      types = Seq(CustomTypeName1),
+      keys = keysWithEmptyAuth)
+
+    assertThatThrownBy(() => SMono.fromPublisher(testee.save(ALICE, invalidRequest)).block())
+      .isInstanceOf(classOf[InvalidPushSubscriptionKeys])
+  }
+
 }
 

---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org