You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by rc...@apache.org on 2020/12/30 03:35:40 UTC
[james-project] 28/29: JAMES-3462 CassandraMailboxChangeRepository
implementation
This is an automated email from the ASF dual-hosted git repository.
rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit d63b8e0f58a66e63569b9422a6359eadc2c36741
Author: LanKhuat <dl...@linagora.com>
AuthorDate: Wed Dec 23 16:46:22 2020 +0700
JAMES-3462 CassandraMailboxChangeRepository implementation
---
.../change/CassandraMailboxChangeModule.java | 60 +++++++
.../change/CassandraMailboxChangeRepository.java | 52 +++++-
...eRepository.java => CassandraStateFactory.java} | 36 +----
.../change/MailboxChangeRepositoryDAO.java | 176 +++++++++++++++++++++
.../change/tables/CassandraMailboxChangeTable.java | 33 ++++
.../CassandraMailboxChangeRepositoryTest.java | 67 ++++++++
.../change/MailboxChangeRepositoryContract.java | 12 +-
7 files changed, 393 insertions(+), 43 deletions(-)
diff --git a/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/change/CassandraMailboxChangeModule.java b/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/change/CassandraMailboxChangeModule.java
new file mode 100644
index 0000000..9fa1cb5
--- /dev/null
+++ b/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/change/CassandraMailboxChangeModule.java
@@ -0,0 +1,60 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.jmap.cassandra.change;
+
+import static com.datastax.driver.core.DataType.cboolean;
+import static com.datastax.driver.core.DataType.frozenSet;
+import static com.datastax.driver.core.DataType.text;
+import static com.datastax.driver.core.DataType.timeuuid;
+import static com.datastax.driver.core.schemabuilder.SchemaBuilder.Direction.ASC;
+import static com.datastax.driver.core.schemabuilder.SchemaBuilder.KeyCaching.ALL;
+import static com.datastax.driver.core.schemabuilder.SchemaBuilder.frozen;
+import static com.datastax.driver.core.schemabuilder.SchemaBuilder.rows;
+import static org.apache.james.backends.cassandra.init.CassandraZonedDateTimeModule.ZONED_DATE_TIME;
+import static org.apache.james.backends.cassandra.utils.CassandraConstants.DEFAULT_CACHED_ROW_PER_PARTITION;
+import static org.apache.james.jmap.cassandra.change.tables.CassandraMailboxChangeTable.ACCOUNT_ID;
+import static org.apache.james.jmap.cassandra.change.tables.CassandraMailboxChangeTable.CREATED;
+import static org.apache.james.jmap.cassandra.change.tables.CassandraMailboxChangeTable.DATE;
+import static org.apache.james.jmap.cassandra.change.tables.CassandraMailboxChangeTable.DESTROYED;
+import static org.apache.james.jmap.cassandra.change.tables.CassandraMailboxChangeTable.IS_COUNT_CHANGE;
+import static org.apache.james.jmap.cassandra.change.tables.CassandraMailboxChangeTable.IS_DELEGATED;
+import static org.apache.james.jmap.cassandra.change.tables.CassandraMailboxChangeTable.STATE;
+import static org.apache.james.jmap.cassandra.change.tables.CassandraMailboxChangeTable.TABLE_NAME;
+import static org.apache.james.jmap.cassandra.change.tables.CassandraMailboxChangeTable.UPDATED;
+
+import org.apache.james.backends.cassandra.components.CassandraModule;
+
+public interface CassandraMailboxChangeModule { // exposes the Cassandra schema of the TABLE_NAME table as a single CassandraModule constant
+ CassandraModule MODULE = CassandraModule.table(TABLE_NAME)
+ .comment("Holds MailboxChange definition. Used to manage Mailbox state in JMAP.")
+ .options(options -> options
+ .clusteringOrder(STATE, ASC) // rows of a partition are clustered by STATE in ascending order
+ .caching(ALL, rows(DEFAULT_CACHED_ROW_PER_PARTITION)))
+ .statement(statement -> statement
+ .addPartitionKey(ACCOUNT_ID, text()) // partition key: the account identifier, stored as text
+ .addClusteringColumn(STATE, timeuuid()) // clustering column: the state, stored as a timeuuid
+ .addUDTColumn(DATE, frozen(ZONED_DATE_TIME)) // date uses the ZONED_DATE_TIME user-defined type
+ .addColumn(IS_DELEGATED, cboolean())
+ .addColumn(IS_COUNT_CHANGE, cboolean())
+ .addColumn(CREATED, frozenSet(timeuuid())) // CREATED, UPDATED and DESTROYED are each a frozen set of timeuuid values
+ .addColumn(UPDATED, frozenSet(timeuuid()))
+ .addColumn(DESTROYED, frozenSet(timeuuid())))
+ .build();
+}
diff --git a/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/change/CassandraMailboxChangeRepository.java b/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/change/CassandraMailboxChangeRepository.java
index 4fe0c0e..b0d9f05 100644
--- a/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/change/CassandraMailboxChangeRepository.java
+++ b/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/change/CassandraMailboxChangeRepository.java
@@ -21,39 +21,81 @@ package org.apache.james.jmap.cassandra.change;
import java.util.Optional;
+import javax.inject.Inject;
+
import org.apache.james.jmap.api.change.Limit;
import org.apache.james.jmap.api.change.MailboxChange;
import org.apache.james.jmap.api.change.MailboxChangeRepository;
import org.apache.james.jmap.api.change.MailboxChanges;
import org.apache.james.jmap.api.change.State;
+import org.apache.james.jmap.api.exception.ChangeNotFoundException;
import org.apache.james.jmap.api.model.AccountId;
+import com.google.common.base.Preconditions;
+
+import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
public class CassandraMailboxChangeRepository implements MailboxChangeRepository {
+ public static final Limit DEFAULT_NUMBER_OF_CHANGES = Limit.of(5); // limit handed to the collector when the caller supplies no maxChanges
+
+ private final MailboxChangeRepositoryDAO mailboxChangeRepositoryDAO; // every read and write below is delegated to this DAO
+
+ @Inject
+ public CassandraMailboxChangeRepository(MailboxChangeRepositoryDAO mailboxChangeRepositoryDAO) {
+ this.mailboxChangeRepositoryDAO = mailboxChangeRepositoryDAO;
+ }
@Override
public Mono<Void> save(MailboxChange change) {
- return Mono.empty();
+ return mailboxChangeRepositoryDAO.insert(change);
}
@Override
public Mono<MailboxChanges> getSinceState(AccountId accountId, State state, Optional<Limit> maxChanges) {
- return Mono.empty();
+ Preconditions.checkNotNull(accountId);
+ Preconditions.checkNotNull(state);
+ maxChanges.ifPresent(limit -> Preconditions.checkArgument(limit.getValue() > 0, "maxChanges must be a positive integer")); // a limit, when given, must be strictly positive
+
+ if (state.equals(State.INITIAL)) { // initial state: collect from the account's full history, no state lookup
+ return mailboxChangeRepositoryDAO.getAllChanges(accountId)
+ .filter(change -> !change.isDelegated()) // this variant leaves delegated changes out
+ .collect(new MailboxChanges.MailboxChangesBuilder.MailboxChangeCollector(state, maxChanges.orElse(DEFAULT_NUMBER_OF_CHANGES)));
+ }
+
+ return mailboxChangeRepositoryDAO.getChangesSince(accountId, state) // DAO query is inclusive of the given state; an empty result is turned into ChangeNotFoundException on the next line
+ .switchIfEmpty(Flux.error(new ChangeNotFoundException(state, String.format("State '%s' could not be found", state.getValue()))))
+ .filter(change -> !change.isDelegated())
+ .filter(change -> !change.getState().equals(state)) // drop the change that carries the requested state itself
+ .collect(new MailboxChanges.MailboxChangesBuilder.MailboxChangeCollector(state, maxChanges.orElse(DEFAULT_NUMBER_OF_CHANGES)));
}
@Override
public Mono<MailboxChanges> getSinceStateWithDelegation(AccountId accountId, State state, Optional<Limit> maxChanges) {
- return Mono.empty();
+ Preconditions.checkNotNull(accountId);
+ Preconditions.checkNotNull(state);
+ maxChanges.ifPresent(limit -> Preconditions.checkArgument(limit.getValue() > 0, "maxChanges must be a positive integer"));
+
+ if (state.equals(State.INITIAL)) { // same flow as getSinceState, except that delegated changes are not filtered out
+ return mailboxChangeRepositoryDAO.getAllChanges(accountId)
+ .collect(new MailboxChanges.MailboxChangesBuilder.MailboxChangeCollector(state, maxChanges.orElse(DEFAULT_NUMBER_OF_CHANGES)));
+ }
+
+ return mailboxChangeRepositoryDAO.getChangesSince(accountId, state)
+ .switchIfEmpty(Flux.error(new ChangeNotFoundException(state, String.format("State '%s' could not be found", state.getValue()))))
+ .filter(change -> !change.getState().equals(state)) // the requested state itself is still excluded from the result
+ .collect(new MailboxChanges.MailboxChangesBuilder.MailboxChangeCollector(state, maxChanges.orElse(DEFAULT_NUMBER_OF_CHANGES)));
}
@Override
public Mono<State> getLatestState(AccountId accountId) {
- return Mono.just(State.INITIAL);
+ return mailboxChangeRepositoryDAO.latestStateNotDelegated(accountId)
+ .switchIfEmpty(Mono.just(State.INITIAL)); // the DAO yielded no state: answer State.INITIAL
}
@Override
public Mono<State> getLatestStateWithDelegation(AccountId accountId) {
- return Mono.just(State.INITIAL);
+ return mailboxChangeRepositoryDAO.latestState(accountId)
+ .switchIfEmpty(Mono.just(State.INITIAL)); // the DAO yielded no state: answer State.INITIAL
}
}
diff --git a/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/change/CassandraMailboxChangeRepository.java b/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/change/CassandraStateFactory.java
similarity index 54%
copy from server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/change/CassandraMailboxChangeRepository.java
copy to server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/change/CassandraStateFactory.java
index 4fe0c0e..fce5dc5 100644
--- a/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/change/CassandraMailboxChangeRepository.java
+++ b/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/change/CassandraStateFactory.java
@@ -19,41 +19,13 @@
package org.apache.james.jmap.cassandra.change;
-import java.util.Optional;
-
-import org.apache.james.jmap.api.change.Limit;
-import org.apache.james.jmap.api.change.MailboxChange;
-import org.apache.james.jmap.api.change.MailboxChangeRepository;
-import org.apache.james.jmap.api.change.MailboxChanges;
import org.apache.james.jmap.api.change.State;
-import org.apache.james.jmap.api.model.AccountId;
-
-import reactor.core.publisher.Mono;
-
-public class CassandraMailboxChangeRepository implements MailboxChangeRepository {
-
- @Override
- public Mono<Void> save(MailboxChange change) {
- return Mono.empty();
- }
-
- @Override
- public Mono<MailboxChanges> getSinceState(AccountId accountId, State state, Optional<Limit> maxChanges) {
- return Mono.empty();
- }
- @Override
- public Mono<MailboxChanges> getSinceStateWithDelegation(AccountId accountId, State state, Optional<Limit> maxChanges) {
- return Mono.empty();
- }
-
- @Override
- public Mono<State> getLatestState(AccountId accountId) {
- return Mono.just(State.INITIAL);
- }
+import com.datastax.driver.core.utils.UUIDs;
+public class CassandraStateFactory implements State.Factory { // State.Factory implementation backed by time-based UUIDs
@Override
- public Mono<State> getLatestStateWithDelegation(AccountId accountId) {
- return Mono.just(State.INITIAL);
+ public State generate() {
+ return State.of(UUIDs.timeBased()); // each call wraps a newly generated time-based UUID in a State
}
}
diff --git a/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/change/MailboxChangeRepositoryDAO.java b/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/change/MailboxChangeRepositoryDAO.java
new file mode 100644
index 0000000..7232dbb
--- /dev/null
+++ b/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/change/MailboxChangeRepositoryDAO.java
@@ -0,0 +1,176 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.jmap.cassandra.change;
+
+import static com.datastax.driver.core.querybuilder.QueryBuilder.asc;
+import static com.datastax.driver.core.querybuilder.QueryBuilder.bindMarker;
+import static com.datastax.driver.core.querybuilder.QueryBuilder.desc;
+import static com.datastax.driver.core.querybuilder.QueryBuilder.eq;
+import static com.datastax.driver.core.querybuilder.QueryBuilder.gte;
+import static com.datastax.driver.core.querybuilder.QueryBuilder.insertInto;
+import static com.datastax.driver.core.querybuilder.QueryBuilder.select;
+import static org.apache.james.jmap.cassandra.change.tables.CassandraMailboxChangeTable.ACCOUNT_ID;
+import static org.apache.james.jmap.cassandra.change.tables.CassandraMailboxChangeTable.CREATED;
+import static org.apache.james.jmap.cassandra.change.tables.CassandraMailboxChangeTable.DATE;
+import static org.apache.james.jmap.cassandra.change.tables.CassandraMailboxChangeTable.DESTROYED;
+import static org.apache.james.jmap.cassandra.change.tables.CassandraMailboxChangeTable.IS_COUNT_CHANGE;
+import static org.apache.james.jmap.cassandra.change.tables.CassandraMailboxChangeTable.IS_DELEGATED;
+import static org.apache.james.jmap.cassandra.change.tables.CassandraMailboxChangeTable.STATE;
+import static org.apache.james.jmap.cassandra.change.tables.CassandraMailboxChangeTable.TABLE_NAME;
+import static org.apache.james.jmap.cassandra.change.tables.CassandraMailboxChangeTable.UPDATED;
+
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+
+import javax.inject.Inject;
+
+import org.apache.james.backends.cassandra.init.CassandraTypesProvider;
+import org.apache.james.backends.cassandra.init.CassandraZonedDateTimeModule;
+import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
+import org.apache.james.jmap.api.change.MailboxChange;
+import org.apache.james.jmap.api.change.State;
+import org.apache.james.jmap.api.model.AccountId;
+import org.apache.james.mailbox.cassandra.ids.CassandraId;
+import org.apache.james.mailbox.model.MailboxId;
+
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.UserType;
+import com.github.steveash.guavate.Guavate;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+class MailboxChangeRepositoryDAO { // package-private DAO running prepared CQL statements against the TABLE_NAME table
+ private final CassandraAsyncExecutor executor;
+ private final UserType zonedDateTimeUserType;
+ private final PreparedStatement insertStatement;
+ private final PreparedStatement selectAllStatement;
+ private final PreparedStatement selectFromStatement;
+ private final PreparedStatement selectLatestStatement;
+ private final PreparedStatement selectLatestNotDelegatedStatement;
+
+ @Inject
+ public MailboxChangeRepositoryDAO(Session session, CassandraTypesProvider cassandraTypesProvider) { // prepares all five statements once, at construction time
+ executor = new CassandraAsyncExecutor(session);
+ zonedDateTimeUserType = cassandraTypesProvider.getDefinedUserType(CassandraZonedDateTimeModule.ZONED_DATE_TIME);
+
+ insertStatement = session.prepare(insertInto(TABLE_NAME) // writes every column of one change row, bound by column name
+ .value(ACCOUNT_ID, bindMarker(ACCOUNT_ID))
+ .value(STATE, bindMarker(STATE))
+ .value(DATE, bindMarker(DATE))
+ .value(IS_DELEGATED, bindMarker(IS_DELEGATED))
+ .value(IS_COUNT_CHANGE, bindMarker(IS_COUNT_CHANGE))
+ .value(CREATED, bindMarker(CREATED))
+ .value(UPDATED, bindMarker(UPDATED))
+ .value(DESTROYED, bindMarker(DESTROYED)));
+
+ selectAllStatement = session.prepare(select().from(TABLE_NAME) // all rows of one account, ascending STATE
+ .where(eq(ACCOUNT_ID, bindMarker(ACCOUNT_ID)))
+ .orderBy(asc(STATE)));
+
+ selectFromStatement = session.prepare(select().from(TABLE_NAME)
+ .where(eq(ACCOUNT_ID, bindMarker(ACCOUNT_ID)))
+ .and(gte(STATE, bindMarker(STATE))) // inclusive lower bound: the row for the bound state itself is returned too
+ .orderBy(asc(STATE)));
+
+ selectLatestStatement = session.prepare(select(STATE) // STATE of the first row in descending order, i.e. the highest one in the partition
+ .from(TABLE_NAME)
+ .where(eq(ACCOUNT_ID, bindMarker(ACCOUNT_ID)))
+ .orderBy(desc(STATE))
+ .limit(1));
+
+ selectLatestNotDelegatedStatement = session.prepare(select(STATE) // same as above, restricted to rows whose IS_DELEGATED is false
+ .from(TABLE_NAME)
+ .where(eq(ACCOUNT_ID, bindMarker(ACCOUNT_ID)))
+ .and(eq(IS_DELEGATED, false))
+ .orderBy(desc(STATE))
+ .limit(1)
+ .allowFiltering()); // NOTE(review): IS_DELEGATED is not part of the primary key, hence ALLOW FILTERING; presumably costlier on large partitions — confirm
+ }
+
+ Mono<Void> insert(MailboxChange change) { // binds each field of the change to the insert statement and executes it
+ return executor.executeVoid(insertStatement.bind()
+ .setString(ACCOUNT_ID, change.getAccountId().getIdentifier())
+ .setUUID(STATE, change.getState().getValue())
+ .setBool(IS_COUNT_CHANGE, change.isCountChange())
+ .setBool(IS_DELEGATED, change.isDelegated())
+ .setSet(CREATED, toUuidSet(change.getCreated()), UUID.class)
+ .setSet(UPDATED, toUuidSet(change.getUpdated()), UUID.class)
+ .setSet(DESTROYED, toUuidSet(change.getDestroyed()), UUID.class)
+ .setUDTValue(DATE, CassandraZonedDateTimeModule.toUDT(zonedDateTimeUserType, change.getDate())));
+ }
+
+ private ImmutableSet<UUID> toUuidSet(List<MailboxId> idSet) { // converts mailbox ids to the UUIDs stored in the set columns
+ return idSet.stream()
+ .filter(CassandraId.class::isInstance) // NOTE(review): ids that are not CassandraId are silently dropped — confirm this is intended
+ .map(CassandraId.class::cast)
+ .map(CassandraId::asUuid)
+ .collect(Guavate.toImmutableSet());
+ }
+
+ Flux<MailboxChange> getAllChanges(AccountId accountId) { // every stored change of the account, mapped row by row
+ return executor.executeRows(selectAllStatement.bind()
+ .setString(ACCOUNT_ID, accountId.getIdentifier()))
+ .map(this::readRow);
+ }
+
+ Flux<MailboxChange> getChangesSince(AccountId accountId, State state) { // changes whose STATE is >= the given one, so the given state's own row is included
+ return executor.executeRows(selectFromStatement.bind()
+ .setString(ACCOUNT_ID, accountId.getIdentifier())
+ .setUUID(STATE, state.getValue()))
+ .map(this::readRow);
+ }
+
+ Mono<State> latestState(AccountId accountId) { // maps the single row of selectLatestStatement to a State
+ return executor.executeSingleRow(selectLatestStatement.bind()
+ .setString(ACCOUNT_ID, accountId.getIdentifier()))
+ .map(row -> State.of(row.getUUID(STATE)));
+ }
+
+ Mono<State> latestStateNotDelegated(AccountId accountId) { // same mapping, using the statement restricted to non-delegated rows
+ return executor.executeSingleRow(selectLatestNotDelegatedStatement.bind()
+ .setString(ACCOUNT_ID, accountId.getIdentifier()))
+ .map(row -> State.of(row.getUUID(STATE)));
+ }
+
+ private MailboxChange readRow(Row row) { // rebuilds a MailboxChange from the columns of one row
+ return MailboxChange.builder()
+ .accountId(AccountId.fromString(row.getString(ACCOUNT_ID)))
+ .state(State.of(row.getUUID(STATE)))
+ .date(CassandraZonedDateTimeModule.fromUDT(row.getUDTValue(DATE)))
+ .isCountChange(row.getBool(IS_COUNT_CHANGE))
+ .delegated(row.getBool(IS_DELEGATED))
+ .created(toIdSet(row.getSet(CREATED, UUID.class)))
+ .updated(toIdSet(row.getSet(UPDATED, UUID.class)))
+ .destroyed(toIdSet(row.getSet(DESTROYED, UUID.class)))
+ .build();
+ }
+
+ private ImmutableList<MailboxId> toIdSet(Set<UUID> uuidSet) { // despite the name, the result is an ImmutableList of CassandraId
+ return uuidSet.stream()
+ .map(CassandraId::of)
+ .collect(Guavate.toImmutableList());
+ }
+}
diff --git a/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/change/tables/CassandraMailboxChangeTable.java b/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/change/tables/CassandraMailboxChangeTable.java
new file mode 100644
index 0000000..392ebbc
--- /dev/null
+++ b/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/change/tables/CassandraMailboxChangeTable.java
@@ -0,0 +1,33 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.jmap.cassandra.change.tables;
+
+public interface CassandraMailboxChangeTable { // string constants naming the mailbox change table and its columns
+
+ String TABLE_NAME = "mailbox_change";
+ String ACCOUNT_ID = "account_id";
+ String STATE = "state";
+ String DATE = "date";
+ String IS_DELEGATED = "is_delegated";
+ String IS_COUNT_CHANGE = "is_count_change";
+ String CREATED = "created"; // CREATED, UPDATED and DESTROYED name the three id-set columns
+ String UPDATED = "updated";
+ String DESTROYED = "destroyed";
+}
diff --git a/server/data/data-jmap-cassandra/src/test/java/org/apache/james/jmap/cassandra/change/CassandraMailboxChangeRepositoryTest.java b/server/data/data-jmap-cassandra/src/test/java/org/apache/james/jmap/cassandra/change/CassandraMailboxChangeRepositoryTest.java
new file mode 100644
index 0000000..a6e743a
--- /dev/null
+++ b/server/data/data-jmap-cassandra/src/test/java/org/apache/james/jmap/cassandra/change/CassandraMailboxChangeRepositoryTest.java
@@ -0,0 +1,67 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.jmap.cassandra.change;
+
+import org.apache.james.backends.cassandra.CassandraCluster;
+import org.apache.james.backends.cassandra.CassandraClusterExtension;
+import org.apache.james.backends.cassandra.components.CassandraModule;
+import org.apache.james.backends.cassandra.init.CassandraZonedDateTimeModule;
+import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionModule;
+import org.apache.james.jmap.api.change.MailboxChange;
+import org.apache.james.jmap.api.change.MailboxChangeRepository;
+import org.apache.james.jmap.api.change.MailboxChangeRepositoryContract;
+import org.apache.james.jmap.api.change.State;
+import org.apache.james.mailbox.cassandra.ids.CassandraId;
+import org.apache.james.mailbox.model.MailboxId;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+public class CassandraMailboxChangeRepositoryTest implements MailboxChangeRepositoryContract { // runs the shared repository contract against the Cassandra implementation
+
+ @RegisterExtension
+ static CassandraClusterExtension cassandraCluster = new CassandraClusterExtension( // cluster set up with the three modules aggregated below
+ CassandraModule.aggregateModules(CassandraMailboxChangeModule.MODULE,
+ CassandraSchemaVersionModule.MODULE,
+ CassandraZonedDateTimeModule.MODULE));
+
+ MailboxChangeRepository mailboxChangeRepository;
+ MailboxChangeRepositoryDAO mailboxChangeRepositoryDAO;
+
+ @BeforeEach
+ public void setUp(CassandraCluster cassandra) { // builds a new DAO and repository before each test
+ mailboxChangeRepositoryDAO = new MailboxChangeRepositoryDAO(cassandra.getConf(), cassandra.getTypesProvider());
+ mailboxChangeRepository = new CassandraMailboxChangeRepository(mailboxChangeRepositoryDAO);
+ }
+
+ @Override
+ public State.Factory stateFactory() {
+ return new CassandraStateFactory();
+ }
+
+ @Override
+ public MailboxChangeRepository mailboxChangeRepository() {
+ return mailboxChangeRepository;
+ }
+
+ @Override
+ public MailboxId generateNewMailboxId() {
+ return CassandraId.timeBased(); // the contract's mailbox ids are CassandraId instances here
+ }
+}
diff --git a/server/data/data-jmap/src/test/java/org/apache/james/jmap/api/change/MailboxChangeRepositoryContract.java b/server/data/data-jmap/src/test/java/org/apache/james/jmap/api/change/MailboxChangeRepositoryContract.java
index 381714d..4034849 100644
--- a/server/data/data-jmap/src/test/java/org/apache/james/jmap/api/change/MailboxChangeRepositoryContract.java
+++ b/server/data/data-jmap/src/test/java/org/apache/james/jmap/api/change/MailboxChangeRepositoryContract.java
@@ -539,8 +539,8 @@ public interface MailboxChangeRepositoryContract {
.created(ImmutableList.of(id2))
.build();
- repository.save(change1);
- repository.save(change2);
+ repository.save(change1).block();
+ repository.save(change2).block();
assertThat(repository.getSinceState(ACCOUNT_ID, State.INITIAL, Optional.empty()).block().isCountChangesOnly())
.isFalse();
@@ -568,8 +568,8 @@ public interface MailboxChangeRepositoryContract {
.updated(ImmutableList.of(id2))
.build();
- repository.save(change1);
- repository.save(change2);
+ repository.save(change1).block();
+ repository.save(change2).block();
assertThat(repository.getSinceState(ACCOUNT_ID, State.INITIAL, Optional.empty()).block().isCountChangesOnly())
.isFalse();
@@ -597,8 +597,8 @@ public interface MailboxChangeRepositoryContract {
.updated(ImmutableList.of(id2))
.build();
- repository.save(change1);
- repository.save(change2);
+ repository.save(change1).block();
+ repository.save(change2).block();
assertThat(repository.getSinceStateWithDelegation(ACCOUNT_ID, State.INITIAL, Optional.empty()).block().isCountChangesOnly())
.isTrue();
---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org