You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by rc...@apache.org on 2020/12/30 03:35:40 UTC

[james-project] 28/29: JAMES-3462 CassandraMailboxChangeRepository implementation

This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit d63b8e0f58a66e63569b9422a6359eadc2c36741
Author: LanKhuat <dl...@linagora.com>
AuthorDate: Wed Dec 23 16:46:22 2020 +0700

    JAMES-3462 CassandraMailboxChangeRepository implementation
---
 .../change/CassandraMailboxChangeModule.java       |  60 +++++++
 .../change/CassandraMailboxChangeRepository.java   |  52 +++++-
 ...eRepository.java => CassandraStateFactory.java} |  36 +----
 .../change/MailboxChangeRepositoryDAO.java         | 176 +++++++++++++++++++++
 .../change/tables/CassandraMailboxChangeTable.java |  33 ++++
 .../CassandraMailboxChangeRepositoryTest.java      |  67 ++++++++
 .../change/MailboxChangeRepositoryContract.java    |  12 +-
 7 files changed, 393 insertions(+), 43 deletions(-)

diff --git a/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/change/CassandraMailboxChangeModule.java b/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/change/CassandraMailboxChangeModule.java
new file mode 100644
index 0000000..9fa1cb5
--- /dev/null
+++ b/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/change/CassandraMailboxChangeModule.java
@@ -0,0 +1,60 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.jmap.cassandra.change;
+
+import static com.datastax.driver.core.DataType.cboolean;
+import static com.datastax.driver.core.DataType.frozenSet;
+import static com.datastax.driver.core.DataType.text;
+import static com.datastax.driver.core.DataType.timeuuid;
+import static com.datastax.driver.core.schemabuilder.SchemaBuilder.Direction.ASC;
+import static com.datastax.driver.core.schemabuilder.SchemaBuilder.KeyCaching.ALL;
+import static com.datastax.driver.core.schemabuilder.SchemaBuilder.frozen;
+import static com.datastax.driver.core.schemabuilder.SchemaBuilder.rows;
+import static org.apache.james.backends.cassandra.init.CassandraZonedDateTimeModule.ZONED_DATE_TIME;
+import static org.apache.james.backends.cassandra.utils.CassandraConstants.DEFAULT_CACHED_ROW_PER_PARTITION;
+import static org.apache.james.jmap.cassandra.change.tables.CassandraMailboxChangeTable.ACCOUNT_ID;
+import static org.apache.james.jmap.cassandra.change.tables.CassandraMailboxChangeTable.CREATED;
+import static org.apache.james.jmap.cassandra.change.tables.CassandraMailboxChangeTable.DATE;
+import static org.apache.james.jmap.cassandra.change.tables.CassandraMailboxChangeTable.DESTROYED;
+import static org.apache.james.jmap.cassandra.change.tables.CassandraMailboxChangeTable.IS_COUNT_CHANGE;
+import static org.apache.james.jmap.cassandra.change.tables.CassandraMailboxChangeTable.IS_DELEGATED;
+import static org.apache.james.jmap.cassandra.change.tables.CassandraMailboxChangeTable.STATE;
+import static org.apache.james.jmap.cassandra.change.tables.CassandraMailboxChangeTable.TABLE_NAME;
+import static org.apache.james.jmap.cassandra.change.tables.CassandraMailboxChangeTable.UPDATED;
+
+import org.apache.james.backends.cassandra.components.CassandraModule;
+
+public interface CassandraMailboxChangeModule {
+    CassandraModule MODULE = CassandraModule.table(TABLE_NAME)
+        .comment("Holds MailboxChange definition. Used to manage Mailbox state in JMAP.")
+        .options(options -> options
+            .clusteringOrder(STATE, ASC)
+            .caching(ALL, rows(DEFAULT_CACHED_ROW_PER_PARTITION)))
+        .statement(statement -> statement
+            .addPartitionKey(ACCOUNT_ID, text())
+            .addClusteringColumn(STATE, timeuuid())
+            .addUDTColumn(DATE, frozen(ZONED_DATE_TIME))
+            .addColumn(IS_DELEGATED, cboolean())
+            .addColumn(IS_COUNT_CHANGE, cboolean())
+            .addColumn(CREATED, frozenSet(timeuuid()))
+            .addColumn(UPDATED, frozenSet(timeuuid()))
+            .addColumn(DESTROYED, frozenSet(timeuuid())))
+        .build();
+}
diff --git a/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/change/CassandraMailboxChangeRepository.java b/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/change/CassandraMailboxChangeRepository.java
index 4fe0c0e..b0d9f05 100644
--- a/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/change/CassandraMailboxChangeRepository.java
+++ b/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/change/CassandraMailboxChangeRepository.java
@@ -21,39 +21,81 @@ package org.apache.james.jmap.cassandra.change;
 
 import java.util.Optional;
 
