You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by rc...@apache.org on 2021/01/14 03:26:11 UTC
[james-project] 06/11: JAMES-3470 CassandraEmailChangeRepository
implementation
This is an automated email from the ASF dual-hosted git repository.
rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 0c6015b92358d0844750652b5dd5c8f049c26698
Author: LanKhuat <dl...@linagora.com>
AuthorDate: Mon Jan 11 10:10:15 2021 +0700
JAMES-3470 CassandraEmailChangeRepository implementation
---
.../change/CassandraEmailChangeModule.java | 58 +++++++
.../change/CassandraEmailChangeRepository.java | 56 ++++++-
.../cassandra/change/EmailChangeRepositoryDAO.java | 172 +++++++++++++++++++++
.../change/tables/CassandraEmailChangeTable.java | 31 ++++
.../change/CassandraEmailChangeRepositoryTest.java | 53 +++++++
5 files changed, 363 insertions(+), 7 deletions(-)
diff --git a/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/change/CassandraEmailChangeModule.java b/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/change/CassandraEmailChangeModule.java
new file mode 100644
index 0000000..abe9f3d
--- /dev/null
+++ b/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/change/CassandraEmailChangeModule.java
@@ -0,0 +1,58 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.jmap.cassandra.change;
+
+import static com.datastax.driver.core.DataType.cboolean;
+import static com.datastax.driver.core.DataType.frozenSet;
+import static com.datastax.driver.core.DataType.text;
+import static com.datastax.driver.core.DataType.timeuuid;
+import static com.datastax.driver.core.schemabuilder.SchemaBuilder.Direction.ASC;
+import static com.datastax.driver.core.schemabuilder.SchemaBuilder.KeyCaching.ALL;
+import static com.datastax.driver.core.schemabuilder.SchemaBuilder.frozen;
+import static com.datastax.driver.core.schemabuilder.SchemaBuilder.rows;
+import static org.apache.james.backends.cassandra.init.CassandraZonedDateTimeModule.ZONED_DATE_TIME;
+import static org.apache.james.backends.cassandra.utils.CassandraConstants.DEFAULT_CACHED_ROW_PER_PARTITION;
+import static org.apache.james.jmap.cassandra.change.tables.CassandraEmailChangeTable.ACCOUNT_ID;
+import static org.apache.james.jmap.cassandra.change.tables.CassandraEmailChangeTable.CREATED;
+import static org.apache.james.jmap.cassandra.change.tables.CassandraEmailChangeTable.DATE;
+import static org.apache.james.jmap.cassandra.change.tables.CassandraEmailChangeTable.DESTROYED;
+import static org.apache.james.jmap.cassandra.change.tables.CassandraEmailChangeTable.IS_DELEGATED;
+import static org.apache.james.jmap.cassandra.change.tables.CassandraEmailChangeTable.STATE;
+import static org.apache.james.jmap.cassandra.change.tables.CassandraEmailChangeTable.TABLE_NAME;
+import static org.apache.james.jmap.cassandra.change.tables.CassandraEmailChangeTable.UPDATED;
+
+import org.apache.james.backends.cassandra.components.CassandraModule;
+
+public interface CassandraEmailChangeModule {
+ CassandraModule MODULE = CassandraModule.table(TABLE_NAME)
+ .comment("Holds EmailChange definition. Used to manage Email state in JMAP.")
+ .options(options -> options
+ .clusteringOrder(STATE, ASC)
+ .caching(ALL, rows(DEFAULT_CACHED_ROW_PER_PARTITION)))
+ .statement(statement -> statement
+ .addPartitionKey(ACCOUNT_ID, text())
+ .addClusteringColumn(STATE, timeuuid())
+ .addUDTColumn(DATE, frozen(ZONED_DATE_TIME))
+ .addColumn(IS_DELEGATED, cboolean())
+ .addColumn(CREATED, frozenSet(timeuuid()))
+ .addColumn(UPDATED, frozenSet(timeuuid()))
+ .addColumn(DESTROYED, frozenSet(timeuuid())))
+ .build();
+}
diff --git a/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/change/CassandraEmailChangeRepository.java b/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/change/CassandraEmailChangeRepository.java
index 1f67589..59b1f77 100644
--- a/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/change/CassandraEmailChangeRepository.java
+++ b/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/change/CassandraEmailChangeRepository.java
@@ -21,39 +21,81 @@ package org.apache.james.jmap.cassandra.change;
import java.util.Optional;
+import javax.inject.Inject;
+
import org.apache.james.jmap.api.change.EmailChange;
import org.apache.james.jmap.api.change.EmailChangeRepository;
import org.apache.james.jmap.api.change.EmailChanges;
import org.apache.james.jmap.api.change.Limit;
import org.apache.james.jmap.api.change.State;
+import org.apache.james.jmap.api.exception.ChangeNotFoundException;
import org.apache.james.jmap.api.model.AccountId;
+import com.google.common.base.Preconditions;
+
+import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
public class CassandraEmailChangeRepository implements EmailChangeRepository {
+ public static final Limit DEFAULT_NUMBER_OF_CHANGES = Limit.of(5);
+
+ private final EmailChangeRepositoryDAO emailChangeRepositoryDAO;
+
+ @Inject
+ public CassandraEmailChangeRepository(EmailChangeRepositoryDAO emailChangeRepositoryDAO) {
+ this.emailChangeRepositoryDAO = emailChangeRepositoryDAO;
+ }
@Override
public Mono<Void> save(EmailChange change) {
- return Mono.empty();
+ return emailChangeRepositoryDAO.insert(change);
}
@Override
- public Mono<EmailChanges> getSinceState(AccountId accountId, State state, Optional<Limit> maxIdsToReturn) {
- return Mono.empty();
+ public Mono<EmailChanges> getSinceState(AccountId accountId, State state, Optional<Limit> maxChanges) {
+ Preconditions.checkNotNull(accountId);
+ Preconditions.checkNotNull(state);
+ maxChanges.ifPresent(limit -> Preconditions.checkArgument(limit.getValue() > 0, "maxChanges must be a positive integer"));
+
+ if (state.equals(State.INITIAL)) {
+ return emailChangeRepositoryDAO.getAllChanges(accountId)
+ .filter(change -> !change.isDelegated())
+ .collect(new EmailChanges.Builder.EmailChangeCollector(state, maxChanges.orElse(DEFAULT_NUMBER_OF_CHANGES)));
+ }
+
+ return emailChangeRepositoryDAO.getChangesSince(accountId, state)
+ .switchIfEmpty(Flux.error(new ChangeNotFoundException(state, String.format("State '%s' could not be found", state.getValue()))))
+ .filter(change -> !change.isDelegated())
+ .filter(change -> !change.getState().equals(state))
+ .collect(new EmailChanges.Builder.EmailChangeCollector(state, maxChanges.orElse(DEFAULT_NUMBER_OF_CHANGES)));
}
@Override
- public Mono<EmailChanges> getSinceStateWithDelegation(AccountId accountId, State state, Optional<Limit> maxIdsToReturn) {
- return Mono.empty();
+ public Mono<EmailChanges> getSinceStateWithDelegation(AccountId accountId, State state, Optional<Limit> maxChanges) {
+ Preconditions.checkNotNull(accountId);
+ Preconditions.checkNotNull(state);
+ maxChanges.ifPresent(limit -> Preconditions.checkArgument(limit.getValue() > 0, "maxChanges must be a positive integer"));
+
+ if (state.equals(State.INITIAL)) {
+ return emailChangeRepositoryDAO.getAllChanges(accountId)
+ .collect(new EmailChanges.Builder.EmailChangeCollector(state, maxChanges.orElse(DEFAULT_NUMBER_OF_CHANGES)));
+ }
+
+ return emailChangeRepositoryDAO.getChangesSince(accountId, state)
+ .switchIfEmpty(Flux.error(new ChangeNotFoundException(state, String.format("State '%s' could not be found", state.getValue()))))
+ .filter(change -> !change.getState().equals(state))
+ .collect(new EmailChanges.Builder.EmailChangeCollector(state, maxChanges.orElse(DEFAULT_NUMBER_OF_CHANGES)));
}
@Override
public Mono<State> getLatestState(AccountId accountId) {
- return Mono.just(State.INITIAL);
+ return emailChangeRepositoryDAO.latestStateNotDelegated(accountId)
+ .switchIfEmpty(Mono.just(State.INITIAL));
}
@Override
public Mono<State> getLatestStateWithDelegation(AccountId accountId) {
- return Mono.just(State.INITIAL);
+ return emailChangeRepositoryDAO.latestState(accountId)
+ .switchIfEmpty(Mono.just(State.INITIAL));
}
}
diff --git a/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/change/EmailChangeRepositoryDAO.java b/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/change/EmailChangeRepositoryDAO.java
new file mode 100644
index 0000000..4799908
--- /dev/null
+++ b/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/change/EmailChangeRepositoryDAO.java
@@ -0,0 +1,172 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.jmap.cassandra.change;
+
+import static com.datastax.driver.core.querybuilder.QueryBuilder.asc;
+import static com.datastax.driver.core.querybuilder.QueryBuilder.bindMarker;
+import static com.datastax.driver.core.querybuilder.QueryBuilder.desc;
+import static com.datastax.driver.core.querybuilder.QueryBuilder.eq;
+import static com.datastax.driver.core.querybuilder.QueryBuilder.gte;
+import static com.datastax.driver.core.querybuilder.QueryBuilder.insertInto;
+import static com.datastax.driver.core.querybuilder.QueryBuilder.select;
+import static org.apache.james.jmap.cassandra.change.tables.CassandraEmailChangeTable.ACCOUNT_ID;
+import static org.apache.james.jmap.cassandra.change.tables.CassandraEmailChangeTable.CREATED;
+import static org.apache.james.jmap.cassandra.change.tables.CassandraEmailChangeTable.DATE;
+import static org.apache.james.jmap.cassandra.change.tables.CassandraEmailChangeTable.DESTROYED;
+import static org.apache.james.jmap.cassandra.change.tables.CassandraEmailChangeTable.IS_DELEGATED;
+import static org.apache.james.jmap.cassandra.change.tables.CassandraEmailChangeTable.STATE;
+import static org.apache.james.jmap.cassandra.change.tables.CassandraEmailChangeTable.TABLE_NAME;
+import static org.apache.james.jmap.cassandra.change.tables.CassandraEmailChangeTable.UPDATED;
+
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+
+import javax.inject.Inject;
+
+import org.apache.james.backends.cassandra.init.CassandraTypesProvider;
+import org.apache.james.backends.cassandra.init.CassandraZonedDateTimeModule;
+import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
+import org.apache.james.jmap.api.change.EmailChange;
+import org.apache.james.jmap.api.change.State;
+import org.apache.james.jmap.api.model.AccountId;
+import org.apache.james.mailbox.cassandra.ids.CassandraMessageId;
+import org.apache.james.mailbox.model.MessageId;
+
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.UserType;
+import com.github.steveash.guavate.Guavate;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+public class EmailChangeRepositoryDAO {
+ private final CassandraAsyncExecutor executor;
+ private final UserType zonedDateTimeUserType;
+ private final PreparedStatement insertStatement;
+ private final PreparedStatement selectAllStatement;
+ private final PreparedStatement selectFromStatement;
+ private final PreparedStatement selectLatestStatement;
+ private final PreparedStatement selectLatestNotDelegatedStatement;
+
+ @Inject
+ public EmailChangeRepositoryDAO(Session session, CassandraTypesProvider cassandraTypesProvider) {
+ executor = new CassandraAsyncExecutor(session);
+ zonedDateTimeUserType = cassandraTypesProvider.getDefinedUserType(CassandraZonedDateTimeModule.ZONED_DATE_TIME);
+
+ insertStatement = session.prepare(insertInto(TABLE_NAME)
+ .value(ACCOUNT_ID, bindMarker(ACCOUNT_ID))
+ .value(STATE, bindMarker(STATE))
+ .value(DATE, bindMarker(DATE))
+ .value(IS_DELEGATED, bindMarker(IS_DELEGATED))
+ .value(CREATED, bindMarker(CREATED))
+ .value(UPDATED, bindMarker(UPDATED))
+ .value(DESTROYED, bindMarker(DESTROYED)));
+
+ selectAllStatement = session.prepare(select().from(TABLE_NAME)
+ .where(eq(ACCOUNT_ID, bindMarker(ACCOUNT_ID)))
+ .orderBy(asc(STATE)));
+
+ selectFromStatement = session.prepare(select().from(TABLE_NAME)
+ .where(eq(ACCOUNT_ID, bindMarker(ACCOUNT_ID)))
+ .and(gte(STATE, bindMarker(STATE)))
+ .orderBy(asc(STATE)));
+
+ selectLatestStatement = session.prepare(select(STATE)
+ .from(TABLE_NAME)
+ .where(eq(ACCOUNT_ID, bindMarker(ACCOUNT_ID)))
+ .orderBy(desc(STATE))
+ .limit(1));
+
+ selectLatestNotDelegatedStatement = session.prepare(select(STATE)
+ .from(TABLE_NAME)
+ .where(eq(ACCOUNT_ID, bindMarker(ACCOUNT_ID)))
+ .and(eq(IS_DELEGATED, false))
+ .orderBy(desc(STATE))
+ .limit(1)
+ .allowFiltering());
+ }
+
+ Mono<Void> insert(EmailChange change) {
+ return executor.executeVoid(insertStatement.bind()
+ .setString(ACCOUNT_ID, change.getAccountId().getIdentifier())
+ .setUUID(STATE, change.getState().getValue())
+ .setBool(IS_DELEGATED, change.isDelegated())
+ .setSet(CREATED, toUuidSet(change.getCreated()), UUID.class)
+ .setSet(UPDATED, toUuidSet(change.getUpdated()), UUID.class)
+ .setSet(DESTROYED, toUuidSet(change.getDestroyed()), UUID.class)
+ .setUDTValue(DATE, CassandraZonedDateTimeModule.toUDT(zonedDateTimeUserType, change.getDate())));
+ }
+
+ private ImmutableSet<UUID> toUuidSet(List<MessageId> idSet) {
+ return idSet.stream()
+ .filter(CassandraMessageId.class::isInstance)
+ .map(CassandraMessageId.class::cast)
+ .map(CassandraMessageId::get)
+ .collect(Guavate.toImmutableSet());
+ }
+
+ Flux<EmailChange> getAllChanges(AccountId accountId) {
+ return executor.executeRows(selectAllStatement.bind()
+ .setString(ACCOUNT_ID, accountId.getIdentifier()))
+ .map(this::readRow);
+ }
+
+ Flux<EmailChange> getChangesSince(AccountId accountId, State state) {
+ return executor.executeRows(selectFromStatement.bind()
+ .setString(ACCOUNT_ID, accountId.getIdentifier())
+ .setUUID(STATE, state.getValue()))
+ .map(this::readRow);
+ }
+
+ Mono<State> latestState(AccountId accountId) {
+ return executor.executeSingleRow(selectLatestStatement.bind()
+ .setString(ACCOUNT_ID, accountId.getIdentifier()))
+ .map(row -> State.of(row.getUUID(STATE)));
+ }
+
+ Mono<State> latestStateNotDelegated(AccountId accountId) {
+ return executor.executeSingleRow(selectLatestNotDelegatedStatement.bind()
+ .setString(ACCOUNT_ID, accountId.getIdentifier()))
+ .map(row -> State.of(row.getUUID(STATE)));
+ }
+
+ private EmailChange readRow(Row row) {
+ return EmailChange.builder()
+ .accountId(AccountId.fromString(row.getString(ACCOUNT_ID)))
+ .state(State.of(row.getUUID(STATE)))
+ .date(CassandraZonedDateTimeModule.fromUDT(row.getUDTValue(DATE)))
+ .isDelegated(row.getBool(IS_DELEGATED))
+ .created(toIdSet(row.getSet(CREATED, UUID.class)))
+ .updated(toIdSet(row.getSet(UPDATED, UUID.class)))
+ .destroyed(toIdSet(row.getSet(DESTROYED, UUID.class)))
+ .build();
+ }
+
+ private ImmutableList<MessageId> toIdSet(Set<UUID> uuidSet) {
+ return uuidSet.stream()
+ .map(CassandraMessageId.Factory::of)
+ .collect(Guavate.toImmutableList());
+ }
+}
diff --git a/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/change/tables/CassandraEmailChangeTable.java b/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/change/tables/CassandraEmailChangeTable.java
new file mode 100644
index 0000000..945f038
--- /dev/null
+++ b/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/change/tables/CassandraEmailChangeTable.java
@@ -0,0 +1,31 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.jmap.cassandra.change.tables;
+
+public interface CassandraEmailChangeTable {
+ String TABLE_NAME = "email_change";
+ String ACCOUNT_ID = "account_id";
+ String STATE = "state";
+ String DATE = "date";
+ String IS_DELEGATED = "is_delegated";
+ String CREATED = "created";
+ String UPDATED = "updated";
+ String DESTROYED = "destroyed";
+}
diff --git a/server/data/data-jmap-cassandra/src/test/java/org/apache/james/jmap/cassandra/change/CassandraEmailChangeRepositoryTest.java b/server/data/data-jmap-cassandra/src/test/java/org/apache/james/jmap/cassandra/change/CassandraEmailChangeRepositoryTest.java
new file mode 100644
index 0000000..50af3ff
--- /dev/null
+++ b/server/data/data-jmap-cassandra/src/test/java/org/apache/james/jmap/cassandra/change/CassandraEmailChangeRepositoryTest.java
@@ -0,0 +1,53 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.jmap.cassandra.change;
+
+import org.apache.james.backends.cassandra.CassandraCluster;
+import org.apache.james.backends.cassandra.CassandraClusterExtension;
+import org.apache.james.backends.cassandra.components.CassandraModule;
+import org.apache.james.backends.cassandra.init.CassandraZonedDateTimeModule;
+import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionModule;
+import org.apache.james.jmap.api.change.EmailChangeRepository;
+import org.apache.james.jmap.api.change.EmailChangeRepositoryContract;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+public class CassandraEmailChangeRepositoryTest implements EmailChangeRepositoryContract {
+
+ @RegisterExtension
+ static CassandraClusterExtension cassandraCluster = new CassandraClusterExtension(
+ CassandraModule.aggregateModules(CassandraEmailChangeModule.MODULE,
+ CassandraSchemaVersionModule.MODULE,
+ CassandraZonedDateTimeModule.MODULE));
+
+ EmailChangeRepository emailChangeRepository;
+ EmailChangeRepositoryDAO emailChangeRepositoryDAO;
+
+ @BeforeEach
+ public void setUp(CassandraCluster cassandra) {
+ emailChangeRepositoryDAO = new EmailChangeRepositoryDAO(cassandra.getConf(), cassandra.getTypesProvider());
+ emailChangeRepository = new CassandraEmailChangeRepository(emailChangeRepositoryDAO);
+ }
+
+ @Override
+ public EmailChangeRepository emailChangeRepository() {
+ return emailChangeRepository;
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org