You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2019/01/22 10:10:01 UTC
[6/6] james-project git commit: JAMES-2647 Cassandra migration task
for mapping sources
JAMES-2647 Cassandra migration task for mapping sources
Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/d51c7085
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/d51c7085
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/d51c7085
Branch: refs/heads/master
Commit: d51c7085443c947b3f77e64163d55800dc6e782b
Parents: 8938f5f
Author: Rene Cordier <rc...@linagora.com>
Authored: Mon Jan 21 15:02:55 2019 +0700
Committer: Benoit Tellier <bt...@linagora.com>
Committed: Tue Jan 22 17:09:38 2019 +0700
----------------------------------------------------------------------
.../versions/CassandraSchemaVersionManager.java | 2 +-
.../guice/protocols/webadmin-cassandra/pom.xml | 4 +
.../modules/server/CassandraRoutesModule.java | 3 +
server/data/data-cassandra/pom.xml | 5 +
.../cassandra/CassandraMappingsSourcesDAO.java | 8 +-
.../CassandraRecipientRewriteTable.java | 9 +-
.../CassandraRecipientRewriteTableDAO.java | 45 ++----
.../migration/MappingsSourcesMigration.java | 65 ++++++++
.../CassandraMappingsSourcesDAOTest.java | 1 -
.../CassandraRecipientRewriteTableDAOTest.java | 10 +-
.../migration/MappingsSourcesMigrationTest.java | 151 +++++++++++++++++++
11 files changed, 256 insertions(+), 47 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/james-project/blob/d51c7085/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/versions/CassandraSchemaVersionManager.java
----------------------------------------------------------------------
diff --git a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/versions/CassandraSchemaVersionManager.java b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/versions/CassandraSchemaVersionManager.java
index d953f7e..172fdb5 100644
--- a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/versions/CassandraSchemaVersionManager.java
+++ b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/versions/CassandraSchemaVersionManager.java
@@ -34,7 +34,7 @@ import com.google.common.base.Preconditions;
public class CassandraSchemaVersionManager {
public static final SchemaVersion MIN_VERSION = new SchemaVersion(2);
- public static final SchemaVersion MAX_VERSION = new SchemaVersion(6);
+ public static final SchemaVersion MAX_VERSION = new SchemaVersion(7);
public static final SchemaVersion DEFAULT_VERSION = MIN_VERSION;
private static final Logger LOGGER = LoggerFactory.getLogger(CassandraSchemaVersionManager.class);
http://git-wip-us.apache.org/repos/asf/james-project/blob/d51c7085/server/container/guice/protocols/webadmin-cassandra/pom.xml
----------------------------------------------------------------------
diff --git a/server/container/guice/protocols/webadmin-cassandra/pom.xml b/server/container/guice/protocols/webadmin-cassandra/pom.xml
index 6554da8..4f7fbcb 100644
--- a/server/container/guice/protocols/webadmin-cassandra/pom.xml
+++ b/server/container/guice/protocols/webadmin-cassandra/pom.xml
@@ -41,6 +41,10 @@
<artifactId>james-server-webadmin-cassandra</artifactId>
</dependency>
<dependency>
+ <groupId>${james.groupId}</groupId>
+ <artifactId>james-server-data-cassandra</artifactId>
+ </dependency>
+ <dependency>
<groupId>com.google.inject</groupId>
<artifactId>guice</artifactId>
</dependency>
http://git-wip-us.apache.org/repos/asf/james-project/blob/d51c7085/server/container/guice/protocols/webadmin-cassandra/src/main/java/org/apache/james/modules/server/CassandraRoutesModule.java
----------------------------------------------------------------------
diff --git a/server/container/guice/protocols/webadmin-cassandra/src/main/java/org/apache/james/modules/server/CassandraRoutesModule.java b/server/container/guice/protocols/webadmin-cassandra/src/main/java/org/apache/james/modules/server/CassandraRoutesModule.java
index 7513df4..01e4caa 100644
--- a/server/container/guice/protocols/webadmin-cassandra/src/main/java/org/apache/james/modules/server/CassandraRoutesModule.java
+++ b/server/container/guice/protocols/webadmin-cassandra/src/main/java/org/apache/james/modules/server/CassandraRoutesModule.java
@@ -26,6 +26,7 @@ import org.apache.james.backends.cassandra.versions.SchemaVersion;
import org.apache.james.mailbox.cassandra.mail.migration.AttachmentMessageIdCreation;
import org.apache.james.mailbox.cassandra.mail.migration.AttachmentV2Migration;
import org.apache.james.mailbox.cassandra.mail.migration.MailboxPathV2Migration;
+import org.apache.james.rrt.cassandra.migration.MappingsSourcesMigration;
import org.apache.james.webadmin.Routes;
import org.apache.james.webadmin.routes.CassandraMailboxMergingRoutes;
import org.apache.james.webadmin.routes.CassandraMigrationRoutes;
@@ -41,6 +42,7 @@ public class CassandraRoutesModule extends AbstractModule {
private static final SchemaVersion FROM_V3_TO_V4 = new SchemaVersion(3);
private static final SchemaVersion FROM_V4_TO_V5 = new SchemaVersion(4);
private static final SchemaVersion FROM_V5_TO_V6 = new SchemaVersion(5);
+ private static final SchemaVersion FROM_V6_TO_V7 = new SchemaVersion(6);
@Override
protected void configure() {
@@ -57,6 +59,7 @@ public class CassandraRoutesModule extends AbstractModule {
allMigrationClazzBinder.addBinding(FROM_V3_TO_V4).to(AttachmentV2Migration.class);
allMigrationClazzBinder.addBinding(FROM_V4_TO_V5).to(AttachmentMessageIdCreation.class);
allMigrationClazzBinder.addBinding(FROM_V5_TO_V6).to(MailboxPathV2Migration.class);
+ allMigrationClazzBinder.addBinding(FROM_V6_TO_V7).to(MappingsSourcesMigration.class);
bind(SchemaVersion.class)
.annotatedWith(Names.named(CassandraMigrationService.LATEST_VERSION))
http://git-wip-us.apache.org/repos/asf/james-project/blob/d51c7085/server/data/data-cassandra/pom.xml
----------------------------------------------------------------------
diff --git a/server/data/data-cassandra/pom.xml b/server/data/data-cassandra/pom.xml
index 01780b5..bd3220a 100644
--- a/server/data/data-cassandra/pom.xml
+++ b/server/data/data-cassandra/pom.xml
@@ -150,6 +150,11 @@
<artifactId>testcontainers</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-core</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
http://git-wip-us.apache.org/repos/asf/james-project/blob/d51c7085/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/CassandraMappingsSourcesDAO.java
----------------------------------------------------------------------
diff --git a/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/CassandraMappingsSourcesDAO.java b/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/CassandraMappingsSourcesDAO.java
index 777a50e..6ff3072 100644
--- a/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/CassandraMappingsSourcesDAO.java
+++ b/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/CassandraMappingsSourcesDAO.java
@@ -41,14 +41,14 @@ import com.datastax.driver.core.Session;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
-class CassandraMappingsSourcesDAO {
+public class CassandraMappingsSourcesDAO {
private final CassandraAsyncExecutor executor;
private final PreparedStatement insertStatement;
private final PreparedStatement deleteStatement;
private final PreparedStatement retrieveSourcesStatement;
@Inject
- CassandraMappingsSourcesDAO(Session session) {
+ public CassandraMappingsSourcesDAO(Session session) {
this.executor = new CassandraAsyncExecutor(session);
this.insertStatement = prepareInsertStatement(session);
this.deleteStatement = prepareDelete(session);
@@ -77,7 +77,7 @@ class CassandraMappingsSourcesDAO {
.and(eq(MAPPING_VALUE, bindMarker(MAPPING_VALUE))));
}
- Mono<Void> addMapping(Mapping mapping, MappingSource source) {
+ public Mono<Void> addMapping(Mapping mapping, MappingSource source) {
return executor.executeVoidReactor(insertStatement.bind()
.setString(MAPPING_TYPE, mapping.getType().asPrefix())
.setString(MAPPING_VALUE, mapping.getMappingValue())
@@ -91,7 +91,7 @@ class CassandraMappingsSourcesDAO {
.setString(SOURCE, source.asMailAddressString()));
}
- Flux<MappingSource> retrieveSources(Mapping mapping) {
+ public Flux<MappingSource> retrieveSources(Mapping mapping) {
return executor.executeReactor(retrieveSourcesStatement.bind()
.setString(MAPPING_TYPE, mapping.getType().asPrefix())
.setString(MAPPING_VALUE, mapping.getMappingValue()))
http://git-wip-us.apache.org/repos/asf/james-project/blob/d51c7085/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTable.java
----------------------------------------------------------------------
diff --git a/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTable.java b/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTable.java
index 4c7a92b..0a51ec1 100644
--- a/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTable.java
+++ b/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTable.java
@@ -30,6 +30,8 @@ import org.apache.james.rrt.lib.Mappings;
import org.apache.james.rrt.lib.MappingsImpl;
import org.apache.james.util.OptionalUtils;
+import com.github.steveash.guavate.Guavate;
+
public class CassandraRecipientRewriteTable extends AbstractRecipientRewriteTable {
private final CassandraRecipientRewriteTableDAO cassandraRecipientRewriteTableDAO;
private final CassandraMappingsSourcesDAO cassandraMappingsSourcesDAO;
@@ -63,7 +65,12 @@ public class CassandraRecipientRewriteTable extends AbstractRecipientRewriteTabl
@Override
public Map<MappingSource, Mappings> getAllMappings() {
- return cassandraRecipientRewriteTableDAO.getAllMappings().block();
+ return cassandraRecipientRewriteTableDAO.getAllMappings()
+ .collect(Guavate.toImmutableMap(
+ pair -> pair.getLeft(),
+ pair -> MappingsImpl.fromMappings(pair.getRight()),
+ Mappings::union))
+ .block();
}
@Override
http://git-wip-us.apache.org/repos/asf/james-project/blob/d51c7085/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTableDAO.java
----------------------------------------------------------------------
diff --git a/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTableDAO.java b/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTableDAO.java
index 52f3516..fe31f1f 100644
--- a/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTableDAO.java
+++ b/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTableDAO.java
@@ -29,24 +29,23 @@ import static org.apache.james.rrt.cassandra.tables.CassandraRecipientRewriteTab
import static org.apache.james.rrt.cassandra.tables.CassandraRecipientRewriteTableTable.TABLE_NAME;
import static org.apache.james.rrt.cassandra.tables.CassandraRecipientRewriteTableTable.USER;
-import java.util.Map;
-
import javax.inject.Inject;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
import org.apache.james.backends.cassandra.utils.CassandraUtils;
import org.apache.james.rrt.lib.Mapping;
import org.apache.james.rrt.lib.MappingSource;
-import org.apache.james.rrt.lib.Mappings;
import org.apache.james.rrt.lib.MappingsImpl;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.Session;
import com.github.steveash.guavate.Guavate;
+import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
-class CassandraRecipientRewriteTableDAO {
+public class CassandraRecipientRewriteTableDAO {
private final CassandraAsyncExecutor executor;
private final CassandraUtils cassandraUtils;
private final PreparedStatement insertStatement;
@@ -55,7 +54,7 @@ class CassandraRecipientRewriteTableDAO {
private final PreparedStatement retrieveAllMappingsStatement;
@Inject
- CassandraRecipientRewriteTableDAO(Session session, CassandraUtils cassandraUtils) {
+ public CassandraRecipientRewriteTableDAO(Session session, CassandraUtils cassandraUtils) {
this.executor = new CassandraAsyncExecutor(session);
this.cassandraUtils = cassandraUtils;
this.insertStatement = prepareInsertStatement(session);
@@ -91,7 +90,7 @@ class CassandraRecipientRewriteTableDAO {
.value(MAPPING, bindMarker(MAPPING)));
}
- Mono<Void> addMapping(MappingSource source, Mapping mapping) {
+ public Mono<Void> addMapping(MappingSource source, Mapping mapping) {
return executor.executeVoidReactor(insertStatement.bind()
.setString(USER, source.getFixedUser())
.setString(DOMAIN, source.getFixedDomain())
@@ -116,35 +115,11 @@ class CassandraRecipientRewriteTableDAO {
.filter(mappings -> !mappings.isEmpty());
}
- Mono<Map<MappingSource, Mappings>> getAllMappings() {
+ public Flux<Pair<MappingSource, Mapping>> getAllMappings() {
return executor.executeReactor(retrieveAllMappingsStatement.bind())
- .map(resultSet -> cassandraUtils.convertToStream(resultSet)
- .map(row -> new UserMapping(MappingSource.fromUser(row.getString(USER), row.getString(DOMAIN)), row.getString(MAPPING)))
- .collect(Guavate.toImmutableMap(
- UserMapping::getSource,
- UserMapping::toMapping,
- Mappings::union)));
- }
-
- private static class UserMapping {
- private final MappingSource source;
- private final String mapping;
-
- UserMapping(MappingSource source, String mapping) {
- this.source = source;
- this.mapping = mapping;
- }
-
- MappingSource getSource() {
- return source;
- }
-
- String getMapping() {
- return mapping;
- }
-
- Mappings toMapping() {
- return MappingsImpl.fromRawString(getMapping());
- }
+ .flatMapMany(Flux::fromIterable)
+ .map(row -> Pair.of(
+ MappingSource.fromUser(row.getString(USER), row.getString(DOMAIN)),
+ Mapping.of(row.getString(MAPPING))));
}
}
http://git-wip-us.apache.org/repos/asf/james-project/blob/d51c7085/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/migration/MappingsSourcesMigration.java
----------------------------------------------------------------------
diff --git a/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/migration/MappingsSourcesMigration.java b/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/migration/MappingsSourcesMigration.java
new file mode 100644
index 0000000..226add2
--- /dev/null
+++ b/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/migration/MappingsSourcesMigration.java
@@ -0,0 +1,90 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.rrt.cassandra.migration;
+
+import javax.inject.Inject;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.james.backends.cassandra.migration.Migration;
+import org.apache.james.rrt.cassandra.CassandraMappingsSourcesDAO;
+import org.apache.james.rrt.cassandra.CassandraRecipientRewriteTableDAO;
+import org.apache.james.rrt.lib.Mapping;
+import org.apache.james.rrt.lib.MappingSource;
+import org.apache.james.task.Task;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import reactor.core.publisher.Mono;
+
+/**
+ * Schema migration copying every entry of the recipient rewrite table into the
+ * mappings sources table.
+ *
+ * <p>Each (source, mapping) pair returned by
+ * {@link CassandraRecipientRewriteTableDAO#getAllMappings()} is written through
+ * {@link CassandraMappingsSourcesDAO#addMapping(Mapping, MappingSource)}.
+ */
+public class MappingsSourcesMigration implements Migration {
+    private static final Logger LOGGER = LoggerFactory.getLogger(MappingsSourcesMigration.class);
+    private final CassandraRecipientRewriteTableDAO cassandraRecipientRewriteTableDAO;
+    private final CassandraMappingsSourcesDAO cassandraMappingsSourcesDAO;
+
+    @Inject
+    public MappingsSourcesMigration(CassandraRecipientRewriteTableDAO cassandraRecipientRewriteTableDAO,
+                                    CassandraMappingsSourcesDAO cassandraMappingsSourcesDAO) {
+        this.cassandraRecipientRewriteTableDAO = cassandraRecipientRewriteTableDAO;
+        this.cassandraMappingsSourcesDAO = cassandraMappingsSourcesDAO;
+    }
+
+    /**
+     * Runs the migration and blocks until every mapping has been processed.
+     *
+     * <p>Per-entry results are folded with {@link Task#combine}, starting from
+     * {@code Result.COMPLETED}, so an empty table yields {@code Result.COMPLETED}.
+     *
+     * @return the folded result, or {@code Result.PARTIAL} when the flow itself fails
+     *         (for instance when the mappings cannot be listed)
+     */
+    @Override
+    public Result run() {
+        return cassandraRecipientRewriteTableDAO.getAllMappings()
+            .flatMap(this::migrate)
+            .reduce(Result.COMPLETED, Task::combine)
+            .doOnError(e -> LOGGER.error("Error while migrating mappings sources", e))
+            .onErrorResume(e -> Mono.just(Result.PARTIAL))
+            .block();
+    }
+
+    /**
+     * Copies a single entry into the mappings sources table.
+     *
+     * @param mappingEntry source (left) and mapping (right) read from the recipient rewrite table
+     * @return a Mono emitting {@code Result.COMPLETED} once the insert has completed, or
+     *         {@code Result.PARTIAL} when it failed (the error is logged, not propagated)
+     */
+    private Mono<Result> migrate(Pair<MappingSource, Mapping> mappingEntry) {
+        return cassandraMappingsSourcesDAO.addMapping(mappingEntry.getRight(), mappingEntry.getLeft())
+            // addMapping returns a Mono<Void>, which completes without emitting: map() would never
+            // be invoked, so the success result has to be produced after completion instead.
+            .then(Mono.just(Result.COMPLETED))
+            .doOnError(e -> LOGGER.error("Error while performing migration of mappings sources", e))
+            .onErrorResume(e -> Mono.just(Result.PARTIAL));
+    }
+}
http://git-wip-us.apache.org/repos/asf/james-project/blob/d51c7085/server/data/data-cassandra/src/test/java/org/apache/james/rrt/cassandra/CassandraMappingsSourcesDAOTest.java
----------------------------------------------------------------------
diff --git a/server/data/data-cassandra/src/test/java/org/apache/james/rrt/cassandra/CassandraMappingsSourcesDAOTest.java b/server/data/data-cassandra/src/test/java/org/apache/james/rrt/cassandra/CassandraMappingsSourcesDAOTest.java
index 5d0a125..b277a4e 100644
--- a/server/data/data-cassandra/src/test/java/org/apache/james/rrt/cassandra/CassandraMappingsSourcesDAOTest.java
+++ b/server/data/data-cassandra/src/test/java/org/apache/james/rrt/cassandra/CassandraMappingsSourcesDAOTest.java
@@ -23,7 +23,6 @@ import static org.assertj.core.api.Assertions.assertThat;
import org.apache.james.backends.cassandra.CassandraCluster;
import org.apache.james.backends.cassandra.CassandraClusterExtension;
-import org.apache.james.backends.cassandra.utils.CassandraUtils;
import org.apache.james.core.Domain;
import org.apache.james.rrt.lib.Mapping;
import org.apache.james.rrt.lib.MappingSource;
http://git-wip-us.apache.org/repos/asf/james-project/blob/d51c7085/server/data/data-cassandra/src/test/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTableDAOTest.java
----------------------------------------------------------------------
diff --git a/server/data/data-cassandra/src/test/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTableDAOTest.java b/server/data/data-cassandra/src/test/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTableDAOTest.java
index 3abc541..1654166 100644
--- a/server/data/data-cassandra/src/test/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTableDAOTest.java
+++ b/server/data/data-cassandra/src/test/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTableDAOTest.java
@@ -58,7 +58,7 @@ class CassandraRecipientRewriteTableDAOTest {
@Test
void getAllMappingsShouldReturnEmptyByDefault() {
- assertThat(dao.getAllMappings().block()).isEmpty();
+ assertThat(dao.getAllMappings().collectList().block()).isEmpty();
}
@Test
@@ -72,7 +72,7 @@ class CassandraRecipientRewriteTableDAOTest {
void getAllMappingsShouldReturnStoredMapping() {
dao.addMapping(SOURCE, MAPPING).block();
- assertThat(dao.getAllMappings().block()).contains(Pair.of(SOURCE, MappingsImpl.fromMappings(MAPPING)));
+ assertThat(dao.getAllMappings().collectList().block()).contains(Pair.of(SOURCE, MAPPING));
}
@Test
@@ -90,7 +90,7 @@ class CassandraRecipientRewriteTableDAOTest {
dao.removeMapping(SOURCE, MAPPING).block();
- assertThat(dao.getAllMappings().block()).isEmpty();
+ assertThat(dao.getAllMappings().collectList().block()).isEmpty();
}
@Test
@@ -107,7 +107,7 @@ class CassandraRecipientRewriteTableDAOTest {
dao.addMapping(SOURCE, MAPPING).block();
dao.addMapping(SOURCE, MAPPING_2).block();
- assertThat(dao.getAllMappings().block())
- .contains(Pair.of(SOURCE, MappingsImpl.fromMappings(MAPPING, MAPPING_2)));
+ assertThat(dao.getAllMappings().collectList().block())
+ .contains(Pair.of(SOURCE, MAPPING), Pair.of(SOURCE, MAPPING_2));
}
}
http://git-wip-us.apache.org/repos/asf/james-project/blob/d51c7085/server/data/data-cassandra/src/test/java/org/apache/james/rrt/cassandra/migration/MappingsSourcesMigrationTest.java
----------------------------------------------------------------------
diff --git a/server/data/data-cassandra/src/test/java/org/apache/james/rrt/cassandra/migration/MappingsSourcesMigrationTest.java b/server/data/data-cassandra/src/test/java/org/apache/james/rrt/cassandra/migration/MappingsSourcesMigrationTest.java
new file mode 100644
index 0000000..d2c1c69
--- /dev/null
+++ b/server/data/data-cassandra/src/test/java/org/apache/james/rrt/cassandra/migration/MappingsSourcesMigrationTest.java
@@ -0,0 +1,159 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.rrt.cassandra.migration;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.time.Duration;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.IntStream;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.james.backends.cassandra.CassandraCluster;
+import org.apache.james.backends.cassandra.CassandraClusterExtension;
+import org.apache.james.backends.cassandra.migration.Migration;
+import org.apache.james.backends.cassandra.utils.CassandraUtils;
+import org.apache.james.core.Domain;
+import org.apache.james.rrt.cassandra.CassandraMappingsSourcesDAO;
+import org.apache.james.rrt.cassandra.CassandraRRTModule;
+import org.apache.james.rrt.cassandra.CassandraRecipientRewriteTableDAO;
+import org.apache.james.rrt.lib.Mapping;
+import org.apache.james.rrt.lib.MappingSource;
+import org.apache.james.task.Task;
+import org.apache.james.util.concurrency.ConcurrentTestRunner;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+/**
+ * Tests for {@link MappingsSourcesMigration}, run against the Cassandra instance provided by
+ * {@link CassandraClusterExtension}; failure scenarios use mocked DAOs instead.
+ */
+class MappingsSourcesMigrationTest {
+    private static final int THREAD_COUNT = 10;
+    private static final int OPERATION_COUNT = 10;
+    private static final int MAPPING_COUNT = 100;
+
+    private static final String USER = "test";
+    private static final String ADDRESS = "test@domain";
+    private static final MappingSource SOURCE = MappingSource.fromUser(USER, Domain.LOCALHOST);
+    private static final Mapping MAPPING = Mapping.alias(ADDRESS);
+
+    @RegisterExtension
+    static CassandraClusterExtension cassandraCluster = new CassandraClusterExtension(CassandraRRTModule.MODULE);
+
+    private CassandraRecipientRewriteTableDAO cassandraRecipientRewriteTableDAO;
+    private CassandraMappingsSourcesDAO cassandraMappingsSourcesDAO;
+
+    private MappingsSourcesMigration migration;
+
+    @BeforeEach
+    void setUp(CassandraCluster cassandra) {
+        cassandraRecipientRewriteTableDAO = new CassandraRecipientRewriteTableDAO(cassandra.getConf(), CassandraUtils.WITH_DEFAULT_CONFIGURATION);
+        cassandraMappingsSourcesDAO = new CassandraMappingsSourcesDAO(cassandra.getConf());
+
+        migration = new MappingsSourcesMigration(cassandraRecipientRewriteTableDAO, cassandraMappingsSourcesDAO);
+    }
+
+    @Test
+    void emptyMigrationShouldSucceed() {
+        assertThat(migration.run()).isEqualTo(Migration.Result.COMPLETED);
+    }
+
+    @Test
+    void migrationShouldSucceedWithData() {
+        cassandraRecipientRewriteTableDAO.addMapping(SOURCE, MAPPING).block();
+
+        assertThat(migration.run()).isEqualTo(Task.Result.COMPLETED);
+    }
+
+    @Test
+    void migrationShouldCreateMappingSourceFromMapping() {
+        cassandraRecipientRewriteTableDAO.addMapping(SOURCE, MAPPING).block();
+
+        migration.run();
+
+        assertThat(cassandraMappingsSourcesDAO.retrieveSources(MAPPING).collectList().block())
+            .containsExactly(SOURCE);
+    }
+
+    @Test
+    void migrationShouldCreateMultipleMappingSourcesFromMappings() {
+        MappingSource source2 = MappingSource.fromUser("bob", Domain.LOCALHOST);
+
+        cassandraRecipientRewriteTableDAO.addMapping(SOURCE, MAPPING).block();
+        cassandraRecipientRewriteTableDAO.addMapping(source2, MAPPING).block();
+
+        migration.run();
+
+        assertThat(cassandraMappingsSourcesDAO.retrieveSources(MAPPING).collectList().block())
+            .containsOnly(SOURCE, source2);
+    }
+
+    @Test
+    void migrationShouldReturnPartialWhenGetAllMappingsFromMappingsFail() {
+        // Mocked DAOs get their own names so they do not shadow the Cassandra-backed fields.
+        CassandraRecipientRewriteTableDAO failingRewriteTableDAO = mock(CassandraRecipientRewriteTableDAO.class);
+        CassandraMappingsSourcesDAO mockedMappingsSourcesDAO = mock(CassandraMappingsSourcesDAO.class);
+        MappingsSourcesMigration testee = new MappingsSourcesMigration(failingRewriteTableDAO, mockedMappingsSourcesDAO);
+
+        when(failingRewriteTableDAO.getAllMappings()).thenReturn(Flux.error(new RuntimeException()));
+
+        assertThat(testee.run()).isEqualTo(Migration.Result.PARTIAL);
+    }
+
+    @Test
+    void migrationShouldReturnPartialWhenAddMappingFails() {
+        CassandraRecipientRewriteTableDAO mockedRewriteTableDAO = mock(CassandraRecipientRewriteTableDAO.class);
+        CassandraMappingsSourcesDAO failingMappingsSourcesDAO = mock(CassandraMappingsSourcesDAO.class);
+        MappingsSourcesMigration testee = new MappingsSourcesMigration(mockedRewriteTableDAO, failingMappingsSourcesDAO);
+
+        when(mockedRewriteTableDAO.getAllMappings())
+            .thenReturn(Flux.just(Pair.of(SOURCE, MAPPING)));
+        // The failure is delivered as an error signal rather than thrown by the stub, so that it
+        // goes through the per-entry error handling of the migration.
+        when(failingMappingsSourcesDAO.addMapping(any(Mapping.class), any(MappingSource.class)))
+            .thenReturn(Mono.error(new RuntimeException()));
+
+        assertThat(testee.run()).isEqualTo(Migration.Result.PARTIAL);
+    }
+
+    @Test
+    void migrationShouldBeIdempotentWhenRunMultipleTimes() throws ExecutionException, InterruptedException {
+        IntStream.range(0, MAPPING_COUNT)
+            .forEach(i -> cassandraRecipientRewriteTableDAO
+                .addMapping(MappingSource.parse("source" + i + "@domain"), MAPPING).block());
+
+        ConcurrentTestRunner.builder()
+            .operation((threadNumber, step) -> migration.run())
+            .threadCount(THREAD_COUNT)
+            .operationCount(OPERATION_COUNT)
+            .runSuccessfullyWithin(Duration.ofMinutes(1));
+
+        assertThat(cassandraMappingsSourcesDAO.retrieveSources(MAPPING).collectList().block())
+            .hasSize(MAPPING_COUNT);
+    }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org