+import javax.inject.Inject;
+
 import org.apache.james.jmap.api.change.Limit;
 import org.apache.james.jmap.api.change.MailboxChange;
 import org.apache.james.jmap.api.change.MailboxChangeRepository;
 import org.apache.james.jmap.api.change.MailboxChanges;
 import org.apache.james.jmap.api.change.State;
+import org.apache.james.jmap.api.exception.ChangeNotFoundException;
 import org.apache.james.jmap.api.model.AccountId;
 
+import com.google.common.base.Preconditions;
+
+import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 public class CassandraMailboxChangeRepository implements MailboxChangeRepository {
+    public static final Limit DEFAULT_NUMBER_OF_CHANGES = Limit.of(5);
+
+    private final MailboxChangeRepositoryDAO mailboxChangeRepositoryDAO;
+
+    @Inject
+    public CassandraMailboxChangeRepository(MailboxChangeRepositoryDAO mailboxChangeRepositoryDAO) {
+        this.mailboxChangeRepositoryDAO = mailboxChangeRepositoryDAO;
+    }
 
     @Override
     public Mono<Void> save(MailboxChange change) {
-        return Mono.empty();
+        return mailboxChangeRepositoryDAO.insert(change);
     }
 
     @Override
     public Mono<MailboxChanges> getSinceState(AccountId accountId, State state, Optional<Limit> maxChanges) {
-        return Mono.empty();
+        Preconditions.checkNotNull(accountId);
+        Preconditions.checkNotNull(state);
+        maxChanges.ifPresent(limit -> Preconditions.checkArgument(limit.getValue() > 0, "maxChanges must be a positive integer"));
+
+        if (state.equals(State.INITIAL)) {
+            return mailboxChangeRepositoryDAO.getAllChanges(accountId)
+                .filter(change -> !change.isDelegated())
+                .collect(new MailboxChanges.MailboxChangesBuilder.MailboxChangeCollector(state, maxChanges.orElse(DEFAULT_NUMBER_OF_CHANGES)));
+        }
+
+        return mailboxChangeRepositoryDAO.getChangesSince(accountId, state)
+            .switchIfEmpty(Flux.error(new ChangeNotFoundException(state, String.format("State '%s' could not be found", state.getValue()))))
+            .filter(change -> !change.isDelegated())
+            .filter(change -> !change.getState().equals(state))
+            .collect(new MailboxChanges.MailboxChangesBuilder.MailboxChangeCollector(state, maxChanges.orElse(DEFAULT_NUMBER_OF_CHANGES)));
     }
 
     @Override
     public Mono<MailboxChanges> getSinceStateWithDelegation(AccountId accountId, State state, Optional<Limit> maxChanges) {
-        return Mono.empty();
+        Preconditions.checkNotNull(accountId);
+        Preconditions.checkNotNull(state);
+        maxChanges.ifPresent(limit -> Preconditions.checkArgument(limit.getValue() > 0, "maxChanges must be a positive integer"));
+
+        if (state.equals(State.INITIAL)) {
+            return mailboxChangeRepositoryDAO.getAllChanges(accountId)
+                .collect(new MailboxChanges.MailboxChangesBuilder.MailboxChangeCollector(state, maxChanges.orElse(DEFAULT_NUMBER_OF_CHANGES)));
+        }
+
+        return mailboxChangeRepositoryDAO.getChangesSince(accountId, state)
+            .switchIfEmpty(Flux.error(new ChangeNotFoundException(state, String.format("State '%s' could not be found", state.getValue()))))
+            .filter(change -> !change.getState().equals(state))
+            .collect(new MailboxChanges.MailboxChangesBuilder.MailboxChangeCollector(state, maxChanges.orElse(DEFAULT_NUMBER_OF_CHANGES)));
     }
 
     @Override
     public Mono<State> getLatestState(AccountId accountId) {
-        return Mono.just(State.INITIAL);
+        return mailboxChangeRepositoryDAO.latestStateNotDelegated(accountId)
+            .switchIfEmpty(Mono.just(State.INITIAL));
     }
 
     @Override
     public Mono<State> getLatestStateWithDelegation(AccountId accountId) {
-        return Mono.just(State.INITIAL);
+        return mailboxChangeRepositoryDAO.latestState(accountId)
+            .switchIfEmpty(Mono.just(State.INITIAL));
     }
 }
diff --git a/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/change/CassandraMailboxChangeRepository.java b/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/change/CassandraStateFactory.java
similarity index 54%
copy from server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/change/CassandraMailboxChangeRepository.java
copy to server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/change/CassandraStateFactory.java
index 4fe0c0e..fce5dc5 100644
--- a/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/change/CassandraMailboxChangeRepository.java
+++ b/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/change/CassandraStateFactory.java
@@ -19,41 +19,13 @@
 
 package org.apache.james.jmap.cassandra.change;
 
-import java.util.Optional;
-
-import org.apache.james.jmap.api.change.Limit;
-import org.apache.james.jmap.api.change.MailboxChange;
-import org.apache.james.jmap.api.change.MailboxChangeRepository;
-import org.apache.james.jmap.api.change.MailboxChanges;
 import org.apache.james.jmap.api.change.State;
-import org.apache.james.jmap.api.model.AccountId;
-
-import reactor.core.publisher.Mono;
-
-public class CassandraMailboxChangeRepository implements MailboxChangeRepository {
-
-    @Override
-    public Mono<Void> save(MailboxChange change) {
-        return Mono.empty();
-    }
-
-    @Override
-    public Mono<MailboxChanges> getSinceState(AccountId accountId, State state, Optional<Limit> maxChanges) {
-        return Mono.empty();
-    }
 
-    @Override
-    public Mono<MailboxChanges> getSinceStateWithDelegation(AccountId accountId, State state, Optional<Limit> maxChanges) {
-        return Mono.empty();
-    }
-
-    @Override
-    public Mono<State> getLatestState(AccountId accountId) {
-        return Mono.just(State.INITIAL);
-    }
+import com.datastax.driver.core.utils.UUIDs;
 
+public class CassandraStateFactory implements State.Factory {
     @Override
-    public Mono<State> getLatestStateWithDelegation(AccountId accountId) {
-        return Mono.just(State.INITIAL);
+    public State generate() {
+        return State.of(UUIDs.timeBased());
     }
 }
diff --git a/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/change/MailboxChangeRepositoryDAO.java b/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/change/MailboxChangeRepositoryDAO.java
new file mode 100644
index 0000000..7232dbb
--- /dev/null
+++ b/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/change/MailboxChangeRepositoryDAO.java
@@ -0,0 +1,176 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.jmap.cassandra.change;
+
+import static com.datastax.driver.core.querybuilder.QueryBuilder.asc;
+import static com.datastax.driver.core.querybuilder.QueryBuilder.bindMarker;
+import static com.datastax.driver.core.querybuilder.QueryBuilder.desc;
+import static com.datastax.driver.core.querybuilder.QueryBuilder.eq;
+import static com.datastax.driver.core.querybuilder.QueryBuilder.gte;
+import static com.datastax.driver.core.querybuilder.QueryBuilder.insertInto;
+import static com.datastax.driver.core.querybuilder.QueryBuilder.select;
+import static org.apache.james.jmap.cassandra.change.tables.CassandraMailboxChangeTable.ACCOUNT_ID;
+import static org.apache.james.jmap.cassandra.change.tables.CassandraMailboxChangeTable.CREATED;
+import static org.apache.james.jmap.cassandra.change.tables.CassandraMailboxChangeTable.DATE;
+import static org.apache.james.jmap.cassandra.change.tables.CassandraMailboxChangeTable.DESTROYED;
+import static org.apache.james.jmap.cassandra.change.tables.CassandraMailboxChangeTable.IS_COUNT_CHANGE;
+import static org.apache.james.jmap.cassandra.change.tables.CassandraMailboxChangeTable.IS_DELEGATED;
+import static org.apache.james.jmap.cassandra.change.tables.CassandraMailboxChangeTable.STATE;
+import static org.apache.james.jmap.cassandra.change.tables.CassandraMailboxChangeTable.TABLE_NAME;
+import static org.apache.james.jmap.cassandra.change.tables.CassandraMailboxChangeTable.UPDATED;
+
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+
+import javax.inject.Inject;
+
+import org.apache.james.backends.cassandra.init.CassandraTypesProvider;
+import org.apache.james.backends.cassandra.init.CassandraZonedDateTimeModule;
+import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
+import org.apache.james.jmap.api.change.MailboxChange;
+import org.apache.james.jmap.api.change.State;
+import org.apache.james.jmap.api.model.AccountId;
+import org.apache.james.mailbox.cassandra.ids.CassandraId;
+import org.apache.james.mailbox.model.MailboxId;
+
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.UserType;
+import com.github.steveash.guavate.Guavate;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+class MailboxChangeRepositoryDAO {
+    private final CassandraAsyncExecutor executor;
+    private final UserType zonedDateTimeUserType;
+    private final PreparedStatement insertStatement;
+    private final PreparedStatement selectAllStatement;
+    private final PreparedStatement selectFromStatement;
+    private final PreparedStatement selectLatestStatement;
+    private final PreparedStatement selectLatestNotDelegatedStatement;
+
+    @Inject
+    public MailboxChangeRepositoryDAO(Session session, CassandraTypesProvider cassandraTypesProvider) {
+        executor = new CassandraAsyncExecutor(session);
+        zonedDateTimeUserType = cassandraTypesProvider.getDefinedUserType(CassandraZonedDateTimeModule.ZONED_DATE_TIME);
+
+        insertStatement = session.prepare(insertInto(TABLE_NAME)
+            .value(ACCOUNT_ID, bindMarker(ACCOUNT_ID))
+            .value(STATE, bindMarker(STATE))
+            .value(DATE, bindMarker(DATE))
+            .value(IS_DELEGATED, bindMarker(IS_DELEGATED))
+            .value(IS_COUNT_CHANGE, bindMarker(IS_COUNT_CHANGE))
+            .value(CREATED, bindMarker(CREATED))
+            .value(UPDATED, bindMarker(UPDATED))
+            .value(DESTROYED, bindMarker(DESTROYED)));
+
+        selectAllStatement = session.prepare(select().from(TABLE_NAME)
+            .where(eq(ACCOUNT_ID, bindMarker(ACCOUNT_ID)))
+            .orderBy(asc(STATE)));
+
+        selectFromStatement = session.prepare(select().from(TABLE_NAME)
+            .where(eq(ACCOUNT_ID, bindMarker(ACCOUNT_ID)))
+            .and(gte(STATE, bindMarker(STATE)))
+            .orderBy(asc(STATE)));
+
+        selectLatestStatement = session.prepare(select(STATE)
+            .from(TABLE_NAME)
+            .where(eq(ACCOUNT_ID, bindMarker(ACCOUNT_ID)))
+            .orderBy(desc(STATE))
+            .limit(1));
+
+        selectLatestNotDelegatedStatement = session.prepare(select(STATE)
+            .from(TABLE_NAME)
+            .where(eq(ACCOUNT_ID, bindMarker(ACCOUNT_ID)))
+            .and(eq(IS_DELEGATED, false))
+            .orderBy(desc(STATE))
+            .limit(1)
+            .allowFiltering());
+    }
+
+    Mono<Void> insert(MailboxChange change) {
+        return executor.executeVoid(insertStatement.bind()
+            .setString(ACCOUNT_ID, change.getAccountId().getIdentifier())
+            .setUUID(STATE, change.getState().getValue())
+            .setBool(IS_COUNT_CHANGE, change.isCountChange())
+            .setBool(IS_DELEGATED, change.isDelegated())
+            .setSet(CREATED, toUuidSet(change.getCreated()), UUID.class)
+            .setSet(UPDATED, toUuidSet(change.getUpdated()), UUID.class)
+            .setSet(DESTROYED, toUuidSet(change.getDestroyed()), UUID.class)
+            .setUDTValue(DATE, CassandraZonedDateTimeModule.toUDT(zonedDateTimeUserType, change.getDate())));
+    }
+
+    private ImmutableSet<UUID> toUuidSet(List<MailboxId> idSet) {
+        return idSet.stream()
+            .filter(CassandraId.class::isInstance)
+            .map(CassandraId.class::cast)
+            .map(CassandraId::asUuid)
+            .collect(Guavate.toImmutableSet());
+    }
+
+    Flux<MailboxChange> getAllChanges(AccountId accountId) {
+        return executor.executeRows(selectAllStatement.bind()
+            .setString(ACCOUNT_ID, accountId.getIdentifier()))
+            .map(this::readRow);
+    }
+
+    Flux<MailboxChange> getChangesSince(AccountId accountId, State state) {
+        return executor.executeRows(selectFromStatement.bind()
+                .setString(ACCOUNT_ID, accountId.getIdentifier())
+                .setUUID(STATE, state.getValue()))
+            .map(this::readRow);
+    }
+
+    Mono<State> latestState(AccountId accountId) {
+        return executor.executeSingleRow(selectLatestStatement.bind()
+            .setString(ACCOUNT_ID, accountId.getIdentifier()))
+            .map(row -> State.of(row.getUUID(STATE)));
+    }
+
+    Mono<State> latestStateNotDelegated(AccountId accountId) {
+        return executor.executeSingleRow(selectLatestNotDelegatedStatement.bind()
+            .setString(ACCOUNT_ID, accountId.getIdentifier()))
+            .map(row -> State.of(row.getUUID(STATE)));
+    }
+
+    private MailboxChange readRow(Row row) {
+        return MailboxChange.builder()
+            .accountId(AccountId.fromString(row.getString(ACCOUNT_ID)))
+            .state(State.of(row.getUUID(STATE)))
+            .date(CassandraZonedDateTimeModule.fromUDT(row.getUDTValue(DATE)))
+            .isCountChange(row.getBool(IS_COUNT_CHANGE))
+            .delegated(row.getBool(IS_DELEGATED))
+            .created(toIdSet(row.getSet(CREATED, UUID.class)))
+            .updated(toIdSet(row.getSet(UPDATED, UUID.class)))
+            .destroyed(toIdSet(row.getSet(DESTROYED, UUID.class)))
+            .build();
+    }
+
+    private ImmutableList<MailboxId> toIdSet(Set<UUID> uuidSet) {
+        return uuidSet.stream()
+            .map(CassandraId::of)
+            .collect(Guavate.toImmutableList());
+    }
+}
diff --git a/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/change/tables/CassandraMailboxChangeTable.java b/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/change/tables/CassandraMailboxChangeTable.java
new file mode 100644
index 0000000..392ebbc
--- /dev/null
+++ b/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/change/tables/CassandraMailboxChangeTable.java
@@ -0,0 +1,33 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.jmap.cassandra.change.tables;
+
+public interface CassandraMailboxChangeTable {
+
+    String TABLE_NAME = "mailbox_change";
+    String ACCOUNT_ID = "account_id";
+    String STATE = "state";
+    String DATE = "date";
+    String IS_DELEGATED = "is_delegated";
+    String IS_COUNT_CHANGE = "is_count_change";
+    String CREATED = "created";
+    String UPDATED = "updated";
+    String DESTROYED = "destroyed";
+}
diff --git a/server/data/data-jmap-cassandra/src/test/java/org/apache/james/jmap/cassandra/change/CassandraMailboxChangeRepositoryTest.java b/server/data/data-jmap-cassandra/src/test/java/org/apache/james/jmap/cassandra/change/CassandraMailboxChangeRepositoryTest.java
new file mode 100644
index 0000000..a6e743a
--- /dev/null
+++ b/server/data/data-jmap-cassandra/src/test/java/org/apache/james/jmap/cassandra/change/CassandraMailboxChangeRepositoryTest.java
@@ -0,0 +1,67 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.jmap.cassandra.change;
+
+import org.apache.james.backends.cassandra.CassandraCluster;
+import org.apache.james.backends.cassandra.CassandraClusterExtension;
+import org.apache.james.backends.cassandra.components.CassandraModule;
+import org.apache.james.backends.cassandra.init.CassandraZonedDateTimeModule;
+import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionModule;
+import org.apache.james.jmap.api.change.MailboxChange;
+import org.apache.james.jmap.api.change.MailboxChangeRepository;
+import org.apache.james.jmap.api.change.MailboxChangeRepositoryContract;
+import org.apache.james.jmap.api.change.State;
+import org.apache.james.mailbox.cassandra.ids.CassandraId;
+import org.apache.james.mailbox.model.MailboxId;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+public class CassandraMailboxChangeRepositoryTest implements MailboxChangeRepositoryContract {
+
+    @RegisterExtension
+    static CassandraClusterExtension cassandraCluster = new CassandraClusterExtension(
+        CassandraModule.aggregateModules(CassandraMailboxChangeModule.MODULE,
+            CassandraSchemaVersionModule.MODULE,
+            CassandraZonedDateTimeModule.MODULE));
+
+    MailboxChangeRepository mailboxChangeRepository;
+    MailboxChangeRepositoryDAO mailboxChangeRepositoryDAO;
+
+    @BeforeEach
+    public void setUp(CassandraCluster cassandra) {
+        mailboxChangeRepositoryDAO = new MailboxChangeRepositoryDAO(cassandra.getConf(), cassandra.getTypesProvider());
+        mailboxChangeRepository = new CassandraMailboxChangeRepository(mailboxChangeRepositoryDAO);
+    }
+
+    @Override
+    public State.Factory stateFactory() {
+        return new CassandraStateFactory();
+    }
+
+    @Override
+    public MailboxChangeRepository mailboxChangeRepository() {
+        return mailboxChangeRepository;
+    }
+
+    @Override
+    public MailboxId generateNewMailboxId() {
+        return CassandraId.timeBased();
+    }
+}
diff --git a/server/data/data-jmap/src/test/java/org/apache/james/jmap/api/change/MailboxChangeRepositoryContract.java b/server/data/data-jmap/src/test/java/org/apache/james/jmap/api/change/MailboxChangeRepositoryContract.java
index 381714d..4034849 100644
--- a/server/data/data-jmap/src/test/java/org/apache/james/jmap/api/change/MailboxChangeRepositoryContract.java
+++ b/server/data/data-jmap/src/test/java/org/apache/james/jmap/api/change/MailboxChangeRepositoryContract.java
@@ -539,8 +539,8 @@ public interface MailboxChangeRepositoryContract {
             .created(ImmutableList.of(id2))
             .build();
 
-        repository.save(change1);
-        repository.save(change2);
+        repository.save(change1).block();
+        repository.save(change2).block();
 
         assertThat(repository.getSinceState(ACCOUNT_ID, State.INITIAL, Optional.empty()).block().isCountChangesOnly())
             .isFalse();
@@ -568,8 +568,8 @@ public interface MailboxChangeRepositoryContract {
             .updated(ImmutableList.of(id2))
             .build();
 
-        repository.save(change1);
-        repository.save(change2);
+        repository.save(change1).block();
+        repository.save(change2).block();
 
         assertThat(repository.getSinceState(ACCOUNT_ID, State.INITIAL, Optional.empty()).block().isCountChangesOnly())
             .isFalse();
@@ -597,8 +597,8 @@ public interface MailboxChangeRepositoryContract {
             .updated(ImmutableList.of(id2))
             .build();
 
-        repository.save(change1);
-        repository.save(change2);
+        repository.save(change1).block();
+        repository.save(change2).block();
 
         assertThat(repository.getSinceStateWithDelegation(ACCOUNT_ID, State.INITIAL, Optional.empty()).block().isCountChangesOnly())
             .isTrue();


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org